context.hpp
1 
6 #pragma once
7 
8 #include <memory>
9 
10 #include <rapidsmpf/communicator/communicator.hpp>
11 #include <rapidsmpf/config.hpp>
12 #include <rapidsmpf/error.hpp>
13 #include <rapidsmpf/progress_thread.hpp>
14 #include <rapidsmpf/statistics.hpp>
15 #include <rapidsmpf/streaming/core/channel.hpp>
16 #include <rapidsmpf/streaming/core/queue.hpp>
17 
18 #include <coro/coro.hpp>
19 
20 namespace rapidsmpf::streaming {
21 
/**
 * @brief Context for nodes (coroutines) in rapidsmpf.
 *
 * Bundles the objects declared as members below: configuration options, a
 * communicator, a progress thread, a coroutine thread pool, a buffer
 * resource, a statistics collector and a collection of spillable messages.
 * It also declares factory methods for channels and bounded queues.
 *
 * NOTE(review): every member function except `spillable_messages()` is only
 * declared here; the definitions live outside this listing.
 */
25 class Context {
26  public:
    /**
     * @brief Full constructor for the Context.
     *
     * NOTE(review): the line that opens this declaration (presumably
     * `Context(`) is absent from this listing -- the embedded numbering jumps
     * from 26 to 40. Confirm against the real header.
     *
     * @param options Configuration options.
     * @param comm Communicator (shared ownership).
     * @param progress_thread Progress thread (shared ownership).
     * @param executor Coroutine thread pool; passed as `std::unique_ptr`, so
     * ownership is transferred by the caller.
     * @param br Buffer resource (shared ownership).
     * @param statistics Statistics collector (shared ownership).
     */
40  config::Options options,
41  std::shared_ptr<Communicator> comm,
42  std::shared_ptr<ProgressThread> progress_thread,
43  std::unique_ptr<coro::thread_pool> executor,
44  std::shared_ptr<BufferResource> br,
45  std::shared_ptr<Statistics> statistics
46  );
47 
    /**
     * @brief Convenience constructor with minimal configuration.
     *
     * Takes no progress thread and no executor; how those two members are
     * obtained is not visible in this listing.
     *
     * NOTE(review): as with the full constructor, the opening line of this
     * declaration (presumably `Context(`) is absent from this listing.
     *
     * @param options Configuration options.
     * @param comm Communicator (shared ownership).
     * @param br Buffer resource (shared ownership).
     * @param statistics Statistics collector; defaults to
     * `Statistics::disabled()`.
     */
59  config::Options options,
60  std::shared_ptr<Communicator> comm,
61  std::shared_ptr<BufferResource> br,
62  std::shared_ptr<Statistics> statistics = Statistics::disabled()
63  );
64 
    /**
     * @brief Destructor, declared `noexcept` and defined out of line.
     *
     * NOTE(review): presumably releases whatever `spill_function_id_`
     * refers to -- TODO confirm in the implementation file.
     */
65  ~Context() noexcept;
66 
    /**
     * @brief Returns the configuration options.
     *
     * @return The options, returned by value.
     */
72  [[nodiscard]] config::Options get_options() const noexcept;
73 
    /**
     * @brief Returns the communicator.
     *
     * @return Shared pointer to the communicator.
     */
79  [[nodiscard]] std::shared_ptr<Communicator> comm() const noexcept;
80 
    /**
     * @brief Returns the progress thread.
     *
     * @return Shared pointer to the progress thread.
     */
86  [[nodiscard]] std::shared_ptr<ProgressThread> progress_thread() const noexcept;
87 
    /**
     * @brief Returns the coroutine thread pool.
     *
     * NOTE(review): this hands out a mutable reference to a `std::unique_ptr`
     * (presumably `executor_`), so a caller could reset or move from it.
     * Confirm that is intended; it is also the only non-const accessor.
     *
     * @return Reference to the unique pointer holding the thread pool.
     */
93  [[nodiscard]] std::unique_ptr<coro::thread_pool>& executor() noexcept;
94 
    /**
     * @brief Returns the buffer resource.
     *
     * Unlike the other accessors this returns a raw pointer rather than a
     * `std::shared_ptr`, even though the member `br_` is a shared pointer.
     *
     * @return Raw pointer to the buffer resource.
     */
100  [[nodiscard]] BufferResource* br() const noexcept;
101 
    /**
     * @brief Returns the statistics collector.
     *
     * @return Shared pointer to the statistics collector.
     */
107  [[nodiscard]] std::shared_ptr<Statistics> statistics() const noexcept;
108 
    /**
     * @brief Create a new channel associated with this context.
     *
     * @return Shared pointer to the new channel.
     */
114  [[nodiscard]] std::shared_ptr<Channel> create_channel() const noexcept;
115 
    /**
     * @brief Returns the spillable messages collection.
     *
     * Defined inline; returns a copy of the `spillable_messages_` shared
     * pointer.
     *
     * NOTE(review): this is the only accessor not declared `noexcept`.
     *
     * @return Shared pointer to the spillable messages collection.
     */
121  [[nodiscard]] std::shared_ptr<SpillableMessages> spillable_messages() const {
122  return spillable_messages_;
123  }
124 
    /**
     * @brief Create a new bounded queue associated with this context.
     *
     * @param buffer_size Size passed to the new queue (presumably its
     * capacity -- TODO confirm against `BoundedQueue`).
     * @return Shared pointer to the new bounded queue.
     */
132  [[nodiscard]] std::shared_ptr<BoundedQueue> create_bounded_queue(
133  std::size_t buffer_size
134  ) const noexcept;
135 
136  private:
    // Data members. Their declaration order fixes the order of
    // initialization (and the reverse order of destruction), so do not
    // reorder them without checking the constructors and destructor.
137  config::Options options_;  ///< Configuration options.
138  std::shared_ptr<Communicator> comm_;  ///< Communicator.
139  std::shared_ptr<ProgressThread> progress_thread_;  ///< Progress thread.
140  std::unique_ptr<coro::thread_pool> executor_;  ///< Coroutine thread pool (uniquely owned).
141  std::shared_ptr<BufferResource> br_;  ///< Buffer resource.
142  std::shared_ptr<Statistics> statistics_;  ///< Statistics collector.
143  std::shared_ptr<SpillableMessages> spillable_messages_;  ///< Spillable messages collection.
    // Identifier of a registered spill function, value-initialized here.
    // Where it is registered and released is not visible in this listing.
144  SpillManager::SpillFunctionID spill_function_id_{};
145 };
146 
147 } // namespace rapidsmpf::streaming
Class managing buffer resources.
Definition: resource.hpp:133
Abstract base class for a communication mechanism between nodes.
A progress thread that can execute arbitrary functions.
std::size_t SpillFunctionID
Represents a unique identifier for a registered spill function.
Track statistics across rapidsmpf operations.
Definition: statistics.hpp:23
static std::shared_ptr< Statistics > disabled()
Returns a shared pointer to a disabled (no-op) Statistics instance.
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:124
A coroutine-based channel for sending and receiving messages asynchronously.
Definition: channel.hpp:37
Context for nodes (coroutines) in rapidsmpf.
Definition: context.hpp:25
std::shared_ptr< Channel > create_channel() const noexcept
Create a new channel associated with this context.
std::shared_ptr< BoundedQueue > create_bounded_queue(std::size_t buffer_size) const noexcept
Create a new bounded queue associated with this context.
Context(config::Options options, std::shared_ptr< Communicator > comm, std::shared_ptr< ProgressThread > progress_thread, std::unique_ptr< coro::thread_pool > executor, std::shared_ptr< BufferResource > br, std::shared_ptr< Statistics > statistics)
Full constructor for the Context.
std::shared_ptr< SpillableMessages > spillable_messages() const
Returns the spillable messages collection.
Definition: context.hpp:121
std::unique_ptr< coro::thread_pool > & executor() noexcept
Returns the coroutine thread pool.
config::Options get_options() const noexcept
Returns the configuration options.
std::shared_ptr< ProgressThread > progress_thread() const noexcept
Returns the progress thread.
Context(config::Options options, std::shared_ptr< Communicator > comm, std::shared_ptr< BufferResource > br, std::shared_ptr< Statistics > statistics=Statistics::disabled())
Convenience constructor with minimal configuration.
BufferResource * br() const noexcept
Returns the buffer resource.
std::shared_ptr< Communicator > comm() const noexcept
Returns the communicator.
std::shared_ptr< Statistics > statistics() const noexcept
Returns the statistics collector.
Container for individually spillable messages.