19 #include <sys/types.h>
24 #include <system_error>
27 #include <kvikio/buffer.hpp>
28 #include <kvikio/cufile/config.hpp>
30 #include <kvikio/error.hpp>
31 #include <kvikio/parallel_operation.hpp>
32 #include <kvikio/posix_io.hpp>
33 #include <kvikio/shim/cufile.hpp>
34 #include <kvikio/stream.hpp>
35 #include <kvikio/utils.hpp>
47 int _fd_direct_on{-1};
48 int _fd_direct_off{-1};
49 bool _initialized{
false};
51 mutable std::size_t _nbytes{0};
52 CUfileHandle_t _handle{};
67 if (!is_stream_api_available()) {
68 if (requested_compat_mode == CompatMode::AUTO) {
return true; }
69 throw std::runtime_error(
"Missing the cuFile stream api.");
74 if (config_path().empty()) {
75 if (requested_compat_mode == CompatMode::AUTO) {
return true; }
76 throw std::runtime_error(
"Missing cuFile configuration file.");
/// Default permission bits for files created by FileHandle: 0644 (rw-r--r--).
/// The owner may read and write; group and others may only read. Group write
/// (S_IWGRP) is deliberately excluded, since including it would yield 0664 and
/// contradict the constant's name.
static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
102 const std::string& flags =
"r",
112 : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
113 _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
114 _initialized{std::exchange(o._initialized,
false)},
115 _compat_mode{std::exchange(o._compat_mode, CompatMode::AUTO)},
116 _nbytes{std::exchange(o._nbytes, 0)},
117 _handle{std::exchange(o._handle, CUfileHandle_t{})}
120 FileHandle& operator=(FileHandle&& o) noexcept
122 _fd_direct_on = std::exchange(o._fd_direct_on, -1);
123 _fd_direct_off = std::exchange(o._fd_direct_off, -1);
124 _initialized = std::exchange(o._initialized,
false);
125 _compat_mode = std::exchange(o._compat_mode, CompatMode::AUTO);
126 _nbytes = std::exchange(o._nbytes, 0);
127 _handle = std::exchange(o._handle, CUfileHandle_t{});
130 ~FileHandle() noexcept {
close(); }
137 [[nodiscard]]
bool closed() const noexcept {
return !_initialized; }
147 _compat_mode = CompatMode::AUTO;
149 if (_fd_direct_on != -1) {
::close(_fd_direct_on); }
152 _initialized =
false;
167 throw CUfileException(
"The underlying cuFile handle isn't available in compatibility mode");
181 [[nodiscard]]
int fd() const noexcept {
return _fd_direct_off; }
201 [[nodiscard]] std::size_t
nbytes()
const;
233 std::size_t
read(
void* devPtr_base,
235 std::size_t file_offset,
236 std::size_t devPtr_offset,
237 bool sync_default_stream =
true)
240 return detail::posix_device_read(
241 _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
243 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
245 KVIKIO_NVTX_SCOPED_RANGE(
"cufileRead()", size);
246 ssize_t ret = cuFileAPI::instance().Read(
247 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
248 CUFILE_CHECK_BYTES_DONE(ret);
283 std::size_t
write(
const void* devPtr_base,
285 std::size_t file_offset,
286 std::size_t devPtr_offset,
287 bool sync_default_stream =
true)
292 return detail::posix_device_write(
293 _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
295 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
297 KVIKIO_NVTX_SCOPED_RANGE(
"cufileWrite()", size);
298 ssize_t ret = cuFileAPI::instance().Write(
299 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
301 throw std::system_error(errno, std::generic_category(),
"Unable to write file");
304 throw CUfileException(std::string{
"cuFile error at: "} + __FILE__ +
":" +
305 KVIKIO_STRINGIFY(__LINE__) +
": " + CUFILE_ERRSTR(ret));
337 std::future<std::size_t>
pread(
void* buf,
339 std::size_t file_offset = 0,
342 bool sync_default_stream =
true)
344 KVIKIO_NVTX_MARKER(
"FileHandle::pread()", size);
345 if (is_host_memory(buf)) {
346 auto op = [
this](
void* hostPtr_base,
348 std::size_t file_offset,
349 std::size_t hostPtr_offset) -> std::size_t {
350 char* buf =
static_cast<char*
>(hostPtr_base) + hostPtr_offset;
351 return detail::posix_host_read<detail::PartialIO::NO>(
352 _fd_direct_off, buf, size, file_offset);
355 return parallel_io(op, buf, size, file_offset, task_size, 0);
358 CUcontext ctx = get_context_from_pointer(buf);
361 if (size < gds_threshold) {
362 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
364 return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
366 return std::async(std::launch::deferred, task);
372 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
376 auto task = [
this, ctx](
void* devPtr_base,
378 std::size_t file_offset,
379 std::size_t devPtr_offset) -> std::size_t {
381 return read(devPtr_base, size, file_offset, devPtr_offset,
false);
383 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
384 return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
414 std::future<std::size_t>
pwrite(
const void* buf,
416 std::size_t file_offset = 0,
419 bool sync_default_stream =
true)
421 KVIKIO_NVTX_MARKER(
"FileHandle::pwrite()", size);
422 if (is_host_memory(buf)) {
423 auto op = [
this](
const void* hostPtr_base,
425 std::size_t file_offset,
426 std::size_t hostPtr_offset) -> std::size_t {
427 const char* buf =
static_cast<const char*
>(hostPtr_base) + hostPtr_offset;
428 return detail::posix_host_write<detail::PartialIO::NO>(
429 _fd_direct_off, buf, size, file_offset);
432 return parallel_io(op, buf, size, file_offset, task_size, 0);
435 CUcontext ctx = get_context_from_pointer(buf);
438 if (size < gds_threshold) {
439 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
441 return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
443 return std::async(std::launch::deferred, task);
449 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
453 auto op = [
this, ctx](
const void* devPtr_base,
455 std::size_t file_offset,
456 std::size_t devPtr_offset) -> std::size_t {
459 devPtr_base, size, file_offset, devPtr_offset,
false);
461 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
462 return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
501 off_t* file_offset_p,
502 off_t* devPtr_offset_p,
503 ssize_t* bytes_read_p,
507 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
509 static_cast<ssize_t
>(
read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
511 CUFILE_TRY(cuFileAPI::instance().ReadAsync(
512 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
543 off_t file_offset = 0,
544 off_t devPtr_offset = 0,
545 CUstream stream =
nullptr)
547 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
548 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
550 read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
591 off_t* file_offset_p,
592 off_t* devPtr_offset_p,
593 ssize_t* bytes_written_p,
597 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
599 static_cast<ssize_t
>(
write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
601 CUFILE_TRY(cuFileAPI::instance().WriteAsync(
602 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
633 off_t file_offset = 0,
634 off_t devPtr_offset = 0,
635 CUstream stream =
nullptr)
637 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
638 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
640 write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
670 static bool is_extra_symbol_available = is_stream_api_available();
671 static bool is_config_path_empty = config_path().empty();
Handle of an open file registered with cuFile.
void close() noexcept
Deregister the file and close the two files.
FileHandle(const FileHandle &)=delete
FileHandle support move semantic but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2)).
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
bool is_compat_mode_preferred_for_async() const noexcept
Returns true if the compatibility mode is expected to be ON for asynchronous I/O on this file.
bool is_compat_mode_preferred() const noexcept
Returns true if the compatibility mode is expected to be ON for this file.
bool closed() const noexcept
Whether the file is closed according to its initialization status.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, CompatMode compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Future of an asynchronous IO operation.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
static std::size_t task_size()
Get the default task size used for parallel IO operations.
static bool is_compat_mode_preferred()
Whether the global compatibility mode from class defaults is expected to be ON.
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
static CompatMode compat_mode()
Return the default I/O compatibility mode (a `CompatMode` value, e.g. `CompatMode::AUTO`) that KvikIO uses, for instance as the default `compat_mode` of a new `FileHandle`.
CompatMode
I/O compatibility mode.