Changeset 17929
- Timestamp:
- 07/23/14 12:00:00 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/sqmctrl.cc
r17918 r17929 29 29 30 30 private: 31 bool fIsVerbose; 32 bool fMsgReceived; 33 bool fMsgValid; 34 31 bool fIsVerbose; 35 32 uint16_t fTimeout; 36 33 37 34 boost::asio::streambuf fBuffer; 35 36 boost::asio::deadline_timer fTrigger; 38 37 39 38 void HandleRead(const boost::system::error_code& err, size_t bytes_received) … … 64 63 if (!getline(is, buffer, '\n')) 65 64 { 66 Error("Received message does not contain \\n... closing connection.");65 Fatal("Received message does not contain \\n... closing connection."); 67 66 PostClose(false); 68 67 return; … … 72 71 73 72 if (fIsVerbose) 74 Out() << buffer << endl;73 Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << endl; 75 74 76 75 vector<string> vec; 77 76 boost::split(vec, buffer, boost::is_any_of(",")); 78 77 78 // Send next request in fTimeout milliseconds 79 fTrigger.expires_from_now(boost::posix_time::milliseconds(fTimeout)); 80 fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger, 81 this, dummy::error)); 79 82 try 80 83 { … … 83 86 84 87 if (vec[0]!="r") 85 throw runtime_error("Not a 'reading request' answer .");88 throw runtime_error("Not a 'reading request' answer: "+vec[0]); 86 89 87 90 SQM::Data data; … … 97 100 catch (const exception &e) 98 101 { 99 fMsgValid = false;100 102 Error("Parsing received data failed ["+string(e.what())+"]"); 101 103 Error("Received: "+buffer); 102 PostClose(false); 103 return; 104 } 105 106 fMsgValid = true; 107 fMsgReceived = true; 104 PostClose(true); 105 } 108 106 } 109 107 … … 114 112 { 115 113 ostringstream str; 116 str << "Read timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;114 str << "ReadTimeout of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl; 117 115 Error(str); 118 116 … … 125 123 // For example: Here we could schedule a new accept if we 126 124 // would not want to allow two connections at the same time. 127 PostClose(true);125 //PostClose(true); 128 126 return; 129 127 } … … 136 134 return; 137 135 138 if (fMsgReceived)139 {140 StartReadReport();141 return;142 }143 144 136 ostringstream str; 145 str << "No answer received from " << URL() << ".";137 str << "No valid answer received from " << URL() << " within " << ceil(fTimeout*1.5) << "ms"; 146 138 Error(str); 147 139 … … 149 141 } 150 142 143 void HandleRequestTrigger(const bs::error_code &error) 144 { 145 // 125: Operation canceled (bs::error_code(125, bs::system_category)) 146 if (error && error!=ba::error::basic_errors::operation_aborted) 147 { 148 ostringstream str; 149 str << "RequestTrigger failed of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl; 150 Error(str); 151 152 PostClose(true); 153 return; 154 } 155 156 if (!is_open()) 157 { 158 // For example: Here we could schedule a new accept if we 159 // would not want to allow two connections at the same time. 160 //PostClose(true); 161 return; 162 } 163 164 // Check whether the deadline has passed. We compare the deadline 165 // against the current time since a new asynchronous operation 166 // may have moved the deadline before this actor had a chance 167 // to run. 168 if (fTrigger.expires_at() > ba::deadline_timer::traits_type::now()) 169 return; 170 171 StartReadReport(); 172 } 173 151 174 void StartReadReport() 152 175 { 153 fMsgReceived = false;154 155 176 PostMessage(string("rx\n"), 3); 177 156 178 async_read_until(*this, fBuffer, '\n', 157 179 boost::bind(&ConnectionSQM::HandleRead, this, 158 180 dummy::error, dummy::bytes_transferred)); 159 181 160 fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout ));182 fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*2.5)); 161 183 fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout, 162 184 this, dummy::error)); … … 175 197 public: 176 198 ConnectionSQM(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()), 177 fIsVerbose(true) 199 fIsVerbose(true), fTimeout(0), fTrigger(ioservice) 178 200 { 179 201 SetLogStream(&imp); … … 193 215 int GetState() const 194 216 { 195 if (!is_open()) 196 return SQM::State::kDisconnected; 197 198 return fMsgValid ? SQM::State::kValid : SQM::State::kConnected; 217 return is_open() ? SQM::State::kConnected : SQM::State::kDisconnected; 199 218 } 200 219 };
Note:
See TracChangeset
for help on using the changeset viewer.