buffer_resource.hpp
1 
6 #pragma once
7 
8 #include <array>
9 #include <memory>
10 #include <mutex>
11 #include <optional>
12 #include <ranges>
13 #include <unordered_map>
14 #include <utility>
15 
16 #include <cuda/memory_resource>
17 
18 #include <rmm/cuda_stream_pool.hpp>
19 
20 #include <rapidsmpf/error.hpp>
21 #include <rapidsmpf/memory/buffer.hpp>
22 #include <rapidsmpf/memory/host_memory_resource.hpp>
23 #include <rapidsmpf/memory/memory_reservation.hpp>
24 #include <rapidsmpf/memory/pinned_memory_resource.hpp>
25 #include <rapidsmpf/memory/resource_types.hpp>
26 #include <rapidsmpf/memory/spill_manager.hpp>
27 #include <rapidsmpf/rmm_resource_adaptor.hpp>
28 #include <rapidsmpf/statistics.hpp>
29 #include <rapidsmpf/utils/misc.hpp>
30 
31 namespace rapidsmpf {
32 
/**
 * @brief Policy controlling whether a memory reservation is allowed to overbook.
 */
40 enum class AllowOverbooking : bool {
/// Overbooking is not allowed.
41  NO,
/// Overbooking is allowed.
42  YES,
43 };
44 
56  public:
/**
 * @brief Callback function to determine available memory.
 *
 * The callback takes no arguments and returns a signed 64-bit value.
 */
67  using MemoryAvailable = std::function<std::int64_t()>;
68 
91  cuda::mr::any_resource<cuda::mr::device_accessible> device_mr,
92  std::optional<PinnedMemoryResource> pinned_mr = PinnedMemoryResource::Disabled,
93  std::unordered_map<MemoryType, MemoryAvailable> memory_available = {},
94  std::optional<Duration> periodic_spill_check = std::chrono::milliseconds{1},
95  std::shared_ptr<rmm::cuda_stream_pool> stream_pool = std::make_shared<
97  std::shared_ptr<Statistics> statistics = Statistics::disabled()
98  );
99 
112  static std::shared_ptr<BufferResource> from_options(
114  );
115 
116  ~BufferResource() noexcept = default;
117 
/**
 * @brief Get the RMM device memory resource.
 *
 * @return A non-owning reference to the device memory resource.
 */
123  [[nodiscard]] rmm::device_async_resource_ref device_mr() const noexcept;
124 
/**
 * @brief Get the RMM host memory resource.
 *
 * @return A non-owning reference to the host memory resource.
 */
130  [[nodiscard]] rmm::host_async_resource_ref host_mr() noexcept;
131 
139 
/**
 * @brief Get the pinned host memory resource if available.
 *
 * @return The pinned host memory resource wrapped in an optional.
 * NOTE(review): presumably `std::nullopt` when pinned host memory is
 * disabled — confirm against the implementation.
 */
146  [[nodiscard]] std::optional<any_host_device_resource> try_pinned_mr() const noexcept;
147 
/**
 * @brief Retrieves the memory availability function for a given memory type.
 *
 * @param mem_type The memory type to look up.
 * @return Reference to the callback registered for `mem_type`.
 *
 * @throws std::out_of_range if no callback is stored for `mem_type`
 * (the lookup uses `std::unordered_map::at`).
 */
157  [[nodiscard]] MemoryAvailable const& memory_available(MemoryType mem_type) const {
158  return memory_available_.at(mem_type);
159  }
160 
/**
 * @brief Get the current reserved memory of the specified memory type.
 *
 * @param mem_type The memory type to query.
 * @return The counter stored in `memory_reserved_` for `mem_type`.
 *
 * @note The counter is read directly and this accessor does not itself
 * acquire `mutex_`. NOTE(review): confirm whether callers need external
 * synchronization.
 */
167  [[nodiscard]] std::size_t memory_reserved(MemoryType mem_type) const {
// The enumerator's underlying value is used as the array index (unchecked operator[]).
168  return memory_reserved_[static_cast<std::size_t>(mem_type)];
169  }
170 
/**
 * @brief Reserve an amount of the specified memory type.
 *
 * @param mem_type The memory type to reserve.
 * @param size The amount to reserve.
 * @param allow_overbooking Whether the reservation is allowed to overbook.
 * @return A pair of the reservation and a `std::size_t`.
 * NOTE(review): the meaning of the second element is not visible in this
 * header — presumably the amount of overbooking; confirm.
 */
