Line data Source code
1 : // Copyright (c) 2015-2021 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <scheduler.h> 6 : 7 : #include <sync.h> 8 : #include <util/time.h> 9 : 10 : #include <cassert> 11 : #include <functional> 12 : #include <utility> 13 : 14 740 : CScheduler::CScheduler() = default; 15 : 16 740 : CScheduler::~CScheduler() 17 370 : { 18 370 : assert(nThreadsServicingQueue == 0); 19 370 : if (stopWhenEmpty) assert(taskQueue.empty()); 20 740 : } 21 : 22 : 23 383 : void CScheduler::serviceQueue() 24 : { 25 383 : WAIT_LOCK(newTaskMutex, lock); 26 383 : ++nThreadsServicingQueue; 27 : 28 : // newTaskMutex is locked throughout this loop EXCEPT 29 : // when the thread is waiting or when the user's function 30 : // is called. 31 59433 : while (!shouldStop()) { 32 : try { 33 84256 : while (!shouldStop() && taskQueue.empty()) { 34 : // Wait until there is something to do. 35 25206 : newTaskScheduled.wait(lock); 36 : } 37 : 38 : // Wait until either there is a new task, or until 39 : // the time of the first item on the queue: 40 : 41 59078 : while (!shouldStop() && !taskQueue.empty()) { 42 58697 : std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first; 43 58697 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 44 58669 : break; // Exit loop after timeout, it means we reached the time of the event 45 : } 46 : } 47 : 48 : // If there are multiple threads, the queue can empty while we're waiting (another 49 : // thread may service the task we were waiting on). 50 59050 : if (shouldStop() || taskQueue.empty()) 51 390 : continue; 52 : 53 58660 : Function f = taskQueue.begin()->second; 54 58660 : taskQueue.erase(taskQueue.begin()); 55 : 56 : { 57 : // Unlock before calling f, so it can reschedule itself or another task 58 : // without deadlocking: 59 58660 : REVERSE_LOCK(lock); 60 58660 : f(); 61 58660 : } 62 58660 : } catch (...) { 63 0 : --nThreadsServicingQueue; 64 0 : throw; 65 0 : } 66 : } 67 383 : --nThreadsServicingQueue; 68 383 : newTaskScheduled.notify_one(); 69 383 : } 70 : 71 58661 : void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t) 72 : { 73 : { 74 58661 : LOCK(newTaskMutex); 75 58661 : taskQueue.insert(std::make_pair(t, f)); 76 58661 : } 77 58661 : newTaskScheduled.notify_one(); 78 58661 : } 79 : 80 0 : void CScheduler::MockForward(std::chrono::seconds delta_seconds) 81 : { 82 0 : assert(delta_seconds > 0s && delta_seconds <= 1h); 83 : 84 : { 85 0 : LOCK(newTaskMutex); 86 : 87 : // use temp_queue to maintain updated schedule 88 0 : std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue; 89 : 90 0 : for (const auto& element : taskQueue) { 91 0 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); 92 : } 93 : 94 : // point taskQueue to temp_queue 95 0 : taskQueue = std::move(temp_queue); 96 0 : } 97 : 98 : // notify that the taskQueue needs to be processed 99 0 : newTaskScheduled.notify_one(); 100 0 : } 101 : 102 0 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) 103 : { 104 0 : f(); 105 0 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); 106 0 : } 107 : 108 0 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) 109 : { 110 0 : scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); 111 0 : } 112 : 113 2 : size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first, 114 : std::chrono::steady_clock::time_point& last) const 115 : { 116 2 : LOCK(newTaskMutex); 117 2 : size_t result = taskQueue.size(); 118 2 : if (!taskQueue.empty()) { 119 1 : first = taskQueue.begin()->first; 120 1 : last = taskQueue.rbegin()->first; 121 1 : } 122 2 : return result; 123 2 : } 124 : 125 184 : bool CScheduler::AreThreadsServicingQueue() const 126 : { 127 184 : LOCK(newTaskMutex); 128 184 : return nThreadsServicingQueue; 129 184 : } 130 : 131 : 132 99868 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() 133 : { 134 : { 135 99868 : LOCK(m_callbacks_mutex); 136 : // Try to avoid scheduling too many copies here, but if we 137 : // accidentally have two ProcessQueue's scheduled at once its 138 : // not a big deal. 139 99868 : if (m_are_callbacks_running) return; 140 82622 : if (m_callbacks_pending.empty()) return; 141 99868 : } 142 116521 : m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); 143 99868 : } 144 : 145 58444 : void SingleThreadedSchedulerClient::ProcessQueue() 146 : { 147 58444 : std::function<void()> callback; 148 : { 149 58444 : LOCK(m_callbacks_mutex); 150 58444 : if (m_are_callbacks_running) return; 151 58444 : if (m_callbacks_pending.empty()) return; 152 49934 : m_are_callbacks_running = true; 153 : 154 49934 : callback = std::move(m_callbacks_pending.front()); 155 49934 : m_callbacks_pending.pop_front(); 156 58444 : } 157 : 158 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 159 : // to ensure both happen safely even if callback() throws. 160 : struct RAIICallbacksRunning { 161 : SingleThreadedSchedulerClient* instance; 162 99868 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 163 99868 : ~RAIICallbacksRunning() 164 49934 : { 165 : { 166 49934 : LOCK(instance->m_callbacks_mutex); 167 49934 : instance->m_are_callbacks_running = false; 168 49934 : } 169 49934 : instance->MaybeScheduleProcessQueue(); 170 99868 : } 171 49934 : } raiicallbacksrunning(this); 172 : 173 49934 : callback(); 174 58444 : } 175 : 176 49934 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) 177 : { 178 : { 179 49934 : LOCK(m_callbacks_mutex); 180 49934 : m_callbacks_pending.emplace_back(std::move(func)); 181 49934 : } 182 49934 : MaybeScheduleProcessQueue(); 183 49934 : } 184 : 185 184 : void SingleThreadedSchedulerClient::EmptyQueue() 186 : { 187 184 : assert(!m_scheduler.AreThreadsServicingQueue()); 188 184 : bool should_continue = true; 189 368 : while (should_continue) { 190 184 : ProcessQueue(); 191 184 : LOCK(m_callbacks_mutex); 192 184 : should_continue = !m_callbacks_pending.empty(); 193 184 : } 194 184 : } 195 : 196 35090 : size_t SingleThreadedSchedulerClient::CallbacksPending() 197 : { 198 35090 : LOCK(m_callbacks_mutex); 199 35090 : return m_callbacks_pending.size(); 200 35090 : }