source: trunk/FACT++/src/Connection.cc@ 14468

Last change on this file since 14468 was 14179, checked in by tbretz, 12 years ago
Changed keep alive time from 10s to 30s. This increases the latency time in case of real disconnects, but ensures that the FADs are not disconnected only because of a short blocking.
File size: 13.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "Connection.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using ba::ip::tcp;
20
21 // -------- Abbreviations for starting async tasks ---------
22
23int Connection::Write(const Time &t, const string &txt, int qos)
24{
25 if (fLog)
26 return fLog->Write(t, txt, qos);
27
28 return MessageImp::Write(t, txt, qos);
29}
30
31void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32{
33 ba::async_read(*this, buffers,
34 boost::bind(&Connection::HandleReceivedData, this,
35 dummy::error, dummy::bytes_transferred, type));
36}
37
38void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39{
40 ba::async_write(*this, buffers,
41 boost::bind(&Connection::HandleSentData, this,
42 dummy::error, dummy::bytes_transferred));
43}
44
45/*
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61*/
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectIter,
70 this, iterator, ba::placeholders::error));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76void Connection::AsyncConnect()
77{
78 // AsyncConnect + Deadline
79 async_connect(fEndpoint,
80 boost::bind(&Connection::ConnectAddr,
81 this, fEndpoint, ba::placeholders::error));
82
83 // We will get a "Connection timeout anyway"
84 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85}
86
87// ------------------------ close --------------------------
88// close from another thread
89void Connection::CloseImp(bool restart)
90{
91 if (IsConnected() && fVerbose)
92 {
93 ostringstream str;
94 str << "Connection closed to " << URL() << ".";
95 Info(str);
96 }
97
98 // Stop any pending connection attempt
99 fConnectionTimer.cancel();
100
101 // Close possible open connections
102 close();
103
104 // Reset the connection status
105 fConnectionStatus = kDisconnected;
106
107 // Stop deadline counters
108 fInTimeout.cancel();
109 fOutTimeout.cancel();
110
111 // Empty output queue
112 fOutQueue.clear();
113
114 if (!restart || IsConnecting())
115 return;
116
117 // We need some timeout before reconnecting!
118 // And we have to check if we are alreayd trying to connect
119 // We shoudl wait until all operations in progress were canceled
120
121 // Start trying to reconnect
122 fMsgConnect = "";
123 fErrConnect = "";
124 StartConnect();
125}
126
127void Connection::PostClose(bool restart)
128{
129 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
130}
131
132// ------------------------ write --------------------------
133void Connection::HandleWriteTimeout(const bs::error_code &error)
134{
135 if (error==ba::error::basic_errors::operation_aborted)
136 return;
137
138 // 125: Operation canceled (bs::error_code(125, bs::system_category))
139 if (error)
140 {
141 ostringstream str;
142 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
143 Error(str);
144
145 CloseImp();
146 return;
147 }
148
149 if (!is_open())
150 {
151 // For example: Here we could schedule a new accept if we
152 // would not want to allow two connections at the same time.
153 return;
154 }
155
156 // Check whether the deadline has passed. We compare the deadline
157 // against the current time since a new asynchronous operation
158 // may have moved the deadline before this actor had a chance
159 // to run.
160 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
161 return;
162
163 Error("fOutTimeout has expired, writing data to "+URL());
164
165 CloseImp();
166}
167
168void Connection::HandleSentData(const bs::error_code& error, size_t n)
169{
170 if (error && error != ba::error::not_connected)
171 {
172 ostringstream str;
173 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
174 Error(str);
175
176 CloseImp();
177 return;
178 }
179
180 if (error == ba::error::not_connected)
181 {
182 ostringstream msg;
183 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
184 Warn(msg);
185 }
186 else
187 {
188 if (fDebugTx)
189 {
190 ostringstream msg;
191 msg << n << " bytes successfully sent to " << URL();
192 Debug(msg);
193 }
194 }
195
196 // This is "thread" safe because SendMessage and HandleSentMessage
197 // are serialized in the EventQueue. Note: Do not call these
198 // functions directly from any other place then Handlers, use
199 // PostMessage instead
200 if (!fOutQueue.empty())
201 fOutQueue.pop_front();
202
203 if (fOutQueue.empty())
204 {
205 // Queue went empty, remove deadline
206 fOutTimeout.cancel();
207 return;
208 }
209
210 // AsyncWrite + Deadline
211 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
212 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
213}
214
215// It is important that when SendMessageImp is called, or to be more
216// precise boost::bind is called, teh data is copied!
217void Connection::SendMessageImp(const vector<char> msg)
218{
219 /*
220 if (!fConnectionEstablished)
221 {
222 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
223 return;
224 }*/
225
226 const bool first_message_in_queue = fOutQueue.empty();
227
228 // This is "thread" safe because SendMessage and HandleSentMessage
229 // are serialized in the EventQueue. Note: Do not call these
230 // functions directly from any other place then Handlers, use
231 // PostMessage instead
232 fOutQueue.push_back(msg);
233
234 if (!first_message_in_queue)
235 return;
236
237 // AsyncWrite + Deadline
238 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
239 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
240}
241
242void Connection::PostMessage(const void *ptr, size_t max)
243{
244 const vector<char> msg(reinterpret_cast<const char*>(ptr),
245 reinterpret_cast<const char*>(ptr)+max);
246
247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
248}
249
250void Connection::PostMessage(const string &cmd, size_t max)
251{
252 if (max==size_t(-1))
253 max = cmd.length()+1;
254
255 vector <char>msg(max);
256
257 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
258
259 PostMessage(msg);
260}
261
262void Connection::HandleConnectionTimer(const bs::error_code &error)
263{
264 if (error==ba::error::basic_errors::operation_aborted)
265 return;
266
267 if (error)
268 {
269 ostringstream str;
270 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
271 Error(str);
272 }
273
274 if (is_open())
275 {
276 // For example: Here we could schedule a new accept if we
277 // would not want to allow two connections at the same time.
278 return;
279 }
280
281 // Check whether the deadline has passed. We compare the deadline
282 // against the current time since a new asynchronous operation
283 // may have moved the deadline before this actor had a chance
284 // to run.
285 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
286 StartConnect();
287}
288
289bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
290{
291 const string host = endpoint.port()==0 ? "" :
292 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
293
294 // Connection established
295 if (!error)
296 {
297 set_option(socket_base::keep_alive(true));
298
299 const int optval = 30;
300 // First keep alive after 30s
301 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
302 // New keep alive after 30s
303 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
304
305 if (fVerbose)
306 Info("Connection established to "+host+"...");
307
308 fConnectionStatus = kConnected;
309
310 ConnectionEstablished();
311 return true;
312 }
313
314 // If returning from run will lead to deletion of this
315 // instance, close() is not needed (maybe implicitly called).
316 // If run is called again, close() is needed here. Otherwise:
317 // Software caused connection abort when we try to resolve
318 // the endpoint again.
319 CloseImp(false);
320
321 ostringstream msg;
322 if (!host.empty())
323 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
324
325 if (fErrConnect!=msg.str())
326 {
327 if (error!=ba::error::basic_errors::connection_refused)
328 fMsgConnect = "";
329 fErrConnect = msg.str();
330 Warn(fErrConnect);
331 }
332
333 if (error==ba::error::basic_errors::operation_aborted)
334 return true;
335
336 fConnectionStatus = kConnecting;
337
338 return false;
339/*
340 // Go on with the next
341 if (++iterator != tcp::resolver::iterator())
342 {
343 AsyncConnect(iterator);
344 return;
345 }
346*/
347 // No more entries to try, if we would not put anything else
348 // into the queue anymore it would now return (run() would return)
349
350 // Since we don't want to block the main loop, we wait using an
351 // asnychronous timer
352
353 // FIXME: Should we move this before AsyncConnect() ?
354// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
355}
356
357void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
358{
359 if (ConnectImp(*iterator, error))
360 return;
361
362 // Go on with the next
363 if (++iterator != tcp::resolver::iterator())
364 {
365 AsyncConnect(iterator);
366 return;
367 }
368
369 // No more entries to try, if we would not put anything else
370 // into the queue anymore it would now return (run() would return)
371 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
372}
373
374void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
375{
376 if (ConnectImp(endpoint, error))
377 return;
378
379 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
380}
381
382// FIXME: Async connect should get address and port as an argument
383void Connection::StartConnect()
384{
385 fConnectionStatus = kConnecting;
386
387 if (fEndpoint!=tcp::endpoint())
388 {
389 ostringstream msg;
390 msg << "Trying to connect to " << fEndpoint << "...";
391 if (fMsgConnect!=msg.str())
392 {
393 fMsgConnect = msg.str();
394 Info(msg);
395 }
396
397 AsyncConnect();
398 return;
399 }
400
401 tcp::resolver resolver(get_io_service());
402
403 boost::system::error_code ec;
404
405 tcp::resolver::query query(fAddress, fPort);
406 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
407
408 ostringstream msg;
409 if (!fAddress.empty() || !fPort.empty() || ec)
410 msg << "Trying to connect to " << URL() << "...";
411
412 if (ec)
413 msg << " " << ec.message() << " (" << ec << ")";
414
415 // Only output message if it has changed
416 if (fMsgConnect!=msg.str())
417 {
418 fMsgConnect = msg.str();
419 if (ec)
420 Error(msg);
421 if (!ec && fVerbose)
422 Info(msg);
423 }
424
425 if (ec)
426 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
427 else
428 // Start connection attempts (will also reset deadline counter)
429 AsyncConnect(iterator);
430}
431
432void Connection::SetEndpoint(const string &addr, int port)
433{
434 if (fConnectionStatus>=1)
435 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
436
437 fAddress = addr;
438 fPort = to_string((long long)port);
439}
440
441void Connection::SetEndpoint(const string &addr, const string &port)
442{
443 if (fConnectionStatus>=1 && URL()!=":")
444 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
445
446 fAddress = addr;
447 fPort = port;
448}
449
450void Connection::SetEndpoint(const string &addr)
451{
452 const size_t p0 = addr.find_first_of(':');
453 const size_t p1 = addr.find_last_of(':');
454
455 if (p0==string::npos || p0!=p1)
456 {
457 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
458 return;
459 }
460
461 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
462}
463
464void Connection::SetEndpoint(const tcp::endpoint &ep)
465{
466 const ba::ip::address addr = ep.address();
467
468 const ba::ip::address use =
469 addr.is_v6() && addr.to_v6().is_loopback() ?
470 ba::ip::address(ba::ip::address_v4::loopback()) :
471 addr;
472
473 SetEndpoint(use.to_string(), ep.port());
474
475 fEndpoint = tcp::endpoint(use, ep.port());
476}
477
478
479Connection::Connection(ba::io_service& ioservice, ostream &out) :
480MessageImp(out), tcp::socket(ioservice),
481fLog(0), fVerbose(true), fDebugTx(false),
482fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
483fConnectionStatus(kDisconnected)
484{
485}
Note: See TracBrowser for help on using the repository browser.