Welcome to Dask cuDF’s documentation!#

Dask cuDF (pronounced “DASK KOO-dee-eff”) is an extension library for the Dask parallel computing framework. When installed, Dask cuDF is automatically registered as the "cudf" dataframe backend for Dask DataFrame.

Note

Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU or multi-node execution on their own. You must also deploy a dask.distributed cluster to leverage multiple GPUs. We strongly recommend using Dask-CUDA to simplify the setup of the cluster, taking advantage of all features of the GPU and networking hardware.

If you are familiar with Dask and pandas or cuDF, then Dask cuDF should feel familiar to you. If not, we recommend starting with 10 minutes to Dask followed by 10 minutes to cuDF and Dask cuDF.

After reviewing the sections below, please see the Best Practices page for further guidance on using Dask cuDF effectively.

Using Dask cuDF#

The explicit Dask cuDF API#

In addition to providing the "cudf" backend for Dask DataFrame, Dask cuDF also provides an explicit dask_cudf API:

import dask_cudf

# This always gives us a cuDF-backed dataframe
df = dask_cudf.read_parquet("data.parquet", ...)

This API is used implicitly by the Dask DataFrame API when the "cudf" backend is enabled. Therefore, using it directly will not provide any performance benefit over the CPU/GPU-portable dask.dataframe API. Also, using some parts of the explicit API are incompatible with automatic query planning (see the next section).

Query Planning#

Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). As long as the "dataframe.query-planning" configuration is set to True (the default) when dask.dataframe is first imported, Dask Expressions will be used under the hood.

For example, the following code will automatically benefit from predicate pushdown when the result is computed:

df = dd.read_parquet("/my/parquet/dataset/")
result = df.sort_values('B')['A']

Unoptimized expression graph (df.pprint()):

Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' ...

Simplified expression graph (df.simplify().pprint()):

Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ...

Note

Dask will automatically simplify the expression graph (within optimize()) when the result is converted to a task graph (via compute() or persist()). You do not need to call simplify() yourself.

Using Multiple GPUs and Multiple Nodes#

Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try to partition your data into small-enough tasks to fit comfortably in the memory of a single GPU. This means the necessary compute tasks needed to compute a query can often be streamed to a single GPU process for out-of-core computing. This also means that the compute tasks can be executed in parallel over a multi-GPU cluster.

In order to execute your Dask workflow on multiple GPUs, you will typically need to use Dask-CUDA to deploy distributed Dask cluster, and Distributed to define a client object. For example:

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":

  client = Client(
    LocalCUDACluster(
      CUDA_VISIBLE_DEVICES="0,1",  # Use two workers (on devices 0 and 1)
      rmm_pool_size=0.9,  # Use 90% of GPU memory as a pool for faster allocations
      enable_cudf_spill=True,  # Improve device memory stability
      local_directory="/fast/scratch/",  # Use fast local storage for spilling
    )
  )

  df = dd.read_parquet("/my/parquet/dataset/")
  agg = df.groupby('B').sum()
  agg.compute()  # This will use the cluster defined above

Note

This example uses compute() to materialize a concrete cudf.DataFrame object in local memory. Never call compute() on a large collection that cannot fit comfortably in the memory of a single GPU! See Dask’s documentation on managing computation for more details.

Please see the Dask-CUDA documentation for more information about deploying GPU-aware clusters (including best practices).

API Reference#

Generally speaking, Dask cuDF tries to offer exactly the same API as Dask DataFrame. There are, however, some minor differences mostly because cuDF does not perfectly mirror the pandas API, or because cuDF provides additional configuration flags (these mostly occur in data reading and writing interfaces).

As a result, straightforward workflows can be migrated without too much trouble, but more complex ones that utilise more features may need a bit of tweaking. The API documentation describes details of the differences and all functionality that Dask cuDF supports.

Indices and tables#