MADARA  3.1.8
TimedExecutor.cpp
Go to the documentation of this file.
1 #include "TimedExecutor.h"
2 #include "TimedEventThread.h"
3 #include "ace/Thread.h"
5 
// Comparison for higher event priorities between two TimedEvent values
// (std::pair<ACE_Time_Value, Event *>).
// The test on the time stamps is inverted on purpose: lhs counts as "less"
// when its time is LATER than rhs. remove() treats events_.top () as the
// earliest event, which is consistent with this inversion.
// NOTE(review): the two parameter lines (lhs, rhs) are not shown in this
// listing.
6 bool operator< (
9 {
10  return lhs.first > rhs.first;
11 }
12 
13 
// Default constructor. Starts with no thread info, no thread objects and a
// thread count of zero, then caches control-plane references to the
// "threads", "queued" and "terminated" variables for later use.
15  : thread_info_ (0), threads_ (0), num_threads_ (0)
16 {
17  threads_ref_ = control_plane_.get_ref ("threads");
18  queue_size_ = control_plane_.get_ref ("queued");
19  terminated_ = control_plane_.get_ref ("terminated");
20 }
21 
22 
// NOTE(review): the declarator line is not shown in this listing; presumably
// this is the TimedExecutor destructor - confirm.
// Shuts the threads down first, then deletes whatever events are still queued.
24 {
25  shutdown ();
26  clear_queue ();
27 }
28 
// Adds a timed event to the queue.
// new_event is pushed as-is (its time stamp is not recomputed here) while a
// guard object constructed on mutex_ is in scope.
// NOTE(review): the declarator line and listing lines 37-38 are not shown;
// the trailing comment suggests they publish the queue size to the control
// plane - confirm against the repository.
29 void
31 {
32  MADARA_GUARD_TYPE guard (mutex_);
33 
34  events_.push (new_event);
35 
36  // inform sleeping threads of new queued events
39 }
40 
// Adds an Event to the queue, stamping it with its first due time.
// The due time is the current high-resolution time plus new_event.delay.
// The queue stores a heap-allocated copy of new_event; clear_queue () deletes
// copies that are still queued.
// NOTE(review): the declarator line and listing lines 59-60 are not shown;
// the trailing comment suggests they publish the queue size to the control
// plane - confirm against the repository.
41 void
43 {
44  MADARA_GUARD_TYPE guard (mutex_);
45 
46  TimedEvent timed_event;
47 
48  // setup times
49  ACE_Time_Value cur_time = ACE_High_Res_Timer::gettimeofday ();
50 
51  // create timed_event
52  timed_event.first = cur_time + new_event.delay;
    // the queue takes a pointer to its own copy of the caller's event
53  timed_event.second = new Event (new_event);
54 
55  // add the timed event to the event queue
56  events_.push (timed_event);
57 
58  // inform sleeping threads of new queued events
61 }
62 
// Removes a timed event from the queue.
//
// cur_event is an output parameter. Three outcomes are visible in the code:
//  - queue not empty and the top event is due: cur_event is a copy of the top
//    entry, the entry is popped, and the returned value is the current time
//    stamp (cur_time is not modified on this path).
//  - queue not empty but the top event is not due: cur_event.second is set to
//    0 and the returned value is the time remaining until that event.
//  - queue empty: the thread waits on the control plane (at most 5.0 seconds)
//    for "queued > 0 || terminated", then cur_event.second is set to 0 and a
//    zero time is returned.
// NOTE(review): the declarator line is not shown in this listing.
63 ACE_Time_Value
65 {
66  // obtain current time
67  ACE_Time_Value cur_time = ACE_High_Res_Timer::gettimeofday ();
68 
    // mutex_ is locked by hand here; each branch below unlocks it itself
69  mutex_.MADARA_LOCK_LOCK ();
70 
71  // obtain next event in queue
72  if (events_.size () > 0)
73  {
74  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MINOR, "TimedExecutor::remove: " \
75  "events queue size is greater than zero\n");
76 
    // copy the top entry; it stays in the queue unless it is due
77  cur_event = events_.top ();
78 
79  // if we've hit the timeout, pop
80  if (cur_time >= cur_event.first)
81  {
82  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MAJOR, "TimedExecutor::remove: "
83  "removing top element from events queue\n");
84 
85  events_.pop ();
86  }
87  // otherwise, zero out the cur_event event pointer
88  else
89  {
90  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MAJOR, "TimedExecutor::remove: "
91  "earliest event is not ready yet\n");
92 
93  cur_event.second = 0;
    // turn cur_time into the interval left before the event is due
94  cur_time = cur_event.first - cur_time;
95  }
96 
97  // inform sleeping threads of new queued events
    // NOTE(review): listing lines 98-99 are not shown; presumably they
    // publish the queue size to the control plane - confirm.
