Changeset 10284 for trunk/FACT++/src/Connection.cc
- Timestamp:
- 04/05/11 11:21:27 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/Connection.cc
r10268 r10284 31 31 } 32 32 33 void Connection::AsyncRead( ba::mutable_buffers_1 buffers, int type)33 void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type) 34 34 { 35 35 ba::async_read(*this, buffers, … … 38 38 } 39 39 40 void Connection::AsyncWrite( ba::mutable_buffers_1buffers)40 void Connection::AsyncWrite(const ba::const_buffers_1 &buffers) 41 41 { 42 42 ba::async_write(*this, buffers, … … 63 63 void Connection::AsyncConnect(tcp::resolver::iterator iterator) 64 64 { 65 //cout << "Start async connect(...)" << endl;66 fConnectionStatus = 1;67 68 65 tcp::endpoint endpoint = *iterator; 69 66 … … 72 69 boost::bind(&Connection::ConnectImp, 73 70 this, ba::placeholders::error, 74 ++iterator));71 iterator)); 75 72 76 73 // We will get a "Connection timeout anyway" … … 85 82 { 86 83 stringstream str; 87 str << "Connection closed to " << URL() << "."; // << endl;84 str << "Connection closed to " << URL() << "."; 88 85 Message(str); 89 86 } … … 96 93 97 94 // Reset the connection status 98 fConnectionStatus = 0;95 fConnectionStatus = kDisconnected; 99 96 100 97 // Stop deadline counters … … 102 99 fOutTimeout.cancel(); 103 100 101 // Empty output queue 102 fOutQueue.clear(); 103 104 104 if (!restart || IsConnecting()) 105 105 return; … … 111 111 // Start trying to reconnect 112 112 fMsgConnect = ""; 113 fErrConnect = ""; 113 114 StartConnect(); 114 115 } … … 122 123 void Connection::HandleWriteTimeout(const bs::error_code &error) 123 124 { 124 // 125: Operation canceled 125 if (error && error!=b s::error_code(125, bs::system_category))125 // 125: Operation canceled (bs::error_code(125, bs::system_category)) 126 if (error && error!=ba::error::basic_errors::operation_aborted) 126 127 { 127 128 stringstream str; 128 129 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl; 129 130 Error(str); 131 132 CloseImp(); 133 return; 130 134 } 131 135 … … 149 153 } 150 154 151 void Connection::HandleSentData(const bs::error_code& error, size_t )155 void Connection::HandleSentData(const bs::error_code& error, size_t n) 152 156 { 153 157 if (error) … … 161 165 } 162 166 163 Message("Data successfully sent to "+URL()); 167 stringstream msg; 168 msg << n << " bytes successfully sent to " << URL(); 169 Message(msg); 164 170 165 171 // This is "thread" safe because SendMessage and HandleSentMessage … … 177 183 178 184 // AsyncWrite + Deadline 179 AsyncWrite(ba:: buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);185 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/); 180 186 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 181 187 } 182 188 183 void Connection::SendMessageImp(const vector<char> &msg) 189 // It is important that when SendMessageImp is called, or to be more 190 // precise boost::bind is called, teh data is copied! 191 void Connection::SendMessageImp(const vector<char> msg) 184 192 { 185 193 /* … … 202 210 203 211 // AsyncWrite + Deadline 204 AsyncWrite(ba:: buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);212 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/); 205 213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 206 }207 208 void Connection::PostMessage(vector<uint16_t> inp)209 {210 // Convert to network byte order211 for_each(inp.begin(), inp.end(), htons);212 213 // FIXME FIXME214 215 PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());216 217 // const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);218 // get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));219 }220 221 void Connection::PostMessage(const vector<char> &msg)222 {223 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));224 214 } 225 215 … … 239 229 vector <char>msg(max); 240 230 241 for (unsigned int i=0; i<max; i++) 242 msg[i] = 0; 243 244 for (unsigned int i=0; i<min(cmd.length()+1, max); i++) 245 msg[i] = cmd[i]; 246 247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 231 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin()); 232 233 PostMessage(msg); 248 234 } 249 235 … … 251 237 { 252 238 // 125: Operation canceled 253 if (error && error!=b s::error_code(125, bs::system_category))239 if (error && error!=ba::error::basic_errors::operation_aborted) 254 240 { 255 241 stringstream str; … … 274 260 275 261 void Connection::ConnectImp(const bs::error_code& error, 276 tcp::resolver::iterator endpoint_iterator) 277 { 262 tcp::resolver::iterator iterator) 263 { 264 tcp::endpoint endpoint = *iterator; 265 266 const string host = endpoint.port()==0 ? "" : 267 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port()); 268 278 269 // Connection established 279 270 if (!error) 280 271 { 281 Message("Connection established to "+URL()+"..."); 282 283 // Initialize hardware... 284 // Set state to : connected/undefined until we get the first message 285 // (send request for configuration) ? 286 // (send configuration?) 287 fMsgConnect = ""; 288 fErrConnect = ""; 289 290 fConnectionStatus = 2; 291 //StartAsyncRead(); 292 293 // We can create the buffer here and delete it in 294 // Handle Read. However, the read buffer is not filled again 295 // before it was not read within HandleReceivedData -- so a single 296 // buffer should be good enough. Before HandleReceivedData 297 // returns the data must be copied to a "safe" place. 298 299 // fSocket.get_io_service()/*fEventQueue*/.stop(); 272 Message("Connection established to "+host+"..."); 273 274 fConnectionStatus = kConnected; 300 275 301 276 ConnectionEstablished(); … … 309 284 // the endpoint again. 310 285 CloseImp(false); 311 //fSocket.close(); 312 313 // 111: Connection refused 314 if (1/*error!=bs::error_code(111, bs::system_category)*/) 315 { 316 stringstream msg; 317 if (URL()!=":") 318 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 319 320 if (fErrConnect!=msg.str()) 321 { 286 287 stringstream msg; 288 if (!host.empty()) 289 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")"; 290 291 if (fErrConnect!=msg.str()) 292 { 293 if (error!=ba::error::basic_errors::connection_refused) 322 294 fMsgConnect = ""; 323 fErrConnect = msg.str(); 324 Warn(fErrConnect); 325 } 295 fErrConnect = msg.str(); 296 Warn(fErrConnect); 326 297 } 327 298 328 299 // Go on with the next 329 if ( endpoint_iterator != tcp::resolver::iterator())330 { 331 AsyncConnect( endpoint_iterator);300 if (++iterator != tcp::resolver::iterator()) 301 { 302 AsyncConnect(iterator); 332 303 return; 333 304 } … … 346 317 void Connection::StartConnect() 347 318 { 348 fConnectionStatus = 1;319 fConnectionStatus = kConnecting; 349 320 350 321 tcp::resolver resolver(get_io_service()); … … 401 372 if (p0==string::npos || p0!=p1) 402 373 { 403 Error("Connection::SetEndPoint - Wrong format of argument .");374 Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)"); 404 375 return; 405 376 } … … 411 382 412 383 Connection::Connection(ba::io_service& ioservice, ostream &out) : 413 MessageImp(out), tcp::socket(ioservice), 414 fLog(0), //fAddress("localhost"), fPort("5000"), 384 MessageImp(out), tcp::socket(ioservice), fLog(0), 415 385 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 416 fConnectionStatus(0) 417 { 418 } 419 /* 420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) : 421 tcp::socket(ioservice), 422 fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)), 423 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 424 fConnectionStatus(0) 425 { 426 } 427 428 Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) : 429 tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port), 430 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 431 fConnectionStatus(0) 432 { 433 } 434 */ 386 fConnectionStatus(kDisconnected) 387 { 388 }
Note:
See TracChangeset
for help on using the changeset viewer.