file_handle.hpp
1 /*
2  * Copyright (c) 2021-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <fcntl.h>
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22 
23 #include <cstddef>
24 #include <cstdlib>
25 #include <iostream>
26 #include <numeric>
27 #include <optional>
28 #include <system_error>
29 #include <utility>
30 
31 #include <kvikio/buffer.hpp>
32 #include <kvikio/defaults.hpp>
33 #include <kvikio/error.hpp>
34 #include <kvikio/parallel_operation.hpp>
35 #include <kvikio/posix_io.hpp>
36 #include <kvikio/shim/cufile.hpp>
37 #include <kvikio/stream.hpp>
38 #include <kvikio/utils.hpp>
39 
40 namespace kvikio {
41 namespace detail {
42 
/**
 * @brief Translate an fopen-like mode string into flags for open(2).
 *
 * Supported modes are "r", "r+", "w" and "w+". Only the first two characters are inspected.
 * O_CLOEXEC is always added; O_DIRECT is added when requested.
 *
 * @param flags Mode string ("r", "r+", "w" or "w+").
 * @param o_direct Whether to add O_DIRECT to the returned flags.
 * @return The flags to pass to open(2).
 * @throws std::invalid_argument if `flags` is empty, starts with 'a', or is otherwise unknown.
 */
inline int open_fd_parse_flags(const std::string& flags, bool o_direct)
{
  if (flags.empty()) { throw std::invalid_argument("Unknown file open flag"); }

  const char access = flags.front();
  // For a one-character string flags[1] is the terminating '\0' (well-defined for std::string).
  const bool update = flags[1] == '+';

  int result = 0;
  if (access == 'r') {
    result = update ? O_RDWR : O_RDONLY;
  } else if (access == 'w') {
    // Write modes create the file if needed and truncate it.
    result = (update ? O_RDWR : O_WRONLY) | O_CREAT | O_TRUNC;
  } else if (access == 'a') {
    throw std::invalid_argument("Open flag 'a' isn't supported");
  } else {
    throw std::invalid_argument("Unknown file open flag");
  }

  result |= O_CLOEXEC;
  return o_direct ? (result | O_DIRECT) : result;
}
71 
80 inline int open_fd(const std::string& file_path,
81  const std::string& flags,
82  bool o_direct,
83  mode_t mode)
84 {
85  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
86  int fd = ::open(file_path.c_str(), open_fd_parse_flags(flags, o_direct), mode);
87  if (fd == -1) { throw std::system_error(errno, std::generic_category(), "Unable to open file"); }
88  return fd;
89 }
90 
/**
 * @brief Query the file status flags of an open descriptor (fcntl F_GETFL).
 *
 * @param fd An open file descriptor.
 * @return The flags reported by fcntl(2).
 * @throws std::system_error carrying `errno` if fcntl(2) fails.
 */
[[nodiscard]] inline int open_flags(int fd)
{
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
  const int status_flags = fcntl(fd, F_GETFL);
  if (status_flags != -1) { return status_flags; }
  throw std::system_error(errno, std::generic_category(), "Unable to retrieve open flags");
}
104 
/**
 * @brief Return the size in bytes of the file behind an open descriptor (via fstat(2)).
 *
 * @param file_descriptor An open file descriptor.
 * @return `st_size` reported by fstat(2), converted to `std::size_t`.
 * @throws std::system_error carrying `errno` if fstat(2) fails.
 */
[[nodiscard]] inline std::size_t get_file_size(int file_descriptor)
{
  struct stat info {};
  if (fstat(file_descriptor, &info) == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to query file size");
  }
  return static_cast<std::size_t>(info.st_size);
}
120 
121 } // namespace detail
122 
128 class FileHandle {
129  private:
130  // We use two file descriptors, one opened with the O_DIRECT flag and one without.
131  int _fd_direct_on{-1};
132  int _fd_direct_off{-1};
133  bool _initialized{false};
134  bool _compat_mode{false};
135  mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
136  CUfileHandle_t _handle{};
137 
138  public:
139  static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
140  FileHandle() noexcept = default;
141 
158  FileHandle(const std::string& file_path,
159  const std::string& flags = "r",
160  mode_t mode = m644,
161  bool compat_mode = defaults::compat_mode())
162  : _fd_direct_off{detail::open_fd(file_path, flags, false, mode)},
163  _initialized{true},
164  _compat_mode{compat_mode}
165  {
166  try {
167  _fd_direct_on = detail::open_fd(file_path, flags, true, mode);
168  } catch (const std::system_error&) {
169  _compat_mode = true; // Fall back to compat mode if we cannot open the file with O_DIRECT
170  }
171 
172  if (!_compat_mode) {
173  CUfileDescr_t desc{}; // It is important to set to zero!
174  desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
175  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
176  desc.handle.fd = _fd_direct_on;
177  CUFILE_TRY(cuFileAPI::instance().HandleRegister(&_handle, &desc));
178  }
179  }
180 
184  FileHandle(const FileHandle&) = delete;
185  FileHandle& operator=(FileHandle const&) = delete;
186  FileHandle(FileHandle&& o) noexcept
187  : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
188  _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
189  _initialized{std::exchange(o._initialized, false)},
190  _compat_mode{std::exchange(o._compat_mode, false)},
191  _nbytes{std::exchange(o._nbytes, 0)},
192  _handle{std::exchange(o._handle, CUfileHandle_t{})}
193  {
194  }
195  FileHandle& operator=(FileHandle&& o) noexcept
196  {
197  _fd_direct_on = std::exchange(o._fd_direct_on, -1);
198  _fd_direct_off = std::exchange(o._fd_direct_off, -1);
199  _initialized = std::exchange(o._initialized, false);
200  _compat_mode = std::exchange(o._compat_mode, false);
201  _nbytes = std::exchange(o._nbytes, 0);
202  _handle = std::exchange(o._handle, CUfileHandle_t{});
203  return *this;
204  }
205  ~FileHandle() noexcept { close(); }
206 
207  [[nodiscard]] bool closed() const noexcept { return !_initialized; }
208 
212  void close() noexcept
213  {
214  if (closed()) { return; }
215 
216  if (!_compat_mode) { cuFileAPI::instance().HandleDeregister(_handle); }
217  ::close(_fd_direct_off);
218  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
219  _fd_direct_on = -1;
220  _fd_direct_off = -1;
221  _initialized = false;
222  }
223 
232  [[nodiscard]] CUfileHandle_t handle()
233  {
234  if (closed()) { throw CUfileException("File handle is closed"); }
235  if (_compat_mode) {
236  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
237  }
238  return _handle;
239  }
240 
250  [[nodiscard]] int fd() const noexcept { return _fd_direct_off; }
251 
261  [[nodiscard]] int fd_open_flags() const { return detail::open_flags(_fd_direct_off); }
262 
270  [[nodiscard]] inline std::size_t nbytes() const
271  {
272  if (closed()) { return 0; }
273  if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
274  return _nbytes;
275  }
276 
300  std::size_t read(void* devPtr_base,
301  std::size_t size,
302  std::size_t file_offset,
303  std::size_t devPtr_offset)
304  {
305  if (_compat_mode) {
306  return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
307  }
308  ssize_t ret = cuFileAPI::instance().Read(
309  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
310  if (ret == -1) {
311  throw std::system_error(errno, std::generic_category(), "Unable to read file");
312  }
313  if (ret < -1) {
314  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
315  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
316  }
317  return ret;
318  }
319 
344  std::size_t write(const void* devPtr_base,
345  std::size_t size,
346  std::size_t file_offset,
347  std::size_t devPtr_offset)
348  {
349  _nbytes = 0; // Invalidate the computed file size
350 
351  if (_compat_mode) {
352  return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
353  }
354  ssize_t ret = cuFileAPI::instance().Write(
355  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
356  if (ret == -1) {
357  throw std::system_error(errno, std::generic_category(), "Unable to write file");
358  }
359  if (ret < -1) {
360  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
361  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
362  }
363  return ret;
364  }
365 
386  std::future<std::size_t> pread(void* buf,
387  std::size_t size,
388  std::size_t file_offset = 0,
389  std::size_t task_size = defaults::task_size(),
390  std::size_t gds_threshold = defaults::gds_threshold())
391  {
392  if (is_host_memory(buf)) {
393  auto op = [this](void* hostPtr_base,
394  std::size_t size,
395  std::size_t file_offset,
396  std::size_t hostPtr_offset) -> std::size_t {
397  char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
398  return posix_host_read(_fd_direct_off, buf, size, file_offset, false);
399  };
400 
401  return parallel_io(op, buf, size, file_offset, task_size, 0);
402  }
403 
404  CUcontext ctx = get_context_from_pointer(buf);
405 
406  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
407  if (size < gds_threshold) {
408  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
409  PushAndPopContext c(ctx);
410  return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
411  };
412  return std::async(std::launch::deferred, task);
413  }
414 
415  // Regular case that use the threadpool and run the tasks in parallel
416  auto task = [this, ctx](void* devPtr_base,
417  std::size_t size,
418  std::size_t file_offset,
419  std::size_t devPtr_offset) -> std::size_t {
420  PushAndPopContext c(ctx);
421  return read(devPtr_base, size, file_offset, devPtr_offset);
422  };
423  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
424  return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
425  }
426 
447  std::future<std::size_t> pwrite(const void* buf,
448  std::size_t size,
449  std::size_t file_offset = 0,
450  std::size_t task_size = defaults::task_size(),
451  std::size_t gds_threshold = defaults::gds_threshold())
452  {
453  if (is_host_memory(buf)) {
454  auto op = [this](const void* hostPtr_base,
455  std::size_t size,
456  std::size_t file_offset,
457  std::size_t hostPtr_offset) -> std::size_t {
458  const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
459  return posix_host_write(_fd_direct_off, buf, size, file_offset, false);
460  };
461 
462  return parallel_io(op, buf, size, file_offset, task_size, 0);
463  }
464 
465  CUcontext ctx = get_context_from_pointer(buf);
466 
467  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
468  if (size < gds_threshold) {
469  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
470  PushAndPopContext c(ctx);
471  return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
472  };
473  return std::async(std::launch::deferred, task);
474  }
475 
476  // Regular case that use the threadpool and run the tasks in parallel
477  auto op = [this, ctx](const void* devPtr_base,
478  std::size_t size,
479  std::size_t file_offset,
480  std::size_t devPtr_offset) -> std::size_t {
481  PushAndPopContext c(ctx);
482  return write(devPtr_base, size, file_offset, devPtr_offset);
483  };
484  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
485  return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
486  }
487 
522  void read_async(void* devPtr_base,
523  std::size_t* size_p,
524  off_t* file_offset_p,
525  off_t* devPtr_offset_p,
526  ssize_t* bytes_read_p,
527  CUstream stream)
528  {
529  if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
530  CUFILE_TRY(cuFileAPI::instance().ReadAsync(
531  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
532  return;
533  }
534  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
535  *bytes_read_p =
536  static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
537  }
538 
564  [[nodiscard]] StreamFuture read_async(void* devPtr_base,
565  std::size_t size,
566  off_t file_offset = 0,
567  off_t devPtr_offset = 0,
568  CUstream stream = nullptr)
569  {
570  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
571  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
572  ret.get_args();
573  read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
574  return ret;
575  }
576 
612  void write_async(void* devPtr_base,
613  std::size_t* size_p,
614  off_t* file_offset_p,
615  off_t* devPtr_offset_p,
616  ssize_t* bytes_written_p,
617  CUstream stream)
618  {
619  if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
620  CUFILE_TRY(cuFileAPI::instance().WriteAsync(
621  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
622  return;
623  }
624  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
625  *bytes_written_p =
626  static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
627  }
628 
654  [[nodiscard]] StreamFuture write_async(void* devPtr_base,
655  std::size_t size,
656  off_t file_offset = 0,
657  off_t devPtr_offset = 0,
658  CUstream stream = nullptr)
659  {
660  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
661  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
662  ret.get_args();
663  write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
664  return ret;
665  }
666 
675  [[nodiscard]] bool is_compat_mode_on() const noexcept { return _compat_mode; }
676 };
677 
678 } // namespace kvikio
Handle of an open file registered with cuFile.
void close() noexcept
Deregister the file and close the two file descriptors.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Reads specified bytes from the file into the device or host memory in parallel.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, bool compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
bool is_compat_mode_on() const noexcept
Returns true if the compatibility mode has been enabled for this file.
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Reads specified bytes from the file into the device memory.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Writes specified bytes from the device memory into the file.
std::size_t nbytes() const
Get the file size.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Writes specified bytes from device or host memory into the file in parallel.
Push CUDA context on creation and pop it on destruction.
Definition: utils.hpp:214
Future of an asynchronous IO operation.
Definition: stream.hpp:47
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:104
static std::size_t task_size()
Get the default task size used for parallel IO operations.
Definition: defaults.hpp:206
static bool compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
Definition: defaults.hpp:148
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Definition: defaults.hpp:226