Changeset 16779 for trunk/Cosy


Ignore:
Timestamp:
06/09/13 12:51:13 (11 years ago)
Author:
tbretz
Message:
A less CPU hungry version of the msg queue using C++11 techniques.
Location:
trunk/Cosy/base
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/Cosy/base/msgqueue.cc

    r8843 r16779  
    11#include "msgqueue.h"
     2
     3#include <iostream>
    24
    35using namespace std;
     
    79// This creates the Message queue thread,
    810//
    9 MsgQueue::MsgQueue() : MThread("MsqQueue"), fNextMsg(0), fNextPtr(0)
     11MsgQueue::MsgQueue() : fState(kRun), fThread(std::bind(&MsgQueue::Thread, this))
    1012{
    11     RunThread();
    1213}
    1314
     
    1920{
    2021    CancelThread();
     22}
    2123
    22     fMuxMsg.Lock();
     24void MsgQueue::CancelThread()
     25{
     26    const std::lock_guard<std::mutex> lock(fMutex);
     27    if (fState!=kIdle)
     28        return;
    2329
    24     if (fNextPtr)
    25     {
    26         delete [] fNextPtr;
    27         fNextPtr = 0;
    28     }
    29 
    30     fMuxMsg.UnLock();
     30    fState = kAbort;
     31    fCond.notify_one();
     32    fThread.join();
    3133}
    3234
     
    4345// the old action can be finished correctly by the user.
    4446//
    45 Int_t MsgQueue::Proc(int msg, void *mp)
     47int MsgQueue::Proc(int msg, void *mp)
    4648{
    4749    return 0;
     
    5052int MsgQueue::Break() const
    5153{
    52     return fNextMsg>0 || IsThreadCanceled();
     54    return fSize>0 || fState!=kRun;
    5355}
    5456
     
    6062// in this thread. This makes sure, that the calling program is not stalled.
    6163//
    62 Int_t MsgQueue::Thread()
     64void MsgQueue::Thread()
    6365{
     66    std::unique_lock<std::mutex> lock(fMutex);
     67
    6468    while (1)
    6569    {
    66         Int_t msg = 0;
    67         char *ptr = 0;
     70        while (fList.empty() && fState==kRun)
     71            fCond.wait(lock);
    6872
    69         fMuxMsg.Lock();
     73        if (fState==kAbort)
     74            break;
    7075
    71         if (fNextMsg)
    72         {
    73             msg = fNextMsg;
    74             ptr = fNextPtr;
     76        if (fState==kStop && fList.empty())
     77            break;
    7578
    76             fNextMsg = 0;
    77             fNextPtr = 0;
    78         }
     79        const auto &val = fList.front();
     80        fSize--;
    7981
    80         fMuxMsg.UnLock();
     82        lock.unlock();
     83        Proc(val.first, const_cast<char*>(val.second.data()));
     84        lock.lock();
    8185
    82         if (ptr)
    83         {
    84             Proc(msg, ptr);
    85             delete [] ptr;
    86         }
     86        fList.pop_front();
     87    }
    8788
    88         usleep(1);
    89         TThread::CancelPoint();
    90     }
    91     return 0;
     89    fList.clear();
     90    fSize = 0;
     91
     92    fState = kIdle;
    9293}
    9394
     
    102103void *MsgQueue::PostMsg(int msg, void *mp, int size)
    103104{
    104     fMuxMsg.Lock();
     105    const std::lock_guard<std::mutex> lock(fMutex);
     106    if (fState==kIdle)
     107        return false;
    105108
    106     fNextMsg = msg;
     109    fSize++;
     110    fList.emplace_back(msg, vector<char>(reinterpret_cast<char*>(mp),reinterpret_cast<char*>(mp)+size));
    107111
    108     if (fNextPtr)
    109         delete [] fNextPtr;
    110     fNextPtr = new char[size];
    111 
    112     memcpy(fNextPtr, mp, size);
    113 
    114     fMuxMsg.UnLock();
     112    fCond.notify_one();
    115113
    116114    return 0;
     115
    117116}
  • trunk/Cosy/base/msgqueue.h

    r8843 r16779  
    22#define COSY_MsgQueue
    33
    4 #ifndef ROOT_TMutex
    5 #include <TMutex.h>
    6 #endif
    7 
    8 #ifndef MARS_MThread
    9 #include "MThread.h"
     4#ifndef __CINT__
     5#include <list>
     6#include <mutex>
     7#include <thread>
     8#include <vector>
     9#include <condition_variable>
    1010#endif
    1111
     
    1313#define WM_QUIT 0xffff
    1414
    15 class MsgQueue : public MThread
     15class MsgQueue
    1616{
    1717private:
    18     int    fNextMsg;
    19     char  *fNextPtr;
     18    size_t fSize;                 // Only necessary for before C++11
    2019
    21     TMutex fMuxMsg;
     20#ifndef __CINT__
     21    std::list<std::pair<int, std::vector<char>>> fList;
    2222
    23     Int_t Thread();
     23    std::mutex fMutex;        // Mutex needed for the conditional
     24    std::condition_variable fCond; // Conditional
     25#endif
     26
     27    enum state_t
     28    {
     29        kIdle,
     30        kRun,
     31        kStop,
     32        kAbort,
     33    };
     34
     35    state_t fState;               // Stop signal for the thread
     36#ifndef __CINT__
     37    std::thread fThread;      // Handle to the thread
     38#endif
     39
     40    void Thread();
    2441
    2542public:
     
    2946    int Break() const;
    3047
    31     virtual Int_t Proc(int msg, void *mp1);
    32     Int_t Proc(int msg) { return Proc(msg, 0); }
     48    void CancelThread();
     49
     50    virtual int Proc(int msg, void *mp1);
     51    int Proc(int msg) { return Proc(msg, 0); }
    3352
    3453    void *PostMsg(int msg, void *mp1, int size);
Note: See TracChangeset for help on using the changeset viewer.