14 #include <unordered_map>
15 #include <unordered_set>
18 #include <cuda_runtime_api.h>
20 #include <cuda/memory_resource>
24 #include <rmm/error.hpp>
27 #include <rapidsmpf/error.hpp>
28 #include <rapidsmpf/memory/scoped_memory_record.hpp>
29 #include <rapidsmpf/utils/misc.hpp>
49 cuda::mr::resource_with<cuda::mr::device_accessible> PrimaryMR,
50 cuda::mr::resource_with<cuda::mr::device_accessible> FallbackMR =
51 cuda::mr::any_resource<cuda::mr::device_accessible>>
66 std::optional<FallbackMR> fallback_mr = std::nullopt
68 : primary_mr_{std::move(primary_mr)}, fallback_mr_{std::move(fallback_mr)} {}
81 template <
typename... Args>
83 : primary_mr_{std::forward<Args>(args)...}, fallback_mr_{std::nullopt} {}
99 return this == std::addressof(other);
115 [[nodiscard]] std::optional<FallbackMR>
const&
122 std::lock_guard<std::mutex> lock(mutex_);
128 std::lock_guard<std::mutex> lock(mutex_);
134 std::lock_guard<std::mutex> lock(mutex_);
135 record_stacks_[std::this_thread::get_id()].emplace();
140 std::lock_guard lock(mutex_);
141 auto& stack = record_stacks_.at(std::this_thread::get_id());
144 "calling end_scoped_memory_record() on an empty stack.",
147 auto ret = stack.top();
149 if (!stack.empty()) {
150 stack.top().add_subscope(ret);
164 cuda::stream_ref stream,
172 auto alloc_type = PRIMARY;
174 ret = primary_mr_.allocate(stream, bytes, alignment);
176 if (fallback_mr_.has_value()) {
177 alloc_type = FALLBACK;
178 ret = fallback_mr_->allocate(stream, bytes, alignment);
179 std::lock_guard<std::mutex> lock(mutex_);
180 fallback_allocations_.insert(ret);
185 std::lock_guard<std::mutex> lock(mutex_);
187 if (!record_stacks_.empty()) {
188 auto const thread_id = std::this_thread::get_id();
189 auto& record = record_stacks_[thread_id];
190 if (!record.empty()) {
191 record.top().record_allocation(
192 alloc_type, safe_cast<std::int64_t>(bytes)
195 allocating_threads_.insert({ret, thread_id}).second,
196 "duplicate memory pointer"
212 cuda::stream_ref stream,
222 std::lock_guard<std::mutex> lock(mutex_);
223 alloc_type = (fallback_allocations_.erase(ptr) == 0) ? PRIMARY : FALLBACK;
225 if (!allocating_threads_.empty()) {
226 auto const node = allocating_threads_.extract(ptr);
228 auto thread_id = node.mapped();
229 auto& record = record_stacks_[thread_id];
230 if (!record.empty()) {
231 record.top().record_deallocation(
232 alloc_type, safe_cast<std::int64_t>(bytes)
238 if (alloc_type == PRIMARY) {
239 primary_mr_.deallocate(stream, ptr, bytes, alignment);
241 fallback_mr_->deallocate(stream, ptr, bytes, alignment);
255 auto* ptr =
allocate(sync_stream_, bytes, alignment);
272 deallocate(sync_stream_, ptr, bytes, alignment);
281 mutable std::mutex mutex_;
282 PrimaryMR primary_mr_;
283 std::optional<FallbackMR> fallback_mr_;
284 std::unordered_set<void*> fallback_allocations_;
287 std::unordered_map<std::thread::id, std::stack<ScopedMemoryRecord>> record_stacks_;
288 std::unordered_map<void*, std::thread::id> allocating_threads_;
Implementation class for RmmResourceAdaptor.
void deallocate(cuda::stream_ref stream, void *ptr, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept
Deallocate memory asynchronously on the given stream.
friend void get_property(RmmResourceAdaptorImpl const &, cuda::mr::device_accessible) noexcept
Tag this resource as device-accessible for the CCCL concept.
bool operator==(RmmResourceAdaptorImpl const &other) const noexcept
Equality comparison; two adaptors compare equal only when they are the same object (identity comparison).
void * allocate_sync(std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT)
Allocate memory synchronously.
ScopedMemoryRecord end_scoped_memory_record()
End the current scoped memory record and return it.
void * allocate(cuda::stream_ref stream, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT)
Allocate memory asynchronously on the given stream.
ScopedMemoryRecord get_main_record() const
Returns a copy of the main memory record.
std::optional< FallbackMR > const & get_fallback_resource() const noexcept
Returns a const reference to the optional fallback resource.
void begin_scoped_memory_record()
Begin recording a new scoped memory usage record for the current thread.
RmmResourceAdaptorImpl(std::in_place_t, Args &&... args)
Construct the primary resource in-place from forwarded arguments.
void deallocate_sync(void *ptr, std::size_t bytes, std::size_t alignment=rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept
Deallocate memory synchronously.
std::int64_t current_allocated() const noexcept
Get the total memory currently allocated through both the primary and fallback resources.
PrimaryMR const & get_upstream_resource() const noexcept
Returns a const reference to the primary upstream resource.
RmmResourceAdaptorImpl(PrimaryMR primary_mr, std::optional< FallbackMR > fallback_mr=std::nullopt)
Construct with a primary and optional fallback memory resource.
static constexpr std::size_t CUDA_ALLOCATION_ALIGNMENT
Memory statistics for a specific scope.
void record_allocation(AllocType alloc_type, std::int64_t nbytes)
Records a memory allocation event.
void record_deallocation(AllocType alloc_type, std::int64_t nbytes)
Records a memory deallocation event.
AllocType
Allocation source types.
@ PRIMARY
The primary allocator (first-choice allocator).
@ FALLBACK
The fallback allocator (used when the primary fails).
std::int64_t current(AllocType alloc_type=AllocType::ALL) const noexcept
Returns the current memory usage in bytes for the specified allocator type.