rapidsmpf::shuffler::Shuffler Class Reference

Shuffle service for cuDF tables.

#include <shuffler.hpp>

Public Types

using PartitionOwner = std::function< Rank(std::shared_ptr< Communicator >, PartID)>
 Function that given a Communicator and a PartID, returns the rapidsmpf::Rank of the owning node.
 
using FinishedCallback = detail::FinishCounter::FinishedCallback
 Callback function type called when a partition is finished.
 

Public Member Functions

 Shuffler (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, std::shared_ptr< Statistics > statistics=Statistics::disabled(), PartitionOwner partition_owner=round_robin)
 Construct a new shuffler for a single shuffle.
 
 Shuffler (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, PartID total_num_partitions, BufferResource *br, std::shared_ptr< Statistics > statistics=Statistics::disabled(), PartitionOwner partition_owner=round_robin)
 Construct a new shuffler for a single shuffle.
 
 Shuffler (Shuffler const &)=delete
 
Shuffler & operator= (Shuffler const &)=delete
 
void shutdown ()
 Shutdown the shuffle, blocking until all inflight communication is done.
 
void concat_insert (std::unordered_map< PartID, PackedData > &&chunks)
 Insert a map of packed data, grouping them by destination rank, and concatenating into a single chunk per rank.
 
void insert (std::unordered_map< PartID, PackedData > &&chunks)
 Insert a bunch of packed (serialized) chunks into the shuffle.
 
void insert_finished (PartID pid)
 Insert a finish mark for a partition.
 
void insert_finished (std::vector< PartID > &&pids)
 Insert a finish mark for a list of partitions.
 
std::vector< PackedData > extract (PartID pid)
 Extract all chunks belonging to the specified partition.
 
bool finished () const
 Check if all partitions are finished.
 
bool is_finished (PartID pid) const
 Check if a partition is finished.
 
PartID wait_any (std::optional< std::chrono::milliseconds > timeout={})
 Wait for any partition to finish.
 
void wait_on (PartID pid, std::optional< std::chrono::milliseconds > timeout={})
 Wait for a specific partition to finish (blocking).
 
std::size_t spill (std::optional< std::size_t > amount=std::nullopt)
 Spills data from device memory if necessary.
 
std::string str () const
 Returns a description of this instance.
 
std::span< PartID const > local_partitions () const
 Returns the local partition IDs owned by the shuffler.
 

Static Public Member Functions

static Rank round_robin (std::shared_ptr< Communicator > const &comm, PartID pid)
 A PartitionOwner that distributes the partitions using round robin.
 
static std::vector< PartID > local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner)
 Returns the local partition IDs owned by the current node.
 
static constexpr uint64_t extract_counter (detail::ChunkID cid)
 Extract the counter from a chunk ID.
 
static constexpr Rank extract_rank (detail::ChunkID cid)
 Extract the rank from a chunk ID.
 
static constexpr std::pair< Rank, uint64_t > extract_info (detail::ChunkID cid)
 Extract the rank and counter from a chunk ID.
 

Public Attributes

PartID const total_num_partitions
 Total number of partitions in the shuffle.
 
PartitionOwner const partition_owner
 Function to determine partition ownership.
 

Static Public Attributes

static constexpr int chunk_id_counter_bits = 38
 The number of bits used to store the counter in a chunk ID.
 
static constexpr uint64_t counter_mask = (uint64_t(1) << chunk_id_counter_bits) - 1
 The mask for the counter in a chunk ID.
 

Friends

class ::ShuffleInsertGroupedTest
 

Detailed Description

Shuffle service for cuDF tables.

The Shuffler class provides an interface for performing a shuffle operation on cuDF tables, using a partitioning scheme to distribute and collect data chunks across different ranks.

Definition at line 47 of file shuffler.hpp.

Member Typedef Documentation

◆ FinishedCallback

Callback function type called when a partition is finished.

The callback receives the partition ID of the finished partition.

Warning
A callback must be fast and non-blocking, and should not call any of the wait* methods. Be very careful when acquiring locks inside it. Ideally, it should only signal a separate thread to do the actual processing.

Definition at line 83 of file shuffler.hpp.
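
The warning above suggests handing work to a separate thread. A minimal sketch of that hand-off follows, assuming PartID is an unsigned integer type. FinishedQueue is a hypothetical helper, not part of rapidsmpf: the callback would only push the partition ID, while a worker thread blocks in pop() and does the actual processing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>

// Assumption: PartID is an unsigned integer; the real alias lives in rapidsmpf.
using PartID = std::uint32_t;

// Hypothetical helper (not part of rapidsmpf): a small thread-safe queue of
// finished partition IDs.
class FinishedQueue {
  public:
    // Cheap enough for a finished callback: lock briefly, push, notify.
    void push(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(pid);
        }
        cv_.notify_one();
    }

    // Blocks until a partition ID is available. Intended for a worker
    // thread, never for the callback itself.
    PartID pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        assert(!queue_.empty());
        PartID pid = queue_.front();
        queue_.pop_front();
        return pid;
    }

    // Non-blocking variant: returns std::nullopt if nothing is queued.
    std::optional<PartID> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        PartID pid = queue_.front();
        queue_.pop_front();
        return pid;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<PartID> queue_;
};
```

Under these assumptions, a lambda such as `[&queue](PartID pid) { queue.push(pid); }` is the kind of callable one might pass as finished_callback. The only lock it takes is held for a single push.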

Constructor & Destructor Documentation

◆ Shuffler() [1/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator >  comm,
std::shared_ptr< ProgressThread >  progress_thread,
OpID  op_id,
PartID  total_num_partitions,
BufferResource *  br,
FinishedCallback &&  finished_callback,
std::shared_ptr< Statistics >  statistics = Statistics::disabled(),
PartitionOwner  partition_owner = round_robin 
)

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
progress_thread: The progress thread to use.
op_id: The operation ID of the shuffle. This ID is unique for this operation, and should not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
finished_callback: Callback to notify when a partition is finished.
statistics: The statistics instance to use (disabled by default).
partition_owner: Function to determine partition ownership.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

◆ Shuffler() [2/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator >  comm,
std::shared_ptr< ProgressThread >  progress_thread,
OpID  op_id,
PartID  total_num_partitions,
BufferResource *  br,
std::shared_ptr< Statistics >  statistics = Statistics::disabled(),
PartitionOwner  partition_owner = round_robin 
)
inline

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
progress_thread: The progress thread to use.
op_id: The operation ID of the shuffle. This ID is unique for this operation, and should not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
statistics: The statistics instance to use (disabled by default).
partition_owner: Function to determine partition ownership.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

Definition at line 129 of file shuffler.hpp.

Member Function Documentation

◆ concat_insert()

void rapidsmpf::shuffler::Shuffler::concat_insert ( std::unordered_map< PartID, PackedData > &&  chunks)

Insert a map of packed data, grouping them by destination rank, and concatenating into a single chunk per rank.

Parameters
chunks: A map of partition IDs and their packed chunks.
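
The grouping step described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the library's implementation: PartID and Rank are assumed to be integer types, Bytes is a hypothetical stand-in for PackedData, ownership is assumed to be round robin, and group_and_concat is an invented helper. The toy also drops the per-partition boundaries that a real receiver would need to split the chunk again.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_map>
#include <utility>
#include <vector>

using PartID = std::uint32_t;             // assumption
using Rank = std::int32_t;                // assumption
using Bytes = std::vector<std::uint8_t>;  // toy stand-in for PackedData

// Hypothetical helper: group payloads by owning rank (round robin here) and
// concatenate each group into one buffer per destination rank.
std::map<Rank, Bytes> group_and_concat(
    std::unordered_map<PartID, Bytes>&& chunks, Rank nranks
) {
    assert(nranks > 0);
    // Visit partitions in ascending ID order so the result is deterministic.
    std::map<PartID, Bytes> ordered;
    for (auto& [pid, data] : chunks) {
        ordered.emplace(pid, std::move(data));
    }
    std::map<Rank, Bytes> per_rank;
    for (auto& [pid, data] : ordered) {
        Rank dest = static_cast<Rank>(pid % static_cast<PartID>(nranks));
        Bytes& out = per_rank[dest];
        out.insert(out.end(), data.begin(), data.end());
    }
    return per_rank;
}
```

With four partitions and two ranks, partitions 0 and 2 end up in the buffer for rank 0, and partitions 1 and 3 in the buffer for rank 1.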

◆ extract()

std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract ( PartID  pid)

Extract all chunks belonging to the specified partition.

It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.

To ensure the partition is complete, use wait_any(), wait_on(), or another appropriate synchronization mechanism beforehand.

Parameters
pid: The ID of the partition to extract.
Returns
A vector of PackedData chunks associated with the partition.
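
The wait-then-extract pattern described above might look like the following sketch. The drain() template uses only member names from this reference (finished(), wait_any(), extract()). ToyShuffler is a hypothetical single-process stand-in that exists only to exercise the loop: it performs no communication, and its finished() semantics are an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

using PartID = std::uint32_t;  // assumption

// Drain a shuffler-like object: wait for a partition to finish, then extract
// it, so each extracted partition is complete.
template <typename ShufflerLike>
std::map<PartID, std::size_t> drain(ShufflerLike& shuffler) {
    std::map<PartID, std::size_t> chunks_per_partition;
    while (!shuffler.finished()) {
        PartID pid = shuffler.wait_any();
        auto chunks = shuffler.extract(pid);
        chunks_per_partition[pid] = chunks.size();
    }
    return chunks_per_partition;
}

// Hypothetical stand-in (not rapidsmpf): partitions are "finished" in the
// order listed in `ready`.
struct ToyShuffler {
    std::map<PartID, std::vector<std::string>> data;  // chunks per partition
    std::deque<PartID> ready;  // partitions not yet reported as finished

    bool finished() const { return ready.empty(); }

    PartID wait_any() {
        assert(!ready.empty());
        PartID pid = ready.front();
        ready.pop_front();
        return pid;
    }

    std::vector<std::string> extract(PartID pid) {
        auto chunks = std::move(data[pid]);
        data.erase(pid);
        return chunks;
    }
};
```

Calling extract() without the preceding wait would still be valid according to the text above, but could return only part of the partition.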

◆ extract_counter()

static constexpr uint64_t rapidsmpf::shuffler::Shuffler::extract_counter ( detail::ChunkID  cid)
inline static constexpr

Extract the counter from a chunk ID.

Parameters
cid: The chunk ID.
Returns
The counter.

Definition at line 288 of file shuffler.hpp.

◆ extract_info()

static constexpr std::pair<Rank, uint64_t> rapidsmpf::shuffler::Shuffler::extract_info ( detail::ChunkID  cid)
inline static constexpr

Extract the rank and counter from a chunk ID.

Parameters
cid: The chunk ID.
Returns
A pair of the rank and counter.

Definition at line 306 of file shuffler.hpp.

◆ extract_rank()

static constexpr Rank rapidsmpf::shuffler::Shuffler::extract_rank ( detail::ChunkID  cid)
inline static constexpr

Extract the rank from a chunk ID.

Parameters
cid: The chunk ID.
Returns
The rank.

Definition at line 297 of file shuffler.hpp.
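
The three extract_* helpers operate on a packed chunk ID. The sketch below shows one layout consistent with chunk_id_counter_bits = 38 and counter_mask. It rests on assumptions not stated on this page: detail::ChunkID is taken to be a 64-bit unsigned integer, Rank a signed 32-bit integer, and the rank is taken to occupy the bits above the counter. make_chunk_id is a hypothetical helper added only to build test values.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

using Rank = std::int32_t;      // assumption
using ChunkID = std::uint64_t;  // assumption: stand-in for detail::ChunkID

constexpr int chunk_id_counter_bits = 38;
constexpr std::uint64_t counter_mask =
    (std::uint64_t(1) << chunk_id_counter_bits) - 1;

// Hypothetical packer: rank in the upper bits, counter in the lower 38 bits.
inline ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    assert(rank >= 0);
    assert(counter <= counter_mask);
    return (static_cast<std::uint64_t>(rank) << chunk_id_counter_bits)
           | counter;
}

// Keep only the low 38 bits.
constexpr std::uint64_t extract_counter(ChunkID cid) {
    return cid & counter_mask;
}

// Shift the counter bits away; what remains is the rank.
constexpr Rank extract_rank(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}

constexpr std::pair<Rank, std::uint64_t> extract_info(ChunkID cid) {
    return {extract_rank(cid), extract_counter(cid)};
}
```

Under the 64-bit assumption, this layout leaves 26 bits for the rank and allows each rank to number up to 2^38 chunks.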

◆ finished()

bool rapidsmpf::shuffler::Shuffler::finished ( ) const

Check if all partitions are finished.

Returns
true if all partitions are finished, otherwise false.

◆ insert()

void rapidsmpf::shuffler::Shuffler::insert ( std::unordered_map< PartID, PackedData > &&  chunks)

Insert a bunch of packed (serialized) chunks into the shuffle.

Parameters
chunks: A map of partition IDs and their packed chunks.

◆ insert_finished() [1/2]

void rapidsmpf::shuffler::Shuffler::insert_finished ( PartID  pid)

Insert a finish mark for a partition.

This tells the shuffler that no more chunks of the specified partition are coming.

Parameters
pid: The partition ID to mark as finished.

◆ insert_finished() [2/2]

void rapidsmpf::shuffler::Shuffler::insert_finished ( std::vector< PartID > &&  pids)

Insert a finish mark for a list of partitions.

Parameters
pids: The list of partition IDs to mark as finished.

◆ is_finished()

bool rapidsmpf::shuffler::Shuffler::is_finished ( PartID  pid) const

Check if a partition is finished.

Parameters
pid: The partition ID to check.
Returns
true if the partition is finished, otherwise false.

◆ local_partitions() [1/2]

std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions ( ) const

Returns the local partition IDs owned by the shuffler.

Returns
A span of partition IDs owned by the shuffler.

◆ local_partitions() [2/2]

static std::vector<PartID> rapidsmpf::shuffler::Shuffler::local_partitions ( std::shared_ptr< Communicator > const &  comm,
PartID  total_num_partitions,
PartitionOwner  partition_owner 
)
static

Returns the local partition IDs owned by the current node.

Parameters
comm: The communicator to use.
total_num_partitions: Total number of partitions in the shuffle.
partition_owner: Function that determines partition ownership.
Returns
A vector of partition IDs owned by the current node.

◆ round_robin()

static Rank rapidsmpf::shuffler::Shuffler::round_robin ( std::shared_ptr< Communicator > const &  comm,
PartID  pid 
)
inline static

A PartitionOwner that distributes the partitions using round robin.

Parameters
comm: The communicator to use.
pid: The partition ID to query.
Returns
The rank owning the partition.

Definition at line 64 of file shuffler.hpp.
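
As an illustration of round-robin ownership and of how a list of local partitions could be derived from it, here is a minimal sketch. ToyComm and ToyPartitionOwner are hypothetical stand-ins for Communicator and PartitionOwner, and the modulo rule is the usual meaning of round robin, not a statement about the library's exact code.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

using PartID = std::uint32_t;  // assumption
using Rank = std::int32_t;     // assumption

// Hypothetical stand-in for the parts of Communicator this sketch needs.
struct ToyComm {
    Rank rank;    // rank of the calling process
    Rank nranks;  // total number of ranks
};

using ToyPartitionOwner = std::function<Rank(ToyComm const&, PartID)>;

// Round robin: partition p is owned by rank p modulo the number of ranks.
Rank round_robin(ToyComm const& comm, PartID pid) {
    assert(comm.nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(comm.nranks));
}

// Collect every partition ID whose owner is the calling rank.
std::vector<PartID> local_partitions(
    ToyComm const& comm,
    PartID total_num_partitions,
    ToyPartitionOwner const& owner
) {
    std::vector<PartID> ret;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(comm, pid) == comm.rank) {
            ret.push_back(pid);
        }
    }
    return ret;
}
```

With 8 partitions and 3 ranks, rank 1 would own partitions 1, 4, and 7 under this rule. A custom owner function with the same shape could replace round_robin to change the distribution.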

◆ shutdown()

void rapidsmpf::shuffler::Shuffler::shutdown ( )

Shutdown the shuffle, blocking until all inflight communication is done.

Exceptions
std::logic_error: If the shuffler is already inactive.

◆ spill()

std::size_t rapidsmpf::shuffler::Shuffler::spill ( std::optional< std::size_t >  amount = std::nullopt)

Spills data from device memory if necessary.

This function has two modes:

  • If amount is specified, it tries to spill at least amount bytes of device memory.
  • If amount is not specified (the default case), it spills based on the current available device memory returned by the buffer resource.

In both modes, it adds to the "spill-device-limit-breach" statistic if not enough memory could be spilled.

Parameters
amount: An optional amount of memory to spill. If not provided, the function will check the current available device memory.
Returns
The amount of memory actually spilled.
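
The two modes can be sketched as a small decision function. This is a rough model under loud assumptions: it assumes the buffer resource reports available device memory as a signed byte count, where a negative value means the device limit is exceeded by that many bytes. spill_target and is_breach are hypothetical helpers, not library functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper: how many bytes a spill call would aim to free.
// Assumption: `available` is signed, and a negative value means the device
// limit is exceeded by that many bytes.
std::size_t spill_target(
    std::optional<std::size_t> amount, std::int64_t available
) {
    if (amount.has_value()) {
        return *amount;  // explicit mode: the caller chose the amount
    }
    assert(available != std::numeric_limits<std::int64_t>::min());
    // Default mode: spill only if the device is over its limit.
    return available < 0 ? static_cast<std::size_t>(-available) : 0;
}

// Hypothetical helper: true when less was spilled than targeted, which is
// the situation the breach statistic records.
bool is_breach(std::size_t target, std::size_t spilled) {
    return spilled < target;
}
```

In this model, a default call with memory to spare targets zero bytes, so nothing is spilled and no breach is recorded.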

◆ str()

std::string rapidsmpf::shuffler::Shuffler::str ( ) const

Returns a description of this instance.

Returns
The description.

◆ wait_any()

PartID rapidsmpf::shuffler::Shuffler::wait_any ( std::optional< std::chrono::milliseconds >  timeout = {})

Wait for any partition to finish.

Parameters
timeout: Optional timeout (ms) to wait.
Returns
The partition ID of the next finished partition.
Exceptions
std::runtime_error: If the timeout is reached.

◆ wait_on()

void rapidsmpf::shuffler::Shuffler::wait_on ( PartID  pid,
std::optional< std::chrono::milliseconds >  timeout = {} 
)

Wait for a specific partition to finish (blocking).

Parameters
pid: The desired partition ID.
timeout: Optional timeout (ms) to wait.
Exceptions
std::runtime_error: If the timeout is reached.
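
Because a timeout is reported through an exception, callers that prefer a boolean result could wrap the call. The sketch below assumes only the wait_on() signature and the std::runtime_error behavior documented above. try_wait_on is a hypothetical helper, and ToyWaiter is a stand-in that never blocks and exists only to exercise it.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>
#include <stdexcept>

using PartID = std::uint32_t;  // assumption

// Hypothetical helper: returns true if the partition finished in time and
// false if wait_on() reported a timeout via std::runtime_error.
template <typename ShufflerLike>
bool try_wait_on(
    ShufflerLike& shuffler, PartID pid, std::chrono::milliseconds timeout
) {
    try {
        shuffler.wait_on(pid, timeout);
        return true;
    } catch (std::runtime_error const&) {
        return false;
    }
}

// Hypothetical stand-in (not rapidsmpf): one partition is already finished,
// every other partition times out immediately.
struct ToyWaiter {
    PartID ready_pid;

    void wait_on(
        PartID pid, std::optional<std::chrono::milliseconds> timeout = {}
    ) {
        if (pid == ready_pid) {
            return;
        }
        // The toy cannot make progress, so waiting without a timeout would
        // never return.
        assert(timeout.has_value());
        throw std::runtime_error("timeout");
    }
};
```

Note that this wrapper treats every std::runtime_error as a timeout. Code that needs to tell timeouts apart from other failures would have to inspect the exception further.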

The documentation for this class was generated from the following file: