LCOV - code coverage report
Current view: top level - http/client - websocket_client.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 78.8 % 113 89
Test Date: 2026-02-20 15:38:22 Functions: 78.3 % 23 18

            Line data    Source code
       1              : #include "websocket_client.hpp"
       2              : #include "../../util/logger.hpp"
       3              : 
       4              : #include <future>
       5              : 
       6              : namespace thinger::http {
       7              : 
       8           38 : websocket_client::websocket_client(std::shared_ptr<asio::websocket> ws)
       9           38 :     : websocket_(std::move(ws)) {}
      10              : 
      11          192 : websocket_client::~websocket_client() {
      12          192 :     stop();
      13          192 : }
      14              : 
      15          154 : websocket_client::websocket_client(websocket_client&& other) noexcept
      16          154 :     : websocket_(std::move(other.websocket_)),
      17          154 :       running_(other.running_.load()),
      18          154 :       on_message_(std::move(other.on_message_)),
      19          154 :       on_close_(std::move(other.on_close_)),
      20          154 :       on_error_(std::move(other.on_error_)) {
      21          154 :     other.running_ = false;
      22          154 : }
      23              : 
      24            0 : websocket_client& websocket_client::operator=(websocket_client&& other) noexcept {
      25            0 :     if (this != &other) {
      26            0 :         stop();
      27            0 :         websocket_ = std::move(other.websocket_);
      28            0 :         running_ = other.running_.load();
      29            0 :         on_message_ = std::move(other.on_message_);
      30            0 :         on_close_ = std::move(other.on_close_);
      31            0 :         on_error_ = std::move(other.on_error_);
      32            0 :         other.running_ = false;
      33              :     }
      34            0 :     return *this;
      35              : }
      36              : 
      37          158 : bool websocket_client::is_open() const {
      38          158 :     return websocket_ && websocket_->is_open();
      39              : }
      40              : 
      41            0 : std::shared_ptr<asio::websocket> websocket_client::release_socket() {
      42            0 :     running_ = false;
      43            0 :     return std::move(websocket_);
      44              : }
      45              : 
      46              : // ============================================
      47              : // Synchronous API
      48              : // ============================================
      49              : 
      50           30 : bool websocket_client::send_text(std::string_view message) {
      51           30 :     if (!is_open()) return false;
      52              : 
      53           30 :     std::promise<bool> promise;
      54           30 :     auto future = promise.get_future();
      55              : 
      56           30 :     auto& io_context = websocket_->get_io_context();
      57              : 
      58           30 :     boost::asio::co_spawn(
      59              :         io_context,
      60           90 :         [this, msg = std::string(message), &promise]() -> awaitable<void> {
      61              :             bool result = co_await send_text_async(std::move(msg));
      62              :             promise.set_value(result);
      63           60 :         },
      64              :         boost::asio::detached
      65              :     );
      66              : 
      67           30 :     if (io_context.get_executor().running_in_this_thread()) {
      68           12 :         while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
      69            6 :             io_context.poll_one();
      70              :         }
      71              :     } else {
      72           24 :         io_context.run();
      73           24 :         io_context.restart();
      74              :     }
      75              : 
      76           30 :     return future.get();
      77           30 : }
      78              : 
      79            4 : bool websocket_client::send_binary(const void* data, size_t size) {
      80            4 :     return send_binary(std::string_view(static_cast<const char*>(data), size));
      81              : }
      82              : 
      83            4 : bool websocket_client::send_binary(std::string_view data) {
      84            4 :     if (!is_open()) return false;
      85              : 
      86            4 :     std::promise<bool> promise;
      87            4 :     auto future = promise.get_future();
      88              : 
      89            4 :     auto& io_context = websocket_->get_io_context();
      90            4 :     std::vector<uint8_t> data_vec(data.begin(), data.end());
      91              : 
      92            4 :     boost::asio::co_spawn(
      93              :         io_context,
      94            8 :         [this, data_vec = std::move(data_vec), &promise]() mutable -> awaitable<void> {
      95              :             bool result = co_await send_binary_async(std::move(data_vec));
      96              :             promise.set_value(result);
      97            8 :         },
      98              :         boost::asio::detached
      99              :     );
     100              : 
     101            4 :     if (io_context.get_executor().running_in_this_thread()) {
     102            0 :         while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
     103            0 :             io_context.poll_one();
     104              :         }
     105              :     } else {
     106            4 :         io_context.run();
     107            4 :         io_context.restart();
     108              :     }
     109              : 
     110            4 :     return future.get();
     111            4 : }
     112              : 
     113           36 : std::pair<std::string, bool> websocket_client::receive() {
     114           36 :     if (!is_open()) return {"", false};
     115              : 
     116           36 :     std::promise<std::pair<std::string, bool>> promise;
     117           36 :     auto future = promise.get_future();
     118              : 
     119           36 :     auto& io_context = websocket_->get_io_context();
     120              : 
     121           36 :     boost::asio::co_spawn(
     122              :         io_context,
     123           72 :         [this, &promise]() -> awaitable<void> {
     124              :             auto result = co_await receive_async();
     125              :             promise.set_value(std::move(result));
     126           72 :         },
     127              :         boost::asio::detached
     128              :     );
     129              : 
     130           36 :     if (io_context.get_executor().running_in_this_thread()) {
     131           90 :         while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
     132           84 :             io_context.poll_one();
     133              :         }
     134              :     } else {
     135           30 :         io_context.run();
     136           30 :         io_context.restart();
     137              :     }
     138              : 
     139           36 :     return future.get();
     140           36 : }
     141              : 
     142           34 : void websocket_client::close() {
     143           34 :     if (!websocket_) return;
     144              : 
     145           34 :     std::promise<void> promise;
     146           34 :     auto future = promise.get_future();
     147              : 
     148           34 :     auto& io_context = websocket_->get_io_context();
     149              : 
     150           34 :     boost::asio::co_spawn(
     151              :         io_context,
     152           68 :         [this, &promise]() -> awaitable<void> {
     153              :             co_await close_async();
     154              :             promise.set_value();
     155           68 :         },
     156              :         boost::asio::detached
     157              :     );
     158              : 
     159           34 :     if (io_context.get_executor().running_in_this_thread()) {
     160           33 :         while (future.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
     161           25 :             io_context.poll_one();
     162              :         }
     163              :     } else {
     164           26 :         io_context.run();
     165           26 :         io_context.restart();
     166              :     }
     167              : 
     168           34 :     future.get();
     169           34 : }
     170              : 
     171              : // ============================================
     172              : // Async/Callback API
     173              : // ============================================
     174              : 
     175            0 : void websocket_client::run() {
     176            0 :     if (!is_open() || running_) return;
     177              : 
     178            0 :     running_ = true;
     179              : 
     180            0 :     boost::asio::co_spawn(
     181            0 :         websocket_->get_io_context(),
     182            0 :         [this]() -> awaitable<void> {
     183              :             co_await message_loop();
     184            0 :         },
     185              :         boost::asio::detached
     186              :     );
     187              : }
     188              : 
     189          192 : void websocket_client::stop() {
     190          192 :     running_ = false;
     191          192 :     if (websocket_) {
     192           38 :         websocket_->close();
     193              :     }
     194          192 : }
     195              : 
     196            0 : awaitable<void> websocket_client::message_loop() {
     197              :     while (running_ && is_open()) {
     198              :         auto [message, is_binary] = co_await receive_async();
     199              : 
     200              :         if (message.empty() && !is_open()) {
     201              :             break;
     202              :         }
     203              : 
     204              :         if (message.empty()) {
     205              :             // Error or connection closed
     206              :             break;
     207              :         }
     208              : 
     209              :         if (on_message_) {
     210              :             on_message_(message, is_binary);
     211              :         }
     212              :     }
     213              : 
     214              :     running_ = false;
     215              : 
     216              :     if (on_close_) {
     217              :         on_close_();
     218              :     }
     219            0 : }
     220              : 
     221              : // ============================================
     222              : // Coroutine API
     223              : // ============================================
     224              : 
     225           34 : awaitable<bool> websocket_client::send_text_async(std::string message) {
     226              :     if (!is_open()) co_return false;
     227              : 
     228              :     websocket_->set_binary(false);
     229              :     co_await websocket_->write(message);
     230              :     co_return websocket_->is_open();
     231           68 : }
     232              : 
     233            4 : awaitable<bool> websocket_client::send_binary_async(std::vector<uint8_t> data) {
     234              :     if (!is_open()) co_return false;
     235              : 
     236              :     websocket_->set_binary(true);
     237              :     co_await websocket_->write(data.data(), data.size());
     238              :     co_return websocket_->is_open();
     239            8 : }
     240              : 
     241           40 : awaitable<std::pair<std::string, bool>> websocket_client::receive_async() {
     242              :     if (!is_open()) {
     243              :         co_return std::make_pair(std::string{}, false);
     244              :     }
     245              : 
     246              :     std::array<uint8_t, 65536> buffer;
     247              :     size_t bytes_read = co_await websocket_->read_some(buffer.data(), buffer.size());
     248              : 
     249              :     if (bytes_read == 0) {
     250              :         co_return std::make_pair(std::string{}, false);
     251              :     }
     252              : 
     253              :     std::string message(reinterpret_cast<char*>(buffer.data()), bytes_read);
     254              :     bool is_binary = websocket_->is_binary();
     255              : 
     256              :     co_return std::make_pair(std::move(message), is_binary);
     257           80 : }
     258              : 
     259           38 : awaitable<void> websocket_client::close_async() {
     260              :     if (websocket_) {
     261              :         co_await websocket_->close_graceful();
     262              :     }
     263              :     running_ = false;
     264           76 : }
     265              : 
     266              : } // namespace thinger::http
        

Generated by: LCOV version 2.0-1