Line data Source code
1 : #include "client_connection.hpp"
2 : #include "../../util/logger.hpp"
3 : #include "../../util/compression.hpp"
4 :
5 : namespace thinger::http {
6 :
7 : std::atomic<unsigned long> client_connection::connections(0);
8 :
9 767524 : client_connection::client_connection(std::shared_ptr<thinger::asio::socket> socket,
10 767524 : std::chrono::seconds timeout)
11 767524 : : socket_(std::move(socket))
12 1535048 : , timeout_(timeout) {
13 767524 : ++connections;
14 1535048 : LOG_TRACE("created http client connection with timeout: {} seconds. total: {}",
15 : timeout.count(), connections.load());
16 767524 : }
17 :
18 48 : client_connection::client_connection(std::shared_ptr<thinger::asio::unix_socket> socket,
19 : const std::string& path,
20 48 : std::chrono::seconds timeout)
21 48 : : socket_(std::move(socket))
22 48 : , socket_path_(path)
23 96 : , timeout_(timeout) {
24 48 : ++connections;
25 96 : LOG_TRACE("created http client connection (for unix socket: {}) with timeout: {} seconds. total: {}",
26 : path, timeout.count(), connections.load());
27 48 : }
28 :
29 767572 : client_connection::~client_connection() {
30 767572 : --connections;
31 1535144 : LOG_TRACE("releasing http client connection. total: {}", connections.load());
32 767572 : }
33 :
34 871 : awaitable<void> client_connection::ensure_connected(const http_request& request) {
35 : if (socket_->is_open()) {
36 : co_return;
37 : }
38 :
39 : LOG_TRACE("connecting to: {}:{}", request.get_host(), request.get_port());
40 :
41 : for (unsigned retry = 0; retry < MAX_RETRIES; ++retry) {
42 : boost::system::error_code ec;
43 : if (socket_path_.empty()) {
44 : ec = co_await socket_->connect(request.get_host(), request.get_port(), CONNECT_TIMEOUT);
45 : } else {
46 : auto unix_socket = std::static_pointer_cast<thinger::asio::unix_socket>(socket_);
47 : ec = co_await unix_socket->connect(socket_path_, CONNECT_TIMEOUT);
48 : }
49 :
50 : if (!ec) {
51 : LOG_TRACE("connection established");
52 : co_return;
53 : }
54 :
55 : LOG_ERROR("error while connecting (attempt #{}): {} ({}) ({})",
56 : retry + 1, ec.message(), ec.value(), ec.category().name());
57 :
58 : // Don't retry for certain errors
59 : if (ec == boost::asio::error::operation_aborted ||
60 : ec == boost::asio::error::host_not_found) {
61 : co_return;
62 : }
63 :
64 : if (retry + 1 >= MAX_RETRIES) {
65 : co_return;
66 : }
67 :
68 : // Close and retry
69 : socket_->close();
70 : }
71 1742 : }
72 :
73 864 : awaitable<std::shared_ptr<http_response>> client_connection::read_response(bool head_request) {
74 : response_parser_.reset();
75 :
76 : while (true) {
77 : auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
78 :
79 : if (bytes == 0) {
80 : co_return nullptr;
81 : }
82 :
83 : boost::tribool result = response_parser_.parse(buffer_, buffer_ + bytes, head_request);
84 :
85 : if (result) {
86 : // Successfully parsed response
87 : auto response = response_parser_.consume_response();
88 :
89 : // Decompress if needed
90 : if (response && response->has_header("Content-Encoding")) {
91 : std::string encoding = response->get_header("Content-Encoding");
92 : if (encoding == "gzip") {
93 : auto decompressed = ::thinger::util::gzip::decompress(response->get_content());
94 : if (decompressed) {
95 : response->set_content(std::move(*decompressed));
96 : response->remove_header("Content-Encoding");
97 : } else {
98 : LOG_ERROR("Failed to decompress gzip response");
99 : }
100 : } else if (encoding == "deflate") {
101 : auto decompressed = ::thinger::util::deflate::decompress(response->get_content());
102 : if (decompressed) {
103 : response->set_content(std::move(*decompressed));
104 : response->remove_header("Content-Encoding");
105 : } else {
106 : LOG_ERROR("Failed to decompress deflate response");
107 : }
108 : }
109 : }
110 :
111 : co_return response;
112 : } else if (!result) {
113 : // Parse error
114 : co_return nullptr;
115 : }
116 : // else: indeterminate, keep reading
117 : }
118 1728 : }
119 :
120 869 : awaitable<std::shared_ptr<http_response>> client_connection::send_request(
121 : std::shared_ptr<http_request> request) {
122 :
123 : std::shared_ptr<http_response> response;
124 :
125 : // Timeout timer
126 : boost::asio::steady_timer timer(socket_->get_io_context());
127 : timer.expires_after(timeout_);
128 :
129 869 : auto do_request = [&]() -> awaitable<void> {
130 : co_await ensure_connected(*request);
131 : if (!socket_->is_open()) co_return;
132 :
133 : request->log("CLIENT->", 0);
134 : response_parser_.setOnChunked(request->get_chunked_callback());
135 :
136 : co_await request->to_socket(socket_);
137 :
138 : bool is_head = request->get_method() == http::method::HEAD;
139 : response = co_await read_response(is_head);
140 :
141 : if (response && !response->keep_alive()) {
142 : socket_->close();
143 : }
144 1738 : };
145 :
146 869 : auto do_timeout = [&]() -> awaitable<void> {
147 : auto [ec] = co_await timer.async_wait(use_nothrow_awaitable);
148 : if (!ec) {
149 : LOG_ERROR("Request timeout after {} seconds", timeout_.count());
150 : socket_->close();
151 : }
152 1738 : };
153 :
154 : co_await (do_request() || do_timeout());
155 :
156 : co_return response;
157 1738 : }
158 :
159 2 : awaitable<stream_result> client_connection::send_request_streaming(
160 : std::shared_ptr<http_request> request,
161 : stream_callback callback) {
162 :
163 : stream_result result;
164 :
165 : // Timeout timer
166 : boost::asio::steady_timer timer(socket_->get_io_context());
167 : timer.expires_after(timeout_);
168 :
169 2 : auto do_request = [&]() -> awaitable<void> {
170 : co_await ensure_connected(*request);
171 :
172 : if (!socket_->is_open()) {
173 : result.error = "Failed to connect";
174 : co_return;
175 : }
176 :
177 : request->log("CLIENT->", 0);
178 : response_parser_.reset();
179 :
180 : // Set up streaming callback
181 : response_parser_.setOnStreaming(
182 2 : [&callback, &result, this](
183 : const std::string_view& data, size_t downloaded, size_t total) -> bool {
184 2 : if (result.status_code == 0) {
185 2 : result.status_code = response_parser_.get_status_code();
186 : }
187 2 : result.bytes_transferred = downloaded;
188 :
189 2 : stream_info info{data, downloaded, total, result.status_code};
190 4 : return callback(info);
191 : });
192 :
193 : co_await request->to_socket(socket_);
194 :
195 : // Read response with streaming
196 : while (true) {
197 : auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
198 :
199 : if (bytes == 0) {
200 : result.error = "Connection closed";
201 : co_return;
202 : }
203 :
204 : bool is_head = request->get_method() == http::method::HEAD;
205 : boost::tribool parse_result = response_parser_.parse(
206 : buffer_, buffer_ + bytes, is_head);
207 :
208 : if (result.status_code == 0) {
209 : result.status_code = response_parser_.get_status_code();
210 : }
211 :
212 : if (parse_result) {
213 : auto response = response_parser_.consume_response();
214 : if (response) {
215 : result.status_code = response->get_status_code();
216 : if (result.bytes_transferred == 0) {
217 : result.bytes_transferred = response->get_content_size();
218 : }
219 : if (!response->keep_alive()) {
220 : socket_->close();
221 : }
222 : }
223 : co_return;
224 : } else if (!parse_result) {
225 : result.error = "Parse error or download aborted";
226 : co_return;
227 : }
228 : }
229 4 : };
230 :
231 2 : auto do_timeout = [&]() -> awaitable<void> {
232 : auto [ec] = co_await timer.async_wait(use_nothrow_awaitable);
233 : if (!ec) {
234 : LOG_ERROR("Streaming request timeout after {} seconds", timeout_.count());
235 : socket_->close();
236 : result.error = "Request timeout";
237 : }
238 4 : };
239 :
240 : co_await (do_request() || do_timeout());
241 :
242 : co_return result;
243 4 : }
244 :
245 115 : void client_connection::close() {
246 115 : if (socket_->is_open()) {
247 0 : socket_->close();
248 : }
249 115 : response_parser_.reset();
250 115 : }
251 :
252 38 : std::shared_ptr<thinger::asio::socket> client_connection::release_socket() {
253 38 : socket_->cancel();
254 38 : return socket_;
255 : }
256 :
257 : }
|