MADARA  3.1.8
TransportSettings.h
Go to the documentation of this file.
1 #ifndef _MADARA_TRANSPORT_SETTINGS_H_
2 #define _MADARA_TRANSPORT_SETTINGS_H_
3 
16 #include <string>
17 #include <sstream>
18 #include <vector>
19 #include <map>
20 #include <ostream>
21 #include "ace/Thread_Mutex.h"
22 #include "ace/Recursive_Thread_Mutex.h"
23 #include "ace/Condition_T.h"
24 #include "ace/Guard_T.h"
25 
26 #include "ace/High_Res_Timer.h"
27 
28 #ifdef _USE_CID_
29 
30 #include "madara/cid/CIDTransportSettings.h"
31 #include "madara/cid/CIDConvenience.h"
32 #include "madara/cid/CIDHeuristic.h"
33 #include "madara/cid/CIDGenetic.h"
34 #include "madara/utility/Utility.h"
35 
36 #endif // _USE_CID_
37 
38 #include "madara/LockType.h"
43 #include "madara/MADARA_export.h"
45 
46 namespace madara
47 {
48  namespace transport
49  {
50  // forward declare for friending
51  class Base;
52 
53  typedef ACE_High_Res_Timer Timer;
54  typedef std::vector<Timer> Timers;
55 
56 #ifdef _USE_CID_
57  typedef madara::Cid::TransportSettings LatencyTransportSettings;
58 #endif
59 
60  enum Types {
62  SPLICE = 1,
63  NDDS = 2,
64  TCP = 3,
65  UDP = 4,
66  MULTICAST = 5,
67  BROADCAST = 6,
70  ZMQ = 9
71  };
72 
76  };
77 
78  enum Messages {
79  ASSIGN = 0,
80  OPERATION = 1,
82  REGISTER = 3,
83  LATENCY = 10,
86  VOTE = 20
87  };
88 
92  class MADARA_Export TransportSettings
93  {
94  public:
95  // allow transport::Base to alter private/protected members
96  friend class Base;
97 
98  // for ease-of-use, typedef the templated guard
99  typedef ACE_Guard <MADARA_LOCK_TYPE> ContextGuard;
100 
102  #define DEFAULT_DOMAIN "KaRL"
103 
104  typedef std::vector <std::string> Voters;
105 
108  static const uint32_t DEFAULT_QUEUE_LENGTH = 500000;
109 
111  static const uint32_t DEFAULT_DEADLINE = 0;
112 
114  static const uint32_t DEFAULT_TRANSPORT = NO_TRANSPORT;
115 
117  static const uint32_t DEFAULT_RELIABILITY = RELIABLE;
118 
122  #define DEFAULT_ID 0
123 
127  #define DEFAULT_PROCESSES 1
128 
132  #define MAXIMUM_RESEND_ATTEMPTS 10
133 
134 #ifdef _USE_CID_
135 
139  #define DEFAULT_LATENCY_ENABLED false
140 
144  #define DEFAULT_LATENCY_TIMEOUT 10.0
145 
149  #define DEFAULT_LATENCY 200000000000
150 
154  #define DEFAULT_REDEPLOYMENT_PERCENTAGE 0.10
155 
156 #endif // _USE_CID_
157 
159  TransportSettings ();
160 
162  TransportSettings (const TransportSettings & settings);
163 
164  virtual ~TransportSettings ();
165 
167  void operator= (const TransportSettings & settings);
168 
173  void add_read_domain (const std::string domain);
174 
178  void clear_read_domains (void);
179 
184  void get_read_domains (std::vector<std::string> & domains) const;
185 
191  bool is_reading_domain (const std::string domain) const;
192 
197  size_t num_read_domains (void) const;
198 
199 #ifdef _USE_CID_
200 
203  inline void reset_timers (void)
204  {
205  ContextGuard guard (mutex);
206 
207  for (uint32_t i = 0; i < processes; ++i)
208  timers[i].reset ();
209  madara::Cid::reset_latencies (latencies, latency_default);
210  }
211 
215  inline void start_all_timers (void)
216  {
217  ContextGuard guard (mutex);
218 
219  reset_timers ();
220 
221  for (unsigned int i = 0; i < processes; ++i)
222  timers[i].start ();
223  }
224 
228  inline void stop_timer (int index, bool roundtrip = true)
229  {
230  ACE_hrtime_t measured;
231  ContextGuard guard (mutex);
232 
233  timers[index].stop ();
234  timers[index].elapsed_time (measured);
235 
236  latencies.network_latencies[id][index].second = roundtrip ?
237  (uint64_t) measured / 2 : (uint64_t) measured;
238  }
239 
244  inline void setup (void)
245  {
246  ContextGuard guard (mutex);
247 
248  if (latency_enabled)
249  {
250  madara::Cid::init (processes, latencies);
251  timers.resize (processes);
252 
253  madara::Cid::reset_latencies (latencies, latency_default);
254  }
255  }
256 
261  inline void setup (madara::Cid::AlgorithmConfigs & configs)
262  {
263  ContextGuard guard (mutex);
264 
265  latencies.algorithm_configs = configs;
266  latencies.results.resize (configs.size ());
267  }
268 
272  void print_my_latencies (std::ostream & output)
273  {
274  // we do not use a guard here because we want to do I/O operations
275  // outside of the mutex.
276  std::stringstream buffer;
277 
278  mutex.acquire ();
279  madara::Cid::Identifiers & ids = latencies.ids;
280  madara::Cid::LatencyVector & current = latencies.network_latencies[id];
281 
282  buffer << "Latencies for id = " << id << std::endl;
283 
284  // print each id -> latency
285  for (uint32_t i = 0; i < processes; ++i)
286  {
287  buffer << ids[i] << " = " << current[i].second << std::endl;
288  }
289 
290  mutex.release ();
291 
292  output << buffer.str ();
293  }
294 
298  void print_all_latencies (std::ostream & output)
299  {
300  // we do not use a guard here because we want to do I/O operations
301  // outside of the mutex.
302  std::stringstream buffer;
303 
304  mutex.acquire ();
305  madara::Cid::Identifiers & ids = latencies.ids;
306 
307  buffer << "\nAll latencies in the context:\n\n";
308 
309  // print each id -> latency
310  for (uint32_t i = 0; i < processes; ++i)
311  {
312  madara::Cid::LatencyVector & current = latencies.network_latencies[i];
313  for (uint32_t j = 0; j < processes; ++j)
314  {
315  buffer << ids[i] << " to " << ids[j] <<
316  " = " << current[j].second << std::endl;
317  }
318  }
319 
320  mutex.release ();
321 
322  // print the buffer
323  output << buffer.str ();
324  }
325 
329  void print_all_summations (std::ostream & output)
330  {
331  // we do not use a guard here because we want to do I/O operations
332  // outside of the mutex.
333  std::stringstream buffer;
334 
335  mutex.acquire ();
336  madara::Cid::Identifiers & ids = latencies.ids;
337  madara::Cid::SummationsMap & summation_map =
338  latencies.network_summations;
339 
340  buffer << "\nAll summations in the context:\n\n";
341 
342  // print each id -> latency
343  for (madara::Cid::SummationsMap::iterator i = summation_map.begin ();
344  i != summation_map.end (); ++i)
345  {
346  buffer << "Degree = " << i->first << std::endl;
347  madara::Cid::LatencyVector & current = i->second;
348 
349  // temporary, since we can't figure out why the summations aren't
350  // sorting in the read thread.
351  std::sort (current.begin (), current.end (),
352  madara::Cid::IncreasingLatency);
353 
354  for (madara::Cid::LatencyVector::size_type j = 0;
355  j < current.size (); ++j)
356  {
357  buffer << " " << current[j].first << " = " <<
358  current[j].second << "\n";
359  }
360  }
361 
362  mutex.release ();
363 
364  // print the buffer
365  output << buffer.str ();
366  }
367 
371  void print_all_results (std::ostream & output)
372  {
373  // we do not use a guard here because we want to do I/O operations
374  // outside of the mutex.
375  std::stringstream buffer;
376 
377  mutex.acquire ();
378  madara::Cid::Identifiers & ids = latencies.ids;
379  madara::Cid::AlgorithmConfigs & configs = latencies.algorithm_configs;
380  madara::Cid::AlgorithmResults & results = latencies.results;
381 
382  buffer << "\nAll redeployment algorithm results in the context:\n\n";
383 
384  // print each id -> latency
385  for (madara::Cid::AlgorithmConfigs::size_type i = 0;
386  i < configs.size (); ++i)
387  {
388  if ( results[i].algorithm == madara::Cid::CID)
389  buffer << "CID,";
390  else if (results[i].algorithm == madara::Cid::BCID)
391  buffer << "BCID,";
392  else if (results[i].algorithm == madara::Cid::BCID_GGA)
393  buffer << "BCID-GGA (" << configs[i].time << "),";
394  else if (results[i].algorithm == madara::Cid::BCID_BGA)
395  buffer << "BCID-BGA (" << configs[i].time << "),";
396  else if (results[i].algorithm == madara::Cid::CID_BGA)
397  buffer << "CID-BGA (" << configs[i].time << "),";
398  else if (results[i].algorithm == madara::Cid::CID_GGA)
399  buffer << "CID-GGA (" << configs[i].time << "),";
400 
401  buffer << results[i].latency << "\n ";
402  buffer << results[i].deployment << "\n";
403  }
404 
405  mutex.release ();
406 
407  buffer << "\n";
408 
409  // print the buffer
410  output << buffer.str ();
411  }
412 
417  std::string aggregate_latencies (void)
418  {
419  std::stringstream buffer;
420  ContextGuard guard (mutex);
421  madara::Cid::LatencyVector & current = latencies.network_latencies[id];
422 
423  // print each id -> latency
424  for (uint32_t i = 0; i < processes; ++i)
425  {
426  buffer << i << "=" << current[i].second << ";";
427  }
428 
429  return buffer.str ();
430  }
431 
437  void unaggregate_latencies (uint32_t source,
438  const std::string & aggregation)
439  {
440  std::stringstream stream (aggregation);
441  ContextGuard guard (mutex);
442  madara::Cid::LatencyVector & current = latencies.network_latencies[source];
443 
444  // key symbol value symbol
445  // 0 = 15 or 24 = 13847169741, for instance
446  char symbol;
447  unsigned int key;
448  uint64_t value;
449 
450  for (unsigned int i = 0; !stream.eof (); ++i)
451  {
452  stream >> key >> symbol >> value >> symbol;
453 
454  // make a quick check to see if these values are indeed useful
455  if (i < processes)
456  {
457  current[i].first = key;
458  current[i].second = value;
459  }
460  }
461  }
462 
466  std::string pack_summations (void)
467  {
468  ContextGuard guard (mutex);
469 
470  return madara::Cid::prepare_summations (id, latencies);
471  }
472 
478  void unpack_summations (uint32_t source,
479  const std::string & summations)
480  {
481  std::stringstream stream (summations);
482  ContextGuard guard (mutex);
483  madara::Cid::LatencyVector & current =
484  latencies.network_latencies[source];
485  madara::Cid::SummationsMap & summations_map =
486  latencies.network_summations;
487 
488  // key symbol value symbol
489  // 0 = 15 or 24 = 13847169741, for instance
490  char symbol;
491  unsigned int key;
492  uint64_t value;
493 
494  while (!stream.eof ())
495  {
496  stream >> key >> symbol >> value >> symbol;
497 
498  madara::Cid::LatencyVector & cur_summation = summations_map[key];
499 
500  if (cur_summation.size () == processes)
501  {
502  // if the array had already been created, sort by Id
503  std::sort (cur_summation.begin (), cur_summation.end (),
504  madara::Cid::IncreasingId);
505  }
506  else
507  {
508  // if the size of this entry is 0, create a new processes-length
509  // summation for this degree, and set each element to the
510  // corresponding id of that process (essentially sorting by Id)
511  cur_summation.resize (processes);
512  for (unsigned int i = 0; i < processes; ++i)
513  cur_summation[i].first = i;
514  }
515 
516  // make a quick check to see if these values are indeed useful
517  if (source < processes && cur_summation[source].second != value)
518  {
519  cur_summation[source].second = value;
520  }
521  }
522  }
523 
528  void read_dataflow (const std::string & filename)
529  {
530  ContextGuard guard (mutex);
531  latencies.network_summations.clear ();
532  madara::Cid::read_deployment (latencies,
534  }
535 
536  void read_solution (const std::string & source,
537  const std::string & my_host_port)
538  {
539  ContextGuard guard (mutex);
540  unsigned int id;
541  std::string host_port;
542 
543  std::vector <std::string> tokens;
544  std::vector <std::string> splitters;
545  std::vector <std::string> pivot_list;
546 
547  tokens.resize (2);
548  tokens[0] = ";";
549  tokens[1] = "=";
550 
551  madara::utility::tokenizer (source, splitters, tokens, pivot_list);
552 
553  for (std::vector <std::string>::size_type i = 0;
554  i + 1 < tokens.size (); i += 2)
555  {
556  std::stringstream buffer (tokens[i]);
557  buffer >> id;
558 
559  latencies.ids[id] = tokens[i + 1];
560  }
561  }
562 
567  void run_cid (unsigned int index)
568  {
569  ContextGuard guard (mutex);
570 
571  latencies.solution_lookup.clear ();
572  madara::Cid::reset_solution (latencies);
573 
574  madara::Cid::approximate (latencies);
575 
576  latencies.results[index].algorithm = madara::Cid::CID;
577  latencies.results[index].deployment = madara::Cid::stringify_solution (latencies);
578  latencies.results[index].latency =
579  madara::Cid::calculate_latency (latencies);
580  }
581 
586  void run_bcid (unsigned int index)
587  {
588  ContextGuard guard (mutex);
589 
590  latencies.solution_lookup.clear ();
591  madara::Cid::reset_solution (latencies);
592 
593  madara::Cid::fill_by_highest_degree (latencies);
594 
595  latencies.results[index].algorithm = madara::Cid::BCID;
596  latencies.results[index].deployment =
597  madara::Cid::stringify_solution (latencies);
598  latencies.results[index].latency =
599  madara::Cid::calculate_latency (latencies);
600  }
601 
606  void run_bcid_bga (unsigned int index)
607  {
608  ContextGuard guard (mutex);
609 
610  latencies.solution_lookup.clear ();
611  madara::Cid::reset_solution (latencies);
612 
613  madara::Cid::fill_by_highest_degree (latencies);
614  madara::Cid::ga_naive (latencies,
615  latencies.algorithm_configs[index].time);
616 
617  latencies.results[index].algorithm = madara::Cid::BCID_BGA;
618  latencies.results[index].deployment =
619  madara::Cid::stringify_solution (latencies);
620  latencies.results[index].latency =
621  madara::Cid::calculate_latency (latencies);
622  }
623 
628  void run_bcid_gga (unsigned int index)
629  {
630  ContextGuard guard (mutex);
631 
632  latencies.solution_lookup.clear ();
633  madara::Cid::reset_solution (latencies);
634 
635  madara::Cid::fill_by_highest_degree (latencies);
636  madara::Cid::ga_degree (latencies,
637  latencies.algorithm_configs[index].time);
638 
639  latencies.results[index].algorithm = madara::Cid::BCID_GGA;
640  latencies.results[index].deployment =
641  madara::Cid::stringify_solution (latencies);
642  latencies.results[index].latency =
643  madara::Cid::calculate_latency (latencies);
644  }
645 
650  void run_cid_bga (unsigned int index)
651  {
652  ContextGuard guard (mutex);
653 
654  latencies.solution_lookup.clear ();
655  madara::Cid::reset_solution (latencies);
656 
657  madara::Cid::approximate (latencies);
658  madara::Cid::ga_naive(latencies,
659  latencies.algorithm_configs[index].time);
660 
661  latencies.results[index].algorithm = madara::Cid::CID_BGA;
662  latencies.results[index].deployment =
663  madara::Cid::stringify_solution (latencies);
664  latencies.results[index].latency =
665  madara::Cid::calculate_latency (latencies);
666  }
667 
672  void run_cid_gga (unsigned int index)
673  {
674  ContextGuard guard (mutex);
675 
676  latencies.solution_lookup.clear ();
677  madara::Cid::reset_solution (latencies);
678 
679  madara::Cid::approximate (latencies);
680  madara::Cid::ga_degree(latencies,
681  latencies.algorithm_configs[index].time);
682 
683  latencies.results[index].algorithm = madara::Cid::CID_GGA;
684  latencies.results[index].deployment =
685  madara::Cid::stringify_solution (latencies);
686  latencies.results[index].latency =
687  madara::Cid::calculate_latency (latencies);
688  }
689 
694  void run (unsigned int index)
695  {
696  int algorithm = latencies.algorithm_configs[index].algorithm;
697  if ( algorithm == madara::Cid::CID)
698  run_cid (index);
699  else if (algorithm == madara::Cid::BCID)
700  run_bcid (index);
701  else if (algorithm == madara::Cid::BCID_GGA)
702  run_bcid_gga (index);
703  else if (algorithm == madara::Cid::BCID_BGA)
704  run_bcid_bga (index);
705  else if (algorithm == madara::Cid::CID_BGA)
706  run_cid_bga (index);
707  else if (algorithm == madara::Cid::CID_GGA)
708  run_cid_gga (index);
709  }
710 
714  void run_all (void)
715  {
716  for (unsigned int i = 0; i < latencies.algorithm_configs.size (); ++i)
717  run (i);
718 
719  std::sort (latencies.results.begin (), latencies.results.end (),
720  madara::Cid::IncreasingAlgorithmLatency);
721  }
722 
723 #endif // _USE_CID_
724 
730  virtual void load (const std::string & filename,
731  const std::string & prefix = "transport");
732 
738  virtual void load_text (const std::string & filename,
739  const std::string & prefix = "transport");
740 
746  virtual void save (const std::string & filename,
747  const std::string & prefix = "transport") const;
748 
754  virtual void save_text (const std::string & filename,
755  const std::string & prefix = "transport") const;
756 
758 
763 
765  uint32_t read_threads;
766 
768  uint32_t queue_length;
769 
771  uint32_t type;
772 
775 
778 
788 
791  uint32_t reliability;
792 
795  uint32_t id;
796 
799  uint32_t processes;
800 
803 
806 
809 
812 
815 
817  double slack_time;
818 
824 
825 #ifdef _USE_CID_
826  bool latency_enabled;
828 
830  double latency_timeout;
831 
834  uint64_t latency_default;
835 
837  MADARA_LOCK_TYPE mutex;
838 
840  LatencyTransportSettings latencies;
841 
843  Timers timers;
844 
846  uint32_t num_responses;
847 
849  uint32_t num_summations;
850 
852  uint32_t num_voters;
853 
855  uint32_t num_votes_received;
856 
858  double redeployment_percentage_allowed;
859 
861  Voters voters;
862 #endif // _USE_CID_
863 
870  std::vector <std::string> hosts;
871 
876 
881 
882  private:
883 
887  std::map <std::string, int> read_domains_;
888  };
889 
890  inline std::string
891  type_name (const TransportSettings & settings)
892  {
893  std::string name = "none";
894 
895  switch (settings.type)
896  {
897  case 0:
898  name = "None";
899  break;
900  case 1:
901  name = "PrismTech DDS";
902  break;
903  case 2:
904  name = "RTI DDS";
905  break;
906  case 3:
907  name = "TCP (unsupported)";
908  break;
909  case 4:
910  name = "UDP Unicast";
911  break;
912  case 5:
913  name = "UDP Multicast";
914  break;
915  case 6:
916  name = "UDP Broadcast";
917  break;
918  case 7:
919  name = "UDP P2P Registry Server";
920  break;
921  case 8:
922  name = "UDP P2P Registry Client";
923  break;
924  case 9:
925  name = "ZeroMQ Pub/Sub";
926  break;
927  }
928 
929  return name;
930  }
931  }
932 }
933 
934 #include "TransportSettings.inl"
935 
936 #endif // _MADARA_TRANSPORT_SETTINGS_H_
std::vector< Timer > Timers
OriginatorFragmentMap fragment_map
map of fragments received by originator
ACE_Guard< MADARA_LOCK_TYPE > ContextGuard
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
uint32_t type
Type of transport. See madara::transport::Types for options.
uint32_t id
the id of this process.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
uint32_t processes
number of processes expected in the network (best to overestimate if building latency tables ...
bool no_receiving
if true, never receive over transport
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
ACE_High_Res_Timer Timer
std::vector< std::string > Voters
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
MADARA_Export void tokenizer(const std::string &input, const ::std::vector< std::string > &splitters,::std::vector< std::string > &tokens,::std::vector< std::string > &pivot_list)
Splits an input string into tokens.
Definition: Utility.cpp:291
Base class from which all transports must be derived.
Definition: Transport.h:62
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
std::string type_name(const TransportSettings &settings)
int resend_attempts
Maximum number of attempts to resend if transport is busy.
bool delay_launch
delay launching transports
MADARA_Export std::string clean_dir_name(const std::string &target)
Substitutes the appropriate directory delimiter, which may help with portability between operating sy...
Definition: Utility.cpp:560