queue.hpp
1 
6 #pragma once
7 
8 #include <cstddef>
9 #include <memory>
10 #include <optional>
11 #include <utility>
12 
13 #include <rapidsmpf/streaming/core/channel.hpp>
14 #include <rapidsmpf/streaming/core/context.hpp>
15 #include <rapidsmpf/streaming/core/coro_utils.hpp>
16 #include <rapidsmpf/streaming/core/message.hpp>
17 #include <rapidsmpf/streaming/core/node.hpp>
18 
19 #include <coro/queue.hpp>
20 #include <coro/sync_wait.hpp>
21 
22 namespace rapidsmpf::streaming {
23 
31 class BoundedQueue {
32  friend Context;
33 
34  private:
38  class Shutdown {
39  public:
45  explicit Shutdown(BoundedQueue& q) : q_{q} {}
46 
50  ~Shutdown() noexcept {
51  coro::sync_wait(q_.shutdown());
52  }
53 
54  private:
55  BoundedQueue& q_;
56  };
57 
59  class Ticket {
60  public:
61  Ticket& operator=(Ticket const&) = delete;
62  Ticket(Ticket const&) = delete;
63  Ticket& operator=(Ticket&&) = default;
64  Ticket(Ticket&&) = default;
65  ~Ticket() = default;
66 
73  Ticket(coro::queue<Message>* q, Semaphore* semaphore)
74  : q_{q}, semaphore_{semaphore} {}
75 
90  [[nodiscard]] coro::task<bool> send(Message msg) {
91  RAPIDSMPF_EXPECTS(
92  q_ && semaphore_, "Ticket has already been used", std::logic_error
93  );
94  auto result = co_await q_->push(std::move(msg));
95  q_ = nullptr;
96  semaphore_ = nullptr;
97  co_return result == coro::queue_produce_result::produced;
98  }
99 
100  private:
101  coro::queue<Message>* q_;
102  Semaphore* semaphore_;
103  };
104 
117  BoundedQueue(std::size_t buffer_size)
118  : semaphore_{static_cast<std::ptrdiff_t>(buffer_size)} {};
119 
120  public:
127  [[nodiscard]] coro::task<std::optional<Ticket>> acquire() {
128  auto result = co_await semaphore_.acquire();
129  if (result == coro::semaphore_acquire_result::shutdown) {
130  co_return std::nullopt;
131  }
132  co_return std::make_optional<Ticket>(&q_, &semaphore_);
133  }
134 
141  [[nodiscard]] coro::task<std::pair<coro::task<void>, Message>> receive() {
142  auto msg = co_await q_.pop();
143  if (msg.has_value()) {
144  co_return {semaphore_.release(), std::move(*msg)};
145  } else {
146  co_return {[]() -> coro::task<void> { co_return; }(), Message{}};
147  }
148  }
149 
157  [[nodiscard]] coro::task<void> drain(std::unique_ptr<coro::thread_pool>& executor) {
158  co_await q_.shutdown_drain(executor);
159  co_await semaphore_.shutdown();
160  }
161 
169  [[nodiscard]] coro::task<void> shutdown() {
170  coro_results(co_await coro::when_all(q_.shutdown(), semaphore_.shutdown()));
171  }
172 
179  [[nodiscard]] BoundedQueue::Shutdown raii_shutdown() noexcept {
180  return BoundedQueue::Shutdown{*this};
181  }
182 
183  private:
184  coro::queue<Message> q_{};
185  Semaphore semaphore_;
186 };
187 
188 } // namespace rapidsmpf::streaming
A bounded queue for type-erased Messages.
Definition: queue.hpp:31
coro::task< void > drain(std::unique_ptr< coro::thread_pool > &executor)
Drain all messages in the queue and shut down.
Definition: queue.hpp:157
BoundedQueue::Shutdown raii_shutdown() noexcept
Obtain an object that will synchronously shut down the queue when it goes out of scope.
Definition: queue.hpp:179
coro::task< std::pair< coro::task< void >, Message > > receive()
Receive a message from the queue.
Definition: queue.hpp:141
coro::task< void > shutdown()
Immediately shut down the queue.
Definition: queue.hpp:169
coro::task< std::optional< Ticket > > acquire()
Acquire a ticket to send into the queue.
Definition: queue.hpp:127
Context for nodes (coroutines) in rapidsmpf.
Definition: context.hpp:25
Type-erased message wrapper around a payload.
Definition: message.hpp:27
coro::semaphore< std::numeric_limits< std::ptrdiff_t >::max()> Semaphore
An awaitable semaphore to manage acquisition and release of finite resources.
Definition: channel.hpp:29
auto coro_results(Range &&task_results)
Collect the results of multiple finished coroutines.
Definition: coro_utils.hpp:48