MADARA  3.1.8
TcpTransportReadThread.cpp
Go to the documentation of this file.
2 
3 
4 #include <iostream>
5 
7  const TransportSettings & settings,
8  const std::string & id,
9  std::map <std::string, ACE_INET_Addr> & addresses,
10  BandwidthMonitor & send_monitor,
11  BandwidthMonitor & receive_monitor,
12  PacketScheduler & packet_scheduler)
13  : settings_ (settings), id_ (id), context_ (0),
14  addresses_ (addresses),
15  send_monitor_ (send_monitor),
16  receive_monitor_ (receive_monitor),
17  packet_scheduler_ (packet_scheduler)
18 {
19 }
20 
21 void
24 {
25  context_ = &(knowledge.get_context ());
26 
27  // setup the receive buffer
28  if (settings_.queue_length > 0)
29  buffer_ = new char[settings_.queue_length];
30 
31  if (context_)
32  {
33  // check for an on_data_received ruleset
34  if (settings_.on_data_received_logic.length () != 0)
35  {
36 
37 #ifndef _MADARA_NO_KARL_
39  "TcpTransportReadThread::init:" \
40  " setting rules to %s\n",
42 
45 #endif // _MADARA_NO_KARL_
46  }
47  else
48  {
50  "TcpTransportReadThread::init:" \
51  " no permanent rules were set");
52  }
53  }
54 }
55 
56 void
58 const char * print_prefix,
59 MessageHeader * header,
60 const knowledge::KnowledgeMap & records)
61 {
62  int64_t buffer_remaining = (int64_t)settings_.queue_length;
63  char * buffer = buffer_.get_ptr ();
64  int result (0);
65 
66  if (!settings_.no_sending)
67  {
68  result = prep_rebroadcast (*context_, buffer, buffer_remaining,
69  settings_, print_prefix,
70  header, records,
72 
73  if (result > 0)
74  {
75  uint64_t bytes_sent = 0;
76  uint64_t packet_size = (uint64_t)result;
77 
78  if (packet_size > settings_.max_fragment_size)
79  {
80  FragmentMap map;
81 
83  "%s:" \
84  " fragmenting %" PRIu64 " byte packet (%" PRIu32 " bytes is max fragment size)\n",
85  print_prefix, packet_size, settings_.max_fragment_size);
86 
87  // fragment the message
89 
90  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
91  {
92  size_t frag_size =
93  (size_t)MessageHeader::get_size (i->second);
94 
95  for (std::map <std::string, ACE_INET_Addr>::const_iterator addr =
96  addresses_.begin (); addr != addresses_.end (); ++addr)
97  {
98  if (addr->first != settings_.hosts[0])
99  {
100  ssize_t actual_sent = -1;
101 
102  // sleep between fragments, if such a slack time is specified
103  if (settings_.slack_time > 0)
105 
107  "%s:" \
108  " Send result was %d of %d byte fragment to %s\n",
109  print_prefix, actual_sent, frag_size, addr->first.c_str ());
110 
111  if (actual_sent > 0)
112  {
113  send_monitor_.add ((uint32_t)frag_size);
114  bytes_sent += actual_sent;
115  }
116  }
117  }
118  }
119 
121  "%s:" \
122  " Sent fragments totalling %" PRIu64 " bytes\n",
123  print_prefix, bytes_sent);
124 
125  delete_fragments (map);
126  }
127  else
128  {
129  for (std::map <std::string, ACE_INET_Addr>::const_iterator i =
130  addresses_.begin (); i != addresses_.end (); ++i)
131  {
132  if (i->first != settings_.hosts[0])
133  {
134  ssize_t actual_sent = -1;
135 
137  "%s:" \
138  " Sent %d packet to %s\n",
139  print_prefix, packet_size, i->first.c_str ());
140 
141  if (actual_sent > 0)
142  {
143  send_monitor_.add ((uint32_t)actual_sent);
144  bytes_sent += actual_sent;
145  }
146  }
147  }
148 
150  "%s:" \
151  " Sent %d total bytes via rebroadcast\n",
152  print_prefix, bytes_sent);
153  }
154 
156  "%s:" \
157  " Send bandwidth = %" PRIu64 " B/s\n",
158  print_prefix,
160  }
161  }
162 }
163 
164 void
166 {
167 }
168 
169 
170 void
172 {
173  if (!settings_.no_receiving)
174  {
175  ACE_Time_Value wait_time (1);
176  ACE_INET_Addr remote;
177 
178  // allocate a buffer to send
179  char * buffer = buffer_.get_ptr ();
180  const char * print_prefix = "TcpTransportReadThread::run";
181 
183  "%s:" \
184  " entering main service loop.\n",
185  print_prefix);
186 
187  knowledge::KnowledgeMap rebroadcast_records;
188 
189  if (buffer == 0)
190  {
192  "%s:" \
193  " Unable to allocate buffer of size " PRIu32 ". Exiting thread.\n",
194  print_prefix,
196 
197  return;
198  }
199 
201  "%s:" \
202  " entering a recv on the socket.\n",
203  print_prefix);
204 
205  ssize_t bytes_read = 0;
206 
208  "%s:" \
209  " received a message header of %" PRIu64 " bytes from %s:%d\n",
210  print_prefix,
211  bytes_read,
212  remote.get_host_addr (), remote.get_port_number ());
213 
214  if (bytes_read > 0)
215  {
216  MessageHeader * header = 0;
217 
218  std::stringstream remote_host;
219  remote_host << remote.get_host_addr ();
220  remote_host << ":";
221  remote_host << remote.get_port_number ();
222 
223  process_received_update (buffer, bytes_read, id_, *context_,
224  settings_, send_monitor_, receive_monitor_, rebroadcast_records,
225 #ifndef _MADARA_NO_KARL_
227 #endif // _MADARA_NO_KARL_
228  print_prefix,
229  remote_host.str ().c_str (), header);
230 
231  if (header)
232  {
233  if (header->ttl > 0 && rebroadcast_records.size () > 0 &&
235  {
236  --header->ttl;
237  header->ttl = std::min (
238  settings_.get_participant_ttl (), header->ttl);
239 
240  rebroadcast (print_prefix, header, rebroadcast_records);
241  }
242 
243  // delete header
244  delete header;
245  }
246  }
247  else
248  {
250  "%s:" \
251  " wait timeout on new messages. Proceeding to next wait\n",
252  print_prefix);
253  }
254  }
255 }
const std::string id_
host:port identifier of this process
uint64_t get_bytes_per_second(void)
Queries the monitor for the current bandwidth utilization per second over the past window...
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
madara::knowledge::CompiledExpression on_data_received_
data received rules, defined in Transport settings
BandwidthMonitor & send_monitor_
monitor for sending bandwidth usage
TcpTransportReadThread(const TransportSettings &settings, const std::string &id, std::map< std::string, ACE_INET_Addr > &addresses, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, PacketScheduler &packet_scheduler)
Constructor.
void rebroadcast(const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records)
Sends a rebroadcast packet to all peers.
Provides scheduler for dropping packets.
std::vector< std::string > hosts
Host information for transports that require it.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Holds basic transport settings.
#define madara_logger_log(logger, level,...)
Fast version of the madara::logger::log method.
Definition: Logger.h:20
madara::utility::ScopedArray< char > buffer_
buffer for sending
T * get_ptr(void)
get the underlying pointer
Definition: ScopedArray.inl:68
bool no_receiving
if true, never receive over transport
void run(void)
The main loop internals for the read thread.
void cleanup(void)
Cleanup function called by thread manager.
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
static uint64_t get_size(const char *buffer)
Returns the size field of the header.
Parses incoming expression strings into a parse tree and generates an expression tree from the parse ...
Definition: Interpreter.h:42
::std::map< std::string, KnowledgeRecord > KnowledgeMap
bool no_sending
if true, never send over transport
void add(uint64_t size)
Adds a message to the monitor.
unsigned char ttl
time to live (number of rebroadcasts to perform after original send
std::string on_data_received_logic
logic to be evaluated after every successful update
double slack_time
time to sleep between sends and rebroadcasts
static constexpr struct madara::knowledge::tags::string_t string
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
int MADARA_Export process_received_update(const char *buffer, uint32_t bytes_read, const std::string &id, knowledge::ThreadSafeContext &context, const QoSTransportSettings &settings, BandwidthMonitor &send_monitor, BandwidthMonitor &receive_monitor, knowledge::KnowledgeMap &rebroadcast_records, knowledge::CompiledExpression &on_data_received, const char *print_prefix, const char *remote_host, MessageHeader *&header)
Processes a received update, updates monitors, fills rebroadcast records according to settings filter...
Definition: Transport.cpp:112
BandwidthMonitor & receive_monitor_
monitor for receiving bandwidth usage
uint32_t queue_length
Length of the buffer used to store history of events.
void init(knowledge::KnowledgeBase &knowledge)
Initializes MADARA context-related items.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
ThreadSafeContext & get_context(void)
Returns the ThreadSafeContext associated with this Knowledge Base.
int MADARA_Export prep_rebroadcast(knowledge::ThreadSafeContext &context, char *buffer, int64_t &buffer_remaining, const QoSTransportSettings &settings, const char *print_prefix, MessageHeader *header, const knowledge::KnowledgeMap &records, PacketScheduler &packet_scheduler)
Preps a buffer for rebroadcasting records to other agents on the network.
Definition: Transport.cpp:740
Provides monitoring capability of a transport&#39;s bandwidth.
Provides functions and classes for the distributed knowledge base.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
std::map< std::string, ACE_INET_Addr > & addresses_
internet addresses of our peers
logger::Logger & get_logger(void) const
Gets the logger used for information printing.
knowledge::ThreadSafeContext * context_
knowledge context
PacketScheduler & packet_scheduler_
scheduler for mimicking target network conditions
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
uint32_t max_fragment_size
Maximum allowed fragment size for partitioning large messages.
const QoSTransportSettings settings_
Transport settings.
unsigned char get_participant_ttl(void) const
Returns the maximum time to live participation of this transport in rebroadcasting of other agent&#39;s m...