MADARA  3.1.8
UdpRegistryServer.cpp
Go to the documentation of this file.
4 
8 #include "ace/INET_Addr.h"
9 #include "ace/SOCK_Dgram.h"
10 
11 
12 #include <iostream>
13 
16  TransportSettings & config, bool launch_transport)
17  : Base (id, config, context)
18 {
19  // create a reference to the knowledge base for threading
20  knowledge_.use (context);
21 
22  // set the data plane for the read threads
24 
25  endpoints_.set_name ("domain." + config.write_domain + ".endpoints", knowledge_);
26 
27  if (launch_transport)
28  setup ();
29 }
30 
32 {
33  close ();
34 }
35 
36 void
38 {
39  this->invalidate_transport ();
40 
42 
44 
45  // close send port
46  if (-1 == write_socket_.close ())
47  {
49  "UdpRegistryServer::cleanup:" \
50  " Error closing write socket\n");
51  }
52 
53  // close receive port
54  if (-1 == read_socket_.close ())
55  {
57  "UdpRegistryServer::cleanup:" \
58  " Error closing read socket\n");
59  }
60 }
61 
62 int
64 {
65  return RELIABLE;
66 }
67 
68 int
70 {
71  return RELIABLE;
72 }
73 
74 int
76 {
77  // call base setup method to initialize certain common variables
78  Base::setup ();
79 
80  if (settings_.hosts.size () > 0)
81  {
82  for (size_t i = 0; i < settings_.hosts.size (); ++i)
83  {
85  "UdpRegistryClient::setup:" \
86  " adding server %s to registry lookup list\n",
87  settings_.hosts[i].c_str ());
88 
89  servers_[settings_.hosts[i]].set (settings_.hosts[i].c_str ());
90  }
91 
92  // open the broadcast socket to any port for sending
93  if (write_socket_.open (ACE_Addr::sap_any) == -1)
94  {
96  "UdpRegistryServer::setup:" \
97  " socket failed to open\n");
98  }
99 
100  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
101  int rcv_buff_size = 0;
102  int opt_len = sizeof (int);
103 
104  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
105  (void *)&send_buff_size, &opt_len);
106 
107  write_socket_.get_option (SOL_SOCKET, SO_RCVBUF,
108  (void *)&rcv_buff_size, &opt_len);
109 
111  "UdpRegistryServer::setup:" \
112  " default socket buff size is send=%d, rcv=%d\n",
113  send_buff_size, rcv_buff_size);
114 
115  if (send_buff_size < tar_buff_size)
116  {
118  "UdpRegistryServer::setup:" \
119  " setting send buff size to settings.queue_length (%d)\n",
120  tar_buff_size);
121 
122  write_socket_.set_option (SOL_SOCKET, SO_SNDBUF,
123  (void *)&tar_buff_size, opt_len);
124 
125  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
126  (void *)&send_buff_size, &opt_len);
127 
129  "UdpRegistryServer::setup:" \
130  " current socket buff size is send=%d, rcv=%d\n",
131  send_buff_size, rcv_buff_size);
132  }
133 
134  if (rcv_buff_size < tar_buff_size)
135  {
137  "UdpRegistryServer::setup:" \
138  " setting rcv buff size to settings.queue_length (%d)\n",
139  tar_buff_size);
140 
141  write_socket_.set_option (SOL_SOCKET, SO_RCVBUF,
142  (void *)&tar_buff_size, opt_len);
143 
144  write_socket_.get_option (SOL_SOCKET, SO_RCVBUF,
145  (void *)&rcv_buff_size, &opt_len);
146 
148  "UdpRegistryServer::setup:" \
149  " current socket buff size is send=%d, rcv=%d\n",
150  send_buff_size, rcv_buff_size);
151  }
152 
153 
154  if (servers_.size () > 0)
155  {
156  // for receiving, we only want to bind to the local port
157  ACE_INET_Addr local (servers_[settings_.hosts[0]].get_port_number ());
158 
159  if (-1 == read_socket_.open (local, 2, 0, 0))
160  {
162  "UdpRegistryServer::setup:" \
163  " Error subscribing to udp address %s:%d\n",
164  local.get_host_addr (), local.get_port_number ());
165  }
166  else
167  {
169  "UdpRegistryServer::setup:" \
170  " Success subscribing to udp address %s:%d\n",
171  local.get_host_addr (), local.get_port_number ());
172 
173  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
174  int rcv_buff_size = 0;
175  int opt_len = sizeof (int);
176 
177  ACE_SOCK_Dgram & bare_socket = read_socket_;
178 
179  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
180  (void *)&rcv_buff_size, &opt_len);
181 
183  "UdpRegistryServer::setup:" \
184  " default socket buff size is send=%d, rcv=%d\n",
185  send_buff_size, rcv_buff_size);
186 
187  if (send_buff_size < tar_buff_size)
188  {
190  "UdpRegistryServer::setup:" \
191  " setting send buff size to settings.queue_length (%d)\n",
192  tar_buff_size);
193 
194  bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
195  (void *)&tar_buff_size, opt_len);
196 
197  bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
198  (void *)&send_buff_size, &opt_len);
199 
201  "UdpRegistryServer::setup:" \
202  " current socket buff size is send=%d, rcv=%d\n",
203  send_buff_size, rcv_buff_size);
204  }
205 
206  if (rcv_buff_size < tar_buff_size)
207  {
209  "UdpRegistryServer::setup:" \
210  " setting rcv buff size to settings.queue_length (%d)\n",
211  tar_buff_size);
212 
213  bare_socket.set_option (SOL_SOCKET, SO_RCVBUF,
214  (void *)&tar_buff_size, opt_len);
215 
216  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
217  (void *)&rcv_buff_size, &opt_len);
218 
220  "UdpRegistryServerReadThread::setup:" \
221  " current socket buff size is send=%d, rcv=%d\n",
222  send_buff_size, rcv_buff_size);
223  }
224 
225  } // if successful socket
226 
227  } // if appropriate addresses
228 
229  if (!settings_.no_receiving)
230  {
231  double hertz = settings_.read_thread_hertz;
232  if (hertz < 0.0)
233  {
234  hertz = 0.0;
235  }
236 
238  "UdpRegistryServerReadThread::setup:" \
239  " starting %d threads at %f hertz\n", settings_.read_threads,
240  hertz);
241 
242  for (uint32_t i = 0; i < settings_.read_threads; ++i)
243  {
244  std::stringstream thread_name;
245  thread_name << "read";
246  thread_name << i;
247 
248  read_threads_.run (hertz, thread_name.str (),
252  }
253  }
254  }
255 
256  return this->validate_transport ();
257 }
258 
259 long
261  const madara::knowledge::KnowledgeRecords & orig_updates)
262 {
263  const char * print_prefix = "UdpRegistryServer::send_data";
264 
265  long result (0);
266 
267  if (!settings_.no_sending)
268  {
269  this->endpoints_.sync_keys ();
270 
271  std::vector <std::string> hosts;
272  this->clients_.clear ();
273  this->endpoints_.keys (hosts);
274 
275  for (size_t i = 0; i < hosts.size (); ++i)
276  {
278  "%s:" \
279  " adding %s to client send list\n",
280  print_prefix, hosts[i].c_str ());
281 
282  clients_[hosts[i]].set (hosts[i].c_str ());
283  }
284 
285  result = prep_send (orig_updates, print_prefix);
286 
287  if (result > 0)
288  {
289  uint64_t bytes_sent = 0;
290  uint64_t packet_size = (uint64_t)result;
291 
292  if (packet_size > settings_.max_fragment_size)
293  {
294  FragmentMap map;
295 
297  "%s:" \
298  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
299  print_prefix, packet_size, settings_.max_fragment_size);
300 
301  // fragment the message
303 
304  int j (0);
305  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i, ++j)
306  {
307  size_t frag_size =
308  (size_t) MessageHeader::get_size (i->second);
309 
310  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
311  clients_.begin (); addr != clients_.end (); ++addr)
312  {
313  if (addr->first != settings_.hosts[0])
314  {
316  "%s:" \
317  " Sending fragment %d\n",
318  print_prefix, j);
319 
320  int send_attempts = -1;
321  ssize_t actual_sent = -1;
322 
323  while (actual_sent < 0 &&
324  (settings_.resend_attempts < 0 ||
325  send_attempts < settings_.resend_attempts))
326  {
327 
328  // send the fragment
329  actual_sent = write_socket_.send (
330  i->second, frag_size, addr->second);
331 
332  ++send_attempts;
333 
334  // sleep between fragments, if such a slack time is specified
335  if (settings_.slack_time > 0)
337 
339  "%s: Send result was %d of %d byte fragment to %s\n",
340  print_prefix, (int)actual_sent,
341  (int)frag_size, addr->first.c_str ());
342 
343  if (actual_sent > 0)
344  {
345  bytes_sent = (uint64_t)actual_sent;
346 
348  "%s:" \
349  " Sent packet of size %" PRIu64 "\n",
350  print_prefix, bytes_sent);
351 
352  send_monitor_.add ((uint32_t)actual_sent);
353  }
354  else if (actual_sent == ECONNRESET)
355  {
357  "%s:" \
358  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
359  print_prefix);
360  }
361  else if (actual_sent == EINTR)
362  {
364  "%s:" \
365  " Local socket was interrupted during send (EINTR)\n",
366  print_prefix);
367  }
368  else if (actual_sent == EWOULDBLOCK)
369  {
371  "%s:" \
372  " Send would have blocked (EWOULDBLOCK)\n",
373  print_prefix);
374  }
375  else if (actual_sent == ENOTCONN)
376  {
378  "%s:" \
379  " Send reports socket is not connected (ENOTCONN)\n",
380  print_prefix);
381  }
382  else if (actual_sent == EADDRINUSE)
383  {
385  "%s:" \
386  " Send reports the interface is busy (EADDRINUSE)\n",
387  print_prefix);
388  }
389  else if (actual_sent == EBADF)
390  {
392  "%s:" \
393  " Send socket is invalid (EBADF)\n",
394  print_prefix);
395  }
396  else
397  {
399  "%s:" \
400  " Packet was not sent due to unknown error (%d)\n",
401  print_prefix, (int)actual_sent);
402  }
403  }
404  }
405  }
406  }
407 
408  if (bytes_sent > 0)
409  {
410  send_monitor_.add ((uint32_t)bytes_sent);
411  }
412 
414  "%s:" \
415  " Sent fragments totalling %" PRIu64 " bytes\n",
416  print_prefix, bytes_sent);
417 
418  delete_fragments (map);
419  }
420  else
421  {
422  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
423  clients_.begin (); i != clients_.end (); ++i)
424  {
425  if (i->first != settings_.hosts[0])
426  {
428  "%s:" \
429  " Sending packet of size %ld to %s\n",
430  print_prefix, result, i->first.c_str ());
431 
432  int send_attempts = -1;
433  ssize_t actual_sent = -1;
434 
435  while (actual_sent < 0 &&
436  (settings_.resend_attempts < 0 ||
437  send_attempts < settings_.resend_attempts))
438  {
439 
440  // send the fragment
441  actual_sent = write_socket_.send (buffer_.get_ptr (),
442  (ssize_t)result, i->second);
443 
444  ++send_attempts;
445 
446  if (actual_sent > 0)
447  {
448  bytes_sent += actual_sent;
449 
451  "%s:" \
452  " Sent packet of size %d to %s\n",
453  print_prefix, (int)actual_sent, i->first.c_str ());
454 
455  send_monitor_.add ((uint32_t)actual_sent);
456  }
457  else if (actual_sent == ECONNRESET)
458  {
460  "%s:" \
461  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
462  print_prefix);
463  }
464  else if (actual_sent == EINTR)
465  {
467  "%s:" \
468  " Local socket was interrupted during send (EINTR)\n",
469  print_prefix);
470  }
471  else if (actual_sent == EWOULDBLOCK)
472  {
474  "%s:" \
475  " Send would have blocked (EWOULDBLOCK)\n",
476  print_prefix);
477  }
478  else if (actual_sent == ENOTCONN)
479  {
481  "%s:" \
482  " Send reports socket is not connected (ENOTCONN)\n",
483  print_prefix);
484  }
485  else if (actual_sent == EADDRINUSE)
486  {
488  "%s:" \
489  " Send reports the interface is busy (EADDRINUSE)\n",
490  print_prefix);
491  }
492  else if (actual_sent == EBADF)
493  {
495  "%s:" \
496  " Send socket is invalid (EBADF)\n",
497  print_prefix);
498  }
499  else
500  {
502  "%s:" \
503  " Packet was not sent due to unknown error (%d)\n",
504  print_prefix, (int)actual_sent);
505  }
506  }
507  }
508  }
509  }
510 
511  result = (long) bytes_sent;
512 
514  "%s:" \
515  " Send bandwidth = %" PRIu64 " B/s\n",
516  print_prefix, send_monitor_.get_bytes_per_second ());
517  }
518  }
519 
520  return result;
521 }
QoSTransportSettings settings_
Definition: Transport.h:188
int setup(void)
all subclasses should call this method at the end of its setup
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:512
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
std::vector< std::string > sync_keys(void)
Syncs the keys from the knowledge base.
Definition: Map.cpp:225
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
knowledge::containers::Map endpoints_
Thread for reading registry updates through a UDP socket.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
std::string write_domain
All class members are accessible to users for easy setup.
void set_name(const std::string &var_name, KnowledgeBase &knowledge, bool sync=true)
Sets the variable name that this refers to.
Definition: Map.cpp:411
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
std::map< std::string, ACE_INET_Addr > clients_
registry clients
bool no_sending
if true, never send over transport
ACE_SOCK_Dgram read_socket_
The socket we are reading from.
UdpRegistryServer(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
void add(uint64_t size)
Adds a message to the monitor.
int reliability(void) const
Accesses reliability setting.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
std::map< std::string, ACE_INET_Addr > servers_
registry servers
ACE_SOCK_Dgram write_socket_
underlying socket for sending
void close(void)
Closes this transport.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
threads::Threader read_threads_
threads for reading knowledge updates
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.