23 #include <system_error> 
   24 #include <type_traits> 
   28 #include <kvikio/defaults.hpp> 
   29 #include <kvikio/error.hpp> 
   30 #include <kvikio/nvtx.hpp> 
   31 #include <kvikio/utils.hpp> 
   50 auto make_copyable_lambda(F op)
 
   53   auto sp = std::make_shared<F>(std::move(op));
 
   57     [sp](
auto&&... args) -> decltype(
auto) { 
return (*sp)(std::forward<decltype(args)>(args)...); };
 
   70 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
 
   72   static std::atomic_uint64_t call_counter{1ull};
 
   73   auto call_idx    = call_counter.fetch_add(1ull, std::memory_order_relaxed);
 
   75   return {nvtx_color, call_idx};
 
   83 template <
typename F, 
typename T>
 
   84 std::future<std::size_t> submit_task(F op,
 
   87                                      std::size_t file_offset,
 
   88                                      std::size_t devPtr_offset,
 
   89                                      std::uint64_t nvtx_payload = 0ull,
 
   92   static_assert(std::is_invocable_r_v<std::size_t,
 
   96                                       decltype(file_offset),
 
   97                                       decltype(devPtr_offset)>);
 
  100     KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
 
  101     return op(buf, size, file_offset, devPtr_offset);
 
  112 template <
typename F>
 
  113 std::future<std::size_t> submit_move_only_task(
 
  115   std::uint64_t nvtx_payload = 0ull,
 
  118   static_assert(std::is_invocable_r_v<std::size_t, F>);
 
  119   auto op_copyable = make_copyable_lambda(std::move(op_move_only));
 
  121     KVIKIO_NVTX_SCOPED_RANGE(
"task", nvtx_payload, nvtx_color);
 
  122     return op_copyable();
 
  140 template <
typename F, 
typename T>
 
  144                                      std::size_t file_offset,
 
  145                                      std::size_t task_size,
 
  146                                      std::size_t devPtr_offset,
 
  147                                      std::uint64_t call_idx     = 0,
 
  150   KVIKIO_EXPECT(task_size > 0, 
"`task_size` must be positive", std::invalid_argument);
 
  151   static_assert(std::is_invocable_r_v<std::size_t,
 
  155                                       decltype(file_offset),
 
  156                                       decltype(devPtr_offset)>);
 
  159   if (task_size >= size || get_page_size() >= size) {
 
  160     return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
 
  163   std::vector<std::future<std::size_t>> tasks;
 
  164   tasks.reserve(size / task_size);
 
  167   while (size > task_size) {
 
  169       detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
 
  170     file_offset += task_size;
 
  171     devPtr_offset += task_size;
 
  177   auto last_task = [=, tasks = std::move(tasks)]() 
mutable -> std::size_t {
 
  178     auto ret = op(buf, size, file_offset, devPtr_offset);
 
  179     for (
auto& task : tasks) {
 
  184   return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
 
static const nvtx_color_type & get_color_by_index(std::uint64_t idx) noexcept
Return the color at the given index from the internal color palette whose size n is a power of 2....
 
static const nvtx_color_type & default_color() noexcept
Return the default color.
 
static BS_thread_pool & thread_pool()
Get the default thread pool.
 
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
 
std::future< std::size_t > parallel_io(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t task_size, std::size_t devPtr_offset, std::uint64_t call_idx=0, nvtx_color_type nvtx_color=NvtxManager::default_color())
Apply read or write operation in parallel.