5 #include "ace/Time_Value.h" 12 std::map <std::string, ACE_INET_Addr> & addresses,
13 ACE_SOCK_Dgram & write_socket,
14 ACE_SOCK_Dgram & read_socket,
18 : settings_ (settings), id_ (id), context_ (0),
19 addresses_ (addresses),
20 write_socket_ (write_socket),
21 read_socket_ (read_socket),
22 send_monitor_ (send_monitor),
23 receive_monitor_ (receive_monitor),
24 packet_scheduler_ (packet_scheduler)
46 #ifndef _MADARA_NO_KARL_ 48 "UdpRegistryServerReadThread::init:" \
49 " setting rules to %s\n",
54 #endif // _MADARA_NO_KARL_ 59 "UdpRegistryServerReadThread::init:" \
60 " no permanent rules were set\n");
67 const char * print_prefix,
84 uint64_t bytes_sent = 0;
85 uint64_t packet_size = (uint64_t)result;
93 " fragmenting %" PRIu64
" byte packet (%" PRIu32
" bytes is max fragment size)\n",
100 for (FragmentMap::iterator i = map.begin (); i != map.end ();
106 for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
113 " Sending fragment %d\n",
116 int send_attempts = -1;
117 ssize_t actual_sent = -1;
119 while (actual_sent < 0 &&
126 i->second, frag_size, addr->second);
132 bytes_sent = (uint64_t)actual_sent;
136 " Sent packet of size %" PRIu64
"\n",
137 print_prefix, bytes_sent);
141 else if (actual_sent == ECONNRESET)
145 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
148 else if (actual_sent == EINTR)
152 " Local socket was interrupted during send (EINTR)\n",
155 else if (actual_sent == EWOULDBLOCK)
159 " Send would have blocked (EWOULDBLOCK)\n",
162 else if (actual_sent == ENOTCONN)
166 " Send reports socket is not connected (ENOTCONN)\n",
169 else if (actual_sent == EADDRINUSE)
173 " Send reports the interface is busy (EADDRINUSE)\n",
176 else if (actual_sent == EBADF)
180 " Send socket is invalid (EBADF)\n",
187 " Packet was not sent due to unknown error (%d)\n",
188 print_prefix, (int)actual_sent);
202 " Sent fragments totalling %" PRIu64
" bytes\n",
203 print_prefix, bytes_sent);
209 for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
217 " Sending packet of size %ld\n",
218 print_prefix, result);
220 int send_attempts = -1;
221 ssize_t actual_sent = -1;
223 while (actual_sent < 0 &&
230 (ssize_t)result, i->second);
236 bytes_sent += actual_sent;
241 " Sent packet of size %" PRIu64
"\n",
242 print_prefix, (int)actual_sent);
246 else if (actual_sent == ECONNRESET)
251 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
254 else if (actual_sent == EINTR)
259 " Local socket was interrupted during send (EINTR)\n",
262 else if (actual_sent == EWOULDBLOCK)
267 " Send would have blocked (EWOULDBLOCK)\n",
270 else if (actual_sent == ENOTCONN)
275 " Send reports socket is not connected (ENOTCONN)\n",
278 else if (actual_sent == EADDRINUSE)
283 " Send reports the interface is busy (EADDRINUSE)\n",
286 else if (actual_sent == EBADF)
291 " Send socket is invalid (EBADF)\n",
299 " Packet was not sent due to unknown error (%d)\n",
300 print_prefix, (int)actual_sent);
308 " Sent %d total bytes via rebroadcast\n",
309 print_prefix, bytes_sent);
314 " Send bandwidth = %" PRIu64
" B/s\n",
332 ACE_Time_Value wait_time (1);
333 ACE_INET_Addr remote;
337 const char * print_prefix =
"UdpRegistryServerReadThread::run";
341 " entering main service loop.\n",
350 " Unable to allocate buffer of size " PRIu32
". Exiting thread.\n",
359 " entering a recv on the socket.\n",
368 if (remote.get_host_addr () != 0)
373 " received a message header of %lld bytes from %s:%d\n",
375 (
long long)bytes_read,
376 remote.get_host_addr (), (int)remote.get_port_number ());
383 " received %lld bytes from unknown host\n",
385 (
long long)bytes_read);
389 std::stringstream remote_host;
390 remote_host << remote.get_host_addr ();
392 remote_host << remote.get_port_number ();
396 #ifndef _MADARA_NO_KARL_
400 remote_host.str ().c_str (), header);
404 if (header->
ttl > 0 && rebroadcast_records.size () > 0 &&
408 header->
ttl = std::min (
411 rebroadcast (print_prefix, header, rebroadcast_records);
422 " wait timeout on new messages. Proceeding to next wait\n",
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
ACE_SOCK_Dgram & read_socket_
The socket we are reading from.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
MADARA_Export utility::Refcounter< logger::Logger > global_logger
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
knowledge::ThreadSafeContext * context_
knowledge context
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
This class provides a distributed knowledge base to users.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
UdpRegistryServerReadThread(const TransportSettings &settings, const std::string &id, std::map< std::string, ACE_INET_Addr > &addresses, ACE_SOCK_Dgram &write_socket, ACE_SOCK_Dgram &read_socket, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
ACE_SOCK_Dgram & write_socket_
The socket we are writing to.
void cleanup(void)
Cleanup function called by thread manager.
const std::string id_
host:port identifier of this process
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet to all peers.
madara::utility::ScopedArray< char > buffer_
buffer for sending
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
const QoSTransportSettings settings_
Transport settings.
uint32_t queue_length
Length of the buffer used to store history of events.
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
void run(void)
The main loop internals for the read thread.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Provides monitoring capability of a transport's bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
std::map< std::string, ACE_INET_Addr > & addresses_
internet addresses of our peers
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
int resend_attempts
Maximum number of attempts to resend if transport is busy.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent's m...