source: branches/Mars_IncreaseNsb/mcore/Queue.h@ 19344

Last change on this file since 19344 was 17747, checked in by tbretz, 11 years ago
'using namespace std' should never appear in header
File size: 7.1 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
4#include <list>
5
6#ifndef __CINT__
7#include <thread>
8#include <condition_variable>
9#else
10namespace std
11{
12 class mutex;
13 class thread;
14 class condition_variable;
15 template<class T> class function<T>;
16}
17#endif
18
19
// The second template argument (List) must support:
//    iterator it = begin(); // get the next element to be processed
//    erase(it);             // erase the processed element from the queue
//    push_back();           // add a new element to the queue
//    emplace_back();        // construct a new element in place at the end of the queue
//    splice();              // used by move() to transfer an element from another list
//    clear(), empty(), end()
26
// Work queue whose elements are handed, one at a time and in list order,
// to a user supplied callback running in a dedicated worker thread.
//
//  - The callback returns true if the element was processed; the element
//    is then removed from the queue. If it returns false, the element
//    stays at the front and processing is retried once the number of
//    queued elements has changed or notify() has been called.
//  - In "prompt" mode no worker thread is used: post()/emplace() invoke
//    the callback directly in the calling thread.
//
// All members which modify the queue take fMutex. size(), empty() and
// operator< read fSize without taking the mutex.
template<class T, class List=std::list<T>>
class Queue
{
    size_t fSize;                   // Number of queued elements (only necessary before C++11)

    List fList;                     // Pending elements, processed front to back

    std::mutex              fMutex; // Mutex protecting fSize, fList and fState (needed for the conditional)
    std::condition_variable fCond;  // Conditional used to wake up the worker thread

    enum state_t
    {
        kIdle,     // No worker is processing, post()/emplace()/move() are rejected
        kRun,      // Worker is running and sleeps while there is nothing to do
        kStop,     // Worker processes the remaining elements and terminates
        kAbort,    // Worker terminates at the next check, remaining elements are dropped
        kTrigger,  // Worker got a wake-up request from notify()
        kPrompt    // No worker, post()/emplace() call the callback directly
    };

    state_t fState;                 // Stop signal for the thread

    typedef std::function<bool(const T &)> callback;
    callback fCallback;             // Callback function called by the thread

    std::thread fThread;            // Handle to the thread

    // Main loop of the worker thread. Runs until the state is switched
    // to kAbort, or to kStop with an empty queue. On exit the queue is
    // cleared and the state is set back to kIdle (still under the lock).
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // No filling allowed by default (the queue is
        // always processed until it is empty)
        size_t allowed = 0;

        while (1)
        {
            // Sleep as long as the number of elements is unchanged since
            // the last (failed) attempt and nobody changed the state
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the State flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // If thread got just woken up, move back the state to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a fState==kTrigger case
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize;

            // get the first entry from the (sorted) list
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a non-empty queue
            lock.unlock();

            // If the first event in the queue could not be processed,
            // no further processing makes sense until a new event has
            // been posted (or the number of events in the queue has
            // changed) [allowed>0], in the case processing was
            // successful [allowed==0], the next event will be processed
            // immediately.

            if (fCallback && fCallback(*it))
                allowed = 0;

            lock.lock();

            // Whenever an event was successfully processed, allowed
            // is equal to zero and thus the event will be popped
            if (allowed>0)
                continue;

            fList.erase(it);
            fSize--;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // Create a queue which calls f for each element. If startup is true
    // the worker thread is started immediately.
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copying only copies the callback. The new queue is empty and idle.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Assignment only takes over the callback of q. This queue ends up
    // empty and idle.
    Queue<T,List>& operator = (const Queue<T,List>& q)
    {
        if (this==&q)
            return *this;

        // A worker which is still running must be shut down and joined
        // first. Resetting fState behind its back would leave it spinning
        // in its loop with the mutex held.
        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);

        fList.clear();
        fSize     = 0;
        fState    = kIdle;
        fCallback = q.fCallback;

        return *this;
    }

#ifdef __MARS__ // Needed for the compilation of the dictionary
    Queue() : fSize(0), fState(kIdle)
    {
    }
#endif

    // Requests an abort of a running worker and joins it.
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A worker which terminated by itself (after stop()/abort()) is
        // still joinable. Assigning to a joinable std::thread calls
        // std::terminate, so reap it first. The old worker has set kIdle
        // as its last action under the lock, hence it does not need the
        // mutex anymore and joining here cannot deadlock.
        if (fThread.joinable())
            fThread.join();

        // Create the thread before switching the state: if the creation
        // throws, the queue stays idle. The new worker blocks on fMutex
        // until this function returns.
        fThread = std::thread(std::bind(&Queue::Thread, this));
        fState  = kRun;

        return true;
    }

    // Ask the worker to process the remaining elements and terminate.
    // Does not wait for the worker. Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Ask the worker to terminate at its next state check.
    // Does not wait for the worker. Returns false if the queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Join the worker thread. If no stop or abort was requested yet,
    // request one first (abort if abrt is true, stop otherwise).
    // Returns true if a worker thread was joined, false if there was none.
    bool wait(bool abrt=false)
    {
        {
            const std::lock_guard<std::mutex> lock(fMutex);

            // Decide on the thread handle rather than on fState: a worker
            // which finished by itself has already set kIdle but must
            // still be joined before fThread is destroyed or reassigned.
            if (!fThread.joinable())
                return false;

            // kTrigger is a running worker which has not yet consumed a
            // notify(); without a stop request the join would never return.
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        fThread.join();
        return true;
    }

    // Switch an idle and empty queue to prompt execution.
    bool enablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle || fSize>0)
            return false;

        fState = kPrompt;
        return true;
    }

    // Switch from prompt execution back to idle.
    bool disablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kPrompt)
            return false;

        fState = kIdle;
        return true;
    }

    bool setPromptExecution(bool state)
    {
        return state ? enablePromptExecution() : disablePromptExecution();
    }


    // Append a copy of val to the queue and wake up the worker.
    // In prompt mode the callback is called directly (with the mutex
    // held) and its result is returned; an empty callback yields false.
    // Returns false if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(val);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Wake up the worker so that it retries the element at the front
    // of the queue. Returns false if the worker is not in state kRun.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    // Construct a new element in place at the end of the queue and wake
    // up the worker. In prompt mode a temporary T is constructed and
    // passed to the callback directly. Returns false if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(T(std::forward<Args>(args)...));

        if (fState==kIdle)
            return false;

        // Forward the arguments so that rvalues are moved, not copied
        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    bool post(T &&val) { return emplace(std::move(val)); }
#endif

    // Transfer the element i from the list x to the end of the queue
    // (via splice) and wake up the worker. Returns false if the queue
    // is idle.
    // NOTE(review): in prompt mode the element is queued although no
    // worker consumes the list; post()/emplace() call the callback
    // instead -- confirm the intended behaviour.
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List&& x, typename List::iterator i)
#else
    bool move(List& x, typename List::iterator i)
#endif
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
#endif

    // Number of queued elements. Read without taking the mutex.
    size_t size() const
    {
        return fSize;
    }

    // True if no element is queued. Read without taking the mutex.
    bool empty() const
    {
        return fSize==0;
    }

    // Order queues by their number of queued elements
    // (both sizes are read without taking the mutexes).
    bool operator<(const Queue& other) const
    {
        return fSize < other.fSize;
    }

};
319
320#endif
Note: See TracBrowser for help on using the repository browser.