#include <allreduce.hpp>
Public Member Functions
| Return type | Member | Description |
|---|---|---|
| | AllReduce (std::shared_ptr< Communicator > comm, std::unique_ptr< Buffer > input, std::unique_ptr< Buffer > output, OpID op_id, ReduceOperator reduce_operator, std::function< void(void)> finished_callback=nullptr) | Construct a new AllReduce operation. |
| std::shared_ptr< Communicator > const & | comm () const noexcept | Gets the communicator associated with this AllReduce. |
| | AllReduce (AllReduce const &)=delete | |
| AllReduce & | operator= (AllReduce const &)=delete | |
| | AllReduce (AllReduce &&)=delete | |
| AllReduce & | operator= (AllReduce &&)=delete | |
| | ~AllReduce () noexcept | Destructor. |
| bool | finished () const noexcept | Check if the allreduce operation has completed. |
| std::pair< std::unique_ptr< Buffer >, std::unique_ptr< Buffer > > | wait_and_extract (std::chrono::milliseconds timeout=std::chrono::milliseconds{-1}) | Wait for completion and extract the reduced data. |
| bool | is_ready () const noexcept | Check if reduced results are ready for extraction. |
AllReduce collective.
The implementation uses a butterfly (recursive-doubling) scheme for message exchange, which needs no extra memory and completes in O(log P) rounds for P ranks.
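The butterfly exchange can be illustrated with a single-process simulation. This is a sketch under stated assumptions, not the rapidsmpf implementation: the function name simulate_recursive_doubling_allreduce is hypothetical, P is assumed to be a power of two, and the reduction is assumed to be associative and commutative. In the round with distance d, each rank combines its data with rank `rank ^ d`, so after log2(P) rounds every rank holds the full reduction. The per-round snapshot stands in for the messages in flight and is not memory-optimal.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical single-process simulation of a butterfly (recursive-doubling)
// allreduce. data[r] is the local vector of "rank" r; all vectors must have the
// same length. Assumes P is a power of two and `op` is associative and
// commutative. Returns the number of exchange rounds performed (log2(P)).
inline std::size_t simulate_recursive_doubling_allreduce(
    std::vector<std::vector<double>>& data,
    std::function<double(double, double)> const& op) {
    std::size_t const P = data.size();
    assert(P > 0 && (P & (P - 1)) == 0 && "P must be a power of two");
    std::size_t rounds = 0;
    for (std::size_t distance = 1; distance < P; distance <<= 1, ++rounds) {
        // Snapshot of what every rank "sends" this round (stands in for messages).
        std::vector<std::vector<double>> const sent = data;
        for (std::size_t rank = 0; rank < P; ++rank) {
            std::size_t const partner = rank ^ distance;  // butterfly partner
            for (std::size_t i = 0; i < data[rank].size(); ++i) {
                data[rank][i] = op(data[rank][i], sent[partner][i]);
            }
        }
    }
    return rounds;
}
```

For example, with four ranks holding {1}, {2}, {3}, {4} and a sum operator, every rank ends with {10} after two rounds.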
The actual reduction is implemented via a type-erased ReduceOperator that is supplied at construction time. Helper factories such as detail::make_host_reduce_operator or detail::make_device_reduce_operator can be used to build range-based reductions over contiguous arrays.
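To make the idea of a type-erased reduction concrete, here is a minimal sketch in which a byte-level std::function wraps a typed element-wise operation over a contiguous array. The names ErasedReduceOp and make_elementwise_host_op, and the (in, inout, nbytes) calling convention, are hypothetical; they do not reproduce the actual signatures of ReduceOperator or detail::make_host_reduce_operator.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>

// Hypothetical type-erased operator: combines `nbytes` bytes from `in` into
// `inout` element by element (inout = op(inout, in)). Not rapidsmpf's type.
using ErasedReduceOp =
    std::function<void(std::byte const* in, std::byte* inout, std::size_t nbytes)>;

// Hypothetical factory: wraps a typed binary op so it can be applied to raw
// bytes that hold a contiguous array of T.
template <typename T, typename BinaryOp>
ErasedReduceOp make_elementwise_host_op(BinaryOp op) {
    return [op](std::byte const* in, std::byte* inout, std::size_t nbytes) {
        assert(nbytes % sizeof(T) == 0 && "size must be a multiple of sizeof(T)");
        std::size_t const n = nbytes / sizeof(T);
        for (std::size_t i = 0; i < n; ++i) {
            T a;
            T b;
            // memcpy avoids alignment and aliasing assumptions about raw bytes.
            std::memcpy(&a, inout + i * sizeof(T), sizeof(T));
            std::memcpy(&b, in + i * sizeof(T), sizeof(T));
            T const r = op(a, b);
            std::memcpy(inout + i * sizeof(T), &r, sizeof(T));
        }
    };
}
```

Hiding the element type behind raw bytes lets one collective class serve any element type and operator, at the cost of moving type checks (here, the size-multiple assertion) to run time.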
Note: The op_id passed to the AllReduce constructor may be reused locally as soon as wait_and_extract has completed successfully.
Definition at line 66 of file allreduce.hpp.
rapidsmpf::coll::AllReduce::AllReduce(std::shared_ptr< Communicator > comm, std::unique_ptr< Buffer > input, std::unique_ptr< Buffer > output, OpID op_id, ReduceOperator reduce_operator, std::function< void(void)> finished_callback = nullptr)
Construct a new AllReduce operation.
| comm | The communicator for communication. |
| input | Local data to contribute to the reduction. |
| output | Preallocated buffer in which to place the reduction result. Must have the same size and memory type as input. It is overwritten with the reduction result (any values already in the buffer are ignored). |
| op_id | Unique operation identifier for this allreduce. |
| reduce_operator | Type-erased reduction operator to use. See ReduceOperator. |
| finished_callback | Optional callback run once locally when the allreduce is finished and results are ready for extraction. |
Note: The op_id may be reused as soon as wait_and_extract has completed locally.
Exceptions
| std::invalid_argument | If the input and output buffers do not match (they must have the same size and the same memory type). |
rapidsmpf::coll::AllReduce::~AllReduce() noexcept
Destructor.
Note: If the AllReduce is destroyed locally before wait_and_extract is called, there is no guarantee that in-flight communication will complete.
std::shared_ptr< Communicator > const & rapidsmpf::coll::AllReduce::comm() const noexcept [inline]
Gets the communicator associated with this AllReduce.
Definition at line 101 of file allreduce.hpp.
bool rapidsmpf::coll::AllReduce::finished() const noexcept
Check if the allreduce operation has completed.
bool rapidsmpf::coll::AllReduce::is_ready() const noexcept
Check if reduced results are ready for extraction.
This returns true once the underlying allgather has completed and, if wait_and_extract has not yet been called, indicates that calling it would not block.
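The difference between polling and blocking can be sketched with a toy asynchronous operation. ToyAsyncOp and poll_until_ready are hypothetical stand-ins, not rapidsmpf types: a background thread plays the role of the communication, sets a ready flag, and then runs an optional callback once, loosely mirroring is_ready() and the finished_callback constructor parameter.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical toy asynchronous operation (not rapidsmpf code). A worker thread
// simulates communication, publishes readiness, then runs the optional callback.
class ToyAsyncOp {
  public:
    explicit ToyAsyncOp(std::function<void()> finished_callback = nullptr)
        : worker_{[this, cb = std::move(finished_callback)] {
              std::this_thread::sleep_for(std::chrono::milliseconds{10});
              ready_.store(true, std::memory_order_release);
              if (cb) {
                  cb();  // runs exactly once, on the worker thread
              }
          }} {}
    ~ToyAsyncOp() { worker_.join(); }
    ToyAsyncOp(ToyAsyncOp const&) = delete;
    ToyAsyncOp& operator=(ToyAsyncOp const&) = delete;

    // Non-blocking readiness check, analogous in spirit to is_ready().
    bool is_ready() const noexcept { return ready_.load(std::memory_order_acquire); }

  private:
    std::atomic<bool> ready_{false};
    std::thread worker_;  // declared after ready_ so the flag exists before the thread starts
};

// Polls without blocking; returns how many units of "other work" were done.
inline int poll_until_ready(ToyAsyncOp const& op) {
    int other_work_units = 0;
    while (!op.is_ready()) {
        ++other_work_units;  // placeholder for useful overlapping work
        std::this_thread::yield();
    }
    return other_work_units;
}
```

A caller that checks readiness first can overlap other work with communication and call the blocking extraction only once it will not block.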
std::pair<std::unique_ptr<Buffer>, std::unique_ptr<Buffer> > rapidsmpf::coll::AllReduce::wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1})
Wait for completion and extract the reduced data.
Blocks until the allreduce operation completes and returns the globally reduced result.
This method is destructive and can only be called once. The first call extracts the buffers provided to the AllReduce constructor. Subsequent calls will throw std::runtime_error because the underlying data has already been consumed.
| timeout | Optional maximum duration to wait. Negative values mean no timeout. |
Returns
The buffers passed to the constructor. The first Buffer contains an implementation-defined value; the second Buffer contains the final reduced result.
Note: If you need to launch new stream-ordered work on a Buffer obtained from this function, you must obtain the correct stream from the Buffer itself.
Note: Stream-ordered work may still be pending on the returned Buffer when this function returns (it is not tracked by the buffer's Buffer::latest_write_event()). Before passing the Buffer to a non-stream-ordered API that writes to it, you must synchronise its stream.
Exceptions
| std::runtime_error | If the timeout is reached or if this method is called more than once. |
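The documented extraction contract (block until ready, treat a negative timeout as no timeout, hand the result out exactly once, and throw std::runtime_error on timeout or on a repeated call) can be sketched with a toy single-shot result slot. OneShotResult is a hypothetical class built on std::condition_variable; it is not the AllReduce implementation and ignores streams entirely.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical single-shot result slot (not rapidsmpf code) mirroring the
// documented wait_and_extract contract.
template <typename T>
class OneShotResult {
  public:
    // Producer side: publish the result and wake any waiter.
    void set(T value) {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            value_ = std::move(value);
        }
        cv_.notify_all();
    }

    // Consumer side: a negative timeout waits indefinitely; otherwise throws
    // on timeout. A second successful call throws because the data is consumed.
    T wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1}) {
        std::unique_lock<std::mutex> lock{mutex_};
        if (extracted_) {
            throw std::runtime_error("wait_and_extract called more than once");
        }
        auto const ready = [this] { return value_.has_value(); };
        if (timeout.count() < 0) {
            cv_.wait(lock, ready);
        } else if (!cv_.wait_for(lock, timeout, ready)) {
            throw std::runtime_error("timeout reached");
        }
        extracted_ = true;
        T out = std::move(*value_);
        value_.reset();
        return out;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::optional<T> value_;
    bool extracted_{false};
};
```

Note that a call that times out does not consume the result, so a later call can still succeed; only a successful extraction makes further calls throw.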