Line data Source code
1 : // Copyright (c) 2025 The Dash Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <llmq/net_signing.h>
6 :
7 : #include <bls/bls_batchverifier.h>
8 : #include <llmq/quorums.h>
9 : #include <llmq/signhash.h>
10 : #include <llmq/signing_shares.h>
11 : #include <llmq/signing.h>
12 : #include <spork.h>
13 : #include <util/std23.h>
14 :
15 : #include <logging.h>
16 : #include <streams.h>
17 : #include <util/thread.h>
18 : #include <validationinterface.h>
19 :
20 : #include <cxxtimer.hpp>
21 :
22 : #include <algorithm>
23 : #include <ranges>
24 : #include <unordered_map>
25 :
26 : namespace llmq {
27 0 : void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
28 : {
29 0 : if (msg_type == NetMsgType::QSIGREC) {
30 0 : auto recoveredSig = std::make_shared<CRecoveredSig>();
31 0 : vRecv >> *recoveredSig;
32 :
33 0 : WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), CInv{MSG_QUORUM_RECOVERED_SIG,
34 : recoveredSig->GetHash()}));
35 :
36 0 : if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) {
37 0 : m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100);
38 0 : return;
39 : }
40 :
41 0 : m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig));
42 0 : }
43 :
44 0 : if (m_shares_manager == nullptr) return;
45 :
46 0 : if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) {
47 0 : std::vector<CSigShare> receivedSigShares;
48 0 : vRecv >> receivedSigShares;
49 :
50 0 : if (receivedSigShares.size() > CSigSharesManager::MAX_MSGS_SIG_SHARES) {
51 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n",
52 : __func__, receivedSigShares.size(), CSigSharesManager::MAX_MSGS_SIG_SHARES, pfrom.GetId());
53 0 : BanNode(pfrom.GetId());
54 0 : return;
55 : }
56 :
57 0 : for (const auto& sigShare : receivedSigShares) {
58 0 : if (!m_shares_manager->ProcessMessageSigShare(pfrom.GetId(), sigShare)) {
59 0 : BanNode(pfrom.GetId());
60 0 : }
61 : }
62 0 : }
63 :
64 0 : if (msg_type == NetMsgType::QSIGSESANN) {
65 0 : std::vector<CSigSesAnn> msgs;
66 0 : vRecv >> msgs;
67 0 : if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN) {
68 0 : LogPrint(BCLog::LLMQ_SIGS, /* Continued */
69 : "NetSigning::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n",
70 : __func__, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
71 0 : BanNode(pfrom.GetId());
72 0 : return;
73 : }
74 0 : if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& ann) {
75 0 : return m_shares_manager->ProcessMessageSigSesAnn(pfrom, ann);
76 : })) {
77 0 : BanNode(pfrom.GetId());
78 0 : return;
79 : }
80 0 : } else if (msg_type == NetMsgType::QSIGSHARESINV || msg_type == NetMsgType::QGETSIGSHARES) {
81 0 : std::vector<CSigSharesInv> msgs;
82 0 : vRecv >> msgs;
83 0 : if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES) {
84 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many invs in %s message. cnt=%d, max=%d, node=%d\n",
85 : __func__, msg_type, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES, pfrom.GetId());
86 0 : BanNode(pfrom.GetId());
87 0 : return;
88 : }
89 0 : if (!std::ranges::all_of(msgs, [this, &pfrom, &msg_type](const auto& inv) {
90 0 : return m_shares_manager->ProcessMessageSigShares(pfrom, inv, msg_type);
91 : })) {
92 0 : BanNode(pfrom.GetId());
93 0 : return;
94 : }
95 0 : } else if (msg_type == NetMsgType::QBSIGSHARES) {
96 0 : std::vector<CBatchedSigShares> msgs;
97 0 : vRecv >> msgs;
98 0 : const size_t totalSigsCount = std23::ranges::fold_left(msgs, size_t{0}, [](size_t s, const auto& bs) {
99 0 : return s + bs.sigShares.size();
100 : });
101 0 : if (totalSigsCount > CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS) {
102 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n",
103 : __func__, msgs.size(), CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
104 0 : BanNode(pfrom.GetId());
105 0 : return;
106 : }
107 0 : if (!std::ranges::all_of(msgs, [this, &pfrom](const auto& bs) {
108 0 : return m_shares_manager->ProcessMessageBatchedSigShares(pfrom, bs);
109 : })) {
110 0 : BanNode(pfrom.GetId());
111 0 : return;
112 : }
113 0 : }
114 0 : }
115 :
116 0 : void NetSigning::Start()
117 : {
118 : // can't start new thread if we have one running already
119 0 : assert(!signing_thread.joinable());
120 0 : assert(!shares_cleaning_thread.joinable());
121 0 : assert(!shares_dispatcher_thread.joinable());
122 :
123 0 : signing_thread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadSigning(); });
124 :
125 0 : if (m_shares_manager) {
126 : // Initialize worker pool
127 0 : int worker_count = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
128 0 : worker_pool.resize(worker_count);
129 0 : RenameThreadPool(worker_pool, "sigsh-work");
130 :
131 0 : shares_cleaning_thread = std::thread(&util::TraceThread, "sigsh-maint", [this] { WorkThreadCleaning(); });
132 0 : shares_dispatcher_thread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkThreadDispatcher(); });
133 0 : }
134 0 : }
135 :
136 0 : void NetSigning::Stop()
137 : {
138 : // make sure to call InterruptWorkerThread() first
139 0 : if (!workInterrupt) {
140 0 : assert(false);
141 : }
142 :
143 0 : if (signing_thread.joinable()) {
144 0 : signing_thread.join();
145 0 : }
146 :
147 0 : if (m_shares_manager) {
148 : // Join threads FIRST to stop any pending push() calls
149 0 : if (shares_cleaning_thread.joinable()) {
150 0 : shares_cleaning_thread.join();
151 0 : }
152 0 : if (shares_dispatcher_thread.joinable()) {
153 0 : shares_dispatcher_thread.join();
154 0 : }
155 :
156 : // Then stop worker pool (now safe, no more push() calls)
157 0 : worker_pool.clear_queue();
158 0 : worker_pool.stop(true);
159 0 : }
160 0 : }
161 :
162 0 : void NetSigning::ProcessRecoveredSig(std::shared_ptr<const CRecoveredSig> recovered_sig, bool consider_proactive_relay)
163 : {
164 0 : if (recovered_sig == nullptr) return;
165 0 : if (!m_sig_manager.ProcessRecoveredSig(recovered_sig)) return;
166 :
167 0 : auto listeners = m_sig_manager.GetListeners();
168 0 : for (auto& l : listeners) {
169 0 : auto result = l->HandleNewRecoveredSig(*recovered_sig);
170 0 : if (const auto* inv = std::get_if<CInv>(&result)) {
171 0 : m_peer_manager->PeerRelayInv(*inv);
172 0 : } else if (const auto* tx_ref = std::get_if<CTransactionRef>(&result)) {
173 0 : m_peer_manager->PeerRelayTransaction((*tx_ref)->GetHash());
174 0 : }
175 0 : }
176 :
177 : // TODO refactor to use a better abstraction analogous to IsAllMembersConnectedEnabled
178 0 : auto proactive_relay = consider_proactive_relay && recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 &&
179 0 : recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 &&
180 0 : recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85;
181 0 : GetMainSignals().NotifyRecoveredSig(recovered_sig, recovered_sig->GetHash().ToString(), proactive_relay);
182 0 : }
183 :
184 0 : bool NetSigning::ProcessPendingRecoveredSigs()
185 : {
186 0 : Uint256HashMap<std::shared_ptr<const CRecoveredSig>> pending{m_sig_manager.FetchPendingReconstructed()};
187 :
188 0 : for (const auto& p : pending) {
189 0 : ProcessRecoveredSig(p.second, true);
190 : }
191 :
192 0 : std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
193 0 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CBLSPublicKey, StaticSaltedHasher> pubkeys;
194 :
195 0 : const size_t nMaxBatchSize{32};
196 0 : bool more_work = m_sig_manager.CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, pubkeys);
197 0 : if (recSigsByNode.empty()) {
198 0 : return false;
199 : }
200 :
201 : // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
202 : // craftable by individual entities, making the rogue public key attack impossible
203 0 : CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, false);
204 :
205 0 : size_t verifyCount = 0;
206 0 : for (const auto& [nodeId, v] : recSigsByNode) {
207 0 : for (const auto& recSig : v) {
208 : // we didn't verify the lazy signature until now
209 0 : if (!recSig->sig.Get().IsValid()) {
210 0 : batchVerifier.badSources.emplace(nodeId);
211 0 : break;
212 : }
213 :
214 0 : const auto& pubkey = pubkeys.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash()));
215 0 : batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), pubkey);
216 0 : verifyCount++;
217 : }
218 : }
219 :
220 0 : cxxtimer::Timer verifyTimer(true);
221 0 : batchVerifier.Verify();
222 0 : verifyTimer.stop();
223 :
224 0 : LogPrint(BCLog::LLMQ, "NetSigning::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__,
225 : verifyCount, verifyTimer.count(), recSigsByNode.size());
226 :
227 0 : Uint256HashSet processed;
228 0 : for (const auto& [nodeId, v] : recSigsByNode) {
229 0 : if (batchVerifier.badSources.count(nodeId)) {
230 0 : LogPrint(BCLog::LLMQ, "NetSigning::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
231 0 : m_peer_manager->PeerMisbehaving(nodeId, 100);
232 0 : continue;
233 : }
234 :
235 0 : for (const auto& recSig : v) {
236 0 : if (!processed.emplace(recSig->GetHash()).second) {
237 0 : continue;
238 : }
239 :
240 0 : ProcessRecoveredSig(recSig, nodeId == -1);
241 : }
242 : }
243 :
244 0 : return more_work;
245 0 : }
246 :
247 0 : void NetSigning::WorkThreadSigning()
248 : {
249 0 : while (!workInterrupt) {
250 0 : bool fMoreWork = ProcessPendingRecoveredSigs();
251 :
252 0 : constexpr auto CLEANUP_INTERVAL{5s};
253 0 : if (cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) {
254 0 : m_sig_manager.Cleanup();
255 0 : }
256 :
257 : // TODO Wakeup when pending signing is needed?
258 0 : if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
259 0 : return;
260 : }
261 : }
262 0 : }
263 :
264 0 : void NetSigning::RemoveBannedNodeStates()
265 : {
266 0 : assert(m_shares_manager != nullptr);
267 : // Called regularly to cleanup local node states for banned nodes
268 0 : m_shares_manager->RemoveNodesIf([this](NodeId node_id) { return m_peer_manager->PeerIsBanned(node_id); });
269 0 : }
270 :
271 0 : void NetSigning::BanNode(NodeId nodeId)
272 : {
273 0 : if (nodeId == -1) return;
274 :
275 0 : m_peer_manager->PeerMisbehaving(nodeId, 100);
276 0 : if (m_shares_manager) {
277 0 : m_shares_manager->MarkAsBanned(nodeId);
278 0 : }
279 0 : }
280 :
281 0 : void NetSigning::WorkThreadCleaning()
282 : {
283 0 : assert(m_shares_manager);
284 :
285 0 : while (!workInterrupt) {
286 0 : RemoveBannedNodeStates();
287 :
288 0 : m_shares_manager->SendMessages();
289 0 : m_shares_manager->Cleanup();
290 :
291 0 : workInterrupt.sleep_for(std::chrono::milliseconds(100));
292 : }
293 0 : }
294 :
295 0 : void NetSigning::WorkThreadDispatcher()
296 : {
297 0 : assert(m_shares_manager);
298 :
299 0 : while (!workInterrupt) {
300 : // Dispatch all pending signs (individual tasks)
301 : {
302 0 : auto signs = m_shares_manager->DispatchPendingSigns();
303 : // Dispatch all signs to worker pool
304 0 : for (auto& work : signs) {
305 0 : if (workInterrupt) break;
306 :
307 0 : worker_pool.push([this, work = std::move(work)](int) mutable {
308 0 : auto rs = m_shares_manager->SignAndProcessSingleShare(std::move(work));
309 0 : ProcessRecoveredSig(rs, true);
310 0 : });
311 : }
312 0 : }
313 :
314 : // Collect pending sig shares synchronously and dispatch each batch to a worker for parallel BLS verification
315 0 : while (!workInterrupt) {
316 0 : std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
317 0 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
318 :
319 0 : const size_t nMaxBatchSize{32};
320 0 : bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
321 :
322 0 : if (sigSharesByNodes.empty()) {
323 0 : break;
324 : }
325 :
326 0 : worker_pool.push([this, sigSharesByNodes = std::move(sigSharesByNodes), quorums = std::move(quorums)](int) mutable {
327 0 : ProcessPendingSigShares(std::move(sigSharesByNodes), std::move(quorums));
328 0 : });
329 :
330 0 : if (!more_work) {
331 0 : break;
332 : }
333 0 : }
334 :
335 : // Always sleep briefly between checks
336 0 : workInterrupt.sleep_for(std::chrono::milliseconds(10));
337 : }
338 0 : }
339 :
340 0 : void NetSigning::NotifyRecoveredSig(const std::shared_ptr<const CRecoveredSig>& sig, bool proactive_relay)
341 : {
342 0 : m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay);
343 0 : }
344 :
345 0 : void NetSigning::ProcessPendingSigShares(
346 : std::unordered_map<NodeId, std::vector<CSigShare>>&& sigSharesByNodes,
347 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>&& quorums)
348 : {
349 : // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
350 : // which are not craftable by individual entities, making the rogue public key attack impossible
351 0 : CBLSBatchVerifier<NodeId, SigShareKey> batchVerifier(false, true);
352 :
353 0 : cxxtimer::Timer prepareTimer(true);
354 0 : size_t verifyCount = 0;
355 0 : for (const auto& [nodeId, v] : sigSharesByNodes) {
356 0 : for (const auto& sigShare : v) {
357 0 : if (m_sig_manager.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) {
358 0 : continue;
359 : }
360 :
361 : // Materialize the signature once. Get() internally validates, so if it returns an invalid signature,
362 : // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage).
363 0 : CBLSSignature sig = sigShare.sigShare.Get();
364 : // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
365 : // deserialization in the message thread
366 0 : if (!sig.IsValid()) {
367 0 : BanNode(nodeId);
368 : // don't process any additional shares from this node
369 0 : break;
370 : }
371 :
372 0 : auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()));
373 0 : auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember());
374 :
375 0 : if (!pubKeyShare.IsValid()) {
376 : // this should really not happen (we already ensured we have the quorum vvec,
377 : // so we should also be able to create all pubkey shares)
378 0 : LogPrintf("NetSigning::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__);
379 0 : assert(false);
380 : }
381 :
382 0 : batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare);
383 0 : verifyCount++;
384 0 : }
385 : }
386 0 : prepareTimer.stop();
387 :
388 0 : cxxtimer::Timer verifyTimer(true);
389 0 : batchVerifier.Verify();
390 0 : verifyTimer.stop();
391 :
392 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__,
393 : verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
394 :
395 0 : for (const auto& [nodeId, v] : sigSharesByNodes) {
396 0 : if (batchVerifier.badSources.count(nodeId) != 0) {
397 0 : LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- invalid sig shares from other node, banning peer=%d\n",
398 : __func__, nodeId);
399 : // this will also cause re-requesting of the shares that were sent by this node
400 0 : BanNode(nodeId);
401 0 : continue;
402 : }
403 :
404 0 : auto rec_sigs = m_shares_manager->ProcessPendingSigShares(v, quorums);
405 0 : for (auto& rs : rec_sigs) {
406 0 : ProcessRecoveredSig(rs, true);
407 : }
408 0 : }
409 0 : }
410 :
411 : } // namespace llmq
|