source: trunk/FACT++/src/Connection.cc@ 20115

Last change on this file since 20115 was 20086, checked in by tbretz, 4 years ago
Trying to improve exception safety of closing a connection. Calling shutdown is suggested in the close documentation and using the error code should prevent exceptions.
File size: 13.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "Connection.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using ba::ip::tcp;
20
21 // -------- Abbreviations for starting async tasks ---------
22
23int Connection::Write(const Time &t, const string &txt, int qos)
24{
25 if (fLog)
26 return fLog->Write(t, txt, qos);
27
28 return MessageImp::Write(t, txt, qos);
29}
30
31void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32{
33 ba::async_read(*this, buffers,
34 boost::bind(&Connection::HandleReceivedData, this,
35 dummy::error, dummy::bytes_transferred, type));
36}
37
38void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39{
40 ba::async_write(*this, buffers,
41 boost::bind(&Connection::HandleSentData, this,
42 dummy::error, dummy::bytes_transferred));
43}
44
45/*
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61*/
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectIter,
70 this, iterator, ba::placeholders::error));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76void Connection::AsyncConnect()
77{
78 // AsyncConnect + Deadline
79 async_connect(fEndpoint,
80 boost::bind(&Connection::ConnectAddr,
81 this, fEndpoint, ba::placeholders::error));
82
83 // We will get a "Connection timeout anyway"
84 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85}
86
87// ------------------------ close --------------------------
88// close from another thread
89void Connection::CloseImp(bool restart)
90{
91 if (IsConnected() && fVerbose)
92 {
93 ostringstream str;
94 str << "Connection closed to " << URL() << ".";
95 Info(str);
96 }
97
98 // Stop any pending connection attempt
99 fConnectionTimer.cancel();
100
101 // Close possible open connections
102 if (is_open())
103 {
104 bs::error_code err;
105 shutdown(boost::asio::ip::tcp::socket::shutdown_both, err);
106 if (err)
107 {
108 ostringstream str;
109 str << "Shutting down " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
110 Error(str);
111 }
112 close(err);
113 if (err)
114 {
115 ostringstream str;
116 str << "Closing " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
117 Error(str);
118 }
119 }
120
121 // Reset the connection status
122 fQueueSize = 0;
123 fConnectionStatus = kDisconnected;
124
125 // Stop deadline counters
126 fInTimeout.cancel();
127 fOutTimeout.cancel();
128
129 if (!restart || IsConnecting())
130 return;
131
132 // We need some timeout before reconnecting!
133 // And we have to check if we are alreayd trying to connect
134 // We shoudl wait until all operations in progress were canceled
135
136 // Start trying to reconnect
137 fMsgConnect = "";
138 fErrConnect = "";
139 StartConnect();
140}
141
142void Connection::PostClose(bool restart)
143{
144#if BOOST_VERSION < 107000
145 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
146#else
147 ba::post(boost::bind(&Connection::CloseImp, this, restart));
148#endif
149}
150
151// ------------------------ write --------------------------
152void Connection::HandleWriteTimeout(const bs::error_code &error)
153{
154 if (error==ba::error::basic_errors::operation_aborted)
155 return;
156
157 // 125: Operation canceled (bs::error_code(125, bs::system_category))
158 if (error)
159 {
160 ostringstream str;
161 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
162 Error(str);
163
164 CloseImp();
165 return;
166 }
167
168 if (!is_open())
169 {
170 // For example: Here we could schedule a new accept if we
171 // would not want to allow two connections at the same time.
172 return;
173 }
174
175 // Check whether the deadline has passed. We compare the deadline
176 // against the current time since a new asynchronous operation
177 // may have moved the deadline before this actor had a chance
178 // to run.
179 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
180 return;
181
182 Error("fOutTimeout has expired, writing data to "+URL());
183
184 CloseImp();
185}
186
187void Connection::HandleSentData(const bs::error_code& error, size_t n)
188{
189 if (error==ba::error::basic_errors::operation_aborted)
190 return;
191
192 if (error && error != ba::error::not_connected)
193 {
194 ostringstream str;
195 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
196 Error(str);
197
198 CloseImp();
199 return;
200 }
201
202 if (error == ba::error::not_connected)
203 {
204 ostringstream msg;
205 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
206 Warn(msg);
207
208 return;
209 }
210
211 if (--fQueueSize==0)
212 fOutTimeout.cancel();
213
214 if (fDebugTx)
215 {
216 ostringstream msg;
217 msg << n << " bytes successfully sent to " << URL();
218 Debug(msg);
219 }
220}
221
222void Connection::PostMessage(const void *ptr, size_t sz)
223{
224 // This function can be called from a different thread...
225 if (!is_open())
226 return;
227
228 // ... this is why we have to increase fQueueSize first
229 fQueueSize++;
230
231 // ... and shift the deadline timer
232 // This is not ideal, because if we are continously
233 // filling the buffer, it will never timeout
234 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
235
236 // Now we can schedule the buffer to be sent
237 AsyncWrite(ba::const_buffers_1(ptr, sz));
238
239 // If a socket is closed, all pending asynchronous
240 // operation will be aborted.
241}
242
243void Connection::PostMessage(const string &cmd, size_t max)
244{
245 if (max==size_t(-1))
246 max = cmd.length()+1;
247
248 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
249}
250
251void Connection::HandleConnectionTimer(const bs::error_code &error)
252{
253 if (error==ba::error::basic_errors::operation_aborted)
254 return;
255
256 if (error)
257 {
258 ostringstream str;
259 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
260 Error(str);
261 }
262
263 if (is_open())
264 {
265 // For example: Here we could schedule a new accept if we
266 // would not want to allow two connections at the same time.
267 return;
268 }
269
270 // Check whether the deadline has passed. We compare the deadline
271 // against the current time since a new asynchronous operation
272 // may have moved the deadline before this actor had a chance
273 // to run.
274 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
275 StartConnect();
276}
277
278bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
279{
280 const string host = endpoint.port()==0 ? "" :
281 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
282
283 // Connection established
284 if (!error)
285 {
286 set_option(socket_base::keep_alive(true));
287
288 const int optval = 30;
289
290#if BOOST_VERSION <107000
291 // First keep alive after 30s
292 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
293 // New keep alive after 30s
294 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
295#else
296 // First keep alive after 30s
297 setsockopt(native_handle(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
298 // New keep alive after 30s
299 setsockopt(native_handle(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
300#endif
301
302 if (fVerbose)
303 Info("Connection established to "+host+"...");
304
305 fQueueSize = 0;
306 fConnectionStatus = kConnected;
307
308 ConnectionEstablished();
309 return true;
310 }
311
312 // If returning from run will lead to deletion of this
313 // instance, close() is not needed (maybe implicitly called).
314 // If run is called again, close() is needed here. Otherwise:
315 // Software caused connection abort when we try to resolve
316 // the endpoint again.
317 CloseImp(false);
318
319 ostringstream msg;
320 if (!host.empty())
321 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
322
323 if (fErrConnect!=msg.str())
324 {
325 if (error!=ba::error::basic_errors::connection_refused)
326 fMsgConnect = "";
327 fErrConnect = msg.str();
328 Warn(fErrConnect);
329 }
330
331 if (error==ba::error::basic_errors::operation_aborted)
332 return true;
333
334 fConnectionStatus = kConnecting;
335
336 return false;
337/*
338 // Go on with the next
339 if (++iterator != tcp::resolver::iterator())
340 {
341 AsyncConnect(iterator);
342 return;
343 }
344*/
345 // No more entries to try, if we would not put anything else
346 // into the queue anymore it would now return (run() would return)
347
348 // Since we don't want to block the main loop, we wait using an
349 // asnychronous timer
350
351 // FIXME: Should we move this before AsyncConnect() ?
352// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
353}
354
355void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
356{
357 if (ConnectImp(*iterator, error))
358 return;
359
360 // Go on with the next
361 if (++iterator != tcp::resolver::iterator())
362 {
363 AsyncConnect(iterator);
364 return;
365 }
366
367 // No more entries to try, if we would not put anything else
368 // into the queue anymore it would now return (run() would return)
369 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
370}
371
372void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
373{
374 if (ConnectImp(endpoint, error))
375 return;
376
377 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
378}
379
380// FIXME: Async connect should get address and port as an argument
381void Connection::StartConnect()
382{
383 fConnectionStatus = kConnecting;
384
385 if (fEndpoint!=tcp::endpoint())
386 {
387 ostringstream msg;
388 msg << "Trying to connect to " << fEndpoint << "...";
389 if (fMsgConnect!=msg.str())
390 {
391 fMsgConnect = msg.str();
392 Info(msg);
393 }
394
395 AsyncConnect();
396 return;
397 }
398
399 const bool valid = !fAddress.empty() || !fPort.empty();
400
401 boost::system::error_code ec;
402
403 ostringstream msg;
404 if (!valid)
405 msg << "No target address... connection attempt postponed.";
406 else
407 {
408#if BOOST_VERSION < 107000
409 tcp::resolver resolver(get_io_service());
410#else
411 tcp::resolver resolver(get_executor());
412#endif
413 tcp::resolver::query query(fAddress, fPort);
414 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
415
416 msg << "Trying to connect to " << URL() << "...";
417
418 // Start connection attempts (will also reset deadline counter)
419 if (!ec)
420 AsyncConnect(iterator);
421 else
422 msg << " " << ec.message() << " (" << ec << ")";
423 }
424
425 // Only output message if it has changed
426 if (fMsgConnect!=msg.str())
427 {
428 fMsgConnect = msg.str();
429 if (ec)
430 Error(msg);
431 if (!ec && fVerbose)
432 Info(msg);
433 }
434
435 if (!valid || ec)
436 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
437}
438
439void Connection::SetEndpoint(const string &addr, int port)
440{
441 if (fConnectionStatus>=1)
442 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
443
444 fAddress = addr;
445 fPort = to_string((long long)port);
446}
447
448void Connection::SetEndpoint(const string &addr, const string &port)
449{
450 if (fConnectionStatus>=1 && URL()!=":")
451 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
452
453 fAddress = addr;
454 fPort = port;
455}
456
457void Connection::SetEndpoint(const string &addr)
458{
459 const size_t p0 = addr.find_first_of(':');
460 const size_t p1 = addr.find_last_of(':');
461
462 if (p0==string::npos || p0!=p1)
463 {
464 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
465 return;
466 }
467
468 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
469}
470
471void Connection::SetEndpoint(const tcp::endpoint &ep)
472{
473 const ba::ip::address addr = ep.address();
474
475 const ba::ip::address use =
476 addr.is_v6() && addr.to_v6().is_loopback() ?
477 ba::ip::address(ba::ip::address_v4::loopback()) :
478 addr;
479
480 SetEndpoint(use.to_string(), ep.port());
481
482 fEndpoint = tcp::endpoint(use, ep.port());
483}
484
485
486Connection::Connection(ba::io_service& ioservice, ostream &out) :
487MessageImp(out), tcp::socket(ioservice),
488fLog(0), fVerbose(true), fDebugTx(false),
489fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
490fQueueSize(0), fConnectionStatus(kDisconnected)
491{
492}
Note: See TracBrowser for help on using the repository browser.