posix_io.hpp
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION.
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #pragma once
6 
7 #include <unistd.h>
8 
9 #include <cstddef>
10 #include <cstdlib>
11 #include <type_traits>
12 
13 #include <kvikio/bounce_buffer.hpp>
14 #include <kvikio/detail/nvtx.hpp>
15 #include <kvikio/detail/stream.hpp>
16 #include <kvikio/detail/utils.hpp>
17 #include <kvikio/error.hpp>
18 #include <kvikio/shim/cuda.hpp>
19 #include <kvikio/utils.hpp>
20 
21 namespace kvikio::detail {
22 
/**
 * @brief Selects the direction of a POSIX I/O helper at compile time.
 */
enum class IOOperationType : uint8_t {
  READ  = 0,  ///< Transfer data from the file into memory
  WRITE = 1,  ///< Transfer data from memory into the file
};
30 
/**
 * @brief Selects at compile time whether an I/O helper may return after a single system call
 * (possibly having processed fewer bytes than requested) or must process the whole request.
 */
enum class PartialIO : uint8_t {
  YES = 0,  ///< Return after the first successful system call
  NO  = 1,  ///< Keep issuing system calls until every requested byte is processed
};
38 
65 template <IOOperationType Operation,
66  PartialIO PartialIOStatus,
67  typename BounceBufferPoolType = PageAlignedBounceBufferPool>
68 ssize_t posix_host_io(
69  int fd_direct_off, void const* buf, size_t count, off_t offset, int fd_direct_on = -1)
70 {
71  auto pread_or_write = [](int fd, void* buf, size_t count, off_t offset) -> ssize_t {
72  ssize_t nbytes{};
73  if constexpr (Operation == IOOperationType::READ) {
74  nbytes = ::pread(fd, buf, count, offset);
75  } else {
76  nbytes = ::pwrite(fd, buf, count, offset);
77  }
78  return nbytes;
79  };
80 
81  off_t cur_offset = offset;
82  size_t bytes_remaining = count;
83  char* buffer = const_cast<char*>(static_cast<char const*>(buf));
84  auto const page_size = get_page_size();
85 
86  constexpr char const* op_name_bio =
87  (Operation == IOOperationType::READ) ? "Buffered pread" : "Buffered pwrite";
88  constexpr char const* op_name_dio =
89  (Operation == IOOperationType::READ) ? "Direct pread" : "Direct pwrite";
90  constexpr char const* op_name_dio_bounce =
91  (Operation == IOOperationType::READ) ? "Direct pread with bounce" : "Direct pwrite with bounce";
92  constexpr nvtx3::rgb color_bio{255, 128, 128};
93  constexpr nvtx3::rgb color_dio{128, 255, 128};
94  constexpr nvtx3::rgb color_dio_bounce{128, 128, 255};
95 
96  // Process all bytes in a loop (unless PartialIO::YES returns early)
97  while (bytes_remaining > 0) {
98  ssize_t nbytes_processed{};
99 
100  if (fd_direct_on == -1) {
101  KVIKIO_NVTX_SCOPED_RANGE(op_name_bio, bytes_remaining, color_bio);
102  // Direct I/O disabled: use buffered I/O for entire transfer
103  nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
104  } else {
105  // Direct I/O enabled: attempt to use it when alignment allows
106  auto const is_cur_offset_aligned = detail::is_aligned(cur_offset, page_size);
107 
108  if (!is_cur_offset_aligned) {
109  // Handle unaligned prefix: use buffered I/O to reach next page boundary
110  // This ensures subsequent iterations will have page-aligned offsets
111  auto const aligned_cur_offset = detail::align_up(cur_offset, page_size);
112  auto const bytes_requested = std::min(aligned_cur_offset - cur_offset, bytes_remaining);
113  KVIKIO_NVTX_SCOPED_RANGE(op_name_bio, bytes_requested, color_bio);
114  nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_requested, cur_offset);
115  } else {
116  if (bytes_remaining < page_size) {
117  KVIKIO_NVTX_SCOPED_RANGE(op_name_bio, bytes_remaining, color_bio);
118  // Handle unaligned suffix: remaining bytes are less than a page, use buffered I/O
119  nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
120  } else {
121  // Offset is page-aligned. Now make transfer size page-aligned too by rounding down
122  auto aligned_bytes_remaining = detail::align_down(bytes_remaining, page_size);
123  auto const is_buf_aligned = detail::is_aligned(buffer, page_size);
124  auto bytes_requested = aligned_bytes_remaining;
125 
126  if (!is_buf_aligned) {
127  // Buffer not page-aligned: use bounce buffer for Direct I/O
128  auto bounce_buffer = BounceBufferPoolType::instance().get();
129  auto* aligned_buf = bounce_buffer.get();
130  // Limit transfer size to bounce buffer capacity
131  bytes_requested = std::min(bytes_requested, bounce_buffer.size());
132 
133  KVIKIO_NVTX_SCOPED_RANGE(op_name_dio_bounce, bytes_requested, color_dio_bounce);
134 
135  if constexpr (Operation == IOOperationType::WRITE) {
136  // Copy user data to aligned bounce buffer before Direct I/O write
137  std::memcpy(aligned_buf, buffer, bytes_requested);
138  }
139 
140  // Perform Direct I/O using the bounce buffer
141  nbytes_processed =
142  pread_or_write(fd_direct_on, aligned_buf, bytes_requested, cur_offset);
143 
144  if constexpr (Operation == IOOperationType::READ) {
145  // Copy data from bounce buffer to user buffer after Direct I/O read
146  std::memcpy(buffer, aligned_buf, nbytes_processed);
147  }
148  } else {
149  KVIKIO_NVTX_SCOPED_RANGE(op_name_dio, bytes_requested, color_dio);
150  // Buffer is page-aligned: perform Direct I/O directly with user buffer
151  nbytes_processed = pread_or_write(fd_direct_on, buffer, bytes_requested, cur_offset);
152  }
153  }
154  }
155  }
156 
157  // Error handling
158  if (nbytes_processed == -1) {
159  std::string const name = (Operation == IOOperationType::READ) ? "pread" : "pwrite";
160  KVIKIO_EXPECT(errno != EBADF, "POSIX error: Operation not permitted");
161  KVIKIO_FAIL("POSIX error on " + name + ": " + strerror(errno));
162  }
163  if constexpr (Operation == IOOperationType::READ) {
164  KVIKIO_EXPECT(nbytes_processed != 0, "POSIX error on pread: EOF");
165  }
166 
167  // Return early if partial I/O is allowed
168  if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes_processed; }
169 
170  // Advance to next segment
171  buffer += nbytes_processed; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
172  cur_offset += nbytes_processed;
173  bytes_remaining -= nbytes_processed;
174  }
175 
176  return convert_size2ssize(count);
177 }
178 
202 template <IOOperationType Operation, typename BounceBufferPoolType = CudaPinnedBounceBufferPool>
203 std::size_t posix_device_io(int fd_direct_off,
204  void const* devPtr_base,
205  std::size_t size,
206  std::size_t file_offset,
207  std::size_t devPtr_offset,
208  int fd_direct_on = -1)
209 {
210  // Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool uses
211  // cudaMemHostAlloc which does not guarantee page alignment.
212  if (std::is_same_v<BounceBufferPoolType, CudaPinnedBounceBufferPool>) {
214  fd_direct_on == -1,
215  "Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool does not "
216  "guarantee page alignment. Use CudaPageAlignedPinnedBounceBufferPool instead.");
217  }
218 
219  auto bounce_buffer = BounceBufferPoolType::instance().get();
220  CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
221  off_t cur_file_offset = convert_size2off(file_offset);
222  off_t bytes_remaining = convert_size2off(size);
223  off_t const chunk_size2 = convert_size2off(bounce_buffer.size());
224 
225  // Get a stream for the current CUDA context and thread
226  CUstream stream = StreamCachePerThreadAndContext::get();
227 
228  while (bytes_remaining > 0) {
229  off_t const nbytes_requested = std::min(chunk_size2, bytes_remaining);
230  ssize_t nbytes_got = nbytes_requested;
231  if constexpr (Operation == IOOperationType::READ) {
232  nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
233  fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
235  cudaAPI::instance().MemcpyHtoDAsync(devPtr, bounce_buffer.get(), nbytes_got, stream));
236  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
237  } else { // Is a write operation
239  cudaAPI::instance().MemcpyDtoHAsync(bounce_buffer.get(), devPtr, nbytes_requested, stream));
240  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
241  posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
242  fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
243  }
244  cur_file_offset += nbytes_got;
245  devPtr += nbytes_got;
246  bytes_remaining -= nbytes_got;
247  }
248  return size;
249 }
250 
266 template <PartialIO PartialIOStatus>
267 std::size_t posix_host_read(
268  int fd_direct_off, void* buf, std::size_t size, std::size_t file_offset, int fd_direct_on = -1)
269 {
270  KVIKIO_NVTX_FUNC_RANGE(size);
271 
272  auto cur_fd_direct_on{-1};
273  if (fd_direct_on != -1 && defaults::auto_direct_io_read()) { cur_fd_direct_on = fd_direct_on; }
274 
275  return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
276  fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
277 }
278 
294 template <PartialIO PartialIOStatus>
295 std::size_t posix_host_write(int fd_direct_off,
296  void const* buf,
297  std::size_t size,
298  std::size_t file_offset,
299  int fd_direct_on = -1)
300 {
301  KVIKIO_NVTX_FUNC_RANGE(size);
302 
303  auto cur_fd_direct_on{-1};
304  if (fd_direct_on != -1 && defaults::auto_direct_io_write()) { cur_fd_direct_on = fd_direct_on; }
305 
306  return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
307  fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
308 }
309 
/**
 * @brief Read from a file into device memory using POSIX I/O (declaration only; defined
 * elsewhere).
 *
 * @param fd_direct_off File descriptor used for buffered I/O.
 * @param devPtr_base Base address of the device allocation.
 * @param size Number of bytes to read.
 * @param file_offset File offset at which to start reading.
 * @param devPtr_offset Byte offset relative to `devPtr_base`.
 * @param fd_direct_on File descriptor used for Direct I/O; defaults to -1.
 * @return Number of bytes read (presumably equal to `size`; confirm against the definition).
 */
