14 #include <rapidsmpf/error.hpp>
15 #include <rapidsmpf/streaming/core/message.hpp>
16 #include <rapidsmpf/streaming/core/node.hpp>
17 #include <rapidsmpf/streaming/core/spillable_messages.hpp>
19 #include <coro/coro.hpp>
20 #include <coro/semaphore.hpp>
/// An awaitable semaphore to manage acquisition and release of finite resources.
/// NOTE(review): the template argument is the largest std::ptrdiff_t value;
/// presumably this is the semaphore's maximum count -- confirm against libcoro.
29 using Semaphore = coro::semaphore<std::numeric_limits<std::ptrdiff_t>::max()>;
/// Drains all pending messages from the channel and shuts it down.
/// @param executor Thread pool handle, taken by reference.
///        NOTE(review): presumably the pool the drain is scheduled on -- confirm.
/// @return A Node, i.e. a coro::task<void> coroutine.
70 Node drain(std::unique_ptr<coro::thread_pool>& executor);
/// Check whether the channel is empty.
/// Declared noexcept; the result must not be discarded ([[nodiscard]]).
/// @return NOTE(review): presumably true when the channel is empty -- confirm.
86 [[nodiscard]]
bool empty() const noexcept;
90 : sm_{std::move(spillable_messages)} {}
// Ring buffer whose elements are SpillableMessages::MessageId values.
// NOTE(review): the second template argument (1) is presumably the buffer
// capacity -- confirm against the libcoro ring_buffer API.
92 coro::ring_buffer<SpillableMessages::MessageId, 1> rb_;
// Shared handle to the container for individually spillable messages;
// initialized by moving the constructor's `spillable_messages` argument.
93 std::shared_ptr<SpillableMessages> sm_;
// Ticket is move-only: copy assignment and copy construction are deleted,
// while move assignment and move construction use the compiler defaults.
108 Ticket& operator=(Ticket
const&) =
delete;
109 Ticket(Ticket
const&) =
delete;
110 Ticket& operator=(Ticket&&) =
default;
111 Ticket(Ticket&&) =
default;
153 [[nodiscard]] coro::task<std::pair<bool, coro::task<void>>> send(
Message msg) {
154 RAPIDSMPF_EXPECTS(ch_,
"Ticket has already been used", std::logic_error);
155 auto sent = co_await ch_->send(std::move(msg));
158 co_return {sent, semaphore_->release()};
162 co_await semaphore_->shutdown();
163 co_return {sent, []() -> coro::task<void> { co_return; }()};
174 : ch_{channel}, semaphore_{semaphore} {};
217 std::shared_ptr<Channel> channel, std::ptrdiff_t max_tickets
219 : ch_{std::move(channel)}, semaphore_(max_tickets) {
221 max_tickets > 0,
"ThrottlingAdaptor must have at least one ticket"
236 auto result = co_await semaphore_.acquire();
238 result == coro::semaphore_acquire_result::acquired,
239 "Semaphore was shutdown",
242 co_return {ch_.get(), &semaphore_};
246 std::shared_ptr<Channel> ch_;
274 : channels_{std::move(channels)} {
275 for (
auto& ch : channels_) {
276 RAPIDSMPF_EXPECTS(ch,
"channel cannot be null", std::invalid_argument);
292 template <
class... T>
294 requires(std::convertible_to<T, std::shared_ptr<Channel>> && ...)
296 std::vector<std::shared_ptr<
Channel>>{std::forward<T>(channels)...}
311 for (
auto& ch : channels_) {
312 coro::sync_wait(ch->shutdown());
317 std::vector<std::shared_ptr<Channel>> channels_;
A coroutine-based channel for sending and receiving messages asynchronously.
bool empty() const noexcept
Check whether the channel is empty.
Node drain(std::unique_ptr< coro::thread_pool > &executor)
Drains all pending messages from the channel and shuts it down.
Node shutdown()
Immediately shuts down the channel.
coro::task< Message > receive()
Asynchronously receive a message from the channel.
coro::task< bool > send(Message msg)
Asynchronously send a message into the channel.
Context for nodes (coroutines) in rapidsmpf.
Type-erased message wrapper around a payload.
Helper RAII class to shut down channels when they go out of scope.
~ShutdownAtExit() noexcept
Destructor that synchronously shuts down all channels.
ShutdownAtExit(T &&... channels) requires(std::convertible_to< T, std::shared_ptr< Channel >> &&...)
Variadic convenience constructor.
ShutdownAtExit(std::vector< std::shared_ptr< Channel >> channels)
Construct from a vector of channel handles.
Container for individually spillable messages.
An adaptor to throttle access to a channel.
coro::task< Ticket > acquire()
Obtain a ticket to send a message.
ThrottlingAdaptor(std::shared_ptr< Channel > channel, std::ptrdiff_t max_tickets)
Create an adaptor that throttles sends into a channel.
coro::task< void > Node
Alias for a node in a streaming pipeline.
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.