ucxx::Worker Class Reference

Component encapsulating a UCP worker.

#include <worker.h>

Inheritance diagram for ucxx::Worker:
ucxx::Component (base class), ucxx::python::Worker (derived class)

Public Member Functions

 Worker (const Worker &)=delete
 
Worker & operator= (Worker const &)=delete
 
 Worker (Worker &&o)=delete
 
Worker & operator= (Worker &&o)=delete
 
virtual ~Worker ()
 ucxx::Worker destructor.
 
ucp_worker_h getHandle ()
 Get the underlying ucp_worker_h handle.
 
std::string getInfo ()
 Get information about the underlying ucp_worker_h object.
 
void initBlockingProgressMode ()
 Initialize blocking progress mode.
 
int getEpollFileDescriptor ()
 Get the epoll file descriptor associated with the worker.
 
bool arm ()
 Arm the UCP worker.
 
bool progressWorkerEvent (const int epollTimeout=-1)
 Progress worker event while in blocking progress mode.
 
void signal ()
 Signal the worker that an event happened.
 
bool waitProgress ()
 Block until an event has happened, then progress.
 
bool progressOnce ()
 Progress the worker only once.
 
bool progress ()
 Progress the worker until all communication events are completed.
 
void registerDelayedSubmission (std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
 Register delayed request submission.
 
bool registerGenericPre (DelayedSubmissionCallbackType callback, uint64_t period=0)
 Register callback to be executed in progress thread before progressing.
 
bool registerGenericPost (DelayedSubmissionCallbackType callback, uint64_t period=0)
 Register callback to be executed in progress thread after progressing.
 
bool isDelayedRequestSubmissionEnabled () const
 Inquire if worker has been created with delayed submission enabled.
 
bool isFutureEnabled () const
 Inquire if worker has been created with future support.
 
virtual void populateFuturesPool ()
 Populate the futures pool.
 
virtual void clearFuturesPool ()
 Clear the futures pool.
 
virtual std::shared_ptr< Future > getFuture ()
 Get a future from the pool.
 
virtual RequestNotifierWaitState waitRequestNotifier (uint64_t periodNs)
 Block until a request event.
 
virtual void runRequestNotifier ()
 Notify futures of each completed communication request.
 
virtual void stopRequestNotifierThread ()
 Signal the notifier to terminate.
 
void setProgressThreadStartCallback (std::function< void(void *)> callback, void *callbackArg)
 Set callback to be executed at the progress thread start.
 
void startProgressThread (const bool pollingMode=false, const int epollTimeout=1)
 Start the progress thread.
 
void stopProgressThread ()
 Stop the progress thread.
 
bool isProgressThreadRunning ()
 Inquire if worker has a progress thread running.
 
std::thread::id getProgressThreadId ()
 Get the progress thread ID.
 
size_t cancelInflightRequests (uint64_t period=0, uint64_t maxAttempts=1)
 Cancel inflight requests.
 
void scheduleRequestCancel (TrackedRequestsPtr trackedRequests)
 Schedule cancelation of inflight requests.
 
void removeInflightRequest (const Request *const request)
 Remove reference to request from internal container.
 
std::pair< bool, TagRecvInfo > tagProbe (const Tag tag, const TagMask tagMask=TagMaskFull)
 Check for uncaught tag messages.
 
std::shared_ptr< Request > tagRecv (void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enableFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
 Enqueue a tag receive operation.
 
std::shared_ptr< Address > getAddress ()
 Get the address of the UCX worker object.
 
std::shared_ptr< Endpoint > createEndpointFromHostname (std::string ipAddress, uint16_t port, bool endpointErrorHandling=true)
 Create endpoint to worker listening on specific IP and port.
 
std::shared_ptr< Endpoint > createEndpointFromWorkerAddress (std::shared_ptr< Address > address, bool endpointErrorHandling=true)
 Create endpoint to worker located at UCX address.
 
std::shared_ptr< Listener > createListener (uint16_t port, ucp_listener_conn_callback_t callback, void *callbackArgs)
 Listen for remote connections on given port.
 
void registerAmAllocator (ucs_memory_type_t memoryType, AmAllocatorType allocator)
 Register allocator for active messages.
 
void registerAmReceiverCallback (AmReceiverCallbackInfo info, AmReceiverCallbackType callback)
 Register receiver callback for active messages.
 
bool amProbe (const ucp_ep_h endpointHandle) const
 Check for uncaught active messages.
 
std::shared_ptr< Request > flush (const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
 Enqueue a flush operation.
 
- Public Member Functions inherited from ucxx::Component
void setParent (std::shared_ptr< Component > parent)
 Set the internal parent reference.
 
std::shared_ptr< Component > getParent () const
 Get the internal parent reference.
 

Protected Member Functions

 Worker (std::shared_ptr< Context > context, const bool enableDelayedSubmission=false, const bool enableFuture=false)
 Protected constructor of ucxx::Worker.
 

Protected Attributes

bool _enableFuture
 Boolean identifying whether the worker was created with future capability.
 
std::mutex _futuresPoolMutex {}
 Mutex to access the futures pool.
 
std::queue< std::shared_ptr< Future > > _futuresPool {}
 Futures pool to prevent running out of fresh futures.
 
std::shared_ptr< Notifier > _notifier {nullptr}
 Notifier object.
 
std::shared_ptr< internal::AmData > _amData
 Worker data made available to Active Messages callback.
 
- Protected Attributes inherited from ucxx::Component
std::shared_ptr< Component > _parent {nullptr}
 A reference-counted pointer to the parent.
 

Friends

std::shared_ptr< RequestAm > createRequestAm (std::shared_ptr< Endpoint > endpoint, const std::variant< data::AmSend, data::AmReceive > requestData, const bool enablePythonFuture, RequestCallbackUserFunction callbackFunction, RequestCallbackUserData callbackData)
 
std::shared_ptr< Worker > createWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture)
 Constructor of shared_ptr<ucxx::Worker>.
 

Detailed Description

Component encapsulating a UCP worker.

The UCP layer provides a handle to access workers in the form of a ucp_worker_h object. This class encapsulates that object and provides methods to simplify its handling.

Constructor & Destructor Documentation

◆ Worker()

ucxx::Worker::Worker ( std::shared_ptr< Context > context,
const bool  enableDelayedSubmission = false,
const bool  enableFuture = false 
)
explicit protected

Protected constructor of ucxx::Worker.

This is the internal implementation of the ucxx::Worker constructor, made protected so that it is not called directly. Instead, the user should call context::createWorker() or ucxx::createWorker() (or ucxx::createPythonWorker for the Python-enabled implementation).

Parameters
[in] context the context from which to create the worker.
[in] enableDelayedSubmission if true, each ucxx::Request will not be submitted immediately, but instead delayed to the progress thread. Requires use of the progress thread.
[in] enableFuture if true, notifies the future associated with each ucxx::Request, currently used only by ucxx::python::Worker.
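
The protected-constructor arrangement described above can be illustrated with a small standalone sketch. `DemoContext`, `DemoWorker` and `createDemoWorker` are hypothetical stand-ins and not UCXX types; the sketch only mirrors the shape suggested by the member list (deleted copy and move operations, a protected constructor, and a friend factory returning a `std::shared_ptr`).

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-ins for illustration only; these are NOT UCXX types.
struct DemoContext {};

class DemoWorker {
 public:
  // Copying and moving are disabled, as in the member list above.
  DemoWorker(const DemoWorker&)            = delete;
  DemoWorker& operator=(const DemoWorker&) = delete;
  DemoWorker(DemoWorker&&)                 = delete;
  DemoWorker& operator=(DemoWorker&&)      = delete;
  virtual ~DemoWorker()                    = default;

  bool isDelayedSubmissionEnabled() const { return _enableDelayedSubmission; }
  bool isFutureEnabled() const { return _enableFuture; }

 protected:
  // Protected: only the friend factory (or a subclass) may construct.
  explicit DemoWorker(std::shared_ptr<DemoContext> context,
                      const bool enableDelayedSubmission = false,
                      const bool enableFuture            = false)
    : _context(std::move(context)),
      _enableDelayedSubmission(enableDelayedSubmission),
      _enableFuture(enableFuture)
  {
    assert(_context != nullptr);  // a worker always needs a context
  }

 private:
  std::shared_ptr<DemoContext> _context;
  bool _enableDelayedSubmission;
  bool _enableFuture;

  friend std::shared_ptr<DemoWorker> createDemoWorker(std::shared_ptr<DemoContext>, bool, bool);
};

// Factory: the only public way to obtain a DemoWorker.
std::shared_ptr<DemoWorker> createDemoWorker(std::shared_ptr<DemoContext> context,
                                             bool enableDelayedSubmission,
                                             bool enableFuture)
{
  // std::make_shared cannot reach a protected constructor, so use `new`.
  return std::shared_ptr<DemoWorker>(
    new DemoWorker(std::move(context), enableDelayedSubmission, enableFuture));
}
```

Routing construction through a factory means every instance is owned by a `std::shared_ptr` from the start, which is what the other methods in this reference assume when they refer to `std::shared_ptr<ucxx::Worker>`.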

Member Function Documentation

◆ amProbe()

bool ucxx::Worker::amProbe ( const ucp_ep_h  endpointHandle) const

Check for uncaught active messages.

Checks the worker for any uncaught active messages. An uncaught active message is any active message that has been fully or partially received by the worker, but not matched by a corresponding createRequestAmRecv() call.

// `worker` is `std::shared_ptr<ucxx::Worker>`
// `ep` is a remote `std::shared_ptr<ucxx::Endpoint>` to the local `worker`
assert(!worker->amProbe(ep->getHandle()));
ep->amSend(buffer, length);
assert(worker->amProbe(ep->getHandle()));
Returns
true if any uncaught messages were received, false otherwise.

◆ arm()

bool ucxx::Worker::arm ( )

Arm the UCP worker.

Wrapper for ucp_worker_arm, checking its return status for errors and raising an exception if an error occurred.

Exceptions
ucxx::Error if an error occurred while attempting to arm the worker.
Returns
true if worker was armed successfully, false if its status was UCS_ERR_BUSY.

◆ cancelInflightRequests()

size_t ucxx::Worker::cancelInflightRequests ( uint64_t  period = 0,
uint64_t  maxAttempts = 1 
)

Cancel inflight requests.

Cancel inflight requests, returning the total number of requests that were canceled. This is usually executed during the progress loop.

If the worker is running a progress thread, a maximum period may be specified for which the cancelation will wait. This can be particularly important for cases where the progress thread might be attempting to acquire a resource (e.g., the Python GIL) while the current thread owns that resource. In particular for Python, ~Worker() calls this method, and the GIL cannot be released when the garbage collector runs and destroys the object.

Parameters
[in] period maximum period to wait for a generic pre/post progress thread operation.
[in] maxAttempts maximum number of attempts to cancel the requests, only applicable if the worker is running a progress thread and period > 0.
Returns
Number of requests that were canceled.

◆ clearFuturesPool()

virtual void ucxx::Worker::clearFuturesPool ( )
virtual

Clear the futures pool.

Clear the futures pool, ensuring all references are removed and thus avoiding reference cycles that prevent the ucxx::Worker and other resources from cleaning up on time.

Exceptions
std::runtime_error if future support is not implemented.

Reimplemented in ucxx::python::Worker.

◆ createEndpointFromHostname()

std::shared_ptr<Endpoint> ucxx::Worker::createEndpointFromHostname ( std::string  ipAddress,
uint16_t  port,
bool  endpointErrorHandling = true 
)

Create endpoint to worker listening on specific IP and port.

Creates an endpoint to a remote worker listening on a specific IP address and port. The remote worker must have an active listener created with ucxx::Worker::createListener().

// `worker` is `std::shared_ptr<ucxx::Worker>`
// Create endpoint to worker listening on `10.10.10.10:12345`.
auto ep = worker->createEndpointFromHostname("10.10.10.10", 12345);
Exceptions
std::invalid_argument if the IP address or hostname is invalid.
std::bad_alloc if there was an error allocating space to handle the address.
ucxx::Error if an error occurred while attempting to create the endpoint.
Parameters
[in] ipAddress string containing the IP address of the remote worker.
[in] port port number on which the remote worker is listening.
[in] endpointErrorHandling enable endpoint error handling if true, disable otherwise.
Returns
The shared_ptr<ucxx::Endpoint> object

◆ createEndpointFromWorkerAddress()

std::shared_ptr<Endpoint> ucxx::Worker::createEndpointFromWorkerAddress ( std::shared_ptr< Address > address,
bool  endpointErrorHandling = true 
)

Create endpoint to worker located at UCX address.

Creates an endpoint to a listener-independent remote worker. The worker location is identified by its UCX address, wrapped by a std::shared_ptr<ucxx::Address> object.

// `worker` is `std::shared_ptr<ucxx::Worker>`
auto localAddress = worker->getAddress();
// pass address to remote process
// ...
// receive address from remote process
// ...
// `remoteAddress` is `std::shared_ptr<ucxx::Address>`
auto ep = worker->createEndpointFromWorkerAddress(remoteAddress);
Exceptions
ucxx::Error if an error occurred while attempting to create the endpoint.
Parameters
[in] address address of the remote UCX worker.
[in] endpointErrorHandling enable endpoint error handling if true, disable otherwise.
Returns
The shared_ptr<ucxx::Endpoint> object

◆ createListener()

std::shared_ptr<Listener> ucxx::Worker::createListener ( uint16_t  port,
ucp_listener_conn_callback_t  callback,
void *  callbackArgs 
)

Listen for remote connections on given port.

Starts a listener on the given port. The listener allows remote processes to connect to the local worker via an IP and port pair. The connection is then handled via a callback specified by the user.

Exceptions
std::bad_alloc if there was an error allocating space to handle the address.
ucxx::Error if an error occurred while attempting to create the listener or to acquire its address.
Parameters
[in] port port number on which to listen.
[in] callback callback to handle each incoming connection.
[in] callbackArgs pointer to argument to pass to the callback.
Returns
The shared_ptr<ucxx::Listener> object

◆ flush()

std::shared_ptr<Request> ucxx::Worker::flush ( const bool  enablePythonFuture = false,
RequestCallbackUserFunction  callbackFunction = nullptr,
RequestCallbackUserData  callbackData = nullptr 
)

Enqueue a flush operation.

Enqueue request to flush outstanding AMO (Atomic Memory Operation) and RMA (Remote Memory Access) operations on the worker, returning a pointer to a request object that can be later awaited and checked for errors. This is a non-blocking operation, and its status must be verified from the resulting request object to confirm the flush operation has completed successfully.

Using a Python future may be requested by specifying enablePythonFuture. If a Python future is requested, the Python application must then await this future to ensure the flush has completed. Requires UCXX Python support.

Parameters
[in] enablePythonFuture whether a Python future should be created and subsequently notified.
[in] callbackFunction user-defined callback function to call upon completion.
[in] callbackData user-defined data to pass to the callbackFunction.
Returns
Request to be subsequently checked for completion and its state.

◆ getAddress()

std::shared_ptr<Address> ucxx::Worker::getAddress ( )

Get the address of the UCX worker object.

Gets the address of the underlying UCX worker object, which can then be passed to a remote worker, allowing it to create a new endpoint to the local worker via ucxx::Worker::createEndpointFromWorkerAddress().

Exceptions
ucxx::Error if an error occurred while attempting to get the worker address.
Returns
The address of the local worker.

◆ getEpollFileDescriptor()

int ucxx::Worker::getEpollFileDescriptor ( )

Get the epoll file descriptor associated with the worker.

Get the epoll file descriptor associated with the worker when running in blocking mode. The worker only has an associated epoll file descriptor after initBlockingProgressMode() is executed.

The file descriptor is destroyed as part of the ucxx::Worker destructor, thus any reference to it shall not be used after that.

Exceptions
std::runtime_error if initBlockingProgressMode() was not executed to run the worker in blocking progress mode.
Returns
the file descriptor.

◆ getFuture()

virtual std::shared_ptr<Future> ucxx::Worker::getFuture ( )
virtual

Get a future from the pool.

Get a future from the pool. If the pool is empty, ucxx::Worker::populateFuturesPool() is called and a warning is raised, since that likely means the user is not calling the aforementioned method regularly.

Exceptions
std::runtime_error if future support is not implemented.
Returns
The shared_ptr<ucxx::python::Future> object

Reimplemented in ucxx::python::Worker.

◆ getHandle()

ucp_worker_h ucxx::Worker::getHandle ( )

Get the underlying ucp_worker_h handle.

The lifetime of the ucp_worker_h handle is managed by the ucxx::Worker object and its ownership is non-transferable. Once the ucxx::Worker is destroyed the handle is no longer valid; it is the user's responsibility to ensure the owner stays alive while the handle is in use.

// worker is `std::shared_ptr<ucxx::Worker>`
ucp_worker_h workerHandle = worker->getHandle();
Returns
The underlying ucp_worker_h handle.

◆ getInfo()

std::string ucxx::Worker::getInfo ( )

Get information about the underlying ucp_worker_h object.

Convenience wrapper for ucp_worker_print_info() to get information about the underlying UCP worker handle and return it as a string.

Returns
String containing information about the UCP worker.

◆ getProgressThreadId()

std::thread::id ucxx::Worker::getProgressThreadId ( )

Get the progress thread ID.

Get the progress thread ID, only valid if startProgressThread() was called.

Returns
the progress thread ID.

◆ initBlockingProgressMode()

void ucxx::Worker::initBlockingProgressMode ( )

Initialize blocking progress mode.

Initialize blocking progress mode, creating internal file descriptors to handle blocking progress by waiting for the UCP worker to notify the file descriptors. When use of progressWorkerEvent() is intended, this method must be called before the first call to progressWorkerEvent(). If using polling mode only via progress()/progressOnce() calls or wake-up with waitProgress(), this method should not be called.

In blocking mode, the user should call progressWorkerEvent() to block and then progress the worker as new events arrive. signal() may be called to forcefully wake progressWorkerEvent(), for example to shut down the application.

// worker is `std::shared_ptr<ucxx::Worker>`
// Block until UCX wakes up for an incoming event, then fully progress the
// worker
worker->initBlockingProgressMode();
worker->progressWorkerEvent();
// All events have been progressed.
Exceptions
std::ios_base::failure if creating any of the file descriptors or setting their statuses fails.

◆ isDelayedRequestSubmissionEnabled()

bool ucxx::Worker::isDelayedRequestSubmissionEnabled ( ) const

Inquire if worker has been created with delayed submission enabled.

Check whether the worker has been created with delayed submission enabled.

Returns
true if delayed submission is enabled, false otherwise.

◆ isFutureEnabled()

bool ucxx::Worker::isFutureEnabled ( ) const

Inquire if worker has been created with future support.

Check whether the worker has been created with future support.

Returns
true if future support is enabled, false otherwise.

◆ isProgressThreadRunning()

bool ucxx::Worker::isProgressThreadRunning ( )

Inquire if worker has a progress thread running.

Check whether the worker currently has a progress thread running.

Returns
true if a progress thread is running, false otherwise.

◆ populateFuturesPool()

virtual void ucxx::Worker::populateFuturesPool ( )
virtual

Populate the futures pool.

To avoid taking blocking resources (such as the Python GIL) for every new future required by each ucxx::Request, the ucxx::Worker maintains a pool of futures that can be acquired when a new ucxx::Request is created. Currently the pool has a maximum size of 100 objects and is refilled once it goes under 50; otherwise calling this function results in a no-op.

Exceptions
std::runtime_error if future support is not implemented.

Reimplemented in ucxx::python::Worker.
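
The refill policy described above (maximum of 100, refill only once the pool drops under 50) can be sketched without any UCXX or Python dependency. `DemoFuture` and `DemoFuturesPool` are hypothetical names used for illustration only, and the actual implementation in ucxx::python::Worker may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>

// Hypothetical stand-ins for illustration only; NOT UCXX types.
struct DemoFuture {};

class DemoFuturesPool {
 public:
  static constexpr std::size_t kMaxSize         = 100;
  static constexpr std::size_t kRefillThreshold = 50;

  // Refill up to kMaxSize, but only once the pool has dropped below
  // kRefillThreshold; otherwise this is a no-op. Returns how many were created.
  std::size_t populate()
  {
    std::lock_guard<std::mutex> lock(_mutex);
    if (_pool.size() >= kRefillThreshold) return 0;
    return refillLocked();
  }

  // Take one future, refilling on demand if the pool ran dry.
  std::shared_ptr<DemoFuture> get()
  {
    std::lock_guard<std::mutex> lock(_mutex);
    if (_pool.empty()) refillLocked();
    assert(!_pool.empty());
    auto future = _pool.front();
    _pool.pop();
    return future;
  }

  std::size_t size()
  {
    std::lock_guard<std::mutex> lock(_mutex);
    return _pool.size();
  }

 private:
  // Caller must hold _mutex.
  std::size_t refillLocked()
  {
    std::size_t created = 0;
    while (_pool.size() < kMaxSize) {
      _pool.push(std::make_shared<DemoFuture>());
      ++created;
    }
    return created;
  }

  std::mutex _mutex;
  std::queue<std::shared_ptr<DemoFuture>> _pool;
};
```

Calling `populate()` regularly from a thread that can afford to take the expensive resource keeps `get()` cheap on the request path.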

◆ progress()

bool ucxx::Worker::progress ( )

Progress the worker until all communication events are completed.

Iteratively calls progressOnce() until all communication events are completed. Additionally ensures that inflight messages pending cancelation are canceled.

// worker is `std::shared_ptr<ucxx::Worker>`
worker->progress();
// All events have been progressed and inflight requests pending cancelation were canceled.
Returns
whether any communication events have been progressed.
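
One way to read "iteratively calls progressOnce()" is the loop below. This is a sketch under the assumption that a single progress call returns true whenever it did some work; `progressUntilIdle` is a hypothetical helper, not a UCXX function, and the cancelation step mentioned above is omitted.

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper; `progressOnce` stands in for a single, non-blocking
// progress call that returns true if any work was done.
inline bool progressUntilIdle(const std::function<bool()>& progressOnce)
{
  assert(progressOnce);  // a callable must be provided
  bool progressedAny = false;
  // Keep going until one call reports that nothing was left to progress.
  while (progressOnce()) progressedAny = true;
  return progressedAny;
}
```

The return value reports whether any iteration made progress, matching the return description above.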

◆ progressOnce()

bool ucxx::Worker::progressOnce ( )

Progress the worker only once.

Wrapper for ucp_worker_progress.

// worker is `std::shared_ptr<ucxx::Worker>`
while (!worker->progressOnce()) ;
// At least one communication event has been progressed.
Returns
true if any communication was progressed, false otherwise.

◆ progressWorkerEvent()

bool ucxx::Worker::progressWorkerEvent ( const int  epollTimeout = -1)

Progress worker event while in blocking progress mode.

Blocks until a new worker event has happened and the worker notifies the file descriptor associated with it, or epollTimeout has elapsed. Requires blocking progress mode to be initialized with initBlockingProgressMode() before the first call to this method. Additionally ensures that inflight messages pending cancelation are canceled.

// worker is `std::shared_ptr<ucxx::Worker>`
// Block until UCX wakes up for an incoming event, then fully progress the
// worker
worker->initBlockingProgressMode();
worker->progressWorkerEvent();
// All events have been progressed.
Parameters
[in] epollTimeout timeout in ms when waiting for worker event, or -1 to block indefinitely.
Exceptions
std::ios_base::failure if creating any of the file descriptors or setting their statuses fails.
Returns
true if any communication was progressed, false otherwise.
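
The blocking wait with a timeout and an explicit wake-up, as described above, can be sketched on Linux with `epoll` and `eventfd`. This is an illustration of the general mechanism only: `WakeableWaiter` is a hypothetical class, it does not touch UCX, and the file descriptors UCXX creates internally may be set up differently.

```cpp
#include <cassert>
#include <cstdint>
#include <ios>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Hypothetical helper for illustration only; NOT part of UCXX.
class WakeableWaiter {
 public:
  WakeableWaiter() : _epollFd(epoll_create1(0)), _eventFd(eventfd(0, EFD_NONBLOCK))
  {
    epoll_event ev{};
    ev.events  = EPOLLIN;
    ev.data.fd = _eventFd;
    if (_epollFd < 0 || _eventFd < 0 ||
        epoll_ctl(_epollFd, EPOLL_CTL_ADD, _eventFd, &ev) != 0) {
      closeAll();
      throw std::ios_base::failure("failed to set up file descriptors");
    }
  }
  ~WakeableWaiter() { closeAll(); }
  WakeableWaiter(const WakeableWaiter&)            = delete;
  WakeableWaiter& operator=(const WakeableWaiter&) = delete;

  // Wake up a pending or future wait(); may be called from another thread.
  void signal()
  {
    const std::uint64_t one = 1;
    if (write(_eventFd, &one, sizeof(one)) != static_cast<ssize_t>(sizeof(one)))
      throw std::ios_base::failure("failed to signal event file descriptor");
  }

  // Block until signal() is called or `timeoutMs` elapses (-1 blocks
  // indefinitely). Returns true if woken by a signal, false otherwise.
  bool wait(const int timeoutMs = -1)
  {
    epoll_event ev{};
    const int ready = epoll_wait(_epollFd, &ev, 1, timeoutMs);
    if (ready <= 0) return false;  // timed out (0) or failed/interrupted (-1)
    assert(ev.data.fd == _eventFd);
    std::uint64_t counter = 0;
    // Drain the counter so that the next wait() blocks again.
    return read(_eventFd, &counter, sizeof(counter)) == static_cast<ssize_t>(sizeof(counter));
  }

 private:
  void closeAll()
  {
    if (_eventFd >= 0) close(_eventFd);
    if (_epollFd >= 0) close(_epollFd);
  }

  int _epollFd;
  int _eventFd;
};
```

In this sketch `wait()` plays the role of the blocking part of progressWorkerEvent() and `signal()` the role of ucxx::Worker::signal(); a real worker would progress UCX after waking up.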

◆ registerAmAllocator()

void ucxx::Worker::registerAmAllocator ( ucs_memory_type_t  memoryType,
AmAllocatorType  allocator 
)

Register allocator for active messages.

Register a new allocator for active messages. By default, only one allocator is defined, for host memory (UCS_MEMORY_TYPE_HOST), and it is used as a fallback when an allocator for the source's memory type is unavailable. In many circumstances relying exclusively on the host allocator is undesirable. For example, when transferring CUDA buffers the destination would always be a host buffer, preventing the use of transports such as NVLink or InfiniBand+GPUDirectRDMA. For that reason it is important that the user defines the allocators that matter for the application.

If the memoryType has already been registered, the previous allocator will be replaced by the new one. Be careful when doing this after transfers have started: there are no guarantees that inflight messages have not already been allocated with the old allocator for that type.

// context is `std::shared_ptr<ucxx::Context>`
auto worker = context->createWorker(false);
worker->registerAmAllocator(UCS_MEMORY_TYPE_CUDA, ucxx::RMMBuffer);
Parameters
[in] memoryType the memory type the allocator will be used for.
[in] allocator the allocator callable that will be used to allocate new active message buffers.
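
The lookup-with-fallback and replace-on-reregister behavior described above can be sketched as a small registry. All names here (`DemoMemoryType`, `DemoBuffer`, `DemoAllocatorRegistry`) are hypothetical; in particular `DemoMemoryType` merely stands in for `ucs_memory_type_t`, and the callable signature is an assumption rather than the actual `AmAllocatorType`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT UCXX or UCX types.
enum class DemoMemoryType { Host, Cuda };

struct DemoBuffer {
  DemoMemoryType type;
  std::vector<char> data;
};

using DemoAllocator = std::function<std::shared_ptr<DemoBuffer>(std::size_t)>;

class DemoAllocatorRegistry {
 public:
  DemoAllocatorRegistry()
  {
    // A host allocator is always available and serves as the fallback.
    _allocators[DemoMemoryType::Host] = [](std::size_t length) {
      return std::make_shared<DemoBuffer>(
        DemoBuffer{DemoMemoryType::Host, std::vector<char>(length)});
    };
  }

  // Registering an already-known type replaces the previous allocator.
  void registerAllocator(DemoMemoryType type, DemoAllocator allocator)
  {
    assert(allocator);
    _allocators[type] = std::move(allocator);
  }

  // Allocate with the allocator for `sourceType`, or fall back to host.
  std::shared_ptr<DemoBuffer> allocate(DemoMemoryType sourceType, std::size_t length) const
  {
    auto it = _allocators.find(sourceType);
    if (it == _allocators.end()) it = _allocators.find(DemoMemoryType::Host);
    assert(it != _allocators.end());
    return it->second(length);
  }

 private:
  std::map<DemoMemoryType, DemoAllocator> _allocators;
};
```

Until a CUDA allocator is registered in this sketch, a CUDA source silently lands in a host buffer, which is the situation the paragraph above warns about.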

◆ registerAmReceiverCallback()

void ucxx::Worker::registerAmReceiverCallback ( AmReceiverCallbackInfo  info,
AmReceiverCallbackType  callback 
)

Register receiver callback for active messages.

Register a new receiver callback for active messages. By default, active messages do not execute any callbacks on the receiving end unless one is specified when sending the message. If the message sender specifies a callback receiver identifier, then the remote receiver needs to have a callback registered with the same identifier to execute when the request completes. To ensure that multiple applications that do not know about each other can have coexisting callbacks where receiver identifiers may have the same value, an owner must be specified as well. The owner is a string and should be reasonably unique to prevent accidentally calling callbacks from a separate application; names like "A" or "UCX" are therefore discouraged in favor of more descriptive names such as "MyFastCommsProject". The name "ucxx" is reserved.

Because it would be impossible to predict which callback would be called if two were registered under the same owner and identifier, a registered callback cannot be changed. Calling this method again with the same owner and identifier throws std::runtime_error.

// `worker` is `std::shared_ptr<ucxx::Worker>`
auto callback = [](std::shared_ptr<ucxx::Request> req) {
  std::cout << "The UCXX request address is " << (void*)req.get() << std::endl;
};
worker->registerAmReceiverCallback({"MyFastApp", 0}, callback);
Exceptions
std::runtime_error if a callback with the same owner and identifier is already registered, or if the reserved owner name "ucxx" is specified.
Parameters
[in] info the owner name and unique identifier of the receiver callback.
[in] callback the callback to execute when the active message is received.
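
The registration rules above (unique owner and identifier pair, reserved owner name "ucxx", no replacement of an existing callback) can be sketched as follows. `DemoCallbackRegistry` and its methods are hypothetical and simplified; the real callback receives a request, which is omitted here.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration only; NOT UCXX types.
using DemoCallback = std::function<void()>;

class DemoCallbackRegistry {
 public:
  // Throws std::runtime_error for the reserved owner or a duplicate key.
  void registerCallback(const std::string& owner, std::uint64_t id, DemoCallback callback)
  {
    assert(callback);
    if (owner == "ucxx") throw std::runtime_error("owner name 'ucxx' is reserved");
    const auto result = _callbacks.emplace(std::make_pair(owner, id), std::move(callback));
    if (!result.second)
      throw std::runtime_error("callback already registered for this owner and identifier");
  }

  // Runs the matching callback, if any. Returns whether one was found.
  bool invoke(const std::string& owner, std::uint64_t id) const
  {
    const auto it = _callbacks.find(std::make_pair(owner, id));
    if (it == _callbacks.end()) return false;
    it->second();
    return true;
  }

 private:
  std::map<std::pair<std::string, std::uint64_t>, DemoCallback> _callbacks;
};
```

Keying on the pair rather than on the identifier alone is what lets two unrelated applications both use identifier 0 without colliding.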

◆ registerDelayedSubmission()

void ucxx::Worker::registerDelayedSubmission ( std::shared_ptr< Request > request,
DelayedSubmissionCallbackType  callback 
)

Register delayed request submission.

Register ucxx::Request for delayed submission. When the ucxx::Worker is created with enableDelayedSubmission=true, UCX transfer routines are not called immediately; instead, they are submitted later by the worker thread.

The purpose of this method is to offload as much work as possible to the worker thread, thus decreasing computation on the caller thread, but potentially increasing transfer latency.

Parameters
[in] request the request to which the callback belongs, ensuring it remains alive until the callback is invoked.
[in] callback the callback set to execute the UCP transfer routine during the worker thread loop.
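
The idea of queueing a submission together with a reference that keeps the request alive can be sketched as below. `DemoRequest` and `DemoDelayedSubmissionQueue` are hypothetical names; `process()` stands in for whatever the worker thread does on each iteration and is an assumption, not the UCXX implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT UCXX types.
struct DemoRequest {
  bool submitted = false;
};

using DemoSubmissionCallback = std::function<void()>;

class DemoDelayedSubmissionQueue {
  using Item = std::pair<std::shared_ptr<DemoRequest>, DemoSubmissionCallback>;

 public:
  // Called from the application thread: only enqueues, does no transfer work.
  void registerDelayedSubmission(std::shared_ptr<DemoRequest> request,
                                 DemoSubmissionCallback callback)
  {
    assert(request && callback);
    std::lock_guard<std::mutex> lock(_mutex);
    // Storing the shared_ptr keeps the request alive until the callback ran.
    _pending.emplace_back(std::move(request), std::move(callback));
  }

  // Called from the worker thread loop: runs everything queued so far.
  // Returns the number of callbacks executed.
  std::size_t process()
  {
    std::vector<Item> toRun;
    {
      std::lock_guard<std::mutex> lock(_mutex);
      toRun.swap(_pending);
    }
    for (auto& item : toRun) item.second();
    return toRun.size();
  }

 private:
  std::mutex _mutex;
  std::vector<Item> _pending;
};
```

Swapping the queue out under the lock and running the callbacks outside of it keeps the caller thread from waiting on transfer work, at the cost of the extra latency noted above.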

◆ registerGenericPost()

bool ucxx::Worker::registerGenericPost ( DelayedSubmissionCallbackType  callback,
uint64_t  period = 0 
)

Register callback to be executed in progress thread after progressing.

Register callback to be executed in the current or next iteration of the progress thread after the worker is progressed. Whether the callback runs in the current or the next iteration is not guaranteed; this depends on where the progress thread is in its current iteration when the callback is registered.

The purpose of this method is to schedule operations to be executed in the progress thread, immediately after progressing the worker completes.

If period is 0, this is a blocking call that returns only once the callback has been executed, and it always returns true. If period is a positive integer, the call waits up to that many nanoseconds for the callback to complete, returning true on success or false otherwise. However, if the callback can no longer be canceled (i.e., it has already started), this method keeps retrying and may never return if the callback never completes. Returning in that case would be unsafe, as it would allow the caller to destroy the callback and its resources, causing undefined behavior. period only applies if the worker progress thread is running; otherwise the callback is executed immediately.

Parameters
[in] callback the callback to execute after progressing the worker.
[in] period the time in nanoseconds to wait for the callback to complete.
Returns
true if the callback was successfully executed or false if timed out.

◆ registerGenericPre()

bool ucxx::Worker::registerGenericPre ( DelayedSubmissionCallbackType  callback,
uint64_t  period = 0 
)

Register callback to be executed in progress thread before progressing.

Register callback to be executed in the current or next iteration of the progress thread before the worker is progressed. Whether the callback runs in the current or the next iteration is not guaranteed; this depends on where the progress thread is in its current iteration when the callback is registered.

The purpose of this method is to schedule operations to be executed in the progress thread, such as endpoint creation and closing, so that progressing never needs to occur in the application thread when using a progress thread.

If period is 0, this is a blocking call that returns only once the callback has been executed, and it always returns true. If period is a positive integer, the call waits up to that many nanoseconds for the callback to complete, returning true on success or false otherwise. However, if the callback can no longer be canceled (i.e., it has already started), this method keeps retrying and may never return if the callback never completes. Returning in that case would be unsafe, as it would allow the caller to destroy the callback and its resources, causing undefined behavior. period only applies if the worker progress thread is running; otherwise the callback is executed immediately.

Parameters
[in] callback the callback to execute before progressing the worker.
[in] period the time in nanoseconds to wait for the callback to complete.
Returns
true if the callback was successfully executed or false if timed out.

◆ removeInflightRequest()

void ucxx::Worker::removeInflightRequest ( const Request *const  request)

Remove reference to request from internal container.

Remove the reference to a specific request from the internal container. This should be called when a request has completed and the ucxx::Worker does not need to keep track of it anymore. The raw pointer to a ucxx::Request is passed here, as opposed to the usual std::shared_ptr<ucxx::Request> used elsewhere, because the raw pointer address is used as the key to the request's reference and because this is called from the object's destructor.

Parameters
[in] request raw pointer to the request
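
Why a raw pointer is the natural key here can be seen in a small sketch: inside a destructor no `std::shared_ptr` to the object exists anymore, but `this` is still a valid lookup key. `DemoRequest` and `DemoInflightRequests` are hypothetical names, and storing `std::weak_ptr` values is a choice made for this sketch only; the container UCXX uses internally may hold its references differently.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>

// Hypothetical stand-ins for illustration only; NOT UCXX types.
class DemoInflightRequests;

class DemoRequest {
 public:
  explicit DemoRequest(DemoInflightRequests& tracker) : _tracker(tracker) {}
  ~DemoRequest();  // defined below, once the tracker type is complete

 private:
  DemoInflightRequests& _tracker;
};

class DemoInflightRequests {
 public:
  // Track a request, keyed by its address.
  void insert(const std::shared_ptr<DemoRequest>& request)
  {
    assert(request != nullptr);
    _requests[request.get()] = request;
  }

  // Stop tracking; only the address is needed, so this is safe to call
  // from the request's own destructor. Unknown addresses are ignored.
  void remove(const DemoRequest* const request) { _requests.erase(request); }

  std::size_t size() const { return _requests.size(); }

 private:
  std::map<const DemoRequest*, std::weak_ptr<DemoRequest>> _requests;
};

// The destructor cannot create a shared_ptr to itself, but `this` suffices.
inline DemoRequest::~DemoRequest() { _tracker.remove(this); }
```

Erasing by address is also idempotent in this sketch: removing a request that was never inserted, or was already removed, is a no-op.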

◆ runRequestNotifier()

virtual void ucxx::Worker::runRequestNotifier ( )
virtual

Notify futures of each completed communication request.

Notifies futures of each completed communication request of their new status. This method is intended to be used from the Notifier (such as the Python thread running it), where the thread will call waitRequestNotifier() and block until some communication is completed, and then call this method to notify all futures. If this is notifying a Python future, the thread where this method is called from must be using the same Python event loop as the thread that submitted the transfer request.

Exceptions
std::runtime_error if future support is not implemented.

Reimplemented in ucxx::python::Worker.

◆ scheduleRequestCancel()

void ucxx::Worker::scheduleRequestCancel ( TrackedRequestsPtr  trackedRequests)

Schedule cancelation of inflight requests.

Schedule inflight requests to be canceled the next time cancelInflightRequests() is executed, usually during the progress loop. This is usually called from a ucxx::Endpoint, for example when the error callback was called, signaling that inflight requests for that endpoint will not be completed successfully and should be canceled.

Parameters
[in] trackedRequests the requests tracked by a child of this class to be scheduled for cancelation.

◆ setProgressThreadStartCallback()

void ucxx::Worker::setProgressThreadStartCallback ( std::function< void(void *)>  callback,
void *  callbackArg 
)

Set callback to be executed at the progress thread start.

Sets a callback that will be executed at the beginning of the progress thread. This can be used to initialize any resources that are required to be available on the thread the worker will be progressed from, such as a CUDA context.

Parameters
[in] callback function to execute during progress thread start
[in] callbackArg argument to be passed to the callback function

◆ signal()

void ucxx::Worker::signal ( )

Signal the worker that an event happened.

Signals that an event has happened, causing either progressWorkerEvent() or waitProgress() to wake up immediately.

#include <chrono>
#include <thread>

// worker is `std::shared_ptr<ucxx::Worker>`
void progressThread() {
  // Block until UCX wakes up for an incoming event, then fully progress the
  // worker.
  worker->initBlockingProgressMode();
  worker->progressWorkerEvent();
  // Will reach this point and exit after 3 seconds
}
void otherThread() {
  // Signals the worker after 3 seconds
  std::this_thread::sleep_for(std::chrono::seconds(3));
  worker->signal();
}
void mainThread() {
  std::thread t1(progressThread);
  std::thread t2(otherThread);
  t1.join();
  t2.join();
}
Exceptions
ucxx::Error if an error occurred while attempting to signal the worker.

◆ startProgressThread()

void ucxx::Worker::startProgressThread ( const bool  pollingMode = false,
const int  epollTimeout = 1 
)

Start the progress thread.

Spawns a new thread that will take care of continuously progressing the worker. The thread can progress the worker in blocking mode, using progressWorkerEvent() only when worker events happen, or in polling mode by continuously calling progress() (incurs in high CPU utilization).

Parameters
[in]  pollingMode  use polling mode if true, or blocking mode if false.
[in]  epollTimeout  timeout in ms when waiting for a worker event, or -1 to block indefinitely; only applicable in blocking mode (pollingMode==false), where progressWorkerEvent() is used.

◆ stopProgressThread()

void ucxx::Worker::stopProgressThread ( )

Stop the progress thread.

May be called by the user at any time, and is also called by the destructor if the progress thread was ever started.

◆ stopRequestNotifierThread()

virtual void ucxx::Worker::stopRequestNotifierThread ( )
virtual

Signal the notifier to terminate.

Signals the notifier to terminate, awakening the waitRequestNotifier() blocking call.

Exceptions
std::runtime_error  if future support is not implemented.

Reimplemented in ucxx::python::Worker.

◆ tagProbe()

std::pair<bool, TagRecvInfo> ucxx::Worker::tagProbe ( const Tag  tag,
const TagMask  tagMask = TagMaskFull 
)

Check for uncaught tag messages.

Checks the worker for any uncaught tag messages. An uncaught tag message is any tag message that has been fully or partially received by the worker, but not matched by a corresponding ucp_tag_recv_* call. Additionally, returns information about the tag message.

Note that this is a non-blocking call. If it is being used to actively check for an incoming message, the worker should be progressed constantly until a valid probe is returned.

// `worker` is `std::shared_ptr<ucxx::Worker>`
auto probe = worker->tagProbe(0);
assert(!probe.first);
// `ep` is a remote `std::shared_ptr<ucxx::Endpoint>` to the local `worker`
ep->tagSend(buffer, length, 0);
probe = worker->tagProbe(0);
assert(probe.first);
assert(probe.second.tag == 0);
assert(probe.second.length == length);
Returns
pair where the first element is true if any uncaught messages were received, false otherwise, and the second element contains the information from the tag receive.

◆ tagRecv()

std::shared_ptr<Request> ucxx::Worker::tagRecv ( void *  buffer,
size_t  length,
Tag  tag,
TagMask  tagMask,
const bool  enableFuture = false,
RequestCallbackUserFunction  callbackFunction = nullptr,
RequestCallbackUserData  callbackData = nullptr 
)

Enqueue a tag receive operation.

Enqueue a tag receive operation, returning a std::shared_ptr<ucxx::Request> that can be later awaited and checked for errors. This is a non-blocking operation, and the status of the transfer must be verified from the resulting request object before the data can be consumed.

Using a future may be requested by specifying enableFuture if the worker implementation has support for it. If a future is requested, the application must then await on this future to ensure the transfer has completed.

Parameters
[in]  buffer  a raw pointer to pre-allocated memory where resulting data will be stored.
[in]  length  the size in bytes of the tag message to be received.
[in]  tag  the tag to match.
[in]  tagMask  the tag mask to use.
[in]  enableFuture  whether a future should be created and subsequently notified.
[in]  callbackFunction  user-defined callback function to call upon completion.
[in]  callbackData  user-defined data to pass to the callbackFunction.
Returns
Request to be subsequently checked for completion and its state.
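
How tag and tagMask interact can be illustrated without UCXX. UCP matches an incoming message against a posted receive by comparing only the tag bits selected by the mask. The minimal sketch below reproduces that comparison on plain 64-bit integers. The aliases `SketchTag` and `SketchTagMask`, the helpers `tagMatches()`, `highBitsMask()` and `makeTag()`, and the channel/sequence tag layout are assumptions made for illustration; UCXX defines its own Tag and TagMask types, and `kMaskFull` only assumes that a full mask has all bits set.

```cpp
#include <cassert>
#include <cstdint>

// Local aliases for this sketch only; not the UCXX Tag and TagMask types.
using SketchTag     = std::uint64_t;
using SketchTagMask = std::uint64_t;

// Assumed equivalent of a full mask: every bit takes part in matching.
constexpr SketchTagMask kMaskFull = ~SketchTagMask{0};

// True if a message sent with `senderTag` matches a receive posted with
// `tag` and `tagMask`: only the bits set in the mask are compared.
constexpr bool tagMatches(SketchTag senderTag, SketchTag tag, SketchTagMask tagMask) {
  return (senderTag & tagMask) == (tag & tagMask);
}

// Build a mask that compares only the `bits` most significant bits.
inline SketchTagMask highBitsMask(unsigned bits) {
  assert(bits <= 64);
  return bits == 0 ? SketchTagMask{0} : (kMaskFull << (64 - bits));
}

// Hypothetical tag layout: upper 32 bits identify a channel, lower 32 bits
// carry a sequence number.
constexpr SketchTag makeTag(std::uint32_t channel, std::uint32_t sequence) {
  return (static_cast<SketchTag>(channel) << 32) | sequence;
}

// With a full mask the whole tag must be identical.
static_assert(tagMatches(makeTag(7, 1), makeTag(7, 1), kMaskFull), "exact match");
static_assert(!tagMatches(makeTag(7, 2), makeTag(7, 1), kMaskFull), "sequence differs");
```

With `highBitsMask(32)` a receive posted for `makeTag(7, 1)` accepts any sequence number on channel 7, while a mask of zero accepts every message.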

◆ waitProgress()

bool ucxx::Worker::waitProgress ( )

Block until an event has happened, then progresses.

Blocks until an event has happened as part of UCX's wake-up mechanism, then progresses the worker. Additionally ensures that inflight messages pending cancelation are canceled.

// `worker` is `std::shared_ptr<ucxx::Worker>`
// Block until UCX wakes up for an incoming event, then progress the worker
worker->waitProgress();
// Keep progressing until all communication events are completed
worker->progress();
// All events have been progressed.
Exceptions
ucxx::Error  if an error occurred while attempting to arm the worker.
Returns
true if any communication was progressed, false otherwise.

◆ waitRequestNotifier()

virtual RequestNotifierWaitState ucxx::Worker::waitRequestNotifier ( uint64_t  periodNs)
virtual

Block until a request event.

Blocks until some communication is completed and a future is ready to be notified, shutdown was initiated, or a timeout occurred (only if periodNs > 0). This method is intended for use from the notifier (such as the Python thread running it), where that thread will block until one of the aforementioned events occurs.

Exceptions
std::runtime_error  if future support is not implemented.
Returns
RequestNotifierWaitState::Ready if some communication completed, RequestNotifierWaitState::Timeout if a timeout occurred, or RequestNotifierWaitState::Shutdown if shutdown was initiated.

Reimplemented in ucxx::python::Worker.
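
The three outcomes described above can be modeled with a condition variable. This is a minimal sketch using only the standard library, not the implementation used by ucxx::python::Worker: `SketchNotifier`, `SketchWaitState`, `notifyReady()`, `shutdown()` and `wait()` are hypothetical names. It assumes, following the description, that a timeout only applies when periodNs > 0 and that the wait otherwise blocks until it is notified or shut down.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical counterpart of RequestNotifierWaitState for this sketch.
enum class SketchWaitState { Ready, Timeout, Shutdown };

// Hypothetical notifier (not part of UCXX).
class SketchNotifier {
 public:
  // Called by the communication side when a request has completed.
  void notifyReady() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _ready = true;
    }
    _cv.notify_one();
  }

  // Called to terminate the waiting thread.
  void shutdown() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _shutdown = true;
    }
    _cv.notify_all();
  }

  // Block until ready, shut down, or (only if periodNs > 0) timed out.
  SketchWaitState wait(std::uint64_t periodNs) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto wake = [this]() { return _ready || _shutdown; };
    if (periodNs > 0) {
      const auto period =
        std::chrono::nanoseconds(static_cast<std::chrono::nanoseconds::rep>(periodNs));
      if (!_cv.wait_for(lock, period, wake)) return SketchWaitState::Timeout;
    } else {
      _cv.wait(lock, wake);
    }
    // Woken by notifyReady() or shutdown(): at least one flag must be set.
    assert(_ready || _shutdown);
    if (_shutdown) return SketchWaitState::Shutdown;
    _ready = false;  // consume the notification
    return SketchWaitState::Ready;
  }

 private:
  std::mutex _mutex{};
  std::condition_variable _cv{};
  bool _ready{false};
  bool _shutdown{false};
};
```

A notifier thread would typically call `wait()` in a loop, handling completed requests on `Ready`, running periodic work on `Timeout`, and leaving the loop on `Shutdown`. Giving shutdown precedence over a pending notification is a design choice of this sketch.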

Friends And Related Function Documentation

◆ createRequestAm

std::shared_ptr<RequestAm> createRequestAm ( std::shared_ptr< Endpoint >  endpoint,
const std::variant< data::AmSend, data::AmReceive >  requestData,
const bool  enablePythonFuture,
RequestCallbackUserFunction  callbackFunction,
RequestCallbackUserData  callbackData 
)
friend

The constructor for a std::shared_ptr<ucxx::RequestAm> object, creating an active message request, returning a pointer to a request object that can be later awaited and checked for errors. This is a non-blocking operation, and the status of the transfer must be verified from the resulting request object before the data can be released if this is a send operation, or consumed if this is a receive operation. Received data is available via the getRecvBuffer() method if the receive transfer request completed successfully.

Exceptions
ucxx::Error  if endpoint is not a valid std::shared_ptr<ucxx::Endpoint>.
Parameters
[in]  endpoint  the parent endpoint.
[in]  requestData  container of the specified message type, including all type-specific data.
[in]  enablePythonFuture  whether a python future should be created and subsequently notified.
[in]  callbackFunction  user-defined callback function to call upon completion.
[in]  callbackData  user-defined data to pass to the callbackFunction.
Returns
The shared_ptr<ucxx::RequestAm> object

◆ createWorker

std::shared_ptr<Worker> createWorker ( std::shared_ptr< Context >  context,
const bool  enableDelayedSubmission,
const bool  enableFuture 
)
friend

Constructor of shared_ptr<ucxx::Worker>.

The constructor for a shared_ptr<ucxx::Worker> object. The default constructor is made private to ensure all UCXX objects are shared pointers for correct lifetime management.

// context is `std::shared_ptr<ucxx::Context>`
auto worker = context->createWorker(false);
// Equivalent to line above
// auto worker = ucxx::createWorker(context, false);
Parameters
[in]  context  the context from which to create the worker.
[in]  enableDelayedSubmission  if true, each ucxx::Request will not be submitted immediately, but instead delayed to the progress thread. Requires use of the progress thread.
[in]  enableFuture  if true, notifies the future associated with each ucxx::Request, currently used only by ucxx::python::Worker.
Returns
The shared_ptr<ucxx::Worker> object
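
The private-constructor-plus-friend-factory idiom described above can be sketched in isolation. The example below is a minimal illustration using only the standard library: `SketchContext`, `SketchWorker` and `createSketchWorker()` are hypothetical names, and nothing here is meant to mirror how ucxx::Worker stores its context or other members.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical parent object (not part of UCXX).
class SketchContext {};

// Hypothetical class that can only be created through its factory function.
class SketchWorker {
 public:
  SketchWorker(const SketchWorker&)            = delete;
  SketchWorker& operator=(const SketchWorker&) = delete;

  bool isFutureEnabled() const { return _enableFuture; }
  const std::shared_ptr<SketchContext>& context() const { return _context; }

  // The factory is a friend so that it may call the private constructor.
  friend std::shared_ptr<SketchWorker> createSketchWorker(
    std::shared_ptr<SketchContext> context, bool enableFuture);

 private:
  SketchWorker(std::shared_ptr<SketchContext> context, bool enableFuture)
    : _context(std::move(context)), _enableFuture(enableFuture) {}

  std::shared_ptr<SketchContext> _context;  // keeps the parent alive
  bool _enableFuture{false};
};

std::shared_ptr<SketchWorker> createSketchWorker(std::shared_ptr<SketchContext> context,
                                                 bool enableFuture) {
  assert(context != nullptr);
  // std::make_shared cannot reach the private constructor, so `new` is used
  // from inside the friend function instead.
  return std::shared_ptr<SketchWorker>(new SketchWorker(std::move(context), enableFuture));
}

// Outside code cannot construct the object directly, on the stack or otherwise.
static_assert(!std::is_default_constructible<SketchWorker>::value, "no default constructor");
static_assert(
  !std::is_constructible<SketchWorker, std::shared_ptr<SketchContext>, bool>::value,
  "constructor is private");
```

Because every instance is owned by a shared pointer from the moment it is created, the object holding the parent's shared pointer keeps that parent alive for as long as the child exists, which is the lifetime guarantee the description refers to.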

Member Data Documentation

◆ _enableFuture

bool ucxx::Worker::_enableFuture
protected
Initial value: {false}

Boolean identifying whether the worker was created with future capability.


The documentation for this class was generated from the following file: