source: trunk/Mars/mcore/Queue.h@ 17270

Last change on this file since 17270 was 17268, checked in by tbretz, 12 years ago
Implemented a <-operator to allow for sorting and the possibility to request prompt execution.
File size: 6.8 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
4#include <list>
5#include <thread>
6#include <condition_variable>
7
8// The second template argument must support:
9// iterator it = begin(); // get the next element to be processed
10// erase(it); // erase the processed element from the queue
11// push_back(); // add a new element to the queue
12// emplace_back(); // emplace a new element to the queue
13// splice(); // used to efficiently implement post with mutex
14
15template<class T, class List=std::list<T>>
16class Queue
17{
18 size_t fSize; // Only necessary for before C++11
19
20 List fList;
21
22 std::mutex fMutex; // Mutex needed for the conditional
23 std::condition_variable fCond; // Conditional
24
25 enum state_t
26 {
27 kIdle,
28 kRun,
29 kStop,
30 kAbort,
31 kTrigger,
32 kPrompt
33
34 };
35
36 state_t fState; // Stop signal for the thread
37
38 typedef std::function<bool(const T &)> callback;
39 callback fCallback; // Callback function called by the thread
40
41 std::thread fThread; // Handle to the thread
42
43 void Thread()
44 {
45 std::unique_lock<std::mutex> lock(fMutex);
46
47 // No filling allowed by default (the queue is
48 // always processed until it is empty)
49 size_t allowed = 0;
50
51 while (1)
52 {
53 while (fSize==allowed && fState==kRun)
54 fCond.wait(lock);
55
56 // Check if the State flag has been changed
57 if (fState==kAbort)
58 break;
59
60 if (fState==kStop && fList.empty())
61 break;
62
63 // If thread got just woken up, move back the state to kRun
64 if (fState==kTrigger)
65 fState = kRun;
66
67 // Could have been a fState==kTrigger case
68 if (fList.empty())
69 continue;
70
71 // During the unlocked state, fSize might change.
72 // The current size of the queue needs to be saved.
73 allowed = fSize;
74
75 // get the first entry from the (sorted) list
76 const auto it = fList.begin();
77
78 // Theoretically, we can loose a signal here, but this is
79 // not a problem, because then we detect a non-empty queue
80 lock.unlock();
81
82 // If the first event in the queue could not be processed,
83 // no further processing makes sense until a new event has
84 // been posted (or the number of events in the queue has
85 // changed) [allowed>0], in the case processing was
86 // successfull [allowed==0], the next event will be processed
87 // immediately.
88 if (!fCallback || !fCallback(*it))
89 allowed = 0;
90
91 lock.lock();
92
93 // Whenever an event was successfully processed, allowed
94 // is larger than zero and thus the event will be popped
95 if (allowed==0)
96 continue;
97
98 fList.erase(it);
99
100 fSize--;
101 allowed--;
102
103 }
104
105 fList.clear();
106 fSize = 0;
107
108 fState = kIdle;
109 }
110
111public:
112 Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
113 {
114 if (startup)
115 start();
116 }
117
118 Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
119 {
120 }
121
122 Queue<T,List>& operator = (const Queue<T,List>& q)
123 {
124 fSize = 0;
125 fState = kIdle;
126 fCallback = q.fCallback;
127 return *this;
128 }
129
130 ~Queue()
131 {
132 wait(true);
133 }
134
135 bool start()
136 {
137 const std::lock_guard<std::mutex> lock(fMutex);
138 if (fState!=kIdle)
139 return false;
140
141 fState = kRun;
142 fThread = std::thread(std::bind(&Queue::Thread, this));
143 return true;
144 }
145
146 bool stop()
147 {
148 const std::lock_guard<std::mutex> lock(fMutex);
149 if (fState==kIdle)
150 return false;
151
152 fState = kStop;
153 fCond.notify_one();
154
155 return true;
156 }
157
158 bool abort()
159 {
160 const std::lock_guard<std::mutex> lock(fMutex);
161 if (fState==kIdle)
162 return false;
163
164 fState = kAbort;
165 fCond.notify_one();
166
167 return true;
168 }
169
170 bool wait(bool abrt=false)
171 {
172 {
173 const std::lock_guard<std::mutex> lock(fMutex);
174 if (fState==kIdle)
175 return false;
176
177 if (fState==kRun)
178 {
179 fState = abrt ? kAbort : kStop;
180 fCond.notify_one();
181 }
182 }
183
184 fThread.join();
185 return true;
186 }
187
188 bool enablePromptExecution()
189 {
190 const std::lock_guard<std::mutex> lock(fMutex);
191 if (fState!=kIdle || fSize>0)
192 return false;
193
194 fState = kPrompt;
195 return true;
196 }
197
198 bool disablePromptExecution()
199 {
200 const std::lock_guard<std::mutex> lock(fMutex);
201 if (fState!=kPrompt)
202 return false;
203
204 fState = kIdle;
205 return true;
206 }
207
208 bool setPromptExecution(bool state)
209 {
210 return state ? enablePromptExecution() : disablePromptExecution();
211 }
212
213
214 bool post(const T &val)
215 {
216 const std::lock_guard<std::mutex> lock(fMutex);
217 if (fState==kPrompt)
218 return fCallback(val);
219
220 if (fState==kIdle)
221 return false;
222
223 fList.push_back(val);
224 fSize++;
225
226 fCond.notify_one();
227
228 return true;
229 }
230
231 bool notify()
232 {
233 const std::lock_guard<std::mutex> lock(fMutex);
234 if (fState!=kRun)
235 return false;
236
237 fState = kTrigger;
238 fCond.notify_one();
239
240 return true;
241 }
242
243#ifdef __GXX_EXPERIMENTAL_CXX0X__
244 template<typename... _Args>
245 bool emplace(_Args&&... __args)
246 {
247 const std::lock_guard<std::mutex> lock(fMutex);
248 if (fState==kPrompt)
249 return fCallback(T(__args...));
250
251 if (fState==kIdle)
252 return false;
253
254 fList.emplace_back(__args...);
255 fSize++;
256
257 fCond.notify_one();
258
259 return true;
260 }
261
262 bool post(T &&val) { return emplace(std::move(val)); }
263#endif
264
265#ifdef __GXX_EXPERIMENTAL_CXX0X__
266 bool move(List&& x, typename List::iterator i)
267#else
268 bool move(List& x, typename List::iterator i)
269#endif
270 {
271 const std::lock_guard<std::mutex> lock(fMutex);
272 if (fState==kIdle)
273 return false;
274
275 fList.splice(fList.end(), x, i);
276 fSize++;
277
278 fCond.notify_one();
279
280 return true;
281 }
282
283#ifdef __GXX_EXPERIMENTAL_CXX0X__
284 bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
285#endif
286
287 size_t size() const
288 {
289 return fSize;
290 }
291
292 bool empty() const
293 {
294 return fSize==0;
295 }
296
297 bool operator<(const Queue& other) const
298 {
299 return fSize < other.fSize;
300 }
301
302};
303
304#endif
Note: See TracBrowser for help on using the repository browser.