18 #include <sys/types.h>
21 #include <kvikio/error.hpp>
22 #include <kvikio/shim/cuda.hpp>
23 #include <kvikio/shim/cufile.hpp>
56 void* _devPtr_base{
nullptr};
57 CUstream _stream{
nullptr};
58 ArgByVal* _val{
nullptr};
59 bool _stream_synchronized{
false};
65 void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
66 : _devPtr_base{devPtr_base}, _stream{stream}
70 if ((_val =
static_cast<ArgByVal*
>(std::malloc(
sizeof(ArgByVal)))) ==
nullptr) {
71 throw std::bad_alloc{};
74 .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0};
83 : _devPtr_base{std::exchange(o._devPtr_base,
nullptr)},
84 _stream{std::exchange(o._stream,
nullptr)},
85 _val{std::exchange(o._val,
nullptr)},
86 _stream_synchronized{o._stream_synchronized}
89 StreamFuture& operator=(StreamFuture&& o) noexcept
91 _devPtr_base = std::exchange(o._devPtr_base,
nullptr);
92 _stream = std::exchange(o._stream,
nullptr);
93 _val = std::exchange(o._val,
nullptr);
94 _stream_synchronized = o._stream_synchronized;
104 std::tuple<void*, std::size_t*, off_t*, off_t*, ssize_t*, CUstream>
get_args()
const
106 if (_val ==
nullptr) {
109 return {_devPtr_base,
112 &_val->devPtr_offset,
126 if (_val ==
nullptr) {
130 if (!_stream_synchronized) {
131 _stream_synchronized =
true;
132 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
135 CUFILE_CHECK_BYTES_DONE(_val->bytes_done);
138 return static_cast<std::size_t
>(_val->bytes_done);
147 if (_val !=
nullptr) {
151 std::cerr << e.what() << std::endl;
Future of an asynchronous IO operation.
StreamFuture(const StreamFuture &)=delete
StreamFuture support move semantic but isn't copyable.
~StreamFuture() noexcept
Free the by-value arguments and make sure the associated CUDA stream has been synchronized.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
std::size_t check_bytes_done()
Return the number of bytes read or written by the future operation.