chunk.hpp
1 
5 #pragma once
6 
7 #include <memory>
8 #include <vector>
9 
10 #include <rapidsmpf/communicator/communicator.hpp>
11 #include <rapidsmpf/memory/buffer.hpp>
12 #include <rapidsmpf/memory/buffer_resource.hpp>
13 #include <rapidsmpf/memory/packed_data.hpp>
14 
16 
/// @brief Partition ID, which goes from 0 to the total number of partitions.
22 using PartID = std::uint32_t;
23 
24 namespace detail {
25 
/// @brief The globally unique ID of a chunk.
29 using ChunkID = std::uint64_t;
30 
58 class Chunk {
59  // friend a method that creates a dummy chunk for testing
60  friend Chunk make_dummy_chunk(ChunkID, PartID);
61 
62  public:
67  Chunk(Chunk&& other) noexcept = default;
68 
74  Chunk& operator=(Chunk&& other) noexcept = default;
75 
76  // delete copy constructor
77  Chunk(Chunk const&) = delete;
78 
79  // delete copy assignment operator
80  Chunk& operator=(Chunk const&) = delete;
81 
87  static constexpr std::size_t metadata_message_header_size() {
88  return sizeof(ChunkID) + sizeof(PartID) + sizeof(std::size_t)
89  + sizeof(std::uint32_t) + sizeof(std::uint64_t);
90  }
91 
97  [[nodiscard]] constexpr ChunkID chunk_id() const {
98  return chunk_id_;
99  }
100 
106  [[nodiscard]] constexpr PartID part_id() const {
107  return part_id_;
108  }
109 
116  [[nodiscard]] constexpr std::size_t expected_num_chunks() const {
117  return expected_num_chunks_;
118  }
119 
125  [[nodiscard]] constexpr bool is_control_message() const {
126  // We use `expected_num_chunks > 0` to flag a message as a "control message".
127  return expected_num_chunks() > 0;
128  }
129 
136  [[nodiscard]] constexpr std::uint32_t metadata_size() const {
137  return metadata_size_;
138  }
139 
146  [[nodiscard]] constexpr std::size_t data_size() const {
147  return data_size_;
148  }
149 
155  void set_data_buffer(std::unique_ptr<Buffer> data) {
156  RAPIDSMPF_EXPECTS(!data_, "buffer is already set");
157  data_ = std::move(data);
158  }
159 
165  [[nodiscard]] bool is_data_buffer_set() const {
166  return data_ != nullptr;
167  }
168 
174  [[nodiscard]] bool is_metadata_buffer_set() const {
175  return metadata_ != nullptr && !metadata_->empty();
176  }
177 
183  [[nodiscard]] MemoryType data_memory_type() const {
184  RAPIDSMPF_EXPECTS(data_, "data buffer is not set");
185  return data_->mem_type();
186  }
187 
193  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> release_metadata_buffer() {
194  return std::move(metadata_);
195  }
196 
202  [[nodiscard]] std::unique_ptr<Buffer> release_data_buffer() {
203  return std::move(data_);
204  }
205 
215  ChunkID chunk_id, PartID part_id, PackedData&& packed_data
216  );
217 
228  );
229 
244  std::vector<std::uint8_t> const& msg, BufferResource* br, bool validate = true
245  );
246 
254  static bool validate_format(std::vector<std::uint8_t> const& serialized_buf);
255 
263  [[nodiscard]] bool is_ready() const {
264  // data_size_ contains the size of the data buffer. If it is 0, the chunk
265  // has no data, so it is ready. Else, the chunk is ready if the data
266  // buffer is non-null and the data buffer is ready.
267  return data_size_ == 0 || (data_ && data_->is_latest_write_done());
268  }
269 
275  [[nodiscard]] std::string str() const;
276 
282  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> serialize() const;
283 
284  private:
285  // constructor
286  Chunk(
288  PartID part_id,
289  std::size_t expected_num_chunks,
290  std::uint32_t metadata_size,
291  std::uint64_t data_size,
292  std::unique_ptr<std::vector<std::uint8_t>> metadata = nullptr,
293  std::unique_ptr<Buffer> data = nullptr
294  );
295 
296  ChunkID chunk_id_;
297  PartID part_id_;
298  std::size_t
299  expected_num_chunks_;
300  std::uint32_t metadata_size_;
301  std::uint64_t data_size_;
302 
304  std::unique_ptr<std::vector<std::uint8_t>> metadata_;
305 
307  std::unique_ptr<Buffer> data_;
308 };
309 
/**
 * @brief Overloads the stream insertion operator for the Chunk class.
 *
 * @param os The output stream to write to.
 * @param obj The chunk to write.
 * @return A reference to an output stream (declaration only; presumably `os` - confirm
 * against the definition).
 */
