LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 116 116 100.0 %
Date: 2026-06-25 07:23:43 Functions: 135 137 98.5 %

          Line data    Source code
       1             : // Copyright (c) 2012-2021 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #ifndef BITCOIN_CHECKQUEUE_H
       6             : #define BITCOIN_CHECKQUEUE_H
       7             : 
       8             : #include <sync.h>
       9             : #include <tinyformat.h>
      10             : #include <util/threadnames.h>
      11             : 
      12             : #include <algorithm>
      13             : #include <vector>
      14             : 
      15             : template <typename T>
      16             : class CCheckQueueControl;
      17             : 
      18             : /**
      19             :  * Queue for verifications that have to be performed.
      20             :   * The verifications are represented by a type T, which must provide an
      21             :   * operator(), returning a bool.
      22             :   *
      23             :   * One thread (the master) is assumed to push batches of verifications
      24             :   * onto the queue, where they are processed by N-1 worker threads. When
      25             :   * the master is done adding work, it temporarily joins the worker pool
      26             :   * as an N'th worker, until all jobs are done.
      27             :   */
      28             : template <typename T>
      29             : class CCheckQueue
      30             : {
      31             : private:
      32             :     //! Mutex to protect the inner state
      33             :     Mutex m_mutex;
      34             : 
      35             :     //! Worker threads block on this when out of work
      36             :     std::condition_variable m_worker_cv;
      37             : 
      38             :     //! Master thread blocks on this when out of work
      39             :     std::condition_variable m_master_cv;
      40             : 
      41             :     //! The queue of elements to be processed.
      42             :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
      43             :     std::vector<T> queue GUARDED_BY(m_mutex);
      44             : 
      45             :     //! The number of workers (including the master) that are idle.
      46        6381 :     int nIdle GUARDED_BY(m_mutex){0};
      47             : 
      48             :     //! The total number of workers (including the master).
      49        6381 :     int nTotal GUARDED_BY(m_mutex){0};
      50             : 
      51             :     //! The temporary evaluation result.
      52        6381 :     bool fAllOk GUARDED_BY(m_mutex){true};
      53             : 
      54             :     /**
      55             :      * Number of verifications that haven't completed yet.
      56             :      * This includes elements that are no longer queued, but still in the
      57             :      * worker's own batches.
      58             :      */
      59        6381 :     unsigned int nTodo GUARDED_BY(m_mutex){0};
      60             : 
      61             :     //! The maximum number of elements to be processed in one batch
      62             :     const unsigned int nBatchSize;
      63             : 
      64             :     std::vector<std::thread> m_worker_threads;
      65        6381 :     bool m_request_stop GUARDED_BY(m_mutex){false};
      66             : 
      67             :     /** Internal function that does bulk of the verification work. */
      68      596924 :     bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      69             :     {
      70      596924 :         std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
      71      596924 :         std::vector<T> vChecks;
      72      596924 :         vChecks.reserve(nBatchSize);
      73      596979 :         unsigned int nNow = 0;
      74      596979 :         bool fOk = true;
      75      596979 :         do {
      76             :             {
      77     1706848 :                 WAIT_LOCK(m_mutex, lock);
      78             :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
      79     1708511 :                 if (nNow) {
      80     1111373 :                     fAllOk &= fOk;
      81     1111373 :                     nTodo -= nNow;
      82     1111373 :                     if (nTodo == 0 && !fMaster)
      83             :                         // We processed the last element; inform the master it can exit and return the result
      84       91030 :                         m_master_cv.notify_one();
      85     1111373 :                 } else {
      86             :                     // first iteration
      87      597138 :                     nTotal++;
      88             :                 }
      89             :                 // logically, the do loop starts here
      90     2067608 :                 while (queue.empty() && !m_request_stop) {
      91      867370 :                     if (fMaster && nTodo == 0) {
      92      508273 :                         nTotal--;
      93      508273 :                         bool fRet = fAllOk;
      94             :                         // reset the status for new work later
      95      508273 :                         fAllOk = true;
      96             :                         // return the current status
      97      508273 :                         return fRet;
      98             :                     }
      99      359097 :                     nIdle++;
     100      359097 :                     cond.wait(lock); // wait
     101      359097 :                     nIdle--;
     102             :                 }
     103     1200238 :                 if (m_request_stop) {
     104       88865 :                     return false;
     105             :                 }
     106             : 
     107             :                 // Decide how many work units to process now.
     108             :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     109             :                 //   all workers finish approximately simultaneously.
     110             :                 // * Try to account for idle jobs which will instantly start helping.
     111             :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     112     1111373 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     113     1111373 :                 vChecks.resize(nNow);
     114    12994566 :                 for (unsigned int i = 0; i < nNow; i++) {
     115             :                     // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
     116             :                     // queue to the local batch vector instead of copying.
     117    11883193 :                     vChecks[i].swap(queue.back());
     118    11883193 :                     queue.pop_back();
     119    11883193 :                 }
     120             :                 // Check whether we need to do work at all
     121     1111373 :                 fOk = fAllOk;
     122     1708511 :             }
     123             :             // execute work
     124    12978676 :             for (T& check : vChecks)
     125    11868799 :                 if (fOk)
     126    11639044 :                     fOk = check();
     127     1109869 :             vChecks.clear();
     128     1109869 :         } while (true);
     129      600566 :     }
     130             : 
     131             : public:
     132             :     //! Mutex to ensure only one concurrent CCheckQueueControl
     133             :     Mutex m_control_mutex;
     134             : 
     135             :     //! Create a new check queue
     136       25524 :     explicit CCheckQueue(unsigned int nBatchSizeIn)
     137        6381 :         : nBatchSize(nBatchSizeIn)
     138        6381 :     {
     139       12762 :     }
     140             : 
     141             :     //! Create a pool of new worker threads.
     142        6271 :     void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     143             :     {
     144             :         {
     145        6271 :             LOCK(m_mutex);
     146        6271 :             nIdle = 0;
     147        6271 :             nTotal = 0;
     148        6271 :             fAllOk = true;
     149        6271 :         }
     150        6271 :         assert(m_worker_threads.empty());
     151       95136 :         for (int n = 0; n < threads_num; ++n) {
     152      177023 :             m_worker_threads.emplace_back([this, n]() {
     153       88158 :                 util::ThreadRename(strprintf("scriptch.%i", n));
     154       88158 :                 Loop(false /* worker thread */);
     155       88158 :             });
     156       88865 :         }
     157        6271 :     }
     158             : 
     159             :     //! Wait until execution finishes, and return whether all evaluations were successful.
     160      508273 :     bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     161             :     {
     162      508273 :         return Loop(true /* master thread */);
     163             :     }
     164             : 
     165             :     //! Add a batch of checks to the queue
     166     2987734 :     void Add(std::vector<T>& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     167             :     {
     168     2987734 :         if (vChecks.empty()) {
     169      544870 :             return;
     170             :         }
     171             : 
     172             :         {
     173     2442864 :             LOCK(m_mutex);
     174    14326057 :             for (T& check : vChecks) {
     175    11883193 :                 queue.emplace_back();
     176    11883193 :                 check.swap(queue.back());
     177             :             }
     178     2442864 :             nTodo += vChecks.size();
     179     2442864 :         }
     180             : 
     181     2442864 :         if (vChecks.size() == 1) {
     182      328118 :             m_worker_cv.notify_one();
     183      328118 :         } else {
     184     2114746 :             m_worker_cv.notify_all();
     185             :         }
     186     2987734 :     }
     187             : 
     188             :     //! Stop all of the worker threads.
     189        6286 :     void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     190             :     {
     191       12572 :         WITH_LOCK(m_mutex, m_request_stop = true);
     192        6286 :         m_worker_cv.notify_all();
     193       95151 :         for (std::thread& t : m_worker_threads) {
     194       88865 :             t.join();
     195             :         }
     196        6286 :         m_worker_threads.clear();
     197       12572 :         WITH_LOCK(m_mutex, m_request_stop = false);
     198        6286 :     }
     199             : 
     200        6146 :     ~CCheckQueue()
     201        3073 :     {
     202        3073 :         assert(m_worker_threads.empty());
     203        6146 :     }
     204             : 
     205             : };
     206             : 
     207             : /**
     208             :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     209             :  * queue is finished before continuing.
     210             :  */
     211             : template <typename T>
     212             : class CCheckQueueControl
     213             : {
     214             : private:
     215             :     CCheckQueue<T> * const pqueue;
     216             :     bool fDone;
     217             : 
     218             : public:
     219             :     CCheckQueueControl() = delete;
     220             :     CCheckQueueControl(const CCheckQueueControl&) = delete;
     221             :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
     222     1022058 :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
     223      511029 :     {
     224             :         // passed queue is supposed to be unused, or nullptr
     225      511029 :         if (pqueue != nullptr) {
     226      508273 :             ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
     227      508273 :         }
     228     1022058 :     }
     229             : 
     230      513563 :     bool Wait()
     231             :     {
     232      513563 :         if (pqueue == nullptr)
     233        5290 :             return true;
     234      508273 :         bool fRet = pqueue->Wait();
     235      508273 :         fDone = true;
     236      508273 :         return fRet;
     237      513563 :     }
     238             : 
     239     2988624 :     void Add(std::vector<T>& vChecks)
     240             :     {
     241     2988624 :         if (pqueue != nullptr)
     242     2987734 :             pqueue->Add(vChecks);
     243     2988624 :     }
     244             : 
     245     1022058 :     ~CCheckQueueControl()
     246      511029 :     {
     247      511029 :         if (!fDone)
     248        3884 :             Wait();
     249      511029 :         if (pqueue != nullptr) {
     250      508273 :             LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
     251      508273 :         }
     252     1022058 :     }
     253             : };
     254             : 
     255             : #endif // BITCOIN_CHECKQUEUE_H

Generated by: LCOV version 1.16