#include "MTcpIpIO.h"

// NOTE(review): the names of the four system/ROOT headers below were lost
// when this file was extracted (only bare "#include" directives survived).
// They are reconstructed from what the code uses - confirm against the
// repository version.
#include <unistd.h>    // usleep
#include <errno.h>     // errno
#include <string.h>    // strerror

#include <TSocket.h>
#include <TServerSocket.h>

#include "MLog.h"
#include "MLogManip.h"

#include "MString.h"
#include "MTimeout.h"

#undef DEBUG

using namespace std;

/*
 Reference copy of the socket option enums used below:

 enum ESockOptions {
    kSendBuffer,        // size of send buffer
    kRecvBuffer,        // size of receive buffer
    kOobInline,         // OOB message inline
    kKeepAlive,         // keep socket alive
    kReuseAddr,         // allow reuse of local portion of address 5-tuple
    kNoDelay,           // send without delay
    kNoBlock,           // non-blocking I/O
    kProcessGroup,      // socket process group (used for SIGURG and SIGIO)
    kAtMark,            // are we at out-of-band mark (read only)
    kBytesToRead        // get number of bytes to read, FIONREAD (read only)
 };

 enum ESendRecvOptions {
    kDefault,           // default option (= 0)
    kOob,               // send or receive out-of-band data
    kPeek,              // peek at incoming message (receive only)
    kDontBlock          // send/recv as much data as possible without blocking
 };
*/

// --------------------------------------------------------------------------
//
// Open a non-blocking send socket to addr:tx and start the thread which
// supervises (and, if necessary, reopens) this connection.
//
// gDebug is raised to 1 only while the socket is created and restored
// afterwards (presumably to get ROOT's connection diagnostics - confirm).
//
MTcpIpO::MTcpIpO(const char *addr, Int_t tx) : fPortTx(tx)
{
    // was "inf2" but nobody knows what the prg is doing if it times out
    gLog << all << "- Open send socket to " << addr << ":" << tx << endl;

    const Int_t save = gDebug;
    gDebug = 1;
    fTxSocket = new TSocket(addr, tx);
    gDebug = save;

    fTxSocket->SetOption(kNoBlock, 1);

    MTcpIpO::RunThread();
}

// --------------------------------------------------------------------------
//
// Cancel the supervising thread first, then delete the send socket.
//
MTcpIpO::~MTcpIpO()
{
    CancelThread();

    // Now delete all TCP/IP objects
    delete fTxSocket;
}

// --------------------------------------------------------------------------
//
// Set up the receiving part (port rx, timeout) and the sending part
// (addr:tx), then start the receiver thread. The sender thread is already
// started by the MTcpIpO constructor.
//
MTcpIpIO::MTcpIpIO(const char *addr, Int_t tx, Int_t rx, UInt_t timeout)
    : MTcpIpI(rx, timeout), MTcpIpO(addr, tx)
{
    MTcpIpI::RunThread();
}

// --------------------------------------------------------------------------
//
// Return the peer address of the socket as "host:port". Returns "n/a" if
// the socket is invalid and "undefined" if its inet address is invalid.
//
TString MTcpIpO::GetSocketAddress(const TSocket &s)
{
    if (!s.IsValid())
        return "n/a";

    const TInetAddress &a = s.GetInetAddress();
    if (!a.IsValid())
        return "undefined";

    return MString::Format("%s:%d", a.GetHostAddress(), a.GetPort());
}

/*
// NOTE(review): the template argument of the const_cast was lost in
// extraction; TMutex* is presumed - confirm before re-enabling.
TString MTcpIpO::GetSocketAddress() const
{
    TLockGuard guard(const_cast<TMutex*>(&fMutex));
    return GetSocketAddress(*fTxSocket);
}
*/

