LCOV - code coverage report
Current view: top level - http/client - client_connection.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 98.0 % 49 48
Test Date: 2026-02-20 15:38:22 Functions: 93.3 % 15 14

            Line data    Source code
       1              : #include "client_connection.hpp"
       2              : #include "../../util/logger.hpp"
       3              : #include "../../util/compression.hpp"
       4              : 
       5              : namespace thinger::http {
       6              : 
       7              : std::atomic<unsigned long> client_connection::connections(0);
       8              : 
       9       767524 : client_connection::client_connection(std::shared_ptr<thinger::asio::socket> socket,
      10       767524 :                                      std::chrono::seconds timeout)
      11       767524 :     : socket_(std::move(socket))
      12      1535048 :     , timeout_(timeout) {
      13       767524 :     ++connections;
      14      1535048 :     LOG_TRACE("created http client connection with timeout: {} seconds. total: {}",
      15              :               timeout.count(), connections.load());
      16       767524 : }
      17              : 
      18           48 : client_connection::client_connection(std::shared_ptr<thinger::asio::unix_socket> socket,
      19              :                                      const std::string& path,
      20           48 :                                      std::chrono::seconds timeout)
      21           48 :     : socket_(std::move(socket))
      22           48 :     , socket_path_(path)
      23           96 :     , timeout_(timeout) {
      24           48 :     ++connections;
      25           96 :     LOG_TRACE("created http client connection (for unix socket: {}) with timeout: {} seconds. total: {}",
      26              :               path, timeout.count(), connections.load());
      27           48 : }
      28              : 
      29       767572 : client_connection::~client_connection() {
      30       767572 :     --connections;
      31      1535144 :     LOG_TRACE("releasing http client connection. total: {}", connections.load());
      32       767572 : }
      33              : 
      34          871 : awaitable<void> client_connection::ensure_connected(const http_request& request) {
      35              :     if (socket_->is_open()) {
      36              :         co_return;
      37              :     }
      38              : 
      39              :     LOG_TRACE("connecting to: {}:{}", request.get_host(), request.get_port());
      40              : 
      41              :     for (unsigned retry = 0; retry < MAX_RETRIES; ++retry) {
      42              :         boost::system::error_code ec;
      43              :         if (socket_path_.empty()) {
      44              :             ec = co_await socket_->connect(request.get_host(), request.get_port(), CONNECT_TIMEOUT);
      45              :         } else {
      46              :             auto unix_socket = std::static_pointer_cast<thinger::asio::unix_socket>(socket_);
      47              :             ec = co_await unix_socket->connect(socket_path_, CONNECT_TIMEOUT);
      48              :         }
      49              : 
      50              :         if (!ec) {
      51              :             LOG_TRACE("connection established");
      52              :             co_return;
      53              :         }
      54              : 
      55              :         LOG_ERROR("error while connecting (attempt #{}): {} ({}) ({})",
      56              :                   retry + 1, ec.message(), ec.value(), ec.category().name());
      57              : 
      58              :         // Don't retry for certain errors
      59              :         if (ec == boost::asio::error::operation_aborted ||
      60              :             ec == boost::asio::error::host_not_found) {
      61              :             co_return;
      62              :         }
      63              : 
      64              :         if (retry + 1 >= MAX_RETRIES) {
      65              :             co_return;
      66              :         }
      67              : 
      68              :         // Close and retry
      69              :         socket_->close();
      70              :     }
      71         1742 : }
      72              : 
      73          864 : awaitable<std::shared_ptr<http_response>> client_connection::read_response(bool head_request) {
      74              :     response_parser_.reset();
      75              : 
      76              :     while (true) {
      77              :         auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
      78              : 
      79              :         if (bytes == 0) {
      80              :             co_return nullptr;
      81              :         }
      82              : 
      83              :         boost::tribool result = response_parser_.parse(buffer_, buffer_ + bytes, head_request);
      84              : 
      85              :         if (result) {
      86              :             // Successfully parsed response
      87              :             auto response = response_parser_.consume_response();
      88              : 
      89              :             // Decompress if needed
      90              :             if (response && response->has_header("Content-Encoding")) {
      91              :                 std::string encoding = response->get_header("Content-Encoding");
      92              :                 if (encoding == "gzip") {
      93              :                     auto decompressed = ::thinger::util::gzip::decompress(response->get_content());
      94              :                     if (decompressed) {
      95              :                         response->set_content(std::move(*decompressed));
      96              :                         response->remove_header("Content-Encoding");
      97              :                     } else {
      98              :                         LOG_ERROR("Failed to decompress gzip response");
      99              :                     }
     100              :                 } else if (encoding == "deflate") {
     101              :                     auto decompressed = ::thinger::util::deflate::decompress(response->get_content());
     102              :                     if (decompressed) {
     103              :                         response->set_content(std::move(*decompressed));
     104              :                         response->remove_header("Content-Encoding");
     105              :                     } else {
     106              :                         LOG_ERROR("Failed to decompress deflate response");
     107              :                     }
     108              :                 }
     109              :             }
     110              : 
     111              :             co_return response;
     112              :         } else if (!result) {
     113              :             // Parse error
     114              :             co_return nullptr;
     115              :         }
     116              :         // else: indeterminate, keep reading
     117              :     }
     118         1728 : }
     119              : 
     120          869 : awaitable<std::shared_ptr<http_response>> client_connection::send_request(
     121              :     std::shared_ptr<http_request> request) {
     122              : 
     123              :     std::shared_ptr<http_response> response;
     124              : 
     125              :     // Timeout timer
     126              :     boost::asio::steady_timer timer(socket_->get_io_context());
     127              :     timer.expires_after(timeout_);
     128              : 
     129          869 :     auto do_request = [&]() -> awaitable<void> {
     130              :         co_await ensure_connected(*request);
     131              :         if (!socket_->is_open()) co_return;
     132              : 
     133              :         request->log("CLIENT->", 0);
     134              :         response_parser_.setOnChunked(request->get_chunked_callback());
     135              : 
     136              :         co_await request->to_socket(socket_);
     137              : 
     138              :         bool is_head = request->get_method() == http::method::HEAD;
     139              :         response = co_await read_response(is_head);
     140              : 
     141              :         if (response && !response->keep_alive()) {
     142              :             socket_->close();
     143              :         }
     144         1738 :     };
     145              : 
     146          869 :     auto do_timeout = [&]() -> awaitable<void> {
     147              :         auto [ec] = co_await timer.async_wait(use_nothrow_awaitable);
     148              :         if (!ec) {
     149              :             LOG_ERROR("Request timeout after {} seconds", timeout_.count());
     150              :             socket_->close();
     151              :         }
     152         1738 :     };
     153              : 
     154              :     co_await (do_request() || do_timeout());
     155              : 
     156              :     co_return response;
     157         1738 : }
     158              : 
     159            2 : awaitable<stream_result> client_connection::send_request_streaming(
     160              :     std::shared_ptr<http_request> request,
     161              :     stream_callback callback) {
     162              : 
     163              :     stream_result result;
     164              : 
     165              :     // Timeout timer
     166              :     boost::asio::steady_timer timer(socket_->get_io_context());
     167              :     timer.expires_after(timeout_);
     168              : 
     169            2 :     auto do_request = [&]() -> awaitable<void> {
     170              :         co_await ensure_connected(*request);
     171              : 
     172              :         if (!socket_->is_open()) {
     173              :             result.error = "Failed to connect";
     174              :             co_return;
     175              :         }
     176              : 
     177              :         request->log("CLIENT->", 0);
     178              :         response_parser_.reset();
     179              : 
     180              :         // Set up streaming callback
     181              :         response_parser_.setOnStreaming(
     182            2 :             [&callback, &result, this](
     183              :                 const std::string_view& data, size_t downloaded, size_t total) -> bool {
     184            2 :                 if (result.status_code == 0) {
     185            2 :                     result.status_code = response_parser_.get_status_code();
     186              :                 }
     187            2 :                 result.bytes_transferred = downloaded;
     188              : 
     189            2 :                 stream_info info{data, downloaded, total, result.status_code};
     190            4 :                 return callback(info);
     191              :             });
     192              : 
     193              :         co_await request->to_socket(socket_);
     194              : 
     195              :         // Read response with streaming
     196              :         while (true) {
     197              :             auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
     198              : 
     199              :             if (bytes == 0) {
     200              :                 result.error = "Connection closed";
     201              :                 co_return;
     202              :             }
     203              : 
     204              :             bool is_head = request->get_method() == http::method::HEAD;
     205              :             boost::tribool parse_result = response_parser_.parse(
     206              :                 buffer_, buffer_ + bytes, is_head);
     207              : 
     208              :             if (result.status_code == 0) {
     209              :                 result.status_code = response_parser_.get_status_code();
     210              :             }
     211              : 
     212              :             if (parse_result) {
     213              :                 auto response = response_parser_.consume_response();
     214              :                 if (response) {
     215              :                     result.status_code = response->get_status_code();
     216              :                     if (result.bytes_transferred == 0) {
     217              :                         result.bytes_transferred = response->get_content_size();
     218              :                     }
     219              :                     if (!response->keep_alive()) {
     220              :                         socket_->close();
     221              :                     }
     222              :                 }
     223              :                 co_return;
     224              :             } else if (!parse_result) {
     225              :                 result.error = "Parse error or download aborted";
     226              :                 co_return;
     227              :             }
     228              :         }
     229            4 :     };
     230              : 
     231            2 :     auto do_timeout = [&]() -> awaitable<void> {
     232              :         auto [ec] = co_await timer.async_wait(use_nothrow_awaitable);
     233              :         if (!ec) {
     234              :             LOG_ERROR("Streaming request timeout after {} seconds", timeout_.count());
     235              :             socket_->close();
     236              :             result.error = "Request timeout";
     237              :         }
     238            4 :     };
     239              : 
     240              :     co_await (do_request() || do_timeout());
     241              : 
     242              :     co_return result;
     243            4 : }
     244              : 
     245          115 : void client_connection::close() {
     246          115 :     if (socket_->is_open()) {
     247            0 :         socket_->close();
     248              :     }
     249          115 :     response_parser_.reset();
     250          115 : }
     251              : 
     252           38 : std::shared_ptr<thinger::asio::socket> client_connection::release_socket() {
     253           38 :     socket_->cancel();
     254           38 :     return socket_;
     255              : }
     256              : 
     257              : }
        

Generated by: LCOV version 2.0-1