chunk.hpp
1 
5 #pragma once
6 
7 #include <memory>
8 #include <sstream>
9 #include <vector>
10 
11 #include <rapidsmpf/buffer/buffer.hpp>
12 #include <rapidsmpf/buffer/packed_data.hpp>
13 #include <rapidsmpf/communicator/communicator.hpp>
14 
16 
/// Partition ID, which goes from 0 to the total number of partitions.
22 using PartID = std::uint32_t;
23 
24 namespace detail {
25 
/// The globally unique ID of a chunk.
29 using ChunkID = std::uint64_t;
30 
58 class Chunk {
59  // Befriend the helper that creates a dummy chunk for testing, so that it can
// reach the non-public parts of `Chunk`.
60  friend Chunk make_dummy_chunk(ChunkID, PartID);
61 
62  public:
/// Move constructor (compiler-generated, declared `noexcept`).
67  Chunk(Chunk&& other) noexcept = default;
68 
/// Move assignment operator (compiler-generated, declared `noexcept`).
74  Chunk& operator=(Chunk&& other) noexcept = default;
75 
76  // Chunks are move-only: the copy constructor is deleted.
77  Chunk(Chunk const&) = delete;
78 
79  // Chunks are move-only: the copy assignment operator is deleted.
80  Chunk& operator=(Chunk const&) = delete;
81 
88  static constexpr size_t metadata_message_header_size(size_t n_messages) {
89  return sizeof(ChunkID) + sizeof(size_t)
90  + n_messages
91  * (sizeof(PartID) + sizeof(size_t) + sizeof(uint32_t)
92  + sizeof(uint64_t));
93  }
94 
100  [[nodiscard]] constexpr ChunkID chunk_id() const {
101  return chunk_id_;
102  }
103 
109  [[nodiscard]] constexpr size_t n_messages() const {
110  return part_ids_.size();
111  }
112 
119  [[nodiscard]] constexpr PartID part_id(size_t i) const {
120  return part_ids_.at(i);
121  }
122 
130  [[nodiscard]] constexpr size_t expected_num_chunks(size_t i) const {
131  return expected_num_chunks_.at(i);
132  }
133 
140  [[nodiscard]] constexpr bool is_control_message(size_t i) const {
141  // We use `expected_num_chunks > 0` to flag a message as a "control message".
142  return expected_num_chunks(i) > 0;
143  }
144 
/**
 * @brief Get the data of the i-th message, as a new chunk.
 *
 * @param new_chunk_id Chunk ID passed for the new chunk (presumably the ID of the
 * returned chunk; confirm against the implementation).
 * @param i The index of the message.
 * @param br Buffer resource (presumably used for any allocation needed to build
 * the new chunk; confirm against the implementation).
 * @return A new chunk holding the data of the i-th message.
 */
163  Chunk get_data(ChunkID new_chunk_id, size_t i, BufferResource* br);
164 
172  [[nodiscard]] constexpr uint32_t metadata_size(size_t i) const {
173  return i == 0 ? meta_offsets_.at(0)
174  : meta_offsets_.at(i) - meta_offsets_.at(i - 1);
175  }
176 
184  [[nodiscard]] constexpr size_t data_size(size_t i) const {
185  return i == 0 ? data_offsets_.at(0)
186  : data_offsets_.at(i) - data_offsets_.at(i - 1);
187  }
188 
194  void set_data_buffer(std::unique_ptr<Buffer> data) {
195  RAPIDSMPF_EXPECTS(!data_, "buffer is already set");
196  data_ = std::move(data);
197  }
198 
204  [[nodiscard]] bool is_data_buffer_set() const {
205  return data_ != nullptr;
206  }
207 
213  [[nodiscard]] bool is_metadata_buffer_set() const {
214  return metadata_ != nullptr && !metadata_->empty();
215  }
216 
222  [[nodiscard]] MemoryType data_memory_type() const {
223  RAPIDSMPF_EXPECTS(data_, "data buffer is not set");
224  return data_->mem_type();
225  }
226 
232  [[nodiscard]] constexpr size_t concat_data_size() const {
233  return data_offsets_.at(n_messages() - 1);
234  }
235 
241  [[nodiscard]] constexpr size_t concat_metadata_size() const {
242  return meta_offsets_.at(n_messages() - 1);
243  }
244 
250  [[nodiscard]] std::unique_ptr<std::vector<uint8_t>> release_metadata_buffer() {
251  return std::move(metadata_);
252  }
253 
259  [[nodiscard]] std::unique_ptr<Buffer> release_data_buffer() {
260  return std::move(data_);
261  }
262 
272  ChunkID chunk_id, PartID part_id, PackedData&& packed_data
273  );
274 
286  );
287 
/**
 * @brief Create a chunk by deserializing a metadata message.
 *
 * @param msg The serialized metadata message.
 * @param validate Defaults to true; presumably controls whether the format of
 * `msg` is validated before deserializing (confirm against the implementation).
 * @return The deserialized chunk.
 */
