#include "MTcpIpIO.h"

#include <unistd.h>    // usleep

#include <TSocket.h>
#include <TServerSocket.h>

#include "MLog.h"
#include "MLogManip.h"

#undef DEBUG

using namespace std;

/*
 Socket and send/receive options used below (kNoBlock, kDontBlock),
 copied here for reference:

 enum ESockOptions {
    kSendBuffer,        // size of send buffer
    kRecvBuffer,        // size of receive buffer
    kOobInline,         // OOB message inline
    kKeepAlive,         // keep socket alive
    kReuseAddr,         // allow reuse of local portion of address 5-tuple
    kNoDelay,           // send without delay
    kNoBlock,           // non-blocking I/O
    kProcessGroup,      // socket process group (used for SIGURG and SIGIO)
    kAtMark,            // are we at out-of-band mark (read only)
    kBytesToRead        // get number of bytes to read, FIONREAD (read only)
 };

 enum ESendRecvOptions {
    kDefault,           // default option (= 0)
    kOob,               // send or receive out-of-band data
    kPeek,              // peek at incoming message (receive only)
    kDontBlock          // send/recv as much data as possible without blocking
 };
*/

namespace
{
    // ----------------------------------------------------------------------
    //
    // Common implementation of both SendFrame overloads: send len bytes of
    // msg through tx with the send option opt and check the result.
    //
    // Returns false if the socket is invalid, if SendRaw reports an error
    // (negative return value) or if the number of bytes reported by SendRaw
    // differs from len. Returns true otherwise.
    //
    bool SendRawChecked(TSocket &tx, const char *msg, int len, ESendRecvOptions opt)
    {
        if (!tx.IsValid())
        {
            //cout << "*!* Transmit socket invalid!" << endl;
            return false;
        }

        const Int_t l = tx.SendRaw(msg, len, opt);
        if (l<0)
        {
            cout << "ERROR - Sending Message" << endl;
            return false;
        }

        if (l!=len)
        {
            cout << "Send wrong number (" << l << ") of Bytes." << endl;
            return false;
        }

#ifdef DEBUG
        // msg is only guaranteed to hold len bytes, it need not be
        // NUL-terminated, so do not stream it as a C string.
        cout << "Tx: ";
        cout.write(msg, len);
        cout << flush;
#endif

        return true;
    }
}

// --------------------------------------------------------------------------
//
// Open the transmit socket to host "ceco" on port tx. The host name is
// hard-coded here. The socket is owned by this object and deleted in the
// destructor.
//
MTcpIpO::MTcpIpO(Int_t tx)
{
    fTxSocket = new TSocket("ceco", tx);
}

// --------------------------------------------------------------------------
//
// Delete the transmit socket.
//
MTcpIpO::~MTcpIpO()
{
    // Now delete all TCP/IP objects
    delete fTxSocket;
}

