Line data Source code
1 : #ifndef SSE_CONNECTION_HPP
2 : #define SSE_CONNECTION_HPP
3 :
4 : #include <memory>
5 : #include <queue>
6 : #include "../common/http_request.hpp"
7 : #include "../common/http_response.hpp"
8 : #include "../../asio/sockets/socket.hpp"
9 : #include "../data/out_string.hpp"
10 : #include "../../util/logger.hpp"
11 : #include "../../util/types.hpp"
12 :
13 : namespace thinger::http{
14 :
15 : class sse_connection : public std::enable_shared_from_this<sse_connection>, public boost::noncopyable{
16 :
17 : public:
18 :
19 : /**
20 : * Parameter for controlling the maximum number of pending messages stored
21 : * in the output queue.
22 : */
23 : static const int MAX_OUTPUT_MESSAGES = 100;
24 :
25 : /**
26 : * Parameter for controlling the number of live sse connections
27 : */
28 : static std::atomic<unsigned long> connections;
29 :
30 0 : sse_connection(std::shared_ptr<asio::socket> socket) :
31 0 : socket_(socket),
32 0 : timer_(socket->get_io_context()),
33 0 : idle_(false)
34 : {
35 0 : connections++;
36 0 : LOG_DEBUG("created sse connection total: {}", unsigned(connections));
37 0 : }
38 :
39 0 : virtual ~sse_connection()
40 0 : {
41 0 : connections--;
42 0 : LOG_DEBUG("releasing sse connection. total: {}", unsigned(connections));
43 0 : }
44 :
45 : private:
46 :
47 0 : void handle_timeout()
48 : {
49 0 : idle_ = true;
50 0 : timer_.expires_after(std::chrono::seconds(60));
51 0 : timer_.async_wait(
52 0 : [this, self = shared_from_this()](const boost::system::error_code& e){
53 : // if timer was not cancelled (just expired by itself)
54 0 : if(e != boost::asio::error::operation_aborted && !idle_){
55 0 : handle_timeout();
56 0 : }else if(idle_){
57 : // will terminate any pending async reads or writes
58 0 : socket_->close();
59 : }
60 0 : }
61 : );
62 0 : }
63 :
64 : void process_out_queue()
65 : {
66 : if(writing_) return;
67 : if(out_queue_.empty()) return;
68 :
69 : writing_ = true;
70 :
71 : // Spawn a coroutine to handle the write
72 : co_spawn(socket_->get_io_context(),
73 0 : [this, self = shared_from_this()]() -> awaitable<void> {
74 : try {
75 : while(!out_queue_.empty()) {
76 : const auto& data = out_queue_.front();
77 : std::vector<boost::asio::const_buffer> buffers;
78 : buffers.push_back(boost::asio::buffer(data.first));
79 : buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator));
80 : buffers.push_back(boost::asio::buffer(data.second));
81 : buffers.push_back(boost::asio::buffer(misc_strings::lf));
82 :
83 : if(data.first == "data"){
84 : buffers.push_back(boost::asio::buffer(misc_strings::lf));
85 : }
86 :
87 : co_await socket_->write(buffers);
88 : idle_ = false;
89 : out_queue_.pop();
90 : }
91 : writing_ = false;
92 : } catch (const boost::system::system_error& e) {
93 : timer_.cancel();
94 : writing_ = false;
95 : }
96 0 : },
97 : detached);
98 : }
99 :
100 : public:
101 :
102 0 : void start(){
103 0 : handle_timeout();
104 0 : }
105 :
106 : void stop(){
107 : boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this()](){
108 : timer_.cancel();
109 : socket_->close();
110 : });
111 : }
112 :
113 : void send_retry(unsigned long millis){
114 : handle_write("retry", boost::lexical_cast<std::string>(millis));
115 : }
116 :
117 : void send_event(const std::string& event_name){
118 : handle_write("event", event_name);
119 : }
120 :
121 : void send_data(const std::string& data){
122 : handle_write("data", data);
123 : }
124 :
125 : void handle_write(const std::string& type, const std::string& value){
126 : boost::asio::dispatch(socket_->get_io_context(), [this, self = shared_from_this(), type, value](){
127 : if(out_queue_.size()<=MAX_OUTPUT_MESSAGES){
128 : out_queue_.push(std::make_pair(type, value));
129 : process_out_queue();
130 : }
131 : });
132 : }
133 :
134 : private:
135 : /// Socket being used HTTP connection
136 : std::shared_ptr<asio::socket> socket_;
137 :
138 : /// Out queue
139 : std::queue<std::pair<std::string, std::string>> out_queue_;
140 :
141 : bool writing_ = false;
142 :
143 : /// Timer used for controlling HTTP timeout
144 : boost::asio::steady_timer timer_;
145 :
146 : bool idle_;
147 : };
148 :
149 : }
150 :
151 : #endif
|