319 std::ostream& operator<<(std::ostream& os, Chunk const& obj);
320 
321 } // namespace detail
322 } // namespace rapidsmpf::shuffler
Class managing buffer resources.
A partition chunk representing either a data message or a control message.
Definition: chunk.hpp:58
void set_data_buffer(std::unique_ptr< Buffer > data)
Set the data buffer.
Definition: chunk.hpp:155
static constexpr std::size_t metadata_message_header_size()
The size of the metadata message header.
Definition: chunk.hpp:87
constexpr PartID part_id() const
Partition ID of the message.
Definition: chunk.hpp:106
std::unique_ptr< std::vector< std::uint8_t > > serialize() const
Returns a metadata message that represents this chunk.
bool is_metadata_buffer_set() const
Whether the metadata buffer is set.
Definition: chunk.hpp:174
std::unique_ptr< std::vector< std::uint8_t > > release_metadata_buffer()
Release the ownership of the metadata buffer.
Definition: chunk.hpp:193
constexpr std::size_t expected_num_chunks() const
The expected number of chunks for the message.
Definition: chunk.hpp:116
Chunk & operator=(Chunk &&other) noexcept=default
Move assignment operator.
Chunk(Chunk &&other) noexcept=default
Move constructor.
static Chunk from_packed_data(ChunkID chunk_id, PartID part_id, PackedData &&packed_data)
Create a chunk from packed data.
constexpr bool is_control_message() const
Whether the message is a control message.
Definition: chunk.hpp:125
static bool validate_format(std::vector< std::uint8_t > const &serialized_buf)
Validate whether a serialized buffer follows the Chunk format.
static Chunk deserialize(std::vector< std::uint8_t > const &msg, BufferResource *br, bool validate=true)
Create a chunk by deserializing a metadata message.
static Chunk from_finished_partition(ChunkID chunk_id, PartID part_id, std::size_t expected_num_chunks)
Create a chunk for a finished partition (control message).
constexpr std::uint32_t metadata_size() const
Get the size of the metadata of the message.
Definition: chunk.hpp:136
bool is_ready() const
Whether the chunk is ready for consumption.
Definition: chunk.hpp:263
MemoryType data_memory_type() const
Get the memory type of the data buffer.
Definition: chunk.hpp:183
constexpr std::size_t data_size() const
Get the size of the packed data of the message.
Definition: chunk.hpp:146
bool is_data_buffer_set() const
Whether the data buffer is set.
Definition: chunk.hpp:165
constexpr ChunkID chunk_id() const
The ID of the chunk.
Definition: chunk.hpp:97
std::string str() const
Returns a description of this chunk.
std::unique_ptr< Buffer > release_data_buffer()
Release the ownership of the data buffer.
Definition: chunk.hpp:202
std::ostream & operator<<(std::ostream &os, Chunk const &obj)
Overloads the stream insertion operator for the Chunk class.
std::uint64_t ChunkID
The globally unique ID of a chunk.
Definition: chunk.hpp:29
Shuffler interfaces.
Definition: chunk.hpp:15
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Definition: chunk.hpp:22
MemoryType
Enum representing the type of memory, sorted in decreasing order of preference.
Definition: memory_type.hpp:16
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26