file_handle.hpp
1 /*
2  * Copyright (c) 2021-2024, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <sys/stat.h>
19 #include <sys/types.h>
20 
21 #include <cstddef>
22 #include <cstdlib>
23 #include <stdexcept>
24 #include <system_error>
25 #include <utility>
26 
27 #include <kvikio/buffer.hpp>
28 #include <kvikio/cufile/config.hpp>
29 #include <kvikio/defaults.hpp>
30 #include <kvikio/error.hpp>
31 #include <kvikio/parallel_operation.hpp>
32 #include <kvikio/posix_io.hpp>
33 #include <kvikio/shim/cufile.hpp>
34 #include <kvikio/stream.hpp>
35 #include <kvikio/utils.hpp>
36 
37 namespace kvikio {
38 
44 class FileHandle {
45  private:
46  // We use two file descriptors, one opened with the O_DIRECT flag and one without.
47  int _fd_direct_on{-1};
48  int _fd_direct_off{-1};
49  bool _initialized{false};
50  CompatMode _compat_mode{CompatMode::AUTO};
51  mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
52  CUfileHandle_t _handle{};
53 
63  bool is_compat_mode_preferred_for_async(CompatMode requested_compat_mode)
64  {
65  if (!defaults::is_compat_mode_preferred(requested_compat_mode)) {
66  if (!is_batch_and_stream_available()) {
67  if (requested_compat_mode == CompatMode::AUTO) { return true; }
68  throw std::runtime_error("Missing cuFile batch or stream library symbol.");
69  }
70 
71  // When checking for availability, we also check if cuFile's config file exist. This is
72  // because even when the stream API is available, it doesn't work if no config file exist.
73  if (config_path().empty()) {
74  if (requested_compat_mode == CompatMode::AUTO) { return true; }
75  throw std::runtime_error("Missing cuFile configuration file.");
76  }
77 
78  return false;
79  }
80 
81  return true;
82  }
83 
84  public:
85  static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
86  FileHandle() noexcept = default;
87 
104  FileHandle(const std::string& file_path,
105  const std::string& flags = "r",
106  mode_t mode = m644,
107  CompatMode compat_mode = defaults::compat_mode());
108 
112  FileHandle(const FileHandle&) = delete;
113  FileHandle& operator=(FileHandle const&) = delete;
114  FileHandle(FileHandle&& o) noexcept
115  : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
116  _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
117  _initialized{std::exchange(o._initialized, false)},
118  _compat_mode{std::exchange(o._compat_mode, CompatMode::AUTO)},
119  _nbytes{std::exchange(o._nbytes, 0)},
120  _handle{std::exchange(o._handle, CUfileHandle_t{})}
121  {
122  }
123  FileHandle& operator=(FileHandle&& o) noexcept
124  {
125  _fd_direct_on = std::exchange(o._fd_direct_on, -1);
126  _fd_direct_off = std::exchange(o._fd_direct_off, -1);
127  _initialized = std::exchange(o._initialized, false);
128  _compat_mode = std::exchange(o._compat_mode, CompatMode::AUTO);
129  _nbytes = std::exchange(o._nbytes, 0);
130  _handle = std::exchange(o._handle, CUfileHandle_t{});
131  return *this;
132  }
133  ~FileHandle() noexcept { close(); }
134 
140  [[nodiscard]] bool closed() const noexcept { return !_initialized; }
141 
145  void close() noexcept
146  {
147  if (closed()) { return; }
148 
149  if (!is_compat_mode_preferred()) { cuFileAPI::instance().HandleDeregister(_handle); }
150  _compat_mode = CompatMode::AUTO;
151  ::close(_fd_direct_off);
152  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
153  _fd_direct_on = -1;
154  _fd_direct_off = -1;
155  _initialized = false;
156  }
157 
166  [[nodiscard]] CUfileHandle_t handle()
167  {
168  if (closed()) { throw CUfileException("File handle is closed"); }
169  if (is_compat_mode_preferred()) {
170  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
171  }
172  return _handle;
173  }
174 
184  [[nodiscard]] int fd() const noexcept { return _fd_direct_off; }
185 
195  [[nodiscard]] int fd_open_flags() const;
196 
204  [[nodiscard]] std::size_t nbytes() const;
205 
236  std::size_t read(void* devPtr_base,
237  std::size_t size,
238  std::size_t file_offset,
239  std::size_t devPtr_offset,
240  bool sync_default_stream = true)
241  {
242  if (is_compat_mode_preferred()) {
243  return detail::posix_device_read(
244  _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
245  }
246  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
247 
248  KVIKIO_NVTX_SCOPED_RANGE("cufileRead()", size);
249  ssize_t ret = cuFileAPI::instance().Read(
250  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
251  CUFILE_CHECK_BYTES_DONE(ret);
252  return ret;
253  }
254 
286  std::size_t write(const void* devPtr_base,
287  std::size_t size,
288  std::size_t file_offset,
289  std::size_t devPtr_offset,
290  bool sync_default_stream = true)
291  {
292  _nbytes = 0; // Invalidate the computed file size
293 
294  if (is_compat_mode_preferred()) {
295  return detail::posix_device_write(
296  _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
297  }
298  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
299 
300  KVIKIO_NVTX_SCOPED_RANGE("cufileWrite()", size);
301  ssize_t ret = cuFileAPI::instance().Write(
302  _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
303  if (ret == -1) {
304  throw std::system_error(errno, std::generic_category(), "Unable to write file");
305  }
306  if (ret < -1) {
307  throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
308  KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(ret));
309  }
310  return ret;
311  }
312 
340  std::future<std::size_t> pread(void* buf,
341  std::size_t size,
342  std::size_t file_offset = 0,
343  std::size_t task_size = defaults::task_size(),
344  std::size_t gds_threshold = defaults::gds_threshold(),
345  bool sync_default_stream = true)
346  {
347  KVIKIO_NVTX_MARKER("FileHandle::pread()", size);
348  if (is_host_memory(buf)) {
349  auto op = [this](void* hostPtr_base,
350  std::size_t size,
351  std::size_t file_offset,
352  std::size_t hostPtr_offset) -> std::size_t {
353  char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
354  return detail::posix_host_read<detail::PartialIO::NO>(
355  _fd_direct_off, buf, size, file_offset);
356  };
357 
358  return parallel_io(op, buf, size, file_offset, task_size, 0);
359  }
360 
361  CUcontext ctx = get_context_from_pointer(buf);
362 
363  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
364  if (size < gds_threshold) {
365  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
366  PushAndPopContext c(ctx);
367  return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
368  };
369  return std::async(std::launch::deferred, task);
370  }
371 
372  // Let's synchronize once instead of in each task.
373  if (sync_default_stream && !is_compat_mode_preferred()) {
374  PushAndPopContext c(ctx);
375  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
376  }
377 
378  // Regular case that use the threadpool and run the tasks in parallel
379  auto task = [this, ctx](void* devPtr_base,
380  std::size_t size,
381  std::size_t file_offset,
382  std::size_t devPtr_offset) -> std::size_t {
383  PushAndPopContext c(ctx);
384  return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
385  };
386  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
387  return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
388  }
389 
417  std::future<std::size_t> pwrite(const void* buf,
418  std::size_t size,
419  std::size_t file_offset = 0,
420  std::size_t task_size = defaults::task_size(),
421  std::size_t gds_threshold = defaults::gds_threshold(),
422  bool sync_default_stream = true)
423  {
424  KVIKIO_NVTX_MARKER("FileHandle::pwrite()", size);
425  if (is_host_memory(buf)) {
426  auto op = [this](const void* hostPtr_base,
427  std::size_t size,
428  std::size_t file_offset,
429  std::size_t hostPtr_offset) -> std::size_t {
430  const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
431  return detail::posix_host_write<detail::PartialIO::NO>(
432  _fd_direct_off, buf, size, file_offset);
433  };
434 
435  return parallel_io(op, buf, size, file_offset, task_size, 0);
436  }
437 
438  CUcontext ctx = get_context_from_pointer(buf);
439 
440  // Shortcut that circumvent the threadpool and use the POSIX backend directly.
441  if (size < gds_threshold) {
442  auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
443  PushAndPopContext c(ctx);
444  return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
445  };
446  return std::async(std::launch::deferred, task);
447  }
448 
449  // Let's synchronize once instead of in each task.
450  if (sync_default_stream && !is_compat_mode_preferred()) {
451  PushAndPopContext c(ctx);
452  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
453  }
454 
455  // Regular case that use the threadpool and run the tasks in parallel
456  auto op = [this, ctx](const void* devPtr_base,
457  std::size_t size,
458  std::size_t file_offset,
459  std::size_t devPtr_offset) -> std::size_t {
460  PushAndPopContext c(ctx);
461  return write(
462  devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
463  };
464  auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
465  return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
466  }
467 
502  void read_async(void* devPtr_base,
503  std::size_t* size_p,
504  off_t* file_offset_p,
505  off_t* devPtr_offset_p,
506  ssize_t* bytes_read_p,
507  CUstream stream)
508  {
509  if (is_compat_mode_preferred_for_async(_compat_mode)) {
510  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
511  *bytes_read_p =
512  static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
513  } else {
514  CUFILE_TRY(cuFileAPI::instance().ReadAsync(
515  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
516  }
517  }
518 
544  [[nodiscard]] StreamFuture read_async(void* devPtr_base,
545  std::size_t size,
546  off_t file_offset = 0,
547  off_t devPtr_offset = 0,
548  CUstream stream = nullptr)
549  {
550  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
551  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
552  ret.get_args();
553  read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
554  return ret;
555  }
556 
592  void write_async(void* devPtr_base,
593  std::size_t* size_p,
594  off_t* file_offset_p,
595  off_t* devPtr_offset_p,
596  ssize_t* bytes_written_p,
597  CUstream stream)
598  {
599  if (is_compat_mode_preferred_for_async(_compat_mode)) {
600  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
601  *bytes_written_p =
602  static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
603  } else {
604  CUFILE_TRY(cuFileAPI::instance().WriteAsync(
605  _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
606  }
607  }
608 
634  [[nodiscard]] StreamFuture write_async(void* devPtr_base,
635  std::size_t size,
636  off_t file_offset = 0,
637  off_t devPtr_offset = 0,
638  CUstream stream = nullptr)
639  {
640  StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
641  auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
642  ret.get_args();
643  write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
644  return ret;
645  }
646 
656  [[nodiscard]] bool is_compat_mode_preferred() const noexcept
657  {
658  return defaults::is_compat_mode_preferred(_compat_mode);
659  }
660 
671  [[nodiscard]] bool is_compat_mode_preferred_for_async() const noexcept
672  {
673  static bool is_extra_symbol_available = is_batch_and_stream_available();
674  static bool is_config_path_empty = config_path().empty();
675  return is_compat_mode_preferred() || !is_extra_symbol_available || is_config_path_empty;
676  }
677 };
678 
679 } // namespace kvikio
Handle of an open file registered with cuFile.
Definition: file_handle.hpp:44
void close() noexcept
Deregister the file from cuFile (when not in compatibility mode) and close both file descriptors.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
bool is_compat_mode_preferred_for_async() const noexcept
Returns true if the compatibility mode is expected to be ON for the asynchronous I/O on this file.
bool is_compat_mode_preferred() const noexcept
Returns true if the compatibility mode is expected to be ON for this file.
bool closed() const noexcept
Whether the file is closed according to its initialization status.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, CompatMode compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
int fd() const noexcept
Get the file descriptor opened without the O_DIRECT flag.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Definition: utils.hpp:233
Future of an asynchronous IO operation.
Definition: stream.hpp:48
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
Definition: stream.hpp:105
static std::size_t task_size()
Get the default task size used for parallel IO operations.
Definition: defaults.hpp:335
static bool is_compat_mode_preferred()
Whether the global compatibility mode from class defaults is expected to be ON.
Definition: defaults.hpp:284
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Definition: defaults.hpp:361
static CompatMode compat_mode()
Return the global compatibility mode (a CompatMode value) used by the KvikIO library.
Definition: defaults.hpp:216
CompatMode
I/O compatibility mode.
Definition: defaults.hpp:38