Streaming Execution
The streaming executors work best when the inputs to your query come from parquet files. That is, start with scan_parquet, not existing Polars DataFrames or CSV files.
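For example, a minimal sketch (the file path here is illustrative):

import polars as pl

# Preferred: a lazy parquet scan, which the streaming executor can
# split into partitions and stream through the query graph.
q = pl.scan_parquet("data.parquet")

# Less suitable: materializing first yields an in-memory DataFrame
# input, which cannot be streamed from disk.
df = pl.read_parquet("data.parquet")
q_from_df = df.lazy()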
Single GPU streaming
The simplest case, requiring no additional dependencies, is the synchronous scheduler. An appropriate engine is:

engine = pl.GPUEngine()

This uses the default synchronous scheduler and is equivalent to pl.GPUEngine(executor="streaming", executor_options={"scheduler": "synchronous"}), or to simply passing engine="gpu" to .collect().
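For instance, assuming a query q built from a parquet scan (the path is illustrative), the following calls are interchangeable:

q = pl.scan_parquet("data.parquet")

# All three of these are equivalent:
result = q.collect(engine=pl.GPUEngine())
result = q.collect(
    engine=pl.GPUEngine(
        executor="streaming",
        executor_options={"scheduler": "synchronous"},
    )
)
result = q.collect(engine="gpu")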
When executed with this engine, any parquet inputs are split into “partitions” that are streamed through the query graph. We try to pick a good default for the partition size (based on the amount of GPU memory available), but it may not be optimal for every workload. You can configure the execution by providing more options to the executor. For example, to split input parquet files into 125 MB chunks:
engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "target_partition_size": 125_000_000,  # 125 MB
    },
)
Use the executor option max_rows_per_partition to control how in-memory DataFrame inputs are split into multiple partitions. You may find, at the cost of a higher memory footprint, that a larger value gives better performance.
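For example, a sketch (the row count is illustrative, not a tuned recommendation):

engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "max_rows_per_partition": 1_000_000,  # illustrative value
    },
)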
Note
If part of a query does not run in streaming mode, but does run using the in-memory GPU engine, then we automatically concatenate the inputs for that operation into a single partition, and effectively fall back to the in-memory engine.
The fallback_mode option can be used to raise an exception when this fallback occurs, or to silence the warning instead:
engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "fallback_mode": "raise",
    },
)
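To silence the warning instead, we believe the accepted value is "silent" in current releases (an assumption worth checking against the documentation for your installed version):

engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "fallback_mode": "silent",  # assumed value; the default is to warn
    },
)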
Multi GPU streaming
Note
The distributed scheduler is considered experimental and might change without warning.
Streaming across multiple GPUs simultaneously is supported by setting the "scheduler" option to "distributed":
engine = pl.GPUEngine(
    executor="streaming",
    executor_options={"scheduler": "distributed"},
)
Unlike the single-GPU executor, the distributed scheduler requires a number of additional dependencies: Dask and Dask-CUDA must be installed. In addition, we recommend installing ucxx and rapidsmpf to take advantage of high-performance networking.
To quickly install all of these dependencies into a conda environment, you can run:
conda install -c rapidsai -c conda-forge \
    cudf-polars rapidsmpf dask-cuda ucxx
Note
As in the single-GPU streaming case, if part of a query does not support execution with multiple partitions, but is supported by the in-memory GPU engine, we concatenate the inputs and execute using a single partition.
The multi-GPU engine uses the currently active Dask client to carry out the partitioned execution, so a typical multi-GPU setup looks something like:
import polars as pl
from dask_cuda import LocalCUDACluster

...

# Deploy a local cluster (by default, one worker per visible GPU)
# and attach a Dask client to it.
client = LocalCUDACluster(...).get_client()

q = ...
engine = pl.GPUEngine(
    executor="streaming",
    executor_options={"scheduler": "distributed"},
)
result = q.collect(engine=engine)
Warning
If you request a "distributed" scheduler but do not have a cluster deployed, collecting the query will fail.
Streaming sink operations
When the "distributed" scheduler is active, sink operations like df.sink_parquet("my_path") will always produce a directory containing one or more files. It is not currently possible to disable this behavior.
When the "synchronous" scheduler is active, sink operations will generate a single file by default. However, you may opt in to the distributed sink behavior by adding {"sink_to_directory": True} to your executor_options dictionary.
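A minimal sketch, assuming a recent polars release in which sink_parquet accepts an engine argument (the paths are illustrative):

engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "scheduler": "synchronous",  # the default scheduler, spelled out for clarity
        "sink_to_directory": True,   # write a directory of files, as the distributed scheduler does
    },
)
pl.scan_parquet("data.parquet").sink_parquet("out_dir", engine=engine)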
Get Started
The experimental streaming GPU executor is now available. For a quick walkthrough of a multi-GPU example workflow and performance on a real dataset, check out the multi-GPU Polars demo.