Send or receive multiple messages with the UCX Tag API. More...
#include <request_tag_multi.h>
Public Member Functions | |
virtual | ~RequestTagMulti () |
ucxx::RequestTagMulti destructor. More... | |
void | markCompleted (ucs_status_t status) |
Mark request as completed. More... | |
void | recvCallback (ucs_status_t status) |
Callback to submit request to receive new header or frames. More... | |
void | populateDelayedSubmission () override |
Populate the internal submission dispatcher. More... | |
void | cancel () override |
Cancel the request. More... | |
Public Member Functions inherited from ucxx::Request | |
Request (const Request &)=delete | |
Request & | operator= (Request const &)=delete |
Request (Request &&o)=delete | |
Request & | operator= (Request &&o)=delete |
virtual | ~Request () |
ucxx::Request destructor. More... | |
ucs_status_t | getStatus () |
Return the status of the request. More... | |
void * | getFuture () |
Return the future used to check on state. More... | |
void | checkError () |
Check whether the request completed with an error. More... | |
bool | isCompleted () |
Check whether the request has already completed. More... | |
void | callback (void *request, ucs_status_t status) |
Callback executed by UCX when request is completed. More... | |
const std::string & | getOwnerString () const |
Get formatted string with owner type and handle address. More... | |
virtual std::shared_ptr< Buffer > | getRecvBuffer () |
Get the received buffer. More... | |
Public Member Functions inherited from ucxx::Component | |
void | setParent (std::shared_ptr< Component > parent) |
Set the internal parent reference. More... | |
std::shared_ptr< Component > | getParent () const |
Get the internal parent reference. More... | |
Public Attributes | |
std::vector< BufferRequestPtr > | _bufferRequests {} |
Container of all requests posted. | |
bool | _isFilled {false} |
Whether all requests have been posted. | |
Friends | |
std::shared_ptr< RequestTagMulti > | createRequestTagMulti (std::shared_ptr< Endpoint > endpoint, const std::variant< data::TagMultiSend, data::TagMultiReceive > requestData, const bool enablePythonFuture) |
Enqueue a multi-buffer tag send or receive operation. More... | |
Additional Inherited Members | |
Protected Member Functions inherited from ucxx::Request | |
Request (std::shared_ptr< Component > endpointOrWorker, const data::RequestData requestData, std::string operationName, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr) | |
Protected constructor of an abstract ucxx::Request . More... | |
void | process () |
Perform initial processing of the request to determine if immediate completion. More... | |
void | setStatus (ucs_status_t status) |
Set the request status and notify Python future. More... | |
Protected Attributes inherited from ucxx::Request | |
ucs_status_t | _status {UCS_INPROGRESS} |
Request status. | |
std::string | _status_msg {} |
Human-readable status message. | |
void * | _request {nullptr} |
Pointer to UCP request. | |
std::shared_ptr< Future > | _future {nullptr} |
Future to notify upon completion. | |
std::shared_ptr< Worker > | _worker |
Worker that generated request (if not from endpoint) More... | |
std::shared_ptr< Endpoint > | _endpoint |
Endpoint that generated request (if not from worker) More... | |
std::string | _ownerString |
String to print owner (endpoint or worker) when logging. More... | |
std::recursive_mutex | _mutex {} |
Mutex to prevent checking status while it's being set. | |
data::RequestData | _requestData {} |
The operation-specific data to be used in the request. | |
std::string | _operationName |
Human-readable operation name, mostly used for log messages. More... | |
bool | _enablePythonFuture {true} |
Whether Python future is enabled for this request. | |
RequestCallbackUserFunction | _callback {nullptr} |
Completion callback. | |
RequestCallbackUserData | _callbackData {nullptr} |
Completion callback data. | |
Protected Attributes inherited from ucxx::Component | |
std::shared_ptr< Component > | _parent {nullptr} |
A reference-counted pointer to the parent. | |
Send or receive multiple messages with the UCX Tag API.
This is done by combining multiple ucxx::RequestTag messages: a header is sent/received first, followed by the user messages. Intended primarily for use with Python, so that the program only needs to wait for the completion of a single future, reducing potentially expensive iterations over multiple futures.
virtual ucxx::RequestTagMulti::~RequestTagMulti ()
ucxx::RequestTagMulti destructor.
Free internal resources.
void ucxx::RequestTagMulti::cancel () override
Cancel the request.
Often called by the error handler or by the parent object's destructor, but may also be called by the user.
Reimplemented from ucxx::Request.
void ucxx::RequestTagMulti::markCompleted (ucs_status_t status)
Mark request as completed.
Mark a single ucxx::RequestTag as completed. This method is passed as the user-defined callback to the ucxx::RequestTag constructor and is executed when that request completes.
When this method is called, the completed request is pushed into a container that is later used to evaluate whether all frames have completed and to set the final status of the multi-transfer request and of the Python future, if enabled. The final status is UCS_OK if all underlying requests completed successfully; otherwise it is the status of the first failing request. For more granular information, the user may still check each underlying request individually.
[in] | status | the status of the request being completed. |
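The status rule described above can be illustrated with a minimal, self-contained sketch. It does not use the UCXX or UCX headers: `Status` is a hypothetical stand-in for `ucs_status_t`, and `aggregateStatus` is an illustrative helper, not a UCXX function. It assumes "first failing" means first in the order in which requests were pushed into the container.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for ucs_status_t; only the values needed here.
enum class Status { Ok, InProgress, ErrCanceled, ErrMessageTruncated };

// Illustrative helper (not part of UCXX): derive the final status of a
// multi-transfer from the statuses of the requests completed so far.
// - Still in progress while fewer than `expected` requests have completed.
// - Ok if every request completed successfully.
// - Otherwise, the status of the first failing request in the container.
inline Status aggregateStatus(const std::vector<Status>& completed, std::size_t expected)
{
  if (completed.size() < expected) return Status::InProgress;
  for (Status s : completed) {
    if (s != Status::Ok) return s;
  }
  return Status::Ok;
}
```

With three completed requests whose statuses are `Ok`, `ErrCanceled` and `ErrMessageTruncated`, this sketch reports `ErrCanceled`; the later failure remains visible only by inspecting the individual requests.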
void ucxx::RequestTagMulti::populateDelayedSubmission () override
Populate the internal submission dispatcher.
The ucxx::Request utilizes ucxx::DelayedSubmission to manage when the request will be dispatched. This method is registered as a callback in the worker, which may either execute (submit) it immediately or delay it until the next iteration of its progress loop, depending on the progress mode in use by the worker.
See ucxx::DelayedSubmission::DelayedSubmission() for more details.
Implements ucxx::Request.
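The immediate-versus-delayed behavior described above can be sketched as follows. This is only a conceptual model: `MiniWorker`, `registerSubmission` and `progress` are hypothetical names invented for illustration and do not reflect the actual ucxx::Worker or ucxx::DelayedSubmission interfaces.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Hypothetical worker model (not the ucxx::Worker API). Depending on its
// mode, a registered submission callback either runs right away or is
// queued until the next iteration of the progress loop.
class MiniWorker {
 public:
  explicit MiniWorker(bool submitImmediately) : _submitImmediately(submitImmediately) {}

  // Register a submission callback: execute now or defer it.
  void registerSubmission(std::function<void()> callback)
  {
    if (_submitImmediately) {
      callback();
    } else {
      _pending.push(std::move(callback));
    }
  }

  // One iteration of the progress loop: run every deferred callback.
  void progress()
  {
    while (!_pending.empty()) {
      auto callback = std::move(_pending.front());
      _pending.pop();
      callback();
    }
  }

 private:
  bool _submitImmediately;
  std::queue<std::function<void()>> _pending;
};
```

In this model a request would pass its submission routine as the callback, so the same request code works regardless of which mode the worker uses.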
void ucxx::RequestTagMulti::recvCallback (ucs_status_t status)
Callback to submit request to receive new header or frames.
When a receive multi-transfer tag request is created or has received a new header, this callback must be executed to ensure the next receive request is submitted.
If no requests for the present ucxx::RequestTagMulti transfer have been posted yet, create one to receive a message containing a header. If the previously received request is a header with the next flag set, the next request is another header. Otherwise, the next incoming messages are frames.
When called, the callback receives a single argument, the status of the current request.
[in] | status | the status of the request being completed. |
std::runtime_error | if called by a send request. |
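The decision made by this callback can be summarized in a small sketch. All names here (`NextReceive`, `HeaderInfo`, `decideNextReceive`) are hypothetical and chosen for illustration; the real header layout and internal state of ucxx::RequestTagMulti are not shown on this page and are not assumed.

```cpp
#include <optional>
#include <stdexcept>

// What the receiver should post next.
enum class NextReceive { Header, Frames };

// Hypothetical view of a received header: only the `next` flag matters here.
struct HeaderInfo {
  bool next;  // true if another header follows this one
};

// Illustrative helper (not part of UCXX) mirroring the rules above:
// - a send request must never reach this point;
// - with no header received yet, the first receive is a header;
// - a header with `next` set is followed by another header;
// - otherwise the data frames follow.
inline NextReceive decideNextReceive(bool isSend, const std::optional<HeaderInfo>& lastHeader)
{
  if (isSend) throw std::runtime_error("recvCallback called by a send request");
  if (!lastHeader.has_value()) return NextReceive::Header;
  return lastHeader->next ? NextReceive::Header : NextReceive::Frames;
}
```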
std::shared_ptr< RequestTagMulti > createRequestTagMulti (std::shared_ptr< Endpoint > endpoint, const std::variant< data::TagMultiSend, data::TagMultiReceive > requestData, const bool enablePythonFuture) friend
Enqueue a multi-buffer tag send or receive operation.
Initiate a multi-buffer tag operation, returning a std::shared_ptr<ucxx::RequestTagMulti> that can later be awaited and checked for errors.
This is a non-blocking operation, and the status of a send transfer must be verified from the resulting request object before the data can be released. For a receive transfer, the receiver has no a priori knowledge of the data being received, so memory allocations are handled internally. The receiver must have the same capabilities as the sender: if the sender is compiled with RMM support to allow for CUDA transfers, the receiver must be able to understand and allocate CUDA memory.
The primary use of multi-buffer transfers is in Python, where reducing the number of futures to watch reduces Python overhead. However, this may also be used as a convenience implementation for transfers that require multiple frames. Internally, it is implemented as one or more ucxx::RequestTag calls sending headers (depending on the number of frames being transferred), followed by one ucxx::RequestTag for each data frame.
Using a Python future may be requested by specifying enablePythonFuture. If a Python future is requested, the Python application must then await this future to ensure the transfer has completed. Requires UCXX to be compiled with UCXX_ENABLE_PYTHON=1.
std::runtime_error | if sizes of buffer, size and isCUDA do not match. |
[in] | endpoint | the std::shared_ptr<Endpoint> parent component |
[in] | requestData | container of the specified message type, including all type-specific data. |
[in] | enablePythonFuture | whether a python future should be created and subsequently notified. |
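The std::runtime_error condition above suggests that a send is described by three parallel sequences, one entry per frame. The sketch below shows that kind of consistency check in isolation. `MultiSendArgs` and `validateFrameCount` are hypothetical names, and the element types are assumptions; the actual members of data::TagMultiSend are not documented on this page.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical description of a multi-buffer send: three parallel vectors
// with one entry per frame. Field names follow the exception text above;
// the element types are assumptions made for this sketch.
struct MultiSendArgs {
  std::vector<const void*> buffer;  // pointer to each frame's data
  std::vector<std::size_t> size;    // size in bytes of each frame
  std::vector<int> isCUDA;          // non-zero if the frame is CUDA memory
};

// Illustrative check (not part of UCXX): all three vectors must describe
// the same number of frames. Returns that number, or throws on mismatch.
inline std::size_t validateFrameCount(const MultiSendArgs& args)
{
  if (args.buffer.size() != args.size.size() || args.size.size() != args.isCUDA.size()) {
    throw std::runtime_error("Sizes of buffer, size and isCUDA do not match");
  }
  return args.buffer.size();
}
```

Validating before any message is posted means a malformed request fails as a whole rather than after some frames have already been sent.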