Component encapsulating a UCP worker.
#include <worker.h>
Public Member Functions
Worker (const Worker &)=delete
Worker & | operator= (Worker const &)=delete
Worker (Worker &&o)=delete
Worker & | operator= (Worker &&o)=delete
virtual | ~Worker ()
ucxx::Worker destructor.
ucp_worker_h | getHandle ()
Get the underlying ucp_worker_h handle.
std::string | getInfo ()
Get information about the underlying ucp_worker_h object.
void | initBlockingProgressMode ()
Initialize blocking progress mode.
int | getEpollFileDescriptor ()
Get the epoll file descriptor associated with the worker.
bool | arm ()
Arm the UCP worker.
bool | progressWorkerEvent (const int epollTimeout=-1)
Progress worker event while in blocking progress mode.
void | signal ()
Signal the worker that an event happened.
bool | waitProgress ()
Block until an event has happened, then progress.
bool | progressOnce ()
Progress the worker only once.
bool | progress ()
Progress the worker until all communication events are completed.
void | registerDelayedSubmission (std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register delayed request submission.
bool | registerGenericPre (DelayedSubmissionCallbackType callback, uint64_t period=0)
Register callback to be executed in progress thread before progressing.
bool | registerGenericPost (DelayedSubmissionCallbackType callback, uint64_t period=0)
Register callback to be executed in progress thread after progressing.
bool | isDelayedRequestSubmissionEnabled () const
Inquire if worker has been created with delayed submission enabled.
bool | isFutureEnabled () const
Inquire if worker has been created with future support.
virtual void | populateFuturesPool ()
Populate the futures pool.
virtual void | clearFuturesPool ()
Clear the futures pool.
virtual std::shared_ptr< Future > | getFuture ()
Get a future from the pool.
virtual RequestNotifierWaitState | waitRequestNotifier (uint64_t periodNs)
Block until a request event.
virtual void | runRequestNotifier ()
Notify futures of each completed communication request.
virtual void | stopRequestNotifierThread ()
Signal the notifier to terminate.
void | setProgressThreadStartCallback (std::function< void(void *)> callback, void *callbackArg)
Set callback to be executed at the progress thread start.
void | startProgressThread (const bool pollingMode=false, const int epollTimeout=1)
Start the progress thread.
void | stopProgressThread ()
Stop the progress thread.
bool | isProgressThreadRunning ()
Inquire if worker has a progress thread running.
std::thread::id | getProgressThreadId ()
Get the progress thread ID.
size_t | cancelInflightRequests (uint64_t period=0, uint64_t maxAttempts=1)
Cancel inflight requests.
void | scheduleRequestCancel (TrackedRequestsPtr trackedRequests)
Schedule cancelation of inflight requests.
void | removeInflightRequest (const Request *const request)
Remove reference to request from internal container.
std::pair< bool, TagRecvInfo > | tagProbe (const Tag tag, const TagMask tagMask=TagMaskFull)
Check for uncaught tag messages.
std::shared_ptr< Request > | tagRecv (void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enableFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag receive operation.
std::shared_ptr< Address > | getAddress ()
Get the address of the UCX worker object.
std::shared_ptr< Endpoint > | createEndpointFromHostname (std::string ipAddress, uint16_t port, bool endpointErrorHandling=true)
Create endpoint to worker listening on specific IP and port.
std::shared_ptr< Endpoint > | createEndpointFromWorkerAddress (std::shared_ptr< Address > address, bool endpointErrorHandling=true)
Create endpoint to worker located at UCX address.
std::shared_ptr< Listener > | createListener (uint16_t port, ucp_listener_conn_callback_t callback, void *callbackArgs)
Listen for remote connections on given port.
void | registerAmAllocator (ucs_memory_type_t memoryType, AmAllocatorType allocator)
Register allocator for active messages.
void | registerAmReceiverCallback (AmReceiverCallbackInfo info, AmReceiverCallbackType callback)
Register receiver callback for active messages.
bool | amProbe (const ucp_ep_h endpointHandle) const
Check for uncaught active messages.
std::shared_ptr< Request > | flush (const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a flush operation.
Public Member Functions inherited from ucxx::Component
void | setParent (std::shared_ptr< Component > parent)
Set the internal parent reference.
std::shared_ptr< Component > | getParent () const
Get the internal parent reference.
Protected Member Functions
Worker (std::shared_ptr< Context > context, const bool enableDelayedSubmission=false, const bool enableFuture=false)
Protected constructor of ucxx::Worker.
Protected Attributes | |
bool | _enableFuture
Boolean identifying whether the worker was created with future capability.
std::mutex | _futuresPoolMutex {}
Mutex to access the futures pool.
std::queue< std::shared_ptr< Future > > | _futuresPool {}
Futures pool to prevent running out of fresh futures.
std::shared_ptr< Notifier > | _notifier {nullptr}
Notifier object.
std::shared_ptr< internal::AmData > | _amData
Worker data made available to Active Messages callback.
Protected Attributes inherited from ucxx::Component
std::shared_ptr< Component > | _parent {nullptr}
A reference-counted pointer to the parent.
Friends | |
std::shared_ptr< RequestAm > | createRequestAm (std::shared_ptr< Endpoint > endpoint, const std::variant< data::AmSend, data::AmReceive > requestData, const bool enablePythonFuture, RequestCallbackUserFunction callbackFunction, RequestCallbackUserData callbackData)
std::shared_ptr< Worker > | createWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture)
Constructor of shared_ptr<ucxx::Worker>.
Component encapsulating a UCP worker.
The UCP layer provides a handle to access workers in the form of a ucp_worker_h object; this class encapsulates that object and provides methods to simplify its handling.
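explicit ucxx::Worker::Worker (std::shared_ptr< Context > context, const bool enableDelayedSubmission = false, const bool enableFuture = false) [protected]
Protected constructor of ucxx::Worker.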
This is the internal implementation of the ucxx::Worker constructor, made protected so that it is not called directly. Instead, the user should call context::createWorker() or ucxx::createWorker() (or ucxx::createPythonWorker() for the Python-enabled implementation).
[in] | context | the context from which to create the worker. |
[in] | enableDelayedSubmission | if true, each ucxx::Request will not be submitted immediately but will instead be delayed to the progress thread. Requires use of the progress thread. |
[in] | enableFuture | if true, notifies the future associated with each ucxx::Request; currently used only by ucxx::python::Worker. |
bool ucxx::Worker::amProbe (const ucp_ep_h endpointHandle) const
Check for uncaught active messages.
Checks the worker for any uncaught active messages. An uncaught active message is any active message that has been fully or partially received by the worker but not matched by a corresponding createRequestAmRecv() call.
Returns true if any uncaught messages were received, false otherwise.
bool ucxx::Worker::arm ()
Arm the UCP worker.
Wrapper for ucp_worker_arm, checking its return status for errors and raising an exception if an error occurred.
ucxx::Error | if an error occurred while attempting to arm the worker. |
Returns true if the worker was armed successfully, false if its status was UCS_ERR_BUSY.
size_t ucxx::Worker::cancelInflightRequests (uint64_t period = 0, uint64_t maxAttempts = 1)
Cancel inflight requests.
Cancel inflight requests, returning the total number of requests that were canceled. This is usually executed during the progress loop.
If the parent worker is running a progress thread, a maximum timeout may be specified for which the operation will wait. This can be particularly important when the progress thread might be attempting to acquire a resource (e.g., the Python GIL) while the current thread owns that resource. In particular for Python, ~Worker() calls this method, and the GIL cannot be released when the garbage collector runs and destroys the object.
[in] | period | maximum period to wait for a generic pre/post progress thread operation. |
[in] | maxAttempts | maximum number of attempts to close endpoint, only applicable if worker is running a progress thread and period > 0 . |
virtual void ucxx::Worker::clearFuturesPool ()
Clear the futures pool.
Clear the futures pool, ensuring all references are removed and thus avoiding reference cycles that prevent the ucxx::Worker and other resources from cleaning up on time.
std::runtime_error | if future support is not implemented. |
Reimplemented in ucxx::python::Worker.
std::shared_ptr<Endpoint> ucxx::Worker::createEndpointFromHostname (std::string ipAddress, uint16_t port, bool endpointErrorHandling = true)
Create endpoint to worker listening on specific IP and port.
Creates an endpoint to a remote worker listening on a specific IP address and port. The remote worker must have an active listener created with ucxx::Worker::createListener().
std::invalid_argument | if the IP address or hostname is invalid. |
std::bad_alloc | if there was an error allocating space to handle the address. |
ucxx::Error | if an error occurred while attempting to create the endpoint. |
[in] | ipAddress | string containing the IP address of the remote worker. |
[in] | port | port number where the remote worker is listening at. |
[in] | endpointErrorHandling | enable endpoint error handling if true, disable otherwise. |
Returns a shared_ptr<ucxx::Endpoint> object.
std::shared_ptr<Endpoint> ucxx::Worker::createEndpointFromWorkerAddress (std::shared_ptr< Address > address, bool endpointErrorHandling = true)
Create endpoint to worker located at UCX address.
Creates an endpoint to a listener-independent remote worker. The worker location is identified by its UCX address, wrapped by a std::shared_ptr<ucxx::Address> object.
ucxx::Error | if an error occurred while attempting to create the endpoint. |
[in] | address | address of the remote UCX worker. |
[in] | endpointErrorHandling | enable endpoint error handling if true, disable otherwise. |
Returns a shared_ptr<ucxx::Endpoint> object.
std::shared_ptr<Listener> ucxx::Worker::createListener (uint16_t port, ucp_listener_conn_callback_t callback, void * callbackArgs)
Listen for remote connections on given port.
Starts a listener on the given port. The listener allows remote processes to connect to the local worker via an IP and port pair. Each connection is then handled via a callback specified by the user.
std::bad_alloc | if there was an error allocating space to handle the address. |
ucxx::Error | if an error occurred while attempting to create the listener or to acquire its address. |
[in] | port | port number to listen on. |
[in] | callback | the callback to handle each incoming connection. |
[in] | callbackArgs | pointer to the argument to pass to the callback. |
Returns a shared_ptr<ucxx::Listener> object.
std::shared_ptr<Request> ucxx::Worker::flush (const bool enablePythonFuture = false, RequestCallbackUserFunction callbackFunction = nullptr, RequestCallbackUserData callbackData = nullptr)
Enqueue a flush operation.
Enqueue request to flush outstanding AMO (Atomic Memory Operation) and RMA (Remote Memory Access) operations on the worker, returning a pointer to a request object that can be later awaited and checked for errors. This is a non-blocking operation, and its status must be verified from the resulting request object to confirm the flush operation has completed successfully.
Using a Python future may be requested by specifying enablePythonFuture. If a Python future is requested, the Python application must then await on this future to ensure the flush has completed. Requires UCXX Python support.
[in] | enablePythonFuture | whether a Python future should be created and subsequently notified. |
[in] | callbackFunction | user-defined callback function to call upon completion. |
[in] | callbackData | user-defined data to pass to the callbackFunction. |
std::shared_ptr<Address> ucxx::Worker::getAddress ()
Get the address of the UCX worker object.
Gets the address of the underlying UCX worker object, which can then be passed to a remote worker, allowing it to create a new endpoint to the local worker via ucxx::Worker::createEndpointFromWorkerAddress().
ucxx::Error | if an error occurred while attempting to get the worker address. |
int ucxx::Worker::getEpollFileDescriptor ()
Get the epoll file descriptor associated with the worker.
Get the epoll file descriptor associated with the worker when running in blocking mode. The worker only has an associated epoll file descriptor after initBlockingProgressMode() is executed.
The file descriptor is destroyed as part of the ucxx::Worker destructor, thus it must not be used after that.
std::runtime_error | if initBlockingProgressMode() was not executed to run the worker in blocking progress mode. |
virtual std::shared_ptr<Future> ucxx::Worker::getFuture ()
Get a future from the pool.
Get a future from the pool. If the pool is empty, ucxx::Worker::populateFuturesPool() is called and a warning is raised, since that likely means the user is not calling the aforementioned method regularly.
std::runtime_error | if future support is not implemented. |
Returns a shared_ptr<ucxx::python::Future> object.
Reimplemented in ucxx::python::Worker.
ucp_worker_h ucxx::Worker::getHandle ()
Get the underlying ucp_worker_h handle.
The lifetime of the ucp_worker_h handle is managed by the ucxx::Worker object and its ownership is non-transferable. Once the ucxx::Worker is destroyed the handle is no longer valid; it is the user's responsibility to ensure the owner stays alive while the handle is in use.
Returns the underlying ucp_worker_h handle.
std::string ucxx::Worker::getInfo ()
Get information about the underlying ucp_worker_h object.
Convenience wrapper for ucp_worker_print_info() to get information about the underlying UCP worker handle and return it as a string.
std::thread::id ucxx::Worker::getProgressThreadId ()
Get the progress thread ID.
Get the progress thread ID, only valid if startProgressThread() was called.
void ucxx::Worker::initBlockingProgressMode ()
Initialize blocking progress mode.
Initialize blocking progress mode, creating internal file descriptors to handle blocking progress by waiting for the UCP worker to notify the file descriptors. This method should be called when progressWorkerEvent() is intended to be used, before the first call to progressWorkerEvent(). If using polling mode only via progress()/progressOnce() calls, or wake-up with waitProgress(), this method should not be called.
In blocking mode, the user should call progressWorkerEvent() to block and then progress the worker as new events arrive. signal() may be called to forcefully wake progressWorkerEvent(), for example to shut down the application.
std::ios_base::failure | if creating any of the file descriptors or setting their statuses failed. |
bool ucxx::Worker::isDelayedRequestSubmissionEnabled () const
Inquire if worker has been created with delayed submission enabled.
Check whether the worker has been created with delayed submission enabled.
Returns true if delayed submission is enabled, false otherwise.
bool ucxx::Worker::isFutureEnabled () const
Inquire if worker has been created with future support.
Check whether the worker has been created with future support.
Returns true if future support is enabled, false otherwise.
bool ucxx::Worker::isProgressThreadRunning ()
Inquire if worker has a progress thread running.
Check whether the worker currently has a progress thread running.
Returns true if a progress thread is running, false otherwise.
virtual void ucxx::Worker::populateFuturesPool ()
Populate the futures pool.
To avoid taking blocking resources (such as the Python GIL) for every new future required by each ucxx::Request, the ucxx::Worker maintains a pool of futures that can be acquired when a new ucxx::Request is created. Currently the pool has a maximum size of 100 objects and is refilled once it goes under 50 objects; otherwise, calling this function results in a no-op.
std::runtime_error | if future support is not implemented. |
Reimplemented in ucxx::python::Worker.
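The refill policy above can be modeled with the standard library alone. The following is a hypothetical sketch, not the UCXX implementation: FakeFuture, FuturesPool, populate() and get() are illustrative names, and only the thresholds (at most 100 objects, refilling once the pool drops under 50) come from the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>

// Hypothetical placeholder for the future type held by the pool.
struct FakeFuture {};

// Hypothetical model of the futures pool refill policy; not the UCXX code.
class FuturesPool {
 public:
  static constexpr std::size_t kMaxSize = 100;         // documented maximum size
  static constexpr std::size_t kRefillThreshold = 50;  // refill only below this

  // Analogous to populateFuturesPool(): a no-op unless the pool is under 50.
  void populate() {
    std::lock_guard<std::mutex> lock(_mutex);
    populateLocked();
  }

  // Analogous to getFuture(): refills, with a warning, if the pool is empty.
  std::shared_ptr<FakeFuture> get() {
    std::lock_guard<std::mutex> lock(_mutex);
    if (_pool.empty()) {
      std::cerr << "warning: futures pool was empty; populate it more often\n";
      populateLocked();
    }
    auto future = _pool.front();
    _pool.pop();
    return future;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(_mutex);
    return _pool.size();
  }

 private:
  // Caller must hold _mutex.
  void populateLocked() {
    if (_pool.size() >= kRefillThreshold) return;
    while (_pool.size() < kMaxSize) _pool.push(std::make_shared<FakeFuture>());
  }

  std::mutex _mutex;
  std::queue<std::shared_ptr<FakeFuture>> _pool;
};
```

Refilling in batches means the expensive resource (the GIL, in the Python case) is taken once per refill rather than once per request; the empty-pool branch in get() mirrors the warning that getFuture() is documented to raise.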
bool ucxx::Worker::progress ()
Progress the worker until all communication events are completed.
Iteratively calls progressOnce() until all communication events are completed. Additionally ensures inflight messages pending for cancelation are canceled.
bool ucxx::Worker::progressOnce ()
Progress the worker only once.
Wrapper for ucp_worker_progress.
Returns true if any communication was progressed, false otherwise.
bool ucxx::Worker::progressWorkerEvent (const int epollTimeout = -1)
Progress worker event while in blocking progress mode.
Blocks until a new worker event has happened and the worker notifies the file descriptor associated with it, or epollTimeout has elapsed. Requires blocking progress mode to be initialized with initBlockingProgressMode() before the first call to this method. Additionally ensures inflight messages pending for cancelation are canceled.
[in] | epollTimeout | timeout in ms when waiting for worker event, or -1 to block indefinitely. |
std::ios_base::failure | if creating any of the file descriptors or setting their statuses failed. |
Returns true if any communication was progressed, false otherwise.
void ucxx::Worker::registerAmAllocator (ucs_memory_type_t memoryType, AmAllocatorType allocator)
Register allocator for active messages.
Register a new allocator for active messages. By default, only one allocator is defined, for host memory (UCS_MEMORY_TYPE_HOST), and it is used as a fallback when an allocator for the source's memory type is unavailable. In many circumstances relying exclusively on the host allocator is undesirable: for example, when transferring CUDA buffers the destination is always going to be a host buffer, which prevents the use of transports such as NVLink or InfiniBand+GPUDirectRDMA. For that reason it is important that the user defines the allocators the application needs.
If the memoryType has already been registered, the previous allocator will be replaced by the new one. Be careful when doing this after transfers have started: there is no guarantee that inflight messages have not already been allocated with the old allocator for that type.
[in] | memoryType | the memory type the allocator will be used for. |
[in] | allocator | the allocator callable that will be used to allocate new active message buffers. |
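A minimal sketch of the lookup-with-fallback and replacement rules described above. MemoryType, AllocatedBuffer, Allocator and AmAllocators are hypothetical stand-ins for ucs_memory_type_t, the received buffer and AmAllocatorType; the sketch only records which allocator produced a buffer and allocates no real host or device memory.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>

// Hypothetical stand-in for ucs_memory_type_t.
enum class MemoryType { Host, Cuda };

// Hypothetical result: records which allocator produced the buffer, nothing more.
struct AllocatedBuffer {
  MemoryType type;
  std::size_t size;
};

// Hypothetical stand-in for AmAllocatorType.
using Allocator = std::function<AllocatedBuffer(std::size_t)>;

class AmAllocators {
 public:
  AmAllocators() {
    // By default only a host allocator exists.
    _allocators[MemoryType::Host] = [](std::size_t size) {
      return AllocatedBuffer{MemoryType::Host, size};
    };
  }

  // Registering a type that already has an allocator replaces it.
  void registerAllocator(MemoryType type, Allocator allocator) {
    _allocators[type] = std::move(allocator);
  }

  // Use the allocator for the source's memory type, falling back to host.
  AllocatedBuffer allocate(MemoryType sourceType, std::size_t size) const {
    auto it = _allocators.find(sourceType);
    if (it == _allocators.end()) it = _allocators.find(MemoryType::Host);
    return it->second(size);
  }

 private:
  std::map<MemoryType, Allocator> _allocators;
};
```

Until a CUDA allocator is registered, a CUDA source lands in a host buffer, which is exactly the situation the text warns prevents GPU-direct transports.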
void ucxx::Worker::registerAmReceiverCallback (AmReceiverCallbackInfo info, AmReceiverCallbackType callback)
Register receiver callback for active messages.
Register a new receiver callback for active messages. By default, active messages do not execute any callbacks on the receiving end unless one is specified when sending the message. If the message sender specifies a callback receiver identifier, then the remote receiver needs to have a callback registered with the same identifier to execute when the request completes. To allow multiple applications that do not know about each other to have coexisting callbacks whose receiver identifiers may have the same value, an owner must be specified as well. The owner is a string and should be reasonably unique to prevent accidentally calling callbacks from a separate application; thus names like "A" or "UCX" are discouraged in favor of more descriptive names such as "MyFastCommsProject", and the name "ucxx" is reserved.
Because it is impossible to predict which callback would be called in such an event, a registered callback cannot be changed; calling this method with an owner and identifier that are already registered throws std::runtime_error.
std::runtime_error | if a callback with same given owner and identifier is already registered, or if the reserved owner name "ucxx" is specified. |
[in] | info | the owner name and unique identifier of the receiver callback. |
[in] | callback | the callback to execute when the active message is received. |
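The uniqueness rules above amount to a registry keyed by the (owner, identifier) pair. The sketch below is hypothetical: ReceiverCallbackInfo, ReceiverCallback and ReceiverCallbacks stand in for AmReceiverCallbackInfo, AmReceiverCallbackType and the worker's internal storage, whose actual layout is not described here.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for AmReceiverCallbackInfo.
struct ReceiverCallbackInfo {
  std::string owner;
  uint64_t id;
};

// Hypothetical stand-in for AmReceiverCallbackType.
using ReceiverCallback = std::function<void()>;

class ReceiverCallbacks {
 public:
  void registerCallback(const ReceiverCallbackInfo& info, ReceiverCallback callback) {
    if (info.owner == "ucxx") throw std::runtime_error("owner name \"ucxx\" is reserved");
    // emplace never overwrites, so a second registration of the same pair is rejected.
    bool inserted =
        _callbacks.emplace(std::make_pair(info.owner, info.id), std::move(callback)).second;
    if (!inserted) throw std::runtime_error("callback already registered for this owner and id");
  }

  // Receiving side: run the callback named by the message, if one is registered.
  bool invoke(const std::string& owner, uint64_t id) const {
    auto it = _callbacks.find(std::make_pair(owner, id));
    if (it == _callbacks.end()) return false;
    it->second();
    return true;
  }

 private:
  std::map<std::pair<std::string, uint64_t>, ReceiverCallback> _callbacks;
};
```

Because the key includes the owner, two projects can both use identifier 0 without colliding, while a repeated (owner, identifier) pair is refused rather than silently replaced.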
void ucxx::Worker::registerDelayedSubmission (std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register delayed request submission.
Register ucxx::Request for delayed submission. When the ucxx::Worker is created with enableDelayedSubmission=true, the actual UCX transfer routines are not called immediately; instead, they are submitted later by the worker thread.
The purpose of this method is to offload as much work as possible to the worker thread, thus decreasing computation on the caller thread, but potentially increasing transfer latency.
[in] | request | the request to which the callback belongs, ensuring it remains alive until the callback is invoked. |
[in] | callback | the callback set to execute the UCP transfer routine during the worker thread loop. |
bool ucxx::Worker::registerGenericPost (DelayedSubmissionCallbackType callback, uint64_t period = 0)
Register callback to be executed in progress thread after progressing.
Register callback to be executed in the current or next iteration of the progress thread after the worker is progressed. There is no guarantee that the callback will be executed in the current or next iteration; this depends on where the progress thread is in the current iteration when this callback is registered.
The purpose of this method is to schedule operations to be executed in the progress thread, immediately after progressing the worker completes.
If period is 0, this is a blocking call that only returns when the callback has been executed, and it always returns true. If period is a positive integer, the method waits up to that many nanoseconds for the callback to complete, returning true in the successful case or false otherwise. However, if the callback is not cancelable anymore (i.e., it has already started), this method will keep retrying and may never return if the callback never completes, since returning would allow the caller to destroy the callback and its resources, causing undefined behavior. period only applies if the worker progress thread is running; otherwise the callback is executed immediately.
[in] | callback | the callback to execute after progressing the worker. |
[in] | period | the time in nanoseconds to wait for the callback to complete. |
Returns true if the callback was successfully executed or false if timed out.
bool ucxx::Worker::registerGenericPre (DelayedSubmissionCallbackType callback, uint64_t period = 0)
Register callback to be executed in progress thread before progressing.
Register callback to be executed in the current or next iteration of the progress thread before the worker is progressed. There is no guarantee that the callback will be executed in the current or next iteration; this depends on where the progress thread is in the current iteration when this callback is registered.
The purpose of this method is to schedule operations to be executed in the progress thread, such as endpoint creation and closing, so that progressing never needs to occur in the application thread when using a progress thread.
If period is 0, this is a blocking call that only returns when the callback has been executed, and it always returns true. If period is a positive integer, the method waits up to that many nanoseconds for the callback to complete, returning true in the successful case or false otherwise. However, if the callback is not cancelable anymore (i.e., it has already started), this method will keep retrying and may never return if the callback never completes, since returning would allow the caller to destroy the callback and its resources, causing undefined behavior. period only applies if the worker progress thread is running; otherwise the callback is executed immediately.
[in] | callback | the callback to execute before progressing the worker. |
[in] | period | the time in nanoseconds to wait for the callback to complete. |
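The period semantics described above (wait indefinitely for 0, otherwise time out only while the callback can still be canceled) can be sketched with a condition variable. This is a hypothetical model, not the UCXX implementation: ProgressLoop and its Task states are illustrative, the worker progress step itself is omitted, and the case where no progress thread is running (immediate execution) is not modeled.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical progress thread that runs "pre" callbacks; not the UCXX code.
class ProgressLoop {
 public:
  ProgressLoop() : _thread([this] { run(); }) {}

  ~ProgressLoop() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _stop = true;
    }
    _cv.notify_all();
    _thread.join();
  }

  // periodNs == 0: wait until the callback has run; always returns true.
  // periodNs > 0: on timeout, cancel and return false if the callback has not
  // started yet; if it is already running, keep waiting (unsafe to return).
  bool registerGenericPre(std::function<void()> callback, uint64_t periodNs = 0) {
    auto task = std::make_shared<Task>();
    task->fn = std::move(callback);
    std::unique_lock<std::mutex> lock(_mutex);
    _pending.push_back(task);
    _cv.notify_all();
    auto done = [&] { return task->state == State::Done; };
    while (!done()) {
      if (periodNs == 0) {
        _cv.wait(lock, done);
      } else if (!_cv.wait_for(lock, std::chrono::nanoseconds(periodNs), done) &&
                 task->state == State::Pending) {
        task->state = State::Canceled;  // still cancelable: give up
        return false;
      }
    }
    return true;
  }

 private:
  enum class State { Pending, Running, Done, Canceled };
  struct Task {
    std::function<void()> fn;
    State state{State::Pending};
  };

  void run() {
    std::unique_lock<std::mutex> lock(_mutex);
    while (true) {
      _cv.wait(lock, [this] { return _stop || !_pending.empty(); });
      if (_pending.empty()) return;  // only reached once _stop is set
      auto task = _pending.front();
      _pending.pop_front();
      if (task->state == State::Canceled) continue;
      task->state = State::Running;
      lock.unlock();
      task->fn();  // run the callback without holding the lock
      lock.lock();
      task->state = State::Done;
      _cv.notify_all();
      // A real progress thread would progress the worker at this point.
    }
  }

  std::mutex _mutex;
  std::condition_variable _cv;
  std::deque<std::shared_ptr<Task>> _pending;
  bool _stop{false};
  std::thread _thread;  // declared last so it starts after the other members exist
};
```

Cancellation is only attempted while the task is still queued; once the progress thread has marked it Running, the caller keeps waiting, matching the reasoning that returning early would let the caller destroy resources the callback is still using.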
Returns true if the callback was successfully executed or false if timed out.
void ucxx::Worker::removeInflightRequest (const Request *const request)
Remove reference to request from internal container.
Remove the reference to a specific request from the internal container. This should be called when a request has completed and the ucxx::Worker does not need to keep track of it anymore. The raw pointer to a ucxx::Request is passed here, as opposed to the std::shared_ptr<ucxx::Request> used elsewhere, because the raw pointer address is used as the key to the request reference and this method is called from the object's destructor.
[in] | request | raw pointer to the request |
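The reason for the raw-pointer key can be shown in a few lines: inside a destructor no owning std::shared_ptr to the object can be obtained any more (shared_from_this() would fail), so the raw this pointer is the only usable key. TrackedRequest and InflightTracker are hypothetical; this tracker stores std::weak_ptr values so a request can actually be destroyed while tracked, and the real container's value type may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>

class InflightTracker;

// Hypothetical request that unregisters itself when destroyed.
class TrackedRequest {
 public:
  explicit TrackedRequest(InflightTracker& tracker) : _tracker(tracker) {}
  ~TrackedRequest();

 private:
  InflightTracker& _tracker;
};

// Hypothetical container keyed by each request's raw address.
class InflightTracker {
 public:
  void add(const std::shared_ptr<TrackedRequest>& request) {
    std::lock_guard<std::mutex> lock(_mutex);
    _inflight[request.get()] = request;  // stored as a weak_ptr
  }

  // Analogous to removeInflightRequest(): only the raw pointer is needed.
  void remove(const TrackedRequest* const request) {
    std::lock_guard<std::mutex> lock(_mutex);
    _inflight.erase(request);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(_mutex);
    return _inflight.size();
  }

 private:
  std::mutex _mutex;
  std::map<const TrackedRequest*, std::weak_ptr<TrackedRequest>> _inflight;
};

// No shared_ptr to *this can be formed here, so the raw pointer is the key.
TrackedRequest::~TrackedRequest() { _tracker.remove(this); }
```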
virtual void ucxx::Worker::runRequestNotifier ()
Notify futures of each completed communication request.
Notifies futures of each completed communication request of their new status. This method is intended to be used from the Notifier (such as the Python thread running it), where the thread will call waitRequestNotifier() and block until some communication is completed, and then call this method to notify all futures. If this is notifying a Python future, the thread this method is called from must be using the same Python event loop as the thread that submitted the transfer request.
std::runtime_error | if future support is not implemented. |
Reimplemented in ucxx::python::Worker.
void ucxx::Worker::scheduleRequestCancel (TrackedRequestsPtr trackedRequests)
Schedule cancelation of inflight requests.
Schedule inflight requests to be canceled the next time cancelInflightRequests() is executed, usually during the progress loop. This is usually called from a ucxx::Endpoint, for example when the error callback was called, signaling that inflight requests for that endpoint will not be completed successfully and should be canceled.
[in] | trackedRequests | the requests tracked by a child of this class to be scheduled for cancelation. |
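The schedule-then-cancel flow described above (an endpoint hands over its tracked requests, and the progress loop later cancels them and reports a count) can be sketched as follows. CancelableRequest, TrackedRequests and CancelScheduler are hypothetical; TrackedRequestsPtr here is a local stand-in that only borrows the UCXX name for readability, and the period and maxAttempts parameters of cancelInflightRequests() are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical request that only records whether it was canceled.
struct CancelableRequest {
  bool canceled{false};
  void cancel() { canceled = true; }
};

// Hypothetical stand-ins for the requests tracked by one endpoint.
using TrackedRequests = std::vector<std::shared_ptr<CancelableRequest>>;
using TrackedRequestsPtr = std::shared_ptr<TrackedRequests>;

class CancelScheduler {
 public:
  // Analogous to scheduleRequestCancel(), e.g. from an endpoint error callback.
  void scheduleRequestCancel(TrackedRequestsPtr trackedRequests) {
    std::lock_guard<std::mutex> lock(_mutex);
    _scheduled.push_back(std::move(trackedRequests));
  }

  // Analogous to cancelInflightRequests(), run from the progress loop.
  // Returns how many requests were canceled.
  std::size_t cancelInflightRequests() {
    std::vector<TrackedRequestsPtr> batch;
    {
      std::lock_guard<std::mutex> lock(_mutex);
      batch.swap(_scheduled);  // take the batch, then cancel without the lock
    }
    std::size_t canceled = 0;
    for (const auto& tracked : batch) {
      for (const auto& request : *tracked) {
        request->cancel();
        ++canceled;
      }
    }
    return canceled;
  }

 private:
  std::mutex _mutex;
  std::vector<TrackedRequestsPtr> _scheduled;
};
```

Swapping the scheduled list out before canceling keeps the lock short, so an endpoint can schedule more work while a previous batch is still being canceled.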
void ucxx::Worker::setProgressThreadStartCallback (std::function< void(void *)> callback, void * callbackArg)
Set callback to be executed at the progress thread start.
Sets a callback that will be executed at the beginning of the progress thread. This can be used to initialize any resources that are required to be available on the thread the worker will be progressed from, such as a CUDA context.
[in] | callback | function to execute during progress thread start |
[in] | callbackArg | argument to be passed to the callback function |
void ucxx::Worker::signal ()
Signal the worker that an event happened.
Signals that an event has happened, causing either progressWorkerEvent() or waitProgress() to wake up immediately.
ucxx::Error | if an error occurred while attempting to signal the worker. |
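To illustrate how a signal can wake a blocked waiter, and how the epollTimeout values behave (a timeout in milliseconds, or -1 to block indefinitely), here is a Linux-only sketch built on eventfd and epoll. It is a hypothetical model: WakeupFd is an illustrative name, and neither UCP's own notification file descriptor nor ucp_worker_signal() is used.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical, Linux-only model of the wake-up path: an eventfd stands in
// for the file descriptor UCP notifies, watched through epoll.
class WakeupFd {
 public:
  WakeupFd() {
    _eventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    _epollFd = epoll_create1(EPOLL_CLOEXEC);
    epoll_event event{};
    event.events = EPOLLIN;
    event.data.fd = _eventFd;
    if (_eventFd < 0 || _epollFd < 0 ||
        epoll_ctl(_epollFd, EPOLL_CTL_ADD, _eventFd, &event) != 0) {
      closeAll();
      throw std::runtime_error("failed to set up eventfd/epoll");
    }
  }
  ~WakeupFd() { closeAll(); }
  WakeupFd(const WakeupFd&) = delete;
  WakeupFd& operator=(const WakeupFd&) = delete;

  // Analogous to signal(): makes the current or next wait() return at once.
  void signal() {
    uint64_t one = 1;
    if (write(_eventFd, &one, sizeof(one)) != static_cast<ssize_t>(sizeof(one)))
      throw std::runtime_error("eventfd write failed");
  }

  // Analogous to the blocking part of progressWorkerEvent(): timeoutMs == -1
  // blocks indefinitely. Returns true if woken by an event, false on timeout
  // (errors such as EINTR are also reported as false in this sketch).
  bool wait(int timeoutMs) {
    epoll_event event{};
    if (epoll_wait(_epollFd, &event, 1, timeoutMs) <= 0) return false;
    uint64_t count = 0;
    // Drain the counter so the next wait() blocks again.
    return read(_eventFd, &count, sizeof(count)) == static_cast<ssize_t>(sizeof(count));
  }

 private:
  void closeAll() {
    if (_epollFd >= 0) close(_epollFd);
    if (_eventFd >= 0) close(_eventFd);
  }

  int _eventFd{-1};
  int _epollFd{-1};
};
```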
void ucxx::Worker::startProgressThread (const bool pollingMode = false, const int epollTimeout = 1)
Start the progress thread.
Spawns a new thread that will take care of continuously progressing the worker. The thread can progress the worker in blocking mode, using progressWorkerEvent() only when worker events happen, or in polling mode by continuously calling progress() (which incurs high CPU utilization).
[in] | pollingMode | use polling mode if true, or blocking mode if false. |
[in] | epollTimeout | timeout in ms when waiting for worker event, or -1 to block indefinitely, only applicable if pollingMode==false. |
void ucxx::Worker::stopProgressThread ()
Stop the progress thread.
Stop the progress thread. May be called by the user at any time, and is also called by the destructor if the worker thread was ever started.
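A minimal sketch of the progress thread lifecycle covered by setProgressThreadStartCallback(), startProgressThread() and stopProgressThread(): the start callback runs once on the new thread with its void * argument, the loop repeats until stopped, and the destructor stops a thread that was started. ProgressThread is hypothetical; the progressOnce callable stands in for one progress() or progressWorkerEvent() iteration, so the choice between polling and blocking mode is left to the caller.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical model of the progress thread lifecycle; not the UCXX code.
class ProgressThread {
 public:
  ~ProgressThread() { stop(); }  // a started thread is stopped on destruction

  // Analogous to setProgressThreadStartCallback(); set it before start().
  void setStartCallback(std::function<void(void*)> callback, void* callbackArg) {
    _startCallback = std::move(callback);
    _startArg = callbackArg;
  }

  // Analogous to startProgressThread(); progressOnce stands in for one
  // progress() or progressWorkerEvent() iteration.
  void start(std::function<void()> progressOnce) {
    if (_thread.joinable()) return;  // already running
    _stop = false;
    _thread = std::thread([this, progressOnce = std::move(progressOnce)] {
      if (_startCallback) _startCallback(_startArg);  // e.g. make a CUDA context current
      while (!_stop.load()) progressOnce();
    });
  }

  // Analogous to stopProgressThread(); safe to call more than once.
  void stop() {
    _stop = true;
    if (_thread.joinable()) _thread.join();
  }

  bool isRunning() const { return _thread.joinable(); }
  std::thread::id id() const { return _thread.get_id(); }

 private:
  std::function<void(void*)> _startCallback;
  void* _startArg{nullptr};
  std::atomic<bool> _stop{false};
  std::thread _thread;
};
```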
virtual void ucxx::Worker::stopRequestNotifierThread ()
Signal the notifier to terminate.
Signals the notifier to terminate, awakening the waitRequestNotifier() blocking call.
std::runtime_error | if future support is not implemented. |
Reimplemented in ucxx::python::Worker.
std::pair<bool, TagRecvInfo> ucxx::Worker::tagProbe (const Tag tag, const TagMask tagMask = TagMaskFull)
Check for uncaught tag messages.
Checks the worker for any uncaught tag messages. An uncaught tag message is any tag message that has been fully or partially received by the worker, but not matched by a corresponding ucp_tag_recv_* call. Additionally, returns information about the tag message.
Note this is a non-blocking call; if it is being used to actively check for an incoming message, the worker should be continuously progressed until a valid probe is returned.
Returns a pair whose first element is true if any uncaught messages were received, false otherwise, and whose second element contains the information from the tag receive.
std::shared_ptr<Request> ucxx::Worker::tagRecv (void * buffer, size_t length, Tag tag, TagMask tagMask, const bool enableFuture = false, RequestCallbackUserFunction callbackFunction = nullptr, RequestCallbackUserData callbackData = nullptr)
Enqueue a tag receive operation.
Enqueue a tag receive operation, returning a std::shared_ptr<ucxx::Request> that can be later awaited and checked for errors. This is a non-blocking operation, and the status of the transfer must be verified from the resulting request object before the data can be consumed.
Using a future may be requested by specifying enableFuture if the worker implementation supports it. If a future is requested, the application must then await on this future to ensure the transfer has completed.
[in] | buffer | a raw pointer to pre-allocated memory where resulting data will be stored. |
[in] | length | the size in bytes of the tag message to be received. |
[in] | tag | the tag to match. |
[in] | tagMask | the tag mask to use. |
[in] | enableFuture | whether a future should be created and subsequently notified. |
[in] | callbackFunction | user-defined callback function to call upon completion. |
[in] | callbackData | user-defined data to pass to the callbackFunction. |
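The tag and tagMask parameters follow UCP's tag matching rule: a message matches when the bits selected by the mask are equal, that is (messageTag & tagMask) == (tag & tagMask). The sketch models Tag and TagMask as plain 64-bit integers (as ucp_tag_t is), which is an assumption for illustration since UCXX may wrap them in stronger types, and the tag layout in the test is hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: Tag and TagMask modeled as plain 64-bit integers.
using Tag = uint64_t;
using TagMask = uint64_t;

// Modeled as all bits set: every bit of the tag must match.
constexpr TagMask TagMaskFull = ~TagMask{0};

// UCP tag matching: only the bits selected by the mask are compared.
constexpr bool tagMatches(Tag messageTag, Tag expectedTag, TagMask tagMask) {
  return (messageTag & tagMask) == (expectedTag & tagMask);
}
```

For example, with the upper 32 bits encoding a message kind and the lower 32 bits a sender rank (a hypothetical layout), the mask 0xFFFFFFFF00000000 receives a given kind from any sender, while TagMaskFull requires an exact match and a mask of 0 matches every tag.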
bool ucxx::Worker::waitProgress ()
Block until an event has happened, then progress.
Blocks until an event has happened as part of UCX's wake-up mechanism, then progresses the worker. Additionally ensures inflight messages pending for cancelation are canceled.
ucxx::Error | if an error occurred while attempting to arm the worker. |
Returns true if any communication was progressed, false otherwise.
virtual RequestNotifierWaitState ucxx::Worker::waitRequestNotifier (uint64_t periodNs)
Block until a request event.
Blocks until some communication is completed and a future is ready to be notified, shutdown was initiated, or a timeout occurred (only if periodNs > 0). This method is intended for use from the notifier (such as the Python thread running it), where that thread will block until one of the aforementioned events occurs.
std::runtime_error | if future support is not implemented. |
Returns RequestNotifierWaitState::Ready if some communication completed, RequestNotifierWaitState::Timeout if a timeout occurred, or RequestNotifierWaitState::Shutdown if shutdown was initiated.
Reimplemented in ucxx::python::Worker.
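The three outcomes above can be sketched with a condition variable. NotifierModel is hypothetical and not the UCXX Notifier; the enum below only mirrors the documented RequestNotifierWaitState values, a periodNs of 0 is taken to mean waiting without a timeout (consistent with timeouts only occurring when periodNs > 0), and giving shutdown precedence over a pending completion is a choice of this sketch.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Mirrors the documented RequestNotifierWaitState values.
enum class RequestNotifierWaitState { Ready, Timeout, Shutdown };

// Hypothetical model of the notifier wait; not the UCXX implementation.
class NotifierModel {
 public:
  // Called when a request completes and its future must be notified.
  void notifyCompletion() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _ready = true;
    }
    _cv.notify_all();
  }

  // Analogous to stopRequestNotifierThread().
  void stop() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _shutdown = true;
    }
    _cv.notify_all();
  }

  // Analogous to waitRequestNotifier(): periodNs == 0 waits without timeout.
  RequestNotifierWaitState wait(uint64_t periodNs) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto wake = [this] { return _ready || _shutdown; };
    if (periodNs == 0) {
      _cv.wait(lock, wake);
    } else if (!_cv.wait_for(lock, std::chrono::nanoseconds(periodNs), wake)) {
      return RequestNotifierWaitState::Timeout;
    }
    if (_shutdown) return RequestNotifierWaitState::Shutdown;  // shutdown wins here
    _ready = false;  // the caller would now run runRequestNotifier()
    return RequestNotifierWaitState::Ready;
  }

 private:
  std::mutex _mutex;
  std::condition_variable _cv;
  bool _ready{false};
  bool _shutdown{false};
};
```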
friend std::shared_ptr<RequestAm> createRequestAm (std::shared_ptr< Endpoint > endpoint, const std::variant< data::AmSend, data::AmReceive > requestData, const bool enablePythonFuture, RequestCallbackUserFunction callbackFunction, RequestCallbackUserData callbackData)
The constructor for a std::shared_ptr<ucxx::RequestAm> object, creating an active message request and returning a pointer to a request object that can be later awaited and checked for errors. This is a non-blocking operation, and the status of the transfer must be verified from the resulting request object before the data can be released (for a send operation) or consumed (for a receive operation). Received data is available via the getRecvBuffer() method if the receive transfer request completed successfully.
ucxx::Error | if endpoint is not a valid std::shared_ptr<ucxx::Endpoint> . |
[in] | endpoint | the parent endpoint. |
[in] | requestData | container of the specified message type, including all type-specific data. |
[in] | enablePythonFuture | whether a Python future should be created and subsequently notified. |
[in] | callbackFunction | user-defined callback function to call upon completion. |
[in] | callbackData | user-defined data to pass to the callbackFunction. |
Returns a shared_ptr<ucxx::RequestAm> object.
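friend std::shared_ptr<Worker> createWorker (std::shared_ptr< Context > context, const bool enableDelayedSubmission, const bool enableFuture)
Constructor of shared_ptr<ucxx::Worker>.
The constructor for a shared_ptr<ucxx::Worker> object. The class constructor is made protected to ensure all UCXX objects are shared pointers for correct lifetime management.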
[in] | context | the context from which to create the worker. |
[in] | enableDelayedSubmission | if true, each ucxx::Request will not be submitted immediately but will instead be delayed to the progress thread. Requires use of the progress thread. |
[in] | enableFuture | if true, notifies the future associated with each ucxx::Request; currently used only by ucxx::python::Worker. |
Returns a shared_ptr<ucxx::Worker> object.
bool ucxx::Worker::_enableFuture [protected]
Boolean identifying whether the worker was created with future capability.