15 #include <rapidsmpf/communicator/communicator.hpp>
16 #include <rapidsmpf/config.hpp>
17 #include <rapidsmpf/error.hpp>
25 std::pair<std::string, uint16_t>;
27 using RemoteAddress = std::variant<
29 std::shared_ptr<::ucxx::Address>>;
35 enum class ProgressMode : std::uint8_t {
54 class SharedResources;
96 std::unique_ptr<rapidsmpf::ucxx::InitializedRank> init(
97 std::shared_ptr<::ucxx::Worker> worker,
99 std::optional<RemoteAddress> remote_address,
129 Future(std::shared_ptr<::ucxx::Request> req, std::unique_ptr<Buffer> data_buffer)
130 : req_{std::move(req)}, data_buffer_{std::move(data_buffer)} {}
142 std::shared_ptr<::ucxx::Request> req,
143 std::unique_ptr<std::vector<uint8_t>> synced_host_data
145 : req_{std::move(req)}, synced_host_data_{std::move(synced_host_data)} {}
147 ~
Future() noexcept override = default;
150 std::shared_ptr<::ucxx::Request> req_;
151 std::unique_ptr<
Buffer> data_buffer_;
153 std::unique_ptr<std::vector<uint8_t>> synced_host_data_;
167 ~
UCXX() noexcept override;
172 [[nodiscard]] Rank
rank() const override;
177 [[nodiscard]] Rank
nranks() const override;
183 std::unique_ptr<std::vector<uint8_t>> msg, Rank
rank,
Tag tag
208 Rank
rank,
Tag tag, std::unique_ptr<std::vector<uint8_t>> synced_buffer
217 [[nodiscard]] std::pair<std::unique_ptr<std::vector<uint8_t>>, Rank>
recv_any(
227 [[nodiscard]] std::unique_ptr<std::vector<uint8_t>>
recv_from(
238 std::vector<std::
size_t>>
286 [[nodiscard]] std::string
str()
const override;
318 std::shared_ptr<SharedResources> shared_resources_;
322 std::shared_ptr<::ucxx::Endpoint> get_endpoint(Rank
rank);
323 void progress_worker();
Buffer representing device or host memory.
Abstract base class for asynchronous operation within the communicator.
A logger base class for handling different levels of log messages.
Abstract base class for a communication mechanism between nodes.
A tag used for identifying messages in a communication operation.
Manages configuration options for RapidsMPF operations.
InitializedRank(std::shared_ptr< SharedResources > shared_resources)
Construct an initialized UCXX rank.
std::shared_ptr< SharedResources > shared_resources_
Opaque object created by init().
Storage for a listener address.
RemoteAddress address
Hostname/port pair or UCXX address.
Rank rank
The rank of the listener.
Represents the future result of an UCXX operation.
Future(std::shared_ptr<::ucxx::Request > req, std::unique_ptr< Buffer > data_buffer)
Construct a Future from a data buffer.
Future(std::shared_ptr<::ucxx::Request > req, std::unique_ptr< std::vector< uint8_t >> synced_host_data)
Construct a Future from synchronized host data.
UCXX communicator class that implements the Communicator interface.
ListenerAddress listener_address()
Get address to which listener is bound.
std::pair< std::unique_ptr< std::vector< uint8_t > >, Rank > recv_any(Tag tag) override
Receives a message from any rank (blocking).
std::unique_ptr< Communicator::Future > recv(Rank rank, Tag tag, std::unique_ptr< Buffer > recv_buffer) override
Receives a message from a specific rank to a buffer. Use release_data to extract the data out of the ...
std::unique_ptr< Communicator::Future > send(std::unique_ptr< std::vector< uint8_t >> msg, Rank rank, Tag tag) override
Sends a host message to a specific rank.
std::unique_ptr< std::vector< uint8_t > > recv_from(Rank src, Tag tag) override
Receives a message from a specific rank (blocking).
std::string str() const override
Provides a string representation of the communicator.
Rank rank() const override
Retrieves the rank of the current node.
std::unique_ptr< std::vector< uint8_t > > release_sync_host_data(std::unique_ptr< Communicator::Future > future) override
Retrieves synchronized host data associated with a completed future. When the future is completed,...
Logger & logger() override
Retrieves the logger associated with this communicator.
void barrier()
Barrier to synchronize all ranks.
std::unique_ptr< Communicator::Future > recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr< std::vector< uint8_t >> synced_buffer) override
Receives a message from a specific rank to an allocated (synchronized) host buffer....
std::unique_ptr< Buffer > release_data(std::unique_ptr< Communicator::Future > future) override
Retrieves data associated with a completed future.
Rank nranks() const override
Retrieves the total number of ranks.
std::shared_ptr< UCXX > split()
Creates a new communicator with a single rank.
std::pair< std::vector< std::unique_ptr< Communicator::Future > >, std::vector< std::size_t > > test_some(std::vector< std::unique_ptr< Communicator::Future >> &future_vector) override
Tests for completion of multiple futures.
std::unique_ptr< Buffer > wait(std::unique_ptr< Communicator::Future > future) override
Wait for a future to complete and return the data buffer.