Line data Source code
1 : // Copyright (c) 2017-2023 Vincent Thiery 2 : // Copyright (c) 2024-2025 The Dash Core developers 3 : // Distributed under the MIT software license, see the accompanying 4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 : 6 : #ifndef BITCOIN_STATS_RAWSENDER_H 7 : #define BITCOIN_STATS_RAWSENDER_H 8 : 9 : #include <compat/compat.h> 10 : #include <sync.h> 11 : #include <util/threadinterrupt.h> 12 : #include <util/translation.h> 13 : 14 : #include <deque> 15 : #include <memory> 16 : #include <optional> 17 : #include <string> 18 : #include <vector> 19 : 20 : class Sock; 21 : 22 : struct RawMessage : public std::vector<uint8_t> 23 : { 24 : using parent_type = std::vector<value_type>; 25 : using parent_type::parent_type; 26 : 27 0 : explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {} 28 : 29 0 : parent_type& operator+=(value_type rhs) { return append(rhs); } 30 : parent_type& operator+=(std::string::value_type rhs) { return append(rhs); } 31 0 : parent_type& operator+=(const parent_type& rhs) { return append(rhs); } 32 0 : parent_type& operator+=(const std::string& rhs) { return append(rhs); } 33 : 34 0 : parent_type& append(value_type rhs) 35 : { 36 0 : push_back(rhs); 37 0 : return *this; 38 : } 39 : parent_type& append(std::string::value_type rhs) 40 : { 41 : push_back(static_cast<value_type>(rhs)); 42 : return *this; 43 : } 44 0 : parent_type& append(const parent_type& rhs) 45 : { 46 0 : insert(end(), rhs.begin(), rhs.end()); 47 0 : return *this; 48 : } 49 0 : parent_type& append(const std::string& rhs) 50 : { 51 0 : insert(end(), rhs.begin(), rhs.end()); 52 0 : return *this; 53 : } 54 : }; 55 : 56 : class RawSender 57 : { 58 : public: 59 : RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts, uint64_t interval_ms, 60 : std::optional<bilingual_str>& error); 61 : ~RawSender(); 62 : 63 : RawSender(const RawSender&) = delete; 64 : RawSender& operator=(const RawSender&) = delete; 65 : RawSender(RawSender&&) = delete; 66 : 67 : //! Request a message to be sent based on configuration (queueing, batching) 68 : std::optional<bilingual_str> Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); 69 : 70 : private: 71 : //! Send a message directly using ::send{,to}() 72 : std::optional<bilingual_str> SendDirectly(const RawMessage& msg); 73 : 74 : //! Get target server address as string 75 : std::string ToStringHostPort() const; 76 : 77 : //! Add message to queue 78 : void QueueAdd(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); 79 : 80 : //! Send all messages in queue of RawSender entity and flush it 81 : void QueueFlush() EXCLUSIVE_LOCKS_REQUIRED(!cs); 82 : 83 : //! Send all messages in given queue and flush it 84 : void QueueFlush(std::deque<RawMessage>& queue); 85 : 86 : //! Worker thread function if queueing is requested 87 : void QueueThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); 88 : 89 : private: 90 : /* Socket used to communicate with host */ 91 : std::unique_ptr<Sock> m_sock{nullptr}; 92 : /* Socket address containing host information */ 93 : std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)}; 94 : 95 : /* Mutex to protect (batches of) messages queue */ 96 : mutable Mutex cs; 97 : /* Interrupt for queue processing thread */ 98 : CThreadInterrupt m_interrupt; 99 : /* Queue of (batches of) messages to be sent */ 100 : std::deque<RawMessage> m_queue GUARDED_BY(cs); 101 : /* Thread that processes queue every m_interval_ms */ 102 : std::thread m_thread; 103 : 104 : /* Hostname of server receiving messages */ 105 : const std::string m_host; 106 : /* Port of server receiving messages */ 107 : const uint16_t m_port; 108 : /* Batching parameters */ 109 : const std::pair</*size=*/uint64_t, /*delimiter=*/uint8_t> m_batching_opts{0, 0}; 110 : /* Time between queue thread runs (expressed in milliseconds) */ 111 : const uint64_t m_interval_ms; 112 : 113 : /* Number of messages sent */ 114 : uint64_t m_successes{0}; 115 : /* Number of messages not sent */ 116 : uint64_t m_failures{0}; 117 : }; 118 : 119 : #endif // BITCOIN_STATS_RAWSENDER_H