Line data Source code
1 : // Copyright (c) 2012-2021 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <test/util/setup_common.h> 6 : 7 : #include <checkqueue.h> 8 : #include <sync.h> 9 : #include <test/util/random.h> 10 : #include <util/time.h> 11 : 12 : #include <boost/test/unit_test.hpp> 13 : 14 : #include <atomic> 15 : #include <condition_variable> 16 : #include <mutex> 17 : #include <thread> 18 : #include <utility> 19 : #include <vector> 20 : 21 : /** 22 : * Identical to TestingSetup but excludes lock contention logging if 23 : * `DEBUG_LOCKCONTENTION` is defined, as some of these tests are designed to be 24 : * heavily contested to trigger race conditions or other issues. 25 : */ 26 : struct NoLockLoggingTestingSetup : public TestingSetup { 27 10 : NoLockLoggingTestingSetup() 28 : #ifdef DEBUG_LOCKCONTENTION 29 : : TestingSetup{CBaseChainParams::MAIN, /*extra_args=*/{"-debugexclude=lock"}} {} 30 : #else 31 10 : : TestingSetup{CBaseChainParams::MAIN} {} 32 : #endif 33 : }; 34 : 35 146 : BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, NoLockLoggingTestingSetup) 36 : 37 : static const unsigned int QUEUE_BATCH_SIZE = 128; 38 : static const int SCRIPT_CHECK_THREADS = 3; 39 : 40 : struct FakeCheck { 41 0 : bool operator()() const 42 : { 43 0 : return true; 44 : } 45 0 : void swap(FakeCheck& x) noexcept {}; 46 : }; 47 : 48 : struct FakeCheckCheckCompletion { 49 : static std::atomic<size_t> n_calls; 50 10647155 : bool operator()() 51 : { 52 10647155 : n_calls.fetch_add(1, std::memory_order_relaxed); 53 10647155 : return true; 54 : } 55 21327246 : void swap(FakeCheckCheckCompletion& x) noexcept {}; 56 : }; 57 : 58 : struct FailingCheck { 59 : bool fails; 60 1001080 : FailingCheck(bool _fails) : fails(_fails){}; 61 2010000 : FailingCheck() : fails(true){}; 62 271593 : bool operator()() const 63 : { 64 271593 : return !fails; 65 : } 66 1005000 : void swap(FailingCheck& x) noexcept 67 : { 68 1005000 : std::swap(fails, x.fails); 69 1005000 : }; 70 : }; 71 : 72 : struct UniqueCheck { 73 : static Mutex m; 74 : static std::unordered_multiset<size_t> results GUARDED_BY(m); 75 : size_t check_id; 76 200000 : UniqueCheck(size_t check_id_in) : check_id(check_id_in){}; 77 400000 : UniqueCheck() : check_id(0){}; 78 99882 : bool operator()() 79 : { 80 99882 : LOCK(m); 81 99882 : results.insert(check_id); 82 : return true; 83 99882 : } 84 200000 : void swap(UniqueCheck& x) noexcept 85 : { 86 200000 : std::swap(x.check_id, check_id); 87 200000 : }; 88 : }; 89 : 90 : 91 : struct MemoryCheck { 92 : static std::atomic<size_t> fake_allocated_memory; 93 1554270 : bool b {false}; 94 498961 : bool operator()() const 95 : { 96 498961 : return true; 97 : } 98 2997000 : MemoryCheck() = default; 99 1110540 : MemoryCheck(const MemoryCheck& x) 100 555270 : { 101 : // We have to do this to make sure that destructor calls are paired 102 : // 103 : // Really, copy constructor should be deletable, but CCheckQueue breaks 104 : // if it is deleted because of internal push_back. 105 555270 : fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); 106 1110540 : }; 107 999000 : MemoryCheck(bool b_) : b(b_) 108 499500 : { 109 499500 : fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); 110 999000 : }; 111 4067666 : ~MemoryCheck() 112 2033049 : { 113 2034617 : fake_allocated_memory.fetch_sub(b, std::memory_order_relaxed); 114 4067666 : }; 115 999000 : void swap(MemoryCheck& x) noexcept 116 : { 117 999000 : std::swap(b, x.b); 118 999000 : }; 119 : }; 120 : 121 : struct FrozenCleanupCheck { 122 : static std::atomic<uint64_t> nFrozen; 123 : static std::condition_variable cv; 124 : static std::mutex m; 125 : // Freezing can't be the default initialized behavior given how the queue 126 : // swaps in default initialized Checks. 127 3 : bool should_freeze {false}; 128 1 : bool operator()() const 129 : { 130 1 : return true; 131 : } 132 9 : FrozenCleanupCheck() = default; 133 6 : ~FrozenCleanupCheck() 134 3 : { 135 3 : if (should_freeze) { 136 1 : std::unique_lock<std::mutex> l(m); 137 1 : nFrozen.store(1, std::memory_order_relaxed); 138 1 : cv.notify_one(); 139 3 : cv.wait(l, []{ return nFrozen.load(std::memory_order_relaxed) == 0;}); 140 1 : } 141 6 : } 142 2 : void swap(FrozenCleanupCheck& x) noexcept 143 : { 144 2 : std::swap(should_freeze, x.should_freeze); 145 2 : }; 146 : }; 147 : 148 : // Static Allocations 149 : std::mutex FrozenCleanupCheck::m{}; 150 : std::atomic<uint64_t> FrozenCleanupCheck::nFrozen{0}; 151 : std::condition_variable FrozenCleanupCheck::cv{}; 152 : Mutex UniqueCheck::m; 153 146 : std::unordered_multiset<size_t> UniqueCheck::results; 154 : std::atomic<size_t> FakeCheckCheckCompletion::n_calls{0}; 155 : std::atomic<size_t> MemoryCheck::fake_allocated_memory{0}; 156 : 157 : // Queue Typedefs 158 : typedef CCheckQueue<FakeCheckCheckCompletion> Correct_Queue; 159 : typedef CCheckQueue<FakeCheck> Standard_Queue; 160 : typedef CCheckQueue<FailingCheck> Failing_Queue; 161 : typedef CCheckQueue<UniqueCheck> Unique_Queue; 162 : typedef CCheckQueue<MemoryCheck> Memory_Queue; 163 : typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue; 164 : 165 : 166 : /** This test case checks that the CCheckQueue works properly 167 : * with each specified size_t Checks pushed. 168 : */ 169 4 : static void Correct_Queue_range(std::vector<size_t> range) 170 : { 171 4 : auto small_queue = std::make_unique<Correct_Queue>(QUEUE_BATCH_SIZE); 172 4 : small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 173 : // Make vChecks here to save on malloc (this test can be slow...) 174 4 : std::vector<FakeCheckCheckCompletion> vChecks; 175 222 : for (const size_t i : range) { 176 218 : size_t total = i; 177 218 : FakeCheckCheckCompletion::n_calls = 0; 178 218 : CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue.get()); 179 2370250 : while (total) { 180 2370032 : vChecks.resize(std::min(total, (size_t) InsecureRandRange(10))); 181 2370032 : total -= vChecks.size(); 182 2370032 : control.Add(vChecks); 183 : } 184 218 : BOOST_REQUIRE(control.Wait()); 185 218 : if (FakeCheckCheckCompletion::n_calls != i) { 186 0 : BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); 187 0 : } 188 218 : } 189 4 : small_queue->StopWorkerThreads(); 190 4 : } 191 : 192 : /** Test that 0 checks is correct 193 : */ 194 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Zero) 195 : { 196 1 : std::vector<size_t> range; 197 1 : range.push_back(size_t{0}); 198 1 : Correct_Queue_range(range); 199 1 : } 200 : /** Test that 1 check is correct 201 : */ 202 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_One) 203 : { 204 1 : std::vector<size_t> range; 205 1 : range.push_back(size_t{1}); 206 1 : Correct_Queue_range(range); 207 1 : } 208 : /** Test that MAX check is correct 209 : */ 210 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Max) 211 : { 212 1 : std::vector<size_t> range; 213 1 : range.push_back(100000); 214 1 : Correct_Queue_range(range); 215 1 : } 216 : /** Test that random numbers of checks are correct 217 : */ 218 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) 219 : { 220 1 : std::vector<size_t> range; 221 1 : range.reserve(100000/1000); 222 216 : for (size_t i = 2; i < 100000; i += std::max((size_t)1, (size_t)InsecureRandRange(std::min((size_t)1000, ((size_t)100000) - i)))) 223 215 : range.push_back(i); 224 1 : Correct_Queue_range(range); 225 1 : } 226 : 227 : 228 : /** Test that failing checks are caught */ 229 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) 230 : { 231 1 : auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE); 232 1 : fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 233 : 234 1002 : for (size_t i = 0; i < 1001; ++i) { 235 1001 : CCheckQueueControl<FailingCheck> control(fail_queue.get()); 236 1001 : size_t remaining = i; 237 112585 : while (remaining) { 238 111584 : size_t r = InsecureRandRange(10); 239 : 240 111584 : std::vector<FailingCheck> vChecks; 241 111584 : vChecks.reserve(r); 242 612084 : for (size_t k = 0; k < r && remaining; k++, remaining--) 243 500500 : vChecks.emplace_back(remaining == 1); 244 111584 : control.Add(vChecks); 245 111584 : } 246 1001 : bool success = control.Wait(); 247 1001 : if (i > 0) { 248 1000 : BOOST_REQUIRE(!success); 249 1001 : } else if (i == 0) { 250 1 : BOOST_REQUIRE(success); 251 1 : } 252 1001 : } 253 1 : fail_queue->StopWorkerThreads(); 254 1 : } 255 : // Test that a block validation which fails does not interfere with 256 : // future blocks, ie, the bad state is cleared. 257 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) 258 : { 259 1 : auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE); 260 1 : fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 261 : 262 11 : for (auto times = 0; times < 10; ++times) { 263 30 : for (const bool end_fails : {true, false}) { 264 20 : CCheckQueueControl<FailingCheck> control(fail_queue.get()); 265 : { 266 20 : std::vector<FailingCheck> vChecks; 267 20 : vChecks.resize(100, false); 268 20 : vChecks[99] = end_fails; 269 20 : control.Add(vChecks); 270 20 : } 271 20 : bool r =control.Wait(); 272 20 : BOOST_REQUIRE(r != end_fails); 273 20 : } 274 10 : } 275 1 : fail_queue->StopWorkerThreads(); 276 1 : } 277 : 278 : // Test that unique checks are actually all called individually, rather than 279 : // just one check being called repeatedly. Test that checks are not called 280 : // more than once as well 281 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) 282 : { 283 1 : auto queue = std::make_unique<Unique_Queue>(QUEUE_BATCH_SIZE); 284 1 : queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 285 : 286 1 : size_t COUNT = 100000; 287 1 : size_t total = COUNT; 288 : { 289 1 : CCheckQueueControl<UniqueCheck> control(queue.get()); 290 22122 : while (total) { 291 22121 : size_t r = InsecureRandRange(10); 292 22121 : std::vector<UniqueCheck> vChecks; 293 122121 : for (size_t k = 0; k < r && total; k++) 294 100000 : vChecks.emplace_back(--total); 295 22121 : control.Add(vChecks); 296 22121 : } 297 1 : } 298 : { 299 1 : LOCK(UniqueCheck::m); 300 1 : bool r = true; 301 1 : BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT); 302 100001 : for (size_t i = 0; i < COUNT; ++i) { 303 100000 : r = r && UniqueCheck::results.count(i) == 1; 304 100000 : } 305 1 : BOOST_REQUIRE(r); 306 1 : } 307 1 : queue->StopWorkerThreads(); 308 1 : } 309 : 310 : 311 : // Test that blocks which might allocate lots of memory free their memory aggressively. 312 : // 313 : // This test attempts to catch a pathological case where by lazily freeing 314 : // checks might mean leaving a check un-swapped out, and decreasing by 1 each 315 : // time could leave the data hanging across a sequence of blocks. 316 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) 317 : { 318 1 : auto queue = std::make_unique<Memory_Queue>(QUEUE_BATCH_SIZE); 319 1 : queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 320 1001 : for (size_t i = 0; i < 1000; ++i) { 321 1000 : size_t total = i; 322 : { 323 1000 : CCheckQueueControl<MemoryCheck> control(queue.get()); 324 112364 : while (total) { 325 111364 : size_t r = InsecureRandRange(10); 326 111364 : std::vector<MemoryCheck> vChecks; 327 610864 : for (size_t k = 0; k < r && total; k++) { 328 499500 : total--; 329 : // Each iteration leaves data at the front, back, and middle 330 : // to catch any sort of deallocation failure 331 499500 : vChecks.emplace_back(total == 0 || total == i || total == i/2); 332 499500 : } 333 111364 : control.Add(vChecks); 334 111364 : } 335 1000 : } 336 1000 : BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); 337 1000 : } 338 1 : queue->StopWorkerThreads(); 339 1 : } 340 : 341 : // Test that a new verification cannot occur until all checks 342 : // have been destructed 343 148 : BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) 344 : { 345 1 : auto queue = std::make_unique<FrozenCleanup_Queue>(QUEUE_BATCH_SIZE); 346 1 : bool fails = false; 347 1 : queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); 348 2 : std::thread t0([&]() { 349 1 : CCheckQueueControl<FrozenCleanupCheck> control(queue.get()); 350 1 : std::vector<FrozenCleanupCheck> vChecks(1); 351 : // Freezing can't be the default initialized behavior given how the queue 352 : // swaps in default initialized Checks (otherwise freezing destructor 353 : // would get called twice). 354 1 : vChecks[0].should_freeze = true; 355 1 : control.Add(vChecks); 356 1 : bool waitResult = control.Wait(); // Hangs here 357 1 : assert(waitResult); 358 1 : }); 359 : { 360 1 : std::unique_lock<std::mutex> l(FrozenCleanupCheck::m); 361 : // Wait until the queue has finished all jobs and frozen 362 3 : FrozenCleanupCheck::cv.wait(l, [](){return FrozenCleanupCheck::nFrozen == 1;}); 363 1 : } 364 : // Try to get control of the queue a bunch of times 365 101 : for (auto x = 0; x < 100 && !fails; ++x) { 366 100 : fails = queue->m_control_mutex.try_lock(); 367 100 : } 368 : { 369 : // Unfreeze (we need lock n case of spurious wakeup) 370 1 : std::unique_lock<std::mutex> l(FrozenCleanupCheck::m); 371 1 : FrozenCleanupCheck::nFrozen = 0; 372 1 : } 373 : // Awaken frozen destructor 374 1 : FrozenCleanupCheck::cv.notify_one(); 375 : // Wait for control to finish 376 1 : t0.join(); 377 1 : BOOST_REQUIRE(!fails); 378 1 : queue->StopWorkerThreads(); 379 1 : } 380 : 381 : 382 : /** Test that CCheckQueueControl is threadsafe */ 383 148 : BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) 384 : { 385 1 : auto queue = std::make_unique<Standard_Queue>(QUEUE_BATCH_SIZE); 386 : { 387 1 : std::vector<std::thread> tg; 388 1 : tg.reserve(3); 389 1 : std::atomic<int> nThreads {0}; 390 1 : std::atomic<int> fails {0}; 391 4 : for (size_t i = 0; i < 3; ++i) { 392 3 : tg.emplace_back( 393 6 : [&]{ 394 3 : CCheckQueueControl<FakeCheck> control(queue.get()); 395 : // While sleeping, no other thread should execute to this point 396 3 : auto observed = ++nThreads; 397 3 : UninterruptibleSleep(std::chrono::milliseconds{10}); 398 3 : fails += observed != nThreads; 399 3 : }); 400 3 : } 401 4 : for (auto& thread: tg) { 402 3 : if (thread.joinable()) thread.join(); 403 : } 404 1 : BOOST_REQUIRE_EQUAL(fails, 0); 405 1 : } 406 : { 407 1 : std::vector<std::thread> tg; 408 1 : std::mutex m; 409 1 : std::condition_variable cv; 410 1 : bool has_lock{false}; 411 1 : bool has_tried{false}; 412 1 : bool done{false}; 413 1 : bool done_ack{false}; 414 : { 415 1 : std::unique_lock<std::mutex> l(m); 416 2 : tg.emplace_back([&]{ 417 1 : CCheckQueueControl<FakeCheck> control(queue.get()); 418 1 : std::unique_lock<std::mutex> ll(m); 419 1 : has_lock = true; 420 1 : cv.notify_one(); 421 3 : cv.wait(ll, [&]{return has_tried;}); 422 1 : done = true; 423 1 : cv.notify_one(); 424 : // Wait until the done is acknowledged 425 : // 426 3 : cv.wait(ll, [&]{return done_ack;}); 427 1 : }); 428 : // Wait for thread to get the lock 429 3 : cv.wait(l, [&](){return has_lock;}); 430 1 : bool fails = false; 431 101 : for (auto x = 0; x < 100 && !fails; ++x) { 432 100 : fails = queue->m_control_mutex.try_lock(); 433 100 : } 434 1 : has_tried = true; 435 1 : cv.notify_one(); 436 3 : cv.wait(l, [&](){return done;}); 437 : // Acknowledge the done 438 1 : done_ack = true; 439 1 : cv.notify_one(); 440 1 : BOOST_REQUIRE(!fails); 441 1 : } 442 2 : for (auto& thread: tg) { 443 1 : if (thread.joinable()) thread.join(); 444 : } 445 1 : } 446 1 : } 447 146 : BOOST_AUTO_TEST_SUITE_END()