All Classes Files Functions Enumerations Enumerator Pages
parallel_operation.hpp
1 /*
2  * Copyright (c) 2021-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
#include <cassert>
#include <exception>
#include <future>
#include <numeric>
#include <stdexcept>
#include <system_error>
#include <utility>
#include <vector>
24 
25 #include <kvikio/defaults.hpp>
26 #include <kvikio/error.hpp>
27 #include <kvikio/utils.hpp>
28 
29 namespace kvikio {
30 
31 namespace detail {
32 
33 template <typename F, typename T>
34 std::future<std::size_t> submit_task(
35  F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
36 {
37  return defaults::thread_pool().submit_task(
38  [=] { return op(buf, size, file_offset, devPtr_offset); });
39 }
40 
41 } // namespace detail
42 
55 template <typename F, typename T>
56 std::future<std::size_t> parallel_io(F op,
57  T buf,
58  std::size_t size,
59  std::size_t file_offset,
60  std::size_t task_size,
61  std::size_t devPtr_offset)
62 {
63  if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); }
64 
65  // Single-task guard
66  if (task_size >= size || page_size >= size) {
67  return detail::submit_task(op, buf, size, file_offset, devPtr_offset);
68  }
69 
70  // We know an upper bound of the total number of tasks
71  std::vector<std::future<std::size_t>> tasks;
72  tasks.reserve(size / task_size + 2);
73 
74  // 1) Submit `task_size` sized tasks
75  while (size >= task_size) {
76  tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
77  file_offset += task_size;
78  devPtr_offset += task_size;
79  size -= task_size;
80  }
81 
82  // 2) Submit a task for the remainder
83  if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); }
84 
85  // Finally, we sum the result of all tasks.
86  auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
87  std::size_t ret = 0;
88  for (auto& task : tasks) {
89  ret += task.get();
90  }
91  return ret;
92  };
93  return std::async(std::launch::deferred, gather_tasks, std::move(tasks));
94 }
95 
96 } // namespace kvikio
static BS::thread_pool & thread_pool()
Get the default thread pool.