- Timestamp:
- 06/08/13 12:22:15 (11 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/Connection.cc
r14577 r16769 103 103 104 104 // Reset the connection status 105 fQueueSize = 0; 105 106 fConnectionStatus = kDisconnected; 106 107 … … 108 109 fInTimeout.cancel(); 109 110 fOutTimeout.cancel(); 110 111 // Empty output queue112 fOutQueue.clear();113 111 114 112 if (!restart || IsConnecting()) … … 168 166 void Connection::HandleSentData(const bs::error_code& error, size_t n) 169 167 { 168 if (error==ba::error::basic_errors::operation_aborted) 169 return; 170 170 171 if (error && error != ba::error::not_connected) 171 172 { … … 183 184 msg << n << " bytes could not be sent to " << URL() << " due to missing connection."; 184 185 Warn(msg); 185 } 186 else 187 { 188 if (fDebugTx) 189 { 190 ostringstream msg; 191 msg << n << " bytes successfully sent to " << URL(); 192 Debug(msg); 193 } 194 } 195 196 // This is "thread" safe because SendMessage and HandleSentMessage 197 // are serialized in the EventQueue. Note: Do not call these 198 // functions directly from any other place then Handlers, use 199 // PostMessage instead 200 if (!fOutQueue.empty()) 201 fOutQueue.pop_front(); 202 203 if (fOutQueue.empty()) 204 { 205 // Queue went empty, remove deadline 186 187 return; 188 } 189 190 if (--fQueueSize==0) 206 191 fOutTimeout.cancel(); 207 return; 208 } 209 210 // AsyncWrite + Deadline 211 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/); 192 193 if (fDebugTx) 194 { 195 ostringstream msg; 196 msg << n << " bytes successfully sent to " << URL(); 197 Debug(msg); 198 } 199 } 200 201 void Connection::PostMessage(const void *ptr, size_t sz) 202 { 203 // This function can be called from a different thread... 204 if (!is_open()) 205 return; 206 207 // ... this is why we have to increase fQueueSize first 208 fQueueSize++; 209 210 // ... and shift the deadline timer 211 // This is not ideal, because if we are continously 212 // filling the buffer, it will never timeout 212 213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 213 } 214 215 // It is important that when SendMessageImp is called, or to be more 216 // precise boost::bind is called, teh data is copied! 217 void Connection::SendMessageImp(const vector<char> msg) 218 { 219 /* 220 if (!fConnectionEstablished) 221 { 222 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+"."); 223 return; 224 }*/ 225 226 const bool first_message_in_queue = fOutQueue.empty(); 227 228 // This is "thread" safe because SendMessage and HandleSentMessage 229 // are serialized in the EventQueue. Note: Do not call these 230 // functions directly from any other place then Handlers, use 231 // PostMessage instead 232 fOutQueue.push_back(msg); 233 234 if (!first_message_in_queue) 235 return; 236 237 // AsyncWrite + Deadline 238 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/); 239 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout); 240 } 241 242 void Connection::PostMessage(const void *ptr, size_t max) 243 { 244 const vector<char> msg(reinterpret_cast<const char*>(ptr), 245 reinterpret_cast<const char*>(ptr)+max); 246 247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg)); 214 215 // Now we can schedule the buffer to be sent 216 AsyncWrite(ba::const_buffers_1(ptr, sz)); 217 218 // If a socket is closed, all pending asynchronous 219 // operation will be aborted. 248 220 } 249 221 … … 253 225 max = cmd.length()+1; 254 226 255 vector <char>msg(max); 256 257 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin()); 258 259 PostMessage(msg); 227 PostMessage(cmd.c_str(), min(cmd.length()+1, max)); 260 228 } 261 229 … … 306 274 Info("Connection established to "+host+"..."); 307 275 276 fQueueSize = 0; 308 277 fConnectionStatus = kConnected; 309 278 … … 487 456 fLog(0), fVerbose(true), fDebugTx(false), 488 457 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice), 489 f ConnectionStatus(kDisconnected)490 { 491 } 458 fQueueSize(0), fConnectionStatus(kDisconnected) 459 { 460 } -
trunk/FACT++/src/Connection.h
r16090 r16769 39 39 boost::asio::deadline_timer fOutTimeout; 40 40 boost::asio::deadline_timer fConnectionTimer; 41 std::list<std::vector<char>> fOutQueue; 41 42 size_t fQueueSize; 42 43 43 44 ConnectionStatus_t fConnectionStatus; … … 116 117 117 118 // ------------------------ write -------------------------- 118 void SendMessageImp(const std::vector<char> msg);119 119 void PostMessage(const void *msg, size_t s=0); 120 120 void PostMessage(const std::string &cmd, size_t s=-1); … … 137 137 virtual void HandleReadTimeout(const boost::system::error_code&) { } 138 138 139 bool IsTxQueueEmpty() const { return f OutQueue.empty();}139 bool IsTxQueueEmpty() const { return fQueueSize==0; /*fOutQueue.empty();*/ } 140 140 141 141 int IsClosed() const { return !is_open(); }
Note:
See TracChangeset
for help on using the changeset viewer.