// --------------------------------------------------------------------------
//
// Send len bytes of msg through the socket tx without blocking.
//
// Returns true only if the socket is valid and all len bytes were sent.
// If sending would block (SendRaw returns -4) the socket is closed. Any
// other negative return value or a partial transmission is logged and
// reported as failure.
//
bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
{
    if (!tx.IsValid())
        return false;

#ifdef DEBUG
    cout << "Tx: " << msg << flush;
#endif

    const Int_t l = tx.SendRaw(msg, len, kDontBlock);

    // Frame not sent, EWOULDBLOCK
    if (l==-4)
    {
        gLog << err << "ERROR - Sending to " << GetSocketAddress(tx) << " would block." << endl;
/*--->*/ tx.Close(); // ?????
        return false;
    }

    if (l<0)
    {
        gLog << err << "ERROR - Sending TCP/IP frame to " << GetSocketAddress(tx) << endl;
        return false;
    }

    if (l!=len)
    {
        gLog << err << "ERROR - Could only sent " << l << " out of " << len << " bytes to " << GetSocketAddress(tx) << endl;
        return false;
    }

    return true;
}

// --------------------------------------------------------------------------
//
// Open a temporary socket to addr:port, send len bytes of msg through it
// and close it again when leaving the function. Returns the result of
// SendFrame(TSocket&, ...).
//
bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
{
#ifdef DEBUG
    cout << "Connecting to " << addr << ":" << port << endl;
#endif

    // FIXME: Set tx-socket to nonblocking?
    TSocket tx(addr, port);
    return SendFrame(tx, msg, len);
}

// --------------------------------------------------------------------------
//
// Send len bytes of msg through the permanent send socket.
//
// The mutex is only tried, never waited for: if it cannot be locked the
// function returns false immediately without sending anything.
//
bool MTcpIpO::Send(const char *msg, Int_t len)
{
    const Int_t mtx = fMutex.TryLock();
    if (mtx==13)
        gLog << warn << "MTcpIpO::Send - mutex is already locked by this thread." << endl;

    // If Mutex cannot be locked, i.e. we are currently reopening
    // the send socket we cannot wait, because we would block the
    // executing thread.
    if (mtx)
        return false;

    const bool rc = SendFrame(*fTxSocket, msg, len);

    fMutex.UnLock();

    return rc;
}

// --------------------------------------------------------------------------
//
// Supervise the send socket: while it is valid just idle; as soon as it
// becomes invalid, replace it (under the mutex) by a new socket to the
// same host and port. Between failed attempts the waiting time is doubled.
//
// Returns 0 if the initial socket has no valid address, 1 when the thread
// was canceled.
//
Int_t MTcpIpO::Thread()
{
    // Keep a copy of the address: the socket it is taken from is deleted
    // and recreated below, so the address must not depend on its lifetime.
    const TInetAddress a = fTxSocket->GetInetAddress();
    if (!a.IsValid())
    {
        gLog << err << "ERROR: Send socket address invalid." << endl;
        return 0;
    }

    // Delay between reconnection attempts (1000 is annotated as one
    // millisecond in the original, so the unit is presumably microseconds)
    Int_t wait = 1000;

    while (!IsThreadCanceled())
    {
        if (fTxSocket->IsValid())
        {
            MThread::Sleep(1000); // Wait a ms
            wait = 1000;          // Connection is fine: reset the back-off
            continue;
        }

#ifdef DEBUG
        cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
#endif

        // Exchange the socket while Send() is locked out
        fMutex.Lock();

        delete fTxSocket;

        fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
        fTxSocket->SetOption(kNoBlock, 1);

        fMutex.UnLock();

        if (fTxSocket->IsValid())
            continue;

        // Reconnect failed: wait before trying again
        MThread::Sleep(wait);

        // Double the delay as long as it is still below 3s
        // (the last doubling therefore ends slightly above 3s)
        if (wait<3000000)
            wait *= 2;
    }

    return 1;
}

