Ignore:
Timestamp:
04/05/11 11:21:27 (14 years ago)
Author:
tbretz
Message:
- Added enum ConnectionStatus_t
- change fConnectionStatus type to ConnectionStatus_t
- removed obsolete fReadBuffer
- added const qualifiers to const_buffer_1 in AsyncWrite
- added Out() member function
- (temporarily?) removed some PostMessage member functions
- removed host and port from constructors for simplification
- improved how connections are established
- improved output of host name during connection attempts

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Connection.cc

    r10268 r10284  
    3131}
    3232
    33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
     33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
    3434{
    3535    ba::async_read(*this, buffers,
     
    3838}
    3939
    40 void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
     40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
    4141{
    4242    ba::async_write(*this, buffers,
     
    6363void Connection::AsyncConnect(tcp::resolver::iterator iterator)
    6464{
    65     //cout << "Start async connect(...)" << endl;
    66     fConnectionStatus = 1;
    67 
    6865    tcp::endpoint endpoint = *iterator;
    6966
     
    7269                  boost::bind(&Connection::ConnectImp,
    7370                              this, ba::placeholders::error,
    74                               ++iterator));
     71                              iterator));
    7572
    7673    // We will get a "Connection timeout anyway"
     
    8582    {
    8683        stringstream str;
    87         str << "Connection closed to " << URL() << ".";// << endl;
     84        str << "Connection closed to " << URL() << ".";
    8885        Message(str);
    8986    }
     
    9693
    9794    // Reset the connection status
    98     fConnectionStatus = 0;
     95    fConnectionStatus = kDisconnected;
    9996
    10097    // Stop deadline counters
     
    10299    fOutTimeout.cancel();
    103100
     101    // Empty output queue
     102    fOutQueue.clear();
     103
    104104    if (!restart || IsConnecting())
    105105        return;
     
    111111    // Start trying to reconnect
    112112    fMsgConnect = "";
     113    fErrConnect = "";
    113114    StartConnect();
    114115}
     
    122123void Connection::HandleWriteTimeout(const bs::error_code &error)
    123124{
    124     // 125: Operation canceled
    125     if (error && error!=bs::error_code(125, bs::system_category))
     125    // 125: Operation canceled (bs::error_code(125, bs::system_category))
     126    if (error && error!=ba::error::basic_errors::operation_aborted)
    126127    {
    127128        stringstream str;
    128129        str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
    129130        Error(str);
     131
     132        CloseImp();
     133        return;
    130134    }
    131135
     
    149153}
    150154
    151 void Connection::HandleSentData(const bs::error_code& error, size_t)
     155void Connection::HandleSentData(const bs::error_code& error, size_t n)
    152156{
    153157    if (error)
     
    161165    }
    162166
    163     Message("Data successfully sent to "+URL());
     167    stringstream msg;
     168    msg << n << " bytes successfully sent to " << URL();
     169    Message(msg);
    164170
    165171    // This is "thread" safe because SendMessage and HandleSentMessage
     
    177183
    178184    // AsyncWrite + Deadline
    179     AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
     185    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
    180186    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    181187}
    182188
    183 void Connection::SendMessageImp(const vector<char> &msg)
     189// It is important that when SendMessageImp is called, or to be more
     190// precise boost::bind is called, teh data is copied!
     191void Connection::SendMessageImp(const vector<char> msg)
    184192{
    185193    /*
     
    202210
    203211    // AsyncWrite + Deadline
    204     AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
     212    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
    205213    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
    206 }
    207 
    208 void Connection::PostMessage(vector<uint16_t> inp)
    209 {
    210     // Convert to network byte order
    211     for_each(inp.begin(), inp.end(), htons);
    212 
    213     // FIXME FIXME
    214 
    215     PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
    216 
    217 //    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
    218 //    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    219 }
    220 
    221 void Connection::PostMessage(const vector<char> &msg)
    222 {
    223     get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    224214}
    225215
     
    239229    vector <char>msg(max);
    240230
    241     for (unsigned int i=0; i<max; i++)
    242         msg[i] = 0;
    243 
    244     for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
    245         msg[i] = cmd[i];
    246 
    247     get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     231    copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
     232
     233    PostMessage(msg);
    248234}
    249235
     
    251237{
    252238    // 125: Operation canceled
    253     if (error && error!=bs::error_code(125, bs::system_category))
     239    if (error && error!=ba::error::basic_errors::operation_aborted)
    254240    {
    255241        stringstream str;
     
    274260
    275261void Connection::ConnectImp(const bs::error_code& error,
    276                             tcp::resolver::iterator endpoint_iterator)
    277 {
     262                            tcp::resolver::iterator iterator)
     263{
     264    tcp::endpoint endpoint = *iterator;
     265
     266    const string host = endpoint.port()==0 ? "" :
     267        endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
     268
    278269    // Connection established
    279270    if (!error)
    280271    {
    281         Message("Connection established to "+URL()+"...");
    282 
    283         // Initialize hardware...
    284         // Set state to : connected/undefined until we get the first message
    285         //                (send request for configuration) ?
    286         //                (send configuration?)
    287         fMsgConnect = "";
    288         fErrConnect = "";
    289 
    290         fConnectionStatus = 2;
    291         //StartAsyncRead();
    292 
    293         // We can create the buffer here and delete it in
    294         // Handle Read. However, the read buffer is not filled again
    295         // before it was not read within HandleReceivedData -- so a single
    296         // buffer should be good enough. Before HandleReceivedData
    297         // returns the data must be copied to a "safe" place.
    298 
    299         // fSocket.get_io_service()/*fEventQueue*/.stop();
     272        Message("Connection established to "+host+"...");
     273
     274        fConnectionStatus = kConnected;
    300275
    301276        ConnectionEstablished();
     
    309284    // the endpoint again.
    310285    CloseImp(false);
    311     //fSocket.close();
    312 
    313     // 111: Connection refused
    314     if (1/*error!=bs::error_code(111, bs::system_category)*/)
    315     {
    316         stringstream msg;
    317         if (URL()!=":")
    318             msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
    319 
    320         if (fErrConnect!=msg.str())
    321         {
     286
     287    stringstream msg;
     288    if (!host.empty())
     289        msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
     290
     291    if (fErrConnect!=msg.str())
     292    {
     293        if (error!=ba::error::basic_errors::connection_refused)
    322294            fMsgConnect = "";
    323             fErrConnect = msg.str();
    324             Warn(fErrConnect);
    325         }
     295        fErrConnect = msg.str();
     296        Warn(fErrConnect);
    326297    }
    327298
    328299    // Go on with the next
    329     if (endpoint_iterator != tcp::resolver::iterator())
    330     {
    331         AsyncConnect(endpoint_iterator);
     300    if (++iterator != tcp::resolver::iterator())
     301    {
     302        AsyncConnect(iterator);
    332303        return;
    333304    }
     
    346317void Connection::StartConnect()
    347318{
    348     fConnectionStatus = 1;
     319    fConnectionStatus = kConnecting;
    349320
    350321    tcp::resolver resolver(get_io_service());
     
    401372    if (p0==string::npos || p0!=p1)
    402373    {
    403         Error("Connection::SetEndPoint - Wrong format of argument.");
     374        Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)");
    404375        return;
    405376    }
     
    411382
    412383Connection::Connection(ba::io_service& ioservice, ostream &out) :
    413 MessageImp(out), tcp::socket(ioservice),
    414 fLog(0), //fAddress("localhost"), fPort("5000"),
     384MessageImp(out), tcp::socket(ioservice), fLog(0),
    415385fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    416 fConnectionStatus(0)
    417 {
    418 }
    419 /*
    420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
    421 tcp::socket(ioservice),
    422 fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
    423 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    424 fConnectionStatus(0)
    425 {
    426 }
    427 
    428 Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
    429 tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
    430 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    431 fConnectionStatus(0)
    432 {
    433 }
    434 */
     386fConnectionStatus(kDisconnected)
     387{
     388}
Note: See TracChangeset for help on using the changeset viewer.