Python API Reference#
This page contains the API reference for rapidsmpf.
Integrations#
The subpackages under rapidsmpf.integrations contain integrations with other
libraries.
Generic#
RapidsMPF Integrations.
- class rapidsmpf.integrations.WorkerContext(br: BufferResource, statistics: Statistics, comm: Communicator | None = None, spill_collection: SpillCollection = <factory>, shufflers: dict[int, Shuffler] = <factory>, options: Options = <factory>)#
RapidsMPF specific attributes for a worker.
- Attributes:
- lock
The global worker lock. Must be acquired before accessing attributes that might be modified while the worker is running such as the shufflers.
- br
The buffer resource used by the worker exclusively.
- statistics
The statistics used by the worker. If None, statistics is disabled.
- comm
The communicator connected to all other workers.
- spill_collection
A collection of Python objects that can be spilled to free up device memory.
- shufflers
A mapping from shuffler IDs to active shuffler instances.
- options
Configuration options.
Methods
get_statistics(): Get the statistics from the worker context.
- get_statistics() dict[str, dict[str, int | float]]#
Get the statistics from the worker context.
- Returns:
- statistics
A dictionary of statistics. The keys are the names of the statistics. The values are dictionaries with two keys:
“count” is the number of times the statistic was recorded.
“value” is the value of the statistic.
Notes
Statistics are global across all shuffles. To measure statistics for any given shuffle, gather statistics before and after the shuffle and compute the difference.
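The before-and-after approach described in the note can be sketched in plain Python. This is a minimal illustration that only assumes the dictionary shape documented above; the helper name `diff_statistics` and the statistic names are hypothetical, and subtracting "value" assumes that the value is cumulative.

```python
from typing import Dict

Stats = Dict[str, Dict[str, float]]


def diff_statistics(before: Stats, after: Stats) -> Stats:
    """Return per-statistic deltas between two snapshots.

    Statistics that are missing from ``before`` are treated as zero.
    """
    delta: Stats = {}
    for name, entry in after.items():
        prev = before.get(name, {"count": 0, "value": 0})
        delta[name] = {
            "count": entry["count"] - prev["count"],
            "value": entry["value"] - prev["value"],
        }
    return delta


# Hypothetical snapshots, shaped like the return value of get_statistics().
before = {"spill-bytes": {"count": 2, "value": 1024.0}}
after = {
    "spill-bytes": {"count": 5, "value": 4096.0},
    "shuffle-payload": {"count": 1, "value": 512.0},
}
shuffle_stats = diff_statistics(before, after)
```

In practice the two snapshots would come from calling get_statistics() on the worker context immediately before and after the shuffle of interest.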
Dask#
RapidsMPF Dask Integrations.
- rapidsmpf.integrations.dask.bootstrap_dask_cluster(client: ~distributed.client.Client, *, options: ~rapidsmpf.config.Options = <rapidsmpf.config.Options object>) None#
Setup a Dask cluster for RapidsMPF shuffling.
Calling bootstrap_dask_cluster multiple times on the same worker is a no-op, which also means that any new options values are ignored.
- Parameters:
- client
The current Dask client.
- options
Configuration options. For any option not set explicitly, the value is read from environment variables using get_environment_variables().
Notes
This utility must be executed before RapidsMPF shuffling can be used within a Dask cluster. This function is called automatically by rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph, but may be called manually to set things up before the first shuffle.
Subsequent shuffles on the same cluster will reuse the resources established on the cluster by this function.
All the workers reported by distributed.Client.scheduler_info() will be used. Note that RapidsMPF does not currently support adding or removing workers from the cluster.
- rapidsmpf.integrations.dask.get_worker_context(worker: Worker | None = None) WorkerContext#
Retrieve the WorkerContext associated with a Dask worker.
If the worker context does not already exist on the worker, it will be created.
- Parameters:
- worker
An optional Dask worker instance. If not provided, the current worker is retrieved using get_worker().
- Returns:
- The existing or newly initialized worker context.
- rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#
Return the task graph for a RapidsMPF shuffle.
- Parameters:
- input_name
The task name for input DataFrame tasks.
- output_name
The task name for output DataFrame tasks.
- partition_count_in
Partition count of input collection.
- partition_count_out
Partition count of output collection.
- integration
Dask-integration specification.
- options
Optional keyword arguments.
- *other_keys
Other keys needed by integration.insert_partition.
- config_options
RapidsMPF configuration options.
- Returns:
- A valid task graph for Dask execution.
Notes
A RapidsMPF shuffle operation comprises four general phases:
- Staging phase
A new rapidsmpf.shuffler.Shuffler object must be staged on every worker in the current Dask cluster.
- Insertion phase
Each input partition is split into a dictionary of chunks, and that dictionary is passed to the appropriate rapidsmpf.shuffler.Shuffler object (using rapidsmpf.shuffler.Shuffler.insert_chunks).
The insertion phase will include a single task for each of the partition_count_in partitions in the input DataFrame. The partitioning and insertion logic must be defined by the insert_partition classmethod of the integration argument.
Insertion tasks are NOT restricted to specific Dask workers. These tasks may run anywhere in the cluster.
- Barrier phase
All rapidsmpf.shuffler.Shuffler objects must be ‘informed’ that the insertion phase is complete (on all workers) before the subsequent extraction phase begins. We call this synchronization step the ‘barrier phase’.
The barrier phase comprises three types of barrier tasks:
1. First global barrier - A single barrier task is used to signal that all input partitions have been submitted to a rapidsmpf.shuffler.Shuffler object on one of the workers. This task may also run anywhere on the cluster, but it must depend on ALL insertion tasks.
2. Worker barrier(s) - Each worker must execute a single worker-barrier task. This task will call insert_finished for every output partition on the local rapidsmpf.shuffler.Shuffler. These tasks must be restricted to specific workers, and they must all depend on the first global barrier.
3. Second global barrier - A single barrier task is used to signal that all workers are ready to begin the extraction phase. This task may run anywhere on the cluster, but it must depend on all worker-barrier tasks.
- Extraction phase
Each output partition is extracted from the local rapidsmpf.shuffler.Shuffler object on the worker (using rapidsmpf.shuffler.Shuffler.wait_on and rapidsmpf.integrations.cudf.partition.unpack_and_concat).
The extraction phase will include a single task for each of the partition_count_out partitions in the shuffled output DataFrame. The extraction logic must be defined by the extract_partition classmethod of the integration argument.
Extraction tasks must be restricted to specific Dask workers, and they must also depend on the second global-barrier task.
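The dependency structure of the insertion, barrier, and extraction phases can be sketched as a plain dictionary that maps each task key to the keys it depends on. This is only an illustration of the ordering described in the notes, not the graph that rapidsmpf_shuffle_graph actually returns: the function name, the key names, and the worker names are hypothetical, and the staging phase and worker restrictions are left out.

```python
def sketch_shuffle_graph(partition_count_in, partition_count_out, workers):
    """Build a toy dependency graph: task key -> list of dependency keys."""
    graph = {}

    # Insertion phase: one task per input partition, runnable on any worker.
    insert_keys = [("insert", i) for i in range(partition_count_in)]
    for key in insert_keys:
        graph[key] = []

    # First global barrier: depends on ALL insertion tasks.
    graph["global-barrier-1"] = list(insert_keys)

    # Worker barriers: one per worker, each depends on the first global barrier.
    worker_keys = [("worker-barrier", w) for w in workers]
    for key in worker_keys:
        graph[key] = ["global-barrier-1"]

    # Second global barrier: depends on all worker-barrier tasks.
    graph["global-barrier-2"] = list(worker_keys)

    # Extraction phase: one task per output partition, after the second barrier.
    for pid in range(partition_count_out):
        graph[("extract", pid)] = ["global-barrier-2"]

    return graph


graph = sketch_shuffle_graph(3, 2, ["worker-a", "worker-b"])
```

In a real Dask graph the worker-barrier and extraction tasks would additionally be restricted to specific workers, which a plain dependency dictionary cannot express.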
Single-process#
Shuffler integration for single-worker pylibcudf execution.
- rapidsmpf.integrations.single.get_worker_context() WorkerContext#
Retrieve the single-worker WorkerContext.
- Returns:
- The worker context
- Raises:
- ValueError
If a worker context was never created.
See also
setup_worker: Must be called before this function.
- rapidsmpf.integrations.single.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#
Return the task graph for a RapidsMPF shuffle.
This shuffle will use the single-process RapidsMPF Communicator.
- Parameters:
- input_name
The task name for input DataFrame tasks.
- output_name
The task name for output DataFrame tasks.
- partition_count_in
Partition count of input collection.
- partition_count_out
Partition count of output collection.
- integration
Shuffle-integration specification.
- options
Optional keyword arguments.
- *other_keys
Other keys needed by integration.insert_partition.
- config_options
RapidsMPF configuration options.
- Returns:
- A valid task graph for single-worker execution.
Ray#
Integration for Ray clusters.
- class rapidsmpf.integrations.ray.RapidsMPFActor(nranks: int, statistics: Statistics | None = None)#
RapidsMPFActor is a base class that instantiates a UCXX communicator across all workers.
- Parameters:
- nranks
The number of workers in the cluster.
- statistics
Optional statistics tracking object.
- Attributes:
comm: The UCXX communicator object.
statistics: The statistics object used on this actor.
Methods
is_initialized(): Check if the communicator is initialized.
nranks(): Get the number of ranks in the UCXX communicator.
rank(): Get the rank of the worker, as inferred from the UCXX communicator.
setup_root(): Setup root communicator in the cluster.
setup_worker(root_address_bytes): Setup the worker in the cluster once the root is initialized.
Return a string representation of the actor.
Examples
>>> @ray.remote(num_cpus=1)
... class DummyActor(RapidsMPFActor): ...
>>> actors = setup_ray_ucxx_cluster(DummyActor, 2)
>>> ray.get([actor.status_check.remote() for actor in actors])
- property comm: Communicator#
The UCXX communicator object.
- Returns:
- The UCXX communicator object if initialized, otherwise None
- Raises:
- RuntimeError
If the communicator is not initialized.
Notes
This property is not meant to be called remotely from the client, because Ray would then attempt to serialize the Communicator object, which will fail. Instead, subclasses can use the comm property to access the communicator, for example to create a shuffle operation.
- is_initialized() bool#
Check if the communicator is initialized.
- Returns:
- True if the communicator is initialized, False otherwise.
- nranks() int#
Get the number of ranks in the UCXX communicator.
- Returns:
- The number of ranks in the UCXX communicator
- rank() int#
Get the rank of the worker, as inferred from the UCXX communicator.
- Returns:
- The rank of the worker
- setup_root() tuple[int, bytes]#
Setup root communicator in the cluster.
- Returns:
- rank
The rank of the root.
- root_address_bytes
The address of the root.
- setup_worker(root_address_bytes: bytes) None#
Setup the worker in the cluster once the root is initialized.
This method needs to be called by every worker including the root.
- Parameters:
- root_address_bytes
The address of the root.
- property statistics: Statistics#
The statistics object used on this actor.
- Returns:
- Statistics object.
- rapidsmpf.integrations.ray.setup_ray_ucxx_cluster(actor_cls: ActorClass, num_workers: int, *args: Any, **kwargs: Any) list[ActorHandle]#
A utility method to setup the UCXX communication using RapidsMPFActor actor objects.
- Parameters:
- actor_cls
The actor class to be instantiated in the cluster.
- num_workers
The number of workers in the cluster.
- *args
Additional arguments to be passed to the actor class.
- **kwargs
Additional keyword arguments to be passed to the actor class.
- Returns:
- gpu_actors
A list of actors in the cluster.
cuDF#
Collection of cuDF specific functions.
Partition#
Partitioning of cuDF tables.
- rapidsmpf.integrations.cudf.partition.partition_and_pack(Table table, columns_to_hash, int num_partitions, Stream stream, BufferResource br)#
Partition rows from the input table into multiple packed (serialized) tables.
- Parameters:
- table
The input table to partition.
- columns_to_hash
Indices of the input columns to use for hashing.
- num_partitions
The number of partitions to create.
- stream
The CUDA stream used for memory operations.
- br
Buffer resource for memory allocations.
- Returns:
- A dictionary where the keys are partition IDs and the values are packed tables.
- Raises:
- IndexError
If any index in columns_to_hash is invalid.
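The partitioning step of partition_and_pack can be pictured with a pure-Python stand-in that works on lists of row tuples. This is a sketch under stated assumptions, not the library's implementation: the helper name `hash_partition` is hypothetical, CRC32 replaces whatever hash function libcudf uses, no packing (serialization) takes place, and returning an entry for every partition ID (including empty ones) is an assumption.

```python
import zlib


def hash_partition(rows, columns_to_hash, num_partitions):
    """Group row tuples by a hash of the selected columns.

    Returns a dict that maps every partition ID to its list of rows.
    """
    if rows:
        width = len(rows[0])
        for idx in columns_to_hash:
            if not 0 <= idx < width:
                raise IndexError(f"column index {idx} is out of range")

    # Assumption: every partition ID gets an entry, even if it stays empty.
    partitions = {pid: [] for pid in range(num_partitions)}
    for row in rows:
        # CRC32 of the key's repr is a deterministic stand-in hash.
        key = repr(tuple(row[i] for i in columns_to_hash)).encode()
        partitions[zlib.crc32(key) % num_partitions].append(row)
    return partitions


rows = [(1, "a"), (2, "b"), (1, "c"), (3, "d")]
parts = hash_partition(rows, columns_to_hash=[0], num_partitions=2)
```

Rows that agree on the hashed columns always land in the same partition, which is the property a hash shuffle relies on.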
- rapidsmpf.integrations.cudf.partition.spill_partitions(partitions, BufferResource br)#
Spill partitions from device memory to host memory.
Moves the buffer of each PackedData from device memory to host memory using the provided buffer resource and the buffer’s CUDA stream. Partitions already in host memory are returned unchanged.
For device-resident partitions, a host memory reservation is made before moving the buffer. If the reservation fails due to insufficient host memory, an exception is raised. Overbooking is not allowed.
The input partitions are released and are left empty on return.
- Parameters:
- partitions
The partitions to spill.
- br
Buffer resource used to reserve host memory and perform the move.
- Returns:
- A list of partitions whose buffers reside in host memory.
- Raises:
- ReservationError
If host memory reservation fails.
- rapidsmpf.integrations.cudf.partition.split_and_pack(Table table, splits, Stream stream, BufferResource br)#
Split rows from the input table into multiple packed (serialized) tables.
- Parameters:
- table
The input table to split and pack. The table cannot be empty (the split points would not be valid).
- splits
The split points, one less than the number of result partitions.
- stream
The CUDA stream used for memory operations.
- br
Buffer resource for memory allocations.
- Returns:
- A map of partition IDs and their packed tables.
- Raises:
- IndexError
If the splits are out of range for [0, len(table)].
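The relationship between split points and result partitions (n split points produce n + 1 partitions) can be shown with a pure-Python stand-in operating on a list of rows. The helper name `split_rows` is hypothetical, the split points are assumed to be sorted in ascending order, and no packing takes place.

```python
def split_rows(rows, splits):
    """Split ``rows`` at the given row offsets into len(splits) + 1 partitions."""
    n = len(rows)
    for point in splits:
        if not 0 <= point <= n:
            raise IndexError(f"split point {point} is outside [0, {n}]")

    # Surround the split points with the table bounds, then slice pairwise.
    bounds = [0, *splits, n]
    return {pid: rows[bounds[pid]:bounds[pid + 1]] for pid in range(len(bounds) - 1)}


pieces = split_rows(list(range(10)), splits=[3, 7])
```

Here two split points yield three partitions: rows 0 to 2, rows 3 to 6, and rows 7 to 9.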
- rapidsmpf.integrations.cudf.partition.unpack_and_concat(partitions, Stream stream, BufferResource br)#
Unpack (deserialize) input partitions and concatenate them into a single table.
Empty partitions are ignored.
The unpacking of each partition is stream-ordered on that partition’s own CUDA stream. The returned table is stream-ordered on the provided stream and synchronized with the unpacking.
- Parameters:
- partitions
Packed input tables (partitions).
- stream
CUDA stream on which concatenation occurs and on which the resulting table is ordered.
- br
Buffer resource used for memory allocations.
- Returns:
- The concatenated table resulting from unpacking the input partitions.
- Raises:
- ReservationError
If the buffer resource cannot reserve enough memory to concatenate all partitions.
Notes
The input partitions are released and left empty on return.
- rapidsmpf.integrations.cudf.partition.unspill_partitions(partitions, BufferResource br, bool allow_overbooking)#
Move spilled partitions (i.e., packed tables in host memory) back to device memory.
Each partition is inspected to determine whether its buffer resides in device memory. Buffers already in device memory are left untouched. Host-resident buffers are moved to device memory using the provided buffer resource and the buffer’s CUDA stream.
If insufficient device memory is available, the buffer resource’s spill manager is invoked to free memory. If overbooking occurs and spilling fails to reclaim enough memory, behavior depends on allow_overbooking.
The input partitions are released and are left empty on return.
- Parameters:
- partitions
The partitions to unspill, potentially containing host-resident data.
- br
Buffer resource responsible for memory reservation and spills.
- allow_overbooking
If False, ensures enough memory is freed to satisfy the reservation; otherwise, allows overbooking even if spilling was insufficient.
- Returns:
- A list of partitions whose buffers reside in device memory.
- Raises:
- ReservationError
If overbooking exceeds the amount spilled and allow_overbooking is False.
Shuffler#
The Shuffler interface for RapidsMPF.
- class rapidsmpf.shuffler.PartitionAssignment(*values)#
- class rapidsmpf.shuffler.Shuffler(Communicator comm, int32_t op_id, uint32_t total_num_partitions, BufferResource br, PartitionAssignment partition_assignment=PartitionAssignment.ROUND_ROBIN)#
Shuffle service for partitioned data.
The rapidsmpf.shuffler.Shuffler class provides an interface for performing a shuffle operation on partitioned data. It uses a distribution scheme to distribute and collect data chunks across different ranks.
- Parameters:
- comm
The communicator to use for data exchange between ranks.
- op_id
The operation ID of the shuffle. Must have a value between 0 and max_concurrent_shuffles-1.
- total_num_partitions
Total number of partitions in the shuffle.
- br
The buffer resource used to allocate temporary storage and shuffle results.
- partition_assignment
How to assign partition IDs to ranks: ROUND_ROBIN (default) for load balance (e.g. hash shuffle), or CONTIGUOUS so each rank gets a contiguous range of partition IDs (e.g. for sort so concatenation order matches global order). A custom callable may be supported in the future.
- Attributes:
- max_concurrent_shuffles
Maximum number of concurrent shufflers.
Methods
extract(self, uint32_t pid): Extract all chunks of the specified partition.
finished(self): Check if all partitions are finished.
insert_chunks(self, chunks): Insert a batch of packed (serialized) chunks into the shuffle.
insert_finished(self, pids): Mark partitions as finished.
local_partitions(self): Return the partition IDs owned by this rank.
shutdown(self): Shutdown the shuffle, blocking until all inflight communication is completed.
wait_any(self): Wait for any partition to finish.
wait_on(self, uint32_t pid): Wait for a specific partition to finish.
Notes
This class is designed to handle distributed operations by partitioning data and redistributing it across ranks in a cluster. It is typically used in distributed data processing workflows involving cuDF tables.
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.
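The difference between the two partition assignment modes can be illustrated with two small functions that list the partition IDs a rank would own. Both are sketches: the function names are hypothetical, and the block size used for the contiguous case (ceiling division of the partition count by the number of ranks) is an assumption, not a statement about how the library computes its ranges.

```python
def round_robin_partitions(rank, total_num_partitions, nranks):
    """Partition IDs owned by ``rank`` when IDs are dealt out in turn."""
    return [pid for pid in range(total_num_partitions) if pid % nranks == rank]


def contiguous_partitions(rank, total_num_partitions, nranks):
    """Partition IDs owned by ``rank`` when each rank gets one contiguous block."""
    block = -(-total_num_partitions // nranks)  # ceiling division (assumed scheme)
    start = min(rank * block, total_num_partitions)
    stop = min(start + block, total_num_partitions)
    return list(range(start, stop))


rr_rank0 = round_robin_partitions(0, total_num_partitions=8, nranks=3)
ct_rank0 = contiguous_partitions(0, total_num_partitions=8, nranks=3)
ct_rank2 = contiguous_partitions(2, total_num_partitions=8, nranks=3)
```

With contiguous blocks, concatenating the partitions of rank 0, then rank 1, then rank 2 reproduces the global partition order, which is why that mode suits sorting.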
- comm#
Get the communicator used by the shuffler.
- Returns:
- The communicator.
- extract(self, uint32_t pid)#
Extract all chunks of the specified partition.
- Parameters:
- pid
The partition ID to extract chunks for.
- Returns:
- A list of packed data belonging to the specified partition.
- finished(self)#
Check if all partitions are finished.
This method verifies whether all partitions have been completed, meaning all chunks have been inserted and no further data is expected from either the local node or any remote node.
- Returns:
- True if all partitions are finished, otherwise False.
- insert_chunks(self, chunks)#
Insert a batch of packed (serialized) chunks into the shuffle.
- Parameters:
- chunks
A map where keys are partition IDs (int) and values are packed data (PackedData).
Notes
This method adds the given chunks to the shuffle, associating them with their respective partition IDs.
- insert_finished(self, pids)#
Mark partitions as finished.
This informs the shuffler that no more chunks for the specified partitions will be inserted.
- Parameters:
- pids
Partition IDs to mark as finished (int or an iterable of ints).
Notes
Once a partition is marked as finished, it is considered complete and no further chunks will be accepted for that partition.
- local_partitions(self)#
Return the partition IDs owned by this rank.
- Returns:
- Partition IDs owned by this shuffler.
- shutdown(self)#
Shutdown the shuffle, blocking until all inflight communication is completed.
- Raises:
- RuntimeError
If the shuffler is already inactive.
Notes
This method ensures that all pending shuffle operations and communications are completed before shutting down. It blocks until no inflight operations remain.
- wait_any(self)#
Wait for any partition to finish.
This method blocks until at least one partition is marked as finished. It is useful for processing partitions as they are completed.
- Returns:
- The partition ID of the next finished partition.
- wait_on(self, uint32_t pid)#
Wait for a specific partition to finish.
This method blocks until the desired partition is ready for processing.
- Parameters:
- pid
The desired partition ID.
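The call order of the methods above (insert chunks, mark partitions as finished, wait, then extract) can be walked through with a single-rank, in-memory stand-in. ToyShuffler is a hypothetical class written for this illustration only: it does no communication, stores plain bytes instead of PackedData, and its wait_on raises instead of blocking.

```python
class ToyShuffler:
    """Single-rank, in-memory stand-in; NOT rapidsmpf.shuffler.Shuffler."""

    def __init__(self, total_num_partitions):
        self._chunks = {pid: [] for pid in range(total_num_partitions)}
        self._finished = set()

    def insert_chunks(self, chunks):
        for pid, data in chunks.items():
            if pid in self._finished:
                raise ValueError(f"partition {pid} is already finished")
            self._chunks[pid].append(data)

    def insert_finished(self, pids):
        # Accept a single int or an iterable of ints.
        self._finished.update([pids] if isinstance(pids, int) else pids)

    def local_partitions(self):
        return sorted(self._chunks)

    def finished(self):
        return self._finished == set(self._chunks)

    def wait_on(self, pid):
        # The real method blocks; a single-threaded toy can only check.
        if pid not in self._finished:
            raise RuntimeError(f"partition {pid} is not finished yet")

    def extract(self, pid):
        chunks, self._chunks[pid] = self._chunks[pid], []
        return chunks


shuffler = ToyShuffler(total_num_partitions=2)
shuffler.insert_chunks({0: b"a0", 1: b"a1"})
shuffler.insert_chunks({0: b"b0"})
shuffler.insert_finished([0, 1])

result = {}
for pid in shuffler.local_partitions():
    shuffler.wait_on(pid)
    result[pid] = shuffler.extract(pid)
```

In a multi-rank shuffle each rank would insert chunks for any partition but extract only the partitions it owns.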
Communicator#
Submodule for communication abstraction (e.g. UCXX and MPI).
- rapidsmpf.communicator.COMMUNICATORS = ('single', 'ucxx', 'mpi')#
Tuple of available communicators.
RapidsMPF includes a collection of communicator backends, available as submodules under rapidsmpf.communicator.*. Typically, the Conda distribution includes both UCXX and MPI support, while the pip installation generally supports only UCXX.
- class rapidsmpf.communicator.communicator.Communicator#
Abstract base class for a communication mechanism between nodes.
Provides an interface for sending and receiving messages between nodes, supporting asynchronous operations, GPU data transfers, and custom logging. Concrete implementations must define the virtual methods to enable specific communication backends.
- Attributes:
logger: Get the logger.
nranks: Get the total number of ranks.
progress_thread: Get the communicator’s progress thread.
rank: Get the rank of this communication node.
Methods
get_str(self): Get a string representation of the communicator.
Notes
This class is designed as an abstract base class, meaning it cannot be instantiated directly. Subclasses are required to implement the necessary methods to support the desired communication backend and functionality.
- get_str(self)#
Get a string representation of the communicator.
- Returns:
- A string describing the communicator
- logger#
Get the logger.
- Returns:
- A logger instance.
- nranks#
Get the total number of ranks.
- Returns:
- Total number of ranks.
- progress_thread#
Get the communicator’s progress thread.
- Returns:
- The progress thread.
- rank#
Get the rank of this communication node.
- Returns:
- The rank.
- class rapidsmpf.communicator.communicator.LOG_LEVEL(*values)#
- class rapidsmpf.communicator.communicator.Logger#
Logger.
To control the verbosity level, set the environment variable RAPIDSMPF_LOG:
- NONE: No logging.
- PRINT: General print messages.
- WARN: Warning messages (default).
- INFO: Informational messages.
- DEBUG: Debug messages.
- TRACE: Trace messages.
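One way to picture how such a verbosity setting could behave is an ordered enumeration read from the environment. This is a sketch with several assumptions: that the levels are ordered as listed with each level including the ones before it, that the variable is matched case-insensitively, and that the numeric values are as shown. The names ToyLogLevel, level_from_env, and should_log are hypothetical and not part of RapidsMPF.

```python
import os
from enum import IntEnum


class ToyLogLevel(IntEnum):
    # Assumed ordering: the order in which the levels are listed above.
    NONE = 0
    PRINT = 1
    WARN = 2
    INFO = 3
    DEBUG = 4
    TRACE = 5


def level_from_env(environ=None):
    """Read RAPIDSMPF_LOG, falling back to the documented default (WARN)."""
    environ = os.environ if environ is None else environ
    name = environ.get("RAPIDSMPF_LOG", "WARN").strip().upper()
    return ToyLogLevel[name]


def should_log(message_level, verbosity):
    """A message is emitted if its level does not exceed the verbosity."""
    return message_level != ToyLogLevel.NONE and message_level <= verbosity


default_level = level_from_env({})
debug_level = level_from_env({"RAPIDSMPF_LOG": "debug"})
```

Under these assumptions, running with RAPIDSMPF_LOG=DEBUG would show print, warning, informational, and debug messages, but not trace messages.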
- Attributes:
verbosity_level: Get the verbosity level of the logger.
Methods
debug(self, str msg): Logs a debug message.
info(self, str msg): Logs an informational message.
print(self, str msg): Logs a print message.
trace(self, str msg): Logs a trace message.
warn(self, str msg): Logs a warning message.
- debug(self, str msg)#
Logs a debug message.
- Parameters:
- msg
The message to log.
- info(self, str msg)#
Logs an informational message.
- Parameters:
- msg
The message to log.
- print(self, str msg)#
Logs a print message.
- Parameters:
- msg
The message to log.
- trace(self, str msg)#
Logs a trace message.
- Parameters:
- msg
The message to log.
- verbosity_level#
Get the verbosity level of the logger.
- Returns:
- The verbosity level.
- warn(self, str msg)#
Logs a warning message.
- Parameters:
- msg
The message to log.
MPI Communicator#
- rapidsmpf.communicator.mpi.new_communicator(Intracomm comm, Options options, ProgressThread progress_thread)#
Create a new RapidsMPF-MPI communicator based on an existing mpi4py communicator.
- Parameters:
- comm
The existing mpi communicator from mpi4py.
- options
Configuration options.
- progress_thread
Progress thread for the communicator.
- Returns:
- A new RapidsMPF-MPI communicator.
UCXX Communicator#
ucxx-based implementation of a RapidsMPF Communicator.
- rapidsmpf.communicator.ucxx.barrier(Communicator comm)#
Execute a barrier on the UCXX communicator.
Ensures that all ranks have connected to the root and that all ranks have reached the barrier before continuing.
Notes
Executing this barrier is required after the ranks are bootstrapped to ensure everyone is connected to the root. An alternative barrier, such as MPI_Barrier, will not suffice for that purpose.
- rapidsmpf.communicator.ucxx.get_root_ucxx_address(Communicator comm)#
Get the address of the communicator’s UCXX worker.
This function is intended to be called from the root rank to communicate to other processes how to reach the root, but it also returns the UCXX worker address when called on other ranks.
- Parameters:
- comm
The RapidsMPF-UCXX communicator.
- Returns:
- A bytes sequence with the UCXX worker address.
- Raises:
- NotImplementedError
If the communicator was created with a HostPortPair, which is not yet supported.
- rapidsmpf.communicator.ucxx.new_communicator(Rank nranks, UCXWorker ucx_worker, UCXAddress root_ucxx_address, Options options, ProgressThread progress_thread)#
Create a new UCXX communicator with the given number of ranks.
An existing UCXWorker may be specified, otherwise one will be created. The root rank is created if no root_ucxx_address is specified; all other ranks must specify the address of the root rank via that argument.
- Parameters:
- nranks
The number of ranks in the cluster.
- ucx_worker
An existing UCXX worker to use if specified, otherwise one will be created.
- root_ucxx_address
The UCXX address of the root rank (only specified for non-root ranks).
- options
Configuration options.
- progress_thread
Progress thread for the communicator.
- Returns:
- A new RapidsMPF-UCXX communicator.
Buffer#
Submodule for memory abstraction.
- class rapidsmpf.memory.buffer.MemoryType(*values)#
- class rapidsmpf.memory.memory_reservation.MemoryReservation#
Represents a reservation for future memory allocation.
A reservation is created by BufferResource.reserve and must be used when allocating buffers through the same BufferResource.
- Attributes:
Methods
clear(self): Clear the remaining size of the reservation.
- br#
Get the buffer resource associated with this reservation.
- Returns:
- The buffer resource associated with this reservation.
- clear(self)#
Clear the remaining size of the reservation.
Resets the reservation so that any remaining, unconsumed bytes are released back to the underlying memory resource. After this call, the reservation has a remaining size of zero and cannot be used to satisfy further allocations.
- mem_type#
Get the type of memory associated with this reservation.
- Returns:
- The memory type associated with this reservation.
- size#
Get the remaining size of the reserved memory.
- Returns:
- The size of the reserved memory in bytes.
- rapidsmpf.memory.memory_reservation.opaque_memory_usage(MemoryReservation reservation)#
Associate untracked memory usage with an existing reservation.
This context manager is intended for code paths that use memory outside of RapidsMPF’s memory reservation system, for example internal allocations in libcudf or other third-party libraries. The memory may be of any type covered by a MemoryReservation, most commonly device memory.
While the context is active, the provided memory reservation is considered consumed by the enclosed code block. On exit, the reservation is cleared, releasing any remaining, unconsumed bytes back to the underlying memory resource.
- Parameters:
- reservation
Memory reservation that accounts for the untracked memory usage.
- Yields:
- The same reservation, which may be passed to APIs that require an explicit reservation object.
Examples
Account for allocations outside RapidsMPF:
>>> with opaque_memory_usage(reservation):
...     # library call that allocates memory unknown to RapidsMPF.
...     result = library_op(...)
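The yield-then-clear behavior described above can be mimicked with a small context manager built on contextlib. This is a toy model, not the RapidsMPF implementation: ToyReservation and toy_opaque_memory_usage are hypothetical names, and clearing the reservation even when the enclosed block raises is an assumption.

```python
from contextlib import contextmanager


class ToyReservation:
    """Stand-in for a memory reservation with a remaining size in bytes."""

    def __init__(self, size):
        self.size = size

    def clear(self):
        self.size = 0


@contextmanager
def toy_opaque_memory_usage(reservation):
    try:
        # The enclosed block is treated as the consumer of the reservation.
        yield reservation
    finally:
        # Assumption: remaining bytes are released even if the block raises.
        reservation.clear()


reservation = ToyReservation(size=1 << 20)
with toy_opaque_memory_usage(reservation) as res:
    size_inside = res.size  # still fully reserved while the block runs
size_after = reservation.size
```

The point of the pattern is that the reservation keeps other allocations from claiming the memory while the untracked library call runs, and that nothing stays reserved afterwards.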
- class rapidsmpf.memory.buffer_resource.AvailableMemoryMap#
Map of functions reporting available memory for different memory types.
This class acts as an opaque handle to C++ memory-availability functions that cannot be directly represented or exposed in Python. It enables RapidsMPF to configure and use such functions from Python while keeping the implementation in C++.
Instances of this class should be constructed from configuration options using the from_options() factory method.
Methods
from_options(cls, RmmResourceAdaptor mr, ...): Construct an AvailableMemoryMap from configuration options.
- classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#
Construct an AvailableMemoryMap from configuration options.
- Parameters:
- mr
Pointer to a memory resource adaptor.
- options
Configuration options.
- Returns:
- The constructed map of memory-available functions.
- class rapidsmpf.memory.buffer_resource.BufferResource#
Class managing buffer resources.
This class handles memory allocation and transfers between different memory types (e.g., host and device). All memory operations in RapidsMPF, such as those performed by the Shuffler, rely on a buffer resource for memory management.
- Parameters:
- device_mr
Reference to the RMM device memory resource used for device allocations.
- pinned_mr
The pinned host memory resource used for PINNED_HOST allocations. If None, pinned host allocations are disabled. In that case, any attempt to allocate pinned memory will fail regardless of what memory_available reports.
- memory_available
Optional memory availability functions. Memory types without availability functions are unlimited. A function must return the current available memory of a specific type. It must be thread-safe if used by multiple BufferResource instances concurrently. Warning: calling any BufferResource instance methods within the function might result in a deadlock. This is because the buffer resource is locked when the function is called.
- periodic_spill_check
Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the memory availability functions. The value of periodic_spill_check is used as the pause between checks (in seconds). If None, no periodic spill check is performed.
- stream_pool
Optional CUDA stream pool to use. If None, a new pool with 16 streams will be created. Must be an instance of rmm.pylibrmm.cuda_stream_pool.CudaStreamPool.
- statistics
The statistics instance to use. If None, a disabled statistics instance will be created.
- Attributes:
device_mr: The memory resource used for device memory allocations.
pinned_mr: The memory resource used for pinned host memory allocations.
spill_manager
statistics: Gets the statistics instance associated with this buffer resource.
Methods
from_options(cls, RmmResourceAdaptor mr, ...): Construct a BufferResource from configuration options.
memory_available(self, MemoryType mem_type): Get the current available memory of the specified memory type.
memory_reserved(self, MemoryType mem_type): Get the current reserved memory of the specified memory type.
release(self, MemoryReservation reservation, ...): Consume a portion of the reserved memory.
reserve(self, MemoryType mem_type, ...): Reserve an amount of the specified memory type.
reserve_device_memory_and_spill(self, ...): Reserve device memory and spill if necessary.
stream_pool_size(self): Get the size of the stream pool.
- device_mr#
The memory resource used for device memory allocations.
- Returns:
- The device memory resource.
- classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#
Construct a BufferResource from configuration options.
This factory method creates a BufferResource using configuration options to initialize all components.
- Parameters:
- mr
RMM resource adaptor. The adaptor must outlive the returned BufferResource.
- options
Configuration options.
- Returns:
- A BufferResource instance configured according to the options.
- memory_available(self, MemoryType mem_type)#
Get the current available memory of the specified memory type.
- memory_reserved(self, MemoryType mem_type)#
Get the current reserved memory of the specified memory type.
- Parameters:
- mem_type
The target memory type.
- Returns:
- The memory reserved, in bytes.
- pinned_mr#
The memory resource used for pinned host memory allocations.
- Returns:
- The pinned host memory resource, or None if pinned host allocations are disabled.
- release(self, MemoryReservation reservation, size_t size)#
Consume a portion of the reserved memory.
Reduces the remaining size of the reserved memory by the specified amount.
- Parameters:
- reservation
The memory reservation to consume from.
- size
The number of bytes to consume.
- Returns:
- The remaining size of the reserved memory after consumption.
- Raises:
- ReservationError
If the released size exceeds the total reserved size.
- reserve(self, MemoryType mem_type, size_t size, *, bool allow_overbooking)#
Reserve an amount of the specified memory type.
Creates a new reservation of the specified size and memory type to inform the system about upcoming buffer allocations.
If overbooking is allowed, a reservation of the requested size is returned even if the memory is not currently available. In that case, the caller must guarantee that at least the overbooked amount of memory will be freed before the reservation is used.
If overbooking is not allowed, a reservation of size zero is returned on failure.
- Parameters:
- mem_type
The target memory type.
- size
The number of bytes to reserve.
- allow_overbooking
Whether overbooking is permitted.
- Returns:
- A tuple (reservation, overbooked_bytes):
On success, the reservation’s size equals size.
On failure, the reservation’s size equals zero (a zero-sized reservation never fails).
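The overbooking rule can be illustrated with a small pure-Python model. This is only a sketch of the semantics described above, not RapidsMPF code: `ToyPool` and its methods are hypothetical stand-ins, and the second tuple element is assumed to be the number of bytes by which the request exceeds the currently available memory.

```python
class ToyPool:
    """Hypothetical stand-in for a memory pool that hands out reservations."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.reserved = 0

    def available(self) -> int:
        # May become negative once the pool is overbooked.
        return self.capacity - self.reserved

    def reserve(self, size: int, *, allow_overbooking: bool) -> tuple[int, int]:
        """Return (reserved_size, overbooked_bytes)."""
        overbooked = max(0, size - self.available())
        if overbooked > 0 and not allow_overbooking:
            # Failure: hand back a zero-sized reservation and book nothing.
            return 0, overbooked
        self.reserved += size
        return size, overbooked


pool = ToyPool(capacity=100)
ok = pool.reserve(80, allow_overbooking=False)      # fits within capacity
failed = pool.reserve(50, allow_overbooking=False)  # only 20 bytes are left
forced = pool.reserve(50, allow_overbooking=True)   # succeeds, overbooks by 30
```

In this model the caller of the last call is responsible for freeing at least 30 bytes before using the reservation, which mirrors the guarantee described for overbooked reservations.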
- reserve_device_memory_and_spill(self, size_t size, *, bool allow_overbooking)#
Reserve device memory and spill if necessary.
Attempts to reserve the requested amount of device memory. If insufficient memory is available, spilling is triggered to free space. When overbooking is allowed, the reservation may succeed even if spilling was not sufficient to fully satisfy the request.
- Parameters:
- size
The amount of memory to reserve.
- allow_overbooking
Whether to allow overbooking. If false, ensures enough memory is freed to satisfy the reservation. If true, the reservation may succeed even if spilling was insufficient.
- Returns:
- The resulting memory reservation.
- Raises:
- ReservationError
If overbooking is disabled and the buffer resource cannot free enough device memory through spilling to satisfy the request.
- statistics#
Gets the statistics instance associated with this buffer resource.
- Returns:
- The Statistics instance.
- class rapidsmpf.memory.buffer_resource.LimitAvailableMemory(RmmResourceAdaptor mr, int64_t limit)#
A callback class for querying the remaining available memory within a defined limit from an RMM resource adaptor.
This class is primarily designed to simulate constrained memory environments or prevent memory allocation beyond a specific threshold. It provides information about the available memory by subtracting the memory currently used (as reported by the RMM resource adaptor) from a user-defined limit.
It is typically used in the context of memory management operations such as with BufferResource.
- Parameters:
- mr
A statistics resource adaptor that tracks memory usage and provides statistics about the memory consumption. The LimitAvailableMemory instance keeps a reference to mr to keep it alive.
- limit
The maximum memory limit (in bytes). Used to calculate the remaining available memory.
Methods
__call__(*args, **kwargs): Call self as a function.
Notes
The mr resource must not be destroyed while this object is still in use.
Examples
>>> mr = RmmResourceAdaptor(...)
>>> memory_limiter = LimitAvailableMemory(mr, limit=1_000_000)
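The subtraction described above (limit minus memory currently in use) can be sketched without a GPU. The classes below are hypothetical stand-ins, not the RapidsMPF implementation; in particular, the sketch assumes the result is not clamped at zero, which the reference does not state.

```python
class ToyUsageTracker:
    """Hypothetical stand-in for a resource that reports bytes in use."""

    def __init__(self) -> None:
        self.current_allocated = 0


class ToyLimitAvailableMemory:
    """Callable that reports how much of a fixed budget is still unused."""

    def __init__(self, tracker: ToyUsageTracker, limit: int) -> None:
        self.tracker = tracker  # holding a reference keeps the tracker alive
        self.limit = limit

    def __call__(self) -> int:
        # Remaining budget = user-defined limit - bytes currently allocated.
        return self.limit - self.tracker.current_allocated


tracker = ToyUsageTracker()
limiter = ToyLimitAvailableMemory(tracker, limit=1_000_000)
before = limiter()                   # nothing allocated yet
tracker.current_allocated = 250_000  # simulate allocations
after = limiter()
```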
- rapidsmpf.memory.buffer_resource.periodic_spill_check_from_options(Options options)#
Get the periodic_spill_check parameter from configuration options.
- Parameters:
- options
Configuration options.
- Returns:
- The duration of the pause between spill checks in seconds, or None if periodic spill checks are disabled.
- rapidsmpf.memory.buffer_resource.stream_pool_from_options(Options options)#
Create a new CUDA stream pool from configuration options.
- Parameters:
- options
Configuration options.
- Returns:
- Pool of CUDA streams used throughout RapidsMPF for operations that do not take an explicit CUDA stream.
- class rapidsmpf.memory.packed_data.PackedData#
Methods
from_cudf_packed_columns(cls, ...): Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
from_host_bytes(cls, const uint8_t[, ...): Construct a PackedData from raw host bytes.
to_host_bytes(self): Extract the host bytes from this PackedData.
- classmethod from_cudf_packed_columns(cls, PackedColumns packed_columns, Stream stream, BufferResource br)#
Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
- Parameters:
- packed_columns
Packed data containing metadata and GPU data buffers
- Returns:
- A new PackedData instance containing the packed columns data
- Raises:
- ValueError
If the PackedColumns object is empty (has been released already).
- classmethod from_host_bytes(cls, const uint8_t[::1] data, BufferResource br)#
Construct a PackedData from raw host bytes.
The bytes are stored in the data buffer (as host memory) with minimal metadata. This is useful for scalar allreduce operations.
Note: This makes a copy of the input data.
- Parameters:
- data
Contiguous buffer of bytes (bytes, bytearray, or buffer-protocol object).
- br
Buffer resource for memory allocation.
- Returns:
- A new PackedData instance containing the bytes.
- to_host_bytes(self) bytes#
Extract the host bytes from this PackedData.
Returns a copy of the bytes stored in the data buffer. Works with both host and device memory buffers. The method synchronizes with the underlying buffer’s CUDA stream before returning.
- Returns:
- The raw bytes.
- Raises:
- ValueError
If the PackedData is empty.
- class rapidsmpf.memory.scoped_memory_record.AllocType(*values)#
- class rapidsmpf.memory.scoped_memory_record.ScopedMemoryRecord#
Scoped memory record for tracking memory usage statistics.
Methods
current(self, AllocType alloc_type=AllocType.ALL): Current memory usage in bytes.
num_current_allocs(self, ...): Number of currently active (non-deallocated) allocations.
num_total_allocs(self, ...): Total number of allocations performed.
peak(self, AllocType alloc_type=AllocType.ALL): Peak memory usage in bytes.
record_allocation(self, ...): Record a memory allocation event.
record_deallocation(self, ...): Record a memory deallocation event.
total(self, AllocType alloc_type=AllocType.ALL): Total number of bytes allocated over the lifetime.
- current(self, AllocType alloc_type=AllocType.ALL)#
Current memory usage in bytes.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Current memory usage in bytes.
- num_current_allocs(self, AllocType alloc_type=AllocType.ALL)#
Number of currently active (non-deallocated) allocations.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Number of active allocations.
- num_total_allocs(self, AllocType alloc_type=AllocType.ALL)#
Total number of allocations performed.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Number of total allocations.
- peak(self, AllocType alloc_type=AllocType.ALL)#
Peak memory usage in bytes.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Peak memory usage in bytes.
- record_allocation(self, AllocType alloc_type, uint64_t nbytes)#
Record a memory allocation event.
Updates internal statistics for memory allocation and adjusts peak usage if the current memory usage exceeds the previous peak.
- Parameters:
- alloc_type
The allocator that performed the allocation.
- nbytes
The number of bytes allocated.
- record_deallocation(self, AllocType alloc_type, uint64_t nbytes)#
Record a memory deallocation event.
Updates internal statistics to reflect memory being freed.
- Parameters:
- alloc_type
The allocator that performed the deallocation.
- nbytes
The number of bytes deallocated.
- total(self, AllocType alloc_type=AllocType.ALL)#
Total number of bytes allocated over the lifetime.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Total allocated bytes.
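How the counters relate to one another is easiest to see in a toy version. The class below is a hypothetical pure-Python sketch for a single allocator type; it is not the RapidsMPF ScopedMemoryRecord and ignores the AllocType dimension entirely.

```python
class ToyMemoryRecord:
    """Hypothetical sketch of current/peak/total bookkeeping."""

    def __init__(self) -> None:
        self.current = 0             # bytes currently allocated
        self.peak = 0                # highest value `current` has reached
        self.total = 0               # bytes allocated over the lifetime
        self.num_total_allocs = 0    # allocations ever performed
        self.num_current_allocs = 0  # allocations not yet freed

    def record_allocation(self, nbytes: int) -> None:
        self.current += nbytes
        self.total += nbytes
        self.num_total_allocs += 1
        self.num_current_allocs += 1
        # Adjust the peak if current usage exceeds the previous peak.
        self.peak = max(self.peak, self.current)

    def record_deallocation(self, nbytes: int) -> None:
        self.current -= nbytes
        self.num_current_allocs -= 1


rec = ToyMemoryRecord()
rec.record_allocation(1024)
rec.record_allocation(2048)    # current = 3072, which becomes the peak
rec.record_deallocation(1024)  # current drops to 2048, peak is unchanged
rec.record_allocation(512)     # current = 2560, still below the peak
```

Note that `total` only ever grows, while `current` moves in both directions and `peak` records its maximum.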
Config Options#
- class rapidsmpf.config.Optional(value)#
Represents an option value that can be explicitly disabled.
This class wraps an option value and interprets certain strings as indicators that the value is disabled (case-insensitive): {“false”, “no”, “off”, “disable”, “disabled”}.
This is typically used together with Options.get_or_default() to simplify handling of options that are optional or can be disabled.
- Parameters:
- value
The input value to interpret.
- Attributes:
- value
The raw input value, unless it matched a disable keyword, in which case the value is None.
Examples
>>> from rapidsmpf.config import Optional, Options
>>> Optional("OFF").value is None
True
>>> Optional("no").value is None
True
>>> Optional("100").value
'100'
>>> Optional("").value
''
>>> opts = Options()
>>> opts.get_or_default(
...     "dask_periodic_spill_check",
...     default_value=Optional(1e-3)
... ).value
0.001
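The disable-keyword rule can be written out in a few lines of plain Python. This is a sketch of the behaviour described above rather than the library's code: `interpret_optional` is a hypothetical helper, and it assumes that only string inputs are compared against the keywords and that no whitespace trimming takes place.

```python
DISABLE_KEYWORDS = {"false", "no", "off", "disable", "disabled"}


def interpret_optional(value):
    """Return None for a disable keyword, otherwise the raw input value."""
    # The comparison is case-insensitive, so "OFF" and "off" both disable.
    if isinstance(value, str) and value.lower() in DISABLE_KEYWORDS:
        return None
    return value


disabled = interpret_optional("OFF")
kept_string = interpret_optional("100")
kept_empty = interpret_optional("")  # an empty string is not a disable keyword
kept_float = interpret_optional(1e-3)
```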
- class rapidsmpf.config.OptionalBytes(value)#
Represents a byte-sized option that can be explicitly disabled.
This class is a specialization of Optional that interprets the input as a human-readable byte size string (e.g., “100 MB”, “1KiB”, “1e6”). If the input is one of the disable keywords (e.g., “off”, “no”, “false”), the value is treated as disabled (None). Otherwise, it is parsed to an integer number of bytes using rapidsmpf.utils.string.parse_bytes().
This is useful for configuration options that may be set to a size limit or explicitly turned off.
- Parameters:
- value
A human-readable byte size (e.g., “1MiB”, “100 MB”) or a disable keyword (case-insensitive), or an integer number of bytes.
- Attributes:
- value
The size in bytes, or None if disabled.
Examples
>>> from rapidsmpf.config import OptionalBytes
>>> OptionalBytes("1KiB").value
1024
>>> OptionalBytes("OFF").value is None
True
>>> OptionalBytes(2048).value
2048
- class rapidsmpf.config.Options#
Initialize an Options object with a dictionary of string options.
- Parameters:
- options_as_strings
A dictionary representing option names and their corresponding values.
Methods
deserialize(bytes serialized_buffer): Deserialize a binary buffer into an Options object.
get(self, str key, *, return_type, factory): Retrieves a configuration option by key.
get_or_default(self, str key, *, default_value): Retrieve a configuration option by key, using a default value if not present.
get_strings(self): Get all option key-value pairs as strings.
insert_if_absent(self, dict options_as_strings): Insert multiple options if they are not already present.
serialize(self): Serialize the Options object into a binary buffer.
- static deserialize(bytes serialized_buffer)#
Deserialize a binary buffer into an Options object.
This method reconstructs an Options instance from a byte buffer produced by the Options.serialize() method.
See Options.serialize() for the binary format.
- Parameters:
- serialized_buffer
A buffer containing serialized options in the defined binary format.
- Returns:
- Options
A reconstructed Options instance containing the deserialized key-value pairs.
- Raises:
- ValueError
If the input buffer is malformed or inconsistent with the expected format.
- get(self, str key, *, return_type, factory)#
Retrieves a configuration option by key.
If the option is not present, it is constructed using the provided factory function, which receives the string representation of the option (or an empty string if unset). The option is cached after the first access.
The option is cast to the specified return_type. To be accessible from C++, it must be one of: bool, int, float, str. Otherwise, it is stored as a PyObject*.
Once a key has been accessed with a particular return_type, subsequent calls to get with the same key must use the same return_type. Using a different type for the same key will result in a TypeError.
- Parameters:
- key
The name of the option to retrieve.
- return_type
The type the option is cast to.
- factory
Function that constructs the option from its string representation (an empty string if the option is unset).
- Returns:
- The value of the requested option, cast to the specified return_type.
- Raises:
- ValueError
If the return_type is unsupported, or if the stored option type does not match the expected type.
- TypeError
If the option has already been accessed with a different return_type.
Warning
The factory must not access the Options instance, as this may lead to a deadlock due to internal locking.
- get_or_default(self, str key, *, default_value)#
Retrieve a configuration option by key, using a default value if not present.
This is a convenience wrapper around get() that uses the type of the default_value as the return type and provides a default factory that parses a string into that type.
- Parameters:
- key
The name of the option to retrieve.
- default_value
The default value to return if the option is not set. Its type is used to determine the expected return type.
- Returns:
- The value of the option if it exists and can be parsed to the type of default_value, otherwise default_value.
- Raises:
- ValueError
If the stored option value cannot be parsed to the required type.
- TypeError
If the option has already been accessed with a different return type.
Notes
This method infers the return type from type(default_value).
If default_value is used, it will be cached and reused for subsequent accesses of the same key.
Examples
>>> from rapidsmpf.config import Options
>>> opts = Options()
>>> opts.get_or_default("debug", default_value=False)
False
>>> opts.get_or_default("timeout", default_value=1.5)
1.5
>>> opts.get_or_default("level", default_value="info")
'info'
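The combination of type inference, caching, and the TypeError on a mismatched return type can be modelled in plain Python. `ToyOptions` and `_parse` below are hypothetical and only approximate the documented behaviour; in particular, the set of strings treated as true for bool options is an assumption.

```python
def _parse(text: str, return_type: type):
    """Parse a string into return_type (hypothetical default factory)."""
    if return_type is bool:
        # Assumed truthy spellings; bool("false") would wrongly be True.
        return text.strip().lower() in {"1", "true", "yes", "on"}
    return return_type(text)  # raises ValueError for unparsable input


class ToyOptions:
    """Hypothetical sketch of a string-backed, typed, caching option store."""

    def __init__(self, strings=None) -> None:
        self._strings = {k.strip().lower(): v for k, v in (strings or {}).items()}
        self._cache = {}  # key -> (return_type, parsed value)

    def get_or_default(self, key: str, *, default_value):
        return_type = type(default_value)  # the return type is inferred
        if key in self._cache:
            cached_type, value = self._cache[key]
            if cached_type is not return_type:
                raise TypeError(f"{key!r} was already accessed as {cached_type.__name__}")
            return value
        if key in self._strings:
            value = _parse(self._strings[key], return_type)
        else:
            value = default_value
        self._cache[key] = (return_type, value)  # cached after first access
        return value


opts = ToyOptions({"timeout": "2.5"})
timeout = opts.get_or_default("timeout", default_value=1.5)  # parsed from "2.5"
debug = opts.get_or_default("debug", default_value=False)    # falls back to default
try:
    opts.get_or_default("timeout", default_value=1)  # int instead of float
    mismatch_raised = False
except TypeError:
    mismatch_raised = True
```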
- get_strings(self)#
Get all option key-value pairs as strings.
Options that do not have a string representation, such as options inserted as typed values in C++, are included with an empty string value.
- Returns:
- A dictionary containing all stored options, where the keys and values are both strings.
- insert_if_absent(self, dict options_as_strings)#
Insert multiple options if they are not already present.
Attempts to insert each key-value pair from the provided dictionary, skipping keys that already exist in the options.
- Parameters:
- options_as_strings
Dictionary of option keys mapped to their string representations. Keys are inserted only if they do not already exist. The keys are trimmed and converted to lower case before insertion.
- Returns:
- Number of newly inserted options (0 if none were added).
- serialize(self) bytes#
Serialize the Options object into a binary buffer.
This method produces a compact binary representation of the internal key-value options. The format is suitable for storage or transmission and can be later restored using Options.deserialize().
- The binary format is:
[uint64_t count] — number of key-value pairs
[count * 2 * uint64_t] — offset pairs (key_offset, value_offset)
[raw bytes] — key and value strings stored contiguously
- Returns:
- bytes
A bytes object containing the serialized binary representation of the options.
- Raises:
- ValueError
If any option has already been accessed and cannot be serialized.
Notes
An Options instance can only be serialized if no options have been accessed. This is because serialization is based on the original string representations of the options. Once an option has been accessed and parsed, its string value may no longer accurately reflect its state, making serialization potentially inconsistent.
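To make the layout concrete, here is a round trip through a buffer shaped like the format listed above. It is a sketch under stated assumptions, not the RapidsMPF encoder: the functions are hypothetical, and the byte order (little-endian), the offset base (start of the raw byte region), the UTF-8 encoding, and the rule that each string ends where the next one begins are all guesses that the reference does not confirm.

```python
import struct


def toy_serialize(options: dict[str, str]) -> bytes:
    """Pack options as [count][offset pairs][raw key/value bytes]."""
    offsets = []
    blob = bytearray()
    for key, value in options.items():
        offsets.append(len(blob))  # key_offset
        blob += key.encode("utf-8")
        offsets.append(len(blob))  # value_offset
        blob += value.encode("utf-8")
    header = struct.pack("<Q", len(options))
    table = struct.pack(f"<{len(offsets)}Q", *offsets)
    return header + table + bytes(blob)


def toy_deserialize(buffer: bytes) -> dict[str, str]:
    """Inverse of toy_serialize; raises ValueError on malformed input."""
    if len(buffer) < 8:
        raise ValueError("buffer too small for the count field")
    (count,) = struct.unpack_from("<Q", buffer, 0)
    table_end = 8 + count * 2 * 8
    if len(buffer) < table_end:
        raise ValueError("buffer too small for the offset table")
    offsets = struct.unpack_from(f"<{count * 2}Q", buffer, 8)
    data = buffer[table_end:]
    # Each string ends where the next begins; the last ends with the buffer.
    bounds = list(offsets) + [len(data)]
    if any(a > b for a, b in zip(bounds, bounds[1:])):
        raise ValueError("offsets are not monotonically increasing")
    result = {}
    for i in range(count):
        key = data[bounds[2 * i]:bounds[2 * i + 1]].decode("utf-8")
        value = data[bounds[2 * i + 1]:bounds[2 * i + 2]].decode("utf-8")
        result[key] = value
    return result


original = {"log": "debug", "num_streams": "4"}
blob = toy_serialize(original)
restored = toy_deserialize(blob)
```

For two options the buffer holds an 8-byte count, four 8-byte offsets, and 20 bytes of string data.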
- rapidsmpf.config.get_environment_variables(str key_regex='RAPIDSMPF_(.*)')#
Returns a dictionary of environment variables matching a given regular expression.
This function scans the current process’s environment variables and inserts those whose keys match the provided regular expression. The regular expression must contain exactly one capture group to extract the portion of the environment variable key to use as the dictionary key.
For example, to strip the RAPIDSMPF_ prefix, use r"RAPIDSMPF_(.*)" as the regex. The captured group will be used as the key in the output dictionary.
- Example:
Environment variable: RAPIDSMPF_FOO=bar
key_regex: r"RAPIDSMPF_(.*)"
Resulting dictionary entry: {"FOO": "bar"}
- Parameters:
- key_regex
A regular expression with a single capture group to match and extract the environment variable keys.
- Returns:
- A dictionary containing all matching environment variables, with keys as extracted by the capture group.
- Raises:
- ValueError
If key_regex does not contain exactly one capture group.
See also
os.environ: Dictionary of the current environment variables.
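A rough pure-Python equivalent of this filtering can be built from `re` and `os.environ`. The function below is a hypothetical sketch, not the library implementation: it assumes the pattern must match the entire variable name, and its `environ` parameter exists only so the example can run against a fixed mapping.

```python
import os
import re


def toy_get_environment_variables(key_regex=r"RAPIDSMPF_(.*)", environ=None):
    """Collect variables whose names match key_regex, keyed by the capture group."""
    pattern = re.compile(key_regex)
    if pattern.groups != 1:
        raise ValueError("key_regex must contain exactly one capture group")
    source = os.environ if environ is None else environ
    result = {}
    for name, value in source.items():
        match = pattern.fullmatch(name)  # assumption: whole-name match
        if match:
            result[match.group(1)] = value
    return result


fake_env = {"RAPIDSMPF_FOO": "bar", "HOME": "/root", "RAPIDSMPF_LOG": "DEBUG"}
found = toy_get_environment_variables(environ=fake_env)

try:
    toy_get_environment_variables("RAPIDSMPF_.*", environ=fake_env)
    rejected = False
except ValueError:
    rejected = True  # the pattern has no capture group
```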
Statistics#
- class rapidsmpf.statistics.MemoryRecord(scoped: ScopedMemoryRecord, global_peak: int, num_calls: int)#
Holds memory profiling statistics for a named scope.
- Attributes:
- scoped
Memory statistics collected while the scope was active, including number of allocations, peak bytes allocated, and total allocated bytes.
- global_peak
The maximum global memory usage observed during the scope, including allocations from other threads or nested scopes.
- num_calls
Number of times the profiling context with this name was entered.
- class rapidsmpf.statistics.MemoryRecorder#
A context manager for recording memory allocation statistics within a code block.
This class is not intended to be used directly by end users. Instead, use Statistics.memory_profiling(), which creates and manages an instance of this class.
- Parameters:
- stats
The statistics object responsible for aggregating memory profiling data.
- mr
The memory resource through which allocations are tracked.
- name
The name of the profiling scope. Used as a key in the statistics record.
- class rapidsmpf.statistics.Statistics(bool enable, *, RmmResourceAdaptor mr=None)#
Track statistics across RapidsMPF operations.
- Parameters:
- enable
Whether statistics tracking is enabled.
- mr
Enable memory profiling by providing an RMM resource adaptor.
- Attributes:
enabled: Checks if statistics is enabled.
memory_profiling_enabled: Checks if memory profiling is enabled.
Methods
add_stat(self, name, double value): Adds a value to a statistic.
clear(self): Clears all statistics.
from_options(cls, RmmResourceAdaptor mr, ...): Construct from configuration options.
get_memory_records(self): Retrieves all memory profiling records stored by this instance.
get_stat(self, name): Retrieves a statistic by name.
list_stat_names(self): Returns a list of all statistic names.
memory_profiling(self, name): Create a scoped memory profiling context for a named code region.
report(self): Generates a report of statistics in a formatted string.
write_json(self, filepath): Writes a JSON report of all collected statistics to a file.
write_json_string(self): Returns a JSON representation of all collected statistics as a string.
- add_stat(self, name, double value)#
Adds a value to a statistic.
- Parameters:
- name
Name of the statistic.
- value
Value to add.
- enabled#
Checks if statistics is enabled.
Operations on disabled statistics are no-ops.
- Returns:
- True if statistics is enabled, otherwise False.
- classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#
Construct from configuration options.
- Parameters:
- mr
Pointer to a memory resource used for memory profiling.
- options
Configuration options.
- Returns:
- The constructed Statistics instance.
- get_memory_records(self)#
Retrieves all memory profiling records stored by this instance.
- Returns:
- Dictionary mapping record names to memory usage data.
- get_stat(self, name)#
Retrieves a statistic by name.
- Parameters:
- name
Name of the statistic to retrieve.
- Returns:
- A dict of the statistic.
- Raises:
- KeyError
If the statistic with the specified name does not exist.
- list_stat_names(self)#
Returns a list of all statistic names.
- memory_profiling(self, name)#
Create a scoped memory profiling context for a named code region.
Returns a context manager that tracks memory allocations and deallocations made through the associated memory resource while the context is active. The profiling data is aggregated under the provided name and made available via Statistics.get_memory_records().
The statistics include:
- Total and peak memory allocated within the scope (scoped)
- Global peak memory usage during the scope (global_peak)
- Number of times the named scope was entered (num_calls)
If memory profiling is disabled or the memory resource is None, this is a no-op.
- Parameters:
- name
A unique identifier for the profiling scope. Used as a key when accessing profiling data via Statistics.get_memory_records().
- Returns:
- A context manager that collects memory profiling data.
Examples
>>> import rmm
>>> from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor
>>> from rapidsmpf.statistics import Statistics
>>> mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
>>> stats = Statistics(enable=True, mr=mr)
>>> with stats.memory_profiling("outer"):
...     b1 = rmm.DeviceBuffer(size=1024, mr=mr)
...     with stats.memory_profiling("inner"):
...         b2 = rmm.DeviceBuffer(size=1024, mr=mr)
>>> inner = stats.get_memory_records()["inner"]
>>> print(inner.scoped.peak())
1024
>>> outer = stats.get_memory_records()["outer"]
>>> print(outer.scoped.peak())
2048
- memory_profiling_enabled#
Checks if memory profiling is enabled.
- Returns:
- True if memory profiling is enabled, otherwise False.
- report(self)#
Generates a report of statistics in a formatted string.
Operations on disabled statistics are no-ops.
- Returns:
- A string representing the formatted statistics report.
- write_json(self, filepath) None#
Writes a JSON report of all collected statistics to a file.
- Parameters:
- filepath
Path to the output file. Created or overwritten.
- Raises:
- OSError
If the file cannot be opened or writing fails.
- ValueError
If any stat name or memory record name contains a double quote, backslash, or ASCII control character (0x00–0x1F).
RMM Resource Adaptor#
- class rapidsmpf.rmm_resource_adaptor.RmmResourceAdaptor#
An RMM memory resource adaptor tailored to RapidsMPF.
- Attributes:
- current_allocated
- fallback_mr
- upstream_mr
Methods
allocate(self, size_t nbytes, ...): Allocate nbytes bytes of memory.
deallocate(self, uintptr_t ptr, ...): Deallocate memory pointed to by ptr of size nbytes.
get_main_record(self): Returns a copy of the main memory record.
get_upstream(self)
- current_allocated: int#
Get the total number of currently allocated bytes.
This includes both allocations on the primary and fallback memory resources.
- Returns:
- Total number of currently allocated bytes.
- get_main_record(self)#
Returns a copy of the main memory record.
The main record tracks memory statistics for the lifetime of the resource.
- Returns:
- A copy of the current main memory record.