LCOV - code coverage report
Current view: top level - http/client - async_client.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 100.0 % 23 23
Test Date: 2026-02-20 15:38:22 Functions: 100.0 % 215 215

            Line data    Source code
       1              : #ifndef THINGER_HTTP_CLIENT_ASYNC_CLIENT_HPP
       2              : #define THINGER_HTTP_CLIENT_ASYNC_CLIENT_HPP
       3              : 
       4              : #include "http_client_base.hpp"
       5              : #include "stream_types.hpp"
       6              : #include "request_builder.hpp"
       7              : #include "../../asio/worker_client.hpp"
       8              : #include "../../asio/workers.hpp"
       9              : #include <condition_variable>
      10              : #include <mutex>
      11              : #include <atomic>
      12              : #include <fstream>
      13              : #include <filesystem>
      14              : 
      15              : namespace thinger::http {
      16              : 
      17              : /**
      18              :  * Asynchronous HTTP client with callback-based API.
      19              :  * Uses worker thread pool for optimal performance.
      20              :  * Perfect for concurrent requests and non-blocking I/O.
      21              :  *
      22              :  * Usage with callbacks:
      23              :  *   http::async_client client;
      24              :  *   client.get("https://api.example.com/data", [](auto& response) {
      25              :  *       std::cout << response.body() << std::endl;
      26              :  *   });
      27              :  *   client.wait();  // Wait for all requests to complete
      28              :  *
      29              :  * Usage with coroutines:
      30              :  *   http::async_client client;
      31              :  *   client.run([&]() -> awaitable<void> {
      32              :  *       auto response = co_await client.get("https://api.example.com/data");
      33              :  *   });
      34              :  *   client.wait();
      35              :  */
      36              : class async_client : public http_client_base, public asio::worker_client {
      37              : private:
      38              :     std::condition_variable requests_cv_;
      39              :     std::mutex requests_mutex_;
      40              :     std::atomic<size_t> active_requests_{0};
      41              : 
      42              : protected:
      43          866 :     boost::asio::io_context& get_io_context() override {
      44          866 :         return asio::get_workers().get_thread_io_context();
      45              :     }
      46              : 
      47              : public:
      48              :     async_client();
      49              :     ~async_client() override;
      50              : 
      51              :     // Track active requests for synchronization
      52              :     void track_request_start();
      53              :     void track_request_end();
      54              : 
      55              :     // worker_client interface
      56              :     bool stop() override;
      57              :     void wait() override;
      58              : 
      59              :     // Wait with timeout
      60              :     bool wait_for(std::chrono::milliseconds timeout);
      61              : 
      62              :     // Query
      63           24 :     size_t pending_requests() const { return active_requests_.load(); }
      64           24 :     bool has_pending_requests() const { return active_requests_.load() > 0; }
      65              : 
      66              :     // Run an awaitable on the worker pool
      67              :     template<typename T>
      68              :     void run(awaitable<T> coro) {
      69              :         track_request_start();
      70              :         co_spawn(get_io_context(),
      71              :             [this, coro = std::move(coro)]() mutable -> awaitable<void> {
      72              :                 co_await std::move(coro);
      73              :                 track_request_end();
      74              :             },
      75              :             detached);
      76              :     }
      77              : 
      78              :     // Run a coroutine factory on the worker pool
      79              :     template<typename F>
      80          421 :     void run(F&& coro_factory) {
      81          421 :         track_request_start();
      82          421 :         co_spawn(get_io_context(),
      83         1222 :             [this, factory = std::forward<F>(coro_factory)]() mutable -> awaitable<void> {
      84              :                 co_await factory();
      85              :                 track_request_end();
      86              :             },
      87              :             detached);
      88          421 :     }
      89              : 
      90              :     // ============================================
      91              :     // Callback-based async methods
      92              :     // ============================================
      93              :     // These provide a simpler interface when you're not already in a coroutine.
      94              :     // The callback is invoked with the response when the request completes.
      95              :     //
      96              :     // Usage:
      97              :     //   client.get("https://api.example.com/users", [](client_response& response) {
      98              :     //       std::cout << response.body() << std::endl;
      99              :     //   });
     100              :     //   client.wait();  // Wait for all requests to complete
     101              : 
     102              :     // Bring base class awaitable methods into scope for co_await usage
     103              :     using http_client_base::get;
     104              :     using http_client_base::post;
     105              :     using http_client_base::put;
     106              :     using http_client_base::patch;
     107              :     using http_client_base::del;
     108              :     using http_client_base::head;
     109              :     using http_client_base::options;
     110              :     using http_client_base::send;
     111              :     using http_client_base::send_streaming;
     112              : 
     113              :     template<typename Callback>
     114          308 :     void get(const std::string& url, Callback&& callback, headers_map headers = {}) {
     115          616 :         run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
     116              :             auto response = co_await http_client_base::get(url, std::move(h));
     117              :             cb(response);
     118              :         });
     119          308 :     }
     120              : 
     121              :     template<typename Callback>
     122           20 :     void post(const std::string& url, Callback&& callback, std::string body = {},
     123              :               std::string content_type = "application/json", headers_map headers = {}) {
     124           80 :         run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
     125           40 :              ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
     126              :             auto response = co_await http_client_base::post(url, std::move(b), std::move(ct), std::move(h));
     127              :             cb(response);
     128              :         });
     129           20 :     }
     130              : 
     131              :     template<typename Callback>
     132              :     void post(const std::string& url, const form& form, Callback&& callback, headers_map headers = {}) {
     133              :         run([this, url, cb = std::forward<Callback>(callback),
     134              :              f = form, h = std::move(headers)]() mutable -> awaitable<void> {
     135              :             auto response = co_await http_client_base::post(url, f, std::move(h));
     136              :             cb(response);
     137              :         });
     138              :     }
     139              : 
     140              :     template<typename Callback>
     141              :     void put(const std::string& url, Callback&& callback, std::string body = {},
     142              :              std::string content_type = "application/json", headers_map headers = {}) {
     143              :         run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
     144              :              ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
     145              :             auto response = co_await http_client_base::put(url, std::move(b), std::move(ct), std::move(h));
     146              :             cb(response);
     147              :         });
     148              :     }
     149              : 
     150              :     template<typename Callback>
     151              :     void patch(const std::string& url, Callback&& callback, std::string body = {},
     152              :                std::string content_type = "application/json", headers_map headers = {}) {
     153              :         run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
     154              :              ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
     155              :             auto response = co_await http_client_base::patch(url, std::move(b), std::move(ct), std::move(h));
     156              :             cb(response);
     157              :         });
     158              :     }
     159              : 
     160              :     template<typename Callback>
     161              :     void del(const std::string& url, Callback&& callback, headers_map headers = {}) {
     162              :         run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
     163              :             auto response = co_await http_client_base::del(url, std::move(h));
     164              :             cb(response);
     165              :         });
     166              :     }
     167              : 
     168              :     template<typename Callback>
     169              :     void head(const std::string& url, Callback&& callback, headers_map headers = {}) {
     170              :         run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
     171              :             auto response = co_await http_client_base::head(url, std::move(h));
     172              :             cb(response);
     173              :         });
     174              :     }
     175              : 
     176              :     template<typename Callback>
     177              :     void options(const std::string& url, Callback&& callback, headers_map headers = {}) {
     178              :         run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
     179              :             auto response = co_await http_client_base::options(url, std::move(h));
     180              :             cb(response);
     181              :         });
     182              :     }
     183              : 
     184              :     // ============================================
     185              :     // Streaming methods
     186              :     // ============================================
     187              : 
     188              :     /**
     189              :      * Send a streaming request with callback.
     190              :      *
     191              :      * Usage:
     192              :      *   client.send_streaming(request, stream_callback, [](stream_result& result) {
     193              :      *       std::cout << "Downloaded: " << result.bytes_transferred << " bytes" << std::endl;
     194              :      *   });
     195              :      */
     196              :     template<typename Callback>
     197              :     void send_streaming(std::shared_ptr<http_request> request, stream_callback stream_cb, Callback&& result_cb) {
     198              :         run([this, req = std::move(request), scb = std::move(stream_cb),
     199              :              rcb = std::forward<Callback>(result_cb)]() mutable -> awaitable<void> {
     200              :             auto result = co_await http_client_base::send_streaming(std::move(req), std::move(scb));
     201              :             rcb(result);
     202              :         });
     203              :     }
     204              : 
     205              :     /**
     206              :      * Streaming GET with callback.
     207              :      */
     208              :     template<typename Callback>
     209              :     void get_streaming(const std::string& url, stream_callback stream_cb, Callback&& result_cb, headers_map headers = {}) {
     210              :         auto request = std::make_shared<http_request>();
     211              :         request->set_url(url);
     212              :         request->set_method(method::GET);
     213              :         for (const auto& [key, value] : headers) {
     214              :             request->add_header(key, value);
     215              :         }
     216              :         send_streaming(std::move(request), std::move(stream_cb), std::forward<Callback>(result_cb));
     217              :     }
     218              : 
     219              :     /**
     220              :      * Download file with progress callback.
     221              :      */
     222              :     template<typename Callback>
     223              :     void download(const std::string& url, const std::filesystem::path& path,
     224              :                   Callback&& result_cb, progress_callback progress = {}) {
     225              :         run([this, url, path, rcb = std::forward<Callback>(result_cb),
     226              :              prog = std::move(progress)]() mutable -> awaitable<void> {
     227              :             // Open file
     228              :             std::ofstream file(path, std::ios::binary);
     229              :             if (!file) {
     230              :                 stream_result result;
     231              :                 result.error = "Cannot open file for writing: " + path.string();
     232              :                 rcb(result);
     233              :                 co_return;
     234              :             }
     235              : 
     236              :             // Create request
     237              :             auto request = std::make_shared<http_request>();
     238              :             request->set_url(url);
     239              :             request->set_method(method::GET);
     240              : 
     241              :             // Stream to file
     242              :             auto result = co_await http_client_base::send_streaming(request,
     243              :                 [&file, &prog](const stream_info& info) {
     244              :                     file.write(info.data.data(), static_cast<std::streamsize>(info.data.size()));
     245              :                     if (prog) {
     246              :                         prog(info.downloaded, info.total);
     247              :                     }
     248              :                     return true;
     249              :                 });
     250              : 
     251              :             rcb(result);
     252              :         });
     253              :     }
     254              : 
     255              :     // ============================================
     256              :     // Request builder for fluent API
     257              :     // ============================================
     258              : 
     259              :     /**
     260              :      * Create a request builder for fluent API.
     261              :      *
     262              :      * Usage:
     263              :      *   auto res = co_await client.request("https://api.com/data")
     264              :      *       .header("Authorization", "Bearer xxx")
     265              :      *       .get();
     266              :      */
     267           76 :     request_builder<async_client> request(const std::string& url) {
     268           76 :         return request_builder<async_client>(this, url);
     269              :     }
     270              : 
     271              :     // ============================================
     272              :     // WebSocket
     273              :     // ============================================
     274              : 
     275              :     /**
     276              :      * Connect to a WebSocket server (awaitable version).
     277              :      *
     278              :      * Usage:
     279              :      *   auto ws = co_await client.websocket("wss://server.com/ws", "subprotocol");
     280              :      *   if (ws) {
     281              :      *       co_await ws->send_text_async("Hello!");
     282              :      *   }
     283              :      */
     284              :     awaitable<std::optional<websocket_client>> websocket(
     285              :         const std::string& url, const std::string& subprotocol = "") {
     286              :         return upgrade_websocket(url, subprotocol);
     287              :     }
     288              : 
     289              :     /**
     290              :      * Connect to a WebSocket server with custom request (for builder pattern).
     291              :      */
     292            4 :     awaitable<std::optional<websocket_client>> websocket(
     293              :         std::shared_ptr<http_request> request, const std::string& subprotocol = "") {
     294            4 :         return upgrade_websocket(std::move(request), subprotocol);
     295              :     }
     296              : 
     297              :     /**
     298              :      * Connect to a WebSocket server (callback version).
     299              :      *
     300              :      * Usage:
     301              :      *   client.websocket("ws://server.com/path", [](auto ws) {
     302              :      *       if (ws) {
     303              :      *           ws->send_text("Hello!");
     304              :      *           ws->run();  // Start message loop
     305              :      *       }
     306              :      *   });
     307              :      *
     308              :      * The callback receives a shared_ptr to maintain ownership.
     309              :      */
     310              :     template<typename Callback>
     311            6 :     void websocket(const std::string& url, Callback&& callback, const std::string& subprotocol = "") {
     312           12 :         run([this, url, cb = std::forward<Callback>(callback), proto = subprotocol]() mutable -> awaitable<void> {
     313              :             auto result = co_await upgrade_websocket(url, proto);
     314              :             if (result) {
     315              :                 auto ws = std::make_shared<websocket_client>(std::move(*result));
     316              :                 cb(ws);
     317              :             } else {
     318              :                 cb(std::shared_ptr<websocket_client>{});
     319              :             }
     320              :         });
     321            6 :     }
     322              : };
     323              : 
     324              : } // namespace thinger::http
     325              : 
     326              : #endif
        

Generated by: LCOV version 2.0-1