298  static Chunk deserialize(std::vector<uint8_t> const& msg, bool validate = true);
299 
/**
 * @brief Validate whether a serialized buffer follows the Chunk format.
 *
 * @param serialized_buf The serialized buffer to check.
 * @return True if the buffer follows the Chunk format, false otherwise.
 */
307  static bool validate_format(std::vector<uint8_t> const& serialized_buf);
308 
316  [[nodiscard]] bool is_ready() const {
317  // data_offsets_[-1] contains the size of the data buffer. If it is 0, the chunk
318  // has no data messages, so it is ready. Else, the chunk is ready if the data
319  // buffer is non-null and the data buffer is ready.
320  return !data_offsets_.empty()
321  && (data_offsets_[n_messages() - 1] == 0
322  || (data_ && data_->is_latest_write_done()));
323  }
324 
/**
 * @brief Returns a description of this chunk.
 *
 * @return The description as a string.
 */
330  [[nodiscard]] std::string str() const;
331 
/**
 * @brief Returns a metadata message that represents this chunk.
 *
 * @return The metadata message as a byte vector owned by the caller.
 */
337  [[nodiscard]] std::unique_ptr<std::vector<uint8_t>> serialize() const;
338 
355  static Chunk concat(
356  std::vector<Chunk>&& chunks,
358  BufferResource* br,
359  std::optional<MemoryType> preferred_mem_type = std::nullopt
360  );
361 
362  private:
363  // constructor
364  Chunk(
366  std::vector<PartID> part_ids,
367  std::vector<size_t> expected_num_chunks,
368  std::vector<uint32_t> meta_offsets,
369  std::vector<uint64_t> data_offsets,
370  std::unique_ptr<std::vector<uint8_t>> metadata = nullptr,
371  std::unique_ptr<Buffer> data = nullptr
372  );
373 
/// The ID of the chunk.
374  ChunkID chunk_id_;
/// Partition ID of each message; its size is the number of messages.
375  std::vector<PartID> part_ids_;
/// Expected number of chunks of each message; a value greater than zero flags
/// the message as a control message.
377  std::vector<size_t> expected_num_chunks_;
/// Cumulative metadata offsets: entry `i` is the end offset of the metadata of
/// message `i`, so the last entry is the size of the concatenated metadata.
379  std::vector<uint32_t>
380  meta_offsets_;
/// Cumulative data offsets: entry `i` is the end offset of the data of message
/// `i`, so the last entry is the size of the concatenated data.
381  std::vector<uint64_t>
382  data_offsets_;
383 
/// Buffer for the concatenated metadata (may be null).
385  std::unique_ptr<std::vector<uint8_t>> metadata_;
386 
/// Buffer for the concatenated data (may be null).
388  std::unique_ptr<Buffer> data_;
389 };
390 
395  public:
397 
/// The size of the message in bytes when serialized (the size of a `ChunkID`).
403  static constexpr size_t byte_size = sizeof(ChunkID);
404 
410  [[nodiscard]] std::unique_ptr<std::vector<uint8_t>> pack() {
411  auto msg = std::make_unique<std::vector<uint8_t>>(sizeof(ChunkID));
412  std::memcpy(msg->data(), &cid, sizeof(cid));
413  return msg;
414  }
415 
422  [[nodiscard]] static ReadyForDataMessage unpack(
423  std::unique_ptr<std::vector<uint8_t>> const& msg
424  ) {
425  ChunkID cid;
426  std::memcpy(&cid, msg->data(), sizeof(cid));
427  return ReadyForDataMessage{cid};
428  }
429 
434  [[nodiscard]] std::string str() const {
435  std::stringstream ss;
436  ss << "ReadyForDataMessage(cid=" << cid << ")";
437  return ss.str();
438  }
439 };
440 
450 inline std::ostream& operator<<(std::ostream& os, Chunk const& obj) {
451  os << obj.str();
452  return os;
453 }
454 
465 inline std::ostream& operator<<(std::ostream& os, ReadyForDataMessage const& obj) {
466  os << obj.str();
467  return os;
468 }
469 
470 } // namespace detail
471 } // namespace rapidsmpf::shuffler
Class managing buffer resources.
Definition: resource.hpp:133
Chunk with multiple messages. This class contains two buffers for concatenated metadata and data.
Definition: chunk.hpp:58
constexpr size_t n_messages() const
The number of messages in the chunk.
Definition: chunk.hpp:109
void set_data_buffer(std::unique_ptr< Buffer > data)
Set the data buffer.
Definition: chunk.hpp:194
static Chunk from_finished_partition(ChunkID chunk_id, PartID part_id, size_t expected_num_chunks)
Create a single-message chunk for a finished partition (control message).
constexpr PartID part_id(size_t i) const
Partition ID of the i-th message.
Definition: chunk.hpp:119
bool is_metadata_buffer_set() const
Whether the metadata buffer is set.
Definition: chunk.hpp:213
static Chunk deserialize(std::vector< uint8_t > const &msg, bool validate=true)
Create a chunk by deserializing a metadata message.
constexpr size_t concat_metadata_size() const
Get the size of the concatenated metadata.
Definition: chunk.hpp:241
Chunk & operator=(Chunk &&other) noexcept=default
Move assignment operator.
Chunk(Chunk &&other) noexcept=default
Move constructor.
static Chunk from_packed_data(ChunkID chunk_id, PartID part_id, PackedData &&packed_data)
Create a single-message chunk from packed data.
static bool validate_format(std::vector< uint8_t > const &serialized_buf)
Validate whether a serialized buffer follows the Chunk format.
bool is_ready() const
Whether the chunk is ready for consumption.
Definition: chunk.hpp:316
MemoryType data_memory_type() const
Get the memory type of the data buffer.
Definition: chunk.hpp:222
std::unique_ptr< std::vector< uint8_t > > serialize() const
Returns a metadata message that represents this chunk.
static Chunk concat(std::vector< Chunk > &&chunks, ChunkID chunk_id, BufferResource *br, std::optional< MemoryType > preferred_mem_type=std::nullopt)
Concatenate multiple chunks into a single chunk.
Chunk get_data(ChunkID new_chunk_id, size_t i, BufferResource *br)
Get the data of the i-th message, as a new chunk.
bool is_data_buffer_set() const
Whether the data buffer is set.
Definition: chunk.hpp:204
constexpr size_t expected_num_chunks(size_t i) const
The expected number of chunks of the i-th message.
Definition: chunk.hpp:130
static constexpr size_t metadata_message_header_size(size_t n_messages)
The size of the metadata message header.
Definition: chunk.hpp:88
constexpr bool is_control_message(size_t i) const
Whether the i-th message is a control message.
Definition: chunk.hpp:140
std::unique_ptr< std::vector< uint8_t > > release_metadata_buffer()
Release the ownership of the metadata buffer.
Definition: chunk.hpp:250
constexpr ChunkID chunk_id() const
The ID of the chunk.
Definition: chunk.hpp:100
constexpr size_t data_size(size_t i) const
Get the size of the packed data of the i-th message.
Definition: chunk.hpp:184
constexpr size_t concat_data_size() const
Get the size of the concatenated data.
Definition: chunk.hpp:232
constexpr uint32_t metadata_size(size_t i) const
Get the size of the metadata of the i-th message.
Definition: chunk.hpp:172
std::string str() const
Returns a description of this chunk.
std::unique_ptr< Buffer > release_data_buffer()
Release the ownership of the data buffer.
Definition: chunk.hpp:259
Represents a message indicating readiness to receive data for a specific chunk.
Definition: chunk.hpp:394
ChunkID cid
Chunk ID associated with the message.
Definition: chunk.hpp:396
static ReadyForDataMessage unpack(std::unique_ptr< std::vector< uint8_t >> const &msg)
Deserializes a message from a byte array.
Definition: chunk.hpp:422
std::unique_ptr< std::vector< uint8_t > > pack()
Serializes the message into a byte array.
Definition: chunk.hpp:410
std::string str() const
Returns a description of this instance.
Definition: chunk.hpp:434
static constexpr size_t byte_size
The size of the message in bytes when serialized.
Definition: chunk.hpp:403
std::ostream & operator<<(std::ostream &os, Chunk const &obj)
Overloads the stream insertion operator for the Chunk class.
Definition: chunk.hpp:450
std::uint64_t ChunkID
The globally unique ID of a chunk.
Definition: chunk.hpp:29
Shuffler interfaces.
Definition: chunk.hpp:15
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Definition: chunk.hpp:22
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26