MADARA  3.1.8
Transport.cpp
Go to the documentation of this file.
1 #include "Transport.h"
2 
6 
7 #include <algorithm>
8 
10  TransportSettings & new_settings,
12  : is_valid_ (false), shutting_down_ (false),
13  valid_setup_ (mutex_), id_ (id),
14  settings_ (new_settings), context_ (context)
15 
    // the on_data_received_ rules member is only initialized (from the
    // context's logger) when _MADARA_NO_KARL_ is not defined
16 #ifndef _MADARA_NO_KARL_
17  , on_data_received_ (context.get_logger ())
18 #endif // _MADARA_NO_KARL_
19 {
    // Constructor body. The initializer list above stores the id, a copy of
    // the settings and the context, and starts the transport out as neither
    // valid nor shutting down.
22 
    // settings_.setup () is only called in builds that define _USE_CID_
23 #ifdef _USE_CID_
24  settings_.setup ();
25 #endif // _USE_CID_
26 }
27 
29 {
    // Destructor: the body is empty, so no explicit cleanup is performed here.
30 }
31 
/**
 * transport::Base::setup: prepares the transport for use.
 *
 * In order, this interprets the on_data_received logic (when a ruleset was
 * provided and KaRL is enabled), allocates the send buffer when the queue
 * length is positive, falls back to the write domain when no read domains
 * are configured, logs the domain configuration and finally validates the
 * transport.
 *
 * NOTE(review): several statements are not visible in this listing; the
 * dangling string literals below presumably belong to madara_logger_log
 * calls. Confirm against the original file.
 *
 * @return the result of validate_transport ()
 **/
32 int
34 {
35  // check for an on_data_received ruleset
36  if (settings_.on_data_received_logic.length () != 0)
37  {
39  "transport::Base::setup" \
40  " setting rules to %s\n",
42 
    // the rules are only interpreted when KaRL support is compiled in
43 #ifndef _MADARA_NO_KARL_
45  on_data_received_ = interpreter.interpret (context_,
47 #endif // _MADARA_NO_KARL_
48  }
49  else
50  {
52  "transport::Base::setup" \
53  " no permanent rules were set\n");
54  }
55 
56  // setup the send buffer
57  if (settings_.queue_length > 0)
58  buffer_ = new char [settings_.queue_length];
59 
60  // if read domains has not been set, then set to write domain
61  if (settings_.num_read_domains () == 0)
62  {
64  "transport::Base::setup" \
65  " no read domains set. Adding write domain (%s)\n",
66  settings_.write_domain.c_str ());
67 
69  }
70  else
71  {
73  "transport::Base::setup" \
74  " settings configured with %d read domains\n",
76  }
77 
    // build a comma-separated list of the read domains for the log message
78  if (settings_.num_read_domains () > 0 &&
80  {
81  std::vector <std::string> domains;
82  settings_.get_read_domains (domains);
83 
84  std::stringstream buffer;
85 
86  for (unsigned int i = 0; i < domains.size (); ++i)
87  {
88  buffer << domains[i];
89 
    // no separator after the last domain
90  if (i != domains.size () - 1)
91  {
92  buffer << ", ";
93  }
94  }
95 
97  "transport::Base::setup" \
98  " Write domain: %s. Read domains: %s\n",
99  settings_.write_domain.c_str (), buffer.str ().c_str ());
100  }
101 
102  return validate_transport ();
103 }
104 
/**
 * Closes this transport.
 *
 * NOTE(review): the single statement of the body (listing line 108) is not
 * visible here; presumably it invalidates the transport
 * (invalidate_transport). Confirm against the original file.
 **/
105 void
107 {
109 }
110 
/**
 * Processes a received update: updates the bandwidth monitors, runs the
 * decode and receive filters, applies the surviving records to the context
 * and fills rebroadcast_records according to the settings' filters.
 *
 * NOTE(review): several statements are not visible in this listing; the
 * dangling string literals presumably belong to madara_logger_log calls.
 *
 * @param buffer              received bytes. The const is cast away: the
 *                            buffer is handed to the decode filters and may
 *                            be overwritten with a defragmented message
 * @param bytes_read          number of bytes received into buffer
 * @param id                  identifier of this process; messages whose
 *                            originator equals it are dropped
 * @param send_monitor        monitor queried for send bandwidth usage
 * @param receive_monitor     monitor told about the received bytes
 * @param rebroadcast_records cleared on entry, then filled with the records
 *                            that should be rebroadcast
 * @param on_data_received    rules evaluated after the updates are applied
 *                            (only present when _MADARA_NO_KARL_ is not
 *                            defined)
 * @param print_prefix        prefix used in log messages
 * @param remote_host         host the message was received from
 * @param header              out parameter: reset to 0 on entry, then set to
 *                            a header allocated with new. It is not deleted
 *                            on any return path visible here, so the caller
 *                            presumably owns it. TODO confirm
 * @return  the number of updates applied, 0 if a fragment was stored but
 *          the message is not yet complete, or a negative code:
 *          -1 non-KaRL message, too short message, header size smaller than
 *             bytes read, or fragment already in the fragment map
 *          -2 message originated from ourself
 *          -3 untrusted peer (remote_host)
 *          -4 untrusted originator
 *          -5 message domain is not one of our read domains
 *          -6 latency deadline violated
 **/
