parallel_operation.hpp
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION.
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #pragma once
6 
#include <atomic>
#include <cassert>
#include <exception>
#include <future>
#include <memory>
#include <numeric>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>
16 
17 #include <kvikio/defaults.hpp>
18 #include <kvikio/detail/nvtx.hpp>
19 #include <kvikio/error.hpp>
20 #include <kvikio/utils.hpp>
21 
22 namespace kvikio {
23 
24 namespace detail {
25 
/**
 * @brief Wrap a (possibly move-only) callable in a copyable closure.
 *
 * The callable is moved into heap storage owned by a `std::shared_ptr`, and the returned closure
 * captures only that pointer. Copying the closure is therefore cheap. All copies invoke the very
 * same callable instance, including any mutable state it carries. No synchronization is added,
 * so invoking several copies concurrently calls that single instance concurrently.
 *
 * @tparam F Callable type; may be move-only.
 * @param op Callable to wrap; it is moved from.
 * @return Copyable closure that forwards its arguments to the wrapped callable and returns the
 * result unchanged (`decltype(auto)` keeps reference return types intact).
 */
template <typename F>
auto make_copyable_lambda(F op)
{
  std::shared_ptr<F> shared_callable = std::make_shared<F>(std::move(op));

  auto proxy = [shared_callable](auto&&... call_args) -> decltype(auto) {
    F& callable = *shared_callable;
    return callable(std::forward<decltype(call_args)>(call_args)...);
  };
  return proxy;
}
48 
59 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
60 {
61  static std::atomic_uint64_t call_counter{1ull};
62  auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
63  auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
64  return {nvtx_color, call_idx};
65 }
66 
72 template <typename F, typename T>
73 std::future<std::size_t> submit_task(F op,
74  T buf,
75  std::size_t size,
76  std::size_t file_offset,
77  std::size_t devPtr_offset,
78  std::uint64_t nvtx_payload = 0ull,
79  nvtx_color_type nvtx_color = NvtxManager::default_color())
80 {
81  static_assert(std::is_invocable_r_v<std::size_t,
82  decltype(op),
83  decltype(buf),
84  decltype(size),
85  decltype(file_offset),
86  decltype(devPtr_offset)>);
87 
88  return defaults::thread_pool().submit_task([=] {
89  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
90  return op(buf, size, file_offset, devPtr_offset);
91  });
92 }
93 
101 template <typename F>
102 std::future<std::size_t> submit_move_only_task(
103  F op_move_only,
104  std::uint64_t nvtx_payload = 0ull,
105  nvtx_color_type nvtx_color = NvtxManager::default_color())
106 {
107  static_assert(std::is_invocable_r_v<std::size_t, F>);
108  auto op_copyable = make_copyable_lambda(std::move(op_move_only));
109  return defaults::thread_pool().submit_task([=] {
110  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
111  return op_copyable();
112  });
113 }
114 
115 } // namespace detail
116 
129 template <typename F, typename T>
130 std::future<std::size_t> parallel_io(F op,
131  T buf,
132  std::size_t size,
133  std::size_t file_offset,
134  std::size_t task_size,
135  std::size_t devPtr_offset,
136  std::uint64_t call_idx = 0,
137  nvtx_color_type nvtx_color = NvtxManager::default_color())
138 {
139  KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
140  static_assert(std::is_invocable_r_v<std::size_t,
141  decltype(op),
142  decltype(buf),
143  decltype(size),
144  decltype(file_offset),
145  decltype(devPtr_offset)>);
146 
147  // Single-task guard
148  if (task_size >= size || get_page_size() >= size) {
149  return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
150  }
151 
152  std::vector<std::future<std::size_t>> tasks;
153  tasks.reserve(size / task_size);
154 
155  // 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
156  while (size > task_size) {
157  tasks.push_back(
158  detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
159  file_offset += task_size;
160  devPtr_offset += task_size;
161  size -= task_size;
162  }
163 
164  // 2) Submit the last task, which consists of performing the last I/O and waiting the previous
165  // tasks.
166  auto last_task = [=, tasks = std::move(tasks)]() mutable -> std::size_t {
167  auto ret = op(buf, size, file_offset, devPtr_offset);
168  for (auto& task : tasks) {
169  ret += task.get();
170  }
171  return ret;
172  };
173  return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
174 }
175 
176 } // namespace kvikio
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette whose size n is a power of 2....
static const nvtx_color_type & default_color() noexcept
Return the default color.
static BS_thread_pool & thread_pool()
Get the default thread pool.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
Definition: error.hpp:205
KvikIO namespace.
Definition: batch.hpp:16
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.