// --------------------------------------------------------------------------
//
// Set up the receiver for port rx and the transmitter for port tx and call
// RunThread() (presumably this starts the receiver thread executing
// MTcpIpI::Thread -- RunThread is not defined in this file).
//
MTcpIpIO::MTcpIpIO(Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(tx)
{
    RunThread();
}

// --------------------------------------------------------------------------
//
// Nothing to do here, the base classes release their own resources.
//
MTcpIpIO::~MTcpIpIO()
{
}

// --------------------------------------------------------------------------
//
// Send len bytes of msg through the already opened socket tx using the
// default send option.
//
// Returns true if the socket is valid and exactly len bytes were sent,
// false otherwise (a message is printed in case of a send error or a
// short send).
//
bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
{
    return SendRawChecked(tx, msg, len, kDefault);
}

// --------------------------------------------------------------------------
//
// Open a temporary connection to addr:port, send len bytes of msg with
// the kDontBlock option and close the connection again when the function
// returns.
//
// Returns true if the connection could be established and exactly len
// bytes were sent, false otherwise.
//
bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
{
    cout << "Connecting to " << addr << ":" << port << endl;

    // FIXME: Set tx-socket to nonblocking?
    TSocket tx(addr, port);

    // Checked here (and not only in the helper) so that "Sending to"
    // is printed only for an established connection.
    if (!tx.IsValid())
    {
        //cout << "*!* Transmit socket invalid!" << endl;
        return false;
    }

    cout << "Sending to " << addr << ":" << port << endl;

    return SendRawChecked(tx, msg, len, kDontBlock);
}

// --------------------------------------------------------------------------
//
// Send len bytes of msg through the transmit socket opened in the
// constructor. See SendFrame(TSocket&, const char*, int) for the meaning
// of the return value.
//
bool MTcpIpO::Send(const char *msg, Int_t len)
{
    return SendFrame(*fTxSocket, msg, len);
}

// --------------------------------------------------------------------------
//
// Called by ReadSocket for each completed line (without the terminating
// newline). This implementation just prints the string and returns true.
//
bool MTcpIpIO::InterpreteStr(TString str)
{
    cout << "Rx: " << str << flush;
    return true;
}

// --------------------------------------------------------------------------
//
// Read characters one by one from the socket rx, which is expected to be
// in non-blocking mode, and assemble them to lines. Whenever a '\n' is
// received InterpreteStr is called with the characters collected so far
// (the newline itself is not part of the string).
//
// Before reading starts, all data already pending on the socket is
// discarded.
//
// The function returns if the thread is canceled, if the connection is
// lost (RecvRaw returns 0 or -5) or if RecvRaw reports more than the
// requested single byte.
//
void MTcpIpIO::ReadSocket(TSocket &rx)
{
    char c;

    // Clear buffer!
    while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
        usleep(1);

    TString str;

    while (!IsThreadCanceled())
    {
        const Int_t len = rx.RecvRaw(&c, 1);

        // According to the ROOT documentation of TSocket::RecvRaw a
        // return value of -5 means "pipe broken or reset by peer".
        // Such a connection will never deliver data again, so waiting
        // for more would spin forever.
        if (len==-5)
        {
            cout << "============> len==-5 (CONNECTION RESET)" << endl;
            break;
        }

        // No data received (non-blocking mode). All other negative
        // return codes are retried as well.
        if (len<0)
        {
            usleep(1);
            continue;
        }

        // Data received with zero length!
        if (len==0)
        {
            // THIS MEANS CONNECTION LOST!!!!
            cout << "============> len==0 (CONNECTION LOST?)" << endl;
            break; // This break is for TEST PURPOSE FIXME!!!
        }

        // Data received
        if (len>1)
        {
            cout << "Data too long!!!" << endl;
            break;
        }

        // Data received (len==1)
        if (c!='\n')
        {
            str += c;
            continue;
        }

        // String completed
        InterpreteStr(str);
        str = "";
    }
}

// --------------------------------------------------------------------------
//
// Receiver loop. Until the thread is canceled:
//
//  1) open a server socket on port fPortRx; if this fails print the
//     reason, sleep and try again
//  2) switch the server socket to non-blocking mode and poll Accept()
//     until a connection comes in (or the thread is canceled)
//  3) switch the accepted socket to non-blocking mode and hand it to
//     ReadSocket, which returns when the connection is lost
//  4) delete both sockets and start again with 1)
//
// Always returns 0.
//
Int_t MTcpIpI::Thread()
{
    gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;

    while (!IsThreadCanceled())
    {
        TServerSocket *server = new TServerSocket(fPortRx, kTRUE);
        if (!server->IsValid())
        {
            cout << "ServerSocket on port " << fPortRx << " invalid: ";
            switch (server->GetErrorCode())
            {
            case  0: cout << "No error." << endl; break;
            case -1: cout << "low level socket() call failed." << endl; break;
            case -2: cout << "low level bind() call failed." << endl; break;
            case -3: cout << "low level listen() call failed." << endl; break;
            default: cout << "Unknown." << endl; break;
            }

            delete server;

            // Wait before the next attempt
            // (presumably microseconds, i.e. 5s -- confirm with MyThreadX)
            MyThreadX::Sleep(5000000);
            continue;
        }

        server->SetOption(kNoBlock, 1);

        cout << "Waiting for connection on port " << fPortRx << "..." << endl;

        // Accept() is polled until it returns a real socket. NULL is
        // reported as an error below; (TSocket*)-1 is what remains if
        // no connection came in before the thread was canceled.
        TSocket *const noSocket = (TSocket*)-1;

        TSocket *rx = NULL;
        while (!IsThreadCanceled() && (rx==NULL || rx==noSocket))
        {
            rx = server->Accept();
            if (rx==NULL)
                cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
            usleep(10);
        }

        // Can happen in case the thread was canceled while waiting
        if (rx==noSocket)
            rx = NULL;

        if (rx==NULL)
        {
            delete server;
            continue;
        }

        if (!rx->IsValid())
        {
            cout << "TSocket invalid on port " << fPortRx << "." << endl;
            delete server;
            delete rx;
            continue;
        }

        cout << "Connection established on port " << fPortRx << "." << endl;

        rx->SetOption(kNoBlock, 1);

        // ------ IDENTICAL UP TO HERE ------

        ReadSocket(*rx);

        delete server;
        delete rx;
    }

    gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;

    return 0;
}