source: trunk/Mars/mcore/Queue.h@ 17689

Last change on this file since 17689 was 17297, checked in by tbretz, 11 years ago
Added some more tweaks needed by CINT (default constructors, handling of __attribute((__packed__)))
File size: 7.1 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
#include <list>

#ifndef __CINT__
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#else
namespace std
{
    class mutex;
    class thread;
    class condition_variable;
    template<class T> class function<T>;
}
#endif
18
19
20// The second template argument must support:
21// iterator it = begin(); // get the next element to be processed
22// erase(it); // erase the processed element from the queue
23// push_back(); // add a new element to the queue
24// emplace_back(); // emplace a new element to the queue
25// splice(); // used to efficiently implement post with mutex
26
27using namespace std;
28
// Queue of elements of type T which are handed, one by one and in list
// order, to a user supplied callback running in a dedicated worker thread.
//
// The callback returns true if the element was processed (it is then
// removed from the queue) and false if it could not be processed yet (it
// stays at the front and is retried once the number of queued elements
// has changed or notify() was called).
//
// In "prompt" mode no worker thread exists and post()/emplace() invoke the
// callback directly in the calling thread.
template<class T, class List=std::list<T>>
class Queue
{
    size_t fSize;                   // Number of elements in fList (only necessary before C++11)

    List fList;                     // Pending elements, processed front to back

    std::mutex fMutex;              // Guards fSize, fList and fState
    std::condition_variable fCond;  // Used to wake up the worker thread

    enum state_t
    {
        kIdle,    // No worker is processing, posting is rejected
        kRun,     // Worker is processing the queue
        kStop,    // Worker processes the remaining elements, then exits
        kAbort,   // Worker exits as soon as possible, remaining elements are dropped
        kTrigger, // Wake-up requested by notify(), reset to kRun by the worker
        kPrompt   // No worker, post()/emplace() call the callback directly
    };

    state_t fState;                 // Current state, also the stop signal for the thread

    typedef std::function<bool(const T &)> callback;
    callback fCallback;             // Callback function called by the thread

    std::thread fThread;            // Handle to the thread

    // Worker thread main loop. Runs with fMutex locked except while the
    // callback is executed.
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // No filling allowed by default (the queue is
        // always processed until it is empty)
        size_t allowed = 0;

        while (1)
        {
            // Sleep as long as nothing changed: either the queue is empty
            // (allowed==0) or the front element was rejected and the
            // number of queued elements is still the same (allowed>0)
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the state flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // If the thread just got woken up by notify(), move the
            // state back to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a fState==kTrigger case
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize;

            // Get the first entry from the (sorted) list
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a non-empty queue
            lock.unlock();

            // If the first event in the queue could not be processed,
            // no further processing makes sense until a new event has
            // been posted (or the number of events in the queue has
            // changed) [allowed>0]. In case processing was successful
            // [allowed==0], the next event will be processed immediately.
            if (fCallback && fCallback(*it))
                allowed = 0;

            lock.lock();

            // Whenever an event was successfully processed, allowed
            // is equal to zero and thus the event will be popped
            if (allowed>0)
                continue;

            fList.erase(it);
            fSize--;
        }

        // Drop whatever is left (only non-empty after an abort)
        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

    // Join a worker which has already left its main loop. Must be called
    // with fMutex locked and fState being kIdle or kPrompt: the worker sets
    // kIdle under the lock as its very last action on shared data, so
    // joining here cannot block on the mutex we are holding. Without this,
    // a worker which terminated by itself after stop()/abort() would stay
    // joinable and destroying or re-assigning fThread would call
    // std::terminate.
    void JoinFinished()
    {
        if (fThread.joinable())
            fThread.join();
    }

public:
    // f:       callback invoked for each element
    // startup: if true, the worker thread is started immediately
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copies only the callback. The new queue is empty and idle.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Takes over only the callback of q. A worker still running for this
    // queue is aborted first, so that the state is not reset underneath it.
    // Afterwards this queue is empty and idle.
    Queue<T,List>& operator = (const Queue<T,List>& q)
    {
        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);
        fList.clear();
        fSize = 0;
        fState = kIdle;
        fCallback = q.fCallback;
        return *this;
    }

#ifdef __MARS__ // Needed for the compilation of the dictionary
    Queue() : fSize(0), fState(kIdle)
    {
    }
#endif

    // Aborts processing and joins the worker thread
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A previous worker might have terminated by itself
        JoinFinished();

        // The new worker blocks on fMutex until we return, so it will
        // see kRun. Setting the state last keeps it kIdle if the thread
        // creation throws.
        fThread = std::thread(std::bind(&Queue::Thread, this));
        fState = kRun;
        return true;
    }

    // Request the worker to exit once the queue is empty. Does not wait.
    // Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Request the worker to exit as soon as possible, dropping all
    // remaining elements. Does not wait. Returns false if the queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Wait until the worker thread has terminated. If no stop or abort has
    // been requested yet, request one (abort if abrt is true, stop
    // otherwise). Returns false if no worker was processing anymore (idle
    // or prompt mode), true otherwise.
    bool wait(bool abrt=false)
    {
        {
            const std::lock_guard<std::mutex> lock(fMutex);
            if (fState==kIdle || fState==kPrompt)
            {
                // The worker might have terminated by itself after a
                // stop()/abort(); it still needs to be joined
                JoinFinished();
                return false;
            }

            // kTrigger is a running worker with a pending wake-up; it
            // must receive the request as well, otherwise join() below
            // would block forever
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        fThread.join();
        return true;
    }

    // Switch an idle and empty queue to prompt mode, in which posted
    // elements are passed to the callback directly by the posting thread.
    bool enablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle || fSize>0)
            return false;

        fState = kPrompt;
        return true;
    }

    // Switch from prompt mode back to idle. Returns false if not in prompt mode.
    bool disablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kPrompt)
            return false;

        fState = kIdle;
        return true;
    }

    bool setPromptExecution(bool state)
    {
        return state ? enablePromptExecution() : disablePromptExecution();
    }


    // Append a copy of val to the queue and wake up the worker.
    // In prompt mode the callback is called directly (while holding the
    // mutex) and its result is returned; false if no callback is set.
    // Returns false if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(val);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Wake up the worker so that it retries an element which was rejected
    // by the callback. Returns false if the worker is not in state kRun.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    // Construct a new element in place at the end of the queue from the
    // given arguments (perfectly forwarded) and wake up the worker.
    // In prompt mode a temporary T is constructed and passed to the
    // callback directly; false if no callback is set.
    // Returns false if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(T(std::forward<Args>(args)...));

        if (fState==kIdle)
            return false;

        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Move val into the queue
    bool post(T &&val) { return emplace(std::move(val)); }
#endif

    // Transfer the element referenced by i from the list x to the end of
    // the queue (splice, no copy) and wake up the worker.
    // Returns false if the queue is idle.
    // NOTE(review): in prompt mode the element is queued rather than
    // passed to the callback -- confirm whether this is intended.
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List&& x, typename List::iterator i)
#else
    bool move(List& x, typename List::iterator i)
#endif
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
#endif

    // Number of queued elements.
    // NOTE(review): size(), empty() and operator< read fSize without
    // taking fMutex; the result is only a snapshot.
    size_t size() const
    {
        return fSize;
    }

    bool empty() const
    {
        return fSize==0;
    }

    // Order queues by their current number of queued elements
    bool operator<(const Queue& other) const
    {
        return fSize < other.fSize;
    }

};
321
322#endif
Note: See TracBrowser for help on using the repository browser.