API#

For the most part, the public API of cudf-polars is the polars API.

Configuration utilities for the cudf-polars engine.

Most users will not construct these objects directly. Instead, pass keyword arguments to GPUEngine; the majority of these options are collected from **kwargs into the configuration objects described below:

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"fallback_mode": "raise"}
... )
class cudf_polars.utils.config.ConfigOptions(raise_on_fail: bool = False, parquet_options: ~cudf_polars.utils.config.ParquetOptions = <factory>, executor: ~cudf_polars.utils.config.StreamingExecutor | ~cudf_polars.utils.config.InMemoryExecutor = <factory>, device: int | None = None)[source]#

Configuration for the polars GPUEngine.

Parameters:
raise_on_fail

Whether to raise an exception when the GPU engine cannot execute a query. False by default.

parquet_options

Options controlling parquet file reading and writing. See ParquetOptions for more.

executor

The executor to use for the GPU engine. See StreamingExecutor and InMemoryExecutor for more.

device

The GPU used to run the query. If not provided, the query uses the current CUDA device.

classmethod from_polars_engine(engine: polars.lazyframe.engine_config.GPUEngine) → Self[source]#

Create a ConfigOptions from a GPUEngine.
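
As a brief sketch, a ConfigOptions can be built from an existing GPUEngine (construction of the engine mirrors the example at the top of this page; ConfigOptions itself is rarely built by hand):

>>> import polars as pl
>>> from cudf_polars.utils.config import ConfigOptions
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"fallback_mode": "raise"}
... )
>>> config = ConfigOptions.from_polars_engine(engine)  # collects the engine's keyword arguments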

class cudf_polars.utils.config.InMemoryExecutor[source]#

Configuration for the cudf-polars in-memory executor.

Parameters:
scheduler

The scheduler to use for the in-memory executor. Currently only Scheduler.SYNCHRONOUS is supported for the in-memory executor.
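
For illustration, the in-memory executor is selected by name through GPUEngine (a sketch; the executor string "in-memory" is assumed to be the accepted spelling):

>>> import polars as pl
>>> engine = pl.GPUEngine(executor="in-memory")  # execute the whole query in memory, without partitioning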

class cudf_polars.utils.config.ParquetOptions(chunked: bool = <factory>, chunk_read_limit: int = <factory>, pass_read_limit: int = <factory>, max_footer_samples: int = <factory>, max_row_group_samples: int = <factory>)[source]#

Configuration for the cudf-polars Parquet engine.

These options can be configured via environment variables with the prefix CUDF_POLARS__PARQUET_OPTIONS__.

Parameters:
chunked

Whether to use libcudf’s ChunkedParquetReader to read the parquet dataset in chunks. This is useful when reading very large parquet files.

chunk_read_limit

Limit on total number of bytes to be returned per read, or 0 if there is no limit.

pass_read_limit

Limit on the amount of memory used for reading and decompressing data, or 0 if there is no limit.

max_footer_samples

Maximum number of file footers to sample for metadata. This option is currently used by the streaming executor to gather datasource statistics before generating a physical plan. Set to 0 to avoid metadata sampling. Default is 3.

max_row_group_samples

Maximum number of row-groups to sample for unique-value statistics. This option may be used by the streaming executor to optimize the physical plan. Default is 1.

Set to 0 to avoid row-group sampling. Note that row-group sampling will also be skipped if max_footer_samples is 0.
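
As a sketch, these options can also be passed as a dictionary under the parquet_options keyword of GPUEngine, following the **kwargs collection described at the top of this page (the option names match the parameters above; the values shown are illustrative, not recommendations):

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     parquet_options={
...         "chunked": True,          # use the ChunkedParquetReader
...         "max_footer_samples": 3,  # sample up to 3 footers for metadata
...     }
... )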

class cudf_polars.utils.config.Scheduler(*values)[source]#

The scheduler to use for the streaming executor.

  • Scheduler.SYNCHRONOUS : A zero-dependency, synchronous, single-threaded scheduler.

  • Scheduler.DISTRIBUTED : A Dask-based distributed scheduler. Using this scheduler requires an active Dask cluster.
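
For example, the distributed scheduler is requested through executor_options (a sketch; it assumes a Dask cluster is already running, for instance one created with dask_cuda.LocalCUDACluster):

>>> import polars as pl
>>> # assumes an active Dask cluster and client
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"scheduler": "distributed"}
... )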

class cudf_polars.utils.config.ShuffleMethod(*values)[source]#

The method to use for shuffling data between workers with the streaming executor.

  • ShuffleMethod.TASKS : Use the task-based shuffler.

  • ShuffleMethod.RAPIDSMPF : Use the rapidsmpf shuffler.

With cudf_polars.utils.config.StreamingExecutor, the default of None will attempt to use ShuffleMethod.RAPIDSMPF, but will fall back to ShuffleMethod.TASKS if rapidsmpf is not installed.
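
A sketch of pinning the shuffle method explicitly rather than relying on the default fallback behavior (the lowercase strings "tasks" and "rapidsmpf" are assumed to map to the enum members above):

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={
...         "scheduler": "distributed",  # shuffling applies when data moves between workers
...         "shuffle_method": "tasks",   # force the task-based shuffler
...     }
... )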

class cudf_polars.utils.config.StreamingExecutor(scheduler: ~cudf_polars.utils.config.Scheduler = <factory>, fallback_mode: ~cudf_polars.utils.config.StreamingFallbackMode = <factory>, max_rows_per_partition: int = <factory>, unique_fraction: dict[str, float] = <factory>, target_partition_size: int = <factory>, groupby_n_ary: int = <factory>, broadcast_join_limit: int = <factory>, shuffle_method: ~cudf_polars.utils.config.ShuffleMethod = <factory>, rapidsmpf_spill: bool = <factory>, sink_to_directory: bool | None = <factory>)[source]#

Configuration for the cudf-polars streaming executor.

These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__.

Parameters:
scheduler

The scheduler to use for the streaming executor. Scheduler.SYNCHRONOUS by default.

Note: scheduler="distributed" requires a Dask cluster to be running.

fallback_mode

How to handle errors when the GPU engine fails to execute a query. StreamingFallbackMode.WARN by default.

This can be set using the CUDF_POLARS__EXECUTOR__FALLBACK_MODE environment variable.

max_rows_per_partition

The maximum number of rows to process per partition. 1_000_000 by default. When the number of rows exceeds this value, the query will be split into multiple partitions and executed in parallel.

unique_fraction

A dictionary mapping column names to floats between 0 and 1 (inclusive on the right).

Each factor estimates the fraction of values in the column that are unique. By default, 1.0 is used for any column not included in unique_fraction.

target_partition_size

Target partition size, in bytes, for IO tasks. This currently controls how parquet files are partitioned: files larger than target_partition_size bytes are split into multiple partitions.

This can be set via

  • keyword argument to polars.GPUEngine

  • the CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE environment variable

By default, cudf-polars uses a target partition size that’s a fraction of the device memory, where the fraction depends on the scheduler:

  • distributed: 1/40th of the device memory

  • synchronous: 1/16th of the device memory

The optional pynvml dependency is used to query the device memory size. If pynvml is not available, a warning is emitted and the device size is assumed to be 12 GiB.

groupby_n_ary

The factor by which the number of partitions is decreased when performing a groupby on a partitioned column. For example, with a factor of 32, a column with 64 partitions is first reduced to ceil(64 / 32) = 2 partitions.

This is useful when the absolute number of partitions is large.

broadcast_join_limit

The maximum number of partitions to allow for the smaller table in a broadcast join.

shuffle_method

The method to use for shuffling data between workers. Defaults to ‘rapidsmpf’ for the distributed scheduler when rapidsmpf is installed (otherwise ‘tasks’), and to ‘tasks’ for the synchronous scheduler.

rapidsmpf_spill

Whether to wrap task arguments and output in objects that are spillable by ‘rapidsmpf’.

sink_to_directory

Whether multi-partition sink operations should write to a directory rather than a single file. By default, this will be set to True for the ‘distributed’ scheduler and False otherwise. The ‘distributed’ scheduler does not currently support sink_to_directory=False.

Notes

The streaming executor does not currently support profiling a query via the .profile() method. We recommend using nsys to profile queries with the ‘synchronous’ scheduler and Dask’s built-in profiling tools with the ‘distributed’ scheduler.
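
Putting several of these options together, a hedged sketch of a fuller streaming configuration (the column name "id" and the specific limits are illustrative only, and the lowercase strings are assumed to map to the enum members above):

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={
...         "scheduler": "synchronous",
...         "fallback_mode": "warn",
...         "max_rows_per_partition": 500_000,
...         "target_partition_size": 1_000_000_000,  # about 1 GB per IO partition
...         "unique_fraction": {"id": 0.01},         # "id" is a hypothetical column name
...     }
... )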

class cudf_polars.utils.config.StreamingFallbackMode(*values)[source]#

How the streaming executor handles operations that don’t support multiple partitions.

Upon encountering an unsupported operation, the streaming executor will fall back to using a single partition, which might use a large amount of memory.

  • StreamingFallbackMode.WARN : Emit a warning and fall back to a single partition.

  • StreamingFallbackMode.SILENT : Silently fall back to a single partition.

  • StreamingFallbackMode.RAISE : Raise an exception.
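
For instance, silent fallback is selected the same way as the "raise" mode shown at the top of this page (a sketch; the lowercase string "silent" is assumed to match StreamingFallbackMode.SILENT):

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"fallback_mode": "silent"}
... )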