Namespaces | |
| actor | |
Classes | |
| struct | PartitionMapChunk |
| Chunk of packed partitions identified by partition ID. More... | |
| struct | PartitionVectorChunk |
| Chunk of packed partitions stored as a vector. More... | |
| class | AllGather |
| Asynchronous (coroutine) interface to coll::AllGather. More... | |
| class | ShufflerAsync |
| An asynchronous shuffler that allows concurrent insertion and extraction of data. More... | |
| class | Channel |
| A coroutine-based channel for sending and receiving messages asynchronously. More... | |
| class | ThrottlingAdaptor |
| An adaptor to throttle access to a channel. More... | |
| class | ShutdownAtExit |
| Helper RAII class to shut down channels when they go out of scope. More... | |
| class | Context |
| Context for actors (coroutines) in rapidsmpf. More... | |
| class | CoroThreadPoolExecutor |
| Executor wrapper around a coro::thread_pool used for coroutine execution. More... | |
| class | Lineariser |
| Linearise insertion into an output channel from a fixed number of producers by sequence number. More... | |
| class | MemoryReserveOrWait |
| Asynchronous coordinator for memory reservation requests. More... | |
| class | Message |
| Type-erased message wrapper around a payload. More... | |
| class | BoundedQueue |
| A bounded queue for type-erased Messages. More... | |
| class | SpillableMessages |
| Container for individually spillable messages. More... | |
| struct | BloomFilter |
| Utility managing construction and use of a bloom filter. More... | |
| struct | HashScheme |
| Hash partitioning scheme. More... | |
| struct | PartitioningSpec |
| Partitioning specification for a single hierarchical level. More... | |
| struct | Partitioning |
| Hierarchical partitioning metadata for a data stream. More... | |
| struct | ChannelMetadata |
| Channel-level metadata describing the data stream. More... | |
| struct | Filter |
| Filter AST expression with lifetime/stream management. More... | |
| class | TableChunk |
| A unit of table data in a streaming pipeline. More... | |
Typedefs | |
| using | Actor = coro::task< void > |
| Alias for an actor in a streaming graph. More... | |
| using | Semaphore = coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> |
| An awaitable semaphore to manage acquisition and release of finite resources. | |
Functions | |
| ContentDescription | get_content_description (PackedData const &obj) |
| Generate a content description for PackedData. More... | |
| Message | to_message (std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk) |
| Wrap PackedData into a Message. More... | |
| ContentDescription | get_content_description (PartitionMapChunk const &obj) |
| Generate a content description for a PartitionMapChunk. More... | |
| ContentDescription | get_content_description (PartitionVectorChunk const &obj) |
| Generate a content description for a PartitionVectorChunk. More... | |
| Message | to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionMapChunk > chunk) |
| Wrap a PartitionMapChunk into a Message. More... | |
| Message | to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionVectorChunk > chunk) |
| Wrap a PartitionVectorChunk into a Message. More... | |
| void | run_actor_network (std::vector< Actor > actors) |
| Runs a list of actors concurrently and waits for all to complete. More... | |
| template<std::ranges::range Range> | |
| auto | coro_results (Range &&task_results) |
| Collect the results of multiple finished coroutines. More... | |
| template<typename... Args> | |
| auto | coro_results (std::tuple< Args... > &&results) |
| Collect the results of multiple finished coroutines from a tuple. More... | |
| coro::task< MemoryReservation > | reserve_memory (std::shared_ptr< Context > ctx, std::size_t size, std::int64_t net_memory_delta, MemoryType mem_type=MemoryType::DEVICE, std::optional< AllowOverbooking > allow_overbooking=std::nullopt) |
| Reserve memory using the context memory reservation mechanism. More... | |
| Message | to_message (std::uint64_t sequence_number, std::unique_ptr< ChannelMetadata > m) |
| Wrap a ChannelMetadata into a Message. More... | |
| ContentDescription | get_content_description (TableChunk const &obj) |
| Generate a content description for a TableChunk. More... | |
| Message | to_message (std::uint64_t sequence_number, std::unique_ptr< TableChunk > chunk) |
| Wrap a TableChunk into a Message. More... | |
SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0
| using rapidsmpf::streaming::Actor = typedef coro::task<void> |
| auto rapidsmpf::streaming::coro_results | ( | Range && | task_results | ) |
Collect the results of multiple finished coroutines.
This helper consumes a range of coroutine result objects (e.g., from coro::when_all or coro::when_any) and extracts their return values by invoking .return_value() on each element.
If the underlying tasks return a non-void type T, all values are collected into a std::vector<T> and returned.
If the underlying tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
| Range | A range type whose elements support a .return_value() member function. Typically the result of functions and coroutines like coro::when_all and coro::wait_all. |
| task_results | A range of completed coroutine results. |
Returns std::vector<T> if the underlying tasks return a value of type T, or void if the underlying tasks return void.
Note: alternatively, call .return_value() on each element directly, or use the tuple form of coro_results.
Warning: the results of coro::when_all and coro::wait_all must always be retrieved by calling .return_value() (either directly or via this helper). Failing to do so leaves exceptions unobserved, which can cause the streaming pipeline to deadlock or hang indefinitely while waiting for error propagation.
Definition at line 48 of file coro_utils.hpp.
| auto rapidsmpf::streaming::coro_results | ( | std::tuple< Args... > && | results | ) |
Collect the results of multiple finished coroutines from a tuple.
This overload works with a tuple of coroutine result objects, typically from co_await coro::when_all(...).
If the underlying tasks return values, they are collected into a std::tuple<T1, T2, ...> and returned.
If all underlying tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
| Args | Types of coroutine result objects in the tuple. |
| results | Tuple of coroutine result objects to extract values from. |
Returns std::tuple<T1, T2, ...> if the underlying tasks return values, or void if all underlying tasks return void.
Definition at line 87 of file coro_utils.hpp.
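The tuple form can be sketched in the same spirit with `std::apply`. Again, `SketchTupleResult`, `SketchVoidResult`, and `collect_tuple_results` are hypothetical names made up for illustration. The sketch handles only the two documented cases (all results carry values, or all are void) and makes no claim about how the library treats a mixture.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for finished coroutine results (not libcoro types).
template <typename T>
struct SketchTupleResult {
    T value;
    T return_value() { return std::move(value); }
};

struct SketchVoidResult {
    int* calls;  // counts how often return_value() was invoked
    void return_value() { ++*calls; }
};

// Hypothetical helper: heterogeneous values become a std::tuple,
// an all-void tuple is only observed. Mixed tuples are not handled here.
template <typename... Args>
auto collect_tuple_results(std::tuple<Args...>&& results) {
    constexpr bool all_void =
        (std::is_void_v<decltype(std::declval<Args&>().return_value())> && ...);
    if constexpr (all_void) {
        std::apply([](auto&... r) { (r.return_value(), ...); }, results);
    } else {
        return std::apply(
            [](auto&... r) { return std::make_tuple(r.return_value()...); }, results);
    }
}

// Two results of different types are returned as one tuple.
inline std::tuple<int, std::string> demo_tuple_values() {
    return collect_tuple_results(std::make_tuple(
        SketchTupleResult<int>{7}, SketchTupleResult<std::string>{"ok"}));
}

// All-void tuple: returns how many results were observed.
inline int demo_tuple_void() {
    int calls = 0;
    collect_tuple_results(
        std::make_tuple(SketchVoidResult{&calls}, SketchVoidResult{&calls}));
    return calls;
}
```

A tuple is used here instead of a vector because the element types may differ, which is the situation the tuple overload is documented for.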
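| ContentDescription rapidsmpf::streaming::get_content_description | ( | PackedData const & | obj | ) | inline |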
Generate a content description for PackedData.
| obj | The object's content to describe. |
Definition at line 20 of file packed_data.hpp.
| ContentDescription rapidsmpf::streaming::get_content_description | ( | PartitionMapChunk const & | obj | ) |
Generate a content description for a PartitionMapChunk.
| obj | The object's content to describe. |
| ContentDescription rapidsmpf::streaming::get_content_description | ( | PartitionVectorChunk const & | obj | ) |
Generate a content description for a PartitionVectorChunk.
| obj | The object's content to describe. |
| ContentDescription rapidsmpf::streaming::get_content_description | ( | TableChunk const & | obj | ) |
Generate a content description for a TableChunk.
| obj | The object's content to describe. |
| coro::task<MemoryReservation> rapidsmpf::streaming::reserve_memory | ( | std::shared_ptr< Context > | ctx, |
| std::size_t | size, | ||
| std::int64_t | net_memory_delta, | ||
| MemoryType | mem_type = MemoryType::DEVICE, |
||
| std::optional< AllowOverbooking > | allow_overbooking = std::nullopt |
||
| ) |
Reserve memory using the context memory reservation mechanism.
Submits a memory reservation request for the configured memory type and suspends until the request is satisfied. If no pending reservation request can be satisfied within the configured "memory_reserve_timeout", the behavior depends on allow_overbooking.
This is a convenience helper that returns only the MemoryReservation. If more control is required, for example inspecting the amount of overbooking, callers should use MemoryReserveOrWait directly, such as ctx.memory(MemoryType::DEVICE).reserve_or_wait_or_overbook(size, net_memory_delta).
Priority and progress semantics are identical to MemoryReserveOrWait::reserve_or_wait(). In particular, net_memory_delta is used as a heuristic to prefer eligible requests that are expected to reduce memory pressure sooner. Smaller values have higher priority.
| ctx | Actor context used to obtain the memory reservation handle. |
| size | Number of bytes to reserve. |
| net_memory_delta | Estimated net change in memory usage after the reservation is allocated and the dependent operation completes. Smaller values have higher priority. |
| mem_type | Memory type for which to reserve memory. |
| allow_overbooking | Controls the behavior when no progress is possible within the configured timeout. If it resolves to AllowOverbooking::NO, rapidsmpf::reservation_error is thrown. |
| std::runtime_error | If shutdown occurs before the request can be processed. |
| rapidsmpf::reservation_error | If no progress is possible within the timeout and allow_overbooking resolves to AllowOverbooking::NO. |
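The priority rule ("smaller net_memory_delta has higher priority" among eligible requests) can be pictured with a small selection routine. This is a sketch only: `PendingRequest` and `pick_next` are hypothetical, and treating a request as eligible when its size fits into the currently available bytes is an assumption of this example, not a statement about how MemoryReserveOrWait is implemented.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical record of a queued reservation request.
struct PendingRequest {
    std::size_t size;               // bytes requested
    std::int64_t net_memory_delta;  // estimated net change once the dependent work completes
};

// Returns the index of the request to serve next, or std::nullopt if none fits.
// Assumption: "eligible" means the request fits into `available` bytes.
inline std::optional<std::size_t> pick_next(
    std::vector<PendingRequest> const& pending, std::size_t available
) {
    std::optional<std::size_t> best;
    for (std::size_t i = 0; i < pending.size(); ++i) {
        if (pending[i].size > available) {
            continue;  // not eligible under the assumption above
        }
        if (!best || pending[i].net_memory_delta < pending[*best].net_memory_delta) {
            best = i;  // a smaller delta is preferred
        }
    }
    return best;
}
```

With 256 bytes available and requests `{100, 50}`, `{200, -300}`, `{1000, -900}`, this sketch selects the second request: the third has the smallest delta but does not fit, and of the two that fit, the second is expected to free more memory.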
| void rapidsmpf::streaming::run_actor_network | ( | std::vector< Actor > | actors | ) |
Runs a list of actors concurrently and waits for all to complete.
This function schedules each actor and blocks until all of them have finished execution. Typically used to launch multiple producer/consumer coroutines in parallel.
| actors | A vector of actors to run. |
| Message rapidsmpf::streaming::to_message | ( | std::uint64_t | sequence_number, |
| std::unique_ptr< ChannelMetadata > | m | ||
| ) |
Wrap a ChannelMetadata into a Message.
| sequence_number | Ordering identifier for the message. |
| m | The metadata to wrap. |
Message encapsulating the metadata as its payload.
| Message rapidsmpf::streaming::to_message | ( | std::uint64_t | sequence_number, |
| std::unique_ptr< PackedData > | chunk | ||
| ) |
Wrap PackedData into a Message.
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Message encapsulating the provided chunk as its payload.
Definition at line 33 of file packed_data.hpp.
| Message rapidsmpf::streaming::to_message | ( | std::uint64_t | sequence_number, |
| std::unique_ptr< PartitionMapChunk > | chunk | ||
| ) |
Wrap a PartitionMapChunk into a Message.
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Message encapsulating the provided chunk as its payload.
| Message rapidsmpf::streaming::to_message | ( | std::uint64_t | sequence_number, |
| std::unique_ptr< PartitionVectorChunk > | chunk | ||
| ) |
Wrap a PartitionVectorChunk into a Message.
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Message encapsulating the provided chunk as its payload.
| Message rapidsmpf::streaming::to_message | ( | std::uint64_t | sequence_number, |
| std::unique_ptr< TableChunk > | chunk | ||
| ) |
Wrap a TableChunk into a Message.
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Message encapsulating the provided chunk as its payload.
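All to_message overloads follow the same pattern: take ownership of a payload through a std::unique_ptr and pair it with a sequence number inside a type-erased Message. The sketch below shows that pattern with stand-in types. `SketchMessage`, `SketchChunk`, and `to_sketch_message` are hypothetical and exist only for this example; the actual Message class has its own interface, which is not reproduced here.

```cpp
#include <cstdint>
#include <memory>
#include <typeindex>
#include <typeinfo>
#include <utility>

// Hypothetical type-erased message: a sequence number plus an owned payload.
class SketchMessage {
  public:
    template <typename T>
    SketchMessage(std::uint64_t sequence_number, std::unique_ptr<T> payload)
        : sequence_number_{sequence_number},
          type_{typeid(T)},
          payload_{std::move(payload)} {}

    std::uint64_t sequence_number() const { return sequence_number_; }

    // True if the stored payload has exactly type T.
    template <typename T>
    bool holds() const {
        return type_ == std::type_index(typeid(T));
    }

    // Access the payload as T; throws std::bad_cast on a type mismatch.
    template <typename T>
    T& get() {
        if (!holds<T>()) {
            throw std::bad_cast{};
        }
        return *static_cast<T*>(payload_.get());
    }

  private:
    std::uint64_t sequence_number_;
    std::type_index type_;
    std::shared_ptr<void> payload_;  // keeps the original deleter of the unique_ptr
};

// Hypothetical payload type standing in for TableChunk, PackedData, and so on.
struct SketchChunk {
    int rows;
};

// Mirrors the shape of the documented overloads: (sequence_number, unique_ptr) -> message.
inline SketchMessage to_sketch_message(
    std::uint64_t sequence_number, std::unique_ptr<SketchChunk> chunk
) {
    return SketchMessage{sequence_number, std::move(chunk)};
}
```

Passing the payload as a std::unique_ptr makes the ownership transfer explicit at the call site: after the call, the caller no longer holds the chunk and the message is its sole owner.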