12   : is_valid_ (false), shutting_down_ (false),
    13   valid_setup_ (mutex_), id_ (id),
    14   settings_ (new_settings), context_ (context)
    16 #ifndef _MADARA_NO_KARL_
    17   , on_data_received_ (context.get_logger ())
    39       "transport::Base::setup" \
    40       " setting rules to %s\n",
    43 #ifndef _MADARA_NO_KARL_    47 #endif // _MADARA_NO_KARL_    52       "transport::Base::setup" \
    53       " no permanent rules were set\n");
    64       "transport::Base::setup" \
    65       " no read domains set. Adding write domain (%s)\n",
    73       "transport::Base::setup" \
    74       " settings configured with %d read domains\n",
    81     std::vector <std::string> domains;
    84     std::stringstream buffer;
    86     for (
unsigned int i = 0; i < domains.size (); ++i)
    90       if (i != domains.size () - 1)
    97       "transport::Base::setup" \
    98       " Write domain: %s. Read domains: %s\n",
   121 #ifndef _MADARA_NO_KARL_
   125   const char * print_prefix,
   126   const char * remote_host,
   132   int max_buffer_size = (int)bytes_read;
   135   receive_monitor.
add (bytes_read);
   139     " Receive bandwidth = %" PRIu64 
" B/s\n",
   143   bool is_reduced = 
false;
   144   bool is_fragment = 
false;
   148     " calling decode filters on %" PRIu32 
" bytes\n",
   149     print_prefix, bytes_read);
   152   bytes_read = (uint32_t)settings.
filter_decode ((
unsigned char *)buffer,
   153     max_buffer_size, max_buffer_size);
   157     " Decoding resulted in %" PRIu32 
" final bytes\n",
   158     print_prefix, bytes_read);
   161   int64_t buffer_remaining = (int64_t)bytes_read;
   164   rebroadcast_records.clear ();
   175       " processing reduced KaRL message from %s\n",
   187       " processing KaRL message from %s\n",
   198       " processing KaRL fragment message from %s\n",
   214       " dropping non-KaRL message with id %s from %s\n",
   225       " dropping too short message from %s (length %i)\n",
   233   const char * update = header->
read (buffer, buffer_remaining);
   237     " header info: %s\n",
   238     print_prefix, header->
to_string ().c_str ());
   240   if (header->
size < bytes_read)
   244       " Message header.size (%" PRIu64 
" bytes) is less than actual"   245       " bytes read (%" PRIu32 
" bytes). Dropping message.\n",
   246       print_prefix, header->
size, bytes_read);
   257       " Fragment already exists in fragment map. Dropping.\n",
   270         " dropping message from ourself\n",
   279         " remote id (%s) is not our own\n",
   287         "%s: remote id (%s) is trusted\n",
   295         " dropping message from untrusted peer (%s\n",
   309         " originator (%s) is trusted\n",
   311         originator.c_str ());
   317         " dropping message from untrusted originator (%s)\n",
   319         originator.c_str ());
   329         " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
   341         " remote id (%s) message is in our domain\n",
   356       " Processing fragment %" PRIu32 
" of %s:%" PRIu64 
".\n",
   375         " Message has been pieced together from fragments. Processing...\n",
   383       buffer_remaining = (int64_t)frag_header->
get_size (message);
   386         char * buffer_override = (
char *)buffer;
   387         memcpy (buffer_override, message, frag_header->
get_size (message));
   394             " processing reduced KaRL message from %s\n",
   400           update = header->read (buffer, buffer_remaining);
   406             " processing KaRL message from %s\n",
   411           update = header->read (buffer, buffer_remaining);
   419   int actual_updates = 0;
   420   uint64_t current_time = time (NULL);
   430   uint64_t latency (0);
   436       latency = current_time - header->
timestamp;
   438       if (latency > deadline)
   442           " deadline violation (latency is %" PRIu64 
