API
For the most part, the public API of cudf-polars is the polars API.
Configuration utilities for the cudf-polars engine.
Most users will not construct these objects directly. Instead, you’ll pass
keyword arguments to GPUEngine. The
majority of the options are passed as **kwargs and collected into the
configuration described below:
>>> import polars as pl
>>> engine = pl.GPUEngine(
... executor="streaming",
... executor_options={"fallback_mode": "raise"}
... )
- class cudf_polars.utils.config.CUDAStreamPolicy(*values)
The policy to use for acquiring new CUDA streams.
- CUDAStreamPolicy.DEFAULT: Use the default CUDA stream.
- CUDAStreamPolicy.NEW: Create a new CUDA stream.
- class cudf_polars.utils.config.Cluster(*values)
The cluster configuration for the streaming executor.
- Cluster.SINGLE: Single-GPU execution. Currently uses a zero-dependency, synchronous, single-threaded task scheduler.
- Cluster.DISTRIBUTED: Multi-GPU distributed execution. Currently uses a Dask-based distributed scheduler and requires an active Dask cluster.
- class cudf_polars.utils.config.ConfigOptions(raise_on_fail: bool = False, parquet_options: ParquetOptions = <factory>, executor: StreamingExecutor | InMemoryExecutor = <factory>, device: int | None = None, memory_resource_config: MemoryResourceConfig | None = None, cuda_stream_policy: CUDAStreamPolicy | CUDAStreamPoolConfig = <factory>)
Configuration for the polars GPUEngine.
- Parameters:
- raise_on_fail
Whether to raise an exception when the GPU engine cannot execute a query. False by default.
- parquet_options
Options controlling parquet file reading and writing. See ParquetOptions for more.
- executor
The executor to use for the GPU engine. See StreamingExecutor and InMemoryExecutor for more.
- device
The GPU used to run the query. If not provided, the query uses the current CUDA device.
- cuda_stream_policy
The policy to use for acquiring new CUDA streams. See CUDAStreamPolicy for more.
- classmethod from_polars_engine(engine: polars.lazyframe.engine_config.GPUEngine) -> Self
Create a ConfigOptions from a GPUEngine.
- class cudf_polars.utils.config.InMemoryExecutor
Configuration for the cudf-polars in-memory executor.
The in-memory executor only supports single-GPU execution.
- class cudf_polars.utils.config.ParquetOptions(chunked: bool = <factory>, n_output_chunks: int = <factory>, chunk_read_limit: int = <factory>, pass_read_limit: int = <factory>, max_footer_samples: int = <factory>, max_row_group_samples: int = <factory>, use_rapidsmpf_native: bool = <factory>)
Configuration for the cudf-polars Parquet engine.
These options can be configured via environment variables with the prefix CUDF_POLARS__PARQUET_OPTIONS__.
- Parameters:
- chunked
Whether to use libcudf’s ChunkedParquetReader or ChunkedParquetWriter to read/write the parquet dataset in chunks. This is useful when reading/writing very large parquet files.
- n_output_chunks
Split the dataframe into n_output_chunks when using libcudf’s ChunkedParquetWriter.
- chunk_read_limit
Limit on total number of bytes to be returned per read, or 0 if there is no limit.
- pass_read_limit
Limit on the amount of memory used for reading and decompressing data, or 0 if there is no limit.
- max_footer_samples
Maximum number of file footers to sample for metadata. This option is currently used by the streaming executor to gather datasource statistics before generating a physical plan. Set to 0 to avoid metadata sampling. Default is 3.
- max_row_group_samples
Maximum number of row-groups to sample for unique-value statistics. This option may be used by the streaming executor to optimize the physical plan. Default is 1.
Set to 0 to avoid row-group sampling. Note that row-group sampling will also be skipped if max_footer_samples is 0.
- use_rapidsmpf_native
Whether to use the native rapidsmpf node for parquet reading. This option is only used when the rapidsmpf runtime is enabled. Default is True.
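As a concrete illustration of the environment-variable convention above, the following sets two Parquet options before a query runs. The exact variable names are an assumption: the documented prefix plus the upper-cased option name.

```python
import os

# Hypothetical env-var names: the documented prefix CUDF_POLARS__PARQUET_OPTIONS__
# plus the upper-cased option name (the upper-casing is an assumption).
os.environ["CUDF_POLARS__PARQUET_OPTIONS__CHUNKED"] = "1"
# Limit each chunked read to 512 MiB.
os.environ["CUDF_POLARS__PARQUET_OPTIONS__CHUNK_READ_LIMIT"] = str(512 * 1024**2)
```

These would need to be set in the environment before the query is collected with the GPU engine.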
- class cudf_polars.utils.config.ShuffleMethod(*values)
The method to use for shuffling data between workers with the streaming executor.
- ShuffleMethod.TASKS: Use the task-based shuffler.
- ShuffleMethod.RAPIDSMPF: Use the rapidsmpf shuffler.
- ShuffleMethod._RAPIDSMPF_SINGLE: Use the single-process rapidsmpf shuffler.
With cudf_polars.utils.config.StreamingExecutor, the default of None will attempt to use ShuffleMethod.RAPIDSMPF for a distributed cluster, but will fall back to ShuffleMethod.TASKS if rapidsmpf is not installed.
The user should not specify ShuffleMethod._RAPIDSMPF_SINGLE directly. A setting of ShuffleMethod.RAPIDSMPF will be converted to the single-process shuffler automatically when using single-GPU execution.
- class cudf_polars.utils.config.ShufflerInsertionMethod(*values)
The method to use for inserting chunks into the rapidsmpf shuffler.
- ShufflerInsertionMethod.INSERT_CHUNKS: Use insert_chunks for inserting data.
- ShufflerInsertionMethod.CONCAT_INSERT: Use concat_insert for inserting data.
Only applicable with the “rapidsmpf” shuffle method and the “tasks” runtime.
- class cudf_polars.utils.config.StatsPlanningOptions(use_io_partitioning: bool = <factory>, use_reduction_planning: bool = <factory>, use_join_heuristics: bool = <factory>, use_sampling: bool = <factory>, default_selectivity: float = <factory>)
Configuration for statistics-based query planning.
These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__STATS_PLANNING__.
- Parameters:
- use_io_partitioning
Whether to use estimated file-size statistics to calculate the ideal input-partition count for IO operations. This option currently applies to Parquet data only. Default is True.
- use_reduction_planning
Whether to use estimated column statistics to calculate the output-partition count for reduction operations like Distinct, GroupBy, and Select(unique). Default is False.
- use_join_heuristics
Whether to use join heuristics to estimate row-count and unique-count statistics. Default is True. These statistics may only be collected when they are actually needed for query planning and when row-count statistics are available for the underlying datasource (e.g. Parquet and in-memory LazyFrame data).
- use_sampling
Whether to sample real data to estimate unique-value statistics. Default is True. These statistics may only be collected when they are actually needed for query planning, and when the underlying datasource supports sampling (e.g. Parquet and in-memory LazyFrame data).
- default_selectivity
The default selectivity of a predicate. Default is 0.8.
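These options can also be supplied programmatically. A minimal sketch, assuming the stats-planning settings nest under the streaming executor's options as a plain mapping (building the dict requires neither polars nor a GPU):

```python
# Hypothetical executor_options fragment; the nesting of these keys under
# "stats_planning" is an assumption based on the parameter names above.
executor_options = {
    "stats_planning": {
        "use_io_partitioning": True,      # size-based IO partitioning (Parquet only)
        "use_reduction_planning": False,  # documented default
        "default_selectivity": 0.8,       # documented default
    },
}
```

With polars installed, this dict would be passed as executor_options to pl.GPUEngine(executor="streaming", ...).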
- class cudf_polars.utils.config.StreamingExecutor(runtime: Runtime = <factory>, cluster: Cluster | None = <factory>, scheduler: Scheduler | None = <factory>, fallback_mode: StreamingFallbackMode = <factory>, max_rows_per_partition: int = <factory>, unique_fraction: dict[str, float] = <factory>, target_partition_size: int = <factory>, groupby_n_ary: int = <factory>, broadcast_join_limit: int = <factory>, shuffle_method: ShuffleMethod = <factory>, shuffler_insertion_method: ShufflerInsertionMethod = <factory>, rapidsmpf_spill: bool = <factory>, client_device_threshold: float = <factory>, sink_to_directory: bool | None = <factory>, stats_planning: StatsPlanningOptions = <factory>, max_io_threads: int = <factory>)
Configuration for the cudf-polars streaming executor.
These options can be configured via environment variables with the prefix CUDF_POLARS__EXECUTOR__.
- Parameters:
- runtime
The runtime to use for the streaming executor. Runtime.TASKS by default.
- cluster
The cluster configuration for the streaming executor. Cluster.SINGLE by default. This setting applies to both task-based and rapidsmpf execution modes:
- Cluster.SINGLE: Single-GPU execution.
- Cluster.DISTRIBUTED: Multi-GPU distributed execution (requires an active Dask cluster).
- scheduler
Deprecated: use cluster instead. For backward compatibility:
- Scheduler.SYNCHRONOUS maps to Cluster.SINGLE
- Scheduler.DISTRIBUTED maps to Cluster.DISTRIBUTED
- fallback_mode
How to handle errors when the GPU engine fails to execute a query. StreamingFallbackMode.WARN by default. This can be set using the CUDF_POLARS__EXECUTOR__FALLBACK_MODE environment variable.
- max_rows_per_partition
The maximum number of rows to process per partition. 1_000_000 by default. When the number of rows exceeds this value, the query will be split into multiple partitions and executed in parallel.
- unique_fraction
A dictionary mapping column names to floats between 0 and 1 (inclusive on the right). Each factor estimates the fraction of the column’s values that are unique. By default, 1.0 is used for any column not included in unique_fraction.
- target_partition_size
Target partition size, in bytes, for IO tasks. This configuration currently controls how large parquet files are split into multiple partitions: files larger than target_partition_size bytes are split into multiple partitions. This can be set via:
- a keyword argument to polars.GPUEngine
- the CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE environment variable
By default, cudf-polars uses a target partition size that’s a fraction of the device memory, where the fraction depends on the cluster:
- distributed: 1/40th of the device memory
- single: 1/16th of the device memory
The optional pynvml dependency is used to query the device memory size. If pynvml is not available, a warning is emitted and the device size is assumed to be 12 GiB.
- groupby_n_ary
The factor by which the number of partitions is decreased when performing a groupby on a partitioned column. For example, with groupby_n_ary=32, a column with 64 partitions is first reduced to ceil(64 / 32) = 2 partitions. This is useful when the absolute number of partitions is large.
- broadcast_join_limit
- broadcast_join_limit
The maximum number of partitions to allow for the smaller table in a broadcast join.
- shuffle_method
The method to use for shuffling data between workers. Defaults to ‘rapidsmpf’ for a distributed cluster if rapidsmpf is available (otherwise ‘tasks’), and to ‘tasks’ for a single-GPU cluster.
- shuffler_insertion_method
The method to use for inserting chunks with the rapidsmpf shuffler. Can be ‘insert_chunks’ (default) or ‘concat_insert’.
Only applicable with shuffle_method="rapidsmpf" and runtime="tasks".
- rapidsmpf_spill
Whether to wrap task arguments and output in objects that are spillable by ‘rapidsmpf’.
- client_device_threshold
Threshold for spilling data from device memory in rapidsmpf. Default is 50% of device memory on the client process. This argument is only used by the “rapidsmpf” runtime.
- sink_to_directory
Whether multi-partition sink operations should write to a directory rather than a single file. By default, this will be set to True for the ‘distributed’ cluster and False otherwise. The ‘distributed’ cluster does not currently support sink_to_directory=False.
- stats_planning
Options controlling statistics-based query planning. See StatsPlanningOptions for more.
- max_io_threads
Maximum number of IO threads for the rapidsmpf runtime. Default is 2. This controls the parallelism of IO operations when reading data.
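Pulling several of the parameters above together, here is an illustrative executor_options mapping. The values are examples rather than recommendations, and building the dict requires neither polars nor a GPU:

```python
# Illustrative streaming-executor configuration. Keys mirror the
# StreamingExecutor parameters documented above; values are examples only.
executor_options = {
    "cluster": "single",                # single-GPU execution
    "fallback_mode": "warn",            # warn before falling back to one partition
    "max_rows_per_partition": 500_000,  # split inputs above this row count
    "target_partition_size": 1024**3,   # 1 GiB per IO partition
    "unique_fraction": {"id": 0.05},    # estimate: ~5% of "id" values are unique
    "broadcast_join_limit": 4,
}

# With polars installed, this would typically be used as:
#   engine = pl.GPUEngine(executor="streaming", executor_options=executor_options)
```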
Notes
The streaming executor does not currently support profiling a query via the .profile() method. We recommend using nsys to profile queries with single-GPU execution and Dask’s built-in profiling tools with distributed execution.
- class cudf_polars.utils.config.StreamingFallbackMode(*values)
How the streaming executor handles operations that don’t support multiple partitions.
Upon encountering an unsupported operation, the streaming executor will fall back to using a single partition, which might use a large amount of memory.
- StreamingFallbackMode.WARN: Emit a warning and fall back to a single partition.
- StreamingFallbackMode.SILENT: Silently fall back to a single partition.
- StreamingFallbackMode.RAISE: Raise an exception.
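A small sketch of how the three modes differ in effect; this is an illustrative restatement of the documented behavior, not the library's code:

```python
import warnings

# Illustrative restatement of the documented fallback behavior (NOT the
# library's implementation). Every mode except "raise" falls back to a
# single partition.
def handle_unsupported(mode: str, op: str) -> str:
    if mode == "raise":
        raise NotImplementedError(f"{op} does not support multiple partitions")
    if mode == "warn":
        warnings.warn(f"{op}: falling back to a single partition")
    return "single-partition"
```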