rapidsmpf::streaming::Lineariser Class Reference

Linearise insertion into an output channel from a fixed number of producers by sequence number.

#include <lineariser.hpp>

Public Member Functions

 Lineariser (std::shared_ptr< Context > ctx, std::shared_ptr< Channel > ch_out, std::size_t num_producers, std::size_t buffer_size=1)
 Create a new Lineariser into an output channel.
 
std::vector< std::shared_ptr< BoundedQueue > > & get_queues ()
 Get a reference to the input queues.
 
Actor drain ()
 Process inputs and send to the output channel.
 
coro::task< void > shutdown ()
 Shut down the lineariser, informing both producers and consumer.
 

Detailed Description

Linearise insertion into an output channel from a fixed number of producers by sequence number.

Producers are polled in round-robin fashion, and therefore must deliver messages in round-robin increasing sequence number order. If this guarantee is upheld, then the output of the Lineariser is guaranteed to be in total order of the sequence numbers.
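
The ordering contract above can be illustrated with a minimal, self-contained sketch. It does not use the rapidsmpf API: plain std::queue objects stand in for the input queues, and the helper names make_producer_queue, drain_round_robin and linearise_demo are hypothetical, introduced only for this illustration. The sketch assumes every producer sends the same number of messages; how the real Lineariser handles producers that finish early is not covered here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical helper: the sequence ids owned by producer `i` out of `P`
// producers, i.e. i, P + i, 2P + i, ... (`count` values in total).
std::queue<std::uint64_t> make_producer_queue(std::size_t i, std::size_t P, std::size_t count) {
    assert(i < P);
    std::queue<std::uint64_t> q;
    for (std::size_t k = 0; k < count; ++k) {
        q.push(k * P + i);
    }
    return q;
}

// Hypothetical helper: visit the queues in round-robin order, taking one
// message from each non-empty queue per pass, until all queues are empty.
std::vector<std::uint64_t> drain_round_robin(std::vector<std::queue<std::uint64_t>>& queues) {
    std::vector<std::uint64_t> out;
    bool took_any = true;
    while (took_any) {
        took_any = false;
        for (auto& q : queues) {
            if (q.empty()) {
                continue;
            }
            out.push_back(q.front());
            q.pop();
            took_any = true;
        }
    }
    return out;
}

// Build P producer queues with `count` messages each and drain them.
std::vector<std::uint64_t> linearise_demo(std::size_t P, std::size_t count) {
    std::vector<std::queue<std::uint64_t>> queues;
    for (std::size_t i = 0; i < P; ++i) {
        queues.push_back(make_producer_queue(i, P, count));
    }
    return drain_round_robin(queues);
}
```

Calling linearise_demo(3, 4) in this sketch yields the ids 0 through 11 in increasing order, because each pass over the three queues emits one consecutive block of ids. If a producer instead sent its ids out of order, the same polling loop would emit them out of order, which is why the guarantee has to be upheld on the producer side.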

Example usage:

auto ctx = std::make_shared<Context>(...);
auto ch_out = ctx->create_channel();
auto lineariser = std::make_shared<Lineariser>(ctx, ch_out, 8);
std::vector<Actor> tasks;
// Draining the lineariser will pull from all the input queues until they are
// shut down and send to the output channel until it is consumed.
tasks.push_back(lineariser->drain());
for (auto& queue : lineariser->get_queues()) {
    // Each producer promises to send an increasing stream of sequence ids in
    // round-robin fashion. That is, if there are P producers, producer 0 sends
    // [0, P, 2P, ...], producer 1 sends [1, P + 1, 2P + 1, ...] and producer i
    // sends [i, P + i, 2P + i, ...].
    tasks.push_back(producer(ctx, queue, ...));
}
coro_results(co_await coro::when_all(std::move(tasks)));
// ch_out will see inputs in global total order of sequence id.

Definition at line 50 of file lineariser.hpp.

Constructor & Destructor Documentation

◆ Lineariser()

rapidsmpf::streaming::Lineariser::Lineariser ( std::shared_ptr< Context > ctx,
std::shared_ptr< Channel > ch_out,
std::size_t num_producers,
std::size_t buffer_size = 1
)
inline

Create a new Lineariser into an output channel.

Parameters
ctx            Streaming context.
ch_out         The output channel.
num_producers  The number of producers.
buffer_size    The number of messages that are buffered in the lineariser from each producer.

Definition at line 61 of file lineariser.hpp.

Member Function Documentation

◆ drain()

Actor rapidsmpf::streaming::Lineariser::drain ( )
inline

Process inputs and send to the output channel.

Returns
Coroutine representing the linearised sends of all producers.
Note
This coroutine should be awaited in a coro::when_all with all of the producer tasks.

Definition at line 94 of file lineariser.hpp.

◆ get_queues()

std::vector<std::shared_ptr<BoundedQueue> >& rapidsmpf::streaming::Lineariser::get_queues ( )
inline

Get a reference to the input queues.

Returns
Reference to the BoundedQueues to send into.
Note
Behaviour is undefined if more than one producer coroutine sends into the same queue.

Definition at line 82 of file lineariser.hpp.

◆ shutdown()

coro::task<void> rapidsmpf::streaming::Lineariser::shutdown ( )
inline

Shut down the lineariser, informing both producers and consumer.

Returns
Coroutine representing the shutdown of all input queues and the output channel.

Definition at line 130 of file lineariser.hpp.


The documentation for this class was generated from the following file: