18 :
Base (id, config, context),
19 write_socket_ (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET, 0, 1)
71 int rcv_buff_size = 0;
72 int opt_len =
sizeof (int);
80 (
void *)&send_buff_size, &opt_len);
83 (
void *)&rcv_buff_size, &opt_len);
86 "MulticastTransport::setup:" \
87 " default socket buff size is send=%d, rcv=%d\n",
88 send_buff_size, rcv_buff_size);
90 if (send_buff_size < tar_buff_size)
93 "MulticastTransport::setup:" \
94 " setting send buff size to settings.queue_length (%d)\n",
98 (
void *) &tar_buff_size, opt_len);
101 (
void *)&send_buff_size, &opt_len);
104 "MulticastTransport::setup:" \
105 " current socket buff size is send=%d, rcv=%d\n",
106 send_buff_size, rcv_buff_size);
109 if (rcv_buff_size < tar_buff_size)
112 "MulticastTransport::setup:" \
113 " setting rcv buff size to settings.queue_length (%d)\n",
117 (
void *) &tar_buff_size, opt_len);
120 (
void *)&rcv_buff_size, &opt_len);
123 "MulticastTransport::setup:" \
124 " current socket buff size is send=%d, rcv=%d\n",
125 send_buff_size, rcv_buff_size);
131 for (
unsigned int i = 0; i <
addresses_.size (); ++i)
136 "MulticastTransport::setup:" \
137 " settings address[%d] to %s:%d\n", i,
142 const char * host =
addresses_[0].get_host_addr ();
147 "MulticastTransport::setup:" \
148 " Error subscribing to multicast address %s:%d\n", host, port);
153 "MulticastTransport::setup:" \
154 " Success subscribing to multicast address %s:%d\n", host, port);
157 int rcv_buff_size = 0;
158 int opt_len =
sizeof (int);
162 bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
163 (
void *)&rcv_buff_size, &opt_len);
166 "MulticastTransport::setup:" \
167 " default socket buff size is send=%d, rcv=%d\n",
168 send_buff_size, rcv_buff_size);
170 if (send_buff_size < tar_buff_size)
173 "MulticastTransport::setup:" \
174 " setting send buff size to settings.queue_length (%d)\n",
177 bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
178 (
void *)&tar_buff_size, opt_len);
180 bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
181 (
void *)&send_buff_size, &opt_len);
184 "MulticastTransport::setup:" \
185 " current socket buff size is send=%d, rcv=%d\n",
186 send_buff_size, rcv_buff_size);
189 if (rcv_buff_size < tar_buff_size)
192 "MulticastTransport::setup:" \
193 " setting rcv buff size to settings.queue_length (%d)\n",
196 bare_socket.set_option (SOL_SOCKET, SO_RCVBUF,
197 (
void *)&tar_buff_size, opt_len);
199 bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
200 (
void *)&rcv_buff_size, &opt_len);
203 "MulticastTransport::setup:" \
204 " current socket buff size is send=%d, rcv=%d\n",
205 send_buff_size, rcv_buff_size);
218 "MulticastTransport::setup:" \
224 std::stringstream thread_name;
225 thread_name <<
"read";
246 const char * print_prefix =
"MulticastTransport::send_data";
250 result =
prep_send (orig_updates, print_prefix);
254 uint64_t bytes_sent = 0;
255 uint64_t packet_size = (uint64_t)result;
263 " fragmenting %" PRIu64
" byte packet (%" PRIu32
" bytes is max fragment size)\n",
270 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
274 " Sending fragment %d\n",
277 int send_attempts = -1;
278 ssize_t actual_sent = -1;
280 while (actual_sent < 0 &&
297 " Sent %d byte fragment\n",
298 print_prefix, (int)actual_sent);
300 bytes_sent += actual_sent;
316 " Sent fragments totalling %" PRIu64
" bytes\n",
317 print_prefix, bytes_sent);
325 " Sending packet of size %ld\n",
326 print_prefix, result);
328 int send_attempts = -1;
329 ssize_t actual_sent = -1;
331 while (actual_sent < 0 &&
344 bytes_sent = (uint64_t)actual_sent;
348 " Sent packet of size %" PRIu64
"\n",
349 print_prefix, bytes_sent);
353 else if (actual_sent == ECONNRESET)
357 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
360 else if (actual_sent == EINTR)
364 " Local socket was interrupted during send (EINTR)\n",
367 else if (actual_sent == EWOULDBLOCK)
371 " Send would have blocked (EWOULDBLOCK)\n",
374 else if (actual_sent == ENOTCONN)
378 " Send reports socket is not connected (ENOTCONN)\n",
381 else if (actual_sent == EADDRINUSE)
385 " Send reports the interface is busy (EADDRINUSE)\n",
388 else if (actual_sent == EBADF)
392 " Send socket is invalid (EBADF)\n",
399 " Packet was not sent due to unknown error (%d)\n",
400 print_prefix, (int)actual_sent);
408 " Send bandwidth = %" PRIu64
" B/s\n",
411 result = (long) bytes_sent;
virtual ~MulticastTransport()
Destructor.
int setup(void)
Initializes the transport.
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
ACE_SOCK_Dgram_Mcast read_socket_
The multicast socket we are reading from.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
threads::Threader read_threads_
threads for reading knowledge updates
MulticastTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
ACE_SOCK_Dgram write_socket_
underlying socket for sending
void close(void)
Closes the transport.
This class stores variables and their values for use by any entity needing state information in a thr...
std::vector< std::string > hosts
Host information for transports that require it.
Thread for reading knowledge updates through a Multicast datagram socket.
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
int reliability(void) const
Accesses reliability setting.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
const std::string id_
host:port identifier of this process
double slack_time
time to sleep between sends and rebroadcasts
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
std::vector< ACE_INET_Addr > addresses_
holds all multicast addresses we are sending to
Base class from which all transports must be derived.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int resend_attempts
Maximum number of attempts to resend if transport is busy.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.