context.hpp
1 
6 #pragma once
7 
8 #include <memory>
9 #include <thread>
10 
11 #include <coro/coro.hpp>
12 
13 #include <rapidsmpf/communicator/communicator.hpp>
14 #include <rapidsmpf/config.hpp>
15 #include <rapidsmpf/error.hpp>
16 #include <rapidsmpf/statistics.hpp>
17 #include <rapidsmpf/streaming/core/channel.hpp>
18 #include <rapidsmpf/streaming/core/coro_executor.hpp>
19 #include <rapidsmpf/streaming/core/memory_reserve_or_wait.hpp>
20 #include <rapidsmpf/streaming/core/queue.hpp>
21 
22 namespace rapidsmpf::streaming {
23 
41 class Context {
42  public:
55  std::shared_ptr<Communicator::Logger> logger,
56  std::shared_ptr<CoroThreadPoolExecutor> executor,
57  std::shared_ptr<BufferResource> br
58  );
59 
69  std::shared_ptr<Communicator::Logger> logger,
70  std::shared_ptr<BufferResource> br
71  );
72 
103  static std::shared_ptr<Context> from_options(
104  RmmResourceAdaptor* mr,
105  std::shared_ptr<Communicator::Logger> logger,
107  );
108 
109  // No copy constructor and assignment operator.
110  Context(Context const&) = delete;
111  Context& operator=(Context const&) = delete;
112 
113  // No move constructor and assignment operator.
114  Context(Context&&) = delete;
115  Context& operator=(Context&&) = delete;
116 
117  ~Context() noexcept;
118 
129  void shutdown() noexcept;
130 
136  [[nodiscard]] config::Options options() const noexcept;
137 
141  [[nodiscard]] std::shared_ptr<Communicator::Logger> const& logger() const noexcept;
142 
148  [[nodiscard]] std::shared_ptr<CoroThreadPoolExecutor> const&
149  executor() const noexcept;
150 
156  [[nodiscard]] std::shared_ptr<BufferResource> const& br() const noexcept;
157 
174  [[nodiscard]] std::shared_ptr<MemoryReserveOrWait> const& memory(
175  MemoryType mem_type
176  ) const noexcept;
177 
183  [[nodiscard]] std::shared_ptr<Statistics> statistics() const noexcept;
184 
190  [[nodiscard]] std::shared_ptr<Channel> create_channel() const noexcept;
191 
197  [[nodiscard]] std::shared_ptr<SpillableMessages> const&
198  spillable_messages() const noexcept;
199 
207  [[nodiscard]] std::shared_ptr<BoundedQueue> create_bounded_queue(
208  std::size_t buffer_size
209  ) const noexcept;
210 
220  [[nodiscard]] std::size_t uid() const noexcept;
221 
222  private:
223  std::size_t const uid_;
224  std::atomic<bool> is_shutdown_{false};
225  std::thread::id creator_thread_id_;
226  config::Options options_;
227  std::shared_ptr<Communicator::Logger> logger_;
228  std::shared_ptr<CoroThreadPoolExecutor> executor_;
229  std::shared_ptr<BufferResource> br_;
230  std::array<std::shared_ptr<MemoryReserveOrWait>, MEMORY_TYPES.size()> memory_ = {};
231  std::shared_ptr<SpillableMessages> spillable_messages_;
232  SpillManager::SpillFunctionID spill_function_id_{};
233 };
234 
235 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Abstract base class for a communication mechanism between nodes.
A RMM memory resource adaptor tailored to RapidsMPF.
std::size_t SpillFunctionID
Represents a unique identifier for a registered spill function.
Tracks statistics across rapidsmpf operations.
Definition: statistics.hpp:62
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
A bounded queue for type-erased Messages.
Definition: queue.hpp:31
A coroutine-based channel for sending and receiving messages asynchronously.
Definition: channel.hpp:52
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
std::shared_ptr< SpillableMessages > const & spillable_messages() const noexcept
Returns the spillable messages collection.
std::shared_ptr< Communicator::Logger > const & logger() const noexcept
std::size_t uid() const noexcept
Return a unique identifier for this context.
std::shared_ptr< Channel > create_channel() const noexcept
Create a new channel associated with this context.
std::shared_ptr< BoundedQueue > create_bounded_queue(std::size_t buffer_size) const noexcept
Create a new bounded queue associated with this context.
static std::shared_ptr< Context > from_options(RmmResourceAdaptor *mr, std::shared_ptr< Communicator::Logger > logger, config::Options options)
Create a Context based on configuration options.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< BufferResource > br)
Convenience constructor using the provided configuration options.
std::shared_ptr< CoroThreadPoolExecutor > const & executor() const noexcept
Returns the coroutine executor.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Full constructor for the Context.
config::Options options() const noexcept
Returns the configuration options.
std::shared_ptr< MemoryReserveOrWait > const & memory(MemoryType mem_type) const noexcept
Get the handle for memory reservations for a given memory type.
void shutdown() noexcept
Shut down the context.
std::shared_ptr< BufferResource > const & br() const noexcept
Returns the buffer resource.
std::shared_ptr< Statistics > statistics() const noexcept
Returns the statistics collector.
Executor wrapper around a coro::thread_pool used for coroutine execution.
Asynchronous coordinator for memory reservation requests.
Container for individually spillable messages.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
Definition: memory_type.hpp:23
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16