API

For the most part, the public API of cudf-polars is the polars API.

Configuration utilities for the cudf-polars engine.

Most users will not construct these objects directly. Instead, pass keyword arguments to GPUEngine. Most options are accepted as **kwargs and collected into the configuration objects described below:

>>> import polars as pl
>>> engine = pl.GPUEngine(
...     executor="streaming",
...     executor_options={"fallback_mode": "raise"}
... )
class cudf_polars.utils.config.CUDAStreamPoolConfig(pool_size: int = 16, flags: CudaStreamFlags = CudaStreamFlags.NON_BLOCKING)

Configuration for the CUDA stream pool.

Parameters:
pool_size

The size of the CUDA stream pool.

flags

The flags to use for the CUDA stream pool.

class cudf_polars.utils.config.Cluster(*values)

The cluster configuration for the streaming executor.

  • Cluster.DEFAULT_SINGLETON : Single-GPU execution via the DefaultSingletonEngine.

  • Cluster.SPMD : Multi-GPU SPMD execution via the SPMDEngine.

  • Cluster.RAY : Multi-GPU execution via the RayEngine.

  • Cluster.DASK : Multi-GPU execution via the DaskEngine.

class cudf_polars.utils.config.ConfigOptions(raise_on_fail: bool = False, parquet_options: cudf_polars.utils.config.ParquetOptions = <factory>, executor: cudf_polars.utils.config.ExecutorType = <factory>, device: int | None = None, memory_resource_config: cudf_polars.utils.config.MemoryResourceConfig | None = None, cuda_stream_policy: cudf_polars.utils.config.CUDAStreamPoolConfig | None = <factory>)

Configuration for the polars GPUEngine.

Parameters:
raise_on_fail

Whether to raise an exception when the GPU engine cannot execute a query. False by default.

parquet_options

Options controlling parquet file reading and writing. See ParquetOptions for more.

executor

The executor to use for the GPU engine. See StreamingExecutor and InMemoryExecutor for more.

device

The GPU used to run the query. If not provided, the query uses the current CUDA device.

cuda_stream_policy

The policy to use for CUDA streams. None (the default) uses the default CUDA stream. A CUDAStreamPoolConfig can be used to configure a stream pool.

classmethod from_polars_engine(engine: polars.lazyframe.engine_config.GPUEngine) -> ConfigOptions[ExecutorType]

Create a ConfigOptions from a GPUEngine.

class cudf_polars.utils.config.DynamicPlanningOptions(sample_chunk_count: int = <factory>, bloom_filter_threshold: float = <factory>)

Configuration for dynamic shuffle planning.

When enabled, shuffle decisions for GroupBy/Join/Unique operations are made at runtime by sampling real chunks.

To enable dynamic planning, pass a DynamicPlanningOptions instance to StreamingExecutor(dynamic_planning=...). To disable it, pass None (the default).

These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__DYNAMIC_PLANNING__.

Parameters:
sample_chunk_count

The maximum number of chunks to sample before deciding whether to shuffle. Default is 2.

bloom_filter_threshold

Row-count ratio (small / large) below which a bloom filter is applied to pre-filter the large side of an inner or semi shuffle join. Set to 0 to disable bloom filtering. Default is 0.5.
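As an illustration of how bloom_filter_threshold might be read, the sketch below compares the row-count ratio of the two join sides against the threshold. The helper should_apply_bloom_filter and its arguments are hypothetical and not part of cudf-polars; the actual decision is made inside the streaming executor from sampled chunks.

```python
def should_apply_bloom_filter(
    left_rows: int,
    right_rows: int,
    how: str,
    bloom_filter_threshold: float = 0.5,
) -> bool:
    """Hypothetical helper: decide whether to pre-filter the large join side."""
    # Only inner and semi joins are eligible, per the parameter description.
    if how not in ("inner", "semi"):
        return False
    small, large = sorted((left_rows, right_rows))
    if large == 0:
        return False
    # A threshold of 0 disables filtering, since no ratio is below 0.
    return (small / large) < bloom_filter_threshold


print(should_apply_bloom_filter(100, 1_000, "inner"))  # ratio 0.1
print(should_apply_bloom_filter(800, 1_000, "inner"))  # ratio 0.8
```

With the default threshold of 0.5, the first call qualifies for filtering and the second does not.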

class cudf_polars.utils.config.InMemoryExecutor

Configuration for the cudf-polars in-memory executor.

The in-memory executor only supports single-GPU execution.

class cudf_polars.utils.config.ParquetOptions(chunked: bool = <factory>, n_output_chunks: int = <factory>, chunk_read_limit: int = <factory>, pass_read_limit: int = <factory>, max_footer_samples: int = <factory>, max_row_group_samples: int = <factory>, use_rapidsmpf_native: bool = <factory>)

Configuration for the cudf-polars Parquet engine.

These options can be configured via environment variables with the prefix CUDF_POLARS__PARQUET_OPTIONS__.

Parameters:
chunked

Whether to use libcudf’s ChunkedParquetReader or ChunkedParquetWriter to read/write the parquet dataset in chunks. This is useful when reading/writing very large parquet files.

n_output_chunks

Split the dataframe into n_output_chunks chunks when using libcudf’s ChunkedParquetWriter.

chunk_read_limit

Limit on total number of bytes to be returned per read, or 0 if there is no limit.

pass_read_limit

Limit on the amount of memory used for reading and decompressing data, or 0 if there is no limit.

max_footer_samples

Maximum number of file footers to sample for metadata. This option is currently used by the streaming executor to gather datasource statistics before generating a physical plan. Set to 0 to avoid metadata sampling. Default is 3.

max_row_group_samples

Maximum number of row-groups to sample for unique-value statistics. This option may be used by the streaming executor to optimize the physical plan. Default is 1.

Set to 0 to avoid row-group sampling. Note that row-group sampling will also be skipped if max_footer_samples is 0.
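The interaction between the two sampling options can be summarised with a small sketch. The function effective_sample_counts is a hypothetical illustration, not a cudf-polars API; it only encodes the rule stated above that row-group sampling is skipped whenever footer sampling is disabled.

```python
from typing import Tuple


def effective_sample_counts(
    max_footer_samples: int = 3,
    max_row_group_samples: int = 1,
) -> Tuple[int, int]:
    """Hypothetical helper: return (footer samples, row-group samples) in effect."""
    # With footer sampling disabled, row-group sampling is skipped as well.
    if max_footer_samples == 0:
        return (0, 0)
    return (max_footer_samples, max_row_group_samples)


print(effective_sample_counts())       # defaults from the descriptions above
print(effective_sample_counts(0, 5))   # footer sampling off disables both
```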

use_rapidsmpf_native

Whether to use the native rapidsmpf node for parquet reading. This option is only used by the streaming executor. Default is False.

class cudf_polars.utils.config.StreamingExecutor(cluster: cudf_polars.utils.config.Cluster | None = <factory>, fallback_mode: cudf_polars.utils.config.StreamingFallbackMode = <factory>, max_rows_per_partition: int = <factory>, target_partition_size: int = <factory>, broadcast_limit: int = <factory>, client_device_threshold: float = <factory>, sink_to_directory: bool | None = <factory>, dynamic_planning: cudf_polars.utils.config.DynamicPlanningOptions | None = <factory>, max_io_threads: int = <factory>, spill_to_pinned_memory: bool = <factory>, num_py_executors: int = <factory>, min_device_size: int | None = None, spmd_context: cudf_polars.utils.config.SPMDContext | None = None, ray_context: cudf_polars.utils.config.RayContext | None = None, dask_context: cudf_polars.utils.config.DaskContext | None = None)

Configuration for the cudf-polars streaming executor.

These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__.

Parameters:
cluster

The cluster configuration for the streaming executor. Cluster.DEFAULT_SINGLETON by default.

  • Cluster.DEFAULT_SINGLETON: Single-GPU execution

  • Cluster.SPMD: Multi-GPU SPMD execution

  • Cluster.RAY: Multi-GPU Ray execution

  • Cluster.DASK: Multi-GPU Dask execution

fallback_mode

How to handle errors when the GPU engine fails to execute a query. StreamingFallbackMode.WARN by default.

This can be set using the CUDF_POLARS__EXECUTOR__FALLBACK_MODE environment variable.
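A minimal sketch of setting an executor option through the environment follows. The helper executor_env_var is hypothetical; it assumes that variable names are the documented prefix followed by the upper-cased option name, which matches the two variable names given on this page. The value "raise" mirrors the keyword-argument example at the top of the page, and the variable presumably needs to be set before the engine configuration is built.

```python
import os

# Prefix documented for streaming executor options.
EXECUTOR_PREFIX = "CUDF_POLARS__EXECUTOR__"


def executor_env_var(option: str) -> str:
    """Hypothetical helper: build the environment variable name for an option."""
    return EXECUTOR_PREFIX + option.upper()


name = executor_env_var("fallback_mode")
os.environ[name] = "raise"  # assumed to accept the same string as the keyword argument
print(name)
```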

max_rows_per_partition

The maximum number of rows to process per partition. 1_000_000 by default. When the number of rows exceeds this value, the query will be split into multiple partitions and executed in parallel.
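To give a feel for the effect of this limit, the sketch below estimates how many partitions an in-memory input of a given length might be split into. The function estimated_partition_count is hypothetical, and the ceiling division is an assumption about the splitting rule rather than the documented behaviour.

```python
def estimated_partition_count(
    n_rows: int,
    max_rows_per_partition: int = 1_000_000,
) -> int:
    """Hypothetical helper: estimate the partition count for n_rows rows."""
    # Inputs at or below the limit stay in a single partition.
    if n_rows <= max_rows_per_partition:
        return 1
    # Integer ceiling division (assumed splitting rule).
    return (n_rows + max_rows_per_partition - 1) // max_rows_per_partition


print(estimated_partition_count(500_000))
print(estimated_partition_count(2_500_000))
```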

target_partition_size

Target partition size, in bytes, for IO tasks. This configuration currently controls how large parquet files are split into multiple partitions. Files larger than target_partition_size bytes are split into multiple partitions.

This can be set via

  • keyword argument to polars.GPUEngine

  • the CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE environment variable

By default, cudf-polars uses the smaller of 1.5GB and 2.5% of the minimum device size in the cluster. If pynvml cannot query the device size(s), the default target_partition_size will be 1.5GB.
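The default rule can be worked through with a short sketch. The function default_target_partition_size is a hypothetical illustration, not the library's implementation; it assumes decimal gigabytes (10**9 bytes) and takes the device sizes as a plain list instead of querying pynvml.

```python
from typing import Optional, Sequence

GB = 10**9  # assumption: decimal gigabytes


def default_target_partition_size(
    device_sizes: Optional[Sequence[int]] = None,
) -> int:
    """Hypothetical helper: the smaller of 1.5GB and 2.5% of the smallest device."""
    cap = 3 * GB // 2  # 1.5GB
    # Without device sizes (e.g. pynvml unavailable), fall back to 1.5GB.
    if not device_sizes:
        return cap
    smallest = min(device_sizes)
    # 2.5% computed with integer arithmetic to avoid float rounding.
    return min(cap, smallest * 25 // 1000)


print(default_target_partition_size([80 * GB]))           # capped at 1.5GB
print(default_target_partition_size([24 * GB, 80 * GB]))  # 2.5% of the 24GB device
```

Under these assumptions, a cluster whose smallest device has 24GB of memory ends up with a 600MB target, while 80GB devices hit the 1.5GB cap.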

broadcast_limit

The maximum number of bytes to broadcast in a single operation. By default, cudf-polars uses the smaller of 16GB and 15% of the minimum device size in the cluster. If pynvml cannot query the device size(s), the default broadcast_limit will be 16GB.

client_device_threshold

Threshold for spilling data from device memory. Default is 50% of device memory on the client process.

sink_to_directory

Whether multi-partition sink operations write to a directory rather than a single file. For the spmd, ray, and dask clusters this is always True; setting it to False raises a ValueError.
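The validation rule described above can be sketched as follows. The function resolve_sink_to_directory and the lower-case cluster strings are hypothetical stand-ins for the Cluster enum; the single-GPU branch simply returns the value unchanged, which is an assumption since the default for that case is not stated here.

```python
from typing import Optional

# Clusters for which sink_to_directory is always True.
MULTI_GPU_CLUSTERS = {"spmd", "ray", "dask"}


def resolve_sink_to_directory(
    cluster: str,
    sink_to_directory: Optional[bool],
) -> Optional[bool]:
    """Hypothetical helper: apply the multi-GPU rule for sink_to_directory."""
    if cluster in MULTI_GPU_CLUSTERS:
        if sink_to_directory is False:
            raise ValueError(
                f"sink_to_directory must be True for the {cluster!r} cluster"
            )
        return True
    # Single-GPU case: left as provided (assumption).
    return sink_to_directory


print(resolve_sink_to_directory("ray", None))
```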

dynamic_planning

Options controlling dynamic shuffle planning. See DynamicPlanningOptions for more.

max_io_threads

Maximum number of IO threads. Default is 4. This controls the parallelism of IO operations when reading data.

spill_to_pinned_memory

Whether RapidsMPF should spill to pinned host memory when available, rather than regular pageable host memory. Pinned host memory offers higher bandwidth and lower latency for device-to-host transfers than pageable host memory.

num_py_executors

Maximum number of workers for the Python ThreadPoolExecutor. Default is 8.

Notes

The streaming executor does not currently support profiling a query via the .profile() method. We recommend using nsys to profile queries.

class cudf_polars.utils.config.StreamingFallbackMode(*values)

How the streaming executor handles operations that don’t support multiple partitions.

Upon encountering an unsupported operation, the streaming executor will fall back to using a single partition, which might use a large amount of memory.

  • StreamingFallbackMode.WARN : Emit a warning and fall back to a single partition.

  • StreamingFallbackMode.SILENT : Silently fall back to a single partition.

  • StreamingFallbackMode.RAISE : Raise an exception.
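The three modes can be illustrated with a self-contained sketch. The FallbackMode enum and handle_unsupported function below are hypothetical stand-ins written for this example and do not reproduce cudf-polars internals; the string values and the NotImplementedError exception type are assumptions.

```python
import enum
import warnings


class FallbackMode(str, enum.Enum):
    """Hypothetical stand-in for StreamingFallbackMode."""

    WARN = "warn"
    SILENT = "silent"
    RAISE = "raise"


def handle_unsupported(op_name: str, mode: FallbackMode) -> int:
    """Return the partition count to use for an unsupported operation."""
    message = f"{op_name} does not support multiple partitions"
    if mode is FallbackMode.RAISE:
        # Exception type is an assumption; the docs only say "an exception".
        raise NotImplementedError(message)
    if mode is FallbackMode.WARN:
        warnings.warn(message + "; falling back to a single partition", stacklevel=2)
    # WARN and SILENT both fall back to a single partition.
    return 1


print(handle_unsupported("example_op", FallbackMode.SILENT))
```

In this sketch, WARN and SILENT differ only in whether a warning is emitted; both continue with one partition, which may use a large amount of memory.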