11 #include <condition_variable>
20 #include <thrust/execution_policy.h>
21 #include <thrust/transform.h>
24 #include <rapidsmpf/communicator/communicator.hpp>
25 #include <rapidsmpf/error.hpp>
26 #include <rapidsmpf/memory/buffer.hpp>
27 #include <rapidsmpf/progress_thread.hpp>
28 #include <rapidsmpf/statistics.hpp>
88 std::shared_ptr<Communicator>
comm,
89 std::unique_ptr<Buffer> input,
90 std::unique_ptr<Buffer> output,
93 std::function<
void(
void)> finished_callback =
nullptr
101 [[nodiscard]] std::shared_ptr<Communicator>
const&
comm() const noexcept {
156 [[nodiscard]] std::pair<std::unique_ptr<
Buffer>, std::unique_ptr<
Buffer>>
172 enum class Phase : uint8_t {
174 CompletePreRemainder,
178 CompletePostRemainder,
186 std::shared_ptr<Communicator> comm_{};
188 std::unique_ptr<Buffer> in_buffer_{};
189 std::unique_ptr<Buffer> out_buffer_{};
191 std::atomic<Phase> phase_{Phase::StartPreRemainder};
192 std::atomic<bool> active_{
true};
193 std::function<void()>
196 mutable std::mutex mutex_;
197 mutable std::condition_variable cv_;
199 Rank logical_rank_{-1};
200 Rank nearest_pow2_{0};
201 Rank non_pow2_remainder_{0};
203 Rank stage_partner_{-1};
205 ProgressThread::FunctionID function_id_{};
207 std::unique_ptr<Communicator::Future> send_future_{};
208 std::unique_ptr<Communicator::Future> recv_future_{};
221 template <
typename T,
typename Op>
232 auto const left_nbytes = left->
size;
234 left_nbytes %
sizeof(T) == 0,
235 "HostOp buffer size must be a multiple of sizeof(T)"
238 auto const count = left_nbytes /
sizeof(T);
245 "HostOp expects host memory"
248 auto* left_bytes = left->
data();
251 std::span<T const> left_span{
reinterpret_cast<T const*
>(left_bytes), count};
252 std::span<T> right_span{
reinterpret_cast<T*
>(right_bytes), count};
254 std::ranges::transform(left_span, right_span, right_span.begin(),
op);
270 template <
typename T,
typename Op>
282 auto const left_nbytes = left->
size;
284 left_nbytes %
sizeof(T) == 0,
285 "DeviceOp buffer size must be a multiple of sizeof(T)"
288 auto const count = left_nbytes /
sizeof(T);
296 "DeviceOp expects device memory"
300 auto const* left_bytes =
reinterpret_cast<std::byte const*
>(left->
data());
302 T* right_ptr =
reinterpret_cast<T*
>(right_bytes);
303 T
const* left_ptr =
reinterpret_cast<T const*
>(left_bytes);
306 thrust::cuda::par_nosync.on(stream.
value()),
319 "DeviceOp::operator() called but CUDA compilation (__CUDACC__) "
320 "was not available. DeviceOp requires CUDA/thrust support.",
335 template <
typename T,
typename Op>
336 requires std::invocable<Op, T const&, T const&>
338 return HostOp<T, Op>{std::move(op)};
352 template <
typename T,
typename Op>
353 requires std::invocable<Op, T const&, T const&>
356 return DeviceOp<T, Op>{std::move(op)};
361 "make_device_reduce_operator was called from code that was not compiled "
362 "with NVCC (__CUDACC__ is not defined).",
Buffer representing device or host memory.
auto write_access(F &&f) -> std::invoke_result_t< F, std::byte *, rmm::cuda_stream_view >
Provides stream-ordered write access to the buffer.
std::byte * exclusive_data_access()
Acquire non-stream-ordered exclusive access to the buffer's memory.
std::byte const * data() const
Access the underlying memory buffer (host or device memory).
std::size_t const size
The size of the buffer in bytes.
void unlock()
Release the exclusive lock acquired by exclusive_data_access().
constexpr MemoryType mem_type() const
Get the memory type of the buffer.
ProgressState
The progress state of a function, can be either InProgress or Done.
~AllReduce() noexcept
Destructor.
bool finished() const noexcept
Check if the allreduce operation has completed.
std::pair< std::unique_ptr< Buffer >, std::unique_ptr< Buffer > > wait_and_extract(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Wait for completion and extract the reduced data.
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this AllReduce.
AllReduce(std::shared_ptr< Communicator > comm, std::unique_ptr< Buffer > input, std::unique_ptr< Buffer > output, OpID op_id, ReduceOperator reduce_operator, std::function< void(void)> finished_callback=nullptr)
Construct a new AllReduce operation.
bool is_ready() const noexcept
Check if reduced results are ready for extraction.
cudaStream_t value() const noexcept
Collective communication interfaces.
std::function< void(Buffer const *left, Buffer *right)> ReduceOperator
Type alias for the reduction function signature.
std::int32_t Rank
The rank of a node (e.g. the rank of an MPI process), or world size (total number of ranks).
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...
Device-side range-based reduction operator.
void operator()(Buffer const *left, Buffer *right)
Apply the reduction operator to the packed data ranges.
Op op
The binary reduction operator.
Host-side range-based reduction operator.
void operator()(Buffer const *left, Buffer *right)
Apply the reduction operator to the packed data ranges.
Op op
The binary reduction operator.