1 #ifndef _MADARA_TRANSPORT_SETTINGS_H_ 2 #define _MADARA_TRANSPORT_SETTINGS_H_ 21 #include "ace/Thread_Mutex.h" 22 #include "ace/Recursive_Thread_Mutex.h" 23 #include "ace/Condition_T.h" 24 #include "ace/Guard_T.h" 26 #include "ace/High_Res_Timer.h" 30 #include "madara/cid/CIDTransportSettings.h" 31 #include "madara/cid/CIDConvenience.h" 32 #include "madara/cid/CIDHeuristic.h" 33 #include "madara/cid/CIDGenetic.h" 38 #include "madara/LockType.h" 43 #include "madara/MADARA_export.h" 53 typedef ACE_High_Res_Timer
Timer;
57 typedef madara::Cid::TransportSettings LatencyTransportSettings;
102 #define DEFAULT_DOMAIN "KaRL" 104 typedef std::vector <std::string>
Voters;
108 static const uint32_t DEFAULT_QUEUE_LENGTH = 500000;
111 static const uint32_t DEFAULT_DEADLINE = 0;
117 static const uint32_t DEFAULT_RELIABILITY =
RELIABLE;
127 #define DEFAULT_PROCESSES 1 132 #define MAXIMUM_RESEND_ATTEMPTS 10 139 #define DEFAULT_LATENCY_ENABLED false 144 #define DEFAULT_LATENCY_TIMEOUT 10.0 149 #define DEFAULT_LATENCY 200000000000 154 #define DEFAULT_REDEPLOYMENT_PERCENTAGE 0.10 159 TransportSettings ();
162 TransportSettings (
const TransportSettings & settings);
164 virtual ~TransportSettings ();
167 void operator= (
const TransportSettings & settings);
178 void clear_read_domains (
void);
184 void get_read_domains (std::vector<std::string> & domains)
const;
191 bool is_reading_domain (
const std::string domain)
const;
197 size_t num_read_domains (
void)
const;
203 inline void reset_timers (
void)
205 ContextGuard guard (mutex);
207 for (uint32_t i = 0; i < processes; ++i)
209 madara::Cid::reset_latencies (latencies, latency_default);
215 inline void start_all_timers (
void)
217 ContextGuard guard (mutex);
221 for (
unsigned int i = 0; i < processes; ++i)
228 inline void stop_timer (
int index,
bool roundtrip =
true)
230 ACE_hrtime_t measured;
231 ContextGuard guard (mutex);
233 timers[index].stop ();
234 timers[index].elapsed_time (measured);
236 latencies.network_latencies[id][index].second = roundtrip ?
237 (uint64_t) measured / 2 : (uint64_t) measured;
244 inline
void setup (
void)
246 ContextGuard guard (mutex);
250 madara::Cid::init (processes, latencies);
251 timers.resize (processes);
253 madara::Cid::reset_latencies (latencies, latency_default);
261 inline void setup (madara::Cid::AlgorithmConfigs & configs)
263 ContextGuard guard (mutex);
265 latencies.algorithm_configs = configs;
266 latencies.results.resize (configs.size ());
272 void print_my_latencies (std::ostream & output)
276 std::stringstream buffer;
279 madara::Cid::Identifiers & ids = latencies.ids;
280 madara::Cid::LatencyVector & current = latencies.network_latencies[id];
282 buffer <<
"Latencies for id = " <<
id << std::endl;
285 for (uint32_t i = 0; i < processes; ++i)
287 buffer << ids[i] <<
" = " << current[i].second << std::endl;
292 output << buffer.str ();
298 void print_all_latencies (std::ostream & output)
302 std::stringstream buffer;
305 madara::Cid::Identifiers & ids = latencies.ids;
307 buffer <<
"\nAll latencies in the context:\n\n";
310 for (uint32_t i = 0; i < processes; ++i)
312 madara::Cid::LatencyVector & current = latencies.network_latencies[i];
313 for (uint32_t j = 0; j < processes; ++j)
315 buffer << ids[i] <<
" to " << ids[j] <<
316 " = " << current[j].second << std::endl;
323 output << buffer.str ();
329 void print_all_summations (std::ostream & output)
333 std::stringstream buffer;
336 madara::Cid::Identifiers & ids = latencies.ids;
337 madara::Cid::SummationsMap & summation_map =
338 latencies.network_summations;
340 buffer <<
"\nAll summations in the context:\n\n";
343 for (madara::Cid::SummationsMap::iterator i = summation_map.begin ();
344 i != summation_map.end (); ++i)
346 buffer <<
"Degree = " << i->first << std::endl;
347 madara::Cid::LatencyVector & current = i->second;
351 std::sort (current.begin (), current.end (),
352 madara::Cid::IncreasingLatency);
354 for (madara::Cid::LatencyVector::size_type j = 0;
355 j < current.size (); ++j)
357 buffer <<
" " << current[j].first <<
" = " <<
358 current[j].second <<
"\n";
365 output << buffer.str ();
371 void print_all_results (std::ostream & output)
375 std::stringstream buffer;
378 madara::Cid::Identifiers & ids = latencies.ids;
379 madara::Cid::AlgorithmConfigs & configs = latencies.algorithm_configs;
380 madara::Cid::AlgorithmResults & results = latencies.results;
382 buffer <<
"\nAll redeployment algorithm results in the context:\n\n";
385 for (madara::Cid::AlgorithmConfigs::size_type i = 0;
386 i < configs.size (); ++i)
388 if ( results[i].algorithm == madara::Cid::CID)
390 else if (results[i].algorithm == madara::Cid::BCID)
392 else if (results[i].algorithm == madara::Cid::BCID_GGA)
393 buffer <<
"BCID-GGA (" << configs[i].time <<
"),";
394 else if (results[i].algorithm == madara::Cid::BCID_BGA)
395 buffer <<
"BCID-BGA (" << configs[i].time <<
"),";
396 else if (results[i].algorithm == madara::Cid::CID_BGA)
397 buffer <<
"CID-BGA (" << configs[i].time <<
"),";
398 else if (results[i].algorithm == madara::Cid::CID_GGA)
399 buffer <<
"CID-GGA (" << configs[i].time <<
"),";
401 buffer << results[i].latency <<
"\n ";
402 buffer << results[i].deployment <<
"\n";
410 output << buffer.str ();
419 std::stringstream buffer;
420 ContextGuard guard (mutex);
421 madara::Cid::LatencyVector & current = latencies.network_latencies[id];
424 for (uint32_t i = 0; i < processes; ++i)
426 buffer << i <<
"=" << current[i].second <<
";";
429 return buffer.str ();
437 void unaggregate_latencies (uint32_t source,
440 std::stringstream stream (aggregation);
441 ContextGuard guard (mutex);
442 madara::Cid::LatencyVector & current = latencies.network_latencies[source];
450 for (
unsigned int i = 0; !stream.eof (); ++i)
452 stream >> key >> symbol >> value >> symbol;
457 current[i].first = key;
458 current[i].second = value;
468 ContextGuard guard (mutex);
470 return madara::Cid::prepare_summations (
id, latencies);
478 void unpack_summations (uint32_t source,
481 std::stringstream stream (summations);
482 ContextGuard guard (mutex);
483 madara::Cid::LatencyVector & current =
484 latencies.network_latencies[source];
485 madara::Cid::SummationsMap & summations_map =
486 latencies.network_summations;
494 while (!stream.eof ())
496 stream >> key >> symbol >> value >> symbol;
498 madara::Cid::LatencyVector & cur_summation = summations_map[key];
500 if (cur_summation.size () == processes)
503 std::sort (cur_summation.begin (), cur_summation.end (),
504 madara::Cid::IncreasingId);
511 cur_summation.resize (processes);
512 for (
unsigned int i = 0; i < processes; ++i)
513 cur_summation[i].first = i;
517 if (source < processes && cur_summation[source].second != value)
519 cur_summation[source].second = value;
530 ContextGuard guard (mutex);
531 latencies.network_summations.clear ();
532 madara::Cid::read_deployment (latencies,
539 ContextGuard guard (mutex);
543 std::vector <std::string> tokens;
544 std::vector <std::string> splitters;
545 std::vector <std::string> pivot_list;
553 for (std::vector <std::string>::size_type i = 0;
554 i + 1 < tokens.size (); i += 2)
556 std::stringstream buffer (tokens[i]);
559 latencies.ids[id] = tokens[i + 1];
567 void run_cid (
unsigned int index)
569 ContextGuard guard (mutex);
571 latencies.solution_lookup.clear ();
572 madara::Cid::reset_solution (latencies);
574 madara::Cid::approximate (latencies);
576 latencies.results[index].algorithm = madara::Cid::CID;
577 latencies.results[index].deployment = madara::Cid::stringify_solution (latencies);
578 latencies.results[index].latency =
579 madara::Cid::calculate_latency (latencies);
586 void run_bcid (
unsigned int index)
588 ContextGuard guard (mutex);
590 latencies.solution_lookup.clear ();
591 madara::Cid::reset_solution (latencies);
593 madara::Cid::fill_by_highest_degree (latencies);
595 latencies.results[index].algorithm = madara::Cid::BCID;
596 latencies.results[index].deployment =
597 madara::Cid::stringify_solution (latencies);
598 latencies.results[index].latency =
599 madara::Cid::calculate_latency (latencies);
606 void run_bcid_bga (
unsigned int index)
608 ContextGuard guard (mutex);
610 latencies.solution_lookup.clear ();
611 madara::Cid::reset_solution (latencies);
613 madara::Cid::fill_by_highest_degree (latencies);
614 madara::Cid::ga_naive (latencies,
615 latencies.algorithm_configs[index].time);
617 latencies.results[index].algorithm = madara::Cid::BCID_BGA;
618 latencies.results[index].deployment =
619 madara::Cid::stringify_solution (latencies);
620 latencies.results[index].latency =
621 madara::Cid::calculate_latency (latencies);
628 void run_bcid_gga (
unsigned int index)
630 ContextGuard guard (mutex);
632 latencies.solution_lookup.clear ();
633 madara::Cid::reset_solution (latencies);
635 madara::Cid::fill_by_highest_degree (latencies);
636 madara::Cid::ga_degree (latencies,
637 latencies.algorithm_configs[index].time);
639 latencies.results[index].algorithm = madara::Cid::BCID_GGA;
640 latencies.results[index].deployment =
641 madara::Cid::stringify_solution (latencies);
642 latencies.results[index].latency =
643 madara::Cid::calculate_latency (latencies);
650 void run_cid_bga (
unsigned int index)
652 ContextGuard guard (mutex);
654 latencies.solution_lookup.clear ();
655 madara::Cid::reset_solution (latencies);
657 madara::Cid::approximate (latencies);
658 madara::Cid::ga_naive(latencies,
659 latencies.algorithm_configs[index].time);
661 latencies.results[index].algorithm = madara::Cid::CID_BGA;
662 latencies.results[index].deployment =
663 madara::Cid::stringify_solution (latencies);
664 latencies.results[index].latency =
665 madara::Cid::calculate_latency (latencies);
672 void run_cid_gga (
unsigned int index)
674 ContextGuard guard (mutex);
676 latencies.solution_lookup.clear ();
677 madara::Cid::reset_solution (latencies);
679 madara::Cid::approximate (latencies);
680 madara::Cid::ga_degree(latencies,
681 latencies.algorithm_configs[index].time);
683 latencies.results[index].algorithm = madara::Cid::CID_GGA;
684 latencies.results[index].deployment =
685 madara::Cid::stringify_solution (latencies);
686 latencies.results[index].latency =
687 madara::Cid::calculate_latency (latencies);
694 void run (
unsigned int index)
696 int algorithm = latencies.algorithm_configs[index].algorithm;
697 if ( algorithm == madara::Cid::CID)
699 else if (algorithm == madara::Cid::BCID)
701 else if (algorithm == madara::Cid::BCID_GGA)
702 run_bcid_gga (index);
703 else if (algorithm == madara::Cid::BCID_BGA)
704 run_bcid_bga (index);
705 else if (algorithm == madara::Cid::CID_BGA)
707 else if (algorithm == madara::Cid::CID_GGA)
716 for (
unsigned int i = 0; i < latencies.algorithm_configs.size (); ++i)
719 std::sort (latencies.results.begin (), latencies.results.end (),
720 madara::Cid::IncreasingAlgorithmLatency);
738 virtual void load_text (
const std::string & filename,
754 virtual void save_text (
const std::string & filename,
826 bool latency_enabled;
830 double latency_timeout;
834 uint64_t latency_default;
837 MADARA_LOCK_TYPE mutex;
840 LatencyTransportSettings latencies;
846 uint32_t num_responses;
849 uint32_t num_summations;
855 uint32_t num_votes_received;
858 double redeployment_percentage_allowed;
895 switch (settings.
type)
901 name =
"PrismTech DDS";
907 name =
"TCP (unsupported)";
910 name =
"UDP Unicast";
913 name =
"UDP Multicast";
916 name =
"UDP Broadcast";
919 name =
"UDP P2P Registry Server";
922 name =
"UDP P2P Registry Client";
925 name =
"ZeroMQ Pub/Sub";
936 #endif // _MADARA_TRANSPORT_SETTINGS_H_
std::vector< Timer > Timers
OriginatorFragmentMap fragment_map
map of fragments received by originator
ACE_Guard< MADARA_LOCK_TYPE > ContextGuard
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
uint32_t type
Type of transport. See madara::transport::Types for options.
uint32_t id
the id of this process.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
uint32_t processes
number of processes expected in the network (best to overestimate if building latency tables ...
bool no_receiving
if true, never receive over transport
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
std::string write_domain
All class members are accessible to users for easy setup.
bool no_sending
if true, never send over transport
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
std::vector< std::string > Voters
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
MADARA_Export void tokenizer(const std::string &input, const ::std::vector< std::string > &splitters,::std::vector< std::string > &tokens,::std::vector< std::string > &pivot_list)
Splits an input string into tokens.
Base class from which all transports must be derived.
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
std::string type_name(const TransportSettings &settings)
int resend_attempts
Maximum number of attempts to resend if transport is busy.
bool delay_launch
delay launching transports
MADARA_Export std::string clean_dir_name(const std::string &target)
Substitutes the appropriate directory delimiter, which may help with portability between operating sy...