source: trunk/FACT++/src/Connection.cc@ 20026

Last change on this file since 20026 was 20018, checked in by tbretz, 4 years ago
In favor of boost 1.70, replaced get_io_context
File size: 13.3 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "Connection.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using ba::ip::tcp;
20
21 // -------- Abbreviations for starting async tasks ---------
22
23int Connection::Write(const Time &t, const string &txt, int qos)
24{
25 if (fLog)
26 return fLog->Write(t, txt, qos);
27
28 return MessageImp::Write(t, txt, qos);
29}
30
31void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32{
33 ba::async_read(*this, buffers,
34 boost::bind(&Connection::HandleReceivedData, this,
35 dummy::error, dummy::bytes_transferred, type));
36}
37
38void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39{
40 ba::async_write(*this, buffers,
41 boost::bind(&Connection::HandleSentData, this,
42 dummy::error, dummy::bytes_transferred));
43}
44
45/*
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61*/
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectIter,
70 this, iterator, ba::placeholders::error));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76void Connection::AsyncConnect()
77{
78 // AsyncConnect + Deadline
79 async_connect(fEndpoint,
80 boost::bind(&Connection::ConnectAddr,
81 this, fEndpoint, ba::placeholders::error));
82
83 // We will get a "Connection timeout anyway"
84 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85}
86
87// ------------------------ close --------------------------
88// close from another thread
89void Connection::CloseImp(bool restart)
90{
91 if (IsConnected() && fVerbose)
92 {
93 ostringstream str;
94 str << "Connection closed to " << URL() << ".";
95 Info(str);
96 }
97
98 // Stop any pending connection attempt
99 fConnectionTimer.cancel();
100
101 // Close possible open connections
102 close();
103
104 // Reset the connection status
105 fQueueSize = 0;
106 fConnectionStatus = kDisconnected;
107
108 // Stop deadline counters
109 fInTimeout.cancel();
110 fOutTimeout.cancel();
111
112 if (!restart || IsConnecting())
113 return;
114
115 // We need some timeout before reconnecting!
116 // And we have to check if we are alreayd trying to connect
117 // We shoudl wait until all operations in progress were canceled
118
119 // Start trying to reconnect
120 fMsgConnect = "";
121 fErrConnect = "";
122 StartConnect();
123}
124
125void Connection::PostClose(bool restart)
126{
127#if BOOST_VERSION < 107000
128 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
129#else
130 ba::post(boost::bind(&Connection::CloseImp, this, restart));
131#endif
132}
133
134// ------------------------ write --------------------------
135void Connection::HandleWriteTimeout(const bs::error_code &error)
136{
137 if (error==ba::error::basic_errors::operation_aborted)
138 return;
139
140 // 125: Operation canceled (bs::error_code(125, bs::system_category))
141 if (error)
142 {
143 ostringstream str;
144 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
145 Error(str);
146
147 CloseImp();
148 return;
149 }
150
151 if (!is_open())
152 {
153 // For example: Here we could schedule a new accept if we
154 // would not want to allow two connections at the same time.
155 return;
156 }
157
158 // Check whether the deadline has passed. We compare the deadline
159 // against the current time since a new asynchronous operation
160 // may have moved the deadline before this actor had a chance
161 // to run.
162 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
163 return;
164
165 Error("fOutTimeout has expired, writing data to "+URL());
166
167 CloseImp();
168}
169
170void Connection::HandleSentData(const bs::error_code& error, size_t n)
171{
172 if (error==ba::error::basic_errors::operation_aborted)
173 return;
174
175 if (error && error != ba::error::not_connected)
176 {
177 ostringstream str;
178 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
179 Error(str);
180
181 CloseImp();
182 return;
183 }
184
185 if (error == ba::error::not_connected)
186 {
187 ostringstream msg;
188 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
189 Warn(msg);
190
191 return;
192 }
193
194 if (--fQueueSize==0)
195 fOutTimeout.cancel();
196
197 if (fDebugTx)
198 {
199 ostringstream msg;
200 msg << n << " bytes successfully sent to " << URL();
201 Debug(msg);
202 }
203}
204
205void Connection::PostMessage(const void *ptr, size_t sz)
206{
207 // This function can be called from a different thread...
208 if (!is_open())
209 return;
210
211 // ... this is why we have to increase fQueueSize first
212 fQueueSize++;
213
214 // ... and shift the deadline timer
215 // This is not ideal, because if we are continously
216 // filling the buffer, it will never timeout
217 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
218
219 // Now we can schedule the buffer to be sent
220 AsyncWrite(ba::const_buffers_1(ptr, sz));
221
222 // If a socket is closed, all pending asynchronous
223 // operation will be aborted.
224}
225
226void Connection::PostMessage(const string &cmd, size_t max)
227{
228 if (max==size_t(-1))
229 max = cmd.length()+1;
230
231 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
232}
233
234void Connection::HandleConnectionTimer(const bs::error_code &error)
235{
236 if (error==ba::error::basic_errors::operation_aborted)
237 return;
238
239 if (error)
240 {
241 ostringstream str;
242 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
243 Error(str);
244 }
245
246 if (is_open())
247 {
248 // For example: Here we could schedule a new accept if we
249 // would not want to allow two connections at the same time.
250 return;
251 }
252
253 // Check whether the deadline has passed. We compare the deadline
254 // against the current time since a new asynchronous operation
255 // may have moved the deadline before this actor had a chance
256 // to run.
257 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
258 StartConnect();
259}
260
261bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
262{
263 const string host = endpoint.port()==0 ? "" :
264 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
265
266 // Connection established
267 if (!error)
268 {
269 set_option(socket_base::keep_alive(true));
270
271 const int optval = 30;
272
273#if BOOST_VERSION <107000
274 // First keep alive after 30s
275 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
276 // New keep alive after 30s
277 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
278#else
279 // First keep alive after 30s
280 setsockopt(native_handle(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
281 // New keep alive after 30s
282 setsockopt(native_handle(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
283#endif
284
285 if (fVerbose)
286 Info("Connection established to "+host+"...");
287
288 fQueueSize = 0;
289 fConnectionStatus = kConnected;
290
291 ConnectionEstablished();
292 return true;
293 }
294
295 // If returning from run will lead to deletion of this
296 // instance, close() is not needed (maybe implicitly called).
297 // If run is called again, close() is needed here. Otherwise:
298 // Software caused connection abort when we try to resolve
299 // the endpoint again.
300 CloseImp(false);
301
302 ostringstream msg;
303 if (!host.empty())
304 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
305
306 if (fErrConnect!=msg.str())
307 {
308 if (error!=ba::error::basic_errors::connection_refused)
309 fMsgConnect = "";
310 fErrConnect = msg.str();
311 Warn(fErrConnect);
312 }
313
314 if (error==ba::error::basic_errors::operation_aborted)
315 return true;
316
317 fConnectionStatus = kConnecting;
318
319 return false;
320/*
321 // Go on with the next
322 if (++iterator != tcp::resolver::iterator())
323 {
324 AsyncConnect(iterator);
325 return;
326 }
327*/
328 // No more entries to try, if we would not put anything else
329 // into the queue anymore it would now return (run() would return)
330
331 // Since we don't want to block the main loop, we wait using an
332 // asnychronous timer
333
334 // FIXME: Should we move this before AsyncConnect() ?
335// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
336}
337
338void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
339{
340 if (ConnectImp(*iterator, error))
341 return;
342
343 // Go on with the next
344 if (++iterator != tcp::resolver::iterator())
345 {
346 AsyncConnect(iterator);
347 return;
348 }
349
350 // No more entries to try, if we would not put anything else
351 // into the queue anymore it would now return (run() would return)
352 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
353}
354
355void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
356{
357 if (ConnectImp(endpoint, error))
358 return;
359
360 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
361}
362
363// FIXME: Async connect should get address and port as an argument
364void Connection::StartConnect()
365{
366 fConnectionStatus = kConnecting;
367
368 if (fEndpoint!=tcp::endpoint())
369 {
370 ostringstream msg;
371 msg << "Trying to connect to " << fEndpoint << "...";
372 if (fMsgConnect!=msg.str())
373 {
374 fMsgConnect = msg.str();
375 Info(msg);
376 }
377
378 AsyncConnect();
379 return;
380 }
381
382 const bool valid = !fAddress.empty() || !fPort.empty();
383
384 boost::system::error_code ec;
385
386 ostringstream msg;
387 if (!valid)
388 msg << "No target address... connection attempt postponed.";
389 else
390 {
391#if BOOST_VERSION < 107000
392 tcp::resolver resolver(get_io_service());
393#else
394 tcp::resolver resolver(get_executor());
395#endif
396 tcp::resolver::query query(fAddress, fPort);
397 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
398
399 msg << "Trying to connect to " << URL() << "...";
400
401 // Start connection attempts (will also reset deadline counter)
402 if (!ec)
403 AsyncConnect(iterator);
404 else
405 msg << " " << ec.message() << " (" << ec << ")";
406 }
407
408 // Only output message if it has changed
409 if (fMsgConnect!=msg.str())
410 {
411 fMsgConnect = msg.str();
412 if (ec)
413 Error(msg);
414 if (!ec && fVerbose)
415 Info(msg);
416 }
417
418 if (!valid || ec)
419 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
420}
421
422void Connection::SetEndpoint(const string &addr, int port)
423{
424 if (fConnectionStatus>=1)
425 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
426
427 fAddress = addr;
428 fPort = to_string((long long)port);
429}
430
431void Connection::SetEndpoint(const string &addr, const string &port)
432{
433 if (fConnectionStatus>=1 && URL()!=":")
434 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
435
436 fAddress = addr;
437 fPort = port;
438}
439
440void Connection::SetEndpoint(const string &addr)
441{
442 const size_t p0 = addr.find_first_of(':');
443 const size_t p1 = addr.find_last_of(':');
444
445 if (p0==string::npos || p0!=p1)
446 {
447 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
448 return;
449 }
450
451 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
452}
453
454void Connection::SetEndpoint(const tcp::endpoint &ep)
455{
456 const ba::ip::address addr = ep.address();
457
458 const ba::ip::address use =
459 addr.is_v6() && addr.to_v6().is_loopback() ?
460 ba::ip::address(ba::ip::address_v4::loopback()) :
461 addr;
462
463 SetEndpoint(use.to_string(), ep.port());
464
465 fEndpoint = tcp::endpoint(use, ep.port());
466}
467
468
469Connection::Connection(ba::io_service& ioservice, ostream &out) :
470MessageImp(out), tcp::socket(ioservice),
471fLog(0), fVerbose(true), fDebugTx(false),
472fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
473fQueueSize(0), fConnectionStatus(kDisconnected)
474{
475}
Note: See TracBrowser for help on using the repository browser.