Line data Source code
1 : // Copyright (c) 2025-2026 The Dash Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <llmq/net_quorum.h>
6 :
7 : #include <active/masternode.h>
8 : #include <chainparams.h>
9 : #include <evo/deterministicmns.h>
10 : #include <llmq/commitment.h>
11 : #include <llmq/options.h>
12 : #include <llmq/quorumsman.h>
13 : #include <llmq/utils.h>
14 : #include <logging.h>
15 : #include <masternode/sync.h>
16 : #include <net.h>
17 : #include <netmessagemaker.h>
18 : #include <util/helpers.h>
19 : #include <util/std23.h>
20 : #include <util/thread.h>
21 : #include <util/time.h>
22 : #include <validation.h>
23 :
24 : #include <cxxtimer.hpp>
25 :
26 : #include <algorithm>
27 : #include <ranges>
28 :
29 : namespace llmq {
30 :
31 0 : NetQuorum::NetQuorum(PeerManagerInternal* peer_manager, CBLSWorker& bls_worker,
32 : CConnman& connman, CDeterministicMNManager& dmnman, CQuorumManager& qman,
33 : CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman,
34 : const CMasternodeSync& mn_sync, const CSporkManager& sporkman,
35 : QuorumRole* quorum_role, CActiveMasternodeManager* nodeman,
36 : int16_t worker_count, const QvvecSyncModeMap& sync_map, bool quorums_recovery) :
37 0 : NetHandler(peer_manager),
38 0 : m_bls_worker{bls_worker},
39 0 : m_connman{connman},
40 0 : m_dmnman{dmnman},
41 0 : m_qman{qman},
42 0 : m_qsnapman{qsnapman},
43 0 : m_chainman{chainman},
44 0 : m_mn_sync{mn_sync},
45 0 : m_sporkman{sporkman},
46 0 : m_role{quorum_role},
47 0 : m_nodeman{nodeman},
48 0 : m_worker_count{worker_count},
49 0 : m_sync_map{sync_map},
50 0 : m_quorums_recovery{quorums_recovery}
51 0 : {
52 0 : quorumThreadInterrupt.reset();
53 0 : }
54 :
55 : // NetHandler
56 :
57 0 : void NetQuorum::Start()
58 : {
59 0 : if (!m_role) return;
60 0 : assert(m_worker_count > 0);
61 0 : workerPool.resize(m_worker_count);
62 0 : RenameThreadPool(workerPool, "q-mngr");
63 0 : }
64 :
65 0 : void NetQuorum::Stop()
66 : {
67 0 : workerPool.clear_queue();
68 0 : workerPool.stop(true);
69 0 : }
70 :
71 0 : void NetQuorum::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
72 : {
73 0 : if (msg_type == NetMsgType::QGETDATA) {
74 0 : if (!m_role || !m_role->IsMasternode() || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) {
75 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode or a qwatch connection");
76 0 : return;
77 : }
78 :
79 0 : CQuorumDataRequest request;
80 0 : vRecv >> request;
81 :
82 0 : auto sendQDATA = [&](CQuorumDataRequest::Errors nError,
83 : bool request_limit_exceeded,
84 : const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) -> bool {
85 0 : bool misbehave = false;
86 0 : switch (nError) {
87 : case (CQuorumDataRequest::Errors::NONE):
88 : case (CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID):
89 : case (CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND):
90 : case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND):
91 : case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER):
92 : case (CQuorumDataRequest::Errors::UNDEFINED):
93 0 : misbehave = request_limit_exceeded;
94 0 : break;
95 : case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING):
96 : case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING):
97 : // Do not punish limit exceed if we don't have the requested data
98 0 : break;
99 : }
100 0 : request.SetError(nError);
101 0 : CDataStream ssResponse{SER_NETWORK, pfrom.GetCommonVersion()};
102 0 : ssResponse << request << body;
103 0 : m_connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse));
104 0 : return misbehave;
105 0 : };
106 :
107 0 : const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), false, request.GetQuorumHash(), request.GetLLMQType());
108 0 : const bool request_limit_exceeded = !m_qman.RegisterDataRequest(key, request, /*add_expiry_bias=*/false);
109 :
110 0 : if (!Params().GetLLMQ(request.GetLLMQType()).has_value()) {
111 0 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID, request_limit_exceeded)) {
112 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
113 0 : }
114 0 : return;
115 : }
116 :
117 0 : const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, return m_chainman.m_blockman.LookupBlockIndex(request.GetQuorumHash()));
118 0 : if (pQuorumBaseBlockIndex == nullptr) {
119 0 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND, request_limit_exceeded)) {
120 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
121 0 : }
122 0 : return;
123 : }
124 :
125 0 : const auto pQuorum = m_qman.GetQuorum(request.GetLLMQType(), request.GetQuorumHash());
126 0 : if (pQuorum == nullptr) {
127 0 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded)) {
128 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
129 0 : }
130 0 : return;
131 : }
132 :
133 0 : CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion());
134 :
135 : // Check if request wants QUORUM_VERIFICATION_VECTOR data
136 0 : if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
137 0 : if (!pQuorum->HasVerificationVector()) {
138 0 : if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded)) {
139 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
140 0 : }
141 0 : return;
142 : }
143 0 : ssResponseData << *pQuorum->GetVerificationVector();
144 0 : }
145 :
146 : // Check if request wants ENCRYPTED_CONTRIBUTIONS data
147 0 : bool misbehave_contrib = ProcessContribQGETDATA(ssResponseData, *pQuorum, request, pQuorumBaseBlockIndex);
148 :
149 0 : CQuorumDataRequest::Errors ret_err{CQuorumDataRequest::Errors::NONE};
150 0 : if (auto request_err = request.GetError(); request_err != CQuorumDataRequest::Errors::NONE &&
151 0 : request_err != CQuorumDataRequest::Errors::UNDEFINED) {
152 0 : ret_err = request_err;
153 0 : }
154 :
155 0 : bool misbehave_qdata = (ret_err != CQuorumDataRequest::Errors::NONE)
156 0 : ? sendQDATA(ret_err, request_limit_exceeded)
157 0 : : sendQDATA(CQuorumDataRequest::Errors::NONE, request_limit_exceeded, ssResponseData);
158 :
159 0 : if (request_limit_exceeded && (misbehave_contrib || misbehave_qdata)) {
160 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded");
161 0 : }
162 0 : return;
163 0 : }
164 :
165 0 : if (msg_type == NetMsgType::QDATA) {
166 0 : if (!m_role || pfrom.GetVerifiedProRegTxHash().IsNull()) {
167 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode and -watchquorums is not enabled");
168 0 : return;
169 : }
170 :
171 0 : CQuorumDataRequest request;
172 0 : vRecv >> request;
173 :
174 : {
175 0 : const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType());
176 0 : const auto validation = m_qman.ValidateDataResponse(key, request);
177 0 : switch (validation) {
178 : case CQuorumManager::DataResponseValidation::NotRequested:
179 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not requested");
180 0 : return;
181 : case CQuorumManager::DataResponseValidation::AlreadyReceived:
182 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "already received");
183 0 : return;
184 : case CQuorumManager::DataResponseValidation::Mismatch:
185 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not like requested");
186 0 : return;
187 : case CQuorumManager::DataResponseValidation::OK:
188 0 : break;
189 : }
190 : }
191 :
192 0 : if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
193 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Error %d (%s), from peer=%d\n", __func__, msg_type, request.GetError(), request.GetErrorString(), pfrom.GetId());
194 0 : return;
195 : }
196 :
197 0 : CQuorumPtr pQuorum = m_qman.GetCachedMutableQuorum(request.GetLLMQType(), request.GetQuorumHash());
198 0 : if (!pQuorum) {
199 : // Don't bump score because we asked for it
200 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId());
201 0 : return;
202 : }
203 :
204 : // Check if request has QUORUM_VERIFICATION_VECTOR data
205 0 : if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
206 0 : std::vector<CBLSPublicKey> verificationVector;
207 0 : vRecv >> verificationVector;
208 :
209 0 : if (pQuorum->SetVerificationVector(verificationVector)) {
210 0 : m_qman.QueueQuorumForWarming(pQuorum);
211 0 : } else {
212 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid quorum verification vector");
213 0 : return;
214 : }
215 0 : }
216 :
217 : // Check if request has ENCRYPTED_CONTRIBUTIONS data
218 0 : if (!ProcessContribQDATA(pfrom, vRecv, *pQuorum, request)) {
219 0 : return;
220 : }
221 :
222 0 : m_qman.WriteContributions(pQuorum);
223 0 : }
224 0 : }
225 :
226 0 : bool NetQuorum::ProcessContribQGETDATA(CDataStream& ssResponseData, const CQuorum& quorum,
227 : CQuorumDataRequest& request,
228 : gsl::not_null<const CBlockIndex*> block_index) const
229 : {
230 0 : if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
231 0 : return false;
232 : }
233 :
234 0 : if (!m_nodeman) {
235 0 : request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
236 0 : return true;
237 : }
238 :
239 0 : int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
240 0 : if (memberIdx == -1) {
241 0 : request.SetError(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER);
242 0 : return true;
243 : }
244 :
245 0 : std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
246 0 : if (!m_qman.GetEncryptedContributions(request.GetLLMQType(), block_index,
247 0 : quorum.qc->validMembers, request.GetProTxHash(), vecEncrypted)) {
248 0 : request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
249 0 : return true;
250 : }
251 :
252 0 : ssResponseData << vecEncrypted;
253 0 : return false;
254 0 : }
255 :
256 0 : bool NetQuorum::ProcessContribQDATA(CNode& pfrom, CDataStream& vRecv,
257 : CQuorum& quorum, CQuorumDataRequest& request)
258 : {
259 0 : if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) {
260 0 : return true;
261 : }
262 :
263 0 : if (!m_nodeman) {
264 0 : return true;
265 : }
266 :
267 0 : auto vvec = quorum.GetVerificationVector();
268 0 : if (!vvec || vvec->size() != size_t(quorum.params.threshold)) {
269 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: No valid quorum verification vector available, from peer=%d\n",
270 : __func__, NetMsgType::QDATA, pfrom.GetId());
271 0 : return false;
272 : }
273 :
274 0 : int memberIdx = quorum.GetMemberIndex(request.GetProTxHash());
275 0 : if (memberIdx == -1) {
276 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Not a member of the quorum, from peer=%d\n",
277 : __func__, NetMsgType::QDATA, pfrom.GetId());
278 0 : return false;
279 : }
280 :
281 0 : std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
282 0 : vRecv >> vecEncrypted;
283 :
284 0 : std::vector<CBLSSecretKey> vecSecretKeys;
285 0 : vecSecretKeys.resize(vecEncrypted.size());
286 0 : for (const auto i : util::irange(vecEncrypted.size())) {
287 0 : if (!m_nodeman->Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) {
288 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "failed to decrypt");
289 0 : return false;
290 : }
291 : }
292 :
293 0 : if (!quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(vecSecretKeys),
294 0 : m_nodeman->GetProTxHash())) {
295 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid secret key share received");
296 0 : return false;
297 : }
298 :
299 0 : return true;
300 0 : }
301 :
302 0 : DataRequestStatus NetQuorum::RequestQuorumData(CNode& peer, const CQuorum& quorum, uint16_t nDataMask,
303 : const uint256& proTxHash) const
304 : {
305 0 : const CQuorumDataRequestKey key(peer.GetVerifiedProRegTxHash(), true,
306 0 : quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
307 0 : const CQuorumDataRequest request(quorum.qc->llmqType, quorum.m_quorum_base_block_index->GetBlockHash(),
308 0 : nDataMask, proTxHash);
309 0 : if (!m_qman.RegisterDataRequest(key, request)) {
310 0 : return m_qman.GetDataRequestStatus(peer.GetVerifiedProRegTxHash(), /*we_requested=*/true,
311 0 : quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType);
312 : }
313 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__,
314 : key.quorumHash.ToString(), std23::to_underlying(key.llmqType), key.proRegTx.ToString());
315 :
316 0 : CNetMsgMaker msgMaker(peer.GetCommonVersion());
317 0 : m_connman.PushMessage(&peer, msgMaker.Make(NetMsgType::QGETDATA, request));
318 0 : return DataRequestStatus::Requested;
319 0 : }
320 :
321 :
322 0 : void NetQuorum::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd)
323 : {
324 0 : if (!m_role) return;
325 0 : UpdatedBlockTip(tip, nullptr, ibd);
326 0 : if (tip) {
327 0 : for (const auto& params : Params().GetConsensus().llmqs) {
328 0 : CheckQuorumConnections(params, tip);
329 : }
330 0 : }
331 0 : }
332 :
333 : // CValidationInterface
334 :
335 0 : void NetQuorum::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload)
336 : {
337 0 : if (!m_role) return;
338 0 : if (!pindexNew) return;
339 0 : if (fInitialDownload || pindexNew == pindexFork) return;
340 0 : if (!m_mn_sync.IsBlockchainSynced()) return;
341 :
342 0 : for (const auto& params : Params().GetConsensus().llmqs) {
343 0 : CheckQuorumConnections(params, pindexNew);
344 : }
345 :
346 0 : m_qman.CleanupExpiredDataRequests();
347 0 : TriggerQuorumDataRecoveryThreads(pindexNew);
348 0 : StartCleanupOldQuorumDataThread(pindexNew);
349 0 : }
350 :
351 : // Private helpers
352 :
353 0 : Uint256HashSet NetQuorum::GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams,
354 : gsl::not_null<const CBlockIndex*> pindexNew) const
355 : {
356 0 : auto connmanQuorumsToDelete = m_connman.GetMasternodeQuorums(llmqParams.type);
357 :
358 0 : if (IsQuorumRotationEnabled(llmqParams, pindexNew)) {
359 0 : int cycleIndexTipHeight = pindexNew->nHeight % llmqParams.dkgInterval;
360 0 : int cycleQuorumBaseHeight = pindexNew->nHeight - cycleIndexTipHeight;
361 0 : std::stringstream ss;
362 0 : for (const auto quorumIndex : util::irange(llmqParams.signingActiveQuorumCount)) {
363 0 : if (quorumIndex <= cycleIndexTipHeight) {
364 0 : int curDkgHeight = cycleQuorumBaseHeight + quorumIndex;
365 0 : auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
366 0 : ss << curDkgHeight << ":" << curDkgBlock.ToString() << " | ";
367 0 : connmanQuorumsToDelete.erase(curDkgBlock);
368 0 : }
369 : }
370 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for rotated quorums: [%s]\n",
371 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, ss.str());
372 0 : } else {
373 0 : int curDkgHeight = pindexNew->nHeight - (pindexNew->nHeight % llmqParams.dkgInterval);
374 0 : auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash();
375 0 : connmanQuorumsToDelete.erase(curDkgBlock);
376 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
377 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, curDkgHeight, curDkgBlock.ToString());
378 : }
379 :
380 0 : return connmanQuorumsToDelete;
381 0 : }
382 :
383 0 : void NetQuorum::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams,
384 : gsl::not_null<const CBlockIndex*> pindexNew) const
385 : {
386 0 : const bool is_masternode = m_role->IsMasternode();
387 0 : const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
388 :
389 0 : auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections);
390 0 : auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew);
391 :
392 0 : const bool watchOtherISQuorums = is_masternode &&
393 0 : llmqParams.type == Params().GetConsensus().llmqTypeDIP0024InstantSend &&
394 0 : std::ranges::any_of(lastQuorums, [&proTxHash](const auto& old_quorum) { return old_quorum->IsMember(proTxHash); });
395 :
396 0 : for (const auto& quorum : lastQuorums) {
397 0 : if (EnsureQuorumConnections(llmqParams, m_connman, m_sporkman,
398 0 : {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index},
399 0 : m_dmnman.GetListAtChainTip(), proTxHash,
400 0 : /*is_masternode=*/is_masternode,
401 0 : /*quorums_watch=*/is_masternode ? m_role->IsWatching() : true)) {
402 0 : if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
403 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n",
404 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
405 : quorum->m_quorum_base_block_index->nHeight,
406 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
407 0 : }
408 0 : } else if (watchOtherISQuorums && !quorum->IsMember(proTxHash)) {
409 0 : Uint256HashSet connections;
410 0 : const auto& cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type,
411 0 : quorum->m_quorum_base_block_index,
412 0 : quorum->members.size(), 1);
413 0 : for (auto idx : cindexes) {
414 0 : connections.emplace(quorum->members[idx]->proTxHash);
415 : }
416 0 : if (!connections.empty()) {
417 0 : if (!m_connman.HasMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash())) {
418 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] adding mn inter-quorum connections for quorum: [%d:%s]\n",
419 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
420 : quorum->m_quorum_base_block_index->nHeight,
421 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
422 0 : m_connman.SetMasternodeQuorumNodes(llmqParams.type,
423 0 : quorum->m_quorum_base_block_index->GetBlockHash(), connections);
424 0 : m_connman.SetMasternodeQuorumRelayMembers(llmqParams.type,
425 0 : quorum->m_quorum_base_block_index->GetBlockHash(), connections);
426 0 : }
427 0 : if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) {
428 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn inter-quorum connections for quorum: [%d:%s]\n",
429 : __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight,
430 : quorum->m_quorum_base_block_index->nHeight,
431 : quorum->m_quorum_base_block_index->GetBlockHash().ToString());
432 0 : }
433 0 : }
434 0 : }
435 : }
436 :
437 0 : for (const auto& quorumHash : deletableQuorums) {
438 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- removing masternodes quorum connections for quorum %s:\n",
439 : __func__, quorumHash.ToString());
440 0 : m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash);
441 : }
442 0 : }
443 :
444 0 : void NetQuorum::TriggerQuorumDataRecoveryThreads(gsl::not_null<const CBlockIndex*> block_index) const
445 : {
446 0 : if (!m_quorums_recovery) return;
447 :
448 0 : const bool is_masternode = m_role->IsMasternode();
449 0 : const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{};
450 :
451 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Process block %s as protx_hash=%s\n", __func__, block_index->GetBlockHash().ToString(), proTxHash.ToString());
452 :
453 0 : for (const auto& params : Params().GetConsensus().llmqs) {
454 0 : auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections);
455 0 : const bool fWeAreQuorumTypeMember = is_masternode && std::ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) {
456 0 : return pQuorum->IsValidMember(proTxHash);
457 : });
458 :
459 0 : for (auto& pQuorum : vecQuorums) {
460 0 : if (is_masternode && pQuorum->IsValidMember(proTxHash)) {
461 0 : uint16_t nDataMask{0};
462 0 : if (!pQuorum->HasVerificationVector()) {
463 0 : nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
464 0 : }
465 0 : if (!pQuorum->GetSkShare().IsValid()) {
466 0 : nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
467 0 : }
468 0 : if (nDataMask != 0) {
469 0 : StartSkShareRecoveryThread(block_index, std::move(pQuorum), nDataMask);
470 0 : } else {
471 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
472 : std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
473 : block_index->nHeight);
474 : }
475 0 : } else {
476 0 : TryStartVvecSyncThread(block_index, std::move(pQuorum), fWeAreQuorumTypeMember);
477 : }
478 : }
479 0 : }
480 0 : }
481 :
482 0 : void NetQuorum::DataRecoveryThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
483 : uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const
484 : {
485 0 : size_t nTries{0};
486 0 : uint16_t nDataMask{data_mask};
487 0 : int64_t nTimeLastSuccess{0};
488 0 : uint256* pCurrentMemberHash{nullptr};
489 0 : std::vector<uint256> vecMemberHashes;
490 0 : const int64_t nRequestTimeout{10};
491 :
492 0 : auto printLog = [&](const std::string& strMessage) {
493 0 : const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
494 0 : LogPrint(BCLog::LLMQ, "NetQuorum::DataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
495 : strMessage, std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(),
496 : nDataMask, data_mask, strMember, nTries);
497 0 : };
498 0 : printLog("Start");
499 :
500 0 : while (!m_mn_sync.IsBlockchainSynced() && !quorumThreadInterrupt) {
501 0 : quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout));
502 : }
503 :
504 0 : if (quorumThreadInterrupt) {
505 0 : printLog("Aborted");
506 0 : return;
507 : }
508 :
509 0 : vecMemberHashes.reserve(pQuorum->qc->validMembers.size());
510 0 : for (auto& member : pQuorum->members) {
511 0 : if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != protx_hash) {
512 0 : vecMemberHashes.push_back(member->proTxHash);
513 0 : }
514 : }
515 0 : std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
516 :
517 0 : printLog("Try to request");
518 :
519 0 : while (nDataMask > 0 && !quorumThreadInterrupt) {
520 0 : if (nDataMask & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR &&
521 0 : pQuorum->HasVerificationVector()) {
522 0 : nDataMask &= ~CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
523 0 : printLog("Received quorumVvec");
524 0 : }
525 :
526 0 : if (nDataMask & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->GetSkShare().IsValid()) {
527 0 : nDataMask &= ~CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
528 0 : printLog("Received skShare");
529 0 : }
530 :
531 0 : if (nDataMask == 0) {
532 0 : printLog("Success");
533 0 : break;
534 : }
535 :
536 0 : if ((GetTime<std::chrono::seconds>().count() - nTimeLastSuccess) > nRequestTimeout) {
537 0 : if (nTries >= vecMemberHashes.size()) {
538 0 : printLog("All tried but failed");
539 0 : break;
540 : }
541 0 : pCurrentMemberHash = &vecMemberHashes[(start_offset + nTries++) % vecMemberHashes.size()];
542 0 : if (m_qman.IsDataRequestPending(*pCurrentMemberHash, /*we_requested=*/true, pQuorum->qc->quorumHash,
543 0 : pQuorum->qc->llmqType)) {
544 0 : printLog("Already asked");
545 0 : continue;
546 : }
547 0 : quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(start_offset * 100));
548 0 : nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
549 0 : m_connman.AddPendingMasternode(*pCurrentMemberHash);
550 0 : printLog("Connect");
551 0 : }
552 :
553 0 : m_connman.ForEachNode([&](CNode* pNode) {
554 0 : auto verifiedProRegTxHash = pNode->GetVerifiedProRegTxHash();
555 0 : if (pCurrentMemberHash == nullptr || verifiedProRegTxHash != *pCurrentMemberHash) {
556 0 : return;
557 : }
558 :
559 0 : switch (RequestQuorumData(*pNode, *pQuorum, nDataMask, protx_hash)) {
560 : case DataRequestStatus::Requested:
561 0 : nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
562 0 : printLog("Requested");
563 0 : return;
564 : case DataRequestStatus::NotFound:
565 0 : printLog("Failed");
566 0 : pNode->fDisconnect = true;
567 0 : pCurrentMemberHash = nullptr;
568 0 : return;
569 : case DataRequestStatus::Processed:
570 0 : printLog("Processed");
571 0 : pNode->fDisconnect = true;
572 0 : pCurrentMemberHash = nullptr;
573 0 : return;
574 : case DataRequestStatus::Pending:
575 0 : printLog("Waiting");
576 0 : return;
577 : }
578 0 : });
579 0 : quorumThreadInterrupt.sleep_for(std::chrono::seconds(1));
580 : }
581 0 : pQuorum->ReleaseRecovery();
582 0 : printLog("Done");
583 0 : }
584 :
585 0 : void NetQuorum::StartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum) const
586 : {
587 0 : if (pQuorum->qc->validMembers.empty()) return;
588 0 : if (!pQuorum->TryClaimRecovery()) {
589 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
590 0 : return;
591 : }
592 :
593 0 : workerPool.push([pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable {
594 0 : DataRecoveryThread(block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR,
595 0 : /*protx_hash=*/uint256(), /*start_offset=*/0);
596 0 : });
597 0 : }
598 :
599 0 : void NetQuorum::TryStartVvecSyncThread(gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
600 : bool fWeAreQuorumTypeMember) const
601 : {
602 0 : if (pQuorum->IsRecoveryRunning()) return;
603 :
604 0 : const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0;
605 0 : const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid;
606 0 : const bool fSyncCurrent = syncMode == QvvecSyncMode::Always ||
607 0 : (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember);
608 :
609 0 : if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) {
610 0 : StartVvecSyncThread(block_index, std::move(pQuorum));
611 0 : } else {
612 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__,
613 : std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight);
614 : }
615 0 : }
616 :
617 0 : void NetQuorum::StartSkShareRecoveryThread(gsl::not_null<const CBlockIndex*> pIndex, CQuorumCPtr pQuorum,
618 : uint16_t nDataMaskIn) const
619 : {
620 0 : if (pQuorum->qc->validMembers.empty()) return;
621 :
622 0 : if (!pQuorum->TryClaimRecovery()) {
623 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__);
624 0 : return;
625 : }
626 :
627 0 : workerPool.push([pQuorum = std::move(pQuorum), pIndex, nDataMaskIn, this](int threadId) mutable {
628 0 : const size_t size_offset = GetQuorumRecoveryStartOffset(*pQuorum, pIndex);
629 0 : DataRecoveryThread(pIndex, std::move(pQuorum), nDataMaskIn, m_role->GetProTxHash(), size_offset);
630 0 : });
631 0 : }
632 :
633 0 : size_t NetQuorum::GetQuorumRecoveryStartOffset(const CQuorum& quorum,
634 : gsl::not_null<const CBlockIndex*> pIndex) const
635 : {
636 0 : auto mns = m_dmnman.GetListForBlock(pIndex);
637 0 : std::vector<uint256> vecProTxHashes;
638 0 : vecProTxHashes.reserve(mns.GetCounts().enabled());
639 0 : mns.ForEachMN(/*onlyValid=*/true,
640 0 : [&](const auto& pMasternode) { vecProTxHashes.emplace_back(pMasternode.proTxHash); });
641 0 : std::sort(vecProTxHashes.begin(), vecProTxHashes.end());
642 0 : size_t nIndex{0};
643 : {
644 0 : auto my_protx_hash = m_role->GetProTxHash();
645 0 : for (const auto i : util::irange(vecProTxHashes.size())) {
646 : // cppcheck-suppress useStlAlgorithm
647 0 : if (my_protx_hash == vecProTxHashes[i]) {
648 0 : nIndex = i;
649 0 : break;
650 : }
651 : }
652 : }
653 0 : return nIndex % quorum.qc->validMembers.size();
654 0 : }
655 :
656 0 : void NetQuorum::StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const
657 : {
658 : // Note: this function is CPU heavy and we don't want it to be running during DKGs.
659 : // The largest dkgMiningWindowStart for a related quorum type is 42 (LLMQ_60_75).
660 : // At the same time most quorums use dkgInterval = 24 so the next DKG for them
661 : // (after block 576 + 42) will start at block 576 + 24 * 2. That's only a 6 blocks
662 : // window and it's better to have more room so we pick next cycle.
663 : // dkgMiningWindowStart for small quorums is 10 i.e. a safe block to start
664 : // these calculations is at height 576 + 24 * 2 + 10 = 576 + 58.
665 0 : if (pIndex->nHeight % 576 != 58) {
666 0 : return;
667 : }
668 :
669 0 : cxxtimer::Timer t(/*start=*/true);
670 0 : LogPrint(BCLog::LLMQ, "NetQuorum::%s -- start\n", __func__);
671 :
672 : // do not block the caller thread
673 0 : workerPool.push([pIndex, t, this](int threadId) {
674 0 : Uint256HashSet dbKeysToSkip;
675 :
676 0 : if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) {
677 0 : utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false);
678 0 : }
679 0 : for (const auto& params : Params().GetConsensus().llmqs) {
680 0 : if (quorumThreadInterrupt) {
681 0 : break;
682 : }
683 0 : LOCK(cs_cleanup);
684 0 : auto& cache = cleanupQuorumsCache[params.type];
685 0 : const CBlockIndex* pindex_loop{pIndex};
686 0 : Uint256HashSet quorum_keys;
687 0 : while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) {
688 0 : uint256 quorum_key;
689 0 : if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) {
690 0 : quorum_keys.insert(quorum_key);
691 0 : if (quorum_keys.size() >= static_cast<size_t>(params.keepOldKeys)) break; // extra safety belt
692 0 : }
693 0 : pindex_loop = pindex_loop->pprev;
694 : }
695 0 : for (const auto& pQuorum : m_qman.ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) {
696 0 : const uint256 quorum_key = MakeQuorumKey(*pQuorum);
697 0 : quorum_keys.insert(quorum_key);
698 0 : cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key);
699 : }
700 0 : dbKeysToSkip.merge(quorum_keys);
701 0 : }
702 :
703 0 : if (!quorumThreadInterrupt) {
704 0 : m_qman.CleanupOldQuorumData(dbKeysToSkip);
705 0 : }
706 :
707 0 : LogPrint(BCLog::LLMQ, "NetQuorum::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count());
708 0 : });
709 0 : }
710 :
711 0 : bool EnsureQuorumConnections(const Consensus::LLMQParams& llmqParams, CConnman& connman, const CSporkManager& sporkman,
712 : const UtilParameters& util_params, const CDeterministicMNList& tip_mn_list,
713 : const uint256& myProTxHash, bool is_masternode, bool quorums_watch)
714 : {
715 0 : if (!is_masternode && !quorums_watch) {
716 0 : return false;
717 : }
718 :
719 0 : auto members = utils::GetAllQuorumMembers(llmqParams.type, util_params);
720 0 : if (members.empty()) {
721 0 : return false;
722 : }
723 :
724 0 : bool isMember = std::ranges::find_if(members, [&](const auto& dmn) { return dmn->proTxHash == myProTxHash; }) !=
725 0 : members.end();
726 :
727 0 : if (!isMember && !quorums_watch) {
728 0 : return false;
729 : }
730 :
731 0 : LogPrint(BCLog::NET_NETCONN, "%s -- isMember=%d for quorum %s:\n", __func__, isMember,
732 : util_params.m_base_index->GetBlockHash().ToString());
733 :
734 0 : Uint256HashSet connections;
735 0 : Uint256HashSet relayMembers;
736 0 : if (isMember) {
737 0 : connections = utils::GetQuorumConnections(llmqParams, sporkman, util_params, myProTxHash, /*onlyOutbound=*/true);
738 : // If all-members-connected is enabled for this quorum type, leverage the full-mesh
739 : // connections for low-latency recovered sig propagation by treating all members as
740 : // relay members (instead of the ring-based subset). This ensures peers will send
741 : // QSENDRECSIGS to each other across the full mesh and set m_wants_recsigs widely.
742 0 : if (IsAllMembersConnectedEnabled(llmqParams.type, sporkman)) {
743 0 : for (const auto& dmn : members) {
744 0 : if (dmn->proTxHash != myProTxHash) {
745 0 : relayMembers.emplace(dmn->proTxHash);
746 0 : }
747 : }
748 0 : } else {
749 0 : relayMembers = utils::GetQuorumRelayMembers(llmqParams, util_params, myProTxHash, true);
750 : }
751 0 : } else {
752 0 : auto cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type, util_params.m_base_index, members.size(), 1);
753 0 : for (auto idx : cindexes) {
754 0 : connections.emplace(members[idx]->proTxHash);
755 : }
756 0 : relayMembers = connections;
757 0 : }
758 0 : if (!connections.empty()) {
759 0 : if (!connman.HasMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash()) &&
760 0 : LogAcceptDebug(BCLog::LLMQ)) {
761 0 : std::string debugMsg = strprintf("%s -- adding masternodes quorum connections for quorum %s:\n", __func__,
762 0 : util_params.m_base_index->GetBlockHash().ToString());
763 0 : for (const auto& c : connections) {
764 0 : auto dmn = tip_mn_list.GetValidMN(c);
765 0 : if (!dmn) {
766 0 : debugMsg += strprintf(" %s (not in valid MN set anymore)\n", c.ToString());
767 0 : } else {
768 0 : debugMsg += strprintf(" %s (%s)\n", c.ToString(),
769 0 : dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
770 : }
771 0 : }
772 0 : LogPrint(BCLog::NET_NETCONN, debugMsg.c_str()); /* Continued */
773 0 : }
774 0 : connman.SetMasternodeQuorumNodes(llmqParams.type, util_params.m_base_index->GetBlockHash(), connections);
775 0 : }
776 0 : if (!relayMembers.empty()) {
777 0 : connman.SetMasternodeQuorumRelayMembers(llmqParams.type, util_params.m_base_index->GetBlockHash(), relayMembers);
778 0 : }
779 0 : return true;
780 0 : }
781 :
782 : } // namespace llmq
|