#include <condition_variable>

#include <thrust/execution_policy.h>
#include <thrust/transform.h>

#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/statistics.hpp>
AllReduce(
    std::shared_ptr<Communicator> comm,
    std::unique_ptr<Buffer> input,
    std::unique_ptr<Buffer> output,
    OpID op_id,
    ReduceOperator reduce_operator,
    std::function<void(void)> finished_callback = nullptr
);
[[nodiscard]] std::shared_ptr<Communicator> const& comm() const noexcept {
    return comm_;
}
[[nodiscard]] std::pair<std::unique_ptr<Buffer>, std::unique_ptr<Buffer>>
wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1});
enum class Phase : uint8_t {
    StartPreRemainder,
    CompletePreRemainder,
    // ... exchange-stage phases elided ...
    CompletePostRemainder,
    // ...
};
std::shared_ptr<Communicator> comm_{};
std::unique_ptr<Buffer> in_buffer_{};
std::unique_ptr<Buffer> out_buffer_{};

std::atomic<Phase> phase_{Phase::StartPreRemainder};
std::atomic<bool> active_{true};
std::function<void()> finished_callback_{};

mutable std::mutex mutex_;
mutable std::condition_variable cv_;

Rank logical_rank_{-1};
Rank nearest_pow2_{0};
Rank non_pow2_remainder_{0};
Rank stage_partner_{-1};

ProgressThread::FunctionID function_id_{};

std::unique_ptr<Communicator::Future> send_future_{};
std::unique_ptr<Communicator::Future> recv_future_{};
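The rank bookkeeping above (logical_rank_, nearest_pow2_, non_pow2_remainder_, stage_partner_) is the usual geometry of a recursive-doubling allreduce. A minimal sketch, not the library's code, of how such values are typically derived; pow2_geometry is a hypothetical helper:

#include <cstdint>

using Rank = std::int32_t;  // mirrors the Rank alias used above

struct Pow2Geometry {
    Rank nearest_pow2;        // largest power of two <= nranks
    Rank non_pow2_remainder;  // ranks left over beyond that power of two
};

inline Pow2Geometry pow2_geometry(Rank nranks) {
    Rank pow2 = 1;
    while (pow2 * 2 <= nranks) {
        pow2 *= 2;
    }
    return {pow2, nranks - pow2};
}

With six ranks this gives nearest_pow2 == 4 and non_pow2_remainder == 2: the surplus ranks fold their data into a partner before the power-of-two exchange stages and receive the result back afterwards, which is what the Pre-/Post-Remainder phases suggest.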
template <typename T, typename Op>
struct HostOp {
    void operator()(Buffer const* left, Buffer* right) {
        auto const left_nbytes = left->size;
        RAPIDSMPF_EXPECTS(
            left_nbytes % sizeof(T) == 0,
            "HostOp buffer size must be a multiple of sizeof(T)"
        );
        auto const count = left_nbytes / sizeof(T);
        RAPIDSMPF_EXPECTS(
            /* host memory-type check elided */,
            "HostOp expects host memory"
        );
        auto* left_bytes = left->data();
        // ... acquisition of right_bytes elided ...
        std::span<T const> left_span{reinterpret_cast<T const*>(left_bytes), count};
        std::span<T> right_span{reinterpret_cast<T*>(right_bytes), count};
        std::ranges::transform(left_span, right_span, right_span.begin(), op);
        // ...
    }

    Op op;  ///< The binary reduction operator.
};
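The span-plus-transform pattern above is easy to study in isolation. A self-contained sketch, using raw std::byte arrays in place of rapidsmpf Buffers (all names here are illustrative):

#include <algorithm>
#include <cstddef>
#include <functional>
#include <span>

template <typename T, typename Op>
void host_elementwise_reduce(std::byte const* left_bytes,
                             std::byte* right_bytes,
                             std::size_t nbytes,
                             Op op) {
    std::size_t const count = nbytes / sizeof(T);
    std::span<T const> left{reinterpret_cast<T const*>(left_bytes), count};
    std::span<T> right{reinterpret_cast<T*>(right_bytes), count};
    // right[i] = op(left[i], right[i]) for every element, written in place.
    std::ranges::transform(left, right, right.begin(), op);
}

// Example: host_elementwise_reduce<float>(lhs, rhs, nbytes, std::plus<float>{});

Combining left into right in place mirrors HostOp, which accumulates the received data directly into the local buffer.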
template <typename T, typename Op>
struct DeviceOp {
    void operator()(Buffer const* left, Buffer* right) {
#ifdef __CUDACC__
        auto const left_nbytes = left->size;
        RAPIDSMPF_EXPECTS(
            left_nbytes % sizeof(T) == 0,
            "DeviceOp buffer size must be a multiple of sizeof(T)"
        );
        auto const count = left_nbytes / sizeof(T);
        RAPIDSMPF_EXPECTS(
            /* device memory-type check elided */,
            "DeviceOp expects device memory"
        );
        auto const* left_bytes = reinterpret_cast<std::byte const*>(left->data());
        // ... acquisition of right_bytes and stream elided ...
        T* right_ptr = reinterpret_cast<T*>(right_bytes);
        T const* left_ptr = reinterpret_cast<T const*>(left_bytes);
        thrust::transform(
            thrust::cuda::par_nosync.on(stream.value()),
            left_ptr, left_ptr + count, right_ptr, right_ptr, op
        );
        // ...
#else
        RAPIDSMPF_FAIL(
            "DeviceOp::operator() called but CUDA compilation (__CUDACC__) "
            "was not available. DeviceOp requires CUDA/thrust support.",
            /* ... */
        );
#endif
    }

    Op op;  ///< The binary reduction operator.
};
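For reference, the same element-wise combine written as a free function over raw device pointers; a minimal sketch assuming an nvcc-compiled translation unit, with the stream supplied by the caller:

#include <cstddef>

#include <thrust/execution_policy.h>
#include <thrust/transform.h>

template <typename T, typename Op>
void device_elementwise_reduce(T const* left_ptr,
                               T* right_ptr,
                               std::size_t count,
                               Op op,
                               cudaStream_t stream) {
    // par_nosync enqueues the kernel on `stream` without synchronizing the
    // host; ordering and completion are the caller's responsibility.
    thrust::transform(thrust::cuda::par_nosync.on(stream),
                      left_ptr, left_ptr + count,  // first input range
                      right_ptr,                   // second input range
                      right_ptr,                   // output: overwrite right
                      op);
}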
template <typename T, typename Op>
    requires std::invocable<Op, T const&, T const&>
ReduceOperator make_host_reduce_operator(Op op) {
    return HostOp<T, Op>{std::move(op)};
}

template <typename T, typename Op>
    requires std::invocable<Op, T const&, T const&>
ReduceOperator make_device_reduce_operator(Op op) {
#ifndef __CUDACC__
    RAPIDSMPF_FAIL(
        "make_device_reduce_operator was called from code that was not compiled "
        "with NVCC (__CUDACC__ is not defined).",
        /* ... */
    );
#endif
    return DeviceOp<T, Op>{std::move(op)};
}
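Creating operators is then a one-liner; a hedged usage sketch (the enclosing rapidsmpf namespace is an assumption, and the device variant only makes sense in an nvcc-compiled translation unit):

#include <functional>           // std::plus
#include <thrust/functional.h>  // thrust::plus, usable in device code

// Host-side elementwise sum over floats.
auto host_sum = rapidsmpf::make_host_reduce_operator<float>(std::plus<float>{});

// Device-side elementwise sum; requires __CUDACC__ at compile time.
auto device_sum = rapidsmpf::make_device_reduce_operator<float>(thrust::plus<float>{});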
Referenced symbols:

Buffer
Buffer representing device or host memory.

auto Buffer::write_access(F&& f) -> std::invoke_result_t<F, std::byte*, rmm::cuda_stream_view>
Provides stream-ordered write access to the buffer.

std::byte* Buffer::exclusive_data_access()
Acquire non-stream-ordered exclusive access to the buffer's memory.

std::byte const* Buffer::data() const
Access the underlying memory buffer (host or device memory).

std::size_t const Buffer::size
The size of the buffer in bytes.

void Buffer::unlock()
Release the exclusive lock acquired by exclusive_data_access().

constexpr MemoryType Buffer::mem_type() const
Get the memory type of the buffer.

ProgressState
The progress state of a function; either InProgress or Done.
AllReduce(std::shared_ptr<Communicator> comm, std::unique_ptr<Buffer> input, std::unique_ptr<Buffer> output, OpID op_id, ReduceOperator reduce_operator, std::function<void(void)> finished_callback = nullptr)
Construct a new AllReduce operation.

~AllReduce() noexcept
Destructor.

bool finished() const noexcept
Check if the allreduce operation has completed.

bool is_ready() const noexcept
Check if reduced results are ready for extraction.

std::pair<std::unique_ptr<Buffer>, std::unique_ptr<Buffer>> wait_and_extract(std::chrono::milliseconds timeout = std::chrono::milliseconds{-1})
Wait for completion and extract the reduced data.

std::shared_ptr<Communicator> const& comm() const noexcept
Gets the communicator associated with this AllReduce.
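Putting these members together, a minimal lifecycle sketch under stated assumptions: comm is an initialized Communicator, input and output are equally sized pre-filled float Buffers, and the names live in the rapidsmpf namespace:

rapidsmpf::AllReduce reduce{
    comm,               // shared communicator
    std::move(input),   // this rank's contribution
    std::move(output),  // second working buffer
    /* op_id = */ 0,    // user-chosen ID, unique among in-flight ops
    rapidsmpf::make_host_reduce_operator<float>(std::plus<float>{})
};

// Poll without blocking while overlapping other work...
while (!reduce.is_ready()) { /* ... */ }

// ...then reclaim both buffers; the default timeout of -1 ms would have
// blocked indefinitely had the operation still been in flight.
auto [in_buf, out_buf] = reduce.wait_and_extract();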
cudaStream_t rmm::cuda_stream_view::value() const noexcept

template <typename T, typename Op> requires std::invocable<Op, T const&, T const&>
ReduceOperator make_device_reduce_operator(Op op)
Create a device-based reduction operator from a typed binary operation.

template <typename T, typename Op> requires std::invocable<Op, T const&, T const&>
ReduceOperator make_host_reduce_operator(Op op)
Create a host-based reduction operator from a typed binary operation.
Collective communication interfaces.
using ReduceOperator = std::function<void(Buffer const* left, Buffer* right)>;
Type alias for the reduction function signature.
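Because ReduceOperator is a plain std::function, a combine step can also be written directly as a lambda instead of going through the factories. A host-memory sketch using the Buffer accessors listed above (the rapidsmpf namespace qualification is an assumption):

#include <cstddef>

rapidsmpf::ReduceOperator bitwise_or =
    [](rapidsmpf::Buffer const* left, rapidsmpf::Buffer* right) {
        std::byte const* src = left->data();
        std::byte* dst = right->exclusive_data_access();
        for (std::size_t i = 0; i < left->size; ++i) {
            dst[i] |= src[i];  // combine left into right, byte by byte
        }
        right->unlock();  // pairs with exclusive_data_access()
    };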
using Rank = std::int32_t;
The rank of a node (e.g. the rank of an MPI process), or world size (total number of ranks).

using OpID = std::int32_t;
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...
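A sketch of what distinct OpIDs enable: two collectives in flight on the same communicator at once (host_sum and host_max are assumed to be ReduceOperators built as above, and the buffers to be prepared):

rapidsmpf::AllReduce sum{comm, std::move(in_a), std::move(out_a),
                         /* op_id = */ 0, host_sum};
rapidsmpf::AllReduce max{comm, std::move(in_b), std::move(out_b),
                         /* op_id = */ 1, host_max};

// Both progress concurrently on the communicator's progress thread; the
// distinct IDs presumably keep their message streams from interfering.
while (!sum.finished() || !max.finished()) {
    // overlap other work
}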
DeviceOp
Device-side range-based reduction operator.

void DeviceOp::operator()(Buffer const* left, Buffer* right)
Apply the reduction operator to the packed data ranges.

Op DeviceOp::op
The binary reduction operator.

HostOp
Host-side range-based reduction operator.

void HostOp::operator()(Buffer const* left, Buffer* right)
Apply the reduction operator to the packed data ranges.

Op HostOp::op
The binary reduction operator.