Changeset 10268


Ignore:
Timestamp:
03/30/11 15:39:31 (14 years ago)
Author:
tbretz
Message:
Some improvements to the connection handling, added some PostMessage functions which automatically convert to network byte order.
Location:
trunk/FACT++/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Connection.cc

    r10183 r10268  
    3131}
    3232
    33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers)
     33void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
    3434{
    3535    ba::async_read(*this, buffers,
    3636                   boost::bind(&Connection::HandleReceivedData, this,
    37                                dummy::error, dummy::bytes_transferred));
     37                               dummy::error, dummy::bytes_transferred, type));
    3838}
    3939
     
    8989    }
    9090
    91     // Close the connection
     91    // Stop any pending connection attempt
     92    fConnectionTimer.cancel();
     93
     94    // Close possible open connections
     95    close();
     96
     97    // Reset the connection status
    9298    fConnectionStatus = 0;
    93     close();
    9499
    95100    // Stop deadline counters
    96101    fInTimeout.cancel();
    97102    fOutTimeout.cancel();
    98     //fConnectTimeout.cancel();
    99103
    100104    if (!restart || IsConnecting())
     
    106110
    107111    // Start trying to reconnect
    108     AsyncConnect();
     112    fMsgConnect = "";
     113    StartConnect();
    109114}
    110115
     
    201206}
    202207
     208void Connection::PostMessage(vector<uint16_t> inp)
     209{
     210    // Convert to network byte order
     211    for_each(inp.begin(), inp.end(), htons);
     212
     213    // FIXME FIXME
     214
     215    PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
     216
     217//    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
     218//    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     219}
     220
    203221void Connection::PostMessage(const vector<char> &msg)
    204222{
     223    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
     224}
     225
     226void Connection::PostMessage(const void *ptr, size_t max)
     227{
     228    const vector<char> msg(reinterpret_cast<const char*>(ptr),
     229                           reinterpret_cast<const char*>(ptr)+max);
     230
    205231    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
    206232}
     
    244270    // to run.
    245271    if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
    246         AsyncConnect();
     272        StartConnect();
    247273}
    248274
     
    272298
    273299        // fSocket.get_io_service()/*fEventQueue*/.stop();
     300
     301        ConnectionEstablished();
    274302        return;
    275303    }
     
    287315    {
    288316        stringstream msg;
    289         msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
     317        if (URL()!=":")
     318            msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
    290319
    291320        if (fErrConnect!=msg.str())
     
    315344
    316345// FIXME: Async connect should get address and port as an argument
    317 void Connection::AsyncConnect()
     346void Connection::StartConnect()
    318347{
    319348    fConnectionStatus = 1;
     
    321350    tcp::resolver resolver(get_io_service());
    322351
     352    boost::system::error_code ec;
     353
    323354    tcp::resolver::query query(fAddress, fPort);
    324     tcp::resolver::iterator iterator = resolver.resolve(query);
     355    tcp::resolver::iterator iterator = resolver.resolve(query, ec);
    325356
    326357    stringstream msg;
    327     msg << "Trying to connect to " << fAddress << ":" << fPort << "...";
    328 
     358    if (!fAddress.empty() || !fPort.empty() || ec)
     359        msg << "Trying to connect to " << URL() << "...";
     360
     361    if (ec)
     362        msg << " " << ec.message() << " (" << ec << ")";
     363
     364    // Only output message if it has changed
    329365    if (fMsgConnect!=msg.str())
    330366    {
    331367        fMsgConnect = msg.str();
    332         Message(msg);
    333     }
    334 
    335     // Start connection attempts (will also reset deadline counter)
    336     AsyncConnect(iterator);
    337 }
    338 
    339 void Connection::SetEndpoint(const char *addr, int port)
     368        ec ? Error(msg) : Message(msg);
     369    }
     370
     371    if (ec)
     372        AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
     373    else
     374        // Start connection attempts (will also reset deadline counter)
     375        AsyncConnect(iterator);
     376}
     377
     378void Connection::SetEndpoint(const string &addr, int port)
    340379{
    341380    if (fConnectionStatus>=1)
     
    346385}
    347386
     387void Connection::SetEndpoint(const string &addr, const string &port)
     388{
     389    if (fConnectionStatus>=1 && URL()!=":")
     390        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
     391
     392    fAddress = addr;
     393    fPort    = port;
     394}
     395
     396void Connection::SetEndpoint(const string &addr)
     397{
     398    const size_t p0 = addr.find_first_of(':');
     399    const size_t p1 = addr.find_last_of(':');
     400
     401    if (p0==string::npos || p0!=p1)
     402    {
     403        Error("Connection::SetEndPoint - Wrong format of argument.");
     404        return;
     405    }
     406
     407    SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
     408}
     409
     410
    348411
    349412Connection::Connection(ba::io_service& ioservice, ostream &out) :
    350413MessageImp(out), tcp::socket(ioservice),
    351 fLog(0), fAddress("localhost"), fPort("5000"),
     414fLog(0), //fAddress("localhost"), fPort("5000"),
    352415fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
    353416fConnectionStatus(0)
    354417{
    355418}
    356 
     419/*
    357420Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
    358421tcp::socket(ioservice),
     
    369432{
    370433}
    371 
     434*/
  • trunk/FACT++/src/Connection.h

    r10183 r10268  
    55#include <string>
    66#include <boost/asio.hpp>
     7#include <boost/function.hpp>
    78#include <boost/asio/deadline_timer.hpp>
    89
     
    4041    int Write(const Time &t, const char *txt, int qos=kInfo);
    4142
    42     void AsyncRead(boost::asio::mutable_buffers_1 buffers/*,
    43                    void (Connection::*handler)(const boost::system::error_code&, size_t)*/);
    44     void AsyncWrite(boost::asio::mutable_buffers_1 buffers/*,
    45                     void (Connection::*handler)(const boost::system::error_code&)*/);
     43    void AsyncRead(boost::asio::mutable_buffers_1 buffers, int type=0);
     44    void AsyncWrite(boost::asio::mutable_buffers_1 buffers);
    4645    void AsyncWait(boost::asio::deadline_timer &timer, int millisec,
    4746                   void (Connection::*handler)(const boost::system::error_code&));
     
    5049    std::string URL() const { return fAddress + ":" + fPort; }
    5150
     51    void CloseImp(bool restart=true);
     52
     53    void ConnectImp(const boost::system::error_code& error,
     54                    boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
     55
    5256public:
    5357
    5458    // ------------------------ close --------------------------
    5559    // close from another thread
    56     void CloseImp(bool restart=true);
    5760    void PostClose(bool restart=true);
    5861
     
    6265    void HandleSentData(const boost::system::error_code& error, size_t);
    6366    void SendMessageImp(const std::vector<char> &msg);
     67    void PostMessage(std::vector<uint16_t> msg);
    6468    void PostMessage(const std::vector<char> &msg);
     69    void PostMessage(const void *msg, size_t s=0);
    6570    void PostMessage(const std::string &cmd, size_t s=-1);
     71
     72    template<std::size_t N>
     73        void PostMessage(boost::array<uint16_t, N> arr)
     74    {
     75        // Convert to network byte order
     76        for_each(arr.begin(), arr.end(), htons);
     77        PostMessage(std::vector<uint16_t>(arr.begin(), arr.end()));
     78    }
    6679
    6780    // ------------------------ connect --------------------------
    6881
    69     virtual void ConnectImp(const boost::system::error_code& error,
    70                             boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
     82    virtual void ConnectionEstablished() { }
    7183
    72     void AsyncConnect();
     84    void StartConnect();
    7385
    7486    Connection(boost::asio::io_service& io_service, std::ostream &out);
    75     Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000);
    76     Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port);
     87//    Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000);
     88//    Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port);
    7789
    78     void SetEndpoint(const char *addr, int port);
     90    void SetEndpoint(const std::string &addr, int port);
     91    void SetEndpoint(const std::string &addr, const std::string &port);
     92    void SetEndpoint(const std::string &addr);
    7993
    8094    // ------------------------ others --------------------------
    8195
    82     virtual void HandleReceivedData(const boost::system::error_code&, size_t) { }
     96    virtual void HandleReceivedData(const boost::system::error_code&, size_t, int = 0) { }
    8397    virtual void HandleReadTimeout(const boost::system::error_code&) { }
    8498
Note: See TracChangeset for help on using the changeset viewer.