#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include <kvikio/bounce_buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/detail/nvtx.hpp>
#include <kvikio/detail/utils.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>
namespace kvikio::detail {

// Whether the I/O operation reads from or writes to the file.
enum class IOOperationType : uint8_t { READ, WRITE };

// Whether a partially completed I/O operation is acceptable to the caller.
enum class PartialIO : uint8_t { YES, NO };
// Singleton class to retrieve a CUDA stream for device-host copying.
class StreamsByThread {
 private:
  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;

 public:
  KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id);

  // Get the stream assigned to the current CUDA context and calling thread.
  static CUstream get();
};
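// Usage sketch (illustrative; not part of the original header): the zero-argument overload is the
// one used by posix_device_io() below to pick a stream for the current context and thread.
//
//   CUstream stream = StreamsByThread::get();
//   CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoHAsync(dst, src, nbytes, stream));
//   CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));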
/**
 * @brief Read or write host memory to or from disk using POSIX pread/pwrite.
 *
 * When `fd_direct_on` is a valid file descriptor (opened with O_DIRECT), page-aligned parts of
 * the request are issued through it; unaligned head and tail parts fall back to `fd_direct_off`.
 */
template <IOOperationType Operation,
          PartialIO PartialIOStatus,
          // Assumed default: any pool that hands out page-aligned buffers works here.
          typename BounceBufferPoolType = CudaPageAlignedPinnedBounceBufferPool>
ssize_t posix_host_io(
  int fd_direct_off, void const* buf, size_t count, off_t offset, int fd_direct_on = -1)
{
  auto pread_or_write = [](int fd, void* buf, size_t count, off_t offset) -> ssize_t {
    ssize_t nbytes{};
    if constexpr (Operation == IOOperationType::READ) {
      nbytes = ::pread(fd, buf, count, offset);
    } else {
      nbytes = ::pwrite(fd, buf, count, offset);
    }
    return nbytes;
  };

  off_t cur_offset       = offset;
  size_t bytes_remaining = count;
  char* buffer           = const_cast<char*>(static_cast<char const*>(buf));
  auto const page_size   = get_page_size();

  while (bytes_remaining > 0) {
    ssize_t nbytes_processed{};

    if (fd_direct_on == -1) {
      // Direct I/O is unavailable: use the regular file descriptor for everything.
      nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
    } else {
      auto const is_cur_offset_aligned = detail::is_aligned(cur_offset, page_size);

      if (!is_cur_offset_aligned) {
        // Process the unaligned head (up to the next page boundary) without Direct I/O.
        auto const aligned_cur_offset = detail::align_up(cur_offset, page_size);
        auto const bytes_requested =
          std::min(static_cast<size_t>(aligned_cur_offset - cur_offset), bytes_remaining);
        nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_requested, cur_offset);
      } else if (bytes_remaining < page_size) {
        // Less than one page remains: process the tail without Direct I/O.
        nbytes_processed = pread_or_write(fd_direct_off, buffer, bytes_remaining, cur_offset);
      } else {
        // The offset is page-aligned and at least one full page remains: use Direct I/O.
        auto aligned_bytes_remaining = detail::align_down(bytes_remaining, page_size);
        auto const is_buf_aligned    = detail::is_aligned(buffer, page_size);
        auto bytes_requested         = aligned_bytes_remaining;

        if (!is_buf_aligned) {
          // The user buffer is not page-aligned: stage the transfer through a bounce buffer.
          auto bounce_buffer = BounceBufferPoolType::instance().get();
          auto* aligned_buf  = bounce_buffer.get();
          bytes_requested    = std::min(bytes_requested, bounce_buffer.size());

          if constexpr (Operation == IOOperationType::WRITE) {
            std::memcpy(aligned_buf, buffer, bytes_requested);
          }

          nbytes_processed = pread_or_write(fd_direct_on, aligned_buf, bytes_requested, cur_offset);

          if constexpr (Operation == IOOperationType::READ) {
            // Only copy back on success; the error path is handled below.
            if (nbytes_processed > 0) { std::memcpy(buffer, aligned_buf, nbytes_processed); }
          }
        } else {
          nbytes_processed = pread_or_write(fd_direct_on, buffer, bytes_requested, cur_offset);
        }
      }
    }

    if (nbytes_processed == -1) {
      std::string const name = (Operation == IOOperationType::READ) ? "pread" : "pwrite";
      KVIKIO_EXPECT(errno != EBADF, "POSIX error: Operation not permitted");
      KVIKIO_FAIL("POSIX error on " + name + ": " + strerror(errno));
    }
    if constexpr (Operation == IOOperationType::READ) {
      KVIKIO_EXPECT(nbytes_processed != 0, "POSIX error on pread: EOF");
    }
    if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes_processed; }

    buffer += nbytes_processed;
    cur_offset += nbytes_processed;
    bytes_remaining -= nbytes_processed;
  }
  return convert_size2ssize(count);
}
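// Usage sketch (illustrative; not from the original header). Reads a whole host buffer through
// the regular descriptor only; the file path and `fd` are assumptions made for the example.
//
//   std::vector<char> host_buf(1 << 20);
//   int fd = ::open("/path/to/file", O_RDONLY);  // hypothetical file
//   ssize_t n = posix_host_io<IOOperationType::READ, PartialIO::NO>(
//     fd, host_buf.data(), host_buf.size(), /*offset=*/0);
//   // With PartialIO::NO the call loops internally, so n equals host_buf.size() on success.
//   ::close(fd);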
/**
 * @brief Read or write device memory to or from disk by staging through a host bounce buffer.
 */
template <IOOperationType Operation, typename BounceBufferPoolType = CudaPinnedBounceBufferPool>
std::size_t posix_device_io(int fd_direct_off,
                            void const* devPtr_base,
                            std::size_t size,
                            std::size_t file_offset,
                            std::size_t devPtr_offset,
                            int fd_direct_on = -1)
{
  if (fd_direct_on != -1 && std::is_same_v<BounceBufferPoolType, CudaPinnedBounceBufferPool>) {
    KVIKIO_FAIL(
      "Direct I/O requires page-aligned bounce buffers. CudaPinnedBounceBufferPool does not "
      "guarantee page alignment. Use CudaPageAlignedPinnedBounceBufferPool instead.");
  }

  auto bounce_buffer     = BounceBufferPoolType::instance().get();
  CUdeviceptr devPtr     = convert_void2deviceptr(devPtr_base) + devPtr_offset;
  off_t cur_file_offset  = convert_size2off(file_offset);
  off_t bytes_remaining  = convert_size2off(size);
  off_t const chunk_size = convert_size2off(bounce_buffer.size());

  // Get a CUDA stream assigned to the current CUDA context and thread.
  CUstream stream = StreamsByThread::get();

  while (bytes_remaining > 0) {
    off_t const nbytes_requested = std::min(chunk_size, bytes_remaining);
    ssize_t nbytes_got           = nbytes_requested;
    if constexpr (Operation == IOOperationType::READ) {
      // Read into the bounce buffer, then copy host-to-device.
      nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
        fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
      CUDA_DRIVER_TRY(
        cudaAPI::instance().MemcpyHtoDAsync(devPtr, bounce_buffer.get(), nbytes_got, stream));
      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
    } else {
      // Copy device-to-host into the bounce buffer, then write it to disk.
      CUDA_DRIVER_TRY(
        cudaAPI::instance().MemcpyDtoHAsync(bounce_buffer.get(), devPtr, nbytes_requested, stream));
      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
      posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
        fd_direct_off, bounce_buffer.get(), nbytes_requested, cur_file_offset, fd_direct_on);
    }
    cur_file_offset += nbytes_got;
    devPtr += nbytes_got;
    bytes_remaining -= nbytes_got;
  }
  return size;
}
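// Usage sketch (illustrative; not from the original header). Writes `nbytes` of device memory to
// disk by staging through the pinned bounce-buffer pool; `fd`, `dev_ptr`, and `nbytes` are
// assumptions made for the example.
//
//   // dev_ptr points to device memory previously allocated with cuMemAlloc/cudaMalloc.
//   std::size_t written = posix_device_io<IOOperationType::WRITE>(
//     fd, dev_ptr, nbytes, /*file_offset=*/0, /*devPtr_offset=*/0);
//   // No fd_direct_on is passed, so the default CudaPinnedBounceBufferPool is acceptable here.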
/**
 * @brief Read from disk into host memory using POSIX.
 */
template <PartialIO PartialIOStatus>
std::size_t posix_host_read(
  int fd_direct_off, void* buf, std::size_t size, std::size_t file_offset, int fd_direct_on = -1)
{
  KVIKIO_NVTX_FUNC_RANGE(size);
  auto cur_fd_direct_on{-1};
  // Assumed configuration hook: pass the O_DIRECT descriptor through only when Direct I/O is
  // enabled for POSIX reads.
  if (defaults::auto_direct_io_read()) { cur_fd_direct_on = fd_direct_on; }
  return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
    fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
}
/**
 * @brief Write host memory to disk using POSIX.
 */
template <PartialIO PartialIOStatus>
std::size_t posix_host_write(int fd_direct_off,
                             void const* buf,
                             std::size_t size,
                             std::size_t file_offset,
                             int fd_direct_on = -1)
{
  KVIKIO_NVTX_FUNC_RANGE(size);
  auto cur_fd_direct_on{-1};
  // Assumed configuration hook, mirroring the read path: pass the O_DIRECT descriptor through
  // only when Direct I/O is enabled for POSIX writes.
  if (defaults::auto_direct_io_write()) { cur_fd_direct_on = fd_direct_on; }
  return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
    fd_direct_off, buf, size, convert_size2off(file_offset), cur_fd_direct_on);
}
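// Usage sketch (illustrative; not from the original header). The second descriptor is the same
// file opened with O_DIRECT; whether it is actually used is gated by the auto_direct_io_* checks
// above (assumed configuration hooks). Paths and descriptors here are hypothetical.
//
//   int fd        = ::open("/path/to/file", O_RDONLY);
//   int fd_direct = ::open("/path/to/file", O_RDONLY | O_DIRECT);
//   std::size_t n = posix_host_read<PartialIO::NO>(
//     fd, host_buf.data(), host_buf.size(), /*file_offset=*/0, fd_direct);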
/**
 * @brief Read from disk into device memory using POSIX.
 */
std::size_t posix_device_read(int fd_direct_off,
                              void const* devPtr_base,
                              std::size_t size,
                              std::size_t file_offset,
                              std::size_t devPtr_offset,
                              int fd_direct_on = -1);

/**
 * @brief Write device memory to disk using POSIX.
 */
std::size_t posix_device_write(int fd_direct_off,
                               void const* devPtr_base,
                               std::size_t size,
                               std::size_t file_offset,
                               std::size_t devPtr_offset,
                               int fd_direct_on = -1);

}  // namespace kvikio::detail