All Classes Namespaces Functions Variables Typedefs Enumerations Friends
delayed_submission.h
1 
5 #pragma once
6 
7 #include <deque>
8 #include <functional>
9 #include <memory>
10 #include <mutex>
11 #include <optional>
12 #include <set>
13 #include <stdexcept>
14 #include <string>
15 #include <utility>
16 #include <variant>
17 
18 #include <ucp/api/ucp.h>
19 #include <ucs/memory/memory_type.h>
20 
21 #include <ucxx/log.h>
22 #include <ucxx/request_data.h>
23 
24 namespace ucxx {
25 
32 typedef std::function<void()> DelayedSubmissionCallbackType;
33 
34 typedef uint64_t ItemIdType;
35 
42 template <typename T>
44  protected:
45  std::string _name{"undefined"};
46  bool _enabled{true};
47  ItemIdType _itemId{0};
48  std::optional<ItemIdType> _processing{
49  std::nullopt};
50  std::deque<std::pair<ItemIdType, T>> _collection{};
51  std::set<ItemIdType> _canceled{};
52  std::mutex _mutex{};
53 
62  virtual void scheduleLog(ItemIdType id, T item) = 0;
63 
73  virtual void processItem(ItemIdType id, T item) = 0;
74 
75  public:
90  explicit BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
91  : _name{name}, _enabled{enabled}
92  {
93  }
94 
100 
117  [[nodiscard]] virtual ItemIdType schedule(T item)
118  {
119  if (!_enabled) throw std::runtime_error("Resource is disabled.");
120 
121  ItemIdType id;
122  {
123  std::lock_guard<std::mutex> lock(_mutex);
124  id = _itemId++;
125  _collection.emplace_back(id, item);
126  }
127  scheduleLog(id, item);
128 
129  return id;
130  }
131 
138  void process()
139  {
140  // Process only those that were already inserted to prevent from never
141  // returning if `_collection` grows indefinitely.
142  size_t toProcess = 0;
143  {
144  std::lock_guard<std::mutex> lock(_mutex);
145  toProcess = _collection.size();
146  }
147 
148  for (auto i = 0; i < toProcess; ++i) {
149  std::pair<ItemIdType, T> item;
150  {
151  std::lock_guard<std::mutex> lock(_mutex);
152  item = std::move(_collection.front());
153  _collection.pop_front();
154  if (_canceled.erase(item.first)) continue;
155  _processing = std::optional<ItemIdType>{item.first};
156  }
157 
158  processItem(item.first, item.second);
159  }
160 
161  {
162  // Clear the value of `_processing` as no more requests will be processed.
163  std::lock_guard<std::mutex> lock(_mutex);
164  _processing = std::nullopt;
165  }
166  }
167 
179  void cancel(ItemIdType id)
180  {
181  std::lock_guard<std::mutex> lock(_mutex);
182  if (_processing.has_value() && _processing.value() == id)
183  throw std::runtime_error("Cannot cancel, item is being processed.");
184 
185  _canceled.insert(id);
186  ucxx_trace_req("Canceled item: %lu", id);
187  }
188 };
189 
198  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType>> {
199  protected:
201  ItemIdType id,
202  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
203 
205  ItemIdType id,
206  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
207 
208  public:
219  explicit RequestDelayedSubmissionCollection(const std::string name, const bool enabled);
220 };
221 
229  : public BaseDelayedSubmissionCollection<DelayedSubmissionCallbackType> {
230  protected:
231  void scheduleLog(ItemIdType id, DelayedSubmissionCallbackType item) override;
232 
233  void processItem(ItemIdType id, DelayedSubmissionCallbackType callback) override;
234 
235  public:
245  explicit GenericDelayedSubmissionCollection(const std::string name);
246 };
247 
255  private:
257  "generic pre"};
259  "generic post"};
261  "request", false};
262  bool _enableDelayedRequestSubmission{false};
263 
264  public:
276  explicit DelayedSubmissionCollection(bool enableDelayedRequestSubmission = false);
277 
278  DelayedSubmissionCollection() = delete;
280  DelayedSubmissionCollection& operator=(DelayedSubmissionCollection const&) = delete;
283 
298  void processPre();
299 
306  void processPost();
307 
321  void registerRequest(std::shared_ptr<Request> request, DelayedSubmissionCallbackType callback);
322 
333  [[nodiscard]] ItemIdType registerGenericPre(DelayedSubmissionCallbackType callback);
334 
345  [[nodiscard]] ItemIdType registerGenericPost(DelayedSubmissionCallbackType callback);
346 
358  void cancelGenericPre(ItemIdType id);
359 
371  void cancelGenericPost(ItemIdType id);
372 
381  [[nodiscard]] bool isDelayedRequestSubmissionEnabled() const;
382 };
383 
384 } // namespace ucxx
Base type for a collection of delayed submissions.
Definition: delayed_submission.h:43
std::string _name
The human-readable name of the collection, used for logging.
Definition: delayed_submission.h:45
ItemIdType _itemId
The item ID counter, used to allow cancelation.
Definition: delayed_submission.h:47
std::deque< std::pair< ItemIdType, T > > _collection
The collection.
Definition: delayed_submission.h:50
std::mutex _mutex
Mutex to provide access to _collection.
Definition: delayed_submission.h:52
std::optional< ItemIdType > _processing
The ID of the item being processed, if any.
Definition: delayed_submission.h:48
BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor for a thread-safe delayed submission collection.
Definition: delayed_submission.h:90
void process()
Process all pending callbacks.
Definition: delayed_submission.h:138
void cancel(ItemIdType id)
Cancel a pending callback.
Definition: delayed_submission.h:179
std::set< ItemIdType > _canceled
IDs of canceled items.
Definition: delayed_submission.h:51
virtual ItemIdType schedule(T item)
Register a callable or complex-type for delayed submission.
Definition: delayed_submission.h:117
bool _enabled
Whether the resource required to process the collection is enabled.
Definition: delayed_submission.h:46
virtual void scheduleLog(ItemIdType id, T item)=0
Log message during schedule().
virtual void processItem(ItemIdType id, T item)=0
Process a single item during process().
A collection of delayed submissions of multiple types.
Definition: delayed_submission.h:254
void processPost()
Process all pending generic-post callback operations.
void registerRequest(std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register a request for delayed submission.
ItemIdType registerGenericPre(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPre().
void processPre()
Process pending delayed request submission and generic-pre callback operations.
bool isDelayedRequestSubmissionEnabled() const
Inquire if delayed request submission is enabled.
void cancelGenericPost(ItemIdType id)
Cancel a generic callback scheduled for processPost() execution.
ItemIdType registerGenericPost(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPost().
DelayedSubmissionCollection(bool enableDelayedRequestSubmission=false)
Default delayed submission collection constructor.
void cancelGenericPre(ItemIdType id)
Cancel a generic callback scheduled for processPre() execution.
A collection of delayed submissions of generic callbacks.
Definition: delayed_submission.h:229
void scheduleLog(ItemIdType id, DelayedSubmissionCallbackType item) override
Log message during schedule().
GenericDelayedSubmissionCollection(const std::string name)
Constructor of a collection of delayed submissions of generic callbacks.
void processItem(ItemIdType id, DelayedSubmissionCallbackType callback) override
Process a single item during process().
A collection of delayed request submissions.
Definition: delayed_submission.h:198
void scheduleLog(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Log message during schedule().
RequestDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor of a collection of delayed request submissions.
void processItem(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Process a single item during process().
Definition: address.h:15
std::function< void()> DelayedSubmissionCallbackType
A user-defined function to execute as part of delayed submission callback.
Definition: delayed_submission.h:32