API#
For the most part, the public API of cudf-polars
is the polars API.
Configuration utilities for the cudf-polars engine.
Most users will not construct these objects directly. Instead, you’ll pass keyword arguments to GPUEngine. The majority of the options are passed as **kwargs and collected into the configuration described below:
>>> import polars as pl
>>> engine = pl.GPUEngine(
... executor="streaming",
... executor_options={"fallback_mode": "raise"}
... )
- class cudf_polars.utils.config.ConfigOptions(raise_on_fail: bool = False, parquet_options: ~cudf_polars.utils.config.ParquetOptions = <factory>, executor: ~cudf_polars.utils.config.StreamingExecutor | ~cudf_polars.utils.config.InMemoryExecutor = <factory>, device: int | None = None)[source]#
Configuration for the polars GPUEngine.
- Parameters:
- raise_on_fail
Whether to raise an exception when the GPU engine cannot execute a query. False by default.
- parquet_options
Options controlling parquet file reading and writing. See ParquetOptions for more.
- executor
The executor to use for the GPU engine. See StreamingExecutor and InMemoryExecutor for more.
- device
The GPU used to run the query. If not provided, the query uses the current CUDA device.
- classmethod from_polars_engine(engine: polars.lazyframe.engine_config.GPUEngine) Self [source]#
Create a ConfigOptions from a GPUEngine.
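For example, a minimal sketch of building a ConfigOptions from an engine (the attribute access at the end assumes the dataclass fields documented on this page):
>>> import polars as pl
>>> from cudf_polars.utils.config import ConfigOptions
>>> engine = pl.GPUEngine(executor="streaming")
>>> options = ConfigOptions.from_polars_engine(engine)
>>> options.executor  # a StreamingExecutor with default settings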
- class cudf_polars.utils.config.InMemoryExecutor[source]#
Configuration for the cudf-polars in-memory executor.
- Parameters:
- scheduler
The scheduler to use for the in-memory executor. Currently only Scheduler.SYNCHRONOUS is supported.
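For example, a sketch of selecting the in-memory executor (assuming "in-memory" is the accepted executor name, mirroring the "streaming" string used above):
>>> import polars as pl
>>> engine = pl.GPUEngine(executor="in-memory")
>>> pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a").sum()).collect(engine=engine)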
- class cudf_polars.utils.config.ParquetOptions(chunked: bool = <factory>, chunk_read_limit: int = <factory>, pass_read_limit: int = <factory>, max_footer_samples: int = <factory>, max_row_group_samples: int = <factory>)[source]#
Configuration for the cudf-polars Parquet engine.
These options can be configured via environment variables with the prefix CUDF_POLARS__PARQUET_OPTIONS__.
- Parameters:
- chunked
Whether to use libcudf’s ChunkedParquetReader to read the parquet dataset in chunks. This is useful when reading very large parquet files.
- chunk_read_limit
Limit on the total number of bytes to be returned per read, or 0 if there is no limit.
- pass_read_limit
Limit on the amount of memory used for reading and decompressing data, or 0 if there is no limit.
- max_footer_samples
Maximum number of file footers to sample for metadata. This option is currently used by the streaming executor to gather datasource statistics before generating a physical plan. Set to 0 to avoid metadata sampling. Default is 3.
- max_row_group_samples
Maximum number of row-groups to sample for unique-value statistics. This option may be used by the streaming executor to optimize the physical plan. Default is 1. Set to 0 to avoid row-group sampling. Note that row-group sampling will also be skipped if max_footer_samples is 0.
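A sketch of tuning these options, assuming parquet_options is accepted as a keyword argument to GPUEngine (as the **kwargs collection described at the top of this page suggests); the dataset path is purely illustrative:
>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     parquet_options={
...         "chunked": True,                   # use libcudf's ChunkedParquetReader
...         "chunk_read_limit": 2 * 1024**3,   # return at most ~2 GiB per read
...     },
... )
>>> pl.scan_parquet("data/*.parquet").collect(engine=engine)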
- class cudf_polars.utils.config.Scheduler(*values)[source]#
The scheduler to use for the streaming executor.
- Scheduler.SYNCHRONOUS: A zero-dependency, synchronous, single-threaded scheduler.
- Scheduler.DISTRIBUTED: A Dask-based distributed scheduler. Using this scheduler requires an active Dask cluster.
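For example, a sketch of running with the distributed scheduler, assuming dask-cuda is installed to provide a local GPU cluster:
>>> import polars as pl
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> client = Client(LocalCUDACluster())  # the distributed scheduler requires an active Dask cluster
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"scheduler": "distributed"},
... )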
- class cudf_polars.utils.config.ShuffleMethod(*values)[source]#
The method to use for shuffling data between workers with the streaming executor.
- ShuffleMethod.TASKS: Use the task-based shuffler.
- ShuffleMethod.RAPIDSMPF: Use the rapidsmpf shuffler.
With cudf_polars.utils.config.StreamingExecutor, the default of None will attempt to use ShuffleMethod.RAPIDSMPF, but will fall back to ShuffleMethod.TASKS if rapidsmpf is not installed.
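A sketch of pinning the shuffle method explicitly, assuming the lowercase names 'tasks' and 'rapidsmpf' are accepted as executor_options values:
>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={
...         "scheduler": "distributed",   # shuffling happens between workers
...         "shuffle_method": "tasks",    # skip rapidsmpf even if it is installed
...     },
... )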
- class cudf_polars.utils.config.StreamingExecutor(scheduler: ~cudf_polars.utils.config.Scheduler = <factory>, fallback_mode: ~cudf_polars.utils.config.StreamingFallbackMode = <factory>, max_rows_per_partition: int = <factory>, unique_fraction: dict[str, float] = <factory>, target_partition_size: int = <factory>, groupby_n_ary: int = <factory>, broadcast_join_limit: int = <factory>, shuffle_method: ~cudf_polars.utils.config.ShuffleMethod = <factory>, rapidsmpf_spill: bool = <factory>, sink_to_directory: bool | None = <factory>)[source]#
Configuration for the cudf-polars streaming executor.
These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__.
- Parameters:
- scheduler
The scheduler to use for the streaming executor. Scheduler.SYNCHRONOUS by default.
Note: scheduler="distributed" requires a Dask cluster to be running.
- fallback_mode
How to handle errors when the GPU engine fails to execute a query. StreamingFallbackMode.WARN by default. This can be set using the CUDF_POLARS__EXECUTOR__FALLBACK_MODE environment variable.
- max_rows_per_partition
The maximum number of rows to process per partition. 1_000_000 by default. When the number of rows exceeds this value, the query will be split into multiple partitions and executed in parallel.
- unique_fraction
A dictionary mapping column names to floats between 0 and 1 (inclusive on the right). Each factor estimates the fraction of values in the column that are unique. By default, 1.0 is used for any column not included in unique_fraction (see the example after this class for usage).
- target_partition_size
Target partition size, in bytes, for IO tasks. This configuration currently controls how large parquet files are split into multiple partitions: files larger than target_partition_size bytes are split into multiple partitions.
This can be set via:
- a keyword argument to polars.GPUEngine
- the CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE environment variable
By default, cudf-polars uses a target partition size that’s a fraction of the device memory, where the fraction depends on the scheduler:
- distributed: 1/40th of the device memory
- synchronous: 1/16th of the device memory
The optional pynvml dependency is used to query the device memory size. If pynvml is not available, a warning is emitted and the device size is assumed to be 12 GiB.
- groupby_n_ary
The factor by which the number of partitions is decreased when performing a groupby on a partitioned column. For example, with a factor of 32, a column with 64 partitions will first be reduced to ceil(64 / 32) = 2 partitions. This is useful when the absolute number of partitions is large.
- broadcast_join_limit
The maximum number of partitions to allow for the smaller table in a broadcast join.
- shuffle_method
The method to use for shuffling data between workers. Defaults to ‘rapidsmpf’ for the distributed scheduler if available (otherwise ‘tasks’), and ‘tasks’ for the synchronous scheduler.
- rapidsmpf_spill
Whether to wrap task arguments and output in objects that are spillable by ‘rapidsmpf’.
- sink_to_directory
Whether multi-partition sink operations should write to a directory rather than a single file. By default, this will be set to True for the ‘distributed’ scheduler and False otherwise. The ‘distributed’ scheduler does not currently support sink_to_directory=False.
Notes
The streaming executor does not currently support profiling a query via the .profile() method. We recommend using nsys to profile queries with the ‘synchronous’ scheduler and Dask’s built-in profiling tools with the ‘distributed’ scheduler.
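Putting several of these options together, a sketch of a tuned streaming engine (assuming the parameter names above are accepted as executor_options keys, which the **kwargs collection described at the top of this page suggests):
>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={
...         "scheduler": "synchronous",
...         "fallback_mode": "warn",
...         "max_rows_per_partition": 500_000,
...         "target_partition_size": 1024**3,   # split files larger than ~1 GiB
...         "unique_fraction": {"id": 0.01},    # ~1% of "id" values are unique
...     },
... )
>>> lf = pl.LazyFrame({"id": [1, 2, 3], "x": [1.0, 2.0, 3.0]})
>>> lf.group_by("id").agg(pl.col("x").sum()).collect(engine=engine)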
- class cudf_polars.utils.config.StreamingFallbackMode(*values)[source]#
How the streaming executor handles operations that don’t support multiple partitions.
Upon encountering an unsupported operation, the streaming executor will fall back to using a single partition, which might use a large amount of memory.
- StreamingFallbackMode.WARN: Emit a warning and fall back to a single partition.
- StreamingFallbackMode.SILENT: Silently fall back to a single partition.
- StreamingFallbackMode.RAISE: Raise an exception.
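For example, two ways to ask the engine to raise instead of falling back; string values are assumed to be the lowercase enum names, matching the "raise" example at the top of this page:
>>> import os
>>> import polars as pl
>>> # via executor_options
>>> engine = pl.GPUEngine(executor="streaming", executor_options={"fallback_mode": "raise"})
>>> # or via the environment variable documented under StreamingExecutor
>>> os.environ["CUDF_POLARS__EXECUTOR__FALLBACK_MODE"] = "raise"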