Line data Source code
1 : #include "websocket_client.hpp"
2 : #include "../../util/logger.hpp"
3 :
4 : #include <future>
5 :
6 : namespace thinger::http {
7 :
8 38 : websocket_client::websocket_client(std::shared_ptr<asio::websocket> ws)
9 38 : : websocket_(std::move(ws)) {}
10 :
11 192 : websocket_client::~websocket_client() {
12 192 : stop();
13 192 : }
14 :
15 154 : websocket_client::websocket_client(websocket_client&& other) noexcept
16 154 : : websocket_(std::move(other.websocket_)),
17 154 : running_(other.running_.load()),
18 154 : on_message_(std::move(other.on_message_)),
19 154 : on_close_(std::move(other.on_close_)),
20 154 : on_error_(std::move(other.on_error_)) {
21 154 : other.running_ = false;
22 154 : }
23 :
24 0 : websocket_client& websocket_client::operator=(websocket_client&& other) noexcept {
25 0 : if (this != &other) {
26 0 : stop();
27 0 : websocket_ = std::move(other.websocket_);
28 0 : running_ = other.running_.load();
29 0 : on_message_ = std::move(other.on_message_);
30 0 : on_close_ = std::move(other.on_close_);
31 0 : on_error_ = std::move(other.on_error_);
32 0 : other.running_ = false;
33 : }
34 0 : return *this;
35 : }
36 :
37 158 : bool websocket_client::is_open() const {
38 158 : return websocket_ && websocket_->is_open();
39 : }
40 :
41 0 : std::shared_ptr<asio::websocket> websocket_client::release_socket() {
42 0 : running_ = false;
43 0 : return std::move(websocket_);
44 : }
45 :
46 : // ============================================
47 : // Synchronous API
48 : // ============================================
49 :
50 30 : bool websocket_client::send_text(std::string_view message) {
51 30 : if (!is_open()) return false;
52 :
53 30 : std::promise<bool> promise;
54 30 : auto future = promise.get_future();
55 :
56 30 : auto& io_context = websocket_->get_io_context();
57 :
58 30 : boost::asio::co_spawn(
59 : io_context,
60 90 : [this, msg = std::string(message), &promise]() -> awaitable<void> {
61 : bool result = co_await send_text_async(std::move(msg));
62 : promise.set_value(result);
63 60 : },
64 : boost::asio::detached
65 : );
66 :
67 30 : if (io_context.get_executor().running_in_this_thread()) {
68 12 : while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
69 6 : io_context.poll_one();
70 : }
71 : } else {
72 24 : io_context.run();
73 24 : io_context.restart();
74 : }
75 :
76 30 : return future.get();
77 30 : }
78 :
79 4 : bool websocket_client::send_binary(const void* data, size_t size) {
80 4 : return send_binary(std::string_view(static_cast<const char*>(data), size));
81 : }
82 :
83 4 : bool websocket_client::send_binary(std::string_view data) {
84 4 : if (!is_open()) return false;
85 :
86 4 : std::promise<bool> promise;
87 4 : auto future = promise.get_future();
88 :
89 4 : auto& io_context = websocket_->get_io_context();
90 4 : std::vector<uint8_t> data_vec(data.begin(), data.end());
91 :
92 4 : boost::asio::co_spawn(
93 : io_context,
94 8 : [this, data_vec = std::move(data_vec), &promise]() mutable -> awaitable<void> {
95 : bool result = co_await send_binary_async(std::move(data_vec));
96 : promise.set_value(result);
97 8 : },
98 : boost::asio::detached
99 : );
100 :
101 4 : if (io_context.get_executor().running_in_this_thread()) {
102 0 : while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
103 0 : io_context.poll_one();
104 : }
105 : } else {
106 4 : io_context.run();
107 4 : io_context.restart();
108 : }
109 :
110 4 : return future.get();
111 4 : }
112 :
113 36 : std::pair<std::string, bool> websocket_client::receive() {
114 36 : if (!is_open()) return {"", false};
115 :
116 36 : std::promise<std::pair<std::string, bool>> promise;
117 36 : auto future = promise.get_future();
118 :
119 36 : auto& io_context = websocket_->get_io_context();
120 :
121 36 : boost::asio::co_spawn(
122 : io_context,
123 72 : [this, &promise]() -> awaitable<void> {
124 : auto result = co_await receive_async();
125 : promise.set_value(std::move(result));
126 72 : },
127 : boost::asio::detached
128 : );
129 :
130 36 : if (io_context.get_executor().running_in_this_thread()) {
131 90 : while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
132 84 : io_context.poll_one();
133 : }
134 : } else {
135 30 : io_context.run();
136 30 : io_context.restart();
137 : }
138 :
139 36 : return future.get();
140 36 : }
141 :
142 34 : void websocket_client::close() {
143 34 : if (!websocket_) return;
144 :
145 34 : std::promise<void> promise;
146 34 : auto future = promise.get_future();
147 :
148 34 : auto& io_context = websocket_->get_io_context();
149 :
150 34 : boost::asio::co_spawn(
151 : io_context,
152 68 : [this, &promise]() -> awaitable<void> {
153 : co_await close_async();
154 : promise.set_value();
155 68 : },
156 : boost::asio::detached
157 : );
158 :
159 34 : if (io_context.get_executor().running_in_this_thread()) {
160 33 : while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
161 25 : io_context.poll_one();
162 : }
163 : } else {
164 26 : io_context.run();
165 26 : io_context.restart();
166 : }
167 :
168 34 : future.get();
169 34 : }
170 :
171 : // ============================================
172 : // Async/Callback API
173 : // ============================================
174 :
175 0 : void websocket_client::run() {
176 0 : if (!is_open() || running_) return;
177 :
178 0 : running_ = true;
179 :
180 0 : boost::asio::co_spawn(
181 0 : websocket_->get_io_context(),
182 0 : [this]() -> awaitable<void> {
183 : co_await message_loop();
184 0 : },
185 : boost::asio::detached
186 : );
187 : }
188 :
189 192 : void websocket_client::stop() {
190 192 : running_ = false;
191 192 : if (websocket_) {
192 38 : websocket_->close();
193 : }
194 192 : }
195 :
196 0 : awaitable<void> websocket_client::message_loop() {
197 : while (running_ && is_open()) {
198 : auto [message, is_binary] = co_await receive_async();
199 :
200 : if (message.empty() && !is_open()) {
201 : break;
202 : }
203 :
204 : if (message.empty()) {
205 : // Error or connection closed
206 : break;
207 : }
208 :
209 : if (on_message_) {
210 : on_message_(message, is_binary);
211 : }
212 : }
213 :
214 : running_ = false;
215 :
216 : if (on_close_) {
217 : on_close_();
218 : }
219 0 : }
220 :
221 : // ============================================
222 : // Coroutine API
223 : // ============================================
224 :
225 34 : awaitable<bool> websocket_client::send_text_async(std::string message) {
226 : if (!is_open()) co_return false;
227 :
228 : websocket_->set_binary(false);
229 : co_await websocket_->write(message);
230 : co_return websocket_->is_open();
231 68 : }
232 :
233 4 : awaitable<bool> websocket_client::send_binary_async(std::vector<uint8_t> data) {
234 : if (!is_open()) co_return false;
235 :
236 : websocket_->set_binary(true);
237 : co_await websocket_->write(data.data(), data.size());
238 : co_return websocket_->is_open();
239 8 : }
240 :
241 40 : awaitable<std::pair<std::string, bool>> websocket_client::receive_async() {
242 : if (!is_open()) {
243 : co_return std::make_pair(std::string{}, false);
244 : }
245 :
246 : std::array<uint8_t, 65536> buffer;
247 : size_t bytes_read = co_await websocket_->read_some(buffer.data(), buffer.size());
248 :
249 : if (bytes_read == 0) {
250 : co_return std::make_pair(std::string{}, false);
251 : }
252 :
253 : std::string message(reinterpret_cast<char*>(buffer.data()), bytes_read);
254 : bool is_binary = websocket_->is_binary();
255 :
256 : co_return std::make_pair(std::move(message), is_binary);
257 80 : }
258 :
259 38 : awaitable<void> websocket_client::close_async() {
260 : if (websocket_) {
261 : co_await websocket_->close_graceful();
262 : }
263 : running_ = false;
264 76 : }
265 :
266 : } // namespace thinger::http
|