source: trunk/FACT++/src/Connection.cc@ 11408

Last change on this file since 11408 was 11370, checked in by tbretz, 14 years ago
Set URL from SetEndpoint(tcp::endpoint)
File size: 13.5 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous TCP/IP client connection
5
6@todo
7 Unify with ConnectionUSB
8
9*/
10// **************************************************************************
11#include "Connection.h"
12
13#include <boost/bind.hpp>
14#include <boost/lexical_cast.hpp>
15
16using namespace std;
17
18namespace ba = boost::asio;
19namespace bs = boost::system;
20namespace dummy = ba::placeholders;
21
22using boost::lexical_cast;
23using ba::ip::tcp;
24
25 // -------- Abbreviations for starting async tasks ---------
26
27int Connection::Write(const Time &t, const string &txt, int qos)
28{
29 if (fLog)
30 return fLog->Write(t, txt, qos);
31
32 return MessageImp::Write(t, txt, qos);
33}
34
35void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
36{
37 ba::async_read(*this, buffers,
38 boost::bind(&Connection::HandleReceivedData, this,
39 dummy::error, dummy::bytes_transferred, type));
40}
41
42void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
43{
44 ba::async_write(*this, buffers,
45 boost::bind(&Connection::HandleSentData, this,
46 dummy::error, dummy::bytes_transferred));
47}
48
49/*
50void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
51 void (Connection::*handler)(const bs::error_code&))
52{
53 // - The boost::asio::basic_deadline_timer::expires_from_now()
54 // function cancels any pending asynchronous waits, and returns
55 // the number of asynchronous waits that were cancelled. If it
56 // returns 0 then you were too late and the wait handler has
57 // already been executed, or will soon be executed. If it
58 // returns 1 then the wait handler was successfully cancelled.
59 // - If a wait handler is cancelled, the bs::error_code passed to
60 // it contains the value bs::error::operation_aborted.
61 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
62
63 timer.async_wait(boost::bind(handler, this, dummy::error));
64}
65*/
66
67void Connection::AsyncConnect(tcp::resolver::iterator iterator)
68{
69 tcp::endpoint endpoint = *iterator;
70
71 // AsyncConnect + Deadline
72 async_connect(endpoint,
73 boost::bind(&Connection::ConnectIter,
74 this, iterator, ba::placeholders::error));
75
76 // We will get a "Connection timeout anyway"
77 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
78}
79
80void Connection::AsyncConnect()
81{
82 // AsyncConnect + Deadline
83 async_connect(fEndpoint,
84 boost::bind(&Connection::ConnectAddr,
85 this, fEndpoint, ba::placeholders::error));
86
87 // We will get a "Connection timeout anyway"
88 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
89}
90
91// ------------------------ close --------------------------
92// close from another thread
93void Connection::CloseImp(bool restart)
94{
95 if (IsConnected())
96 {
97 ostringstream str;
98 str << "Connection closed to " << URL() << ".";
99 Info(str);
100 }
101
102 // Stop any pending connection attempt
103 fConnectionTimer.cancel();
104
105 // Close possible open connections
106 close();
107
108 // Reset the connection status
109 fConnectionStatus = kDisconnected;
110
111 // Stop deadline counters
112 fInTimeout.cancel();
113 fOutTimeout.cancel();
114
115 // Empty output queue
116 fOutQueue.clear();
117
118 if (!restart || IsConnecting())
119 return;
120
121 // We need some timeout before reconnecting!
122 // And we have to check if we are alreayd trying to connect
123 // We shoudl wait until all operations in progress were canceled
124
125 // Start trying to reconnect
126 fMsgConnect = "";
127 fErrConnect = "";
128 StartConnect();
129}
130
131void Connection::PostClose(bool restart)
132{
133 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
134}
135
136// ------------------------ write --------------------------
137void Connection::HandleWriteTimeout(const bs::error_code &error)
138{
139 if (error==ba::error::basic_errors::operation_aborted)
140 return;
141
142 // 125: Operation canceled (bs::error_code(125, bs::system_category))
143 if (error)
144 {
145 ostringstream str;
146 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
147 Error(str);
148
149 CloseImp();
150 return;
151 }
152
153 if (!is_open())
154 {
155 // For example: Here we could schedule a new accept if we
156 // would not want to allow two connections at the same time.
157 return;
158 }
159
160 // Check whether the deadline has passed. We compare the deadline
161 // against the current time since a new asynchronous operation
162 // may have moved the deadline before this actor had a chance
163 // to run.
164 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
165 return;
166
167 Error("fOutTimeout has expired, writing data to "+URL());
168
169 CloseImp();
170}
171
172void Connection::HandleSentData(const bs::error_code& error, size_t n)
173{
174 if (error && error != ba::error::not_connected)
175 {
176 ostringstream str;
177 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178 Error(str);
179
180 CloseImp();
181 return;
182 }
183
184 if (error == ba::error::not_connected)
185 {
186 ostringstream msg;
187 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
188 Warn(msg);
189 }
190 else
191 {
192 if (fDebugTx)
193 {
194 ostringstream msg;
195 msg << n << " bytes successfully sent to " << URL();
196 Message(msg);
197 }
198 }
199
200 // This is "thread" safe because SendMessage and HandleSentMessage
201 // are serialized in the EventQueue. Note: Do not call these
202 // functions directly from any other place then Handlers, use
203 // PostMessage instead
204 fOutQueue.pop_front();
205
206 if (fOutQueue.empty())
207 {
208 // Queue went empty, remove deadline
209 fOutTimeout.cancel();
210 return;
211 }
212
213 // AsyncWrite + Deadline
214 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
215 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
216}
217
218// It is important that when SendMessageImp is called, or to be more
219// precise boost::bind is called, teh data is copied!
220void Connection::SendMessageImp(const vector<char> msg)
221{
222 /*
223 if (!fConnectionEstablished)
224 {
225 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
226 return;
227 }*/
228
229 const bool first_message_in_queue = fOutQueue.empty();
230
231 // This is "thread" safe because SendMessage and HandleSentMessage
232 // are serialized in the EventQueue. Note: Do not call these
233 // functions directly from any other place then Handlers, use
234 // PostMessage instead
235 fOutQueue.push_back(msg);
236
237 if (!first_message_in_queue)
238 return;
239
240 // AsyncWrite + Deadline
241 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
242 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
243}
244
245void Connection::PostMessage(const void *ptr, size_t max)
246{
247 const vector<char> msg(reinterpret_cast<const char*>(ptr),
248 reinterpret_cast<const char*>(ptr)+max);
249
250 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
251}
252
253void Connection::PostMessage(const string &cmd, size_t max)
254{
255 if (max==size_t(-1))
256 max = cmd.length()+1;
257
258 vector <char>msg(max);
259
260 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
261
262 PostMessage(msg);
263}
264
265void Connection::HandleConnectionTimer(const bs::error_code &error)
266{
267 if (error==ba::error::basic_errors::operation_aborted)
268 return;
269
270 if (error)
271 {
272 ostringstream str;
273 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
274 Error(str);
275 }
276
277 if (is_open())
278 {
279 // For example: Here we could schedule a new accept if we
280 // would not want to allow two connections at the same time.
281 return;
282 }
283
284 // Check whether the deadline has passed. We compare the deadline
285 // against the current time since a new asynchronous operation
286 // may have moved the deadline before this actor had a chance
287 // to run.
288 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
289 StartConnect();
290}
291
292bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
293{
294 const string host = endpoint.port()==0 ? "" :
295 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
296
297 // Connection established
298 if (!error)
299 {
300 set_option(socket_base::keep_alive(true));
301
302 const int optval = 10;
303 // First keep alive after 10s
304 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
305 // New keep alive after 10s
306 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
307
308 Info("Connection established to "+host+"...");
309
310 fConnectionStatus = kConnected;
311
312 ConnectionEstablished();
313 return true;
314 }
315
316 // If returning from run will lead to deletion of this
317 // instance, close() is not needed (maybe implicitly called).
318 // If run is called again, close() is needed here. Otherwise:
319 // Software caused connection abort when we try to resolve
320 // the endpoint again.
321 CloseImp(false);
322
323 ostringstream msg;
324 if (!host.empty())
325 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
326
327 if (fErrConnect!=msg.str())
328 {
329 if (error!=ba::error::basic_errors::connection_refused)
330 fMsgConnect = "";
331 fErrConnect = msg.str();
332 Warn(fErrConnect);
333 }
334
335 if (error==ba::error::basic_errors::operation_aborted)
336 return true;
337
338 fConnectionStatus = kConnecting;
339
340 return false;
341/*
342 // Go on with the next
343 if (++iterator != tcp::resolver::iterator())
344 {
345 AsyncConnect(iterator);
346 return;
347 }
348*/
349 // No more entries to try, if we would not put anything else
350 // into the queue anymore it would now return (run() would return)
351
352 // Since we don't want to block the main loop, we wait using an
353 // asnychronous timer
354
355 // FIXME: Should we move this before AsyncConnect() ?
356// AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
357}
358
359void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
360{
361 if (ConnectImp(*iterator, error))
362 return;
363
364 // Go on with the next
365 if (++iterator != tcp::resolver::iterator())
366 {
367 AsyncConnect(iterator);
368 return;
369 }
370
371 // No more entries to try, if we would not put anything else
372 // into the queue anymore it would now return (run() would return)
373 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
374}
375
376void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
377{
378 if (ConnectImp(endpoint, error))
379 return;
380
381 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
382}
383
384// FIXME: Async connect should get address and port as an argument
385void Connection::StartConnect()
386{
387 fConnectionStatus = kConnecting;
388
389 if (fEndpoint!=tcp::endpoint())
390 {
391 ostringstream msg;
392 msg << "Trying to connect to " << fEndpoint << "...";
393 if (fMsgConnect!=msg.str())
394 {
395 fMsgConnect = msg.str();
396 Info(msg);
397 }
398
399 AsyncConnect();
400 return;
401 }
402
403 tcp::resolver resolver(get_io_service());
404
405 boost::system::error_code ec;
406
407 tcp::resolver::query query(fAddress, fPort);
408 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
409
410 ostringstream msg;
411 if (!fAddress.empty() || !fPort.empty() || ec)
412 msg << "Trying to connect to " << URL() << "...";
413
414 if (ec)
415 msg << " " << ec.message() << " (" << ec << ")";
416
417 // Only output message if it has changed
418 if (fMsgConnect!=msg.str())
419 {
420 fMsgConnect = msg.str();
421 ec ? Error(msg) : Info(msg);
422 }
423
424 if (ec)
425 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
426 else
427 // Start connection attempts (will also reset deadline counter)
428 AsyncConnect(iterator);
429}
430
431void Connection::SetEndpoint(const string &addr, int port)
432{
433 if (fConnectionStatus>=1)
434 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
435
436 fAddress = addr;
437 fPort = lexical_cast<string>(port);
438}
439
440void Connection::SetEndpoint(const string &addr, const string &port)
441{
442 if (fConnectionStatus>=1 && URL()!=":")
443 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
444
445 fAddress = addr;
446 fPort = port;
447}
448
449void Connection::SetEndpoint(const string &addr)
450{
451 const size_t p0 = addr.find_first_of(':');
452 const size_t p1 = addr.find_last_of(':');
453
454 if (p0==string::npos || p0!=p1)
455 {
456 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
457 return;
458 }
459
460 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
461}
462
463void Connection::SetEndpoint(const boost::asio::ip::tcp::endpoint &ep)
464{
465 SetEndpoint(ep.address().to_string(), ep.port());
466 fEndpoint = ep;
467}
468
469
470Connection::Connection(ba::io_service& ioservice, ostream &out) :
471MessageImp(out), tcp::socket(ioservice), fLog(0),
472fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
473fConnectionStatus(kDisconnected)
474{
475}
Note: See TracBrowser for help on using the repository browser.