parallel_operation.hpp
/*
 * Copyright (c) 2021-2025, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <atomic>
#include <cassert>
#include <future>
#include <memory>
#include <numeric>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/nvtx.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

namespace detail {

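/**
 * @brief Wrap a move-only callable in a copyable lambda.
 *
 * The callable is moved onto the heap and owned by a `std::shared_ptr`, so the returned proxy
 * lambda can be copied (for example by the thread pool) while still invoking the original callable.
 */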
template <typename F>
auto make_copyable_lambda(F op)
{
  // Create the callable on the heap by moving from op. Use a shared pointer to manage its
  // lifetime.
  auto sp = std::make_shared<F>(std::move(op));

  // Use the copyable closure as the proxy of the move-only callable.
  return [sp](auto&&... args) -> decltype(auto) {
    return (*sp)(std::forward<decltype(args)>(args)...);
  };
}

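/**
 * @brief Return the NVTX color and call index to use for the next I/O call.
 *
 * The call index comes from a global atomic counter, and the color is picked from the NVTX
 * color palette using that index.
 */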
inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
{
  static std::atomic_uint64_t call_counter{1ull};
  auto call_idx    = call_counter.fetch_add(1ull, std::memory_order_relaxed);
  auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
  return {nvtx_color, call_idx};
}

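/**
 * @brief Submit an I/O operation `op(buf, size, file_offset, devPtr_offset)` to the default
 * thread pool, wrapping it in an NVTX range annotated with `nvtx_payload` and `nvtx_color`.
 */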
template <typename F, typename T>
std::future<std::size_t> submit_task(F op,
                                     T buf,
                                     std::size_t size,
                                     std::size_t file_offset,
                                     std::size_t devPtr_offset,
                                     std::uint64_t nvtx_payload = 0ull,
                                     nvtx_color_type nvtx_color = NvtxManager::default_color())
{
  static_assert(std::is_invocable_r_v<std::size_t,
                                      decltype(op),
                                      decltype(buf),
                                      decltype(size),
                                      decltype(file_offset),
                                      decltype(devPtr_offset)>);

  return defaults::thread_pool().submit_task([=] {
    KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
    return op(buf, size, file_offset, devPtr_offset);
  });
}

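/**
 * @brief Submit a move-only, zero-argument callable to the default thread pool.
 *
 * The callable is made copyable via `make_copyable_lambda` before submission.
 */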
template <typename F>
std::future<std::size_t> submit_move_only_task(
  F op_move_only,
  std::uint64_t nvtx_payload = 0ull,
  nvtx_color_type nvtx_color = NvtxManager::default_color())
{
  static_assert(std::is_invocable_r_v<std::size_t, F>);
  auto op_copyable = make_copyable_lambda(std::move(op_move_only));
  return defaults::thread_pool().submit_task([=] {
    KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
    return op_copyable();
  });
}

}  // namespace detail

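/**
 * @brief Apply a read or write operation in parallel.
 *
 * The transfer is split into chunks of at most `task_size` bytes, each submitted to the default
 * thread pool. The returned future resolves to the sum of the byte counts returned by `op` for
 * all chunks.
 */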
template <typename F, typename T>
std::future<std::size_t> parallel_io(F op,
                                     T buf,
                                     std::size_t size,
                                     std::size_t file_offset,
                                     std::size_t task_size,
                                     std::size_t devPtr_offset,
                                     std::uint64_t call_idx     = 0,
                                     nvtx_color_type nvtx_color = NvtxManager::default_color())
{
  KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
  static_assert(std::is_invocable_r_v<std::size_t,
                                      decltype(op),
                                      decltype(buf),
                                      decltype(size),
                                      decltype(file_offset),
                                      decltype(devPtr_offset)>);

  // Single-task guard
  if (task_size >= size || page_size >= size) {
    return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
  }

  std::vector<std::future<std::size_t>> tasks;
  tasks.reserve(size / task_size);

  // 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
  while (size > task_size) {
    tasks.push_back(
      detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
    file_offset += task_size;
    devPtr_offset += task_size;
    size -= task_size;
  }

  // 2) Submit the last task, which performs the last I/O and waits on all the previous tasks.
  auto last_task = [=, tasks = std::move(tasks)]() mutable -> std::size_t {
    auto ret = op(buf, size, file_offset, devPtr_offset);
    for (auto& task : tasks) {
      ret += task.get();
    }
    return ret;
  };
  return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
}

}  // namespace kvikio
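Usage sketch (illustrative only, not part of the header): any operation matching the
op(buf, size, file_offset, devPtr_offset) signature and returning the number of bytes handled can
be passed to kvikio::parallel_io. The hypothetical POSIX pread wrapper below shows one way to do
this for a host buffer; the helper name, file descriptor, and task size are assumptions chosen for
the example.

// Illustrative example (assumes a valid file descriptor `fd`; error handling kept minimal).
#include <unistd.h>

#include <cstddef>

#include <kvikio/parallel_operation.hpp>

std::size_t parallel_read_example(int fd, void* buf, std::size_t nbytes, std::size_t file_offset)
{
  // The operation must return the number of bytes it handled.
  auto op = [fd](void* dst, std::size_t size, std::size_t off, std::size_t dst_offset) -> std::size_t {
    auto nread = ::pread(fd, static_cast<char*>(dst) + dst_offset, size, static_cast<off_t>(off));
    return nread < 0 ? std::size_t{0} : static_cast<std::size_t>(nread);
  };
  // Split the read into 4 MiB tasks; the future resolves to the total byte count.
  auto fut = kvikio::parallel_io(op, buf, nbytes, file_offset, /*task_size=*/4 << 20, /*devPtr_offset=*/0);
  return fut.get();
}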