5 #include "ace/Time_Value.h" 20 : settings_ (settings), id_ (id), context_ (0),
21 write_socket_ (write_socket),
23 send_monitor_ (send_monitor),
24 receive_monitor_ (receive_monitor),
25 packet_scheduler_ (packet_scheduler)
37 int send_buff_size = 0;
38 int rcv_buff_size = 0;
41 size_t opt_len =
sizeof (int);
48 "ZMQTransportReadThread::init:" \
49 " setting up read socket\n");
51 read_socket_ = zmq_socket (zmq_context.get_context (), ZMQ_SUB);
54 zmq_setsockopt (read_socket_, ZMQ_SUBSCRIBE, 0, 0);
57 "ZMQTransportReadThread::init:" \
58 " setting rcv buff size to settings.queue_length (%d)\n",
61 int result = zmq_setsockopt (
62 read_socket_, ZMQ_RCVBUF, (
void *)&buff_size, opt_len);
66 int result = zmq_getsockopt (
67 read_socket_, ZMQ_RCVBUF, (
void *)&rcv_buff_size, &opt_len);
70 "ZMQTransportReadThread::init:" \
71 " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
72 buff_size, rcv_buff_size);
77 "ZMQTransportReadThread::init:" \
78 " ERROR: errno = %s\n",
79 zmq_strerror (zmq_errno ()));
83 result = zmq_setsockopt (
84 read_socket_, ZMQ_RCVTIMEO, (
void *)&timeout, opt_len);
88 int result = zmq_getsockopt (
89 read_socket_, ZMQ_RCVTIMEO, (
void *)&timeout, &opt_len);
92 "ZMQTransportReadThread::init:" \
93 " successfully set rcv timeout to %d\n",
99 "ZMQTransportReadThread::init:" \
100 " ERROR: When setting timeout on rcv, errno = %s\n",
101 zmq_strerror (zmq_errno ()));
130 int connect_result = zmq_connect (
133 if (connect_result == 0)
136 "ZMQTransportReadThread::init:" \
137 " successfully connected to %s\n",
143 "ZMQTransportReadThread::init:" \
144 " ERROR: could not connect to %s\n",
147 "ZMQTransportReadThread::init:" \
148 " ERROR: errno = %s\n",
149 zmq_strerror (zmq_errno ()));
160 "ZMQTransportReadThread::init:" \
161 " setting rules to %s\n",
165 #ifndef _MADARA_NO_KARL_ 168 #endif // _MADARA_NO_KARL_ 173 "ZMQTransportReadThread::init:" \
174 " no permanent rules were set\n");
183 "ZMQTransportReadThread::cleanup:" \
184 " starting cleanup\n");
189 "ZMQTransportReadThread::cleanup:" \
190 " closing read socket\n");
194 zmq_setsockopt (
read_socket_, ZMQ_LINGER, (
void *)&option,
sizeof (
int));
202 "ZMQTransportReadThread::cleanup:" \
203 " finished cleanup\n");
209 const char * print_prefix,
229 "ZMQTransportReadThread::send:" \
230 " sending %d bytes on socket\n", result);
237 "ZMQTransportReadThread::send:" \
238 " sent %d bytes on socket\n", result);
251 const char * print_prefix =
"ZMQTransportReadThread::run";
253 size_t zmq_buffer_size = buffer_remaining;
257 " entering main service loop.\n",
266 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
275 " entering a recv on the socket.\n",
279 buffer_remaining = (int64_t)zmq_recv (
284 " past recv on the socket.\n",
287 if (buffer_remaining > 0)
293 " processing %d byte update from %s.\n",
294 print_prefix, (int)buffer_remaining, header->
originator);
298 #ifndef _MADARA_NO_KARL_
306 " done processing %d byte update from %s.\n",
307 print_prefix, (int)buffer_remaining, header->
originator);
319 " wait timeout on new messages. Proceeding to next wait\n",
const QoSTransportSettings settings_
quality-of-service transport settings
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
const std::string id_
host:port identifier of this process
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
void * read_socket_
The multicast socket we are reading from.
knowledge::ThreadSafeContext * context_
knowledge context
void * write_socket_
underlying socket for sending
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
void cleanup(void)
Cleanup function called by thread manager.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
std::string on_data_received_logic
logic to be evaluated after every successful update
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
uint32_t queue_length
Length of the buffer used to store history of events.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
MADARA_Export bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
void run(void)
The main loop internals for the read thread.
madara::utility::ScopedArray< char > buffer_
buffer for receiving