context.hpp
1 
6 #pragma once
7 
8 #include <memory>
9 #include <thread>
10 
11 #include <coro/coro.hpp>
12 
13 #include <rapidsmpf/communicator/communicator.hpp>
14 #include <rapidsmpf/config.hpp>
15 #include <rapidsmpf/error.hpp>
16 #include <rapidsmpf/statistics.hpp>
17 #include <rapidsmpf/streaming/core/channel.hpp>
18 #include <rapidsmpf/streaming/core/coro_executor.hpp>
19 #include <rapidsmpf/streaming/core/memory_reserve_or_wait.hpp>
20 #include <rapidsmpf/streaming/core/queue.hpp>
21 
22 namespace rapidsmpf::streaming {
23 
41 class Context {
42  public:
55  std::shared_ptr<Communicator::Logger> logger,
56  std::shared_ptr<CoroThreadPoolExecutor> executor,
57  std::shared_ptr<BufferResource> br
58  );
59 
69  std::shared_ptr<Communicator::Logger> logger,
70  std::shared_ptr<BufferResource> br
71  );
72 
102  static std::shared_ptr<Context> from_options(
104  std::shared_ptr<Communicator::Logger> logger,
106  );
107 
108  // No copy constructor and assignment operator.
109  Context(Context const&) = delete;
110  Context& operator=(Context const&) = delete;
111 
112  // No move constructor and assignment operator.
113  Context(Context&&) = delete;
114  Context& operator=(Context&&) = delete;
115 
116  ~Context() noexcept;
117 
128  void shutdown() noexcept;
129 
135  [[nodiscard]] config::Options options() const noexcept;
136 
140  [[nodiscard]] std::shared_ptr<Communicator::Logger> const& logger() const noexcept;
141 
147  [[nodiscard]] std::shared_ptr<CoroThreadPoolExecutor> const&
148  executor() const noexcept;
149 
155  [[nodiscard]] std::shared_ptr<BufferResource> const& br() const noexcept;
156 
173  [[nodiscard]] std::shared_ptr<MemoryReserveOrWait> const& memory(
174  MemoryType mem_type
175  ) const noexcept;
176 
182  [[nodiscard]] std::shared_ptr<Statistics> statistics() const noexcept;
183 
189  [[nodiscard]] std::shared_ptr<Channel> create_channel() const noexcept;
190 
196  [[nodiscard]] std::shared_ptr<SpillableMessages> const&
197  spillable_messages() const noexcept;
198 
206  [[nodiscard]] std::shared_ptr<BoundedQueue> create_bounded_queue(
207  std::size_t buffer_size
208  ) const noexcept;
209 
219  [[nodiscard]] std::size_t uid() const noexcept;
220 
221  private:
222  std::size_t const uid_;
223  std::atomic<bool> is_shutdown_{false};
224  std::thread::id creator_thread_id_;
225  config::Options options_;
226  std::shared_ptr<Communicator::Logger> logger_;
227  std::shared_ptr<CoroThreadPoolExecutor> executor_;
228  std::shared_ptr<BufferResource> br_;
229  std::array<std::shared_ptr<MemoryReserveOrWait>, MEMORY_TYPES.size()> memory_ = {};
230  std::shared_ptr<SpillableMessages> spillable_messages_;
231  SpillManager::SpillFunctionID spill_function_id_{};
232 };
233 
234 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Abstract base class for a communication mechanism between nodes.
An RMM memory resource adaptor tailored to RapidsMPF.
std::size_t SpillFunctionID
Represents a unique identifier for a registered spill function.
Tracks statistics across rapidsmpf operations.
Definition: statistics.hpp:71
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
A bounded queue for type-erased Messages.
Definition: queue.hpp:31
A coroutine-based channel for sending and receiving messages asynchronously.
Definition: channel.hpp:52
Context for actors (coroutines) in rapidsmpf.
Definition: context.hpp:41
std::shared_ptr< SpillableMessages > const & spillable_messages() const noexcept
Returns the spillable messages collection.
std::shared_ptr< Communicator::Logger > const & logger() const noexcept
std::size_t uid() const noexcept
Return a unique identifier for this context.
std::shared_ptr< Channel > create_channel() const noexcept
Create a new channel associated with this context.
std::shared_ptr< BoundedQueue > create_bounded_queue(std::size_t buffer_size) const noexcept
Create a new bounded queue associated with this context.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< BufferResource > br)
Convenience constructor using the provided configuration options.
std::shared_ptr< CoroThreadPoolExecutor > const & executor() const noexcept
Returns the coroutine executor.
Context(config::Options options, std::shared_ptr< Communicator::Logger > logger, std::shared_ptr< CoroThreadPoolExecutor > executor, std::shared_ptr< BufferResource > br)
Full constructor for the Context.
config::Options options() const noexcept
Returns the configuration options.
std::shared_ptr< MemoryReserveOrWait > const & memory(MemoryType mem_type) const noexcept
Get the handle for memory reservations for a given memory type.
void shutdown() noexcept
Shut down the context.
std::shared_ptr< BufferResource > const & br() const noexcept
Returns the buffer resource.
std::shared_ptr< Statistics > statistics() const noexcept
Returns the statistics collector.
static std::shared_ptr< Context > from_options(RmmResourceAdaptor mr, std::shared_ptr< Communicator::Logger > logger, config::Options options)
Create a Context based on configuration options.
Executor wrapper around a coro::thread_pool used for coroutine execution.
Asynchronous coordinator for memory reservation requests.
Container for individually spillable messages.
constexpr std::array< MemoryType, 3 > MEMORY_TYPES
All memory types sorted in decreasing order of preference.
Definition: memory_type.hpp:23
MemoryType
Enum representing the type of memory sorted in decreasing order of preference.
Definition: memory_type.hpp:16