Functions
Node allgather(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_in, std::shared_ptr<Channel> ch_out, OpID op_id, AllGather::Ordered ordered = AllGather::Ordered::YES)
    Creates an allgather node for a single allgather operation.

Node shuffler(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_in, std::shared_ptr<Channel> ch_out, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner = shuffler::Shuffler::round_robin)
    Launches a shuffler node for a single shuffle operation.

Node push_to_channel(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_out, std::vector<Message> messages)
    Asynchronously pushes all messages from a vector into an output channel.

Node pull_from_channel(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_in, std::vector<Message>& out_messages)
    Asynchronously pulls all messages from an input channel into a vector.

Node read_parquet(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_out, std::size_t num_producers, cudf::io::parquet_reader_options options, cudf::size_type num_rows_per_chunk, std::unique_ptr<Filter> filter = nullptr)
    Asynchronously reads Parquet files into an output channel.

Node partition_and_pack(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_in, std::shared_ptr<Channel> ch_out, std::vector<cudf::size_type> columns_to_hash, int num_partitions, cudf::hash_id hash_function, uint32_t seed)
    Asynchronously partitions input tables into multiple packed (serialized) tables.

Node unpack_and_concat(std::shared_ptr<Context> ctx, std::shared_ptr<Channel> ch_in, std::shared_ptr<Channel> ch_out)
    Asynchronously unpacks and concatenates packed partitions.
SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. SPDX-License-Identifier: Apache-2.0
Node rapidsmpf::streaming::node::allgather(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::shared_ptr<Channel> ch_out,
    OpID op_id,
    AllGather::Ordered ordered = AllGather::Ordered::YES
)
Creates an allgather node for a single allgather operation.
This is a streaming version of rapidsmpf::allgather::AllGather that operates on packed data received through Channels.
Parameters:
    ctx      The streaming context to use.
    ch_in    Input channel providing PackedDataChunks to be gathered.
    ch_out   Output channel where the gathered PackedDataChunks are sent.
    op_id    Unique identifier for the operation.
    ordered  Whether the extracted data should be sent to the output channel with sequence numbers corresponding to the global total order of input chunks. If YES, the sequence numbers of the extracted data are ordered first by rank and then by input sequence number. If NO, the sequence numbers of the extracted chunks bear no relation to any input sequence order.
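A minimal usage sketch, assuming a Context has already been constructed; the channel setup and variable names here are illustrative, not part of this API:

```cpp
// Hedged sketch: only the allgather() call mirrors the signature above;
// channel construction details are assumptions.
auto ch_in = std::make_shared<Channel>();   // carries PackedDataChunks in
auto ch_out = std::make_shared<Channel>();  // receives the gathered chunks

// Build the node; the op_id must be unique to this allgather operation.
Node gather = rapidsmpf::streaming::node::allgather(
    ctx, ch_in, ch_out, /* op_id = */ 0, AllGather::Ordered::YES
);
```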
Node rapidsmpf::streaming::node::partition_and_pack(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::shared_ptr<Channel> ch_out,
    std::vector<cudf::size_type> columns_to_hash,
    int num_partitions,
    cudf::hash_id hash_function,
    uint32_t seed
)
Asynchronously partitions input tables into multiple packed (serialized) tables.
This is a streaming version of rapidsmpf::partition_and_split that operates on table chunks using channels.
It receives tables from an input channel, partitions each row into one of num_partitions based on a hash of the selected columns, packs the resulting partitions, and sends them to an output channel.
Parameters:
    ctx              The node context to use.
    ch_in            Input channel providing TableChunks to partition.
    ch_out           Output channel to which PartitionMapChunks are sent.
    columns_to_hash  Indices of input columns to hash.
    num_partitions   The number of partitions to use.
    hash_function    Hash function to use for partitioning.
    seed             Seed value for the hash function.

Exceptions:
    std::out_of_range  if any index in columns_to_hash is invalid.
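A usage sketch under assumed names (ctx and the two channels are created elsewhere): hash-partition incoming TableChunks on their first column into 8 packed partitions.

```cpp
// Hedged sketch: channel names are illustrative; the call mirrors the
// signature above.
Node pack = rapidsmpf::streaming::node::partition_and_pack(
    ctx,
    ch_tables,   // input: TableChunks
    ch_packed,   // output: PartitionMapChunks
    {0},         // columns_to_hash: hash on column index 0
    8,           // num_partitions
    cudf::hash_id::HASH_MURMUR3,
    0            // seed
);
```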
Node rapidsmpf::streaming::node::pull_from_channel(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::vector<Message>& out_messages
)
Asynchronously pulls all messages from an input channel into a vector.
Receives messages from the channel until it is closed and appends them to the provided output vector.
Parameters:
    ctx           The node context to use.
    ch_in         Input channel providing messages.
    out_messages  Output vector to store the received messages.
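A sketch of draining a pipeline's final channel into a vector (ctx and ch_out are assumed to exist; how the graph is executed is outside this API excerpt):

```cpp
// Hedged sketch: the node appends to `results` until ch_out is closed.
std::vector<Message> results;
Node sink = rapidsmpf::streaming::node::pull_from_channel(ctx, ch_out, results);
// After the streaming graph has run to completion, `results` holds every
// message received from ch_out, in arrival order.
```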
Node rapidsmpf::streaming::node::push_to_channel(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_out,
    std::vector<Message> messages
)
Asynchronously pushes all messages from a vector into an output channel.
Sends each message of the input vector into the channel in order, marking the end of the stream once done.
Parameters:
    ctx       The node context to use.
    ch_out    Output channel to which messages will be sent.
    messages  Input vector containing the messages to send.

Exceptions:
    std::invalid_argument  if any of the elements in messages is empty.
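A sketch of using push_to_channel as a pipeline source; `make_input_messages` is a hypothetical helper standing in for however the messages are produced:

```cpp
// Hedged sketch: every message must be non-empty, otherwise
// std::invalid_argument is thrown. The node marks end-of-stream on
// ch_in after the last message is sent.
std::vector<Message> inputs = make_input_messages();  // illustrative helper
Node source = rapidsmpf::streaming::node::push_to_channel(
    ctx, ch_in, std::move(inputs)
);
```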
Node rapidsmpf::streaming::node::read_parquet(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_out,
    std::size_t num_producers,
    cudf::io::parquet_reader_options options,
    cudf::size_type num_rows_per_chunk,
    std::unique_ptr<Filter> filter = nullptr
)
Asynchronously reads Parquet files into an output channel.
It is an error if the read_parquet node appears only on a subset of the ranks named by the communicator, or the options differ between ranks.

Parameters:
    ctx                 The execution context to use.
    ch_out              Channel to which TableChunks are sent.
    num_producers       Number of concurrent producer tasks.
    options             Template reader options. The files within will be picked apart and used to reconstruct new options for each read chunk. The options should therefore specify the read options as if one were reading the whole input in one go.
    num_rows_per_chunk  Target (maximum) number of rows any sent TableChunk should have.
    filter              Optional filter expression to apply to the read.
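A sketch of reading a Parquet dataset in roughly million-row chunks with four concurrent producers; the file path is a placeholder and ctx/ch_out are assumed to exist:

```cpp
// Hedged sketch: the options describe the whole read "as if" done in
// one go; the node re-derives per-chunk options from them.
auto options = cudf::io::parquet_reader_options::builder(
    cudf::io::source_info{"/data/table.parquet"}  // illustrative path
).build();

Node reader = rapidsmpf::streaming::node::read_parquet(
    ctx, ch_out,
    /* num_producers = */ 4,
    std::move(options),
    /* num_rows_per_chunk = */ 1'000'000
);
```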
Node rapidsmpf::streaming::node::shuffler(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::shared_ptr<Channel> ch_out,
    OpID op_id,
    shuffler::PartID total_num_partitions,
    shuffler::Shuffler::PartitionOwner partition_owner = shuffler::Shuffler::round_robin
)
Launches a shuffler node for a single shuffle operation.
This is a streaming version of rapidsmpf::shuffler::Shuffler that operates on packed partition chunks using channels.
It consumes partitioned input data from the input channel and produces output chunks grouped by partition_owner.
Parameters:
    ctx                   The context to use.
    ch_in                 Input channel providing PartitionMapChunks to be shuffled.
    ch_out                Output channel where the resulting PartitionVectorChunks are sent.
    op_id                 Unique operation ID for this shuffle. Must not be reused until all nodes have called Shuffler::shutdown().
    total_num_partitions  Total number of partitions to shuffle the data into.
    partition_owner       Function that maps a partition ID to its owning rank/node.
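A sketch of a shuffle over 32 partitions with the default round-robin owner mapping; channel names and the op_id value are illustrative:

```cpp
// Hedged sketch: ch_packed typically comes from partition_and_pack.
// The op_id must not be reused until every node has shut this shuffle
// down (see the op_id parameter above).
Node shuffle = rapidsmpf::streaming::node::shuffler(
    ctx,
    ch_packed,    // input: PartitionMapChunks
    ch_shuffled,  // output: PartitionVectorChunks
    /* op_id = */ 1,
    /* total_num_partitions = */ 32
);
```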
Node rapidsmpf::streaming::node::unpack_and_concat(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::shared_ptr<Channel> ch_out
)
Asynchronously unpacks and concatenates packed partitions.
This is a streaming version of rapidsmpf::unpack_and_concat that operates on packed partition chunks using channels.
It receives packed partitions from the input channel, deserializes and concatenates them, and sends the resulting tables to the output channel. Empty partitions are ignored.
Parameters:
    ctx     The node context to use.
    ch_in   Input channel providing packed partitions as PartitionMapChunks or PartitionVectorChunks.
    ch_out  Output channel to which unpacked and concatenated tables are sent.
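A sketch of the typical tail of a shuffle pipeline, turning shuffled packed partitions back into TableChunks; channel names are illustrative:

```cpp
// Hedged sketch: ch_shuffled usually carries the output of shuffler();
// empty partitions are ignored by the node.
Node unpack = rapidsmpf::streaming::node::unpack_and_concat(
    ctx, ch_shuffled, ch_tables_out
);
```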