Line data Source code
1 : #include "unix_socket_server.hpp"
2 : #include "workers.hpp"
3 : #include "../util/logger.hpp"
4 : #include <filesystem>
5 :
6 : namespace thinger::asio {
7 :
8 : // Constructor with io_context providers
9 57 : unix_socket_server::unix_socket_server(std::string unix_path,
10 : io_context_provider acceptor_context_provider,
11 : io_context_provider connection_context_provider,
12 : std::set<std::string> allowed_remotes,
13 57 : std::set<std::string> forbidden_remotes)
14 57 : : socket_server_base(std::move(acceptor_context_provider),
15 57 : std::move(connection_context_provider),
16 57 : std::move(allowed_remotes),
17 57 : std::move(forbidden_remotes))
18 228 : , unix_path_(std::move(unix_path))
19 : {
20 57 : }
21 :
22 : // Legacy constructor for backward compatibility
23 0 : unix_socket_server::unix_socket_server(std::string unix_path,
24 : std::set<std::string> allowed_remotes,
25 0 : std::set<std::string> forbidden_remotes)
26 : : unix_socket_server(unix_path,
27 0 : []() -> boost::asio::io_context& { return get_workers().get_thread_io_context(); },
28 0 : []() -> boost::asio::io_context& { return get_workers().get_next_io_context(); },
29 0 : std::move(allowed_remotes),
30 0 : std::move(forbidden_remotes))
31 : {
32 0 : }
33 :
34 96 : unix_socket_server::~unix_socket_server() {
35 57 : stop();
36 96 : }
37 :
38 114 : bool unix_socket_server::stop() {
39 : // First call base class to set running_ = false
40 114 : socket_server_base::stop();
41 :
42 : // Now close the acceptor safely
43 114 : if (acceptor_) {
44 57 : boost::system::error_code ec;
45 57 : acceptor_->close(ec);
46 57 : if (ec) {
47 0 : LOG_WARNING("Error closing Unix acceptor: {}", ec.message());
48 : }
49 57 : acceptor_.reset();
50 : }
51 :
52 : // Remove the socket file when server is stopped
53 114 : if (!unix_path_.empty()) {
54 114 : std::error_code ec;
55 114 : std::filesystem::remove(unix_path_, ec);
56 114 : if (ec) {
57 0 : LOG_WARNING("Failed to remove Unix socket file {}: {}", unix_path_, ec.message());
58 : }
59 : }
60 :
61 114 : return true;
62 : }
63 :
64 0 : std::string unix_socket_server::get_service_name() const {
65 0 : return "unix_server@" + unix_path_;
66 : }
67 :
68 57 : bool unix_socket_server::create_acceptor() {
69 : // Get io_context from provider
70 57 : boost::asio::io_context& io_context = acceptor_context_provider_();
71 :
72 : // Remove existing socket file if it exists
73 57 : std::error_code ec;
74 57 : std::filesystem::remove(unix_path_, ec);
75 :
76 : // Create the endpoint
77 57 : boost::asio::local::stream_protocol::endpoint endpoint(unix_path_);
78 :
79 57 : int num_attempts = 0;
80 57 : bool success = false;
81 :
82 : do {
83 57 : LOG_DEBUG("starting Unix socket acceptor on {}", unix_path_);
84 57 : if (num_attempts > 0) {
85 0 : std::this_thread::sleep_for(std::chrono::seconds(5));
86 : }
87 :
88 : try {
89 57 : acceptor_ = std::make_unique<boost::asio::local::stream_protocol::acceptor>(io_context);
90 57 : acceptor_->open(endpoint.protocol());
91 57 : acceptor_->bind(endpoint);
92 57 : acceptor_->listen();
93 57 : success = true;
94 0 : } catch (boost::system::system_error& error) {
95 0 : LOG_ERROR("cannot start listening on Unix socket {}: {}",
96 : unix_path_, error.code().message());
97 0 : if (max_listening_attempts_ >= 0 && num_attempts >= max_listening_attempts_) {
98 0 : return false;
99 : }
100 0 : }
101 57 : num_attempts++;
102 57 : } while (!success && (max_listening_attempts_ < 0 || num_attempts < max_listening_attempts_));
103 :
104 57 : if (success) {
105 57 : LOG_INFO("Unix socket server is now listening on {}", unix_path_);
106 :
107 : // Set permissions to allow access (you might want to make this configurable)
108 57 : std::filesystem::permissions(unix_path_,
109 : std::filesystem::perms::owner_all |
110 : std::filesystem::perms::group_read |
111 : std::filesystem::perms::group_write,
112 : std::filesystem::perm_options::replace);
113 : }
114 :
115 57 : return success;
116 : }
117 :
118 120 : void unix_socket_server::accept_connection() {
119 : // Use async_accept with move semantics for the socket
120 120 : acceptor_->async_accept([this](const boost::system::error_code& e,
121 : boost::asio::local::stream_protocol::socket peer) mutable {
122 117 : if (!e) {
123 : // Create unix_socket with the already-connected socket
124 66 : auto sock = std::make_shared<unix_socket>("unix_socket_server", std::move(peer));
125 :
126 66 : LOG_INFO("received connection on Unix socket: {}", unix_path_);
127 :
128 : // Call handler with the connected socket
129 66 : if (handler_) {
130 66 : handler_(std::move(sock));
131 : }
132 :
133 : // Continue accepting connections
134 66 : if (running_) accept_connection();
135 66 : } else {
136 51 : if (e != boost::asio::error::operation_aborted) {
137 0 : LOG_ERROR("cannot accept more Unix socket connections: {}", e.message());
138 0 : if (running_) {
139 : // Retry after a delay to avoid tight loop on persistent errors
140 : auto timer = std::make_shared<boost::asio::steady_timer>(
141 0 : acceptor_context_provider_(),
142 0 : std::chrono::seconds(1)
143 0 : );
144 0 : timer->async_wait([this, timer](const boost::system::error_code& e) {
145 0 : if (e != boost::asio::error::operation_aborted) {
146 0 : accept_connection();
147 : }
148 0 : });
149 0 : }
150 : } else {
151 51 : LOG_INFO("stop accepting Unix socket connections");
152 : }
153 : }
154 117 : });
155 120 : }
156 :
157 : } // namespace thinger::asio
|