postbox.hpp
1 
5 #pragma once
6 
7 #include <mutex>
8 #include <string>
9 #include <unordered_map>
10 #include <vector>
11 
12 #include <rapidsmpf/error.hpp>
13 #include <rapidsmpf/shuffler/chunk.hpp>
14 
16 
20 class ChunksToSend {
21  public:
22  ChunksToSend() = default;
23 
29  void insert(std::unique_ptr<Chunk> c);
30 
38  [[nodiscard]] std::vector<Chunk> extract_ready();
39 
43  [[nodiscard]] bool empty() const;
44 
48  [[nodiscard]] std::string str() const;
49 
50  private:
51  mutable std::mutex mutex_{};
52  std::vector<std::unique_ptr<Chunk>> chunks_{};
53 };
54 
64 inline std::ostream& operator<<(std::ostream& os, ChunksToSend const& obj) {
65  os << obj.str();
66  return os;
67 }
68 
73  public:
79  ReceivedChunks(std::size_t num_keys_hint = 0) {
80  if (num_keys_hint > 0) {
81  pigeonhole_.reserve(num_keys_hint);
82  }
83  }
84 
90  void insert(Chunk&& chunk);
91 
101  [[nodiscard]] bool is_empty(PartID pid) const;
102 
111  [[nodiscard]] std::vector<Chunk> extract(PartID pid);
112 
121  [[nodiscard]] bool empty() const;
122 
126  [[nodiscard]] std::string str() const;
127 
137  [[nodiscard]] std::size_t spill(BufferResource* br, std::size_t amount);
138 
139  private:
140  // TODO: more fine-grained locking e.g. by locking each partition individually.
141  mutable std::mutex mutex_;
142  std::unordered_map<PartID, std::vector<Chunk>>
143  pigeonhole_;
144 };
145 
155 inline std::ostream& operator<<(std::ostream& os, ReceivedChunks const& obj) {
156  os << obj.str();
157  return os;
158 }
159 
160 } // namespace rapidsmpf::shuffler::detail
Class managing buffer resources.
A partition chunk representing either a data message or a control message.
Definition: chunk.hpp:58
A thread-safe container for managing outgoing (to send) chunks.
Definition: postbox.hpp:20
std::vector< Chunk > extract_ready()
Extract ready chunks.
void insert(std::unique_ptr< Chunk > c)
Insert a chunk into the container.
A thread-safe container for managing received chunks stratified by partition ID.
Definition: postbox.hpp:72
ReceivedChunks(std::size_t num_keys_hint=0)
Construct a new container.
Definition: postbox.hpp:79
std::size_t spill(BufferResource *br, std::size_t amount)
Spill device data.
void insert(Chunk &&chunk)
Insert a chunk.
bool is_empty(PartID pid) const
Check whether the specified partition contains any chunks.
std::vector< Chunk > extract(PartID pid)
Extracts all chunks associated with a specific partition.
bool empty() const
Checks if the container is empty.
Shuffler private interfaces.
Definition: chunk.hpp:24
std::ostream & operator<<(std::ostream &os, Chunk const &obj)
Overloads the stream insertion operator for the Chunk class.
std::uint32_t PartID
Partition ID, which goes from 0 to the total number of partitions.
Definition: chunk.hpp:22