All Classes Files Functions Enumerations Enumerator Pages
posix_io.hpp
1 /*
2  * Copyright (c) 2022-2025, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <unistd.h>
19 #include <cstddef>
20 #include <cstdlib>
21 #include <map>
22 #include <thread>
23 
24 #include <kvikio/bounce_buffer.hpp>
25 #include <kvikio/error.hpp>
26 #include <kvikio/shim/cuda.hpp>
27 #include <kvikio/utils.hpp>
28 
29 namespace kvikio::detail {
30 
34 enum class IOOperationType : uint8_t {
35  READ,
36  WRITE,
37 };
38 
42 enum class PartialIO : uint8_t {
43  YES,
44  NO,
45 };
46 
54  private:
55  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;
56 
57  public:
58  StreamsByThread() = default;
59 
60  // Here we intentionally do not destroy in the destructor the CUDA resources
61  // (e.g. CUstream) with static storage duration, but instead let them leak
62  // on program termination. This is to prevent undefined behavior in CUDA. See
63  // <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
64  // This also prevents crash (segmentation fault) if clients call
65  // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination.
66  ~StreamsByThread() = default;
67 
68  KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id);
69 
70  static CUstream get();
71 
72  StreamsByThread(const StreamsByThread&) = delete;
73  StreamsByThread& operator=(StreamsByThread const&) = delete;
74  StreamsByThread(StreamsByThread&& o) = delete;
75  StreamsByThread& operator=(StreamsByThread&& o) = delete;
76 };
77 
90 template <IOOperationType Operation, PartialIO PartialIOStatus>
91 ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset)
92 {
93  off_t cur_offset = offset;
94  size_t byte_remaining = count;
95  char* buffer = const_cast<char*>(static_cast<const char*>(buf));
96  while (byte_remaining > 0) {
97  ssize_t nbytes = 0;
98  if constexpr (Operation == IOOperationType::READ) {
99  nbytes = ::pread(fd, buffer, byte_remaining, cur_offset);
100  } else {
101  nbytes = ::pwrite(fd, buffer, byte_remaining, cur_offset);
102  }
103  if (nbytes == -1) {
104  const std::string name = Operation == IOOperationType::READ ? "pread" : "pwrite";
105  if (errno == EBADF) {
106  throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" +
107  KVIKIO_STRINGIFY(__LINE__) + ": Operation not permitted"};
108  }
109  throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" +
110  KVIKIO_STRINGIFY(__LINE__) + ": " + strerror(errno)};
111  }
112  if constexpr (Operation == IOOperationType::READ) {
113  if (nbytes == 0) {
114  throw CUfileException{std::string{"POSIX error on pread at: "} + __FILE__ + ":" +
115  KVIKIO_STRINGIFY(__LINE__) + ": EOF"};
116  }
117  }
118  if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes; }
119  buffer += nbytes; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
120  cur_offset += nbytes;
121  byte_remaining -= nbytes;
122  }
123  return convert_size2ssize(count);
124 }
125 
137 template <IOOperationType Operation>
138 std::size_t posix_device_io(int fd,
139  const void* devPtr_base,
140  std::size_t size,
141  std::size_t file_offset,
142  std::size_t devPtr_offset)
143 {
144  auto alloc = AllocRetain::instance().get();
145  CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
146  off_t cur_file_offset = convert_size2off(file_offset);
147  off_t byte_remaining = convert_size2off(size);
148  const off_t chunk_size2 = convert_size2off(alloc.size());
149 
150  // Get a stream for the current CUDA context and thread
151  CUstream stream = StreamsByThread::get();
152 
153  while (byte_remaining > 0) {
154  const off_t nbytes_requested = std::min(chunk_size2, byte_remaining);
155  ssize_t nbytes_got = nbytes_requested;
156  if constexpr (Operation == IOOperationType::READ) {
157  nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
158  fd, alloc.get(), nbytes_requested, cur_file_offset);
159  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
160  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
161  } else { // Is a write operation
162  CUDA_DRIVER_TRY(
163  cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
164  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
165  posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
166  fd, alloc.get(), nbytes_requested, cur_file_offset);
167  }
168  cur_file_offset += nbytes_got;
169  devPtr += nbytes_got;
170  byte_remaining -= nbytes_got;
171  }
172  return size;
173 }
174 
189 template <PartialIO PartialIOStatus>
190 std::size_t posix_host_read(int fd, void* buf, std::size_t size, std::size_t file_offset)
191 {
192  KVIKIO_NVTX_SCOPED_RANGE("posix_host_read()", size);
193  return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
194  fd, buf, size, convert_size2off(file_offset));
195 }
196 
211 template <PartialIO PartialIOStatus>
212 std::size_t posix_host_write(int fd, const void* buf, std::size_t size, std::size_t file_offset)
213 {
214  KVIKIO_NVTX_SCOPED_RANGE("posix_host_write()", size);
215  return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
216  fd, buf, size, convert_size2off(file_offset));
217 }
218 
232 std::size_t posix_device_read(int fd,
233  const void* devPtr_base,
234  std::size_t size,
235  std::size_t file_offset,
236  std::size_t devPtr_offset);
237 
251 std::size_t posix_device_write(int fd,
252  const void* devPtr_base,
253  std::size_t size,
254  std::size_t file_offset,
255  std::size_t devPtr_offset);
256 
257 } // namespace kvikio::detail
Singleton class to retrieve a CUDA stream for device-host copying.
Definition: posix_io.hpp:53