rapidsmpf::shuffler::Shuffler Class Reference

Shuffle service for all-to-all style communication of partitioned data.

#include <shuffler.hpp>

Public Types

using PartitionOwner = std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)>
 Function that, given a Communicator, a PartID, and the total partition count, returns the rapidsmpf::Rank of the owning node.
 
using FinishedCallback = std::function< void()>
 Callback function type called when all partitions are finished and data can be extracted.
 

Public Member Functions

 Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, PartitionOwner partition_owner=round_robin, std::unique_ptr< communicator::MetadataPayloadExchange > mpe=nullptr)
 Construct a new shuffler for a single shuffle.
 
 Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, PartitionOwner partition_owner=round_robin, std::unique_ptr< communicator::MetadataPayloadExchange > mpe=nullptr)
 Construct a new shuffler for a single shuffle.
 
std::shared_ptr< Communicator > const & comm () const noexcept
 Gets the communicator associated with this Shuffler.
 
 Shuffler (Shuffler const &)=delete
 
Shuffler & operator= (Shuffler const &)=delete
 
void shutdown ()
 Shut down the shuffle, blocking until all in-flight communication is done.
 
void insert (std::unordered_map< PartID, PackedData > &&chunks)
 Insert a bunch of packed (serialized) chunks into the shuffle.
 
void insert_finished ()
 Signal that no more data will be inserted into the shuffle.
 
std::vector< PackedData > extract (PartID pid)
 Extract all chunks belonging to the specified partition.
 
bool finished () const
 Check if all partitions are finished.
 
void wait (std::optional< std::chrono::milliseconds > timeout={})
 Wait for all partitions to finish (blocking).
 
std::size_t spill (std::optional< std::size_t > amount=std::nullopt)
 Spills data from device memory if necessary.
 
std::string str () const
 Returns a description of this instance.
 
std::span< PartID const > local_partitions () const
 Returns the local partition IDs owned by the shuffler.
 

Static Public Member Functions

static Rank round_robin (std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions)
 A PartitionOwner that distributes partitions using round robin assignment.
 
static Rank contiguous (std::shared_ptr< Communicator > const &comm, PartID pid, PartID total_num_partitions)
 A PartitionOwner that assigns contiguous partition ID ranges to ranks.
 
static std::vector< PartID > local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner)
 Returns the local partition IDs owned by the current node.
 
static constexpr Rank extract_rank (detail::ChunkID cid)
 Extract the rank from a chunk ID.
 

Public Attributes

PartID const total_num_partitions
 Total number of partitions in the shuffle.
 
PartitionOwner const partition_owner
 Function to determine partition ownership.
 

Static Public Attributes

static constexpr int chunk_id_counter_bits = 38
 The number of bits used to store the counter in a chunk ID.
 

Detailed Description

Shuffle service for all-to-all style communication of partitioned data.

The Shuffler class provides an interface for performing a shuffle operation on distributed data, using a partitioning scheme to distribute and collect data chunks across different ranks.

Definition at line 46 of file shuffler.hpp.

Member Typedef Documentation

◆ FinishedCallback

using rapidsmpf::shuffler::Shuffler::FinishedCallback = std::function<void()>

Callback function type called when all partitions are finished and data can be extracted.

Warning
A callback must be fast and non-blocking. Ideally it should be used to signal a separate thread to do the actual processing.
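One way to follow this advice is to have the callback do nothing except flip a flag and notify a condition variable, leaving the real work to a separate thread. The sketch below is self-contained and does not use rapidsmpf: `FinishedSignal` and `run_with_worker` are hypothetical names, and the callback is invoked by hand where a real shuffler would invoke it.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Stand-in with the same signature as the documented FinishedCallback.
using FinishedCallback = std::function<void()>;

// Hypothetical helper: a one-shot signal shared by the callback and a worker.
struct FinishedSignal {
    std::mutex mutex;
    std::condition_variable cv;
    bool finished = false;

    // Cheap, non-blocking body suitable for use inside a FinishedCallback.
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            finished = true;
        }
        cv.notify_all();
    }

    // Called by the worker thread; blocks until notify() has run.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return finished; });
    }
};

// Simulates a shuffle finishing while a worker thread waits to process it.
// Returns the number of partitions the worker handled.
int run_with_worker(int num_partitions) {
    FinishedSignal signal;
    int processed = 0;

    std::thread worker([&] {
        signal.wait();  // the slow work starts only after the signal
        for (int pid = 0; pid < num_partitions; ++pid) {
            ++processed;  // placeholder for extracting and processing a partition
        }
    });

    FinishedCallback callback = [&signal] { signal.notify(); };
    callback();  // in real use, the shuffler would make this call

    worker.join();  // join makes the worker's writes visible here
    return processed;
}
```

The callback itself only takes a lock briefly and notifies, so it returns quickly regardless of how long the worker's processing takes.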

Definition at line 111 of file shuffler.hpp.

Constructor & Destructor Documentation

◆ Shuffler() [1/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator >  comm,
OpID  op_id,
PartID  total_num_partitions,
BufferResource *  br,
FinishedCallback &&  finished_callback,
PartitionOwner  partition_owner = round_robin,
std::unique_ptr< communicator::MetadataPayloadExchange >  mpe = nullptr 
)

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
op_id: The operation ID of the shuffle.
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
finished_callback: Callback to notify when all partitions are finished.
partition_owner: Function to determine partition ownership.
mpe: Optional custom metadata payload exchange. If not provided, the default tag-based implementation is used.
Note
It is safe to reuse the op_id as soon as wait() has completed locally.
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

◆ Shuffler() [2/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator >  comm,
OpID  op_id,
PartID  total_num_partitions,
BufferResource *  br,
PartitionOwner  partition_owner = round_robin,
std::unique_ptr< communicator::MetadataPayloadExchange >  mpe = nullptr 
)
inline

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
op_id: The operation ID of the shuffle. This ID is unique for this operation and should not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
partition_owner: Function to determine partition ownership.
mpe: Optional custom metadata payload exchange. If not provided, the default tag-based implementation is used.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

Definition at line 158 of file shuffler.hpp.

Member Function Documentation

◆ comm()

std::shared_ptr<Communicator> const& rapidsmpf::shuffler::Shuffler::comm ( ) const
inline noexcept

Gets the communicator associated with this Shuffler.

Returns
Shared pointer to communicator.

Definition at line 183 of file shuffler.hpp.

◆ contiguous()

static Rank rapidsmpf::shuffler::Shuffler::contiguous ( std::shared_ptr< Communicator > const &  comm,
PartID  pid,
PartID  total_num_partitions 
)
inline static

A PartitionOwner that assigns contiguous partition ID ranges to ranks.

Rank 0 gets [0, k), rank 1 gets [k, 2k), etc. This is useful for sorting, because each rank's local_partitions() are then adjacent and in order.

Parameters
comm: The communicator to use.
pid: The partition ID to query.
total_num_partitions: Total number of partitions (must match the shuffle).
Returns
The rank owning the partition.

Definition at line 82 of file shuffler.hpp.
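To make the range arithmetic concrete, here is a minimal stand-alone sketch of a contiguous mapping. It is not the library's implementation: the function takes the number of ranks directly instead of a Communicator, the integer types behind `Rank` and `PartID` are assumed, and the choice k = ceil(total_num_partitions / nranks) is an assumption about how the range width is picked.

```cpp
#include <cassert>
#include <cstdint>

// Assumed stand-ins for rapidsmpf::Rank and rapidsmpf::shuffler::PartID.
using Rank = std::int32_t;
using PartID = std::uint32_t;

// Hypothetical contiguous owner: rank r owns partition IDs [r*k, (r+1)*k).
// Assumption: k is the ceiling of total_num_partitions / nranks.
Rank contiguous_owner(Rank nranks, PartID pid, PartID total_num_partitions) {
    assert(nranks > 0 && pid < total_num_partitions);
    PartID const n = static_cast<PartID>(nranks);
    PartID const k = (total_num_partitions + n - 1) / n;  // range width per rank
    return static_cast<Rank>(pid / k);
}
```

With 3 ranks and 9 partitions, k is 3, so partitions 0 to 2 map to rank 0, 3 to 5 to rank 1, and 6 to 8 to rank 2. When the partition count does not divide evenly, the last ranks receive fewer partitions under this choice of k.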

◆ extract()

std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract ( PartID  pid)

Extract all chunks belonging to the specified partition.

It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.

To ensure the partition is complete, use wait() or another appropriate synchronization mechanism beforehand.

Parameters
pid: The ID of the partition to extract.
Returns
A vector of PackedData chunks associated with the partition.

◆ extract_rank()

static constexpr Rank rapidsmpf::shuffler::Shuffler::extract_rank ( detail::ChunkID  cid)
inline static constexpr

Extract the rank from a chunk ID.

Parameters
cid: The chunk ID.
Returns
The rank.

Definition at line 287 of file shuffler.hpp.
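The page documents chunk_id_counter_bits = 38 but not the full chunk ID layout. The following sketch shows one plausible layout, assuming a 64-bit chunk ID whose low 38 bits hold a counter and whose remaining high bits hold the rank. `ChunkID` as `std::uint64_t`, `make_chunk_id`, and `extract_rank_sketch` are illustrative assumptions, not the library's definitions.

```cpp
#include <cassert>
#include <cstdint>

// Assumed stand-ins; the real types live in rapidsmpf.
using Rank = std::int32_t;
using ChunkID = std::uint64_t;

// Value taken from the documented static attribute.
constexpr int chunk_id_counter_bits = 38;

// Hypothetical packing: rank in the high bits, counter in the low 38 bits.
constexpr ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    constexpr ChunkID counter_mask = (ChunkID{1} << chunk_id_counter_bits) - 1;
    return (static_cast<ChunkID>(rank) << chunk_id_counter_bits)
           | (counter & counter_mask);
}

// Under the assumed layout, the rank is recovered by shifting the counter away.
constexpr Rank extract_rank_sketch(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}

// Both functions are constexpr, so the round trip can be checked at compile time.
static_assert(extract_rank_sketch(make_chunk_id(5, 123)) == 5);
```

Under this assumed layout, 38 counter bits leave 26 bits of a 64-bit ID for the rank.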

◆ finished()

bool rapidsmpf::shuffler::Shuffler::finished ( ) const

Check if all partitions are finished.

Returns
true if all partitions are finished, otherwise false.

◆ insert()

void rapidsmpf::shuffler::Shuffler::insert ( std::unordered_map< PartID, PackedData > &&  chunks)

Insert a bunch of packed (serialized) chunks into the shuffle.

Note
Concurrent insertion by multiple threads is supported, but the caller must ensure that insert_finished() is called after all insert() calls have completed.
Parameters
chunks: A map of partition IDs and their packed chunks.

◆ insert_finished()

void rapidsmpf::shuffler::Shuffler::insert_finished ( )

Signal that no more data will be inserted into the shuffle.

This informs the shuffler that this rank has finished inserting data. Must be called exactly once.

Note
If multiple threads call insert(), you must establish a happens-before relationship between the completion of all insert() calls and the call to insert_finished().
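Joining the producer threads before signalling completion is one simple way to get the required happens-before ordering, since `std::thread::join` synchronizes with the completion of the joined thread. The sketch below uses a hypothetical `MockShuffler` that models only the insert protocol; it is not the rapidsmpf class and holds plain integers instead of PackedData.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical stand-in that models only the insert/insert_finished protocol.
class MockShuffler {
  public:
    void insert(int chunk) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (insert_finished_) {
            throw std::logic_error("insert() after insert_finished()");
        }
        chunks_.push_back(chunk);
    }

    void insert_finished() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (insert_finished_) {
            throw std::logic_error("insert_finished() called more than once");
        }
        insert_finished_ = true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return chunks_.size();
    }

  private:
    mutable std::mutex mutex_;
    std::vector<int> chunks_;
    bool insert_finished_ = false;
};

// Inserts from several threads, then signals completion exactly once.
// Returns the number of chunks the mock received.
std::size_t insert_from_threads(int num_threads, int chunks_per_thread) {
    MockShuffler shuffler;
    std::vector<std::thread> producers;
    for (int t = 0; t < num_threads; ++t) {
        producers.emplace_back([&shuffler, t, chunks_per_thread] {
            for (int i = 0; i < chunks_per_thread; ++i) {
                shuffler.insert(t * chunks_per_thread + i);
            }
        });
    }
    // join() orders every insert() before the code that follows it.
    for (auto& producer : producers) {
        producer.join();
    }
    shuffler.insert_finished();
    return shuffler.size();
}
```

Other synchronization mechanisms (a latch, a barrier, or a counter guarded by a mutex) would serve equally well, as long as the single insert_finished() call is ordered after the last insert().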

◆ local_partitions() [1/2]

std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions ( ) const

Returns the local partition IDs owned by the shuffler.

Returns
A span of partition IDs owned by the shuffler.

◆ local_partitions() [2/2]

static std::vector<PartID> rapidsmpf::shuffler::Shuffler::local_partitions ( std::shared_ptr< Communicator > const &  comm,
PartID  total_num_partitions,
PartitionOwner  partition_owner 
)
static

Returns the local partition IDs owned by the current node.

Parameters
comm: The communicator to use.
total_num_partitions: Total number of partitions in the shuffle.
partition_owner: Function that determines partition ownership.
Returns
A vector of partition IDs owned by the current node.
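Conceptually, the local partitions are the partition IDs for which the owner function returns the calling rank. The sketch below illustrates that idea without rapidsmpf: `SimpleOwner` is a simplified, hypothetical owner signature that omits the Communicator, the caller's rank is passed in explicitly, and the integer types are assumed.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Assumed stand-ins for rapidsmpf::Rank and rapidsmpf::shuffler::PartID.
using Rank = std::int32_t;
using PartID = std::uint32_t;

// Simplified owner signature; the documented PartitionOwner also takes a Communicator.
using SimpleOwner = std::function<Rank(PartID pid, PartID total_num_partitions)>;

// Hypothetical helper: collect every partition ID that `owner` assigns to `my_rank`.
std::vector<PartID> local_partitions_sketch(
    Rank my_rank, PartID total_num_partitions, SimpleOwner const& owner
) {
    std::vector<PartID> ret;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(pid, total_num_partitions) == my_rank) {
            ret.push_back(pid);
        }
    }
    return ret;
}
```

For example, with a modulo-3 owner and 8 partitions, rank 1 would receive partitions 1, 4, and 7. The result is in ascending partition ID order because the loop visits IDs in order.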

◆ round_robin()

static Rank rapidsmpf::shuffler::Shuffler::round_robin ( std::shared_ptr< Communicator > const &  comm,
PartID  pid,
[[maybe_unused]] PartID  total_num_partitions 
)
inline static

A PartitionOwner that distributes partitions using round robin assignment.

Parameters
comm: The communicator to use.
pid: The partition ID to query.
total_num_partitions: Total number of partitions (unused).
Returns
The rank owning the partition.

Definition at line 63 of file shuffler.hpp.
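Round-robin assignment is commonly written as the partition ID modulo the number of ranks. The sketch below assumes that form and takes the rank count directly instead of a Communicator; `round_robin_owner` and the integer types are illustrative stand-ins.

```cpp
#include <cassert>
#include <cstdint>

// Assumed stand-ins for rapidsmpf::Rank and rapidsmpf::shuffler::PartID.
using Rank = std::int32_t;
using PartID = std::uint32_t;

// Hypothetical round-robin owner: partitions cycle through ranks 0..nranks-1.
// The total partition count plays no role, which matches the "(unused)" note above.
Rank round_robin_owner(Rank nranks, PartID pid) {
    assert(nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}
```

With 3 ranks, partitions 0, 3, 6 land on rank 0, partitions 1, 4, 7 on rank 1, and so on, so each rank's partitions are interleaved.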

◆ shutdown()

void rapidsmpf::shuffler::Shuffler::shutdown ( )

Shut down the shuffle, blocking until all in-flight communication is done.

Exceptions
std::logic_error: If the shuffler is already inactive.

◆ spill()

std::size_t rapidsmpf::shuffler::Shuffler::spill ( std::optional< std::size_t >  amount = std::nullopt)

Spills data from device memory if necessary.

This function has two modes:

  • If amount is specified, it tries to spill at least amount bytes of device memory.
  • If amount is not specified (the default case), it spills based on the current available device memory returned by the buffer resource.
Parameters
amount: An optional amount of memory to spill, in bytes. If not provided, the function checks the currently available device memory.
Returns
The amount of memory actually spilled.
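The two modes can be pictured as a small decision on how many bytes to try to free. The sketch below is an illustration only: `spill_target` is a hypothetical helper, and it assumes the buffer resource reports available device memory as a signed value where a negative number means the device is over budget by that many bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical helper (not part of the documented API): decides how many
// bytes a spill() call should try to free.
// Assumption: `available_device_memory` is signed headroom, negative when
// the device is over its budget.
std::size_t spill_target(
    std::optional<std::size_t> amount, std::int64_t available_device_memory
) {
    if (amount.has_value()) {
        return *amount;  // explicit mode: the caller named a byte count
    }
    // Default mode: spill only the shortfall, if there is one.
    assert(available_device_memory > INT64_MIN);  // negation below must not overflow
    if (available_device_memory < 0) {
        return static_cast<std::size_t>(-available_device_memory);
    }
    return 0;
}
```

The documented return value is the amount actually spilled, which may differ from the target computed here, for instance when less spillable data exists than was requested.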

◆ str()

std::string rapidsmpf::shuffler::Shuffler::str ( ) const

Returns a description of this instance.

Returns
The description.

◆ wait()

void rapidsmpf::shuffler::Shuffler::wait ( std::optional< std::chrono::milliseconds >  timeout = {})

Wait for all partitions to finish (blocking).

Parameters
timeout: Optional timeout, in milliseconds, to wait.
Exceptions
std::runtime_error: If the timeout is reached.
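The documented behaviour (block until finished, or throw std::runtime_error once an optional timeout expires) can be modelled with a condition variable. The sketch below is a stand-alone model, not the library's code: `FinishLatch` and `wait_times_out` are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <stdexcept>

// Hypothetical model of "wait until finished, with an optional timeout".
class FinishLatch {
  public:
    void mark_finished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until mark_finished() has been called. With a timeout, throws
    // std::runtime_error if the deadline passes first.
    void wait(std::optional<std::chrono::milliseconds> timeout = {}) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto const done = [this] { return finished_; };
        if (timeout.has_value()) {
            if (!cv_.wait_for(lock, *timeout, done)) {
                throw std::runtime_error("wait() timed out");
            }
        } else {
            cv_.wait(lock, done);
        }
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool finished_ = false;
};

// Returns true if a 10 ms wait on the latch throws because of the timeout.
bool wait_times_out(bool finished_first) {
    FinishLatch latch;
    if (finished_first) {
        latch.mark_finished();
    }
    try {
        latch.wait(std::chrono::milliseconds(10));
        return false;
    } catch (std::runtime_error const&) {
        return true;
    }
}
```

A caller that passes a timeout should be prepared to catch the exception and decide whether to retry, keep waiting, or abandon the operation.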

The documentation for this class was generated from the following file: