LCOV - code coverage report
Current view: top level - http/server - sse_connection.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 98.2 % 55 54
Test Date: 2026-04-21 17:49:55 Functions: 93.3 % 15 14

            Line data    Source code
       1              : #ifndef SSE_CONNECTION_HPP
       2              : #define SSE_CONNECTION_HPP
       3              : 
       4              : #include <memory>
       5              : #include <queue>
       6              : #include "../common/http_request.hpp"
       7              : #include "../common/http_response.hpp"
       8              : #include "../../asio/sockets/socket.hpp"
       9              : #include "../data/out_string.hpp"
      10              : #include "../../util/logger.hpp"
      11              : #include "../../util/types.hpp"
      12              : 
      13              : namespace thinger::http{
      14              : 
      15              : class sse_connection : public std::enable_shared_from_this<sse_connection>, public boost::noncopyable{
      16              : 
      17              : public:
      18              : 
      19              :     /**
      20              :      * Parameter for controlling the maximum number of pending messages stored
      21              :      * in the output queue.
      22              :      */
      23              :     static const int MAX_OUTPUT_MESSAGES = 100;
      24              : 
      25              :     /**
      26              :      * Parameter for controlling the number of live sse connections
      27              :      */
      28              :     static std::atomic<unsigned long> connections;
      29              : 
      30           21 :     sse_connection(std::shared_ptr<asio::socket> socket) :
      31           21 :             socket_(socket),
      32           21 :             timer_(socket->get_io_context()),
      33           21 :             idle_(false)
      34              :     {
      35           21 :         connections++;
      36           21 :         LOG_DEBUG("created sse connection total: {}", unsigned(connections));
      37           21 :     }
      38              : 
      39           21 :     virtual ~sse_connection()
      40           21 :     {
      41           21 :         connections--;
      42           21 :         LOG_DEBUG("releasing sse connection. total: {}", unsigned(connections));
      43           21 :     }
      44              : 
      45              : private:
      46              : 
      47           21 :     void handle_timeout()
      48              :     {
      49           21 :         idle_ = true;
      50           21 :         timer_.expires_after(std::chrono::seconds(60));
      51           21 :         timer_.async_wait(
      52           42 :             [this, self = shared_from_this()](const boost::system::error_code& e){
      53              :                 // if timer was not cancelled (just expired by itself)
      54           15 :                 if(e != boost::asio::error::operation_aborted && !idle_){
      55            0 :                     handle_timeout();
      56           15 :                 }else if(idle_){
      57              :                     // will terminate any pending async reads or writes
      58            3 :                     socket_->close();
      59              :                 }
      60           15 :             }
      61              :         );
      62           21 :     }
      63              : 
      64           30 :     void process_out_queue()
      65              :     {
      66           30 :         if(writing_) return;
      67           15 :         if(out_queue_.empty()) return;
      68              : 
      69           15 :         writing_ = true;
      70              : 
      71              :         // Spawn a coroutine to handle the write
      72           15 :         co_spawn(socket_->get_io_context(),
      73           45 :             [this, self = shared_from_this()]() -> awaitable<void> {
      74              :                     while(!out_queue_.empty()) {
      75              :                         const auto& data = out_queue_.front();
      76              :                         std::vector<boost::asio::const_buffer> buffers;
      77              :                         buffers.push_back(boost::asio::buffer(data.first));
      78              :                         buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator));
      79              :                         buffers.push_back(boost::asio::buffer(data.second));
      80              :                         buffers.push_back(boost::asio::buffer(misc_strings::lf));
      81              : 
      82              :                         if(data.first == "data"){
      83              :                             buffers.push_back(boost::asio::buffer(misc_strings::lf));
      84              :                         }
      85              : 
      86              :                         auto [write_ec, write_bytes] = co_await socket_->write(buffers);
      87              :                         if (write_ec) break;
      88              :                         idle_ = false;
      89              :                         out_queue_.pop();
      90              :                     }
      91              :                     writing_ = false;
      92           30 :             },
      93              :             detached);
      94              :     }
      95              : 
      96              : public:
      97              : 
      98           21 :     void start(){
      99           21 :         handle_timeout();
     100           21 :     }
     101              : 
     102           18 :     void stop(){
     103           18 :         boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this()](){
     104           18 :             timer_.cancel();
     105           18 :             socket_->close();
     106           18 :         });
     107           18 :     }
     108              : 
     109            6 :     void send_retry(unsigned long millis){
     110           18 :         handle_write("retry", boost::lexical_cast<std::string>(millis));
     111            6 :     }
     112              : 
     113            6 :     void send_event(const std::string& event_name){
     114            6 :         handle_write("event", event_name);
     115            6 :     }
     116              : 
     117           18 :     void send_data(const std::string& data){
     118           18 :         handle_write("data", data);
     119           18 :     }
     120              : 
     121           30 :     void handle_write(const std::string& type, const std::string& value){
     122           30 :          boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this(), type, value](){
     123           30 :             if(out_queue_.size()<=MAX_OUTPUT_MESSAGES){
     124           30 :                 out_queue_.push(std::make_pair(type, value));
     125           30 :                 process_out_queue();
     126              :             }
     127           30 :         });
     128           30 :     }
     129              : 
     130              : private:
     131              :     /// Socket being used HTTP connection
     132              :     std::shared_ptr<asio::socket> socket_;
     133              : 
     134              :     /// Out queue
     135              :     std::queue<std::pair<std::string, std::string>> out_queue_;
     136              : 
     137              :     bool writing_ = false;
     138              : 
     139              :     /// Timer used for controlling HTTP timeout
     140              :     boost::asio::steady_timer timer_;
     141              : 
     142              :     bool idle_;
     143              : };
     144              : 
     145              : }
     146              : 
     147              : #endif
        

Generated by: LCOV version 2.0-1