MADARA  3.1.8
UdpTransportReadThread.cpp
Go to the documentation of this file.
2 
5 #include "ace/Time_Value.h"
6 
7 #include <iostream>
8 
10  const TransportSettings & settings,
11  const std::string & id,
12  std::map <std::string, ACE_INET_Addr> & addresses,
13  ACE_SOCK_Dgram & write_socket,
14  ACE_SOCK_Dgram & read_socket,
15  BandwidthMonitor & send_monitor,
16  BandwidthMonitor & receive_monitor,
17  PacketScheduler & packet_scheduler)
18  : settings_ (settings), id_ (id), context_ (0),
19  addresses_ (addresses),
20  write_socket_ (write_socket),
21  read_socket_ (read_socket),
22  send_monitor_ (send_monitor),
23  receive_monitor_ (receive_monitor),
24  packet_scheduler_ (packet_scheduler)
25 {
26 
27 
28 }
29 
30 void
33 {
34  context_ = &(knowledge.get_context ());
35 
36  // setup the receive buffer
37  if (settings_.queue_length > 0)
38  buffer_ = new char [settings_.queue_length];
39 
40  if (context_)
41  {
42  // check for an on_data_received ruleset
43  if (settings_.on_data_received_logic.length () != 0)
44  {
45 
46 #ifndef _MADARA_NO_KARL_
48  "UdpTransportReadThread::init:" \
49  " setting rules to %s\n",
51 
54 #endif // _MADARA_NO_KARL_
55  }
56  else
57  {
59  "UdpTransportReadThread::init:" \
60  " no permanent rules were set\n");
61  }
62  }
63 }
64 
65 void
67  const char * print_prefix,
68  MessageHeader * header,
69  const knowledge::KnowledgeMap & records)
70 {
71  int64_t buffer_remaining = (int64_t) settings_.queue_length;
72  char * buffer = buffer_.get_ptr ();
73  int result (0);
74 
75  if (!settings_.no_sending)
76  {
77  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
78  settings_, print_prefix,
79  header, records,
81 
82  if (result > 0)
83  {
84  uint64_t bytes_sent = 0;
85  uint64_t packet_size = (uint64_t)result;
86 
87  if (packet_size > settings_.max_fragment_size)
88  {
89  FragmentMap map;
90 
92  "%s:" \
93  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
94  print_prefix, packet_size, settings_.max_fragment_size);
95 
96  // fragment the message
98 
99  int j (0);
100  for (FragmentMap::iterator i = map.begin (); i != map.end ();
101  ++i, ++j)
102  {
103  size_t frag_size =
104  (size_t)MessageHeader::get_size (i->second);
105 
106  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
107  addresses_.begin (); addr != addresses_.end (); ++addr)
108  {
109  if (addr->first != settings_.hosts[0])
110  {
112  "%s:" \
113  " Sending fragment %d\n",
114  print_prefix, j);
115 
116  int send_attempts = -1;
117  ssize_t actual_sent = -1;
118 
119  while (actual_sent < 0 &&
120  (settings_.resend_attempts < 0 ||
121  send_attempts < settings_.resend_attempts))
122  {
123 
124  // send the fragment
125  actual_sent = write_socket_.send (
126  i->second, frag_size, addr->second);
127 
128  ++send_attempts;
129 
130  if (actual_sent > 0)
131  {
132  bytes_sent = (uint64_t)actual_sent;
133 
135  "%s:" \
136  " Sent packet of size %" PRIu64 "\n",
137  print_prefix, bytes_sent);
138 
139  send_monitor_.add ((uint32_t)actual_sent);
140  }
141  else if (actual_sent == ECONNRESET)
142  {
144  "%s:" \
145  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
146  print_prefix);
147  }
148  else if (actual_sent == EINTR)
149  {
151  "%s:" \
152  " Local socket was interrupted during send (EINTR)\n",
153  print_prefix);
154  }
155  else if (actual_sent == EWOULDBLOCK)
156  {
158  "%s:" \
159  " Send would have blocked (EWOULDBLOCK)\n",
160  print_prefix);
161  }
162  else if (actual_sent == ENOTCONN)
163  {
165  "%s:" \
166  " Send reports socket is not connected (ENOTCONN)\n",
167  print_prefix);
168  }
169  else if (actual_sent == EADDRINUSE)
170  {
172  "%s:" \
173  " Send reports the interface is busy (EADDRINUSE)\n",
174  print_prefix);
175  }
176  else if (actual_sent == EBADF)
177  {
179  "%s:" \
180  " Send socket is invalid (EBADF)\n",
181  print_prefix);
182  }
183  else
184  {
186  "%s:" \
187  " Packet was not sent due to unknown error (%d)\n",
188  print_prefix, (int)actual_sent);
189  }
190  }
191 
192  // sleep between fragments, if such a slack time is specified
193  if (settings_.slack_time > 0)
195 
196  }
197  }
198  }
199 
201  "%s:" \
202  " Sent fragments totalling %" PRIu64 " bytes\n",
203  print_prefix, bytes_sent);
204 
205  delete_fragments (map);
206  }
207  else
208  {
209  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
210  addresses_.begin (); i != addresses_.end (); ++i)
211  {
212  if (i->first != settings_.hosts[0])
213  {
216  "%s:" \
217  " Sending packet of size %ld\n",
218  print_prefix, result);
219 
220  int send_attempts = -1;
221  ssize_t actual_sent = -1;
222 
223  while (actual_sent < 0 &&
224  (settings_.resend_attempts < 0 ||
225  send_attempts < settings_.resend_attempts))
226  {
227 
228  // send the fragment
229  actual_sent = write_socket_.send (buffer_.get_ptr (),
230  (ssize_t)result, i->second);
231 
232  ++send_attempts;
233 
234  if (actual_sent > 0)
235  {
236  bytes_sent += actual_sent;
237 
240  "%s:" \
241  " Sent packet of size %" PRIu64 "\n",
242  print_prefix, (int)actual_sent);
243 
244  send_monitor_.add ((uint32_t)actual_sent);
245  }
246  else if (actual_sent == ECONNRESET)
247  {
250  "%s:" \
251  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
252  print_prefix);
253  }
254  else if (actual_sent == EINTR)
255  {
258  "%s:" \
259  " Local socket was interrupted during send (EINTR)\n",
260  print_prefix);
261  }
262  else if (actual_sent == EWOULDBLOCK)
263  {
266  "%s:" \
267  " Send would have blocked (EWOULDBLOCK)\n",
268  print_prefix);
269  }
270  else if (actual_sent == ENOTCONN)
271  {
274  "%s:" \
275  " Send reports socket is not connected (ENOTCONN)\n",
276  print_prefix);
277  }
278  else if (actual_sent == EADDRINUSE)
279  {
282  "%s:" \
283  " Send reports the interface is busy (EADDRINUSE)\n",
284  print_prefix);
285  }
286  else if (actual_sent == EBADF)
287  {
290  "%s:" \
291  " Send socket is invalid (EBADF)\n",
292  print_prefix);
293  }
294  else
295  {
298  "%s:" \
299  " Packet was not sent due to unknown error (%d)\n",
300  print_prefix, (int)actual_sent);
301  }
302  }
303  }
304  }
305 
307  "%s:" \
308  " Sent %d total bytes via rebroadcast\n",
309  print_prefix, bytes_sent);
310  }
311 
313  "%s:" \
314  " Send bandwidth = %" PRIu64 " B/s\n",
315  print_prefix,
317  }
318  }
319 }
320 
321 void
323 {
324 }
325 
326 
327 void
329 {
330  if (!settings_.no_receiving)
331  {
332  ACE_Time_Value wait_time (1);
333  ACE_INET_Addr remote;
334 
335  // allocate a buffer to send
336  char * buffer = buffer_.get_ptr ();
337  const char * print_prefix = "UdpTransportReadThread::run";
338 
340  "%s:" \
341  " entering main service loop.\n",
342  print_prefix);
343 
344  knowledge::KnowledgeMap rebroadcast_records;
345 
346  if (buffer == 0)
347  {
349  "%s:" \
350  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
351  print_prefix,
353 
354  return;
355  }
356 
358  "%s:" \
359  " entering a recv on the socket.\n",
360  print_prefix);
361 
362  // read the message
363  ssize_t bytes_read = read_socket_.recv ((void *)buffer,
364  (size_t)settings_.queue_length, remote, 0, &wait_time);
365 
366  if (bytes_read > 0)
367  {
368  if (remote.get_host_addr () != 0)
369  {
372  "%s:" \
373  " received a message header of %lld bytes from %s:%d\n",
374  print_prefix,
375  (long long)bytes_read,
376  remote.get_host_addr (), (int)remote.get_port_number ());
377  }
378  else
379  {
382  "%s:" \
383  " received %lld bytes from unknown host\n",
384  print_prefix,
385  (long long)bytes_read);
386  }
387  MessageHeader * header = 0;
388 
389  std::stringstream remote_host;
390  remote_host << remote.get_host_addr ();
391  remote_host << ":";
392  remote_host << remote.get_port_number ();
393 
394  process_received_update (buffer, bytes_read, id_, *context_,
395  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
396  #ifndef _MADARA_NO_KARL_
398  #endif // _MADARA_NO_KARL_
399  print_prefix,
400  remote_host.str ().c_str (), header);
401 
402  if (header)
403  {
404  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
406  {
407  --header->ttl;
408  header->ttl = std::min (
409  settings_.get_participant_ttl (), header->ttl);
410 
411  rebroadcast (print_prefix, header, rebroadcast_records);
412  }
413 
414  // delete header
415  delete header;
416  }
417  }
418  else
419  {
421  "%s:" \
422  " wait timeout on new messages. Proceeding to next wait\n",
423  print_prefix);
424  }
425  }
426 }
const QoSTransportSettings settings_
Transport settings.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
ACE_SOCK_Dgram & read_socket_
The socket we are reading from.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet to all peers.
const std::string id_
host:port identifier of this process
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
knowledge::ThreadSafeContext * context_
knowledge context
MADARA_Export utility::Refcounter< logger::Logger > global_logger
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
std::map< std::string, ACE_INET_Addr > & addresses_
internet addresses of our peers
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
ACE_SOCK_Dgram & write_socket_
The socket we are writing to.
bool no_receiving
if true, never receive over transport
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
void run(void)
The main loop internals for the read thread.
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
uint32_t queue_length
Length of the buffer used to store history of events.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
madara::utility::ScopedArray< char > buffer_
buffer for sending
UdpTransportReadThread(const TransportSettings &settings, const std::string &id, std::map< std::string, ACE_INET_Addr > &addresses, ACE_SOCK_Dgram &write_socket, ACE_SOCK_Dgram &read_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
void cleanup(void)
Cleanup function called by thread manager.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
int resend_attempts
Maximum number of attempts to resend if transport is busy.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...