LCOV - code coverage report
Current view: top level - src/llmq - net_signing.cpp (source / functions)    [columns: Hit | Total | Coverage]
Test: total_coverage.info    Lines: 209 hit / 236 total (88.6 %)
Date: 2026-06-25 07:23:43    Functions: 31 hit / 31 total (100.0 %)

          Line data    Source code
       1             : // Copyright (c) 2025 The Dash Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <llmq/net_signing.h>
       6             : 
       7             : #include <bls/bls_batchverifier.h>
       8             : #include <llmq/quorums.h>
       9             : #include <llmq/signhash.h>
      10             : #include <llmq/signing_shares.h>
      11             : #include <llmq/signing.h>
      12             : #include <spork.h>
      13             : #include <util/std23.h>
      14             : 
      15             : #include <logging.h>
      16             : #include <streams.h>
      17             : #include <util/thread.h>
      18             : #include <validationinterface.h>
      19             : 
      20             : #include <cxxtimer.hpp>
      21             : 
      22             : #include <algorithm>
      23             : #include <ranges>
      24             : #include <unordered_map>
      25             : 
      26             : namespace llmq {
      27       96785 : void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
      28             : {
      29       96785 :     if (msg_type == NetMsgType::QSIGREC) {
      30       33556 :         auto recoveredSig = std::make_shared<CRecoveredSig>();
      31       33556 :         vRecv >> *recoveredSig;
      32             : 
      33       67112 :         WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), CInv{MSG_QUORUM_RECOVERED_SIG,
      34             :                                                                                       recoveredSig->GetHash()}));
      35             : 
      36       33556 :         if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) {
      37           0 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100);
      38           0 :             return;
      39             :         }
      40             : 
      41       33556 :         m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig));
      42       33556 :     }
      43             : 
      44       96785 :     if (m_shares_manager == nullptr) return;
      45             : 
      46       94137 :     if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) {
      47        5601 :         std::vector<CSigShare> receivedSigShares;
      48        5601 :         vRecv >> receivedSigShares;
      49             : 
      50        5601 :         if (receivedSigShares.size() > CSigSharesManager::MAX_MSGS_SIG_SHARES) {
      51           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n",
      52             :                      __func__, receivedSigShares.size(), CSigSharesManager::MAX_MSGS_SIG_SHARES, pfrom.GetId());
      53           0 :             BanNode(pfrom.GetId());
      54           0 :             return;
      55             :         }
      56             : 
      57       11980 :         for (const auto& sigShare : receivedSigShares) {
      58        6379 :             if (!m_shares_manager->ProcessMessageSigShare(pfrom.GetId(), sigShare)) {
      59           0 :                 BanNode(pfrom.GetId());
      60           0 :             }
      61             :         }
      62        5601 :     }
      63             : 
      64       94137 :     if (msg_type == NetMsgType::QSIGSESANN) {
      65       12988 :         std::vector<CSigSesAnn> msgs;
      66       12988 :         vRecv >> msgs;
      67       12988 :         if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN) {
      68           0 :             LogPrint(BCLog::LLMQ_SIGS, /* Continued */
      69             :                      "NetSigning::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n",
      70             :                      __func__, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
      71           0 :             BanNode(pfrom.GetId());
      72           0 :             return;
      73             :         }
      74       34804 :         if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& ann) {
      75       21816 :                 return m_shares_manager->ProcessMessageSigSesAnn(pfrom, ann);
      76             :             })) {
      77           0 :             BanNode(pfrom.GetId());
      78           0 :             return;
      79             :         }
      80       94137 :     } else if (msg_type == NetMsgType::QSIGSHARESINV || msg_type == NetMsgType::QGETSIGSHARES) {
      81        9262 :         std::vector<CSigSharesInv> msgs;
      82        9262 :         vRecv >> msgs;
      83        9262 :         if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES) {
      84           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many invs in %s message. cnt=%d, max=%d, node=%d\n",
      85             :                      __func__, msg_type, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES, pfrom.GetId());
      86           0 :             BanNode(pfrom.GetId());
      87           0 :             return;
      88             :         }
      89       24595 :         if (!std::ranges::all_of(msgs, [this, &pfrom, &msg_type](const auto& inv) {
      90       15333 :                 return m_shares_manager->ProcessMessageSigShares(pfrom, inv, msg_type);
      91             :             })) {
      92           0 :             BanNode(pfrom.GetId());
      93           0 :             return;
      94             :         }
      95       81149 :     } else if (msg_type == NetMsgType::QBSIGSHARES) {
      96       11447 :         std::vector<CBatchedSigShares> msgs;
      97       11447 :         vRecv >> msgs;
      98       30340 :         const size_t totalSigsCount = std23::ranges::fold_left(msgs, size_t{0}, [](size_t s, const auto& bs) {
      99       18893 :             return s + bs.sigShares.size();
     100             :         });
     101       11447 :         if (totalSigsCount > CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS) {
     102           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n",
     103             :                      __func__, msgs.size(), CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
     104           0 :             BanNode(pfrom.GetId());
     105           0 :             return;
     106             :         }
     107       30340 :         if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& bs) {
     108       18893 :                 return m_shares_manager->ProcessMessageBatchedSigShares(pfrom, bs);
     109             :             })) {
     110           0 :             BanNode(pfrom.GetId());
     111           0 :             return;
     112             :         }
     113       11447 :     }
     114       96785 : }
     115             : 
     116        2831 : void NetSigning::Start()
     117             : {
     118             :     // can't start new thread if we have one running already
     119        2831 :     assert(!signing_thread.joinable());
     120        2831 :     assert(!shares_cleaning_thread.joinable());
     121        2831 :     assert(!shares_dispatcher_thread.joinable());
     122             : 
     123        5662 :     signing_thread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadSigning(); });
     124             : 
     125        2831 :     if (m_shares_manager) {
     126             :         // Initialize worker pool
     127         660 :         int worker_count = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
     128         660 :         worker_pool.resize(worker_count);
     129         660 :         RenameThreadPool(worker_pool, "sigsh-work");
     130             : 
     131        1320 :         shares_cleaning_thread = std::thread(&util::TraceThread, "sigsh-maint", [this] { WorkThreadCleaning(); });
     132        1320 :         shares_dispatcher_thread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkThreadDispatcher(); });
     133         660 :     }
     134        2831 : }
     135             : 
     136        5714 : void NetSigning::Stop()
     137             : {
     138             :     // make sure to call InterruptWorkerThread() first
     139        5714 :     if (!workInterrupt) {
     140           0 :         assert(false);
     141             :     }
     142             : 
     143        5714 :     if (signing_thread.joinable()) {
     144        2831 :         signing_thread.join();
     145        2831 :     }
     146             : 
     147        5714 :     if (m_shares_manager) {
     148             :         // Join threads FIRST to stop any pending push() calls
     149        1320 :         if (shares_cleaning_thread.joinable()) {
     150         660 :             shares_cleaning_thread.join();
     151         660 :         }
     152        1320 :         if (shares_dispatcher_thread.joinable()) {
     153         660 :             shares_dispatcher_thread.join();
     154         660 :         }
     155             : 
     156             :         // Then stop worker pool (now safe, no more push() calls)
     157        1320 :         worker_pool.clear_queue();
     158        1320 :         worker_pool.stop(true);
     159        1320 :     }
     160        5714 : }
     161             : 
     162       45657 : void NetSigning::ProcessRecoveredSig(std::shared_ptr<const CRecoveredSig> recovered_sig, bool consider_proactive_relay)
     163             : {
     164       45657 :     if (recovered_sig == nullptr) return;
     165       27558 :     if (!m_sig_manager.ProcessRecoveredSig(recovered_sig)) return;
     166             : 
     167       27218 :     auto listeners = m_sig_manager.GetListeners();
     168      135077 :     for (auto& l : listeners) {
     169      107859 :         auto result = l->HandleNewRecoveredSig(*recovered_sig);
     170      107859 :         if (const auto* inv = std::get_if<CInv>(&result)) {
     171        4888 :             m_peer_manager->PeerRelayInv(*inv);
     172      107859 :         } else if (const auto* tx_ref = std::get_if<CTransactionRef>(&result)) {
     173         188 :             m_peer_manager->PeerRelayTransaction((*tx_ref)->GetHash());
     174         188 :         }
     175      107859 :     }
     176             : 
     177             :     // TODO refactor to use a better abstraction analogous to IsAllMembersConnectedEnabled
     178       36121 :     auto proactive_relay = consider_proactive_relay && recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 &&
     179        8903 :                            recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 &&
     180        8903 :                            recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85;
     181       27218 :     GetMainSignals().NotifyRecoveredSig(recovered_sig, recovered_sig->GetHash().ToString(), proactive_relay);
     182       45657 : }
     183             : 
     184      378122 : bool NetSigning::ProcessPendingRecoveredSigs()
     185             : {
     186      378122 :     Uint256HashMap<std::shared_ptr<const CRecoveredSig>> pending{m_sig_manager.FetchPendingReconstructed()};
     187             : 
     188      378403 :     for (const auto& p : pending) {
     189         281 :         ProcessRecoveredSig(p.second, true);
     190             :     }
     191             : 
     192      378122 :     std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
     193      378122 :     std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CBLSPublicKey, StaticSaltedHasher> pubkeys;
     194             : 
     195      378122 :     const size_t nMaxBatchSize{32};
     196      378122 :     bool more_work = m_sig_manager.CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, pubkeys);
     197      378122 :     if (recSigsByNode.empty()) {
     198      365874 :         return false;
     199             :     }
     200             : 
     201             :     // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
     202             :     // craftable by individual entities, making the rogue public key attack impossible
     203       12248 :     CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, false);
     204             : 
     205       12248 :     size_t verifyCount = 0;
     206       51886 :     for (const auto& [nodeId, v] : recSigsByNode) {
     207       39638 :         for (const auto& recSig : v) {
     208             :             // we didn't verify the lazy signature until now
     209       22431 :             if (!recSig->sig.Get().IsValid()) {
     210           0 :                 batchVerifier.badSources.emplace(nodeId);
     211           0 :                 break;
     212             :             }
     213             : 
     214       22431 :             const auto& pubkey = pubkeys.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash()));
     215       22431 :             batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), pubkey);
     216       22431 :             verifyCount++;
     217             :         }
     218             :     }
     219             : 
     220       12248 :     cxxtimer::Timer verifyTimer(true);
     221       12248 :     batchVerifier.Verify();
     222       12248 :     verifyTimer.stop();
     223             : 
     224       12248 :     LogPrint(BCLog::LLMQ, "NetSigning::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__,
     225             :              verifyCount, verifyTimer.count(), recSigsByNode.size());
     226             : 
     227       12248 :     Uint256HashSet processed;
     228       65164 :     for (const auto& [nodeId, v] : recSigsByNode) {
     229       34414 :         if (batchVerifier.badSources.count(nodeId)) {
     230           2 :             LogPrint(BCLog::LLMQ, "NetSigning::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
     231           4 :             m_peer_manager->PeerMisbehaving(nodeId, 100);
     232           2 :             continue;
     233             :         }
     234             : 
     235       39620 :         for (const auto& recSig : v) {
     236       22415 :             if (!processed.emplace(recSig->GetHash()).second) {
     237        3915 :                 continue;
     238             :             }
     239             : 
     240       37000 :             ProcessRecoveredSig(recSig, nodeId == -1);
     241             :         }
     242             :     }
     243             : 
     244       12248 :     return more_work;
     245      378122 : }
     246             : 
     247        2831 : void NetSigning::WorkThreadSigning()
     248             : {
     249      378124 :     while (!workInterrupt) {
     250      378122 :         bool fMoreWork = ProcessPendingRecoveredSigs();
     251             : 
     252      378122 :         constexpr auto CLEANUP_INTERVAL{5s};
     253      378122 :         if (cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) {
     254       17341 :             m_sig_manager.Cleanup();
     255       17341 :         }
     256             : 
     257             :         // TODO Wakeup when pending signing is needed?
     258      378122 :         if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
     259        2829 :             return;
     260             :         }
     261             :     }
     262        2831 : }
     263             : 
     264      216234 : void NetSigning::RemoveBannedNodeStates()
     265             : {
     266      216234 :     assert(m_shares_manager != nullptr);
     267             :     // Called regularly to cleanup local node states for banned nodes
     268      493395 :     m_shares_manager->RemoveNodesIf([this](NodeId node_id) { return m_peer_manager->PeerIsBanned(node_id); });
     269      216234 : }
     270             : 
     271           9 : void NetSigning::BanNode(NodeId nodeId)
     272             : {
     273           9 :     if (nodeId == -1) return;
     274             : 
     275           9 :     m_peer_manager->PeerMisbehaving(nodeId, 100);
     276           9 :     if (m_shares_manager) {
     277           9 :         m_shares_manager->MarkAsBanned(nodeId);
     278           9 :     }
     279           9 : }
     280             : 
     281         660 : void NetSigning::WorkThreadCleaning()
     282             : {
     283         660 :     assert(m_shares_manager);
     284             : 
     285      216894 :     while (!workInterrupt) {
     286      216234 :         RemoveBannedNodeStates();
     287             : 
     288      216234 :         m_shares_manager->SendMessages();
     289      216234 :         m_shares_manager->Cleanup();
     290             : 
     291      216234 :         workInterrupt.sleep_for(std::chrono::milliseconds(100));
     292             :     }
     293         660 : }
     294             : 
     295         660 : void NetSigning::WorkThreadDispatcher()
     296             : {
     297         660 :     assert(m_shares_manager);
     298             : 
     299      709571 :     while (!workInterrupt) {
     300             :         // Dispatch all pending signs (individual tasks)
     301             :         {
     302      708911 :             auto signs = m_shares_manager->DispatchPendingSigns();
     303             :             // Dispatch all signs to worker pool
     304      727323 :             for (auto& work : signs) {
     305       18412 :                 if (workInterrupt) break;
     306             : 
     307       36824 :                 worker_pool.push([this, work = std::move(work)](int) mutable {
     308       18412 :                     auto rs = m_shares_manager->SignAndProcessSingleShare(std::move(work));
     309       18412 :                     ProcessRecoveredSig(rs, true);
     310       18412 :                 });
     311             :             }
     312      708911 :         }
     313             : 
     314             :         // Collect pending sig shares synchronously and dispatch each batch to a worker for parallel BLS verification
     315      708919 :         while (!workInterrupt) {
     316      708919 :             std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
     317      708919 :             std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
     318             : 
     319      708919 :             const size_t nMaxBatchSize{32};
     320      708919 :             bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
     321             : 
     322      708919 :             if (sigSharesByNodes.empty()) {
     323      696737 :                 break;
     324             :             }
     325             : 
     326       24364 :             worker_pool.push([this, sigSharesByNodes = std::move(sigSharesByNodes), quorums = std::move(quorums)](int) mutable {
     327       12182 :                 ProcessPendingSigShares(std::move(sigSharesByNodes), std::move(quorums));
     328       12182 :             });
     329             : 
     330       12182 :             if (!more_work) {
     331       12174 :                 break;
     332             :             }
     333      708919 :         }
     334             : 
     335             :         // Always sleep briefly between checks
     336      708911 :         workInterrupt.sleep_for(std::chrono::milliseconds(10));
     337             :     }
     338         660 : }
     339             : 
     340       27204 : void NetSigning::NotifyRecoveredSig(const std::shared_ptr<const CRecoveredSig>& sig, bool proactive_relay)
     341             : {
     342       27204 :     m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay);
     343       27204 : }
     344             : 
     345       12182 : void NetSigning::ProcessPendingSigShares(
     346             :     std::unordered_map<NodeId, std::vector<CSigShare>>&& sigSharesByNodes,
     347             :     std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>&& quorums)
     348             : {
     349             :     // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
     350             :     // which are not craftable by individual entities, making the rogue public key attack impossible
     351       12182 :     CBLSBatchVerifier<NodeId, SigShareKey> batchVerifier(false, true);
     352             : 
     353       12182 :     cxxtimer::Timer prepareTimer(true);
     354       12182 :     size_t verifyCount = 0;
     355       50372 :     for (const auto& [nodeId, v] : sigSharesByNodes) {
     356       38224 :         for (const auto& sigShare : v) {
     357       22839 :             if (m_sig_manager.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) {
     358          39 :                 continue;
     359             :             }
     360             : 
     361             :             // Materialize the signature once. Get() internally validates, so if it returns an invalid signature,
     362             :             // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage).
     363       22800 :             CBLSSignature sig = sigShare.sigShare.Get();
     364             :             // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
     365             :             // deserialization in the message thread
     366       22800 :             if (!sig.IsValid()) {
     367           5 :                 BanNode(nodeId);
     368             :                 // don't process any additional shares from this node
     369           5 :                 break;
     370             :             }
     371             : 
     372       22795 :             auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()));
     373       22795 :             auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember());
     374             : 
     375       22795 :             if (!pubKeyShare.IsValid()) {
     376             :                 // this should really not happen (we already ensured we have the quorum vvec,
     377             :                 // so we should also be able to create all pubkey shares)
     378           0 :                 LogPrintf("NetSigning::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__);
     379           0 :                 assert(false);
     380             :             }
     381             : 
     382       22795 :             batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare);
     383       22795 :             verifyCount++;
     384       22795 :         }
     385             :     }
     386       12182 :     prepareTimer.stop();
     387             : 
     388       12182 :     cxxtimer::Timer verifyTimer(true);
     389       12182 :     batchVerifier.Verify();
     390       12182 :     verifyTimer.stop();
     391             : 
     392       12182 :     LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__,
     393             :              verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
     394             : 
     395       42966 :     for (const auto& [nodeId, v] : sigSharesByNodes) {
     396       30780 :         if (batchVerifier.badSources.count(nodeId) != 0) {
     397           4 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- invalid sig shares from other node, banning peer=%d\n",
     398             :                      __func__, nodeId);
     399             :             // this will also cause re-requesting of the shares that were sent by this node
     400           4 :             BanNode(nodeId);
     401           4 :             continue;
     402             :         }
     403             : 
     404       30772 :         auto rec_sigs = m_shares_manager->ProcessPendingSigShares(v, quorums);
     405       23850 :         for (auto& rs : rec_sigs) {
     406        8464 :             ProcessRecoveredSig(rs, true);
     407             :         }
     408       15386 :     }
     409       12182 : }
     410             : 
     411             : } // namespace llmq

Generated by: LCOV version 1.16