Line data Source code
1 : #include "server_connection.hpp"
2 : #include "request.hpp"
3 : #include "../../util/logger.hpp"
4 :
5 : namespace thinger::http {
6 :
7 : std::atomic<unsigned long> server_connection::connections(0);
8 :
9 846 : server_connection::server_connection(std::shared_ptr<asio::socket> socket)
10 846 : : socket_(std::move(socket))
11 1692 : , timeout_timer_(socket_->get_io_context()) {
12 846 : ++connections;
13 846 : LOG_DEBUG("created http server connection total: {}", static_cast<unsigned>(connections));
14 846 : }
15 :
16 846 : server_connection::~server_connection() {
17 846 : --connections;
18 846 : LOG_DEBUG("releasing http server connection. total: {}", static_cast<unsigned>(connections));
19 846 : }
20 :
21 846 : void server_connection::start(std::chrono::seconds timeout) {
22 846 : if (running_) return;
23 846 : running_ = true;
24 846 : timeout_ = timeout;
25 :
26 : // Start timeout timer
27 846 : reset_timeout();
28 :
29 : // Spawn the read loop coroutine
30 846 : co_spawn(socket_->get_io_context(),
31 2538 : [self = shared_from_this()]() -> awaitable<void> {
32 : co_await self->read_loop();
33 1692 : },
34 : detached);
35 : }
36 :
37 4874 : void server_connection::reset_timeout() {
38 4874 : timeout_timer_.expires_after(timeout_);
39 4874 : timeout_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec) {
40 4086 : if (ec) return; // Timer was cancelled
41 :
42 4 : LOG_DEBUG("http server connection timed out after {} seconds", timeout_.count());
43 4 : close();
44 : });
45 4874 : }
46 :
47 22 : void server_connection::close() {
48 22 : running_ = false;
49 22 : timeout_timer_.cancel();
50 22 : socket_->close();
51 22 : }
52 :
53 846 : awaitable<void> server_connection::read_loop() {
54 : auto self = shared_from_this();
55 :
56 : // Parse headers only; body reading is managed by the handler layer
57 : request_parser_.set_headers_only(true);
58 :
59 : size_t buffered = 0; // bytes of valid data in buffer_
60 :
61 : try {
62 : while (running_ && socket_->is_open()) {
63 : // If no buffered data, read from socket
64 : if (buffered == 0) {
65 : auto bytes = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
66 : reset_timeout();
67 : buffered = bytes;
68 : }
69 :
70 : // Parse available data
71 : uint8_t* begin = buffer_;
72 : uint8_t* end = buffer_ + buffered;
73 : boost::tribool result = request_parser_.parse(begin, end);
74 : size_t unconsumed = static_cast<size_t>(end - begin);
75 :
76 : if (result) {
77 : // Successfully parsed headers
78 : auto http_req = request_parser_.consume_request();
79 : http_req->set_ssl(socket_->is_secure());
80 :
81 : size_t content_length = http_req->get_content_length();
82 :
83 : auto stream = std::make_shared<http_stream>(++request_id_, http_req->keep_alive());
84 :
85 : // Add to queue for pipelining
86 : {
87 : std::lock_guard<std::mutex> lock(queue_mutex_);
88 : request_queue_.push(stream);
89 : }
90 :
91 : // Log the request
92 : http_req->log("SERVER REQUEST", 0);
93 :
94 : // Create request and store read-ahead data
95 : auto req = std::make_shared<request>(self, stream, http_req);
96 : if (unconsumed > 0) {
97 : req->set_read_ahead(begin, unconsumed);
98 : }
99 :
100 : // Dispatch to handler (awaitable — handler decides body reading strategy)
101 : if (handler_) {
102 : co_await handler_(req);
103 : reset_timeout();
104 : }
105 :
106 : // After handler completes, compute leftover for pipelining.
107 : // The request tracks how many read-ahead bytes remain unconsumed.
108 : // Any leftover read-ahead bytes are pipelined data for the next request.
109 : size_t remaining_ahead = req->read_ahead_available();
110 : if (remaining_ahead > 0) {
111 : // Copy residual read-ahead back to buffer_ for the next iteration
112 : size_t ahead_start = unconsumed - remaining_ahead;
113 : std::memmove(buffer_, begin + ahead_start, remaining_ahead);
114 : buffered = remaining_ahead;
115 : } else {
116 : buffered = 0;
117 : }
118 :
119 : // If not keep-alive, stop reading after this request
120 : if (!stream->keep_alive()) {
121 : break;
122 : }
123 : } else if (!result) {
124 : // Bad request
125 : LOG_ERROR("invalid http request");
126 : auto stream = std::make_shared<http_stream>(++request_id_, false);
127 : {
128 : std::lock_guard<std::mutex> lock(queue_mutex_);
129 : request_queue_.push(stream);
130 : }
131 : handle_stock_error(stream, http_response::status::bad_request);
132 : break;
133 : } else {
134 : // Indeterminate — all data consumed by parser, need more
135 : buffered = 0;
136 : }
137 : }
138 : } catch (const boost::system::system_error& e) {
139 : if (e.code() != boost::asio::error::operation_aborted &&
140 : e.code() != boost::asio::error::eof) {
141 : LOG_ERROR("Error in server connection read loop: {}", e.what());
142 : }
143 : }
144 :
145 : // Connection ended
146 : running_ = false;
147 : timeout_timer_.cancel();
148 1692 : }
149 :
150 886 : awaitable<void> server_connection::write_frame(std::shared_ptr<http_stream> stream,
151 : std::shared_ptr<http_frame> frame) {
152 : // Log response
153 : frame->log("SERVER RESPONSE", 0);
154 :
155 : // Write frame to socket
156 : co_await frame->to_socket(socket_);
157 :
158 : // Reset timeout on activity
159 : reset_timeout();
160 :
161 : // Check if stream is complete
162 : if (frame->end_stream()) {
163 : stream->completed();
164 :
165 : if (!stream->keep_alive()) {
166 : close();
167 : } else {
168 : // Remove completed stream from queue
169 : std::lock_guard<std::mutex> lock(queue_mutex_);
170 : if (!request_queue_.empty()) {
171 : request_queue_.pop();
172 : }
173 : }
174 : }
175 1772 : }
176 :
177 1766 : void server_connection::process_output_queue() {
178 2628 : if (writing_) return;
179 :
180 1748 : std::shared_ptr<http_stream> stream;
181 1748 : std::shared_ptr<http_frame> frame;
182 :
183 : {
184 1748 : std::lock_guard<std::mutex> lock(queue_mutex_);
185 1748 : if (request_queue_.empty()) return;
186 :
187 904 : stream = request_queue_.front();
188 904 : if (stream->empty_queue()) return;
189 :
190 886 : frame = stream->current_frame();
191 886 : stream->pop_frame();
192 1748 : }
193 :
194 886 : writing_ = true;
195 :
196 886 : co_spawn(socket_->get_io_context(),
197 2658 : [this, self = shared_from_this(), stream, frame]() -> awaitable<void> {
198 : try {
199 : co_await write_frame(stream, frame);
200 : } catch (const boost::system::system_error& e) {
201 : LOG_ERROR("Error writing frame: {}", e.what());
202 : close();
203 : }
204 : writing_ = false;
205 :
206 : // Process more frames if available
207 : process_output_queue();
208 1772 : },
209 : detached);
210 2610 : }
211 :
212 886 : void server_connection::handle_stream(std::shared_ptr<http_stream> stream,
213 : std::shared_ptr<http_frame> frame) {
214 886 : boost::asio::dispatch(socket_->get_io_context(),
215 1772 : [this, self = shared_from_this(), stream, frame] {
216 886 : stream->add_frame(frame);
217 :
218 886 : std::shared_ptr<http_stream> front_stream;
219 : {
220 886 : std::lock_guard<std::mutex> lock(queue_mutex_);
221 886 : if (request_queue_.empty()) {
222 0 : LOG_ERROR("trying to send response without a pending request!");
223 0 : return;
224 : }
225 886 : front_stream = request_queue_.front();
226 886 : }
227 :
228 : // Only process if this is the front stream (for pipelining order)
229 886 : if (front_stream->id() == stream->id()) {
230 880 : process_output_queue();
231 : }
232 886 : });
233 886 : }
234 :
235 0 : void server_connection::handle_stock_error(std::shared_ptr<http_stream> stream,
236 : http_response::status status) {
237 0 : auto http_error = http_response::stock_http_reply(status);
238 0 : http_error->set_keep_alive(stream->keep_alive());
239 0 : handle_stream(stream, http_error);
240 0 : }
241 :
242 38 : std::shared_ptr<asio::socket> server_connection::release_socket() {
243 38 : running_ = false;
244 38 : socket_->cancel();
245 38 : timeout_timer_.cancel();
246 38 : return socket_;
247 : }
248 :
249 0 : void server_connection::release() {
250 0 : boost::asio::dispatch(socket_->get_io_context(),
251 0 : [this, self = shared_from_this()] {
252 0 : close();
253 0 : });
254 0 : }
255 :
256 364 : std::shared_ptr<asio::socket> server_connection::get_socket() {
257 364 : return socket_;
258 : }
259 :
260 0 : void server_connection::update_connection_timeout(std::chrono::seconds timeout) {
261 0 : boost::asio::dispatch(socket_->get_io_context(),
262 0 : [this, self = shared_from_this(), timeout] {
263 0 : timeout_ = timeout;
264 0 : reset_timeout();
265 0 : });
266 0 : }
267 :
268 : }
|