request_am.h
1 
5 #include <functional>
6 #include <map>
7 #include <memory>
8 #include <mutex>
9 #include <queue>
10 #include <string>
11 #include <unordered_map>
12 #include <vector>
13 
14 #include <ucp/api/ucp.h>
15 
16 #include <ucxx/typedefs.h>
17 
18 namespace ucxx {
19 
20 class Buffer;
21 class InflightRequests;
22 class RequestAm;
23 class Request;
24 class Worker;
25 
26 namespace internal {
27 
28 class AmData;
29 
37  public:
39  ucp_ep_h _ep{nullptr};
40  std::shared_ptr<RequestAm> _request{
41  nullptr};
42  std::shared_ptr<Buffer> _buffer{nullptr};
43 
44  RecvAmMessage() = delete;
45  RecvAmMessage(const RecvAmMessage&) = delete;
46  RecvAmMessage& operator=(RecvAmMessage const&) = delete;
47  RecvAmMessage(RecvAmMessage&& o) = delete;
48  RecvAmMessage& operator=(RecvAmMessage&& o) = delete;
49 
64  ucp_ep_h ep,
65  std::shared_ptr<RequestAm> request,
66  std::shared_ptr<Buffer> buffer,
68  std::vector<std::byte> userHeader = {});
69 
77  void setUcpRequest(void* request);
78 
89  void callback(void* request, ucs_status_t status);
90 };
91 
92 typedef std::unordered_map<ucp_ep_h, std::queue<std::shared_ptr<RequestAm>>> AmPoolType;
93 typedef std::map<std::shared_ptr<RequestAm>,
94  std::shared_ptr<RecvAmMessage>,
95  std::owner_less<std::shared_ptr<RequestAm>>>
96  RecvAmMessageMapType;
97 
98 typedef std::unordered_map<AmReceiverCallbackIdType, AmReceiverCallbackType>
99  AmReceiverCallbackMapType;
100 typedef std::unordered_map<AmReceiverCallbackOwnerType, AmReceiverCallbackMapType>
101  AmReceiverCallbackOwnerMapType;
102 
110 class AmData {
111  public:
112  std::weak_ptr<Worker> _worker{};
113  std::string _ownerString{};
114  AmPoolType _recvPool{};
115  AmPoolType _recvWait{};
116  RecvAmMessageMapType
118  AmReceiverCallbackOwnerMapType
121  std::mutex _mutex{};
122  std::function<void(std::shared_ptr<Request>)>
124  std::unordered_map<ucs_memory_type_t, AmAllocatorType>
126 };
127 
128 } // namespace internal
129 
130 } // namespace ucxx
Active Message data owned by a ucxx::Worker.
Definition: request_am.h:110
RecvAmMessageMapType _recvAmMessageMap
The active messages waiting to be handled by callback.
Definition: request_am.h:117
AmReceiverCallbackOwnerMapType _receiverCallbacks
Definition: request_am.h:119
AmPoolType _recvPool
The pool of completed receive requests (waiting for user request)
Definition: request_am.h:114
std::unordered_map< ucs_memory_type_t, AmAllocatorType > _allocators
Default and user-defined active message allocators.
Definition: request_am.h:125
std::string _ownerString
The owner string used for logging.
Definition: request_am.h:113
std::weak_ptr< Worker > _worker
The worker to which the Active Message callback belongs.
Definition: request_am.h:112
std::mutex _mutex
Mutex to provide access to pools/maps.
Definition: request_am.h:121
AmPoolType _recvWait
The pool of user receive requests (waiting for message arrival)
Definition: request_am.h:115
std::function< void(std::shared_ptr< Request >)> _registerInflightRequest
Worker function to register inflight requests with.
Definition: request_am.h:123
Handle receiving of a ucxx::RequestAm.
Definition: request_am.h:36
ucp_ep_h _ep
Handle containing address of the reply endpoint.
Definition: request_am.h:39
void callback(void *request, ucs_status_t status)
Execute the ucxx::Request::callback().
std::shared_ptr< Buffer > _buffer
Buffer containing the received data.
Definition: request_am.h:42
RecvAmMessage(internal::AmData *amData, ucp_ep_h ep, std::shared_ptr< RequestAm > request, std::shared_ptr< Buffer > buffer, AmReceiverCallbackType receiverCallback=AmReceiverCallbackType(), std::vector< std::byte > userHeader={})
Constructor of ucxx::RecvAmMessage.
void setUcpRequest(void *request)
Set the UCP request.
internal::AmData * _amData
Active messages data.
Definition: request_am.h:38
std::shared_ptr< RequestAm > _request
Request which will later be notified/delivered to user.
Definition: request_am.h:40
Definition: address.h:15
std::function< void(std::shared_ptr< Request >, ucp_ep_h)> AmReceiverCallbackType
Active Message receiver callback.
Definition: typedefs.h:137