MADARA  3.1.8
Fragmentation.cpp
Go to the documentation of this file.
1 #include "Fragmentation.h"
2 #include "ReducedMessageHeader.h"
5 
6 #include <algorithm>
7 #include <time.h>
8 
// Default constructor of the fragment header: runs the MessageHeader ()
// base constructor and starts update_number at 0.
// NOTE(review): original source lines 9 (the declarator) and 12 (a statement
// ahead of the terminator write) are absent from this listing. Line 12
// presumably fills madara_id from FRAGMENTATION_MADARA_ID -- TODO confirm
// against the real file before relying on this block.
10 : MessageHeader (), update_number (0)
11 {
13  madara_id[7] = 0; // terminate the identifier at its eighth byte
14 
15  originator[0] = 0; // originator starts out as an empty string
16  domain[0] = 0; // domain starts out as an empty string
17 }
18 
19 
21 {
22 }
23 
// Returns the number of bytes an encoded fragment header occupies.
// NOTE(review): the declarator (original line 25) is absent from this
// listing, so whether this is encoded_size () or static_encoded_size ()
// cannot be told from here. Original lines 28-29 are absent as well, so the
// sum below is incomplete as shown; read ()/write () also move madara_id,
// domain, originator and the one-byte ttl, so the missing term presumably
// accounts for those -- TODO confirm.
24 uint32_t
26 {
27  return sizeof (uint64_t) * 3 // size, clock, timestamp
30  + sizeof (uint32_t) * 4; // type, updates, quality, update_number
31 }
32 
// Returns the number of bytes an encoded fragment header occupies.
// NOTE(review): the declarator (original line 34) is absent from this
// listing, so whether this is encoded_size () or static_encoded_size ()
// cannot be told from here. Original lines 37-38 are absent as well, so the
// sum below is incomplete as shown; read ()/write () also move madara_id,
// domain, originator and the one-byte ttl, so the missing term presumably
// accounts for those -- TODO confirm.
33 uint32_t
35 {
36  return sizeof (uint64_t) * 3 // size, clock, timestamp
39  + sizeof (uint32_t) * 4; // type, updates, quality, update_number
40 }
41 
42 
43 
44 uint32_t
46 {
47  buffer += 116;
48  return (madara::utility::endian_swap (*(uint32_t *)buffer));
49 }
50 
51 const char *
53  int64_t & buffer_remaining)
54 {
55  // Remove size field from the buffer and update accordingly
56  if ((size_t)buffer_remaining >= sizeof (size))
57  {
58  memcpy (&size, buffer, sizeof (size));
60  buffer += sizeof (size);
61  }
62  buffer_remaining -= sizeof (size);
63 
64  // Remove madara_id field from the buffer and update accordingly
65  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_IDENTIFIER_LENGTH)
66  {
67  strncpy (madara_id, buffer, MADARA_IDENTIFIER_LENGTH);
68  buffer += sizeof (char) * MADARA_IDENTIFIER_LENGTH;
69  }
70  buffer_remaining -= sizeof (char) * MADARA_IDENTIFIER_LENGTH;
71 
72  // Remove domain field from the buffer and update accordingly
73  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH)
74  {
75  strncpy (domain, buffer, MADARA_DOMAIN_MAX_LENGTH);
76  buffer += sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
77  }
78  buffer_remaining -= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
79 
80  // Remove originator from the buffer and update accordingly
81  if ((size_t)buffer_remaining >= sizeof (char) * MAX_ORIGINATOR_LENGTH)
82  {
83  strncpy (originator, buffer, MAX_ORIGINATOR_LENGTH);
84  buffer += sizeof (char) * MAX_ORIGINATOR_LENGTH;
85  }
86  buffer_remaining -= sizeof (char) * MAX_ORIGINATOR_LENGTH;
87 
88  // Remove type field from the buffer and update accordingly
89  if ((size_t)buffer_remaining >= sizeof (type))
90  {
91  memcpy (&type, buffer, sizeof (type));
93  buffer += sizeof (type);
94  }
95  buffer_remaining -= sizeof (type);
96 
97  // Remove updates field from the buffer and update accordingly
98  if ((size_t)buffer_remaining >= sizeof (updates))
99  {
100  memcpy (&updates, buffer, sizeof (updates));
102  buffer += sizeof (updates);
103  }
104  buffer_remaining -= sizeof (updates);
105 
106  // Remove quality field from the buffer and update accordingly
107  if ((size_t)buffer_remaining >= sizeof (quality))
108  {
109  memcpy (&quality, buffer, sizeof (quality));
111  buffer += sizeof (quality);
112  }
113  buffer_remaining -= sizeof (quality);
114 
115  // Remove clock field from the buffer and update accordingly
116  if ((size_t)buffer_remaining >= sizeof (clock))
117  {
118  memcpy (&clock, buffer, sizeof (clock));
120  buffer += sizeof (clock);
121  }
122  buffer_remaining -= sizeof (clock);
123 
124  // Remove timestamp field from the buffer and update accordingly
125  if ((size_t)buffer_remaining >= sizeof (timestamp))
126  {
127  memcpy (&timestamp, buffer, sizeof (timestamp));
129  buffer += sizeof (timestamp);
130  }
131  buffer_remaining -= sizeof (timestamp);
132 
133  // Remove the time to live field from the buffer
134  if (buffer_remaining >= 1)
135  {
136  memcpy (&ttl, buffer, 1);
137  buffer += 1;
138  }
139  buffer_remaining -= 1;
140 
141  // Remove updates field from the buffer and update accordingly
142  if ((size_t)buffer_remaining >= sizeof (update_number))
143  {
144  memcpy (&update_number, buffer, sizeof (update_number));
146  buffer += sizeof (update_number);
147  }
148  buffer_remaining -= sizeof (update_number);
149 
150  return buffer;
151 }
152 
153 char *
155  int64_t & buffer_remaining)
156 {
157  // Write size field from the buffer and update accordingly
158  if ((size_t)buffer_remaining >= sizeof (size))
159  {
160  *(uint64_t *) buffer = madara::utility::endian_swap (size);
161  buffer += sizeof (size);
162  }
163  buffer_remaining -= sizeof (size);
164 
165  // Write madara_id field from the buffer and update accordingly
166  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_IDENTIFIER_LENGTH)
167  {
168  strncpy (buffer, madara_id, MADARA_IDENTIFIER_LENGTH);
169  buffer += sizeof (char) * MADARA_IDENTIFIER_LENGTH;
170  }
171  buffer_remaining -= sizeof (char) * MADARA_IDENTIFIER_LENGTH;
172 
173  // Write domain field from the buffer and update accordingly
174  if ((size_t)buffer_remaining >= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH)
175  {
176  strncpy (buffer, domain, MADARA_DOMAIN_MAX_LENGTH);
177  buffer += sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
178  }
179  buffer_remaining -= sizeof (char) * MADARA_DOMAIN_MAX_LENGTH;
180 
181  // Write originator from the buffer and update accordingly
182  if ((size_t)buffer_remaining >= sizeof (char) * MAX_ORIGINATOR_LENGTH)
183  {
184  strncpy (buffer, originator, MAX_ORIGINATOR_LENGTH);
185  buffer += sizeof (char) * MAX_ORIGINATOR_LENGTH;
186  }
187  buffer_remaining -= sizeof (char) * MAX_ORIGINATOR_LENGTH;
188 
189  // Write type field from the buffer and update accordingly
190  if ((size_t)buffer_remaining >= sizeof (type))
191  {
192  *(uint32_t *) buffer = madara::utility::endian_swap (type);
193  buffer += sizeof (type);
194  }
195  buffer_remaining -= sizeof (type);
196 
197  // Write updates field from the buffer and update accordingly
198  if ((size_t)buffer_remaining >= sizeof (updates))
199  {
200  *(uint32_t *) buffer = madara::utility::endian_swap (updates);
201  buffer += sizeof (updates);
202  }
203  buffer_remaining -= sizeof (updates);
204 
205  // Write quality field from the buffer and update accordingly
206  if ((size_t)buffer_remaining >= sizeof (quality))
207  {
208  *(uint32_t *) buffer = madara::utility::endian_swap (quality);
209  buffer += sizeof (quality);
210  }
211  buffer_remaining -= sizeof (quality);
212 
213  // Write clock field from the buffer and update accordingly
214  if ((size_t)buffer_remaining >= sizeof (clock))
215  {
216  *(uint64_t *) buffer = madara::utility::endian_swap (clock);
217  buffer += sizeof (clock);
218  }
219  buffer_remaining -= sizeof (clock);
220 
221  // Write timestamp field from the buffer and update accordingly
222  if ((size_t)buffer_remaining >= sizeof (timestamp))
223  {
224  *(uint64_t *) buffer = madara::utility::endian_swap (timestamp);
225  buffer += sizeof (timestamp);
226  }
227  buffer_remaining -= sizeof (timestamp);
228 
229  if (buffer_remaining >= 1)
230  {
231  memcpy (buffer, &ttl, 1);
232  buffer += 1;
233  }
234  buffer_remaining -= 1;
235 
236  // Write updates field from the buffer and update accordingly
237  if ((size_t)buffer_remaining >= sizeof (update_number))
238  {
239  *(uint32_t *) buffer = madara::utility::endian_swap (update_number);
240  buffer += sizeof (update_number);
241  }
242  buffer_remaining -= sizeof (update_number);
243 
244  return buffer;
245 }
246 
247 bool
249  const MessageHeader & other)
250 {
251  const FragmentMessageHeader * rhs = dynamic_cast <
252  const FragmentMessageHeader *> (&other);
253 
254  return size == rhs->size &&
255  type == rhs->type &&
256  updates == rhs->updates &&
257  update_number == rhs->update_number &&
258  quality == rhs->quality &&
259  clock == rhs->clock &&
260  timestamp == rhs->timestamp &&
261  strncmp (madara_id, rhs->madara_id, MADARA_IDENTIFIER_LENGTH) == 0 &&
262  strncmp (domain, rhs->domain, MADARA_DOMAIN_MAX_LENGTH) == 0 &&
263  strncmp (originator, rhs->originator, MAX_ORIGINATOR_LENGTH) == 0;
264 }
265 
266 
// defrag: pieces a fragment map together into a single buffer (declared
// elsewhere as `char * defrag (FragmentMap & map)`).
//
// Looks up fragment 0, reads its fragment header, and -- provided that
// header.updates <= map.size () -- reads the contents header that follows it
// to learn the total size, allocates `new char [size]` and copies the payload
// of every fragment into it in map iteration order. Returns that buffer, or 0
// when fragment 0 is absent or the map holds fewer entries than
// header.updates. Nothing in this function releases the returned buffer.
//
// NOTE(review): this listing has lost source lines: the declarator (267),
// the first line of every logging call (271, 285, 292, 303, 321, 333, 343)
// and the two conditions that choose between MessageHeader and
// ReducedMessageHeader (290, 301). Restore them from the real file.
268 {
269  char * result = 0;
270 
272  "transport::defrag:" \
273  " defragging fragment map\n");
274 
275  FragmentMap::iterator i = map.find (0);
276  if (i != map.end ())
277  {
278  FragmentMessageHeader header;
279  int64_t buffer_remaining (header.encoded_size ());
280  const char * buffer = header.read (i->second, buffer_remaining);
281 
282  // do we have enough updates to defragment?
283  if (header.updates <= map.size ())
284  {
286  "transport::defrag:" \
287  " the map is large enough to contain updates\n");
288 
289  int64_t size = 0;
291  {
293  "transport::defrag:" \
294  " regular message header detected\n");
295 
296  MessageHeader contents_header;
297  buffer_remaining = contents_header.encoded_size ();
298  contents_header.read (buffer, buffer_remaining);
299  size = contents_header.size;
300  }
302  {
// NOTE(review): this branch decodes a ReducedMessageHeader, yet the log
// text below still says "regular" -- looks copy-pasted from the branch above.
304  "transport::defrag:" \
305  " regular message header detected\n");
306 
307  ReducedMessageHeader contents_header;
308  buffer_remaining = contents_header.encoded_size ();
309  contents_header.read (buffer, buffer_remaining);
310  size = contents_header.size;
311  }
312 
// NOTE(review): the allocation runs before the `size >= 0` test below, and
// size stays 0 if neither branch above executes. Each memcpy copies
// actual_size bytes without comparing it to the space still left in result,
// and actual_size wraps if header.size is smaller than encoded_size () --
// confirm fragments are validated before they reach this point.
313  result = new char [size];
314  char * lhs = result;
315 
// payload length of fragment 0 = fragment size minus the fragment header
316  uint32_t actual_size = header.size - header.encoded_size ();
317  buffer_remaining = actual_size;
318 
319  if (size >= 0)
320  {
322  "transport::defrag: copying buffer to lhs\n");
323 
324  memcpy (lhs, buffer, buffer_remaining);
325  buffer += actual_size;
326  lhs += actual_size;
327  size -= actual_size;
328  }
329 
330  // if so, iterate over the fragments and copy the contents
331  for (++i ;i != map.end (); ++i)
332  {
334  "transport::defrag: reading header of new fragment\n");
335 
// header is reused: each fragment's own header supplies its payload length
336  buffer_remaining = header.encoded_size ();
337  buffer = header.read (i->second, buffer_remaining);
338  actual_size = header.size - header.encoded_size ();
339  buffer_remaining = actual_size;
340 
341  if (size >= 0)
342  {
344  "transport::defrag: copying buffer to lhs\n");
345 
346  memcpy (lhs, buffer, buffer_remaining);
347  buffer += actual_size;
348  lhs += actual_size;
349  size -= actual_size;
350  }
351  }
352  }
353  }
354 
355  return result;
356 }
357 
358 void
360 {
361  for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
362  {
363  delete [] i->second;
364  }
365  map.clear ();
366 }
367 
// add_fragment: stores a copy of one encoded fragment in the
// originator -> clock -> update_number map and, when the fragment lands in an
// existing non-empty clock entry, asks defrag () for the reassembled message.
//
// Returns the buffer produced by defrag (), or 0 when the fragment header
// reports size 0, when no reassembly was attempted, or when defrag () itself
// returned 0. When a message is returned and `clear` is true, the clock
// entry's fragments are deleted and the entry erased.
//
// NOTE(review): this listing has lost source lines: the declarator line
// carrying the function name and its first parameters (369; the declaration
// reads `add_fragment (const char * originator, uint64_t clock, ...)`), lines
// 376-386, and the first line of every logging call. Restore them from the
// real file.
//
// NOTE(review): in the visible code new_fragment is neither stored nor
// deleted on three paths -- the update_number is already present in the clock
// entry, the clock entry exists but is empty, and the queue is full while
// `oldest >= clock`. That looks like a leak of the copy made below; confirm.
368 char *
370  uint32_t update_number, const char * fragment,
371  uint32_t queue_length,
372  OriginatorFragmentMap & map, bool clear)
373 {
374  char * result = 0;
375 
387  "transport::add_fragment: adding fragment\n");
388 
389  char * new_fragment = 0;
390  FragmentMessageHeader header;
391  int64_t buffer_remaining = header.encoded_size ();
392 
394  "transport::add_fragment:" \
395  " reading header from buffer\n");
396 
397  header.read (fragment, buffer_remaining);
398 
// the map keeps its own copy of the fragment, header.size bytes long
399  if (header.size > 0)
400  {
402  "transport::add_fragment:" \
403  " reading header from hold fragment\n");
404 
405  new_fragment = new char [header.size];
406  memcpy (new_fragment, fragment, header.size);
407  }
408  else
409  return 0;
410 
412  "transport::add_fragment:" \
413  " searching for originator %s.\n", originator);
414 
415  OriginatorFragmentMap::iterator orig_map = map.find (originator);
416  if (orig_map == map.end ())
417  {
419  "transport::add_fragment:" \
420  " creating entry for originator %s.\n", originator);
421 
422  // originator does not exist (1)
423  map[originator][clock][update_number] = new_fragment;
424  }
425 
426  else
427  {
429  "transport::add_fragment:" \
430  " originator %s exists in fragment map.\n", originator);
431 
432  ClockFragmentMap & clock_map (orig_map->second);
433  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
434 
435  if (clock_found != clock_map.end ())
436  {
437  // we have found the clock entry
438  if (clock_found->second.find (update_number)
439  == clock_found->second.end ())
440  {
441  if (clock_found->second.size () != 0)
442  {
444  "transport::add_fragment:" \
445  " %s:%d is being added.\n", originator, update_number);
446 
447  // the fragment does not exist yet
448  clock_found->second [update_number] = new_fragment;
449 
450  // check for a new buffer
451  result = defrag (clock_found->second);
452 
453  if (result && clear)
454  {
456  "transport::add_fragment:" \
457  " %s:%d is complete. Deleting fragments.\n",
458  originator, update_number);
459 
460  delete_fragments (clock_found->second);
461  clock_map.erase (clock);
462  }
463  else
464  {
// NOTE(review): this branch also runs when result is non-null but clear is
// false, and the text below says "is complete" while asking for more
// fragments -- the message looks inaccurate.
466  "transport::add_fragment:" \
467  " %s:%d is complete. Need more fragments.\n",
468  originator, update_number);
469  }
470  }
471  else
472  {
473  // if we get here, the message has already been defragged
475  "transport::add_fragment:" \
476  " %s:%d has been previously defragged and fragments deleted.\n",
477  originator, update_number);
478 
479  }
480  }
481  }
482 
483  else if (clock_map.size () < queue_length)
484  {
485  // the clock is not yet tracked and the queue still has room
487  "transport::add_fragment:" \
488  " %s:%d is being added to queue.\n",
489  originator, update_number);
490 
491  // clock queue has not been exhausted (2)
492  clock_map [clock][update_number] = new_fragment;
493  }
494  else
495  {
497  "transport::add_fragment:" \
498  " %s:%d is being added to queue after a deletion\n",
499  originator, update_number);
500 
// NOTE(review): ClockFragmentMap is keyed by uint64_t, so storing the key in
// a uint32_t truncates large clocks; the comparison and the erase below then
// use the truncated value -- confirm and widen if so.
501  uint32_t oldest = clock_map.begin ()->first;
502 
503  if (oldest < clock)
504  {
// NOTE(review): the two log calls in this branch pass originator and
// update_number although their format strings contain no conversions.
506  "transport::add_fragment:" \
507  " deleting fragments.\n",
508  originator, update_number);
509 
510  FragmentMap & fragments = clock_map [oldest];
511 
512  // delete all fragments in the clock entry
513  for (FragmentMap::iterator i = fragments.begin ();
514  i != fragments.end (); ++i)
515  {
516  delete [] i->second;
517  }
518 
520  "transport::add_fragment:" \
521  " erasing old clock.\n",
522  originator, update_number);
523 
524  // erase the oldest clock fragment map
525  clock_map.erase (oldest);
526 
527  // replace it in the logical queue with the new clock fragment
528  clock_map [clock][update_number] = new_fragment;
529  }
530  }
531  }
532 
533  return result;
534 }
535 
536 void
538  MessageHeader & header)
539 {
540  if (this != &header)
541  {
542  clock = header.clock;
543  strncpy (domain, header.domain, MADARA_DOMAIN_MAX_LENGTH);
544  strncpy (originator, header.originator, MAX_ORIGINATOR_LENGTH);
545  quality = header.quality;
546  size = header.size;
547  timestamp = header.timestamp;
548  ttl = header.ttl;
549  type = header.type;
550  updates = header.updates;
551  }
552 }
553 
554 
// frag: breaks the encoded message at `source` into fragments and stores
// them in `map` under the keys 0 .. header.updates - 1.
//
// The contents header at the start of `source` is decoded to obtain the
// total size and to seed the fragment header. Every fragment is a
// `new char [cur_size]` buffer holding a FragmentMessageHeader followed by
// the next slice of `source`. All fragments but the last are fragment_size
// bytes long; the last one holds whatever is left plus the header. Does
// nothing when fragment_size is 0.
//
// NOTE(review): this listing has lost source lines: the first line of every
// logging call, the right-hand operand of the data_per_packet subtraction
// (567) and of the actual_data_size subtraction (622), and the two
// conditions that choose between MessageHeader and ReducedMessageHeader
// (572, 584). Restore them from the real file.
555 void
556 madara::transport::frag (char * source,
557  uint32_t fragment_size, FragmentMap & map)
558 {
559  if (fragment_size > 0)
560  {
562  "transport::frag:" \
563  " fragmenting character stream into %d byte packets.\n",
564  fragment_size);
565 
// NOTE(review): the subtrahend is not visible here; presumably it is the
// fragment header size. If it can reach fragment_size, data_per_packet
// becomes 0 or wraps and the division further down misbehaves -- confirm a
// minimum fragment_size is enforced.
566  uint32_t data_per_packet = fragment_size -
568 
569  const char * buffer = source;
570  uint64_t total_size;
571  FragmentMessageHeader header;
573  {
// NOTE(review): fragment_size is passed to this log call although the format
// string has no conversion for it.
575  "transport::frag:" \
576  " regular message header detected\n",
577  fragment_size);
578 
579  MessageHeader contents_header;
580  int64_t buffer_remaining = contents_header.encoded_size ();
581  contents_header.read (source, buffer_remaining);
582  header = contents_header;
583  }
585  {
// NOTE(review): this branch decodes a ReducedMessageHeader, yet the log
// text below still says "regular" -- looks copy-pasted from the branch above.
587  "transport::frag:" \
588  " regular message header detected\n");
589 
590  ReducedMessageHeader contents_header;
591  int64_t buffer_remaining = contents_header.encoded_size ();
592  contents_header.read (source, buffer_remaining);
593  header = contents_header;
594  }
595 
// number of fragments = total size / payload per fragment, rounded up
596  total_size = header.size;
597  header.updates = header.size / data_per_packet;
598 
599  if (header.size % data_per_packet != 0)
600  ++header.updates;
601 
603  "transport::frag:" \
604  " iterating over %d updates.\n",
605  header.updates);
606 
607  for (uint32_t i = 0; i < header.updates; ++i)
608  {
609  char * new_frag;
610  size_t cur_size;
611  int64_t buffer_remaining;
612  uint64_t actual_data_size;
613 
// the final fragment carries only the bytes still outstanding plus a header
614  if (i == header.updates - 1)
615  cur_size = (size_t) total_size +
616  FragmentMessageHeader::static_encoded_size ();
617  else
618  cur_size = (size_t) fragment_size;
619 
620  buffer_remaining = cur_size;
621  actual_data_size = cur_size -
623  new_frag = new char [cur_size];
624 
// the map entry keeps the start of the allocation; new_frag is advanced past
// the header by write () below
625  map[i] = new_frag;
626 
// NOTE(review): cur_size is a size_t but is logged with %d.
628  "transport::frag:" \
629  " writing %d packet of size %d.\n",
630  i, cur_size);
631 
632  header.update_number = i;
633  header.size = cur_size;
634  new_frag = header.write (new_frag, buffer_remaining);
635  memcpy (new_frag, buffer, (size_t)actual_data_size);
636  buffer += actual_data_size;
637  total_size -= actual_data_size;
638  }
639  }
640 }
641 
642 bool
644  OriginatorFragmentMap & map)
645 {
646  bool result = false;
647 
648  OriginatorFragmentMap::iterator orig_map = map.find (originator);
649 
650  if (orig_map != map.end ())
651  {
652  ClockFragmentMap & clock_map (orig_map->second);
653  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
654 
655  if (clock_found != clock_map.end ())
656  {
657  uint64_t size = clock_found->second.size ();
658  FragmentMap::iterator i = clock_found->second.find (0);
659 
660  if (i != clock_found->second.end ())
661  {
662  if (FragmentMessageHeader::get_updates (i->second) == size)
663  {
664  result = true;
665  }
666  }
667  else if (size == 0)
668  {
669  result = true;
670  }
671  }
672  }
673 
674  return result;
675 }
676 
677 bool
679  uint32_t update_number, OriginatorFragmentMap & map)
680 {
681  bool result = false;
682 
683  OriginatorFragmentMap::iterator orig_map = map.find (originator);
684 
685  if (orig_map != map.end ())
686  {
687  ClockFragmentMap & clock_map (orig_map->second);
688  ClockFragmentMap::iterator clock_found = clock_map.find (clock);
689 
690  if (clock_found != clock_map.end ())
691  {
692  if (clock_found->second.find (update_number)
693  != clock_found->second.end ())
694  {
695  result = true;
696  }
697  }
698  }
699 
700  return result;
701 }
MADARA_Export char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is complete.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
MADARA_Export char * defrag(FragmentMap &map)
Pieces together a fragment map into a single buffer.
char madara_id[8]
the identifier of this transport (MADARA_IDENTIFIER)
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
MADARA_Export bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Checks whether all fragments of the message identified by an originator and clock have been received.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual ~FragmentMessageHeader()
Destructor.
uint32_t updates
the number of knowledge variable updates in the message
#define FRAGMENTATION_MADARA_ID
Definition: Fragmentation.h:23
MADARA_Export utility::Refcounter< logger::Logger > global_logger
#define MAX_ORIGINATOR_LENGTH
Definition: MessageHeader.h:27
Defines a fragmentation header which allows for multi-part messages that are only applied once all fr...
Definition: Fragmentation.h:47
uint32_t type
the type of message
static uint32_t get_updates(const char *buffer)
Returns the number of updates indicated in the header.
virtual bool equals(const MessageHeader &other)
Compares the fields of this instance to another instance.
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
MADARA_Export uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
Definition: Utility.cpp:625
virtual uint32_t encoded_size(void) const
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
uint64_t size
the size of this header plus the updates
static uint32_t static_encoded_size(void)
Returns the size of the encoded MessageHeader class, which may be different from sizeof (MessageHeade...
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
unsigned char ttl
time to live (number of rebroadcasts to perform after original send)
uint64_t timestamp
the timestamp of the sender when the message was generated
void operator=(MessageHeader &header)
Assignment operator for regular message header.
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
char originator[64]
the originator of the message (host:port)
char domain[32]
the domain that this message is intended for
MADARA_Export bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
virtual const char * read(const char *buffer, int64_t &buffer_remaining)
Reads a MessageHeader instance from a buffer and updates the amount of buffer room remaining...
static bool message_header_test(const char *buffer)
Tests the buffer for a normal message identifier.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.
#define MADARA_IDENTIFIER_LENGTH
Definition: MessageHeader.h:20
uint64_t clock
the clock of the sender when the message was generated
Defines a simple, smaller message header of 29 bytes that supports less QoS.
#define MADARA_DOMAIN_MAX_LENGTH
Definition: MessageHeader.h:22
Defines a robust message header which is the default for KaRL messages.
Definition: MessageHeader.h:56
virtual char * write(char *buffer, int64_t &buffer_remaining)
Writes a MessageHeader instance to a buffer and updates the amount of buffer room remaining...
uint32_t quality
the quality of the message sender