111 int
113  const char * buffer,
114  uint32_t bytes_read,
115  const std::string & id,
118  BandwidthMonitor & send_monitor,
119  BandwidthMonitor & receive_monitor,
120  knowledge::KnowledgeMap & rebroadcast_records,
121 #ifndef _MADARA_NO_KARL_
122  knowledge::CompiledExpression & on_data_received,
123 #endif // _MADARA_NO_KARL_
124 
125  const char * print_prefix,
126  const char * remote_host,
127  MessageHeader *& header)
128 {
129  // reset header to 0, so it is safe to delete
130  header = 0;
131 
132  int max_buffer_size = (int)bytes_read;
133 
134  // tell the receive bandwidth monitor about the transaction
135  receive_monitor.add (bytes_read);
136 
138  "%s:" \
139  " Receive bandwidth = %" PRIu64 " B/s\n",
140  print_prefix,
141  receive_monitor.get_bytes_per_second ());
142 
143  bool is_reduced = false;
144  bool is_fragment = false;
145 
147  "%s:" \
148  " calling decode filters on %" PRIu32 " bytes\n",
149  print_prefix, bytes_read);
150 
151  // call decodes, if applicable
152  bytes_read = (uint32_t)settings.filter_decode ((unsigned char *)buffer,
153  max_buffer_size, max_buffer_size);
154 
156  "%s:" \
157  " Decoding resulted in %" PRIu32 " final bytes\n",
158  print_prefix, bytes_read);
159 
160  // setup buffer remaining, used by the knowledge record read method
161  int64_t buffer_remaining = (int64_t)bytes_read;
162 
163  // clear the rebroadcast records
164  rebroadcast_records.clear ();
165 
166  // receive records will be what we pass to the aggregate filter
167  knowledge::KnowledgeMap updates;
168 
169  // check the buffer for a reduced message header
170  if (bytes_read >= ReducedMessageHeader::static_encoded_size () &&
172  {
174  "%s:" \
175  " processing reduced KaRL message from %s\n",
176  print_prefix,
177  remote_host);
178 
179  header = new ReducedMessageHeader ();
180  is_reduced = true;
181  }
182  else if (bytes_read >= MessageHeader::static_encoded_size () &&
184  {
186  "%s:" \
187  " processing KaRL message from %s\n",
188  print_prefix,
189  remote_host);
190 
191  header = new MessageHeader ();
192  }
193  else if (bytes_read >= FragmentMessageHeader::static_encoded_size () &&
195  {
197  "%s:" \
198  " processing KaRL fragment message from %s\n",
199  print_prefix,
200  remote_host);
201 
202  header = new FragmentMessageHeader ();
203  is_fragment = true;
204  }
205  else if (bytes_read >= 8 + MADARA_IDENTIFIER_LENGTH)
206  {
207  // get the text that appears as identifier.
    // NOTE(review): the terminator index 7 is hard-coded while the array is
    // sized by MADARA_IDENTIFIER_LENGTH; confirm the two agree
208  char identifier[MADARA_IDENTIFIER_LENGTH];
209  strncpy (identifier, buffer + 8, MADARA_IDENTIFIER_LENGTH);
210  identifier[7] = 0;
211 
213  "%s:" \
214  " dropping non-KaRL message with id %s from %s\n",
215  print_prefix,
216  identifier,
217  remote_host);
218 
219  return -1;
220  }
221  else
222  {
224  "%s:" \
225  " dropping too short message from %s (length %i)\n",
226  print_prefix,
227  remote_host,
228  bytes_read);
229 
230  return -1;
231  }
232 
    // parse the header; update points at the data that follows it
233  const char * update = header->read (buffer, buffer_remaining);
234 
236  "%s:" \
237  " header info: %s\n",
238  print_prefix, header->to_string ().c_str ());
239 
    // the size claimed by the header must cover everything that was read
