MADARA  3.1.8
KnowledgeBaseImpl.cpp
Go to the documentation of this file.
11 
12 
13 #include <sstream>
14 
15 #include "ace/OS_NS_Thread.h"
16 #include "ace/High_Res_Timer.h"
17 #include "ace/OS_NS_sys_socket.h"
18 
19 #ifdef _MADARA_USING_ZMQ_
21 #endif
22 
23 #ifdef _USE_OPEN_SPLICE_
25 #endif // _USE_OPEN_SPLICE_
26 
27 #ifdef _USE_NDDS_
29 #endif // _USE_NDDS_
30 
31 #include <iostream>
32 
33 namespace madara { namespace knowledge {
34 
37 const std::string & host)
38 {
39  // placeholder for our ip address
40  std::string actual_host (host);
41 
42  if (host == "")
43  {
44  // start from 50k, which is just above the bottom of the user
45  // definable port range (hopefully avoid conflicts with 49152-49999)
46  unsigned short port = 50000;
47 
49  unique_bind_, actual_host, port)
50  == -1)
51  {
53  "KnowledgeBaseImpl::setup_unique_hostport:" \
54  " unable to bind to any ephemeral port\n");
55 
56  if (!settings_.never_exit)
57  exit (-1);
58  }
59 
60  // we were able to bind to an ephemeral port
61  madara::utility::merge_hostport_identifier (actual_host, actual_host, port);
62 
64  "KnowledgeBaseImpl::setup_unique_hostport:" \
65  " unique bind to %s\n", actual_host.c_str ());
66  }
67 
68  return actual_host;
69 }
70 
71 size_t
74 {
75  madara::transport::Base * transport (0);
76  std::string originator (id);
77 
78  if (originator == "")
79  {
80  if (id_.size () > 0)
81  originator = id_;
82  else
83  originator = id_ = setup_unique_hostport ();
84  }
85 
87  "KnowledgeBaseImpl::attach_transport:" \
88  " activating transport type %d\n", settings.type);
89 
90  if (settings.type == madara::transport::BROADCAST)
91  {
92  transport = new madara::transport::BroadcastTransport (originator, map_,
93  settings, true);
94  }
95  else if (settings.type == madara::transport::MULTICAST)
96  {
97  transport = new madara::transport::MulticastTransport (originator, map_,
98  settings, true);
99  }
100  else if (settings.type == madara::transport::SPLICE)
101  {
102 #ifdef _USE_OPEN_SPLICE_
104  "KnowledgeBaseImpl::activate_transport:" \
105  " creating Open Splice DDS transport.\n");
106 
107  transport = new madara::transport::SpliceDDSTransport (originator, map_,
108  settings, true);
109 #else
111  "KnowledgeBaseImpl::activate_transport:" \
112  " project was not generated with opensplice=1. Transport is invalid.\n");
113 #endif
114  }
115  else if (settings.type == madara::transport::NDDS)
116  {
117 #ifdef _USE_NDDS_
119  "KnowledgeBaseImpl::activate_transport:" \
120  " creating NDDS transport.\n");
121 
122  transport = new madara::transport::NddsTransport (originator, map_,
123  settings, true);
124 #else
126  "KnowledgeBaseImpl::activate_transport:" \
127  " project was not generated with ndds=1. Transport is invalid.\n");
128 #endif
129  }
130  else if (settings.type == madara::transport::UDP)
131  {
133  "KnowledgeBaseImpl::activate_transport:" \
134  " creating UDP transport.\n");
135 
136  transport = new madara::transport::UdpTransport (originator, map_,
137  settings, true);
138  }
139  else if (settings.type == madara::transport::ZMQ)
140  {
141 #ifdef _MADARA_USING_ZMQ_
143  "KnowledgeBaseImpl::activate_transport:" \
144  " creating ZMQ transport.\n");
145 
146  transport = new madara::transport::ZMQTransport (originator, map_,
147  settings, true);
148 #else
150  "KnowledgeBaseImpl::activate_transport:" \
151  " project was not generated with zmq=1. Transport is invalid.\n");
152 #endif
153  }
154  else if (settings.type == madara::transport::REGISTRY_SERVER)
155  {
157  "KnowledgeBaseImpl::activate_transport:" \
158  " creating UDP Registry Server transport.\n");
159 
160  transport = new madara::transport::UdpRegistryServer (originator, map_,
161  settings, true);
162  }
163  else if (settings.type == madara::transport::REGISTRY_CLIENT)
164  {
166  "KnowledgeBaseImpl::activate_transport:" \
167  " creating UDP Registry Client transport.\n");
168 
169  transport = new madara::transport::UdpRegistryClient (originator, map_,
170  settings, true);
171  }
172  else if (settings.type == madara::transport::TCP)
173  {
175  "KnowledgeBaseImpl::activate_transport:" \
176  " creating TCP transport.\n");
177 
178  transport = new madara::transport::TcpTransport (originator, map_,
179  settings, true);
180  }
181  else
182  {
184  "KnowledgeBaseImpl::activate_transport:" \
185  " no transport was specified. Setting transport to null.\n");
186  }
187 
188  // if we have a valid transport, add it to the transports vector
189  if (transport != 0)
190  {
191  transports_.push_back (transport);
192  }
193 
194  return transports_.size ();
195 }
196 
197 void
199 {
200  if (transports_.size () > 0)
201  {
202  for (unsigned int i = 0; i < transports_.size (); ++i)
203  {
204  transports_[i]->close ();
205  delete transports_[i];
206 
208  "KnowledgeBaseImpl::close_transport:" \
209  " transport has been closed\n");
210  }
211 
212  transports_.clear ();
213  }
214 }
215 
216 
217 #ifndef _MADARA_NO_KARL_
218 
221 const std::string & expression)
222 {
224  "KnowledgeBaseImpl::compile:" \
225  " compiling %s\n", expression.c_str ());
226 
227  return map_.compile (expression);
228 }
229 
232 const WaitSettings & settings)
233 {
234  CompiledExpression compiled = compile (expression);
235  return wait (compiled, settings);
236 }
237 
240 CompiledExpression & ce,
241 const WaitSettings & settings)
242 {
243  // get current time of day
244  ACE_Time_Value current = ACE_High_Res_Timer::gettimeofday ();
245  ACE_Time_Value max_wait, sleep_time, next_epoch;
246  ACE_Time_Value poll_frequency, last = current;
247 
248  if (settings.poll_frequency >= 0)
249  {
250  max_wait.set (settings.max_wait_time);
251  max_wait = current + max_wait;
252 
253  poll_frequency.set (settings.poll_frequency);
254  next_epoch = current + poll_frequency;
255  }
256 
257  // print the pre statement at highest log level (cannot be masked)
258  if (settings.pre_print_statement != "")
260 
261  // lock the context
262  map_.lock ();
263 
265  "KnowledgeBaseImpl::wait:" \
266  " waiting on %s\n", ce.logic.c_str ());
267 
268  KnowledgeRecord last_value = ce.expression.evaluate (settings);
269 
271  "KnowledgeBaseImpl::wait:" \
272  " completed first eval to get %s\n",
273  last_value.to_string ().c_str ());
274 
275  send_modifieds ("KnowledgeBaseImpl:wait", settings);
276 
277  map_.unlock ();
278 
279  current = ACE_High_Res_Timer::gettimeofday ();
280 
281  // wait for expression to be true
282  while (!last_value.to_integer () &&
283  (settings.max_wait_time < 0 || current < max_wait))
284  {
286  "KnowledgeBaseImpl::wait:" \
287  " current is %" PRIu64 ".%" PRIu64 " and max is %" PRIu64 ".%" PRIu64 " (poll freq is %f)\n",
288  current.sec (), current.usec (), max_wait.sec (), max_wait.usec (),
289  settings.poll_frequency);
290 
292  "KnowledgeBaseImpl::wait:" \
293  " last value didn't result in success\n");
294 
295  // Unlike the other wait statements, we allow for a time-based wait.
296  // To do this, we allow a user to specify a poll frequency (in seconds).
297  if (settings.poll_frequency > 0)
298  {
299  if (current < next_epoch)
300  {
301  sleep_time = next_epoch - current;
302  madara::utility::sleep (sleep_time);
303  }
304 
305  next_epoch = next_epoch + poll_frequency;
306  }
307  else
308  map_.wait_for_change (true);
309 
310  // relock - basically we need to evaluate the tree again, and
311  // we can't have a bunch of people changing the variables
312  // while we're evaluating the tree.
313  map_.lock ();
314 
315 
317  "KnowledgeBaseImpl::wait:" \
318  " waiting on %s\n", ce.logic.c_str ());
319 
320  last_value = ce.expression.evaluate (settings);
321 
323  "KnowledgeBaseImpl::wait:" \
324  " completed eval to get %s\n",
325  last_value.to_string ().c_str ());
326 
327  send_modifieds ("KnowledgeBaseImpl:wait", settings);
328 
329  map_.unlock ();
330  map_.signal ();
331 
332  // get current time
333  current = ACE_High_Res_Timer::gettimeofday ();
334 
335  } // end while (!last)
336 
337  if (current >= max_wait)
338  {
340  "KnowledgeBaseImpl::wait:" \
341  " Evaluate did not succeed. Timeout occurred\n");
342  }
343 
344  // print the post statement at highest log level (cannot be masked)
345  if (settings.post_print_statement != "")
347 
348  return last_value;
349 }
350 
353 CompiledExpression & ce,
354 const EvalSettings & settings)
355 {
356  KnowledgeRecord last_value;
357 
359  "KnowledgeBaseImpl::evaluate:" \
360  " evaluating %s.\n", ce.logic.c_str ());
361 
362  // iterators and tree for evaluation of interpreter results
363  //madara::expression::ExpressionTree tree;
364 
365  // print the pre statement at highest log level (cannot be masked)
366  if (settings.pre_print_statement != "")
368 
369  // lock the context from being updated by any ongoing threads
370  map_.lock ();
371 
372  // interpret the current expression and then evaluate it
373  //tree = interpreter_.interpret (map_, expression);
374  last_value = ce.expression.evaluate (settings);
375 
376  send_modifieds ("KnowledgeBaseImpl:evaluate", settings);
377 
378  // print the post statement at highest log level (cannot be masked)
379  if (settings.post_print_statement != "")
381 
382  map_.unlock ();
383 
384  return last_value;
385 }
386 
390 const EvalSettings & settings)
391 {
392  KnowledgeRecord last_value;
393 
395  "KnowledgeBaseImpl::evaluate:" \
396  " evaluating ComponentNode rooted tree\n");
397 
398  // iterators and tree for evaluation of interpreter results
399  //madara::expression::ExpressionTree tree;
400 
401  // print the pre statement at highest log level (cannot be masked)
402  if (settings.pre_print_statement != "")
404 
405  // lock the context from being updated by any ongoing threads
406  map_.lock ();
407 
408  // interpret the current expression and then evaluate it
409  //tree = interpreter_.interpret (map_, expression);
410  last_value = map_.evaluate (root, settings);
411 
412  send_modifieds ("KnowledgeBaseImpl:evaluate", settings);
413 
414  // print the post statement at highest log level (cannot be masked)
415  if (settings.post_print_statement != "")
417 
418  map_.unlock ();
419 
420  return last_value;
421 }
422 
423 int
425  const std::string & prefix,
426  const EvalSettings & settings)
427 {
428  int result = 0;
429 
430  MADARA_GUARD_TYPE guard (map_.mutex_);
431 
432  if (transports_.size () > 0 && !settings.delay_sending_modifieds)
433  {
434  const KnowledgeRecords & modified = map_.get_modifieds ();
435 
436  if (modified.size () > 0)
437  {
438  // if there is no send_list restricting what may be sent, send all modifieds
439  if (settings.send_list.size () == 0)
440  {
441  transports_.lock ();
442  // send across each transport
443  for (unsigned int i = 0; i < transports_.size (); ++i, ++result)
444  transports_[i]->send_data (modified);
445 
446  transports_.unlock ();
447 
448  // reset the modified map
449  map_.reset_modified ();
450  }
451  // if there is a send_list
452  else
453  {
454  KnowledgeRecords allowed_modifieds;
455  // otherwise, we are only allowed to send a subset of modifieds
456  for (KnowledgeRecords::const_iterator i = modified.begin ();
457  i != modified.end (); ++i)
458  {
459  if (settings.send_list.find (i->first) != settings.send_list.end ())
460  {
461  allowed_modifieds[i->first] = i->second;
462  }
463  }
464 
465  // if the subset was greater than zero, we send the subset
466  if (allowed_modifieds.size () > 0)
467  {
468  transports_.lock ();
469 
470  // send across each transport
471  for (unsigned int i = 0; i < transports_.size (); ++i, ++result)
472  transports_[i]->send_data (allowed_modifieds);
473 
474  transports_.unlock ();
475 
476  // reset modified list for the allowed modifications
477  for (KnowledgeRecords::const_iterator i = allowed_modifieds.begin ();
478  i != allowed_modifieds.end (); ++i)
479  {
480  map_.reset_modified (i->first);
481  }
482  }
483  }
484 
485  map_.inc_clock (settings);
486 
487  if (settings.signal_changes)
488  map_.signal (false);
489  }
490  else
491  {
493  "%s: no modifications to send\n", prefix.c_str ());
494 
495  result = -1;
496  }
497  }
498  else
499  {
500  if (transports_.size () == 0)
501  {
503  "%s: no transport configured\n", prefix.c_str ());
504 
505  result = -2;
506  }
507  else if (settings.delay_sending_modifieds)
508  {
510  "%s: user requested to not send modifieds\n", prefix.c_str ());
511 
512  result = -3;
513  }
514  }
515 
516  return result;
517 }
518 
519 #endif // _MADARA_NO_KARL_
520 
521 } }
This class encapsulates an entry in a KnowledgeBase.
double max_wait_time
Maximum time to wait for an expression to become true (in seconds)
Definition: WaitSettings.h:113
void lock(void) const
Locks the mutex.
MADARA_Export int bind_to_ephemeral_port(ACE_SOCK_Dgram &socket, std::string &host, unsigned short &port, bool increase_until_bound=true)
Binds to an ephemeral port.
Definition: Utility.cpp:397
std::map< std::string, bool > send_list
Map of record names that are allowed to be sent after operation.
Definition: EvalSettings.h:130
void push_back(T &value)
Pushes a value onto the end of the vector.
const knowledge::KnowledgeRecords & get_modifieds(void) const
Retrieves a list of modified variables.
Multicast-based transport for knowledge.
std::string setup_unique_hostport(const std::string &host="")
Binds to an ephemeral port for unique tie breakers in global ordering.
Multicast-based transport for knowledge.
void lock(void) const
Locks the mutex on this context.
std::string pre_print_statement
Statement to print before evaluations.
Definition: EvalSettings.h:119
bool signal_changes
Toggle whether to signal changes have happened.
void signal(bool lock=true) const
Signals that this thread is done with the context.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
std::string logic
the logic that was compiled
void close_transport(void)
Closes the transport mechanism so no dissemination is possible.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
Compiled, optimized KaRL logic.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
size_t attach_transport(madara::transport::Base *transport)
Attaches a transport to the Knowledge Engine.
std::string post_print_statement
Statement to print after evaluations.
Definition: EvalSettings.h:124
uint32_t type
Type of transport. See madara::transport::Types for options.
madara::knowledge::KnowledgeRecord wait(const std::string &expression)
Waits for an expression to be non-zero.
madara::expression::ExpressionTree expression
the expression tree
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
void unlock(void) const
Unlocks the mutex.
This class provides an interface into the Open Splice dissemination transport.
void unlock(void) const
Unlocks the mutex on this context.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
size_t size(void) const
returns the current size of the vector
An abstract base class defines a simple abstract implementation of an expression tree node...
Definition: ComponentNode.h:35
double poll_frequency
Frequency to poll an expression for truth (in seconds)
Definition: WaitSettings.h:108
ZMQ-based transport for knowledge.
Definition: ZMQTransport.h:43
void clear(void)
Clears the vector.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export int merge_hostport_identifier(std::string &key, const std::string &host, const std::string &port)
Merges a host and port into a host:port key.
Definition: Utility.cpp:372
This class provides an interface into the NDDS dissemination transport.
Definition: NddsTransport.h:22
void wait_for_change(bool extra_release=false)
Wait for a change to happen to the context.
uint64_t inc_clock(const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically increments the Lamport clock and returns the new clock time (intended for sending knowledg...
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
Encapsulates settings for an evaluation statement.
Definition: EvalSettings.h:26
madara::knowledge::KnowledgeRecord evaluate(const std::string &expression)
Evaluates an expression.
Provides functions and classes for the distributed knowledge base.
int send_modifieds(const std::string &prefix, const EvalSettings &settings=EvalSettings())
Sends all modified variables through the attached transports.
Copyright (c) 2015 Carnegie Mellon University.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
UDP-based transport for knowledge.
bool delay_sending_modifieds
Toggle for sending modifieds in a single update event after each evaluation.
Definition: EvalSettings.h:114
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:23
madara::transport::Transports transports_
UDP-based transport for knowledge.
Definition: UdpTransport.h:34
Base class from which all transports must be derived.
Definition: Transport.h:62
bool never_exit
prevent MADARA from exiting on fatal errors and invalid state
transport::QoSTransportSettings settings_
UDP-based server that handles a registry of UDP endpoints, which makes it ideal for any NAT-protected...
TCP-based transport (skeleton code)
Definition: TcpTransport.h:19
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
void reset_modified(void)
Reset all variables to be unmodified.