MADARA  3.1.8
BroadcastTransport.cpp
Go to the documentation of this file.
4 
7 
8 #include <iostream>
9 
12  TransportSettings & config, bool launch_transport)
13 : Base (id, config, context)
14 {
15  // create a reference to the knowledge base for threading
16  knowledge_.use (context);
17 
18  // set the data plane for the read threads
20 
21  if (launch_transport)
22  setup ();
23 }
24 
26 {
27  close ();
28 }
29 
30 void
32 {
33  this->invalidate_transport ();
34 
36 
38 
39  socket_.close ();
40 }
41 
42 int
44 {
46 }
47 
48 int
50 {
52 }
53 
54 int
56 {
57  // call base setup method to initialize certain common variables
58  Base::setup ();
59 
60  // resize addresses to be the size of the list of hosts
61  addresses_.resize (this->settings_.hosts.size ());
62 
63  if (addresses_.size () > 0)
64  {
65  // convert the string host:port into an ACE address
66  for (unsigned int i = 0; i < addresses_.size (); ++i)
67  {
68  addresses_[i].set (settings_.hosts[i].c_str ());
69 
71  "BroadcastTransport::constructor" \
72  " settings address[%d] to %s:%d\n", i,
73  addresses_[i].get_host_addr (), addresses_[i].get_port_number ());
74  }
75 
76  // open the broadcast socket to any port for sending
77  if (socket_.open (ACE_Addr::sap_any) == -1)
78  {
80  "BroadcastTransport::constructor" \
81  "socket failed to open\n");
82  }
83  else
84  {
85  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
86  int rcv_buff_size = 0;
87  int opt_len = sizeof (int);
88 
89  socket_.get_option (SOL_SOCKET, SO_SNDBUF,
90  (void *)&send_buff_size, &opt_len);
91 
92  socket_.get_option (SOL_SOCKET, SO_RCVBUF,
93  (void *)&rcv_buff_size, &opt_len);
94 
96  "BroadcastTransport::constructor" \
97  " default socket buff size is send=%d, rcv=%d\n",
98  send_buff_size, rcv_buff_size);
99 
100  if (send_buff_size < tar_buff_size)
101  {
103  "BroadcastTransport::constructor" \
104  " setting send buff size to settings.queue_length (%d)\n",
105  tar_buff_size);
106 
107  socket_.set_option (SOL_SOCKET, SO_SNDBUF,
108  (void *)&tar_buff_size, opt_len);
109 
110  socket_.get_option (SOL_SOCKET, SO_SNDBUF,
111  (void *)&send_buff_size, &opt_len);
112 
114  "BroadcastTransport::constructor" \
115  " current socket buff size is send=%d, rcv=%d\n",
116  send_buff_size, rcv_buff_size);
117  }
118 
119  if (rcv_buff_size < tar_buff_size)
120  {
122  "BroadcastTransport::constructor" \
123  " setting rcv buff size to settings.queue_length (%d)\n",
124  tar_buff_size);
125 
126  socket_.set_option (SOL_SOCKET, SO_RCVBUF,
127  (void *)&tar_buff_size, opt_len);
128 
129  socket_.get_option (SOL_SOCKET, SO_RCVBUF,
130  (void *)&rcv_buff_size, &opt_len);
131 
133  "BroadcastTransport::constructor" \
134  " current socket buff size is send=%d, rcv=%d\n",
135  send_buff_size, rcv_buff_size);
136  }
137 
138  if (!settings_.no_receiving)
139  {
140  // start the read threads
141  double hertz = settings_.read_thread_hertz;
142  if (hertz < 0.0)
143  {
144  hertz = 0.0;
145  }
146 
148  "BroadcastTransport::constructor:" \
149  " starting %d threads at %f hertz\n", settings_.read_threads,
150  hertz);
151 
152  for (uint32_t i = 0; i < settings_.read_threads; ++i)
153  {
154  std::stringstream thread_name;
155  thread_name << "read";
156  thread_name << i;
157 
158  read_threads_.run (hertz, thread_name.str (),
162  }
163  }
164  }
165 
166  }
167  return this->validate_transport ();
168 }
169 
170 long
172  const madara::knowledge::KnowledgeRecords & orig_updates)
173 {
174  const char * print_prefix = "BroadcastTransport::send_data";
175  long result (0);
176 
177  if (!settings_.no_sending)
178  {
179  result = prep_send (orig_updates, print_prefix);
180 
181  if (addresses_.size () > 0 && result > 0)
182  {
183  uint64_t bytes_sent = 0;
184  uint64_t packet_size = (uint64_t)result;
185 
186  if (packet_size > settings_.max_fragment_size)
187  {
188  FragmentMap map;
189 
191  "%s:" \
192  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
193  print_prefix, packet_size, settings_.max_fragment_size);
194 
195  // fragment the message
197 
198  int j = 0;
199  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i, ++j)
200  {
202  "%s:" \
203  " Sending fragment %d\n",
204  print_prefix, j);
205 
206  int send_attempts = -1;
207  ssize_t actual_sent = -1;
208 
209  while (actual_sent < 0 &&
210  (settings_.resend_attempts < 0 ||
211  send_attempts < settings_.resend_attempts))
212  {
213 
214  // send the fragment
215  actual_sent = socket_.send (
216  i->second,
217  (ssize_t)MessageHeader::get_size (i->second),
218  addresses_[0]);
219 
220  ++send_attempts;
221 
222  if (actual_sent > 0)
223  {
225  "%s:" \
226  " Sent %d byte fragment\n",
227  print_prefix, (int)actual_sent);
228 
229  bytes_sent += actual_sent;
230  }
231  else
232  {
234  "%s:" \
235  " Send fragment failed, socket busy\n", print_prefix);
236  }
237  }
238 
239  // sleep between fragments, if such a slack time is specified
240  if (settings_.slack_time > 0)
242  }
243 
244  if (bytes_sent > 0)
245  {
246  send_monitor_.add ((uint32_t)bytes_sent);
247  }
248 
250  "%s:" \
251  " Sent fragments totalling %" PRIu64 " bytes\n",
252  print_prefix, bytes_sent);
253 
254  delete_fragments (map);
255  }
256  else
257  {
259  "%s:" \
260  " Sending packet of size %ld\n",
261  print_prefix, result);
262 
263  int send_attempts = -1;
264  ssize_t actual_sent = -1;
265 
266  while (actual_sent < 0 &&
267  (settings_.resend_attempts < 0 ||
268  send_attempts < settings_.resend_attempts))
269  {
270 
271  // send the fragment
272  actual_sent = socket_.send (
273  buffer_.get_ptr (), (ssize_t)result, addresses_[0]);
274 
275  ++send_attempts;
276 
277  if (actual_sent > 0)
278  {
279  bytes_sent = (uint64_t)actual_sent;
280 
282  "%s:" \
283  " Sent packet of size %" PRIu64 "\n",
284  print_prefix, bytes_sent);
285 
286  send_monitor_.add ((uint32_t)actual_sent);
287  }
288  else if (actual_sent == ECONNRESET)
289  {
291  "%s:" \
292  " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
293  print_prefix);
294  }
295  else if (actual_sent == EINTR)
296  {
298  "%s:" \
299  " Local socket was interrupted during send (EINTR)\n",
300  print_prefix);
301  }
302  else if (actual_sent == EWOULDBLOCK)
303  {
305  "%s:" \
306  " Send would have blocked (EWOULDBLOCK)\n",
307  print_prefix);
308  }
309  else if (actual_sent == ENOTCONN)
310  {
312  "%s:" \
313  " Send reports socket is not connected (ENOTCONN)\n",
314  print_prefix);
315  }
316  else if (actual_sent == EADDRINUSE)
317  {
319  "%s:" \
320  " Send reports the interface is busy (EADDRINUSE)\n",
321  print_prefix);
322  }
323  else if (actual_sent == EBADF)
324  {
326  "%s:" \
327  " Send socket is invalid (EBADF)\n",
328  print_prefix);
329  }
330  else
331  {
333  "%s:" \
334  " Packet was not sent due to unknown error (%d)\n",
335  print_prefix, (int)actual_sent);
336  }
337  }
338  }
339 
341  "%s:" \
342  " Send bandwidth = %" PRIu64 " B/s\n",
343  print_prefix, send_monitor_.get_bytes_per_second ());
344 
345  result = (long) bytes_sent;
346  }
347  }
348 
349  return result;
350 }
Thread for reading knowledge updates through a Multicast datagram socket.
QoSTransportSettings settings_
Definition: Transport.h:188
void close(void)
Closes the transport.
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
int setup(void)
Initializes the transport.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
int reliability(void) const
Accesses reliability setting.
threads::Threader read_threads_
threads for reading knowledge updates
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
ACE_SOCK_Dgram_Bcast socket_
underlying socket for sending
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
BroadcastTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
std::vector< ACE_INET_Addr > addresses_
holds all multicast addresses we are sending to
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.