Changeset 17219 for trunk/FACT++


Ignore:
Timestamp:
10/15/13 15:05:54 (11 years ago)
Author:
lyard
Message:
draft zofits working with Queue and MemoryManager
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/queue.h

    r16562 r17219  
    1010{
    1111    size_t fSize;                 // Only necessary for before C++11
     12    bool   fSort;                 // Sort the list before processing
    1213
    1314    std::list<T> fList;
    1415
    15     std::mutex fMutex;        // Mutex needed for the conditional
     16    std::mutex fMutex;             // Mutex needed for the conditional
    1617    std::condition_variable fCond; // Conditional
    1718
     
    2223        kStop,
    2324        kAbort,
     25        kTrigger
    2426    };
    2527
    2628    state_t fState;               // Stop signal for the thread
    2729
    28     typedef std::function<void(const T &)> callback;
     30    typedef std::function<bool(const T &)> callback;
    2931    callback fCallback;       // Callback function called by the thread
    3032
     
    3436    {
    3537        std::unique_lock<std::mutex> lock(fMutex);
     38
     39        // No filling allowed by default (the queue is
     40        // always processed until it is empty)
     41        size_t allowed = 0;
    3642
    3743        while (1)
    3844        {
    39             while (fList.empty() && fState==kRun)
     45            while (fSize==allowed && fState==kRun)
    4046                fCond.wait(lock);
    4147
     48            // Check if the State flag has been changed
    4249            if (fState==kAbort)
    4350                break;
     
    4653                break;
    4754
    48             const T &val = fList.front();
     55            // If thread got just woken up, move back the state to kRun
     56            if (fState == kTrigger)
     57                fState = kRun;
     58
     59            // Could have been a fState==kTrigger case
     60            if (fList.empty())
     61                continue;
     62
     63            // During the unlocked state, fSize might change.
     64            // The current size of the queue needs to be saved.
     65            allowed = fSize;
     66
     67            // get the first entry from the (sorted) list
     68            const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin();
    4969
    5070            // Theoretically, we can loose a signal here, but this is
     
    5272            lock.unlock();
    5373
    54             if (fCallback)
    55                 fCallback(val);
     74            // If the first event in the queue could not be processed,
     75            // no further processing makes sense until a new event has
     76            // been posted (or the number of events in the queue has
     77            // changed)  [allowed>0], in the case processing was
     78            // successfull [alloed==0], the next event will be processed
     79            // immediately.
     80            if (!fCallback || !fCallback(*it))
     81                allowed = 0;
    5682
    5783            lock.lock();
    5884
    59             fList.pop_front();
     85            // Whenever an event was successfully processed, allowed
     86            // is larger than zero and thus the event will be popped
     87            if (allowed==0)
     88                continue;
     89
     90            if (fSort)
     91                fList.erase(it);
     92            else
     93                fList.pop_front() ;
     94
    6095            fSize--;
     96            allowed--;
     97
    6198        }
    6299
     
    68105
    69106public:
    70     Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f)
    71     {
    72         start();
    73     }
     107    Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)
     108    {
     109        if (startup)
     110            start();
     111    }
     112
     113    Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)
     114    {
     115    }
     116
     117    Queue<T>& operator = (const Queue<T>& q)
     118    {
     119        fSize     = 0;
     120        fSort     = q.fSort;
     121        fState    = kIdle;
     122        fCallback = q.fCallback;
     123        return *this;
     124    }
     125
    74126    ~Queue()
    75127    {
     
    133185    {
    134186        const std::lock_guard<std::mutex> lock(fMutex);
     187
    135188        if (fState==kIdle)
    136189            return false;
     
    139192        fSize++;
    140193
     194        fCond.notify_one();
     195
     196        return true;
     197    }
     198
     199    bool notify()
     200    {
     201        const std::lock_guard<std::mutex> lock(fMutex);
     202        if (fState!=kRun)
     203            return false;
     204
     205        fState = kTrigger;
    141206        fCond.notify_one();
    142207
Note: See TracChangeset for help on using the changeset viewer.