MADARA  3.1.8
SpliceDataReaderListener.cpp
Go to the documentation of this file.
5 
6 #include <iostream>
7 #include <sstream>
8 
10  const std::string & id,
12 : id_ (id), context_ (context)
13 {
14 }
15 
17  const SpliceDataReaderListener &ref)
18 : id_ (ref.id_), context_ (ref.context_)
19 {
20 }
21 
23 {}
24 
26  Knowledge::Update & data)
27 { // applies one key/value update to context_ and logs the outcome
28  if (data.key.val ())
29  {
30  // the key is non-null, so this update can be applied to the context
31  std::string key = data.key.val ();
32  long long value = data.value;
33  int result = 0;
34 
36  DLINFO "SpliceDataReaderListener::handle_assignment:" \
37  " waiting to process assignment\n"));
38 
39  context_.lock ();
40  unsigned long long cur_clock = context_.get_clock (key);
41  unsigned long cur_quality = context_.get_quality (key);
42 
43  // cur_clock and cur_quality are read before the update so that the
44  // discard messages below can report the values that were in the
45  // context when the incoming update lost on quality (-2) or on
46  // clock (-3)
47 
48  // if the data we are updating had a lower clock value or less
49  // quality, then this update is the latest value. Among
50  // other things, this means our solution will work even
51  // without FIFO channel transports
52  result = context_.set_if_unequal (key, value,
53  data.quality, data.clock, false);
54 
55  context_.unlock ();
56 
57  // if we actually updated the value
58  if (result == 1)
59  {
61  DLINFO "SpliceDataReaderListener::handle_assignment:" \
62  " received data[%s]=%q from %s.\n",
63  key.c_str (), value, data.originator.val ()));
64  }
65  // if the data was already current
66  else if (result == 0)
67  {
69  DLINFO "SpliceDataReaderListener::handle_assignment:" \
70  " discarded data[%s]=%q from %s as the value was already set.\n",
71  key.c_str (), value, data.originator.val ()));
72  }
73  else if (result == -1)
74  {
76  DLINFO "SpliceDataReaderListener::handle_assignment:" \
77  " discarded data due to null key.\n"));
78  }
79  else if (result == -2)
80  {
82  DLINFO "SpliceDataReaderListener::handle_assignment:" \
83  " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
84  key.c_str (), value, cur_quality, data.quality));
85  }
86  else if (result == -3)
87  {
89  DLINFO "SpliceDataReaderListener::handle_assignment:" \
90  " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
91  key.c_str (), value, cur_clock, data.clock));
92  }
93  }
94 }
95 
97  Knowledge::Update & data)
98 { // reads "key symbol value symbol" entries from data.key and applies each to context_
99  if (data.key.val ())
100  {
101  std::string key;
102  char symbol;
103  long long value;
104  std::stringstream stream (data.key.val ());
105 
107  DLINFO "SpliceDataReaderListener::multiassignment:" \
108  " waiting to process multiassignment\n"));
109 
110  context_.lock ();
111 
113  DLINFO "SpliceDataReaderListener::multiassignment:" \
114  " processing multiassignment (%s).\n",
115  data.key.val ()));
116 
117  while (!stream.eof ()) // NOTE(review): the extraction below is never checked; a parse failure that does not set eof would keep this loop running with the lock held
118  {
119  stream >> key >> symbol >> value >> symbol;
120 
121  int result = 0;
122  unsigned long long cur_clock = context_.get_clock (key);
123  unsigned long cur_quality = context_.get_quality (key);
124 
125  // if the data we are updating had a lower clock value
126  // then that means this update is the latest value. Among
127  // other things, this means our solution will work even
128  // without FIFO channel transports
129  result = context_.set_if_unequal (key, value,
130  data.quality, data.clock, false);
131 
132  // if we actually updated the value
133  if (result == 1)
134  {
136  DLINFO "SpliceDataReaderListener::handle_multiassignment:" \
137  " received data[%s]=%q from %s.\n",
138  key.c_str (), value, data.originator.val ()));
139  }
140  // if the data was already current
141  else if (result == 0)
142  {
144  DLINFO "SpliceDataReaderListener::handle_multiassignment:" \
145  " discarded data[%s]=%q from %s as the value was already set.\n",
146  key.c_str (), value, data.originator.val ()));
147  }
148  else if (result == -1)
149  {
151  DLINFO "SpliceDataReaderListener::handle_multiassignment:" \
152  " discarded data due to null key.\n"));
153  }
154  else if (result == -2)
155  {
157  DLINFO "SpliceDataReaderListener::handle_multiassignment:" \
158  " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
159  key.c_str (), value, cur_quality, data.quality));
160  }
161  else if (result == -3)
162  {
164  DLINFO "SpliceDataReaderListener::handle_multiassignment:" \
165  " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
166  key.c_str (), value, cur_clock, data.clock));
167  }
168  }
169 
170  context_.unlock ();
171  }
172 }
173 
174 void
176  DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
177 {
178 }
179 
180 void
182  DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
183 {
184 }
185 
186 void
188  DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
189 {
190 }
191 
192 void
194  DDS::DataReader_ptr reader, const DDS::RequestedDeadlineMissedStatus & status)
195 {
196 }
197 
198 void
200  DDS::DataReader_ptr reader, const DDS::LivelinessChangedStatus & status)
201 {
202 }
203 
204 void
206  DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus & status)
207 {
209 }
210 
211 void
213  DDS::DataReader_ptr reader)
214 { // takes Update samples from the reader and dispatches each one by its type
215  DDS::SampleInfoSeq_var infoList = new DDS::SampleInfoSeq;
216  DDS::ReturnCode_t dds_result;
217  int amount;
218  DDS::Boolean result = false;
219  Knowledge::UpdateSeq_var update_data_list_ = new Knowledge::UpdateSeq;
220 
221  Knowledge::UpdateDataReader_ptr update_reader =
222  dynamic_cast<Knowledge::UpdateDataReader_ptr> (reader);
223 
224  if (update_reader == 0)
225  {
227  "\nSpliceDataReaderListener::on_data_available:" \
228  " Unable to create specialized reader. Leaving callback...\n"));
229  return;
230  }
231 
232  dds_result = update_reader->take (update_data_list_, infoList, 1,
233  DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
234 
235  amount = update_data_list_->length ();
236 
237  //ACE_DEBUG ((LM_DEBUG, "(%P|%t) Returning from take with %d items.\n",
238  // amount));
239 
240  if (amount != 0)
241  {
242  for (int i = 0; i < amount; ++i)
243  {
244  // if we are evaluating a message from ourselves, just continue
245  // to the next one. It's also possible to receive null originators
246  // from what I can only guess is the ospl daemon messing up
247  if (!update_data_list_[i].originator.val () ||
248  id_ == update_data_list_[i].originator.val ())
249  {
250  // if we don't check originator for null, we get phantom sends
251  // when the program exits.
253  DLINFO "\nSpliceDataReaderListener::on_data_available:" \
254  " discarding null originator event.\n"));
255 
256  continue;
257  }
258 
259  if (madara::knowledge::ASSIGNMENT == update_data_list_[i].type)
260  {
262  DLINFO "\nSpliceDataReaderListener::on_data_available:" \
263  " processing %s=%q from %s with time %Q and quality %u.\n",
264  update_data_list_[i].key.val (), update_data_list_[i].value,
265  update_data_list_[i].originator.val (),
266  update_data_list_[i].clock, update_data_list_[i].quality));
267 
268  handle_assignment (update_data_list_[i]);
269  }
270  else if (madara::knowledge::MULTIPLE_ASSIGNMENT == update_data_list_[i].type)
271  {
273  DLINFO "\nSpliceDataReaderListener::on_data_available:" \
274  " processing multassignment from %s with time %Q and quality %u.\n",
275  update_data_list_[i].originator.val (),
276  update_data_list_[i].clock, update_data_list_[i].quality));
277 
278  handle_multiassignment (update_data_list_[i]);
279  }
280 
281  // any update type other than the two handled above is ignored here
282  }
283  }
284  dds_result = update_reader->return_loan (update_data_list_, infoList);
285 
286 }
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
#define DLINFO
Definition: LogMacros.h:27
void lock(void) const
Locks the mutex on this context.
This class stores variables and their values for use by any entity needing state information in a thr...
#define MADARA_ERROR(L, X)
Definition: LogMacros.h:114
#define MADARA_LOG_MAJOR_DEBUG_INFO
Used to display important configuration information that impacts major MADARA events.
Definition: LogMacros.h:58
#define MADARA_LOG_MAJOR_EVENT
Used to indicate that a ``significant'' MADARA event has completed.
Definition: LogMacros.h:50
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &status)
DDS callback for incompatible qos.
void unlock(void) const
Unlocks the mutex on this context.
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
static constexpr struct madara::knowledge::tags::string_t string
#define MADARA_DEBUG(L, X)
Definition: LogMacros.h:137
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &status)
DDS callback for sample lost.
void set_changed(void)
Force a change to be registered, waking up anyone waiting on entry.
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &status)
DDS callback for deadline being missed.
void on_subscription_matched(DDS::DataReader_ptr reader, const DDS::SubscriptionMatchedStatus &status)
DDS callback for subscription matched.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
SpliceDataReaderListener(const std::string &id, knowledge::ThreadSafeContext &context)
Container for DDS-related callbacks (deprecated and unused)
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &status)
DDS callback for liveliness changed.
#define MADARA_LOG_MINOR_EVENT
Used to indicate a ``minor'' MADARA event has completed.
Definition: LogMacros.h:52
#define MADARA_LOG_EVENT_TRACE
Used to trace significant actions within major/minor events.
Definition: LogMacros.h:55
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &status)
DDS callback for sample rejected.
#define MADARA_LOG_TERMINAL_ERROR
Used for MADARA errors at the point the error exits the process in question, or when a decision is ma...
Definition: LogMacros.h:40