message.hpp
1 
6 #pragma once
7 
8 #include <any>
9 #include <cstdint>
10 #include <functional>
11 #include <memory>
12 #include <stdexcept>
13 #include <typeinfo>
14 #include <utility>
15 
16 #include <rapidsmpf/buffer/buffer.hpp>
17 #include <rapidsmpf/buffer/content_description.hpp>
18 #include <rapidsmpf/buffer/resource.hpp>
19 #include <rapidsmpf/error.hpp>
20 
21 namespace rapidsmpf::streaming {
22 
23 
27 class Message {
28  public:
41  using CopyCallback =
42  std::function<Message(Message const&, MemoryReservation& reservation)>;
43 
45  Message() = default;
46 
75  template <typename T>
77  std::uint64_t sequence_number,
78  std::unique_ptr<T> payload,
80  CopyCallback copy_cb = nullptr
81  )
82  : sequence_number_(sequence_number),
83  content_description_{content_description},
84  copy_cb_{std::move(copy_cb)} {
85  RAPIDSMPF_EXPECTS(
86  payload != nullptr, "nullptr not allowed", std::invalid_argument
87  );
88  // Conceptually, a `std::unique_ptr` would be sufficient for exclusive ownership.
89  // However, since `std::any` requires its contents to be copyable, we store the
90  // payload in a `std::shared_ptr` instead. This shared ownership is internal only
91  // and never exposed to the user.
92  payload_ = std::shared_ptr<T>(std::move(payload));
93  }
94 
95  // In tandem with coro::queue the move assignment of std::any breaks GCC's
96  // uninitialized variable tracking and we get a warning that std::any::_M_manager' may
97  // be used uninitialized [-Wmaybe-uninitialized] This is a bug in GCC 14.x which we
98  // workaround by suppressing the warning for the move ctors/assignment. Fixed in
99  // GCC 15.2.
100 #if defined(__GNUC__) && __GNUC__ == 14
101 #pragma GCC diagnostic push
102 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
103 #endif
105  Message(Message&& other) noexcept = default;
106 
108  Message& operator=(Message&& other) noexcept = default;
109 #if defined(__GNUC__) && __GNUC__ == 14
110 #pragma GCC diagnostic pop
111 #endif
112  Message(Message const&) = delete;
113  Message& operator=(Message const&) = delete;
114 
118  void reset() noexcept {
119  return payload_.reset();
120  }
121 
127  [[nodiscard]] bool empty() const noexcept {
128  return !payload_.has_value();
129  }
130 
136  [[nodiscard]] constexpr std::uint64_t sequence_number() const noexcept {
137  return sequence_number_;
138  }
139 
146  template <typename T>
147  [[nodiscard]] bool holds() const noexcept {
148  return payload_.type() == typeid(std::shared_ptr<T>);
149  }
150 
160  template <typename T>
161  [[nodiscard]] T const& get() const {
162  return *get_ptr<T>();
163  }
164 
172  template <typename T>
173  [[nodiscard]] T release() {
174  std::shared_ptr<T> ret = get_ptr<T>();
175  reset();
176  return std::move(*ret);
177  }
178 
184  [[nodiscard]] constexpr ContentDescription const&
185  content_description() const noexcept {
186  return content_description_;
187  }
188 
194  [[nodiscard]] constexpr CopyCallback const& copy_cb() const noexcept {
195  return copy_cb_;
196  }
197 
212  [[nodiscard]] constexpr size_t copy_cost() const noexcept {
214  }
215 
231  [[nodiscard]] Message copy(MemoryReservation& reservation) const {
232  RAPIDSMPF_EXPECTS(
233  copy_cb(), "message doesn't support `copy`", std::invalid_argument
234  );
235  return copy_cb()(*this, reservation);
236  }
237 
238  private:
246  template <typename T>
247  [[nodiscard]] std::shared_ptr<T> get_ptr() const {
248  RAPIDSMPF_EXPECTS(!empty(), "message is empty", std::invalid_argument);
249  RAPIDSMPF_EXPECTS(holds<T>(), "wrong message type", std::invalid_argument);
250  return std::any_cast<std::shared_ptr<T>>(payload_);
251  }
252 
253  private:
254  std::uint64_t sequence_number_{0};
255  std::any payload_;
256  ContentDescription content_description_;
257  CopyCallback copy_cb_;
258 };
259 
260 } // namespace rapidsmpf::streaming
Description of an object's content.
constexpr std::size_t & content_size(MemoryType mem_type) noexcept
Access (read/write) the size for a specific memory type.
Represents a reservation for future memory allocation.
Definition: resource.hpp:33
Type-erased message wrapper around a payload.
Definition: message.hpp:27
constexpr ContentDescription const & content_description() const noexcept
Returns the content description associated with the message.
Definition: message.hpp:185
void reset() noexcept
Reset the message to empty.
Definition: message.hpp:118
Message(Message &&other) noexcept=default
Move construct.
Message & operator=(Message &&other) noexcept=default
Move assign.
std::function< Message(Message const &, MemoryReservation &reservation)> CopyCallback
Callback for performing a deep copy of a message.
Definition: message.hpp:42
Message(std::uint64_t sequence_number, std::unique_ptr< T > payload, ContentDescription content_description, CopyCallback copy_cb=nullptr)
Construct a new message from a unique pointer to its payload.
Definition: message.hpp:76
T const & get() const
Reference to the payload.
Definition: message.hpp:161
Message()=default
Create an empty message.
bool holds() const noexcept
Compare the payload type.
Definition: message.hpp:147
constexpr std::uint64_t sequence_number() const noexcept
Returns the sequence number of this message.
Definition: message.hpp:136
T release()
Extracts the payload and resets the message.
Definition: message.hpp:173
bool empty() const noexcept
Returns true when no payload is stored.
Definition: message.hpp:127
Message copy(MemoryReservation &reservation) const
Perform a deep copy of this message and its payload.
Definition: message.hpp:231
constexpr size_t copy_cost() const noexcept
Returns the total memory size required for a deep copy of the payload.
Definition: message.hpp:212
constexpr CopyCallback const & copy_cb() const noexcept
Returns the copy callback associated with the message.
Definition: message.hpp:194