24 #ifdef MADARA_CONDITION_MUTEX_CONSTRUCTOR
28 #ifndef _MADARA_NO_KARL_
30 interpreter_ (new
madara::expression::Interpreter ())
41 #ifndef _MADARA_NO_KARL_ 43 #endif // _MADARA_NO_KARL_ 59 MADARA_GUARD_TYPE guard (
mutex_);
64 key_ptr = &key_actual;
74 return &
map_[*key_ptr];
85 MADARA_GUARD_TYPE guard (
mutex_);
93 key_ptr = &key_actual;
117 MADARA_GUARD_TYPE guard (
mutex_);
125 key_ptr = &key_actual;
136 KnowledgeMap::const_iterator found =
map_.find (*key_ptr);
146 const char * value,
size_t size,
150 MADARA_GUARD_TYPE guard (
mutex_);
173 const char * value,
size_t size,
176 MADARA_GUARD_TYPE guard (
mutex_);
199 const unsigned char * value,
size_t size,
202 MADARA_GUARD_TYPE guard (
mutex_);
225 const unsigned char * value,
size_t size,
228 MADARA_GUARD_TYPE guard (
mutex_);
254 int return_value = 0;
255 MADARA_GUARD_TYPE guard (
mutex_);
269 return return_value = -1;
284 MADARA_GUARD_TYPE guard (
mutex_);
289 key_ptr = &key_actual;
295 KnowledgeMap::iterator found =
map_.find (*key_ptr);
300 if (found !=
map_.end ())
301 return map_[*key_ptr].quality;
317 MADARA_GUARD_TYPE guard (
mutex_);
322 key_ptr = &key_actual;
328 KnowledgeMap::iterator found =
map_.find (*key_ptr);
333 if (found !=
map_.end ())
334 return map_[*key_ptr].write_quality;
351 MADARA_GUARD_TYPE guard (
mutex_);
356 key_ptr = &key_actual;
366 KnowledgeMap::iterator found =
map_.find (*key_ptr);
371 if (found ==
map_.end () || force_update || quality > found->second.quality)
372 map_[*key_ptr].quality = quality;
375 return map_[*key_ptr].quality;
387 MADARA_GUARD_TYPE guard (
mutex_);
392 key_ptr = &key_actual;
399 map_[*key_ptr].write_quality = quality;
409 uint32_t quality, uint64_t clock,
417 MADARA_GUARD_TYPE guard (
mutex_);
422 key_ptr = &key_actual;
432 KnowledgeMap::iterator found =
map_.find (*key_ptr);
443 if (quality < found->second.quality)
448 else if (quality == found->second.quality &&
449 clock < found->second.clock)
453 else if (found->second == rhs)
460 if (result != -2 && record.
quality != quality)
464 if (clock > record.
clock)
465 record.
clock = clock;
490 uint32_t quality, uint64_t clock,
498 MADARA_GUARD_TYPE guard (
mutex_);
503 key_ptr = &key_actual;
513 KnowledgeMap::iterator found =
map_.find (*key_ptr);
524 if (quality < found->second.quality)
529 else if (quality == found->second.quality &&
530 clock < found->second.clock)
534 else if (found->second == rhs)
541 if (result != -2 && record.
quality != quality)
545 if (clock > record.
clock)
546 record.
clock = clock;
571 uint32_t quality, uint64_t clock,
579 MADARA_GUARD_TYPE guard (
mutex_);
584 key_ptr = &key_actual;
594 KnowledgeMap::iterator found =
map_.find (*key_ptr);
605 if (quality < found->second.quality)
610 else if (quality == found->second.quality &&
611 clock < found->second.clock)
615 else if (found->second == rhs)
622 if (result != -2 && record.
quality != quality)
626 if (clock > record.
clock)
627 record.
clock = clock;
660 MADARA_GUARD_TYPE guard (
mutex_);
665 key_ptr = &key_actual;
675 KnowledgeMap::iterator found =
map_.find (*key_ptr);
682 if (rhs.
quality < found->second.quality)
687 else if (rhs.
quality == found->second.quality &&
688 rhs.
clock < found->second.clock)
692 found->second.set_value (rhs);
708 if (rhs.
clock >= this->clock_)
730 MADARA_GUARD_TYPE guard (
mutex_);
755 if (rhs.
clock >= this->clock_)
768 changed_.MADARA_CONDITION_NOTIFY_ONE ();
774 unsigned int level)
const 776 MADARA_GUARD_TYPE guard (
mutex_);
777 for (madara::knowledge::KnowledgeMap::const_iterator i =
map_.begin ();
781 if (i->second.exists ())
784 i->first.c_str (), i->second.to_string (
", ").c_str ());
797 MADARA_GUARD_TYPE guard (
mutex_);
798 std::stringstream buffer;
802 for (madara::knowledge::KnowledgeMap::const_iterator i =
map_.begin ();
809 buffer << record_delimiter;
815 buffer << key_val_delimiter;
817 if (i->second.is_string_type ())
821 else if (i->second.type () == i->second.DOUBLE_ARRAY ||
822 i->second.type () == i->second.INTEGER_ARRAY)
828 buffer << i->second.to_string (array_delimiter);
830 if (i->second.is_string_type ())
834 else if (i->second.type () == i->second.DOUBLE_ARRAY ||
835 i->second.type () == i->second.INTEGER_ARRAY)
844 target = buffer.str ();
855 MADARA_GUARD_TYPE guard (
mutex_);
859 size_t begin_exp = 0;
861 std::stringstream builder;
864 for (std::string::size_type i = 0; i < statement.size (); ++i)
867 if (statement[i] ==
'{')
874 else if (statement[i] ==
'}')
879 statement.substr (begin_exp + 1, i - begin_exp - 1);
881 builder << this->
get (results);
890 builder << statement[i];
898 "KARL COMPILE ERROR : Improperly matched braces in %s\n",
904 return builder.str ();
907 #ifndef _MADARA_NO_KARL_ 919 MADARA_GUARD_TYPE guard (
mutex_);
924 key_ptr = &key_actual;
945 MADARA_GUARD_TYPE guard (
mutex_);
950 key_ptr = &key_actual;
971 MADARA_GUARD_TYPE guard (
mutex_);
976 key_ptr = &key_actual;
989 #ifdef _MADARA_PYTHON_CALLBACKS_ 992 boost::python::object callable,
998 MADARA_GUARD_TYPE guard (
mutex_);
1003 key_ptr = &key_actual;
1034 MADARA_GUARD_TYPE guard (
mutex_);
1039 key_ptr = &key_actual;
1060 MADARA_GUARD_TYPE guard (
mutex_);
1065 key_ptr = &key_actual;
1083 "ThreadSafeContext::compile:" \
1084 " compiling %s\n", expression.c_str ());
1086 MADARA_GUARD_TYPE guard (
mutex_);
1088 ce.
logic = expression;
1099 MADARA_GUARD_TYPE guard (
mutex_);
1108 MADARA_GUARD_TYPE guard (
mutex_);
1115 #endif // _MADARA_NO_KARL_ 1122 std::vector <KnowledgeRecord> & target)
1127 MADARA_GUARD_TYPE guard (
mutex_);
1131 target.resize (end - start + 1);
1133 for (
unsigned int i = 0; start <= end; ++start, ++i)
1135 std::stringstream buffer;
1138 target[i] =
get (buffer.str ());
1142 return target.size ();
1149 std::map <std::string, knowledge::KnowledgeRecord> & target)
1154 bool matches_found (
false);
1157 if (subject[subject.size () - 1] ==
'*')
1158 subject.resize (subject.size () - 1);
1161 std::string::size_type subject_size = subject.size ();
1162 const char * subject_ptr = subject.c_str ();
1165 MADARA_GUARD_TYPE guard (
mutex_);
1168 if (expression.size () == 0)
1172 for (KnowledgeMap::iterator i =
map_.begin ();
1173 i !=
map_.end (); ++i)
1175 if (i->first.size () >= subject_size)
1177 int result = strncmp (i->first.c_str (), subject_ptr, subject_size);
1181 target[i->first] = i->second;
1182 matches_found =
true;
1184 else if (matches_found)
1194 return target.size ();
1205 KnowledgeMap::iterator i =
map_.lower_bound (prefix);
1208 if (i !=
map_.end ())
1211 KnowledgeMap::iterator first_match = i;
1212 KnowledgeMap::iterator after_matches =
map_.end ();
1213 VariableReferences::iterator match;
1215 size_t num_matches = 0;
1217 size_t prefix_length = prefix.length ();
1220 while (i !=
map_.end () &&
1221 i->first.compare (0, prefix_length, prefix) == 0)
1234 matches.resize (num_matches);
1240 match = matches.begin ();
1243 while (i != after_matches)
1247 match->set_name (i->first);
1248 match->record_ = &i->second;
1266 std::vector <std::string> & next_keys,
1267 std::map <std::string, knowledge::KnowledgeRecord> & result,
1275 bool matches_found (
false);
1279 MADARA_GUARD_TYPE guard (
mutex_);
1281 KnowledgeMap::iterator i =
map_.begin ();
1285 i =
map_.lower_bound (prefix);
1288 for (; i !=
map_.end (); ++i)
1305 matches_found =
true;
1317 result[i->first] = i->second;
1321 size_t prefix_end = prefix.length () + delimiter.length ();
1323 std::string current_delimiter = i->first.substr (prefix.length (), delimiter.length ());
1325 if (current_delimiter == delimiter && i->first.length () > prefix_end)
1328 size_t key_end = i->first.find (delimiter, prefix_end);
1332 i->first.substr (prefix_end, key_end - prefix_end));
1333 if (current_key != last_key)
1335 next_keys.push_back (current_key);
1336 last_key = current_key;
1344 return result.size ();
1353 MADARA_GUARD_TYPE guard (
mutex_);
1355 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1358 map_.erase (iters.first, iters.second);
1362 std::pair<KnowledgeRecords::iterator, KnowledgeRecords::iterator>
1368 changed.second = changed.first;
1371 for (++changed.second;
1382 std::pair<KnowledgeRecords::iterator, KnowledgeRecords::iterator>
1390 local_changed.second = local_changed.first;
1393 for (++local_changed.second;
1396 ++local_changed.second);
1403 std::pair<madara::knowledge::KnowledgeMap::iterator,
1404 madara::knowledge::KnowledgeMap::iterator>
1408 std::pair<KnowledgeMap::iterator, KnowledgeMap::iterator>
1412 if(prefix.size() > 0)
1414 ssize_t psz = prefix.size();
1417 ret.second = ret.first =
map_.lower_bound(prefix);
1420 while(ret.second !=
map_.end() &&
1421 ret.second->first.compare(0, psz, prefix) == 0)
1427 std::pair<madara::knowledge::KnowledgeMap::const_iterator,
1428 madara::knowledge::KnowledgeMap::const_iterator>
1432 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1436 if(prefix.size() > 0)
1438 ssize_t psz = prefix.size ();
1441 ret.second = ret.first =
map_.lower_bound(prefix);
1444 while(ret.second !=
map_.end() &&
1445 ret.second->first.compare(0, psz, prefix) == 0)
1456 MADARA_GUARD_TYPE guard (
mutex_);
1458 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1470 MADARA_GUARD_TYPE guard (
mutex_);
1472 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1477 KnowledgeMap::iterator hint = ret.begin();
1478 for(;iters.first != iters.second; ++iters.first)
1480 ret.emplace_hint(ret.end(), iters.first->first.substr(prefix.size()),
1481 iters.first->second);
1492 "ThreadSafeContext::copy:" \
1493 " copying a context\n");
1498 "ThreadSafeContext::copy:" \
1499 " clearing knowledge in target context\n");
1508 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1511 if (predicate.suffix ==
"")
1514 "ThreadSafeContext::copy:" \
1515 " matching predicate.prefix=%s\n", predicate.prefix.c_str ());
1517 for(;iters.first != iters.second; ++iters.first)
1520 "ThreadSafeContext::copy:" \
1521 " looking for %s\n", iters.first->first.c_str ());
1523 auto where =
map_.lower_bound(iters.first->first);
1525 if (where ==
map_.end() || where->first != iters.first->first)
1528 "ThreadSafeContext::copy:" \
1529 " inserting %s\n", iters.first->first.c_str ());
1531 where =
map_.emplace_hint(where,
1532 iters.first->first, iters.first->second);
1537 "ThreadSafeContext::copy:" \
1538 " overwriting %s\n", iters.first->first.c_str ());
1540 where->second = iters.first->second;
1547 "ThreadSafeContext::copy:" \
1548 " matching predicate.suffix=%s\n", predicate.suffix.c_str ());
1550 for(;iters.first != iters.second; ++iters.first)
1556 "ThreadSafeContext::copy:" \
1557 " looking for %s\n", iters.first->first.c_str ());
1559 auto where =
map_.lower_bound(iters.first->first);
1561 if (where ==
map_.end() || where->first != iters.first->first)
1564 "ThreadSafeContext::copy:" \
1565 " inserting %s\n", iters.first->first.c_str ());
1567 where =
map_.emplace_hint(where,
1568 iters.first->first, iters.first->second);
1573 "ThreadSafeContext::copy:" \
1574 " overwriting %s\n", iters.first->first.c_str ());
1576 where->second = iters.first->second;
1587 std::pair<KnowledgeMap::const_iterator, KnowledgeMap::const_iterator>
1588 iters (source.
map_.begin (), source.
map_.end ());
1590 for(;iters.first != iters.second; ++iters.first)
1592 map_.insert (
map_.begin (), KnowledgeMap::value_type(
1593 iters.first->first, iters.first->second));
1609 if (copy_set.size () == 0)
1611 for (KnowledgeMap::const_iterator i = source.
map_.begin ();
1612 i != source.
map_.end (); ++i)
1614 map_[i->first] = (i->second);
1620 for (CopySet::const_iterator key = copy_set.begin ();
1621 key != copy_set.end (); ++key)
1624 KnowledgeMap::const_iterator i = source.
map_.find (key->first);
1627 if (i != source.
map_.end ())
1629 map_[i->first] = (i->second);
1641 "ThreadSafeContext::save_context:" \
1642 " opening file %s\n", filename.c_str ());
1644 int64_t total_written (0);
1645 FILE * file = fopen (filename.c_str (),
"wb");
1649 strncpy (meta.originator,
id.c_str (),
1650 sizeof (meta.originator) <
id.size () + 1 ?
1651 sizeof (meta.originator) :
id.size () + 1);
1657 int64_t max_buffer (102800);
1658 int64_t buffer_remaining (max_buffer);
1661 char * current = buffer.
get_ptr ();
1664 "ThreadSafeContext::save_context:" \
1665 " generating file meta\n");
1667 meta.size += checkpoint_header.encoded_size ();
1668 checkpoint_header.size = checkpoint_header.encoded_size ();
1670 current = meta.write (current, buffer_remaining);
1672 checkpoint_header.clock =
clock_;
1674 current = checkpoint_header.write (current, buffer_remaining);
1677 "ThreadSafeContext::save_context:" \
1678 " writing records\n");
1681 MADARA_GUARD_TYPE guard (
mutex_);
1683 for (KnowledgeMap::const_iterator i =
map_.begin ();
1684 i !=
map_.end (); ++i)
1686 if (i->second.exists ())
1689 int64_t encoded_size = i->second.get_encoded_size (i->first);
1690 ++checkpoint_header.updates;
1691 meta.size += encoded_size;
1692 checkpoint_header.size += encoded_size;
1694 if (encoded_size > buffer_remaining)
1702 (
size_t)(max_buffer - buffer_remaining), 1, file);
1703 total_written += (int64_t)(max_buffer - buffer_remaining);
1704 buffer_remaining = max_buffer;
1706 if (encoded_size > max_buffer)
1712 buffer =
new char[encoded_size];
1713 max_buffer = encoded_size;
1714 buffer_remaining = max_buffer;
1719 current = i->second.write (current, i->first, buffer_remaining);
1723 if (buffer_remaining != max_buffer)
1726 (size_t) (max_buffer - buffer_remaining), 1, file);
1727 total_written += (int64_t) (max_buffer - buffer_remaining);
1731 fseek (file, 0, SEEK_SET);
1734 buffer_remaining = max_buffer;
1736 current = meta.write (current, buffer_remaining);
1737 current = checkpoint_header.write (current, buffer_remaining);
1739 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
1752 "ThreadSafeContext::save_context:" \
1753 " opening file %s\n", settings.
filename.c_str ());
1756 FILE * file = fopen (settings.
filename.c_str (),
"wb");
1760 strncpy (meta.originator, settings.
originator.c_str (),
1761 sizeof (meta.originator) < settings.
originator.size () + 1 ?
1762 sizeof (meta.originator) : settings.
originator.size () + 1);
1769 int64_t buffer_remaining (max_buffer);
1772 char * current = buffer.
get_ptr ();
1775 "ThreadSafeContext::save_context:" \
1776 " generating file meta\n");
1778 meta.size += checkpoint_header.encoded_size ();
1779 checkpoint_header.size = checkpoint_header.encoded_size ();
1787 current = meta.write (current, buffer_remaining);
1795 checkpoint_header.clock =
clock_;
1798 current = checkpoint_header.write (current, buffer_remaining);
1801 "ThreadSafeContext::save_context:" \
1802 " writing records\n");
1805 MADARA_GUARD_TYPE guard (
mutex_);
1807 for (KnowledgeMap::const_iterator i =
map_.begin ();
1808 i !=
map_.end (); ++i)
1810 if (i->second.exists ())
1816 "ThreadSafeContext::save_context:" \
1817 " we have %d prefixes to check against.\n",
1820 bool prefix_found =
false;
1822 j < settings.
prefixes.size () && !prefix_found; ++j)
1825 "ThreadSafeContext::save_context:" \
1826 " checking record %s against prefix %s.\n",
1834 "ThreadSafeContext::save_context:" \
1835 " record has the correct prefix.\n");
1837 prefix_found =
true;
1844 "ThreadSafeContext::save_context:" \
1845 " record has the wrong prefix. Rejected.\n");
1852 int64_t encoded_size = i->second.get_encoded_size (i->first);
1853 ++checkpoint_header.updates;
1854 meta.size += encoded_size;
1855 checkpoint_header.size += encoded_size;
1857 current = i->second.write (current, i->first, buffer_remaining);
1862 current = meta.write (buffer.
get_ptr (), max_buffer);
1863 current = checkpoint_header.write (current, max_buffer);
1866 int total = settings.
encode ((
unsigned char *)buffer.
get_ptr (),
1867 (int)meta.size, (
int)max_buffer);
1870 fseek (file, 0, SEEK_SET);
1873 "ThreadSafeContext::save_context:" \
1874 " encoding with buffer filters: %d:%d bytes written.\n",
1875 (
int)meta.size, (
int)checkpoint_header.size);
1877 fwrite (buffer.
get_ptr (), (size_t)total, 1, file);
1892 "ThreadSafeContext::save_as_karl:" \
1893 " opening file %s\n", filename.c_str ());
1895 int64_t bytes_written (0);
1896 std::stringstream buffer;
1898 file.open (filename.c_str ());
1900 if (file.is_open ())
1903 MADARA_GUARD_TYPE guard (
mutex_);
1905 for (KnowledgeMap::const_iterator i =
map_.begin ();
1906 i !=
map_.end (); ++i)
1908 if (i->second.exists ())
1913 if (!i->second.is_binary_file_type ())
1916 if (i->second.is_string_type ())
1928 buffer << i->second;
1929 if (i->second.is_string_type ())
1943 buffer <<
"#read_file ('";
1963 (
void *)&(*i->second.file_value_)[0], i->second.size ());
1977 bytes_written = (int64_t) result.size ();
1982 return bytes_written;
1991 "ThreadSafeContext::save_as_karl:" \
1992 " opening file %s\n", settings.
filename.c_str ());
1994 int64_t bytes_written (0);
1995 std::stringstream buffer;
1997 file.open (settings.
filename.c_str ());
1999 if (file.is_open ())
2002 MADARA_GUARD_TYPE guard (
mutex_);
2004 for (KnowledgeMap::const_iterator i =
map_.begin ();
2005 i !=
map_.end (); ++i)
2007 if (i->second.exists ())
2013 "ThreadSafeContext::save_as_karl:" \
2014 " we have %d prefixes to check against.\n",
2017 bool prefix_found =
false;
2019 j < settings.
prefixes.size () && !prefix_found; ++j)
2022 "ThreadSafeContext::save_as_karl:" \
2023 " checking record %s against prefix %s.\n",
2031 "ThreadSafeContext::save_as_karl:" \
2032 " the record has the correct prefix.\n");
2034 prefix_found =
true;
2041 "ThreadSafeContext::save_as_karl:" \
2042 " the record does not have a correct prefix.\n");
2051 if (!i->second.is_binary_file_type ())
2054 if (i->second.is_string_type ())
2066 buffer << i->second;
2067 if (i->second.is_string_type ())
2081 buffer <<
"#read_file ('";
2101 (
void *)&(*i->second.file_value_)[0], i->second.size ());
2115 bytes_written = (int64_t) result.size ();
2120 return bytes_written;
2129 "ThreadSafeContext::save_as_json:" \
2130 " opening file %s\n", filename.c_str ());
2132 int64_t bytes_written (0);
2134 std::stringstream buffer;
2136 file.open (filename.c_str ());
2138 if (file.is_open ())
2141 MADARA_GUARD_TYPE guard (
mutex_);
2145 for (KnowledgeMap::const_iterator i =
map_.begin ();
2146 i !=
map_.end (); ++i)
2148 if (i->second.exists ())
2154 if (!i->second.is_binary_file_type ())
2157 if (i->second.is_string_type ())
2169 buffer << i->second;
2170 if (i->second.is_string_type ())
2184 buffer <<
"#read_file ('";
2204 (
void *)&(*i->second.file_value_)[0], i->second.size ());
2211 KnowledgeMap::const_iterator j (i);
2213 if (++j !=
map_.end ())
2223 bytes_written = (int64_t) result.size ();
2228 return bytes_written;
2237 "ThreadSafeContext::save_as_json:" \
2238 " opening file %s\n", settings.
filename.c_str ());
2240 int64_t bytes_written (0);
2242 std::stringstream buffer;
2244 file.open (settings.
filename.c_str ());
2246 if (file.is_open ())
2249 MADARA_GUARD_TYPE guard (
mutex_);
2253 for (KnowledgeMap::const_iterator i =
map_.begin ();
2254 i !=
map_.end (); ++i)
2256 if (i->second.exists ())
2262 "ThreadSafeContext::save_as_json:" \
2263 " we have %d prefixes to check against.\n",
2266 bool prefix_found =
false;
2268 j < settings.
prefixes.size () && !prefix_found; ++j)
2271 "ThreadSafeContext::save_as_json:" \
2272 " checking record %s against prefix %s.\n",
2280 "ThreadSafeContext::save_as_json:" \
2281 " the record has the correct prefix.\n");
2283 prefix_found =
true;
2290 "ThreadSafeContext::save_as_json:" \
2291 " the record does not have a correct prefix.\n");
2301 if (!i->second.is_binary_file_type ())
2304 if (i->second.is_string_type ())
2316 buffer << i->second;
2317 if (i->second.is_string_type ())
2331 buffer <<
"#read_file ('";
2351 (
void *)&(*i->second.file_value_)[0], i->second.size ());
2358 KnowledgeMap::const_iterator j (i);
2360 if (++j !=
map_.end ())
2370 bytes_written = (int64_t) result.size ();
2375 return bytes_written;
2386 "ThreadSafeContext::load_context:" \
2387 " opening file %s\n", filename.c_str ());
2390 FILE * file = fopen (filename.c_str (),
"rb");
2392 int64_t total_read (0);
2396 int64_t max_buffer (102800);
2397 int64_t buffer_remaining (max_buffer);
2401 const char * current = buffer.
get_ptr ();
2404 "ThreadSafeContext::load_context:" \
2405 " reading file meta data\n");
2407 total_read = fread (buffer.
get_ptr (),
2408 1, max_buffer, file);
2409 buffer_remaining = (int64_t)total_read;
2416 current = meta.
read (current, buffer_remaining);
2426 for (uint64_t state = 0; state < meta.
states; ++state)
2428 if (buffer_remaining > (int64_t)
2433 current = checkpoint_header.
read (current, buffer_remaining);
2440 if (checkpoint_header.
size > (uint64_t) buffer_remaining)
2447 new char [checkpoint_header.
size];
2448 memcpy (new_buffer.
get_ptr (), current,
2449 (size_t)buffer_remaining);
2452 total_read += fread (new_buffer.
get_ptr () + buffer_remaining, 1,
2453 checkpoint_header.
size 2454 - (uint64_t)buffer_remaining
2458 max_buffer = checkpoint_header.
size;
2459 buffer_remaining = checkpoint_header.
size 2461 current = new_buffer.
get_ptr ();
2462 buffer = new_buffer;
2465 for (uint32_t update = 0;
2466 update < checkpoint_header.
updates; ++update)
2470 current = record.
read (current, key, buffer_remaining);
2476 if (buffer_remaining == 0 && (uint64_t) total_read < meta.
size)
2478 buffer_remaining = max_buffer;
2480 total_read += fread (buffer.
get_ptr (), 1, buffer_remaining, file);
2488 "ThreadSafeContext::load_context:" \
2489 " invalid file. No contextual change.\n");
2505 "ThreadSafeContext::load_context:" \
2506 " opening file %s\n", filename.c_str ());
2509 FILE * file = fopen (filename.c_str (),
"rb");
2511 int64_t total_read (0);
2515 int64_t max_buffer (102800);
2516 int64_t buffer_remaining (max_buffer);
2519 const char * current = buffer.
get_ptr ();
2522 "ThreadSafeContext::load_context:" \
2523 " reading file meta data\n");
2525 total_read = fread (buffer.
get_ptr (),
2526 1, max_buffer, file);
2527 buffer_remaining = (int64_t)total_read;
2534 current = meta.
read (current, buffer_remaining);
2543 for (uint64_t state = 0; state < meta.
states; ++state)
2545 if (buffer_remaining > (int64_t)
2550 current = checkpoint_header.
read (current, buffer_remaining);
2557 if (checkpoint_header.
size > (uint64_t)buffer_remaining)
2564 new char[checkpoint_header.
size];
2565 memcpy (new_buffer.
get_ptr (), current,
2566 (size_t)buffer_remaining);
2569 total_read += fread (new_buffer.
get_ptr () + buffer_remaining, 1,
2570 checkpoint_header.
size 2571 - (uint64_t)buffer_remaining
2575 max_buffer = checkpoint_header.
size;
2576 buffer_remaining = checkpoint_header.
size 2578 current = new_buffer.
get_ptr ();
2579 buffer = new_buffer;
2582 for (uint32_t update = 0;
2583 update < checkpoint_header.
updates; ++update)
2587 current = record.
read (current, key, buffer_remaining);
2593 if (buffer_remaining == 0 && (uint64_t)total_read < meta.
size)
2595 buffer_remaining = max_buffer;
2597 total_read += fread (buffer.
get_ptr (), 1, buffer_remaining, file);
2605 "ThreadSafeContext::load_context:" \
2606 " invalid file. No contextual change.\n");
2622 "ThreadSafeContext::load_context:" \
2623 " opening file %s\n", checkpoint_settings.
filename.c_str ());
2626 FILE * file = fopen (checkpoint_settings.
filename.c_str (),
"rb");
2628 int64_t total_read (0);
2638 int64_t max_buffer (checkpoint_settings.
buffer_size);
2639 int64_t buffer_remaining (max_buffer);
2642 const char * current = buffer.
get_ptr ();
2644 total_read = fread (buffer.
get_ptr (),
2645 1, max_buffer, file);
2646 buffer_remaining = (int64_t)total_read;
2649 "ThreadSafeContext::load_context:" \
2650 " reading file: %d bytes read.\n",
2654 checkpoint_settings.
decode ((
unsigned char *)buffer.
get_ptr (),
2655 (int)(total_read), (int)max_buffer);
2662 current = meta.
read (current, buffer_remaining);
2672 "ThreadSafeContext::load_context:" \
2673 " read File meta. Meta.size=%d\n", (
int)meta.
size);
2682 for (uint64_t state = 0; state < meta.
states &&
2683 state <= checkpoint_settings.
last_state; ++state)
2685 if (buffer_remaining > (int64_t)
2690 current = checkpoint_header.
read (current, buffer_remaining);
2697 if (state == meta.
states - 1)
2702 uint64_t updates_size = checkpoint_header.
size -
2706 "ThreadSafeContext::load_context:" \
2707 " read Checkpoint header. header.size=%d, updates.size=%d\n",
2708 (
int)checkpoint_header.
size, (
int)updates_size);
2715 if (updates_size > (uint64_t)buffer_remaining)
2722 new char[updates_size];
2723 memcpy (new_buffer.
get_ptr (), current,
2724 (size_t)buffer_remaining);
2727 total_read += fread (new_buffer.
get_ptr () + buffer_remaining, 1,
2729 - (uint64_t)buffer_remaining
2733 max_buffer = updates_size;
2734 buffer_remaining = checkpoint_header.
size 2736 current = new_buffer.
get_ptr ();
2737 buffer = new_buffer;
2741 "ThreadSafeContext::load_context:" \
2742 " state=%d, initial_state=%d, last_state=%d\n",
2746 if (state <= checkpoint_settings.
last_state &&
2749 for (uint32_t update = 0;
2750 update < checkpoint_header.
updates; ++update)
2754 current = record.
read (current, key, buffer_remaining);
2757 "ThreadSafeContext::load_context:" \
2758 " read record (%d of %d): %s\n",
2759 (
int)update, (
int)checkpoint_header.
updates, key.c_str ());
2762 if (checkpoint_settings.
prefixes.size () > 0)
2764 bool prefix_found =
false;
2765 for (
size_t j = 0; j < checkpoint_settings.
prefixes.size ()
2766 && !prefix_found; ++j)
2769 "ThreadSafeContext::load_context:" \
2770 " checking record %s against prefix %s\n",
2771 key.c_str (), checkpoint_settings.
prefixes[j].c_str ());
2774 key, checkpoint_settings.
prefixes[j]))
2777 "ThreadSafeContext::load_context:" \
2778 " record has the correct prefix.\n");
2780 prefix_found =
true;
2787 "ThreadSafeContext::load_context:" \
2788 " record does not have the correct prefix. Rejected.\n");
2800 "ThreadSafeContext::load_context:" \
2801 " not a valid state, incrementing by %d bytes.\n",
2804 current += updates_size;
2808 if (buffer_remaining == 0 && (uint64_t)total_read < meta.
size)
2810 buffer_remaining = max_buffer;
2812 total_read += fread (buffer.
get_ptr (), 1, buffer_remaining, file);
2820 "ThreadSafeContext::load_context:" \
2821 " invalid file. No contextual change.\n");
2836 "ThreadSafeContext::save_checkpoint:" \
2837 " opening file %s\n", settings.
filename.c_str ());
2839 int64_t total_written (0);
2840 FILE * file = fopen (settings.
filename.c_str (),
"rb+");
2848 int64_t buffer_remaining (max_buffer);
2851 char * current = buffer.
get_ptr ();
2852 const char * meta_reader = current;
2855 fseek (file, 0, SEEK_SET);
2856 size_t ret = fread (current, meta.encoded_size (), 1, file);
2859 "ThreadSafeContext::save_checkpoint:" \
2860 " failed to read existing file header: size=%d\n",
2861 (
int)meta.encoded_size ());
2866 meta_reader = meta.read (meta_reader, buffer_remaining);
2869 "ThreadSafeContext::save_checkpoint:" \
2870 " init file meta: size=%d, states=%d\n",
2871 (
int)meta.size, (
int)meta.states);
2876 "ThreadSafeContext::save_checkpoint:" \
2877 " setting file meta id to %s\n",
2880 strncpy (meta.originator, settings.
originator.c_str (),
2881 sizeof (meta.originator) < settings.
originator.size () + 1 ?
2882 sizeof (meta.originator) : settings.
originator.size () + 1);
2886 uint64_t checkpoint_start = meta.size;
2891 "ThreadSafeContext::save_checkpoint:" \
2892 " meta.size=%d, chkpt.header.size=%d \n",
2893 (
int)meta.size, (
int)checkpoint_header.
size);
2912 if (local_records.size () != 0)
2917 "ThreadSafeContext::save_checkpoint:" \
2918 " fseek set to %d\n",
2919 (
int)(checkpoint_start));
2922 fseek (file, checkpoint_start, SEEK_SET);
2925 current = checkpoint_header.
write (buffer.
get_ptr (), buffer_remaining);
2928 "ThreadSafeContext::save_checkpoint:" \
2929 " chkpt.header.size=%d, current->buffer delta=%d\n",
2931 (int)(current - buffer.
get_ptr ()));
2935 MADARA_GUARD_TYPE guard (
mutex_);
2937 for (KnowledgeRecords::const_iterator i = local_records.begin ();
2938 i != local_records.end (); ++i)
2940 if (i->second->exists ())
2945 bool prefix_found =
false;
2947 j < settings.
prefixes.size () && !prefix_found; ++j)
2950 i->second->to_string (), settings.
prefixes[j]))
2952 prefix_found =
true;
2961 int64_t encoded_size = i->second->get_encoded_size (i->first);
2964 "ThreadSafeContext::save_checkpoint:" \
2965 " estimated encoded size of update=%d bytes\n",
2968 if (encoded_size > buffer_remaining)
2971 (
size_t)(max_buffer - buffer_remaining), 1, file);
2972 total_written += (int64_t)(max_buffer - buffer_remaining);
2973 buffer_remaining = max_buffer;
2976 "ThreadSafeContext::save_checkpoint:" \
2977 " encoded_size larger than remaining buffer. Flushing\n");
2979 if (encoded_size > max_buffer)
2985 buffer =
new char[encoded_size];
2986 max_buffer = encoded_size;
2987 buffer_remaining = max_buffer;
2991 "ThreadSafeContext::save_checkpoint:" \
2992 " encoded_size larger than entire buffer. Reallocating\n");
2996 current = i->second->write (current, i->first, buffer_remaining);
2998 checkpoint_header.
size += (uint64_t)encoded_size;
3002 "ThreadSafeContext::save_checkpoint:" \
3003 " chkpt.header.size=%d, current->buffer delta=%d\n",
3004 (
int)checkpoint_header.
size, (
int)(current - buffer.
get_ptr ()));
3013 "ThreadSafeContext::save_checkpoint:" \
3014 " resetting checkpoint. Next checkpoint starts fresh here\n");
3021 "ThreadSafeContext::save_checkpoint:" \
3022 " writing final data for state #%d\n",
3025 if (buffer_remaining != max_buffer)
3028 (size_t)(current - buffer.
get_ptr ()), 1, file);
3029 total_written += (size_t)(current - buffer.
get_ptr ());
3032 "ThreadSafeContext::save_checkpoint:" \
3033 " current->buffer=%d bytes, max->remaining=%d bytes\n",
3034 (
int)(current - buffer.
get_ptr ()), (
int)max_buffer - buffer_remaining);
3038 "ThreadSafeContext::save_checkpoint:" \
3039 " chkpt.header: size=%d, updates=%d\n",
3040 (
int)checkpoint_header.
size, (
int)checkpoint_header.
updates);
3042 buffer_remaining = max_buffer;
3043 fseek (file, checkpoint_start, SEEK_SET);
3044 current = checkpoint_header.
write (buffer.
get_ptr (), buffer_remaining);
3045 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
3047 meta.size += checkpoint_header.
size;
3050 "ThreadSafeContext::save_checkpoint:" \
3051 " new file meta: size=%d, states=%d, lastchkpt.size=%d\n",
3052 (
int)meta.size, (
int)meta.states, (
int)checkpoint_header.
size);
3055 fseek (file, 0, SEEK_SET);
3058 "ThreadSafeContext::save_checkpoint:" \
3059 " updating file meta data in the file\n");
3061 buffer_remaining = max_buffer;
3062 current = meta.write (buffer.
get_ptr (), buffer_remaining);
3064 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
3073 "ThreadSafeContext::save_checkpoint:" \
3074 " checkpoint doesn't exist. Creating.\n");
3077 file = fopen (settings.
filename.c_str (),
"wb");
3080 strncpy (meta.originator, settings.
originator.c_str (),
3081 sizeof (meta.originator) < settings.
originator.size () + 1 ?
3082 sizeof (meta.originator) : settings.
originator.size () + 1);
3087 int64_t buffer_remaining (max_buffer);
3090 char * current = buffer.
get_ptr ();
3093 "ThreadSafeContext::save_checkpoint:" \
3094 " creating file meta. file.meta.size=%d, state.size=%d\n",
3095 (
int)meta.size, (
int)checkpoint_header.
encoded_size ());
3106 current = meta.write (current, buffer_remaining);
3117 current = checkpoint_header.
write (current, buffer_remaining);
3120 "ThreadSafeContext::save_checkpoint:" \
3121 " writing diff records\n");
3124 MADARA_GUARD_TYPE guard (
mutex_);
3128 for (KnowledgeRecords::const_iterator i = local_records.begin ();
3129 i != local_records.end (); ++i)
3131 if (i->second->exists ())
3137 "ThreadSafeContext::save_checkpoint:" \
3138 " we have %d prefixes to check against.\n",
3141 bool prefix_found =
false;
3143 j < settings.
prefixes.size () && !prefix_found; ++j)
3146 "ThreadSafeContext::save_checkpoint:" \
3147 " checking record %s against prefix %s.\n",
3155 "ThreadSafeContext::save_checkpoint:" \
3156 " record has the correct prefix.\n");
3158 prefix_found =
true;
3165 "ThreadSafeContext::save_checkpoint:" \
3166 " record has the wrong prefix. Rejected.\n");
3173 int64_t encoded_size = i->second->get_encoded_size (i->first);
3176 "ThreadSafeContext::save_checkpoint:" \
3177 " estimated encoded size of update=%d bytes\n",
3181 meta.size += encoded_size;
3182 checkpoint_header.
size += encoded_size;
3184 current = i->second->write (current, i->first, buffer_remaining);
3187 "ThreadSafeContext::save_checkpoint:" \
3188 " current->buffer delta=%d bytes\n",
3189 (
int)(current - buffer.
get_ptr ()));
3195 current = meta.write (buffer.
get_ptr (), max_buffer);
3196 current = checkpoint_header.
write (current, max_buffer);
3199 int total = settings.
encode ((
unsigned char *)buffer.
get_ptr (),
3200 (int)meta.size, (
int)max_buffer);
3203 fseek (file, 0, SEEK_SET);
3206 "ThreadSafeContext::save_checkpoint:" \
3207 " file size: %d bytes written (file:%d, state.size:%d).\n",
3208 (
int)total, (
int)meta.size, (
int)checkpoint_header.
size);
3210 fwrite (buffer.
get_ptr (), (size_t)total, 1, file);
3222 return checkpoint_header.
size;
3232 "ThreadSafeContext::save_checkpoint:" \
3233 " opening file %s\n", filename.c_str ());
3235 int64_t total_written (0);
3236 FILE * file = fopen (filename.c_str (),
"rb+");
3243 int64_t max_buffer (102800);
3244 int64_t buffer_remaining (max_buffer);
3247 char * current = buffer.
get_ptr ();
3248 const char * meta_reader = current;
3251 fseek (file, 0, SEEK_SET);
3252 size_t ret = fread (current, meta.encoded_size (), 1, file);
3255 "ThreadSafeContext::save_checkpoint:" \
3256 " failed to read existing file header: size=%d\n",
3257 (
int)meta.encoded_size ());
3262 meta_reader = meta.read (meta_reader, buffer_remaining);
3267 "ThreadSafeContext::save_checkpoint:" \
3268 " setting file meta id to %s\n",
3271 strncpy (meta.originator,
id.c_str (),
3272 sizeof (meta.originator) <
id.size () + 1 ?
3273 sizeof (meta.originator) :
id.size () + 1);
3277 uint64_t checkpoint_start = meta.size;
3280 "ThreadSafeContext::save_checkpoint:" \
3281 " generating file meta\n");
3284 checkpoint_header.
size = 0;
3287 MADARA_GUARD_TYPE guard (
mutex_);
3292 if (records.size () + local_records.size () != 0)
3297 fseek (file, checkpoint_start, SEEK_SET);
3298 current = checkpoint_header.
write (current, buffer_remaining);
3301 "ThreadSafeContext::save_checkpoint:" \
3302 " writing records\n");
3304 for (KnowledgeRecords::const_iterator i = records.begin ();
3305 i != records.end (); ++i)
3307 if (i->second->exists ())
3310 int64_t encoded_size = i->second->get_encoded_size (i->first);
3312 meta.size += encoded_size;
3313 checkpoint_header.
size += encoded_size;
3315 if (encoded_size > buffer_remaining)
3323 (
size_t)(max_buffer - buffer_remaining), 1, file);
3324 total_written += (int64_t)(max_buffer - buffer_remaining);
3325 buffer_remaining = max_buffer;
3328 "ThreadSafeContext::save_checkpoint:" \
3329 " encoded_size larger than remaining buffer. Flushing\n");
3331 if (encoded_size > max_buffer)
3337 buffer =
new char[encoded_size];
3338 max_buffer = encoded_size;
3339 buffer_remaining = max_buffer;
3343 "ThreadSafeContext::save_checkpoint:" \
3344 " encoded_size larger than entire buffer. Reallocating\n");
3348 current = i->second->write (current, i->first, buffer_remaining);
3352 for (KnowledgeRecords::const_iterator i = local_records.begin ();
3353 i != local_records.end (); ++i)
3355 if (i->second->exists ())
3358 int64_t encoded_size = i->second->get_encoded_size (i->first);
3360 meta.size += encoded_size;
3361 checkpoint_header.
size += encoded_size;
3363 if (encoded_size > buffer_remaining)
3371 (
size_t)(max_buffer - buffer_remaining), 1, file);
3372 total_written += (int64_t)(max_buffer - buffer_remaining);
3373 buffer_remaining = max_buffer;
3376 "ThreadSafeContext::save_checkpoint:" \
3377 " encoded_size larger than remaining buffer. Flushing\n");
3379 if (encoded_size > max_buffer)
3385 buffer =
new char[encoded_size];
3386 max_buffer = encoded_size;
3387 buffer_remaining = max_buffer;
3391 "ThreadSafeContext::save_checkpoint:" \
3392 " encoded_size larger than entire buffer. Reallocating\n");
3396 current = i->second->write (current, i->first, buffer_remaining);
3400 if (buffer_remaining != max_buffer)
3403 (size_t) (max_buffer - buffer_remaining), 1, file);
3404 total_written += (size_t) (max_buffer - buffer_remaining);
3408 "ThreadSafeContext::save_checkpoint:" \
3409 " updating file meta data\n");
3412 fseek (file, 0, SEEK_SET);
3415 buffer_remaining = max_buffer;
3418 current = meta.write (current, buffer_remaining);
3420 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
3423 "ThreadSafeContext::save_checkpoint:" \
3424 " updating checkpoint meta data\n");
3427 fseek (file, checkpoint_start, SEEK_SET);
3430 buffer_remaining = max_buffer;
3432 current = checkpoint_header.
write (current, buffer_remaining);
3434 fwrite (buffer.
get_ptr (), current - buffer.
get_ptr (), 1, file);
3440 return checkpoint_header.
size;
This class encapsulates an entry in a KnowledgeBase.
bool expand_variables
Toggle for always attempting to expand variables (true) or never expanding variables (false) ...
ExpressionTree interpret(madara::knowledge::ThreadSafeContext &context, const std::string &input)
Compiles an expression into an expression tree.
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
std::map< std::string, bool > CopySet
Typedef for set of copyable keys.
uint64_t initial_lamport_clock
initial lamport clock saved in the checkpoint
std::vector< MatchPredicate > predicates
A vector of acceptable predicates to match (prefix and suffix).
Function * retrieve_function(const std::string &name, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves an external function.
std::string version
the MADARA version
int update_record_from_external(const std::string &key, const knowledge::KnowledgeRecord &rhs, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true))
Atomically sets if the variable value meets update conditions.
madara::knowledge::KnowledgeMap map_
Hash table containing variable names and values.
void get_matches(const std::string &prefix, const std::string &suffix, VariableReferences &matches)
Creates an iteration of VariableReferences to all keys matching the prefix and suffix.
utility::ScopedArray< const char > name_
potential string value of the node (size int)
size_t buffer_size
the size of the buffer needed for the checkpoint
std::vector< std::string > expansion_splitters_
size_t to_vector(const std::string &subject, unsigned int start, unsigned int end, std::vector< KnowledgeRecord > &target)
Fills a vector with Knowledge Records that begin with a common subject and have a finite range of int...
This class stores a function definition.
uint32_t quality
priority of the update
knowledge::KnowledgeMap to_map_stripped(const std::string &prefix) const
Creates a map with Knowledge Records that begin with the given prefix.
int64_t save_context(const std::string &filename, const std::string &id="") const
Saves the context to a file.
MADARA_CONDITION_TYPE changed_
size_t to_map(const std::string &subject, std::map< std::string, knowledge::KnowledgeRecord > &target)
Fills a variable map with Knowledge Records that match an expression.
int set_file(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an arbitrary string.
madara::knowledge::KnowledgeRecord KnowledgeRecord
int set_jpeg(const std::string &key, const unsigned char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to a JPEG image.
const knowledge::KnowledgeRecords & get_modifieds(void) const
Retrieves a list of modified variables.
const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a KnowledgeRecord instance from a buffer and updates the amount of buffer room remaining...
void copy(const ThreadSafeContext &source, const KnowledgeRequirements &settings)
Copies variables and values from source to this context.
void set_file(const unsigned char *new_value, size_t size)
sets the value to an unknown file type
uint64_t last_state
the last state number of interest (useful for loading ranges of checkpoint states.
int encode(unsigned char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
void set_text(const char *new_value, size_t size)
sets the value to a plaintext string
This class stores variables and their values for use by any entity needing state information in a thr...
int64_t load_context(const std::string &filename, std::string &id, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings(true, true, true, false))
Loads the context from a file.
virtual madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings)=0
Evaluates the expression tree.
void print(unsigned int level) const
Atomically prints all variables and values in the context.
int decode(unsigned char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
void delete_prefix(const std::string &prefix, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Deletes keys starting with the given prefix.
Holds settings for checkpoints to load or save.
void set_xml(const char *new_value, size_t size)
sets the value to an xml string
std::string logic
the logic that was compiled
int64_t save_as_karl(const std::string &filename) const
Saves the context to a file as karl assignments, rather than binary.
Provides knowledge logging services to files and terminals.
Optimized reference to a variable within the knowledge base.
Compiled, optimized KaRL logic.
std::vector< KnowledgeRecord > FunctionArguments
bool is_valid(void) const
Checks to see if the variable reference has been initialized.
void define_function(const std::string &name, knowledge::KnowledgeRecord(*func)(FunctionArguments &, Variables &), const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Defines an external function.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_value(const KnowledgeRecord &new_value)
Sets the value from another KnowledgeRecord, does not copy clock and write_quality.
std::string originator
the originator id of the checkpoint
MADARA_Export std::string extract_path(const std::string &name)
Extracts the path of a filename.
uint32_t get_write_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets write quality of this process for a variable.
std::string filename
path to files
madara::expression::ExpressionTree expression
the expression tree
void set_jpeg(const unsigned char *new_value, size_t size)
sets the value to a jpeg
const knowledge::KnowledgeRecords & get_local_modified(void) const
Retrieves a list of modified local variables.
uint64_t initial_state
the initial state number of interest (useful for loading ranges of checkpoint states).
int64_t save_checkpoint(const std::string &filename, const std::string &id="") const
Saves a checkpoint of a list of changes to a file.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
DeepIterator< Iterator > deep_iterate(const Iterator &i)
Returns an input iterator from an iterator.
int set_xml(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
uint64_t last_lamport_clock
final lamport clock saved in the checkpoint
std::vector< std::string > prefixes
A list of prefixes to save/load.
T * get_ptr(void)
get the underlying pointer
MADARA_Export ssize_t write_file(const std::string &filename, void *buffer, size_t size)
Writes a file with provided contents.
FunctionMap functions_
map of function names to functions
void to_string(std::string &target, const std::string &array_delimiter=",", const std::string &record_delimiter=";\n", const std::string &key_val_delimiter="=") const
Saves all keys and values into a string, using the underlying knowledge::KnowledgeRecord::to_string f...
bool override_timestamp
use the timestamps in this class instead of current wallclock time when writing context or checkpoint...
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Holds settings requirements for knowledge, usually in copying.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
::std::map< std::string, KnowledgeRecord > KnowledgeMap
An abstract base class defines a simple abstract implementation of an expression tree node...
knowledge::KnowledgeRecords local_changed_map_
uint64_t initial_timestamp
initial wallclock time saved in the checkpoint
std::pair< KnowledgeMap::const_iterator, KnowledgeMap::const_iterator > get_prefix_range(const std::string &prefix) const
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
bool clear_knowledge
If true, during loads, clear the KnowledgeBase first.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
bool always_overwrite
Toggle for always overwriting records, regardless of quality, clock values, etc.
std::vector< VariableReference > VariableReferences
a vector of variable references
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
int set_text(const std::string &key, const char *value, size_t size, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets the value of a variable to an XML string.
std::string expand_statement(const std::string &statement) const
Expands a string with variable expansion.
MADARA_Export std::string to_string_version(uint32_t version)
Converts a MADARA uint32_t version number to human-readable.
KnowledgeRecord * get_record(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Retrieves a knowledge record from the key.
knowledge::KnowledgeRecords changed_map_
logger::Logger * logger_
Logger for printing.
uint64_t states
the number of states checkpointed in the file stream
void set_name(const std::string &name)
Sets the name of the variable.
uint64_t last_timestamp
final wallclock time saved in the checkpoint
bool override_lamport
use the lamport clocks in this class instead of KB clock when writing context or checkpoints ...
uint32_t write_quality
write priority for any local updates
knowledge::KnowledgeRecord * record_
Reference to knowledge record.
void reset_checkpoint(void) const
Reset all checkpoint variables in the modified lists.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically returns a reference to the variable.
Settings for applying knowledge updates.
void set_write_quality(const std::string &key, uint32_t quality, const KnowledgeReferenceSettings &settings)
Atomically sets write quality of this process for a variable.
Copyright (c) 2015 Carnegie Mellon University.
uint32_t set_quality(const std::string &key, uint32_t quality, bool force_update, const KnowledgeReferenceSettings &settings)
Atomically sets quality of this process for a variable.
int read_file(const std::string &filename, uint32_t read_as_type=0)
reads a file and sets the type appropriately according to the extension
MADARA_Export bool begins_with(const std::string &input, const std::string &prefix)
Check if input contains prefix at the beginning.
Settings for applying knowledge updates.
~ThreadSafeContext(void)
Destructor.
bool reset_checkpoint
If true, resets the checkpoint to start a new diff from this point forward.
Provides an interface for external functions into the MADARA KaRL variable settings.
void mark_and_signal(const char *name, knowledge::KnowledgeRecord *record, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
method for marking a record modified and signaling changes
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
madara::expression::Interpreter * interpreter_
KaRL interpreter.
ThreadSafeContext()
Constructor.
int read_file(const std::string &key, const std::string &filename, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically reads a file into a variable.
MADARA_Export bool ends_with(const std::string &input, const std::string &ending)
Check if input contains a pattern at the end.
uint64_t clock
last modification time
int64_t save_as_json(const std::string &filename) const
Saves the context to a file as JSON.