LCOV - code coverage report
Current view: top level - src/llmq - net_signing.cpp (source / functions) Hit Total Coverage
Test: test_dash_coverage.info Lines: 0 236 0.0 %
Date: 2026-06-25 07:23:51 Functions: 0 31 0.0 %

          Line data    Source code
       1             : // Copyright (c) 2025 The Dash Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <llmq/net_signing.h>
       6             : 
       7             : #include <bls/bls_batchverifier.h>
       8             : #include <llmq/quorums.h>
       9             : #include <llmq/signhash.h>
      10             : #include <llmq/signing_shares.h>
      11             : #include <llmq/signing.h>
      12             : #include <spork.h>
      13             : #include <util/std23.h>
      14             : 
      15             : #include <logging.h>
      16             : #include <streams.h>
      17             : #include <util/thread.h>
      18             : #include <validationinterface.h>
      19             : 
      20             : #include <cxxtimer.hpp>
      21             : 
      22             : #include <algorithm>
      23             : #include <ranges>
      24             : #include <unordered_map>
      25             : 
      26             : namespace llmq {
      27           0 : void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
      28             : {
      29           0 :     if (msg_type == NetMsgType::QSIGREC) {
      30           0 :         auto recoveredSig = std::make_shared<CRecoveredSig>();
      31           0 :         vRecv >> *recoveredSig;
      32             : 
      33           0 :         WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), CInv{MSG_QUORUM_RECOVERED_SIG,
      34             :                                                                                       recoveredSig->GetHash()}));
      35             : 
      36           0 :         if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) {
      37           0 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100);
      38           0 :             return;
      39             :         }
      40             : 
      41           0 :         m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig));
      42           0 :     }
      43             : 
      44           0 :     if (m_shares_manager == nullptr) return;
      45             : 
      46           0 :     if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) {
      47           0 :         std::vector<CSigShare> receivedSigShares;
      48           0 :         vRecv >> receivedSigShares;
      49             : 
      50           0 :         if (receivedSigShares.size() > CSigSharesManager::MAX_MSGS_SIG_SHARES) {
      51           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n",
      52             :                      __func__, receivedSigShares.size(), CSigSharesManager::MAX_MSGS_SIG_SHARES, pfrom.GetId());
      53           0 :             BanNode(pfrom.GetId());
      54           0 :             return;
      55             :         }
      56             : 
      57           0 :         for (const auto& sigShare : receivedSigShares) {
      58           0 :             if (!m_shares_manager->ProcessMessageSigShare(pfrom.GetId(), sigShare)) {
      59           0 :                 BanNode(pfrom.GetId());
      60           0 :             }
      61             :         }
      62           0 :     }
      63             : 
      64           0 :     if (msg_type == NetMsgType::QSIGSESANN) {
      65           0 :         std::vector<CSigSesAnn> msgs;
      66           0 :         vRecv >> msgs;
      67           0 :         if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN) {
      68           0 :             LogPrint(BCLog::LLMQ_SIGS, /* Continued */
      69             :                      "NetSigning::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n",
      70             :                      __func__, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
      71           0 :             BanNode(pfrom.GetId());
      72           0 :             return;
      73             :         }
      74           0 :         if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& ann) {
      75           0 :                 return m_shares_manager->ProcessMessageSigSesAnn(pfrom, ann);
      76             :             })) {
      77           0 :             BanNode(pfrom.GetId());
      78           0 :             return;
      79             :         }
      80           0 :     } else if (msg_type == NetMsgType::QSIGSHARESINV || msg_type == NetMsgType::QGETSIGSHARES) {
      81           0 :         std::vector<CSigSharesInv> msgs;
      82           0 :         vRecv >> msgs;
      83           0 :         if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES) {
      84           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many invs in %s message. cnt=%d, max=%d, node=%d\n",
      85             :                      __func__, msg_type, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES, pfrom.GetId());
      86           0 :             BanNode(pfrom.GetId());
      87           0 :             return;
      88             :         }
      89           0 :         if (!std::ranges::all_of(msgs, [this, &pfrom, &msg_type](const auto& inv) {
      90           0 :                 return m_shares_manager->ProcessMessageSigShares(pfrom, inv, msg_type);
      91             :             })) {
      92           0 :             BanNode(pfrom.GetId());
      93           0 :             return;
      94             :         }
      95           0 :     } else if (msg_type == NetMsgType::QBSIGSHARES) {
      96           0 :         std::vector<CBatchedSigShares> msgs;
      97           0 :         vRecv >> msgs;
      98           0 :         const size_t totalSigsCount = std23::ranges::fold_left(msgs, size_t{0}, [](size_t s, const auto& bs) {
      99           0 :             return s + bs.sigShares.size();
     100             :         });
     101           0 :         if (totalSigsCount > CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS) {
     102           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n",
     103             :                      __func__, msgs.size(), CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
     104           0 :             BanNode(pfrom.GetId());
     105           0 :             return;
     106             :         }
     107           0 :         if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& bs) {
     108           0 :                 return m_shares_manager->ProcessMessageBatchedSigShares(pfrom, bs);
     109             :             })) {
     110           0 :             BanNode(pfrom.GetId());
     111           0 :             return;
     112             :         }
     113           0 :     }
     114           0 : }
     115             : 
     116           0 : void NetSigning::Start()
     117             : {
     118             :     // can't start new thread if we have one running already
     119           0 :     assert(!signing_thread.joinable());
     120           0 :     assert(!shares_cleaning_thread.joinable());
     121           0 :     assert(!shares_dispatcher_thread.joinable());
     122             : 
     123           0 :     signing_thread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadSigning(); });
     124             : 
     125           0 :     if (m_shares_manager) {
     126             :         // Initialize worker pool
     127           0 :         int worker_count = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
     128           0 :         worker_pool.resize(worker_count);
     129           0 :         RenameThreadPool(worker_pool, "sigsh-work");
     130             : 
     131           0 :         shares_cleaning_thread = std::thread(&util::TraceThread, "sigsh-maint", [this] { WorkThreadCleaning(); });
     132           0 :         shares_dispatcher_thread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkThreadDispatcher(); });
     133           0 :     }
     134           0 : }
     135             : 
     136           0 : void NetSigning::Stop()
     137             : {
     138             :     // make sure to call InterruptWorkerThread() first
     139           0 :     if (!workInterrupt) {
     140           0 :         assert(false);
     141             :     }
     142             : 
     143           0 :     if (signing_thread.joinable()) {
     144           0 :         signing_thread.join();
     145           0 :     }
     146             : 
     147           0 :     if (m_shares_manager) {
     148             :         // Join threads FIRST to stop any pending push() calls
     149           0 :         if (shares_cleaning_thread.joinable()) {
     150           0 :             shares_cleaning_thread.join();
     151           0 :         }
     152           0 :         if (shares_dispatcher_thread.joinable()) {
     153           0 :             shares_dispatcher_thread.join();
     154           0 :         }
     155             : 
     156             :         // Then stop worker pool (now safe, no more push() calls)
     157           0 :         worker_pool.clear_queue();
     158           0 :         worker_pool.stop(true);
     159           0 :     }
     160           0 : }
     161             : 
     162           0 : void NetSigning::ProcessRecoveredSig(std::shared_ptr<const CRecoveredSig> recovered_sig, bool consider_proactive_relay)
     163             : {
     164           0 :     if (recovered_sig == nullptr) return;
     165           0 :     if (!m_sig_manager.ProcessRecoveredSig(recovered_sig)) return;
     166             : 
     167           0 :     auto listeners = m_sig_manager.GetListeners();
     168           0 :     for (auto& l : listeners) {
     169           0 :         auto result = l->HandleNewRecoveredSig(*recovered_sig);
     170           0 :         if (const auto* inv = std::get_if<CInv>(&result)) {
     171           0 :             m_peer_manager->PeerRelayInv(*inv);
     172           0 :         } else if (const auto* tx_ref = std::get_if<CTransactionRef>(&result)) {
     173           0 :             m_peer_manager->PeerRelayTransaction((*tx_ref)->GetHash());
     174           0 :         }
     175           0 :     }
     176             : 
     177             :     // TODO refactor to use a better abstraction analogous to IsAllMembersConnectedEnabled
     178           0 :     auto proactive_relay = consider_proactive_relay && recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 &&
     179           0 :                            recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 &&
     180           0 :                            recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85;
     181           0 :     GetMainSignals().NotifyRecoveredSig(recovered_sig, recovered_sig->GetHash().ToString(), proactive_relay);
     182           0 : }
     183             : 
     184           0 : bool NetSigning::ProcessPendingRecoveredSigs()
     185             : {
     186           0 :     Uint256HashMap<std::shared_ptr<const CRecoveredSig>> pending{m_sig_manager.FetchPendingReconstructed()};
     187             : 
     188           0 :     for (const auto& p : pending) {
     189           0 :         ProcessRecoveredSig(p.second, true);
     190             :     }
     191             : 
     192           0 :     std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
     193           0 :     std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CBLSPublicKey, StaticSaltedHasher> pubkeys;
     194             : 
     195           0 :     const size_t nMaxBatchSize{32};
     196           0 :     bool more_work = m_sig_manager.CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, pubkeys);
     197           0 :     if (recSigsByNode.empty()) {
     198           0 :         return false;
     199             :     }
     200             : 
     201             :     // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
     202             :     // craftable by individual entities, making the rogue public key attack impossible
     203           0 :     CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, false);
     204             : 
     205           0 :     size_t verifyCount = 0;
     206           0 :     for (const auto& [nodeId, v] : recSigsByNode) {
     207           0 :         for (const auto& recSig : v) {
     208             :             // we didn't verify the lazy signature until now
     209           0 :             if (!recSig->sig.Get().IsValid()) {
     210           0 :                 batchVerifier.badSources.emplace(nodeId);
     211           0 :                 break;
     212             :             }
     213             : 
     214           0 :             const auto& pubkey = pubkeys.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash()));
     215           0 :             batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), pubkey);
     216           0 :             verifyCount++;
     217             :         }
     218             :     }
     219             : 
     220           0 :     cxxtimer::Timer verifyTimer(true);
     221           0 :     batchVerifier.Verify();
     222           0 :     verifyTimer.stop();
     223             : 
     224           0 :     LogPrint(BCLog::LLMQ, "NetSigning::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__,
     225             :              verifyCount, verifyTimer.count(), recSigsByNode.size());
     226             : 
     227           0 :     Uint256HashSet processed;
     228           0 :     for (const auto& [nodeId, v] : recSigsByNode) {
     229           0 :         if (batchVerifier.badSources.count(nodeId)) {
     230           0 :             LogPrint(BCLog::LLMQ, "NetSigning::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
     231           0 :             m_peer_manager->PeerMisbehaving(nodeId, 100);
     232           0 :             continue;
     233             :         }
     234             : 
     235           0 :         for (const auto& recSig : v) {
     236           0 :             if (!processed.emplace(recSig->GetHash()).second) {
     237           0 :                 continue;
     238             :             }
     239             : 
     240           0 :             ProcessRecoveredSig(recSig, nodeId == -1);
     241             :         }
     242             :     }
     243             : 
     244           0 :     return more_work;
     245           0 : }
     246             : 
     247           0 : void NetSigning::WorkThreadSigning()
     248             : {
     249           0 :     while (!workInterrupt) {
     250           0 :         bool fMoreWork = ProcessPendingRecoveredSigs();
     251             : 
     252           0 :         constexpr auto CLEANUP_INTERVAL{5s};
     253           0 :         if (cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) {
     254           0 :             m_sig_manager.Cleanup();
     255           0 :         }
     256             : 
     257             :         // TODO Wakeup when pending signing is needed?
     258           0 :         if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
     259           0 :             return;
     260             :         }
     261             :     }
     262           0 : }
     263             : 
     264           0 : void NetSigning::RemoveBannedNodeStates()
     265             : {
     266           0 :     assert(m_shares_manager != nullptr);
     267             :     // Called regularly to cleanup local node states for banned nodes
     268           0 :     m_shares_manager->RemoveNodesIf([this](NodeId node_id) { return m_peer_manager->PeerIsBanned(node_id); });
     269           0 : }
     270             : 
     271           0 : void NetSigning::BanNode(NodeId nodeId)
     272             : {
     273           0 :     if (nodeId == -1) return;
     274             : 
     275           0 :     m_peer_manager->PeerMisbehaving(nodeId, 100);
     276           0 :     if (m_shares_manager) {
     277           0 :         m_shares_manager->MarkAsBanned(nodeId);
     278           0 :     }
     279           0 : }
     280             : 
     281           0 : void NetSigning::WorkThreadCleaning()
     282             : {
     283           0 :     assert(m_shares_manager);
     284             : 
     285           0 :     while (!workInterrupt) {
     286           0 :         RemoveBannedNodeStates();
     287             : 
     288           0 :         m_shares_manager->SendMessages();
     289           0 :         m_shares_manager->Cleanup();
     290             : 
     291           0 :         workInterrupt.sleep_for(std::chrono::milliseconds(100));
     292             :     }
     293           0 : }
     294             : 
     295           0 : void NetSigning::WorkThreadDispatcher()
     296             : {
     297           0 :     assert(m_shares_manager);
     298             : 
     299           0 :     while (!workInterrupt) {
     300             :         // Dispatch all pending signs (individual tasks)
     301             :         {
     302           0 :             auto signs = m_shares_manager->DispatchPendingSigns();
     303             :             // Dispatch all signs to worker pool
     304           0 :             for (auto& work : signs) {
     305           0 :                 if (workInterrupt) break;
     306             : 
     307           0 :                 worker_pool.push([this, work = std::move(work)](int) mutable {
     308           0 :                     auto rs = m_shares_manager->SignAndProcessSingleShare(std::move(work));
     309           0 :                     ProcessRecoveredSig(rs, true);
     310           0 :                 });
     311             :             }
     312           0 :         }
     313             : 
     314             :         // Collect pending sig shares synchronously and dispatch each batch to a worker for parallel BLS verification
     315           0 :         while (!workInterrupt) {
     316           0 :             std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
     317           0 :             std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
     318             : 
     319           0 :             const size_t nMaxBatchSize{32};
     320           0 :             bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
     321             : 
     322           0 :             if (sigSharesByNodes.empty()) {
     323           0 :                 break;
     324             :             }
     325             : 
     326           0 :             worker_pool.push([this, sigSharesByNodes = std::move(sigSharesByNodes), quorums = std::move(quorums)](int) mutable {
     327           0 :                 ProcessPendingSigShares(std::move(sigSharesByNodes), std::move(quorums));
     328           0 :             });
     329             : 
     330           0 :             if (!more_work) {
     331           0 :                 break;
     332             :             }
     333           0 :         }
     334             : 
     335             :         // Always sleep briefly between checks
     336           0 :         workInterrupt.sleep_for(std::chrono::milliseconds(10));
     337             :     }
     338           0 : }
     339             : 
     340           0 : void NetSigning::NotifyRecoveredSig(const std::shared_ptr<const CRecoveredSig>& sig, bool proactive_relay)
     341             : {
     342           0 :     m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay);
     343           0 : }
     344             : 
     345           0 : void NetSigning::ProcessPendingSigShares(
     346             :     std::unordered_map<NodeId, std::vector<CSigShare>>&& sigSharesByNodes,
     347             :     std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>&& quorums)
     348             : {
     349             :     // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
     350             :     // which are not craftable by individual entities, making the rogue public key attack impossible
     351           0 :     CBLSBatchVerifier<NodeId, SigShareKey> batchVerifier(false, true);
     352             : 
     353           0 :     cxxtimer::Timer prepareTimer(true);
     354           0 :     size_t verifyCount = 0;
     355           0 :     for (const auto& [nodeId, v] : sigSharesByNodes) {
     356           0 :         for (const auto& sigShare : v) {
     357           0 :             if (m_sig_manager.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) {
     358           0 :                 continue;
     359             :             }
     360             : 
     361             :             // Materialize the signature once. Get() internally validates, so if it returns an invalid signature,
     362             :             // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage).
     363           0 :             CBLSSignature sig = sigShare.sigShare.Get();
     364             :             // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
     365             :             // deserialization in the message thread
     366           0 :             if (!sig.IsValid()) {
     367           0 :                 BanNode(nodeId);
     368             :                 // don't process any additional shares from this node
     369           0 :                 break;
     370             :             }
     371             : 
     372           0 :             auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()));
     373           0 :             auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember());
     374             : 
     375           0 :             if (!pubKeyShare.IsValid()) {
     376             :                 // this should really not happen (we already ensured we have the quorum vvec,
     377             :                 // so we should also be able to create all pubkey shares)
     378           0 :                 LogPrintf("NetSigning::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__);
     379           0 :                 assert(false);
     380             :             }
     381             : 
     382           0 :             batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare);
     383           0 :             verifyCount++;
     384           0 :         }
     385             :     }
     386           0 :     prepareTimer.stop();
     387             : 
     388           0 :     cxxtimer::Timer verifyTimer(true);
     389           0 :     batchVerifier.Verify();
     390           0 :     verifyTimer.stop();
     391             : 
     392           0 :     LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__,
     393             :              verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
     394             : 
     395           0 :     for (const auto& [nodeId, v] : sigSharesByNodes) {
     396           0 :         if (batchVerifier.badSources.count(nodeId) != 0) {
     397           0 :             LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- invalid sig shares from other node, banning peer=%d\n",
     398             :                      __func__, nodeId);
     399             :             // this will also cause re-requesting of the shares that were sent by this node
     400           0 :             BanNode(nodeId);
     401           0 :             continue;
     402             :         }
     403             : 
     404           0 :         auto rec_sigs = m_shares_manager->ProcessPendingSigShares(v, quorums);
     405           0 :         for (auto& rs : rec_sigs) {
     406           0 :             ProcessRecoveredSig(rs, true);
     407             :         }
     408           0 :     }
     409           0 : }
     410             : 
     411             : } // namespace llmq

Generated by: LCOV version 1.16