Changeset 10268
- Timestamp:
- 03/30/11 15:39:31 (14 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/Connection.cc
r10183 r10268 31 31 } 32 32 33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers )33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type) 34 34 { 35 35 ba::async_read(*this, buffers, 36 36 boost::bind(&Connection::HandleReceivedData, this, 37 dummy::error, dummy::bytes_transferred ));37 dummy::error, dummy::bytes_transferred, type)); 38 38 } 39 39 … … 89 89 } 90 90 91 // Close the connection 91 // Stop any pending connection attempt 92 fConnectionTimer.cancel(); 93 94 // Close possible open connections 95 close(); 96 97 // Reset the connection status 92 98 fConnectionStatus = 0; 93 close();94 99 95 100 // Stop deadline counters 96 101 fInTimeout.cancel(); 97 102 fOutTimeout.cancel(); 98 //fConnectTimeout.cancel();99 103 100 104 if (!restart || IsConnecting()) … … 106 110 107 111 // Start trying to reconnect 108 AsyncConnect(); 112 fMsgConnect = ""; 113 StartConnect(); 109 114 } 110 115 … … 201 206 } 202 207 208 void Connection::PostMessage(vector<uint16_t> inp) 209 { 210 // Convert to network byte order 211 for_each(inp.begin(), inp.end(), htons); 212 213 // FIXME FIXME 214 215 PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size()); 216 217 // const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1); 218 // get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 219 } 220 203 221 void Connection::PostMessage(const vector<char> &msg) 204 222 { 223 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 224 } 225 226 void Connection::PostMessage(const void *ptr, size_t max) 227 { 228 const vector<char> msg(reinterpret_cast<const char*>(ptr), 229 reinterpret_cast<const char*>(ptr)+max); 230 205 231 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 206 232 } … … 244 270 // to run. 245 271 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now()) 246 AsyncConnect();272 StartConnect(); 247 273 } 248 274 … … 272 298 273 299 // fSocket.get_io_service()/*fEventQueue*/.stop(); 300 301 ConnectionEstablished(); 274 302 return; 275 303 } … … 287 315 { 288 316 stringstream msg; 289 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 317 if (URL()!=":") 318 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 290 319 291 320 if (fErrConnect!=msg.str()) … … 315 344 316 345 // FIXME: Async connect should get address and port as an argument 317 void Connection:: AsyncConnect()346 void Connection::StartConnect() 318 347 { 319 348 fConnectionStatus = 1; … … 321 350 tcp::resolver resolver(get_io_service()); 322 351 352 boost::system::error_code ec; 353 323 354 tcp::resolver::query query(fAddress, fPort); 324 tcp::resolver::iterator iterator = resolver.resolve(query );355 tcp::resolver::iterator iterator = resolver.resolve(query, ec); 325 356 326 357 stringstream msg; 327 msg << "Trying to connect to " << fAddress << ":" << fPort << "..."; 328 358 if (!fAddress.empty() || !fPort.empty() || ec) 359 msg << "Trying to connect to " << URL() << "..."; 360 361 if (ec) 362 msg << " " << ec.message() << " (" << ec << ")"; 363 364 // Only output message if it has changed 329 365 if (fMsgConnect!=msg.str()) 330 366 { 331 367 fMsgConnect = msg.str(); 332 Message(msg); 333 } 334 335 // Start connection attempts (will also reset deadline counter) 336 AsyncConnect(iterator); 337 } 338 339 void Connection::SetEndpoint(const char *addr, int port) 368 ec ? Error(msg) : Message(msg); 369 } 370 371 if (ec) 372 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer); 373 else 374 // Start connection attempts (will also reset deadline counter) 375 AsyncConnect(iterator); 376 } 377 378 void Connection::SetEndpoint(const string &addr, int port) 340 379 { 341 380 if (fConnectionStatus>=1) … … 346 385 } 347 386 387 void Connection::SetEndpoint(const string &addr, const string &port) 388 { 389 if (fConnectionStatus>=1 && URL()!=":") 390 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection."); 391 392 fAddress = addr; 393 fPort = port; 394 } 395 396 void Connection::SetEndpoint(const string &addr) 397 { 398 const size_t p0 = addr.find_first_of(':'); 399 const size_t p1 = addr.find_last_of(':'); 400 401 if (p0==string::npos || p0!=p1) 402 { 403 Error("Connection::SetEndPoint - Wrong format of argument."); 404 return; 405 } 406 407 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1)); 408 } 409 410 348 411 349 412 Connection::Connection(ba::io_service& ioservice, ostream &out) : 350 413 MessageImp(out), tcp::socket(ioservice), 351 fLog(0), fAddress("localhost"), fPort("5000"),414 fLog(0), //fAddress("localhost"), fPort("5000"), 352 415 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 353 416 fConnectionStatus(0) 354 417 { 355 418 } 356 419 /* 357 420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) : 358 421 tcp::socket(ioservice), … … 369 432 { 370 433 } 371 434 */ -
trunk/FACT++/src/Connection.h
r10183 r10268 5 5 #include <string> 6 6 #include <boost/asio.hpp> 7 #include <boost/function.hpp> 7 8 #include <boost/asio/deadline_timer.hpp> 8 9 … … 40 41 int Write(const Time &t, const char *txt, int qos=kInfo); 41 42 42 void AsyncRead(boost::asio::mutable_buffers_1 buffers/*, 43 void (Connection::*handler)(const boost::system::error_code&, size_t)*/); 44 void AsyncWrite(boost::asio::mutable_buffers_1 buffers/*, 45 void (Connection::*handler)(const boost::system::error_code&)*/); 43 void AsyncRead(boost::asio::mutable_buffers_1 buffers, int type=0); 44 void AsyncWrite(boost::asio::mutable_buffers_1 buffers); 46 45 void AsyncWait(boost::asio::deadline_timer &timer, int millisec, 47 46 void (Connection::*handler)(const boost::system::error_code&)); … … 50 49 std::string URL() const { return fAddress + ":" + fPort; } 51 50 51 void CloseImp(bool restart=true); 52 53 void ConnectImp(const boost::system::error_code& error, 54 boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 55 52 56 public: 53 57 54 58 // ------------------------ close -------------------------- 55 59 // close from another thread 56 void CloseImp(bool restart=true);57 60 void PostClose(bool restart=true); 58 61 … … 62 65 void HandleSentData(const boost::system::error_code& error, size_t); 63 66 void SendMessageImp(const std::vector<char> &msg); 67 void PostMessage(std::vector<uint16_t> msg); 64 68 void PostMessage(const std::vector<char> &msg); 69 void PostMessage(const void *msg, size_t s=0); 65 70 void PostMessage(const std::string &cmd, size_t s=-1); 71 72 template<std::size_t N> 73 void PostMessage(boost::array<uint16_t, N> arr) 74 { 75 // Convert to network byte order 76 for_each(arr.begin(), arr.end(), htons); 77 PostMessage(std::vector<uint16_t>(arr.begin(), arr.end())); 78 } 66 79 67 80 // ------------------------ connect -------------------------- 68 81 69 virtual void ConnectImp(const boost::system::error_code& error, 70 boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 82 virtual void ConnectionEstablished() { } 71 83 72 void AsyncConnect();84 void StartConnect(); 73 85 74 86 Connection(boost::asio::io_service& io_service, std::ostream &out); 75 Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000);76 Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port);87 // Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000); 88 // Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port); 77 89 78 void SetEndpoint(const char *addr, int port); 90 void SetEndpoint(const std::string &addr, int port); 91 void SetEndpoint(const std::string &addr, const std::string &port); 92 void SetEndpoint(const std::string &addr); 79 93 80 94 // ------------------------ others -------------------------- 81 95 82 virtual void HandleReceivedData(const boost::system::error_code&, size_t ) { }96 virtual void HandleReceivedData(const boost::system::error_code&, size_t, int = 0) { } 83 97 virtual void HandleReadTimeout(const boost::system::error_code&) { } 84 98
Note:
See TracChangeset
for help on using the changeset viewer.