100 
101  mutex_.MADARA_LOCK_UNLOCK ();
102  }
103  // there are no events in queue
104  else
105  {
    // release the lock before the wait below
106  mutex_.MADARA_LOCK_UNLOCK ();
107 
108  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MAJOR, "TimedExecutor::remove: "
109  "Nothing to do. Thread going to sleep\n");
110 
111  WaitSettings wait_settings;
    // NOTE(review): meaning of a negative poll_frequency is not visible
    // here - confirm against WaitSettings.
112  wait_settings.poll_frequency = -1;
113  wait_settings.max_wait_time = 5.0;
114 
115 #ifndef _MADARA_NO_KARL_
116  // sleep until events are queued or the executor is terminated
117  control_plane_.wait ("queued > 0 || terminated", wait_settings);
118 #endif // _MADARA_NO_KARL_
119 
120  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MAJOR, "TimedExecutor::remove: "
121  "Condition has changed. Thread waking up\n");
122 
    // no event is handed out on this path
123  cur_event.second = 0;
124  cur_time.set (0.0);
125  }
126 
127  return cur_time;
128 }
129 
// Informs threads of a shutdown and waits for threads to exit, then frees the
// thread bookkeeping and resets it to the empty state.
// Does nothing when thread_info_ is null.
// NOTE(review): the declarator line and listing lines 135, 140 and 155 are
// not shown; presumably line 135 raises the terminated flag before the
// barrier - confirm against the repository.
130 void
132 {
133  if (thread_info_)
134  {
136 
137  // wait for all threads to complete
138  enter_barrier ();
139 
141 
    // free the info array, then every thread object, then the pointer array
142  delete [] thread_info_;
143 
144  for (unsigned int i = 0; i < num_threads_; ++i)
145  {
146  delete threads_[i];
147  }
148 
149  delete [] threads_;
150 
    // reset so that a later shutdown () or launch_threads () sees no threads
151  thread_info_ = 0;
152  threads_ = 0;
153  num_threads_ = 0;
154 
156  }
157 }
158 
// Checks if the thread pool has been shutdown.
// Returns true when the control-plane variable referenced by terminated_
// converts to a non-zero integer.
// NOTE(review): the declarator line is not shown in this listing.
159 bool
161 {
162  return control_plane_.get (terminated_).to_integer () != 0;
163 }
164 
// Launches threads.
// Any existing pool is shut down first. Each of the num_threads slots then
// gets its index as id and a back-pointer to this executor, and a
// TimedEventThread is created from that info.
// NOTE(review): listing lines 166, 169, 177-180 and 186 are not shown;
// presumably they allocate thread_info_ / threads_ and update the control
// plane - confirm against the repository.
165 void
167  unsigned int num_threads)
168 {
170 
171  // shutdown any existing threads
172  if (thread_info_)
173  {
174  shutdown ();
175  }
176 
181 
182  for (unsigned int i = 0; i < num_threads; ++i)
183  {
184  thread_info_[i].id = i;
185  thread_info_[i].executor = this;
187  threads_[i] = new TimedEventThread (thread_info_[i]);
188  }
189 }
190 
// Returns the time left until the event at the top of the queue is due
// (its time stamp minus the current high-resolution time).
// NOTE(review): this body reads events_.top () without taking mutex_ and
// without checking that the queue is non-empty - confirm that callers hold
// lock () and only call this when an event is queued.
// NOTE(review): the declarator line is not shown in this listing.
191 ACE_Time_Value
193 {
194  // get the current time
195  ACE_Time_Value cur_time = ACE_High_Res_Timer::gettimeofday ();
196 
197  // calculate the time left before next event is due
198  cur_time = events_.top ().first - cur_time;
199 
200  // return the time
201  return cur_time;
202 }
203 
// Locks mutex_, the mutex for local changes. The lock is not released here;
// unlock () is the matching call.
// NOTE(review): the declarator line is not shown in this listing.
204 void
206 {
207  mutex_.MADARA_LOCK_LOCK ();
208 }
209 
// Unlocks the event queue by releasing mutex_.
// NOTE(review): the declarator line is not shown in this listing.
210 void
212 {
213  mutex_.MADARA_LOCK_UNLOCK ();
214 }
215 
// Fills an event struct and returns it by value.
//   knowledge   knowledge base stored (by address) in the event; also used
//               to compile the expression
//   expression  expression text compiled into the event's root
//   delay       delay before first execution
//   period      time between executions in seconds
//   executions  stored as intended_executions
// NOTE(review): the return-type/name line is not shown in this listing.
218  KnowledgeBase & knowledge, const std::string & expression,
219  double delay, double period, int executions)
220 {
221  Event new_event;
    // executions counts runs so far, so a fresh event starts at zero
222  new_event.executions = 0;
223  new_event.intended_executions = executions;
224  new_event.knowledge = &knowledge;
225  new_event.period.set (period);
226  new_event.delay.set (delay);
227 
228 #ifndef _MADARA_NO_KARL_
229 
    // root is assigned only when KaRL support is compiled in
230  new_event.root = knowledge.compile (expression).get_root ();
231 
232 #endif // _MADARA_NO_KARL_
233 
234  return new_event;
235 }
236 
// Enters the barrier.
// Uses wait settings with a poll frequency of .5 seconds and max_wait_time
// of -1. The visible expression text resets closed to 0, walks .i over
// [0, threads) incrementing closed for each set thread.{.i}.closed, and
// ends with the condition closed == threads.
// NOTE(review): presumably max_wait_time = -1 means "no timeout" - confirm
// against WaitSettings.
// NOTE(review): the declarator line and listing lines 249 and 256 are not
// shown; presumably 249 opens a control_plane_.wait call taking the strings
// below and 256 opens the final log call - confirm.
237 void
239 {
240  WaitSettings settings;
241  settings.poll_frequency = .5;
242  settings.max_wait_time = -1;
243 
244  madara_logger_ptr_log (logger::global_logger.get(), logger::LOG_MAJOR, "TimedExecutor::enter_barrier: "
245  "Entering barrier\n");
246 
247 #ifndef _MADARA_NO_KARL_
248 
250  "closed = 0 ;>\n"
251  ".i [0->threads) (thread.{.i}.closed => ++closed) ;>\n"
252  "closed == threads", settings);
253 
254 #endif // _MADARA_NO_KARL_
255 
257  "TimedExecutor::enter_barrier: Leaving barrier\n");
258 }
259 
// Clears the event queue.
// With a guard on mutex_ in scope, repeatedly takes the top entry, deletes
// the Event it points to, and pops the entry until the queue is empty.
// NOTE(review): the declarator line is not shown in this listing.
260 void
262 {
263  MADARA_GUARD_TYPE guard (mutex_);
264 
265  while (events_.size () > 0)
266  {
    // copy the entry first so its Event pointer can be deleted before the pop
267  TimedEvent cur_event (events_.top ());
268  delete cur_event.second;
269  events_.pop ();
270  }
271 }
272 
// Returns the number of threads (num_threads_), read while a guard on mutex_
// is in scope.
// NOTE(review): the return-type and declarator lines are not shown in this
// listing.
275 {
276  MADARA_GUARD_TYPE guard (mutex_);
277 
278  return num_threads_;
279 }
void enter_barrier(void)
Enters the barrier.
double max_wait_time
Maximum time to wait for an expression to become true (in seconds)
Definition: WaitSettings.h:113
MADARA_Export Event fill_event(KnowledgeBase &knowledge, const std::string &expression, double delay=0.0, double period=0.0, int executions=-1)
Fills an event struct.
VariableReference queue_size_
Reference to queue size in the control plane.
MADARA_LOCK_TYPE mutex_
Mutex for local changes.
std::pair< ACE_Time_Value, Event * > TimedEvent
Definition: TimedExecutor.h:69
bool operator<(const madara::knowledge::TimedEvent &lhs, const madara::knowledge::TimedEvent &rhs)
Comparison for higher event priorities.
unsigned int id
thread identifier
TimedExecutor * executor
pointer to TimedExecutor caller, which provides thread-safe callbacks for shutdown information ...
void unlock(void)
Unlocks the event queue.
knowledge::KnowledgeRecord::Integer num_threads(void)
Returns the number of threads.
KnowledgeBase * knowledge
knowledge base for executing the expression (should be same as compiled)
Definition: TimedExecutor.h:53
expression::ComponentNode * get_root(void)
Gets the root of the compiled expression.
MADARA_Export utility::Refcounter< logger::Logger > global_logger
TimedEventThread ** threads_
Timed Event Threads.
expression::ComponentNode * root
expression to be executed (rooted Tree)
Definition: TimedExecutor.h:46
knowledge::KnowledgeRecord::Integer num_threads_
Number of threads.
void lock(void)
Locks the event queue.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Retrieves a knowledge value.
bool clear(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings())
Clears a variable.
Expression container for timed event.
Definition: TimedExecutor.h:36
Thread executing timed events.
A struct that is passed to Timed Event Thread Executors to control job handling, shutdowns, etc.
madara::knowledge::KnowledgeRecord wait(const std::string &expression, const WaitSettings &settings=WaitSettings())
Waits for an expression to be non-zero.
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
void shutdown(void)
Informs threads of a shutdown and waits for threads to exit.
ACE_Time_Value delay
delay before first execution
Definition: TimedExecutor.h:39
TimedExecutor()
Default constructor.
double poll_frequency
Frequency to poll an expression for truth (in seconds)
Definition: WaitSettings.h:108
ACE_Time_Value time_until_next(void)
Returns the time remaining until the next queued event is due.
PriorityQueue events_
Event queue.
VariableReference terminated_
Reference to terminated condition within control plane.
Integer to_integer(void) const
converts the value to an integer
static constexpr struct madara::knowledge::tags::string_t string
TimedEventThreadInfo * thread_info_
Vector of thread info.
VariableReference threads_ref_
Reference to the number of threads in the control plane.
void launch_threads(unsigned int threads)
Launches threads.
KnowledgeBase * control_plane
For barrier and other information.
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
Provides functions and classes for the distributed knowledge base.
KnowledgeBase control_plane_
For barrier and other information.
ACE_Time_Value remove(TimedEvent &cur_event)
Removes a timed event from the queue.
void add(const TimedEvent &new_event)
Adds a timed event to the queue.
ACE_Time_Value period
time between executions in seconds
Definition: TimedExecutor.h:42
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:23
unsigned int executions
executions so far
Definition: TimedExecutor.h:56
void clear_queue(void)
Clears the event queue.
CompiledExpression compile(const std::string &expression)
Compiles a KaRL expression into an expression tree.
bool is_shutdown(void)
Checks if the thread pool has been shutdown.
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Atomically returns a reference to the variable.
int intended_executions
intended executions
Definition: TimedExecutor.h:59