rapidsmpf::shuffler::Shuffler Class Reference

Shuffle service for cuDF tables.

#include <shuffler.hpp>

Public Types

using PartitionOwner = std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)>
 Function that, given a Communicator, a PartID, and the total partition count, returns the rapidsmpf::Rank of the owning node.
 
using FinishedCallback = detail::FinishCounter::FinishedCallback
 Callback function type called when a partition is finished.
 

Public Member Functions

 Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, PartitionOwner partition_owner=round_robin)
 Construct a new shuffler for a single shuffle.
 
 Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, PartitionOwner partition_owner=round_robin)
 Construct a new shuffler for a single shuffle.
 
std::shared_ptr< Communicator > const & comm () const noexcept
 Gets the communicator associated with this Shuffler.
 
 Shuffler (Shuffler const &)=delete
 
Shuffler & operator= (Shuffler const &)=delete
 
void shutdown ()
 Shut down the shuffle, blocking until all in-flight communication is done.
 
void insert (std::unordered_map< PartID, PackedData > &&chunks)
 Insert a batch of packed (serialized) chunks into the shuffle.
 
void insert_finished (PartID pid)
 Insert a finish mark for a partition.
 
void insert_finished (std::vector< PartID > &&pids)
 Insert a finish mark for a list of partitions.
 
std::vector< PackedData > extract (PartID pid)
 Extract all chunks belonging to the specified partition.
 
bool finished () const
 Check if all partitions are finished.
 
bool is_finished (PartID pid) const
 Check if a partition is finished.
 
PartID wait_any (std::optional< std::chrono::milliseconds > timeout={})
 Wait for any partition to finish.
 
void wait_on (PartID pid, std::optional< std::chrono::milliseconds > timeout={})
 Wait for a specific partition to finish (blocking).
 
std::size_t spill (std::optional< std::size_t > amount=std::nullopt)
 Spills device data to host memory if necessary.
 
std::string str () const
 Returns a description of this instance.
 
std::span< PartID const > local_partitions () const
 Returns the local partition IDs owned by the shuffler.
 

Static Public Member Functions

static Rank round_robin (std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions)
 A PartitionOwner that distributes partitions using round-robin assignment.
 
static Rank contiguous (std::shared_ptr< Communicator > const &comm, PartID pid, PartID total_num_partitions)
 A PartitionOwner that assigns contiguous partition ID ranges to ranks.
 
static std::vector< PartID > local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner)
 Returns the local partition IDs owned by the current node.
 
static constexpr std::uint64_t extract_counter (detail::ChunkID cid)
 Extract the counter from a chunk ID.
 
static constexpr Rank extract_rank (detail::ChunkID cid)
 Extract the rank from a chunk ID.
 
static constexpr std::pair< Rank, std::uint64_t > extract_info (detail::ChunkID cid)
 Extract the rank and counter from a chunk ID.
 

Public Attributes

PartID const total_num_partitions
 Total number of partitions in the shuffle.
 
PartitionOwner const partition_owner
 Function to determine partition ownership.
 

Static Public Attributes

static constexpr int chunk_id_counter_bits = 38
 The number of bits used to store the counter in a chunk ID.
 
static constexpr std::uint64_t counter_mask
 The mask for the counter in a chunk ID.
 

Detailed Description

Shuffle service for cuDF tables.

The Shuffler class provides an interface for performing a shuffle operation on cuDF tables, using a partitioning scheme to distribute and collect data chunks across different ranks.

Definition at line 44 of file shuffler.hpp.

Member Typedef Documentation

◆ FinishedCallback

Callback function type called when a partition is finished.

The callback receives the partition ID of the finished partition.

Warning
A callback must be fast and non-blocking, should not call any of the wait* methods, and must be very careful when acquiring locks. Ideally, it should only signal a separate thread that does the actual processing.

Definition at line 103 of file shuffler.hpp.
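One way to follow this advice is to have the callback do nothing except enqueue the finished partition ID for a worker thread. The sketch below assumes the callback receives only the PartID of the finished partition, as stated above; FinishedQueue and the PartID alias are hypothetical stand-ins, not rapidsmpf types.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>

using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Thread-safe hand-off queue: the finished-callback only pushes,
// a separate worker thread pops and does the real processing.
class FinishedQueue {
  public:
    // Short critical section, then notify: cheap enough for a callback.
    void push(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_ && "push() after close() is a logic error");
            queue_.push_back(pid);
        }
        cv_.notify_one();
    }

    // Worker side: blocks until an ID is available; returns std::nullopt
    // once close() has been called and the queue is drained.
    std::optional<PartID> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty()) {
            return std::nullopt;
        }
        PartID const pid = queue_.front();
        queue_.pop_front();
        return pid;
    }

    // Wake the worker so it can exit after draining remaining IDs.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // A callable that could serve as the finished-callback.
    std::function<void(PartID)> make_callback() {
        return [this](PartID pid) { push(pid); };
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<PartID> queue_;
    bool closed_ = false;
};
```

Whether a std::function<void(PartID)> converts directly to FinishedCallback depends on its definition in detail::FinishCounter, which this page does not show. A worker thread would loop on pop() until it returns std::nullopt.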

Constructor & Destructor Documentation

◆ Shuffler() [1/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator > comm,
OpID  op_id,
PartID  total_num_partitions,
BufferResource * br,
FinishedCallback &&  finished_callback,
PartitionOwner  partition_owner = round_robin 
)

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
op_id: The operation ID of the shuffle. This ID must be unique to this operation and should not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
finished_callback: Callback to notify when a partition is finished.
partition_owner: Function to determine partition ownership.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

◆ Shuffler() [2/2]

rapidsmpf::shuffler::Shuffler::Shuffler ( std::shared_ptr< Communicator > comm,
OpID  op_id,
PartID  total_num_partitions,
BufferResource * br,
PartitionOwner  partition_owner = round_robin 
)
inline

Construct a new shuffler for a single shuffle.

Parameters
comm: The communicator to use.
op_id: The operation ID of the shuffle. This ID must be unique to this operation and should not be reused until all nodes have called Shuffler::shutdown().
total_num_partitions: Total number of partitions in the shuffle.
br: Buffer resource used to allocate temporary buffers and the shuffle result.
partition_owner: Function to determine partition ownership.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.

Definition at line 143 of file shuffler.hpp.

Member Function Documentation

◆ comm()

std::shared_ptr<Communicator> const& rapidsmpf::shuffler::Shuffler::comm ( ) const
inline noexcept

Gets the communicator associated with this Shuffler.

Returns
Shared pointer to communicator.

Definition at line 159 of file shuffler.hpp.

◆ contiguous()

static Rank rapidsmpf::shuffler::Shuffler::contiguous ( std::shared_ptr< Communicator > const &  comm,
PartID  pid,
PartID  total_num_partitions 
)
inline static

A PartitionOwner that assigns contiguous partition ID ranges to ranks.

Rank 0 gets [0, k), rank 1 gets [k, 2k), and so on, where the block size k depends on total_num_partitions and the number of ranks. Use this owner for sorting, so that each rank's local_partitions() are adjacent and in order.

Parameters
comm: The communicator to use.
pid: The partition ID to query.
total_num_partitions: Total number of partitions (must match the shuffle).
Returns
The rank owning the partition.

Definition at line 80 of file shuffler.hpp.
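A sketch of contiguous ownership, assuming the block size is k = ceil(total_num_partitions / nranks); the exact choice of k in rapidsmpf is not stated on this page. The Communicator argument is replaced by a plain rank count, and contiguous_owner, Rank and PartID are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>

using Rank = std::int32_t;     // hypothetical stand-in for rapidsmpf::Rank
using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Contiguous ownership with an assumed block size k = ceil(total / nranks):
// rank r owns partition IDs [r * k, (r + 1) * k), clipped to total.
inline Rank contiguous_owner(Rank nranks, PartID pid, PartID total_num_partitions) {
    assert(nranks > 0);
    assert(pid < total_num_partitions);  // also guarantees k >= 1 below
    auto const n = static_cast<PartID>(nranks);
    PartID const k = (total_num_partitions + n - 1) / n;  // ceiling division
    return static_cast<Rank>(pid / k);
}
```

With 10 partitions on 4 ranks this gives k = 3, so rank 3 owns only partition 9. Under this choice of k, trailing ranks can receive fewer partitions or none at all: 5 partitions on 4 ranks gives k = 2 and leaves rank 3 empty.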

◆ extract()

std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract ( PartID  pid)

Extract all chunks belonging to the specified partition.

It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.

To ensure the partition is complete, use wait_any(), wait_on(), or another appropriate synchronization mechanism beforehand.

Parameters
pid: The ID of the partition to extract.
Returns
A vector of PackedData chunks associated with the partition.
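Following the advice above, a consumer can pair wait_any() with extract() so that each partition is extracted only once it is complete. The sketch is written against any type offering finished(), wait_any() and extract(); ShufflerLike, Handler and drain_finished_partitions are hypothetical names. It also assumes that finished() becomes true only after every finished partition has been returned by wait_any(); if that does not hold, the loop could exit before the last partitions are extracted.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Consumer loop: block until some partition is finished, then extract it
// and hand its chunks to `handle`.
template <typename ShufflerLike, typename Handler>
void drain_finished_partitions(ShufflerLike& shuffler, Handler&& handle) {
    while (!shuffler.finished()) {
        PartID const pid = shuffler.wait_any();  // blocks until a partition finishes
        auto chunks = shuffler.extract(pid);     // complete, since pid is finished
        handle(pid, std::move(chunks));
    }
}
```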

◆ extract_counter()

static constexpr std::uint64_t rapidsmpf::shuffler::Shuffler::extract_counter ( detail::ChunkID  cid)
inline static constexpr

Extract the counter from a chunk ID.

Parameters
cid: The chunk ID.
Returns
The counter.

Definition at line 290 of file shuffler.hpp.

◆ extract_info()

static constexpr std::pair<Rank, std::uint64_t> rapidsmpf::shuffler::Shuffler::extract_info ( detail::ChunkID  cid)
inline static constexpr

Extract the rank and counter from a chunk ID.

Parameters
cid: The chunk ID.
Returns
A pair of the rank and counter.

Definition at line 308 of file shuffler.hpp.

◆ extract_rank()

static constexpr Rank rapidsmpf::shuffler::Shuffler::extract_rank ( detail::ChunkID  cid)
inline static constexpr

Extract the rank from a chunk ID.

Parameters
cid: The chunk ID.
Returns
The rank.

Definition at line 299 of file shuffler.hpp.
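The three extract_* helpers imply a packed chunk ID layout. A sketch, assuming detail::ChunkID is a 64-bit unsigned integer with the counter in the low chunk_id_counter_bits = 38 bits and the rank in the bits above (consistent with counter_mask, but not spelled out on this page). make_chunk_id is a hypothetical encoder added only to round-trip the decoders; Rank and ChunkID are stand-ins.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

using Rank = std::int32_t;      // hypothetical stand-in for rapidsmpf::Rank
using ChunkID = std::uint64_t;  // assumed representation of detail::ChunkID

constexpr int chunk_id_counter_bits = 38;
constexpr std::uint64_t counter_mask = (std::uint64_t{1} << chunk_id_counter_bits) - 1;

// Hypothetical encoder: rank in the high bits, counter truncated to the low 38 bits.
constexpr ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    return (static_cast<std::uint64_t>(rank) << chunk_id_counter_bits) | (counter & counter_mask);
}

// Decoders mirroring the documented helpers under the assumed layout.
constexpr std::uint64_t extract_counter(ChunkID cid) { return cid & counter_mask; }

constexpr Rank extract_rank(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}

constexpr std::pair<Rank, std::uint64_t> extract_info(ChunkID cid) {
    return {extract_rank(cid), extract_counter(cid)};
}

// Compile-time round-trip check.
static_assert(extract_rank(make_chunk_id(5, 42)) == 5);
static_assert(extract_counter(make_chunk_id(5, 42)) == 42);
```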

◆ finished()

bool rapidsmpf::shuffler::Shuffler::finished ( ) const

Check if all partitions are finished.

Returns
true if all partitions are finished, otherwise false.

◆ insert()

void rapidsmpf::shuffler::Shuffler::insert ( std::unordered_map< PartID, PackedData > &&  chunks)

Insert a batch of packed (serialized) chunks into the shuffle.

Parameters
chunks: A map from partition IDs to their packed chunks.

◆ insert_finished() [1/2]

void rapidsmpf::shuffler::Shuffler::insert_finished ( PartID  pid)

Insert a finish mark for a partition.

This tells the shuffler that no more chunks of the specified partition are coming.

Parameters
pid: The partition ID to mark as finished.

◆ insert_finished() [2/2]

void rapidsmpf::shuffler::Shuffler::insert_finished ( std::vector< PartID > &&  pids)

Insert a finish mark for a list of partitions.

Parameters
pids: The list of partition IDs to mark as finished.
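A producer-side sketch: insert this rank's chunks, then mark partitions as finished. It assumes (this page does not state it) that each rank should mark every partition finished, including partitions it sent no chunks to, so that owning ranks stop waiting for it. ShufflerLike, Chunk and insert_all_and_finish are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <unordered_map>
#include <utility>
#include <vector>

using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Hand this rank's chunks to the shuffler, then declare that no more chunks
// will be inserted for any partition.
template <typename ShufflerLike, typename Chunk>
void insert_all_and_finish(
    ShufflerLike& shuffler,
    std::unordered_map<PartID, Chunk>&& chunks,
    PartID total_num_partitions
) {
    shuffler.insert(std::move(chunks));
    // Assumption: finish marks go to all partitions, not only those with data.
    std::vector<PartID> pids(total_num_partitions);
    std::iota(pids.begin(), pids.end(), PartID{0});  // 0, 1, ..., total - 1
    shuffler.insert_finished(std::move(pids));
}
```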

◆ is_finished()

bool rapidsmpf::shuffler::Shuffler::is_finished ( PartID  pid) const

Check if a partition is finished.

Parameters
pid: The partition ID to check.
Returns
true if the partition is finished, otherwise false.

◆ local_partitions() [1/2]

std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions ( ) const

Returns the local partition IDs owned by the shuffler.

Returns
A span of partition IDs owned by the shuffler.

◆ local_partitions() [2/2]

static std::vector<PartID> rapidsmpf::shuffler::Shuffler::local_partitions ( std::shared_ptr< Communicator > const &  comm,
PartID  total_num_partitions,
PartitionOwner  partition_owner 
)
static

Returns the local partition IDs owned by the current node.

Parameters
comm: The communicator to use.
total_num_partitions: Total number of partitions in the shuffle.
partition_owner: Function that determines partition ownership.
Returns
A vector of partition IDs owned by the current node.
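The static overload can be read as passing every partition ID through the owner function and keeping those assigned to the calling rank. A sketch under that assumption, with a simplified owner signature that takes the rank count in place of the Communicator; OwnerFn and local_partitions_sketch are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

using Rank = std::int32_t;     // hypothetical stand-in for rapidsmpf::Rank
using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Simplified owner signature: (nranks, pid, total_num_partitions) -> owning rank.
using OwnerFn = std::function<Rank(Rank, PartID, PartID)>;

// Collect, in ascending order, the partition IDs that `owner` assigns to `my_rank`.
std::vector<PartID> local_partitions_sketch(
    Rank my_rank, Rank nranks, PartID total_num_partitions, OwnerFn const& owner
) {
    assert(my_rank >= 0 && my_rank < nranks);
    std::vector<PartID> result;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(nranks, pid, total_num_partitions) == my_rank) {
            result.push_back(pid);
        }
    }
    return result;
}
```

With a round-robin owner, 7 partitions and 3 ranks, rank 1 gets {1, 4}; a contiguous owner with block size 3 would instead give it {3, 4, 5}.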

◆ round_robin()

static Rank rapidsmpf::shuffler::Shuffler::round_robin ( std::shared_ptr< Communicator > const &  comm,
PartID  pid,
[[maybe_unused]] PartID  total_num_partitions 
)
inline static

A PartitionOwner that distributes partitions using round-robin assignment.

Parameters
comm: The communicator to use.
pid: The partition ID to query.
total_num_partitions: Total number of partitions (unused).
Returns
The rank owning the partition.

Definition at line 61 of file shuffler.hpp.
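Assuming the usual definition, round-robin ownership maps a partition ID to its remainder modulo the number of ranks, which would explain why total_num_partitions is unused. A sketch with a rank count in place of the Communicator; round_robin_owner, Rank and PartID are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>

using Rank = std::int32_t;     // hypothetical stand-in for rapidsmpf::Rank
using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Round-robin ownership: partition pid goes to rank (pid mod nranks).
// total_num_partitions is not needed, mirroring [[maybe_unused]] above.
inline Rank round_robin_owner(
    Rank nranks, PartID pid, [[maybe_unused]] PartID total_num_partitions
) {
    assert(nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}
```

Adjacent partition IDs land on different ranks, which spreads partitions evenly but, unlike contiguous, does not keep each rank's partitions in one ordered run.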

◆ shutdown()

void rapidsmpf::shuffler::Shuffler::shutdown ( )

Shut down the shuffle, blocking until all in-flight communication is done.

Exceptions
std::logic_error: If the shuffler is already inactive.

◆ spill()

std::size_t rapidsmpf::shuffler::Shuffler::spill ( std::optional< std::size_t >  amount = std::nullopt)

Spills device data to host memory if necessary.

This function has two modes:

  • If amount is specified, it tries to spill at least amount bytes of device memory.
  • If amount is not specified (the default), it spills based on the currently available device memory reported by the buffer resource.
Parameters
amount: An optional amount of memory (in bytes) to spill. If not provided, the function checks the currently available device memory.
Returns
The amount of memory actually spilled.
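The page does not say exactly how the default mode derives its target. One plausible reading, sketched below, is that an explicit amount is used as-is, while the default mode spills only the overshoot when the buffer resource reports negative available device memory. bytes_to_spill and its available_device_bytes parameter are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical helper: how many bytes a spill call should try to free.
// `available_device_bytes` models the buffer resource's report of remaining
// device memory; a negative value means the limit is already exceeded.
inline std::size_t bytes_to_spill(
    std::optional<std::size_t> amount, std::int64_t available_device_bytes
) {
    if (amount.has_value()) {
        return *amount;  // explicit request: spill at least this much
    }
    // Default mode (assumed): spill only the overshoot, nothing if there is headroom.
    return available_device_bytes < 0
               ? static_cast<std::size_t>(-available_device_bytes)
               : std::size_t{0};
}
```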

◆ str()

std::string rapidsmpf::shuffler::Shuffler::str ( ) const

Returns a description of this instance.

Returns
The description.

◆ wait_any()

PartID rapidsmpf::shuffler::Shuffler::wait_any ( std::optional< std::chrono::milliseconds >  timeout = {})

Wait for any partition to finish.

Parameters
timeout: Optional timeout (in milliseconds) to wait.
Returns
The partition ID of the next finished partition.
Exceptions
std::runtime_error: If the timeout is reached.

◆ wait_on()

void rapidsmpf::shuffler::Shuffler::wait_on ( PartID  pid,
std::optional< std::chrono::milliseconds >  timeout = {} 
)

Wait for a specific partition to finish (blocking).

Parameters
pid: The desired partition ID.
timeout: Optional timeout (in milliseconds) to wait.
Exceptions
std::runtime_error: If the timeout is reached.
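Because a timeout is reported as std::runtime_error, a caller that prefers a boolean result can wrap wait_on(). A sketch using the hypothetical name wait_on_with_timeout; note that it treats every std::runtime_error as a timeout, which could also hide unrelated failures.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>
#include <stdexcept>

using PartID = std::uint32_t;  // hypothetical stand-in for rapidsmpf::PartID

// Returns true if `pid` finished within `timeout`, false if wait_on() threw
// std::runtime_error (documented above as the timeout signal).
template <typename ShufflerLike>
bool wait_on_with_timeout(
    ShufflerLike& shuffler, PartID pid, std::chrono::milliseconds timeout
) {
    try {
        shuffler.wait_on(pid, timeout);  // converts to std::optional<milliseconds>
        return true;
    } catch (std::runtime_error const&) {
        return false;
    }
}
```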

Member Data Documentation

◆ counter_mask

constexpr std::uint64_t rapidsmpf::shuffler::Shuffler::counter_mask
static constexpr
Initial value:
= (std::uint64_t{1} << chunk_id_counter_bits) - 1

The mask for the counter in a chunk ID.

Definition at line 282 of file shuffler.hpp.
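With chunk_id_counter_bits = 38, the initializer evaluates to a mask of 38 one-bits:

```latex
\text{counter\_mask} = 2^{38} - 1 = 274\,877\,906\,943 = \texttt{0x3FFFFFFFFF}
```

Assuming detail::ChunkID is a 64-bit integer with the counter in its low bits, as counter_mask suggests, the remaining 64 - 38 = 26 bits are left for the rank.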


The documentation for this class was generated from the following file: