12 #include <unordered_map>
15 #include <rapidsmpf/buffer/buffer.hpp>
16 #include <rapidsmpf/buffer/resource.hpp>
17 #include <rapidsmpf/config.hpp>
18 #include <rapidsmpf/error.hpp>
33 using Rank = std::int32_t;
42 using OpID = std::uint8_t;
48 using StageID = std::uint8_t;
94 [[nodiscard]]
static constexpr
size_t bit_length() noexcept {
118 [[nodiscard]] constexpr OpID
op() const noexcept {
126 [[nodiscard]] constexpr StageID
stage() const noexcept {
152 virtual ~
Future() noexcept =
default;
193 "NONE",
"PRINT",
"WARN",
"INFO",
"DEBUG",
"TRACE"
203 auto index =
static_cast<std::size_t
>(level);
223 virtual ~
Logger() noexcept = default;
243 template <
typename... Args>
245 if (
static_cast<std::uint32_t
>(level_) <
static_cast<std::uint32_t
>(level)) {
248 std::ostringstream ss;
250 do_log(level, std::move(ss));
259 template <
typename... Args>
270 template <
typename... Args>
271 void warn(Args
const&... args) {
281 template <
typename... Args>
282 void info(Args
const&... args) {
292 template <
typename... Args>
303 template <
typename... Args>
315 auto const tid = std::this_thread::get_id();
318 auto const [name, inserted] =
319 thread_id_names.insert({tid, thread_id_names_counter});
321 ++thread_id_names_counter;
338 std::ostringstream full_log_msg;
339 full_log_msg <<
"[" <<
level_name(level) <<
":" << comm_->
rank() <<
":"
341 std::lock_guard<std::mutex> lock(mutex_);
342 std::cout << full_log_msg.str() << std::endl;
361 std::uint32_t thread_id_names_counter{0};
364 std::unordered_map<std::thread::id, std::uint32_t> thread_id_names;
368 Communicator() =
default;
371 virtual ~Communicator() noexcept = default;
377 [[nodiscard]] virtual Rank
rank() const = 0;
383 [[nodiscard]] virtual Rank
nranks() const = 0;
397 std::unique_ptr<std::vector<uint8_t>> msg, Rank
rank,
Tag tag
448 Rank
rank,
Tag tag, std::unique_ptr<std::vector<uint8_t>> synced_buffer
460 [[nodiscard]] virtual std::pair<std::unique_ptr<std::vector<uint8_t>>, Rank>
recv_any(
474 [[nodiscard]] virtual std::unique_ptr<std::vector<uint8_t>>
recv_from(
485 [[nodiscard]] virtual std::
486 pair<std::vector<std::unique_ptr<
Future>>, std::vector<std::
size_t>>
508 std::unique_ptr<
Future> future
547 [[nodiscard]] virtual std::
string str() const = 0;
551 #ifdef RAPIDSMPF_HAVE_UCXX
552 constexpr
bool COMM_HAVE_UCXX =
true;
554 constexpr
bool COMM_HAVE_UCXX =
false;
558 #ifdef RAPIDSMPF_HAVE_MPI
559 constexpr
bool COMM_HAVE_MPI =
true;
561 constexpr
bool COMM_HAVE_MPI =
false;
Buffer representing device or host memory.
Abstract base class for asynchronous operation within the communicator.
Future(Future const &)=delete
Not copyable.
Future & operator=(Future &&)=default
Move assignment.
Future(Future &&)=default
Movable.
Future & operator=(Future const &)=delete
Not copy-assignable.
A logger base class for handling different levels of log messages.
void print(Args const &... args)
Logs a print message.
virtual void do_log(LOG_LEVEL level, std::ostringstream &&ss)
Handles the logging of a messages.
virtual std::uint32_t get_thread_id()
Returns a unique thread ID for the current thread.
void debug(Args const &... args)
Logs a debug message.
static constexpr const char * level_name(LOG_LEVEL level)
Get the string name of a log level.
Communicator * get_communicator() const
Get the communicator used by the logger.
Logger(Communicator *comm, config::Options options)
Construct a new logger.
void info(Args const &... args)
Logs an informational message.
static constexpr std::array< char const *, 6 > LOG_LEVEL_NAMES
Log level names corresponding to the LOG_LEVEL enum.
LOG_LEVEL
Log verbosity levels.
@ INFO
Informational messages.
@ PRINT
General print messages.
void log(LOG_LEVEL level, Args const &... args)
Logs a message using the specified verbosity level.
void warn(Args const &... args)
Logs a warning message.
void trace(Args const &... args)
Logs a trace message.
LOG_LEVEL verbosity_level() const
Get the verbosity level of the logger.
Abstract base class for a communication mechanism between nodes.
virtual Rank nranks() const =0
Retrieves the total number of ranks.
virtual std::string str() const =0
Provides a string representation of the communicator.
virtual std::unique_ptr< std::vector< uint8_t > > release_sync_host_data(std::unique_ptr< Communicator::Future > future)=0
Retrieves synchronized host data associated with a completed future. When the future is completed,...
virtual std::unique_ptr< Buffer > release_data(std::unique_ptr< Communicator::Future > future)=0
Retrieves data associated with a completed future.
virtual std::unique_ptr< std::vector< uint8_t > > recv_from(Rank src, Tag tag)=0
Receives a message from a specific rank (blocking).
virtual Rank rank() const =0
Retrieves the rank of the current node.
virtual std::unique_ptr< Future > send(std::unique_ptr< std::vector< uint8_t >> msg, Rank rank, Tag tag)=0
Sends a host message to a specific rank.
virtual std::pair< std::unique_ptr< std::vector< uint8_t > >, Rank > recv_any(Tag tag)=0
Receives a message from any rank (blocking).
virtual std::unique_ptr< Future > recv_sync_host_data(Rank rank, Tag tag, std::unique_ptr< std::vector< uint8_t >> synced_buffer)=0
Receives a message from a specific rank to an allocated (synchronized) host buffer....
virtual std::pair< std::vector< std::unique_ptr< Future > >, std::vector< std::size_t > > test_some(std::vector< std::unique_ptr< Future >> &future_vector)=0
Tests for completion of multiple futures.
virtual std::unique_ptr< Future > recv(Rank rank, Tag tag, std::unique_ptr< Buffer > recv_buffer)=0
Receives a message from a specific rank to a buffer. Use release_data to extract the data out of the ...
virtual Logger & logger()=0
Retrieves the logger associated with this communicator.
virtual std::unique_ptr< Buffer > wait(std::unique_ptr< Future > future)=0
Wait for a future to complete and return the data buffer.
A tag used for identifying messages in a communication operation.
static constexpr StorageT stage_id_mask
Mask for the stage ID.
constexpr Tag(OpID const op, StageID const stage)
Constructs a tag.
constexpr OpID op() const noexcept
Extracts the operation ID from the tag.
static constexpr size_t bit_length() noexcept
Returns the max number of bits used for the tag.
constexpr StageID stage() const noexcept
Extracts the stage ID from the tag.
static constexpr int stage_id_bits
Number of bits for the stage ID.
static constexpr StorageT max_value() noexcept
Returns the max value of the tag.
static constexpr int op_id_bits
Number of bits for the operation ID.
std::int32_t StorageT
The physical data type to store the tag.
static constexpr StorageT op_id_mask
Mask for the operation ID.
Manages configuration options for RapidsMPF operations.
std::ostream & operator<<(std::ostream &os, cuda_stream_view stream)
std::int32_t Rank
Type alias for communicator::Rank.