LCOV - code coverage report
Current view: top level - asio - workers.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.9 % 134 107
Test Date: 2026-02-20 15:38:22 Functions: 75.0 % 16 12

            Line data    Source code
       1              : #include "workers.hpp"
       2              : #include <boost/bind/bind.hpp>
       3              : #include <boost/asio/post.hpp>
       4              : #include <mutex>
       5              : #include <algorithm>
       6              : #include "../util/logger.hpp"
       7              : 
       8              : namespace thinger::asio{
       9              : 
      10              :         // Lazy singleton implementation
      11         1582 :         workers& get_workers() {
      12         1582 :                 static workers instance;
      13         1582 :                 return instance;
      14              :         }
      15              : 
      16              :         using std::thread;
      17              :         using std::bind;
      18              :         using std::shared_ptr;
      19              :         using std::make_shared;
      20              : 
      21           81 :     workers::workers() :
      22           81 :         signals_(wait_context_){
      23              : 
      24           81 :     }
      25              : 
      26           81 :     workers::~workers()
      27              :     {
      28           81 :         if(running_) {
      29            0 :             LOG_WARNING("workers destructor called while still running - forcing stop");
      30            0 :             stop();
      31              :         }
      32           81 :     }
      33              : 
      34            0 :     void workers::wait(const std::set<unsigned>& signals) {
      35            0 :         LOG_DEBUG("registering stop signals...");
      36              :         
      37              :         // Keep the wait_context_ running
      38            0 :         wait_work_ = std::make_unique<work_guard_type>(wait_context_.get_executor());
      39              :         
      40            0 :         for (auto signal: signals){
      41            0 :             signals_.add(signal);
      42              :         }
      43              : 
      44              :         // wait for any signal to be received
      45            0 :         signals_.async_wait([this](const boost::system::error_code& ec, int signal_number){
      46            0 :             if(!ec){
      47            0 :                 LOG_INFO("received signal: {}", signal_number);
      48            0 :                 stop();
      49              :             }
      50            0 :         });
      51              : 
      52              :         // Run until stop() is called
      53            0 :         wait_context_.run();
      54              :         
      55              :         // Clean up
      56            0 :         wait_work_.reset();
      57            0 :     }
      58              : 
      59              : 
      60          281 :     bool workers::start(size_t worker_threads)
      61              :     {
      62          281 :         std::scoped_lock<std::mutex> lock(mutex_);
      63          281 :         if(running_) return false;
      64          281 :         running_ = true;
      65              : 
      66          281 :         LOG_INFO("starting {} working threads in the shared pool", worker_threads);
      67          281 :         worker_threads_.reserve(worker_threads);
      68        23227 :         for(auto thread_number=1; thread_number<=worker_threads; ++thread_number){
      69        22946 :             auto worker = std::make_unique<worker_thread>("worker thread " + std::to_string(thread_number));
      70        22946 :             auto id = worker->start();
      71        22946 :             workers_threads_map_.emplace(id, *worker);
      72        22946 :             worker_threads_.emplace_back(std::move(worker));
      73        22946 :         }
      74              : 
      75          281 :         return running_;
      76          281 :     }
      77              : 
      78            6 :     boost::asio::io_context& workers::get_isolated_io_context(std::string thread_name)
      79              :     {
      80            6 :         LOG_INFO("starting '{}' worker thread", thread_name);
      81            6 :         auto worker = std::make_unique<worker_thread>(std::move(thread_name));
      82            6 :         auto& io_context = worker->get_io_context();
      83            6 :         auto thread_id = worker->start();
      84            6 :         auto& worker_ref = *worker;
      85            6 :         job_threads_.emplace_back(std::move(worker));
      86            6 :         workers_threads_map_.emplace(thread_id, worker_ref);
      87            6 :         return io_context;
      88            6 :     }
      89              : 
      90          281 :     void workers::do_stop()
      91              :     {
      92          281 :         LOG_INFO("executing full stop");
      93              :         
      94              :         // First, notify all clients to stop
      95              :         {
      96          281 :             std::lock_guard<std::mutex> lock(clients_mutex_);
      97          281 :             if (!clients_.empty()) {
      98            0 :                 LOG_INFO("Stopping {} worker clients", clients_.size());
      99            0 :                 for (auto* client : clients_) {
     100            0 :                     if (client && client->is_running()) {
     101              :                         try {
     102            0 :                             LOG_DEBUG("Stopping client: {}", client->get_service_name());
     103            0 :                             client->stop();
     104            0 :                         } catch (const std::exception& e) {
     105            0 :                             LOG_ERROR("Error stopping client {}: {}", client->get_service_name(), e.what());
     106            0 :                         }
     107              :                     }
     108              :                 }
     109              :             }
     110          281 :         }
     111              :         
     112              :         // Stop all job threads
     113          281 :         LOG_INFO("stopping job threads");
     114          287 :         for(auto const& worker_thread : job_threads_){
     115            6 :             worker_thread->stop();
     116              :         }
     117              : 
     118              :         // Stop all worker threads
     119          281 :         LOG_INFO("stopping worker threads");
     120        23227 :         for(auto const& worker_thread : worker_threads_){
     121        22946 :             worker_thread->stop();
     122              :         }
     123              : 
     124              :         // Clear auxiliary references
     125          281 :         LOG_INFO("clearing structures");
     126          281 :         worker_threads_.clear();
     127          281 :         job_threads_.clear();
     128          281 :         workers_threads_map_.clear();
     129          281 :         next_io_context_ = 0;
     130              : 
     131              :         // Cancel signals and stop wait_context
     132          281 :         LOG_INFO("stopping wait context");
     133          281 :         signals_.cancel();
     134          281 :         wait_work_.reset();
     135          281 :         wait_context_.stop();
     136              :         
     137          281 :         LOG_INFO("all done!");
     138          281 :     }
     139              : 
     140          293 :     bool workers::stop()
     141              :     {
     142              :         //LOG_INFO("workers::stop() called");
     143              :         
     144              :         // Check if already stopping
     145              :         {
     146          293 :             std::scoped_lock<std::mutex> lock(mutex_);
     147          293 :             if(!running_) {
     148           12 :                 LOG_WARNING("workers already stopped");
     149           12 :                 return false;
     150              :             }
     151          281 :             running_ = false;
     152          293 :         }
     153              : 
     154              :         // Check if wait_context_ is running in the current thread
     155          281 :         if (wait_context_.get_executor().running_in_this_thread()) {
     156              :             // We're already in wait_context_, execute directly
     157              :             //LOG_INFO("executing stop directly in wait_context thread");
     158            0 :             do_stop();
     159          281 :         } else if (wait_work_) {
     160              :             // wait_context_ is running in another thread, post to it
     161              :             //LOG_INFO("posting stop to wait_context thread");
     162            0 :             boost::asio::post(wait_context_, [this]() {
     163            0 :                 do_stop();
     164            0 :             });
     165              :         } else {
     166              :             // wait_context_ is not running, execute directly
     167              :             //LOG_INFO("wait_context not running, executing stop directly");
     168          281 :             do_stop();
     169              :         }
     170              : 
     171          281 :         return true;
     172              :     }
     173              : 
     174           24 :         boost::asio::io_context& workers::get_next_io_context()
     175              :         {
     176           24 :         return worker_threads_[next_io_context_++%worker_threads_.size()]->get_io_context();
     177              :         }
     178              : 
     179          878 :         boost::asio::io_context& workers::get_thread_io_context()
     180              :         {
     181          878 :         std::thread::id this_id = std::this_thread::get_id();
     182              :         
     183              :         // First check if this is a worker thread
     184              :         {
     185          878 :             std::scoped_lock<std::mutex> lock(mutex_);
     186          878 :             auto it = workers_threads_map_.find(this_id);
     187          878 :             if(it != workers_threads_map_.end()){
     188          445 :                 return it->second.get().get_io_context();
     189              :             }
     190          878 :         }
     191              :         
     192              :         // Not a worker thread - return a worker thread's io_context
     193              :         // The actual thread affinity will be handled differently
     194          433 :         LOG_DEBUG("Thread is not a worker thread, using first worker's io_context");
     195          433 :         return worker_threads_.begin()->get()->get_io_context();
     196              :         }
     197              : 
     198              :     // Client management implementation
     199          295 :     void workers::register_client(worker_client* client) {
     200          295 :         if (!client) return;
     201              :         
     202          295 :         bool should_start = false;
     203              :         
     204              :         {
     205          295 :             std::lock_guard<std::mutex> lock(clients_mutex_);
     206          295 :             auto result = clients_.insert(client);
     207              :             
     208          295 :             if (result.second) { // Successfully inserted
     209          295 :                 LOG_INFO("Worker client registered: {}", client->get_service_name());
     210              :                 
     211              :                 // Auto-start if this is the first client and auto-manage is enabled
     212          295 :                 if (auto_manage_ && clients_.size() == 1 && !running_) {
     213          260 :                     LOG_INFO("First worker client registered, starting workers automatically");
     214          260 :                     should_start = true;
     215              :                 }
     216              :             }
     217          295 :         }
     218              :         
     219              :         // Start outside the lock to avoid potential deadlock
     220          295 :         if (should_start) {
     221          260 :             start();
     222              :         }
     223              :     }
     224              :     
     225          295 :     void workers::unregister_client(worker_client* client) {
     226          295 :         if (!client) return;
     227              : 
     228          295 :         bool should_stop = false;
     229              :         
     230              :         {
     231          295 :             std::lock_guard<std::mutex> lock(clients_mutex_);
     232          295 :             size_t removed = clients_.erase(client);
     233              :             
     234          295 :             if (removed > 0) {
     235          295 :                 LOG_INFO("Worker client unregistered: {}", client->get_service_name());
     236              :                 
     237              :                 // Auto-stop if no clients remain and auto-manage is enabled
     238          295 :                 if (auto_manage_ && clients_.empty() && running_) {
     239          272 :                     LOG_INFO("Last worker client unregistered, stopping workers automatically");
     240          272 :                     should_stop = true;
     241              :                 }
     242              :             }
     243          295 :         }
     244              :         
     245              :         // Stop outside the lock to avoid potential deadlock
     246          295 :         if (should_stop) {
     247          272 :             stop();
     248              :         }
     249              :     }
     250              :     
     251           39 :     size_t workers::client_count() const {
     252           39 :         std::lock_guard<std::mutex> lock(clients_mutex_);
     253           78 :         return clients_.size();
     254           39 :     }
     255              : 
     256              : }
        

Generated by: LCOV version 2.0-1