Line data Source code
1 : // Copyright (c) 2012-2021 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_CHECKQUEUE_H 6 : #define BITCOIN_CHECKQUEUE_H 7 : 8 : #include <sync.h> 9 : #include <tinyformat.h> 10 : #include <util/threadnames.h> 11 : 12 : #include <algorithm> 13 : #include <vector> 14 : 15 : template <typename T> 16 : class CCheckQueueControl; 17 : 18 : /** 19 : * Queue for verifications that have to be performed. 20 : * The verifications are represented by a type T, which must provide an 21 : * operator(), returning a bool. 22 : * 23 : * One thread (the master) is assumed to push batches of verifications 24 : * onto the queue, where they are processed by N-1 worker threads. When 25 : * the master is done adding work, it temporarily joins the worker pool 26 : * as an N'th worker, until all jobs are done. 27 : */ 28 : template <typename T> 29 : class CCheckQueue 30 : { 31 : private: 32 : //! Mutex to protect the inner state 33 : Mutex m_mutex; 34 : 35 : //! Worker threads block on this when out of work 36 : std::condition_variable m_worker_cv; 37 : 38 : //! Master thread blocks on this when out of work 39 : std::condition_variable m_master_cv; 40 : 41 : //! The queue of elements to be processed. 42 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 43 : std::vector<T> queue GUARDED_BY(m_mutex); 44 : 45 : //! The number of workers (including the master) that are idle. 46 336 : int nIdle GUARDED_BY(m_mutex){0}; 47 : 48 : //! The total number of workers (including the master). 49 336 : int nTotal GUARDED_BY(m_mutex){0}; 50 : 51 : //! The temporary evaluation result. 52 336 : bool fAllOk GUARDED_BY(m_mutex){true}; 53 : 54 : /** 55 : * Number of verifications that haven't completed yet. 56 : * This includes elements that are no longer queued, but still in the 57 : * worker's own batches. 58 : */ 59 336 : unsigned int nTodo GUARDED_BY(m_mutex){0}; 60 : 61 : //! The maximum number of elements to be processed in one batch 62 : const unsigned int nBatchSize; 63 : 64 : std::vector<std::thread> m_worker_threads; 65 336 : bool m_request_stop GUARDED_BY(m_mutex){false}; 66 : 67 : /** Internal function that does bulk of the verification work. */ 68 69888 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 69 : { 70 69888 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 71 69888 : std::vector<T> vChecks; 72 69888 : vChecks.reserve(nBatchSize); 73 69889 : unsigned int nNow = 0; 74 69889 : bool fOk = true; 75 69889 : do { 76 : { 77 1086068 : WAIT_LOCK(m_mutex, lock); 78 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 79 1087462 : if (nNow) { 80 1017571 : fAllOk &= fOk; 81 1017571 : nTodo -= nNow; 82 1017571 : if (nTodo == 0 && !fMaster) 83 : // We processed the last element; inform the master it can exit and return the result 84 44294 : m_master_cv.notify_one(); 85 1017571 : } else { 86 : // first iteration 87 69891 : nTotal++; 88 : } 89 : // logically, the do loop starts here 90 1241268 : while (queue.empty() && !m_request_stop) { 91 223302 : if (fMaster && nTodo == 0) { 92 69496 : nTotal--; 93 69496 : bool fRet = fAllOk; 94 : // reset the status for new work later 95 69496 : fAllOk = true; 96 : // return the current status 97 69496 : return fRet; 98 : } 99 153806 : nIdle++; 100 153806 : cond.wait(lock); // wait 101 153806 : nIdle--; 102 : } 103 1017966 : if (m_request_stop) { 104 395 : return false; 105 : } 106 : 107 : // Decide how many work units to process now. 108 : // * Do not try to do everything at once, but aim for increasingly smaller batches so 109 : // all workers finish approximately simultaneously. 110 : // * Try to account for idle jobs which will instantly start helping. 111 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 112 1017571 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 113 1017571 : vChecks.resize(nNow); 114 12784373 : for (unsigned int i = 0; i < nNow; i++) { 115 : // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global 116 : // queue to the local batch vector instead of copying. 117 11766802 : vChecks[i].swap(queue.back()); 118 11766802 : queue.pop_back(); 119 11766802 : } 120 : // Check whether we need to do work at all 121 1017571 : fOk = fAllOk; 122 1087462 : } 123 : // execute work 124 12768370 : for (T& check : vChecks) 125 11752187 : if (fOk) 126 11522434 : fOk = check(); 127 1016179 : vChecks.clear(); 128 1016179 : } while (true); 129 72888 : } 130 : 131 : public: 132 : //! Mutex to ensure only one concurrent CCheckQueueControl 133 : Mutex m_control_mutex; 134 : 135 : //! Create a new check queue 136 1344 : explicit CCheckQueue(unsigned int nBatchSizeIn) 137 336 : : nBatchSize(nBatchSizeIn) 138 336 : { 139 672 : } 140 : 141 : //! Create a pool of new worker threads. 142 373 : void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 143 : { 144 : { 145 373 : LOCK(m_mutex); 146 373 : nIdle = 0; 147 373 : nTotal = 0; 148 373 : fAllOk = true; 149 373 : } 150 373 : assert(m_worker_threads.empty()); 151 768 : for (int n = 0; n < threads_num; ++n) { 152 788 : m_worker_threads.emplace_back([this, n]() { 153 393 : util::ThreadRename(strprintf("scriptch.%i", n)); 154 393 : Loop(false /* worker thread */); 155 393 : }); 156 395 : } 157 373 : } 158 : 159 : //! Wait until execution finishes, and return whether all evaluations were successful. 160 69496 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 161 : { 162 69496 : return Loop(true /* master thread */); 163 : } 164 : 165 : //! Add a batch of checks to the queue 166 2645276 : void Add(std::vector<T>& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 167 : { 168 2645276 : if (vChecks.empty()) { 169 290629 : return; 170 : } 171 : 172 : { 173 2354647 : LOCK(m_mutex); 174 14121449 : for (T& check : vChecks) { 175 11766802 : queue.emplace_back(); 176 11766802 : check.swap(queue.back()); 177 : } 178 2354647 : nTodo += vChecks.size(); 179 2354647 : } 180 : 181 2354647 : if (vChecks.size() == 1) { 182 262777 : m_worker_cv.notify_one(); 183 262777 : } else { 184 2091870 : m_worker_cv.notify_all(); 185 : } 186 2645276 : } 187 : 188 : //! Stop all of the worker threads. 189 373 : void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 190 : { 191 746 : WITH_LOCK(m_mutex, m_request_stop = true); 192 373 : m_worker_cv.notify_all(); 193 768 : for (std::thread& t : m_worker_threads) { 194 395 : t.join(); 195 : } 196 373 : m_worker_threads.clear(); 197 746 : WITH_LOCK(m_mutex, m_request_stop = false); 198 373 : } 199 : 200 380 : ~CCheckQueue() 201 190 : { 202 190 : assert(m_worker_threads.empty()); 203 380 : } 204 : 205 : }; 206 : 207 : /** 208 : * RAII-style controller object for a CCheckQueue that guarantees the passed 209 : * queue is finished before continuing. 210 : */ 211 : template <typename T> 212 : class CCheckQueueControl 213 : { 214 : private: 215 : CCheckQueue<T> * const pqueue; 216 : bool fDone; 217 : 218 : public: 219 : CCheckQueueControl() = delete; 220 : CCheckQueueControl(const CCheckQueueControl&) = delete; 221 : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 222 138992 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 223 69496 : { 224 : // passed queue is supposed to be unused, or nullptr 225 69496 : if (pqueue != nullptr) { 226 69496 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 227 69496 : } 228 138992 : } 229 : 230 69496 : bool Wait() 231 : { 232 69496 : if (pqueue == nullptr) 233 0 : return true; 234 69496 : bool fRet = pqueue->Wait(); 235 69496 : fDone = true; 236 69496 : return fRet; 237 69496 : } 238 : 239 2645276 : void Add(std::vector<T>& vChecks) 240 : { 241 2645276 : if (pqueue != nullptr) 242 2645276 : pqueue->Add(vChecks); 243 2645276 : } 244 : 245 138992 : ~CCheckQueueControl() 246 69496 : { 247 69496 : if (!fDone) 248 1010 : Wait(); 249 69496 : if (pqueue != nullptr) { 250 69496 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 251 69496 : } 252 138992 : } 253 : }; 254 : 255 : #endif // BITCOIN_CHECKQUEUE_H