13 #include <ucp/api/ucp.h>
15 #include <ucxx/address.h>
16 #include <ucxx/component.h>
17 #include <ucxx/exception.h>
18 #include <ucxx/inflight_requests.h>
19 #include <ucxx/listener.h>
20 #include <ucxx/request.h>
21 #include <ucxx/typedefs.h>
22 #include <ucxx/utils/sockaddr.h>
23 #include <ucxx/worker.h>
52 ucp_ep_h _handle{
nullptr};
53 ucp_ep_h _originalHandle{
nullptr};
55 bool _endpointErrorHandling{
true};
56 std::unique_ptr<InflightRequests> _inflightRequests{
57 std::make_unique<InflightRequests>()};
61 ucs_status_t _status{UCS_INPROGRESS};
62 std::atomic<bool> _closing{
false};
91 Endpoint(std::shared_ptr<Component> workerOrListener,
bool endpointErrorHandling);
102 void create(ucp_ep_params_t* params);
115 [[nodiscard]] std::shared_ptr<Request> registerInflightRequest(std::shared_ptr<Request> request);
161 std::string ipAddress,
163 bool endpointErrorHandling);
188 ucp_conn_request_h connRequest,
189 bool endpointErrorHandling);
211 std::shared_ptr<Address> address,
212 bool endpointErrorHandling);
366 [[nodiscard]] std::shared_ptr<Request>
amSend(
369 const ucs_memory_type_t memoryType,
370 const std::optional<AmReceiverCallbackInfo> receiverCallbackInfo = std::nullopt,
371 const bool enablePythonFuture =
false,
396 [[nodiscard]] std::shared_ptr<Request>
amRecv(
397 const bool enablePythonFuture =
false,
423 [[nodiscard]] std::shared_ptr<Request>
memPut(
426 uint64_t remote_addr,
428 const bool enablePythonFuture =
false,
456 [[nodiscard]] std::shared_ptr<Request>
memPut(
459 std::shared_ptr<ucxx::RemoteKey> remoteKey,
460 uint64_t remoteAddrOffset = 0,
461 const bool enablePythonFuture =
false,
487 [[nodiscard]] std::shared_ptr<Request>
memGet(
492 const bool enablePythonFuture =
false,
520 [[nodiscard]] std::shared_ptr<Request>
memGet(
523 std::shared_ptr<ucxx::RemoteKey> remoteKey,
524 uint64_t remoteAddrOffset = 0,
525 const bool enablePythonFuture =
false,
548 [[nodiscard]] std::shared_ptr<Request>
streamSend(
void* buffer,
550 const bool enablePythonFuture);
572 [[nodiscard]] std::shared_ptr<Request>
streamRecv(
void* buffer,
574 const bool enablePythonFuture);
598 [[nodiscard]] std::shared_ptr<Request>
tagSend(
602 const bool enablePythonFuture =
false,
630 [[nodiscard]] std::shared_ptr<Request>
tagRecv(
635 const bool enablePythonFuture =
false,
672 [[nodiscard]] std::shared_ptr<Request>
tagMultiSend(
const std::vector<void*>& buffer,
673 const std::vector<size_t>& size,
674 const std::vector<int>& isCUDA,
676 const bool enablePythonFuture);
702 const bool enablePythonFuture);
725 [[nodiscard]] std::shared_ptr<Request>
flush(
726 const bool enablePythonFuture =
false,
782 [[nodiscard]] std::shared_ptr<Request>
close(
783 const bool enablePythonFuture =
false,
A UCXX component class to prevent early destruction of the parent object.
Definition: component.h:17
Component encapsulating a UCP endpoint.
Definition: endpoint.h:50
std::shared_ptr< Request > memPut(void *buffer, size_t length, uint64_t remote_addr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
std::shared_ptr< Request > flush(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a flush operation.
std::shared_ptr< Request > close(const bool enablePythonFuture=false, EndpointCloseCallbackUserFunction callbackFunction=nullptr, EndpointCloseCallbackUserData callbackData=nullptr)
Enqueue a non-blocking endpoint close operation.
bool isAlive() const
Check whether the endpoint is still alive.
size_t cancelInflightRequests()
Cancel inflight requests.
std::shared_ptr< Request > tagSend(void *buffer, size_t length, Tag tag, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag send operation.
std::shared_ptr< Request > streamRecv(void *buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream receive operation.
void removeInflightRequest(const Request *const request)
Remove reference to request from internal container.
size_t cancelInflightRequestsBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Cancel inflight requests.
std::shared_ptr< Request > tagMultiRecv(const Tag tag, const TagMask tagMask, const bool enablePythonFuture)
Enqueue a multi-buffer tag receive operation.
std::shared_ptr< Request > amRecv(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message receive operation.
void setCloseCallback(EndpointCloseCallbackUserFunction closeCallback, EndpointCloseCallbackUserData closeCallbackArg)
Register a user-defined callback to call when endpoint closes.
friend std::shared_ptr< Endpoint > createEndpointFromWorkerAddress(std::shared_ptr< Worker > worker, std::shared_ptr< Address > address, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
std::shared_ptr< Request > memGet(void *buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Worker > getWorker()
Get ucxx::Worker component from a worker or listener object.
std::shared_ptr< Request > tagMultiSend(const std::vector< void * > &buffer, const std::vector< size_t > &size, const std::vector< int > &isCUDA, const Tag tag, const bool enablePythonFuture)
Enqueue a multi-buffer tag send operation.
std::shared_ptr< Request > streamSend(void *buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream send operation.
friend void endpointErrorCallback(void *arg, ucp_ep_h ep, ucs_status_t status)
The error callback registered at endpoint creation time.
ucp_ep_h getHandle()
Get the underlying ucp_ep_h handle.
std::shared_ptr< Request > amSend(void *buffer, const size_t length, const ucs_memory_type_t memoryType, const std::optional< AmReceiverCallbackInfo > receiverCallbackInfo=std::nullopt, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message send operation.
size_t getCancelingSize() const
Check the number of inflight requests being canceled.
void raiseOnError()
Raises an exception if an error occurred.
std::shared_ptr< Request > memGet(void *buffer, size_t length, uint64_t remoteAddr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Request > tagRecv(void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag receive operation.
friend std::shared_ptr< Endpoint > createEndpointFromHostname(std::shared_ptr< Worker > worker, std::string ipAddress, uint16_t port, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
std::shared_ptr< Request > memPut(void *buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
friend std::shared_ptr< Endpoint > createEndpointFromConnRequest(std::shared_ptr< Listener > listener, ucp_conn_request_h connRequest, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
void closeBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Close the endpoint while keeping the object alive.
Base type for a UCXX transfer request.
Definition: request.h:38
std::function< void(ucs_status_t, std::shared_ptr< void >)> RequestCallbackUserFunction
A user-defined function to execute as part of a ucxx::Request callback.
Definition: typedefs.h:103
std::shared_ptr< void > RequestCallbackUserData
Data for the user-defined function provided to the ucxx::Request callback.
Definition: typedefs.h:111
RequestCallbackUserData EndpointCloseCallbackUserData
Data for the user-defined function provided to endpoint close callback.
Definition: typedefs.h:127
RequestCallbackUserFunction EndpointCloseCallbackUserFunction
A user-defined function to execute after an endpoint closes.
Definition: typedefs.h:119
TagMask
Strong type for a UCP tag mask.
Definition: typedefs.h:66
Tag
Strong type for a UCP tag.
Definition: typedefs.h:58
Deleter for an endpoint parameters object.
Definition: endpoint.h:33
void operator()(ucp_ep_params_t *ptr)
Execute the deletion.