5 #include "ace/Time_Value.h" 13 const ACE_INET_Addr & address,
14 ACE_SOCK_Dgram_Bcast & socket,
18 : settings_ (settings), id_ (id), context_ (0),
19 address_ (address.get_port_number ()),
20 write_socket_ (socket),
21 send_monitor_ (send_monitor),
22 receive_monitor_ (receive_monitor),
23 packet_scheduler_ (packet_scheduler)
26 ACE_INET_Addr local (address.get_port_number ());
33 "BroadcastTransportReadThread::constructor:" \
34 " Error subscribing to broadcast address %s:%d\n",
40 "BroadcastTransportReadThread::constructor:" \
41 " Success subscribing to broadcast address %s:%d\n",
45 int rcv_buff_size = 0;
46 int opt_len =
sizeof (int);
50 bare_socket.get_option (SOL_SOCKET, SO_RCVBUF,
51 (
void *)&rcv_buff_size, &opt_len);
54 "BroadcastTransportReadThread::constructor:" \
55 " default socket buff size is send=%d, rcv=%d\n",
56 send_buff_size, rcv_buff_size);
58 if (send_buff_size < tar_buff_size)
61 "BroadcastTransportReadThread::constructor:" \
62 " setting send buff size to settings.queue_length (%d)\n",
65 bare_socket.set_option (SOL_SOCKET, SO_SNDBUF,
66 (
void *)&tar_buff_size, opt_len);
68 bare_socket.get_option (SOL_SOCKET, SO_SNDBUF,
69 (
void *)&send_buff_size, &opt_len);
72 "BroadcastTransportReadThread::constructor:" \
73 " current socket buff size is send=%d, rcv=%d\n",
74 send_buff_size, rcv_buff_size);
77 if (rcv_buff_size < tar_buff_size)
80 "BroadcastTransportReadThread::constructor:" \
81 " setting rcv buff size to settings.queue_length (%d)\n",
84 bare_socket.set_option (SOL_SOCKET, SO_RCVBUF, /* enlarge the RECEIVE buffer; this branch is guarded by rcv_buff_size < tar_buff_size */
85 (void *)&tar_buff_size, opt_len);
87 bare_socket.get_option (SOL_SOCKET, SO_RCVBUF, /* read back the receive size actually in effect so the following log line reports it */
88 (void *)&rcv_buff_size, &opt_len);
91 "BroadcastTransportReadThread::constructor:" \
92 " current socket buff size is send=%d, rcv=%d\n",
93 send_buff_size, rcv_buff_size);
115 "BroadcastTransportReadThread::init:" \
116 " setting rules to %s\n",
119 #ifndef _MADARA_NO_KARL_ 122 #endif // _MADARA_NO_KARL_ 127 "BroadcastTransportReadThread::init:" \
128 " no permanent rules were set\n");
141 "BroadcastTransportReadThread::cleanup:" \
142 " Error closing broadcast socket\n");
148 const char * print_prefix,
165 ssize_t bytes_sent = 0;
174 " fragmenting %d byte packet (%" PRIu64
" bytes is max fragment size)\n",
181 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
185 " Sending fragment %d\n",
188 int send_attempts = -1;
189 ssize_t actual_sent = -1;
191 while (actual_sent < 0 &&
205 bytes_sent = (uint64_t)actual_sent;
209 " Sent packet of size %" PRIu64
"\n",
210 print_prefix, bytes_sent);
214 else if (actual_sent == ECONNRESET)
218 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
221 else if (actual_sent == EINTR)
225 " Local socket was interrupted during send (EINTR)\n",
228 else if (actual_sent == EWOULDBLOCK)
232 " Send would have blocked (EWOULDBLOCK)\n",
235 else if (actual_sent == ENOTCONN)
239 " Send reports socket is not connected (ENOTCONN)\n",
242 else if (actual_sent == EADDRINUSE)
246 " Send reports the interface is busy (EADDRINUSE)\n",
249 else if (actual_sent == EBADF)
253 " Send socket is invalid (EBADF)\n",
260 " Packet was not sent due to unknown error (%d)\n",
261 print_prefix, (int)actual_sent);
274 " Sent fragments totalling %" PRIu64
" bytes\n",
285 " Sending packet of size %ld\n",
286 print_prefix, result);
288 int send_attempts = -1;
289 ssize_t actual_sent = -1;
291 while (actual_sent < 0 &&
304 bytes_sent = (uint64_t)actual_sent;
309 " Sent packet of size %" PRIu64
"\n",
310 print_prefix, bytes_sent);
314 else if (actual_sent == ECONNRESET)
319 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
322 else if (actual_sent == EINTR)
327 " Local socket was interrupted during send (EINTR)\n",
330 else if (actual_sent == EWOULDBLOCK)
335 " Send would have blocked (EWOULDBLOCK)\n",
338 else if (actual_sent == ENOTCONN)
343 " Send reports socket is not connected (ENOTCONN)\n",
346 else if (actual_sent == EADDRINUSE)
351 " Send reports the interface is busy (EADDRINUSE)\n",
354 else if (actual_sent == EBADF)
359 " Send socket is invalid (EBADF)\n",
367 " Packet was not sent due to unknown error (%d)\n",
368 print_prefix, (int)actual_sent);
375 " Send bandwidth = %" PRIu64
" B/s\n",
387 ACE_Time_Value wait_time (1);
388 ACE_INET_Addr remote;
392 const char * print_prefix =
"BroadcastTransportReadThread::run";
396 " entering main service loop.\n",
405 " Unable to allocate buffer of size %" PRIu32 /* PRIu32 is only the conversion suffix; the '%' must be in the literal */
". Exiting thread.\n",
414 " entering a recv on the socket.\n",
423 if (remote.get_host_addr () != 0)
428 " received a message header of %lld bytes from %s:%d\n",
430 (
long long)bytes_read,
431 remote.get_host_addr (), (int)remote.get_port_number ());
438 " received %lld bytes from unknown host\n",
440 (
long long)bytes_read);
445 std::stringstream remote_host;
446 remote_host << remote.get_host_addr ();
448 remote_host << remote.get_port_number ();
452 #ifndef _MADARA_NO_KARL_
456 remote_host.str ().c_str (), header);
460 if (header->
ttl > 0 && rebroadcast_records.size () > 0 &&
464 header->
ttl = std::min (
467 rebroadcast (print_prefix, header, rebroadcast_records);
478 " wait timeout on new messages. Proceeding to next wait\n",
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window.
void run(void)
The main loop internals for the read thread.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
ACE_SOCK_Dgram read_socket_
The broadcast socket we are reading from.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
madara::utility::ScopedArray< char > buffer_
buffer for sending
const std::string id_
host:port identifier of this process
Provides scheduler for dropping packets.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
void cleanup(void)
Cleanup function called by thread manager.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
const QoSTransportSettings * qos_settings_
pointer to qos_settings (if applicable)
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
BroadcastTransportReadThread(const TransportSettings &settings, const std::string &id, const ACE_INET_Addr &address, ACE_SOCK_Dgram_Bcast &socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse tree.
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
ACE_SOCK_Dgram_Bcast write_socket_
underlying socket for rebroadcasting
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, and fills rebroadcast records according to settings filters.
knowledge::ThreadSafeContext * context_
knowledge context
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet.
uint32_t queue_length
Length of the buffer used to store history of events.
Container for quality-of-service settings.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
const TransportSettings & settings_
Transport settings.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
ACE_INET_Addr address_
The broadcast address we are subscribing to.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's messages.