rapidsmpf::streaming Namespace Reference

Namespaces

 node
 

Classes

struct  PackedDataChunk
 Chunk of PackedData.
 
struct  PartitionMapChunk
 Chunk of packed partitions identified by partition ID.
 
struct  PartitionVectorChunk
 Chunk of packed partitions stored as a vector.
 
class  AllGather
 Asynchronous (coroutine) interface to allgather::AllGather.
 
class  ShufflerAsync
 An asynchronous shuffler that allows concurrent insertion and extraction of data.
 
class  Channel
 A coroutine-based channel for sending and receiving messages asynchronously.
 
class  ThrottlingAdaptor
 An adaptor to throttle access to a channel.
 
class  ShutdownAtExit
 Helper RAII class to shut down channels when they go out of scope.
 
class  Context
 Context for nodes (coroutines) in rapidsmpf.
 
class  Lineariser
 Linearise insertion into an output channel from a fixed number of producers by sequence number.
 
class  Message
 Type-erased message wrapper around a payload.
 
class  BoundedQueue
 A bounded queue for type-erased Messages.
 
class  SpillableMessages
 Container for individually spillable messages.
 
struct  Filter
 Filter AST expression with lifetime/stream management.
 
class  TableChunk
 A unit of table data in a streaming pipeline.
 

Typedefs

using Semaphore = coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()>
 An awaitable semaphore to manage acquisition and release of finite resources.
 
using Node = coro::task< void >
 Alias for a node in a streaming pipeline.
 

Functions

ContentDescription get_content_description (PackedDataChunk const &obj)
 Generate a content description for a PackedDataChunk.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PackedDataChunk > chunk)
 Wrap a PackedDataChunk into a Message.
 
ContentDescription get_content_description (PartitionMapChunk const &obj)
 Generate a content description for a PartitionMapChunk.
 
ContentDescription get_content_description (PartitionVectorChunk const &obj)
 Generate a content description for a PartitionVectorChunk.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionMapChunk > chunk)
 Wrap a PartitionMapChunk into a Message.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionVectorChunk > chunk)
 Wrap a PartitionVectorChunk into a Message.
 
template<std::ranges::range Range>
auto coro_results (Range &&task_results)
 Collect the results of multiple finished coroutines.
 
template<typename... Args>
auto coro_results (std::tuple< Args... > &&results)
 Collect the results of multiple finished coroutines from a tuple.
 
void run_streaming_pipeline (std::vector< Node > nodes)
 Runs a list of nodes concurrently and waits for all to complete.
 
ContentDescription get_content_description (TableChunk const &obj)
 Generate a content description for a TableChunk.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< TableChunk > chunk)
 Wrap a TableChunk into a Message.
 

Detailed Description

SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. SPDX-License-Identifier: Apache-2.0

Typedef Documentation

◆ Node

using rapidsmpf::streaming::Node = coro::task<void>

Alias for a node in a streaming pipeline.

Nodes represent coroutine-based asynchronous operations used throughout the streaming pipeline.

Definition at line 18 of file node.hpp.

Function Documentation

◆ coro_results() [1/2]

template<std::ranges::range Range>
auto rapidsmpf::streaming::coro_results ( Range &&  task_results)

Collect the results of multiple finished coroutines.

This helper consumes a range of coroutine result objects (e.g., from coro::when_all or coro::when_any) and extracts their return values by invoking .return_value() on each element.

  • If the tasks produce a non-void type T, all values are collected into a std::vector<T> and returned.
  • If the tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
Range  A range type whose elements support a .return_value() member function. Typically the result of gather functions such as coro::when_all and coro::wait_all.
Parameters
task_results  A range of completed coroutine results.
Returns
std::vector<T> if the underlying tasks return a value of type T or void if the underlying tasks return void.
Note
All result types must be the same. If your coroutines produce heterogeneous result types, this helper cannot be used; you must instead extract each result manually by calling .return_value() on each element, or use the tuple form of coro_results.
The return values of libcoro's gather functions such as coro::when_all and coro::wait_all must always be retrieved by calling .return_value() (either directly or via this helper). Failing to do so leaves exceptions unobserved, which can cause the streaming pipeline to deadlock or hang indefinitely while waiting for error propagation.

Definition at line 48 of file coro_utils.hpp.

◆ coro_results() [2/2]

template<typename... Args>
auto rapidsmpf::streaming::coro_results ( std::tuple< Args... > &&  results)

Collect the results of multiple finished coroutines from a tuple.

This overload works with a tuple of coroutine result objects, typically from co_await coro::when_all(...).

  • If the tasks produce non-void types, all values are collected into a std::tuple<T1, T2, ...> and returned.
  • If the tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
Args  Types of coroutine result objects in the tuple.
Parameters
results  Tuple of coroutine result objects to extract values from.
Returns
std::tuple<T1, T2, ...> if the underlying tasks return values, or void if all underlying tasks return void.

Definition at line 87 of file coro_utils.hpp.
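
A standalone sketch of the tuple form follows. As before, FakeResult and collect_tuple are hypothetical names that do not exist in rapidsmpf or libcoro, and the sketch covers only the two cases the description names (all results non-void, or all results void); how the real overload treats a mix of void and non-void results is not stated here and is not modelled.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a finished coroutine result object.
template <typename T>
struct FakeResult {
    T value;
    T return_value() { return std::move(value); }
};

// Stand-in for a finished task returning void.
template <>
struct FakeResult<void> {
    bool failed = false;
    void return_value() {
        if (failed) throw std::runtime_error("task failed");
    }
};

// Hypothetical sketch: unpack a tuple of result objects into a tuple of values.
template <typename... Results>
auto collect_tuple(std::tuple<Results...>&& results) {
    constexpr bool all_void =
        (std::is_void_v<decltype(std::declval<Results&>().return_value())> && ...);
    if constexpr (all_void) {
        // Only surface exceptions; there are no values to return.
        std::apply([](auto&... r) { (r.return_value(), ...); }, results);
    } else {
        // Braced initialisation evaluates the calls left to right.
        return std::apply(
            [](auto&... r) {
                return std::tuple<decltype(r.return_value())...>{r.return_value()...};
            },
            results);
    }
}
```

Unlike the range form, each tuple element keeps its own type, so an int result and a std::string result can be collected together.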

◆ get_content_description() [1/4]

ContentDescription rapidsmpf::streaming::get_content_description ( PackedDataChunk const &  obj)
inline

Generate a content description for a PackedDataChunk.

Parameters
obj  The object's content to describe.
Returns
A new content description.

Definition at line 30 of file packed_data.hpp.

◆ get_content_description() [2/4]

ContentDescription rapidsmpf::streaming::get_content_description ( PartitionMapChunk const &  obj)

Generate a content description for a PartitionMapChunk.

Parameters
obj  The object's content to describe.
Returns
A new content description.

◆ get_content_description() [3/4]

ContentDescription rapidsmpf::streaming::get_content_description ( PartitionVectorChunk const &  obj)

Generate a content description for a PartitionVectorChunk.

Parameters
obj  The object's content to describe.
Returns
A new content description.

◆ get_content_description() [4/4]

ContentDescription rapidsmpf::streaming::get_content_description ( TableChunk const &  obj)

Generate a content description for a TableChunk.

Parameters
obj  The object's content to describe.
Returns
A new content description.

◆ run_streaming_pipeline()

void rapidsmpf::streaming::run_streaming_pipeline ( std::vector< Node > nodes)

Runs a list of nodes concurrently and waits for all to complete.

This function schedules each node and blocks until all of them have finished execution. Typically used to launch multiple producer/consumer coroutines in parallel.

Parameters
nodes  A vector of nodes to run.

◆ to_message() [1/4]

Message rapidsmpf::streaming::to_message ( std::uint64_t  sequence_number,
std::unique_ptr< PackedDataChunk > chunk
)

Wrap a PackedDataChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

Definition at line 44 of file packed_data.hpp.

◆ to_message() [2/4]

Message rapidsmpf::streaming::to_message ( std::uint64_t  sequence_number,
std::unique_ptr< PartitionMapChunk > chunk
)

Wrap a PartitionMapChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

◆ to_message() [3/4]

Message rapidsmpf::streaming::to_message ( std::uint64_t  sequence_number,
std::unique_ptr< PartitionVectorChunk > chunk
)

Wrap a PartitionVectorChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

◆ to_message() [4/4]

Message rapidsmpf::streaming::to_message ( std::uint64_t  sequence_number,
std::unique_ptr< TableChunk > chunk
)

Wrap a TableChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.
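
All four to_message overloads follow the same pattern: take ownership of a chunk through std::unique_ptr and wrap it, together with a sequence number, in a type-erased Message. The standalone sketch below models that pattern under stated assumptions. ToyMessage, ToyChunk, to_toy_message and the member names holds and get are hypothetical and chosen for illustration; the real Message class is not documented on this page and may expose a different interface.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

// Hypothetical type-erased wrapper: remembers the payload type and owns the payload.
class ToyMessage {
  public:
    template <typename T>
    ToyMessage(std::uint64_t sequence_number, std::unique_ptr<T> payload)
        : sequence_number_(sequence_number),
          type_(typeid(T)),
          // Erase the static type but keep a deleter that knows it.
          payload_(payload.release(), [](void* p) { delete static_cast<T*>(p); }) {}

    std::uint64_t sequence_number() const { return sequence_number_; }

    template <typename T>
    bool holds() const { return type_ == std::type_index(typeid(T)); }

    template <typename T>
    T& get() {
        if (!holds<T>()) throw std::bad_cast();
        return *static_cast<T*>(payload_.get());
    }

  private:
    std::uint64_t sequence_number_;
    std::type_index type_;
    std::unique_ptr<void, void (*)(void*)> payload_;
};

// Hypothetical payload type standing in for a chunk.
struct ToyChunk {
    std::vector<int> rows;
};

// Mirrors the shape of the to_message overloads: sequence number plus owned chunk.
ToyMessage to_toy_message(std::uint64_t sequence_number, std::unique_ptr<ToyChunk> chunk) {
    return ToyMessage{sequence_number, std::move(chunk)};
}
```

Passing the chunk as std::unique_ptr makes the ownership transfer explicit at the call site: after the call, the caller no longer holds the chunk and the message is its sole owner.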