file_handle.hpp
1 /*
2  * Copyright (c) 2021-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <sys/stat.h>
19 #include <sys/types.h>
20 
21 #include <cstddef>
22 #include <cstdlib>
23 #include <stdexcept>
24 #include <system_error>
25 #include <utility>
26 
27 #include <kvikio/buffer.hpp>
28 #include <kvikio/cufile/config.hpp>
29 #include <kvikio/defaults.hpp>
30 #include <kvikio/error.hpp>
31 #include <kvikio/parallel_operation.hpp>
32 #include <kvikio/posix_io.hpp>
33 #include <kvikio/shim/cufile.hpp>
34 #include <kvikio/stream.hpp>
35 #include <kvikio/utils.hpp>
36 
37 namespace kvikio {
38 
44 class FileHandle {
45  private:
46  // We use two file descriptors, one opened with the O_DIRECT flag and one without.
47  int _fd_direct_on{-1};
48  int _fd_direct_off{-1};
49  bool _initialized{false};
50  CompatMode _compat_mode{CompatMode::AUTO};
51  mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
52  CUfileHandle_t _handle{};
53 
63  bool is_compat_mode_preferred_for_async(CompatMode requested_compat_mode)
64  {
65  if (defaults::is_compat_mode_preferred(requested_compat_mode)) { return true; }
66 
67  if (!is_stream_api_available()) {
68  if (requested_compat_mode == CompatMode::AUTO) { return true; }
69  throw std::runtime_error("Missing the cuFile stream api.");
70  }
71 
72  // When checking for availability, we also check if cuFile's config file exists. This is
73  // because even when the stream API is available, it doesn't work if no config file exists.
74  if (config_path().empty()) {
75  if (requested_compat_mode == CompatMode::AUTO) { return true; }
76  throw std::runtime_error("Missing cuFile configuration file.");
77  }
78  return false;
79  }
80 
81  public:
82  static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
83  FileHandle() noexcept = default;
84 
101  FileHandle(const std::string& file_path,
102  const std::string& flags = "r",
103  mode_t mode = m644,
104  CompatMode compat_mode = defaults::compat_mode());
105 
109  FileHandle(const FileHandle&) = delete;
110  FileHandle& operator=(FileHandle const&) = delete;
111  FileHandle(FileHandle&& o) noexcept
112  : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
113  _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
114  _initialized{std::exchange(o._initialized, false)},
115  _compat_mode{std::exchange(o._compat_mode, CompatMode::AUTO)},
116  _nbytes{std::exchange(o._nbytes, 0)},
117  _handle{std::exchange(o._handle, CUfileHandle_t{})}
118  {
119  }
120  FileHandle& operator=(FileHandle&& o) noexcept
121  {
122  _fd_direct_on = std::exchange(o._fd_direct_on, -1);
123  _fd_direct_off = std::exchange(o._fd_direct_off, -1);
124  _initialized = std::exchange(o._initialized, false);
125  _compat_mode = std::exchange(o._compat_mode, CompatMode::AUTO);
126  _nbytes = std::exchange(o._nbytes, 0);
127  _handle = std::exchange(o._handle, CUfileHandle_t{});
128  return *this;
129  }
130  ~FileHandle() noexcept { close(); }
131 
137  [[nodiscard]] bool closed() const noexcept { return !_initialized; }
138 
142  void close() noexcept
143  {
144  if (closed()) { return; }
145 
146  if (!is_compat_mode_preferred()) { cuFileAPI::instance().HandleDeregister(_handle); }
147  _compat_mode = CompatMode::AUTO;
148  ::close(_fd_direct_off);
149  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
150  _fd_direct_on = -1;
151  _fd_direct_off = -1;
152  _initialized = false;
153  }
154 
163  [[nodiscard]] CUfileHandle_t handle()
164  {
165  if (closed()) { throw CUfileException("File handle is closed"); }
166  if (is_compat_mode_preferred()) {
167  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
168  }
169  return _handle;
170  }
171 
181  [[nodiscard]] int fd() const noexcept { return _fd_direct_off; }
182 
192  [[nodiscard]] int fd_open_flags() const;
193 
201  [[nodiscard]] std::size_t nbytes() const;
202 
233  std::size_t read(void* devPtr_base,
234  std::size_t size,
235  std::size_t file_offset,
236  std::size_t devPtr_offset,
237  bool sync_default_stream = true)
238  {
239  if (is_compat_mode_preferred()) {
240  return detail::posix_device_read(
241  _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
242  }
243  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
244 
245  KVIKIO_NVTX_SCOPED_RANGE("cufileRead()", size);
246  ssize_t ret = cuFileAPI::instance().Read(
247  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
248  CUFILE_CHECK_BYTES_DONE(ret);
249  return ret;
250  }
251 
283  std::size_t write(const void* devPtr_base,
284  std::size_t size,
285  std::size_t file_offset,
286  std::size_t devPtr_offset,
287  bool sync_default_stream = true)
288  {
289  _nbytes = 0; // Invalidate the computed file size
290 
291  if (is_compat_mode_preferred()) {
292  return detail::posix_device_write(
293  _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
294  }
295  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
296 
297  KVIKIO_NVTX_SCOPED_RANGE("cufileWrite()", size);
298  ssize_t ret = cuFileAPI::instance().Write(
299  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
300  if (ret == -1) {
301  throw std::system_error(errno, std::generic_category(), "Unable to write file");
302  }
303  if (ret < -1) {
304  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
305  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
306  }
307  return ret;
308  }
309 
337  std::future<std::size_t> pread(void* buf,
338  std::size_t size,
339  std::size_t file_offset = 0,
340  std::size_t task_size = defaults::task_size(),
341  std::size_t gds_threshold = defaults::gds_threshold(),
342  bool sync_default_stream = true)
343  {
344  KVIKIO_NVTX_MARKER("FileHandle::pread()", size);
345  if (is_host_memory(buf)) {
346  auto op = [this](void* hostPtr_base,
347  std::size_t size,
348  std::size_t file_offset,
349  std::size_t hostPtr_offset) -> std::size_t {
350  char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
351  return detail::posix_host_read<detail::PartialIO::NO>(
352  _fd_direct_off, buf, size, file_offset);
353  };
354 
355  return parallel_io(op, buf, size, file_offset, task_size, 0);
356  }
357 
358  CUcontext ctx = get_context_from_pointer(buf);
359 
360  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
361  if (size < gds_threshold) {
362  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
363  PushAndPopContext c(ctx);
364  return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
365  };
366  return std::async(std::launch::deferred, task);
367  }
368 
369  // Let's synchronize once instead of in each task.
370  if (sync_default_stream && !is_compat_mode_preferred()) {
371  PushAndPopContext c(ctx);
372  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
373  }
374 
375  // Regular case that use the threadpool and run the tasks in parallel
376  auto task = [this, ctx](void* devPtr_base,
377  std::size_t size,
378  std::size_t file_offset,
379  std::size_t devPtr_offset) -> std::size_t {
380  PushAndPopContext c(ctx);
381  return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
382  };
383  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
384  return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
385  }
386 
414  std::future<std::size_t> pwrite(const void* buf,
415  std::size_t size,
416  std::size_t file_offset = 0,
417  std::size_t task_size = defaults::task_size(),
418  std::size_t gds_threshold = defaults::gds_threshold(),
419  bool sync_default_stream = true)
420  {
421  KVIKIO_NVTX_MARKER("FileHandle::pwrite()", size);
422  if (is_host_memory(buf)) {
423  auto op = [this](const void* hostPtr_base,
424  std::size_t size,
425  std::size_t file_offset,
426  std::size_t hostPtr_offset) -> std::size_t {
427  const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
428  return detail::posix_host_write<detail::PartialIO::NO>(
429  _fd_direct_off, buf, size, file_offset);
430  };
431 
432  return parallel_io(op, buf, size, file_offset, task_size, 0);
433  }
434 
435  CUcontext ctx = get_context_from_pointer(buf);
436 
437  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
438  if (size < gds_threshold) {
439  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
440  PushAndPopContext c(ctx);
441  return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
442  };
443  return std::async(std::launch::deferred, task);
444  }
445 
446  // Let's synchronize once instead of in each task.
447  if (sync_default_stream && !is_compat_mode_preferred()) {
448  PushAndPopContext c(ctx);
449  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
450  }
451 
452  // Regular case that use the threadpool and run the tasks in parallel
453  auto op = [this, ctx](const void* devPtr_base,
454  std::size_t size,
455  std::size_t file_offset,
456  std::size_t devPtr_offset) -> std::size_t {
457  PushAndPopContext c(ctx);
458  return write(
459  devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
460  };
461  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
462  return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
463  }
464 
499  void read_async(void* devPtr_base,
500  std::size_t* size_p,
501  off_t* file_offset_p,
502  off_t* devPtr_offset_p,
503  ssize_t* bytes_read_p,
504  CUstream stream)
505  {
506  if (is_compat_mode_preferred_for_async(_compat_mode)) {
507  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
508  *bytes_read_p =
509  static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
510  } else {
511  CUFILE_TRY(cuFileAPI::instance().ReadAsync(
512  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
513  }
514  }
515 
541  [[nodiscard]] StreamFuture read_async(void* devPtr_base,
542  std::size_t size,
543  off_t file_offset = 0,
544  off_t devPtr_offset = 0,
545  CUstream stream = nullptr)
546  {
547  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
548  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
549  ret.get_args();
550  read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
551  return ret;
552  }
553 
589  void write_async(void* devPtr_base,
590  std::size_t* size_p,
591  off_t* file_offset_p,
592  off_t* devPtr_offset_p,
593  ssize_t* bytes_written_p,
594  CUstream stream)
595  {
596  if (is_compat_mode_preferred_for_async(_compat_mode)) {
597  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
598  *bytes_written_p =
599  static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
600  } else {
601  CUFILE_TRY(cuFileAPI::instance().WriteAsync(
602  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
603  }
604  }
605 
631  [[nodiscard]] StreamFuture write_async(void* devPtr_base,
632  std::size_t size,
633  off_t file_offset = 0,
634  off_t devPtr_offset = 0,
635  CUstream stream = nullptr)
636  {
637  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
638  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
639  ret.get_args();
640  write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
641  return ret;
642  }
643 
653  [[nodiscard]] bool is_compat_mode_preferred() const noexcept
654  {
655  return defaults::is_compat_mode_preferred(_compat_mode);
656  }
657 
668  [[nodiscard]] bool is_compat_mode_preferred_for_async() const noexcept
669  {
670  static bool is_extra_symbol_available = is_stream_api_available();
671  static bool is_config_path_empty = config_path().empty();
672  return is_compat_mode_preferred() || !is_extra_symbol_available || is_config_path_empty;
673  }
674 };
675 
676 } // namespace kvikio
Handle of an open file registered with cuFile.
Definition: file_handle.hpp:44
void close() noexcept
Deregister the file and close the two file descriptors.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
bool is_compat_mode_preferred_for_async() const noexcept
Returns true if the compatibility mode is expected to be ON for the asynchronous I/O on this file.
bool is_compat_mode_preferred() const noexcept
Returns true if the compatibility mode is expected to be ON for this file.
bool closed() const noexcept
Whether the file is closed according to its initialization status.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, CompatMode compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Definition: utils.hpp:233
Future of an asynchronous IO operation.
Definition: stream.hpp:48
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:105
static std::size_t task_size()
Get the default task size used for parallel IO operations.
Definition: defaults.hpp:335
static bool is_compat_mode_preferred()
Whether the global compatibility mode from class defaults is expected to be ON.
Definition: defaults.hpp:284
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Definition: defaults.hpp:361
static CompatMode compat_mode()
Return the compatibility mode (a CompatMode value) that the KvikIO library is configured to use.
Definition: defaults.hpp:216
CompatMode
I/O compatibility mode.
Definition: defaults.hpp:38