1 #include "madara/transport/ndds/NDDSListener.h" 16 : settings_ (settings), id_ (id), context_ (&context),
17 send_monitor_ (send_monitor), receive_monitor_ (receive_monitor),
18 packet_scheduler_ (packet_scheduler)
28 "NDDSListener::NDDSListener:" \
29 " setting rules to %s\n",
37 "NDDSListener::NDDSListener:" \
38 " no permanent rules were set\n");
55 DDSDataReader* reader,
const DDS_SubscriptionMatchedStatus& status)
62 const char * print_prefix,
75 ssize_t bytes_sent (result +
sizeof (Ndds_Knowledge_Update));
79 " Sent packet of size %d\n",
87 " Send bandwidth = %d B/s\n",
96 Ndds_Knowledge_UpdateSeq update_data_list;
97 DDS_SampleInfoSeq info_seq;
100 const char * print_prefix =
"NDDSListener::svc";
102 Ndds_Knowledge_UpdateDataReader * update_reader =
103 Ndds_Knowledge_UpdateDataReader::narrow(reader);
104 if (update_reader == NULL)
108 " Unable to create specialized reader. Leaving callback...\n",
114 rc = update_reader->take(
117 DDS_LENGTH_UNLIMITED,
118 DDS_ANY_SAMPLE_STATE,
120 DDS_ANY_INSTANCE_STATE);
122 if (rc == DDS_RETCODE_NO_DATA)
126 else if (rc != DDS_RETCODE_OK)
130 " could not take current sample.\n", print_prefix);
134 for (
int i = 0; i < update_data_list.length(); ++i)
136 if (info_seq[i].valid_data)
141 if (strncmp (update_data_list[i].originator,
"", 1) == 0)
147 " discarding null originator event.\n", print_prefix);
157 " discarding non-assignment event.\n", print_prefix);
169 " processing multassignment from %s with time %llu and quality %d.\n",
170 print_prefix, update_data_list[i].originator,
171 update_data_list[i].clock, update_data_list[i].quality);
174 update_data_list[i].buffer.length (),
id_, *
context_,
182 rc = update_reader->return_loan(update_data_list, info_seq);
183 if (rc != DDS_RETCODE_OK)
187 " could return DDS sample instance loan.\n", print_prefix);
void on_data_available(DDSDataReader *reader)
Handles the case that data has become available.
utility::ScopedArray< char > buffer_
buffer for receiving
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
This class stores variables and their values for use by any entity needing state information in a thr...
Provides scheduler for dropping packets.
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
T * get_ptr(void)
get the underlying pointer
const QoSTransportSettings settings_
Transport settings.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void add(uint64_t size)
Adds a message to the monitor.
knowledge::ThreadSafeContext * context_
knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
std::string on_data_received_logic
logic to be evaluated after every successful update
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
uint32_t queue_length
Length of the buffer used to store history of events.
void on_subscription_matched(DDSDataReader *reader, const DDS_SubscriptionMatchedStatus &status)
Handles the case where a subscription has been matched.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides monitoring capability of a transport's bandwidth.
~NDDSListener()
Destructor.
NDDSListener(const TransportSettings &settings, const std::string &id, knowledge::ThreadSafeContext &context, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
Container for NDDS callbacks.