rapidsmpf::streaming::node Namespace Reference

Functions

Node allgather (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, AllGather::Ordered ordered=AllGather::Ordered::YES)
 Create an allgather node for a single allgather operation.
 
Node shuffler (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
 Launches a shuffler node for a single shuffle operation.
 
Node push_to_channel (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::vector< Message > messages)
 Asynchronously pushes all messages from a vector into an output channel.
 
Node pull_from_channel (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::vector< Message > &out_messages)
 Asynchronously pulls all messages from an input channel into a vector.
 
Node read_parquet (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::size_t num_producers, cudf::io::parquet_reader_options options, cudf::size_type num_rows_per_chunk, std::unique_ptr< Filter > filter=nullptr)
 Asynchronously reads parquet files into an output channel.
 
Node partition_and_pack (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, std::vector< cudf::size_type > columns_to_hash, int num_partitions, cudf::hash_id hash_function, uint32_t seed)
 Asynchronously partitions input tables into multiple packed (serialized) tables.
 
Node unpack_and_concat (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out)
 Asynchronously unpacks and concatenates packed partitions.
 

Detailed Description

SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. SPDX-License-Identifier: Apache-2.0

Function Documentation

◆ allgather()

Node rapidsmpf::streaming::node::allgather (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_in,
std::shared_ptr< Channel > ch_out,
OpID op_id,
AllGather::Ordered ordered = AllGather::Ordered::YES
)

Create an allgather node for a single allgather operation.

This is a streaming version of rapidsmpf::allgather::AllGather that operates on packed data received through Channels.

Parameters
ctx: The streaming context to use.
ch_in: Input channel providing PackedDataChunks to be gathered.
ch_out: Output channel where the gathered PackedDataChunks are sent.
op_id: Unique identifier for the operation.
ordered: Whether the extracted data should be sent to the output channel with sequence numbers corresponding to the global total order of input chunks. If YES, the sequence numbers of the extracted data are ordered first by rank and then by input sequence number. If NO, the sequence numbers of the extracted chunks have no relation to any input sequence order.
Returns
A streaming node that completes when the allgather is finished and the output channel is drained.
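The ordered=YES rule can be illustrated without the library. The sketch below assumes each gathered chunk is tagged with its originating rank and the sequence number it carried on that rank's input channel; it sorts by (rank, input sequence number) and then numbers the result consecutively. Consecutive numbering is an assumption made for illustration; the documentation only requires the output sequence numbers to follow this order. Tagged and assign_global_order are hypothetical names, not rapidsmpf types.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>

// Hypothetical gathered chunk: where it came from and the sequence number it
// carried on its origin rank's input channel.
struct Tagged {
    int rank;
    std::uint64_t input_seq;
    std::uint64_t output_seq = 0;  // filled in by assign_global_order
};

// Sketch of AllGather::Ordered::YES: order first by rank, then by input
// sequence number, and assign output sequence numbers 0, 1, 2, ...
inline std::vector<Tagged> assign_global_order(std::vector<Tagged> chunks) {
    std::sort(chunks.begin(), chunks.end(), [](Tagged const& a, Tagged const& b) {
        return std::tie(a.rank, a.input_seq) < std::tie(b.rank, b.input_seq);
    });
    for (std::size_t i = 0; i < chunks.size(); ++i) {
        chunks[i].output_seq = i;
    }
    return chunks;
}
```

For example, chunks tagged (rank, seq) = (1, 0), (0, 5), (1, 2), (0, 1) come out as (0, 1), (0, 5), (1, 0), (1, 2).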

◆ partition_and_pack()

Node rapidsmpf::streaming::node::partition_and_pack (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_in,
std::shared_ptr< Channel > ch_out,
std::vector< cudf::size_type > columns_to_hash,
int num_partitions,
cudf::hash_id hash_function,
uint32_t seed
)

Asynchronously partitions input tables into multiple packed (serialized) tables.

This is a streaming version of rapidsmpf::partition_and_split that operates on table chunks using channels.

It receives tables from an input channel, assigns each row to one of num_partitions partitions based on a hash of the selected columns, packs the resulting partitions, and sends them to an output channel.

Parameters
ctx: The node context to use.
ch_in: Input channel providing TableChunks to partition.
ch_out: Output channel to which PartitionMapChunks are sent.
columns_to_hash: Indices of input columns to hash.
num_partitions: The number of partitions to use.
hash_function: Hash function to use for partitioning.
seed: Seed value for the hash function.
Returns
Streaming node representing the asynchronous partitioning and packing operation.
Exceptions
std::out_of_range: if any index in columns_to_hash is invalid.
See also
rapidsmpf::partition_and_split
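The partitioning step can be sketched on the host. This sketch assumes a column-major table of int32 values and uses the MurmurHash3 32-bit finalizer as a stand-in for the hash_function argument; the real node hashes cudf columns on the GPU, so partition indices from this sketch will not match the library's. Table, mix32 and hash_partition are hypothetical names. Packing (serializing) each partition is not shown.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical column-major table: columns[col][row], all columns equally long.
using Table = std::vector<std::vector<std::int32_t>>;

// MurmurHash3 32-bit finalizer, used here only as a stand-in hash.
inline std::uint32_t mix32(std::uint32_t h) {
    h ^= h >> 16;
    h *= 0x85ebca6bU;
    h ^= h >> 13;
    h *= 0xc2b2ae35U;
    h ^= h >> 16;
    return h;
}

// Returns the partition index in [0, num_partitions) for every row.
inline std::vector<int> hash_partition(Table const& columns,
                                       std::vector<int> const& columns_to_hash,
                                       int num_partitions,
                                       std::uint32_t seed) {
    if (num_partitions <= 0) {
        throw std::invalid_argument("num_partitions must be positive");
    }
    // Validate every index up front, mirroring the documented std::out_of_range.
    for (int col : columns_to_hash) {
        if (col < 0 || static_cast<std::size_t>(col) >= columns.size()) {
            throw std::out_of_range("invalid index in columns_to_hash");
        }
    }
    std::size_t const num_rows = columns.empty() ? 0 : columns.front().size();
    std::vector<int> assignment(num_rows);
    for (std::size_t row = 0; row < num_rows; ++row) {
        std::uint32_t h = seed;
        for (int col : columns_to_hash) {
            // Fold each selected column's value into the running hash.
            h = mix32(h ^ static_cast<std::uint32_t>(columns[static_cast<std::size_t>(col)][row]));
        }
        assignment[row] = static_cast<int>(h % static_cast<std::uint32_t>(num_partitions));
    }
    return assignment;
}
```

Whatever hash is used, rows with equal values in the hashed columns always land in the same partition, which is what makes hash partitioning useful ahead of a shuffle.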

◆ pull_from_channel()

Node rapidsmpf::streaming::node::pull_from_channel (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_in,
std::vector< Message > & out_messages
)

Asynchronously pulls all messages from an input channel into a vector.

Receives messages from the channel until it is closed and appends them to the provided output vector.

Parameters
ctx: The node context to use.
ch_in: Input channel providing messages.
out_messages: Output vector to store the received messages.
Returns
Streaming node representing the asynchronous operation.
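The receive-until-closed contract can be modeled with an ordinary thread-safe queue. In this sketch ToyChannel, pull_all and demo_pull are hypothetical and use blocking threads, whereas the library's Node is an asynchronous task; only the semantics carry over: messages are appended in arrival order, existing contents of the output vector are kept, and the loop ends once the channel is closed and drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins; not the rapidsmpf Message or Channel classes.
using Message = std::string;

class ToyChannel {
  public:
    void send(Message msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(msg));
        }
        cv_.notify_one();
    }

    // Marks the end of the stream; queued messages can still be received.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until a message is available; returns nullopt once closed and empty.
    std::optional<Message> receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty()) {
            return std::nullopt;
        }
        Message msg = std::move(queue_.front());
        queue_.pop();
        return msg;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Message> queue_;
    bool closed_ = false;
};

// Analogue of pull_from_channel: append messages until the channel is closed.
inline void pull_all(ToyChannel& ch_in, std::vector<Message>& out_messages) {
    while (auto msg = ch_in.receive()) {
        out_messages.push_back(std::move(*msg));
    }
}

// Runs a producer thread that sends every message and then closes the channel.
inline std::vector<Message> demo_pull(std::vector<Message> const& to_send) {
    ToyChannel ch;
    std::thread producer([&ch, &to_send] {
        for (auto const& m : to_send) {
            ch.send(m);
        }
        ch.close();
    });
    std::vector<Message> received;
    pull_all(ch, received);
    producer.join();
    return received;
}
```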

◆ push_to_channel()

Node rapidsmpf::streaming::node::push_to_channel (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_out,
std::vector< Message > messages
)

Asynchronously pushes all messages from a vector into an output channel.

Sends each message in the input vector to the channel in order, then marks the end of the stream.

Parameters
ctx: The node context to use.
ch_out: Output channel to which messages will be sent.
messages: Input vector containing the messages to send.
Returns
Streaming node representing the asynchronous operation.
Exceptions
std::invalid_argument: if any of the elements in messages is empty.
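A minimal single-threaded sketch of the push_to_channel contract follows. It assumes an empty std::optional stands in for an empty Message, and that the whole batch is validated before anything is sent; the documentation only states that std::invalid_argument is thrown, not at which point. RecordingChannel and push_all are hypothetical names.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message: an empty std::optional stands in for an empty Message.
using Message = std::optional<std::string>;

// Records what was sent and whether the end of the stream was marked.
// Not the rapidsmpf Channel class.
struct RecordingChannel {
    std::vector<std::string> sent;
    bool closed = false;
};

// Sketch of the push_to_channel contract.
inline void push_all(RecordingChannel& ch_out, std::vector<Message> messages) {
    // Assumption: reject the whole batch before sending anything.
    for (auto const& m : messages) {
        if (!m.has_value()) {
            throw std::invalid_argument("empty message");
        }
    }
    for (auto& m : messages) {
        ch_out.sent.push_back(std::move(*m));  // preserve input order
    }
    ch_out.closed = true;  // mark the end of the stream
}
```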

◆ read_parquet()

Node rapidsmpf::streaming::node::read_parquet (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_out,
std::size_t num_producers,
cudf::io::parquet_reader_options options,
cudf::size_type num_rows_per_chunk,
std::unique_ptr< Filter > filter = nullptr
)

Asynchronously reads parquet files into an output channel.

Note
This is a collective operation: all ranks named by the execution context's communicator participate. All ranks must specify the same set of options. Behaviour is undefined if a read_parquet node appears on only a subset of the ranks named by the communicator, or if the options differ between ranks.
Parameters
ctx: The execution context to use.
ch_out: Channel to which TableChunks are sent.
num_producers: Number of concurrent producer tasks.
options: Template reader options. The files they list are split up and used to construct new options for each chunk that is read, so the options should specify the read "as if" the whole input were being read in one go.
num_rows_per_chunk: Target (maximum) number of rows in each sent TableChunk.
filter: Optional filter expression to apply to the read.
Returns
Streaming node representing the asynchronous read.
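The effect of num_rows_per_chunk can be sketched as splitting the input's row range into contiguous pieces of at most that many rows. RowRange and plan_chunks are hypothetical names; a real reader also has to map these ranges onto files and row groups and decide how they are divided among producer tasks and ranks, none of which is modeled here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical description of one read: a contiguous run of rows.
struct RowRange {
    std::int64_t skip_rows;
    std::int64_t num_rows;
};

// Splits [0, total_rows) into ranges of at most num_rows_per_chunk rows.
inline std::vector<RowRange> plan_chunks(std::int64_t total_rows,
                                         std::int64_t num_rows_per_chunk) {
    if (num_rows_per_chunk <= 0) {
        throw std::invalid_argument("num_rows_per_chunk must be positive");
    }
    std::vector<RowRange> ranges;
    for (std::int64_t start = 0; start < total_rows; start += num_rows_per_chunk) {
        // The last range may be shorter than num_rows_per_chunk.
        ranges.push_back({start, std::min(num_rows_per_chunk, total_rows - start)});
    }
    return ranges;
}
```

For example, 10 rows with num_rows_per_chunk = 4 give the ranges [0, 4), [4, 8) and [8, 10).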

◆ shuffler()

Node rapidsmpf::streaming::node::shuffler (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_in,
std::shared_ptr< Channel > ch_out,
OpID op_id,
shuffler::PartID total_num_partitions,
shuffler::Shuffler::PartitionOwner partition_owner = shuffler::Shuffler::round_robin
)

Launches a shuffler node for a single shuffle operation.

This is a streaming version of rapidsmpf::shuffler::Shuffler that operates on packed partition chunks using channels.

It consumes partitioned input data from the input channel and produces output chunks grouped by partition_owner.

Parameters
ctx: The context to use.
ch_in: Input channel providing PartitionMapChunks to be shuffled.
ch_out: Output channel where the resulting PartitionVectorChunks are sent.
op_id: Unique operation ID for this shuffle. Must not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions to shuffle the data into.
partition_owner: Function that maps a partition ID to its owning rank/node.
Returns
A streaming node that completes when the shuffling has finished and the output channel is drained.
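Partition ownership can be illustrated with a simplified owner function. This sketch assumes round-robin ownership means partition p belongs to rank p mod nranks and, for simplicity, passes the number of ranks rather than a communicator; PartitionOwner, round_robin and local_partitions here are hypothetical stand-ins for the library's types. local_partitions lists the partitions a given rank would own, and hence receive, after the shuffle.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

using Rank = int;
using PartID = std::uint32_t;

// Hypothetical owner signature: (number of ranks, partition ID) -> owning rank.
using PartitionOwner = std::function<Rank(Rank, PartID)>;

// Assumed round-robin policy: partition pid is owned by rank pid mod nranks.
inline Rank round_robin(Rank nranks, PartID pid) {
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}

// Partitions that my_rank owns under the given ownership function.
inline std::vector<PartID> local_partitions(Rank my_rank,
                                            Rank nranks,
                                            PartID total_num_partitions,
                                            PartitionOwner const& owner) {
    std::vector<PartID> mine;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(nranks, pid) == my_rank) {
            mine.push_back(pid);
        }
    }
    return mine;
}
```

With 8 partitions and 3 ranks, rank 1 owns partitions 1, 4 and 7 under this policy. Passing a different owner function (for example one that sends every partition to rank 0) changes the grouping without touching the rest of the logic.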

◆ unpack_and_concat()

Node rapidsmpf::streaming::node::unpack_and_concat (std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_in,
std::shared_ptr< Channel > ch_out
)

Asynchronously unpacks and concatenates packed partitions.

This is a streaming version of rapidsmpf::unpack_and_concat that operates on packed partition chunks using channels.

It receives packed partitions from the input channel, deserializes and concatenates them, and sends the resulting tables to the output channel. Empty partitions are ignored.

Parameters
ctx: The node context to use.
ch_in: Input channel providing packed partitions as PartitionMapChunk or PartitionVectorChunk.
ch_out: Output channel to which the unpacked and concatenated tables are sent.
Returns
Streaming node representing the asynchronous unpacking and concatenation operation.
See also
rapidsmpf::unpack_and_concat
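The unpack-and-concatenate step can be sketched with plain byte buffers. Here a packed partition is assumed to be the raw bytes of a single int32 column, whereas real packed data carries cudf metadata and device buffers; pack and unpack_and_concat_sketch are hypothetical helpers. Empty partitions are skipped, as documented, and in this sketch the remaining ones are deserialized and concatenated in input order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical packed partition: the raw bytes of a single int32 column.
using Packed = std::vector<std::uint8_t>;

// Serializes a column into bytes (the inverse of the unpack step below).
inline Packed pack(std::vector<std::int32_t> const& column) {
    Packed bytes(column.size() * sizeof(std::int32_t));
    if (!bytes.empty()) {
        std::memcpy(bytes.data(), column.data(), bytes.size());
    }
    return bytes;
}

// Deserializes every non-empty partition and appends it to one result column.
inline std::vector<std::int32_t> unpack_and_concat_sketch(std::vector<Packed> const& partitions) {
    std::vector<std::int32_t> result;
    for (auto const& bytes : partitions) {
        if (bytes.empty()) {
            continue;  // empty partitions are ignored
        }
        std::size_t const n = bytes.size() / sizeof(std::int32_t);
        std::size_t const old_size = result.size();
        result.resize(old_size + n);
        std::memcpy(result.data() + old_size, bytes.data(), n * sizeof(std::int32_t));
    }
    return result;
}
```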