Line data Source code
1 : // Copyright (c) 2015-2021 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <scheduler.h> 6 : 7 : #include <sync.h> 8 : #include <util/time.h> 9 : 10 : #include <cassert> 11 : #include <functional> 12 : #include <utility> 13 : 14 13820 : CScheduler::CScheduler() = default; 15 : 16 13820 : CScheduler::~CScheduler() 17 6910 : { 18 6910 : assert(nThreadsServicingQueue == 0); 19 6910 : if (stopWhenEmpty) assert(taskQueue.empty()); 20 13820 : } 21 : 22 : 23 6923 : void CScheduler::serviceQueue() 24 : { 25 6923 : WAIT_LOCK(newTaskMutex, lock); 26 6923 : ++nThreadsServicingQueue; 27 : 28 : // newTaskMutex is locked throughout this loop EXCEPT 29 : // when the thread is waiting or when the user's function 30 : // is called. 31 1299489 : while (!shouldStop()) { 32 : try { 33 1321730 : while (!shouldStop() && taskQueue.empty()) { 34 : // Wait until there is something to do. 35 28726 : newTaskScheduled.wait(lock); 36 : } 37 : 38 : // Wait until either there is a new task, or until 39 : // the time of the first item on the queue: 40 : 41 1773101 : while (!shouldStop() && !taskQueue.empty()) { 42 1766717 : std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first; 43 1766706 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 44 1286307 : break; // Exit loop after timeout, it means we reached the time of the event 45 : } 46 : } 47 : 48 : // If there are multiple threads, the queue can empty while we're waiting (another 49 : // thread may service the task we were waiting on). 50 1292577 : if (shouldStop() || taskQueue.empty()) 51 6927 : continue; 52 : 53 1285556 : Function f = taskQueue.begin()->second; 54 1285639 : taskQueue.erase(taskQueue.begin()); 55 : 56 : { 57 : // Unlock before calling f, so it can reschedule itself or another task 58 : // without deadlocking: 59 1285622 : REVERSE_LOCK(lock); 60 1285639 : f(); 61 1285639 : } 62 1285639 : } catch (...) { 63 0 : --nThreadsServicingQueue; 64 0 : throw; 65 0 : } 66 : } 67 6485 : --nThreadsServicingQueue; 68 6485 : newTaskScheduled.notify_one(); 69 7413 : } 70 : 71 1332759 : void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t) 72 : { 73 : { 74 1332759 : LOCK(newTaskMutex); 75 1332759 : taskQueue.insert(std::make_pair(t, f)); 76 1332759 : } 77 1332759 : newTaskScheduled.notify_one(); 78 1332759 : } 79 : 80 43224 : void CScheduler::MockForward(std::chrono::seconds delta_seconds) 81 : { 82 43224 : assert(delta_seconds > 0s && delta_seconds <= 1h); 83 : 84 : { 85 43224 : LOCK(newTaskMutex); 86 : 87 : // use temp_queue to maintain updated schedule 88 43224 : std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue; 89 : 90 609537 : for (const auto& element : taskQueue) { 91 566313 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); 92 : } 93 : 94 : // point taskQueue to temp_queue 95 43224 : taskQueue = std::move(temp_queue); 96 43224 : } 97 : 98 : // notify that the taskQueue needs to be processed 99 43224 : newTaskScheduled.notify_one(); 100 43224 : } 101 : 102 398029 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) 103 : { 104 398029 : f(); 105 780093 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); 106 398029 : } 107 : 108 38066 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) 109 : { 110 53911 : scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); 111 38066 : } 112 : 113 2 : size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first, 114 : std::chrono::steady_clock::time_point& last) const 115 : { 116 2 : LOCK(newTaskMutex); 117 2 : size_t result = taskQueue.size(); 118 2 : if (!taskQueue.empty()) { 119 1 : first = taskQueue.begin()->first; 120 1 : last = taskQueue.rbegin()->first; 121 1 : } 122 2 : return result; 123 2 : } 124 : 125 6230 : bool CScheduler::AreThreadsServicingQueue() const 126 : { 127 6230 : LOCK(newTaskMutex); 128 6230 : return nThreadsServicingQueue; 129 6230 : } 130 : 131 : 132 1277101 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() 133 : { 134 : { 135 1277101 : LOCK(m_callbacks_mutex); 136 : // Try to avoid scheduling too many copies here, but if we 137 : // accidentally have two ProcessQueue's scheduled at once its 138 : // not a big deal. 139 1277101 : if (m_are_callbacks_running) return; 140 922605 : if (m_callbacks_pending.empty()) return; 141 1277101 : } 142 1352141 : m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); 143 1277101 : } 144 : 145 679669 : void SingleThreadedSchedulerClient::ProcessQueue() 146 : { 147 679669 : std::function<void()> callback; 148 : { 149 679669 : LOCK(m_callbacks_mutex); 150 679669 : if (m_are_callbacks_running) return; 151 679669 : if (m_callbacks_pending.empty()) return; 152 637115 : m_are_callbacks_running = true; 153 : 154 637115 : callback = std::move(m_callbacks_pending.front()); 155 637115 : m_callbacks_pending.pop_front(); 156 679669 : } 157 : 158 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 159 : // to ensure both happen safely even if callback() throws. 160 : struct RAIICallbacksRunning { 161 : SingleThreadedSchedulerClient* instance; 162 1274230 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 163 1274230 : ~RAIICallbacksRunning() 164 637115 : { 165 : { 166 637115 : LOCK(instance->m_callbacks_mutex); 167 637115 : instance->m_are_callbacks_running = false; 168 637115 : } 169 637115 : instance->MaybeScheduleProcessQueue(); 170 1274230 : } 171 637115 : } raiicallbacksrunning(this); 172 : 173 637115 : callback(); 174 679669 : } 175 : 176 639986 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) 177 : { 178 : { 179 639986 : LOCK(m_callbacks_mutex); 180 639986 : m_callbacks_pending.emplace_back(std::move(func)); 181 639986 : } 182 639986 : MaybeScheduleProcessQueue(); 183 639986 : } 184 : 185 6230 : void SingleThreadedSchedulerClient::EmptyQueue() 186 : { 187 6230 : assert(!m_scheduler.AreThreadsServicingQueue()); 188 6230 : bool should_continue = true; 189 12943 : while (should_continue) { 190 6713 : ProcessQueue(); 191 6713 : LOCK(m_callbacks_mutex); 192 6713 : should_continue = !m_callbacks_pending.empty(); 193 6713 : } 194 6230 : } 195 : 196 280157 : size_t SingleThreadedSchedulerClient::CallbacksPending() 197 : { 198 280157 : LOCK(m_callbacks_mutex); 199 280157 : return m_callbacks_pending.size(); 200 280157 : }