MADARA  3.1.8
UdpTransport.cpp
Go to the documentation of this file.
4 
8 #include "ace/INET_Addr.h"
9 #include "ace/SOCK_Dgram.h"
10 
11 
12 #include <iostream>
13 
16  TransportSettings & config, bool launch_transport)
17  : Base (id, config, context)
18 {
19  // create a reference to the knowledge base for threading
20  knowledge_.use (context);
21 
22  // set the data plane for the read threads
24 
25  if (launch_transport)
26  setup ();
27 }
28 
30 {
31  close ();
32 }
33 
34 void
36 {
37  this->invalidate_transport ();
38 
40 
42 
43  // close send port
44  if (-1 == write_socket_.close ())
45  {
47  "UdpTransport::cleanup:" \
48  " Error closing write socket\n");
49  }
50 
51  // close receive port
52  if (-1 == read_socket_.close ())
53  {
55  "UdpTransport::cleanup:" \
56  " Error closing read socket\n");
57  }
58 }
59 
60 int
62 {
63  return RELIABLE;
64 }
65 
66 int
68 {
69  return RELIABLE;
70 }
71 
72 int
74 {
75  // call base setup method to initialize certain common variables
76  Base::setup ();
77 
78  if (settings_.hosts.size () > 0)
79  {
80  for (size_t i = 0; i < settings_.hosts.size (); ++i)
81  {
82  addresses_[settings_.hosts[i]].set (settings_.hosts[i].c_str ());
83  }
84 
85  // open the UDP send socket on any available local port
86  if (write_socket_.open (ACE_Addr::sap_any) == -1)
87  {
89  "UdpTransport::setup:" \
90  " socket failed to open\n");
91  }
92 
93  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
94  int rcv_buff_size = 0;
95  int opt_len = sizeof (int);
96 
97  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
98  (void *)&send_buff_size, &opt_len);
99 
100  write_socket_.get_option (SOL_SOCKET, SO_RCVBUF,
101  (void *)&rcv_buff_size, &opt_len);
102 
104  "UdpTransport::setup:" \
105  " default socket buff size is send=%d, rcv=%d\n",
106  send_buff_size, rcv_buff_size);
107 
108  if (send_buff_size < tar_buff_size)
109  {
111  "UdpTransport::setup:" \
112  " setting send buff size to settings.queue_length (%d)\n",
113  tar_buff_size);
114 
115  write_socket_.set_option (SOL_SOCKET, SO_SNDBUF,
116  (void *)&tar_buff_size, opt_len);
117 
118  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
119  (void *)&send_buff_size, &opt_len);
120 
122  "UdpTransport::setup:" \
123  " current socket buff size is send=%d, rcv=%d\n",
124  send_buff_size, rcv_buff_size);
125  }
126 
127  if (rcv_buff_size < tar_buff_size)
128  {
130  "UdpTransport::setup:" \
131  " setting rcv buff size to settings.queue_length (%d)\n",
132  tar_buff_size);
133 
134  write_socket_.set_option (SOL_SOCKET, SO_RCVBUF,
135  (void *)&tar_buff_size, opt_len);
136 
137  write_socket_.get_option (SOL_SOCKET, SO_RCVBUF,
138  (void *)&rcv_buff_size, &opt_len);
139 
141  "UdpTransport::setup:" \
142  " current socket buff size is send=%d, rcv=%d\n",
143  send_buff_size, rcv_buff_size);
144  }
145 
146 
147  if (addresses_.size () > 0)
148  {
149  // for receiving, we only want to bind to the local port
150  ACE_INET_Addr local (addresses_[settings_.hosts[0]].get_port_number ());
151 
152  if (-1 == read_socket_.open (local, 2, 0, 0))
153  {
155  "UdpTransport::setup:" \
156  " Error subscribing to udp address %s:%d\n",
157  local.get_host_addr (), local.get_port_number ());
158  }
159  else
160  {
162  "UdpTransport::setup:" \
163  " Success subscribing to udp address %s:%d\n",
164  local.get_host_addr (), local.get_port_number ());
165 
166  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
167  int rcv_buff_size = 0;
168  int opt_len = sizeof (int);
169 
170  ACE_SOCK_Dgram & bare_socket = read_socket_;
171 
172  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
173  (void *)&rcv_buff_size, &opt_len);
174 
176  "UdpTransport::setup:" \
177  " default socket buff size is send=%d, rcv=%d\n",
178  send_buff_size, rcv_buff_size);
179 
180  if (send_buff_size < tar_buff_size)
181  {
183  "UdpTransport::setup:" \
184  " setting send buff size to settings.queue_length (%d)\n",
185  tar_buff_size);
186 
187  bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
188  (void *)&tar_buff_size, opt_len);
189 
190  bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
191  (void *)&send_buff_size, &opt_len);
192 
194  "UdpTransport::setup:" \
195  " current socket buff size is send=%d, rcv=%d\n",
196  send_buff_size, rcv_buff_size);
197  }
198 
199  if (rcv_buff_size < tar_buff_size)
200  {
202  "UdpTransport::setup:" \
203  " setting rcv buff size to settings.queue_length (%d)\n",
204  tar_buff_size);
205 
206  bare_socket.set_option (SOL_SOCKET, SO_RCVBUF,
207  (void *)&tar_buff_size, opt_len);
208 
209  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
210  (void *)&rcv_buff_size, &opt_len);
211 
213  "UdpTransportReadThread::setup:" \
214  " current socket buff size is send=%d, rcv=%d\n",
215  send_buff_size, rcv_buff_size);
216  }
217 
218  } // if successful socket
219 
220  } // if appropriate addresses
221 
222  if (!settings_.no_receiving)
223  {
224  double hertz = settings_.read_thread_hertz;
225  if (hertz < 0.0)
226  {
227  hertz = 0.0;
228  }
229 
231  "UdpTransportReadThread::setup:" \
232  " starting %d threads at %f hertz\n", settings_.read_threads,
233  hertz);
234 
235  for (uint32_t i = 0; i < settings_.read_threads; ++i)
236  {
237  std::stringstream thread_name;
238  thread_name << "read";
239  thread_name << i;
240 
241  read_threads_.run (hertz, thread_name.str (),
245  }
246  }
247  }
248 
249  return this->validate_transport ();
250 }
251 
252 long
254  const madara::knowledge::KnowledgeRecords & orig_updates)
255 {
256  const char * print_prefix = "UdpTransport::send_data";
257 
258  long result (0);
259 
260  if (!settings_.no_sending)
261  {
262  result = prep_send (orig_updates, print_prefix);
263 
264  if (result > 0)
265  {
266  uint64_t bytes_sent = 0;
267  uint64_t packet_size = (uint64_t)result;
268 
269  if (packet_size > settings_.max_fragment_size)
270  {
271  FragmentMap map;
272 
274  "%s:" \
275  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
276  print_prefix, packet_size, settings_.max_fragment_size);
277 
278  // fragment the message
280 
281  int j (0);
282  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i, ++j)
283  {
284  size_t frag_size =
285  (size_t) MessageHeader::get_size (i->second);
286 
287  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
288  addresses_.begin (); addr != addresses_.end (); ++addr)
289  {
290  if (addr->first != settings_.hosts[0])
291  {
293  "%s:" \
294  " Sending fragment %d\n",
295  print_prefix, j);
296 
297  int send_attempts = -1;
298  ssize_t actual_sent = -1;
299 
300  while (actual_sent < 0 &&
301  (settings_.resend_attempts < 0 ||
302  send_attempts < settings_.resend_attempts))
303  {
304 
305  // send the fragment
306  actual_sent = write_socket_.send (
307  i->second, frag_size, addr->second);
308 
309  ++send_attempts;
310 
311  // sleep between fragments, if such a slack time is specified
312  if (settings_.slack_time > 0)
314 
316  "%s: Send result was %d of %d byte fragment to %s\n",
317  print_prefix, actual_sent, frag_size, addr->first.c_str ());
318 
319  if (actual_sent > 0)
320  {
321  bytes_sent = (uint64_t)actual_sent;
322 
324  "%s:" \
325  " Sent packet of size %" PRIu64 "\n",
326  print_prefix, bytes_sent);
327 
328  send_monitor_.add ((uint32_t)actual_sent);
329  }
330  else if (actual_sent == ECONNRESET)
331  {
333  "%s:" \
334  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
335  print_prefix);
336  }
337  else if (actual_sent == EINTR)
338  {
340  "%s:" \
341  " Local socket was interrupted during send (EINTR)\n",
342  print_prefix);
343  }
344  else if (actual_sent == EWOULDBLOCK)
345  {
347  "%s:" \
348  " Send would have blocked (EWOULDBLOCK)\n",
349  print_prefix);
350  }
351  else if (actual_sent == ENOTCONN)
352  {
354  "%s:" \
355  " Send reports socket is not connected (ENOTCONN)\n",
356  print_prefix);
357  }
358  else if (actual_sent == EADDRINUSE)
359  {
361  "%s:" \
362  " Send reports the interface is busy (EADDRINUSE)\n",
363  print_prefix);
364  }
365  else if (actual_sent == EBADF)
366  {
368  "%s:" \
369  " Send socket is invalid (EBADF)\n",
370  print_prefix);
371  }
372  else
373  {
375  "%s:" \
376  " Packet was not sent due to unknown error (%d)\n",
377  print_prefix, (int)actual_sent);
378  }
379  }
380  }
381  }
382  }
383 
384  if (bytes_sent > 0)
385  {
386  send_monitor_.add ((uint32_t)bytes_sent);
387  }
388 
390  "%s:" \
391  " Sent fragments totalling %" PRIu64 " bytes\n",
392  print_prefix, bytes_sent);
393 
394  delete_fragments (map);
395  }
396  else
397  {
398  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
399  addresses_.begin (); i != addresses_.end (); ++i)
400  {
401  if (i->first != settings_.hosts[0])
402  {
404  "%s:" \
405  " Sending packet of size %ld\n",
406  print_prefix, result);
407 
408  int send_attempts = -1;
409  ssize_t actual_sent = -1;
410 
411  while (actual_sent < 0 &&
412  (settings_.resend_attempts < 0 ||
413  send_attempts < settings_.resend_attempts))
414  {
415 
416  // send the whole packet (it fits within max_fragment_size)
417  actual_sent = write_socket_.send (buffer_.get_ptr (),
418  (ssize_t)result, i->second);
419 
420  ++send_attempts;
421 
422  if (actual_sent > 0)
423  {
424  bytes_sent += actual_sent;
425 
427  "%s:" \
428  " Sent packet of size %" PRIu64 "\n",
429  print_prefix, (int)actual_sent);
430 
431  send_monitor_.add ((uint32_t)actual_sent);
432  }
433  else if (actual_sent == ECONNRESET)
434  {
436  "%s:" \
437  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
438  print_prefix);
439  }
440  else if (actual_sent == EINTR)
441  {
443  "%s:" \
444  " Local socket was interrupted during send (EINTR)\n",
445  print_prefix);
446  }
447  else if (actual_sent == EWOULDBLOCK)
448  {
450  "%s:" \
451  " Send would have blocked (EWOULDBLOCK)\n",
452  print_prefix);
453  }
454  else if (actual_sent == ENOTCONN)
455  {
457  "%s:" \
458  " Send reports socket is not connected (ENOTCONN)\n",
459  print_prefix);
460  }
461  else if (actual_sent == EADDRINUSE)
462  {
464  "%s:" \
465  " Send reports the interface is busy (EADDRINUSE)\n",
466  print_prefix);
467  }
468  else if (actual_sent == EBADF)
469  {
471  "%s:" \
472  " Send socket is invalid (EBADF)\n",
473  print_prefix);
474  }
475  else
476  {
478  "%s:" \
479  " Packet was not sent due to unknown error (%d)\n",
480  print_prefix, (int)actual_sent);
481  }
482  }
483  }
484  }
485  }
486 
487  result = (long) bytes_sent;
488 
490  "%s:" \
491  " Send bandwidth = %" PRIu64 " B/s\n",
492  print_prefix, send_monitor_.get_bytes_per_second ());
493  }
494  }
495 
496  return result;
497 }
int setup(void)
all subclasses should call this method at the end of their setup
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
UdpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
Definition: UdpTransport.h:90
std::map< std::string, ACE_INET_Addr > addresses_
Definition: UdpTransport.h:95
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
void close(void)
Closes this transport.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
threads::Threader read_threads_
threads for reading knowledge updates
Definition: UdpTransport.h:93
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
ACE_SOCK_Dgram write_socket_
underlying socket for sending
Definition: UdpTransport.h:98
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
ACE_SOCK_Dgram read_socket_
The socket we are reading from.
Definition: UdpTransport.h:101
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
int reliability(void) const
Accesses reliability setting.
uint32_t queue_length
Length of the buffer used to store history of events.
virtual ~UdpTransport()
Destructor.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of their setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
Thread for reading knowledge updates through a UDP socket.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.