12 #include <system_error>
13 #include <type_traits>
17 #include <kvikio/defaults.hpp>
18 #include <kvikio/detail/nvtx.hpp>
19 #include <kvikio/error.hpp>
20 #include <kvikio/utils.hpp>
// Wraps the callable `op` in a closure that can be copied.
// `op` is moved into an object owned by a std::shared_ptr, and the lambda below
// captures that shared_ptr by value, so every copy of the lambda refers to the
// same stored callable rather than copying the callable itself.
// NOTE(review): the template header, the braces and the `return` keyword are not
// visible in this excerpt - presumably `template <typename F>` and the lambda is
// the return value; confirm against the full file.
39 auto make_copyable_lambda(F op)
// Move `op` into shared storage; only a move of `F` is performed here.
42 auto sp = std::make_shared<F>(std::move(op));
// Generic lambda: perfect-forwards all of its arguments to the stored callable.
// `decltype(auto)` preserves the callable's exact return type (including references).
46 [sp](
auto&&... args) -> decltype(
auto) {
return (*sp)(std::forward<decltype(args)>(args)...); };
// Returns a pair holding a reference to an NVTX color and a call index taken from
// a function-local counter. Declared `noexcept`.
59 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
// Atomic counter with static storage duration, initialized to 1.
61 static std::atomic_uint64_t call_counter{1ull};
// `fetch_add` yields the value held before the increment, so the first call gets 1.
// Relaxed ordering: only the atomicity of the increment is requested.
62 auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
// NOTE(review): the definition of `nvtx_color` is not visible in this excerpt -
// presumably it is selected from the color palette using `call_idx`; confirm.
64 return {nvtx_color, call_idx};
// Submits a single I/O operation `op` and returns a std::future for the
// std::size_t result of `op(buf, size, file_offset, devPtr_offset)`.
//
// Visible parameters:
//   op            - callable to run; checked below to be invocable with a result
//                   convertible to std::size_t.
//   file_offset   - passed through to `op` as its third argument.
//   devPtr_offset - passed through to `op` as its fourth argument.
//   nvtx_payload  - payload passed to the NVTX scoped range (default 0).
// NOTE(review): the declarations of `buf`, `size` and `nvtx_color`, as well as the
// call that actually schedules the work, are not visible in this excerpt -
// presumably the two statements at the end run inside a closure handed to the
// default thread pool; confirm against the full file.
72 template <
typename F,
typename T>
73 std::future<std::size_t> submit_task(F op,
76 std::size_t file_offset,
77 std::size_t devPtr_offset,
78 std::uint64_t nvtx_payload = 0ull,
// Compile-time check of the callable's signature (the argument list is only
// partially visible here).
81 static_assert(std::is_invocable_r_v<std::size_t,
85 decltype(file_offset),
86 decltype(devPtr_offset)>);
// Annotate the work with an NVTX range named "task", then perform the operation.
89 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
90 return op(buf, size, file_offset, devPtr_offset);
// Submits a nullary callable and returns a std::future for its std::size_t result.
// The callable is first wrapped with `make_copyable_lambda`, so the object that is
// invoked (`op_copyable`) is a copyable closure.
//
// Visible parameters:
//   nvtx_payload - payload passed to the NVTX scoped range (default 0).
// NOTE(review): the declarations of `op_move_only` and `nvtx_color`, and the call
// that schedules the work, are not visible in this excerpt - presumably the last
// two statements run inside a closure handed to the default thread pool; confirm.
101 template <
typename F>
102 std::future<std::size_t> submit_move_only_task(
104 std::uint64_t nvtx_payload = 0ull,
// `F` must be invocable with no arguments, with a result convertible to std::size_t.
107 static_assert(std::is_invocable_r_v<std::size_t, F>);
// Move the callable into a copyable wrapper.
108 auto op_copyable = make_copyable_lambda(std::move(op_move_only));
// Annotate the work with an NVTX range named "task", then invoke the wrapped callable.
110 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
111 return op_copyable();
// Splits one I/O request into `task_size`-sized pieces, submits them as separate
// tasks, and returns a future produced by submitting a final task.
// NOTE(review): the line carrying the function name and the `op`, `buf`, `size`
// and `nvtx_color` parameters is not visible in this excerpt; the documentation
// index at the end of the file lists this signature as `parallel_io`.
//
// Visible parameters:
//   file_offset   - starting file offset; advanced by `task_size` per submitted piece.
//   task_size     - size of each piece; must be greater than zero.
//   devPtr_offset - starting device-pointer offset; advanced by `task_size` per piece.
//   call_idx      - forwarded to the submit helpers as their NVTX payload (default 0).
129 template <
typename F,
typename T>
133 std::size_t file_offset,
134 std::size_t task_size,
135 std::size_t devPtr_offset,
136 std::uint64_t call_idx = 0,
// Precondition check: a zero `task_size` is reported via std::invalid_argument.
139 KVIKIO_EXPECT(task_size > 0,
"`task_size` must be positive", std::invalid_argument);
// Compile-time check of the callable's signature (argument list only partially visible).
140 static_assert(std::is_invocable_r_v<std::size_t,
144 decltype(file_offset),
145 decltype(devPtr_offset)>);
// Single-task shortcut: taken when the whole request fits in one task, or when it
// is no larger than the value returned by get_page_size().
148 if (task_size >= size || get_page_size() >= size) {
149 return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
// Futures of all pieces except the last; capacity is reserved up front.
152 std::vector<std::future<std::size_t>> tasks;
153 tasks.reserve(size / task_size);
// Submit full `task_size` pieces while more than one piece's worth remains.
// NOTE(review): the statement that stores each future in `tasks` and the update of
// `size` are not visible in this excerpt - presumably `tasks.push_back(` precedes
// the call below and `size` is reduced by `task_size` per iteration; confirm.
156 while (size > task_size) {
158 detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
159 file_offset += task_size;
160 devPtr_offset += task_size;
// Final task: performs the remaining I/O itself and then visits the earlier futures.
// `tasks` is moved into the closure (std::future is not copyable), which makes the
// closure move-only; `mutable` allows non-const access to the captured futures.
// NOTE(review): the body of the loop over `tasks` and the closure's return
// statement are not visible - presumably each future's result is added to `ret`.
166 auto last_task = [=, tasks = std::move(tasks)]()
mutable -> std::size_t {
167 auto ret = op(buf, size, file_offset, devPtr_offset);
168 for (
auto& task : tasks) {
// The closure is move-only, so it is submitted through the move-only helper.
173 return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette whose size n is a power of 2....
static const nvtx_color_type & default_color() noexcept
Return the default color.
static BS_thread_pool & thread_pool()
Get the default thread pool.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.