rapidsmpf::coll::AllReduce Class Reference

AllReduce collective.

#include <allreduce.hpp>

Public Member Functions

AllReduce (std::shared_ptr< Communicator > comm, std::unique_ptr< Buffer > input, std::unique_ptr< Buffer > output, OpID op_id, ReduceOperator reduce_operator, std::function< void(void)> finished_callback=nullptr)
    Construct a new AllReduce operation.

std::shared_ptr< Communicator > const & comm () const noexcept
    Gets the communicator associated with this AllReduce.

AllReduce (AllReduce const &)=delete

AllReduce & operator= (AllReduce const &)=delete

AllReduce (AllReduce &&)=delete

AllReduce & operator= (AllReduce &&)=delete

~AllReduce () noexcept
    Destructor.

bool finished () const noexcept
    Check if the allreduce operation has completed.

std::pair< std::unique_ptr< Buffer >, std::unique_ptr< Buffer > > wait_and_extract (std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
    Wait for completion and extract the reduced data.

bool is_ready () const noexcept
    Check if reduced results are ready for extraction.
 

Detailed Description

AllReduce collective.

The implementation exchanges messages with a butterfly (recursive-doubling) scheme, which needs no extra memory and completes in O(log P) rounds for P ranks.
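A minimal sketch of the partner schedule in a butterfly exchange, assuming the rank count P is a power of two: in round k, rank r pairs with rank r XOR 2^k, so log2(P) rounds are enough for every rank to have combined contributions from all ranks. The function name `butterfly_partners` is hypothetical and not part of rapidsmpf; how the library handles rank counts that are not powers of two is not described on this page.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: returns the partner of `rank` in each round of a
// recursive-doubling (butterfly) exchange. Assumes nranks is a power of two.
std::vector<int> butterfly_partners(int rank, int nranks) {
    assert(nranks > 0 && (nranks & (nranks - 1)) == 0);  // power of two
    assert(rank >= 0 && rank < nranks);
    std::vector<int> partners;
    // Round k flips bit k of the rank: partner = rank XOR 2^k.
    for (int mask = 1; mask < nranks; mask <<= 1) {
        partners.push_back(rank ^ mask);
    }
    return partners;  // log2(nranks) entries
}
```

For example, with 8 ranks, rank 5 exchanges with ranks 4, 7 and 1 in rounds 0, 1 and 2.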

The actual reduction is implemented via a type-erased ReduceOperator that is supplied at construction time. Helper factories such as detail::make_host_reduce_operator or detail::make_device_reduce_operator can be used to build range-based reductions over contiguous arrays.
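To illustrate what "type-erased" means here, the sketch below hides the element type behind a `std::function` that only sees raw bytes, and a factory builds an elementwise reduction over contiguous arrays of `T`. `ByteReduceOp` and `make_elementwise_op` are hypothetical names; the real `ReduceOperator` and `detail::make_host_reduce_operator` may have different signatures, and this sketch covers host memory only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <vector>

// Hypothetical type-erased operator: combines `incoming` into `accum` in place,
// seeing only bytes, so the element type is hidden from the caller.
using ByteReduceOp = std::function<void(std::vector<std::byte> const& incoming,
                                        std::vector<std::byte>& accum)>;

// Hypothetical factory: builds an elementwise reduction over contiguous arrays
// of a trivially copyable T, e.g. make_elementwise_op<int>(std::plus<int>{}).
template <typename T, typename BinaryOp>
ByteReduceOp make_elementwise_op(BinaryOp op) {
    return [op](std::vector<std::byte> const& incoming, std::vector<std::byte>& accum) {
        assert(incoming.size() == accum.size() && accum.size() % sizeof(T) == 0);
        std::size_t const n = accum.size() / sizeof(T);
        for (std::size_t i = 0; i < n; ++i) {
            T a, b;
            // memcpy avoids alignment and aliasing issues with raw bytes.
            std::memcpy(&a, accum.data() + i * sizeof(T), sizeof(T));
            std::memcpy(&b, incoming.data() + i * sizeof(T), sizeof(T));
            T const r = op(a, b);
            std::memcpy(accum.data() + i * sizeof(T), &r, sizeof(T));
        }
    };
}
```

Because the caller only handles a `ByteReduceOp`, the same communication code can drive sums, maxima or custom operators over any element type.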

Note
No internal allocations are made. The memory types and sizes of the two provided buffers must match, and the provided reduction operator must be valid for the memory type of the buffers.
The reduction is safe to use with non-associative and non-commutative operators, in the sense that all participating ranks are guaranteed to receive the same result.
It is safe to reuse the op_id passed to the AllReduce constructor as soon as wait_and_extract has completed locally.
Warning
Behaviour of this object is undefined if it is destroyed before wait_and_extract has completed successfully.

Definition at line 66 of file allreduce.hpp.

Constructor & Destructor Documentation

◆ AllReduce()

rapidsmpf::coll::AllReduce::AllReduce(std::shared_ptr<Communicator> comm,
                                      std::unique_ptr<Buffer> input,
                                      std::unique_ptr<Buffer> output,
                                      OpID op_id,
                                      ReduceOperator reduce_operator,
                                      std::function<void(void)> finished_callback = nullptr)

Construct a new AllReduce operation.

Parameters
comm: The communicator to use for communication.
input: Local data to contribute to the reduction.
output: Allocated buffer in which to place the reduction result. Must have the same size and memory type as input. It is overwritten with the reduction result (any values already in the buffer are ignored).
op_id: Unique operation identifier for this allreduce.
reduce_operator: Type-erased reduction operator to use. See ReduceOperator.
finished_callback: Optional callback, run once locally when the allreduce has finished and results are ready for extraction.
Note
It is safe to reuse the op_id as soon as wait_and_extract has completed locally.
Exceptions
std::invalid_argument: If the input and output buffers do not match (different size or different memory type).
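One way to consume finished_callback is to have it fulfil a promise that other code can wait on. The sketch below builds a callback with the documented `std::function<void(void)>` signature; the helper names are hypothetical, and a `std::thread` stands in for whatever context actually runs the callback when the AllReduce finishes.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical helper: returns a callback plus a future that becomes ready when
// the callback runs. std::promise is move-only but std::function needs a
// copyable callable, so the promise is shared through a shared_ptr.
std::pair<std::function<void(void)>, std::future<void>> make_completion_signal() {
    auto promise = std::make_shared<std::promise<void>>();
    std::future<void> done = promise->get_future();
    // set_value() throws std::future_error if called twice; this relies on the
    // documented guarantee that the callback runs only once.
    std::function<void(void)> callback = [promise] { promise->set_value(); };
    return {std::move(callback), std::move(done)};
}

// Hypothetical demo: run the callback on another thread, then check readiness.
bool demo_completion_signal() {
    auto signal = make_completion_signal();
    std::thread runner(signal.first);
    runner.join();
    return signal.second.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

In this pattern, `signal.first` would be passed as finished_callback and `signal.second` kept by the code that wants to be notified.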

◆ ~AllReduce()

rapidsmpf::coll::AllReduce::~AllReduce() noexcept

Destructor.

Note
This operation is logically collective. If an AllReduce is destroyed locally before wait_and_extract is called, there is no guarantee that in-flight communication will complete.

Member Function Documentation

◆ comm()

std::shared_ptr<Communicator> const& rapidsmpf::coll::AllReduce::comm() const noexcept [inline]

Gets the communicator associated with this AllReduce.

Returns
Shared pointer to the communicator.

Definition at line 101 of file allreduce.hpp.

◆ finished()

bool rapidsmpf::coll::AllReduce::finished() const noexcept

Check if the allreduce operation has completed.

Returns
True if all data and finish messages from all ranks have been received and locally reduced.

◆ is_ready()

bool rapidsmpf::coll::AllReduce::is_ready() const noexcept

Check if reduced results are ready for extraction.

This returns true once the underlying allgather has completed and, if wait_and_extract has not yet been called, indicates that calling it would not block.

Returns
True if the allreduce operation has completed and results are ready for extraction, false otherwise.
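A common way to use a non-blocking readiness check is to overlap local work with the collective and extract only once ready. The sketch below uses `std::future` as a stand-in for an in-flight AllReduce: `wait_for` with a zero timeout plays the role of `is_ready()`, and `get()` plays the role of `wait_and_extract()`. It does not use the rapidsmpf API, and the helper names are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical analogue of is_ready(): a zero-timeout poll that never blocks.
template <typename T>
bool future_is_ready(std::future<T> const& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Returns {result, number of local work steps done while waiting}.
std::pair<int, int> overlap_work_with_collective() {
    // Stand-in for an in-flight collective producing a result later.
    std::future<int> pending = std::async(std::launch::async, [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        return 42;  // stand-in for the reduced result
    });
    int other_work_steps = 0;
    while (!future_is_ready(pending)) {
        ++other_work_steps;  // stand-in for useful local work
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // Already ready, so get() does not block; like wait_and_extract it
    // consumes the result and may only be called once.
    return {pending.get(), other_work_steps};
}
```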

◆ wait_and_extract()

std::pair<std::unique_ptr<Buffer>, std::unique_ptr<Buffer>> rapidsmpf::coll::AllReduce::wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1})

Wait for completion and extract the reduced data.

Blocks until the allreduce operation completes and returns the globally reduced result.

This method is destructive and can only be called once. The first call extracts the buffers provided to the AllReduce constructor. Subsequent calls will throw std::runtime_error because the underlying data has already been consumed.

Parameters
timeout: Optional maximum duration to wait. Negative values mean no timeout.
Returns
A pair of the two Buffers passed to the constructor. The first Buffer contains an implementation-defined value; the second Buffer contains the final reduced result.
Note
The streams of the Buffers may change in an implementation-defined way while they are owned by the AllReduce object. If you need to launch new stream-ordered work on a Buffer obtained from this function, you must get the correct stream from the Buffer itself.
Warning
There may be outstanding stream-ordered work reading from the first Buffer when this function returns (not tracked by the buffer's Buffer::latest_write_event()). If you want to pass it to a non-stream-ordered API that writes to the buffer you must synchronise its stream first.
Exceptions
std::runtime_error: If the timeout is reached or if this method is called more than once.
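The documented contract of wait_and_extract (block until ready, treat a negative timeout as "wait indefinitely", throw std::runtime_error on timeout or on a second call) can be sketched with a mutex and a condition variable. `OneShotResult` is a hypothetical helper, not the rapidsmpf implementation; it holds a generic value rather than a pair of Buffers and ignores the stream-ordering caveats above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical single-shot result slot mirroring the wait_and_extract contract.
template <typename T>
class OneShotResult {
  public:
    // Called by the producer once the result is available.
    void publish(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!ready_ && "publish called twice");
            value_ = std::move(value);
            ready_ = true;
        }
        cv_.notify_all();
    }

    // Negative timeout: wait forever. Throws on timeout or on a repeated call.
    T wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1}) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (extracted_) {
            throw std::runtime_error("result already extracted");
        }
        auto ready = [this] { return ready_; };
        if (timeout.count() < 0) {
            cv_.wait(lock, ready);
        } else if (!cv_.wait_for(lock, timeout, ready)) {
            throw std::runtime_error("timed out waiting for result");
        }
        extracted_ = true;  // destructive: later calls throw
        T out = std::move(*value_);
        value_.reset();
        return out;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::optional<T> value_;
    bool ready_ = false;
    bool extracted_ = false;
};
```

In this sketch a timeout does not consume the result, so a later call may still succeed once the value has been published.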

The documentation for this class was generated from the following file: