MADARA  3.1.8
WorkerThread.cpp
Go to the documentation of this file.
1 #include "WorkerThread.h"
2 #include "ace/High_Res_Timer.h"
3 #include "ace/OS_NS_sys_time.h"
5 
6 #ifdef _MADARA_JAVA_
7 #include <jni.h>
8 #include "madara_jni.h"
10 #endif
11 
12 #ifdef WIN32
13 
14 #include <process.h>
15 
16 unsigned __stdcall worker_thread_windows_glue (void * param)
17 {
19  static_cast < madara::threads::WorkerThread *> (
20  param);
21  if (caller)
22  {
23  return (unsigned) caller->svc ();
24  }
25  else
26  {
27  return 0;
28  }
29 }
30 
31 #endif
32 
33 
34 #include <iostream>
35 #include <algorithm>
36 
38  : thread_ (0), control_ (0), data_ (0), hertz_ (-1.0)
39 {
40 }
41 
42 
44  const std::string & name,
45  BaseThread * thread,
46  knowledge::KnowledgeBase * control,
48  double hertz)
49  : name_ (name), thread_ (thread), control_ (control), data_ (data),
50  hertz_ (hertz)
51 {
52  if (thread && control)
53  {
54  std::stringstream base_string;
55  base_string << name;
56 
57  thread->name = name;
58  thread->init_control_vars (*control);
59 
61  base_string.str () + ".finished", *control);
63  base_string.str () + ".started", *control);
65  base_string.str () + ".hertz", *control);
66 
67  finished_ = 0;
68  started_ = 0;
70  }
71 }
72 
// NOTE(review): the declarator line for this empty body (listing line 73) is
// missing from this listing. Given its position directly after the
// constructors it is presumably the WorkerThread destructor -- confirm
// against the original file. The body performs no work.
74 {
75 }
76 
77 void
79 {
80  if (this != &input)
81  {
82  this->name_ = input.name_;
83  this->thread_ = input.thread_;
84  this->control_ = input.control_;
85  this->data_ = input.data_;
86  this->finished_ = input.finished_;
87  this->started_ = input.started_;
88  this->new_hertz_ = input.new_hertz_;
89  this->hertz_ = input.hertz_;
90  }
91 }
92 
// Launches the worker: starts a thread of execution and logs whether the
// launch succeeded.
// NOTE(review): the declarator line (listing line 94) is missing from this
// listing; the tooltip section names this method as "void run(void)".
93 void
95 {
96  int result;
97 
// Non-Windows builds start the thread through this->activate ().
98 #ifndef WIN32
99  result = this->activate ();
100 #else
// NOTE(review): on WIN32 result is fixed at 0 and the return value of
// _beginthreadex is discarded, so the "failed to create thread" branch
// below can never run on this path and the returned thread handle is never
// stored or closed. The CRT documents 0 as the error return -- confirm and
// consider mapping that to -1.
101  result = 0;
102  _beginthreadex(NULL, 0, worker_thread_windows_glue, (void*)this, 0, 0);
103 
104 #endif
105 
// -1 is treated as the only failure value.
106  if (result != -1)
107  {
// NOTE(review): the head of this logging call (listing line 108) is missing
// from the listing; only its format string and arguments remain.
109  "WorkerThread::WorkerThread(%s):" \
110  " thread started (result = %d)\n", name_.c_str (), result);
111  }
112  else
113  {
// NOTE(review): the head of this logging call (listing line 114) is missing
// from the listing.
115  "WorkerThread::WorkerThread(%s):" \
116  " failed to create thread\n", name_.c_str ());
117  }
118 }
119 
// Thread body: initializes the contained thread, calls its run method until
// the "<name>.terminated" control variable becomes true (or once, when
// one_shot stays set), then cleans up and deletes the contained thread and
// sets finished_ to 1. Always returns 0.
// NOTE(review): the declarator line (listing line 121) and the heads of all
// logging calls are missing from this listing; the tooltip section names
// this method as "int svc(void)".
120 int
122 {
124  "WorkerThread(%s)::svc:" \
125  " checking thread existence\n", name_.c_str ());
126 
127  if (thread_)
128  {
// publish that the worker has begun executing
129  started_ = 1;
130 
131 #ifdef _MADARA_JAVA_
132  // try detaching one more time, just to make sure.
// NOTE(review): the statement belonging to the comment above (listing line
// 133) is missing from this listing.
134 #endif
135 
// NOTE(review): data_ is dereferenced without a null check; the constructor
// only checks thread and control -- confirm callers always supply data.
136  thread_->init (*data_);
137 
138  {
139  ACE_Time_Value current = ACE_High_Res_Timer::gettimeofday ();
140  ACE_Time_Value next_epoch, frequency;
141 
// defaults passed by reference to change_frequency below, which presumably
// overrides them according to hertz_ -- confirm in change_frequency
142  bool one_shot = true;
143  bool blaster = false;
144 
145  knowledge::VariableReference terminated;
// NOTE(review): the declaration of paused (listing line 146) is missing from
// this listing; it is assigned and read below.
147 
// look up the control variables once so the loop can reuse the references
148  terminated = control_->get_ref (name_ + ".terminated");
149  paused = control_->get_ref (name_ + ".paused");
150 
151  // change thread frequency
152  change_frequency (hertz_, current, frequency, next_epoch,
153  one_shot, blaster);
154 
155  while (control_->get (terminated).is_false ())
156  {
158  "WorkerThread(%s)::svc:" \
159  " thread checking for pause\n", name_.c_str ());
160 
// the contained thread's run is skipped while the paused variable is true
161  if (control_->get (paused).is_false ())
162  {
164  "WorkerThread(%s)::svc:" \
165  " thread calling run function\n", name_.c_str ());
166 
167  thread_->run ();
168  }
169 
// a one-shot worker leaves the loop after the first pass, whether or not
// run was actually called on that pass
170  if (one_shot)
171  break;
172 
173  // check for a change in frequency/hertz
174  if (new_hertz_ != hertz_)
175  {
// NOTE(review): the head of this call (listing line 176) is missing from the
// listing; the remaining arguments match change_frequency's parameter list.
177  current, frequency, next_epoch, one_shot, blaster);
178  }
179 
// when blaster is set the loop repeats immediately with no sleep
180  if (!blaster)
181  {
182  current = ACE_High_Res_Timer::gettimeofday ();
183 
185  "WorkerThread(%s)::svc:" \
186  " thread checking for next hertz epoch\n", name_.c_str ());
187 
// sleep only for the time remaining until the next epoch
188  if (current < next_epoch)
189  madara::utility::sleep (next_epoch - current);
190 
192  "WorkerThread(%s)::svc:" \
193  " thread past epoch\n", name_.c_str ());
194 
// advance from the previous epoch rather than from the current time
195  next_epoch += frequency;
196  }
197  } // end while !terminated
198 
200  "WorkerThread(%s)::svc:" \
201  " thread has been terminated\n", name_.c_str ());
202 
203  }
204 
206  "WorkerThread(%s)::svc:" \
207  " calling thread cleanup method\n", name_.c_str ());
208 
209  thread_->cleanup ();
210 
212  "WorkerThread(%s)::svc:" \
213  " deleting thread\n", name_.c_str ());
214 
// NOTE(review): thread_ is not reset after deletion, so this worker (and any
// copy made through operator=) keeps a dangling pointer.
215  delete thread_;
216 
218  "WorkerThread(%s)::svc:" \
219  " setting finished to 1\n", finished_.get_name ().c_str ());
220 
// publish completion through the finished_ container
221  finished_ = 1;
222  }
223  else
224  {
226  "WorkerThread(%s)::svc:" \
227  " thread creation failed\n", name_.c_str ());
228  }
229 
// the return value is 0 on both the success and the failure path
230  return 0;
231 }
WorkerThread()
Default constructor.
void operator=(const WorkerThread &input)
Assignment operator.
This class encapsulates attaching and detaching to a VM.
Definition: Acquire_VM.h:24
knowledge::KnowledgeBase * control_
the control plane to the knowledge base
Definition: WorkerThread.h:108
virtual void init(knowledge::KnowledgeBase &)
Initializes thread with a MADARA context.
Definition: BaseThread.h:59
std::string get_name(void) const
Returns the name of the container.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Integer.inl:58
knowledge::KnowledgeBase * data_
the data plane (the knowledge base)
Definition: WorkerThread.h:111
virtual void run(void)=0
Executes the main thread logic.
knowledge::containers::Double new_hertz_
thread safe hertz reference
Definition: WorkerThread.h:128
virtual void init_control_vars(knowledge::KnowledgeBase &control)
Initializes the Java thread implementation's control plane variables.
Definition: BaseThread.h:86
double hertz_
hertz rate for worker thread executions
Definition: WorkerThread.h:133
MADARA_Export utility::Refcounter< logger::Logger > global_logger
virtual void cleanup(void)
Cleans up any thread residue (usually instances created in init).
Definition: BaseThread.h:76
void run(void)
Runs the thread once.
MADARA_Export double sleep(double sleep_time)
Sleeps for a certain amount of time.
Definition: Utility.cpp:856
Optimized reference to a variable within the knowledge base.
madara::knowledge::KnowledgeRecord get(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Retrieves a knowledge value.
void change_frequency(double hertz, ACE_Time_Value &current, ACE_Time_Value &frequency, ACE_Time_Value &next_epoch, bool &one_shot, bool &blaster)
Changes the frequency given a hertz rate.
Abstract base class for implementing threads.
Definition: BaseThread.h:38
#define madara_logger_ptr_log(logger, level,...)
Fast version of the madara::logger::log method for Logger pointers.
Definition: Logger.h:32
A thread that executes BaseThread logic.
Definition: WorkerThread.h:31
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
std::string name_
the name of the contained thread
Definition: WorkerThread.h:102
static constexpr struct madara::knowledge::tags::string_t string
knowledge::containers::Integer started_
thread safe start flag that will be sent to the knowledge base on launch of the thread ...
Definition: WorkerThread.h:123
std::string name
The unique name of your thread.
Definition: BaseThread.h:96
knowledge::containers::Integer finished_
thread safe finished flag that will be sent to the knowledge base on completion of the thread ...
Definition: WorkerThread.h:117
int svc(void)
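Executes the contained thread: initializes it, calls its run method until the terminated control variable is set, then cleans it up and deletes it.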
BaseThread * thread_
the contained thread
Definition: WorkerThread.h:105
bool is_false(void) const
Checks to see if the record is false.
void set_name(const std::string &var_name, KnowledgeBase &knowledge)
Sets the variable name that this refers to.
Definition: Double.cpp:143
VariableReference get_ref(const std::string &key, const KnowledgeReferenceSettings &settings=KnowledgeReferenceSettings(false))
Atomically returns a reference to the variable.