rapidsmpf::allgather::AllGather Class Reference

AllGather communication service.

#include <allgather.hpp>

Public Types

enum class  Ordered : bool { NO , YES }
 Tag requesting ordering for extraction.
 

Public Member Functions

void insert (std::uint64_t sequence_number, PackedData &&packed_data)
 Insert packed data into the allgather operation.
 
void insert_finished ()
 Mark that this rank has finished contributing data.
 
bool finished () const noexcept
 Check if the allgather operation has completed.
 
std::vector< PackedData > wait_and_extract (Ordered ordered=Ordered::YES, std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
 Wait for completion and extract all gathered data.
 
std::vector< PackedData > extract_ready ()
 Extract any available partitions.
 
 AllGather (std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, OpID op_id, BufferResource *br, std::shared_ptr< Statistics > statistics=Statistics::disabled(), std::function< void(void)> &&finished_callback=nullptr)
 Construct a new allgather operation.
 
 AllGather (AllGather const &)=delete
 Deleted copy constructor.
 
AllGather & operator= (AllGather const &)=delete
 Deleted copy assignment operator.
 
 AllGather (AllGather &&)=delete
 Deleted move constructor.
 
AllGather & operator= (AllGather &&)=delete
 Deleted move assignment operator.
 
 ~AllGather ()
 Destructor.
 
ProgressThread::ProgressState event_loop ()
 Main event loop for processing allgather operations.
 

Detailed Description

AllGather communication service.

The class provides a communication service in which each rank contributes data and every rank receives the data contributed by all ranks.

The implementation uses a ring broadcast. Each rank receives a contribution from its left neighbour, forwards the message to its right neighbour (unless at the end of the ring) and then stores the contribution locally. The cost on P ranks if each rank inserts a message of size N is

(P - 1) alpha + N ((P - 1) / P) beta

per insertion, where alpha is the network latency and beta the inverse bandwidth. Although the latency term is linear in P (rather than logarithmic, as it is for Bruck's algorithm or recursive doubling), MPI implementations typically observe that ring algorithms perform better for large messages, since all message passing is between nearest neighbours.
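As an illustration only, the ring broadcast and the cost model above can be sketched as a sequential C++ model. This is not the rapidsmpf implementation: ranks are plain integers, and simulate_ring_broadcast and ring_allgather_cost are hypothetical helpers. Each send is one nearest-neighbour hop, so every contribution crosses P - 1 links one after another, which corresponds to the (P - 1) alpha latency term.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sequential model of the ring broadcast on P ranks.
// Rank `origin` starts with its own contribution. Each rank that receives
// it stores it locally and forwards it to its right neighbour, unless that
// neighbour is `origin` (the end of the ring).
// On return, stored[r] lists the origins whose data rank r holds.
// Returns the total number of point-to-point sends (hops).
std::size_t simulate_ring_broadcast(int P, std::vector<std::vector<int>>& stored) {
    assert(P > 0);
    stored.assign(static_cast<std::size_t>(P), std::vector<int>{});
    std::size_t sends = 0;
    for (int origin = 0; origin < P; ++origin) {
        stored[origin].push_back(origin);  // the local contribution
        // Walk right around the ring until the next hop would return to origin.
        for (int current = origin; (current + 1) % P != origin; current = (current + 1) % P) {
            int right = (current + 1) % P;
            ++sends;  // one nearest-neighbour hop: current -> right
            stored[right].push_back(origin);
        }
    }
    return sends;
}

// The cost model quoted above for one insertion of size N:
// (P - 1) * alpha + N * ((P - 1) / P) * beta
double ring_allgather_cost(int P, double N, double alpha, double beta) {
    assert(P > 0);
    double const hops = P - 1;
    return hops * alpha + N * (hops / P) * beta;
}
```

For P = 4 the model performs 12 sends (each of the 4 contributions travels 3 hops), and every rank ends up holding all 4 contributions.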

Definition at line 372 of file allgather.hpp.

Member Enumeration Documentation

◆ Ordered

Tag requesting ordering for extraction.

Enumerator
NO 

Extraction is unordered.

YES 

Extraction is ordered.

Definition at line 396 of file allgather.hpp.
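A minimal sketch of what Ordered::YES means, assuming each gathered item can be tagged with the rank that inserted it and its local sequence number. GatheredChunk is a hypothetical stand-in (the real API returns PackedData). As documented for wait_and_extract, ordered output is sorted first by rank and then by insertion order on that rank; the sketch assumes that a rank's sequence numbers increase in insertion order.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for one gathered contribution.
struct GatheredChunk {
    int rank;                       // rank that inserted the chunk
    std::uint64_t sequence_number;  // local sequence number on that rank
    std::string payload;            // placeholder for the packed data
};

// Ordered::YES semantics: sort by rank, then by local sequence number.
void order_chunks(std::vector<GatheredChunk>& chunks) {
    std::sort(chunks.begin(), chunks.end(),
              [](GatheredChunk const& a, GatheredChunk const& b) {
                  if (a.rank != b.rank) return a.rank < b.rank;
                  return a.sequence_number < b.sequence_number;
              });
}
```

With Ordered::NO no such ordering is guaranteed.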

Constructor & Destructor Documentation

◆ AllGather()

rapidsmpf::allgather::AllGather::AllGather ( std::shared_ptr< Communicator >  comm,
std::shared_ptr< ProgressThread >  progress_thread,
OpID  op_id,
BufferResource *  br,
std::shared_ptr< Statistics >  statistics = Statistics::disabled(),
std::function< void(void)> &&  finished_callback = nullptr 
)

Construct a new allgather operation.

Parameters
comm: The communicator for communication.
progress_thread: The progress thread for asynchronous operations.
op_id: Unique operation identifier for this allgather.
br: Buffer resource for memory allocation.
statistics: Statistics collection instance (disabled by default).
finished_callback: Optional callback run when partitions are locally finished. The callback is guaranteed to be called by the progress thread exactly once when the allgather is locally ready.
Note
The caller promises that inserted buffers are stream-ordered with respect to their own stream, and extracted buffers are likewise guaranteed to be stream-ordered with respect to their own stream.
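Because finished_callback is documented to run exactly once on the progress thread, it can safely fulfil a std::promise that another thread waits on. The helpers below are hypothetical and not part of rapidsmpf; simulate_progress_thread merely stands in for the progress thread. std::function requires a copyable callable, so the promise is held through a std::shared_ptr rather than captured by value.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>

// Hypothetical helper: returns a callback that fulfils a promise and hands
// the matching future to the caller. Since the callback runs exactly once,
// set_value() cannot throw promise_already_satisfied.
std::function<void(void)> make_finished_callback(std::future<void>& done) {
    auto promise = std::make_shared<std::promise<void>>();
    done = promise->get_future();
    return [promise] { promise->set_value(); };
}

// Hypothetical stand-in for the progress thread invoking the callback once.
// In real use the callback would instead be passed as the last constructor
// argument, e.g.
//   AllGather(comm, progress_thread, op_id, br, Statistics::disabled(), std::move(cb));
void simulate_progress_thread(std::function<void(void)> cb) {
    std::thread t([cb = std::move(cb)] { cb(); });
    t.join();
}
```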

◆ ~AllGather()

rapidsmpf::allgather::AllGather::~AllGather ( )

Destructor.

Note
This operation is logically collective. If an AllGather is destroyed locally before its data has been extracted, there is no guarantee that in-flight communication will be completed.

Member Function Documentation

◆ event_loop()

ProgressThread::ProgressState rapidsmpf::allgather::AllGather::event_loop ( )

Main event loop for processing allgather operations.

This method is called by the progress thread to handle ongoing communication and data transfers.

Returns
The current progress state.
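To illustrate the polling model, here is a simplified, single-threaded sketch of a progress loop that repeatedly calls registered functions, such as event_loop(), until each reports completion. The ProgressState enumerators and drive_progress are hypothetical; they model, rather than reproduce, rapidsmpf's ProgressThread, which runs on its own thread.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical mirror of ProgressThread::ProgressState; the enumerator
// names are illustrative, not taken from this page.
enum class ProgressState { InProgress, Done };

// Calls every registered function once per sweep. A function that reports
// Done is retired and not called again. Returns the number of sweeps needed
// until all functions have finished.
std::size_t drive_progress(std::vector<std::function<ProgressState()>> fns) {
    std::size_t sweeps = 0;
    while (!fns.empty()) {
        ++sweeps;
        for (std::size_t i = 0; i < fns.size();) {
            if (fns[i]() == ProgressState::Done) {
                fns.erase(fns.begin() + static_cast<std::ptrdiff_t>(i));  // retire
            } else {
                ++i;
            }
        }
    }
    return sweeps;
}
```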

◆ extract_ready()

std::vector<PackedData> rapidsmpf::allgather::AllGather::extract_ready ( )

Extract any available partitions.

Returns
A vector containing available data (or empty if none).
Note
This is a non-blocking, unordered interface.

Example usage to drain an AllGather:

#include <algorithm>  // std::ranges::move
#include <iterator>   // std::back_inserter
#include <vector>

auto allgather = ...;          // create
...;                           // insert data
allgather->insert_finished();  // finish inserting
std::vector<PackedData> results;
// extract_ready() does not block, so poll until all ranks have finished.
while (!allgather->finished()) {
    std::ranges::move(allgather->extract_ready(), std::back_inserter(results));
}
// Extract any final chunks that may have arrived.
std::ranges::move(allgather->extract_ready(), std::back_inserter(results));

◆ finished()

bool rapidsmpf::allgather::AllGather::finished ( ) const noexcept

Check if the allgather operation has completed.

Returns
True if we have received all data and finish messages from all ranks.

◆ insert()

void rapidsmpf::allgather::AllGather::insert ( std::uint64_t  sequence_number,
PackedData &&  packed_data 
)

Insert packed data into the allgather operation.

Parameters
sequence_number: Local ordered sequence number of the data.
packed_data: The data to contribute to the allgather.

◆ wait_and_extract()

std::vector<PackedData> rapidsmpf::allgather::AllGather::wait_and_extract ( Ordered  ordered = Ordered::YES,
std::chrono::milliseconds  timeout = std::chrono::milliseconds{-1} 
)

Wait for completion and extract all gathered data.

Blocks until the allgather operation completes and returns all collected data from all ranks.

Parameters
ordered: Whether the extracted data should be ordered. If ordered, the returned data is ordered first by rank and then by insertion order on that rank.
timeout: Optional maximum duration to wait. Negative values mean no timeout.
Returns
A vector containing packed data from all participating ranks.
Exceptions
std::runtime_error: If the timeout is reached.
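A minimal sketch of the documented timeout contract (a negative timeout waits indefinitely; an expired timeout raises std::runtime_error), built on a condition variable. CompletionLatch is a hypothetical class; how rapidsmpf actually waits for completion is not described on this page.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical completion latch illustrating the timeout semantics.
class CompletionLatch {
  public:
    // Signal completion and wake all waiters.
    void mark_finished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        cv_.notify_all();
    }

    // Block until finished. Negative timeout: wait forever.
    // Otherwise throw std::runtime_error if not finished in time.
    void wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1}) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto done = [this] { return finished_; };
        if (timeout.count() < 0) {
            cv_.wait(lock, done);
        } else if (!cv_.wait_for(lock, timeout, done)) {
            throw std::runtime_error("wait timed out");
        }
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool finished_ = false;
};
```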
