Line data Source code
1 : // Copyright (c) 2017-2023 Vincent Thiery
2 : // Copyright (c) 2024-2025 The Dash Core developers
3 : // Distributed under the MIT software license, see the accompanying
4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 :
6 : #include <stats/rawsender.h>
7 :
8 : #include <logging.h>
9 : #include <netaddress.h>
10 : #include <netbase.h>
11 : #include <util/sock.h>
12 : #include <util/thread.h>
13 :
14 16 : RawSender::RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
15 : uint64_t interval_ms, std::optional<bilingual_str>& error) :
16 : m_host{host},
17 : m_port{port},
18 : m_batching_opts{batching_opts},
19 : m_interval_ms{interval_ms}
20 8 : {
21 : if (host.empty()) {
22 : error = _("No host specified");
23 : return;
24 : }
25 :
26 : if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
27 : if (!netaddr->IsIPv4() && !netaddr->IsIPv6()) {
28 : error = strprintf(_("Host %s on unsupported network"), m_host);
29 : return;
30 : }
31 : if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
32 : error = strprintf(_("Cannot get socket address for %s"), m_host);
33 : return;
34 : }
35 : } else {
36 : error = strprintf(_("Unable to lookup host %s"), m_host);
37 : return;
38 : }
39 :
40 : SOCKET hSocket = ::socket(reinterpret_cast<struct sockaddr*>(&m_server.first)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
41 : if (hSocket == INVALID_SOCKET) {
42 : error = strprintf(_("Cannot create socket (socket() returned error %s)"), NetworkErrorString(WSAGetLastError()));
43 : return;
44 : }
45 : m_sock = std::make_unique<Sock>(hSocket);
46 :
47 : if (m_interval_ms == 0) {
48 : LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n");
49 : } else {
50 : m_interrupt.reset();
51 2 : m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); });
52 : }
53 :
54 : LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port);
55 8 : }
56 :
57 16 : RawSender::~RawSender()
58 8 : {
59 : // If there is a thread, interrupt and stop it
60 8 : if (m_thread.joinable()) {
61 2 : m_interrupt();
62 2 : m_thread.join();
63 2 : }
64 : // Flush queue of uncommitted messages
65 8 : QueueFlush();
66 :
67 8 : LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
68 : m_host, m_port, m_successes, m_failures);
69 16 : }
70 :
71 72 : std::optional<bilingual_str> RawSender::Send(const RawMessage& msg)
72 : {
73 : // If there is a thread, append to queue
74 72 : if (m_thread.joinable()) {
75 24 : QueueAdd(msg);
76 24 : return std::nullopt;
77 : }
78 : // There isn't a queue, send directly
79 48 : return SendDirectly(msg);
80 72 : }
81 :
82 50 : std::optional<bilingual_str> RawSender::SendDirectly(const RawMessage& msg)
83 : {
84 50 : if (!m_sock) {
85 0 : m_failures++;
86 0 : return _("Socket not initialized, cannot send message");
87 : }
88 :
89 150 : if (::sendto(m_sock->Get(), reinterpret_cast<const char*>(msg.data()),
90 : #ifdef WIN32
91 : static_cast<int>(msg.size()),
92 : #else
93 50 : msg.size(),
94 : #endif // WIN32
95 100 : /*flags=*/0, reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
96 0 : m_failures++;
97 0 : return strprintf(_("Unable to send message to %s (::sendto() returned error %s)"), this->ToStringHostPort(),
98 0 : NetworkErrorString(WSAGetLastError()));
99 : }
100 :
101 50 : m_successes++;
102 50 : return std::nullopt;
103 50 : }
104 :
105 0 : std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }
106 :
107 24 : void RawSender::QueueAdd(const RawMessage& msg)
108 : {
109 24 : AssertLockNotHeld(cs);
110 24 : LOCK(cs);
111 :
112 70 : const auto& [batch_size, batch_delim] = m_batching_opts;
113 : // If no batch size has been specified, simply add to queue
114 24 : if (batch_size == 0) {
115 0 : m_queue.push_back(msg);
116 0 : return;
117 : }
118 :
119 : // We can batch, either create a new batch in queue or append to existing batch in queue
120 24 : if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) {
121 : // Either we don't have a place to batch our message or we exceeded the batch size, make a new batch
122 2 : m_queue.emplace_back();
123 4 : m_queue.back().reserve(batch_size);
124 24 : } else if (!m_queue.back().empty()) {
125 : // When there is already a batch open we need a delimiter when its not empty
126 22 : m_queue.back() += batch_delim;
127 22 : }
128 :
129 : // Add the new message to the batch
130 24 : m_queue.back() += msg;
131 24 : }
132 :
133 8 : void RawSender::QueueFlush()
134 : {
135 8 : AssertLockNotHeld(cs);
136 16 : WITH_LOCK(cs, QueueFlush(m_queue));
137 8 : }
138 :
139 10 : void RawSender::QueueFlush(std::deque<RawMessage>& queue)
140 : {
141 12 : while (!queue.empty()) {
142 2 : SendDirectly(queue.front());
143 2 : queue.pop_front();
144 : }
145 10 : }
146 :
147 2 : void RawSender::QueueThreadMain()
148 : {
149 2 : AssertLockNotHeld(cs);
150 :
151 2 : while (!m_interrupt) {
152 : // Swap the queues to commit the existing queue of messages
153 2 : std::deque<RawMessage> queue;
154 4 : WITH_LOCK(cs, m_queue.swap(queue));
155 :
156 : // Flush the committed queue
157 2 : QueueFlush(queue);
158 2 : assert(queue.empty());
159 :
160 2 : if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) {
161 2 : return;
162 : }
163 2 : }
164 2 : }
|