12 #include <rapidsmpf/allgather/allgather.hpp>
13 #include <rapidsmpf/buffer/packed_data.hpp>
14 #include <rapidsmpf/communicator/communicator.hpp>
15 #include <rapidsmpf/streaming/chunks/packed_data.hpp>
16 #include <rapidsmpf/streaming/core/channel.hpp>
17 #include <rapidsmpf/streaming/core/context.hpp>
19 #include <coro/event.hpp>
20 #include <coro/task.hpp>
56 [[nodiscard]] std::shared_ptr<Context>
ctx() const noexcept;
84 std::shared_ptr<Context> ctx_;
110 std::shared_ptr<Context> ctx,
111 std::shared_ptr<Channel> ch_in,
112 std::shared_ptr<Channel> ch_out,
AllGather communication service.
Ordered
Tag requesting ordering for extraction.
@ YES
Extraction is ordered.
Asynchronous (coroutine) interface to allgather::AllGather.
void insert_finished()
Mark that this rank has finished contributing data.
std::shared_ptr< Context > ctx() const noexcept
Gets the streaming context associated with this AllGather object.
AllGather(std::shared_ptr< Context > ctx, OpID op_id)
Construct an asynchronous allgather.
void insert(std::uint64_t sequence_number, PackedDataChunk &&chunk)
Insert a chunk into the allgather.
coro::task< std::vector< PackedDataChunk > > extract_all(Ordered ordered=Ordered::YES)
Extract all gathered data.
Node allgather(std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, AllGather::Ordered ordered=AllGather::Ordered::YES)
Create an allgather node for a single allgather operation.
coro::task< void > Node
Alias for a node in a streaming pipeline.