12 : is_valid_ (false), shutting_down_ (false),
13 valid_setup_ (mutex_), id_ (id),
14 settings_ (new_settings), context_ (context)
16 #ifndef _MADARA_NO_KARL_
17 , on_data_received_ (context.get_logger ())
39 "transport::Base::setup" \
40 " setting rules to %s\n",
43 #ifndef _MADARA_NO_KARL_ 47 #endif // _MADARA_NO_KARL_ 52 "transport::Base::setup" \
53 " no permanent rules were set\n");
64 "transport::Base::setup" \
65 " no read domains set. Adding write domain (%s)\n",
73 "transport::Base::setup" \
74 " settings configured with %d read domains\n",
81 std::vector <std::string> domains;
84 std::stringstream buffer;
86 for (
unsigned int i = 0; i < domains.size (); ++i)
90 if (i != domains.size () - 1)
97 "transport::Base::setup" \
98 " Write domain: %s. Read domains: %s\n",
121 #ifndef _MADARA_NO_KARL_
125 const char * print_prefix,
126 const char * remote_host,
132 int max_buffer_size = (int)bytes_read;
135 receive_monitor.
add (bytes_read);
139 " Receive bandwidth = %" PRIu64
" B/s\n",
143 bool is_reduced =
false;
144 bool is_fragment =
false;
148 " calling decode filters on %" PRIu32
" bytes\n",
149 print_prefix, bytes_read);
152 bytes_read = (uint32_t)settings.
filter_decode ((
unsigned char *)buffer,
153 max_buffer_size, max_buffer_size);
157 " Decoding resulted in %" PRIu32
" final bytes\n",
158 print_prefix, bytes_read);
161 int64_t buffer_remaining = (int64_t)bytes_read;
164 rebroadcast_records.clear ();
175 " processing reduced KaRL message from %s\n",
187 " processing KaRL message from %s\n",
198 " processing KaRL fragment message from %s\n",
214 " dropping non-KaRL message with id %s from %s\n",
225 " dropping too short message from %s (length %i)\n",
233 const char * update = header->
read (buffer, buffer_remaining);
237 " header info: %s\n",
238 print_prefix, header->
to_string ().c_str ());
240 if (header->
size < bytes_read)
244 " Message header.size (%" PRIu64
" bytes) is less than actual" 245 " bytes read (%" PRIu32
" bytes). Dropping message.\n",
246 print_prefix, header->
size, bytes_read);
257 " Fragment already exists in fragment map. Dropping.\n",
270 " dropping message from ourself\n",
279 " remote id (%s) is not our own\n",
287 "%s: remote id (%s) is trusted\n",
295 " dropping message from untrusted peer (%s\n",
309 " originator (%s) is trusted\n",
311 originator.c_str ());
317 " dropping message from untrusted originator (%s)\n",
319 originator.c_str ());
329 " remote id (%s) has an untrusted domain (%s). Dropping message.\n",
341 " remote id (%s) message is in our domain\n",
356 " Processing fragment %" PRIu32
" of %s:%" PRIu64
".\n",
375 " Message has been pieced together from fragments. Processing...\n",
383 buffer_remaining = (int64_t)frag_header->
get_size (message);
386 char * buffer_override = (
char *)buffer;
387 memcpy (buffer_override, message, frag_header->
get_size (message));
394 " processing reduced KaRL message from %s\n",
400 update = header->read (buffer, buffer_remaining);
406 " processing KaRL message from %s\n",
411 update = header->read (buffer, buffer_remaining);
419 int actual_updates = 0;
420 uint64_t current_time = time (NULL);
430 uint64_t latency (0);
436 latency = current_time - header->
timestamp;
438 if (latency > deadline)
442 " deadline violation (latency is %" PRIu64
", deadline is %f).\n",
453 " Cannot compute message latency." \
454 " Message header timestamp is in the future." \
455 " message.timestamp = %" PRIu64
", cur_timestamp = %" PRIu64
".\n",
463 " iterating over the %" PRIu32
" updates\n",
470 record.clock = header->
clock;
473 bool dropped =
false;
481 " Send monitor has detected violation of bandwidth limit." \
482 " Dropping packet from rebroadcast list\n", print_prefix);
490 " Receive monitor has detected violation of bandwidth limit." \
491 " Dropping packet from rebroadcast list...\n", print_prefix);
498 " Transport participant TTL is lower than header ttl." \
499 " Dropping packet from rebroadcast list...\n", print_prefix);
504 " Applying %" PRIu32
" updates\n", print_prefix, header->
updates);
507 for (uint32_t i = 0; i < header->
updates; ++i)
510 update = record.read (update, key, buffer_remaining);
512 if (buffer_remaining < 0)
516 " unable to process message. Buffer remaining is negative." \
517 " Server is likely being targeted by custom KaRL tools.\n",
527 " Applying receive filter to %s\n", print_prefix, key.c_str ());
529 record = settings.
filter_receive (record, key, transport_context);
531 if (record.exists ())
535 " Filter results for %s were %s\n", print_prefix,
536 key.c_str (), record.to_string ().c_str ());
538 updates[key] = record;
544 " Filter resulted in dropping %s\n", print_prefix, key.c_str ());
551 if (additionals.size () > 0)
555 " %lld additional records being handled after receive.\n", print_prefix,
556 (
long long)additionals.size ());
558 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
559 i != additionals.end (); ++i)
561 updates[i->first] = i->second;
564 transport_context.clear_records ();
581 " Applying aggregate receive filters.\n", print_prefix);
590 " No aggregate receive filters were applied...\n",
596 " Locking the context to apply updates.\n", print_prefix);
603 " Applying updates to context.\n", print_prefix);
606 for (knowledge::KnowledgeMap::iterator i = updates.begin ();
607 i != updates.end (); ++i)
611 result = i->second.apply (context, i->first, header->
quality,
612 header->
clock,
false);
619 " update %s=%s was rejected\n",
621 key.c_str (), record.to_string ().c_str ());
627 " update %s=%s was accepted\n",
629 key.c_str (), record.to_string ().c_str ());
638 transport_context.set_operation (
643 " Applying rebroadcast filters to receive results.\n", print_prefix);
646 for (knowledge::KnowledgeMap::iterator i = updates.begin ();
647 i != updates.end (); ++i)
650 i->second, i->first, transport_context);
652 if (i->second.exists ())
654 if (i->second.to_string () !=
"")
658 " Filter results for key %s were %s\n", print_prefix,
659 i->first.c_str (), i->second.to_string ().c_str ());
661 rebroadcast_records[i->first] = i->second;
667 " Filter resulted in dropping %s\n", print_prefix,
674 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
675 i != additionals.end (); ++i)
677 rebroadcast_records[i->first] = i->second;
682 " Applying aggregate rebroadcast filters to %d records.\n",
683 print_prefix, rebroadcast_records.size ());
687 && rebroadcast_records.size () > 0)
695 " No aggregate rebroadcast filters were applied...\n",
701 " Returning to caller with %d rebroadcast records.\n",
702 print_prefix, rebroadcast_records.size ());
709 " Rebroadcast packet was dropped...\n",
716 #ifndef _MADARA_NO_KARL_ 719 " evaluating rules in %s\n",
723 context.
evaluate (on_data_received);
724 #endif // _MADARA_NO_KARL_ 731 " no permanent rules were set\n",
735 return actual_updates;
743 int64_t & buffer_remaining,
745 const char * print_prefix,
752 if (header->
ttl > 0 && records.size () > 0 && packet_scheduler.
add ())
755 uint64_t * message_size = (uint64_t *)buffer;
756 int max_buffer_size = (int)buffer_remaining;
759 header->
updates = uint32_t (records.size ());
762 char * update = header->
write (buffer, buffer_remaining);
764 for (knowledge::KnowledgeMap::const_iterator i = records.begin ();
765 i != records.end (); ++i)
767 update = i->second.write (update, i->first, buffer_remaining);
770 if (buffer_remaining > 0)
772 int size = (int)(settings.
queue_length - buffer_remaining);
777 " %" PRIu64
" bytes prepped for rebroadcast packet\n",
784 " calling encode filters\n",
788 result, max_buffer_size);
794 " Not enough buffer for rebroadcasting packet\n",
804 " No rebroadcast necessary.\n",
817 const char * print_prefix)
824 "%s: transport has been told to shutdown",
832 "%s: transport is not valid",
840 bool reduced =
false;
846 " Applying filters before sending...\n",
852 (uint64_t) time (NULL), (uint64_t) time (NULL),
856 bool dropped =
false;
864 " Send monitor has detected violation of bandwidth limit." \
865 " Dropping packet...\n", print_prefix);
873 " Receive monitor has detected violation of bandwidth limit." \
874 " Dropping packet...\n", print_prefix);
883 for (knowledge::KnowledgeRecords::const_iterator i = orig_updates.begin ();
884 i != orig_updates.end (); ++i)
888 " Calling filter chain.\n", print_prefix);
896 " Filter returned.\n", print_prefix);
898 if (result.exists ())
902 " Adding record to update list.\n", print_prefix);
904 filtered_updates[i->first] = result;
910 " Filter removed record from update list.\n", print_prefix);
916 for (knowledge::KnowledgeMap::const_iterator i = additionals.begin ();
917 i != additionals.end (); ++i)
921 " Filter added a record %s to the update list.\n",
922 print_prefix, i->first.c_str ());
923 filtered_updates[i->first] = i->second;
930 " Packet scheduler has dropped packet...\n", print_prefix);
937 " Applying %d aggregate update send filters to %d updates...\n",
939 (int)filtered_updates.size ());
943 filtered_updates.size () > 0)
951 " No aggregate send filters were applied...\n",
959 " Finished applying filters before sending...\n",
962 if (filtered_updates.size () == 0)
966 " Filters removed all data. Nothing to send.\n",
980 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
995 " Preparing message with reduced message header.\n",
1005 " Preparing message with normal message header.\n",
1017 strncpy (header->
domain, this->settings_.write_domain.c_str (),
1018 sizeof (header->
domain) - 1);
1036 header->
updates = uint32_t (filtered_updates.size ());
1042 int max_buffer_size = (int)buffer_remaining;
1045 char * update = header->
write (buffer, buffer_remaining);
1046 uint64_t * message_size = (uint64_t *)buffer;
1071 for (knowledge::KnowledgeMap::const_iterator i = filtered_updates.begin ();
1072 i != filtered_updates.end (); ++i, ++j)
1074 update = i->second.write (update, i->first, buffer_remaining);
1076 if (buffer_remaining > 0)
1080 " update[%d] => encoding %s of type %" PRId32
" and size %" PRIu32
"\n",
1082 j, i->first.c_str (), i->second.type (), i->second.size ());
1088 " unable to encode update[%d] => %s of type %" 1089 PRId32
" and size %" PRIu32
"\n",
1091 j, i->first.c_str (), i->second.type (), i->second.size ());
1097 if (buffer_remaining > 0)
1105 #ifndef _MADARA_NO_KARL_ 1109 " evaluating rules in %s\n",
1117 " rules have been successfully evaluated\n",
1120 #endif // _MADARA_NO_KARL_ 1127 " no permanent rules were set\n",
1134 " calling encode filters\n",
1139 (int)size, max_buffer_size);
1143 " header info before encode: %s\n",
1144 print_prefix, header->
to_string ().c_str ());
This class encapsulates an entry in a KnowledgeBase.
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
MADARA_Export char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is com...
madara::expression::ExpressionTree on_data_received_
data received rules, defined in Transport settings
bool is_trusted(const std::string &peer) const
Checks if a peer is trusted.
Base(const std::string &id, TransportSettings &new_settings, knowledge::ThreadSafeContext &context)
Constructor.
OriginatorFragmentMap fragment_map
map of fragments received by originator
void print_status(unsigned int log_level=0, const char *prefix="PacketScheduler")
Prints the number of status of the packet scheduler.
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
int filter_decode(unsigned char *source, int size, int max_size) const
Calls decode on the the buffer filter chain.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void add_read_domain(const std::string domain)
Adds a read domain to the list of domains to read from.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
uint32_t quality
priority of the update
bool is_bandwidth_violated(int64_t limit)
Checks send and receive bandwidth against send and receive limits.
void attach(knowledge::ThreadSafeContext *context)
Attaches a context to the various filtering systems.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
int64_t get_send_bandwidth_limit(void) const
Returns the limit for sending on this transport in bytes per second.
knowledge::KnowledgeRecord filter_receive(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the receive filter chain.
uint32_t fragment_queue_length
Indicates queue length for holding clock-keyed fragments.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
Holds basic transport settings.
Compiled, optimized KaRL logic.
double get_deadline(void) const
Returns the latency deadline in seconds.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Provides context about the transport.
virtual ~Base()
Destructor.
bool add(void)
Adds a message to the monitor.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
size_t get_number_of_send_aggregate_filters(void) const
Returns the number of aggregate filters applied before sending @ return the number of aggregate filte...
int get_level(void)
Gets the maximum logging detail level.
size_t num_read_domains(void) const
Returns the number of read domains.
A thread-safe guard for a context or knowledge base.
knowledge::KnowledgeRecord evaluate(CompiledExpression expression, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Evaluate a compiled expression.
MADARA_Export uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
T * get_ptr(void)
get the underlying pointer
knowledge::KnowledgeRecord filter_rebroadcast(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to the rebroadcast filter chain.
std::string write_domain
All class members are accessible to users for easy setup.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void get_read_domains(std::vector< std::string > &domains) const
Retrieves the list of read domains.
bool send_reduced_message_header
send the reduced message header (clock, size, updates, KaRL id)
void add(uint64_t size)
Adds a message to the monitor.
size_t get_number_of_receive_aggregate_filters(void) const
Returns the number of aggregate filters applied after receiving @ return the number of aggregate filt...
void attach(const QoSTransportSettings *settings)
Attaches settings.
const std::string id_
host:port identifier of this process
bool is_reading_domain(const std::string domain) const
Checks if a domain is in the domain read list.
std::string on_data_received_logic
logic to be evaluated after every successful update
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
virtual void close(void)
Closes this transport.
MADARA_Export bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
int filter_encode(unsigned char *source, int size, int max_size) const
Calls encode on the the buffer filter chain.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
int64_t get_total_bandwidth_limit(void) const
Returns the total limit for this transport in bytes per second.
Provides monitoring capability of a transport's bandwidth.
knowledge::KnowledgeRecord filter_send(const madara::knowledge::KnowledgeRecord &input, const std::string &name, transport::TransportContext &context) const
Filters an input according to send's filter chain.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
size_t get_number_of_rebroadcast_aggregate_filters(void) const
Returns the number of aggregate filters applied before rebroadcasting @ return the number of aggregat...
madara::knowledge::KnowledgeRecord evaluate(const madara::knowledge::KnowledgeUpdateSettings &settings=knowledge::KnowledgeUpdateSettings())
Evaluates the expression tree.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int check_transport(void)
all subclasses should call this method at the beginning of send_data
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...
TransportSettings & settings(void)
Getter for the transport settings.