21 "DDS_RETCODE_UNSUPPORTED",
22 "DDS_RETCODE_BAD_PARAMETER",
23 "DDS_RETCODE_PRECONDITION_NOT_MET",
24 "DDS_RETCODE_OUT_OF_RESOURCES",
25 "DDS_RETCODE_NOT_ENABLED",
26 "DDS_RETCODE_IMMUTABLE_POLICY",
27 "DDS_RETCODE_INCONSISTENT_POLICY",
28 "DDS_RETCODE_ALREADY_DELETED",
29 "DDS_RETCODE_TIMEOUT",
30 "DDS_RETCODE_NO_DATA",
31 "DDS_RETCODE_ILLEGAL_OPERATION" };
39 :
madara::transport::
Base (id, config, context),
40 domain_ (0), domain_factory_ (0),
41 domain_participant_ (0), publisher_ (0), subscriber_ (0),
42 datawriter_ (0), datareader_ (0),
43 update_writer_ (0), update_reader_ (0),
118 DDS::ReturnCode_t status;
126 "SpliceDDSTransport::setup:" \
127 " Creating a participant for topic (%s)\n",
131 "SpliceDDSTransport::setup:" \
132 " Participant settings are being read from the OSPL_URI environment" 138 domain_factory_->get_default_participant_qos (
part_qos_);
147 "\nSpliceDDSTransport::setup:" \
148 " splice daemon not running. Try 'ospl start'...\n");
157 topic_qos_.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
159 topic_qos_.resource_limits.max_samples_per_instance =
163 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
171 "SpliceDDSTransport::setup:" \
172 " Registering type support\n");
177 check_status(status,
"Knowledge::UpdateTypeSupport::register_type");
185 "SpliceDDSTransport::setup:" \
186 " Setting up knowledge domain via topic (%s)\n",
195 "DDS::DomainParticipant::create_topic (KnowledgeUpdate)");
199 check_status(status,
"DDS::DomainParticipant::get_default_publisher_qos");
204 pub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
205 pub_qos_.presentation.coherent_access =
true;
206 pub_qos_.presentation.ordered_access =
false;
211 "SpliceDDSTransport::setup:" \
212 " Creating publisher for topic (%s)\n",
219 pub_qos_, NULL, DDS::STATUS_MASK_NONE);
224 check_status(status,
"DDS::DomainParticipant::get_default_subscriber_qos");
229 sub_qos_.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
230 sub_qos_.presentation.coherent_access =
true;
231 sub_qos_.presentation.ordered_access =
false;
235 "SpliceDDSTransport::setup:" \
236 " Creating subscriber for topic (%s)\n",
243 sub_qos_, NULL, DDS::STATUS_MASK_NONE);
249 "SpliceDDSTransport::setup:" \
250 " pub or sub could not be created. Try 'ospl stop; ospl start'...\n");
262 "SpliceDDSTransport::setup:" \
263 " Enabling reliable transport for (%s) datawriters\n",
272 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
277 "SpliceDDSTransport::setup:" \
278 " Enabling unreliable transport for (%s) datawriters\n",
283 "SpliceDDSTransport::setup:" \
284 " Creating datawriter for topic (%s)\n",
290 check_handle(datawriter_,
"DDS::Publisher::create_datawriter (Update)");
291 update_writer_ =
dynamic_cast<Knowledge::UpdateDataWriter_ptr
> (datawriter_.in ());
306 check_status(status,
"DDS::Subscriber::get_default_datareader_qos");
313 "SpliceDDSTransport::setup:" \
314 " Enabling reliable transport for (%s) datareaders\n",
321 DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
329 "SpliceDDSTransport::setup:" \
330 " Enabling unreliable transport for (%s) datareaders\n",
335 "SpliceDDSTransport::setup:" \
336 " Creating datareader for topic (%s)\n",
349 check_handle(datareader_,
"DDS::Subscriber::create_datareader (Update)");
350 update_reader_ =
dynamic_cast<Knowledge::UpdateDataReader_ptr
>(datareader_.in ());
362 "UdpTransportReadThread::setup:" \
368 std::stringstream thread_name;
369 thread_name <<
"read";
390 result =
prep_send (updates,
"SpliceDDSTransport::send_data:");
398 DDS::ReturnCode_t dds_result;
399 DDS::InstanceHandle_t handle;
401 Knowledge::Update data;
403 data.buffer = Knowledge::seq_oct (result, result, (
unsigned char *)
buffer_.
get_ptr ());
404 data.clock = cur_clock;
405 data.quality = quality;
406 data.updates = DDS::ULong (updates.size ());
407 data.originator = DDS::string_dup (
id_.c_str ());
410 data.timestamp = time (NULL);
414 "SpliceDDSTransport::send:" \
415 " sending multiassignment: %d updates, time=llu, quality=%d\n",
416 data.updates, cur_clock, quality);
420 result = (long)dds_result;
434 "SpliceDDSTransport::check_handle:" \
435 " error in %s: Creation failed: invalid handle\n", info);
446 if ((status == DDS::RETCODE_OK) || (status == DDS::RETCODE_NO_DATA))
450 "SpliceDDSTransport::check_status:" \
451 " error in %s: Creation failed: %s\n",
uint32_t max_quality(const KnowledgeRecords &records)
Returns the maximum quality within the records.
QoSTransportSettings settings_
DDS::SubscriberQos sub_qos_
DDS::DataReader_var datareader_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
DDS::DataWriterQos datawriter_qos_
int reliability(void) const
Accesses reliability setting.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
DDS::Publisher_var publisher_
DDS::DomainParticipant_var domain_participant_
Knowledge::UpdateDataReader_var update_reader_
unsigned char get_rebroadcast_ttl(void) const
Gets the time to live for rebroadcasting (number of rebroadcasts per message).
double read_thread_hertz
number of valid messages allowed to be received per second.
Provides knowledge logging services to files and terminals.
Holds basic transport settings.
Knowledge::UpdateDataWriter_var update_writer_
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
DDS::DomainParticipantFactory_var domain_factory_
DDS::PublisherQos pub_qos_
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
~SpliceDDSTransport()
Destructor.
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
static const char * partition_
DDS::Topic_var update_topic_
static const char * ret_code_names[]
long send_data(const knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
DDS::DataWriter_var datawriter_
threads::Threader read_threads_
threads for reading knowledge updates
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
void check_handle(void *handle, const char *info)
Splice handle checker.
SpliceDDSTransport(const std::string &id, knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
std::string write_domain
All class members are accessible to users for easy setup.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
volatile bool shutting_down_
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
bool no_sending
if true, never send over transport
Knowledge::UpdateDataWriter_var latency_update_writer_
MADARA_Export std::string & dds_topicify(std::string &input)
Changes periods to underscores in compliance with OpenSplice needs.
DDS::DomainParticipantQos part_qos_
DDS::Subscriber_var subscriber_
Knowledge::UpdateTypeSupport update_type_support_
int setup(void)
Activates this transport.
Thread for reading knowledge updates via waitsets.
const std::string id_
host:port identifier of this process
DDS::DataWriter_var latencywriter_
uint32_t read_threads
the number of read threads to start
void close(void)
Closes this transport.
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
DDS::DataReaderQos datareader_qos_
static const char * topic_names_[]
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
void check_status(DDS::ReturnCode_t status, const char *info)
Splice status checker.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
uint32_t reliability
Reliability required of the transport.
Copyright (c) 2015 Carnegie Mellon University.
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
const char * get_error_name(DDS::ReturnCode_t status)
Returns error name of the specific status.
virtual int setup(void)
All subclasses should call this method at the end of their setup.
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Base class from which all transports must be derived.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.