Line data Source code
1 : #include "workers.hpp"
2 : #include <boost/bind/bind.hpp>
3 : #include <boost/asio/post.hpp>
4 : #include <mutex>
5 : #include <algorithm>
6 : #include "../util/logger.hpp"
7 :
8 : namespace thinger::asio{
9 :
10 : // Lazy singleton implementation
11 1582 : workers& get_workers() {
12 1582 : static workers instance;
13 1582 : return instance;
14 : }
15 :
16 : using std::thread;
17 : using std::bind;
18 : using std::shared_ptr;
19 : using std::make_shared;
20 :
// Construct the pool. Binds the signal_set to wait_context_ so that
// wait() can later register and listen for termination signals.
// No threads are created here — thread creation is deferred to start().
workers::workers() :
    signals_(wait_context_){

}
25 :
// Destructor: if the pool is still running (stop() was never called),
// force a stop so worker threads are shut down before members are
// destroyed. NOTE(review): running_ is read here without holding
// mutex_ — presumably safe because destruction implies no concurrent
// users remain; confirm against usage sites.
workers::~workers()
{
    if(running_) {
        LOG_WARNING("workers destructor called while still running - forcing stop");
        stop();
    }
}
33 :
// Block the calling thread until one of the given POSIX signals is
// received (or stop() is called from elsewhere). Registers each signal
// with the signal_set, then runs wait_context_ on this thread; the
// run() call returns once do_stop() cancels the signals and stops the
// context.
void workers::wait(const std::set<unsigned>& signals) {
    LOG_DEBUG("registering stop signals...");

    // Work guard keeps wait_context_.run() from returning while there
    // is no pending handler.
    wait_work_ = std::make_unique<work_guard_type>(wait_context_.get_executor());

    for (auto signal: signals){
        signals_.add(signal);
    }

    // One-shot wait: the first delivered signal triggers a full stop().
    // A cancellation (ec set, e.g. from do_stop()) is deliberately ignored.
    signals_.async_wait([this](const boost::system::error_code& ec, int signal_number){
        if(!ec){
            LOG_INFO("received signal: {}", signal_number);
            stop();
        }
    });

    // Blocks here; this thread becomes the wait_context_ thread, which
    // is what stop() detects via running_in_this_thread().
    wait_context_.run();

    // Release the guard in case stop() did not (it normally resets it
    // in do_stop()).
    wait_work_.reset();
}
58 :
59 :
60 281 : bool workers::start(size_t worker_threads)
61 : {
62 281 : std::scoped_lock<std::mutex> lock(mutex_);
63 281 : if(running_) return false;
64 281 : running_ = true;
65 :
66 281 : LOG_INFO("starting {} working threads in the shared pool", worker_threads);
67 281 : worker_threads_.reserve(worker_threads);
68 23227 : for(auto thread_number=1; thread_number<=worker_threads; ++thread_number){
69 22946 : auto worker = std::make_unique<worker_thread>("worker thread " + std::to_string(thread_number));
70 22946 : auto id = worker->start();
71 22946 : workers_threads_map_.emplace(id, *worker);
72 22946 : worker_threads_.emplace_back(std::move(worker));
73 22946 : }
74 :
75 281 : return running_;
76 281 : }
77 :
78 6 : boost::asio::io_context& workers::get_isolated_io_context(std::string thread_name)
79 : {
80 6 : LOG_INFO("starting '{}' worker thread", thread_name);
81 6 : auto worker = std::make_unique<worker_thread>(std::move(thread_name));
82 6 : auto& io_context = worker->get_io_context();
83 6 : auto thread_id = worker->start();
84 6 : auto& worker_ref = *worker;
85 6 : job_threads_.emplace_back(std::move(worker));
86 6 : workers_threads_map_.emplace(thread_id, worker_ref);
87 6 : return io_context;
88 6 : }
89 :
// Perform the actual shutdown sequence. Called from stop() — either
// directly or posted onto wait_context_ — never concurrently with
// itself, since stop() flips running_ under mutex_ first.
// Order matters: clients first (they may still use worker io_contexts),
// then job threads, then pooled workers, then bookkeeping, and finally
// the wait context so a blocked wait() can return.
void workers::do_stop()
{
    LOG_INFO("executing full stop");

    // First, notify all clients to stop
    {
        std::lock_guard<std::mutex> lock(clients_mutex_);
        if (!clients_.empty()) {
            LOG_INFO("Stopping {} worker clients", clients_.size());
            for (auto* client : clients_) {
                if (client && client->is_running()) {
                    // Each client is stopped best-effort: one client's
                    // failure must not prevent the rest from stopping.
                    try {
                        LOG_DEBUG("Stopping client: {}", client->get_service_name());
                        client->stop();
                    } catch (const std::exception& e) {
                        LOG_ERROR("Error stopping client {}: {}", client->get_service_name(), e.what());
                    }
                }
            }
        }
    }

    // Stop all job threads (isolated io_contexts created via
    // get_isolated_io_context()).
    LOG_INFO("stopping job threads");
    for(auto const& worker_thread : job_threads_){
        worker_thread->stop();
    }

    // Stop all worker threads (the shared pool created in start()).
    LOG_INFO("stopping worker threads");
    for(auto const& worker_thread : worker_threads_){
        worker_thread->stop();
    }

    // Clear auxiliary references. Destroying the unique_ptrs releases
    // the worker_thread objects; the map held only references to them.
    // NOTE(review): these containers are cleared without holding mutex_;
    // presumably safe because running_ is already false — confirm.
    LOG_INFO("clearing structures");
    worker_threads_.clear();
    job_threads_.clear();
    workers_threads_map_.clear();
    next_io_context_ = 0;

    // Cancel the pending signal wait and stop wait_context_ so a thread
    // blocked in wait() unblocks.
    LOG_INFO("stopping wait context");
    signals_.cancel();
    wait_work_.reset();
    wait_context_.stop();

    LOG_INFO("all done!");
}
139 :
// Request a full shutdown. Returns false if the pool was already
// stopped, true if this call initiated the stop. The running_ flip is
// done under mutex_ so only one caller proceeds to do_stop(); the
// actual teardown then runs on whichever thread owns wait_context_.
bool workers::stop()
{
    //LOG_INFO("workers::stop() called");

    // Claim the stop: only the caller that flips running_ continues.
    {
        std::scoped_lock<std::mutex> lock(mutex_);
        if(!running_) {
            LOG_WARNING("workers already stopped");
            return false;
        }
        running_ = false;
    }

    // Three dispatch cases, depending on where wait_context_ runs:
    if (wait_context_.get_executor().running_in_this_thread()) {
        // 1) We are inside wait_context_ (e.g. the signal handler in
        // wait()) — run the teardown inline; posting would work too but
        // is unnecessary.
        //LOG_INFO("executing stop directly in wait_context thread");
        do_stop();
    } else if (wait_work_) {
        // 2) wait() is blocked in another thread (the work guard exists
        // only while wait() runs) — post so teardown happens on that
        // thread, avoiding concurrent access to the containers.
        //LOG_INFO("posting stop to wait_context thread");
        boost::asio::post(wait_context_, [this]() {
            do_stop();
        });
    } else {
        // 3) wait() was never called — nobody runs wait_context_, so a
        // posted handler would never execute; run teardown directly.
        //LOG_INFO("wait_context not running, executing stop directly");
        do_stop();
    }

    return true;
}
173 :
174 24 : boost::asio::io_context& workers::get_next_io_context()
175 : {
176 24 : return worker_threads_[next_io_context_++%worker_threads_.size()]->get_io_context();
177 : }
178 :
179 878 : boost::asio::io_context& workers::get_thread_io_context()
180 : {
181 878 : std::thread::id this_id = std::this_thread::get_id();
182 :
183 : // First check if this is a worker thread
184 : {
185 878 : std::scoped_lock<std::mutex> lock(mutex_);
186 878 : auto it = workers_threads_map_.find(this_id);
187 878 : if(it != workers_threads_map_.end()){
188 445 : return it->second.get().get_io_context();
189 : }
190 878 : }
191 :
192 : // Not a worker thread - return a worker thread's io_context
193 : // The actual thread affinity will be handled differently
194 433 : LOG_DEBUG("Thread is not a worker thread, using first worker's io_context");
195 433 : return worker_threads_.begin()->get()->get_io_context();
196 : }
197 :
198 : // Client management implementation
199 295 : void workers::register_client(worker_client* client) {
200 295 : if (!client) return;
201 :
202 295 : bool should_start = false;
203 :
204 : {
205 295 : std::lock_guard<std::mutex> lock(clients_mutex_);
206 295 : auto result = clients_.insert(client);
207 :
208 295 : if (result.second) { // Successfully inserted
209 295 : LOG_INFO("Worker client registered: {}", client->get_service_name());
210 :
211 : // Auto-start if this is the first client and auto-manage is enabled
212 295 : if (auto_manage_ && clients_.size() == 1 && !running_) {
213 260 : LOG_INFO("First worker client registered, starting workers automatically");
214 260 : should_start = true;
215 : }
216 : }
217 295 : }
218 :
219 : // Start outside the lock to avoid potential deadlock
220 295 : if (should_start) {
221 260 : start();
222 : }
223 : }
224 :
225 295 : void workers::unregister_client(worker_client* client) {
226 295 : if (!client) return;
227 :
228 295 : bool should_stop = false;
229 :
230 : {
231 295 : std::lock_guard<std::mutex> lock(clients_mutex_);
232 295 : size_t removed = clients_.erase(client);
233 :
234 295 : if (removed > 0) {
235 295 : LOG_INFO("Worker client unregistered: {}", client->get_service_name());
236 :
237 : // Auto-stop if no clients remain and auto-manage is enabled
238 295 : if (auto_manage_ && clients_.empty() && running_) {
239 272 : LOG_INFO("Last worker client unregistered, stopping workers automatically");
240 272 : should_stop = true;
241 : }
242 : }
243 295 : }
244 :
245 : // Stop outside the lock to avoid potential deadlock
246 295 : if (should_stop) {
247 272 : stop();
248 : }
249 : }
250 :
251 39 : size_t workers::client_count() const {
252 39 : std::lock_guard<std::mutex> lock(clients_mutex_);
253 78 : return clients_.size();
254 39 : }
255 :
256 : }
|