18 #include <ucp/api/ucp.h>
19 #include <ucs/memory/memory_type.h>
22 #include <ucxx/request_data.h>
34 typedef uint64_t ItemIdType;
119 if (!
_enabled)
throw std::runtime_error(
"Resource is disabled.");
123 std::lock_guard<std::mutex> lock(
_mutex);
142 size_t toProcess = 0;
144 std::lock_guard<std::mutex> lock(
_mutex);
148 for (
auto i = 0; i < toProcess; ++i) {
149 std::pair<ItemIdType, T> item;
151 std::lock_guard<std::mutex> lock(
_mutex);
154 if (
_canceled.erase(item.first))
continue;
155 _processing = std::optional<ItemIdType>{item.first};
163 std::lock_guard<std::mutex> lock(
_mutex);
181 std::lock_guard<std::mutex> lock(
_mutex);
183 throw std::runtime_error(
"Cannot cancel, item is being processed.");
186 ucxx_trace_req(
"Canceled item: %lu",
id);
198 std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType>> {
262 bool _enableDelayedRequestSubmission{
false};
Base type for a collection of delayed submissions.
Definition: delayed_submission.h:43
std::string _name
The human-readable name of the collection, used for logging.
Definition: delayed_submission.h:45
ItemIdType _itemId
The item ID counter, used to allow cancellation.
Definition: delayed_submission.h:47
std::deque< std::pair< ItemIdType, T > > _collection
The collection.
Definition: delayed_submission.h:50
std::mutex _mutex
Mutex guarding access to _collection.
Definition: delayed_submission.h:52
std::optional< ItemIdType > _processing
The ID of the item being processed, if any.
Definition: delayed_submission.h:48
BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor for a thread-safe delayed submission collection.
Definition: delayed_submission.h:90
void process()
Process all pending callbacks.
Definition: delayed_submission.h:138
void cancel(ItemIdType id)
Cancel a pending callback.
Definition: delayed_submission.h:179
std::set< ItemIdType > _canceled
IDs of canceled items.
Definition: delayed_submission.h:51
virtual ItemIdType schedule(T item)
Register a callable or complex type for delayed submission.
Definition: delayed_submission.h:117
bool _enabled
Whether the resource required to process the collection is enabled.
Definition: delayed_submission.h:46
virtual void scheduleLog(ItemIdType id, T item)=0
Log message during schedule().
virtual void processItem(ItemIdType id, T item)=0
Process a single item during process().
A collection of delayed submissions of multiple types.
Definition: delayed_submission.h:254
void processPost()
Process all pending generic-post callback operations.
void registerRequest(std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register a request for delayed submission.
ItemIdType registerGenericPre(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPre().
void processPre()
Process pending delayed request submission and generic-pre callback operations.
bool isDelayedRequestSubmissionEnabled() const
Inquire if delayed request submission is enabled.
void cancelGenericPost(ItemIdType id)
Cancel a generic callback scheduled for processPost() execution.
ItemIdType registerGenericPost(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPost().
DelayedSubmissionCollection(bool enableDelayedRequestSubmission=false)
Default delayed submission collection constructor.
void cancelGenericPre(ItemIdType id)
Cancel a generic callback scheduled for processPre() execution.
A collection of delayed submissions of generic callbacks.
Definition: delayed_submission.h:229
void scheduleLog(ItemIdType id, DelayedSubmissionCallbackType item) override
Log message during schedule().
GenericDelayedSubmissionCollection(const std::string name)
Constructor of a collection of delayed submissions of generic callbacks.
void processItem(ItemIdType id, DelayedSubmissionCallbackType callback) override
Process a single item during process().
A collection of delayed request submissions.
Definition: delayed_submission.h:198
void scheduleLog(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Log message during schedule().
RequestDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor of a collection of delayed request submissions.
void processItem(ItemIdType id, std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Process a single item during process().
std::function< void()> DelayedSubmissionCallbackType
A user-defined function to execute as part of delayed submission callback.
Definition: delayed_submission.h:32