11 #include <coro/coro.hpp>
13 #include <rapidsmpf/communicator/communicator.hpp>
14 #include <rapidsmpf/config.hpp>
15 #include <rapidsmpf/error.hpp>
16 #include <rapidsmpf/statistics.hpp>
17 #include <rapidsmpf/streaming/core/channel.hpp>
18 #include <rapidsmpf/streaming/core/coro_executor.hpp>
19 #include <rapidsmpf/streaming/core/memory_reserve_or_wait.hpp>
20 #include <rapidsmpf/streaming/core/queue.hpp>
55 std::shared_ptr<Communicator::Logger>
logger,
56 std::shared_ptr<CoroThreadPoolExecutor>
executor,
57 std::shared_ptr<BufferResource>
br
69 std::shared_ptr<Communicator::Logger>
logger,
70 std::shared_ptr<BufferResource>
br
105 std::shared_ptr<Communicator::Logger>
logger,
136 [[nodiscard]] config::Options
options() const noexcept;
208 std::
size_t buffer_size
220 [[nodiscard]] std::
size_t uid() const noexcept;
223 std::
size_t const uid_;
224 std::atomic<
bool> is_shutdown_{
false};
225 std::thread::id creator_thread_id_;
227 std::shared_ptr<Communicator::Logger> logger_;
228 std::shared_ptr<CoroThreadPoolExecutor> executor_;
229 std::shared_ptr<BufferResource> br_;
230 std::array<std::shared_ptr<MemoryReserveOrWait>,
MEMORY_TYPES.size()> memory_ = {};
231 std::shared_ptr<SpillableMessages> spillable_messages_;
Class managing buffer resources.
Abstract base class for a communication mechanism between nodes.
An RMM memory resource adaptor tailored to RapidsMPF.
std::size_t SpillFunctionID
Represents a unique identifier for a registered spill function.
Tracks statistics across rapidsmpf operations.
Manages configuration options for RapidsMPF operations.
A bounded queue for type-erased Messages.
A coroutine-based channel for sending and receiving messages asynchronously.
Context for actors (coroutines) in rapidsmpf.
std::shared_ptr< SpillableMessages > const & spillable_messages() const noexcept
Returns the spillable messages collection.
std::shared_ptr< Communicator::Logger > const & logger() const noexcept
std::size_t uid() const noexcept
Return a unique identifier for this context.
std::shared_ptr< Channel > create_channel() const noexcept
Create a new channel associated with this context.
std::shared_ptr< BoundedQueue > create_bounded_queue(std::size_t buffer_size) const noexcept
Create a new bounded queue associated with this context.
static std::shared_ptr< Context > from_options(RmmResourceAdaptor *mr, std::shared_ptr< Communicator::Logger > logger, config::Options options)
Create a Context based on configuration options.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< BufferResource > br)
Convenience constructor using the provided configuration options.
std::shared_ptr< CoroThreadPoolExecutor > const & executor() const noexcept
Returns the coroutine executor.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Full constructor for the Context.
config::Options options() const noexcept
Returns the configuration options.
std::shared_ptr< MemoryReserveOrWait > const & memory(MemoryType mem_type) const noexcept
Get the handle for memory reservations for a given memory type.
void shutdown() noexcept
Shut down the context.
std::shared_ptr< BufferResource > const & br() const noexcept
Returns the buffer resource.
std::shared_ptr< Statistics > statistics() const noexcept
Returns the statistics collector.
Executor wrapper around a coro::thread_pool used for coroutine execution.
Asynchronous coordinator for memory reservation requests.
Container for individually spillable messages.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.