rapidsmpf::streaming::Channel Class Reference

A coroutine-based channel for sending and receiving messages asynchronously.

#include <channel.hpp>

Public Member Functions

coro::task< bool > send (Message msg)
 Asynchronously send a message into the channel.

coro::task< Message > receive ()
 Asynchronously receive a message from the channel.

coro::task< bool > send_metadata (Message msg)
 Asynchronously send a metadata message into the channel.

coro::task< Message > receive_metadata ()
 Asynchronously receive a metadata message from the channel.

Actor drain_metadata (std::shared_ptr< CoroThreadPoolExecutor > executor)
 Drains all pending metadata messages from the channel and shuts down the metadata channel.

Actor drain (std::shared_ptr< CoroThreadPoolExecutor > executor)
 Drains all pending messages from the channel and shuts it down.

Actor shutdown ()
 Immediately shuts down the channel.

Actor shutdown_metadata ()
 Immediately shuts down the metadata channel.

bool empty () const noexcept
 Check whether the channel is empty.

bool is_shutdown () const noexcept
 Check whether the channel is shut down.
 

Detailed Description

A coroutine-based channel for sending and receiving messages asynchronously.

The constructor is private; use the factory method Context::create_channel() to create a new channel.

In addition to sending messages through a channel, channel producers can communicate metadata to consumers through a metadata side-channel.

Note
The Channel is bounded for the purposes of sending messages (via send), but the side-channel is unbounded for the purposes of sending metadata (via send_metadata). There is no implied ordering between metadata messages and ordinary messages (they travel over separate paths and do not interfere).
The metadata side-channel should be drained (or shut down) like the Channel itself. For convenience, drain and shutdown ensure that both the channel and the metadata channel are shut down appropriately. One can also drain or shut down the side-channel independently (drain_metadata and shutdown_metadata).
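The difference between the bounded message path and the unbounded metadata path can be sketched with a small single-threaded model. This is only an illustration of the behaviour described in the note, not the rapidsmpf implementation: ToyChannel, SendResult, and the capacity parameter are hypothetical names, std::string stands in for Message, and a WouldSuspend result marks the point where the real coroutine would suspend.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Toy, single-threaded stand-in for the documented behaviour.
// This is NOT the rapidsmpf implementation or API.
enum class SendResult { Sent, WouldSuspend, ShutDown };

class ToyChannel {
  public:
    // `capacity` is an assumed parameter; the source does not state the bound.
    explicit ToyChannel(std::size_t capacity) : capacity_{capacity} {
        assert(capacity_ > 0);
    }

    // Bounded path: where the real send() would suspend, report WouldSuspend.
    SendResult send(std::string msg) {
        if (shutdown_) return SendResult::ShutDown;
        if (messages_.size() >= capacity_) return SendResult::WouldSuspend;
        messages_.push_back(std::move(msg));
        return SendResult::Sent;
    }

    // Unbounded path: never reports WouldSuspend.
    SendResult send_metadata(std::string msg) {
        if (metadata_shutdown_) return SendResult::ShutDown;
        metadata_.push_back(std::move(msg));
        return SendResult::Sent;
    }

    // Shuts down only the side-channel.
    void shutdown_metadata() { metadata_shutdown_ = true; }

    // Shuts down both paths, mirroring the note about shutdown.
    void shutdown() {
        shutdown_ = true;
        metadata_shutdown_ = true;
    }

    std::size_t pending() const { return messages_.size(); }
    std::size_t pending_metadata() const { return metadata_.size(); }

  private:
    std::size_t capacity_;
    std::deque<std::string> messages_;
    std::deque<std::string> metadata_;
    bool shutdown_{false};
    bool metadata_shutdown_{false};
};
```

With a capacity of one, a second send reports WouldSuspend while any number of send_metadata calls still succeed, and shutting down the side-channel leaves the message path usable.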

Definition at line 52 of file channel.hpp.

Member Function Documentation

◆ drain()

Actor rapidsmpf::streaming::Channel::drain ( std::shared_ptr< CoroThreadPoolExecutor > executor)

Drains all pending messages from the channel and shuts it down.

This is intended to ensure all remaining messages are processed.

Warning
If the consumer has no intention of reading metadata messages, it must call shutdown_metadata (directly, or indirectly via shutdown). Otherwise, the producer will block forever when it drains the output metadata channel.
Parameters
executor    The thread pool used to process remaining messages.
Returns
A coroutine representing the completion of the shutdown drain.

◆ drain_metadata()

Actor rapidsmpf::streaming::Channel::drain_metadata ( std::shared_ptr< CoroThreadPoolExecutor > executor)

Drains all pending metadata messages from the channel and shuts down the metadata channel.

This is intended to ensure all remaining metadata messages are processed.

Warning
If the consumer has no intention of reading metadata messages, it must call shutdown_metadata (directly, or indirectly via shutdown). Otherwise, the producer will block forever when it drains the output metadata channel.
Parameters
executor    The thread pool used to process remaining messages.
Returns
A coroutine representing the completion of the metadata shutdown drain.
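The warning above can be read as a completion condition: a metadata drain finishes only once the pending metadata has been consumed or the side-channel has been shut down. The sketch below models that reading with a hypothetical ToyMetadataQueue. It is an assumption-laden illustration, not the library's code, and std::string stands in for Message.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Toy model of the metadata side-channel (illustration only, not rapidsmpf).
struct ToyMetadataQueue {
    std::deque<std::string> pending;
    bool is_shutdown{false};

    // Producer side: queue a metadata message unless already shut down.
    void send_metadata(std::string m) {
        if (!is_shutdown) pending.push_back(std::move(m));
    }

    // Consumer side: take the oldest metadata message.
    // The toy requires one to be available; the real call would suspend.
    std::string receive_metadata() {
        assert(!pending.empty());
        std::string m = std::move(pending.front());
        pending.pop_front();
        return m;
    }

    // Consumer side: declare that no (more) metadata will be read.
    void shutdown_metadata() { is_shutdown = true; }

    // Assumed completion condition for a drain, derived from the warning:
    // either nothing is left to consume, or the queue was shut down.
    bool drain_would_complete() const { return is_shutdown || pending.empty(); }
};
```

A consumer that neither reads nor shuts down leaves drain_would_complete() false, which corresponds to the producer blocking forever.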

◆ empty()

bool rapidsmpf::streaming::Channel::empty ( ) const noexcept

Check whether the channel is empty.

Returns
True if there are no messages in the buffer.

◆ is_shutdown()

bool rapidsmpf::streaming::Channel::is_shutdown ( ) const noexcept

Check whether the channel is shut down.

Returns
True if the channel is shut down.

◆ receive()

coro::task<Message> rapidsmpf::streaming::Channel::receive ( )

Asynchronously receive a message from the channel.

Suspends if the channel is empty.

Returns
A coroutine that evaluates to the message, which will be empty if the channel is shut down.
Exceptions
std::logic_error    If the received message is empty.

◆ receive_metadata()

coro::task<Message> rapidsmpf::streaming::Channel::receive_metadata ( )

Asynchronously receive a metadata message from the channel.

Suspends if no metadata is available.

Returns
A coroutine that evaluates to the message, which will be empty if the metadata queue is shut down.

◆ send()

coro::task<bool> rapidsmpf::streaming::Channel::send ( Message  msg)

Asynchronously send a message into the channel.

Suspends if the channel is full.

Parameters
msg    The message to send.
Returns
A coroutine that evaluates to true if the message was successfully sent or false if the channel was shut down.
Exceptions
std::logic_error    If the message is empty.
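The three outcomes of send (true, false, or std::logic_error) suggest a producer loop that stops at the first failed send. The following is a minimal synchronous sketch of that contract. ToySender and send_all are hypothetical names, an empty std::string stands in for an empty Message, the order of the two checks is an assumption, and suspension on a full channel is left out entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical synchronous model of the send() contract; not rapidsmpf code.
class ToySender {
  public:
    // Returns true if queued, false if shut down; throws on an empty message.
    bool send(std::string msg) {
        if (msg.empty()) {
            throw std::logic_error("cannot send an empty message");
        }
        if (shutdown_) {
            return false;
        }
        queue_.push_back(std::move(msg));
        return true;
    }

    void shutdown() { shutdown_ = true; }
    std::size_t size() const { return queue_.size(); }

  private:
    std::deque<std::string> queue_;
    bool shutdown_{false};
};

// Sends messages in order and stops at the first send that reports shutdown.
// Returns the number of messages that were accepted.
std::size_t send_all(ToySender& sender, std::vector<std::string> const& msgs) {
    std::size_t accepted = 0;
    for (auto const& m : msgs) {
        if (!sender.send(m)) break;
        ++accepted;
    }
    assert(accepted <= msgs.size());
    return accepted;
}
```

Checking the return value on every send lets a producer stop work early once the consumer has shut the channel down, rather than producing messages nobody will read.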

◆ send_metadata()

coro::task<bool> rapidsmpf::streaming::Channel::send_metadata ( Message  msg)

Asynchronously send a metadata message into the channel.

Note
Sending metadata is always possible, even if the other end of the channel never consumes the metadata.
In typical usage, the consumer reads metadata before reading messages from the channel. Hence, the producer should send metadata (or shut down the side-channel via shutdown_metadata) before proceeding to send messages.
Parameters
msg    The metadata message to send.
Returns
A coroutine that evaluates to true if the message was successfully sent or false if the channel was shut down.
Exceptions
std::logic_error    If the message is empty.
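The ordering advice in the note (metadata first, or an explicit shutdown_metadata when there is none, and only then ordinary messages) can be sketched as a producer routine. The types below are hypothetical stand-ins: ToyChannel is a plain two-queue struct, produce is an illustrative helper, and std::string replaces Message. A real producer would still drain or shut down the channel afterwards.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy two-queue channel (illustration only, not rapidsmpf).
struct ToyChannel {
    std::deque<std::string> metadata;
    std::deque<std::string> messages;
    bool metadata_shutdown{false};

    // Returns false once the side-channel has been shut down.
    bool send_metadata(std::string m) {
        if (metadata_shutdown) return false;
        metadata.push_back(std::move(m));
        return true;
    }
    void shutdown_metadata() { metadata_shutdown = true; }
    void send(std::string m) { messages.push_back(std::move(m)); }
};

// Producer following the documented order: settle the metadata side-channel
// first (send or shut down), then send the ordinary messages.
void produce(ToyChannel& ch,
             std::optional<std::string> meta,
             std::vector<std::string> const& data) {
    if (meta.has_value()) {
        bool const sent = ch.send_metadata(std::move(*meta));
        assert(sent);  // the toy side-channel has not been shut down yet
        (void)sent;
    } else {
        ch.shutdown_metadata();  // nothing to announce: tell the consumer early
    }
    for (auto const& d : data) {
        ch.send(d);
    }
}
```

Settling the side-channel before the first ordinary send means a consumer that waits on metadata first is never left waiting while the producer is suspended on a full message queue.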

◆ shutdown()

Actor rapidsmpf::streaming::Channel::shutdown ( )

Immediately shuts down the channel.

Any pending or future send/receive operations (including metadata messages) will complete with failure.

Returns
A coroutine representing the completion of the shutdown.

◆ shutdown_metadata()

Actor rapidsmpf::streaming::Channel::shutdown_metadata ( )

Immediately shuts down the metadata channel.

Any pending or future metadata send/receive operations will complete with failure.

Note
If the producer has no metadata to provide, it should call shutdown_metadata before anything else.
Returns
A coroutine representing the completion of the shutdown.

The documentation for this class was generated from the following file: