file_handle.hpp
1 /*
2  * Copyright (c) 2021-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <fcntl.h>
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <unistd.h>
22 
23 #include <cstddef>
24 #include <cstdlib>
25 #include <iostream>
26 #include <numeric>
27 #include <optional>
28 #include <stdexcept>
29 #include <system_error>
30 #include <utility>
31 
32 #include <kvikio/buffer.hpp>
33 #include <kvikio/cufile_config.hpp>
34 #include <kvikio/defaults.hpp>
35 #include <kvikio/error.hpp>
36 #include <kvikio/parallel_operation.hpp>
37 #include <kvikio/posix_io.hpp>
38 #include <kvikio/shim/cufile.hpp>
39 #include <kvikio/stream.hpp>
40 #include <kvikio/utils.hpp>
41 
42 namespace kvikio {
43 namespace detail {
44 
/**
 * @brief Translate an fopen-style mode string into flags for open(2).
 *
 * Only the first two characters are inspected:
 *   - "r"  -> O_RDONLY,                      "r+" -> O_RDWR
 *   - "w"  -> O_WRONLY | O_CREAT | O_TRUNC,  "w+" -> O_RDWR | O_CREAT | O_TRUNC
 * O_CLOEXEC is always added to the result.
 *
 * @param flags Mode string; must be non-empty and start with 'r' or 'w'.
 * @param o_direct Whether to add O_DIRECT to the result.
 * @return The flag bitmask to pass to open(2).
 *
 * @throws std::invalid_argument if `flags` is empty, starts with 'a' (append isn't supported),
 * starts with any other unknown character, or if `o_direct` is requested on a platform that
 * doesn't define O_DIRECT.
 */
inline int open_fd_parse_flags(const std::string& flags, bool o_direct)
{
  if (flags.empty()) { throw std::invalid_argument("Unknown file open flag"); }

  // Indexing one past the last character of a const std::string is well-defined and yields
  // '\0', so this is safe for one-character mode strings too.
  const bool read_and_write = (flags[1] == '+');
  const char access_kind    = flags[0];

  int parsed = -1;
  if (access_kind == 'r') {
    parsed = read_and_write ? O_RDWR : O_RDONLY;
  } else if (access_kind == 'w') {
    parsed = (read_and_write ? O_RDWR : O_WRONLY) | O_CREAT | O_TRUNC;
  } else if (access_kind == 'a') {
    throw std::invalid_argument("Open flag 'a' isn't supported");
  } else {
    throw std::invalid_argument("Unknown file open flag");
  }

  parsed |= O_CLOEXEC;

#if defined(O_DIRECT)
  if (o_direct) { parsed |= O_DIRECT; }
#else
  if (o_direct) { throw std::invalid_argument("'o_direct' flag unsupported on this platform"); }
#endif

  return parsed;
}
82 
91 inline int open_fd(const std::string& file_path,
92  const std::string& flags,
93  bool o_direct,
94  mode_t mode)
95 {
96  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
97  int fd = ::open(file_path.c_str(), open_fd_parse_flags(flags, o_direct), mode);
98  if (fd == -1) { throw std::system_error(errno, std::generic_category(), "Unable to open file"); }
99  return fd;
100 }
101 
/**
 * @brief Query the file status flags of an open descriptor via `fcntl(fd, F_GETFL)`.
 *
 * @param fd Open file descriptor.
 * @return The flags reported by fcntl(2).
 *
 * @throws std::system_error (carrying `errno`) if the query fails.
 */
[[nodiscard]] inline int open_flags(int fd)
{
  const int status_flags = fcntl(fd, F_GETFL);  // NOLINT(cppcoreguidelines-pro-type-vararg)
  if (status_flags != -1) { return status_flags; }
  throw std::system_error(errno, std::generic_category(), "Unable to retrieve open flags");
}
115 
/**
 * @brief Get the size in bytes of the file behind a descriptor, as reported by fstat(2).
 *
 * @param file_descriptor Open file descriptor.
 * @return `st_size` of the file, converted to `std::size_t`.
 *
 * @throws std::system_error (carrying `errno`) if fstat(2) fails.
 */
[[nodiscard]] inline std::size_t get_file_size(int file_descriptor)
{
  struct stat file_info {};
  if (fstat(file_descriptor, &file_info) == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to query file size");
  }
  const auto size_in_bytes = static_cast<std::size_t>(file_info.st_size);
  return size_in_bytes;
}
131 
132 } // namespace detail
133 
139 class FileHandle {
140  private:
141  // We use two file descriptors, one opened with the O_DIRECT flag and one without.
142  int _fd_direct_on{-1};
143  int _fd_direct_off{-1};
144  bool _initialized{false};
145  bool _compat_mode{false};
146  mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
147  CUfileHandle_t _handle{};
148 
149  public:
150  static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
151  FileHandle() noexcept = default;
152 
169  FileHandle(const std::string& file_path,
170  const std::string& flags = "r",
171  mode_t mode = m644,
172  bool compat_mode = defaults::compat_mode())
173  : _fd_direct_off{detail::open_fd(file_path, flags, false, mode)},
174  _initialized{true},
175  _compat_mode{compat_mode}
176  {
177  if (_compat_mode) {
178  return; // Nothing to do in compatibility mode
179  }
180 
181  // Try to open the file with the O_DIRECT flag. Fall back to compatibility mode, if it fails.
182  try {
183  _fd_direct_on = detail::open_fd(file_path, flags, true, mode);
184  } catch (const std::system_error&) {
185  _compat_mode = true;
186  } catch (const std::invalid_argument&) {
187  _compat_mode = true;
188  }
189 
190  // Create a cuFile handle, if not in compatibility mode
191  if (!_compat_mode) {
192  CUfileDescr_t desc{}; // It is important to set to zero!
193  desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
194  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
195  desc.handle.fd = _fd_direct_on;
196  CUFILE_TRY(cuFileAPI::instance().HandleRegister(&_handle, &desc));
197  }
198  }
199 
203  FileHandle(const FileHandle&) = delete;
204  FileHandle& operator=(FileHandle const&) = delete;
205  FileHandle(FileHandle&& o) noexcept
206  : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
207  _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
208  _initialized{std::exchange(o._initialized, false)},
209  _compat_mode{std::exchange(o._compat_mode, false)},
210  _nbytes{std::exchange(o._nbytes, 0)},
211  _handle{std::exchange(o._handle, CUfileHandle_t{})}
212  {
213  }
214  FileHandle& operator=(FileHandle&& o) noexcept
215  {
216  _fd_direct_on = std::exchange(o._fd_direct_on, -1);
217  _fd_direct_off = std::exchange(o._fd_direct_off, -1);
218  _initialized = std::exchange(o._initialized, false);
219  _compat_mode = std::exchange(o._compat_mode, false);
220  _nbytes = std::exchange(o._nbytes, 0);
221  _handle = std::exchange(o._handle, CUfileHandle_t{});
222  return *this;
223  }
224  ~FileHandle() noexcept { close(); }
225 
226  [[nodiscard]] bool closed() const noexcept { return !_initialized; }
227 
231  void close() noexcept
232  {
233  if (closed()) { return; }
234 
235  if (!_compat_mode) { cuFileAPI::instance().HandleDeregister(_handle); }
236  ::close(_fd_direct_off);
237  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
238  _fd_direct_on = -1;
239  _fd_direct_off = -1;
240  _initialized = false;
241  }
242 
251  [[nodiscard]] CUfileHandle_t handle()
252  {
253  if (closed()) { throw CUfileException("File handle is closed"); }
254  if (_compat_mode) {
255  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
256  }
257  return _handle;
258  }
259 
269  [[nodiscard]] int fd() const noexcept { return _fd_direct_off; }
270 
280  [[nodiscard]] int fd_open_flags() const { return detail::open_flags(_fd_direct_off); }
281 
289  [[nodiscard]] std::size_t nbytes() const
290  {
291  if (closed()) { return 0; }
292  if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
293  return _nbytes;
294  }
295 
326  std::size_t read(void* devPtr_base,
327  std::size_t size,
328  std::size_t file_offset,
329  std::size_t devPtr_offset,
330  bool sync_default_stream = true)
331  {
332  if (_compat_mode) {
333  return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
334  }
335  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
336 
337  KVIKIO_NVTX_FUNC_RANGE("cufileRead()", size);
338  ssize_t ret = cuFileAPI::instance().Read(
339  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
340  CUFILE_CHECK_BYTES_DONE(ret);
341  return ret;
342  }
343 
375  std::size_t write(const void* devPtr_base,
376  std::size_t size,
377  std::size_t file_offset,
378  std::size_t devPtr_offset,
379  bool sync_default_stream = true)
380  {
381  _nbytes = 0; // Invalidate the computed file size
382 
383  if (_compat_mode) {
384  return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
385  }
386  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
387 
388  KVIKIO_NVTX_FUNC_RANGE("cufileWrite()", size);
389  ssize_t ret = cuFileAPI::instance().Write(
390  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
391  if (ret == -1) {
392  throw std::system_error(errno, std::generic_category(), "Unable to write file");
393  }
394  if (ret < -1) {
395  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
396  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
397  }
398  return ret;
399  }
400 
428  std::future<std::size_t> pread(void* buf,
429  std::size_t size,
430  std::size_t file_offset = 0,
431  std::size_t task_size = defaults::task_size(),
432  std::size_t gds_threshold = defaults::gds_threshold(),
433  bool sync_default_stream = true)
434  {
435  if (is_host_memory(buf)) {
436  auto op = [this](void* hostPtr_base,
437  std::size_t size,
438  std::size_t file_offset,
439  std::size_t hostPtr_offset) -> std::size_t {
440  char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
441  return posix_host_read(_fd_direct_off, buf, size, file_offset, false);
442  };
443 
444  return parallel_io(op, buf, size, file_offset, task_size, 0);
445  }
446 
447  CUcontext ctx = get_context_from_pointer(buf);
448 
449  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
450  if (size < gds_threshold) {
451  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
452  PushAndPopContext c(ctx);
453  return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
454  };
455  return std::async(std::launch::deferred, task);
456  }
457 
458  // Let's synchronize once instead of in each task.
459  if (sync_default_stream && !_compat_mode) {
460  PushAndPopContext c(ctx);
461  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
462  }
463 
464  // Regular case that use the threadpool and run the tasks in parallel
465  auto task = [this, ctx](void* devPtr_base,
466  std::size_t size,
467  std::size_t file_offset,
468  std::size_t devPtr_offset) -> std::size_t {
469  PushAndPopContext c(ctx);
470  return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
471  };
472  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
473  return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
474  }
475 
503  std::future<std::size_t> pwrite(const void* buf,
504  std::size_t size,
505  std::size_t file_offset = 0,
506  std::size_t task_size = defaults::task_size(),
507  std::size_t gds_threshold = defaults::gds_threshold(),
508  bool sync_default_stream = true)
509  {
510  if (is_host_memory(buf)) {
511  auto op = [this](const void* hostPtr_base,
512  std::size_t size,
513  std::size_t file_offset,
514  std::size_t hostPtr_offset) -> std::size_t {
515  const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
516  return posix_host_write(_fd_direct_off, buf, size, file_offset, false);
517  };
518 
519  return parallel_io(op, buf, size, file_offset, task_size, 0);
520  }
521 
522  CUcontext ctx = get_context_from_pointer(buf);
523 
524  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
525  if (size < gds_threshold) {
526  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
527  PushAndPopContext c(ctx);
528  return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
529  };
530  return std::async(std::launch::deferred, task);
531  }
532 
533  // Let's synchronize once instead of in each task.
534  if (sync_default_stream && !_compat_mode) {
535  PushAndPopContext c(ctx);
536  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
537  }
538 
539  // Regular case that use the threadpool and run the tasks in parallel
540  auto op = [this, ctx](const void* devPtr_base,
541  std::size_t size,
542  std::size_t file_offset,
543  std::size_t devPtr_offset) -> std::size_t {
544  PushAndPopContext c(ctx);
545  return write(
546  devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
547  };
548  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
549  return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
550  }
551 
586  void read_async(void* devPtr_base,
587  std::size_t* size_p,
588  off_t* file_offset_p,
589  off_t* devPtr_offset_p,
590  ssize_t* bytes_read_p,
591  CUstream stream)
592  {
593  // When checking for availability, we also check if cuFile's config file exist. This is because
594  // even when the stream API is available, it doesn't work if no config file exist.
595  if (kvikio::is_batch_and_stream_available() && !_compat_mode && !config_path().empty()) {
596  CUFILE_TRY(cuFileAPI::instance().ReadAsync(
597  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
598  return;
599  }
600  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
601  *bytes_read_p =
602  static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
603  }
604 
630  [[nodiscard]] StreamFuture read_async(void* devPtr_base,
631  std::size_t size,
632  off_t file_offset = 0,
633  off_t devPtr_offset = 0,
634  CUstream stream = nullptr)
635  {
636  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
637  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
638  ret.get_args();
639  read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
640  return ret;
641  }
642 
678  void write_async(void* devPtr_base,
679  std::size_t* size_p,
680  off_t* file_offset_p,
681  off_t* devPtr_offset_p,
682  ssize_t* bytes_written_p,
683  CUstream stream)
684  {
685  // When checking for availability, we also check if cuFile's config file exist. This is because
686  // even when the stream API is available, it doesn't work if no config file exist.
687  if (kvikio::is_batch_and_stream_available() && !_compat_mode && !config_path().empty()) {
688  CUFILE_TRY(cuFileAPI::instance().WriteAsync(
689  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
690  return;
691  }
692  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
693  *bytes_written_p =
694  static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
695  }
696 
722  [[nodiscard]] StreamFuture write_async(void* devPtr_base,
723  std::size_t size,
724  off_t file_offset = 0,
725  off_t devPtr_offset = 0,
726  CUstream stream = nullptr)
727  {
728  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
729  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
730  ret.get_args();
731  write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
732  return ret;
733  }
734 
743  [[nodiscard]] bool is_compat_mode_on() const noexcept { return _compat_mode; }
744 };
745 
746 } // namespace kvikio
Handle of an open file registered with cufile.
void close() noexcept
Deregister the file and close the two files.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, bool compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
bool is_compat_mode_on() const noexcept
Returns true if the compatibility mode has been enabled for this file.
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Definition: utils.hpp:247
Future of an asynchronous IO operation.
Definition: stream.hpp:47
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:104
static std::size_t task_size()
Get the default task size used for parallel IO operations.
Definition: defaults.hpp:222
static bool compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
Definition: defaults.hpp:161
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Definition: defaults.hpp:248