LCOV - code coverage report
Current view: top level - http/server - server_connection.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 77.8 % 90 70
Test Date: 2026-04-21 17:49:55 Functions: 71.4 % 21 15

            Line data    Source code
       1              : #include "server_connection.hpp"
       2              : #include "request.hpp"
       3              : #include "../../util/logger.hpp"
       4              : 
       5              : namespace thinger::http {
       6              : 
       7              : std::atomic<unsigned long> server_connection::connections(0);
       8              : 
       9          992 : server_connection::server_connection(std::shared_ptr<asio::socket> socket)
      10          992 :     : socket_(std::move(socket))
      11         1984 :     , timeout_timer_(socket_->get_io_context()) {
      12          992 :     ++connections;
      13          992 :     LOG_DEBUG("created http server connection total: {}", static_cast<unsigned>(connections));
      14          992 : }
      15              : 
      16          992 : server_connection::~server_connection() {
      17          992 :     --connections;
      18          992 :     LOG_DEBUG("releasing http server connection. total: {}", static_cast<unsigned>(connections));
      19          992 : }
      20              : 
      21          992 : void server_connection::start(std::chrono::seconds timeout) {
      22          992 :     if (running_) return;
      23          992 :     running_ = true;
      24          992 :     timeout_ = timeout;
      25              : 
      26              :     // Start timeout timer
      27          992 :     reset_timeout();
      28              : 
      29              :     // Spawn the read loop coroutine
      30          992 :     co_spawn(socket_->get_io_context(),
      31         2976 :         [self = shared_from_this()]() -> awaitable<void> {
      32              :             co_await self->read_loop();
      33         1984 :         },
      34              :         detached);
      35              : }
      36              : 
      37         4699 : void server_connection::reset_timeout() {
      38         4699 :     timeout_timer_.expires_after(timeout_);
      39         4699 :     timeout_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec) {
      40         4688 :         if (ec) return; // Timer was cancelled
      41              : 
      42            4 :         LOG_DEBUG("http server connection timed out after {} seconds", timeout_.count());
      43            4 :         close();
      44              :     });
      45         4699 : }
      46              : 
      47           40 : void server_connection::close() {
      48           40 :     running_ = false;
      49           40 :     timeout_timer_.cancel();
      50           40 :     socket_->close();
      51           40 : }
      52              : 
      53          992 : awaitable<void> server_connection::read_loop() {
      54              :     auto self = shared_from_this();
      55              : 
      56              :     // Parse headers only; body reading is managed by the handler layer
      57              :     request_parser_.set_headers_only(true);
      58              : 
      59              :     size_t buffered = 0; // bytes of valid data in buffer_
      60              : 
      61              :     while (running_ && socket_->is_open()) {
      62              :         // If no buffered data, read from socket
      63              :         if (buffered == 0) {
      64              :             auto [ec, bytes] = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
      65              :             if (ec) break;
      66              :             reset_timeout();
      67              :             buffered = bytes;
      68              :         }
      69              : 
      70              :         // Parse available data
      71              :         uint8_t* begin = buffer_;
      72              :         uint8_t* end = buffer_ + buffered;
      73              :         boost::tribool result = request_parser_.parse(begin, end);
      74              :         size_t unconsumed = static_cast<size_t>(end - begin);
      75              : 
      76              :         if (result) {
      77              :             // Successfully parsed headers
      78              :             auto http_req = request_parser_.consume_request();
      79              :             http_req->set_ssl(socket_->is_secure());
      80              : 
      81              :             size_t content_length = http_req->get_content_length();
      82              : 
      83              :             auto stream = std::make_shared<http_stream>(++request_id_, http_req->keep_alive());
      84              : 
      85              :             // Add to queue for pipelining
      86              :             {
      87              :                 std::lock_guard<std::mutex> lock(queue_mutex_);
      88              :                 request_queue_.push(stream);
      89              :             }
      90              : 
      91              :             // Log the request
      92              :             http_req->log("SERVER REQUEST", 0);
      93              : 
      94              :             // Create request and store read-ahead data
      95              :             auto req = std::make_shared<request>(self, stream, http_req);
      96              :             if (unconsumed > 0) {
      97              :                 req->set_read_ahead(begin, unconsumed);
      98              :             }
      99              : 
     100              :             // Dispatch to handler (awaitable — handler decides body reading strategy)
     101              :             if (handler_) {
     102              :                 co_await handler_(req);
     103              :                 reset_timeout();
     104              :             }
     105              : 
     106              :             // After handler completes, compute leftover for pipelining.
     107              :             // The request tracks how many read-ahead bytes remain unconsumed.
     108              :             // Any leftover read-ahead bytes are pipelined data for the next request.
     109              :             size_t remaining_ahead = req->read_ahead_available();
     110              :             if (remaining_ahead > 0) {
     111              :                 // Copy residual read-ahead back to buffer_ for the next iteration
     112              :                 size_t ahead_start = unconsumed - remaining_ahead;
     113              :                 std::memmove(buffer_, begin + ahead_start, remaining_ahead);
     114              :                 buffered = remaining_ahead;
     115              :             } else {
     116              :                 buffered = 0;
     117              :             }
     118              : 
     119              :             // If not keep-alive, stop reading after this request
     120              :             if (!stream->keep_alive()) {
     121              :                 break;
     122              :             }
     123              :         } else if (!result) {
     124              :             // Bad request
     125              :             LOG_ERROR("invalid http request");
     126              :             auto stream = std::make_shared<http_stream>(++request_id_, false);
     127              :             {
     128              :                 std::lock_guard<std::mutex> lock(queue_mutex_);
     129              :                 request_queue_.push(stream);
     130              :             }
     131              :             handle_stock_error(stream, http_response::status::bad_request);
     132              :             break;
     133              :         } else {
     134              :             // Indeterminate — all data consumed by parser, need more
     135              :             buffered = 0;
     136              :         }
     137              :     }
     138              : 
     139              :     // Connection ended
     140              :     running_ = false;
     141              :     timeout_timer_.cancel();
     142         1984 : }
     143              : 
     144         1023 : awaitable<void> server_connection::write_frame(std::shared_ptr<http_stream> stream,
     145              :                                                 std::shared_ptr<http_frame> frame) {
     146              :     // Log response
     147              :     frame->log("SERVER RESPONSE", 0);
     148              : 
     149              :     // Write frame to socket
     150              :     co_await frame->to_socket(socket_);
     151              : 
     152              :     // Reset timeout on activity
     153              :     reset_timeout();
     154              : 
     155              :     // Check if stream is complete
     156              :     if (frame->end_stream()) {
     157              :         stream->completed();
     158              : 
     159              :         if (!stream->keep_alive()) {
     160              :             close();
     161              :         } else {
     162              :             // Remove completed stream from queue
     163              :             std::lock_guard<std::mutex> lock(queue_mutex_);
     164              :             if (!request_queue_.empty()) {
     165              :                 request_queue_.pop();
     166              :             }
     167              :         }
     168              :     }
     169         2046 : }
     170              : 
     171         2040 : void server_connection::process_output_queue() {
     172         3039 :     if (writing_) return;
     173              : 
     174         2022 :     std::shared_ptr<http_stream> stream;
     175         2022 :     std::shared_ptr<http_frame> frame;
     176              : 
     177              :     {
     178         2022 :         std::lock_guard<std::mutex> lock(queue_mutex_);
     179         2022 :         if (request_queue_.empty()) return;
     180              : 
     181         1059 :         stream = request_queue_.front();
     182         1059 :         if (stream->empty_queue()) return;
     183              : 
     184         1023 :         frame = stream->current_frame();
     185         1023 :         stream->pop_frame();
     186         2022 :     }
     187              : 
     188         1023 :     writing_ = true;
     189              : 
     190         1023 :     co_spawn(socket_->get_io_context(),
     191         3069 :         [this, self = shared_from_this(), stream, frame]() -> awaitable<void> {
     192              :             co_await write_frame(stream, frame);
     193              :             writing_ = false;
     194              : 
     195              :             // Process more frames if available
     196              :             process_output_queue();
     197         2046 :         },
     198              :         detached);
     199         3021 : }
     200              : 
     201         1023 : void server_connection::handle_stream(std::shared_ptr<http_stream> stream,
     202              :                                        std::shared_ptr<http_frame> frame) {
     203         1023 :     boost::asio::dispatch(socket_->get_io_context(),
     204         2046 :         [this, self = shared_from_this(), stream, frame] {
     205         1023 :             stream->add_frame(frame);
     206              : 
     207         1023 :             std::shared_ptr<http_stream> front_stream;
     208              :             {
     209         1023 :                 std::lock_guard<std::mutex> lock(queue_mutex_);
     210         1023 :                 if (request_queue_.empty()) {
     211            0 :                     LOG_ERROR("trying to send response without a pending request!");
     212            0 :                     return;
     213              :                 }
     214         1023 :                 front_stream = request_queue_.front();
     215         1023 :             }
     216              : 
     217              :             // Only process if this is the front stream (for pipelining order)
     218         1023 :             if (front_stream->id() == stream->id()) {
     219         1017 :                 process_output_queue();
     220              :             }
     221         1023 :         });
     222         1023 : }
     223              : 
     224            0 : void server_connection::handle_stock_error(std::shared_ptr<http_stream> stream,
     225              :                                             http_response::status status) {
     226            0 :     auto http_error = http_response::stock_http_reply(status);
     227            0 :     http_error->set_keep_alive(stream->keep_alive());
     228            0 :     handle_stream(stream, http_error);
     229            0 : }
     230              : 
     231           59 : std::shared_ptr<asio::socket> server_connection::release_socket() {
     232           59 :     running_ = false;
     233           59 :     socket_->cancel();
     234           59 :     timeout_timer_.cancel();
     235           59 :     return socket_;
     236              : }
     237              : 
     238            0 : void server_connection::release() {
     239            0 :     boost::asio::dispatch(socket_->get_io_context(),
     240            0 :         [this, self = shared_from_this()] {
     241            0 :             close();
     242            0 :         });
     243            0 : }
     244              : 
     245          364 : std::shared_ptr<asio::socket> server_connection::get_socket() {
     246          364 :     return socket_;
     247              : }
     248              : 
     249            0 : void server_connection::update_connection_timeout(std::chrono::seconds timeout) {
     250            0 :     boost::asio::dispatch(socket_->get_io_context(),
     251            0 :         [this, self = shared_from_this(), timeout] {
     252            0 :             timeout_ = timeout;
     253            0 :             reset_timeout();
     254            0 :         });
     255            0 : }
     256              : 
     257              : }
        

Generated by: LCOV version 2.0-1