19 :
Base (id, config, context), write_socket_ (0)
28 zmq_context.add_ref ();
40 zmq_context.rem_ref ();
53 zmq_setsockopt (
write_socket_, ZMQ_LINGER, (
void *)&option,
sizeof (
int));
61 "ZMQTransport::close:" \
62 " calling terminate on read threads\n");
67 "ZMQTransport::close:" \
68 " waiting on read threads\n");
73 "ZMQTransport::close:" \
74 " waiting on read threads\n");
100 "ZMQTransport::setup:" \
101 " setting up write socket\n");
103 write_socket_ = zmq_socket (zmq_context.get_context (), ZMQ_PUB);
106 "ZMQTransport::setup:" \
107 " binding write to %s\n",
110 int bind_result = zmq_bind (write_socket_,
settings_.
hosts[0].c_str ());
112 if (bind_result != 0)
115 "ZMQTransport::setup:" \
116 " ERROR: could not bind to %s\n",
119 "ZMQTransport::setup:" \
120 " ERROR: errno = %s\n",
121 zmq_strerror (zmq_errno ()));
126 "ZMQTransport::setup:" \
127 " successfully bound to %s\n",
132 int send_buff_size = 0;
133 int rcv_buff_size = 0;
136 size_t opt_len =
sizeof (int);
139 "ZMQTransport::setup:" \
140 " setting send buff size to settings.queue_length (%d)\n",
143 int result = zmq_setsockopt (
144 write_socket_, ZMQ_SNDBUF, (
void *)&buff_size, opt_len);
148 int result = zmq_getsockopt (
149 write_socket_, ZMQ_SNDBUF, (
void *)&send_buff_size, &opt_len);
152 "ZMQTransport::setup:" \
153 " successfully set sockopt sendbuf size to %d. Actual %d allocated\n",
154 buff_size, send_buff_size);
159 "ZMQTransport::setup:" \
160 " ERROR: errno = %s\n",
161 zmq_strerror (zmq_errno ()));
165 result = zmq_setsockopt (
166 write_socket_, ZMQ_SNDTIMEO, (
void *)&timeout, opt_len);
170 int result = zmq_getsockopt (
171 write_socket_, ZMQ_SNDTIMEO, (
void *)&timeout, &opt_len);
174 "ZMQTransport::setup:" \
175 " successfully set send timeout to %d\n",
181 "ZMQTransport::setup:" \
182 " ERROR: When setting timeout on send, errno = %s\n",
183 zmq_strerror (zmq_errno ()));
195 "ZMQTransport::setup:" \
201 std::stringstream thread_name;
202 thread_name <<
"read";
221 const char * print_prefix =
"ZMQTransport::send_data";
225 result =
prep_send (orig_updates, print_prefix);
230 "ZMQTransport::send:" \
231 " sending %d bytes on socket\n", (int)result);
234 result = (long) zmq_send (
238 "ZMQTransport::send:" \
239 " sent %d bytes on socket\n", (int)result);
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
void close(void)
Closes the transport.
This class stores variables and their values for use by any entity needing state information in a thread-safe way.
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
void * write_socket_
underlying socket for sending
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
int reliability(void) const
Accesses reliability setting.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
const std::string id_
host:port identifier of this process
Thread for reading knowledge updates through a ZMQ datagram socket.
int setup(void)
Initializes the transport.
uint32_t read_threads
the number of read threads to start
virtual ~ZMQTransport()
Destructor.
threads::Threader read_threads_
threads for reading knowledge updates
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
All subclasses should call this method at the end of their own setup.
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Base class from which all transports must be derived.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
ZMQTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.