23 #include <system_error>
24 #include <type_traits>
28 #include <kvikio/defaults.hpp>
29 #include <kvikio/error.hpp>
30 #include <kvikio/nvtx.hpp>
31 #include <kvikio/utils.hpp>
// Wraps a callable in a copyable lambda.
// `op` is moved into an object owned by a std::shared_ptr, and the lambda captures
// that pointer by value, so every copy of the lambda refers to the same stored callable.
// NOTE(review): this listing omits some source lines (the template header and the
// function braces are not shown); presumably the lambda below is the return value -- confirm.
50 auto make_copyable_lambda(F op)
53 auto sp = std::make_shared<F>(std::move(op));
// The lambda forwards its arguments unchanged to the stored callable; `decltype(auto)`
// keeps the callable's exact return type (including reference-ness).
57 [sp](
auto&&... args) -> decltype(
auto) {
return (*sp)(std::forward<decltype(args)>(args)...); };
// Returns a pair holding a reference to an NVTX color and a call index.
// The call index is taken from a function-local static atomic counter that starts at 1
// and is incremented by one on every call, so the first call observes 1.
70 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
72 static std::atomic_uint64_t call_counter{1ull};
// fetch_add returns the value held before the increment; relaxed ordering requests
// atomicity of the counter only, with no ordering of surrounding memory operations.
73 auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
// NOTE(review): `nvtx_color` is defined on a line not shown in this listing; presumably
// it is derived from `call_idx` -- confirm against the full source.
75 return {nvtx_color, call_idx};
// Submits a single I/O task and returns a std::future for the std::size_t it produces.
// The task body opens an NVTX range named "task" and returns
// `op(buf, size, file_offset, devPtr_offset)`.
// NOTE(review): several lines are missing from this listing (some parameters, including
// `buf`, `size` and `nvtx_color`, and the code that creates the future); presumably the
// work is handed to a thread pool -- confirm against the full source.
83 template <
typename F,
typename T>
84 std::future<std::size_t> submit_task(F op,
87 std::size_t file_offset,
88 std::size_t devPtr_offset,
89 std::uint64_t nvtx_payload = 0ull,
// Compile-time check that the callable can be invoked with the listed argument types
// and that its result is convertible to std::size_t.
92 static_assert(std::is_invocable_r_v<std::size_t,
96 decltype(file_offset),
97 decltype(devPtr_offset)>);
// NOTE(review): presumably a profiling annotation scoped to the task body, tagged with
// the given payload and color -- the macro is defined outside this listing.
100 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
101 return op(buf, size, file_offset, devPtr_offset);
// Submits a task whose callable takes no arguments and yields a std::size_t; returns a
// std::future for that result. The parameter name `op_move_only` suggests the callable
// is not required to be copyable.
// NOTE(review): the parameter declaring `op_move_only`, the `nvtx_color` parameter and
// the code that creates the future are on lines not shown in this listing.
112 template <
typename F>
113 std::future<std::size_t> submit_move_only_task(
115 std::uint64_t nvtx_payload = 0ull,
// Compile-time check: F must be invocable with no arguments, result convertible to std::size_t.
118 static_assert(std::is_invocable_r_v<std::size_t, F>);
// Re-wrap the callable via make_copyable_lambda so that the wrapper itself can be copied.
119 auto op_copyable = make_copyable_lambda(std::move(op_move_only));
// NOTE(review): presumably a profiling annotation scoped to the task body -- the macro
// is defined outside this listing.
121 KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
122 return op_copyable();
// Applies a read or write operation in parallel by splitting the request into tasks of
// `task_size` bytes and returning a future for the combined std::size_t result.
// Throws std::invalid_argument (via KVIKIO_EXPECT) when `task_size` is zero.
// NOTE(review): this listing omits several source lines (the function name line, the
// `op`, `buf`, `size` and `nvtx_color` parameters, braces, and some statements); the
// notes below flag the places where the visible code is incomplete.
140 template <
typename F,
typename T>
144 std::size_t file_offset,
145 std::size_t task_size,
146 std::size_t devPtr_offset,
147 std::uint64_t call_idx = 0,
// Reject a zero task size up front; it is used as a divisor and as the loop step below.
150 KVIKIO_EXPECT(task_size > 0,
"`task_size` must be positive", std::invalid_argument);
// Compile-time check that the callable accepts the listed argument types and yields a
// result convertible to std::size_t.
151 static_assert(std::is_invocable_r_v<std::size_t,
155 decltype(file_offset),
156 decltype(devPtr_offset)>);
// Single-task shortcut: the whole request fits in one task, or is no larger than
// `page_size` (defined outside this listing), so no splitting is done.
159 if (task_size >= size || page_size >= size) {
160 return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
// Pre-size the container of futures for the full-sized tasks.
163 std::vector<std::future<std::size_t>> tasks;
164 tasks.reserve(size / task_size);
// Submit `task_size`-sized tasks while more than one task's worth of data remains,
// advancing both the file offset and the buffer offset after each submission.
167 while (size > task_size) {
169 detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
170 file_offset += task_size;
171 devPtr_offset += task_size;
// NOTE(review): no update of `size` is visible in this listing; presumably the loop
// also shrinks `size` by `task_size` on a line that is not shown -- confirm.
// The final task runs the operation on whatever remains and then walks the earlier
// futures; `tasks` is moved into the closure because std::future is not copyable.
177 auto last_task = [=, tasks = std::move(tasks)]()
mutable -> std::size_t {
178 auto ret = op(buf, size, file_offset, devPtr_offset);
179 for (
auto& task : tasks) {
// NOTE(review): the loop body and the lambda's return are not shown; presumably each
// task's result is added to `ret` -- confirm.
// The closure is move-only, so it is submitted through submit_move_only_task.
184 return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette, whose size n is a power of 2...
static const nvtx_color_type & default_color() noexcept
Return the default color.
static BS_thread_pool & thread_pool()
Get the default thread pool.
#define KVIKIO_EXPECT(...)
Macro for checking preconditions or other conditions; it throws an exception when a condition is violated...
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.