API Reference#
This page contains the API reference for rapidsmpf.
Integrations#
The subpackages under rapidsmpf.integrations contain integrations with other
libraries.
Generic#
RapidsMPF Integrations.
- class rapidsmpf.integrations.WorkerContext(br: BufferResource, progress_thread: ProgressThread, comm: Communicator, statistics: Statistics, spill_collection: SpillCollection = <factory>, shufflers: dict[int, Shuffler] = <factory>, options: Options = <factory>)#
RapidsMPF specific attributes for a worker.
- Attributes:
- lock
The global worker lock. Must be acquired before accessing attributes that might be modified while the worker is running such as the shufflers.
- br
The buffer resource used by the worker exclusively.
- progress_thread
The progress thread used by the worker.
- comm
The communicator connected to all other workers.
- statistics
The statistics used by the worker. If None, statistics is disabled.
- spill_collection
A collection of Python objects that can be spilled to free up device memory.
- shufflers
A mapping from shuffler IDs to active shuffler instances.
- options
Configuration options.
Methods
get_statistics(): Get the statistics from the worker context.
- get_statistics() dict[str, dict[str, Number]]#
Get the statistics from the worker context.
- Returns:
- statistics
A dictionary of statistics. The keys are the names of the statistics. The values are dictionaries with two keys:
“count” is the number of times the statistic was recorded.
“value” is the value of the statistic.
Notes
Statistics are global across all shuffles. To measure statistics for any given shuffle, gather statistics before and after the shuffle and compute the difference.
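For example, per-shuffle statistics can be obtained by snapshotting the counters before and after the shuffle and differencing them. The helper below is a small sketch; the snapshot dictionaries are illustrative literals (the statistic name "spill-bytes" is a placeholder), not real measurements.

def diff_statistics(before, after):
    # Per-statistic deltas between two get_statistics() snapshots.
    delta = {}
    for name, stat in after.items():
        prev = before.get(name, {"count": 0, "value": 0})
        delta[name] = {
            "count": stat["count"] - prev["count"],
            "value": stat["value"] - prev["value"],
        }
    return delta

before = {"spill-bytes": {"count": 1, "value": 1024}}   # snapshot taken before the shuffle
after = {"spill-bytes": {"count": 3, "value": 4096}}    # snapshot taken after the shuffle
print(diff_statistics(before, after))
# {'spill-bytes': {'count': 2, 'value': 3072}}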
Dask#
RapidsMPF Dask Integrations.
- rapidsmpf.integrations.dask.bootstrap_dask_cluster(client: distributed.Client, *, options: Options = <rapidsmpf.config.Options object>) None#
Setup a Dask cluster for RapidsMPF shuffling.
Calling bootstrap_dask_cluster multiple times on the same worker is a no-op, which also means that any new options values are ignored.
- Parameters:
- client
The current Dask client.
- options
Configuration options. Reads environment variables for any options not set explicitly using
get_environment_variables().
Notes
This utility must be executed before RapidsMPF shuffling can be used within a Dask cluster. This function is called automatically by
rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph, but may be called manually to set things up before the first shuffle.
Subsequent shuffles on the same cluster will reuse the resources established on the cluster by this function.
All the workers reported by
distributed.Client.scheduler_info() will be used. Note that RapidsMPF does not currently support adding or removing workers from the cluster.
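A minimal bootstrapping sketch, assuming a Dask cluster is already running (the scheduler address is a placeholder):

from distributed import Client
from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.integrations.dask import bootstrap_dask_cluster

client = Client("tcp://scheduler:8786")            # placeholder scheduler address
options = Options(get_environment_variables())     # pick up any RAPIDSMPF_* settings
bootstrap_dask_cluster(client, options=options)    # idempotent; later calls are no-ops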
- rapidsmpf.integrations.dask.get_worker_context(worker: distributed.Worker | None = None) WorkerContext#
Retrieve the WorkerContext associated with a Dask worker.
If the worker context does not already exist on the worker, it will be created.
- Parameters:
- worker
An optional Dask worker instance. If not provided, the current worker is retrieved using
get_worker().
- Returns:
- The existing or newly initialized worker context.
- rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#
Return the task graph for a RapidsMPF shuffle.
- Parameters:
- input_name
The task name for input DataFrame tasks.
- output_name
The task name for output DataFrame tasks.
- partition_count_in
Partition count of input collection.
- partition_count_out
Partition count of output collection.
- integration
Dask-integration specification.
- options
Optional keyword arguments.
- *other_keys
Other keys needed by integration.insert_partition.
- config_options
RapidsMPF configuration options.
- Returns:
- A valid task graph for Dask execution.
Notes
A RapidsMPF shuffle operation comprises four general phases:
Staging phase
A new rapidsmpf.shuffler.Shuffler object must be staged on every worker in the current Dask cluster.
Insertion phase
Each input partition is split into a dictionary of chunks, and that dictionary is passed to the appropriate rapidsmpf.shuffler.Shuffler object (using rapidsmpf.shuffler.Shuffler.insert_chunks).
The insertion phase will include a single task for each of the partition_count_in partitions in the input DataFrame. The partitioning and insertion logic must be defined by the insert_partition classmethod of the integration argument.
Insertion tasks are NOT restricted to specific Dask workers. These tasks may run anywhere in the cluster.
Barrier phase
All rapidsmpf.shuffler.Shuffler objects must be ‘informed’ that the insertion phase is complete (on all workers) before the subsequent extraction phase begins. We call this synchronization step the ‘barrier phase’.
The barrier phase comprises three types of barrier tasks:
1. First global barrier - A single barrier task is used to signal that all input partitions have been submitted to a rapidsmpf.shuffler.Shuffler object on one of the workers. This task may also run anywhere on the cluster, but it must depend on ALL insertion tasks.
2. Worker barrier(s) - Each worker must execute a single worker-barrier task. This task will call insert_finished for every output partition on the local rapidsmpf.shuffler.Shuffler. These tasks must be restricted to specific workers, and they must all depend on the first global barrier.
3. Second global barrier - A single barrier task is used to signal that all workers are ready to begin the extraction phase. This task may run anywhere on the cluster, but it must depend on all worker-barrier tasks.
Extraction phase
Each output partition is extracted from the local rapidsmpf.shuffler.Shuffler object on the worker (using rapidsmpf.shuffler.Shuffler.wait_on and rapidsmpf.integrations.cudf.partition.unpack_and_concat).
The extraction phase will include a single task for each of the partition_count_out partitions in the shuffled output DataFrame. The extraction logic must be defined by the extract_partition classmethod of the integration argument.
Extraction tasks must be restricted to specific Dask workers, and they must also depend on the second global-barrier task.
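The sketch below shows how the graph function might be invoked; MyShuffleIntegration is a hypothetical ShufflerIntegration implementation (it must provide insert_partition and extract_partition), and the task names, partition counts, and options are placeholders:

from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.integrations.dask import rapidsmpf_shuffle_graph

integration = MyShuffleIntegration()   # hypothetical ShufflerIntegration implementation
shuffle_options = {}                   # whatever insert/extract_partition expect (placeholder)

graph = rapidsmpf_shuffle_graph(
    "df-input",      # task name of the input DataFrame partitions
    "df-shuffled",   # task name of the shuffled output partitions
    8,               # partition_count_in
    8,               # partition_count_out
    integration,
    shuffle_options,
    config_options=Options(get_environment_variables()),
)
# The returned dict is a Dask task graph; merge it with the graph that
# produces the "df-input" tasks before handing everything to the scheduler.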
Single-process#
Shuffler integration for single-worker pylibcudf execution.
- rapidsmpf.integrations.single.get_worker_context() WorkerContext#
Retrieve the single-worker WorkerContext.
- Returns:
- The worker context.
- Raises:
- ValueError
If a worker context was never created.
See also
setup_worker: Must be called before this function.
- rapidsmpf.integrations.single.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ShufflerIntegration, options: Any, *other_keys: str | tuple[str, int], config_options: Options = <rapidsmpf.config.Options object>) dict[Any, Any]#
Return the task graph for a RapidsMPF shuffle.
This shuffle will use the single-process RapidsMPF Communicator.
- Parameters:
- input_name
The task name for input DataFrame tasks.
- output_name
The task name for output DataFrame tasks.
- partition_count_in
Partition count of input collection.
- partition_count_out
Partition count of output collection.
- integration
Shuffle-integration specification.
- options
Optional keyword arguments.
- *other_keys
Other keys needed by integration.insert_partition.
- config_options
RapidsMPF configuration options.
- Returns:
- A valid task graph for single-worker execution.
- rapidsmpf.integrations.single.setup_worker(options: Options = <rapidsmpf.config.Options object>) None#
Attach RapidsMPF shuffling attributes to a single worker.
- Parameters:
- options
Configuration options.
Warning
This function creates a new RMM memory pool, and sets it as the current device resource.
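A minimal single-process sketch combining setup_worker() and get_worker_context():

from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.integrations.single import get_worker_context, setup_worker

# Creates an RMM memory pool and sets it as the current device resource (see warning above).
setup_worker(Options(get_environment_variables()))
ctx = get_worker_context()   # raises ValueError if setup_worker() was never called
print(ctx.get_statistics())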
Ray#
Integration for Ray clusters.
- class rapidsmpf.integrations.ray.RapidsMPFActor(nranks: int)#
RapidsMPFActor is a base class for Ray actors that sets up UCXX communication among them.
- Parameters:
- nranks
The number of workers in the cluster.
- Attributes:
comm: The UCXX communicator object.
Methods
is_initialized(): Check if the communicator is initialized.
nranks(): Get the number of ranks in the UCXX communicator.
rank(): Get the rank of the worker, as inferred from the UCXX communicator.
setup_root(): Setup root communicator in the cluster.
setup_worker(root_address_bytes): Setup the worker in the cluster once the root is initialized.
Return a string representation of the actor.
Examples
>>> @ray.remote(num_cpus=1)
... class DummyActor(RapidsMPFActor): ...
>>> actors = setup_ray_ucxx_cluster(DummyActor, 2)
>>> ray.get([actor.status_check.remote() for actor in actors])
- property comm: Communicator#
The UCXX communicator object.
- Returns:
- The UCXX communicator object if initialized, otherwise None
- Raises:
- RuntimeError
If the communicator is not initialized.
Notes
This property is not meant to be called remotely from the client; otherwise, Ray will attempt to serialize the Communicator object, which will fail. Instead, subclasses can use the comm property to access the communicator locally, for example when creating a shuffle operation.
- is_initialized() bool#
Check if the communicator is initialized.
- Returns:
- True if the communicator is initialized, False otherwise.
- nranks() int#
Get the number of ranks in the UCXX communicator.
- Returns:
- The number of ranks in the UCXX communicator
- rank() int#
Get the rank of the worker, as inferred from the UCXX communicator.
- Returns:
- The rank of the worker
- setup_root() tuple[int, bytes]#
Setup root communicator in the cluster.
- Returns:
- rank
The rank of the root.
- root_address_bytes
The address of the root.
- rapidsmpf.integrations.ray.setup_ray_ucxx_cluster(actor_cls: ActorClass, num_workers: int, *args: Any, **kwargs: Any) list[ActorHandle]#
A utility method to set up UCXX communication using RapidsMPFActor actor objects.
- Parameters:
- actor_cls
The actor class to be instantiated in the cluster.
- num_workers
The number of workers in the cluster.
- *args
Additional arguments to be passed to the actor class.
- **kwargs
Additional keyword arguments to be passed to the actor class.
- Returns:
- gpu_actors
A list of actors in the cluster.
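A hedged sketch of a RapidsMPFActor subclass; the hello() method and its log message are hypothetical, but it illustrates that the communicator should only be used locally on the actor:

import ray
from rapidsmpf.integrations.ray import RapidsMPFActor, setup_ray_ucxx_cluster

@ray.remote(num_cpus=1)
class ShuffleActor(RapidsMPFActor):
    def hello(self):
        # Use the communicator on the actor itself; do not return it to the
        # client, since it cannot be serialized.
        comm = self.comm
        comm.logger.info(f"rank {comm.rank} of {comm.nranks} ready")
        return comm.rank

actors = setup_ray_ucxx_cluster(ShuffleActor, 2)
print(ray.get([actor.hello.remote() for actor in actors]))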
cuDF#
Collection of cuDF specific functions.
Partition#
- rapidsmpf.integrations.cudf.partition.split_and_pack(table: Table, splits: Iterable[int], stream: Stream, br: BufferResource) dict[int, PackedData]#
Split a table at the given row offsets and pack (serialize) each resulting partition, returning a mapping from partition index to packed data.
- rapidsmpf.integrations.cudf.partition.unpack_and_concat(partitions: Iterable[PackedData], stream: Stream, br: BufferResource) Table#
Unpack (deserialize) the given packed partitions and concatenate them into a single table.
Shuffler#
The Shuffler interface for RapidsMPF.
- class rapidsmpf.shuffler.Shuffler(Communicator comm, ProgressThread progress_thread, uint8_t op_id, uint32_t total_num_partitions, Stream stream, BufferResource br, Statistics statistics=None)#
Shuffle service for partitioned data.
The rapidsmpf.shuffler.Shuffler class provides an interface for performing a shuffle operation on partitioned data. It uses a distribution scheme to distribute and collect data chunks across different ranks.
- Parameters:
- comm
The communicator to use for data exchange between ranks.
- progress_thread
The progress thread to use for tracking progress.
- op_id
The operation ID of the shuffle. Must have a value between 0 and max_concurrent_shuffles - 1.
- total_num_partitions
Total number of partitions in the shuffle.
- stream
The CUDA stream used for memory operations.
- br
The buffer resource used to allocate temporary storage and shuffle results.
- statistics
The statistics instance to use. If None, statistics is disabled.
- Attributes:
- max_concurrent_shuffles
Maximum number of concurrent shufflers.
Methods
concat_insert(self, chunks): Insert a batch of packed (serialized) chunks into the shuffle while concatenating the chunks based on the destination rank.
extract(self, uint32_t pid): Extract all chunks of the specified partition.
finished(self): Check if all partitions are finished.
insert_chunks(self, chunks): Insert a batch of packed (serialized) chunks into the shuffle.
insert_finished(self, pids): Mark partitions as finished.
shutdown(self): Shutdown the shuffle, blocking until all inflight communication is completed.
wait_any(self): Wait for any partition to finish.
wait_on(self, uint32_t pid): Wait for a specific partition to finish.
Notes
This class is designed to handle distributed operations by partitioning data and redistributing it across ranks in a cluster. It is typically used in distributed data processing workflows involving cuDF tables.
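The following sketch outlines the typical lifecycle of a Shuffler on a single rank. The communicator, progress thread, stream, buffer resource, and input chunks (comm, progress_thread, stream, br, chunks) are placeholders that would normally be supplied by an integration layer:

from rapidsmpf.shuffler import Shuffler
from rapidsmpf.integrations.cudf.partition import unpack_and_concat

total_num_partitions = 4
shuffler = Shuffler(comm, progress_thread, 0, total_num_partitions, stream, br)

shuffler.insert_chunks(chunks)   # chunks: dict mapping partition ID to PackedData
shuffler.insert_finished(list(range(total_num_partitions)))

results = []
for _ in range(total_num_partitions):
    pid = shuffler.wait_any()            # block until some partition is ready
    packed = shuffler.extract(pid)       # list of PackedData for `pid`
    results.append(unpack_and_concat(packed, stream, br))

shuffler.shutdown()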
- comm#
Get the communicator used by the shuffler.
- Returns:
- The communicator.
- concat_insert(self, chunks)#
Insert a batch of packed (serialized) chunks into the shuffle while concatenating the chunks based on the destination rank.
- Parameters:
- chunks
A map where keys are partition IDs (int) and values are packed data (PackedData).
Notes
There are some considerations for using this method:
The chunks are grouped by the destination rank of the partition ID and concatenated on device memory.
The caller thread will perform the concatenation, and hence it will be blocked.
Concatenation may cause device memory pressure.
- extract(self, uint32_t pid)#
Extract all chunks of the specified partition.
- Parameters:
- pid
The partition ID to extract chunks for.
- Returns:
- A list of packed data belonging to the specified partition.
- finished(self)#
Check if all partitions are finished.
This method verifies whether all partitions have been completed, meaning all chunks have been inserted and no further data is expected from either the local node or any remote nodes.
- Returns:
- True if all partitions are finished, otherwise False.
- insert_chunks(self, chunks)#
Insert a batch of packed (serialized) chunks into the shuffle.
- Parameters:
- chunks
A map where keys are partition IDs (int) and values are packed data (PackedData).
Notes
This method adds the given chunks to the shuffle, associating them with their respective partition IDs.
- insert_finished(self, pids)#
Mark partitions as finished.
This informs the shuffler that no more chunks for the specified partitions will be inserted.
- Parameters:
- pids
Partition IDs to mark as finished (int or an iterable of ints).
Notes
Once a partition is marked as finished, it is considered complete and no further chunks will be accepted for that partition.
- shutdown(self)#
Shutdown the shuffle, blocking until all inflight communication is completed.
- Raises:
- RuntimeError
If the shuffler is already inactive.
Notes
This method ensures that all pending shuffle operations and communications are completed before shutting down. It blocks until no inflight operations remain.
- wait_any(self)#
Wait for any partition to finish.
This method blocks until at least one partition is marked as finished. It is useful for processing partitions as they are completed.
- Returns:
- The partition ID of the next finished partition.
- wait_on(self, uint32_t pid)#
Wait for a specific partition to finish.
This method blocks until the desired partition is ready for processing.
- Parameters:
- pid
The desired partition ID.
Communicator#
Submodule for communication abstraction (e.g. UCXX and MPI).
- rapidsmpf.communicator.COMMUNICATORS = ('single', 'ucxx', 'mpi')#
Tuple of available communicators.
RapidsMPF includes a collection of communicator backends, available as submodules under
rapidsmpf.communicator.*. Typically, the Conda distribution includes both UCXX and MPI support, while the PIP installation generally supports only UCXX.
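For example, to check at runtime which backends a given installation exposes:

import rapidsmpf.communicator

print(rapidsmpf.communicator.COMMUNICATORS)   # e.g. ('single', 'ucxx', 'mpi')
if "mpi" not in rapidsmpf.communicator.COMMUNICATORS:
    print("MPI backend not available in this installation")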
- class rapidsmpf.communicator.communicator.Communicator#
Abstract base class for a communication mechanism between nodes.
Provides an interface for sending and receiving messages between nodes, supporting asynchronous operations, GPU data transfers, and custom logging. Concrete implementations must define the virtual methods to enable specific communication backends.
- Attributes:
logger: Get the logger.
nranks: Get the total number of ranks.
rank: Get the rank of this communication node.
Methods
get_str(self): Get a string representation of the communicator.
Notes
This class is designed as an abstract base class, meaning it cannot be instantiated directly. Subclasses are required to implement the necessary methods to support the desired communication backend and functionality.
- get_str(self)#
Get a string representation of the communicator.
- Returns:
- A string describing the communicator.
- logger#
Get the logger.
- Returns:
- A logger instance.
- nranks#
Get the total number of ranks.
- Returns:
- Total number of ranks.
- rank#
Get the rank of this communication node.
- Returns:
- The rank.
- class rapidsmpf.communicator.communicator.LOG_LEVEL(*values)#
- Attributes:
denominator: the denominator of a rational number in lowest terms
imag: the imaginary part of a complex number
numerator: the numerator of a rational number in lowest terms
real: the real part of a complex number
Methods
as_integer_ratio(/): Return a pair of integers, whose ratio is equal to the original int.
bit_count(/): Number of ones in the binary representation of the absolute value of self.
bit_length(/): Number of bits necessary to represent self in binary.
conjugate(/): Returns self, the complex conjugate of any int.
from_bytes(/, bytes[, byteorder, signed]): Return the integer represented by the given array of bytes.
is_integer(/): Returns True.
to_bytes(/[, length, byteorder, signed]): Return an array of bytes representing an integer.
- class rapidsmpf.communicator.communicator.Logger#
Logger.
- To control the verbosity level, set the environment variable RAPIDSMPF_LOG:
NONE: No logging.
PRINT: General print messages.
WARN: Warning messages (default).
INFO: Informational messages.
DEBUG: Debug messages.
TRACE: Trace messages.
- Attributes:
verbosity_level: Get the verbosity level of the logger.
Methods
debug(self, str msg): Logs a debug message.
info(self, str msg): Logs an informational message.
print(self, str msg): Logs a print message.
trace(self, str msg): Logs a trace message.
warn(self, str msg): Logs a warning message.
- debug(self, str msg)#
Logs a debug message.
- Parameters:
- msg
The message to log.
- info(self, str msg)#
Logs an informational message.
- Parameters:
- msg
The message to log.
- print(self, str msg)#
Logs a print message.
- Parameters:
- msg
The message to log.
- trace(self, str msg)#
Logs a trace message.
- Parameters:
- msg
The message to log.
- verbosity_level#
Get the verbosity level of the logger.
- Returns:
- The verbosity level.
- warn(self, str msg)#
Logs a warning message.
- Parameters:
- msg
The message to log.
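A small sketch, assuming comm is an already-created Communicator (for instance the comm attribute of a WorkerContext) and RAPIDSMPF_LOG was set in the environment beforehand:

# `comm` is a placeholder for an existing Communicator instance.
log = comm.logger
print(log.verbosity_level)          # reflects the RAPIDSMPF_LOG setting
log.warn("device memory is low")    # hypothetical messages
log.debug("inserted chunk 42")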
MPI Communicator#
UCXX Communicator#
ucxx-based implementation of a RapidsMPF Communicator.
- rapidsmpf.communicator.ucxx.barrier(Communicator comm)#
Execute a barrier on the UCXX communicator.
Ensures all ranks are connected to the root and all ranks have reached the barrier before continuing.
Notes
Executing this barrier is required after the ranks are bootstrapped to ensure everyone is connected to the root. An alternative barrier, such as
MPI_Barrier, will not suffice for that purpose.
- rapidsmpf.communicator.ucxx.get_root_ucxx_address(Communicator comm)#
Get the address of the communicator’s UCXX worker.
This function is intended to be called from the root rank to communicate to other processes how to reach the root, but it will return the address of the UCXX worker of other ranks too.
- Parameters:
- comm
The RapidsMPF-UCXX communicator.
- Returns:
- A bytes sequence with the UCXX worker address.
- Raises:
- NotImplementedError
If the communicator was created with a HostPortPair, which is not yet supported.
- rapidsmpf.communicator.ucxx.new_communicator(Rank nranks, UCXWorker ucx_worker, UCXAddress root_ucxx_address, Options options)#
Create a new UCXX communicator with the given number of ranks.
An existing UCXWorker may be specified, otherwise one will be created. The root rank is created if no root_ucxx_address is specified; all other ranks must specify the address of the root rank via that argument.
- Parameters:
- nranks
The number of ranks in the cluster.
- ucx_worker
An existing UCXX worker to use if specified, otherwise one will be created.
- root_ucxx_address
The UCXX address of the root rank (only specified for non-root ranks).
- options
Configuration options.
- Returns:
- A new RapidsMPF-UCXX communicator.
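A hedged sketch of bootstrapping the root rank; passing None for both the UCX worker and the root address is an assumption based on the description above, and distributing the root address to the other ranks is left as a placeholder:

from rapidsmpf.config import Options
from rapidsmpf.communicator.ucxx import barrier, get_root_ucxx_address, new_communicator

nranks = 2
# Root rank: no existing UCXWorker and no root address, so a new worker is
# created and this rank becomes the root.
comm = new_communicator(nranks, None, None, Options())
root_address = get_root_ucxx_address(comm)   # bytes to share with the other ranks
# ... distribute `root_address` to the other ranks out of band ...
barrier(comm)   # required after bootstrapping, before any communication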
Buffer#
Submodule for buffer abstraction.
- class rapidsmpf.buffer.buffer.MemoryType(*values)#
- Attributes:
denominator: the denominator of a rational number in lowest terms
imag: the imaginary part of a complex number
numerator: the numerator of a rational number in lowest terms
real: the real part of a complex number
Methods
as_integer_ratio(/): Return a pair of integers, whose ratio is equal to the original int.
bit_count(/): Number of ones in the binary representation of the absolute value of self.
bit_length(/): Number of bits necessary to represent self in binary.
conjugate(/): Returns self, the complex conjugate of any int.
from_bytes(/, bytes[, byteorder, signed]): Return the integer represented by the given array of bytes.
is_integer(/): Returns True.
to_bytes(/[, length, byteorder, signed]): Return an array of bytes representing an integer.
- class rapidsmpf.buffer.resource.BufferResource#
Class managing buffer resources.
This class handles memory allocation and transfers between different memory types (e.g., host and device). All memory operations in RapidsMPF, such as those performed by the Shuffler, rely on a buffer resource for memory management.
- Parameters:
- device_mr
Reference to the RMM device memory resource used for device allocations.
- memory_available
Optional memory availability functions. Memory types without availability functions are unlimited. A function must return the current available memory of a specific type. It must be thread-safe if used by multiple BufferResource instances concurrently. Warning: calling any BufferResource instance methods within the function might result in a deadlock. This is because the buffer resource is locked when the function is called.
- periodic_spill_check
Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the memory availability functions. The value of periodic_spill_check is used as the pause between checks (in seconds). If None, no periodic spill check is performed.
- Attributes:
- spill_manager
Methods
memory_available(self, MemoryType mem_type): Get the current available memory of the specified memory type.
memory_reserved(self, MemoryType mem_type): Get the current reserved memory of the specified memory type.
- memory_available(self, MemoryType mem_type)#
Get the current available memory of the specified memory type.
- memory_reserved(self, MemoryType mem_type)#
Get the current reserved memory of the specified memory type.
- Parameters:
- mem_type
The target memory type.
- Returns:
- The memory reserved, in bytes.
- class rapidsmpf.buffer.resource.LimitAvailableMemory(RmmResourceAdaptor mr, int64_t limit)#
A callback class for querying the remaining available memory within a defined limit from an RMM resource adaptor.
This class is primarily designed to simulate constrained memory environments or prevent memory allocation beyond a specific threshold. It provides information about the available memory by subtracting the memory currently used (as reported by the RMM resource adaptor) from a user-defined limit.
It is typically used in the context of memory management operations such as with BufferResource.
- Parameters:
- mr
A statistics resource adaptor that tracks memory usage and provides statistics about the memory consumption. The
LimitAvailableMemory instance keeps a reference to mr to keep it alive.
- limit
The maximum memory limit (in bytes). Used to calculate the remaining available memory.
Methods
__call__(self): Returns the remaining available memory within the defined limit.
Notes
The mr resource must not be destroyed while this object is still in use.
Examples
>>> mr = RmmResourceAdaptor(...)
>>> memory_limiter = LimitAvailableMemory(mr, limit=1_000_000)
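Continuing the example, the limiter would typically be installed as the device-memory availability function of a BufferResource (a sketch; parameter names follow the Parameters list above):

>>> from rapidsmpf.buffer.buffer import MemoryType
>>> from rapidsmpf.buffer.resource import BufferResource
>>> br = BufferResource(
...     device_mr=mr,
...     memory_available={MemoryType.DEVICE: memory_limiter},
... )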
- class rapidsmpf.buffer.packed_data.PackedData#
Methods
from_cudf_packed_columns(cls, ...): Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
- classmethod from_cudf_packed_columns(cls, PackedColumns packed_columns, Stream stream, BufferResource br)#
Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
- Parameters:
- packed_columns
Packed data containing metadata and GPU data buffers
- Returns:
- A new PackedData instance containing the packed columns data
- Raises:
- ValueError
If the PackedColumns object is empty (has been released already).
Config Options#
- class rapidsmpf.config.Optional(value)#
Represents an option value that can be explicitly disabled.
This class wraps an option value and interprets certain strings as indicators that the value is disabled (case-insensitive): {“false”, “no”, “off”, “disable”, “disabled”}.
This is typically used to simplify optional or Optional options with
Options.get_or_default().- Parameters:
- value
The input value to interpret.
- Attributes:
- value
The raw input value, unless it matched a disable keyword, in which case the value is
None.
Examples
>>> from rapidsmpf.config import Optional, Options
>>> Optional("OFF").value
None
>>> Optional("no").value
None
>>> Optional("100").value
'100'
>>> Optional("").value
''
>>> opts = Options()
>>> opts.get_or_default(
...     "dask_periodic_spill_check",
...     default_value=Optional(1e-3)
... ).value
0.001
- class rapidsmpf.config.OptionalBytes(value)#
Represents a byte-sized option that can be explicitly disabled.
This class is a specialization of
Optional that interprets the input as a human-readable byte size string (e.g., “100 MB”, “1KiB”, “1e6”). If the input is one of the disable keywords (e.g., “off”, “no”, “false”), the value is treated as disabled (None). Otherwise, it is parsed to an integer number of bytes using rapidsmpf.utils.string.parse_bytes().
This is useful for configuration options that may be set to a size limit or explicitly turned off.
- Parameters:
- value
A human-readable byte size (e.g., “1MiB”, “100 MB”) or a disable keyword (case-insensitive), or an integer number of bytes.
- Attributes:
- value
The size in bytes, or
None if disabled.
Examples
>>> from rapidsmpf.config import OptionalBytes
>>> OptionalBytes("1KiB").value
1024
>>> OptionalBytes("OFF").value is None
True
>>> OptionalBytes(2048).value
2048
- class rapidsmpf.config.Options#
Initialize an Options object with a dictionary of string options.
- Parameters:
- options_as_strings
A dictionary representing option names and their corresponding values.
Methods
deserialize(bytes serialized_buffer): Deserialize a binary buffer into an Options object.
get(self, str key, *, return_type, factory): Retrieves a configuration option by key.
get_or_default(self, str key, *, default_value): Retrieve a configuration option by key, using a default value if not present.
get_strings(self): Get all option key-value pairs as strings.
insert_if_absent(self, dict options_as_strings): Insert multiple options if they are not already present.
serialize(self): Serialize the Options object into a binary buffer.
- static deserialize(bytes serialized_buffer)#
Deserialize a binary buffer into an Options object.
This method reconstructs an Options instance from a byte buffer produced by the
Options.serialize() method.
See Options.serialize() for the binary format.
- Parameters:
- serialized_buffer
A buffer containing serialized options in the defined binary format.
- Returns:
- Options
A reconstructed Options instance containing the deserialized key-value pairs.
- Raises:
- ValueError
If the input buffer is malformed or inconsistent with the expected format.
- get(self, str key, *, return_type, factory)#
Retrieves a configuration option by key.
If the option is not present, it is constructed using the provided factory function, which receives the string representation of the option (or an empty string if unset). The option is cached after the first access.
The option is cast to the specified return_type. To be accessible from C++, it must be one of: bool, int, float, str. Otherwise, it is stored as a PyObject*.
Once a key has been accessed with a particular return_type, subsequent calls to get with the same key must use the same return_type. Using a different type for the same key will result in a TypeError.
- Parameters:
- Returns:
- The value of the requested option, cast to the specified return_type.
- Raises:
- ValueError
If the
return_type is unsupported, or if the stored option type does not match the expected type.
- TypeError
If the option has already been accessed with a different
return_type.
Warning
The factory must not access the Options instance, as this may lead to a deadlock due to internal locking.
- get_or_default(self, str key, *, default_value)#
Retrieve a configuration option by key, using a default value if not present.
This is a convenience wrapper around
get() that uses the type of the default_value as the return type and provides a default factory that parses a string into that type.
- Parameters:
- key
The name of the option to retrieve.
- default_value
The default value to return if the option is not set. Its type is used to determine the expected return type.
- Returns:
- The value of the option if it exists and can be parsed to the type of
default_value, otherwise default_value.
- Raises:
- ValueError
If the stored option value cannot be parsed to the required type.
- TypeError
If the option has already been accessed with a different return type.
Notes
This method infers the return type from
type(default_value).
If default_value is used, it will be cached and reused for subsequent accesses of the same key.
Examples
>>> opts = Options()
>>> opts.get_or_default("debug", default_value=False)
False
>>> opts.get_or_default("timeout", default_value=1.5)
1.5
>>> opts.get_or_default("level", default_value="info")
'info'
- get_strings(self)#
Get all option key-value pairs as strings.
- Returns:
- A dictionary containing all stored options, where the keys and values are both strings.
- insert_if_absent(self, dict options_as_strings)#
Insert multiple options if they are not already present.
Attempts to insert each key-value pair from the provided dictionary, skipping keys that already exist in the options.
- Parameters:
- options_as_strings
Dictionary of option keys mapped to their string representations. Keys are inserted only if they do not already exist. The keys are trimmed and converted to lower case before insertion.
- Returns:
- Number of newly inserted options (0 if none were added).
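For example (a sketch; the option names are placeholders):

>>> opts = Options({"log_level": "WARN"})
>>> opts.insert_if_absent({"log_level": "DEBUG", "periodic_spill_check": "1e-3"})
1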
- serialize(self) bytes#
Serialize the Options object into a binary buffer.
This method produces a compact binary representation of the internal key-value options. The format is suitable for storage or transmission and can be later restored using
Options.deserialize().
- The binary format is:
[uint64_t count] — number of key-value pairs
[count * 2 * uint64_t] — offset pairs (key_offset, value_offset)
[raw bytes] — key and value strings stored contiguously
- Returns:
- bytes
A
bytes object containing the serialized binary representation of the options.
- Raises:
- ValueError
If any option has already been accessed and cannot be serialized.
Notes
An Options instance can only be serialized if no options have been accessed. This is because serialization is based on the original string representations of the options. Once an option has been accessed and parsed, its string value may no longer accurately reflect its state, making serialization potentially inconsistent.
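For example, a round trip through the binary format, serialized before any option is accessed (the option names are placeholders):

>>> from rapidsmpf.config import Options
>>> opts = Options({"log_level": "WARN", "periodic_spill_check": "1e-3"})
>>> buf = opts.serialize()
>>> restored = Options.deserialize(buf)
>>> restored.get_strings() == opts.get_strings()
True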
- rapidsmpf.config.get_environment_variables(str key_regex='RAPIDSMPF_(.*)')#
Returns a dictionary of environment variables matching a given regular expression.
This function scans the current process’s environment variables and returns those whose keys match the provided regular expression. The regular expression must contain exactly one capture group to extract the portion of the environment variable key to use as the dictionary key.
For example, to strip the
RAPIDSMPF_ prefix, use r"RAPIDSMPF_(.*)" as the regex. The captured group will be used as the key in the output dictionary.
- Example:
Environment variable: RAPIDSMPF_FOO=bar
key_regex: r”RAPIDSMPF_(.*)”
Resulting dictionary entry: { “FOO”: “bar” }
- Parameters:
- key_regex
A regular expression with a single capture group to match and extract the environment variable keys.
- Returns:
- A dictionary containing all matching environment variables, with keys as extracted by the capture group.
- Raises:
- ValueError
If
key_regex does not contain exactly one capture group.
See also
os.environ: Dictionary of the current environment variables.
Statistics#
- class rapidsmpf.statistics.MemoryRecord(scoped: ScopedMemoryRecord, global_peak: int, num_calls: int)#
Holds memory profiling statistics for a named scope.
- Attributes:
- scoped
Memory statistics collected while the scope was active, including number of allocations, peak bytes allocated, and total allocated bytes.
- global_peak
The maximum global memory usage observed during the scope, including allocations from other threads or nested scopes.
- num_calls
Number of times the profiling context with this name was entered.
- class rapidsmpf.statistics.MemoryRecorder#
A context manager for recording memory allocation statistics within a code block.
This class is not intended to be used directly by end users. Instead, use
Statistics.memory_profiling(), which creates and manages an instance of this class.
- Parameters:
- stats
The statistics object responsible for aggregating memory profiling data.
- mr
The memory resource through which allocations are tracked.
- name
The name of the profiling scope. Used as a key in the statistics record.
- class rapidsmpf.statistics.Statistics#
Track statistics across RapidsMPF operations.
- Parameters:
- enable
Whether statistics tracking is enabled.
- mr
Enable memory profiling by providing an RMM resource adaptor.
- Attributes:
enabled: Checks if statistics is enabled.
memory_profiling_enabled: Checks if memory profiling is enabled.
Methods
add_stat(self, name, double value): Adds a value to a statistic.
clear(self): Clears all statistics.
get_memory_records(self): Retrieves all memory profiling records stored by this instance.
get_stat(self, name): Retrieves a statistic by name.
list_stat_names(self): Returns a list of all statistic names.
memory_profiling(self, name): Create a scoped memory profiling context for a named code region.
report(self): Generates a report of statistics in a formatted string.
- add_stat(self, name, double value)#
Adds a value to a statistic.
- Parameters:
- name
Name of the statistic.
- value
Value to add.
- Returns:
- Updated total value.
- enabled#
Checks if statistics is enabled.
Operations on disabled statistics are no-ops.
- Returns:
- True if statistics is enabled, otherwise False.
- get_memory_records(self)#
Retrieves all memory profiling records stored by this instance.
- Returns:
- Dictionary mapping record names to memory usage data.
- get_stat(self, name)#
Retrieves a statistic by name.
- Parameters:
- name
Name of the statistic to retrieve.
- Returns:
- A dict of the statistic.
- Raises:
- KeyError
If the statistic with the specified name does not exist.
- list_stat_names(self)#
Returns a list of all statistic names.
- memory_profiling(self, name)#
Create a scoped memory profiling context for a named code region.
Returns a context manager that tracks memory allocations and deallocations made through the associated memory resource while the context is active. The profiling data is aggregated under the provided
name and made available via Statistics.get_memory_records().
The statistics include:
- Total and peak memory allocated within the scope (scoped)
- Global peak memory usage during the scope (global_peak)
- Number of times the named scope was entered (num_calls)
If memory profiling is disabled or the memory resource is None, this is a no-op.
- Parameters:
- name
A unique identifier for the profiling scope. Used as a key when accessing profiling data via
Statistics.get_memory_records().
- Returns:
- A context manager that collects memory profiling data.
Examples
>>> import rmm
>>> mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
>>> stats = Statistics(enable=True, mr=mr)
>>> with stats.memory_profiling("outer"):
...     b1 = rmm.DeviceBuffer(size=1024, mr=mr)
...     with stats.memory_profiling("inner"):
...         b2 = rmm.DeviceBuffer(size=1024, mr=mr)
>>> inner = stats.get_memory_records()["inner"]
>>> print(inner.scoped.peak())
1024
>>> outer = stats.get_memory_records()["outer"]
>>> print(outer.scoped.peak())
2048
- memory_profiling_enabled#
Checks if memory profiling is enabled.
- Returns:
- True if memory profiling is enabled, otherwise False.
- report(self)#
Generates a report of statistics in a formatted string.
Operations on disabled statistics are no-ops.
- Returns:
- A string representing the formatted statistics report.
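A small sketch combining add_stat(), list_stat_names() and report(); the statistic name is a placeholder and the construction follows the memory_profiling() example above. report() renders the recorded statistics as a formatted string:

>>> import rmm
>>> from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor
>>> from rapidsmpf.statistics import Statistics
>>> mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
>>> stats = Statistics(enable=True, mr=mr)
>>> stats.add_stat("bytes-spilled", 1024.0)
1024.0
>>> stats.add_stat("bytes-spilled", 512.0)
1536.0
>>> "bytes-spilled" in stats.list_stat_names()
True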
RMM Resource Adaptor#
- class rapidsmpf.rmm_resource_adaptor.AllocType(*values)#
- Attributes:
denominator: the denominator of a rational number in lowest terms
imag: the imaginary part of a complex number
numerator: the numerator of a rational number in lowest terms
real: the real part of a complex number
Methods
as_integer_ratio(/): Return a pair of integers, whose ratio is equal to the original int.
bit_count(/): Number of ones in the binary representation of the absolute value of self.
bit_length(/): Number of bits necessary to represent self in binary.
conjugate(/): Returns self, the complex conjugate of any int.
from_bytes(/, bytes[, byteorder, signed]): Return the integer represented by the given array of bytes.
is_integer(/): Returns True.
to_bytes(/[, length, byteorder, signed]): Return an array of bytes representing an integer.
- class rapidsmpf.rmm_resource_adaptor.RmmResourceAdaptor#
An RMM memory resource adaptor tailored to RapidsMPF.
- Attributes:
current_allocated: Get the total number of currently allocated bytes.
- fallback_mr
- upstream_mr
Methods
allocate(self, size_t nbytes, ...): Allocate nbytes bytes of memory.
deallocate(self, uintptr_t ptr, ...): Deallocate memory pointed to by ptr of size nbytes.
get_main_record(self): Returns a copy of the main memory record.
get_upstream(self)
- current_allocated#
RmmResourceAdaptor.current_allocated: int
Get the total number of currently allocated bytes.
This includes both allocations on the primary and fallback memory resources.
- Returns:
- Total number of currently allocated bytes.
- get_main_record(self)#
Returns a copy of the main memory record.
The main record tracks memory statistics for the lifetime of the resource.
- Returns:
- A copy of the current main memory record.
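For example, tracking an allocation through the adaptor (a sketch following the Statistics example above; the exact byte counts assume nothing else is allocated through mr):

>>> import rmm
>>> from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor
>>> mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource())
>>> buf = rmm.DeviceBuffer(size=1024, mr=mr)
>>> mr.current_allocated
1024
>>> record = mr.get_main_record()
>>> record.peak()
1024
>>> record.num_total_allocs()
1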
- class rapidsmpf.rmm_resource_adaptor.ScopedMemoryRecord#
Scoped memory record for tracking memory usage statistics.
Methods
current(self, AllocType alloc_type=AllocType.ALL): Current memory usage in bytes.
num_current_allocs(self, ...): Number of currently active (non-deallocated) allocations.
num_total_allocs(self, ...): Total number of allocations performed.
peak(self, AllocType alloc_type=AllocType.ALL): Peak memory usage in bytes.
record_allocation(self, ...): Record a memory allocation event.
record_deallocation(self, ...): Record a memory deallocation event.
total(self, AllocType alloc_type=AllocType.ALL): Total number of bytes allocated over the lifetime.
- current(self, AllocType alloc_type=AllocType.ALL)#
Current memory usage in bytes.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Current memory usage in bytes.
- num_current_allocs(self, AllocType alloc_type=AllocType.ALL)#
Number of currently active (non-deallocated) allocations.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Number of active allocations.
- num_total_allocs(self, AllocType alloc_type=AllocType.ALL)#
Total number of allocations performed.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Number of total allocations.
- peak(self, AllocType alloc_type=AllocType.ALL)#
Peak memory usage in bytes.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Peak memory usage in bytes.
- record_allocation(self, AllocType alloc_type, uint64_t nbytes)#
Record a memory allocation event.
Updates internal statistics for memory allocation and adjusts peak usage if the current memory usage exceeds the previous peak.
- Parameters:
- alloc_type
The allocator that performed the allocation.
- nbytes
The number of bytes allocated.
- record_deallocation(self, AllocType alloc_type, uint64_t nbytes)#
Record a memory deallocation event.
Updates internal statistics to reflect memory being freed.
- Parameters:
- alloc_type
The allocator that performed the deallocation.
- nbytes
The number of bytes deallocated.
- total(self, AllocType alloc_type=AllocType.ALL)#
Total number of bytes allocated over the lifetime.
- Parameters:
- alloc_type
Allocator type to query. Defaults to ALL.
- Returns:
- Total allocated bytes.