Line data Source code
1 : #ifndef SSE_CONNECTION_HPP
2 : #define SSE_CONNECTION_HPP
3 :
4 : #include <memory>
5 : #include <queue>
6 : #include "../common/http_request.hpp"
7 : #include "../common/http_response.hpp"
8 : #include "../../asio/sockets/socket.hpp"
9 : #include "../data/out_string.hpp"
10 : #include "../../util/logger.hpp"
11 : #include "../../util/types.hpp"
12 :
13 : namespace thinger::http{
14 :
15 : class sse_connection : public std::enable_shared_from_this<sse_connection>, public boost::noncopyable{
16 :
17 : public:
18 :
19 : /**
20 : * Parameter for controlling the maximum number of pending messages stored
21 : * in the output queue.
22 : */
23 : static const int MAX_OUTPUT_MESSAGES = 100;
24 :
25 : /**
26 : * Parameter for controlling the number of live sse connections
27 : */
28 : static std::atomic<unsigned long> connections;
29 :
30 21 : sse_connection(std::shared_ptr<asio::socket> socket) :
31 21 : socket_(socket),
32 21 : timer_(socket->get_io_context()),
33 21 : idle_(false)
34 : {
35 21 : connections++;
36 21 : LOG_DEBUG("created sse connection total: {}", unsigned(connections));
37 21 : }
38 :
39 21 : virtual ~sse_connection()
40 21 : {
41 21 : connections--;
42 21 : LOG_DEBUG("releasing sse connection. total: {}", unsigned(connections));
43 21 : }
44 :
45 : private:
46 :
47 21 : void handle_timeout()
48 : {
49 21 : idle_ = true;
50 21 : timer_.expires_after(std::chrono::seconds(60));
51 21 : timer_.async_wait(
52 42 : [this, self = shared_from_this()](const boost::system::error_code& e){
53 : // if timer was not cancelled (just expired by itself)
54 15 : if(e != boost::asio::error::operation_aborted && !idle_){
55 0 : handle_timeout();
56 15 : }else if(idle_){
57 : // will terminate any pending async reads or writes
58 3 : socket_->close();
59 : }
60 15 : }
61 : );
62 21 : }
63 :
64 30 : void process_out_queue()
65 : {
66 30 : if(writing_) return;
67 15 : if(out_queue_.empty()) return;
68 :
69 15 : writing_ = true;
70 :
71 : // Spawn a coroutine to handle the write
72 15 : co_spawn(socket_->get_io_context(),
73 45 : [this, self = shared_from_this()]() -> awaitable<void> {
74 : while(!out_queue_.empty()) {
75 : const auto& data = out_queue_.front();
76 : std::vector<boost::asio::const_buffer> buffers;
77 : buffers.push_back(boost::asio::buffer(data.first));
78 : buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator));
79 : buffers.push_back(boost::asio::buffer(data.second));
80 : buffers.push_back(boost::asio::buffer(misc_strings::lf));
81 :
82 : if(data.first == "data"){
83 : buffers.push_back(boost::asio::buffer(misc_strings::lf));
84 : }
85 :
86 : auto [write_ec, write_bytes] = co_await socket_->write(buffers);
87 : if (write_ec) break;
88 : idle_ = false;
89 : out_queue_.pop();
90 : }
91 : writing_ = false;
92 30 : },
93 : detached);
94 : }
95 :
96 : public:
97 :
98 21 : void start(){
99 21 : handle_timeout();
100 21 : }
101 :
102 18 : void stop(){
103 18 : boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this()](){
104 18 : timer_.cancel();
105 18 : socket_->close();
106 18 : });
107 18 : }
108 :
109 6 : void send_retry(unsigned long millis){
110 18 : handle_write("retry", boost::lexical_cast<std::string>(millis));
111 6 : }
112 :
113 6 : void send_event(const std::string& event_name){
114 6 : handle_write("event", event_name);
115 6 : }
116 :
117 18 : void send_data(const std::string& data){
118 18 : handle_write("data", data);
119 18 : }
120 :
121 30 : void handle_write(const std::string& type, const std::string& value){
122 30 : boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this(), type, value](){
123 30 : if(out_queue_.size()<=MAX_OUTPUT_MESSAGES){
124 30 : out_queue_.push(std::make_pair(type, value));
125 30 : process_out_queue();
126 : }
127 30 : });
128 30 : }
129 :
130 : private:
131 : /// Socket being used HTTP connection
132 : std::shared_ptr<asio::socket> socket_;
133 :
134 : /// Out queue
135 : std::queue<std::pair<std::string, std::string>> out_queue_;
136 :
137 : bool writing_ = false;
138 :
139 : /// Timer used for controlling HTTP timeout
140 : boost::asio::steady_timer timer_;
141 :
142 : bool idle_;
143 : };
144 :
145 : }
146 :
147 : #endif
|