Line data Source code
1 : #include "unix_socket_server.hpp"
2 : #include "workers.hpp"
3 : #include "../util/logger.hpp"
4 : #include <filesystem>
5 :
6 : namespace thinger::asio {
7 :
8 : // Constructor with io_context providers
9 69 : unix_socket_server::unix_socket_server(std::string unix_path,
10 : io_context_provider acceptor_context_provider,
11 : io_context_provider connection_context_provider,
12 : std::set<std::string> allowed_remotes,
13 69 : std::set<std::string> forbidden_remotes)
14 69 : : socket_server_base(std::move(acceptor_context_provider),
15 69 : std::move(connection_context_provider),
16 69 : std::move(allowed_remotes),
17 69 : std::move(forbidden_remotes))
18 276 : , unix_path_(std::move(unix_path))
19 : {
20 69 : }
21 :
22 : // Legacy constructor for backward compatibility
23 0 : unix_socket_server::unix_socket_server(std::string unix_path,
24 : std::set<std::string> allowed_remotes,
25 0 : std::set<std::string> forbidden_remotes)
26 : : unix_socket_server(unix_path,
27 0 : []() -> boost::asio::io_context& { return get_workers().get_thread_io_context(); },
28 0 : []() -> boost::asio::io_context& { return get_workers().get_next_io_context(); },
29 0 : std::move(allowed_remotes),
30 0 : std::move(forbidden_remotes))
31 : {
32 0 : }
33 :
34 117 : unix_socket_server::~unix_socket_server() {
35 69 : stop();
36 117 : }
37 :
38 138 : bool unix_socket_server::stop() {
39 : // First call base class to set running_ = false
40 138 : socket_server_base::stop();
41 :
42 : // Close the acceptor to cancel pending async operations, but do NOT
43 : // destroy it (reset) here. The async_accept handler may still be in
44 : // flight on the io_context thread and needs the acceptor alive until
45 : // the handler completes. The unique_ptr will clean up on destruction.
46 138 : if (acceptor_ && acceptor_->is_open()) {
47 69 : boost::system::error_code ec;
48 69 : acceptor_->close(ec);
49 69 : if (ec) {
50 0 : LOG_WARNING("Error closing Unix acceptor: {}", ec.message());
51 : }
52 : }
53 :
54 : // Remove the socket file when server is stopped
55 138 : if (!unix_path_.empty()) {
56 138 : std::error_code ec;
57 138 : std::filesystem::remove(unix_path_, ec);
58 138 : if (ec) {
59 0 : LOG_WARNING("Failed to remove Unix socket file {}: {}", unix_path_, ec.message());
60 : }
61 : }
62 :
63 138 : return true;
64 : }
65 :
66 0 : std::string unix_socket_server::get_service_name() const {
67 0 : return "unix_server@" + unix_path_;
68 : }
69 :
70 69 : bool unix_socket_server::create_acceptor() {
71 : // Get io_context from provider
72 69 : boost::asio::io_context& io_context = acceptor_context_provider_();
73 :
74 : // Remove existing socket file if it exists
75 69 : std::error_code ec;
76 69 : std::filesystem::remove(unix_path_, ec);
77 :
78 : // Create the endpoint
79 69 : boost::asio::local::stream_protocol::endpoint endpoint(unix_path_);
80 :
81 69 : int num_attempts = 0;
82 69 : bool success = false;
83 :
84 : do {
85 69 : LOG_DEBUG("starting Unix socket acceptor on {}", unix_path_);
86 69 : if (num_attempts > 0) {
87 0 : std::this_thread::sleep_for(std::chrono::seconds(5));
88 : }
89 :
90 : try {
91 69 : acceptor_ = std::make_unique<boost::asio::local::stream_protocol::acceptor>(io_context);
92 69 : acceptor_->open(endpoint.protocol());
93 69 : acceptor_->bind(endpoint);
94 69 : acceptor_->listen();
95 69 : success = true;
96 0 : } catch (boost::system::system_error& error) {
97 0 : LOG_ERROR("cannot start listening on Unix socket {}: {}",
98 : unix_path_, error.code().message());
99 0 : if (max_listening_attempts_ >= 0 && num_attempts >= max_listening_attempts_) {
100 0 : return false;
101 : }
102 0 : }
103 69 : num_attempts++;
104 69 : } while (!success && (max_listening_attempts_ < 0 || num_attempts < max_listening_attempts_));
105 :
106 69 : if (success) {
107 69 : LOG_INFO("Unix socket server is now listening on {}", unix_path_);
108 :
109 : // Set permissions to allow access (you might want to make this configurable)
110 69 : std::filesystem::permissions(unix_path_,
111 : std::filesystem::perms::owner_all |
112 : std::filesystem::perms::group_read |
113 : std::filesystem::perms::group_write,
114 : std::filesystem::perm_options::replace);
115 : }
116 :
117 69 : return success;
118 : }
119 :
120 146 : void unix_socket_server::accept_connection() {
121 : // Use async_accept with move semantics for the socket
122 146 : acceptor_->async_accept([this](const boost::system::error_code& e,
123 : boost::asio::local::stream_protocol::socket peer) mutable {
124 144 : if (!e) {
125 : // Create unix_socket with the already-connected socket
126 78 : auto sock = std::make_shared<unix_socket>("unix_socket_server", std::move(peer));
127 :
128 78 : LOG_INFO("received connection on Unix socket: {}", unix_path_);
129 :
130 : // Call handler with the connected socket
131 78 : if (handler_) {
132 78 : handler_(std::move(sock));
133 : }
134 :
135 : // Continue accepting connections
136 78 : if (running_) accept_connection();
137 78 : } else {
138 66 : if (e != boost::asio::error::operation_aborted) {
139 0 : LOG_ERROR("cannot accept more Unix socket connections: {}", e.message());
140 0 : if (running_) {
141 : // Retry after a delay to avoid tight loop on persistent errors
142 : auto timer = std::make_shared<boost::asio::steady_timer>(
143 0 : acceptor_context_provider_(),
144 0 : std::chrono::seconds(1)
145 0 : );
146 0 : timer->async_wait([this, timer](const boost::system::error_code& e) {
147 0 : if (e != boost::asio::error::operation_aborted) {
148 0 : accept_connection();
149 : }
150 0 : });
151 0 : }
152 : } else {
153 66 : LOG_INFO("stop accepting Unix socket connections");
154 : }
155 : }
156 144 : });
157 146 : }
158 :
159 : } // namespace thinger::asio
|