parallel_operation.hpp
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION.
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #pragma once
6 
7 #include <atomic>
8 #include <cassert>
9 #include <future>
10 #include <memory>
11 #include <numeric>
12 #include <system_error>
13 #include <type_traits>
14 #include <utility>
15 #include <vector>
16 
17 #include <kvikio/defaults.hpp>
18 #include <kvikio/detail/nvtx.hpp>
19 #include <kvikio/error.hpp>
20 #include <kvikio/threadpool_wrapper.hpp>
21 #include <kvikio/utils.hpp>
22 
23 namespace kvikio {
24 
25 namespace detail {
26 
/**
 * @brief Wrap a callable (possibly move-only) in a copyable closure.
 *
 * The callable is moved into heap storage owned by a `std::shared_ptr`. The returned closure
 * captures that pointer by value, so every copy of the closure refers to the same underlying
 * callable object.
 *
 * @tparam F Type of the callable.
 * @param op Callable to wrap; it is moved from.
 * @return A copyable closure that perfectly forwards its arguments to the stored callable and
 * returns whatever the callable returns (reference-ness preserved).
 */
template <typename F>
auto make_copyable_lambda(F op)
{
  // Relocate the callable to shared heap storage; the shared pointer controls its lifetime.
  auto shared_op = std::make_shared<F>(std::move(op));

  // The closure itself only holds the (copyable) shared pointer and acts as a stand-in for the
  // stored callable.
  return [shared_op](auto&&... call_args) -> decltype(auto) {
    return (*shared_op)(std::forward<decltype(call_args)>(call_args)...);
  };
}
49 
60 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
61 {
62  static std::atomic_uint64_t call_counter{1ull};
63  auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
64  auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
65  return {nvtx_color, call_idx};
66 }
67 
73 template <typename F, typename T>
74 std::future<std::size_t> submit_task(F op,
75  T buf,
76  std::size_t size,
77  std::size_t file_offset,
78  std::size_t devPtr_offset,
79  ThreadPool* thread_pool = &defaults::thread_pool(),
80  std::uint64_t nvtx_payload = 0ull,
81  nvtx_color_type nvtx_color = NvtxManager::default_color())
82 {
83  static_assert(std::is_invocable_r_v<std::size_t,
84  decltype(op),
85  decltype(buf),
86  decltype(size),
87  decltype(file_offset),
88  decltype(devPtr_offset)>);
89 
90  return thread_pool->submit_task([=] {
91  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
92  return op(buf, size, file_offset, devPtr_offset);
93  });
94 }
95 
103 template <typename F>
104 std::future<std::size_t> submit_move_only_task(
105  F op_move_only,
106  ThreadPool* thread_pool = &defaults::thread_pool(),
107  std::uint64_t nvtx_payload = 0ull,
108  nvtx_color_type nvtx_color = NvtxManager::default_color())
109 {
110  static_assert(std::is_invocable_r_v<std::size_t, F>);
111  auto op_copyable = make_copyable_lambda(std::move(op_move_only));
112  return thread_pool->submit_task([=] {
113  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
114  return op_copyable();
115  });
116 }
117 
118 } // namespace detail
119 
136 template <typename F, typename T>
137 std::future<std::size_t> parallel_io(F op,
138  T buf,
139  std::size_t size,
140  std::size_t file_offset,
141  std::size_t task_size,
142  std::size_t devPtr_offset,
143  ThreadPool* thread_pool = &defaults::thread_pool(),
144  std::uint64_t call_idx = 0,
145  nvtx_color_type nvtx_color = NvtxManager::default_color())
146 {
147  KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
148  KVIKIO_EXPECT(thread_pool != nullptr, "The thread pool must not be nullptr");
149  static_assert(std::is_invocable_r_v<std::size_t,
150  decltype(op),
151  decltype(buf),
152  decltype(size),
153  decltype(file_offset),
154  decltype(devPtr_offset)>);
155 
156  // Single-task guard
157  if (task_size >= size || get_page_size() >= size) {
158  return detail::submit_task(
159  op, buf, size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color);
160  }
161 
162  std::vector<std::future<std::size_t>> tasks;
163  tasks.reserve(size / task_size);
164 
165  // 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
166  while (size > task_size) {
167  tasks.push_back(detail::submit_task(
168  op, buf, task_size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color));
169  file_offset += task_size;
170  devPtr_offset += task_size;
171  size -= task_size;
172  }
173 
174  // 2) Submit the last task, which consists of performing the last I/O and waiting the previous
175  // tasks.
176  auto last_task = [=, tasks = std::move(tasks)]() mutable -> std::size_t {
177  auto ret = op(buf, size, file_offset, devPtr_offset);
178  for (auto& task : tasks) {
179  ret += task.get();
180  }
181  return ret;
182  };
183  return detail::submit_move_only_task(std::move(last_task), thread_pool, call_idx, nvtx_color);
184 }
185 
186 } // namespace kvikio
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette whose size n is a power of 2....
static const nvtx_color_type & default_color() noexcept
Return the default color.
static ThreadPool & thread_pool()
Get the default thread pool.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
Definition: error.hpp:205
KvikIO namespace.
Definition: batch.hpp:16
BS::thread_pool ThreadPool
Thread pool type used for parallel I/O operations.
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, ThreadPool *thread_pool=&defaults::thread_pool(), std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.