source: trunk/Mars/mcore/Queue.h@ 19552

Last change on this file since 19552 was 19087, checked in by tbretz, 6 years ago
Included functional for bind
File size: 7.1 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
4#include <list>
5#include <functional>
6
7#ifndef __CINT__
8#include <thread>
9#include <condition_variable>
10#else
11namespace std
12{
13 class mutex;
14 class thread;
15 class condition_variable;
16 template<class T> class function<T>;
17}
18#endif
19
20
21// The second template argument must support:
22// iterator it = begin(); // get the next element to be processed
23// erase(it); // erase the processed element from the queue
24// push_back(); // add a new element to the queue
25// emplace_back(); // emplace a new element to the queue
26// splice(); // used to efficiently implement post with mutex
27
// Queue<T, List>
//
// A queue of elements of type T which are handed, one at a time and in
// list order, to a user supplied callback. Processing is normally done by
// a dedicated worker thread; alternatively the queue can be switched to
// "prompt execution", in which case post()/emplace() call the callback
// directly in the thread of the caller.
//
// The callback returns true if the element was processed (it is then
// removed from the queue) and false if it could not be processed yet
// (it then stays at the front of the queue and is retried when the
// number of queued elements changes or notify() is called).
template<class T, class List=std::list<T>>
class Queue
{
    size_t fSize;                  // Number of queued elements (kept separately from fList)

    List fList;                    // The queued elements

    std::mutex fMutex;             // Mutex protecting fSize, fList and fState
    std::condition_variable fCond; // Used to wake up the worker thread

    enum state_t
    {
        kIdle,     // No worker running: post()/emplace()/move() are rejected
        kRun,      // Worker running, sleeping whenever there is nothing to do
        kStop,     // Worker terminates as soon as the queue is empty
        kAbort,    // Worker terminates immediately, remaining elements are discarded
        kTrigger,  // Set by notify(): wake the worker once, it then switches back to kRun
        kPrompt    // No worker: post()/emplace() call the callback directly
    };

    state_t fState;                // Current state, also the stop signal for the thread

    typedef std::function<bool(const T &)> callback;
    callback fCallback;            // Callback function called by the thread

    std::thread fThread;           // Handle to the thread

    // Main loop of the worker thread. fMutex is held all the time except
    // while the callback is executed.
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // No filling allowed by default (the queue is
        // always processed until it is empty)
        size_t allowed = 0;

        while (1)
        {
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the State flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // If thread got just woken up, move back the state to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a fState==kTrigger case
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize;

            // get the first entry from the (sorted) list
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a non-empty queue
            lock.unlock();

            // If the first event in the queue could not be processed,
            // no further processing makes sense until a new event has
            // been posted (or the number of events in the queue has
            // changed) [allowed>0], in the case processing was
            // successful [allowed==0], the next event will be processed
            // immediately.

            if (fCallback && fCallback(*it))
                allowed = 0;

            lock.lock();

            // Whenever an event was successfully processed, allowed
            // is equal to zero and thus the event will be popped
            if (allowed>0)
                continue;

            fList.erase(it);
            fSize--;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // Create a queue which hands its elements to f. If startup is true
    // the worker thread is started immediately.
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Only the callback is copied: the new queue is empty, idle and
    // has no worker thread.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Takes over the callback of q and resets size and state.
    // NOTE(review): neither locks fMutex nor touches fList or fThread,
    // so presumably this is only meant for idle queues -- confirm.
    Queue<T,List>& operator = (const Queue<T,List>& q)
    {
        fSize     = 0;
        fState    = kIdle;
        fCallback = q.fCallback;
        return *this;
    }

#ifdef __MARS__ // Needed for the compilation of the dictionary
    Queue() : fSize(0), fState(kIdle)
    {
    }
#endif

    // Aborts a running worker (remaining elements are discarded)
    // and joins it.
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A previous worker which terminated by itself (after stop() or
        // abort()) has set fState to kIdle but is still joinable.
        // Assigning to a joinable std::thread calls std::terminate, so it
        // has to be joined first. This can not block: the worker only
        // sets kIdle right before it returns and releases fMutex.
        if (fThread.joinable())
            fThread.join();

        // Create the thread first: if this throws, the state is untouched.
        // The new thread can not inspect fState before the lock is released.
        fThread = std::thread(&Queue::Thread, this);
        fState  = kRun;
        return true;
    }

    // Ask the worker to terminate once the queue is empty. Does not wait
    // for the thread. Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Ask the worker to terminate immediately, discarding the remaining
    // elements. Does not wait for the thread. Returns false if the queue
    // is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Wait until the worker thread has terminated. If no termination has
    // been requested yet, request it first (abrt=true: abort, otherwise
    // stop). Returns false if there was no running worker (queue idle or
    // in prompt execution), true otherwise.
    bool wait(bool abrt=false)
    {
        bool running = false;

        {
            const std::lock_guard<std::mutex> lock(fMutex);

            running = fState!=kIdle && fState!=kPrompt;

            // kTrigger is a running worker as well. Without requesting
            // termination here, the join below would never return.
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        // Also join a worker which has already finished by itself
        // (fState is kIdle or kPrompt then). Otherwise the destructor
        // of a still joinable std::thread would call std::terminate.
        if (fThread.joinable())
            fThread.join();

        return running;
    }

    // Switch an idle and empty queue to prompt execution.
    // Returns false if the queue is not idle or not empty.
    bool enablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle || fSize>0)
            return false;

        fState = kPrompt;
        return true;
    }

    // Switch from prompt execution back to idle.
    // Returns false if the queue is not in prompt execution.
    bool disablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kPrompt)
            return false;

        fState = kIdle;
        return true;
    }

    bool setPromptExecution(bool state)
    {
        return state ? enablePromptExecution() : disablePromptExecution();
    }


    // Append a copy of val to the queue and wake up the worker.
    // In prompt execution the callback is called directly (while fMutex
    // is held) and its result is returned. Returns false if the queue
    // is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback(val);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Wake up the worker to retry the first element although the number
    // of queued elements did not change. Returns false if the worker is
    // not in the kRun state.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

#if defined(__GXX_EXPERIMENTAL_CXX0X__) || __cplusplus >= 201103L
    // Same as post(), but the element is constructed in place from the
    // given arguments. The arguments are perfectly forwarded, so that
    // rvalues are moved rather than copied (required for move-only T).
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback(T(std::forward<Args>(args)...));

        if (fState==kIdle)
            return false;

        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    bool post(T &&val) { return emplace(std::move(val)); }
#endif

    // Transfer the element i of the list x to the end of the queue
    // (splice, no copy) and wake up the worker. Returns false if the
    // queue is idle.
#if defined(__GXX_EXPERIMENTAL_CXX0X__) || __cplusplus >= 201103L
    bool move(List&& x, typename List::iterator i)
#else
    bool move(List& x, typename List::iterator i)
#endif
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

#if defined(__GXX_EXPERIMENTAL_CXX0X__) || __cplusplus >= 201103L
    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
#endif

    // The following accessors read fSize without locking fMutex.
    // While a worker is running the result is only a snapshot.

    size_t size() const
    {
        return fSize;
    }

    bool empty() const
    {
        return fSize==0;
    }

    // Order queues by the number of queued elements
    bool operator<(const Queue& other) const
    {
        return fSize < other.fSize;
    }

};
320
321#endif
Note: See TracBrowser for help on using the repository browser.