source: trunk/FACT++/src/queue.h@ 15900

Last change on this file since 15900 was 15625, checked in by tbretz, 12 years ago
Secure fThread.join() against joining a thread which was already stopped.
File size: 2.1 KB
Line 
1#ifndef FACT_Queue
2#define FACT_Queue
3
4#include <thread>
5#include <condition_variable>
6
7template<class T>
8class Queue : std::deque<T>
9{
10 std::mutex fMutex; // Mutex needed for the conditional
11 std::condition_variable fCond; // Conditional
12
13 enum state_t
14 {
15 kRun,
16 kWait,
17 kStop
18 };
19
20 state_t fState; // Stop signal for the thread
21
22 typedef std::function<void(const T &)> callback;
23 callback fCallback; // Callback function called by the thread
24
25 std::thread fThread; // Handle to the thread
26
27 void Thread()
28 {
29 std::unique_lock<std::mutex> lock(fMutex);
30 while (1)
31 {
32 while (std::deque<T>::empty() && fState==kRun)
33 fCond.wait(lock);
34
35 if (fState==kStop)
36 break;
37
38 if (fState==kWait && std::deque<T>::empty())
39 break;
40
41 const T val = std::deque<T>::front();
42 std::deque<T>::pop_front();
43
44 // Theoretically, we can loose a signal here, but this is
45 // not a problem, because then we detect a non-empty queue
46 lock.unlock();
47
48 if (fCallback)
49 fCallback(val);
50
51 lock.lock();
52 }
53 }
54
55public:
56 Queue(const callback &f) : fState(kRun), fCallback(f)
57 {
58 fThread = std::thread(std::bind(&Queue::Thread, this));
59 }
60 ~Queue()
61 {
62 stop();
63 join();
64 }
65
66 void post(const T &val)
67 {
68 const std::lock_guard<std::mutex> lock(fMutex);
69
70 std::deque<T>::push_back(val);
71 fCond.notify_one();
72 }
73
74 void wait()
75 {
76 fMutex.lock();
77 fState = kWait;
78 fCond.notify_one();
79 fMutex.unlock();
80 }
81
82 void stop()
83 {
84 fMutex.lock();
85 fState = kStop;
86 fCond.notify_one();
87 fMutex.unlock();
88 }
89
90 void join()
91 {
92 // This can happen is the thread is not running anymore
93 try
94 {
95 fThread.join();
96 }
97 catch (const std::system_error &)
98 {
99 }
100 }
101};
102
103#endif
Note: See TracBrowser for help on using the repository browser.