Changeset 17220
- Timestamp:
- 10/15/13 15:22:42 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/queue.h
r17219 r17220 10 10 { 11 11 size_t fSize; // Only necessary for before C++11 12 bool fSort; // Sort the list before processing13 12 14 13 std::list<T> fList; 15 14 16 std::mutex fMutex; 15 std::mutex fMutex; // Mutex needed for the conditional 17 16 std::condition_variable fCond; // Conditional 18 17 … … 23 22 kStop, 24 23 kAbort, 25 kTrigger26 24 }; 27 25 28 26 state_t fState; // Stop signal for the thread 29 27 30 typedef std::function< bool(const T &)> callback;28 typedef std::function<void(const T &)> callback; 31 29 callback fCallback; // Callback function called by the thread 32 30 … … 37 35 std::unique_lock<std::mutex> lock(fMutex); 38 36 39 // No filling allowed by default (the queue is40 // always processed until it is empty)41 size_t allowed = 0;42 43 37 while (1) 44 38 { 45 while (f Size==allowed&& fState==kRun)39 while (fList.empty() && fState==kRun) 46 40 fCond.wait(lock); 47 41 48 // Check if the State flag has been changed49 42 if (fState==kAbort) 50 43 break; … … 53 46 break; 54 47 55 // If thread got just woken up, move back the state to kRun 56 if (fState == kTrigger) 57 fState = kRun; 58 59 // Could have been a fState==kTrigger case 60 if (fList.empty()) 61 continue; 62 63 // During the unlocked state, fSize might change. 64 // The current size of the queue needs to be saved. 65 allowed = fSize; 66 67 // get the first entry from the (sorted) list 68 const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin(); 48 const T &val = fList.front(); 69 49 70 50 // Theoretically, we can loose a signal here, but this is … … 72 52 lock.unlock(); 73 53 74 // If the first event in the queue could not be processed, 75 // no further processing makes sense until a new event has 76 // been posted (or the number of events in the queue has 77 // changed) [allowed>0], in the case processing was 78 // successfull [alloed==0], the next event will be processed 79 // immediately. 80 if (!fCallback || !fCallback(*it)) 81 allowed = 0; 54 if (fCallback) 55 fCallback(val); 82 56 83 57 lock.lock(); 84 58 85 // Whenever an event was successfully processed, allowed 86 // is larger than zero and thus the event will be popped 87 if (allowed==0) 88 continue; 89 90 if (fSort) 91 fList.erase(it); 92 else 93 fList.pop_front() ; 94 59 fList.pop_front(); 95 60 fSize--; 96 allowed--;97 98 61 } 99 62 … … 105 68 106 69 public: 107 Queue(const callback &f , bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)70 Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f) 108 71 { 109 if (startup) 110 start(); 72 start(); 111 73 } 112 113 Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)114 {115 }116 117 Queue<T>& operator = (const Queue<T>& q)118 {119 fSize = 0;120 fSort = q.fSort;121 fState = kIdle;122 fCallback = q.fCallback;123 return *this;124 }125 126 74 ~Queue() 127 75 { … … 185 133 { 186 134 const std::lock_guard<std::mutex> lock(fMutex); 187 188 135 if (fState==kIdle) 189 136 return false; … … 192 139 fSize++; 193 140 194 fCond.notify_one();195 196 return true;197 }198 199 bool notify()200 {201 const std::lock_guard<std::mutex> lock(fMutex);202 if (fState!=kRun)203 return false;204 205 fState = kTrigger;206 141 fCond.notify_one(); 207 142
Note:
See TracChangeset
for help on using the changeset viewer.