27 return sizeof (uint64_t) * 3
30 +
sizeof (uint32_t) * 4;
36 return sizeof (uint64_t) * 3
39 +
sizeof (uint32_t) * 4;
53 int64_t & buffer_remaining)
56 if ((
size_t)buffer_remaining >=
sizeof (
size))
58 memcpy (&
size, buffer,
sizeof (
size));
60 buffer +=
sizeof (
size);
62 buffer_remaining -=
sizeof (
size);
89 if ((
size_t)buffer_remaining >=
sizeof (
type))
91 memcpy (&
type, buffer,
sizeof (
type));
93 buffer +=
sizeof (
type);
95 buffer_remaining -=
sizeof (
type);
98 if ((
size_t)buffer_remaining >=
sizeof (
updates))
104 buffer_remaining -=
sizeof (
updates);
107 if ((
size_t)buffer_remaining >=
sizeof (
quality))
113 buffer_remaining -=
sizeof (
quality);
116 if ((
size_t)buffer_remaining >=
sizeof (
clock))
120 buffer +=
sizeof (
clock);
122 buffer_remaining -=
sizeof (
clock);
125 if ((
size_t)buffer_remaining >=
sizeof (
timestamp))
134 if (buffer_remaining >= 1)
136 memcpy (&
ttl, buffer, 1);
139 buffer_remaining -= 1;
155 int64_t & buffer_remaining)
158 if ((
size_t)buffer_remaining >=
sizeof (
size))
161 buffer +=
sizeof (
size);
163 buffer_remaining -=
sizeof (
size);
190 if ((
size_t)buffer_remaining >=
sizeof (
type))
193 buffer +=
sizeof (
type);
195 buffer_remaining -=
sizeof (
type);
198 if ((
size_t)buffer_remaining >=
sizeof (
updates))
203 buffer_remaining -=
sizeof (
updates);
206 if ((
size_t)buffer_remaining >=
sizeof (
quality))
211 buffer_remaining -=
sizeof (
quality);
214 if ((
size_t)buffer_remaining >=
sizeof (
clock))
217 buffer +=
sizeof (
clock);
219 buffer_remaining -=
sizeof (
clock);
222 if ((
size_t)buffer_remaining >=
sizeof (
timestamp))
229 if (buffer_remaining >= 1)
231 memcpy (buffer, &
ttl, 1);
234 buffer_remaining -= 1;
272 "transport::defrag:" \
273 " defragging fragment map\n");
275 FragmentMap::iterator i = map.find (0);
280 const char * buffer = header.
read (i->second, buffer_remaining);
283 if (header.
updates <= map.size ())
286 "transport::defrag:" \
287 " the map is large enough to contain updates\n");
293 "transport::defrag:" \
294 " regular message header detected\n");
298 contents_header.
read (buffer, buffer_remaining);
299 size = contents_header.
size;
304 "transport::defrag:" \
305 " regular message header detected\n");
309 contents_header.
read (buffer, buffer_remaining);
310 size = contents_header.
size;
313 result =
new char [
size];
317 buffer_remaining = actual_size;
322 "transport::defrag: copying buffer to lhs\n");
324 memcpy (lhs, buffer, buffer_remaining);
325 buffer += actual_size;
331 for (++i ;i != map.end (); ++i)
334 "transport::defrag: reading header of new fragment\n");
337 buffer = header.
read (i->second, buffer_remaining);
339 buffer_remaining = actual_size;
344 "transport::defrag: copying buffer to lhs\n");
346 memcpy (lhs, buffer, buffer_remaining);
347 buffer += actual_size;
361 for (FragmentMap::iterator i = map.begin (); i != map.end (); ++i)
371 uint32_t queue_length,
387 "transport::add_fragment: adding fragment\n");
389 char * new_fragment = 0;
394 "transport::add_fragment:" \
395 " reading header from buffer\n");
397 header.
read (fragment, buffer_remaining);
402 "transport::add_fragment:" \
403 " reading header from hold fragment\n");
405 new_fragment =
new char [header.
size];
406 memcpy (new_fragment, fragment, header.
size);
412 "transport::add_fragment:" \
413 " searching for originator %s.\n", originator);
415 OriginatorFragmentMap::iterator orig_map = map.find (originator);
416 if (orig_map == map.end ())
419 "transport::add_fragment:" \
420 " creating entry for originator %s.\n", originator);
429 "transport::add_fragment:" \
430 " originator %s exists in fragment map.\n", originator);
433 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
435 if (clock_found != clock_map.end ())
438 if (clock_found->second.find (update_number)
439 == clock_found->second.end ())
441 if (clock_found->second.size () != 0)
444 "transport::add_fragment:" \
445 " %s:%d is being added.\n", originator, update_number);
451 result =
defrag (clock_found->second);
456 "transport::add_fragment:" \
457 " %s:%d is complete. Deleting fragments.\n",
458 originator, update_number);
461 clock_map.erase (clock);
466 "transport::add_fragment:" \
467 " %s:%d is complete. Need more fragments.\n",
468 originator, update_number);
475 "transport::add_fragment:" \
476 " %s:%d has been previously defragged and fragments deleted.\n",
477 originator, update_number);
483 else if (clock_map.size () < queue_length)
487 "transport::add_fragment:" \
488 " %s:%d is being added to queue.\n",
489 originator, update_number);
497 "transport::add_fragment:" \
498 " %s:%d is being added to queue after a deletion\n",
499 originator, update_number);
501 uint32_t oldest = clock_map.begin ()->first;
506 "transport::add_fragment:" \
507 " deleting fragments.\n",
508 originator, update_number);
513 for (FragmentMap::iterator i = fragments.begin ();
514 i != fragments.end (); ++i)
520 "transport::add_fragment:" \
521 " erasing old clock.\n",
522 originator, update_number);
525 clock_map.erase (oldest);
559 if (fragment_size > 0)
563 " fragmenting character stream into %d byte packets.\n",
566 uint32_t data_per_packet = fragment_size -
569 const char * buffer = source;
576 " regular message header detected\n",
580 int64_t buffer_remaining = contents_header.
encoded_size ();
581 contents_header.
read (source, buffer_remaining);
582 header = contents_header;
588 " regular message header detected\n");
591 int64_t buffer_remaining = contents_header.
encoded_size ();
592 contents_header.
read (source, buffer_remaining);
593 header = contents_header;
596 total_size = header.
size;
599 if (header.
size % data_per_packet != 0)
604 " iterating over %d updates.\n",
607 for (uint32_t i = 0; i < header.
updates; ++i)
611 int64_t buffer_remaining;
612 uint64_t actual_data_size;
615 cur_size = (size_t) total_size +
616 FragmentMessageHeader::static_encoded_size ();
618 cur_size = (size_t) fragment_size;
620 buffer_remaining = cur_size;
621 actual_data_size = cur_size -
623 new_frag =
new char [cur_size];
629 " writing %d packet of size %d.\n",
633 header.
size = cur_size;
634 new_frag = header.
write (new_frag, buffer_remaining);
635 memcpy (new_frag, buffer, (
size_t)actual_data_size);
636 buffer += actual_data_size;
637 total_size -= actual_data_size;
648 OriginatorFragmentMap::iterator orig_map = map.find (originator);
650 if (orig_map != map.end ())
653 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
655 if (clock_found != clock_map.end ())
657 uint64_t
size = clock_found->second.size ();
658 FragmentMap::iterator i = clock_found->second.find (0);
660 if (i != clock_found->second.end ())
683 OriginatorFragmentMap::iterator orig_map = map.find (originator);
685 if (orig_map != map.end ())
688 ClockFragmentMap::iterator clock_found = clock_map.find (clock);
690 if (clock_found != clock_map.end ())
692 if (clock_found->second.find (update_number)
693 != clock_found->second.end ())
MADARA_Export char * add_fragment(const char *originator, uint64_t clock, uint32_t update_number, const char *fragment, uint32_t queue_length, OriginatorFragmentMap &map, bool clear=true)
Adds a fragment to an originator fragment map and returns the aggregate message if the message is complete.
MADARA_Export char * defrag(FragmentMap &map)
Pieces together a fragment map into a single buffer.
MADARA_Export bool is_complete(const char *originator, uint64_t clock, OriginatorFragmentMap &map)
Checks whether the fragment map holds all fragments of the message for the given originator and clock.
MADARA_Export void frag(char *source, uint32_t fragment_size, FragmentMap &map)
Breaks a large packet into smaller packets.
#define FRAGMENTATION_MADARA_ID
MADARA_Export utility::Refcounter< logger::Logger > global_logger
std::map< std::string, ClockFragmentMap > OriginatorFragmentMap
Map of originator to a map of clocks to fragments.
std::map< uint64_t, FragmentMap > ClockFragmentMap
Map of clocks to fragments.
MADARA_Export uint64_t endian_swap(uint64_t value)
Converts a host format uint64_t into big endian.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
MADARA_Export void delete_fragments(FragmentMap &map)
Deletes fragments within a fragment map and clears the map.
MADARA_Export bool exists(const char *originator, uint64_t clock, uint32_t update_number, OriginatorFragmentMap &map)
Checks if a fragment already exists within a fragment map.
std::map< uint32_t, const char * > FragmentMap
Map of fragment identifiers to fragments.