source: trunk/Mars/mcore/Queue.h@ 17292

Last change on this file since 17292 was 17292, checked in by tbretz, 11 years ago
Added some tweaks to allow root cint to parse the C++11 stuff.
File size: 7.0 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
4#include <list>
5
6#ifndef __CINT__
7#include <thread>
8#include <condition_variable>
9#else
10namespace std
11{
12 class mutex;
13 class thread;
14 class condition_variable;
15 template<class T> class function<T>;
16}
17#endif
18
19
20// The second template argument must support:
21// iterator it = begin(); // get the next element to be processed
22// erase(it); // erase the processed element from the queue
23// push_back(); // add a new element to the queue
24// emplace_back(); // emplace a new element to the queue
25// splice(); // used to efficiently implement post with mutex
26
27using namespace std;
28
29template<class T, class List=std::list<T>>
30class Queue
31{
32 size_t fSize; // Only necessary for before C++11
33
34 List fList;
35
36 std::mutex fMutex; // Mutex needed for the conditional
37 std::condition_variable fCond; // Conditional
38
39 enum state_t
40 {
41 kIdle,
42 kRun,
43 kStop,
44 kAbort,
45 kTrigger,
46 kPrompt
47
48 };
49
50 state_t fState; // Stop signal for the thread
51
52 typedef std::function<bool(const T &)> callback;
53 callback fCallback; // Callback function called by the thread
54
55 std::thread fThread; // Handle to the thread
56
57 void Thread()
58 {
59 std::unique_lock<std::mutex> lock(fMutex);
60
61 // No filling allowed by default (the queue is
62 // always processed until it is empty)
63 size_t allowed = 0;
64
65 while (1)
66 {
67 while (fSize==allowed && fState==kRun)
68 fCond.wait(lock);
69
70 // Check if the State flag has been changed
71 if (fState==kAbort)
72 break;
73
74 if (fState==kStop && fList.empty())
75 break;
76
77 // If thread got just woken up, move back the state to kRun
78 if (fState==kTrigger)
79 fState = kRun;
80
81 // Could have been a fState==kTrigger case
82 if (fList.empty())
83 continue;
84
85 // During the unlocked state, fSize might change.
86 // The current size of the queue needs to be saved.
87 allowed = fSize;
88
89 // get the first entry from the (sorted) list
90 const auto it = fList.begin();
91
92 // Theoretically, we can loose a signal here, but this is
93 // not a problem, because then we detect a non-empty queue
94 lock.unlock();
95
96 // If the first event in the queue could not be processed,
97 // no further processing makes sense until a new event has
98 // been posted (or the number of events in the queue has
99 // changed) [allowed>0], in the case processing was
100 // successfull [allowed==0], the next event will be processed
101 // immediately.
102
103 if (fCallback && fCallback(*it))
104 allowed = 0;
105
106 lock.lock();
107
108 // Whenever an event was successfully processed, allowed
109 // is equal to zero and thus the event will be popped
110 if (allowed>0)
111 continue;
112
113 fList.erase(it);
114 fSize--;
115 }
116
117 fList.clear();
118 fSize = 0;
119
120 fState = kIdle;
121 }
122
123public:
124 Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
125 {
126 if (startup)
127 start();
128 }
129
130 Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
131 {
132 }
133
134 Queue<T,List>& operator = (const Queue<T,List>& q)
135 {
136 fSize = 0;
137 fState = kIdle;
138 fCallback = q.fCallback;
139 return *this;
140 }
141
142 ~Queue()
143 {
144 wait(true);
145 }
146
147 bool start()
148 {
149 const std::lock_guard<std::mutex> lock(fMutex);
150 if (fState!=kIdle)
151 return false;
152
153 fState = kRun;
154 fThread = std::thread(std::bind(&Queue::Thread, this));
155 return true;
156 }
157
158 bool stop()
159 {
160 const std::lock_guard<std::mutex> lock(fMutex);
161 if (fState==kIdle)
162 return false;
163
164 fState = kStop;
165 fCond.notify_one();
166
167 return true;
168 }
169
170 bool abort()
171 {
172 const std::lock_guard<std::mutex> lock(fMutex);
173 if (fState==kIdle)
174 return false;
175
176 fState = kAbort;
177 fCond.notify_one();
178
179 return true;
180 }
181
182 bool wait(bool abrt=false)
183 {
184 {
185 const std::lock_guard<std::mutex> lock(fMutex);
186 if (fState==kIdle || fState==kPrompt)
187 return false;
188
189 if (fState==kRun)
190 {
191 fState = abrt ? kAbort : kStop;
192 fCond.notify_one();
193 }
194 }
195
196 fThread.join();
197 return true;
198 }
199
200 bool enablePromptExecution()
201 {
202 const std::lock_guard<std::mutex> lock(fMutex);
203 if (fState!=kIdle || fSize>0)
204 return false;
205
206 fState = kPrompt;
207 return true;
208 }
209
210 bool disablePromptExecution()
211 {
212 const std::lock_guard<std::mutex> lock(fMutex);
213 if (fState!=kPrompt)
214 return false;
215
216 fState = kIdle;
217 return true;
218 }
219
220 bool setPromptExecution(bool state)
221 {
222 return state ? enablePromptExecution() : disablePromptExecution();
223 }
224
225
226 bool post(const T &val)
227 {
228 const std::lock_guard<std::mutex> lock(fMutex);
229 if (fState==kPrompt)
230 return fCallback(val);
231
232 if (fState==kIdle)
233 return false;
234
235 fList.push_back(val);
236 fSize++;
237
238 fCond.notify_one();
239
240 return true;
241 }
242
243 bool notify()
244 {
245 const std::lock_guard<std::mutex> lock(fMutex);
246 if (fState!=kRun)
247 return false;
248
249 fState = kTrigger;
250 fCond.notify_one();
251
252 return true;
253 }
254
255#ifdef __GXX_EXPERIMENTAL_CXX0X__
256 template<typename... _Args>
257 bool emplace(_Args&&... __args)
258 {
259 const std::lock_guard<std::mutex> lock(fMutex);
260 if (fState==kPrompt)
261 return fCallback(T(__args...));
262
263 if (fState==kIdle)
264 return false;
265
266 fList.emplace_back(__args...);
267 fSize++;
268
269 fCond.notify_one();
270
271 return true;
272 }
273
274 bool post(T &&val) { return emplace(std::move(val)); }
275#endif
276
277#ifdef __GXX_EXPERIMENTAL_CXX0X__
278 bool move(List&& x, typename List::iterator i)
279#else
280 bool move(List& x, typename List::iterator i)
281#endif
282 {
283 const std::lock_guard<std::mutex> lock(fMutex);
284 if (fState==kIdle)
285 return false;
286
287 fList.splice(fList.end(), x, i);
288 fSize++;
289
290 fCond.notify_one();
291
292 return true;
293 }
294
295#ifdef __GXX_EXPERIMENTAL_CXX0X__
296 bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
297#endif
298
299 size_t size() const
300 {
301 return fSize;
302 }
303
304 bool empty() const
305 {
306 return fSize==0;
307 }
308
309 bool operator<(const Queue& other) const
310 {
311 return fSize < other.fSize;
312 }
313
314};
315
316#endif
Note: See TracBrowser for help on using the repository browser.