Shuffle service for all-to-all style communication of partitioned data. More...
#include <shuffler.hpp>
Public Types | |
| using | PartitionOwner = std::function< Rank(std::shared_ptr< Communicator > const &, PartID, PartID)> |
| Function that, given a Communicator, a PartID, and the total partition count, returns the rapidsmpf::Rank of the owning node. | |
| using | FinishedCallback = std::function< void()> |
| Callback function type called when all partitions are finished and data can be extracted. More... | |
Public Member Functions | |
| Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, FinishedCallback &&finished_callback, PartitionOwner partition_owner=round_robin, std::unique_ptr< communicator::MetadataPayloadExchange > mpe=nullptr) | |
| Construct a new shuffler for a single shuffle. More... | |
| Shuffler (std::shared_ptr< Communicator > comm, OpID op_id, PartID total_num_partitions, BufferResource *br, PartitionOwner partition_owner=round_robin, std::unique_ptr< communicator::MetadataPayloadExchange > mpe=nullptr) | |
| Construct a new shuffler for a single shuffle. More... | |
| std::shared_ptr< Communicator > const & | comm () const noexcept |
| Gets the communicator associated with this Shuffler. More... | |
| Shuffler (Shuffler const &)=delete | |
| Shuffler & | operator= (Shuffler const &)=delete |
| void | shutdown () |
| Shutdown the shuffle, blocking until all inflight communication is done. More... | |
| void | insert (std::unordered_map< PartID, PackedData > &&chunks) |
| Insert a bunch of packed (serialized) chunks into the shuffle. More... | |
| void | insert_finished () |
| Signal that no more data will be inserted into the shuffle. More... | |
| std::vector< PackedData > | extract (PartID pid) |
| Extract all chunks belonging to the specified partition. More... | |
| bool | finished () const |
| Check if all partitions are finished. More... | |
| void | wait (std::optional< std::chrono::milliseconds > timeout={}) |
| Wait for all partitions to finish (blocking). More... | |
| std::size_t | spill (std::optional< std::size_t > amount=std::nullopt) |
| Spills data out of device memory if necessary. More... | |
| std::string | str () const |
| Returns a description of this instance. More... | |
| std::span< PartID const > | local_partitions () const |
| Returns the local partition IDs owned by the shuffler. More... | |
Static Public Member Functions | |
| static Rank | round_robin (std::shared_ptr< Communicator > const &comm, PartID pid, [[maybe_unused]] PartID total_num_partitions) |
A PartitionOwner that distributes partitions using round robin assignment. More... | |
| static Rank | contiguous (std::shared_ptr< Communicator > const &comm, PartID pid, PartID total_num_partitions) |
A PartitionOwner that assigns contiguous partition ID ranges to ranks. More... | |
| static std::vector< PartID > | local_partitions (std::shared_ptr< Communicator > const &comm, PartID total_num_partitions, PartitionOwner partition_owner) |
| Returns the local partition IDs owned by the current node. More... | |
| static constexpr Rank | extract_rank (detail::ChunkID cid) |
| Extract the rank from a chunk ID. More... | |
Public Attributes | |
| PartID const | total_num_partitions |
| Total number of partitions in the shuffle. | |
| PartitionOwner const | partition_owner |
| Function to determine partition ownership. | |
Static Public Attributes | |
| static constexpr int | chunk_id_counter_bits = 38 |
| The number of bits used to store the counter in a chunk ID. | |
Shuffle service for all-to-all style communication of partitioned data.
The Shuffler class provides an interface for performing a shuffle operation on distributed data, using a partitioning scheme to distribute and collect data chunks across different ranks.
Definition at line 46 of file shuffler.hpp.
| using rapidsmpf::shuffler::Shuffler::FinishedCallback = std::function<void()> |
Callback function type called when all partitions are finished and data can be extracted.
Definition at line 111 of file shuffler.hpp.
| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > | comm, |
| OpID | op_id, | ||
| PartID | total_num_partitions, | ||
| BufferResource * | br, | ||
| FinishedCallback && | finished_callback, | ||
| PartitionOwner | partition_owner = round_robin, |
||
| std::unique_ptr< communicator::MetadataPayloadExchange > | mpe = nullptr |
||
| ) |
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| op_id | The operation ID of the shuffle. |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary buffers and the shuffle result. |
| finished_callback | Callback to notify when all partitions are finished. |
| partition_owner | Function to determine partition ownership. |
| mpe | Optional custom metadata payload exchange. If not provided, uses the default tag-based implementation. |
Note: it is safe to reuse op_id as soon as wait() has completed locally.
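| rapidsmpf::shuffler::Shuffler::Shuffler | ( | std::shared_ptr< Communicator > | comm, |
| OpID | op_id, | ||
| PartID | total_num_partitions, | ||
| BufferResource * | br, | ||
| PartitionOwner | partition_owner = round_robin, |
| std::unique_ptr< communicator::MetadataPayloadExchange > | mpe = nullptr |
| ) | inline |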
Construct a new shuffler for a single shuffle.
| comm | The communicator to use. |
| op_id | The operation ID of the shuffle. This ID is unique to this operation and should not be reused until all nodes have called Shuffler::shutdown(). |
| total_num_partitions | Total number of partitions in the shuffle. |
| br | Buffer resource used to allocate temporary buffers and the shuffle result. |
| partition_owner | Function to determine partition ownership. |
| mpe | Optional custom metadata payload exchange. If not provided, uses the default tag-based implementation. |
Definition at line 158 of file shuffler.hpp.
| std::shared_ptr< Communicator > const & rapidsmpf::shuffler::Shuffler::comm | ( | ) | const | inline noexcept |
Gets the communicator associated with this Shuffler.
Definition at line 183 of file shuffler.hpp.
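| Rank rapidsmpf::shuffler::Shuffler::contiguous | ( | std::shared_ptr< Communicator > const & | comm, |
| PartID | pid, | ||
| PartID | total_num_partitions | ||
| ) | inline static |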
A PartitionOwner that assigns contiguous partition ID ranges to ranks.
Rank 0 gets [0, k), rank 1 gets [k, 2k), and so on. Useful for sorting, because each rank's local_partitions() are then adjacent and in order.
| comm | The communicator to use. |
| pid | The partition ID to query. |
| total_num_partitions | Total number of partitions (must match the shuffle). |
Definition at line 82 of file shuffler.hpp.
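The contiguous assignment described above can be sketched in a few lines of standalone C++. This is an illustrative model only, not the library's implementation: `Rank` and `PartID` are stand-in integer aliases, the hypothetical `contiguous_owner` takes the number of ranks directly instead of a Communicator, and the block size k is assumed to be the total partition count divided by the number of ranks, rounded up.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in aliases (assumptions); the real types are defined by the library.
using Rank = std::int32_t;
using PartID = std::uint32_t;

// Hypothetical helper: owner of `pid` when partition IDs are split into
// contiguous blocks of k IDs, where k = ceil(total / nranks) (assumed).
inline Rank contiguous_owner(Rank nranks, PartID pid, PartID total) {
    assert(nranks > 0 && pid < total);
    PartID const n = static_cast<PartID>(nranks);
    PartID const k = (total + n - 1) / n;  // block size, rounded up
    return static_cast<Rank>(pid / k);     // rank r owns [r*k, (r+1)*k)
}
```

With 10 partitions on 4 ranks, this model gives k = 3, so rank 0 owns partitions 0 to 2 and rank 3 owns only partition 9.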
| std::vector<PackedData> rapidsmpf::shuffler::Shuffler::extract | ( | PartID | pid | ) |
Extract all chunks belonging to the specified partition.
It is valid to extract a partition that has not yet been fully received. In such cases, only the chunks received so far are returned.
To ensure the partition is complete, use wait() or another appropriate synchronization mechanism beforehand.
| pid | The ID of the partition to extract. |
| Rank rapidsmpf::shuffler::Shuffler::extract_rank | ( | detail::ChunkID | cid | ) | inline static constexpr |
Extract the rank from a chunk ID.
| cid | The chunk ID. |
Definition at line 287 of file shuffler.hpp.
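One plausible way to read chunk_id_counter_bits together with extract_rank() is that a chunk ID packs a per-rank counter into its low 38 bits and the rank into the remaining high bits. The sketch below models that layout; the layout itself, the 64-bit `ChunkID` alias, and the helper names `make_chunk_id` and `extract_rank_model` are assumptions made for illustration, not the library's definitions.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in aliases (assumptions); the real types are defined by the library.
using Rank = std::int32_t;
using ChunkID = std::uint64_t;

// Value taken from the class documentation.
constexpr int chunk_id_counter_bits = 38;

// Hypothetical: build an ID with the rank in the high bits and the counter
// in the low `chunk_id_counter_bits` bits (assumed layout).
constexpr ChunkID make_chunk_id(Rank rank, std::uint64_t counter) {
    assert(rank >= 0);
    return (static_cast<ChunkID>(rank) << chunk_id_counter_bits)
           | (counter & ((ChunkID{1} << chunk_id_counter_bits) - 1));
}

// Hypothetical: recover the rank by shifting the counter bits away.
constexpr Rank extract_rank_model(ChunkID cid) {
    return static_cast<Rank>(cid >> chunk_id_counter_bits);
}
```

Under this assumed layout the rank can be recovered with a single shift, without inspecting the counter.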
| bool rapidsmpf::shuffler::Shuffler::finished | ( | ) | const |
Check if all partitions are finished.
| void rapidsmpf::shuffler::Shuffler::insert | ( | std::unordered_map< PartID, PackedData > && | chunks | ) |
Insert a bunch of packed (serialized) chunks into the shuffle.
Note: the caller must ensure that insert_finished() is called after all insert() calls have completed.
| chunks | A map of partition IDs and their packed chunks. |
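The ordering requirement between insert() and insert_finished() can be illustrated with a small standalone model. `MockShuffler` and `insert_from_threads` are hypothetical names that only mimic the call-ordering contract (integers stand in for packed chunks); they are not part of the library. Joining the worker threads is one way to guarantee that every insert() has completed before insert_finished() runs.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in that models only the ordering contract.
class MockShuffler {
  public:
    void insert(int chunk) {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(!finished_);  // inserting after insert_finished() breaks the contract
        chunks_.push_back(chunk);
    }
    void insert_finished() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(!finished_);  // documented as "must be called exactly once"
        finished_ = true;
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return chunks_.size();
    }
    bool insertion_finished() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return finished_;
    }

  private:
    mutable std::mutex mutex_;
    std::vector<int> chunks_;
    bool finished_ = false;
};

// Insert from several threads, then signal completion from the caller.
inline std::size_t insert_from_threads(MockShuffler& shuffler, int num_threads,
                                       int chunks_per_thread) {
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; ++t) {
        workers.emplace_back([&shuffler, t, chunks_per_thread] {
            for (int i = 0; i < chunks_per_thread; ++i) {
                shuffler.insert(t * chunks_per_thread + i);
            }
        });
    }
    // join() orders every insert() before the insert_finished() call below.
    for (auto& w : workers) {
        w.join();
    }
    shuffler.insert_finished();
    return shuffler.size();
}
```

Any other synchronization that orders the inserts before the final call (for example a barrier or a counting latch) would serve the same purpose.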
| void rapidsmpf::shuffler::Shuffler::insert_finished | ( | ) |
Signal that no more data will be inserted into the shuffle.
This informs the shuffler that this rank has finished inserting data. Must be called exactly once.
Note: if multiple threads are insert()ing, you must establish a happens-before relationship between the completion of all insert()s and the final call to insert_finished().
| std::span<PartID const> rapidsmpf::shuffler::Shuffler::local_partitions | ( | ) | const |
Returns the local partition IDs owned by the shuffler.
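| std::vector< PartID > rapidsmpf::shuffler::Shuffler::local_partitions | ( | std::shared_ptr< Communicator > const & | comm, |
| PartID | total_num_partitions, | ||
| PartitionOwner | partition_owner | ||
| ) | static |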
Returns the local partition IDs owned by the current node.
| comm | The communicator to use. |
| total_num_partitions | Total number of partitions in the shuffle. |
| partition_owner | Function that determines partition ownership. |
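Conceptually, the local partitions of a rank are the partition IDs whose owner function returns that rank. The standalone sketch below models this with simplified stand-ins: `OwnerFn` takes the number of ranks instead of a Communicator, and `local_partitions_of` and `round_robin_model` are hypothetical helpers, not library functions.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Stand-in aliases (assumptions); the real types are defined by the library.
using Rank = std::int32_t;
using PartID = std::uint32_t;
// Simplified stand-in for PartitionOwner: rank count instead of a Communicator.
using OwnerFn = std::function<Rank(Rank nranks, PartID pid, PartID total)>;

// Hypothetical owner function used for the example: pid modulo rank count.
inline Rank round_robin_model(Rank nranks, PartID pid, PartID /*total*/) {
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}

// Collect, in increasing order, every partition ID owned by `my_rank`.
inline std::vector<PartID> local_partitions_of(Rank my_rank, Rank nranks,
                                               PartID total, OwnerFn const& owner) {
    assert(static_cast<bool>(owner));
    std::vector<PartID> ret;
    for (PartID pid = 0; pid < total; ++pid) {
        if (owner(nranks, pid, total) == my_rank) {
            ret.push_back(pid);
        }
    }
    return ret;
}
```

For rank 1 of 3 with 7 partitions and the round robin model, the result is {1, 4}.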
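| Rank rapidsmpf::shuffler::Shuffler::round_robin | ( | std::shared_ptr< Communicator > const & | comm, |
| PartID | pid, | ||
| [[maybe_unused]] PartID | total_num_partitions | ||
| ) | inline static |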
A PartitionOwner that distributes partitions using round robin assignment.
| comm | The communicator to use. |
| pid | The partition ID to query. |
| total_num_partitions | Total number of partitions (unused). |
Definition at line 63 of file shuffler.hpp.
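Round robin assignment is commonly expressed as the partition ID modulo the number of ranks, which is also why the total partition count is not needed. The sketch below is a standalone model under that assumption: `Rank` and `PartID` are stand-in aliases, and the hypothetical `round_robin_owner` takes the rank count directly instead of a Communicator.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in aliases (assumptions); the real types are defined by the library.
using Rank = std::int32_t;
using PartID = std::uint32_t;

// Hypothetical helper: owner of `pid` under round robin assignment
// across `nranks` ranks (assumed to be pid modulo nranks).
inline Rank round_robin_owner(Rank nranks, PartID pid) {
    assert(nranks > 0);
    return static_cast<Rank>(pid % static_cast<PartID>(nranks));
}
```

With 4 ranks, partitions 0, 4, 8, ... map to rank 0, partitions 1, 5, 9, ... to rank 1, and so on.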
| void rapidsmpf::shuffler::Shuffler::shutdown | ( | ) |
Shutdown the shuffle, blocking until all inflight communication is done.
| std::logic_error | If the shuffler is already inactive. |
| std::size_t rapidsmpf::shuffler::Shuffler::spill | ( | std::optional< std::size_t > | amount = std::nullopt | ) |
Spills data out of device memory if necessary.
This function has two modes:
- If amount is specified, it tries to spill at least amount bytes of device memory.
- If amount is not specified (the default case), it spills based on the currently available device memory returned by the buffer resource.
| amount | An optional amount of memory to spill. If not provided, the function will check the currently available device memory. |
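The two modes can be pictured as a choice of spill target. The sketch below is a hedged model, not the library's logic: `spill_target` is a hypothetical helper, and it assumes that the buffer resource reports available device memory as a signed byte count where a negative value means the limit is exceeded by that many bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical helper: number of bytes a spill attempt would aim for.
// `available` models the available device memory in bytes; a negative
// value is assumed to mean "over the limit by that many bytes".
inline std::size_t spill_target(std::optional<std::size_t> amount,
                                std::int64_t available) {
    if (amount.has_value()) {
        return *amount;  // explicit mode: aim for at least `amount` bytes
    }
    // Default mode: spill only the deficit, if there is one.
    return available < 0 ? static_cast<std::size_t>(-available) : 0;
}
```

In this model, the default mode is a no-op whenever device memory is not oversubscribed.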
| std::string rapidsmpf::shuffler::Shuffler::str | ( | ) | const |
Returns a description of this instance.
| void rapidsmpf::shuffler::Shuffler::wait | ( | std::optional< std::chrono::milliseconds > | timeout = {} | ) |
Wait for all partitions to finish (blocking).
| timeout | Optional timeout (ms) to wait. |
| std::runtime_error | If the timeout is reached. |
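A blocking wait with an optional timeout that throws std::runtime_error on expiry can be modeled with a condition variable. The sketch below is a generic illustration of that pattern using only the standard library; `FinishedFlag` and `wait_times_out` are hypothetical names and do not reflect how the shuffler is implemented internally.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <stdexcept>

// Hypothetical stand-in for "all partitions are finished".
class FinishedFlag {
  public:
    void mark_finished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until finished; throws std::runtime_error if `timeout` expires first.
    void wait(std::optional<std::chrono::milliseconds> timeout = {}) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto const done = [this] { return finished_; };
        if (timeout.has_value()) {
            if (!cv_.wait_for(lock, *timeout, done)) {
                throw std::runtime_error("wait timed out");
            }
        } else {
            cv_.wait(lock, done);
        }
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool finished_ = false;
};

// Returns true if waiting on `flag` for `timeout` ends with the timeout error.
inline bool wait_times_out(FinishedFlag& flag, std::chrono::milliseconds timeout) {
    try {
        flag.wait(timeout);
    } catch (std::runtime_error const&) {
        return true;
    }
    return false;
}
```

A caller that cannot block indefinitely would pass a timeout and treat the exception as a signal to retry or abort.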