13 #include <unordered_map>
16 #include <rapidsmpf/config.hpp>
17 #include <rapidsmpf/error.hpp>
18 #include <rapidsmpf/memory/buffer.hpp>
19 #include <rapidsmpf/progress_thread.hpp>
120 "Invalid stage value",
124 op >= 0 &&
op < (1 <<
op_id_bits),
"Invalid OpID value", std::overflow_error
132 [[nodiscard]]
static constexpr std::size_t
bit_length() noexcept {
156 [[nodiscard]] constexpr
OpID op() const noexcept {
199 virtual ~
Future() noexcept =
default;
240 "NONE",
"PRINT",
"WARN",
"INFO",
"DEBUG",
"TRACE"
250 auto index =
static_cast<std::size_t
>(level);
270 virtual ~
Logger() noexcept = default;
290 template <
typename... Args>
292 if (
static_cast<std::uint32_t
>(level_) <
static_cast<std::uint32_t
>(level)) {
295 std::ostringstream ss;
297 do_log(level, std::move(ss));
306 template <
typename... Args>
317 template <
typename... Args>
318 void warn(Args
const&... args) {
328 template <
typename... Args>
329 void info(Args
const&... args) {
339 template <
typename... Args>
350 template <
typename... Args>
362 auto const tid = std::this_thread::get_id();
365 auto const [name, inserted] =
366 thread_id_names.insert({tid, thread_id_names_counter});
368 ++thread_id_names_counter;
385 std::ostringstream full_log_msg;
386 full_log_msg <<
"[" <<
level_name(level) <<
":" << rank_ <<
":"
387 <<
get_thread_id() <<
":" << Clock::now() <<
"] " << ss.str();
388 std::lock_guard<std::mutex> lock(mutex_);
389 std::cout << full_log_msg.str() << std::endl;
399 std::uint32_t thread_id_names_counter{0};
402 std::unordered_map<std::thread::id, std::uint32_t> thread_id_names;
406 Communicator() =
default;
409 virtual ~Communicator() noexcept = default;
440 std::unique_ptr<std::vector<std::uint8_t>> msg,
Rank rank,
Tag tag
501 Rank rank,
Tag tag, std::unique_ptr<std::vector<std::uint8_t>> synced_buffer
513 [[nodiscard]] virtual std::pair<std::unique_ptr<std::vector<std::uint8_t>>,
Rank>
526 [[nodiscard]] virtual std::unique_ptr<std::vector<std::uint8_t>>
recv_from(
537 [[nodiscard]] virtual std::
538 pair<std::vector<std::unique_ptr<
Future>>, std::vector<std::
size_t>>
579 std::unique_ptr<
Future> future
626 [[nodiscard]] virtual std::
string str() const = 0;
630 #ifdef RAPIDSMPF_HAVE_UCXX
637 #ifdef RAPIDSMPF_HAVE_MPI
Buffer representing device or host memory.
Abstract base class for an asynchronous operation within the communicator.
Future(Future const &)=delete
Not copyable.
Future & operator=(Future &&)=default
Move assignment.
Future(Future &&)=default
Movable.
Future & operator=(Future const &)=delete
Not copy-assignable.
A logger base class for handling different levels of log messages.
void print(Args const &... args)
Logs a print message.
Logger(Rank rank, config::Options options)
Construct a new logger.
virtual void do_log(LOG_LEVEL level, std::ostringstream &&ss)
Handles the logging of a message.
virtual std::uint32_t get_thread_id()
Returns a unique thread ID for the current thread.
void debug(Args const &... args)
Logs a debug message.
void info(Args const &... args)
Logs an informational message.
static constexpr std::array< char const *, 6 > LOG_LEVEL_NAMES
Log level names corresponding to the LOG_LEVEL enum.
LOG_LEVEL
Log verbosity levels.
@ INFO
Informational messages.
@ PRINT
General print messages.
void log(LOG_LEVEL level, Args const &... args)
Logs a message using the specified verbosity level.
void warn(Args const &... args)
Logs a warning message.
static constexpr char const * level_name(LOG_LEVEL level)
Get the string name of a log level.
void trace(Args const &... args)
Logs a trace message.
LOG_LEVEL verbosity_level() const
Get the verbosity level of the logger.
Abstract base class for a communication mechanism between nodes.
virtual std::unique_ptr< std::vector< std::uint8_t > > recv_from(Rank src, Tag tag)=0
Receives a message from a specific rank (blocking).
virtual Rank nranks() const =0
Retrieves the total number of ranks.
virtual std::string str() const =0
Provides a string representation of the communicator.
virtual std::unique_ptr< Buffer > release_data(std::unique_ptr< Communicator::Future > future)=0
Retrieves data associated with a completed future.
virtual Rank rank() const =0
Retrieves the rank of the current node.
virtual std::shared_ptr< ProgressThread > const & progress_thread() const =0
Retrieves the progress thread associated with this communicator.
virtual bool test(std::unique_ptr< Communicator::Future > &future)=0
Test for completion of a single future.
virtual std::unique_ptr< Future > recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr< std::vector< std::uint8_t >> synced_buffer)=0
Receives a message from a specific rank to an allocated (synchronized) host buffer....
virtual std::vector< std::unique_ptr< Buffer > > wait_all(std::vector< std::unique_ptr< Communicator::Future >> &&futures)=0
Wait for completion of all futures and return their data buffers.
virtual std::pair< std::vector< std::unique_ptr< Future > >, std::vector< std::size_t > > test_some(std::vector< std::unique_ptr< Future >> &future_vector)=0
Tests for completion of multiple futures.
virtual std::unique_ptr< std::vector< std::uint8_t > > release_sync_host_data(std::unique_ptr< Communicator::Future > future)=0
Retrieves synchronized host data associated with a completed future. When the future is completed,...
virtual std::unique_ptr< Future > recv(Rank rank, Tag tag, std::unique_ptr< Buffer > recv_buffer)=0
Receives a message from a specific rank to a buffer. Use release_data to extract the data out of the ...
virtual std::unique_ptr< Buffer > wait(std::unique_ptr< Future > future)=0
Wait for a future to complete and return the data buffer.
virtual std::shared_ptr< Communicator::Logger > const & logger()=0
Retrieves the logger associated with this communicator.
virtual std::unique_ptr< Future > send(std::unique_ptr< std::vector< std::uint8_t >> msg, Rank rank, Tag tag)=0
Sends a host message to a specific rank.
virtual std::pair< std::unique_ptr< std::vector< std::uint8_t > >, Rank > recv_any(Tag tag)=0
Receives a message from any rank (blocking).
A progress thread that can execute arbitrary functions.
A tag used for identifying messages in a communication operation.
static constexpr StorageT stage_id_mask
Mask for the stage ID.
constexpr Tag(OpID const op, StageID const stage)
Constructs a tag.
constexpr OpID op() const noexcept
Extracts the operation ID from the tag.
constexpr StageID stage() const noexcept
Extracts the stage ID from the tag.
static constexpr int stage_id_bits
Number of bits for the stage ID.
static constexpr StorageT max_value() noexcept
Returns the max value of the tag.
static constexpr int op_id_bits
Number of bits for the operation ID.
static constexpr std::size_t bit_length() noexcept
Returns the max number of bits used for the tag.
std::int32_t StorageT
The physical data type to store the tag.
static constexpr StorageT op_id_mask
Mask for the operation ID.
Manages configuration options for RapidsMPF operations.
RAPIDS Multi-Processor interfaces.
std::int32_t Rank
The rank of a node (e.g. the rank of an MPI process), or world size (total number of ranks).
std::int32_t StageID
Identifier for a stage of a communication operation.
constexpr bool COMM_HAVE_MPI
Whether RapidsMPF was built with the MPI Communicator.
std::int32_t OpID
Operation ID defined by the user. This allows users to concurrently execute multiple operations,...
std::ostream & operator<<(std::ostream &os, Communicator const &obj)
Overloads the stream insertion operator for the Communicator class.
constexpr bool COMM_HAVE_UCXX
Whether RapidsMPF was built with the UCXX Communicator.