request_am.h
1 
5 #include <functional>
6 #include <map>
7 #include <memory>
8 #include <mutex>
9 #include <queue>
10 #include <string>
11 #include <unordered_map>
12 #include <vector>
13 
14 #include <ucp/api/ucp.h>
15 
16 #include <ucxx/typedefs.h>
17 
18 namespace ucxx {
19 
20 class Buffer;
21 class InflightRequests;
22 class RequestAm;
23 class Request;
24 class Worker;
25 
26 namespace internal {
27 
28 class AmData;
29 
37  public:
39  ucp_ep_h _ep{nullptr};
40  std::shared_ptr<RequestAm> _request{
41  nullptr};
42  std::shared_ptr<Buffer> _buffer{nullptr};
43 
44  RecvAmMessage() = delete;
45  RecvAmMessage(const RecvAmMessage&) = delete;
46  RecvAmMessage& operator=(RecvAmMessage const&) = delete;
47  RecvAmMessage(RecvAmMessage&& o) = delete;
48  RecvAmMessage& operator=(RecvAmMessage&& o) = delete;
49 
64  ucp_ep_h ep,
65  std::shared_ptr<RequestAm> request,
66  std::shared_ptr<Buffer> buffer,
68  std::vector<std::byte> userHeader = {});
69 
80  void callback(void* request, ucs_status_t status);
81 };
82 
83 typedef std::unordered_map<ucp_ep_h, std::queue<std::shared_ptr<RequestAm>>> AmPoolType;
84 typedef std::map<std::shared_ptr<RequestAm>,
85  std::shared_ptr<RecvAmMessage>,
86  std::owner_less<std::shared_ptr<RequestAm>>>
87  RecvAmMessageMapType;
88 
89 typedef std::unordered_map<AmReceiverCallbackIdType, AmReceiverCallbackType>
90  AmReceiverCallbackMapType;
91 typedef std::
92  unordered_map<AmReceiverCallbackOwnerType, AmReceiverCallbackMapType, AmReceiverCallbackOwnerHash>
93  AmReceiverCallbackOwnerMapType;
94 
102 class AmData {
103  public:
104  std::weak_ptr<Worker> _worker{};
105  std::string _ownerString{};
106  AmPoolType _recvPool{};
107  AmPoolType _recvWait{};
108  RecvAmMessageMapType
110  AmReceiverCallbackOwnerMapType
113  std::mutex _mutex{};
114  std::function<void(std::shared_ptr<Request>)>
116  std::unordered_map<ucs_memory_type_t, AmAllocatorType>
118 };
119 
120 } // namespace internal
121 
122 } // namespace ucxx
Active Message data owned by a ucxx::Worker.
Definition: request_am.h:102
RecvAmMessageMapType _recvAmMessageMap
The active messages waiting to be handled by callback.
Definition: request_am.h:109
AmReceiverCallbackOwnerMapType _receiverCallbacks
Definition: request_am.h:111
AmPoolType _recvPool
The pool of completed receive requests (waiting for user request)
Definition: request_am.h:106
std::unordered_map< ucs_memory_type_t, AmAllocatorType > _allocators
Default and user-defined active message allocators.
Definition: request_am.h:117
std::string _ownerString
The owner string used for logging.
Definition: request_am.h:105
std::weak_ptr< Worker > _worker
The worker to which the Active Message callback belongs.
Definition: request_am.h:104
std::mutex _mutex
Mutex to provide access to pools/maps.
Definition: request_am.h:113
AmPoolType _recvWait
The pool of user receive requests (waiting for message arrival)
Definition: request_am.h:107
std::function< void(std::shared_ptr< Request >)> _registerInflightRequest
Worker function to register inflight requests with.
Definition: request_am.h:115
Handle receiving of a ucxx::RequestAm.
Definition: request_am.h:36
ucp_ep_h _ep
Handle containing address of the reply endpoint.
Definition: request_am.h:39
void callback(void *request, ucs_status_t status)
Execute the ucxx::Request::callback().
std::shared_ptr< Buffer > _buffer
Buffer containing the received data.
Definition: request_am.h:42
RecvAmMessage(internal::AmData *amData, ucp_ep_h ep, std::shared_ptr< RequestAm > request, std::shared_ptr< Buffer > buffer, AmReceiverCallbackType receiverCallback=AmReceiverCallbackType(), std::vector< std::byte > userHeader={})
Constructor of ucxx::RecvAmMessage.
internal::AmData * _amData
Active messages data.
Definition: request_am.h:38
std::shared_ptr< RequestAm > _request
Request which will later be notified/delivered to user.
Definition: request_am.h:40
Definition: address.h:16
std::function< void(std::shared_ptr< Request >, ucp_ep_h)> AmReceiverCallbackType
Active Message receiver callback.
Definition: typedefs.h:138