chunk.hpp
1 
5 #pragma once
6 
7 #include <memory>
8 #include <vector>
9 
10 #include <rapidsmpf/communicator/communicator.hpp>
11 #include <rapidsmpf/memory/buffer.hpp>
12 #include <rapidsmpf/memory/buffer_resource.hpp>
13 #include <rapidsmpf/memory/packed_data.hpp>
14 
16 
/// Partition ID, which goes from 0 to the total number of partitions.
22 using PartID = std::uint32_t;
23 
24 namespace detail {
25 
/// The globally unique ID of a chunk.
29 using ChunkID = std::uint64_t;
30 
/// @brief A partition chunk representing either a data message or a control message.
///
/// A chunk carries a chunk ID, a partition ID, an expected-chunk count, the sizes of
/// its metadata and data, and optionally owns a metadata byte vector and a data
/// `Buffer`. The class is move-only: the copy operations are deleted.
58 class Chunk {
59  public:
/// @brief Move constructor (defaulted and declared `noexcept`).
/// @param other The chunk to move from.
64  Chunk(Chunk&& other) noexcept = default;
65 
/// @brief Move assignment operator (defaulted and declared `noexcept`).
/// @param other The chunk to move from.
/// @return Reference to this chunk.
71  Chunk& operator=(Chunk&& other) noexcept = default;
72 
73  // Copy construction is deleted: a Chunk can be moved but not copied.
74  Chunk(Chunk const&) = delete;
75 
76  // Copy assignment is deleted: a Chunk can be moved but not copied.
77  Chunk& operator=(Chunk const&) = delete;
78 
/// @brief The size of the metadata message header.
///
/// Computed at compile time as the sum of five `sizeof` terms whose types match the
/// members `chunk_id_` (ChunkID), `part_id_` (PartID), `expected_num_chunks_`
/// (std::size_t), `metadata_size_` (std::uint32_t) and `data_size_` (std::uint64_t).
/// NOTE(review): presumably this is the fixed-size prefix written by `serialize()`;
/// the wire layout itself is not visible here, so confirm against the implementation.
///
/// Marked `[[nodiscard]]` for consistency with the other value-returning accessors
/// of this class; discarding the result is never meaningful.
///
/// @return The header size in bytes.
84  [[nodiscard]] static constexpr std::size_t metadata_message_header_size() {
85  return sizeof(ChunkID) + sizeof(PartID) + sizeof(std::size_t)
86  + sizeof(std::uint32_t) + sizeof(std::uint64_t);
87  }
88 
/// @brief The ID of the chunk.
/// @return The value of `chunk_id_`.
94  [[nodiscard]] constexpr ChunkID chunk_id() const {
95  return chunk_id_;
96  }
97 
/// @brief Partition ID of the message.
/// @return The value of `part_id_`.
103  [[nodiscard]] constexpr PartID part_id() const {
104  return part_id_;
105  }
106 
/// @brief The expected number of chunks for the message.
/// @return The value of `expected_num_chunks_`.
113  [[nodiscard]] constexpr std::size_t expected_num_chunks() const {
114  return expected_num_chunks_;
115  }
116 
/// @brief Whether the message is a control message.
/// @return True if `expected_num_chunks()` is greater than zero, otherwise false.
122  [[nodiscard]] constexpr bool is_control_message() const {
123  // We use `expected_num_chunks > 0` to flag a message as a "control message".
124  return expected_num_chunks() > 0;
125  }
126 
/// @brief Get the size of the metadata of the message.
/// @return The value of `metadata_size_`.
133  [[nodiscard]] constexpr std::uint32_t metadata_size() const {
134  return metadata_size_;
135  }
136 
/// @brief Get the size of the packed data of the message.
///
/// The member `data_size_` is declared as `std::uint64_t`; it is converted to
/// `std::size_t` on return.
/// @return The value of `data_size_`.
143  [[nodiscard]] constexpr std::size_t data_size() const {
144  return data_size_;
145  }
146 
/// @brief Set the data buffer.
///
/// Takes ownership of `data` by moving it into `data_`.
/// @param data The buffer to store in this chunk.
152  void set_data_buffer(std::unique_ptr<Buffer> data) {
// Precondition: no data buffer may be attached yet.
// NOTE(review): RAPIDSMPF_EXPECTS is defined outside this file; presumably it
// reports an error with the given message when the condition is false — confirm.
153  RAPIDSMPF_EXPECTS(!data_, "buffer is already set");
154  data_ = std::move(data);
155  }
156 
/// @brief Whether the data buffer is set.
/// @return True if `data_` is non-null, otherwise false.
162  [[nodiscard]] bool is_data_buffer_set() const {
163  return data_ != nullptr;
164  }
165 
/// @brief Whether the metadata buffer is set.
///
/// A non-null but empty metadata vector counts as "not set".
/// @return True if `metadata_` is non-null and the vector it points to is non-empty.
171  [[nodiscard]] bool is_metadata_buffer_set() const {
172  return metadata_ != nullptr && !metadata_->empty();
173  }
174 
/// @brief Get the memory type of the data buffer.
/// @return The result of `mem_type()` on the stored data buffer.
180  [[nodiscard]] MemoryType data_memory_type() const {
// Precondition: a data buffer must be attached before it is dereferenced below.
// NOTE(review): RAPIDSMPF_EXPECTS is defined outside this file; presumably it
// reports an error with the given message when the condition is false — confirm.
181  RAPIDSMPF_EXPECTS(data_, "data buffer is not set");
182  return data_->mem_type();
183  }
184 
/// @brief Release the ownership of the metadata buffer.
///
/// Moves `metadata_` out of the chunk, leaving the member null. `metadata_size_`
/// is not modified by this function.
/// @return The metadata buffer previously held by the chunk (may be null).
190  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> release_metadata_buffer() {
191  return std::move(metadata_);
192  }
193 
/// @brief Release the ownership of the data buffer.
///
/// Moves `data_` out of the chunk, leaving the member null. `data_size_` is not
/// modified by this function.
/// @return The data buffer previously held by the chunk (may be null).
199  [[nodiscard]] std::unique_ptr<Buffer> release_data_buffer() {
200  return std::move(data_);
201  }
202 
// Create a chunk from packed data.
// NOTE(review): the opening line of this declaration (listing line 211) is absent
// from this listing. The reference entry for it gives the full signature as
// `static Chunk from_packed_data(ChunkID chunk_id, PartID part_id, PackedData &&packed_data)`.
212  ChunkID chunk_id, PartID part_id, PackedData&& packed_data
213  );
214 
// Create a chunk for a finished partition (control message).
// NOTE(review): only the closing line of this declaration survives in this listing.
// The reference entry for it gives the full signature as
// `static Chunk from_finished_partition(ChunkID chunk_id, PartID part_id, std::size_t expected_num_chunks)`.
225  );
226 
// Create a chunk by deserializing a metadata message.
// NOTE(review): the opening line of this declaration (listing line 244) is absent
// from this listing. The reference entry for it gives the full signature as
// `static Chunk deserialize(std::vector<std::uint8_t> const &msg, BufferResource *br,
//  bool validate=true, std::unique_ptr<Buffer> data=nullptr)`.
// NOTE(review): `validate` presumably controls whether the message format is checked
// before decoding, and `data` presumably supplies an optional data buffer for the
// new chunk — confirm both against the implementation.
245  std::vector<std::uint8_t> const& msg,
246  BufferResource* br,
247  bool validate = true,
248  std::unique_ptr<Buffer> data = nullptr
249  );
250 
/// @brief Validate whether a serialized buffer follows the Chunk format.
/// @param serialized_buf The serialized bytes to check.
/// @return NOTE(review): presumably true when the buffer is well-formed; the
/// definition is not visible here — confirm.
258  static bool validate_format(std::vector<std::uint8_t> const& serialized_buf);
259 
/// @brief Whether the chunk is ready for consumption.
///
/// A chunk whose `data_size_` is non-zero but whose data buffer is null (not yet
/// attached, or already released) is reported as not ready.
/// @return True if the chunk has no data, or if its data buffer is attached and
/// `is_latest_write_done()` on that buffer returns true.
267  [[nodiscard]] bool is_ready() const {
268  // data_size_ contains the size of the data buffer. If it is 0, the chunk
269  // has no data, so it is ready. Else, the chunk is ready if the data
270  // buffer is non-null and the data buffer is ready.
271  return data_size_ == 0 || (data_ && data_->is_latest_write_done());
272  }
273 
/// @brief Returns a description of this chunk.
/// @return The description as a string.
279  [[nodiscard]] std::string str() const;
280 
/// @brief Returns a metadata message that represents this chunk.
/// @return An owning pointer to the serialized bytes.
286  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> serialize() const;
287 
288  private:
289  // Private constructor taking every field of the chunk; not callable from outside the class.
290  Chunk(
// NOTE(review): listing line 291 is absent from this listing. Given the `chunk_id_`
// member and the ordering of the remaining parameters, it is presumably
// `ChunkID chunk_id,` — confirm against the original header.
292  PartID part_id,
293  std::size_t expected_num_chunks,
294  std::uint32_t metadata_size,
295  std::uint64_t data_size,
// Both buffers are optional and default to null.
296  std::unique_ptr<std::vector<std::uint8_t>> metadata = nullptr,
297  std::unique_ptr<Buffer> data = nullptr
298  );
299 
// ID of the chunk; returned by chunk_id().
300  ChunkID chunk_id_;
// Partition ID of the message; returned by part_id().
301  PartID part_id_;
// Expected number of chunks; a value greater than zero marks a control message
// (see is_control_message()).
302  std::size_t
303  expected_num_chunks_;
// Size of the metadata; returned by metadata_size().
304  std::uint32_t metadata_size_;
// Size of the data buffer; zero means the chunk has no data (see is_ready()).
305  std::uint64_t data_size_;
306 
// Owned metadata bytes; may be null (defaults to null in the constructor and is
// left null by release_metadata_buffer()).
308  std::unique_ptr<std::vector<std::uint8_t>> metadata_;
309 
// Owned data buffer; may be null until set_data_buffer() is called and is left
// null by release_data_buffer().
311  std::unique_ptr<Buffer> data_;
312 };
313 
/// @brief Overloads the stream insertion operator for the Chunk class.
/// @param os The output stream to write to.
/// @param obj The chunk to write.
/// @return A reference to an output stream (NOTE(review): presumably `os`; the
/// definition is not visible here — confirm).
323 std::ostream& operator<<(std::ostream& os, Chunk const& obj);
324 
325 } // namespace detail
// NOTE(review): the matching `namespace rapidsmpf::shuffler {` opener is not present
// in this listing (listing line 15 is absent; the reference entry "Shuffler
// interfaces" points to chunk.hpp:15) — confirm against the original header.
326 } // namespace rapidsmpf::shuffler
Class managing buffer resources.
A partition chunk representing either a data message or a control message.
Definition: chunk.hpp:58
void set_data_buffer(std::unique_ptr< Buffer > data)
Set the data buffer.
Definition: chunk.hpp:152
static constexpr std::size_t metadata_message_header_size()
The size of the metadata message header.
Definition: chunk.hpp:84
constexpr PartID part_id() const
Partition ID of the message.
Definition: chunk.hpp:103
std::unique_ptr< std::vector< std::uint8_t > > serialize() const
Returns a metadata message that represents this chunk.
bool is_metadata_buffer_set() const
Whether the metadata buffer is set.
Definition: chunk.hpp:171
std::unique_ptr< std::vector< std::uint8_t > > release_metadata_buffer()
Release the ownership of the metadata buffer.
Definition: chunk.hpp:190
constexpr std::size_t expected_num_chunks() const
The expected number of chunks for the message.
Definition: chunk.hpp:113
Chunk & operator=(Chunk &&other) noexcept=default
Move assignment operator.
Chunk(Chunk &&other) noexcept=default
Move constructor.
static Chunk from_packed_data(ChunkID chunk_id, PartID part_id, PackedData &&packed_data)
Create a chunk from packed data.
constexpr bool is_control_message() const
Whether the message is a control message.
Definition: chunk.hpp:122
static bool validate_format(std::vector< std::uint8_t > const &serialized_buf)
Validate whether a serialized buffer follows the Chunk format.
static Chunk from_finished_partition(ChunkID chunk_id, PartID part_id, std::size_t expected_num_chunks)
Create a chunk for a finished partition (control message).
constexpr std::uint32_t metadata_size() const
Get the size of the metadata of the message.
Definition: chunk.hpp:133
bool is_ready() const
Whether the chunk is ready for consumption.
Definition: chunk.hpp:267
MemoryType data_memory_type() const
Get the memory type of the data buffer.
Definition: chunk.hpp:180
constexpr std::size_t data_size() const
Get the size of the packed data of the message.
Definition: chunk.hpp:143
bool is_data_buffer_set() const
Whether the data buffer is set.
Definition: chunk.hpp:162
static Chunk deserialize(std::vector< std::uint8_t > const &msg, BufferResource *br, bool validate=true, std::unique_ptr< Buffer > data=nullptr)
Create a chunk by deserializing a metadata message.
constexpr ChunkID chunk_id() const
The ID of the chunk.
Definition: chunk.hpp:94
std::string str() const
Returns a description of this chunk.
std::unique_ptr< Buffer > release_data_buffer()
Release the ownership of the data buffer.
Definition: chunk.hpp:199
std::ostream & operator<<(std::ostream &os, Chunk const &obj)
Overloads the stream insertion operator for the Chunk class.
std::uint64_t ChunkID
The globally unique ID of a chunk.
Definition: chunk.hpp:29
Shuffler interfaces.
Definition: chunk.hpp:15
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Definition: chunk.hpp:22
MemoryType
Enum representing the type of memory, sorted in decreasing order of preference.
Definition: memory_type.hpp:16
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26