MADARA  3.1.8
UdpRegistryClientReadThread.cpp
Go to the documentation of this file.
2 
5 #include "ace/Time_Value.h"
6 
7 #include <iostream>
8 
10  const TransportSettings & settings,
11  const std::string & id,
12  std::map <std::string, ACE_INET_Addr> & addresses,
13  ACE_SOCK_Dgram & socket,
14  BandwidthMonitor & send_monitor,
15  BandwidthMonitor & receive_monitor,
16  PacketScheduler & packet_scheduler)
17  : settings_ (settings), id_ (id), context_ (0),
18  addresses_ (addresses),
19  socket_ (socket),
20  send_monitor_ (send_monitor),
21  receive_monitor_ (receive_monitor),
22  packet_scheduler_ (packet_scheduler)
23 {
24 
25 
26 }
27 
28 void
31 {
32  context_ = &(knowledge.get_context ());
33 
34  // setup the receive buffer
35  if (settings_.queue_length > 0)
36  buffer_ = new char [settings_.queue_length];
37 
38  if (context_)
39  {
40  // check for an on_data_received ruleset
41  if (settings_.on_data_received_logic.length () != 0)
42  {
43 
44 #ifndef _MADARA_NO_KARL_
46  "UdpRegistryClientReadThread::init:" \
47  " setting rules to %s\n",
49 
52 #endif // _MADARA_NO_KARL_
53  }
54  else
55  {
57  "UdpRegistryClientReadThread::init:" \
58  " no permanent rules were set\n");
59  }
60  }
61 }
62 
63 void
65  const char * print_prefix,
66  MessageHeader * header,
67  const knowledge::KnowledgeMap & records)
68 {
69  int64_t buffer_remaining = (int64_t) settings_.queue_length;
70  char * buffer = buffer_.get_ptr ();
71  int result (0);
72 
73  if (!settings_.no_sending)
74  {
75  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
76  settings_, print_prefix,
77  header, records,
79 
80  if (result > 0)
81  {
82  uint64_t bytes_sent = 0;
83  uint64_t packet_size = (uint64_t)result;
84 
85  if (packet_size > settings_.max_fragment_size)
86  {
87  FragmentMap map;
88 
90  "%s:" \
91  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
92  print_prefix, packet_size, settings_.max_fragment_size);
93 
94  // fragment the message
96 
97  int j (0);
98  for (FragmentMap::iterator i = map.begin (); i != map.end ();
99  ++i, ++j)
100  {
101  size_t frag_size =
102  (size_t)MessageHeader::get_size (i->second);
103 
104  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
105  addresses_.begin (); addr != addresses_.end (); ++addr)
106  {
107  if (addr->first != settings_.hosts[0])
108  {
110  "%s:" \
111  " Sending fragment %d\n",
112  print_prefix, j);
113 
114  int send_attempts = -1;
115  ssize_t actual_sent = -1;
116 
117  while (actual_sent < 0 &&
118  (settings_.resend_attempts < 0 ||
119  send_attempts < settings_.resend_attempts))
120  {
121 
122  // send the fragment
123  actual_sent = socket_.send (
124  i->second, frag_size, addr->second);
125 
126  ++send_attempts;
127 
128  if (actual_sent > 0)
129  {
130  bytes_sent = (uint64_t)actual_sent;
131 
133  "%s:" \
134  " Sent packet of size %" PRIu64 "\n",
135  print_prefix, bytes_sent);
136 
137  send_monitor_.add ((uint32_t)actual_sent);
138  }
139  else if (actual_sent == ECONNRESET)
140  {
142  "%s:" \
143  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
144  print_prefix);
145  }
146  else if (actual_sent == EINTR)
147  {
149  "%s:" \
150  " Local socket was interrupted during send (EINTR)\n",
151  print_prefix);
152  }
153  else if (actual_sent == EWOULDBLOCK)
154  {
156  "%s:" \
157  " Send would have blocked (EWOULDBLOCK)\n",
158  print_prefix);
159  }
160  else if (actual_sent == ENOTCONN)
161  {
163  "%s:" \
164  " Send reports socket is not connected (ENOTCONN)\n",
165  print_prefix);
166  }
167  else if (actual_sent == EADDRINUSE)
168  {
170  "%s:" \
171  " Send reports the interface is busy (EADDRINUSE)\n",
172  print_prefix);
173  }
174  else if (actual_sent == EBADF)
175  {
177  "%s:" \
178  " Send socket is invalid (EBADF)\n",
179  print_prefix);
180  }
181  else
182  {
184  "%s:" \
185  " Packet was not sent due to unknown error (%d)\n",
186  print_prefix, (int)actual_sent);
187  }
188  }
189 
190  // sleep between fragments, if such a slack time is specified
191  if (settings_.slack_time > 0)
193 
194  }
195  }
196  }
197 
199  "%s:" \
200  " Sent fragments totalling %" PRIu64 " bytes\n",
201  print_prefix, bytes_sent);
202 
203  delete_fragments (map);
204  }
205  else
206  {
207  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
208  addresses_.begin (); i != addresses_.end (); ++i)
209  {
210  if (i->first != settings_.hosts[0])
211  {
214  "%s:" \
215  " Sending packet of size %ld\n",
216  print_prefix, result);
217 
218  int send_attempts = -1;
219  ssize_t actual_sent = -1;
220 
221  while (actual_sent < 0 &&
222  (settings_.resend_attempts < 0 ||
223  send_attempts < settings_.resend_attempts))
224  {
225 
226  // send the fragment
227  actual_sent = socket_.send (buffer_.get_ptr (),
228  (ssize_t)result, i->second);
229 
230  ++send_attempts;
231 
232  if (actual_sent > 0)
233  {
234  bytes_sent += actual_sent;
235 
238  "%s:" \
239  " Sent packet of size %" PRIu64 "\n",
240  print_prefix, (int)actual_sent);
241 
242  send_monitor_.add ((uint32_t)actual_sent);
243  }
244  else if (actual_sent == ECONNRESET)
245  {
248  "%s:" \
249  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
250  print_prefix);
251  }
252  else if (actual_sent == EINTR)
253  {
256  "%s:" \
257  " Local socket was interrupted during send (EINTR)\n",
258  print_prefix);
259  }
260  else if (actual_sent == EWOULDBLOCK)
261  {
264  "%s:" \
265  " Send would have blocked (EWOULDBLOCK)\n",
266  print_prefix);
267  }
268  else if (actual_sent == ENOTCONN)
269  {
272  "%s:" \
273  " Send reports socket is not connected (ENOTCONN)\n",
274  print_prefix);
275  }
276  else if (actual_sent == EADDRINUSE)
277  {
280  "%s:" \
281  " Send reports the interface is busy (EADDRINUSE)\n",
282  print_prefix);
283  }
284  else if (actual_sent == EBADF)
285  {
288  "%s:" \
289  " Send socket is invalid (EBADF)\n",
290  print_prefix);
291  }
292  else
293  {
296  "%s:" \
297  " Packet was not sent due to unknown error (%d)\n",
298  print_prefix, (int)actual_sent);
299  }
300  }
301  }
302  }
303 
305  "%s:" \
306  " Sent %d total bytes via rebroadcast\n",
307  print_prefix, bytes_sent);
308  }
309 
311  "%s:" \
312  " Send bandwidth = %" PRIu64 " B/s\n",
313  print_prefix,
315  }
316  }
317 }
318 
319 void
321 {
322 }
323 
324 
325 void
327 {
328  if (!settings_.no_receiving)
329  {
330  ACE_Time_Value wait_time (1);
331  ACE_INET_Addr remote;
332 
333  // allocate a buffer to send
334  char * buffer = buffer_.get_ptr ();
335  const char * print_prefix = "UdpRegistryClientReadThread::run";
336 
338  "%s:" \
339  " entering main service loop.\n",
340  print_prefix);
341 
342  knowledge::KnowledgeMap rebroadcast_records;
343 
344  if (buffer == 0)
345  {
347  "%s:" \
348  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
349  print_prefix,
351 
352  return;
353  }
354 
356  "%s:" \
357  " entering a recv on the socket.\n",
358  print_prefix);
359 
360  // read the message
361  ssize_t bytes_read = socket_.recv ((void *)buffer,
362  (size_t)settings_.queue_length, remote, 0, &wait_time);
363 
364  if (bytes_read > 0)
365  {
366  if (remote.get_host_addr () != 0)
367  {
370  "%s:" \
371  " received a message header of %lld bytes from %s:%d\n",
372  print_prefix,
373  (long long)bytes_read,
374  remote.get_host_addr (), (int)remote.get_port_number ());
375  }
376  else
377  {
380  "%s:" \
381  " received %lld bytes from unknown host\n",
382  print_prefix,
383  (long long)bytes_read);
384  }
385  MessageHeader * header = 0;
386 
387  std::stringstream remote_host;
388  remote_host << remote.get_host_addr ();
389  remote_host << ":";
390  remote_host << remote.get_port_number ();
391 
392  process_received_update (buffer, bytes_read, id_, *context_,
393  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
394  #ifndef _MADARA_NO_KARL_
396  #endif // _MADARA_NO_KARL_
397  print_prefix,
398  remote_host.str ().c_str (), header);
399 
400  if (header)
401  {
402  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
404  {
405  --header->ttl;
406  header->ttl = std::min (
407  settings_.get_participant_ttl (), header->ttl);
408 
409  rebroadcast (print_prefix, header, rebroadcast_records);
410  }
411 
412  // delete header
413  delete header;
414  }
415  }
416  else
417  {
419  "%s:" \
420  " wait timeout on new messages. Proceeding to next wait\n",
421  print_prefix);
422  }
423  }
424 }
knowledge::ThreadSafeContext * context_
knowledge context
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
madara::utility::ScopedArray< char > buffer_
buffer for sending
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
UdpRegistryClientReadThread(const TransportSettings &settings, const std::string &id, std::map< std::string, ACE_INET_Addr > &addresses, ACE_SOCK_Dgram &socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
std::map< std::string, ACE_INET_Addr > addresses_
internet addresses of our peers
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
void run(void)
The main loop internals for the read thread.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet to all peers.
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
const QoSTransportSettings settings_
Transport settings.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
void cleanup(void)
Cleanup function called by thread manager.
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
ACE_SOCK_Dgram & socket_
The socket we are writing to and reading from.
uint32_t queue_length
Length of the buffer used to store history of events.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
const std::string id_
host:port identifier of this process
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport&#39;s bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent&#39;s m...