12 #include <rapidsmpf/communicator/communicator.hpp>
13 #include <rapidsmpf/memory/buffer.hpp>
14 #include <rapidsmpf/memory/buffer_resource.hpp>
15 #include <rapidsmpf/memory/packed_data.hpp>
35 std::unique_ptr<std::vector<std::uint8_t>> metadata_;
36 std::unique_ptr<Buffer> data_;
60 std::unique_ptr<std::vector<std::uint8_t>> metadata,
61 std::unique_ptr<Buffer> data
193 sizeof(
ChunkID) * std::numeric_limits<
unsigned char>::digits -
ID_BITS;
210 [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>>
serialize() const;
282 void insert(std::unique_ptr<Chunk> chunk);
289 void insert(std::vector<std::unique_ptr<Chunk>>&& chunks);
309 [[nodiscard]] std::vector<std::unique_ptr<Chunk>>
extract();
316 [[nodiscard]] std::size_t
size() const noexcept;
323 [[nodiscard]]
bool empty() const noexcept;
340 mutable std::mutex mutex_{};
341 std::vector<std::unique_ptr<Chunk>> chunks_{};
356 [[nodiscard]] std::vector<std::unique_ptr<Chunk>>
test_some(
357 std::vector<std::unique_ptr<Chunk>>& chunks,
358 std::vector<std::unique_ptr<Communicator::Future>>& futures,
Class managing buffer resources.
Buffer representing device or host memory.
Abstract base class for a communication mechanism between nodes.
Represents a data chunk in the allgather operation.
Rank destination() const noexcept
The local destination rank associated with this chunk.
bool is_ready() const noexcept
Check if the chunk is ready for processing.
static std::unique_ptr< Chunk > deserialize(std::vector< std::uint8_t > &data, BufferResource *br)
Deserialize a chunk from a byte vector of its metadata.
static std::unique_ptr< Chunk > from_packed_data(std::uint64_t sequence, Rank origin, Rank destination, PackedData &&packed_data)
Create a data chunk from packed data.
static std::unique_ptr< Chunk > from_empty(std::uint64_t num_local_insertions, Rank origin, Rank destination)
Create an empty finish marker chunk.
std::unique_ptr< Buffer > release_data_buffer() noexcept
Release and return the data buffer.
void attach_data_buffer(std::unique_ptr< Buffer > data)
Attach a data buffer to this chunk.
Rank origin() const noexcept
The origin rank of the chunk.
static constexpr std::uint64_t RANK_BITS
Number of bits used for the rank in the chunk identifier.
std::uint64_t data_size() const noexcept
The size of the data buffer in bytes.
std::uint64_t metadata_size() const noexcept
The size of the metadata buffer in bytes.
static constexpr std::uint64_t ID_BITS
Number of bits used for the sequence ID in the chunk identifier.
std::unique_ptr< std::vector< std::uint8_t > > serialize() const
Serialize the metadata of the chunk to a byte vector.
static constexpr Rank INVALID_RANK
Sentinel destination for chunks with unknown destination.
ChunkID sequence() const noexcept
The sequence number of the chunk.
bool is_finish() const noexcept
Check if this is a finish marker chunk.
PackedData release()
Release the chunk's data as PackedData.
MemoryType memory_type() const noexcept
Return the memory type of the chunk.
static constexpr ChunkID chunk_id(std::uint64_t sequence, Rank origin)
Create a ChunkID from a sequence number and origin rank.
A thread-safe container for managing chunks in collectives.
void insert(std::unique_ptr< Chunk > chunk)
Insert a single chunk into the postbox.
std::vector< std::unique_ptr< Chunk > > extract()
Extract all chunks from the postbox.
PostBox & operator=(PostBox &&)=delete
Deleted move assignment operator.
std::vector< std::unique_ptr< Chunk > > extract_ready()
Extract ready chunks from the postbox.
std::size_t size() const noexcept
Return the number of chunks currently stored.
~PostBox()=default
Default destructor.
void insert(std::vector< std::unique_ptr< Chunk >> &&chunks)
Insert multiple chunks into the postbox.
PostBox()=default
Default constructor.
PostBox(PostBox &&)=delete
Deleted move constructor.
PostBox & operator=(PostBox const &)=delete
Deleted copy assignment operator.
PostBox(PostBox const &)=delete
Deleted copy constructor.
std::vector< std::unique_ptr< Chunk > > test_some(std::vector< std::unique_ptr< Chunk >> &chunks, std::vector< std::unique_ptr< Communicator::Future >> &futures, Communicator *comm)
Complete posted chunk receives.
std::uint64_t ChunkID
Type alias for chunk identifiers.
std::int32_t Rank
The rank of a node (e.g. the rank of an MPI process), or world size (total number of ranks).
MemoryType
Enum representing the type of memory, sorted in decreasing order of preference.
Bag of bytes with metadata suitable for sending over the wire.