Dask#
DaskEngine runs the streaming executor
on a Dask distributed cluster: one Dask worker per GPU, coordinated by a
single client process. Partitions are streamed through the query plan and collective operations
(shuffles, allgathers, joins) run across workers over a shared UCXX communicator. On startup,
each worker is pinned to the CPU cores and NUMA node closest to its GPU (see
Pre-configured GPU clusters below).
import polars as pl
from cudf_polars.engine.dask import DaskEngine
with DaskEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
        .filter(pl.col("amount") > 100)
        .group_by("customer_id")
        .agg(pl.col("amount").sum())
        .collect(engine=engine)
    )
print(result)
With no arguments, DaskEngine creates a
distributed.LocalCluster with one worker per visible GPU and a distributed.Client, then
bootstraps a UCXX communicator across all workers. On exit, it tears down everything it created.
Note
.collect() pulls the full result back to the client process. For large distributed outputs,
prefer .sink_*() or aggregate/sample inside the query before .collect(). See
Result collection.
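One way to keep a .collect() small is to cap the row count inside the query. The helper below is a hypothetical convenience (collect_preview is not part of polars or cudf_polars). It assumes lazy_frame is a polars LazyFrame and relies only on LazyFrame.head() and .collect(engine=...):

```python
def collect_preview(lazy_frame, engine, max_rows=1_000):
    """Collect at most `max_rows` rows so only a small result reaches the client.

    `lazy_frame` is expected to be a polars LazyFrame and `engine` an engine
    object such as a DaskEngine instance. Both are assumptions of this sketch.
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be positive")
    # head() becomes part of the lazy query, so the limit is applied before
    # collect() materializes the result in the client process.
    return lazy_frame.head(max_rows).collect(engine=engine)
```

Inside a with DaskEngine() as engine: block this could be called as collect_preview(pl.scan_parquet("/data/dataset/*.parquet"), engine).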
Configuring DaskEngine#
For custom configuration, build
StreamingOptions and use
DaskEngine.from_options():
import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.dask import DaskEngine
opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")
with DaskEngine.from_options(opts) as engine:
    result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine)
See Configuration Options for the available fields.
Bring your own Dask client#
Pass an existing distributed.Client via dask_client= to attach to an already-running
scheduler:
from distributed import Client
import polars as pl
from cudf_polars.engine.dask import DaskEngine
with Client("scheduler-address:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
When you supply the client, DaskEngine
does not close it (or the cluster) on exit.
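This ownership rule can be pictured with a small stand-in. The sketch below is not DaskEngine's implementation. FakeClient and EngineLike are hypothetical names, used only to illustrate a context manager that closes a resource it created and leaves a borrowed one open:

```python
class FakeClient:
    """Hypothetical stand-in for a distributed.Client; it only tracks closure."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class EngineLike:
    """Illustrative context manager, not DaskEngine itself."""

    def __init__(self, client=None):
        # Remember whether the caller supplied the client.
        self._owns_client = client is None
        self.client = FakeClient() if client is None else client

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Tear down only what this object created.
        if self._owns_client:
            self.client.close()
        return False  # never swallow exceptions


# Borrowed client: still open after the engine-like object exits.
borrowed = FakeClient()
with EngineLike(client=borrowed):
    pass

# Owned client: closed when the engine-like object exits.
with EngineLike() as owned_engine:
    pass
```

After both blocks run, borrowed.closed is still False while owned_engine.client.closed is True.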
Pre-configured GPU clusters#
Some Dask launchers, notably dask_cuda.LocalCUDACluster, already pin CPU affinity and set
CUDA_VISIBLE_DEVICES per worker. Disable the built-in hardware binding via
HardwareBindingPolicy
to avoid having both layers fight over each worker’s affinity (the second to run wins, which
makes the resulting placement non-deterministic):
from dask_cuda import LocalCUDACluster
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)
with Client(LocalCUDACluster()) as dc, DaskEngine(
    dask_client=dc,
    engine_options={
        "hardware_binding": HardwareBindingPolicy(enabled=False),
    },
) as engine:
    ...
Manually launched workers#
When launching workers yourself (for example on a multi-node HPC cluster), use the built-in nanny
preload to assign one GPU per worker. The preload sets CUDA_VISIBLE_DEVICES on each worker
before the process spawns:
# On each node, launch one worker per GPU with a single thread each:
dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \
--preload-nanny cudf_polars.engine.dask
Then connect from the client:
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
Hardware binding (CPU affinity, NUMA, network) is handled automatically by
DaskEngine; the nanny preload only
deals with GPU assignment.
See the Dask CLI deployment guide for more on dask worker options.
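To make the one-GPU-per-worker assignment concrete, here is a minimal sketch of the kind of mapping a preload could apply. It is an assumption for illustration, not the logic of the cudf_polars preload: visible_devices_for_worker is a hypothetical helper that gives worker i on a node the i-th GPU of that node.

```python
def visible_devices_for_worker(worker_index: int, gpu_ids: list[int]) -> str:
    """Return a CUDA_VISIBLE_DEVICES value that exposes exactly one GPU."""
    if not gpu_ids:
        raise ValueError("no GPUs available on this node")
    if not 0 <= worker_index < len(gpu_ids):
        raise ValueError(
            f"worker_index {worker_index} has no GPU; node has {len(gpu_ids)}"
        )
    return str(gpu_ids[worker_index])


# Four workers on a node with four GPUs (made-up IDs for illustration).
node_gpus = [0, 1, 2, 3]
assignments = {
    index: visible_devices_for_worker(index, node_gpus)
    for index in range(len(node_gpus))
}
```

Launching more workers than GPUs raises an error in this sketch, which mirrors the one-worker-per-GPU layout described above.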
Using dask-cuda-worker#
As an alternative to the built-in nanny preload, you can launch workers with
dask-cuda-worker from the dask-cuda project. It launches one
worker per visible GPU and installs a set of plugins on every worker: a CPUAffinity plugin
that pins the worker to the NUMA node of its GPU, an RMMSetup plugin, and a nanny preload that
configures UCX.
DaskEngine sets up the same things for its own streaming runtime, so the two need to be
coordinated or they will fight:
- CPU affinity is unconditional in dask-cuda-worker: the CPUAffinity plugin is always installed
  and there is no CLI flag to turn it off. Pass hardware_binding=HardwareBindingPolicy(enabled=False)
  to DaskEngine (through engine_options, as in the example below) so it does not try to re-pin
  affinity on top of dask-cuda’s binding.
- Do not pass --rmm-pool-size, --rmm-managed-memory, or similar RMM flags to dask-cuda-worker.
  Let DaskEngine own the memory resource via its memory_resource_config (see Configuration
  Options); otherwise two different memory resources will be installed on the same worker.
- Do not pass --enable-tcp-over-ucx, --enable-infiniband, --enable-nvlink, or --enable-rdmacm to
  dask-cuda-worker. DaskEngine bootstraps its own UCXX communicator and selects transports
  itself. Enabling them on both sides can produce inconsistent UCX configuration across the cluster.
# On each node, GPU assignment + CPU affinity only (no RMM, no UCX flags):
dask-cuda-worker SCHEDULER_ADDRESS:8786
Then connect from the client with DaskEngine's hardware binding disabled:
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)
with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(
        dask_client=dc,
        engine_options={
            # dask-cuda-worker always pins CPU affinity; disable DaskEngine's
            # binding so the two don't conflict.
            "hardware_binding": HardwareBindingPolicy(enabled=False),
        },
    ) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
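The flag restrictions listed in this section can be checked before a launch script runs. The sketch below is a hypothetical pre-flight helper, not part of dask-cuda or cudf_polars. It only knows the flags named in this section, so "similar RMM flags" that are not spelled out here would need to be added by hand:

```python
import shlex

# Flags this section says not to pass to dask-cuda-worker.
CONFLICTING_FLAGS = (
    "--rmm-pool-size",
    "--rmm-managed-memory",
    "--enable-tcp-over-ucx",
    "--enable-infiniband",
    "--enable-nvlink",
    "--enable-rdmacm",
)


def conflicting_flags(command: str) -> list[str]:
    """Return the conflicting flags found in a worker launch command."""
    found = []
    for token in shlex.split(command):
        # Accept both "--flag value" and "--flag=value" spellings.
        name = token.split("=", 1)[0]
        if name in CONFLICTING_FLAGS and name not in found:
            found.append(name)
    return found


# A launch line like the one shown above passes the check.
ok = conflicting_flags("dask-cuda-worker SCHEDULER_ADDRESS:8786")

# A made-up launch line that mixes in RMM and UCX flags does not.
bad = conflicting_flags(
    "dask-cuda-worker SCHEDULER_ADDRESS:8786 --rmm-pool-size 24GB --enable-nvlink"
)
```

Here ok is an empty list, while bad lists the two offending flags in the order they appear.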
Cluster diagnostics#
gather_cluster_info() returns
placement information for every worker:
from cudf_polars.engine.dask import DaskEngine
with DaskEngine() as engine:
    print(f"cluster has {engine.nranks} workers")
    for info in engine.gather_cluster_info():
        print(
            f"hostname={info['hostname']}, pid={info['pid']}, "
            f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}"
        )
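The returned records can also be checked programmatically. The sketch below assumes each record is a mapping with the hostname and cuda_visible_devices keys used above, and that the cuda_visible_devices value is a string. find_shared_gpus is a hypothetical helper and the sample records are made up:

```python
from collections import defaultdict


def find_shared_gpus(infos):
    """Return {(hostname, devices): count} for assignments used by more than one worker."""
    counts = defaultdict(int)
    for info in infos:
        counts[(info["hostname"], info["cuda_visible_devices"])] += 1
    return {key: n for key, n in counts.items() if n > 1}


# Made-up records shaped like the fields printed above.
sample = [
    {"hostname": "node-a", "pid": 101, "cuda_visible_devices": "0"},
    {"hostname": "node-a", "pid": 102, "cuda_visible_devices": "1"},
    {"hostname": "node-b", "pid": 201, "cuda_visible_devices": "0"},
    {"hostname": "node-b", "pid": 202, "cuda_visible_devices": "0"},
]
shared = find_shared_gpus(sample)
```

In this sample, the two workers on node-b share GPU "0", so shared contains a single entry for ("node-b", "0"). With a live engine, pass engine.gather_cluster_info() instead of sample.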
DaskEngine raises RuntimeError if
created inside an rrun cluster.