memory_reserve_or_wait.hpp
1 
6 #pragma once
7 
8 #include <optional>
9 #include <set>
10 
11 #include <coro/task.hpp>
12 
13 #include <rapidsmpf/config.hpp>
14 #include <rapidsmpf/memory/buffer_resource.hpp>
15 #include <rapidsmpf/streaming/core/actor.hpp>
16 #include <rapidsmpf/streaming/core/coro_executor.hpp>
17 #include <rapidsmpf/streaming/core/coro_utils.hpp>
18 #include <rapidsmpf/utils/misc.hpp>
19 
20 namespace rapidsmpf::streaming {
21 
22 // Forward declaration
23 class Context;
24 
34  public:
/// Sentinel indicating that net_memory_delta estimation has not yet been
/// implemented. The sentinel value is 0.
46  static constexpr std::int64_t missing_net_memory_delta = 0;
47 
63  config::Options options,
64  MemoryType mem_type,
65  std::shared_ptr<CoroThreadPoolExecutor> executor,
66  std::shared_ptr<BufferResource> br
67  );
68 
/// Destructor; declared noexcept.
/// NOTE(review): whether it cancels or drains pending requests is defined in
/// the implementation file, not visible here — confirm before relying on it.
69  ~MemoryReserveOrWait() noexcept;
70 
78 
123  std::size_t size, std::int64_t net_memory_delta
124  );
125 
/// Variant of reserve_or_wait() that allows overbooking on timeout.
///
/// @param size Requested reservation size (presumably in bytes — TODO confirm).
/// @param net_memory_delta Signed memory-delta estimate for the request; see
/// missing_net_memory_delta for the "not yet estimated" sentinel.
/// @return Coroutine yielding a pair of the reservation and a std::size_t
/// (presumably the overbooked amount — TODO confirm against the implementation).
150  coro::task<std::pair<MemoryReservation, std::size_t>> reserve_or_wait_or_overbook(
151  std::size_t size, std::int64_t net_memory_delta
152  );
153 
177  std::size_t size, std::int64_t net_memory_delta
178  );
179 
/// Returns the number of pending memory reservation requests.
187  [[nodiscard]] std::size_t size() const noexcept;
188 
/// Returns the number of iterations performed by periodic_memory_check().
197  [[nodiscard]] std::size_t periodic_memory_check_counter() const noexcept;
198 
/// Get the coroutine executor used by this instance.
/// Returns a const reference to the stored shared pointer (no copy is made).
204  [[nodiscard]] std::shared_ptr<CoroThreadPoolExecutor> const&
205  executor() const noexcept;
206 
/// Get the buffer resource used for memory reservations.
/// Returns a const reference to the stored shared pointer (no copy is made).
212  [[nodiscard]] std::shared_ptr<BufferResource> const& br() const noexcept;
213 
/// Get the configured progress timeout.
/// Duration is a std::chrono::duration<double>, i.e. seconds as a double.
219  [[nodiscard]] Duration timeout() const noexcept;
220 
221  private:
/// A single pending reservation request.
///
/// Requests are ordered by (size, sequence_number); see operator< below.
228  struct Request {
/// Requested reservation size (presumably in bytes — TODO confirm).
230  std::size_t size;
231 
/// Signed memory-delta estimate supplied with the request. Not part of the
/// ordering.
234  std::int64_t net_memory_delta;
235 
/// Tie-breaker for requests of equal size (presumably drawn from the owning
/// object's sequence counter — TODO confirm).
237  std::uint64_t sequence_number;
238 
/// Queue associated with this request, held by reference. The referenced
/// queue must outlive the Request. Presumably the channel through which the
/// reservation is handed back to the waiter — TODO confirm.
240  coro::queue<MemoryReservation>& queue;
241 
/// Strict weak ordering: lexicographic comparison of (size, sequence_number).
/// net_memory_delta and queue are ignored, so two requests with the same size
/// and sequence number compare equivalent.
243  friend bool operator<(Request const& a, Request const& b) {
244  return std::tie(a.size, a.sequence_number)
245  < std::tie(b.size, b.sequence_number);
246  }
247  };
248 
/// Coroutine performing the periodic memory check. The number of iterations
/// it has performed is reported by periodic_memory_check_counter().
/// NOTE(review): what each iteration does (e.g. servicing pending requests)
/// is defined in the implementation file — confirm there.
279  coro::task<void> periodic_memory_check();
280 
/// Mutex; mutable so it can be locked from const member functions.
/// Presumably guards reservation_requests_ and sequence_counter — TODO confirm.
281  mutable std::mutex mutex_;
/// Starts at 0; presumably the source of Request::sequence_number.
/// NOTE(review): lacks the trailing underscore used by every other member.
282  std::uint64_t sequence_counter{0};
/// Memory type passed at construction; immutable afterwards.
283  MemoryType const mem_type_;
/// Coroutine executor used by this instance (see executor()).
284  std::shared_ptr<CoroThreadPoolExecutor> executor_;
/// Buffer resource used for memory reservations (see br()).
285  std::shared_ptr<BufferResource> br_;
/// Configured progress timeout (see timeout()); immutable after construction.
286  Duration const timeout_;
/// Pending requests, kept sorted by Request::operator< (size, then
/// sequence_number).
287  std::set<Request> reservation_requests_;
/// Atomic counter, starts at 0; presumably the value returned by
/// periodic_memory_check_counter() — TODO confirm.
288  std::atomic<std::uint64_t> periodic_memory_check_counter_{0};
/// Optional handle to a coro::task<void>; presumably the running
/// periodic_memory_check() task, empty while none is active — TODO confirm.
289  std::optional<coro::task<void>> periodic_memory_check_task_;
290 };
291 
/// Reserve memory using the context memory reservation mechanism.
///
/// @param ctx Streaming context providing the reservation mechanism.
/// @param size Requested reservation size (presumably in bytes — TODO confirm).
/// @param net_memory_delta Signed memory-delta estimate for the request.
/// @param mem_type Memory type to reserve; defaults to MemoryType::DEVICE.
/// @param allow_overbooking Optional overbooking policy; defaults to
/// std::nullopt (presumably meaning "use the configured default" — TODO confirm).
/// @return Coroutine yielding the resulting MemoryReservation.
353 coro::task<MemoryReservation> reserve_memory(
354  std::shared_ptr<Context> ctx,
355  std::size_t size,
356  std::int64_t net_memory_delta,
357  MemoryType mem_type = MemoryType::DEVICE,
358  std::optional<AllowOverbooking> allow_overbooking = std::nullopt
359 );
360 
361 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Represents a reservation for future memory allocation.
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
Executor wrapper around a coro::thread_pool used for coroutine execution.
Asynchronous coordinator for memory reservation requests.
Actor shutdown()
Shuts down all pending memory reservation requests.
coro::task< MemoryReservation > reserve_or_wait_or_fail(std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that fails if no progress is possible.
coro::task< MemoryReservation > reserve_or_wait(std::size_t size, std::int64_t net_memory_delta)
Attempts to reserve memory or waits until progress can be made.
std::size_t size() const noexcept
Returns the number of pending memory reservation requests.
MemoryReserveOrWait(config::Options options, MemoryType mem_type, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Constructs a MemoryReserveOrWait instance.
std::shared_ptr< CoroThreadPoolExecutor > const & executor() const noexcept
Get the coroutine executor used by this instance.
std::shared_ptr< BufferResource > const & br() const noexcept
Get the buffer resource used for memory reservations.
coro::task< std::pair< MemoryReservation, std::size_t > > reserve_or_wait_or_overbook(std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that allows overbooking on timeout.
static constexpr std::int64_t missing_net_memory_delta
Sentinel indicating that net_memory_delta estimation has not yet been implemented.
Duration timeout() const noexcept
Get the configured progress timeout.
std::size_t periodic_memory_check_counter() const noexcept
Returns the number of iterations performed by periodic_memory_check().
coro::task< void > Actor
Alias for an actor in a streaming graph.
Definition: actor.hpp:18
coro::task< MemoryReservation > reserve_memory(std::shared_ptr< Context > ctx, std::size_t size, std::int64_t net_memory_delta, MemoryType mem_type=MemoryType::DEVICE, std::optional< AllowOverbooking > allow_overbooking=std::nullopt)
Reserve memory using the context memory reservation mechanism.
std::chrono::duration< double > Duration
Alias for a duration type representing time in seconds as a double.
Definition: misc.hpp:33
MemoryType
Enum representing the type of memory, sorted in decreasing order of preference.
Definition: memory_type.hpp:16
@ DEVICE
Device memory.