13 queue_length (DEFAULT_QUEUE_LENGTH),
14 type (DEFAULT_TRANSPORT),
15 max_fragment_size (62000),
17 fragment_queue_length (5),
18 reliability (DEFAULT_RELIABILITY),
21 on_data_received_logic (),
24 send_reduced_message_header (false),
26 read_thread_hertz (0.0),
29 latency_enabled (DEFAULT_LATENCY_ENABLED),
30 latency_timeout (DEFAULT_LATENCY_TIMEOUT),
31 latency_default (DEFAULT_LATENCY),
35 num_votes_received (0),
36 redeployment_percentage_allowed (DEFAULT_REDEPLOYMENT_PERCENTAGE),
66 latency_enabled (settings.latency_enabled),
67 latency_timeout (settings.latency_timeout),
68 latency_default (settings.latency_default),
69 latencies (settings.latencies),
72 num_voters (settings.num_voters),
73 num_votes_received (0),
74 redeployment_percentage_allowed (
75 settings.redeployment_percentage_allowed),
83 for (
unsigned int i = 0; i < settings.
hosts.size (); ++i)
112 latency_enabled = settings.latency_enabled;
113 latency_timeout = settings.latency_timeout;
114 latency_default = settings.latency_default;
115 latencies = settings.latencies;
118 num_voters = settings.num_voters;
119 num_votes_received = 0;
120 redeployment_percentage_allowed =
121 settings.redeployment_percentage_allowed;
124 hosts.resize (settings.
hosts.size ());
125 for (
unsigned int i = 0; i < settings.
hosts.size (); ++i)
126 hosts[i] = settings.
hosts[i];
134 for (OriginatorFragmentMap::iterator originator = fragment_map.begin ();
135 originator != fragment_map.end (); ++originator)
137 for (ClockFragmentMap::iterator clock = originator->second.begin ();
138 clock != originator->second.end (); ++clock)
152 read_threads = (uint32_t) knowledge.
get (prefix +
".read_threads").
to_integer ();
153 write_domain = knowledge.
get (prefix +
".write_domain").
to_string ();
154 queue_length = (uint32_t)knowledge.
get (prefix +
".queue_length").
to_integer ();
155 type = (uint32_t)knowledge.
get (prefix +
".type").
to_integer ();
156 max_fragment_size = (uint32_t)knowledge.
get (prefix +
".max_fragment_size").
to_integer ();
157 resend_attempts = (uint32_t)knowledge.
get (prefix +
".resend_attempts").
to_integer ();
158 fragment_queue_length = (uint32_t)knowledge.
get (prefix +
".fragment_queue_length").
to_integer ();
159 reliability = (uint32_t)knowledge.
get (prefix +
".reliability").
to_integer ();
161 processes = (uint32_t)knowledge.
get (prefix +
".processes").
to_integer ();
163 on_data_received_logic = knowledge.
get (prefix +
".on_data_received_logic").
to_string ();
164 delay_launch = knowledge.
get (prefix +
".delay_launch").
is_true ();
165 never_exit = knowledge.
get (prefix +
".never_exit").
is_true ();
167 send_reduced_message_header = knowledge.
get (prefix +
".send_reduced_message_header").
is_true ();
168 slack_time = knowledge.
get (prefix +
".slack_time").
to_double ();
169 read_thread_hertz = knowledge.
get (prefix +
".read_thread_hertz").
to_double ();
173 hosts.resize (kb_hosts.
size ());
174 for (
unsigned int i = 0; i < hosts.size (); ++i)
175 hosts[i] = kb_hosts[i];
180 std::vector <std::string> keys;
181 kb_read_domains.
keys (keys);
183 for (
unsigned int i = 0; i < keys.size (); ++i)
185 read_domains_[keys[i]] = 1;
188 no_sending = knowledge.
get (prefix +
".no_sending").
is_true ();
189 no_receiving = knowledge.
get (prefix +
".no_receiving").
is_true ();
199 read_threads = (uint32_t) knowledge.
get (prefix +
".read_threads").
to_integer ();
200 write_domain = knowledge.
get (prefix +
".write_domain").
to_string ();
201 queue_length = (uint32_t)knowledge.
get (prefix +
".queue_length").
to_integer ();
202 type = (uint32_t)knowledge.
get (prefix +
".type").
to_integer ();
203 max_fragment_size = (uint32_t)knowledge.
get (prefix +
".max_fragment_size").
to_integer ();
204 resend_attempts = (uint32_t)knowledge.
get (prefix +
".resend_attempts").
to_integer ();
205 fragment_queue_length = (uint32_t)knowledge.
get (prefix +
".fragment_queue_length").
to_integer ();
206 reliability = (uint32_t)knowledge.
get (prefix +
".reliability").
to_integer ();
208 processes = (uint32_t)knowledge.
get (prefix +
".processes").
to_integer ();
210 on_data_received_logic = knowledge.
get (prefix +
".on_data_received_logic").
to_string ();
211 delay_launch = knowledge.
get (prefix +
".delay_launch").
is_true ();
212 never_exit = knowledge.
get (prefix +
".never_exit").
is_true ();
214 send_reduced_message_header = knowledge.
get (prefix +
".send_reduced_message_header").
is_true ();
215 slack_time = knowledge.
get (prefix +
".slack_time").
to_double ();
216 read_thread_hertz = knowledge.
get (prefix +
".read_thread_hertz").
to_double ();
220 hosts.resize (kb_hosts.
size ());
221 for (
unsigned int i = 0; i < hosts.size (); ++i)
222 hosts[i] = kb_hosts[i];
227 std::vector <std::string> keys;
228 kb_read_domains.
keys (keys);
230 for (
unsigned int i = 0; i < keys.size (); ++i)
232 read_domains_[keys[i]] = 1;
235 no_sending = knowledge.
get (prefix +
".no_sending").
is_true ();
236 no_receiving = knowledge.
get (prefix +
".no_receiving").
is_true ();
251 knowledge.
set (prefix +
".read_threads",
Integer (read_threads));
252 knowledge.
set (prefix +
".write_domain", write_domain);
253 knowledge.
set (prefix +
".queue_length",
Integer (queue_length));
254 knowledge.
set (prefix +
".type",
Integer (type));
255 knowledge.
set (prefix +
".max_fragment_size",
Integer (max_fragment_size));
256 knowledge.
set (prefix +
".resend_attempts",
Integer (resend_attempts));
257 knowledge.
set (prefix +
".fragment_queue_length",
258 Integer (fragment_queue_length));
259 knowledge.
set (prefix +
".reliability",
Integer (reliability));
261 knowledge.
set (prefix +
".processes",
Integer (processes));
263 knowledge.
set (prefix +
".on_data_received_logic", on_data_received_logic);
264 knowledge.
set (prefix +
".delay_launch",
Integer (delay_launch));
265 knowledge.
set (prefix +
".never_exit",
Integer (never_exit));
267 knowledge.
set (prefix +
".send_reduced_message_header",
268 Integer (send_reduced_message_header));
269 knowledge.
set (prefix +
".slack_time", slack_time);
270 knowledge.
set (prefix +
".read_thread_hertz", read_thread_hertz);
272 for (
size_t i = 0; i < hosts.size (); ++i)
273 kb_hosts.set (i, hosts[i]);
275 knowledge.
set (prefix +
".no_sending",
Integer (no_sending));
276 knowledge.
set (prefix +
".no_receiving",
Integer (no_receiving));
279 for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
280 i != read_domains_.end (); ++i)
282 kb_read_domains.
set (i->first,
301 knowledge.
set (prefix +
".read_threads",
Integer (read_threads));
302 knowledge.
set (prefix +
".write_domain", write_domain);
303 knowledge.
set (prefix +
".queue_length",
Integer (queue_length));
304 knowledge.
set (prefix +
".type",
Integer (type));
305 knowledge.
set (prefix +
".max_fragment_size",
Integer (max_fragment_size));
306 knowledge.
set (prefix +
".resend_attempts",
Integer (resend_attempts));
307 knowledge.
set (prefix +
".fragment_queue_length",
308 Integer (fragment_queue_length));
309 knowledge.
set (prefix +
".reliability",
Integer (reliability));
311 knowledge.
set (prefix +
".processes",
Integer (processes));
313 knowledge.
set (prefix +
".on_data_received_logic", on_data_received_logic);
314 knowledge.
set (prefix +
".delay_launch",
Integer (delay_launch));
315 knowledge.
set (prefix +
".never_exit",
Integer (never_exit));
317 knowledge.
set (prefix +
".send_reduced_message_header",
318 Integer (send_reduced_message_header));
319 knowledge.
set (prefix +
".slack_time", slack_time);
320 knowledge.
set (prefix +
".read_thread_hertz", read_thread_hertz);
322 for (
size_t i = 0; i < hosts.size (); ++i)
323 kb_hosts.set (i, hosts[i]);
325 knowledge.
set (prefix +
".no_sending",
Integer (no_sending));
326 knowledge.
set (prefix +
".no_receiving",
Integer (no_receiving));
329 for (std::map <std::string, int>::const_iterator i = read_domains_.begin ();
330 i != read_domains_.end (); ++i)
332 kb_read_domains.
set (i->first,
virtual ~TransportSettings()
virtual void load_text(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a text file.
bool is_true(void) const
Checks to see if the record is true.
#define DEFAULT_ID
Default id in group.
int64_t save_context(const std::string &filename) const
Saves the context to a file.
double to_double(void) const
converts the value to a float/double
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
Holds basic transport settings.
madara::knowledge::KnowledgeRecord::Integer Integer
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Retrieves a knowledge value.
#define MAXIMUM_RESEND_ATTEMPTS
Default number of processes in group.
uint32_t type
Type of transport. See madara::transport::Types for options.
virtual void save(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a binary file.
Provides container classes for fast knowledge base access and mutation.
uint32_t id
the id of this process.
This class stores a vector of strings inside of KaRL.
int set(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value=madara::knowledge::KnowledgeRecord::MODIFIED)
Sets a location within the map to the specified value.
virtual void save_text(const std::string &filename, const std::string &prefix="transport") const
Saves the settings from a text file.
uint32_t processes
number of processes expected in the network (best to overestimate if building latency tables ...
bool no_receiving
if true, never receive over transport
std::map< std::string, int > read_domains_
Any acceptable read domain is added here.
#define DEFAULT_DOMAIN
Default knowledge domain.
TransportSettings()
Constructor for this class.
std::string write_domain
All class members are accessible to users for easy setup.
This class stores a map of strings to KaRL variables.
This class provides a distributed knowledge base to users.
madara::knowledge::KnowledgeRecord::Integer Integer
bool no_sending
if true, never send over transport
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Integer to_integer(void) const
converts the value to an integer
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
#define DEFAULT_PROCESSES
Default number of processes in group.
uint32_t queue_length
Length of the buffer used to store history of events.
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
Provides functions and classes for the distributed knowledge base.
MADARA_Export std::string file_to_string(const std::string &filename)
Reads a file into a string.
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression, const EvalSettings &settings=EvalSettings())
Evaluates an expression.
uint32_t reliability
Reliability required of the transport.
virtual void load(const std::string &filename, const std::string &prefix="transport")
Loads the settings from a binary file.
int64_t load_context(const std::string &filename, bool use_id=true, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
void operator=(const TransportSettings &settings)
Assignment operator.
std::string to_string(const std::string &delimiter=", ") const
converts the value to a string.
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
bool delay_launch
delay launching transports
size_t size(void) const
Returns the size of the local vector.