// --------------------------------------------------------------------------
//
// Same supervision as MTcpIpO::Thread, but in addition the send socket is
// polled for incoming data, which is passed to ReadSocket. The socket is
// closed (and consequently reopened in the next iteration) if reading
// fails, if no data arrived within fTimeout, or if Select reports an
// error.
//
// Returns 0 if the initial socket has no valid address, 1 when the thread
// was canceled.
//
Int_t MTcpIpOI::Thread()
{
    fConnectionEstablished = kFALSE;

    // Keep a copy of the address: the socket it is taken from is deleted
    // and recreated below, so the address must not depend on its lifetime.
    const TInetAddress a = fTxSocket->GetInetAddress();
    if (!a.IsValid())
    {
        gLog << err << "- MTcpIpOI::Thread - ERROR: Send socket address invalid." << endl;
        return 0;
    }

    gLog << inf << "- MTcpIpOI::Thread created connecting to " << a.GetHostAddress() << ":" << fPortTx << endl;

    // Delay between reconnection attempts (see MTcpIpO::Thread for the unit)
    Int_t wait = 1000;

    MTimeout timeout(fTimeout);

    while (!IsThreadCanceled())
    {
        if (fTxSocket->IsValid())
        {
            fConnectionEstablished = kTRUE;
            fTimeout = 1000;

            // Get connection on port fPortRx and redirected
            // Check for pending data (every ms)
            // Note: every 'continue' below continues the outer while loop
            switch (fTxSocket->Select(TSocket::kRead, 1))
            {
            case kTRUE:  // Data pending... go on reading
                if (!ReadSocket(*fTxSocket))
                {
                    gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(*fTxSocket) << endl;
                    fTxSocket->Close();
                    continue;
                }
                timeout.Start(fTimeout);
                continue;

            case kFALSE: // Time out, no data yet, go on waiting
                if (timeout.HasTimedOut())
                {
                    gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(*fTxSocket) << " timed out after " << fTimeout << "ms." << endl;
                    fTxSocket->Close();
                    continue;
                }
                continue;

            default:     // Error occurrence
                gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
                fTxSocket->Close();
                continue;
            }
        }

        fConnectionEstablished = kFALSE;

#ifdef DEBUG
        cout << "- Reopen send socket to " << a.GetHostAddress() << ":" << fPortTx << endl;
#endif

        // Exchange the socket while Send() is locked out
        fMutex.Lock();

        delete fTxSocket;

        fTxSocket = new TSocket(a.GetHostAddress(), fPortTx);
        fTxSocket->SetOption(kNoBlock, 1);

        fMutex.UnLock();

        timeout.Start(fTimeout);

        if (fTxSocket->IsValid())
            continue;

        // Reconnect failed: wait before trying again
        MThread::Sleep(wait);

        // Double the delay as long as it is still below 3s
        // (the last doubling therefore ends slightly above 3s)
        if (wait<3000000)
            wait *= 2;
    }

    return 1;
}

// --------------------------------------------------------------------------
//
// Called by ReadSocket for each completed line (the terminating newline is
// not part of str). This implementation just prints the string to stdout
// and returns true.
//
bool MTcpIpIO::InterpreteStr(TString str)
{
    cout << "Rx: " << str << flush;
    return true;
}

// --------------------------------------------------------------------------
//
// Read from the socket byte by byte, collecting the characters into a
// string. Each time a newline is received the collected string is handed
// to InterpreteStr and a new string is started.
//
// Returns kFALSE as soon as RecvRaw returns a value <=0 (error, connection
// lost); characters collected so far are discarded in this case. Returns
// kTRUE if the loop is left because the thread was canceled (or because
// more than the one requested byte was reported).
//
Bool_t MTcpIpIO::ReadSocket(TSocket &rx)
{
    TString str;

    while (!MTcpIpI::IsThreadCanceled())
    {
        char c;
        const Int_t len = rx.RecvRaw(&c, 1);

        // For details see TSocket::RecvRaw
        // -1: // ERROR
        // EINVAL, EWOULDBLOCK
        // -5: // EPIPE || ECONNRESET = Pipe broken or connection reset by peer
        //  0: Data received with zero length! (Connection lost/call interrupted)
        if (len<=0)
            return kFALSE;

        // Data received
        if (len>1)
        {
            cout << "Data too long!!!" << endl;
            break;
        }

        // Data received (len==1)
        if (c!='\n')
        {
            str += c;
            continue;
        }

        // String completed
        InterpreteStr(str);
        str = "";
    }

    return kTRUE;
}

