rapidsmpf::streaming::ShufflerAsync Class Reference

An asynchronous shuffler that allows concurrent insertion and extraction of data.

#include <shuffler.hpp>

Public Types

using ExtractResult = std::pair< shuffler::PartID, std::vector< PackedData > >
 Result type for extract_any_async operations.


Public Member Functions

 ShufflerAsync (std::shared_ptr< Context > ctx, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
 Constructs a new ShufflerAsync instance.

 ShufflerAsync (ShufflerAsync const &)=delete

ShufflerAsync & operator= (ShufflerAsync const &)=delete

constexpr std::shared_ptr< Context > const & ctx () const
 Gets the streaming context associated with this shuffler.

constexpr shuffler::PartID total_num_partitions () const
 Gets the total number of partitions for this shuffle operation.

constexpr shuffler::Shuffler::PartitionOwner const & partition_owner () const
 Gets the partition owner function used by this shuffler.

std::span< shuffler::PartID const > local_partitions () const
 Returns the local partition IDs owned by the current node.

void insert (std::unordered_map< shuffler::PartID, PackedData > &&chunks)
 Insert a batch of packed (serialized) chunks into the shuffle.

Node insert_finished (std::vector< shuffler::PartID > &&pids)
 Insert a finish mark for a list of partitions.

coro::task< std::optional< std::vector< PackedData > > > extract_async (shuffler::PartID pid)
 Asynchronously extracts all data for a specific partition.

coro::task< std::optional< ExtractResult > > extract_any_async ()
 Asynchronously extracts data for any ready partition.
 

Detailed Description

An asynchronous shuffler that allows concurrent insertion and extraction of data.

ShufflerAsync provides an asynchronous interface to the shuffler, allowing data to be inserted while previously shuffled partitions are extracted concurrently. This is useful for streaming scenarios where data can be processed as soon as individual partitions are ready, rather than waiting for the entire shuffle to complete.

Calling insert_finished() returns a finish token (a Node) that must be awaited to finalize extraction. Partitions can be extracted asynchronously before this token is awaited.

Warning
The finish token must be awaited; otherwise, the shuffler throws on destruction or deadlocks occur.

Example usage:

auto shuffle = ShufflerAsync(...);
while (...) {
    shuffle.insert(...);
}
auto finished_token = shuffle.insert_finished(...);
for (std::size_t i = 0; i < shuffle.local_partitions().size(); ++i) {
    // Yields std::nullopt once no partition remains to be extracted.
    auto part = co_await shuffle.extract_any_async();
}
// Must be awaited before `shuffle` is destroyed.
co_await finished_token;

Note
It is safe to launch more extraction tasks than there are partitions to extract, for example when a shuffle has multiple consumers. An extraction returns std::nullopt once no more partitions are available.
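To illustrate this note, the following sketch models several consumers sharing one shuffle after it has finished. It is a toy, not the rapidsmpf API: `ReadyPartitions`, `mark_ready`, `extract_any`, and `drain` are hypothetical names, `PartID` and the `int` chunk type are stand-ins, and the model never suspends because it assumes every remaining partition is already ready. Each partition goes to exactly one consumer, and every surplus extraction returns `std::nullopt`.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-ins: the real PartID and PackedData types come from rapidsmpf.
using PartID = std::uint32_t;
using Chunk = int;
using ExtractResult = std::pair<PartID, std::vector<Chunk>>;

// Toy model of the "extract any ready partition" contract (not the real class):
// each ready partition is handed out exactly once; afterwards callers get nullopt.
class ReadyPartitions {
  public:
    void mark_ready(PartID pid, std::vector<Chunk> chunks) {
        ready_.emplace_back(pid, std::move(chunks));
    }

    std::optional<ExtractResult> extract_any() {
        if (ready_.empty()) {
            return std::nullopt;  // Nothing left: surplus extractions end here.
        }
        ExtractResult res = std::move(ready_.front());
        ready_.pop_front();
        return res;
    }

  private:
    std::deque<ExtractResult> ready_;
};

// Simulates `num_consumers` consumers taking turns until each one sees nullopt.
// Returns how many partitions each consumer received.
std::vector<int> drain(ReadyPartitions& parts, int num_consumers) {
    std::vector<int> received(num_consumers, 0);
    std::vector<bool> done(num_consumers, false);
    int active = num_consumers;
    while (active > 0) {
        for (int c = 0; c < num_consumers; ++c) {
            if (done[c]) {
                continue;
            }
            if (parts.extract_any().has_value()) {
                ++received[c];
            } else {
                done[c] = true;  // This consumer found no remaining partition.
                --active;
            }
        }
    }
    return received;
}
```

With three ready partitions and two consumers taking turns, the first consumer receives two partitions, the second receives one, and both then see std::nullopt.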

Definition at line 50 of file shuffler.hpp.

Member Typedef Documentation

◆ ExtractResult

Result type for extract_any_async operations.

Contains the partition ID and associated data chunks from an extract operation.

Definition at line 158 of file shuffler.hpp.
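Because ExtractResult is a std::pair, a result can be unpacked with structured bindings. The sketch below assumes `PartID` is a 32-bit unsigned integer and substitutes `std::string` for PackedData so that it compiles on its own; `summarize` is a hypothetical helper, not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Assumed stand-ins for shuffler::PartID and PackedData.
using PartID = std::uint32_t;
using PackedData = std::string;
using ExtractResult = std::pair<PartID, std::vector<PackedData>>;

// Hypothetical helper: unpacks an extraction result and returns the partition ID
// together with the total size of its chunks, or nullopt if nothing was extracted.
std::optional<std::pair<PartID, std::size_t>> summarize(std::optional<ExtractResult> result) {
    if (!result) {
        return std::nullopt;  // No partition was available.
    }
    auto& [pid, chunks] = *result;  // first: partition ID, second: its chunks
    std::size_t bytes = 0;
    for (auto const& chunk : chunks) {
        bytes += chunk.size();
    }
    return std::make_pair(pid, bytes);
}
```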

Constructor & Destructor Documentation

◆ ShufflerAsync()

rapidsmpf::streaming::ShufflerAsync::ShufflerAsync ( std::shared_ptr< Context > ctx,
OpID  op_id,
shuffler::PartID  total_num_partitions,
shuffler::Shuffler::PartitionOwner  partition_owner = shuffler::Shuffler::round_robin 
)

Constructs a new ShufflerAsync instance.

Parameters
ctx The streaming context to use.
op_id Unique operation ID for this shuffle. Must not be reused until all participants have completed the shuffle operation.
total_num_partitions Total number of partitions to shuffle data into.
partition_owner Function that maps a partition ID to its owning rank/node. Defaults to round-robin distribution.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.
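A partition owner is a mapping from partition ID to rank. The sketch below writes two such mappings as free functions that take the rank count explicitly; this is an assumption for illustration, since the exact PartitionOwner signature is not shown on this page. `round_robin_owner` assumes the default round-robin policy assigns partition `pid` to rank `pid % nranks`, and `contiguous_owner` is a hypothetical alternative that gives each rank a contiguous block of partitions. `Rank` is assumed to be a 32-bit signed integer.

```cpp
#include <cassert>
#include <cstdint>

using PartID = std::uint32_t;  // assumed stand-in for shuffler::PartID
using Rank = std::int32_t;     // assumed rank type

// Round-robin assignment, assumed to match the default owner: pid -> pid % nranks.
Rank round_robin_owner(PartID pid, Rank nranks) {
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}

// Hypothetical alternative: rank r owns a contiguous block of partition IDs.
// 64-bit arithmetic avoids overflow in pid * nranks.
Rank contiguous_owner(PartID pid, PartID total_num_partitions, Rank nranks) {
    return static_cast<Rank>(
        static_cast<std::uint64_t>(pid) * static_cast<std::uint64_t>(nranks) / total_num_partitions);
}
```

Round-robin spreads consecutive partition IDs across ranks, while the contiguous policy keeps neighboring IDs on the same rank; which one suits a workload depends on how partition IDs are assigned to data.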

Member Function Documentation

◆ ctx()

constexpr std::shared_ptr<Context> const& rapidsmpf::streaming::ShufflerAsync::ctx ( ) const
inline constexpr

Gets the streaming context associated with this shuffler.

Returns
A reference to the shared context object.

Definition at line 85 of file shuffler.hpp.

◆ extract_any_async()

coro::task<std::optional<ExtractResult> > rapidsmpf::streaming::ShufflerAsync::extract_any_async ( )

Asynchronously extracts data for any ready partition.

This coroutine suspends until at least one partition is ready for extraction, then extracts and returns the data for one such partition. If no partition becomes ready and the shuffle is finished, it returns std::nullopt.

Returns
ExtractResult containing the partition ID and data chunks, or std::nullopt if all partitions have been extracted.
Warning
Be careful when mixing extract_async and extract_any_async. A partition intended for extract_async may already have been consumed by extract_any_async, in which case extract_async will later return std::nullopt.
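The following toy model illustrates the warning about mixing the two extraction calls. Both paths draw from one shared pool of finished partitions, so a partition taken by the any-partition path is gone when the by-ID path later asks for it. `FinishedPool`, `extract`, and `extract_any` are hypothetical names; picking the lowest-numbered partition in `extract_any` is a simplification, since the real coroutine returns whichever partition is ready.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Assumed stand-ins for shuffler::PartID and PackedData.
using PartID = std::uint32_t;
using PackedData = int;
using ExtractResult = std::pair<PartID, std::vector<PackedData>>;

// Toy model (not the real ShufflerAsync): two extraction paths share one pool of
// finished partitions, and whichever call reaches a partition first takes it.
class FinishedPool {
  public:
    void add(PartID pid, std::vector<PackedData> chunks) {
        pool_[pid] = std::move(chunks);
    }

    // Analogue of extract_async(pid): nullopt if pid was already taken.
    std::optional<std::vector<PackedData>> extract(PartID pid) {
        auto it = pool_.find(pid);
        if (it == pool_.end()) {
            return std::nullopt;
        }
        auto chunks = std::move(it->second);
        pool_.erase(it);
        return chunks;
    }

    // Analogue of extract_any_async(): takes the lowest-numbered finished partition.
    std::optional<ExtractResult> extract_any() {
        if (pool_.empty()) {
            return std::nullopt;
        }
        auto node = pool_.extract(pool_.begin());  // C++17 node handle
        return ExtractResult{node.key(), std::move(node.mapped())};
    }

  private:
    std::map<PartID, std::vector<PackedData>> pool_;
};
```

In the test below, extract_any() consumes partition 0, so a later extract(0) returns std::nullopt even though the caller may have intended that partition for itself.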

◆ extract_async()

coro::task<std::optional<std::vector<PackedData> > > rapidsmpf::streaming::ShufflerAsync::extract_async ( shuffler::PartID  pid)

Asynchronously extracts all data for a specific partition.

This coroutine suspends until the specified partition is ready for extraction (i.e., insert_finished has been called for this partition and all data has been shuffled).

Warning
Be careful when mixing extract_async and extract_any_async. A partition intended for extract_async may already have been consumed by extract_any_async, in which case this function returns std::nullopt.
Parameters
pid The partition ID to extract data for.
Returns
  • std::nullopt if the partition ID is not ready or has already been extracted.
  • Otherwise, a vector of PackedData chunks belonging to the partition.
Exceptions
std::out_of_range If the partition ID isn't owned by this rank; see partition_owner().
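The sketch below models the three documented outcomes of extract_async on a single rank: an exception for a partition the rank does not own, std::nullopt for an owned partition that is unavailable (for example, already extracted), and the chunks otherwise. `LocalPartitions`, `finish`, and `extract` are hypothetical names, `int` stands in for PackedData, and the model returns immediately instead of suspending until the partition is ready.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

using PartID = std::uint32_t;  // assumed stand-in for shuffler::PartID
using PackedData = int;        // placeholder for the real PackedData type

// Toy model of the documented extract_async contract (not the real class).
class LocalPartitions {
  public:
    explicit LocalPartitions(std::vector<PartID> const& owned)
        : owned_(owned.begin(), owned.end()) {}

    // Marks an owned partition as finished and stores its chunks.
    void finish(PartID pid, std::vector<PackedData> chunks) {
        ready_[pid] = std::move(chunks);
    }

    std::optional<std::vector<PackedData>> extract(PartID pid) {
        if (owned_.count(pid) == 0) {
            // Mirrors the documented std::out_of_range for non-local partitions.
            throw std::out_of_range("partition " + std::to_string(pid) + " is not owned by this rank");
        }
        auto it = ready_.find(pid);
        if (it == ready_.end()) {
            return std::nullopt;  // Not available, e.g. already extracted.
        }
        auto chunks = std::move(it->second);
        ready_.erase(it);  // Each partition can be extracted only once.
        return chunks;
    }

  private:
    std::unordered_set<PartID> owned_;
    std::unordered_map<PartID, std::vector<PackedData>> ready_;
};
```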

◆ insert()

void rapidsmpf::streaming::ShufflerAsync::insert ( std::unordered_map< shuffler::PartID, PackedData > &&  chunks)

Insert a batch of packed (serialized) chunks into the shuffle.

Parameters
chunks A map of partition IDs and their packed chunks.
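insert() takes one packed chunk per partition ID and receives the map by rvalue reference, so callers hand over ownership with std::move. The sketch below shows that shape with stand-ins: PackedData is replaced by a plain vector of keys, `partition_keys` is a hypothetical partitioning step using `key % total_num_partitions`, and `ToySink` only mimics the rvalue-reference signature of insert(); none of these are rapidsmpf APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

using PartID = std::uint32_t;                   // assumed stand-in for shuffler::PartID
using PackedData = std::vector<std::uint64_t>;  // placeholder for real serialized data

// Hypothetical partitioning step: groups keys into one chunk per partition
// using `key % total_num_partitions`, producing the map shape insert() expects.
std::unordered_map<PartID, PackedData> partition_keys(
    std::vector<std::uint64_t> const& keys, PartID total_num_partitions) {
    std::unordered_map<PartID, PackedData> chunks;
    for (auto key : keys) {
        chunks[static_cast<PartID>(key % total_num_partitions)].push_back(key);
    }
    return chunks;
}

// Toy receiver with insert()'s by-rvalue signature: it takes ownership of the chunks.
struct ToySink {
    std::unordered_map<PartID, std::vector<PackedData>> received;

    void insert(std::unordered_map<PartID, PackedData>&& chunks) {
        for (auto& [pid, chunk] : chunks) {
            received[pid].push_back(std::move(chunk));
        }
        chunks.clear();  // The caller's map is consumed.
    }
};
```

A caller would typically build the map for one batch of input, then call `sink.insert(std::move(chunks));` and not reuse `chunks` afterwards.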

◆ insert_finished()

Node rapidsmpf::streaming::ShufflerAsync::insert_finished ( std::vector< shuffler::PartID > &&  pids)

Insert a finish mark for a list of partitions.

Parameters
pids The list of partition IDs to mark as finished.
Note
This function itself is not a coroutine. Instead, it returns a coroutine that must be awaited to ensure the shuffler has fully completed its asynchronous operations. Awaiting this coroutine guarantees that all notifications and background tasks in the underlying shuffler have finished before destruction. The coroutine does not need to be awaited before extraction begins, but it must eventually be awaited before the shuffle object is destroyed. Any pending extractions will wake up and either extract remaining partitions or return empty results if none remain.
Returns
A coroutine that, when awaited, indicates the shuffle has completed.

◆ local_partitions()

std::span<shuffler::PartID const> rapidsmpf::streaming::ShufflerAsync::local_partitions ( ) const

Returns the local partition IDs owned by the current node.

The returned IDs are those that partition_owner() assigns to this rank among the total_num_partitions() partitions.

Returns
A span of the partition IDs owned by the current node.
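Conceptually, the local partitions are the IDs in [0, total_num_partitions) that the owner function maps to the calling rank. The following sketch computes that set under stated assumptions: `Rank` is assumed to be a 32-bit signed integer, the owner is modeled as a `std::function<Rank(PartID)>` rather than the library's PartitionOwner type, and `compute_local_partitions` is a hypothetical helper, not part of rapidsmpf.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

using PartID = std::uint32_t;  // assumed to match shuffler::PartID
using Rank = std::int32_t;     // assumed rank type

// Hypothetical owner signature for this sketch: maps a partition ID to a rank.
using Owner = std::function<Rank(PartID)>;

// Collects, in ascending order, the partition IDs that `owner` assigns to `self`.
std::vector<PartID> compute_local_partitions(PartID total_num_partitions, Rank self, Owner const& owner) {
    std::vector<PartID> local;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(pid) == self) {
            local.push_back(pid);
        }
    }
    return local;
}
```

For example, with 7 partitions, 3 ranks, and a round-robin owner (`pid % 3`), rank 1 owns partitions 1 and 4, and the local sets of all ranks together cover every partition exactly once.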

◆ partition_owner()

constexpr shuffler::Shuffler::PartitionOwner const& rapidsmpf::streaming::ShufflerAsync::partition_owner ( ) const
inline constexpr

Gets the partition owner function used by this shuffler.

Returns
A const reference to the function that maps partition IDs to owning ranks.

Definition at line 104 of file shuffler.hpp.

◆ total_num_partitions()

constexpr shuffler::PartID rapidsmpf::streaming::ShufflerAsync::total_num_partitions ( ) const
inline constexpr

Gets the total number of partitions for this shuffle operation.

Returns
The total number of partitions that data will be shuffled into.

Definition at line 94 of file shuffler.hpp.

