socket_backend.hpp
1 
6 #pragma once
7 
8 #include <array>
9 #include <memory>
10 #include <mutex>
11 #include <string>
12 #include <string_view>
13 #include <thread>
14 #include <vector>
15 
16 #include <rapidsmpf/bootstrap/backend.hpp>
17 #include <rapidsmpf/bootstrap/bootstrap.hpp>
18 
19 // NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file.
20 // Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp.
21 // Prefer throwing standard exceptions instead.
22 
24 
/**
 * @brief In-process TCP coordination server, intended to run inside rrun.
 *
 * Instances are neither copyable nor movable (all four copy/move operations
 * are deleted). The object owns its worker threads (`accept_thread_` and
 * `handler_threads_`), so it must declare a destructor that shuts the server
 * down and joins them. An implicitly generated destructor would destroy
 * joinable `std::thread` objects, and that calls `std::terminate`.
 */
class SocketServer {
  public:
    /**
     * @brief Start the server.
     *
     * @param nranks Number of ranks.
     */
    explicit SocketServer(int nranks);

    /**
     * @brief Shut down the server and join all threads.
     */
    ~SocketServer();

    /**
     * @brief Returns the server address as "127.0.0.1:<port>".
     *
     * @return Reference to the stored address string, valid for the lifetime
     * of this object.
     */
    [[nodiscard]] std::string const& address() const noexcept {
        return address_;
    }

    /**
     * @brief Returns the 64-character hex-encoded 256-bit authentication token.
     *
     * @return Reference to the stored token string, valid for the lifetime of
     * this object.
     */
    [[nodiscard]] std::string const& token() const noexcept {
        return token_;
    }

    SocketServer(SocketServer const&) = delete;
    SocketServer& operator=(SocketServer const&) = delete;
    SocketServer(SocketServer&&) = delete;
    SocketServer& operator=(SocketServer&&) = delete;

  private:
    /// Shared server state; its definition is not visible in this header.
    struct State;
    std::shared_ptr<State> state_;
    /// Listening socket descriptor, initialized to -1.
    int listen_fd_{-1};
    /// Pipe descriptor pair, both initialized to -1.
    /// NOTE(review): presumably used to wake the accept loop on shutdown — confirm.
    std::array<int, 2> wakeup_pipe_{-1, -1};
    /// Value returned by address().
    std::string address_;
    /// Value returned by token().
    std::string token_;
    /// Thread object for the accept side of the server.
    std::thread accept_thread_;
    /// NOTE(review): presumably guards `handler_threads_` — confirm in the implementation.
    std::mutex handler_mutex_;
    /// Per-connection handler threads.
    std::vector<std::thread> handler_threads_;

    void accept_loop();
    void handle_connection(int client_fd);
};
90 
105 class SocketBackend : public Backend {
106  public:
108  explicit SocketBackend(Context ctx);
109 
110  ~SocketBackend() override;
111 
113  void put(std::string const& key, std::string_view value) override;
114 
116  std::string get(std::string const& key, Duration timeout) override;
117 
119  void barrier() override;
120 
122  void sync() override;
123 
124  private:
125  Context ctx_;
126  int fd_{-1};
127 
128  void send_line(std::string const& line);
129  std::string recv_line();
130  void send_bytes(void const* data, std::size_t n);
131  void recv_bytes(void* data, std::size_t n);
132 };
133 
134 } // namespace rapidsmpf::bootstrap::detail
Abstract interface for bootstrap coordination backends.
Definition: backend.hpp:80
Socket-based coordination backend (client side, runs in each rank).
void sync() override
Ensure all previous put() operations are globally visible.
void barrier() override
Perform a barrier synchronization.
void put(std::string const &key, std::string_view value) override
Store a key-value pair (rank 0 only).
std::string get(std::string const &key, Duration timeout) override
Retrieve a value, blocking until available or timeout occurs.
SocketBackend(Context ctx)
Construct a socket backend.
In-process TCP coordination server, intended to run inside rrun.
SocketServer(int nranks)
Start the server.
~SocketServer()
Shut down the server and join all threads.
std::string const & address() const noexcept
Returns the server address as "127.0.0.1:<port>".
std::string const & token() const noexcept
Returns the 64-character hex-encoded 256-bit authentication token.
std::chrono::duration< double > Duration
Type alias for Duration type.
Definition: types.hpp:17
Context information for the current process/rank.
Definition: bootstrap.hpp:25