request.h
1 
5 #pragma once
6 
7 #include <atomic>
8 #include <chrono>
9 #include <memory>
10 #include <string>
11 #include <utility>
12 
13 #include <ucp/api/ucp.h>
14 
15 #include <ucxx/component.h>
16 #include <ucxx/endpoint.h>
17 #include <ucxx/future.h>
18 #include <ucxx/request_data.h>
19 #include <ucxx/typedefs.h>
20 
21 #define ucxx_trace_req_f(_owner, _req, _handle, _name, _message, ...) \
22  ucxx_trace_req("ucxx::Request: %p on %s, UCP handle: %p, op: %s, " _message, \
23  (_req), \
24  (_owner), \
25  (_handle), \
26  (_name), \
27  ##__VA_ARGS__)
28 
29 namespace ucxx {
30 
39 class Request : public Component {
40  protected:
44  struct Attributes {
45  ucs_memory_type memoryType{UCS_MEMORY_TYPE_UNKNOWN};
46  std::string debugString{};
47  };
48 
49  ucs_status_t _status{UCS_INPROGRESS};
50  std::string _status_msg{};
51  void* _request{nullptr};
52  std::shared_ptr<Future> _future{nullptr};
53  std::shared_ptr<Worker> _worker{
54  nullptr};
55  std::shared_ptr<Endpoint> _endpoint{
56  nullptr};
57  std::string _ownerString{
58  "undetermined owner"};
59  std::recursive_mutex _mutex{};
60  data::RequestData _requestData{};
61  std::string _operationName{
62  "request_undefined"};
63  bool _enablePythonFuture{true};
69 
91  Request(std::shared_ptr<Component> endpointOrWorker,
92  const data::RequestData requestData,
93  std::string operationName,
94  const bool enablePythonFuture = false,
95  RequestCallbackUserFunction callbackFunction = nullptr,
96  RequestCallbackUserData callbackData = nullptr);
97 
106  void process();
107 
117  void setStatus(ucs_status_t status);
118 
119  public:
120  Request() = delete;
121  Request(const Request&) = delete;
122  Request& operator=(Request const&) = delete;
123  Request(Request&& o) = delete;
124  Request& operator=(Request&& o) = delete;
125 
132  virtual ~Request();
133 
140  virtual void cancel();
141 
151  [[nodiscard]] ucs_status_t getStatus();
152 
161  [[nodiscard]] void* getFuture();
162 
175  void checkError();
176 
185  [[nodiscard]] bool isCompleted();
186 
200  void callback(void* request, ucs_status_t status);
201 
212  virtual void populateDelayedSubmission() = 0;
213 
224  [[nodiscard]] const std::string& getOwnerString() const;
225 
238  [[nodiscard]] virtual std::shared_ptr<Buffer> getRecvBuffer();
239 
249  [[nodiscard]] virtual std::string getRecvHeader();
250 
274  [[nodiscard]] Attributes queryAttributes();
275 
276  protected:
291  void publishRequest(void* request);
292 };
293 
294 } // namespace ucxx
A UCXX component class to prevent early destruction of parent object.
Definition: component.h:17
Base type for a UCXX transfer request.
Definition: request.h:39
virtual std::string getRecvHeader()
Get the received user header.
void checkError()
Check whether the request completed with an error.
std::string _status_msg
Human-readable status message.
Definition: request.h:50
RequestCallbackUserFunction _callback
Completion callback.
Definition: request.h:64
std::recursive_mutex _mutex
Mutex to prevent checking status while it's being set.
Definition: request.h:59
void * getFuture()
Return the future used to check on state.
data::RequestData _requestData
The operation-specific data to be used in the request.
Definition: request.h:60
bool isCompleted()
Check whether the request has already completed.
RequestCallbackUserData _callbackData
Completion callback data.
Definition: request.h:65
void publishRequest(void *request)
Publish the UCP request handle and capture its attributes.
std::shared_ptr< Endpoint > _endpoint
Endpoint that generated request (if not from worker)
Definition: request.h:55
void callback(void *request, ucs_status_t status)
Callback executed by UCX when request is completed.
std::shared_ptr< Worker > _worker
Worker that generated request (if not from endpoint)
Definition: request.h:53
bool _enablePythonFuture
Whether Python future is enabled for this request.
Definition: request.h:63
virtual std::shared_ptr< Buffer > getRecvBuffer()
Get the received buffer.
virtual ~Request()
ucxx::Request destructor.
ucs_status_t getStatus()
Return the status of the request.
std::shared_ptr< Future > _future
Future to notify upon completion.
Definition: request.h:52
const std::string & getOwnerString() const
Get formatted string with owner type and handle address.
Attributes _requestAttr
Definition: request.h:66
Attributes queryAttributes()
Get the requests's attributes.
std::string _ownerString
String to print owner (endpoint or worker) when logging.
Definition: request.h:57
Request(std::shared_ptr< Component > endpointOrWorker, const data::RequestData requestData, std::string operationName, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Protected constructor of an abstract ucxx::Request.
std::string _operationName
Human-readable operation name, mostly used for log messages.
Definition: request.h:61
ucs_status_t _status
Requests status.
Definition: request.h:49
void setStatus(ucs_status_t status)
Set the request status and notify Python future.
void process()
Perform initial processing of the request to determine if immediate completion.
virtual void cancel()
Cancel the request.
virtual void populateDelayedSubmission()=0
Populate the internal submission dispatcher.
void * _request
Pointer to UCP request.
Definition: request.h:51
Definition: address.h:16
std::function< void(ucs_status_t, std::shared_ptr< void >)> RequestCallbackUserFunction
A user-defined function to execute as part of a ucxx::Request callback.
Definition: typedefs.h:97
std::shared_ptr< void > RequestCallbackUserData
Data for the user-defined function provided to the ucxx::Request callback.
Definition: typedefs.h:105
Request attributes reported by ucp_request_query.
Definition: request.h:44
ucs_memory_type memoryType
Memory type of the request.
Definition: request.h:45
std::string debugString
Stored debug string.
Definition: request.h:46