#include "MTcpIpIO.h"

// NOTE(review): the names of the system/ROOT headers below were not readable
// in the reviewed copy of this file. They are inferred from what the file
// uses (usleep, errno, strerror, cout, TSocket, TServerSocket) - confirm
// against the repository.
#include <unistd.h>    // usleep

#include <errno.h>     // errno
#include <string.h>    // strerror

#include <iostream>    // cout

#include <TSocket.h>
#include <TServerSocket.h>

#include "MLog.h"
#include "MLogManip.h"

#include "MString.h"
#include "MTimeout.h"

#undef DEBUG

using namespace std;

/*
 enum ESockOptions {
   kSendBuffer,        // size of send buffer
   kRecvBuffer,        // size of receive buffer
   kOobInline,         // OOB message inline
   kKeepAlive,         // keep socket alive
   kReuseAddr,         // allow reuse of local portion of address 5-tuple
   kNoDelay,           // send without delay
   kNoBlock,           // non-blocking I/O
   kProcessGroup,      // socket process group (used for SIGURG and SIGIO)
   kAtMark,            // are we at out-of-band mark (read only)
   kBytesToRead        // get number of bytes to read, FIONREAD (read only)
 };

 enum ESendRecvOptions {
   kDefault,           // default option (= 0)
   kOob,               // send or receive out-of-band data
   kPeek,              // peek at incoming message (receive only)
   kDontBlock          // send/recv as much data as possible without blocking
 };
*/

namespace
{
    // Translate the value of TServerSocket::GetErrorCode() into the text
    // which MTcpIpI::Thread() writes to the log for an invalid server socket.
    const char *ServerErrorText(Int_t code)
    {
        switch (code)
        {
        case  0: return "No error.";
        case -1: return "low level socket() call failed.";
        case -2: return "low level bind() call failed.";
        case -3: return "low level listen() call failed.";
        default: return "Unknown.";
        }
    }
}

// --------------------------------------------------------------------------
//
// Create the transmission socket: a new TSocket to address addr, port tx,
// stored in fTxSocket. The socket is owned by this object.
//
MTcpIpO::MTcpIpO(const char *addr, Int_t tx)
{
    fTxSocket = new TSocket(addr, tx);
}

// --------------------------------------------------------------------------
//
// Delete the transmission socket allocated in the constructor.
//
MTcpIpO::~MTcpIpO()
{
    // Now delete all TCP/IP objects
    delete fTxSocket;
}

// --------------------------------------------------------------------------
//
// Set up the receiving part for port rx and the transmitting part for
// addr:tx, then call RunThread() (presumably starting MTcpIpI::Thread()
// in its own thread - see MThread).
//
MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx)
    : MTcpIpI(rx), MTcpIpO(addr, tx)
{
    RunThread();
}

MTcpIpIO::~MTcpIpIO()
{
}

// --------------------------------------------------------------------------
//
// Return a printable description of the peer of socket s:
//   "n/a"        if the socket itself is invalid
//   "undefined"  if the socket has no valid inet address
//   "host:port"  otherwise
//
TString MTcpIpO::GetSocketAddress(const TSocket &s)
{
    if (!s.IsValid())
        return "n/a";

    const TInetAddress &a = s.GetInetAddress();
    if (!a.IsValid())
        return "undefined";

    return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
}

// --------------------------------------------------------------------------
//
// Return the printable address of the transmission socket (fTxSocket).
//
TString MTcpIpO::GetSocketAddress() const
{
    return GetSocketAddress(*fTxSocket);
}

// --------------------------------------------------------------------------
//
// Send len bytes starting at msg through the socket tx.
//
// Returns false (without sending) if tx is invalid, and false (after
// logging an error) if SendRaw reports an error or a byte count different
// from len. Returns true only if exactly len bytes were sent.
//
bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
{
    if (!tx.IsValid())
    {
        //gLog << warn << "WARNING - No transmission to " << GetSocketAddress(tx) << " possible." << endl;
        return false;
    }

#ifdef DEBUG
    cout << "Tx: " << msg << flush;
#endif

    const Int_t l = tx.SendRaw(msg, len);
    if (l<0)
    {
        gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
        return false;
    }

    if (l!=len)
    {
        gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << GetSocketAddress(tx) << endl;
        return false;
    }

    return true;
}

