- Timestamp:
- 05/27/11 11:40:24 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/fad.cc
r10846 r10851 40 40 } 41 41 42 void AsyncWrite( const ba::const_buffers_1 &buffers)43 { 44 ba::async_write(* this, buffers,42 void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers) 43 { 44 ba::async_write(*socket, buffers, 45 45 boost::bind(&tcp_connection::HandleSentData, shared_from_this(), 46 46 dummy::error, dummy::bytes_transferred)); … … 76 76 77 77 bool fTriggerEnabled; 78 bool fCommandSocket; 79 80 int fSocket; 78 81 79 82 void SendData() … … 108 111 fBuffer.insert(fBuffer.begin(), h.begin(), h.end()); 109 112 110 AsyncWrite(ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2))); 113 if (fCommandSocket) 114 AsyncWrite(this, ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2))); 115 else 116 { 117 fSocket++; 118 fSocket %= fSockets.size(); 119 120 AsyncWrite(fSockets[fSocket].get(), ba::buffer(ba::const_buffer(fBuffer.data(), fBuffer.size()*2))); 121 } 111 122 } 112 123 … … 186 197 case kCmdRun+0x100: 187 198 cout << "-> Run" << endl; 199 break; 200 201 case kCmdSocket: 202 case kCmdSocket+0x100: 203 cout << "-> Socket" << endl; 204 fCommandSocket = fBufCommand[0]==kCmdSocket; 188 205 break; 189 206 … … 267 284 // Ownership of buffer must be valid until Handler is called. 268 285 286 fTriggerEnabled=false; 287 fCommandSocket=true; 288 269 289 fHeader.fStartDelimiter = FAD::kDelimiterStart; 270 290 fHeader.fVersion = 0x104; … … 294 314 295 315 } 316 317 vector<boost::shared_ptr<ba::ip::tcp::socket>> fSockets; 318 319 ~tcp_connection() 320 { 321 fSockets.clear(); 322 } 323 324 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket, int port, const boost::system::error_code&/* error*/) 325 { 326 cout << "Added one socket " << socket->remote_endpoint().address().to_v4().to_string(); 327 cout << ":"<< port << endl; 328 fSockets.push_back(socket); 329 } 296 330 }; 297 331 298 332 299 class tcp_server : public tcp::acceptor333 class tcp_server 300 334 { 335 tcp::acceptor acc0; 336 tcp::acceptor acc1; 337 tcp::acceptor acc2; 338 tcp::acceptor acc3; 339 tcp::acceptor acc4; 340 tcp::acceptor acc5; 341 tcp::acceptor acc6; 342 tcp::acceptor acc7; 343 344 int fPort; 345 301 346 public: 302 347 tcp_server(ba::io_service& ioservice, int port) : 303 tcp::acceptor(ioservice, tcp::endpoint(tcp::v4(), port)) 304 348 acc0(ioservice, tcp::endpoint(tcp::v4(), port)), 349 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)), 350 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)), 351 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)), 352 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)), 353 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)), 354 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)), 355 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)), 356 fPort(port) 305 357 { 306 358 // We could start listening for more than one connection … … 313 365 314 366 private: 367 void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc) 368 { 369 boost::shared_ptr<ba::ip::tcp::socket> connection = 370 boost::shared_ptr<ba::ip::tcp::socket>(new ba::ip::tcp::socket(acc.io_service())); 371 acc.async_accept(*connection, 372 boost::bind(&tcp_connection::handle_accept, 373 dest, connection, 374 acc.local_endpoint().port(), 375 ba::placeholders::error)); 376 } 377 315 378 void start_accept() 316 379 { 317 380 cout << "Start accept..." << flush; 318 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/ io_service());381 tcp_connection::shared_ptr new_connection = tcp_connection::create(/*acceptor_.*/acc0.io_service()); 319 382 320 383 // This will accept a connection without blocking 321 async_accept(*new_connection, 322 boost::bind(&tcp_server::handle_accept, 323 this, 324 new_connection, 325 ba::placeholders::error)); 326 384 acc0.async_accept(*new_connection, 385 boost::bind(&tcp_server::handle_accept, 386 this, 387 new_connection, 388 ba::placeholders::error)); 389 390 start_accept(new_connection, acc1); 391 start_accept(new_connection, acc2); 392 start_accept(new_connection, acc3); 393 start_accept(new_connection, acc4); 394 start_accept(new_connection, acc5); 395 start_accept(new_connection, acc6); 396 start_accept(new_connection, acc7); 327 397 328 398 cout << "start-done." << endl; … … 358 428 tcp_server server(io_service, port); 359 429 360 tcp::acceptor acc1(io_service, tcp::endpoint(tcp::v4(), port+1));361 tcp::acceptor acc2(io_service, tcp::endpoint(tcp::v4(), port+2));362 tcp::acceptor acc3(io_service, tcp::endpoint(tcp::v4(), port+3));363 tcp::acceptor acc4(io_service, tcp::endpoint(tcp::v4(), port+4));364 tcp::acceptor acc5(io_service, tcp::endpoint(tcp::v4(), port+5));365 tcp::acceptor acc6(io_service, tcp::endpoint(tcp::v4(), port+6));366 tcp::acceptor acc7(io_service, tcp::endpoint(tcp::v4(), port+7));367 368 430 // ba::add_service(io_service, &server); 369 431 // server.add_service(...); 370 cout << "Run..." << flush;432 //cout << "Run..." << flush; 371 433 372 434 // Calling run() from a single thread ensures no concurrent access … … 374 436 io_service.run(); 375 437 376 cout << "end." << endl;438 //cout << "end." << endl; 377 439 } 378 440 catch (std::exception& e)
Note:
See TracChangeset
for help on using the changeset viewer.