Changeset 10284 for trunk/FACT++
- Timestamp:
- 04/05/11 11:21:27 (14 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/Connection.cc
r10268 r10284 31 31 } 32 32 33 void Connection::AsyncRead( ba::mutable_buffers_1 buffers, int type)33 void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type) 34 34 { 35 35 ba::async_read(*this, buffers, … … 38 38 } 39 39 40 void Connection::AsyncWrite( ba::mutable_buffers_1buffers)40 void Connection::AsyncWrite(const ba::const_buffers_1 &buffers) 41 41 { 42 42 ba::async_write(*this, buffers, … … 63 63 void Connection::AsyncConnect(tcp::resolver::iterator iterator) 64 64 { 65 //cout << "Start async connect(...)" << endl;66 fConnectionStatus = 1;67 68 65 tcp::endpoint endpoint = *iterator; 69 66 … … 72 69 boost::bind(&Connection::ConnectImp, 73 70 this, ba::placeholders::error, 74 ++iterator));71 iterator)); 75 72 76 73 // We will get a "Connection timeout anyway" … … 85 82 { 86 83 stringstream str; 87 str << "Connection closed to " << URL() << "."; // << endl;84 str << "Connection closed to " << URL() << "."; 88 85 Message(str); 89 86 } … … 96 93 97 94 // Reset the connection status 98 fConnectionStatus = 0;95 fConnectionStatus = kDisconnected; 99 96 100 97 // Stop deadline counters … … 102 99 fOutTimeout.cancel(); 103 100 101 // Empty output queue 102 fOutQueue.clear(); 103 104 104 if (!restart || IsConnecting()) 105 105 return; … … 111 111 // Start trying to reconnect 112 112 fMsgConnect = ""; 113 fErrConnect = ""; 113 114 StartConnect(); 114 115 } … … 122 123 void Connection::HandleWriteTimeout(const bs::error_code &error) 123 124 { 124 // 125: Operation canceled 125 if (error && error!=b s::error_code(125, bs::system_category))125 // 125: Operation canceled (bs::error_code(125, bs::system_category)) 126 if (error && error!=ba::error::basic_errors::operation_aborted) 126 127 { 127 128 stringstream str; 128 129 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl; 129 130 Error(str); 131 132 CloseImp(); 133 return; 130 134 } 131 135 … … 149 153 } 150 154 151 void Connection::HandleSentData(const bs::error_code& error, size_t )155 void Connection::HandleSentData(const bs::error_code& error, size_t n) 152 156 { 153 157 if (error) … … 161 165 } 162 166 163 Message("Data successfully sent to "+URL()); 167 stringstream msg; 168 msg << n << " bytes successfully sent to " << URL(); 169 Message(msg); 164 170 165 171 // This is "thread" safe because SendMessage and HandleSentMessage … … 177 183 178 184 // AsyncWrite + Deadline 179 AsyncWrite(ba:: buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);185 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/); 180 186 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 181 187 } 182 188 183 void Connection::SendMessageImp(const vector<char> &msg) 189 // It is important that when SendMessageImp is called, or to be more 190 // precise boost::bind is called, teh data is copied! 191 void Connection::SendMessageImp(const vector<char> msg) 184 192 { 185 193 /* … … 202 210 203 211 // AsyncWrite + Deadline 204 AsyncWrite(ba:: buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);212 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/); 205 213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 206 }207 208 void Connection::PostMessage(vector<uint16_t> inp)209 {210 // Convert to network byte order211 for_each(inp.begin(), inp.end(), htons);212 213 // FIXME FIXME214 215 PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());216 217 // const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);218 // get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));219 }220 221 void Connection::PostMessage(const vector<char> &msg)222 {223 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));224 214 } 225 215 … … 239 229 vector <char>msg(max); 240 230 241 for (unsigned int i=0; i<max; i++) 242 msg[i] = 0; 243 244 for (unsigned int i=0; i<min(cmd.length()+1, max); i++) 245 msg[i] = cmd[i]; 246 247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 231 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin()); 232 233 PostMessage(msg); 248 234 } 249 235 … … 251 237 { 252 238 // 125: Operation canceled 253 if (error && error!=b s::error_code(125, bs::system_category))239 if (error && error!=ba::error::basic_errors::operation_aborted) 254 240 { 255 241 stringstream str; … … 274 260 275 261 void Connection::ConnectImp(const bs::error_code& error, 276 tcp::resolver::iterator endpoint_iterator) 277 { 262 tcp::resolver::iterator iterator) 263 { 264 tcp::endpoint endpoint = *iterator; 265 266 const string host = endpoint.port()==0 ? "" : 267 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port()); 268 278 269 // Connection established 279 270 if (!error) 280 271 { 281 Message("Connection established to "+URL()+"..."); 282 283 // Initialize hardware... 284 // Set state to : connected/undefined until we get the first message 285 // (send request for configuration) ? 286 // (send configuration?) 287 fMsgConnect = ""; 288 fErrConnect = ""; 289 290 fConnectionStatus = 2; 291 //StartAsyncRead(); 292 293 // We can create the buffer here and delete it in 294 // Handle Read. However, the read buffer is not filled again 295 // before it was not read within HandleReceivedData -- so a single 296 // buffer should be good enough. Before HandleReceivedData 297 // returns the data must be copied to a "safe" place. 298 299 // fSocket.get_io_service()/*fEventQueue*/.stop(); 272 Message("Connection established to "+host+"..."); 273 274 fConnectionStatus = kConnected; 300 275 301 276 ConnectionEstablished(); … … 309 284 // the endpoint again. 310 285 CloseImp(false); 311 //fSocket.close(); 312 313 // 111: Connection refused 314 if (1/*error!=bs::error_code(111, bs::system_category)*/) 315 { 316 stringstream msg; 317 if (URL()!=":") 318 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")"; 319 320 if (fErrConnect!=msg.str()) 321 { 286 287 stringstream msg; 288 if (!host.empty()) 289 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")"; 290 291 if (fErrConnect!=msg.str()) 292 { 293 if (error!=ba::error::basic_errors::connection_refused) 322 294 fMsgConnect = ""; 323 fErrConnect = msg.str(); 324 Warn(fErrConnect); 325 } 295 fErrConnect = msg.str(); 296 Warn(fErrConnect); 326 297 } 327 298 328 299 // Go on with the next 329 if ( endpoint_iterator != tcp::resolver::iterator())330 { 331 AsyncConnect( endpoint_iterator);300 if (++iterator != tcp::resolver::iterator()) 301 { 302 AsyncConnect(iterator); 332 303 return; 333 304 } … … 346 317 void Connection::StartConnect() 347 318 { 348 fConnectionStatus = 1;319 fConnectionStatus = kConnecting; 349 320 350 321 tcp::resolver resolver(get_io_service()); … … 401 372 if (p0==string::npos || p0!=p1) 402 373 { 403 Error("Connection::SetEndPoint - Wrong format of argument .");374 Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)"); 404 375 return; 405 376 } … … 411 382 412 383 Connection::Connection(ba::io_service& ioservice, ostream &out) : 413 MessageImp(out), tcp::socket(ioservice), 414 fLog(0), //fAddress("localhost"), fPort("5000"), 384 MessageImp(out), tcp::socket(ioservice), fLog(0), 415 385 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 416 fConnectionStatus(0) 417 { 418 } 419 /* 420 Connection::Connection(ba::io_service& ioservice, const string &addr, int port) : 421 tcp::socket(ioservice), 422 fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)), 423 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 424 fConnectionStatus(0) 425 { 426 } 427 428 Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) : 429 tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port), 430 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 431 fConnectionStatus(0) 432 { 433 } 434 */ 386 fConnectionStatus(kDisconnected) 387 { 388 } -
trunk/FACT++/src/Connection.h
r10268 r10284 17 17 std::string fAddress; 18 18 std::string fPort; 19 20 enum ConnectionStatus_t 21 { 22 kDisconnected = 0, 23 kConnecting = 1, 24 kConnected = 2, 25 }; 26 19 27 protected: 20 28 boost::asio::deadline_timer fInTimeout; … … 23 31 boost::asio::deadline_timer fOutTimeout; 24 32 boost::asio::deadline_timer fConnectionTimer; 25 std::deque<std::vector<char>> fOutQueue; // Shell we directly put ba:buffers into the queue?33 std::deque<std::vector<char>> fOutQueue; 26 34 27 int fConnectionStatus; // 0=offline, 1=connecting, 2=connected35 ConnectionStatus_t fConnectionStatus; 28 36 29 37 std::string fErrConnect; 30 38 std::string fMsgConnect; 31 39 32 protected:33 char fReadBuffer[1000];34 35 36 40 public: 37 41 void SetLogStream(MessageImp *log) { fLog = log; } 42 std::ostream &Out() { return fLog ? fLog->Out() : Out(); } 38 43 39 44 // -------- Abbreviations for starting async tasks --------- 40 45 41 int Write(const Time &t, const char *txt, int qos=kInfo); 42 43 void AsyncRead(boost::asio::mutable_buffers_1 buffers, int type=0); 44 void AsyncWrite(boost::asio::mutable_buffers_1 buffers); 46 void AsyncRead(const boost::asio::mutable_buffers_1 buffers, int type=0); 47 void AsyncWrite(const boost::asio::const_buffers_1 &buffers); 45 48 void AsyncWait(boost::asio::deadline_timer &timer, int millisec, 46 49 void (Connection::*handler)(const boost::system::error_code&)); 50 51 private: 47 52 void AsyncConnect(boost::asio::ip::tcp::resolver::iterator iterator); 48 49 std::string URL() const { return fAddress + ":" + fPort; }50 53 51 54 void CloseImp(bool restart=true); … … 54 57 boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 55 58 56 public:57 58 // ------------------------ close --------------------------59 // close from another thread60 void PostClose(bool restart=true);61 62 // ------------------------ write --------------------------63 59 void HandleConnectionTimer(const boost::system::error_code &error); 64 60 void HandleWriteTimeout(const boost::system::error_code &error); 65 61 void HandleSentData(const boost::system::error_code& error, size_t); 66 void SendMessageImp(const std::vector<char> &msg);67 void PostMessage(std::vector<uint16_t> msg);68 void PostMessage(const std::vector<char> &msg);69 void PostMessage(const void *msg, size_t s=0);70 void PostMessage(const std::string &cmd, size_t s=-1);71 62 72 template<std::size_t N> 73 void PostMessage(boost::array<uint16_t, N> arr) 74 { 75 // Convert to network byte order 76 for_each(arr.begin(), arr.end(), htons); 77 PostMessage(std::vector<uint16_t>(arr.begin(), arr.end())); 78 } 79 80 // ------------------------ connect -------------------------- 63 int Write(const Time &t, const char *txt, int qos=kInfo); 81 64 82 65 virtual void ConnectionEstablished() { } 83 66 84 void StartConnect(); 67 public: 68 Connection(boost::asio::io_service& io_service, std::ostream &out); 85 69 86 Connection(boost::asio::io_service& io_service, std::ostream &out); 87 // Connection(boost::asio::io_service& io_service, const std::string &addr="localhost", int port=5000); 88 // Connection(boost::asio::io_service& io_service, const std::string &addr, const std::string &port); 70 // ------------------------ connect -------------------------- 89 71 90 72 void SetEndpoint(const std::string &addr, int port); 91 73 void SetEndpoint(const std::string &addr, const std::string &port); 92 74 void SetEndpoint(const std::string &addr); 75 76 void StartConnect(); 77 78 // ------------------------ close -------------------------- 79 void PostClose(bool restart=true); 80 81 // ------------------------ write -------------------------- 82 void SendMessageImp(const std::vector<char> msg); 83 void PostMessage(const void *msg, size_t s=0); 84 void PostMessage(const std::string &cmd, size_t s=-1); 85 86 template<typename T, size_t N> 87 void PostMessage(const boost::array<T, N> &msg) 88 { 89 PostMessage(msg.begin(), msg.size()*sizeof(T)); 90 } 91 92 template<typename T> 93 void PostMessage(const std::vector<T> &msg) 94 { 95 PostMessage(&msg[0], msg.size()*sizeof(T)); 96 } 93 97 94 98 // ------------------------ others -------------------------- … … 99 103 int IsClosed() const { return !is_open(); } 100 104 101 bool IsConnected() const { return fConnectionStatus==2; } 102 bool IsConnecting() const { return fConnectionStatus==1; } 105 bool IsConnected() const { return fConnectionStatus==kConnected; } 106 bool IsConnecting() const { return fConnectionStatus==kConnecting; } 107 108 std::string URL() const { return fAddress + ":" + fPort; } 103 109 }; 104 110
Note:
See TracChangeset
for help on using the changeset viewer.