240  if (header->size < bytes_read)
241  {
243  "%s:" \
244  " Message header.size (%" PRIu64 " bytes) is less than actual"
245  " bytes read (%" PRIu32 " bytes). Dropping message.\n",
246  print_prefix, header->size, bytes_read);
247 
248  return -1;
249  }
250 
    // drop fragments that are already stored in the fragment map
251  if (is_fragment &&
252  exists (header->originator, header->clock,
253  ((FragmentMessageHeader *)header)->update_number, settings.fragment_map))
254  {
256  "%s:" \
257  " Fragment already exists in fragment map. Dropping.\n",
258  print_prefix);
259 
260  return -1;
261  }
262 
    // the originator, trust and domain checks are skipped for reduced headers
263  if (!is_reduced)
264  {
265  // reject the message if it is us as the originator (no update necessary)
266  if (id == header->originator)
267  {
269  "%s:" \
270  " dropping message from ourself\n",
271  print_prefix);
272 
273  return -2;
274  }
275  else
276  {
278  "%s:" \
279  " remote id (%s) is not our own\n",
280  print_prefix,
281  remote_host);
282  }
283 
284  if (settings.is_trusted (remote_host))
285  {
287  "%s: remote id (%s) is trusted\n",
288  print_prefix,
289  remote_host);
290  }
291  else
292  {
294  "%s:" \
295  " dropping message from untrusted peer (%s\n",
296  print_prefix,
297  remote_host);
298 
299  // delete the header and continue to the svc loop
300  return -3;
301  }
302 
303  std::string originator (header->originator);
304 
305  if (settings.is_trusted (originator))
306  {
308  "%s:" \
309  " originator (%s) is trusted\n",
310  print_prefix,
311  originator.c_str ());
312  }
313  else
314  {
316  "%s:" \
317  " dropping message from untrusted originator (%s)\n",
318  print_prefix,
319  originator.c_str ());
320 
321  return -4;
322  }
323 
324  // reject the message if it is from a different domain
325  if (!settings.is_reading_domain (header->domain))
326  {
328  "%s:" \
329  " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
330  print_prefix,
331  remote_host,
332  header->domain);
333 
334  // delete the header and continue to the svc loop
335  return -5;
336  }
337  else
338  {
340  "%s:" \
341  " remote id (%s) message is in our domain\n",
342  print_prefix,
343  remote_host);
344  }
345  }
346 
347  // fragments are special cases
348  if (is_fragment)
349  {
350  // grab the fragment header
351  FragmentMessageHeader * frag_header =
352  dynamic_cast <FragmentMessageHeader *> (header);
353 
355  "%s:" \
356  " Processing fragment %" PRIu32 " of %s:%" PRIu64 ".\n",
357  print_prefix, frag_header->update_number,
358  frag_header->originator, frag_header->clock);
359 
360  // add the fragment and attempt to defrag the message
361  char * message = transport::add_fragment (
362  frag_header->originator, frag_header->clock,
363  frag_header->update_number, buffer, settings.fragment_queue_length,
364  settings.fragment_map, true);
365 
366  // if we have no return message, we may have previously defragged it
367  if (!message)
368  {
369  return 0;
370  }
371  else
372  {
374  "%s:" \
375  " Message has been pieced together from fragments. Processing...\n",
376  print_prefix);
377 
    // the reassembled message is only processed if it fits in queue_length
    // bytes; it is copied over the caller's buffer (const is cast away)
383  buffer_remaining = (int64_t)frag_header->get_size (message);
384  if (buffer_remaining <= settings.queue_length)
385  {
386  char * buffer_override = (char *)buffer;
387  memcpy (buffer_override, message, frag_header->get_size (message));
388 
389  // check the buffer for a reduced message header
391  {
393  "%s:" \
394  " processing reduced KaRL message from %s\n",
395  print_prefix,
396  remote_host);
397 
    // NOTE(review): header still points at the FragmentMessageHeader
    // allocated earlier; no delete of it is visible in this listing before
    // it is overwritten. Confirm it is not leaked.
398  header = new ReducedMessageHeader ();
399  is_reduced = true;
400  update = header->read (buffer, buffer_remaining);
401  }
402  else if (MessageHeader::message_header_test (buffer))
403  {
405  "%s:" \
406  " processing KaRL message from %s\n",
407  print_prefix,
408  remote_host);
409 
410  header = new MessageHeader ();
411  update = header->read (buffer, buffer_remaining);
412  }
413 
414  delete [] message;
415  }
416  }
417  }
418 
419  int actual_updates = 0;
420  uint64_t current_time = time (NULL);
421  double deadline = settings.get_deadline ();
422  TransportContext transport_context (
424  receive_monitor.get_bytes_per_second (),
425  send_monitor.get_bytes_per_second (),
426  header->timestamp, current_time,
427  header->domain, header->originator,
428  remote_host);
429 
430  uint64_t latency (0);
431 
    // deadlines below one second are not enforced
