source: trunk/FACT++/src/ConnectionSSL.cc@ 20011

Last change on this file since 20011 was 20005, checked in by tbretz, 4 years ago
The API in boost 1.70 changed.
File size: 15.6 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "ConnectionSSL.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace ssl = boost::asio::ssl;
17namespace bs = boost::system;
18namespace dummy = ba::placeholders;
19
20typedef ssl::stream<boost::asio::ip::tcp::socket> stream;
21
22using ba::ip::tcp;
23
24 // -------- Abbreviations for starting async tasks ---------
25
26int ConnectionSSL::Write(const Time &t, const string &txt, int qos)
27{
28 if (fLog)
29 return fLog->Write(t, txt, qos);
30
31 return MessageImp::Write(t, txt, qos);
32}
33
34void ConnectionSSL::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
35{
36 ba::async_read(*this, buffers,
37 boost::bind(&ConnectionSSL::HandleReceivedData, this,
38 dummy::error, dummy::bytes_transferred, type));
39}
40
41void ConnectionSSL::AsyncWrite(const ba::const_buffers_1 &buffers)
42{
43 ba::async_write(*this, buffers,
44 boost::bind(&ConnectionSSL::HandleSentData, this,
45 dummy::error, dummy::bytes_transferred));
46}
47
48/*
49void ConnectionSSL::AsyncWait(ba::deadline_timer &timer, int millisec,
50 void (ConnectionSSL::*handler)(const bs::error_code&))
51{
52 // - The boost::asio::basic_deadline_timer::expires_from_now()
53 // function cancels any pending asynchronous waits, and returns
54 // the number of asynchronous waits that were cancelled. If it
55 // returns 0 then you were too late and the wait handler has
56 // already been executed, or will soon be executed. If it
57 // returns 1 then the wait handler was successfully cancelled.
58 // - If a wait handler is cancelled, the bs::error_code passed to
59 // it contains the value bs::error::operation_aborted.
60 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
61
62 timer.async_wait(boost::bind(handler, this, dummy::error));
63}
64*/
65
66void ConnectionSSL::AsyncConnect(tcp::resolver::iterator iterator)
67{
68 tcp::endpoint endpoint = *iterator;
69
70 // AsyncConnect + Deadline
71 lowest_layer().async_connect(endpoint,
72 boost::bind(&ConnectionSSL::ConnectIter,
73 this, iterator, ba::placeholders::error));
74
75 // We will get a "Connection timeout anyway"
76 //AsyncWait(fConnectTimeout, 5, &ConnectionSSL::HandleConnectTimeout);
77}
78
79void ConnectionSSL::AsyncConnect()
80{
81 // AsyncConnect + Deadline
82 lowest_layer().async_connect(fEndpoint,
83 boost::bind(&ConnectionSSL::ConnectAddr,
84 this, fEndpoint, ba::placeholders::error));
85
86 // We will get a "Connection timeout anyway"
87 //AsyncWait(fConnectTimeout, 5, &ConnectionSSL::HandleConnectTimeout);
88}
89
90// ------------------------ close --------------------------
91// close from another thread
92void ConnectionSSL::CloseImp(bool restart)
93{
94 if (IsConnected() && fVerbose)
95 {
96 ostringstream str;
97 str << "Connection closed to " << URL() << ".";
98 Info(str);
99 }
100
101 // Stop any pending connection attempt
102 fConnectionTimer.cancel();
103
104 // Close possible open connections
105 lowest_layer().close();
106
107 // Reset the connection status
108 fQueueSize = 0;
109 fConnectionStatus = kDisconnected;
110
111 // Stop deadline counters
112 fInTimeout.cancel();
113 fOutTimeout.cancel();
114
115 if (!restart || IsConnecting())
116 return;
117
118 // We need some timeout before reconnecting!
119 // And we have to check if we are alreayd trying to connect
120 // We shoudl wait until all operations in progress were canceled
121
122 // Start trying to reconnect
123 fMsgConnect = "";
124 fErrConnect = "";
125 StartConnect();
126}
127
128void ConnectionSSL::PostClose(bool restart)
129{
130 get_io_service().post(boost::bind(&ConnectionSSL::CloseImp, this, restart));
131}
132
133// ------------------------ write --------------------------
134void ConnectionSSL::HandleWriteTimeout(const bs::error_code &error)
135{
136 if (error==ba::error::basic_errors::operation_aborted)
137 return;
138
139 // 125: Operation canceled (bs::error_code(125, bs::system_category))
140 if (error)
141 {
142 ostringstream str;
143 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
144 Error(str);
145
146 CloseImp();
147 return;
148 }
149
150 if (IsClosed())
151 {
152 // For example: Here we could schedule a new accept if we
153 // would not want to allow two connections at the same time.
154 return;
155 }
156
157 // Check whether the deadline has passed. We compare the deadline
158 // against the current time since a new asynchronous operation
159 // may have moved the deadline before this actor had a chance
160 // to run.
161 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
162 return;
163
164 Error("fOutTimeout has expired, writing data to "+URL());
165
166 CloseImp();
167}
168
169void ConnectionSSL::HandleSentData(const bs::error_code& error, size_t n)
170{
171 if (error==ba::error::basic_errors::operation_aborted)
172 return;
173
174 if (error && error != ba::error::not_connected)
175 {
176 ostringstream str;
177 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178 Error(str);
179
180 CloseImp();
181 return;
182 }
183
184 if (error == ba::error::not_connected)
185 {
186 ostringstream msg;
187 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
188 Warn(msg);
189
190 return;
191 }
192
193 if (--fQueueSize==0)
194 fOutTimeout.cancel();
195
196 if (fDebugTx)
197 {
198 ostringstream msg;
199 msg << n << " bytes successfully sent to " << URL();
200 Debug(msg);
201 }
202}
203
204void ConnectionSSL::PostMessage(const void *ptr, size_t sz)
205{
206 // This function can be called from a different thread...
207 if (IsClosed())
208 return;
209
210 // ... this is why we have to increase fQueueSize first
211 fQueueSize++;
212
213 // ... and shift the deadline timer
214 // This is not ideal, because if we are continously
215 // filling the buffer, it will never timeout
216 AsyncWait(fOutTimeout, 5000, &ConnectionSSL::HandleWriteTimeout);
217
218 // Now we can schedule the buffer to be sent
219 AsyncWrite(ba::const_buffers_1(ptr, sz));
220
221 // If a socket is closed, all pending asynchronous
222 // operation will be aborted.
223}
224
225void ConnectionSSL::PostMessage(const string &cmd, size_t max)
226{
227 if (max==size_t(-1))
228 max = cmd.length()+1;
229
230 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
231}
232
233void ConnectionSSL::HandleConnectionTimer(const bs::error_code &error)
234{
235 if (error==ba::error::basic_errors::operation_aborted)
236 return;
237
238 if (error)
239 {
240 ostringstream str;
241 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
242 Error(str);
243 }
244
245 if (!IsClosed())
246 {
247 // For example: Here we could schedule a new accept if we
248 // would not want to allow two connections at the same time.
249 return;
250 }
251
252 // Check whether the deadline has passed. We compare the deadline
253 // against the current time since a new asynchronous operation
254 // may have moved the deadline before this actor had a chance
255 // to run.
256 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
257 StartConnect();
258}
259
260void ConnectionSSL::HandleHandshake(const tcp::endpoint &endpoint, const bs::error_code& error)
261{
262 const string host = endpoint.port()==0 ? "" :
263 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
264
265 if (!error)
266 {
267 lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
268
269 const int optval = 30;
270
271#if BOOST_VERSION <107000
272 // First keep alive after 30s
273 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
274 // New keep alive after 30s
275 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
276#else
277 // First keep alive after 30s
278 setsockopt(lowest_layer().native_handle(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
279 // New keep alive after 30s
280 setsockopt(lowest_layer().native_handle(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
281#endif
282
283 if (fVerbose)
284 Info("Connection established to "+host+"...");
285
286 fQueueSize = 0;
287 fConnectionStatus = kConnected;
288
289 ConnectionEstablished();
290 return;
291 }
292
293 // If returning from run will lead to deletion of this
294 // instance, close() is not needed (maybe implicitly called).
295 // If run is called again, close() is needed here. Otherwise:
296 // Software caused connection abort when we try to resolve
297 // the endpoint again.
298 CloseImp(false);
299
300 ostringstream msg;
301 if (!host.empty())
302 msg << "Handshake with " << host << ": " << error.message() << " (" << error << ")";
303
304 if (fErrConnect!=msg.str())
305 {
306 if (error!=ba::error::basic_errors::connection_refused)
307 fMsgConnect = "";
308 fErrConnect = msg.str();
309 Warn(fErrConnect);
310 }
311
312 if (error==ba::error::basic_errors::operation_aborted)
313 return;
314
315 fConnectionStatus = kConnecting;
316}
317
318bool ConnectionSSL::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
319{
320 const string addr = endpoint.address().to_string();
321 const string host = endpoint.port()==0 ? "" : addr+':'+to_string((long long unsigned int)endpoint.port());
322
323 // Connection established
324 if (!error)
325 {
326 if (fVerbose)
327 Info("Starting handshake for "+fAddress);
328
329 //stream::set_verify_mode(ssl::verify_peer);
330 //stream::set_verify_callback(ssl::rfc2818_verification(fAddress));
331 //set_default_verify_paths();
332
333 async_handshake(stream::client,
334 boost::bind(&ConnectionSSL::HandleHandshake, this,
335 endpoint, dummy::error));
336 return true;
337 }
338
339 /*
340 handshake(stream::client);
341
342 lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
343
344 const int optval = 30;
345 // First keep alive after 30s
346 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
347 // New keep alive after 30s
348 setsockopt(lowest_layer().native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
349
350 if (fVerbose)
351 Info("Connection established to "+host+"...");
352
353 fQueueSize = 0;
354 fConnectionStatus = kConnected;
355
356 ConnectionEstablished();
357 return true;
358 }*/
359
360 // If returning from run will lead to deletion of this
361 // instance, close() is not needed (maybe implicitly called).
362 // If run is called again, close() is needed here. Otherwise:
363 // Software caused connection abort when we try to resolve
364 // the endpoint again.
365 CloseImp(false);
366
367 ostringstream msg;
368 if (!host.empty())
369 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
370
371 if (fErrConnect!=msg.str())
372 {
373 if (error!=ba::error::basic_errors::connection_refused)
374 fMsgConnect = "";
375 fErrConnect = msg.str();
376 Warn(fErrConnect);
377 }
378
379 if (error==ba::error::basic_errors::operation_aborted)
380 return true;
381
382 fConnectionStatus = kConnecting;
383
384 return false;
385/*
386 // Go on with the next
387 if (++iterator != tcp::resolver::iterator())
388 {
389 AsyncConnect(iterator);
390 return;
391 }
392*/
393 // No more entries to try, if we would not put anything else
394 // into the queue anymore it would now return (run() would return)
395
396 // Since we don't want to block the main loop, we wait using an
397 // asnychronous timer
398
399 // FIXME: Should we move this before AsyncConnect() ?
400// AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
401}
402
403void ConnectionSSL::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
404{
405 if (ConnectImp(*iterator, error))
406 return;
407
408 // Go on with the next
409 if (++iterator != tcp::resolver::iterator())
410 {
411 AsyncConnect(iterator);
412 return;
413 }
414
415 // No more entries to try, if we would not put anything else
416 // into the queue anymore it would now return (run() would return)
417 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
418}
419
420void ConnectionSSL::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
421{
422 if (ConnectImp(endpoint, error))
423 return;
424
425 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
426}
427
428// FIXME: Async connect should get address and port as an argument
429void ConnectionSSL::StartConnect()
430{
431 fConnectionStatus = kConnecting;
432
433 if (fEndpoint!=tcp::endpoint())
434 {
435 ostringstream msg;
436 msg << "Trying to connect to " << fEndpoint << "...";
437 if (fMsgConnect!=msg.str())
438 {
439 fMsgConnect = msg.str();
440 Info(msg);
441 }
442
443 AsyncConnect();
444 return;
445 }
446
447 const bool valid = !fAddress.empty() || !fPort.empty();
448
449 boost::system::error_code ec;
450
451 ostringstream msg;
452 if (!valid)
453 msg << "No target address... connection attempt postponed.";
454 else
455 {
456 tcp::resolver resolver(get_io_service());
457
458 tcp::resolver::query query(fAddress, fPort);
459 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
460
461 msg << "Trying to connect to " << URL() << "...";
462
463 // Start connection attempts (will also reset deadline counter)
464 if (!ec)
465 AsyncConnect(iterator);
466 else
467 msg << " " << ec.message() << " (" << ec << ")";
468 }
469
470 // Only output message if it has changed
471 if (fMsgConnect!=msg.str())
472 {
473 fMsgConnect = msg.str();
474 if (ec)
475 Error(msg);
476 if (!ec && fVerbose)
477 Info(msg);
478 }
479
480 if (!valid || ec)
481 AsyncWait(fConnectionTimer, 250, &ConnectionSSL::HandleConnectionTimer);
482}
483
484void ConnectionSSL::SetEndpoint(const string &addr, int port)
485{
486 if (fConnectionStatus>=1)
487 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
488
489 fAddress = addr;
490 fPort = to_string((long long)port);
491}
492
493void ConnectionSSL::SetEndpoint(const string &addr, const string &port)
494{
495 if (fConnectionStatus>=1 && URL()!=":")
496 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
497
498 fAddress = addr;
499 fPort = port;
500}
501
502void ConnectionSSL::SetEndpoint(const string &addr)
503{
504 const size_t p0 = addr.find_first_of(':');
505 const size_t p1 = addr.find_last_of(':');
506
507 if (p0==string::npos || p0!=p1)
508 {
509 Error("ConnectionSSL::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
510 return;
511 }
512
513 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
514}
515
516void ConnectionSSL::SetEndpoint(const tcp::endpoint &ep)
517{
518 const ba::ip::address addr = ep.address();
519
520 const ba::ip::address use =
521 addr.is_v6() && addr.to_v6().is_loopback() ?
522 ba::ip::address(ba::ip::address_v4::loopback()) :
523 addr;
524
525 SetEndpoint(use.to_string(), ep.port());
526
527 fEndpoint = tcp::endpoint(use, ep.port());
528}
529
530ConnectionSSL::ConnectionSSL(ba::io_service& ioservice, ostream &out) : MessageImp(out),
531ssl::context(ioservice, boost::asio::ssl::context::method::sslv23_client),
532stream(ioservice, *this), fLog(0), fVerbose(true), fDebugTx(false),
533fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
534fQueueSize(0), fConnectionStatus(kDisconnected)
535{
536 //stream::set_verify_mode(ssl::verify_none);
537}
Note: See TracBrowser for help on using the repository browser.