Welcome to Dask cuDF’s documentation!#
Dask cuDF (pronounced “DASK KOO-dee-eff”) is an extension library for the
Dask parallel computing framework. When installed, Dask cuDF is
automatically registered as the "cudf" dataframe backend for Dask DataFrame.
Note
Neither Dask cuDF nor Dask DataFrame provides support for multi-GPU or multi-node execution on its own. You must also deploy a dask.distributed cluster to leverage multiple GPUs. We strongly recommend using Dask-CUDA to simplify cluster setup and take full advantage of your GPU and networking hardware.
If you are familiar with Dask and pandas or cuDF, then Dask cuDF should feel familiar to you. If not, we recommend starting with 10 minutes to Dask followed by 10 minutes to cuDF and Dask cuDF.
After reviewing the sections below, please see the Best Practices page for further guidance on using Dask cuDF effectively.
Using Dask cuDF#
The Dask DataFrame API (Recommended)#
Simply use the Dask configuration system to set the
"dataframe.backend" option to "cudf".
From Python, this can be achieved like so:
import dask
dask.config.set({"dataframe.backend": "cudf"})
Alternatively, you can set DASK_DATAFRAME__BACKEND=cudf in the
environment before running your code.
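Dask typically reads environment variables when its configuration is first loaded, so the variable needs to be set before dask is imported. A minimal sketch of achieving the same effect from within Python (using os.environ as a stand-in for a shell export):
import os
# Equivalent to exporting DASK_DATAFRAME__BACKEND=cudf in the shell.
# Set the variable before importing dask so the configuration system
# picks it up when it first loads.
os.environ["DASK_DATAFRAME__BACKEND"] = "cudf"
import dask
# The option is now reflected in the Dask configuration
assert dask.config.get("dataframe.backend") == "cudf"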
Once this is done, the public Dask DataFrame API will leverage cudf
automatically when a new DataFrame collection is created from an
on-disk format using any of the following dask.dataframe functions:
read_parquet()
read_json()
read_csv()
read_orc()
read_hdf()
from_dict()
For example:
import dask.dataframe as dd
# By default, we obtain a pandas-backed dataframe
df = dd.read_parquet("data.parquet", ...)
import dask
dask.config.set({"dataframe.backend": "cudf"})
# This now gives us a cuDF-backed dataframe
df = dd.read_parquet("data.parquet", ...)
When other functions are used to create a new collection
(e.g. from_map(), from_pandas(), from_delayed(), and from_array()),
the backend of the new collection will depend on the inputs to those
functions. For example:
import dask.dataframe as dd
import pandas as pd
import cudf
# This gives us a pandas-backed dataframe
dd.from_pandas(pd.DataFrame({"a": range(10)}))
# This gives us a cuDF-backed dataframe
dd.from_pandas(cudf.DataFrame({"a": range(10)}))
An existing collection can always be moved to a specific backend
using the dask.dataframe.DataFrame.to_backend() API:
# This ensures that we have a cuDF-backed dataframe
df = df.to_backend("cudf")
# This ensures that we have a pandas-backed dataframe
df = df.to_backend("pandas")
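If you want to double-check which backend a collection is currently using, one informal option is to inspect the type of its metadata object. Note that _meta is an internal Dask attribute, so treat this as a quick sanity check rather than a stable API:
import cudf
import pandas as pd
df = df.to_backend("cudf")
assert isinstance(df._meta, cudf.DataFrame)  # empty cuDF frame describing the collection
df = df.to_backend("pandas")
assert isinstance(df._meta, pd.DataFrame)  # empty pandas frame describing the collection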
The explicit Dask cuDF API#
In addition to providing the "cudf" backend for Dask DataFrame,
Dask cuDF also provides an explicit dask_cudf API:
import dask_cudf
# This always gives us a cuDF-backed dataframe
df = dask_cudf.read_parquet("data.parquet", ...)
This API is used implicitly by the Dask DataFrame API when the "cudf"
backend is enabled. Therefore, using it directly will not provide any
performance benefit over the CPU/GPU-portable dask.dataframe API.
Also, some parts of the explicit API are incompatible with automatic
query planning (see the next section).
Query Planning#
Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+).
As long as the "dataframe.query-planning" configuration is set to True
(the default) when dask.dataframe is first imported, Dask Expressions
will be used under the hood.
For example, the following code will automatically benefit from predicate pushdown when the result is computed:
df = dd.read_parquet("/my/parquet/dataset/")
result = df.sort_values('B')['A']
Unoptimized expression graph (result.pprint()):
Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' ...
Simplified expression graph (result.simplify().pprint()):
Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ...
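Both printouts can be produced directly from the result collection defined above:
result.pprint()             # prints the unoptimized expression graph shown above
result.simplify().pprint()  # prints the simplified graph, with the column projection pushed into the read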
Note
Dask will automatically simplify the expression graph (within optimize())
when the result is converted to a task graph (via compute() or persist()).
You do not need to call simplify() yourself.
Using Multiple GPUs and Multiple Nodes#
Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try to partition your data into chunks that are small enough to fit comfortably in the memory of a single GPU. This means the compute tasks needed to answer a query can often be streamed to a single GPU process for out-of-core computing, and it also means those tasks can be executed in parallel over a multi-GPU cluster.
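If the default partitioning does not work well for your data, you can inspect and adjust the number of partitions yourself. A minimal sketch (the target of 100 partitions is an arbitrary example):
# Number of partitions (and therefore per-GPU tasks) in the collection
print(df.npartitions)
# Repartition if the default chunks are too large (or too small)
# for comfortable single-GPU processing
df = df.repartition(npartitions=100)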
In order to execute your Dask workflow on multiple GPUs, you will typically need to use Dask-CUDA to deploy a distributed Dask cluster, and Distributed to define a client object. For example:
import dask
import dask.dataframe as dd
from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":

    dask.config.set({"dataframe.backend": "cudf"})

    client = Client(
        LocalCUDACluster(
            CUDA_VISIBLE_DEVICES="0,1",  # Use two workers (on devices 0 and 1)
            rmm_pool_size=0.9,  # Use 90% of GPU memory as a pool for faster allocations
            enable_cudf_spill=True,  # Improve device memory stability
            local_directory="/fast/scratch/",  # Use fast local storage for spilling
        )
    )

    df = dd.read_parquet("/my/parquet/dataset/")
    agg = df.groupby('B').sum()
    agg.compute()  # This will use the cluster defined above
Note
This example uses compute() to materialize a concrete cudf.DataFrame
object in local memory. Never call compute() on a large collection that
cannot fit comfortably in the memory of a single GPU! See Dask’s
documentation on managing computation for more details.
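When the result is too large to collect locally, a common alternative is to keep it on the cluster with persist(), or to write it straight back to disk. A rough sketch (the output path here is hypothetical):
# Keep the aggregated result distributed in the cluster's GPU memory
agg = agg.persist()
# Inspect a small piece without materializing everything locally
print(agg.head())
# Or write the full result back to disk without collecting it on the client
agg.to_parquet("/my/output/dataset/")  # hypothetical output path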
Please see the Dask-CUDA documentation for more information about deploying GPU-aware clusters (including best practices).
API Reference#
Generally speaking, Dask cuDF tries to offer exactly the same API as Dask DataFrame. There are, however, some minor differences, mostly because cuDF does not perfectly mirror the pandas API, or because cuDF provides additional configuration flags (these differences mostly occur in the data reading and writing interfaces).
As a result, straightforward workflows can usually be migrated without much trouble, but more complex ones that rely on more features may need some tweaking. The API documentation describes these differences in detail, along with all of the functionality that Dask cuDF supports.