Line data Source code
1 : // Copyright (c) 2012-2021 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_CHECKQUEUE_H 6 : #define BITCOIN_CHECKQUEUE_H 7 : 8 : #include <sync.h> 9 : #include <tinyformat.h> 10 : #include <util/threadnames.h> 11 : 12 : #include <algorithm> 13 : #include <vector> 14 : 15 : template <typename T> 16 : class CCheckQueueControl; 17 : 18 : /** 19 : * Queue for verifications that have to be performed. 20 : * The verifications are represented by a type T, which must provide an 21 : * operator(), returning a bool. 22 : * 23 : * One thread (the master) is assumed to push batches of verifications 24 : * onto the queue, where they are processed by N-1 worker threads. When 25 : * the master is done adding work, it temporarily joins the worker pool 26 : * as an N'th worker, until all jobs are done. 27 : */ 28 : template <typename T> 29 : class CCheckQueue 30 : { 31 : private: 32 : //! Mutex to protect the inner state 33 : Mutex m_mutex; 34 : 35 : //! Worker threads block on this when out of work 36 : std::condition_variable m_worker_cv; 37 : 38 : //! Master thread blocks on this when out of work 39 : std::condition_variable m_master_cv; 40 : 41 : //! The queue of elements to be processed. 42 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 43 : std::vector<T> queue GUARDED_BY(m_mutex); 44 : 45 : //! The number of workers (including the master) that are idle. 46 6381 : int nIdle GUARDED_BY(m_mutex){0}; 47 : 48 : //! The total number of workers (including the master). 49 6381 : int nTotal GUARDED_BY(m_mutex){0}; 50 : 51 : //! The temporary evaluation result. 52 6381 : bool fAllOk GUARDED_BY(m_mutex){true}; 53 : 54 : /** 55 : * Number of verifications that haven't completed yet. 56 : * This includes elements that are no longer queued, but still in the 57 : * worker's own batches. 58 : */ 59 6381 : unsigned int nTodo GUARDED_BY(m_mutex){0}; 60 : 61 : //! The maximum number of elements to be processed in one batch 62 : const unsigned int nBatchSize; 63 : 64 : std::vector<std::thread> m_worker_threads; 65 6381 : bool m_request_stop GUARDED_BY(m_mutex){false}; 66 : 67 : /** Internal function that does bulk of the verification work. */ 68 596924 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 69 : { 70 596924 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 71 596924 : std::vector<T> vChecks; 72 596924 : vChecks.reserve(nBatchSize); 73 596979 : unsigned int nNow = 0; 74 596979 : bool fOk = true; 75 596979 : do { 76 : { 77 1706848 : WAIT_LOCK(m_mutex, lock); 78 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 79 1708511 : if (nNow) { 80 1111373 : fAllOk &= fOk; 81 1111373 : nTodo -= nNow; 82 1111373 : if (nTodo == 0 && !fMaster) 83 : // We processed the last element; inform the master it can exit and return the result 84 91030 : m_master_cv.notify_one(); 85 1111373 : } else { 86 : // first iteration 87 597138 : nTotal++; 88 : } 89 : // logically, the do loop starts here 90 2067608 : while (queue.empty() && !m_request_stop) { 91 867370 : if (fMaster && nTodo == 0) { 92 508273 : nTotal--; 93 508273 : bool fRet = fAllOk; 94 : // reset the status for new work later 95 508273 : fAllOk = true; 96 : // return the current status 97 508273 : return fRet; 98 : } 99 359097 : nIdle++; 100 359097 : cond.wait(lock); // wait 101 359097 : nIdle--; 102 : } 103 1200238 : if (m_request_stop) { 104 88865 : return false; 105 : } 106 : 107 : // Decide how many work units to process now. 108 : // * Do not try to do everything at once, but aim for increasingly smaller batches so 109 : // all workers finish approximately simultaneously. 110 : // * Try to account for idle jobs which will instantly start helping. 111 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 112 1111373 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 113 1111373 : vChecks.resize(nNow); 114 12994566 : for (unsigned int i = 0; i < nNow; i++) { 115 : // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global 116 : // queue to the local batch vector instead of copying. 117 11883193 : vChecks[i].swap(queue.back()); 118 11883193 : queue.pop_back(); 119 11883193 : } 120 : // Check whether we need to do work at all 121 1111373 : fOk = fAllOk; 122 1708511 : } 123 : // execute work 124 12978676 : for (T& check : vChecks) 125 11868799 : if (fOk) 126 11639044 : fOk = check(); 127 1109869 : vChecks.clear(); 128 1109869 : } while (true); 129 600566 : } 130 : 131 : public: 132 : //! Mutex to ensure only one concurrent CCheckQueueControl 133 : Mutex m_control_mutex; 134 : 135 : //! Create a new check queue 136 25524 : explicit CCheckQueue(unsigned int nBatchSizeIn) 137 6381 : : nBatchSize(nBatchSizeIn) 138 6381 : { 139 12762 : } 140 : 141 : //! Create a pool of new worker threads. 142 6271 : void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 143 : { 144 : { 145 6271 : LOCK(m_mutex); 146 6271 : nIdle = 0; 147 6271 : nTotal = 0; 148 6271 : fAllOk = true; 149 6271 : } 150 6271 : assert(m_worker_threads.empty()); 151 95136 : for (int n = 0; n < threads_num; ++n) { 152 177023 : m_worker_threads.emplace_back([this, n]() { 153 88158 : util::ThreadRename(strprintf("scriptch.%i", n)); 154 88158 : Loop(false /* worker thread */); 155 88158 : }); 156 88865 : } 157 6271 : } 158 : 159 : //! Wait until execution finishes, and return whether all evaluations were successful. 160 508273 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 161 : { 162 508273 : return Loop(true /* master thread */); 163 : } 164 : 165 : //! Add a batch of checks to the queue 166 2987734 : void Add(std::vector<T>& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 167 : { 168 2987734 : if (vChecks.empty()) { 169 544870 : return; 170 : } 171 : 172 : { 173 2442864 : LOCK(m_mutex); 174 14326057 : for (T& check : vChecks) { 175 11883193 : queue.emplace_back(); 176 11883193 : check.swap(queue.back()); 177 : } 178 2442864 : nTodo += vChecks.size(); 179 2442864 : } 180 : 181 2442864 : if (vChecks.size() == 1) { 182 328118 : m_worker_cv.notify_one(); 183 328118 : } else { 184 2114746 : m_worker_cv.notify_all(); 185 : } 186 2987734 : } 187 : 188 : //! Stop all of the worker threads. 189 6286 : void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 190 : { 191 12572 : WITH_LOCK(m_mutex, m_request_stop = true); 192 6286 : m_worker_cv.notify_all(); 193 95151 : for (std::thread& t : m_worker_threads) { 194 88865 : t.join(); 195 : } 196 6286 : m_worker_threads.clear(); 197 12572 : WITH_LOCK(m_mutex, m_request_stop = false); 198 6286 : } 199 : 200 6146 : ~CCheckQueue() 201 3073 : { 202 3073 : assert(m_worker_threads.empty()); 203 6146 : } 204 : 205 : }; 206 : 207 : /** 208 : * RAII-style controller object for a CCheckQueue that guarantees the passed 209 : * queue is finished before continuing. 210 : */ 211 : template <typename T> 212 : class CCheckQueueControl 213 : { 214 : private: 215 : CCheckQueue<T> * const pqueue; 216 : bool fDone; 217 : 218 : public: 219 : CCheckQueueControl() = delete; 220 : CCheckQueueControl(const CCheckQueueControl&) = delete; 221 : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 222 1022058 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 223 511029 : { 224 : // passed queue is supposed to be unused, or nullptr 225 511029 : if (pqueue != nullptr) { 226 508273 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 227 508273 : } 228 1022058 : } 229 : 230 513563 : bool Wait() 231 : { 232 513563 : if (pqueue == nullptr) 233 5290 : return true; 234 508273 : bool fRet = pqueue->Wait(); 235 508273 : fDone = true; 236 508273 : return fRet; 237 513563 : } 238 : 239 2988624 : void Add(std::vector<T>& vChecks) 240 : { 241 2988624 : if (pqueue != nullptr) 242 2987734 : pqueue->Add(vChecks); 243 2988624 : } 244 : 245 1022058 : ~CCheckQueueControl() 246 511029 : { 247 511029 : if (!fDone) 248 3884 : Wait(); 249 511029 : if (pqueue != nullptr) { 250 508273 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 251 508273 : } 252 1022058 : } 253 : }; 254 : 255 : #endif // BITCOIN_CHECKQUEUE_H