15 #include <rapidsmpf/communicator/communicator.hpp>
16 #include <rapidsmpf/error.hpp>
17 #include <rapidsmpf/progress_thread.hpp>
33 void init(
int* argc,
char*** argv);
50 #define RAPIDSMPF_MPI(call) \
51 rapidsmpf::mpi::detail::check_mpi_error((call), __FILE__, __LINE__)
61 void check_mpi_error(
int error_code,
char const* file,
int line);
91 Future(MPI_Request req, std::unique_ptr<Buffer> data_buffer)
92 : req_{req}, data_buffer_{std::move(data_buffer)} {}
104 MPI_Request req, std::unique_ptr<std::vector<std::uint8_t>> synced_host_data
106 : req_{std::move(req)}, synced_host_data_{std::move(synced_host_data)} {}
108 ~
Future() noexcept override = default;
114 std::unique_ptr<
Buffer> data_buffer_;
116 std::unique_ptr<std::vector<std::uint8_t>> synced_host_data_;
127 config::Options options,
130 ~
MPI() noexcept override = default;
151 [[nodiscard]] std::unique_ptr<Communicator::Future>
send(
152 std::unique_ptr<std::vector<std::uint8_t>> msg,
Rank rank,
Tag tag
162 [[nodiscard]] std::unique_ptr<Communicator::Future>
send(
171 [[nodiscard]] std::unique_ptr<Communicator::Future>
recv(
172 Rank rank,
Tag tag, std::unique_ptr<Buffer> recv_buffer
183 Rank rank,
Tag tag, std::unique_ptr<std::vector<std::uint8_t>> synced_buffer
189 [[nodiscard]] std::pair<std::unique_ptr<std::vector<std::uint8_t>>,
Rank>
recv_any(
196 [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>>
recv_from(
203 std::vector<std::unique_ptr<Communicator::Future>>,
204 std::vector<std::size_t>>
205 test_some(std::vector<std::unique_ptr<Communicator::Future>>& future_vector)
override;
213 std::unordered_map<std::size_t, std::unique_ptr<Communicator::Future>>
const&
218 bool test(std::unique_ptr<Communicator::Future>& future)
override;
221 std::vector<std::unique_ptr<Communicator::Future>>&& futures
227 [[nodiscard]] std::unique_ptr<Buffer>
wait(
228 std::unique_ptr<Communicator::Future> future
235 std::unique_ptr<Communicator::Future> future
242 std::unique_ptr<Communicator::Future> future
248 [[nodiscard]] std::shared_ptr<Communicator::Logger>
const&
logger()
override {
255 [[nodiscard]] std::shared_ptr<ProgressThread>
const&
257 return progress_thread_;
263 [[nodiscard]] std::string
str()
const override;
269 std::shared_ptr<Logger> logger_;
270 std::shared_ptr<ProgressThread> progress_thread_;
Buffer representing device or host memory.
Abstract base class for asynchronous operation within the communicator.
Abstract base class for a communication mechanism between nodes.
Represents the future result of an MPI operation.
Future(MPI_Request req, std::unique_ptr< std::vector< std::uint8_t >> synced_host_data)
Construct a Future from synchronized host data.
Future(MPI_Request req, std::unique_ptr< Buffer > data_buffer)
Construct a Future from a data buffer.
MPI communicator class that implements the Communicator interface.
std::shared_ptr< Communicator::Logger > const & logger() override
Retrieves the logger associated with this communicator.
std::unique_ptr< Communicator::Future > send(std::unique_ptr< std::vector< std::uint8_t >> msg, Rank rank, Tag tag) override
Sends a host message to a specific rank.
std::vector< std::size_t > test_some(std::unordered_map< std::size_t, std::unique_ptr< Communicator::Future >> const &future_map) override
Tests for completion of multiple futures in a map.
std::unique_ptr< std::vector< std::uint8_t > > recv_from(Rank src, Tag tag) override
Receives a message from a specific rank (blocking).
std::vector< std::unique_ptr< Buffer > > wait_all(std::vector< std::unique_ptr< Communicator::Future >> &&futures) override
Wait for completion of all futures and return their data buffers.
std::unique_ptr< Buffer > wait(std::unique_ptr< Communicator::Future > future) override
Wait for a future to complete and return the data buffer.
std::pair< std::vector< std::unique_ptr< Communicator::Future > >, std::vector< std::size_t > > test_some(std::vector< std::unique_ptr< Communicator::Future >> &future_vector) override
Tests for completion of multiple futures.
std::shared_ptr< ProgressThread > const & progress_thread() const override
Retrieves the progress thread associated with this communicator.
std::unique_ptr< Communicator::Future > send(std::unique_ptr< Buffer > msg, Rank rank, Tag tag) override
Sends a message (device or host) to a specific rank. Use release_data to obtain the data buffer again...
bool test(std::unique_ptr< Communicator::Future > &future) override
Test for completion of a single future.
std::unique_ptr< Communicator::Future > recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr< std::vector< std::uint8_t >> synced_buffer) override
Receives a message from a specific rank to an allocated (synchronized) host buffer....
Rank nranks() const override
Retrieves the total number of ranks.
std::unique_ptr< std::vector< std::uint8_t > > release_sync_host_data(std::unique_ptr< Communicator::Future > future) override
Retrieves synchronized host data associated with a completed future. When the future is completed,...
Rank rank() const override
Retrieves the rank of the current node.
std::pair< std::unique_ptr< std::vector< std::uint8_t > >, Rank > recv_any(Tag tag) override
Receives a message from any rank (blocking).
std::string str() const override
Provides a string representation of the communicator.
std::unique_ptr< Communicator::Future > recv(Rank rank, Tag tag, std::unique_ptr< Buffer > recv_buffer) override
Receives a message from a specific rank to a buffer. Use release_data to extract the data out of the ...
std::unique_ptr< Buffer > release_data(std::unique_ptr< Communicator::Future > future) override
Retrieves data associated with a completed future.
A progress thread that can execute arbitrary functions.
A tag used for identifying messages in a communication operation.
void init(int *argc, char ***argv)
Helper to initialize MPI with threading support.
bool is_initialized()
Check if MPI is initialized.
RAPIDS Multi-Processor interfaces.
std::int32_t Rank
The rank of a node (e.g. the rank of a MPI process), or world size (total number of ranks).