Container for individually spillable messages.
#include <spillable_messages.hpp>
Public Types | |
| using | MessageId = std::uint64_t |
| Unique identifier assigned to each message. | |
Public Member Functions | |
| SpillableMessages (SpillableMessages const &)=delete | |
| SpillableMessages & | operator= (SpillableMessages const &)=delete |
| SpillableMessages (SpillableMessages &&) noexcept=delete | |
| SpillableMessages & | operator= (SpillableMessages &&) noexcept=delete |
| MessageId | insert (Message &&message) |
| Insert a new message and return its assigned ID. | |
| Message | extract (MessageId mid) |
| Extract and remove a message by ID. | |
| Message | copy (MessageId mid, MemoryReservation &reservation) |
| Create a deep copy of a message without removing it. | |
| std::size_t | spill (MessageId mid, BufferResource *br) const |
| Spill a message's device memory to host memory. | |
| std::map< MessageId, ContentDescription > | get_content_descriptions () const |
| Get a snapshot of current messages' content descriptions. | |
| ContentDescription | get_content_description (MessageId mid) const |
| Get the content description of a message by ID. | |
Container for individually spillable messages.
SpillableMessages manages a collection of Message instances that can be spilled or extracted independently. Each message is assigned a unique MessageId upon insertion, which can later be used to extract or spill that message.
The container is thread-safe for concurrent insertions, extractions, and spills.
Definition at line 39 of file spillable_messages.hpp.
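The insert/extract contract described above can be illustrated with a small stand-in. The sketch below is not the rapidsmpf implementation: `ToyMessage`, `ToyMessages`, and `second_extract_throws` are hypothetical names, a single mutex is assumed to be enough to serialize access, and throwing `std::out_of_range` from `extract` is an assumption borrowed from the exception documented for copy().

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a message payload; not the rapidsmpf Message type.
struct ToyMessage {
    std::string payload;
};

// Toy container that hands out a unique ID per inserted message.
class ToyMessages {
  public:
    using MessageId = std::uint64_t;

    // Store the message under a fresh ID and return that ID.
    MessageId insert(ToyMessage&& message) {
        std::lock_guard<std::mutex> lock(mutex_);
        MessageId const mid = next_id_++;
        messages_.emplace(mid, std::move(message));
        return mid;
    }

    // Remove and return the message; throw if the ID is unknown.
    // (Assumption: the real extract() may report this differently.)
    ToyMessage extract(MessageId mid) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = messages_.find(mid);
        if (it == messages_.end()) {
            throw std::out_of_range("unknown or already extracted message");
        }
        ToyMessage ret = std::move(it->second);
        messages_.erase(it);
        return ret;
    }

  private:
    std::mutex mutex_;
    MessageId next_id_{0};
    std::map<MessageId, ToyMessage> messages_;
};

// Returns true if extracting the same ID twice throws std::out_of_range.
bool second_extract_throws() {
    ToyMessages box;
    auto const a = box.insert(ToyMessage{"first"});
    auto const b = box.insert(ToyMessage{"second"});
    assert(a != b);  // every insertion gets its own ID
    assert(box.extract(a).payload == "first");
    try {
        box.extract(a);  // the message is gone after the first extract
    } catch (std::out_of_range const&) {
        return true;
    }
    return false;
}
```

Calling `second_extract_throws()` exercises the two properties the description relies on: IDs are unique per insertion, and an extracted message is no longer reachable through its ID.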
| Message rapidsmpf::streaming::SpillableMessages::copy | ( | MessageId | mid, |
| MemoryReservation & | reservation | ||
| ) |
Create a deep copy of a message without removing it.
This method duplicates the message identified by mid while leaving the original message intact inside the container. The returned message is a full deep copy of the payload. If the message is currently being spilled by another thread, this call waits until spilling completes.
| mid | Message identifier. |
| reservation | Memory reservation used for allocating buffers during the deep copy. The reservation also determines the memory type of the returned message. |
Returns a deep copy of the Message.
| std::out_of_range | If the message has already been extracted or the message identifier is invalid. |
| std::runtime_error | If required memory cannot be allocated using the provided reservation. |
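As a rough illustration of these semantics, the following single-threaded sketch models a copy that leaves the original in place, takes its memory type from the reservation, and throws the two documented exception types. All types here (`ToyMemoryType`, `ToyMessage`, `ToyReservation`, `ToyMessages`) are hypothetical stand-ins, not the rapidsmpf classes, and the byte accounting on the reservation is an assumption made for the example. Waiting for an in-progress spill is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

enum class ToyMemoryType { Device, Host };

// Hypothetical message: a payload plus the memory type it lives in.
struct ToyMessage {
    std::string payload;
    ToyMemoryType mem_type;
};

// Hypothetical reservation: a memory type and a byte budget.
struct ToyReservation {
    ToyMemoryType mem_type;
    std::size_t bytes;
};

class ToyMessages {
  public:
    using MessageId = std::uint64_t;

    MessageId insert(ToyMessage message) {
        MessageId const mid = next_id_++;
        messages_.emplace(mid, std::move(message));
        return mid;
    }

    // Deep-copy the payload without removing the original.
    ToyMessage copy(MessageId mid, ToyReservation& reservation) {
        auto it = messages_.find(mid);
        if (it == messages_.end()) {
            throw std::out_of_range("unknown or already extracted message");
        }
        std::size_t const needed = it->second.payload.size();
        if (reservation.bytes < needed) {
            throw std::runtime_error("reservation too small for deep copy");
        }
        reservation.bytes -= needed;  // consume part of the reservation
        // The copy takes its memory type from the reservation.
        return ToyMessage{it->second.payload, reservation.mem_type};
    }

    bool contains(MessageId mid) const { return messages_.count(mid) != 0; }

  private:
    MessageId next_id_{0};
    std::map<MessageId, ToyMessage> messages_;
};

// Returns true if the copy lands in host memory and the original stays put.
bool copy_keeps_original() {
    ToyMessages box;
    auto const mid = box.insert(ToyMessage{"abcd", ToyMemoryType::Device});
    ToyReservation host{ToyMemoryType::Host, 16};
    ToyMessage const dup = box.copy(mid, host);
    return box.contains(mid) && dup.payload == "abcd"
           && dup.mem_type == ToyMemoryType::Host && host.bytes == 12;
}

// Returns true if copying an unknown ID throws std::out_of_range.
bool copy_of_missing_id_throws() {
    ToyMessages box;
    ToyReservation host{ToyMemoryType::Host, 16};
    try {
        box.copy(42, host);
    } catch (std::out_of_range const&) {
        return true;
    }
    return false;
}
```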
| ContentDescription rapidsmpf::streaming::SpillableMessages::get_content_description | ( | MessageId | mid | ) | const |
Get the content description of a message by ID.
| mid | Message identifier. |
| std::out_of_range | If the message does not exist. |
| std::map<MessageId, ContentDescription> rapidsmpf::streaming::SpillableMessages::get_content_descriptions | ( | ) | const |
Get a snapshot of current messages' content descriptions.
The returned map may become outdated immediately if other threads modify the container after this call.
Use this snapshot to decide which messages to spill, but keep in mind that the information may no longer be accurate when the actual spill occurs. The size returned by spill() reflects what was actually spilled, so rely on that value rather than on the snapshot.
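Returns a map from MessageId to ContentDescription.
| MessageId rapidsmpf::streaming::SpillableMessages::insert | ( | Message && | message | ) |
Insert a new message and return its assigned ID.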
| message | Message to insert. |
Returns the MessageId of the inserted message.
| std::size_t rapidsmpf::streaming::SpillableMessages::spill | ( | MessageId | mid, |
| BufferResource * | br | ||
| ) | const |
Spill a message's device memory to host memory.
Performs an in-place deep copy of the message's payload from device to host memory using the specified buffer resource.
If the message is currently being accessed by another thread, is already spilled, is not spillable, or does not exist, the operation returns immediately without spilling.
| mid | Message identifier. If the message does not exist, zero is returned. |
| br | Buffer resource used for allocations during the spill operation. |
| std::runtime_error | If there is insufficient host memory to reserve. |
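A possible way to combine get_content_descriptions() and spill() is to walk a snapshot and sum the sizes that spill() actually returns, since the snapshot may already be stale. The sketch below only models that pattern: `ToyDescription`, `ToyMessages`, `spill_at_least`, and `demo_stale_snapshot` are hypothetical, the fields of the real ContentDescription are not assumed, the toy `spill` takes no buffer resource, and returning zero for an already spilled or non-spillable message is an assumption (the documentation above only states it for a missing message).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical description: bytes held in device memory and a spillable flag.
struct ToyDescription {
    std::size_t device_bytes;
    bool spillable;
};

class ToyMessages {
  public:
    using MessageId = std::uint64_t;

    MessageId insert(ToyDescription desc) {
        MessageId const mid = next_id_++;
        messages_.emplace(mid, desc);
        return mid;
    }

    // Stands in for another thread extracting the message.
    void erase(MessageId mid) { messages_.erase(mid); }

    // Return a copy of the current state; it can go stale right away.
    std::map<MessageId, ToyDescription> get_content_descriptions() const {
        return messages_;
    }

    // Return the number of bytes moved off the device, or zero if nothing
    // could be spilled (missing, not spillable, or already spilled).
    std::size_t spill(MessageId mid) {
        auto it = messages_.find(mid);
        if (it == messages_.end() || !it->second.spillable
            || it->second.device_bytes == 0) {
            return 0;
        }
        std::size_t const spilled = it->second.device_bytes;
        it->second.device_bytes = 0;
        return spilled;
    }

  private:
    MessageId next_id_{0};
    std::map<MessageId, ToyDescription> messages_;
};

// Spill candidates from the snapshot until `target` bytes are freed or the
// snapshot is exhausted; count only what spill() reports.
std::size_t spill_at_least(
    ToyMessages& box,
    std::map<ToyMessages::MessageId, ToyDescription> const& snapshot,
    std::size_t target
) {
    std::size_t total = 0;
    for (auto const& entry : snapshot) {
        if (total >= target) break;
        if (!entry.second.spillable || entry.second.device_bytes == 0) continue;
        total += box.spill(entry.first);
    }
    return total;
}

// Take a snapshot, remove one message behind its back, then spill.
std::size_t demo_stale_snapshot() {
    ToyMessages box;
    auto const a = box.insert({100, true});
    box.insert({50, false});
    box.insert({30, true});
    auto const snapshot = box.get_content_descriptions();
    box.erase(a);  // the snapshot still lists `a` with 100 bytes
    return spill_at_least(box, snapshot, 120);
}
```

In `demo_stale_snapshot()` the snapshot promises 130 spillable bytes, but only 30 are actually spilled because one message disappeared after the snapshot was taken. Summing the return values keeps the accounting correct in that case.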