file_handle.hpp
1 /*
2  * Copyright (c) 2021-2023, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <fcntl.h>
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22 
23 #include <cstddef>
24 #include <cstdlib>
25 #include <iostream>
26 #include <numeric>
27 #include <optional>
28 #include <system_error>
29 #include <utility>
30 
31 #include <kvikio/buffer.hpp>
32 #include <kvikio/defaults.hpp>
33 #include <kvikio/error.hpp>
34 #include <kvikio/parallel_operation.hpp>
35 #include <kvikio/posix_io.hpp>
36 #include <kvikio/shim/cufile.hpp>
37 #include <kvikio/stream.hpp>
38 #include <kvikio/utils.hpp>
39 
40 namespace kvikio {
41 namespace detail {
42 
/**
 * @brief Translate an fopen-style mode string into flags for open(2).
 *
 * Only the first two characters of `flags` are inspected:
 *   - "r"  -> O_RDONLY
 *   - "r+" -> O_RDWR
 *   - "w"  -> O_WRONLY | O_CREAT | O_TRUNC
 *   - "w+" -> O_RDWR   | O_CREAT | O_TRUNC
 * O_CLOEXEC is always added and O_DIRECT is added when `o_direct` is true.
 *
 * @param flags Mode string; must start with 'r' or 'w'.
 * @param o_direct Whether to request O_DIRECT.
 * @return The flag word to hand to open(2).
 * @throws std::invalid_argument if `flags` is empty, starts with 'a' (append is
 * not supported), or starts with any other character.
 */
inline int open_fd_parse_flags(const std::string& flags, bool o_direct)
{
  if (flags.empty()) { throw std::invalid_argument("Unknown file open flag"); }

  const char kind = flags[0];
  // Indexing a const std::string at size() is well defined and yields '\0', so this
  // is safe for one-character mode strings.
  const bool read_write = (flags[1] == '+');

  int parsed = 0;
  if (kind == 'r') {
    parsed = read_write ? O_RDWR : O_RDONLY;
  } else if (kind == 'w') {
    parsed = (read_write ? O_RDWR : O_WRONLY) | O_CREAT | O_TRUNC;
  } else if (kind == 'a') {
    throw std::invalid_argument("Open flag 'a' isn't supported");
  } else {
    throw std::invalid_argument("Unknown file open flag");
  }

  parsed |= O_CLOEXEC;
  if (o_direct) { parsed |= O_DIRECT; }
  return parsed;
}
71 
80 inline int open_fd(const std::string& file_path,
81  const std::string& flags,
82  bool o_direct,
83  mode_t mode)
84 {
85  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
86  int fd = ::open(file_path.c_str(), open_fd_parse_flags(flags, o_direct), mode);
87  if (fd == -1) { throw std::system_error(errno, std::generic_category(), "Unable to open file"); }
88  return fd;
89 }
90 
/**
 * @brief Query the file status flags of an open file descriptor.
 *
 * @param fd File descriptor to query.
 * @return The value reported by `fcntl(fd, F_GETFL)`.
 * @throws std::system_error carrying `errno` if the query fails.
 */
[[nodiscard]] inline int open_flags(int fd)
{
  const int status_flags = fcntl(fd, F_GETFL);  // NOLINT(cppcoreguidelines-pro-type-vararg)
  if (status_flags != -1) { return status_flags; }
  throw std::system_error(errno, std::generic_category(), "Unable to retrieve open flags");
}
104 
/**
 * @brief Get the size in bytes of the file behind a file descriptor.
 *
 * @param file_descriptor Open file descriptor.
 * @return `st_size` as reported by fstat(2), converted to `std::size_t`.
 * @throws std::system_error carrying `errno` if fstat(2) fails.
 */
[[nodiscard]] inline std::size_t get_file_size(int file_descriptor)
{
  struct stat info {};
  if (fstat(file_descriptor, &info) == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to query file size");
  }
  return static_cast<std::size_t>(info.st_size);
}
120 
121 } // namespace detail
122 
128 class FileHandle {
129  private:
130  // We use two file descriptors, one opened with the O_DIRECT flag and one without.
131  int _fd_direct_on{-1};
132  int _fd_direct_off{-1};
133  bool _initialized{false};
134  bool _compat_mode{false};
135  mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
136  CUfileHandle_t _handle{};
137 
138  public:
139  static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
140  FileHandle() noexcept = default;
141 
158  FileHandle(const std::string& file_path,
159  const std::string& flags = "r",
160  mode_t mode = m644,
161  bool compat_mode = defaults::compat_mode())
162  : _fd_direct_off{detail::open_fd(file_path, flags, false, mode)},
163  _initialized{true},
164  _compat_mode{compat_mode}
165  {
166  try {
167  _fd_direct_on = detail::open_fd(file_path, flags, true, mode);
168  } catch (const std::system_error&) {
169  _compat_mode = true; // Fall back to compat mode if we cannot open the file with O_DIRECT
170  }
171 
172  if (_compat_mode) { return; }
173 #ifdef KVIKIO_CUFILE_FOUND
174  CUfileDescr_t desc{}; // It is important to set to zero!
175  desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
176  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
177  desc.handle.fd = _fd_direct_on;
178  CUFILE_TRY(cuFileAPI::instance().HandleRegister(&_handle, &desc));
179 #endif
180  }
181 
185  FileHandle(const FileHandle&) = delete;
186  FileHandle& operator=(FileHandle const&) = delete;
187  FileHandle(FileHandle&& o) noexcept
188  : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
189  _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
190  _initialized{std::exchange(o._initialized, false)},
191  _compat_mode{std::exchange(o._compat_mode, false)},
192  _nbytes{std::exchange(o._nbytes, 0)},
193  _handle{std::exchange(o._handle, CUfileHandle_t{})}
194  {
195  }
196  FileHandle& operator=(FileHandle&& o) noexcept
197  {
198  _fd_direct_on = std::exchange(o._fd_direct_on, -1);
199  _fd_direct_off = std::exchange(o._fd_direct_off, -1);
200  _initialized = std::exchange(o._initialized, false);
201  _compat_mode = std::exchange(o._compat_mode, false);
202  _nbytes = std::exchange(o._nbytes, 0);
203  _handle = std::exchange(o._handle, CUfileHandle_t{});
204  return *this;
205  }
206  ~FileHandle() noexcept { close(); }
207 
208  [[nodiscard]] bool closed() const noexcept { return !_initialized; }
209 
213  void close() noexcept
214  {
215  if (closed()) { return; }
216 
217  if (!_compat_mode) {
218 #ifdef KVIKIO_CUFILE_FOUND
219  cuFileAPI::instance().HandleDeregister(_handle);
220 #endif
221  }
222  ::close(_fd_direct_off);
223  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
224  _fd_direct_on = -1;
225  _fd_direct_off = -1;
226  _initialized = false;
227  }
228 
237  [[nodiscard]] CUfileHandle_t handle()
238  {
239  if (closed()) { throw CUfileException("File handle is closed"); }
240  if (_compat_mode) {
241  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
242  }
243  return _handle;
244  }
245 
255  [[nodiscard]] int fd() const noexcept { return _fd_direct_off; }
256 
266  [[nodiscard]] int fd_open_flags() const { return detail::open_flags(_fd_direct_off); }
267 
275  [[nodiscard]] inline std::size_t nbytes() const
276  {
277  if (closed()) { return 0; }
278  if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
279  return _nbytes;
280  }
281 
305  std::size_t read(void* devPtr_base,
306  std::size_t size,
307  std::size_t file_offset,
308  std::size_t devPtr_offset)
309  {
310  if (_compat_mode) {
311  return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
312  }
313 #ifdef KVIKIO_CUFILE_FOUND
314  ssize_t ret = cuFileAPI::instance().Read(
315  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
316  if (ret == -1) {
317  throw std::system_error(errno, std::generic_category(), "Unable to read file");
318  }
319  if (ret < -1) {
320  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
321  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
322  }
323  return ret;
324 #else
325  throw CUfileException("KvikIO not compiled with cuFile.h");
326 #endif
327  }
328 
353  std::size_t write(const void* devPtr_base,
354  std::size_t size,
355  std::size_t file_offset,
356  std::size_t devPtr_offset)
357  {
358  _nbytes = 0; // Invalidate the computed file size
359 
360  if (_compat_mode) {
361  return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
362  }
363 #ifdef KVIKIO_CUFILE_FOUND
364  ssize_t ret = cuFileAPI::instance().Write(
365  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
366  if (ret == -1) {
367  throw std::system_error(errno, std::generic_category(), "Unable to write file");
368  }
369  if (ret < -1) {
370  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
371  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
372  }
373  return ret;
374 #else
375  throw CUfileException("KvikIO not compiled with cuFile.h");
376 #endif
377  }
378 
399  std::future<std::size_t> pread(void* buf,
400  std::size_t size,
401  std::size_t file_offset = 0,
402  std::size_t task_size = defaults::task_size(),
403  std::size_t gds_threshold = defaults::gds_threshold())
404  {
405  if (is_host_memory(buf)) {
406  auto op = [this](void* hostPtr_base,
407  std::size_t size,
408  std::size_t file_offset,
409  std::size_t hostPtr_offset) -> std::size_t {
410  char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
411  return posix_host_read(_fd_direct_off, buf, size, file_offset, false);
412  };
413 
414  return parallel_io(op, buf, size, file_offset, task_size, 0);
415  }
416 
417  CUcontext ctx = get_context_from_pointer(buf);
418 
419  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
420  if (size < gds_threshold) {
421  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
422  PushAndPopContext c(ctx);
423  return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
424  };
425  return std::async(std::launch::deferred, task);
426  }
427 
428  // Regular case that use the threadpool and run the tasks in parallel
429  auto task = [this, ctx](void* devPtr_base,
430  std::size_t size,
431  std::size_t file_offset,
432  std::size_t devPtr_offset) -> std::size_t {
433  PushAndPopContext c(ctx);
434  return read(devPtr_base, size, file_offset, devPtr_offset);
435  };
436  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
437  return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
438  }
439 
460  std::future<std::size_t> pwrite(const void* buf,
461  std::size_t size,
462  std::size_t file_offset = 0,
463  std::size_t task_size = defaults::task_size(),
464  std::size_t gds_threshold = defaults::gds_threshold())
465  {
466  if (is_host_memory(buf)) {
467  auto op = [this](const void* hostPtr_base,
468  std::size_t size,
469  std::size_t file_offset,
470  std::size_t hostPtr_offset) -> std::size_t {
471  const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
472  return posix_host_write(_fd_direct_off, buf, size, file_offset, false);
473  };
474 
475  return parallel_io(op, buf, size, file_offset, task_size, 0);
476  }
477 
478  CUcontext ctx = get_context_from_pointer(buf);
479 
480  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
481  if (size < gds_threshold) {
482  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
483  PushAndPopContext c(ctx);
484  return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
485  };
486  return std::async(std::launch::deferred, task);
487  }
488 
489  // Regular case that use the threadpool and run the tasks in parallel
490  auto op = [this, ctx](const void* devPtr_base,
491  std::size_t size,
492  std::size_t file_offset,
493  std::size_t devPtr_offset) -> std::size_t {
494  PushAndPopContext c(ctx);
495  return write(devPtr_base, size, file_offset, devPtr_offset);
496  };
497  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
498  return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
499  }
500 
535  void read_async(void* devPtr_base,
536  std::size_t* size_p,
537  off_t* file_offset_p,
538  off_t* devPtr_offset_p,
539  ssize_t* bytes_read_p,
540  CUstream stream)
541  {
542 #ifdef KVIKIO_CUFILE_STREAM_API_FOUND
543  if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
544  CUFILE_TRY(cuFileAPI::instance().ReadAsync(
545  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
546  return;
547  }
548 #endif
549 
550  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
551  *bytes_read_p =
552  static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
553  }
554 
580  [[nodiscard]] StreamFuture read_async(void* devPtr_base,
581  std::size_t size,
582  off_t file_offset = 0,
583  off_t devPtr_offset = 0,
584  CUstream stream = nullptr)
585  {
586  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
587  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
588  ret.get_args();
589  read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
590  return ret;
591  }
592 
628  void write_async(void* devPtr_base,
629  std::size_t* size_p,
630  off_t* file_offset_p,
631  off_t* devPtr_offset_p,
632  ssize_t* bytes_written_p,
633  CUstream stream)
634  {
635 #ifdef KVIKIO_CUFILE_STREAM_API_FOUND
636  if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
637  CUFILE_TRY(cuFileAPI::instance().WriteAsync(
638  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
639  return;
640  }
641 #endif
642 
643  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
644  *bytes_written_p =
645  static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
646  }
647 
673  [[nodiscard]] StreamFuture write_async(void* devPtr_base,
674  std::size_t size,
675  off_t file_offset = 0,
676  off_t devPtr_offset = 0,
677  CUstream stream = nullptr)
678  {
679  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
680  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
681  ret.get_args();
682  write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
683  return ret;
684  }
685 
694  [[nodiscard]] bool is_compat_mode_on() const noexcept { return _compat_mode; }
695 };
696 
697 } // namespace kvikio
Handle of an open file registered with cufile.
void close() noexcept
Deregister the file and close the two files.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Reads specified bytes from the file into the device or host memory in parallel.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, bool compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
bool is_compat_mode_on() const noexcept
Returns true if the compatibility mode has been enabled for this file.
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Reads specified bytes from the file into the device memory.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Writes specified bytes from the device memory into the file.
std::size_t nbytes() const
Get the file size.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Writes specified bytes from device or host memory into the file in parallel.
Push CUDA context on creation and pop it on destruction.
Definition: utils.hpp:214
Future of an asynchronous IO operation.
Definition: stream.hpp:47
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:104
static std::size_t task_size()
Get the default task size used for parallel IO operations.
Definition: defaults.hpp:206
static bool compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
Definition: defaults.hpp:148
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Definition: defaults.hpp:226