rapidsmpf::streaming Namespace Reference

Namespaces

 actor
 

Classes

struct  PartitionMapChunk
 Chunk of packed partitions identified by partition ID.
 
struct  PartitionVectorChunk
 Chunk of packed partitions stored as a vector.
 
class  AllGather
 Asynchronous (coroutine) interface to coll::AllGather.
 
class  ShufflerAsync
 An asynchronous shuffler that allows concurrent insertion and extraction of data.
 
class  Channel
 A coroutine-based channel for sending and receiving messages asynchronously.
 
class  ThrottlingAdaptor
 An adaptor to throttle access to a channel.
 
class  ShutdownAtExit
 Helper RAII class to shut down channels when they go out of scope.
 
class  Context
 Context for actors (coroutines) in rapidsmpf.
 
class  CoroThreadPoolExecutor
 Executor wrapper around a coro::thread_pool used for coroutine execution.
 
class  Lineariser
 Linearise insertion into an output channel from a fixed number of producers by sequence number.
 
class  MemoryReserveOrWait
 Asynchronous coordinator for memory reservation requests.
 
class  Message
 Type-erased message wrapper around a payload.
 
class  BoundedQueue
 A bounded queue for type-erased Messages.
 
class  SpillableMessages
 Container for individually spillable messages.
 
struct  BloomFilter
 Utility managing construction and use of a Bloom filter.
 
struct  HashScheme
 Hash partitioning scheme.
 
struct  PartitioningSpec
 Partitioning specification for a single hierarchical level.
 
struct  Partitioning
 Hierarchical partitioning metadata for a data stream.
 
struct  ChannelMetadata
 Channel-level metadata describing the data stream.
 
struct  Filter
 Filter AST expression with lifetime/stream management.
 
class  TableChunk
 A unit of table data in a streaming pipeline.
 

Typedefs

using Actor = coro::task< void >
 Alias for an actor in a streaming graph.
 
using Semaphore = coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()>
 An awaitable semaphore to manage acquisition and release of finite resources.
 

Functions

ContentDescription get_content_description (PackedData const &obj)
 Generate a content description for PackedData.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk)
 Wrap PackedData into a Message.
 
ContentDescription get_content_description (PartitionMapChunk const &obj)
 Generate a content description for a PartitionMapChunk.
 
ContentDescription get_content_description (PartitionVectorChunk const &obj)
 Generate a content description for a PartitionVectorChunk.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionMapChunk > chunk)
 Wrap a PartitionMapChunk into a Message.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< PartitionVectorChunk > chunk)
 Wrap a PartitionVectorChunk into a Message.
 
void run_actor_network (std::vector< Actor > actors)
 Runs a list of actors concurrently and waits for all to complete.
 
template<std::ranges::range Range>
auto coro_results (Range &&task_results)
 Collect the results of multiple finished coroutines.
 
template<typename... Args>
auto coro_results (std::tuple< Args... > &&results)
 Collect the results of multiple finished coroutines from a tuple.
 
coro::task< MemoryReservation > reserve_memory (std::shared_ptr< Context > ctx, std::size_t size, std::int64_t net_memory_delta, MemoryType mem_type=MemoryType::DEVICE, std::optional< AllowOverbooking > allow_overbooking=std::nullopt)
 Reserve memory using the context memory reservation mechanism.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< ChannelMetadata > m)
 Wrap a ChannelMetadata into a Message.
 
ContentDescription get_content_description (TableChunk const &obj)
 Generate a content description for a TableChunk.
 
Message to_message (std::uint64_t sequence_number, std::unique_ptr< TableChunk > chunk)
 Wrap a TableChunk into a Message.
 

Detailed Description

SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0

SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0

SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0

Typedef Documentation

◆ Actor

using rapidsmpf::streaming::Actor = coro::task<void>

Alias for an actor in a streaming graph.

Actors represent coroutine-based asynchronous operations used throughout the streaming graph.

Definition at line 18 of file actor.hpp.

Function Documentation

◆ coro_results() [1/2]

template<std::ranges::range Range>
auto rapidsmpf::streaming::coro_results ( Range &&  task_results)

Collect the results of multiple finished coroutines.

This helper consumes a range of coroutine result objects (e.g., from coro::when_all or coro::when_any) and extracts their return values by invoking .return_value() on each element.

  • If the tasks produce a non-void type T, all values are collected into a std::vector<T> and returned.
  • If the tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
Range  A range type whose elements support a .return_value() member function, typically the result of functions and coroutines such as coro::when_all and coro::wait_all.
Parameters
task_results  A range of completed coroutine results.
Returns
std::vector<T> if the underlying tasks return a value of type T, or void if the underlying tasks return void.
Note
All result types must be the same. If your coroutines produce heterogeneous result types, this helper cannot be used; you must instead extract each result manually by calling .return_value() on each element, or use the tuple form of coro_results.
The return values of libcoro's gather functions such as coro::when_all and coro::wait_all must always be retrieved by calling .return_value() (either directly or via this helper). Failing to do so leaves exceptions unobserved, which can cause the streaming pipeline to deadlock or hang indefinitely while waiting for error propagation.

Definition at line 48 of file coro_utils.hpp.

◆ coro_results() [2/2]

template<typename... Args>
auto rapidsmpf::streaming::coro_results ( std::tuple< Args... > &&  results)

Collect the results of multiple finished coroutines from a tuple.

This overload works with a tuple of coroutine result objects, typically from co_await coro::when_all(...).

  • If the tasks produce non-void types, all values are collected into a std::tuple<T1, T2, ...> and returned.
  • If the tasks return void, the function simply invokes .return_value() on each element to surface any unhandled exceptions and then returns void.
Template Parameters
Args  Types of the coroutine result objects in the tuple.
Parameters
results  Tuple of coroutine result objects to extract values from.
Returns
std::tuple<T1, T2, ...> if the underlying tasks return values, or void if all underlying tasks return void.

Definition at line 87 of file coro_utils.hpp.
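
The tuple overload can be sketched the same way with std::apply and fold expressions. The result types IntResult, StringResult and VoidResult and the function collect_tuple are hypothetical; the sketch assumes the tuple is either all-void or all-non-void, matching the two documented cases, and does not handle a mix.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for heterogeneous libcoro result objects.
struct IntResult { int return_value() { return 42; } };
struct StringResult { std::string return_value() { return "done"; } };
struct VoidResult {
    int* calls;
    void return_value() { ++*calls; }  // counts calls instead of rethrowing
};

// Hypothetical model of coro_results(std::tuple<Args...>&&). An all-void tuple
// only has return_value() invoked; otherwise the values form a new tuple.
// Mixed void/non-void tuples are not handled in this sketch.
template <typename... Rs>
auto collect_tuple(std::tuple<Rs...>&& results) {
    if constexpr ((std::is_void_v<decltype(std::declval<Rs&>().return_value())> && ...)) {
        std::apply([](auto&... r) { (r.return_value(), ...); }, results);
    } else {
        // Braced initialisation evaluates return_value() left to right.
        return std::apply(
            [](auto&... r) { return std::tuple{r.return_value()...}; }, results);
    }
}
```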

◆ get_content_description() [1/4]

inline ContentDescription rapidsmpf::streaming::get_content_description ( PackedData const &  obj)

Generate a content description for PackedData.

Parameters
obj  The object whose content is to be described.
Returns
A new content description.

Definition at line 20 of file packed_data.hpp.

◆ get_content_description() [2/4]

ContentDescription rapidsmpf::streaming::get_content_description ( PartitionMapChunk const &  obj)

Generate a content description for a PartitionMapChunk.

Parameters
obj  The object whose content is to be described.
Returns
A new content description.

◆ get_content_description() [3/4]

ContentDescription rapidsmpf::streaming::get_content_description ( PartitionVectorChunk const &  obj)

Generate a content description for a PartitionVectorChunk.

Parameters
obj  The object whose content is to be described.
Returns
A new content description.

◆ get_content_description() [4/4]

ContentDescription rapidsmpf::streaming::get_content_description ( TableChunk const &  obj)

Generate a content description for a TableChunk.

Parameters
obj  The object whose content is to be described.
Returns
A new content description.

◆ reserve_memory()

coro::task<MemoryReservation> rapidsmpf::streaming::reserve_memory(
    std::shared_ptr<Context> ctx,
    std::size_t size,
    std::int64_t net_memory_delta,
    MemoryType mem_type = MemoryType::DEVICE,
    std::optional<AllowOverbooking> allow_overbooking = std::nullopt
)

Reserve memory using the context memory reservation mechanism.

Submits a memory reservation request for the memory type given by mem_type and suspends until the request is satisfied. If no pending reservation request can be satisfied within the configured "memory_reserve_timeout", the behavior depends on allow_overbooking.

This is a convenience helper that returns only the MemoryReservation. If more control is required, for example inspecting the amount of overbooking, callers should use MemoryReserveOrWait directly, such as ctx.memory(MemoryType::DEVICE).reserve_or_wait_or_overbook(size, net_memory_delta).

Priority and progress semantics are identical to MemoryReserveOrWait::reserve_or_wait(). In particular, net_memory_delta is used as a heuristic to prefer eligible requests that are expected to reduce memory pressure sooner. Smaller values have higher priority.
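
The priority rule can be illustrated with a toy selection function. This is an assumption-laden sketch rather than the MemoryReserveOrWait algorithm: PendingRequest and pick_next are hypothetical, and a request is treated as eligible simply when its size fits in the currently available bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical pending reservation request; not a rapidsmpf type.
struct PendingRequest {
    int id;
    std::size_t size;
    std::int64_t net_memory_delta;
};

// Among requests that fit in `available` bytes (assumed eligibility rule),
// pick the one with the smallest net_memory_delta; ties keep the earlier one.
std::optional<PendingRequest> pick_next(std::vector<PendingRequest> const& pending,
                                        std::size_t available) {
    std::optional<PendingRequest> best;
    for (auto const& req : pending) {
        if (req.size > available) continue;  // not eligible right now
        if (!best || req.net_memory_delta < best->net_memory_delta) best = req;
    }
    return best;
}
```

A negative net_memory_delta describes an operation expected to release more memory than it consumes, so under this rule it wins over an eligible request with a positive delta.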

Parameters
ctx  Actor context used to obtain the memory reservation handle.
size  Number of bytes to reserve.
net_memory_delta  Estimated net change in memory usage after the reservation is allocated and the dependent operation completes. Smaller values have higher priority.
mem_type  Memory type for which to reserve memory.
allow_overbooking  Controls the behavior when no progress is possible within the configured timeout:
  • If set to AllowOverbooking::YES, the call may overbook memory when forcing progress.
  • If set to AllowOverbooking::NO, the call fails if no progress is possible.
  • If not provided, the default behavior is determined by the configuration option "allow_overbooking_by_default".
Returns
The allocated memory reservation.
Exceptions
std::runtime_error  If shutdown occurs before the request can be processed.
rapidsmpf::reservation_error  If no progress is possible within the timeout and allow_overbooking resolves to AllowOverbooking::NO.
// Reserve memory inside an actor:
auto res = co_await reserve_memory(
    ctx,
    1024,
    0,  // net_memory_delta
    MemoryType::DEVICE,
    AllowOverbooking::YES
);
EXPECT_EQ(res.size(), 1024);

// Disable overbooking and fail if no progress is possible:
auto res2 = co_await reserve_memory(
    ctx,
    2048,
    0,  // net_memory_delta
    MemoryType::DEVICE,
    AllowOverbooking::NO
);
See also
MemoryReserveOrWait::reserve_or_wait()
MemoryReserveOrWait::reserve_or_wait_or_overbook()
MemoryReserveOrWait::reserve_or_wait_or_fail()
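
The timeout behavior documented for allow_overbooking reduces to a small decision: resolve the optional against the configured default, then either overbook or throw. The sketch below models only that decision. AllowOverbooking and reservation_error are local stand-ins that reuse the documented names, while Reservation, on_timeout and the allow_overbooking_by_default argument (standing in for the configuration option) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>

// Local stand-ins mirroring the documented names; not the rapidsmpf definitions.
enum class AllowOverbooking : bool { NO = false, YES = true };
struct reservation_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Hypothetical result: bytes granted and how many of them exceed what was free.
struct Reservation {
    std::size_t size;
    std::size_t overbooked;
};

// Models only the timeout path, i.e. no pending request was satisfied in time.
Reservation on_timeout(std::size_t size,
                       std::size_t available,
                       std::optional<AllowOverbooking> allow_overbooking,
                       AllowOverbooking allow_overbooking_by_default) {
    // std::nullopt falls back to the configured default.
    AllowOverbooking resolved = allow_overbooking.value_or(allow_overbooking_by_default);
    if (resolved == AllowOverbooking::NO) {
        throw reservation_error("no progress possible within memory_reserve_timeout");
    }
    std::size_t overbooked = size > available ? size - available : 0;
    return Reservation{size, overbooked};
}
```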

◆ run_actor_network()

void rapidsmpf::streaming::run_actor_network ( std::vector< Actor > actors)

Runs a list of actors concurrently and waits for all to complete.

This function schedules each actor and blocks until all of them have finished execution. Typically used to launch multiple producer/consumer coroutines in parallel.

Parameters
actors  A vector of actors to run.
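
The run-all-and-wait contract of run_actor_network can be approximated with standard threads. This is an analogy, not the library's scheduler: plain std::function<void()> callables on std::async threads stand in for coroutine actors on a thread pool, run_all_and_wait is a hypothetical name, and rethrowing the first failure only after every actor has finished is a choice made for this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <utility>
#include <vector>

// Analogy only: each callable runs on its own std::async thread.
void run_all_and_wait(std::vector<std::function<void()>> actors) {
    std::vector<std::future<void>> running;
    running.reserve(actors.size());
    for (auto& actor : actors) {
        running.push_back(std::async(std::launch::async, std::move(actor)));
    }
    // Wait for every actor before reporting the first failure.
    std::exception_ptr first_error;
    for (auto& f : running) {
        try {
            f.get();
        } catch (...) {
            if (!first_error) first_error = std::current_exception();
        }
    }
    if (first_error) std::rethrow_exception(first_error);
}
```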

◆ to_message() [1/5]

Message rapidsmpf::streaming::to_message ( std::uint64_t sequence_number, std::unique_ptr< ChannelMetadata > m )

Wrap a ChannelMetadata into a Message.

Parameters
sequence_number  Ordering identifier for the message.
m  The metadata to wrap.
Returns
A Message encapsulating the metadata as its payload.

◆ to_message() [2/5]

Message rapidsmpf::streaming::to_message ( std::uint64_t sequence_number, std::unique_ptr< PackedData > chunk )

Wrap PackedData into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

Definition at line 33 of file packed_data.hpp.

◆ to_message() [3/5]

Message rapidsmpf::streaming::to_message ( std::uint64_t sequence_number, std::unique_ptr< PartitionMapChunk > chunk )

Wrap a PartitionMapChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

◆ to_message() [4/5]

Message rapidsmpf::streaming::to_message ( std::uint64_t sequence_number, std::unique_ptr< PartitionVectorChunk > chunk )

Wrap a PartitionVectorChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.

◆ to_message() [5/5]

Message rapidsmpf::streaming::to_message ( std::uint64_t sequence_number, std::unique_ptr< TableChunk > chunk )

Wrap a TableChunk into a Message.

Parameters
sequence_number  Ordering identifier for the message.
chunk  The chunk to wrap into a message.
Returns
A Message encapsulating the provided chunk as its payload.
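
Each to_message overload pairs a sequence number with a type-erased payload. A minimal sketch of that idea with std::any follows; ToyMessage, ToyChunk and to_toy_message are hypothetical and do not reproduce the rapidsmpf Message API. Sorting by sequence number in the usage hints at why the number matters to ordering-aware consumers such as Lineariser.

```cpp
#include <algorithm>
#include <any>
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for a type-erased message.
class ToyMessage {
  public:
    template <typename T>
    ToyMessage(std::uint64_t sequence_number, std::unique_ptr<T> payload)
        : sequence_number_(sequence_number),
          // std::any needs a copyable type, so ownership moves into a shared_ptr.
          payload_(std::shared_ptr<T>(std::move(payload))) {}

    std::uint64_t sequence_number() const { return sequence_number_; }

    template <typename T>
    T const& get() const {
        // Throws std::bad_any_cast if the payload is not a T.
        return *std::any_cast<std::shared_ptr<T> const&>(payload_);
    }

  private:
    std::uint64_t sequence_number_;
    std::any payload_;
};

// Hypothetical payload type standing in for TableChunk and similar types.
struct ToyChunk {
    std::string name;
};

ToyMessage to_toy_message(std::uint64_t sequence_number, std::unique_ptr<ToyChunk> chunk) {
    return ToyMessage{sequence_number, std::move(chunk)};
}
```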