All Classes Namespaces Functions Enumerations Enumerator Modules Pages
posix_io.hpp
1 /*
2  * Copyright (c) 2022-2025, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <unistd.h>
19 #include <cstddef>
20 #include <cstdlib>
21 #include <map>
22 #include <thread>
23 
24 #include <kvikio/bounce_buffer.hpp>
25 #include <kvikio/error.hpp>
26 #include <kvikio/nvtx.hpp>
27 #include <kvikio/shim/cuda.hpp>
28 #include <kvikio/utils.hpp>
29 
30 namespace kvikio::detail {
31 
35 enum class IOOperationType : uint8_t {
36  READ,
37  WRITE,
38 };
39 
43 enum class PartialIO : uint8_t {
44  YES,
45  NO,
46 };
47 
55  private:
56  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;
57 
58  public:
59  StreamsByThread() = default;
60 
61  // Here we intentionally do not destroy in the destructor the CUDA resources
62  // (e.g. CUstream) with static storage duration, but instead let them leak
63  // on program termination. This is to prevent undefined behavior in CUDA. See
64  // <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
65  // This also prevents crash (segmentation fault) if clients call
66  // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination.
67  ~StreamsByThread() = default;
68 
69  KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id);
70 
71  static CUstream get();
72 
73  StreamsByThread(StreamsByThread const&) = delete;
74  StreamsByThread& operator=(StreamsByThread const&) = delete;
75  StreamsByThread(StreamsByThread&& o) = delete;
76  StreamsByThread& operator=(StreamsByThread&& o) = delete;
77 };
78 
91 template <IOOperationType Operation, PartialIO PartialIOStatus>
92 ssize_t posix_host_io(int fd, void const* buf, size_t count, off_t offset)
93 {
94  off_t cur_offset = offset;
95  size_t byte_remaining = count;
96  char* buffer = const_cast<char*>(static_cast<char const*>(buf));
97  while (byte_remaining > 0) {
98  ssize_t nbytes = 0;
99  if constexpr (Operation == IOOperationType::READ) {
100  nbytes = ::pread(fd, buffer, byte_remaining, cur_offset);
101  } else {
102  nbytes = ::pwrite(fd, buffer, byte_remaining, cur_offset);
103  }
104  if (nbytes == -1) {
105  std::string const name = (Operation == IOOperationType::READ) ? "pread" : "pwrite";
106  KVIKIO_EXPECT(errno != EBADF, "POSIX error: Operation not permitted");
107  KVIKIO_FAIL("POSIX error on " + name + ": " + strerror(errno));
108  }
109  if constexpr (Operation == IOOperationType::READ) {
110  KVIKIO_EXPECT(nbytes != 0, "POSIX error on pread: EOF");
111  }
112  if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes; }
113  buffer += nbytes; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
114  cur_offset += nbytes;
115  byte_remaining -= nbytes;
116  }
117  return convert_size2ssize(count);
118 }
119 
131 template <IOOperationType Operation>
132 std::size_t posix_device_io(int fd,
133  void const* devPtr_base,
134  std::size_t size,
135  std::size_t file_offset,
136  std::size_t devPtr_offset)
137 {
138  auto alloc = AllocRetain::instance().get();
139  CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
140  off_t cur_file_offset = convert_size2off(file_offset);
141  off_t byte_remaining = convert_size2off(size);
142  off_t const chunk_size2 = convert_size2off(alloc.size());
143 
144  // Get a stream for the current CUDA context and thread
145  CUstream stream = StreamsByThread::get();
146 
147  while (byte_remaining > 0) {
148  off_t const nbytes_requested = std::min(chunk_size2, byte_remaining);
149  ssize_t nbytes_got = nbytes_requested;
150  if constexpr (Operation == IOOperationType::READ) {
151  nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
152  fd, alloc.get(), nbytes_requested, cur_file_offset);
153  CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
154  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
155  } else { // Is a write operation
157  cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
158  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
159  posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
160  fd, alloc.get(), nbytes_requested, cur_file_offset);
161  }
162  cur_file_offset += nbytes_got;
163  devPtr += nbytes_got;
164  byte_remaining -= nbytes_got;
165  }
166  return size;
167 }
168 
183 template <PartialIO PartialIOStatus>
184 std::size_t posix_host_read(int fd, void* buf, std::size_t size, std::size_t file_offset)
185 {
186  KVIKIO_NVTX_SCOPED_RANGE("posix_host_read()", size);
187  return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
188  fd, buf, size, convert_size2off(file_offset));
189 }
190 
205 template <PartialIO PartialIOStatus>
206 std::size_t posix_host_write(int fd, void const* buf, std::size_t size, std::size_t file_offset)
207 {
208  KVIKIO_NVTX_SCOPED_RANGE("posix_host_write()", size);
209  return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
210  fd, buf, size, convert_size2off(file_offset));
211 }
212 
226 std::size_t posix_device_read(int fd,
227  void const* devPtr_base,
228  std::size_t size,
229  std::size_t file_offset,
230  std::size_t devPtr_offset);
231 
245 std::size_t posix_device_write(int fd,
246  void const* devPtr_base,
247  std::size_t size,
248  std::size_t file_offset,
249  std::size_t devPtr_offset);
250 
251 } // namespace kvikio::detail
Singleton class to retrieve a CUDA stream for device-host copying.
Definition: posix_io.hpp:54
#define KVIKIO_EXPECT(...)
Macro for checking pre-conditions or conditions that throws an exception when a condition is violated...
Definition: error.hpp:216
#define CUDA_DRIVER_TRY(...)
Error checking macro for CUDA driver API functions.
Definition: error.hpp:68
#define KVIKIO_FAIL(...)
Indicates that an erroneous code path has been taken.
Definition: error.hpp:252