MADARA  3.1.8
NddsTransport.cpp
Go to the documentation of this file.
2 //#include "madara/transport/ndds/NddsTransportReadThread.h"
6 
7 namespace logger = madara::logger;
8 
9 #include <iostream>
10 #include <sstream>
11 
13  "MADARA_KaRL_Data",
14  "MADARA_KaRL_Control"
15 };
16 
17 const char * madara::transport::NddsTransport::partition_ = "Madara_knowledge";
18 
20  const std::string & id,
22  TransportSettings & config, bool launch_transport)
23  : madara::transport::Base (id, config, context),
24  domain_participant_ (0), update_topic_ (0), update_writer_ (0),
25  listener_ (0)
26 {
27  if (launch_transport)
28  setup ();
29 }
30 
32 {
33  close ();
34 }
35 
36 void
38 {
39  DDS_ReturnCode_t rc;
40  this->invalidate_transport ();
41 
42  if (subscriber_)
43  {
44  subscriber_->delete_datareader (data_reader_);
45  }
46 
47  if (publisher_)
48  {
49  publisher_->delete_datawriter (data_writer_);
50  }
51 
53  {
54  domain_participant_->delete_subscriber (subscriber_);
55  domain_participant_->delete_publisher (publisher_);
56  domain_participant_->delete_topic (update_topic_);
57  }
58 
60  {
61  /* Perform a clean shutdown of the participant and all the contained
62  * entities
63  */
64  rc = domain_participant_->delete_contained_entities();
65  if (rc != DDS_RETCODE_OK)
66  {
68  "NddsTransport::close: unable to delete participant entities\n");
69  }
70 
71  rc = DDSDomainParticipantFactory::get_instance()->delete_participant(
73  if (rc != DDS_RETCODE_OK)
74  {
76  "NddsTransport::close: unable to delete participant\n");
77  }
78  }
79 
81  subscriber_ = 0;
82  publisher_ = 0;
83  update_writer_ = 0;
84  update_topic_ = 0;
85 
86  this->shutting_down_ = false;
87 }
88 
89 int
91 {
92  return this->settings_.reliability;
93 }
94 
95 int
97 {
98  return this->settings_.reliability = setting;
99 }
100 
101 int
103 {
104  Base::setup ();
105 
106  DDS_ReturnCode_t rc;
107  DDS_DomainId_t domain = 0;
108  this->is_valid_ = false;
109 
110  std::stringstream domainreader;
111  domainreader << this->settings_.write_domain;
112  domainreader >> domain;
113 
114  // create the domain participant
115 
116  domain_participant_ = DDSDomainParticipantFactory::get_instance ()->
117  create_participant(
118  domain,
119  DDS_PARTICIPANT_QOS_DEFAULT,
120  NULL, /* Listener */
121  DDS_STATUS_MASK_NONE);
122 
123 
124  if (domain_participant_ == NULL)
125  {
127  "NddsTransport::setup:" \
128  " Unable to start the NDDS transport. Exiting...\n");
129 
130  exit (-2);
131  }
132 
133  DDS_TopicQos topic_qos;
134  domain_participant_->get_default_topic_qos(topic_qos);
135 
137  {
138  topic_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
139  topic_qos.history.depth = this->settings_.queue_length;
140  topic_qos.resource_limits.max_samples_per_instance =
141  this->settings_.queue_length;
142  topic_qos.resource_limits.max_samples = this->settings_.queue_length;
143  topic_qos.destination_order.kind =
144  DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
145  }
146  //topic_qos_.resource_limits.max_samples_per_instance= 10;
147  domain_participant_->set_default_topic_qos(topic_qos);
148 
149  // register the Knowledge Update Type
150  rc = Ndds_Knowledge_UpdateTypeSupport::register_type (
152  Ndds_Knowledge_UpdateTypeSupport::get_type_name());
153 
154  if (rc != DDS_RETCODE_OK)
155  {
157  "NddsTransport::setup:" \
158  " Unable to register the knowledge update data type. Exiting...\n");
159 
160  exit (-2);
161  }
162 
163  // create the knowledge topic
164  update_topic_ = domain_participant_->create_topic (
165  topic_names_[0],
166  Ndds_Knowledge_UpdateTypeSupport::get_type_name(),
167  topic_qos,
168  NULL, /* listener */
169  DDS_STATUS_MASK_NONE);
170  if (update_topic_ == 0)
171  {
173  "NddsTransport::setup:" \
174  " Unable to create topic. Exiting...\n");
175 
176  exit (-2);
177  }
178 
179  DDS_PublisherQos pub_qos;
180 
181  domain_participant_->get_default_publisher_qos (pub_qos);
182 
184  {
185  pub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
186  pub_qos.presentation.coherent_access = true;
187  pub_qos.presentation.ordered_access = false;
188  }
189 
190  // create the publisher_
191  publisher_ = domain_participant_->create_publisher (
192  pub_qos,
193  NULL, /* listener */
194  DDS_STATUS_MASK_NONE);
195  if (publisher_ == 0)
196  {
198  "NddsTransport::setup:" \
199  " Unable to create publisher_. Exiting...\n");
200  exit (-2);
201  }
202 
203  DDS_DataWriterQos datawriter_qos;
204  publisher_->get_default_datawriter_qos (datawriter_qos);
205  publisher_->copy_from_topic_qos (datawriter_qos, topic_qos);
206 
207  // create a topic data writer
208  data_writer_ = publisher_->create_datawriter (
210  datawriter_qos,
211  NULL, /* listener */
212  DDS_STATUS_MASK_NONE);
213  if (data_writer_ == 0)
214  {
216  "NddsTransport::setup:" \
217  " Unable to create topic data writer. Exiting...\n");
218 
219  exit (-2);
220  }
221 
222  // create the specialized data writer for our data type
223  update_writer_ = Ndds_Knowledge_UpdateDataWriter::narrow(data_writer_);
224  if (update_writer_ == 0)
225  {
227  "NddsTransport::setup:" \
228  " Unable to create narrowed data writer. Exiting...\n");
229 
230  exit (-2);
231  }
232 
233  DDS_SubscriberQos sub_qos;
234 
236  {
237  sub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
238  sub_qos.presentation.coherent_access = true;
239  sub_qos.presentation.ordered_access = false;
240  }
241 
242  // create a subscriber_
243  subscriber_ = domain_participant_->create_subscriber (
244  sub_qos,
245  NULL, /* listener */
246  DDS_STATUS_MASK_NONE);
247  if (subscriber_ == 0)
248  {
250  "NddsTransport::setup:" \
251  " Unable to create subscriber_. Exiting...\n");
252 
253  exit (-2);
254  }
255 
256 
257  DDS_DataReaderQos datareader_qos;
258  subscriber_->get_default_datareader_qos (datareader_qos);
259  subscriber_->copy_from_topic_qos (datareader_qos, topic_qos);
260 
262  {
263  datareader_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
264  datareader_qos.history.depth = this->settings_.queue_length;
265  datareader_qos.resource_limits.max_samples = this->settings_.queue_length;
266  datareader_qos.destination_order.kind =
267  DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
268  }
269 
272 
273  // create a reader for the topic
274  data_reader_ = subscriber_->create_datareader (
276  datareader_qos,
277  listener_,
278  DDS_STATUS_MASK_ALL);
279  if (data_reader_ == 0)
280  {
282  "NddsTransport::setup:" \
283  " Unable to create reader. Leaving thread...\n");
284 
285  return -2;
286  }
287 
288  this->validate_transport ();
289 
290  return 0;
291 }
292 
293 long
295  const knowledge::KnowledgeRecords & updates)
296 {
297  long result =
298  prep_send (updates, "NddsTransport::send_data:");
299 
300  // get the maximum quality from the updates
301  uint32_t quality = knowledge::max_quality (updates);
302 
304  unsigned long long cur_clock = context_.get_clock ();
305 
306  DDS_ReturnCode_t rc;
307 
308  std::stringstream buffer;
309  Ndds_Knowledge_Update data;
310 
311  Ndds_Knowledge_Update_initialize (&data);
312 
313  data.clock = cur_clock;
314  data.quality = quality;
315 
316  data.buffer.ensure_length (result, result);
317  data.buffer.from_array ((DDS_Octet *)buffer_.get_ptr (), result);
318  data.clock = cur_clock;
319  data.quality = quality;
320  data.updates = DDS_UnsignedLong (updates.size ());
321  data.originator = new char [id_.size () + 1];
322  strncpy (data.originator, id_.c_str (), id_.size () + 1);
323  data.type = madara::transport::MULTIASSIGN;
324  data.ttl = settings_.get_rebroadcast_ttl ();
325  data.timestamp = time (NULL);
326  data.madara_id = new char [strlen (MADARA_IDENTIFIER) + 1];
327  strncpy (data.madara_id, MADARA_IDENTIFIER, strlen (MADARA_IDENTIFIER) + 1);
328 
330  "NddsTransport::send:" \
331  " sending multiassignment: %d updates, time=%llu, quality=%d\n",
332  data.updates, cur_clock, quality);
333 
334  DDS_InstanceHandle_t handle = update_writer_->register_instance (data);
335  rc = update_writer_->write (data, handle);
336 
337  Ndds_Knowledge_Update_finalize (&data);
338 
339  return rc;
340 }
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
volatile bool is_valid_
Definition: Transport.h:179
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
#define MADARA_IDENTIFIER
Definition: MessageHeader.h:21
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
int reliability(void) const
Accesses reliability setting.
This class stores variables and their values for use by any entity needing state information in a thr...
static const char * partition_
Definition: NddsTransport.h:76
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
long send_data(const knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
Provides knowledge logging services to files and terminals.
Definition: GlobalLogger.h:11
Holds basic transport settings.
DDSDomainParticipant * domain_participant_
Definition: NddsTransport.h:79
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
NddsTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
Ndds_Knowledge_UpdateDataWriter * update_writer_
Definition: NddsTransport.h:85
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
std::string write_domain
All class members are accessible to users for easy setup.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
volatile bool shutting_down_
Definition: Transport.h:180
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
static constexpr struct madara::knowledge::tags::string_t string
uint32_t queue_length
Length of the buffer used to store history of events.
void log(int level, const char *message,...)
Logs a message to all available loggers.
Definition: Logger.cpp:23
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
int setup(void)
Activates this transport.
void close(void)
Closes this transport.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
Base class from which all transports must be derived.
Definition: Transport.h:62
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
static const char * topic_names_[]
Definition: NddsTransport.h:75
Container for NDDS callbacks.
Definition: NddsListener.h:23