endpoint.h
1 
5 #pragma once
6 
7 #include <netdb.h>
8 
9 #include <memory>
10 #include <string>
11 #include <vector>
12 
13 #include <ucp/api/ucp.h>
14 
15 #include <ucxx/address.h>
16 #include <ucxx/component.h>
17 #include <ucxx/exception.h>
18 #include <ucxx/inflight_requests.h>
19 #include <ucxx/listener.h>
20 #include <ucxx/request.h>
21 #include <ucxx/typedefs.h>
22 #include <ucxx/utils/sockaddr.h>
23 #include <ucxx/worker.h>
24 
25 namespace ucxx {
26 
41  void operator()(ucp_ep_params_t* ptr);
42 };
43 
50 class Endpoint : public Component {
51  private:
52  ucp_ep_h _handle{nullptr};
53  ucp_ep_h _originalHandle{nullptr};
55  bool _endpointErrorHandling{true};
56  std::unique_ptr<InflightRequests> _inflightRequests{
57  std::make_unique<InflightRequests>()};
58  std::mutex _mutex{};
61  ucs_status_t _status{UCS_INPROGRESS};
62  std::atomic<bool> _closing{false};
63  EndpointCloseCallbackUserFunction _closeCallback{nullptr};
64  EndpointCloseCallbackUserData _closeCallbackArg{
65  nullptr};
66 
91  Endpoint(std::shared_ptr<Component> workerOrListener, bool endpointErrorHandling);
92 
102  void create(ucp_ep_params_t* params);
103 
115  [[nodiscard]] std::shared_ptr<Request> registerInflightRequest(std::shared_ptr<Request> request);
116 
127  friend void endpointErrorCallback(void* arg, ucp_ep_h ep, ucs_status_t status);
128 
129  public:
130  Endpoint() = delete;
131  Endpoint(const Endpoint&) = delete;
132  Endpoint& operator=(Endpoint const&) = delete;
133  Endpoint(Endpoint&& o) = delete;
134  Endpoint& operator=(Endpoint&& o) = delete;
135 
136  ~Endpoint();
137 
160  friend std::shared_ptr<Endpoint> createEndpointFromHostname(std::shared_ptr<Worker> worker,
161  std::string ipAddress,
162  uint16_t port,
163  bool endpointErrorHandling);
164 
187  friend std::shared_ptr<Endpoint> createEndpointFromConnRequest(std::shared_ptr<Listener> listener,
188  ucp_conn_request_h connRequest,
189  bool endpointErrorHandling);
190 
210  friend std::shared_ptr<Endpoint> createEndpointFromWorkerAddress(std::shared_ptr<Worker> worker,
211  std::shared_ptr<Address> address,
212  bool endpointErrorHandling);
213 
229  [[nodiscard]] ucp_ep_h getHandle();
230 
242  [[nodiscard]] bool isAlive() const;
243 
253  void raiseOnError();
254 
264  void removeInflightRequest(std::shared_ptr<Request> request);
265 
277 
287  [[nodiscard]] size_t getCancelingSize() const;
288 
310  size_t cancelInflightRequestsBlocking(uint64_t period = 0, uint64_t maxAttempts = 1);
311 
330  EndpointCloseCallbackUserData closeCallbackArg);
331 
370  [[nodiscard]] std::shared_ptr<Request> amSend(
371  const void* const buffer,
372  const size_t length,
373  const ucs_memory_type_t memoryType,
374  const std::optional<AmReceiverCallbackInfo> receiverCallbackInfo = std::nullopt,
375  const bool enablePythonFuture = false,
376  RequestCallbackUserFunction callbackFunction = nullptr,
377  RequestCallbackUserData callbackData = nullptr);
378 
395  [[nodiscard]] std::shared_ptr<Request> amSend(
396  const void* const buffer,
397  const size_t length,
398  const AmSendParams& params,
399  const bool enablePythonFuture = false,
400  RequestCallbackUserFunction callbackFunction = nullptr,
401  RequestCallbackUserData callbackData = nullptr);
402 
418  [[nodiscard]] std::shared_ptr<Request> amSend(
419  std::vector<ucp_dt_iov_t> iov,
420  const AmSendParams& params,
421  const bool enablePythonFuture = false,
422  RequestCallbackUserFunction callbackFunction = nullptr,
423  RequestCallbackUserData callbackData = nullptr);
424 
453  [[nodiscard]] std::shared_ptr<Request> amRecv(
454  const bool enablePythonFuture = false,
455  RequestCallbackUserFunction callbackFunction = nullptr,
456  RequestCallbackUserData callbackData = nullptr);
457 
489  [[nodiscard]] std::shared_ptr<Request> memPut(
490  const void* const buffer,
491  size_t length,
492  uint64_t remoteAddr,
493  ucp_rkey_h rkey,
494  const bool enablePythonFuture = false,
495  RequestCallbackUserFunction callbackFunction = nullptr,
496  RequestCallbackUserData callbackData = nullptr);
497 
531  [[nodiscard]] std::shared_ptr<Request> memPut(
532  const void* const buffer,
533  size_t length,
534  std::shared_ptr<ucxx::RemoteKey> remoteKey,
535  uint64_t remoteAddrOffset = 0,
536  const bool enablePythonFuture = false,
537  RequestCallbackUserFunction callbackFunction = nullptr,
538  RequestCallbackUserData callbackData = nullptr);
539 
571  [[nodiscard]] std::shared_ptr<Request> memGet(
572  void* buffer,
573  size_t length,
574  uint64_t remoteAddr,
575  ucp_rkey_h rkey,
576  const bool enablePythonFuture = false,
577  RequestCallbackUserFunction callbackFunction = nullptr,
578  RequestCallbackUserData callbackData = nullptr);
579 
613  [[nodiscard]] std::shared_ptr<Request> memGet(
614  void* buffer,
615  size_t length,
616  std::shared_ptr<ucxx::RemoteKey> remoteKey,
617  uint64_t remoteAddrOffset = 0,
618  const bool enablePythonFuture = false,
619  RequestCallbackUserFunction callbackFunction = nullptr,
620  RequestCallbackUserData callbackData = nullptr);
621 
641  [[nodiscard]] std::shared_ptr<Request> streamSend(const void* const buffer,
642  size_t length,
643  const bool enablePythonFuture);
644 
665  [[nodiscard]] std::shared_ptr<Request> streamRecv(void* buffer,
666  size_t length,
667  const bool enablePythonFuture);
668 
698  [[nodiscard]] std::shared_ptr<Request> tagSend(
699  const void* const buffer,
700  size_t length,
701  Tag tag,
702  const bool enablePythonFuture = false,
703  RequestCallbackUserFunction callbackFunction = nullptr,
704  RequestCallbackUserData callbackData = nullptr);
705 
737  [[nodiscard]] std::shared_ptr<Request> tagRecv(
738  void* buffer,
739  size_t length,
740  Tag tag,
741  TagMask tagMask,
742  const bool enablePythonFuture = false,
743  RequestCallbackUserFunction callbackFunction = nullptr,
744  RequestCallbackUserData callbackData = nullptr);
745 
779  [[nodiscard]] std::shared_ptr<Request> tagMultiSend(const std::vector<const void*>& buffer,
780  const std::vector<size_t>& size,
781  const std::vector<int>& isCUDA,
782  const Tag tag,
783  const bool enablePythonFuture);
784 
807  [[nodiscard]] std::shared_ptr<Request> tagMultiRecv(const Tag tag,
808  const TagMask tagMask,
809  const bool enablePythonFuture);
810 
838  [[nodiscard]] std::shared_ptr<Request> flush(
839  const bool enablePythonFuture = false,
840  RequestCallbackUserFunction callbackFunction = nullptr,
841  RequestCallbackUserData callbackData = nullptr);
842 
853  [[nodiscard]] std::shared_ptr<Worker> getWorker();
854 
902  [[nodiscard]] std::shared_ptr<Request> close(
903  const bool enablePythonFuture = false,
904  EndpointCloseCallbackUserFunction callbackFunction = nullptr,
905  EndpointCloseCallbackUserData callbackData = nullptr);
906 
930  void closeBlocking(uint64_t period = 0, uint64_t maxAttempts = 1);
931 };
932 
933 } // namespace ucxx
A UCXX component class to prevent early destruction of parent object.
Definition: component.h:17
Component encapsulating a UCP endpoint.
Definition: endpoint.h:50
std::shared_ptr< Request > flush(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a flush operation.
void removeInflightRequest(std::shared_ptr< Request > request)
Remove reference to request from internal container.
std::shared_ptr< Request > close(const bool enablePythonFuture=false, EndpointCloseCallbackUserFunction callbackFunction=nullptr, EndpointCloseCallbackUserData callbackData=nullptr)
Enqueue a non-blocking endpoint close operation.
std::shared_ptr< Request > tagMultiSend(const std::vector< const void * > &buffer, const std::vector< size_t > &size, const std::vector< int > &isCUDA, const Tag tag, const bool enablePythonFuture)
Enqueue a multi-buffer tag send operation.
bool isAlive() const
Check whether the endpoint is still alive.
size_t cancelInflightRequests()
Cancel inflight requests.
std::shared_ptr< Request > amSend(const void *const buffer, const size_t length, const AmSendParams &params, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message send operation with explicit policy parameters.
std::shared_ptr< Request > streamSend(const void *const buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream send operation.
std::shared_ptr< Request > streamRecv(void *buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream receive operation.
size_t cancelInflightRequestsBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Cancel inflight requests.
std::shared_ptr< Request > tagMultiRecv(const Tag tag, const TagMask tagMask, const bool enablePythonFuture)
Enqueue a multi-buffer tag receive operation.
std::shared_ptr< Request > amRecv(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message receive operation.
void setCloseCallback(EndpointCloseCallbackUserFunction closeCallback, EndpointCloseCallbackUserData closeCallbackArg)
Register a user-defined callback to call when endpoint closes.
friend std::shared_ptr< Endpoint > createEndpointFromWorkerAddress(std::shared_ptr< Worker > worker, std::shared_ptr< Address > address, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
std::shared_ptr< Request > memGet(void *buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Worker > getWorker()
Get ucxx::Worker component from a worker or listener object.
friend void endpointErrorCallback(void *arg, ucp_ep_h ep, ucs_status_t status)
The error callback registered at endpoint creation time.
std::shared_ptr< Request > tagSend(const void *const buffer, size_t length, Tag tag, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag send operation.
ucp_ep_h getHandle()
Get the underlying ucp_ep_h handle.
size_t getCancelingSize() const
Check the number of inflight requests being canceled.
void raiseOnError()
Raises an exception if an error occurred.
std::shared_ptr< Request > memPut(const void *const buffer, size_t length, uint64_t remoteAddr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
std::shared_ptr< Request > memGet(void *buffer, size_t length, uint64_t remoteAddr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Request > tagRecv(void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag receive operation.
friend std::shared_ptr< Endpoint > createEndpointFromHostname(std::shared_ptr< Worker > worker, std::string ipAddress, uint16_t port, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
friend std::shared_ptr< Endpoint > createEndpointFromConnRequest(std::shared_ptr< Listener > listener, ucp_conn_request_h connRequest, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
void closeBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Close the endpoint while keeping the object alive.
std::shared_ptr< Request > amSend(const void *const buffer, const size_t length, const ucs_memory_type_t memoryType, const std::optional< AmReceiverCallbackInfo > receiverCallbackInfo=std::nullopt, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message send operation.
std::shared_ptr< Request > amSend(std::vector< ucp_dt_iov_t > iov, const AmSendParams &params, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message send operation with IOV datatype.
std::shared_ptr< Request > memPut(const void *const buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
Definition: address.h:15
std::function< void(ucs_status_t, std::shared_ptr< void >)> RequestCallbackUserFunction
A user-defined function to execute as part of a ucxx::Request callback.
Definition: typedefs.h:96
std::shared_ptr< void > RequestCallbackUserData
Data for the user-defined function provided to the ucxx::Request callback.
Definition: typedefs.h:104
RequestCallbackUserData EndpointCloseCallbackUserData
Data for the user-defined function provided to endpoint close callback.
Definition: typedefs.h:120
RequestCallbackUserFunction EndpointCloseCallbackUserFunction
A user-defined function to execute after an endpoint closes.
Definition: typedefs.h:112
TagMask
Strong type for a UCP tag mask.
Definition: typedefs.h:73
Tag
Strong type for a UCP tag.
Definition: typedefs.h:65
Parameters controlling Active Message send behavior.
Definition: typedefs.h:193
Deleter for an endpoint parameters object.
Definition: endpoint.h:33
void operator()(ucp_ep_params_t *ptr)
Execute the deletion.