8 #include "ace/INET_Addr.h" 9 #include "ace/SOCK_Dgram.h" 17 :
Base (id, config, context)
49 "UdpRegistryClient::cleanup:" \
50 " Error closing socket\n");
77 "UdpRegistryClient::setup:" \
78 " adding server %s to registry lookup list\n",
85 unsigned short port = 50100;
89 if (bind_result == -1)
92 "UdpRegistryClient::setup:" \
93 " socket failed to open\n");
97 ACE_Addr endpoint_addr;
98 socket_.get_local_addr (endpoint_addr);
99 endpoint_addr.get_addr ();
102 "UdpRegistryClient::setup:" \
103 " socket bound to %s:%d\n", host.c_str (), (int)port);
107 int rcv_buff_size = 0;
108 int opt_len =
sizeof (int);
110 socket_.get_option (SOL_SOCKET, SO_SNDBUF,
111 (
void *)&send_buff_size, &opt_len);
113 socket_.get_option (SOL_SOCKET, SO_RCVBUF,
114 (
void *)&rcv_buff_size, &opt_len);
117 "UdpRegistryClient::setup:" \
118 " default socket buff size is send=%d, rcv=%d\n",
119 send_buff_size, rcv_buff_size);
121 if (send_buff_size < tar_buff_size)
124 "UdpRegistryClient::setup:" \
125 " setting send buff size to settings.queue_length (%d)\n",
128 socket_.set_option (SOL_SOCKET, SO_SNDBUF,
129 (
void *)&tar_buff_size, opt_len);
131 socket_.get_option (SOL_SOCKET, SO_SNDBUF,
132 (
void *)&send_buff_size, &opt_len);
135 "UdpRegistryClient::setup:" \
136 " current socket buff size is send=%d, rcv=%d\n",
137 send_buff_size, rcv_buff_size);
140 if (rcv_buff_size < tar_buff_size)
143 "UdpRegistryClient::setup:" \
144 " setting rcv buff size to settings.queue_length (%d)\n",
147 socket_.set_option (SOL_SOCKET, SO_RCVBUF,
148 (
void *)&tar_buff_size, opt_len);
150 socket_.get_option (SOL_SOCKET, SO_RCVBUF,
151 (
void *)&rcv_buff_size, &opt_len);
154 "UdpRegistryClient::setup:" \
155 " current socket buff size is send=%d, rcv=%d\n",
156 send_buff_size, rcv_buff_size);
168 "UdpRegistryClientReadThread::setup:" \
174 std::stringstream thread_name;
175 thread_name <<
"read";
192 const char * print_prefix =
"UdpRegistryClient::register";
195 uint64_t bytes_sent = 0;
203 strncpy (header.
domain, this->settings_.write_domain.c_str (),
204 sizeof (header.
domain) - 1);
216 for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
221 " Sending register of size %d to %s\n",
222 print_prefix, (int)result, i->first.c_str ());
224 int send_attempts = -1;
225 ssize_t actual_sent = -1;
227 while (actual_sent < 0 &&
234 (ssize_t)result, i->second);
240 bytes_sent += actual_sent;
244 " Sent register of size %d to %s\n",
245 print_prefix, (int)actual_sent, i->first.c_str ());
249 else if (actual_sent == ECONNRESET)
253 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
256 else if (actual_sent == EINTR)
260 " Local socket was interrupted during send (EINTR)\n",
263 else if (actual_sent == EWOULDBLOCK)
267 " Send would have blocked (EWOULDBLOCK)\n",
270 else if (actual_sent == ENOTCONN)
274 " Send reports socket is not connected (ENOTCONN)\n",
277 else if (actual_sent == EADDRINUSE)
281 " Send reports the interface is busy (EADDRINUSE)\n",
284 else if (actual_sent == EBADF)
288 " Send socket is invalid (EBADF)\n",
295 " register was not sent due to unknown error (%d)\n",
296 print_prefix, (int)actual_sent);
305 " ERROR: no servers available for sending. Failed to register.\n");
313 const char * print_prefix =
"UdpRegistryClient::send_data";
321 std::vector <std::string>
hosts;
325 for (
size_t i = 0; i < hosts.size (); ++i)
327 clients_[hosts[i]].set (hosts[i].c_str ());
332 result =
prep_send (orig_updates, print_prefix);
336 uint64_t bytes_sent = 0;
337 uint64_t packet_size = (uint64_t)result;
345 " fragmenting %" PRIu64
" byte packet (%" PRIu32
346 " bytes is max fragment size)\n",
353 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i, ++j)
358 for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
365 " Sending fragment %d to %s\n",
366 print_prefix, j, addr->first.c_str ());
368 int send_attempts = -1;
369 ssize_t actual_sent = -1;
371 while (actual_sent < 0 &&
378 i->second, frag_size, addr->second);
387 "%s: Send result was %d of %d byte fragment to %s\n",
388 print_prefix, (int)actual_sent, (
int)frag_size, addr->first.c_str ());
392 bytes_sent = (uint64_t)actual_sent;
396 " Sent packet of size %" PRIu64
" to %s\n",
397 print_prefix, bytes_sent, addr->first.c_str ());
401 else if (actual_sent == ECONNRESET)
405 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
408 else if (actual_sent == EINTR)
412 " Local socket was interrupted during send (EINTR)\n",
415 else if (actual_sent == EWOULDBLOCK)
419 " Send would have blocked (EWOULDBLOCK)\n",
422 else if (actual_sent == ENOTCONN)
426 " Send reports socket is not connected (ENOTCONN)\n",
429 else if (actual_sent == EADDRINUSE)
433 " Send reports the interface is busy (EADDRINUSE)\n",
436 else if (actual_sent == EBADF)
440 " Send socket is invalid (EBADF)\n",
447 " Packet was not sent due to unknown error (%d)\n",
448 print_prefix, (int)actual_sent);
462 " Sent fragments totalling %" PRIu64
" bytes\n",
463 print_prefix, bytes_sent);
469 for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
476 " Sending packet of size %d to %s\n",
477 print_prefix, (int)result, i->first.c_str ());
479 int send_attempts = -1;
480 ssize_t actual_sent = -1;
482 while (actual_sent < 0 &&
489 (ssize_t)result, i->second);
495 bytes_sent += actual_sent;
499 " Sent packet of size %d to %s\n",
500 print_prefix, (int)actual_sent, i->first.c_str ());
504 else if (actual_sent == ECONNRESET)
508 " WARNING: Remote socket disappeared during send (ECONNRESET)\n",
511 else if (actual_sent == EINTR)
515 " Local socket was interrupted during send (EINTR)\n",
518 else if (actual_sent == EWOULDBLOCK)
522 " Send would have blocked (EWOULDBLOCK)\n",
525 else if (actual_sent == ENOTCONN)
529 " Send reports socket is not connected (ENOTCONN)\n",
532 else if (actual_sent == EADDRINUSE)
536 " Send reports the interface is busy (EADDRINUSE)\n",
539 else if (actual_sent == EBADF)
543 " Send socket is invalid (EBADF)\n",
550 " Packet was not sent due to unknown error (%d)\n",
551 print_prefix, (int)actual_sent);
558 result = (long) bytes_sent;
562 " Send bandwidth = %" PRIu64
" B/s\n",
Thread for reading knowledge and registry updates through a UDP socket.
QoSTransportSettings settings_
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
MADARA_Export int bind_to_ephemeral_port(ACE_SOCK_Dgram &socket, std::string &host, unsigned short &port, bool increase_until_bound=true)
Binds to an ephemeral port.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
knowledge::containers::Map endpoints_
int reliability(void) const
Accesses reliability setting.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
void keys(std::vector< std::string > &curkeys) const
Returns the keys within the map.
ACE_SOCK_Dgram socket_
underlying socket for sending and receiving
This class stores variables and their values for use by any entity needing state information in a thr...
void close(void)
Closes this transport.
std::vector< std::string > hosts
Host information for transports that require it.
std::vector< std::string > sync_keys(void)
Syncs the keys from the knowledge base.
std::map< std::string, ACE_INET_Addr > clients_
registry clients
std::map< std::string, ACE_INET_Addr > servers_
registry servers
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
T * get_ptr(void)
get the underlying pointer
bool no_receiving
if true, never receive over transport
std::string write_domain
All class members are accessible to users for easy setup.
void set_name(const std::string &var_name, KnowledgeBase &knowledge, bool sync=true)
Sets the variable name that this refers to.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
void send_register(void)
Sends register messages to all servers.
const std::string id_
host:port identifier of this process
double slack_time
time to sleep between sends and rebroadcasts
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
virtual ~UdpRegistryClient()
Destructor.
uint32_t queue_length
Length of the buffer used to store history of events.
UdpRegistryClient(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
void terminate(const std::string name)
Requests a specific thread to terminate.
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
int setup(void)
all subclasses should call this method at the end of its setup
uint64_t get_clock(void) const
Atomically gets the Lamport clock.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
madara::utility::ScopedArray< char > buffer_
buffer for sending
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
all subclasses should call this method at the end of its setup
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Base class from which all transports must be derived.
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
madara::knowledge::ThreadSafeContext & context_
int resend_attempts
Maximum number of attempts to resend if transport is busy.
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.
threads::Threader read_threads_
threads for reading knowledge updates