MADARA  3.1.8
MulticastTransport.cpp
Go to the documentation of this file.
1 
5 
11 
12 #include <iostream>
14 
17  TransportSettings & config, bool launch_transport)
18 : Base (id, config, context),
19  write_socket_ (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET, 0, 1)
20 {
21  // create a reference to the knowledge base for threading
22  knowledge_.use (context);
23 
24  // set the data plane for the read threads
26 
27  if (launch_transport)
28  setup ();
29 }
30 
32 {
33  close ();
34 }
35 
36 void
38 {
39  this->invalidate_transport ();
40 
42 
44 
45  write_socket_.close ();
46 }
47 
48 int
50 {
52 }
53 
54 int
56 {
58 }
59 
60 int
62 {
63  // call base setup method to initialize certain common variables
64  Base::setup ();
65 
66  // resize addresses to be the size of the list of hosts
67  addresses_.resize (this->settings_.hosts.size ());
68 
69  int ttl = 1;
70  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
71  int rcv_buff_size = 0;
72  int opt_len = sizeof (int);
73 
74  write_socket_.set_option (IPPROTO_IP,
75  IP_MULTICAST_TTL,
76  (void *) &ttl,
77  sizeof (ttl));
78 
79  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
80  (void *)&send_buff_size, &opt_len);
81 
82  write_socket_.get_option (SOL_SOCKET, SO_RCVBUF,
83  (void *)&rcv_buff_size, &opt_len);
84 
86  "MulticastTransport::setup:" \
87  " default socket buff size is send=%d, rcv=%d\n",
88  send_buff_size, rcv_buff_size);
89 
90  if (send_buff_size < tar_buff_size)
91  {
93  "MulticastTransport::setup:" \
94  " setting send buff size to settings.queue_length (%d)\n",
95  tar_buff_size);
96 
97  write_socket_.set_option (SOL_SOCKET, SO_SNDBUF,
98  (void *) &tar_buff_size, opt_len);
99 
100  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
101  (void *)&send_buff_size, &opt_len);
102 
104  "MulticastTransport::setup:" \
105  " current socket buff size is send=%d, rcv=%d\n",
106  send_buff_size, rcv_buff_size);
107  }
108 
109  if (rcv_buff_size < tar_buff_size)
110  {
112  "MulticastTransport::setup:" \
113  " setting rcv buff size to settings.queue_length (%d)\n",
114  tar_buff_size);
115 
116  write_socket_.set_option (SOL_SOCKET, SO_SNDBUF,
117  (void *) &tar_buff_size, opt_len);
118 
119  write_socket_.get_option (SOL_SOCKET, SO_SNDBUF,
120  (void *)&rcv_buff_size, &opt_len);
121 
123  "MulticastTransport::setup:" \
124  " current socket buff size is send=%d, rcv=%d\n",
125  send_buff_size, rcv_buff_size);
126  }
127 
128  if (addresses_.size () > 0)
129  {
130  // convert the string host:port into an ACE address
131  for (unsigned int i = 0; i < addresses_.size (); ++i)
132  {
133  addresses_[i].set (settings_.hosts[i].c_str ());
134 
136  "MulticastTransport::setup:" \
137  " settings address[%d] to %s:%d\n", i,
138  addresses_[i].get_host_addr (), addresses_[i].get_port_number ());
139  }
140 
141  int port = addresses_[0].get_port_number ();
142  const char * host = addresses_[0].get_host_addr ();
143 
144  if (-1 == read_socket_.join (addresses_[0], 1))
145  {
147  "MulticastTransport::setup:" \
148  " Error subscribing to multicast address %s:%d\n", host, port);
149  }
150  else
151  {
153  "MulticastTransport::setup:" \
154  " Success subscribing to multicast address %s:%d\n", host, port);
155 
156  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
157  int rcv_buff_size = 0;
158  int opt_len = sizeof (int);
159 
160  ACE_SOCK_Dgram & bare_socket = read_socket_;
161 
162  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
163  (void *)&rcv_buff_size, &opt_len);
164 
166  "MulticastTransport::setup:" \
167  " default socket buff size is send=%d, rcv=%d\n",
168  send_buff_size, rcv_buff_size);
169 
170  if (send_buff_size < tar_buff_size)
171  {
173  "MulticastTransport::setup:" \
174  " setting send buff size to settings.queue_length (%d)\n",
175  tar_buff_size);
176 
177  bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
178  (void *)&tar_buff_size, opt_len);
179 
180  bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
181  (void *)&send_buff_size, &opt_len);
182 
184  "MulticastTransport::setup:" \
185  " current socket buff size is send=%d, rcv=%d\n",
186  send_buff_size, rcv_buff_size);
187  }
188 
189  if (rcv_buff_size < tar_buff_size)
190  {
192  "MulticastTransport::setup:" \
193  " setting rcv buff size to settings.queue_length (%d)\n",
194  tar_buff_size);
195 
196  bare_socket.set_option (SOL_SOCKET, SO_RCVBUF,
197  (void *)&tar_buff_size, opt_len);
198 
199  bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
200  (void *)&rcv_buff_size, &opt_len);
201 
203  "MulticastTransport::setup:" \
204  " current socket buff size is send=%d, rcv=%d\n",
205  send_buff_size, rcv_buff_size);
206  }
207  }
208 
209  if (!settings_.no_receiving)
210  {
211  double hertz = settings_.read_thread_hertz;
212  if (hertz < 0.0)
213  {
214  hertz = 0.0;
215  }
216 
218  "MulticastTransport::setup:" \
219  " starting %d threads at %f hertz\n", settings_.read_threads,
220  hertz);
221 
222  for (uint32_t i = 0; i < settings_.read_threads; ++i)
223  {
224  std::stringstream thread_name;
225  thread_name << "read";
226  thread_name << i;
227 
228  read_threads_.run (hertz, thread_name.str (),
232  }
233  }
234  }
235 
236 
237 
238  return this->validate_transport ();
239 }
240 
241 long
243  const madara::knowledge::KnowledgeRecords & orig_updates)
244 {
245  long result (0);
246  const char * print_prefix = "MulticastTransport::send_data";
247 
248  if (!settings_.no_sending)
249  {
250  result = prep_send (orig_updates, print_prefix);
251 
252  if (addresses_.size () > 0 && result > 0)
253  {
254  uint64_t bytes_sent = 0;
255  uint64_t packet_size = (uint64_t)result;
256 
257  if (packet_size > settings_.max_fragment_size)
258  {
259  FragmentMap map;
260 
262  "%s:" \
263  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
264  print_prefix, packet_size, settings_.max_fragment_size);
265 
266  // fragment the message
268 
269  int j (0);
270  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
271  {
273  "%s:" \
274  " Sending fragment %d\n",
275  print_prefix, j);
276 
277  int send_attempts = -1;
278  ssize_t actual_sent = -1;
279 
280  while (actual_sent < 0 &&
281  (settings_.resend_attempts < 0 ||
282  send_attempts < settings_.resend_attempts))
283  {
284 
285  // send the fragment
286  actual_sent = write_socket_.send (
287  i->second,
288  (ssize_t)MessageHeader::get_size (i->second),
289  addresses_[0]);
290 
291  ++send_attempts;
292 
293  if (actual_sent > 0)
294  {
296  "%s:" \
297  " Sent %d byte fragment\n",
298  print_prefix, (int)actual_sent);
299 
300  bytes_sent += actual_sent;
301  }
302  }
303 
304  // sleep between fragments, if such a slack time is specified
305  if (settings_.slack_time > 0)
307  }
308 
309  if (bytes_sent > 0)
310  {
311  send_monitor_.add ((uint32_t)bytes_sent);
312  }
313 
315  "%s:" \
316  " Sent fragments totalling %" PRIu64 " bytes\n",
317  print_prefix, bytes_sent);
318 
319  delete_fragments (map);
320  }
321  else
322  {
324  "%s:" \
325  " Sending packet of size %ld\n",
326  print_prefix, result);
327 
328  int send_attempts = -1;
329  ssize_t actual_sent = -1;
330 
331  while (actual_sent < 0 &&
332  (settings_.resend_attempts < 0 ||
333  send_attempts < settings_.resend_attempts))
334  {
335 
336  // send the fragment
337  actual_sent = write_socket_.send (
338  buffer_.get_ptr (), (ssize_t)result, addresses_[0]);
339 
340  ++send_attempts;
341 
342  if (actual_sent > 0)
343  {
344  bytes_sent = (uint64_t)actual_sent;
345 
347  "%s:" \
348  " Sent packet of size %" PRIu64 "\n",
349  print_prefix, bytes_sent);
350 
351  send_monitor_.add ((uint32_t)actual_sent);
352  }
353  else if (actual_sent == ECONNRESET)
354  {
356  "%s:" \
357  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
358  print_prefix);
359  }
360  else if (actual_sent == EINTR)
361  {
363  "%s:" \
364  " Local socket was interrupted during send (EINTR)\n",
365  print_prefix);
366  }
367  else if (actual_sent == EWOULDBLOCK)
368  {
370  "%s:" \
371  " Send would have blocked (EWOULDBLOCK)\n",
372  print_prefix);
373  }
374  else if (actual_sent == ENOTCONN)
375  {
377  "%s:" \
378  " Send reports socket is not connected (ENOTCONN)\n",
379  print_prefix);
380  }
381  else if (actual_sent == EADDRINUSE)
382  {
384  "%s:" \
385  " Send reports the interface is busy (EADDRINUSE)\n",
386  print_prefix);
387  }
388  else if (actual_sent == EBADF)
389  {
391  "%s:" \
392  " Send socket is invalid (EBADF)\n",
393  print_prefix);
394  }
395  else
396  {
398  "%s:" \
399  " Packet was not sent due to unknown error (%d)\n",
400  print_prefix, (int)actual_sent);
401  }
402  }
403  }
404 
405 
407  "%s:" \
408  " Send bandwidth = %" PRIu64 " B/s\n",
409  print_prefix, send_monitor_.get_bytes_per_second ());
410 
411  result = (long) bytes_sent;
412  }
413  }
414 
415  return result;
416 }
int setup(void)
Initializes the transport.
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
ACE_SOCK_Dgram_Mcast read_socket_
The multicast socket we are reading from.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
threads::Threader read_threads_
threads for reading knowledge updates
MulticastTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
ACE_SOCK_Dgram write_socket_
underlying socket for sending
void close(void)
Closes the transport.
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
Thread for reading knowledge updates through a Multicast datagram socket.
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
int reliability(void) const
Accesses reliability setting.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
std::vector< ACE_INET_Addr > addresses_
holds all multicast addresses we are sending to
Base class from which all transports must be derived.
Definition: Transport.h:62
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
int resend_attempts
Maximum number of attempts to resend if transport is busy.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.