AllGather communication service.
#include <allgather.hpp>
Public Types

| Type | Member | Description |
|---|---|---|
| enum class | Ordered : bool { NO , YES } | Tag requesting ordering for extraction. |

Public Member Functions

| Return type | Member | Description |
|---|---|---|
| void | insert (std::uint64_t sequence_number, PackedData &&packed_data) | Insert packed data into the allgather operation. |
| void | insert_finished () | Mark that this rank has finished contributing data. |
| bool | finished () const noexcept | Check if the allgather operation has completed. |
| std::vector< PackedData > | wait_and_extract (Ordered ordered=Ordered::YES, std::chrono::milliseconds timeout=std::chrono::milliseconds{-1}) | Wait for completion and extract all gathered data. |
| std::vector< PackedData > | extract_ready () | Extract any available partitions. |
| | AllGather (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, BufferResource *br, std::shared_ptr< Statistics > statistics=Statistics::disabled(), std::function< void(void)> &&finished_callback=nullptr) | Construct a new allgather operation. |
| | AllGather (AllGather const &)=delete | Deleted copy constructor. |
| AllGather & | operator= (AllGather const &)=delete | Deleted copy assignment operator. |
| | AllGather (AllGather &&)=delete | Deleted move constructor. |
| AllGather & | operator= (AllGather &&)=delete | Deleted move assignment operator. |
| | ~AllGather () | Destructor. |
| ProgressThread::ProgressState | event_loop () | Main event loop for processing allgather operations. |
The class provides a communication service in which each rank contributes data and every rank receives the contributions of all ranks.
The implementation uses a ring broadcast. Each rank receives a contribution from its left neighbour, forwards the message to its right neighbour (unless the message has reached the end of the ring) and then stores the contribution locally. On P ranks, if every rank contributes a message of size N/P (so that the gathered result has total size N), the cost per insertion is

$$(P - 1)\,\alpha + N\,\frac{P - 1}{P}\,\beta,$$

where $\alpha$ is the network latency and $\beta$ the inverse bandwidth. Although the latency term is linear in P (rather than logarithmic, as it is for Bruck's algorithm or recursive doubling), MPI implementations typically find that ring algorithms perform better for large messages, since each rank communicates only with its nearest neighbours.
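To make the ring broadcast concrete, here is a minimal single-process simulation. It steps all P ranks in lockstep, which a real asynchronous implementation does not do. `Message`, `ring_allgather` and `ring_cost` are hypothetical names for illustration, not rapidsmpf API, and an `int` payload stands in for `PackedData`. `ring_cost` evaluates the cost formula with N taken as the total size of the gathered data.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// A contribution tagged with the rank that inserted it (int stands in for PackedData).
struct Message {
    std::size_t origin;
    int payload;
};

// Hypothetical single-process simulation of a ring broadcast allgather.
// inputs[r] is rank r's contribution; returns what each rank stores locally.
std::vector<std::vector<Message>> ring_allgather(std::vector<int> const& inputs,
                                                 std::size_t* messages_sent = nullptr) {
    std::size_t const P = inputs.size();
    assert(P > 0);
    std::vector<std::vector<Message>> stored(P);
    std::vector<Message> outgoing(P);  // message each rank sends to its right next step
    for (std::size_t r = 0; r < P; ++r) {
        stored[r].push_back({r, inputs[r]});  // keep own contribution
        outgoing[r] = {r, inputs[r]};
    }
    std::size_t sent = 0;
    // P - 1 steps: afterwards every contribution has visited every rank, so the
    // loop stops and nothing travels past the end of the ring.
    for (std::size_t step = 1; step < P; ++step) {
        std::vector<Message> next(P);
        for (std::size_t r = 0; r < P; ++r) {
            Message const msg = outgoing[(r + P - 1) % P];  // receive from left neighbour
            ++sent;
            next[r] = msg;             // forward to the right neighbour in the next step
            stored[r].push_back(msg);  // store the contribution locally
        }
        outgoing = next;
    }
    if (messages_sent != nullptr) *messages_sent = sent;
    return stored;
}

// Cost model: (P - 1) alpha + N ((P - 1) / P) beta, with N the total gathered size.
double ring_cost(double P, double N, double alpha, double beta) {
    return (P - 1) * alpha + N * ((P - 1) / P) * beta;
}
```

With four ranks, every rank ends up holding all four contributions after three steps, and P(P - 1) = 12 point-to-point messages are sent in total.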
Definition at line 372 of file allgather.hpp.
enum class rapidsmpf::allgather::AllGather::Ordered : bool
Tag requesting ordering for extraction.
| Enumerator | Description |
|---|---|
| NO | Extraction is unordered. |
| YES | Extraction is ordered. |
Definition at line 396 of file allgather.hpp.
rapidsmpf::allgather::AllGather::AllGather(std::shared_ptr< Communicator > comm,
                                           std::shared_ptr< ProgressThread > progress_thread,
                                           OpID op_id,
                                           BufferResource *br,
                                           std::shared_ptr< Statistics > statistics = Statistics::disabled(),
                                           std::function< void(void)> &&finished_callback = nullptr)
Construct a new allgather operation.
| Parameter | Description |
|---|---|
| comm | The communicator used for communication. |
| progress_thread | The progress thread for asynchronous operations. |
| op_id | Unique operation identifier for this allgather. |
| br | Buffer resource for memory allocation. |
| statistics | Statistics collection instance (disabled by default). |
| finished_callback | Optional callback invoked when the partitions are locally finished. The progress thread is guaranteed to call it exactly once, when the allgather is locally ready. |
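One way to obtain the exactly-once guarantee described for `finished_callback` is an atomic flag that only the first successful readiness check can flip. `FinishedNotifier` is a hypothetical helper that illustrates the idea; it is not necessarily how rapidsmpf implements the callback.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper: runs an optional callback at most once, no matter how
// many times the owner reports that it is locally ready.
class FinishedNotifier {
  public:
    explicit FinishedNotifier(std::function<void(void)>&& cb) : cb_{std::move(cb)} {}

    // Called repeatedly (for example from a progress-thread loop); only the
    // first call with ready == true flips the flag and fires the callback.
    void notify_if_ready(bool ready) {
        if (!ready) return;
        bool expected = false;
        if (fired_.compare_exchange_strong(expected, true) && cb_) {
            cb_();
        }
    }

    bool fired() const noexcept { return fired_.load(); }

  private:
    std::function<void(void)> cb_;  // may be empty (nullptr callback)
    std::atomic<bool> fired_{false};
};
```

Because the callback parameter defaults to `nullptr`, the sketch checks that the `std::function` is non-empty before invoking it.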
rapidsmpf::allgather::AllGather::~AllGather()
Destructor.
If the AllGather is destroyed locally before its data has been extracted, there is no guarantee that in-flight communication will complete.

ProgressThread::ProgressState rapidsmpf::allgather::AllGather::event_loop()
Main event loop for processing allgather operations.
This method is called by the progress thread to handle ongoing communication and data transfers.
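The relationship between the progress thread and `event_loop()` can be sketched as a polling loop that keeps calling each registered function until it reports completion. This is a single-threaded toy under stated assumptions: `ProgressState`, its enumerators `InProgress` and `Done`, and `ToyProgressLoop` are hypothetical stand-ins and may not match rapidsmpf's `ProgressThread`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for ProgressThread::ProgressState; the real enumerators may differ.
enum class ProgressState { InProgress, Done };

// Minimal single-threaded model of a progress thread: it repeatedly calls each
// registered event-loop function and drops those that report Done.
class ToyProgressLoop {
  public:
    void add(std::function<ProgressState()> fn) { fns_.push_back(std::move(fn)); }

    // Runs until every registered function has reported Done; returns the
    // number of sweeps over the function list.
    std::size_t run() {
        std::size_t sweeps = 0;
        while (!fns_.empty()) {
            ++sweeps;
            std::vector<std::function<ProgressState()>> still_running;
            for (auto& fn : fns_) {
                if (fn() == ProgressState::InProgress) still_running.push_back(std::move(fn));
            }
            fns_ = std::move(still_running);
        }
        return sweeps;
    }

  private:
    std::vector<std::function<ProgressState()>> fns_;
};
```

A real progress thread runs this loop on its own thread so that communication advances while user code is blocked elsewhere.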
std::vector<PackedData> rapidsmpf::allgather::AllGather::extract_ready()
Extract any available partitions.
Example usage to drain an AllGather:
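A minimal sketch of such a drain loop follows. It is written against a hypothetical `ToyAllGather` that exposes only `finished()` and `extract_ready()`, with assumed semantics: `finished()` becomes true once all data has arrived locally, and `extract_ready()` returns whatever is ready without blocking. `Chunk` stands in for `PackedData`, and `drain` is a hypothetical helper.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

using Chunk = std::string;  // hypothetical stand-in for PackedData

// Toy model: each extract_ready() call returns what is ready, then lets one
// more chunk "arrive", mimicking data that trickles in while in flight.
class ToyAllGather {
  public:
    explicit ToyAllGather(std::vector<Chunk> incoming)
        : arriving_(incoming.begin(), incoming.end()) {}

    // True once every chunk has arrived locally (not necessarily extracted).
    bool finished() const noexcept { return arriving_.empty(); }

    std::vector<Chunk> extract_ready() {
        std::vector<Chunk> out = std::move(ready_);
        ready_.clear();
        if (!arriving_.empty()) {
            ready_.push_back(std::move(arriving_.front()));
            arriving_.pop_front();
        }
        return out;
    }

  private:
    std::deque<Chunk> arriving_;
    std::vector<Chunk> ready_;
};

// Drains any gather exposing finished() and extract_ready() into `sink`.
template <typename Gather, typename Sink>
void drain(Gather& ag, Sink&& sink) {
    while (!ag.finished()) {
        for (auto& c : ag.extract_ready()) sink(std::move(c));
    }
    // Data may have become ready in the same step that made finished() true.
    for (auto& c : ag.extract_ready()) sink(std::move(c));
}
```

Under these assumptions the final `extract_ready()` call is what picks up the last chunk; the toy deliberately models that case. In real code, consider yielding or sleeping between polls rather than spinning.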
bool rapidsmpf::allgather::AllGather::finished() const noexcept
Check if the allgather operation has completed.
void rapidsmpf::allgather::AllGather::insert(std::uint64_t sequence_number, PackedData &&packed_data)
Insert packed data into the allgather operation.
| Parameter | Description |
|---|---|
| sequence_number | Local ordered sequence number of the data. |
| packed_data | The data to contribute to the allgather. |
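The insertion protocol on a single rank can be sketched generically: number the local contributions 0, 1, 2, ... and call `insert_finished()` once nothing more will be inserted. `contribute_all` and `RecordingGather` are hypothetical helpers, `std::string` stands in for `PackedData`, and the check that rejects inserts after `insert_finished()` is an assumed contract, not one stated here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Contributes `chunks` in order with local sequence numbers 0, 1, 2, ..., then
// signals that this rank is done. Works with any type exposing
// insert(std::uint64_t, T&&) and insert_finished().
template <typename Gather, typename T>
void contribute_all(Gather& gather, std::vector<T> chunks) {
    std::uint64_t seq = 0;
    for (auto& chunk : chunks) {
        gather.insert(seq++, std::move(chunk));
    }
    gather.insert_finished();  // no further insert() calls from this rank
}

// Hypothetical recorder used only to show the call sequence.
struct RecordingGather {
    std::vector<std::pair<std::uint64_t, std::string>> inserted;
    bool done = false;

    void insert(std::uint64_t seq, std::string&& data) {
        assert(!done && "insert after insert_finished (assumed contract)");
        inserted.emplace_back(seq, std::move(data));
    }
    void insert_finished() { done = true; }
};
```

Taking the gather by reference matters for the real class, which is neither copyable nor movable.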
std::vector<PackedData> rapidsmpf::allgather::AllGather::wait_and_extract(Ordered ordered = Ordered::YES,
                                                                         std::chrono::milliseconds timeout = std::chrono::milliseconds{-1})
Wait for completion and extract all gathered data.
Blocks until the allgather operation completes and returns all collected data from all ranks.
| Parameter | Description |
|---|---|
| ordered | Whether the extracted data should be ordered. If so, the returned data is ordered first by rank and then by insertion order on that rank. |
| timeout | Optional maximum duration to wait. Negative values mean no timeout. |

| Exception | Condition |
|---|---|
| std::runtime_error | If the timeout is reached. |
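The documented behaviour of `wait_and_extract` (block until completion, treat a negative timeout as no timeout, throw `std::runtime_error` on timeout, and optionally order by rank and then by insertion order) can be modelled with a condition variable. `ToyGatherStore`, `Item`, `add()` and `mark_complete()` are hypothetical; the sketch sorts by sequence number, assuming sequence numbers follow insertion order on each rank.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical gathered item: origin rank, local sequence number, payload.
struct Item {
    int rank;
    std::uint64_t seq;
    std::string data;
};

class ToyGatherStore {
  public:
    enum class Ordered : bool { NO, YES };

    void add(Item item) {
        std::lock_guard<std::mutex> lock(mu_);
        items_.push_back(std::move(item));
    }

    void mark_complete() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            complete_ = true;
        }
        cv_.notify_all();
    }

    std::vector<Item> wait_and_extract(Ordered ordered = Ordered::YES,
                                       std::chrono::milliseconds timeout = std::chrono::milliseconds{-1}) {
        std::unique_lock<std::mutex> lock(mu_);
        auto const done = [this] { return complete_; };
        if (timeout.count() < 0) {
            cv_.wait(lock, done);  // negative timeout: wait indefinitely
        } else if (!cv_.wait_for(lock, timeout, done)) {
            throw std::runtime_error("wait_and_extract: timeout reached");
        }
        std::vector<Item> out = std::move(items_);
        items_.clear();
        if (ordered == Ordered::YES) {
            // Order by rank first, then by (assumed insertion-ordered) sequence number.
            std::sort(out.begin(), out.end(), [](Item const& a, Item const& b) {
                return a.rank != b.rank ? a.rank < b.rank : a.seq < b.seq;
            });
        }
        return out;
    }

  private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::vector<Item> items_;
    bool complete_ = false;
};
```

Using the predicate overloads of `wait` and `wait_for` guards against spurious wake-ups, and on timeout the items stay in the store rather than being lost.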