// --------------------------------------------------------------------------
//
// Serve an established connection: poll the socket for data and pass it
// to ReadSocket until the thread is canceled.
//
// Returns kFALSE if the socket is invalid, if reading fails, if no data
// arrived within fTimeout or if Select reports an error. Returns kTRUE if
// the loop is left because the thread was canceled.
//
Bool_t MTcpIpI::WaitForData(TSocket &sock)
{
    // No connection established?
    if (!sock.IsValid())
    {
        gLog << warn << "TSocket invalid on port " << fPortRx << "." << endl;
        return kFALSE;
    }

    fConnectionEstablished = kTRUE;

    MTimeout timeout(fTimeout);

    // Get connection on port fPortRx and redirected
    while (!IsThreadCanceled())
    {
        // Check for pending data (every ms)
        switch (sock.Select(TSocket::kRead, 1))
        {
        case kTRUE:  // Data pending... go on reading
            if (!ReadSocket(sock))
            {
                gLog << warn << MTime(-1) << " WARNING - Connection lost to " << MTcpIpO::GetSocketAddress(sock) << endl;
                return kFALSE;
            }
            timeout.Start(fTimeout);

            // Go on waiting for new data
            continue;

        case kFALSE: // Time out, no data yet, go on waiting
            if (timeout.HasTimedOut())
            {
                gLog << warn << MTime(-1) << " WARNING - Connection to " << MTcpIpO::GetSocketAddress(sock) << " timed out after " << fTimeout << "ms." << endl;
                return kFALSE;
            }

            // Go on waiting for new data
            continue;

        default:     // Error occurrence
            gLog << err << "TSocket::Select returned an error: " << strerror(errno) << endl;
            return kFALSE;
        }
    }

    return kTRUE; //???
}

// --------------------------------------------------------------------------
//
// Accept connections on the (non-blocking) server socket one after the
// other and serve each of them with WaitForData. The accepted socket is
// deleted when WaitForData returns.
//
// Returns if Accept reports an error or if the thread was canceled.
//
void MTcpIpI::WaitForConnection(TServerSocket &server)
{
    while (!IsThreadCanceled())
    {
        gLog << all << MTime(-1) << " Listening for new connection on port " << fPortRx << "..." << endl;

        // Check for a connection request (reminder: we are in non-blocking mode)
        TSocket *socket = 0;

        while (!IsThreadCanceled())
        {
            //cout << (int) IsThreadCanceled() << endl;

            // Wait for a new connection on RX port
            socket = server.Accept();

            // Accept returned an error
            if (socket==0)
            {
                gLog << err << "Error: TServerSock::Accept on port " << fPortRx << ": " << strerror(errno) << endl;
                // Since we don't know the type of the error we better shut down the socket
                return;
            }

            // No connection request pending (negative pseudo-pointer)
            if ((Long_t)socket<0)
            {
                MThread::Sleep(1000); // Wait a ms
                continue;
            }

            // Connection established
            break;
        }

        // Loop was left because of a cancel request, not because of a
        // connection: socket is either still 0 or the "nothing pending" value
        if ((Long_t)socket<=0)
            return;

        gLog << all << MTime(-1) << " Connection established to " << MTcpIpO::GetSocketAddress(*socket) << "..." << endl;

        if (!WaitForData(*socket))
            fConnectionEstablished = kFALSE;

#ifdef DEBUG
        cout << "===> DEL SOCKET" << endl;
#endif
        delete socket;
    }
}