// --------------------------------------------------------------------------
//
// Open a temporary socket to addr:port, send len bytes starting at msg
// through it and return the result of SendFrame(TSocket&, ...). The
// temporary socket is destroyed when the function returns.
//
bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
{
    //    R__LOCKGUARD2(myMutex);
#ifdef DEBUG
    cout << "Connecting to " << addr << ":" << port << endl;
#endif

    // FIXME: Set tx-socket to nonblocking?
    TSocket tx(addr, port);
    return SendFrame(tx, msg, len);
/*
    if (!tx.IsValid())
    {
        gLog << warn << "WARNING - No transmission to " << addr << ":" << port << " possible." << endl;
        return false;
    }

    gLog << dbg << "Sending to " << addr << ":" << port << endl;

    const Int_t l = tx.SendRaw(msg, len, kDontBlock);
    if (l<0)
    {
        gLog << err << "ERROR - Sending TCP/IP frame to " << addr << ":" << port << endl;
        return false;
    }

    if (l!=len)
    {
        gLog << err << "ERROR - Sent wrong number (" << l << ") of bytes to " << addr << ":" << port << endl;
        return false;
    }

#ifdef DEBUG
    cout << "Tx: " << msg << flush;
#endif

    return true;
    */
}

// --------------------------------------------------------------------------
//
// Send len bytes starting at msg through the object's own transmission
// socket (fTxSocket). See SendFrame for the meaning of the return value.
//
bool MTcpIpO::Send(const char *msg, Int_t len)
{
    return SendFrame(*fTxSocket, msg, len);
}

// --------------------------------------------------------------------------
//
// Called by ReadSocket for each completed line (the terminating newline is
// not part of str). This implementation only prints the string to cout
// and returns true.
//
bool MTcpIpIO::InterpreteStr(TString str)
{
    cout << "Rx: " << str << flush;
    return true;
}

// --------------------------------------------------------------------------
//
// Read from the socket rx one byte at a time. Bytes are collected in a
// string until a '\n' is received; the collected string is then handed to
// InterpreteStr and the collection starts again.
//
// Returns kFALSE as soon as RecvRaw returns a value <=0 (error or lost
// connection). Returns kTRUE if the loop ends because the thread has been
// canceled or because RecvRaw reported more than the one requested byte.
//
Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
{
    TString str;

    while (!IsThreadCanceled())
    {
        char c;
        const Int_t len = rx.RecvRaw(&c, 1);

        // For details see TSocket::RecvRaw
        // -1: // ERROR
        // EINVAL, EWOULDBLOCK
        // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
        //  0: Data received with zero length! (Connection lost/call interrupted)
        if (len<=0)
            return kFALSE;

        // Data received
        if (len>1)
        {
            cout << "Data too long!!!" << endl;
            break;
        }

        // Data received (len==1)
        if (c!='\n')
        {
            str += c;
            continue;
        }

        // String completed
        InterpreteStr(str);
        str = "";
    }

    return kTRUE;
}

// --------------------------------------------------------------------------
//
// Serve an established connection: poll the socket for readable data and
// call ReadSocket whenever data is pending.
//
// Sets fConnectionEstablished to kTRUE once the socket has been found
// valid. Returns kFALSE if the socket is invalid, if ReadSocket reports a
// lost connection, if the timeout has expired while no data was pending,
// or if Select reports an error. Returns kTRUE if the loop ends because
// the thread has been canceled.
//
Bool_t MTcpIpI::WaitForData(TSocket &sock)
{
    // No connection established?
    if (!sock.IsValid())
    {
        gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
        return kFALSE;
    }

    fConnectionEstablished = kTRUE;

    MTimeout timeout;

    // Get connection on port fPortRx and redirected
    while (!IsThreadCanceled())
    {
        // Check for pending data (every ms)
        switch (sock.Select(TSocket::kRead, 1))
        {
        case kTRUE:  // Data pending... go on reading
            if (!ReadSocket(sock))
            {
                gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
                return kFALSE;
            }

            // Data has been processed: (re)start the timeout with fTimeout
            timeout.Start(fTimeout);

            // Go on waiting for new data
            continue;

        case kFALSE: // Time out, no data yet, go on waiting
            if (timeout.HasTimedOut())
            {
                gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out." << endl;
                return kFALSE;
            }

            // Go on waiting for new data
            continue;

        default:  // Error occurrence
            gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
            return kFALSE;
        }
    }

    return kTRUE; //???
}