", deadline is %f).\n",
   453         " Cannot compute message latency." \
   454         " Message header timestamp is in the future." \
   455         " message.timestamp = %" PRIu64 
", cur_timestamp = %" PRIu64 
".\n",
   463     " iterating over the %" PRIu32 
" updates\n",
   470   record.clock = header->
clock;
   473   bool dropped = 
false;
   481       " Send monitor has detected violation of bandwidth limit." \
   482       " Dropping packet from rebroadcast list\n", print_prefix);
   490       " Receive monitor has detected violation of bandwidth limit." \
   491       " Dropping packet from rebroadcast list...\n", print_prefix);
   498       " Transport participant TTL is lower than header ttl." \
   499       " Dropping packet from rebroadcast list...\n", print_prefix);
   504     " Applying %" PRIu32 
" updates\n", print_prefix, header->
updates);
   507   for (uint32_t i = 0; i < header->
updates; ++i)
   510     update = record.read (update, key, buffer_remaining);
   512     if (buffer_remaining < 0)
   516         " unable to process message. Buffer remaining is negative." \
   517         " Server is likely being targeted by custom KaRL tools.\n",
   527         " Applying receive filter to %s\n", print_prefix, key.c_str ());
   529       record = settings.
filter_receive (record, key, transport_context);
   531       if (record.exists ())
   535           " Filter results for %s were %s\n", print_prefix,
   536           key.c_str (), record.to_string ().c_str ());
   538         updates[key] = record;
   544           " Filter resulted in dropping %s\n", print_prefix, key.c_str ());
   551   if (additionals.size () > 0)
   555       " %lld additional records being handled after receive.\n", print_prefix,
   556       (
long long)additionals.size ());
   558     for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
   559           i != additionals.end (); ++i)
   561       updates[i->first] = i->second;
   564     transport_context.clear_records ();
   581       " Applying aggregate receive filters.\n", print_prefix);
   590       " No aggregate receive filters were applied...\n",
   596     " Locking the context to apply updates.\n", print_prefix);
   603       " Applying updates to context.\n", print_prefix);
   606     for (knowledge::KnowledgeMap::iterator i = updates.begin ();
   607       i != updates.end (); ++i)
   611       result = i->second.apply (context, i->first, header->
quality,
   612         header->
clock, 
false);
   619           " update %s=%s was rejected\n",
   621           key.c_str (), record.to_string ().c_str ());
   627           " update %s=%s was accepted\n",
   629           key.c_str (), record.to_string ().c_str ());
   638     transport_context.set_operation (
   643       " Applying rebroadcast filters to receive results.\n", print_prefix);
   646     for (knowledge::KnowledgeMap::iterator i = updates.begin ();
   647          i != updates.end (); ++i)
   650         i->second, i->first, transport_context);
   652       if (i->second.exists ())
   654         if (i->second.to_string () != 
"")
   658             " Filter results for key %s were %s\n", print_prefix,
   659             i->first.c_str (), i->second.to_string ().c_str ());
   661         rebroadcast_records[i->first] = i->second;
   667           " Filter resulted in dropping %s\n", print_prefix,
   674     for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
   675           i != additionals.end (); ++i)
   677       rebroadcast_records[i->first] = i->second;
   682       " Applying aggregate rebroadcast filters to %d records.\n",
   683       print_prefix, rebroadcast_records.size ());
   687       && rebroadcast_records.size () > 0)
   695         " No aggregate rebroadcast filters were applied...\n",
   701       " Returning to caller with %d rebroadcast records.\n",
   702       print_prefix, rebroadcast_records.size ());
   709       " Rebroadcast packet was dropped...\n",
   716 #ifndef _MADARA_NO_KARL_   719       " evaluating rules in %s\n",
   723     context.
