All Classes Namespaces Functions Variables Typedefs Enumerations Friends
Public Member Functions | Friends | List of all members
ucxx::python::Worker Class Reference

Specialized Python implementation of a ucxx::Worker. More...

#include <worker.h>

Inheritance diagram for ucxx::python::Worker:
ucxx::Worker ucxx::Component

Public Member Functions

 Worker (const Worker &)=delete
 
Workeroperator= (Worker const &)=delete
 
 Worker (Worker &&o)=delete
 
Workeroperator= (Worker &&o)=delete
 
void populateFuturesPool () override
 Populate the Python futures pool. More...
 
void clearFuturesPool () override
 Clear the futures pool. More...
 
std::shared_ptr<::ucxx::FuturegetFuture () override
 Get a Python future from the pool. More...
 
RequestNotifierWaitState waitRequestNotifier (uint64_t periodNs) override
 Block until a request event. More...
 
void runRequestNotifier () override
 Notify Python futures of each completed communication request. More...
 
void stopRequestNotifierThread () override
 Signal the notifier to terminate. More...
 
- Public Member Functions inherited from ucxx::Worker
 Worker (const Worker &)=delete
 
Workeroperator= (Worker const &)=delete
 
 Worker (Worker &&o)=delete
 
Workeroperator= (Worker &&o)=delete
 
virtual ~Worker ()
 ucxx::Worker destructor.
 
ucp_worker_h getHandle ()
 Get the underlying ucp_worker_h handle. More...
 
std::string getInfo ()
 Get information about the underlying ucp_worker_h object. More...
 
void initBlockingProgressMode ()
 Initialize blocking progress mode. More...
 
int getEpollFileDescriptor ()
 Get the epoll file descriptor associated with the worker. More...
 
bool arm ()
 Arm the UCP worker. More...
 
bool progressWorkerEvent (const int epollTimeout=-1)
 Progress worker event while in blocking progress mode. More...
 
void signal ()
 Signal the worker that an event happened. More...
 
bool waitProgress ()
 Block until an event has happened, then progresses. More...
 
bool progressOnce ()
 Progress the worker only once. More...
 
bool progress ()
 Progress the worker until all communication events are completed. More...
 
void registerDelayedSubmission (std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
 Register delayed request submission. More...
 
bool registerGenericPre (DelayedSubmissionCallbackType callback, uint64_t period=0)
 Register callback to be executed in progress thread before progressing. More...
 
bool registerGenericPost (DelayedSubmissionCallbackType callback, uint64_t period=0)
 Register callback to be executed in progress thread before progressing. More...
 
bool isDelayedRequestSubmissionEnabled () const
 Inquire if worker has been created with delayed submission enabled. More...
 
bool isFutureEnabled () const
 Inquire if worker has been created with future support. More...
 
void setProgressThreadStartCallback (std::function< void(void *)> callback, void *callbackArg)
 Set callback to be executed at the progress thread start. More...
 
void startProgressThread (const bool pollingMode=false, const int epollTimeout=1)
 Start the progress thread. More...
 
void stopProgressThread ()
 Stop the progress thread. More...
 
bool isProgressThreadRunning ()
 Inquire if worker has a progress thread running. More...
 
std::thread::id getProgressThreadId ()
 Get the progress thread ID. More...
 
size_t cancelInflightRequests (uint64_t period=0, uint64_t maxAttempts=1)
 Cancel inflight requests. More...
 
void scheduleRequestCancel (TrackedRequestsPtr trackedRequests)
 Schedule cancelation of inflight requests. More...
 
void removeInflightRequest (const Request *const request)
 Remove reference to request from internal container. More...
 
bool tagProbe (const Tag tag)
 Check for uncaught tag messages. More...
 
std::shared_ptr< RequesttagRecv (void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enableFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
 Enqueue a tag receive operation. More...
 
std::shared_ptr< AddressgetAddress ()
 Get the address of the UCX worker object. More...
 
std::shared_ptr< EndpointcreateEndpointFromHostname (std::string ipAddress, uint16_t port, bool endpointErrorHandling=true)
 Create endpoint to worker listening on specific IP and port. More...
 
std::shared_ptr< EndpointcreateEndpointFromWorkerAddress (std::shared_ptr< Address > address, bool endpointErrorHandling=true)
 Create endpoint to worker located at UCX address. More...
 
std::shared_ptr< ListenercreateListener (uint16_t port, ucp_listener_conn_callback_t callback, void *callbackArgs)
 Listen for remote connections on given port. More...
 
void registerAmAllocator (ucs_memory_type_t memoryType, AmAllocatorType allocator)
 Register allocator for active messages. More...
 
void registerAmReceiverCallback (AmReceiverCallbackInfo info, AmReceiverCallbackType callback)
 Register receiver callback for active messages. More...
 
bool amProbe (const ucp_ep_h endpointHandle) const
 Check for uncaught active messages. More...
 
std::shared_ptr< Requestflush (const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
 Enqueue a flush operation. More...
 
- Public Member Functions inherited from ucxx::Component
void setParent (std::shared_ptr< Component > parent)
 Set the internal parent reference. More...
 
std::shared_ptr< ComponentgetParent () const
 Get the internal parent reference. More...
 

Friends

std::shared_ptr<::ucxx::WorkercreateWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture)
 Constructor of shared_ptr<ucxx::python::Worker>. More...
 

Additional Inherited Members

- Protected Member Functions inherited from ucxx::Worker
 Worker (std::shared_ptr< Context > context, const bool enableDelayedSubmission=false, const bool enableFuture=false)
 Protected constructor of ucxx::Worker. More...
 
- Protected Attributes inherited from ucxx::Worker
bool _enableFuture
 Boolean identifying whether the worker was created with future capability. More...
 
std::mutex _futuresPoolMutex {}
 Mutex to access the futures pool.
 
std::queue< std::shared_ptr< Future > > _futuresPool {}
 Futures pool to prevent running out of fresh futures.
 
std::shared_ptr< Notifier_notifier {nullptr}
 Notifier object.
 
std::shared_ptr< internal::AmData_amData
 Worker data made available to Active Messages callback.
 
- Protected Attributes inherited from ucxx::Component
std::shared_ptr< Component_parent {nullptr}
 A reference-counted pointer to the parent.
 

Detailed Description

Specialized Python implementation of a ucxx::Worker.

Specialized Python implementation of a ucxx::Worker, providing Python-specific functionality, such as notification of Python futures.

Member Function Documentation

◆ clearFuturesPool()

void ucxx::python::Worker::clearFuturesPool ( )
overridevirtual

Clear the futures pool.

Clear the futures pool, ensuring all references are removed and thus avoiding reference cycles that prevent the ucxx::Worker and other resources from cleaning up on time.

This method is safe to be called even if object was created with enableFuture=false.

Reimplemented from ucxx::Worker.

◆ getFuture()

std::shared_ptr<::ucxx::Future> ucxx::python::Worker::getFuture ( )
overridevirtual

Get a Python future from the pool.

Get a Python future from the pool. If the pool is empty, ucxx::python::Worker::populateFuturesPool() is called and a warning is raised, since that likely means the user is missing to call the aforementioned method regularly.

Returns
The shared_ptr<ucxx::python::Future> object

Reimplemented from ucxx::Worker.

◆ populateFuturesPool()

void ucxx::python::Worker::populateFuturesPool ( )
overridevirtual

Populate the Python futures pool.

To avoid taking the Python GIL for every new future required by each ucxx::Request, the ucxx::python::Worker maintains a pool of futures that can be acquired when a new ucxx::Request is created. Currently the pool has a maximum size of 100 objects, and will refill once it goes under 50, otherwise calling this functions results in a no-op.

Exceptions
std::runtime_errorif object was created with enableFuture=false.

Reimplemented from ucxx::Worker.

◆ runRequestNotifier()

void ucxx::python::Worker::runRequestNotifier ( )
overridevirtual

Notify Python futures of each completed communication request.

Notifies Python futures of each completed communication request of their new status. This method is intended to be used from the Python notifier thread, where the thread will call waitRequestNotifier() and block until some communication is completed, and then call this method to notify all futures. If this is notifying a Python future, the thread where this method is called from must be using the same Python event loop as the thread that submitted the transfer request.

Reimplemented from ucxx::Worker.

◆ stopRequestNotifierThread()

void ucxx::python::Worker::stopRequestNotifierThread ( )
overridevirtual

Signal the notifier to terminate.

Signals the notifier to terminate, awakening the waitRequestNotifier() blocking call.

Reimplemented from ucxx::Worker.

◆ waitRequestNotifier()

RequestNotifierWaitState ucxx::python::Worker::waitRequestNotifier ( uint64_t  periodNs)
overridevirtual

Block until a request event.

Blocks until some communication is completed and a Python future is ready to be notified, shutdown was initiated or a timeout occurred (only if periodNs > 0). This method is intended for use from the Python notifier thread, where that thread will block until one of the aforementioned events occur.

Returns
RequestNotifierWaitState::Ready if some communication completed, RequestNotifierWaitStats::Timeout if a timeout occurred, or RequestNotifierWaitStats::Shutdown if shutdown has initiated.

Reimplemented from ucxx::Worker.

Friends And Related Function Documentation

◆ createWorker

std::shared_ptr<::ucxx::Worker> createWorker ( std::shared_ptr< Context context,
const bool  enableDelayedSubmission,
const bool  enableFuture 
)
friend

Constructor of shared_ptr<ucxx::python::Worker>.

The constructor for a shared_ptr<ucxx::python::Worker> object. The default constructor is made private to ensure all UCXX objects are shared pointers for correct lifetime management.

// context is `std::shared_ptr<ucxx::Context>`
auto worker = ucxx::createWorker(context, false, false);
std::shared_ptr< Worker > createWorker(std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture)
Returns
The shared_ptr<ucxx::python::Worker> object

The constructor for a shared_ptr<ucxx::Worker> object. The default constructor is made private to ensure all UCXX objects are shared pointers for correct lifetime management.

// context is `std::shared_ptr<ucxx::Context>`
auto worker = context->createWorker(false);
// Equivalent to line above
// auto worker = ucxx::createWorker(context, false);
Parameters
[in]contextthe context from which to create the worker.
[in]enableDelayedSubmissionif true, each ucxx::Request will not be submitted immediately, but instead delayed to the progress thread. Requires use of the progress thread.
[in]enableFutureif true, notifies the future associated with each ucxx::Request, currently used only by ucxx::python::Worker.
Returns
The shared_ptr<ucxx::Worker> object

The documentation for this class was generated from the following file: