source: trunk/FACT++/src/Connection.cc@ 14600

Last change on this file since 14600 was 14577, checked in by tbretz, 12 years ago
Improved StartConnect to get rid of a misleading error at startup of most programs.
File size: 13.9 KB
Line 
// **************************************************************************
/** @class Connection

@brief Maintains an asynchronous TCP/IP client connection

@todo
   Unify with ConnectionUSB

*/
// **************************************************************************
11#include "Connection.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using ba::ip::tcp;
20
21 // -------- Abbreviations for starting async tasks ---------
22
23int Connection::Write(const Time &t, const string &txt, int qos)
24{
25 if (fLog)
26 return fLog->Write(t, txt, qos);
27
28 return MessageImp::Write(t, txt, qos);
29}
30
31void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32{
33 ba::async_read(*this, buffers,
34 boost::bind(&Connection::HandleReceivedData, this,
35 dummy::error, dummy::bytes_transferred, type));
36}
37
38void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39{
40 ba::async_write(*this, buffers,
41 boost::bind(&Connection::HandleSentData, this,
42 dummy::error, dummy::bytes_transferred));
43}
44
45/*
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61*/
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectIter,
70 this, iterator, ba::placeholders::error));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76void Connection::AsyncConnect()
77{
78 // AsyncConnect + Deadline
79 async_connect(fEndpoint,
80 boost::bind(&Connection::ConnectAddr,
81 this, fEndpoint, ba::placeholders::error));
82
83 // We will get a "Connection timeout anyway"
84 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85}
86
87// ------------------------ close --------------------------
88// close from another thread
89void Connection::CloseImp(bool restart)
90{
91 if (IsConnected() && fVerbose)
92 {
93 ostringstream str;
94 str << "Connection closed to " << URL() << ".";
95 Info(str);
96 }
97
98 // Stop any pending connection attempt
99 fConnectionTimer.cancel();
100
101 // Close possible open connections
102 close();
103
104 // Reset the connection status
105 fConnectionStatus = kDisconnected;
106
107 // Stop deadline counters
108 fInTimeout.cancel();
109 fOutTimeout.cancel();
110
111 // Empty output queue
112 fOutQueue.clear();
113
114 if (!restart || IsConnecting())
115 return;
116
117 // We need some timeout before reconnecting!
118 // And we have to check if we are alreayd trying to connect
119 // We shoudl wait until all operations in progress were canceled
120
121 // Start trying to reconnect
122 fMsgConnect = "";
123 fErrConnect = "";
124 StartConnect();
125}
126
127void Connection::PostClose(bool restart)
128{
129 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
130}
131
132// ------------------------ write --------------------------
133void Connection::HandleWriteTimeout(const bs::error_code &error)
134{
135 if (error==ba::error::basic_errors::operation_aborted)
136 return;
137
138 // 125: Operation canceled (bs::error_code(125, bs::system_category))
139 if (error)
140 {
141 ostringstream str;
142 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
143 Error(str);
144
145 CloseImp();
146 return;
147 }
148
149 if (!is_open())
150 {
151 // For example: Here we could schedule a new accept if we
152 // would not want to allow two connections at the same time.
153 return;
154 }
155
156 // Check whether the deadline has passed. We compare the deadline
157 // against the current time since a new asynchronous operation
158 // may have moved the deadline before this actor had a chance
159 // to run.
160 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
161 return;
162
163 Error("fOutTimeout has expired, writing data to "+URL());
164
165 CloseImp();
166}
167
168void Connection::HandleSentData(const bs::error_code& error, size_t n)
169{
170 if (error && error != ba::error::not_connected)
171 {
172 ostringstream str;
173 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
174 Error(str);
175
176 CloseImp();
177 return;
178 }
179
180 if (error == ba::error::not_connected)
181 {
182 ostringstream msg;
183 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
184 Warn(msg);
185 }
186 else
187 {
188 if (fDebugTx)
189 {
190 ostringstream msg;
191 msg << n << " bytes successfully sent to " << URL();
192 Debug(msg);
193 }
194 }
195
196 // This is "thread" safe because SendMessage and HandleSentMessage
197 // are serialized in the EventQueue. Note: Do not call these
198 // functions directly from any other place then Handlers, use
199 // PostMessage instead
200 if (!fOutQueue.empty())
201 fOutQueue.pop_front();
202
203 if (fOutQueue.empty())
204 {
205 // Queue went empty, remove deadline
206 fOutTimeout.cancel();
207 return;
208 }
209
210 // AsyncWrite + Deadline
211 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
212 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
213}
214
215// It is important that when SendMessageImp is called, or to be more
216// precise boost::bind is called, teh data is copied!
217void Connection::SendMessageImp(const vector<char> msg)
218{
219 /*
220 if (!fConnectionEstablished)
221 {
222 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
223 return;
224 }*/
225
226 const bool first_message_in_queue = fOutQueue.empty();
227
228 // This is "thread" safe because SendMessage and HandleSentMessage
229 // are serialized in the EventQueue. Note: Do not call these
230 // functions directly from any other place then Handlers, use
231 // PostMessage instead
232 fOutQueue.push_back(msg);
233
234 if (!first_message_in_queue)
235 return;
236
237 // AsyncWrite + Deadline
238 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
239 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
240}
241
242void Connection::PostMessage(const void *ptr, size_t max)
243{
244 const vector<char> msg(reinterpret_cast<const char*>(ptr),
245 reinterpret_cast<const char*>(ptr)+max);
246
247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
248}
249
250void Connection::PostMessage(const string &cmd, size_t max)
251{
252 if (max==size_t(-1))
253 max = cmd.length()+1;
254
255 vector <char>msg(max);
256
257 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
258
259 PostMessage(msg);
260}
261
262void Connection::HandleConnectionTimer(const bs::error_code &error)
263{
264 if (error==ba::error::basic_errors::operation_aborted)
265 return;
266
267 if (error)
268 {
269 ostringstream str;
270 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
271 Error(str);
272 }
273
274 if (is_open())
275 {
276 // For example: Here we could schedule a new accept if we
277 // would not want to allow two connections at the same time.
278 return;
279 }
280
281 // Check whether the deadline has passed. We compare the deadline
282 // against the current time since a new asynchronous operation
283 // may have moved the deadline before this actor had a chance
284 // to run.
285 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
286 StartConnect();
287}
288
289bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
290{
291 const string host = endpoint.port()==0 ? "" :
292 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
293
294 // Connection established
295 if (!error)
296 {
297 set_option(socket_base::keep_alive(true));
298
299 const int optval = 30;
300 // First keep alive after 30s
301 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
302 // New keep alive after 30s
303 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
304
305 if (fVerbose)
306 Info("Connection established to "+host+"...");
307
308 fConnectionStatus = kConnected;
309
310 ConnectionEstablished();
311 return true;
312 }
313
314 // If returning from run will lead to deletion of this
315 // instance, close() is not needed (maybe implicitly called).
316 // If run is called again, close() is needed here. Otherwise:
317 // Software caused connection abort when we try to resolve
318 // the endpoint again.
319 CloseImp(false);
320
321 ostringstream msg;
322 if (!host.empty())
323 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
324
325 if (fErrConnect!=msg.str())
326 {
327 if (error!=ba::error::basic_errors::connection_refused)
328 fMsgConnect = "";
329 fErrConnect = msg.str();
330 Warn(fErrConnect);
331 }
332
333 if (error==ba::error::basic_errors::operation_aborted)
334 return true;
335
336 fConnectionStatus = kConnecting;
337
338 return false;
339/*
340 // Go on with the next
341 if (++iterator != tcp::resolver::iterator())
342 {
343 AsyncConnect(iterator);
344 return;
345 }
346*/
347 // No more entries to try, if we would not put anything else
348 // into the queue anymore it would now return (run() would return)
349
350 // Since we don't want to block the main loop, we wait using an
351 // asnychronous timer
352
353 // FIXME: Should we move this before AsyncConnect() ?
354// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
355}
356
357void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
358{
359 if (ConnectImp(*iterator, error))
360 return;
361
362 // Go on with the next
363 if (++iterator != tcp::resolver::iterator())
364 {
365 AsyncConnect(iterator);
366 return;
367 }
368
369 // No more entries to try, if we would not put anything else
370 // into the queue anymore it would now return (run() would return)
371 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
372}
373
374void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
375{
376 if (ConnectImp(endpoint, error))
377 return;
378
379 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
380}
381
382// FIXME: Async connect should get address and port as an argument
383void Connection::StartConnect()
384{
385 fConnectionStatus = kConnecting;
386
387 if (fEndpoint!=tcp::endpoint())
388 {
389 ostringstream msg;
390 msg << "Trying to connect to " << fEndpoint << "...";
391 if (fMsgConnect!=msg.str())
392 {
393 fMsgConnect = msg.str();
394 Info(msg);
395 }
396
397 AsyncConnect();
398 return;
399 }
400
401 const bool valid = !fAddress.empty() || !fPort.empty();
402
403 boost::system::error_code ec;
404
405 ostringstream msg;
406 if (!valid)
407 msg << "No target address... connection attempt postponed.";
408 else
409 {
410 tcp::resolver resolver(get_io_service());
411
412 tcp::resolver::query query(fAddress, fPort);
413 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
414
415 msg << "Trying to connect to " << URL() << "...";
416
417 // Start connection attempts (will also reset deadline counter)
418 if (!ec)
419 AsyncConnect(iterator);
420 else
421 msg << " " << ec.message() << " (" << ec << ")";
422 }
423
424 // Only output message if it has changed
425 if (fMsgConnect!=msg.str())
426 {
427 fMsgConnect = msg.str();
428 if (ec)
429 Error(msg);
430 if (!ec && fVerbose)
431 Info(msg);
432 }
433
434 if (!valid || ec)
435 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
436}
437
438void Connection::SetEndpoint(const string &addr, int port)
439{
440 if (fConnectionStatus>=1)
441 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
442
443 fAddress = addr;
444 fPort = to_string((long long)port);
445}
446
447void Connection::SetEndpoint(const string &addr, const string &port)
448{
449 if (fConnectionStatus>=1 && URL()!=":")
450 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
451
452 fAddress = addr;
453 fPort = port;
454}
455
456void Connection::SetEndpoint(const string &addr)
457{
458 const size_t p0 = addr.find_first_of(':');
459 const size_t p1 = addr.find_last_of(':');
460
461 if (p0==string::npos || p0!=p1)
462 {
463 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
464 return;
465 }
466
467 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
468}
469
470void Connection::SetEndpoint(const tcp::endpoint &ep)
471{
472 const ba::ip::address addr = ep.address();
473
474 const ba::ip::address use =
475 addr.is_v6() && addr.to_v6().is_loopback() ?
476 ba::ip::address(ba::ip::address_v4::loopback()) :
477 addr;
478
479 SetEndpoint(use.to_string(), ep.port());
480
481 fEndpoint = tcp::endpoint(use, ep.port());
482}
483
484
485Connection::Connection(ba::io_service& ioservice, ostream &out) :
486MessageImp(out), tcp::socket(ioservice),
487fLog(0), fVerbose(true), fDebugTx(false),
488fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
489fConnectionStatus(kDisconnected)
490{
491}
Note: See TracBrowser for help on using the repository browser.