432  if (deadline >= 1.0)
433  {
434  if (header->timestamp < current_time)
435  {
436  latency = current_time - header->timestamp;
437 
438  if (latency > deadline)
439  {
441  "%s:" \
442  " deadline violation (latency is %" PRIu64 ", deadline is %f).\n",
443  print_prefix,
444  latency, deadline);
445 
446  return -6;
447  }
448  }
449  else
450  {
452  "%s:" \
453  " Cannot compute message latency." \
454  " Message header timestamp is in the future." \
455  " message.timestamp = %" PRIu64 ", cur_timestamp = %" PRIu64 ".\n",
456  print_prefix,
457  header->timestamp, current_time);
458  }
459  }
460 
462  "%s:" \
463  " iterating over the %" PRIu32 " updates\n",
464  print_prefix,
465  header->updates);
466 
467  // temporary record for reading from the updates buffer
469  record.quality = header->quality;
470  record.clock = header->clock;
471  std::string key;
472 
    // dropped only suppresses the rebroadcast step below; the received
    // updates are still read, filtered and applied to the context
473  bool dropped = false;
474 
475  if (send_monitor.is_bandwidth_violated (
476  settings.get_send_bandwidth_limit ()))
477  {
478  dropped = true;
480  "%s:" \
481  " Send monitor has detected violation of bandwidth limit." \
482  " Dropping packet from rebroadcast list\n", print_prefix);
483  }
484  else if (receive_monitor.is_bandwidth_violated (
485  settings.get_total_bandwidth_limit ()))
486  {
487  dropped = true;
489  "%s:" \
490  " Receive monitor has detected violation of bandwidth limit." \
491  " Dropping packet from rebroadcast list...\n", print_prefix);
492  }
493  else if (settings.get_participant_ttl () < header->ttl)
494  {
495  dropped = true;
497  "%s:" \
498  " Transport participant TTL is lower than header ttl." \
499  " Dropping packet from rebroadcast list...\n", print_prefix);
500  }
501 
503  "%s:" \
504  " Applying %" PRIu32 " updates\n", print_prefix, header->updates);
505 
506  // iterate over the updates
507  for (uint32_t i = 0; i < header->updates; ++i)
508  {
509  // read converts everything into host format from the update stream
510  update = record.read (update, key, buffer_remaining);
511 
    // a negative remainder is treated as a malformed message: stop reading
    // but keep the records that were already accepted
512  if (buffer_remaining < 0)
513  {
515  "%s:" \
516  " unable to process message. Buffer remaining is negative." \
517  " Server is likely being targeted by custom KaRL tools.\n",
518  print_prefix);
519 
520  // we do not delete the header as this will be cleaned up later
521  break;
522  }
523  else
524  {
526  "%s:" \
527  " Applying receive filter to %s\n", print_prefix, key.c_str ());
528 
529  record = settings.filter_receive (record, key, transport_context);
530 
    // only records that still exist after the receive filter are kept
531  if (record.exists ())
532  {
534  "%s:" \
535  " Filter results for %s were %s\n", print_prefix,
536  key.c_str (), record.to_string ().c_str ());
537 
538  updates[key] = record;
539  }
540  else
541  {
543  "%s:" \
544  " Filter resulted in dropping %s\n", print_prefix, key.c_str ());
545  }
546  }
547  }
548 
    // records stored in the transport context (presumably added by the
    // receive filters) are merged into the update list
549  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
550 
551  if (additionals.size () > 0)
552  {
554  "%s:" \
555  " %lld additional records being handled after receive.\n", print_prefix,
556  (long long)additionals.size ());
557 
558  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
559  i != additionals.end (); ++i)
560  {
561  updates[i->first] = i->second;
562  }
563 
564  transport_context.clear_records ();
565 
    // raise the ttl to at least 2 once records have been added
566  if (header->ttl < 2)
567  header->ttl = 2;
568 
569  // modify originator to indicate we are the originator of modifications
570  strncpy (header->originator, id.c_str (),
571  sizeof (header->originator) - 1);
572 
573  }
574 
575  // apply aggregate receive filters
576  if (settings.get_number_of_receive_aggregate_filters () > 0
577  && (updates.size () > 0 || header->type == transport::REGISTER))
578  {
580  "%s:" \
581  " Applying aggregate receive filters.\n", print_prefix);
582 
583 
584  settings.filter_receive (updates, transport_context);
585  }
586  else
587  {
589  "%s:" \
590  " No aggregate receive filters were applied...\n",
591  print_prefix);
592  }
593 
595  "%s:" \
596  " Locking the context to apply updates.\n", print_prefix);
597 
    // the guard holds the context for the whole of this scope
598  {
599  knowledge::ContextGuard guard (context);
600 
602  "%s:" \
603  " Applying updates to context.\n", print_prefix);
604 
605  // apply updates from the update list
606  for (knowledge::KnowledgeMap::iterator i = updates.begin ();
607  i != updates.end (); ++i)
608  {
609  int result = 0;
610 
611  result = i->second.apply (context, i->first, header->quality,
612  header->clock, false);
    // counted whether or not the update was accepted
613  ++actual_updates;
614 
    // NOTE(review): key and record below are the leftovers from the read
    // loop above, not i->first / i->second, so the logged name and value
    // may not match the update that was just applied
615  if (result != 1)
616  {
618  "%s:" \
619  " update %s=%s was rejected\n",
620  print_prefix,
621  key.c_str (), record.to_string ().c_str ());
622  }
623  else
624  {
626  "%s:" \
627  " update %s=%s was accepted\n",
628  print_prefix,
629  key.c_str (), record.to_string ().c_str ());
630  }
631  }
632  }
633 
634  context.set_changed ();
635 
636  if (!dropped)
637  {
638  transport_context.set_operation (
640 
642  "%s:" \
643  " Applying rebroadcast filters to receive results.\n", print_prefix);
644 
645  // create a list of rebroadcast records from the updates
646  for (knowledge::KnowledgeMap::iterator i = updates.begin ();
647  i != updates.end (); ++i)
648  {
649  i->second = settings.filter_rebroadcast (
650  i->second, i->first, transport_context);
651 
652  if (i->second.exists ())
653  {
654  if (i->second.to_string () != "")
655  {
657  "%s:" \
658  " Filter results for key %s were %s\n", print_prefix,
659  i->first.c_str (), i->second.to_string ().c_str ());
660  }
661  rebroadcast_records[i->first] = i->second;
662  }
663  else
664  {
666  "%s:" \
667  " Filter resulted in dropping %s\n", print_prefix,
668  i->first.c_str ());
669  }
670  }
671 
    // records stored in the transport context are rebroadcast as well
672  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
673 
674  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
675  i != additionals.end (); ++i)
676  {
677  rebroadcast_records[i->first] = i->second;
678  }
679 
    // NOTE(review): size () is passed for a %d specifier here and in the
    // "Returning to caller" message below; confirm the types match
681  "%s:" \
682  " Applying aggregate rebroadcast filters to %d records.\n",
683  print_prefix, rebroadcast_records.size ());
684 
685  // apply aggregate filters to the rebroadcast records
687  && rebroadcast_records.size () > 0)
688  {
689  settings.filter_rebroadcast (rebroadcast_records, transport_context);
690  }
691  else
692  {
694  "%s:" \
695  " No aggregate rebroadcast filters were applied...\n",
696  print_prefix);
697  }
698 
700  "%s:" \
701  " Returning to caller with %d rebroadcast records.\n",
702  print_prefix, rebroadcast_records.size ());
703 
704  }
705  else
706  {
708  "%s:" \
709  " Rebroadcast packet was dropped...\n",
710  print_prefix);
711  }
712 
713  // before we send to others, we first execute rules
714  if (settings.on_data_received_logic.length () != 0)
715  {
716 #ifndef _MADARA_NO_KARL_
718  "%s:" \
719  " evaluating rules in %s\n",
720  print_prefix,
721  settings.on_data_received_logic.c_str ());
722 
723  context.evaluate (on_data_received);
724 #endif // _MADARA_NO_KARL_
725 
726  }
727  else
728  {
730  "%s:" \
731  " no permanent rules were set\n",
732  print_prefix);
733  }
734 
735  return actual_updates;
736 }
737 
738 
/**
 * Preps a buffer for rebroadcasting records to other agents on the network.
 *
 * The header and then every record are written into buffer, the total size
 * is stored (after endian_swap) in the first eight bytes of the buffer, and
 * the encode filters are run over the result.
 *
 * NOTE(review): some lines of this function, including the first lines of
 * the signature, are not visible in this listing.
 *
 * @param buffer            destination buffer for the encoded message
 * @param buffer_remaining  space left in buffer; decremented by the header
 *                          and record writes
 * @param print_prefix      prefix used in log messages
 * @param header            header to write; its updates field is set to the
 *                          number of records
 * @param records           the records to rebroadcast
 * @param packet_scheduler  scheduler whose add () result decides whether
 *                          the packet goes out
 * @return  the number of bytes prepped, -1 if no rebroadcast is necessary
 *          (ttl is 0, no records, or the scheduler's add () returned
 *          false), or -2 if the buffer is too small
 **/
739 int
742  char * buffer,
743  int64_t & buffer_remaining,
745  const char * print_prefix,
746  MessageHeader * header,
747  const knowledge::KnowledgeMap & records,
748  PacketScheduler & packet_scheduler)
749 {
750  int result = 0;
751 
752  if (header->ttl > 0 && records.size () > 0 && packet_scheduler.add ())
753  {
754  // keep track of the message_size portion of buffer
755  uint64_t * message_size = (uint64_t *)buffer;
756  int max_buffer_size = (int)buffer_remaining;
757 
758  // the number of updates will be the size of the records map
759  header->updates = uint32_t (records.size ());
760 
761  // set the update to the end of the header
762  char * update = header->write (buffer, buffer_remaining);
763 
764  for (knowledge::KnowledgeMap::const_iterator i = records.begin ();
765  i != records.end (); ++i)
766  {
767  update = i->second.write (update, i->first, buffer_remaining);
768  }
769 
    // a non-positive remainder means the records did not fit in the buffer
770  if (buffer_remaining > 0)
771  {
    // bytes used = configured queue length minus what is left
772  int size = (int)(settings.queue_length - buffer_remaining);
773  *message_size = madara::utility::endian_swap ((uint64_t)size);
774 
    // NOTE(review): size is an int but the format below uses PRIu64;
    // confirm the specifier matches the argument
776  "%s:" \
777  " %" PRIu64 " bytes prepped for rebroadcast packet\n",
778  print_prefix, size);
779 
780  result = size;
781 
783  "%s:" \
784  " calling encode filters\n",
785  print_prefix);
786 
    // NOTE(review): the return value of filter_encode is not stored, so
    // result stays at the pre-encode size; confirm this is intended
787  settings.filter_encode ((unsigned char *)buffer,
788  result, max_buffer_size);
789  }
790  else
791  {
793  "%s:" \
794  " Not enough buffer for rebroadcasting packet\n",
795  print_prefix);
796 
797  result = -2;
798  }
799  }
800  else
801  {
803  "%s:" \
804  " No rebroadcast necessary.\n",
805  print_prefix);
806 
807  result = -1;
808  }
809 
810  packet_scheduler.print_status (logger::LOG_DETAILED, print_prefix);
811 
812  return result;
813 }
814 
816  const madara::knowledge::KnowledgeRecords & orig_updates,
817  const char * print_prefix)
818 {
    // Preps a message for sending: runs the send filters over orig_updates,
    // writes a header followed by the surviving records into the transport's
    // buffer and runs the encode filters over it.
    //
    // orig_updates: the records to send
    // print_prefix: prefix used in log messages
    //
    // Returns the size produced by filter_encode on success; the
    // check_transport () result when it is -1 (shutting down) or -2 (not
    // valid); 0 when the packet was dropped or the filters removed all
    // data; -3 when the send buffer pointer is null.
    //
    // NOTE(review): several statements, and the first line of the
    // signature, are not visible in this listing.
819  // check to see if we are shutting down
820  long ret = this->check_transport ();
821  if (-1 == ret)
822  {
824  "%s: transport has been told to shutdown",
825  print_prefix);
826 
827  return ret;
828  }
829  else if (-2 == ret)
830  {
832  "%s: transport is not valid",
833  print_prefix);
834 
835  return ret;
836  }
837 
838  // get the maximum quality from the updates
839  uint32_t quality = knowledge::max_quality (orig_updates);
840  bool reduced = false;
841 
842  knowledge::KnowledgeMap filtered_updates;
843 
845  "%s:" \
846  " Applying filters before sending...\n",
847  print_prefix);
848 
852  (uint64_t) time (NULL), (uint64_t) time (NULL),
854  id_);
855 
856  bool dropped = false;
857 
860  {
861  dropped = true;
863  "%s:" \
864  " Send monitor has detected violation of bandwidth limit." \
865  " Dropping packet...\n", print_prefix);
866  }
869  {
870  dropped = true;
872  "%s:" \
873  " Receive monitor has detected violation of bandwidth limit." \
874  " Dropping packet...\n", print_prefix);
875  }
876 
    // filter only if no bandwidth limit was violated and the packet
    // scheduler's add () returns true
877  if (!dropped && packet_scheduler_.add ())
878  {
883  for (knowledge::KnowledgeRecords::const_iterator i = orig_updates.begin ();
884  i != orig_updates.end (); ++i)
885  {
887  "%s:" \
888  " Calling filter chain.\n", print_prefix);
889 
890  // filter the record according to the send filter chain
891  knowledge::KnowledgeRecord result = settings_.filter_send (*i->second, i->first,
892  transport_context);
893 
895  "%s:" \
896  " Filter returned.\n", print_prefix);
897 
    // records that no longer exist after filtering are left out
898  if (result.exists ())
899  {
901  "%s:" \
902  " Adding record to update list.\n", print_prefix);
903 
904  filtered_updates[i->first] = result;
905  }
906  else
907  {
909  "%s:" \
910  " Filter removed record from update list.\n", print_prefix);
911  }
912  }
913 
    // records stored in the transport context are sent as well
914  const knowledge::KnowledgeMap & additionals = transport_context.get_records ();
915 
916  for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
917  i != additionals.end (); ++i)
918  {
920  "%s:" \
921  " Filter added a record %s to the update list.\n",
922  print_prefix, i->first.c_str ());
923  filtered_updates[i->first] = i->second;
924  }
925  }
926  else
927  {
929  "%s:" \
930  " Packet scheduler has dropped packet...\n", print_prefix);
931 
932  return 0;
933  }
934 
936  "%s:" \
937  " Applying %d aggregate update send filters to %d updates...\n",
938  print_prefix, (int)settings_.get_number_of_send_aggregate_filters (),
939  (int)filtered_updates.size ());
940 
941  // apply the aggregate filters
943  filtered_updates.size () > 0)
944  {
945  settings_.filter_send (filtered_updates, transport_context);
946  }
947  else
948  {
950  "%s:" \
951  " No aggregate send filters were applied...\n",
952  print_prefix);
953  }
954 
956 
958  "%s:" \
959  " Finished applying filters before sending...\n",
960  print_prefix);
961 
962  if (filtered_updates.size () == 0)
963  {
965  "%s:" \
966  " Filters removed all data. Nothing to send.\n",
967  print_prefix);
968 
969  return 0;
970  }
971 
972  // use the transport's existing send buffer (nothing is allocated here)
973  char * buffer = buffer_.get_ptr ();
974  int64_t buffer_remaining = settings_.queue_length;
975 
976  if (buffer == 0)
977  {
    // NOTE(review): the format string below has PRIu32 without a leading
    // '%', so no conversion is formed for the size; confirm and fix
979  "%s:" \
980  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
981  print_prefix,
983 
984  return -3;
985  }
986 
987 
988  // set the header to the beginning of the buffer
989  MessageHeader * header = 0;
990 
992  {
994  "%s:" \
995  " Preparing message with reduced message header.\n",
996  print_prefix);
997 
998  header = new ReducedMessageHeader ();
999  reduced = true;
1000  }
1001  else
1002  {
1004  "%s:" \
1005  " Preparing message with normal message header.\n",
1006  print_prefix);
1007 
1008  header = new MessageHeader ();
1009  }
1010 
1011  // get the clock
1012  header->clock = context_.get_clock ();
1013 
    // domain, quality and originator are only filled in for normal headers
1014  if (!reduced)
1015  {
1016  // copy the domain from settings
1017  strncpy (header->domain, this->settings_.write_domain.c_str (),
1018  sizeof (header->domain) - 1);
1019 
1020  // get the quality of the key
1021  header->quality = quality;
1022 
1023  // copy the message originator (our id)
1024  strncpy (header->originator, id_.c_str (), sizeof (header->originator) - 1);
1025 
1026  // send data is generally an assign type. However, MessageHeader is
1027  // flexible enough to support both, and this will simplify our read thread
1028  // handling
1030 
1031  }
1032 
1033  // set the time-to-live
1034  header->ttl = settings_.get_rebroadcast_ttl ();
1035 
1036  header->updates = uint32_t (filtered_updates.size ());
1037 
1038  // compute size of this header
1039  header->size = header->encoded_size ();
1040 
1041  // keep track of the maximum buffer size for encoding
1042  int max_buffer_size = (int)buffer_remaining;
1043 
1044  // set the update to the end of the header
1045  char * update = header->write (buffer, buffer_remaining);
1046  uint64_t * message_size = (uint64_t *)buffer;
1047 
1048  // Message header format
1049  // [size|id|domain|originator|type|updates|quality|clock|list of updates]
1050 
1064  // zero out the memory
1065  //memset(buffer, 0, madara::transport::MAX_PACKET_SIZE);
1066 
1067  // Message update format
1068  // [key|value]
1069 
    // j counts the updates for the log messages only
