MADARA  3.1.8
BroadcastTransportReadThread.cpp
Go to the documentation of this file.
2 
5 #include "ace/Time_Value.h"
7 
8 #include <iostream>
9 
11  const TransportSettings & settings,
12  const std::string & id,
13  const ACE_INET_Addr & address,
14  ACE_SOCK_Dgram_Bcast & socket,
15  BandwidthMonitor & send_monitor,
16  BandwidthMonitor & receive_monitor,
17  PacketScheduler & packet_scheduler)
18  : settings_ (settings), id_ (id), context_ (0),
19  address_ (address.get_port_number ()),
20  write_socket_ (socket),
21  send_monitor_ (send_monitor),
22  receive_monitor_ (receive_monitor),
23  packet_scheduler_ (packet_scheduler)
24 {
25  // for receiving, we only want to bind to the local port
26  ACE_INET_Addr local (address.get_port_number ());
27 
28  qos_settings_ = dynamic_cast <const QoSTransportSettings *> (&settings);
29 
30  if (-1 == read_socket_.open (local, 2, 0, 1))
31  {
33  "BroadcastTransportReadThread::constructor:" \
34  " Error subscribing to broadcast address %s:%d\n",
35  address_.get_host_addr (), address_.get_port_number ());
36  }
37  else
38  {
40  "BroadcastTransportReadThread::constructor:" \
41  " Success subscribing to broadcast address %s:%d\n",
42  address_.get_host_addr (), address_.get_port_number ());
43 
44  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
45  int rcv_buff_size = 0;
46  int opt_len = sizeof (int);
47 
48  ACE_SOCK_Dgram & bare_socket = read_socket_;
49 
50  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
51  (void *)&rcv_buff_size, &opt_len);
52 
54  "BroadcastTransportReadThread::constructor:" \
55  " default socket buff size is send=%d, rcv=%d\n",
56  send_buff_size, rcv_buff_size);
57 
58  if (send_buff_size < tar_buff_size)
59  {
61  "BroadcastTransportReadThread::constructor:" \
62  " setting send buff size to settings.queue_length (%d)\n",
63  tar_buff_size);
64 
65  bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
66  (void *)&tar_buff_size, opt_len);
67 
68  bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
69  (void *)&send_buff_size, &opt_len);
70 
72  "BroadcastTransportReadThread::constructor:" \
73  " current socket buff size is send=%d, rcv=%d\n",
74  send_buff_size, rcv_buff_size);
75  }
76 
77  if (rcv_buff_size < tar_buff_size)
78  {
80  "BroadcastTransportReadThread::constructor:" \
81  " setting rcv buff size to settings.queue_length (%d)\n",
82  tar_buff_size);
83 
84  bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
85  (void *)&tar_buff_size, opt_len);
86 
87  bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
88  (void *)&rcv_buff_size, &opt_len);
89 
91  "BroadcastTransportReadThread::constructor:" \
92  " current socket buff size is send=%d, rcv=%d\n",
93  send_buff_size, rcv_buff_size);
94  }
95 
96  // setup the receive buffer
97  if (settings_.queue_length > 0)
98  buffer_ = new char [settings_.queue_length];
99  }
100 }
101 
102 
103 void
106 {
107  context_ = &(knowledge.get_context ());
108 
109  if (context_)
110  {
111  // check for an on_data_received ruleset
112  if (settings_.on_data_received_logic.length () != 0)
113  {
115  "BroadcastTransportReadThread::init:" \
116  " setting rules to %s\n",
118 
119 #ifndef _MADARA_NO_KARL_
122 #endif // _MADARA_NO_KARL_
123  }
124  else
125  {
127  "BroadcastTransportReadThread::init:" \
128  " no permanent rules were set\n");
129  }
130  }
131 }
132 
133 
134 void
136 {
137  // close broadcast port
138  if (-1 == read_socket_.close ())
139  {
141  "BroadcastTransportReadThread::cleanup:" \
142  " Error closing broadcast socket\n");
143  }
144 }
145 
146 void
148  const char * print_prefix,
149  MessageHeader * header,
150  const knowledge::KnowledgeMap & records)
151 {
152  int64_t buffer_remaining = (int64_t) settings_.queue_length;
153  char * buffer = buffer_.get_ptr ();
154  int result (0);
155 
156  if (!settings_.no_sending)
157  {
158  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
159  *qos_settings_, print_prefix,
160  header, records,
162 
163  if (result > 0)
164  {
165  ssize_t bytes_sent = 0;
166  uint64_t packet_size = MessageHeader::get_size (buffer_.get_ptr ());
167 
168  if (packet_size > settings_.max_fragment_size)
169  {
170  FragmentMap map;
171 
173  "%s:" \
174  " fragmenting %d byte packet (%" PRIu64 " bytes is max fragment size)\n",
175  print_prefix, packet_size, settings_.max_fragment_size);
176 
177  // fragment the message
179 
180  int j (0);
181  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
182  {
184  "%s:" \
185  " Sending fragment %d\n",
186  print_prefix, j);
187 
188  int send_attempts = -1;
189  ssize_t actual_sent = -1;
190 
191  while (actual_sent < 0 &&
192  (settings_.resend_attempts < 0 ||
193  send_attempts < settings_.resend_attempts))
194  {
195  // send the fragment
196  actual_sent = write_socket_.send (
197  i->second,
198  (ssize_t)MessageHeader::get_size (i->second),
199  address_);
200 
201  ++send_attempts;
202 
203  if (actual_sent > 0)
204  {
205  bytes_sent = (uint64_t)actual_sent;
206 
208  "%s:" \
209  " Sent packet of size %" PRIu64 "\n",
210  print_prefix, bytes_sent);
211 
212  send_monitor_.add ((uint32_t)actual_sent);
213  }
214  else if (actual_sent == ECONNRESET)
215  {
217  "%s:" \
218  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
219  print_prefix);
220  }
221  else if (actual_sent == EINTR)
222  {
224  "%s:" \
225  " Local socket was interrupted during send (EINTR)\n",
226  print_prefix);
227  }
228  else if (actual_sent == EWOULDBLOCK)
229  {
231  "%s:" \
232  " Send would have blocked (EWOULDBLOCK)\n",
233  print_prefix);
234  }
235  else if (actual_sent == ENOTCONN)
236  {
238  "%s:" \
239  " Send reports socket is not connected (ENOTCONN)\n",
240  print_prefix);
241  }
242  else if (actual_sent == EADDRINUSE)
243  {
245  "%s:" \
246  " Send reports the interface is busy (EADDRINUSE)\n",
247  print_prefix);
248  }
249  else if (actual_sent == EBADF)
250  {
252  "%s:" \
253  " Send socket is invalid (EBADF)\n",
254  print_prefix);
255  }
256  else
257  {
259  "%s:" \
260  " Packet was not sent due to unknown error (%d)\n",
261  print_prefix, (int)actual_sent);
262  }
263  }
264 
265  // sleep between fragments, if such a slack time is specified
266  if (settings_.slack_time > 0)
268  }
269 
270  send_monitor_.add ((uint32_t)bytes_sent);
271 
273  "%s:" \
274  " Sent fragments totalling %" PRIu64 " bytes\n",
275  print_prefix,
276  bytes_sent);
277 
278  delete_fragments (map);
279  }
280  else
281  {
284  "%s:" \
285  " Sending packet of size %ld\n",
286  print_prefix, result);
287 
288  int send_attempts = -1;
289  ssize_t actual_sent = -1;
290 
291  while (actual_sent < 0 &&
292  (settings_.resend_attempts < 0 ||
293  send_attempts < settings_.resend_attempts))
294  {
295 
296  // send the fragment
297  actual_sent = write_socket_.send (
298  buffer_.get_ptr (), (ssize_t)result, address_);
299 
300  ++send_attempts;
301 
302  if (actual_sent > 0)
303  {
304  bytes_sent = (uint64_t)actual_sent;
305 
308  "%s:" \
309  " Sent packet of size %" PRIu64 "\n",
310  print_prefix, bytes_sent);
311 
312  send_monitor_.add ((uint32_t)actual_sent);
313  }
314  else if (actual_sent == ECONNRESET)
315  {
318  "%s:" \
319  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
320  print_prefix);
321  }
322  else if (actual_sent == EINTR)
323  {
326  "%s:" \
327  " Local socket was interrupted during send (EINTR)\n",
328  print_prefix);
329  }
330  else if (actual_sent == EWOULDBLOCK)
331  {
334  "%s:" \
335  " Send would have blocked (EWOULDBLOCK)\n",
336  print_prefix);
337  }
338  else if (actual_sent == ENOTCONN)
339  {
342  "%s:" \
343  " Send reports socket is not connected (ENOTCONN)\n",
344  print_prefix);
345  }
346  else if (actual_sent == EADDRINUSE)
347  {
350  "%s:" \
351  " Send reports the interface is busy (EADDRINUSE)\n",
352  print_prefix);
353  }
354  else if (actual_sent == EBADF)
355  {
358  "%s:" \
359  " Send socket is invalid (EBADF)\n",
360  print_prefix);
361  }
362  else
363  {
366  "%s:" \
367  " Packet was not sent due to unknown error (%d)\n",
368  print_prefix, (int)actual_sent);
369  }
370  }
371  }
372 
374  "%s:" \
375  " Send bandwidth = %" PRIu64 " B/s\n",
376  print_prefix,
378  }
379  }
380 }
381 
382 void
384 {
385  if (!settings_.no_receiving)
386  {
387  ACE_Time_Value wait_time (1);
388  ACE_INET_Addr remote;
389 
390  // allocate a buffer to send
391  char * buffer = buffer_.get_ptr ();
392  const char * print_prefix = "BroadcastTransportReadThread::run";
393 
395  "%s:" \
396  " entering main service loop.\n",
397  print_prefix);
398 
399  knowledge::KnowledgeMap rebroadcast_records;
400 
401  if (buffer == 0)
402  {
404  "%s:" \
405  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
406  print_prefix,
408 
409  return;
410  }
411 
413  "%s:" \
414  " entering a recv on the socket.\n",
415  print_prefix);
416 
417  // read the message
418  ssize_t bytes_read = read_socket_.recv ((void *)buffer,
419  (size_t)settings_.queue_length, remote, 0, &wait_time);
420 
421  if (bytes_read > 0)
422  {
423  if (remote.get_host_addr () != 0)
424  {
427  "%s:" \
428  " received a message header of %lld bytes from %s:%d\n",
429  print_prefix,
430  (long long)bytes_read,
431  remote.get_host_addr (), (int)remote.get_port_number ());
432  }
433  else
434  {
437  "%s:" \
438  " received %lld bytes from unknown host\n",
439  print_prefix,
440  (long long)bytes_read);
441  }
442 
443  MessageHeader * header = 0;
444 
445  std::stringstream remote_host;
446  remote_host << remote.get_host_addr ();
447  remote_host << ":";
448  remote_host << remote.get_port_number ();
449 
450  process_received_update (buffer, bytes_read, id_, *context_,
451  *qos_settings_, send_monitor_, receive_monitor_, rebroadcast_records,
452  #ifndef _MADARA_NO_KARL_
454  #endif // _MADARA_NO_KARL_
455  print_prefix,
456  remote_host.str ().c_str (), header);
457 
458  if (header)
459  {
460  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
462  {
463  --header->ttl;
464  header->ttl = std::min (
465  qos_settings_->get_participant_ttl (), header->ttl);
466 
467  rebroadcast (print_prefix, header, rebroadcast_records);
468  }
469 
470  // delete header
471  delete header;
472  }
473  }
474  else
475  {
477  "%s:" \
478  " wait timeout on new messages. Proceeding to next wait\n",
479  print_prefix);
480  }
481  }
482 }
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void run(void)
The main loop internals for the read thread.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
ACE_SOCK_Dgram read_socket_
The broadcast socket we are reading from.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
madara::utility::ScopedArray< char > buffer_
buffer for sending
const std::string id_
host:port identifier of this process
Provides scheduler for dropping packets.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
void cleanup(void)
Cleanup function called by thread manager.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
const QoSTransportSettings * qos_settings_
pointer to qos_settings (if applicable)
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
BroadcastTransportReadThread(const TransportSettings &settings, const std::string &id, const ACE_INET_Addr &address, ACE_SOCK_Dgram_Bcast &socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send
ACE_SOCK_Dgram_Bcast write_socket_
underlying socket for rebroadcasting
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
knowledge::ThreadSafeContext * context_
knowledge context
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport&#39;s bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
const TransportSettings & settings_
Transport settings.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
ACE_INET_Addr address_
The broadcast address we are subscribing to.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent&#39;s m...