Ignore:
Timestamp:
01/16/08 14:34:11 (17 years ago)
Author:
tbretz
Message:
*** empty log message ***
Location:
trunk/MagicSoft/Cosy/tcpip
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/MagicSoft/Cosy/tcpip/MDriveCom.h

    r7788 r8816  
    1010class MsgQueue;
    1111class Ring;
     12class MLog;
    1213
    1314class MDriveCom : public MCeCoCom
     
    3940        kMonitoring = 0x40
    4041    };
    41  
    42     MDriveCom(MsgQueue *q, MLog &out=gLog) : MCeCoCom("DRIVE-REPORT", out), fQueue(q) {}
     42
     43    MDriveCom(MsgQueue *q, MLog *out) : MCeCoCom(out), fQueue(q) {}
    4344
    4445    bool SendReport(UInt_t stat, RaDec rd, ZdAz so, ZdAz is, ZdAz er);
  • trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.cc

    r8378 r8816  
    22
    33#include <unistd.h>    // usleep
    4 #include <iostream>
    54
    65#include <TSocket.h>
    76#include <TServerSocket.h>
     7
     8#include "MLog.h"
     9#include "MLogManip.h"
    810
    911#undef DEBUG
     
    3335     */
    3436
    35 MTcpIpIO::MTcpIpIO(MLog &out)
    36     : MThread(false), Log(out), fRxSocket(NULL), fServSock(NULL), fSendInterval(1000)
    37 {
    38     fTxSocket = new TSocket("ceco", 7304);
     37MTcpIpO::MTcpIpO(Int_t tx)
     38{
     39    fTxSocket = new TSocket("ceco", tx);
     40}
     41
     42MTcpIpO::~MTcpIpO()
     43{
     44    // Now delete all TCP/IP objects
     45    delete fTxSocket;
     46}
     47
     48MTcpIpIO::MTcpIpIO(Int_t tx, Int_t rx) : MTcpIpI(rx), MTcpIpO(tx)
     49{
     50    RunThread();
    3951}
    4052
    4153MTcpIpIO::~MTcpIpIO()
    4254{
    43     //
    44     // Make sure, that no loop waiting for connection
    45     // is running anymore!
    46     //
    47     Stop();
    48 
    49     //
    50     // Now delete all TCP/IP objects
    51     //
    52     //cout << "Delete TxSocket " << fTxSocket << "..." << flush;
    53     delete fTxSocket;
    54     //cout << "Done." << endl;
    55     if (fServSock)
    56     {
    57         //cout << "Delete ServSock " << fServSock << "..." << flush;
    58         delete fServSock;
    59         //cout << "Done." << endl;
    60     }
    61     if (fRxSocket)
    62     {
    63         //cout << "Delete RxSocket " << fRxSocket << "..." << flush;
    64         delete fRxSocket;
    65         //cout << "Done." << endl;
    66     }
    67 }
    68 
    69 bool MTcpIpIO::Send(const char *msg, bool force=kFALSE)
    70 {
    71 
    72     const MTime t(-1);
    73 
    74     if ((double)t-(double)fTime<0.001*fSendInterval && !force)
    75           return true;
    76    
    77 
    78     if (lout.Lock("MTcpIpIO::Send"))
    79     {
    80         //const Int_t rc = lout.IsOutputDeviceEnabled(MLog::eGui);
    81         //lout.DisableOutputDevice(MLog::eGui);
    82         lout << msg << flush;
    83         lout.UnLock("MTcpIpIO::Send");
    84         //if (rc)
    85         //    lout.EnableOutputDevice(MLog::eGui);
    86     }
    87 
    88     fTime = t;
    89 
    90     if (!fTxSocket->IsValid())
    91         return false;
    92 
    93     const Int_t len = fTxSocket->SendRaw(msg, strlen(msg));
    94     if (len<0)
     55}
     56
     57bool MTcpIpO::SendFrame(TSocket &tx, const char *msg, int len)
     58{
     59    if (!tx.IsValid())
     60    {
     61        //cout << "*!* Transmit socket invalid!" << endl;
     62        return false;
     63    }
     64
     65    const Int_t l = tx.SendRaw(msg, len);
     66    if (l<0)
    9567    {
    9668        cout << "ERROR - Sending Message" << endl;
    9769        return false;
    9870    }
    99     if (len!=(Int_t)strlen(msg))
    100     {
    101         cout << "Send wrong number (" << len << ") of Bytes." << endl;
    102         return false;
    103     }
     71
     72    if (l!=len)
     73    {
     74        cout << "Send wrong number (" << l << ") of Bytes." << endl;
     75        return false;
     76    }
     77
    10478#ifdef DEBUG
    10579    cout << "Tx: " << msg << flush;
     
    10983}
    11084
     85bool MTcpIpO::SendFrame(const char *addr, int port, const char *msg, int len)
     86{
     87    //    R__LOCKGUARD2(myMutex);
     88
     89    cout << "Connecting to " << addr << ":" << port << endl;
     90
     91    // FIXME: Set tx-socket to nonblocking?
     92    TSocket tx(addr, port);
     93    //    return SendFrame(tx, msg, len);
     94
     95    if (!tx.IsValid())
     96    {
     97        //cout << "*!* Transmit socket invalid!" << endl;
     98        return false;
     99    }
     100
     101    cout << "Sending to " << addr << ":" << port << endl;
     102
     103    const Int_t l = tx.SendRaw(msg, len, kDontBlock);
     104    if (l<0)
     105    {
     106        cout << "ERROR - Sending Message" << endl;
     107        return false;
     108    }
     109
     110    if (l!=len)
     111    {
     112        cout << "Send wrong number (" << l << ") of Bytes." << endl;
     113        return false;
     114    }
     115
     116#ifdef DEBUG
     117    cout << "Tx: " << msg << flush;
     118#endif
     119
     120    return true;
     121}
     122
     123bool MTcpIpO::Send(const char *msg, Int_t len)
     124{
     125    return SendFrame(*fTxSocket, msg, len);
     126}
     127
    111128bool MTcpIpIO::InterpreteStr(TString str)
    112129{
     
    115132}
    116133
    117 void MTcpIpIO::Clear()
    118 {
     134void MTcpIpIO::ReadSocket(TSocket &rx)
     135{
     136    // Clear buffer!
    119137    char c;
    120     while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
     138//    while (fRxSocket->RecvRaw(&c, 1)>0 && !HasStopFlag())
     139    while (rx.RecvRaw(&c, 1)>0 && !IsThreadCanceled())
    121140        usleep(1);
    122 }
    123 
    124 void *MTcpIpIO::Thread()
    125 {
    126     cout << "Starting receiver..." << endl;
    127 
    128     while (!HasStopFlag())
    129     {
    130         fServSock=new TServerSocket(7404, kTRUE);
     141
     142    TString str;
     143    //        while (!HasStopFlag())
     144    while (!IsThreadCanceled())
     145    {
     146        char c;
     147        const Int_t len = rx.RecvRaw(&c, 1);
     148
     149        // No data received (non-blocking mode)
     150        if (len<0)
     151        {
     152            usleep(1);
     153            continue;
     154        }
     155
     156        // Data received with zero length!
     157        if (len==0)
     158        {
     159            // THIS MEANS CONNECTIION LOST!!!!
     160            cout << "============> len==0 (CONNECTION LOST?)" << endl;
     161            break; // This break is for TEST PURPOSE FIXME!!!
     162            continue;
     163        }
     164
     165        // Data received
     166        if (len>1)
     167        {
     168            cout << "Data too long!!!" << endl;
     169            break;
     170        }
     171
     172        // Data received (len==1)
     173        if (c!='\n')
     174        {
     175            str += c;
     176            continue;
     177        }
     178
     179        // String completed
     180        InterpreteStr(str);
     181        str = "";
     182    }
     183}
     184
     185//void *MTcpIpIO::Thread()
     186Int_t MTcpIpI::Thread()
     187{
     188    gLog << inf << "- Starting receiver on port " << fPortRx << "..." << endl;
     189
     190    TServerSocket *fServSock=NULL;
     191    TSocket       *fRxSocket=NULL;
     192
     193//    while (!HasStopFlag())
     194    while (!IsThreadCanceled())
     195    {
     196        fServSock=new TServerSocket(fPortRx, kTRUE);
    131197        if (!fServSock->IsValid())
    132198        {
    133             cout << "ServerSocket not valid: ";
     199            cout << "ServerSocket on port " << fPortRx << " invalid: ";
    134200            switch (fServSock->GetErrorCode())
    135201            {
     
    142208            delete fServSock;
    143209            fServSock=NULL;
    144             usleep(5000000);
     210            MyThreadX::Sleep(5000000);
    145211            continue;
    146212        }
     
    148214        fServSock->SetOption(kNoBlock, 1);
    149215
    150         cout << "Waiting for conntection on port 7404..." << endl;
    151         while (!HasStopFlag() && (Long_t)fRxSocket<=0)
    152         {
     216        cout << "Waiting for connection on port " << fPortRx << "..." << endl;
     217//        while (!HasStopFlag() && (Long_t)fRxSocket<=0)
     218        while (!IsThreadCanceled() && (Long_t)fRxSocket<=0)
     219        {
     220            //TThread::CancelPoint();
     221
    153222            fRxSocket = fServSock->Accept();
    154223            if (fRxSocket==0)
    155                 cout << "Error: TServerSock::Accept" << endl;
     224                cout << "Error: TServerSock::Accept on port " << fPortRx << "." << endl;
    156225            usleep(10);
    157226        }
     
    170239        if (!fRxSocket->IsValid())
    171240        {
    172             cout << "TSocket not valid..." << endl;
     241            cout << "TSocket invalid on port " << fPortRx << "." << endl;
    173242            delete fServSock;
    174243            delete fRxSocket;
     
    178247        }
    179248
    180         cout << "Connection established..." << endl;
     249        cout << "Connection established on port " << fPortRx << "." << endl;
    181250
    182251        fRxSocket->SetOption(kNoBlock, 1);
    183252
    184         Clear();
    185 
    186         TString str;
    187         while (!HasStopFlag())
    188         {
    189             char c;
    190             const Int_t len = fRxSocket->RecvRaw(&c, 1);
    191 
    192             // No data received (non-blocking mode)
    193             if (len<0)
    194             {
    195                 usleep(1);
    196                 continue;
    197             }
    198 
    199             // Data received with zero length!
    200             if (len==0)
    201             {
    202                 cout << "len==0" << endl;
    203                 continue;
    204             }
    205 
    206             // Data received
    207             if (len>1)
    208             {
    209                 cout << "Data too long!!!" << endl;
    210                 break;
    211             }
    212 
    213             // Data received (len==1)
    214             if (c!='\n')
    215             {
    216                 str += c;
    217                 continue;
    218             }
    219 
    220             // String completed
    221             InterpreteStr(str);
    222             str = "";
    223         }
     253        // ------ IDENTICAL UP TO HERE ------
     254
     255        ReadSocket(*fRxSocket);
     256
    224257        delete fServSock;
    225258        delete fRxSocket;
     
    228261    }
    229262
    230     cout << "Receiver stopped..." << endl;
    231 
    232     return NULL;
    233 }
     263    gLog << inf << "- Receiver stopped on port " << fPortRx << "." << endl;
     264
     265    return 0;
     266//    return NULL;
     267}
  • trunk/MagicSoft/Cosy/tcpip/MTcpIpIO.h

    r4865 r8816  
    22#define COSY_MTcpIpIO
    33
    4 #ifndef COSY_MThread
     4#ifndef MARS_MThread
    55#include "MThread.h"
    6 #endif
    7 #ifndef COSY_Log
    8 #include "log.h"
    96#endif
    107#ifndef MARS_MTime
     
    129#endif
    1310
     11
    1412class TString;
    1513class TSocket;
    1614class TServerSocket;
    1715
    18 class MTcpIpIO : public MThread, public Log
     16// A generalized class for receiving over tcp/ip
     17class MTcpIpI : public MyThreadX
    1918{
    2019private:
    21     TSocket       *fTxSocket;
    22     TSocket       *fRxSocket;
    23     TServerSocket *fServSock;
     20    Int_t fPortRx;
    2421
    25     MTime fTime;
    26     Int_t fSendInterval; // [ms]
     22    Int_t Thread();
    2723
    28     void Clear();
     24    virtual void ReadSocket(TSocket &rx) = 0;
    2925
    3026public:
    31     MTcpIpIO(MLog &out=gLog);
     27    MTcpIpI(Int_t rx) : MyThreadX(Form("MTcpIpI::%d", rx)), fPortRx(rx) { /*RunThread();*/ }
     28    ~MTcpIpI() { CancelThread(); }
     29};
     30
     31
     32// A generalized class for sending over tcp/ip
     33class MTcpIpO
     34{
     35private:
     36    TSocket *fTxSocket;
     37
     38public:
     39    MTcpIpO(Int_t tx);
     40    ~MTcpIpO();
     41
     42    static bool SendFrame(TSocket &tx, const char *msg, int len);
     43    static bool SendFrame(const char *addr, int port, const char *msg, int len);
     44
     45    bool Send(const char *msg, int len);
     46};
     47
     48// This class es espcially meant to receive and send ascii messages
     49class MTcpIpIO : public MTcpIpI, public MTcpIpO
     50{
     51private:
     52    void ReadSocket(TSocket &rx);
     53
     54public:
     55    MTcpIpIO(Int_t tx, Int_t rx);
    3256    ~MTcpIpIO();
    3357
    34     virtual bool Send(const char *msg, bool force);
    3558    virtual bool InterpreteStr(TString str);
    36 
    37     void *Thread();
    3859};
    3960
Note: See TracChangeset for help on using the changeset viewer.