spillable_messages.hpp
1 
6 #pragma once
7 
8 #include <cstdint>
9 #include <map>
10 #include <mutex>
11 #include <optional>
12 #include <unordered_map>
13 
14 #include <rapidsmpf/memory/buffer_resource.hpp>
15 #include <rapidsmpf/memory/content_description.hpp>
16 #include <rapidsmpf/streaming/core/message.hpp>
17 
18 namespace rapidsmpf::streaming {
19 
40  public:
42  using MessageId = std::uint64_t;
43 
44  SpillableMessages() = default;
45  SpillableMessages(SpillableMessages const&) = delete;
46  SpillableMessages& operator=(SpillableMessages const&) = delete;
47  SpillableMessages(SpillableMessages&&) noexcept = delete;
48  SpillableMessages& operator=(SpillableMessages&&) noexcept = delete;
49 
56  [[nodiscard]] MessageId insert(Message&& message);
57 
69  [[nodiscard]] Message extract(MessageId mid);
70 
91  [[nodiscard]] Message copy(MessageId mid, MemoryReservation& reservation);
92 
109  [[nodiscard]] std::size_t spill(MessageId mid, BufferResource* br) const;
110 
125 
135 
136  private:
150  struct Item {
151  mutable std::mutex mutex;
152  std::optional<Message> message;
153 
154  Item() noexcept = default;
155 
156  Item(Message&& message) : message(std::move(message)) {}
157  };
158 
159  // Never lock the global mutex and an item's mutex at the same time!
160  mutable std::mutex global_mutex_;
161  MessageId counter_{0};
162  std::unordered_map<MessageId, std::shared_ptr<Item>> items_;
163  mutable std::map<MessageId, ContentDescription> content_descriptions_;
164 };
165 
166 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Description of an object's content.
Represents a reservation for future memory allocation.
Type-erased message wrapper around a payload.
Definition: message.hpp:27
Container for individually spillable messages.
MessageId insert(Message &&message)
Insert a new message and return its assigned ID.
Message extract(MessageId mid)
Extract and remove a message by ID.
Message copy(MessageId mid, MemoryReservation &reservation)
Create a deep copy of a message without removing it.
std::size_t spill(MessageId mid, BufferResource *br) const
Spill a message's device memory to host memory.
std::map< MessageId, ContentDescription > get_content_descriptions() const
Get a snapshot of current messages' content descriptions.
std::uint64_t MessageId
Unique identifier assigned to each message.
ContentDescription get_content_description(MessageId mid) const
Get the content description of a message by ID.