Line data Source code
1 : // Copyright (c) 2012-2021 The Bitcoin Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <random.h>
6 : #include <scheduler.h>
7 : #include <util/time.h>
8 :
9 : #include <boost/test/unit_test.hpp>
10 :
11 : #include <functional>
12 : #include <mutex>
13 : #include <thread>
14 : #include <vector>
15 :
16 146 : BOOST_AUTO_TEST_SUITE(scheduler_tests)
17 :
18 399 : static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
19 : {
20 : {
21 399 : std::lock_guard<std::mutex> lock(mutex);
22 399 : counter += delta;
23 399 : }
24 399 : auto noTime = std::chrono::steady_clock::time_point::min();
25 399 : if (rescheduleTime != noTime) {
26 200 : CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
27 200 : s.schedule(f, rescheduleTime);
28 200 : }
29 399 : }
30 :
31 148 : BOOST_AUTO_TEST_CASE(manythreads)
32 : {
33 : // Stress test: hundreds of microsecond-scheduled tasks,
34 : // serviced by 10 threads.
35 : //
36 : // So... ten shared counters, which if all the tasks execute
37 : // properly will sum to the number of tasks done.
38 : // Each task adds or subtracts a random amount from one of the
39 : // counters, and then schedules another task 0-1000
40 : // microseconds in the future to subtract or add from
41 : // the counter -random_amount+1, so in the end the shared
42 : // counters should sum to the number of initial tasks performed.
43 1 : CScheduler microTasks;
44 :
45 1 : std::mutex counterMutex[10];
46 1 : int counter[10] = { 0 };
47 1 : FastRandomContext rng{/*fDeterministic=*/true};
48 201 : auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
49 401 : auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
50 201 : auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
51 :
52 1 : auto start = std::chrono::steady_clock::now();
53 1 : auto now = start;
54 1 : std::chrono::steady_clock::time_point first, last;
55 1 : size_t nTasks = microTasks.getQueueInfo(first, last);
56 1 : BOOST_CHECK(nTasks == 0);
57 :
58 101 : for (int i = 0; i < 100; ++i) {
59 100 : auto t = now + std::chrono::microseconds(randomMsec(rng));
60 100 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
61 100 : int whichCounter = zeroToNine(rng);
62 100 : CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
63 100 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
64 100 : randomDelta(rng), tReschedule);
65 100 : microTasks.schedule(f, t);
66 100 : }
67 1 : nTasks = microTasks.getQueueInfo(first, last);
68 1 : BOOST_CHECK(nTasks == 100);
69 1 : BOOST_CHECK(first < last);
70 1 : BOOST_CHECK(last > now);
71 :
72 : // As soon as these are created they will start running and servicing the queue
73 1 : std::vector<std::thread> microThreads;
74 6 : for (int i = 0; i < 5; i++)
75 5 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
76 :
77 1 : UninterruptibleSleep(std::chrono::microseconds{600});
78 1 : now = std::chrono::steady_clock::now();
79 :
80 : // More threads and more tasks:
81 6 : for (int i = 0; i < 5; i++)
82 5 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
83 101 : for (int i = 0; i < 100; i++) {
84 100 : auto t = now + std::chrono::microseconds(randomMsec(rng));
85 100 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
86 100 : int whichCounter = zeroToNine(rng);
87 100 : CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
88 100 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
89 100 : randomDelta(rng), tReschedule);
90 100 : microTasks.schedule(f, t);
91 100 : }
92 :
93 : // Drain the task queue then exit threads
94 1 : microTasks.StopWhenDrained();
95 : // wait until all the threads are done
96 11 : for (auto& thread: microThreads) {
97 10 : if (thread.joinable()) thread.join();
98 : }
99 :
100 1 : int counterSum = 0;
101 11 : for (int i = 0; i < 10; i++) {
102 10 : BOOST_CHECK(counter[i] != 0);
103 10 : counterSum += counter[i];
104 10 : }
105 1 : BOOST_CHECK_EQUAL(counterSum, 200);
106 1 : }
107 :
108 148 : BOOST_AUTO_TEST_CASE(wait_until_past)
109 : {
110 1 : std::condition_variable condvar;
111 1 : Mutex mtx;
112 1 : WAIT_LOCK(mtx, lock);
113 :
114 7 : const auto no_wait = [&](const std::chrono::seconds& d) {
115 6 : return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
116 : };
117 :
118 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
119 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
120 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
121 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
122 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
123 1 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
124 1 : }
125 :
126 148 : BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
127 : {
128 1 : CScheduler scheduler;
129 :
130 : // each queue should be well ordered with respect to itself but not other queues
131 1 : SingleThreadedSchedulerClient queue1(scheduler);
132 1 : SingleThreadedSchedulerClient queue2(scheduler);
133 :
134 : // create more threads than queues
135 : // if the queues only permit execution of one task at once then
136 : // the extra threads should effectively be doing nothing
137 : // if they don't we'll get out of order behaviour
138 1 : std::vector<std::thread> threads;
139 6 : for (int i = 0; i < 5; ++i) {
140 10 : threads.emplace_back([&] { scheduler.serviceQueue(); });
141 5 : }
142 :
143 : // these are not atomic, if SingleThreadedSchedulerClient prevents
144 : // parallel execution at the queue level no synchronization should be required here
145 1 : int counter1 = 0;
146 1 : int counter2 = 0;
147 :
148 : // just simply count up on each queue - if execution is properly ordered then
149 : // the callbacks should run in exactly the order in which they were enqueued
150 101 : for (int i = 0; i < 100; ++i) {
151 200 : queue1.AddToProcessQueue([i, &counter1]() {
152 100 : bool expectation = i == counter1++;
153 100 : assert(expectation);
154 100 : });
155 :
156 200 : queue2.AddToProcessQueue([i, &counter2]() {
157 100 : bool expectation = i == counter2++;
158 100 : assert(expectation);
159 100 : });
160 100 : }
161 :
162 : // finish up
163 1 : scheduler.StopWhenDrained();
164 6 : for (auto& thread: threads) {
165 5 : if (thread.joinable()) thread.join();
166 : }
167 :
168 1 : BOOST_CHECK_EQUAL(counter1, 100);
169 1 : BOOST_CHECK_EQUAL(counter2, 100);
170 1 : }
171 :
172 : /* disabled for now. See discussion in https://github.com/bitcoin/bitcoin/pull/18174
173 : BOOST_AUTO_TEST_CASE(mockforward)
174 : {
175 : CScheduler scheduler;
176 :
177 : int counter{0};
178 : CScheduler::Function dummy = [&counter]{counter++;};
179 :
180 : // schedule jobs for 2, 5 & 8 minutes into the future
181 :
182 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
183 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
184 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
185 :
186 : // check taskQueue
187 : std::chrono::steady_clock::time_point first, last;
188 : size_t num_tasks = scheduler.getQueueInfo(first, last);
189 : BOOST_CHECK_EQUAL(num_tasks, 3ul);
190 :
191 : std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
192 :
193 : // bump the scheduler forward 5 minutes
194 : scheduler.MockForward(std::chrono::minutes{5});
195 :
196 : // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
197 : scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
198 : scheduler_thread.join();
199 :
200 : // check that the queue only has one job remaining
201 : num_tasks = scheduler.getQueueInfo(first, last);
202 : BOOST_CHECK_EQUAL(num_tasks, 1ul);
203 :
204 : // check that the dummy function actually ran
205 : BOOST_CHECK_EQUAL(counter, 2);
206 :
207 : // check that the time of the remaining job has been updated
208 : auto now = std::chrono::steady_clock::now();
209 : int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
210 : // should be between 2 & 3 minutes from now
211 : BOOST_CHECK(delta > 2*60 && delta < 3*60);
212 : }
213 : */
214 :
215 146 : BOOST_AUTO_TEST_SUITE_END()
|