Line data Source code
1 : #include "connection_pool.hpp"
2 : #include "../../util/logger.hpp"
3 :
4 : namespace thinger::http {
5 :
6 827 : connection_pool::~connection_pool() {
7 : // Always close connections when destroying the pool
8 : // There's no point keeping them alive as they won't be reachable
9 827 : clear();
10 827 : }
11 :
12 3211516 : std::shared_ptr<client_connection> connection_pool::get_connection(const std::string& host,
13 : uint16_t port,
14 : bool ssl) {
15 6423032 : return get_connection_impl(host, port, ssl, "");
16 : }
17 :
18 57 : std::shared_ptr<client_connection> connection_pool::get_unix_connection(const std::string& unix_path) {
19 114 : return get_connection_impl("", 0, false, unix_path);
20 : }
21 :
22 767625 : void connection_pool::store_connection(const std::string& host,
23 : uint16_t port,
24 : bool ssl,
25 : std::shared_ptr<client_connection> connection) {
26 2302875 : store_connection_impl(host, port, ssl, "", connection);
27 767625 : }
28 :
29 57 : void connection_pool::store_unix_connection(const std::string& unix_path,
30 : std::shared_ptr<client_connection> connection) {
31 171 : store_connection_impl("", 0, false, unix_path, connection);
32 57 : }
33 :
34 591 : size_t connection_pool::cleanup_expired() {
35 591 : std::unique_lock<std::shared_mutex> lock(mutex_);
36 591 : size_t removed = 0;
37 591 : auto& seq_index = connections_.get<by_sequence>();
38 591 : auto it = seq_index.begin();
39 4444 : while (it != seq_index.end()) {
40 3853 : if (it->connection.expired()) {
41 324 : it = seq_index.erase(it);
42 324 : ++removed;
43 : } else {
44 3529 : ++it;
45 : }
46 : }
47 591 : return removed;
48 591 : }
49 :
50 37106 : size_t connection_pool::size() const {
51 37106 : std::shared_lock<std::shared_mutex> lock(mutex_);
52 74212 : return connections_.size();
53 37106 : }
54 :
55 1758 : void connection_pool::clear() {
56 1758 : std::unique_lock<std::shared_mutex> lock(mutex_);
57 :
58 : // Always close all active connections gracefully before clearing
59 : // There's no use case for keeping connections alive outside the pool
60 1758 : auto& seq_index = connections_.get<by_sequence>();
61 1426640 : for (auto it = seq_index.begin(); it != seq_index.end(); ++it) {
62 712441 : if (auto conn = it->connection.lock()) {
63 : // Use the connection's close method to ensure graceful shutdown
64 : // This will notify all pending requests with an error
65 116 : conn->close();
66 712441 : }
67 : }
68 :
69 1758 : connections_.clear();
70 1758 : }
71 :
72 3211573 : std::shared_ptr<client_connection> connection_pool::get_connection_impl(const std::string& host,
73 : uint16_t port,
74 : bool ssl,
75 : const std::string& unix_path) {
76 : // First try with shared lock for reading
77 : {
78 3211573 : std::shared_lock<std::shared_mutex> lock(mutex_);
79 3211573 : auto& key_index = connections_.get<by_key>();
80 3211573 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
81 :
82 3211573 : if (it != key_index.end()) {
83 : // Try to convert weak_ptr to shared_ptr
84 2497817 : if (auto conn = it->connection.lock()) {
85 : // Connection is still alive, return it
86 2455210 : return conn;
87 2497817 : }
88 : // Connection expired, need to remove it with write lock
89 : } else {
90 : // Not found
91 713756 : return nullptr;
92 : }
93 3211573 : }
94 :
95 : // Only take write lock if we need to remove an expired connection
96 42607 : std::unique_lock<std::shared_mutex> lock(mutex_);
97 42607 : auto& key_index = connections_.get<by_key>();
98 42607 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
99 :
100 42607 : if (it != key_index.end() && it->connection.expired()) {
101 42406 : key_index.erase(it);
102 : }
103 :
104 42607 : return nullptr;
105 42607 : }
106 :
107 767682 : void connection_pool::store_connection_impl(const std::string& host,
108 : uint16_t port,
109 : bool ssl,
110 : const std::string& unix_path,
111 : std::shared_ptr<client_connection> connection) {
112 767682 : if (!connection) return;
113 :
114 767682 : std::unique_lock<std::shared_mutex> lock(mutex_);
115 :
116 : // Remove any existing entry with the same key
117 767682 : auto& key_index = connections_.get<by_key>();
118 767682 : auto it = key_index.find(std::make_tuple(host, port, ssl, unix_path));
119 767682 : if (it != key_index.end()) {
120 12511 : key_index.erase(it);
121 : }
122 :
123 : // Insert the new connection
124 767682 : connections_.emplace(host, port, ssl, unix_path, connection);
125 767682 : }
126 :
127 : } // namespace thinger::http
|