LCOV - code coverage report
Current view: top level - src - ctpl_stl.h (source / functions) Hit Total Coverage
Test: test_dash_coverage.info Lines: 83 114 72.8 %
Date: 2026-06-25 07:23:51 Functions: 28 192 14.6 %

          Line data    Source code
       1             : /*********************************************************
       2             : *
       3             : *  Copyright (C) 2014 by Vitaliy Vitsentiy
       4             : *
       5             : *  Licensed under the Apache License, Version 2.0 (the "License");
       6             : *  you may not use this file except in compliance with the License.
       7             : *  You may obtain a copy of the License at
       8             : *
       9             : *     http://www.apache.org/licenses/LICENSE-2.0
      10             : *
      11             : *  Unless required by applicable law or agreed to in writing, software
      12             : *  distributed under the License is distributed on an "AS IS" BASIS,
      13             : *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14             : *  See the License for the specific language governing permissions and
      15             : *  limitations under the License.
      16             : *
      17             : *********************************************************/
      18             : 
      19             : 
      20             : #ifndef __ctpl_stl_thread_pool_H__
      21             : #define __ctpl_stl_thread_pool_H__
      22             : 
      23             : #include <functional>
      24             : #include <thread>
      25             : #include <atomic>
      26             : #include <vector>
      27             : #include <memory>
      28             : #include <exception>
      29             : #include <future>
      30             : #include <mutex>
      31             : #include <queue>
      32             : 
      33             : 
      34             : 
      35             : // thread pool to run user's functors with signature
      36             : //      ret func(int id, other_params)
      37             : // where id is the index of the thread that runs the functor
      38             : // ret is some return type
      39             : 
      40             : 
      41             : namespace ctpl {
      42             : 
      43             :     namespace detail {
      44             :         template <typename T>
      45             :         class Queue {
      46             :         public:
      47         720 :             bool push(T const & value) {
      48         720 :                 std::unique_lock<std::mutex> lock(this->mutex);
      49         720 :                 this->q.push(value);
      50             :                 return true;
      51         720 :             }
      52             :             // deletes the retrieved element, do not use for non integral types
      53        4546 :             bool pop(T & v) {
      54        4546 :                 std::unique_lock<std::mutex> lock(this->mutex);
      55        4546 :                 if (this->q.empty())
      56        3826 :                     return false;
      57         720 :                 v = this->q.front();
      58         720 :                 this->q.pop();
      59         720 :                 return true;
      60        4546 :             }
      61             :             bool empty() {
      62             :                 std::unique_lock<std::mutex> lock(this->mutex);
      63             :                 return this->q.empty();
      64             :             }
      65             :         private:
      66             :             std::queue<T> q;
      67             :             std::mutex mutex;
      68             :         };
      69             :     }
      70             : 
      71             :     class thread_pool {
      72             : 
      73             :     public:
      74             : 
      75         360 :         thread_pool() { this->init(); }
      76             :         explicit thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
      77             : 
      78             :         // the destructor waits for all the functions in the queue to be finished
      79         360 :         ~thread_pool() {
      80         180 :             this->stop(true);
      81         360 :         }
      82             : 
      83             :         // get the number of running threads in the pool
      84         900 :         int size() { return static_cast<int>(this->threads.size()); }
      85             : 
      86             :         // number of idle threads
      87             :         int n_idle() { return this->nWaiting; }
      88             :         std::thread & get_thread(int i) { return *this->threads[i]; }
      89             : 
      90             :         // change the number of threads in the pool
      91             :         // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
      92             :         // nThreads must be >= 0
      93         180 :         void resize(int nThreads) {
      94         180 :             if (!this->isStop && !this->isDone) {
      95         180 :                 int oldNThreads = static_cast<int>(this->threads.size());
      96         180 :                 if (oldNThreads <= nThreads) {  // if the number of threads is increased
      97         180 :                     this->threads.resize(nThreads);
      98         180 :                     this->flags.resize(nThreads);
      99             : 
     100         900 :                     for (int i = oldNThreads; i < nThreads; ++i) {
     101         720 :                         this->flags[i] = std::make_shared<std::atomic<bool>>(false);
     102         720 :                         this->set_thread(i);
     103         720 :                     }
     104         180 :                 }
     105             :                 else {  // the number of threads is decreased
     106           0 :                     for (int i = oldNThreads - 1; i >= nThreads; --i) {
     107           0 :                         *this->flags[i] = true;  // this thread will finish
     108           0 :                         this->threads[i]->detach();
     109           0 :                     }
     110             :                     {
     111             :                         // stop the detached threads that were waiting
     112           0 :                         std::unique_lock<std::mutex> lock(this->mutex);
     113           0 :                         this->cv.notify_all();
     114           0 :                     }
     115           0 :                     this->threads.resize(nThreads);  // safe to delete because the threads are detached
     116           0 :                     this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
     117             :                 }
     118         180 :             }
     119         180 :         }
     120             : 
     121             :         // empty the queue
     122         540 :         void clear_queue() {
     123             :             std::function<void(int id)> * _f;
     124         540 :             while (this->q.pop(_f))
     125           0 :                 delete _f; // empty the queue
     126         540 :         }
     127             : 
     128             :         // pops a functional wrapper to the original function
     129             :         std::function<void(int)> pop() {
     130             :             std::function<void(int id)> * _f = nullptr;
     131             :             this->q.pop(_f);
     132             :             [[maybe_unused]] std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
     133             :             std::function<void(int)> f;
     134             :             if (_f)
     135             :                 f = *_f;
     136             :             return f;
     137             :         }
     138             : 
     139             :         // wait for all computing threads to finish and stop all threads
     140             :         // may be called asynchronously to not pause the calling thread while waiting
     141             :         // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
     142         540 :         void stop(bool isWait = false) {
     143         540 :             if (!isWait) {
     144           0 :                 if (this->isStop)
     145           0 :                     return;
     146           0 :                 this->isStop = true;
     147           0 :                 for (int i = 0, n = this->size(); i < n; ++i) {
     148           0 :                     *this->flags[i] = true;  // command the threads to stop
     149           0 :                 }
     150           0 :                 this->clear_queue();  // empty the queue
     151           0 :             }
     152             :             else {
     153         540 :                 if (this->isDone || this->isStop)
     154         360 :                     return;
     155         180 :                 this->isDone = true;  // give the waiting threads a command to finish
     156             :             }
     157             :             {
     158         180 :                 std::unique_lock<std::mutex> lock(this->mutex);
     159         180 :                 this->cv.notify_all();  // stop all waiting threads
     160         180 :             }
     161         900 :             for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
     162         720 :                 if (this->threads[i]->joinable())
     163         720 :                     this->threads[i]->join();
     164         720 :             }
     165             :             // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
     166             :             // therefore delete them here
     167         180 :             this->clear_queue();
     168         180 :             this->threads.clear();
     169         180 :             this->flags.clear();
     170         540 :         }
     171             : 
     172             :         template<typename F, typename... Rest>
     173           0 :         auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
     174           0 :             auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
     175           0 :                     std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
     176             :             );
     177           0 :             auto fut = pck->get_future();
     178           0 :             auto _f = new std::function<void(int id)>([pck](int id) {
     179           0 :                 (*pck)(id);
     180           0 :             });
     181           0 :             this->q.push(_f);
     182           0 :             std::unique_lock<std::mutex> lock(this->mutex);
     183           0 :             this->cv.notify_one();
     184           0 :             return fut;
     185           0 :         }
     186             : 
     187             :         // run the user's function that excepts argument int - id of the running thread. returned value is templatized
     188             :         // operator returns std::future, where the user can get the result and rethrow the catched exceptins
     189             :         template<typename F>
     190         720 :         auto push(F && f) ->std::future<decltype(f(0))> {
     191         720 :             auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
     192         720 :             auto fut = pck->get_future();
     193        1437 :             auto _f = new std::function<void(int id)>([pck](int id) {
     194         717 :                 (*pck)(id);
     195         717 :             });
     196         720 :             this->q.push(_f);
     197         720 :             std::unique_lock<std::mutex> lock(this->mutex);
     198         720 :             this->cv.notify_one();
     199         720 :             return fut;
     200         720 :         }
     201             : 
     202             : 
     203             :     private:
     204             : 
     205             :         // deleted
     206             :         thread_pool(const thread_pool &);// = delete;
     207             :         thread_pool(thread_pool &&);// = delete;
     208             :         thread_pool & operator=(const thread_pool &);// = delete;
     209             :         thread_pool & operator=(thread_pool &&);// = delete;
     210             : 
     211         720 :         void set_thread(int i) {
     212         720 :             std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
     213        1440 :             auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
     214         720 :                 std::atomic<bool> & _flag = *flag;
     215             :                 std::function<void(int id)> * _f;
     216         720 :                 bool isPop = this->q.pop(_f);
     217        1283 :                 while (true) {
     218        2002 :                     while (isPop) {  // if there is anything in the queue
     219         719 :                         std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
     220         719 :                         (*_f)(i);
     221         663 :                         if (_flag)
     222           0 :                             return;  // the thread is wanted to stop, return even if the queue is not empty yet
     223             :                         else
     224         663 :                             isPop = this->q.pop(_f);
     225         831 :                     }
     226             :                     // the queue is empty here, wait for the next command
     227        1283 :                     std::unique_lock<std::mutex> lock(this->mutex);
     228        1283 :                     ++this->nWaiting;
     229        3849 :                     this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
     230        1283 :                     --this->nWaiting;
     231        1283 :                     if (!isPop)
     232         720 :                         return;  // if the queue is empty and this->isDone == true or *flag then return
     233        1283 :                 }
     234         832 :             };
     235         720 :             this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
     236         720 :         }
     237             : 
     238         180 :         void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
     239             : 
     240             :         std::vector<std::unique_ptr<std::thread>> threads;
     241             :         std::vector<std::shared_ptr<std::atomic<bool>>> flags;
     242             :         detail::Queue<std::function<void(int id)> *> q;
     243             :         std::atomic<bool> isDone;
     244             :         std::atomic<bool> isStop;
     245             :         std::atomic<int> nWaiting;  // how many threads are waiting
     246             : 
     247             :         std::mutex mutex;
     248             :         std::condition_variable cv;
     249             :     };
     250             : 
     251             : }
     252             : 
     253             : #endif // __ctpl_stl_thread_pool_H__

Generated by: LCOV version 1.16