MADARA  3.1.8
Threader.cpp
Go to the documentation of this file.
1 #include "Threader.h"
3 
4 #ifdef _MADARA_JAVA_
5 
6 #include "java/JavaThread.h"
7 
8 #endif // MADARA_JAVA
9 
11  : data_ (0), control_(new knowledge::KnowledgeBase ())
12 {
13 
14 }
15 
17  knowledge::KnowledgeBase & data_plane)
18  : data_ (&data_plane), control_(new knowledge::KnowledgeBase ())
19 {
20 }
21 
23 {
24  terminate ();
25  wait ();
26 
27  delete control_;
28 }
29 
30 void
32 {
33  NamedWorkerThreads::iterator found = threads_.find (name);
34 
35  if (found != threads_.end ())
36  {
37  control_->set (name + ".paused", knowledge::KnowledgeRecord::Integer (1));
38  }
39 }
40 
41 void
43 {
44  for (NamedWorkerThreads::iterator i = threads_.begin ();
45  i != threads_.end (); ++i)
46  {
47  control_->set (i->first + ".paused", knowledge::KnowledgeRecord::Integer (1));
48  }
49 }
50 
51 void
53 {
54  NamedWorkerThreads::iterator found = threads_.find (name);
55 
56  if (found != threads_.end ())
57  {
58  control_->set (name + ".paused", knowledge::KnowledgeRecord::Integer (0));
59  }
60 }
61 
62 void
64 {
65  for (NamedWorkerThreads::iterator i = threads_.begin ();
66  i != threads_.end (); ++i)
67  {
68  control_->set (i->first + ".paused", knowledge::KnowledgeRecord::Integer (0));
69  }
70 }
71 
72 void
74  const std::string name, BaseThread * thread, bool paused)
75 {
76  if (name != "" && thread != 0)
77  {
78  WorkerThread * worker = new WorkerThread (name, thread,
79  control_, data_);
80 
81  if (paused)
82  thread->paused = 1;
83 
84  threads_[name] = worker;
85 
86  worker->run ();
87  }
88 }
89 
90 #ifdef _MADARA_JAVA_
91 
92 void
94  const std::string name, jobject thread, bool paused)
95 {
96  if (name != "" && thread != 0)
97  {
98  // attempt to create a Java Thread
99  JavaThread * new_thread = JavaThread::create (thread);
100 
101  // if successful, run the thread
102  if (new_thread)
103  run (name, new_thread, paused);
104  }
105 }
106 
107 
108 void
110  double hertz, const std::string name, jobject thread, bool paused)
111 {
112  if (name != "" && thread != 0)
113  {
114  // attempt to create a Java Thread
115  JavaThread * new_thread = JavaThread::create (thread);
116 
117  // if successful, run the thread
118  if (new_thread)
119  run (hertz, name, new_thread, paused);
120  }
121 }
122 
123 #endif // _MADARA_JAVA_
124 
125 void
127  const std::string name, BaseThread * thread, bool paused)
128 {
129  if (name != "" && thread != 0)
130  {
131  WorkerThread * worker = new WorkerThread (name, thread,
132  control_, data_, hertz);
133 
134  if (paused)
135  thread->paused = 1;
136 
137  threads_[name] = worker;
138 
139  worker->run ();
140  }
141 }
142 
144  knowledge::KnowledgeBase & data_plane)
145 {
146  data_ = &data_plane;
147 }
148 
149 void
151 {
152  NamedWorkerThreads::iterator found = threads_.find (name);
153 
154  if (found != threads_.end ())
155  {
156  control_->set (name + ".terminated", knowledge::KnowledgeRecord::Integer (1));
157  }
158 }
159 
160 void
162 {
163  for (NamedWorkerThreads::iterator i = threads_.begin ();
164  i != threads_.end (); ++i)
165  {
166  control_->set (i->first + ".terminated", knowledge::KnowledgeRecord::Integer (1));
167  }
168 }
169 
170 bool
172  const knowledge::WaitSettings & ws)
173 {
174  bool result (false);
175 
176 #ifndef _MADARA_NO_KARL_
177  NamedWorkerThreads::iterator found = threads_.find (name);
178 
179  if (found != threads_.end ())
180  {
181  std::string condition = found->second->finished_.get_name ();
182 
183  result = this->control_->wait (condition, ws).is_true ();
184 
185  if (result)
186  {
187  delete found->second;
188 
189  threads_.erase (found);
190  }
191  }
192 #endif // _MADARA_NO_KARL_
193 
194  return result;
195 }
196 
197 bool
199 {
200  bool result (false);
201 
202 #ifndef _MADARA_NO_KARL_
203  std::stringstream condition;
204 
205  NamedWorkerThreads::iterator i = threads_.begin ();
206 
207  // create a condition with the first thread's finished state
208  if (i != threads_.end ())
209  {
210  condition << i->second->finished_.get_name ();
211  ++i;
212  }
213 
214  // add each other thread to the condition
215  for (; i != threads_.end (); ++i)
216  {
217  condition << "&&";
218  condition << i->second->finished_.get_name ();
219  }
220 
221  if (threads_.size () > 0)
222  {
223  result = this->control_->wait (condition.str (), ws).is_true ();
224  }
225 
226  if (result)
227  {
228  for (i = threads_.begin (); i != threads_.end (); ++i)
229  {
230  delete i->second;
231  }
232 
233  threads_.clear ();
234  }
235 #endif // _MADARA_NO_KARL_
236 
237  return result;
238 }
bool is_true(void) const
Checks to see if the record is true.
madara::knowledge::containers::Integer paused
thread safe paused flag that may be set by the Threader
Definition: BaseThread.h:107
void terminate(void)
Requests all threads to terminate.
Definition: Threader.cpp:161
void run(void)
Runs the thread once.
Threader()
Default constructor.
Definition: Threader.cpp:10
NamedWorkerThreads threads_
the threads that are still active
Definition: Threader.h:232
void run(const std::string name, BaseThread *thread, bool paused=false)
Starts a new thread and executes the provided user thread once.
Definition: Threader.cpp:73
void pause(void)
Requests all threads to pause.
Definition: Threader.cpp:42
Abstract base class for implementing threads.
Definition: BaseThread.h:38
madara::knowledge::KnowledgeRecord wait(const std::string &expression, const WaitSettings &settings=WaitSettings())
Waits for an expression to be non-zero.
A thread that executes BaseThread logic.
Definition: WorkerThread.h:31
This class provides a distributed knowledge base to users.
Definition: KnowledgeBase.h:44
~Threader()
Destructor.
Definition: Threader.cpp:22
knowledge::KnowledgeBase * control_
The control plane used by threads for termination and pause information.
Definition: Threader.h:227
static constexpr struct madara::knowledge::tags::string_t string
knowledge::KnowledgeBase * data_
The data plane used by threads.
Definition: Threader.h:220
void resume(void)
Requests all threads to resume (unpause)
Definition: Threader.cpp:63
int set(const VariableReference &variable, const std::string &value, const EvalSettings &settings=EvalSettings(false, false, true, false, false))
Atomically sets the value of a variable to a string.
Provides functions and classes for the distributed knowledge base.
Encapsulates settings for a wait statement.
Definition: WaitSettings.h:23
void set_data_plane(knowledge::KnowledgeBase &data_plane)
Sets the data plane for new threads.
Definition: Threader.cpp:143
bool wait(const std::string name, const knowledge::WaitSettings &ws=knowledge::WaitSettings())
Wait for a specific thread to complete.
Definition: Threader.cpp:171
A facade for a user-defined Java thread class.
Definition: JavaThread.h:26
static JavaThread * create(jobject obj)
Creates a JavaThread.
Definition: JavaThread.cpp:121