std::size_t posix_device_read(
  int fd_direct_off,
  void const* devPtr_base,
  std::size_t size,
  std::size_t file_offset,
  std::size_t devPtr_offset,
  int fd_direct_on = -1);
330 
/**
 * @brief Write device memory to a file using POSIX I/O (declaration only; defined elsewhere).
 *
 * @param fd_direct_off File descriptor used for buffered I/O.
 * @param devPtr_base Base address of the device allocation.
 * @param size Number of bytes to write.
 * @param file_offset File offset at which to start writing.
 * @param devPtr_offset Byte offset relative to `devPtr_base`.
 * @param fd_direct_on File descriptor used for Direct I/O; defaults to -1.
 * @return Number of bytes written (presumably equal to `size`; confirm against the definition).
 */
std::size_t posix_device_write(
  int fd_direct_off,
  void const* devPtr_base,
  std::size_t size,
  std::size_t file_offset,
  std::size_t devPtr_offset,
  int fd_direct_on = -1);
351 
352 } // namespace kvikio::detail
static bool auto_direct_io_read()
Check if Direct I/O is enabled for POSIX reads.
static bool auto_direct_io_write()
Check if Direct I/O is enabled for POSIX writes.
static KVIKIO_EXPORT CUstream get()
Get or create a CUDA stream for the current context and thread.
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
Definition: error.hpp:207
#define CUDA_DRIVER_TRY(...)
Error checking macro for CUDA driver API functions.
Definition: error.hpp:59
#define KVIKIO_FAIL(...)
Indicates that an erroneous code path has been taken.
Definition: error.hpp:243
BounceBufferPool< PageAlignedAllocator > PageAlignedBounceBufferPool
Bounce buffer pool using page-aligned host memory.