18 #include <sys/types.h>
21 #include <kvikio/error.hpp>
22 #include <kvikio/shim/cuda.hpp>
23 #include <kvikio/shim/cufile.hpp>
// Base device pointer handed to the constructor; first element returned by get_args().
57 void* _devPtr_base{
nullptr};
// CUDA stream handed to the constructor; it is synchronized (StreamSynchronize)
// before `bytes_done` is checked and returned.
58 CUstream _stream{
nullptr};
// Call arguments stored by value in a std::malloc'ed ArgByVal (size, file_offset,
// devPtr_offset, bytes_done). nullptr for an unset or moved-from future.
59 ArgByVal* _val{
nullptr};
// Set to true right before the stream is synchronized, so synchronization is
// attempted at most once per future.
60 bool _stream_synchronized{
false};
66 void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
67 : _devPtr_base{devPtr_base}, _stream{stream}
71 if ((_val =
static_cast<ArgByVal*
>(std::malloc(
sizeof(ArgByVal)))) ==
nullptr) {
72 throw std::bad_alloc{};
75 .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0};
84 : _devPtr_base{std::exchange(o._devPtr_base,
nullptr)},
85 _stream{std::exchange(o._stream,
nullptr)},
86 _val{std::exchange(o._val,
nullptr)},
87 _stream_synchronized{o._stream_synchronized}
90 StreamFuture& operator=(StreamFuture&& o) noexcept
92 _devPtr_base = std::exchange(o._devPtr_base,
nullptr);
93 _stream = std::exchange(o._stream,
nullptr);
94 _val = std::exchange(o._val,
nullptr);
95 _stream_synchronized = o._stream_synchronized;
105 std::tuple<void*, std::size_t*, off_t*, off_t*, ssize_t*, CUstream>
get_args()
const
107 if (_val ==
nullptr) {
110 return {_devPtr_base,
113 &_val->devPtr_offset,
127 if (_val ==
nullptr) {
131 if (!_stream_synchronized) {
132 _stream_synchronized =
true;
133 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
136 CUFILE_CHECK_BYTES_DONE(_val->bytes_done);
139 return static_cast<std::size_t
>(_val->bytes_done);
148 if (_val !=
nullptr) {
152 std::cerr << e.what() << std::endl;
Future of an asynchronous IO operation.
StreamFuture(const StreamFuture &)=delete
StreamFuture supports move semantics but isn't copyable.
~StreamFuture() noexcept
Free the by-value arguments and make sure the associated CUDA stream has been synchronized.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
std::size_t check_bytes_done()
Return the number of bytes read or written by the future operation.