MADARA  3.1.8
ZMQTransportReadThread.cpp
Go to the documentation of this file.
2 
5 #include "ace/Time_Value.h"
7 #include "ZMQContext.h"
8 
9 
10 #include <iostream>
11 #include <algorithm>
12 
14  const TransportSettings & settings,
15  const std::string & id,
16  void * write_socket,
17  BandwidthMonitor & send_monitor,
18  BandwidthMonitor & receive_monitor,
19  PacketScheduler & packet_scheduler)
20  : settings_ (settings), id_ (id), context_ (0),
21  write_socket_ (write_socket),
22  read_socket_ (0),
23  send_monitor_ (send_monitor),
24  receive_monitor_ (receive_monitor),
25  packet_scheduler_ (packet_scheduler)
26 {
27 }
28 
29 void
32 {
33  context_ = &(knowledge.get_context ());
34 
36  {
37  int send_buff_size = 0;
38  int rcv_buff_size = 0;
39  int timeout = 1000;
40  int buff_size = settings_.queue_length;
41  size_t opt_len = sizeof (int);
42 
43  // setup the receive buffer
44  if (settings_.queue_length > 0)
45  buffer_ = new char[settings_.queue_length];
46 
48  "ZMQTransportReadThread::init:" \
49  " setting up read socket\n");
50 
51  read_socket_ = zmq_socket (zmq_context.get_context (), ZMQ_SUB);
52 
53  // subscribe to all messages
54  zmq_setsockopt (read_socket_, ZMQ_SUBSCRIBE, 0, 0);
55 
57  "ZMQTransportReadThread::init:" \
58  " setting rcv buff size to settings.queue_length (%d)\n",
59  buff_size);
60 
61  int result = zmq_setsockopt (
62  read_socket_, ZMQ_RCVBUF, (void *)&buff_size, opt_len);
63 
64  if (result == 0)
65  {
66  int result = zmq_getsockopt (
67  read_socket_, ZMQ_RCVBUF, (void *)&rcv_buff_size, &opt_len);
68 
70  "ZMQTransportReadThread::init:" \
71  " successfully set sockopt rcvbuf size to %d. Actual %d allocated\n",
72  buff_size, rcv_buff_size);
73  }
74  else
75  {
77  "ZMQTransportReadThread::init:" \
78  " ERROR: errno = %s\n",
79  zmq_strerror (zmq_errno ()));
80  }
81 
82 
83  result = zmq_setsockopt (
84  read_socket_, ZMQ_RCVTIMEO, (void *)&timeout, opt_len);
85 
86  if (result == 0)
87  {
88  int result = zmq_getsockopt (
89  read_socket_, ZMQ_RCVTIMEO, (void *)&timeout, &opt_len);
90 
92  "ZMQTransportReadThread::init:" \
93  " successfully set rcv timeout to %d\n",
94  timeout);
95  }
96  else
97  {
99  "ZMQTransportReadThread::init:" \
100  " ERROR: When setting timeout on rcv, errno = %s\n",
101  zmq_strerror (zmq_errno ()));
102  }
103 
104 
105  if (settings_.hosts.size () >= 1)
106  {
107  // if the first host was a reliable multicast, we need to connect to it
108  if (utility::begins_with (settings_.hosts[0], "pgm") ||
109  utility::begins_with (settings_.hosts[0], "epgm"))
110  {
111 
112  }
113  }
114 
115  // connect the reader to the host sockets
116  size_t i = 0;
117 
118  if (settings_.hosts.size () >= 1)
119  {
120  // we ignore first host unless it is pgm because writer is on first host
121  if (!utility::begins_with (settings_.hosts[0], "pgm") &&
122  !utility::begins_with (settings_.hosts[0], "epgm"))
123  {
124  ++i;
125  }
126  }
127 
128  for (; i < settings_.hosts.size (); ++i)
129  {
130  int connect_result = zmq_connect (
131  read_socket_, settings_.hosts[i].c_str ());
132 
133  if (connect_result == 0)
134  {
136  "ZMQTransportReadThread::init:" \
137  " successfully connected to %s\n",
138  settings_.hosts[i].c_str ());
139  }
140  else
141  {
143  "ZMQTransportReadThread::init:" \
144  " ERROR: could not connect to %s\n",
145  settings_.hosts[i].c_str ());
147  "ZMQTransportReadThread::init:" \
148  " ERROR: errno = %s\n",
149  zmq_strerror (zmq_errno ()));
150  }
151  }
152  }
153 
154  if (context_)
155  {
156  // check for an on_data_received ruleset
157  if (settings_.on_data_received_logic.length () != 0)
158  {
160  "ZMQTransportReadThread::init:" \
161  " setting rules to %s\n",
163 
164 
165 #ifndef _MADARA_NO_KARL_
168 #endif // _MADARA_NO_KARL_
169  }
170  else
171  {
173  "ZMQTransportReadThread::init:" \
174  " no permanent rules were set\n");
175  }
176  }
177 }
178 
179 void
181 {
183  "ZMQTransportReadThread::cleanup:" \
184  " starting cleanup\n");
185 
186  if (read_socket_ != 0)
187  {
189  "ZMQTransportReadThread::cleanup:" \
190  " closing read socket\n");
191 
192  int option = 0;
193  // if you don't do this, ZMQ waits forever for no reason. Super smart.
194  zmq_setsockopt (read_socket_, ZMQ_LINGER, (void *)&option, sizeof (int));
195 
196  madara::utility::sleep (0.100);
197 
198  zmq_close (read_socket_);
199  }
200 
202  "ZMQTransportReadThread::cleanup:" \
203  " finished cleanup\n");
204 
205 }
206 
207 void
209  const char * print_prefix,
210  MessageHeader * header,
211  const knowledge::KnowledgeMap & records)
212 {
213  int64_t buffer_remaining = (int64_t)settings_.queue_length;
214  char * buffer = buffer_.get_ptr ();
215  int result (0);
216 
217  if (!settings_.no_sending)
218  {
219  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
220  settings_, print_prefix,
221  header, records,
223 
224  if (result > 0)
225  {
226  if (settings_.hosts.size () > 0 && result > 0)
227  {
229  "ZMQTransportReadThread::send:" \
230  " sending %d bytes on socket\n", result);
231 
232  //send the prepped buffer over ZeroMQ
233  result = zmq_send (
234  write_socket_, (void *)buffer_.get_ptr (), (size_t)result, ZMQ_DONTWAIT);
235 
237  "ZMQTransportReadThread::send:" \
238  " sent %d bytes on socket\n", result);
239  }
240  }
241  }
242 }
243 
244 void
246 {
247  if (!settings_.no_receiving)
248  {
249  // allocate a buffer to send
250  char * buffer = buffer_.get_ptr ();
251  const char * print_prefix = "ZMQTransportReadThread::run";
252  int64_t buffer_remaining = settings_.queue_length;
253  size_t zmq_buffer_size = buffer_remaining;
254 
256  "%s:" \
257  " entering main service loop.\n",
258  print_prefix);
259 
260  knowledge::KnowledgeMap rebroadcast_records;
261 
262  if (buffer == 0)
263  {
265  "%s:" \
266  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
267  print_prefix,
269 
270  return;
271  }
272 
274  "%s:" \
275  " entering a recv on the socket.\n",
276  print_prefix);
277 
278  // blocking receive up to rcv timeout (1 second)
279  buffer_remaining = (int64_t)zmq_recv (
280  read_socket_, (void *)buffer, zmq_buffer_size, 0);
281 
283  "%s:" \
284  " past recv on the socket.\n",
285  print_prefix);
286 
287  if (buffer_remaining > 0)
288  {
289  MessageHeader * header = (MessageHeader *)buffer;
290 
292  "%s:" \
293  " processing %d byte update from %s.\n",
294  print_prefix, (int)buffer_remaining, header->originator);
295 
296  process_received_update (buffer, (uint32_t)buffer_remaining, id_, *context_,
297  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
298 #ifndef _MADARA_NO_KARL_
300 #endif // _MADARA_NO_KARL_
301  print_prefix,
302  header->originator, header);
303 
305  "%s:" \
306  " done processing %d byte update from %s.\n",
307  print_prefix, (int)buffer_remaining, header->originator);
308 
309  if (header)
310  {
311  // delete header
312  delete header;
313  }
314  }
315  else
316  {
318  "%s:" \
319  " wait timeout on new messages. Proceeding to next wait\n",
320  print_prefix);
321  }
322  }
323 }
const QoSTransportSettings settings_
quality-of-service transport settings
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
const std::string id_
host:port identifier of this process
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
void * read_socket_
The multicast socket we are reading from.
knowledge::ThreadSafeContext * context_
knowledge context
void * write_socket_
underlying socket for sending
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
void cleanup(void)
Cleanup function called by thread manager.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
ZMQTransportReadThread(const TransportSettings &settings, const std::string &id, void *write_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
std::string on_data_received_logic
logic to be evaluated after every successful update
static constexpr struct madara::knowledge::tags::string_t string
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
char originator[64]
the originator of the message (host:port)
uint32_t queue_length
Length of the buffer used to store history of events.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport&#39;s bandwidth.
Provides functions and classes for the distributed knowledge base.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
MADARA_Export bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:7
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
void run(void)
The main loop internals for the read thread.
madara::utility::ScopedArray< char > buffer_
buffer for receiving