lineariser.hpp
1 
6 #pragma once
7 
8 #include <cstddef>
9 #include <memory>
10 #include <utility>
11 
12 #include <rapidsmpf/streaming/core/actor.hpp>
13 #include <rapidsmpf/streaming/core/channel.hpp>
14 #include <rapidsmpf/streaming/core/context.hpp>
15 #include <rapidsmpf/streaming/core/coro_utils.hpp>
16 #include <rapidsmpf/streaming/core/queue.hpp>
17 
18 namespace rapidsmpf::streaming {
19 
50 class Lineariser {
51  public:
62  std::shared_ptr<Context> ctx,
63  std::shared_ptr<Channel> ch_out,
64  std::size_t num_producers,
65  std::size_t buffer_size = 1
66  )
67  : ctx_{std::move(ctx)}, ch_out_{std::move(ch_out)} {
68  queues_.reserve(num_producers);
69  for (std::size_t i = 0; i < num_producers; i++) {
70  queues_.push_back(ctx->create_bounded_queue(buffer_size));
71  }
72  }
73 
82  std::vector<std::shared_ptr<BoundedQueue>>& get_queues() {
83  return queues_;
84  }
85 
95  ShutdownAtExit c{ch_out_};
96  co_await ctx_->executor()->schedule();
97  while (!queues_.empty()) {
98  for (auto& q : queues_) {
99  auto [receipt, msg] = co_await q->receive();
100  if (msg.empty()) {
101  q = nullptr;
102  continue;
103  }
104  if (!co_await ch_out_->send(std::move(msg))) {
105  // Output channel is shut down, tell the producers to shutdown.
106  break;
107  }
108  co_await receipt;
109  }
110  std::erase(queues_, nullptr);
111  }
112  // We either exited the loop because all the queues are gone, or the output
113  // channel is shutdown (in which case we want no more inputs), so either way, just
114  // shut down the remaining queues.
115  std::vector<coro::task<void>> tasks;
116  tasks.reserve(1 + queues_.size());
117  for (auto& q : queues_) {
118  tasks.push_back(q->shutdown());
119  }
120  tasks.push_back(ch_out_->drain(ctx_->executor()));
121  coro_results(co_await coro::when_all(std::move(tasks)));
122  }
123 
130  coro::task<void> shutdown() {
131  std::vector<coro::task<void>> tasks;
132  tasks.reserve(1 + queues_.size());
133  for (auto& q : queues_) {
134  tasks.push_back(q->shutdown());
135  }
136  tasks.push_back(ch_out_->shutdown());
137  coro_results(co_await coro::when_all(std::move(tasks)));
138  }
139 
140  private:
141  std::shared_ptr<Context> ctx_;
142  std::vector<std::shared_ptr<BoundedQueue>> queues_;
143  std::shared_ptr<Channel> ch_out_;
144 };
145 
146 } // namespace rapidsmpf::streaming
Linearise insertion into an output channel from a fixed number of producers by sequence number.
Definition: lineariser.hpp:50
std::vector< std::shared_ptr< BoundedQueue > > & get_queues()
Get a reference to the input queues.
Definition: lineariser.hpp:82
Actor drain()
Process inputs and send to the output channel.
Definition: lineariser.hpp:94
coro::task< void > shutdown()
Shut down the lineariser, informing both producers and consumer.
Definition: lineariser.hpp:130
Lineariser(std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::size_t num_producers, std::size_t buffer_size=1)
Create a new Lineariser into an output channel.
Definition: lineariser.hpp:61
Helper RAII class to shut down channels when they go out of scope.
Definition: channel.hpp:350
coro::task< void > Actor
Alias for an actor in a streaming graph.
Definition: actor.hpp:18
auto coro_results(Range &&task_results)
Collect the results of multiple finished coroutines.
Definition: coro_utils.hpp:48