sparse_alltoall.hpp
1 
6 #pragma once
7 
8 #include <memory>
9 #include <vector>
10 
11 #include <coro/event.hpp>
12 #include <coro/task.hpp>
13 
14 #include <rapidsmpf/coll/sparse_alltoall.hpp>
15 #include <rapidsmpf/communicator/communicator.hpp>
16 #include <rapidsmpf/memory/packed_data.hpp>
17 #include <rapidsmpf/streaming/core/context.hpp>
18 
19 namespace rapidsmpf::streaming {
20 
30  public:
41  std::shared_ptr<Context> ctx,
42  std::shared_ptr<Communicator> comm,
43  OpID op_id,
44  std::vector<Rank> srcs,
45  std::vector<Rank> dsts
46  );
47 
/// SparseAlltoall is neither copyable nor movable: the copy/move constructors
/// and the copy/move assignment operators are all explicitly deleted.
48  SparseAlltoall(SparseAlltoall const&) = delete;
49  SparseAlltoall& operator=(SparseAlltoall const&) = delete;
50  SparseAlltoall(SparseAlltoall&&) = delete;
51  SparseAlltoall& operator=(SparseAlltoall&&) = delete;
52 
/// Destructor. Declared `noexcept`; it is only declared here and defined elsewhere.
53  ~SparseAlltoall() noexcept;
54 
/// @brief Gets the streaming context associated with this object.
///
/// @return Const reference to the shared pointer holding the `Context`.
60  [[nodiscard]] std::shared_ptr<Context> const& ctx() const noexcept;
61 
/// @brief Gets the communicator associated with this SparseAlltoall.
///
/// @return Const reference to the shared pointer holding the `Communicator`.
67  [[nodiscard]] std::shared_ptr<Communicator> const& comm() const noexcept;
68 
/// @brief Insert data to send to a destination rank.
///
/// @param dst Rank the data is destined for.
/// @param packed_data Data to send; taken by rvalue reference, so callers
/// pass a temporary or `std::move` their object.
70  void insert(Rank dst, PackedData&& packed_data);
71 
/// @brief Indicate that no more data will be inserted for any destination.
///
/// @return A `coro::task<void>`; the result is `[[nodiscard]]`.
/// NOTE(review): presumably the task must be awaited and completes once the
/// exchange has finished -- confirm against the implementation.
77  [[nodiscard]] coro::task<void> insert_finished();
78 
/// @brief Extract all received messages from a source rank.
///
/// @param src Rank whose received messages are extracted.
/// @return The received messages as a vector of `PackedData`.
80  [[nodiscard]] std::vector<PackedData> extract(Rank src);
81 
82  private:
// Coroutine event, value-initialized.
// NOTE(review): presumably signalled when the exchange completes and awaited
// by insert_finished() -- confirm against the implementation.
83  coro::event
84  event_{};
// NOTE(review): presumably the streaming context returned by ctx() -- confirm.
85  std::shared_ptr<Context> ctx_;
// coll::SparseAlltoall instance held by value.
// NOTE(review): presumably the underlying collective that this class wraps -- confirm.
86  coll::SparseAlltoall exchange_;
87 };
88 
89 } // namespace rapidsmpf::streaming
Abstract base class for a communication mechanism between nodes.
Sparse all-to-all collective over explicit source and destination peer sets.
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
Asynchronous (coroutine) interface to coll::SparseAlltoall.
std::shared_ptr< Communicator > const & comm() const noexcept
Gets the communicator associated with this SparseAlltoall.
coro::task< void > insert_finished()
Indicate that no more data will be inserted for any destination.
std::shared_ptr< Context > const & ctx() const noexcept
Gets the streaming context associated with this object.
SparseAlltoall(std::shared_ptr< Context > ctx, std::shared_ptr< Communicator > comm, OpID op_id, std::vector< Rank > srcs, std::vector< Rank > dsts)
Construct an asynchronous sparse all-to-all.
void insert(Rank dst, PackedData &&packed_data)
Insert data to send to a destination rank.
std::vector< PackedData > extract(Rank src)
Extract all received messages from a source rank.
std::int32_t Rank
The rank of a node (e.g. the rank of an MPI process), or world size (total number of ranks).
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26