source: trunk/FACT++/src/Connection.cc@ 19921

Last change on this file since 19921 was 16769, checked in by tbretz, 12 years ago
There is no need to make the scheduling of the async write asynchronously, so I schedule the async write now directly; this also removed the send queue, which was primarily introduced to make the tranmission to the FTM serial, but this is not needed anymore; to keep track of the send queue size and for debugging, a counter for the messages in the send buffer has been introduced
File size: 12.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "Connection.h"
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using ba::ip::tcp;
20
21 // -------- Abbreviations for starting async tasks ---------
22
23int Connection::Write(const Time &t, const string &txt, int qos)
24{
25 if (fLog)
26 return fLog->Write(t, txt, qos);
27
28 return MessageImp::Write(t, txt, qos);
29}
30
31void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32{
33 ba::async_read(*this, buffers,
34 boost::bind(&Connection::HandleReceivedData, this,
35 dummy::error, dummy::bytes_transferred, type));
36}
37
38void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39{
40 ba::async_write(*this, buffers,
41 boost::bind(&Connection::HandleSentData, this,
42 dummy::error, dummy::bytes_transferred));
43}
44
45/*
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61*/
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectIter,
70 this, iterator, ba::placeholders::error));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76void Connection::AsyncConnect()
77{
78 // AsyncConnect + Deadline
79 async_connect(fEndpoint,
80 boost::bind(&Connection::ConnectAddr,
81 this, fEndpoint, ba::placeholders::error));
82
83 // We will get a "Connection timeout anyway"
84 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85}
86
87// ------------------------ close --------------------------
88// close from another thread
89void Connection::CloseImp(bool restart)
90{
91 if (IsConnected() && fVerbose)
92 {
93 ostringstream str;
94 str << "Connection closed to " << URL() << ".";
95 Info(str);
96 }
97
98 // Stop any pending connection attempt
99 fConnectionTimer.cancel();
100
101 // Close possible open connections
102 close();
103
104 // Reset the connection status
105 fQueueSize = 0;
106 fConnectionStatus = kDisconnected;
107
108 // Stop deadline counters
109 fInTimeout.cancel();
110 fOutTimeout.cancel();
111
112 if (!restart || IsConnecting())
113 return;
114
115 // We need some timeout before reconnecting!
116 // And we have to check if we are alreayd trying to connect
117 // We shoudl wait until all operations in progress were canceled
118
119 // Start trying to reconnect
120 fMsgConnect = "";
121 fErrConnect = "";
122 StartConnect();
123}
124
125void Connection::PostClose(bool restart)
126{
127 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
128}
129
130// ------------------------ write --------------------------
131void Connection::HandleWriteTimeout(const bs::error_code &error)
132{
133 if (error==ba::error::basic_errors::operation_aborted)
134 return;
135
136 // 125: Operation canceled (bs::error_code(125, bs::system_category))
137 if (error)
138 {
139 ostringstream str;
140 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
141 Error(str);
142
143 CloseImp();
144 return;
145 }
146
147 if (!is_open())
148 {
149 // For example: Here we could schedule a new accept if we
150 // would not want to allow two connections at the same time.
151 return;
152 }
153
154 // Check whether the deadline has passed. We compare the deadline
155 // against the current time since a new asynchronous operation
156 // may have moved the deadline before this actor had a chance
157 // to run.
158 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
159 return;
160
161 Error("fOutTimeout has expired, writing data to "+URL());
162
163 CloseImp();
164}
165
166void Connection::HandleSentData(const bs::error_code& error, size_t n)
167{
168 if (error==ba::error::basic_errors::operation_aborted)
169 return;
170
171 if (error && error != ba::error::not_connected)
172 {
173 ostringstream str;
174 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
175 Error(str);
176
177 CloseImp();
178 return;
179 }
180
181 if (error == ba::error::not_connected)
182 {
183 ostringstream msg;
184 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
185 Warn(msg);
186
187 return;
188 }
189
190 if (--fQueueSize==0)
191 fOutTimeout.cancel();
192
193 if (fDebugTx)
194 {
195 ostringstream msg;
196 msg << n << " bytes successfully sent to " << URL();
197 Debug(msg);
198 }
199}
200
201void Connection::PostMessage(const void *ptr, size_t sz)
202{
203 // This function can be called from a different thread...
204 if (!is_open())
205 return;
206
207 // ... this is why we have to increase fQueueSize first
208 fQueueSize++;
209
210 // ... and shift the deadline timer
211 // This is not ideal, because if we are continously
212 // filling the buffer, it will never timeout
213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
214
215 // Now we can schedule the buffer to be sent
216 AsyncWrite(ba::const_buffers_1(ptr, sz));
217
218 // If a socket is closed, all pending asynchronous
219 // operation will be aborted.
220}
221
222void Connection::PostMessage(const string &cmd, size_t max)
223{
224 if (max==size_t(-1))
225 max = cmd.length()+1;
226
227 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
228}
229
230void Connection::HandleConnectionTimer(const bs::error_code &error)
231{
232 if (error==ba::error::basic_errors::operation_aborted)
233 return;
234
235 if (error)
236 {
237 ostringstream str;
238 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
239 Error(str);
240 }
241
242 if (is_open())
243 {
244 // For example: Here we could schedule a new accept if we
245 // would not want to allow two connections at the same time.
246 return;
247 }
248
249 // Check whether the deadline has passed. We compare the deadline
250 // against the current time since a new asynchronous operation
251 // may have moved the deadline before this actor had a chance
252 // to run.
253 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
254 StartConnect();
255}
256
257bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
258{
259 const string host = endpoint.port()==0 ? "" :
260 endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
261
262 // Connection established
263 if (!error)
264 {
265 set_option(socket_base::keep_alive(true));
266
267 const int optval = 30;
268 // First keep alive after 30s
269 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
270 // New keep alive after 30s
271 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
272
273 if (fVerbose)
274 Info("Connection established to "+host+"...");
275
276 fQueueSize = 0;
277 fConnectionStatus = kConnected;
278
279 ConnectionEstablished();
280 return true;
281 }
282
283 // If returning from run will lead to deletion of this
284 // instance, close() is not needed (maybe implicitly called).
285 // If run is called again, close() is needed here. Otherwise:
286 // Software caused connection abort when we try to resolve
287 // the endpoint again.
288 CloseImp(false);
289
290 ostringstream msg;
291 if (!host.empty())
292 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
293
294 if (fErrConnect!=msg.str())
295 {
296 if (error!=ba::error::basic_errors::connection_refused)
297 fMsgConnect = "";
298 fErrConnect = msg.str();
299 Warn(fErrConnect);
300 }
301
302 if (error==ba::error::basic_errors::operation_aborted)
303 return true;
304
305 fConnectionStatus = kConnecting;
306
307 return false;
308/*
309 // Go on with the next
310 if (++iterator != tcp::resolver::iterator())
311 {
312 AsyncConnect(iterator);
313 return;
314 }
315*/
316 // No more entries to try, if we would not put anything else
317 // into the queue anymore it would now return (run() would return)
318
319 // Since we don't want to block the main loop, we wait using an
320 // asnychronous timer
321
322 // FIXME: Should we move this before AsyncConnect() ?
323// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
324}
325
326void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
327{
328 if (ConnectImp(*iterator, error))
329 return;
330
331 // Go on with the next
332 if (++iterator != tcp::resolver::iterator())
333 {
334 AsyncConnect(iterator);
335 return;
336 }
337
338 // No more entries to try, if we would not put anything else
339 // into the queue anymore it would now return (run() would return)
340 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
341}
342
343void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
344{
345 if (ConnectImp(endpoint, error))
346 return;
347
348 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
349}
350
351// FIXME: Async connect should get address and port as an argument
352void Connection::StartConnect()
353{
354 fConnectionStatus = kConnecting;
355
356 if (fEndpoint!=tcp::endpoint())
357 {
358 ostringstream msg;
359 msg << "Trying to connect to " << fEndpoint << "...";
360 if (fMsgConnect!=msg.str())
361 {
362 fMsgConnect = msg.str();
363 Info(msg);
364 }
365
366 AsyncConnect();
367 return;
368 }
369
370 const bool valid = !fAddress.empty() || !fPort.empty();
371
372 boost::system::error_code ec;
373
374 ostringstream msg;
375 if (!valid)
376 msg << "No target address... connection attempt postponed.";
377 else
378 {
379 tcp::resolver resolver(get_io_service());
380
381 tcp::resolver::query query(fAddress, fPort);
382 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
383
384 msg << "Trying to connect to " << URL() << "...";
385
386 // Start connection attempts (will also reset deadline counter)
387 if (!ec)
388 AsyncConnect(iterator);
389 else
390 msg << " " << ec.message() << " (" << ec << ")";
391 }
392
393 // Only output message if it has changed
394 if (fMsgConnect!=msg.str())
395 {
396 fMsgConnect = msg.str();
397 if (ec)
398 Error(msg);
399 if (!ec && fVerbose)
400 Info(msg);
401 }
402
403 if (!valid || ec)
404 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
405}
406
407void Connection::SetEndpoint(const string &addr, int port)
408{
409 if (fConnectionStatus>=1)
410 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
411
412 fAddress = addr;
413 fPort = to_string((long long)port);
414}
415
416void Connection::SetEndpoint(const string &addr, const string &port)
417{
418 if (fConnectionStatus>=1 && URL()!=":")
419 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
420
421 fAddress = addr;
422 fPort = port;
423}
424
425void Connection::SetEndpoint(const string &addr)
426{
427 const size_t p0 = addr.find_first_of(':');
428 const size_t p1 = addr.find_last_of(':');
429
430 if (p0==string::npos || p0!=p1)
431 {
432 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
433 return;
434 }
435
436 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
437}
438
439void Connection::SetEndpoint(const tcp::endpoint &ep)
440{
441 const ba::ip::address addr = ep.address();
442
443 const ba::ip::address use =
444 addr.is_v6() && addr.to_v6().is_loopback() ?
445 ba::ip::address(ba::ip::address_v4::loopback()) :
446 addr;
447
448 SetEndpoint(use.to_string(), ep.port());
449
450 fEndpoint = tcp::endpoint(use, ep.port());
451}
452
453
454Connection::Connection(ba::io_service& ioservice, ostream &out) :
455MessageImp(out), tcp::socket(ioservice),
456fLog(0), fVerbose(true), fDebugTx(false),
457fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
458fQueueSize(0), fConnectionStatus(kDisconnected)
459{
460}
Note: See TracBrowser for help on using the repository browser.