11 #include <rapidsmpf/buffer/buffer.hpp>
12 #include <rapidsmpf/buffer/packed_data.hpp>
13 #include <rapidsmpf/communicator/communicator.hpp>
89 return sizeof(
ChunkID) +
sizeof(
size_t)
91 * (
sizeof(
PartID) +
sizeof(
size_t) +
sizeof(uint32_t)
110 return part_ids_.size();
120 return part_ids_.at(i);
131 return expected_num_chunks_.at(i);
173 return i == 0 ? meta_offsets_.at(0)
174 : meta_offsets_.at(i) - meta_offsets_.at(i - 1);
184 [[nodiscard]] constexpr
size_t data_size(
size_t i)
const {
185 return i == 0 ? data_offsets_.at(0)
186 : data_offsets_.at(i) - data_offsets_.at(i - 1);
195 RAPIDSMPF_EXPECTS(!data_,
"buffer is already set");
196 data_ = std::move(data);
205 return data_ !=
nullptr;
214 return metadata_ !=
nullptr && !metadata_->empty();
223 RAPIDSMPF_EXPECTS(data_,
"data buffer is not set");
224 return data_->mem_type();
251 return std::move(metadata_);
260 return std::move(data_);
320 return !data_offsets_.empty()
322 || (data_ && data_->is_latest_write_done()));
330 [[nodiscard]] std::string
str()
const;
337 [[nodiscard]] std::unique_ptr<std::vector<uint8_t>>
serialize()
const;
356 std::vector<Chunk>&& chunks,
359 std::optional<MemoryType> preferred_mem_type = std::nullopt
366 std::vector<PartID> part_ids,
368 std::vector<uint32_t> meta_offsets,
369 std::vector<uint64_t> data_offsets,
370 std::unique_ptr<std::vector<uint8_t>> metadata =
nullptr,
371 std::unique_ptr<Buffer> data =
nullptr
375 std::vector<PartID> part_ids_;
377 std::vector<size_t> expected_num_chunks_;
379 std::vector<uint32_t>
381 std::vector<uint64_t>
385 std::unique_ptr<std::vector<uint8_t>> metadata_;
388 std::unique_ptr<Buffer> data_;
410 [[nodiscard]] std::unique_ptr<std::vector<uint8_t>>
pack() {
411 auto msg = std::make_unique<std::vector<uint8_t>>(
sizeof(
ChunkID));
412 std::memcpy(msg->data(), &
cid,
sizeof(
cid));
423 std::unique_ptr<std::vector<uint8_t>>
const& msg
426 std::memcpy(&
cid, msg->data(),
sizeof(
cid));
434 [[nodiscard]] std::string
str()
const {
435 std::stringstream ss;
436 ss <<
"ReadyForDataMessage(cid=" <<
cid <<
")";
Class managing buffer resources.
Chunk with multiple messages. This class contains two buffers for concatenated metadata and data.
constexpr size_t n_messages() const
The number of messages in the chunk.
void set_data_buffer(std::unique_ptr< Buffer > data)
Set the data buffer.
static Chunk from_finished_partition(ChunkID chunk_id, PartID part_id, size_t expected_num_chunks)
Create a single-message chunk for a finished partition (control message).
constexpr PartID part_id(size_t i) const
Partition ID of the i-th message.
bool is_metadata_buffer_set() const
Whether the metadata buffer is set.
static Chunk deserialize(std::vector< uint8_t > const &msg, bool validate=true)
Create a chunk by deserializing a metadata message.
constexpr size_t concat_metadata_size() const
Get the size of the concatenated metadata.
Chunk & operator=(Chunk &&other) noexcept=default
Move assignment operator.
Chunk(Chunk &&other) noexcept=default
Move constructor.
static Chunk from_packed_data(ChunkID chunk_id, PartID part_id, PackedData &&packed_data)
Create a single-message chunk from packed data.
static bool validate_format(std::vector< uint8_t > const &serialized_buf)
Validate whether a serialized buffer follows the Chunk format.
bool is_ready() const
Whether the chunk is ready for consumption.
MemoryType data_memory_type() const
Get the memory type of the data buffer.
std::unique_ptr< std::vector< uint8_t > > serialize() const
Returns a metadata message that represents this chunk.
static Chunk concat(std::vector< Chunk > &&chunks, ChunkID chunk_id, BufferResource *br, std::optional< MemoryType > preferred_mem_type=std::nullopt)
Concatenate multiple chunks into a single chunk.
Chunk get_data(ChunkID new_chunk_id, size_t i, BufferResource *br)
Get the data of the i-th message, as a new chunk.
bool is_data_buffer_set() const
Whether the data buffer is set.
constexpr size_t expected_num_chunks(size_t i) const
The expected number of chunks of the i-th message.
static constexpr size_t metadata_message_header_size(size_t n_messages)
The size of the metadata message header.
constexpr bool is_control_message(size_t i) const
Whether the i-th message is a control message.
std::unique_ptr< std::vector< uint8_t > > release_metadata_buffer()
Release the ownership of the metadata buffer.
constexpr ChunkID chunk_id() const
The ID of the chunk.
constexpr size_t data_size(size_t i) const
Get the size of the packed data of the i-th message.
constexpr size_t concat_data_size() const
Get the size of the concatenated data.
constexpr uint32_t metadata_size(size_t i) const
Get the size of the metadata of the i-th message.
std::string str() const
Returns a description of this chunk.
std::unique_ptr< Buffer > release_data_buffer()
Release the ownership of the data buffer.
Represents a message indicating readiness to receive data for a specific chunk.
ChunkID cid
Chunk ID associated with the message.
static ReadyForDataMessage unpack(std::unique_ptr< std::vector< uint8_t >> const &msg)
Deserializes a message from a byte array.
std::unique_ptr< std::vector< uint8_t > > pack()
Serializes the message into a byte array.
std::string str() const
Returns a description of this instance.
static constexpr size_t byte_size
The size of the message in bytes when serialized.
std::ostream & operator<<(std::ostream &os, Chunk const &obj)
Overloads the stream insertion operator for the Chunk class.
std::uint64_t ChunkID
The globally unique ID of a chunk.
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Bag of bytes with metadata suitable for sending over the wire.