#include "MTcpIpIO.h" #include // usleep #include #include #include #undef DEBUG using namespace std; /* enum ESockOptions { kSendBuffer, // size of send buffer kRecvBuffer, // size of receive buffer kOobInline, // OOB message inline kKeepAlive, // keep socket alive kReuseAddr, // allow reuse of local portion of address 5-tuple kNoDelay, // send without delay kNoBlock, // non-blocking I/O kProcessGroup, // socket process group (used for SIGURG and SIGIO) kAtMark, // are we at out-of-band mark (read only) kBytesToRead // get number of bytes to read, FIONREAD (read only) }; enum ESendRecvOptions { kDefault, // default option (= 0) kOob, // send or receive out-of-band data kPeek, // peek at incoming message (receive only) kDontBlock // send/recv as much data as possible without blocking }; */ MTcpIpIO::MTcpIpIO(MLog &out) : MThread(false), Log(out), fRxSocket(NULL), fServSock(NULL) { fTxSocket = new TSocket("ceco", 7304); } MTcpIpIO::~MTcpIpIO() { // // Make sure, that no loop waiting for connection // is running anymore! // Stop(); // // Now delete all TCP/IP objects // cout << "Delete TxSocket " << fTxSocket << "..." << flush; delete fTxSocket; cout << "Done." << endl; if (fServSock) { cout << "Delete ServSock " << fServSock << "..." << flush; delete fServSock; cout << "Done." << endl; } if (fRxSocket) { cout << "Delete RxSocket " << fRxSocket << "..." << flush; delete fRxSocket; cout << "Done." << endl; } } bool MTcpIpIO::Send(const char *msg) { if (!fTxSocket->IsValid()) return false; const UInt_t len = fTxSocket->SendRaw(msg, strlen(msg)); if (len<0) { cout << "ERROR - Sending Message" << endl; return false; } if (len!=strlen(msg)) { cout << "Send wrong number (" << len << ") of Bytes." << endl; return false; } #ifdef DEBUG cout << "Tx: " << msg << flush; #endif return true; } bool MTcpIpIO::InterpreteStr(TString str) { cout << "Rx: " << str << flush; } void MTcpIpIO::Clear() { char c; while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag()) usleep(1); } void *MTcpIpIO::Thread() { cout << "Starting receiver..." << endl; while (!HasStopFlag()) { fServSock=new TServerSocket(7404, kTRUE); if (!fServSock->IsValid()) { cout << "ServerSocket not valid: "; switch (fServSock->GetErrorCode()) { case 0: cout << "No error." << endl; break; case -1: cout << "low level socket() call failed." << endl; break; case -2: cout << "low level bind() call failed." << endl; break; case -3: cout << "low level listen() call failed." << endl; break; default: cout << "Unknown." << endl; break; } delete fServSock; fServSock=NULL; usleep(5000000); continue; } fServSock->SetOption(kNoBlock, 1); cout << "Waiting for conntection on port 7404..." << endl; while (!HasStopFlag() && (Long_t)fRxSocket<=0) { fRxSocket = fServSock->Accept(); if (fRxSocket==0) cout << "Error: TServerSock::Accept" << endl; usleep(10); } // Can happen in case of HasStopFlag() if (fRxSocket==(void*)-1) fRxSocket=NULL; if (fRxSocket==NULL) { delete fServSock; fServSock=NULL; continue; } if (!fRxSocket->IsValid()) { cout << "TSocket not valid..." << endl; delete fServSock; delete fRxSocket; fServSock = NULL; fRxSocket = NULL; continue; } cout << "Connection established..." << endl; fRxSocket->SetOption(kNoBlock, 1); Clear(); TString str; while (!HasStopFlag()) { char c; const Int_t len = fRxSocket->RecvRaw(&c, 1); // No data received (non-blocking mode) if (len<0) { usleep(1); continue; } // Data received with zero length! if (len==0) { cout << "len==0" << endl; continue; } // Data received if (len>1) { cout << "Data too long!!!" << endl; break; } // Data received (len==1) if (c!='\n') { str += c; continue; } // String completed InterpreteStr(str); str = ""; } delete fServSock; delete fRxSocket; fServSock = NULL; fRxSocket = NULL; } cout << "Receiver stopped..." << endl; return NULL; }