allgather.hpp
1 
6 #pragma once
7 
8 #include <cstdint>
9 #include <memory>
10 #include <vector>
11 
12 #include <coro/event.hpp>
13 #include <coro/task.hpp>
14 
15 #include <rapidsmpf/coll/allgather.hpp>
16 #include <rapidsmpf/communicator/communicator.hpp>
17 #include <rapidsmpf/memory/packed_data.hpp>
18 #include <rapidsmpf/streaming/core/channel.hpp>
19 #include <rapidsmpf/streaming/core/context.hpp>
20 
21 namespace rapidsmpf::streaming {
22 
31 class AllGather {
32  public:
43  std::shared_ptr<Context> ctx, std::shared_ptr<Communicator> comm, OpID op_id
44  );
45 
46  AllGather(AllGather const&) = delete;
47  AllGather& operator=(AllGather const&) = delete;
48  AllGather(AllGather&&) = delete;
49  AllGather& operator=(AllGather&&) = delete;
50 
51  ~AllGather() noexcept;
52 
58  [[nodiscard]] std::shared_ptr<Context> const& ctx() const noexcept;
59 
65  [[nodiscard]] std::shared_ptr<Communicator> const& comm() const noexcept;
66 
73  void insert(std::uint64_t sequence_number, PackedData&& chunk);
74 
77 
88  coro::task<std::vector<PackedData>> extract_all(Ordered ordered = Ordered::YES);
89 
90  private:
91  coro::event
92  event_{};
93  std::shared_ptr<Context> ctx_;
94  coll::AllGather gatherer_;
95 };
96 
97 namespace actor {
98 
120  std::shared_ptr<Context> ctx,
121  std::shared_ptr<Communicator> comm,
122  std::shared_ptr<Channel> ch_in,
123  std::shared_ptr<Channel> ch_out,
124  OpID op_id,
126 );
127 } // namespace actor
128 } // namespace rapidsmpf::streaming
Abstract base class for a communication mechanism between nodes.
AllGather communication service.
Definition: allgather.hpp:376
Ordered
Tag requesting ordering for extraction.
Definition: allgather.hpp:400
@ YES
Extraction is ordered.
Asynchronous (coroutine) interface to coll::AllGather.
Definition: allgather.hpp:31
void insert(std::uint64_t sequence_number, PackedData &&chunk)
Insert a chunk into the allgather.
AllGather(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, OpID op_id)
Construct an asynchronous allgather.
void insert_finished()
Mark that this rank has finished contributing data.
coro::task< std::vector< PackedData > > extract_all(Ordered ordered=Ordered::YES)
Extract all gathered data.
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this AllGather.
std::shared_ptr< Context > const & ctx() const noexcept
Gets the streaming context associated with this AllGather object.
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
Actor allgather(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, std::shared_ptr< Channel > ch_in, std::shared_ptr< Channel > ch_out, OpID op_id, AllGather::Ordered ordered=AllGather::Ordered::YES)
Create an allgather actor for a single allgather operation.
coro::task< void > Actor
Alias for an actor in a streaming graph.
Definition: actor.hpp:18
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26