delayed_submission.h
1 
5 #pragma once
6 
7 #include <deque>
8 #include <functional>
9 #include <memory>
10 #include <mutex>
11 #include <optional>
12 #include <set>
13 #include <stdexcept>
14 #include <string>
15 #include <utility>
16 #include <variant>
17 
18 #include <ucp/api/ucp.h>
19 #include <ucs/memory/memory_type.h>
20 
21 #include <ucxx/log.h>
22 #include <ucxx/request_data.h>
23 
24 namespace ucxx {
25 
/**
 * @brief A user-defined function to execute as part of delayed submission callback.
 */
using DelayedSubmissionCallbackType = std::function<void()>;

/**
 * @brief Type for identifying items in `DelayedSubmissionCollection`.
 */
using ItemIdType = uint64_t;
41 
48 template <typename T>
50  protected:
// The human-readable name of the collection, used for logging.
51  std::string _name{"undefined"};
// Whether the resource required to process the collection is enabled;
// `schedule()` throws when this is false.
52  bool _enabled{true};
// The ID of the item being processed, if any; reset to `std::nullopt` at the
// end of `process()`.
54  std::optional<ItemIdType> _processing{
55  std::nullopt};
// The collection: pending (ID, item) pairs, appended by `schedule()` and
// consumed from the front by `process()`.
56  std::deque<std::pair<ItemIdType, T>> _collection{};
// IDs of canceled items; `process()` drops an item whose ID is found here.
57  std::set<ItemIdType> _canceled{};
// Mutex held while `_collection`, `_canceled` and `_processing` are accessed.
58  std::mutex _mutex{};
59 
/**
 * @brief Log message during `schedule()`.
 *
 * Called by `schedule()` after the item has been appended to the collection and
 * the mutex has been released.
 *
 * @param[in] id    the ID assigned to the scheduled item.
 * @param[in] item  the item that was scheduled.
 */
68  virtual void scheduleLog(ItemIdType id, T item) = 0;
69 
/**
 * @brief Process a single item during `process()`.
 *
 * Called by `process()` once per non-canceled item, after the item has been
 * removed from the collection and outside of the mutex.
 *
 * @param[in] id    the ID of the item being processed.
 * @param[in] item  the item to process.
 */
79  virtual void processItem(ItemIdType id, T item) = 0;
80 
81  public:
96  explicit BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
97  : _name{name}, _enabled{enabled}
98  {
99  }
100 
106 
123  [[nodiscard]] virtual ItemIdType schedule(T item)
124  {
125  if (!_enabled) throw std::runtime_error("Resource is disabled.");
126 
127  ItemIdType id;
128  {
129  std::lock_guard<std::mutex> lock(_mutex);
130  id = _itemId++;
131  _collection.emplace_back(id, item);
132  }
133  scheduleLog(id, item);
134 
135  return id;
136  }
137 
144  void process()
145  {
146  // Process only those that were already inserted to prevent from never
147  // returning if `_collection` grows indefinitely.
148  size_t toProcess = 0;
149  {
150  std::lock_guard<std::mutex> lock(_mutex);
151  toProcess = _collection.size();
152  }
153 
154  for (size_t i = 0; i < toProcess; ++i) {
155  std::pair<ItemIdType, T> item;
156  {
157  std::lock_guard<std::mutex> lock(_mutex);
158  item = std::move(_collection.front());
159  _collection.pop_front();
160  if (_canceled.erase(item.first)) continue;
161  _processing = std::optional<ItemIdType>{item.first};
162  }
163 
164  processItem(item.first, item.second);
165  }
166 
167  {
168  // Clear the value of `_processing` as no more requests will be processed.
169  std::lock_guard<std::mutex> lock(_mutex);
170  _processing = std::nullopt;
171  }
172  }
173 
186  {
187  std::lock_guard<std::mutex> lock(_mutex);
188  if (_processing.has_value() && _processing.value() == id)
189  throw std::runtime_error("Cannot cancel, item is being processed.");
190 
191  _canceled.insert(id);
192  ucxx_trace_req("Canceled item: %lu", id);
193  }
194 };
195 
204  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType>> {
205  protected:
207  ItemIdType id,
208  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
209 
211  ItemIdType id,
212  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
213 
214  public:
225  explicit RequestDelayedSubmissionCollection(const std::string name, const bool enabled);
226 };
227 
235  : public BaseDelayedSubmissionCollection<DelayedSubmissionCallbackType> {
236  protected:
238 
240 
241  public:
251  explicit GenericDelayedSubmissionCollection(const std::string name);
252 };
253 
261  private:
263  "generic pre"};
265  "generic post"};
267  "request", false};
268  bool _enableDelayedRequestSubmission{false};
269 
270  public:
282  explicit DelayedSubmissionCollection(bool enableDelayedRequestSubmission = false);
283 
284  DelayedSubmissionCollection() = delete;
286  DelayedSubmissionCollection& operator=(DelayedSubmissionCollection const&) = delete;
289 
304  void processPre();
305 
312  void processPost();
313 
327  void registerRequest(std::shared_ptr<Request> request, DelayedSubmissionCallbackType callback);
328 
340 
352 
365 
378 
387  [[nodiscard]] bool isDelayedRequestSubmissionEnabled() const;
388 };
389 
390 } // namespace ucxx
Base type for a collection of delayed submissions.
Definition: delayed_submission.h:49
std::string _name
The human-readable name of the collection, used for logging.
Definition: delayed_submission.h:51
ItemIdType _itemId
The item ID counter, used to allow cancelation.
Definition: delayed_submission.h:53
std::deque< std::pair< ItemIdType, T > > _collection
The collection.
Definition: delayed_submission.h:56
std::mutex _mutex
Mutex protecting access to _collection.
Definition: delayed_submission.h:58
std::optional< ItemIdType > _processing
The ID of the item being processed, if any.
Definition: delayed_submission.h:54
BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor for a thread-safe delayed submission collection.
Definition: delayed_submission.h:96
void process()
Process all pending callbacks.
Definition: delayed_submission.h:144
void cancel(ItemIdType id)
Cancel a pending callback.
Definition: delayed_submission.h:185
std::set< ItemIdType > _canceled
IDs of canceled items.
Definition: delayed_submission.h:57
virtual ItemIdType schedule(T item)
Register a callable or complex-type for delayed submission.
Definition: delayed_submission.h:123
bool _enabled
Whether the resource required to process the collection is enabled.
Definition: delayed_submission.h:52
virtual void scheduleLog(ItemIdType id, T item)=0
Log message during schedule().
virtual void processItem(ItemIdType id, T item)=0
Process a single item during process().
A collection of delayed submissions of multiple types.
Definition: delayed_submission.h:260
void processPost()
Process all pending generic-post callback operations.
void registerRequest(std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register a request for delayed submission.
ItemIdType registerGenericPre(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPre().
void processPre()
Process pending delayed request submission and generic-pre callback operations.
bool isDelayedRequestSubmissionEnabled() const
Inquire if delayed request submission is enabled.
void cancelGenericPost(ItemIdType id)
Cancel a generic callback scheduled for processPost() execution.
ItemIdType registerGenericPost(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPost().
DelayedSubmissionCollection(bool enableDelayedRequestSubmission=false)
Default delayed submission collection constructor.
void cancelGenericPre(ItemIdType id)
Cancel a generic callback scheduled for processPre() execution.
A collection of delayed submissions of generic callbacks.
Definition: delayed_submission.h:235
void scheduleLog(ItemIdType id, DelayedSubmissionCallbackType item) override
Log message during schedule().
GenericDelayedSubmissionCollection(const std::string name)
Constructor of a collection of delayed submissions of generic callbacks.
void processItem(ItemIdType id, DelayedSubmissionCallbackType callback) override
Process a single item during process().
A collection of delayed request submissions.
Definition: delayed_submission.h:204
void scheduleLog(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Log message during schedule().
RequestDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor of a collection of delayed request submissions.
void processItem(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Process a single item during process().
Definition: address.h:15
std::function< void()> DelayedSubmissionCallbackType
A user-defined function to execute as part of delayed submission callback.
Definition: delayed_submission.h:32
uint64_t ItemIdType
Type for identifying items in DelayedSubmissionCollection.
Definition: delayed_submission.h:40