rapidsmpf::streaming::ShufflerAsync Class Reference

An asynchronous shuffler that wraps the synchronous shuffler with a coroutine interface.

#include <shuffler.hpp>

Public Member Functions

 ShufflerAsync (std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, OpID op_id, shuffler::PartID total_num_partitions, shuffler::Shuffler::PartitionOwner partition_owner=shuffler::Shuffler::round_robin)
 Constructs a new ShufflerAsync instance.

 ShufflerAsync (ShufflerAsync const &)=delete

ShufflerAsync & operator= (ShufflerAsync const &)=delete

constexpr std::shared_ptr< Context > const & ctx () const
 Gets the streaming context associated with this shuffler.

std::shared_ptr< Communicator > const & comm () const noexcept
 Gets the communicator associated with this shuffler.

constexpr shuffler::PartID total_num_partitions () const
 Gets the total number of partitions for this shuffle operation.

constexpr shuffler::Shuffler::PartitionOwner const & partition_owner () const
 Gets the partition owner function used by this shuffler.

std::span< shuffler::PartID const > local_partitions () const
 Returns the local partition IDs owned by the current node.

void insert (std::unordered_map< shuffler::PartID, PackedData > &&chunks)
 Insert a set of packed (serialized) chunks into the shuffle.

Actor insert_finished ()
 Signal that no more data will be inserted into the shuffle.

std::vector< PackedData > extract (shuffler::PartID pid)
 Extract all chunks belonging to the specified partition.
 

Detailed Description

An asynchronous shuffler that wraps the synchronous shuffler with a coroutine interface.

ShufflerAsync provides an asynchronous interface to the shuffler, allowing data to be inserted and then extracted after the shuffle completes. All local partitions complete simultaneously, so extraction is non-blocking after awaiting insert_finished().

Warning
The coroutine returned by insert_finished() must be awaited before the object is destroyed; otherwise the shuffle will terminate during destruction and/or deadlocks will occur.

Example usage:

auto shuffle = ShufflerAsync(...);
while (...) {
    shuffle.insert(...);
}
co_await shuffle.insert_finished();
for (auto pid : shuffle.local_partitions()) {
    auto chunks = shuffle.extract(pid);
    // process chunks...
}

Definition at line 43 of file shuffler.hpp.

Constructor & Destructor Documentation

◆ ShufflerAsync()

rapidsmpf::streaming::ShufflerAsync::ShufflerAsync ( std::shared_ptr< Context >  ctx,
std::shared_ptr< Communicator >  comm,
OpID  op_id,
shuffler::PartID  total_num_partitions,
shuffler::Shuffler::PartitionOwner  partition_owner = shuffler::Shuffler::round_robin 
)

Constructs a new ShufflerAsync instance.

Parameters
ctx  The streaming context to use.
comm  Communicator for the collective operation.
op_id  Unique operation ID for this shuffle. Must not be reused until all participants have completed the shuffle operation.
total_num_partitions  Total number of partitions to shuffle data into.
partition_owner  Function that maps a partition ID to its owning rank/node. Defaults to round-robin distribution.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.
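
The partition_owner parameter can be pictured with a small stand-alone sketch. It assumes that round-robin ownership means partition pid belongs to rank pid % nranks; the PartID and Rank aliases and both function names are hypothetical stand-ins, not the library's own declarations, and the real PartitionOwner signature may differ.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for shuffler::PartID and the rank type.
using PartID = std::uint32_t;
using Rank = std::int32_t;

// Assumed round-robin rule: partition `pid` is owned by rank `pid % nranks`.
constexpr Rank round_robin_owner(PartID pid, Rank nranks) {
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}

// Owner of every partition in [0, total_num_partitions).
std::vector<Rank> owners(PartID total_num_partitions, Rank nranks) {
    std::vector<Rank> out;
    out.reserve(total_num_partitions);
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        out.push_back(round_robin_owner(pid, nranks));
    }
    return out;
}
```

Under this assumption, consecutive partition IDs land on consecutive ranks, which spreads partitions evenly when total_num_partitions is a multiple of the number of ranks.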

Member Function Documentation

◆ comm()

std::shared_ptr<Communicator> const& rapidsmpf::streaming::ShufflerAsync::comm ( ) const
inline noexcept

Gets the communicator associated with this shuffler.

Returns
Shared pointer to communicator.

Definition at line 89 of file shuffler.hpp.

◆ ctx()

constexpr std::shared_ptr<Context> const& rapidsmpf::streaming::ShufflerAsync::ctx ( ) const
inline constexpr

Gets the streaming context associated with this shuffler.

Returns
A reference to the shared context object.

Definition at line 80 of file shuffler.hpp.

◆ extract()

std::vector<PackedData> rapidsmpf::streaming::ShufflerAsync::extract ( shuffler::PartID  pid)

Extract all chunks belonging to the specified partition.

Parameters
pid  The ID of the partition to extract.
Exceptions
std::logic_error  If the partition has already been extracted or is otherwise not available.
Returns
A vector of PackedData chunks associated with the partition.
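
The extract-once behaviour described above can be sketched with a small stand-in container. This is not the library's implementation: ReadyPartitions, its add() method, and the Chunk alias (used here in place of PackedData) are all hypothetical, chosen only to show chunks being moved out and a repeated extraction raising std::logic_error.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using PartID = std::uint32_t;             // hypothetical stand-in for shuffler::PartID
using Chunk = std::vector<std::uint8_t>;  // hypothetical stand-in for PackedData

class ReadyPartitions {
  public:
    // Record a finished chunk for partition `pid`.
    void add(PartID pid, Chunk chunk) {
        ready_[pid].push_back(std::move(chunk));
    }

    // Move all chunks of `pid` out of the store. A second call for the
    // same `pid`, or a call for an unknown `pid`, throws std::logic_error.
    std::vector<Chunk> extract(PartID pid) {
        auto it = ready_.find(pid);
        if (it == ready_.end()) {
            throw std::logic_error(
                "partition " + std::to_string(pid) + " is not available"
            );
        }
        std::vector<Chunk> chunks = std::move(it->second);
        ready_.erase(it);
        return chunks;
    }

  private:
    std::unordered_map<PartID, std::vector<Chunk>> ready_;
};
```

Erasing the entry on extraction is what makes ownership of the chunks pass to the caller exactly once.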

◆ insert()

void rapidsmpf::streaming::ShufflerAsync::insert ( std::unordered_map< shuffler::PartID, PackedData > &&  chunks)

Insert a set of packed (serialized) chunks into the shuffle.

Note
Concurrent insertion by multiple threads is supported, but the caller must ensure that insert_finished() is called only after all insert() calls have completed.
Parameters
chunks  A map of partition IDs and their packed chunks.
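
As an illustration of the shape of the chunks argument, the sketch below groups integer keys into a map keyed by partition ID. The modulo partitioning rule, the group_by_partition name, and the Chunk alias (a plain vector standing in for PackedData) are assumptions made for this example; how real data is partitioned and packed is outside the scope of this class.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

using PartID = std::uint32_t;              // hypothetical stand-in for shuffler::PartID
using Chunk = std::vector<std::uint64_t>;  // hypothetical stand-in for PackedData

// Group keys by destination partition, using `key % total_num_partitions`
// as an assumed partitioning rule. The result has one entry per
// non-empty partition.
std::unordered_map<PartID, Chunk> group_by_partition(
    std::vector<std::uint64_t> const& keys, PartID total_num_partitions
) {
    std::unordered_map<PartID, Chunk> chunks;
    for (std::uint64_t key : keys) {
        auto pid = static_cast<PartID>(key % total_num_partitions);
        chunks[pid].push_back(key);
    }
    return chunks;
}
```

A map built this way would then be handed over by rvalue, for example with std::move, since insert() takes its argument as an rvalue reference.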

◆ insert_finished()

Actor rapidsmpf::streaming::ShufflerAsync::insert_finished ( )

Signal that no more data will be inserted into the shuffle.

This informs the shuffler that this rank has finished inserting data. Must be called exactly once.

Note
If multiple threads are insert()ing, you must establish a happens-before relationship between the completion of all insert()s and the final call to insert_finished().
This coroutine function must be awaited to ensure the shuffler has fully completed its asynchronous operations.
Returns
A coroutine that inserts the finish marker and suspends until the shuffle has completed. Once complete,
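
One common way to establish the required happens-before relationship is to join every inserting thread before signalling completion. The sketch below uses a hypothetical MockShuffle class with synchronous methods rather than the real coroutine-based interface; it only demonstrates the ordering, not the shuffle itself.

```cpp
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical stand-in for a shuffler: counts inserts and rejects
// inserts that arrive after the finish marker.
class MockShuffle {
  public:
    void insert(int /*chunk*/) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (finished_) throw std::logic_error("insert after insert_finished");
        ++inserted_;
    }

    void insert_finished() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (finished_) throw std::logic_error("insert_finished called twice");
        finished_ = true;
    }

    std::size_t inserted() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return inserted_;
    }

  private:
    mutable std::mutex mutex_;
    std::size_t inserted_ = 0;
    bool finished_ = false;
};

// Run `num_threads` inserters, then signal completion exactly once.
std::size_t run_inserters(MockShuffle& shuffle, int num_threads, int per_thread) {
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; ++t) {
        workers.emplace_back([&shuffle, per_thread] {
            for (int i = 0; i < per_thread; ++i) shuffle.insert(i);
        });
    }
    // join() synchronizes with each thread's completion, so every insert()
    // happens-before the insert_finished() call below.
    for (auto& w : workers) w.join();
    shuffle.insert_finished();
    return shuffle.inserted();
}
```

With the real class, the same ordering applies, and the coroutine returned by insert_finished() additionally has to be awaited.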

◆ local_partitions()

std::span<shuffler::PartID const> rapidsmpf::streaming::ShufflerAsync::local_partitions ( ) const

Returns the local partition IDs owned by the current node.

The IDs are determined by the communicator, the total number of partitions, and the partition owner function supplied at construction.

Returns
A span of partition IDs owned by the current node.
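
Conceptually, the local partitions are the partition IDs whose owner is the current rank. The sketch below filters the full ID range with an owner function; the OwnerFn signature, the type aliases, and local_partition_ids are hypothetical, and the sketch returns a vector by value, whereas the member function documented here returns a std::span.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

using PartID = std::uint32_t;  // hypothetical stand-in for shuffler::PartID
using Rank = std::int32_t;     // hypothetical rank type

// Hypothetical owner signature: (partition ID, number of ranks) -> owning rank.
using OwnerFn = std::function<Rank(PartID, Rank)>;

// Collect, in ascending order, every partition ID owned by `rank`.
std::vector<PartID> local_partition_ids(
    Rank rank, Rank nranks, PartID total_num_partitions, OwnerFn const& owner
) {
    std::vector<PartID> local;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(pid, nranks) == rank) {
            local.push_back(pid);
        }
    }
    return local;
}
```

Because a span is a non-owning view, a caller of the real function should presumably not keep the returned span beyond the lifetime of the shuffler object.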

◆ partition_owner()

constexpr shuffler::Shuffler::PartitionOwner const& rapidsmpf::streaming::ShufflerAsync::partition_owner ( ) const
inline constexpr

Gets the partition owner function used by this shuffler.

Returns
A const reference to the function that maps partition IDs to owning ranks.

Definition at line 108 of file shuffler.hpp.

◆ total_num_partitions()

constexpr shuffler::PartID rapidsmpf::streaming::ShufflerAsync::total_num_partitions ( ) const
inline constexpr

Gets the total number of partitions for this shuffle operation.

Returns
The total number of partitions that data will be shuffled into.

Definition at line 98 of file shuffler.hpp.


The documentation for this class was generated from the following file: