resource.hpp
1 
6 #pragma once
7 
8 #include <array>
9 #include <memory>
10 #include <mutex>
11 #include <optional>
12 #include <ranges>
13 #include <unordered_map>
14 #include <utility>
15 
16 #include <rmm/cuda_stream_pool.hpp>
17 
18 #include <rapidsmpf/buffer/buffer.hpp>
19 #include <rapidsmpf/buffer/spill_manager.hpp>
20 #include <rapidsmpf/error.hpp>
21 #include <rapidsmpf/rmm_resource_adaptor.hpp>
22 #include <rapidsmpf/statistics.hpp>
23 #include <rapidsmpf/utils.hpp>
24 
25 namespace rapidsmpf {
26 
34  friend class BufferResource;
35 
36  public:
/// @brief Destructor for the memory reservation.
42  ~MemoryReservation() noexcept;
43 
/// @brief Clear the remaining size of the reservation.
47  void clear() noexcept;
48 
56  o.mem_type_, std::exchange(o.br_, nullptr), std::exchange(o.size_, 0)
57  } {}
58 
66  clear();
67  mem_type_ = o.mem_type_;
68  br_ = std::exchange(o.br_, nullptr);
69  size_ = std::exchange(o.size_, 0);
70  return *this;
71  }
72 
76 
82  [[nodiscard]] constexpr std::size_t size() const noexcept {
83  return size_;
84  }
85 
91  [[nodiscard]] constexpr MemoryType mem_type() const noexcept {
92  return mem_type_;
93  }
94 
100  [[nodiscard]] constexpr BufferResource* br() const noexcept {
101  return br_;
102  }
103 
104  private:
114  constexpr MemoryReservation(MemoryType mem_type, BufferResource* br, std::size_t size)
115  : mem_type_{mem_type}, br_{br}, size_{size} {}
116 
117  private:
118  MemoryType mem_type_;
119  BufferResource* br_;
120  std::size_t size_;
121 };
122 
134  public:
/// @brief Callback function to determine available memory.
///
/// The callable takes no arguments and returns a signed 64-bit value.
145  using MemoryAvailable = std::function<std::int64_t()>;
146 
165  std::unordered_map<MemoryType, MemoryAvailable> memory_available = {},
166  std::optional<Duration> periodic_spill_check = std::chrono::milliseconds{1},
167  std::shared_ptr<rmm::cuda_stream_pool> stream_pool = std::make_shared<
169  std::shared_ptr<Statistics> statistics = Statistics::disabled()
170  );
171 
/// @brief Defaulted, non-throwing destructor.
172  ~BufferResource() noexcept = default;
173 
179  [[nodiscard]] rmm::device_async_resource_ref device_mr() const noexcept {
180  return device_mr_;
181  }
182 
192  [[nodiscard]] MemoryAvailable const& memory_available(MemoryType mem_type) const {
193  return memory_available_.at(mem_type);
194  }
195 
202  [[nodiscard]] std::size_t memory_reserved(MemoryType mem_type) const {
203  return memory_reserved_[static_cast<std::size_t>(mem_type)];
204  }
205 
212  [[nodiscard]] std::size_t& memory_reserved(MemoryType mem_type) {
213  return memory_reserved_[static_cast<std::size_t>(mem_type)];
214  }
215 
/// @brief Reserve an amount of the specified memory type.
///
/// @param mem_type The memory type to reserve from.
/// @param size The amount to reserve.
/// @param allow_overbooking Whether overbooking is permitted.
/// @return A pair of the reservation and a `std::size_t`.
/// NOTE(review): the meaning of the second element is not visible in this
/// header listing — confirm against the implementation.
235  std::pair<MemoryReservation, std::size_t> reserve(
236  MemoryType mem_type, size_t size, bool allow_overbooking
237  );
238 
252  MemoryType mem_type, size_t size, bool allow_overbooking
253  );
254 
265  template <std::ranges::input_range Range>
266  requires std::convertible_to<std::ranges::range_value_t<Range>, MemoryType>
267  [[nodiscard]] MemoryReservation reserve_or_fail(size_t size, Range mem_types) {
268  // try to reserve memory from the given order
269  for (auto const& mem_type : mem_types) {
270  auto [res, _] = reserve(mem_type, size, false);
271  if (res.size() == size) {
272  return std::move(res);
273  }
274  }
275  RAPIDSMPF_FAIL("failed to reserve memory", std::runtime_error);
276  }
277 
286  [[nodiscard]] MemoryReservation reserve_or_fail(size_t size, MemoryType mem_type) {
287  return reserve_or_fail(size, std::ranges::single_view{mem_type});
288  }
289 
/// @brief Consume a portion of the reserved memory.
///
/// @param reservation The reservation to consume from.
/// @param size The amount to consume.
/// @return A `std::size_t`. NOTE(review): its exact meaning is not visible in
/// this header listing — confirm against the implementation.
302  std::size_t release(MemoryReservation& reservation, std::size_t size);
303 
/// @brief Allocate a buffer of the specified memory type by the reservation.
///
/// @param size The size of the buffer to allocate.
/// @param stream The CUDA stream passed to the allocation.
/// @param reservation The reservation the allocation is made against.
/// @return The newly allocated buffer.
315  std::unique_ptr<Buffer> allocate(
316  std::size_t size, rmm::cuda_stream_view stream, MemoryReservation& reservation
317  );
318 
/// @brief Allocate a buffer consuming the entire reservation.
///
/// @param stream The CUDA stream passed to the allocation.
/// @param reservation The reservation to consume (taken by rvalue reference).
/// @return The newly allocated buffer.
329  std::unique_ptr<Buffer> allocate(
330  rmm::cuda_stream_view stream, MemoryReservation&& reservation
331  );
332 
/// @brief Move device buffer data into a Buffer.
///
/// @param data The device buffer whose ownership is passed in.
/// @param stream The CUDA stream associated with the operation.
/// @return The resulting Buffer.
348  std::unique_ptr<Buffer> move(
349  std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
350  );
351 
/// @brief Move a Buffer to the memory type specified by the reservation.
///
/// @param buffer The buffer whose ownership is passed in.
/// @param reservation The reservation that specifies the target memory type.
/// @return The resulting Buffer.
364  std::unique_ptr<Buffer> move(
365  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
366  );
367 
/// @brief Move a Buffer to a device buffer.
///
/// @param buffer The buffer whose ownership is passed in.
/// @param reservation The reservation used for the operation.
/// @return The resulting device buffer.
381  std::unique_ptr<rmm::device_buffer> move_to_device_buffer(
382  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
383  );
384 
/// @brief Move a Buffer into a host vector.
///
/// @param buffer The buffer whose ownership is passed in.
/// @param reservation The reservation used for the operation.
/// @return The resulting host vector.
398  std::unique_ptr<std::vector<uint8_t>> move_to_host_vector(
399  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
400  );
401 
410 
417 
/// @brief Gets a shared pointer to the statistics associated with this buffer resource.
///
/// @return Shared pointer to the statistics object.
424  std::shared_ptr<Statistics> statistics();
425 
426  private:
427  std::mutex mutex_;
429  std::unordered_map<MemoryType, MemoryAvailable> memory_available_;
430  // Zero initialized reserved counters.
431  std::array<std::size_t, MEMORY_TYPES.size()> memory_reserved_ = {};
432  std::shared_ptr<rmm::cuda_stream_pool> stream_pool_;
433  SpillManager spill_manager_;
434  std::shared_ptr<Statistics> statistics_;
435 };
436 
453  public:
462  constexpr LimitAvailableMemory(RmmResourceAdaptor const* mr, std::int64_t limit)
463  : limit{limit}, mr_{mr} {}
464 
474  std::int64_t operator()() const {
475  return limit - static_cast<std::int64_t>(mr_->current_allocated());
476  }
477 
478  public:
479  std::int64_t const limit;
480 
481  private:
482  RmmResourceAdaptor const* mr_;
483 };
484 
485 } // namespace rapidsmpf
Class managing buffer resources.
Definition: resource.hpp:133
std::size_t release(MemoryReservation &reservation, std::size_t size)
Consume a portion of the reserved memory.
std::unique_ptr< Buffer > allocate(std::size_t size, rmm::cuda_stream_view stream, MemoryReservation &reservation)
Allocate a buffer of the specified memory type by the reservation.
std::shared_ptr< Statistics > statistics()
Gets a shared pointer to the statistics associated with this buffer resource.
BufferResource(rmm::device_async_resource_ref device_mr, std::unordered_map< MemoryType, MemoryAvailable > memory_available={}, std::optional< Duration > periodic_spill_check=std::chrono::milliseconds{1}, std::shared_ptr< rmm::cuda_stream_pool > stream_pool=std::make_shared< rmm::cuda_stream_pool >(16, rmm::cuda_stream::flags::non_blocking), std::shared_ptr< Statistics > statistics=Statistics::disabled())
Constructs a buffer resource.
std::pair< MemoryReservation, std::size_t > reserve(MemoryType mem_type, size_t size, bool allow_overbooking)
Reserve an amount of the specified memory type.
MemoryReservation reserve_and_spill(MemoryType mem_type, size_t size, bool allow_overbooking)
Reserve memory and spill if necessary.
rmm::device_async_resource_ref device_mr() const noexcept
Get the RMM device memory resource.
Definition: resource.hpp:179
std::unique_ptr< Buffer > allocate(rmm::cuda_stream_view stream, MemoryReservation &&reservation)
Allocate a buffer consuming the entire reservation.
std::unique_ptr< Buffer > move(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer to the memory type specified by the reservation.
requires std::convertible_to< std::ranges::range_value_t< Range >, MemoryType > MemoryReservation reserve_or_fail(size_t size, Range mem_types)
Make a memory reservation or fail based on the given order of memory types.
Definition: resource.hpp:267
std::size_t memory_reserved(MemoryType mem_type) const
Get the current reserved memory of the specified memory type.
Definition: resource.hpp:202
std::unique_ptr< rmm::device_buffer > move_to_device_buffer(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer to a device buffer.
MemoryAvailable const & memory_available(MemoryType mem_type) const
Retrieves the memory availability function for a given memory type.
Definition: resource.hpp:192
MemoryReservation reserve_or_fail(size_t size, MemoryType mem_type)
Make a memory reservation or fail.
Definition: resource.hpp:286
std::function< std::int64_t()> MemoryAvailable
Callback function to determine available memory.
Definition: resource.hpp:145
SpillManager & spill_manager()
Gets a reference to the spill manager used.
std::size_t & memory_reserved(MemoryType mem_type)
Get a reference to the current reserved memory of the specified memory type.
Definition: resource.hpp:212
rmm::cuda_stream_pool const & stream_pool() const
Returns the CUDA stream pool used by this buffer resource.
std::unique_ptr< std::vector< uint8_t > > move_to_host_vector(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer into a host vector.
std::unique_ptr< Buffer > move(std::unique_ptr< rmm::device_buffer > data, rmm::cuda_stream_view stream)
Move device buffer data into a Buffer.
A functor for querying the remaining available memory within a defined limit from an RMM statistics resource.
Definition: resource.hpp:452
constexpr LimitAvailableMemory(RmmResourceAdaptor const *mr, std::int64_t limit)
Constructs a LimitAvailableMemory instance.
Definition: resource.hpp:462
std::int64_t operator()() const
Returns the remaining available memory within the defined limit.
Definition: resource.hpp:474
std::int64_t const limit
The memory limit.
Definition: resource.hpp:479
Represents a reservation for future memory allocation.
Definition: resource.hpp:33
constexpr BufferResource * br() const noexcept
Get the buffer resource associated with this reservation.
Definition: resource.hpp:100
~MemoryReservation() noexcept
Destructor for the memory reservation.
constexpr std::size_t size() const noexcept
Get the remaining size of the reserved memory.
Definition: resource.hpp:82
MemoryReservation & operator=(MemoryReservation &&o) noexcept
Move assignment operator for MemoryReservation.
Definition: resource.hpp:65
MemoryReservation(MemoryReservation &&o)
Move constructor for MemoryReservation.
Definition: resource.hpp:54
void clear() noexcept
Clear the remaining size of the reservation.
constexpr MemoryType mem_type() const noexcept
Get the type of memory associated with this reservation.
Definition: resource.hpp:91
MemoryReservation(MemoryReservation const &)=delete
A memory reservation is not copyable.
An RMM memory resource adaptor tailored to RapidsMPF.
std::int64_t current_allocated() const noexcept
Get the total current allocated memory from both primary and fallback.
Manages memory spilling to free up device memory when needed.
static std::shared_ptr< Statistics > disabled()
Returns a shared pointer to a disabled (no-op) Statistics instance.
detail::cccl_async_resource_ref< cuda::mr::resource_ref< cuda::mr::device_accessible > > device_async_resource_ref