posix_io.hpp
/*
 * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION.
 * SPDX-License-Identifier: Apache-2.0
 */
#pragma once

#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <map>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include <kvikio/bounce_buffer.hpp>
#include <kvikio/detail/nvtx.hpp>
#include <kvikio/detail/utils.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

namespace kvikio::detail {

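/**
 * @brief Type of I/O operation: read from or write to a file descriptor.
 */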
enum class IOOperationType : uint8_t {
  READ,
  WRITE,
};

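/**
 * @brief Whether a single underlying pread/pwrite call may transfer fewer bytes than
 * requested (YES, return after the first call) or the operation must loop until the
 * full byte count has been processed (NO).
 */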
enum class PartialIO : uint8_t {
  YES,
  NO,
};

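/**
 * @brief Singleton class to retrieve a CUDA stream for device-host copying.
 *
 * Returns a stream unique to the calling thread and its current CUDA context.
 */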
class StreamsByThread {
 private:
  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;

 public:
  StreamsByThread() = default;

  // Here we intentionally do not destroy in the destructor the CUDA resources
  // (e.g. CUstream) with static storage duration, but instead let them leak
  // on program termination. This is to prevent undefined behavior in CUDA. See
  // <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
  // This also prevents a crash (segmentation fault) if clients call
  // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination.
  ~StreamsByThread() = default;

  KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id);

  static CUstream get();

  StreamsByThread(StreamsByThread const&)            = delete;
  StreamsByThread& operator=(StreamsByThread const&) = delete;
  StreamsByThread(StreamsByThread&& o)               = delete;
  StreamsByThread& operator=(StreamsByThread&& o)    = delete;
};

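/**
 * @brief Read or write host memory to or from disk using POSIX pread/pwrite.
 *
 * When `fd_direct_on` is a valid file descriptor opened with O_DIRECT, the page-aligned
 * portion of the transfer uses Direct I/O (through a page-aligned bounce buffer if the
 * user buffer itself is not page-aligned); unaligned prefixes and suffixes fall back to
 * buffered I/O on `fd_direct_off`.
 *
 * @tparam Operation Whether the operation is a read or a write.
 * @tparam PartialIOStatus If YES, return after the first underlying call, which may
 * process fewer bytes than `count`. If NO, loop until all `count` bytes are processed.
 * @tparam BounceBufferPoolType Pool providing page-aligned bounce buffers for Direct I/O.
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param buf Host buffer to read into or write from.
 * @param count Number of bytes to transfer.
 * @param offset File offset.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes processed.
 */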
template <IOOperationType Operation,
          PartialIO PartialIOStatus,
          typename BounceBufferPoolType = PageAlignedBounceBufferPool>
ssize_t posix_host_io(
  int fd_direct_off, void const* buf, size_t count, off_t offset, int fd_direct_on = -1)
{
  auto pread_or_write = [](int fd, void* buf, size_t count, off_t offset) -> ssize_t {
    ssize_t nbytes{};
    if constexpr (Operation == IOOperationType::READ) {
      nbytes = ::pread(fd, buf, count, offset);
    } else {
      nbytes = ::pwrite(fd, buf, count, offset);
    }
    return nbytes;
  };

  off_t cur_offset       = offset;
  size_t bytes_remaining = count;
  char* buffer           = const_cast<char*>(static_cast<char const*>(buf));
  auto const page_size   = get_page_size();

  // Process all bytes in a loop (unless PartialIO::YES returns early)
  while (bytes_remaining > 0) {
    ssize_t nbytes_processed{};

    if (fd_direct_on == -1) {
      // Direct I/O disabled: use buffered I/O for the entire transfer
      nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
    } else {
      // Direct I/O enabled: attempt to use it when alignment allows
      auto const is_cur_offset_aligned = detail::is_aligned(cur_offset, page_size);

      if (!is_cur_offset_aligned) {
        // Handle unaligned prefix: use buffered I/O to reach the next page boundary.
        // This ensures subsequent iterations will have page-aligned offsets.
        auto const aligned_cur_offset = detail::align_up(cur_offset, page_size);
        auto const bytes_requested = std::min(aligned_cur_offset - cur_offset, bytes_remaining);
        nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_requested, cur_offset);
      } else {
        if (bytes_remaining < page_size) {
          // Handle unaligned suffix: remaining bytes are less than a page, use buffered I/O
          nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
        } else {
          // Offset is page-aligned. Now make the transfer size page-aligned too by rounding down
          auto aligned_bytes_remaining = detail::align_down(bytes_remaining, page_size);
          auto const is_buf_aligned    = detail::is_aligned(buffer, page_size);
          auto bytes_requested         = aligned_bytes_remaining;

          if (!is_buf_aligned) {
            // Buffer not page-aligned: use a bounce buffer for Direct I/O
            auto bounce_buffer = BounceBufferPoolType::instance().get();
            auto* aligned_buf  = bounce_buffer.get();
            // Limit the transfer size to the bounce buffer capacity
            bytes_requested = std::min(bytes_requested, bounce_buffer.size());

            if constexpr (Operation == IOOperationType::WRITE) {
              // Copy user data to the aligned bounce buffer before the Direct I/O write
              std::memcpy(aligned_buf, buffer, bytes_requested);
            }

            // Perform Direct I/O using the bounce buffer
            nbytes_processed =
              pread_or_write(fd_direct_on, aligned_buf, bytes_requested, cur_offset);

            if constexpr (Operation == IOOperationType::READ) {
              // Copy data from the bounce buffer to the user buffer after the Direct I/O
              // read, but only on success (errors are handled below)
              if (nbytes_processed > 0) { std::memcpy(buffer, aligned_buf, nbytes_processed); }
            }
          } else {
            // Buffer is page-aligned: perform Direct I/O directly with the user buffer
            nbytes_processed = pread_or_write(fd_direct_on, buffer, bytes_requested, cur_offset);
          }
        }
      }
    }

    // Error handling
    if (nbytes_processed == -1) {
      std::string const name = (Operation == IOOperationType::READ) ? "pread" : "pwrite";
      KVIKIO_EXPECT(errno != EBADF, "POSIX error: Operation not permitted");
      KVIKIO_FAIL("POSIX error on " + name + ": " + strerror(errno));
    }
    if constexpr (Operation == IOOperationType::READ) {
      KVIKIO_EXPECT(nbytes_processed != 0, "POSIX error on pread: EOF");
    }

    // Return early if partial I/O is allowed
    if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes_processed; }

    // Advance to the next segment
    buffer += nbytes_processed;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
    cur_offset += nbytes_processed;
    bytes_remaining -= nbytes_processed;
  }

  return convert_size2ssize(count);
}

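/**
 * @brief Read or write device memory to or from disk by staging through a pinned host
 * bounce buffer, copying between host and device on a per-thread CUDA stream.
 *
 * @tparam Operation Whether the operation is a read or a write.
 * @tparam BounceBufferPoolType Pool providing pinned host bounce buffers.
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param devPtr_base Base address of the device buffer.
 * @param size Number of bytes to transfer.
 * @param file_offset File offset.
 * @param devPtr_offset Offset relative to `devPtr_base`.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes processed, always `size`.
 */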
template <IOOperationType Operation, typename BounceBufferPoolType = CudaPinnedBounceBufferPool>
std::size_t posix_device_io(int fd_direct_off,
                            void const* devPtr_base,
                            std::size_t size,
                            std::size_t file_offset,
                            std::size_t devPtr_offset,
                            int fd_direct_on = -1)
{
  // Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool uses
  // cudaMemHostAlloc, which does not guarantee page alignment.
  if constexpr (std::is_same_v<BounceBufferPoolType, CudaPinnedBounceBufferPool>) {
    KVIKIO_EXPECT(
      fd_direct_on == -1,
      "Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool does not "
      "guarantee page alignment. Use CudaPageAlignedPinnedBounceBufferPool instead.");
  }

  auto bounce_buffer     = BounceBufferPoolType::instance().get();
  CUdeviceptr devPtr     = convert_void2deviceptr(devPtr_base) + devPtr_offset;
  off_t cur_file_offset  = convert_size2off(file_offset);
  off_t bytes_remaining  = convert_size2off(size);
  off_t const chunk_size = convert_size2off(bounce_buffer.size());

  // Get a stream for the current CUDA context and thread
  CUstream stream = StreamsByThread::get();

  while (bytes_remaining > 0) {
    off_t const nbytes_requested = std::min(chunk_size, bytes_remaining);
    ssize_t nbytes_got           = nbytes_requested;
    if constexpr (Operation == IOOperationType::READ) {
      nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
        fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
      CUDA_DRIVER_TRY(
        cudaAPI::instance().MemcpyHtoDAsync(devPtr, bounce_buffer.get(), nbytes_got, stream));
      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
    } else {  // Is a write operation
      CUDA_DRIVER_TRY(
        cudaAPI::instance().MemcpyDtoHAsync(bounce_buffer.get(), devPtr, nbytes_requested, stream));
      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
      posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
        fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
    }
    cur_file_offset += nbytes_got;
    devPtr += nbytes_got;
    bytes_remaining -= nbytes_got;
  }
  return size;
}

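/**
 * @brief Read from disk into host memory using POSIX pread.
 *
 * Direct I/O is attempted only when `fd_direct_on` is a valid file descriptor and
 * `defaults::auto_direct_io_read()` is enabled.
 *
 * @tparam PartialIOStatus If YES, a single call may read fewer bytes than `size`.
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param buf Host buffer to read into.
 * @param size Number of bytes to read.
 * @param file_offset File offset.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes read.
 */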
template <PartialIO PartialIOStatus>
std::size_t posix_host_read(
  int fd_direct_off, void* buf, std::size_t size, std::size_t file_offset, int fd_direct_on = -1)
{
  KVIKIO_NVTX_FUNC_RANGE(size);

  auto cur_fd_direct_on{-1};
  if (fd_direct_on != -1 && defaults::auto_direct_io_read()) { cur_fd_direct_on = fd_direct_on; }

  return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
    fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
}

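/**
 * @brief Write host memory to disk using POSIX pwrite.
 *
 * Direct I/O is attempted only when `fd_direct_on` is a valid file descriptor and
 * `defaults::auto_direct_io_write()` is enabled.
 *
 * @tparam PartialIOStatus If YES, a single call may write fewer bytes than `size`.
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param buf Host buffer to write from.
 * @param size Number of bytes to write.
 * @param file_offset File offset.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes written.
 */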
template <PartialIO PartialIOStatus>
std::size_t posix_host_write(int fd_direct_off,
                             void const* buf,
                             std::size_t size,
                             std::size_t file_offset,
                             int fd_direct_on = -1)
{
  KVIKIO_NVTX_FUNC_RANGE(size);

  auto cur_fd_direct_on{-1};
  if (fd_direct_on != -1 && defaults::auto_direct_io_write()) { cur_fd_direct_on = fd_direct_on; }

  return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
    fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
}

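/**
 * @brief Read from disk into device memory by staging through a pinned host bounce buffer.
 *
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param devPtr_base Base address of the device buffer.
 * @param size Number of bytes to read.
 * @param file_offset File offset.
 * @param devPtr_offset Offset relative to `devPtr_base`.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes read, always `size`.
 */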
std::size_t posix_device_read(int fd_direct_off,
                              void const* devPtr_base,
                              std::size_t size,
                              std::size_t file_offset,
                              std::size_t devPtr_offset,
                              int fd_direct_on = -1);

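/**
 * @brief Write device memory to disk by staging through a pinned host bounce buffer.
 *
 * @param fd_direct_off File descriptor opened without O_DIRECT (buffered I/O).
 * @param devPtr_base Base address of the device buffer.
 * @param size Number of bytes to write.
 * @param file_offset File offset.
 * @param devPtr_offset Offset relative to `devPtr_base`.
 * @param fd_direct_on File descriptor opened with O_DIRECT, or -1 to disable Direct I/O.
 * @return The number of bytes written, always `size`.
 */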
std::size_t posix_device_write(int fd_direct_off,
                               void const* devPtr_base,
                               std::size_t size,
                               std::size_t file_offset,
                               std::size_t devPtr_offset,
                               int fd_direct_on = -1);

}  // namespace kvikio::detail