1070  int j = 0;
1071  for (knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin ();
1072  i != filtered_updates.end (); ++i, ++j)
1073  {
1074  update = i->second.write (update, i->first, buffer_remaining);
1075 
    // a non-positive remainder means this update did not fit; the loop
    // still continues with the remaining updates
1076  if (buffer_remaining > 0)
1077  {
1079  "%s:" \
1080  " update[%d] => encoding %s of type %" PRId32 " and size %" PRIu32 "\n",
1081  print_prefix,
1082  j, i->first.c_str (), i->second.type (), i->second.size ());
1083  }
1084  else
1085  {
1087  "%s:" \
1088  " unable to encode update[%d] => %s of type %"
1089  PRId32 " and size %" PRIu32 "\n",
1090  print_prefix,
1091  j, i->first.c_str (), i->second.type (), i->second.size ());
1092  }
1093  }
1094 
1095  long size (0);
1096 
    // the size is only computed and stored when everything fit; otherwise
    // size stays 0 and is passed to filter_encode as such
1097  if (buffer_remaining > 0)
1098  {
1099  size = (long)(settings_.queue_length - buffer_remaining);
1100  *message_size = madara::utility::endian_swap ((uint64_t)size);
1101 
1102  // before we send to others, we first execute rules
1103  if (settings_.on_data_received_logic.length () != 0)
1104  {
1105 #ifndef _MADARA_NO_KARL_
1106 
1108  "%s:" \
1109  " evaluating rules in %s\n",
1110  print_prefix,
1112 
1114 
1116  "%s:" \
1117  " rules have been successfully evaluated\n",
1118  print_prefix);
1119 
1120 #endif // _MADARA_NO_KARL_
1121 
1122  }
1123  else
1124  {
1126  "%s:" \
1127  " no permanent rules were set\n",
1128  print_prefix);
1129  }
1130  }
1131 
1133  "%s:" \
1134  " calling encode filters\n",
1135  print_prefix);
1136 
1137  // buffer is ready encoding
1138  size = (long)settings_.filter_encode ((unsigned char *)buffer_.get_ptr (),
1139  (int)size, max_buffer_size);
1140 
    // NOTE(review): this message says "before encode" but is emitted after
    // the filter_encode call above
1142  "%s:" \
1143  " header info before encode: %s\n",
1144  print_prefix, header->to_string ().c_str ());
1145 
    // the header was allocated with new above and is released here
1146  delete header;
1147 
1148  return size;
1149 }
This class encapsulates an entry in a KnowledgeBase.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
MADARA_Export char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
Definition: Transport.h:195
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
Definition: Transport.cpp:9
OriginatorFragmentMap fragment_map
map of fragments received by originator
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the status of the packet scheduler.
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
int filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the buffer filter chain.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void add_read_domain(const std::string domain)
Adds a read domain to the list of domains to read from.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
uint32_t quality
priority of the update
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
uint32_t updates
the number of knowledge variable updates in the message
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
Holds basic transport settings.
Compiled, optimized KaRL logic.
double get_deadline(void) const
Returns the latency deadline in seconds.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:47
Provides context about the transport.
virtual ~Base()
Destructor.
Definition: Transport.cpp:28
uint32_t type
the type of message
bool add(void)
Adds a message to the monitor.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class.
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
int get_level(void)
Gets the maximum logging detail level.
Definition: Logger.inl:68
virtual std::string to_string(void)
Converts the relevant fields to a printable string.
size_t num_read_domains(void) const
Returns the number of read domains.
A thread-safe guard for a context or knowledge base.
Definition: ContextGuard.h:23
static bool fragment_message_header_test(const char *buffer)
Tests the buffer for a fragment message identifier.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
MADARA_Export uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.cpp:625
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
std::string write_domain
All class members are accessible to users for easy setup.
uint64_t size
the size of this header plus the updates
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
void add(uint64_t size)
Adds a message to the monitor.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
static bool reduced_message_header_test(const char *buffer)
Tests the buffer for a reduced message identifier.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
void attach(const QoSTransportSettings *settings)
Attaches settings.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
uint64_t timestamp
the timestamp of the sender when the message was generated
std::string on_data_received_logic
logic to be evaluated after every successful update
static constexpr struct madara::knowledge::tags::string_t string
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
virtual void close(void)
Closes this transport.
Definition: Transport.cpp:106
MADARA_Export bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
int filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the buffer filter chain.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
Provides monitoring capability of a transport's bandwidth.
static bool message_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:20
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
uint64_t clock
the clock of the sender when the message was generated
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
Defines a simple, smaller message header of 29 bytes that supports less QoS.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of their setup
Definition: Transport.cpp:33
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting. @return the number of aggregate filters
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
uint32_t quality
the quality of the message sender
int check_transport(void)
all subclasses should call this method at the beginning of send_data
Definition: Transport.inl:19
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agents' m...
TransportSettings & settings(void)
Getter for the transport settings.
Definition: Transport.inl:45
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...