Dask cuDF Best Practices#
This page outlines several important guidelines for using Dask cuDF effectively.
Note
Since Dask cuDF is a backend extension for Dask DataFrame, the guidelines discussed in the Dask DataFrames Best Practices documentation also apply to Dask cuDF (excluding any pandas-specific details).
Deployment and Configuration#
Use Dask-CUDA#
To execute a Dask workflow on multiple GPUs, a Dask cluster must be deployed with Dask-CUDA and Dask.distributed.
When running on a single machine, the LocalCUDACluster convenience function is strongly recommended. No matter how many GPUs are available on the machine (even one!), using Dask-CUDA has many advantages over default (threaded) execution. Just to list a few:
Dask-CUDA makes it easy to pin workers to specific devices.
Dask-CUDA makes it easy to configure memory-spilling options.
The distributed scheduler collects useful diagnostic information that can be viewed on a dashboard in real time.
Please see Dask-CUDA’s API
and Best Practices
documentation for detailed information. Typical LocalCUDACluster
usage
is also illustrated within the multi-GPU section of Dask cuDF’s documentation.
Note
When running on cloud infrastructure or HPC systems, it is usually best to leverage system-specific deployment libraries like Dask Operator and Dask-Jobqueue.
Please see the RAPIDS deployment documentation for further details and examples.
Use diagnostic tools#
The Dask ecosystem includes several diagnostic tools that you should absolutely use. These tools include an intuitive browser dashboard as well as a dedicated API for collecting performance profiles.
No matter the workflow, using the dashboard is strongly recommended.
It provides a visual representation of the worker resources and compute
progress. It also shows basic GPU memory and utilization metrics (under
the GPU
tab). To visualize more detailed GPU metrics in JupyterLab,
use NVDashboard.
Enable cuDF spilling#
When using Dask cuDF for classic ETL workloads, it is usually best
to enable native spilling support in cuDF.
When using LocalCUDACluster()
, this is easily accomplished by
setting enable_cudf_spill=True
.
When a Dask cuDF workflow includes conversion between DataFrame and Array representations, native cuDF spilling may be insufficient. For these cases, JIT-unspill is likely to produce better protection from out-of-memory (OOM) errors. Please see Dask-CUDA’s spilling documentation for further details and guidance.
Use RMM#
Memory allocations in cuDF are significantly faster and more efficient when
the RAPIDS Memory Manager (RMM)
library is configured appropriately on worker processes. In most cases, the best way to manage
memory is by initializing an RMM pool on each worker before executing a
workflow. When using LocalCUDACluster()
, this is easily accomplished
by setting rmm_pool_size
to a large fraction (e.g. 0.9
).
See the Dask-CUDA memory-management documentation for more details.
Use the Dask DataFrame API#
Although Dask cuDF provides a public dask_cudf
Python module, we
strongly recommended that you use the CPU/GPU portable dask.dataframe
API instead. Simply use the Dask configuration system
to set the "dataframe.backend"
option to "cudf"
, and the
dask_cudf
module will be imported and used implicitly.
Be sure to use the to_backend()
method if you need to convert
between the different DataFrame backends. For example:
df = df.to_backend("pandas") # This gives us a pandas-backed collection
Note
Although to_backend()
makes it easy to move data between pandas
and cuDF, repetitive CPU-GPU data movement can degrade performance
significantly. For optimal results, keep your data on the GPU as much
as possible.
Avoid eager execution#
Although Dask DataFrame collections are lazy by default, there are several notable methods that will result in the immediate execution of the underlying task graph:
compute()
: Calling ddf.compute()
will materialize the result of
ddf
and return a single cuDF object. This is done by executing the entire
task graph associated with ddf
and concatenating its partitions in
local memory on the client process.
Note
Never call compute()
on a large collection that cannot fit comfortably
in the memory of a single GPU!
persist()
: Like compute()
, calling ddf.persist()
will
execute the entire task graph associated with ddf
. The most important
difference is that the computed partitions will remain in distributed
worker memory instead of being concatenated together on the client process.
Another difference is that persist()
will return immediately when
executing on a distributed cluster. If you need a blocking synchronization
point in your workflow, simply use the wait()
function:
ddf = ddf.persist()
wait(ddf)
Note
Avoid calling persist()
on a large collection that cannot fit comfortably
in global worker memory. If the total sum of the partition sizes is larger
than the sum of all GPU memory, calling persist will result in significant
spilling from device memory. If the individual partition sizes are large, this
is likely to produce an OOM error.
len()
/ head()
/ tail()
: Although these operations are used
often within pandas/cuDF code to quickly inspect data, it is best to avoid
them in Dask DataFrame. In most cases, these operations will execute some or all
of the underlying task graph to materialize the collection.
sort_values()
/ set_index()
: These operations both require Dask to
eagerly collect quantile information about the column(s) being targeted by the
global sort operation. See the next section for notes on sorting considerations.
Note
When using set_index()
, be sure to pass in sort=False
whenever the
global collection does not need to be sorted by the new index.
Avoid Sorting#
The design of Dask DataFrame makes it advantageous to work with data that is already sorted along its index at creation time. For most other cases, it is best to avoid sorting unless the logic of the workflow makes global ordering absolutely necessary.
If the purpose of a sort_values()
operation is to ensure that all unique
values in by
will be moved to the same output partition, then shuffle
is often the better option.
Reading Data#
Tune the partition size#
The ideal partition size is usually between 1/32 and 1/8 the memory capacity of a single GPU. Increasing the partition size will typically reduce the number of tasks in your workflow and improve the GPU utilization for each task. However, if the partitions are too large, the risk of OOM errors can become significant.
Note
As a general rule of thumb, start with 1/32-1/16 for shuffle-intensive workflows (e.g. large-scale sorting and joining), and 1/16-1/8 otherwise. For pathologically skewed data distributions, it may be necessary to target 1/64 or smaller. This rule of thumb comes from anecdotal optimization and OOM-debugging experience. Since every workflow is different, choosing the best partition size is both an art and a science.
The easiest way to tune the partition size is when the DataFrame collection
is first created by a function like read_parquet()
, read_csv()
,
or from_map()
. For example, both read_parquet()
and read_csv()
expose a blocksize
argument for adjusting the maximum partition size.
If the partition size cannot be tuned effectively at creation time, the repartition method can be used as a last resort.
Use Parquet#
Parquet is the recommended file format for Dask cuDF. It provides efficient columnar storage and enables Dask to perform valuable query optimizations like column projection and predicate pushdown.
The most important arguments to read_parquet()
are blocksize
and
aggregate_files
:
blocksize
: Use this argument to specify the maximum partition size.
The default is “256 MiB”, but larger values are usually more performant
on GPUs with more than 8 GiB of memory. Dask will use the blocksize
value to map a discrete number of Parquet row-groups (or files) to each
output partition. This mapping will only account for the uncompressed
storage size of each row group, which is usually smaller than the
correspondng cudf.DataFrame
.
aggregate_files
: Use this argument to specify whether Dask should
map multiple files to the same DataFrame partition. The default is
False
, but aggregate_files=True
is usually more performant when
the dataset contains many files that are smaller than half of blocksize
.
If you know that your files correspond to a reasonable partition size
before splitting or aggregation, set blocksize=None
to disallow
file splitting. In the absence of column-projection pushdown, this will
result in a simple 1-to-1 mapping between files and output partitions.
Note
If your workflow requires a strict 1-to-1 mapping between files and
partitions, use from_map()
to manually construct your partitions
with cudf.read_parquet
. When dd.read_parquet()
is used,
query-planning optimizations may automatically aggregate distinct files
into the same partition (even when aggregate_files=False
).
Note
Metadata collection can be extremely slow when reading from remote
storage (e.g. S3 and GCS). When reading many remote files that all
correspond to a reasonable partition size, use blocksize=None
to avoid unnecessary metadata collection.
Note
When reading from remote storage (e.g. S3 and GCS), performance will
likely improve with filesystem="arrow"
. When this option is set,
PyArrow will be used to perform IO on multiple CPU threads. Please be
aware that this feature is experimental, and behavior may change in
the future (without deprecation). Do not pass in blocksize
or
aggregate_files
when this feature is used. Instead, set the
"dataframe.parquet.minimum-partition-size"
config to control
file aggregation.
Use from_map()
#
To implement custom DataFrame-creation logic that is not covered by
existing APIs (like read_parquet()
), use dask.dataframe.from_map()
whenever possible. The from_map()
API has several advantages
over from_delayed()
:
It allows proper lazy execution of your custom logic
It enables column projection (as long as the mapped function supports a
columns
key-word argument)
See the from_map API documentation for more details.
Note
Whenever possible, be sure to specify the meta
argument to
from_map()
. If this argument is excluded, Dask will need to
materialize the first partition eagerly. If a large RMM pool is in
use on the first visible device, this eager execution on the client
may lead to an OOM error.
Sorting, Joining, and Grouping#
Sorting, joining, and grouping operations all have the potential to require the global shuffling of data between distinct partitions. When the initial data fits comfortably in global GPU memory, these “all-to-all” operations are typically bound by worker-to-worker communication. When the data is larger than global GPU memory, the bottleneck is typically device-to-host memory spilling.
Although every workflow is different, the following guidelines are often recommended:
Use a distributed cluster with Dask-CUDA workers
Use native cuDF spilling whenever possible (Dask-CUDA spilling documentation)
- Avoid shuffling whenever possible
Use
split_out=1
for low-cardinality groupby aggregationsUse
broadcast=True
for joins when at least one collection comprises a small number of partitions (e.g.<=5
)
Use UCX if communication is a bottleneck.
Note
UCX enables Dask-CUDA workers to communicate using high-performance tansport technologies like NVLink and Infiniband. Without UCX, inter-process communication will rely on TCP sockets.
User-defined functions#
Most real-world Dask DataFrame workflows use map_partitions
to map user-defined functions across every partition of the underlying data.
This API is a fantastic way to apply custom operations in an intuitive and
scalable way. With that said, the map_partitions()
method will produce
an opaque DataFrame expression that blocks the query-planning optimizer from performing
useful optimizations (like projection and filter pushdown).
Since column-projection pushdown is often the most effective optimization,
it is important to select the necessary columns both before and after calling
map_partitions()
. You can also add explicit filter operations to further
mitigate the loss of filter pushdown.