single.hpp
1 
5 #pragma once
6 
7 #include <cstdlib>
8 #include <memory>
9 #include <string>
10 #include <unordered_map>
11 #include <vector>
12 
13 #include <rapidsmpf/communicator/communicator.hpp>
14 #include <rapidsmpf/config.hpp>
15 #include <rapidsmpf/memory/buffer.hpp>
16 #include <rapidsmpf/memory/buffer_resource.hpp>
17 #include <rapidsmpf/progress_thread.hpp>
18 
19 namespace rapidsmpf {
20 
28 class Single final : public Communicator {
29  public:
36  class Future : public Communicator::Future {
37  friend class Single;
38 
39  public:
40  ~Future() noexcept override = default;
41  };
42 
49  Single(config::Options options, std::shared_ptr<ProgressThread> progress_thread);
50 
51  ~Single() noexcept override = default;
52 
56  [[nodiscard]] constexpr Rank rank() const override {
57  return 0;
58  }
59 
63  [[nodiscard]] constexpr Rank nranks() const override {
64  return 1;
65  }
66 
70  [[nodiscard]] std::unique_ptr<Communicator::Future> send(
71  std::unique_ptr<std::vector<std::uint8_t>> msg, Rank rank, Tag tag
72  ) override;
73 
74  // clang-format off
80  // clang-format on
81  [[nodiscard]] std::unique_ptr<Communicator::Future> send(
82  std::unique_ptr<Buffer> msg, Rank rank, Tag tag
83  ) override;
84 
91  [[nodiscard]] std::unique_ptr<Communicator::Future> recv(
92  Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer
93  ) override;
94 
95  // clang-format off
102  // clang-format on
103  [[nodiscard]] std::unique_ptr<Communicator::Future> recv_sync_host_data(
104  Rank rank, Tag tag, std::unique_ptr<std::vector<std::uint8_t>> synced_buffer
105  ) override;
106 
113  [[nodiscard]] std::pair<std::unique_ptr<std::vector<std::uint8_t>>, Rank> recv_any(
114  Tag tag
115  ) override;
116 
123  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> recv_from(
124  Rank src, Tag tag
125  ) override;
126 
133  std::pair<
134  std::vector<std::unique_ptr<Communicator::Future>>,
135  std::vector<std::size_t>>
136  test_some(std::vector<std::unique_ptr<Communicator::Future>>& future_vector) override;
137 
138  // clang-format off
144  // clang-format on
145  std::vector<std::size_t> test_some(
146  std::unordered_map<std::size_t, std::unique_ptr<Communicator::Future>> const&
147  future_map
148  ) override;
149 
151  bool test(std::unique_ptr<Communicator::Future>& future) override;
153  std::vector<std::unique_ptr<Buffer>> wait_all(
154  std::vector<std::unique_ptr<Communicator::Future>>&& futures
155  ) override;
156 
163  [[nodiscard]] std::unique_ptr<Buffer> wait(
164  std::unique_ptr<Communicator::Future> future
165  ) override;
166 
173  [[nodiscard]] std::unique_ptr<Buffer> release_data(
174  std::unique_ptr<Communicator::Future> future
175  ) override;
176 
183  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> release_sync_host_data(
184  std::unique_ptr<Communicator::Future> future
185  ) override;
186 
190  [[nodiscard]] std::shared_ptr<Communicator::Logger> const& logger() override {
191  return logger_;
192  }
193 
197  [[nodiscard]] std::shared_ptr<ProgressThread> const&
198  progress_thread() const override {
199  return progress_thread_;
200  }
201 
205  [[nodiscard]] std::string str() const override;
206 
207  private:
208  std::shared_ptr<Logger> logger_;
209  std::shared_ptr<ProgressThread> progress_thread_;
210 };
211 
212 
213 } // namespace rapidsmpf
Abstract base class for asynchronous operation within the communicator.
Abstract base class for a communication mechanism between nodes.
Represents the future result of an operation.
Definition: single.hpp:36
Single process communicator class that implements the Communicator interface.
Definition: single.hpp:28
Single(config::Options options, std::shared_ptr< ProgressThread > progress_thread)
Construct a single process communicator.
std::string str() const override
Provides a string representation of the communicator.
std::unique_ptr< Communicator::Future > recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr< std::vector< std::uint8_t >> synced_buffer) override
Receives a message from a specific rank to an allocated (synchronized) host buffer....
constexpr Rank rank() const override
Retrieves the rank of the current node.
Definition: single.hpp:56
std::unique_ptr< Communicator::Future > recv(Rank rank, Tag tag, std::unique_ptr< Buffer > recv_buffer) override
Receives a message from a specific rank to a buffer. Use release_data to extract the data out of the ...
std::vector< std::unique_ptr< Buffer > > wait_all(std::vector< std::unique_ptr< Communicator::Future >> &&futures) override
Wait for completion of all futures and return their data buffers.
bool test(std::unique_ptr< Communicator::Future > &future) override
Test for completion of a single future.
constexpr Rank nranks() const override
Retrieves the total number of ranks.
Definition: single.hpp:63
std::shared_ptr< Communicator::Logger > const & logger() override
Retrieves the logger associated with this communicator.
Definition: single.hpp:190
std::pair< std::vector< std::unique_ptr< Communicator::Future > >, std::vector< std::size_t > > test_some(std::vector< std::unique_ptr< Communicator::Future >> &future_vector) override
Tests for completion of multiple futures.
std::unique_ptr< std::vector< std::uint8_t > > recv_from(Rank src, Tag tag) override
Receives a message from a specific rank (blocking).
std::unique_ptr< Buffer > wait(std::unique_ptr< Communicator::Future > future) override
Wait for a future to complete and return the data buffer.
std::vector< std::size_t > test_some(std::unordered_map< std::size_t, std::unique_ptr< Communicator::Future >> const &future_map) override
Tests for completion of multiple futures in a map.
std::unique_ptr< Buffer > release_data(std::unique_ptr< Communicator::Future > future) override
Retrieves data associated with a completed future.
std::unique_ptr< Communicator::Future > send(std::unique_ptr< std::vector< std::uint8_t >> msg, Rank rank, Tag tag) override
Sends a host message to a specific rank.
std::unique_ptr< Communicator::Future > send(std::unique_ptr< Buffer > msg, Rank rank, Tag tag) override
Sends a message (device or host) to a specific rank. Use release_data to obtain the data buffer again...
std::unique_ptr< std::vector< std::uint8_t > > release_sync_host_data(std::unique_ptr< Communicator::Future > future) override
Retrieves synchronized host data associated with a completed future. When the future is completed,...
std::shared_ptr< ProgressThread > const & progress_thread() const override
Retrieves the progress thread associated with this communicator.
Definition: single.hpp:198
std::pair< std::unique_ptr< std::vector< std::uint8_t > >, Rank > recv_any(Tag tag) override
Receives a message from any rank (blocking).
A tag used for identifying messages in a communication operation.
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
RAPIDS Multi-Processor interfaces.
Definition: backend.hpp:13
std::int32_t Rank
The rank of a node (e.g. the rank of a MPI process), or world size (total number of ranks).