MADARA  3.1.8
SpliceDDSTransport.cpp
Go to the documentation of this file.
4 #include <time.h>
5 
6 #include <iostream>
7 #include <sstream>
9 
10 namespace logger = madara::logger;
11 
13  "MADARA_KaRL_Data",
14  "MADARA_KaRL_Control"
15 };
16 
17 /* Array to hold the names for all ReturnCodes. */
19  "DDS_RETCODE_OK",
20  "DDS_RETCODE_ERROR",
21  "DDS_RETCODE_UNSUPPORTED",
22  "DDS_RETCODE_BAD_PARAMETER",
23  "DDS_RETCODE_PRECONDITION_NOT_MET",
24  "DDS_RETCODE_OUT_OF_RESOURCES",
25  "DDS_RETCODE_NOT_ENABLED",
26  "DDS_RETCODE_IMMUTABLE_POLICY",
27  "DDS_RETCODE_INCONSISTENT_POLICY",
28  "DDS_RETCODE_ALREADY_DELETED",
29  "DDS_RETCODE_TIMEOUT",
30  "DDS_RETCODE_NO_DATA",
31  "DDS_RETCODE_ILLEGAL_OPERATION" };
32 
33 const char * madara::transport::SpliceDDSTransport::partition_ = "Madara_knowledge";
34 
36  const std::string & id,
38  TransportSettings & config, bool launch_transport)
39  : madara::transport::Base (id, config, context),
40  domain_ (0), domain_factory_ (0),
41  domain_participant_ (0), publisher_ (0), subscriber_ (0),
42  datawriter_ (0), datareader_ (0),
43  update_writer_ (0), update_reader_ (0),
44  update_topic_ (0)
45 {
46  // create a reference to the knowledge base for threading
47  knowledge_.use (context);
48 
49  // set the data plane for the read threads
51 
52  if (launch_transport)
53  setup ();
54 }
56 {
57  close ();
58 }
59 
60 void
62 {
63  this->invalidate_transport ();
64 
66 
68 
69  //if (subscriber_.in ())
70  //{
71  // subscriber_->delete_datareader (update_reader_);
72  //}
73 
74  //if (publisher_.in ())
75  //{
76  // publisher_->delete_datawriter (update_writer_);
77  // publisher_->delete_datawriter (latency_update_writer_);
78  //}
79 
80 
81  //if (domain_participant_.in ())
82  //{
83  // domain_participant_->delete_subscriber (subscriber_);
84  // domain_participant_->delete_publisher (publisher_);
85  // domain_participant_->delete_topic (update_topic_);
86  //}
87 
88  //if (domain_factory_.in ())
89  // domain_factory_->delete_participant (domain_participant_);
90 
91  update_reader_ = 0;
92  update_writer_ = 0;
93  update_topic_ = 0;
94  subscriber_ = 0;
95  publisher_ = 0;
97  domain_factory_ = 0;
98 
99  this->shutting_down_ = false;
100 }
101 
102 int
104 {
105  return this->settings_.reliability;
106 }
107 
108 int
110 {
111  return this->settings_.reliability = setting;
112 }
113 
114 int
116 {
117  Base::setup ();
118  DDS::ReturnCode_t status;
119 
120  this->is_valid_ = false;
121 
122  // reset the valid setup flag
123  //valid_setup_ = false;
124 
126  "SpliceDDSTransport::setup:" \
127  " Creating a participant for topic (%s)\n",
129 
131  "SpliceDDSTransport::setup:" \
132  " Participant settings are being read from the OSPL_URI environment"
133  " variable\n",
135 
136  // get the domain participant factory
137  domain_factory_ = DDS::DomainParticipantFactory::get_instance ();
138  domain_factory_->get_default_participant_qos (part_qos_);
139  domain_participant_ = domain_factory_->create_participant (
140  domain_,
141  part_qos_, NULL, DDS::STATUS_MASK_NONE);
142 
143  // if dp == NULL, we've got an error
144  if (domain_participant_ == NULL)
145  {
147  "\nSpliceDDSTransport::setup:" \
148  " splice daemon not running. Try 'ospl start'...\n");
149 
150  exit (-2);
151  }
152 
153  domain_participant_->get_default_topic_qos(topic_qos_);
154 
156  {
157  topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
158  topic_qos_.history.depth = this->settings_.queue_length;
159  topic_qos_.resource_limits.max_samples_per_instance =
160  this->settings_.queue_length;
161  topic_qos_.resource_limits.max_samples = this->settings_.queue_length;
162  topic_qos_.destination_order.kind =
163  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
164  //topic_qos_.
165  }
166  //topic_qos_.resource_limits.max_samples_per_instance= 10;
167  domain_participant_->set_default_topic_qos(topic_qos_);
168 
169 
171  "SpliceDDSTransport::setup:" \
172  " Registering type support\n");
173 
174  // Register Update type
175  status = this->update_type_support_.register_type (
176  domain_participant_, "Knowledge::Update");
177  check_status(status, "Knowledge::UpdateTypeSupport::register_type");
178 
179  // Register Mutex type
180  //status = this->mutex_type_support_.register_type (
181  // domain_participant_, "Knowledge::Mutex");
182  //check_status(status, "Knowledge::MutexTypeSupport::register_type");
183 
185  "SpliceDDSTransport::setup:" \
186  " Setting up knowledge domain via topic (%s)\n",
188 
189  // Create Update topic
190  update_topic_ = domain_participant_->create_topic (
192  "Knowledge::Update",
193  topic_qos_, NULL, DDS::STATUS_MASK_NONE);
194  check_handle(update_topic_,
195  "DDS::DomainParticipant::create_topic (KnowledgeUpdate)");
196 
197  // Get default qos for publisher
198  status = domain_participant_->get_default_publisher_qos (pub_qos_);
199  check_status(status, "DDS::DomainParticipant::get_default_publisher_qos");
200 
201 
203  {
204  pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
205  pub_qos_.presentation.coherent_access = true;
206  pub_qos_.presentation.ordered_access = false;
207  //topic_qos_.
208  }
209 
211  "SpliceDDSTransport::setup:" \
212  " Creating publisher for topic (%s)\n",
214 
215  // Create publisher
216  pub_qos_.partition.name.length (1);
217  pub_qos_.partition.name[0] = DDS::string_dup (partition_);
218  publisher_ = domain_participant_->create_publisher (
219  pub_qos_, NULL, DDS::STATUS_MASK_NONE);
220  check_handle(publisher_, "DDS::DomainParticipant::create_publisher");
221 
222  // Create subscriber
223  status = domain_participant_->get_default_subscriber_qos (sub_qos_);
224  check_status(status, "DDS::DomainParticipant::get_default_subscriber_qos");
225 
226 
228  {
229  sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
230  sub_qos_.presentation.coherent_access = true;
231  sub_qos_.presentation.ordered_access = false;
232  }
233 
235  "SpliceDDSTransport::setup:" \
236  " Creating subscriber for topic (%s)\n",
238 
239  sub_qos_.partition.name.length (1);
240  sub_qos_.partition.name[0] = DDS::string_dup (partition_);
241  subscriber_ = domain_participant_->create_subscriber (
242 // sub_qos_, &sub_listener_, DDS::DATA_AVAILABLE_STATUS | DDS::DATA_ON_READERS_STATUS);
243  sub_qos_, NULL, DDS::STATUS_MASK_NONE);
244  check_handle(subscriber_, "DDS::DomainParticipant::create_subscriber");
245 
246  if (!subscriber_ || !publisher_)
247  {
249  "SpliceDDSTransport::setup:" \
250  " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
251 
252  exit (-2);
253  }
254 
255  // Create datawriter
256  publisher_->get_default_datawriter_qos (datawriter_qos_);
257  publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
258 
260  {
262  "SpliceDDSTransport::setup:" \
263  " Enabling reliable transport for (%s) datawriters\n",
265 
266  datawriter_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
267  datawriter_qos_.history.depth = this->settings_.queue_length;
268  datawriter_qos_.resource_limits.max_samples = this->settings_.queue_length;
269  datawriter_qos_.resource_limits.max_samples_per_instance =
270  this->settings_.queue_length;
271  datawriter_qos_.destination_order.kind =
272  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
273  }
274  else
275  {
277  "SpliceDDSTransport::setup:" \
278  " Enabling unreliable transport for (%s) datawriters\n",
280  }
281 
283  "SpliceDDSTransport::setup:" \
284  " Creating datawriter for topic (%s)\n",
286 
287  // Create Update writer
288  datawriter_ = publisher_->create_datawriter (update_topic_,
289  datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
290  check_handle(datawriter_, "DDS::Publisher::create_datawriter (Update)");
291  update_writer_ = dynamic_cast<Knowledge::UpdateDataWriter_ptr> (datawriter_.in ());
292  check_handle(update_writer_, "Knowledge::UpdateDataWriter_ptr::narrow");
293 
294  // Create Latency Update writer for Read Thread
295  latencywriter_ = publisher_->create_datawriter (update_topic_,
296  datawriter_qos_, NULL, DDS::STATUS_MASK_NONE);
297  check_handle(latencywriter_, "DDS::Publisher::create_datawriter (Update)");
298  latency_update_writer_ = dynamic_cast<Knowledge::UpdateDataWriter_ptr> (latencywriter_.in ());
299  check_handle(latency_update_writer_, "Knowledge::UpdateDataWriter_ptr::narrow");
300 
301 
302  // Create datareader
303  status = subscriber_->get_default_datareader_qos (datareader_qos_);
304  subscriber_->copy_from_topic_qos (datareader_qos_, topic_qos_);
305  //publisher_->copy_from_topic_qos(datawriter_qos_, topic_qos_);
306  check_status(status, "DDS::Subscriber::get_default_datareader_qos");
307 
308  datareader_qos_.reader_data_lifecycle.enable_invalid_samples = FALSE;
309 
311  {
313  "SpliceDDSTransport::setup:" \
314  " Enabling reliable transport for (%s) datareaders\n",
316 
317  datareader_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
318  datareader_qos_.history.depth = this->settings_.queue_length;
319  datareader_qos_.resource_limits.max_samples = this->settings_.queue_length;
320  datareader_qos_.destination_order.kind =
321  DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
322 
323  // unlike the other qos, we do not set max_samples_per_instance here.
324  // that shouldn't be as necessary, since we are using take on the reader
325  }
326  else
327  {
329  "SpliceDDSTransport::setup:" \
330  " Enabling unreliable transport for (%s) datareaders\n",
332  }
333 
335  "SpliceDDSTransport::setup:" \
336  " Creating datareader for topic (%s)\n",
338 
339  // Create Update datareader
340  datareader_ = subscriber_->create_datareader (update_topic_,
341  //datareader_qos_, &dr_listener_, DDS::STATUS_MASK_NONE);
342  datareader_qos_, NULL, DDS::STATUS_MASK_NONE);
343 
344  // notes: we set the mask to none because the listener will be called
345  // by the subscriber listener with notify_datareaders. This is the Splice
346  // way of doing this, since we require subscription information and they
347  // have so far not implemented on_subscription_matched.
348 
349  check_handle(datareader_, "DDS::Subscriber::create_datareader (Update)");
350  update_reader_ = dynamic_cast<Knowledge::UpdateDataReader_ptr>(datareader_.in ());
351  check_handle(update_reader_, "Knowledge::UpdateDataReader_ptr::narrow");
352 
353  if (!settings_.no_receiving)
354  {
355  double hertz = settings_.read_thread_hertz;
356  if (hertz < 0.0)
357  {
358  hertz = 0.0;
359  }
360 
362  "UdpTransportReadThread::setup:" \
363  " starting %d threads at %f hertz\n", settings_.read_threads,
364  hertz);
365 
366  for (uint32_t i = 0; i < settings_.read_threads; ++i)
367  {
368  std::stringstream thread_name;
369  thread_name << "read";
370  thread_name << i;
371 
372  read_threads_.run (hertz, thread_name.str (),
376  }
377  }
378 
379  return this->validate_transport ();
380 }
381 
382 long
384  const knowledge::KnowledgeRecords & updates)
385 {
386  long result = 0;
387 
388  if (!settings_.no_sending)
389  {
390  result = prep_send (updates, "SpliceDDSTransport::send_data:");
391 
392  // get the maximum quality from the updates
393  uint32_t quality = knowledge::max_quality (updates);
394 
396  unsigned long long cur_clock = context_.get_clock ();
397 
398  DDS::ReturnCode_t dds_result;
399  DDS::InstanceHandle_t handle;
400 
401  Knowledge::Update data;
402 
403  data.buffer = Knowledge::seq_oct (result, result, (unsigned char *)buffer_.get_ptr ());
404  data.clock = cur_clock;
405  data.quality = quality;
406  data.updates = DDS::ULong (updates.size ());
407  data.originator = DDS::string_dup (id_.c_str ());
408  data.type = madara::transport::MULTIASSIGN;
409  data.ttl = settings_.get_rebroadcast_ttl ();
410  data.timestamp = time (NULL);
411  data.madara_id = DDS::string_dup (MADARA_IDENTIFIER);
412 
414  "SpliceDDSTransport::send:" \
415  " sending multiassignment: %d updates, time=llu, quality=%d\n",
416  data.updates, cur_clock, quality);
417 
418  handle = update_writer_->register_instance (data);
419  dds_result = update_writer_->write (data, handle);
420  result = (long)dds_result;
421  //update_writer_->unregister_instance (data, handle);
422  }
423 
424  return result;
425 }
426 
427 void
429  const char * info)
430 {
431  if (!handle)
432  {
434  "SpliceDDSTransport::check_handle:" \
435  " error in %s: Creation failed: invalid handle\n", info);
436 
437  exit (-2);
438  }
439 }
440 
441 void
443  const char * info)
444 {
445  // if the status is okay, then return without issue
446  if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
447  return;
448 
450  "SpliceDDSTransport::check_status:" \
451  " error in %s: Creation failed: %s\n",
452  info, get_error_name (status));
453 
454  exit (-2);
455 }
456 
457 const char *
459 {
460  return ret_code_names[status];
461 }
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
volatile bool is_valid_
Definition: Transport.h:179
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:21
int reliability(void) const
Accesses reliability setting.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
This class stores variables and their values for use by any entity needing state information in a thr...
DDS::DomainParticipant_var domain_participant_
Knowledge::UpdateDataReader_var update_reader_
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
double read_thread_hertz
number of valid messages allowed to be received per second.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
Holds basic transport settings.
Knowledge::UpdateDataWriter_var update_writer_
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
DDS::DomainParticipantFactory_var domain_factory_
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
long send_data(const knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
threads::Threader read_threads_
threads for reading knowledge updates
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
void check_handle(void *handle, const char *info)
Splice handle checker.
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
std::string write_domain
All class members are accessible to users for easy setup.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
volatile bool shutting_down_
Definition: Transport.h:180
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
bool no_sending
if true, never send over transport
Knowledge::UpdateDataWriter_var latency_update_writer_
MADARA_Export std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
Definition: Utility.cpp:112
Knowledge::UpdateTypeSupport update_type_support_
int setup(void)
Activates this transport.
Thread for reading knowledge updates via waitsets.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
static constexpr struct madara::knowledge::tags::string_t string
uint32_t read_threads
the number of read threads to start
void close(void)
Closes this transport.
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.