// --------------------------------------------------------------------------
//
// Receiver thread: create a non-blocking server socket on fPortRx and
// serve connections on it as long as it stays valid. If the server socket
// is (or becomes) invalid the reason is logged, the socket is deleted and,
// after a pause, a new one is created.
//
// Always returns 0.
//
Int_t MTcpIpI::Thread()
{
    gLog << inf << "- Starting server listening on port " << fPortRx << "..." << endl;

    while (!IsThreadCanceled())
    {
        TServerSocket *server=new TServerSocket(fPortRx, kTRUE, 0);
        server->SetOption(kNoBlock, 1);

        while (!IsThreadCanceled() && server->IsValid())
            WaitForConnection(*server);

        if (!server->IsValid())
        {
            gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
            switch (server->GetErrorCode())
            {
            case  0: gLog << "No error." << endl; break;
            case -1: gLog << "low level socket() call failed." << endl; break;
            case -2: gLog << "low level bind() call failed." << endl; break;
            case -3: gLog << "low level listen() call failed." << endl; break;
            default: gLog << "Unknown." << endl; break;
            }
        }

#ifdef DEBUG
        cout << "===> DEL SERVER" << endl;
#endif
        delete server;

        // Pause before a new server socket is created. There is no point
        // in delaying the shutdown if the thread has been canceled already.
        if (!IsThreadCanceled())
            MThread::Sleep(5000000);
    }

    gLog << inf << "- Listening server stopped on port " << fPortRx << "." << endl;

    return 0;
}

/*
// Previous implementation of the receiver thread, kept for reference.
Int_t MTcpIpI::Thread()
{
    gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;

//    if (fPortRx==7404)
//    {
//        gLog << err << "CeCo communication skipped." << endl;
//        return 0;
//    }

    TServerSocket *fServSock=NULL;
    TSocket       *fRxSocket=NULL;

//    while (!HasStopFlag())
    while (!IsThreadCanceled())
    {
        fServSock=new TServerSocket(fPortRx, kTRUE);
        if (!fServSock->IsValid())
        {
            gLog << err << "ServerSocket on port " << fPortRx << " invalid: ";
            switch (fServSock->GetErrorCode())
            {
            case  0: gLog << "No error." << endl; break;
            case -1: gLog << "low level socket() call failed." << endl; break;
            case -2: gLog << "low level bind() call failed." << endl; break;
            case -3: gLog << "low level listen() call failed." << endl; break;
            default: gLog << "Unknown." << endl; break;
            }
            delete fServSock;
            fServSock=NULL;
            MThread::Sleep(5000000);
            continue;
        }

        fServSock->SetOption(kNoBlock, 1);

        gLog << all << "Waiting for connection on port " << fPortRx << "..." << endl;
//        while (!HasStopFlag() && (Long_t)fRxSocket<=0)
        while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
        {
            //TThread::CancelPoint();

            fRxSocket = fServSock->Accept();
            if (fRxSocket==0)
                cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
            usleep(10);
        }

        // Can happen in case of HasStopFlag()
        if (fRxSocket==(void*)-1)
            fRxSocket=NULL;

        if (fRxSocket==NULL)
        {
            delete fServSock;
            fServSock=NULL;
            continue;
        }

        if (!fRxSocket->IsValid())
        {
            cout << "TSocket invalid on port " << fPortRx << "." << endl;
            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            continue;
        }

        gLog << all << "Connection established on port " << fPortRx << "." << endl;

        //fRxSocket->SetOption(kNoBlock, 1);

        // Wait for data
        while (!IsThreadCanceled())
        {
            switch (fRxSocket->Select(kRead, 1))
            {
            case kTRUE:  // Data waiting to be read
                break;
            case kFALSE: // time out
                usleep(10);
                continue;
            }

            // ERROR
            cout << "Error: TRxSocket::Select on port " << fPortRx << "." << endl;

            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            break;
        }

        if (!fServSock)
            continue;

        if (IsThreadCanceled())
        {
            delete fServSock;
            delete fRxSocket;
            fServSock = NULL;
            fRxSocket = NULL;
            continue;
        }

        // ------ IDENTICAL UP TO HERE ------

        // Read and evaluate data
        ReadSocket(*fRxSocket);

        delete fServSock;
        delete fRxSocket;
        fServSock = NULL;
        fRxSocket = NULL;
    }

    gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;

    return 0;
//    return NULL;
}*/