source: trunk/FACT++/src/ConnectionSSL.cc@ 20056

Last change on this file since 20056 was 20017, checked in by tbretz, 4 years ago
Made ready for boost 1.70 by replacing get_io_context; the SSL connection has to be properly shutdown, but this required a new socket.. therefore switched to boost::optional to keep the socket.
File size: 16.9 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "ConnectionSSL.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace ssl = boost::asio::ssl;
17namespace bs = boost::system;
18namespace dummy = ba::placeholders;
19
20using ba::ip::tcp;
21
22 // -------- Abbreviations for starting async tasks ---------
23
24int ConnectionSSL::Write(const Time &t, const string &txt, int qos)
25{
26 if (fLog)
27 return fLog->Write(t, txt, qos);
28
29 return MessageImp::Write(t, txt, qos);
30}
31
32void ConnectionSSL::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
33{
34 ba::async_read(*stream, buffers,
35 boost::bind(&ConnectionSSL::HandleReceivedData, this,
36 dummy::error, dummy::bytes_transferred, type));
37}
38
39void ConnectionSSL::AsyncWrite(const ba::const_buffers_1 &buffers)
40{
41 ba::async_write(*stream, buffers,
42 boost::bind(&ConnectionSSL::HandleSentData, this,
43 dummy::error, dummy::bytes_transferred));
44}
45
46/*
47void ConnectionSSL::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (ConnectionSSL::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62*/
63
64void ConnectionSSL::AsyncConnect(tcp::resolver::iterator iterator)
65{
66 tcp::endpoint endpoint = *iterator;
67
68 // AsyncConnect + Deadline
69 stream->lowest_layer().async_connect(endpoint,
70 boost::bind(&ConnectionSSL::ConnectIter,
71 this, iterator, ba::placeholders::error));
72
73 // We will get a "Connection timeout anyway"
74 //AsyncWait(fConnectTimeout, 5, &ConnectionSSL::HandleConnectTimeout);
75}
76
77void ConnectionSSL::AsyncConnect()
78{
79 // AsyncConnect + Deadline
80 stream->lowest_layer().async_connect(fEndpoint,
81 boost::bind(&ConnectionSSL::ConnectAddr,
82 this, fEndpoint, ba::placeholders::error));
83
84 // We will get a "Connection timeout anyway"
85 //AsyncWait(fConnectTimeout, 5, &ConnectionSSL::HandleConnectTimeout);
86}
87
88void ConnectionSSL::HandleAsyncShutdown(const bs::error_code &error)
89{
90 stream->lowest_layer().close();
91
92 if (!error || error==error.default_error_condition())
93 return;
94
95 ostringstream str;
96 str << "Shutdown of SSL connection to " << URL() << "[" << error.value() << "]: " << error.message() << " (" << error << ")";// << endl;
97 Error(str);
98}
99
100
101// ------------------------ close --------------------------
102// close from another thread
103void ConnectionSSL::CloseImp(bool restart)
104{
105 if (IsConnected() && fVerbose)
106 {
107 ostringstream str;
108 str << "Connection closed to " << URL() << ".";
109 Info(str);
110 }
111
112 // Stop any pending connection attempt
113 fConnectionTimer.cancel();
114
115 // Close possible open connections
116 if (!IsClosed())
117 {
118 // Adapted from
119 // https://stackoverflow.com/questions/49122521/cant-implement-boostasiosslstreamboostasioiptcpsocket-reconnect
120
121#if BOOST_VERSION < 107000
122 auto& io_context = stream->get_io_service();
123#else
124 const auto& io_context = stream->get_executor();
125#endif
126
127 stream->lowest_layer().cancel(); // Remove all pending stuff in the queue
128 stream->async_shutdown(boost::bind(&ConnectionSSL::HandleAsyncShutdown, this, dummy::error));
129
130 stream.emplace(io_context, *this);
131 }
132
133 // Reset the connection status
134 fQueueSize = 0;
135 fConnectionStatus = kDisconnected;
136
137 // Stop deadline counters
138 fInTimeout.cancel();
139 fOutTimeout.cancel();
140
141 if (!restart || IsConnecting())
142 return;
143
144 // We need some timeout before reconnecting!
145 // And we have to check if we are alreayd trying to connect
146 // We shoudl wait until all operations in progress were canceled
147
148 // Start trying to reconnect
149 fMsgConnect = "";
150 fErrConnect = "";
151 StartConnect();
152}
153
154void ConnectionSSL::PostClose(bool restart)
155{
156#if BOOST_VERSION < 107000
157 stream->get_io_service().post(boost::bind(&ConnectionSSL::CloseImp, this, restart));
158#else
159 ba::post(boost::bind(&ConnectionSSL::CloseImp, this, restart));
160#endif
161}
162
163// ------------------------ write --------------------------
164void ConnectionSSL::HandleWriteTimeout(const bs::error_code &error)
165{
166 if (error==ba::error::basic_errors::operation_aborted)
167 return;
168
169 // 125: Operation canceled (bs::error_code(125, bs::system_category))
170 if (error)
171 {
172 ostringstream str;
173 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
174 Error(str);
175
176 CloseImp();
177 return;
178 }
179
180 if (IsClosed())
181 {
182 // For example: Here we could schedule a new accept if we
183 // would not want to allow two connections at the same time.
184 return;
185 }
186
187 // Check whether the deadline has passed. We compare the deadline
188 // against the current time since a new asynchronous operation
189 // may have moved the deadline before this actor had a chance
190 // to run.
191 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
192 return;
193
194 Error("fOutTimeout has expired, writing data to "+URL());
195
196 CloseImp();
197}
198
199void ConnectionSSL::HandleSentData(const bs::error_code& error, size_t n)
200{
201 if (error==ba::error::basic_errors::operation_aborted)
202 return;
203
204 if (error && error != ba::error::not_connected)
205 {
206 ostringstream str;
207 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
208 Error(str);
209
210 CloseImp();
211 return;
212 }
213
214 if (error == ba::error::not_connected)
215 {
216 ostringstream msg;
217 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
218 Warn(msg);
219
220 return;
221 }
222
223 if (--fQueueSize==0)
224 fOutTimeout.cancel();
225
226 if (fDebugTx)
227 {
228 ostringstream msg;
229 msg << n << " bytes successfully sent to " << URL();
230 Debug(msg);
231 }
232}
233
234void ConnectionSSL::PostMessage(const void *ptr, size_t sz)
235{
236 // This function can be called from a different thread...
237 if (IsClosed())
238 return;
239
240 // ... this is why we have to increase fQueueSize first
241 fQueueSize++;
242
243 // ... and shift the deadline timer
244 // This is not ideal, because if we are continously
245 // filling the buffer, it will never timeout
246 AsyncWait(fOutTimeout, 5000, &ConnectionSSL::HandleWriteTimeout);
247
248 // Now we can schedule the buffer to be sent
249 AsyncWrite(ba::const_buffers_1(ptr, sz));
250
251 // If a socket is closed, all pending asynchronous
252 // operation will be aborted.
253}
254
255void ConnectionSSL::PostMessage(const string &cmd, size_t max)
256{
257 if (max==size_t(-1))
258 max = cmd.length()+1;
259
260 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
261}
262
263void ConnectionSSL::HandleConnectionTimer(const bs::error_code &error)
264{
265 if (error==ba::error::basic_errors::operation_aborted)
266 return;
267
268 if (error)
269 {
270 ostringstream str;
271 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
272 Error(str);
273 }
274
275 if (!IsClosed())
276 {
277 // For example: Here we could schedule a new accept if we
278 // would not want to allow two connections at the same time.
279 return;
280 }
281
282 // Check whether the deadline has passed. We compare the deadline
283 // against the current time since a new asynchronous operation
284 // may have moved the deadline before this actor had a chance
285 // to run.
286 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
287 StartConnect();
288}
289
290void ConnectionSSL::HandleHandshake(const tcp::endpoint &endpoint, const bs::error_code& error)
291{
292 const string host = endpoint.port()==0 ? "" :
293 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
294
295 if (!error)
296 {
297 stream->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
298
299 const int optval = 30;
300
301#if BOOST_VERSION < 107000
302 // First keep alive after 30s
303 setsockopt(stream->lowest_layer().native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
304 // New keep alive after 30s
305 setsockopt(stream->lowest_layer().native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
306#else
307 // First keep alive after 30s
308 setsockopt(stream->lowest_layer().native_handle(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
309 // New keep alive after 30s
310 setsockopt(stream->lowest_layer().native_handle(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
311#endif
312
313 if (fVerbose)
314 Info("Connection established to "+host+"...");
315
316 fQueueSize = 0;
317 fConnectionStatus = kConnected;
318
319 ConnectionEstablished();
320 return;
321 }
322
323 // If returning from run will lead to deletion of this
324 // instance, close() is not needed (maybe implicitly called).
325 // If run is called again, close() is needed here. Otherwise:
326 // Software caused connection abort when we try to resolve
327 // the endpoint again.
328 CloseImp(false);
329
330 ostringstream msg;
331 if (!host.empty())
332 msg << "Handshake with " << host << ": " << error.message() << " (" << error << ")";
333
334 if (fErrConnect!=msg.str())
335 {
336 if (error!=ba::error::basic_errors::connection_refused)
337 fMsgConnect = "";
338 fErrConnect = msg.str();
339 Warn(fErrConnect);
340 }
341
342 if (error==ba::error::basic_errors::operation_aborted)
343 return;
344
345 fConnectionStatus = kConnecting;
346}
347
348bool ConnectionSSL::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
349{
350 const string addr = endpoint.address().to_string();
351 const string host = endpoint.port()==0 ? "" : addr+':'+to_string((long long unsigned int)endpoint.port());
352
353 // Connection established
354 if (!error)
355 {
356 if (fVerbose)
357 Info("Starting handshake for "+fAddress);
358
359 //stream::set_verify_mode(ssl::verify_peer);
360 //stream::set_verify_callback(ssl::rfc2818_verification(fAddress));
361 //set_default_verify_paths();
362
363 stream->async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::client,
364 boost::bind(&ConnectionSSL::HandleHandshake, this,
365 endpoint, dummy::error));
366 return true;
367 }
368
369 /*
370 handshake(stream::client);
371
372 lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
373
374 const int optval = 30;
375 // First keep alive after 30s
376 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
377 // New keep alive after 30s
378 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
379
380 if (fVerbose)
381 Info("Connection established to "+host+"...");
382
383 fQueueSize = 0;
384 fConnectionStatus = kConnected;
385
386 ConnectionEstablished();
387 return true;
388 }*/
389
390 // If returning from run will lead to deletion of this
391 // instance, close() is not needed (maybe implicitly called).
392 // If run is called again, close() is needed here. Otherwise:
393 // Software caused connection abort when we try to resolve
394 // the endpoint again.
395 CloseImp(false);
396
397 ostringstream msg;
398 if (!host.empty())
399 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
400
401 if (fErrConnect!=msg.str())
402 {
403 if (error!=ba::error::basic_errors::connection_refused)
404 fMsgConnect = "";
405 fErrConnect = msg.str();
406 Warn(fErrConnect);
407 }
408
409 if (error==ba::error::basic_errors::operation_aborted)
410 return true;
411
412 fConnectionStatus = kConnecting;
413
414 return false;
415/*
416 // Go on with the next
417 if (++iterator != tcp::resolver::iterator())
418 {
419 AsyncConnect(iterator);
420 return;
421 }
422*/
423 // No more entries to try, if we would not put anything else
424 // into the queue anymore it would now return (run() would return)
425
426 // Since we don't want to block the main loop, we wait using an
427 // asnychronous timer
428
429 // FIXME: Should we move this before AsyncConnect() ?
430// AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
431}
432
433void ConnectionSSL::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
434{
435 if (ConnectImp(*iterator, error))
436 return;
437
438 // Go on with the next
439 if (++iterator != tcp::resolver::iterator())
440 {
441 AsyncConnect(iterator);
442 return;
443 }
444
445 // No more entries to try, if we would not put anything else
446 // into the queue anymore it would now return (run() would return)
447 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
448}
449
450void ConnectionSSL::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
451{
452 if (ConnectImp(endpoint, error))
453 return;
454
455 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
456}
457
458// FIXME: Async connect should get address and port as an argument
459void ConnectionSSL::StartConnect()
460{
461 fConnectionStatus = kConnecting;
462
463 if (fEndpoint!=tcp::endpoint())
464 {
465 ostringstream msg;
466 msg << "Trying to connect to " << fEndpoint << "...";
467 if (fMsgConnect!=msg.str())
468 {
469 fMsgConnect = msg.str();
470 Info(msg);
471 }
472
473 AsyncConnect();
474 return;
475 }
476
477 const bool valid = !fAddress.empty() || !fPort.empty();
478
479 boost::system::error_code ec;
480
481 ostringstream msg;
482 if (!valid)
483 msg << "No target address... connection attempt postponed.";
484 else
485 {
486#if BOOST_VERSION < 107000
487 tcp::resolver resolver(stream->get_io_service());
488#else
489 tcp::resolver resolver(stream->get_executor());
490#endif
491
492 tcp::resolver::query query(fAddress, fPort);
493 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
494
495 msg << "Trying to connect to " << URL() << "...";
496
497 // Start connection attempts (will also reset deadline counter)
498 if (!ec)
499 AsyncConnect(iterator);
500 else
501 msg << " " << ec.message() << " (" << ec << ")";
502 }
503
504 // Only output message if it has changed
505 if (fMsgConnect!=msg.str())
506 {
507 fMsgConnect = msg.str();
508 if (ec)
509 Error(msg);
510 if (!ec && fVerbose)
511 Info(msg);
512 }
513
514 if (!valid || ec)
515 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
516}
517
518void ConnectionSSL::SetEndpoint(const string &addr, int port)
519{
520 if (fConnectionStatus>=1)
521 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
522
523 fAddress = addr;
524 fPort = to_string((long long)port);
525}
526
527void ConnectionSSL::SetEndpoint(const string &addr, const string &port)
528{
529 if (fConnectionStatus>=1 && URL()!=":")
530 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
531
532 fAddress = addr;
533 fPort = port;
534}
535
536void ConnectionSSL::SetEndpoint(const string &addr)
537{
538 const size_t p0 = addr.find_first_of(':');
539 const size_t p1 = addr.find_last_of(':');
540
541 if (p0==string::npos || p0!=p1)
542 {
543 Error("ConnectionSSL::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
544 return;
545 }
546
547 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
548}
549
550void ConnectionSSL::SetEndpoint(const tcp::endpoint &ep)
551{
552 const ba::ip::address addr = ep.address();
553
554 const ba::ip::address use =
555 addr.is_v6() && addr.to_v6().is_loopback() ?
556 ba::ip::address(ba::ip::address_v4::loopback()) :
557 addr;
558
559 SetEndpoint(use.to_string(), ep.port());
560
561 fEndpoint = tcp::endpoint(use, ep.port());
562}
563
564ConnectionSSL::ConnectionSSL(ba::io_service& ioservice, ostream &out) : MessageImp(out),
565#if BOOST_VERSION < 107000
566ssl::context(ioservice, boost::asio::ssl::context::method::sslv23_client),
567#else
568ssl::context(boost::asio::ssl::context::method::sslv23_client),
569#endif
570stream(boost::in_place_init, ioservice, *this),
571fLog(0), fVerbose(true), fDebugTx(false),
572fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
573fQueueSize(0), fConnectionStatus(kDisconnected)
574{
575 //stream::set_verify_mode(ssl::verify_none);
576}
Note: See TracBrowser for help on using the repository browser.