Ignore:
Timestamp:
06/09/13 12:51:13 (11 years ago)
Author:
tbretz
Message:
A less CPU hungry version of the msg queue using C++11 techniques.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Cosy/base/msgqueue.cc

    r8843 r16779  
    11#include "msgqueue.h"
     2
     3#include <iostream>
    24
    35using namespace std;
     
    79// This creates the Message queue thread,
    810//
    9 MsgQueue::MsgQueue() : MThread("MsqQueue"), fNextMsg(0), fNextPtr(0)
     11MsgQueue::MsgQueue() : fState(kRun), fThread(std::bind(&MsgQueue::Thread, this))
    1012{
    11     RunThread();
    1213}
    1314
     
    1920{
    2021    CancelThread();
     22}
    2123
    22     fMuxMsg.Lock();
     24void MsgQueue::CancelThread()
     25{
     26    const std::lock_guard<std::mutex> lock(fMutex);
     27    if (fState!=kIdle)
     28        return;
    2329
    24     if (fNextPtr)
    25     {
    26         delete [] fNextPtr;
    27         fNextPtr = 0;
    28     }
    29 
    30     fMuxMsg.UnLock();
     30    fState = kAbort;
     31    fCond.notify_one();
     32    fThread.join();
    3133}
    3234
     
    4345// the old action can be finished correctly by the user.
    4446//
    45 Int_t MsgQueue::Proc(int msg, void *mp)
     47int MsgQueue::Proc(int msg, void *mp)
    4648{
    4749    return 0;
     
    5052int MsgQueue::Break() const
    5153{
    52     return fNextMsg>0 || IsThreadCanceled();
     54    return fSize>0 || fState!=kRun;
    5355}
    5456
     
    6062// in this thread. This makes sure, that the calling program is not stalled.
    6163//
    62 Int_t MsgQueue::Thread()
     64void MsgQueue::Thread()
    6365{
     66    std::unique_lock<std::mutex> lock(fMutex);
     67
    6468    while (1)
    6569    {
    66         Int_t msg = 0;
    67         char *ptr = 0;
     70        while (fList.empty() && fState==kRun)
     71            fCond.wait(lock);
    6872
    69         fMuxMsg.Lock();
     73        if (fState==kAbort)
     74            break;
    7075
    71         if (fNextMsg)
    72         {
    73             msg = fNextMsg;
    74             ptr = fNextPtr;
     76        if (fState==kStop && fList.empty())
     77            break;
    7578
    76             fNextMsg = 0;
    77             fNextPtr = 0;
    78         }
     79        const auto &val = fList.front();
     80        fSize--;
    7981
    80         fMuxMsg.UnLock();
     82        lock.unlock();
     83        Proc(val.first, const_cast<char*>(val.second.data()));
     84        lock.lock();
    8185
    82         if (ptr)
    83         {
    84             Proc(msg, ptr);
    85             delete [] ptr;
    86         }
     86        fList.pop_front();
     87    }
    8788
    88         usleep(1);
    89         TThread::CancelPoint();
    90     }
    91     return 0;
     89    fList.clear();
     90    fSize = 0;
     91
     92    fState = kIdle;
    9293}
    9394
     
    102103void *MsgQueue::PostMsg(int msg, void *mp, int size)
    103104{
    104     fMuxMsg.Lock();
     105    const std::lock_guard<std::mutex> lock(fMutex);
     106    if (fState==kIdle)
     107        return false;
    105108
    106     fNextMsg = msg;
     109    fSize++;
     110    fList.emplace_back(msg, vector<char>(reinterpret_cast<char*>(mp),reinterpret_cast<char*>(mp)+size));
    107111
    108     if (fNextPtr)
    109         delete [] fNextPtr;
    110     fNextPtr = new char[size];
    111 
    112     memcpy(fNextPtr, mp, size);
    113 
    114     fMuxMsg.UnLock();
     112    fCond.notify_one();
    115113
    116114    return 0;
     115
    117116}
Note: See TracChangeset for help on using the changeset viewer.