MADARA  3.1.8
ZMQTransport.cpp
Go to the documentation of this file.
1 
5 
11 
12 #include <iostream>
14 #include "ZMQContext.h"
15 
18  TransportSettings & config, bool launch_transport)
19 : Base (id, config, context), write_socket_ (0)
20 {
21  // create a reference to the knowledge base for threading
22  knowledge_.use (context);
23 
24  // set the data plane for the read threads
26 
27  // keep track of references to context so management is automatic
28  zmq_context.add_ref ();
29 
30  if (launch_transport)
31  setup ();
32 
33 }
34 
36 {
37  close ();
38 
39  // destroy context if it has no references anymore
40  zmq_context.rem_ref ();
41 }
42 
43 void
45 {
46  this->invalidate_transport ();
47 
48  if (write_socket_ != 0)
49  {
50  int option = 2000;
51 
52  // if you don't do this, ZMQ waits forever for no reason. Super smart.
53  zmq_setsockopt (write_socket_, ZMQ_LINGER, (void *)&option, sizeof (int));
54 
55  madara::utility::sleep (0.100);
56 
57  zmq_close (write_socket_);
58  }
59 
61  "ZMQTransport::close:" \
62  " calling terminate on read threads\n");
63 
65 
67  "ZMQTransport::close:" \
68  " waiting on read threads\n");
69 
71 
73  "ZMQTransport::close:" \
74  " waiting on read threads\n");
75 
76 }
77 
78 int
80 {
82 }
83 
84 int
86 {
88 }
89 
90 int
92 {
93  // call base setup method to initialize certain common variables
94  Base::setup ();
95 
96  if (settings_.hosts.size () > 0)
97  {
98 
100  "ZMQTransport::setup:" \
101  " setting up write socket\n");
102 
103  write_socket_ = zmq_socket (zmq_context.get_context (), ZMQ_PUB);
104 
106  "ZMQTransport::setup:" \
107  " binding write to %s\n",
108  settings_.hosts[0].c_str ());
109 
110  int bind_result = zmq_bind (write_socket_, settings_.hosts[0].c_str ());
111 
112  if (bind_result != 0)
113  {
115  "ZMQTransport::setup:" \
116  " ERROR: could not bind to %s\n",
117  settings_.hosts[0].c_str ());
119  "ZMQTransport::setup:" \
120  " ERROR: errno = %s\n",
121  zmq_strerror (zmq_errno ()));
122  }
123  else
124  {
126  "ZMQTransport::setup:" \
127  " successfully bound to %s\n",
128  settings_.hosts[0].c_str ());
129  }
130 
131 
132  int send_buff_size = 0;
133  int rcv_buff_size = 0;
134  int buff_size = settings_.queue_length;
135  int timeout = 300;
136  size_t opt_len = sizeof (int);
137 
139  "ZMQTransport::setup:" \
140  " setting send buff size to settings.queue_length (%d)\n",
141  buff_size);
142 
143  int result = zmq_setsockopt (
144  write_socket_, ZMQ_SNDBUF, (void *)&buff_size, opt_len);
145 
146  if (result == 0)
147  {
148  int result = zmq_getsockopt (
149  write_socket_, ZMQ_SNDBUF, (void *)&send_buff_size, &opt_len);
150 
152  "ZMQTransport::setup:" \
153  " successfully set sockopt sendbuf size to %d. Actual %d allocated\n",
154  buff_size, send_buff_size);
155  }
156  else
157  {
159  "ZMQTransport::setup:" \
160  " ERROR: errno = %s\n",
161  zmq_strerror (zmq_errno ()));
162  }
163 
164 
165  result = zmq_setsockopt (
166  write_socket_, ZMQ_SNDTIMEO, (void *)&timeout, opt_len);
167 
168  if (result == 0)
169  {
170  int result = zmq_getsockopt (
171  write_socket_, ZMQ_SNDTIMEO, (void *)&timeout, &opt_len);
172 
174  "ZMQTransport::setup:" \
175  " successfully set send timeout to %d\n",
176  timeout);
177  }
178  else
179  {
181  "ZMQTransport::setup:" \
182  " ERROR: When setting timeout on send, errno = %s\n",
183  zmq_strerror (zmq_errno ()));
184  }
185 
186  if (!settings_.no_receiving)
187  {
188  double hertz = settings_.read_thread_hertz;
189  if (hertz < 0.0)
190  {
191  hertz = 0.0;
192  }
193 
195  "ZMQTransport::setup:" \
196  " starting %d threads at %f hertz\n", settings_.read_threads,
197  hertz);
198 
199  for (uint32_t i = 0; i < settings_.read_threads; ++i)
200  {
201  std::stringstream thread_name;
202  thread_name << "read";
203  thread_name << i;
204 
205  read_threads_.run (hertz, thread_name.str (),
207  settings_, id_, write_socket_,
209  }
210  }
211  }
212 
213  return this->validate_transport ();
214 }
215 
216 long
218  const madara::knowledge::KnowledgeRecords & orig_updates)
219 {
220  long result (0);
221  const char * print_prefix = "ZMQTransport::send_data";
222 
223  if (!settings_.no_sending)
224  {
225  result = prep_send (orig_updates, print_prefix);
226 
227  if (settings_.hosts.size () > 0 && result > 0)
228  {
230  "ZMQTransport::send:" \
231  " sending %d bytes on socket\n", (int)result);
232 
233  //send the prepped buffer over ZeroMQ with timeout of 300ms
234  result = (long) zmq_send (
235  write_socket_, (void *)buffer_.get_ptr (), (size_t)result, 0);
236 
238  "ZMQTransport::send:" \
239  " sent %d bytes on socket\n", (int)result);
240  }
241  }
242 
243  return result;
244 }
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
void close(void)
Closes the transport.
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
void * write_socket_
underlying socket for sending
Definition: ZMQTransport.h:101
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
int reliability(void) const
Accesses reliability setting.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
Definition: ZMQTransport.h:95
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
Thread for reading knowledge updates through a ZMQ datagram socket.
int setup(void)
Initializes the transport.
static constexpr struct madara::knowledge::tags::string_t string
uint32_t read_threads
the number of read threads to start
virtual ~ZMQTransport()
Destructor.
threads::Threader read_threads_
threads for reading knowledge updates
Definition: ZMQTransport.h:98
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
void use(ThreadSafeContext &original)
Refer to and use another knowledge base&#39;s context.
ZMQTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.