Line data Source code
1 : // Copyright (c) 2009-2010 Satoshi Nakamoto
2 : // Copyright (c) 2009-2021 The Bitcoin Core developers
3 : // Copyright (c) 2014-2025 The Dash Core developers
4 : // Distributed under the MIT software license, see the accompanying
5 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
6 :
7 : #if defined(HAVE_CONFIG_H)
8 : #include <config/bitcoin-config.h>
9 : #endif
10 :
11 : #include <net.h>
12 : #include <netmessagemaker.h>
13 :
14 : #include <addrdb.h>
15 : #include <addrman.h>
16 : #include <banman.h>
17 : #include <clientversion.h>
18 : #include <compat/compat.h>
19 : #include <consensus/consensus.h>
20 : #include <crypto/sha256.h>
21 : #include <node/eviction.h>
22 : #include <fs.h>
23 : #include <i2p.h>
24 : #include <key.h>
25 : #include <memusage.h>
26 : #include <net_permissions.h>
27 : #include <netaddress.h>
28 : #include <netbase.h>
29 : #include <node/interface_ui.h>
30 : #include <protocol.h>
31 : #include <random.h>
32 : #include <scheduler.h>
33 : #include <util/sock.h>
34 : #include <util/strencodings.h>
35 : #include <util/system.h>
36 : #include <util/thread.h>
37 : #include <util/threadinterrupt.h>
38 : #include <util/time.h>
39 : #include <util/trace.h>
40 : #include <util/translation.h>
41 :
42 : #include <evo/deterministicmns.h>
43 : #include <masternode/meta.h>
44 : #include <masternode/sync.h>
45 : #include <stats/client.h>
46 : #include <util/std23.h>
47 : #include <util/vector.h>
48 : #include <util/wpipe.h>
49 :
50 : #ifdef WIN32
51 : #include <string.h>
52 : #endif
53 :
54 : #if HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS
55 : #include <ifaddrs.h>
56 : #endif
57 :
58 : #include <algorithm>
59 : #include <array>
60 : #include <cstdint>
61 : #include <functional>
62 : #include <unordered_map>
63 :
64 : #include <math.h>
65 :
66 : /** Maximum number of block-relay-only anchor connections */
67 : static constexpr size_t MAX_BLOCK_RELAY_ONLY_ANCHORS = 2;
68 : static_assert (MAX_BLOCK_RELAY_ONLY_ANCHORS <= static_cast<size_t>(MAX_BLOCK_RELAY_ONLY_CONNECTIONS), "MAX_BLOCK_RELAY_ONLY_ANCHORS must not exceed MAX_BLOCK_RELAY_ONLY_CONNECTIONS.");
69 : /** Anchor IP address database file name */
70 : const char* const ANCHORS_DATABASE_FILENAME = "anchors.dat";
71 :
72 : // How often to dump addresses to peers.dat
73 : static constexpr std::chrono::minutes DUMP_PEERS_INTERVAL{15};
74 :
75 : /** Number of DNS seeds to query when the number of connections is low. */
76 : static constexpr int DNSSEEDS_TO_QUERY_AT_ONCE = 3;
77 :
78 : /** How long to delay before querying DNS seeds
79 : *
80 : * If we have more than THRESHOLD entries in addrman, then it's likely
81 : * that we got those addresses from having previously connected to the P2P
82 : * network, and that we'll be able to successfully reconnect to the P2P
83 : * network via contacting one of them. So if that's the case, spend a
84 : * little longer trying to connect to known peers before querying the
85 : * DNS seeds.
86 : */
87 : static constexpr std::chrono::seconds DNSSEEDS_DELAY_FEW_PEERS{11};
88 : static constexpr std::chrono::minutes DNSSEEDS_DELAY_MANY_PEERS{5};
89 : static constexpr int DNSSEEDS_DELAY_PEER_THRESHOLD = 1000; // "many" vs "few" peers
90 :
91 : /** The default timeframe for -maxuploadtarget. 1 day. */
92 : static constexpr std::chrono::seconds MAX_UPLOAD_TIMEFRAME{60 * 60 * 24};
93 :
94 : // A random time period (0 to 1 seconds) is added to feeler connections to prevent synchronization.
95 : static constexpr auto FEELER_SLEEP_WINDOW{1s};
96 :
97 : /** Frequency to attempt extra connections to reachable networks we're not connected to yet **/
98 : static constexpr auto EXTRA_NETWORK_PEER_INTERVAL{5min};
99 :
100 : /** Used to pass flags to the Bind() function */
101 : enum BindFlags {
102 : BF_NONE = 0,
103 : BF_REPORT_ERROR = (1U << 0),
104 : /**
105 : * Do not call AddLocal() for our special addresses, e.g., for incoming
106 : * Tor connections, to prevent gossiping them over the network.
107 : */
108 : BF_DONT_ADVERTISE = (1U << 1),
109 : };
110 :
111 : #ifndef USE_WAKEUP_PIPE
112 : // The set of sockets cannot be modified while waiting
113 : // The sleep time needs to be small to avoid new sockets stalling
114 : static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50;
115 : #else
116 : // select() is woken up through the wakeup pipe whenever a new node is added, so we can wait much longer.
117 : // We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in
118 : // the socket handler thread
119 : static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500;
120 : #endif /* USE_WAKEUP_PIPE */
121 :
122 : const std::string NET_MESSAGE_TYPE_OTHER = "*other*";
123 :
124 : static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8]
125 : static const uint64_t RANDOMIZER_ID_LOCALHOSTNONCE = 0xd93e69e2bbfa5735ULL; // SHA256("localhostnonce")[0:8]
126 : static const uint64_t RANDOMIZER_ID_ADDRCACHE = 0x1cf2e4ddd306dda9ULL; // SHA256("addrcache")[0:8]
127 : //
128 : // Global state variables
129 : //
130 : bool fDiscover = true;
131 : bool fListen = true;
132 : GlobalMutex g_maplocalhost_mutex;
133 3308 : std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mutex);
134 : std::string strSubVersion;
135 :
136 38315063 : size_t CSerializedNetMsg::GetMemoryUsage() const noexcept
137 : {
138 : // Don't count the dynamic memory used for the m_type string, by assuming it fits in the
139 : // "small string" optimization area (which stores data inside the object itself, up to some
140 : // size; 15 bytes in modern libstdc++).
141 38315063 : return sizeof(*this) + memusage::DynamicUsage(data);
142 : }
143 :
144 20 : void CConnman::AddAddrFetch(const std::string& strDest)
145 : {
146 20 : LOCK(m_addr_fetches_mutex);
147 20 : m_addr_fetches.push_back(strDest);
148 20 : }
149 :
150 21365 : uint16_t GetListenPort()
151 : {
152 : // If -bind= is provided with ":port" part, use that (first one if multiple are provided).
153 42534 : for (const std::string& bind_arg : gArgs.GetArgs("-bind")) {
154 21169 : constexpr uint16_t dummy_port = 0;
155 :
156 21169 : const std::optional<CService> bind_addr{Lookup(bind_arg, dummy_port, /*fAllowLookup=*/false)};
157 21169 : if (bind_addr.has_value() && bind_addr->GetPort() != dummy_port) return bind_addr->GetPort();
158 21169 : }
159 :
160 : // Otherwise, if -whitebind= without NetPermissionFlags::NoBan is provided, use that
161 : // (-whitebind= is required to have ":port").
162 21364 : for (const std::string& whitebind_arg : gArgs.GetArgs("-whitebind")) {
163 2 : NetWhitebindPermissions whitebind;
164 2 : bilingual_str error;
165 2 : if (NetWhitebindPermissions::TryParse(whitebind_arg, whitebind, error)) {
166 2 : if (!NetPermissions::HasFlag(whitebind.m_flags, NetPermissionFlags::NoBan)) {
167 2 : return whitebind.m_service.GetPort();
168 : }
169 0 : }
170 2 : }
171 :
172 : // Otherwise, if -port= is provided, use that. Otherwise use the default port.
173 21360 : return static_cast<uint16_t>(gArgs.GetIntArg("-port", Params().GetDefaultPort()));
174 21365 : }
175 :
176 : // Determine the "best" local address for a particular peer.
177 9053 : [[nodiscard]] static std::optional<CService> GetLocal(const CNode& peer)
178 : {
179 9053 : if (!fListen) return std::nullopt;
180 :
181 9053 : std::optional<CService> addr;
182 9053 : int nBestScore = -1;
183 9053 : int nBestReachability = -1;
184 : {
185 9053 : LOCK(g_maplocalhost_mutex);
186 9269 : for (const auto& [local_addr, local_service_info] : mapLocalHost) {
187 : // For privacy reasons, don't advertise our privacy-network address
188 : // to other networks and don't advertise our other-network address
189 : // to privacy networks.
190 104 : if (local_addr.GetNetwork() != peer.ConnectedThroughNetwork()
191 71 : && (local_addr.IsPrivacyNet() || peer.IsConnectedThroughPrivacyNet())) {
192 36 : continue;
193 : }
194 35 : const int nScore{local_service_info.nScore};
195 35 : const int nReachability{local_addr.GetReachabilityFrom(peer.addr)};
196 35 : if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) {
197 18 : addr.emplace(CService{local_addr, local_service_info.nPort});
198 18 : nBestReachability = nReachability;
199 18 : nBestScore = nScore;
200 18 : }
201 : }
202 9053 : }
203 9053 : return addr;
204 18106 : }
205 :
206 : //! Convert the serialized seeds into usable address objects.
207 6 : static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn)
208 : {
209 : // It'll only connect to one or two seed nodes because once it connects,
210 : // it'll get a pile of addresses with newer timestamps.
211 : // Seed nodes are given a random 'last seen time' of between one and two
212 : // weeks ago.
213 6 : const auto one_week{7 * 24h};
214 6 : std::vector<CAddress> vSeedsOut;
215 6 : FastRandomContext rng;
216 6 : CDataStream s(vSeedsIn, SER_NETWORK, PROTOCOL_VERSION | ADDRV2_FORMAT);
217 6 : while (!s.eof()) {
218 0 : CService endpoint;
219 0 : s >> endpoint;
220 0 : CAddress addr{endpoint, GetDesirableServiceFlags(NODE_NONE)};
221 0 : addr.nTime = rng.rand_uniform_delay(Now<NodeSeconds>() - one_week, -one_week);
222 0 : LogPrint(BCLog::NET, "Added hardcoded seed: %s\n", addr.ToStringAddrPort());
223 0 : vSeedsOut.push_back(addr);
224 0 : }
225 6 : return vSeedsOut;
226 6 : }
227 :
228 : // Determine the "best" local address for a particular peer.
229 : // If none, return the unroutable 0.0.0.0 but filled in with
230 : // the normal parameters, since the IP may be changed to a useful
231 : // one by discovery.
232 9053 : CService GetLocalAddress(const CNode& peer)
233 : {
234 9053 : return GetLocal(peer).value_or(CService{CNetAddr(), GetListenPort()});
235 0 : }
236 :
237 0 : static int GetnScore(const CService& addr)
238 : {
239 0 : LOCK(g_maplocalhost_mutex);
240 0 : const auto it = mapLocalHost.find(addr);
241 0 : return (it != mapLocalHost.end()) ? it->second.nScore : 0;
242 0 : }
243 :
244 : // Is our peer's addrLocal potentially useful as an external IP source?
245 9032 : [[nodiscard]] static bool IsPeerAddrLocalGood(CNode *pnode)
246 : {
247 9032 : CService addrLocal = pnode->GetAddrLocal();
248 9032 : return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() &&
249 4 : g_reachable_nets.Contains(addrLocal);
250 9032 : }
251 :
252 9032 : std::optional<CService> GetLocalAddrForPeer(CNode& node)
253 : {
254 9032 : CService addrLocal{GetLocalAddress(node)};
255 : // If discovery is enabled, sometimes give our peer the address it
256 : // tells us that it sees us as in case it has a better idea of our
257 : // address than we do.
258 9032 : FastRandomContext rng;
259 9032 : if (IsPeerAddrLocalGood(&node) && (!addrLocal.IsRoutable() ||
260 0 : rng.randbits((GetnScore(addrLocal) > LOCAL_MANUAL) ? 3 : 1) == 0))
261 : {
262 4 : if (node.IsInboundConn()) {
263 : // For inbound connections, assume both the address and the port
264 : // as seen from the peer.
265 1 : addrLocal = CService{node.GetAddrLocal()};
266 1 : } else {
267 : // For outbound connections, assume just the address as seen from
268 : // the peer and leave the port in `addrLocal` as returned by
269 : // `GetLocalAddress()` above. The peer has no way to observe our
270 : // listening port when we have initiated the connection.
271 3 : addrLocal.SetIP(node.GetAddrLocal());
272 : }
273 4 : }
274 9032 : if (addrLocal.IsRoutable())
275 : {
276 4 : LogPrint(BCLog::NET, "Advertising address %s to peer=%d\n", addrLocal.ToStringAddrPort(), node.GetId());
277 4 : return addrLocal;
278 : }
279 : // Address is unroutable. Don't advertise.
280 9028 : return std::nullopt;
281 9032 : }
282 :
283 : // learn a new local address
284 59 : bool AddLocal(const CService& addr_, int nScore)
285 : {
286 59 : CService addr{MaybeFlipIPv6toCJDNS(addr_)};
287 :
288 59 : if (!addr.IsRoutable() && Params().RequireRoutableExternalIP())
289 0 : return false;
290 :
291 59 : if (!fDiscover && nScore < LOCAL_MANUAL)
292 0 : return false;
293 :
294 59 : if (!g_reachable_nets.Contains(addr))
295 0 : return false;
296 :
297 59 : LogPrintf("AddLocal(%s,%i)\n", addr.ToStringAddrPort(), nScore);
298 :
299 : {
300 59 : LOCK(g_maplocalhost_mutex);
301 59 : const auto [it, is_newly_added] = mapLocalHost.emplace(addr, LocalServiceInfo());
302 59 : LocalServiceInfo &info = it->second;
303 59 : if (is_newly_added || nScore >= info.nScore) {
304 118 : info.nScore = nScore + (is_newly_added ? 0 : 1);
305 59 : info.nPort = addr.GetPort();
306 59 : }
307 59 : }
308 :
309 59 : return true;
310 59 : }
311 :
312 48 : bool AddLocal(const CNetAddr &addr, int nScore)
313 : {
314 48 : return AddLocal(CService(addr, GetListenPort()), nScore);
315 0 : }
316 :
317 11 : void RemoveLocal(const CService& addr)
318 : {
319 11 : LOCK(g_maplocalhost_mutex);
320 11 : LogPrintf("RemoveLocal(%s)\n", addr.ToStringAddrPort());
321 11 : mapLocalHost.erase(addr);
322 11 : }
323 :
324 : /** vote for a local address */
325 0 : bool SeenLocal(const CService& addr)
326 : {
327 0 : LOCK(g_maplocalhost_mutex);
328 0 : const auto it = mapLocalHost.find(addr);
329 0 : if (it == mapLocalHost.end()) return false;
330 0 : ++it->second.nScore;
331 0 : return true;
332 0 : }
333 :
334 :
335 : /** check whether a given address is potentially local */
336 5 : bool IsLocal(const CService& addr)
337 : {
338 5 : LOCK(g_maplocalhost_mutex);
339 5 : return mapLocalHost.count(addr) > 0;
340 5 : }
341 :
342 3588 : bool CConnman::AlreadyConnectedToAddress(const CAddress& addr) const
343 : {
344 3588 : READ_LOCK(m_nodes_mutex);
345 3588 : return FindNode(static_cast<CService>(addr)) != nullptr;
346 3588 : }
347 :
348 4976 : bool CConnman::CheckIncomingNonce(uint64_t nonce) const
349 : {
350 4976 : READ_LOCK(m_nodes_mutex);
351 24773 : for (const CNode* pnode : m_nodes) {
352 19797 : if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce)
353 0 : return false;
354 : }
355 4976 : return true;
356 4976 : }
357 :
358 : /** Get the bind address for a socket as CAddress */
359 11427 : static CAddress GetBindAddress(const Sock& sock)
360 : {
361 11427 : CAddress addr_bind;
362 : struct sockaddr_storage sockaddr_bind;
363 11427 : socklen_t sockaddr_bind_len = sizeof(sockaddr_bind);
364 11427 : if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) {
365 11427 : addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind);
366 11427 : } else {
367 0 : LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n");
368 : }
369 11427 : return addr_bind;
370 11427 : }
371 :
372 5773 : CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
373 : {
374 5773 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
375 5773 : assert(conn_type != ConnectionType::INBOUND);
376 :
377 5773 : if (pszDest == nullptr) {
378 3582 : if (addrConnect.GetPort() == GetListenPort() && IsLocal(addrConnect)) {
379 0 : return nullptr;
380 : }
381 :
382 : // Look for an existing connection
383 3582 : if (ExistsNode(static_cast<CService>(addrConnect))) {
384 0 : LogPrintf("Failed to open new connection, already connected\n");
385 0 : return nullptr;
386 : }
387 3582 : }
388 :
389 : /// debug print
390 5773 : if (fLogIPs) {
391 12 : LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying %s connection %s lastseen=%.1fhrs\n",
392 : use_v2transport ? "v2" : "v1",
393 : pszDest ? pszDest : addrConnect.ToStringAddrPort(),
394 : Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime));
395 12 : } else {
396 5761 : LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying %s connection lastseen=%.1fhrs\n",
397 : use_v2transport ? "v2" : "v1",
398 : Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime));
399 : }
400 :
401 : // Resolve
402 9355 : const uint16_t default_port{pszDest != nullptr ? Params().GetDefaultPort(pszDest) :
403 3582 : Params().GetDefaultPort()};
404 5773 : if (pszDest) {
405 2191 : std::vector<CService> resolved{Lookup(pszDest, default_port, fNameLookup && !HaveNameProxy(), 256)};
406 2191 : if (!resolved.empty()) {
407 2165 : Shuffle(resolved.begin(), resolved.end(), FastRandomContext());
408 : // If the connection is made by name, it can be the case that the name resolves to more than one address.
409 : // We don't want to connect any more of them if we are already connected to one
410 4323 : for (const auto& r : resolved) {
411 2171 : addrConnect = CAddress{MaybeFlipIPv6toCJDNS(r), NODE_NONE};
412 2171 : if (!addrConnect.IsValid()) {
413 3 : LogPrint(BCLog::NET, "Resolver returned invalid address %s for %s\n", addrConnect.ToStringAddrPort(), pszDest);
414 3 : return nullptr;
415 : }
416 : // It is possible that we already have a connection to the IP/port pszDest resolved to.
417 : // In that case, drop the connection that was just created.
418 2168 : if (ExistsNode(static_cast<CService>(addrConnect))) {
419 10 : LogPrintf("Not opening a connection to %s, already connected to %s\n", pszDest, addrConnect.ToStringAddrPort());
420 10 : return nullptr;
421 : }
422 : }
423 2152 : }
424 2191 : }
425 :
426 : // Connect
427 5760 : std::unique_ptr<Sock> sock;
428 5760 : Proxy proxy;
429 5760 : CAddress addr_bind;
430 5760 : assert(!addr_bind.IsValid());
431 5760 : std::unique_ptr<i2p::sam::Session> i2p_transient_session;
432 :
433 5760 : if (addrConnect.IsValid()) {
434 5174 : const bool use_proxy{GetProxy(addrConnect.GetNetwork(), proxy)};
435 5174 : bool proxyConnectionFailed = false;
436 :
437 5174 : if (addrConnect.IsI2P() && use_proxy) {
438 8 : i2p::Connection conn;
439 8 : bool connected{false};
440 :
441 8 : if (m_i2p_sam_session) {
442 6 : connected = m_i2p_sam_session->Connect(addrConnect, conn, proxyConnectionFailed);
443 6 : } else {
444 : {
445 2 : LOCK(m_unused_i2p_sessions_mutex);
446 2 : if (m_unused_i2p_sessions.empty()) {
447 2 : i2p_transient_session =
448 2 : std::make_unique<i2p::sam::Session>(proxy, &interruptNet);
449 2 : } else {
450 0 : i2p_transient_session.swap(m_unused_i2p_sessions.front());
451 0 : m_unused_i2p_sessions.pop();
452 : }
453 2 : }
454 2 : connected = i2p_transient_session->Connect(addrConnect, conn, proxyConnectionFailed);
455 2 : if (!connected) {
456 2 : LOCK(m_unused_i2p_sessions_mutex);
457 2 : if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) {
458 2 : m_unused_i2p_sessions.emplace(i2p_transient_session.release());
459 2 : }
460 2 : }
461 : }
462 :
463 8 : if (connected) {
464 0 : sock = std::move(conn.sock);
465 0 : addr_bind = CAddress{conn.me, NODE_NONE};
466 0 : }
467 5174 : } else if (use_proxy) {
468 26 : LogPrintLevel(BCLog::PROXY, BCLog::Level::Debug, "Using proxy: %s to connect to %s:%s\n", proxy.ToString(), addrConnect.ToStringAddr(), addrConnect.GetPort());
469 26 : sock = ConnectThroughProxy(proxy, addrConnect.ToStringAddr(), addrConnect.GetPort(), proxyConnectionFailed);
470 26 : } else {
471 : // no proxy needed (none set for target network)
472 5140 : sock = ConnectDirectly(addrConnect, conn_type == ConnectionType::MANUAL);
473 : }
474 5174 : if (!proxyConnectionFailed) {
475 : // If a connection to the node was attempted, and failure (if any) is not caused by a problem connecting to
476 : // the proxy, mark this as an attempt.
477 5168 : addrman.Attempt(addrConnect, fCountFailure);
478 5168 : }
479 5760 : } else if (pszDest && GetNameProxy(proxy)) {
480 14 : std::string host;
481 14 : uint16_t port{default_port};
482 14 : SplitHostPort(std::string(pszDest), port, host);
483 : bool proxyConnectionFailed;
484 14 : sock = ConnectThroughProxy(proxy, host, port, proxyConnectionFailed);
485 14 : }
486 5760 : if (!sock) {
487 971 : return nullptr;
488 : }
489 :
490 : // Add node
491 4789 : NodeId id = GetNewNodeId();
492 4789 : uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
493 4789 : if (!addr_bind.IsValid()) {
494 4789 : addr_bind = GetBindAddress(*sock);
495 4789 : }
496 9578 : CNode* pnode = new CNode(id,
497 4789 : std::move(sock),
498 : addrConnect,
499 4789 : CalculateKeyedNetGroup(addrConnect),
500 4789 : nonce,
501 : addr_bind,
502 4789 : pszDest ? pszDest : "",
503 4789 : conn_type,
504 : /*inbound_onion=*/false,
505 19156 : CNodeOptions{
506 : .i2p_sam_session = std::move(i2p_transient_session),
507 : .recv_flood_size = nReceiveFloodSize,
508 : .use_v2transport = use_v2transport,
509 : });
510 : pnode->AddRef();
511 : ::g_stats_client->inc("peers.connect", 1.0f);
512 :
513 : // We're making a new connection, harvest entropy from the time (and our peer count)
514 : RandAddEvent((uint32_t)id);
515 :
516 : return pnode;
517 0 : }
518 :
519 14819 : void CNode::CloseSocketDisconnect(CConnman* connman)
520 : {
521 14819 : fDisconnect = true;
522 14819 : LOCK2(connman->cs_mapSocketToNode, m_sock_mutex);
523 14819 : if (!m_sock) {
524 4872 : return;
525 : }
526 :
527 9947 : fHasRecvData = false;
528 9947 : fCanSendData = false;
529 :
530 9947 : connman->mapSocketToNode.erase(m_sock->Get());
531 : {
532 9947 : LOCK(connman->cs_sendable_receivable_nodes);
533 9947 : connman->mapReceivableNodes.erase(GetId());
534 9947 : connman->mapSendableNodes.erase(GetId());
535 9947 : }
536 :
537 9947 : if (connman->m_edge_trig_events) {
538 9947 : connman->m_edge_trig_events->UnregisterEvents(m_sock->Get());
539 9947 : }
540 :
541 9947 : LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
542 9947 : m_sock.reset();
543 9947 : m_i2p_sam_session.reset();
544 :
545 9947 : ::g_stats_client->inc("peers.disconnect", 1.0f);
546 14819 : }
547 :
548 6638 : void CConnman::AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const {
549 7150 : for (const auto& subnet : vWhitelistedRange) {
550 512 : if (subnet.m_subnet.Match(addr)) NetPermissions::AddFlag(flags, subnet.m_flags);
551 : }
552 6638 : }
553 :
554 2365089 : bool CNode::IsBlockRelayOnly() const {
555 2365089 : bool ignores_incoming_txs{gArgs.GetBoolArg("-blocksonly", DEFAULT_BLOCKSONLY)};
556 : // Stop processing non-block data early if
557 : // 1) We are in blocks only mode and peer has no relay permission
558 : // 2) This peer is a block-relay-only peer
559 2365089 : return (ignores_incoming_txs && !HasPermission(NetPermissionFlags::Relay)) || IsBlockOnlyConn();
560 0 : }
561 :
562 95175 : CService CNode::GetAddrLocal() const
563 : {
564 95175 : AssertLockNotHeld(m_addr_local_mutex);
565 95175 : LOCK(m_addr_local_mutex);
566 95175 : return addrLocal;
567 95175 : }
568 :
569 9011 : void CNode::SetAddrLocal(const CService& addrLocalIn) {
570 9011 : AssertLockNotHeld(m_addr_local_mutex);
571 9011 : LOCK(m_addr_local_mutex);
572 9011 : if (addrLocal.IsValid()) {
573 0 : error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToStringAddrPort(), addrLocalIn.ToStringAddrPort());
574 0 : } else {
575 9011 : addrLocal = addrLocalIn;
576 : }
577 9011 : }
578 :
579 942 : std::string CNode::GetLogString() const
580 : {
581 942 : return fLogIPs ? addr.ToStringAddrPort() : strprintf("%d", id);
582 : }
583 :
584 91153 : Network CNode::ConnectedThroughNetwork() const
585 : {
586 91153 : return m_inbound_onion ? NET_ONION : addr.GetNetClass();
587 : }
588 :
589 33 : bool CNode::IsConnectedThroughPrivacyNet() const
590 : {
591 33 : return m_inbound_onion || addr.IsPrivacyNet();
592 : }
593 :
594 : #undef X
595 : #define X(name) stats.name = name
596 86139 : void CNode::CopyStats(CNodeStats& stats) const
597 : {
598 86139 : stats.nodeid = this->GetId();
599 86139 : X(addr);
600 86139 : X(addrBind);
601 86139 : stats.m_network = ConnectedThroughNetwork();
602 86139 : X(m_last_send);
603 86139 : X(m_last_recv);
604 86139 : X(m_last_tx_time);
605 86139 : X(m_last_block_time);
606 86139 : X(m_connected);
607 86139 : X(nTimeOffset);
608 86139 : X(m_addr_name);
609 86139 : X(nVersion);
610 : {
611 86139 : LOCK(m_subver_mutex);
612 86139 : X(cleanSubVer);
613 86139 : }
614 86139 : stats.fInbound = IsInboundConn();
615 86139 : X(m_bip152_highbandwidth_to);
616 86139 : X(m_bip152_highbandwidth_from);
617 : {
618 86139 : LOCK(cs_vSend);
619 86139 : X(mapSendBytesPerMsgType);
620 86139 : X(nSendBytes);
621 86139 : }
622 : {
623 86139 : LOCK(cs_vRecv);
624 86139 : X(mapRecvBytesPerMsgType);
625 86139 : X(nRecvBytes);
626 86139 : Transport::Info info = m_transport->GetInfo();
627 86139 : stats.m_transport_type = info.transport_type;
628 86139 : if (info.session_id) stats.m_session_id = HexStr(*info.session_id);
629 86139 : }
630 86139 : X(m_permission_flags);
631 :
632 86139 : X(m_last_ping_time);
633 86139 : X(m_min_ping_time);
634 :
635 : // Leave string empty if addrLocal invalid (not filled in yet)
636 86139 : CService addrLocalUnlocked = GetAddrLocal();
637 86139 : stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToStringAddrPort() : "";
638 :
639 : {
640 86139 : LOCK(cs_mnauth);
641 86139 : X(verifiedProRegTxHash);
642 86139 : X(verifiedPubKeyHash);
643 86139 : }
644 86139 : X(m_masternode_connection);
645 86139 : X(m_conn_type);
646 86139 : }
647 : #undef X
648 :
649 793101 : bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
650 : {
651 793101 : complete = false;
652 793101 : const auto time = GetTime<std::chrono::microseconds>();
653 793101 : LOCK(cs_vRecv);
654 793101 : m_last_recv = std::chrono::duration_cast<std::chrono::seconds>(time);
655 793101 : nRecvBytes += msg_bytes.size();
656 2451356 : while (msg_bytes.size() > 0) {
657 : // absorb network data
658 1658275 : if (!m_transport->ReceivedBytes(msg_bytes)) {
659 : // Serious transport problem, disconnect from the peer.
660 20 : return false;
661 : }
662 :
663 1658255 : if (m_transport->ReceivedMessageComplete()) {
664 : // decompose a transport agnostic CNetMessage from the deserializer
665 821059 : bool reject_message{false};
666 821059 : CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
667 821059 : if (reject_message) {
668 : // Message deserialization failed. Drop the message but don't disconnect the peer.
669 : // store the size of the corrupt message
670 164 : mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size;
671 164 : continue;
672 : }
673 :
674 : // Store received bytes per message type.
675 : // To prevent a memory DOS, only allow known message types.
676 820895 : auto i = mapRecvBytesPerMsgType.find(msg.m_type);
677 820895 : if (i == mapRecvBytesPerMsgType.end()) {
678 12 : i = mapRecvBytesPerMsgType.find(NET_MESSAGE_TYPE_OTHER);
679 12 : }
680 820895 : assert(i != mapRecvBytesPerMsgType.end());
681 820895 : i->second += msg.m_raw_message_size;
682 820895 : ::g_stats_client->count("bandwidth.message." + std::string(msg.m_type) + ".bytesReceived", msg.m_raw_message_size, 1.0f);
683 :
684 : // push the message to the process queue,
685 820895 : vRecvMsg.push_back(std::move(msg));
686 :
687 820895 : complete = true;
688 821059 : }
689 : }
690 :
691 793081 : return true;
692 793101 : }
693 :
694 40262 : V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept :
695 20130 : m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn)
696 20130 : {
697 : assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes));
698 : std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes);
699 : LOCK(m_recv_mutex);
700 : Reset();
701 10065 : }
702 :
703 91516 : Transport::Info V1Transport::GetInfo() const noexcept
704 : {
705 91516 : return {.transport_type = TransportProtocolType::V1, .session_id = {}};
706 : }
707 :
708 807987 : int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
709 : {
710 807987 : AssertLockHeld(m_recv_mutex);
711 : // copy data to temporary parsing buffer
712 807987 : unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
713 807987 : unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
714 :
715 807987 : memcpy(&hdrbuf[nHdrPos], msg_bytes.data(), nCopy);
716 807987 : nHdrPos += nCopy;
717 :
718 : // if header incomplete, exit
719 807987 : if (nHdrPos < CMessageHeader::HEADER_SIZE)
720 20 : return nCopy;
721 :
722 : // deserialize to CMessageHeader
723 : try {
724 807967 : hdrbuf >> hdr;
725 807967 : }
726 : catch (const std::exception&) {
727 0 : LogPrint(BCLog::NET, "Header error: Unable to deserialize, peer=%d\n", m_node_id);
728 0 : return -1;
729 0 : }
730 :
731 : // Check start string, network magic
732 807967 : if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) {
733 4 : LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id);
734 4 : return -1;
735 : }
736 :
737 : // reject messages larger than MAX_SIZE or MAX_PROTOCOL_MESSAGE_LENGTH
738 807963 : if (hdr.nMessageSize > MAX_SIZE || hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) {
739 6 : LogPrint(BCLog::NET, "Header error: Size too large (%s, %u bytes), peer=%d\n", SanitizeString(hdr.GetCommand()), hdr.nMessageSize, m_node_id);
740 6 : return -1;
741 : }
742 :
743 : // switch state to reading message data
744 807957 : in_data = true;
745 :
746 807957 : return nCopy;
747 807987 : }
748 :
749 836375 : int V1Transport::readData(Span<const uint8_t> msg_bytes)
750 : {
751 836375 : AssertLockHeld(m_recv_mutex);
752 836375 : unsigned int nRemaining = hdr.nMessageSize - nDataPos;
753 836375 : unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
754 :
755 836375 : if (vRecv.size() < nDataPos + nCopy) {
756 : // Allocate up to 256 KiB ahead, but never more than the total message size.
757 789444 : vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
758 789444 : }
759 :
760 836375 : hasher.Write(msg_bytes.first(nCopy));
761 836375 : memcpy(&vRecv[nDataPos], msg_bytes.data(), nCopy);
762 836375 : nDataPos += nCopy;
763 :
764 836375 : return nCopy;
765 : }
766 :
767 807956 : const uint256& V1Transport::GetMessageHash() const
768 : {
769 807956 : AssertLockHeld(m_recv_mutex);
770 807956 : assert(CompleteInternal());
771 807956 : if (data_hash.IsNull())
772 807956 : hasher.Finalize(data_hash);
773 807956 : return data_hash;
774 : }
775 :
776 807956 : CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message)
777 : {
778 807956 : AssertLockNotHeld(m_recv_mutex);
779 : // Initialize out parameter
780 807956 : reject_message = false;
781 : // decompose a single CNetMessage from the TransportDeserializer
782 807956 : LOCK(m_recv_mutex);
783 807956 : CNetMessage msg(std::move(vRecv));
784 :
785 : // store message type string, time, and sizes
786 807956 : msg.m_type = hdr.GetCommand();
787 807956 : msg.m_time = time;
788 807956 : msg.m_message_size = hdr.nMessageSize;
789 807956 : msg.m_raw_message_size = hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
790 :
791 807956 : uint256 hash = GetMessageHash();
792 :
793 : // We just received a message off the wire, harvest entropy from the time (and the message checksum)
794 807956 : RandAddEvent(ReadLE32(hash.begin()));
795 :
796 : // Check checksum and header message type string
797 807956 : if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) {
798 2 : LogPrint(BCLog::NET, "Header error: Wrong checksum (%s, %u bytes), expected %s was %s, peer=%d\n",
799 : SanitizeString(msg.m_type), msg.m_message_size,
800 : HexStr(Span{hash}.first(CMessageHeader::CHECKSUM_SIZE)),
801 : HexStr(hdr.pchChecksum),
802 : m_node_id);
803 2 : reject_message = true;
804 807956 : } else if (!hdr.IsCommandValid()) {
805 162 : LogPrint(BCLog::NET, "Header error: Invalid message type (%s, %u bytes), peer=%d\n",
806 : SanitizeString(hdr.GetCommand()), msg.m_message_size, m_node_id);
807 162 : reject_message = true;
808 162 : }
809 :
810 : // Always reset the network deserializer (prepare for the next message)
811 807956 : Reset();
812 807956 : return msg;
813 807956 : }
814 :
815 827966 : bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
816 : {
817 827966 : AssertLockNotHeld(m_send_mutex);
818 : // Determine whether a new message can be set.
819 827966 : LOCK(m_send_mutex);
820 827966 : if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
821 :
822 : // create dbl-sha256 checksum
823 790584 : uint256 hash = Hash(msg.data);
824 :
825 : // create header
826 790584 : CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size());
827 790584 : memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
828 :
829 : // serialize header
830 790584 : m_header_to_send.clear();
831 790584 : CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
832 :
833 : // update state
834 790584 : m_message_to_send = std::move(msg);
835 790584 : m_sending_header = true;
836 790584 : m_bytes_sent = 0;
837 790584 : return true;
838 827966 : }
839 :
840 82078554 : Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept
841 : {
842 82080358 : AssertLockNotHeld(m_send_mutex);
843 82076750 : LOCK(m_send_mutex);
844 82077387 : if (m_sending_header) {
845 1534063 : return {Span{m_header_to_send}.subspan(m_bytes_sent),
846 : // We have more to send after the header if the message has payload, or if there
847 : // is a next message after that.
848 790596 : have_next_message || !m_message_to_send.data.empty(),
849 790596 : m_message_to_send.m_type
850 : };
851 : } else {
852 81286791 : return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
853 : // We only have more to send after this message's payload if there is another
854 : // message.
855 : have_next_message,
856 81287370 : m_message_to_send.m_type
857 : };
858 : }
859 82077966 : }
860 :
861 1549551 : void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
862 : {
863 1549551 : AssertLockNotHeld(m_send_mutex);
864 1549551 : LOCK(m_send_mutex);
865 1549551 : m_bytes_sent += bytes_sent;
866 1549551 : if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
867 : // We're done sending a message's header. Switch to sending its data bytes.
868 790583 : m_sending_header = false;
869 790583 : m_bytes_sent = 0;
870 1549551 : } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
871 : // We're done sending a message's data. Wipe the data vector to reduce memory consumption.
872 758893 : ClearShrink(m_message_to_send.data);
873 758893 : m_bytes_sent = 0;
874 758893 : }
875 1549551 : }
876 :
877 36667483 : size_t V1Transport::GetSendMemoryUsage() const noexcept
878 : {
879 36667631 : AssertLockNotHeld(m_send_mutex);
880 36667393 : LOCK(m_send_mutex);
881 : // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded.
882 36667245 : return m_message_to_send.GetMemoryUsage();
883 36667245 : }
884 :
885 : namespace {
886 :
887 : /** List of short messages as defined in BIP324, in order.
888 : *
889 : * Only message types that are actually implemented in this codebase need to be listed, as other
890 : * messages get ignored anyway - whether we know how to decode them or not.
891 : */
892 0 : const std::array<std::string, 33> V2_BITCOIN_IDS = {
893 3308 : "", // 12 bytes follow encoding the message type like in V1
894 3308 : NetMsgType::ADDR,
895 3308 : NetMsgType::BLOCK,
896 3308 : NetMsgType::BLOCKTXN,
897 3308 : NetMsgType::CMPCTBLOCK,
898 3308 : "", /* FEEFILTER is not implemented in Dash */
899 3308 : NetMsgType::FILTERADD,
900 3308 : NetMsgType::FILTERCLEAR,
901 3308 : NetMsgType::FILTERLOAD,
902 3308 : NetMsgType::GETBLOCKS,
903 3308 : NetMsgType::GETBLOCKTXN,
904 3308 : NetMsgType::GETDATA,
905 3308 : NetMsgType::GETHEADERS,
906 3308 : NetMsgType::HEADERS,
907 3308 : NetMsgType::INV,
908 3308 : NetMsgType::MEMPOOL,
909 3308 : NetMsgType::MERKLEBLOCK,
910 3308 : NetMsgType::NOTFOUND,
911 3308 : NetMsgType::PING,
912 3308 : NetMsgType::PONG,
913 3308 : NetMsgType::SENDCMPCT,
914 3308 : NetMsgType::TX,
915 3308 : NetMsgType::GETCFILTERS,
916 3308 : NetMsgType::CFILTER,
917 3308 : NetMsgType::GETCFHEADERS,
918 3308 : NetMsgType::CFHEADERS,
919 3308 : NetMsgType::GETCFCHECKPT,
920 3308 : NetMsgType::CFCHECKPT,
921 3308 : NetMsgType::ADDRV2,
922 : // Unimplemented message types that are assigned in BIP324:
923 3308 : "",
924 3308 : "",
925 3308 : "",
926 3308 : ""
927 : };
928 :
929 : /** List of short messages allocated in Dash's reserved namespace, in order.
930 : *
931 : * Slots should not be reused unless the switchover has already been done
932 : * by a protocol upgrade, the old message is no longer supported by the client
933 : * and a new slot wasn't already allotted for the message.
934 : */
935 0 : const std::array<std::string, 41> V2_DASH_IDS = {
936 3308 : NetMsgType::SPORK,
937 3308 : NetMsgType::GETSPORKS,
938 3308 : NetMsgType::SENDDSQUEUE,
939 3308 : NetMsgType::DSACCEPT,
940 3308 : NetMsgType::DSVIN,
941 3308 : NetMsgType::DSFINALTX,
942 3308 : NetMsgType::DSSIGNFINALTX,
943 3308 : NetMsgType::DSCOMPLETE,
944 3308 : NetMsgType::DSSTATUSUPDATE,
945 3308 : NetMsgType::DSTX,
946 3308 : NetMsgType::DSQUEUE,
947 3308 : NetMsgType::SYNCSTATUSCOUNT,
948 3308 : NetMsgType::MNGOVERNANCESYNC,
949 3308 : NetMsgType::MNGOVERNANCEOBJECT,
950 3308 : NetMsgType::MNGOVERNANCEOBJECTVOTE,
951 3308 : NetMsgType::GETMNLISTDIFF,
952 3308 : NetMsgType::MNLISTDIFF,
953 3308 : NetMsgType::QSENDRECSIGS,
954 3308 : NetMsgType::QFCOMMITMENT,
955 3308 : NetMsgType::QCONTRIB,
956 3308 : NetMsgType::QCOMPLAINT,
957 3308 : NetMsgType::QJUSTIFICATION,
958 3308 : NetMsgType::QPCOMMITMENT,
959 3308 : NetMsgType::QWATCH,
960 3308 : NetMsgType::QSIGSESANN,
961 3308 : NetMsgType::QSIGSHARESINV,
962 3308 : NetMsgType::QGETSIGSHARES,
963 3308 : NetMsgType::QBSIGSHARES,
964 3308 : NetMsgType::QSIGREC,
965 3308 : NetMsgType::QSIGSHARE,
966 3308 : NetMsgType::QGETDATA,
967 3308 : NetMsgType::QDATA,
968 3308 : NetMsgType::CLSIG,
969 3308 : NetMsgType::ISDLOCK,
970 3308 : NetMsgType::MNAUTH,
971 3308 : NetMsgType::GETHEADERS2,
972 3308 : NetMsgType::SENDHEADERS2,
973 3308 : NetMsgType::HEADERS2,
974 3308 : NetMsgType::GETQUORUMROTATIONINFO,
975 3308 : NetMsgType::QUORUMROTATIONINFO,
976 3308 : NetMsgType::PLATFORMBAN
977 : };
978 :
979 : /** Explicit version requirements for Dash v2 short IDs added after baseline.
980 : *
981 : * Maps message type to the minimum protocol version required to use its short ID.
982 : * Only contains messages that were added AFTER the initial v2 implementation.
983 : * Messages not in this map are baseline messages available at BIP324_DASH_BASELINE_VERSION.
984 : *
985 : * IMPORTANT:
986 : * - Never remove entries (historical compatibility)
987 : * - Add new messages as they are introduced
988 : */
989 3308 : static const std::map<std::string, int> V2_NEW_SHORT_ID_VERSIONS = {
990 3308 : {NetMsgType::PLATFORMBAN, PLATFORMBAN_V2_SHORT_ID_VERSION},
991 : };
992 :
993 : /** Get the minimum protocol version required for a message's short ID.
994 : *
995 : * @param message_type The message type to check
996 : * @return Minimum protocol version required
997 : */
998 13617 : int GetMessageMinVersion(const std::string& message_type)
999 : {
1000 13617 : auto it = V2_NEW_SHORT_ID_VERSIONS.find(message_type);
1001 13617 : if (it != V2_NEW_SHORT_ID_VERSIONS.end()) {
1002 2 : return it->second;
1003 : }
1004 : // Not in map means it's a baseline message
1005 13615 : return BIP324_DASH_BASELINE_VERSION;
1006 13617 : }
1007 :
1008 : /** A complete set of short IDs
1009 : *
1010 : * Bitcoin takes up short IDs up to 128 (lower half) while Dash can take
1011 : * up short IDs between 128 and 256 (upper half) most of the array will
1012 : * have entries that correspond to nothing.
1013 : *
1014 : * To distinguish between entries that are *meant* to correspond to
1015 : * nothing versus empty space, use IsValidV2ShortID()
1016 : */
1017 1100695 : constexpr std::array<std::string_view, 256> V2ShortIDs() {
1018 : static_assert(std::size(V2_BITCOIN_IDS) <= 128);
1019 : static_assert(std::size(V2_DASH_IDS) <= 128);
1020 :
1021 1100695 : std::array<std::string_view, 256> ret{};
1022 1100695 : std::fill(ret.begin(), ret.end(), "");
1023 1100695 : std::copy(V2_BITCOIN_IDS.begin(), V2_BITCOIN_IDS.end(), ret.begin());
1024 1100695 : std::copy(V2_DASH_IDS.begin(), V2_DASH_IDS.end(), ret.begin() + 128);
1025 1100695 : return ret;
1026 : }
1027 :
1028 855904 : bool IsValidV2ShortID(uint8_t first_byte) {
1029 : // Since we have filled the namespace of short IDs, we have to preserve
1030 : // the expected behaviour of coming up short when going beyond Bitcoin's
1031 : // and Dash's *used* slots. We do this by checking if the byte is within
1032 : // the range where a valid message is expected to reside.
1033 1594544 : return first_byte < std::size(V2_BITCOIN_IDS) ||
1034 738640 : (first_byte >= 128 && static_cast<uint8_t>(first_byte - 128) < std::size(V2_DASH_IDS));
1035 : }
1036 :
1037 : class V2MessageMap
1038 : {
1039 : std::unordered_map<std::string, uint8_t> m_map;
1040 :
1041 : public:
1042 6616 : V2MessageMap() noexcept
1043 3308 : {
1044 846848 : for (size_t i = 1; i < std::size(V2ShortIDs()); ++i) {
1045 843540 : if (IsValidV2ShortID(i)) {
1046 241484 : m_map.emplace(V2ShortIDs()[i], i);
1047 241484 : }
1048 843540 : }
1049 6616 : }
1050 :
1051 14574 : std::optional<uint8_t> operator()(const std::string& message_name) const noexcept
1052 : {
1053 14574 : auto it = m_map.find(message_name);
1054 14574 : if (it == m_map.end()) return std::nullopt;
1055 13617 : return it->second;
1056 14574 : }
1057 : };
1058 :
1059 3308 : const V2MessageMap V2_MESSAGE_MAP;
1060 :
1061 379 : std::vector<uint8_t> GenerateRandomGarbage() noexcept
1062 : {
1063 379 : std::vector<uint8_t> ret;
1064 379 : FastRandomContext rng;
1065 379 : ret.resize(rng.randrange(V2Transport::MAX_GARBAGE_LEN + 1));
1066 379 : rng.fillrand(MakeWritableByteSpan(ret));
1067 379 : return ret;
1068 379 : }
1069 :
1070 : } // namespace
1071 :
1072 366 : void V2Transport::StartSendingHandshake() noexcept
1073 : {
1074 366 : AssertLockHeld(m_send_mutex);
1075 366 : Assume(m_send_state == SendState::AWAITING_KEY);
1076 366 : Assume(m_send_buffer.empty());
1077 : // Initialize the send buffer with ellswift pubkey + provided garbage.
1078 366 : m_send_buffer.resize(EllSwiftPubKey::size() + m_send_garbage.size());
1079 366 : std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin());
1080 366 : std::copy(m_send_garbage.begin(), m_send_garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size());
1081 : // We cannot wipe m_send_garbage as it will still be used as AAD later in the handshake.
1082 366 : }
1083 :
1084 1137 : V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span<const std::byte> ent32, std::vector<uint8_t> garbage) noexcept :
1085 379 : m_cipher{key, ent32}, m_initiating{initiating}, m_nodeid{nodeid},
1086 379 : m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in},
1087 : m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1},
1088 : m_send_garbage{std::move(garbage)},
1089 : m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1}
1090 758 : {
1091 : Assume(m_send_garbage.size() <= MAX_GARBAGE_LEN);
1092 : // Start sending immediately if we're the initiator of the connection.
1093 : if (initiating) {
1094 : LOCK(m_send_mutex);
1095 : StartSendingHandshake();
1096 : }
1097 379 : }
1098 :
1099 379 : V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in) noexcept :
1100 758 : V2Transport{nodeid, initiating, type_in, version_in, GenerateRandomKey(),
1101 758 : MakeByteSpan(GetRandHash()), GenerateRandomGarbage()} { }
1102 :
1103 27976 : void V2Transport::SetReceiveState(RecvState recv_state) noexcept
1104 : {
1105 27976 : AssertLockHeld(m_recv_mutex);
1106 : // Enforce allowed state transitions.
1107 27976 : switch (m_recv_state) {
1108 : case RecvState::KEY_MAYBE_V1:
1109 219 : Assume(recv_state == RecvState::KEY || recv_state == RecvState::V1);
1110 219 : break;
1111 : case RecvState::KEY:
1112 357 : Assume(recv_state == RecvState::GARB_GARBTERM);
1113 357 : break;
1114 : case RecvState::GARB_GARBTERM:
1115 347 : Assume(recv_state == RecvState::VERSION);
1116 347 : break;
1117 : case RecvState::VERSION:
1118 343 : Assume(recv_state == RecvState::APP);
1119 343 : break;
1120 : case RecvState::APP:
1121 13355 : Assume(recv_state == RecvState::APP_READY);
1122 13355 : break;
1123 : case RecvState::APP_READY:
1124 13355 : Assume(recv_state == RecvState::APP);
1125 13355 : break;
1126 : case RecvState::V1:
1127 0 : Assume(false); // V1 state cannot be left
1128 0 : break;
1129 : }
1130 : // Change state.
1131 27976 : m_recv_state = recv_state;
1132 27976 : }
1133 :
1134 576 : void V2Transport::SetSendState(SendState send_state) noexcept
1135 : {
1136 576 : AssertLockHeld(m_send_mutex);
1137 : // Enforce allowed state transitions.
1138 576 : switch (m_send_state) {
1139 : case SendState::MAYBE_V1:
1140 219 : Assume(send_state == SendState::V1 || send_state == SendState::AWAITING_KEY);
1141 219 : break;
1142 : case SendState::AWAITING_KEY:
1143 357 : Assume(send_state == SendState::READY);
1144 357 : break;
1145 : case SendState::READY:
1146 : case SendState::V1:
1147 0 : Assume(false); // Final states
1148 0 : break;
1149 : }
1150 : // Change state.
1151 576 : m_send_state = send_state;
1152 576 : }
1153 :
1154 126436 : bool V2Transport::ReceivedMessageComplete() const noexcept
1155 : {
1156 126436 : AssertLockNotHeld(m_recv_mutex);
1157 126436 : LOCK(m_recv_mutex);
1158 126436 : if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedMessageComplete();
1159 :
1160 117445 : return m_recv_state == RecvState::APP_READY;
1161 126436 : }
1162 :
1163 223 : void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept
1164 : {
1165 223 : AssertLockHeld(m_recv_mutex);
1166 223 : AssertLockNotHeld(m_send_mutex);
1167 223 : Assume(m_recv_state == RecvState::KEY_MAYBE_V1);
1168 : // We still have to determine if this is a v1 or v2 connection. The bytes being received could
1169 : // be the beginning of either a v1 packet (network magic + "version\x00\x00\x00\x00\x00"), or
1170 : // of a v2 public key. BIP324 specifies that a mismatch with this 16-byte string should trigger
1171 : // sending of the key.
1172 223 : std::array<uint8_t, V1_PREFIX_LEN> v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0};
1173 223 : std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), v1_prefix.begin());
1174 223 : Assume(m_recv_buffer.size() <= v1_prefix.size());
1175 223 : if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), v1_prefix.begin())) {
1176 : // Mismatch with v1 prefix, so we can assume a v2 connection.
1177 208 : SetReceiveState(RecvState::KEY); // Convert to KEY state, leaving received bytes around.
1178 : // Transition the sender to AWAITING_KEY state and start sending.
1179 208 : LOCK(m_send_mutex);
1180 208 : SetSendState(SendState::AWAITING_KEY);
1181 208 : StartSendingHandshake();
1182 223 : } else if (m_recv_buffer.size() == v1_prefix.size()) {
1183 : // Full match with the v1 prefix, so fall back to v1 behavior.
1184 11 : LOCK(m_send_mutex);
1185 11 : Span<const uint8_t> feedback{m_recv_buffer};
1186 : // Feed already received bytes to v1 transport. It should always accept these, because it's
1187 : // less than the size of a v1 header, and these are the first bytes fed to m_v1_fallback.
1188 11 : bool ret = m_v1_fallback.ReceivedBytes(feedback);
1189 11 : Assume(feedback.empty());
1190 11 : Assume(ret);
1191 11 : SetReceiveState(RecvState::V1);
1192 11 : SetSendState(SendState::V1);
1193 : // Reset v2 transport buffers to save memory.
1194 11 : ClearShrink(m_recv_buffer);
1195 11 : ClearShrink(m_send_buffer);
1196 11 : } else {
1197 : // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come.
1198 : }
1199 223 : }
1200 :
1201 443 : bool V2Transport::ProcessReceivedKeyBytes() noexcept
1202 : {
1203 443 : AssertLockHeld(m_recv_mutex);
1204 443 : AssertLockNotHeld(m_send_mutex);
1205 443 : Assume(m_recv_state == RecvState::KEY);
1206 443 : Assume(m_recv_buffer.size() <= EllSwiftPubKey::size());
1207 :
1208 : // As a special exception, if bytes 4-16 of the key on a responder connection match the
1209 : // corresponding bytes of a V1 version message, but bytes 0-4 don't match the network magic
1210 : // (if they did, we'd have switched to V1 state already), assume this is a peer from
1211 : // another network, and disconnect them. They will almost certainly disconnect us too when
1212 : // they receive our uniformly random key and garbage, but detecting this case specially
1213 : // means we can log it.
1214 : static constexpr std::array<uint8_t, 12> MATCH = {'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0};
1215 : static constexpr size_t OFFSET = sizeof(CMessageHeader::MessageStartChars);
1216 443 : if (!m_initiating && m_recv_buffer.size() >= OFFSET + MATCH.size()) {
1217 262 : if (std::equal(MATCH.begin(), MATCH.end(), m_recv_buffer.begin() + OFFSET)) {
1218 3 : LogPrint(BCLog::NET, "V2 transport error: V1 peer with wrong MessageStart %s\n",
1219 : HexStr(Span(m_recv_buffer).first(OFFSET)));
1220 3 : return false;
1221 : }
1222 259 : }
1223 :
1224 440 : if (m_recv_buffer.size() == EllSwiftPubKey::size()) {
1225 : // Other side's key has been fully received, and can now be Diffie-Hellman combined with
1226 : // our key to initialize the encryption ciphers.
1227 :
1228 : // Initialize the ciphers.
1229 357 : EllSwiftPubKey ellswift(MakeByteSpan(m_recv_buffer));
1230 357 : LOCK(m_send_mutex);
1231 357 : m_cipher.Initialize(ellswift, m_initiating);
1232 :
1233 : // Switch receiver state to GARB_GARBTERM.
1234 357 : SetReceiveState(RecvState::GARB_GARBTERM);
1235 357 : m_recv_buffer.clear();
1236 :
1237 : // Switch sender state to READY.
1238 357 : SetSendState(SendState::READY);
1239 :
1240 : // Append the garbage terminator to the send buffer.
1241 357 : m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
1242 714 : std::copy(m_cipher.GetSendGarbageTerminator().begin(),
1243 357 : m_cipher.GetSendGarbageTerminator().end(),
1244 357 : MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin());
1245 :
1246 : // Construct version packet in the send buffer, with the sent garbage data as AAD.
1247 357 : m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size());
1248 357 : m_cipher.Encrypt(
1249 357 : /*contents=*/VERSION_CONTENTS,
1250 357 : /*aad=*/MakeByteSpan(m_send_garbage),
1251 : /*ignore=*/false,
1252 357 : /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()));
1253 : // We no longer need the garbage.
1254 357 : ClearShrink(m_send_garbage);
1255 357 : } else {
1256 : // We still have to receive more key bytes.
1257 : }
1258 440 : return true;
1259 443 : }
1260 :
1261 740093 : bool V2Transport::ProcessReceivedGarbageBytes() noexcept
1262 : {
1263 740093 : AssertLockHeld(m_recv_mutex);
1264 740093 : Assume(m_recv_state == RecvState::GARB_GARBTERM);
1265 740093 : Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
1266 740093 : if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
1267 734738 : if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) {
1268 : // Garbage terminator received. Store garbage to authenticate it as AAD later.
1269 347 : m_recv_aad = std::move(m_recv_buffer);
1270 347 : m_recv_aad.resize(m_recv_aad.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN);
1271 347 : m_recv_buffer.clear();
1272 347 : SetReceiveState(RecvState::VERSION);
1273 734738 : } else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
1274 : // We've reached the maximum length for garbage + garbage terminator, and the
1275 : // terminator still does not match. Abort.
1276 6 : LogPrint(BCLog::NET, "V2 transport error: missing garbage terminator, peer=%d\n", m_nodeid);
1277 6 : return false;
1278 : } else {
1279 : // We still need to receive more garbage and/or garbage terminator bytes.
1280 : }
1281 734732 : } else {
1282 : // We have less than GARBAGE_TERMINATOR_LEN (16) bytes, so we certainly need to receive
1283 : // more first.
1284 : }
1285 740087 : return true;
1286 740093 : }
1287 :
1288 128149 : bool V2Transport::ProcessReceivedPacketBytes() noexcept
1289 : {
1290 128149 : AssertLockHeld(m_recv_mutex);
1291 128149 : Assume(m_recv_state == RecvState::VERSION || m_recv_state == RecvState::APP);
1292 :
1293 : // The maximum permitted contents length for a packet, consisting of:
1294 : // - 0x00 byte: indicating long message type encoding
1295 : // - 12 bytes of message type
1296 : // - payload
1297 : static constexpr size_t MAX_CONTENTS_LEN =
1298 : 1 + CMessageHeader::COMMAND_SIZE +
1299 : std::min<size_t>(MAX_SIZE, MAX_PROTOCOL_MESSAGE_LENGTH);
1300 :
1301 128149 : if (m_recv_buffer.size() == BIP324Cipher::LENGTH_LEN) {
1302 : // Length descriptor received.
1303 63320 : m_recv_len = m_cipher.DecryptLength(MakeByteSpan(m_recv_buffer));
1304 63320 : if (m_recv_len > MAX_CONTENTS_LEN) {
1305 10 : LogPrint(BCLog::NET, "V2 transport error: packet too large (%u bytes), peer=%d\n", m_recv_len, m_nodeid);
1306 10 : return false;
1307 : }
1308 128139 : } else if (m_recv_buffer.size() > BIP324Cipher::LENGTH_LEN && m_recv_buffer.size() == m_recv_len + BIP324Cipher::EXPANSION) {
1309 : // Ciphertext received, decrypt it into m_recv_decode_buffer.
1310 : // Note that it is impossible to reach this branch without hitting the branch above first,
1311 : // as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point.
1312 63310 : m_recv_decode_buffer.resize(m_recv_len);
1313 63310 : bool ignore{false};
1314 126620 : bool ret = m_cipher.Decrypt(
1315 63310 : /*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN),
1316 63310 : /*aad=*/MakeByteSpan(m_recv_aad),
1317 : /*ignore=*/ignore,
1318 63310 : /*contents=*/MakeWritableByteSpan(m_recv_decode_buffer));
1319 63310 : if (!ret) {
1320 14 : LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid);
1321 14 : return false;
1322 : }
1323 : // We have decrypted a valid packet with the AAD we expected, so clear the expected AAD.
1324 63296 : ClearShrink(m_recv_aad);
1325 : // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG.
1326 63296 : RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4));
1327 :
1328 : // At this point we have a valid packet decrypted into m_recv_decode_buffer. If it's not a
1329 : // decoy, which we simply ignore, use the current state to decide what to do with it.
1330 63296 : if (!ignore) {
1331 13698 : switch (m_recv_state) {
1332 : case RecvState::VERSION:
1333 : // Version message received; transition to application phase. The contents is
1334 : // ignored, but can be used for future extensions.
1335 343 : SetReceiveState(RecvState::APP);
1336 343 : break;
1337 : case RecvState::APP:
1338 : // Application message decrypted correctly. It can be extracted using GetMessage().
1339 13355 : SetReceiveState(RecvState::APP_READY);
1340 13355 : break;
1341 : default:
1342 : // Any other state is invalid (this function should not have been called).
1343 0 : Assume(false);
1344 0 : }
1345 13698 : }
1346 : // Wipe the receive buffer where the next packet will be received into.
1347 63296 : ClearShrink(m_recv_buffer);
1348 : // In all but APP_READY state, we can wipe the decoded contents.
1349 63296 : if (m_recv_state != RecvState::APP_READY) ClearShrink(m_recv_decode_buffer);
1350 63296 : } else {
1351 : // We either have less than 3 bytes, so we don't know the packet's length yet, or more
1352 : // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive.
1353 : }
1354 128125 : return true;
1355 128149 : }
1356 :
1357 870804 : size_t V2Transport::GetMaxBytesToProcess() noexcept
1358 : {
1359 870804 : AssertLockHeld(m_recv_mutex);
1360 870804 : switch (m_recv_state) {
1361 : case RecvState::KEY_MAYBE_V1:
1362 : // During the KEY_MAYBE_V1 state we do not allow more than the length of v1 prefix into the
1363 : // receive buffer.
1364 223 : Assume(m_recv_buffer.size() <= V1_PREFIX_LEN);
1365 : // As long as we're not sure if this is a v1 or v2 connection, don't receive more than what
1366 : // is strictly necessary to distinguish the two (16 bytes). If we permitted more than
1367 : // the v1 header size (24 bytes), we may not be able to feed the already-received bytes
1368 : // back into the m_v1_fallback V1 transport.
1369 223 : return V1_PREFIX_LEN - m_recv_buffer.size();
1370 : case RecvState::KEY:
1371 : // During the KEY state, we only allow the 64-byte key into the receive buffer.
1372 443 : Assume(m_recv_buffer.size() <= EllSwiftPubKey::size());
1373 : // As long as we have not received the other side's public key, don't receive more than
1374 : // that (64 bytes), as garbage follows, and locating the garbage terminator requires the
1375 : // key exchange first.
1376 443 : return EllSwiftPubKey::size() - m_recv_buffer.size();
1377 : case RecvState::GARB_GARBTERM:
1378 : // Process garbage bytes one by one (because terminator may appear anywhere).
1379 740093 : return 1;
1380 : case RecvState::VERSION:
1381 : case RecvState::APP:
1382 : // These three states all involve decoding a packet. Process the length descriptor first,
1383 : // so that we know where the current packet ends (and we don't process bytes from the next
1384 : // packet or decoy yet). Then, process the ciphertext bytes of the current packet.
1385 128149 : if (m_recv_buffer.size() < BIP324Cipher::LENGTH_LEN) {
1386 63340 : return BIP324Cipher::LENGTH_LEN - m_recv_buffer.size();
1387 : } else {
1388 : // Note that BIP324Cipher::EXPANSION is the total difference between contents size
1389 : // and encoded packet size, which includes the 3 bytes due to the packet length.
1390 : // When transitioning from receiving the packet length to receiving its ciphertext,
1391 : // the encrypted packet length is left in the receive buffer.
1392 64809 : return BIP324Cipher::EXPANSION + m_recv_len - m_recv_buffer.size();
1393 : }
1394 : case RecvState::APP_READY:
1395 : // No bytes can be processed until GetMessage() is called.
1396 1896 : return 0;
1397 : case RecvState::V1:
1398 : // Not allowed (must be dealt with by the caller).
1399 0 : Assume(false);
1400 0 : return 0;
1401 : }
1402 0 : Assume(false); // unreachable
1403 0 : return 0;
1404 870804 : }
1405 :
1406 270348 : bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept
1407 : {
1408 523754 : AssertLockNotHeld(m_recv_mutex);
1409 : /** How many bytes to allocate in the receive buffer at most above what is received so far. */
1410 : static constexpr size_t MAX_RESERVE_AHEAD = 256 * 1024;
1411 :
1412 16942 : LOCK(m_recv_mutex);
1413 16942 : if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedBytes(msg_bytes);
1414 :
1415 : // Process the provided bytes in msg_bytes in a loop. In each iteration a nonzero number of
1416 : // bytes (decided by GetMaxBytesToProcess) are taken from the beginning om msg_bytes, and
1417 : // appended to m_recv_buffer. Then, depending on the receiver state, one of the
1418 : // ProcessReceived*Bytes functions is called to process the bytes in that buffer.
1419 884956 : while (!msg_bytes.empty()) {
1420 : // Decide how many bytes to copy from msg_bytes to m_recv_buffer.
1421 870804 : size_t max_read = GetMaxBytesToProcess();
1422 :
1423 : // Reserve space in the buffer if there is not enough.
1424 870804 : if (m_recv_buffer.size() + std::min(msg_bytes.size(), max_read) > m_recv_buffer.capacity()) {
1425 127074 : switch (m_recv_state) {
1426 : case RecvState::KEY_MAYBE_V1:
1427 : case RecvState::KEY:
1428 : case RecvState::GARB_GARBTERM:
1429 : // During the initial states (key/garbage), allocate once to fit the maximum (4111
1430 : // bytes).
1431 127074 : m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
1432 371 : break;
1433 : case RecvState::VERSION:
1434 : case RecvState::APP: {
1435 : // During states where a packet is being received, as much as is expected but never
1436 : // more than MAX_RESERVE_AHEAD bytes in addition to what is received so far.
1437 : // This means attackers that want to cause us to waste allocated memory are limited
1438 : // to MAX_RESERVE_AHEAD above the largest allowed message contents size, and to
1439 : // MAX_RESERVE_AHEAD more than they've actually sent us.
1440 0 : size_t alloc_add = std::min(max_read, msg_bytes.size() + MAX_RESERVE_AHEAD);
1441 126703 : m_recv_buffer.reserve(m_recv_buffer.size() + alloc_add);
1442 126703 : break;
1443 : }
1444 : case RecvState::APP_READY:
1445 : // The buffer is empty in this state.
1446 0 : Assume(m_recv_buffer.empty());
1447 0 : break;
1448 : case RecvState::V1:
1449 : // Should have bailed out above.
1450 0 : Assume(false);
1451 0 : break;
1452 : }
1453 127074 : }
1454 :
1455 : // Can't read more than provided input.
1456 870804 : max_read = std::min(msg_bytes.size(), max_read);
1457 : // Copy data to buffer.
1458 870804 : m_recv_buffer.insert(m_recv_buffer.end(), UCharCast(msg_bytes.data()), UCharCast(msg_bytes.data() + max_read));
1459 870804 : msg_bytes = msg_bytes.subspan(max_read);
1460 :
1461 : // Process data in the buffer.
1462 870804 : switch (m_recv_state) {
1463 : case RecvState::KEY_MAYBE_V1:
1464 223 : ProcessReceivedMaybeV1Bytes();
1465 223 : if (m_recv_state == RecvState::V1) return true;
1466 212 : break;
1467 :
1468 : case RecvState::KEY:
1469 443 : if (!ProcessReceivedKeyBytes()) return false;
1470 440 : break;
1471 :
1472 : case RecvState::GARB_GARBTERM:
1473 740093 : if (!ProcessReceivedGarbageBytes()) return false;
1474 740087 : break;
1475 :
1476 : case RecvState::VERSION:
1477 : case RecvState::APP:
1478 128149 : if (!ProcessReceivedPacketBytes()) return false;
1479 128125 : break;
1480 :
1481 : case RecvState::APP_READY:
1482 1896 : return true;
1483 :
1484 : case RecvState::V1:
1485 : // We should have bailed out before.
1486 0 : Assume(false);
1487 0 : break;
1488 : }
1489 : // Make sure we have made progress before continuing.
1490 868864 : Assume(max_read > 0);
1491 : }
1492 :
1493 14152 : return true;
1494 16942 : }
1495 :
1496 13355 : std::optional<std::string> V2Transport::GetMessageType(Span<const uint8_t>& contents) noexcept
1497 : {
1498 13355 : if (contents.size() == 0) return std::nullopt; // Empty contents
1499 13355 : uint8_t first_byte = contents[0];
1500 13355 : contents = contents.subspan(1); // Strip first byte.
1501 :
1502 13355 : if (first_byte != 0) {
1503 : // Short (1 byte) encoding.
1504 12364 : if (IsValidV2ShortID(first_byte)) {
1505 : // Valid short message id.
1506 12363 : return std::string{V2ShortIDs()[first_byte]};
1507 : } else {
1508 : // Unknown short message id.
1509 1 : return std::nullopt;
1510 : }
1511 : }
1512 :
1513 991 : if (contents.size() < CMessageHeader::COMMAND_SIZE) {
1514 10 : return std::nullopt; // Long encoding needs 12 message type bytes.
1515 : }
1516 :
1517 981 : size_t msg_type_len{0};
1518 8140 : while (msg_type_len < CMessageHeader::COMMAND_SIZE && contents[msg_type_len] != 0) {
1519 : // Verify that message type bytes before the first 0x00 are in range.
1520 7159 : if (contents[msg_type_len] < ' ' || contents[msg_type_len] > 0x7F) {
1521 0 : return {};
1522 : }
1523 7159 : ++msg_type_len;
1524 : }
1525 981 : std::string ret{reinterpret_cast<const char*>(contents.data()), msg_type_len};
1526 5494 : while (msg_type_len < CMessageHeader::COMMAND_SIZE) {
1527 : // Verify that message type bytes after the first 0x00 are also 0x00.
1528 4563 : if (contents[msg_type_len] != 0) return {};
1529 4513 : ++msg_type_len;
1530 : }
1531 : // Strip message type bytes of contents.
1532 931 : contents = contents.subspan(CMessageHeader::COMMAND_SIZE);
1533 931 : return ret;
1534 13355 : }
1535 :
1536 13792 : CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) noexcept
1537 : {
1538 13792 : AssertLockNotHeld(m_recv_mutex);
1539 13792 : LOCK(m_recv_mutex);
1540 13792 : if (m_recv_state == RecvState::V1) return m_v1_fallback.GetReceivedMessage(time, reject_message);
1541 :
1542 13355 : Assume(m_recv_state == RecvState::APP_READY);
1543 13355 : Span<const uint8_t> contents{m_recv_decode_buffer};
1544 13355 : auto msg_type = GetMessageType(contents);
1545 13355 : CDataStream ret(m_recv_type, m_recv_version);
1546 13355 : CNetMessage msg{std::move(ret)};
1547 : // Note that BIP324Cipher::EXPANSION also includes the length descriptor size.
1548 13355 : msg.m_raw_message_size = m_recv_decode_buffer.size() + BIP324Cipher::EXPANSION;
1549 13355 : if (msg_type) {
1550 13294 : reject_message = false;
1551 13294 : msg.m_type = std::move(*msg_type);
1552 13294 : msg.m_time = time;
1553 13294 : msg.m_message_size = contents.size();
1554 13294 : msg.m_recv.resize(contents.size());
1555 13294 : std::copy(contents.begin(), contents.end(), UCharCast(msg.m_recv.data()));
1556 13294 : } else {
1557 61 : LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid);
1558 61 : reject_message = true;
1559 : }
1560 13355 : ClearShrink(m_recv_decode_buffer);
1561 13355 : SetReceiveState(RecvState::APP);
1562 :
1563 13355 : return msg;
1564 13792 : }
1565 :
1566 16006 : bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
1567 : {
1568 16006 : AssertLockNotHeld(m_send_mutex);
1569 16006 : LOCK(m_send_mutex);
1570 16006 : if (m_send_state == SendState::V1) return m_v1_fallback.SetMessageToSend(msg);
1571 : // We only allow adding a new message to be sent when in the READY state (so the packet cipher
1572 : // is available) and the send buffer is empty. This limits the number of messages in the send
1573 : // buffer to just one, and leaves the responsibility for queueing them up to the caller.
1574 15183 : if (!(m_send_state == SendState::READY && m_send_buffer.empty())) return false;
1575 : // Construct contents (encoding message type + payload).
1576 14574 : std::vector<uint8_t> contents;
1577 14574 : auto short_message_id = V2_MESSAGE_MAP(msg.m_type);
1578 :
1579 14574 : if (short_message_id.has_value() && m_peer_version >= GetMessageMinVersion(msg.m_type)) {
1580 : // Use short encoding (1 byte)
1581 13616 : contents.resize(1 + msg.data.size());
1582 13616 : contents[0] = *short_message_id;
1583 13616 : std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1);
1584 13616 : } else {
1585 : // Use long encoding (13 bytes for message type)
1586 : // Initialize with zeroes, and then write the message type string starting at offset 1.
1587 : // This means contents[0] and the unused positions in contents[1..13] remain 0x00.
1588 958 : contents.resize(1 + CMessageHeader::COMMAND_SIZE + msg.data.size(), 0);
1589 958 : std::copy(msg.m_type.begin(), msg.m_type.end(), contents.data() + 1);
1590 958 : std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1 + CMessageHeader::COMMAND_SIZE);
1591 : }
1592 : // Construct ciphertext in send buffer.
1593 14574 : m_send_buffer.resize(contents.size() + BIP324Cipher::EXPANSION);
1594 14574 : m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer));
1595 14574 : m_send_type = msg.m_type;
1596 : // Release memory
1597 14574 : ClearShrink(msg.data);
1598 14574 : return true;
1599 16006 : }
1600 :
1601 349 : void V2Transport::SetPeerVersion(int version) noexcept
1602 : {
1603 349 : AssertLockNotHeld(m_send_mutex);
1604 349 : LOCK(m_send_mutex);
1605 349 : m_peer_version = version;
1606 349 : }
1607 :
1608 322397 : Transport::BytesToSend V2Transport::GetBytesToSend(bool have_next_message) const noexcept
1609 : {
1610 322424 : AssertLockNotHeld(m_send_mutex);
1611 322370 : LOCK(m_send_mutex);
1612 322388 : if (m_send_state == SendState::V1) return m_v1_fallback.GetBytesToSend(have_next_message);
1613 :
1614 296971 : if (m_send_state == SendState::MAYBE_V1) Assume(m_send_buffer.empty());
1615 296971 : Assume(m_send_pos <= m_send_buffer.size());
1616 296980 : return {
1617 296973 : Span{m_send_buffer}.subspan(m_send_pos),
1618 : // We only have more to send after the current m_send_buffer if there is a (next)
1619 : // message to be sent, and we're capable of sending packets. */
1620 296980 : have_next_message && m_send_state == SendState::READY,
1621 296980 : m_send_type
1622 : };
1623 322397 : }
1624 :
1625 17474 : void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
1626 : {
1627 17474 : AssertLockNotHeld(m_send_mutex);
1628 17474 : LOCK(m_send_mutex);
1629 17474 : if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent);
1630 :
1631 15990 : if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) {
1632 166 : LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid);
1633 166 : }
1634 :
1635 15990 : m_send_pos += bytes_sent;
1636 15990 : Assume(m_send_pos <= m_send_buffer.size());
1637 15990 : if (m_send_pos >= CMessageHeader::HEADER_SIZE) {
1638 15296 : m_sent_v1_header_worth = true;
1639 15296 : }
1640 : // Wipe the buffer when everything is sent.
1641 15990 : if (m_send_pos == m_send_buffer.size()) {
1642 15076 : m_send_pos = 0;
1643 15076 : ClearShrink(m_send_buffer);
1644 15076 : }
1645 17474 : }
1646 :
1647 219 : bool V2Transport::ShouldReconnectV1() const noexcept
1648 : {
1649 219 : AssertLockNotHeld(m_send_mutex);
1650 219 : AssertLockNotHeld(m_recv_mutex);
1651 : // Only outgoing connections need reconnection.
1652 219 : if (!m_initiating) return false;
1653 :
1654 84 : LOCK(m_recv_mutex);
1655 : // We only reconnect in the very first state and when the receive buffer is empty. Together
1656 : // these conditions imply nothing has been received so far.
1657 84 : if (m_recv_state != RecvState::KEY) return false;
1658 6 : if (!m_recv_buffer.empty()) return false;
1659 : // Check if we've sent enough for the other side to disconnect us (if it was V1).
1660 6 : LOCK(m_send_mutex);
1661 6 : return m_sent_v1_header_worth;
1662 219 : }
1663 :
1664 111269 : size_t V2Transport::GetSendMemoryUsage() const noexcept
1665 : {
1666 111276 : AssertLockNotHeld(m_send_mutex);
1667 111262 : LOCK(m_send_mutex);
1668 111266 : if (m_send_state == SendState::V1) return m_v1_fallback.GetSendMemoryUsage();
1669 :
1670 102789 : return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer);
1671 111263 : }
1672 :
1673 3780 : Transport::Info V2Transport::GetInfo() const noexcept
1674 : {
1675 3780 : AssertLockNotHeld(m_recv_mutex);
1676 3780 : LOCK(m_recv_mutex);
1677 3780 : if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo();
1678 :
1679 3696 : Transport::Info info;
1680 :
1681 : // Do not report v2 and session ID until the version packet has been received
1682 : // and verified (confirming that the other side very likely has the same keys as us).
1683 7371 : if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY &&
1684 3683 : m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) {
1685 3675 : info.transport_type = TransportProtocolType::V2;
1686 3675 : info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID()));
1687 3675 : } else {
1688 21 : info.transport_type = TransportProtocolType::DETECTING;
1689 : }
1690 :
1691 3696 : return info;
1692 3780 : }
1693 :
1694 35965423 : std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
1695 : {
1696 35965423 : auto it = node.vSendMsg.begin();
1697 35965423 : size_t nSentSize = 0;
1698 35965423 : bool data_left{false}; //!< second return value (whether unsent data remains)
1699 35965423 : std::optional<bool> expected_more;
1700 :
1701 37529811 : while (true) {
1702 37529811 : if (it != node.vSendMsg.end()) {
1703 : // If possible, move one message from the send queue to the transport. This fails when
1704 : // there is an existing message still being sent, or (for v2 transports) when the
1705 : // handshake has not yet completed.
1706 843093 : size_t memusage = it->GetMemoryUsage();
1707 843093 : if (node.m_transport->SetMessageToSend(*it)) {
1708 : // Update memory usage of send buffer (as *it will be deleted).
1709 805102 : node.m_send_memusage -= memusage;
1710 805102 : ++it;
1711 805102 : }
1712 843093 : }
1713 42223425 : const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end());
1714 : // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
1715 : // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
1716 : // verify that the previously returned 'more' was correct.
1717 37529811 : if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
1718 37529811 : expected_more = more;
1719 37529811 : data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
1720 37529811 : int nBytes = 0;
1721 37529811 : if (!data.empty()) {
1722 1564547 : LOCK(node.m_sock_mutex);
1723 : // There is no socket in case we've already disconnected, or in test cases without
1724 : // real connections. In these cases, we bail out immediately and just leave things
1725 : // in the send queue and transport.
1726 1564547 : if (!node.m_sock) {
1727 0 : break;
1728 : }
1729 1564547 : int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
1730 : #ifdef MSG_MORE
1731 : if (more) {
1732 : flags |= MSG_MORE;
1733 : }
1734 : #endif
1735 4693641 : nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
1736 1564547 : }
1737 37529811 : if (nBytes > 0) {
1738 1564520 : node.m_last_send = GetTime<std::chrono::seconds>();
1739 1564520 : node.nSendBytes += nBytes;
1740 : // Notify transport that bytes have been processed.
1741 1564520 : node.m_transport->MarkBytesSent(nBytes);
1742 : // Update statistics per message type.
1743 1564520 : if (!msg_type.empty()) { // don't report v2 handshake bytes for now
1744 3128250 : node.AccountForSentBytes(msg_type, nBytes);
1745 1564125 : }
1746 1564520 : nSentSize += nBytes;
1747 3129040 : if ((size_t)nBytes != data.size()) {
1748 : // could not send full message; stop sending more
1749 132 : node.fCanSendData = false;
1750 132 : break;
1751 : }
1752 1564388 : } else {
1753 35965291 : if (nBytes < 0) {
1754 : // error
1755 27 : int nErr = WSAGetLastError();
1756 27 : if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) {
1757 27 : LogPrint(BCLog::NET, "socket send error for peer=%d: %s\n", node.GetId(), NetworkErrorString(nErr));
1758 27 : node.fDisconnect = true;
1759 27 : }
1760 27 : }
1761 35965291 : break;
1762 : }
1763 : }
1764 :
1765 35965423 : node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
1766 :
1767 35965423 : if (it == node.vSendMsg.end()) {
1768 35965016 : assert(node.m_send_memusage == 0);
1769 35965016 : }
1770 35965423 : node.vSendMsg.erase(node.vSendMsg.begin(), it);
1771 35965423 : node.nSendMsgSize = node.vSendMsg.size();
1772 35965423 : return {nSentSize, data_left};
1773 0 : }
1774 :
1775 2 : std::vector<NodeEvictionCandidate> CConnman::GetEvictionCandidates() const
1776 : {
1777 2 : std::vector<NodeEvictionCandidate> vEvictionCandidates;
1778 2 : READ_LOCK(m_nodes_mutex);
1779 :
1780 44 : for (const CNode* node : m_nodes) {
1781 42 : if (node->fDisconnect)
1782 0 : continue;
1783 :
1784 42 : if (m_active_masternode) {
1785 : // This handles eviction protected nodes. Nodes are always protected for a short time after the connection
1786 : // was accepted. This short time is meant for the VERSION/VERACK exchange and the possible MNAUTH that might
1787 : // follow when the incoming connection is from another masternode. When a message other than MNAUTH
1788 : // is received after VERSION/VERACK, the protection is lifted immediately.
1789 0 : bool isProtected = GetTime<std::chrono::seconds>() - node->m_connected < INBOUND_EVICTION_PROTECTION_TIME;
1790 0 : if (node->nTimeFirstMessageReceived.load() != 0s && !node->fFirstMessageIsMNAUTH) {
1791 0 : isProtected = false;
1792 0 : }
1793 : // if MNAUTH was valid, the node is always protected (and at the same time not accounted when
1794 : // checking incoming connection limits)
1795 0 : if (!node->GetVerifiedProRegTxHash().IsNull()) {
1796 0 : isProtected = true;
1797 0 : }
1798 0 : if (isProtected) {
1799 0 : continue;
1800 : }
1801 0 : }
1802 :
1803 42 : NodeEvictionCandidate candidate{
1804 42 : .id = node->GetId(),
1805 42 : .m_connected = node->m_connected,
1806 42 : .m_min_ping_time = node->m_min_ping_time,
1807 42 : .m_last_block_time = node->m_last_block_time,
1808 42 : .m_last_tx_time = node->m_last_tx_time,
1809 42 : .fRelevantServices = node->m_has_all_wanted_services,
1810 42 : .m_relay_txs = node->m_relays_txs.load(),
1811 42 : .fBloomFilter = node->m_bloom_filter_loaded.load(),
1812 42 : .nKeyedNetGroup = node->nKeyedNetGroup,
1813 42 : .prefer_evict = node->m_prefer_evict,
1814 42 : .m_is_local = node->addr.IsLocal(),
1815 42 : .m_network = node->ConnectedThroughNetwork(),
1816 42 : .m_noban = node->HasPermission(NetPermissionFlags::NoBan),
1817 42 : .m_conn_type = node->m_conn_type,
1818 : };
1819 42 : vEvictionCandidates.push_back(candidate);
1820 : }
1821 :
1822 2 : return vEvictionCandidates;
1823 2 : }
1824 :
1825 : /** Try to find a connection to evict when the node is full.
1826 : * Extreme care must be taken to avoid opening the node to attacker
1827 : * triggered network partitioning.
1828 : * The strategy used here is to protect a small number of peers
1829 : * for each of several distinct characteristics which are difficult
1830 : * to forge. In order to partition a node the attacker must be
1831 : * simultaneously better at all of them than honest peers.
1832 : */
1833 2 : bool CConnman::AttemptToEvictConnection()
1834 : {
1835 2 : std::vector<NodeEvictionCandidate> vEvictionCandidates = GetEvictionCandidates();
1836 2 : const std::optional<NodeId> node_id_to_evict = SelectNodeToEvict(std::move(vEvictionCandidates));
1837 2 : if (!node_id_to_evict) {
1838 0 : return false;
1839 : }
1840 4 : return WithNodeMutable(*node_id_to_evict, [](CNode* pnode){
1841 2 : LogPrint(BCLog::NET_NETCONN, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId());
1842 2 : pnode->fDisconnect = true;
1843 2 : return true;
1844 2 : }).value_or(false);
1845 2 : }
1846 :
1847 6638 : void CConnman::AcceptConnection(const ListenSocket& hListenSocket, CMasternodeSync& mn_sync) {
1848 : struct sockaddr_storage sockaddr;
1849 6638 : socklen_t len = sizeof(sockaddr);
1850 6638 : auto sock = hListenSocket.sock->Accept((struct sockaddr*)&sockaddr, &len);
1851 6638 : CAddress addr;
1852 :
1853 6638 : if (!sock) {
1854 0 : const int nErr = WSAGetLastError();
1855 0 : if (nErr != WSAEWOULDBLOCK) {
1856 0 : LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr));
1857 0 : }
1858 0 : return;
1859 : }
1860 :
1861 6638 : if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) {
1862 0 : LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n");
1863 0 : } else {
1864 6638 : addr = CAddress{MaybeFlipIPv6toCJDNS(addr), NODE_NONE};
1865 : }
1866 :
1867 6638 : const CAddress addr_bind{MaybeFlipIPv6toCJDNS(GetBindAddress(*sock)), NODE_NONE};
1868 :
1869 6638 : NetPermissionFlags permission_flags = NetPermissionFlags::None;
1870 6638 : hListenSocket.AddSocketPermissionFlags(permission_flags);
1871 :
1872 6638 : CreateNodeFromAcceptedSocket(std::move(sock), permission_flags, addr_bind, addr, mn_sync);
1873 6638 : }
1874 :
1875 6638 : void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
1876 : NetPermissionFlags permission_flags,
1877 : const CAddress& addr_bind,
1878 : const CAddress& addr,
1879 : CMasternodeSync& mn_sync)
1880 : {
1881 6638 : int nInbound = 0;
1882 6638 : int nVerifiedInboundMasternodes = 0;
1883 6638 : int nMaxInbound = nMaxConnections - m_max_outbound;
1884 :
1885 6638 : AddWhitelistPermissionFlags(permission_flags, addr);
1886 6638 : if (NetPermissions::HasFlag(permission_flags, NetPermissionFlags::Implicit)) {
1887 114 : NetPermissions::ClearFlag(permission_flags, NetPermissionFlags::Implicit);
1888 114 : if (gArgs.GetBoolArg("-whitelistforcerelay", DEFAULT_WHITELISTFORCERELAY)) NetPermissions::AddFlag(permission_flags, NetPermissionFlags::ForceRelay);
1889 114 : if (gArgs.GetBoolArg("-whitelistrelay", DEFAULT_WHITELISTRELAY)) NetPermissions::AddFlag(permission_flags, NetPermissionFlags::Relay);
1890 114 : NetPermissions::AddFlag(permission_flags, NetPermissionFlags::Mempool);
1891 114 : NetPermissions::AddFlag(permission_flags, NetPermissionFlags::NoBan);
1892 114 : }
1893 :
1894 :
1895 : {
1896 6638 : READ_LOCK(m_nodes_mutex);
1897 21582 : for (const CNode* pnode : m_nodes) {
1898 14944 : if (pnode->IsInboundConn()) {
1899 9481 : nInbound++;
1900 9481 : if (!pnode->GetVerifiedProRegTxHash().IsNull()) {
1901 5057 : nVerifiedInboundMasternodes++;
1902 5057 : }
1903 9481 : }
1904 : }
1905 6638 : }
1906 :
1907 6638 : std::string strDropped;
1908 6638 : if (fLogIPs) {
1909 2 : strDropped = strprintf("connection from %s dropped", addr.ToStringAddrPort());
1910 2 : } else {
1911 6636 : strDropped = "connection dropped";
1912 : }
1913 :
1914 6638 : if (!fNetworkActive) {
1915 750 : LogPrint(BCLog::NET_NETCONN, "%s: not accepting new connections\n", strDropped);
1916 750 : return;
1917 : }
1918 :
1919 5888 : if (!sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)) {
1920 0 : LogPrintf("%s: non-selectable socket\n", strDropped);
1921 0 : return;
1922 : }
1923 :
1924 : // According to the internet TCP_NODELAY is not carried into accepted sockets
1925 : // on all platforms. Set it again here just to be sure.
1926 5888 : const int on{1};
1927 5888 : if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) {
1928 0 : LogPrint(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n",
1929 : addr.ToStringAddrPort());
1930 0 : }
1931 :
1932 : // Don't accept connections from banned peers.
1933 11776 : bool banned = m_banman && m_banman->IsBanned(addr);
1934 5888 : if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && banned)
1935 : {
1936 6 : LogPrint(BCLog::NET_NETCONN, "%s (banned)\n", strDropped);
1937 6 : return;
1938 : }
1939 :
1940 : // Only accept connections from discouraged peers if our inbound slots aren't (almost) full.
1941 11764 : bool discouraged = m_banman && m_banman->IsDiscouraged(addr);
1942 5882 : if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && nInbound + 1 >= nMaxInbound && discouraged)
1943 : {
1944 0 : LogPrint(BCLog::NET_NETCONN, "connection from %s dropped (discouraged)\n", addr.ToStringAddrPort());
1945 0 : return;
1946 : }
1947 :
1948 : // Evict connections until we are below nMaxInbound. In case eviction protection resulted in nodes to not be evicted
1949 : // before, they might get evicted in batches now (after the protection timeout).
1950 : // We don't evict verified MN connections and also don't take them into account when checking limits. We can do this
1951 : // because we know that such connections are naturally limited by the total number of MNs, so this is not usable
1952 : // for attacks.
1953 5884 : while (nInbound - nVerifiedInboundMasternodes >= nMaxInbound)
1954 : {
1955 2 : if (!AttemptToEvictConnection()) {
1956 : // No connection to evict, disconnect the new connection
1957 0 : LogPrint(BCLog::NET_NETCONN, "failed to find an eviction candidate - connection dropped (full)\n");
1958 0 : return;
1959 : }
1960 2 : nInbound--;
1961 : }
1962 :
1963 : // don't accept incoming connections until blockchain is synced
1964 5882 : if (m_active_masternode && !mn_sync.IsBlockchainSynced()) {
1965 724 : LogPrint(BCLog::NET_NETCONN, "AcceptConnection -- blockchain is not synced yet, skipping inbound connection attempt\n");
1966 724 : return;
1967 : }
1968 :
1969 5158 : NodeId id = GetNewNodeId();
1970 5158 : uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
1971 :
1972 5158 : ServiceFlags nodeServices = nLocalServices;
1973 5158 : if (NetPermissions::HasFlag(permission_flags, NetPermissionFlags::BloomFilter)) {
1974 4 : nodeServices = static_cast<ServiceFlags>(nodeServices | NODE_BLOOM);
1975 4 : }
1976 :
1977 5158 : const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end();
1978 : // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is
1979 : // detected, so use it whenever we signal NODE_P2P_V2.
1980 5158 : const bool use_v2transport(nodeServices & NODE_P2P_V2);
1981 :
1982 10316 : CNode* pnode = new CNode(id,
1983 5158 : std::move(sock),
1984 5158 : addr,
1985 5158 : CalculateKeyedNetGroup(addr),
1986 5158 : nonce,
1987 5158 : addr_bind,
1988 5158 : /*addrNameIn=*/"",
1989 : ConnectionType::INBOUND,
1990 5158 : inbound_onion,
1991 30948 : CNodeOptions{
1992 5158 : .permission_flags = permission_flags,
1993 : .prefer_evict = discouraged,
1994 : .recv_flood_size = nReceiveFloodSize,
1995 : .use_v2transport = use_v2transport,
1996 : });
1997 : pnode->AddRef();
1998 : m_msgproc->InitializeNode(*pnode, nodeServices);
1999 :
2000 : {
2001 : LOCK(pnode->m_sock_mutex);
2002 : if (fLogIPs) {
2003 : LogPrint(BCLog::NET_NETCONN, "connection from %s accepted, sock=%d, peer=%d\n", addr.ToStringAddrPort(), pnode->m_sock->Get(), pnode->GetId());
2004 : } else {
2005 : LogPrint(BCLog::NET_NETCONN, "connection accepted, sock=%d, peer=%d\n", pnode->m_sock->Get(), pnode->GetId());
2006 : }
2007 : }
2008 :
2009 : {
2010 : LOCK(m_nodes_mutex);
2011 : m_nodes.push_back(pnode);
2012 : }
2013 : {
2014 : LOCK2(cs_mapSocketToNode, pnode->m_sock_mutex);
2015 : mapSocketToNode.emplace(pnode->m_sock->Get(), pnode);
2016 : if (m_edge_trig_events) {
2017 : if (!m_edge_trig_events->RegisterEvents(pnode->m_sock->Get())) {
2018 : LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
2019 : }
2020 : }
2021 : if (m_wakeup_pipe) {
2022 : m_wakeup_pipe->Write();
2023 : }
2024 : }
2025 :
2026 : // We received a new connection, harvest entropy from the time (and our peer count)
2027 : RandAddEvent((uint32_t)id);
2028 0 : }
2029 :
2030 146 : bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport = false)
2031 : {
2032 146 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
2033 146 : std::optional<int> max_connections;
2034 146 : switch (conn_type) {
2035 : case ConnectionType::INBOUND:
2036 : case ConnectionType::MANUAL:
2037 0 : return false;
2038 : case ConnectionType::OUTBOUND_FULL_RELAY:
2039 94 : max_connections = m_max_outbound_full_relay;
2040 94 : break;
2041 : case ConnectionType::BLOCK_RELAY:
2042 42 : max_connections = m_max_outbound_block_relay;
2043 42 : break;
2044 : // no limit for ADDR_FETCH because -seednode has no limit either
2045 : case ConnectionType::ADDR_FETCH:
2046 6 : break;
2047 : // no limit for FEELER connections since they're short-lived
2048 : case ConnectionType::FEELER:
2049 4 : break;
2050 : } // no default case, so the compiler can warn about missing cases
2051 :
2052 : // Count existing connections
2053 752 : int existing_connections = WITH_READ_LOCK(m_nodes_mutex,
2054 : return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
2055 :
2056 : // Max connections of specified type already exist
2057 146 : if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
2058 :
2059 : // Max total outbound connections already exist
2060 146 : CSemaphoreGrant grant(*semOutbound, true);
2061 146 : if (!grant) return false;
2062 :
2063 146 : OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/use_v2transport);
2064 146 : return true;
2065 146 : }
2066 :
2067 29789339 : void CConnman::DisconnectNodes()
2068 : {
2069 29789339 : AssertLockNotHeld(m_nodes_mutex);
2070 29789339 : AssertLockNotHeld(m_reconnections_mutex);
2071 :
2072 : // Use a temporary variable to accumulate desired reconnections, so we don't need
2073 : // m_reconnections_mutex while holding m_nodes_mutex.
2074 29789339 : decltype(m_reconnections) reconnections_to_add;
2075 :
2076 : // Quick check without exclusive lock to avoid unnecessary locking
2077 : // Note: This is a best-effort optimization. If network is active and no nodes
2078 : // are marked for disconnect, we can skip the expensive exclusive lock.
2079 : // If network is inactive or nodes need cleanup, we must take the lock.
2080 29789339 : bool has_to_disconnect = false;
2081 29789339 : if (fNetworkActive) {
2082 : {
2083 29782175 : READ_LOCK(m_nodes_mutex);
2084 65747219 : for (CNode* pnode : m_nodes) {
2085 35972224 : if (pnode->fDisconnect) {
2086 7180 : has_to_disconnect = true;
2087 7180 : break;
2088 : }
2089 : }
2090 29782175 : }
2091 : // Only return early if network is active AND no nodes need disconnection
2092 : // AND no disconnected nodes need cleanup (checked below)
2093 29782175 : if (!has_to_disconnect && m_nodes_disconnected.empty()) {
2094 29769895 : return;
2095 : }
2096 12280 : }
2097 :
2098 19444 : if (has_to_disconnect || !fNetworkActive) {
2099 11956 : LOCK(m_nodes_mutex);
2100 :
2101 9568 : if (!fNetworkActive) {
2102 : // Disconnect any connected nodes
2103 4704 : for (CNode* pnode : m_nodes) {
2104 2316 : if (!pnode->fDisconnect) {
2105 588 : LogPrint(BCLog::NET_NETCONN, "Network not active, dropping peer=%d\n", pnode->GetId());
2106 588 : pnode->fDisconnect = true;
2107 588 : }
2108 : }
2109 2388 : }
2110 :
2111 : // Disconnect unused nodes, if we have any
2112 40956 : for (auto it = m_nodes.begin(); it != m_nodes.end(); )
2113 : {
2114 31388 : CNode* pnode = *it;
2115 31388 : if (pnode->fDisconnect)
2116 : {
2117 : // If we were the ones who initiated the disconnect, we must assume that the other side wants to see
2118 : // pending messages. If the other side initiated the disconnect (or disconnected after we've shutdown
2119 : // the socket), we can be pretty sure that they are not interested in any pending messages anymore and
2120 : // thus can immediately close the socket.
2121 9825 : if (!pnode->fOtherSideDisconnected) {
2122 4973 : if (pnode->nDisconnectLingerTime.load() == SteadyClock::time_point{}) {
2123 : // let's not immediately close the socket but instead wait for at least 100ms so that there is a
2124 : // chance to flush all/some pending data. Otherwise the other side might not receive REJECT messages
2125 : // that were pushed right before setting fDisconnect=true
2126 : // Flushing must happen in two places to ensure data can be received by the other side:
2127 : // 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler()
2128 : // being called before DisconnectNodes and also by the linger time
2129 : // 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time
2130 1529 : pnode->nDisconnectLingerTime = SteadyClock::now() + 100ms;
2131 1529 : }
2132 4973 : if (SteadyClock::now() < pnode->nDisconnectLingerTime.load()) {
2133 : // everything flushed to the kernel?
2134 4953 : const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(pnode->nSendMsgSize != 0);
2135 4953 : const bool queue_is_empty{to_send.empty() && !more};
2136 4953 : if (!pnode->fSocketShutdown && queue_is_empty) {
2137 1524 : LOCK(pnode->m_sock_mutex);
2138 1524 : if (pnode->m_sock) {
2139 : // Give the other side a chance to detect the disconnect as early as possible (recv() will return 0)
2140 1508 : ::shutdown(pnode->m_sock->Get(), SD_SEND);
2141 1508 : }
2142 1524 : pnode->fSocketShutdown = true;
2143 1524 : }
2144 4953 : ++it;
2145 4953 : continue;
2146 : }
2147 20 : }
2148 :
2149 4872 : if (fLogIPs) {
2150 0 : LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d m_masternode_connection=%d m_masternode_iqr_connection=%d\n",
2151 : pnode->GetId(), pnode->addr.ToStringAddrPort(), pnode->GetRefCount(), pnode->IsInboundConn(), pnode->m_masternode_connection, pnode->m_masternode_iqr_connection);
2152 0 : } else {
2153 4872 : LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d m_masternode_connection=%d m_masternode_iqr_connection=%d\n",
2154 : pnode->GetId(), pnode->GetRefCount(), pnode->IsInboundConn(), pnode->m_masternode_connection, pnode->m_masternode_iqr_connection);
2155 : }
2156 :
2157 : // remove from m_nodes
2158 4872 : it = m_nodes.erase(it);
2159 :
2160 : // Add to reconnection list if appropriate. We don't reconnect right here, because
2161 : // the creation of a connection is a blocking operation (up to several seconds),
2162 : // and we don't want to hold up the socket handler thread for that long.
2163 4872 : if (pnode->m_transport->ShouldReconnectV1()) {
2164 6 : reconnections_to_add.push_back({
2165 6 : .addr_connect = pnode->addr,
2166 6 : .grant = std::move(pnode->grantOutbound),
2167 6 : .destination = pnode->m_dest,
2168 6 : .conn_type = pnode->m_conn_type,
2169 : .use_v2transport = false,
2170 6 : .masternode_connection = pnode->m_masternode_connection,
2171 6 : .masternode_probe_connection = pnode->m_masternode_probe_connection});
2172 6 : LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
2173 6 : }
2174 :
2175 : // release outbound grant (if any)
2176 4872 : pnode->grantOutbound.Release();
2177 :
2178 : // close socket and cleanup
2179 4872 : pnode->CloseSocketDisconnect(this);
2180 :
2181 : // update connection count by network
2182 4872 : if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()];
2183 :
2184 : // hold in disconnected pool until all refs are released
2185 4872 : pnode->Release();
2186 4872 : m_nodes_disconnected.push_back(pnode);
2187 4872 : } else {
2188 21563 : ++it;
2189 : }
2190 : }
2191 9568 : }
2192 : {
2193 : // Delete disconnected nodes
2194 17056 : std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
2195 26700 : for (auto it = m_nodes_disconnected.begin(); it != m_nodes_disconnected.end(); )
2196 : {
2197 12032 : CNode* pnode = *it;
2198 : // Destroy the object only after other threads have stopped using it.
2199 12032 : if (pnode->GetRefCount() <= 0) {
2200 4869 : it = m_nodes_disconnected.erase(it);
2201 4869 : DeleteNode(pnode);
2202 4869 : } else {
2203 7163 : ++it;
2204 : }
2205 : }
2206 14668 : }
2207 : {
2208 : // Move entries from reconnections_to_add to m_reconnections.
2209 14668 : LOCK(m_reconnections_mutex);
2210 14668 : m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add));
2211 14668 : }
2212 29789339 : }
2213 :
2214 29784563 : void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync)
2215 : {
2216 59569126 : size_t nodes_size = WITH_READ_LOCK(m_nodes_mutex, return m_nodes.size(););
2217 :
2218 : // If we had zero connections before and new connections now or if we just dropped
2219 : // to zero connections reset the sync process if its outdated.
2220 29784563 : if ((nodes_size > 0 && nPrevNodeCount == 0) || (nodes_size == 0 && nPrevNodeCount > 0)) {
2221 26751 : mn_sync.Reset();
2222 26751 : }
2223 :
2224 29761449 : if (nodes_size != nPrevNodeCount) {
2225 13855 : nPrevNodeCount = nodes_size;
2226 13855 : if (m_client_interface) {
2227 13855 : m_client_interface->NotifyNumConnectionsChanged(nodes_size);
2228 13855 : }
2229 :
2230 13855 : CalculateNumConnectionsChangedStats();
2231 13855 : }
2232 29761449 : }
2233 :
2234 13855 : void CConnman::CalculateNumConnectionsChangedStats()
2235 : {
2236 13855 : if (!::g_stats_client->active()) {
2237 13855 : return;
2238 : }
2239 :
2240 : // count various node attributes for statsD
2241 0 : int fullNodes = 0;
2242 0 : int spvNodes = 0;
2243 0 : int inboundNodes = 0;
2244 0 : int outboundNodes = 0;
2245 0 : int ipv4Nodes = 0;
2246 0 : int ipv6Nodes = 0;
2247 0 : int torNodes = 0;
2248 0 : mapMsgTypeSize mapRecvBytesMsgStats;
2249 0 : mapMsgTypeSize mapSentBytesMsgStats;
2250 0 : for (const std::string &msg : getAllNetMessageTypes()) {
2251 0 : mapRecvBytesMsgStats[msg] = 0;
2252 0 : mapSentBytesMsgStats[msg] = 0;
2253 : }
2254 0 : mapRecvBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0;
2255 0 : mapSentBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0;
2256 0 : const NodesSnapshot snap{*this, /* cond = */ CConnman::FullyConnectedOnly};
2257 0 : for (auto pnode : snap.Nodes()) {
2258 0 : WITH_LOCK(pnode->cs_vRecv, pnode->UpdateRecvMapWithStats(mapRecvBytesMsgStats));
2259 0 : WITH_LOCK(pnode->cs_vSend, pnode->UpdateSentMapWithStats(mapSentBytesMsgStats));
2260 0 : if (pnode->m_bloom_filter_loaded.load()) {
2261 0 : spvNodes++;
2262 0 : } else {
2263 0 : fullNodes++;
2264 : }
2265 0 : if(pnode->IsInboundConn())
2266 0 : inboundNodes++;
2267 : else
2268 0 : outboundNodes++;
2269 0 : if(pnode->addr.IsIPv4())
2270 0 : ipv4Nodes++;
2271 0 : if(pnode->addr.IsIPv6())
2272 0 : ipv6Nodes++;
2273 0 : if(pnode->addr.IsTor())
2274 0 : torNodes++;
2275 0 : const auto last_ping_time = count_microseconds(pnode->m_last_ping_time);
2276 0 : if (last_ping_time > 0)
2277 0 : ::g_stats_client->timing("peers.ping_us", last_ping_time, 1.0f);
2278 : }
2279 0 : for (const std::string &msg : getAllNetMessageTypes()) {
2280 0 : ::g_stats_client->gauge("bandwidth.message." + msg + ".totalBytesReceived", mapRecvBytesMsgStats[msg], 1.0f);
2281 0 : ::g_stats_client->gauge("bandwidth.message." + msg + ".totalBytesSent", mapSentBytesMsgStats[msg], 1.0f);
2282 : }
2283 0 : ::g_stats_client->gauge("peers.totalConnections", nPrevNodeCount, 1.0f);
2284 0 : ::g_stats_client->gauge("peers.spvNodeConnections", spvNodes, 1.0f);
2285 0 : ::g_stats_client->gauge("peers.fullNodeConnections", fullNodes, 1.0f);
2286 0 : ::g_stats_client->gauge("peers.inboundConnections", inboundNodes, 1.0f);
2287 0 : ::g_stats_client->gauge("peers.outboundConnections", outboundNodes, 1.0f);
2288 0 : ::g_stats_client->gauge("peers.ipv4Connections", ipv4Nodes, 1.0f);
2289 0 : ::g_stats_client->gauge("peers.ipv6Connections", ipv6Nodes, 1.0f);
2290 0 : ::g_stats_client->gauge("peers.torConnections", torNodes, 1.0f);
2291 13855 : }
2292 :
2293 2561811 : bool CConnman::ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const
2294 : {
2295 2561811 : return node.m_connected + m_peer_connect_timeout < now;
2296 : }
2297 :
2298 198569 : bool CConnman::InactivityCheck(const CNode& node) const
2299 : {
2300 : // Tests that see disconnects after using mocktime can start nodes with a
2301 : // large timeout. For example, -peertimeout=999999999.
2302 198569 : const auto now{GetTime<std::chrono::seconds>()};
2303 198569 : const auto last_send{node.m_last_send.load()};
2304 198569 : const auto last_recv{node.m_last_recv.load()};
2305 :
2306 198569 : if (!ShouldRunInactivityChecks(node, now)) return false;
2307 :
2308 35 : if (last_recv.count() == 0 || last_send.count() == 0) {
2309 8 : LogPrint(BCLog::NET, "socket no message in first %i seconds, %d %d peer=%d\n", count_seconds(m_peer_connect_timeout), last_recv.count() != 0, last_send.count() != 0, node.GetId());
2310 8 : return true;
2311 : }
2312 :
2313 27 : if (now > last_send + TIMEOUT_INTERVAL) {
2314 0 : LogPrint(BCLog::NET, "socket sending timeout: %is peer=%d\n", count_seconds(now - last_send), node.GetId());
2315 0 : return true;
2316 : }
2317 :
2318 27 : if (now > last_recv + TIMEOUT_INTERVAL) {
2319 0 : LogPrint(BCLog::NET, "socket receive timeout: %is peer=%d\n", count_seconds(now - last_recv), node.GetId());
2320 0 : return true;
2321 : }
2322 :
2323 27 : if (!node.fSuccessfullyConnected) {
2324 18 : if (node.m_transport->GetInfo().transport_type == TransportProtocolType::DETECTING) {
2325 4 : LogPrint(BCLog::NET, "V2 handshake timeout peer=%d\n", node.GetId());
2326 4 : } else {
2327 14 : LogPrint(BCLog::NET, "version handshake timeout peer=%d\n", node.GetId());
2328 : }
2329 18 : return true;
2330 : }
2331 :
2332 9 : return false;
2333 198569 : }
2334 :
2335 29784563 : Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
2336 : {
2337 29784563 : Sock::EventsPerSock events_per_sock;
2338 :
2339 59675752 : for (const ListenSocket& hListenSocket : vhListenSocket) {
2340 29891189 : events_per_sock.emplace(hListenSocket.sock->Get(), Sock::Events{Sock::RECV});
2341 : }
2342 :
2343 65753510 : for (CNode* pnode : nodes) {
2344 35968947 : bool select_recv = !pnode->fHasRecvData;
2345 35968947 : bool select_send = !pnode->fCanSendData;
2346 35968947 : if (!select_recv && !select_send) continue;
2347 :
2348 8087471 : LOCK(pnode->m_sock_mutex);
2349 8087471 : if (!pnode->m_sock) {
2350 27 : continue;
2351 : }
2352 :
2353 8087444 : Sock::Event requested{0};
2354 8087444 : if (select_send) {
2355 8492 : requested |= Sock::SEND;
2356 8492 : }
2357 8087444 : if (select_recv) {
2358 8087188 : requested |= Sock::RECV;
2359 8087188 : }
2360 :
2361 8087444 : events_per_sock.emplace(pnode->m_sock->Get(), Sock::Events{requested});
2362 8087471 : }
2363 :
2364 29784563 : if (m_wakeup_pipe) {
2365 : // We add a pipe to the read set so that the select() call can be woken up from the outside
2366 : // This is done when data is added to send buffers (vSendMsg) or when new peers are added
2367 : // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
2368 : // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
2369 : // run on Linux and friends.
2370 29784563 : events_per_sock.emplace(m_wakeup_pipe->m_pipe[0], Sock::Events{Sock::RECV});
2371 29784563 : }
2372 :
2373 29784563 : return events_per_sock;
2374 29784563 : }
2375 :
2376 29784563 : void CConnman::SocketHandler(CMasternodeSync& mn_sync)
2377 : {
2378 29784563 : AssertLockNotHeld(m_total_bytes_sent_mutex);
2379 :
2380 29784563 : Sock::EventsPerSock events_per_sock;
2381 :
2382 59569126 : bool only_poll = [this]() {
2383 : // Check if we have work to do and thus should avoid waiting for events
2384 29784563 : READ_LOCK(m_nodes_mutex); // We acquire this to avoid the pointers stored in mapSendableNodes and mapReceivableNodes being invalidated by ThreadSocketHandler
2385 29784563 : LOCK(cs_sendable_receivable_nodes);
2386 29784563 : if (!mapReceivableNodes.empty()) {
2387 27881715 : return true;
2388 : }
2389 9969552 : for (const auto& p : mapSendableNodes) {
2390 8066987 : const auto& [to_send, more, _msg_type] = p.second->m_transport->GetBytesToSend(p.second->nSendMsgSize != 0);
2391 8066987 : if (!to_send.empty()) {
2392 283 : return true;
2393 : }
2394 : }
2395 1902565 : return false;
2396 29784563 : }();
2397 :
2398 : {
2399 29784563 : const NodesSnapshot snap{*this, /* cond = */ CConnman::AllNodes, /* shuffle = */ false};
2400 :
2401 29784563 : const auto timeout = std::chrono::milliseconds(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
2402 29784563 : const bool is_lt = socketEventsMode == SocketEventsMode::Poll || socketEventsMode == SocketEventsMode::Select;
2403 :
2404 : // Check for the readiness of the already connected sockets and the
2405 : // listening sockets in one call ("readiness" as in poll(2) or
2406 : // select(2)). If none are ready, wait for a short while and return
2407 : // empty sets.
2408 29784563 : events_per_sock = GenerateWaitSockets(snap.Nodes());
2409 29784563 : if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor(), ToggleWakeupPipe})) {
2410 0 : if (is_lt) {
2411 0 : interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
2412 0 : }
2413 0 : }
2414 :
2415 : // Drain the wakeup pipe
2416 29784563 : if (m_wakeup_pipe && events_per_sock.find(m_wakeup_pipe->m_pipe[0]) != events_per_sock.end()) {
2417 520430 : m_wakeup_pipe->Drain();
2418 520430 : }
2419 :
2420 : // Service (send/receive) each of the already connected nodes.
2421 29784563 : SocketHandlerConnected(events_per_sock);
2422 29784563 : }
2423 :
2424 : // Accept new connections from listening sockets.
2425 29784563 : SocketHandlerListening(events_per_sock, mn_sync);
2426 29784563 : }
2427 :
2428 29784563 : void CConnman::SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock)
2429 : {
2430 29784563 : AssertLockNotHeld(m_total_bytes_sent_mutex);
2431 :
2432 29784563 : if (interruptNet) return;
2433 :
2434 29781743 : std::set<CNode*> node_err_set;
2435 29781743 : std::set<CNode*> node_recv_set;
2436 29781743 : std::set<CNode*> node_send_set;
2437 : {
2438 29781743 : LOCK(cs_mapSocketToNode);
2439 89116243 : for (const auto& [sock, events] : events_per_sock) {
2440 59827430 : auto it = mapSocketToNode.find(sock);
2441 29913715 : if (it == mapSocketToNode.end()) continue;
2442 29386651 : if (events.occurred & Sock::ERR) {
2443 7213 : it->second->AddRef();
2444 7213 : node_err_set.emplace(it->second);
2445 7213 : }
2446 29386651 : if (events.occurred & Sock::RECV) {
2447 28616315 : if (events.occurred & Sock::ERR) continue;
2448 28616169 : LOCK(cs_sendable_receivable_nodes);
2449 28616169 : auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second);
2450 28616169 : assert(jt.first->second == it->second);
2451 28616169 : it->second->fHasRecvData = true;
2452 28616169 : }
2453 29386505 : if (events.occurred & Sock::SEND) {
2454 797257 : LOCK(cs_sendable_receivable_nodes);
2455 797257 : auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second);
2456 797257 : assert(jt.first->second == it->second);
2457 797257 : it->second->fCanSendData = true;
2458 797257 : }
2459 : }
2460 29781743 : }
2461 :
2462 65748757 : ForEachNode(AllNodes, [&](CNode* pnode) {
2463 35967014 : const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(pnode->nSendMsgSize != 0);
2464 : // Collect nodes that have a receivable socket, implement the following logic:
2465 : // * If there is data to send, try sending data. As this only
2466 : // happens when optimistic write failed, we choose to first drain the
2467 : // write buffer in this case before receiving more. This avoids
2468 : // needlessly queueing received data, if the remote peer is not themselves
2469 : // receiving data. This means properly utilizing TCP flow control signalling.
2470 : // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
2471 : // receiving data (which should succeed as the socket signalled as receivable).
2472 35967014 : if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect &&
2473 792991 : (!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) {
2474 792991 : pnode->AddRef();
2475 792991 : node_recv_set.emplace(pnode);
2476 792991 : }
2477 :
2478 : // Collect nodes that have data to send and have a socket with non-empty write buffers
2479 35967014 : if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) {
2480 35965423 : pnode->AddRef();
2481 35965423 : node_send_set.emplace(pnode);
2482 35965423 : }
2483 35967014 : });
2484 :
2485 65747166 : for (CNode* pnode : node_send_set) {
2486 35965423 : if (interruptNet) {
2487 0 : break;
2488 : }
2489 :
2490 : // Send data
2491 71930846 : auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
2492 35965423 : if (bytes_sent) {
2493 757222 : RecordBytesSent(bytes_sent);
2494 :
2495 : // If both receiving and (non-optimistic) sending were possible, we first attempt
2496 : // sending. If that succeeds, but does not fully drain the send queue, do not
2497 : // attempt to receive. This avoids needlessly queueing data if the remote peer
2498 : // is slow at receiving data, by means of TCP flow control. We only do this when
2499 : // sending actually succeeded to make sure progress is always made; otherwise a
2500 : // deadlock would be possible when both sides have data to send, but neither is
2501 : // receiving.
2502 757222 : if (data_left && node_recv_set.erase(pnode)) {
2503 69 : pnode->Release();
2504 69 : }
2505 757222 : }
2506 : }
2507 :
2508 29788956 : for (CNode* pnode : node_err_set)
2509 : {
2510 7213 : if (interruptNet) {
2511 0 : break;
2512 : }
2513 : // let recv() return errors and then handle it
2514 7213 : SocketRecvData(pnode);
2515 : }
2516 :
2517 30574665 : for (CNode* pnode : node_recv_set)
2518 : {
2519 792922 : if (interruptNet) {
2520 0 : break;
2521 : }
2522 792922 : if (pnode->fPauseRecv) {
2523 0 : continue;
2524 : }
2525 :
2526 792922 : SocketRecvData(pnode);
2527 : }
2528 :
2529 29788956 : for (auto& node : node_err_set) {
2530 7213 : node->Release();
2531 : }
2532 30574665 : for (auto& node : node_recv_set) {
2533 792922 : node->Release();
2534 : }
2535 65747166 : for (auto& node : node_send_set) {
2536 35965423 : node->Release();
2537 : }
2538 :
2539 29781743 : if (interruptNet) {
2540 0 : return;
2541 : }
2542 :
2543 : {
2544 29781743 : LOCK(cs_sendable_receivable_nodes);
2545 : // remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do
2546 : // (even if there are pending messages to be sent)
2547 65742303 : for (auto it = mapSendableNodes.begin(); it != mapSendableNodes.end(); ) {
2548 35960560 : if (!it->second->fCanSendData) {
2549 132 : it = mapSendableNodes.erase(it);
2550 132 : } else {
2551 35960428 : ++it;
2552 : }
2553 : }
2554 : // clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
2555 58398035 : for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
2556 28616292 : if (!it->second->fHasRecvData) {
2557 734560 : it = mapReceivableNodes.erase(it);
2558 734560 : } else {
2559 27881732 : ++it;
2560 : }
2561 : }
2562 29781743 : }
2563 29784563 : }
2564 :
2565 29784563 : void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock, CMasternodeSync& mn_sync)
2566 : {
2567 59672717 : for (const ListenSocket& listen_socket : vhListenSocket) {
2568 29890957 : if (interruptNet) {
2569 2803 : return;
2570 : }
2571 29888154 : const auto it = events_per_sock.find(listen_socket.sock->Get());
2572 29888154 : if (it != events_per_sock.end() && (it->second.occurred & Sock::RECV)) {
2573 6638 : AcceptConnection(listen_socket, mn_sync);
2574 6638 : }
2575 : }
2576 29784563 : }
2577 :
2578 800135 : size_t CConnman::SocketRecvData(CNode *pnode)
2579 : {
2580 : // typical socket buffer is 8K-64K
2581 : uint8_t pchBuf[0x10000];
2582 800135 : int nBytes = 0;
2583 : {
2584 800135 : LOCK(pnode->m_sock_mutex);
2585 800135 : if (!pnode->m_sock)
2586 0 : return 0;
2587 800135 : nBytes = recv(pnode->m_sock->Get(), (char*)pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
2588 800135 : if (nBytes < (int)sizeof(pchBuf)) {
2589 741644 : pnode->fHasRecvData = false;
2590 741644 : }
2591 800135 : }
2592 800135 : if (nBytes > 0)
2593 : {
2594 793098 : bool notify = false;
2595 793098 : if (!pnode->ReceiveMsgBytes(Span<const uint8_t>(pchBuf, nBytes), notify)) {
2596 20 : READ_LOCK(m_nodes_mutex); // is this here for lock ordering?
2597 20 : pnode->CloseSocketDisconnect(this);
2598 20 : }
2599 793098 : RecordBytesRecv(nBytes);
2600 793098 : if (notify) {
2601 734570 : pnode->MarkReceivedMsgsForProcessing();
2602 734570 : WakeMessageHandler();
2603 734570 : }
2604 793098 : }
2605 7037 : else if (nBytes == 0)
2606 : {
2607 : // socket closed gracefully
2608 4745 : if (!pnode->fDisconnect) {
2609 3255 : LogPrint(BCLog::NET, "socket closed for peer=%d\n", pnode->GetId());
2610 3255 : }
2611 4745 : READ_LOCK(m_nodes_mutex); // is this here for lock ordering?
2612 4745 : pnode->fOtherSideDisconnected = true; // avoid lingering
2613 4745 : pnode->CloseSocketDisconnect(this);
2614 4745 : }
2615 2292 : else if (nBytes < 0)
2616 : {
2617 : // error
2618 2292 : int nErr = WSAGetLastError();
2619 2292 : if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
2620 : {
2621 107 : if (!pnode->fDisconnect){
2622 85 : LogPrint(BCLog::NET, "socket recv error for peer=%d: %s\n", pnode->GetId(), NetworkErrorString(nErr));
2623 85 : }
2624 107 : READ_LOCK(m_nodes_mutex); // is this here for lock ordering?
2625 107 : pnode->fOtherSideDisconnected = true; // avoid lingering
2626 107 : pnode->CloseSocketDisconnect(this);
2627 107 : }
2628 2292 : }
2629 800135 : if (nBytes < 0) {
2630 2292 : return 0;
2631 : }
2632 797843 : return (size_t)nBytes;
2633 800135 : }
2634 :
2635 2821 : void CConnman::ThreadSocketHandler(CMasternodeSync& mn_sync)
2636 : {
2637 2821 : AssertLockNotHeld(m_total_bytes_sent_mutex);
2638 :
2639 2821 : auto nLastCleanupNodes = SteadyClock::time_point{};
2640 :
2641 29787384 : while (!interruptNet)
2642 : {
2643 : // Handle sockets before we do the next round of disconnects. This allows us to flush send buffers one last time
2644 : // before actually closing sockets. Receiving is however skipped in case a peer is pending to be disconnected
2645 29784563 : SocketHandler(mn_sync);
2646 29784563 : if (SteadyClock::now() - nLastCleanupNodes > 1s) {
2647 265763 : ForEachNode(AllNodes, [&](CNode* pnode) {
2648 198569 : if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
2649 198569 : });
2650 67194 : nLastCleanupNodes = SteadyClock::now();
2651 67194 : }
2652 29784563 : DisconnectNodes();
2653 29784563 : NotifyNumConnectionsChanged(mn_sync);
2654 : }
2655 2821 : }
2656 :
2657 938800 : void CConnman::WakeMessageHandler()
2658 : {
2659 : {
2660 938800 : LOCK(mutexMsgProc);
2661 938800 : fMsgProcWake = true;
2662 938800 : }
2663 938800 : condMsgProc.notify_one();
2664 938800 : }
2665 :
2666 22 : void CConnman::ThreadDNSAddressSeed()
2667 : {
2668 22 : FastRandomContext rng;
2669 22 : std::vector<std::string> seeds = Params().DNSSeeds();
2670 22 : Shuffle(seeds.begin(), seeds.end(), rng);
2671 22 : int seeds_right_now = 0; // Number of seeds left before testing if we have enough connections
2672 22 : int found = 0;
2673 :
2674 22 : if (gArgs.GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED)) {
2675 : // When -forcednsseed is provided, query all.
2676 2 : seeds_right_now = seeds.size();
2677 22 : } else if (addrman.Size() == 0) {
2678 : // If we have no known peers, query all.
2679 : // This will occur on the first run, or if peers.dat has been
2680 : // deleted.
2681 10 : seeds_right_now = seeds.size();
2682 10 : }
2683 :
2684 : // goal: only query DNS seed if address need is acute
2685 : // * If we have a reasonable number of peers in addrman, spend
2686 : // some time trying them first. This improves user privacy by
2687 : // creating fewer identifying DNS requests, reduces trust by
2688 : // giving seeds less influence on the network topology, and
2689 : // reduces traffic to the seeds.
2690 : // * When querying DNS seeds query a few at once, this ensures
2691 : // that we don't give DNS seeds the ability to eclipse nodes
2692 : // that query them.
2693 : // * If we continue having problems, eventually query all the
2694 : // DNS seeds, and if that fails too, also try the fixed seeds.
2695 : // (done in ThreadOpenConnections)
2696 22 : const std::chrono::seconds seeds_wait_time = (addrman.Size() >= DNSSEEDS_DELAY_PEER_THRESHOLD ? DNSSEEDS_DELAY_MANY_PEERS : DNSSEEDS_DELAY_FEW_PEERS);
2697 :
2698 36 : for (const std::string& seed : seeds) {
2699 22 : if (seeds_right_now == 0) {
2700 10 : seeds_right_now += DNSSEEDS_TO_QUERY_AT_ONCE;
2701 :
2702 10 : if (addrman.Size() > 0) {
2703 10 : LogPrintf("Waiting %d seconds before querying DNS seeds.\n", seeds_wait_time.count());
2704 10 : std::chrono::seconds to_wait = seeds_wait_time;
2705 12 : while (to_wait.count() > 0) {
2706 : // if sleeping for the MANY_PEERS interval, wake up
2707 : // early to see if we have enough peers and can stop
2708 : // this thread entirely freeing up its resources
2709 10 : std::chrono::seconds w = std::min(DNSSEEDS_DELAY_FEW_PEERS, to_wait);
2710 10 : if (!interruptNet.sleep_for(w)) return;
2711 4 : to_wait -= w;
2712 :
2713 4 : int nRelevant = 0;
2714 : {
2715 4 : READ_LOCK(m_nodes_mutex);
2716 12 : for (const CNode* pnode : m_nodes) {
2717 8 : if (pnode->fSuccessfullyConnected && pnode->IsFullOutboundConn() && !pnode->m_masternode_probe_connection) ++nRelevant;
2718 : }
2719 4 : }
2720 4 : if (nRelevant >= 2) {
2721 2 : if (found > 0) {
2722 0 : LogPrintf("%d addresses found from DNS seeds\n", found);
2723 0 : LogPrintf("P2P peers available. Finished DNS seeding.\n");
2724 0 : } else {
2725 2 : LogPrintf("P2P peers available. Skipped DNS seeding.\n");
2726 : }
2727 2 : return;
2728 : }
2729 : }
2730 2 : }
2731 2 : }
2732 :
2733 14 : if (interruptNet) return;
2734 :
2735 : // hold off on querying seeds if P2P network deactivated
2736 14 : if (!fNetworkActive) {
2737 0 : LogPrintf("Waiting for network to be reactivated before querying DNS seeds.\n");
2738 0 : do {
2739 0 : if (!interruptNet.sleep_for(std::chrono::seconds{1})) return;
2740 0 : } while (!fNetworkActive);
2741 0 : }
2742 :
2743 14 : LogPrintf("Loading addresses from DNS seed %s\n", seed);
2744 : // If -proxy is in use, we make an ADDR_FETCH connection to the DNS resolved peer address
2745 : // for the base dns seed domain in chainparams
2746 14 : if (HaveNameProxy()) {
2747 2 : AddAddrFetch(seed);
2748 2 : } else {
2749 12 : std::vector<CAddress> vAdd;
2750 12 : ServiceFlags requiredServiceBits = GetDesirableServiceFlags(NODE_NONE);
2751 12 : std::string host = strprintf("x%x.%s", requiredServiceBits, seed);
2752 12 : CNetAddr resolveSource;
2753 12 : if (!resolveSource.SetInternal(host)) {
2754 0 : continue;
2755 : }
2756 : // Limit number of IPs learned from a single DNS seed. This limit exists to prevent the results from
2757 : // one DNS seed from dominating AddrMan. Note that the number of results from a UDP DNS query is
2758 : // bounded to 33 already, but it is possible for it to use TCP where a larger number of results can be
2759 : // returned.
2760 12 : unsigned int nMaxIPs = 32;
2761 12 : const auto addresses{LookupHost(host, nMaxIPs, true)};
2762 12 : if (!addresses.empty()) {
2763 0 : for (const CNetAddr& ip : addresses) {
2764 0 : CAddress addr = CAddress(CService(ip, Params().GetDefaultPort()), requiredServiceBits);
2765 0 : addr.nTime = rng.rand_uniform_delay(Now<NodeSeconds>() - 3 * 24h, -4 * 24h); // use a random age between 3 and 7 days old
2766 0 : vAdd.push_back(addr);
2767 0 : found++;
2768 0 : }
2769 0 : addrman.Add(vAdd, resolveSource);
2770 0 : } else {
2771 : // If the seed does not support a subdomain with our desired service bits,
2772 : // we make an ADDR_FETCH connection to the DNS resolved peer address for the
2773 : // base dns seed domain in chainparams
2774 12 : AddAddrFetch(seed);
2775 : }
2776 12 : }
2777 14 : --seeds_right_now;
2778 : }
2779 14 : LogPrintf("%d addresses found from DNS seeds\n", found);
2780 22 : }
2781 :
2782 4075 : void CConnman::DumpAddresses()
2783 : {
2784 4075 : const auto start{SteadyClock::now()};
2785 :
2786 4075 : DumpPeerAddresses(::gArgs, addrman);
2787 :
2788 4075 : LogPrint(BCLog::NET, "Flushed %d addresses to peers.dat %dms\n",
2789 : addrman.Size(), Ticks<std::chrono::milliseconds>(SteadyClock::now() - start));
2790 4075 : }
2791 :
2792 78065 : void CConnman::ProcessAddrFetch()
2793 : {
2794 78065 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
2795 78065 : std::string strDest;
2796 : {
2797 78065 : LOCK(m_addr_fetches_mutex);
2798 78065 : if (m_addr_fetches.empty())
2799 78063 : return;
2800 2 : strDest = m_addr_fetches.front();
2801 2 : m_addr_fetches.pop_front();
2802 78065 : }
2803 : // Attempt v2 connection if we support v2 - we'll reconnect with v1 if our
2804 : // peer doesn't support it or immediately disconnects us for another reason.
2805 2 : const bool use_v2transport(GetLocalServices() & NODE_P2P_V2);
2806 2 : CAddress addr;
2807 2 : CSemaphoreGrant grant(*semOutbound, /*fTry=*/true);
2808 2 : if (grant) {
2809 2 : OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, use_v2transport);
2810 2 : }
2811 78065 : }
2812 :
2813 2764 : bool CConnman::GetTryNewOutboundPeer() const
2814 : {
2815 2764 : return m_try_another_outbound_peer;
2816 : }
2817 :
2818 5509 : void CConnman::SetTryNewOutboundPeer(bool flag)
2819 : {
2820 5509 : m_try_another_outbound_peer = flag;
2821 5509 : LogPrint(BCLog::NET, "setting try another outbound peer=%s\n", flag ? "true" : "false");
2822 5509 : }
2823 :
2824 854 : void CConnman::StartExtraBlockRelayPeers()
2825 : {
2826 854 : LogPrint(BCLog::NET, "enabling extra block-relay-only peers\n");
2827 854 : m_start_extra_block_relay_peers = true;
2828 854 : }
2829 :
2830 : // Return the number of peers we have over our outbound connection limit
2831 : // Exclude peers that are marked for disconnect, or are going to be
2832 : // disconnected soon (eg ADDR_FETCH and FEELER)
2833 : // Also exclude peers that haven't finished initial connection handshake yet
2834 : // (so that we don't decide we're over our desired connection limit, and then
2835 : // evict some peer that has finished the handshake)
2836 6076 : int CConnman::GetExtraFullOutboundCount() const
2837 : {
2838 6076 : int full_outbound_peers = 0;
2839 : {
2840 6076 : READ_LOCK(m_nodes_mutex);
2841 25496 : for (const CNode* pnode : m_nodes) {
2842 : // don't count outbound masternodes
2843 19420 : if (pnode->m_masternode_connection) {
2844 8549 : continue;
2845 : }
2846 10871 : if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsFullOutboundConn() && !pnode->m_masternode_probe_connection) {
2847 70 : ++full_outbound_peers;
2848 70 : }
2849 : }
2850 6076 : }
2851 6076 : return std::max(full_outbound_peers - m_max_outbound_full_relay, 0);
2852 0 : }
2853 :
2854 6076 : int CConnman::GetExtraBlockRelayCount() const
2855 : {
2856 6076 : int block_relay_peers = 0;
2857 : {
2858 6076 : READ_LOCK(m_nodes_mutex);
2859 25496 : for (const CNode* pnode : m_nodes) {
2860 19420 : if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) {
2861 21 : ++block_relay_peers;
2862 21 : }
2863 : }
2864 6076 : }
2865 6076 : return std::max(block_relay_peers - m_max_outbound_block_relay, 0);
2866 0 : }
2867 :
2868 77228 : std::unordered_set<Network> CConnman::GetReachableEmptyNetworks() const
2869 : {
2870 77228 : std::unordered_set<Network> networks{};
2871 617824 : for (int n = 0; n < NET_MAX; n++) {
2872 540596 : enum Network net = (enum Network)n;
2873 540596 : if (net == NET_UNROUTABLE || net == NET_INTERNAL) continue;
2874 386140 : if (g_reachable_nets.Contains(net) && addrman.Size(net, std::nullopt) == 0) {
2875 154402 : networks.insert(net);
2876 154402 : }
2877 386140 : }
2878 77228 : return networks;
2879 77228 : }
2880 :
2881 38 : bool CConnman::MultipleManualOrFullOutboundConns(Network net) const
2882 : {
2883 38 : AssertSharedLockHeld(m_nodes_mutex);
2884 38 : return m_network_conn_counts[net] > 1;
2885 : }
2886 :
2887 0 : bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
2888 : {
2889 0 : std::array<Network, 5> nets{NET_IPV4, NET_IPV6, NET_ONION, NET_I2P, NET_CJDNS};
2890 0 : Shuffle(nets.begin(), nets.end(), FastRandomContext());
2891 :
2892 0 : READ_LOCK(m_nodes_mutex);
2893 0 : for (const auto net : nets) {
2894 0 : if (g_reachable_nets.Contains(net) && m_network_conn_counts[net] == 0 && addrman.Size(net) != 0) {
2895 0 : network = net;
2896 0 : return true;
2897 : }
2898 : }
2899 :
2900 0 : return false;
2901 0 : }
2902 :
2903 859 : void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDeterministicMNManager& dmnman)
2904 : {
2905 859 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
2906 859 : AssertLockNotHeld(m_reconnections_mutex);
2907 859 : FastRandomContext rng;
2908 : // Connect to specific addresses
2909 859 : if (!connect.empty())
2910 : {
2911 : // Attempt v2 connection if we support v2 - we'll reconnect with v1 if our
2912 : // peer doesn't support it or immediately disconnects us for another reason.
2913 12 : const bool use_v2transport(GetLocalServices() & NODE_P2P_V2);
2914 12 : for (int64_t nLoop = 0;; nLoop++)
2915 : {
2916 28 : for (const std::string& strAddr : connect)
2917 : {
2918 16 : CAddress addr(CService(), NODE_NONE);
2919 16 : OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/use_v2transport);
2920 16 : for (int i = 0; i < 10 && i < nLoop; i++)
2921 : {
2922 0 : if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
2923 0 : return;
2924 0 : }
2925 16 : }
2926 12 : if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
2927 12 : return;
2928 0 : PerformReconnections();
2929 0 : }
2930 : }
2931 :
2932 : // Initiate network connections
2933 847 : auto start = GetTime<std::chrono::microseconds>();
2934 :
2935 : // Minimum time before next feeler connection (in microseconds).
2936 847 : auto next_feeler = GetExponentialRand(start, FEELER_INTERVAL);
2937 847 : auto next_extra_block_relay = GetExponentialRand(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL);
2938 847 : auto next_extra_network_peer{GetExponentialRand(start, EXTRA_NETWORK_PEER_INTERVAL)};
2939 847 : const bool dnsseed = gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED);
2940 847 : bool add_fixed_seeds = gArgs.GetBoolArg("-fixedseeds", DEFAULT_FIXEDSEEDS);
2941 847 : const bool use_seednodes{gArgs.IsArgSet("-seednode")};
2942 :
2943 847 : if (!add_fixed_seeds) {
2944 841 : LogPrintf("Fixed seeds are disabled\n");
2945 841 : }
2946 :
2947 78075 : while (!interruptNet)
2948 : {
2949 78065 : ProcessAddrFetch();
2950 :
2951 78065 : if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
2952 837 : return;
2953 :
2954 77228 : PerformReconnections();
2955 :
2956 77228 : CSemaphoreGrant grant(*semOutbound);
2957 77228 : if (interruptNet)
2958 0 : return;
2959 :
2960 77228 : const std::unordered_set<Network> fixed_seed_networks{GetReachableEmptyNetworks()};
2961 77228 : if (add_fixed_seeds && !fixed_seed_networks.empty()) {
2962 : // When the node starts with an empty peers.dat, there are a few other sources of peers before
2963 : // we fallback on to fixed seeds: -dnsseed, -seednode, -addnode
2964 : // If none of those are available, we fallback on to fixed seeds immediately, else we allow
2965 : // 60 seconds for any of those sources to populate addrman.
2966 6 : bool add_fixed_seeds_now = false;
2967 : // It is cheapest to check if enough time has passed first.
2968 6 : if (GetTime<std::chrono::seconds>() > start + std::chrono::minutes{1}) {
2969 4 : add_fixed_seeds_now = true;
2970 4 : LogPrintf("Adding fixed seeds as 60 seconds have passed and addrman is empty for at least one reachable network\n");
2971 4 : }
2972 :
2973 : // Perform cheap checks before locking a mutex.
2974 2 : else if (!dnsseed && !use_seednodes) {
2975 2 : LOCK(m_added_nodes_mutex);
2976 2 : if (m_added_node_params.empty()) {
2977 2 : add_fixed_seeds_now = true;
2978 2 : LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n");
2979 2 : }
2980 2 : }
2981 :
2982 6 : if (add_fixed_seeds_now) {
2983 6 : std::vector<CAddress> seed_addrs{ConvertSeeds(Params().FixedSeeds())};
2984 : // We will not make outgoing connections to peers that are unreachable
2985 : // (e.g. because of -onlynet configuration).
2986 : // Therefore, we do not add them to addrman in the first place.
2987 : // In case previously unreachable networks become reachable
2988 : // (e.g. in case of -onlynet changes by the user), fixed seeds will
2989 : // be loaded only for networks for which we have no addresses.
2990 12 : seed_addrs.erase(std::remove_if(seed_addrs.begin(), seed_addrs.end(),
2991 6 : [&fixed_seed_networks](const CAddress& addr) { return fixed_seed_networks.count(addr.GetNetwork()) == 0; }),
2992 6 : seed_addrs.end());
2993 6 : CNetAddr local;
2994 6 : local.SetInternal("fixedseeds");
2995 6 : addrman.Add(seed_addrs, local);
2996 6 : add_fixed_seeds = false;
2997 6 : LogPrintf("Added %d fixed seeds from reachable networks.\n", seed_addrs.size());
2998 6 : }
2999 6 : }
3000 :
3001 : //
3002 : // Choose an address to connect to based on most recently seen
3003 : //
3004 77228 : CAddress addrConnect;
3005 :
3006 : // Only connect out to one peer per ipv4/ipv6 network group (/16 for IPv4).
3007 : // This is only done for mainnet and testnet
3008 77228 : int nOutboundFullRelay = 0;
3009 77228 : int nOutboundBlockRelay = 0;
3010 77228 : int nOutboundOnionRelay = 0;
3011 77228 : int outbound_privacy_network_peers = 0;
3012 77228 : std::set<std::vector<unsigned char>> outbound_ipv46_peer_netgroups;
3013 :
3014 77228 : if (!Params().AllowMultipleAddressesFromGroup()) {
3015 0 : READ_LOCK(m_nodes_mutex);
3016 0 : for (const CNode* pnode : m_nodes) {
3017 0 : if (pnode->IsFullOutboundConn() && !pnode->m_masternode_connection) nOutboundFullRelay++;
3018 0 : if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
3019 0 : if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++;
3020 :
3021 : // Make sure our persistent outbound slots to ipv4/ipv6 peers belong to different netgroups.
3022 0 : switch (pnode->m_conn_type) {
3023 : // We currently don't take inbound connections into account. Since they are
3024 : // free to make, an attacker could make them to prevent us from connecting to
3025 : // certain peers.
3026 : case ConnectionType::INBOUND:
3027 : // Short-lived outbound connections should not affect how we select outbound
3028 : // peers from addrman.
3029 : case ConnectionType::ADDR_FETCH:
3030 : case ConnectionType::FEELER:
3031 0 : break;
3032 : case ConnectionType::MANUAL:
3033 : case ConnectionType::OUTBOUND_FULL_RELAY:
3034 : case ConnectionType::BLOCK_RELAY:
3035 0 : const CAddress address{pnode->addr};
3036 0 : if (address.IsTor() || address.IsI2P() || address.IsCJDNS()) {
3037 : // Since our addrman-groups for these networks are
3038 : // random, without relation to the route we
3039 : // take to connect to these peers or to the
3040 : // difficulty in obtaining addresses with diverse
3041 : // groups, we don't worry about diversity with
3042 : // respect to our addrman groups when connecting to
3043 : // these networks.
3044 0 : ++outbound_privacy_network_peers;
3045 0 : } else {
3046 0 : outbound_ipv46_peer_netgroups.insert(m_netgroupman.GetGroup(address));
3047 : }
3048 0 : } // no default case, so the compiler can warn about missing cases
3049 : }
3050 0 : }
3051 :
3052 77228 : std::set<uint256> setConnectedMasternodes;
3053 : {
3054 77228 : READ_LOCK(m_nodes_mutex);
3055 374292 : for (const CNode* pnode : m_nodes) {
3056 297064 : auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash();
3057 297064 : if (!verifiedProRegTxHash.IsNull()) {
3058 226646 : setConnectedMasternodes.emplace(verifiedProRegTxHash);
3059 226646 : }
3060 : }
3061 77228 : }
3062 :
3063 77228 : ConnectionType conn_type = ConnectionType::OUTBOUND_FULL_RELAY;
3064 77228 : auto now = GetTime<std::chrono::microseconds>();
3065 77228 : bool anchor = false;
3066 77228 : bool fFeeler = false;
3067 77228 : std::optional<Network> preferred_net;
3068 77228 : bool onion_only = false;
3069 :
3070 : // Determine what type of connection to open. Opening
3071 : // BLOCK_RELAY connections to addresses from anchors.dat gets the highest
3072 : // priority. Then we open OUTBOUND_FULL_RELAY priority until we
3073 : // meet our full-relay capacity. Then we open BLOCK_RELAY connection
3074 : // until we hit our block-relay-only peer limit.
3075 : // GetTryNewOutboundPeer() gets set when a stale tip is detected, so we
3076 : // try opening an additional OUTBOUND_FULL_RELAY connection. If none of
3077 : // these conditions are met, check to see if it's time to try an extra
3078 : // block-relay-only peer (to confirm our tip is current, see below) or the next_feeler
3079 : // timer to decide if we should open a FEELER.
3080 :
3081 77228 : if (!m_anchors.empty() && (nOutboundBlockRelay < m_max_outbound_block_relay)) {
3082 2 : conn_type = ConnectionType::BLOCK_RELAY;
3083 2 : anchor = true;
3084 77228 : } else if (nOutboundFullRelay < m_max_outbound_full_relay) {
3085 : // OUTBOUND_FULL_RELAY
3086 77226 : } else if (nOutboundBlockRelay < m_max_outbound_block_relay) {
3087 0 : conn_type = ConnectionType::BLOCK_RELAY;
3088 0 : } else if (GetTryNewOutboundPeer()) {
3089 : // OUTBOUND_FULL_RELAY
3090 0 : } else if (now > next_extra_block_relay && m_start_extra_block_relay_peers) {
3091 : // Periodically connect to a peer (using regular outbound selection
3092 : // methodology from addrman) and stay connected long enough to sync
3093 : // headers, but not much else.
3094 : //
3095 : // Then disconnect the peer, if we haven't learned anything new.
3096 : //
3097 : // The idea is to make eclipse attacks very difficult to pull off,
3098 : // because every few minutes we're finding a new peer to learn headers
3099 : // from.
3100 : //
3101 : // This is similar to the logic for trying extra outbound (full-relay)
3102 : // peers, except:
3103 : // - we do this all the time on an exponential timer, rather than just when
3104 : // our tip is stale
3105 : // - we potentially disconnect our next-youngest block-relay-only peer, if our
3106 : // newest block-relay-only peer delivers a block more recently.
3107 : // See the eviction logic in net_processing.cpp.
3108 : //
3109 : // Because we can promote these connections to block-relay-only
3110 : // connections, they do not get their own ConnectionType enum
3111 : // (similar to how we deal with extra outbound peers).
3112 0 : next_extra_block_relay = GetExponentialRand(now, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL);
3113 0 : conn_type = ConnectionType::BLOCK_RELAY;
3114 0 : } else if (now > next_feeler) {
3115 0 : next_feeler = GetExponentialRand(now, FEELER_INTERVAL);
3116 0 : conn_type = ConnectionType::FEELER;
3117 0 : fFeeler = true;
3118 0 : } else if (nOutboundFullRelay == m_max_outbound_full_relay &&
3119 0 : m_max_outbound_full_relay == MAX_OUTBOUND_FULL_RELAY_CONNECTIONS &&
3120 0 : now > next_extra_network_peer &&
3121 0 : MaybePickPreferredNetwork(preferred_net)) {
3122 : // Full outbound connection management: Attempt to get at least one
3123 : // outbound peer from each reachable network by making extra connections
3124 : // and then protecting "only" peers from a network during outbound eviction.
3125 : // This is not attempted if the user changed -maxconnections to a value
3126 : // so low that less than MAX_OUTBOUND_FULL_RELAY_CONNECTIONS are made,
3127 : // to prevent interactions with otherwise protected outbound peers.
3128 0 : next_extra_network_peer = GetExponentialRand(now, EXTRA_NETWORK_PEER_INTERVAL);
3129 0 : } else if (nOutboundOnionRelay < m_max_outbound_onion && g_reachable_nets.Contains(Network::NET_ONION)) {
3130 0 : onion_only = true;
3131 0 : } else {
3132 : // skip to next iteration of while loop
3133 0 : continue;
3134 : }
3135 :
3136 77228 : addrman.ResolveCollisions();
3137 :
3138 77228 : auto mnList = dmnman.GetListAtChainTip();
3139 :
3140 77228 : const auto current_time{NodeClock::now()};
3141 77228 : int nTries = 0;
3142 156171 : while (!interruptNet)
3143 : {
3144 78943 : if (anchor && !m_anchors.empty()) {
3145 2 : const CAddress addr = m_anchors.back();
3146 2 : m_anchors.pop_back();
3147 4 : if (!addr.IsValid() || IsLocal(addr) || !g_reachable_nets.Contains(addr) ||
3148 4 : !HasAllDesirableServiceFlags(addr.nServices) ||
3149 2 : outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) continue;
3150 2 : addrConnect = addr;
3151 2 : LogPrint(BCLog::NET, "Trying to make an anchor connection to %s\n", addrConnect.ToStringAddrPort());
3152 2 : break;
3153 2 : }
3154 :
3155 : // If we didn't find an appropriate destination after trying 100 addresses fetched from addrman,
3156 : // stop this loop, and let the outer loop run again (which sleeps, adds seed nodes, recalculates
3157 : // already-connected network ranges, ...) before trying new addrman addresses.
3158 78941 : nTries++;
3159 78941 : if (nTries > 100)
3160 0 : break;
3161 :
3162 78941 : CAddress addr;
3163 78941 : NodeSeconds addr_last_try{0s};
3164 :
3165 78941 : if (fFeeler) {
3166 : // First, try to get a tried table collision address. This returns
3167 : // an empty (invalid) address if there are no collisions to try.
3168 0 : std::tie(addr, addr_last_try) = addrman.SelectTriedCollision();
3169 :
3170 0 : if (!addr.IsValid()) {
3171 : // No tried table collisions. Select a new table address
3172 : // for our feeler.
3173 0 : std::tie(addr, addr_last_try) = addrman.Select(true);
3174 0 : } else if (AlreadyConnectedToAddress(addr)) {
3175 : // If test-before-evict logic would have us connect to a
3176 : // peer that we're already connected to, just mark that
3177 : // address as Good(). We won't be able to initiate the
3178 : // connection anyway, so this avoids inadvertently evicting
3179 : // a currently-connected peer.
3180 0 : addrman.Good(addr);
3181 : // Select a new table address for our feeler instead.
3182 0 : std::tie(addr, addr_last_try) = addrman.Select(true);
3183 0 : }
3184 0 : } else {
3185 : // Not a feeler
3186 : // If preferred_net has a value set, pick an extra outbound
3187 : // peer from that network. The eviction logic in net_processing
3188 : // ensures that a peer from another network will be evicted.
3189 78941 : std::tie(addr, addr_last_try) = addrman.Select(false, preferred_net);
3190 : }
3191 :
3192 : // Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups
3193 78941 : if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) {
3194 0 : continue;
3195 : }
3196 :
3197 : // if we selected an invalid address, restart
3198 78941 : if (!addr.IsValid()) {
3199 77191 : break;
3200 : }
3201 :
3202 : // don't connect to ourselves
3203 1750 : if (addr.GetPort() == GetListenPort() && IsLocal(addr)) {
3204 0 : break;
3205 : }
3206 :
3207 1750 : if (!g_reachable_nets.Contains(addr)) {
3208 0 : continue;
3209 : }
3210 :
3211 1750 : if (onion_only && !addr.IsTor()) {
3212 0 : continue;
3213 : }
3214 :
3215 : // only consider very recently tried nodes after 30 failed attempts
3216 1750 : if (current_time - addr_last_try < 10min && nTries < 30) {
3217 226 : continue;
3218 : }
3219 :
3220 : // for non-feelers, require all the services we'll want,
3221 : // for feelers, only require they be a full node (only because most
3222 : // SPV clients don't have a good address DB available)
3223 1524 : if (!fFeeler && !HasAllDesirableServiceFlags(addr.nServices)) {
3224 0 : continue;
3225 1524 : } else if (fFeeler && !MayHaveUsefulAddressDB(addr.nServices)) {
3226 0 : continue;
3227 : }
3228 :
3229 : // Do not connect to prohibited ports, unless 50 invalid addresses have been selected already.
3230 1524 : if (nTries < 50 && IsBadPort(addr.GetPort())) {
3231 1489 : continue;
3232 : }
3233 :
3234 : // Do not make automatic outbound connections to addnode peers, to
3235 : // not use our limited outbound slots for them and to ensure
3236 : // addnode connections benefit from their intended protections.
3237 35 : if (AddedNodesContain(addr)) {
3238 0 : LogPrint(BCLog::NET, "Not making automatic %s%s connection to %s peer selected for manual (addnode) connection%s\n",
3239 : preferred_net.has_value() ? "network-specific " : "",
3240 : ConnectionTypeAsString(conn_type), GetNetworkName(addr.GetNetwork()),
3241 : fLogIPs ? strprintf(": %s", addr.ToStringAddrPort()) : "");
3242 0 : continue;
3243 : }
3244 :
3245 : // don't try to connect to masternodes that we already have a connection to (most likely inbound)
3246 35 : if (auto dmn = mnList.GetMNByService(addr); dmn && setConnectedMasternodes.count(dmn->proTxHash)) {
3247 0 : continue;
3248 : }
3249 :
3250 35 : addrConnect = addr;
3251 35 : break;
3252 78941 : }
3253 :
3254 0 : if (addrConnect.IsValid()) {
3255 37 : if (fFeeler) {
3256 : // Add small amount of random noise before connection to avoid synchronization.
3257 0 : if (!interruptNet.sleep_for(rng.rand_uniform_duration<CThreadInterrupt::Clock>(FEELER_SLEEP_WINDOW))) {
3258 0 : return;
3259 : }
3260 0 : if (fLogIPs) {
3261 0 : LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort());
3262 0 : } else {
3263 0 : LogPrint(BCLog::NET, "Making feeler connection\n");
3264 : }
3265 0 : }
3266 :
3267 37 : if (preferred_net != std::nullopt) LogPrint(BCLog::NET, "Making network specific connection to %s on %s.\n", addrConnect.ToStringAddrPort(), GetNetworkName(preferred_net.value()));
3268 :
3269 : // Record addrman failure attempts when node has at least 2 persistent outbound connections to peers with
3270 : // different netgroups in ipv4/ipv6 networks + all peers in Tor/I2P/CJDNS networks.
3271 : // Don't record addrman failure attempts when node is offline. This can be identified since all local
3272 : // network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1.
3273 37 : const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(nMaxConnections - 1, 2)};
3274 : // Use BIP324 transport when both us and them have NODE_V2_P2P set.
3275 37 : const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2);
3276 37 : OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport);
3277 37 : }
3278 77228 : }
3279 155315 : }
3280 :
3281 847 : std::vector<CAddress> CConnman::GetCurrentBlockRelayOnlyConns() const
3282 : {
3283 847 : std::vector<CAddress> ret;
3284 847 : READ_LOCK(m_nodes_mutex);
3285 3939 : for (const CNode* pnode : m_nodes) {
3286 3092 : if (pnode->IsBlockRelayOnly()) {
3287 6 : ret.push_back(pnode->addr);
3288 6 : }
3289 : }
3290 :
3291 847 : return ret;
3292 847 : }
3293 :
3294 36037 : std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo(bool include_connected) const
3295 : {
3296 36037 : std::vector<AddedNodeInfo> ret;
3297 :
3298 36037 : std::list<AddedNodeParams> lAddresses(0);
3299 : {
3300 36037 : LOCK(m_added_nodes_mutex);
3301 36037 : ret.reserve(m_added_node_params.size());
3302 36037 : std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses));
3303 36037 : }
3304 :
3305 :
3306 : // Build a map of all already connected addresses (by IP:port and by name) to inbound/outbound and resolved CService
3307 36037 : std::map<CService, bool> mapConnected;
3308 36037 : std::map<std::string, std::pair<bool, CService>> mapConnectedByName;
3309 : {
3310 36037 : READ_LOCK(m_nodes_mutex);
3311 139955 : for (const CNode* pnode : m_nodes) {
3312 103918 : if (pnode->addr.IsValid()) {
3313 103918 : mapConnected[pnode->addr] = pnode->IsInboundConn();
3314 103918 : }
3315 103918 : std::string addrName{pnode->m_addr_name};
3316 103918 : if (!addrName.empty()) {
3317 103918 : mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->IsInboundConn(), static_cast<const CService&>(pnode->addr));
3318 103918 : }
3319 103918 : }
3320 36037 : }
3321 :
3322 36060 : for (const auto& addr : lAddresses) {
3323 23 : CService service{MaybeFlipIPv6toCJDNS(LookupNumeric(addr.m_added_node, Params().GetDefaultPort(addr.m_added_node)))};
3324 23 : AddedNodeInfo addedNode{addr, CService(), false, false};
3325 23 : if (service.IsValid()) {
3326 : // strAddNode is an IP:port
3327 19 : auto it = mapConnected.find(service);
3328 19 : if (it != mapConnected.end()) {
3329 15 : if (!include_connected) {
3330 5 : continue;
3331 : }
3332 10 : addedNode.resolvedAddress = service;
3333 10 : addedNode.fConnected = true;
3334 10 : addedNode.fInbound = it->second;
3335 10 : }
3336 14 : } else {
3337 : // strAddNode is a name
3338 4 : auto it = mapConnectedByName.find(addr.m_added_node);
3339 4 : if (it != mapConnectedByName.end()) {
3340 0 : if (!include_connected) {
3341 0 : continue;
3342 : }
3343 0 : addedNode.resolvedAddress = it->second.second;
3344 0 : addedNode.fConnected = true;
3345 0 : addedNode.fInbound = it->second.first;
3346 0 : }
3347 : }
3348 18 : ret.emplace_back(std::move(addedNode));
3349 23 : }
3350 :
3351 36037 : return ret;
3352 36037 : }
3353 :
3354 2821 : void CConnman::ThreadOpenAddedConnections()
3355 : {
3356 2821 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
3357 2821 : AssertLockNotHeld(m_reconnections_mutex);
3358 36018 : while (true)
3359 : {
3360 36018 : CSemaphoreGrant grant(*semAddnode);
3361 36018 : std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo(/*include_connected=*/false);
3362 36018 : bool tried = false;
3363 36019 : for (const AddedNodeInfo& info : vInfo) {
3364 4 : if (!grant) {
3365 : // If we've used up our semaphore and need a new one, let's not wait here since while we are waiting
3366 : // the addednodeinfo state might change.
3367 0 : break;
3368 : }
3369 4 : tried = true;
3370 4 : CAddress addr(CService(), NODE_NONE);
3371 4 : OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport);
3372 4 : if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return;
3373 1 : grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true);
3374 4 : }
3375 : // See if any reconnections are desired.
3376 36015 : PerformReconnections();
3377 : // Retry every 60 seconds if a connection was attempted, otherwise two seconds
3378 36015 : if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
3379 2818 : return;
3380 36018 : }
3381 2821 : }
3382 :
3383 2821 : void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,
3384 : CMasternodeSync& mn_sync)
3385 : {
3386 : // Connecting to specific addresses, no masternode connections available
3387 2821 : if (gArgs.IsArgSet("-connect") && gArgs.GetArgs("-connect").size() > 0)
3388 1972 : return;
3389 :
3390 849 : auto& chainParams = Params();
3391 :
3392 849 : bool didConnect = false;
3393 45615 : while (!interruptNet)
3394 : {
3395 45614 : auto sleepTime = std::chrono::milliseconds(1000);
3396 45614 : if (didConnect) {
3397 3545 : sleepTime = std::chrono::milliseconds(100);
3398 3545 : }
3399 45614 : if (!interruptNet.sleep_for(sleepTime))
3400 848 : return;
3401 :
3402 44766 : didConnect = false;
3403 :
3404 44766 : if (!fNetworkActive || !m_masternode_thread_active || !mn_sync.IsBlockchainSynced()) continue;
3405 :
3406 40709 : std::unordered_set<CService, CServiceHash> connectedNodes;
3407 40709 : Uint256HashMap</*fInbound=*/bool> connectedProRegTxHashes;
3408 211028 : ForEachNode([&](const CNode* pnode) {
3409 170319 : connectedNodes.emplace(pnode->addr);
3410 170319 : if (auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash(); !verifiedProRegTxHash.IsNull()) {
3411 131665 : connectedProRegTxHashes.emplace(verifiedProRegTxHash, pnode->IsInboundConn());
3412 131665 : }
3413 170319 : });
3414 :
3415 40709 : auto mnList = dmnman.GetListAtChainTip();
3416 :
3417 40709 : if (interruptNet)
3418 1 : return;
3419 :
3420 40708 : int64_t nANow = GetTime<std::chrono::seconds>().count();
3421 40708 : constexpr const auto &_func_ = __func__;
3422 :
3423 : // NOTE: Process only one pending masternode at a time
3424 :
3425 40708 : MasternodeProbeConn isProbe = MasternodeProbeConn::IsNotConnection;
3426 :
3427 81280 : const auto getPendingQuorumNodes = [&]() SHARED_LOCKS_REQUIRED(m_nodes_mutex) EXCLUSIVE_LOCKS_REQUIRED(cs_vPendingMasternodes) {
3428 40572 : AssertSharedLockHeld(m_nodes_mutex);
3429 40572 : AssertLockHeld(cs_vPendingMasternodes);
3430 40572 : std::vector<CDeterministicMNCPtr> ret;
3431 139037 : for (const auto& group : masternodeQuorumNodes) {
3432 267927 : for (const auto& proRegTxHash : group.second) {
3433 169462 : if (connectedProRegTxHashes.count(proRegTxHash)) {
3434 155841 : continue;
3435 : }
3436 13621 : auto dmn = mnList.GetMN(proRegTxHash);
3437 13621 : if (!dmn) {
3438 0 : continue;
3439 : }
3440 13621 : const auto addr2 = dmn->pdmnState->netInfo->GetPrimary();
3441 13621 : CNode* pnode = FindNodeMutable(addr2, /*fExcludeDisconnecting=*/false);
3442 13621 : if (pnode && (pnode->m_masternode_connection || pnode->fDisconnect)) {
3443 : // node is either a masternode or disconnecting, skip it
3444 280 : continue;
3445 : }
3446 13341 : if (connectedNodes.count(addr2)) {
3447 : // we probably connected to it before it became a masternode
3448 : // or maybe we are still waiting for mnauth
3449 6 : bool slow_handshake = pnode && pnode->nTimeFirstMessageReceived.load() != 0s &&
3450 3 : GetTime<std::chrono::seconds>() - pnode->nTimeFirstMessageReceived.load() > 5s;
3451 3 : if (slow_handshake) {
3452 : // clearly not expecting mnauth to take that long even if it wasn't the first message
3453 : // we received (as it should normally), disconnect
3454 3 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- dropping non-mnauth connection to %s, service=%s\n",
3455 : _func_, proRegTxHash.ToString(), addr2.ToStringAddrPort());
3456 3 : pnode->fDisconnect = true;
3457 3 : }
3458 : // either way - it's not ready, skip it for now
3459 3 : continue;
3460 : }
3461 : // back off connecting to an address if we already tried recently
3462 13338 : int64_t last_attempt = mn_metaman.GetLastOutboundAttempt(dmn->proTxHash);
3463 13338 : if (nANow - last_attempt < chainParams.LLMQConnectionRetryTimeout()) {
3464 5922 : continue;
3465 : }
3466 : // all checks passed
3467 7416 : ret.emplace_back(dmn);
3468 13621 : }
3469 : }
3470 40572 : return ret;
3471 40572 : };
3472 :
3473 78231 : const auto getPendingProbes = [&]() EXCLUSIVE_LOCKS_REQUIRED(cs_vPendingMasternodes) {
3474 37523 : AssertLockHeld(cs_vPendingMasternodes);
3475 37523 : std::vector<CDeterministicMNCPtr> ret;
3476 38714 : for (auto it = masternodePendingProbes.begin(); it != masternodePendingProbes.end(); ) {
3477 1191 : auto dmn = mnList.GetMN(*it);
3478 1191 : if (!dmn) {
3479 0 : it = masternodePendingProbes.erase(it);
3480 0 : continue;
3481 : }
3482 1191 : bool connectedAndOutbound = connectedProRegTxHashes.count(dmn->proTxHash) && !connectedProRegTxHashes[dmn->proTxHash];
3483 1191 : if (connectedAndOutbound) {
3484 : // we already have an outbound connection to this MN so there is no theed to probe it again
3485 300 : mn_metaman.SetLastOutboundSuccess(dmn->proTxHash, nANow);
3486 300 : it = masternodePendingProbes.erase(it);
3487 300 : continue;
3488 : }
3489 :
3490 891 : ++it;
3491 :
3492 891 : int64_t lastAttempt = mn_metaman.GetLastOutboundAttempt(dmn->proTxHash);
3493 : // back off trying connecting to an address if we already tried recently
3494 891 : if (nANow - lastAttempt < chainParams.LLMQConnectionRetryTimeout()) {
3495 309 : continue;
3496 : }
3497 582 : ret.emplace_back(dmn);
3498 1191 : }
3499 37523 : return ret;
3500 37523 : };
3501 :
3502 81416 : auto getConnectToDmn = [&]() -> CDeterministicMNCPtr {
3503 : // don't hold lock while calling OpenMasternodeConnection as cs_main is locked deep inside
3504 40708 : READ_LOCK(m_nodes_mutex);
3505 40708 : LOCK(cs_vPendingMasternodes);
3506 :
3507 40708 : if (!vPendingMasternodes.empty()) {
3508 183 : auto dmn = mnList.GetValidMN(vPendingMasternodes.front());
3509 183 : vPendingMasternodes.erase(vPendingMasternodes.begin());
3510 : // Check if we should connect to this masternode
3511 : // We already hold m_nodes_mutex here, so check m_masternode_connection directly
3512 183 : if (dmn && !connectedNodes.count(dmn->pdmnState->netInfo->GetPrimary())) {
3513 136 : const CNode* pnode = FindNode(dmn->pdmnState->netInfo->GetPrimary(), /*fExcludeDisconnecting=*/false);
3514 136 : if (pnode == nullptr || (!pnode->m_masternode_connection && !pnode->fDisconnect)) {
3515 136 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- opening pending masternode connection to %s, service=%s\n",
3516 : _func_, dmn->proTxHash.ToString(),
3517 : dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
3518 136 : return dmn;
3519 : }
3520 0 : }
3521 230 : }
3522 :
3523 43621 : if (const auto pending = getPendingQuorumNodes(); !pending.empty()) {
3524 : // not-null
3525 3049 : auto dmn = pending[GetRand(pending.size())];
3526 3049 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- opening quorum connection to %s, service=%s\n",
3527 : _func_, dmn->proTxHash.ToString(), dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
3528 3049 : return dmn;
3529 3049 : }
3530 :
3531 37883 : if (const auto pending = getPendingProbes(); !pending.empty()) {
3532 : // not-null
3533 360 : auto dmn = pending[GetRand(pending.size())];
3534 360 : masternodePendingProbes.erase(dmn->proTxHash);
3535 360 : isProbe = MasternodeProbeConn::IsConnection;
3536 :
3537 360 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- probing masternode %s, service=%s\n", _func_, dmn->proTxHash.ToString(), dmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
3538 360 : return dmn;
3539 360 : }
3540 37163 : return nullptr;
3541 40708 : };
3542 :
3543 40708 : CDeterministicMNCPtr connectToDmn = getConnectToDmn();
3544 :
3545 40708 : if (connectToDmn == nullptr) {
3546 37163 : continue;
3547 : }
3548 :
3549 3545 : didConnect = true;
3550 :
3551 3545 : mn_metaman.SetLastOutboundAttempt(connectToDmn->proTxHash, nANow);
3552 :
3553 3545 : OpenMasternodeConnection(CAddress(connectToDmn->pdmnState->netInfo->GetPrimary(), NODE_NETWORK), /*use_v2transport=*/GetLocalServices() & NODE_P2P_V2, isProbe);
3554 : // should be in the list now if connection was opened
3555 6182 : bool connected = ForNode(connectToDmn->pdmnState->netInfo->GetPrimary(), CConnman::AllNodes, [&](const CNode* pnode) {
3556 2637 : if (pnode->fDisconnect) {
3557 1 : return false;
3558 : }
3559 2636 : return true;
3560 2637 : });
3561 3545 : if (!connected) {
3562 909 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- connection failed for masternode %s, service=%s\n", __func__, connectToDmn->proTxHash.ToString(), connectToDmn->pdmnState->netInfo->GetPrimary().ToStringAddrPort());
3563 : // Will take a few consequent failed attempts to PoSe-punish a MN.
3564 909 : if (mn_metaman.OutboundFailedTooManyTimes(connectToDmn->proTxHash)) {
3565 592 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- failed to connect to masternode %s too many times\n", __func__, connectToDmn->proTxHash.ToString());
3566 592 : }
3567 909 : }
3568 40709 : }
3569 2821 : }
3570 :
3571 : // if successful, this moves the passed grant to the constructed node
3572 12945 : void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound,
3573 : const char *pszDest, ConnectionType conn_type, bool use_v2transport,
3574 : MasternodeConn masternode_connection, MasternodeProbeConn masternode_probe_connection)
3575 : {
3576 12945 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
3577 12945 : assert(conn_type != ConnectionType::INBOUND);
3578 :
3579 : //
3580 : // Initiate outbound network connection
3581 : //
3582 12945 : if (interruptNet) {
3583 1 : return;
3584 : }
3585 12944 : if (!fNetworkActive) {
3586 0 : return;
3587 : }
3588 :
3589 24470 : auto getIpStr = [&]() {
3590 11526 : if (fLogIPs) {
3591 4 : return addrConnect.ToStringAddrPort();
3592 : } else {
3593 11522 : return std::string("new peer");
3594 : }
3595 11526 : };
3596 :
3597 12944 : if (!pszDest) {
3598 : // banned, discouraged or exact match?
3599 10746 : if ((m_banman && (m_banman->IsDiscouraged(addrConnect) || m_banman->IsBanned(addrConnect))) || AlreadyConnectedToAddress(addrConnect))
3600 7164 : return;
3601 : // connecting to ourselves?
3602 3582 : if (addrConnect.GetPort() == GetListenPort() && IsLocal(addrConnect)) {
3603 0 : return;
3604 : }
3605 5780 : } else if (ExistsNode(std::string(pszDest))) {
3606 17 : return;
3607 : }
3608 :
3609 5763 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- connecting to %s\n", __func__, getIpStr());
3610 5763 : CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
3611 :
3612 5763 : if (!pnode) {
3613 974 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- ConnectNode failed for %s\n", __func__, getIpStr());
3614 974 : return;
3615 : }
3616 :
3617 : {
3618 4789 : LOCK(pnode->m_sock_mutex);
3619 4789 : LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- successfully connected to %s, sock=%d, peer=%d\n", __func__, getIpStr(), pnode->m_sock->Get(), pnode->GetId());
3620 4789 : }
3621 :
3622 4789 : pnode->grantOutbound = std::move(grant_outbound);
3623 :
3624 4789 : if (masternode_connection == MasternodeConn::IsConnection)
3625 2637 : pnode->m_masternode_connection = true;
3626 4789 : if (masternode_probe_connection == MasternodeProbeConn::IsConnection)
3627 257 : pnode->m_masternode_probe_connection = true;
3628 :
3629 : {
3630 4789 : LOCK2(cs_mapSocketToNode, pnode->m_sock_mutex);
3631 4789 : mapSocketToNode.emplace(pnode->m_sock->Get(), pnode);
3632 4789 : }
3633 :
3634 4789 : m_msgproc->InitializeNode(*pnode, nLocalServices);
3635 : {
3636 4789 : LOCK(m_nodes_mutex);
3637 4789 : m_nodes.push_back(pnode);
3638 :
3639 : // update connection count by network
3640 4789 : if (pnode->IsManualOrFullOutboundConn()) ++m_network_conn_counts[pnode->addr.GetNetwork()];
3641 4789 : }
3642 : {
3643 4789 : if (m_edge_trig_events) {
3644 4789 : LOCK(pnode->m_sock_mutex);
3645 4789 : if (!m_edge_trig_events->RegisterEvents(pnode->m_sock->Get())) {
3646 0 : LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
3647 0 : }
3648 4789 : }
3649 4789 : if (m_wakeup_pipe) {
3650 4789 : m_wakeup_pipe->Write();
3651 4789 : }
3652 : }
3653 5781 : }
3654 :
3655 3545 : void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, bool use_v2transport, MasternodeProbeConn probe) {
3656 7090 : OpenNetworkConnection(addrConnect, false, {}, /*strDest=*/nullptr, ConnectionType::OUTBOUND_FULL_RELAY,
3657 3545 : use_v2transport, MasternodeConn::IsConnection, probe);
3658 3545 : }
3659 :
3660 : Mutex NetEventsInterface::g_msgproc_mutex;
3661 :
3662 2821 : void CConnman::ThreadMessageHandler()
3663 : {
3664 2821 : LOCK(NetEventsInterface::g_msgproc_mutex);
3665 :
3666 2821 : auto nLastSendMessagesTimeMasternodes = SteadyClock::time_point{};
3667 :
3668 2821 : FastRandomContext rng;
3669 1058067 : while (!flagInterruptMsgProc)
3670 : {
3671 1055268 : bool fMoreWork = false;
3672 :
3673 1055268 : bool fSkipSendMessagesForMasternodes = true;
3674 1055268 : if (SteadyClock::now() - nLastSendMessagesTimeMasternodes >= 100ms) {
3675 405707 : fSkipSendMessagesForMasternodes = false;
3676 405707 : nLastSendMessagesTimeMasternodes = SteadyClock::now();
3677 405707 : }
3678 :
3679 : // Randomize the order in which we process messages from/to our peers.
3680 : // This prevents attacks in which an attacker exploits having multiple
3681 : // consecutive connections in the m_nodes list.
3682 1055268 : const NodesSnapshot snap{*this, /* cond = */ CConnman::AllNodes, /* shuffle = */ true};
3683 :
3684 4729469 : for (CNode* pnode : snap.Nodes()) {
3685 3674223 : if (pnode->fDisconnect)
3686 2132 : continue;
3687 :
3688 : // Receive messages
3689 3672091 : bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
3690 3672091 : fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
3691 3672091 : if (flagInterruptMsgProc)
3692 16 : return;
3693 : // Send messages
3694 3672075 : if (!fSkipSendMessagesForMasternodes || !pnode->m_masternode_connection) {
3695 2375414 : m_msgproc->SendMessages(pnode);
3696 2375414 : }
3697 :
3698 3672075 : if (flagInterruptMsgProc)
3699 6 : return;
3700 : }
3701 :
3702 1055246 : WAIT_LOCK(mutexMsgProc, lock);
3703 1055246 : if (!fMoreWork) {
3704 2524940 : condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this]() EXCLUSIVE_LOCKS_REQUIRED(mutexMsgProc) { return fMsgProcWake; });
3705 887006 : }
3706 1055246 : fMsgProcWake = false;
3707 1055268 : }
3708 2821 : }
3709 :
3710 8 : void CConnman::ThreadI2PAcceptIncoming(CMasternodeSync& mn_sync)
3711 : {
3712 : static constexpr auto err_wait_begin = 1s;
3713 : static constexpr auto err_wait_cap = 5min;
3714 8 : auto err_wait = err_wait_begin;
3715 :
3716 8 : bool advertising_listen_addr = false;
3717 8 : i2p::Connection conn;
3718 :
3719 16 : auto SleepOnFailure = [&]() {
3720 8 : interruptNet.sleep_for(err_wait);
3721 8 : if (err_wait < err_wait_cap) {
3722 8 : err_wait += 1s;
3723 8 : }
3724 8 : };
3725 :
3726 16 : while (!interruptNet) {
3727 :
3728 8 : if (!m_i2p_sam_session->Listen(conn)) {
3729 8 : if (advertising_listen_addr && conn.me.IsValid()) {
3730 0 : RemoveLocal(conn.me);
3731 0 : advertising_listen_addr = false;
3732 0 : }
3733 8 : SleepOnFailure();
3734 8 : continue;
3735 : }
3736 :
3737 0 : if (!advertising_listen_addr) {
3738 0 : AddLocal(conn.me, LOCAL_MANUAL);
3739 0 : advertising_listen_addr = true;
3740 0 : }
3741 :
3742 0 : if (!m_i2p_sam_session->Accept(conn)) {
3743 0 : SleepOnFailure();
3744 0 : continue;
3745 : }
3746 :
3747 0 : CreateNodeFromAcceptedSocket(std::move(conn.sock), NetPermissionFlags::None,
3748 0 : CAddress{conn.me, NODE_NONE}, CAddress{conn.peer, NODE_NONE}, mn_sync);
3749 :
3750 0 : err_wait = err_wait_begin;
3751 : }
3752 8 : }
3753 :
3754 5614 : bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions)
3755 : {
3756 5614 : int nOne = 1;
3757 :
3758 : // Create socket for listening for incoming connections
3759 : struct sockaddr_storage sockaddr;
3760 5614 : socklen_t len = sizeof(sockaddr);
3761 5614 : if (!addrBind.GetSockAddr((struct sockaddr*)&sockaddr, &len))
3762 : {
3763 0 : strError = strprintf(Untranslated("Error: Bind address family for %s not supported"), addrBind.ToStringAddrPort());
3764 0 : LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original);
3765 0 : return false;
3766 : }
3767 :
3768 5614 : std::unique_ptr<Sock> sock = CreateSock(addrBind.GetSAFamily());
3769 5614 : if (!sock) {
3770 0 : strError = strprintf(Untranslated("Couldn't open socket for incoming connections (socket returned error %s)"), NetworkErrorString(WSAGetLastError()));
3771 0 : LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original);
3772 0 : return false;
3773 : }
3774 :
3775 : // Allow binding if the port is still in TIME_WAIT state after
3776 : // the program was closed and restarted.
3777 5614 : if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) {
3778 0 : strError = strprintf(Untranslated("Error setting SO_REUSEADDR on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError()));
3779 0 : LogPrintf("%s\n", strError.original);
3780 0 : }
3781 :
3782 : // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option
3783 : // and enable it by default or not. Try to enable it, if possible.
3784 5614 : if (addrBind.IsIPv6()) {
3785 : #ifdef IPV6_V6ONLY
3786 4 : if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) {
3787 0 : strError = strprintf(Untranslated("Error setting IPV6_V6ONLY on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError()));
3788 0 : LogPrintf("%s\n", strError.original);
3789 0 : }
3790 : #endif
3791 : #ifdef WIN32
3792 : int nProtLevel = PROTECTION_LEVEL_UNRESTRICTED;
3793 : if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (const char*)&nProtLevel, sizeof(int)) == SOCKET_ERROR) {
3794 : strError = strprintf(Untranslated("Error setting IPV6_PROTECTION_LEVEL on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError()));
3795 : LogPrintf("%s\n", strError.original);
3796 : }
3797 : #endif
3798 4 : }
3799 :
3800 5614 : if (sock->Bind(reinterpret_cast<struct sockaddr*>(&sockaddr), len) == SOCKET_ERROR) {
3801 2578 : int nErr = WSAGetLastError();
3802 2578 : if (nErr == WSAEADDRINUSE)
3803 2578 : strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToStringAddrPort(), PACKAGE_NAME);
3804 : else
3805 0 : strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToStringAddrPort(), NetworkErrorString(nErr));
3806 2578 : LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original);
3807 2578 : return false;
3808 : }
3809 3036 : LogPrintf("Bound to %s\n", addrBind.ToStringAddrPort());
3810 :
3811 : // Listen for incoming connections
3812 3036 : if (sock->Listen(SOMAXCONN) == SOCKET_ERROR)
3813 : {
3814 0 : strError = strprintf(_("Listening for incoming connections failed (listen returned error %s)"), NetworkErrorString(WSAGetLastError()));
3815 0 : LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original);
3816 0 : return false;
3817 : }
3818 :
3819 3036 : if (m_edge_trig_events && !m_edge_trig_events->AddSocket(sock->Get())) {
3820 0 : LogPrintf("Error: EdgeTriggeredEvents::AddSocket() failed\n");
3821 0 : return false;
3822 : }
3823 :
3824 3036 : vhListenSocket.emplace_back(std::move(sock), permissions);
3825 :
3826 3036 : return true;
3827 5614 : }
3828 :
3829 21 : void Discover()
3830 : {
3831 21 : if (!fDiscover)
3832 17 : return;
3833 :
3834 : #ifdef WIN32
3835 : // Get local host IP
3836 : char pszHostName[256] = "";
3837 : if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR)
3838 : {
3839 : const std::vector<CNetAddr> addresses{LookupHost(pszHostName, 0, true)};
3840 : for (const CNetAddr& addr : addresses)
3841 : {
3842 : if (AddLocal(addr, LOCAL_IF))
3843 : LogPrintf("%s: %s - %s\n", __func__, pszHostName, addr.ToStringAddr());
3844 : }
3845 : }
3846 : #elif (HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS)
3847 : // Get local host ip
3848 : struct ifaddrs* myaddrs;
3849 4 : if (getifaddrs(&myaddrs) == 0)
3850 : {
3851 168 : for (struct ifaddrs* ifa = myaddrs; ifa != nullptr; ifa = ifa->ifa_next)
3852 : {
3853 164 : if (ifa->ifa_addr == nullptr) continue;
3854 164 : if ((ifa->ifa_flags & IFF_UP) == 0) continue;
3855 152 : if ((ifa->ifa_flags & IFF_LOOPBACK) != 0) continue;
3856 136 : if (ifa->ifa_addr->sa_family == AF_INET)
3857 : {
3858 8 : struct sockaddr_in* s4 = (struct sockaddr_in*)(ifa->ifa_addr);
3859 8 : CNetAddr addr(s4->sin_addr);
3860 8 : if (AddLocal(addr, LOCAL_IF))
3861 8 : LogPrintf("%s: IPv4 %s: %s\n", __func__, ifa->ifa_name, addr.ToStringAddr());
3862 8 : }
3863 128 : else if (ifa->ifa_addr->sa_family == AF_INET6)
3864 : {
3865 40 : struct sockaddr_in6* s6 = (struct sockaddr_in6*)(ifa->ifa_addr);
3866 40 : CNetAddr addr(s6->sin6_addr);
3867 40 : if (AddLocal(addr, LOCAL_IF))
3868 40 : LogPrintf("%s: IPv6 %s: %s\n", __func__, ifa->ifa_name, addr.ToStringAddr());
3869 40 : }
3870 136 : }
3871 4 : freeifaddrs(myaddrs);
3872 4 : }
3873 : #endif
3874 21 : }
3875 :
3876 3943 : void CConnman::SetNetworkActive(bool active, CMasternodeSync* const mn_sync)
3877 : {
3878 3943 : LogPrintf("%s: %s\n", __func__, active);
3879 :
3880 3943 : if (fNetworkActive == active) {
3881 3561 : return;
3882 : }
3883 :
3884 382 : fNetworkActive = active;
3885 :
3886 : // mn_sync is nullptr during app initialization with `-networkactive=`
3887 382 : if (mn_sync) {
3888 : // Always call the Reset() if the network gets enabled/disabled to make sure the sync process
3889 : // gets a reset if its outdated..
3890 376 : mn_sync->Reset();
3891 376 : }
3892 :
3893 382 : if (m_client_interface) {
3894 374 : m_client_interface->NotifyNetworkActiveChanged(fNetworkActive);
3895 374 : }
3896 3943 : }
3897 :
3898 6503 : CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In, AddrMan& addrman_in,
3899 : const NetGroupManager& netgroupman, bool network_active)
3900 : : addrman(addrman_in)
3901 : , m_netgroupman{netgroupman}
3902 : , nSeed0(nSeed0In)
3903 : , nSeed1(nSeed1In)
3904 2936 : {
3905 : // Make sure we never set the default port to a bad port
3906 : for (int n = 0; n < NET_MAX; ++n) {
3907 : const bool is_bad_port = IsBadPort(Params().GetDefaultPort(static_cast<Network>(n)));
3908 : assert(!is_bad_port);
3909 : }
3910 :
3911 : SetTryNewOutboundPeer(false);
3912 :
3913 : Options connOptions;
3914 : Init(connOptions);
3915 : SetNetworkActive(network_active, /* mn_sync = */ nullptr);
3916 2936 : }
3917 :
3918 9947 : NodeId CConnman::GetNewNodeId()
3919 : {
3920 9947 : return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
3921 : }
3922 :
3923 :
3924 5614 : bool CConnman::Bind(const CService& addr_, unsigned int flags, NetPermissionFlags permissions)
3925 : {
3926 5614 : const CService addr{MaybeFlipIPv6toCJDNS(addr_)};
3927 :
3928 5614 : bilingual_str strError;
3929 5614 : if (!BindListenPort(addr, strError, permissions)) {
3930 2578 : if ((flags & BF_REPORT_ERROR) && m_client_interface) {
3931 0 : m_client_interface->ThreadSafeMessageBox(strError, "", CClientUIInterface::MSG_ERROR);
3932 0 : }
3933 2578 : return false;
3934 : }
3935 :
3936 3036 : if (addr.IsRoutable() && fDiscover && !(flags & BF_DONT_ADVERTISE) && !NetPermissions::HasFlag(permissions, NetPermissionFlags::NoBan)) {
3937 0 : AddLocal(addr, LOCAL_BIND);
3938 0 : }
3939 :
3940 3036 : return true;
3941 5614 : }
3942 :
3943 2804 : bool CConnman::InitBinds(const Options& options)
3944 : {
3945 2804 : bool fBound = false;
3946 5602 : for (const auto& addrBind : options.vBinds) {
3947 2798 : fBound |= Bind(addrBind, BF_REPORT_ERROR, NetPermissionFlags::None);
3948 : }
3949 2806 : for (const auto& addrBind : options.vWhiteBinds) {
3950 2 : fBound |= Bind(addrBind.m_service, BF_REPORT_ERROR, addrBind.m_flags);
3951 : }
3952 5610 : for (const auto& addr_bind : options.onion_binds) {
3953 2806 : fBound |= Bind(addr_bind, BF_DONT_ADVERTISE, NetPermissionFlags::None);
3954 : }
3955 2804 : if (options.bind_on_any) {
3956 : struct in_addr inaddr_any;
3957 4 : inaddr_any.s_addr = htonl(INADDR_ANY);
3958 4 : struct in6_addr inaddr6_any = IN6ADDR_ANY_INIT;
3959 4 : fBound |= Bind(CService(inaddr6_any, GetListenPort()), BF_NONE, NetPermissionFlags::None);
3960 4 : fBound |= Bind(CService(inaddr_any, GetListenPort()), !fBound ? BF_REPORT_ERROR : BF_NONE, NetPermissionFlags::None);
3961 4 : }
3962 2804 : return fBound;
3963 0 : }
3964 :
3965 2845 : bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync,
3966 : CScheduler& scheduler, const Options& connOptions)
3967 : {
3968 2845 : AssertLockNotHeld(m_total_bytes_sent_mutex);
3969 2845 : Init(connOptions);
3970 :
3971 2845 : if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) {
3972 2821 : m_edge_trig_events = std::make_unique<EdgeTriggeredEvents>(socketEventsMode);
3973 2821 : if (!m_edge_trig_events->IsValid()) {
3974 0 : LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n");
3975 0 : m_edge_trig_events.reset();
3976 0 : return false;
3977 : }
3978 2821 : }
3979 :
3980 2845 : if (fListen && !InitBinds(connOptions)) {
3981 0 : if (m_client_interface) {
3982 0 : m_client_interface->ThreadSafeMessageBox(
3983 0 : _("Failed to listen on any port. Use -listen=0 if you want this."),
3984 0 : "", CClientUIInterface::MSG_ERROR);
3985 0 : }
3986 0 : return false;
3987 : }
3988 :
3989 2845 : Proxy i2p_sam;
3990 2845 : if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) {
3991 8 : m_i2p_sam_session = std::make_unique<i2p::sam::Session>(gArgs.GetDataDirNet() / "i2p_private_key",
3992 8 : i2p_sam, &interruptNet);
3993 8 : }
3994 :
3995 2827 : for (const auto& strDest : connOptions.vSeedNodes) {
3996 6 : AddAddrFetch(strDest);
3997 : }
3998 :
3999 2821 : if (m_use_addrman_outgoing) {
4000 : // Load addresses from anchors.dat
4001 847 : m_anchors = ReadAnchors(gArgs.GetDataDirNet() / ANCHORS_DATABASE_FILENAME);
4002 847 : if (m_anchors.size() > MAX_BLOCK_RELAY_ONLY_ANCHORS) {
4003 0 : m_anchors.resize(MAX_BLOCK_RELAY_ONLY_ANCHORS);
4004 0 : }
4005 847 : LogPrintf("%i block-relay-only anchors will be tried for connections.\n", m_anchors.size());
4006 847 : }
4007 :
4008 2821 : if (m_client_interface) {
4009 2821 : m_client_interface->InitMessage(_("Starting network threads…").translated);
4010 2821 : }
4011 :
4012 2821 : fAddressesInitialized = true;
4013 :
4014 2821 : if (semOutbound == nullptr) {
4015 : // initialize semaphore
4016 2821 : semOutbound = std::make_unique<CSemaphore>(std::min(m_max_outbound, nMaxConnections));
4017 2821 : }
4018 2821 : if (semAddnode == nullptr) {
4019 : // initialize semaphore
4020 2821 : semAddnode = std::make_unique<CSemaphore>(nMaxAddnode);
4021 2821 : }
4022 :
4023 : //
4024 : // Start threads
4025 : //
4026 2821 : assert(m_msgproc);
4027 2821 : InterruptSocks5(false);
4028 2821 : interruptNet.reset();
4029 2821 : flagInterruptMsgProc = false;
4030 :
4031 : {
4032 2821 : LOCK(mutexMsgProc);
4033 2821 : fMsgProcWake = false;
4034 2821 : }
4035 :
4036 : #ifdef USE_WAKEUP_PIPE
4037 2821 : m_wakeup_pipe = std::make_unique<WakeupPipe>(m_edge_trig_events.get());
4038 2821 : if (!m_wakeup_pipe->IsValid()) {
4039 : /* We log the error but do not halt initialization */
4040 0 : LogPrintf("Unable to initialize WakeupPipe instance\n");
4041 0 : m_wakeup_pipe.reset();
4042 0 : }
4043 : #endif /* USE_WAKEUP_PIPE */
4044 :
4045 : // Send and receive from sockets, accept connections
4046 5642 : threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); });
4047 :
4048 2821 : if (!gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED))
4049 2799 : LogPrintf("DNS seeding disabled\n");
4050 : else
4051 44 : threadDNSAddressSeed = std::thread(&util::TraceThread, "dnsseed", [this] { ThreadDNSAddressSeed(); });
4052 :
4053 : // Initiate manual connections
4054 5642 : threadOpenAddedConnections = std::thread(&util::TraceThread, "addcon", [this] { ThreadOpenAddedConnections(); });
4055 :
4056 2821 : if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) {
4057 0 : if (m_client_interface) {
4058 0 : m_client_interface->ThreadSafeMessageBox(
4059 0 : _("Cannot provide specific connections and have addrman find outgoing connections at the same time."),
4060 0 : "", CClientUIInterface::MSG_ERROR);
4061 0 : }
4062 0 : return false;
4063 : }
4064 2821 : if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty()) {
4065 859 : threadOpenConnections = std::thread(
4066 847 : &util::TraceThread, "opencon",
4067 1706 : [this, connect = connOptions.m_specified_outgoing, &dmnman] { ThreadOpenConnections(connect, dmnman); });
4068 859 : }
4069 :
4070 : // Initiate masternode connections
4071 5654 : threadOpenMasternodeConnections = std::thread(&util::TraceThread, "mncon", [this, &dmnman, &mn_metaman, &mn_sync] {
4072 2821 : ThreadOpenMasternodeConnections(dmnman, mn_metaman, mn_sync);
4073 2821 : });
4074 :
4075 : // Process messages
4076 5642 : threadMessageHandler = std::thread(&util::TraceThread, "msghand", [this] { ThreadMessageHandler(); });
4077 :
4078 2821 : if (m_i2p_sam_session) {
4079 8 : threadI2PAcceptIncoming =
4080 16 : std::thread(&util::TraceThread, "i2paccept", [this, &mn_sync] { ThreadI2PAcceptIncoming(mn_sync); });
4081 8 : }
4082 :
4083 : // Dump network addresses
4084 4075 : scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL);
4085 :
4086 : // Run the ASMap Health check once and then schedule it to run every 24h.
4087 2821 : if (m_netgroupman.UsingASMap()) {
4088 14 : ASMapHealthCheck();
4089 14 : scheduler.scheduleEvery([this] { ASMapHealthCheck(); }, ASMAP_HEALTH_CHECK_INTERVAL);
4090 14 : }
4091 :
4092 2821 : return true;
4093 2869 : }
4094 :
4095 : class CNetCleanup
4096 : {
4097 : public:
4098 : CNetCleanup() = default;
4099 :
4100 0 : ~CNetCleanup()
4101 0 : {
4102 : #ifdef WIN32
4103 : // Shutdown Windows Sockets
4104 : WSACleanup();
4105 : #endif
4106 0 : }
4107 : };
4108 : static CNetCleanup instance_of_cnetcleanup;
4109 :
4110 0 : void CExplicitNetCleanup::callCleanup()
4111 : {
4112 : // Explicit call to destructor of CNetCleanup because it's not implicitly called
4113 : // when the wallet is restarted from within the wallet itself.
4114 : CNetCleanup tmp;
4115 0 : }
4116 :
4117 6501 : void CConnman::Interrupt()
4118 : {
4119 : {
4120 6501 : LOCK(mutexMsgProc);
4121 6501 : flagInterruptMsgProc = true;
4122 6501 : }
4123 6501 : condMsgProc.notify_all();
4124 :
4125 6501 : interruptNet();
4126 6501 : InterruptSocks5(true);
4127 :
4128 6501 : if (semOutbound) {
4129 33832 : for (int i=0; i<m_max_outbound; i++) {
4130 31011 : semOutbound->post();
4131 31011 : }
4132 2821 : }
4133 :
4134 6501 : if (semAddnode) {
4135 25389 : for (int i=0; i<nMaxAddnode; i++) {
4136 22568 : semAddnode->post();
4137 22568 : }
4138 2821 : }
4139 6501 : }
4140 :
4141 6679 : void CConnman::StopThreads()
4142 : {
4143 6679 : if (threadI2PAcceptIncoming.joinable()) {
4144 8 : threadI2PAcceptIncoming.join();
4145 8 : }
4146 6679 : if (threadMessageHandler.joinable())
4147 2821 : threadMessageHandler.join();
4148 6679 : if (threadOpenMasternodeConnections.joinable())
4149 2821 : threadOpenMasternodeConnections.join();
4150 6679 : if (threadOpenConnections.joinable())
4151 859 : threadOpenConnections.join();
4152 6679 : if (threadOpenAddedConnections.joinable())
4153 2821 : threadOpenAddedConnections.join();
4154 6679 : if (threadDNSAddressSeed.joinable())
4155 22 : threadDNSAddressSeed.join();
4156 6679 : if (threadSocketHandler.joinable())
4157 2821 : threadSocketHandler.join();
4158 6679 : }
4159 :
4160 6679 : void CConnman::StopNodes()
4161 : {
4162 6679 : if (fAddressesInitialized) {
4163 2821 : DumpAddresses();
4164 2821 : fAddressesInitialized = false;
4165 :
4166 2821 : if (m_use_addrman_outgoing) {
4167 : // Anchor connections are only dumped during clean shutdown.
4168 847 : std::vector<CAddress> anchors_to_dump = GetCurrentBlockRelayOnlyConns();
4169 847 : if (anchors_to_dump.size() > MAX_BLOCK_RELAY_ONLY_ANCHORS) {
4170 0 : anchors_to_dump.resize(MAX_BLOCK_RELAY_ONLY_ANCHORS);
4171 0 : }
4172 847 : DumpAnchors(gArgs.GetDataDirNet() / ANCHORS_DATABASE_FILENAME, anchors_to_dump);
4173 847 : }
4174 2821 : }
4175 :
4176 : // Delete peer connections.
4177 6679 : std::vector<CNode*> nodes;
4178 13358 : WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes));
4179 11754 : for (CNode *pnode : nodes) {
4180 5075 : pnode->CloseSocketDisconnect(this);
4181 5075 : DeleteNode(pnode);
4182 : }
4183 :
4184 : // Close listening sockets.
4185 6679 : if (m_edge_trig_events) {
4186 5857 : for (ListenSocket& hListenSocket : vhListenSocket) {
4187 3036 : if (hListenSocket.sock) {
4188 3036 : m_edge_trig_events->RemoveSocket(hListenSocket.sock->Get());
4189 3036 : }
4190 : }
4191 2821 : }
4192 :
4193 6682 : for (CNode* pnode : m_nodes_disconnected) {
4194 3 : DeleteNode(pnode);
4195 : }
4196 13358 : WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear());
4197 : {
4198 6679 : LOCK(cs_sendable_receivable_nodes);
4199 6679 : mapReceivableNodes.clear();
4200 6679 : }
4201 6679 : m_nodes_disconnected.clear();
4202 6679 : vhListenSocket.clear();
4203 13358 : WITH_LOCK(m_reconnections_mutex, m_reconnections.clear());
4204 6679 : semOutbound.reset();
4205 6679 : semAddnode.reset();
4206 : /**
4207 : * m_wakeup_pipe must be reset *before* m_edge_trig_events as it may
4208 : * attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor
4209 : */
4210 6679 : m_wakeup_pipe.reset();
4211 6679 : m_edge_trig_events.reset();
4212 6679 : }
4213 :
4214 9947 : void CConnman::DeleteNode(CNode* pnode)
4215 : {
4216 9947 : assert(pnode);
4217 9947 : m_msgproc->FinalizeNode(*pnode);
4218 9947 : delete pnode;
4219 9947 : }
4220 :
4221 7128 : CConnman::~CConnman()
4222 3562 : {
4223 3566 : Interrupt();
4224 3566 : Stop();
4225 7128 : }
4226 :
4227 1576 : std::vector<CAddress> CConnman::GetAddresses(size_t max_addresses, size_t max_pct, std::optional<Network> network, const bool filtered) const
4228 : {
4229 1576 : std::vector<CAddress> addresses = addrman.GetAddr(max_addresses, max_pct, network, filtered);
4230 1576 : if (m_banman) {
4231 3152 : addresses.erase(std::remove_if(addresses.begin(), addresses.end(),
4232 70309 : [this](const CAddress& addr){return m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr);}),
4233 1576 : addresses.end());
4234 1576 : }
4235 1576 : return addresses;
4236 1576 : }
4237 :
4238 4896 : std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addresses, size_t max_pct)
4239 : {
4240 4896 : auto local_socket_bytes = requestor.addrBind.GetAddrBytes();
4241 9792 : uint64_t cache_id = GetDeterministicRandomizer(RANDOMIZER_ID_ADDRCACHE)
4242 4896 : .Write(requestor.ConnectedThroughNetwork())
4243 4896 : .Write(local_socket_bytes.data(), local_socket_bytes.size())
4244 : // For outbound connections, the port of the bound address is randomly
4245 : // assigned by the OS and would therefore not be useful for seeding.
4246 4896 : .Write(requestor.IsInboundConn() ? requestor.addrBind.GetPort() : 0)
4247 4896 : .Finalize();
4248 4896 : const auto current_time = GetTime<std::chrono::microseconds>();
4249 4896 : auto r = m_addr_response_caches.emplace(cache_id, CachedAddrResponse{});
4250 : CachedAddrResponse& cache_entry = r.first->second;
4251 : if (cache_entry.m_cache_entry_expiration < current_time) { // If emplace() added new one it has expiration 0.
4252 : cache_entry.m_addrs_response_cache = GetAddresses(max_addresses, max_pct, /*network=*/std::nullopt);
4253 : // Choosing a proper cache lifetime is a trade-off between the privacy leak minimization
4254 : // and the usefulness of ADDR responses to honest users.
4255 : //
4256 : // Longer cache lifetime makes it more difficult for an attacker to scrape
4257 : // enough AddrMan data to maliciously infer something useful.
4258 : // By the time an attacker scraped enough AddrMan records, most of
4259 : // the records should be old enough to not leak topology info by
4260 : // e.g. analyzing real-time changes in timestamps.
4261 : //
4262 : // It takes only several hundred requests to scrape everything from an AddrMan containing 100,000 nodes,
4263 : // so ~24 hours of cache lifetime indeed makes the data less inferable by the time
4264 : // most of it could be scraped (considering that timestamps are updated via
4265 : // ADDR self-announcements and when nodes communicate).
4266 : // We also should be robust to those attacks which may not require scraping *full* victim's AddrMan
4267 : // (because even several timestamps of the same handful of nodes may leak privacy).
4268 : //
4269 : // On the other hand, longer cache lifetime makes ADDR responses
4270 : // outdated and less useful for an honest requestor, e.g. if most nodes
4271 : // in the ADDR response are no longer active.
4272 : //
4273 : // However, the churn in the network is known to be rather low. Since we consider
4274 : // nodes to be "terrible" (see IsTerrible()) if the timestamps are older than 30 days,
4275 : // max. 24 hours of "penalty" due to cache shouldn't make any meaningful difference
4276 : // in terms of the freshness of the response.
4277 : cache_entry.m_cache_entry_expiration = current_time + std::chrono::hours(21) + GetRandMillis(std::chrono::hours(6));
4278 : }
4279 : return cache_entry.m_addrs_response_cache;
4280 0 : }
4281 :
4282 19 : bool CConnman::AddNode(const AddedNodeParams& add)
4283 : {
4284 19 : const CService resolved(LookupNumeric(add.m_added_node, Params().GetDefaultPort(add.m_added_node)));
4285 19 : const bool resolved_is_valid{resolved.IsValid()};
4286 :
4287 19 : LOCK(m_added_nodes_mutex);
4288 29 : for (const auto& it : m_added_node_params) {
4289 36 : if (add.m_added_node == it.m_added_node || (resolved_is_valid && resolved == LookupNumeric(it.m_added_node, Params().GetDefaultPort(it.m_added_node)))) return false;
4290 : }
4291 :
4292 9 : m_added_node_params.push_back(add);
4293 9 : return true;
4294 19 : }
4295 :
4296 8 : bool CConnman::RemoveAddedNode(const std::string& strNode)
4297 : {
4298 8 : LOCK(m_added_nodes_mutex);
4299 8 : for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) {
4300 4 : if (strNode == it->m_added_node) {
4301 4 : m_added_node_params.erase(it);
4302 4 : return true;
4303 : }
4304 0 : }
4305 4 : return false;
4306 8 : }
4307 :
4308 41 : bool CConnman::AddedNodesContain(const CAddress& addr) const
4309 : {
4310 41 : AssertLockNotHeld(m_added_nodes_mutex);
4311 41 : const std::string addr_str{addr.ToStringAddr()};
4312 41 : const std::string addr_port_str{addr.ToStringAddrPort()};
4313 41 : LOCK(m_added_nodes_mutex);
4314 41 : return (m_added_node_params.size() < 24 // bound the query to a reasonable limit
4315 41 : && std::any_of(m_added_node_params.cbegin(), m_added_node_params.cend(),
4316 61 : [&](const auto& p) { return p.m_added_node == addr_str || p.m_added_node == addr_port_str; }));
4317 41 : }
4318 :
4319 223 : bool CConnman::AddPendingMasternode(const uint256& proTxHash)
4320 : {
4321 223 : LOCK(cs_vPendingMasternodes);
4322 223 : if (std23::ranges::contains(vPendingMasternodes, proTxHash)) {
4323 35 : return false;
4324 : }
4325 :
4326 188 : vPendingMasternodes.push_back(proTxHash);
4327 188 : return true;
4328 223 : }
4329 :
4330 134893 : void CConnman::SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const Uint256HashSet& proTxHashes)
4331 : {
4332 134893 : LOCK(cs_vPendingMasternodes);
4333 134893 : auto it = masternodeQuorumNodes.emplace(std::make_pair(llmqType, quorumHash), proTxHashes);
4334 134893 : if (!it.second) {
4335 130727 : it.first->second = proTxHashes;
4336 130727 : }
4337 134893 : }
4338 :
4339 142008 : void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const Uint256HashSet& proTxHashes)
4340 : {
4341 : {
4342 142008 : LOCK(cs_vPendingMasternodes);
4343 142008 : auto it = masternodeQuorumRelayMembers.emplace(std::make_pair(llmqType, quorumHash), proTxHashes);
4344 142008 : if (!it.second) {
4345 137549 : it.first->second = proTxHashes;
4346 137549 : }
4347 142008 : }
4348 :
4349 : // Update existing connections
4350 814596 : ForEachNode([&](CNode* pnode) {
4351 672588 : auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash();
4352 672588 : if (!verifiedProRegTxHash.IsNull() && !pnode->m_masternode_iqr_connection && IsMasternodeQuorumRelayMember(verifiedProRegTxHash)) {
4353 : // Tell our peer that we're interested in plain LLMQ recovered signatures.
4354 : // Otherwise the peer would only announce/send messages resulting from QRECSIG,
4355 : // e.g. InstantSend locks or ChainLocks. SPV and regular full nodes should not send
4356 : // this message as they are usually only interested in the higher level messages.
4357 323 : const CNetMsgMaker msgMaker(pnode->GetCommonVersion());
4358 323 : PushMessage(pnode, msgMaker.Make(NetMsgType::QSENDRECSIGS, true));
4359 323 : pnode->m_masternode_iqr_connection = true;
4360 323 : }
4361 672588 : });
4362 142008 : }
4363 :
4364 170644 : bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const
4365 : {
4366 170644 : LOCK(cs_vPendingMasternodes);
4367 170644 : return masternodeQuorumNodes.count(std::make_pair(llmqType, quorumHash));
4368 170644 : }
4369 :
4370 402250 : Uint256HashSet CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) const
4371 : {
4372 402250 : LOCK(cs_vPendingMasternodes);
4373 402250 : Uint256HashSet result;
4374 1346647 : for (const auto& p : masternodeQuorumNodes) {
4375 944397 : if (p.first.first != llmqType) {
4376 755225 : continue;
4377 : }
4378 189172 : result.emplace(p.first.second);
4379 : }
4380 402250 : return result;
4381 402250 : }
4382 :
4383 19792 : std::vector<NodeId> CConnman::GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const
4384 : {
4385 19792 : READ_LOCK(m_nodes_mutex);
4386 19792 : LOCK(cs_vPendingMasternodes);
4387 19792 : auto it = masternodeQuorumNodes.find(std::make_pair(llmqType, quorumHash));
4388 19792 : if (it == masternodeQuorumNodes.end()) {
4389 273 : return {};
4390 : }
4391 19519 : const auto& proRegTxHashes = it->second;
4392 :
4393 19519 : std::vector<NodeId> nodes;
4394 :
4395 114551 : auto IsMasternodeQuorumNode = [&](const CNode* n) {
4396 95032 : if (n->fDisconnect) return false;
4397 95026 : const auto h = n->GetVerifiedProRegTxHash();
4398 95026 : return n->qwatch || (!h.IsNull() && proRegTxHashes.contains(h));
4399 95032 : };
4400 :
4401 75670 : for (NodeId id : m_nodes
4402 19519 : | std::views::filter(IsMasternodeQuorumNode)
4403 56151 : | std::views::transform([](const CNode* n){ return n->GetId(); })) {
4404 36632 : nodes.push_back(id);
4405 : }
4406 19519 : return nodes;
4407 39311 : }
4408 :
4409 1838 : void CConnman::RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash)
4410 : {
4411 1838 : LOCK(cs_vPendingMasternodes);
4412 1838 : masternodeQuorumNodes.erase(std::make_pair(llmqType, quorumHash));
4413 1838 : masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash));
4414 1838 : }
4415 :
4416 36 : bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const
4417 : {
4418 : // Let's see if this is an outgoing connection to an address that is known to be a masternode
4419 : // We however only need to know this if the node did not authenticate itself as a MN yet
4420 36 : uint256 assumedProTxHash;
4421 36 : if (pnode->GetVerifiedProRegTxHash().IsNull() && !pnode->IsInboundConn()) {
4422 21 : auto dmn = tip_mn_list.GetMNByService(pnode->addr);
4423 21 : if (dmn == nullptr) {
4424 : // This is definitely not a masternode
4425 0 : return false;
4426 : }
4427 21 : assumedProTxHash = dmn->proTxHash;
4428 21 : }
4429 :
4430 36 : LOCK(cs_vPendingMasternodes);
4431 63 : for (const auto& p : masternodeQuorumNodes) {
4432 36 : if (!pnode->GetVerifiedProRegTxHash().IsNull()) {
4433 0 : if (p.second.count(pnode->GetVerifiedProRegTxHash())) {
4434 0 : return true;
4435 : }
4436 36 : } else if (!assumedProTxHash.IsNull()) {
4437 21 : if (p.second.count(assumedProTxHash)) {
4438 9 : return true;
4439 : }
4440 12 : }
4441 : }
4442 27 : return false;
4443 36 : }
4444 :
4445 34454 : bool CConnman::IsMasternodeQuorumRelayMember(const uint256& protxHash)
4446 : {
4447 34454 : if (protxHash.IsNull()) {
4448 0 : return false;
4449 : }
4450 34454 : LOCK(cs_vPendingMasternodes);
4451 150465 : for (const auto& p : masternodeQuorumRelayMembers) {
4452 119116 : if (p.second.count(protxHash)) {
4453 3105 : return true;
4454 : }
4455 : }
4456 31349 : return false;
4457 34454 : }
4458 :
4459 347 : void CConnman::AddPendingProbeConnections(const Uint256HashSet& proTxHashes)
4460 : {
4461 347 : LOCK(cs_vPendingMasternodes);
4462 347 : masternodePendingProbes.insert(proTxHashes.begin(), proTxHashes.end());
4463 347 : }
4464 :
4465 43541 : size_t CConnman::GetNodeCount(ConnectionDirection flags) const
4466 : {
4467 43541 : READ_LOCK(m_nodes_mutex);
4468 :
4469 43541 : int nNum = 0;
4470 196118 : for (const auto& pnode : m_nodes) {
4471 152577 : if (pnode->fDisconnect) {
4472 73 : continue;
4473 : }
4474 180293 : if ((flags & ConnectionDirection::Verified) && pnode->GetVerifiedProRegTxHash().IsNull()) {
4475 19008 : continue;
4476 : }
4477 133496 : if (flags & (pnode->IsInboundConn() ? ConnectionDirection::In : ConnectionDirection::Out)) {
4478 118379 : nNum++;
4479 133496 : } else if (flags == ConnectionDirection::Verified) {
4480 2927 : nNum++;
4481 2927 : }
4482 : }
4483 :
4484 43541 : return nNum;
4485 43541 : }
4486 :
4487 0 : std::map<CNetAddr, LocalServiceInfo> CConnman::getNetLocalAddresses() const
4488 : {
4489 0 : LOCK(g_maplocalhost_mutex);
4490 0 : return mapLocalHost;
4491 0 : }
4492 :
4493 5153 : size_t CConnman::GetMaxOutboundNodeCount()
4494 : {
4495 5153 : return m_max_outbound;
4496 : }
4497 :
4498 3 : size_t CConnman::GetMaxOutboundOnionNodeCount()
4499 : {
4500 3 : return m_max_outbound_onion;
4501 : }
4502 :
4503 104131 : uint32_t CConnman::GetMappedAS(const CNetAddr& addr) const
4504 : {
4505 104131 : return m_netgroupman.GetMappedAS(addr);
4506 : }
4507 :
4508 31207 : void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
4509 : {
4510 31207 : vstats.clear();
4511 31207 : READ_LOCK(m_nodes_mutex);
4512 31207 : vstats.reserve(m_nodes.size());
4513 117595 : for (const CNode* pnode : m_nodes) {
4514 86388 : if (pnode->fDisconnect) {
4515 249 : continue;
4516 : }
4517 86139 : vstats.emplace_back();
4518 86139 : pnode->CopyStats(vstats.back());
4519 86139 : vstats.back().m_mapped_as = GetMappedAS(pnode->addr);
4520 : }
4521 31207 : }
4522 :
4523 8 : bool CConnman::DisconnectNode(const std::string& strNode)
4524 : {
4525 20 : return WithNodeMutable(strNode, [&](CNode* pnode){
4526 4 : LogPrint(BCLog::NET_NETCONN, "disconnect by address%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->GetId());
4527 4 : pnode->fDisconnect = true;
4528 4 : return true;
4529 8 : }).value_or(false);
4530 : }
4531 :
4532 51 : bool CConnman::DisconnectNode(const CSubNet& subnet)
4533 : {
4534 51 : bool disconnected = false;
4535 51 : READ_LOCK(m_nodes_mutex);
4536 77 : for (CNode* pnode : m_nodes) {
4537 26 : if (subnet.Match(pnode->addr)) {
4538 19 : LogPrint(BCLog::NET_NETCONN, "disconnect by subnet%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->GetId());
4539 19 : pnode->fDisconnect = true;
4540 19 : disconnected = true;
4541 19 : }
4542 : }
4543 51 : return disconnected;
4544 51 : }
4545 :
4546 30 : bool CConnman::DisconnectNode(const CNetAddr& addr)
4547 : {
4548 30 : return DisconnectNode(CSubNet(addr));
4549 0 : }
4550 :
4551 162 : bool CConnman::DisconnectNode(NodeId id)
4552 : {
4553 486 : return WithNodeMutable(id, [&](CNode* pnode){
4554 162 : LogPrint(BCLog::NET_NETCONN, "disconnect by id peer=%d; disconnecting\n", pnode->GetId());
4555 162 : pnode->fDisconnect = true;
4556 162 : return true;
4557 162 : }).value_or(false);
4558 : }
4559 :
4560 793098 : void CConnman::RecordBytesRecv(uint64_t bytes)
4561 : {
4562 793098 : nTotalBytesRecv += bytes;
4563 793098 : ::g_stats_client->count("bandwidth.bytesReceived", bytes, 0.1f);
4564 793098 : ::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv.load(), 0.01f);
4565 793098 : }
4566 :
4567 757222 : void CConnman::RecordBytesSent(uint64_t bytes)
4568 : {
4569 757222 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4570 757222 : LOCK(m_total_bytes_sent_mutex);
4571 :
4572 757222 : nTotalBytesSent += bytes;
4573 757222 : ::g_stats_client->count("bandwidth.bytesSent", bytes, 0.01f);
4574 757222 : ::g_stats_client->gauge("bandwidth.totalBytesSent", nTotalBytesSent, 0.01f);
4575 :
4576 757222 : const auto now = GetTime<std::chrono::seconds>();
4577 757222 : if (nMaxOutboundCycleStartTime + MAX_UPLOAD_TIMEFRAME < now)
4578 : {
4579 : // timeframe expired, reset cycle
4580 2045 : nMaxOutboundCycleStartTime = now;
4581 2045 : nMaxOutboundTotalBytesSentInCycle = 0;
4582 2045 : }
4583 :
4584 757222 : nMaxOutboundTotalBytesSentInCycle += bytes;
4585 757222 : }
4586 :
4587 24 : uint64_t CConnman::GetMaxOutboundTarget() const
4588 : {
4589 24 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4590 24 : LOCK(m_total_bytes_sent_mutex);
4591 24 : return nMaxOutboundLimit;
4592 24 : }
4593 :
4594 24 : std::chrono::seconds CConnman::GetMaxOutboundTimeframe() const
4595 : {
4596 24 : return MAX_UPLOAD_TIMEFRAME;
4597 : }
4598 :
4599 24 : std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle() const
4600 : {
4601 24 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4602 24 : LOCK(m_total_bytes_sent_mutex);
4603 24 : return GetMaxOutboundTimeLeftInCycle_();
4604 24 : }
4605 :
4606 1116 : std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle_() const
4607 : {
4608 1116 : AssertLockHeld(m_total_bytes_sent_mutex);
4609 :
4610 1116 : if (nMaxOutboundLimit == 0)
4611 24 : return 0s;
4612 :
4613 1092 : if (nMaxOutboundCycleStartTime.count() == 0)
4614 0 : return MAX_UPLOAD_TIMEFRAME;
4615 :
4616 1092 : const std::chrono::seconds cycleEndTime = nMaxOutboundCycleStartTime + MAX_UPLOAD_TIMEFRAME;
4617 1092 : const auto now = GetTime<std::chrono::seconds>();
4618 1092 : return (cycleEndTime < now) ? 0s : cycleEndTime - now;
4619 1116 : }
4620 :
4621 54918 : bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit) const
4622 : {
4623 54918 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4624 54918 : LOCK(m_total_bytes_sent_mutex);
4625 54918 : if (nMaxOutboundLimit == 0)
4626 53826 : return false;
4627 :
4628 1092 : if (historicalBlockServingLimit)
4629 : {
4630 : // keep a large enough buffer to at least relay each block once
4631 1092 : const std::chrono::seconds timeLeftInCycle = GetMaxOutboundTimeLeftInCycle_();
4632 1092 : const uint64_t buffer = timeLeftInCycle / std::chrono::minutes{10} * MaxBlockSize();
4633 1092 : if (buffer >= nMaxOutboundLimit || nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit - buffer)
4634 669 : return true;
4635 423 : }
4636 0 : else if (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit)
4637 0 : return true;
4638 :
4639 423 : return false;
4640 54918 : }
4641 :
4642 24 : uint64_t CConnman::GetOutboundTargetBytesLeft() const
4643 : {
4644 24 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4645 24 : LOCK(m_total_bytes_sent_mutex);
4646 24 : if (nMaxOutboundLimit == 0)
4647 24 : return 0;
4648 :
4649 0 : return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle;
4650 24 : }
4651 :
4652 24 : uint64_t CConnman::GetTotalBytesRecv() const
4653 : {
4654 24 : return nTotalBytesRecv;
4655 : }
4656 :
4657 24 : uint64_t CConnman::GetTotalBytesSent() const
4658 : {
4659 24 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4660 24 : LOCK(m_total_bytes_sent_mutex);
4661 24 : return nTotalBytesSent;
4662 24 : }
4663 :
4664 16422 : ServiceFlags CConnman::GetLocalServices() const
4665 : {
4666 16422 : return nLocalServices;
4667 : }
4668 :
4669 9987 : static std::unique_ptr<Transport> MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept
4670 : {
4671 9987 : if (use_v2transport) {
4672 301 : return std::make_unique<V2Transport>(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION);
4673 : } else {
4674 9686 : return std::make_unique<V1Transport>(id, SER_NETWORK, INIT_PROTO_VERSION);
4675 : }
4676 9987 : }
4677 :
4678 19976 : CNode::CNode(NodeId idIn,
4679 : std::shared_ptr<Sock> sock,
4680 : const CAddress& addrIn,
4681 : uint64_t nKeyedNetGroupIn,
4682 : uint64_t nLocalHostNonceIn,
4683 : const CAddress& addrBindIn,
4684 : const std::string& addrNameIn,
4685 : ConnectionType conn_type_in,
4686 : bool inbound_onion,
4687 : CNodeOptions&& node_opts)
4688 9989 : : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)},
4689 9989 : m_permission_flags{node_opts.permission_flags},
4690 9989 : m_sock{sock},
4691 : m_connected{GetTime<std::chrono::seconds>()},
4692 : addr{addrIn},
4693 : addrBind{addrBindIn},
4694 : m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn},
4695 : m_dest(addrNameIn),
4696 : m_inbound_onion{inbound_onion},
4697 : m_prefer_evict{node_opts.prefer_evict},
4698 : nKeyedNetGroup{nKeyedNetGroupIn},
4699 : m_conn_type{conn_type_in},
4700 : id{idIn},
4701 : nLocalHostNonce{nLocalHostNonceIn},
4702 : m_recv_flood_size{node_opts.recv_flood_size},
4703 : m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
4704 9987 : {
4705 : if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
4706 :
4707 : for (const std::string &msg : getAllNetMessageTypes())
4708 : mapRecvBytesPerMsgType[msg] = 0;
4709 : mapRecvBytesPerMsgType[NET_MESSAGE_TYPE_OTHER] = 0;
4710 :
4711 : if (fLogIPs) {
4712 : LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", m_addr_name, id);
4713 : } else {
4714 : LogPrint(BCLog::NET, "Added connection peer=%d\n", id);
4715 : }
4716 9987 : }
4717 :
4718 734572 : void CNode::MarkReceivedMsgsForProcessing()
4719 : {
4720 734572 : AssertLockNotHeld(m_msg_process_queue_mutex);
4721 :
4722 734572 : size_t nQuorumSizeAdded = 0;
4723 734572 : size_t nNormalSizeAdded = 0;
4724 734572 : std::list<CNetMessage> quorumMsgs;
4725 734572 : std::list<CNetMessage> normalMsgs;
4726 :
4727 : // Classify messages into quorum-priority and normal queues
4728 1555467 : for (auto it = vRecvMsg.begin(); it != vRecvMsg.end();) {
4729 820895 : auto& msg = *it;
4730 : // vRecvMsg contains only completed CNetMessage
4731 : // the single possible partially deserialized message are held by TransportDeserializer
4732 820895 : if (IsQuorumPriorityMessage(msg.m_type)) {
4733 82137 : quorumMsgs.splice(quorumMsgs.end(), vRecvMsg, it++);
4734 82137 : nQuorumSizeAdded += msg.m_raw_message_size;
4735 82137 : } else {
4736 738758 : normalMsgs.splice(normalMsgs.end(), vRecvMsg, it++);
4737 738758 : nNormalSizeAdded += msg.m_raw_message_size;
4738 : }
4739 : }
4740 :
4741 734572 : LOCK(m_msg_process_queue_mutex);
4742 : // Splice classified messages into appropriate queues
4743 734572 : m_msg_quorum_queue.splice(m_msg_quorum_queue.end(), quorumMsgs);
4744 734572 : m_msg_quorum_queue_size += nQuorumSizeAdded;
4745 734572 : m_msg_process_queue.splice(m_msg_process_queue.end(), normalMsgs);
4746 734572 : m_msg_process_queue_size += nNormalSizeAdded;
4747 : // Compute backpressure over combined size of both queues
4748 734572 : fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
4749 734572 : }
4750 :
4751 3669751 : std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
4752 : {
4753 3669751 : LOCK(m_msg_process_queue_mutex);
4754 :
4755 : // Ratio-based processing: process N quorum messages for every 1 normal message
4756 : // This ensures forward progress for both queues while strongly prioritizing quorum messages
4757 : // However, if normal queue is empty, process quorum messages in bursts (like old algorithm)
4758 3669751 : constexpr size_t QUORUM_TO_NORMAL_RATIO = 100;
4759 :
4760 : // Check if we should process normal queue for forward progress
4761 : // Only apply ratio when both queues have messages to allow burst processing when normal queue is empty
4762 4379263 : bool skip_quorum_processing = !m_msg_process_queue.empty() &&
4763 709512 : m_quorum_msg_count_since_normal >= QUORUM_TO_NORMAL_RATIO;
4764 :
4765 : // Prioritize quorum queue: pop from it first if non-empty and ratio not reached
4766 : // If normal queue is empty, process quorum messages without ratio limit (burst mode)
4767 3669751 : if (!m_msg_quorum_queue.empty() && !skip_quorum_processing) {
4768 82126 : std::list<CNetMessage> msgs;
4769 : // Just take one message from quorum queue
4770 82126 : msgs.splice(msgs.begin(), m_msg_quorum_queue, m_msg_quorum_queue.begin());
4771 82126 : m_msg_quorum_queue_size -= msgs.front().m_raw_message_size;
4772 : // Only increment counter if normal queue has messages (to track ratio)
4773 : // If normal queue is empty, don't increment so we can process bursts quickly
4774 82126 : if (!m_msg_process_queue.empty()) {
4775 5354 : ++m_quorum_msg_count_since_normal;
4776 5354 : }
4777 : // Compute backpressure over combined size of both queues
4778 82126 : fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
4779 : // Return true for 'more' if either queue has remaining messages
4780 82126 : return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
4781 82126 : }
4782 :
4783 : // Process normal queue (either because quorum queue is empty or ratio reached)
4784 3587625 : if (m_msg_process_queue.empty()) return std::nullopt;
4785 :
4786 704158 : std::list<CNetMessage> msgs;
4787 : // Just take one message
4788 704158 : msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
4789 704158 : m_msg_process_queue_size -= msgs.front().m_raw_message_size;
4790 704158 : m_quorum_msg_count_since_normal = 0; // Reset counter after processing normal message
4791 : // Compute backpressure over combined size of both queues
4792 704158 : fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
4793 :
4794 704158 : return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
4795 3669751 : }
4796 :
4797 3526616 : bool CConnman::NodeFullyConnected(const CNode* pnode)
4798 : {
4799 3526616 : return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
4800 : }
4801 :
4802 805270 : void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
4803 : {
4804 805270 : AssertLockNotHeld(m_total_bytes_sent_mutex);
4805 805270 : size_t nMessageSize = msg.data.size();
4806 805270 : LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId());
4807 805270 : if (gArgs.GetBoolArg("-capturemessages", false)) {
4808 30 : CaptureMessage(pnode->addr, msg.m_type, msg.data, /*is_incoming=*/false);
4809 30 : }
4810 :
4811 : TRACE6(net, outbound_message,
4812 : pnode->GetId(),
4813 : pnode->m_addr_name.c_str(),
4814 : pnode->ConnectionTypeAsString().c_str(),
4815 : msg.m_type.c_str(),
4816 : msg.data.size(),
4817 : msg.data.data()
4818 : );
4819 :
4820 805270 : ::g_stats_client->count(strprintf("bandwidth.message.%s.bytesSent", msg.m_type), nMessageSize, 1.0f);
4821 805270 : ::g_stats_client->inc(strprintf("message.sent.%s", msg.m_type), 1.0f);
4822 :
4823 : {
4824 805270 : LOCK(pnode->cs_vSend);
4825 : // Check if the transport still has unsent bytes, and indicate to it that we're about to
4826 : // give it a message to send.
4827 1562038 : const auto& [to_send, more, _msg_type] =
4828 805270 : pnode->m_transport->GetBytesToSend(/*have_next_message=*/true);
4829 805270 : const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
4830 :
4831 : // Update memory usage of send buffer.
4832 805270 : pnode->m_send_memusage += msg.GetMemoryUsage();
4833 805270 : if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
4834 : // Move message to vSendMsg queue.
4835 805270 : pnode->vSendMsg.push_back(std::move(msg));
4836 805269 : pnode->nSendMsgSize = pnode->vSendMsg.size();
4837 :
4838 : // If there was nothing to send before, and there is now (predicted by the "more" value
4839 : // returned by the GetBytesToSend call above), attempt "optimistic write":
4840 : // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
4841 : // doing a send, try sending from the calling thread if the queue was empty before.
4842 : // With a V1Transport, more will always be true here, because adding a message always
4843 : // results in sendable bytes there, but with V2Transport this is not the case (it may
4844 : // still be in the handshake).
4845 805269 : if (queue_was_empty && more) {
4846 756768 : if (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()) {
4847 511695 : m_wakeup_pipe->Write();
4848 511696 : }
4849 756769 : }
4850 805272 : }
4851 805274 : }
4852 :
4853 3545 : bool CConnman::ForNode(const CService& addr, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
4854 : {
4855 3545 : READ_LOCK(m_nodes_mutex);
4856 3545 : CNode* found = FindNodeMutable(addr, false);
4857 3545 : return found != nullptr && cond(found) && func(found);
4858 3545 : }
4859 :
4860 0 : bool CConnman::ForNode(const CService& addr, std::function<bool(const CNode* pnode)> cond, std::function<bool(const CNode* pnode)> func) const
4861 : {
4862 0 : READ_LOCK(m_nodes_mutex);
4863 0 : const CNode* found = FindNode(addr, false);
4864 0 : return found != nullptr && cond(found) && func(found);
4865 0 : }
4866 :
4867 1971 : bool CConnman::ForNode(NodeId id, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
4868 : {
4869 1971 : READ_LOCK(m_nodes_mutex);
4870 1971 : CNode* found = FindNodeMutable(id, false);
4871 1971 : return found != nullptr && cond(found) && func(found);
4872 1971 : }
4873 :
4874 0 : bool CConnman::ForNode(NodeId id, std::function<bool(const CNode* pnode)> cond, std::function<bool(const CNode* pnode)> func) const
4875 : {
4876 0 : READ_LOCK(m_nodes_mutex);
4877 0 : const CNode* found = FindNode(id, false);
4878 0 : return found != nullptr && cond(found) && func(found);
4879 0 : }
4880 :
4881 0 : bool CConnman::IsMasternodeOrDisconnectRequested(const CService& addr) const
4882 : {
4883 0 : return ForNode(addr, AllNodes, [](const CNode* pnode){
4884 0 : return pnode->m_masternode_connection || pnode->fDisconnect;
4885 : });
4886 0 : }
4887 :
4888 62139405 : CConnman::NodesSnapshot::NodesSnapshot(const CConnman& connman, std::function<bool(const CNode* pnode)> filter,
4889 : bool shuffle)
4890 31069793 : {
4891 : {
4892 31069612 : READ_LOCK(connman.m_nodes_mutex);
4893 31066268 : m_nodes_copy.reserve(connman.m_nodes.size());
4894 :
4895 71483495 : for (auto& node : connman.m_nodes) {
4896 40436373 : if (!filter(node)) continue;
4897 40405044 : node->AddRef();
4898 40422778 : m_nodes_copy.push_back(node);
4899 : }
4900 31107924 : }
4901 31047122 : if (shuffle) {
4902 1055268 : Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{});
4903 1055268 : }
4904 62181061 : }
4905 :
4906 62139899 : CConnman::NodesSnapshot::~NodesSnapshot()
4907 31070034 : {
4908 71498717 : for (auto& node : m_nodes_copy) {
4909 40428852 : node->Release();
4910 : }
4911 62139899 : }
4912 :
4913 24889 : CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
4914 : {
4915 24889 : return CSipHasher(nSeed0, nSeed1).Write(id);
4916 : }
4917 :
4918 9947 : uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
4919 : {
4920 9947 : std::vector<unsigned char> vchNetGroup(m_netgroupman.GetGroup(address));
4921 :
4922 9947 : return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
4923 9947 : }
4924 :
4925 113243 : void CConnman::PerformReconnections()
4926 : {
4927 113243 : AssertLockNotHeld(m_reconnections_mutex);
4928 113243 : AssertLockNotHeld(m_unused_i2p_sessions_mutex);
4929 113249 : while (true) {
4930 : // Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
4931 113249 : decltype(m_reconnections) todo;
4932 : {
4933 113249 : LOCK(m_reconnections_mutex);
4934 113249 : if (m_reconnections.empty()) break;
4935 6 : todo.splice(todo.end(), m_reconnections, m_reconnections.begin());
4936 113249 : }
4937 :
4938 6 : auto& item = *todo.begin();
4939 12 : OpenNetworkConnection(item.addr_connect,
4940 : // We only reconnect if the first attempt to connect succeeded at
4941 : // connection time, but then failed after the CNode object was
4942 : // created. Since we already know connecting is possible, do not
4943 : // count failure to reconnect.
4944 : /*fCountFailure=*/false,
4945 6 : std::move(item.grant),
4946 6 : item.destination.empty() ? nullptr : item.destination.c_str(),
4947 6 : item.conn_type,
4948 6 : item.use_v2transport,
4949 6 : item.masternode_connection ? MasternodeConn::IsConnection : MasternodeConn::IsNotConnection,
4950 6 : item.masternode_probe_connection ? MasternodeProbeConn::IsConnection : MasternodeProbeConn::IsNotConnection);
4951 113249 : }
4952 113243 : }
4953 :
4954 14 : void CConnman::ASMapHealthCheck()
4955 : {
4956 14 : const std::vector<CAddress> v4_addrs{GetAddresses(/*max_addresses=*/ 0, /*max_pct=*/ 0, Network::NET_IPV4, /*filtered=*/ false)};
4957 14 : const std::vector<CAddress> v6_addrs{GetAddresses(/*max_addresses=*/ 0, /*max_pct=*/ 0, Network::NET_IPV6, /*filtered=*/ false)};
4958 14 : std::vector<CNetAddr> clearnet_addrs;
4959 14 : clearnet_addrs.reserve(v4_addrs.size() + v6_addrs.size());
4960 14 : std::transform(v4_addrs.begin(), v4_addrs.end(), std::back_inserter(clearnet_addrs),
4961 8 : [](const CAddress& addr) { return static_cast<CNetAddr>(addr); });
4962 14 : std::transform(v6_addrs.begin(), v6_addrs.end(), std::back_inserter(clearnet_addrs),
4963 0 : [](const CAddress& addr) { return static_cast<CNetAddr>(addr); });
4964 14 : m_netgroupman.ASMapHealthCheck(clearnet_addrs);
4965 14 : }
4966 :
4967 : // Dump binary message to file, with timestamp.
4968 41 : static void CaptureMessageToFile(const CAddress& addr,
4969 : const std::string& msg_type,
4970 : Span<const unsigned char> data,
4971 : bool is_incoming)
4972 : {
4973 : // Note: This function captures the message at the time of processing,
4974 : // not at socket receive/send time.
4975 : // This ensures that the messages are always in order from an application
4976 : // layer (processing) perspective.
4977 41 : auto now = GetTime<std::chrono::microseconds>();
4978 :
4979 : // Windows folder names cannot include a colon
4980 41 : std::string clean_addr = addr.ToStringAddrPort();
4981 41 : std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
4982 :
4983 41 : fs::path base_path = gArgs.GetDataDirNet() / "message_capture" / fs::u8path(clean_addr);
4984 41 : fs::create_directories(base_path);
4985 :
4986 41 : fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
4987 41 : AutoFile f{fsbridge::fopen(path, "ab")};
4988 :
4989 41 : ser_writedata64(f, now.count());
4990 41 : f.write(MakeByteSpan(msg_type));
4991 248 : for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
4992 207 : f << uint8_t{'\0'};
4993 207 : }
4994 41 : uint32_t size = data.size();
4995 41 : ser_writedata32(f, size);
4996 41 : f.write(AsBytes(data));
4997 41 : }
4998 :
4999 : std::function<void(const CAddress& addr,
5000 : const std::string& msg_type,
5001 : Span<const unsigned char> data,
5002 : bool is_incoming)>
5003 3308 : CaptureMessage = CaptureMessageToFile;
|