API Reference#
This page contains the API reference for rapidsmpf.
Integrations#
The subpackages under rapidsmpf.integrations contain integrations with other libraries.
Dask#
RapidsMPF Dask Integrations.
- class rapidsmpf.integrations.dask.DaskIntegration(*args, **kwargs)#
Dask-integration protocol.
This protocol can be used to implement a RapidsMPF-shuffle operation using a Dask task graph.
Methods
extract_partition(partition_id, shuffler, ...): Extract a DataFrame partition from a RapidsMPF Shuffler.
insert_partition(df, partition_id, ...): Add a partition to a RapidsMPF Shuffler.
- static extract_partition(partition_id: int, shuffler: Shuffler, options: Any) DataFrameT #
Extract a DataFrame partition from a RapidsMPF Shuffler.
- Parameters:
- partition_id
Partition id to extract.
- shuffler
The RapidsMPF Shuffler object to extract from.
- options
Additional options.
- Returns:
- A shuffled DataFrame partition.
- static insert_partition(df: DataFrameT, partition_id: int, partition_count: int, shuffler: Shuffler, options: Any, *other: Any) None #
Add a partition to a RapidsMPF Shuffler.
- Parameters:
- df
DataFrame partition to add to a RapidsMPF shuffler.
- partition_id
The input partition id of df.
- partition_count
Number of output partitions for the current shuffle.
- shuffler
The RapidsMPF Shuffler object to insert into.
- options
Additional options.
- *other
Other data needed for partitioning. For example, this may be boundary values needed for sorting.
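The two protocol methods can be pictured with a small, self-contained sketch. Everything here is hypothetical and for illustration only: `ToyShuffler` is an in-memory stand-in (not `rapidsmpf.shuffler.Shuffler`), a "DataFrame" partition is a plain list of integers, and rows are routed by value modulo `partition_count`. A real integration would pack device data and would run across workers.

```python
from typing import Any


class ToyShuffler:
    """Hypothetical in-memory stand-in for a shuffler (not the RapidsMPF class)."""

    def __init__(self) -> None:
        self.chunks: dict[int, list[list[int]]] = {}

    def insert_chunks(self, chunks: dict[int, list[int]]) -> None:
        # Store each chunk under its destination partition id.
        for pid, rows in chunks.items():
            self.chunks.setdefault(pid, []).append(rows)

    def extract(self, pid: int) -> list[list[int]]:
        # Hand back (and forget) every chunk routed to this partition.
        return self.chunks.pop(pid, [])


class ListIntegration:
    """Toy integration with the same two static methods as the protocol."""

    @staticmethod
    def insert_partition(
        df: list[int],
        partition_id: int,
        partition_count: int,
        shuffler: ToyShuffler,
        options: Any,
        *other: Any,
    ) -> None:
        # Split the input rows into one chunk per output partition.
        chunks: dict[int, list[int]] = {}
        for row in df:
            chunks.setdefault(row % partition_count, []).append(row)
        shuffler.insert_chunks(chunks)

    @staticmethod
    def extract_partition(
        partition_id: int, shuffler: ToyShuffler, options: Any
    ) -> list[int]:
        # Concatenate all chunks that were routed to this output partition.
        out: list[int] = []
        for chunk in shuffler.extract(partition_id):
            out.extend(chunk)
        return out


shuffler = ToyShuffler()
inputs = [[1, 2, 3, 4], [5, 6, 7, 8]]
for pid, part in enumerate(inputs):
    ListIntegration.insert_partition(part, pid, 2, shuffler, None)
outputs = [ListIntegration.extract_partition(p, shuffler, None) for p in range(2)]
print(outputs)
```

The sketch keeps the argument order of the documented signatures so that the shape of an implementation is visible; the routing rule and the data types are placeholders.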
- rapidsmpf.integrations.dask.bootstrap_dask_cluster(client: distributed.Client, *, options: Options = <rapidsmpf.config.Options object>) None #
Set up a Dask cluster for RapidsMPF shuffling.
Calling bootstrap_dask_cluster multiple times on the same worker is a no-op, which also means that any new options values are ignored.
- Parameters:
- client
The current Dask client.
- options
Configuration options. Any option not set explicitly is read from environment variables using get_environment_variables().
Notes
This utility must be executed before RapidsMPF shuffling can be used within a Dask cluster. This function is called automatically by rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph, but may be called manually to set things up before the first shuffle.
Subsequent shuffles on the same cluster will reuse the resources established on the cluster by this function.
All the workers reported by distributed.Client.scheduler_info() will be used. Note that RapidsMPF does not currently support adding or removing workers from the cluster.
- rapidsmpf.integrations.dask.rapidsmpf_shuffle_graph(input_name: str, output_name: str, partition_count_in: int, partition_count_out: int, integration: ~rapidsmpf.integrations.dask.shuffler.DaskIntegration, options: ~typing.Any, *other_keys: str | tuple[str, int], config_options: ~rapidsmpf.config.Options = <rapidsmpf.config.Options object>) dict[Any, Any] #
Return the task graph for a RapidsMPF shuffle.
- Parameters:
- input_name
The task name for input DataFrame tasks.
- output_name
The task name for output DataFrame tasks.
- partition_count_in
Partition count of input collection.
- partition_count_out
Partition count of output collection.
- integration
Dask-integration specification.
- options
Optional keyword arguments.
- *other_keys
Other keys needed by integration.insert_partition.
- config_options
RapidsMPF configuration options.
- Returns:
- A valid task graph for Dask execution.
Notes
A RapidsMPF shuffle operation comprises four general phases:
Staging phase
A new rapidsmpf.shuffler.Shuffler object must be staged on every worker in the current Dask cluster.
Insertion phase
Each input partition is split into a dictionary of chunks, and that dictionary is passed to the appropriate rapidsmpf.shuffler.Shuffler object (using rapidsmpf.shuffler.Shuffler.insert_chunks).
The insertion phase will include a single task for each of the partition_count_in partitions in the input DataFrame. The partitioning and insertion logic must be defined by the insert_partition static method of the integration argument.
Insertion tasks are NOT restricted to specific Dask workers. These tasks may run anywhere in the cluster.
Barrier phase
All rapidsmpf.shuffler.Shuffler objects must be ‘informed’ that the insertion phase is complete (on all workers) before the subsequent extraction phase begins. We call this synchronization step the ‘barrier phase’.
The barrier phase comprises three types of barrier tasks:
1. First global barrier - A single barrier task is used to signal that all input partitions have been submitted to a rapidsmpf.shuffler.Shuffler object on one of the workers. This task may also run anywhere on the cluster, but it must depend on ALL insertion tasks.
2. Worker barrier(s) - Each worker must execute a single worker-barrier task. This task will call insert_finished for every output partition on the local rapidsmpf.shuffler.Shuffler. These tasks must be restricted to specific workers, and they must all depend on the first global barrier.
3. Second global barrier - A single barrier task is used to signal that all workers are ready to begin the extraction phase. This task may run anywhere on the cluster, but it must depend on all worker-barrier tasks.
Extraction phase
Each output partition is extracted from the local rapidsmpf.shuffler.Shuffler object on the worker (using rapidsmpf.shuffler.Shuffler.wait_on and rapidsmpf.integrations.cudf.partition.unpack_and_concat).
The extraction phase will include a single task for each of the partition_count_out partitions in the shuffled output DataFrame. The extraction logic must be defined by the extract_partition static method of the integration argument.
Extraction tasks must be restricted to specific Dask workers, and they must also depend on the second global-barrier task.
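The dependency structure described in these notes can be sketched as a plain dictionary that maps each task to the tasks it depends on. This is only an illustration: the task names and the `sketch_shuffle_dependencies` helper are hypothetical, the staging phase is left out because the notes do not say how it is wired into the graph, and worker restrictions are not modelled. It is not the graph that `rapidsmpf_shuffle_graph` returns.

```python
def sketch_shuffle_dependencies(
    partition_count_in: int, partition_count_out: int, workers: list[str]
) -> dict[str, list[str]]:
    """Map each (hypothetical) task name to the task names it depends on."""
    graph: dict[str, list[str]] = {}

    # Insertion phase: one task per input partition, free to run anywhere.
    inserts = [f"insert-{i}" for i in range(partition_count_in)]
    for name in inserts:
        graph[name] = []

    # Barrier phase, step 1: a single task that depends on ALL insertion tasks.
    graph["global-barrier-1"] = list(inserts)

    # Barrier phase, step 2: one worker-barrier task per worker.
    worker_barriers = [f"worker-barrier-{w}" for w in workers]
    for name in worker_barriers:
        graph[name] = ["global-barrier-1"]

    # Barrier phase, step 3: a single task that depends on all worker barriers.
    graph["global-barrier-2"] = list(worker_barriers)

    # Extraction phase: one task per output partition, after the second barrier.
    for p in range(partition_count_out):
        graph[f"extract-{p}"] = ["global-barrier-2"]

    return graph


graph = sketch_shuffle_dependencies(3, 4, ["w0", "w1"])
for task, deps in graph.items():
    print(task, "<-", deps)
```

With three input partitions, four output partitions, and two workers, the sketch yields eleven tasks: three insertions, four barrier tasks, and four extractions.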
Ray#
Integration for Ray clusters.
- class rapidsmpf.integrations.ray.RapidsMPFActor(nranks: int)#
RapidsMPFActor is a base class for Ray actors; each instance sets up a UCXX communicator within the actor.
- Parameters:
- nranks
The number of workers in the cluster.
- Attributes:
comm
The UCXX communicator object.
Methods
is_initialized(): Check if the communicator is initialized.
nranks(): Get the number of ranks in the UCXX communicator.
rank(): Get the rank of the worker, as inferred from the UCXX communicator.
setup_root(): Set up the root communicator in the cluster.
setup_worker(root_address_bytes): Set up the worker in the cluster once the root is initialized.
Return a string representation of the actor.
Examples
>>> import ray
>>> from rapidsmpf.integrations.ray import RapidsMPFActor, setup_ray_ucxx_cluster
>>> @ray.remote(num_cpus=1)
... class DummyActor(RapidsMPFActor): ...
>>> actors = setup_ray_ucxx_cluster(DummyActor, 2)
>>> ray.get([actor.status_check.remote() for actor in actors])
- property comm: Communicator#
The UCXX communicator object.
- Returns:
- The UCXX communicator object if initialized, otherwise None
- Raises:
- RuntimeError if the communicator is not initialized
Notes
This property is not meant to be accessed remotely from the client, because Ray would then attempt to serialize the Communicator object, which will fail. Instead, subclasses can use the comm property to access the communicator, for example to create a shuffle operation.
- is_initialized() bool #
Check if the communicator is initialized.
- Returns:
- True if the communicator is initialized, False otherwise.
- nranks() int #
Get the number of ranks in the UCXX communicator.
- Returns:
- The number of ranks in the UCXX communicator
- rank() int #
Get the rank of the worker, as inferred from the UCXX communicator.
- Returns:
- The rank of the worker
- setup_root() tuple[int, bytes] #
Set up the root communicator in the cluster.
- Returns:
- rank
The rank of the root.
- root_address_bytes
The address of the root.
- rapidsmpf.integrations.ray.setup_ray_ucxx_cluster(actor_cls: ActorClass, num_workers: int, *args: Any, **kwargs: Any) list[ActorHandle] #
A utility function to set up UCXX communication using RapidsMPFActor actor objects.
- Parameters:
- actor_cls
The actor class to be instantiated in the cluster.
- num_workers
The number of workers in the cluster.
- *args
Additional arguments to be passed to the actor class.
- **kwargs
Additional keyword arguments to be passed to the actor class.
- Returns:
- gpu_actors
A list of actors in the cluster.
cuDF#
Collection of cuDF specific functions.
Partition#
- rapidsmpf.integrations.cudf.partition.split_and_pack(table: Table, splits: Iterable[int], stream: Stream, device_mr: DeviceMemoryResource) dict[int, PackedData] #
- rapidsmpf.integrations.cudf.partition.unpack_and_concat(partitions: Iterable[PackedData], stream: Stream, device_mr: DeviceMemoryResource) Table #
Shuffler#
The Shuffler interface for RapidsMPF.
- class rapidsmpf.shuffler.Shuffler(Communicator comm, ProgressThread progress_thread, uint8_t op_id, uint32_t total_num_partitions, stream, BufferResource br, Statistics statistics=None)#
Shuffle service for partitioned data.
The rapidsmpf.shuffler.Shuffler class provides an interface for performing a shuffle operation on partitioned data. It uses a distribution scheme to distribute and collect data chunks across different ranks.
- Parameters:
- comm
The communicator to use for data exchange between ranks.
- progress_thread
The progress thread to use for tracking progress.
- op_id
The operation ID of the shuffle. Must have a value between 0 and max_concurrent_shuffles-1.
- total_num_partitions
Total number of partitions in the shuffle.
- stream
The CUDA stream used for memory operations.
- br
The buffer resource used to allocate temporary storage and shuffle results.
- statistics
The statistics instance to use. If None, statistics are disabled.
- Attributes:
- max_concurrent_shuffles
Maximum number of concurrent shufflers.
Methods
concat_insert(self, chunks): Insert a batch of packed (serialized) chunks into the shuffle while concatenating the chunks based on the destination rank.
extract(self, uint32_t pid): Extract all chunks of the specified partition.
finished(self): Check if all partitions are finished.
insert_chunks(self, chunks): Insert a batch of packed (serialized) chunks into the shuffle.
insert_finished(self, pids): Mark partitions as finished.
shutdown(self): Shutdown the shuffle, blocking until all inflight communication is completed.
wait_any(self): Wait for any partition to finish.
wait_on(self, uint32_t pid): Wait for a specific partition to finish.
Notes
This class is designed to handle distributed operations by partitioning data and redistributing it across ranks in a cluster. It is typically used in distributed data processing workflows involving cuDF tables.
- comm#
Get the communicator used by the shuffler.
- Returns:
- The communicator.
- concat_insert(self, chunks: Mapping[int, PackedData])#
Insert a batch of packed (serialized) chunks into the shuffle while concatenating the chunks based on the destination rank.
- Parameters:
- chunks
A map where keys are partition IDs (int) and values are packed data (PackedData).
Notes
There are some considerations for using this method:
The chunks are grouped by the destination rank of the partition ID and concatenated on device memory.
The caller thread will perform the concatenation, and hence it will be blocked.
Concatenation may cause device memory pressure.
- extract(self, uint32_t pid)#
Extract all chunks of the specified partition.
- Parameters:
- pid
The partition ID to extract chunks for.
- Returns:
- A list of packed data belonging to the specified partition.
- finished(self)#
Check if all partitions are finished.
This method verifies whether all partitions have been completed, meaning all chunks have been inserted and no further data is expected from either the local node or any remote node.
- Returns:
- True if all partitions are finished, otherwise False.
- insert_chunks(self, chunks: Mapping[int, PackedData])#
Insert a batch of packed (serialized) chunks into the shuffle.
- Parameters:
- chunks
A map where keys are partition IDs (int) and values are packed data (PackedData).
Notes
This method adds the given chunks to the shuffle, associating them with their respective partition IDs.
- insert_finished(self, pids: int | Iterable[int])#
Mark partitions as finished.
This informs the shuffler that no more chunks for the specified partitions will be inserted.
- Parameters:
- pids
Partition IDs to mark as finished (int or an iterable of ints).
Notes
Once a partition is marked as finished, it is considered complete and no further chunks will be accepted for that partition.
- shutdown(self)#
Shutdown the shuffle, blocking until all inflight communication is completed.
- Raises:
- RuntimeError
If the shuffler is already inactive.
Notes
This method ensures that all pending shuffle operations and communications are completed before shutting down. It blocks until no inflight operations remain.
- wait_any(self)#
Wait for any partition to finish.
This method blocks until at least one partition is marked as finished. It is useful for processing partitions as they are completed.
- Returns:
- The partition ID of the next finished partition.
- wait_on(self, uint32_t pid)#
Wait for a specific partition to finish.
This method blocks until the desired partition is ready for processing.
- Parameters:
- pid
The desired partition ID.
Communicator#
Submodule for communication abstraction (e.g. UCXX and MPI).
- rapidsmpf.communicator.COMMUNICATORS = ('single', 'ucxx', 'mpi')#
Tuple of available communicators.
RapidsMPF includes a collection of communicator backends, available as submodules under rapidsmpf.communicator.*. Typically, the Conda distribution includes both UCXX and MPI support, while the pip installation generally supports only UCXX.
- class rapidsmpf.communicator.communicator.Communicator#
Abstract base class for a communication mechanism between nodes.
Provides an interface for sending and receiving messages between nodes, supporting asynchronous operations, GPU data transfers, and custom logging. Concrete implementations must define the virtual methods to enable specific communication backends.
- Attributes:
Methods
get_str(self): Get a string representation of the communicator.
Notes
This class is designed as an abstract base class, meaning it cannot be instantiated directly. Subclasses are required to implement the necessary methods to support the desired communication backend and functionality.
- get_str(self)#
Get a string representation of the communicator.
- Returns:
- A string describing the communicator.
- logger#
Get the logger.
- Returns:
- A logger instance.
- nranks#
Get the total number of ranks.
- Returns:
- Total number of ranks.
- rank#
Get the rank of this communication node.
- Returns:
- The rank.
- class rapidsmpf.communicator.communicator.LOG_LEVEL(*values)#
- Attributes:
denominator: the denominator of a rational number in lowest terms
imag: the imaginary part of a complex number
numerator: the numerator of a rational number in lowest terms
real: the real part of a complex number
Methods
as_integer_ratio(/): Return a pair of integers, whose ratio is equal to the original int.
bit_count(/): Number of ones in the binary representation of the absolute value of self.
bit_length(/): Number of bits necessary to represent self in binary.
conjugate(/): Returns self, the complex conjugate of any int.
from_bytes(/, bytes[, byteorder, signed]): Return the integer represented by the given array of bytes.
is_integer(/): Returns True.
to_bytes(/[, length, byteorder, signed]): Return an array of bytes representing an integer.
- class rapidsmpf.communicator.communicator.Logger#
Logger.
To control the verbosity level, set the environment variable RAPIDSMPF_LOG:
NONE: No logging.
PRINT: General print messages.
WARN: Warning messages (default).
INFO: Informational messages.
DEBUG: Debug messages.
TRACE: Trace messages.
- Attributes:
verbosity_level: Get the verbosity level of the logger.
Methods
debug(self, str msg): Logs a debug message.
info(self, str msg): Logs an informational message.
print(self, str msg): Logs a print message.
trace(self, str msg): Logs a trace message.
warn(self, str msg): Logs a warning message.
- debug(self, str msg: str)#
Logs a debug message.
- Parameters:
- msg
The message to log.
- info(self, str msg: str)#
Logs an informational message.
- Parameters:
- msg
The message to log.
- print(self, str msg: str)#
Logs a print message.
- Parameters:
- msg
The message to log.
- trace(self, str msg: str)#
Logs a trace message.
- Parameters:
- msg
The message to log.
- verbosity_level#
Get the verbosity level of the logger.
- Returns:
- The verbosity level.
- warn(self, str msg: str)#
Logs a warning message.
- Parameters:
- msg
The message to log.
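The RAPIDSMPF_LOG levels listed for Logger can be pictured as an ordered scale. The sketch below is an assumption-laden illustration, not the library's implementation: it assumes the levels increase in verbosity in the order they are listed, that a message is emitted when its level is at or below the configured one, and that the variable is matched exactly as written. The `is_enabled` helper is hypothetical.

```python
import os
from collections.abc import Mapping

# Levels in the order the documentation lists them (assumed: least to most verbose).
LEVELS = ("NONE", "PRINT", "WARN", "INFO", "DEBUG", "TRACE")


def is_enabled(message_level: str, env: Mapping[str, str] = os.environ) -> bool:
    """Return True if a message at `message_level` would pass the threshold."""
    # WARN is documented as the default when RAPIDSMPF_LOG is not set.
    configured = env.get("RAPIDSMPF_LOG", "WARN")
    if message_level == "NONE":
        return False  # NONE is a threshold, not a level messages are logged at.
    return LEVELS.index(message_level) <= LEVELS.index(configured)


print(is_enabled("WARN", {}))                            # default threshold
print(is_enabled("DEBUG", {"RAPIDSMPF_LOG": "TRACE"}))   # verbose threshold
```

Passing the environment as a mapping keeps the sketch easy to try without changing the real process environment.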
MPI Communicator#
UCXX Communicator#
ucxx-based implementation of a RapidsMPF Communicator.
- rapidsmpf.communicator.ucxx.barrier(Communicator comm)#
Execute a barrier on the UCXX communicator.
Ensures that all ranks have connected to the root and that all ranks have reached the barrier before continuing.
Notes
Executing this barrier is required after the ranks are bootstrapped to ensure everyone is connected to the root. An alternative barrier, such as MPI_Barrier, will not suffice for that purpose.
- rapidsmpf.communicator.ucxx.get_root_ucxx_address(Communicator comm)#
Get the address of the communicator’s UCXX worker.
This function is intended to be called from the root rank to communicate to other processes how to reach the root, but it will also return the address of the UCXX worker when called on other ranks.
- Parameters:
- comm
The RapidsMPF-UCXX communicator.
- Returns:
- A bytes sequence with the UCXX worker address.
- Raises:
- NotImplementedError
If the communicator was created with a HostPortPair, which is not yet supported.
- rapidsmpf.communicator.ucxx.new_communicator(Rank nranks, UCXWorker ucx_worker, UCXAddress root_ucxx_address, Options options)#
Create a new UCXX communicator with the given number of ranks.
An existing UCXWorker may be specified, otherwise one will be created. The root rank is created if no root_ucxx_address is specified; all other ranks must specify the address of the root rank via that argument.
- Parameters:
- nranks
The number of ranks in the cluster.
- ucx_worker
An existing UCXX worker to use if specified, otherwise one will be created.
- root_ucxx_address
The UCXX address of the root rank (only specified for non-root ranks).
- options
Configuration options.
- Returns:
- A new RapidsMPF-UCXX communicator.
Buffer#
Submodule for buffer abstraction.
- class rapidsmpf.buffer.buffer.MemoryType(*values)#
- Attributes:
denominator: the denominator of a rational number in lowest terms
imag: the imaginary part of a complex number
numerator: the numerator of a rational number in lowest terms
real: the real part of a complex number
Methods
as_integer_ratio(/): Return a pair of integers, whose ratio is equal to the original int.
bit_count(/): Number of ones in the binary representation of the absolute value of self.
bit_length(/): Number of bits necessary to represent self in binary.
conjugate(/): Returns self, the complex conjugate of any int.
from_bytes(/, bytes[, byteorder, signed]): Return the integer represented by the given array of bytes.
is_integer(/): Returns True.
to_bytes(/[, length, byteorder, signed]): Return an array of bytes representing an integer.
- class rapidsmpf.buffer.resource.BufferResource#
Class managing buffer resources.
This class handles memory allocation and transfers between different memory types (e.g., host and device). All memory operations in RapidsMPF, such as those performed by the Shuffler, rely on a buffer resource for memory management.
- Parameters:
- device_mr
Reference to the RMM device memory resource used for device allocations.
- memory_available
Optional memory availability functions. Memory types without availability functions are unlimited. A function must return the current available memory of a specific type. It must be thread-safe if used by multiple BufferResource instances concurrently. Warning: calling any BufferResource instance methods within the function might result in a deadlock. This is because the buffer resource is locked when the function is called.
- periodic_spill_check
Enable periodic spill checks. A dedicated thread continuously checks and performs spilling based on the memory availability functions. The value of periodic_spill_check is used as the pause between checks (in seconds). If None, no periodic spill check is performed.
- Attributes:
- spill_manager
Methods
memory_reserved(self, MemoryType mem_type): Get the current reserved memory of the specified memory type.
- memory_reserved(self, MemoryType mem_type)#
Get the current reserved memory of the specified memory type.
- Parameters:
- mem_type
The target memory type.
- Returns:
- The memory reserved, in bytes.
- class rapidsmpf.buffer.resource.LimitAvailableMemory(RmmResourceAdaptor mr, int64_t limit)#
A callback class for querying the remaining available memory within a defined limit from an RMM resource adaptor.
This class is primarily designed to simulate constrained memory environments or prevent memory allocation beyond a specific threshold. It provides information about the available memory by subtracting the memory currently used (as reported by the RMM resource adaptor) from a user-defined limit.
It is typically used in the context of memory management operations such as with BufferResource.
- Parameters:
- mr
A statistics resource adaptor that tracks memory usage and provides statistics about the memory consumption. The LimitAvailableMemory instance keeps a reference to mr to keep it alive.
- limit
The maximum memory limit (in bytes). Used to calculate the remaining available memory.
Methods
__call__(self): Returns the remaining available memory within the defined limit.
Notes
The mr resource must not be destroyed while this object is still in use.
Examples
>>> mr = RmmResourceAdaptor(...)
>>> memory_limiter = LimitAvailableMemory(mr, limit=1_000_000)
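The arithmetic behind such a callback (limit minus memory currently in use) can be sketched without a GPU. The classes below are hypothetical stand-ins, not the RapidsMPF or RMM types: `ToyTracker` plays the role of a resource adaptor that reports bytes in use, and `ToyLimitAvailableMemory` mirrors the callable shape described above. Whether the real class clamps negative results is not stated, so this sketch does not clamp.

```python
class ToyTracker:
    """Hypothetical stand-in for an adaptor that reports bytes currently in use."""

    def __init__(self) -> None:
        self.current_allocated = 0


class ToyLimitAvailableMemory:
    """Callable returning the memory still available under a fixed limit."""

    def __init__(self, tracker: ToyTracker, limit: int) -> None:
        self._tracker = tracker  # hold a reference so the tracker stays alive
        self._limit = limit

    def __call__(self) -> int:
        # Remaining budget = user-defined limit - memory currently in use.
        return self._limit - self._tracker.current_allocated


tracker = ToyTracker()
available = ToyLimitAvailableMemory(tracker, limit=1_000_000)
tracker.current_allocated = 250_000
print(available())
```

Because the callback reads the tracker each time it is called, the reported value follows allocations and frees without any extra bookkeeping.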
- class rapidsmpf.buffer.packed_data.PackedData#
Methods
from_cudf_packed_columns(cls, ...): Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
- classmethod from_cudf_packed_columns(cls, PackedColumns packed_columns)#
Constructs a PackedData from CudfPackedColumns by taking ownership of the data and releasing packed_columns.
- Parameters:
- packed_columns
Packed data containing metadata and GPU data buffers
- Returns:
- A new PackedData instance containing the packed columns data
- Raises:
- ValueError
If the PackedColumns object is empty (has been released already).
Config Options#
- class rapidsmpf.config.Optional(value)#
Represents an option value that can be explicitly disabled.
This class wraps an option value and interprets certain strings as indicators that the value is disabled (case-insensitive): {“false”, “no”, “off”, “disable”, “disabled”}.
This is typically used to simplify handling of options that may be disabled, together with Options.get_or_default().
- Parameters:
- value
The input value to interpret.
- Attributes:
- value
The raw input value, unless it matched a disable keyword, in which case the value is None.
Examples
>>> from rapidsmpf.config import Optional, Options
>>> Optional("OFF").value is None
True
>>> Optional("no").value is None
True
>>> Optional("100").value
'100'
>>> Optional("").value
''
>>> opts = Options()
>>> opts.get_or_default(
...     "dask_periodic_spill_check",
...     default_value=Optional(1e-3)
... ).value
0.001
- class rapidsmpf.config.OptionalBytes(value)#
Represents a byte-sized option that can be explicitly disabled.
This class is a specialization of Optional that interprets the input as a human-readable byte size string (e.g., “100 MB”, “1KiB”, “1e6”). If the input is one of the disable keywords (e.g., “off”, “no”, “false”), the value is treated as disabled (None). Otherwise, it is parsed to an integer number of bytes using rapidsmpf.utils.string.parse_bytes().
This is useful for configuration options that may be set to a size limit or explicitly turned off.
- Parameters:
- value
A human-readable byte size (e.g., “1MiB”, “100 MB”) or a disable keyword (case-insensitive), or an integer number of bytes.
- Attributes:
- value
The size in bytes, or None if disabled.
Examples
>>> from rapidsmpf.config import OptionalBytes
>>> OptionalBytes("1KiB").value
1024
>>> OptionalBytes("OFF").value is None
True
>>> OptionalBytes(2048).value
2048
- class rapidsmpf.config.Options#
Initialize an Options object with a dictionary of string options.
- Parameters:
- options_as_strings
A dictionary representing option names and their corresponding values.
Methods
deserialize(bytes serialized_buffer): Deserialize a binary buffer into an Options object.
get(self, str key, *, return_type, factory): Retrieves a configuration option by key.
get_or_default(self, str key, *, default_value): Retrieve a configuration option by key, using a default value if not present.
get_strings(self): Get all option key-value pairs as strings.
insert_if_absent(self, dict options_as_strings): Insert multiple options if they are not already present.
serialize(self): Serialize the Options object into a binary buffer.
- static deserialize(bytes serialized_buffer)#
Deserialize a binary buffer into an Options object.
This method reconstructs an Options instance from a byte buffer produced by the Options.serialize() method.
See Options.serialize() for the binary format.
- Parameters:
- serialized_buffer
A buffer containing serialized options in the defined binary format.
- Returns:
- Options
A reconstructed Options instance containing the deserialized key-value pairs.
- Raises:
- ValueError
If the input buffer is malformed or inconsistent with the expected format.
- get(self, str key, *, return_type, factory)#
Retrieves a configuration option by key.
If the option is not present, it is constructed using the provided factory function, which receives the string representation of the option (or an empty string if unset). The option is cached after the first access.
The option is cast to the specified return_type. To be accessible from C++, it must be one of: bool, int, float, str. Otherwise, it is stored as a PyObject*.
Once a key has been accessed with a particular return_type, subsequent calls to get with the same key must use the same return_type. Using a different type for the same key will result in a TypeError.
- Parameters:
- Returns:
- The value of the requested option, cast to the specified return_type.
- Raises:
- ValueError
If the return_type is unsupported, or if the stored option type does not match the expected type.
- TypeError
If the option has already been accessed with a different return_type.
Warning
The factory must not access the Options instance, as this may lead to a deadlock due to internal locking.
- get_or_default(self, str key, *, default_value)#
Retrieve a configuration option by key, using a default value if not present.
This is a convenience wrapper around get() that uses the type of the default_value as the return type and provides a default factory that parses a string into that type.
- Parameters:
- key
The name of the option to retrieve.
- default_value
The default value to return if the option is not set. Its type is used to determine the expected return type.
- Returns:
- The value of the option if it exists and can be parsed to the type of default_value, otherwise default_value.
- Raises:
- ValueError
If the stored option value cannot be parsed to the required type.
- TypeError
If the option has already been accessed with a different return type.
Notes
This method infers the return type from type(default_value).
If default_value is used, it will be cached and reused for subsequent accesses of the same key.
Examples
>>> from rapidsmpf.config import Options
>>> opts = Options()
>>> opts.get_or_default("debug", default_value=False)
False
>>> opts.get_or_default("timeout", default_value=1.5)
1.5
>>> opts.get_or_default("level", default_value="info")
'info'
- get_strings(self)#
Get all option key-value pairs as strings.
- Returns:
- A dictionary containing all stored options, where the keys and values are both strings.
- insert_if_absent(self, dict options_as_strings)#
Insert multiple options if they are not already present.
Attempts to insert each key-value pair from the provided dictionary, skipping keys that already exist in the options.
- Parameters:
- options_as_strings
Dictionary of option keys mapped to their string representations. Keys are inserted only if they do not already exist. The keys are trimmed and converted to lower case before insertion.
- Returns:
- Number of newly inserted options (0 if none were added).
- serialize(self) bytes #
Serialize the Options object into a binary buffer.
This method produces a compact binary representation of the internal key-value options. The format is suitable for storage or transmission and can be later restored using Options.deserialize().
- The binary format is:
[uint64_t count] — number of key-value pairs
[count * 2 * uint64_t] — offset pairs (key_offset, value_offset)
[raw bytes] — key and value strings stored contiguously
- Returns:
- bytes
A bytes object containing the serialized binary representation of the options.
- Raises:
- ValueError
If any option has already been accessed and cannot be serialized.
Notes
An Options instance can only be serialized if no options have been accessed. This is because serialization is based on the original string representations of the options. Once an option has been accessed and parsed, its string value may no longer accurately reflect its state, making serialization potentially inconsistent.
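The three-part layout described for the binary format can be illustrated with Python's struct module. This is a sketch under stated assumptions, not the library's encoder: it assumes little-endian uint64 values, offsets measured from the start of the raw-bytes region, no string terminators, and that each string ends where the next offset begins (the last value runs to the end of the buffer). `pack_options` and `unpack_options` are hypothetical names, and the result is not guaranteed to be byte-compatible with Options.serialize().

```python
import struct


def pack_options(options: dict[str, str]) -> bytes:
    """Encode as [count][(key_offset, value_offset) * count][raw bytes]."""
    offsets: list[int] = []
    data = bytearray()
    for key, value in options.items():
        offsets.append(len(data))  # key_offset
        data += key.encode()
        offsets.append(len(data))  # value_offset
        data += value.encode()
    header = struct.pack("<Q", len(options))
    table = struct.pack(f"<{len(offsets)}Q", *offsets)
    return header + table + bytes(data)


def unpack_options(buf: bytes) -> dict[str, str]:
    """Decode a buffer produced by pack_options."""
    (count,) = struct.unpack_from("<Q", buf, 0)
    offsets = list(struct.unpack_from(f"<{2 * count}Q", buf, 8))
    data = buf[8 + 16 * count:]
    offsets.append(len(data))  # sentinel so the last value has an end offset
    result: dict[str, str] = {}
    for i in range(count):
        key_start, value_start, end = offsets[2 * i : 2 * i + 3]
        result[data[key_start:value_start].decode()] = data[value_start:end].decode()
    return result


opts = {"log": "DEBUG", "num_streams": "4"}
buf = pack_options(opts)
print(len(buf), unpack_options(buf))
```

Storing offsets in a fixed-size table ahead of the strings lets a reader locate any key or value without scanning the raw bytes.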
- rapidsmpf.config.get_environment_variables(str key_regex='RAPIDSMPF_(.*)')#
Returns a dictionary of environment variables matching a given regular expression.
This function scans the current process’s environment variables and inserts those whose keys match the provided regular expression. The regular expression must contain exactly one capture group to extract the portion of the environment variable key to use as the dictionary key.
For example, to strip the RAPIDSMPF_ prefix, use r"RAPIDSMPF_(.*)" as the regex. The captured group will be used as the key in the output dictionary.
- Example:
Environment variable: RAPIDSMPF_FOO=bar
key_regex: r"RAPIDSMPF_(.*)"
Resulting dictionary entry: {"FOO": "bar"}
- Parameters:
- key_regex
A regular expression with a single capture group to match and extract the environment variable keys.
- Returns:
- A dictionary containing all matching environment variables, with keys as extracted by the capture group.
- Raises:
- ValueError
If key_regex does not contain exactly one capture group.
See also
os.environ
Dictionary of the current environment variables.
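The documented behaviour of get_environment_variables can be approximated in pure Python with re and os.environ. The sketch below is an illustration only: `env_vars_matching` is a hypothetical helper, it takes the environment as an argument so it can be tried on sample data, and it assumes the regular expression must match the whole variable name. The real function may differ in matching rules and key normalization.

```python
import os
import re
from collections.abc import Mapping


def env_vars_matching(
    key_regex: str = r"RAPIDSMPF_(.*)",
    environ: Mapping[str, str] = os.environ,
) -> dict[str, str]:
    """Collect variables whose names match `key_regex`, keyed by the capture group."""
    pattern = re.compile(key_regex)
    if pattern.groups != 1:
        raise ValueError("key_regex must contain exactly one capture group")
    result: dict[str, str] = {}
    for name, value in environ.items():
        match = pattern.fullmatch(name)  # assumption: the whole name must match
        if match:
            result[match.group(1)] = value
    return result


sample = {"RAPIDSMPF_FOO": "bar", "PATH": "/usr/bin"}
print(env_vars_matching(environ=sample))
```

Checking `pattern.groups` up front mirrors the documented ValueError for a regular expression without exactly one capture group.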