Engines
What is an engine?
cudf-polars executes Polars LazyFrame queries on GPU. You select GPU execution by passing an
engine= argument to .collect() or .sink_*(). The engine you pass decides how the
query runs: whether it streams through partitioned inputs or fits everything in device memory,
whether it runs in-process or distributes work across a cluster of GPU workers, and which
cluster backend coordinates those workers.
Execution modes
Streaming
Streaming engines partition their inputs (Parquet files or in-memory DataFrames) and process
those partitions through the query graph in chunks. This lets queries scale past device memory
and (with RayEngine, DaskEngine, and SPMDEngine) across multiple GPUs and multiple nodes. cudf-polars’ streaming
executor is its own GPU implementation, but it conceptually parallels Polars’ CPU streaming engine: the same
partition-and-stream model, just on the GPU.
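To make the partition-and-stream idea concrete, here is a toy model in plain Python. It is not cudf-polars code and never touches a GPU. The function names (make_partitions, streaming_filtered_sum) are hypothetical. The model computes a filtered sum one partition at a time and reports the largest number of rows held at once. That number is bounded by the partition size rather than the input size. In this toy the whole list already sits in memory. A real streaming reader would load each partition only when it is needed.

```python
from typing import Iterable, Iterator


def make_partitions(values: list[int], size: int) -> Iterator[list[int]]:
    """Yield fixed-size slices, standing in for Parquet or DataFrame partitions.

    Toy simplification: the input is already in memory here; a real reader
    would load each partition on demand.
    """
    for start in range(0, len(values), size):
        yield values[start:start + size]


def streaming_filtered_sum(partitions: Iterable[list[int]], threshold: int) -> tuple[int, int]:
    """Compute sum(v for v in input if v > threshold) one partition at a time.

    Returns (result, peak_resident_rows). Only the current partition plus one
    running total is held at any moment, so peak residency is one partition.
    """
    total = 0
    peak_resident_rows = 0
    for part in partitions:
        peak_resident_rows = max(peak_resident_rows, len(part))
        total += sum(v for v in part if v > threshold)  # filter, then partial sum
    return total, peak_resident_rows


values = list(range(10))  # 0..9 is the whole "dataset"
result, peak = streaming_filtered_sum(make_partitions(values, size=4), threshold=5)
print(result, peak)  # 30 4
```

Shrinking the partition size lowers the peak without changing the result. This is the property that lets a streaming engine handle inputs larger than device memory.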
All four streaming engines use this same executor: RayEngine, DaskEngine, SPMDEngine, and the
default engine="gpu" (backed internally by DefaultSingletonEngine).
They differ only in how their GPU worker(s) are provisioned.
RayEngine with no arguments uses every
GPU visible to the process, so on a single node with N GPUs it runs the query on all N of them
without any extra configuration. Scaling out to a multi-node cluster only requires pointing the
engine at that cluster; the user-facing code stays the same.
In-memory
The in-memory engine (engine=pl.GPUEngine(executor="in-memory")) is
the only non-streaming path. It runs the query on a single GPU, materializing intermediates in
device memory. Use it for small queries (data that fits in device memory), debugging, or when
you specifically need LazyFrame.profile support (see Profiling and Tracing). For production
workloads on any nontrivial dataset, use a streaming engine. See In-memory engine for
details.
Cluster backends
| Engine | Cluster model | Extra runtime dependency | Typical use |
|---|---|---|---|
| RayEngine | Single client, one Ray actor per GPU | Ray | Works from a laptop to a cloud cluster. No separate cluster setup needed. |
| DaskEngine | Single client, one Dask worker per GPU | Dask | Teams with an existing Dask deployment or a preferred Dask launcher. |
| SPMDEngine | Same script runs once per GPU, joined by a communicator | UCXX (under …) | HPC / SPMD launchers such as … |
| engine="gpu" | Implicit process-wide singleton on one GPU; no cluster | None | Default when no engine is constructed. Short scripts and notebooks. No options. |
All four engines use the same execution model under the hood, so the choice between them depends
on how you prefer to deploy, not on performance tradeoffs. For any non-trivial workflow,
construct one of the first three engines explicitly (see Usage). engine="gpu" is a
convenience that accepts no options, so it cannot be tuned. See Default engine="gpu"
for details on the implementation that backs it.
Result collection
.collect() returns a pl.DataFrame in the calling process. What that DataFrame contains, and how much data
travels to produce it, depends on the engine:
RayEngine / DaskEngine (single client): every partition is pulled from the cluster workers back to the client and concatenated there. This is convenient for small results but does not scale to large ones. For example, calling .collect() on a 1 TB query result sends 1 TB through your client. Instead, sink the result (.sink_parquet("path/"), .sink_csv(...), …) so each worker writes its own partition directly, or reduce or sample the data inside the query before .collect().
SPMDEngine (one process per GPU): each rank’s .collect() returns that rank’s local fragment; there is no client to gather to. If you need a single concatenated pl.DataFrame across ranks, call allgather_polars_dataframe() explicitly (see Collecting distributed results). If you want to keep processing the data rank by rank, stay in SPMDEngine and use its MPI-style model: each rank already owns its fragment.
engine="gpu": a single GPU and no cluster, so .collect() simply returns the full result in the calling process; there is nothing to gather.
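The collection behaviors above can be pictured with a toy model in plain Python, where each inner list stands for the rows one GPU holds. The helpers (client_collect, spmd_collect, simulated_allgather) are hypothetical stand-ins, not cudf-polars APIs. simulated_allgather only mimics the effect described for allgather_polars_dataframe(). Its rank-order concatenation is an assumption of this sketch.

```python
# Toy model of .collect() on different engines. Plain Python only: each inner
# list stands for the rows one GPU / rank holds. All names are hypothetical.


def client_collect(fragments: list[list[int]]) -> tuple[list[int], int]:
    """RayEngine/DaskEngine-style: pull every fragment to one client and concatenate.

    Also returns how many rows had to travel into the client.
    """
    result: list[int] = []
    for fragment in fragments:
        result.extend(fragment)
    return result, len(result)


def spmd_collect(fragments: list[list[int]], rank: int) -> list[int]:
    """SPMDEngine-style: a rank's collect sees only its own local fragment."""
    return list(fragments[rank])


def simulated_allgather(fragments: list[list[int]]) -> list[list[int]]:
    """Explicit all-gather: every rank ends up holding the full concatenation.

    Rank-order concatenation is an assumption of this toy model.
    """
    full = [row for fragment in fragments for row in fragment]
    return [list(full) for _ in fragments]


fragments = [[1, 2, 3], [4, 5], [6]]  # one fragment per GPU / rank
result, rows_into_client = client_collect(fragments)
print(result, rows_into_client)           # [1, 2, 3, 4, 5, 6] 6
print(spmd_collect(fragments, rank=1))    # [4, 5]
print(simulated_allgather(fragments)[2])  # [1, 2, 3, 4, 5, 6]
```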
Rules of thumb for multi-machine RayEngine / DaskEngine runs:
For exports: prefer .sink_*() over .collect().
For analysis: aggregate, sample, or limit() the result inside the lazy query before .collect(), so the client only sees a small DataFrame.
For further distributed processing in Python: switch to SPMDEngine so each rank keeps its fragment.
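The analysis rule of thumb is about how much data crosses into the client. The toy model below is plain Python with hypothetical function names and no cudf-polars calls. It compares two approaches: shipping raw rows to the client and summing there, versus letting each worker reduce its own rows first so that only small per-key partials travel. The two-stage "partial, then combine" aggregation is an assumption made purely for illustration. It is not a description of how cudf-polars plans a group-by.

```python
from collections import Counter

# Toy model: each worker holds (key, value) rows. Not cudf-polars code; the
# two-stage "partial, then combine" aggregation is an illustrative assumption.
Fragment = list[tuple[str, int]]


def collect_then_aggregate(fragments: list[Fragment]) -> tuple[dict[str, int], int]:
    """Ship every raw row to the client, then sum per key there."""
    shipped = [row for fragment in fragments for row in fragment]
    totals: Counter = Counter()
    for key, value in shipped:
        totals[key] += value
    return dict(totals), len(shipped)


def aggregate_then_collect(fragments: list[Fragment]) -> tuple[dict[str, int], int]:
    """Let each worker reduce its own rows; ship only the per-key partial sums."""
    partials: list[Counter] = []
    for fragment in fragments:
        partial: Counter = Counter()
        for key, value in fragment:
            partial[key] += value
        partials.append(partial)
    totals: Counter = Counter()
    for partial in partials:
        totals.update(partial)  # Counter.update adds values key by key
    return dict(totals), sum(len(p) for p in partials)


fragments = [
    [("x", 1)] * 1000 + [("y", 2)] * 1000,  # worker 0
    [("x", 3)] * 500,                        # worker 1
]
print(collect_then_aggregate(fragments))  # ({'x': 2500, 'y': 2000}, 2500)
print(aggregate_then_collect(fragments))  # ({'x': 2500, 'y': 2000}, 3)
```

Both approaches produce the same totals; only the number of rows sent to the client differs. In a real query, the equivalent step is to express the reduction in the LazyFrame itself (for example with group_by(...).agg(...)) before calling .collect().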
Where to go next
Usage: tutorial that walks through running your first GPU query end-to-end.
Other Engines: per-engine reference pages for DaskEngine and SPMDEngine.
Configuration Options: the StreamingOptions configuration object and every field it surfaces.