Line data Source code
#include "connection_pool.hpp"

#include <utility>

#include "../../util/logger.hpp"
3 :
4 : namespace thinger::http {
5 :
6 720 : connection_pool::~connection_pool() {
7 : // Always close connections when destroying the pool
8 : // There's no point keeping them alive as they won't be reachable
9 720 : clear();
10 720 : }
11 :
12 3145084 : std::shared_ptr<client_connection> connection_pool::get_connection(const std::string& host,
13 : uint16_t port,
14 : bool ssl) {
15 6290168 : return get_connection_impl(host, port, ssl, "");
16 : }
17 :
18 48 : std::shared_ptr<client_connection> connection_pool::get_unix_connection(const std::string& unix_path) {
19 96 : return get_connection_impl("", 0, false, unix_path);
20 : }
21 :
22 767486 : void connection_pool::store_connection(const std::string& host,
23 : uint16_t port,
24 : bool ssl,
25 : std::shared_ptr<client_connection> connection) {
26 2302458 : store_connection_impl(host, port, ssl, "", connection);
27 767486 : }
28 :
29 48 : void connection_pool::store_unix_connection(const std::string& unix_path,
30 : std::shared_ptr<client_connection> connection) {
31 144 : store_connection_impl("", 0, false, unix_path, connection);
32 48 : }
33 :
34 591 : size_t connection_pool::cleanup_expired() {
35 591 : std::unique_lock<std::shared_mutex> lock(mutex_);
36 591 : size_t removed = 0;
37 591 : auto& seq_index = connections_.get<by_sequence>();
38 591 : auto it = seq_index.begin();
39 4444 : while (it != seq_index.end()) {
40 3853 : if (it->connection.expired()) {
41 324 : it = seq_index.erase(it);
42 324 : ++removed;
43 : } else {
44 3529 : ++it;
45 : }
46 : }
47 591 : return removed;
48 591 : }
49 :
50 37187 : size_t connection_pool::size() const {
51 37187 : std::shared_lock<std::shared_mutex> lock(mutex_);
52 74374 : return connections_.size();
53 37187 : }
54 :
55 1544 : void connection_pool::clear() {
56 1544 : std::unique_lock<std::shared_mutex> lock(mutex_);
57 :
58 : // Always close all active connections gracefully before clearing
59 : // There's no use case for keeping connections alive outside the pool
60 1544 : auto& seq_index = connections_.get<by_sequence>();
61 903836 : for (auto it = seq_index.begin(); it != seq_index.end(); ++it) {
62 451146 : if (auto conn = it->connection.lock()) {
63 : // Use the connection's close method to ensure graceful shutdown
64 : // This will notify all pending requests with an error
65 115 : conn->close();
66 451146 : }
67 : }
68 :
69 1544 : connections_.clear();
70 1544 : }
71 :
72 3145132 : std::shared_ptr<client_connection> connection_pool::get_connection_impl(const std::string& host,
73 : uint16_t port,
74 : bool ssl,
75 : const std::string& unix_path) {
76 : // First try with shared lock for reading
77 : {
78 3145132 : std::shared_lock<std::shared_mutex> lock(mutex_);
79 3145132 : auto& key_index = connections_.get<by_key>();
80 3145132 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
81 :
82 3145132 : if (it != key_index.end()) {
83 : // Try to convert weak_ptr to shared_ptr
84 2692894 : if (auto conn = it->connection.lock()) {
85 : // Connection is still alive, return it
86 2389201 : return conn;
87 2692894 : }
88 : // Connection expired, need to remove it with write lock
89 : } else {
90 : // Not found
91 452238 : return nullptr;
92 : }
93 3145132 : }
94 :
95 : // Only take write lock if we need to remove an expired connection
96 303693 : std::unique_lock<std::shared_mutex> lock(mutex_);
97 303693 : auto& key_index = connections_.get<by_key>();
98 303693 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
99 :
100 303693 : if (it != key_index.end() && it->connection.expired()) {
101 303402 : key_index.erase(it);
102 : }
103 :
104 303693 : return nullptr;
105 303693 : }
106 :
107 767534 : void connection_pool::store_connection_impl(const std::string& host,
108 : uint16_t port,
109 : bool ssl,
110 : const std::string& unix_path,
111 : std::shared_ptr<client_connection> connection) {
112 767534 : if (!connection) return;
113 :
114 767534 : std::unique_lock<std::shared_mutex> lock(mutex_);
115 :
116 : // Remove any existing entry with the same key
117 767534 : auto& key_index = connections_.get<by_key>();
118 767534 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
119 767534 : if (it != key_index.end()) {
120 12662 : key_index.erase(it);
121 : }
122 :
123 : // Insert the new connection
124 767534 : connections_.emplace(host, port, ssl, unix_path, connection);
125 767534 : }
126 :
127 : } // namespace thinger::http
|