stream_ordered_memory_resource.hpp
/*
 * Copyright (c) 2020-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <rmm/detail/aligned.hpp>
#include <rmm/detail/error.hpp>
#include <rmm/logger.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <cuda_runtime_api.h>

#include <algorithm>
#include <cstddef>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <unordered_map>
#include <utility>

namespace rmm::mr::detail {

/**
 * @brief A CRTP helper function.
 *
 * Provides access to the derived class via `underlying()` without requiring a
 * `static_cast` at each call site.
 */
template <typename T>
struct crtp {
  [[nodiscard]] T& underlying() { return static_cast<T&>(*this); }
  [[nodiscard]] T const& underlying() const { return static_cast<T const&>(*this); }
};

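// Illustrative sketch (hypothetical types, not part of this header): CRTP lets
// a base template call into its derived class with no virtual dispatch.
//
//   template <typename Derived>
//   struct printer : crtp<Derived> {
//     void print() { this->underlying().greet(); }  // resolved at compile time
//   };
//
//   struct greeter : printer<greeter> {
//     void greet() { std::puts("hello"); }          // needs <cstdio>
//   };
//
//   greeter{}.print();  // prints "hello"
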
/**
 * @brief Base class for a stream-ordered memory resource.
 *
 * Uses CRTP for static polymorphism, allowing derived suballocator resources to maintain
 * per-stream free lists of blocks whose cross-stream reuse is ordered with CUDA events.
 */
template <typename PoolResource, typename FreeListType>
class stream_ordered_memory_resource : public crtp<PoolResource>, public device_memory_resource {
 public:
  ~stream_ordered_memory_resource() override { release(); }

  stream_ordered_memory_resource()                                                 = default;
  stream_ordered_memory_resource(stream_ordered_memory_resource const&)            = delete;
  stream_ordered_memory_resource(stream_ordered_memory_resource&&)                 = delete;
  stream_ordered_memory_resource& operator=(stream_ordered_memory_resource const&) = delete;
  stream_ordered_memory_resource& operator=(stream_ordered_memory_resource&&)      = delete;

 protected:
  using free_list  = FreeListType;
  using block_type = typename free_list::block_type;
  using lock_guard = std::lock_guard<std::mutex>;

  // Derived classes must implement these four methods; a sketch of a conforming
  // derived class follows the four declarations below.
  /**
   * @brief Get the maximum size of a single allocation supported by this suballocator memory
   * resource.
   *
   * Default implementation is the maximum `std::size_t` value, but fixed-size allocators will have
   * a lower limit. Override this function in derived classes as necessary.
   *
   * @return std::size_t The maximum size of a single allocation supported by this memory resource
   */
  // std::size_t get_maximum_allocation_size() const

  /**
   * @brief Allocate space (typically from upstream) to supply the suballocation pool and return
   * a sufficiently sized block.
   *
   * This function returns a block because in some suballocators, a single block is allocated
   * from upstream and returned. In other suballocators, many blocks are created from upstream. In
   * the latter case, the function returns one block and inserts all the rest into the free list
   * `blocks`.
   *
   * @param size The minimum size block to return
   * @param blocks The free list into which to optionally insert new blocks
   * @param stream The stream on which the memory is to be used
   * @return block_type A block of at least `size` bytes
   */
  // block_type expand_pool(std::size_t size, free_list& blocks, cuda_stream_view stream)

  /// Pair representing a block that has been split for allocation
  using split_block = std::pair<block_type, block_type>;

  /**
   * @brief Split block `b` if necessary to return a pointer to memory of `size` bytes.
   *
   * If the block is split, the remainder is returned as the second element of the output
   * `split_block`.
   *
   * @param b The block to allocate from
   * @param size The size in bytes of the requested allocation
   * @return A `split_block` comprising the allocated pointer and any unallocated remainder of the
   * input block
   */
  // split_block allocate_from_block(block_type const& b, std::size_t size)

  /**
   * @brief Finds, frees and returns the block associated with pointer `p`.
   *
   * @param p The pointer to the memory to free
   * @param size The size of the memory to free. Must be equal to the original allocation size
   * @return The (now freed) block associated with `p`. The caller is expected to return the block
   * to the pool.
   */
  // block_type free_block(void* p, std::size_t size) noexcept

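  // A sketch of the contract above (hypothetical derived class, not part of RMM;
  // assumes an `upstream_` resource and blocks constructible from a pointer and a size):
  //
  //   class my_pool final : public stream_ordered_memory_resource<my_pool, my_free_list> {
  //     std::size_t get_maximum_allocation_size() const
  //     {
  //       return std::numeric_limits<std::size_t>::max();  // no fixed upper bound
  //     }
  //     block_type expand_pool(std::size_t size, free_list& blocks, cuda_stream_view stream)
  //     {
  //       // Allocate one upstream block of at least `size` bytes and return it whole;
  //       // allocate_from_block() later splits off any unused remainder.
  //       return block_type{upstream_->allocate(size, stream), size};
  //     }
  //     split_block allocate_from_block(block_type const& b, std::size_t size)
  //     {
  //       // Carve `size` bytes off the front; the tail (possibly empty) is the remainder.
  //       return {block_type{b.pointer(), size}, block_type{b.pointer() + size, b.size() - size}};
  //     }
  //     block_type free_block(void* p, std::size_t size) noexcept
  //     {
  //       return block_type{static_cast<char*>(p), size};  // rebuild the block descriptor
  //     }
  //   };
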
  /**
   * @brief Returns the block `block` (last used on stream `stream`) to the pool.
   *
   * @param block The block to insert into the pool
   * @param stream The stream on which the memory was last used
   */
  void insert_block(block_type const& block, cuda_stream_view stream)
  {
    stream_free_blocks_[get_event(stream)].insert(block);
  }

  void insert_blocks(free_list&& blocks, cuda_stream_view stream)
  {
    stream_free_blocks_[get_event(stream)].insert(std::move(blocks));
  }

#ifdef RMM_DEBUG_PRINT
  void print_free_blocks() const
  {
    std::cout << "stream free blocks: ";
    for (auto& free_blocks : stream_free_blocks_) {
      std::cout << "stream: " << free_blocks.first.stream << " event: " << free_blocks.first.event
                << " ";
      free_blocks.second.print();
      std::cout << std::endl;
    }
    std::cout << std::endl;
  }
#endif

  /**
   * @brief Get the mutex object.
   *
   * @return std::mutex& The mutex used to synchronize access to this resource
   */
  std::mutex& get_mutex() { return mtx_; }

  /// A stream and the CUDA event used to order operations on it
  struct stream_event_pair {
    cudaStream_t stream;
    cudaEvent_t event;

    bool operator<(stream_event_pair const& rhs) const { return event < rhs.event; }
  };

  /**
   * @brief Allocates memory of size at least `size` bytes.
   *
   * The returned pointer has at least 256B alignment.
   *
   * @throws rmm::out_of_memory if `size` exceeds the maximum allocation size
   *
   * @param size The size in bytes of the allocation
   * @param stream The stream on which to order this allocation
   * @return void* Pointer to the newly allocated memory
   */
  void* do_allocate(std::size_t size, cuda_stream_view stream) override
  {
    RMM_LOG_TRACE("[A][stream {:p}][{}B]", fmt::ptr(stream.value()), size);

    if (size <= 0) { return nullptr; }

    lock_guard lock(mtx_);

    auto stream_event = get_event(stream);

    size = rmm::detail::align_up(size, rmm::detail::CUDA_ALLOCATION_ALIGNMENT);
    RMM_EXPECTS(size <= this->underlying().get_maximum_allocation_size(),
                rmm::out_of_memory,
                "Maximum allocation size exceeded");
    auto const block = this->underlying().get_block(size, stream_event);

    RMM_LOG_TRACE("[A][stream {:p}][{}B][{:p}]",
                  fmt::ptr(stream_event.stream),
                  size,
                  fmt::ptr(block.pointer()));

    log_summary_trace();

    return block.pointer();
  }

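  // Worked example of the rounding above: CUDA_ALLOCATION_ALIGNMENT is 256 bytes,
  // so a request for 1000 bytes becomes align_up(1000, 256) == 1024. Every block
  // the pool hands out therefore begins and ends on a 256-byte boundary.
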
  /**
   * @brief Deallocate memory pointed to by `ptr`.
   *
   * @param ptr Pointer to the memory to deallocate
   * @param size The size in bytes of the allocation. Must be equal to the original allocation size
   * @param stream The stream on which to order this deallocation
   */
  void do_deallocate(void* ptr, std::size_t size, cuda_stream_view stream) override
  {
    RMM_LOG_TRACE("[D][stream {:p}][{}B][{:p}]", fmt::ptr(stream.value()), size, ptr);

    if (size <= 0 || ptr == nullptr) { return; }

    lock_guard lock(mtx_);
    auto stream_event = get_event(stream);

    size = rmm::detail::align_up(size, rmm::detail::CUDA_ALLOCATION_ALIGNMENT);
    auto const block = this->underlying().free_block(ptr, size);

    // TODO: cudaEventRecord has significant overhead on deallocations. For the non-PTDS case
    // we may be able to delay recording the event in some situations. But using events rather than
    // streams allows stealing from deleted streams.
    RMM_ASSERT_CUDA_SUCCESS(cudaEventRecord(stream_event.event, stream.value()));

    stream_free_blocks_[stream_event].insert(block);

    log_summary_trace();
  }

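  // The cudaEventRecord above pairs with the cudaStreamWaitEvent issued in
  // get_block_from_other_stream() below; together they make cross-stream reuse of a
  // freed block safe. A minimal standalone sketch of the same CUDA idiom
  // (hypothetical streams `s_free` and `s_alloc`; error checking omitted):
  //
  //   cudaEvent_t ev;
  //   cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
  //   cudaEventRecord(ev, s_free);          // at deallocation: mark last use of the block
  //   cudaStreamWaitEvent(s_alloc, ev, 0);  // at allocation: order reuse after last use
  //   // Work launched on s_alloc cannot begin until s_free has reached `ev`.
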
 private:
  /**
   * @brief RAII wrapper for a CUDA event.
   */
  struct event_wrapper {
    event_wrapper()
    {
      RMM_ASSERT_CUDA_SUCCESS(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
    }
    ~event_wrapper() { RMM_ASSERT_CUDA_SUCCESS(cudaEventDestroy(event)); }
    cudaEvent_t event{};

    event_wrapper(event_wrapper const&)            = delete;
    event_wrapper& operator=(event_wrapper const&) = delete;
    event_wrapper(event_wrapper&&) noexcept        = delete;
    event_wrapper& operator=(event_wrapper&&)      = delete;
  };

  /**
   * @brief Get a unique CUDA event (possibly new) associated with `stream`.
   *
   * The event is created on the first call and is not recorded. If compiled for per-thread
   * default stream and `stream` is the default stream, the event is created in thread-local
   * storage and is unique per CPU thread.
   *
   * @param stream The stream for which to get an event
   * @return The stream_event_pair for `stream`
   */
  stream_event_pair get_event(cuda_stream_view stream)
  {
    if (stream.is_per_thread_default()) {
      // Create a thread-local shared event wrapper. Shared pointers in the thread and in each MR
      // instance ensure it is cleaned up only after all are finished with it.
      thread_local auto event_tls = std::make_shared<event_wrapper>();
      default_stream_events.insert(event_tls);
      return stream_event_pair{stream.value(), event_tls->event};
    }
    // We use cudaStreamLegacy as the event map key for the default stream for consistency between
    // PTDS and non-PTDS mode. In PTDS mode, the cudaStreamLegacy map key will only exist if the
    // user explicitly passes it, so it is used as the default location for the free list at
    // construction. For consistency, the same key is used for null-stream free lists in non-PTDS
    // mode.
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-cstyle-cast)
    auto* const stream_to_store = stream.is_default() ? cudaStreamLegacy : stream.value();

    auto const iter = stream_events_.find(stream_to_store);
    return (iter != stream_events_.end()) ? iter->second : [&]() {
      stream_event_pair stream_event{stream_to_store};
      RMM_ASSERT_CUDA_SUCCESS(
        cudaEventCreateWithFlags(&stream_event.event, cudaEventDisableTiming));
      // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
      stream_events_[stream_to_store] = stream_event;
      return stream_event;
    }();
  }

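  // Illustrative key mapping produced by get_event() (here `s1` stands for a
  // hypothetical explicitly-created stream):
  //
  //   cuda_stream_view{}                     -> key cudaStreamLegacy
  //   cuda_stream_view{cudaStreamLegacy}     -> key cudaStreamLegacy
  //   cuda_stream_view{s1}                   -> key s1, one event per stream
  //   cuda_stream_view{cudaStreamPerThread}  -> thread-local event, no map entry
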
  /**
   * @brief Splits a block into an allocated block of `size` bytes and a remainder, inserting the
   * remainder into the free list `blocks`.
   *
   * @param block The block to split
   * @param size The size in bytes of the allocated portion
   * @param blocks The free list into which to insert the remainder, if valid
   * @return block_type The allocated block of `size` bytes
   */
  block_type allocate_and_insert_remainder(block_type block, std::size_t size, free_list& blocks)
  {
    auto const [allocated, remainder] = this->underlying().allocate_from_block(block, size);
    if (remainder.is_valid()) { blocks.insert(remainder); }
    return allocated;
  }

  /**
   * @brief Get an available memory block of at least `size` bytes.
   *
   * Tries, in order: the free list for `stream_event`, free lists of other streams (first without
   * and then with merging), and finally growing the pool.
   *
   * @param size The number of bytes to allocate
   * @param stream_event The stream and associated event on which the allocation will be used
   * @return block_type A block of memory of at least `size` bytes
   */
  block_type get_block(std::size_t size, stream_event_pair stream_event)
  {
    // Try to find a satisfactory block in the free list for the same stream (no sync required)
    auto iter = stream_free_blocks_.find(stream_event);
    if (iter != stream_free_blocks_.end()) {
      block_type const block = iter->second.get_block(size);
      if (block.is_valid()) { return allocate_and_insert_remainder(block, size, iter->second); }
    }

    free_list& blocks =
      (iter != stream_free_blocks_.end()) ? iter->second : stream_free_blocks_[stream_event];

    // Try to find an existing block in another stream
    {
      block_type const block = get_block_from_other_stream(size, stream_event, blocks, false);
      if (block.is_valid()) { return block; }
    }

    // No large enough blocks available on other streams, so sync and merge until we find one
    {
      block_type const block = get_block_from_other_stream(size, stream_event, blocks, true);
      if (block.is_valid()) { return block; }
    }

    log_summary_trace();

    // No large enough blocks available after merging, so grow the pool
    block_type const block =
      this->underlying().expand_pool(size, blocks, cuda_stream_view{stream_event.stream});

    return allocate_and_insert_remainder(block, size, blocks);
  }

  /**
   * @brief Find a free block of at least `size` bytes in a free list associated with a stream
   * other than `stream_event`.
   *
   * If `merge_first` is true, each other stream's free list is merged into `blocks` (after
   * synchronizing) before searching; otherwise a single block is taken and a stream wait is
   * inserted.
   *
   * @param size The number of bytes to allocate
   * @param stream_event The stream and associated event on which the allocation will be used
   * @param blocks The free list associated with `stream_event`
   * @param merge_first Whether to merge other streams' free lists into `blocks` before searching
   * @return A block of at least `size` bytes, or an invalid block if none is available
   */
  block_type get_block_from_other_stream(std::size_t size,
                                         stream_event_pair stream_event,
                                         free_list& blocks,
                                         bool merge_first)
  {
    auto find_block = [&](auto iter) {
      auto other_event   = iter->first.event;
      auto& other_blocks = iter->second;
      if (merge_first) {
        merge_lists(stream_event, blocks, other_event, std::move(other_blocks));

        RMM_LOG_DEBUG("[A][Stream {:p}][{}B][Merged stream {:p}]",
                      fmt::ptr(stream_event.stream),
                      size,
                      fmt::ptr(iter->first.stream));

        stream_free_blocks_.erase(iter);

        block_type const block = blocks.get_block(size);  // get the best-fit block in merged lists
        if (block.is_valid()) { return allocate_and_insert_remainder(block, size, blocks); }
      } else {
        block_type const block = other_blocks.get_block(size);
        if (block.is_valid()) {
          // Since we found a block associated with a different stream, we have to insert a wait
          // on the stream's associated event into the allocating stream.
          RMM_CUDA_TRY(cudaStreamWaitEvent(stream_event.stream, other_event, 0));
          return allocate_and_insert_remainder(block, size, other_blocks);
        }
      }
      return block_type{};
    };

    for (auto iter = stream_free_blocks_.begin(), next_iter = iter;
         iter != stream_free_blocks_.end();
         iter = next_iter) {
      ++next_iter;  // Points to the element after `iter` to allow erasing `iter` in the loop body

      if (iter->first.event != stream_event.event) {
        // Save the other stream handle now: `find_block` erases `iter` when it merges lists
        auto* const other_stream = iter->first.stream;
        block_type const block   = find_block(iter);

        if (block.is_valid()) {
          RMM_LOG_DEBUG((merge_first) ? "[A][Stream {:p}][{}B][Found after merging stream {:p}]"
                                      : "[A][Stream {:p}][{}B][Taken from stream {:p}]",
                        fmt::ptr(stream_event.stream),
                        size,
                        fmt::ptr(other_stream));
          return block;
        }
      }
    }
    return block_type{};
  }

  void merge_lists(stream_event_pair stream_event,
                   free_list& blocks,
                   cudaEvent_t other_event,
                   free_list&& other_blocks)
  {
    // Since we found a block associated with a different stream, we have to insert a wait
    // on the stream's associated event into the allocating stream.
    RMM_CUDA_TRY(cudaStreamWaitEvent(stream_event.stream, other_event, 0));

    // Merge the two free lists
    blocks.insert(std::move(other_blocks));
  }

  /**
   * @brief Clear free lists and destroy events. Only called by the destructor.
   */
  void release()
  {
    lock_guard lock(mtx_);

    for (auto s_e : stream_events_) {
      RMM_ASSERT_CUDA_SUCCESS(cudaEventSynchronize(s_e.second.event));
      RMM_ASSERT_CUDA_SUCCESS(cudaEventDestroy(s_e.second.event));
    }

    stream_events_.clear();
    stream_free_blocks_.clear();
  }

  void log_summary_trace()
  {
#if (SPDLOG_ACTIVE_LEVEL <= SPDLOG_LEVEL_TRACE)
    std::size_t num_blocks{0};
    std::size_t max_block{0};
    std::size_t free_mem{0};
    std::for_each(stream_free_blocks_.cbegin(),
                  stream_free_blocks_.cend(),
                  [this, &num_blocks, &max_block, &free_mem](auto const& freelist) {
                    num_blocks += freelist.second.size();
                    auto summary = this->underlying().free_list_summary(freelist.second);
                    max_block    = std::max(summary.first, max_block);
                    free_mem += summary.second;
                  });
    RMM_LOG_TRACE("[Summary][Free lists: {}][Blocks: {}][Max Block: {}][Total Free: {}]",
                  stream_free_blocks_.size(),
                  num_blocks,
                  max_block,
                  free_mem);
#endif
  }

  // Map of stream_event_pair -> free_list. The event (or associated stream) must be synchronized
  // before allocating from the associated free list to a different stream.
  std::map<stream_event_pair, free_list> stream_free_blocks_;

  // Bidirectional mapping between non-default streams and events
  std::unordered_map<cudaStream_t, stream_event_pair> stream_events_;

  // Shared pointers to events keep the events alive as long as either the thread that created
  // them or the MR that is using them exists.
  std::set<std::shared_ptr<event_wrapper>> default_stream_events;

  std::mutex mtx_;  // mutex for thread-safe access
};  // class stream_ordered_memory_resource

}  // namespace rmm::mr::detail
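
// Illustrative usage (a sketch, not part of this header): RMM's pool_memory_resource
// derives from stream_ordered_memory_resource, so its allocations and deallocations
// are ordered by the stream passed to them.
//
//   #include <rmm/cuda_stream.hpp>
//   #include <rmm/mr/device/cuda_memory_resource.hpp>
//   #include <rmm/mr/device/pool_memory_resource.hpp>
//
//   rmm::mr::cuda_memory_resource upstream;
//   rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource> pool{&upstream};
//
//   rmm::cuda_stream stream;
//   void* ptr = pool.allocate(1024, stream.view());  // suballocated, ordered on `stream`
//   pool.deallocate(ptr, 1024, stream.view());       // block reusable on `stream` without sync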