Ignore:
Timestamp:
03/30/11 15:39:31 (14 years ago)
Author:
tbretz
Message:
Some improvements to the connection handling, added some PostMessage functions which automatically convert to network byte order.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Connection.cc

    r10183 r10268  
    3131}
    3232
    33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers)
     33void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
    3434{
    3535    ba::async_read(*this, buffers,
    3636                   boost::bind(&Connection::HandleReceivedData, this,
    37                                dummy::error, dummy::bytes_transferred));
     37                               dummy::error, dummy::bytes_transferred, type));
    3838}
    3939
     
    8989    }
    9090
    91     // Close the connection
     91    // Stop any pending connection attempt
     92    fConnectionTimer.cancel();
     93
     94    // Close possible open connections
     95    close();
     96
     97    // Reset the connection status
    9298    fConnectionStatus = 0;
    93     close();
    9499
    95100    // Stop deadline counters
    96101    fInTimeout.cancel();
    97102    fOutTimeout.cancel();
    98     //fConnectTimeout.cancel();
    99103
    100104    if (!restart || IsConnecting())
     
    106110
    107111    // Start trying to reconnect
    108     AsyncConnect();
     112    fMsgConnect = "";
     113    StartConnect();
    109114}
    110115
     
    201206}
    202207
     208void Connection::PostMessage(vector<uint16_t> inp)
     209{
     210    // Convert to network byte order
     211    for_each(inp.begin(), inp.end(), htons);
     212
     213    // FIXME FIXME
     214
     215    PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
     216
     217//    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
     218//    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     219}
     220
    203221void Connection::PostMessage(const vector<char> &msg)
    204222{
     223    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     224}
     225
     226void Connection::PostMessage(const void *ptr, size_t max)
     227{
     228    const vector<char> msg(reinterpret_cast<const char*>(ptr),
     229                           reinterpret_cast<const char*>(ptr)+max);
     230
    205231    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    206232}
     
    244270    // to run.
    245271    if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
    246         AsyncConnect();
     272        StartConnect();
    247273}
    248274
     
    272298
    273299        // fSocket.get_io_service()/*fEventQueue*/.stop();
     300
     301        ConnectionEstablished();
    274302        return;
    275303    }
     
    287315    {
    288316        stringstream msg;
    289         msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
     317        if (URL()!=":")
     318            msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
    290319
    291320        if (fErrConnect!=msg.str())
     
    315344
    316345// FIXME: Async connect should get address and port as an argument
    317 void Connection::AsyncConnect()
     346void Connection::StartConnect()
    318347{
    319348    fConnectionStatus = 1;
     
    321350    tcp::resolver resolver(get_io_service());
    322351
     352    boost::system::error_code ec;
     353
    323354    tcp::resolver::query query(fAddress, fPort);
    324     tcp::resolver::iterator iterator = resolver.resolve(query);
     355    tcp::resolver::iterator iterator = resolver.resolve(query, ec);
    325356
    326357    stringstream msg;
    327     msg << "Trying to connect to " << fAddress << ":" << fPort << "...";
    328 
     358    if (!fAddress.empty() || !fPort.empty() || ec)
     359        msg << "Trying to connect to " << URL() << "...";
     360
     361    if (ec)
     362        msg << " " << ec.message() << " (" << ec << ")";
     363
     364    // Only output message if it has changed
    329365    if (fMsgConnect!=msg.str())
    330366    {
    331367        fMsgConnect = msg.str();
    332         Message(msg);
    333     }
    334 
    335     // Start connection attempts (will also reset deadline counter)
    336     AsyncConnect(iterator);
    337 }
    338 
    339 void Connection::SetEndpoint(const char *addr, int port)
     368        ec ? Error(msg) : Message(msg);
     369    }
     370
     371    if (ec)
     372        AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
     373    else
     374        // Start connection attempts (will also reset deadline counter)
     375        AsyncConnect(iterator);
     376}
     377
     378void Connection::SetEndpoint(const string &addr, int port)
    340379{
    341380    if (fConnectionStatus>=1)
     
    346385}
    347386
     387void Connection::SetEndpoint(const string &addr, const string &port)
     388{
     389    if (fConnectionStatus>=1 && URL()!=":")
     390        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
     391
     392    fAddress = addr;
     393    fPort    = port;
     394}
     395
     396void Connection::SetEndpoint(const string &addr)
     397{
     398    const size_t p0 = addr.find_first_of(':');
     399    const size_t p1 = addr.find_last_of(':');
     400
     401    if (p0==string::npos || p0!=p1)
     402    {
     403        Error("Connection::SetEndPoint - Wrong format of argument.");
     404        return;
     405    }
     406
     407    SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
     408}
     409
     410
    348411
    349412Connection::Connection(ba::io_service& ioservice, ostream &out) :
    350413MessageImp(out), tcp::socket(ioservice),
    351 fLog(0), fAddress("localhost"), fPort("5000"),
     414fLog(0), //fAddress("localhost"), fPort("5000"),
    352415fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    353416fConnectionStatus(0)
    354417{
    355418}
    356 
     419/*
    357420Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
    358421tcp::socket(ioservice),
     
    369432{
    370433}
    371 
     434*/
Note: See TracChangeset for help on using the changeset viewer.