Namespaces
| namespace | node | |
Classes
| struct | PackedDataChunk | Chunk of PackedData. |
| struct | PartitionMapChunk | Chunk of packed partitions identified by partition ID. |
| struct | PartitionVectorChunk | Chunk of packed partitions stored as a vector. |
| class | AllGather | Asynchronous (coroutine) interface to allgather::AllGather. |
| class | ShufflerAsync | An asynchronous shuffler that allows concurrent insertion and extraction of data. |
| class | Channel | A coroutine-based channel for sending and receiving messages asynchronously. |
| class | ThrottlingAdaptor | An adaptor to throttle access to a channel. |
| class | ShutdownAtExit | Helper RAII class to shut down channels when they go out of scope. |
| class | Context | Context for nodes (coroutines) in rapidsmpf. |
| class | Lineariser | Linearise insertion into an output channel from a fixed number of producers by sequence number. |
| class | Message | Type-erased message wrapper around a payload. |
| class | BoundedQueue | A bounded queue for type-erased Messages. |
| class | SpillableMessages | Container for individually spillable messages. |
| struct | Filter | Filter AST expression with lifetime/stream management. |
| class | TableChunk | A unit of table data in a streaming pipeline. |
Typedefs
| using | Semaphore = coro::semaphore<std::numeric_limits<std::ptrdiff_t>::max()> | An awaitable semaphore to manage acquisition and release of finite resources. |
| using | Node = coro::task<void> | Alias for a node in a streaming pipeline. |
Functions
| ContentDescription | get_content_description(PackedDataChunk const& obj) | Generate a content description for a PackedDataChunk. |
| Message | to_message(std::uint64_t sequence_number, std::unique_ptr<PackedDataChunk> chunk) | Wrap a PackedDataChunk into a Message. |
| ContentDescription | get_content_description(PartitionMapChunk const& obj) | Generate a content description for a PartitionMapChunk. |
| ContentDescription | get_content_description(PartitionVectorChunk const& obj) | Generate a content description for a PartitionVectorChunk. |
| Message | to_message(std::uint64_t sequence_number, std::unique_ptr<PartitionMapChunk> chunk) | Wrap a PartitionMapChunk into a Message. |
| Message | to_message(std::uint64_t sequence_number, std::unique_ptr<PartitionVectorChunk> chunk) | Wrap a PartitionVectorChunk into a Message. |
| template<std::ranges::range Range> auto | coro_results(Range&& task_results) | Collect the results of multiple finished coroutines. |
| template<typename... Args> auto | coro_results(std::tuple<Args...>&& results) | Collect the results of multiple finished coroutines from a tuple. |
| void | run_streaming_pipeline(std::vector<Node> nodes) | Runs a list of nodes concurrently and waits for all to complete. |
| ContentDescription | get_content_description(TableChunk const& obj) | Generate a content description for a TableChunk. |
| Message | to_message(std::uint64_t sequence_number, std::unique_ptr<TableChunk> chunk) | Wrap a TableChunk into a Message. |
SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. SPDX-License-Identifier: Apache-2.0
using rapidsmpf::streaming::Node = coro::task<void>
Alias for a node in a streaming pipeline.
template<std::ranges::range Range> auto rapidsmpf::streaming::coro_results(Range&& task_results)
Collect the results of multiple finished coroutines.
This helper consumes a range of coroutine result objects (e.g., from coro::when_all or coro::when_any) and extracts their return values by invoking .return_value() on each element.
- If the underlying tasks return a value of type T, all values are collected into a std::vector<T> and returned.
- If the underlying tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
| Range | A range type whose elements support a .return_value() member function. Typically the result of functions and coroutines such as coro::when_all and coro::wait_all. |
Parameters
| task_results | A range of completed coroutine results. |
Returns
std::vector<T> if the underlying tasks return a value of type T, or void if the underlying tasks return void.
Note
If the results have different types, call .return_value() on each element, or use the tuple form of coro_results.
Warning
The results of coro::when_all and coro::wait_all must always be retrieved by calling .return_value() (either directly or via this helper). Failing to do so leaves exceptions unobserved, which can cause the streaming pipeline to deadlock or hang indefinitely while waiting for error propagation.
Definition at line 48 of file coro_utils.hpp.
template<typename... Args> auto rapidsmpf::streaming::coro_results(std::tuple<Args...>&& results)
Collect the results of multiple finished coroutines from a tuple.
This overload works with a tuple of coroutine result objects, typically from co_await coro::when_all(...).
- If the underlying tasks return values, they are collected into a std::tuple<T1, T2, ...> and returned.
- If all underlying tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
| Args | Types of the coroutine result objects in the tuple. |
Parameters
| results | Tuple of coroutine result objects to extract values from. |
Returns
std::tuple<T1, T2, ...> if the underlying tasks return values, or void if all underlying tasks return void.
Definition at line 87 of file coro_utils.hpp.
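ContentDescription rapidsmpf::streaming::get_content_description(PackedDataChunk const& obj) (inline)
Generate a content description for a PackedDataChunk.
Parameters
| obj | The object whose content is to be described. |
Definition at line 30 of file packed_data.hpp.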
ContentDescription rapidsmpf::streaming::get_content_description(PartitionMapChunk const& obj)
Generate a content description for a PartitionMapChunk.
Parameters
| obj | The object whose content is to be described. |
ContentDescription rapidsmpf::streaming::get_content_description(PartitionVectorChunk const& obj)
Generate a content description for a PartitionVectorChunk.
Parameters
| obj | The object whose content is to be described. |
ContentDescription rapidsmpf::streaming::get_content_description(TableChunk const& obj)
Generate a content description for a TableChunk.
Parameters
| obj | The object whose content is to be described. |
void rapidsmpf::streaming::run_streaming_pipeline(std::vector<Node> nodes)
Runs a list of nodes concurrently and waits for all to complete.
This function schedules each node and blocks until all of them have finished execution. It is typically used to launch multiple producer/consumer coroutines in parallel.
Parameters
| nodes | A vector of nodes to run. |
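The contract of run_streaming_pipeline (start every node, then block until all have finished) can be illustrated with a thread-based analogue. This is a swapped-in technique: run_all_and_wait, ThreadNode, and producer_consumer_demo are hypothetical names, and each node here is a plain callable run on its own thread with std::async, whereas run_streaming_pipeline schedules coroutines. The sketch waits for every node before rethrowing the first stored exception.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical thread-based node type (the real Node is coro::task<void>).
using ThreadNode = std::function<void()>;

// Start all nodes, block until every one has finished, then surface errors.
void run_all_and_wait(std::vector<ThreadNode> nodes) {
    std::vector<std::future<void>> running;
    running.reserve(nodes.size());
    // Launch everything before waiting on anything, so nodes run concurrently.
    for (auto& node : nodes) {
        running.push_back(std::async(std::launch::async, std::move(node)));
    }
    // Wait for all nodes to finish.
    for (auto& f : running) f.wait();
    // get() rethrows the exception stored by a failed node, if any.
    for (auto& f : running) f.get();
}

// Usage: a consumer node waits for a value that a producer node sends.
// This completes only because both nodes run at the same time.
int producer_consumer_demo() {
    std::promise<int> handoff;
    std::future<int> received = handoff.get_future();
    int value = 0;
    run_all_and_wait({
        [&] { value = received.get(); },  // consumer, listed first
        [&] { handoff.set_value(7); },    // producer
    });
    return value;
}
```

Running the two demo nodes one after the other in the listed order would deadlock, which is why all nodes are launched before any is awaited.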
Message rapidsmpf::streaming::to_message(std::uint64_t sequence_number, std::unique_ptr<PackedDataChunk> chunk)
Wrap a PackedDataChunk into a Message.
Parameters
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Returns
A Message encapsulating the provided chunk as its payload.
Definition at line 44 of file packed_data.hpp.
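To make the idea of a type-erased message concrete, the sketch below defines a simplified, hypothetical SketchMessage (not rapidsmpf's Message class or its accessors) that owns a payload of any type together with a sequence number, plus a to_sketch_message function shaped like the to_message overloads. DemoChunk is a placeholder payload type. Storing the payload in a std::shared_ptr<void> keeps the original deleter, and a std::type_index records the payload type so that access with the wrong type fails loudly.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <utility>

// Hypothetical, simplified type-erased message (not rapidsmpf's Message).
class SketchMessage {
public:
    template <typename T>
    SketchMessage(std::uint64_t sequence_number, std::unique_ptr<T> payload)
        : sequence_number_(sequence_number),
          type_(typeid(T)),
          payload_(std::move(payload)) {}  // shared_ptr<void> keeps T's deleter

    std::uint64_t sequence_number() const { return sequence_number_; }

    // True if the payload was wrapped as type T.
    template <typename T>
    bool holds() const {
        return type_ == std::type_index(typeid(T));
    }

    // Access the payload as T; throws if the message holds a different type.
    template <typename T>
    T const& get() const {
        if (!holds<T>()) throw std::invalid_argument("message payload has a different type");
        return *static_cast<T const*>(payload_.get());
    }

private:
    std::uint64_t sequence_number_;
    std::type_index type_;
    std::shared_ptr<void> payload_;
};

// Hypothetical payload type standing in for a chunk.
struct DemoChunk {
    int rows;
};

// Analogue of the to_message overloads: wrap an owned chunk with its sequence number.
SketchMessage to_sketch_message(std::uint64_t sequence_number, std::unique_ptr<DemoChunk> chunk) {
    return SketchMessage{sequence_number, std::move(chunk)};
}
```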
Message rapidsmpf::streaming::to_message(std::uint64_t sequence_number, std::unique_ptr<PartitionMapChunk> chunk)
Wrap a PartitionMapChunk into a Message.
Parameters
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Returns
A Message encapsulating the provided chunk as its payload.
Message rapidsmpf::streaming::to_message(std::uint64_t sequence_number, std::unique_ptr<PartitionVectorChunk> chunk)
Wrap a PartitionVectorChunk into a Message.
Parameters
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Returns
A Message encapsulating the provided chunk as its payload.
Message rapidsmpf::streaming::to_message(std::uint64_t sequence_number, std::unique_ptr<TableChunk> chunk)
Wrap a TableChunk into a Message.
Parameters
| sequence_number | Ordering identifier for the message. |
| chunk | The chunk to wrap into a message. |
Returns
A Message encapsulating the provided chunk as its payload.
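The sequence_number passed to to_message is described only as an ordering identifier, and the Lineariser class in the summary restores order by sequence number. The general idea can be sketched with a hypothetical ReorderBuffer (not rapidsmpf's Lineariser): items that arrive out of order are held in a min-heap and released once every earlier sequence number has been seen. The sketch assumes sequence numbers start at 0 and are unique and contiguous, which the reference does not state.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reorder buffer: accepts (sequence_number, payload) pairs in any
// order and releases payloads strictly in sequence-number order.
// Assumes sequence numbers start at 0 and are unique and contiguous.
class ReorderBuffer {
public:
    // Insert one item; returns the payloads that became releasable, in order.
    std::vector<std::string> push(std::uint64_t sequence_number, std::string payload) {
        pending_.emplace(sequence_number, std::move(payload));
        std::vector<std::string> ready;
        // Drain the heap while its smallest sequence number is the next expected one.
        while (!pending_.empty() && pending_.top().first == next_) {
            ready.push_back(pending_.top().second);
            pending_.pop();
            ++next_;
        }
        return ready;
    }

private:
    using Item = std::pair<std::uint64_t, std::string>;
    // Min-heap ordered by sequence number.
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> pending_;
    std::uint64_t next_ = 0;
};
```

For example, pushing sequence numbers 2 and 1 releases nothing; pushing 0 then releases the payloads for 0, 1, and 2 together.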