evaluate (on_data_received);
   724 #endif // _MADARA_NO_KARL_   731       " no permanent rules were set\n",
   735   return actual_updates;
   743   int64_t & buffer_remaining,
   745   const char * print_prefix,
   752   if (header->
ttl > 0 && records.size () > 0 && packet_scheduler.
add ())
   755     uint64_t * message_size = (uint64_t *)buffer;
   756     int max_buffer_size = (int)buffer_remaining;
   759     header->
updates = uint32_t (records.size ());
   762     char * update = header->
write (buffer, buffer_remaining);
   764     for (knowledge::KnowledgeMap::const_iterator i = records.begin ();
   765          i != records.end (); ++i)
   767       update = i->second.write (update, i->first, buffer_remaining);
   770     if (buffer_remaining > 0)
   772       int size = (int)(settings.
queue_length - buffer_remaining);
   777         " %" PRIu64 
" bytes prepped for rebroadcast packet\n",
   784         " calling encode filters\n",
   788         result, max_buffer_size);
   794         " Not enough buffer for rebroadcasting packet\n",
   804       " No rebroadcast necessary.\n",
   817   const char * print_prefix)
   824       "%s: transport has been told to shutdown",
   832       "%s: transport is not valid",
   840   bool reduced = 
false;
   846     " Applying filters before sending...\n",
   852       (uint64_t) time (NULL), (uint64_t) time (NULL),
   856   bool dropped = 
false;
   864       " Send monitor has detected violation of bandwidth limit." \
   865       " Dropping packet...\n", print_prefix);
   873       " Receive monitor has detected violation of bandwidth limit." \
   874       " Dropping packet...\n", print_prefix);
   883     for (knowledge::KnowledgeRecords::const_iterator i = orig_updates.begin ();
   884           i != orig_updates.end (); ++i)
   888         " Calling filter chain.\n", print_prefix);
   896         " Filter returned.\n", print_prefix);
   898       if (result.exists ())
   902           " Adding record to update list.\n", print_prefix);
   904         filtered_updates[i->first] = result;
   910           " Filter removed record from update list.\n", print_prefix);
   916     for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
   917          i != additionals.end (); ++i)
   921         " Filter added a record %s to the update list.\n",
   922         print_prefix, i->first.c_str ());
   923       filtered_updates[i->first] = i->second;
   930       " Packet scheduler has dropped packet...\n", print_prefix);
   937     " Applying %d aggregate update send filters to %d updates...\n",
   939     (int)filtered_updates.size ());
   943       filtered_updates.size () > 0)
   951       " No aggregate send filters were applied...\n",
   959     " Finished applying filters before sending...\n",
   962   if (filtered_updates.size () == 0)
   966       " Filters removed all data. Nothing to send.\n",
   980       " Unable to allocate buffer of size " PRIu32 
". Exiting thread.\n",
   995       " Preparing message with reduced message header.\n",
  1005       " Preparing message with normal message header.\n",
  1017     strncpy (header->
domain, this->settings_.write_domain.c_str (),
  1018       sizeof (header->
domain) - 1);
  1036   header->
updates = uint32_t (filtered_updates.size ());
  1042   int max_buffer_size = (int)buffer_remaining;
  1045   char * update = header->
write (buffer, buffer_remaining);
  1046   uint64_t * message_size = (uint64_t *)buffer;
  1071   for (knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin ();
  1072     i != filtered_updates.end (); ++i, ++j)
  1074     update = i->second.write (update, i->first, buffer_remaining);
  1076     if (buffer_remaining > 0)
  1080         " update[%d] => encoding %s of type %" PRId32 
" and size %" PRIu32 
"\n",
  1082         j, i->first.c_str (), i->second.type (), i->second.size ());
  1088         " unable to encode update[%d] => %s of type %"  1089         PRId32 
