13 #include <rapidsmpf/streaming/core/channel.hpp>
14 #include <rapidsmpf/streaming/core/context.hpp>
15 #include <rapidsmpf/streaming/core/coro_utils.hpp>
16 #include <rapidsmpf/streaming/core/message.hpp>
17 #include <rapidsmpf/streaming/core/node.hpp>
19 #include <coro/queue.hpp>
20 #include <coro/sync_wait.hpp>
// Destructor (declared noexcept): blocks the calling thread with
// coro::sync_wait until the q_.shutdown() coroutine has completed.
50 ~Shutdown() noexcept {
51 coro::sync_wait(q_.shutdown());
// A Ticket is move-only: copy assignment and copy construction are deleted,
// while move assignment and move construction use the compiler defaults.
61 Ticket& operator=(Ticket
const&) =
delete;
62 Ticket(Ticket
const&) =
delete;
63 Ticket& operator=(Ticket&&) =
default;
64 Ticket(Ticket&&) =
default;
/**
 * @brief Construct a ticket from a queue pointer and a semaphore pointer.
 *
 * @param q Pointer to the queue; stored in `q_`.
 * @param semaphore Pointer to the semaphore; stored in `semaphore_`.
 */
73 Ticket(coro::queue<Message>* q,
Semaphore* semaphore)
74 : q_{q}, semaphore_{semaphore} {}
/**
 * @brief Push a message into the queue this ticket refers to.
 *
 * @param msg The message to send; it is moved into the queue's `push`.
 * @return A coroutine yielding `true` when the push result equals
 * `coro::queue_produce_result::produced`, and `false` otherwise.
 *
 * @note The condition `q_ && semaphore_` is checked together with the message
 * "Ticket has already been used" and the exception type `std::logic_error`.
 * NOTE(review): the line invoking that check, and some lines after the push,
 * are not visible in this listing; confirm against the full header.
 */
90 [[nodiscard]] coro::task<bool> send(
Message msg) {
92 q_ && semaphore_,
"Ticket has already been used", std::logic_error
// Suspend until the queue has handled the push, then inspect its result.
94 auto result = co_await q_->push(std::move(msg));
97 co_return result == coro::queue_produce_result::produced;
// Pointer to the queue that send() pushes into; set by the constructor.
101 coro::queue<Message>* q_;
// Initialise the semaphore with buffer_size converted to std::ptrdiff_t.
118 : semaphore_{
static_cast<std::ptrdiff_t
>(buffer_size)} {};
/**
 * @brief Acquire a ticket to send into the queue.
 *
 * Awaits `semaphore_.acquire()` before producing a ticket.
 *
 * @return A coroutine yielding `std::nullopt` when the acquire result is
 * `coro::semaphore_acquire_result::shutdown`; otherwise an optional holding a
 * `Ticket` constructed from the addresses of `q_` and `semaphore_`.
 */
127 [[nodiscard]] coro::task<std::optional<Ticket>>
acquire() {
128 auto result = co_await semaphore_.acquire();
// The semaphore has been shut down: there is no ticket to hand out.
129 if (result == coro::semaphore_acquire_result::shutdown) {
130 co_return std::nullopt;
132 co_return std::make_optional<Ticket>(&q_, &semaphore_);
// Wait for the queue's pop() to complete; the result may or may not hold a value.
142 auto msg = co_await q_.pop();
143 if (msg.has_value()) {
// A message was popped: return the result of semaphore_.release() paired
// with the message, which is moved out of the pop result.
144 co_return {semaphore_.release(), std::move(*msg)};
// No message was popped: return a task whose body only co_returns, paired
// with a default-constructed Message.
146 co_return {[]() -> coro::task<void> { co_return; }(),
Message{}};
/**
 * @brief Drain all messages in the queue and shut down.
 *
 * Awaits `q_.shutdown_drain(executor)` first, and only afterwards awaits
 * `semaphore_.shutdown()`.
 *
 * @param executor Thread pool forwarded to the queue's `shutdown_drain`.
 * @return A coroutine that completes once both awaits have finished.
 */
157 [[nodiscard]] coro::task<void>
drain(std::unique_ptr<coro::thread_pool>& executor) {
158 co_await q_.shutdown_drain(executor);
159 co_await semaphore_.shutdown();
// Await the queue shutdown and the semaphore shutdown together through
// coro::when_all, then pass the combined results to coro_results().
170 coro_results(co_await coro::when_all(q_.shutdown(), semaphore_.shutdown()));
// Build a BoundedQueue::Shutdown object from this queue and return it.
180 return BoundedQueue::Shutdown{*
this};
// The underlying libcoro queue of Message objects, value-initialised.
184 coro::queue<Message> q_{};
A bounded queue for type-erased Messages.
coro::task< void > drain(std::unique_ptr< coro::thread_pool > &executor)
Drain all messages in the queue and shut down.
BoundedQueue::Shutdown raii_shutdown() noexcept
Obtain an object that will synchronously shut down the queue when it goes out of scope.
coro::task< std::pair< coro::task< void >, Message > > receive()
Receive a message from the queue.
coro::task< void > shutdown()
Immediately shut down the queue.
coro::task< std::optional< Ticket > > acquire()
Acquire a ticket to send into the queue.
Context for nodes (coroutines) in rapidsmpf.
Type-erased message wrapper around a payload.
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.
auto coro_results(Range &&task_results)
Collect the results of multiple finished coroutines.