// stream.hpp
/*
 * Copyright (c) 2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <sys/types.h>
#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/shim/cufile.hpp>
#include <new>
#include <tuple>
#include <utility>

26 namespace kvikio {
27 
47 class StreamFuture {
48  private:
49  struct ArgByVal {
50  std::size_t size;
51  off_t file_offset;
52  off_t devPtr_offset;
53  ssize_t bytes_done;
54  };
55 
56  void* _devPtr_base{nullptr};
57  CUstream _stream{nullptr};
58  ArgByVal* _val{nullptr};
59  bool _stream_synchronized{false};
60 
61  public:
62  StreamFuture() noexcept = default;
63 
65  void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
66  : _devPtr_base{devPtr_base}, _stream{stream}
67  {
68  // Notice, we allocate the arguments using malloc() as specified in the cuFile docs:
69  // <https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilewriteasync>
70  if ((_val = static_cast<ArgByVal*>(std::malloc(sizeof(ArgByVal)))) == nullptr) {
71  throw std::bad_alloc{};
72  }
73  *_val = {
74  .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0};
75  }
76 
80  StreamFuture(const StreamFuture&) = delete;
81  StreamFuture& operator=(StreamFuture& o) = delete;
82  StreamFuture(StreamFuture&& o) noexcept
83  : _devPtr_base{std::exchange(o._devPtr_base, nullptr)},
84  _stream{std::exchange(o._stream, nullptr)},
85  _val{std::exchange(o._val, nullptr)},
86  _stream_synchronized{o._stream_synchronized}
87  {
88  }
89  StreamFuture& operator=(StreamFuture&& o) noexcept
90  {
91  _devPtr_base = std::exchange(o._devPtr_base, nullptr);
92  _stream = std::exchange(o._stream, nullptr);
93  _val = std::exchange(o._val, nullptr);
94  _stream_synchronized = o._stream_synchronized;
95  return *this;
96  }
97 
104  std::tuple<void*, std::size_t*, off_t*, off_t*, ssize_t*, CUstream> get_args() const
105  {
106  if (_val == nullptr) {
107  throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture");
108  }
109  return {_devPtr_base,
110  &_val->size,
111  &_val->file_offset,
112  &_val->devPtr_offset,
113  &_val->bytes_done,
114  _stream};
115  }
116 
124  std::size_t check_bytes_done()
125  {
126  if (_val == nullptr) {
127  throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture");
128  }
129 
130  if (!_stream_synchronized) {
131  _stream_synchronized = true;
132  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
133  }
134 
135  CUFILE_CHECK_STREAM_IO(&_val->bytes_done);
136  // At this point, we know `*_val->bytes_done` is a positive value otherwise
137  // CUFILE_CHECK_STREAM_IO() would have raised an exception.
138  return static_cast<std::size_t>(_val->bytes_done);
139  }
140 
145  ~StreamFuture() noexcept
146  {
147  if (_val != nullptr) {
148  try {
150  } catch (const kvikio::CUfileException& e) {
151  std::cerr << e.what() << std::endl;
152  }
153  std::free(_val);
154  }
155  }
156 };
157 
158 } // namespace kvikio
/*
 * Future of an asynchronous IO operation.
 * Definition: stream.hpp:47
 * StreamFuture(const StreamFuture &)=delete
 * StreamFuture supports move semantics but isn't copyable.
 * ~StreamFuture() noexcept
 * Free the by-value arguments and make sure the associated CUDA stream has been synchronized.
 * Definition: stream.hpp:145
 * std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
 * Return the arguments of the future call.
 * Definition: stream.hpp:104
 * std::size_t check_bytes_done()
 * Return the number of bytes read or written by the future operation.
 * Definition: stream.hpp:124
 */