MADARA  3.1.8
ThreadSafeContext.cpp
Go to the documentation of this file.
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <iterator>
5 #include <memory>
6 
7 #include <string.h>
8 
10 
12 
15 #include <stdio.h>
16 #include <time.h>
19 
20 
21 // constructor
// Starts the context clock at 0, points logger_ at logger::global_logger and
// registers "{" and "}" as the expansion splitters.
// changed_ is constructed from mutex_ only when
// MADARA_CONDITION_MUTEX_CONSTRUCTOR is defined; interpreter_ is allocated
// with new only when _MADARA_NO_KARL_ is not defined.
23  :
24 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
25  changed_ (mutex_),
26 #endif
27  clock_ (0)
28 #ifndef _MADARA_NO_KARL_
29 ,
30  interpreter_ (new madara::expression::Interpreter ())
31 #endif // _MADARA_NO_KARL_
32  , logger_ (logger::global_logger.get ())
33 {
34  expansion_splitters_.push_back ("{");
35  expansion_splitters_.push_back ("}");
36 }
37 
38 // destructor
// Deletes interpreter_ when KaRL support is compiled in; with
// _MADARA_NO_KARL_ defined there is nothing to release here.
40 {
41 #ifndef _MADARA_NO_KARL_
42  delete interpreter_;
43 #endif // _MADARA_NO_KARL_
44 
45 }
46 
54  const std::string & key,
55  const KnowledgeReferenceSettings & settings)
56 {
    // Returns the address of the record stored under key, holding mutex_ for
    // the duration of the lookup. The key is run through expand_statement
    // first when settings.expand_variables is set. An empty (expanded) key
    // yields 0; any other key that is not yet in map_ is created by the
    // lookup itself.
57  std::string key_actual;
58  const std::string * key_ptr;
59  MADARA_GUARD_TYPE guard (mutex_);
60 
61  if (settings.expand_variables)
62  {
63  key_actual = expand_statement (key);
64  key_ptr = &key_actual;
65  }
66  else
67  key_ptr = &key;
68 
69  if (*key_ptr == "")
70  return 0;
71 
72  // if the variable doesn't exist, hash maps create a record automatically
73  // when used in this manner
74  return &map_[*key_ptr];
75 }
76 
77 
80  const std::string & key,
81  const KnowledgeReferenceSettings & settings)
82 {
    // Builds a VariableReference for key while holding mutex_. The reference
    // is always named after the (optionally expanded) key; record_ is only
    // assigned for a non-empty key, and map_[...] creates the entry when it
    // does not exist yet.
83  std::string key_actual;
84  const std::string * key_ptr;
85  MADARA_GUARD_TYPE guard (mutex_);
86 
87  VariableReference record;
88 
89  // expand the key if the user asked for it
90  if (settings.expand_variables)
91  {
92  key_actual = expand_statement (key);
93  key_ptr = &key_actual;
94  }
95  else
96  key_ptr = &key;
97 
98  // set name to the expanded key, for debugging purposes
99  record.set_name (*key_ptr);
100 
101  // if the key is possible, create a reference to the record
102  if (*key_ptr != "")
103  {
104  record.record_ = &map_[*key_ptr];
105  }
106  return record;
107 }
108 
109 
112 const std::string & key,
113 const KnowledgeReferenceSettings & settings) const
114 {
    // Const lookup of a VariableReference for key, performed under mutex_.
    // Unlike the non-const overload this never inserts into map_: the
    // returned reference is always named after the (optionally expanded)
    // key, but record_ is only assigned when the key is non-empty AND
    // already present in map_.
115  std::string key_actual;
116  const std::string * key_ptr;
117  MADARA_GUARD_TYPE guard (mutex_);
118 
119  VariableReference record;
120 
121  // expand the key if the user asked for it
122  if (settings.expand_variables)
123  {
124  key_actual = expand_statement (key);
125  key_ptr = &key_actual;
126  }
127  else
128  key_ptr = &key;
129 
130  // set name to the expanded key, for debugging purposes
131  record.set_name (*key_ptr);
132 
133  // if the key is possible, create a reference to the record
134  if (*key_ptr != "")
135  {
136  KnowledgeMap::const_iterator found = map_.find (*key_ptr);

    // find () returns end () for an unknown key and end () must not be
    // dereferenced; in that case record_ keeps the value it was constructed
    // with, exactly as it does for an empty key
    if (found != map_.end ())
137  record.record_ = const_cast <KnowledgeRecord *> (&found->second);
138  }
139  return record;
140 }
141 
142 // set the value of a variable
// Stores size bytes from value in the referenced record through set_xml.
// Returns 0 on success, -1 when variable has no record, and -2 when
// settings.always_overwrite is off and the record's write_quality is lower
// than its current quality. On success the record's quality is set to its
// write_quality and mark_and_signal is invoked.
143 int
145  const VariableReference & variable,
146  const char * value, size_t size,
147  const KnowledgeUpdateSettings & settings)
148 {
149 
150  MADARA_GUARD_TYPE guard (mutex_);
151  if (variable.record_)
152  {
153  // check if we have the appropriate write quality
154  if (!settings.always_overwrite &&
155  variable.record_->write_quality < variable.record_->quality)
156  return -2;
157 
158  variable.record_->set_xml (value, size);
159  variable.record_->quality = variable.record_->write_quality;
160 
161  mark_and_signal (variable.name_.get_ptr (), variable.record_, settings);
162  }
163  else
164  return -1;
165 
166  return 0;
167 }
168 
169 // set the value of a variable
// Stores size bytes from value in the referenced record through set_text.
// Returns 0 on success, -1 when variable has no record, and -2 when
// settings.always_overwrite is off and the record's write_quality is lower
// than its current quality. On success the record's quality is set to its
// write_quality and mark_and_signal is invoked.
170 int
172  const VariableReference & variable,
173  const char * value, size_t size,
174  const KnowledgeUpdateSettings & settings)
175 {
176  MADARA_GUARD_TYPE guard (mutex_);
177  if (variable.record_)
178  {
179  // check if we have the appropriate write quality
180  if (!settings.always_overwrite &&
181  variable.record_->write_quality < variable.record_->quality)
182  return -2;
183 
184  variable.record_->set_text (value, size);
185  variable.record_->quality = variable.record_->write_quality;
186 
187  mark_and_signal (variable.name_.get_ptr (), variable.record_, settings);
188  }
189  else
190  return -1;
191 
192  return 0;
193 }
194 
195 // set the value of a variable
// Stores size bytes from value in the referenced record through set_jpeg.
// Returns 0 on success, -1 when variable has no record, and -2 when
// settings.always_overwrite is off and the record's write_quality is lower
// than its current quality. On success the record's quality is set to its
// write_quality and mark_and_signal is invoked.
196 int
198  const VariableReference & variable,
199  const unsigned char * value, size_t size,
200  const KnowledgeUpdateSettings & settings)
201 {
202  MADARA_GUARD_TYPE guard (mutex_);
203  if (variable.record_)
204  {
205  // check if we have the appropriate write quality
206  if (!settings.always_overwrite &&
207  variable.record_->write_quality < variable.record_->quality)
208  return -2;
209 
210  variable.record_->set_jpeg (value, size);
211  variable.record_->quality = variable.record_->write_quality;
212 
213  mark_and_signal (variable.name_.get_ptr (), variable.record_, settings);
214  }
215  else
216  return -1;
217 
218  return 0;
219 }
220 
221 // set the value of a variable
// Stores size bytes from value in the referenced record through set_file.
// Returns 0 on success, -1 when variable has no record, and -2 when
// settings.always_overwrite is off and the record's write_quality is lower
// than its current quality. On success the record's quality is set to its
// write_quality and mark_and_signal is invoked.
222 int
224  const VariableReference & variable,
225  const unsigned char * value, size_t size,
226  const KnowledgeUpdateSettings & settings)
227 {
228  MADARA_GUARD_TYPE guard (mutex_);
229  if (variable.record_)
230  {
231  // check if we have the appropriate write quality
232  if (!settings.always_overwrite &&
233  variable.record_->write_quality < variable.record_->quality)
234  return -2;
235 
236  variable.record_->set_file (value, size);
237  variable.record_->quality = variable.record_->write_quality;
238 
239  mark_and_signal (variable.name_.get_ptr (), variable.record_, settings);
240  }
241  else
242  return -1;
243 
244  return 0;
245 }
246 
247 // set the value of a variable
// Loads the referenced record from filename via the record's read_file and
// returns whatever that call returned. Returns -1 when variable has no
// record and -2 when settings.always_overwrite is off and the record's
// write_quality is lower than its current quality. The quality update and
// mark_and_signal run regardless of read_file's return value.
248 int
250  const VariableReference & variable,
251  const std::string & filename,
252  const KnowledgeUpdateSettings & settings)
253 {
254  int return_value = 0;
255  MADARA_GUARD_TYPE guard (mutex_);
256  if (variable.record_)
257  {
258  // check if we have the appropriate write quality
259  if (!settings.always_overwrite &&
260  variable.record_->write_quality < variable.record_->quality)
261  return -2;
262 
263  return_value = variable.record_->read_file (filename);
264  variable.record_->quality = variable.record_->write_quality;
265 
266  mark_and_signal (variable.name_.get_ptr (), variable.record_, settings);
267  }
268  else
269  return return_value = -1;
270 
271  return return_value;
272 }
273 
// Returns the quality stored for key, or 0 when the (optionally expanded)
// key is not present in map_. The lookup holds mutex_.
276 uint32_t
278  const std::string & key,
279  const KnowledgeReferenceSettings & settings)
280 {
281  // enter the mutex
282  std::string key_actual;
283  const std::string * key_ptr;
284  MADARA_GUARD_TYPE guard (mutex_);
285 
286  if (settings.expand_variables)
287  {
288  key_actual = expand_statement (key);
289  key_ptr = &key_actual;
290  }
291  else
292  key_ptr = &key;
293 
294  // find the key in the knowledge base
295  KnowledgeMap::iterator found = map_.find (*key_ptr);
296 
297  // only an existing entry is read; because of the find () check the
298  // map_[...] access below never creates a new record
299 
300  if (found != map_.end ())
301  return map_[*key_ptr].quality;
302 
303  // default quality is 0
304  return 0;
305 }
306 
// Returns the write_quality stored for key, or 0 when the (optionally
// expanded) key is not present in map_. The lookup holds mutex_.
309 uint32_t
311  const std::string & key,
312  const KnowledgeReferenceSettings & settings)
313 {
314  // enter the mutex
315  std::string key_actual;
316  const std::string * key_ptr;
317  MADARA_GUARD_TYPE guard (mutex_);
318 
319  if (settings.expand_variables)
320  {
321  key_actual = expand_statement (key);
322  key_ptr = &key_actual;
323  }
324  else
325  key_ptr = &key;
326 
327  // find the key in the knowledge base
328  KnowledgeMap::iterator found = map_.find (*key_ptr);
329 
330  // only an existing entry is read; because of the find () check the
331  // map_[...] access below never creates a new record
332 
333  if (found != map_.end ())
334  return map_[*key_ptr].write_quality;
335 
336  // default quality is 0
337  return 0;
338 }
339 
// Raises (or, with force_update, overwrites) the quality stored for key and
// returns the quality held afterwards. An empty key returns 0 without
// touching map_. A key that is not present yet is created and given quality.
342 uint32_t
344  const std::string & key, uint32_t quality,
345  bool force_update,
346  const KnowledgeReferenceSettings & settings)
347 {
348  // enter the mutex
349  std::string key_actual;
350  const std::string * key_ptr;
351  MADARA_GUARD_TYPE guard (mutex_);
352 
353  if (settings.expand_variables)
354  {
355  key_actual = expand_statement (key);
356  key_ptr = &key_actual;
357  }
358  else
359  key_ptr = &key;
360 
361  // check for null key
362  if (*key_ptr == "")
363  return 0;
364 
365  // find the key in the knowledge base
366  KnowledgeMap::iterator found = map_.find (*key_ptr);
367 
368  // create the variable if it has never been written to before
369  // and update its current value quality to the quality parameter
370 
371  if (found == map_.end () || force_update || quality > found->second.quality)
372  map_[*key_ptr].quality = quality;
373 
374  // return current quality
375  return map_[*key_ptr].quality;
376 }
377 
// Sets the write_quality stored for key, creating the entry when needed.
// NOTE(review): unlike the neighbouring accessors there is no empty-key
// check, so an empty key creates/updates the entry stored under "".
379 void
381  const std::string & key, uint32_t quality,
382  const KnowledgeReferenceSettings & settings)
383 {
384  // enter the mutex
385  std::string key_actual;
386  const std::string * key_ptr;
387  MADARA_GUARD_TYPE guard (mutex_);
388 
389  if (settings.expand_variables)
390  {
391  key_actual = expand_statement (key);
392  key_ptr = &key_actual;
393  }
394  else
395  key_ptr = &key;
396 
397  // create the variable if it has never been written to before
398  // and update its local process write quality to the quality parameter
399  map_[*key_ptr].write_quality = quality;
400 }
401 
// Conditionally writes value into the record stored under key.
// Return codes:
//    1  the value was written and mark_and_signal was called
//    0  the stored record already compared equal to value
//   -1  the (optionally expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  same quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// key is new. Apart from the -1 case the record is always created if
// missing, its clock and the context clock_ are raised to clock when clock
// is larger, and its quality is replaced unless the result is -2.
// NOTE(review): the declarator line, the key/value parameters and the
// declaration of rhs are not visible in this listing.
406 int
409  uint32_t quality, uint64_t clock,
410  const KnowledgeUpdateSettings & settings)
411 {
412  int result = 1;
413 
414  // enter the mutex
415  std::string key_actual;
416  const std::string * key_ptr;
417  MADARA_GUARD_TYPE guard (mutex_);
418 
419  if (settings.expand_variables)
420  {
421  key_actual = expand_statement (key);
422  key_ptr = &key_actual;
423  }
424  else
425  key_ptr = &key;
426 
427  // check for null key
428  if (*key_ptr == "")
429  return -1;
430 
431  // find the key in the knowledge base
432  KnowledgeMap::iterator found = map_.find (*key_ptr);
433 
434  // if it's found, then compare the value
435  if (!settings.always_overwrite && found != map_.end ())
436  {
437  // setup a rhs
439  rhs.set_value (value);
440 
441  // if we do not have enough quality to update the variable
442  // return -2
443  if (quality < found->second.quality)
444  result = -2;
445 
446  // if we have the same quality, but our clock value
447  // is less than what we've already seen, then return -3
448  else if (quality == found->second.quality &&
449  clock < found->second.clock)
450  result = -3;
451 
452  // check for value already set
453  else if (found->second == rhs)
454  result = 0;
455  }
456 
457  madara::knowledge::KnowledgeRecord & record = map_[*key_ptr];
458 
459  // if we need to update quality, then update it
460  if (result != -2 && record.quality != quality)
461  record.quality = quality;
462 
463  // if we need to update the variable clock, then update it
464  if (clock > record.clock)
465  record.clock = clock;
466 
467  // if we need to update the global clock, then update it
468  if (clock > this->clock_)
469  this->clock_ = clock;
470 
471  if (result == 1)
472  {
473  // we have a situation where the value needs to be changed
474  record.set_value (value);
475 
476  mark_and_signal (key_ptr->c_str (), &record, settings);
477  }
478 
479  // result is 1 only when the value was actually changed
480  return result;
481 }
482 
// Conditionally writes a double value into the record stored under key.
// Return codes:
//    1  the value was written and mark_and_signal was called
//    0  the stored record already compared equal to value
//   -1  the (optionally expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  same quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// key is new. Apart from the -1 case the record is always created if
// missing, its clock and the context clock_ are raised to clock when clock
// is larger, and its quality is replaced unless the result is -2.
// NOTE(review): the declaration of rhs is not visible in this listing.
487 int
489  const std::string & key, double value,
490  uint32_t quality, uint64_t clock,
491  const KnowledgeUpdateSettings & settings)
492 {
493  int result = 1;
494 
495  // enter the mutex
496  std::string key_actual;
497  const std::string * key_ptr;
498  MADARA_GUARD_TYPE guard (mutex_);
499 
500  if (settings.expand_variables)
501  {
502  key_actual = expand_statement (key);
503  key_ptr = &key_actual;
504  }
505  else
506  key_ptr = &key;
507 
508  // check for null key
509  if (*key_ptr == "")
510  return -1;
511 
512  // find the key in the knowledge base
513  KnowledgeMap::iterator found = map_.find (*key_ptr);
514 
515  // if it's found, then compare the value
516  if (!settings.always_overwrite && found != map_.end ())
517  {
518  // setup a rhs
520  rhs.set_value (value);
521 
522  // if we do not have enough quality to update the variable
523  // return -2
524  if (quality < found->second.quality)
525  result = -2;
526 
527  // if we have the same quality, but our clock value
528  // is less than what we've already seen, then return -3
529  else if (quality == found->second.quality &&
530  clock < found->second.clock)
531  result = -3;
532 
533  // check for value already set
534  else if (found->second == rhs)
535  result = 0;
536  }
537 
538  madara::knowledge::KnowledgeRecord & record = map_[*key_ptr];
539 
540  // if we need to update quality, then update it
541  if (result != -2 && record.quality != quality)
542  record.quality = quality;
543 
544  // if we need to update the variable clock, then update it
545  if (clock > record.clock)
546  record.clock = clock;
547 
548  // if we need to update the global clock, then update it
549  if (clock > this->clock_)
550  this->clock_ = clock;
551 
552  if (result == 1)
553  {
554  // we have a situation where the value needs to be changed
555  record.set_value (value);
556 
557  mark_and_signal (key_ptr->c_str (), &record, settings);
558  }
559 
560  // result is 1 only when the value was actually changed
561  return result;
562 }
563 
// Conditionally writes a string value into the record stored under key.
// Return codes:
//    1  the value was written and mark_and_signal was called
//    0  the stored record already compared equal to value
//   -1  the (optionally expanded) key is empty
//   -2  quality is lower than the stored quality
//   -3  same quality, but clock is older than the stored clock
// The comparisons are skipped when settings.always_overwrite is set or the
// key is new. Apart from the -1 case the record is always created if
// missing, its clock and the context clock_ are raised to clock when clock
// is larger, and its quality is replaced unless the result is -2.
// NOTE(review): the declaration of rhs is not visible in this listing.
568 int
570  const std::string & key, const std::string & value,
571  uint32_t quality, uint64_t clock,
572  const KnowledgeUpdateSettings & settings)
573 {
574  int result = 1;
575 
576  // enter the mutex
577  std::string key_actual;
578  const std::string * key_ptr;
579  MADARA_GUARD_TYPE guard (mutex_);
580 
581  if (settings.expand_variables)
582  {
583  key_actual = expand_statement (key);
584  key_ptr = &key_actual;
585  }
586  else
587  key_ptr = &key;
588 
589  // check for null key
590  if (*key_ptr == "")
591  return -1;
592 
593  // find the key in the knowledge base
594  KnowledgeMap::iterator found = map_.find (*key_ptr);
595 
596  // if it's found, then compare the value
597  if (!settings.always_overwrite && found != map_.end ())
598  {
599  // setup a rhs
601  rhs.set_value (value);
602 
603  // if we do not have enough quality to update the variable
604  // return -2
605  if (quality < found->second.quality)
606  result = -2;
607 
608  // if we have the same quality, but our clock value
609  // is less than what we've already seen, then return -3
610  else if (quality == found->second.quality &&
611  clock < found->second.clock)
612  result = -3;
613 
614  // check for value already set
615  else if (found->second == rhs)
616  result = 0;
617  }
618 
619  madara::knowledge::KnowledgeRecord & record = map_[*key_ptr];
620 
621  // if we need to update quality, then update it
622  if (result != -2 && record.quality != quality)
623  record.quality = quality;
624 
625  // if we need to update the variable clock, then update it
626  if (clock > record.clock)
627  record.clock = clock;
628 
629  // if we need to update the global clock, then update it
630  if (clock > this->clock_)
631  this->clock_ = clock;
632 
633  if (result == 1)
634  {
635  // we have a situation where the value needs to be changed
636  record.set_value (value);
637 
638  // record the modification and notify as requested by settings
639  mark_and_signal (key_ptr->c_str (), &record, settings);
640  }
641  // result is 1 only when the value was actually changed
642  return result;
643 }
644 
645 
// Copies rhs into the record stored under key and advances the context
// clock_ to rhs.clock + 1 when rhs.clock is not behind it.
// Returns -1 for an empty key, -2 when rhs.quality is below the stored
// quality, -3 when the qualities match but rhs.clock is older, and 1
// otherwise. The quality/clock comparison is skipped when
// settings.always_overwrite is set or the key is new.
// NOTE(review): in the "found" branch set_value and mark_and_signal run
// even when result has just been set to -2 or -3, so those codes report a
// stale update but do not prevent it -- confirm this is intended.
650 int
652  const std::string & key, const knowledge::KnowledgeRecord & rhs,
653  const KnowledgeUpdateSettings & settings)
654 {
655  int result = 1;
656 
657  // enter the mutex
658  std::string key_actual;
659  const std::string * key_ptr;
660  MADARA_GUARD_TYPE guard (mutex_);
661 
662  if (settings.expand_variables)
663  {
664  key_actual = expand_statement (key);
665  key_ptr = &key_actual;
666  }
667  else
668  key_ptr = &key;
669 
670  // check for null key
671  if (*key_ptr == "")
672  return -1;
673 
674  // find the key in the knowledge base
675  KnowledgeMap::iterator found = map_.find (*key_ptr);
676 
677  // if it's found, then compare the value
678  if (!settings.always_overwrite && found != map_.end ())
679  {
680  // if we do not have enough quality to update the variable
681  // return -2
682  if (rhs.quality < found->second.quality)
683  result = -2;
684 
685  // if we have the same quality, but our clock value
686  // is less than what we've already seen, then return -3
687  else if (rhs.quality == found->second.quality &&
688  rhs.clock < found->second.clock)
689  result = -3;
690 
691  // if we reach this point, then the record is safe to copy
692  found->second.set_value (rhs);
693 
694  knowledge::KnowledgeRecord & current_value = found->second;
695 
696  mark_and_signal (key_ptr->c_str (), &current_value, settings);
697  }
698  else
699  {
700  // if we reach this point, then we have to create the record
701  knowledge::KnowledgeRecord & current_value = map_[*key_ptr];
702  current_value.set_value (rhs);
703 
704  mark_and_signal (key_ptr->c_str (), &current_value, settings);
705  }
706 
707  // if we need to update the global clock, then update it
708  if (rhs.clock >= this->clock_)
709  this->clock_ = rhs.clock + 1;
710 
711  //if (settings.signal_changes)
712  // changed_.MADARA_CONDITION_NOTIFY_ONE ();
713 
714  // result carries the status code computed above
715  return result;
716 }
717 
// Copies rhs into the record behind target and advances the context clock_
// to rhs.clock + 1 when rhs.clock is not behind it.
// Returns -2 when rhs.quality is below the stored quality, -3 when the
// qualities match but rhs.clock is older, and 1 otherwise.
// NOTE(review): the copy and mark_and_signal only happen when
// settings.always_overwrite is OFF and target is valid; with
// always_overwrite set nothing is written yet 1 is still returned. As in
// the key-based overload, the copy also runs when result is -2 or -3.
// Confirm both are intended.
722 int
724  const VariableReference & target,
725  const knowledge::KnowledgeRecord & rhs,
726  const KnowledgeUpdateSettings & settings)
727 {
728  int result = 1;
729 
730  MADARA_GUARD_TYPE guard (mutex_);
731 
732  // if it's found, then compare the value
733  if (!settings.always_overwrite && target.is_valid ())
734  {
735  // if we do not have enough quality to update the variable
736  // return -2
737  if (rhs.quality < target.record_->quality)
738  result = -2;
739 
740  // if we have the same quality, but our clock value
741  // is less than what we've already seen, then return -3
742  else if (rhs.quality == target.record_->quality &&
743  rhs.clock < target.record_->clock)
744  result = -3;
745 
746  // if we reach this point, then the record is safe to copy
747  target.record_->set_value (rhs);
748 
749  knowledge::KnowledgeRecord & current_value = *target.record_;
750 
751  mark_and_signal (target.name_.get_ptr (), &current_value, settings);
752  }
753 
754  // if we need to update the global clock, then update it
755  if (rhs.clock >= this->clock_)
756  this->clock_ = rhs.clock + 1;
757 
758  // result carries the status code computed above
759  return result;
760 }
761 
// Issues a single notify on changed_ through the
// MADARA_CONDITION_NOTIFY_ONE macro. No guard is taken in this body.
765 void
767 {
768  changed_.MADARA_CONDITION_NOTIFY_ONE ();
769 }
770 
771 // print all variables and their values
// Walks map_ under mutex_ and logs one "key=value" line per record for
// which exists () is true, at the given log level. Array values are
// rendered by to_string with ", " as the delimiter.
772 void
774  unsigned int level) const
775 {
776  MADARA_GUARD_TYPE guard (mutex_);
777  for (madara::knowledge::KnowledgeMap::const_iterator i = map_.begin ();
778  i != map_.end ();
779  ++i)
780  {
781  if (i->second.exists ())
782  {
783  madara_logger_ptr_log (logger_, (int)level, "%s=%s\n",
784  i->first.c_str (), i->second.to_string (", ").c_str ());
785  }
786  }
787 }
788 
789 // print all variables and their values
// Serializes every entry of map_ into target (replacing its previous
// contents) while holding mutex_. Entries are joined with record_delimiter,
// key and value are joined with key_val_delimiter, string-typed values are
// wrapped in single quotes, DOUBLE_ARRAY / INTEGER_ARRAY values are wrapped
// in [ ] with array_delimiter between elements. Unlike the logging variant
// above, records are not filtered with exists ().
790 void
792  std::string & target,
793  const std::string & array_delimiter,
794  const std::string & record_delimiter,
795  const std::string & key_val_delimiter) const
796 {
797  MADARA_GUARD_TYPE guard (mutex_);
798  std::stringstream buffer;
799 
800  bool first = true;
801 
802  for (madara::knowledge::KnowledgeMap::const_iterator i = map_.begin ();
803  i != map_.end ();
804  ++i)
805  {
806  // separate each record with the record_delimiter
807  if (!first)
808  {
809  buffer << record_delimiter;
810  }
811 
812  buffer << i->first;
813 
814  // separate the key/value pairing with the key_val_delimiter
815  buffer << key_val_delimiter;
816 
817  if (i->second.is_string_type ())
818  {
819  buffer << "'";
820  }
821  else if (i->second.type () == i->second.DOUBLE_ARRAY ||
822  i->second.type () == i->second.INTEGER_ARRAY)
823  {
824  buffer << "[";
825  }
826 
827  // use the array_delimiter for the underlying to_string functions
828  buffer << i->second.to_string (array_delimiter);
829 
830  if (i->second.is_string_type ())
831  {
832  buffer << "'";
833  }
834  else if (i->second.type () == i->second.DOUBLE_ARRAY ||
835  i->second.type () == i->second.INTEGER_ARRAY)
836  {
837  buffer << "]";
838  }
839 
840  if (first)
841  first = false;
842  }
843 
844  target = buffer.str ();
845 }
846 
852  const std::string & statement) const
853 {
    // Returns statement with every top-level {...} group replaced by the
    // value of the variable it names. The text inside a group is itself
    // expanded recursively before being looked up with get (), so nested
    // groups such as {a{b}} are resolved inside-out. Characters outside any
    // group are copied through unchanged.
    // If the brace count is non-zero at the end the process is terminated
    // with exit (-1).
    // NOTE(review): subcount is a size_t, so a '}' seen while subcount is 0
    // wraps around instead of going negative -- verify how unmatched closing
    // braces should be handled.
    // NOTE(review): the head of the logging call for the error message below
    // is not visible in this listing.
854  // enter the mutex
855  MADARA_GUARD_TYPE guard (mutex_);
856 
857  // vectors for holding parsed tokens and pivot_list
858  size_t subcount = 0;
859  size_t begin_exp = 0;
860 
861  std::stringstream builder;
862 
863  // iterate over the input string
864  for (std::string::size_type i = 0; i < statement.size (); ++i)
865  {
866  // if this is an open brace, increase the subcount
867  if (statement[i] == '{')
868  {
869  ++subcount;
870  if (subcount == 1)
871  begin_exp = i;
872  }
873  // closed brace should decrease subcount
874  else if (statement[i] == '}')
875  {
876  if (subcount == 1)
877  {
878  std::string expandable =
879  statement.substr (begin_exp + 1, i - begin_exp - 1);
880  std::string results = this->expand_statement (expandable);
881  builder << this->get (results);
882  }
883  --subcount;
884  }
885  // otherwise, if this subcount is 0, then we need to add it to our output
886  // we allow anything not in subcount == 0 to be handled through recursion
887  else
888  {
889  if (subcount == 0)
890  builder << statement[i];
891  }
892  }
893 
894  // check to see if all brace counts are appropriate
895  if (subcount != 0)
896  {
898  "KARL COMPILE ERROR : Improperly matched braces in %s\n",
899  statement.c_str ());
900 
901  exit (-1);
902  }
903 
904  return builder.str ();
905 }
906 
907 #ifndef _MADARA_NO_KARL_
908 
909 // defines a function by name
// Registers func in functions_ under name (expanded first when
// settings.expand_variables is set), replacing any previous entry with that
// name. An empty name is ignored. Runs under mutex_.
// NOTE(review): the declaration of the func parameter is not visible in
// this listing.
910 void
912  const std::string & name,
914  const KnowledgeReferenceSettings & settings)
915 {
916  // enter the mutex
917  std::string key_actual;
918  const std::string * key_ptr;
919  MADARA_GUARD_TYPE guard (mutex_);
920 
921  if (settings.expand_variables)
922  {
923  key_actual = expand_statement (name);
924  key_ptr = &key_actual;
925  }
926  else
927  key_ptr = &name;
928 
929  // check for null key
930  if (*key_ptr == "")
931  return;
932 
933  functions_[*key_ptr] = Function (func);
934 }
935 
// Registers a callback that also receives the function's name as its first
// argument. It is stored in functions_ under name (expanded first when
// settings.expand_variables is set), replacing any previous entry. An empty
// name is ignored. Runs under mutex_.
936 void
938  const std::string & name,
939  knowledge::KnowledgeRecord (*func) (const char * name, FunctionArguments &, Variables &),
940  const KnowledgeReferenceSettings & settings)
941 {
942  // enter the mutex
943  std::string key_actual;
944  const std::string * key_ptr;
945  MADARA_GUARD_TYPE guard (mutex_);
946 
947  if (settings.expand_variables)
948  {
949  key_actual = expand_statement (name);
950  key_ptr = &key_actual;
951  }
952  else
953  key_ptr = &name;
954 
955  // check for null key
956  if (*key_ptr == "")
957  return;
958 
959  functions_[*key_ptr] = Function (func);
960 }
961 
962 #ifdef _MADARA_JAVA_
// Java variant (compiled only with _MADARA_JAVA_): stores a Function built
// from the jobject callable in functions_ under name (expanded first when
// settings.expand_variables is set). An empty name is ignored.
// NOTE(review): the line declaring the name parameter is not visible in
// this listing.
963 void
965  jobject callable,
966  const KnowledgeReferenceSettings & settings)
967 {
968  // enter the mutex
969  std::string key_actual;
970  const std::string * key_ptr;
971  MADARA_GUARD_TYPE guard (mutex_);
972 
973  if (settings.expand_variables)
974  {
975  key_actual = expand_statement (name);
976  key_ptr = &key_actual;
977  }
978  else
979  key_ptr = &name;
980 
981  // check for null key
982  if (*key_ptr == "")
983  return;
984 
985  functions_[*key_ptr] = Function (callable);
986 }
987 #endif
988 
989 #ifdef _MADARA_PYTHON_CALLBACKS_
// Python variant (compiled only with _MADARA_PYTHON_CALLBACKS_): stores a
// Function built from the boost::python::object callable in functions_
// under name (expanded first when settings.expand_variables is set). An
// empty name is ignored.
// NOTE(review): the line declaring the name parameter is not visible in
// this listing.
990 void
992  boost::python::object callable,
993  const KnowledgeReferenceSettings & settings)
994 {
995  // enter the mutex
996  std::string key_actual;
997  const std::string * key_ptr;
998  MADARA_GUARD_TYPE guard (mutex_);
999 
1000  if (settings.expand_variables)
1001  {
1002  key_actual = expand_statement (name);
1003  key_ptr = &key_actual;
1004  }
1005  else
1006  key_ptr = &name;
1007 
1008  // check for null key
1009  if (*key_ptr == "")
1010  return;
1011 
1012  functions_[*key_ptr] = Function (callable);
1013 }
1014 
1015 #endif
1016 
// Convenience variant: compiles the expression text with compile () and
// forwards the result, together with name and settings, to the
// CompiledExpression variant of define_function.
// NOTE(review): the line declaring the name parameter is not visible in
// this listing.
1017 void
1019  const std::string & expression,
1020  const KnowledgeReferenceSettings & settings)
1021 {
1022  CompiledExpression compiled = compile (expression);
1023  define_function (name, compiled, settings);
1024 }
1025 
// Stores a Function built from expression.expression in functions_ under
// name (expanded first when settings.expand_variables is set), replacing
// any previous entry. An empty name is ignored. Runs under mutex_.
// NOTE(review): the line declaring the name parameter is not visible in
// this listing.
1026 void
1028  const CompiledExpression & expression,
1029  const KnowledgeReferenceSettings & settings)
1030 {
1031  // enter the mutex
1032  std::string key_actual;
1033  const std::string * key_ptr;
1034  MADARA_GUARD_TYPE guard (mutex_);
1035 
1036  if (settings.expand_variables)
1037  {
1038  key_actual = expand_statement (name);
1039  key_ptr = &key_actual;
1040  }
1041  else
1042  key_ptr = &name;
1043 
1044  // check for null key
1045  if (*key_ptr == "")
1046  return;
1047 
1048  functions_[*key_ptr] = Function (expression.expression);
1049 }
1050 
1051 
1054  const std::string & name,
1055  const KnowledgeReferenceSettings & settings)
1056 {
    // Returns the address of the functions_ entry stored under name
    // (expanded first when settings.expand_variables is set), or 0 for an
    // empty name. Because the lookup uses functions_[...], asking for a
    // name that was never defined creates a default-constructed entry and
    // returns its address rather than 0.
1057  // enter the mutex
1058  std::string key_actual;
1059  const std::string * key_ptr;
1060  MADARA_GUARD_TYPE guard (mutex_);
1061 
1062  if (settings.expand_variables)
1063  {
1064  key_actual = expand_statement (name);
1065  key_ptr = &key_actual;
1066  }
1067  else
1068  key_ptr = &name;
1069 
1070  // check for null key
1071  if (*key_ptr == "")
1072  return 0;
1073 
1074  return &functions_[*key_ptr];
1075 }
1076 
1077 
1080  const std::string & expression)
1081 {
    // Builds a CompiledExpression from the expression text: the source text
    // is kept in ce.logic and the result of interpreter_->interpret for this
    // context is kept in ce.expression. The message below is logged before
    // mutex_ is taken; interpretation happens under the guard.
    // NOTE(review): the head of the logging call is not visible in this
    // listing.
1083  "ThreadSafeContext::compile:" \
1084  " compiling %s\n", expression.c_str ());
1085 
1086  MADARA_GUARD_TYPE guard (mutex_);
1087  CompiledExpression ce;
1088  ce.logic = expression;
1089  ce.expression = interpreter_->interpret (*this, expression);
1090 
1091  return ce;
1092 }
1093 
1096  CompiledExpression expression,
1097  const KnowledgeUpdateSettings & settings)
1098 {
    // Evaluates an already compiled expression with the given settings while
    // holding mutex_ and returns the result of that evaluation. The
    // expression argument is taken by value.
1099  MADARA_GUARD_TYPE guard (mutex_);
1100  return expression.expression.evaluate (settings);
1101 }
1102 
1106  const KnowledgeUpdateSettings & settings)
1107 {
    // Evaluates the expression tree rooted at root under mutex_ when root is
    // non-null.
    // NOTE(review): the declaration of root and the statement of the else
    // branch are not visible in this listing.
1108  MADARA_GUARD_TYPE guard (mutex_);
1109  if (root)
1110  return root->evaluate (settings);
1111  else
1113 }
1114 
1115 #endif // _MADARA_NO_KARL_
1116 
// Fills target with the values of the variables named subject<start>
// through subject<end> (inclusive), in index order, and returns the number
// of elements stored. target is cleared first and stays empty when
// end < start. Each value is read with get () while mutex_ is held.
// NOTE(review): start is an unsigned int incremented until it exceeds end,
// so end == UINT_MAX would never terminate -- confirm callers cannot pass it.
1117 size_t
1119  const std::string & subject,
1120  unsigned int start,
1121  unsigned int end,
1122  std::vector <KnowledgeRecord> & target)
1123 {
1124  target.clear ();
1125 
1126  // enter the mutex
1127  MADARA_GUARD_TYPE guard (mutex_);
1128 
1129  if (end >= start)
1130  {
1131  target.resize (end - start + 1);
1132 
1133  for (unsigned int i = 0; start <= end; ++start, ++i)
1134  {
1135  std::stringstream buffer;
1136  buffer << subject;
1137  buffer << start;
1138  target[i] = get (buffer.str ());
1139  }
1140  }
1141 
1142  return target.size ();
1143 }
1144 
1145 
// Copies every variable whose name starts with the given expression into
// target and returns the number of entries copied. A single trailing '*'
// in expression is treated as a wildcard marker and stripped before
// matching. An empty expression copies the whole map. target is cleared
// first; the scan of map_ happens under mutex_.
1146 size_t
1148  const std::string & expression,
1149  std::map <std::string, knowledge::KnowledgeRecord> & target)
1150 {
1151  target.clear ();
1152 
1153  std::string subject (expression);
1154  bool matches_found (false);
1155 
1156  // remove the wildcard and make this into a subject. The emptiness check
    // matters: an empty expression is a supported input (see below), and
    // indexing position size () - 1 of an empty string is out of range.
1157  if (!subject.empty () && subject[subject.size () - 1] == '*')
1158  subject.resize (subject.size () - 1);
1159 
1160  // just in case a string implementation does not inline
1161  std::string::size_type subject_size = subject.size ();
1162  const char * subject_ptr = subject.c_str ();
1163 
1164  // enter the mutex
1165  MADARA_GUARD_TYPE guard (mutex_);
1166 
1167  // if expression is blank, assume the user wants all variables
1168  if (expression.size () == 0)
1169  target = map_;
1170  else
1171  {
1172  for (KnowledgeMap::iterator i = map_.begin ();
1173  i != map_.end (); ++i)
1174  {
1175  if (i->first.size () >= subject_size)
1176  {
1177  int result = strncmp (i->first.c_str (), subject_ptr, subject_size);
1178  if (result == 0)
1179  {
1180  // we have a match, add this to the map
1181  target[i->first] = i->second;
1182  matches_found = true;
1183  }
1184  else if (matches_found)
1185  {
1186  // we have already found matches, and now we're not seeing matches
1187  break;
1188  }
1189  }
1190  }
1191  }
1192 
1193 
1194  return target.size ();
1195 }
1196 
1197 
// Collects references to every variable whose name starts with prefix and,
// when suffix is non-empty, also ends with suffix. matches is resized to
// exactly the number of hits and filled in map order; it is cleared when
// nothing at or after prefix exists in map_. Two passes are used so that
// matches is sized once: the first counts, the second fills.
1198 void
1200  const std::string & prefix,
1201  const std::string & suffix,
1202  VariableReferences & matches)
1203 {
    // map_ is iterated twice below; hold the context lock like the other
    // accessors in this file do so the map cannot change between passes
    MADARA_GUARD_TYPE guard (mutex_);

1204  // get the first thing that either matches the prefix or is just after it
1205  KnowledgeMap::iterator i = map_.lower_bound (prefix);
1206 
1207  // if we have a valid prefix, then there is more to do
1208  if (i != map_.end ())
1209  {
1210  // keep track of the beginning as we're going to iterate twice
1211  KnowledgeMap::iterator first_match = i;
1212  KnowledgeMap::iterator after_matches = map_.end ();
1213  VariableReferences::iterator match;
1214 
1215  size_t num_matches = 0;
1216 
1217  size_t prefix_length = prefix.length ();
1218 
1219  // Iterate over all of the prefix matches
1220  while (i != map_.end () &&
1221  i->first.compare (0, prefix_length, prefix) == 0)
1222  {
    // test the CURRENT entry before advancing: checking after ++i would
    // inspect the following element (possibly end ()) and make the count
    // disagree with the fill pass below
1223  if (suffix == "" || utility::ends_with (i->first, suffix))
1224  {
1225  ++num_matches;
1226  }
1227  ++i;
1228  }
1229 
1230  // save the end point so we can do fewer checks
1231  after_matches = i;
1232 
1233  // resize the matches to the appropriate size
1234  matches.resize (num_matches);
1235 
1236  // now, instead of many resizes, we are just going to resize once and set
1237  i = first_match;
1238  num_matches = 0;
1239 
1240  match = matches.begin ();
1241 
1242  // Reiterate
1243  while (i != after_matches)
1244  {
1245  if (suffix == "" || utility::ends_with (i->first, suffix))
1246  {
1247  match->set_name (i->first);
1248  match->record_ = &i->second;
1249  ++match;
1250  }
1251  ++i;
1252  }
1253  }
1254  else
1255  {
1256  matches.clear ();
1257  }
1258 }
1259 
1260 
// Scans map_ (under mutex_) for keys that begin with prefix and, when
// suffix is non-empty, end with suffix.
//  - result receives a copy of each matching key/record pair unless
//    just_keys is set.
//  - next_keys receives the distinct sub-key that follows
//    prefix + delimiter in each matching key, i.e. the next level of a
//    delimiter-separated hierarchy. Only consecutive duplicates are
//    suppressed, via last_key.
// Both output containers are cleared first. The return value is
// result.size (), so it is 0 whenever just_keys is set.
1261 size_t
1263  const std::string & prefix,
1264  const std::string & delimiter,
1265  const std::string & suffix,
1266  std::vector <std::string> & next_keys,
1267  std::map <std::string, knowledge::KnowledgeRecord> & result,
1268  bool just_keys)
1269 {
1270  // clear the user provided maps
1271  next_keys.clear ();
1272  result.clear ();
1273 
1274  // loop tracking for optimizations
1275  bool matches_found (false);
1276  std::string last_key ("");
1277 
1278  // enter the mutex
1279  MADARA_GUARD_TYPE guard (mutex_);
1280 
1281  KnowledgeMap::iterator i = map_.begin ();
1282 
1283  if (prefix != "")
1284  {
1285  i = map_.lower_bound (prefix);
1286  }
1287 
1288  for (; i != map_.end (); ++i)
1289  {
1290  // if the prefix doesn't match
1291  if (prefix != "" && !utility::begins_with (i->first, prefix))
1292  {
1293  // if we had previously matched a prefix, we're done
1294  if (matches_found)
1295  {
1296  break;
1297  }
1298  }
1299  // we have a prefix match
1300  else
1301  {
1302  // set matches found if it hasn't been set previously
1303  if (!matches_found)
1304  {
1305  matches_found = true;
1306  }
1307 
1308  // if the suffix is provided and doesn't match, continue
1309  if (suffix != "" && !utility::ends_with (i->first, suffix))
1310  {
1311  continue;
1312  }
1313 
1314  if (!just_keys)
1315  {
1316  // the key is safe to add to the master map
1317  result[i->first] = i->second;
1318  }
1319 
1320  // determine if there is a next key in the hierarchy
1321  size_t prefix_end = prefix.length () + delimiter.length ();
1322 
1323  std::string current_delimiter = i->first.substr (prefix.length (), delimiter.length ());
1324 
1325  if (current_delimiter == delimiter && i->first.length () > prefix_end)
1326  {
1327  // find the end of the sub key
1328  size_t key_end = i->first.find (delimiter, prefix_end);
1329 
1330  // if we haven't seen the subkey, add it
    // (when no further delimiter exists key_end is npos and substr simply
    // takes the rest of the key)
1331  std::string current_key (
1332  i->first.substr (prefix_end, key_end - prefix_end));
1333  if (current_key != last_key)
1334  {
1335  next_keys.push_back (current_key);
1336  last_key = current_key;
1337  }
1338  }
1339  }
1340  }
1341 
1342 
1343 
1344  return result.size ();
1345 }
1346 
// Removes every variable whose name begins with prefix: first from map_
// (using the range reported by get_prefix_range), then the matching
// entries of changed_map_ and local_changed_map_ so that neither keeps
// referring to an erased record. Runs under mutex_.
1347 void
1349 const std::string & prefix,
1351 {
1352  // enter the mutex
1353  MADARA_GUARD_TYPE guard (mutex_);
1354 
1355  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1356  iters (get_prefix_range (prefix));
1357 
1358  map_.erase (iters.first, iters.second);
1359 
1360  {
1361  // check the changed map
1362  std::pair<KnowledgeRecords::iterator, KnowledgeRecords::iterator>
1363  changed (changed_map_.lower_bound (prefix), changed_map_.end ());
1364 
1365  // does our lower bound actually contain the prefix? lower_bound may
    // return end (), which must not be dereferenced, so test that first
1366  if (changed.first != changed_map_.end () &&
    madara::utility::begins_with (changed.first->first, prefix))
1367  {
1368  changed.second = changed.first;
1369 
1370  // until we find an entry that does not begin with prefix, loop;
    // the end () test has to come before the dereference
1371  for (++changed.second;
1372  changed.second != changed_map_.end () &&
1373  madara::utility::begins_with (changed.second->first, prefix);
1374  ++changed.second);
1375 
1376  changed_map_.erase (changed.first, changed.second);
1377  }
1378  }
1379 
1380  {
1381  // check the local changed map
1382  std::pair<KnowledgeRecords::iterator, KnowledgeRecords::iterator>
1383  local_changed (local_changed_map_.lower_bound (prefix),
1384  local_changed_map_.end ());
1385 
1386 
1387  // does our lower bound actually contain the prefix? (end () checked
    // first, as above)
1388  if (local_changed.first != local_changed_map_.end () &&
    madara::utility::begins_with (local_changed.first->first, prefix))
1389  {
1390  local_changed.second = local_changed.first;
1391 
1392  // until we find an entry that does not begin with prefix, loop
1393  for (++local_changed.second;
1394  local_changed.second != local_changed_map_.end () &&
1395  madara::utility::begins_with (local_changed.second->first, prefix);
1396  ++local_changed.second);
1397 
1398  local_changed_map_.erase (local_changed.first, local_changed.second);
1399  }
1400  }
1401 }
1402 
// Returns a pair of iterators into map_ delimiting the half-open range of
// entries whose keys start with `prefix`. An empty prefix yields the range
// covering the whole map. This function does not acquire mutex_ itself.
1403 std::pair<madara::knowledge::KnowledgeMap::iterator,
1404  madara::knowledge::KnowledgeMap::iterator>
1406  const std::string &prefix)
1407 {
1408  std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1409  ret(map_.begin(), map_.end());
1410 
1411  // An empty prefix keeps the default range spanning the entire map
1412  if(prefix.size() > 0)
1413  {
// NOTE(review): psz is signed (ssize_t) while prefix.size () and the
// compare () length argument are unsigned; the conversions are implicit.
1414  ssize_t psz = prefix.size();
1415 
1416  // Find first element >= prefix; i.e., first match or first with that prefix
1417  ret.second = ret.first = map_.lower_bound(prefix);
1418 
1419  // Advance ret.second until it is just past last element with prefix (or at end)
1420  while(ret.second != map_.end() &&
1421  ret.second->first.compare(0, psz, prefix) == 0)
1422  ++ret.second;
1423  }
1424  return ret;
1425 }
1426 
// Const overload: returns a pair of const_iterators into map_ delimiting the
// half-open range of entries whose keys start with `prefix`. An empty prefix
// yields the range covering the whole map. This function does not acquire
// mutex_ itself.
1427 std::pair<madara::knowledge::KnowledgeMap::const_iterator,
1428  madara::knowledge::KnowledgeMap::const_iterator>
1430  const std::string &prefix) const
1431 {
1432  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1433  ret(map_.begin(), map_.end());
1434 
1435  // An empty prefix keeps the default range spanning the entire map
1436  if(prefix.size() > 0)
1437  {
// NOTE(review): psz is signed (ssize_t) while prefix.size () and the
// compare () length argument are unsigned; the conversions are implicit.
1438  ssize_t psz = prefix.size ();
1439 
1440  // Find first element >= prefix; i.e., first match or first with that prefix
1441  ret.second = ret.first = map_.lower_bound(prefix);
1442 
1443  // Advance ret.second until it is just past last element with prefix (or at end)
1444  while(ret.second != map_.end() &&
1445  ret.second->first.compare(0, psz, prefix) == 0)
1446  ++ret.second;
1447  }
1448  return ret;
1449 }
1450 
// Builds and returns a new KnowledgeMap from the entries of map_ whose keys
// start with `prefix`, while holding mutex_.
1453  const std::string &prefix) const
1454 {
1455  // enter the mutex
1456  MADARA_GUARD_TYPE guard (mutex_);
1457 
1458  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1459  iters(get_prefix_range(prefix));
1460 
1461  // RVO should avoid copying this map
// presumably deep_iterate wraps the iterators so records are deep-copied
// during range construction -- TODO confirm against its definition
1462  return KnowledgeMap(deep_iterate(iters.first), deep_iterate(iters.second));
1463 }
1464 
// Builds and returns a new KnowledgeMap from the entries of map_ whose keys
// start with `prefix`, with the first prefix.size () characters removed from
// each key in the result. Runs while holding mutex_.
1467  const std::string &prefix) const
1468 {
1469  // enter the mutex
1470  MADARA_GUARD_TYPE guard (mutex_);
1471 
1472  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1473  iters(get_prefix_range(prefix));
1474 
1475  // NRVO should avoid copying this map
1476  KnowledgeMap ret;
// NOTE(review): hint is never read; ret.end () is passed as the hint below.
1477  KnowledgeMap::iterator hint = ret.begin();
1478  for(;iters.first != iters.second; ++iters.first)
1479  {
// source keys arrive in sorted order, so each stripped key is inserted with
// end () as the position hint
1480  ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1481  iters.first->second);
1482  }
1483  return ret;
1484 }
1485 
// Copies records from `source` into this context's map_.
//  - settings.clear_knowledge: map_ is cleared before anything is copied.
//  - settings.predicates non-empty: for each predicate, source entries whose
//    key starts with predicate.prefix (and, when predicate.suffix is not
//    empty, also ends with it) are inserted, or overwrite an existing entry.
//  - no predicates: every entry of source.map_ is passed to map_.insert.
// NOTE(review): no mutex guard is taken on either context in this block.
1486 void
1488  const ThreadSafeContext & source,
1489  const KnowledgeRequirements & settings)
1490 {
1492  "ThreadSafeContext::copy:" \
1493  " copying a context\n");
1494 
1495  if (settings.clear_knowledge)
1496  {
1498  "ThreadSafeContext::copy:" \
1499  " clearing knowledge in target context\n");
1500 
1501  map_.clear ();
1502  }
1503 
1504  if (settings.predicates.size () != 0)
1505  {
// NOTE(review): `auto predicate` copies each predicate per iteration.
1506  for (auto predicate : settings.predicates)
1507  {
1508  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1509  iters(source.get_prefix_range(predicate.prefix));
1510 
1511  if (predicate.suffix == "")
1512  {
1514  "ThreadSafeContext::copy:" \
1515  " matching predicate.prefix=%s\n", predicate.prefix.c_str ());
1516 
1517  for(;iters.first != iters.second; ++iters.first)
1518  {
1520  "ThreadSafeContext::copy:" \
1521  " looking for %s\n", iters.first->first.c_str ());
1522 
// lower_bound gives either the existing entry or the insertion position
1523  auto where = map_.lower_bound(iters.first->first);
1524 
1525  if (where == map_.end() || where->first != iters.first->first)
1526  {
1528  "ThreadSafeContext::copy:" \
1529  " inserting %s\n", iters.first->first.c_str ());
1530 
1531  where = map_.emplace_hint(where,
1532  iters.first->first, iters.first->second);
1533  }
1534  else
1535  {
1537  "ThreadSafeContext::copy:" \
1538  " overwriting %s\n", iters.first->first.c_str ());
1539 
1540  where->second = iters.first->second;
1541  }
1542  }
1543  }
1544  else // we need to match a suffix
1545  {
1547  "ThreadSafeContext::copy:" \
1548  " matching predicate.suffix=%s\n", predicate.suffix.c_str ());
1549 
1550  for(;iters.first != iters.second; ++iters.first)
1551  {
1552  if (madara::utility::ends_with (iters.first->first,
1553  predicate.suffix))
1554  {
1556  "ThreadSafeContext::copy:" \
1557  " looking for %s\n", iters.first->first.c_str ());
1558 
// same insert-or-overwrite sequence as the prefix-only branch above
1559  auto where = map_.lower_bound(iters.first->first);
1560 
1561  if (where == map_.end() || where->first != iters.first->first)
1562  {
1564  "ThreadSafeContext::copy:" \
1565  " inserting %s\n", iters.first->first.c_str ());
1566 
1567  where = map_.emplace_hint(where,
1568  iters.first->first, iters.first->second);
1569  }
1570  else
1571  {
1573  "ThreadSafeContext::copy:" \
1574  " overwriting %s\n", iters.first->first.c_str ());
1575 
1576  where->second = iters.first->second;
1577  }
1578  } // end suffix match
1579  }
1580  }
1581  }
1582  }
1583  // we need to insert everything from source into this
1584  else
1585  {
1586 
1587  std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1588  iters (source.map_.begin (), source.map_.end ());
1589 
1590  for(;iters.first != iters.second; ++iters.first)
1591  {
// NOTE(review): if KnowledgeMap::insert follows std::map semantics, keys
// already present in map_ keep their old value here, whereas the predicate
// branches overwrite -- confirm which behavior is intended. map_.begin ()
// is also used as the position hint for every key.
1592  map_.insert (map_.begin (), KnowledgeMap::value_type(
1593  iters.first->first, iters.first->second));
1594  }
1595  }
1596 }
1597 
// Copies records from `source` into this context's map_.
//  - clean_copy: map_ is cleared first.
//  - copy_set empty: every entry of source.map_ is assigned into map_.
//  - otherwise: only keys listed in copy_set that exist in source.map_ are
//    assigned into map_.
// Assignment through map_[key] overwrites any existing value for that key.
// NOTE(review): no mutex guard is taken on either context in this block.
1598 void
1600  const ThreadSafeContext & source,
1601  const CopySet & copy_set,
1602  bool clean_copy)
1603 {
1604  // if we need to clean first, clear the map
1605  if (clean_copy)
1606  map_.clear ();
1607 
1608  // if the copy set is empty, copy everything
1609  if (copy_set.size () == 0)
1610  {
1611  for (KnowledgeMap::const_iterator i = source.map_.begin ();
1612  i != source.map_.end (); ++i)
1613  {
1614  map_[i->first] = (i->second);
1615  }
1616  }
1617  else
1618  {
1619  // we have a copy set, so only copy what the user asked for
1620  for (CopySet::const_iterator key = copy_set.begin ();
1621  key != copy_set.end (); ++key)
1622  {
1623  // check source for existence of the current copy set key
1624  KnowledgeMap::const_iterator i = source.map_.find (key->first);
1625 
1626  // if found, make a copy of the found entry
1627  if (i != source.map_.end ())
1628  {
1629  map_[i->first] = (i->second);
1630  }
1631  }
1632  }
1633 }
1634 
// Writes a binary checkpoint of this context to `filename`: a FileHeader, a
// transport::MessageHeader, then every record for which exists () is true.
// `id` is copied into the file header's originator field.
// Returns meta.size (also when the file could not be opened).
// NOTE(review): total_written is accumulated but never returned or read.
// NOTE(review): fwrite/fseek/fclose results are not checked.
1635 int64_t
1637  const std::string & filename,
1638  const std::string & id) const
1639 {
1641  "ThreadSafeContext::save_context:" \
1642  " opening file %s\n", filename.c_str ());
1643 
1644  int64_t total_written (0);
1645  FILE * file = fopen (filename.c_str (), "wb");
1646 
1647  FileHeader meta;
1648  meta.states = 1;
// copy at most sizeof (meta.originator) bytes of id, including its NUL
// NOTE(review): when id.size () + 1 exceeds sizeof (meta.originator),
// strncpy fills the field without writing a terminating NUL.
1649  strncpy (meta.originator, id.c_str (),
1650  sizeof (meta.originator) < id.size () + 1 ?
1651  sizeof (meta.originator) : id.size () + 1);
1652 
1653  transport::MessageHeader checkpoint_header;
1654 
1655  if (file)
1656  {
1657  int64_t max_buffer (102800);
1658  int64_t buffer_remaining (max_buffer);
1659  utility::ScopedArray <char> buffer = new char [max_buffer];
1660 
1661  char * current = buffer.get_ptr ();
1662 
1664  "ThreadSafeContext::save_context:" \
1665  " generating file meta\n");
1666 
1667  meta.size += checkpoint_header.encoded_size ();
1668  checkpoint_header.size = checkpoint_header.encoded_size ();
1669 
// placeholder headers; they are rewritten at the end once sizes are final
1670  current = meta.write (current, buffer_remaining);
1671 
// NOTE(review): clock_ is read here, before mutex_ is acquired below.
1672  checkpoint_header.clock = clock_;
1673 
1674  current = checkpoint_header.write (current, buffer_remaining);
1675 
1677  "ThreadSafeContext::save_context:" \
1678  " writing records\n");
1679 
1680  // lock the context
1681  MADARA_GUARD_TYPE guard (mutex_);
1682 
1683  for (KnowledgeMap::const_iterator i = map_.begin ();
1684  i != map_.end (); ++i)
1685  {
1686  if (i->second.exists ())
1687  {
1688  // get the encoded size of the record for checking buffer boundaries
1689  int64_t encoded_size = i->second.get_encoded_size (i->first);
1690  ++checkpoint_header.updates;
1691  meta.size += encoded_size;
1692  checkpoint_header.size += encoded_size;
1693 
// record does not fit in what is left: flush the used part of the buffer
// to the file and start again from the front of the buffer
1694  if (encoded_size > buffer_remaining)
1695  {
1700  current = buffer.get_ptr ();
1701  fwrite (current,
1702  (size_t)(max_buffer - buffer_remaining), 1, file);
1703  total_written += (int64_t)(max_buffer - buffer_remaining);
1704  buffer_remaining = max_buffer;
1705 
// record is larger than the whole buffer: replace the buffer with one
// exactly as large as the record
1706  if (encoded_size > max_buffer)
1707  {
1712  buffer = new char[encoded_size];
1713  max_buffer = encoded_size;
1714  buffer_remaining = max_buffer;
1715  current = buffer.get_ptr ();
1716  } // end if larger than buffer
1717  } // end if larger than buffer remaining
1718 
1719  current = i->second.write (current, i->first, buffer_remaining);
1720  }
1721  }
1722 
// flush whatever is still pending in the buffer
1723  if (buffer_remaining != max_buffer)
1724  {
1725  fwrite (buffer.get_ptr (),
1726  (size_t) (max_buffer - buffer_remaining), 1, file);
1727  total_written += (int64_t) (max_buffer - buffer_remaining);
1728  }
1729 
1730  // update the meta data at the front
1731  fseek (file, 0, SEEK_SET);
1732 
1733  current = buffer.get_ptr ();
1734  buffer_remaining = max_buffer;
1735 
// re-encode both headers, now carrying the final sizes and update count
1736  current = meta.write (current, buffer_remaining);
1737  current = checkpoint_header.write (current, buffer_remaining);
1738 
1739  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
1740 
1741  fclose (file);
1742  }
1743 
1744  return meta.size;
1745 }
1746 
// Writes a binary checkpoint of this context to settings.filename using a
// single buffer of settings.buffer_size bytes: FileHeader, MessageHeader,
// then every existing record (restricted to settings.prefixes when that list
// is not empty). The buffer is passed through settings.encode () and the
// encoded bytes are written in one fwrite.
// Returns meta.size (also when the file could not be opened).
// NOTE(review): unlike the (filename, id) overload, nothing here compares a
// record's encoded size against buffer_remaining before writing it; whether
// record write () guards against overrun is not visible here.
1747 int64_t
1749  const CheckpointSettings & settings) const
1750 {
1752  "ThreadSafeContext::save_context:" \
1753  " opening file %s\n", settings.filename.c_str ());
1754 
1755  //int64_t total_written (0);
1756  FILE * file = fopen (settings.filename.c_str (), "wb");
1757 
1758  FileHeader meta;
1759  meta.states = 1;
// NOTE(review): when the originator needs more than sizeof (meta.originator)
// bytes, strncpy fills the field without writing a terminating NUL.
1760  strncpy (meta.originator, settings.originator.c_str (),
1761  sizeof (meta.originator) < settings.originator.size () + 1 ?
1762  sizeof (meta.originator) : settings.originator.size () + 1);
1763 
1764  transport::MessageHeader checkpoint_header;
1765 
1766  if (file)
1767  {
1768  int64_t max_buffer (settings.buffer_size);
1769  int64_t buffer_remaining (max_buffer);
1770  utility::ScopedArray <char> buffer = new char [max_buffer];
1771 
1772  char * current = buffer.get_ptr ();
1773 
1775  "ThreadSafeContext::save_context:" \
1776  " generating file meta\n");
1777 
1778  meta.size += checkpoint_header.encoded_size ();
1779  checkpoint_header.size = checkpoint_header.encoded_size ();
1780 
// caller-supplied timestamps replace whatever FileHeader was constructed with
1781  if (settings.override_timestamp)
1782  {
1783  meta.initial_timestamp = settings.initial_timestamp;
1784  meta.last_timestamp = settings.last_timestamp;
1785  }
1786 
// placeholder headers; both are written again after the record loop
1787  current = meta.write (current, buffer_remaining);
1788 
1789  if (settings.override_lamport)
1790  {
1791  checkpoint_header.clock = settings.initial_lamport_clock;
1792  }
1793  else
1794  {
// NOTE(review): clock_ is read here, before mutex_ is acquired below.
1795  checkpoint_header.clock = clock_;
1796  }
1797 
1798  current = checkpoint_header.write (current, buffer_remaining);
1799 
1801  "ThreadSafeContext::save_context:" \
1802  " writing records\n");
1803 
1804  // lock the context
1805  MADARA_GUARD_TYPE guard (mutex_);
1806 
1807  for (KnowledgeMap::const_iterator i = map_.begin ();
1808  i != map_.end (); ++i)
1809  {
1810  if (i->second.exists ())
1811  {
1812  // check if the prefix is allowed
1813  if (settings.prefixes.size () > 0)
1814  {
1816  "ThreadSafeContext::save_context:" \
1817  " we have %d prefixes to check against.\n",
1818  (int)settings.prefixes.size ());
1819 
// linear scan of the allowed prefixes; stops at the first match
1820  bool prefix_found = false;
1821  for (size_t j = 0;
1822  j < settings.prefixes.size () && !prefix_found; ++j)
1823  {
1825  "ThreadSafeContext::save_context:" \
1826  " checking record %s against prefix %s.\n",
1827  i->first.c_str (),
1828  settings.prefixes[j].c_str ());
1829 
1831  i->first, settings.prefixes[j]))
1832  {
1834  "ThreadSafeContext::save_context:" \
1835  " record has the correct prefix.\n");
1836 
1837  prefix_found = true;
1838  }
1839  }
1840 
1841  if (!prefix_found)
1842  {
1844  "ThreadSafeContext::save_context:" \
1845  " record has the wrong prefix. Rejected.\n");
1846 
1847  continue;
1848  }
1849  }
1850 
1851  // get the encoded size of the record for checking buffer boundaries
1852  int64_t encoded_size = i->second.get_encoded_size (i->first);
1853  ++checkpoint_header.updates;
1854  meta.size += encoded_size;
1855  checkpoint_header.size += encoded_size;
1856 
1857  current = i->second.write (current, i->first, buffer_remaining);
1858  }
1859  }
1860 
1861  // write the final sizes
// NOTE(review): max_buffer is handed to write () in the position where
// buffer_remaining is passed elsewhere; if write () decrements that
// argument, max_buffer is smaller than the real buffer size by the time it
// reaches encode () below -- confirm against write ()'s signature.
1862  current = meta.write (buffer.get_ptr (), max_buffer);
1863  current = checkpoint_header.write (current, max_buffer);
1864 
1865  // call encode with any buffer filters
1866  int total = settings.encode ((unsigned char *)buffer.get_ptr (),
1867  (int)meta.size, (int)max_buffer);
1868 
1869  // update the meta data at the front
// (nothing has been written to the file before this point in this overload)
1870  fseek (file, 0, SEEK_SET);
1871 
1873  "ThreadSafeContext::save_context:" \
1874  " encoding with buffer filters: %d:%d bytes written.\n",
1875  (int)meta.size, (int)checkpoint_header.size);
1876 
1877  fwrite (buffer.get_ptr (), (size_t)total, 1, file);
1878 
1879  fclose (file);
1880  }
1881 
1882  return meta.size;
1883 }
1884 
1885 
1886 
// Writes every existing record of this context to `filename` as text, one
// `key=value;` statement per line. String values are wrapped in double
// quotes, integer/double arrays in square brackets. Binary file records are
// written to a separate file next to `filename` and referenced with
// #read_file ('path').
// Returns the number of characters in the generated text, or 0 when the
// file could not be opened.
// NOTE(review): keys and string values are streamed without any escaping.
1887 int64_t
1889 const std::string & filename) const
1890 {
1892  "ThreadSafeContext::save_as_karl:" \
1893  " opening file %s\n", filename.c_str ());
1894 
1895  int64_t bytes_written (0);
1896  std::stringstream buffer;
1897  std::ofstream file;
1898  file.open (filename.c_str ());
1899 
1900  if (file.is_open ())
1901  {
1902  // lock the context
1903  MADARA_GUARD_TYPE guard (mutex_);
1904 
1905  for (KnowledgeMap::const_iterator i = map_.begin ();
1906  i != map_.end (); ++i)
1907  {
1908  if (i->second.exists ())
1909  {
1910  buffer << i->first;
1911  buffer << "=";
1912 
1913  if (!i->second.is_binary_file_type ())
1914  {
1915  // record is a non binary file type
1916  if (i->second.is_string_type ())
1917  {
1918  // strings require quotation marks
1919  buffer << "\"";
1920  }
1921  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
1922  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1923  {
1924  // arrays require brackets
1925  buffer << "[";
1926  }
1927 
1928  buffer << i->second;
// close whichever delimiter was opened before the value
1929  if (i->second.is_string_type ())
1930  {
1931  // strings require quotation marks
1932  buffer << "\"";
1933  }
1934  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
1935  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
1936  {
1937  // arrays require brackets
1938  buffer << "]";
1939  }
1940  }
1941  else
1942  {
1943  buffer << "#read_file ('";
1944 
// side file path: <directory of filename, or "."> / <key> + extension
1945  std::string path = utility::extract_path (filename);
1946 
1947  if (path == "")
1948  path = ".";
1949 
1950  path += "/";
1951  path += i->first;
1952 
1953  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
1954  {
1955  path += ".jpg";
1956  }
1957  else
1958  {
1959  path += ".dat";
1960  }
1961 
// dump the record's raw bytes into the side file
1962  utility::write_file (path,
1963  (void *)&(*i->second.file_value_)[0], i->second.size ());
1964  buffer << path;
1965 
1966 
1967  buffer << "')";
1968  }
1969 
1970  buffer << ";\n";
1971  }
1972  }
1973 
1974  std::string result = buffer.str ();
1975  file << result;
1976 
// reports the size of the generated text, not a count confirmed by the stream
1977  bytes_written = (int64_t) result.size ();
1978 
1979  file.close ();
1980  }
1981 
1982  return bytes_written;
1983 }
1984 
1985 
// Writes existing records of this context to settings.filename as text, one
// `key=value;` statement per line. When settings.prefixes is not empty, only
// records matching one of those prefixes are written. String values are
// wrapped in double quotes, integer/double arrays in square brackets. Binary
// file records are written to a separate file next to settings.filename and
// referenced with #read_file ('path').
// Returns the number of characters in the generated text, or 0 when the
// file could not be opened.
// NOTE(review): keys and string values are streamed without any escaping.
1986 int64_t
1988 const CheckpointSettings & settings) const
1989 {
1991  "ThreadSafeContext::save_as_karl:" \
1992  " opening file %s\n", settings.filename.c_str ());
1993 
1994  int64_t bytes_written (0);
1995  std::stringstream buffer;
1996  std::ofstream file;
1997  file.open (settings.filename.c_str ());
1998 
1999  if (file.is_open ())
2000  {
2001  // lock the context
2002  MADARA_GUARD_TYPE guard (mutex_);
2003 
2004  for (KnowledgeMap::const_iterator i = map_.begin ();
2005  i != map_.end (); ++i)
2006  {
2007  if (i->second.exists ())
2008  {
2009  // check if the prefix is allowed
2010  if (settings.prefixes.size () > 0)
2011  {
2013  "ThreadSafeContext::save_as_karl:" \
2014  " we have %d prefixes to check against.\n",
2015  (int)settings.prefixes.size ());
2016 
// linear scan of the allowed prefixes; stops at the first match
2017  bool prefix_found = false;
2018  for (size_t j = 0;
2019  j < settings.prefixes.size () && !prefix_found; ++j)
2020  {
2022  "ThreadSafeContext::save_as_karl:" \
2023  " checking record %s against prefix %s.\n",
2024  i->first.c_str (),
2025  settings.prefixes[j].c_str ());
2026 
2028  i->first, settings.prefixes[j]))
2029  {
2031  "ThreadSafeContext::save_as_karl:" \
2032  " the record has the correct prefix.\n");
2033 
2034  prefix_found = true;
2035  }
2036  }
2037 
2038  if (!prefix_found)
2039  {
2041  "ThreadSafeContext::save_as_karl:" \
2042  " the record does not have a correct prefix.\n");
2043 
2044  continue;
2045  }
2046  }
2047 
2048  buffer << i->first;
2049  buffer << "=";
2050 
2051  if (!i->second.is_binary_file_type ())
2052  {
2053  // record is a non binary file type
2054  if (i->second.is_string_type ())
2055  {
2056  // strings require quotation marks
2057  buffer << "\"";
2058  }
2059  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2060  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2061  {
2062  // arrays require brackets
2063  buffer << "[";
2064  }
2065 
2066  buffer << i->second;
// close whichever delimiter was opened before the value
2067  if (i->second.is_string_type ())
2068  {
2069  // strings require quotation marks
2070  buffer << "\"";
2071  }
2072  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2073  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2074  {
2075  // arrays require brackets
2076  buffer << "]";
2077  }
2078  }
2079  else
2080  {
2081  buffer << "#read_file ('";
2082 
// side file path: <directory of settings.filename, or "."> / <key> + extension
2083  std::string path = utility::extract_path (settings.filename);
2084 
2085  if (path == "")
2086  path = ".";
2087 
2088  path += "/";
2089  path += i->first;
2090 
2091  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
2092  {
2093  path += ".jpg";
2094  }
2095  else
2096  {
2097  path += ".dat";
2098  }
2099 
// dump the record's raw bytes into the side file
2100  utility::write_file (path,
2101  (void *)&(*i->second.file_value_)[0], i->second.size ());
2102  buffer << path;
2103 
2104 
2105  buffer << "')";
2106  }
2107 
2108  buffer << ";\n";
2109  }
2110  }
2111 
2112  std::string result = buffer.str ();
2113  file << result;
2114 
// reports the size of the generated text, not a count confirmed by the stream
2115  bytes_written = (int64_t) result.size ();
2116 
2117  file.close ();
2118  }
2119 
2120  return bytes_written;
2121 }
2122 
2123 
// Writes every existing record of this context to `filename` as a single
// brace-delimited object of `"key" : value` members. String values are
// wrapped in double quotes, integer/double arrays in square brackets. Binary
// file records are written to a separate file next to `filename` and
// referenced with #read_file ('path').
// Returns the number of characters in the generated text, or 0 when the
// file could not be opened.
// NOTE(review): keys and string values are streamed without JSON escaping,
// and the #read_file (...) text is emitted unquoted, so the output is not
// guaranteed to parse as JSON.
2124 int64_t
2126 const std::string & filename) const
2127 {
2129  "ThreadSafeContext::save_as_json:" \
2130  " opening file %s\n", filename.c_str ());
2131 
2132  int64_t bytes_written (0);
2133 
2134  std::stringstream buffer;
2135  std::ofstream file;
2136  file.open (filename.c_str ());
2137 
2138  if (file.is_open ())
2139  {
2140  // lock the context
2141  MADARA_GUARD_TYPE guard (mutex_);
2142 
2143  buffer << "{\n";
2144 
2145  for (KnowledgeMap::const_iterator i = map_.begin ();
2146  i != map_.end (); ++i)
2147  {
2148  if (i->second.exists ())
2149  {
2150  buffer << " \"";
2151  buffer << i->first;
2152  buffer << "\" : ";
2153 
2154  if (!i->second.is_binary_file_type ())
2155  {
2156  // record is a non binary file type
2157  if (i->second.is_string_type ())
2158  {
2159  // strings require quotation marks
2160  buffer << "\"";
2161  }
2162  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2163  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2164  {
2165  // arrays require brackets
2166  buffer << "[";
2167  }
2168 
2169  buffer << i->second;
// close whichever delimiter was opened before the value
2170  if (i->second.is_string_type ())
2171  {
2172  // strings require quotation marks
2173  buffer << "\"";
2174  }
2175  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2176  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2177  {
2178  // arrays require brackets
2179  buffer << "]";
2180  }
2181  }
2182  else
2183  {
2184  buffer << "#read_file ('";
2185 
// side file path: <directory of filename, or "."> / <key> + extension
2186  std::string path = utility::extract_path (filename);
2187 
2188  if (path == "")
2189  path = ".";
2190 
2191  path += "/";
2192  path += i->first;
2193 
2194  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
2195  {
2196  path += ".jpg";
2197  }
2198  else
2199  {
2200  path += ".dat";
2201  }
2202 
// dump the record's raw bytes into the side file
2203  utility::write_file (path,
2204  (void *)&(*i->second.file_value_)[0], i->second.size ());
2205  buffer << path;
2206 
2207 
2208  buffer << "')";
2209  }
2210 
// NOTE(review): a separator is appended whenever any later entry exists in
// map_, even if every later entry fails exists () and is never written;
// that leaves a trailing comma before the closing brace.
2211  KnowledgeMap::const_iterator j (i);
2212 
2213  if (++j != map_.end ())
2214  buffer << ",\n";
2215  }
2216  }
2217 
2218  buffer << "\n}\n";
2219 
2220  std::string result = buffer.str ();
2221  file << result;
2222 
// reports the size of the generated text, not a count confirmed by the stream
2223  bytes_written = (int64_t) result.size ();
2224 
2225  file.close ();
2226  }
2227 
2228  return bytes_written;
2229 }
2230 
2231 
// Writes existing records of this context to settings.filename as a single
// brace-delimited object of `"key" : value` members. When settings.prefixes
// is not empty, only records matching one of those prefixes are written.
// String values are wrapped in double quotes, integer/double arrays in
// square brackets. Binary file records are written to a separate file next
// to settings.filename and referenced with #read_file ('path').
// Returns the number of characters in the generated text, or 0 when the
// file could not be opened.
// NOTE(review): keys and string values are streamed without JSON escaping,
// and the #read_file (...) text is emitted unquoted, so the output is not
// guaranteed to parse as JSON.
2232 int64_t
2234 const CheckpointSettings & settings) const
2235 {
2237  "ThreadSafeContext::save_as_json:" \
2238  " opening file %s\n", settings.filename.c_str ());
2239 
2240  int64_t bytes_written (0);
2241 
2242  std::stringstream buffer;
2243  std::ofstream file;
2244  file.open (settings.filename.c_str ());
2245 
2246  if (file.is_open ())
2247  {
2248  // lock the context
2249  MADARA_GUARD_TYPE guard (mutex_);
2250 
2251  buffer << "{\n";
2252 
2253  for (KnowledgeMap::const_iterator i = map_.begin ();
2254  i != map_.end (); ++i)
2255  {
2256  if (i->second.exists ())
2257  {
2258  // check if the prefix is allowed
2259  if (settings.prefixes.size () > 0)
2260  {
2262  "ThreadSafeContext::save_as_json:" \
2263  " we have %d prefixes to check against.\n",
2264  (int)settings.prefixes.size ());
2265 
// linear scan of the allowed prefixes; stops at the first match
2266  bool prefix_found = false;
2267  for (size_t j = 0;
2268  j < settings.prefixes.size () && !prefix_found; ++j)
2269  {
2271  "ThreadSafeContext::save_as_json:" \
2272  " checking record %s against prefix %s.\n",
2273  i->first.c_str (),
2274  settings.prefixes[j].c_str ());
2275 
2277  i->first, settings.prefixes[j]))
2278  {
2280  "ThreadSafeContext::save_as_json:" \
2281  " the record has the correct prefix.\n");
2282 
2283  prefix_found = true;
2284  }
2285  }
2286 
2287  if (!prefix_found)
2288  {
2290  "ThreadSafeContext::save_as_json:" \
2291  " the record does not have a correct prefix.\n");
2292 
2293  continue;
2294  }
2295  }
2296 
2297  buffer << " \"";
2298  buffer << i->first;
2299  buffer << "\" : ";
2300 
2301  if (!i->second.is_binary_file_type ())
2302  {
2303  // record is a non binary file type
2304  if (i->second.is_string_type ())
2305  {
2306  // strings require quotation marks
2307  buffer << "\"";
2308  }
2309  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2310  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2311  {
2312  // arrays require brackets
2313  buffer << "[";
2314  }
2315 
2316  buffer << i->second;
// close whichever delimiter was opened before the value
2317  if (i->second.is_string_type ())
2318  {
2319  // strings require quotation marks
2320  buffer << "\"";
2321  }
2322  else if (i->second.type () == knowledge::KnowledgeRecord::INTEGER_ARRAY ||
2323  i->second.type () == knowledge::KnowledgeRecord::DOUBLE_ARRAY)
2324  {
2325  // arrays require brackets
2326  buffer << "]";
2327  }
2328  }
2329  else
2330  {
2331  buffer << "#read_file ('";
2332 
// side file path: <directory of settings.filename, or "."> / <key> + extension
2333  std::string path = utility::extract_path (settings.filename);
2334 
2335  if (path == "")
2336  path = ".";
2337 
2338  path += "/";
2339  path += i->first;
2340 
2341  if (i->second.type () == knowledge::KnowledgeRecord::IMAGE_JPEG)
2342  {
2343  path += ".jpg";
2344  }
2345  else
2346  {
2347  path += ".dat";
2348  }
2349 
// dump the record's raw bytes into the side file
2350  utility::write_file (path,
2351  (void *)&(*i->second.file_value_)[0], i->second.size ());
2352  buffer << path;
2353 
2354 
2355  buffer << "')";
2356  }
2357 
// NOTE(review): a separator is appended whenever any later entry exists in
// map_, even if every later entry is skipped (fails exists () or the prefix
// filter); that leaves a trailing comma before the closing brace.
2358  KnowledgeMap::const_iterator j (i);
2359 
2360  if (++j != map_.end ())
2361  buffer << ",\n";
2362  }
2363  }
2364 
2365  buffer << "\n}\n";
2366 
2367  std::string result = buffer.str ();
2368  file << result;
2369 
// reports the size of the generated text, not a count confirmed by the stream
2370  bytes_written = (int64_t) result.size ();
2371 
2372  file.close ();
2373  }
2374 
2375  return bytes_written;
2376 }
2377 
2378 
2379 
// Reads a binary checkpoint from `filename`. After the file header passes
// FileHeader::file_header_test, `id` is set to the header's originator and
// each record of each state is decoded and applied through
// update_record_from_external with `settings`.
// Returns the number of bytes read with fread (0 when the file could not be
// opened). A file that fails the header test leaves the context unchanged.
// NOTE(review): no mutex guard is taken in this block; whether
// update_record_from_external locks internally is not visible here.
2380 int64_t
2382  const std::string & filename, std::string & id,
2383  const KnowledgeUpdateSettings & settings)
2384 {
2386  "ThreadSafeContext::load_context:" \
2387  " opening file %s\n", filename.c_str ());
2388 
2389  // open the checkpoint file for binary reading via fopen
2390  FILE * file = fopen (filename.c_str (), "rb");
2391 
2392  int64_t total_read (0);
2393 
2394  if (file)
2395  {
2396  int64_t max_buffer (102800);
2397  int64_t buffer_remaining (max_buffer);
2398  FileHeader meta;
2399 
2400  utility::ScopedArray <char> buffer = new char [max_buffer];
2401  const char * current = buffer.get_ptr ();
2402 
2404  "ThreadSafeContext::load_context:" \
2405  " reading file meta data\n");
2406 
// first read: up to max_buffer bytes from the start of the file
2407  total_read = fread (buffer.get_ptr (),
2408  1, max_buffer, file);
2409  buffer_remaining = (int64_t)total_read;
2410 
2411  if (total_read > FileHeader::encoded_size () &&
2412  FileHeader::file_header_test (current))
2413  {
2414  // if there was something in the file, and it was the right header
2415 
2416  current = meta.read (current, buffer_remaining);
2417  id = meta.originator;
2418 
2424  if (meta.states > 0)
2425  {
2426  for (uint64_t state = 0; state < meta.states; ++state)
2427  {
2428  if (buffer_remaining > (int64_t)
2430  {
2431  transport::MessageHeader checkpoint_header;
2432 
2433  current = checkpoint_header.read (current, buffer_remaining);
2434 
// the checkpoint is larger than what is currently buffered: move the
// unread bytes into a buffer of checkpoint_header.size and read the rest
2440  if (checkpoint_header.size > (uint64_t) buffer_remaining)
2441  {
2446  utility::ScopedArray <char> new_buffer =
2447  new char [checkpoint_header.size];
2448  memcpy (new_buffer.get_ptr (), current,
2449  (size_t)buffer_remaining);
2450 
2451  // read the rest of checkpoint into new buffer
2452  total_read += fread (new_buffer.get_ptr () + buffer_remaining, 1,
2453  checkpoint_header.size
2454  - (uint64_t)buffer_remaining
2455  - checkpoint_header.encoded_size (), file);
2456 
2457  // update other variables
2458  max_buffer = checkpoint_header.size;
2459  buffer_remaining = checkpoint_header.size
2460  - checkpoint_header.encoded_size ();
2461  current = new_buffer.get_ptr ();
// NOTE(review): current keeps pointing into new_buffer's storage after this
// assignment; this relies on ScopedArray assignment handing ownership to
// buffer rather than copying or freeing -- confirm.
2462  buffer = new_buffer;
2463  } // end if allocation is needed
2464 
// decode each record of this state and apply it to the context
2465  for (uint32_t update = 0;
2466  update < checkpoint_header.updates; ++update)
2467  {
2468  std::string key;
2470  current = record.read (current, key, buffer_remaining);
2471  update_record_from_external (key, record, settings);
2472  }
2473 
2474  } // end if enough buffer for reading a message header
2475 
// buffer fully consumed but the header says more data exists: refill
2476  if (buffer_remaining == 0 && (uint64_t) total_read < meta.size)
2477  {
2478  buffer_remaining = max_buffer;
2479  current = buffer.get_ptr ();
2480  total_read += fread (buffer.get_ptr (), 1, buffer_remaining, file);
2481  }
2482  } // end for loop of states
2483  }
2484  } // end if total_read > 0
2485  else
2486  {
2488  "ThreadSafeContext::load_context:" \
2489  " invalid file. No contextual change.\n");
2490  }
2491 
2492  fclose (file);
2493  }
2494 
2495  return total_read;
2496 }
2497 
// Reads a binary checkpoint from `filename`. After the file header passes
// FileHeader::file_header_test, the header is decoded into the caller's
// `meta` and each record of each state is decoded and applied through
// update_record_from_external with `settings`.
// Returns the number of bytes read with fread (0 when the file could not be
// opened). A file that fails the header test leaves the context unchanged.
// NOTE(review): no mutex guard is taken in this block; whether
// update_record_from_external locks internally is not visible here.
2498 int64_t
2500  const std::string & filename,
2501  FileHeader & meta,
2502  const KnowledgeUpdateSettings & settings)
2503 {
2505  "ThreadSafeContext::load_context:" \
2506  " opening file %s\n", filename.c_str ());
2507 
2508  // open the checkpoint file for binary reading via fopen
2509  FILE * file = fopen (filename.c_str (), "rb");
2510 
2511  int64_t total_read (0);
2512 
2513  if (file)
2514  {
2515  int64_t max_buffer (102800);
2516  int64_t buffer_remaining (max_buffer);
2517 
2518  utility::ScopedArray <char> buffer = new char[max_buffer];
2519  const char * current = buffer.get_ptr ();
2520 
2522  "ThreadSafeContext::load_context:" \
2523  " reading file meta data\n");
2524 
// first read: up to max_buffer bytes from the start of the file
2525  total_read = fread (buffer.get_ptr (),
2526  1, max_buffer, file);
2527  buffer_remaining = (int64_t)total_read;
2528 
2529  if (total_read > FileHeader::encoded_size () &&
2530  FileHeader::file_header_test (current))
2531  {
2532  // if there was something in the file, and it was the right header
2533 
2534  current = meta.read (current, buffer_remaining);
2535 
2541  if (meta.states > 0)
2542  {
2543  for (uint64_t state = 0; state < meta.states; ++state)
2544  {
2545  if (buffer_remaining > (int64_t)
2547  {
2548  transport::MessageHeader checkpoint_header;
2549 
2550  current = checkpoint_header.read (current, buffer_remaining);
2551 
// the checkpoint is larger than what is currently buffered: move the
// unread bytes into a buffer of checkpoint_header.size and read the rest
2557  if (checkpoint_header.size > (uint64_t)buffer_remaining)
2558  {
2563  utility::ScopedArray <char> new_buffer =
2564  new char[checkpoint_header.size];
2565  memcpy (new_buffer.get_ptr (), current,
2566  (size_t)buffer_remaining);
2567 
2568  // read the rest of checkpoint into new buffer
2569  total_read += fread (new_buffer.get_ptr () + buffer_remaining, 1,
2570  checkpoint_header.size
2571  - (uint64_t)buffer_remaining
2572  - checkpoint_header.encoded_size (), file);
2573 
2574  // update other variables
2575  max_buffer = checkpoint_header.size;
2576  buffer_remaining = checkpoint_header.size
2577  - checkpoint_header.encoded_size ();
2578  current = new_buffer.get_ptr ();
// NOTE(review): current keeps pointing into new_buffer's storage after this
// assignment; this relies on ScopedArray assignment handing ownership to
// buffer rather than copying or freeing -- confirm.
2579  buffer = new_buffer;
2580  } // end if allocation is needed
2581 
// decode each record of this state and apply it to the context
2582  for (uint32_t update = 0;
2583  update < checkpoint_header.updates; ++update)
2584  {
2585  std::string key;
2587  current = record.read (current, key, buffer_remaining);
2588  update_record_from_external (key, record, settings);
2589  }
2590 
2591  } // end if enough buffer for reading a message header
2592 
// buffer fully consumed but the header says more data exists: refill
2593  if (buffer_remaining == 0 && (uint64_t)total_read < meta.size)
2594  {
2595  buffer_remaining = max_buffer;
2596  current = buffer.get_ptr ();
2597  total_read += fread (buffer.get_ptr (), 1, buffer_remaining, file);
2598  }
2599  } // end for loop of states
2600  }
2601  } // end if total_read > 0
2602  else
2603  {
2605  "ThreadSafeContext::load_context:" \
2606  " invalid file. No contextual change.\n");
2607  }
2608 
2609  fclose (file);
2610  }
2611 
2612  return total_read;
2613 }
2614 
2615 
2616 int64_t
2618  CheckpointSettings & checkpoint_settings,
2619  const KnowledgeUpdateSettings & update_settings)
2620 {
2622  "ThreadSafeContext::load_context:" \
2623  " opening file %s\n", checkpoint_settings.filename.c_str ());
2624 
2625  // using ACE for writing to the destination file
2626  FILE * file = fopen (checkpoint_settings.filename.c_str (), "rb");
2627 
2628  int64_t total_read (0);
2629 
2630  if (checkpoint_settings.clear_knowledge)
2631  {
2632  this->clear ();
2633  }
2634 
2635  if (file)
2636  {
2637  FileHeader meta;
2638  int64_t max_buffer (checkpoint_settings.buffer_size);
2639  int64_t buffer_remaining (max_buffer);
2640 
2641  utility::ScopedArray <char> buffer = new char[max_buffer];
2642  const char * current = buffer.get_ptr ();
2643 
2644  total_read = fread (buffer.get_ptr (),
2645  1, max_buffer, file);
2646  buffer_remaining = (int64_t)total_read;
2647 
2649  "ThreadSafeContext::load_context:" \
2650  " reading file: %d bytes read.\n",
2651  (int)total_read);
2652 
2653  // call decode with any buffer filters
2654  checkpoint_settings.decode ((unsigned char *)buffer.get_ptr (),
2655  (int)(total_read), (int)max_buffer);
2656 
2657  if (total_read > FileHeader::encoded_size () &&
2658  FileHeader::file_header_test (current))
2659  {
2660  // if there was something in the file, and it was the right header
2661 
2662  current = meta.read (current, buffer_remaining);
2663 
2664  checkpoint_settings.initial_timestamp = meta.initial_timestamp;
2665  checkpoint_settings.last_timestamp = meta.last_timestamp;
2666  checkpoint_settings.originator = meta.originator;
2667  checkpoint_settings.states = meta.states;
2668  checkpoint_settings.version = utility::to_string_version (
2669  meta.karl_version);
2670 
2672  "ThreadSafeContext::load_context:" \
2673  " read File meta. Meta.size=%d\n", (int)meta.size);
2674 
2680  if (meta.states > 0)
2681  {
2682  for (uint64_t state = 0; state < meta.states &&
2683  state <= checkpoint_settings.last_state; ++state)
2684  {
2685  if (buffer_remaining > (int64_t)
2687  {
2688  transport::MessageHeader checkpoint_header;
2689 
2690  current = checkpoint_header.read (current, buffer_remaining);
2691 
2692  if (state == 0)
2693  {
2694  checkpoint_settings.initial_lamport_clock = checkpoint_header.clock;
2695  }
2696 
2697  if (state == meta.states - 1)
2698  {
2699  checkpoint_settings.last_lamport_clock = checkpoint_header.clock;
2700  }
2701 
2702  uint64_t updates_size = checkpoint_header.size -
2703  checkpoint_header.encoded_size ();
2704 
2706  "ThreadSafeContext::load_context:" \
2707  " read Checkpoint header. header.size=%d, updates.size=%d\n",
2708  (int)checkpoint_header.size, (int)updates_size);
2709 
2715  if (updates_size > (uint64_t)buffer_remaining)
2716  {
2721  utility::ScopedArray <char> new_buffer =
2722  new char[updates_size];
2723  memcpy (new_buffer.get_ptr (), current,
2724  (size_t)buffer_remaining);
2725 
2726  // read the rest of checkpoint into new buffer
2727  total_read += fread (new_buffer.get_ptr () + buffer_remaining, 1,
2728  updates_size
2729  - (uint64_t)buffer_remaining
2730  - checkpoint_header.encoded_size (), file);
2731 
2732  // update other variables
2733  max_buffer = updates_size;
2734  buffer_remaining = checkpoint_header.size
2735  - checkpoint_header.encoded_size ();
2736  current = new_buffer.get_ptr ();
2737  buffer = new_buffer;
2738  } // end if allocation is needed
2739 
2741  "ThreadSafeContext::load_context:" \
2742  " state=%d, initial_state=%d, last_state=%d\n",
2743  (int)state, (int)checkpoint_settings.initial_state,
2744  (int)checkpoint_settings.last_state);
2745 
2746  if (state <= checkpoint_settings.last_state &&
2747  state >= checkpoint_settings.initial_state)
2748  {
2749  for (uint32_t update = 0;
2750  update < checkpoint_header.updates; ++update)
2751  {
2752  std::string key;
2754  current = record.read (current, key, buffer_remaining);
2755 
2757  "ThreadSafeContext::load_context:" \
2758  " read record (%d of %d): %s\n",
2759  (int)update, (int)checkpoint_header.updates, key.c_str ());
2760 
2761  // check if the prefix is allowed
2762  if (checkpoint_settings.prefixes.size () > 0)
2763  {
2764  bool prefix_found = false;
2765  for (size_t j = 0; j < checkpoint_settings.prefixes.size ()
2766  && !prefix_found; ++j)
2767  {
2769  "ThreadSafeContext::load_context:" \
2770  " checking record %s against prefix %s\n",
2771  key.c_str (), checkpoint_settings.prefixes[j].c_str ());
2772 
2774  key, checkpoint_settings.prefixes[j]))
2775  {
2777  "ThreadSafeContext::load_context:" \
2778  " record has the correct prefix.\n");
2779 
2780  prefix_found = true;
2781  } // end if prefix success
2782  } // end for all prefixes
2783 
2784  if (!prefix_found)
2785  {
2787  "ThreadSafeContext::load_context:" \
2788  " record does not have the correct prefix. Rejected.\n");
2789 
2790  continue;
2791  } // end if prefix found
2792  } // end if there are prefixes in the checkpoint settings
2793 
2794  update_record_from_external (key, record, update_settings);
2795  } // end for all updates
2796  } // end if state is within acceptable range
2797  else
2798  {
2800  "ThreadSafeContext::load_context:" \
2801  " not a valid state, incrementing by %d bytes.\n",
2802  (int)updates_size);
2803 
2804  current += updates_size;
2805  }
2806  } // end if enough buffer for reading a message header
2807 
2808  if (buffer_remaining == 0 && (uint64_t)total_read < meta.size)
2809  {
2810  buffer_remaining = max_buffer;
2811  current = buffer.get_ptr ();
2812  total_read += fread (buffer.get_ptr (), 1, buffer_remaining, file);
2813  }
2814  } // end for loop of states
2815  }
2816  } // end if total_read > 0
2817  else
2818  {
2820  "ThreadSafeContext::load_context:" \
2821  " invalid file. No contextual change.\n");
2822  }
2823 
2824  fclose (file);
2825  }
2826 
2827  return total_read;
2828 }
2829 
2830 
2831 int64_t
2833  const CheckpointSettings & settings) const
2834 {
2836  "ThreadSafeContext::save_checkpoint:" \
2837  " opening file %s\n", settings.filename.c_str ());
2838 
2839  int64_t total_written (0);
2840  FILE * file = fopen (settings.filename.c_str (), "rb+");
2841 
2842  FileHeader meta;
2843  transport::MessageHeader checkpoint_header;
2844 
2845  if (file)
2846  {
2847  int64_t max_buffer (settings.buffer_size);
2848  int64_t buffer_remaining (max_buffer);
2849  utility::ScopedArray <char> buffer = new char [max_buffer];
2850 
2851  char * current = buffer.get_ptr ();
2852  const char * meta_reader = current;
2853 
2854  // read the meta data at the front
2855  fseek (file, 0, SEEK_SET);
2856  size_t ret = fread (current, meta.encoded_size (), 1, file);
2857  if (ret == 0) {
2859  "ThreadSafeContext::save_checkpoint:" \
2860  " failed to read existing file header: size=%d\n",
2861  (int)meta.encoded_size ());
2862 
2863  return -1;
2864  }
2865 
2866  meta_reader = meta.read (meta_reader, buffer_remaining);
2867 
2869  "ThreadSafeContext::save_checkpoint:" \
2870  " init file meta: size=%d, states=%d\n",
2871  (int)meta.size, (int)meta.states);
2872 
2873  if (settings.originator != "")
2874  {
2876  "ThreadSafeContext::save_checkpoint:" \
2877  " setting file meta id to %s\n",
2878  settings.originator.c_str ());
2879 
2880  strncpy (meta.originator, settings.originator.c_str (),
2881  sizeof (meta.originator) < settings.originator.size () + 1 ?
2882  sizeof (meta.originator) : settings.originator.size () + 1);
2883  }
2884 
2885  // save the spot where the file ends
2886  uint64_t checkpoint_start = meta.size;
2887 
2888  checkpoint_header.size = checkpoint_header.encoded_size ();
2889 
2891  "ThreadSafeContext::save_checkpoint:" \
2892  " meta.size=%d, chkpt.header.size=%d \n",
2893  (int)meta.size, (int)checkpoint_header.size);
2894 
2895  if (settings.override_timestamp)
2896  {
2897  meta.initial_timestamp = settings.initial_timestamp;
2898  meta.last_timestamp = settings.last_timestamp;
2899  }
2900 
2901  if (settings.override_lamport)
2902  {
2903  checkpoint_header.clock = settings.initial_lamport_clock;
2904  }
2905  else
2906  {
2907  checkpoint_header.clock = clock_;
2908  }
2909 
2910  const knowledge::KnowledgeRecords & local_records = this->get_local_modified ();
2911 
2912  if (local_records.size () != 0)
2913  {
2914  // skip over the checkpoint header. We'll write this later with the records
2915 
2917  "ThreadSafeContext::save_checkpoint:" \
2918  " fseek set to %d\n",
2919  (int)(checkpoint_start));
2920 
2921  // set the file pointer to the checkpoint header start
2922  fseek (file, checkpoint_start, SEEK_SET);
2923 
2924  // start updates just past the checkpoint header's buffer location
2925  current = checkpoint_header.write (buffer.get_ptr (), buffer_remaining);
2926 
2928  "ThreadSafeContext::save_checkpoint:" \
2929  " chkpt.header.size=%d, current->buffer delta=%d\n",
2930  (int) checkpoint_header.encoded_size (),
2931  (int)(current - buffer.get_ptr ()));
2932 
2933  {
2934  // lock the context
2935  MADARA_GUARD_TYPE guard (mutex_);
2936 
2937  for (KnowledgeRecords::const_iterator i = local_records.begin ();
2938  i != local_records.end (); ++i)
2939  {
2940  if (i->second->exists ())
2941  {
2942  // check if the prefix is allowed
2943  if (settings.prefixes.size () > 0)
2944  {
2945  bool prefix_found = false;
2946  for (size_t j = 0;
2947  j < settings.prefixes.size () && !prefix_found; ++j)
2948  {
2950  i->second->to_string (), settings.prefixes[j]))
2951  {
2952  prefix_found = true;
2953  }
2954  } // end for j->prefixes.size
2955 
2956  if (!prefix_found)
2957  continue;
2958  } // end if prefixes exists
2959 
2960  // get the encoded size of the record for checking buffer boundaries
2961  int64_t encoded_size = i->second->get_encoded_size (i->first);
2962 
2964  "ThreadSafeContext::save_checkpoint:" \
2965  " estimated encoded size of update=%d bytes\n",
2966  (int)encoded_size);
2967 
2968  if (encoded_size > buffer_remaining)
2969  {
2970  fwrite (current,
2971  (size_t)(max_buffer - buffer_remaining), 1, file);
2972  total_written += (int64_t)(max_buffer - buffer_remaining);
2973  buffer_remaining = max_buffer;
2974 
2976  "ThreadSafeContext::save_checkpoint:" \
2977  " encoded_size larger than remaining buffer. Flushing\n");
2978 
2979  if (encoded_size > max_buffer)
2980  {
2985  buffer = new char[encoded_size];
2986  max_buffer = encoded_size;
2987  buffer_remaining = max_buffer;
2988  current = buffer.get_ptr ();
2989 
2991  "ThreadSafeContext::save_checkpoint:" \
2992  " encoded_size larger than entire buffer. Reallocating\n");
2993  } // end if larger than buffer
2994  } // end if larger than buffer remaining
2995 
2996  current = i->second->write (current, i->first, buffer_remaining);
2997 
2998  checkpoint_header.size += (uint64_t)encoded_size;
2999  ++checkpoint_header.updates;
3000 
3002  "ThreadSafeContext::save_checkpoint:" \
3003  " chkpt.header.size=%d, current->buffer delta=%d\n",
3004  (int)checkpoint_header.size, (int)(current - buffer.get_ptr ()));
3005  } // if record exists
3006  } // for all records
3007 
3008  ++meta.states;
3009 
3010  if (settings.reset_checkpoint)
3011  {
3013  "ThreadSafeContext::save_checkpoint:" \
3014  " resetting checkpoint. Next checkpoint starts fresh here\n");
3015 
3016  reset_checkpoint ();
3017  }
3018  } // end scoped Guard for context
3019 
3021  "ThreadSafeContext::save_checkpoint:" \
3022  " writing final data for state #%d\n",
3023  (int)meta.states);
3024 
3025  if (buffer_remaining != max_buffer)
3026  {
3027  fwrite (buffer.get_ptr (),
3028  (size_t)(current - buffer.get_ptr ()), 1, file);
3029  total_written += (size_t)(current - buffer.get_ptr ());
3030 
3032  "ThreadSafeContext::save_checkpoint:" \
3033  " current->buffer=%d bytes, max->remaining=%d bytes\n",
3034  (int)(current - buffer.get_ptr ()), (int)max_buffer - buffer_remaining);
3035  }
3036 
3038  "ThreadSafeContext::save_checkpoint:" \
3039  " chkpt.header: size=%d, updates=%d\n",
3040  (int)checkpoint_header.size, (int)checkpoint_header.updates);
3041 
3042  buffer_remaining = max_buffer;
3043  fseek (file, checkpoint_start, SEEK_SET);
3044  current = checkpoint_header.write (buffer.get_ptr (), buffer_remaining);
3045  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
3046 
3047  meta.size += checkpoint_header.size;
3048 
3050  "ThreadSafeContext::save_checkpoint:" \
3051  " new file meta: size=%d, states=%d, lastchkpt.size=%d\n",
3052  (int)meta.size, (int)meta.states, (int)checkpoint_header.size);
3053 
3054  // update the meta data at the front
3055  fseek (file, 0, SEEK_SET);
3056 
3058  "ThreadSafeContext::save_checkpoint:" \
3059  " updating file meta data in the file\n");
3060 
3061  buffer_remaining = max_buffer;
3062  current = meta.write (buffer.get_ptr (), buffer_remaining);
3063 
3064  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
3065 
3066  } // if there are local checkpointing records
3067 
3068  fclose (file);
3069  } // if file is opened
3070  else
3071  {
3073  "ThreadSafeContext::save_checkpoint:" \
3074  " checkpoint doesn't exist. Creating.\n");
3075 
3076  // someone wants to save the checkpoint diff to a new file
3077  file = fopen (settings.filename.c_str (), "wb");
3078 
3079  meta.states = 1;
3080  strncpy (meta.originator, settings.originator.c_str (),
3081  sizeof (meta.originator) < settings.originator.size () + 1 ?
3082  sizeof (meta.originator) : settings.originator.size () + 1);
3083 
3084  if (file)
3085  {
3086  int64_t max_buffer (settings.buffer_size);
3087  int64_t buffer_remaining (max_buffer);
3088  utility::ScopedArray <char> buffer = new char [max_buffer];
3089 
3090  char * current = buffer.get_ptr ();
3091 
3093  "ThreadSafeContext::save_checkpoint:" \
3094  " creating file meta. file.meta.size=%d, state.size=%d\n",
3095  (int)meta.size, (int)checkpoint_header.encoded_size ());
3096 
3097  meta.size += checkpoint_header.encoded_size ();
3098  checkpoint_header.size = checkpoint_header.encoded_size ();
3099 
3100  if (settings.override_timestamp)
3101  {
3102  meta.initial_timestamp = settings.initial_timestamp;
3103  meta.last_timestamp = settings.last_timestamp;
3104  }
3105 
3106  current = meta.write (current, buffer_remaining);
3107 
3108  if (settings.override_lamport)
3109  {
3110  checkpoint_header.clock = settings.initial_lamport_clock;
3111  }
3112  else
3113  {
3114  checkpoint_header.clock = clock_;
3115  }
3116 
3117  current = checkpoint_header.write (current, buffer_remaining);
3118 
3120  "ThreadSafeContext::save_checkpoint:" \
3121  " writing diff records\n");
3122 
3123  // lock the context
3124  MADARA_GUARD_TYPE guard (mutex_);
3125 
3126  const knowledge::KnowledgeRecords & local_records = this->get_local_modified ();
3127 
3128  for (KnowledgeRecords::const_iterator i = local_records.begin ();
3129  i != local_records.end (); ++i)
3130  {
3131  if (i->second->exists ())
3132  {
3133  // check if the prefix is allowed
3134  if (settings.prefixes.size () > 0)
3135  {
3137  "ThreadSafeContext::save_checkpoint:" \
3138  " we have %d prefixes to check against.\n",
3139  (int)settings.prefixes.size ());
3140 
3141  bool prefix_found = false;
3142  for (size_t j = 0;
3143  j < settings.prefixes.size () && !prefix_found; ++j)
3144  {
3146  "ThreadSafeContext::save_checkpoint:" \
3147  " checking record %s against prefix %s.\n",
3148  i->first.c_str (),
3149  settings.prefixes[j].c_str ());
3150 
3152  i->first, settings.prefixes[j]))
3153  {
3155  "ThreadSafeContext::save_checkpoint:" \
3156  " record has the correct prefix.\n");
3157 
3158  prefix_found = true;
3159  }
3160  }
3161 
3162  if (!prefix_found)
3163  {
3165  "ThreadSafeContext::save_checkpoint:" \
3166  " record has the wrong prefix. Rejected.\n");
3167 
3168  continue;
3169  }
3170  }
3171 
3172  // get the encoded size of the record for checking buffer boundaries
3173  int64_t encoded_size = i->second->get_encoded_size (i->first);
3174 
3176  "ThreadSafeContext::save_checkpoint:" \
3177  " estimated encoded size of update=%d bytes\n",
3178  (int)encoded_size);
3179 
3180  ++checkpoint_header.updates;
3181  meta.size += encoded_size;
3182  checkpoint_header.size += encoded_size;
3183 
3184  current = i->second->write (current, i->first, buffer_remaining);
3185 
3187  "ThreadSafeContext::save_checkpoint:" \
3188  " current->buffer delta=%d bytes\n",
3189  (int)(current - buffer.get_ptr ()));
3190 
3191  }
3192  }
3193 
3194  // write the final sizes
3195  current = meta.write (buffer.get_ptr (), max_buffer);
3196  current = checkpoint_header.write (current, max_buffer);
3197 
3198  // call decode with any buffer filters
3199  int total = settings.encode ((unsigned char *)buffer.get_ptr (),
3200  (int)meta.size, (int)max_buffer);
3201 
3202  // update the meta data at the front
3203  fseek (file, 0, SEEK_SET);
3204 
3206  "ThreadSafeContext::save_checkpoint:" \
3207  " file size: %d bytes written (file:%d, state.size:%d).\n",
3208  (int)total, (int)meta.size, (int)checkpoint_header.size);
3209 
3210  fwrite (buffer.get_ptr (), (size_t)total, 1, file);
3211 
3212  fclose (file);
3213 
3214  if (settings.reset_checkpoint)
3215  {
3216  reset_checkpoint ();
3217  }
3218 
3219  } // if the new file creation for wb was successful
3220  } // end if we need to create a new file
3221 
3222  return checkpoint_header.size;
3223 }
3224 
3225 
3226 int64_t
3228  const std::string & filename,
3229  const std::string & id) const
3230 {
3232  "ThreadSafeContext::save_checkpoint:" \
3233  " opening file %s\n", filename.c_str ());
3234 
3235  int64_t total_written (0);
3236  FILE * file = fopen (filename.c_str (), "rb+");
3237 
3238  FileHeader meta;
3239  transport::MessageHeader checkpoint_header;
3240 
3241  if (file)
3242  {
3243  int64_t max_buffer (102800);
3244  int64_t buffer_remaining (max_buffer);
3245  utility::ScopedArray <char> buffer = new char [max_buffer];
3246 
3247  char * current = buffer.get_ptr ();
3248  const char * meta_reader = current;
3249 
3250  // read the meta data at the front
3251  fseek (file, 0, SEEK_SET);
3252  size_t ret = fread (current, meta.encoded_size (), 1, file);
3253  if (ret == 0) {
3255  "ThreadSafeContext::save_checkpoint:" \
3256  " failed to read existing file header: size=%d\n",
3257  (int)meta.encoded_size ());
3258 
3259  return -1;
3260  }
3261 
3262  meta_reader = meta.read (meta_reader, buffer_remaining);
3263 
3264  if (id != "")
3265  {
3267  "ThreadSafeContext::save_checkpoint:" \
3268  " setting file meta id to %s\n",
3269  id.c_str ());
3270 
3271  strncpy (meta.originator, id.c_str (),
3272  sizeof (meta.originator) < id.size () + 1 ?
3273  sizeof (meta.originator) : id.size () + 1);
3274  }
3275 
3276  // save the spot where the file ends
3277  uint64_t checkpoint_start = meta.size;
3278 
3280  "ThreadSafeContext::save_checkpoint:" \
3281  " generating file meta\n");
3282 
3283  meta.size += checkpoint_header.encoded_size ();
3284  checkpoint_header.size = 0;
3285 
3286  // lock the context
3287  MADARA_GUARD_TYPE guard (mutex_);
3288 
3289  const knowledge::KnowledgeRecords & records = this->get_modifieds ();
3290  const knowledge::KnowledgeRecords & local_records = this->get_local_modified ();
3291 
3292  if (records.size () + local_records.size () != 0)
3293  {
3294  checkpoint_header.size = checkpoint_header.encoded_size ();
3295 
3296  // set the file pointer to the end of the file
3297  fseek (file, checkpoint_start, SEEK_SET);
3298  current = checkpoint_header.write (current, buffer_remaining);
3299 
3301  "ThreadSafeContext::save_checkpoint:" \
3302  " writing records\n");
3303 
3304  for (KnowledgeRecords::const_iterator i = records.begin ();
3305  i != records.end (); ++i)
3306  {
3307  if (i->second->exists ())
3308  {
3309  // get the encoded size of the record for checking buffer boundaries
3310  int64_t encoded_size = i->second->get_encoded_size (i->first);
3311  ++checkpoint_header.updates;
3312  meta.size += encoded_size;
3313  checkpoint_header.size += encoded_size;
3314 
3315  if (encoded_size > buffer_remaining)
3316  {
3321  current = buffer.get_ptr ();
3322  fwrite (current,
3323  (size_t)(max_buffer - buffer_remaining), 1, file);
3324  total_written += (int64_t)(max_buffer - buffer_remaining);
3325  buffer_remaining = max_buffer;
3326 
3328  "ThreadSafeContext::save_checkpoint:" \
3329  " encoded_size larger than remaining buffer. Flushing\n");
3330 
3331  if (encoded_size > max_buffer)
3332  {
3337  buffer = new char[encoded_size];
3338  max_buffer = encoded_size;
3339  buffer_remaining = max_buffer;
3340  current = buffer.get_ptr ();
3341 
3343  "ThreadSafeContext::save_checkpoint:" \
3344  " encoded_size larger than entire buffer. Reallocating\n");
3345  } // end if larger than buffer
3346  } // end if larger than buffer remaining
3347 
3348  current = i->second->write (current, i->first, buffer_remaining);
3349  }
3350  } // end records loop
3351 
3352  for (KnowledgeRecords::const_iterator i = local_records.begin ();
3353  i != local_records.end (); ++i)
3354  {
3355  if (i->second->exists ())
3356  {
3357  // get the encoded size of the record for checking buffer boundaries
3358  int64_t encoded_size = i->second->get_encoded_size (i->first);
3359  ++checkpoint_header.updates;
3360  meta.size += encoded_size;
3361  checkpoint_header.size += encoded_size;
3362 
3363  if (encoded_size > buffer_remaining)
3364  {
3369  current = buffer.get_ptr ();
3370  fwrite (current,
3371  (size_t)(max_buffer - buffer_remaining), 1, file);
3372  total_written += (int64_t)(max_buffer - buffer_remaining);
3373  buffer_remaining = max_buffer;
3374 
3376  "ThreadSafeContext::save_checkpoint:" \
3377  " encoded_size larger than remaining buffer. Flushing\n");
3378 
3379  if (encoded_size > max_buffer)
3380  {
3385  buffer = new char[encoded_size];
3386  max_buffer = encoded_size;
3387  buffer_remaining = max_buffer;
3388  current = buffer.get_ptr ();
3389 
3391  "ThreadSafeContext::save_checkpoint:" \
3392  " encoded_size larger than entire buffer. Reallocating\n");
3393  } // end if larger than buffer
3394  } // end if larger than buffer remaining
3395 
3396  current = i->second->write (current, i->first, buffer_remaining);
3397  }
3398  }
3399 
3400  if (buffer_remaining != max_buffer)
3401  {
3402  fwrite (buffer.get_ptr (),
3403  (size_t) (max_buffer - buffer_remaining), 1, file);
3404  total_written += (size_t) (max_buffer - buffer_remaining);
3405  }
3406 
3408  "ThreadSafeContext::save_checkpoint:" \
3409  " updating file meta data\n");
3410 
3411  // update the meta data at the front
3412  fseek (file, 0, SEEK_SET);
3413 
3414  current = buffer.get_ptr ();
3415  buffer_remaining = max_buffer;
3416  ++meta.states;
3417 
3418  current = meta.write (current, buffer_remaining);
3419 
3420  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
3421 
3423  "ThreadSafeContext::save_checkpoint:" \
3424  " updating checkpoint meta data\n");
3425 
3426  // update the checkpoint meta data
3427  fseek (file, checkpoint_start, SEEK_SET);
3428 
3429  current = buffer.get_ptr ();
3430  buffer_remaining = max_buffer;
3431 
3432  current = checkpoint_header.write (current, buffer_remaining);
3433 
3434  fwrite (buffer.get_ptr (), current - buffer.get_ptr (), 1, file);
3435  }
3436 
3437  fclose (file);
3438  }
3439 
3440  return checkpoint_header.size;
3441 }
3442 
This class encapsulates an entry in a KnowledgeBase.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false) ...
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a FileHeader instance from a buffer and updates the amount of buffer room remaining.
Definition: FileHeader.cpp:37
std::string version
the MADARA version
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
Defines a file header which is the default for KaRL checkpointing.
Definition: FileHeader.h:35
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
utility::ScopedArray< const char > name_
potential string value of the node (size int)
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > expansion_splitters_
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
This class stores a function definition.
Definition: Functions.h:44
uint32_t quality
priority of the update
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
madara::knowledge::KnowledgeRecord KnowledgeRecord
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
const knowledge::KnowledgeRecords & get_modifieds(void) const
Retrieves a list of modified variables.
uint32_t updates
the number of knowledge variable updates in the message
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining...
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &settings)
Copies variables and values from source to this context.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
char originator[64]
the originator of the message (host:port)
Definition: FileHeader.h:126
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states).
uint64_t initial_timestamp
the timestamp for the initial checkpointing
Definition: FileHeader.h:106
int encode(unsigned char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
This class stores variables and their values for use by any entity needing state information in a thr...
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
uint64_t last_timestamp
the timestamp for the last checkpoint
Definition: FileHeader.h:111
void print(unsigned int level) const
Atomically prints all variables and values in the context.
int decode(unsigned char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t size
the size of this header plus the updates
Definition: FileHeader.h:96
MADARA_Export utility::Refcounter< logger::Logger > global_logger
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
Holds settings for checkpoints to load or save.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
std::string logic
the logic that was compiled
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
uint32_t karl_version
Version of KaRL installed when file was created.
Definition: FileHeader.h:121
Optimized reference to a variable within the knowledge base.
Compiled, optimized KaRL logic.
std::vector< KnowledgeRecord > FunctionArguments
static uint32_t encoded_size(void)
Returns the size of the encoded FileHeader class, which may be different from sizeof (FileHeader) bec...
Definition: FileHeader.cpp:29
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy clock and write_quality.
std::string originator
the originator id of the checkpoint
MADARA_Export std::string extract_path(const std::string &name)
Extracts the path of a filename.
Definition: Utility.cpp:483
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
madara::expression::ExpressionTree expression
the expression tree
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
const knowledge::KnowledgeRecords & get_local_modified(void) const
Retrieves a list of modified local variables.
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
DeepIterator< Iterator > deep_iterate(const Iterator &i)
Returns an input iterator from an iterator.
Definition: DeepIterator.h:179
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
MADARA_Export ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
Definition: Utility.cpp:779
FunctionMap functions_
map of function names to functions
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoints
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
Holds settings requirements for knowledge, usually in copying.
uint64_t size
the size of this header plus the updates
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
::std::map< std::string, KnowledgeRecord > KnowledgeMap
uint64_t states
the number of states checkpointed in the file stream
Definition: FileHeader.h:101
An abstract base class that defines a simple abstract implementation of an expression tree node.
Definition: ComponentNode.h:35
knowledge::KnowledgeRecords local_changed_map_
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
static constexpr struct madara::knowledge::tags::string_t string
static bool file_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
Definition: FileHeader.h:88
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
std::vector< VariableReference > VariableReferences
a vector of variable references
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a text string.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
MADARA_Export std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
Definition: Utility.cpp:71
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
knowledge::KnowledgeRecords changed_map_
logger::Logger * logger_
Logger for printing.
uint64_t states
the number of states checkpointed in the file stream
void set_name(const std::string &name)
Sets the name of the variable.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints
uint32_t write_quality
write priority for any local updates
knowledge::KnowledgeRecord * record_
Reference to knowledge record.
void reset_checkpoint(void) const
Reset all checkpoint variables in the modified lists.
uint64_t clock
the clock of the sender when the message was generated
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
Settings for applying knowledge updates.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
Copyright (c) 2015 Carnegie Mellon University.
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
MADARA_Export bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Definition: Utility.inl:7
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
Settings for applying knowledge updates.
bool reset_checkpoint
If true, resets the checkpoint to start a new diff from this point forward.
Provides an interface for external functions into the MADARA KaRL variable settings.
void mark_and_signal(const char *name, knowledge::KnowledgeRecord *record, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
MADARA_Export bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
Definition: Utility.inl:21
uint64_t clock
last modification time
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining.