- Timestamp:
- 06/09/13 12:51:13 (11 years ago)
- Location:
- trunk/Cosy/base
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Cosy/base/msgqueue.cc
r8843 r16779 1 1 #include "msgqueue.h" 2 3 #include <iostream> 2 4 3 5 using namespace std; … … 7 9 // This creates the Message queue thread, 8 10 // 9 MsgQueue::MsgQueue() : MThread("MsqQueue"), fNextMsg(0), fNextPtr(0)11 MsgQueue::MsgQueue() : fState(kRun), fThread(std::bind(&MsgQueue::Thread, this)) 10 12 { 11 RunThread();12 13 } 13 14 … … 19 20 { 20 21 CancelThread(); 22 } 21 23 22 fMuxMsg.Lock(); 24 void MsgQueue::CancelThread() 25 { 26 const std::lock_guard<std::mutex> lock(fMutex); 27 if (fState!=kIdle) 28 return; 23 29 24 if (fNextPtr) 25 { 26 delete [] fNextPtr; 27 fNextPtr = 0; 28 } 29 30 fMuxMsg.UnLock(); 30 fState = kAbort; 31 fCond.notify_one(); 32 fThread.join(); 31 33 } 32 34 … … 43 45 // the old action can be finished correctly by the user. 44 46 // 45 Int_t MsgQueue::Proc(int msg, void *mp)47 int MsgQueue::Proc(int msg, void *mp) 46 48 { 47 49 return 0; … … 50 52 int MsgQueue::Break() const 51 53 { 52 return f NextMsg>0 || IsThreadCanceled();54 return fSize>0 || fState!=kRun; 53 55 } 54 56 … … 60 62 // in this thread. This makes sure, that the calling program is not stalled. 61 63 // 62 Int_tMsgQueue::Thread()64 void MsgQueue::Thread() 63 65 { 66 std::unique_lock<std::mutex> lock(fMutex); 67 64 68 while (1) 65 69 { 66 Int_t msg = 0;67 char *ptr = 0;70 while (fList.empty() && fState==kRun) 71 fCond.wait(lock); 68 72 69 fMuxMsg.Lock(); 73 if (fState==kAbort) 74 break; 70 75 71 if (fNextMsg) 72 { 73 msg = fNextMsg; 74 ptr = fNextPtr; 76 if (fState==kStop && fList.empty()) 77 break; 75 78 76 fNextMsg = 0; 77 fNextPtr = 0; 78 } 79 const auto &val = fList.front(); 80 fSize--; 79 81 80 fMuxMsg.UnLock(); 82 lock.unlock(); 83 Proc(val.first, const_cast<char*>(val.second.data())); 84 lock.lock(); 81 85 82 if (ptr) 83 { 84 Proc(msg, ptr); 85 delete [] ptr; 86 } 86 fList.pop_front(); 87 } 87 88 88 usleep(1);89 TThread::CancelPoint();90 } 91 return 0;89 fList.clear(); 90 fSize = 0; 91 92 fState = kIdle; 92 93 } 93 94 … … 102 103 void *MsgQueue::PostMsg(int msg, void *mp, int size) 103 104 { 104 fMuxMsg.Lock(); 105 const std::lock_guard<std::mutex> lock(fMutex); 106 if (fState==kIdle) 107 return false; 105 108 106 fNextMsg = msg; 109 fSize++; 110 fList.emplace_back(msg, vector<char>(reinterpret_cast<char*>(mp),reinterpret_cast<char*>(mp)+size)); 107 111 108 if (fNextPtr) 109 delete [] fNextPtr; 110 fNextPtr = new char[size]; 111 112 memcpy(fNextPtr, mp, size); 113 114 fMuxMsg.UnLock(); 112 fCond.notify_one(); 115 113 116 114 return 0; 115 117 116 } -
trunk/Cosy/base/msgqueue.h
r8843 r16779 2 2 #define COSY_MsgQueue 3 3 4 #ifndef ROOT_TMutex5 #include < TMutex.h>6 # endif7 8 #i fndef MARS_MThread9 #include "MThread.h"4 #ifndef __CINT__ 5 #include <list> 6 #include <mutex> 7 #include <thread> 8 #include <vector> 9 #include <condition_variable> 10 10 #endif 11 11 … … 13 13 #define WM_QUIT 0xffff 14 14 15 class MsgQueue : public MThread15 class MsgQueue 16 16 { 17 17 private: 18 int fNextMsg; 19 char *fNextPtr; 18 size_t fSize; // Only necessary for before C++11 20 19 21 TMutex fMuxMsg; 20 #ifndef __CINT__ 21 std::list<std::pair<int, std::vector<char>>> fList; 22 22 23 Int_t Thread(); 23 std::mutex fMutex; // Mutex needed for the conditional 24 std::condition_variable fCond; // Conditional 25 #endif 26 27 enum state_t 28 { 29 kIdle, 30 kRun, 31 kStop, 32 kAbort, 33 }; 34 35 state_t fState; // Stop signal for the thread 36 #ifndef __CINT__ 37 std::thread fThread; // Handle to the thread 38 #endif 39 40 void Thread(); 24 41 25 42 public: … … 29 46 int Break() const; 30 47 31 virtual Int_t Proc(int msg, void *mp1); 32 Int_t Proc(int msg) { return Proc(msg, 0); } 48 void CancelThread(); 49 50 virtual int Proc(int msg, void *mp1); 51 int Proc(int msg) { return Proc(msg, 0); } 33 52 34 53 void *PostMsg(int msg, void *mp1, int size);
Note:
See TracChangeset
for help on using the changeset viewer.