source: trunk/Mars/mcore/Queue.h@ 17219

Last change on this file since 17219 was 17219, checked in by lyard, 11 years ago
draft zofits working with Queue and MemoryManager
File size: 5.9 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
7
// Work queue with a dedicated worker thread.
//
// Elements are appended with post()/emplace()/move() and handed, one at a
// time, to a callback executed in the worker thread. The callback is called
// without the internal mutex held, so it may itself call size(), post(), ...
//
// The callback returns true if the element was processed (it is then removed
// from the queue and the next one is processed immediately) or false if it
// could not be processed yet (it stays in the queue and is retried as soon as
// the queue size changes, notify() is called, or the queue is stopped).
//
// start(), wait() and the destructor join the worker thread outside or inside
// the lock respectively and therefore must not be called concurrently with
// each other from different threads.
template<class T>
class Queue
{
    size_t fSize;                   // Number of elements in fList (list::size() may be O(n) before C++11)
    bool   fSort;                   // Process the smallest element (operator<) first instead of the oldest

    std::list<T> fList;             // Pending elements

    mutable std::mutex      fMutex; // Protects fSize, fList and fState (mutable: locked in const accessors)
    std::condition_variable fCond;  // Signals the worker about new elements or state changes

    enum state_t
    {
        kIdle,    // No worker is processing; new elements are rejected
        kRun,     // Worker is running
        kStop,    // Worker shall process the remaining elements and then terminate
        kAbort,   // Worker shall terminate at once; remaining elements are discarded
        kTrigger  // Worker shall retry the pending element and fall back to kRun
    };

    state_t fState;                 // Current state / request for the worker

    typedef std::function<bool(const T &)> callback;
    callback fCallback;             // Function called by the worker for each element

    std::thread fThread;            // Handle to the worker thread

    // Body of the worker thread.
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // Queue size at which there is nothing to do for the worker:
        // zero by default (the queue is processed until it is empty), or
        // the size the queue had when the callback last rejected an element.
        size_t allowed = 0;

        while (1)
        {
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the state has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // The worker was woken up explicitly: go back to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a kTrigger on an empty queue
            if (fList.empty())
                continue;

            // While the mutex is released, fSize might change, so the
            // size at the time of this attempt has to be remembered.
            allowed = fSize;

            // Get the next entry (the smallest one if sorting is enabled).
            // std::list iterators stay valid while other elements are added.
            const auto it = fSort ? std::min_element(fList.begin(), fList.end()) : fList.begin();

            // A signal sent while the mutex is released is not lost:
            // the changed size or state is detected by the checks above.
            lock.unlock();

            const bool processed = fCallback && fCallback(*it);

            lock.lock();

            // The element could not be processed: keep it. 'allowed' still
            // holds the saved size, so in kRun the worker waits until the
            // size or the state changes instead of retrying in a busy loop.
            if (!processed)
                continue;

            // The element was processed: remove it and allow the next
            // element to be processed immediately.
            fList.erase(it);
            fSize--;

            allowed = 0;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // f:       callback called by the worker for each element
    // sort:    if true, always process the smallest element first
    // startup: if true, the worker thread is started immediately
    Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copies only the configuration (callback and sort flag). The new queue
    // is empty and idle; start() has to be called explicitly.
    Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Copies only the configuration (callback and sort flag). A running
    // worker is aborted and joined first and pending elements are dropped;
    // afterwards the queue is empty and idle.
    Queue<T>& operator = (const Queue<T>& q)
    {
        if (this==&q)
            return *this;

        // Never touch the state behind the back of a running worker
        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);

        fList.clear();

        fSize     = 0;
        fSort     = q.fSort;
        fState    = kIdle;
        fCallback = q.fCallback;

        return *this;
    }

    // Aborts the worker (unless a stop is already in progress, which is
    // completed) and joins it.
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A previous worker which terminated after stop()/abort() might not
        // have been joined yet. Assigning to a joinable std::thread would
        // call std::terminate. Joining under the lock is safe: kIdle is the
        // last thing the old worker sets before it releases the mutex.
        if (fThread.joinable())
            fThread.join();

        fState = kRun;
        fThread = std::thread(&Queue::Thread, this);
        return true;
    }

    // Request the worker to process all remaining elements and terminate.
    // Does not block. Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Request the worker to terminate at once, discarding all remaining
    // elements. Does not block. Returns false if the queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Block until the worker thread has terminated. If no stop or abort has
    // been requested yet, request a stop (abrt==false) or an abort
    // (abrt==true) first. Returns false if there was no thread to wait for.
    bool wait(bool abrt=false)
    {
        {
            const std::lock_guard<std::mutex> lock(fMutex);

            // kTrigger is just a transient flavour of kRun: without the
            // request below the join could block forever.
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        // Also join a worker which already terminated by itself (kIdle):
        // destroying or re-assigning a joinable std::thread calls
        // std::terminate.
        if (!fThread.joinable())
            return false;

        fThread.join();
        return true;
    }

    // Append a copy of val. Returns false (and does nothing) if the queue
    // is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Wake up the worker to retry an element which was rejected by the
    // callback. Returns false if the queue is not in the running state.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

    // Construct a new element in place from args. Returns false (and does
    // nothing) if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        // Forward to preserve the value category (move instead of copy)
        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Append val by moving it into the queue.
    bool post(T &&val) { return emplace(std::move(val)); }

    // Transfer the element i of the list x to the end of the queue without
    // copying it (splice). Returns false (and does nothing) if the queue is
    // idle.
    bool move(std::list<T>&& x, typename std::list<T>::iterator i)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

    bool move(std::list<T>& x, typename std::list<T>::iterator i) { return move(std::move(x), i); }

    // Number of elements currently in the queue (including the one which
    // might currently be processed by the callback).
    size_t size() const
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        return fSize;
    }

    // True if there are no elements in the queue.
    bool empty() const
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        return fSize==0;
    }
};
262
263#endif
Note: See TracBrowser for help on using the repository browser.