MADARA  3.1.8
KnowledgeRecordFilters.cpp
Go to the documentation of this file.
3 
5 #include <memory>
7 
8 #ifdef _MADARA_PYTHON_CALLBACKS_
9 
10 #include <boost/python/call.hpp>
12 
13 #endif
14 
15 #ifdef _MADARA_JAVA_
16 
17 #include <jni.h>
18 #include "madara_jni.h"
20 
21 #endif
22 
24  : context_ (0)
25 {
26 }
27 
29  const knowledge::KnowledgeRecordFilters & filters)
30  : filters_ (filters.filters_),
32  context_ (filters.context_)
33 {
34 }
35 
37 {
38 }
39 
40 void
43 {
44  if (this != &rhs)
45  {
46  filters_ = rhs.filters_;
48  context_ = rhs.context_;
49  }
50 }
51 
52 void
55 {
56  if (function != 0)
57  {
61  "KnowledgeRecordFilters::add: "
62  "Adding C record filter\n");
63 
64  // start with 1st bit, check every bit until types is 0
65  for (uint32_t cur = 1; types > 0; cur <<= 1)
66  {
67  // if current is set in the bitmask
68  if (madara::utility::bitmask_check (types, cur))
69  {
70  // remove the filter list from the type cur
71  filters_[cur].push_back (Function (function));
72  }
73 
74  // remove the current flag from the types
75  types = madara::utility::bitmask_remove (types, cur);
76  }
77  }
78 }
79 
80 void
82  void (*function) (
84 {
85  if (function != 0)
86  {
90  "KnowledgeRecordFilters::add: "
91  "Adding C aggregate filter\n");
92 
93  aggregate_filters_.push_back (AggregateFilter (function));
94  }
95 }
96 
97 void
99  filters::AggregateFilter * functor)
100 {
101  if (functor != 0)
102  {
106  "KnowledgeRecordFilters::add: "
107  "Adding aggregate functor filter\n");
108 
109  aggregate_filters_.push_back (AggregateFilter (functor));
110  }
111 }
112 
113 void
115 filters::BufferFilter * functor)
116 {
117  if (functor != 0)
118  {
122  "KnowledgeRecordFilters::add: "
123  "Adding buffer functor filter\n");
124 
125  buffer_filters_.push_back (functor);
126  }
127 }
128 
129 void
131  uint32_t types,
132  filters::RecordFilter * functor)
133 {
134  if (functor != 0)
135  {
139  "KnowledgeRecordFilters::add: "
140  "Adding record function filter to types\n");
141 
142  // start with 1st bit, check every bit until types is 0
143  for (uint32_t cur = 1; types > 0; cur <<= 1)
144  {
145  // if current is set in the bitmask
146  if (madara::utility::bitmask_check (types, cur))
147  {
148  // remove the filter list from the type cur
149  filters_[cur].push_back (Function (functor));
150  }
151 
152  // remove the current flag from the types
153  types = madara::utility::bitmask_remove (types, cur);
154  }
155  }
156 }
157 
158 #ifdef _MADARA_JAVA_
159 
160 void
162  jobject & callable)
163 {
164  if (callable != NULL)
165  {
169  "KnowledgeRecordFilters::add: "
170  "Adding Java record filter\n");
171 
172  // start with 1st bit, check every bit until types is 0
173  for (uint32_t cur = 1; types > 0; cur <<= 1)
174  {
175  // if current is set in the bitmask
176  if (madara::utility::bitmask_check (types, cur))
177  {
178  // remove the filter list from the type cur
179  filters_[cur].push_back (Function (callable));
180  }
181 
182  // remove the current flag from the types
183  types = madara::utility::bitmask_remove (types, cur);
184  }
185  }
186 }
187 
188 void
190  jobject & callable)
191 {
192  if (callable != NULL)
193  {
197  "KnowledgeRecordFilters::add: "
198  "Adding Java aggregate filter\n");
199 
200  aggregate_filters_.push_back (AggregateFilter (callable));
201  }
202 }
203 
204 #endif
205 
206 #ifdef _MADARA_PYTHON_CALLBACKS_
207 
208 void
210  boost::python::object & callable)
211 {
212  if (!callable.is_none ())
213  {
214  // start with 1st bit, check every bit until types is 0
215  for (uint32_t cur = 1; types > 0; cur <<= 1)
216  {
217  // if current is set in the bitmask
218  if (madara::utility::bitmask_check (types, cur))
219  {
220  // remove the filter list from the type cur
221  filters_[cur].push_back (Function (callable));
222  }
223 
224  // remove the current flag from the types
225  types = madara::utility::bitmask_remove (types, cur);
226  }
227  }
228 }
229 
230 void
232  boost::python::object & callable)
233 {
234  if (!callable.is_none ())
235  {
236  aggregate_filters_.push_back (AggregateFilter (callable));
237  }
238 }
239 
240 #endif
241 
242 void
244  ThreadSafeContext * context)
245 {
246  context_ = context;
247 }
248 
249 void
251 {
252  // start with 1st bit, check every bit until types is 0
253  for (uint32_t cur = 1; types > 0; cur <<= 1)
254  {
255  // if current is set in the bitmask
256  if (madara::utility::bitmask_check (types, cur))
257  {
258  // remove the filter list from the type cur
259  filters_.erase (cur);
260  }
261 
262  // remove the current flag from the types
263  types = madara::utility::bitmask_remove (types, cur);
264  }
265 }
266 
267 void
269  void)
270 {
271  aggregate_filters_.clear ();
272 }
273 
274 void
276 void)
277 {
278  buffer_filters_.clear ();
279 }
280 
281 void
283  void) const
284 {
285  if (context_)
286  {
290  "Printing Knowledge Record Filter Chains by Type...\n");
291 
292  for (FilterMap::const_iterator i = filters_.begin ();
293  i != filters_.end (); ++i)
294  {
298  "%d = %d chained filters\n", i->first, i->second.size ());
299  }
300 
304  "%d chained aggregate filters\n", aggregate_filters_.size ());
305 
309  "%d chained buffer filters\n", buffer_filters_.size ());
310  }
311 }
312 
313 
316  const knowledge::KnowledgeRecord & input,
317  const std::string & name,
318  transport::TransportContext & transport_context) const
319 {
320  // grab the filter chain entry for the type
321  uint32_t type = input.type ();
322  FilterMap::const_iterator type_match = filters_.find (type);
323  knowledge::KnowledgeRecord result (input);
324 
325  // if there are filters for this type
326  if (type_match != filters_.end ())
327  {
331  "KnowledgeRecordFilters::filter: "
332  "Entering record filter logic\n");
333 
334  const FilterChain & chain = type_match->second;
335  FunctionArguments arguments;
336 
337  // JVMs appear to do strange things with the stack on jni_attach
338  std::unique_ptr <Variables> heap_variables (
339  new Variables ());
340 
341  heap_variables->context_ = context_;
342 
343  for (FilterChain::const_iterator i = chain.begin ();
344  i != chain.end (); ++i)
345  {
349  "KnowledgeRecordFilters::filter: "
350  "Preparing args for filter\n");
351 
357  arguments.resize (madara::filters::TOTAL_ARGUMENTS);
358 
359  if (name != "")
360  {
361  // second argument is the variable name, if applicable
362  arguments[1].set_value (name);
363  }
364 
365  // third argument is the operation being performed
366  arguments[2].set_value (KnowledgeRecord::Integer (
367  transport_context.get_operation ()));
368 
369  // fourth argument is the send/rebroadcast bandwidth utilization
370  arguments[3].set_value (KnowledgeRecord::Integer (
371  transport_context.get_send_bandwidth ()));
372 
373  // fifth argument is the send/rebroadcast bandwidth utilization
374  arguments[4].set_value (KnowledgeRecord::Integer (
375  transport_context.get_receive_bandwidth ()));
376 
377  // sixth argument is the message timestamp
378  arguments[5].set_value (KnowledgeRecord::Integer (
379  transport_context.get_message_time ()));
380 
381  // sixth argument is the message timestamp
382  arguments[6].set_value (KnowledgeRecord::Integer (
383  transport_context.get_current_time ()));
384 
385  // seventh argument is the networking domain
386  arguments[7].set_value (transport_context.get_domain ());
387 
388  // eighth argument is the update originator
389  arguments[8].set_value (transport_context.get_originator ());
390 
391  // setup arguments to the function
392  arguments[0] = result;
393 
397  "KnowledgeRecordFilters::filter: "
398  "Checking filter type\n");
399 
400  // optimize selection for functors, the preferred filter impl
401  if (i->is_functor ())
402  {
406  "KnowledgeRecordFilters::filter: "
407  "Calling functor filter\n");
408 
409  result = i->functor->filter (arguments, *heap_variables.get ());
410  }
411 #ifdef _MADARA_JAVA_
412  else if (i->is_java_callable ())
413  {
417  "KnowledgeRecordFilters::filter: "
418  "Calling Java filter\n");
419 
421 
426  jclass jvarClass = madara::utility::java::find_class (
427  jvm.env, "com/madara/Variables");
428  jclass jlistClass = madara::utility::java::find_class (
429  jvm.env, "com/madara/KnowledgeList");
430 
431  jmethodID fromPointerCall = jvm.env->GetStaticMethodID (jvarClass,
432  "fromPointer", "(J)Lcom/madara/Variables;");
433  jobject jvariables = jvm.env->CallStaticObjectMethod (jvarClass,
434  fromPointerCall, (jlong)heap_variables.get ());
435 
436  // prep to create the KnowledgeList
437  jmethodID listConstructor = jvm.env->GetMethodID (jlistClass,
438  "<init>", "([J)V");
439 
440  jlongArray ret = jvm.env->NewLongArray ((jsize)arguments.size ());
441  jlong * tmp = new jlong[(jsize)arguments.size ()];
442 
443  for (unsigned int x = 0; x < arguments.size (); x++)
444  {
445  tmp[x] = (jlong)arguments[x].clone ();
446  }
447 
448  jvm.env->SetLongArrayRegion (ret, 0, (jsize)arguments.size (), tmp);
449  delete[] tmp;
450 
451  // create the KnowledgeList
452  jobject jlist = jvm.env->NewObject (jlistClass, listConstructor, ret);
453 
454  // get the filter's class
455  jclass filterClass = jvm.env->GetObjectClass (i->java_object);
456 
457  // get the filter method
458  jmethodID filterMethod = jvm.env->GetMethodID (filterClass,
459  "filter",
460  "(Lcom/madara/KnowledgeList;Lcom/madara/Variables;)Lcom/madara/KnowledgeRecord;");
461 
465  "KnowledgeRecordFilters::filter: "
466  "Calling Java method\n");
467 
468  // call the filter and hold the result
469  jobject jresult = jvm.env->CallObjectMethod (i->java_object,
470  filterMethod, jlist, jvariables);
471 
475  "KnowledgeRecordFilters::filter: "
476  "Obtaining pointer from Knowledge Record result\n");
477 
478  jmethodID getPtrMethod = jvm.env->GetMethodID (
479  jvm.env->GetObjectClass (jresult), "getCPtr", "()J");
480  jlong cptr = jvm.env->CallLongMethod (jresult, getPtrMethod);
481 
485  "KnowledgeRecordFilters::filter: "
486  "Checking return for origination outside of filter\n");
487 
488  bool do_delete = true;
489  //We need to see if they returned an arg we sent them, or a new value
490  for (unsigned int x = 0; x < arguments.size (); x++)
491  {
492  if (cptr == (jlong)&(arguments[x]))
493  {
494  do_delete = false;
495  break;
496  }
497  }
498 
499  if (cptr != 0)
500  {
504  "KnowledgeRecordFilters::filter: "
505  "Doing a deep copy of the return value\n");
506 
507  result = *(madara::knowledge::KnowledgeRecord *)cptr;
508  }
509 
510  if (do_delete)
511  {
515  "KnowledgeRecordFilters::filter: "
516  "Deleting the original pointer\n");
517 
518  delete (KnowledgeRecord*)cptr;
519  }
520 
521  jvm.env->DeleteWeakGlobalRef (jvarClass);
522  jvm.env->DeleteLocalRef (jvariables);
523  jvm.env->DeleteLocalRef (jresult);
524  jvm.env->DeleteLocalRef (jlist);
525  jvm.env->DeleteWeakGlobalRef (jlistClass);
526  jvm.env->DeleteLocalRef (ret);
527  }
528 #endif
529 
530 #ifdef _MADARA_PYTHON_CALLBACKS_
531 
532  else if (i->is_python_callable ())
533  {
537  "KnowledgeRecordFilters::filter: "
538  "Calling Python filter\n");
539 
540  // acquire the interpreter lock to use the python function
541  madara::python::Acquire_GIL acquire_gil;
542 
543  // some guides have stated that we should let python handle exceptions
544  result = boost::python::call <madara::knowledge::KnowledgeRecord> (
545  i->python_function.ptr (), boost::ref (arguments),
546  boost::ref (*heap_variables.get ()));
547  }
548 
549 #endif
550 
551  // if the function is not zero
552  else if (i->is_extern_unnamed ())
553  {
557  "KnowledgeRecordFilters::filter: "
558  "Calling unnamed C filter\n");
559 
560  result = i->extern_unnamed (arguments, *heap_variables.get ());
561  }
562 
563  // did the filter add records to be sent?
564  if (arguments.size () > madara::filters::TOTAL_ARGUMENTS)
565  {
566  for (unsigned int i = madara::filters::TOTAL_ARGUMENTS;
567  i + 1 < arguments.size (); i += 2)
568  {
569  if (arguments[i].is_string_type ())
570  {
574  "KnowledgeRecordFilters::filter: Adding %s "
575  "to transport context.\n", arguments[i].to_string ().c_str ());
576 
577  transport_context.add_record (
578  arguments[i].to_string (), arguments[i + 1]);
579  }
580  else
581  {
585  "KnowledgeRecordFilters::filter: ERROR. Filter attempted to"
586  " add records to transport context, but args[%d] was not"
587  " a string value.\n", i);
588 
589  break;
590  }
591  }
592  }
593  }
594  }
595 
596  return result;
597 }
598 
599 void
601  KnowledgeMap & records,
602  const transport::TransportContext & transport_context) const
603 {
604  // if there are aggregate filters
605  if (aggregate_filters_.size () > 0)
606  {
610  "KnowledgeRecordFilters::filter: "
611  "Entering aggregate filter method\n");
612 
613  // JVMs appear to do strange things with the stack on jni_attach
614  std::unique_ptr <Variables> heap_variables (
615  new Variables ());
616 
617  heap_variables->context_ = context_;
618 
619  for (AggregateFilters::const_iterator i = aggregate_filters_.begin ();
620  i != aggregate_filters_.end (); ++i)
621  {
622  // optimize selection for functors, the preferred filter impl
623  if (i->is_functor ())
624  {
628  "KnowledgeRecordFilters::filter: "
629  "Checking vars for null\n");
630 
631  Variables * vars = heap_variables.get ();
632  if (vars)
633  {
637  "KnowledgeRecordFilters::filter: "
638  "Calling C++ filter\n");
639 
640  i->functor->filter (records, transport_context,
641  *vars);
642 
646  "KnowledgeRecordFilters::filter: "
647  "Finished calling C++ filter\n");
648  }
649  else
650  {
654  "KnowledgeRecordFilters::filter: "
655  "Memory issues caused vars to be null before filtering\n");
656  }
657  }
658 #ifdef _MADARA_JAVA_
659  else if (i->is_java_callable ())
660  {
661  // manage VM attachment
663 
664  // JVMs appear to do strange things with the stack on jni_attach
665  std::auto_ptr <KnowledgeMap> heap_records (new KnowledgeMap (records));
666  std::auto_ptr <transport::TransportContext> heap_context (
667  new transport::TransportContext (transport_context));
668 
672  jclass jvarClass = madara::utility::java::find_class (jvm.env,
673  "com/madara/Variables");
674  jclass jpacketClass = madara::utility::java::find_class (jvm.env,
675  "com/madara/transport/filters/Packet");
676  jclass jcontextClass = madara::utility::java::find_class (jvm.env,
677  "com/madara/transport/TransportContext");
678 
679  jmethodID varfromPointerCall = jvm.env->GetStaticMethodID (
680  jvarClass,
681  "fromPointer", "(J)Lcom/madara/Variables;");
682  jobject jvariables = jvm.env->CallStaticObjectMethod (
683  jvarClass,
684  varfromPointerCall, (jlong)heap_variables.get ());
685 
686  jmethodID packetfromPointerCall = jvm.env->GetStaticMethodID (
687  jpacketClass,
688  "fromPointer", "(J)Lcom/madara/transport/filters/Packet;");
689  jobject jpacket = jvm.env->CallStaticObjectMethod (jpacketClass,
690  packetfromPointerCall, (jlong)heap_records.get ());
691 
692  jmethodID contextfromPointerCall = jvm.env->GetStaticMethodID (
693  jcontextClass,
694  "fromPointer", "(J)Lcom/madara/transport/TransportContext;");
695  jobject jcontext = jvm.env->CallStaticObjectMethod (jcontextClass,
696  contextfromPointerCall, (jlong)heap_context.get ());
697 
698  // get the filter's class and method
699  jclass filterClass = jvm.env->GetObjectClass (i->java_object);
700  jmethodID filterMethod = jvm.env->GetMethodID (filterClass,
701  "filter",
702  "(Lcom/madara/transport/filters/Packet;"
703  "Lcom/madara/transport/TransportContext;Lcom/madara/Variables;)V");
704 
705 
709  "KnowledgeRecordFilters::filter: "
710  "Calling Java method\n");
711 
712  jvm.env->CallVoidMethod (i->java_object, filterMethod,
713  jpacket, jcontext, jvariables);
714 
718  "KnowledgeRecordFilters::filter: "
719  "Overwriting resulting records from heap records\n");
720 
721  // overwrite the old records
722  //records = *heap_records.get ();
723 
724  for (KnowledgeMap::const_iterator i = heap_records->begin ();
725  i != heap_records->end (); ++i)
726  {
727  records[i->first] = i->second;
728  }
729 
730  jvm.env->DeleteLocalRef (filterClass);
731  jvm.env->DeleteLocalRef (jcontext);
732  jvm.env->DeleteLocalRef (jpacket);
733  jvm.env->DeleteWeakGlobalRef (jvarClass);
734  jvm.env->DeleteWeakGlobalRef (jpacketClass);
735  jvm.env->DeleteWeakGlobalRef (jcontextClass);
736  // the auto_ptr should clear up the heap-allocated records
737  }
738 #endif
739 
740 #ifdef _MADARA_PYTHON_CALLBACKS_
741 
742  else if (i->is_python_callable ())
743  {
744  // acquire the interpreter lock to use the python function
745  madara::python::Acquire_GIL acquire_gil;
746 
747  // some guides have stated that we should let python handle exceptions
748  boost::python::call <madara::knowledge::KnowledgeRecord> (
749  i->python_function.ptr (),
750  boost::ref (records), boost::ref (transport_context),
751  boost::ref (*heap_variables.get ()));
752  }
753 
754 #endif
755 
756  // if the function is not zero
757  else if (i->is_extern_unnamed ())
758  {
759  i->unnamed_filter (records, transport_context,
760  *heap_variables.get ());
761  }
762 
763  }
764  }
765 }
766 
768  unsigned char * source, int size, int max_size) const
769 {
770  // decode from back to front
771  for (filters::BufferFilters::const_reverse_iterator i = buffer_filters_.rbegin ();
772  i != buffer_filters_.rend (); ++i)
773  {
774  (*i)->decode (source, size, max_size);
775  }
776 }
777 
779  unsigned char * source, int size, int max_size) const
780 {
781  // encode from front to back
782  for (filters::BufferFilters::const_iterator i = buffer_filters_.begin ();
783  i != buffer_filters_.end (); ++i)
784  {
785  (*i)->encode (source, size, max_size);
786  }
787 }
788 
789 size_t
791  void) const
792 {
793  return filters_.size ();
794 }
795 
796 size_t
798  void) const
799 {
800  return aggregate_filters_.size ();
801 }
802 
803 size_t
805 void) const
806 {
807  return buffer_filters_.size ();
808 }
This class encapsulates an entry in a KnowledgeBase.
JNIEnv * env
The Java environment.
Definition: Acquire_VM.h:58
knowledge::KnowledgeRecord filter(const knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to its filter chain.
This class encapsulates attaching and detaching to a VM.
Definition: Acquire_VM.h:24
void clear(uint32_t types)
Clears the list of filters for the specified types.
int32_t type(void) const
returns the size of the value
bool bitmask_check(T mask, T values)
Returns true if mask contains values.
Definition: Utility.inl:46
This class stores a function definition.
Definition: Functions.h:44
void clear_aggregate_filters(void)
Clears the aggregate filters.
void filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
void filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
void print_num_filters(void) const
Prints the number of filters chained for each type.
Abstract base class for implementing aggregate record filters via a functor interface.
filters::BufferFilters buffer_filters_
List of buffer filters.
ThreadSafeContext * context_
Context used by this filter.
void attach(ThreadSafeContext *context)
Attaches a context.
This class stores variables and their values for use by any entity needing state information in a thr...
MADARA_Export utility::Refcounter< logger::Logger > global_logger
Abstract base class for implementing individual record filters via a functor interface.
Definition: RecordFilter.h:33
Provides map of data types to a filter chain to apply to the data.
std::vector< KnowledgeRecord > FunctionArguments
Provides context about the transport.
This class stores a function definition.
uint64_t get_receive_bandwidth(void) const
Gets the receive bandwidth in bytes per second.
#define madara_logger_cond_log(conditional, logger, alt_logger_ptr, level,...)
High-performance logger that performs conditional logging based on first arg.
Definition: Logger.h:56
size_t get_number_of_buffer_filters(void) const
Returns the number of buffer filters.
uint64_t get_current_time(void) const
Gets the current timestamp.
void clear_buffer_filters(void)
Clears the buffer filters.
int64_t get_operation(void) const
Get operation that the context is performing.
AggregateFilters aggregate_filters_
List of aggregate filters.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void add_record(const std::string &key, const madara::knowledge::KnowledgeRecord &record)
Adds a record to the list that should be appended to send or rebroadcast.
size_t get_number_of_filtered_types(void) const
Returns the number of types that have filters.
uint64_t get_message_time(void) const
Gets the message timestamp.
static constexpr struct madara::knowledge::tags::string_t string
size_t get_number_of_aggregate_filters(void) const
Returns the number of aggregate update filters.
T bitmask_remove(T mask, T values)
Removes values from a bit mask.
Definition: Utility.inl:55
void operator=(const KnowledgeRecordFilters &rhs)
Assignment operator.
const std::string & get_domain(void) const
Returns the network domain.
const std::string & get_originator(void) const
Returns the current message originator.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Abstract base class for implementing buffer filters via a functor interface.
Definition: BufferFilter.h:26
void add(uint32_t types, knowledge::KnowledgeRecord(*function)(FunctionArguments &, Variables &))
Adds a filter to the list of types.
knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=knowledge::KnowledgeReferenceSettings(false))
Retrieves the value of a variable.
Provides an interface for external functions into the MADARA KaRL variable settings.
std::list< Function > FilterChain
a chain of filters
FilterMap filters_
Container for mapping types to filter chains.
uint64_t get_send_bandwidth(void) const
Gets the send/rebroadcast bandwidth in bytes per second.