source: trunk/Mars/mcore/Queue.h@ 17289

Last change on this file since 17289 was 17283, checked in by lyard, 11 years ago
added no-wait condition when processing is immediate
File size: 6.8 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
7
8// The second template argument must support:
9// iterator it = begin(); // get the next element to be processed
10// erase(it); // erase the processed element from the queue
11// push_back(); // add a new element to the queue
12// emplace_back(); // emplace a new element to the queue
13// splice(); // used to efficiently implement post with mutex
14
15using namespace std;
16
// Work queue which hands its elements, one at a time and in list order,
// to a callback.
//
// Two modes of operation exist:
//  - threaded (start()): posted elements are stored in a List and a worker
//    thread calls the callback for the first element of the list. An
//    element is removed only if the callback returned true. If it returned
//    false, the element stays at the front and the callback is called for
//    it again once the number of queued elements has changed, notify() was
//    called, or the queue is being stopped.
//  - prompt (enablePromptExecution()): no thread and no storage, the
//    callback is called directly from within post()/emplace()/move().
//
// All state is protected by fMutex. In threaded mode the callback runs
// with the mutex released, in prompt mode it runs with the mutex held,
// so a prompt callback must not call back into any locking member of the
// same queue (size() and empty() do not lock and are safe).
template<class T, class List=std::list<T>>
class Queue
{
    // Number of elements in fList. Kept as an own counter because the
    // container's size() is not required to be O(1). Atomic, so that
    // size(), empty() and operator< can read it without taking fMutex.
    std::atomic<size_t> fSize;

    List fList;                     // Elements waiting to be processed

    std::mutex fMutex;              // Protects fList, fState and fThread
    std::condition_variable fCond;  // Wakes up the worker thread

    enum state_t
    {
        kIdle,     // No worker running, posting is rejected
        kRun,      // Worker running
        kStop,     // Worker shall end as soon as the list is empty
        kAbort,    // Worker shall end immediately, list is discarded
        kTrigger,  // Worker shall re-evaluate the first element (notify())
        kPrompt    // No worker, callback is executed directly when posting
    };

    state_t fState;                 // Current state / request to the worker

    typedef std::function<bool(const T &)> callback;
    callback fCallback;             // Callback function called for each element

    std::thread fThread;            // Handle to the worker thread

    // Body of the worker thread. Holds fMutex all the time except while
    // the callback is executing. On exit the list is cleared and the
    // state is set back to kIdle.
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // Size of the list at which the worker goes to sleep. Zero means:
        // process until the list is empty. A non-zero value is set after
        // the callback refused the first element, then nothing is done
        // until the size of the list has changed.
        size_t allowed = 0;

        while (1)
        {
            while (fSize.load()==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the state flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // The thread got just woken up by notify(): back to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a kTrigger on an empty list
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize.load();

            // Get the first entry from the (sorted) list
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a changed size
            lock.unlock();

            // An empty callback is treated like a callback which
            // could not process the element
            const bool processed = fCallback && fCallback(*it);

            lock.lock();

            // The element could not be processed: keep it and keep
            // 'allowed' at the saved size so that the loop sleeps until
            // something changes
            if (!processed)
                continue;

            // Successfully processed: remove the element and go on
            // with the next one immediately
            allowed = 0;

            fList.erase(it);
            fSize--;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // Create a queue calling f for each element. If startup is true,
    // the worker thread is started immediately.
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copying only copies the callback. The new queue is empty and idle.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Assignment resets this queue to empty and idle and takes over the
    // callback of q. A running worker is aborted and joined first, so
    // that neither the state nor the callback is changed underneath it.
    Queue<T,List>& operator = (const Queue<T,List>& q)
    {
        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);

        fList.clear();
        fSize = 0;
        fState = kIdle;
        fCallback = q.fCallback;

        return *this;
    }

    // Aborts a running worker and joins the thread.
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A worker which ended after stop()/abort() sets the state to
        // kIdle itself but is still joinable. It does not need the mutex
        // anymore (we hold it), so joining here cannot block for long.
        // Assigning to a joinable std::thread would call std::terminate.
        if (fThread.joinable())
            fThread.join();

        // Create the thread first: if that throws, the state stays kIdle.
        // The new thread blocks on fMutex until this function returns.
        fThread = std::thread([this]() { Thread(); });
        fState = kRun;

        return true;
    }

    // Request the worker to end as soon as the list is empty. Does not
    // wait for the thread. Returns false if no worker is running (idle
    // or prompt mode). A pending abort is not downgraded.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle || fState==kPrompt)
            return false;

        if (fState!=kAbort)
            fState = kStop;

        fCond.notify_one();

        return true;
    }

    // Request the worker to end immediately, discarding all queued
    // elements. Does not wait for the thread. Returns false if no worker
    // is running (idle or prompt mode).
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle || fState==kPrompt)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Wait until the worker thread has ended. If no stop or abort has
    // been requested yet, a stop (abrt==false) or an abort (abrt==true)
    // is requested. Returns false if the queue was idle or in prompt
    // mode when called; a worker which has already ended but was never
    // joined is joined in that case as well.
    bool wait(bool abrt=false)
    {
        bool active = false;

        {
            const std::lock_guard<std::mutex> lock(fMutex);

            active = fState!=kIdle && fState!=kPrompt;

            // kTrigger is a running worker which has not yet picked up
            // a notify(). Without a request it would fall back to kRun
            // and the join below would never return.
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        if (fThread.joinable())
            fThread.join();

        return active;
    }

    // Switch to prompt mode. Only possible if the queue is idle and empty.
    bool enablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle || fSize.load()>0)
            return false;

        fState = kPrompt;
        return true;
    }

    // Switch from prompt mode back to idle.
    bool disablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kPrompt)
            return false;

        fState = kIdle;
        return true;
    }

    // Enable (true) or disable (false) prompt mode.
    bool setPromptExecution(bool state)
    {
        return state ? enablePromptExecution() : disablePromptExecution();
    }

    // Add a copy of val to the queue. In prompt mode, the callback is
    // called directly and its result is returned (false if no callback
    // is set). Returns false if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(val);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Ask the worker to call the callback again for the first element
    // although the size of the list has not changed. Returns false if
    // the queue is not in the running state.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

    // Construct a new element in place at the end of the queue from the
    // given arguments (perfectly forwarded). In prompt mode, a temporary
    // element is constructed and handed to the callback directly.
    // Returns false if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(T(std::forward<Args>(args)...));

        if (fState==kIdle)
            return false;

        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Move val into the queue (see emplace).
    bool post(T &&val) { return emplace(std::move(val)); }

    // Transfer the element at position i of the list x to the end of
    // the queue without copying it. Returns true if the element has left
    // x, false if x is untouched (queue idle, or, in prompt mode, the
    // callback is not set or returned false). In prompt mode the element
    // is handed to the callback directly and erased from x on success.
    bool move(List&& x, typename List::iterator i)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        if (fState==kPrompt)
        {
            // There is no worker which would ever look at fList
            if (!fCallback || !fCallback(*i))
                return false;

            x.erase(i);
            return true;
        }

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Same as above for a list which is not an rvalue.
    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }

    // Number of elements currently waiting in the queue.
    size_t size() const
    {
        return fSize.load();
    }

    // True if no element is waiting in the queue.
    bool empty() const
    {
        return fSize.load()==0;
    }

    // Order queues by the number of waiting elements.
    bool operator<(const Queue& other) const
    {
        return fSize.load() < other.fSize.load();
    }

};
303
304#endif
Note: See TracBrowser for help on using the repository browser.