12 #include <system_error>
13 #include <type_traits>
17 #include <kvikio/defaults.hpp>
18 #include <kvikio/detail/nvtx.hpp>
19 #include <kvikio/error.hpp>
20 #include <kvikio/threadpool_wrapper.hpp>
21 #include <kvikio/utils.hpp>
/**
 * @brief Wrap a (possibly move-only) callable in a copyable lambda.
 *
 * The callable is moved exactly once into storage owned by a `std::shared_ptr`. The returned
 * lambda captures that pointer by value, so the lambda itself is copyable even when `F` is not.
 * All copies of the returned lambda refer to the same underlying callable object; any state the
 * callable holds is therefore shared between copies rather than duplicated.
 *
 * @tparam F Type of the callable to wrap.
 * @param op The callable. It is taken by value and moved from.
 * @return A copyable generic lambda that perfectly forwards its arguments to the wrapped callable
 * and returns exactly what the callable returns (`decltype(auto)` keeps reference-ness).
 */
template <typename F>
auto make_copyable_lambda(F op)
{
  // Move from `op` only once; afterwards only the shared pointer is ever copied.
  auto sp = std::make_shared<F>(std::move(op));

  // The shared pointer is the sole capture, which is what makes the closure copyable.
  return [sp](auto&&... args) -> decltype(auto) {
    return (*sp)(std::forward<decltype(args)>(args)...);
  };
}
// Returns a pair of (reference to an NVTX color, call index) for the next call. Declared
// `noexcept`.
// NOTE(review): this listing skips some original lines (its embedded numbering jumps over 61
// and 64), so the braces and the definition of `nvtx_color` are not visible here.
60 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
// Function-local static, so the count persists across calls. It starts at 1.
62 static std::atomic_uint64_t call_counter{1ull};
// `fetch_add` yields the value held before the increment, so the first call observes index 1.
// Relaxed ordering is requested: the increment is atomic, with no ordering constraints on
// surrounding memory operations.
63 auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
// `nvtx_color` is defined on a line missing from this listing -- presumably a palette lookup
// keyed by `call_idx`; TODO confirm against the full file.
65 return {nvtx_color, call_idx};
// Submits one I/O operation to a thread pool and returns a future for its `std::size_t` result.
// NOTE(review): this listing omits several original lines (embedded numbers 75-76, 79, 81-82,
// 84-86, 89 and everything after 92). The declarations of `buf`, `size`, `thread_pool` and
// `nvtx_color`, the leading arguments of the `static_assert`, and the closing braces are
// therefore not visible; their exact types and defaults must be confirmed in the full file.
73 template <
typename F,
typename T>
74 std::future<std::size_t> submit_task(F op,
77 std::size_t file_offset,
78 std::size_t devPtr_offset,
80 std::uint64_t nvtx_payload = 0ull,
// Compile-time check that the callable can be invoked with the argument types listed here and
// that its result is convertible to `std::size_t`.
83 static_assert(std::is_invocable_r_v<std::size_t,
87 decltype(file_offset),
88 decltype(devPtr_offset)>);
// `[=]` copies everything the closure uses (`op`, `buf`, `size`, both offsets, the NVTX payload
// and color) into the closure, so the task does not refer back to this function's locals.
90 return thread_pool->submit_task([=] {
// Annotate the task with an NVTX range named "task", using the given payload and color.
91 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
// The task's result is whatever `op` returns for this (buffer, size, offsets) tuple.
92 return op(buf, size, file_offset, devPtr_offset);
// Submits a nullary callable to a thread pool and returns a future for its `std::size_t`
// result. The callable is first wrapped by `make_copyable_lambda`, and the wrapper (not the
// original callable) is what the submitted closure captures.
// NOTE(review): this listing omits several original lines (embedded numbers 105-106, 108-109
// and everything after 114), so the declarations of `op_move_only`, `thread_pool` and
// `nvtx_color` and the closing braces are not visible here; confirm them in the full file.
103 template <
typename F>
104 std::future<std::size_t> submit_move_only_task(
107 std::uint64_t nvtx_payload = 0ull,
// The callable must be invocable with no arguments and yield something convertible to
// `std::size_t`.
110 static_assert(std::is_invocable_r_v<std::size_t, F>);
// Move the callable into the wrapper once; the `[=]` capture below then copies the wrapper.
111 auto op_copyable = make_copyable_lambda(std::move(op_move_only));
112 return thread_pool->submit_task([=] {
// Annotate the task with an NVTX range named "task", using the given payload and color.
113 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
114 return op_copyable();
// Splits one read/write request of `size` bytes into `task_size`-sized pieces, submits them to
// a thread pool, and returns a single future for the overall `std::size_t` result.
// NOTE(review): this listing omits several original lines (embedded numbers 137-139, 143,
// 145-146, 150-152, 155-156, 160-161, 164-165, 171-175, 179-182 and 184). The head of the
// signature, the end of the `while` loop, the body of the final `for` loop and the closing
// braces are therefore not visible here and must be checked in the full file.
136 template <
typename F,
typename T>
140 std::size_t file_offset,
141 std::size_t task_size,
142 std::size_t devPtr_offset,
144 std::uint64_t call_idx = 0,
// Argument validation: a non-positive `task_size` is reported as `std::invalid_argument`.
147 KVIKIO_EXPECT(task_size > 0,
"`task_size` must be positive", std::invalid_argument);
// No exception type is passed here, so whatever `KVIKIO_EXPECT` uses by default applies.
148 KVIKIO_EXPECT(thread_pool !=
nullptr,
"The thread pool must not be nullptr");
// Compile-time check on the callable's signature (leading arguments are on omitted lines).
149 static_assert(std::is_invocable_r_v<std::size_t,
153 decltype(file_offset),
154 decltype(devPtr_offset)>);
// Single-task path: taken when the whole request is no larger than one task, or no larger
// than the value returned by `get_page_size()`.
157 if (task_size >= size || get_page_size() >= size) {
158 return detail::submit_task(
159 op, buf, size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color);
// Multi-task path: collect the futures of all tasks except the last one.
162 std::vector<std::future<std::size_t>> tasks;
// Reserve one slot per whole `task_size` chunk in the request.
163 tasks.reserve(size / task_size);
// Submit `task_size`-sized tasks while more than one task's worth remains, advancing both
// offsets after each submission.
// NOTE(review): the loop tail (listing lines 171-172) is not visible -- presumably `size` is
// reduced by `task_size` there, otherwise the loop could not terminate; TODO confirm.
166 while (size > task_size) {
167 tasks.push_back(detail::submit_task(
168 op, buf, task_size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color));
169 file_offset += task_size;
170 devPtr_offset += task_size;
// The final task takes ownership of the earlier futures by moving `tasks` into its closure.
// `std::future` is move-only, so this closure cannot be copied.
176 auto last_task = [=, tasks = std::move(tasks)]()
mutable -> std::size_t {
// Perform the remaining piece of the request itself, using the current `size` and offsets.
177 auto ret = op(buf, size, file_offset, devPtr_offset);
// Visit every previously submitted task. The loop body is on omitted lines -- presumably it
// waits on each future and accumulates its result into `ret`; TODO confirm.
178 for (
auto& task : tasks) {
// Submitted through the move-only path because `last_task` is not copyable (see above).
183 return detail::submit_move_only_task(std::move(last_task), thread_pool, call_idx, nvtx_color);
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette whose size n is a power of 2....
static const nvtx_color_type & default_color() noexcept
Return the default color.
static ThreadPool & thread_pool()
Get the default thread pool.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
BS::thread_pool ThreadPool
Thread pool type used for parallel I/O operations.
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, ThreadPool *thread_pool=&defaults::thread_pool(), std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.