MADARA  3.1.8
TcpTransport.cpp
Go to the documentation of this file.
4 
8 #include "ace/INET_Addr.h"
9 #include "ace/SOCK_Dgram.h"
10 
11 
12 #include <iostream>
13 
16  TransportSettings & config, bool launch_transport)
17  : Base (id, config, context)
18 {
19  // create a reference to the knowledge base for threading
20  knowledge_.use (context);
21 
22  // set the data plane for the read threads
24 
25  if (launch_transport)
26  setup ();
27 }
28 
30 {
31  close ();
32 }
33 
34 void
36 {
37  this->invalidate_transport ();
38 
40 
42 }
43 
44 int
46 {
47  return RELIABLE;
48 }
49 
50 int
52 {
53  return RELIABLE;
54 }
55 
56 int
58 {
59  // call base setup method to initialize certain common variables
60  Base::setup ();
61 
62  if (settings_.hosts.size () > 0)
63  {
64  for (size_t i = 0; i < settings_.hosts.size (); ++i)
65  {
66  addresses_[settings_.hosts[i]].set (settings_.hosts[i].c_str ());
67  }
68 
69  int send_buff_size = 0, tar_buff_size (settings_.queue_length);
70  int rcv_buff_size = 0;
71 
73  "TcpTransport::setup:" \
74  " default socket buff size is send=%d, rcv=%d\n",
75  send_buff_size, rcv_buff_size);
76 
77  if (send_buff_size < tar_buff_size)
78  {
80  "TcpTransport::setup:" \
81  " setting send buff size to settings.queue_length (%d)\n",
82  tar_buff_size);
83 
85  "TcpTransport::setup:" \
86  " current socket buff size is send=%d, rcv=%d\n",
87  send_buff_size, rcv_buff_size);
88  }
89 
90  if (rcv_buff_size < tar_buff_size)
91  {
93  "TcpTransport::setup:" \
94  " setting rcv buff size to settings.queue_length (%d)\n",
95  tar_buff_size);
96 
98  "TcpTransport::setup:" \
99  " current socket buff size is send=%d, rcv=%d\n",
100  send_buff_size, rcv_buff_size);
101  }
102 
103 
104  if (addresses_.size () > 0)
105  {
106 
107  } // if appropriate addresses
108 
109  if (!settings_.no_receiving)
110  {
111  double hertz = settings_.read_thread_hertz;
112  if (hertz < 0.0)
113  {
114  hertz = 0.0;
115  }
116 
118  "TcpTransportReadThread::setup:" \
119  " starting %d threads at %f hertz\n", settings_.read_threads,
120  hertz);
121 
122  for (uint32_t i = 0; i < settings_.read_threads; ++i)
123  {
124  std::stringstream thread_name;
125  thread_name << "read";
126  thread_name << i;
127 
128  read_threads_.run (hertz, thread_name.str (),
132  }
133  }
134  }
135 
136  return this->validate_transport ();
137 }
138 
139 long
141 const madara::knowledge::KnowledgeRecords & orig_updates)
142 {
143  const char * print_prefix = "TcpTransport::send_data";
144 
145  long result (0);
146 
147  if (!settings_.no_sending)
148  {
149  result = prep_send (orig_updates, print_prefix);
150 
151  if (result > 0)
152  {
153  uint64_t bytes_sent = 0;
154  uint64_t packet_size = (uint64_t)result;
155 
156  if (packet_size > settings_.max_fragment_size)
157  {
158  FragmentMap map;
159 
161  "%s:" \
162  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
163  print_prefix, packet_size, settings_.max_fragment_size);
164 
165  // fragment the message
167 
168  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
169  {
170  size_t frag_size =
171  (size_t)MessageHeader::get_size (i->second);
172 
173  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
174  addresses_.begin (); addr != addresses_.end (); ++addr)
175  {
176  if (addr->first != settings_.hosts[0])
177  {
178  ssize_t actual_sent = -1;
179 
180  // sleep between fragments, if such a slack time is specified
181  if (settings_.slack_time > 0)
183 
185  "%s: Send result was %d of %d byte fragment to %s\n",
186  print_prefix, actual_sent, frag_size, addr->first.c_str ());
187 
188  if (actual_sent > 0)
189  {
190  send_monitor_.add ((uint32_t)actual_sent);
191  bytes_sent += actual_sent;
192  }
193  }
194  }
195  }
196 
198  "%s:" \
199  " Sent fragments totalling %" PRIu64 " bytes\n",
200  print_prefix, bytes_sent);
201 
202  delete_fragments (map);
203  }
204  else
205  {
206  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
207  addresses_.begin (); i != addresses_.end (); ++i)
208  {
209  if (i->first != settings_.hosts[0])
210  {
211  ssize_t actual_sent = -1;
212 
214  "%s:" \
215  " Sent %d packet to %s\n",
216  print_prefix, packet_size, i->first.c_str ());
217 
218  if (actual_sent > 0)
219  {
220  send_monitor_.add ((uint32_t)actual_sent);
221  bytes_sent += actual_sent;
222  }
223  }
224  }
225  }
226 
227  result = (long)bytes_sent;
228 
230  "%s:" \
231  " Send bandwidth = %" PRIu64 " B/s\n",
232  print_prefix, send_monitor_.get_bytes_per_second ());
233  }
234  }
235 
236  return result;
237 }
QoSTransportSettings settings_
Definition: Transport.h:188
long prep_send(const madara::knowledge::KnowledgeRecords &orig_updates, const char *print_prefix)
Preps a message for sending.
Definition: Transport.cpp:815
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
threads::Threader read_threads_
threads for reading knowledge updates
Definition: TcpTransport.h:78
void invalidate_transport(void)
Invalidates a transport to indicate it is shutting down.
Definition: Transport.inl:34
long send_data(const madara::knowledge::KnowledgeRecords &updates)
Sends a list of knowledge updates to listeners.
int setup(void)
All subclasses should call this method at the end of their setup.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
virtual ~TcpTransport()
Destructor.
int validate_transport(void)
Validates a transport to indicate it is not shutting down.
Definition: Transport.inl:6
knowledge::KnowledgeBase knowledge_
knowledge base for threads to use
Definition: TcpTransport.h:75
This class stores variables and their values for use by any entity needing state information in a thr...
void close(void)
Closes this transport.
std::vector< std::string > hosts
Host information for transports that require it.
std::map< std::string, ACE_INET_Addr > addresses_
Definition: TcpTransport.h:80
double read_thread_hertz
number of valid messages allowed to be received per second.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
BandwidthMonitor receive_monitor_
monitor for receiving bandwidth usage
Definition: Transport.h:203
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
int reliability(void) const
Accesses reliability setting.
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
::std::map< std::string, KnowledgeRecord * > KnowledgeRecords
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
const std::string id_
host:port identifier of this process
Definition: Transport.h:186
double slack_time
time to sleep between sends and rebroadcasts
TcpTransport(const std::string &id, madara::knowledge::ThreadSafeContext &context, TransportSettings &config, bool launch_transport)
Constructor.
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
uint32_t read_threads
the number of read threads to start
uint32_t queue_length
Length of the buffer used to store history of events.
void terminate(const std::string name)
Requests a specific thread to terminate.
Definition: Threader.cpp:150
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
PacketScheduler packet_scheduler_
scheduler for dropping packets to simulate network issues
Definition: Transport.h:206
madara::utility::ScopedArray< char > buffer_
buffer for sending
Definition: Transport.h:209
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
virtual int setup(void)
All subclasses should call this method at the end of their setup.
Definition: Transport.cpp:33
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
Base class from which all transports must be derived.
Definition: Transport.h:62
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
Thread for reading knowledge updates through a TCP socket (unused but planned)
BandwidthMonitor send_monitor_
monitor for sending bandwidth usage
Definition: Transport.h:200
madara::knowledge::ThreadSafeContext & context_
Definition: Transport.h:191
void use(ThreadSafeContext &original)
Refer to and use another knowledge base's context.