Shuffle service for cuDF tables.
#include <shuffler.hpp>
Public Types | |
| using | PartitionOwner = std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)> |
| Function that, given a Communicator, a PartID, and the total partition count, returns the rapidsmpf::Rank of the owning node. | |
| using | FinishedCallback = detail::FinishCounter::FinishedCallback |
| Callback function type called when a partition is finished. | |
Public Member Functions | |
| Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, PartitionOwner partition_owner=round_robin) | |
| Construct a new shuffler for a single shuffle. | |
| Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, PartitionOwner partition_owner=round_robin) | |
| Construct a new shuffler for a single shuffle. | |
| std::shared_ptr< Communicator > const & | comm () const noexcept |
| Gets the communicator associated with this Shuffler. | |
| Shuffler (Shuffler const &)=delete | |
| Shuffler & | operator= (Shuffler const &)=delete |
| void | shutdown () |
| Shut down the shuffle, blocking until all in-flight communication is done. | |
| void | insert (std::unordered_map< PartID, PackedData > &&chunks) |
| Insert a bunch of packed (serialized) chunks into the shuffle. | |
| void | insert_finished (PartID pid) |
| Insert a finish mark for a partition. | |
| void | insert_finished (std::vector< PartID > &&pids) |
| Insert a finish mark for a list of partitions. | |
| std::vector< PackedData > | extract (PartID pid) |
| Extract all chunks belonging to the specified partition. | |
| bool | finished () const |
| Check if all partitions are finished. | |
| bool | is_finished (PartID pid) const |
| Check if a partition is finished. | |
| PartID | wait_any (std::optional< std::chrono::milliseconds > timeout={}) |
| Wait for any partition to finish. | |
| void | wait_on (PartID pid, std::optional< std::chrono::milliseconds > timeout={}) |
| Wait for a specific partition to finish (blocking). | |
| std::size_t | spill (std::optional< std::size_t > amount=std::nullopt) |
| Spills device data to host memory if necessary. | |
| std::string | str () const |
| Returns a description of this instance. | |
| std::span< PartID const > | local_partitions () const |
| Returns the local partition IDs owned by the shuffler. | |
Static Public Member Functions | |
| static Rank | round_robin (std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions) |
| A PartitionOwner that distributes partitions using round robin assignment. | |
| static Rank | contiguous (std::shared_ptr< Communicator > const &comm, PartID pid, PartID total_num_partitions) |
| A PartitionOwner that assigns contiguous partition ID ranges to ranks. | |
| static std::vector< PartID > | local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner) |
| Returns the local partition IDs owned by the current node. | |
| static constexpr std::uint64_t | extract_counter (detail::ChunkID cid) |
| Extract the counter from a chunk ID. | |
| static constexpr Rank | extract_rank (detail::ChunkID cid) |
| Extract the rank from a chunk ID. | |
| static constexpr std::pair< Rank, std::uint64_t > | extract_info (detail::ChunkID cid) |
| Extract the rank and counter from a chunk ID. | |
Public Attributes | |
| PartID const | total_num_partitions |
| Total number of partitions in the shuffle. | |
| PartitionOwner const | partition_owner |
| Function to determine partition ownership. | |
Static Public Attributes | |
| static constexpr int | chunk_id_counter_bits = 38 |
| The number of bits used to store the counter in a chunk ID. | |
| static constexpr std::uint64_t | counter_mask |
| The mask for the counter in a chunk ID. | |
Shuffle service for cuDF tables.
The Shuffler class provides an interface for performing a shuffle operation on cuDF tables, using a partitioning scheme to distribute and collect data chunks across different ranks.
Definition at line 44 of file shuffler.hpp.
Callback function type called when a partition is finished.
The callback receives the partition ID of the finished partition.
Warning: The callback should not call any of the wait* methods, and it should be very careful when acquiring locks. Ideally, it should only signal a separate thread that does the actual processing.
Definition at line 103 of file shuffler.hpp.
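The recommendation to use the callback only to signal a separate thread can be followed by having the callback do nothing but enqueue the finished partition ID for a worker. The sketch below shows that pattern using only the C++ standard library. It assumes PartID is a 32-bit unsigned integer and that the callback is invocable as void(PartID); FinishedQueue and make_forwarding_callback are hypothetical helpers, not part of rapidsmpf.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-ins (assumptions, not the rapidsmpf definitions).
using PartID = std::uint32_t;
using FinishedCallback = std::function<void(PartID)>;

// Hypothetical thread-safe FIFO of finished partition IDs.
class FinishedQueue {
  public:
    // Called from the finished callback: holds the lock only briefly, then notifies.
    void push(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(pid);
        }
        cv_.notify_one();
    }

    // Called from the worker thread: blocks until a partition ID is available.
    PartID pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        assert(!queue_.empty());
        PartID const pid = queue_.front();
        queue_.pop();
        return pid;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<PartID> queue_;
};

// Hypothetical helper: starts `worker`, which handles `expected` finished
// partitions, and returns a callback that only forwards each ID to it.
FinishedCallback make_forwarding_callback(
    FinishedQueue& queue,
    std::vector<PartID>& processed,
    std::size_t expected,
    std::thread& worker
) {
    worker = std::thread([&queue, &processed, expected] {
        for (std::size_t i = 0; i < expected; ++i) {
            // Expensive per-partition work (e.g., extract and unpack) would go here.
            processed.push_back(queue.pop());
        }
    });
    return [&queue](PartID pid) { queue.push(pid); };
}
```

The callback only takes a short-lived lock and notifies, so it never waits on the worker; all slow processing happens on the worker thread, which must be joined before processed is read. In real code, a callback of this shape would be passed as the finished_callback constructor argument.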
| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource * br, FinishedCallback && finished_callback, PartitionOwner partition_owner = round_robin | ) |
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| op_id | The operation ID of the shuffle. This ID must be unique to this operation and should not be reused until all nodes have called Shuffler::shutdown(). |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary buffers and the shuffle result. |
| finished_callback | Callback to notify when a partition is finished. |
| partition_owner | Function to determine partition ownership. |
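| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource * br, PartitionOwner partition_owner = round_robin | ) | inline |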
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| op_id | The operation ID of the shuffle. This ID must be unique to this operation and should not be reused until all nodes have called Shuffler::shutdown(). |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary buffers and the shuffle result. |
| partition_owner | Function to determine partition ownership. |
Definition at line 143 of file shuffler.hpp.
| std::shared_ptr< Communicator > const & rapidsmpf::shuffler::Shuffler::comm | ( | ) | const | inline noexcept |
Gets the communicator associated with this Shuffler.
Definition at line 159 of file shuffler.hpp.
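| Rank rapidsmpf::shuffler::Shuffler::contiguous | ( | std::shared_ptr< Communicator > const & comm, PartID pid, PartID total_num_partitions | ) | inline static |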
A PartitionOwner that assigns contiguous partition ID ranges to ranks.
Rank 0 gets [0, k), rank 1 gets [k, 2k), and so on, where k is the per-rank partition count. Use this owner for sorting, so that each rank's local_partitions() are adjacent and in order.
| comm | The communicator to use. |
| pid | The partition ID to query. |
| total_num_partitions | Total number of partitions (must match the shuffle). |
Definition at line 80 of file shuffler.hpp.
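To make the [0, k) notation concrete, here is one possible contiguous mapping as a standalone sketch. It assumes k = ceil(total_num_partitions / nranks), passes the rank count directly instead of a Communicator, and uses 32-bit stand-ins for PartID and Rank. contiguous_owner is a hypothetical name, and the library may split a non-divisible total differently.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins (assumptions): 32-bit PartID and Rank.
using PartID = std::uint32_t;
using Rank = std::int32_t;

// One possible contiguous mapping: with k = ceil(total / nranks), rank r owns
// the half-open range [r*k, (r+1)*k). The last rank may own fewer partitions.
Rank contiguous_owner(Rank nranks, PartID pid, PartID total_num_partitions) {
    assert(nranks > 0 && pid < total_num_partitions);
    PartID const n = static_cast<PartID>(nranks);
    PartID const k = (total_num_partitions + n - 1) / n;  // ceiling division, k >= 1
    return static_cast<Rank>(pid / k);
}
```

With 10 partitions on 4 ranks, k is 3 in this sketch, so ranks 0 through 2 own three partitions each and rank 3 owns only partition 9.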
| std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract | ( | PartID | pid | ) |
Extract all chunks belonging to the specified partition.
It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.
To ensure the partition is complete, use wait_any(), wait_on(), or another appropriate synchronization mechanism beforehand.
| pid | The ID of the partition to extract. |
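The rule above (synchronize before extracting if the partition must be complete) can be sketched as a loop over a rank's local partitions. consume_local_partitions is a hypothetical helper written as a template so it can be exercised against FakeShuffler, a stand-in that mimics only the member names local_partitions(), wait_on(), and extract(). The string-based PackedData and the 32-bit PartID are also assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-ins (assumptions): PartID is a 32-bit unsigned integer and a packed
// chunk is modeled as a std::string instead of rapidsmpf::PackedData.
using PartID = std::uint32_t;
using PackedData = std::string;

// Hypothetical fake exposing the same member names as Shuffler.
struct FakeShuffler {
    std::vector<PartID> owned;
    std::map<PartID, std::vector<PackedData>> chunks;

    std::vector<PartID> const& local_partitions() const { return owned; }
    void wait_on(PartID /*pid*/) {}  // In the fake, every partition is already complete.
    std::vector<PackedData> extract(PartID pid) {
        auto it = chunks.find(pid);
        if (it == chunks.end()) {
            return {};
        }
        std::vector<PackedData> out = std::move(it->second);
        chunks.erase(it);
        return out;
    }
};

// Waits for each local partition before extracting it, so `consume` only sees
// complete partitions. Partitions are handled in local_partitions() order.
template <typename ShufflerLike, typename Consume>
void consume_local_partitions(ShufflerLike& shuffler, Consume&& consume) {
    for (PartID pid : shuffler.local_partitions()) {
        shuffler.wait_on(pid);
        consume(pid, shuffler.extract(pid));
    }
}
```

This loop handles partitions in ownership order; to process them in the order they complete, wait_any() could be used to pick the next partition instead.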
| std::uint64_t rapidsmpf::shuffler::Shuffler::extract_counter | ( | detail::ChunkID | cid | ) | inline static constexpr |
Extract the counter from a chunk ID.
| cid | The chunk ID. |
Definition at line 290 of file shuffler.hpp.
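| std::pair< Rank, std::uint64_t > rapidsmpf::shuffler::Shuffler::extract_info | ( | detail::ChunkID | cid | ) | inline static constexpr |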
Extract the rank and counter from a chunk ID.
| cid | The chunk ID. |
Definition at line 308 of file shuffler.hpp.
| Rank rapidsmpf::shuffler::Shuffler::extract_rank | ( | detail::ChunkID | cid | ) | inline static constexpr |
Extract the rank from a chunk ID.
| cid | The chunk ID. |
Definition at line 299 of file shuffler.hpp.
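The chunk ID helpers above suggest that a rank and a counter are packed into one integer. The sketch below assumes detail::ChunkID is a 64-bit unsigned integer whose low chunk_id_counter_bits (38) bits hold the counter and whose high bits hold the rank; make_chunk_id is a hypothetical encoder added only to show the round trip. Under that assumption, counter_mask equals 2^38 - 1, each rank can issue 2^38 distinct counters, and 26 bits remain for the rank.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Stand-ins (assumptions): a 64-bit ChunkID and a 32-bit Rank.
using ChunkID = std::uint64_t;
using Rank = std::int32_t;

constexpr int chunk_id_counter_bits = 38;
// Low 38 bits set: selects the counter field.
constexpr std::uint64_t counter_mask = (std::uint64_t{1} << chunk_id_counter_bits) - 1;

// Hypothetical encoder: rank in the high bits, counter in the low bits.
// Assumes rank >= 0.
constexpr ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    return (static_cast<ChunkID>(rank) << chunk_id_counter_bits) | (counter & counter_mask);
}

constexpr std::uint64_t extract_counter(ChunkID cid) { return cid & counter_mask; }

constexpr Rank extract_rank(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}

constexpr std::pair<Rank, std::uint64_t> extract_info(ChunkID cid) {
    return {extract_rank(cid), extract_counter(cid)};
}

static_assert(extract_rank(make_chunk_id(5, 42)) == 5, "rank round-trips");
static_assert(extract_counter(make_chunk_id(5, 42)) == 42, "counter round-trips");
```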
| bool rapidsmpf::shuffler::Shuffler::finished | ( | ) | const |
Check if all partitions are finished.
| void rapidsmpf::shuffler::Shuffler::insert | ( | std::unordered_map< PartID, PackedData > && | chunks | ) |
Insert a bunch of packed (serialized) chunks into the shuffle.
| chunks | A map of partition IDs and their packed chunks. |
| void rapidsmpf::shuffler::Shuffler::insert_finished | ( | PartID | pid | ) |
Insert a finish mark for a partition.
This tells the shuffler that no more chunks of the specified partition are coming.
| pid | The partition ID to mark as finished. |
| void rapidsmpf::shuffler::Shuffler::insert_finished | ( | std::vector< PartID > && | pids | ) |
Insert a finish mark for a list of partitions.
| pids | The list of partition IDs to mark as finished. |
| bool rapidsmpf::shuffler::Shuffler::is_finished | ( | PartID | pid | ) | const |
Check if a partition is finished.
| pid | The partition ID to check. |
| std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions | ( | ) | const |
Returns the local partition IDs owned by the shuffler.
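| std::vector< PartID > rapidsmpf::shuffler::Shuffler::local_partitions | ( | std::shared_ptr< Communicator > const & comm, PartID total_num_partitions, PartitionOwner partition_owner | ) | static |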
Returns the local partition IDs owned by the current node.
| comm | The communicator to use. |
| total_num_partitions | Total number of partitions in the shuffle. |
| partition_owner | Function that determines partition ownership. |
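Conceptually, this overload's result is the set of partition IDs whose owner is the calling rank, which can be computed by scanning every partition ID. The sketch below shows that idea; it is not the library's implementation. It passes my_rank and nranks explicitly instead of a Communicator, models PartitionOwner as the hypothetical OwnerFn, and uses 32-bit stand-ins for PartID and Rank.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Stand-ins (assumptions): 32-bit PartID/Rank, and an owner function that
// takes the rank count directly instead of a Communicator.
using PartID = std::uint32_t;
using Rank = std::int32_t;
using OwnerFn = std::function<Rank(Rank nranks, PartID pid, PartID total_num_partitions)>;

// Hypothetical helper: returns, in increasing order, every partition ID that
// `owner` assigns to `my_rank`.
std::vector<PartID> local_partitions_sketch(
    Rank my_rank, Rank nranks, PartID total_num_partitions, OwnerFn const& owner
) {
    assert(0 <= my_rank && my_rank < nranks);
    std::vector<PartID> result;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (owner(nranks, pid, total_num_partitions) == my_rank) {
            result.push_back(pid);
        }
    }
    return result;
}
```

The scan costs one owner call per partition, which is cheap for typical partition counts and works for any ownership function.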
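| Rank rapidsmpf::shuffler::Shuffler::round_robin | ( | std::shared_ptr< Communicator > const & comm, PartID pid, [[maybe_unused]] PartID total_num_partitions | ) | inline static |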
A PartitionOwner that distributes partitions using round robin assignment.
| comm | The communicator to use. |
| pid | The partition ID to query. |
| total_num_partitions | Total number of partitions (unused). |
Definition at line 61 of file shuffler.hpp.
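A minimal sketch of round-robin ownership, assuming partition pid is assigned to rank pid mod nranks (the exact formula is an assumption here). The rank count is passed directly instead of a Communicator, round_robin_owner is a hypothetical name, and PartID and Rank are 32-bit stand-ins.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins (assumptions): 32-bit PartID and Rank.
using PartID = std::uint32_t;
using Rank = std::int32_t;

// Round-robin ownership, assuming partition `pid` goes to rank pid mod nranks.
// As in the documented signature, the total partition count is not needed.
Rank round_robin_owner(Rank nranks, PartID pid, [[maybe_unused]] PartID total_num_partitions) {
    assert(nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}
```

Unlike contiguous, this spreads neighboring partition IDs across ranks, which keeps per-rank partition counts within one of each other but does not give each rank an ordered, adjacent range.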
| void rapidsmpf::shuffler::Shuffler::shutdown | ( | ) |
Shut down the shuffle, blocking until all in-flight communication is done.
| std::logic_error | If the shuffler is already inactive. |
| std::size_t rapidsmpf::shuffler::Shuffler::spill | ( | std::optional< std::size_t > | amount = std::nullopt | ) |
Spills device data to host memory if necessary.
This function has two modes:
- If amount is specified, it tries to spill at least amount bytes of device memory.
- If amount is not specified (the default case), it spills based on the current available device memory returned by the buffer resource.
| amount | An optional amount of memory to spill. If not provided, the function will check the current available device memory. |
| std::string rapidsmpf::shuffler::Shuffler::str | ( | ) | const |
Returns a description of this instance.
| PartID rapidsmpf::shuffler::Shuffler::wait_any | ( | std::optional< std::chrono::milliseconds > | timeout = {} | ) |
Wait for any partition to finish.
| timeout | Optional timeout (ms) to wait. |
| std::runtime_error | if the timeout is reached. |
| void rapidsmpf::shuffler::Shuffler::wait_on | ( | PartID pid, std::optional< std::chrono::milliseconds > timeout = {} | ) |
Wait for a specific partition to finish (blocking).
| pid | The desired partition ID. |
| timeout | Optional timeout (ms) to wait. |
| std::runtime_error | if the timeout is reached. |
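The timeout semantics described for wait_on() (and wait_any()) match a common pattern: wait on a condition variable, either indefinitely or up to a deadline, and throw std::runtime_error if the deadline passes. The sketch below illustrates that pattern and is not the rapidsmpf implementation; FinishedTracker and mark_finished are hypothetical, PartID is a 32-bit stand-in, and treating an empty timeout as "wait indefinitely" is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <set>
#include <stdexcept>

// Stand-in (assumption): 32-bit PartID.
using PartID = std::uint32_t;

// Hypothetical tracker of finished partitions.
class FinishedTracker {
  public:
    void mark_finished(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_.insert(pid);
        }
        cv_.notify_all();
    }

    // Blocks until `pid` is finished. Without a timeout, waits indefinitely;
    // with one, throws std::runtime_error if it expires first.
    void wait_on(PartID pid, std::optional<std::chrono::milliseconds> timeout = {}) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto done = [&] { return finished_.count(pid) > 0; };
        if (!timeout) {
            cv_.wait(lock, done);
        } else if (!cv_.wait_for(lock, *timeout, done)) {
            throw std::runtime_error("wait_on: timeout reached");
        }
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::set<PartID> finished_;
};
```

The predicate overloads of wait and wait_for re-check the condition after every wakeup, so spurious wakeups cannot cause an early return, and wait_for returns false only if the predicate is still false when the timeout expires.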
| std::uint64_t rapidsmpf::shuffler::Shuffler::counter_mask | static constexpr |
The mask for the counter in a chunk ID.
Definition at line 282 of file shuffler.hpp.