source: trunk/FACT++/src/queue.h@ 16411

Last change on this file since 16411 was 16283, checked in by tbretz, 12 years ago
In analogy to the container functions, added post and emplace
File size: 3.3 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
#include <cstddef>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
6
// Queue<T>: a FIFO of T that is drained by a dedicated worker thread.
//
// Every element handed over via post()/emplace() is passed, in order, to the
// callback given to the constructor. The callback is executed on the worker
// thread with the internal mutex released, so producers may keep posting
// while an element is being processed.
//
// States:
//   kIdle  - no worker is processing; post()/emplace() are rejected
//   kRun   - worker processes elements and sleeps while the list is empty
//   kStop  - worker processes the remaining elements, then terminates
//   kAbort - worker terminates after the element currently being processed
//            (if any); all remaining elements are discarded
template<class T>
class Queue : std::list<T>
{
    mutable std::mutex fMutex;      // Protects the list, fSize and fState (mutable: locked in size() const)
    std::condition_variable fCond;  // Wakes the worker on new data or on a state change

    enum state_t
    {
        kIdle,
        kRun,
        kStop,
        kAbort,
    };

    size_t fSize;                   // Element counter (std::list::size() is not O(1) before C++11)

    state_t fState;                 // Stop signal for the thread

    typedef std::function<void(const T &)> callback;
    callback fCallback;             // Callback function called by the thread

    std::thread fThread;            // Handle to the thread

    // Worker main loop. Runs until the state becomes kAbort, or until it
    // becomes kStop and the list has been drained. On exit the list is
    // cleared and the state is reset to kIdle (still under the mutex).
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        while (true)
        {
            // Sleep only while there is nothing to do and nobody asked us to end
            while (std::list<T>::empty() && fState==kRun)
                fCond.wait(lock);

            if (fState==kAbort)
                break;

            if (fState==kStop && std::list<T>::empty())
                break;

            // References to std::list elements stay valid while other
            // threads push_back, so the front element can safely be used
            // with the mutex released.
            const T &val = std::list<T>::front();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a non-empty queue
            lock.unlock();

            if (fCallback)
                fCallback(val);

            lock.lock();

            std::list<T>::pop_front();
            fSize--;
        }

        std::list<T>::clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // Stores the callback and immediately starts the worker thread.
    Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f)
    {
        start();
    }

    // Aborts processing (unless a stop was already requested, in which case
    // the remaining elements are still processed) and joins the worker.
    ~Queue()
    {
        wait(true);
    }

    // Starts the worker thread.
    // Returns false (and does nothing) if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A worker which ended after stop()/abort() without a subsequent
        // wait() is finished but still joinable. Assigning to a joinable
        // std::thread calls std::terminate, so reap it first. This cannot
        // block on the mutex: the worker sets kIdle as its last action
        // before releasing the mutex for good.
        if (fThread.joinable())
            fThread.join();

        // Create the thread before changing the state, so that a throwing
        // std::thread constructor leaves the queue idle. The new worker
        // blocks on fMutex until this function returns.
        fThread = std::thread(&Queue::Thread, this);
        fState = kRun;
        return true;
    }

    // Requests the worker to terminate after all queued elements have been
    // processed. Does not block. Returns false if the queue is idle.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Requests the worker to terminate as soon as possible, discarding all
    // elements not yet processed. Does not block. Returns false if the
    // queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Blocks until the worker thread has terminated. If the queue is still
    // running, a stop (abrt=false) or an abort (abrt=true) is requested
    // first; an already pending stop/abort request is left untouched.
    // Returns false if the queue was already idle when called.
    bool wait(bool abrt=false)
    {
        bool active;
        {
            const std::lock_guard<std::mutex> lock(fMutex);
            active = fState!=kIdle;

            if (fState==kRun)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        // Join outside the lock: the worker needs the mutex to finish.
        // Also join when the queue is already idle, because a worker ended
        // via stop()/abort() is still joinable and destroying a joinable
        // std::thread calls std::terminate.
        if (fThread.joinable())
            fThread.join();

        return active;
    }

    // Appends a copy of val and wakes the worker.
    // Returns false (nothing is queued) if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        std::list<T>::push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

#if defined(__GXX_EXPERIMENTAL_CXX0X__) || __cplusplus >= 201103L
    // Constructs a new element in place from args (perfectly forwarded)
    // and wakes the worker.
    // Returns false (nothing is queued) if the queue is idle.
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        std::list<T>::emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Appends val by moving it into the queue.
    bool post(T &&val) { return emplace(std::move(val)); }
#endif

    // Number of elements currently queued, including one that is being
    // processed by the callback at the moment.
    size_t size() const
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        return fSize;
    }
};
168
169#endif
Note: See TracBrowser for help on using the repository browser.