23 :
madara::transport::
Base (id, config, context),
24 domain_participant_ (0), update_topic_ (0), update_writer_ (0),
65 if (rc != DDS_RETCODE_OK)
68 "NddsTransport::close: unable to delete participant entities\n");
71 rc = DDSDomainParticipantFactory::get_instance()->delete_participant(
73 if (rc != DDS_RETCODE_OK)
76 "NddsTransport::close: unable to delete participant\n");
107 DDS_DomainId_t domain = 0;
110 std::stringstream domainreader;
112 domainreader >> domain;
119 DDS_PARTICIPANT_QOS_DEFAULT,
121 DDS_STATUS_MASK_NONE);
127 "NddsTransport::setup:" \
128 " Unable to start the NDDS transport. Exiting...\n");
133 DDS_TopicQos topic_qos;
138 topic_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
140 topic_qos.resource_limits.max_samples_per_instance =
143 topic_qos.destination_order.kind =
144 DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
150 rc = Ndds_Knowledge_UpdateTypeSupport::register_type (
152 Ndds_Knowledge_UpdateTypeSupport::get_type_name());
154 if (rc != DDS_RETCODE_OK)
157 "NddsTransport::setup:" \
158 " Unable to register the knowledge update data type. Exiting...\n");
166 Ndds_Knowledge_UpdateTypeSupport::get_type_name(),
169 DDS_STATUS_MASK_NONE);
173 "NddsTransport::setup:" \
174 " Unable to create topic. Exiting...\n");
179 DDS_PublisherQos pub_qos;
185 pub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
186 pub_qos.presentation.coherent_access =
true;
187 pub_qos.presentation.ordered_access =
false;
194 DDS_STATUS_MASK_NONE);
198 "NddsTransport::setup:" \
199 " Unable to create publisher_. Exiting...\n");
203 DDS_DataWriterQos datawriter_qos;
204 publisher_->get_default_datawriter_qos (datawriter_qos);
205 publisher_->copy_from_topic_qos (datawriter_qos, topic_qos);
212 DDS_STATUS_MASK_NONE);
216 "NddsTransport::setup:" \
217 " Unable to create topic data writer. Exiting...\n");
227 "NddsTransport::setup:" \
228 " Unable to create narrowed data writer. Exiting...\n");
233 DDS_SubscriberQos sub_qos;
237 sub_qos.presentation.access_scope = DDS_TOPIC_PRESENTATION_QOS;
238 sub_qos.presentation.coherent_access =
true;
239 sub_qos.presentation.ordered_access =
false;
246 DDS_STATUS_MASK_NONE);
250 "NddsTransport::setup:" \
251 " Unable to create subscriber_. Exiting...\n");
257 DDS_DataReaderQos datareader_qos;
258 subscriber_->get_default_datareader_qos (datareader_qos);
259 subscriber_->copy_from_topic_qos (datareader_qos, topic_qos);
263 datareader_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
266 datareader_qos.destination_order.kind =
267 DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
278 DDS_STATUS_MASK_ALL);
282 "NddsTransport::setup:" \
283 " Unable to create reader. Leaving thread...\n");
298 prep_send (updates,
"NddsTransport::send_data:");
308 std::stringstream buffer;
309 Ndds_Knowledge_Update data;
311 Ndds_Knowledge_Update_initialize (&data);
313 data.clock = cur_clock;
314 data.quality = quality;
316 data.buffer.ensure_length (result, result);
318 data.clock = cur_clock;
319 data.quality = quality;
320 data.updates = DDS_UnsignedLong (updates.size ());
321 data.originator =
new char [
id_.size () + 1];
322 strncpy (data.originator,
id_.c_str (),
id_.size () + 1);
325 data.timestamp = time (NULL);
330 "NddsTransport::send:" \
331 " sending multiassignment: %d updates, time=%llu, quality=%d\n",
332 data.updates, cur_clock, quality);
334 DDS_InstanceHandle_t handle =
update_writer_->register_instance (data);
337 Ndds_Knowledge_Update_finalize (&data);
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
int reliability(void) const
Accesses reliability setting.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
static const char * partition_
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
long send_data(const knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
DDSDomainParticipant * domain_participant_
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
NddsTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
Ndds_Knowledge_UpdateDataWriter * update_writer_
T * get_ptr(void)
Gets the underlying pointer.
std::string write_domain
All class members are accessible to users for easy setup.
DDSSubscriber * subscriber_
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
volatile bool shutting_down_
DDSDataWriter * data_writer_
const std::string id_
host:port identifier of this process
DDSDataReader * data_reader_
uint32_t queue_length
Length of the buffer used to store history of events.
void log(int level, const char *message,...)
Logs a message to all available loggers.
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
int setup(void)
Activates this transport.
void close(void)
Closes this transport.
~NddsTransport()
Destructor.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
All subclasses should call this method at the end of their setup.
Base class from which all transports must be derived.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
DDSPublisher * publisher_
static const char * topic_names_[]
Container for NDDS callbacks.