Changeset 17219 for trunk/FACT++
- Timestamp:
- 10/15/13 15:05:54 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/queue.h
r16562 r17219 10 10 { 11 11 size_t fSize; // Only necessary for before C++11 12 bool fSort; // Sort the list before processing 12 13 13 14 std::list<T> fList; 14 15 15 std::mutex fMutex; // Mutex needed for the conditional16 std::mutex fMutex; // Mutex needed for the conditional 16 17 std::condition_variable fCond; // Conditional 17 18 … … 22 23 kStop, 23 24 kAbort, 25 kTrigger 24 26 }; 25 27 26 28 state_t fState; // Stop signal for the thread 27 29 28 typedef std::function< void(const T &)> callback;30 typedef std::function<bool(const T &)> callback; 29 31 callback fCallback; // Callback function called by the thread 30 32 … … 34 36 { 35 37 std::unique_lock<std::mutex> lock(fMutex); 38 39 // No filling allowed by default (the queue is 40 // always processed until it is empty) 41 size_t allowed = 0; 36 42 37 43 while (1) 38 44 { 39 while (f List.empty()&& fState==kRun)45 while (fSize==allowed && fState==kRun) 40 46 fCond.wait(lock); 41 47 48 // Check if the State flag has been changed 42 49 if (fState==kAbort) 43 50 break; … … 46 53 break; 47 54 48 const T &val = fList.front(); 55 // If thread got just woken up, move back the state to kRun 56 if (fState == kTrigger) 57 fState = kRun; 58 59 // Could have been a fState==kTrigger case 60 if (fList.empty()) 61 continue; 62 63 // During the unlocked state, fSize might change. 64 // The current size of the queue needs to be saved. 65 allowed = fSize; 66 67 // get the first entry from the (sorted) list 68 const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin(); 49 69 50 70 // Theoretically, we can loose a signal here, but this is … … 52 72 lock.unlock(); 53 73 54 if (fCallback) 55 fCallback(val); 74 // If the first event in the queue could not be processed, 75 // no further processing makes sense until a new event has 76 // been posted (or the number of events in the queue has 77 // changed) [allowed>0], in the case processing was 78 // successfull [alloed==0], the next event will be processed 79 // immediately. 80 if (!fCallback || !fCallback(*it)) 81 allowed = 0; 56 82 57 83 lock.lock(); 58 84 59 fList.pop_front(); 85 // Whenever an event was successfully processed, allowed 86 // is larger than zero and thus the event will be popped 87 if (allowed==0) 88 continue; 89 90 if (fSort) 91 fList.erase(it); 92 else 93 fList.pop_front() ; 94 60 95 fSize--; 96 allowed--; 97 61 98 } 62 99 … … 68 105 69 106 public: 70 Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f) 71 { 72 start(); 73 } 107 Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f) 108 { 109 if (startup) 110 start(); 111 } 112 113 Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback) 114 { 115 } 116 117 Queue<T>& operator = (const Queue<T>& q) 118 { 119 fSize = 0; 120 fSort = q.fSort; 121 fState = kIdle; 122 fCallback = q.fCallback; 123 return *this; 124 } 125 74 126 ~Queue() 75 127 { … … 133 185 { 134 186 const std::lock_guard<std::mutex> lock(fMutex); 187 135 188 if (fState==kIdle) 136 189 return false; … … 139 192 fSize++; 140 193 194 fCond.notify_one(); 195 196 return true; 197 } 198 199 bool notify() 200 { 201 const std::lock_guard<std::mutex> lock(fMutex); 202 if (fState!=kRun) 203 return false; 204 205 fState = kTrigger; 141 206 fCond.notify_one(); 142 207
Note:
See TracChangeset
for help on using the changeset viewer.