15 #include "ace/OS_NS_Thread.h" 16 #include "ace/High_Res_Timer.h" 17 #include "ace/OS_NS_sys_socket.h" 19 #ifdef _MADARA_USING_ZMQ_ 23 #ifdef _USE_OPEN_SPLICE_ 25 #endif // _USE_OPEN_SPLICE_ 46 unsigned short port = 50000;
53 "KnowledgeBaseImpl::setup_unique_hostport:" \
54 " unable to bind to any ephemeral port\n");
64 "KnowledgeBaseImpl::setup_unique_hostport:" \
65 " unique bind to %s\n", actual_host.c_str ());
87 "KnowledgeBaseImpl::attach_transport:" \
88 " activating transport type %d\n", settings.
type);
102 #ifdef _USE_OPEN_SPLICE_ 104 "KnowledgeBaseImpl::activate_transport:" \
105 " creating Open Splice DDS transport.\n");
111 "KnowledgeBaseImpl::activate_transport:" \
112 " project was not generated with opensplice=1. Transport is invalid.\n");
119 "KnowledgeBaseImpl::activate_transport:" \
120 " creating NDDS transport.\n");
126 "KnowledgeBaseImpl::activate_transport:" \
127 " project was not generated with ndds=1. Transport is invalid.\n");
133 "KnowledgeBaseImpl::activate_transport:" \
134 " creating UDP transport.\n");
141 #ifdef _MADARA_USING_ZMQ_ 143 "KnowledgeBaseImpl::activate_transport:" \
144 " creating ZMQ transport.\n");
150 "KnowledgeBaseImpl::activate_transport:" \
151 " project was not generated with zmq=1. Transport is invalid.\n");
157 "KnowledgeBaseImpl::activate_transport:" \
158 " creating UDP Registry Server transport.\n");
166 "KnowledgeBaseImpl::activate_transport:" \
167 " creating UDP Registry Client transport.\n");
175 "KnowledgeBaseImpl::activate_transport:" \
176 " creating TCP transport.\n");
184 "KnowledgeBaseImpl::activate_transport:" \
185 " no transport was specified. Setting transport to null.\n");
208 "KnowledgeBaseImpl::close_transport:" \
209 " transport has been closed\n");
217 #ifndef _MADARA_NO_KARL_ 224 "KnowledgeBaseImpl::compile:" \
225 " compiling %s\n", expression.c_str ());
235 return wait (compiled, settings);
244 ACE_Time_Value current = ACE_High_Res_Timer::gettimeofday ();
245 ACE_Time_Value max_wait, sleep_time, next_epoch;
246 ACE_Time_Value poll_frequency, last = current;
251 max_wait = current + max_wait;
254 next_epoch = current + poll_frequency;
265 "KnowledgeBaseImpl::wait:" \
266 " waiting on %s\n", ce.
logic.c_str ());
271 "KnowledgeBaseImpl::wait:" \
272 " completed first eval to get %s\n",
273 last_value.to_string ().c_str ());
279 current = ACE_High_Res_Timer::gettimeofday ();
282 while (!last_value.to_integer () &&
286 "KnowledgeBaseImpl::wait:" \
287 " current is %" PRIu64
".%" PRIu64
" and max is %" PRIu64
".%" PRIu64
" (poll freq is %f)\n",
288 current.sec (), current.usec (), max_wait.sec (), max_wait.usec (),
292 "KnowledgeBaseImpl::wait:" \
293 " last value didn't result in success\n");
299 if (current < next_epoch)
301 sleep_time = next_epoch - current;
305 next_epoch = next_epoch + poll_frequency;
317 "KnowledgeBaseImpl::wait:" \
318 " waiting on %s\n", ce.
logic.c_str ());
323 "KnowledgeBaseImpl::wait:" \
324 " completed eval to get %s\n",
325 last_value.to_string ().c_str ());
333 current = ACE_High_Res_Timer::gettimeofday ();
337 if (current >= max_wait)
340 "KnowledgeBaseImpl::wait:" \
341 " Evaluate did not succeed. Timeout occurred\n");
359 "KnowledgeBaseImpl::evaluate:" \
360 " evaluating %s.\n", ce.
logic.c_str ());
395 "KnowledgeBaseImpl::evaluate:" \
396 " evaluating ComponentNode rooted tree\n");
436 if (modified.size () > 0)
456 for (KnowledgeRecords::const_iterator i = modified.begin ();
457 i != modified.end (); ++i)
461 allowed_modifieds[i->first] = i->second;
466 if (allowed_modifieds.size () > 0)
477 for (KnowledgeRecords::const_iterator i = allowed_modifieds.begin ();
478 i != allowed_modifieds.end (); ++i)
493 "%s: no modifications to send\n", prefix.c_str ());
503 "%s: no transport configured\n", prefix.c_str ());
510 "%s: user requested to not send modifieds\n", prefix.c_str ());
519 #endif // _MADARA_NO_KARL_
This class encapsulates an entry in a KnowledgeBase.
double max_wait_time
Maximum time to wait for an expression to become true (in seconds)
void lock(void) const
Locks the mutex.
MADARA_Export int bind_to_ephemeral_port(ACE_SOCK_Dgram &socket, std::string &host, unsigned short &port, bool increase_until_bound=true)
Binds to an ephemeral port.
std::map< std::string, bool > send_list
Map of record names that are allowed to be sent after operation.
void push_back(T &value)
Pushes a value onto the end of the vector.
ACE_SOCK_Dgram unique_bind_
const knowledge::KnowledgeRecords & get_modifieds(void) const
Retrieves a list of modified variables.
Multicast-based transport for knowledge.
std::string setup_unique_hostport(const std::string &host="")
Binds to an ephemeral port for unique tie breakers in global ordering.
Multicast-based transport for knowledge.
void lock(void) const
Locks the mutex on this context.
std::string pre_print_statement
Statement to print before evaluations.
bool signal_changes
Toggle whether to signal changes have happened.
void signal(bool lock=true) const
Signals that this thread is done with the context.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
std::string logic
The logic that was compiled.
void close_transport(void)
Closes the transport mechanism so no dissemination is possible.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
Compiled, optimized KaRL logic.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
size_t attach_transport(madara::transport::Base *transport)
Attaches a transport to the Knowledge Engine.
std::string post_print_statement
Statement to print after evaluations.
uint32_t type
Type of transport. See madara::transport::Types for options.
madara::knowledge::KnowledgeRecord wait(const std::string &expression)
Waits for an expression to be non-zero.
madara::expression::ExpressionTree expression
The expression tree.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
void unlock(void) const
Unlocks the mutex.
This class provides an interface into the Open Splice dissemination transport.
void unlock(void) const
Unlocks the mutex on this context.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
size_t size(void) const
Returns the current size of the vector.
An abstract base class that defines a simple abstract implementation of an expression tree node...
double poll_frequency
Frequency to poll an expression for truth (in seconds)
ZMQ-based transport for knowledge.
void clear(void)
Clears the vector.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
MADARA_Export int merge_hostport_identifier(std::string &key, const std::string &host, const std::string &port)
Merges a host and port into a host:port key.
This class provides an interface into the NDDS dissemination transport.
void wait_for_change(bool extra_release=false)
Wait for a change to happen to the context.
uint64_t inc_clock(const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the Lamport clock and returns the new clock time (intended for sending knowledg...
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
Encapsulates settings for an evaluation statement.
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression)
Evaluates an expression.
Provides functions and classes for the distributed knowledge base.
int send_modifieds(const std::string &prefix, const EvalSettings &settings=EvalSettings())
Sends all modified variables through the attached transports.
Copyright (c) 2015 Carnegie Mellon University.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
UDP-based transport for knowledge.
bool delay_sending_modifieds
Toggle for sending modifieds in a single update event after each evaluation.
Encapsulates settings for a wait statement.
madara::transport::Transports transports_
UDP-based transport for knowledge.
Base class from which all transports must be derived.
bool never_exit
Prevents MADARA from exiting on fatal errors and invalid state.
transport::QoSTransportSettings settings_
UDP-based server that handles a registry of UDP endpoints, which makes it ideal for any NAT-protected...
TCP-based transport (skeleton code)
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
void reset_modified(void)
Reset all variables to be unmodified.