Classes | Public Member Functions | Static Public Attributes | List of all members
rapidsmpf::streaming::MemoryReserveOrWait Class Reference

Asynchronous coordinator for memory reservation requests. More...

#include <memory_reserve_or_wait.hpp>

Public Member Functions

 MemoryReserveOrWait (config::Options options, MemoryType mem_type, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
 Constructs a MemoryReserveOrWait instance. More...
 
Actor shutdown ()
 Shuts down all pending memory reservation requests. More...
 
coro::task< MemoryReservationreserve_or_wait (std::size_t size, std::int64_t net_memory_delta)
 Attempts to reserve memory or waits until progress can be made. More...
 
coro::task< std::pair< MemoryReservation, std::size_t > > reserve_or_wait_or_overbook (std::size_t size, std::int64_t net_memory_delta)
 Variant of reserve_or_wait() that allows overbooking on timeout. More...
 
coro::task< MemoryReservationreserve_or_wait_or_fail (std::size_t size, std::int64_t net_memory_delta)
 Variant of reserve_or_wait() that fails if no progress is possible. More...
 
std::size_t size () const noexcept
 Returns the number of pending memory reservation requests. More...
 
std::size_t periodic_memory_check_counter () const noexcept
 Returns the number of iterations performed by periodic_memory_check(). More...
 
std::shared_ptr< CoroThreadPoolExecutor > const & executor () const noexcept
 Get the coroutine executor used by this instance. More...
 
std::shared_ptr< BufferResource > const & br () const noexcept
 Get the buffer resource used for memory reservations. More...
 
Duration timeout () const noexcept
 Get the configured progress timeout. More...
 

Static Public Attributes

static constexpr std::int64_t missing_net_memory_delta = 0
 Sentinel indicating that net_memory_delta estimation has not yet been implemented. More...
 

Detailed Description

Asynchronous coordinator for memory reservation requests.

MemoryReserveOrWait provides a coroutine-based mechanism for reserving memory with backpressure. Callers submit reservation requests via reserve_or_wait(), which suspends until enough memory is available or progress must be forced.

Definition at line 33 of file memory_reserve_or_wait.hpp.

Constructor & Destructor Documentation

◆ MemoryReserveOrWait()

rapidsmpf::streaming::MemoryReserveOrWait::MemoryReserveOrWait ( config::Options  options,
MemoryType  mem_type,
std::shared_ptr< CoroThreadPoolExecutor executor,
std::shared_ptr< BufferResource br 
)

Constructs a MemoryReserveOrWait instance.

If no reservation request can be satisfied within the timeout specified by the "memory_reserve_timeout" key in options, the coroutine forces progress by selecting the smallest pending request and attempting to reserve memory for it. This attempt may result in an empty reservation if the request still cannot be satisfied.

Parameters
optionsConfiguration options.
mem_typeThe memory type for which reservations are requested.
executorShared pointer to a coroutine executor.
brBuffer resource for memory allocation.*

Member Function Documentation

◆ br()

std::shared_ptr<BufferResource> const& rapidsmpf::streaming::MemoryReserveOrWait::br ( ) const
noexcept

Get the buffer resource used for memory reservations.

Returns
Shared pointer to the buffer resource.

◆ executor()

std::shared_ptr<CoroThreadPoolExecutor> const& rapidsmpf::streaming::MemoryReserveOrWait::executor ( ) const
noexcept

Get the coroutine executor used by this instance.

Returns
Shared pointer to the coroutine executor.

◆ periodic_memory_check_counter()

std::size_t rapidsmpf::streaming::MemoryReserveOrWait::periodic_memory_check_counter ( ) const
noexcept

Returns the number of iterations performed by periodic_memory_check().

This counter is incremented once per loop iteration inside periodic_memory_check(), and can be useful for diagnostics or testing.

Returns
The total number of memory-check iterations executed so far.

◆ reserve_or_wait()

coro::task<MemoryReservation> rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait ( std::size_t  size,
std::int64_t  net_memory_delta 
)

Attempts to reserve memory or waits until progress can be made.

This coroutine submits a memory reservation request and then suspends until either sufficient memory becomes available or no reservation request (including other pending requests) makes progress within the configured timeout.

The timeout does not apply specifically to this request. Instead, it is used as a global progress guarantee: if no pending reservation request can be satisfied within the timeout, MemoryReserveOrWait forces progress by selecting the smallest pending request and attempting to reserve memory for it. The forced reservation attempt may result in an empty MemoryReservation if the selected request still cannot be satisfied.

When multiple reservation requests are eligible, MemoryReserveOrWait uses net_memory_delta as a heuristic to prefer requests that are expected to reduce memory pressure sooner. The value represents the estimated net change in memory usage after the reservation has been allocated and the dependent operation completes (that is, the memory impact after both allocating size and finishing the work that consumes the reservation):

  • > 0: expected net increase in memory usage
  • = 0: memory-neutral
  • < 0: expected net decrease in memory usage

Smaller values have higher priority. Examples:

  • Reading data from disk into memory typically has a positive net_memory_delta (memory usage increases).
  • A row-wise transformation that retains input and output typically has a net delta near zero (memory-neutral).
  • Writing data to disk or a reduction that frees inputs typically has a negative net_memory_delta (memory usage decreases).
Parameters
sizeNumber of bytes to reserve.
net_memory_deltaEstimated net change in memory usage after the reservation is allocated and the dependent operation completes. Smaller values have higher priority.
Returns
A MemoryReservation representing the allocated memory, or an empty reservation if progress could not be made.
Exceptions
std::runtime_errorIf shutdown occurs before the request can be processed.

◆ reserve_or_wait_or_fail()

coro::task<MemoryReservation> rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait_or_fail ( std::size_t  size,
std::int64_t  net_memory_delta 
)

Variant of reserve_or_wait() that fails if no progress is possible.

This coroutine behaves identically to reserve_or_wait() with respect to request submission, waiting, and progress guarantees until the progress timeout expires.

If no reservation request can be satisfied before the timeout, this method fails instead of forcing progress. Overbooking is not allowed, and no memory reservation is made.

Parameters
sizeNumber of bytes to reserve.
net_memory_deltaHeuristic used to prioritize eligible requests. See reserve_or_wait() for details and semantics.
Returns
A MemoryReservation representing the allocated memory.
Exceptions
rapidsmpf::reservation_errorIf no progress is possible within the timeout.
std::runtime_errorIf shutdown occurs before the request can be processed.
See also
reserve_or_wait()

◆ reserve_or_wait_or_overbook()

coro::task<std::pair<MemoryReservation, std::size_t> > rapidsmpf::streaming::MemoryReserveOrWait::reserve_or_wait_or_overbook ( std::size_t  size,
std::int64_t  net_memory_delta 
)

Variant of reserve_or_wait() that allows overbooking on timeout.

This coroutine behaves identically to reserve_or_wait() with respect to request submission, waiting, and progress guarantees. The only difference is the behavior when the progress timeout expires.

If no reservation request can be satisfied before the timeout, this method attempts to reserve the requested memory by allowing overbooking. This guarantees forward progress, but may exceed the configured memory limits.

Parameters
sizeNumber of bytes to reserve.
net_memory_deltaHeuristic used to prioritize eligible requests. See reserve_or_wait() for details and semantics.
Returns
A pair consisting of:
  • A MemoryReservation representing the allocated memory.
  • The number of bytes by which the reservation overbooked the available memory. This value is zero if no overbooking occurred.
Exceptions
std::runtime_errorIf shutdown occurs before the request can be processed.
See also
reserve_or_wait()

◆ shutdown()

Actor rapidsmpf::streaming::MemoryReserveOrWait::shutdown ( )

Shuts down all pending memory reservation requests.

Returns
A coroutine that completes only after all pending requests have been cancelled and the periodic memory check task has exited.

◆ size()

std::size_t rapidsmpf::streaming::MemoryReserveOrWait::size ( ) const
noexcept

Returns the number of pending memory reservation requests.

It may change concurrently as requests are added or fulfilled.

Returns
The number of outstanding reservation requests.

◆ timeout()

Duration rapidsmpf::streaming::MemoryReserveOrWait::timeout ( ) const
noexcept

Get the configured progress timeout.

Returns
The progress timeout duration.

Member Data Documentation

◆ missing_net_memory_delta

constexpr std::int64_t rapidsmpf::streaming::MemoryReserveOrWait::missing_net_memory_delta = 0
staticconstexpr

Sentinel indicating that net_memory_delta estimation has not yet been implemented.

This value is used when a reasonable estimate of the net memory delta is not yet available. Any use of this sentinel should be treated as a TODO, since providing a concrete estimate enables better spilling and scheduling decisions.

See also
reserve_or_wait()

Definition at line 46 of file memory_reserve_or_wait.hpp.


The documentation for this class was generated from the following file: