Changeset 17220


Ignore:
Timestamp:
10/15/13 15:22:42 (11 years ago)
Author:
tbretz
Message:
Reverting to last revision.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/queue.h

    r17219 r17220  
    1010{
    1111    size_t fSize;                 // Only necessary for before C++11
    12     bool   fSort;                 // Sort the list before processing
    1312
    1413    std::list<T> fList;
    1514
    16     std::mutex fMutex;             // Mutex needed for the conditional
     15    std::mutex fMutex;        // Mutex needed for the conditional
    1716    std::condition_variable fCond; // Conditional
    1817
     
    2322        kStop,
    2423        kAbort,
    25         kTrigger
    2624    };
    2725
    2826    state_t fState;               // Stop signal for the thread
    2927
    30     typedef std::function<bool(const T &)> callback;
     28    typedef std::function<void(const T &)> callback;
    3129    callback fCallback;       // Callback function called by the thread
    3230
     
    3735        std::unique_lock<std::mutex> lock(fMutex);
    3836
    39         // No filling allowed by default (the queue is
    40         // always processed until it is empty)
    41         size_t allowed = 0;
    42 
    4337        while (1)
    4438        {
    45             while (fSize==allowed && fState==kRun)
     39            while (fList.empty() && fState==kRun)
    4640                fCond.wait(lock);
    4741
    48             // Check if the State flag has been changed
    4942            if (fState==kAbort)
    5043                break;
     
    5346                break;
    5447
    55             // If thread got just woken up, move back the state to kRun
    56             if (fState == kTrigger)
    57                 fState = kRun;
    58 
    59             // Could have been a fState==kTrigger case
    60             if (fList.empty())
    61                 continue;
    62 
    63             // During the unlocked state, fSize might change.
    64             // The current size of the queue needs to be saved.
    65             allowed = fSize;
    66 
    67             // get the first entry from the (sorted) list
    68             const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin();
     48            const T &val = fList.front();
    6949
    7050            // Theoretically, we can loose a signal here, but this is
     
    7252            lock.unlock();
    7353
    74             // If the first event in the queue could not be processed,
    75             // no further processing makes sense until a new event has
    76             // been posted (or the number of events in the queue has
    77             // changed)  [allowed>0], in the case processing was
    78             // successfull [alloed==0], the next event will be processed
    79             // immediately.
    80             if (!fCallback || !fCallback(*it))
    81                 allowed = 0;
     54            if (fCallback)
     55                fCallback(val);
    8256
    8357            lock.lock();
    8458
    85             // Whenever an event was successfully processed, allowed
    86             // is larger than zero and thus the event will be popped
    87             if (allowed==0)
    88                 continue;
    89 
    90             if (fSort)
    91                 fList.erase(it);
    92             else
    93                 fList.pop_front() ;
    94 
     59            fList.pop_front();
    9560            fSize--;
    96             allowed--;
    97 
    9861        }
    9962
     
    10568
    10669public:
    107     Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)
     70    Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f)
    10871    {
    109         if (startup)
    110             start();
     72        start();
    11173    }
    112 
    113     Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)
    114     {
    115     }
    116 
    117     Queue<T>& operator = (const Queue<T>& q)
    118     {
    119         fSize     = 0;
    120         fSort     = q.fSort;
    121         fState    = kIdle;
    122         fCallback = q.fCallback;
    123         return *this;
    124     }
    125 
    12674    ~Queue()
    12775    {
     
    185133    {
    186134        const std::lock_guard<std::mutex> lock(fMutex);
    187 
    188135        if (fState==kIdle)
    189136            return false;
     
    192139        fSize++;
    193140
    194         fCond.notify_one();
    195 
    196         return true;
    197     }
    198 
    199     bool notify()
    200     {
    201         const std::lock_guard<std::mutex> lock(fMutex);
    202         if (fState!=kRun)
    203             return false;
    204 
    205         fState = kTrigger;
    206141        fCond.notify_one();
    207142
Note: See TracChangeset for help on using the changeset viewer.