spillable_messages.hpp
1 
6 #pragma once
7 
8 #include <cstdint>
9 #include <map>
10 #include <mutex>
11 #include <optional>
12 #include <unordered_map>
13 
14 #include <rapidsmpf/buffer/content_description.hpp>
15 #include <rapidsmpf/streaming/core/message.hpp>
16 
17 namespace rapidsmpf::streaming {
18 
39  public:
41  using MessageId = std::uint64_t;
42 
43  SpillableMessages() = default;
44  SpillableMessages(SpillableMessages const&) = delete;
45  SpillableMessages& operator=(SpillableMessages const&) = delete;
46  SpillableMessages(SpillableMessages&&) noexcept = delete;
47  SpillableMessages& operator=(SpillableMessages&&) noexcept = delete;
48 
55  [[nodiscard]] MessageId insert(Message&& message);
56 
68  [[nodiscard]] Message extract(MessageId mid);
69 
86  [[nodiscard]] std::size_t spill(MessageId mid, BufferResource* br) const;
87 
102 
103  private:
117  struct Item {
118  mutable std::mutex mutex;
119  std::optional<Message> message;
120 
121  Item() noexcept = default;
122 
123  Item(Message&& message) : message(std::move(message)) {}
124  };
125 
126  // Never lock the global mutex and an item's mutex at the same time!
127  mutable std::mutex global_mutex_;
128  MessageId counter_{0};
129  std::unordered_map<MessageId, std::shared_ptr<Item>> items_;
130  mutable std::map<MessageId, ContentDescription> content_descriptions_;
131 };
132 
133 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Definition: resource.hpp:133
Description of an object's content.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
Container for individually spillable messages.
MessageId insert(Message &&message)
Insert a new message and return its assigned ID.
Message extract(MessageId mid)
Extract and remove a message by ID.
std::size_t spill(MessageId mid, BufferResource *br) const
Spill a message's device memory to host memory.
std::map< MessageId, ContentDescription > get_content_descriptions() const
Get a snapshot of current messages' content descriptions.
std::uint64_t MessageId
Unique identifier assigned to each message.