Python API Reference#

This page contains the API reference for rapidsmpf.

Integrations#

The subpackages under rapidsmpf.integrations contain integrations with other libraries.

Generic#

RapidsMPF Integrations.

class rapidsmpf.integrations.WorkerContext(br: BufferResource, statistics: Statistics, comm: Communicator | None = None, spill_collection: SpillCollection = <factory>, shufflers: dict[int, Shuffler] = <factory>, options: Options = <factory>)#

RapidsMPF specific attributes for a worker.

Attributes:
lock

The global worker lock. Must be acquired before accessing attributes that might be modified while the worker is running such as the shufflers.

br

The buffer resource used exclusively by the worker.

statistics

The statistics used by the worker. If None, statistics collection is disabled.

comm

The communicator connected to all other workers.

spill_collection

A collection of Python objects that can be spilled to free up device memory.

shufflers

A mapping from shuffler IDs to active shuffler instances.

options

Configuration options.

Methods

get_statistics()

Get the statistics from the worker context.

get_statistics() dict[str, dict[str, int | float]]#

Get the statistics from the worker context.

Returns:
statistics

A dictionary of statistics. The keys are the names of the statistics. The values are dictionaries with two keys:

  • “count” is the number of times the statistic was recorded.

  • “value” is the value of the statistic.

Notes

Statistics are global across all shuffles. To measure statistics for any given shuffle, gather statistics before and after the shuffle and compute the difference.
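Since statistics are global, per-shuffle numbers can be obtained by differencing two snapshots, as the note describes. A minimal pure-Python sketch; the statistic name "spill-bytes" is made up for illustration:

```python
# Sketch: per-shuffle statistics by differencing two snapshots from
# get_statistics(). The snapshot contents are illustrative; real keys
# depend on which statistics RapidsMPF records.

def diff_statistics(before, after):
    """Return after - before for every statistic present in `after`."""
    result = {}
    for name, stat in after.items():
        prev = before.get(name, {"count": 0, "value": 0})
        result[name] = {
            "count": stat["count"] - prev["count"],
            "value": stat["value"] - prev["value"],
        }
    return result

before = {"spill-bytes": {"count": 2, "value": 1024}}
after = {"spill-bytes": {"count": 5, "value": 4096}}
print(diff_statistics(before, after))
# {'spill-bytes': {'count': 3, 'value': 3072}}
```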

Dask#

RapidsMPF Dask Integrations.

rapidsmpf.integrations.dask.bootstrap_dask_cluster(client: ~distributed.client.Client, *, options: ~rapidsmpf.config.Options = <rapidsmpf.config.Options object>) None#

Set up a Dask cluster for RapidsMPF shuffling.

Calling bootstrap_dask_cluster multiple times on the same worker is a no-op, which also means that any new options values are ignored.

Parameters:
client

The current Dask client.

options

Configuration options. Reads environment variables for any options not set explicitly using get_environment_variables().

Notes

This utility must be executed before RapidsMPF shuffling can be used within a Dask cluster. This function is called automatically by rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph, but may be called manually to set things up before the first shuffle.

Subsequent shuffles on the same cluster will reuse the resources established on the cluster by this function.

All the workers reported by distributed.Client.scheduler_info() will be used. Note that RapidsMPF does not currently support adding or removing workers from the cluster.
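A minimal sketch of manual bootstrapping, assuming a running distributed cluster with GPU workers; the scheduler address and the wrapper name `bootstrap` are placeholders:

```python
# Sketch: bootstrap RapidsMPF on an existing Dask cluster before the
# first shuffle. A running distributed cluster is assumed; the
# scheduler address is a placeholder.

def bootstrap(scheduler_address):
    from distributed import Client

    from rapidsmpf.config import Options
    from rapidsmpf.integrations.dask import bootstrap_dask_cluster

    client = Client(scheduler_address)
    # Repeated calls on the same cluster are no-ops, so option values
    # passed on later calls would be ignored.
    bootstrap_dask_cluster(client, options=Options())
    return client
```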

rapidsmpf.integrations.dask.get_worker_context(worker: Worker | None = None) WorkerContext#

Retrieve the WorkerContext associated with a Dask worker.

If the worker context does not already exist on the worker, it will be created.

Parameters:
worker

An optional Dask worker instance. If not provided, the current worker is retrieved using get_worker().

Returns:
The existing or newly initialized worker context.
rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#

Return the task graph for a RapidsMPF shuffle.

Parameters:
input_name

The task name for input DataFrame tasks.

output_name

The task name for output DataFrame tasks.

partition_count_in

Partition count of input collection.

partition_count_out

Partition count of output collection.

integration

Dask-integration specification.

options

Optional keyword arguments.

*other_keys

Other keys needed by integration.insert_partition.

config_options

RapidsMPF configuration options.

Returns:
A valid task graph for Dask execution.

Notes

A RapidsMPF shuffle operation comprises four general phases:

Staging phase: A new rapidsmpf.shuffler.Shuffler object must be staged on every worker in the current Dask cluster.

Insertion phase: Each input partition is split into a dictionary of chunks, and that dictionary is passed to the appropriate rapidsmpf.shuffler.Shuffler object (using rapidsmpf.shuffler.Shuffler.insert_chunks).

The insertion phase will include a single task for each of the partition_count_in partitions in the input DataFrame. The partitioning and insertion logic must be defined by the insert_partition classmethod of the integration argument.

Insertion tasks are NOT restricted to specific Dask workers. These tasks may run anywhere in the cluster.

Barrier phase: All rapidsmpf.shuffler.Shuffler objects must be ‘informed’ that the insertion phase is complete (on all workers) before the subsequent extraction phase begins. We call this synchronization step the ‘barrier phase’.

The barrier phase comprises three types of barrier tasks:

1. First global barrier - A single barrier task is used to signal that all input partitions have been submitted to a rapidsmpf.shuffler.Shuffler object on one of the workers. This task may also run anywhere on the cluster, but it must depend on ALL insertion tasks.

2. Worker barrier(s) - Each worker must execute a single worker-barrier task. This task will call insert_finished for every output partition on the local rapidsmpf.shuffler.Shuffler. These tasks must be restricted to specific workers, and they must all depend on the first global barrier.

3. Second global barrier - A single barrier task is used to signal that all workers are ready to begin the extraction phase. This task may run anywhere on the cluster, but it must depend on all worker-barrier tasks.

Extraction phase: Each output partition is extracted from the local rapidsmpf.shuffler.Shuffler object on the worker (using rapidsmpf.shuffler.Shuffler.wait_on and rapidsmpf.integrations.cudf.partition.unpack_and_concat).

The extraction phase will include a single task for each of the partition_count_out partitions in the shuffled output DataFrame. The extraction logic must be defined by the extract_partition classmethod of the integration argument.

Extraction tasks must be restricted to specific Dask workers, and they must also depend on the second global-barrier task.
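The dependency structure of the four phases can be illustrated with a toy graph. This is not the graph produced by rapidsmpf_shuffle_graph; the key names are invented purely to show which tasks depend on which barriers:

```python
# Toy illustration of the shuffle graph's dependency structure.
# Key names are invented; the real graph is built by
# rapidsmpf_shuffle_graph and uses Dask task conventions.

def toy_shuffle_graph(n_in, n_out, n_workers):
    graph = {}
    inserts = [("insert", i) for i in range(n_in)]
    for key in inserts:
        graph[key] = []  # depends only on its input partition
    # First global barrier depends on ALL insertion tasks.
    graph[("global-barrier", 1)] = inserts
    # One worker barrier per worker, each depending on the first barrier.
    worker_barriers = [("worker-barrier", w) for w in range(n_workers)]
    for key in worker_barriers:
        graph[key] = [("global-barrier", 1)]
    # Second global barrier depends on all worker barriers.
    graph[("global-barrier", 2)] = worker_barriers
    # Extraction tasks depend on the second global barrier.
    for i in range(n_out):
        graph[("extract", i)] = [("global-barrier", 2)]
    return graph

g = toy_shuffle_graph(n_in=4, n_out=3, n_workers=2)
assert len(g[("global-barrier", 1)]) == 4  # all inserts feed barrier 1
assert len(g[("global-barrier", 2)]) == 2  # all worker barriers feed barrier 2
```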

Single-process#

Shuffler integration for single-worker pylibcudf execution.

rapidsmpf.integrations.single.get_worker_context() WorkerContext#

Retrieve the single-worker WorkerContext.

Returns:
The worker context.
Raises:
ValueError

If a worker context was never created.

See also

setup_worker

Must be called before this function.

rapidsmpf.integrations.single.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#

Return the task graph for a RapidsMPF shuffle.

This shuffle will use the single-process RapidsMPF Communicator.

Parameters:
input_name

The task name for input DataFrame tasks.

output_name

The task name for output DataFrame tasks.

partition_count_in

Partition count of input collection.

partition_count_out

Partition count of output collection.

integration

Shuffle-integration specification.

options

Optional keyword arguments.

*other_keys

Other keys needed by integration.insert_partition.

config_options

RapidsMPF configuration options.

Returns:
A valid task graph for single-worker execution.
rapidsmpf.integrations.single.setup_worker(options: ~rapidsmpf.config.Options = <rapidsmpf.config.Options object>) None#

Attach RapidsMPF shuffling attributes to a single worker.

Parameters:
options

Configuration options.
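A minimal sketch of the single-process setup order (setup_worker before get_worker_context), assuming a configured GPU environment:

```python
# Sketch: single-process RapidsMPF setup. setup_worker() must run
# before get_worker_context(), which otherwise raises ValueError.

def run_single_worker():
    from rapidsmpf.config import Options
    from rapidsmpf.integrations.single import get_worker_context, setup_worker

    setup_worker(options=Options())
    ctx = get_worker_context()  # raises ValueError if setup was skipped
    return ctx
```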

Ray#

Integration for Ray clusters.

class rapidsmpf.integrations.ray.RapidsMPFActor(nranks: int, statistics: Statistics | None = None)#

RapidsMPFActor is a base class that instantiates a UCXX communicator across all workers.

Parameters:
nranks

The number of workers in the cluster.

statistics

Optional statistics tracking object.

Attributes:
comm

The UCXX communicator object.

statistics

The statistics object used on this actor.

Methods

is_initialized()

Check if the communicator is initialized.

nranks()

Get the number of ranks in the UCXX communicator.

rank()

Get the rank of the worker, as inferred from the UCXX communicator.

setup_root()

Set up the root communicator in the cluster.

setup_worker(root_address_bytes)

Set up the worker in the cluster once the root is initialized.

to_string()

Return a string representation of the actor.

Examples

>>> @ray.remote(num_cpus=1)
... class DummyActor(RapidsMPFActor): ...
>>> actors = setup_ray_ucxx_cluster(DummyActor, 2)
>>> ray.get([actor.status_check.remote() for actor in actors])
property comm: Communicator#

The UCXX communicator object.

Returns:
The UCXX communicator object.
Raises:
RuntimeError

If the communicator is not initialized.

Notes

This property is not meant to be accessed remotely from the client; Ray would then attempt to serialize the Communicator object, which will fail. Instead, subclasses can use the comm property locally to access the communicator, for example when creating a shuffle operation.

is_initialized() bool#

Check if the communicator is initialized.

Returns:
True if the communicator is initialized, False otherwise.
nranks() int#

Get the number of ranks in the UCXX communicator.

Returns:
The number of ranks in the UCXX communicator
rank() int#

Get the rank of the worker, as inferred from the UCXX communicator.

Returns:
The rank of the worker
setup_root() tuple[int, bytes]#

Set up the root communicator in the cluster.

Returns:
rank

The rank of the root.

root_address_bytes

The address of the root.

setup_worker(root_address_bytes: bytes) None#

Set up the worker in the cluster once the root is initialized.

This method needs to be called by every worker, including the root.

Parameters:
root_address_bytes

The address of the root.

property statistics: Statistics#

The statistics object used on this actor.

Returns:
Statistics object.
to_string() str#

Return a string representation of the actor.

Returns:
A string representation of the actor
Raises:
RuntimeError

If the communicator is not initialized.

rapidsmpf.integrations.ray.setup_ray_ucxx_cluster(actor_cls: ActorClass, num_workers: int, *args: Any, **kwargs: Any) list[ActorHandle]#

A utility function to set up UCXX communication using RapidsMPFActor actor objects.

Parameters:
actor_cls

The actor class to be instantiated in the cluster.

num_workers

The number of workers in the cluster.

*args

Additional arguments to be passed to the actor class.

**kwargs

Additional keyword arguments to be passed to the actor class.

Returns:
gpu_actors

A list of actors in the cluster.

cuDF#

Collection of cuDF specific functions.

Partition#

Partitioning of cuDF tables.

rapidsmpf.integrations.cudf.partition.partition_and_pack(Table table, columns_to_hash, int num_partitions, Stream stream, BufferResource br)#

Partition rows from the input table into multiple packed (serialized) tables.

Parameters:
table

The input table to partition.

columns_to_hash

Indices of the input columns to use for hashing.

num_partitions

The number of partitions to create.

stream

The CUDA stream used for memory operations.

br

Buffer resource for memory allocations.

Returns:
A dictionary where the keys are partition IDs and the values are packed tables.
Raises:
IndexError

If any index in columns_to_hash is invalid.
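A hedged usage sketch, assuming a GPU plus an existing pylibcudf table, CUDA stream, and initialized BufferResource; the wrapper name `hash_partition` is made up:

```python
# Sketch: hash-partition a pylibcudf table into packed per-partition
# chunks ready for a Shuffler. Requires a GPU environment.

def hash_partition(table, stream, br, num_partitions=4):
    from rapidsmpf.integrations.cudf.partition import partition_and_pack

    # Hash rows on the first column; the result maps partition IDs to
    # packed (serialized) tables.
    return partition_and_pack(
        table,
        columns_to_hash=(0,),
        num_partitions=num_partitions,
        stream=stream,
        br=br,
    )
```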

rapidsmpf.integrations.cudf.partition.spill_partitions(partitions, BufferResource br)#

Spill partitions from device memory to host memory.

Moves the buffer of each PackedData from device memory to host memory using the provided buffer resource and the buffer’s CUDA stream. Partitions already in host memory are returned unchanged.

For device-resident partitions, a host memory reservation is made before moving the buffer. If the reservation fails due to insufficient host memory, an exception is raised. Overbooking is not allowed.

The input partitions are released and are left empty on return.

Parameters:
partitions

The partitions to spill.

br

Buffer resource used to reserve host memory and perform the move.

Returns:
A list of partitions whose buffers reside in host memory.
Raises:
ReservationError

If host memory reservation fails.

rapidsmpf.integrations.cudf.partition.split_and_pack(Table table, splits, Stream stream, BufferResource br)#

Split rows from the input table into multiple packed (serialized) tables.

Parameters:
table

The input table to split and pack. The table cannot be empty (the split points would not be valid).

splits

The split points, one less than the number of result partitions.

stream

The CUDA stream used for memory operations.

br

Buffer resource for memory allocations.

Returns:
A map of partition IDs and their packed tables.
Raises:
IndexError

If the splits are out of range for [0, len(table)].
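The split-point semantics can be mirrored in plain Python: N split points yield N + 1 partitions. This analogue operates on a Python list rather than a device table:

```python
# Pure-Python analogue of split_and_pack's split-point semantics:
# N split points produce N + 1 partitions.

def split_rows(rows, splits):
    bounds = [0, *splits, len(rows)]
    return {i: rows[lo:hi] for i, (lo, hi) in enumerate(zip(bounds, bounds[1:]))}

parts = split_rows(list(range(8)), splits=[2, 5])
print(parts)
# {0: [0, 1], 1: [2, 3, 4], 2: [5, 6, 7]}
```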

rapidsmpf.integrations.cudf.partition.unpack_and_concat(partitions, Stream stream, BufferResource br)#

Unpack (deserialize) input partitions and concatenate them into a single table.

Empty partitions are ignored.

The unpacking of each partition is stream-ordered on that partition’s own CUDA stream. The returned table is stream-ordered on the provided stream and synchronized with the unpacking.

Parameters:
partitions

Packed input tables (partitions).

stream

CUDA stream on which concatenation occurs and on which the resulting table is ordered.

br

Buffer resource used for memory allocations.

Returns:
The concatenated table resulting from unpacking the input partitions.
Raises:
ReservationError

If the buffer resource cannot reserve enough memory to concatenate all partitions.

Notes

The input partitions are released and left empty on return.

rapidsmpf.integrations.cudf.partition.unspill_partitions(partitions, BufferResource br, bool allow_overbooking)#

Move spilled partitions (i.e., packed tables in host memory) back to device memory.

Each partition is inspected to determine whether its buffer resides in device memory. Buffers already in device memory are left untouched. Host-resident buffers are moved to device memory using the provided buffer resource and the buffer’s CUDA stream.

If insufficient device memory is available, the buffer resource’s spill manager is invoked to free memory. If overbooking occurs and spilling fails to reclaim enough memory, behavior depends on allow_overbooking.

The input partitions are released and are left empty on return.

Parameters:
partitions

The partitions to unspill, potentially containing host-resident data.

br

Buffer resource responsible for memory reservation and spills.

allow_overbooking

If False, ensures enough memory is freed to satisfy the reservation; otherwise, allows overbooking even if spilling was insufficient.

Returns:
A list of partitions whose buffers reside in device memory.
Raises:
ReservationError

If overbooking exceeds the amount spilled and allow_overbooking is False.

Shuffler#

The Shuffler interface for RapidsMPF.

class rapidsmpf.shuffler.PartitionAssignment(*values)#
ROUND_ROBIN#
CONTIGUOUS#
class rapidsmpf.shuffler.Shuffler(Communicator comm, int32_t op_id, uint32_t total_num_partitions, BufferResource br, PartitionAssignment partition_assignment=PartitionAssignment.ROUND_ROBIN)#

Shuffle service for partitioned data.

The rapidsmpf.shuffler.Shuffler class provides an interface for performing a shuffle operation on partitioned data. It uses a distribution scheme to distribute and collect data chunks across different ranks.

Parameters:
comm

The communicator to use for data exchange between ranks.

op_id

The operation ID of the shuffle. Must have a value between 0 and max_concurrent_shuffles-1.

total_num_partitions

Total number of partitions in the shuffle.

br

The buffer resource used to allocate temporary storage and shuffle results.

partition_assignment

How to assign partition IDs to ranks: ROUND_ROBIN (default) balances load (e.g. for a hash shuffle), while CONTIGUOUS gives each rank a contiguous range of partition IDs (e.g. for a sort, so that concatenation order matches global order). A custom callable may be supported in the future.

Attributes:
max_concurrent_shuffles

Maximum number of concurrent shufflers.

Methods

extract(self, uint32_t pid)

Extract all chunks of the specified partition.

finished(self)

Check if all partitions are finished.

insert_chunks(self, chunks)

Insert a batch of packed (serialized) chunks into the shuffle.

insert_finished(self, pids)

Mark partitions as finished.

local_partitions(self)

Return the partition IDs owned by this rank.

shutdown(self)

Shutdown the shuffle, blocking until all inflight communication is completed.

wait_any(self)

Wait for any partition to finish.

wait_on(self, uint32_t pid)

Wait for a specific partition to finish.

Notes

This class is designed to handle distributed operations by partitioning data and redistributing it across ranks in a cluster. It is typically used in distributed data processing workflows involving cuDF tables.

The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.
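The two assignment schemes can be illustrated in plain Python. The round-robin rule follows directly from the name; the contiguous formula below is one plausible realization and may differ from the library's internal remainder handling:

```python
# Illustration of the two partition-assignment schemes. The contiguous
# formula is an assumption for illustration, not the library's exact
# internal implementation.
import math

def round_robin_owner(pid, nranks):
    return pid % nranks

def contiguous_owner(pid, nranks, total_num_partitions):
    per_rank = math.ceil(total_num_partitions / nranks)
    return pid // per_rank

total, nranks = 8, 3
print([round_robin_owner(p, nranks) for p in range(total)])
# [0, 1, 2, 0, 1, 2, 0, 1]
print([contiguous_owner(p, nranks, total) for p in range(total)])
# [0, 0, 0, 1, 1, 1, 2, 2]
```

With CONTIGUOUS, concatenating each rank's extracted partitions in rank order preserves the global partition order, which is why it suits sorting.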

comm#

Get the communicator used by the shuffler.

Returns:
The communicator.
extract(self, uint32_t pid)#

Extract all chunks of the specified partition.

Parameters:
pid

The partition ID to extract chunks for.

Returns:
A list of packed data belonging to the specified partition.
finished(self)#

Check if all partitions are finished.

This method verifies if all partitions have been completed, meaning all chunks have been inserted and no further data is expected from either the local node or any remote nodes.

Returns:
True if all partitions are finished, otherwise False.
insert_chunks(self, chunks)#

Insert a batch of packed (serialized) chunks into the shuffle.

Parameters:
chunks

A map where keys are partition IDs (int) and values are packed data (PackedData).

Notes

This method adds the given chunks to the shuffle, associating them with their respective partition IDs.

insert_finished(self, pids)#

Mark partitions as finished.

This informs the shuffler that no more chunks for the specified partitions will be inserted.

Parameters:
pids

Partition IDs to mark as finished (int or an iterable of ints).

Notes

Once a partition is marked as finished, it is considered complete and no further chunks will be accepted for that partition.

local_partitions(self)#

Return the partition IDs owned by this rank.

Returns:
Partition IDs owned by this shuffler.
shutdown(self)#

Shutdown the shuffle, blocking until all inflight communication is completed.

Raises:
RuntimeError

If the shuffler is already inactive.

Notes

This method ensures that all pending shuffle operations and communications are completed before shutting down. It blocks until no inflight operations remain.

wait_any(self)#

Wait for any partition to finish.

This method blocks until at least one partition is marked as finished. It is useful for processing partitions as they are completed.

Returns:
The partition ID of the next finished partition.
wait_on(self, uint32_t pid)#

Wait for a specific partition to finish.

This method blocks until the desired partition is ready for processing.

Parameters:
pid

The desired partition ID.
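A typical single-rank driver loop over these methods might look like the following sketch; `chunks_per_partition` and the shutdown placement are illustrative:

```python
# Sketch of a typical shuffle driver loop on one rank. Assumes an
# active rapidsmpf.shuffler.Shuffler and a mapping of partition IDs to
# PackedData chunks; error handling is omitted.

def drive_shuffle(shuffler, chunks_per_partition, total_num_partitions):
    shuffler.insert_chunks(chunks_per_partition)
    # Tell the shuffler no more local data is coming for any partition.
    shuffler.insert_finished(list(range(total_num_partitions)))

    results = {}
    while not shuffler.finished():
        pid = shuffler.wait_any()             # block until a partition is ready
        results[pid] = shuffler.extract(pid)  # list of PackedData for pid
    shuffler.shutdown()
    return results
```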

Communicator#

Submodule for communication abstraction (e.g. UCXX and MPI).

rapidsmpf.communicator.COMMUNICATORS = ('single', 'ucxx', 'mpi')#

Tuple of available communicators.

RapidsMPF includes a collection of communicator backends, available as submodules under rapidsmpf.communicator.*. Typically, the Conda distribution includes both UCXX and MPI support, while the pip installation generally supports only UCXX.

class rapidsmpf.communicator.communicator.Communicator#

Abstract base class for a communication mechanism between nodes.

Provides an interface for sending and receiving messages between nodes, supporting asynchronous operations, GPU data transfers, and custom logging. Concrete implementations must define the virtual methods to enable specific communication backends.

Attributes:
logger

Get the logger.

nranks

Get the total number of ranks.

progress_thread

Get the communicator’s progress thread.

rank

Get the rank of this communication node.

Methods

get_str(self)

Get a string representation of the communicator.

Notes

This class is designed as an abstract base class, meaning it cannot be instantiated directly. Subclasses are required to implement the necessary methods to support the desired communication backend and functionality.

get_str(self)#

Get a string representation of the communicator.

Returns:
A string describing the communicator
logger#

Get the logger.

Returns:
A logger instance.
nranks#

Get the total number of ranks.

Returns:
Total number of ranks.
progress_thread#

Get the communicator’s progress thread.

Returns:
The progress thread.
rank#

Get the rank of this communication node.

Returns:
The rank.
class rapidsmpf.communicator.communicator.LOG_LEVEL(*values)#
NONE#
PRINT#
WARN#
INFO#
DEBUG#
TRACE#
class rapidsmpf.communicator.communicator.Logger#

Logger.

To control the verbosity level, set the environment variable RAPIDSMPF_LOG:
  • NONE: No logging.

  • PRINT: General print messages.

  • WARN: Warning messages (default).

  • INFO: Informational messages.

  • DEBUG: Debug messages.

  • TRACE: Trace messages.

Attributes:
verbosity_level

Get the verbosity level of the logger.

Methods

debug(self, str msg)

Logs a debug message.

info(self, str msg)

Logs an informational message.

print(self, str msg)

Logs a print message.

trace(self, str msg)

Logs a trace message.

warn(self, str msg)

Logs a warning message.

debug(self, str msg)#

Logs a debug message.

Parameters:
msg

The message to log.

info(self, str msg)#

Logs an informational message.

Parameters:
msg

The message to log.

print(self, str msg)#

Logs a print message.

Parameters:
msg

The message to log.

trace(self, str msg)#

Logs a trace message.

Parameters:
msg

The message to log.

verbosity_level#

Get the verbosity level of the logger.

Returns:
The verbosity level.
warn(self, str msg)#

Logs a warning message.

Parameters:
msg

The message to log.

MPI Communicator#

rapidsmpf.communicator.mpi.new_communicator(Intracomm comm, Options options, ProgressThread progress_thread)#

Create a new RapidsMPF-MPI communicator based on an existing mpi4py communicator.

Parameters:
comm

The existing mpi communicator from mpi4py.

options

Configuration options.

progress_thread

Progress thread for the communicator.

Returns:
A new RapidsMPF-MPI communicator.
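A minimal sketch, assuming an MPI launch (e.g. `mpirun -n 2 python app.py`) via mpi4py; constructing the ProgressThread is out of scope here and left as a parameter:

```python
# Sketch: create a RapidsMPF-MPI communicator from mpi4py's COMM_WORLD.
# `progress_thread` must be a rapidsmpf ProgressThread; its construction
# is not shown here.

def make_mpi_comm(progress_thread):
    from mpi4py import MPI

    from rapidsmpf.communicator.mpi import new_communicator
    from rapidsmpf.config import Options

    return new_communicator(MPI.COMM_WORLD, Options(), progress_thread)
```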

UCXX Communicator#

ucxx-based implementation of a RapidsMPF Communicator.

rapidsmpf.communicator.ucxx.barrier(Communicator comm)#

Execute a barrier on the UCXX communicator.

Ensures all ranks are connected to the root and have reached the barrier before continuing.

Notes

Executing this barrier is required after the ranks are bootstrapped to ensure everyone is connected to the root. An alternative barrier, such as MPI_Barrier, will not suffice for that purpose.

rapidsmpf.communicator.ucxx.get_root_ucxx_address(Communicator comm)#

Get the address of the communicator’s UCXX worker.

This function is intended to be called from the root rank to communicate to other processes how to reach the root, but it will return the address of the UCXX worker of other ranks too.

Parameters:
comm

The RapidsMPF-UCXX communicator.

Returns:
A bytes sequence with the UCXX worker address.
Raises:
NotImplementedError

If the communicator was created with a HostPortPair, which is not yet supported.

rapidsmpf.communicator.ucxx.new_communicator(Rank nranks, UCXWorker ucx_worker, UCXAddress root_ucxx_address, Options options, ProgressThread progress_thread)#

Create a new UCXX communicator with the given number of ranks.

An existing UCXWorker may be specified, otherwise one will be created. The root rank is created if no root_ucxx_address is specified; all other ranks must specify the address of the root rank via that argument.

Parameters:
nranks

The number of ranks in the cluster.

ucx_worker

An existing UCXX worker to use if specified, otherwise one will be created.

root_ucxx_address

The UCXX address of the root rank (only specified for non-root ranks).

options

Configuration options.

progress_thread

Progress thread for the communicator.

Returns:
A new RapidsMPF-UCXX communicator.
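A hedged sketch of the bootstrap sequence. How the root address is shared between processes (the `publish`/`fetch` callables) is left abstract, and passing None for ucx_worker and root_ucxx_address is assumed from the parameter descriptions above:

```python
# Sketch of UCXX bootstrapping: the root creates a communicator without
# a root address and publishes its worker address; all other ranks
# connect to it. Finish with the UCXX barrier (an MPI barrier would not
# suffice here).

def bootstrap_ucxx(nranks, is_root, progress_thread, publish, fetch):
    from rapidsmpf.communicator.ucxx import (
        barrier,
        get_root_ucxx_address,
        new_communicator,
    )
    from rapidsmpf.config import Options

    options = Options()
    if is_root:
        comm = new_communicator(nranks, None, None, options, progress_thread)
        publish(get_root_ucxx_address(comm))  # e.g. via a file or key-value store
    else:
        comm = new_communicator(nranks, None, fetch(), options, progress_thread)
    barrier(comm)  # required after bootstrapping
    return comm
```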

Buffer#

Submodule for memory abstraction.

class rapidsmpf.memory.buffer.MemoryType(*values)#
DEVICE#
PINNED_HOST#
HOST#
class rapidsmpf.memory.memory_reservation.MemoryReservation#

Represents a reservation for future memory allocation.

A reservation is created by BufferResource.reserve and must be used when allocating buffers through the same BufferResource.

Attributes:
br

Get the buffer resource associated with this reservation.

mem_type

Get the type of memory associated with this reservation.

size

Get the remaining size of the reserved memory.

Methods

clear(self)

Clear the remaining size of the reservation.

br#

Get the buffer resource associated with this reservation.

Returns:
The buffer resource associated with this reservation.
clear(self)#

Clear the remaining size of the reservation.

Resets the reservation so that any remaining, unconsumed bytes are released back to the underlying memory resource. After this call, the reservation has a remaining size of zero and cannot be used to satisfy further allocations.

mem_type#

Get the type of memory associated with this reservation.

Returns:
The memory type associated with this reservation.
size#

Get the remaining size of the reserved memory.

Returns:
The size of the reserved memory in bytes.
rapidsmpf.memory.memory_reservation.opaque_memory_usage(MemoryReservation reservation)#

Associate untracked memory usage with an existing reservation.

This context manager is intended for code paths that use memory outside of RapidsMPF’s memory reservation system, for example internal allocations in libcudf or other third-party libraries. The memory may be of any type covered by a MemoryReservation, most commonly device memory.

While the context is active, the provided memory reservation is considered consumed by the enclosed code block. On exit, the reservation is cleared, releasing any remaining, unconsumed bytes back to the underlying memory resource.

Parameters:
reservation

Memory reservation that accounts for the untracked memory usage.

Yields:
The same reservation, which may be passed to APIs that require an explicit
reservation object.

Examples

Account for allocations outside RapidsMPF:

>>> with opaque_memory_usage(reservation):
...     # Library call that allocates memory unknown to RapidsMPF.
...     result = library_op(...)

class rapidsmpf.memory.buffer_resource.AvailableMemoryMap#

Map of functions reporting available memory for different memory types.

This class acts as an opaque handle to C++ memory-availability functions that cannot be directly represented or exposed in Python. It enables RapidsMPF to configure and use such functions from Python while keeping the implementation in C++.

Instances of this class should be constructed from configuration options using the from_options() factory method.

Methods

from_options(cls, RmmResourceAdaptor mr, ...)

Construct an AvailableMemoryMap from configuration options.

classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#

Construct an AvailableMemoryMap from configuration options.

Parameters:
mr

Pointer to a memory resource adaptor.

options

Configuration options.

Returns:
The constructed map of memory-available functions.
class rapidsmpf.memory.buffer_resource.BufferResource#

Class managing buffer resources.

This class handles memory allocation and transfers between different memory types (e.g., host and device). All memory operations in RapidsMPF, such as those performed by the Shuffler, rely on a buffer resource for memory management.

Parameters:
device_mr

Reference to the RMM device memory resource used for device allocations.

pinned_mr

The pinned host memory resource used for PINNED_HOST allocations. If None, pinned host allocations are disabled. In that case, any attempt to allocate pinned memory will fail regardless of what memory_available reports.

memory_available

Optional memory availability functions. Memory types without availability functions are unlimited. A function must return the current available memory of a specific type. It must be thread-safe if used by multiple BufferResource instances concurrently. Warning: calling any BufferResource instance methods within the function might result in a deadlock. This is because the buffer resource is locked when the function is called.

periodic_spill_check

Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the memory availability functions. The value of periodic_spill_check is used as the pause between checks (in seconds). If None, no periodic spill check is performed.

stream_pool

Optional CUDA stream pool to use. If None, a new pool with 16 streams will be created. Must be an instance of rmm.pylibrmm.cuda_stream_pool.CudaStreamPool.

statistics

The statistics instance to use. If None, a disabled statistics instance will be created.

Attributes:
device_mr

The memory resource used for device memory allocations.

pinned_mr

The memory resource used for pinned host memory allocations.

spill_manager
statistics

Gets the statistics instance associated with this buffer resource.

Methods

from_options(cls, RmmResourceAdaptor mr, ...)

Construct a BufferResource from configuration options.

memory_available(self, MemoryType mem_type)

Get the current available memory of the specified memory type.

memory_reserved(self, MemoryType mem_type)

Get the current reserved memory of the specified memory type.

release(self, MemoryReservation reservation, ...)

Consume a portion of the reserved memory.

reserve(self, MemoryType mem_type, ...)

Reserve an amount of the specified memory type.

reserve_device_memory_and_spill(self, ...)

Reserve device memory and spill if necessary.

stream_pool_size(self)

Get the size of the stream pool.

device_mr#

The memory resource used for device memory allocations.

Returns:
The device memory resource.
classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#

Construct a BufferResource from configuration options.

This factory method creates a BufferResource using configuration options to initialize all components.

Parameters:
mr

RMM resource adaptor. The adaptor must outlive the returned BufferResource.

options

Configuration options.

Returns:
A BufferResource instance configured according to the options.
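A minimal sketch, assuming a CUDA-capable environment; the RmmResourceAdaptor import path is an assumption:

```python
# Sketch: build a BufferResource from configuration options. The
# adaptor must outlive the returned BufferResource, so both are
# returned together.

def make_buffer_resource():
    import rmm

    from rapidsmpf.config import Options
    from rapidsmpf.memory.buffer_resource import BufferResource
    from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor

    mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
    return mr, BufferResource.from_options(mr, Options())
```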
memory_available(self, MemoryType mem_type)#

Get the current available memory of the specified memory type.

memory_reserved(self, MemoryType mem_type)#

Get the current reserved memory of the specified memory type.

Parameters:
mem_type

The target memory type.

Returns:
The memory reserved, in bytes.
pinned_mr#

The memory resource used for pinned host memory allocations.

Returns:
The pinned host memory resource, or None if pinned host allocations
are disabled.
release(self, MemoryReservation reservation, size_t size)#

Consume a portion of the reserved memory.

Reduces the remaining size of the reserved memory by the specified amount.

Parameters:
reservation

The memory reservation to consume from.

size

The number of bytes to consume.

Returns:
The remaining size of the reserved memory after consumption.
Raises:
ReservationError

If the released size exceeds the total reserved size.

reserve(self, MemoryType mem_type, size_t size, *, bool allow_overbooking)#

Reserve an amount of the specified memory type.

Creates a new reservation of the specified size and memory type to inform the system about upcoming buffer allocations.

If overbooking is allowed, a reservation of the requested size is returned even if the memory is not currently available. In that case, the caller must guarantee that at least the overbooked amount of memory will be freed before the reservation is used.

If overbooking is not allowed, a reservation of size zero is returned on failure.

Parameters:
mem_type

The target memory type.

size

The number of bytes to reserve.

allow_overbooking

Whether overbooking is permitted.

Returns:
A tuple (reservation, overbooked_bytes):
  • On success, the reservation’s size equals size.

  • On failure, the reservation’s size equals zero (a zero-sized reservation never fails).
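The reserve/release bookkeeping described above can be sketched in pure Python. This is an illustrative model only, not the rapidsmpf implementation; the MiniBufferResource and Reservation names are hypothetical, and the available-memory accounting is simplified (the real BufferResource queries live memory state):

```python
class ReservationError(Exception):
    pass


class Reservation:
    def __init__(self, size):
        self.size = size  # remaining reserved bytes


class MiniBufferResource:
    """Simplified model: fixed available memory, a running reserved total."""

    def __init__(self, available):
        self._available = available
        self._reserved = 0

    def reserve(self, size, *, allow_overbooking):
        headroom = self._available - self._reserved
        overbooked = max(0, size - headroom)
        if overbooked and not allow_overbooking:
            # On failure, return a zero-sized reservation (never fails).
            return Reservation(0), overbooked
        self._reserved += size
        return Reservation(size), overbooked

    def release(self, reservation, size):
        """Consume `size` bytes of the reservation; return the remainder."""
        if size > reservation.size:
            raise ReservationError("released size exceeds reserved size")
        reservation.size -= size
        self._reserved -= size
        return reservation.size


br = MiniBufferResource(available=1000)
res, overbooked = br.reserve(600, allow_overbooking=False)
assert (res.size, overbooked) == (600, 0)
remaining = br.release(res, 200)  # consume 200 of the 600 reserved
assert remaining == 400
# Only 600 bytes of headroom remain, so reserving 800 overbooks by 200.
res2, overbooked2 = br.reserve(800, allow_overbooking=True)
assert overbooked2 == 200
```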

reserve_device_memory_and_spill(self, size_t size, *, bool allow_overbooking)#

Reserve device memory and spill if necessary.

Attempts to reserve the requested amount of device memory. If insufficient memory is available, spilling is triggered to free space. When overbooking is allowed, the reservation may succeed even if spilling was not sufficient to fully satisfy the request.

Parameters:
size

The amount of memory to reserve.

allow_overbooking

Whether to allow overbooking. If false, ensures enough memory is freed to satisfy the reservation. If true, the reservation may succeed even if spilling was insufficient.

Returns:
The resulting memory reservation.
Raises:
ReservationError

If overbooking is disabled and the buffer resource cannot free enough device memory through spilling to satisfy the request.

statistics#

Gets the statistics instance associated with this buffer resource.

Returns:
The Statistics instance.
stream_pool_size(self) int#

Get the size of the stream pool.

Returns:
int

The size of the stream pool.

class rapidsmpf.memory.buffer_resource.LimitAvailableMemory(RmmResourceAdaptor mr, int64_t limit)#

A callback class for querying the remaining available memory within a defined limit from an RMM resource adaptor.

This class is primarily designed to simulate constrained memory environments or prevent memory allocation beyond a specific threshold. It provides information about the available memory by subtracting the memory currently used (as reported by the RMM resource adaptor) from a user-defined limit.

It is typically used in the context of memory management operations such as with BufferResource.

Parameters:
mr

A statistics resource adaptor that tracks memory usage and provides statistics about the memory consumption. The LimitAvailableMemory instance keeps a reference to mr to keep it alive.

limit

The maximum memory limit (in bytes). Used to calculate the remaining available memory.

Methods

__call__(*args, **kwargs)

Call self as a function.

Notes

The mr resource must not be destroyed while this object is still in use.

Examples

>>> mr = RmmResourceAdaptor(...)
>>> memory_limiter = LimitAvailableMemory(mr, limit=1_000_000)
rapidsmpf.memory.buffer_resource.periodic_spill_check_from_options(Options options)#

Get the periodic_spill_check parameter from configuration options.

Parameters:
options

Configuration options.

Returns:
The duration of the pause between spill checks in seconds, or None if
periodic spill checks are disabled.
rapidsmpf.memory.buffer_resource.stream_pool_from_options(Options options)#

Create a new CUDA stream pool from configuration options.

Parameters:
options

Configuration options.

Returns:
Pool of CUDA streams used throughout RapidsMPF for operations that do not take
an explicit CUDA stream.
class rapidsmpf.memory.packed_data.PackedData#

Methods

from_cudf_packed_columns(cls, ...)

Constructs a PackedData from CudfPackedColumns by taking the ownership of the data and releasing packed_columns.

from_host_bytes(cls, const uint8_t[::1] data, ...)

Construct a PackedData from raw host bytes.

to_host_bytes(self)

Extract the host bytes from this PackedData.

classmethod from_cudf_packed_columns(cls, PackedColumns packed_columns, Stream stream, BufferResource br)#

Constructs a PackedData from CudfPackedColumns by taking the ownership of the data and releasing packed_columns.

Parameters:
packed_columns

Packed data containing metadata and GPU data buffers.

Returns:
A new PackedData instance containing the packed columns data
Raises:
ValueError

If the PackedColumns object is empty (has been released already).

classmethod from_host_bytes(cls, const uint8_t[::1] data, BufferResource br)#

Construct a PackedData from raw host bytes.

The bytes are stored in the data buffer (as host memory) with minimal metadata. This is useful for scalar allreduce operations.

Note: This makes a copy of the input data.

Parameters:
data

Contiguous buffer of bytes (bytes, bytearray, or buffer-protocol object).

br

Buffer resource for memory allocation.

Returns:
A new PackedData instance containing the bytes.
to_host_bytes(self) bytes#

Extract the host bytes from this PackedData.

Returns a copy of the bytes stored in the data buffer. Works with both host and device memory buffers. The method synchronizes with the underlying buffer’s CUDA stream before returning.

Returns:
The raw bytes.
Raises:
ValueError

If the PackedData is empty.

class rapidsmpf.memory.scoped_memory_record.AllocType(*values)#
PRIMARY#
FALLBACK#
ALL#
class rapidsmpf.memory.scoped_memory_record.ScopedMemoryRecord#

Scoped memory record for tracking memory usage statistics.

Methods

current(self, AllocType alloc_type=AllocType.ALL)

Current memory usage in bytes.

num_current_allocs(self, ...)

Number of currently active (non-deallocated) allocations.

num_total_allocs(self, ...)

Total number of allocations performed.

peak(self, AllocType alloc_type=AllocType.ALL)

Peak memory usage in bytes.

record_allocation(self, ...)

Record a memory allocation event.

record_deallocation(self, ...)

Record a memory deallocation event.

total(self, AllocType alloc_type=AllocType.ALL)

Total number of bytes allocated over the lifetime.

current(self, AllocType alloc_type=AllocType.ALL)#

Current memory usage in bytes.

Parameters:
alloc_type

Allocator type to query. Defaults to ALL.

Returns:
Current memory usage in bytes.
num_current_allocs(self, AllocType alloc_type=AllocType.ALL)#

Number of currently active (non-deallocated) allocations.

Parameters:
alloc_type

Allocator type to query. Defaults to ALL.

Returns:
Number of active allocations.
num_total_allocs(self, AllocType alloc_type=AllocType.ALL)#

Total number of allocations performed.

Parameters:
alloc_type

Allocator type to query. Defaults to ALL.

Returns:
Number of total allocations.
peak(self, AllocType alloc_type=AllocType.ALL)#

Peak memory usage in bytes.

Parameters:
alloc_type

Allocator type to query. Defaults to ALL.

Returns:
Peak memory usage in bytes.
record_allocation(self, AllocType alloc_type, uint64_t nbytes)#

Record a memory allocation event.

Updates internal statistics for memory allocation and adjusts peak usage if the current memory usage exceeds the previous peak.

Parameters:
alloc_type

The allocator that performed the allocation.

nbytes

The number of bytes allocated.

record_deallocation(self, AllocType alloc_type, uint64_t nbytes)#

Record a memory deallocation event.

Updates internal statistics to reflect memory being freed.

Parameters:
alloc_type

The allocator that performed the deallocation.

nbytes

The number of bytes deallocated.

total(self, AllocType alloc_type=AllocType.ALL)#

Total number of bytes allocated over the lifetime.

Parameters:
alloc_type

Allocator type to query. Defaults to ALL.

Returns:
Total allocated bytes.
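The per-allocator counters above (current, peak, and total bytes, queryable per allocator or summed via ALL) can be modeled in pure Python. This is an illustrative sketch, not the rapidsmpf implementation; allocation counts are omitted for brevity, and the ALL peak is approximated as the sum of per-allocator peaks:

```python
from enum import Enum


class AllocType(Enum):
    PRIMARY = 0
    FALLBACK = 1
    ALL = 2


class MiniScopedMemoryRecord:
    """Tracks current/peak/total bytes per allocator."""

    def __init__(self):
        kinds = (AllocType.PRIMARY, AllocType.FALLBACK)
        self._current = {k: 0 for k in kinds}
        self._peak = {k: 0 for k in kinds}
        self._total = {k: 0 for k in kinds}

    def _query(self, table, alloc_type):
        if alloc_type is AllocType.ALL:
            return sum(table.values())
        return table[alloc_type]

    def record_allocation(self, alloc_type, nbytes):
        self._current[alloc_type] += nbytes
        self._total[alloc_type] += nbytes
        # Adjust the peak if current usage exceeds the previous peak.
        self._peak[alloc_type] = max(self._peak[alloc_type], self._current[alloc_type])

    def record_deallocation(self, alloc_type, nbytes):
        self._current[alloc_type] -= nbytes

    def current(self, alloc_type=AllocType.ALL):
        return self._query(self._current, alloc_type)

    def peak(self, alloc_type=AllocType.ALL):
        return self._query(self._peak, alloc_type)

    def total(self, alloc_type=AllocType.ALL):
        return self._query(self._total, alloc_type)


rec = MiniScopedMemoryRecord()
rec.record_allocation(AllocType.PRIMARY, 1024)
rec.record_allocation(AllocType.FALLBACK, 512)
rec.record_deallocation(AllocType.PRIMARY, 1024)
assert rec.current() == 512       # only the fallback allocation remains
assert rec.total() == 1536        # lifetime bytes are never decremented
assert rec.peak(AllocType.PRIMARY) == 1024
```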

Config Options#

class rapidsmpf.config.Optional(value)#

Represents an option value that can be explicitly disabled.

This class wraps an option value and interprets certain strings as indicators that the value is disabled (case-insensitive): {“false”, “no”, “off”, “disable”, “disabled”}.

This is typically used to simplify handling of optional option values with Options.get_or_default().

Parameters:
value

The input value to interpret.

Attributes:
value

The raw input value, unless it matched a disable keyword, in which case the value is None.

Examples

>>> from rapidsmpf.config import Optional, Options
>>> Optional("OFF").value
None
>>> Optional("no").value
None
>>> Optional("100").value
'100'
>>> Optional("").value
''
>>> opts = Options()
>>> opts.get_or_default(
...     "dask_periodic_spill_check",
...     default_value=Optional(1e-3)
... ).value
0.001
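The disable-keyword interpretation described above amounts to a small case-insensitive lookup. The following is an illustrative sketch of that logic, not the actual rapidsmpf.config.Optional implementation:

```python
# Keywords that mark an option value as disabled (case-insensitive).
_DISABLE_KEYWORDS = {"false", "no", "off", "disable", "disabled"}


def interpret_optional(value):
    """Return None if `value` is a disable keyword, else the value unchanged."""
    if isinstance(value, str) and value.lower() in _DISABLE_KEYWORDS:
        return None
    return value


assert interpret_optional("OFF") is None
assert interpret_optional("no") is None
assert interpret_optional("100") == "100"
assert interpret_optional("") == ""  # empty string is not a disable keyword
```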
class rapidsmpf.config.OptionalBytes(value)#

Represents a byte-sized option that can be explicitly disabled.

This class is a specialization of Optional that interprets the input as a human-readable byte size string (e.g., “100 MB”, “1KiB”, “1e6”). If the input is one of the disable keywords (e.g., “off”, “no”, “false”), the value is treated as disabled (None). Otherwise, it is parsed to an integer number of bytes using rapidsmpf.utils.string.parse_bytes().

This is useful for configuration options that may be set to a size limit or explicitly turned off.

Parameters:
value

A human-readable byte size (e.g., “1MiB”, “100 MB”) or a disable keyword (case-insensitive), or an integer number of bytes.

Attributes:
value

The size in bytes, or None if disabled.

Examples

>>> from rapidsmpf.config import OptionalBytes
>>> OptionalBytes("1KiB").value
1024
>>> OptionalBytes("OFF").value is None
True
>>> OptionalBytes(2048).value
2048
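A byte-size parser of the kind described above can be sketched in pure Python. This is illustrative only; it is not rapidsmpf.utils.string.parse_bytes(), and the exact set of units and accepted spellings in the real parser may differ:

```python
import re

# Decimal (kB, MB, ...) and binary (KiB, MiB, ...) unit multipliers.
_UNITS = {
    "": 1, "b": 1,
    "kb": 10**3, "mb": 10**6, "gb": 10**9, "tb": 10**12,
    "kib": 2**10, "mib": 2**20, "gib": 2**30, "tib": 2**40,
}
_DISABLE_KEYWORDS = {"false", "no", "off", "disable", "disabled"}


def parse_optional_bytes(value):
    """Return a size in bytes, or None if `value` is a disable keyword."""
    if isinstance(value, int):
        return value
    s = value.strip().lower()
    if s in _DISABLE_KEYWORDS:
        return None
    m = re.fullmatch(r"([0-9.e+-]+)\s*([a-z]*)", s)
    if m is None or m.group(2) not in _UNITS:
        raise ValueError(f"cannot parse byte size: {value!r}")
    return int(float(m.group(1)) * _UNITS[m.group(2)])


assert parse_optional_bytes("1KiB") == 1024
assert parse_optional_bytes("OFF") is None
assert parse_optional_bytes(2048) == 2048
assert parse_optional_bytes("1e6") == 1_000_000
assert parse_optional_bytes("100 MB") == 100_000_000
```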
class rapidsmpf.config.Options#

Initialize an Options object with a dictionary of string options.

Parameters:
options_as_strings

A dictionary representing option names and their corresponding values.

Methods

deserialize(bytes serialized_buffer)

Deserialize a binary buffer into an Options object.

get(self, str key, *, return_type, factory)

Retrieves a configuration option by key.

get_or_default(self, str key, *, default_value)

Retrieve a configuration option by key, using a default value if not present.

get_strings(self)

Get all option key-value pairs as strings.

insert_if_absent(self, dict options_as_strings)

Insert multiple options if they are not already present.

serialize(self)

Serialize the Options object into a binary buffer.

static deserialize(bytes serialized_buffer)#

Deserialize a binary buffer into an Options object.

This method reconstructs an Options instance from a byte buffer produced by the Options.serialize() method.

See Options.serialize() for the binary format.

Parameters:
serialized_buffer

A buffer containing serialized options in the defined binary format.

Returns:
Options

A reconstructed Options instance containing the deserialized key-value pairs.

Raises:
ValueError

If the input buffer is malformed or inconsistent with the expected format.

get(self, str key, *, return_type, factory)#

Retrieves a configuration option by key.

If the option is not present, it is constructed using the provided factory function, which receives the string representation of the option (or an empty string if unset). The option is cached after the first access.

The option is cast to the specified return_type. To be accessible from C++, it must be one of: bool, int, float, str. Otherwise, it is stored as a PyObject*.

Once a key has been accessed with a particular return_type, subsequent calls to get with the same key must use the same return_type. Using a different type for the same key will result in a TypeError.

Parameters:
key

The option key. Should be in lowercase.

return_type

The return type. To be accessible from C++, it must be one of: bool, int, float, str. Use object to indicate any Python type.

factory

A factory function that constructs an instance of the desired type from a string representation.

Returns:
The value of the requested option, cast to the specified return_type.
Raises:
ValueError

If the return_type is unsupported, or if the stored option type does not match the expected type.

TypeError

If the option has already been accessed with a different return_type.

Warning

The factory must not access the Options instance, as this may lead to a deadlock due to internal locking.
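The caching and type-consistency behavior described above can be sketched in pure Python. MiniOptions is a hypothetical illustration, not the rapidsmpf implementation (which also handles C++ interop and locking):

```python
class MiniOptions:
    """Sketch of Options.get(): factory on first access, cache thereafter."""

    def __init__(self, options_as_strings):
        self._strings = {k.strip().lower(): v for k, v in options_as_strings.items()}
        self._cache = {}  # key -> (return_type, parsed value)

    def get(self, key, *, return_type, factory):
        if key in self._cache:
            cached_type, value = self._cache[key]
            # A key must always be accessed with the same return_type.
            if cached_type is not return_type:
                raise TypeError(f"{key!r} already accessed as {cached_type.__name__}")
            return value
        # Unset options are passed to the factory as an empty string.
        value = return_type(factory(self._strings.get(key, "")))
        self._cache[key] = (return_type, value)
        return value


opts = MiniOptions({"timeout": "2.5"})
t = opts.get("timeout", return_type=float, factory=lambda s: float(s) if s else 1.0)
assert t == 2.5
# Re-accessing with a different return_type raises TypeError:
try:
    opts.get("timeout", return_type=int, factory=int)
except TypeError:
    pass
else:
    raise AssertionError("expected TypeError")
```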

get_or_default(self, str key, *, default_value)#

Retrieve a configuration option by key, using a default value if not present.

This is a convenience wrapper around get() that uses the type of the default_value as the return type and provides a default factory that parses a string into that type.

Parameters:
key

The name of the option to retrieve.

default_value

The default value to return if the option is not set. Its type is used to determine the expected return type.

Returns:
The value of the option if it exists and can be parsed to the type of
default_value, otherwise default_value.
Raises:
ValueError

If the stored option value cannot be parsed to the required type.

TypeError

If the option has already been accessed with a different return type.

Notes

  • This method infers the return type from type(default_value).

  • If default_value is used, it will be cached and reused for subsequent accesses of the same key.

Examples

>>> opts = Options()
>>> opts.get_or_default("debug", default_value=False)
False
>>> opts.get_or_default("timeout", default_value=1.5)
1.5
>>> opts.get_or_default("level", default_value="info")
'info'
get_strings(self)#

Get all option key-value pairs as strings.

Options that do not have a string representation, such as options inserted as typed values in C++, are included with an empty string value.

Returns:
A dictionary containing all stored options, where the keys and values are
both strings.
insert_if_absent(self, dict options_as_strings)#

Insert multiple options if they are not already present.

Attempts to insert each key-value pair from the provided dictionary, skipping keys that already exist in the options.

Parameters:
options_as_strings

Dictionary of option keys mapped to their string representations. Keys are inserted only if they do not already exist. The keys are trimmed and converted to lower case before insertion.

Returns:
Number of newly inserted options (0 if none were added).
serialize(self) bytes#

Serialize the Options object into a binary buffer.

This method produces a compact binary representation of the internal key-value options. The format is suitable for storage or transmission and can be later restored using Options.deserialize().

The binary format is:
  • [uint64_t count] — number of key-value pairs

  • [count * 2 * uint64_t] — offset pairs (key_offset, value_offset)

  • [raw bytes] — key and value strings stored contiguously

Returns:
bytes

A bytes object containing the serialized binary representation of the options.

Raises:
ValueError

If any option has already been accessed and cannot be serialized.

Notes

An Options instance can only be serialized if no options have been accessed. This is because serialization is based on the original string representations of the options. Once an option has been accessed and parsed, its string value may no longer accurately reflect its state, making serialization potentially inconsistent.
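The binary format above can be exercised with a pure-Python round trip. This is an illustrative sketch, not the actual implementation: it assumes little-endian uint64 fields and that each offset pair marks where a key and its value begin within the contiguous raw string region.

```python
import struct


def serialize_options(options: dict[str, str]) -> bytes:
    raw = bytearray()
    offsets = []
    for key, value in options.items():
        offsets.append(len(raw))
        raw += key.encode()
        offsets.append(len(raw))
        raw += value.encode()
    header = struct.pack("<Q", len(options))  # [uint64_t count]
    table = struct.pack(f"<{len(offsets)}Q", *offsets)  # offset pairs
    return header + table + bytes(raw)  # raw key/value bytes


def deserialize_options(buf: bytes) -> dict[str, str]:
    (count,) = struct.unpack_from("<Q", buf, 0)
    offsets = list(struct.unpack_from(f"<{2 * count}Q", buf, 8))
    raw = buf[8 + 16 * count:]
    bounds = offsets + [len(raw)]  # sentinel delimits the final value
    out = {}
    for i in range(count):
        key = raw[bounds[2 * i]:bounds[2 * i + 1]].decode()
        value = raw[bounds[2 * i + 1]:bounds[2 * i + 2]].decode()
        out[key] = value
    return out


opts = {"timeout": "1.5", "level": "info"}
assert deserialize_options(serialize_options(opts)) == opts
```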

rapidsmpf.config.get_environment_variables(str key_regex='RAPIDSMPF_(.*)')#

Returns a dictionary of environment variables matching a given regular expression.

This function scans the current process’s environment variables and returns those whose keys match the provided regular expression. The regular expression must contain exactly one capture group to extract the portion of the environment variable key to use as the dictionary key.

For example, to strip the RAPIDSMPF_ prefix, use r"RAPIDSMPF_(.*)" as the regex. The captured group will be used as the key in the output dictionary.

Example:
  • Environment variable: RAPIDSMPF_FOO=bar

  • key_regex: r”RAPIDSMPF_(.*)”

  • Resulting dictionary entry: { “FOO”: “bar” }

Parameters:
key_regex

A regular expression with a single capture group to match and extract the environment variable keys.

Returns:
A dictionary containing all matching environment variables, with keys as
extracted by the capture group.
Raises:
ValueError

If key_regex does not contain exactly one capture group.

See also

os.environ

Dictionary of the current environment variables.
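The capture-group extraction described above can be sketched in plain Python. This is an illustrative equivalent, not the rapidsmpf source; it assumes the pattern must match the full variable name:

```python
import os
import re


def env_vars_matching(key_regex=r"RAPIDSMPF_(.*)"):
    """Return env vars whose names match, keyed by the captured group."""
    pattern = re.compile(key_regex)
    if pattern.groups != 1:
        raise ValueError("key_regex must contain exactly one capture group")
    result = {}
    for key, value in os.environ.items():
        m = pattern.fullmatch(key)
        if m is not None:
            result[m.group(1)] = value
    return result


os.environ["RAPIDSMPF_FOO"] = "bar"
assert env_vars_matching()["FOO"] == "bar"  # prefix stripped by the group
```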

Statistics#

class rapidsmpf.statistics.MemoryRecord(scoped: ScopedMemoryRecord, global_peak: int, num_calls: int)#

Holds memory profiling statistics for a named scope.

Attributes:
scoped

Memory statistics collected while the scope was active, including number of allocations, peak bytes allocated, and total allocated bytes.

global_peak

The maximum global memory usage observed during the scope, including allocations from other threads or nested scopes.

num_calls

Number of times the profiling context with this name was entered.

class rapidsmpf.statistics.MemoryRecorder#

A context manager for recording memory allocation statistics within a code block.

This class is not intended to be used directly by end users. Instead, use Statistics.memory_profiling(), which creates and manages an instance of this class.

Parameters:
stats

The statistics object responsible for aggregating memory profiling data.

mr

The memory resource through which allocations are tracked.

name

The name of the profiling scope. Used as a key in the statistics record.

class rapidsmpf.statistics.Statistics(bool enable, *, RmmResourceAdaptor mr=None)#

Track statistics across RapidsMPF operations.

Parameters:
enable

Whether statistics tracking is enabled.

mr

Enable memory profiling by providing a RMM resource adaptor.

Attributes:
enabled

Checks if statistics is enabled.

memory_profiling_enabled

Checks if memory profiling is enabled.

Methods

add_stat(self, name, double value)

Adds a value to a statistic.

clear(self)

Clears all statistics.

from_options(cls, RmmResourceAdaptor mr, ...)

Construct from configuration options.

get_memory_records(self)

Retrieves all memory profiling records stored by this instance.

get_stat(self, name)

Retrieves a statistic by name.

list_stat_names(self)

Returns a list of all statistic names.

memory_profiling(self, name)

Create a scoped memory profiling context for a named code region.

report(self)

Generates a report of statistics in a formatted string.

write_json(self, filepath)

Writes a JSON report of all collected statistics to a file.

write_json_string(self)

Returns a JSON representation of all collected statistics as a string.

add_stat(self, name, double value)#

Adds a value to a statistic.

Parameters:
name

Name of the statistic.

value

Value to add.

clear(self) None#

Clears all statistics.

Memory profiling records are not cleared.

enabled#

Checks if statistics is enabled.

Operations on a disabled Statistics instance are no-ops.

Returns:
True if statistics is enabled, otherwise False.
classmethod from_options(cls, RmmResourceAdaptor mr, Options options)#

Construct from configuration options.

Parameters:
mr

Memory resource used for memory profiling.

options

Configuration options.

Returns:
The constructed Statistics instance.
get_memory_records(self)#

Retrieves all memory profiling records stored by this instance.

Returns:
Dictionary mapping record names to memory usage data.
get_stat(self, name)#

Retrieves a statistic by name.

Parameters:
name

Name of the statistic to retrieve.

Returns:
A dict of the statistic.
Raises:
KeyError

If the statistic with the specified name does not exist.

list_stat_names(self)#

Returns a list of all statistic names.
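The count/value aggregation behind add_stat() and get_stat() (each statistic records how many times it was added to and the summed value, as described for WorkerContext.get_statistics()) can be sketched as follows. MiniStatistics is a hypothetical illustration, not the rapidsmpf implementation:

```python
class MiniStatistics:
    """Sketch: each statistic holds a running count and a summed value."""

    def __init__(self):
        self._stats = {}

    def add_stat(self, name, value):
        entry = self._stats.setdefault(name, {"count": 0, "value": 0.0})
        entry["count"] += 1
        entry["value"] += value

    def get_stat(self, name):
        return self._stats[name]  # raises KeyError if the name is unknown

    def list_stat_names(self):
        return list(self._stats)


stats = MiniStatistics()
stats.add_stat("spill-bytes", 1024.0)
stats.add_stat("spill-bytes", 512.0)
assert stats.get_stat("spill-bytes") == {"count": 2, "value": 1536.0}
assert stats.list_stat_names() == ["spill-bytes"]
```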

memory_profiling(self, name)#

Create a scoped memory profiling context for a named code region.

Returns a context manager that tracks memory allocations and deallocations made through the associated memory resource while the context is active. The profiling data is aggregated under the provided name and made available via Statistics.get_memory_records().

The statistics include:
  • Total and peak memory allocated within the scope (scoped)

  • Global peak memory usage during the scope (global_peak)

  • Number of times the named scope was entered (num_calls)

If memory profiling is disabled or the memory resource is None, this is a no-op.

Parameters:
name

A unique identifier for the profiling scope. Used as a key when accessing profiling data via Statistics.get_memory_records().

Returns:
A context manager that collects memory profiling data.

Examples

>>> import rmm
>>> mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
>>> stats = Statistics(enable=True, mr=mr)
>>> with stats.memory_profiling("outer"):
...     b1 = rmm.DeviceBuffer(size=1024, mr=mr)
...     with stats.memory_profiling("inner"):
...         b2 = rmm.DeviceBuffer(size=1024, mr=mr)
>>> inner = stats.get_memory_records()["inner"]
>>> print(inner.scoped.peak())
1024
>>> outer = stats.get_memory_records()["outer"]
>>> print(outer.scoped.peak())
2048
memory_profiling_enabled#

Checks if memory profiling is enabled.

Returns:
True if memory profiling is enabled, otherwise False.
report(self)#

Generates a report of statistics in a formatted string.

Operations on a disabled Statistics instance are no-ops.

Returns:
A string representing the formatted statistics report.
write_json(self, filepath) None#

Writes a JSON report of all collected statistics to a file.

Parameters:
filepath

Path to the output file. Created or overwritten.

Raises:
OSError

If the file cannot be opened or writing fails.

ValueError

If any stat name or memory record name contains a double quote, backslash, or ASCII control character (0x00–0x1F).

write_json_string(self) str#

Returns a JSON representation of all collected statistics as a string.

Returns:
A JSON-formatted string.
Raises:
ValueError

If any stat name or memory record name contains a double quote, backslash, or ASCII control character (0x00–0x1F).

RMM Resource Adaptor#

class rapidsmpf.rmm_resource_adaptor.RmmResourceAdaptor#

A RMM memory resource adaptor tailored to RapidsMPF.

Attributes:
current_allocated

RmmResourceAdaptor.current_allocated: int

fallback_mr
upstream_mr

Methods

allocate(self, size_t nbytes, ...)

Allocate nbytes bytes of memory.

deallocate(self, uintptr_t ptr, ...)

Deallocate memory pointed to by ptr of size nbytes.

get_main_record(self)

Returns a copy of the main memory record.

get_upstream(self)

current_allocated#

RmmResourceAdaptor.current_allocated: int

Get the total number of currently allocated bytes.

This includes both allocations on the primary and fallback memory resources.

Returns:
Total number of currently allocated bytes.
get_main_record(self)#

Returns a copy of the main memory record.

The main record tracks memory statistics for the lifetime of the resource.

Returns:
A copy of the current main memory record.