#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <future>
#include <stdexcept>
#include <string>
#include <system_error>
#include <utility>

#include <kvikio/buffer.hpp>
#include <kvikio/cufile_config.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/parallel_operation.hpp>
#include <kvikio/posix_io.hpp>
#include <kvikio/shim/cufile.hpp>
#include <kvikio/stream.hpp>
#include <kvikio/utils.hpp>
/**
 * @brief Parse open-file flags given as a string and return `oflags` for `::open()`.
 *
 * @param flags The flags of the file to open: "r" (read), "r+" (read/write),
 * "w" (truncate+write), or "w+" (truncate+read/write).
 * @param o_direct Append `O_DIRECT` to the returned flags.
 * @return Flags suitable for passing to `::open()`.
 *
 * @throw std::invalid_argument if `flags` is empty or unknown, if 'a' is requested,
 * or if `o_direct` is true on a platform without `O_DIRECT`.
 */
inline int open_fd_parse_flags(const std::string& flags, bool o_direct)
{
  int file_flags = -1;
  if (flags.empty()) { throw std::invalid_argument("Unknown file open flag"); }
  switch (flags[0]) {
    case 'r':
      file_flags = O_RDONLY;
      // NB: for a one-character string, flags[1] reads the terminating '\0'
      // (well-defined for std::string::operator[]) and simply fails the test.
      if (flags[1] == '+') { file_flags = O_RDWR; }
      break;
    case 'w':
      file_flags = O_WRONLY;
      if (flags[1] == '+') { file_flags = O_RDWR; }
      file_flags |= O_CREAT | O_TRUNC;
      break;
    case 'a': throw std::invalid_argument("Open flag 'a' isn't supported");
    default: throw std::invalid_argument("Unknown file open flag");
  }
  // Avoid leaking the descriptor into child processes spawned via exec*().
  file_flags |= O_CLOEXEC;
  if (o_direct) {
#if defined(O_DIRECT)
    file_flags |= O_DIRECT;
#else
    throw std::invalid_argument("'o_direct' flag unsupported on this platform");
#endif
  }
  return file_flags;
}
91 inline int open_fd(
const std::string& file_path,
92 const std::string& flags,
97 int fd = ::open(file_path.c_str(), open_fd_parse_flags(flags, o_direct), mode);
98 if (fd == -1) {
throw std::system_error(errno, std::generic_category(),
"Unable to open file"); }
/**
 * @brief Get the open flags of a file descriptor (see `open(2)` / `fcntl(2)`).
 *
 * @param fd File descriptor to query.
 * @return The flags returned by `fcntl(fd, F_GETFL)`.
 *
 * @throw std::system_error (generic_category, errno) if `fcntl()` fails.
 */
[[nodiscard]] inline int open_flags(int fd)
{
  int ret = fcntl(fd, F_GETFL);
  if (ret == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to retrieve open flags");
  }
  return ret;
}
/**
 * @brief Get the file size from a file descriptor.
 *
 * @param file_descriptor Open file descriptor.
 * @return The size of the file in bytes.
 *
 * @throw std::system_error (generic_category, errno) if `fstat()` fails.
 */
[[nodiscard]] inline std::size_t get_file_size(int file_descriptor)
{
  struct stat st {};
  int ret = fstat(file_descriptor, &st);
  if (ret == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to query file size");
  }
  return static_cast<std::size_t>(st.st_size);
}
142 int _fd_direct_on{-1};
143 int _fd_direct_off{-1};
144 bool _initialized{
false};
145 bool _compat_mode{
false};
146 mutable std::size_t _nbytes{0};
147 CUfileHandle_t _handle{};
150 static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
170 const std::string& flags =
"r",
173 : _fd_direct_off{detail::open_fd(file_path, flags, false, mode)},
175 _compat_mode{compat_mode}
183 _fd_direct_on = detail::open_fd(file_path, flags,
true, mode);
184 }
catch (
const std::system_error&) {
186 }
catch (
const std::invalid_argument&) {
193 desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
195 desc.handle.fd = _fd_direct_on;
196 CUFILE_TRY(cuFileAPI::instance().HandleRegister(&_handle, &desc));
206 : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
207 _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
208 _initialized{std::exchange(o._initialized,
false)},
209 _compat_mode{std::exchange(o._compat_mode,
false)},
210 _nbytes{std::exchange(o._nbytes, 0)},
211 _handle{std::exchange(o._handle, CUfileHandle_t{})}
214 FileHandle& operator=(FileHandle&& o) noexcept
216 _fd_direct_on = std::exchange(o._fd_direct_on, -1);
217 _fd_direct_off = std::exchange(o._fd_direct_off, -1);
218 _initialized = std::exchange(o._initialized,
false);
219 _compat_mode = std::exchange(o._compat_mode,
false);
220 _nbytes = std::exchange(o._nbytes, 0);
221 _handle = std::exchange(o._handle, CUfileHandle_t{});
224 ~FileHandle() noexcept {
close(); }
226 [[nodiscard]]
bool closed() const noexcept {
return !_initialized; }
233 if (closed()) {
return; }
235 if (!_compat_mode) { cuFileAPI::instance().HandleDeregister(_handle); }
237 if (_fd_direct_on != -1) {
::close(_fd_direct_on); }
240 _initialized =
false;
255 throw CUfileException(
"The underlying cuFile handle isn't available in compatibility mode");
269 [[nodiscard]]
int fd() const noexcept {
return _fd_direct_off; }
280 [[nodiscard]]
int fd_open_flags()
const {
return detail::open_flags(_fd_direct_off); }
291 if (closed()) {
return 0; }
292 if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
326 std::size_t
read(
void* devPtr_base,
328 std::size_t file_offset,
329 std::size_t devPtr_offset,
330 bool sync_default_stream =
true)
333 return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
335 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
337 KVIKIO_NVTX_FUNC_RANGE(
"cufileRead()", size);
338 ssize_t ret = cuFileAPI::instance().Read(
339 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
340 CUFILE_CHECK_BYTES_DONE(ret);
375 std::size_t
write(
const void* devPtr_base,
377 std::size_t file_offset,
378 std::size_t devPtr_offset,
379 bool sync_default_stream =
true)
384 return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
386 if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr)); }
388 KVIKIO_NVTX_FUNC_RANGE(
"cufileWrite()", size);
389 ssize_t ret = cuFileAPI::instance().Write(
390 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
392 throw std::system_error(errno, std::generic_category(),
"Unable to write file");
395 throw CUfileException(std::string{
"cuFile error at: "} + __FILE__ +
":" +
396 KVIKIO_STRINGIFY(__LINE__) +
": " + CUFILE_ERRSTR(ret));
428 std::future<std::size_t>
pread(
void* buf,
430 std::size_t file_offset = 0,
433 bool sync_default_stream =
true)
435 if (is_host_memory(buf)) {
436 auto op = [
this](
void* hostPtr_base,
438 std::size_t file_offset,
439 std::size_t hostPtr_offset) -> std::size_t {
440 char* buf =
static_cast<char*
>(hostPtr_base) + hostPtr_offset;
441 return posix_host_read(_fd_direct_off, buf, size, file_offset,
false);
444 return parallel_io(op, buf, size, file_offset, task_size, 0);
447 CUcontext ctx = get_context_from_pointer(buf);
450 if (size < gds_threshold) {
451 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
453 return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
455 return std::async(std::launch::deferred, task);
459 if (sync_default_stream && !_compat_mode) {
461 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
465 auto task = [
this, ctx](
void* devPtr_base,
467 std::size_t file_offset,
468 std::size_t devPtr_offset) -> std::size_t {
470 return read(devPtr_base, size, file_offset, devPtr_offset,
false);
472 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
473 return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
503 std::future<std::size_t>
pwrite(
const void* buf,
505 std::size_t file_offset = 0,
508 bool sync_default_stream =
true)
510 if (is_host_memory(buf)) {
511 auto op = [
this](
const void* hostPtr_base,
513 std::size_t file_offset,
514 std::size_t hostPtr_offset) -> std::size_t {
515 const char* buf =
static_cast<const char*
>(hostPtr_base) + hostPtr_offset;
516 return posix_host_write(_fd_direct_off, buf, size, file_offset,
false);
519 return parallel_io(op, buf, size, file_offset, task_size, 0);
522 CUcontext ctx = get_context_from_pointer(buf);
525 if (size < gds_threshold) {
526 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
528 return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
530 return std::async(std::launch::deferred, task);
534 if (sync_default_stream && !_compat_mode) {
536 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(
nullptr));
540 auto op = [
this, ctx](
const void* devPtr_base,
542 std::size_t file_offset,
543 std::size_t devPtr_offset) -> std::size_t {
546 devPtr_base, size, file_offset, devPtr_offset,
false);
548 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
549 return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
588 off_t* file_offset_p,
589 off_t* devPtr_offset_p,
590 ssize_t* bytes_read_p,
595 if (kvikio::is_batch_and_stream_available() && !_compat_mode && !config_path().empty()) {
596 CUFILE_TRY(cuFileAPI::instance().ReadAsync(
597 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
600 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
602 static_cast<ssize_t
>(
read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
632 off_t file_offset = 0,
633 off_t devPtr_offset = 0,
634 CUstream stream =
nullptr)
636 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
637 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
639 read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
680 off_t* file_offset_p,
681 off_t* devPtr_offset_p,
682 ssize_t* bytes_written_p,
687 if (kvikio::is_batch_and_stream_available() && !_compat_mode && !config_path().empty()) {
688 CUFILE_TRY(cuFileAPI::instance().WriteAsync(
689 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
692 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
694 static_cast<ssize_t
>(
write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
724 off_t file_offset = 0,
725 off_t devPtr_offset = 0,
726 CUstream stream =
nullptr)
728 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
729 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
731 write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
Handle of an open file registered with cuFile.
void close() noexcept
Deregister the file from cuFile and close the two underlying file descriptors.
FileHandle(const FileHandle &)=delete
FileHandle support move semantic but isn't copyable.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Reads specified bytes from the file into the device or host memory in parallel.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, bool compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
bool is_compat_mode_on() const noexcept
Returns true if the compatibility mode has been enabled for this file.
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Writes specified bytes from the device memory into the file.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold(), bool sync_default_stream=true)
Writes specified bytes from device or host memory into the file in parallel.
std::size_t nbytes() const
Get the file size.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset, bool sync_default_stream=true)
Reads specified bytes from the file into the device memory.
Push CUDA context on creation and pop it on destruction.
Future of an asynchronous IO operation.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
static std::size_t task_size()
Get the default task size used for parallel IO operations.
static bool compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).