Changeset 10284


Ignore:
Timestamp:
Apr 5, 2011, 11:21:27 AM (9 years ago)
Author:
tbretz
Message:
- Added enum ConnectionStatus_t
- change fConnectionStatus type to ConnectionStatus_t
- removed obsolete fReadBuffer
- added const qualifiers to const_buffer_1 in AsyncWrite
- added Out() member function
- (temporarily?) removed some PostMessage member functions
- removed host and port from constructors for simplification
- improved how connections are established
- improved output of host name during connection attempts

Location:
trunk/FACT++/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Connection.cc

    r10268 r10284  
    3131}
    3232
    33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
     33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
    3434{
    3535    ba::async_read(*this, buffers,
     
    3838}
    3939
    40 void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
     40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
    4141{
    4242    ba::async_write(*this, buffers,
     
    6363void Connection::AsyncConnect(tcp::resolver::iterator iterator)
    6464{
    65     //cout << "Start async connect(...)" << endl;
    66     fConnectionStatus = 1;
    67 
    6865    tcp::endpoint endpoint = *iterator;
    6966
     
    7269                  boost::bind(&Connection::ConnectImp,
    7370                              this, ba::placeholders::error,
    74                               ++iterator));
     71                              iterator));
    7572
    7673    // We will get a "Connection timeout anyway"
     
    8582    {
    8683        stringstream str;
    87         str << "Connection closed to " << URL() << ".";// << endl;
     84        str << "Connection closed to " << URL() << ".";
    8885        Message(str);
    8986    }
     
    9693
    9794    // Reset the connection status
    98     fConnectionStatus = 0;
     95    fConnectionStatus = kDisconnected;
    9996
    10097    // Stop deadline counters
     
    10299    fOutTimeout.cancel();
    103100
     101    // Empty output queue
     102    fOutQueue.clear();
     103
    104104    if (!restart || IsConnecting())
    105105        return;
     
    111111    // Start trying to reconnect
    112112    fMsgConnect = "";
     113    fErrConnect = "";
    113114    StartConnect();
    114115}
     
    122123void Connection::HandleWriteTimeout(const bs::error_code &error)
    123124{
    124     // 125: Operation canceled
    125     if (error && error!=bs::error_code(125, bs::system_category))
     125    // 125: Operation canceled (bs::error_code(125, bs::system_category))
     126    if (error && error!=ba::error::basic_errors::operation_aborted)
    126127    {
    127128        stringstream str;
    128129        str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
    129130        Error(str);
     131
     132        CloseImp();
     133        return;
    130134    }
    131135
     
    149153}
    150154
    151 void Connection::HandleSentData(const bs::error_code& error, size_t)
     155void Connection::HandleSentData(const bs::error_code& error, size_t n)
    152156{
    153157    if (error)
     
    161165    }
    162166
    163     Message("Data successfully sent to "+URL());
     167    stringstream msg;
     168    msg << n << " bytes successfully sent to " << URL();
     169    Message(msg);
    164170
    165171    // This is "thread" safe because SendMessage and HandleSentMessage
     
    177183
    178184    // AsyncWrite + Deadline
    179     AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
     185    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
    180186    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    181187}
    182188
    183 void Connection::SendMessageImp(const vector<char> &msg)
     189// It is important that when SendMessageImp is called, or to be more
     190// precise boost::bind is called, teh data is copied!
     191void Connection::SendMessageImp(const vector<char> msg)
    184192{
    185193    /*
     
    202210
    203211    // AsyncWrite + Deadline
    204     AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
     212    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
    205213    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    206 }
    207 
    208 void Connection::PostMessage(vector<uint16_t> inp)
    209 {
    210     // Convert to network byte order
    211     for_each(inp.begin(), inp.end(), htons);
    212 
    213     // FIXME FIXME
    214 
    215     PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
    216 
    217 //    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
    218 //    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    219 }
    220 
    221 void Connection::PostMessage(const vector<char> &msg)
    222 {
    223     get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    224214}
    225215
     
    239229    vector <char>msg(max);
    240230
    241     for (unsigned int i=0; i<max; i++)
    242         msg[i] = 0;
    243 
    244     for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
    245         msg[i] = cmd[i];
    246 
    247     get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     231    copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
     232
     233    PostMessage(msg);
    248234}
    249235
     
    251237{
    252238    // 125: Operation canceled
    253     if (error && error!=bs::error_code(125, bs::system_category))
     239    if (error && error!=ba::error::basic_errors::operation_aborted)
    254240    {
    255241        stringstream str;
     
    274260
    275261void Connection::ConnectImp(const bs::error_code& error,
    276                             tcp::resolver::iterator endpoint_iterator)
    277 {
     262                            tcp::resolver::iterator iterator)
     263{
     264    tcp::endpoint endpoint = *iterator;
     265
     266    const string host = endpoint.port()==0 ? "" :
     267        endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
     268
    278269    // Connection established
    279270    if (!error)
    280271    {
    281         Message("Connection established to "+URL()+"...");
    282 
    283         // Initialize hardware...
    284         // Set state to : connected/undefined until we get the first message
    285         //                (send request for configuration) ?
    286         //                (send configuration?)
    287         fMsgConnect = "";
    288         fErrConnect = "";
    289 
    290         fConnectionStatus = 2;
    291         //StartAsyncRead();
    292 
    293         // We can create the buffer here and delete it in
    294         // Handle Read. However, the read buffer is not filled again
    295         // before it was not read within HandleReceivedData -- so a single
    296         // buffer should be good enough. Before HandleReceivedData
    297         // returns the data must be copied to a "safe" place.
    298 
    299         // fSocket.get_io_service()/*fEventQueue*/.stop();
     272        Message("Connection established to "+host+"...");
     273
     274        fConnectionStatus = kConnected;
    300275
    301276        ConnectionEstablished();
     
    309284    // the endpoint again.
    310285    CloseImp(false);
    311     //fSocket.close();
    312 
    313     // 111: Connection refused
    314     if (1/*error!=bs::error_code(111, bs::system_category)*/)
    315     {
    316         stringstream msg;
    317         if (URL()!=":")
    318             msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
    319 
    320         if (fErrConnect!=msg.str())
    321         {
     286
     287    stringstream msg;
     288    if (!host.empty())
     289        msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
     290
     291    if (fErrConnect!=msg.str())
     292    {
     293        if (error!=ba::error::basic_errors::connection_refused)
    322294            fMsgConnect = "";
    323             fErrConnect = msg.str();
    324             Warn(fErrConnect);
    325         }
     295        fErrConnect = msg.str();
     296        Warn(fErrConnect);
    326297    }
    327298
    328299    // Go on with the next
    329     if (endpoint_iterator != tcp::resolver::iterator())
    330     {
    331         AsyncConnect(endpoint_iterator);
     300    if (++iterator != tcp::resolver::iterator())
     301    {
     302        AsyncConnect(iterator);
    332303        return;
    333304    }
     
    346317void Connection::StartConnect()
    347318{
    348     fConnectionStatus = 1;
     319    fConnectionStatus = kConnecting;
    349320
    350321    tcp::resolver resolver(get_io_service());
     
    401372    if (p0==string::npos || p0!=p1)
    402373    {
    403         Error("Connection::SetEndPoint - Wrong format of argument.");
     374        Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)");
    404375        return;
    405376    }
     
    411382
    412383Connection::Connection(ba::io_service& ioservice, ostream &out) :
    413 MessageImp(out), tcp::socket(ioservice),
    414 fLog(0), //fAddress("localhost"), fPort("5000"),
     384MessageImp(out), tcp::socket(ioservice), fLog(0),
    415385fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    416 fConnectionStatus(0)
    417 {
    418 }
    419 /*
    420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
    421 tcp::socket(ioservice),
    422 fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
    423 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    424 fConnectionStatus(0)
    425 {
    426 }
    427 
    428 Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
    429 tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
    430 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    431 fConnectionStatus(0)
    432 {
    433 }
    434 */
     386fConnectionStatus(kDisconnected)
     387{
     388}
  • trunk/FACT++/src/Connection.h

    r10268 r10284  
    1717    std::string fAddress;
    1818    std::string fPort;
     19
     20    enum ConnectionStatus_t
     21    {
     22        kDisconnected = 0,
     23        kConnecting   = 1,
     24        kConnected    = 2,
     25    };
     26
    1927protected:
    2028    boost::asio::deadline_timer   fInTimeout;
     
    2331    boost::asio::deadline_timer   fOutTimeout;
    2432    boost::asio::deadline_timer   fConnectionTimer;
    25     std::deque<std::vector<char>> fOutQueue; // Shell we directly put ba:buffers into the queue?
     33    std::deque<std::vector<char>> fOutQueue;
    2634
    27     int fConnectionStatus;   // 0=offline, 1=connecting, 2=connected
     35    ConnectionStatus_t fConnectionStatus;
    2836
    2937    std::string fErrConnect;
    3038    std::string fMsgConnect;
    3139
    32 protected:
    33     char fReadBuffer[1000];
    34 
    35 
    3640public:
    3741    void SetLogStream(MessageImp *log) { fLog = log; }
     42    std::ostream &Out() { return fLog ? fLog->Out() : Out(); }
    3843
    3944    // -------- Abbreviations for starting async tasks ---------
    4045
    41     int Write(const Time &t, const char *txt, int qos=kInfo);
    42 
    43     void AsyncRead(boost::asio::mutable_buffers_1 buffers, int type=0);
    44     void AsyncWrite(boost::asio::mutable_buffers_1 buffers);
     46    void AsyncRead(const boost::asio::mutable_buffers_1 buffers, int type=0);
     47    void AsyncWrite(const boost::asio::const_buffers_1 &buffers);
    4548    void AsyncWait(boost::asio::deadline_timer &timer, int millisec,
    4649                   void (Connection::*handler)(const boost::system::error_code&));
     50
     51private:
    4752    void AsyncConnect(boost::asio::ip::tcp::resolver::iterator iterator);
    48 
    49     std::string URL() const { return fAddress + ":" + fPort; }
    5053
    5154    void CloseImp(bool restart=true);
     
    5457                    boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
    5558
    56 public:
    57 
    58     // ------------------------ close --------------------------
    59     // close from another thread
    60     void PostClose(bool restart=true);
    61 
    62     // ------------------------ write --------------------------
    6359    void HandleConnectionTimer(const boost::system::error_code &error);
    6460    void HandleWriteTimeout(const boost::system::error_code &error);
    6561    void HandleSentData(const boost::system::error_code& error, size_t);
    66     void SendMessageImp(const std::vector<char> &msg);
    67     void PostMessage(std::vector<uint16_t> msg);
    68     void PostMessage(const std::vector<char> &msg);
    69     void PostMessage(const void *msg, size_t s=0);
    70     void PostMessage(const std::string &cmd, size_t s=-1);
    7162
    72     template<std::size_t N>
    73         void PostMessage(boost::array<uint16_t, N> arr)
    74     {
    75         // Convert to network byte order
    76         for_each(arr.begin(), arr.end(), htons);
    77         PostMessage(std::vector<uint16_t>(arr.begin(), arr.end()));
    78     }
    79 
    80     // ------------------------ connect --------------------------
     63    int Write(const Time &t, const char *txt, int qos=kInfo);
    8164
    8265    virtual void ConnectionEstablished() { }
    8366
    84     void StartConnect();
     67public:
     68    Connection(boost::asio::io_service& io_service, std::ostream &out);
    8569
    86     Connection(boost::asio::io_service& io_service, std::ostream &out);
    87 //    Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000);
    88 //    Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port);
     70    // ------------------------ connect --------------------------
    8971
    9072    void SetEndpoint(const std::string &addr, int port);
    9173    void SetEndpoint(const std::string &addr, const std::string &port);
    9274    void SetEndpoint(const std::string &addr);
     75
     76    void StartConnect();
     77
     78    // ------------------------ close --------------------------
     79    void PostClose(bool restart=true);
     80
     81    // ------------------------ write --------------------------
     82    void SendMessageImp(const std::vector<char> msg);
     83    void PostMessage(const void *msg, size_t s=0);
     84    void PostMessage(const std::string &cmd, size_t s=-1);
     85
     86    template<typename T, size_t N>
     87        void PostMessage(const boost::array<T, N> &msg)
     88    {
     89        PostMessage(msg.begin(), msg.size()*sizeof(T));
     90    }
     91
     92    template<typename T>
     93        void PostMessage(const std::vector<T> &msg)
     94    {
     95        PostMessage(&msg[0], msg.size()*sizeof(T));
     96    }
    9397
    9498    // ------------------------ others --------------------------
     
    99103    int IsClosed() const { return !is_open(); }
    100104
    101     bool IsConnected()  const { return fConnectionStatus==2; }
    102     bool IsConnecting() const { return fConnectionStatus==1; }
     105    bool IsConnected()  const { return fConnectionStatus==kConnected;  }
     106    bool IsConnecting() const { return fConnectionStatus==kConnecting; }
     107
     108    std::string URL() const { return fAddress + ":" + fPort; }
    103109};
    104110
Note: See TracChangeset for help on using the changeset viewer.