Line data Source code
1 : // Copyright (c) 2025-2026 The Dash Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <llmq/net_quorum.h>
6 :
7 : #include <active/masternode.h>
8 : #include <chainparams.h>
9 : #include <evo/deterministicmns.h>
10 : #include <llmq/commitment.h>
11 : #include <llmq/options.h>
12 : #include <llmq/quorumsman.h>
13 : #include <llmq/utils.h>
14 : #include <logging.h>
15 : #include <masternode/sync.h>
16 : #include <net.h>
17 : #include <netmessagemaker.h>
18 : #include <util/helpers.h>
19 : #include <util/std23.h>
20 : #include <util/thread.h>
21 : #include <util/time.h>
22 : #include <validation.h>
23 :
24 : #include <cxxtimer.hpp>
25 :
26 : #include <algorithm>
27 : #include <ranges>
28 :
29 : namespace llmq {
30 :
31 8571 : NetQuorum::NetQuorum(PeerManagerInternal* peer_manager, CBLSWorker& bls_worker,
32 : CConnman& connman, CDeterministicMNManager& dmnman, CQuorumManager& qman,
33 : CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman,
34 : const CMasternodeSync& mn_sync, const CSporkManager& sporkman,
35 : QuorumRole* quorum_role, CActiveMasternodeManager* nodeman,
36 : int16_t worker_count, const QvvecSyncModeMap& sync_map, bool quorums_recovery) :
37 2857 : NetHandler(peer_manager),
38 2857 : m_bls_worker{bls_worker},
39 2857 : m_connman{connman},
40 2857 : m_dmnman{dmnman},
41 2857 : m_qman{qman},
42 2857 : m_qsnapman{qsnapman},
43 2857 : m_chainman{chainman},
44 2857 : m_mn_sync{mn_sync},
45 2857 : m_sporkman{sporkman},
46 2857 : m_role{quorum_role},
47 2857 : m_nodeman{nodeman},
48 2857 : m_worker_count{worker_count},
49 2857 : m_sync_map{sync_map},
50 2857 : m_quorums_recovery{quorums_recovery}
51 8571 : {
52 2857 : quorumThreadInterrupt.reset();
53 5714 : }
54 :
55 : // NetHandler
56 :
57 2831 : void NetQuorum::Start()
58 : {
59 2831 : if (!m_role) return;
60 666 : assert(m_worker_count > 0);
61 666 : workerPool.resize(m_worker_count);
62 666 : RenameThreadPool(workerPool, "q-mngr");
63 2831 : }
64 :
65 5714 : void NetQuorum::Stop()
66 : {
67 5714 : workerPool.clear_queue();
68 5714 : workerPool.stop(true);
69 5714 : }
70 :
71 96785 : void NetQuorum::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
72 : {
73 96785 : if (msg_type == NetMsgType::QGETDATA) {
74 335 : if (!m_role || !m_role->IsMasternode() || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) {
75 12 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode or a qwatch connection");
76 12 : return;
77 : }
78 :
79 323 : CQuorumDataRequest request;
80 323 : vRecv >> request;
81 :
82 646 : auto sendQDATA = [&](CQuorumDataRequest::Errors nError,
83 : bool request_limit_exceeded,
84 : const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) -> bool {
85 323 : bool misbehave = false;
86 323 : switch (nError) {
87 : case (CQuorumDataRequest::Errors::NONE):
88 : case (CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID):
89 : case (CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND):
90 : case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND):
91 : case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER):
92 : case (CQuorumDataRequest::Errors::UNDEFINED):
93 292 : misbehave = request_limit_exceeded;
94 292 : break;
95 : case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING):
96 : case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING):
97 : // Do not punish limit exceed if we don't have the requested data
98 31 : break;
99 : }
100 323 : request.SetError(nError);
101 323 : CDataStream ssResponse{SER_NETWORK, pfrom.GetCommonVersion()};
102 323 : ssResponse << request << body;
103 323 : m_connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse));
104 323 : return misbehave;
105 323 : };
106 :
107 323 : const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), false, request.GetQuorumHash(), request.GetLLMQType());
108 323 : const bool request_limit_exceeded = !m_qman.RegisterDataRequest(key, request, /*add_expiry_bias=*/false);
109 :
110 323 : if (!Params().GetLLMQ(request.GetLLMQType()).has_value()) {
111 6 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID, request_limit_exceeded)) {
112 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
113 0 : }
114 6 : return;
115 : }
116 :
117 634 : const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, return m_chainman.m_blockman.LookupBlockIndex(request.GetQuorumHash()));
118 317 : if (pQuorumBaseBlockIndex == nullptr) {
119 6 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND, request_limit_exceeded)) {
120 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
121 0 : }
122 6 : return;
123 : }
124 :
125 311 : const auto pQuorum = m_qman.GetQuorum(request.GetLLMQType(), request.GetQuorumHash());
126 311 : if (pQuorum == nullptr) {
127 6 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded)) {
128 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
129 0 : }
130 6 : return;
131 : }
132 :
133 305 : CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion());
134 :
135 : // Check if request wants QUORUM_VERIFICATION_VECTOR data
136 305 : if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
137 287 : if (!pQuorum->HasVerificationVector()) {
138 17 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded)) {
139 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
140 0 : }
141 17 : return;
142 : }
143 270 : ssResponseData << *pQuorum->GetVerificationVector();
144 270 : }
145 :
146 : // Check if request wants ENCRYPTED_CONTRIBUTIONS data
147 288 : bool misbehave_contrib = ProcessContribQGETDATA(ssResponseData, *pQuorum, request, pQuorumBaseBlockIndex);
148 :
149 288 : CQuorumDataRequest::Errors ret_err{CQuorumDataRequest::Errors::NONE};
150 288 : if (auto request_err = request.GetError(); request_err != CQuorumDataRequest::Errors::NONE &&
151 288 : request_err != CQuorumDataRequest::Errors::UNDEFINED) {
152 20 : ret_err = request_err;
153 20 : }
154 :
155 576 : bool misbehave_qdata = (ret_err != CQuorumDataRequest::Errors::NONE)
156 20 : ? sendQDATA(ret_err, request_limit_exceeded)
157 268 : : sendQDATA(CQuorumDataRequest::Errors::NONE, request_limit_exceeded, ssResponseData);
158 :
159 288 : if (request_limit_exceeded && (misbehave_contrib || misbehave_qdata)) {
160 90 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
161 90 : }
162 288 : return;
163 311 : }
164 :
165 96450 : if (msg_type == NetMsgType::QDATA) {
166 185 : if (!m_role || pfrom.GetVerifiedProRegTxHash().IsNull()) {
167 6 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode and -watchquorums is not enabled");
168 6 : return;
169 : }
170 :
171 179 : CQuorumDataRequest request;
172 179 : vRecv >> request;
173 :
174 : {
175 179 : const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType());
176 179 : const auto validation = m_qman.ValidateDataResponse(key, request);
177 179 : switch (validation) {
178 : case CQuorumManager::DataResponseValidation::NotRequested:
179 6 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not requested");
180 6 : return;
181 : case CQuorumManager::DataResponseValidation::AlreadyReceived:
182 6 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "already received");
183 6 : return;
184 : case CQuorumManager::DataResponseValidation::Mismatch:
185 18 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not like requested");
186 18 : return;
187 : case CQuorumManager::DataResponseValidation::OK:
188 149 : break;
189 : }
190 : }
191 :
192 149 : if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
193 19 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Error %d (%s), from peer=%d\n", __func__, msg_type, request.GetError(), request.GetErrorString(), pfrom.GetId());
194 19 : return;
195 : }
196 :
197 130 : CQuorumPtr pQuorum = m_qman.GetCachedMutableQuorum(request.GetLLMQType(), request.GetQuorumHash());
198 130 : if (!pQuorum) {
199 : // Don't bump score because we asked for it
200 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId());
201 0 : return;
202 : }
203 :
204 : // Check if request has QUORUM_VERIFICATION_VECTOR data
205 130 : if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
206 130 : std::vector<CBLSPublicKey> verificationVector;
207 130 : vRecv >> verificationVector;
208 :
209 130 : if (pQuorum->SetVerificationVector(verificationVector)) {
210 130 : m_qman.QueueQuorumForWarming(pQuorum);
211 130 : } else {
212 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid quorum verification vector");
213 0 : return;
214 : }
215 130 : }
216 :
217 : // Check if request has ENCRYPTED_CONTRIBUTIONS data
218 130 : if (!ProcessContribQDATA(pfrom, vRecv, *pQuorum, request)) {
219 0 : return;
220 : }
221 :
222 130 : m_qman.WriteContributions(pQuorum);
223 130 : }
224 96785 : }
225 :
226 288 : bool NetQuorum::ProcessContribQGETDATA(CDataStream& ssResponseData, const CQuorum& quorum,
227 : CQuorumDataRequest& request,
228 : gsl::not_null<const CBlockIndex*> block_index) const
229 : {
230 288 : if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
231 201 : return false;
232 : }
233 :
234 87 : if (!m_nodeman) {
235 0 : request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
236 0 : return true;
237 : }
238 :
239 87 : int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
240 87 : if (memberIdx == -1) {
241 6 : request.SetError(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER);
242 6 : return true;
243 : }
244 :
245 81 : std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
246 162 : if (!m_qman.GetEncryptedContributions(request.GetLLMQType(), block_index,
247 81 : quorum.qc->validMembers, request.GetProTxHash(), vecEncrypted)) {
248 14 : request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
249 14 : return true;
250 : }
251 :
252 67 : ssResponseData << vecEncrypted;
253 67 : return false;
254 288 : }
255 :
256 130 : bool NetQuorum::ProcessContribQDATA(CNode& pfrom, CDataStream& vRecv,
257 : CQuorum& quorum, CQuorumDataRequest& request)
258 : {
259 130 : if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
260 93 : return true;
261 : }
262 :
263 37 : if (!m_nodeman) {
264 0 : return true;
265 : }
266 :
267 37 : auto vvec = quorum.GetVerificationVector();
268 37 : if (!vvec || vvec->size() != size_t(quorum.params.threshold)) {
269 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: No valid quorum verification vector available, from peer=%d\n",
270 : __func__, NetMsgType::QDATA, pfrom.GetId());
271 0 : return false;
272 : }
273 :
274 37 : int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
275 37 : if (memberIdx == -1) {
276 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Not a member of the quorum, from peer=%d\n",
277 : __func__, NetMsgType::QDATA, pfrom.GetId());
278 0 : return false;
279 : }
280 :
281 37 : std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
282 37 : vRecv >> vecEncrypted;
283 :
284 37 : std::vector<CBLSSecretKey> vecSecretKeys;
285 37 : vecSecretKeys.resize(vecEncrypted.size());
286 164 : for (const auto i : util::irange(vecEncrypted.size())) {
287 127 : if (!m_nodeman->Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) {
288 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "failed to decrypt");
289 0 : return false;
290 : }
291 : }
292 :
293 74 : if (!quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(vecSecretKeys),
294 37 : m_nodeman->GetProTxHash())) {
295 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid secret key share received");
296 0 : return false;
297 : }
298 :
299 37 : return true;
300 130 : }
301 :
302 131 : DataRequestStatus NetQuorum::RequestQuorumData(CNode& peer, const CQuorum& quorum, uint16_t nDataMask,
303 : const uint256& proTxHash) const
304 : {
305 262 : const CQuorumDataRequestKey key(peer.GetVerifiedProRegTxHash(), true,
306 131 : quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
307 262 : const CQuorumDataRequest request(quorum.qc->llmqType, quorum.m_quorum_base_block_index->GetBlockHash(),
308 131 : nDataMask, proTxHash);
309 131 : if (!m_qman.RegisterDataRequest(key, request)) {
310 0 : return m_qman.GetDataRequestStatus(peer.GetVerifiedProRegTxHash(), /*we_requested=*/true,
311 0 : quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
312 : }
313 131 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__,
314 : key.quorumHash.ToString(), std23::to_underlying(key.llmqType), key.proRegTx.ToString());
315 :
316 131 : CNetMsgMaker msgMaker(peer.GetCommonVersion());
317 131 : m_connman.PushMessage(&peer, msgMaker.Make(NetMsgType::QGETDATA, request));
318 131 : return DataRequestStatus::Requested;
319 131 : }
320 :
321 :
322 2825 : void NetQuorum::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd)
323 : {
324 2825 : if (!m_role) return;
325 666 : UpdatedBlockTip(tip, nullptr, ibd);
326 666 : if (tip) {
327 3996 : for (const auto& params : Params().GetConsensus().llmqs) {
328 3330 : CheckQuorumConnections(params, tip);
329 : }
330 666 : }
331 2825 : }
332 :
333 : // CValidationInterface
334 :
335 221351 : void NetQuorum::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload)
336 : {
337 221351 : if (!m_role) return;
338 87087 : if (!pindexNew) return;
339 87087 : if (fInitialDownload || pindexNew == pindexFork) return;
340 81279 : if (!m_mn_sync.IsBlockchainSynced()) return;
341 :
342 478704 : for (const auto& params : Params().GetConsensus().llmqs) {
343 398920 : CheckQuorumConnections(params, pindexNew);
344 : }
345 :
346 79784 : m_qman.CleanupExpiredDataRequests();
347 79784 : TriggerQuorumDataRecoveryThreads(pindexNew);
348 79784 : StartCleanupOldQuorumDataThread(pindexNew);
349 221351 : }
350 :
351 : // Private helpers
352 :
353 402250 : Uint256HashSet NetQuorum::GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams,
354 : gsl::not_null<const CBlockIndex*> pindexNew) const
355 : {
356 402250 : auto connmanQuorumsToDelete = m_connman.GetMasternodeQuorums(llmqParams.type);
357 :
358 402250 : if (IsQuorumRotationEnabled(llmqParams, pindexNew)) {
359 74726 : int cycleIndexTipHeight = pindexNew->nHeight % llmqParams.dkgInterval;
360 74726 : int cycleQuorumBaseHeight = pindexNew->nHeight - cycleIndexTipHeight;
361 74726 : std::stringstream ss;
362 224178 : for (const auto quorumIndex : util::irange(llmqParams.signingActiveQuorumCount)) {
363 149452 : if (quorumIndex <= cycleIndexTipHeight) {
364 146354 : int curDkgHeight = cycleQuorumBaseHeight + quorumIndex;
365 146354 : auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
366 146354 : ss << curDkgHeight << ":" << curDkgBlock.ToString() << " | ";
367 146354 : connmanQuorumsToDelete.erase(curDkgBlock);
368 146354 : }
369 : }
370 74726 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for rotated quorums: [%s]\n",
371 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, ss.str());
372 74726 : } else {
373 327524 : int curDkgHeight = pindexNew->nHeight - (pindexNew->nHeight % llmqParams.dkgInterval);
374 327524 : auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
375 327524 : connmanQuorumsToDelete.erase(curDkgBlock);
376 327524 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
377 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, curDkgHeight, curDkgBlock.ToString());
378 : }
379 :
380 402250 : return connmanQuorumsToDelete;
381 402250 : }
382 :
383 402250 : void NetQuorum::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams,
384 : gsl::not_null<const CBlockIndex*> pindexNew) const
385 : {
386 402250 : const bool is_masternode = m_role->IsMasternode();
387 402250 : const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
388 :
389 402250 : auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections);
390 402250 : auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew);
391 :
392 804470 : const bool watchOtherISQuorums = is_masternode &&
393 402220 : llmqParams.type == Params().GetConsensus().llmqTypeDIP0024InstantSend &&
394 124404 : std::ranges::any_of(lastQuorums, [&proTxHash](const auto& old_quorum) { return old_quorum->IsMember(proTxHash); });
395 :
396 633703 : for (const auto& quorum : lastQuorums) {
397 462906 : if (EnsureQuorumConnections(llmqParams, m_connman, m_sporkman,
398 231453 : {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index},
399 231453 : m_dmnman.GetListAtChainTip(), proTxHash,
400 231453 : /*is_masternode=*/is_masternode,
401 231453 : /*quorums_watch=*/is_masternode ? m_role->IsWatching() : true)) {
402 140937 : if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
403 110579 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
404 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
405 : quorum->m_quorum_base_block_index->nHeight,
406 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
407 110579 : }
408 231453 : } else if (watchOtherISQuorums && !quorum->IsMember(proTxHash)) {
409 18366 : Uint256HashSet connections;
410 18366 : const auto& cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type,
411 18366 : quorum->m_quorum_base_block_index,
412 18366 : quorum->members.size(), 1);
413 36732 : for (auto idx : cindexes) {
414 18366 : connections.emplace(quorum->members[idx]->proTxHash);
415 : }
416 18366 : if (!connections.empty()) {
417 18366 : if (!m_connman.HasMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash())) {
418 575 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] adding mn inter-quorum connections for quorum: [%d:%s]\n",
419 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
420 : quorum->m_quorum_base_block_index->nHeight,
421 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
422 575 : m_connman.SetMasternodeQuorumNodes(llmqParams.type,
423 575 : quorum->m_quorum_base_block_index->GetBlockHash(), connections);
424 575 : m_connman.SetMasternodeQuorumRelayMembers(llmqParams.type,
425 575 : quorum->m_quorum_base_block_index->GetBlockHash(), connections);
426 575 : }
427 18366 : if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
428 14224 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn inter-quorum connections for quorum: [%d:%s]\n",
429 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
430 : quorum->m_quorum_base_block_index->nHeight,
431 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
432 14224 : }
433 18366 : }
434 18366 : }
435 : }
436 :
437 404088 : for (const auto& quorumHash : deletableQuorums) {
438 1838 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- removing masternodes quorum connections for quorum %s:\n",
439 : __func__, quorumHash.ToString());
440 1838 : m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash);
441 : }
442 402250 : }
443 :
444 79784 : void NetQuorum::TriggerQuorumDataRecoveryThreads(gsl::not_null<const CBlockIndex*> block_index) const
445 : {
446 79784 : if (!m_quorums_recovery) return;
447 :
448 76412 : const bool is_masternode = m_role->IsMasternode();
449 76412 : const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
450 :
451 76412 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Process block %s as protx_hash=%s\n", __func__, block_index->GetBlockHash().ToString(), proTxHash.ToString());
452 :
453 458472 : for (const auto& params : Params().GetConsensus().llmqs) {
454 382060 : auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections);
455 892517 : const bool fWeAreQuorumTypeMember = is_masternode && std::ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) {
456 128397 : return pQuorum->IsValidMember(proTxHash);
457 : });
458 :
459 591442 : for (auto& pQuorum : vecQuorums) {
460 209382 : if (is_masternode && pQuorum->IsValidMember(proTxHash)) {
461 126148 : uint16_t nDataMask{0};
462 126148 : if (!pQuorum->HasVerificationVector()) {
463 2575 : nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
464 2575 : }
465 126148 : if (!pQuorum->GetSkShare().IsValid()) {
466 2575 : nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
467 2575 : }
468 126148 : if (nDataMask != 0) {
469 2575 : StartSkShareRecoveryThread(block_index, std::move(pQuorum), nDataMask);
470 2575 : } else {
471 123573 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
472 : std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
473 : block_index->nHeight);
474 : }
475 126148 : } else {
476 83234 : TryStartVvecSyncThread(block_index, std::move(pQuorum), fWeAreQuorumTypeMember);
477 : }
478 : }
479 382060 : }
480 79784 : }
481 :
482 2627 : void NetQuorum::DataRecoveryThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
483 : uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const
484 : {
485 2627 : size_t nTries{0};
486 2627 : uint16_t nDataMask{data_mask};
487 2627 : int64_t nTimeLastSuccess{0};
488 2627 : uint256* pCurrentMemberHash{nullptr};
489 2627 : std::vector<uint256> vecMemberHashes;
490 2627 : const int64_t nRequestTimeout{10};
491 :
492 13602 : auto printLog = [&](const std::string& strMessage) {
493 10975 : const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
494 10975 : LogPrint(BCLog::LLMQ, "NetQuorum::DataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
495 : strMessage, std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
496 : nDataMask, data_mask, strMember, nTries);
497 10981 : };
498 2627 : printLog("Start");
499 :
500 2627 : while (!m_mn_sync.IsBlockchainSynced() && !quorumThreadInterrupt) {
501 0 : quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout));
502 : }
503 :
504 2627 : if (quorumThreadInterrupt) {
505 0 : printLog("Aborted");
506 0 : return;
507 : }
508 :
509 2627 : vecMemberHashes.reserve(pQuorum->qc->validMembers.size());
510 5562 : for (auto& member : pQuorum->members) {
511 2935 : if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != protx_hash) {
512 397 : vecMemberHashes.push_back(member->proTxHash);
513 397 : }
514 : }
515 2627 : std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
516 :
517 2627 : printLog("Try to request");
518 :
519 2868 : while (nDataMask > 0 && !quorumThreadInterrupt) {
520 5712 : if (nDataMask & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR &&
521 2856 : pQuorum->HasVerificationVector()) {
522 103 : nDataMask &= ~CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
523 103 : printLog("Received quorumVvec");
524 103 : }
525 :
526 5454 : if (nDataMask & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->GetSkShare().IsValid()) {
527 24 : nDataMask &= ~CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
528 24 : printLog("Received skShare");
529 24 : }
530 :
531 2856 : if (nDataMask == 0) {
532 103 : printLog("Success");
533 103 : break;
534 : }
535 :
536 2753 : if ((GetTime<std::chrono::seconds>().count() - nTimeLastSuccess) > nRequestTimeout) {
537 2735 : if (nTries >= vecMemberHashes.size()) {
538 2512 : printLog("All tried but failed");
539 2512 : break;
540 : }
541 223 : pCurrentMemberHash = &vecMemberHashes[(start_offset + nTries++) % vecMemberHashes.size()];
542 446 : if (m_qman.IsDataRequestPending(*pCurrentMemberHash, /*we_requested=*/true, pQuorum->qc->quorumHash,
543 223 : pQuorum->qc->llmqType)) {
544 0 : printLog("Already asked");
545 0 : continue;
546 : }
547 223 : quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(start_offset * 100));
548 223 : nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
549 223 : m_connman.AddPendingMasternode(*pCurrentMemberHash);
550 223 : printLog("Connect");
551 223 : }
552 :
553 1055 : m_connman.ForEachNode([&](CNode* pNode) {
554 814 : auto verifiedProRegTxHash = pNode->GetVerifiedProRegTxHash();
555 814 : if (pCurrentMemberHash == nullptr || verifiedProRegTxHash != *pCurrentMemberHash) {
556 683 : return;
557 : }
558 :
559 131 : switch (RequestQuorumData(*pNode, *pQuorum, nDataMask, protx_hash)) {
560 : case DataRequestStatus::Requested:
561 131 : nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
562 131 : printLog("Requested");
563 131 : return;
564 : case DataRequestStatus::NotFound:
565 0 : printLog("Failed");
566 0 : pNode->fDisconnect = true;
567 0 : pCurrentMemberHash = nullptr;
568 0 : return;
569 : case DataRequestStatus::Processed:
570 0 : printLog("Processed");
571 0 : pNode->fDisconnect = true;
572 0 : pCurrentMemberHash = nullptr;
573 0 : return;
574 : case DataRequestStatus::Pending:
575 0 : printLog("Waiting");
576 0 : return;
577 : }
578 814 : });
579 241 : quorumThreadInterrupt.sleep_for(std::chrono::seconds(1));
580 : }
581 2627 : pQuorum->ReleaseRecovery();
582 2627 : printLog("Done");
583 2627 : }
584 :
585 90 : void NetQuorum::StartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum) const
586 : {
587 90 : if (pQuorum->qc->validMembers.empty()) return;
588 90 : if (!pQuorum->TryClaimRecovery()) {
589 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
590 0 : return;
591 : }
592 :
593 180 : workerPool.push([pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable {
594 90 : DataRecoveryThread(block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR,
595 90 : /*protx_hash=*/uint256(), /*start_offset=*/0);
596 90 : });
597 90 : }
598 :
599 83234 : void NetQuorum::TryStartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
600 : bool fWeAreQuorumTypeMember) const
601 : {
602 83234 : if (pQuorum->IsRecoveryRunning()) return;
603 :
604 83125 : const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0;
605 83125 : const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid;
606 166175 : const bool fSyncCurrent = syncMode == QvvecSyncMode::Always ||
607 83050 : (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember);
608 :
609 83125 : if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) {
610 90 : StartVvecSyncThread(block_index, std::move(pQuorum));
611 90 : } else {
612 83035 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
613 : std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight);
614 : }
615 83234 : }
616 :
617 2575 : void NetQuorum::StartSkShareRecoveryThread(gsl::not_null<const CBlockIndex*> pIndex, CQuorumCPtr pQuorum,
618 : uint16_t nDataMaskIn) const
619 : {
620 2575 : if (pQuorum->qc->validMembers.empty()) return;
621 :
622 2575 : if (!pQuorum->TryClaimRecovery()) {
623 38 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
624 38 : return;
625 : }
626 :
627 5073 : workerPool.push([pQuorum = std::move(pQuorum), pIndex, nDataMaskIn, this](int threadId) mutable {
628 2536 : const size_t size_offset = GetQuorumRecoveryStartOffset(*pQuorum, pIndex);
629 2536 : DataRecoveryThread(pIndex, std::move(pQuorum), nDataMaskIn, m_role->GetProTxHash(), size_offset);
630 2538 : });
631 2575 : }
632 :
633 2536 : size_t NetQuorum::GetQuorumRecoveryStartOffset(const CQuorum& quorum,
634 : gsl::not_null<const CBlockIndex*> pIndex) const
635 : {
636 2536 : auto mns = m_dmnman.GetListForBlock(pIndex);
637 2536 : std::vector<uint256> vecProTxHashes;
638 2536 : vecProTxHashes.reserve(mns.GetCounts().enabled());
639 2537 : mns.ForEachMN(/*onlyValid=*/true,
640 7732 : [&](const auto& pMasternode) { vecProTxHashes.emplace_back(pMasternode.proTxHash); });
641 2537 : std::sort(vecProTxHashes.begin(), vecProTxHashes.end());
642 2535 : size_t nIndex{0};
643 : {
644 2535 : auto my_protx_hash = m_role->GetProTxHash();
645 3805 : for (const auto i : util::irange(vecProTxHashes.size())) {
646 : // cppcheck-suppress useStlAlgorithm
647 3803 : if (my_protx_hash == vecProTxHashes[i]) {
648 2537 : nIndex = i;
649 2537 : break;
650 : }
651 : }
652 : }
653 2544 : return nIndex % quorum.qc->validMembers.size();
654 2560 : }
655 :
656 79784 : void NetQuorum::StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const
657 : {
658 : // Note: this function is CPU heavy and we don't want it to be running during DKGs.
659 : // The largest dkgMiningWindowStart for a related quorum type is 42 (LLMQ_60_75).
660 : // At the same time most quorums use dkgInterval = 24 so the next DKG for them
661 : // (after block 576 + 42) will start at block 576 + 24 * 2. That's only a 6 blocks
662 : // window and it's better to have more room so we pick next cycle.
663 : // dkgMiningWindowStart for small quorums is 10 i.e. a safe block to start
664 : // these calculations is at height 576 + 24 * 2 + 10 = 576 + 58.
665 79784 : if (pIndex->nHeight % 576 != 58) {
666 79730 : return;
667 : }
668 :
669 54 : cxxtimer::Timer t(/*start=*/true);
670 54 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- start\n", __func__);
671 :
672 : // do not block the caller thread
673 108 : workerPool.push([pIndex, t, this](int threadId) {
674 54 : Uint256HashSet dbKeysToSkip;
675 :
676 108 : if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) {
677 54 : utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false);
678 54 : }
679 324 : for (const auto& params : Params().GetConsensus().llmqs) {
680 270 : if (quorumThreadInterrupt) {
681 0 : break;
682 : }
683 270 : LOCK(cs_cleanup);
684 270 : auto& cache = cleanupQuorumsCache[params.type];
685 270 : const CBlockIndex* pindex_loop{pIndex};
686 270 : Uint256HashSet quorum_keys;
687 19728 : while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) {
688 19458 : uint256 quorum_key;
689 19458 : if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) {
690 0 : quorum_keys.insert(quorum_key);
691 0 : if (quorum_keys.size() >= static_cast<size_t>(params.keepOldKeys)) break; // extra safety belt
692 0 : }
693 19458 : pindex_loop = pindex_loop->pprev;
694 : }
695 346 : for (const auto& pQuorum : m_qman.ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) {
696 76 : const uint256 quorum_key = MakeQuorumKey(*pQuorum);
697 76 : quorum_keys.insert(quorum_key);
698 76 : cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key);
699 : }
700 270 : dbKeysToSkip.merge(quorum_keys);
701 270 : }
702 :
703 54 : if (!quorumThreadInterrupt) {
704 54 : m_qman.CleanupOldQuorumData(dbKeysToSkip);
705 54 : }
706 :
707 54 : LogPrint(BCLog::LLMQ, "NetQuorum::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count());
708 54 : });
709 79784 : }
710 :
711 236262 : bool EnsureQuorumConnections(const Consensus::LLMQParams& llmqParams, CConnman& connman, const CSporkManager& sporkman,
712 : const UtilParameters& util_params, const CDeterministicMNList& tip_mn_list,
713 : const uint256& myProTxHash, bool is_masternode, bool quorums_watch)
714 : {
715 236262 : if (!is_masternode && !quorums_watch) {
716 0 : return false;
717 : }
718 :
719 236262 : auto members = utils::GetAllQuorumMembers(llmqParams.type, util_params);
720 236262 : if (members.empty()) {
721 0 : return false;
722 : }
723 :
724 898017 : bool isMember = std::ranges::find_if(members, [&](const auto& dmn) { return dmn->proTxHash == myProTxHash; }) !=
725 236262 : members.end();
726 :
727 236262 : if (!isMember && !quorums_watch) {
728 92321 : return false;
729 : }
730 :
731 143941 : LogPrint(BCLog::NET_NETCONN, "%s -- isMember=%d for quorum %s:\n", __func__, isMember,
732 : util_params.m_base_index->GetBlockHash().ToString());
733 :
734 143941 : Uint256HashSet connections;
735 143941 : Uint256HashSet relayMembers;
736 143941 : if (isMember) {
737 143935 : connections = utils::GetQuorumConnections(llmqParams, sporkman, util_params, myProTxHash, /*onlyOutbound=*/true);
738 : // If all-members-connected is enabled for this quorum type, leverage the full-mesh
739 : // connections for low-latency recovered sig propagation by treating all members as
740 : // relay members (instead of the ring-based subset). This ensures peers will send
741 : // QSENDRECSIGS to each other across the full mesh and set m_wants_recsigs widely.
742 143935 : if (IsAllMembersConnectedEnabled(llmqParams.type, sporkman)) {
743 282916 : for (const auto& dmn : members) {
744 229170 : if (dmn->proTxHash != myProTxHash) {
745 175423 : relayMembers.emplace(dmn->proTxHash);
746 175423 : }
747 : }
748 53746 : } else {
749 90189 : relayMembers = utils::GetQuorumRelayMembers(llmqParams, util_params, myProTxHash, true);
750 : }
751 143935 : } else {
752 6 : auto cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type, util_params.m_base_index, members.size(), 1);
753 12 : for (auto idx : cindexes) {
754 6 : connections.emplace(members[idx]->proTxHash);
755 : }
756 6 : relayMembers = connections;
757 6 : }
758 143941 : if (!connections.empty()) {
759 137909 : if (!connman.HasMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash()) &&
760 3591 : LogAcceptDebug(BCLog::LLMQ)) {
761 3591 : std::string debugMsg = strprintf("%s -- adding masternodes quorum connections for quorum %s:\n", __func__,
762 3591 : util_params.m_base_index->GetBlockHash().ToString());
763 9980 : for (const auto& c : connections) {
764 6389 : auto dmn = tip_mn_list.GetValidMN(c);
765 6389 : if (!dmn) {
766 30 : debugMsg += strprintf(" %s (not in valid MN set anymore)\n", c.ToString());
767 30 : } else {
768 12718 : debugMsg += strprintf(" %s (%s)\n", c.ToString(),
769 6359 : dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
770 : }
771 6389 : }
772 3591 : LogPrint(BCLog::NET_NETCONN, debugMsg.c_str()); /* Continued */
773 3591 : }
774 134318 : connman.SetMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash(), connections);
775 134318 : }
776 143941 : if (!relayMembers.empty()) {
777 141433 : connman.SetMasternodeQuorumRelayMembers(llmqParams.type, util_params.m_base_index->GetBlockHash(), relayMembers);
778 141433 : }
779 143941 : return true;
780 236262 : }
781 :
782 : } // namespace llmq
|