1 #include "madara/transport/ndds/NDDSTransportReadThread.h" 6 #include "ace/OS_NS_Thread.h" 14 NDDSKnowledgeUpdateDataReader * reader)
15 : id_ (id), context_ (context),
16 barrier_ (2), terminated_ (false),
17 mutex_ (), is_not_ready_ (mutex_), is_ready_ (false),
23 this->activate (THR_NEW_LWP | THR_DETACHED, 1);
26 DLINFO "NDDSReadThread::NDDSReadThread:" \
27 " read thread started\n"));
54 DLINFO "NDDSReadThread::enter_barrier:" \
55 " beginning thread barrier for shut down of read thread\n"));
62 NDDSKnowledgeUpdate & data)
68 long long value = data.value;
72 DLINFO "NDDSReadThread::handle_assignment:" \
73 " waiting to process assignment\n"));
89 data.quality, data.clock,
false);
97 DLINFO "NDDSReadThread::handle_assignment:" \
98 " received data[%s]=%q from %s.\n",
99 key.c_str (), value, data.originator));
102 else if (result == 0)
105 DLINFO "NDDSReadThread::handle_assignment:" \
106 " discarded data[%s]=%q from %s as the value was already set.\n",
107 key.c_str (), value, data.originator));
109 else if (result == -1)
112 DLINFO "NDDSReadThread::handle_assignment:" \
113 " discarded data due to null key.\n"));
115 else if (result == -2)
118 DLINFO "NDDSReadThread::handle_assignment:" \
119 " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
120 key.c_str (), value, cur_quality, data.quality));
122 else if (result == -3)
125 DLINFO "NDDSReadThread::handle_assignment:" \
126 " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
127 key.c_str (), value, cur_clock, data.clock));
133 NDDSKnowledgeUpdate & data)
140 std::stringstream stream (data.key);
143 DLINFO "NDDSReadThread::multiassignment:" \
144 " waiting to process multiassignment\n"));
149 DLINFO "NDDSReadThread::multiassignment:" \
150 " processing multiassignment (%s).\n",
153 while (!stream.eof ())
155 stream >> key >> symbol >> value >> symbol;
166 data.quality, data.clock,
false);
172 DLINFO "NDDSReadThread::handle_multiassignment:" \
173 " received data[%s]=%q from %s.\n",
174 key.c_str (), value, data.originator));
177 else if (result == 0)
180 DLINFO "NDDSReadThread::handle_multiassignment:" \
181 " discarded data[%s]=%q from %s as the value was already set.\n",
182 key.c_str (), value, data.originator));
184 else if (result == -1)
187 DLINFO "NDDSReadThread::handle_multiassignment:" \
188 " discarded data due to null key.\n"));
190 else if (result == -2)
193 DLINFO "NDDSReadThread::handle_multiassignment:" \
194 " discarded data[%s]=%q due to lower quality (%u vs %u).\n",
195 key.c_str (), value, cur_quality, data.quality));
197 else if (result == -3)
200 DLINFO "NDDSReadThread::handle_multiassignment:" \
201 " discarded data[%s]=%q due to older timestamp (%Q vs %Q).\n",
202 key.c_str (), value, cur_clock, data.clock));
216 DLINFO "NDDSReadThread::svc:" \
217 " entering processing loop.\n"));
219 NDDSKnowledgeUpdateSeq update_data_list;
220 DDSSampleInfoSeq info_seq;
228 DDS_LENGTH_UNLIMITED,
229 DDS_ANY_SAMPLE_STATE,
231 DDS_ANY_INSTANCE_STATE);
234 if (rc == DDS_RETCODE_OK)
236 for (
int i = 0; i < update_data_list.length(); ++i)
241 DLINFO "NDDSReadThread::svc:" \
242 " processing %s=%q from %s with time %Q and quality %u.\n",
243 update_data_list[i].key, update_data_list[i].value,
244 update_data_list[i].originator,
245 update_data_list[i].clock, update_data_list[i].quality));
252 DLINFO "NDDSReadThread::svc:" \
253 " processing multassignment from %s with time %Q and quality %u.\n",
254 update_data_list[i].originator,
255 update_data_list[i].clock, update_data_list[i].quality));
transport::Condition is_not_ready_
uint32_t get_quality(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Atomically gets quality of a variable.
void handle_multiassignment(NDDSKnowledgeUpdate &data)
NDDSKnowledgeUpdateDataReader * data_reader_
void wait_for_ready(void)
int enter_barrier(void)
service exit point for thread
knowledge::ThreadSafeContext & context_
void lock(void) const
Locks the mutex on this context.
This class stores variables and their values for use by any entity needing state information in a thr...
#define MADARA_LOG_MAJOR_DEBUG_INFO
Used to display important configuration information that impacts major MADARA events.
#define MADARA_LOG_MAJOR_EVENT
Used to indicate that a ``significant'' MADARA event has completed.
void unlock(void) const
Unlocks the mutex on this context.
void handle_assignment(NDDSKnowledgeUpdate &data)
NDDSReadThread(const std::string &id, knowledge::ThreadSafeContext &context, NDDSKnowledgeUpdateDataReader *reader)
int set_if_unequal(const std::string &key, madara::knowledge::KnowledgeRecord::Integer value, uint32_t quality, uint64_t clock, const KnowledgeUpdateSettings &settings=KnowledgeUpdateSettings())
Atomically sets if the variable value will be different.
#define MADARA_DEBUG(L, X)
ACE_Barrier barrier_
typedef for a threadsafe counter
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
ACE_Atomic_Op< ACE_Mutex, bool > terminated_
#define MADARA_LOG_MINOR_EVENT
Used to indicate that a ``minor'' MADARA event has completed.
::std::vector< ::std::string > assignment_symbols_