Line data Source code
1 : #include "server_connection.hpp"
2 : #include "request.hpp"
3 : #include "../../util/logger.hpp"
4 :
5 : namespace thinger::http {
6 :
7 : std::atomic<unsigned long> server_connection::connections(0);
8 :
9 992 : server_connection::server_connection(std::shared_ptr<asio::socket> socket)
10 992 : : socket_(std::move(socket))
11 1984 : , timeout_timer_(socket_->get_io_context()) {
12 992 : ++connections;
13 992 : LOG_DEBUG("created http server connection total: {}", static_cast<unsigned>(connections));
14 992 : }
15 :
16 992 : server_connection::~server_connection() {
17 992 : --connections;
18 992 : LOG_DEBUG("releasing http server connection. total: {}", static_cast<unsigned>(connections));
19 992 : }
20 :
21 992 : void server_connection::start(std::chrono::seconds timeout) {
22 992 : if (running_) return;
23 992 : running_ = true;
24 992 : timeout_ = timeout;
25 :
26 : // Start timeout timer
27 992 : reset_timeout();
28 :
29 : // Spawn the read loop coroutine
30 992 : co_spawn(socket_->get_io_context(),
31 2976 : [self = shared_from_this()]() -> awaitable<void> {
32 : co_await self->read_loop();
33 1984 : },
34 : detached);
35 : }
36 :
37 4699 : void server_connection::reset_timeout() {
38 4699 : timeout_timer_.expires_after(timeout_);
39 4699 : timeout_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec) {
40 4688 : if (ec) return; // Timer was cancelled
41 :
42 4 : LOG_DEBUG("http server connection timed out after {} seconds", timeout_.count());
43 4 : close();
44 : });
45 4699 : }
46 :
47 40 : void server_connection::close() {
48 40 : running_ = false;
49 40 : timeout_timer_.cancel();
50 40 : socket_->close();
51 40 : }
52 :
53 992 : awaitable<void> server_connection::read_loop() {
54 : auto self = shared_from_this();
55 :
56 : // Parse headers only; body reading is managed by the handler layer
57 : request_parser_.set_headers_only(true);
58 :
59 : size_t buffered = 0; // bytes of valid data in buffer_
60 :
61 : while (running_ && socket_->is_open()) {
62 : // If no buffered data, read from socket
63 : if (buffered == 0) {
64 : auto [ec, bytes] = co_await socket_->read_some(buffer_, MAX_BUFFER_SIZE);
65 : if (ec) break;
66 : reset_timeout();
67 : buffered = bytes;
68 : }
69 :
70 : // Parse available data
71 : uint8_t* begin = buffer_;
72 : uint8_t* end = buffer_ + buffered;
73 : boost::tribool result = request_parser_.parse(begin, end);
74 : size_t unconsumed = static_cast<size_t>(end - begin);
75 :
76 : if (result) {
77 : // Successfully parsed headers
78 : auto http_req = request_parser_.consume_request();
79 : http_req->set_ssl(socket_->is_secure());
80 :
81 : size_t content_length = http_req->get_content_length();
82 :
83 : auto stream = std::make_shared<http_stream>(++request_id_, http_req->keep_alive());
84 :
85 : // Add to queue for pipelining
86 : {
87 : std::lock_guard<std::mutex> lock(queue_mutex_);
88 : request_queue_.push(stream);
89 : }
90 :
91 : // Log the request
92 : http_req->log("SERVER REQUEST", 0);
93 :
94 : // Create request and store read-ahead data
95 : auto req = std::make_shared<request>(self, stream, http_req);
96 : if (unconsumed > 0) {
97 : req->set_read_ahead(begin, unconsumed);
98 : }
99 :
100 : // Dispatch to handler (awaitable — handler decides body reading strategy)
101 : if (handler_) {
102 : co_await handler_(req);
103 : reset_timeout();
104 : }
105 :
106 : // After handler completes, compute leftover for pipelining.
107 : // The request tracks how many read-ahead bytes remain unconsumed.
108 : // Any leftover read-ahead bytes are pipelined data for the next request.
109 : size_t remaining_ahead = req->read_ahead_available();
110 : if (remaining_ahead > 0) {
111 : // Copy residual read-ahead back to buffer_ for the next iteration
112 : size_t ahead_start = unconsumed - remaining_ahead;
113 : std::memmove(buffer_, begin + ahead_start, remaining_ahead);
114 : buffered = remaining_ahead;
115 : } else {
116 : buffered = 0;
117 : }
118 :
119 : // If not keep-alive, stop reading after this request
120 : if (!stream->keep_alive()) {
121 : break;
122 : }
123 : } else if (!result) {
124 : // Bad request
125 : LOG_ERROR("invalid http request");
126 : auto stream = std::make_shared<http_stream>(++request_id_, false);
127 : {
128 : std::lock_guard<std::mutex> lock(queue_mutex_);
129 : request_queue_.push(stream);
130 : }
131 : handle_stock_error(stream, http_response::status::bad_request);
132 : break;
133 : } else {
134 : // Indeterminate — all data consumed by parser, need more
135 : buffered = 0;
136 : }
137 : }
138 :
139 : // Connection ended
140 : running_ = false;
141 : timeout_timer_.cancel();
142 1984 : }
143 :
144 1023 : awaitable<void> server_connection::write_frame(std::shared_ptr<http_stream> stream,
145 : std::shared_ptr<http_frame> frame) {
146 : // Log response
147 : frame->log("SERVER RESPONSE", 0);
148 :
149 : // Write frame to socket
150 : co_await frame->to_socket(socket_);
151 :
152 : // Reset timeout on activity
153 : reset_timeout();
154 :
155 : // Check if stream is complete
156 : if (frame->end_stream()) {
157 : stream->completed();
158 :
159 : if (!stream->keep_alive()) {
160 : close();
161 : } else {
162 : // Remove completed stream from queue
163 : std::lock_guard<std::mutex> lock(queue_mutex_);
164 : if (!request_queue_.empty()) {
165 : request_queue_.pop();
166 : }
167 : }
168 : }
169 2046 : }
170 :
171 2040 : void server_connection::process_output_queue() {
172 3039 : if (writing_) return;
173 :
174 2022 : std::shared_ptr<http_stream> stream;
175 2022 : std::shared_ptr<http_frame> frame;
176 :
177 : {
178 2022 : std::lock_guard<std::mutex> lock(queue_mutex_);
179 2022 : if (request_queue_.empty()) return;
180 :
181 1059 : stream = request_queue_.front();
182 1059 : if (stream->empty_queue()) return;
183 :
184 1023 : frame = stream->current_frame();
185 1023 : stream->pop_frame();
186 2022 : }
187 :
188 1023 : writing_ = true;
189 :
190 1023 : co_spawn(socket_->get_io_context(),
191 3069 : [this, self = shared_from_this(), stream, frame]() -> awaitable<void> {
192 : co_await write_frame(stream, frame);
193 : writing_ = false;
194 :
195 : // Process more frames if available
196 : process_output_queue();
197 2046 : },
198 : detached);
199 3021 : }
200 :
201 1023 : void server_connection::handle_stream(std::shared_ptr<http_stream> stream,
202 : std::shared_ptr<http_frame> frame) {
203 1023 : boost::asio::dispatch(socket_->get_io_context(),
204 2046 : [this, self = shared_from_this(), stream, frame] {
205 1023 : stream->add_frame(frame);
206 :
207 1023 : std::shared_ptr<http_stream> front_stream;
208 : {
209 1023 : std::lock_guard<std::mutex> lock(queue_mutex_);
210 1023 : if (request_queue_.empty()) {
211 0 : LOG_ERROR("trying to send response without a pending request!");
212 0 : return;
213 : }
214 1023 : front_stream = request_queue_.front();
215 1023 : }
216 :
217 : // Only process if this is the front stream (for pipelining order)
218 1023 : if (front_stream->id() == stream->id()) {
219 1017 : process_output_queue();
220 : }
221 1023 : });
222 1023 : }
223 :
224 0 : void server_connection::handle_stock_error(std::shared_ptr<http_stream> stream,
225 : http_response::status status) {
226 0 : auto http_error = http_response::stock_http_reply(status);
227 0 : http_error->set_keep_alive(stream->keep_alive());
228 0 : handle_stream(stream, http_error);
229 0 : }
230 :
231 59 : std::shared_ptr<asio::socket> server_connection::release_socket() {
232 59 : running_ = false;
233 59 : socket_->cancel();
234 59 : timeout_timer_.cancel();
235 59 : return socket_;
236 : }
237 :
238 0 : void server_connection::release() {
239 0 : boost::asio::dispatch(socket_->get_io_context(),
240 0 : [this, self = shared_from_this()] {
241 0 : close();
242 0 : });
243 0 : }
244 :
245 364 : std::shared_ptr<asio::socket> server_connection::get_socket() {
246 364 : return socket_;
247 : }
248 :
249 0 : void server_connection::update_connection_timeout(std::chrono::seconds timeout) {
250 0 : boost::asio::dispatch(socket_->get_io_context(),
251 0 : [this, self = shared_from_this(), timeout] {
252 0 : timeout_ = timeout;
253 0 : reset_timeout();
254 0 : });
255 0 : }
256 :
257 : }
|