MADARA  3.1.8
NddsTransportReadThread.cpp
Go to the documentation of this file.
1 #include "madara/transport/ndds/NDDSTransportReadThread.h"
5 
6 #include "ace/OS_NS_Thread.h"
7 
8 #include <iostream>
9 #include <sstream>
10 
12  const std::string & id,
14  NDDSKnowledgeUpdateDataReader * reader)
15  : id_ (id), context_ (context),
16  barrier_ (2), terminated_ (false),
17  mutex_ (), is_not_ready_ (mutex_), is_ready_ (false),
18  data_reader_ (reader)
19 {
20  assignment_symbols_.push_back ("=");
21  assignment_symbols_.push_back (";");
22 
23  this->activate (THR_NEW_LWP | THR_DETACHED, 1);
24 
26  DLINFO "NDDSReadThread::NDDSReadThread:" \
27  " read thread started\n"));
28 }
29 
31 {
32  close ();
33 }
34 
35 int
37 {
38  terminated_ = true;
39  enter_barrier ();
40 
41  return 0;
42 }
43 
44 void
46 {
47  if (!is_ready_)
48  is_not_ready_.wait ();
49 }
50 
52 {
54  DLINFO "NDDSReadThread::enter_barrier:" \
55  " beginning thread barrier for shut down of read thread\n"));
56 
57  barrier_.wait ();
58  return 0;
59 }
60 
62  NDDSKnowledgeUpdate & data)
63 {
64  if (data.key)
65  {
66  // if we aren't evaluating a message from ourselves, process it
67  std::string key = data.key;
68  long long value = data.value;
69  int result = 0;
70 
72  DLINFO "NDDSReadThread::handle_assignment:" \
73  " waiting to process assignment\n"));
74 
75  context_.lock ();
76  unsigned long long cur_clock = context_.get_clock (key);
77  unsigned long cur_quality = context_.get_quality (key);
78 
79  // if the data we are updating had a lower clock value or less quality
80  // then that means this update is the latest value. Among
81  // other things, this means our solution will work even
82  // without FIFO channel transports
83 
84  // if the data we are updating had a lower clock value
85  // then that means this update is the latest value. Among
86  // other things, this means our solution will work even
87  // without FIFO channel transports
88  result = context_.set_if_unequal (key, value,
89  data.quality, data.clock, false);
90 
91  context_.unlock ();
92 
93  // if we actually updated the value
94  if (result == 1)
95  {
97  DLINFO "NDDSReadThread::handle_assignment:" \
98  " received data[%s]=%q from %s.\n",
99  key.c_str (), value, data.originator));
100  }
101  // if the data was already current
102  else if (result == 0)
103  {
105  DLINFO "NDDSReadThread::handle_assignment:" \
106  " discarded data[%s]=%q from %s as the value was already set.\n",
107  key.c_str (), value, data.originator));
108  }
109  else if (result == -1)
110  {
112  DLINFO "NDDSReadThread::handle_assignment:" \
113  " discarded data due to null key.\n"));
114  }
115  else if (result == -2)
116  {
118  DLINFO "NDDSReadThread::handle_assignment:" \
119  " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
120  key.c_str (), value, cur_quality, data.quality));
121  }
122  else if (result == -3)
123  {
125  DLINFO "NDDSReadThread::handle_assignment:" \
126  " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
127  key.c_str (), value, cur_clock, data.clock));
128  }
129  }
130 }
131 
133  NDDSKnowledgeUpdate & data)
134 {
135  if (data.key)
136  {
137  std::string key;
138  char symbol;
139  long long value;
140  std::stringstream stream (data.key);
141 
143  DLINFO "NDDSReadThread::multiassignment:" \
144  " waiting to process multiassignment\n"));
145 
146  context_.lock ();
147 
149  DLINFO "NDDSReadThread::multiassignment:" \
150  " processing multiassignment (%s).\n",
151  data.key));
152 
153  while (!stream.eof ())
154  {
155  stream >> key >> symbol >> value >> symbol;
156 
157  int result = 0;
158  unsigned long long cur_clock = context_.get_clock (key);
159  unsigned long cur_quality = context_.get_quality (key);
160 
161  // if the data we are updating had a lower clock value
162  // then that means this update is the latest value. Among
163  // other things, this means our solution will work even
164  // without FIFO channel transports
165  result = context_.set_if_unequal (key, value,
166  data.quality, data.clock, false);
167 
168  // if we actually updated the value
169  if (result == 1)
170  {
172  DLINFO "NDDSReadThread::handle_multiassignment:" \
173  " received data[%s]=%q from %s.\n",
174  key.c_str (), value, data.originator));
175  }
176  // if the data was already current
177  else if (result == 0)
178  {
180  DLINFO "NDDSReadThread::handle_multiassignment:" \
181  " discarded data[%s]=%q from %s as the value was already set.\n",
182  key.c_str (), value, data.originator));
183  }
184  else if (result == -1)
185  {
187  DLINFO "NDDSReadThread::handle_multiassignment:" \
188  " discarded data due to null key.\n"));
189  }
190  else if (result == -2)
191  {
193  DLINFO "NDDSReadThread::handle_multiassignment:" \
194  " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
195  key.c_str (), value, cur_quality, data.quality));
196  }
197  else if (result == -3)
198  {
200  DLINFO "NDDSReadThread::handle_multiassignment:" \
201  " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
202  key.c_str (), value, cur_clock, data.clock));
203  }
204  }
205 
206  context_.unlock ();
207  }
208 }
209 
210 int
212 {
213  // if we don't check originator for null, we get phantom sends
214  // when the program exits.
216  DLINFO "NDDSReadThread::svc:" \
217  " entering processing loop.\n"));
218 
219  NDDSKnowledgeUpdateSeq update_data_list;
220  DDSSampleInfoSeq info_seq;
221  DDS_ReturnCode_t rc;
222 
223  while (false == terminated_.value ())
224  {
225  rc = data_reader_->take(
226  update_data_list,
227  info_seq,
228  DDS_LENGTH_UNLIMITED,
229  DDS_ANY_SAMPLE_STATE,
230  DDS_ANY_VIEW_STATE,
231  DDS_ANY_INSTANCE_STATE);
232 
233  // data is valid
234  if (rc == DDS_RETCODE_OK)
235  {
236  for (int i = 0; i < update_data_list.length(); ++i)
237  {
238  if (madara::knowledge::ASSIGNMENT == update_data_list[i].type)
239  {
241  DLINFO "NDDSReadThread::svc:" \
242  " processing %s=%q from %s with time %Q and quality %u.\n",
243  update_data_list[i].key, update_data_list[i].value,
244  update_data_list[i].originator,
245  update_data_list[i].clock, update_data_list[i].quality));
246 
247  handle_assignment (update_data_list[i]);
248  }
249  else if (madara::knowledge::MULTIPLE_ASSIGNMENT == update_data_list[i].type)
250  {
252  DLINFO "NDDSReadThread::svc:" \
253  " processing multassignment from %s with time %Q and quality %u.\n",
254  update_data_list[i].originator,
255  update_data_list[i].clock, update_data_list[i].quality));
256 
257  handle_multiassignment (update_data_list[i]);
258  }
259  }
260  }
261  }
262 
263  enter_barrier ();
264  return 0;
265 }
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
void handle_multiassignment(NDDSKnowledgeUpdate &data)
#define DLINFO
Definition: LogMacros.h:27
NDDSKnowledgeUpdateDataReader * data_reader_
int enter_barrier(void)
service exit point for thread
knowledge::ThreadSafeContext & context_
void lock(void) const
Locks the mutex on this context.
This class stores variables and their values for use by any entity needing state information in a thr...
#define MADARA_LOG_MAJOR_DEBUG_INFO
Used to display important configuration information that impacts major MADARA events.
Definition: LogMacros.h:58
#define MADARA_LOG_MAJOR_EVENT
Used to indicate that a ``significant&#39;&#39; MADARA event has completed.
Definition: LogMacros.h:50
void unlock(void) const
Unlocks the mutex on this context.
void handle_assignment(NDDSKnowledgeUpdate &data)
NDDSReadThread(const std::string &id, knowledge::ThreadSafeContext &context, NDDSKnowledgeUpdateDataReader *reader)
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
static constexpr struct madara::knowledge::tags::string_t string
#define MADARA_DEBUG(L, X)
Definition: LogMacros.h:137
ACE_Barrier barrier_
typdef for a threadsafe counter
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
ACE_Atomic_Op< ACE_Mutex, bool > terminated_
#define MADARA_LOG_MINOR_EVENT
Used to inficate a ``minor&#39;&#39; MADARA event has completed.
Definition: LogMacros.h:52
::std::vector< ::std::string > assignment_symbols_