MADARA  3.1.8
MulticastTransportReadThread.cpp
Go to the documentation of this file.
2 
5 #include "ace/Time_Value.h"
7 
8 
9 #include <iostream>
10 #include <algorithm>
11 
13  const TransportSettings & settings,
14  const std::string & id,
15  const ACE_INET_Addr & address,
16  ACE_SOCK_Dgram & write_socket,
17  ACE_SOCK_Dgram_Mcast & read_socket,
18  BandwidthMonitor & send_monitor,
19  BandwidthMonitor & receive_monitor,
20  PacketScheduler & packet_scheduler)
21  : settings_ (settings), id_ (id), context_ (0), address_ (address),
22  read_socket_ (read_socket),
23  write_socket_ (write_socket),
24  send_monitor_ (send_monitor),
25  receive_monitor_ (receive_monitor),
26  packet_scheduler_ (packet_scheduler)
27 {
28 }
29 
30 void
33 {
34  context_ = &(knowledge.get_context ());
35 
36  // setup the receive buffer
37  if (settings_.queue_length > 0)
38  buffer_ = new char [settings_.queue_length];
39 
40  if (context_)
41  {
42  // check for an on_data_received ruleset
43  if (settings_.on_data_received_logic.length () != 0)
44  {
46  "MulticastTransportReadThread::init:" \
47  " setting rules to %s\n",
49 
50 
51 #ifndef _MADARA_NO_KARL_
54 #endif // _MADARA_NO_KARL_
55  }
56  else
57  {
59  "MulticastTransportReadThread::init:" \
60  " no permanent rules were set\n");
61  }
62  }
63 }
64 
65 void
67 {
68  // Unsubscribe
69  if (-1 == read_socket_.leave (address_))
70  {
72  "MulticastTransportReadThread::close:" \
73  " Error unsubscribing to multicast address\n");
74  }
75 
76  read_socket_.close ();
77 }
78 
79 void
81  const char * print_prefix,
82  MessageHeader * header,
83  const knowledge::KnowledgeMap & records)
84 {
85  int64_t buffer_remaining = (int64_t) settings_.queue_length;
86  char * buffer = buffer_.get_ptr ();
87  int result (0);
88 
89  if (!settings_.no_sending)
90  {
91  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
92  settings_, print_prefix,
93  header, records,
95 
96  if (result > 0)
97  {
98  ssize_t bytes_sent = 0;
99  uint64_t packet_size = (uint64_t)result;
100 
101  if (packet_size > settings_.max_fragment_size)
102  {
103  FragmentMap map;
104 
106  "%s:" \
107  " fragmenting %d byte packet (%" PRIu64 " bytes is max fragment size)\n",
108  print_prefix, packet_size, settings_.max_fragment_size);
109 
110  // fragment the message
112 
113  int j (0);
114  for (FragmentMap::iterator i = map.begin (); i != map.end ();
115  ++i, ++j)
116  {
118  "%s:" \
119  " Sending fragment %d\n",
120  print_prefix, j);
121 
122  int send_attempts = -1;
123  ssize_t actual_sent = -1;
124 
125  while (actual_sent < 0 &&
126  (settings_.resend_attempts < 0 ||
127  send_attempts < settings_.resend_attempts))
128  {
129  // send the fragment
130  actual_sent = write_socket_.send (
131  i->second,
132  (ssize_t)MessageHeader::get_size (i->second),
133  address_);
134 
135  ++send_attempts;
136 
137  if (actual_sent > 0)
138  {
139  bytes_sent = (uint64_t)actual_sent;
140 
142  "%s:" \
143  " Sent packet of size %" PRIu64 "\n",
144  print_prefix, bytes_sent);
145 
146  send_monitor_.add ((uint32_t)actual_sent);
147  }
148  else if (actual_sent == ECONNRESET)
149  {
151  "%s:" \
152  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
153  print_prefix);
154  }
155  else if (actual_sent == EINTR)
156  {
158  "%s:" \
159  " Local socket was interrupted during send (EINTR)\n",
160  print_prefix);
161  }
162  else if (actual_sent == EWOULDBLOCK)
163  {
165  "%s:" \
166  " Send would have blocked (EWOULDBLOCK)\n",
167  print_prefix);
168  }
169  else if (actual_sent == ENOTCONN)
170  {
172  "%s:" \
173  " Send reports socket is not connected (ENOTCONN)\n",
174  print_prefix);
175  }
176  else if (actual_sent == EADDRINUSE)
177  {
179  "%s:" \
180  " Send reports the interface is busy (EADDRINUSE)\n",
181  print_prefix);
182  }
183  else if (actual_sent == EBADF)
184  {
186  "%s:" \
187  " Send socket is invalid (EBADF)\n",
188  print_prefix);
189  }
190  else
191  {
193  "%s:" \
194  " Packet was not sent due to unknown error (%d)\n",
195  print_prefix, (int)actual_sent);
196  }
197  }
198 
199  // sleep between fragments, if such a slack time is specified
200  if (settings_.slack_time > 0)
202  }
203 
204  send_monitor_.add ((uint32_t)bytes_sent);
205 
207  "%s:" \
208  " Sent fragments totalling %" PRIu64 " bytes\n",
209  print_prefix,
210  bytes_sent);
211 
212  delete_fragments (map);
213  }
214  else
215  {
218  "%s:" \
219  " Sending packet of size %ld\n",
220  print_prefix, result);
221 
222  int send_attempts = -1;
223  ssize_t actual_sent = -1;
224 
225  while (actual_sent < 0 &&
226  (settings_.resend_attempts < 0 ||
227  send_attempts < settings_.resend_attempts))
228  {
229 
230  // send the fragment
231  actual_sent = write_socket_.send (
232  buffer_.get_ptr (), (ssize_t)result, address_);
233 
234  ++send_attempts;
235 
236  if (actual_sent > 0)
237  {
238  bytes_sent = (uint64_t)actual_sent;
239 
242  "%s:" \
243  " Sent packet of size %" PRIu64 "\n",
244  print_prefix, bytes_sent);
245 
246  send_monitor_.add ((uint32_t)actual_sent);
247  }
248  else if (actual_sent == ECONNRESET)
249  {
252  "%s:" \
253  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
254  print_prefix);
255  }
256  else if (actual_sent == EINTR)
257  {
260  "%s:" \
261  " Local socket was interrupted during send (EINTR)\n",
262  print_prefix);
263  }
264  else if (actual_sent == EWOULDBLOCK)
265  {
268  "%s:" \
269  " Send would have blocked (EWOULDBLOCK)\n",
270  print_prefix);
271  }
272  else if (actual_sent == ENOTCONN)
273  {
276  "%s:" \
277  " Send reports socket is not connected (ENOTCONN)\n",
278  print_prefix);
279  }
280  else if (actual_sent == EADDRINUSE)
281  {
284  "%s:" \
285  " Send reports the interface is busy (EADDRINUSE)\n",
286  print_prefix);
287  }
288  else if (actual_sent == EBADF)
289  {
292  "%s:" \
293  " Send socket is invalid (EBADF)\n",
294  print_prefix);
295  }
296  else
297  {
300  "%s:" \
301  " Packet was not sent due to unknown error (%d)\n",
302  print_prefix, (int)actual_sent);
303  }
304  }
305  }
306 
308  "%s:" \
309  " Send bandwidth = %" PRIu64 " B/s\n",
310  print_prefix,
312  }
313  }
314 }
315 
316 void
318 {
319  if (!settings_.no_receiving)
320  {
321  ACE_Time_Value wait_time (1);
322  ACE_INET_Addr remote;
323 
324  // allocate a buffer to send
325  char * buffer = buffer_.get_ptr ();
326  const char * print_prefix = "MulticastTransportReadThread::run";
327 
329  "%s:" \
330  " entering main service loop.\n", print_prefix);
331 
332  knowledge::KnowledgeMap rebroadcast_records;
333 
334  if (buffer == 0)
335  {
337  "%s:" \
338  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
339  print_prefix,
341 
342  return;
343  }
344 
346  "%s:" \
347  " entering a recv on the socket.\n",
348  print_prefix);
349 
350  // read the message
351  ssize_t bytes_read = read_socket_.recv ((void *)buffer,
352  (size_t) settings_.queue_length, remote, 0, &wait_time);
353 
354  if (bytes_read > 0)
355  {
356  if (remote.get_host_addr () != 0)
357  {
360  "%s:" \
361  " received a message header of %lld bytes from %s:%d\n",
362  print_prefix,
363  (long long)bytes_read,
364  remote.get_host_addr (), (int)remote.get_port_number ());
365  }
366  else
367  {
370  "%s:" \
371  " received %lld bytes from unknown host\n",
372  print_prefix,
373  (long long)bytes_read);
374  }
375  MessageHeader * header = 0;
376 
377  std::stringstream remote_host;
378  remote_host << remote.get_host_addr ();
379  remote_host << ":";
380  remote_host << remote.get_port_number ();
381 
382  process_received_update (buffer, bytes_read, id_, *context_,
383  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
384  #ifndef _MADARA_NO_KARL_
386  #endif // _MADARA_NO_KARL_
387  print_prefix,
388  remote_host.str ().c_str (), header);
389 
390  if (header)
391  {
392  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
394  {
395  --header->ttl;
396  header->ttl = std::min (
397  settings_.get_participant_ttl (), header->ttl);
398 
399  rebroadcast (print_prefix, header, rebroadcast_records);
400  }
401 
402  // delete header
403  delete header;
404  }
405  }
406  else
407  {
409  "%s:" \
410  " wait timeout on new messages. Proceeding to next wait\n",
411  print_prefix);
412  }
413 
415  "%s:" \
416  " finished iteration.\n",
417  print_prefix);
418  }
419 }
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
MulticastTransportReadThread(const TransportSettings &settings, const std::string &id, const ACE_INET_Addr &address, ACE_SOCK_Dgram &write_socket, ACE_SOCK_Dgram_Mcast &read_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
void run(void)
The main loop internals for the read thread.
Provides scheduler for dropping packets.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
const std::string id_
host:port identifier of this process
knowledge::ThreadSafeContext * context_
knowledge context
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
ACE_SOCK_Dgram & write_socket_
underlying socket for sending
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
uint32_t queue_length
Length of the buffer used to store history of events.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport's bandwidth.
const QoSTransportSettings settings_
quality-of-service transport settings
Provides functions and classes for the distributed knowledge base.
ACE_SOCK_Dgram_Mcast & read_socket_
The multicast socket we are reading from.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
void cleanup(void)
Cleanup function called by thread manager.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
ACE_INET_Addr address_
The multicast address we are subscribing to.
madara::utility::ScopedArray< char > buffer_
buffer for receiving
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...