Line data Source code
1 : // Copyright (c) 2025 The Dash Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <llmq/net_signing.h>
6 :
7 : #include <bls/bls_batchverifier.h>
8 : #include <llmq/quorums.h>
9 : #include <llmq/signhash.h>
10 : #include <llmq/signing_shares.h>
11 : #include <llmq/signing.h>
12 : #include <spork.h>
13 : #include <util/std23.h>
14 :
15 : #include <logging.h>
16 : #include <streams.h>
17 : #include <util/thread.h>
18 : #include <validationinterface.h>
19 :
20 : #include <cxxtimer.hpp>
21 :
22 : #include <algorithm>
23 : #include <ranges>
24 : #include <unordered_map>
25 :
26 : namespace llmq {
27 96785 : void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
28 : {
29 96785 : if (msg_type == NetMsgType::QSIGREC) {
30 33556 : auto recoveredSig = std::make_shared<CRecoveredSig>();
31 33556 : vRecv >> *recoveredSig;
32 :
33 67112 : WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), CInv{MSG_QUORUM_RECOVERED_SIG,
34 : recoveredSig->GetHash()}));
35 :
36 33556 : if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) {
37 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100);
38 0 : return;
39 : }
40 :
41 33556 : m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig));
42 33556 : }
43 :
44 96785 : if (m_shares_manager == nullptr) return;
45 :
46 94137 : if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) {
47 5601 : std::vector<CSigShare> receivedSigShares;
48 5601 : vRecv >> receivedSigShares;
49 :
50 5601 : if (receivedSigShares.size() > CSigSharesManager::MAX_MSGS_SIG_SHARES) {
51 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n",
52 : __func__, receivedSigShares.size(), CSigSharesManager::MAX_MSGS_SIG_SHARES, pfrom.GetId());
53 0 : BanNode(pfrom.GetId());
54 0 : return;
55 : }
56 :
57 11980 : for (const auto& sigShare : receivedSigShares) {
58 6379 : if (!m_shares_manager->ProcessMessageSigShare(pfrom.GetId(), sigShare)) {
59 0 : BanNode(pfrom.GetId());
60 0 : }
61 : }
62 5601 : }
63 :
64 94137 : if (msg_type == NetMsgType::QSIGSESANN) {
65 12988 : std::vector<CSigSesAnn> msgs;
66 12988 : vRecv >> msgs;
67 12988 : if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN) {
68 0 : LogPrint(BCLog::LLMQ_SIGS, /* Continued */
69 : "NetSigning::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n",
70 : __func__, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
71 0 : BanNode(pfrom.GetId());
72 0 : return;
73 : }
74 34804 : if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& ann) {
75 21816 : return m_shares_manager->ProcessMessageSigSesAnn(pfrom, ann);
76 : })) {
77 0 : BanNode(pfrom.GetId());
78 0 : return;
79 : }
80 94137 : } else if (msg_type == NetMsgType::QSIGSHARESINV || msg_type == NetMsgType::QGETSIGSHARES) {
81 9262 : std::vector<CSigSharesInv> msgs;
82 9262 : vRecv >> msgs;
83 9262 : if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES) {
84 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many invs in %s message. cnt=%d, max=%d, node=%d\n",
85 : __func__, msg_type, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES, pfrom.GetId());
86 0 : BanNode(pfrom.GetId());
87 0 : return;
88 : }
89 24595 : if (!std::ranges::all_of(msgs, [this, &pfrom, &msg_type](const auto& inv) {
90 15333 : return m_shares_manager->ProcessMessageSigShares(pfrom, inv, msg_type);
91 : })) {
92 0 : BanNode(pfrom.GetId());
93 0 : return;
94 : }
95 81149 : } else if (msg_type == NetMsgType::QBSIGSHARES) {
96 11447 : std::vector<CBatchedSigShares> msgs;
97 11447 : vRecv >> msgs;
98 30340 : const size_t totalSigsCount = std23::ranges::fold_left(msgs, size_t{0}, [](size_t s, const auto& bs) {
99 18893 : return s + bs.sigShares.size();
100 : });
101 11447 : if (totalSigsCount > CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS) {
102 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n",
103 : __func__, msgs.size(), CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
104 0 : BanNode(pfrom.GetId());
105 0 : return;
106 : }
107 30340 : if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& bs) {
108 18893 : return m_shares_manager->ProcessMessageBatchedSigShares(pfrom, bs);
109 : })) {
110 0 : BanNode(pfrom.GetId());
111 0 : return;
112 : }
113 11447 : }
114 96785 : }
115 :
116 2831 : void NetSigning::Start()
117 : {
118 : // can't start new thread if we have one running already
119 2831 : assert(!signing_thread.joinable());
120 2831 : assert(!shares_cleaning_thread.joinable());
121 2831 : assert(!shares_dispatcher_thread.joinable());
122 :
123 5662 : signing_thread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadSigning(); });
124 :
125 2831 : if (m_shares_manager) {
126 : // Initialize worker pool
127 660 : int worker_count = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
128 660 : worker_pool.resize(worker_count);
129 660 : RenameThreadPool(worker_pool, "sigsh-work");
130 :
131 1320 : shares_cleaning_thread = std::thread(&util::TraceThread, "sigsh-maint", [this] { WorkThreadCleaning(); });
132 1320 : shares_dispatcher_thread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkThreadDispatcher(); });
133 660 : }
134 2831 : }
135 :
136 5714 : void NetSigning::Stop()
137 : {
138 : // make sure to call InterruptWorkerThread() first
139 5714 : if (!workInterrupt) {
140 0 : assert(false);
141 : }
142 :
143 5714 : if (signing_thread.joinable()) {
144 2831 : signing_thread.join();
145 2831 : }
146 :
147 5714 : if (m_shares_manager) {
148 : // Join threads FIRST to stop any pending push() calls
149 1320 : if (shares_cleaning_thread.joinable()) {
150 660 : shares_cleaning_thread.join();
151 660 : }
152 1320 : if (shares_dispatcher_thread.joinable()) {
153 660 : shares_dispatcher_thread.join();
154 660 : }
155 :
156 : // Then stop worker pool (now safe, no more push() calls)
157 1320 : worker_pool.clear_queue();
158 1320 : worker_pool.stop(true);
159 1320 : }
160 5714 : }
161 :
162 45657 : void NetSigning::ProcessRecoveredSig(std::shared_ptr<const CRecoveredSig> recovered_sig, bool consider_proactive_relay)
163 : {
164 45657 : if (recovered_sig == nullptr) return;
165 27558 : if (!m_sig_manager.ProcessRecoveredSig(recovered_sig)) return;
166 :
167 27218 : auto listeners = m_sig_manager.GetListeners();
168 135077 : for (auto& l : listeners) {
169 107859 : auto result = l->HandleNewRecoveredSig(*recovered_sig);
170 107859 : if (const auto* inv = std::get_if<CInv>(&result)) {
171 4888 : m_peer_manager->PeerRelayInv(*inv);
172 107859 : } else if (const auto* tx_ref = std::get_if<CTransactionRef>(&result)) {
173 188 : m_peer_manager->PeerRelayTransaction((*tx_ref)->GetHash());
174 188 : }
175 107859 : }
176 :
177 : // TODO refactor to use a better abstraction analogous to IsAllMembersConnectedEnabled
178 36121 : auto proactive_relay = consider_proactive_relay && recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 &&
179 8903 : recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 &&
180 8903 : recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85;
181 27218 : GetMainSignals().NotifyRecoveredSig(recovered_sig, recovered_sig->GetHash().ToString(), proactive_relay);
182 45657 : }
183 :
184 378122 : bool NetSigning::ProcessPendingRecoveredSigs()
185 : {
186 378122 : Uint256HashMap<std::shared_ptr<const CRecoveredSig>> pending{m_sig_manager.FetchPendingReconstructed()};
187 :
188 378403 : for (const auto& p : pending) {
189 281 : ProcessRecoveredSig(p.second, true);
190 : }
191 :
192 378122 : std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
193 378122 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CBLSPublicKey, StaticSaltedHasher> pubkeys;
194 :
195 378122 : const size_t nMaxBatchSize{32};
196 378122 : bool more_work = m_sig_manager.CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, pubkeys);
197 378122 : if (recSigsByNode.empty()) {
198 365874 : return false;
199 : }
200 :
201 : // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
202 : // craftable by individual entities, making the rogue public key attack impossible
203 12248 : CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, false);
204 :
205 12248 : size_t verifyCount = 0;
206 51886 : for (const auto& [nodeId, v] : recSigsByNode) {
207 39638 : for (const auto& recSig : v) {
208 : // we didn't verify the lazy signature until now
209 22431 : if (!recSig->sig.Get().IsValid()) {
210 0 : batchVerifier.badSources.emplace(nodeId);
211 0 : break;
212 : }
213 :
214 22431 : const auto& pubkey = pubkeys.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash()));
215 22431 : batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), pubkey);
216 22431 : verifyCount++;
217 : }
218 : }
219 :
220 12248 : cxxtimer::Timer verifyTimer(true);
221 12248 : batchVerifier.Verify();
222 12248 : verifyTimer.stop();
223 :
224 12248 : LogPrint(BCLog::LLMQ, "NetSigning::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__,
225 : verifyCount, verifyTimer.count(), recSigsByNode.size());
226 :
227 12248 : Uint256HashSet processed;
228 65164 : for (const auto& [nodeId, v] : recSigsByNode) {
229 34414 : if (batchVerifier.badSources.count(nodeId)) {
230 2 : LogPrint(BCLog::LLMQ, "NetSigning::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
231 4 : m_peer_manager->PeerMisbehaving(nodeId, 100);
232 2 : continue;
233 : }
234 :
235 39620 : for (const auto& recSig : v) {
236 22415 : if (!processed.emplace(recSig->GetHash()).second) {
237 3915 : continue;
238 : }
239 :
240 37000 : ProcessRecoveredSig(recSig, nodeId == -1);
241 : }
242 : }
243 :
244 12248 : return more_work;
245 378122 : }
246 :
247 2831 : void NetSigning::WorkThreadSigning()
248 : {
249 378124 : while (!workInterrupt) {
250 378122 : bool fMoreWork = ProcessPendingRecoveredSigs();
251 :
252 378122 : constexpr auto CLEANUP_INTERVAL{5s};
253 378122 : if (cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) {
254 17341 : m_sig_manager.Cleanup();
255 17341 : }
256 :
257 : // TODO Wakeup when pending signing is needed?
258 378122 : if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
259 2829 : return;
260 : }
261 : }
262 2831 : }
263 :
264 216234 : void NetSigning::RemoveBannedNodeStates()
265 : {
266 216234 : assert(m_shares_manager != nullptr);
267 : // Called regularly to cleanup local node states for banned nodes
268 493395 : m_shares_manager->RemoveNodesIf([this](NodeId node_id) { return m_peer_manager->PeerIsBanned(node_id); });
269 216234 : }
270 :
271 9 : void NetSigning::BanNode(NodeId nodeId)
272 : {
273 9 : if (nodeId == -1) return;
274 :
275 9 : m_peer_manager->PeerMisbehaving(nodeId, 100);
276 9 : if (m_shares_manager) {
277 9 : m_shares_manager->MarkAsBanned(nodeId);
278 9 : }
279 9 : }
280 :
281 660 : void NetSigning::WorkThreadCleaning()
282 : {
283 660 : assert(m_shares_manager);
284 :
285 216894 : while (!workInterrupt) {
286 216234 : RemoveBannedNodeStates();
287 :
288 216234 : m_shares_manager->SendMessages();
289 216234 : m_shares_manager->Cleanup();
290 :
291 216234 : workInterrupt.sleep_for(std::chrono::milliseconds(100));
292 : }
293 660 : }
294 :
295 660 : void NetSigning::WorkThreadDispatcher()
296 : {
297 660 : assert(m_shares_manager);
298 :
299 709571 : while (!workInterrupt) {
300 : // Dispatch all pending signs (individual tasks)
301 : {
302 708911 : auto signs = m_shares_manager->DispatchPendingSigns();
303 : // Dispatch all signs to worker pool
304 727323 : for (auto& work : signs) {
305 18412 : if (workInterrupt) break;
306 :
307 36824 : worker_pool.push([this, work = std::move(work)](int) mutable {
308 18412 : auto rs = m_shares_manager->SignAndProcessSingleShare(std::move(work));
309 18412 : ProcessRecoveredSig(rs, true);
310 18412 : });
311 : }
312 708911 : }
313 :
314 : // Collect pending sig shares synchronously and dispatch each batch to a worker for parallel BLS verification
315 708919 : while (!workInterrupt) {
316 708919 : std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
317 708919 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
318 :
319 708919 : const size_t nMaxBatchSize{32};
320 708919 : bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
321 :
322 708919 : if (sigSharesByNodes.empty()) {
323 696737 : break;
324 : }
325 :
326 24364 : worker_pool.push([this, sigSharesByNodes = std::move(sigSharesByNodes), quorums = std::move(quorums)](int) mutable {
327 12182 : ProcessPendingSigShares(std::move(sigSharesByNodes), std::move(quorums));
328 12182 : });
329 :
330 12182 : if (!more_work) {
331 12174 : break;
332 : }
333 708919 : }
334 :
335 : // Always sleep briefly between checks
336 708911 : workInterrupt.sleep_for(std::chrono::milliseconds(10));
337 : }
338 660 : }
339 :
340 27204 : void NetSigning::NotifyRecoveredSig(const std::shared_ptr<const CRecoveredSig>& sig, bool proactive_relay)
341 : {
342 27204 : m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay);
343 27204 : }
344 :
345 12182 : void NetSigning::ProcessPendingSigShares(
346 : std::unordered_map<NodeId, std::vector<CSigShare>>&& sigSharesByNodes,
347 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>&& quorums)
348 : {
349 : // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
350 : // which are not craftable by individual entities, making the rogue public key attack impossible
351 12182 : CBLSBatchVerifier<NodeId, SigShareKey> batchVerifier(false, true);
352 :
353 12182 : cxxtimer::Timer prepareTimer(true);
354 12182 : size_t verifyCount = 0;
355 50372 : for (const auto& [nodeId, v] : sigSharesByNodes) {
356 38224 : for (const auto& sigShare : v) {
357 22839 : if (m_sig_manager.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) {
358 39 : continue;
359 : }
360 :
361 : // Materialize the signature once. Get() internally validates, so if it returns an invalid signature,
362 : // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage).
363 22800 : CBLSSignature sig = sigShare.sigShare.Get();
364 : // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
365 : // deserialization in the message thread
366 22800 : if (!sig.IsValid()) {
367 5 : BanNode(nodeId);
368 : // don't process any additional shares from this node
369 5 : break;
370 : }
371 :
372 22795 : auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()));
373 22795 : auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember());
374 :
375 22795 : if (!pubKeyShare.IsValid()) {
376 : // this should really not happen (we already ensured we have the quorum vvec,
377 : // so we should also be able to create all pubkey shares)
378 0 : LogPrintf("NetSigning::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__);
379 0 : assert(false);
380 : }
381 :
382 22795 : batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare);
383 22795 : verifyCount++;
384 22795 : }
385 : }
386 12182 : prepareTimer.stop();
387 :
388 12182 : cxxtimer::Timer verifyTimer(true);
389 12182 : batchVerifier.Verify();
390 12182 : verifyTimer.stop();
391 :
392 12182 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__,
393 : verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
394 :
395 42966 : for (const auto& [nodeId, v] : sigSharesByNodes) {
396 30780 : if (batchVerifier.badSources.count(nodeId) != 0) {
397 4 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- invalid sig shares from other node, banning peer=%d\n",
398 : __func__, nodeId);
399 : // this will also cause re-requesting of the shares that were sent by this node
400 4 : BanNode(nodeId);
401 4 : continue;
402 : }
403 :
404 30772 : auto rec_sigs = m_shares_manager->ProcessPendingSigShares(v, quorums);
405 23850 : for (auto& rs : rec_sigs) {
406 8464 : ProcessRecoveredSig(rs, true);
407 : }
408 15386 : }
409 12182 : }
410 :
411 : } // namespace llmq
|