Changeset 8816 for trunk/MagicSoft/Cosy/tcpip
- Timestamp:
- 01/16/08 14:34:11 (17 years ago)
- Location:
- trunk/MagicSoft/Cosy/tcpip
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/MagicSoft/Cosy/tcpip/MDriveCom.h
r7788 r8816 10 10 class MsgQueue; 11 11 class Ring; 12 class MLog; 12 13 13 14 class MDriveCom : public MCeCoCom … … 39 40 kMonitoring = 0x40 40 41 }; 41 42 MDriveCom(MsgQueue *q, MLog &out=gLog) : MCeCoCom("DRIVE-REPORT",out), fQueue(q) {}42 43 MDriveCom(MsgQueue *q, MLog *out) : MCeCoCom(out), fQueue(q) {} 43 44 44 45 bool SendReport(UInt_t stat, RaDec rd, ZdAz so, ZdAz is, ZdAz er); -
trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc
r8378 r8816 2 2 3 3 #include <unistd.h> // usleep 4 #include <iostream>5 4 6 5 #include <TSocket.h> 7 6 #include <TServerSocket.h> 7 8 #include "MLog.h" 9 #include "MLogManip.h" 8 10 9 11 #undef DEBUG … … 33 35 */ 34 36 35 MTcpIpIO::MTcpIpIO(MLog &out) 36 : MThread(false), Log(out), fRxSocket(NULL), fServSock(NULL), fSendInterval(1000) 37 { 38 fTxSocket = new TSocket("ceco", 7304); 37 MTcpIpO::MTcpIpO(Int_t tx) 38 { 39 fTxSocket = new TSocket("ceco", tx); 40 } 41 42 MTcpIpO::~MTcpIpO() 43 { 44 // Now delete all TCP/IP objects 45 delete fTxSocket; 46 } 47 48 MTcpIpIO::MTcpIpIO(Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(tx) 49 { 50 RunThread(); 39 51 } 40 52 41 53 MTcpIpIO::~MTcpIpIO() 42 54 { 43 // 44 // Make sure, that no loop waiting for connection 45 // is running anymore! 46 // 47 Stop(); 48 49 // 50 // Now delete all TCP/IP objects 51 // 52 //cout << "Delete TxSocket " << fTxSocket << "..." << flush; 53 delete fTxSocket; 54 //cout << "Done." << endl; 55 if (fServSock) 56 { 57 //cout << "Delete ServSock " << fServSock << "..." << flush; 58 delete fServSock; 59 //cout << "Done." << endl; 60 } 61 if (fRxSocket) 62 { 63 //cout << "Delete RxSocket " << fRxSocket << "..." << flush; 64 delete fRxSocket; 65 //cout << "Done." << endl; 66 } 67 } 68 69 bool MTcpIpIO::Send(const char *msg, bool force=kFALSE) 70 { 71 72 const MTime t(-1); 73 74 if ((double)t-(double)fTime<0.001*fSendInterval && !force) 75 return true; 76 77 78 if (lout.Lock("MTcpIpIO::Send")) 79 { 80 //const Int_t rc = lout.IsOutputDeviceEnabled(MLog::eGui); 81 //lout.DisableOutputDevice(MLog::eGui); 82 lout << msg << flush; 83 lout.UnLock("MTcpIpIO::Send"); 84 //if (rc) 85 // lout.EnableOutputDevice(MLog::eGui); 86 } 87 88 fTime = t; 89 90 if (!fTxSocket->IsValid()) 91 return false; 92 93 const Int_t len = fTxSocket->SendRaw(msg, strlen(msg)); 94 if (len<0) 55 } 56 57 bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len) 58 { 59 if (!tx.IsValid()) 60 { 61 //cout << "*!* Transmit socket invalid!" << endl; 62 return false; 63 } 64 65 const Int_t l = tx.SendRaw(msg, len); 66 if (l<0) 95 67 { 96 68 cout << "ERROR - Sending Message" << endl; 97 69 return false; 98 70 } 99 if (len!=(Int_t)strlen(msg)) 100 { 101 cout << "Send wrong number (" << len << ") of Bytes." << endl; 102 return false; 103 } 71 72 if (l!=len) 73 { 74 cout << "Send wrong number (" << l << ") of Bytes." << endl; 75 return false; 76 } 77 104 78 #ifdef DEBUG 105 79 cout << "Tx: " << msg << flush; … … 109 83 } 110 84 85 bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len) 86 { 87 // R__LOCKGUARD2(myMutex); 88 89 cout << "Connecting to " << addr << ":" << port << endl; 90 91 // FIXME: Set tx-socket to nonblocking? 92 TSocket tx(addr, port); 93 // return SendFrame(tx, msg, len); 94 95 if (!tx.IsValid()) 96 { 97 //cout << "*!* Transmit socket invalid!" << endl; 98 return false; 99 } 100 101 cout << "Sending to " << addr << ":" << port << endl; 102 103 const Int_t l = tx.SendRaw(msg, len, kDontBlock); 104 if (l<0) 105 { 106 cout << "ERROR - Sending Message" << endl; 107 return false; 108 } 109 110 if (l!=len) 111 { 112 cout << "Send wrong number (" << l << ") of Bytes." << endl; 113 return false; 114 } 115 116 #ifdef DEBUG 117 cout << "Tx: " << msg << flush; 118 #endif 119 120 return true; 121 } 122 123 bool MTcpIpO::Send(const char *msg, Int_t len) 124 { 125 return SendFrame(*fTxSocket, msg, len); 126 } 127 111 128 bool MTcpIpIO::InterpreteStr(TString str) 112 129 { … … 115 132 } 116 133 117 void MTcpIpIO::Clear() 118 { 134 void MTcpIpIO::ReadSocket(TSocket &rx) 135 { 136 // Clear buffer! 119 137 char c; 120 while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag()) 138 // while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag()) 139 while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled()) 121 140 usleep(1); 122 } 123 124 void *MTcpIpIO::Thread() 125 { 126 cout << "Starting receiver..." << endl; 127 128 while (!HasStopFlag()) 129 { 130 fServSock=new TServerSocket(7404, kTRUE); 141 142 TString str; 143 // while (!HasStopFlag()) 144 while (!IsThreadCanceled()) 145 { 146 char c; 147 const Int_t len = rx.RecvRaw(&c, 1); 148 149 // No data received (non-blocking mode) 150 if (len<0) 151 { 152 usleep(1); 153 continue; 154 } 155 156 // Data received with zero length! 157 if (len==0) 158 { 159 // THIS MEANS CONNECTIION LOST!!!! 160 cout << "============> len==0 (CONNECTION LOST?)" << endl; 161 break; // This break is for TEST PURPOSE FIXME!!! 162 continue; 163 } 164 165 // Data received 166 if (len>1) 167 { 168 cout << "Data too long!!!" << endl; 169 break; 170 } 171 172 // Data received (len==1) 173 if (c!='\n') 174 { 175 str += c; 176 continue; 177 } 178 179 // String completed 180 InterpreteStr(str); 181 str = ""; 182 } 183 } 184 185 //void *MTcpIpIO::Thread() 186 Int_t MTcpIpI::Thread() 187 { 188 gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl; 189 190 TServerSocket *fServSock=NULL; 191 TSocket *fRxSocket=NULL; 192 193 // while (!HasStopFlag()) 194 while (!IsThreadCanceled()) 195 { 196 fServSock=new TServerSocket(fPortRx, kTRUE); 131 197 if (!fServSock->IsValid()) 132 198 { 133 cout << "ServerSocket notvalid: ";199 cout << "ServerSocket on port " << fPortRx << " invalid: "; 134 200 switch (fServSock->GetErrorCode()) 135 201 { … … 142 208 delete fServSock; 143 209 fServSock=NULL; 144 usleep(5000000);210 MyThreadX::Sleep(5000000); 145 211 continue; 146 212 } … … 148 214 fServSock->SetOption(kNoBlock, 1); 149 215 150 cout << "Waiting for conntection on port 7404..." << endl; 151 while (!HasStopFlag() && (Long_t)fRxSocket<=0) 152 { 216 cout << "Waiting for connection on port " << fPortRx << "..." << endl; 217 // while (!HasStopFlag() && (Long_t)fRxSocket<=0) 218 while (!IsThreadCanceled() && (Long_t)fRxSocket<=0) 219 { 220 //TThread::CancelPoint(); 221 153 222 fRxSocket = fServSock->Accept(); 154 223 if (fRxSocket==0) 155 cout << "Error: TServerSock::Accept " << endl;224 cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl; 156 225 usleep(10); 157 226 } … … 170 239 if (!fRxSocket->IsValid()) 171 240 { 172 cout << "TSocket not valid..." << endl;241 cout << "TSocket invalid on port " << fPortRx << "." << endl; 173 242 delete fServSock; 174 243 delete fRxSocket; … … 178 247 } 179 248 180 cout << "Connection established ..." << endl;249 cout << "Connection established on port " << fPortRx << "." << endl; 181 250 182 251 fRxSocket->SetOption(kNoBlock, 1); 183 252 184 Clear(); 185 186 TString str; 187 while (!HasStopFlag()) 188 { 189 char c; 190 const Int_t len = fRxSocket->RecvRaw(&c, 1); 191 192 // No data received (non-blocking mode) 193 if (len<0) 194 { 195 usleep(1); 196 continue; 197 } 198 199 // Data received with zero length! 200 if (len==0) 201 { 202 cout << "len==0" << endl; 203 continue; 204 } 205 206 // Data received 207 if (len>1) 208 { 209 cout << "Data too long!!!" << endl; 210 break; 211 } 212 213 // Data received (len==1) 214 if (c!='\n') 215 { 216 str += c; 217 continue; 218 } 219 220 // String completed 221 InterpreteStr(str); 222 str = ""; 223 } 253 // ------ IDENTICAL UP TO HERE ------ 254 255 ReadSocket(*fRxSocket); 256 224 257 delete fServSock; 225 258 delete fRxSocket; … … 228 261 } 229 262 230 cout << "Receiver stopped..." << endl; 231 232 return NULL; 233 } 263 gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl; 264 265 return 0; 266 // return NULL; 267 } -
trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.h
r4865 r8816 2 2 #define COSY_MTcpIpIO 3 3 4 #ifndef COSY_MThread4 #ifndef MARS_MThread 5 5 #include "MThread.h" 6 #endif7 #ifndef COSY_Log8 #include "log.h"9 6 #endif 10 7 #ifndef MARS_MTime … … 12 9 #endif 13 10 11 14 12 class TString; 15 13 class TSocket; 16 14 class TServerSocket; 17 15 18 class MTcpIpIO : public MThread, public Log 16 // A generalized class for receiving over tcp/ip 17 class MTcpIpI : public MyThreadX 19 18 { 20 19 private: 21 TSocket *fTxSocket; 22 TSocket *fRxSocket; 23 TServerSocket *fServSock; 20 Int_t fPortRx; 24 21 25 MTime fTime; 26 Int_t fSendInterval; // [ms] 22 Int_t Thread(); 27 23 28 v oid Clear();24 virtual void ReadSocket(TSocket &rx) = 0; 29 25 30 26 public: 31 MTcpIpIO(MLog &out=gLog); 27 MTcpIpI(Int_t rx) : MyThreadX(Form("MTcpIpI::%d", rx)), fPortRx(rx) { /*RunThread();*/ } 28 ~MTcpIpI() { CancelThread(); } 29 }; 30 31 32 // A generalized class for sending over tcp/ip 33 class MTcpIpO 34 { 35 private: 36 TSocket *fTxSocket; 37 38 public: 39 MTcpIpO(Int_t tx); 40 ~MTcpIpO(); 41 42 static bool SendFrame(TSocket &tx, const char *msg, int len); 43 static bool SendFrame(const char *addr, int port, const char *msg, int len); 44 45 bool Send(const char *msg, int len); 46 }; 47 48 // This class es espcially meant to receive and send ascii messages 49 class MTcpIpIO : public MTcpIpI, public MTcpIpO 50 { 51 private: 52 void ReadSocket(TSocket &rx); 53 54 public: 55 MTcpIpIO(Int_t tx, Int_t rx); 32 56 ~MTcpIpIO(); 33 57 34 virtual bool Send(const char *msg, bool force);35 58 virtual bool InterpreteStr(TString str); 36 37 void *Thread();38 59 }; 39 60
Note:
See TracChangeset
for help on using the changeset viewer.