12 #include <rapidsmpf/streaming/core/actor.hpp>
13 #include <rapidsmpf/streaming/core/channel.hpp>
14 #include <rapidsmpf/streaming/core/context.hpp>
15 #include <rapidsmpf/streaming/core/coro_utils.hpp>
16 #include <rapidsmpf/streaming/core/queue.hpp>
// Create a new Lineariser into an output channel.
//
// ctx           - context; ownership is moved into the member `ctx_`.
// ch_out        - output channel; ownership is moved into the member `ch_out_`.
// num_producers - number of input queues to create (one bounded queue each).
// buffer_size   - forwarded to create_bounded_queue() for every queue (default 1).
62 std::shared_ptr<Context> ctx,
63 std::shared_ptr<Channel> ch_out,
64 std::size_t num_producers,
65 std::size_t buffer_size = 1
67 : ctx_{std::move(ctx)}, ch_out_{std::move(ch_out)} {
68 queues_.reserve(num_producers);
69 for (std::size_t i = 0; i < num_producers; i++) {
// The `ctx` parameter was moved from in the member initialiser list above, so it
// is empty here; the queues must be created through the member `ctx_` instead.
70 queues_.push_back(ctx_->create_bounded_queue(buffer_size));
// Get a reference to the input queues.
// The return type is a non-const reference to a vector of shared BoundedQueue
// handles. NOTE(review): the function body is not part of this listing.
82 std::vector<std::shared_ptr<BoundedQueue>>&
get_queues() {
// drain(): process inputs and send them to the output channel.
// NOTE(review): this listing omits the function signature and several
// statements inside the loops, so the comments below describe only the
// statements that are visible.
//
// Await the executor's schedule() before touching the queues
// (presumably this moves execution onto the context's executor — confirm).
96 co_await ctx_->executor()->schedule();
// Keep running rounds while at least one input queue remains.
97 while (!queues_.empty()) {
// Each round awaits one receive() from every remaining queue, in vector order.
98 for (
auto& q : queues_) {
99 auto [receipt, msg] = co_await q->receive();
// Forward the received message to the output channel; the branch taken when
// send() yields a false value is not shown in this listing.
104 if (!co_await ch_out_->send(std::move(msg))) {
// Remove entries that are nullptr (presumably queues are nulled out in the
// omitted lines once they are finished — confirm).
110 std::erase(queues_,
nullptr);
// NOTE(review): the lines below are presumed to be the tail of drain().
// Build one shutdown task per remaining input queue plus one task that drains
// the output channel, then await them all together.
115 std::vector<coro::task<void>> tasks;
116 tasks.reserve(1 + queues_.size());
117 for (
auto& q : queues_) {
118 tasks.push_back(q->shutdown());
120 tasks.push_back(ch_out_->drain(ctx_->executor()));
// coro_results() collects the results of the finished coroutines.
121 coro_results(co_await coro::when_all(std::move(tasks)));
// shutdown(): shut down the lineariser, informing both producers and consumer.
// NOTE(review): the signature and closing lines are not part of this listing.
//
// One task per input queue plus one for the output channel.
131 std::vector<coro::task<void>> tasks;
132 tasks.reserve(1 + queues_.size());
// Request shutdown of every input queue.
133 for (
auto& q : queues_) {
134 tasks.push_back(q->shutdown());
// Request shutdown of the output channel as well.
136 tasks.push_back(ch_out_->shutdown());
// Await all shutdown tasks together and collect their results.
137 coro_results(co_await coro::when_all(std::move(tasks)));
// Context used to create the bounded queues and to obtain the executor.
141 std::shared_ptr<Context> ctx_;
// Input queues; the constructor creates `num_producers` of them.
142 std::vector<std::shared_ptr<BoundedQueue>> queues_;
// Output channel that received messages are sent to.
143 std::shared_ptr<Channel> ch_out_;
Linearise insertion into an output channel from a fixed number of producers by sequence number.
std::vector< std::shared_ptr< BoundedQueue > > & get_queues()
Get a reference to the input queues.
Actor drain()
Process inputs and send to the output channel.
coro::task< void > shutdown()
Shut down the lineariser, informing both producers and consumer.
Lineariser(std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::size_t num_producers, std::size_t buffer_size=1)
Create a new Lineariser into an output channel.
Helper RAII class to shut down channels when they go out of scope.
coro::task< void > Actor
Alias for an actor in a streaming graph.
auto coro_results(Range &&task_results)
Collect the results of multiple finished coroutines.