Line data Source code
1 : // Copyright (c) 2017-2023 Vincent Thiery
2 : // Copyright (c) 2024-2025 The Dash Core developers
3 : // Distributed under the MIT software license, see the accompanying
4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 :
6 : #include <stats/rawsender.h>
7 :
8 : #include <logging.h>
9 : #include <netaddress.h>
10 : #include <netbase.h>
11 : #include <util/sock.h>
12 : #include <util/thread.h>
13 :
14 0 : RawSender::RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
15 : uint64_t interval_ms, std::optional<bilingual_str>& error) :
16 : m_host{host},
17 : m_port{port},
18 : m_batching_opts{batching_opts},
19 : m_interval_ms{interval_ms}
20 0 : {
21 : if (host.empty()) {
22 : error = _("No host specified");
23 : return;
24 : }
25 :
26 : if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
27 : if (!netaddr->IsIPv4() && !netaddr->IsIPv6()) {
28 : error = strprintf(_("Host %s on unsupported network"), m_host);
29 : return;
30 : }
31 : if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
32 : error = strprintf(_("Cannot get socket address for %s"), m_host);
33 : return;
34 : }
35 : } else {
36 : error = strprintf(_("Unable to lookup host %s"), m_host);
37 : return;
38 : }
39 :
40 : SOCKET hSocket = ::socket(reinterpret_cast<struct sockaddr*>(&m_server.first)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
41 : if (hSocket == INVALID_SOCKET) {
42 : error = strprintf(_("Cannot create socket (socket() returned error %s)"), NetworkErrorString(WSAGetLastError()));
43 : return;
44 : }
45 : m_sock = std::make_unique<Sock>(hSocket);
46 :
47 : if (m_interval_ms == 0) {
48 : LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n");
49 : } else {
50 : m_interrupt.reset();
51 0 : m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); });
52 : }
53 :
54 : LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port);
55 0 : }
56 :
57 0 : RawSender::~RawSender()
58 0 : {
59 : // If there is a thread, interrupt and stop it
60 0 : if (m_thread.joinable()) {
61 0 : m_interrupt();
62 0 : m_thread.join();
63 0 : }
64 : // Flush queue of uncommitted messages
65 0 : QueueFlush();
66 :
67 0 : LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
68 : m_host, m_port, m_successes, m_failures);
69 0 : }
70 :
71 0 : std::optional<bilingual_str> RawSender::Send(const RawMessage& msg)
72 : {
73 : // If there is a thread, append to queue
74 0 : if (m_thread.joinable()) {
75 0 : QueueAdd(msg);
76 0 : return std::nullopt;
77 : }
78 : // There isn't a queue, send directly
79 0 : return SendDirectly(msg);
80 0 : }
81 :
82 0 : std::optional<bilingual_str> RawSender::SendDirectly(const RawMessage& msg)
83 : {
84 0 : if (!m_sock) {
85 0 : m_failures++;
86 0 : return _("Socket not initialized, cannot send message");
87 : }
88 :
89 0 : if (::sendto(m_sock->Get(), reinterpret_cast<const char*>(msg.data()),
90 : #ifdef WIN32
91 : static_cast<int>(msg.size()),
92 : #else
93 0 : msg.size(),
94 : #endif // WIN32
95 0 : /*flags=*/0, reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
96 0 : m_failures++;
97 0 : return strprintf(_("Unable to send message to %s (::sendto() returned error %s)"), this->ToStringHostPort(),
98 0 : NetworkErrorString(WSAGetLastError()));
99 : }
100 :
101 0 : m_successes++;
102 0 : return std::nullopt;
103 0 : }
104 :
105 0 : std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }
106 :
107 0 : void RawSender::QueueAdd(const RawMessage& msg)
108 : {
109 0 : AssertLockNotHeld(cs);
110 0 : LOCK(cs);
111 :
112 0 : const auto& [batch_size, batch_delim] = m_batching_opts;
113 : // If no batch size has been specified, simply add to queue
114 0 : if (batch_size == 0) {
115 0 : m_queue.push_back(msg);
116 0 : return;
117 : }
118 :
119 : // We can batch, either create a new batch in queue or append to existing batch in queue
120 0 : if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) {
121 : // Either we don't have a place to batch our message or we exceeded the batch size, make a new batch
122 0 : m_queue.emplace_back();
123 0 : m_queue.back().reserve(batch_size);
124 0 : } else if (!m_queue.back().empty()) {
125 : // When there is already a batch open we need a delimiter when its not empty
126 0 : m_queue.back() += batch_delim;
127 0 : }
128 :
129 : // Add the new message to the batch
130 0 : m_queue.back() += msg;
131 0 : }
132 :
133 0 : void RawSender::QueueFlush()
134 : {
135 0 : AssertLockNotHeld(cs);
136 0 : WITH_LOCK(cs, QueueFlush(m_queue));
137 0 : }
138 :
139 0 : void RawSender::QueueFlush(std::deque<RawMessage>& queue)
140 : {
141 0 : while (!queue.empty()) {
142 0 : SendDirectly(queue.front());
143 0 : queue.pop_front();
144 : }
145 0 : }
146 :
147 0 : void RawSender::QueueThreadMain()
148 : {
149 0 : AssertLockNotHeld(cs);
150 :
151 0 : while (!m_interrupt) {
152 : // Swap the queues to commit the existing queue of messages
153 0 : std::deque<RawMessage> queue;
154 0 : WITH_LOCK(cs, m_queue.swap(queue));
155 :
156 : // Flush the committed queue
157 0 : QueueFlush(queue);
158 0 : assert(queue.empty());
159 :
160 0 : if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) {
161 0 : return;
162 : }
163 0 : }
164 0 : }
|