coro_executor.hpp
1 
6 #pragma once
#include <atomic>
#include <cstdint>
7 #include <memory>
8 #include <thread>
9 
10 #include <coro/coro.hpp>
11 
12 #include <rapidsmpf/config.hpp>
13 #include <rapidsmpf/error.hpp>
14 #include <rapidsmpf/statistics.hpp>
15 
16 namespace rapidsmpf::streaming {
17 
37  public:
49  std::uint32_t num_streaming_threads,
50  std::shared_ptr<Statistics> statistics = Statistics::disabled()
51  );
52 
69  config::Options options,
70  std::shared_ptr<Statistics> statistics = Statistics::disabled()
71  );
72 
/// @brief Destructor. Declared `noexcept` and defined out of line.
///
/// NOTE(review): presumably shuts the underlying thread pool down if that has
/// not happened yet -- confirm against the implementation file.
73  ~CoroThreadPoolExecutor() noexcept;
74 
/// Assignment is disabled: both the copy- and the move-assignment operator are
/// deleted, so an executor can never be assigned to.
///
/// The copy form takes its argument by reference-to-const (the canonical
/// signature), so assigning from a const executor is diagnosed as a use of a
/// deleted function rather than as a missing overload.
78  CoroThreadPoolExecutor& operator=(const CoroThreadPoolExecutor& o) = delete;
79  CoroThreadPoolExecutor& operator=(CoroThreadPoolExecutor&& o) = delete;
80 
/// @brief Shut down the underlying thread pool.
///
/// Declared `noexcept` and defined out of line.
/// NOTE(review): the class keeps an atomic `is_shutdown_` flag, which suggests
/// repeated calls are guarded -- confirm against the implementation file.
91  void shutdown() noexcept;
92 
/// @brief Get the configured number of streaming threads.
///
/// @return The thread count reported by the underlying `coro::thread_pool`.
98  [[nodiscard]] std::uint32_t num_streaming_threads() const noexcept {
99  return executor_->thread_count();
100  }
101 
/// @brief Get access to the underlying thread pool to be used with libcoro.
///
/// Declared here and defined out of line.
///
/// @return A non-const reference to the `std::unique_ptr` that holds the
/// `coro::thread_pool`.
109  [[nodiscard]] std::unique_ptr<coro::thread_pool>& get() noexcept;
110 
/// @brief Schedule work on the underlying libcoro thread pool.
///
/// Thin forwarder to the pool's zero-argument `schedule()`.
///
/// @return Whatever the pool's `schedule()` returns (the type is deduced).
/// NOTE(review): presumably an awaitable to be `co_await`ed -- see the libcoro
/// documentation.
116  [[nodiscard]] auto schedule() {
117  return executor_->schedule();
118  }
119 
/// @brief Schedule a task on the underlying libcoro thread pool.
///
/// @param task The task to schedule. Taken by value and moved into the pool's
/// `schedule(task)` overload.
/// @return Whatever the pool's `schedule(task)` returns (the type is deduced).
126  [[nodiscard]] auto schedule(auto task) {
127  return executor_->schedule(std::move(task));
128  }
129 
/// @brief Yield execution back to the underlying libcoro thread pool.
///
/// Thin forwarder to the pool's `yield()`.
///
/// @return Whatever the pool's `yield()` returns (the type is deduced).
135  [[nodiscard]] auto yield() {
136  return executor_->yield();
137  }
138 
/// @brief Spawn a detached task on the underlying libcoro thread pool.
///
/// Declared `noexcept`, so an exception escaping the underlying call
/// terminates the program instead of propagating to the caller.
///
/// @param task The task to spawn. Taken by value and moved into the pool's
/// `spawn_detached(task)`.
/// @return Whatever the pool's `spawn_detached(task)` returns (the type is
/// deduced).
145  auto spawn_detached(auto task) noexcept {
146  return executor_->spawn_detached(std::move(task));
147  }
148 
/// @brief Spawn a joinable task on the underlying libcoro thread pool.
///
/// Declared `noexcept`, so an exception escaping the underlying call
/// terminates the program instead of propagating to the caller.
///
/// @param task The task to spawn. Taken by value and moved into the pool's
/// `spawn_joinable(task)`.
/// @return Whatever the pool's `spawn_joinable(task)` returns (the type is
/// deduced). The return value is not marked `[[nodiscard]]` here.
155  auto spawn_joinable(auto task) noexcept {
156  return executor_->spawn_joinable(std::move(task));
157  }
158 
159  private:
/// Atomic shutdown flag, initially false.
/// NOTE(review): presumably set by shutdown() -- confirm in the implementation.
160  std::atomic<bool> is_shutdown_{false};
/// The libcoro thread pool that the inline member functions of this class
/// delegate to.
161  std::unique_ptr<coro::thread_pool> executor_;
/// Statistics instance held by the executor.
/// NOTE(review): presumably the one passed to the constructor -- confirm.
162  std::shared_ptr<Statistics> statistics_;
/// A thread id.
/// NOTE(review): by its name, the id of the thread that created the executor;
/// confirm where it is set and how it is used.
163  std::thread::id creator_thread_id_;
164 };
165 
166 } // namespace rapidsmpf::streaming
static std::shared_ptr< Statistics > disabled()
Returns a shared pointer to a disabled (no-op) Statistics instance.
Manages configuration options for RapidsMPF operations.
Definition: config.hpp:140
Executor wrapper around a coro::thread_pool used for coroutine execution.
std::uint32_t num_streaming_threads() const noexcept
Get the configured number of streaming threads.
CoroThreadPoolExecutor(std::uint32_t num_streaming_threads, std::shared_ptr< Statistics > statistics=Statistics::disabled())
Construct an executor with an explicit number of streaming threads.
CoroThreadPoolExecutor(config::Options options, std::shared_ptr< Statistics > statistics=Statistics::disabled())
Construct an executor from configuration options.
auto yield()
Yield execution back to the underlying libcoro thread pool.
std::unique_ptr< coro::thread_pool > & get() noexcept
Get access to the underlying thread pool to be used with libcoro.
auto schedule(auto task)
Schedule a task on the underlying libcoro thread pool.
void shutdown() noexcept
Shut down the underlying thread pool.
auto spawn_joinable(auto task) noexcept
Spawn a joinable task on the underlying libcoro thread pool.
auto schedule()
Schedule work on the underlying libcoro thread pool.
auto spawn_detached(auto task) noexcept
Spawn a detached task on the underlying libcoro thread pool.