rapidsmpf::streaming::Message Class Reference

Type-erased message wrapper around a payload.

#include <message.hpp>

Public Types

using CopyCallback = std::function< Message(Message const &, MemoryReservation &reservation)>
 Callback for performing a deep copy of a message.
 

Public Member Functions

 Message ()=default
 Create an empty message.

template<typename T >
 Message (std::uint64_t sequence_number, std::unique_ptr< T > payload, ContentDescription content_description, CopyCallback copy_cb=nullptr)
 Construct a new message from a unique pointer to its payload.

 Message (Message &&other) noexcept=default
 Move construct.

Message & operator= (Message &&other) noexcept=default
 Move assign.

 Message (Message const &)=delete

Message & operator= (Message const &)=delete

void reset () noexcept
 Reset the message to empty.

bool empty () const noexcept
 Returns true when no payload is stored.

constexpr std::uint64_t sequence_number () const noexcept
 Returns the sequence number of this message.

template<typename T >
bool holds () const noexcept
 Compare the payload type.

template<typename T >
T const & get () const
 Reference to the payload.

template<typename T >
T release ()
 Extracts the payload and resets the message.

constexpr ContentDescription const & content_description () const noexcept
 Returns the content description associated with the message.

constexpr CopyCallback const & copy_cb () const noexcept
 Returns the copy callback associated with the message.

constexpr size_t copy_cost () const noexcept
 Returns the total memory size required for a deep copy of the payload.

Message copy (MemoryReservation &reservation) const
 Perform a deep copy of this message and its payload.
 

Detailed Description

Type-erased message wrapper around a payload.

Definition at line 27 of file message.hpp.

Member Typedef Documentation

◆ CopyCallback

using rapidsmpf::streaming::Message::CopyCallback = std::function<Message(Message const&, MemoryReservation& reservation)>

Callback for performing a deep copy of a message.

The copy operation allocates new memory for the message's payload using the provided memory reservation. The memory type specified in the reservation determines where the new copy will primarily reside (e.g., device or host memory).

Parameters
msg: Source message to copy.
reservation: Memory reservation to consume during allocation.
Returns
A new Message instance containing a deep copy of the payload.

Definition at line 41 of file message.hpp.
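
As a rough illustration of the callback contract described above, the sketch below models a copy callback using simplified stand-in types. `MemKind`, `ToyReservation`, `ToyMessage`, and `make_vector_copy_cb` are hypothetical names invented for this example; they are not part of rapidsmpf, and the real `MemoryReservation` interface may differ. The point is only that the callback consumes the reservation and that the reservation decides where the copy resides.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT the rapidsmpf types.
enum class MemKind { Host, Device };

struct ToyReservation {
    MemKind kind;       // where the copy should primarily reside
    std::size_t bytes;  // bytes still available in the reservation

    // Consume `n` bytes, failing if the reservation is too small.
    void consume(std::size_t n) {
        if (n > bytes) {
            throw std::overflow_error("reservation too small");
        }
        bytes -= n;
    }
};

struct ToyMessage {
    std::shared_ptr<std::vector<int>> payload;  // pointer identity reveals shallow copies
    MemKind kind{MemKind::Host};
};

using ToyCopyCallback =
    std::function<ToyMessage(ToyMessage const&, ToyReservation&)>;

// Build a callback that deep-copies a vector payload.
inline ToyCopyCallback make_vector_copy_cb() {
    return [](ToyMessage const& msg, ToyReservation& reservation) {
        std::size_t const nbytes = msg.payload->size() * sizeof(int);
        reservation.consume(nbytes);  // account for the new allocation
        ToyMessage out;
        out.payload = std::make_shared<std::vector<int>>(*msg.payload);
        out.kind = reservation.kind;  // the reservation picks the target memory
        return out;
    };
}
```

In this toy model the source message is left untouched, and the copy lands in whichever memory kind the reservation names, regardless of where the source lived.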

Constructor & Destructor Documentation

◆ Message() [1/2]

template<typename T >
rapidsmpf::streaming::Message::Message ( std::uint64_t sequence_number, std::unique_ptr< T > payload, ContentDescription content_description, CopyCallback copy_cb = nullptr )
inline

Construct a new message from a unique pointer to its payload.

The message may optionally support deep-copy and spilling operations through a user-provided CopyCallback. If no callback is provided, copy and spill operations are disabled.

Template Parameters
T: Type of the payload to store inside the message.
Parameters
sequence_number: Ordering identifier for the message.
payload: Non-null unique pointer to the payload.
content_description: Description of the payload's content. When a copy callback is provided, this description must accurately reflect the content of the payload (e.g., per-memory-type sizes and spillable status).
copy_cb: Optional callback used to perform deep copies of the message. If nullptr, copying and spilling are disabled.
Note
Sequence numbers preserve channel ordering when multiple producers send into the same output channel. The guarantee is that a Channel always produces elements in increasing sequence-number order. To uphold it, each individual producer must send into a channel in strictly increasing sequence-number order; behaviour is undefined otherwise. To make insertion into an output channel from multiple producers obey this invariant, use a Lineariser. This promise lets consumers enforce ordering by buffering at most num_consumers messages, rather than the entire channel input.
Exceptions
std::invalid_argument: if payload is null.

Definition at line 76 of file message.hpp.
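
The note above asks each producer to emit strictly increasing sequence numbers. A minimal sketch of a producer-side check is shown below. `SequenceGuard` is a hypothetical helper invented for this example and is not part of rapidsmpf. The library documents a violation as undefined behaviour, so the exception here is purely an illustrative choice.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical helper (not a rapidsmpf class): validates that one producer
// hands out strictly increasing sequence numbers.
class SequenceGuard {
  public:
    // Returns `seq` unchanged if it is strictly greater than the previous
    // value passed in; throws std::logic_error otherwise.
    std::uint64_t next(std::uint64_t seq) {
        if (has_last_ && seq <= last_) {
            throw std::logic_error("sequence numbers must be strictly increasing");
        }
        has_last_ = true;
        last_ = seq;
        return seq;
    }

  private:
    bool has_last_{false};   // false until the first number is seen
    std::uint64_t last_{0};  // most recent accepted sequence number
};
```

A producer could pass each sequence number through `next()` before constructing a message, which turns an ordering bug into an immediate error during development.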

◆ Message() [2/2]

rapidsmpf::streaming::Message::Message ( Message &&  other)
default noexcept

Move construct.

Parameters
other: Source message.

Member Function Documentation

◆ content_description()

constexpr ContentDescription const& rapidsmpf::streaming::Message::content_description ( ) const
inline constexpr noexcept

Returns the content description associated with the message.

Returns
The message's content description.

Definition at line 185 of file message.hpp.

◆ copy()

Message rapidsmpf::streaming::Message::copy ( MemoryReservation & reservation) const
inline

Perform a deep copy of this message and its payload.

Invokes the registered copy callback to create a new Message with freshly allocated buffers. The allocation is performed using the provided memory reservation, which also defines the target memory type (e.g., host or device).

The resulting message contains a deep copy of the original payload, while preserving the same metadata and callbacks.

Parameters
reservation: Memory reservation to consume for the copy.
Returns
A new Message instance containing a deep copy of the payload.
Exceptions
std::invalid_argument: if the message does not support copying.

Definition at line 231 of file message.hpp.
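
The dispatch behaviour described for copy() can be sketched with a toy class: copying delegates to the stored callback and fails when none was supplied. `ToyMessage`, `ToyReservation`, and `string_copy_cb` are hypothetical names made up for this example, and the string payload is an assumption chosen for brevity; none of this is the rapidsmpf implementation.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a memory reservation (not the rapidsmpf type).
struct ToyReservation {
    std::size_t bytes;
};

// Toy message: sequence number, string payload, optional copy callback.
class ToyMessage {
  public:
    using CopyCb = std::function<ToyMessage(ToyMessage const&, ToyReservation&)>;

    ToyMessage(std::uint64_t seq, std::string payload, CopyCb cb = nullptr)
        : seq_{seq}, payload_{std::move(payload)}, copy_cb_{std::move(cb)} {}

    std::uint64_t sequence_number() const noexcept { return seq_; }
    std::string const& payload() const noexcept { return payload_; }
    CopyCb const& copy_cb() const noexcept { return copy_cb_; }

    // Delegate to the callback; without one, copying is disabled.
    ToyMessage copy(ToyReservation& reservation) const {
        if (!copy_cb_) {
            throw std::invalid_argument("message does not support copying");
        }
        return copy_cb_(*this, reservation);
    }

  private:
    std::uint64_t seq_;
    std::string payload_;
    CopyCb copy_cb_;
};

// Example callback: duplicates the payload and carries over the
// sequence number and the callback itself.
inline ToyMessage string_copy_cb(ToyMessage const& src, ToyReservation& reservation) {
    if (src.payload().size() > reservation.bytes) {
        throw std::overflow_error("reservation too small");
    }
    reservation.bytes -= src.payload().size();
    return ToyMessage(src.sequence_number(), src.payload(), src.copy_cb());
}
```

Because the callback forwards its own `copy_cb()`, a copy made this way can itself be copied again, mirroring the statement that metadata and callbacks are preserved.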

◆ copy_cb()

constexpr CopyCallback const& rapidsmpf::streaming::Message::copy_cb ( ) const
inline constexpr noexcept

Returns the copy callback associated with the message.

Returns
The message's copy callback function.

Definition at line 194 of file message.hpp.

◆ copy_cost()

constexpr size_t rapidsmpf::streaming::Message::copy_cost ( ) const
inline constexpr noexcept

Returns the total memory size required for a deep copy of the payload.

The computed size represents the total amount of memory that must be reserved to duplicate all content buffers of the message, regardless of their current memory locations. For example, if the payload's content resides in both host and device memory, the returned size is the sum of both.

Returns
Total number of bytes that must be reserved to perform a deep copy of the message's payload and content buffers.
See also
copy()

Definition at line 212 of file message.hpp.
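
The "sum over all memory locations" rule can be shown with a small model. `MemKind`, `ToyContentDescription`, and `toy_copy_cost` are hypothetical names for this sketch only; the actual `ContentDescription` layout is not shown on this page and is assumed here to be a per-memory-type size table.

```cpp
#include <array>
#include <cstddef>
#include <numeric>

// Hypothetical memory kinds, used as indices into the size table.
enum class MemKind : std::size_t { Device = 0, Host = 1 };

// Hypothetical content description: bytes held per memory kind.
struct ToyContentDescription {
    std::array<std::size_t, 2> sizes{};  // zero-initialised

    std::size_t& size(MemKind kind) {
        return sizes[static_cast<std::size_t>(kind)];
    }
};

// Total bytes to reserve for a deep copy: the sum over every memory kind,
// regardless of where each buffer currently lives.
inline std::size_t toy_copy_cost(ToyContentDescription const& cd) {
    return std::accumulate(cd.sizes.begin(), cd.sizes.end(), std::size_t{0});
}
```

For a payload with 1024 bytes on the device and 256 bytes on the host, this model reports a cost of 1280 bytes.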

◆ empty()

bool rapidsmpf::streaming::Message::empty ( ) const
inline noexcept

Returns true when no payload is stored.

Returns
true if empty, false otherwise.

Definition at line 127 of file message.hpp.

◆ get()

template<typename T >
T const& rapidsmpf::streaming::Message::get ( ) const
inline

Reference to the payload.

The returned reference remains valid until the message is released or reset.

Template Parameters
T: Payload type.
Returns
Reference to the payload.
Exceptions
std::invalid_argument: if empty or type mismatch.

Definition at line 161 of file message.hpp.

◆ holds()

template<typename T >
bool rapidsmpf::streaming::Message::holds ( ) const
inline noexcept

Compare the payload type.

Template Parameters
T: Expected payload type.
Returns
true if the payload's type matches typeid(T), false otherwise.

Definition at line 147 of file message.hpp.

◆ operator=()

Message& rapidsmpf::streaming::Message::operator= ( Message &&  other)
default noexcept

Move assign.

Parameters
other: Source message.
Returns
*this.

◆ release()

template<typename T >
T rapidsmpf::streaming::Message::release ( )
inline

Extracts the payload and resets the message.

Template Parameters
T: Payload type.
Returns
The payload.
Exceptions
std::invalid_argument: if empty or type mismatch.

Definition at line 173 of file message.hpp.
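
The holds(), get(), and release() members follow a checked type-erasure pattern. The sketch below shows one common way to get that behaviour with `std::any` (C++17). `ToyErased` is a hypothetical class written for this example; whether rapidsmpf stores its payload this way is an assumption, not something this page confirms.

```cpp
#include <any>
#include <memory>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <utility>

// Hypothetical type-erased holder (not the rapidsmpf Message class).
class ToyErased {
  public:
    ToyErased() = default;

    template <typename T>
    explicit ToyErased(std::unique_ptr<T> payload) {
        if (!payload) {
            throw std::invalid_argument("payload must not be null");
        }
        // std::any needs a copyable value, so hold the payload via shared_ptr.
        payload_ = std::shared_ptr<T>(std::move(payload));
    }

    bool empty() const noexcept { return !payload_.has_value(); }

    // True only when a payload of exactly type T is stored.
    template <typename T>
    bool holds() const noexcept {
        return payload_.type() == typeid(std::shared_ptr<T>);
    }

    // Checked access; the reference stays valid until release() or reset.
    template <typename T>
    T const& get() const {
        return *checked_ptr<T>();
    }

    // Move the payload out and leave the holder empty.
    template <typename T>
    T release() {
        std::shared_ptr<T> ptr = checked_ptr<T>();
        T out = std::move(*ptr);
        payload_.reset();
        return out;
    }

  private:
    template <typename T>
    std::shared_ptr<T> checked_ptr() const {
        if (!holds<T>()) {
            throw std::invalid_argument("empty or type mismatch");
        }
        return std::any_cast<std::shared_ptr<T>>(payload_);
    }

    std::any payload_;
};
```

An empty `std::any` reports `typeid(void)`, so `holds<T>()` is false for every `T` on an empty holder, and both `get()` and `release()` then throw, matching the "empty or type mismatch" wording above.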

◆ sequence_number()

constexpr std::uint64_t rapidsmpf::streaming::Message::sequence_number ( ) const
inline constexpr noexcept

Returns the sequence number of this message.

Returns
The sequence number.

Definition at line 136 of file message.hpp.

