utils.hpp
1 
5 #pragma once
6 
7 #include <cstdint>
8 #include <limits>
9 #include <memory>
10 #include <vector>
11 
12 #include <rapidsmpf/communicator/communicator.hpp>
13 #include <rapidsmpf/memory/buffer.hpp>
14 #include <rapidsmpf/memory/buffer_resource.hpp>
15 #include <rapidsmpf/memory/packed_data.hpp>
16 
17 namespace rapidsmpf::coll::detail {
18 
20 using ChunkID = std::uint64_t;
21 
31 class Chunk {
32  private:
33  ChunkID id_;
34  Rank destination_;
35  std::unique_ptr<std::vector<std::uint8_t>> metadata_;
36  std::unique_ptr<Buffer> data_;
37  std::uint64_t
38  data_size_;
40 
57  Chunk(
58  ChunkID id,
60  std::unique_ptr<std::vector<std::uint8_t>> metadata,
61  std::unique_ptr<Buffer> data
62  );
63 
74 
75  public:
79  static constexpr Rank INVALID_RANK = std::numeric_limits<Rank>::max();
80 
89  [[nodiscard]] bool is_ready() const noexcept;
90 
97  [[nodiscard]] MemoryType memory_type() const noexcept;
98 
104  [[nodiscard]] bool is_finish() const noexcept;
105 
111  [[nodiscard]] ChunkID id() const noexcept;
112 
118  [[nodiscard]] ChunkID sequence() const noexcept;
119 
125  [[nodiscard]] Rank origin() const noexcept;
126 
135  [[nodiscard]] Rank destination() const noexcept;
136 
142  [[nodiscard]] std::uint64_t data_size() const noexcept;
143 
149  [[nodiscard]] std::uint64_t metadata_size() const noexcept;
150 
160  [[nodiscard]] static std::unique_ptr<Chunk> from_packed_data(
161  std::uint64_t sequence, Rank origin, Rank destination, PackedData&& packed_data
162  );
163 
173  [[nodiscard]] static std::unique_ptr<Chunk> from_empty(
174  std::uint64_t num_local_insertions, Rank origin, Rank destination
175  );
176 
187  [[nodiscard]] PackedData release();
188 
190  static constexpr std::uint64_t ID_BITS = 38;
192  static constexpr std::uint64_t RANK_BITS =
193  sizeof(ChunkID) * std::numeric_limits<unsigned char>::digits - ID_BITS;
194 
203  static constexpr ChunkID chunk_id(std::uint64_t sequence, Rank origin);
204 
210  [[nodiscard]] std::unique_ptr<std::vector<std::uint8_t>> serialize() const;
211 
222  [[nodiscard]] static std::unique_ptr<Chunk> deserialize(
223  std::vector<std::uint8_t>& data, BufferResource* br
224  );
225 
231  [[nodiscard]] std::unique_ptr<Buffer> release_data_buffer() noexcept;
232 
241  void attach_data_buffer(std::unique_ptr<Buffer> data);
242 
244  ~Chunk() = default;
246  Chunk(Chunk&&) = default;
249  Chunk& operator=(Chunk&&) = default;
251  Chunk(Chunk const&) = delete;
253  Chunk& operator=(Chunk const&) = delete;
254 };
255 
262 class PostBox {
263  public:
265  PostBox() = default;
267  ~PostBox() = default;
269  PostBox(PostBox const&) = delete;
271  PostBox& operator=(PostBox const&) = delete;
273  PostBox(PostBox&&) = delete;
275  PostBox& operator=(PostBox&&) = delete;
276 
282  void insert(std::unique_ptr<Chunk> chunk);
283 
289  void insert(std::vector<std::unique_ptr<Chunk>>&& chunks);
290 
299  [[nodiscard]] std::vector<std::unique_ptr<Chunk>> extract_ready();
300 
309  [[nodiscard]] std::vector<std::unique_ptr<Chunk>> extract();
310 
316  [[nodiscard]] std::size_t size() const noexcept;
317 
323  [[nodiscard]] bool empty() const noexcept;
324 
337  [[nodiscard]] std::size_t spill(BufferResource* br, std::size_t amount);
338 
339  private:
340  mutable std::mutex mutex_{};
341  std::vector<std::unique_ptr<Chunk>> chunks_{};
342 };
343 
356 [[nodiscard]] std::vector<std::unique_ptr<Chunk>> test_some(
357  std::vector<std::unique_ptr<Chunk>>& chunks,
358  std::vector<std::unique_ptr<Communicator::Future>>& futures,
359  Communicator* comm
360 );
361 
362 } // namespace rapidsmpf::coll::detail
Class managing buffer resources.
Buffer representing device or host memory.
Definition: buffer.hpp:47
Abstract base class for a communication mechanism between nodes.
Represents a data chunk in the allgather operation.
Definition: utils.hpp:31
Rank destination() const noexcept
The local destination rank associated with this chunk.
bool is_ready() const noexcept
Check if the chunk is ready for processing.
static std::unique_ptr< Chunk > deserialize(std::vector< std::uint8_t > &data, BufferResource *br)
Deserialize a chunk from a byte vector of its metadata.
static std::unique_ptr< Chunk > from_packed_data(std::uint64_t sequence, Rank origin, Rank destination, PackedData &&packed_data)
Create a data chunk from packed data.
static std::unique_ptr< Chunk > from_empty(std::uint64_t num_local_insertions, Rank origin, Rank destination)
Create an empty finish marker chunk.
std::unique_ptr< Buffer > release_data_buffer() noexcept
Release and return the data buffer.
void attach_data_buffer(std::unique_ptr< Buffer > data)
Attach a data buffer to this chunk.
Rank origin() const noexcept
The origin rank of the chunk.
static constexpr std::uint64_t RANK_BITS
Number of bits used for the rank in the chunk identifier.
Definition: utils.hpp:192
std::uint64_t data_size() const noexcept
The size of the data buffer in bytes.
std::uint64_t metadata_size() const noexcept
The size of the metadata buffer in bytes.
static constexpr std::uint64_t ID_BITS
Number of bits used for the sequence ID in the chunk identifier.
Definition: utils.hpp:190
std::unique_ptr< std::vector< std::uint8_t > > serialize() const
Serialize the metadata of the chunk to a byte vector.
static constexpr Rank INVALID_RANK
Sentinel destination for chunks with unknown destination.
Definition: utils.hpp:79
ChunkID sequence() const noexcept
The sequence number of the chunk.
bool is_finish() const noexcept
Check if this is a finish marker chunk.
PackedData release()
Release the chunk's data as PackedData.
MemoryType memory_type() const noexcept
Return the memory type of the chunk.
static constexpr ChunkID chunk_id(std::uint64_t sequence, Rank origin)
Create a ChunkID from a sequence number and origin rank.
A thread-safe container for managing chunks in collectives.
Definition: utils.hpp:262
void insert(std::unique_ptr< Chunk > chunk)
Insert a single chunk into the postbox.
std::vector< std::unique_ptr< Chunk > > extract()
Extract all chunks from the postbox.
PostBox & operator=(PostBox &&)=delete
Deleted move assignment operator.
std::vector< std::unique_ptr< Chunk > > extract_ready()
Extract ready chunks from the postbox.
std::size_t size() const noexcept
Check the number of chunks currently stored.
~PostBox()=default
Default destructor.
void insert(std::vector< std::unique_ptr< Chunk >> &&chunks)
Insert multiple chunks into the postbox.
PostBox()=default
Default constructor.
PostBox(PostBox &&)=delete
Deleted move constructor.
PostBox & operator=(PostBox const &)=delete
Deleted copy assignment operator.
PostBox(PostBox const &)=delete
Deleted copy constructor.
std::vector< std::unique_ptr< Chunk > > test_some(std::vector< std::unique_ptr< Chunk >> &chunks, std::vector< std::unique_ptr< Communicator::Future >> &futures, Communicator *comm)
Complete posted chunk receives.
std::uint64_t ChunkID
Type alias for chunk identifiers.
Definition: utils.hpp:20
std::int32_t Rank
The rank of a node (e.g. the rank of a MPI process), or world size (total number of ranks).
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16
Bag of bytes with metadata suitable for sending over the wire.
Definition: packed_data.hpp:26