Specialized Python implementation of a ucxx::Worker.
#include <worker.h>
Public Member Functions | |
Worker (const Worker &)=delete | |
Worker & | operator= (Worker const &)=delete |
Worker (Worker &&o)=delete | |
Worker & | operator= (Worker &&o)=delete |
void | populateFuturesPool () override |
Populate the Python futures pool. | |
void | clearFuturesPool () override |
Clear the futures pool. | |
std::shared_ptr<::ucxx::Future > | getFuture () override |
Get a Python future from the pool. | |
RequestNotifierWaitState | waitRequestNotifier (uint64_t periodNs) override |
Block until a request event. | |
void | runRequestNotifier () override |
Notify Python futures of each completed communication request. | |
void | stopRequestNotifierThread () override |
Signal the notifier to terminate. | |
Public Member Functions inherited from ucxx::Worker | |
Worker (const Worker &)=delete | |
Worker & | operator= (Worker const &)=delete |
Worker (Worker &&o)=delete | |
Worker & | operator= (Worker &&o)=delete |
virtual | ~Worker () |
ucxx::Worker destructor. | |
ucp_worker_h | getHandle () |
Get the underlying ucp_worker_h handle. | |
std::string | getInfo () |
Get information about the underlying ucp_worker_h object. | |
void | initBlockingProgressMode () |
Initialize blocking progress mode. | |
int | getEpollFileDescriptor () |
Get the epoll file descriptor associated with the worker. | |
bool | arm () |
Arm the UCP worker. | |
bool | progressWorkerEvent (const int epollTimeout=-1) |
Progress worker event while in blocking progress mode. | |
void | signal () |
Signal the worker that an event happened. | |
bool | waitProgress () |
Block until an event has happened, then progress. | |
bool | progressOnce () |
Progress the worker only once. | |
bool | progress () |
Progress the worker until all communication events are completed. | |
void | registerDelayedSubmission (std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback) |
Register delayed request submission. | |
bool | registerGenericPre (DelayedSubmissionCallbackType callback, uint64_t period=0) |
Register callback to be executed in progress thread before progressing. | |
bool | registerGenericPost (DelayedSubmissionCallbackType callback, uint64_t period=0) |
Register callback to be executed in progress thread after progressing. | |
bool | isDelayedRequestSubmissionEnabled () const |
Inquire if worker has been created with delayed submission enabled. | |
bool | isFutureEnabled () const |
Inquire if worker has been created with future support. | |
void | setProgressThreadStartCallback (std::function< void(void *)> callback, void *callbackArg) |
Set callback to be executed at the progress thread start. | |
void | startProgressThread (const bool pollingMode=false, const int epollTimeout=1) |
Start the progress thread. | |
void | stopProgressThread () |
Stop the progress thread. | |
bool | isProgressThreadRunning () |
Inquire if worker has a progress thread running. | |
std::thread::id | getProgressThreadId () |
Get the progress thread ID. | |
size_t | cancelInflightRequests (uint64_t period=0, uint64_t maxAttempts=1) |
Cancel inflight requests. | |
void | scheduleRequestCancel (TrackedRequestsPtr trackedRequests) |
Schedule cancelation of inflight requests. | |
void | removeInflightRequest (const Request *const request) |
Remove reference to request from internal container. | |
bool | tagProbe (const Tag tag) |
Check for uncaught tag messages. | |
std::shared_ptr< Request > | tagRecv (void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enableFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr) |
Enqueue a tag receive operation. | |
std::shared_ptr< Address > | getAddress () |
Get the address of the UCX worker object. | |
std::shared_ptr< Endpoint > | createEndpointFromHostname (std::string ipAddress, uint16_t port, bool endpointErrorHandling=true) |
Create endpoint to worker listening on specific IP and port. | |
std::shared_ptr< Endpoint > | createEndpointFromWorkerAddress (std::shared_ptr< Address > address, bool endpointErrorHandling=true) |
Create endpoint to worker located at UCX address. | |
std::shared_ptr< Listener > | createListener (uint16_t port, ucp_listener_conn_callback_t callback, void *callbackArgs) |
Listen for remote connections on given port. | |
void | registerAmAllocator (ucs_memory_type_t memoryType, AmAllocatorType allocator) |
Register allocator for active messages. | |
void | registerAmReceiverCallback (AmReceiverCallbackInfo info, AmReceiverCallbackType callback) |
Register receiver callback for active messages. | |
bool | amProbe (const ucp_ep_h endpointHandle) const |
Check for uncaught active messages. | |
std::shared_ptr< Request > | flush (const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr) |
Enqueue a flush operation. | |
Public Member Functions inherited from ucxx::Component | |
void | setParent (std::shared_ptr< Component > parent) |
Set the internal parent reference. | |
std::shared_ptr< Component > | getParent () const |
Get the internal parent reference. | |
Friends | |
std::shared_ptr<::ucxx::Worker > | createWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture) |
Constructor of shared_ptr<ucxx::python::Worker>. | |
Additional Inherited Members | |
Protected Member Functions inherited from ucxx::Worker | |
Worker (std::shared_ptr< Context > context, const bool enableDelayedSubmission=false, const bool enableFuture=false) | |
Protected constructor of ucxx::Worker. | |
Protected Attributes inherited from ucxx::Worker | |
bool | _enableFuture |
Boolean identifying whether the worker was created with future capability. | |
std::mutex | _futuresPoolMutex {} |
Mutex to access the futures pool. | |
std::queue< std::shared_ptr< Future > > | _futuresPool {} |
Futures pool to prevent running out of fresh futures. | |
std::shared_ptr< Notifier > | _notifier {nullptr} |
Notifier object. | |
std::shared_ptr< internal::AmData > | _amData |
Worker data made available to Active Messages callback. | |
Protected Attributes inherited from ucxx::Component | |
std::shared_ptr< Component > | _parent {nullptr} |
A reference-counted pointer to the parent. | |
Specialized Python implementation of a ucxx::Worker, providing Python-specific functionality, such as notification of Python futures.
void ucxx::python::Worker::clearFuturesPool () | override virtual |
Clear the futures pool.
Clear the futures pool, ensuring all references are removed and thus avoiding reference cycles that prevent the ucxx::Worker and other resources from being cleaned up in time.
This method is safe to call even if the object was created with enableFuture=false.
Reimplemented from ucxx::Worker.
std::shared_ptr<::ucxx::Future> ucxx::python::Worker::getFuture () | override virtual |
Get a Python future from the pool.
Get a Python future from the pool. If the pool is empty, ucxx::python::Worker::populateFuturesPool() is called and a warning is raised, since an empty pool likely means the user is not calling that method regularly.
Returns the shared_ptr<ucxx::python::Future> object.
Reimplemented from ucxx::Worker.
void ucxx::python::Worker::populateFuturesPool () | override virtual |
Populate the Python futures pool.
To avoid taking the Python GIL for every new future required by each ucxx::Request, the ucxx::python::Worker maintains a pool of futures that can be acquired when a new ucxx::Request is created. Currently the pool has a maximum size of 100 objects and is refilled once it drops below 50; otherwise calling this function is a no-op.
std::runtime_error | if the object was created with enableFuture=false. |
Reimplemented from ucxx::Worker.
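The pool behavior described for populateFuturesPool(), getFuture() and clearFuturesPool() can be illustrated with a minimal, self-contained sketch. It is not the UCXX implementation: `FuturesPool`, `FakeFuture`, `kMaxSize` and `kRefillThreshold` are hypothetical names, the future is a plain struct rather than a Python object, and having `get()` throw when futures are disabled is an assumption of this sketch. Only the sizes (100 and 50), the on-demand refill with a warning, and the std::runtime_error come from the text above.

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>

// Hypothetical stand-in for a Python future (a real one requires the GIL).
struct FakeFuture {};

// Hypothetical pool mirroring the documented behavior, not the UCXX class.
class FuturesPool {
 public:
  static constexpr std::size_t kMaxSize         = 100;  // documented maximum
  static constexpr std::size_t kRefillThreshold = 50;   // refill when below this

  explicit FuturesPool(bool enableFuture) : _enableFuture(enableFuture) {}

  // Refill up to kMaxSize if the pool dropped below kRefillThreshold.
  // Returns how many futures were created; 0 means the call was a no-op.
  std::size_t populate() {
    requireEnabled();
    std::lock_guard<std::mutex> lock(_mutex);
    return populateLocked();
  }

  // Take one future; refill on demand (with a warning) if the pool is empty.
  std::shared_ptr<FakeFuture> get() {
    requireEnabled();  // assumption of this sketch
    std::lock_guard<std::mutex> lock(_mutex);
    if (_pool.empty()) {
      std::cerr << "warning: futures pool empty, populating on demand\n";
      populateLocked();
    }
    assert(!_pool.empty());
    auto future = _pool.front();
    _pool.pop();
    return future;
  }

  // Drop every pooled reference; safe even when futures are disabled.
  void clear() {
    std::lock_guard<std::mutex> lock(_mutex);
    std::queue<std::shared_ptr<FakeFuture>> empty;
    _pool.swap(empty);
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(_mutex);
    return _pool.size();
  }

 private:
  void requireEnabled() const {
    if (!_enableFuture) throw std::runtime_error("created with enableFuture=false");
  }

  // Caller must hold _mutex.
  std::size_t populateLocked() {
    if (_pool.size() >= kRefillThreshold) return 0;
    std::size_t created = 0;
    while (_pool.size() < kMaxSize) {
      _pool.push(std::make_shared<FakeFuture>());
      ++created;
    }
    return created;
  }

  bool _enableFuture;
  mutable std::mutex _mutex;
  std::queue<std::shared_ptr<FakeFuture>> _pool;
};
```

In this sketch the expensive step (creating futures) happens in batches inside `populate()`, so a caller that invokes it regularly never hits the warning path in `get()`.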
void ucxx::python::Worker::runRequestNotifier () | override virtual |
Notify Python futures of each completed communication request.
Notifies the Python future of each completed communication request of its new status. This method is intended to be used from the Python notifier thread, where the thread will call waitRequestNotifier() and block until some communication is completed, and then call this method to notify all futures. When notifying a Python future, the thread calling this method must use the same Python event loop as the thread that submitted the transfer request.
Reimplemented from ucxx::Worker.
void ucxx::python::Worker::stopRequestNotifierThread () | override virtual |
Signal the notifier to terminate.
Signals the notifier to terminate, awakening the blocking waitRequestNotifier() call.
Reimplemented from ucxx::Worker.
RequestNotifierWaitState ucxx::python::Worker::waitRequestNotifier (uint64_t periodNs) | override virtual |
Block until a request event.
Blocks until some communication is completed and a Python future is ready to be notified, shutdown was initiated, or a timeout occurred (only if periodNs > 0). This method is intended for use from the Python notifier thread, where that thread will block until one of the aforementioned events occurs.
Returns RequestNotifierWaitState::Ready if some communication completed, RequestNotifierWaitState::Timeout if a timeout occurred, or RequestNotifierWaitState::Shutdown if shutdown was initiated.
Reimplemented from ucxx::Worker.
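The interplay between waitRequestNotifier(), runRequestNotifier() and stopRequestNotifierThread() can be sketched with a condition variable. This is a standalone illustration, not the UCXX notifier: `Notifier`, `WaitState`, `notifyReady`, `stop` and `notifierLoop` are hypothetical names, and the choice to report shutdown ahead of a pending ready event is an assumption of this sketch. A `periodNs` of 0 waits without a timeout, matching the "only if periodNs > 0" rule above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

// Hypothetical counterpart of RequestNotifierWaitState.
enum class WaitState { Ready, Timeout, Shutdown };

// Hypothetical notifier, not the UCXX class.
class Notifier {
 public:
  // Called when a request completes and a future can be notified.
  void notifyReady() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _ready = true;
    }
    _cv.notify_one();
  }

  // Called to terminate: wakes any thread blocked in wait().
  void stop() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _shutdown = true;
    }
    _cv.notify_all();
  }

  // Block until ready, shutdown, or (only if periodNs > 0) timeout.
  WaitState wait(std::uint64_t periodNs) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto woken = [this] { return _ready || _shutdown; };
    if (periodNs > 0) {
      auto period = std::chrono::nanoseconds(
        static_cast<std::chrono::nanoseconds::rep>(periodNs));
      if (!_cv.wait_for(lock, period, woken)) return WaitState::Timeout;
    } else {
      _cv.wait(lock, woken);
    }
    assert(_ready || _shutdown);
    if (_shutdown) return WaitState::Shutdown;  // assumption: shutdown wins
    _ready = false;                             // consume the event
    return WaitState::Ready;
  }

 private:
  std::mutex _mutex;
  std::condition_variable _cv;
  bool _ready{false};
  bool _shutdown{false};
};

// Body of a hypothetical notifier thread: wait, notify, repeat until shutdown.
// Returns how many times onReady ran.
template <typename Callback>
std::size_t notifierLoop(Notifier& notifier, std::uint64_t periodNs, Callback onReady) {
  std::size_t notified = 0;
  for (;;) {
    switch (notifier.wait(periodNs)) {
      case WaitState::Ready:
        onReady();  // stands in for runRequestNotifier()
        ++notified;
        break;
      case WaitState::Timeout: break;  // nothing completed yet, wait again
      case WaitState::Shutdown: return notified;
    }
  }
}
```

Running `notifierLoop` on a dedicated thread and calling `stop()` from another thread mirrors the documented pattern of a notifier thread that blocks, notifies futures, and exits once termination is signaled.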
std::shared_ptr<::ucxx::Worker> createWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture) | friend |
Constructor of shared_ptr<ucxx::python::Worker>.
The constructor for a shared_ptr<ucxx::python::Worker> object. The default constructor is made private to ensure all UCXX objects are shared pointers for correct lifetime management.
[in] | context | the context from which to create the worker. |
[in] | enableDelayedSubmission | if true, each ucxx::Request will not be submitted immediately, but instead delayed to the progress thread. Requires use of the progress thread. |
[in] | enableFuture | if true, notifies the future associated with each ucxx::Request, currently used only by ucxx::python::Worker. |
Returns the shared_ptr<ucxx::Worker> object.
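The pattern described here, a non-public constructor reachable only through a friend factory that returns a shared pointer to the base type, can be sketched as follows. Everything lives in a hypothetical `sketch` namespace with simplified classes; the `enableDelayedSubmission` parameter is omitted and `kind()` exists only so the example has observable behavior. It is an illustration of the idiom under those assumptions, not the UCXX source.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

namespace sketch {  // hypothetical namespace, not ucxx

class Context {};

class Worker {
 public:
  Worker(const Worker&)            = delete;
  Worker& operator=(const Worker&) = delete;
  virtual ~Worker()                = default;

  bool isFutureEnabled() const { return _enableFuture; }
  virtual std::string kind() const { return "base"; }

 protected:
  // Protected: only derived classes (and friends) may construct.
  Worker(std::shared_ptr<Context> context, bool enableFuture)
    : _context(std::move(context)), _enableFuture(enableFuture) {}

 private:
  std::shared_ptr<Context> _context;
  bool _enableFuture;
};

namespace python {

class Worker : public sketch::Worker {
 public:
  std::string kind() const override { return "python"; }

  // The factory is the only way to obtain an instance.
  friend std::shared_ptr<sketch::Worker> createWorker(std::shared_ptr<Context> context,
                                                      bool enableFuture);

 private:
  Worker(std::shared_ptr<Context> context, bool enableFuture)
    : sketch::Worker(std::move(context), enableFuture) {}
};

std::shared_ptr<sketch::Worker> createWorker(std::shared_ptr<Context> context,
                                             bool enableFuture) {
  assert(context != nullptr);
  // std::make_shared cannot reach the private constructor, so the friend
  // function calls new itself and hands ownership to the shared_ptr.
  return std::shared_ptr<sketch::Worker>(new Worker(std::move(context), enableFuture));
}

}  // namespace python

// Outside the factory, direct construction is rejected at compile time.
static_assert(
  !std::is_constructible<python::Worker, std::shared_ptr<Context>, bool>::value,
  "python::Worker must only be created through createWorker()");

}  // namespace sketch
```

Because every instance is born inside a `std::shared_ptr`, child objects can safely hold a reference-counted pointer to their parent, which is the lifetime guarantee the description above refers to.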