rmm_resource_adaptor_impl.hpp
1 
6 #pragma once
7 
8 #include <cstddef>
9 #include <cstdint>
10 #include <mutex>
11 #include <optional>
12 #include <stack>
13 #include <thread>
14 #include <unordered_map>
15 #include <unordered_set>
16 #include <utility>
17 
18 #include <cuda_runtime_api.h>
19 
20 #include <cuda/memory_resource>
21 
22 #include <rmm/aligned.hpp>
23 #include <rmm/cuda_stream.hpp>
24 #include <rmm/error.hpp>
25 #include <rmm/resource_ref.hpp>
26 
27 #include <rapidsmpf/error.hpp>
28 #include <rapidsmpf/memory/scoped_memory_record.hpp>
29 #include <rapidsmpf/utils/misc.hpp>
30 
31 namespace rapidsmpf::detail {
32 
48 template <
49  cuda::mr::resource_with<cuda::mr::device_accessible> PrimaryMR,
50  cuda::mr::resource_with<cuda::mr::device_accessible> FallbackMR =
51  cuda::mr::any_resource<cuda::mr::device_accessible>>
53  public:
// Construct with a primary and an optional fallback memory resource.
//
// Both arguments are taken by value and moved into `primary_mr_` and
// `fallback_mr_` respectively.
//
// primary_mr:  resource that allocate() tries first.
// fallback_mr: resource that allocate() turns to only when the primary throws
//              `rmm::out_of_memory`; defaults to `std::nullopt` (no fallback).
60  // NOLINTBEGIN(clang-analyzer-core.StackAddressEscape): false positive — primary_mr
61  // and fallback_mr are moved into a heap-allocated control block inside
62  // make_shared_resource; the analyzer incorrectly traces the forwarding
63  // reference chain back to the outer caller's stack frame.
64  RmmResourceAdaptorImpl( // NOLINT(clang-analyzer-core.StackAddressEscape)
65  PrimaryMR primary_mr,
66  std::optional<FallbackMR> fallback_mr = std::nullopt
67  )
68  : primary_mr_{std::move(primary_mr)}, fallback_mr_{std::move(fallback_mr)} {}
69 
70  // NOLINTEND(clang-analyzer-core.StackAddressEscape)
71 
// Construct the primary resource in-place from forwarded arguments.
//
// The leading `std::in_place_t` tag selects this overload. `args` are perfectly
// forwarded into the brace-initializer of `primary_mr_`; the fallback resource
// is left empty (`std::nullopt`), so an adaptor built this way has no fallback.
81  template <typename... Args>
82  explicit RmmResourceAdaptorImpl(std::in_place_t, Args&&... args)
83  : primary_mr_{std::forward<Args>(args)...}, fallback_mr_{std::nullopt} {}
84 
// Defaulted destructor: no custom teardown logic is defined here.
85  ~RmmResourceAdaptorImpl() = default;
86 
// Copy-assignment and move-assignment are both disabled, so an existing adaptor
// can never be overwritten with the state of another one.
89  RmmResourceAdaptorImpl& operator=(RmmResourceAdaptorImpl const&) = delete;
90  RmmResourceAdaptorImpl& operator=(RmmResourceAdaptorImpl&&) = delete;
91 
98  [[nodiscard]] bool operator==(RmmResourceAdaptorImpl const& other) const noexcept {
99  return this == std::addressof(other);
100  }
101 
106  [[nodiscard]] PrimaryMR const& get_upstream_resource() const noexcept {
107  return primary_mr_;
108  }
109 
115  [[nodiscard]] std::optional<FallbackMR> const&
116  get_fallback_resource() const noexcept {
117  return fallback_mr_;
118  }
119 
121  [[nodiscard]] ScopedMemoryRecord get_main_record() const {
122  std::lock_guard<std::mutex> lock(mutex_);
123  return main_record_;
124  }
125 
127  [[nodiscard]] std::int64_t current_allocated() const noexcept {
128  std::lock_guard<std::mutex> lock(mutex_);
129  return main_record_.current();
130  }
131 
// Body of `void begin_scoped_memory_record()`; the signature line is absent
// from this listing. Begins recording a new scoped memory usage record for the
// calling thread.
134  std::lock_guard<std::mutex> lock(mutex_);
// operator[] creates this thread's stack on first use; emplace() then pushes a
// default-constructed ScopedMemoryRecord, which becomes the innermost scope.
135  record_stacks_[std::this_thread::get_id()].emplace();
136  }
137 
// Body of `ScopedMemoryRecord end_scoped_memory_record()`; the signature line
// is absent from this listing. Ends the calling thread's innermost scoped
// record and returns it.
140  std::lock_guard lock(mutex_);
// unordered_map::at() throws std::out_of_range when the calling thread has no
// entry in `record_stacks_` at all.
141  auto& stack = record_stacks_.at(std::this_thread::get_id());
// The thread has an entry but no open record. Presumably RAPIDSMPF_EXPECTS
// raises the exception type given as its third argument when the condition is
// false -- confirm against rapidsmpf/error.hpp.
142  RAPIDSMPF_EXPECTS(
143  !stack.empty(),
144  "calling end_scoped_memory_record() on an empty stack.",
145  std::out_of_range
146  );
// Copy the innermost record before removing it from the stack.
147  auto ret = stack.top();
148  stack.pop();
// When the ended record was nested, hand it to the enclosing record through
// add_subscope() so the outer scope also learns about it.
149  if (!stack.empty()) {
150  stack.top().add_subscope(ret);
151  }
152  return ret;
153  }
154 
163  void* allocate(
164  cuda::stream_ref stream,
165  std::size_t bytes,
166  std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT
167  ) {
168  constexpr auto PRIMARY = ScopedMemoryRecord::AllocType::PRIMARY;
169  constexpr auto FALLBACK = ScopedMemoryRecord::AllocType::FALLBACK;
170 
171  void* ret{};
172  auto alloc_type = PRIMARY;
173  try {
174  ret = primary_mr_.allocate(stream, bytes, alignment);
175  } catch (rmm::out_of_memory const& e) {
176  if (fallback_mr_.has_value()) {
177  alloc_type = FALLBACK;
178  ret = fallback_mr_->allocate(stream, bytes, alignment);
179  std::lock_guard<std::mutex> lock(mutex_);
180  fallback_allocations_.insert(ret);
181  } else {
182  throw;
183  }
184  }
185  std::lock_guard<std::mutex> lock(mutex_);
186  main_record_.record_allocation(alloc_type, safe_cast<std::int64_t>(bytes));
187  if (!record_stacks_.empty()) {
188  auto const thread_id = std::this_thread::get_id();
189  auto& record = record_stacks_[thread_id];
190  if (!record.empty()) {
191  record.top().record_allocation(
192  alloc_type, safe_cast<std::int64_t>(bytes)
193  );
194  RAPIDSMPF_EXPECTS(
195  allocating_threads_.insert({ret, thread_id}).second,
196  "duplicate memory pointer"
197  );
198  }
199  }
200  return ret;
201  }
202 
// Parameter list and body of `void deallocate(...) noexcept`; the line carrying
// the function name is absent from this listing. Deallocates memory
// asynchronously on the given stream.
//
// stream:    stream on which the deallocation is ordered.
// ptr:       pointer previously returned by allocate().
// bytes:     size of the allocation, used for the statistics and passed on to
//            the underlying resource.
// alignment: alignment passed on to the underlying resource.
212  cuda::stream_ref stream,
213  void* ptr,
214  std::size_t bytes,
215  std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT
216  ) noexcept {
217  constexpr auto PRIMARY = ScopedMemoryRecord::AllocType::PRIMARY;
218  constexpr auto FALLBACK = ScopedMemoryRecord::AllocType::FALLBACK;
219 
// NOTE: the declaration of `alloc_type` (assigned below) sits on a line that is
// absent from this listing.
// The braces limit the lock to the bookkeeping; the actual deallocation further
// down runs without holding `mutex_`.
221  {
222  std::lock_guard<std::mutex> lock(mutex_);
// erase() returns the number of removed elements: 0 means the pointer was never
// recorded as a fallback allocation, so it is treated as a primary allocation.
223  alloc_type = (fallback_allocations_.erase(ptr) == 0) ? PRIMARY : FALLBACK;
// NOTE(review): this function is noexcept; if safe_cast can throw on an
// out-of-range value the program would terminate -- confirm safe_cast semantics.
224  main_record_.record_deallocation(alloc_type, safe_cast<std::int64_t>(bytes));
225  if (!allocating_threads_.empty()) {
// extract() removes the pointer's entry (if any) and yields the id of the
// thread that allocated it while a scoped record was open.
226  auto const node = allocating_threads_.extract(ptr);
227  if (node) {
228  auto thread_id = node.mapped();
// Credit the release to the allocating thread's current innermost record, even
// when a different thread performs the deallocation.
// NOTE(review): operator[] inserts an empty stack if that thread has no entry.
229  auto& record = record_stacks_[thread_id];
230  if (!record.empty()) {
231  record.top().record_deallocation(
232  alloc_type, safe_cast<std::int64_t>(bytes)
233  );
234  }
235  }
236  }
237  }
// Return the memory to whichever resource produced it.
238  if (alloc_type == PRIMARY) {
239  primary_mr_.deallocate(stream, ptr, bytes, alignment);
240  } else {
241  fallback_mr_->deallocate(stream, ptr, bytes, alignment);
242  }
243  }
244 
// Parameter list and body of `void* allocate_sync(...)`; the line carrying the
// function name is absent from this listing. Allocates memory synchronously.
//
// bytes:     number of bytes to allocate.
// alignment: requested alignment of the allocation.
// Returns the pointer produced by allocate().
253  std::size_t bytes, std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT
254  ) {
// Reuse the stream-ordered path on the adaptor's own `sync_stream_` ...
255  auto* ptr = allocate(sync_stream_, bytes, alignment);
// ... and synchronize that stream before handing the pointer to the caller.
256  sync_stream_.synchronize();
257  return ptr;
258  }
259 
// Parameter list and body of `void deallocate_sync(...) noexcept`; the line
// carrying the function name is absent from this listing. Deallocates memory
// synchronously.
//
// ptr:       pointer to release.
// bytes:     size of the allocation.
// alignment: alignment of the allocation.
268  void* ptr,
269  std::size_t bytes,
270  std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT
271  ) noexcept {
// Forwards to deallocate() on `sync_stream_`; no synchronize() call is issued
// here, unlike in allocate_sync().
272  deallocate(sync_stream_, ptr, bytes, alignment);
273  }
274 
// Tags this resource as device-accessible for the CCCL resource concept.
// The function has an empty body and returns nothing: only the existence of
// this overload for `cuda::mr::device_accessible` matters.
276  friend void get_property(
277  RmmResourceAdaptorImpl const&, cuda::mr::device_accessible
278  ) noexcept {}
279 
280  private:
// Guards the bookkeeping below: fallback_allocations_, main_record_,
// record_stacks_ and allocating_threads_ are only touched while it is held.
// `mutable` so that const accessors can lock it.
281  mutable std::mutex mutex_;
// Resource tried first by allocate().
282  PrimaryMR primary_mr_;
// Optional resource used when the primary throws rmm::out_of_memory.
283  std::optional<FallbackMR> fallback_mr_;
// Pointers that were served by the fallback resource and not yet deallocated;
// deallocate() uses it to route a pointer back to the right resource.
284  std::unordered_set<void*> fallback_allocations_;
285 
// Statistics across all allocations made through this adaptor.
286  ScopedMemoryRecord main_record_;
// Per-thread stacks of open scoped records; the top is the innermost scope.
287  std::unordered_map<std::thread::id, std::stack<ScopedMemoryRecord>> record_stacks_;
// Maps a live pointer to the thread that allocated it while that thread had an
// open scoped record.
288  std::unordered_map<void*, std::thread::id> allocating_threads_;
289 
// Stream used by allocate_sync() and deallocate_sync().
// NOTE: the initializer line (291) is absent from this listing.
290  rmm::cuda_stream sync_stream_{
292  };
293 };
294 
295 } // namespace rapidsmpf::detail
Implementation class for RmmResourceAdaptor.
void deallocate(cuda::stream_ref stream, void *ptr, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept
Deallocate memory asynchronously on the given stream.
friend void get_property(RmmResourceAdaptorImpl const &, cuda::mr::device_accessible) noexcept
Tag this resource as device-accessible for the CCCL concept.
bool operator==(RmmResourceAdaptorImpl const &other) const noexcept
Equality comparison.
void * allocate_sync(std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT)
Allocate memory synchronously.
ScopedMemoryRecord end_scoped_memory_record()
End the current scoped memory record and return it.
void * allocate(cuda::stream_ref stream, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT)
Allocate memory asynchronously on the given stream.
ScopedMemoryRecord get_main_record() const
Returns a copy of the main memory record.
std::optional< FallbackMR > const & get_fallback_resource() const noexcept
Returns a const reference to the optional fallback resource.
void begin_scoped_memory_record()
Begin recording a new scoped memory usage record for the current thread.
RmmResourceAdaptorImpl(std::in_place_t, Args &&... args)
Construct the primary resource in-place from forwarded arguments.
void deallocate_sync(void *ptr, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept
Deallocate memory synchronously.
std::int64_t current_allocated() const noexcept
Get the total current allocated memory from both primary and fallback.
PrimaryMR const & get_upstream_resource() const noexcept
Returns a reference to the primary upstream resource.
RmmResourceAdaptorImpl(PrimaryMR primary_mr, std::optional< FallbackMR > fallback_mr=std::nullopt)
Construct with a primary and optional fallback memory resource.
void synchronize() const
static constexpr std::size_t CUDA_ALLOCATION_ALIGNMENT
Memory statistics for a specific scope.
void record_allocation(AllocType alloc_type, std::int64_t nbytes)
Records a memory allocation event.
void record_deallocation(AllocType alloc_type, std::int64_t nbytes)
Records a memory deallocation event.
AllocType
Allocation source types.
@ PRIMARY
The primary allocator (first-choice allocator).
@ FALLBACK
The fallback allocator (used when the primary fails).
std::int64_t current(AllocType alloc_type=AllocType::ALL) const noexcept
Returns the current memory usage in bytes for the specified allocator type.