MADARA  3.1.8
TransportSettings.cpp
Go to the documentation of this file.
1 #include "TransportSettings.h"
2 #include "Fragmentation.h"
6 
9 
11  write_domain (DEFAULT_DOMAIN),
12  read_threads (1),
13  queue_length (DEFAULT_QUEUE_LENGTH),
14  type (DEFAULT_TRANSPORT),
15  max_fragment_size (62000),
16  resend_attempts (MAXIMUM_RESEND_ATTEMPTS),
17  fragment_queue_length (5),
18  reliability (DEFAULT_RELIABILITY),
19  id (DEFAULT_ID),
20  processes (DEFAULT_PROCESSES),
21  on_data_received_logic (),
22  delay_launch (false),
23  never_exit (false),
24  send_reduced_message_header (false),
25  slack_time (0),
26  read_thread_hertz (0.0),
27 
28 #ifdef _USE_CID_
29  latency_enabled (DEFAULT_LATENCY_ENABLED),
30  latency_timeout (DEFAULT_LATENCY_TIMEOUT),
31  latency_default (DEFAULT_LATENCY),
32  num_responses (0),
33  num_summations (0),
34  num_voters (0),
35  num_votes_received (0),
36  redeployment_percentage_allowed (DEFAULT_REDEPLOYMENT_PERCENTAGE),
37 #endif // _USE_CID_
38 
39  hosts (),
40  no_sending (false),
41  no_receiving (false)
42 {
43 }
44 
46  const TransportSettings & settings) :
47  write_domain (settings.write_domain),
48  read_threads (settings.read_threads),
49  queue_length (settings.queue_length),
50  type (settings.type),
54  reliability (settings.reliability),
55  id (settings.id),
56  processes (settings.processes),
58  delay_launch (settings.delay_launch),
59  never_exit (settings.never_exit),
61  slack_time (settings.slack_time),
63 
64 #ifdef _USE_CID_
65 
66  latency_enabled (settings.latency_enabled),
67  latency_timeout (settings.latency_timeout),
68  latency_default (settings.latency_default),
69  latencies (settings.latencies),
70  num_responses (0),
71  num_summations (0),
72  num_voters (settings.num_voters),
73  num_votes_received (0),
74  redeployment_percentage_allowed (
75  settings.redeployment_percentage_allowed),
76 #endif // _USE_CID_
77  hosts (),
78  no_sending (settings.no_sending),
79  no_receiving (settings.no_receiving),
80  read_domains_ (settings.read_domains_)
81 {
82  hosts.resize (settings.hosts.size ());
83  for (unsigned int i = 0; i < settings.hosts.size (); ++i)
84  hosts[i] = settings.hosts[i];
85 }
86 
87 void
89  const TransportSettings & settings)
90 {
91  read_threads = settings.read_threads;
92  write_domain = settings.write_domain;
93  read_domains_ = settings.read_domains_;
94  queue_length = settings.queue_length;
95  type = settings.type;
96  max_fragment_size = settings.max_fragment_size;
97  resend_attempts = settings.resend_attempts;
98  fragment_queue_length = settings.fragment_queue_length;
99  reliability = settings.reliability;
100  id = settings.id;
101  processes = settings.processes;
102 
103  on_data_received_logic = settings.on_data_received_logic;
104  delay_launch = settings.delay_launch;
105  never_exit = settings.never_exit;
106 
107  send_reduced_message_header = settings.send_reduced_message_header;
108  slack_time = settings.slack_time;
109  read_thread_hertz = settings.read_thread_hertz;
110 
111 #ifdef _USE_CID_
112  latency_enabled = settings.latency_enabled;
113  latency_timeout = settings.latency_timeout;
114  latency_default = settings.latency_default;
115  latencies = settings.latencies;
116  num_responses = 0;
117  num_summations = 0;
118  num_voters = settings.num_voters;
119  num_votes_received = 0;
120  redeployment_percentage_allowed =
121  settings.redeployment_percentage_allowed;
122 #endif // _USE_CID_
123 
124  hosts.resize (settings.hosts.size ());
125  for (unsigned int i = 0; i < settings.hosts.size (); ++i)
126  hosts[i] = settings.hosts[i];
127 
128  no_sending = settings.no_sending;
129  no_receiving = settings.no_receiving;
130 }
131 
133 {
134  for (OriginatorFragmentMap::iterator originator = fragment_map.begin ();
135  originator != fragment_map.end (); ++originator)
136  {
137  for (ClockFragmentMap::iterator clock = originator->second.begin ();
138  clock != originator->second.end (); ++clock)
139  {
140  delete_fragments (clock->second);
141  }
142  }
143 }
144 
145 void
147  const std::string & prefix)
148 {
150  knowledge.load_context (filename);
151 
152  read_threads = (uint32_t) knowledge.get (prefix + ".read_threads").to_integer ();
153  write_domain = knowledge.get (prefix + ".write_domain").to_string ();
154  queue_length = (uint32_t)knowledge.get (prefix + ".queue_length").to_integer ();
155  type = (uint32_t)knowledge.get (prefix + ".type").to_integer ();
156  max_fragment_size = (uint32_t)knowledge.get (prefix + ".max_fragment_size").to_integer ();
157  resend_attempts = (uint32_t)knowledge.get (prefix + ".resend_attempts").to_integer ();
158  fragment_queue_length = (uint32_t)knowledge.get (prefix + ".fragment_queue_length").to_integer ();
159  reliability = (uint32_t)knowledge.get (prefix + ".reliability").to_integer ();
160  id = (uint32_t)knowledge.get (prefix + ".id").to_integer ();
161  processes = (uint32_t)knowledge.get (prefix + ".processes").to_integer ();
162 
163  on_data_received_logic = knowledge.get (prefix + ".on_data_received_logic").to_string ();
164  delay_launch = knowledge.get (prefix + ".delay_launch").is_true ();
165  never_exit = knowledge.get (prefix + ".never_exit").is_true ();
166 
167  send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
168  slack_time = knowledge.get (prefix + ".slack_time").to_double ();
169  read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
170 
171  containers::StringVector kb_hosts (prefix + ".hosts", knowledge);
172 
173  hosts.resize (kb_hosts.size ());
174  for (unsigned int i = 0; i < hosts.size (); ++i)
175  hosts[i] = kb_hosts[i];
176 
177 
178  containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
179 
180  std::vector <std::string> keys;
181  kb_read_domains.keys (keys);
182 
183  for (unsigned int i = 0; i < keys.size (); ++i)
184  {
185  read_domains_[keys[i]] = 1;
186  }
187 
188  no_sending = knowledge.get (prefix + ".no_sending").is_true ();
189  no_receiving = knowledge.get (prefix + ".no_receiving").is_true ();
190 }
191 
192 void
194  const std::string & prefix)
195 {
197  knowledge.evaluate (madara::utility::file_to_string (filename));
198 
199  read_threads = (uint32_t) knowledge.get (prefix + ".read_threads").to_integer ();
200  write_domain = knowledge.get (prefix + ".write_domain").to_string ();
201  queue_length = (uint32_t)knowledge.get (prefix + ".queue_length").to_integer ();
202  type = (uint32_t)knowledge.get (prefix + ".type").to_integer ();
203  max_fragment_size = (uint32_t)knowledge.get (prefix + ".max_fragment_size").to_integer ();
204  resend_attempts = (uint32_t)knowledge.get (prefix + ".resend_attempts").to_integer ();
205  fragment_queue_length = (uint32_t)knowledge.get (prefix + ".fragment_queue_length").to_integer ();
206  reliability = (uint32_t)knowledge.get (prefix + ".reliability").to_integer ();
207  id = (uint32_t)knowledge.get (prefix + ".id").to_integer ();
208  processes = (uint32_t)knowledge.get (prefix + ".processes").to_integer ();
209 
210  on_data_received_logic = knowledge.get (prefix + ".on_data_received_logic").to_string ();
211  delay_launch = knowledge.get (prefix + ".delay_launch").is_true ();
212  never_exit = knowledge.get (prefix + ".never_exit").is_true ();
213 
214  send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
215  slack_time = knowledge.get (prefix + ".slack_time").to_double ();
216  read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
217 
218  containers::StringVector kb_hosts (prefix + ".hosts", knowledge);
219 
220  hosts.resize (kb_hosts.size ());
221  for (unsigned int i = 0; i < hosts.size (); ++i)
222  hosts[i] = kb_hosts[i];
223 
224 
225  containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
226 
227  std::vector <std::string> keys;
228  kb_read_domains.keys (keys);
229 
230  for (unsigned int i = 0; i < keys.size (); ++i)
231  {
232  read_domains_[keys[i]] = 1;
233  }
234 
235  no_sending = knowledge.get (prefix + ".no_sending").is_true ();
236  no_receiving = knowledge.get (prefix + ".no_receiving").is_true ();
237 }
238 
239 void
241  const std::string & prefix) const
242 {
244 
245  // load what exists at the file so we can append/overwrite it
246  knowledge.load_context (madara::utility::file_to_string (filename));
247 
248  containers::StringVector kb_hosts (prefix + ".hosts", knowledge,
249  (int)hosts.size ());
250 
251  knowledge.set (prefix + ".read_threads", Integer (read_threads));
252  knowledge.set (prefix + ".write_domain", write_domain);
253  knowledge.set (prefix + ".queue_length", Integer (queue_length));
254  knowledge.set (prefix + ".type", Integer (type));
255  knowledge.set (prefix + ".max_fragment_size", Integer (max_fragment_size));
256  knowledge.set (prefix + ".resend_attempts", Integer (resend_attempts));
257  knowledge.set (prefix + ".fragment_queue_length",
258  Integer (fragment_queue_length));
259  knowledge.set (prefix + ".reliability", Integer (reliability));
260  knowledge.set (prefix + ".id", Integer (id));
261  knowledge.set (prefix + ".processes", Integer (processes));
262 
263  knowledge.set (prefix + ".on_data_received_logic", on_data_received_logic);
264  knowledge.set (prefix + ".delay_launch", Integer (delay_launch));
265  knowledge.set (prefix + ".never_exit", Integer (never_exit));
266 
267  knowledge.set (prefix + ".send_reduced_message_header",
268  Integer (send_reduced_message_header));
269  knowledge.set (prefix + ".slack_time", slack_time);
270  knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
271 
272  for (size_t i = 0; i < hosts.size (); ++i)
273  kb_hosts.set (i, hosts[i]);
274 
275  knowledge.set (prefix + ".no_sending", Integer (no_sending));
276  knowledge.set (prefix + ".no_receiving", Integer (no_receiving));
277 
278  knowledge::containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
279  for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
280  i != read_domains_.end (); ++i)
281  {
282  kb_read_domains.set (i->first,
284  }
285 
286  knowledge.save_context (filename);
287 }
288 
289 void
291  const std::string & prefix) const
292 {
294 
295  // load what exists at the file so we can append/overwrite it
296  knowledge.evaluate (madara::utility::file_to_string (filename));
297 
298  containers::StringVector kb_hosts (prefix + ".hosts", knowledge,
299  (int)hosts.size ());
300 
301  knowledge.set (prefix + ".read_threads", Integer (read_threads));
302  knowledge.set (prefix + ".write_domain", write_domain);
303  knowledge.set (prefix + ".queue_length", Integer (queue_length));
304  knowledge.set (prefix + ".type", Integer (type));
305  knowledge.set (prefix + ".max_fragment_size", Integer (max_fragment_size));
306  knowledge.set (prefix + ".resend_attempts", Integer (resend_attempts));
307  knowledge.set (prefix + ".fragment_queue_length",
308  Integer (fragment_queue_length));
309  knowledge.set (prefix + ".reliability", Integer (reliability));
310  knowledge.set (prefix + ".id", Integer (id));
311  knowledge.set (prefix + ".processes", Integer (processes));
312 
313  knowledge.set (prefix + ".on_data_received_logic", on_data_received_logic);
314  knowledge.set (prefix + ".delay_launch", Integer (delay_launch));
315  knowledge.set (prefix + ".never_exit", Integer (never_exit));
316 
317  knowledge.set (prefix + ".send_reduced_message_header",
318  Integer (send_reduced_message_header));
319  knowledge.set (prefix + ".slack_time", slack_time);
320  knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
321 
322  for (size_t i = 0; i < hosts.size (); ++i)
323  kb_hosts.set (i, hosts[i]);
324 
325  knowledge.set (prefix + ".no_sending", Integer (no_sending));
326  knowledge.set (prefix + ".no_receiving", Integer (no_receiving));
327 
328  knowledge::containers::Map kb_read_domains (prefix + ".read_domains", knowledge);
329  for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
330  i != read_domains_.end (); ++i)
331  {
332  kb_read_domains.set (i->first,
334  }
335 
336  knowledge.save_as_karl (filename);
337 }
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
bool is_true(void) const
Checks to see if the record is true.
#define DEFAULT_ID
Default id in group.
int64_t save_context(const std::string &filename) const
Saves the context to a file.
double to_double(void) const
converts the value to a float/double
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
Definition: Map.cpp:512
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
madara::knowledge::KnowledgeRecord::Integer Integer
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Retrieves a knowledge value.
#define MAXIMUM_RESEND_ATTEMPTS
Default number of processes in group.
uint32_t type
Type of transport. See madara::transport::Types for options.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a binary file.
Provides container classes for fast knowledge base access and mutation.
Definition: Barrier.h:27
uint32_t id
the id of this process.
This class stores a vector of strings inside of KaRL.
Definition: StringVector.h:31
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
Definition: Map.cpp:601
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a text file.
uint32_t processes
number of processes expected in the network (best to overestimate if building latency tables ...
bool no_receiving
if true, never receive over transport
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
#define DEFAULT_DOMAIN
Default knowledge domain.
TransportSettings()
Constructor for this class.
std::string write_domain
All class members are accessible to users for easy setup.
This class stores a map of strings to KaRL variables.
Definition: Map.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
madara::knowledge::KnowledgeRecord::Integer Integer
bool no_sending
if true, never send over transport
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Integer to_integer(void) const
converts the value to an integer
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
#define DEFAULT_PROCESSES
Default number of processes in group.
uint32_t queue_length
Length of the buffer used to store history of events.
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
Provides functions and classes for the distributed knowledge base.
MADARA_Export std::string file_to_string(const std::string &filename)
Reads a file into a string.
Definition: Utility.cpp:450
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression, const EvalSettings &settings=EvalSettings())
Evaluates an expression.
uint32_t reliability
Reliability required of the transport.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
int64_t load_context(const std::string &filename, bool use_id=true, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
void operator=(const TransportSettings &settings)
Assignment operator.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
bool delay_launch
delay launching transports
size_t size(void) const
Returns the size of the local vector.