MADARA  3.1.8
NddsListener.cpp
Go to the documentation of this file.
1 #include "madara/transport/ndds/NDDSListener.h"
5 
6 namespace logger = madara::logger;
7 
8 #include <sstream>
9 
// Constructor for NDDSListener (full signature is shown in the reference
// section at the end of this page). Copies the transport settings and id,
// stores the address of the knowledge context, and binds the bandwidth
// monitors and packet scheduler by reference.
// NOTE(review): this listing is an extract; listing lines 10, 12, 27, 30, 32
// and 36 (the signature head, the context parameter and the logging call
// heads) are not present here -- consult the original file for them.
11  const TransportSettings & settings, const std::string & id,
13  BandwidthMonitor & send_monitor,
14  BandwidthMonitor & receive_monitor,
15  PacketScheduler & packet_scheduler)
16 : settings_ (settings), id_ (id), context_ (&context),
17  send_monitor_ (send_monitor), receive_monitor_ (receive_monitor),
18  packet_scheduler_ (packet_scheduler)
19 {
20  // setup the receive buffer: allocated only when queue_length is non-zero,
// and sized to exactly queue_length chars
21  if (settings_.queue_length > 0)
22  buffer_ = new char [settings_.queue_length];
23 
24  // check for an on_data_received ruleset (non-empty logic string)
// NOTE(review): the statement on the omitted listing line 32 presumably
// compiles the logic into on_data_received_ -- confirm in the original file.
25  if (settings_.on_data_received_logic.length () != 0)
26  {
28  "NDDSListener::NDDSListener:" \
29  " setting rules to %s\n",
31 
33  }
34  else
35  {
37  "NDDSListener::NDDSListener:" \
38  " no permanent rules were set\n");
39  }
40 
41 }
42 
// Constructor that initializes settings_, id_ and context_ from the
// corresponding members of another listener named `ref`; the body is empty.
// NOTE(review): listing lines 43, 45 and 46 (the signature and the rest of
// the initializer list) are missing from this extract; presumably this is the
// copy constructor and the omitted lines bind the remaining reference
// members -- confirm against the original file.
44 : settings_ (ref.settings_), id_ (ref.id_), context_ (ref.context_),
47 {
48 }
49 
// Empty-bodied definition. NOTE(review): its signature (listing line 50) is
// missing from this extract; presumably this is the destructor -- confirm.
51 {}
52 
// Callback taking the data reader and a DDS_SubscriptionMatchedStatus; per the
// reference section this is on_subscription_matched, which handles the case
// where a subscription has been matched.
// NOTE(review): the function-name line (listing line 54) and the single body
// statement (listing line 57) are missing from this extract; neither `reader`
// nor `status` is used in the visible lines.
53 void
55  DDSDataReader* reader, const DDS_SubscriptionMatchedStatus& status)
56 {
58 }
59 
// Prepares a rebroadcast packet for `records` in the listener's receive
// buffer via prep_rebroadcast and, when that reports a positive size, logs
// the packet size and records it with the send bandwidth monitor.
//
// print_prefix: prefix used in the log messages
// header:       message header forwarded to prep_rebroadcast
// records:      knowledge records to rebroadcast
//
// NOTE(review): listing lines 61, 71, 77, 85 and 89 (function name, tail of
// the prep_rebroadcast call and the logging call heads/arguments) are missing
// from this extract.
60 void
62  const char * print_prefix,
63  MessageHeader * header,
64  const knowledge::KnowledgeMap & records)
65 {
// the whole receive buffer (queue_length bytes) is offered to prep_rebroadcast
66  int64_t buffer_remaining = (int64_t) settings_.queue_length;
67  char * buffer = buffer_.get_ptr ();
// NOTE(review): prep_rebroadcast is declared as returning int, but the result
// is stored in an unsigned long; a negative return value would wrap to a
// large positive number and pass the `result > 0` test below -- confirm
// whether prep_rebroadcast can return negative values.
68  unsigned long result = prep_rebroadcast (*context_, buffer, buffer_remaining,
69  settings_, print_prefix,
70  header, records,
72 
73  if (result > 0)
74  {
// reported size = prepared payload plus sizeof (Ndds_Knowledge_Update)
// NOTE(review): no write/send call is visible in this block; the packet is
// only logged and counted as sent -- confirm this is intended.
// NOTE(review): bytes_sent is an ssize_t but is formatted with %d below, and
// the bandwidth query returns uint64_t but is also formatted with %d --
// verify the format specifiers against the logger's expectations.
75  ssize_t bytes_sent (result + sizeof (Ndds_Knowledge_Update));
76 
78  "%s:" \
79  " Sent packet of size %d\n",
80  print_prefix,
81  bytes_sent);
82 
83  send_monitor_.add ((uint32_t)bytes_sent);
84 
86  "%s:" \
87  " Send bandwidth = %d B/s\n",
88  print_prefix,
90  }
91 }
92 
// Data-available callback (on_data_available per the reference section).
// Narrows the generic reader to an Ndds_Knowledge_UpdateDataReader, takes the
// pending samples, passes each valid multi-assignment sample's buffer to
// process_received_update, and finally returns the sample loan to the reader.
//
// NOTE(review): listing lines 94, 106, 128, 145, 155, 167 and 185 (function
// name and the logging call heads) are missing from this extract.
93 void
95 {
96  Ndds_Knowledge_UpdateSeq update_data_list;
97  DDS_SampleInfoSeq info_seq;
98  DDS_ReturnCode_t rc;
99 
100  const char * print_prefix = "NDDSListener::svc";
101 
// narrow to the typed reader; nothing can be processed if this fails
102  Ndds_Knowledge_UpdateDataReader * update_reader =
103  Ndds_Knowledge_UpdateDataReader::narrow(reader);
104  if (update_reader == NULL)
105  {
107  "%s:" \
108  " Unable to create specialized reader. Leaving callback...\n",
109  print_prefix);
110 
111  return;
112  }
113 
// take with no length limit and with the "any" sample, view and instance
// state masks
114  rc = update_reader->take(
115  update_data_list,
116  info_seq,
117  DDS_LENGTH_UNLIMITED,
118  DDS_ANY_SAMPLE_STATE,
119  DDS_ANY_VIEW_STATE,
120  DDS_ANY_INSTANCE_STATE);
121 
// both early returns below happen before return_loan is reached
122  if (rc == DDS_RETCODE_NO_DATA)
123  {
124  return;
125  }
126  else if (rc != DDS_RETCODE_OK)
127  {
129  "%s:" \
130  " could not take current sample.\n", print_prefix);
131  return;
132  }
133 
// samples whose info entry is not flagged valid_data are skipped entirely
134  for (int i = 0; i < update_data_list.length(); ++i)
135  {
136  if (info_seq[i].valid_data)
137  {
// NOTE(review): the comment below mentions messages "from ourselves", but the
// visible check only tests whether the originator string is empty (its first
// character is NUL); originator is not compared against id_ here.
138  // if we are evaluating a message from ourselves, just continue
139  // to the next one. It's also possible to receive null originators
140  // from what I can only guess is the ospl daemon messing up
141  if (strncmp (update_data_list[i].originator, "", 1) == 0)
142  {
143  // if we don't check originator for null, we get phantom sends
144  // when the program exits.
146  "%s:" \
147  " discarding null originator event.\n", print_prefix);
148 
149  continue;
150  }
151 
152  if (update_data_list[i].type != madara::transport::MULTIASSIGN)
153  {
154  // we do not allow any other type than multiassign
156  "%s:" \
157  " discarding non-assignment event.\n", print_prefix);
158 
159  continue;
160  }
161 
// NOTE(review): header is passed by reference to process_received_update and
// may be set by it, but nothing visible here releases it or uses
// rebroadcast_records afterwards -- confirm ownership and whether a
// rebroadcast call is intended.
162  knowledge::KnowledgeMap rebroadcast_records;
163  MessageHeader * header = 0;
164 
// NOTE(review): the type was already required to equal
// madara::transport::MULTIASSIGN above; this second test uses a different
// constant (knowledge::MULTIPLE_ASSIGNMENT) -- verify the two are equivalent.
165  if (knowledge::MULTIPLE_ASSIGNMENT == update_data_list[i].type)
166  {
168  "%s:" \
169  " processing multassignment from %s with time %llu and quality %d.\n",
170  print_prefix, update_data_list[i].originator,
171  update_data_list[i].clock, update_data_list[i].quality);
172 
// an empty string is passed as the remote_host argument
173  process_received_update ((char *)update_data_list[i].buffer.get_contiguous_buffer (),
174  update_data_list[i].buffer.length (), id_, *context_,
175  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
176  on_data_received_, print_prefix,
177  "", header);
178  }
179  }
180  }
181 
// hand the loaned sample sequences back to the reader
// NOTE(review): the failure message below reads "could return ..."; it
// presumably should say "could not return" -- confirm and fix in the source.
182  rc = update_reader->return_loan(update_data_list, info_seq);
183  if (rc != DDS_RETCODE_OK)
184  {
186  "%s:" \
187  " could return DDS sample instance loan.\n", print_prefix);
188  }
189 }
void on_data_available(DDSDataReader *reader)
Handles the case that data has become available.
utility::ScopedArray< char > buffer_
buffer for receiving
Definition: NddsListener.h:94
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
Definition: NddsListener.h:100
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
Provides scheduler for dropping packets.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
Definition: NddsListener.h:97
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
const QoSTransportSettings settings_
Transport settings.
Definition: NddsListener.h:82
::std::map< std::string, KnowledgeRecord > KnowledgeMap
void add(uint64_t size)
Adds a message to the monitor.
knowledge::ThreadSafeContext * context_
Definition: NddsListener.h:88
knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
Definition: NddsListener.h:91
std::string on_data_received_logic
logic to be evaluated after every successful update
static constexpr struct madara::knowledge::tags::string_t string
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
uint32_t queue_length
Length of the buffer used to store history of events.
void on_subscription_matched(DDSDataReader *reader, const DDS_SubscriptionMatchedStatus &status)
Handles the case where a subscription has been matched.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport's bandwidth.
NDDSListener(const TransportSettings &settings, const std::string &id, knowledge::ThreadSafeContext &context, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
Definition: NddsListener.h:103
Container for NDDS callbacks.
Definition: NddsListener.h:23