Changeset 10268 for trunk/FACT++/src/Connection.cc
- Timestamp:
- 03/30/11 15:39:31 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/Connection.cc
r10183 r10268 31 31 } 32 32 33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers )33 void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type) 34 34 { 35 35 ba::async_read(*this, buffers, 36 36 boost::bind(&Connection::HandleReceivedData, this, 37 dummy::error, dummy::bytes_transferred ));37 dummy::error, dummy::bytes_transferred, type)); 38 38 } 39 39 … … 89 89 } 90 90 91 // Close the connection 91 // Stop any pending connection attempt 92 fConnectionTimer.cancel(); 93 94 // Close possible open connections 95 close(); 96 97 // Reset the connection status 92 98 fConnectionStatus = 0; 93 close();94 99 95 100 // Stop deadline counters 96 101 fInTimeout.cancel(); 97 102 fOutTimeout.cancel(); 98 //fConnectTimeout.cancel();99 103 100 104 if (!restart || IsConnecting()) … … 106 110 107 111 // Start trying to reconnect 108 AsyncConnect(); 112 fMsgConnect = ""; 113 StartConnect(); 109 114 } 110 115 … … 201 206 } 202 207 208 void Connection::PostMessage(vector<uint16_t> inp) 209 { 210 // Convert to network byte order 211 for_each(inp.begin(), inp.end(), htons); 212 213 // FIXME FIXME 214 215 PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size()); 216 217 // const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1); 218 // get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 219 } 220 203 221 void Connection::PostMessage(const vector<char> &msg) 204 222 { 223 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 224 } 225 226 void Connection::PostMessage(const void *ptr, size_t max) 227 { 228 const vector<char> msg(reinterpret_cast<const char*>(ptr), 229 reinterpret_cast<const char*>(ptr)+max); 230 205 231 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 206 232 } … … 244 270 // to run. 245 271 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now()) 246 AsyncConnect();272 StartConnect(); 247 273 } 248 274 … … 272 298 273 299 // fSocket.get_io_service()/*fEventQueue*/.stop(); 300 301 ConnectionEstablished(); 274 302 return; 275 303 } … … 287 315 { 288 316 stringstream msg; 289 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 317 if (URL()!=":") 318 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 290 319 291 320 if (fErrConnect!=msg.str()) … … 315 344 316 345 // FIXME: Async connect should get address and port as an argument 317 void Connection:: AsyncConnect()346 void Connection::StartConnect() 318 347 { 319 348 fConnectionStatus = 1; … … 321 350 tcp::resolver resolver(get_io_service()); 322 351 352 boost::system::error_code ec; 353 323 354 tcp::resolver::query query(fAddress, fPort); 324 tcp::resolver::iterator iterator = resolver.resolve(query );355 tcp::resolver::iterator iterator = resolver.resolve(query, ec); 325 356 326 357 stringstream msg; 327 msg << "Trying to connect to " << fAddress << ":" << fPort << "..."; 328 358 if (!fAddress.empty() || !fPort.empty() || ec) 359 msg << "Trying to connect to " << URL() << "..."; 360 361 if (ec) 362 msg << " " << ec.message() << " (" << ec << ")"; 363 364 // Only output message if it has changed 329 365 if (fMsgConnect!=msg.str()) 330 366 { 331 367 fMsgConnect = msg.str(); 332 Message(msg); 333 } 334 335 // Start connection attempts (will also reset deadline counter) 336 AsyncConnect(iterator); 337 } 338 339 void Connection::SetEndpoint(const char *addr, int port) 368 ec ? Error(msg) : Message(msg); 369 } 370 371 if (ec) 372 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer); 373 else 374 // Start connection attempts (will also reset deadline counter) 375 AsyncConnect(iterator); 376 } 377 378 void Connection::SetEndpoint(const string &addr, int port) 340 379 { 341 380 if (fConnectionStatus>=1) … … 346 385 } 347 386 387 void Connection::SetEndpoint(const string &addr, const string &port) 388 { 389 if (fConnectionStatus>=1 && URL()!=":") 390 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection."); 391 392 fAddress = addr; 393 fPort = port; 394 } 395 396 void Connection::SetEndpoint(const string &addr) 397 { 398 const size_t p0 = addr.find_first_of(':'); 399 const size_t p1 = addr.find_last_of(':'); 400 401 if (p0==string::npos || p0!=p1) 402 { 403 Error("Connection::SetEndPoint - Wrong format of argument."); 404 return; 405 } 406 407 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1)); 408 } 409 410 348 411 349 412 Connection::Connection(ba::io_service& ioservice, ostream &out) : 350 413 MessageImp(out), tcp::socket(ioservice), 351 fLog(0), fAddress("localhost"), fPort("5000"),414 fLog(0), //fAddress("localhost"), fPort("5000"), 352 415 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 353 416 fConnectionStatus(0) 354 417 { 355 418 } 356 419 /* 357 420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) : 358 421 tcp::socket(ioservice), … … 369 432 { 370 433 } 371 434 */
Note:
See TracChangeset
for help on using the changeset viewer.