MADARA  3.1.8
UdpRegistryClient.cpp
Go to the documentation of this file.
4 
8 #include "ace/INET_Addr.h"
9 #include "ace/SOCK_Dgram.h"
10 
11 
12 #include <iostream>
13 
16  TransportSettings & config, bool launch_transport)
17  : Base (id, config, context)
18 {
19  // create a reference to the knowledge base for threading
20  knowledge_.use (context);
21 
22  // set the data plane for the read threads
24 
25  endpoints_.set_name ("domain." + config.write_domain + ".endpoints", knowledge_);
26 
27  if (launch_transport)
28  setup ();
29 }
30 
32 {
33  close ();
34 }
35 
36 void
38 {
39  this->invalidate_transport ();
40 
42 
44 
45  // close send port
46  if (-1 == socket_.close ())
47  {
49  "UdpRegistryClient::cleanup:" \
50  " Error closing socket\n");
51  }
52 }
53 
54 int
56 {
57  return RELIABLE;
58 }
59 
60 int
62 {
63  return RELIABLE;
64 }
65 
66 int
68 {
69  // call base setup method to initialize certain common variables
70  Base::setup ();
71 
72  if (settings_.hosts.size () > 0)
73  {
74  for (size_t i = 0; i < settings_.hosts.size (); ++i)
75  {
77  "UdpRegistryClient::setup:" \
78  " adding server %s to registry lookup list\n",
79  settings_.hosts[i].c_str ());
80 
81  servers_[settings_.hosts[i]].set (settings_.hosts[i].c_str ());
82  }
83 
84  std::string host;
85  unsigned short port = 50100;
86  int bind_result = utility::bind_to_ephemeral_port (socket_, host, port);
87 
88  // open the socket to any port for sending
89  if (bind_result == -1)
90  {
92  "UdpRegistryClient::setup:" \
93  " socket failed to open\n");
94  }
95  else
96  {
97  ACE_Addr endpoint_addr;
98  socket_.get_local_addr (endpoint_addr);
99  endpoint_addr.get_addr ();
100 
102  "UdpRegistryClient::setup:" \
103  " socket bound to %s:%d\n", host.c_str (), (int)port);
104  }
105 
106  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
107  int rcv_buff_size = 0;
108  int opt_len = sizeof (int);
109 
110  socket_.get_option (SOL_SOCKET, SO_SNDBUF,
111  (void *)&send_buff_size, &opt_len);
112 
113  socket_.get_option (SOL_SOCKET, SO_RCVBUF,
114  (void *)&rcv_buff_size, &opt_len);
115 
117  "UdpRegistryClient::setup:" \
118  " default socket buff size is send=%d, rcv=%d\n",
119  send_buff_size, rcv_buff_size);
120 
121  if (send_buff_size < tar_buff_size)
122  {
124  "UdpRegistryClient::setup:" \
125  " setting send buff size to settings.queue_length (%d)\n",
126  tar_buff_size);
127 
128  socket_.set_option (SOL_SOCKET, SO_SNDBUF,
129  (void *)&tar_buff_size, opt_len);
130 
131  socket_.get_option (SOL_SOCKET, SO_SNDBUF,
132  (void *)&send_buff_size, &opt_len);
133 
135  "UdpRegistryClient::setup:" \
136  " current socket buff size is send=%d, rcv=%d\n",
137  send_buff_size, rcv_buff_size);
138  }
139 
140  if (rcv_buff_size < tar_buff_size)
141  {
143  "UdpRegistryClient::setup:" \
144  " setting rcv buff size to settings.queue_length (%d)\n",
145  tar_buff_size);
146 
147  socket_.set_option (SOL_SOCKET, SO_RCVBUF,
148  (void *)&tar_buff_size, opt_len);
149 
150  socket_.get_option (SOL_SOCKET, SO_RCVBUF,
151  (void *)&rcv_buff_size, &opt_len);
152 
154  "UdpRegistryClient::setup:" \
155  " current socket buff size is send=%d, rcv=%d\n",
156  send_buff_size, rcv_buff_size);
157  }
158 
159  if (!settings_.no_receiving)
160  {
161  double hertz = settings_.read_thread_hertz;
162  if (hertz < 0.0)
163  {
164  hertz = 0.0;
165  }
166 
168  "UdpRegistryClientReadThread::setup:" \
169  " starting %d threads at %f hertz\n", settings_.read_threads,
170  hertz);
171 
172  for (uint32_t i = 0; i < settings_.read_threads; ++i)
173  {
174  std::stringstream thread_name;
175  thread_name << "read";
176  thread_name << i;
177 
178  read_threads_.run (hertz, thread_name.str (),
182  }
183  }
184  }
185 
186  return this->validate_transport ();
187 }
188 
189 void
191 {
192  const char * print_prefix = "UdpRegistryClient::register";
193 
194  long result (0);
195  uint64_t bytes_sent = 0;
196 
197  if (servers_.size () > 0)
198  {
199  // Register messages always use the message header to include domain
200  MessageHeader header;
201  header.type = transport::REGISTER;
202  strncpy (header.originator, this->id_.c_str (), sizeof (header.originator) - 1);
203  strncpy (header.domain, this->settings_.write_domain.c_str (),
204  sizeof (header.domain) - 1);
205  header.updates = 0;
206  header.clock = context_.get_clock ();
207  // compute size of this header
208  header.size = header.encoded_size ();
209 
210  int64_t buffer_remaining = settings_.queue_length;
211 
212  char * update = header.write (buffer_.get_ptr (), buffer_remaining);
213 
214  result = (long)(update - buffer_.get_ptr ());
215 
216  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
217  servers_.begin (); i != servers_.end (); ++i)
218  {
220  "%s:" \
221  " Sending register of size %d to %s\n",
222  print_prefix, (int)result, i->first.c_str ());
223 
224  int send_attempts = -1;
225  ssize_t actual_sent = -1;
226 
227  while (actual_sent < 0 &&
228  (settings_.resend_attempts < 0 ||
229  send_attempts < settings_.resend_attempts))
230  {
231 
232  // send the fragment
233  actual_sent = socket_.send (buffer_.get_ptr (),
234  (ssize_t)result, i->second);
235 
236  ++send_attempts;
237 
238  if (actual_sent > 0)
239  {
240  bytes_sent += actual_sent;
241 
243  "%s:" \
244  " Sent register of size %d to %s\n",
245  print_prefix, (int)actual_sent, i->first.c_str ());
246 
247  send_monitor_.add ((uint32_t)actual_sent);
248  }
249  else if (actual_sent == ECONNRESET)
250  {
252  "%s:" \
253  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
254  print_prefix);
255  }
256  else if (actual_sent == EINTR)
257  {
259  "%s:" \
260  " Local socket was interrupted during send (EINTR)\n",
261  print_prefix);
262  }
263  else if (actual_sent == EWOULDBLOCK)
264  {
266  "%s:" \
267  " Send would have blocked (EWOULDBLOCK)\n",
268  print_prefix);
269  }
270  else if (actual_sent == ENOTCONN)
271  {
273  "%s:" \
274  " Send reports socket is not connected (ENOTCONN)\n",
275  print_prefix);
276  }
277  else if (actual_sent == EADDRINUSE)
278  {
280  "%s:" \
281  " Send reports the interface is busy (EADDRINUSE)\n",
282  print_prefix);
283  }
284  else if (actual_sent == EBADF)
285  {
287  "%s:" \
288  " Send socket is invalid (EBADF)\n",
289  print_prefix);
290  }
291  else
292  {
294  "%s:" \
295  " register was not sent due to unknown error (%d)\n",
296  print_prefix, (int)actual_sent);
297  }
298  }
299  }
300  }
301  else
302  {
304  "%s:" \
305  " ERROR: no servers available for sending. Failed to register.\n");
306  }
307 }
308 
309 long
311  const madara::knowledge::KnowledgeRecords & orig_updates)
312 {
313  const char * print_prefix = "UdpRegistryClient::send_data";
314 
315  long result (0);
316 
317  if (!settings_.no_sending)
318  {
319  this->endpoints_.sync_keys ();
320 
321  std::vector <std::string> hosts;
322  this->clients_.clear ();
323  this->endpoints_.keys (hosts);
324 
325  for (size_t i = 0; i < hosts.size (); ++i)
326  {
327  clients_[hosts[i]].set (hosts[i].c_str ());
328  }
329 
330  send_register ();
331 
332  result = prep_send (orig_updates, print_prefix);
333 
334  if (result > 0)
335  {
336  uint64_t bytes_sent = 0;
337  uint64_t packet_size = (uint64_t)result;
338 
339  if (packet_size > settings_.max_fragment_size)
340  {
341  FragmentMap map;
342 
344  "%s:" \
345  " fragmenting %" PRIu64 " byte packet (%" PRIu32
346  " bytes is max fragment size)\n",
347  print_prefix, packet_size, settings_.max_fragment_size);
348 
349  // fragment the message
351 
352  int j (0);
353  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i, ++j)
354  {
355  size_t frag_size =
356  (size_t) MessageHeader::get_size (i->second);
357 
358  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
359  clients_.begin (); addr != clients_.end (); ++addr)
360  {
361  if (addr->first != settings_.hosts[0])
362  {
364  "%s:" \
365  " Sending fragment %d to %s\n",
366  print_prefix, j, addr->first.c_str ());
367 
368  int send_attempts = -1;
369  ssize_t actual_sent = -1;
370 
371  while (actual_sent < 0 &&
372  (settings_.resend_attempts < 0 ||
373  send_attempts < settings_.resend_attempts))
374  {
375 
376  // send the fragment
377  actual_sent = socket_.send (
378  i->second, frag_size, addr->second);
379 
380  ++send_attempts;
381 
382  // sleep between fragments, if such a slack time is specified
383  if (settings_.slack_time > 0)
385 
387  "%s: Send result was %d of %d byte fragment to %s\n",
388  print_prefix, (int)actual_sent, (int)frag_size, addr->first.c_str ());
389 
390  if (actual_sent > 0)
391  {
392  bytes_sent = (uint64_t)actual_sent;
393 
395  "%s:" \
396  " Sent packet of size %" PRIu64 " to %s\n",
397  print_prefix, bytes_sent, addr->first.c_str ());
398 
399  send_monitor_.add ((uint32_t)actual_sent);
400  }
401  else if (actual_sent == ECONNRESET)
402  {
404  "%s:" \
405  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
406  print_prefix);
407  }
408  else if (actual_sent == EINTR)
409  {
411  "%s:" \
412  " Local socket was interrupted during send (EINTR)\n",
413  print_prefix);
414  }
415  else if (actual_sent == EWOULDBLOCK)
416  {
418  "%s:" \
419  " Send would have blocked (EWOULDBLOCK)\n",
420  print_prefix);
421  }
422  else if (actual_sent == ENOTCONN)
423  {
425  "%s:" \
426  " Send reports socket is not connected (ENOTCONN)\n",
427  print_prefix);
428  }
429  else if (actual_sent == EADDRINUSE)
430  {
432  "%s:" \
433  " Send reports the interface is busy (EADDRINUSE)\n",
434  print_prefix);
435  }
436  else if (actual_sent == EBADF)
437  {
439  "%s:" \
440  " Send socket is invalid (EBADF)\n",
441  print_prefix);
442  }
443  else
444  {
446  "%s:" \
447  " Packet was not sent due to unknown error (%d)\n",
448  print_prefix, (int)actual_sent);
449  }
450  }
451  }
452  }
453  }
454 
455  if (bytes_sent > 0)
456  {
457  send_monitor_.add ((uint32_t)bytes_sent);
458  }
459 
461  "%s:" \
462  " Sent fragments totalling %" PRIu64 " bytes\n",
463  print_prefix, bytes_sent);
464 
465  delete_fragments (map);
466  }
467  else
468  {
469  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
470  clients_.begin (); i != clients_.end (); ++i)
471  {
472  if (i->first != settings_.hosts[0])
473  {
475  "%s:" \
476  " Sending packet of size %d to %s\n",
477  print_prefix, (int)result, i->first.c_str ());
478 
479  int send_attempts = -1;
480  ssize_t actual_sent = -1;
481 
482  while (actual_sent < 0 &&
483  (settings_.resend_attempts < 0 ||
484  send_attempts < settings_.resend_attempts))
485  {
486 
487  // send the fragment
488  actual_sent = socket_.send (buffer_.get_ptr (),
489  (ssize_t)result, i->second);
490 
491  ++send_attempts;
492 
493  if (actual_sent > 0)
494  {
495  bytes_sent += actual_sent;
496 
498  "%s:" \
499  " Sent packet of size %d to %s\n",
500  print_prefix, (int)actual_sent, i->first.c_str ());
501 
502  send_monitor_.add ((uint32_t)actual_sent);
503  }
504  else if (actual_sent == ECONNRESET)
505  {
507  "%s:" \
508  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
509  print_prefix);
510  }
511  else if (actual_sent == EINTR)
512  {
514  "%s:" \
515  " Local socket was interrupted during send (EINTR)\n",
516  print_prefix);
517  }
518  else if (actual_sent == EWOULDBLOCK)
519  {
521  "%s:" \
522  " Send would have blocked (EWOULDBLOCK)\n",
523  print_prefix);
524  }
525  else if (actual_sent == ENOTCONN)
526  {
528  "%s:" \
529  " Send reports socket is not connected (ENOTCONN)\n",
530  print_prefix);
531  }
532  else if (actual_sent == EADDRINUSE)
533  {
535  "%s:" \
536  " Send reports the interface is busy (EADDRINUSE)\n",
537  print_prefix);
538  }
539  else if (actual_sent == EBADF)
540  {
542  "%s:" \
543  " Send socket is invalid (EBADF)\n",
544  print_prefix);
545  }
546  else
547  {
549  "%s:" \
550  " Packet was not sent due to unknown error (%d)\n",
551  print_prefix, (int)actual_sent);
552  }
553  }
554  }
555  }
556  }
557 
558  result = (long) bytes_sent;
559 
561  "%s:" \
562  " Send bandwidth = %" PRIu64 " B/s\n",
563  print_prefix, send_monitor_.get_bytes_per_second ());
564  }
565  }
566 
567  return result;
568 }
Thread for reading knowledge and registry updates through a UDP socket.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
MADARA_Export int bind_to_ephemeral_port(ACE_SOCK_Dgram &socket, std::string &host, unsigned short &port, bool increase_until_bound=true)
Binds to an ephemeral port.
Definition: Utility.cpp:397
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
knowledge::containers::Map endpoints_
int reliability(void) const
Accesses reliability setting.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
uint32_t updates
the number of knowledge variable updates in the message
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:512
ACE_SOCK_Dgram socket_
underlying socket for sending and receiving
This class stores variables and their values for use by any entity needing state information in a thr...
void close(void)
Closes this transport.
std::vector< std::string > hosts
Host information for transports that require it.
std::vector< std::string > sync_keys(void)
Syncs the keys from the knowledge base.
Definition: Map.cpp:225
std::map< std::string, ACE_INET_Addr > clients_
registry clients
std::map< std::string, ACE_INET_Addr > servers_
registry servers
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
uint32_t type
the type of message
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
std::string write_domain
All class members are accessible to users for easy setup.
uint64_t size
the size of this header plus the updates
void set_name(const std::string &var_name, KnowledgeBase &knowledge, bool sync=true)
Sets the variable name that this refers to.
Definition: Map.cpp:411
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
void send_register(void)
Sends register messages to all servers.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
uint32_t queue_length
Length of the buffer used to store history of events.
UdpRegistryClient(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
int setup(void)
all subclasses should call this method at the end of its setup
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
uint64_t clock
the clock of the sender when the message was generated
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.
threads::Threader read_threads_
threads for reading knowledge updates
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...