21 #include <system_error>
26 #include <kvikio/error.hpp>
27 #include <kvikio/utils.hpp>
/**
 * Package a single invocation of `op` as a deferred call and hand back a future for its result.
 *
 * Template parameters:
 *   F - callable type; invoked below as `op(buf, size, file_offset, devPtr_offset)`.
 *   T - type of `buf`; passed through to `op` untouched.
 *
 * Parameters:
 *   op            - the operation to run.
 *   buf           - first argument forwarded to `op`.
 *   size          - second argument forwarded to `op`.
 *   file_offset   - third argument forwarded to `op`.
 *   devPtr_offset - fourth argument forwarded to `op`.
 *
 * Returns a `std::future<std::size_t>` (per the declared return type).
 *
 * NOTE(review): the statement that receives the lambda below (listing lines 36-37) and the
 * function's braces are not visible in this excerpt. Presumably the lambda is submitted to the
 * library's thread pool - confirm against the full file.
 */
33 template <
typename F,
typename T>
34 std::future<std::size_t> submit_task(
35 F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
// `[=]` copies `op`, `buf` and all three integers into the closure, so the closure keeps
// its own copies and can be invoked with no arguments.
38 [=] {
return op(buf, size, file_offset, devPtr_offset); });
/**
 * Split one operation over `size` into `task_size`-sized pieces, submit each piece through
 * `detail::submit_task`, and return a future that combines the per-piece futures.
 *
 * Template parameters:
 *   F - callable type of `op`; forwarded to `detail::submit_task`.
 *   T - type of `buf`; forwarded to `detail::submit_task`.
 *
 * Parameters visible in this excerpt:
 *   op            - the operation applied to every piece.
 *   file_offset   - starting offset; advanced by `task_size` after each full-size piece.
 *   task_size     - size of each full piece; must be non-zero.
 *   devPtr_offset - second starting offset; advanced in lock-step with `file_offset`.
 *
 * NOTE(review): `buf` and `size` are used below but their declarations (listing lines 57-58)
 * are not visible here; presumably they are the second and third parameters - confirm.
 * `page_size` is also referenced without a visible declaration.
 *
 * Returns a `std::future<std::size_t>`.
 * Throws `std::invalid_argument` when `task_size` is zero.
 */
55 template <
typename F,
typename T>
56 std::future<std::size_t> parallel_io(F op,
59 std::size_t file_offset,
60 std::size_t task_size,
61 std::size_t devPtr_offset)
// Reject a zero `task_size` up front: it is used as a divisor and as the loop step below.
63 if (task_size == 0) {
throw std::invalid_argument(
"`task_size` cannot be zero"); }
// Single-task shortcut: when the whole request fits in one piece (or is no larger than
// `page_size`), submit it as one task and return that task's future directly.
66 if (task_size >= size || page_size >= size) {
67 return detail::submit_task(op, buf, size, file_offset, devPtr_offset);
// Pre-size the future list: `size / task_size` full pieces plus slack that covers the
// optional remainder piece submitted after the loop.
71 std::vector<std::future<std::size_t>> tasks;
72 tasks.reserve(size / task_size + 2);
// Submit one `task_size`-sized piece per iteration, stepping both offsets forward.
// NOTE(review): no statement that shrinks `size` is visible in this excerpt (listing
// lines 79-82 are missing); the loop needs one to terminate - confirm against the full file.
75 while (size >= task_size) {
76 tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
77 file_offset += task_size;
78 devPtr_offset += task_size;
// Whatever is left after the full-size pieces goes into one final, smaller task.
83 if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); }
// `gather_tasks` takes ownership of the futures (rvalue reference) and walks over them to
// produce a single `std::size_t`.
// NOTE(review): the loop body and the lambda's return (listing lines 87, 89-92) are not
// visible; presumably it accumulates each `task.get()` - confirm.
86 auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
88 for (
auto& task : tasks) {
// `std::launch::deferred`: `gather_tasks` does not run here; it runs on the thread that first
// waits on (or calls `get()` on) the returned future. The futures are moved into that call.
93 return std::async(std::launch::deferred, gather_tasks, std::move(tasks));
static BS::thread_pool & thread_pool()
Get the default thread pool.