Line data Source code
1 : #ifndef THINGER_WEBSOCKET_CONNECTION_HPP
2 : #define THINGER_WEBSOCKET_CONNECTION_HPP
3 :
4 : #include <memory>
5 : #include <queue>
6 : #include <boost/asio/streambuf.hpp>
7 : #include "../../asio/sockets/websocket.hpp"
8 : #include "../../asio/sockets/socket.hpp"
9 : #include "../data/out_data.hpp"
10 : #include "../../util/types.hpp"
11 :
12 : namespace thinger::http{
13 :
14 : class websocket_connection : public std::enable_shared_from_this<websocket_connection>{
15 :
16 : public:
17 :
18 : /**
19 : * Parameter for controlling the maximum number of bytes used in the incoming
20 : * buffer
21 : */
22 : static constexpr size_t DEFAULT_BUFFER_SIZE = 1024; // 1KB
23 : static constexpr size_t MAX_BUFFER_SIZE = 16*1024*1024; // 16MB
24 :
25 : /**
26 : * Parameter for controlling the maximum number of pending messages stored
27 : * in the output queue.
28 : */
29 : static constexpr int MAX_OUTPUT_MESSAGES = 100;
30 :
31 : /**
32 : * Parameter for controlling the number of live http client connections
33 : */
34 : static std::atomic<unsigned long> connections;
35 :
36 : /**
37 : * Constructor that requires a socket
38 : * @param socket
39 : */
40 : websocket_connection(std::shared_ptr<asio::websocket> socket);
41 :
42 : /**
43 : * Destructor
44 : */
45 : virtual ~websocket_connection();
46 :
47 : /**
48 : * Set a callbback to listen for text frames received
49 : * @param callback
50 : */
51 : void on_message(std::function<void(std::string, bool binary)> callback);
52 :
53 : /**
54 : * Set a message handler
55 : * @param handler
56 : */
57 : //void set_stream_handler(std::shared_ptr<messages::stream_handler> handler);
58 :
59 :
60 : /**
61 : * Execute actions on the websocket thread
62 : * @param callback
63 : */
64 36 : inline void execute(std::function<void()> callback){
65 36 : boost::asio::dispatch(ws_->get_io_context(), [self = shared_from_this(), callback = std::move(callback)](){
66 36 : callback();
67 36 : });
68 36 : }
69 :
70 : /**
71 : * Start the websocket. Required for control lifetime and reading messages
72 : */
73 : void start();
74 :
75 : /**
76 : * Stop the webbsocket
77 : */
78 : void stop();
79 :
80 : /**
81 : *
82 : * @return true if the endpoint connection is congested
83 : */
84 : bool congested_connection();
85 :
86 : /**
87 : * Send a text frame over connection
88 : * @param text
89 : */
90 : void send_text(std::string text);
91 :
92 : /**
93 : * Send a binary frame over connection
94 : * @param text
95 : */
96 : void send_binary(std::string data);
97 :
98 : /**
99 : * Return the base socket used in this client connection and release it for being used by another connection handler.
100 : * No further calls mut be done to this instance.
101 : */
102 : std::shared_ptr<asio::socket> release_socket();
103 :
104 : private:
105 :
106 : /**
107 : * Start the read loop coroutine
108 : */
109 : void start_read_loop();
110 :
111 : /**
112 : * Main read loop coroutine
113 : */
114 : awaitable<void> read_loop();
115 :
116 : /**
117 : * Process the output queue
118 : */
119 : void process_out_queue();
120 :
121 : private:
122 :
123 : /// Socket being used for the websocket connection
124 : std::shared_ptr<asio::websocket> ws_;
125 :
126 : /// Shared keeper to keep the connection alive
127 : //std::shared_ptr<base::shared_keeper<websocket_connection>> shared_keeper_;
128 :
129 : /// Out queue
130 : std::queue<std::pair<std::string, bool>> out_queue_;
131 :
132 : /// Buffer for incoming data.
133 : boost::asio::streambuf buffer_{MAX_BUFFER_SIZE};
134 :
135 : /// Message listener
136 : //std::shared_ptr<messages::stream_handler> stream_handler_;
137 :
138 : /// Text frame callback
139 : std::function<void(std::string, bool)> on_frame_callback_;
140 :
141 : bool writing_ = false;
142 : };
143 :
144 : }
145 :
146 : #endif
|