Changeset 10851


Ignore:
Timestamp:
05/27/11 11:40:24 (14 years ago)
Author:
tbretz
Message:
Added support for switching between command socket and data sockets.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/fad.cc

    r10846 r10851  
    4040    }
    4141
    42     void AsyncWrite(const ba::const_buffers_1 &buffers)
    43     {
    44         ba::async_write(*this, buffers,
     42    void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
     43    {
     44        ba::async_write(*socket, buffers,
    4545                        boost::bind(&tcp_connection::HandleSentData, shared_from_this(),
    4646                                    dummy::error, dummy::bytes_transferred));
     
    7676
    7777    bool fTriggerEnabled;
     78    bool fCommandSocket;
     79
     80    int fSocket;
    7881
    7982    void SendData()
     
    108111        fBuffer.insert(fBuffer.begin(), h.begin(), h.end());
    109112
    110         AsyncWrite(ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
     113        if (fCommandSocket)
     114            AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
     115        else
     116        {
     117            fSocket++;
     118            fSocket %= fSockets.size();
     119
     120            AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2)));
     121        }
    111122    }
    112123
     
    186197            case kCmdRun+0x100:
    187198                cout << "-> Run" << endl;
     199                break;
     200
     201            case kCmdSocket:
     202            case kCmdSocket+0x100:
     203                cout << "-> Socket" << endl;
     204                fCommandSocket = fBufCommand[0]==kCmdSocket;
    188205                break;
    189206
     
    267284        // Ownership of buffer must be valid until Handler is called.
    268285
     286        fTriggerEnabled=false;
     287        fCommandSocket=true;
     288
    269289        fHeader.fStartDelimiter = FAD::kDelimiterStart;
    270290        fHeader.fVersion = 0x104;
     
    294314
    295315    }
     316
     317    vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets;
     318
     319    ~tcp_connection()
     320    {
     321        fSockets.clear();
     322    }
     323
     324    void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/)
     325    {
     326        cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string();
     327        cout << ":"<< port << endl;
     328        fSockets.push_back(socket);
     329    }
    296330};
    297331
    298332
    299 class tcp_server : public tcp::acceptor
     333class tcp_server
    300334{
     335    tcp::acceptor acc0;
     336    tcp::acceptor acc1;
     337    tcp::acceptor acc2;
     338    tcp::acceptor acc3;
     339    tcp::acceptor acc4;
     340    tcp::acceptor acc5;
     341    tcp::acceptor acc6;
     342    tcp::acceptor acc7;
     343
     344    int fPort;
     345
    301346public:
    302347    tcp_server(ba::io_service& ioservice, int port) :
    303         tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port))
    304 
     348        acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
     349        acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
     350        acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
     351        acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
     352        acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
     353        acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
     354        acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
     355        acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
     356        fPort(port)
    305357    {
    306358        // We could start listening for more than one connection
     
    313365
    314366private:
     367    void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
     368    {
     369        boost::shared_ptr<ba::ip::tcp::socket> connection =
     370            boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service()));
     371        acc.async_accept(*connection,
     372                          boost::bind(&tcp_connection::handle_accept,
     373                                      dest, connection,
     374                                      acc.local_endpoint().port(),
     375                                      ba::placeholders::error));
     376    }
     377
    315378    void start_accept()
    316379    {
    317380        cout << "Start accept..." << flush;
    318         tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/io_service());
     381        tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service());
    319382
    320383        // This will accept a connection without blocking
    321         async_accept(*new_connection,
    322                      boost::bind(&tcp_server::handle_accept,
    323                                  this,
    324                                  new_connection,
    325                                  ba::placeholders::error));
    326 
     384        acc0.async_accept(*new_connection,
     385                          boost::bind(&tcp_server::handle_accept,
     386                                      this,
     387                                      new_connection,
     388                                      ba::placeholders::error));
     389
     390        start_accept(new_connection, acc1);
     391        start_accept(new_connection, acc2);
     392        start_accept(new_connection, acc3);
     393        start_accept(new_connection, acc4);
     394        start_accept(new_connection, acc5);
     395        start_accept(new_connection, acc6);
     396        start_accept(new_connection, acc7);
    327397
    328398        cout << "start-done." << endl;
     
    358428        tcp_server server(io_service, port);
    359429
    360         tcp::acceptor acc1(io_service, tcp::endpoint(tcp::v4(), port+1));
    361         tcp::acceptor acc2(io_service, tcp::endpoint(tcp::v4(), port+2));
    362         tcp::acceptor acc3(io_service, tcp::endpoint(tcp::v4(), port+3));
    363         tcp::acceptor acc4(io_service, tcp::endpoint(tcp::v4(), port+4));
    364         tcp::acceptor acc5(io_service, tcp::endpoint(tcp::v4(), port+5));
    365         tcp::acceptor acc6(io_service, tcp::endpoint(tcp::v4(), port+6));
    366         tcp::acceptor acc7(io_service, tcp::endpoint(tcp::v4(), port+7));
    367 
    368430        //  ba::add_service(io_service, &server);
    369431        //  server.add_service(...);
    370         cout << "Run..." << flush;
     432        //cout << "Run..." << flush;
    371433
    372434        // Calling run() from a single thread ensures no concurrent access
     
    374436        io_service.run();
    375437
    376         cout << "end." << endl;
     438        //cout << "end." << endl;
    377439    }
    378440    catch (std::exception& e)
Note: See TracChangeset for help on using the changeset viewer.