11 #include <coro/task.hpp>
13 #include <rapidsmpf/config.hpp>
14 #include <rapidsmpf/memory/buffer_resource.hpp>
15 #include <rapidsmpf/streaming/core/actor.hpp>
16 #include <rapidsmpf/streaming/core/coro_executor.hpp>
17 #include <rapidsmpf/streaming/core/coro_utils.hpp>
18 #include <rapidsmpf/utils/misc.hpp>
65 std::shared_ptr<CoroThreadPoolExecutor>
executor,
66 std::shared_ptr<BufferResource>
br
123 std::
size_t size, std::int64_t net_memory_delta
151 std::
size_t size, std::int64_t net_memory_delta
177 std::
size_t size, std::int64_t net_memory_delta
187 [[nodiscard]] std::
size_t size() const noexcept;
234 std::int64_t net_memory_delta;
237 std::uint64_t sequence_number;
240 coro::queue<MemoryReservation>& queue;
243 friend bool operator<(Request
const& a, Request
const& b) {
244 return std::tie(a.size, a.sequence_number)
245 < std::tie(b.size, b.sequence_number);
279 coro::task<void> periodic_memory_check();
281 mutable std::mutex mutex_;
282 std::uint64_t sequence_counter{0};
284 std::shared_ptr<CoroThreadPoolExecutor> executor_;
285 std::shared_ptr<BufferResource> br_;
287 std::set<Request> reservation_requests_;
288 std::atomic<std::uint64_t> periodic_memory_check_counter_{0};
289 std::optional<coro::task<void>> periodic_memory_check_task_;
354 std::shared_ptr<Context> ctx,
356 std::int64_t net_memory_delta,
358 std::optional<AllowOverbooking> allow_overbooking = std::nullopt
Class managing buffer resources.
Represents a reservation for future memory allocation.
Manages configuration options for RapidsMPF operations.
Executor wrapper around a coro::thread_pool used for coroutine execution.
Asynchronous coordinator for memory reservation requests.
Actor shutdown()
Shuts down all pending memory reservation requests.
coro::task< MemoryReservation > reserve_or_wait_or_fail(std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that fails if no progress is possible.
coro::task< MemoryReservation > reserve_or_wait(std::size_t size, std::int64_t net_memory_delta)
Attempts to reserve memory or waits until progress can be made.
std::size_t size() const noexcept
Returns the number of pending memory reservation requests.
MemoryReserveOrWait(config::Options options, MemoryType mem_type, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Constructs a MemoryReserveOrWait instance.
std::shared_ptr< CoroThreadPoolExecutor > const & executor() const noexcept
Get the coroutine executor used by this instance.
std::shared_ptr< BufferResource > const & br() const noexcept
Get the buffer resource used for memory reservations.
coro::task< std::pair< MemoryReservation, std::size_t > > reserve_or_wait_or_overbook(std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that allows overbooking on timeout.
static constexpr std::int64_t missing_net_memory_delta
Sentinel indicating that net_memory_delta estimation has not yet been implemented.
Duration timeout() const noexcept
Get the configured progress timeout.
std::size_t periodic_memory_check_counter() const noexcept
Returns the number of iterations performed by periodic_memory_check().
coro::task< void > Actor
Alias for an actor in a streaming graph.
coro::task< MemoryReservation > reserve_memory(std::shared_ptr< Context > ctx, std::size_t size, std::int64_t net_memory_delta, MemoryType mem_type=MemoryType::DEVICE, std::optional< AllowOverbooking > allow_overbooking=std::nullopt)
Reserve memory using the context memory reservation mechanism.
std::chrono::duration< double > Duration
Alias for a duration type representing time in seconds as a double.
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.