16 Knowledge::UpdateDataReader_ptr & update_reader,
17 Knowledge::UpdateDataWriter_ptr & update_writer,
21 : id_ (id), settings_ (settings), context_ (&context),
22 update_reader_ (update_reader),
23 update_writer_ (update_writer),
24 send_monitor_ (send_monitor),
25 receive_monitor_ (receive_monitor),
26 packet_scheduler_ (packet_scheduler)
46 #ifndef _MADARA_NO_KARL_ 48 "UdpTransportReadThread::init:" \
49 " setting rules to %s\n",
54 #endif // _MADARA_NO_KARL_ 59 "UdpTransportReadThread::init:" \
60 " no permanent rules were set\n");
72 const char * print_prefix,
85 ssize_t bytes_sent (result +
sizeof (Knowledge::Update));
86 DDS::ReturnCode_t dds_result;
87 DDS::InstanceHandle_t handle;
95 Knowledge::Update data;
97 data.buffer = Knowledge::seq_oct (result, result,
99 data.clock = cur_clock;
100 data.quality = quality;
101 data.updates = DDS::ULong (records.size ());
102 data.originator = DDS::string_dup(
id_.c_str ());
105 data.timestamp = time (NULL);
113 " Sent packet of size %d\n",
121 " Send bandwidth = %d B/s\n",
130 DDS::SampleInfoSeq_var infoList =
new DDS::SampleInfoSeq;
131 DDS::ReturnCode_t dds_result;
133 DDS::Boolean result =
false;
134 const char * print_prefix =
"SpliceReadThread::svc";
135 Knowledge::UpdateSeq_var update_data_list_ =
new Knowledge::UpdateSeq;
137 DDS::WaitSet waitset_;
138 DDS::StatusCondition_ptr condition_;
141 condition_->set_enabled_statuses (DDS::DATA_AVAILABLE_STATUS);
142 waitset_.attach_condition (condition_);
144 ::DDS::Duration_t wait_time;
146 wait_time.nanosec = 0;
151 "%s: entering processing loop.\n", print_prefix);
156 DDS::ConditionSeq_var conditionList =
new DDS::ConditionSeq();
157 result = waitset_.wait (conditionList.inout (), wait_time);
162 "%s: entering a take on the DDS reader.\n", print_prefix);
164 dds_result =
update_reader_->take (update_data_list_, infoList, 20,
165 DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
167 amount = update_data_list_->length ();
171 for (
int i = 0; i < amount; ++i)
176 if (!update_data_list_[i].originator.val ())
181 "%s: discarding null originator event.\n", print_prefix);
189 "%s: discarding non-assignment event.\n", print_prefix);
199 update_data_list_[i].buffer.length (),
id_, *
context_,
206 if (header->
ttl > 0 && rebroadcast_records.size () > 0 &&
207 settings_.get_participant_ttl () > 0)
210 header->
ttl = std::min (
211 settings_.get_participant_ttl (), header->
ttl);
213 rebroadcast (print_prefix, header, rebroadcast_records);
223 dds_result =
update_reader_->return_loan (update_data_list_, infoList);
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
SpliceReadThread(const std::string &id, const TransportSettings &settings, knowledge::ThreadSafeContext &context, Knowledge::UpdateDataReader_ptr &update_reader, Knowledge::UpdateDataWriter_ptr &update_writer, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
::Knowledge::UpdateDataWriter_var update_writer_
The DDS data writer that we can write to.
const QoSTransportSettings settings_
Transport settings.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
Provides scheduler for dropping packets.
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
madara::utility::ScopedArray< char > buffer_
buffer for receiving
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
T * get_ptr(void)
get the underlying pointer
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
::Knowledge::UpdateDataReader_var update_reader_
The DDS data reader that we will take from.
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse tree.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void add(uint64_t size)
Adds a message to the monitor.
void cleanup(void)
Cleanup function called by thread manager.
std::string on_data_received_logic
logic to be evaluated after every successful update
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, and fills rebroadcast records according to settings filters.
uint32_t queue_length
Length of the buffer used to store history of events.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides utility functions and classes for common tasks and needs.
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
const std::string id_
Unique identifier for this entity (e.g., host:port)
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
knowledge::ThreadSafeContext * context_
The knowledge context that we will be updating.
void run(void)
The main loop internals for the read thread.