193  std::pair<MemoryReservation, std::size_t> reserve(
194  MemoryType mem_type, std::size_t size, AllowOverbooking allow_overbooking
195  );
196 
215  std::size_t size, AllowOverbooking allow_overbooking
216  );
217 
/**
 * @brief Make a memory reservation or fail based on the given order of memory types.
 *
 * Walks `mem_types` in iteration order and returns the first reservation
 * whose size equals `size`. Every attempt is made with `AllowOverbooking::NO`.
 * `MemoryType::PINNED_HOST` entries are skipped when `pinned_mr_` equals
 * `PinnedMemoryResource::Disabled`.
 *
 * @tparam Range Input range whose value type is convertible to `MemoryType`.
 * @param size The requested reservation size.
 * @param mem_types The memory types to try, in order of preference.
 * @return The first reservation that covers the full requested size.
 *
 * @note If no memory type yields a full-size reservation (including when
 * `mem_types` is empty or contains only skipped entries), `RAPIDSMPF_FAIL`
 * is invoked with `std::runtime_error`. NOTE(review): presumably this throws
 * that exception type — confirm against the macro definition.
 */
231  template <std::ranges::input_range Range>
232  requires std::convertible_to<std::ranges::range_value_t<Range>, MemoryType>
233  [[nodiscard]] MemoryReservation reserve_or_fail(std::size_t size, Range mem_types) {
234  // try to reserve memory from the given order
235  for (auto const& mem_type : mem_types) {
236  if (mem_type == MemoryType::PINNED_HOST
237  && pinned_mr_ == PinnedMemoryResource::Disabled)
238  {
239  // Pinned host memory is only available if the memory resource is
240  // available.
241  continue;
242  }
// The second element of the returned pair is intentionally ignored here.
243  auto [res, _] = reserve(mem_type, size, AllowOverbooking::NO);
// Only a reservation covering the full request counts as success.
244  if (res.size() == size) {
// `res` is a structured binding, which is not eligible for implicit move
// on return; the explicit std::move avoids a copy.
245  return std::move(res);
246  }
247  }
248  RAPIDSMPF_FAIL("failed to reserve memory", std::runtime_error);
249  }
250 
261  std::size_t size, MemoryType mem_type
262  ) {
263  return reserve_or_fail(size, std::ranges::single_view{mem_type});
264  }
265 
/**
 * @brief Consume a portion of the reserved memory.
 *
 * @param reservation The reservation to consume from.
 * @param size The amount to consume.
 * @return A `std::size_t`. NOTE(review): its meaning is not visible in this
 * header — presumably the remaining reservation size; confirm.
 */
278  std::size_t release(MemoryReservation& reservation, std::size_t size);
279 
/**
 * @brief Allocate a buffer of the specified memory type by the reservation.
 *
 * @param size The size of the buffer to allocate.
 * @param stream The CUDA stream passed along with the allocation.
 * @param reservation The reservation backing the allocation; taken by
 * non-const reference.
 * @return A unique pointer to the allocated Buffer.
 */
291  std::unique_ptr<Buffer> allocate(
292  std::size_t size, rmm::cuda_stream_view stream, MemoryReservation& reservation
293  );
294 
/**
 * @brief Allocate a buffer consuming the entire reservation.
 *
 * @param stream The CUDA stream passed along with the allocation.
 * @param reservation The reservation to consume; taken by rvalue reference.
 * @return A unique pointer to the allocated Buffer.
 */
305  std::unique_ptr<Buffer> allocate(
306  rmm::cuda_stream_view stream, MemoryReservation&& reservation
307  );
308 
/**
 * @brief Move device buffer data into a Buffer.
 *
 * @param data The device buffer; ownership is transferred to this call.
 * @param stream The CUDA stream associated with the operation.
 * @return A unique pointer to the resulting Buffer.
 */
324  std::unique_ptr<Buffer> move(
325  std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
326  );
327 
/**
 * @brief Move a Buffer to the memory type specified by the reservation.
 *
 * @param buffer The buffer to move; ownership is transferred to this call.
 * @param reservation The reservation that determines the target memory type;
 * taken by non-const reference.
 * @return A unique pointer to the resulting Buffer.
 */
341  std::unique_ptr<Buffer> move(
342  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
343  );
344 
/**
 * @brief Move a Buffer to a device buffer.
 *
 * @param buffer The buffer to move; ownership is transferred to this call.
 * @param reservation The reservation used for the move; taken by non-const
 * reference.
 * @return A unique pointer to the resulting `rmm::device_buffer`.
 */
359  std::unique_ptr<rmm::device_buffer> move_to_device_buffer(
360  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
361  );
362 
/**
 * @brief Move a Buffer into a host buffer.
 *
 * @param buffer The buffer to move; ownership is transferred to this call.
 * @param reservation The reservation used for the move; taken by non-const
 * reference.
 * @return A unique pointer to the resulting HostBuffer.
 */
377  std::unique_ptr<HostBuffer> move_to_host_buffer(
378  std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
379  );
380 
389 
396 
/**
 * @brief Gets a shared pointer to the statistics associated with this buffer resource.
 *
 * @return Shared pointer to the statistics.
 */
403  std::shared_ptr<Statistics> statistics();
404 
405  private:
// NOTE(review): what this mutex guards is not visible in this header — confirm.
406  std::mutex mutex_;
// Device-accessible memory resource (owning, type-erased).
407  cuda::mr::any_resource<cuda::mr::device_accessible> device_mr_;
// Optional pinned host memory resource; compared against
// PinnedMemoryResource::Disabled to detect that pinned memory is unavailable.
408  std::optional<PinnedMemoryResource> pinned_mr_;
// Host memory resource using standard CPU allocation.
409  HostMemoryResource host_mr_;
// Memory-available callbacks keyed by memory type; queried by memory_available().
410  std::unordered_map<MemoryType, MemoryAvailable> memory_available_;
// One counter per entry of MEMORY_TYPES, indexed by the MemoryType value cast
// to std::size_t (see memory_reserved()).
411  // Zero initialized reserved counters.
412  std::array<std::size_t, MEMORY_TYPES.size()> memory_reserved_ = {};
// Shared CUDA stream pool.
413  std::shared_ptr<rmm::cuda_stream_pool> stream_pool_;
// Manages memory spilling to free up device memory when needed.
414  SpillManager spill_manager_;
// Statistics associated with this buffer resource.
415  std::shared_ptr<Statistics> statistics_;
416 };
417 
434  public:
443  : limit{limit}, mr_{std::move(mr)} {}
444 
/**
 * @brief Returns the remaining available memory within the defined limit.
 *
 * Computed as `limit` minus the memory resource's current allocated amount.
 *
 * @return The remaining memory as a signed value; it is negative when the
 * current allocation exceeds `limit`.
 */
454  std::int64_t operator()() const {
455  return limit - mr_.current_allocated();
456  }
457 
458  public:
/// The memory limit.
459  std::int64_t const limit;
460 
461  private:
462  RmmResourceAdaptor const mr_;
463 };
464 
473 std::unordered_map<MemoryType, BufferResource::MemoryAvailable>
475 
/**
 * @brief Get the periodic_spill_check parameter from configuration options.
 *
 * @param options The configuration options.
 * @return The periodic spill check duration wrapped in an optional.
 */
484 std::optional<Duration> periodic_spill_check_from_options(config::Options options);
485 
/**
 * @brief Get a new CUDA stream pool from configuration options.
 *
 * @param options The configuration options.
 * @return Shared pointer to the new CUDA stream pool.
 */
493 std::shared_ptr<rmm::cuda_stream_pool> stream_pool_from_options(config::Options options);
494 
495 
496 } // namespace rapidsmpf
Class managing buffer resources.
std::size_t release(MemoryReservation &reservation, std::size_t size)
Consume a portion of the reserved memory.
std::unique_ptr< Buffer > allocate(std::size_t size, rmm::cuda_stream_view stream, MemoryReservation &reservation)
Allocate a buffer of the specified memory type by the reservation.
std::shared_ptr< Statistics > statistics()
Gets a shared pointer to the statistics associated with this buffer resource.
rmm::host_async_resource_ref host_mr() noexcept
Get the RMM host memory resource.
rmm::device_async_resource_ref device_mr() const noexcept
Get the RMM device memory resource.
std::unique_ptr< Buffer > allocate(rmm::cuda_stream_view stream, MemoryReservation &&reservation)
Allocate a buffer consuming the entire reservation.
BufferResource(cuda::mr::any_resource< cuda::mr::device_accessible > device_mr, std::optional< PinnedMemoryResource > pinned_mr=PinnedMemoryResource::Disabled, std::unordered_map< MemoryType, MemoryAvailable > memory_available={}, std::optional< Duration > periodic_spill_check=std::chrono::milliseconds{1}, std::shared_ptr< rmm::cuda_stream_pool > stream_pool=std::make_shared< rmm::cuda_stream_pool >(16, rmm::cuda_stream::flags::non_blocking), std::shared_ptr< Statistics > statistics=Statistics::disabled())
Constructs a buffer resource.
std::unique_ptr< Buffer > move(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer to the memory type specified by the reservation.
std::optional< any_host_device_resource > try_pinned_mr() const noexcept
Get the pinned host memory resource if available.
std::size_t memory_reserved(MemoryType mem_type) const
Get the current reserved memory of the specified memory type.
std::unique_ptr< rmm::device_buffer > move_to_device_buffer(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer to a device buffer.
MemoryReservation reserve_device_memory_and_spill(std::size_t size, AllowOverbooking allow_overbooking)
Reserve device memory and spill if necessary.
MemoryReservation reserve_or_fail(std::size_t size, MemoryType mem_type)
Make a memory reservation or fail.
requires std::convertible_to< std::ranges::range_value_t< Range >, MemoryType > MemoryReservation reserve_or_fail(std::size_t size, Range mem_types)
Make a memory reservation or fail based on the given order of memory types.
static std::shared_ptr< BufferResource > from_options(RmmResourceAdaptor mr, config::Options options)
Construct a BufferResource from configuration options.
MemoryAvailable const & memory_available(MemoryType mem_type) const
Retrieves the memory availability function for a given memory type.
rmm::host_device_async_resource_ref pinned_mr()
Get the RMM pinned host memory resource.
std::function< std::int64_t()> MemoryAvailable
Callback function to determine available memory.
SpillManager & spill_manager()
Gets a reference to the spill manager used.
rmm::cuda_stream_pool const & stream_pool() const
Returns the CUDA stream pool used by this buffer resource.
std::unique_ptr< HostBuffer > move_to_host_buffer(std::unique_ptr< Buffer > buffer, MemoryReservation &reservation)
Move a Buffer into a host buffer.
std::pair< MemoryReservation, std::size_t > reserve(MemoryType mem_type, std::size_t size, AllowOverbooking allow_overbooking)
Reserve an amount of the specified memory type.
std::unique_ptr< Buffer > move(std::unique_ptr< rmm::device_buffer > data, rmm::cuda_stream_view stream)
Move device buffer data into a Buffer.
Host memory resource using standard CPU allocation.
A functor for querying the remaining available memory within a defined limit from an RMM statistics r...
std::int64_t operator()() const
Returns the remaining available memory within the defined limit.
std::int64_t const limit
The memory limit.
LimitAvailableMemory(RmmResourceAdaptor mr, std::int64_t limit)
Constructs a LimitAvailableMemory instance.
Represents a reservation for future memory allocation.
static constexpr std::nullopt_t Disabled
Sentinel value indicating that pinned host memory is disabled.
An RMM memory resource adaptor tailored to RapidsMPF.
std::int64_t current_allocated() const noexcept
Get the total current allocated memory from both primary and fallback.
Manages memory spilling to free up device memory when needed.
static std::shared_ptr< Statistics > disabled()
Returns a shared pointer to a disabled (no-op) Statistics instance.
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
cuda::mr::resource_ref< cuda::mr::host_accessible > host_async_resource_ref
cuda::mr::resource_ref< cuda::mr::device_accessible > device_async_resource_ref
cuda::mr::resource_ref< cuda::mr::host_accessible, cuda::mr::device_accessible > host_device_async_resource_ref
RAPIDS Multi-Processor interfaces.
Definition: backend.hpp:14
std::optional< Duration > periodic_spill_check_from_options(config::Options options)
Get the periodic_spill_check parameter from configuration options.
AllowOverbooking
Policy controlling whether a memory reservation is allowed to overbook.
@ YES
Overbooking is allowed.
@ NO
Overbooking is not allowed.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
Definition: memory_type.hpp:23
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16
@ PINNED_HOST
Pinned host memory.
cuda::mr::any_resource< cuda::mr::host_accessible, cuda::mr::device_accessible > any_host_device_resource
Owning type-erased host- and device-accessible memory resource.
std::unordered_map< MemoryType, BufferResource::MemoryAvailable > memory_available_from_options(RmmResourceAdaptor mr, config::Options options)
Construct a map of memory-available functions from configuration options.
std::shared_ptr< rmm::cuda_stream_pool > stream_pool_from_options(config::Options options)
Get a new CUDA stream pool from configuration options.