Line data Source code
1 : #ifndef THINGER_HTTP_CLIENT_ASYNC_CLIENT_HPP
2 : #define THINGER_HTTP_CLIENT_ASYNC_CLIENT_HPP
3 :
4 : #include "http_client_base.hpp"
5 : #include "stream_types.hpp"
6 : #include "request_builder.hpp"
7 : #include "../../asio/worker_client.hpp"
8 : #include "../../asio/workers.hpp"
9 : #include <condition_variable>
10 : #include <mutex>
11 : #include <atomic>
12 : #include <fstream>
13 : #include <filesystem>
14 :
15 : namespace thinger::http {
16 :
17 : /**
18 : * Asynchronous HTTP client with callback-based API.
19 : * Uses worker thread pool for optimal performance.
20 : * Perfect for concurrent requests and non-blocking I/O.
21 : *
22 : * Usage with callbacks:
23 : * http::async_client client;
24 : * client.get("https://api.example.com/data", [](auto& response) {
25 : * std::cout << response.body() << std::endl;
26 : * });
27 : * client.wait(); // Wait for all requests to complete
28 : *
29 : * Usage with coroutines:
30 : * http::async_client client;
31 : * client.run([&]() -> awaitable<void> {
32 : * auto response = co_await client.get("https://api.example.com/data");
33 : * });
34 : * client.wait();
35 : */
36 : class async_client : public http_client_base, public asio::worker_client {
37 : private:
38 : std::condition_variable requests_cv_;
39 : std::mutex requests_mutex_;
40 : std::atomic<size_t> active_requests_{0};
41 :
42 : protected:
43 866 : boost::asio::io_context& get_io_context() override {
44 866 : return asio::get_workers().get_thread_io_context();
45 : }
46 :
47 : public:
48 : async_client();
49 : ~async_client() override;
50 :
51 : // Track active requests for synchronization
52 : void track_request_start();
53 : void track_request_end();
54 :
55 : // worker_client interface
56 : bool stop() override;
57 : void wait() override;
58 :
59 : // Wait with timeout
60 : bool wait_for(std::chrono::milliseconds timeout);
61 :
62 : // Query
63 24 : size_t pending_requests() const { return active_requests_.load(); }
64 24 : bool has_pending_requests() const { return active_requests_.load() > 0; }
65 :
66 : // Run an awaitable on the worker pool
67 : template<typename T>
68 : void run(awaitable<T> coro) {
69 : track_request_start();
70 : co_spawn(get_io_context(),
71 : [this, coro = std::move(coro)]() mutable -> awaitable<void> {
72 : co_await std::move(coro);
73 : track_request_end();
74 : },
75 : detached);
76 : }
77 :
78 : // Run a coroutine factory on the worker pool
79 : template<typename F>
80 421 : void run(F&& coro_factory) {
81 421 : track_request_start();
82 421 : co_spawn(get_io_context(),
83 1222 : [this, factory = std::forward<F>(coro_factory)]() mutable -> awaitable<void> {
84 : co_await factory();
85 : track_request_end();
86 : },
87 : detached);
88 421 : }
89 :
90 : // ============================================
91 : // Callback-based async methods
92 : // ============================================
93 : // These provide a simpler interface when you're not already in a coroutine.
94 : // The callback is invoked with the response when the request completes.
95 : //
96 : // Usage:
97 : // client.get("https://api.example.com/users", [](client_response& response) {
98 : // std::cout << response.body() << std::endl;
99 : // });
100 : // client.wait(); // Wait for all requests to complete
101 :
102 : // Bring base class awaitable methods into scope for co_await usage
103 : using http_client_base::get;
104 : using http_client_base::post;
105 : using http_client_base::put;
106 : using http_client_base::patch;
107 : using http_client_base::del;
108 : using http_client_base::head;
109 : using http_client_base::options;
110 : using http_client_base::send;
111 : using http_client_base::send_streaming;
112 :
113 : template<typename Callback>
114 308 : void get(const std::string& url, Callback&& callback, headers_map headers = {}) {
115 616 : run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
116 : auto response = co_await http_client_base::get(url, std::move(h));
117 : cb(response);
118 : });
119 308 : }
120 :
121 : template<typename Callback>
122 20 : void post(const std::string& url, Callback&& callback, std::string body = {},
123 : std::string content_type = "application/json", headers_map headers = {}) {
124 80 : run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
125 40 : ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
126 : auto response = co_await http_client_base::post(url, std::move(b), std::move(ct), std::move(h));
127 : cb(response);
128 : });
129 20 : }
130 :
131 : template<typename Callback>
132 : void post(const std::string& url, const form& form, Callback&& callback, headers_map headers = {}) {
133 : run([this, url, cb = std::forward<Callback>(callback),
134 : f = form, h = std::move(headers)]() mutable -> awaitable<void> {
135 : auto response = co_await http_client_base::post(url, f, std::move(h));
136 : cb(response);
137 : });
138 : }
139 :
140 : template<typename Callback>
141 : void put(const std::string& url, Callback&& callback, std::string body = {},
142 : std::string content_type = "application/json", headers_map headers = {}) {
143 : run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
144 : ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
145 : auto response = co_await http_client_base::put(url, std::move(b), std::move(ct), std::move(h));
146 : cb(response);
147 : });
148 : }
149 :
150 : template<typename Callback>
151 : void patch(const std::string& url, Callback&& callback, std::string body = {},
152 : std::string content_type = "application/json", headers_map headers = {}) {
153 : run([this, url, cb = std::forward<Callback>(callback), b = std::move(body),
154 : ct = std::move(content_type), h = std::move(headers)]() mutable -> awaitable<void> {
155 : auto response = co_await http_client_base::patch(url, std::move(b), std::move(ct), std::move(h));
156 : cb(response);
157 : });
158 : }
159 :
160 : template<typename Callback>
161 : void del(const std::string& url, Callback&& callback, headers_map headers = {}) {
162 : run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
163 : auto response = co_await http_client_base::del(url, std::move(h));
164 : cb(response);
165 : });
166 : }
167 :
168 : template<typename Callback>
169 : void head(const std::string& url, Callback&& callback, headers_map headers = {}) {
170 : run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
171 : auto response = co_await http_client_base::head(url, std::move(h));
172 : cb(response);
173 : });
174 : }
175 :
176 : template<typename Callback>
177 : void options(const std::string& url, Callback&& callback, headers_map headers = {}) {
178 : run([this, url, cb = std::forward<Callback>(callback), h = std::move(headers)]() mutable -> awaitable<void> {
179 : auto response = co_await http_client_base::options(url, std::move(h));
180 : cb(response);
181 : });
182 : }
183 :
184 : // ============================================
185 : // Streaming methods
186 : // ============================================
187 :
188 : /**
189 : * Send a streaming request with callback.
190 : *
191 : * Usage:
192 : * client.send_streaming(request, stream_callback, [](stream_result& result) {
193 : * std::cout << "Downloaded: " << result.bytes_transferred << " bytes" << std::endl;
194 : * });
195 : */
196 : template<typename Callback>
197 : void send_streaming(std::shared_ptr<http_request> request, stream_callback stream_cb, Callback&& result_cb) {
198 : run([this, req = std::move(request), scb = std::move(stream_cb),
199 : rcb = std::forward<Callback>(result_cb)]() mutable -> awaitable<void> {
200 : auto result = co_await http_client_base::send_streaming(std::move(req), std::move(scb));
201 : rcb(result);
202 : });
203 : }
204 :
205 : /**
206 : * Streaming GET with callback.
207 : */
208 : template<typename Callback>
209 : void get_streaming(const std::string& url, stream_callback stream_cb, Callback&& result_cb, headers_map headers = {}) {
210 : auto request = std::make_shared<http_request>();
211 : request->set_url(url);
212 : request->set_method(method::GET);
213 : for (const auto& [key, value] : headers) {
214 : request->add_header(key, value);
215 : }
216 : send_streaming(std::move(request), std::move(stream_cb), std::forward<Callback>(result_cb));
217 : }
218 :
219 : /**
220 : * Download file with progress callback.
221 : */
222 : template<typename Callback>
223 : void download(const std::string& url, const std::filesystem::path& path,
224 : Callback&& result_cb, progress_callback progress = {}) {
225 : run([this, url, path, rcb = std::forward<Callback>(result_cb),
226 : prog = std::move(progress)]() mutable -> awaitable<void> {
227 : // Open file
228 : std::ofstream file(path, std::ios::binary);
229 : if (!file) {
230 : stream_result result;
231 : result.error = "Cannot open file for writing: " + path.string();
232 : rcb(result);
233 : co_return;
234 : }
235 :
236 : // Create request
237 : auto request = std::make_shared<http_request>();
238 : request->set_url(url);
239 : request->set_method(method::GET);
240 :
241 : // Stream to file
242 : auto result = co_await http_client_base::send_streaming(request,
243 : [&file, &prog](const stream_info& info) {
244 : file.write(info.data.data(), static_cast<std::streamsize>(info.data.size()));
245 : if (prog) {
246 : prog(info.downloaded, info.total);
247 : }
248 : return true;
249 : });
250 :
251 : rcb(result);
252 : });
253 : }
254 :
255 : // ============================================
256 : // Request builder for fluent API
257 : // ============================================
258 :
259 : /**
260 : * Create a request builder for fluent API.
261 : *
262 : * Usage:
263 : * auto res = co_await client.request("https://api.com/data")
264 : * .header("Authorization", "Bearer xxx")
265 : * .get();
266 : */
267 76 : request_builder<async_client> request(const std::string& url) {
268 76 : return request_builder<async_client>(this, url);
269 : }
270 :
271 : // ============================================
272 : // WebSocket
273 : // ============================================
274 :
275 : /**
276 : * Connect to a WebSocket server (awaitable version).
277 : *
278 : * Usage:
279 : * auto ws = co_await client.websocket("wss://server.com/ws", "subprotocol");
280 : * if (ws) {
281 : * co_await ws->send_text_async("Hello!");
282 : * }
283 : */
284 : awaitable<std::optional<websocket_client>> websocket(
285 : const std::string& url, const std::string& subprotocol = "") {
286 : return upgrade_websocket(url, subprotocol);
287 : }
288 :
289 : /**
290 : * Connect to a WebSocket server with custom request (for builder pattern).
291 : */
292 4 : awaitable<std::optional<websocket_client>> websocket(
293 : std::shared_ptr<http_request> request, const std::string& subprotocol = "") {
294 4 : return upgrade_websocket(std::move(request), subprotocol);
295 : }
296 :
297 : /**
298 : * Connect to a WebSocket server (callback version).
299 : *
300 : * Usage:
301 : * client.websocket("ws://server.com/path", [](auto ws) {
302 : * if (ws) {
303 : * ws->send_text("Hello!");
304 : * ws->run(); // Start message loop
305 : * }
306 : * });
307 : *
308 : * The callback receives a shared_ptr to maintain ownership.
309 : */
310 : template<typename Callback>
311 6 : void websocket(const std::string& url, Callback&& callback, const std::string& subprotocol = "") {
312 12 : run([this, url, cb = std::forward<Callback>(callback), proto = subprotocol]() mutable -> awaitable<void> {
313 : auto result = co_await upgrade_websocket(url, proto);
314 : if (result) {
315 : auto ws = std::make_shared<websocket_client>(std::move(*result));
316 : cb(ws);
317 : } else {
318 : cb(std::shared_ptr<websocket_client>{});
319 : }
320 : });
321 6 : }
322 : };
323 :
324 : } // namespace thinger::http
325 :
326 : #endif
|