14 #include <coro/coro.hpp>
15 #include <coro/queue.hpp>
16 #include <coro/semaphore.hpp>
18 #include <rapidsmpf/error.hpp>
19 #include <rapidsmpf/streaming/core/actor.hpp>
20 #include <rapidsmpf/streaming/core/coro_executor.hpp>
21 #include <rapidsmpf/streaming/core/message.hpp>
22 #include <rapidsmpf/streaming/core/spillable_messages.hpp>
31 using Semaphore = coro::semaphore<std::numeric_limits<std::ptrdiff_t>::max()>;
79 [[nodiscard]] coro::task<Message>
receive();
137 [[nodiscard]]
Actor drain(std::shared_ptr<CoroThreadPoolExecutor> executor);
166 [[nodiscard]]
bool empty() const noexcept;
177 : sm_{std::move(spillable_messages)} {}
179 coro::ring_buffer<SpillableMessages::MessageId, 1> rb_;
180 std::shared_ptr<SpillableMessages> sm_;
181 coro::queue<Message> metadata_;
196 Ticket& operator=(Ticket
const&) =
delete;
197 Ticket(Ticket
const&) =
delete;
198 Ticket& operator=(Ticket&&) =
default;
199 Ticket(Ticket&&) =
default;
241 [[nodiscard]] coro::task<std::pair<bool, coro::task<void>>> send(
Message msg) {
242 RAPIDSMPF_EXPECTS(ch_,
"Ticket has already been used", std::logic_error);
243 auto sent = co_await ch_->send(std::move(msg));
246 co_return {sent, semaphore_->release()};
250 co_await semaphore_->shutdown();
251 co_return {sent, []() -> coro::task<void> { co_return; }()};
262 : ch_{channel}, semaphore_{semaphore} {};
305 std::shared_ptr<Channel> channel, std::ptrdiff_t max_tickets
307 : ch_{std::move(channel)}, semaphore_(max_tickets) {
309 max_tickets > 0,
"ThrottlingAdaptor must have at least one ticket"
324 auto result = co_await semaphore_.acquire();
326 result == coro::semaphore_acquire_result::acquired,
327 "Semaphore was shutdown",
330 co_return {ch_.get(), &semaphore_};
334 std::shared_ptr<Channel> ch_;
362 : channels_{std::move(channels)} {
363 for (
auto& ch : channels_) {
364 RAPIDSMPF_EXPECTS(ch,
"channel cannot be null", std::invalid_argument);
380 template <
class... T>
382 requires(std::convertible_to<T, std::shared_ptr<Channel>> && ...)
384 std::vector<std::shared_ptr<
Channel>>{std::forward<T>(channels)...}
399 for (
auto& ch : channels_) {
400 coro::sync_wait(ch->shutdown());
405 std::vector<std::shared_ptr<Channel>> channels_;
A coroutine-based channel for sending and receiving messages asynchronously.
bool empty() const noexcept
Check whether the channel is empty.
Actor shutdown_metadata()
Immediately shuts down the metadata channel.
coro::task< bool > send_metadata(Message msg)
Asynchronously send a metadata message into the channel.
Actor drain_metadata(std::shared_ptr< CoroThreadPoolExecutor > executor)
Drains all pending metadata messages from the channel and shuts down the metadata channel.
Actor drain(std::shared_ptr< CoroThreadPoolExecutor > executor)
Drains all pending messages from the channel and shuts it down.
bool is_shutdown() const noexcept
Check whether the channel is shut down.
coro::task< Message > receive_metadata()
Asynchronously receive a metadata message from the channel.
coro::task< Message > receive()
Asynchronously receive a message from the channel.
coro::task< bool > send(Message msg)
Asynchronously send a message into the channel.
Actor shutdown()
Immediately shuts down the channel.
Context for actors (coroutines) in rapidsmpf.
Type-erased message wrapper around a payload.
Helper RAII class to shut down channels when they go out of scope.
~ShutdownAtExit() noexcept
Destructor that synchronously shuts down all channels.
ShutdownAtExit(T &&... channels) requires(std
Variadic convenience constructor.
ShutdownAtExit(std::vector< std::shared_ptr< Channel >> channels)
Construct from a vector of channel handles.
Container for individually spillable messages.
An adaptor to throttle access to a channel.
coro::task< Ticket > acquire()
Obtain a ticket to send a message.
ThrottlingAdaptor(std::shared_ptr< Channel > channel, std::ptrdiff_t max_tickets)
Create an adaptor that throttles sends into a channel.
coro::task< void > Actor
Alias for an actor in a streaming graph.
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.