// --------------------------------------------------------------------------
//
// Accept connections on the given (non-blocking) server socket and serve
// them one after the other through WaitForData until the thread is
// canceled or Accept reports an error.
//
// Accept is expected to return 0 on error, a negative pointer value if no
// connection request is pending and a valid socket otherwise. The accepted
// socket is owned and deleted here.
//
void MTcpIpI::WaitForConnection(TServerSocket &server)
{
    while (!IsThreadCanceled())
    {
        gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;

        // Check for a connection request (reminder: we are in non-blocking mode)
        TSocket *socket = 0;

        while (!IsThreadCanceled())
        {
            //cout << (int) IsThreadCanceled() << endl;
            // Wait for a new connection on RX port
            socket = server.Accept();

            // Accept returned an error
            if (socket==0)
            {
                gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
                // Since we don't know the type of the error we better shut down the socket
                // NOTE(review): the server socket is not closed here, so it
                // stays valid and Thread() calls WaitForConnection() again
                // without delay - confirm whether a Close() was intended.
                return;
            }

            // No connection request pending
            if (reinterpret_cast<Long_t>(socket)<0)
            {
                MThread::Sleep(1000); // Wait a ms
                continue;
            }

            // Connection established
            break;
        }

        // Thread canceled before a connection request came in
        if (reinterpret_cast<Long_t>(socket)<=0)
            return;

        gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;

        WaitForData(*socket);

        // The socket is destroyed in any case, also if WaitForData returned
        // because the thread was canceled, so the connection is gone.
        fConnectionEstablished = kFALSE;

        delete socket;
    }
}

// --------------------------------------------------------------------------
//
// Thread entry point: create a non-blocking server socket on fPortRx and
// serve connections on it through WaitForConnection for as long as it
// stays valid. If the server socket is (or becomes) invalid the reason is
// logged, the socket is destroyed and, after a pause, a new one is
// created. The loop ends when the thread is canceled. Always returns 0.
//
Int_t MTcpIpI::Thread()
{
    gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;

    while (!IsThreadCanceled())
    {
        {
            // Scoped, so that the server socket is destroyed before pausing
            TServerSocket server(fPortRx, kTRUE, 0);
            server.SetOption(kNoBlock, 1);

            while (!IsThreadCanceled() && server.IsValid())
                WaitForConnection(server);

            if (!server.IsValid())
            {
                gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
                gLog << ServerErrorText(server.GetErrorCode()) << endl;
            }
        }

        // No need to delay the shutdown once the thread has been canceled
        if (IsThreadCanceled())
            break;

        // Pause before trying again (presumably microseconds, i.e. 5s,
        // compare "Sleep(1000); // Wait a ms" in WaitForConnection)
        MThread::Sleep(5000000);
    }

    gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;

    return 0;
}

/*
Int_t MTcpIpI::Thread()
{
    gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;

//    if (fPortRx==7404)
//    {
//        gLog << err << "CeCo communication skipped." << endl;
//        return 0;
//    }

    TServerSocket *fServSock=NULL;
    TSocket       *fRxSocket=NULL;

//    while (!HasStopFlag())
    while (!IsThreadCanceled())
    {
        fServSock=new TServerSocket(fPortRx, kTRUE);
        if (!fServSock->IsValid())
        {
            gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
            switch (fServSock->GetErrorCode())
            {
            case  0: gLog << "No error." << endl; break;
            case -1: gLog << "low level socket() call failed." << endl; break;
            case -2: gLog << "low level bind() call failed." << endl; break;
            case -3: gLog << "low level listen() call failed." << endl; break;
            default: gLog << "Unknown." << endl; break;
            }
            delete fServSock;
            fServSock=NULL;
            MThread::Sleep(5000000);
            continue;
        }

        fServSock->SetOption(kNoBlock, 1);

        gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
//        while (!HasStopFlag() && (Long_t)fRxSocket<=0)
        while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
        {
            //TThread::CancelPoint();

            fRxSocket = fServSock->Accept();
            if (fRxSocket==0)
                cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
            usleep(10);
        }

        // Can happen in case of HasStopFlag()
        if (fRxSocket==(void*)-1)
            fRxSocket=NULL;

        if (fRxSocket==NULL)
        {
            delete fServSock;
            fServSock=NULL;
            continue;
        }

        if (!fRxSocket->IsValid())
        {
            cout << "TSocket invalid on port " << fPortRx << "." << endl;
            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            continue;
        }

        gLog << all << "Connection established on port " << fPortRx << "." << endl;

        //fRxSocket->SetOption(kNoBlock, 1);

        // Wait for data
        while (!IsThreadCanceled())
        {
            switch (fRxSocket->Select(kRead, 1))
            {
            case kTRUE:  // Data waiting to be read
                break;
            case kFALSE: // time out
                usleep(10);
                continue;
            }

            // ERROR
            cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;

            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            break;
        }

        if (!fServSock)
            continue;

        if (IsThreadCanceled())
        {
            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            continue;
        }

        // ------ IDENTICAL UP TO HERE ------

        // Read and evaluate data
        ReadSocket(*fRxSocket);

        delete fServSock;
        delete fRxSocket;
        fServSock = NULL;
        fRxSocket = NULL;
    }

    gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;

    return 0;
//    return NULL;
}*/