19 #include <sys/types.h>
24 #include <system_error>
27 #include <kvikio/buffer.hpp>
28 #include <kvikio/cufile/config.hpp>
30 #include <kvikio/error.hpp>
31 #include <kvikio/parallel_operation.hpp>
32 #include <kvikio/posix_io.hpp>
33 #include <kvikio/shim/cufile.hpp>
34 #include <kvikio/stream.hpp>
35 #include <kvikio/utils.hpp>
47 int _fd_direct_on{-1};
48 int _fd_direct_off{-1};
49 bool _initialized{
false};
51 mutable std::size_t _nbytes{0};
52 CUfileHandle_t _handle{};
66 if (!is_batch_and_stream_available()) {
67 if (requested_compat_mode == CompatMode::AUTO) {
return true; }
68 throw std::runtime_error(
"Missing cuFile batch or stream library symbol.");
73 if (config_path().empty()) {
74 if (requested_compat_mode == CompatMode::AUTO) {
return true; }
75 throw std::runtime_error(
"Missing cuFile configuration file.");
/**
 * @brief Default permission bits for newly created files: rw-r--r-- (octal 0644).
 *
 * Owner may read and write; group and others may only read. Group-write
 * (S_IWGRP) is deliberately excluded: including it would produce 0664, which
 * contradicts the constant's name.
 */
static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
105 const std::string& flags =
"r",
115 : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
116 _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
117 _initialized{std::exchange(o._initialized,
false)},
118 _compat_mode{std::exchange(o._compat_mode, CompatMode::AUTO)},
119 _nbytes{std::exchange(o._nbytes, 0)},
120 _handle{std::exchange(o._handle, CUfileHandle_t{})}
123 FileHandle& operator=(FileHandle&& o) noexcept
125 _fd_direct_on = std::exchange(o._fd_direct_on, -1);
126 _fd_direct_off = std::exchange(o._fd_direct_off, -1);
127 _initialized = std::exchange(o._initialized,
false);
128 _compat_mode = std::exchange(o._compat_mode, CompatMode::AUTO);
129 _nbytes = std::exchange(o._nbytes, 0);
130 _handle = std::exchange(o._handle, CUfileHandle_t{});
133 ~FileHandle() noexcept {
close(); }
140 [[nodiscard]]
bool closed() const noexcept {
return !_initialized; }
150 _compat_mode = CompatMode::AUTO;
152 if (_fd_direct_on != -1) {
::close(_fd_direct_on); }
155 _initialized =
false;
170 throw CUfileException(
"The underlying cuFile handle isn't available in compatibility mode");
184 [[nodiscard]]
int fd() const noexcept {
return _fd_direct_off; }
204 [[nodiscard]] std::size_t
nbytes()
const;
236 std::size_t
read(
void* devPtr_base,
238 std::size_t file_offset,
239 std::size_t devPtr_offset,
240 bool sync_default_stream =
true)
243 return detail::posix_device_read(
244 _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
246 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
248 KVIKIO_NVTX_SCOPED_RANGE(
"cufileRead()", size);
249 ssize_t ret = cuFileAPI::instance().Read(
250 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
251 CUFILE_CHECK_BYTES_DONE(ret);
286 std::size_t
write(
const void* devPtr_base,
288 std::size_t file_offset,
289 std::size_t devPtr_offset,
290 bool sync_default_stream =
true)
295 return detail::posix_device_write(
296 _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
298 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
300 KVIKIO_NVTX_SCOPED_RANGE(
"cufileWrite()", size);
301 ssize_t ret = cuFileAPI::instance().Write(
302 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
304 throw std::system_error(errno, std::generic_category(),
"Unable to write file");
307 throw CUfileException(std::string{
"cuFile error at: "} + __FILE__ +
":" +
308 KVIKIO_STRINGIFY(__LINE__) +
": " + CUFILE_ERRSTR(ret));
340 std::future<std::size_t>
pread(
void* buf,
342 std::size_t file_offset = 0,
345 bool sync_default_stream =
true)
347 KVIKIO_NVTX_MARKER(
"FileHandle::pread()", size);
348 if (is_host_memory(buf)) {
349 auto op = [
this](
void* hostPtr_base,
351 std::size_t file_offset,
352 std::size_t hostPtr_offset) -> std::size_t {
353 char* buf =
static_cast<char*
>(hostPtr_base) + hostPtr_offset;
354 return detail::posix_host_read<detail::PartialIO::NO>(
355 _fd_direct_off, buf, size, file_offset);
358 return parallel_io(op, buf, size, file_offset, task_size, 0);
361 CUcontext ctx = get_context_from_pointer(buf);
364 if (size < gds_threshold) {
365 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
367 return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
369 return std::async(std::launch::deferred, task);
375 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
379 auto task = [
this, ctx](
void* devPtr_base,
381 std::size_t file_offset,
382 std::size_t devPtr_offset) -> std::size_t {
384 return read(devPtr_base, size, file_offset, devPtr_offset,
false);
386 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
387 return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
417 std::future<std::size_t>
pwrite(
const void* buf,
419 std::size_t file_offset = 0,
422 bool sync_default_stream =
true)
424 KVIKIO_NVTX_MARKER(
"FileHandle::pwrite()", size);
425 if (is_host_memory(buf)) {
426 auto op = [
this](
const void* hostPtr_base,
428 std::size_t file_offset,
429 std::size_t hostPtr_offset) -> std::size_t {
430 const char* buf =
static_cast<const char*
>(hostPtr_base) + hostPtr_offset;
431 return detail::posix_host_write<detail::PartialIO::NO>(
432 _fd_direct_off, buf, size, file_offset);
435 return parallel_io(op, buf, size, file_offset, task_size, 0);
438 CUcontext ctx = get_context_from_pointer(buf);
441 if (size < gds_threshold) {
442 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
444 return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
446 return std::async(std::launch::deferred, task);
452 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
456 auto op = [
this, ctx](
const void* devPtr_base,
458 std::size_t file_offset,
459 std::size_t devPtr_offset) -> std::size_t {
462 devPtr_base, size, file_offset, devPtr_offset,
false);
464 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
465 return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
504 off_t* file_offset_p,
505 off_t* devPtr_offset_p,
506 ssize_t* bytes_read_p,
510 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
512 static_cast<ssize_t
>(
read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
514 CUFILE_TRY(cuFileAPI::instance().ReadAsync(
515 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
546 off_t file_offset = 0,
547 off_t devPtr_offset = 0,
548 CUstream stream =
nullptr)
550 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
551 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
553 read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
594 off_t* file_offset_p,
595 off_t* devPtr_offset_p,
596 ssize_t* bytes_written_p,
600 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
602 static_cast<ssize_t
>(
write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
604 CUFILE_TRY(cuFileAPI::instance().WriteAsync(
605 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
636 off_t file_offset = 0,
637 off_t devPtr_offset = 0,
638 CUstream stream =
nullptr)
640 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
641 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
643 write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
673 static bool is_extra_symbol_available = is_batch_and_stream_available();
674 static bool is_config_path_empty = config_path().empty();
Handle of an open file registered with cufile.
void close() noexcept
Deregister the file and close the two files.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
bool is_compat_mode_preferred_for_async() const noexcept
Returns true if the compatibility mode is expected to be ON for the asynchronous I/O on this file.
bool is_compat_mode_preferred() const noexcept
Returns true if the compatibility mode is expected to be ON for this file.
bool closed() const noexcept
Whether the file is closed according to its initialization status.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, CompatMode compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Future of an asynchronous IO operation.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
static std::size_t task_size()
Get the default task size used for parallel IO operations.
static bool is_compat_mode_preferred()
Whether the global compatibility mode from class defaults is expected to be ON.
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
static CompatMode compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
CompatMode
I/O compatibility mode.