" and size %" PRIu32 
"\n",
  1091         j, i->first.c_str (), i->second.type (), i->second.size ());
  1097   if (buffer_remaining > 0)
  1105 #ifndef _MADARA_NO_KARL_  1109         " evaluating rules in %s\n",
  1117         " rules have been successfully evaluated\n",
  1120 #endif // _MADARA_NO_KARL_  1127         " no permanent rules were set\n",
  1134     " calling encode filters\n",
  1139     (int)size, max_buffer_size);
  1143     " header info before encode: %s\n",
  1144     print_prefix, header->
to_string ().c_str ());
 This class encapsulates an entry in a KnowledgeBase. 
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records. 
MADARA_Export char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings 
bool is_trusted(const std::string &peer) const 
Checks if a peer is trusted. 
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor. 
OriginatorFragmentMap fragment_map
map of fragments received by originator 
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the number of status of the packet scheduler. 
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending. 
int filter_decode(unsigned char *source, int size, int max_size) const 
Calls decode on the the buffer filter chain. 
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void add_read_domain(const std::string domain)
Adds a read domain to the list of domains to read from. 
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down. 
uint32_t quality
priority of the update 
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits. 
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems. 
int validate_transport(void)
Validates a transport to indicate it is not shutting down. 
int64_t get_send_bandwidth_limit(void) const 
Returns the limit for sending on this transport in bytes per second. 
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const 
Filters an input according to the receive filter chain. 
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments. 
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets. 
unsigned char get_rebroadcast_ttl(void) const 
Gets the time to live for rebroadcasting (number of rebroadcasts per message). 
Holds basic transport settings. 
Compiled, optimized KaRL logic. 
double get_deadline(void) const 
Returns the latency deadline in seconds. 
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method. 
Provides context about the transport. 
virtual ~Base()
Destructor. 
bool add(void)
Adds a message to the monitor. 
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage 
size_t get_number_of_send_aggregate_filters(void) const 
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
int get_level(void)
Gets the maximum logging detail level. 
size_t num_read_domains(void) const 
Returns the number of read domains. 
A thread-safe guard for a context or knowledge base. 
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression. 
MADARA_Export uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian. 
T * get_ptr(void)
get the underlying pointer 
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const 
Filters an input according to the rebroadcast filter chain. 
std::string write_domain
All class members are accessible to users for easy setup. 
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void get_read_domains(std::vector< std::string > &domains) const 
Retrieves the list of read domains. 
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id) 
void add(uint64_t size)
Adds a message to the monitor. 
size_t get_number_of_receive_aggregate_filters(void) const 
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
void attach(const QoSTransportSettings *settings)
Attaches settings. 
const std::string id_
host:port identifier of this process 
bool is_reading_domain(const std::string domain) const 
Checks if a domain is in the domain read list. 
std::string on_data_received_logic
logic to be evaluated after every successful update 
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry. 
uint32_t queue_length
Length of the buffer used to store history of events. 
Container for quality-of-service settings. 
virtual void close(void)
Closes this transport. 
MADARA_Export bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map. 
int filter_encode(unsigned char *source, int size, int max_size) const 
Calls encode on the the buffer filter chain. 
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network. 
int64_t get_total_bandwidth_limit(void) const 
Returns the total limit for this transport in bytes per second. 
Provides monitoring capability of a transport's bandwidth. 
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const 
Filters an input according to send's filter chain. 
uint64_t get_clock(void) const 
Atomically gets the Lamport clock. 
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues 
madara::utility::ScopedArray< char > buffer_
buffer for sending 
logger::Logger & get_logger(void) const 
Gets the logger used for information printing. 
virtual int setup(void)
all subclasses should call this method at the end of its setup 
size_t get_number_of_rebroadcast_aggregate_filters(void) const 
Returns the number of aggregate filters applied before rebroadcasting @ return the number of aggregat...
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree. 
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage 
madara::knowledge::ThreadSafeContext & context_
int check_transport(void)
all subclasses should call this method at the beginning of send_data 
unsigned char get_participant_ttl(void) const 
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
TransportSettings & settings(void)
Getter for the transport settings.