Asynchronous coordinator for memory reservation requests.
#include <memory_reserve_or_wait.hpp>
Public Member Functions
| | MemoryReserveOrWait (config::Options options, MemoryType mem_type, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br) | Constructs a MemoryReserveOrWait instance. |
| Actor | shutdown () | Shuts down all pending memory reservation requests. |
| coro::task< MemoryReservation > | reserve_or_wait (std::size_t size, std::int64_t net_memory_delta) | Attempts to reserve memory or waits until progress can be made. |
| coro::task< std::pair< MemoryReservation, std::size_t > > | reserve_or_wait_or_overbook (std::size_t size, std::int64_t net_memory_delta) | Variant of reserve_or_wait() that allows overbooking on timeout. |
| coro::task< MemoryReservation > | reserve_or_wait_or_fail (std::size_t size, std::int64_t net_memory_delta) | Variant of reserve_or_wait() that fails if no progress is possible. |
| std::size_t | size () const noexcept | Returns the number of pending memory reservation requests. |
| std::size_t | periodic_memory_check_counter () const noexcept | Returns the number of iterations performed by periodic_memory_check(). |
| std::shared_ptr< CoroThreadPoolExecutor > const & | executor () const noexcept | Get the coroutine executor used by this instance. |
| std::shared_ptr< BufferResource > const & | br () const noexcept | Get the buffer resource used for memory reservations. |
| Duration | timeout () const noexcept | Get the configured progress timeout. |
Static Public Attributes
| static constexpr std::int64_t | missing_net_memory_delta = 0 | Sentinel indicating that net_memory_delta estimation has not yet been implemented. |
MemoryReserveOrWait provides a coroutine-based mechanism for reserving memory with backpressure. Callers submit reservation requests via reserve_or_wait(), which suspends until enough memory is available or progress must be forced.
Definition at line 33 of file memory_reserve_or_wait.hpp.
rapidsmpf::streaming::MemoryReserveOrWait::MemoryReserveOrWait (config::Options options, MemoryType mem_type, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Constructs a MemoryReserveOrWait instance.
If no reservation request can be satisfied within the timeout specified by the "memory_reserve_timeout" key in options, the coroutine forces progress by selecting the smallest pending request and attempting to reserve memory for it. This attempt may result in an empty reservation if the request still cannot be satisfied.
| options | Configuration options. |
| mem_type | The memory type for which reservations are requested. |
| executor | Shared pointer to a coroutine executor. |
| br | Buffer resource for memory allocation. |
std::shared_ptr< BufferResource > const & rapidsmpf::streaming::MemoryReserveOrWait::br () const noexcept
Get the buffer resource used for memory reservations.
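std::shared_ptr< CoroThreadPoolExecutor > const & rapidsmpf::streaming::MemoryReserveOrWait::executor () const noexcept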
Get the coroutine executor used by this instance.
std::size_t rapidsmpf::streaming::MemoryReserveOrWait::periodic_memory_check_counter () const noexcept
Returns the number of iterations performed by periodic_memory_check().
This counter is incremented once per loop iteration inside periodic_memory_check(), and can be useful for diagnostics or testing.
coro::task< MemoryReservation > rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait (std::size_t size, std::int64_t net_memory_delta)
Attempts to reserve memory or waits until progress can be made.
This coroutine submits a memory reservation request and then suspends until either sufficient memory becomes available or no reservation request (including other pending requests) makes progress within the configured timeout.
The timeout does not apply specifically to this request. Instead, it is used as a global progress guarantee: if no pending reservation request can be satisfied within the timeout, MemoryReserveOrWait forces progress by selecting the smallest pending request and attempting to reserve memory for it. The forced reservation attempt may result in an empty MemoryReservation if the selected request still cannot be satisfied.
When multiple reservation requests are eligible, MemoryReserveOrWait uses net_memory_delta as a heuristic to prefer requests that are expected to reduce memory pressure sooner. The value represents the estimated net change in memory usage after the reservation has been allocated and the dependent operation completes (that is, the memory impact after both allocating size and finishing the work that consumes the reservation).
Smaller values have higher priority. Examples: an operation whose memory usage increases has a positive net_memory_delta; an operation whose memory usage decreases has a negative net_memory_delta.
| size | Number of bytes to reserve. |
| net_memory_delta | Estimated net change in memory usage after the reservation is allocated and the dependent operation completes. Smaller values have higher priority. |
Returns a MemoryReservation representing the allocated memory, or an empty reservation if progress could not be made.
| std::runtime_error | If shutdown occurs before the request can be processed. |
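The two selection rules described for reserve_or_wait() can be sketched with a small standalone model. This is not the rapidsmpf implementation: PendingRequest, pick_eligible, and pick_on_timeout are hypothetical names, "eligible" is assumed to mean "fits into the currently available bytes", and ties are broken by position in the list.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for one pending request; not a rapidsmpf type.
struct PendingRequest {
    std::size_t size;               // bytes requested
    std::int64_t net_memory_delta;  // caller's estimate; smaller is preferred
};

// Normal path (assumed): among requests that fit into `available` bytes,
// return the index of the one with the smallest net_memory_delta.
std::optional<std::size_t> pick_eligible(
    std::vector<PendingRequest> const& pending, std::size_t available
) {
    std::optional<std::size_t> best;
    for (std::size_t i = 0; i < pending.size(); ++i) {
        if (pending[i].size > available) {
            continue;  // not eligible: the request does not fit
        }
        if (!best || pending[i].net_memory_delta < pending[*best].net_memory_delta) {
            best = i;
        }
    }
    return best;
}

// Timeout path: return the index of the smallest pending request,
// regardless of its net_memory_delta.
std::optional<std::size_t> pick_on_timeout(std::vector<PendingRequest> const& pending) {
    if (pending.empty()) {
        return std::nullopt;
    }
    auto const it = std::min_element(
        pending.begin(),
        pending.end(),
        [](PendingRequest const& a, PendingRequest const& b) { return a.size < b.size; }
    );
    return static_cast<std::size_t>(it - pending.begin());
}
```

In this model the timeout path ignores net_memory_delta entirely, which mirrors the documented behavior that forced progress selects the smallest pending request.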
coro::task< MemoryReservation > rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait_or_fail (std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that fails if no progress is possible.
This coroutine behaves identically to reserve_or_wait() with respect to request submission, waiting, and progress guarantees until the progress timeout expires.
If no reservation request can be satisfied before the timeout, this method fails instead of forcing progress. Overbooking is not allowed, and no memory reservation is made.
| size | Number of bytes to reserve. |
| net_memory_delta | Heuristic used to prioritize eligible requests. See reserve_or_wait() for details and semantics. |
Returns a MemoryReservation representing the allocated memory.
| rapidsmpf::reservation_error | If no progress is possible within the timeout. |
| std::runtime_error | If shutdown occurs before the request can be processed. |
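The failure path of reserve_or_wait_or_fail() can be illustrated from the caller's side with a synchronous model. Everything here is hypothetical: ModelReservationError stands in for rapidsmpf::reservation_error (whose base class is not shown in this reference), and reserve_or_fail_model only mimics the outcome at timeout, not the coroutine or the waiting.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for rapidsmpf::reservation_error. Deriving from
// std::runtime_error is an assumption made for this sketch only.
struct ModelReservationError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Hypothetical model of the outcome at timeout: either the request fits and
// `size` bytes are reserved, or the call fails and nothing is reserved.
std::size_t reserve_or_fail_model(std::size_t size, std::size_t available) {
    if (size > available) {
        throw ModelReservationError("no progress possible within the timeout");
    }
    return size;
}

// Caller-side handling: returns true on success and stores the reserved byte
// count; on failure stores 0, since no reservation was made.
bool try_reserve(std::size_t size, std::size_t available, std::size_t& reserved) {
    try {
        reserved = reserve_or_fail_model(size, available);
        return true;
    } catch (ModelReservationError const&) {
        reserved = 0;
        return false;
    }
}
```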
coro::task< std::pair< MemoryReservation, std::size_t > > rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait_or_overbook (std::size_t size, std::int64_t net_memory_delta)
Variant of reserve_or_wait() that allows overbooking on timeout.
This coroutine behaves identically to reserve_or_wait() with respect to request submission, waiting, and progress guarantees. The only difference is the behavior when the progress timeout expires.
If no reservation request can be satisfied before the timeout, this method attempts to reserve the requested memory by allowing overbooking. This guarantees forward progress, but may exceed the configured memory limits.
| size | Number of bytes to reserve. |
| net_memory_delta | Heuristic used to prioritize eligible requests. See reserve_or_wait() for details and semantics. |
Returns a pair whose first element is the MemoryReservation representing the allocated memory.
| std::runtime_error | If shutdown occurs before the request can be processed. |
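The arithmetic behind overbooking can be sketched as follows. This is a hypothetical model, not the library's code: it assumes the std::size_t in the returned pair reports how many bytes the reservation exceeds the available memory by, which this reference does not state, and it uses plain byte counts in place of MemoryReservation.

```cpp
#include <cstddef>
#include <utility>

// Hypothetical model of the timeout branch with overbooking allowed.
// Returns {bytes reserved, bytes overbooked}. The meaning of the second
// element is an assumption of this sketch.
constexpr std::pair<std::size_t, std::size_t> reserve_or_overbook_model(
    std::size_t size, std::size_t available
) {
    // The full request is always granted; any shortfall counts as overbooking.
    return {size, size > available ? size - available : 0};
}
```

Under this assumption, a caller could use a nonzero second element as a signal to free or spill memory before continuing.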
Actor rapidsmpf::streaming::MemoryReserveOrWait::shutdown ()
Shuts down all pending memory reservation requests.
std::size_t rapidsmpf::streaming::MemoryReserveOrWait::size () const noexcept
Returns the number of pending memory reservation requests.
The returned value may change concurrently as requests are added or fulfilled.
Duration rapidsmpf::streaming::MemoryReserveOrWait::timeout () const noexcept
Get the configured progress timeout.
static constexpr std::int64_t rapidsmpf::streaming::MemoryReserveOrWait::missing_net_memory_delta = 0
Sentinel indicating that net_memory_delta estimation has not yet been implemented.
This value is used when a reasonable estimate of the net memory delta is not yet available. Any use of this sentinel should be treated as a TODO, since providing a concrete estimate enables better spilling and scheduling decisions.
Definition at line 46 of file memory_reserve_or_wait.hpp.
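As a rough illustration of replacing the sentinel with a concrete estimate, the sketch below computes a net memory delta from two byte counts. The helper estimate_net_memory_delta and its parameters are hypothetical and not part of rapidsmpf; the local constant only mirrors the documented value of missing_net_memory_delta.

```cpp
#include <cstdint>

// Mirrors the documented sentinel value; the real constant is
// MemoryReserveOrWait::missing_net_memory_delta.
constexpr std::int64_t missing_net_memory_delta = 0;

// Hypothetical estimate: bytes still held after the dependent operation
// completes, minus bytes of existing data the operation frees.
// Negative results mean overall memory usage goes down.
constexpr std::int64_t estimate_net_memory_delta(
    std::int64_t retained_output_bytes, std::int64_t freed_input_bytes
) {
    return retained_output_bytes - freed_input_bytes;
}
```

For example, an operation that keeps 100 bytes of output and frees 300 bytes of input would be given a delta of -200 and so be preferred over one passing the sentinel.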