Dask

DaskEngine runs the streaming executor on a Dask distributed cluster: one Dask worker per GPU, coordinated by a single client process. Partitions are streamed through the query plan, and collective operations (shuffles, allgathers, joins) run across workers over a shared UCXX communicator. On startup, each worker is pinned to the CPU cores and NUMA node closest to its GPU (see Pre-configured GPU clusters below for when to turn this off).
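The core idea of a shuffle fits in a toy single-process sketch. This one is a hash-partitioned group-by sum that mirrors the filter and group_by query used on this page. Every input partition splits its rows by hash(key) % nworkers and sends each bucket to its owning worker, so every row for a given key lands on exactly one worker, and that worker can aggregate the key locally. The sketch is plain Python for illustration only. DaskEngine streams GPU partitions and exchanges them over UCXX, and its actual shuffle implementation is not shown here. The function shuffle_groupby_sum and its parameters are hypothetical.

```python
from collections import defaultdict


def shuffle_groupby_sum(partitions, nworkers, min_amount=100):
    """Toy hash-shuffle group-by sum (hypothetical helper).

    partitions: list of input partitions, each a list of (key, amount) rows.
    Returns one dict per worker; each key appears in exactly one dict.
    Note: Python salts hash() for str keys per process, so a real
    multi-process shuffle needs a stable hash function instead.
    """
    # Phase 1 (map side): filter rows, then split each partition into one
    # bucket per destination worker, chosen by hashing the group key.
    outboxes = []
    for rows in partitions:
        buckets = [[] for _ in range(nworkers)]
        for key, amount in rows:
            if amount > min_amount:  # like .filter(pl.col("amount") > 100)
                buckets[hash(key) % nworkers].append((key, amount))
        outboxes.append(buckets)

    # Phase 2 (exchange): worker w receives bucket w from every partition.
    inboxes = [
        [row for buckets in outboxes for row in buckets[w]]
        for w in range(nworkers)
    ]

    # Phase 3 (reduce side): each worker sums only the keys it owns.
    results = []
    for rows in inboxes:
        sums = defaultdict(int)
        for key, amount in rows:
            sums[key] += amount
        results.append(dict(sums))
    return results
```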

import polars as pl
from cudf_polars.engine.dask import DaskEngine

with DaskEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
          .filter(pl.col("amount") > 100)
          .group_by("customer_id")
          .agg(pl.col("amount").sum())
          .collect(engine=engine)
    )
    print(result)

With no arguments, DaskEngine creates a distributed.LocalCluster with one worker per visible GPU, a distributed.Client, and bootstraps a UCXX communicator across all workers. On exit, everything it created is torn down.

Note

.collect() pulls the full result back to the client process. For large distributed outputs, prefer a .sink_*() method, or aggregate or sample inside the query before calling .collect(). See Result collection.

Configuring DaskEngine

For custom configuration, build StreamingOptions and use DaskEngine.from_options():

import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.dask import DaskEngine

opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")

with DaskEngine.from_options(opts) as engine:
    result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine)

See Configuration Options for the available fields.

Bring your own Dask client

Pass an existing distributed.Client via dask_client= to attach to an already-running scheduler:

from distributed import Client
import polars as pl
from cudf_polars.engine.dask import DaskEngine

with Client("scheduler-address:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)

When you supply the client, DaskEngine leaves it (and the cluster) alone on exit.
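The ownership rule amounts to this: create what was not supplied, and on exit tear down only what you created. contextlib.ExitStack expresses it naturally, because a teardown callback is registered only for resources the object created itself. Below is a minimal sketch of that pattern. Resource and OwnershipAwareEngine are hypothetical stand-ins, not DaskEngine's actual implementation.

```python
from contextlib import ExitStack


class Resource:
    """Hypothetical stand-in for a cluster or client; records whether it was closed."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class OwnershipAwareEngine:
    """Hypothetical sketch: create a cluster and client only if no client is
    supplied, and on exit close only what this object created."""

    def __init__(self, client=None):
        self._supplied_client = client
        self._stack = ExitStack()
        self.cluster = None
        self.client = None

    def __enter__(self):
        if self._supplied_client is None:
            self.cluster = Resource("cluster")
            self.client = Resource("client")
            # ExitStack runs callbacks in reverse order: client, then cluster.
            self._stack.callback(self.cluster.close)
            self._stack.callback(self.client.close)
        else:
            # A supplied client gets no callback, so it is never closed here.
            self.client = self._supplied_client
        return self

    def __exit__(self, *exc_info):
        self._stack.close()
        return False  # do not swallow exceptions
```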

Pre-configured GPU clusters

Some Dask launchers, notably dask_cuda.LocalCUDACluster, already pin CPU affinity and set CUDA_VISIBLE_DEVICES per worker. In that case, disable DaskEngine’s built-in hardware binding with HardwareBindingPolicy(enabled=False). Otherwise both layers set each worker’s affinity, whichever runs second overrides the first, and the resulting placement is non-deterministic:

from dask_cuda import LocalCUDACluster
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

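# LocalCUDACluster already pins CPU affinity and sets CUDA_VISIBLE_DEVICES
# per worker, so turn off DaskEngine's own hardware binding. Managing the
# cluster in the with statement ensures it is shut down on exit.
with LocalCUDACluster() as cluster, Client(cluster) as dc, DaskEngine(
    dask_client=dc,
    engine_options={
        "hardware_binding": HardwareBindingPolicy(enabled=False),
    },
) as engine:
    ...  # run queries with .collect(engine=engine)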
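To confirm which layer's binding actually took effect, you can run a small stdlib helper on every worker with distributed's Client.run, which executes a function on each worker and returns a dict keyed by worker address. The helper name placement_report is hypothetical. os.sched_getaffinity is not available on every platform (it is on Linux), so the call is guarded.

```python
import os
import socket


def placement_report():
    """Hypothetical helper: describe where the current process is placed."""
    # os.sched_getaffinity is platform-specific (available on Linux).
    if hasattr(os, "sched_getaffinity"):
        cpus = sorted(os.sched_getaffinity(0))  # 0 means "this process"
    else:
        cpus = None
    return {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "cuda_visible_devices": os.environ.get("CUDA_VISIBLE_DEVICES"),
        "cpu_affinity": cpus,
    }
```

For example, calling dc.run(placement_report) inside the with block returns one report per worker. The cpu_affinity entries show the CPU set each worker ended up pinned to.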

Manually launched workers

When launching workers yourself (for example, on a multi-node HPC cluster), use the built-in nanny preload to assign one GPU per worker. The preload sets CUDA_VISIBLE_DEVICES for each worker before the worker process is spawned:

# On each node, launch one single-threaded worker per GPU (N = number of GPUs on the node):
dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \
    --preload-nanny cudf_polars.engine.dask

Then connect from the client:

import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)

Hardware binding (CPU affinity, NUMA, network) is handled automatically by DaskEngine; the nanny preload only deals with GPU assignment.
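Per-worker GPU assignment works by giving each worker process exactly one device in CUDA_VISIBLE_DEVICES. CUDA then enumerates that device as device 0 inside the worker. The minimal sketch below shows one such mapping, assuming worker i takes the i-th device from the node's visible list. device_for_worker and worker_env are hypothetical, and the nanny preload's actual scheme may differ.

```python
def device_for_worker(worker_index, visible_devices):
    """Hypothetical mapping: worker i gets the i-th visible device.

    visible_devices is a comma-separated CUDA_VISIBLE_DEVICES-style string
    (indices or GPU UUIDs). Raises instead of wrapping around, because two
    workers sharing one GPU would break the one-GPU-per-worker model.
    """
    devices = [d.strip() for d in visible_devices.split(",") if d.strip()]
    if not 0 <= worker_index < len(devices):
        raise ValueError(
            f"worker {worker_index} has no GPU: only {len(devices)} visible"
        )
    return devices[worker_index]


def worker_env(worker_index, visible_devices):
    """Environment override to apply before spawning worker i (hypothetical)."""
    return {"CUDA_VISIBLE_DEVICES": device_for_worker(worker_index, visible_devices)}
```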

See the Dask CLI deployment guide for more on dask worker options.

Using dask-cuda-worker

As an alternative to the built-in nanny preload, you can launch workers with dask-cuda-worker from the dask-cuda project. It launches one worker per visible GPU and installs a set of plugins on every worker: a CPUAffinity plugin that pins the worker to the NUMA node of its GPU, an RMMSetup plugin, and a nanny preload that configures UCX.

DaskEngine sets up the same things for its own streaming runtime, so the two must be coordinated to avoid conflicts:

  • CPU affinity is unconditional in dask-cuda-worker: the CPUAffinity plugin is always installed, and there is no CLI flag to turn it off. Pass HardwareBindingPolicy(enabled=False) as the "hardware_binding" entry of DaskEngine’s engine_options so it does not re-pin affinity on top of dask-cuda’s binding.

  • Do not pass --rmm-pool-size, --rmm-managed-memory, or similar RMM flags to dask-cuda-worker. Let DaskEngine own the memory resource via its memory_resource_config (see Configuration Options); otherwise, two different memory resources will be installed on the same worker.

  • Do not pass --enable-tcp-over-ucx, --enable-infiniband, --enable-nvlink, or --enable-rdmacm to dask-cuda-worker. DaskEngine bootstraps its own UCXX communicator and selects transports itself; enabling them on both sides can produce inconsistent UCX configuration across the cluster.
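The two flag rules above can be turned into a quick pre-launch check of a dask-cuda-worker command line. The minimal sketch below uses a hypothetical helper, conflicting_flags. It only knows the flags named in this section and treats any --rmm-* option as an RMM flag.

```python
import shlex

# Flags this section says to leave off dask-cuda-worker when DaskEngine
# owns the memory resource and the UCXX communicator.
CONFLICTING_PREFIXES = ("--rmm-",)
CONFLICTING_FLAGS = {
    "--enable-tcp-over-ucx",
    "--enable-infiniband",
    "--enable-nvlink",
    "--enable-rdmacm",
}


def conflicting_flags(command):
    """Return flags in a dask-cuda-worker command that would clash with DaskEngine."""
    found = []
    for token in shlex.split(command):
        flag = token.split("=", 1)[0]  # handle both "--flag value" and "--flag=value"
        if flag in CONFLICTING_FLAGS or flag.startswith(CONFLICTING_PREFIXES):
            found.append(flag)
    return found
```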

# On each node, GPU assignment + CPU affinity only (no RMM, no UCX flags):
dask-cuda-worker SCHEDULER_ADDRESS:8786

Then connect from the client with DaskEngine’s hardware binding disabled:

import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(
        dask_client=dc,
        engine_options={
            # dask-cuda-worker always pins CPU affinity; disable DaskEngine's
            # binding so the two don't conflict.
            "hardware_binding": HardwareBindingPolicy(enabled=False),
        },
    ) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)

Cluster diagnostics

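engine.nranks reports the number of workers, and gather_cluster_info() returns placement information (hostname, PID, CUDA_VISIBLE_DEVICES) for every worker:

from cudf_polars.engine.dask import DaskEngine

with DaskEngine() as engine: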
    print(f"cluster has {engine.nranks} workers")
    for info in engine.gather_cluster_info():
        print(
            f"hostname={info['hostname']}, pid={info['pid']}, "
            f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}"
        )
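The records returned by gather_cluster_info() can also be checked programmatically. The minimal sketch below makes two assumptions. Each record is a dict with the keys used above, and cuda_visible_devices holds the raw CUDA_VISIBLE_DEVICES string (or None). It also treats the first listed device as the GPU the worker uses. shared_gpus is a hypothetical helper.

```python
from collections import Counter


def shared_gpus(cluster_info):
    """Return sorted (hostname, device) pairs used by more than one worker.

    Hypothetical helper. Assumes each record has 'hostname' and
    'cuda_visible_devices' keys and that the first listed device is the
    one the worker actually uses.
    """
    counts = Counter()
    for info in cluster_info:
        devices = (info["cuda_visible_devices"] or "").split(",")
        first = devices[0].strip()
        if first:  # skip workers with no CUDA_VISIBLE_DEVICES set
            counts[(info["hostname"], first)] += 1
    return sorted(key for key, n in counts.items() if n > 1)
```

On a correctly configured cluster, shared_gpus(engine.gather_cluster_info()) should return an empty list.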

DaskEngine raises RuntimeError if created inside an rrun cluster.