stream.hpp
1 /*
2  * Copyright (c) 2023-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <sys/types.h>
19 #include <cstdlib>
20 #include <iostream>
21 #include <kvikio/error.hpp>
22 #include <kvikio/shim/cuda.hpp>
23 #include <kvikio/shim/cufile.hpp>
24 #include <tuple>
25 #include <utility>
26 
27 namespace kvikio {
28 
48 class StreamFuture {
49  private:
50  struct ArgByVal {
51  std::size_t size;
52  off_t file_offset;
53  off_t devPtr_offset;
54  ssize_t bytes_done;
55  };
56 
57  void* _devPtr_base{nullptr};
58  CUstream _stream{nullptr};
59  ArgByVal* _val{nullptr};
60  bool _stream_synchronized{false};
61 
62  public:
63  StreamFuture() noexcept = default;
64 
66  void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
67  : _devPtr_base{devPtr_base}, _stream{stream}
68  {
69  // Notice, we allocate the arguments using malloc() as specified in the cuFile docs:
70  // <https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilewriteasync>
71  if ((_val = static_cast<ArgByVal*>(std::malloc(sizeof(ArgByVal)))) == nullptr) {
72  throw std::bad_alloc{};
73  }
74  *_val = {
75  .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0};
76  }
77 
81  StreamFuture(const StreamFuture&) = delete;
82  StreamFuture& operator=(StreamFuture& o) = delete;
83  StreamFuture(StreamFuture&& o) noexcept
84  : _devPtr_base{std::exchange(o._devPtr_base, nullptr)},
85  _stream{std::exchange(o._stream, nullptr)},
86  _val{std::exchange(o._val, nullptr)},
87  _stream_synchronized{o._stream_synchronized}
88  {
89  }
90  StreamFuture& operator=(StreamFuture&& o) noexcept
91  {
92  _devPtr_base = std::exchange(o._devPtr_base, nullptr);
93  _stream = std::exchange(o._stream, nullptr);
94  _val = std::exchange(o._val, nullptr);
95  _stream_synchronized = o._stream_synchronized;
96  return *this;
97  }
98 
105  std::tuple<void*, std::size_t*, off_t*, off_t*, ssize_t*, CUstream> get_args() const
106  {
107  if (_val == nullptr) {
108  throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture");
109  }
110  return {_devPtr_base,
111  &_val->size,
112  &_val->file_offset,
113  &_val->devPtr_offset,
114  &_val->bytes_done,
115  _stream};
116  }
117 
125  std::size_t check_bytes_done()
126  {
127  if (_val == nullptr) {
128  throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture");
129  }
130 
131  if (!_stream_synchronized) {
132  _stream_synchronized = true;
133  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
134  }
135 
136  CUFILE_CHECK_BYTES_DONE(_val->bytes_done);
137  // At this point, we know `_val->bytes_done` is a positive value otherwise
138  // CUFILE_CHECK_BYTES_DONE() would have raised an exception.
139  return static_cast<std::size_t>(_val->bytes_done);
140  }
141 
146  ~StreamFuture() noexcept
147  {
148  if (_val != nullptr) {
149  try {
151  } catch (const kvikio::CUfileException& e) {
152  std::cerr << e.what() << std::endl;
153  }
154  std::free(_val);
155  }
156  }
157 };
158 
159 } // namespace kvikio
Future of an asynchronous IO operation.
Definition: stream.hpp:48
StreamFuture(const StreamFuture &)=delete
StreamFuture supports move semantics but is not copyable.
~StreamFuture() noexcept
Free the by-value arguments and make sure the associated CUDA stream has been synchronized.
Definition: stream.hpp:146
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:105
std::size_t check_bytes_done()
Return the number of bytes read or written by the future operation.
Definition: stream.hpp:125