LCOV - code coverage report
Current view: top level - http/server - server_connection.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 77.8 % 90 70
Test Date: 2026-02-20 15:38:22 Functions: 71.4 % 21 15

            Line data    Source code
       1              : #include "server_connection.hpp"
       2              : #include "request.hpp"
       3              : #include "../../util/logger.hpp"
       4              : 
       5              : namespace thinger::http {
       6              : 
       7              : std::atomic<unsigned long> server_connection::connections(0);
       8              : 
       9          846 : server_connection::server_connection(std::shared_ptr<asio::socket> socket)
      10          846 :     : socket_(std::move(socket))
      11         1692 :     , timeout_timer_(socket_->get_io_context()) {
      12          846 :     ++connections;
      13          846 :     LOG_DEBUG("created http server connection total: {}", static_cast<unsigned>(connections));
      14          846 : }
      15              : 
      16          846 : server_connection::~server_connection() {
      17          846 :     --connections;
      18          846 :     LOG_DEBUG("releasing http server connection. total: {}", static_cast<unsigned>(connections));
      19          846 : }
      20              : 
      21          846 : void server_connection::start(std::chrono::seconds timeout) {
      22          846 :     if (running_) return;
      23          846 :     running_ = true;
      24          846 :     timeout_ = timeout;
      25              : 
      26              :     // Start timeout timer
      27          846 :     reset_timeout();
      28              : 
      29              :     // Spawn the read loop coroutine
      30          846 :     co_spawn(socket_->get_io_context(),
      31         2538 :         [self = shared_from_this()]() -> awaitable<void> {
      32              :             co_await self->read_loop();
      33         1692 :         },
      34              :         detached);
      35              : }
      36              : 
      37         4874 : void server_connection::reset_timeout() {
      38         4874 :     timeout_timer_.expires_after(timeout_);
      39         4874 :     timeout_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec) {
      40         4086 :         if (ec) return; // Timer was cancelled
      41              : 
      42            4 :         LOG_DEBUG("http server connection timed out after {} seconds", timeout_.count());
      43            4 :         close();
      44              :     });
      45         4874 : }
      46              : 
      47           22 : void server_connection::close() {
      48           22 :     running_ = false;
      49           22 :     timeout_timer_.cancel();
      50           22 :     socket_->close();
      51           22 : }
      52              : 
      53          846 : awaitable<void> server_connection::read_loop() {
      54              :     auto self = shared_from_this();
      55              : 
      56              :     // Parse headers only; body reading is managed by the handler layer
      57              :     request_parser_.set_headers_only(true);
      58              : 
      59              :     size_t buffered = 0; // bytes of valid data in buffer_
      60              : 
      61              :     try {
      62              :         while (running_ && socket_->is_open()) {
      63              :             // If no buffered data, read from socket
      64              :             if (buffered == 0) {
      65              :                 auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
      66              :                 reset_timeout();
      67              :                 buffered = bytes;
      68              :             }
      69              : 
      70              :             // Parse available data
      71              :             uint8_t* begin = buffer_;
      72              :             uint8_t* end = buffer_ + buffered;
      73              :             boost::tribool result = request_parser_.parse(begin, end);
      74              :             size_t unconsumed = static_cast<size_t>(end - begin);
      75              : 
      76              :             if (result) {
      77              :                 // Successfully parsed headers
      78              :                 auto http_req = request_parser_.consume_request();
      79              :                 http_req->set_ssl(socket_->is_secure());
      80              : 
      81              :                 size_t content_length = http_req->get_content_length();
      82              : 
      83              :                 auto stream = std::make_shared<http_stream>(++request_id_, http_req->keep_alive());
      84              : 
      85              :                 // Add to queue for pipelining
      86              :                 {
      87              :                     std::lock_guard<std::mutex> lock(queue_mutex_);
      88              :                     request_queue_.push(stream);
      89              :                 }
      90              : 
      91              :                 // Log the request
      92              :                 http_req->log("SERVER REQUEST", 0);
      93              : 
      94              :                 // Create request and store read-ahead data
      95              :                 auto req = std::make_shared<request>(self, stream, http_req);
      96              :                 if (unconsumed > 0) {
      97              :                     req->set_read_ahead(begin, unconsumed);
      98              :                 }
      99              : 
     100              :                 // Dispatch to handler (awaitable — handler decides body reading strategy)
     101              :                 if (handler_) {
     102              :                     co_await handler_(req);
     103              :                     reset_timeout();
     104              :                 }
     105              : 
     106              :                 // After handler completes, compute leftover for pipelining.
     107              :                 // The request tracks how many read-ahead bytes remain unconsumed.
     108              :                 // Any leftover read-ahead bytes are pipelined data for the next request.
     109              :                 size_t remaining_ahead = req->read_ahead_available();
     110              :                 if (remaining_ahead > 0) {
     111              :                     // Copy residual read-ahead back to buffer_ for the next iteration
     112              :                     size_t ahead_start = unconsumed - remaining_ahead;
     113              :                     std::memmove(buffer_, begin + ahead_start, remaining_ahead);
     114              :                     buffered = remaining_ahead;
     115              :                 } else {
     116              :                     buffered = 0;
     117              :                 }
     118              : 
     119              :                 // If not keep-alive, stop reading after this request
     120              :                 if (!stream->keep_alive()) {
     121              :                     break;
     122              :                 }
     123              :             } else if (!result) {
     124              :                 // Bad request
     125              :                 LOG_ERROR("invalid http request");
     126              :                 auto stream = std::make_shared<http_stream>(++request_id_, false);
     127              :                 {
     128              :                     std::lock_guard<std::mutex> lock(queue_mutex_);
     129              :                     request_queue_.push(stream);
     130              :                 }
     131              :                 handle_stock_error(stream, http_response::status::bad_request);
     132              :                 break;
     133              :             } else {
     134              :                 // Indeterminate — all data consumed by parser, need more
     135              :                 buffered = 0;
     136              :             }
     137              :         }
     138              :     } catch (const boost::system::system_error& e) {
     139              :         if (e.code() != boost::asio::error::operation_aborted &&
     140              :             e.code() != boost::asio::error::eof) {
     141              :             LOG_ERROR("Error in server connection read loop: {}", e.what());
     142              :         }
     143              :     }
     144              : 
     145              :     // Connection ended
     146              :     running_ = false;
     147              :     timeout_timer_.cancel();
     148         1692 : }
     149              : 
     150          886 : awaitable<void> server_connection::write_frame(std::shared_ptr<http_stream> stream,
     151              :                                                 std::shared_ptr<http_frame> frame) {
     152              :     // Log response
     153              :     frame->log("SERVER RESPONSE", 0);
     154              : 
     155              :     // Write frame to socket
     156              :     co_await frame->to_socket(socket_);
     157              : 
     158              :     // Reset timeout on activity
     159              :     reset_timeout();
     160              : 
     161              :     // Check if stream is complete
     162              :     if (frame->end_stream()) {
     163              :         stream->completed();
     164              : 
     165              :         if (!stream->keep_alive()) {
     166              :             close();
     167              :         } else {
     168              :             // Remove completed stream from queue
     169              :             std::lock_guard<std::mutex> lock(queue_mutex_);
     170              :             if (!request_queue_.empty()) {
     171              :                 request_queue_.pop();
     172              :             }
     173              :         }
     174              :     }
     175         1772 : }
     176              : 
     177         1766 : void server_connection::process_output_queue() {
     178         2628 :     if (writing_) return;
     179              : 
     180         1748 :     std::shared_ptr<http_stream> stream;
     181         1748 :     std::shared_ptr<http_frame> frame;
     182              : 
     183              :     {
     184         1748 :         std::lock_guard<std::mutex> lock(queue_mutex_);
     185         1748 :         if (request_queue_.empty()) return;
     186              : 
     187          904 :         stream = request_queue_.front();
     188          904 :         if (stream->empty_queue()) return;
     189              : 
     190          886 :         frame = stream->current_frame();
     191          886 :         stream->pop_frame();
     192         1748 :     }
     193              : 
     194          886 :     writing_ = true;
     195              : 
     196          886 :     co_spawn(socket_->get_io_context(),
     197         2658 :         [this, self = shared_from_this(), stream, frame]() -> awaitable<void> {
     198              :             try {
     199              :                 co_await write_frame(stream, frame);
     200              :             } catch (const boost::system::system_error& e) {
     201              :                 LOG_ERROR("Error writing frame: {}", e.what());
     202              :                 close();
     203              :             }
     204              :             writing_ = false;
     205              : 
     206              :             // Process more frames if available
     207              :             process_output_queue();
     208         1772 :         },
     209              :         detached);
     210         2610 : }
     211              : 
     212          886 : void server_connection::handle_stream(std::shared_ptr<http_stream> stream,
     213              :                                        std::shared_ptr<http_frame> frame) {
     214          886 :     boost::asio::dispatch(socket_->get_io_context(),
     215         1772 :         [this, self = shared_from_this(), stream, frame] {
     216          886 :             stream->add_frame(frame);
     217              : 
     218          886 :             std::shared_ptr<http_stream> front_stream;
     219              :             {
     220          886 :                 std::lock_guard<std::mutex> lock(queue_mutex_);
     221          886 :                 if (request_queue_.empty()) {
     222            0 :                     LOG_ERROR("trying to send response without a pending request!");
     223            0 :                     return;
     224              :                 }
     225          886 :                 front_stream = request_queue_.front();
     226          886 :             }
     227              : 
     228              :             // Only process if this is the front stream (for pipelining order)
     229          886 :             if (front_stream->id() == stream->id()) {
     230          880 :                 process_output_queue();
     231              :             }
     232          886 :         });
     233          886 : }
     234              : 
     235            0 : void server_connection::handle_stock_error(std::shared_ptr<http_stream> stream,
     236              :                                             http_response::status status) {
     237            0 :     auto http_error = http_response::stock_http_reply(status);
     238            0 :     http_error->set_keep_alive(stream->keep_alive());
     239            0 :     handle_stream(stream, http_error);
     240            0 : }
     241              : 
     242           38 : std::shared_ptr<asio::socket> server_connection::release_socket() {
     243           38 :     running_ = false;
     244           38 :     socket_->cancel();
     245           38 :     timeout_timer_.cancel();
     246           38 :     return socket_;
     247              : }
     248              : 
     249            0 : void server_connection::release() {
     250            0 :     boost::asio::dispatch(socket_->get_io_context(),
     251            0 :         [this, self = shared_from_this()] {
     252            0 :             close();
     253            0 :         });
     254            0 : }
     255              : 
     256          364 : std::shared_ptr<asio::socket> server_connection::get_socket() {
     257          364 :     return socket_;
     258              : }
     259              : 
     260            0 : void server_connection::update_connection_timeout(std::chrono::seconds timeout) {
     261            0 :     boost::asio::dispatch(socket_->get_io_context(),
     262            0 :         [this, self = shared_from_this(), timeout] {
     263            0 :             timeout_ = timeout;
     264            0 :             reset_timeout();
     265            0 :         });
     266            0 : }
     267              : 
     268              : }
        

Generated by: LCOV version 2.0-1