lineariser.hpp
1 
6 #pragma once
7 
8 #include <cstddef>
9 #include <memory>
10 #include <utility>
11 
12 #include <rapidsmpf/streaming/core/actor.hpp>
13 #include <rapidsmpf/streaming/core/channel.hpp>
14 #include <rapidsmpf/streaming/core/context.hpp>
15 #include <rapidsmpf/streaming/core/coro_utils.hpp>
16 #include <rapidsmpf/streaming/core/queue.hpp>
17 
18 namespace rapidsmpf::streaming {
19 
50 class Lineariser {
51  public:
62  std::shared_ptr<Context> ctx,
63  std::shared_ptr<Channel> ch_out,
64  std::size_t num_producers,
65  std::size_t buffer_size = 1
66  )
67  : ctx_{std::move(ctx)}, ch_out_{std::move(ch_out)} {
68  queues_.reserve(num_producers);
69  for (std::size_t i = 0; i < num_producers; i++) {
70  queues_.push_back(ctx->create_bounded_queue(buffer_size));
71  }
72  }
73 
82  std::vector<std::shared_ptr<BoundedQueue>>& get_queues() {
83  return queues_;
84  }
85 
95  ShutdownAtExit c{ch_out_};
96  co_await ctx_->executor()->schedule();
97  bool shutdown{false};
98  while (!(shutdown || queues_.empty())) {
99  for (auto& q : queues_) {
100  auto [receipt, msg] = co_await q->receive();
101  if (msg.empty()) {
102  q = nullptr;
103  continue;
104  }
105  if (!co_await ch_out_->send(std::move(msg))) {
106  // Output channel is shut down, tell the producers to shutdown.
107  shutdown = true;
108  break;
109  }
110  co_await receipt;
111  }
112  std::erase(queues_, nullptr);
113  }
114  // We either exited the loop because all the queues are gone, or the output
115  // channel is shutdown (in which case we want no more inputs), so either way, just
116  // shut down the remaining queues.
117  std::vector<coro::task<void>> tasks;
118  tasks.reserve(1 + queues_.size());
119  for (auto& q : queues_) {
120  tasks.push_back(q->shutdown());
121  }
122  tasks.push_back(ch_out_->drain(ctx_->executor()));
123  coro_results(co_await coro::when_all(std::move(tasks)));
124  }
125 
132  coro::task<void> shutdown() {
133  std::vector<coro::task<void>> tasks;
134  tasks.reserve(1 + queues_.size());
135  for (auto& q : queues_) {
136  tasks.push_back(q->shutdown());
137  }
138  tasks.push_back(ch_out_->shutdown());
139  coro_results(co_await coro::when_all(std::move(tasks)));
140  }
141 
142  private:
143  std::shared_ptr<Context> ctx_;
144  std::vector<std::shared_ptr<BoundedQueue>> queues_;
145  std::shared_ptr<Channel> ch_out_;
146 };
147 
148 } // namespace rapidsmpf::streaming
Linearise insertion into an output channel from a fixed number of producers by sequence number.
Definition: lineariser.hpp:50
std::vector< std::shared_ptr< BoundedQueue > > & get_queues()
Get a reference to the input queues.
Definition: lineariser.hpp:82
Actor drain()
Process inputs and send to the output channel.
Definition: lineariser.hpp:94
coro::task< void > shutdown()
Shut down the lineariser, informing both producers and consumer/.
Definition: lineariser.hpp:132
Lineariser(std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::size_t num_producers, std::size_t buffer_size=1)
Create a new Lineariser into an output channel.
Definition: lineariser.hpp:61
Helper RAII class to shut down channels when they go out of scope.
Definition: channel.hpp:350
coro::task< void > Actor
Alias for an actor in a streaming graph.
Definition: actor.hpp:18
auto coro_results(Range &&task_results)
Collect the results of multiple finished coroutines.
Definition: coro_utils.hpp:48