rapidsmpf::streaming::actor Namespace Reference

Enumerations

enum class  FanoutPolicy : std::uint8_t { BOUNDED , UNBOUNDED }
 Fanout policy controlling how messages are propagated.
 

Functions

Actor allgather (std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, AllGather::Ordered ordered=AllGather::Ordered::YES)
 Create an allgather actor for a single allgather operation.
 
Actor shuffler (std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
 Launches a shuffler actor for a single shuffle operation.
 
Actor fanout (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::vector< std::shared_ptr< Channel >> chs_out, FanoutPolicy policy)
 Broadcast messages from one input channel to multiple output channels.
 
Actor push_to_channel (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::vector< Message > messages)
 Asynchronously pushes all messages from a vector into an output channel.
 
Actor pull_from_channel (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::vector< Message > &out_messages)
 Asynchronously pulls all messages from an input channel into a vector.
 
Actor read_parquet (std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_out, std::size_t num_producers, cudf::io::parquet_reader_options options, cudf::size_type num_rows_per_chunk, std::unique_ptr< Filter > filter=nullptr)
 Asynchronously read parquet files into an output channel.
 
Actor partition_and_pack (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, std::vector< cudf::size_type > columns_to_hash, int num_partitions, cudf::hash_id hash_function, std::uint32_t seed)
 Asynchronously partitions input tables into multiple packed (serialized) tables.
 
Actor unpack_and_concat (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out)
 Asynchronously unpacks and concatenates packed partitions.
 

Detailed Description

SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0

Enumeration Type Documentation

◆ FanoutPolicy

enum class rapidsmpf::streaming::actor::FanoutPolicy : std::uint8_t

Fanout policy controlling how messages are propagated.

Enumerator
BOUNDED 

Process messages as they arrive and immediately forward them.

Messages are forwarded as soon as they are received from the input channel. The next message is not processed until all output channels have completed sending the current one, ensuring backpressure and synchronized flow.

UNBOUNDED 

Forward messages without enforcing backpressure.

In this mode, messages may be accumulated internally before being broadcast, or they may be forwarded immediately depending on the implementation and downstream consumption rate.

This mode disables coordinated backpressure between outputs, allowing consumers to process at independent rates, but can lead to unbounded buffering and increased memory usage.

Definition at line 17 of file fanout.hpp.
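
The trade-off between the two policies can be illustrated with a toy, single-threaded tick model. This is only a sketch under stated assumptions, not how fanout is implemented: the producer is assumed to emit one message per tick, consumer i is assumed to need `periods[i]` ticks per message, and both function names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model, not rapidsmpf code. Without backpressure the producer enqueues
// one message per tick for every consumer; consumer i removes one message
// every periods[i] ticks. Returns the largest backlog seen for any consumer.
std::size_t peak_backlog_unbounded(
    std::size_t num_messages, std::vector<std::size_t> const& periods
) {
    std::size_t peak = 0;
    for (std::size_t period : periods) {
        assert(period > 0);
        std::size_t queued = 0;
        for (std::size_t tick = 1; tick <= num_messages; ++tick) {
            ++queued;  // the producer never waits
            peak = std::max(peak, queued);
            if (tick % period == 0) {
                --queued;  // the consumer finishes one message
            }
        }
    }
    return peak;
}

// Toy model with backpressure: the next message is forwarded only once every
// consumer has taken the current one, so each message costs as many ticks as
// the slowest consumer needs.
std::size_t ticks_to_forward_bounded(
    std::size_t num_messages, std::vector<std::size_t> const& periods
) {
    assert(!periods.empty());
    return num_messages * *std::max_element(periods.begin(), periods.end());
}
```

In this model, 9 messages with periods {1, 3} leave up to 7 messages buffered for the slow consumer without backpressure, whereas the backpressured variant needs 27 ticks instead of 9 to forward everything.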

Function Documentation

◆ allgather()

Actor rapidsmpf::streaming::actor::allgather ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Communicator >  comm,
std::shared_ptr< Channel >  ch_in,
std::shared_ptr< Channel >  ch_out,
OpID  op_id,
AllGather::Ordered  ordered = AllGather::Ordered::YES 
)

Create an allgather actor for a single allgather operation.

This is a streaming version of rapidsmpf::coll::AllGather that operates on packed data received through Channels.

Parameters
ctx  The streaming context to use.
comm  Communicator for the collective operation.
ch_in  Input channel providing PackedDatas to be gathered.
ch_out  Output channel where the gathered PackedDatas are sent.
op_id  Unique identifier for the operation.
ordered  Whether the extracted data is sent to the output channel with sequence numbers that follow the global total order of the input chunks. If yes, the sequence numbers of the extracted data are ordered first by rank and then by input sequence number. If no, the sequence numbers of the extracted chunks bear no relation to the input sequence order.
Returns
A streaming actor that completes when the allgather is finished and the output channel is drained.
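
The ordering described for the `ordered` parameter can be sketched with plain standard-library code. This is a minimal illustration only: `ToyChunk` and `assign_ordered_sequence` are hypothetical names, and numbering the output from 0 is an assumption rather than something this reference states.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>

// Hypothetical stand-in for one gathered chunk; not a rapidsmpf type.
struct ToyChunk {
    int rank;                  // rank that contributed the chunk
    std::uint64_t input_seq;   // sequence number on that rank's input channel
    std::uint64_t output_seq;  // sequence number assigned on output
};

// Order chunks first by rank, then by input sequence number, and number the
// result 0, 1, 2, ... (the starting value is an assumption of this sketch).
std::vector<ToyChunk> assign_ordered_sequence(std::vector<ToyChunk> chunks) {
    std::sort(
        chunks.begin(), chunks.end(), [](ToyChunk const& a, ToyChunk const& b) {
            return std::tie(a.rank, a.input_seq) < std::tie(b.rank, b.input_seq);
        }
    );
    for (std::size_t i = 0; i < chunks.size(); ++i) {
        chunks[i].output_seq = i;
    }
    return chunks;
}
```

With the unordered setting no such sort would apply, and consumers should not infer anything about the input order from the output sequence numbers.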

◆ fanout()

Actor rapidsmpf::streaming::actor::fanout ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Channel >  ch_in,
std::vector< std::shared_ptr< Channel >>  chs_out,
FanoutPolicy  policy 
)

Broadcast messages from one input channel to multiple output channels.

The actor continuously receives messages from the input channel and forwards them to all output channels according to the selected fanout policy (see FanoutPolicy).

Each output channel receives a deep copy of the same message.

Parameters
ctx  The actor context to use.
ch_in  Input channel from which messages are received.
chs_out  Output channels to which messages are broadcast. Must contain at least 2 channels.
policy  The fanout strategy to use (see FanoutPolicy).
Returns
Streaming actor representing the fanout operation.
Exceptions
std::invalid_argument  If an unknown fanout policy is specified or if the number of output channels is less than 2.
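
The broadcast contract (every output gets its own copy, and fewer than two outputs is rejected) can be mimicked with a small standalone sketch. `toy_broadcast` is a hypothetical helper that models messages as strings and output channels as vectors; it does not use the rapidsmpf API.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Toy model, not rapidsmpf code: copy every input message to each of
// num_outputs independent output buffers.
std::vector<std::vector<std::string>> toy_broadcast(
    std::vector<std::string> const& messages, std::size_t num_outputs
) {
    if (num_outputs < 2) {
        throw std::invalid_argument("fanout needs at least 2 output channels");
    }
    // Each output owns its own copy, so consumers cannot affect one another.
    return std::vector<std::vector<std::string>>(num_outputs, messages);
}
```

Because the copies are independent, modifying a message taken from one output leaves the same message on the other outputs unchanged.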

◆ partition_and_pack()

Actor rapidsmpf::streaming::actor::partition_and_pack ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Channel >  ch_in,
std::shared_ptr< Channel >  ch_out,
std::vector< cudf::size_type >  columns_to_hash,
int  num_partitions,
cudf::hash_id  hash_function,
std::uint32_t  seed 
)

Asynchronously partitions input tables into multiple packed (serialized) tables.

This is a streaming version of rapidsmpf::partition_and_split that operates on table chunks using channels.

It receives tables from an input channel, partitions each row into one of num_partitions based on a hash of the selected columns, packs the resulting partitions, and sends them to an output channel.

Parameters
ctx  The actor context to use.
ch_in  Input channel providing TableChunks to partition.
ch_out  Output channel to which PartitionMapChunks are sent.
columns_to_hash  Indices of input columns to hash.
num_partitions  The number of partitions to use.
hash_function  Hash function to use for partitioning.
seed  Seed value for the hash function.
Returns
Streaming actor representing the asynchronous partitioning and packing operation.
Exceptions
std::out_of_range  If any index in columns_to_hash is invalid.
See also
rapidsmpf::partition_and_split
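
The idea of assigning each row to one of num_partitions by hashing can be sketched as follows. This is a simplified illustration under stated assumptions: a single integer key column stands in for the hashed columns, `toy_hash` is an FNV-1a style function chosen for brevity and is not one of the cudf::hash_id functions, and packing is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy 32-bit FNV-1a style hash over the bytes of a key, mixed with a seed.
// Illustrative only: this is NOT one of the cudf::hash_id functions.
std::uint32_t toy_hash(std::int64_t key, std::uint32_t seed) {
    std::uint32_t h = 2166136261u ^ seed;
    auto bits = static_cast<std::uint64_t>(key);
    for (int i = 0; i < 8; ++i) {
        h ^= static_cast<std::uint32_t>((bits >> (8 * i)) & 0xffu);
        h *= 16777619u;
    }
    return h;
}

// Assign each row index to a partition based on the hash of its key.
// Returns one vector of row indices per partition.
std::vector<std::vector<std::size_t>> toy_partition(
    std::vector<std::int64_t> const& keys, int num_partitions, std::uint32_t seed
) {
    assert(num_partitions > 0);
    std::vector<std::vector<std::size_t>> parts(
        static_cast<std::size_t>(num_partitions)
    );
    for (std::size_t row = 0; row < keys.size(); ++row) {
        auto pid =
            toy_hash(keys[row], seed) % static_cast<std::uint32_t>(num_partitions);
        parts[pid].push_back(row);
    }
    return parts;
}
```

The property that matters for a later shuffle is that rows with equal keys always land in the same partition for a given hash function and seed.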

◆ pull_from_channel()

Actor rapidsmpf::streaming::actor::pull_from_channel ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Channel >  ch_in,
std::vector< Message > &  out_messages 
)

Asynchronously pulls all messages from an input channel into a vector.

Receives messages from the channel until it is closed and appends them to the provided output vector.

Parameters
ctx  The actor context to use.
ch_in  Input channel providing messages.
out_messages  Output vector to store the received messages.
Returns
Streaming actor representing the asynchronous operation.

◆ push_to_channel()

Actor rapidsmpf::streaming::actor::push_to_channel ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Channel >  ch_out,
std::vector< Message >  messages 
)

Asynchronously pushes all messages from a vector into an output channel.

Sends each message of the input vector into the channel in order, marking the end of the stream once done.

Parameters
ctx  The actor context to use.
ch_out  Output channel to which messages will be sent.
messages  Input vector containing the messages to send.
Returns
Streaming actor representing the asynchronous operation.
Exceptions
std::invalid_argument  If any of the elements in messages is empty.
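
The push and pull semantics documented above (send in order, mark the end of the stream, receive until the channel is closed, append to the output vector) can be modelled with a synchronous toy channel. Everything here is hypothetical: `ToyChannel`, `toy_push_all` and `toy_pull_all` are illustrative names, messages are modelled as strings with the empty string standing in for an empty message, and validating before sending is an assumption.

```cpp
#include <deque>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Toy single-threaded stand-in for a channel; not the rapidsmpf Channel type.
class ToyChannel {
  public:
    void send(std::string msg) { queue_.push_back(std::move(msg)); }
    void close() { closed_ = true; }  // marks the end of the stream
    // Stores the next message in out and returns true, or returns false once
    // the channel is closed and empty.
    bool receive(std::string& out) {
        if (queue_.empty()) {
            if (!closed_) throw std::logic_error("toy channel would block");
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }
  private:
    std::deque<std::string> queue_;
    bool closed_ = false;
};

// Validate first (an assumption), then send in order and close the stream.
void toy_push_all(ToyChannel& ch, std::vector<std::string> messages) {
    for (auto const& m : messages) {
        if (m.empty()) throw std::invalid_argument("empty message");
    }
    for (auto& m : messages) ch.send(std::move(m));
    ch.close();
}

// Receive until end of stream, appending to out without clearing it.
void toy_pull_all(ToyChannel& ch, std::vector<std::string>& out) {
    std::string msg;
    while (ch.receive(msg)) out.push_back(std::move(msg));
}
```

Note that the pull side appends, so any messages already in the output vector are kept in front of the received ones.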

◆ read_parquet()

Actor rapidsmpf::streaming::actor::read_parquet ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Communicator >  comm,
std::shared_ptr< Channel >  ch_out,
std::size_t  num_producers,
cudf::io::parquet_reader_options  options,
cudf::size_type  num_rows_per_chunk,
std::unique_ptr< Filter >  filter = nullptr 
)

Asynchronously read parquet files into an output channel.

Note
This is a collective operation: all ranks named by the execution context's communicator participate, and all ranks must specify the same set of options. Behaviour is undefined if a read_parquet actor appears on only a subset of the ranks named by the communicator, or if the options differ between ranks.
Parameters
ctx  The execution context to use.
comm  Communicator for distributing files across ranks.
ch_out  Channel to which TableChunks are sent.
num_producers  Number of concurrent producer tasks.
options  Template reader options. The file list within is split up and used to construct new options for each chunk read, so the options should describe the read "as if" the whole input were read in one go.
num_rows_per_chunk  Target (maximum) number of rows in any TableChunk that is sent.
filter  Optional filter expression to apply to the read.
Returns
Streaming actor representing the asynchronous read.
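
To make the role of num_rows_per_chunk concrete, the sketch below splits a row count into consecutive ranges of at most that many rows. It is only an illustration: `toy_chunk_ranges` is a hypothetical helper, and how the actor actually splits files and distributes work across ranks and producers is not described in this reference.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Split total_rows into consecutive (first_row, num_rows) ranges, each with
// at most rows_per_chunk rows. Toy helper, not rapidsmpf code.
std::vector<std::pair<std::int64_t, std::int64_t>> toy_chunk_ranges(
    std::int64_t total_rows, std::int64_t rows_per_chunk
) {
    assert(total_rows >= 0 && rows_per_chunk > 0);
    std::vector<std::pair<std::int64_t, std::int64_t>> ranges;
    for (std::int64_t first = 0; first < total_rows; first += rows_per_chunk) {
        ranges.emplace_back(first, std::min(rows_per_chunk, total_rows - first));
    }
    return ranges;
}
```

Only the last range can be shorter than rows_per_chunk, which matches the description of the parameter as a target that also acts as a maximum.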

◆ shuffler()

Actor rapidsmpf::streaming::actor::shuffler ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Communicator >  comm,
std::shared_ptr< Channel >  ch_in,
std::shared_ptr< Channel >  ch_out,
OpID  op_id,
shuffler::PartID  total_num_partitions,
shuffler::Shuffler::PartitionOwner  partition_owner = shuffler::Shuffler::round_robin 
)

Launches a shuffler actor for a single shuffle operation.

This is a streaming version of rapidsmpf::shuffler::Shuffler that operates on packed partition chunks using channels.

It consumes partitioned input data from the input channel and produces output chunks grouped by partition_owner.

Parameters
ctx  The context to use.
comm  Communicator for the collective operation.
ch_in  Input channel providing PartitionMapChunks to be shuffled.
ch_out  Output channel where the resulting PartitionVectorChunks are sent.
op_id  Unique operation ID for this shuffle. Must not be reused until all actors have called Shuffler::shutdown().
total_num_partitions  Total number of partitions to shuffle the data into.
partition_owner  Function that maps a partition ID to its owning rank/node.
Returns
A streaming actor that completes when the shuffling has finished and the output channel is drained.
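
A partition owner function maps each partition ID to a rank. The sketch below assumes that round-robin ownership means "partition ID modulo number of ranks"; that rule, and the names `toy_round_robin_owner` and `toy_local_partitions`, are assumptions for illustration rather than the library's definition.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed round-robin rule: partition pid is owned by rank pid % nranks.
int toy_round_robin_owner(std::uint32_t pid, int nranks) {
    assert(nranks > 0);
    return static_cast<int>(pid % static_cast<std::uint32_t>(nranks));
}

// Partition IDs that a given rank would own under that rule.
std::vector<std::uint32_t> toy_local_partitions(
    int rank, int nranks, std::uint32_t total_num_partitions
) {
    std::vector<std::uint32_t> owned;
    for (std::uint32_t pid = 0; pid < total_num_partitions; ++pid) {
        if (toy_round_robin_owner(pid, nranks) == rank) {
            owned.push_back(pid);
        }
    }
    return owned;
}
```

Under this rule, every rank must use the same owner function and the same total_num_partitions, otherwise ranks would disagree about where a partition belongs.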

◆ unpack_and_concat()

Actor rapidsmpf::streaming::actor::unpack_and_concat ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Channel >  ch_in,
std::shared_ptr< Channel >  ch_out 
)

Asynchronously unpacks and concatenates packed partitions.

This is a streaming version of rapidsmpf::unpack_and_concat that operates on packed partition chunks using channels.

It receives packed partitions from the input channel, deserializes and concatenates them, and sends the resulting tables to the output channel. Empty partitions are ignored.

Parameters
ctx  The actor context to use.
ch_in  Input channel providing packed partitions as PartitionMapChunk or PartitionVectorChunk.
ch_out  Output channel to which unpacked and concatenated tables are sent.
Returns
Streaming actor representing the asynchronous unpacking and concatenation operation.
See also
rapidsmpf::unpack_and_concat
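
The concatenation step, including the rule that empty partitions are ignored, can be pictured with a toy model in which a partition is just a vector of row values. `toy_concat` is a hypothetical helper; real partitions are packed cudf tables and deserialization is omitted here.

```cpp
#include <vector>

// Toy model, not rapidsmpf code: append the rows of each partition in input
// order; empty partitions contribute nothing to the result.
std::vector<int> toy_concat(std::vector<std::vector<int>> const& partitions) {
    std::vector<int> table;
    for (auto const& part : partitions) {
        if (part.empty()) {
            continue;  // empty partitions are ignored
        }
        table.insert(table.end(), part.begin(), part.end());
    }
    return table;
}
```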