rapidsmpf::streaming::ThrottlingAdaptor Class Reference

An adaptor to throttle access to a channel.

#include <channel.hpp>

Public Member Functions

 ThrottlingAdaptor (std::shared_ptr< Channel > channel, std::ptrdiff_t max_tickets)
 Create an adaptor that throttles sends into a channel.
 
coro::task< Ticket > acquire ()
 Obtain a ticket to send a message.
 

Detailed Description

An adaptor to throttle access to a channel.

This adds a semaphore-based throttle to a channel to cap the number of suspended coroutines that can be waiting to send into it. It is useful when writing producer nodes that otherwise do not depend on an input channel.
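The ticket idea can be illustrated with standard C++ alone. The sketch below is an analogy under stated assumptions, not the rapidsmpf implementation: it blocks threads instead of suspending coroutines, it builds a small counting semaphore from a mutex and a condition variable, and the names BlockingThrottle and peak_concurrent_holders are hypothetical. It shows that when each producer must hold a ticket while it works, no more than max_tickets producers are ever past the acquire point together.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical blocking stand-in for the adaptor; NOT the rapidsmpf class.
class BlockingThrottle {
  public:
    // RAII ticket: hands its slot back to the throttle when destroyed.
    class Ticket {
      public:
        explicit Ticket(BlockingThrottle& owner) : owner_{&owner} {}
        Ticket(Ticket&& other) noexcept : owner_{other.owner_} {
            other.owner_ = nullptr;
        }
        Ticket(Ticket const&) = delete;
        Ticket& operator=(Ticket const&) = delete;
        Ticket& operator=(Ticket&&) = delete;
        ~Ticket() {
            if (owner_ != nullptr) { owner_->release(); }
        }

      private:
        BlockingThrottle* owner_;
    };

    explicit BlockingThrottle(std::ptrdiff_t max_tickets) : available_{max_tickets} {
        assert(max_tickets > 0);
    }

    // Blocks the calling thread while all tickets are handed out.
    Ticket acquire() {
        std::unique_lock<std::mutex> lock{mutex_};
        cv_.wait(lock, [this] { return available_ > 0; });
        --available_;
        return Ticket{*this};
    }

  private:
    // Returns one slot and wakes a single waiter, if any.
    void release() {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            ++available_;
        }
        cv_.notify_one();
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::ptrdiff_t available_;
};

// Runs num_producers threads through the throttle and returns the largest
// number of them that were observed holding a ticket at the same moment.
int peak_concurrent_holders(int num_producers, std::ptrdiff_t max_tickets) {
    BlockingThrottle throttle{max_tickets};
    std::atomic<int> holding{0};
    std::atomic<int> peak{0};
    std::vector<std::thread> producers;
    producers.reserve(static_cast<std::size_t>(num_producers));
    for (int i = 0; i < num_producers; ++i) {
        producers.emplace_back([&] {
            auto ticket = throttle.acquire();
            int const now = ++holding;
            // Record the high-water mark of simultaneous ticket holders.
            int seen = peak.load();
            while (seen < now && !peak.compare_exchange_weak(seen, now)) {
            }
            // Simulated work done while holding the ticket.
            std::this_thread::sleep_for(std::chrono::milliseconds(2));
            --holding;
            // `ticket` is destroyed here, which releases the slot.
        });
    }
    for (auto& t : producers) { t.join(); }
    return peak.load();
}
```

Calling peak_concurrent_holders(16, 4) returns a value between 1 and 4 regardless of scheduling, because the counter is decremented before the ticket is released. The coroutine-based adaptor documented here differs in that a waiter suspends and frees its thread instead of blocking it.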

Definition at line 103 of file channel.hpp.

Constructor & Destructor Documentation

◆ ThrottlingAdaptor()

rapidsmpf::streaming::ThrottlingAdaptor::ThrottlingAdaptor ( std::shared_ptr< Channel > channel,
std::ptrdiff_t  max_tickets 
)
inline explicit

Create an adaptor that throttles sends into a channel.

Parameters
channel      Channel to throttle.
max_tickets  Maximum number of simultaneous tickets for sending into the channel.

This adaptor is typically used for producer tasks that have no dependencies but where we nonetheless want to introduce a suspension point before sending into an output channel. Such a task can accept the output channel and wrap it in a ThrottlingAdaptor. Users of the adapted channel must first acquire a ticket before they can send. At most max_tickets of them can be past the acquire suspension point at once.

Example usage:

auto ch = ctx->create_channel();
auto throttled = ThrottlingAdaptor(ch, 4);
// A lambda coroutine needs an explicit coroutine return type.
auto make_task = [&]() -> coro::task<void> {
    // Suspends here while all four tickets are handed out.
    auto ticket = co_await throttled.acquire();
    auto data = do_expensive_work();
    auto [_, receipt] = co_await ticket.send(data);
    // Not for correctness, but to allow other threads to pick up awaiters at
    // acquire
    co_await executor->yield();
    co_await receipt;
};
std::vector<coro::task<void>> tasks;
for ( ... ) {
    tasks.push_back(make_task());
}
co_await coro::when_all(std::move(tasks));

Definition at line 216 of file channel.hpp.

Member Function Documentation

◆ acquire()

coro::task<Ticket> rapidsmpf::streaming::ThrottlingAdaptor::acquire ( )
inline

Obtain a ticket to send a message.

Suspends if all tickets are currently handed out.

Exceptions
std::runtime_error  If the semaphore is shut down.
Returns
A coroutine producing a new Ticket that grants permission to send a message.

Definition at line 235 of file channel.hpp.


The documentation for this class was generated from the following file: