An asynchronous shuffler that allows concurrent insertion and extraction of data.
#include <shuffler.hpp>
Public Types | |
| using | ExtractResult = std::pair< shuffler::PartID, std::vector< PackedData > > |
| Result type for extract_any_async operations. | |
Public Member Functions | |
| ShufflerAsync (std::shared_ptr< Context > ctx, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin) | |
| Constructs a new ShufflerAsync instance. | |
| ShufflerAsync (ShufflerAsync const &)=delete | |
| ShufflerAsync & | operator= (ShufflerAsync const &)=delete |
| constexpr std::shared_ptr< Context > const & | ctx () const |
| Gets the streaming context associated with this shuffler. | |
| constexpr shuffler::PartID | total_num_partitions () const |
| Gets the total number of partitions for this shuffle operation. | |
| constexpr shuffler::Shuffler::PartitionOwner const & | partition_owner () const |
| Gets the partition owner function used by this shuffler. | |
| std::span< shuffler::PartID const > | local_partitions () const |
| Returns the local partition IDs owned by the current node. | |
| void | insert (std::unordered_map< shuffler::PartID, PackedData > &&chunks) |
| Insert a bunch of packed (serialized) chunks into the shuffle. | |
| Node | insert_finished (std::vector< shuffler::PartID > &&pids) |
| Insert a finish mark for a list of partitions. | |
| coro::task< std::optional< std::vector< PackedData > > > | extract_async (shuffler::PartID pid) |
| Asynchronously extracts all data for a specific partition. | |
| coro::task< std::optional< ExtractResult > > | extract_any_async () |
| Asynchronously extracts data for any ready partition. | |
An asynchronous shuffler that allows concurrent insertion and extraction of data.
ShufflerAsync provides an asynchronous interface to the shuffler, allowing data to be inserted while previously shuffled partitions are extracted concurrently. This is useful for streaming scenarios where data can be processed as soon as individual partitions are ready, rather than waiting for the entire shuffle to complete.
Calling insert_finished returns a token (a Node) that must be awaited to finalize extractions. Partitions can still be extracted asynchronously before this token is awaited.
Example usage:
{}
Extraction returns std::nullopt if no more partitions are available.
Definition at line 50 of file shuffler.hpp.
| using rapidsmpf::streaming::ShufflerAsync::ExtractResult = std::pair<shuffler::PartID, std::vector<PackedData> > |
Result type for extract_any_async operations.
Contains the partition ID and associated data chunks from an extract operation.
Definition at line 158 of file shuffler.hpp.
| rapidsmpf::streaming::ShufflerAsync::ShufflerAsync | ( | std::shared_ptr< Context > | ctx, |
| | | OpID | op_id, |
| | | shuffler::PartID | total_num_partitions, |
| | | shuffler::Shuffler::PartitionOwner | partition_owner = shuffler::Shuffler::round_robin |
| | ) | | |
Constructs a new ShufflerAsync instance.
| ctx | The streaming context to use. |
| op_id | Unique operation ID for this shuffle. Must not be reused until all participants have completed the shuffle operation. |
| total_num_partitions | Total number of partitions to shuffle data into. |
| partition_owner | Function that maps a partition ID to its owning rank/node. Defaults to round-robin distribution. |
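The default ownership rule can be pictured with a small standalone sketch. It assumes that round-robin means partition pid is owned by rank pid % nranks, which this page does not spell out. PartID, Rank, round_robin_owner, and local_partitions_of are hypothetical stand-ins for illustration, not the library's types or functions.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for shuffler::PartID and the rank type.
using PartID = std::uint32_t;
using Rank = std::int32_t;

// Assumed round-robin rule: partition `pid` is owned by rank `pid % nranks`.
inline Rank round_robin_owner(PartID pid, Rank nranks) {
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}

// Collect, in ascending order, the partitions that `rank` owns under the rule above.
inline std::vector<PartID> local_partitions_of(
    Rank rank, Rank nranks, PartID total_num_partitions
) {
    std::vector<PartID> owned;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (round_robin_owner(pid, nranks) == rank) {
            owned.push_back(pid);
        }
    }
    return owned;
}
```

Under this assumed rule, with 3 ranks and 8 partitions, rank 1 would own partitions 1, 4, and 7. A custom partition_owner passed to the constructor could replace the modulo rule with any other mapping.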
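| constexpr std::shared_ptr<Context> const& rapidsmpf::streaming::ShufflerAsync::ctx | ( | ) | const |
inline constexpr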
Gets the streaming context associated with this shuffler.
Definition at line 85 of file shuffler.hpp.
| coro::task<std::optional<ExtractResult> > rapidsmpf::streaming::ShufflerAsync::extract_any_async | ( | ) |
Asynchronously extracts data for any ready partition.
This coroutine suspends until at least one partition is ready for extraction, then extracts and returns the data for one such partition. If no partition becomes ready and the shuffle is finished, it returns std::nullopt.
Returns an ExtractResult containing the partition ID and data chunks, or std::nullopt if all partitions have been extracted.
Warning: take care when mixing extract_async and extract_any_async. A partition intended for extract_async may already have been consumed by extract_any_async, in which case extract_async will later return std::nullopt.
| coro::task<std::optional<std::vector<PackedData> > > rapidsmpf::streaming::ShufflerAsync::extract_async | ( | shuffler::PartID | pid | ) |
Asynchronously extracts all data for a specific partition.
This coroutine suspends until the specified partition is ready for extraction (i.e., insert_finished has been called for this partition and all data has been shuffled).
Warning: take care when mixing extract_async and extract_any_async. A partition intended for extract_async may already have been consumed by extract_any_async, in which case this function returns std::nullopt.
| pid | The partition ID to extract data for. |
Returns the PackedData chunks belonging to the partition, or std::nullopt if the partition ID is not ready or has already been extracted.
| std::out_of_range | If the partition ID isn't owned by this rank, see partition_owner(). |
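The interaction between the two extraction calls can be illustrated with a toy, synchronous model. This is only a sketch of the documented behavior: ToyShuffler, make_ready, extract, and extract_any are hypothetical names, Chunk is a stand-in for PackedData, and none of the coroutine or communication machinery of the real class is modeled.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using PartID = std::uint32_t;  // stand-in for shuffler::PartID
using Chunk = std::string;     // stand-in for PackedData

// Toy model: a partition can be handed out once, by whichever call gets to it first.
class ToyShuffler {
  public:
    // Mark a partition as ready for extraction with the given chunks.
    void make_ready(PartID pid, std::vector<Chunk> chunks) {
        ready_[pid] = std::move(chunks);
    }

    // Models extract_async: nullopt if `pid` is not ready or was already extracted.
    std::optional<std::vector<Chunk>> extract(PartID pid) {
        auto it = ready_.find(pid);
        if (it == ready_.end()) {
            return std::nullopt;
        }
        std::vector<Chunk> chunks = std::move(it->second);
        ready_.erase(it);
        return chunks;
    }

    // Models extract_any_async: hands out any ready partition, nullopt if none remain.
    std::optional<std::pair<PartID, std::vector<Chunk>>> extract_any() {
        if (ready_.empty()) {
            return std::nullopt;
        }
        auto it = ready_.begin();
        std::pair<PartID, std::vector<Chunk>> result{it->first, std::move(it->second)};
        ready_.erase(it);
        return result;
    }

  private:
    std::map<PartID, std::vector<Chunk>> ready_;
};
```

In this model, if extract_any() consumes partition 0 first, a later extract(0) yields std::nullopt, which mirrors the situation the warning describes. Sticking to one extraction style per shuffle avoids the ambiguity.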
| void rapidsmpf::streaming::ShufflerAsync::insert | ( | std::unordered_map< shuffler::PartID, PackedData > && | chunks | ) |
Insert a bunch of packed (serialized) chunks into the shuffle.
| chunks | A map of partition IDs and their packed chunks. |
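The shape of the chunks argument, one entry per destination partition, can be sketched without the library. The example below groups plain integers by a modulo rule. The rule, the ToyChunk type (standing in for PackedData), and partition_values are assumptions made for illustration, and real chunks would hold packed (serialized) data rather than raw integers.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

using PartID = std::uint32_t;       // stand-in for shuffler::PartID
using ToyChunk = std::vector<int>;  // stand-in for PackedData

// Group values by destination partition using a simple modulo rule (hypothetical).
inline std::unordered_map<PartID, ToyChunk> partition_values(
    std::vector<int> const& values, PartID total_num_partitions
) {
    std::unordered_map<PartID, ToyChunk> chunks;
    for (int v : values) {
        // Convert to unsigned first so negative inputs still map to a valid partition ID.
        auto pid = static_cast<PartID>(static_cast<std::uint32_t>(v) % total_num_partitions);
        chunks[pid].push_back(v);
    }
    return chunks;
}
```

A map built this way has at most total_num_partitions entries, and partitions that receive no data simply have no entry.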
| Node rapidsmpf::streaming::ShufflerAsync::insert_finished | ( | std::vector< shuffler::PartID > && | pids | ) |
Insert a finish mark for a list of partitions.
| pids | The list of partition IDs to mark as finished. |
| std::span<shuffler::PartID const> rapidsmpf::streaming::ShufflerAsync::local_partitions | ( | ) | const |
Returns the local partition IDs owned by the current node.
| comm | The communicator to use. |
| total_num_partitions | Total number of partitions in the shuffle. |
| partition_owner | Function that determines partition ownership. |
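| constexpr shuffler::Shuffler::PartitionOwner const& rapidsmpf::streaming::ShufflerAsync::partition_owner | ( | ) | const |
inline constexpr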
Gets the partition owner function used by this shuffler.
Definition at line 104 of file shuffler.hpp.
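| constexpr shuffler::PartID rapidsmpf::streaming::ShufflerAsync::total_num_partitions | ( | ) | const |
inline constexpr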
Gets the total number of partitions for this shuffle operation.
Definition at line 94 of file shuffler.hpp.