Shuffle service for cuDF tables.
#include <shuffler.hpp>
Public Types | |
| using | PartitionOwner = std::function< Rank(std::shared_ptr< Communicator >, PartID)> |
Function that, given a Communicator and a PartID, returns the rapidsmpf::Rank of the owning node. | |
| using | FinishedCallback = detail::FinishCounter::FinishedCallback |
| Callback function type called when a partition is finished. | |
Public Member Functions | |
| Shuffler (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, std::shared_ptr< Statistics > statistics=Statistics::disabled(), PartitionOwner partition_owner=round_robin) | |
| Construct a new shuffler for a single shuffle. | |
| Shuffler (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, PartID total_num_partitions, BufferResource *br, std::shared_ptr< Statistics > statistics=Statistics::disabled(), PartitionOwner partition_owner=round_robin) | |
| Construct a new shuffler for a single shuffle. | |
| Shuffler (Shuffler const &)=delete | |
| Shuffler & | operator= (Shuffler const &)=delete |
| void | shutdown () |
| Shutdown the shuffle, blocking until all inflight communication is done. | |
| void | concat_insert (std::unordered_map< PartID, PackedData > &&chunks) |
| Insert a map of packed data, grouping them by destination rank, and concatenating into a single chunk per rank. | |
| void | insert (std::unordered_map< PartID, PackedData > &&chunks) |
| Insert a bunch of packed (serialized) chunks into the shuffle. | |
| void | insert_finished (PartID pid) |
| Insert a finish mark for a partition. | |
| void | insert_finished (std::vector< PartID > &&pids) |
| Insert a finish mark for a list of partitions. | |
| std::vector< PackedData > | extract (PartID pid) |
| Extract all chunks belonging to the specified partition. | |
| bool | finished () const |
| Check if all partitions are finished. | |
| bool | is_finished (PartID pid) const |
| Check if a partition is finished. | |
| PartID | wait_any (std::optional< std::chrono::milliseconds > timeout={}) |
| Wait for any partition to finish. | |
| void | wait_on (PartID pid, std::optional< std::chrono::milliseconds > timeout={}) |
| Wait for a specific partition to finish (blocking). | |
| std::size_t | spill (std::optional< std::size_t > amount=std::nullopt) |
| Spills data out of device memory if necessary. | |
| std::string | str () const |
| Returns a description of this instance. | |
| std::span< PartID const > | local_partitions () const |
| Returns the local partition IDs owned by the shuffler. | |
Static Public Member Functions | |
| static Rank | round_robin (std::shared_ptr< Communicator > const &comm, PartID pid) |
A PartitionOwner that distributes the partitions using round robin. | |
| static std::vector< PartID > | local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner) |
| Returns the local partition IDs owned by the current node. | |
| static constexpr uint64_t | extract_counter (detail::ChunkID cid) |
| Extract the counter from a chunk ID. | |
| static constexpr Rank | extract_rank (detail::ChunkID cid) |
| Extract the rank from a chunk ID. | |
| static constexpr std::pair< Rank, uint64_t > | extract_info (detail::ChunkID cid) |
| Extract the rank and counter from a chunk ID. | |
Public Attributes | |
| PartID const | total_num_partitions |
| Total number of partitions in the shuffle. | |
| PartitionOwner const | partition_owner |
| Function to determine partition ownership. | |
Static Public Attributes | |
| static constexpr int | chunk_id_counter_bits = 38 |
| The number of bits used to store the counter in a chunk ID. | |
| static constexpr uint64_t | counter_mask = (uint64_t(1) << chunk_id_counter_bits) - 1 |
| The mask for the counter in a chunk ID. | |
Friends | |
| class | ::ShuffleInsertGroupedTest |
Shuffle service for cuDF tables.
The Shuffler class provides an interface for performing a shuffle operation on cuDF tables, using a partitioning scheme to distribute and collect data chunks across different ranks.
Definition at line 47 of file shuffler.hpp.
Callback function type called when a partition is finished.
The callback receives the partition ID of the finished partition.
Warning: the callback should be fast and non-blocking, and it should not call any of the wait* methods. Be very careful if acquiring locks. Ideally, it should be used to signal a separate thread to do the actual processing.
Definition at line 83 of file shuffler.hpp.
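The advice to signal a separate thread can be illustrated with a minimal, self-contained sketch. It does not use the rapidsmpf API: `PartID` is a stand-in alias, and `FinishedQueue` and `run_demo` are hypothetical names introduced only for this example. The callback does nothing except enqueue the partition ID, and a worker thread does the actual processing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using PartID = std::uint32_t;  // stand-in for the library's PartID (assumption)

// Hypothetical helper: a thread-safe queue of finished partition IDs.
class FinishedQueue {
  public:
    // Holds the lock only briefly, so it is cheap enough to call from a callback.
    void push(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(pid);
        }
        cv_.notify_one();
    }

    // Blocks until an ID is available; intended for the worker thread only.
    PartID pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        PartID pid = queue_.front();
        queue_.pop();
        return pid;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<PartID> queue_;
};

// Simulates `count` partitions finishing and returns the IDs in the order
// in which the worker thread processed them.
std::vector<PartID> run_demo(PartID count) {
    FinishedQueue queue;
    std::vector<PartID> processed;

    // The callback only enqueues the ID; it never waits or does heavy work.
    std::function<void(PartID)> finished_callback = [&queue](PartID pid) {
        queue.push(pid);
    };

    std::thread worker([&] {
        for (PartID i = 0; i < count; ++i) {
            processed.push_back(queue.pop());  // real processing would go here
        }
    });

    for (PartID pid = 0; pid < count; ++pid) {
        finished_callback(pid);  // stands in for the shuffler invoking the callback
    }
    worker.join();
    assert(processed.size() == count);
    return processed;
}
```

In a real program the lambda would be passed as the `finished_callback` constructor argument, and the worker would typically call `extract()` for each ID it pops.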
| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > | comm, |
| std::shared_ptr< ProgressThread > | progress_thread, | ||
| OpID | op_id, | ||
| PartID | total_num_partitions, | ||
| BufferResource * | br, | ||
| FinishedCallback && | finished_callback, | ||
| std::shared_ptr< Statistics > | statistics = Statistics::disabled(), |
||
| PartitionOwner | partition_owner = round_robin |
||
| ) |
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| progress_thread | The progress thread to use. |
| op_id | The operation ID of the shuffle. This ID is unique for this operation, and should not be reused until all nodes have called Shuffler::shutdown(). |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary storage and the shuffle result. |
| finished_callback | Callback to notify when a partition is finished. |
| statistics | The statistics instance to use (disabled by default). |
| partition_owner | Function to determine partition ownership. |
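| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > | comm, |
| std::shared_ptr< ProgressThread > | progress_thread, | ||
| OpID | op_id, | ||
| PartID | total_num_partitions, | ||
| BufferResource * | br, | ||
| std::shared_ptr< Statistics > | statistics = Statistics::disabled(), |
||
| PartitionOwner | partition_owner = round_robin |
||
| ) | inline |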
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| progress_thread | The progress thread to use. |
| op_id | The operation ID of the shuffle. This ID is unique for this operation, and should not be reused until all nodes have called Shuffler::shutdown(). |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary storage and the shuffle result. |
| statistics | The statistics instance to use (disabled by default). |
| partition_owner | Function to determine partition ownership. |
Definition at line 129 of file shuffler.hpp.
| void rapidsmpf::shuffler::Shuffler::concat_insert | ( | std::unordered_map< PartID, PackedData > && | chunks | ) |
Insert a map of packed data, grouping them by destination rank, and concatenating into a single chunk per rank.
| chunks | A map of partition IDs and their packed chunks. |
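The grouping step described above can be sketched as follows. This is only an illustration under stated assumptions: `Bytes` stands in for `PackedData`, the owner function takes only a partition ID (the real `PartitionOwner` also takes a communicator), and `group_by_rank` is a hypothetical name. A real implementation would presumably also record per-partition boundaries so the receiver can split the chunk again, which this sketch omits.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <unordered_map>
#include <vector>

using PartID = std::uint32_t;             // stand-in type (assumption)
using Rank = std::int32_t;                // stand-in type (assumption)
using Bytes = std::vector<std::uint8_t>;  // stand-in for PackedData (assumption)

// Hypothetical helper: concatenates all chunks that share a destination rank
// into a single byte buffer per rank.
std::map<Rank, Bytes> group_by_rank(
    std::unordered_map<PartID, Bytes>&& chunks,
    std::function<Rank(PartID)> const& owner
) {
    assert(static_cast<bool>(owner));
    std::map<Rank, Bytes> per_rank;
    for (auto& [pid, data] : chunks) {
        Bytes& out = per_rank[owner(pid)];  // creates an empty buffer on first use
        out.insert(out.end(), data.begin(), data.end());
    }
    chunks.clear();  // the input was passed as an rvalue and is consumed
    return per_rank;
}
```

Because `std::unordered_map` has no defined iteration order, the order of the concatenated bytes within one rank's buffer is unspecified in this sketch.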
| std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract | ( | PartID | pid | ) |
Extract all chunks belonging to the specified partition.
It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.
To ensure the partition is complete, use wait_any(), wait_on(), or another appropriate synchronization mechanism beforehand.
| pid | The ID of the partition to extract. |
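| static constexpr uint64_t rapidsmpf::shuffler::Shuffler::extract_counter | ( | detail::ChunkID | cid | ) |
inline static constexpr |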
Extract the counter from a chunk ID.
| cid | The chunk ID. |
Definition at line 288 of file shuffler.hpp.
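| static constexpr std::pair<Rank, uint64_t> rapidsmpf::shuffler::Shuffler::extract_info | ( | detail::ChunkID | cid | ) |
inline static constexpr |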
Extract the rank and counter from a chunk ID.
| cid | The chunk ID. |
Definition at line 306 of file shuffler.hpp.
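| static constexpr Rank rapidsmpf::shuffler::Shuffler::extract_rank | ( | detail::ChunkID | cid | ) |
inline static constexpr |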
Extract the rank from a chunk ID.
| cid | The chunk ID. |
Definition at line 297 of file shuffler.hpp.
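The three extractors can be understood through a small sketch of the bit layout. The values of `chunk_id_counter_bits` and `counter_mask` come from this page. Everything else is an assumption made for illustration: that the counter occupies the low 38 bits and the rank the remaining high bits, that `ChunkID` is a 64-bit unsigned integer, and the hypothetical `make_chunk_id` helper.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

using Rank = std::int32_t;      // stand-in type (assumption)
using ChunkID = std::uint64_t;  // stand-in for detail::ChunkID (assumption)

constexpr int chunk_id_counter_bits = 38;
constexpr std::uint64_t counter_mask =
    (std::uint64_t(1) << chunk_id_counter_bits) - 1;

// Hypothetical helper: packs a non-negative rank into the high bits and the
// counter into the low 38 bits.
constexpr ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    return (static_cast<std::uint64_t>(rank) << chunk_id_counter_bits)
           | (counter & counter_mask);
}

// Low 38 bits: the counter.
constexpr std::uint64_t extract_counter(ChunkID cid) {
    return cid & counter_mask;
}

// Remaining high bits: the rank (assumed layout).
constexpr Rank extract_rank(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}

constexpr std::pair<Rank, std::uint64_t> extract_info(ChunkID cid) {
    return {extract_rank(cid), extract_counter(cid)};
}

// Round trip checked at compile time.
static_assert(extract_rank(make_chunk_id(3, 42)) == 3);
static_assert(extract_counter(make_chunk_id(3, 42)) == 42);
```

Under this assumed layout a 64-bit ID leaves 26 bits for the rank, and a counter larger than `counter_mask` would be truncated by `make_chunk_id`.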
| bool rapidsmpf::shuffler::Shuffler::finished | ( | ) | const |
Check if all partitions are finished.
| void rapidsmpf::shuffler::Shuffler::insert | ( | std::unordered_map< PartID, PackedData > && | chunks | ) |
Insert a bunch of packed (serialized) chunks into the shuffle.
| chunks | A map of partition IDs and their packed chunks. |
| void rapidsmpf::shuffler::Shuffler::insert_finished | ( | PartID | pid | ) |
Insert a finish mark for a partition.
This tells the shuffler that no more chunks of the specified partition are coming.
| pid | The partition ID to mark as finished. |
| void rapidsmpf::shuffler::Shuffler::insert_finished | ( | std::vector< PartID > && | pids | ) |
Insert a finish mark for a list of partitions.
| pids | The list of partition IDs to mark as finished. |
| bool rapidsmpf::shuffler::Shuffler::is_finished | ( | PartID | pid | ) | const |
Check if a partition is finished.
| pid | The partition ID to check. |
| std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions | ( | ) | const |
Returns the local partition IDs owned by the shuffler.
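| static std::vector<PartID> rapidsmpf::shuffler::Shuffler::local_partitions | ( | std::shared_ptr< Communicator > const & | comm, |
| PartID | total_num_partitions, | ||
| PartitionOwner | partition_owner | ||
| ) | static |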
Returns the local partition IDs owned by the current node.
| comm | The communicator to use. |
| total_num_partitions | Total number of partitions in the shuffle. |
| partition_owner | Function that determines partition ownership. |
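One plausible way to compute the local partitions is to ask the ownership function about every partition ID and keep those owned by the current rank. The sketch below assumes exactly that. It replaces the communicator with an explicit `my_rank` argument, uses stand-in types, and is not taken from the library's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

using PartID = std::uint32_t;  // stand-in type (assumption)
using Rank = std::int32_t;     // stand-in type (assumption)

// Hypothetical free function: returns every partition ID in
// [0, total_num_partitions) whose owner is `my_rank`, in ascending order.
std::vector<PartID> local_partitions(
    Rank my_rank,
    PartID total_num_partitions,
    std::function<Rank(PartID)> const& partition_owner
) {
    assert(static_cast<bool>(partition_owner));
    std::vector<PartID> ret;
    for (PartID pid = 0; pid < total_num_partitions; ++pid) {
        if (partition_owner(pid) == my_rank) {
            ret.push_back(pid);
        }
    }
    return ret;
}
```

For example, with three ranks and an owner of `pid % 3`, rank 1 out of 8 partitions would own partitions 1, 4, and 7.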
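| static Rank rapidsmpf::shuffler::Shuffler::round_robin | ( | std::shared_ptr< Communicator > const & | comm, |
| PartID | pid | ||
| ) | inline static |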
A PartitionOwner that distributes the partitions using round robin.
| comm | The communicator to use. |
| pid | The partition ID to query. |
Definition at line 64 of file shuffler.hpp.
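Round-robin ownership is commonly expressed as the partition ID modulo the number of ranks. The sketch below shows that formulation as an assumption, not as the library's definition. It takes the number of ranks directly instead of a communicator, and the type aliases are stand-ins.

```cpp
#include <cassert>
#include <cstdint>

using PartID = std::uint32_t;  // stand-in type (assumption)
using Rank = std::int32_t;     // stand-in type (assumption)

// Hypothetical round-robin owner: partition `pid` belongs to rank
// `pid % nranks`, so consecutive partitions cycle through the ranks.
inline Rank round_robin(Rank nranks, PartID pid) {
    assert(nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}
```

With four ranks, partitions 0 through 7 would map to ranks 0, 1, 2, 3, 0, 1, 2, 3, which spreads partitions evenly whenever the partition count is a multiple of the rank count.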
| void rapidsmpf::shuffler::Shuffler::shutdown | ( | ) |
Shutdown the shuffle, blocking until all inflight communication is done.
| std::logic_error | If the shuffler is already inactive. |
| std::size_t rapidsmpf::shuffler::Shuffler::spill | ( | std::optional< std::size_t > | amount = std::nullopt | ) |
Spills data out of device memory if necessary.
This function has two modes:
If amount is specified, it tries to spill at least amount bytes of device memory.
If amount is not specified (the default case), it spills based on the current available device memory returned by the buffer resource.
In both modes, it adds to the "spill-device-limit-breach" statistic if not enough memory could be spilled.
| amount | An optional amount of memory to spill. If not provided, the function will check the current available device memory. |
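The two modes can be sketched as a small decision function. This is a hedged illustration, not the library's logic. It assumes the buffer resource reports available device memory as a signed value that goes negative when usage exceeds the limit. The names `spill_target`, `SpillResult`, and `simulate_spill` are hypothetical, and `spillable` represents how many bytes could actually be moved off the device.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical: how many bytes a spill call should try to free.
// `available` < 0 is assumed to mean the device limit is exceeded by that much.
std::size_t spill_target(std::optional<std::size_t> amount, std::int64_t available) {
    assert(available != std::numeric_limits<std::int64_t>::min());
    if (amount.has_value()) {
        return *amount;  // mode 1: the caller requested an explicit amount
    }
    // mode 2: spill only as much as needed to get back under the limit
    return available < 0 ? static_cast<std::size_t>(-available) : 0;
}

struct SpillResult {
    std::size_t spilled;  // bytes actually spilled
    bool limit_breached;  // true if less than the target could be spilled
};

// Hypothetical simulation of one spill call.
SpillResult simulate_spill(
    std::optional<std::size_t> amount, std::int64_t available, std::size_t spillable
) {
    std::size_t const target = spill_target(amount, available);
    std::size_t const spilled = std::min(target, spillable);
    return {spilled, spilled < target};
}
```

The `limit_breached` flag corresponds to the case in which the "spill-device-limit-breach" statistic would be incremented.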
| std::string rapidsmpf::shuffler::Shuffler::str | ( | ) | const |
Returns a description of this instance.
| PartID rapidsmpf::shuffler::Shuffler::wait_any | ( | std::optional< std::chrono::milliseconds > | timeout = {} | ) |
Wait for any partition to finish.
| timeout | Optional timeout (ms) to wait. |
| std::runtime_error | if the timeout is reached. |
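The timeout behaviour can be illustrated with a condition-variable sketch. It is a self-contained mock, not the shuffler's implementation. `FinishTracker` and `mark_finished` are hypothetical names, `PartID` is a stand-in alias, and the choice to remove a partition from the finished set once it has been returned is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <set>
#include <stdexcept>

using PartID = std::uint32_t;  // stand-in type (assumption)

// Hypothetical mock of the "wait for any finished partition" logic.
class FinishTracker {
  public:
    // Records a finished partition and wakes up any waiting thread.
    void mark_finished(PartID pid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_.insert(pid);
        }
        cv_.notify_all();
    }

    // Returns one finished partition, blocking until one exists.
    // Throws std::runtime_error if `timeout` is given and expires first.
    PartID wait_any(std::optional<std::chrono::milliseconds> timeout = {}) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto ready = [this] { return !finished_.empty(); };
        if (timeout.has_value()) {
            if (!cv_.wait_for(lock, *timeout, ready)) {
                throw std::runtime_error("wait_any: timeout reached");
            }
        } else {
            cv_.wait(lock, ready);
        }
        PartID const pid = *finished_.begin();
        finished_.erase(finished_.begin());  // assumption: each ID is returned once
        return pid;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::set<PartID> finished_;
};
```

A caller that passes a timeout should be prepared to catch `std::runtime_error`, while a caller that omits it blocks indefinitely until some partition finishes.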
| void rapidsmpf::shuffler::Shuffler::wait_on | ( | PartID | pid, |
| std::optional< std::chrono::milliseconds > | timeout = {} |
||
| ) |
Wait for a specific partition to finish (blocking).
| pid | The desired partition ID. |
| timeout | Optional timeout (ms) to wait. |
| std::runtime_error | if the timeout is reached. |