source: trunk/Mars/mcore/Queue.h@ 17260

Last change on this file since 17260 was 17227, checked in by tbretz, 11 years ago
Remove fSort; replaced the possibility to define a wrapper class which overwrites begin()
File size: 6.0 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
#include <cstddef>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
7
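// The second template argument must support:
//    iterator it = begin();  // get the next element to be processed
//    erase(it);              // erase the processed element from the queue
//    push_back();            // add a new element to the queue
//    emplace_back();         // emplace a new element into the queue
//    splice();               // used by move() to transfer an element from another list without copying it
//    empty(); clear(); end() // used by the worker thread and by move()
// The iterator returned by begin() must stay valid while new elements
// are appended, because elements are posted while one is being processed.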
14
// Queue<T, List>: a work queue which is processed by its own worker thread.
//
// Elements are added with post(), emplace() or move() and are handed, one
// at a time and in the order defined by List::begin(), to the callback
// given to the constructor. The callback is called without the internal
// mutex being held, so it may itself call post(), size(), etc.
//
// Return value of the callback:
//    true  - the element was processed; it is removed from the queue and
//            the next element (if any) is processed immediately
//    false - the element could not be processed (yet); it stays in the
//            queue and is retried when the number of queued elements
//            changes or when notify() is called
//
// All public member functions are serialized by an internal mutex.
// wait() must not be called from within the callback (the worker thread
// would join itself).
template<class T, class List=std::list<T>>
class Queue
{
    size_t fSize;                  // Number of elements in fList (List is not required to offer an O(1) size())

    List fList;                    // The queued elements

    mutable std::mutex fMutex;     // Protects fSize, fList and fState (mutable: size()/empty() are const)
    std::condition_variable fCond; // Signals the worker that the size or the state has changed

    enum state_t
    {
        kIdle,    // No worker is processing the queue
        kRun,     // Worker is processing elements / waiting for new ones
        kStop,    // Worker shall end as soon as the queue is empty
        kAbort,   // Worker shall end immediately, remaining elements are discarded
        kTrigger  // Worker shall retry the first element although the size did not change
    };

    state_t fState;                // Current state / request to the worker thread

    typedef std::function<bool(const T &)> callback;
    callback fCallback;            // Callback function called by the worker thread

    std::thread fThread;           // Handle to the worker thread

    // Body of the worker thread
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // Queue size at which the worker goes to sleep. Zero means:
        // process until the queue is empty. A value larger than zero
        // is the size the queue had when the first element was refused
        // by the callback; it makes no sense to retry before the size
        // has changed (or notify() was called).
        size_t allowed = 0;

        while (true)
        {
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the state flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // If the thread just got woken up by notify(),
            // move the state back to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a notify() on an empty queue
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize;

            // Get the next entry to be processed
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a changed size
            lock.unlock();

            const bool processed = fCallback && fCallback(*it);

            lock.lock();

            // The element was refused: keep it and keep allowed>0, so
            // that (in state kRun) the worker sleeps until something changes
            if (!processed)
                continue;

            // The element was processed: remove it and continue
            // immediately with the next one
            fList.erase(it);
            fSize--;

            allowed = 0;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // f:       callback called for each element (see class description)
    // startup: if true, the worker thread is started immediately
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copies only the callback. The new queue is empty and idle.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Takes over only the callback of q. If this queue is running, its
    // worker is aborted first (queued elements are discarded), so that
    // the queue is empty and idle afterwards, like a copy-constructed one.
    Queue<T,List>& operator = (const Queue<T,List>& q)
    {
        if (this==&q)
            return *this;

        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);
        fCallback = q.fCallback;
        return *this;
    }

    // Ends the worker thread (see wait(true)) before the members are destroyed
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread.
    // Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A worker which ended on its own after stop()/abort() is still
        // joinable; assigning to a joinable std::thread would terminate
        // the program. The old worker has already released the mutex
        // (otherwise we could not hold it), so this does not block.
        if (fThread.joinable())
            fThread.join();

        // The new thread first blocks on fMutex, so it cannot see the
        // state before it is set. Setting the state afterwards keeps
        // the queue idle if the thread could not be created.
        fThread = std::thread(&Queue::Thread, this);
        fState  = kRun;

        return true;
    }

    // Request the worker to end as soon as the queue is empty.
    // Does not wait for the worker. Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Request the worker to end immediately; remaining elements are
    // discarded. Does not wait for the worker.
    // Returns false if the queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Wait until the worker thread has ended. If no stop or abort has
    // been requested yet, request one (abort if abrt is true, stop
    // otherwise). Returns false if there was no worker to wait for.
    bool wait(bool abrt=false)
    {
        {
            const std::lock_guard<std::mutex> lock(fMutex);

            // A worker which has already ended on its own (state is
            // kIdle again) must still be joined
            if (fState==kIdle && !fThread.joinable())
                return false;

            // kTrigger is a running worker as well; without a request
            // to end, the join below would block forever
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        fThread.join();
        return true;
    }

    // Append a copy of val to the queue.
    // Returns false if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Ask the worker to retry the first element although the size of
    // the queue did not change. Returns false if the state is not kRun.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

    // Construct a new element in place at the end of the queue. The
    // arguments are perfectly forwarded to List::emplace_back.
    // Returns false if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Append val to the queue by moving it.
    // Returns false if the queue is idle.
    bool post(T &&val) { return emplace(std::move(val)); }

    // Transfer the element i of the list x to the end of the queue
    // (List::splice, i.e. without copying the element).
    // Returns false if the queue is idle; x is unchanged then.
    bool move(List&& x, typename List::iterator i)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }

    // Number of elements currently in the queue (including the
    // one which is currently processed by the callback)
    size_t size() const
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        return fSize;
    }

    // True if no element is in the queue
    bool empty() const
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        return fSize==0;
    }
};
264
265#endif
Note: See TracBrowser for help on using the repository browser.