LCOV - code coverage report
Current view: top level - http/server - sse_connection.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 28 0
Test Date: 2026-02-20 15:38:22 Functions: 0.0 % 7 0

            Line data    Source code
       1              : #ifndef SSE_CONNECTION_HPP
       2              : #define SSE_CONNECTION_HPP
       3              : 
       4              : #include <memory>
       5              : #include <queue>
       6              : #include "../common/http_request.hpp"
       7              : #include "../common/http_response.hpp"
       8              : #include "../../asio/sockets/socket.hpp"
       9              : #include "../data/out_string.hpp"
      10              : #include "../../util/logger.hpp"
      11              : #include "../../util/types.hpp"
      12              : 
      13              : namespace thinger::http{
      14              : 
      15              : class sse_connection : public std::enable_shared_from_this<sse_connection>, public boost::noncopyable{
      16              : 
      17              : public:
      18              : 
      19              :     /**
      20              :      * Parameter for controlling the maximum number of pending messages stored
      21              :      * in the output queue.
      22              :      */
      23              :     static const int MAX_OUTPUT_MESSAGES = 100;
      24              : 
      25              :     /**
      26              :      * Parameter for controlling the number of live sse connections
      27              :      */
      28              :     static std::atomic<unsigned long> connections;
      29              : 
      30            0 :     sse_connection(std::shared_ptr<asio::socket> socket) :
      31            0 :             socket_(socket),
      32            0 :             timer_(socket->get_io_context()),
      33            0 :             idle_(false)
      34              :     {
      35            0 :         connections++;
      36            0 :         LOG_DEBUG("created sse connection total: {}", unsigned(connections));
      37            0 :     }
      38              : 
      39            0 :     virtual ~sse_connection()
      40            0 :     {
      41            0 :         connections--;
      42            0 :         LOG_DEBUG("releasing sse connection. total: {}", unsigned(connections));
      43            0 :     }
      44              : 
      45              : private:
      46              : 
      47            0 :     void handle_timeout()
      48              :     {
      49            0 :         idle_ = true;
      50            0 :         timer_.expires_after(std::chrono::seconds(60));
      51            0 :         timer_.async_wait(
      52            0 :             [this, self = shared_from_this()](const boost::system::error_code& e){
      53              :                 // if timer was not cancelled (just expired by itself)
      54            0 :                 if(e != boost::asio::error::operation_aborted && !idle_){
      55            0 :                     handle_timeout();
      56            0 :                 }else if(idle_){
      57              :                     // will terminate any pending async reads or writes
      58            0 :                     socket_->close();
      59              :                 }
      60            0 :             }
      61              :         );
      62            0 :     }
      63              : 
      64              :     void process_out_queue()
      65              :     {
      66              :         if(writing_) return;
      67              :         if(out_queue_.empty()) return;
      68              : 
      69              :         writing_ = true;
      70              : 
      71              :         // Spawn a coroutine to handle the write
      72              :         co_spawn(socket_->get_io_context(),
      73            0 :             [this, self = shared_from_this()]() -> awaitable<void> {
      74              :                 try {
      75              :                     while(!out_queue_.empty()) {
      76              :                         const auto& data = out_queue_.front();
      77              :                         std::vector<boost::asio::const_buffer> buffers;
      78              :                         buffers.push_back(boost::asio::buffer(data.first));
      79              :                         buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator));
      80              :                         buffers.push_back(boost::asio::buffer(data.second));
      81              :                         buffers.push_back(boost::asio::buffer(misc_strings::lf));
      82              : 
      83              :                         if(data.first == "data"){
      84              :                             buffers.push_back(boost::asio::buffer(misc_strings::lf));
      85              :                         }
      86              : 
      87              :                         co_await socket_->write(buffers);
      88              :                         idle_ = false;
      89              :                         out_queue_.pop();
      90              :                     }
      91              :                     writing_ = false;
      92              :                 } catch (const boost::system::system_error& e) {
      93              :                     timer_.cancel();
      94              :                     writing_ = false;
      95              :                 }
      96            0 :             },
      97              :             detached);
      98              :     }
      99              : 
     100              : public:
     101              : 
     102            0 :     void start(){
     103            0 :         handle_timeout();
     104            0 :     }
     105              : 
     106              :     void stop(){
     107              :         boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this()](){
     108              :             timer_.cancel();
     109              :             socket_->close();
     110              :         });
     111              :     }
     112              : 
     113              :     void send_retry(unsigned long millis){
     114              :         handle_write("retry", boost::lexical_cast<std::string>(millis));
     115              :     }
     116              : 
     117              :     void send_event(const std::string& event_name){
     118              :         handle_write("event", event_name);
     119              :     }
     120              : 
     121              :     void send_data(const std::string& data){
     122              :         handle_write("data", data);
     123              :     }
     124              : 
     125              :     void handle_write(const std::string& type, const std::string& value){
     126              :          boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this(), type, value](){
     127              :             if(out_queue_.size()<=MAX_OUTPUT_MESSAGES){
     128              :                 out_queue_.push(std::make_pair(type, value));
     129              :                 process_out_queue();
     130              :             }
     131              :         });
     132              :     }
     133              : 
     134              : private:
     135              :     /// Socket being used HTTP connection
     136              :     std::shared_ptr<asio::socket> socket_;
     137              : 
     138              :     /// Out queue
     139              :     std::queue<std::pair<std::string, std::string>> out_queue_;
     140              : 
     141              :     bool writing_ = false;
     142              : 
     143              :     /// Timer used for controlling HTTP timeout
     144              :     boost::asio::steady_timer timer_;
     145              : 
     146              :     bool idle_;
     147              : };
     148              : 
     149              : }
     150              : 
     151              : #endif
        

Generated by: LCOV version 2.0-1