API
For the most part, the public API of cudf-polars is the polars API.
Configuration utilities for the cudf-polars engine.
Most users will not construct these objects directly. Instead, you’ll pass
keyword arguments to GPUEngine. The
majority of the options are passed as **kwargs and collected into the
configuration described below:
>>> import polars as pl
>>> engine = pl.GPUEngine(
... executor="streaming",
... executor_options={"fallback_mode": "raise"}
... )
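As a rough sketch of how these keyword arguments might be assembled, the snippet below collects them in a plain dictionary. The option names are taken from the parameter lists on this page. The values are illustrative assumptions. The final call to GPUEngine is left as a comment because it needs a GPU-enabled polars installation, and `lazy_frame` is a hypothetical name.

```python
# Illustrative values only. Option names come from the parameter lists
# documented on this page; this is not a recommended configuration.
engine_kwargs = {
    "executor": "streaming",
    "executor_options": {
        "fallback_mode": "raise",
        "max_rows_per_partition": 500_000,
    },
    "parquet_options": {"chunked": True},
    "raise_on_fail": True,
}

# With polars and cudf-polars installed, the dictionary would be unpacked as:
#   import polars as pl
#   engine = pl.GPUEngine(**engine_kwargs)
#   result = lazy_frame.collect(engine=engine)  # lazy_frame is hypothetical
```

Keeping the options in a dictionary makes it easy to reuse the same settings across several queries.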
- class cudf_polars.utils.config.CUDAStreamPoolConfig(pool_size: int = 16, flags: CudaStreamFlags = CudaStreamFlags.NON_BLOCKING)
Configuration for the CUDA stream pool.
- Parameters:
- pool_size
The size of the CUDA stream pool.
- flags
The flags to use for the CUDA stream pool.
- class cudf_polars.utils.config.Cluster(*values)
The cluster configuration for the streaming executor.
- Cluster.DEFAULT_SINGLETON: Single-GPU execution via the DefaultSingletonEngine.
- Cluster.SPMD: Multi-GPU SPMD execution via the SPMDEngine.
- Cluster.RAY: Multi-GPU execution via the RayEngine.
- Cluster.DASK: Multi-GPU execution via the DaskEngine.
- class cudf_polars.utils.config.ConfigOptions(raise_on_fail: bool = False, parquet_options: ParquetOptions = <factory>, executor: ExecutorType = <factory>, device: int | None = None, memory_resource_config: MemoryResourceConfig | None = None, cuda_stream_policy: CUDAStreamPoolConfig | None = <factory>)
Configuration for the polars GPUEngine.
- Parameters:
- raise_on_fail
Whether to raise an exception when the GPU engine cannot execute a query. False by default.
- parquet_options
Options controlling parquet file reading and writing. See ParquetOptions for more.
- executor
The executor to use for the GPU engine. See StreamingExecutor and InMemoryExecutor for more.
- device
The GPU used to run the query. If not provided, the query uses the current CUDA device.
- cuda_stream_policy
The policy to use for CUDA streams. None (the default) uses the default CUDA stream. A CUDAStreamPoolConfig can be used to configure a stream pool.
- classmethod from_polars_engine(engine: polars.lazyframe.engine_config.GPUEngine) → ConfigOptions[ExecutorType]
Create a ConfigOptions from a GPUEngine.
- class cudf_polars.utils.config.DynamicPlanningOptions(sample_chunk_count: int = <factory>, bloom_filter_threshold: float = <factory>)
Configuration for dynamic shuffle planning.
When enabled, shuffle decisions for GroupBy/Join/Unique operations are made at runtime by sampling real chunks.
To enable dynamic planning, pass a DynamicPlanningOptions instance to StreamingExecutor(dynamic_planning=...). To disable it, pass None (the default).
These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__DYNAMIC_PLANNING__.
- Parameters:
- sample_chunk_count
The maximum number of chunks to sample before deciding whether to shuffle. Default is 2.
- bloom_filter_threshold
Row-count ratio (small / large) below which a bloom filter is applied to pre-filter the large side of an inner or semi shuffle join. Set to 0 to disable bloom filtering. Default is 0.5.
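The bloom filter rule described above can be sketched as a simple ratio test. This is an illustration only, not the cudf-polars implementation: `should_apply_bloom_filter` is a hypothetical helper, and the strict "less than" comparison and the handling of an empty side are assumptions.

```python
def should_apply_bloom_filter(
    left_rows: int, right_rows: int, threshold: float = 0.5
) -> bool:
    """Hypothetical helper: decide whether to pre-filter the large join side."""
    small, large = sorted((left_rows, right_rows))
    if threshold == 0 or large == 0:
        # A threshold of 0 disables bloom filtering; an empty join needs none.
        return False
    # Assumption: the filter applies when the ratio is strictly below the threshold.
    return (small / large) < threshold


print(should_apply_bloom_filter(1_000, 10_000))               # ratio 0.1
print(should_apply_bloom_filter(8_000, 10_000))               # ratio 0.8
print(should_apply_bloom_filter(1_000, 10_000, threshold=0))  # disabled
```

With the default threshold of 0.5, the first call returns True and the other two return False.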
- class cudf_polars.utils.config.InMemoryExecutor
Configuration for the cudf-polars in-memory executor.
The in-memory executor only supports single-GPU execution.
- class cudf_polars.utils.config.ParquetOptions(chunked: bool = <factory>, n_output_chunks: int = <factory>, chunk_read_limit: int = <factory>, pass_read_limit: int = <factory>, max_footer_samples: int = <factory>, max_row_group_samples: int = <factory>, use_rapidsmpf_native: bool = <factory>)
Configuration for the cudf-polars Parquet engine.
These options can be configured via environment variables with the prefix CUDF_POLARS__PARQUET_OPTIONS__.
- Parameters:
- chunked
Whether to use libcudf’s ChunkedParquetReader or ChunkedParquetWriter to read/write the parquet dataset in chunks. This is useful when reading/writing very large parquet files.
- n_output_chunks
Split the dataframe into n_output_chunks chunks when using libcudf’s ChunkedParquetWriter.
- chunk_read_limit
Limit on total number of bytes to be returned per read, or 0 if there is no limit.
- pass_read_limit
Limit on the amount of memory used for reading and decompressing data, or 0 if there is no limit.
- max_footer_samples
Maximum number of file footers to sample for metadata. This option is currently used by the streaming executor to gather datasource statistics before generating a physical plan. Set to 0 to avoid metadata sampling. Default is 3.
- max_row_group_samples
Maximum number of row-groups to sample for unique-value statistics. This option may be used by the streaming executor to optimize the physical plan. Default is 1.
Set to 0 to avoid row-group sampling. Note that row-group sampling will also be skipped if max_footer_samples is 0.
- use_rapidsmpf_native
Whether to use the native rapidsmpf node for parquet reading. This option is only used by the streaming executor. Default is False.
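As a minimal sketch of the environment-variable route, the snippet below builds a variable name from the documented prefix. It assumes that the option name is upper-cased and appended to the prefix, by analogy with the CUDF_POLARS__EXECUTOR__FALLBACK_MODE variable documented on this page. `parquet_env_var` is a hypothetical helper, not part of cudf-polars.

```python
import os

PARQUET_PREFIX = "CUDF_POLARS__PARQUET_OPTIONS__"


def parquet_env_var(option_name: str) -> str:
    """Hypothetical helper: build the assumed environment variable name."""
    # Assumption: the variable name is the prefix plus the upper-cased option.
    return PARQUET_PREFIX + option_name.upper()


# Assumption: the variable should be set before the engine configuration
# is created, so that the value is visible when the options are read.
os.environ[parquet_env_var("max_footer_samples")] = "0"

print(parquet_env_var("max_footer_samples"))
```

Options passed explicitly to GPUEngine are a more visible alternative when the setting should live in code rather than in the environment.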
- class cudf_polars.utils.config.StreamingExecutor(cluster: Cluster | None = <factory>, fallback_mode: StreamingFallbackMode = <factory>, max_rows_per_partition: int = <factory>, target_partition_size: int = <factory>, broadcast_limit: int = <factory>, client_device_threshold: float = <factory>, sink_to_directory: bool | None = <factory>, dynamic_planning: DynamicPlanningOptions | None = <factory>, max_io_threads: int = <factory>, spill_to_pinned_memory: bool = <factory>, num_py_executors: int = <factory>, min_device_size: int | None = None, spmd_context: SPMDContext | None = None, ray_context: RayContext | None = None, dask_context: DaskContext | None = None)
Configuration for the cudf-polars streaming executor.
These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__.
- Parameters:
- cluster
The cluster configuration for the streaming executor.
Cluster.DEFAULT_SINGLETON by default. The available values are Cluster.DEFAULT_SINGLETON (single-GPU execution), Cluster.SPMD (multi-GPU SPMD execution), Cluster.RAY (multi-GPU Ray execution), and Cluster.DASK (multi-GPU Dask execution).
- fallback_mode
How to handle errors when the GPU engine fails to execute a query.
StreamingFallbackMode.WARN by default. This can be set using the CUDF_POLARS__EXECUTOR__FALLBACK_MODE environment variable.
- max_rows_per_partition
The maximum number of rows to process per partition. 1_000_000 by default. When the number of rows exceeds this value, the query will be split into multiple partitions and executed in parallel.
- target_partition_size
Target partition size, in bytes, for IO tasks. This configuration currently controls how large parquet files are split into multiple partitions. Files larger than target_partition_size bytes are split into multiple partitions.
This can be set via a keyword argument to polars.GPUEngine or the CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE environment variable.
By default, cudf-polars uses the minimum of 1.5GB or 2.5% of the minimum device size in the cluster. If pynvml cannot query the device size(s), the default target_partition_size will be 1.5GB.
- broadcast_limit
The maximum number of bytes to broadcast in a single operation. By default, cudf-polars uses the minimum of 16GB or 15% of the minimum device size in the cluster. If pynvml cannot query the device size(s), the default broadcast_limit will be 16GB.
- client_device_threshold
Threshold for spilling data from device memory. Default is 50% of device memory on the client process.
- sink_to_directory
Whether multi-partition sink operations write to a directory rather than a single file. For the spmd, ray, and dask clusters this is always True; setting it to False raises a ValueError.
- dynamic_planning
Options controlling dynamic shuffle planning. See DynamicPlanningOptions for more.
- max_io_threads
Maximum number of IO threads. Default is 4. This controls the parallelism of IO operations when reading data.
- spill_to_pinned_memory
Whether RapidsMPF should spill to pinned host memory when available, or use regular pageable host memory. Pinned host memory offers higher bandwidth and lower latency for device to host transfers compared to regular pageable host memory.
- num_py_executors
Maximum number of workers for the Python ThreadPoolExecutor. Default is 8.
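The default rules for target_partition_size and broadcast_limit described above can be sketched in plain Python. This is an illustration of the documented rule, not the library's code: both function names are hypothetical, and treating 1GB as 10**9 bytes is an assumption (the library may use a different unit).

```python
from typing import Optional, Sequence

GB = 10**9  # assumption: decimal gigabytes


def default_target_partition_size(device_sizes: Optional[Sequence[int]]) -> int:
    """Hypothetical helper: min(1.5GB, 2.5% of the smallest device)."""
    cap = 3 * GB // 2
    if not device_sizes:
        # Device sizes could not be queried, so fall back to the fixed cap.
        return cap
    return min(cap, min(device_sizes) * 25 // 1000)


def default_broadcast_limit(device_sizes: Optional[Sequence[int]]) -> int:
    """Hypothetical helper: min(16GB, 15% of the smallest device)."""
    cap = 16 * GB
    if not device_sizes:
        return cap
    return min(cap, min(device_sizes) * 15 // 100)


# A cluster with one 80GB device and one 40GB device: the smaller one governs.
sizes = [80 * GB, 40 * GB]
print(default_target_partition_size(sizes))  # 2.5% of 40GB
print(default_broadcast_limit(sizes))        # 15% of 40GB
```

Integer arithmetic is used for the percentages so that the results are exact byte counts.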
Notes
The streaming executor does not currently support profiling a query via the .profile() method. We recommend using nsys to profile queries.
- class cudf_polars.utils.config.StreamingFallbackMode(*values)
How the streaming executor handles operations that don’t support multiple partitions.
Upon encountering an unsupported operation, the streaming executor will fall back to using a single partition, which might use a large amount of memory.
- StreamingFallbackMode.WARN: Emit a warning and fall back to a single partition.
- StreamingFallbackMode.SILENT: Silently fall back to a single partition.
- StreamingFallbackMode.RAISE: Raise an exception.
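The three modes can be modelled with a small stand-in enum. The sketch below is not the cudf-polars implementation: `FallbackMode` and `handle_unsupported` are hypothetical names, the string values other than "raise" are assumptions, and the exception type is chosen only for illustration.

```python
import enum
import warnings


class FallbackMode(enum.Enum):
    """Stand-in for StreamingFallbackMode; most string values are assumed."""

    WARN = "warn"
    SILENT = "silent"
    RAISE = "raise"


def handle_unsupported(operation: str, mode: FallbackMode) -> int:
    """Return the partition count to use for an unsupported operation."""
    message = f"{operation} does not support multiple partitions"
    if mode is FallbackMode.RAISE:
        # Assumption: the real exception type may differ.
        raise NotImplementedError(message)
    if mode is FallbackMode.WARN:
        warnings.warn(f"{message}; falling back to a single partition", stacklevel=2)
    # WARN and SILENT both fall back to a single partition.
    return 1


print(handle_unsupported("example_op", FallbackMode.SILENT))
```

RAISE is the strictest choice and is useful when an unexpected single-partition fallback could exhaust memory.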