LCOV - code coverage report
Current view: top level - src/llmq - net_quorum.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 448 497 90.1 %
Date: 2026-06-25 07:23:43 Functions: 43 43 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2025-2026 The Dash Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <llmq/net_quorum.h>
       6             : 
       7             : #include <active/masternode.h>
       8             : #include <chainparams.h>
       9             : #include <evo/deterministicmns.h>
      10             : #include <llmq/commitment.h>
      11             : #include <llmq/options.h>
      12             : #include <llmq/quorumsman.h>
      13             : #include <llmq/utils.h>
      14             : #include <logging.h>
      15             : #include <masternode/sync.h>
      16             : #include <net.h>
      17             : #include <netmessagemaker.h>
      18             : #include <util/helpers.h>
      19             : #include <util/std23.h>
      20             : #include <util/thread.h>
      21             : #include <util/time.h>
      22             : #include <validation.h>
      23             : 
      24             : #include <cxxtimer.hpp>
      25             : 
      26             : #include <algorithm>
      27             : #include <ranges>
      28             : 
      29             : namespace llmq {
      30             : 
      31        8571 : NetQuorum::NetQuorum(PeerManagerInternal* peer_manager, CBLSWorker& bls_worker,
      32             :                      CConnman& connman, CDeterministicMNManager& dmnman, CQuorumManager& qman,
      33             :                      CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman,
      34             :                      const CMasternodeSync& mn_sync, const CSporkManager& sporkman,
      35             :                      QuorumRole* quorum_role, CActiveMasternodeManager* nodeman,
      36             :                      int16_t worker_count, const QvvecSyncModeMap& sync_map, bool quorums_recovery) :
      37        2857 :     NetHandler(peer_manager),
      38        2857 :     m_bls_worker{bls_worker},
      39        2857 :     m_connman{connman},
      40        2857 :     m_dmnman{dmnman},
      41        2857 :     m_qman{qman},
      42        2857 :     m_qsnapman{qsnapman},
      43        2857 :     m_chainman{chainman},
      44        2857 :     m_mn_sync{mn_sync},
      45        2857 :     m_sporkman{sporkman},
      46        2857 :     m_role{quorum_role},
      47        2857 :     m_nodeman{nodeman},
      48        2857 :     m_worker_count{worker_count},
      49        2857 :     m_sync_map{sync_map},
      50        2857 :     m_quorums_recovery{quorums_recovery}
      51        8571 : {
      52        2857 :     quorumThreadInterrupt.reset();
      53        5714 : }
      54             : 
      55             : // NetHandler
      56             : 
      57        2831 : void NetQuorum::Start()
      58             : {
      59        2831 :     if (!m_role) return;
      60         666 :     assert(m_worker_count > 0);
      61         666 :     workerPool.resize(m_worker_count);
      62         666 :     RenameThreadPool(workerPool, "q-mngr");
      63        2831 : }
      64             : 
      65        5714 : void NetQuorum::Stop()
      66             : {
      67        5714 :     workerPool.clear_queue();
      68        5714 :     workerPool.stop(true);
      69        5714 : }
      70             : 
      71       96785 : void NetQuorum::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
      72             : {
      73       96785 :     if (msg_type == NetMsgType::QGETDATA) {
      74         335 :         if (!m_role || !m_role->IsMasternode() || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) {
      75          12 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode or a qwatch connection");
      76          12 :             return;
      77             :         }
      78             : 
      79         323 :         CQuorumDataRequest request;
      80         323 :         vRecv >> request;
      81             : 
      82         646 :         auto sendQDATA = [&](CQuorumDataRequest::Errors nError,
      83             :                              bool request_limit_exceeded,
      84             :                              const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) -> bool {
      85         323 :             bool misbehave = false;
      86         323 :             switch (nError) {
      87             :                 case (CQuorumDataRequest::Errors::NONE):
      88             :                 case (CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID):
      89             :                 case (CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND):
      90             :                 case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND):
      91             :                 case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER):
      92             :                 case (CQuorumDataRequest::Errors::UNDEFINED):
      93         292 :                     misbehave = request_limit_exceeded;
      94         292 :                     break;
      95             :                 case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING):
      96             :                 case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING):
      97             :                     // Do not punish limit exceed if we don't have the requested data
      98          31 :                     break;
      99             :             }
     100         323 :             request.SetError(nError);
     101         323 :             CDataStream ssResponse{SER_NETWORK, pfrom.GetCommonVersion()};
     102         323 :             ssResponse << request << body;
     103         323 :             m_connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse));
     104         323 :             return misbehave;
     105         323 :         };
     106             : 
     107         323 :         const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), false, request.GetQuorumHash(), request.GetLLMQType());
     108         323 :         const bool request_limit_exceeded = !m_qman.RegisterDataRequest(key, request, /*add_expiry_bias=*/false);
     109             : 
     110         323 :         if (!Params().GetLLMQ(request.GetLLMQType()).has_value()) {
     111           6 :             if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID, request_limit_exceeded)) {
     112           0 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
     113           0 :             }
     114           6 :             return;
     115             :         }
     116             : 
     117         634 :         const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, return m_chainman.m_blockman.LookupBlockIndex(request.GetQuorumHash()));
     118         317 :         if (pQuorumBaseBlockIndex == nullptr) {
     119           6 :             if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND, request_limit_exceeded)) {
     120           0 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
     121           0 :             }
     122           6 :             return;
     123             :         }
     124             : 
     125         311 :         const auto pQuorum = m_qman.GetQuorum(request.GetLLMQType(), request.GetQuorumHash());
     126         311 :         if (pQuorum == nullptr) {
     127           6 :             if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded)) {
     128           0 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
     129           0 :             }
     130           6 :             return;
     131             :         }
     132             : 
     133         305 :         CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion());
     134             : 
     135             :         // Check if request wants QUORUM_VERIFICATION_VECTOR data
     136         305 :         if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
     137         287 :             if (!pQuorum->HasVerificationVector()) {
     138          17 :                 if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded)) {
     139           0 :                     m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
     140           0 :                 }
     141          17 :                 return;
     142             :             }
     143         270 :             ssResponseData << *pQuorum->GetVerificationVector();
     144         270 :         }
     145             : 
     146             :         // Check if request wants ENCRYPTED_CONTRIBUTIONS data
     147         288 :         bool misbehave_contrib = ProcessContribQGETDATA(ssResponseData, *pQuorum, request, pQuorumBaseBlockIndex);
     148             : 
     149         288 :         CQuorumDataRequest::Errors ret_err{CQuorumDataRequest::Errors::NONE};
     150         288 :         if (auto request_err = request.GetError(); request_err != CQuorumDataRequest::Errors::NONE &&
     151         288 :                                                    request_err != CQuorumDataRequest::Errors::UNDEFINED) {
     152          20 :             ret_err = request_err;
     153          20 :         }
     154             : 
     155         576 :         bool misbehave_qdata = (ret_err != CQuorumDataRequest::Errors::NONE)
     156          20 :             ? sendQDATA(ret_err, request_limit_exceeded)
     157         268 :             : sendQDATA(CQuorumDataRequest::Errors::NONE, request_limit_exceeded, ssResponseData);
     158             : 
     159         288 :         if (request_limit_exceeded && (misbehave_contrib || misbehave_qdata)) {
     160          90 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
     161          90 :         }
     162         288 :         return;
     163         311 :     }
     164             : 
     165       96450 :     if (msg_type == NetMsgType::QDATA) {
     166         185 :         if (!m_role || pfrom.GetVerifiedProRegTxHash().IsNull()) {
     167           6 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode and -watchquorums is not enabled");
     168           6 :             return;
     169             :         }
     170             : 
     171         179 :         CQuorumDataRequest request;
     172         179 :         vRecv >> request;
     173             : 
     174             :         {
     175         179 :             const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType());
     176         179 :             const auto validation = m_qman.ValidateDataResponse(key, request);
     177         179 :             switch (validation) {
     178             :             case CQuorumManager::DataResponseValidation::NotRequested:
     179           6 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not requested");
     180           6 :                 return;
     181             :             case CQuorumManager::DataResponseValidation::AlreadyReceived:
     182           6 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "already received");
     183           6 :                 return;
     184             :             case CQuorumManager::DataResponseValidation::Mismatch:
     185          18 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not like requested");
     186          18 :                 return;
     187             :             case CQuorumManager::DataResponseValidation::OK:
     188         149 :                 break;
     189             :             }
     190             :         }
     191             : 
     192         149 :         if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
     193          19 :             LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Error %d (%s), from peer=%d\n", __func__, msg_type, request.GetError(), request.GetErrorString(), pfrom.GetId());
     194          19 :             return;
     195             :         }
     196             : 
     197         130 :         CQuorumPtr pQuorum = m_qman.GetCachedMutableQuorum(request.GetLLMQType(), request.GetQuorumHash());
     198         130 :         if (!pQuorum) {
     199             :             // Don't bump score because we asked for it
     200           0 :             LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId());
     201           0 :             return;
     202             :         }
     203             : 
     204             :         // Check if request has QUORUM_VERIFICATION_VECTOR data
     205         130 :         if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
     206         130 :             std::vector<CBLSPublicKey> verificationVector;
     207         130 :             vRecv >> verificationVector;
     208             : 
     209         130 :             if (pQuorum->SetVerificationVector(verificationVector)) {
     210         130 :                 m_qman.QueueQuorumForWarming(pQuorum);
     211         130 :             } else {
     212           0 :                 m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid quorum verification vector");
     213           0 :                 return;
     214             :             }
     215         130 :         }
     216             : 
     217             :         // Check if request has ENCRYPTED_CONTRIBUTIONS data
     218         130 :         if (!ProcessContribQDATA(pfrom, vRecv, *pQuorum, request)) {
     219           0 :             return;
     220             :         }
     221             : 
     222         130 :         m_qman.WriteContributions(pQuorum);
     223         130 :     }
     224       96785 : }
     225             : 
     226         288 : bool NetQuorum::ProcessContribQGETDATA(CDataStream& ssResponseData, const CQuorum& quorum,
     227             :                                        CQuorumDataRequest& request,
     228             :                                        gsl::not_null<const CBlockIndex*> block_index) const
     229             : {
     230         288 :     if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
     231         201 :         return false;
     232             :     }
     233             : 
     234          87 :     if (!m_nodeman) {
     235           0 :         request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
     236           0 :         return true;
     237             :     }
     238             : 
     239          87 :     int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
     240          87 :     if (memberIdx == -1) {
     241           6 :         request.SetError(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER);
     242           6 :         return true;
     243             :     }
     244             : 
     245          81 :     std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
     246         162 :     if (!m_qman.GetEncryptedContributions(request.GetLLMQType(), block_index,
     247          81 :                                          quorum.qc->validMembers, request.GetProTxHash(), vecEncrypted)) {
     248          14 :         request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
     249          14 :         return true;
     250             :     }
     251             : 
     252          67 :     ssResponseData << vecEncrypted;
     253          67 :     return false;
     254         288 : }
     255             : 
     256         130 : bool NetQuorum::ProcessContribQDATA(CNode& pfrom, CDataStream& vRecv,
     257             :                                     CQuorum& quorum, CQuorumDataRequest& request)
     258             : {
     259         130 :     if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
     260          93 :         return true;
     261             :     }
     262             : 
     263          37 :     if (!m_nodeman) {
     264           0 :         return true;
     265             :     }
     266             : 
     267          37 :     auto vvec = quorum.GetVerificationVector();
     268          37 :     if (!vvec || vvec->size() != size_t(quorum.params.threshold)) {
     269           0 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: No valid quorum verification vector available, from peer=%d\n",
     270             :                  __func__, NetMsgType::QDATA, pfrom.GetId());
     271           0 :         return false;
     272             :     }
     273             : 
     274          37 :     int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
     275          37 :     if (memberIdx == -1) {
     276           0 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Not a member of the quorum, from peer=%d\n",
     277             :                  __func__, NetMsgType::QDATA, pfrom.GetId());
     278           0 :         return false;
     279             :     }
     280             : 
     281          37 :     std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
     282          37 :     vRecv >> vecEncrypted;
     283             : 
     284          37 :     std::vector<CBLSSecretKey> vecSecretKeys;
     285          37 :     vecSecretKeys.resize(vecEncrypted.size());
     286         164 :     for (const auto i : util::irange(vecEncrypted.size())) {
     287         127 :         if (!m_nodeman->Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) {
     288           0 :             m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "failed to decrypt");
     289           0 :             return false;
     290             :         }
     291             :     }
     292             : 
     293          74 :     if (!quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(vecSecretKeys),
     294          37 :                                   m_nodeman->GetProTxHash())) {
     295           0 :         m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid secret key share received");
     296           0 :         return false;
     297             :     }
     298             : 
     299          37 :     return true;
     300         130 : }
     301             : 
     302         131 : DataRequestStatus NetQuorum::RequestQuorumData(CNode& peer, const CQuorum& quorum, uint16_t nDataMask,
     303             :                                                const uint256& proTxHash) const
     304             : {
     305         262 :     const CQuorumDataRequestKey key(peer.GetVerifiedProRegTxHash(), true,
     306         131 :                                     quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
     307         262 :     const CQuorumDataRequest request(quorum.qc->llmqType, quorum.m_quorum_base_block_index->GetBlockHash(),
     308         131 :                                      nDataMask, proTxHash);
     309         131 :     if (!m_qman.RegisterDataRequest(key, request)) {
     310           0 :         return m_qman.GetDataRequestStatus(peer.GetVerifiedProRegTxHash(), /*we_requested=*/true,
     311           0 :                                            quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
     312             :     }
     313         131 :     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__,
     314             :              key.quorumHash.ToString(), std23::to_underlying(key.llmqType), key.proRegTx.ToString());
     315             : 
     316         131 :     CNetMsgMaker msgMaker(peer.GetCommonVersion());
     317         131 :     m_connman.PushMessage(&peer, msgMaker.Make(NetMsgType::QGETDATA, request));
     318         131 :     return DataRequestStatus::Requested;
     319         131 : }
     320             : 
     321             : 
     322        2825 : void NetQuorum::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd)
     323             : {
     324        2825 :     if (!m_role) return;
     325         666 :     UpdatedBlockTip(tip, nullptr, ibd);
     326         666 :     if (tip) {
     327        3996 :         for (const auto& params : Params().GetConsensus().llmqs) {
     328        3330 :             CheckQuorumConnections(params, tip);
     329             :         }
     330         666 :     }
     331        2825 : }
     332             : 
     333             : // CValidationInterface
     334             : 
     335      221351 : void NetQuorum::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload)
     336             : {
     337      221351 :     if (!m_role) return;
     338       87087 :     if (!pindexNew) return;
     339       87087 :     if (fInitialDownload || pindexNew == pindexFork) return;
     340       81279 :     if (!m_mn_sync.IsBlockchainSynced()) return;
     341             : 
     342      478704 :     for (const auto& params : Params().GetConsensus().llmqs) {
     343      398920 :         CheckQuorumConnections(params, pindexNew);
     344             :     }
     345             : 
     346       79784 :     m_qman.CleanupExpiredDataRequests();
     347       79784 :     TriggerQuorumDataRecoveryThreads(pindexNew);
     348       79784 :     StartCleanupOldQuorumDataThread(pindexNew);
     349      221351 : }
     350             : 
     351             : // Private helpers
     352             : 
     353      402250 : Uint256HashSet NetQuorum::GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams,
     354             :                                              gsl::not_null<const CBlockIndex*> pindexNew) const
     355             : {
     356      402250 :     auto connmanQuorumsToDelete = m_connman.GetMasternodeQuorums(llmqParams.type);
     357             : 
     358      402250 :     if (IsQuorumRotationEnabled(llmqParams, pindexNew)) {
     359       74726 :         int cycleIndexTipHeight = pindexNew->nHeight % llmqParams.dkgInterval;
     360       74726 :         int cycleQuorumBaseHeight = pindexNew->nHeight - cycleIndexTipHeight;
     361       74726 :         std::stringstream ss;
     362      224178 :         for (const auto quorumIndex : util::irange(llmqParams.signingActiveQuorumCount)) {
     363      149452 :             if (quorumIndex <= cycleIndexTipHeight) {
     364      146354 :                 int curDkgHeight = cycleQuorumBaseHeight + quorumIndex;
     365      146354 :                 auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
     366      146354 :                 ss << curDkgHeight << ":" << curDkgBlock.ToString() << " | ";
     367      146354 :                 connmanQuorumsToDelete.erase(curDkgBlock);
     368      146354 :             }
     369             :         }
     370       74726 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for rotated quorums: [%s]\n",
     371             :                  __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, ss.str());
     372       74726 :     } else {
     373      327524 :         int curDkgHeight = pindexNew->nHeight - (pindexNew->nHeight % llmqParams.dkgInterval);
     374      327524 :         auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
     375      327524 :         connmanQuorumsToDelete.erase(curDkgBlock);
     376      327524 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
     377             :                  __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, curDkgHeight, curDkgBlock.ToString());
     378             :     }
     379             : 
     380      402250 :     return connmanQuorumsToDelete;
     381      402250 : }
     382             : 
     383      402250 : void NetQuorum::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams,
     384             :                                                 gsl::not_null<const CBlockIndex*> pindexNew) const
     385             : {
     386      402250 :     const bool is_masternode = m_role->IsMasternode();
     387      402250 :     const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
     388             : 
     389      402250 :     auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections);
     390      402250 :     auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew);
     391             : 
     392      804470 :     const bool watchOtherISQuorums = is_masternode &&
     393      402220 :         llmqParams.type == Params().GetConsensus().llmqTypeDIP0024InstantSend &&
     394      124404 :         std::ranges::any_of(lastQuorums, [&proTxHash](const auto& old_quorum) { return old_quorum->IsMember(proTxHash); });
     395             : 
     396      633703 :     for (const auto& quorum : lastQuorums) {
     397      462906 :         if (EnsureQuorumConnections(llmqParams, m_connman, m_sporkman,
     398      231453 :                                            {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index},
     399      231453 :                                            m_dmnman.GetListAtChainTip(), proTxHash,
     400      231453 :                                            /*is_masternode=*/is_masternode,
     401      231453 :                                            /*quorums_watch=*/is_masternode ? m_role->IsWatching() : true)) {
     402      140937 :             if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
     403      110579 :                 LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
     404             :                          __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
     405             :                          quorum->m_quorum_base_block_index->nHeight,
     406             :                          quorum->m_quorum_base_block_index->GetBlockHash().ToString());
     407      110579 :             }
     408      231453 :         } else if (watchOtherISQuorums && !quorum->IsMember(proTxHash)) {
     409       18366 :             Uint256HashSet connections;
     410       18366 :             const auto& cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type,
     411       18366 :                                                                              quorum->m_quorum_base_block_index,
     412       18366 :                                                                              quorum->members.size(), 1);
     413       36732 :             for (auto idx : cindexes) {
     414       18366 :                 connections.emplace(quorum->members[idx]->proTxHash);
     415             :             }
     416       18366 :             if (!connections.empty()) {
     417       18366 :                 if (!m_connman.HasMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash())) {
     418         575 :                     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] adding mn inter-quorum connections for quorum: [%d:%s]\n",
     419             :                              __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
     420             :                              quorum->m_quorum_base_block_index->nHeight,
     421             :                              quorum->m_quorum_base_block_index->GetBlockHash().ToString());
     422         575 :                     m_connman.SetMasternodeQuorumNodes(llmqParams.type,
     423         575 :                                                        quorum->m_quorum_base_block_index->GetBlockHash(), connections);
     424         575 :                     m_connman.SetMasternodeQuorumRelayMembers(llmqParams.type,
     425         575 :                                                               quorum->m_quorum_base_block_index->GetBlockHash(), connections);
     426         575 :                 }
     427       18366 :                 if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
     428       14224 :                     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn inter-quorum connections for quorum: [%d:%s]\n",
     429             :                              __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
     430             :                              quorum->m_quorum_base_block_index->nHeight,
     431             :                              quorum->m_quorum_base_block_index->GetBlockHash().ToString());
     432       14224 :                 }
     433       18366 :             }
     434       18366 :         }
     435             :     }
     436             : 
     437      404088 :     for (const auto& quorumHash : deletableQuorums) {
     438        1838 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- removing masternodes quorum connections for quorum %s:\n",
     439             :                  __func__, quorumHash.ToString());
     440        1838 :         m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash);
     441             :     }
     442      402250 : }
     443             : 
     444       79784 : void NetQuorum::TriggerQuorumDataRecoveryThreads(gsl::not_null<const CBlockIndex*> block_index) const
     445             : {
     446       79784 :     if (!m_quorums_recovery) return;
     447             : 
     448       76412 :     const bool is_masternode = m_role->IsMasternode();
     449       76412 :     const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
     450             : 
     451       76412 :     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Process block %s as protx_hash=%s\n", __func__, block_index->GetBlockHash().ToString(), proTxHash.ToString());
     452             : 
     453      458472 :     for (const auto& params : Params().GetConsensus().llmqs) {
     454      382060 :         auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections);
     455      892517 :         const bool fWeAreQuorumTypeMember = is_masternode && std::ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) {
     456      128397 :             return pQuorum->IsValidMember(proTxHash);
     457             :         });
     458             : 
     459      591442 :         for (auto& pQuorum : vecQuorums) {
     460      209382 :             if (is_masternode && pQuorum->IsValidMember(proTxHash)) {
     461      126148 :                 uint16_t nDataMask{0};
     462      126148 :                 if (!pQuorum->HasVerificationVector()) {
     463        2575 :                     nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
     464        2575 :                 }
     465      126148 :                 if (!pQuorum->GetSkShare().IsValid()) {
     466        2575 :                     nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
     467        2575 :                 }
     468      126148 :                 if (nDataMask != 0) {
     469        2575 :                     StartSkShareRecoveryThread(block_index, std::move(pQuorum), nDataMask);
     470        2575 :                 } else {
     471      123573 :                     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
     472             :                              std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
     473             :                              block_index->nHeight);
     474             :                 }
     475      126148 :             } else {
     476       83234 :                 TryStartVvecSyncThread(block_index, std::move(pQuorum), fWeAreQuorumTypeMember);
     477             :             }
     478             :         }
     479      382060 :     }
     480       79784 : }
     481             : 
     482        2627 : void NetQuorum::DataRecoveryThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
     483             :                                    uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const
     484             : {
     485        2627 :     size_t nTries{0};
     486        2627 :     uint16_t nDataMask{data_mask};
     487        2627 :     int64_t nTimeLastSuccess{0};
     488        2627 :     uint256* pCurrentMemberHash{nullptr};
     489        2627 :     std::vector<uint256> vecMemberHashes;
     490        2627 :     const int64_t nRequestTimeout{10};
     491             : 
     492       13602 :     auto printLog = [&](const std::string& strMessage) {
     493       10975 :         const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
     494       10975 :         LogPrint(BCLog::LLMQ, "NetQuorum::DataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
     495             :                  strMessage, std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
     496             :                  nDataMask, data_mask, strMember, nTries);
     497       10981 :     };
     498        2627 :     printLog("Start");
     499             : 
     500        2627 :     while (!m_mn_sync.IsBlockchainSynced() && !quorumThreadInterrupt) {
     501           0 :         quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout));
     502             :     }
     503             : 
     504        2627 :     if (quorumThreadInterrupt) {
     505           0 :         printLog("Aborted");
     506           0 :         return;
     507             :     }
     508             : 
     509        2627 :     vecMemberHashes.reserve(pQuorum->qc->validMembers.size());
     510        5562 :     for (auto& member : pQuorum->members) {
     511        2935 :         if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != protx_hash) {
     512         397 :             vecMemberHashes.push_back(member->proTxHash);
     513         397 :         }
     514             :     }
     515        2627 :     std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
     516             : 
     517        2627 :     printLog("Try to request");
     518             : 
     519        2868 :     while (nDataMask > 0 && !quorumThreadInterrupt) {
     520        5712 :         if (nDataMask & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR &&
     521        2856 :             pQuorum->HasVerificationVector()) {
     522         103 :             nDataMask &= ~CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
     523         103 :             printLog("Received quorumVvec");
     524         103 :         }
     525             : 
     526        5454 :         if (nDataMask & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->GetSkShare().IsValid()) {
     527          24 :             nDataMask &= ~CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
     528          24 :             printLog("Received skShare");
     529          24 :         }
     530             : 
     531        2856 :         if (nDataMask == 0) {
     532         103 :             printLog("Success");
     533         103 :             break;
     534             :         }
     535             : 
     536        2753 :         if ((GetTime<std::chrono::seconds>().count() - nTimeLastSuccess) > nRequestTimeout) {
     537        2735 :             if (nTries >= vecMemberHashes.size()) {
     538        2512 :                 printLog("All tried but failed");
     539        2512 :                 break;
     540             :             }
     541         223 :             pCurrentMemberHash = &vecMemberHashes[(start_offset + nTries++) % vecMemberHashes.size()];
     542         446 :             if (m_qman.IsDataRequestPending(*pCurrentMemberHash, /*we_requested=*/true, pQuorum->qc->quorumHash,
     543         223 :                                           pQuorum->qc->llmqType)) {
     544           0 :                 printLog("Already asked");
     545           0 :                 continue;
     546             :             }
     547         223 :             quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(start_offset * 100));
     548         223 :             nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
     549         223 :             m_connman.AddPendingMasternode(*pCurrentMemberHash);
     550         223 :             printLog("Connect");
     551         223 :         }
     552             : 
     553        1055 :         m_connman.ForEachNode([&](CNode* pNode) {
     554         814 :             auto verifiedProRegTxHash = pNode->GetVerifiedProRegTxHash();
     555         814 :             if (pCurrentMemberHash == nullptr || verifiedProRegTxHash != *pCurrentMemberHash) {
     556         683 :                 return;
     557             :             }
     558             : 
     559         131 :             switch (RequestQuorumData(*pNode, *pQuorum, nDataMask, protx_hash)) {
     560             :             case DataRequestStatus::Requested:
     561         131 :                 nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
     562         131 :                 printLog("Requested");
     563         131 :                 return;
     564             :             case DataRequestStatus::NotFound:
     565           0 :                 printLog("Failed");
     566           0 :                 pNode->fDisconnect = true;
     567           0 :                 pCurrentMemberHash = nullptr;
     568           0 :                 return;
     569             :             case DataRequestStatus::Processed:
     570           0 :                 printLog("Processed");
     571           0 :                 pNode->fDisconnect = true;
     572           0 :                 pCurrentMemberHash = nullptr;
     573           0 :                 return;
     574             :             case DataRequestStatus::Pending:
     575           0 :                 printLog("Waiting");
     576           0 :                 return;
     577             :             }
     578         814 :         });
     579         241 :         quorumThreadInterrupt.sleep_for(std::chrono::seconds(1));
     580             :     }
     581        2627 :     pQuorum->ReleaseRecovery();
     582        2627 :     printLog("Done");
     583        2627 : }
     584             : 
     585          90 : void NetQuorum::StartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum) const
     586             : {
     587          90 :     if (pQuorum->qc->validMembers.empty()) return;
     588          90 :     if (!pQuorum->TryClaimRecovery()) {
     589           0 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
     590           0 :         return;
     591             :     }
     592             : 
     593         180 :     workerPool.push([pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable {
     594          90 :         DataRecoveryThread(block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR,
     595          90 :                            /*protx_hash=*/uint256(), /*start_offset=*/0);
     596          90 :     });
     597          90 : }
     598             : 
     599       83234 : void NetQuorum::TryStartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
     600             :                                        bool fWeAreQuorumTypeMember) const
     601             : {
     602       83234 :     if (pQuorum->IsRecoveryRunning()) return;
     603             : 
     604       83125 :     const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0;
     605       83125 :     const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid;
     606      166175 :     const bool fSyncCurrent = syncMode == QvvecSyncMode::Always ||
     607       83050 :                               (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember);
     608             : 
     609       83125 :     if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) {
     610          90 :         StartVvecSyncThread(block_index, std::move(pQuorum));
     611          90 :     } else {
     612       83035 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
     613             :                  std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight);
     614             :     }
     615       83234 : }
     616             : 
     617        2575 : void NetQuorum::StartSkShareRecoveryThread(gsl::not_null<const CBlockIndex*> pIndex, CQuorumCPtr pQuorum,
     618             :                                            uint16_t nDataMaskIn) const
     619             : {
     620        2575 :     if (pQuorum->qc->validMembers.empty()) return;
     621             : 
     622        2575 :     if (!pQuorum->TryClaimRecovery()) {
     623          38 :         LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
     624          38 :         return;
     625             :     }
     626             : 
     627        5073 :     workerPool.push([pQuorum = std::move(pQuorum), pIndex, nDataMaskIn, this](int threadId) mutable {
     628        2536 :         const size_t size_offset = GetQuorumRecoveryStartOffset(*pQuorum, pIndex);
     629        2536 :         DataRecoveryThread(pIndex, std::move(pQuorum), nDataMaskIn, m_role->GetProTxHash(), size_offset);
     630        2538 :     });
     631        2575 : }
     632             : 
     633        2536 : size_t NetQuorum::GetQuorumRecoveryStartOffset(const CQuorum& quorum,
     634             :                                                gsl::not_null<const CBlockIndex*> pIndex) const
     635             : {
     636        2536 :     auto mns = m_dmnman.GetListForBlock(pIndex);
     637        2536 :     std::vector<uint256> vecProTxHashes;
     638        2536 :     vecProTxHashes.reserve(mns.GetCounts().enabled());
     639        2537 :     mns.ForEachMN(/*onlyValid=*/true,
     640        7732 :                   [&](const auto& pMasternode) { vecProTxHashes.emplace_back(pMasternode.proTxHash); });
     641        2537 :     std::sort(vecProTxHashes.begin(), vecProTxHashes.end());
     642        2535 :     size_t nIndex{0};
     643             :     {
     644        2535 :         auto my_protx_hash = m_role->GetProTxHash();
     645        3805 :         for (const auto i : util::irange(vecProTxHashes.size())) {
     646             :             // cppcheck-suppress useStlAlgorithm
     647        3803 :             if (my_protx_hash == vecProTxHashes[i]) {
     648        2537 :                 nIndex = i;
     649        2537 :                 break;
     650             :             }
     651             :         }
     652             :     }
     653        2544 :     return nIndex % quorum.qc->validMembers.size();
     654        2560 : }
     655             : 
     656       79784 : void NetQuorum::StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const
     657             : {
     658             :     // Note: this function is CPU heavy and we don't want it to be running during DKGs.
     659             :     // The largest dkgMiningWindowStart for a related quorum type is 42 (LLMQ_60_75).
     660             :     // At the same time most quorums use dkgInterval = 24 so the next DKG for them
     661             :     // (after block 576 + 42) will start at block 576 + 24 * 2. That's only a 6 blocks
     662             :     // window and it's better to have more room so we pick next cycle.
     663             :     // dkgMiningWindowStart for small quorums is 10 i.e. a safe block to start
     664             :     // these calculations is at height 576 + 24 * 2 + 10 = 576 + 58.
     665       79784 :     if (pIndex->nHeight % 576 != 58) {
     666       79730 :         return;
     667             :     }
     668             : 
     669          54 :     cxxtimer::Timer t(/*start=*/true);
     670          54 :     LogPrint(BCLog::LLMQ, "NetQuorum::%s -- start\n", __func__);
     671             : 
     672             :     // do not block the caller thread
     673         108 :     workerPool.push([pIndex, t, this](int threadId) {
     674          54 :         Uint256HashSet dbKeysToSkip;
     675             : 
     676         108 :         if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) {
     677          54 :             utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false);
     678          54 :         }
     679         324 :         for (const auto& params : Params().GetConsensus().llmqs) {
     680         270 :             if (quorumThreadInterrupt) {
     681           0 :                 break;
     682             :             }
     683         270 :             LOCK(cs_cleanup);
     684         270 :             auto& cache = cleanupQuorumsCache[params.type];
     685         270 :             const CBlockIndex* pindex_loop{pIndex};
     686         270 :             Uint256HashSet quorum_keys;
     687       19728 :             while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) {
     688       19458 :                 uint256 quorum_key;
     689       19458 :                 if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) {
     690           0 :                     quorum_keys.insert(quorum_key);
     691           0 :                     if (quorum_keys.size() >= static_cast<size_t>(params.keepOldKeys)) break; // extra safety belt
     692           0 :                 }
     693       19458 :                 pindex_loop = pindex_loop->pprev;
     694             :             }
     695         346 :             for (const auto& pQuorum : m_qman.ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) {
     696          76 :                 const uint256 quorum_key = MakeQuorumKey(*pQuorum);
     697          76 :                 quorum_keys.insert(quorum_key);
     698          76 :                 cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key);
     699             :             }
     700         270 :             dbKeysToSkip.merge(quorum_keys);
     701         270 :         }
     702             : 
     703          54 :         if (!quorumThreadInterrupt) {
     704          54 :             m_qman.CleanupOldQuorumData(dbKeysToSkip);
     705          54 :         }
     706             : 
     707          54 :         LogPrint(BCLog::LLMQ, "NetQuorum::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count());
     708          54 :     });
     709       79784 : }
     710             : 
     711      236262 : bool EnsureQuorumConnections(const Consensus::LLMQParams& llmqParams, CConnman& connman, const CSporkManager& sporkman,
     712             :                              const UtilParameters& util_params, const CDeterministicMNList& tip_mn_list,
     713             :                              const uint256& myProTxHash, bool is_masternode, bool quorums_watch)
     714             : {
     715      236262 :     if (!is_masternode && !quorums_watch) {
     716           0 :         return false;
     717             :     }
     718             : 
     719      236262 :     auto members = utils::GetAllQuorumMembers(llmqParams.type, util_params);
     720      236262 :     if (members.empty()) {
     721           0 :         return false;
     722             :     }
     723             : 
     724      898017 :     bool isMember = std::ranges::find_if(members, [&](const auto& dmn) { return dmn->proTxHash == myProTxHash; }) !=
     725      236262 :                     members.end();
     726             : 
     727      236262 :     if (!isMember && !quorums_watch) {
     728       92321 :         return false;
     729             :     }
     730             : 
     731      143941 :     LogPrint(BCLog::NET_NETCONN, "%s -- isMember=%d for quorum %s:\n", __func__, isMember,
     732             :              util_params.m_base_index->GetBlockHash().ToString());
     733             : 
     734      143941 :     Uint256HashSet connections;
     735      143941 :     Uint256HashSet relayMembers;
     736      143941 :     if (isMember) {
     737      143935 :         connections = utils::GetQuorumConnections(llmqParams, sporkman, util_params, myProTxHash, /*onlyOutbound=*/true);
     738             :         // If all-members-connected is enabled for this quorum type, leverage the full-mesh
     739             :         // connections for low-latency recovered sig propagation by treating all members as
     740             :         // relay members (instead of the ring-based subset). This ensures peers will send
     741             :         // QSENDRECSIGS to each other across the full mesh and set m_wants_recsigs widely.
     742      143935 :         if (IsAllMembersConnectedEnabled(llmqParams.type, sporkman)) {
     743      282916 :             for (const auto& dmn : members) {
     744      229170 :                 if (dmn->proTxHash != myProTxHash) {
     745      175423 :                     relayMembers.emplace(dmn->proTxHash);
     746      175423 :                 }
     747             :             }
     748       53746 :         } else {
     749       90189 :             relayMembers = utils::GetQuorumRelayMembers(llmqParams, util_params, myProTxHash, true);
     750             :         }
     751      143935 :     } else {
     752           6 :         auto cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type, util_params.m_base_index, members.size(), 1);
     753          12 :         for (auto idx : cindexes) {
     754           6 :             connections.emplace(members[idx]->proTxHash);
     755             :         }
     756           6 :         relayMembers = connections;
     757           6 :     }
     758      143941 :     if (!connections.empty()) {
     759      137909 :         if (!connman.HasMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash()) &&
     760        3591 :             LogAcceptDebug(BCLog::LLMQ)) {
     761        3591 :             std::string debugMsg = strprintf("%s -- adding masternodes quorum connections for quorum %s:\n", __func__,
     762        3591 :                                              util_params.m_base_index->GetBlockHash().ToString());
     763        9980 :             for (const auto& c : connections) {
     764        6389 :                 auto dmn = tip_mn_list.GetValidMN(c);
     765        6389 :                 if (!dmn) {
     766          30 :                     debugMsg += strprintf("  %s (not in valid MN set anymore)\n", c.ToString());
     767          30 :                 } else {
     768       12718 :                     debugMsg += strprintf("  %s (%s)\n", c.ToString(),
     769        6359 :                                           dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
     770             :                 }
     771        6389 :             }
     772        3591 :             LogPrint(BCLog::NET_NETCONN, debugMsg.c_str()); /* Continued */
     773        3591 :         }
     774      134318 :         connman.SetMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash(), connections);
     775      134318 :     }
     776      143941 :     if (!relayMembers.empty()) {
     777      141433 :         connman.SetMasternodeQuorumRelayMembers(llmqParams.type, util_params.m_base_index->GetBlockHash(), relayMembers);
     778      141433 :     }
     779      143941 :     return true;
     780      236262 : }
     781             : 
     782             : } // namespace llmq

Generated by: LCOV version 1.16