source: trunk/FACT++/src/Connection.cc@ 11164

Last change on this file since 11164 was 11164, checked in by tbretz, 13 years ago
Switch on keep alive packates with 10s timeouts.
File size: 12.0 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22 // -------- Abbreviations for starting async tasks ---------
23
24int Connection::Write(const Time &t, const string &txt, int qos)
25{
26 if (fLog)
27 return fLog->Write(t, txt, qos);
28
29 return MessageImp::Write(t, txt, qos);
30}
31
32void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
33{
34 ba::async_read(*this, buffers,
35 boost::bind(&Connection::HandleReceivedData, this,
36 dummy::error, dummy::bytes_transferred, type));
37}
38
39void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
40{
41 ba::async_write(*this, buffers,
42 boost::bind(&Connection::HandleSentData, this,
43 dummy::error, dummy::bytes_transferred));
44}
45
46void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (Connection::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61
62void Connection::AsyncConnect(tcp::resolver::iterator iterator)
63{
64 tcp::endpoint endpoint = *iterator;
65
66 // AsyncConnect + Deadline
67 async_connect(endpoint,
68 boost::bind(&Connection::ConnectImp,
69 this, ba::placeholders::error,
70 iterator));
71
72 // We will get a "Connection timeout anyway"
73 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74}
75
76// ------------------------ close --------------------------
77// close from another thread
78void Connection::CloseImp(bool restart)
79{
80 if (IsConnected())
81 {
82 ostringstream str;
83 str << "Connection closed to " << URL() << ".";
84 Info(str);
85 }
86
87 // Stop any pending connection attempt
88 fConnectionTimer.cancel();
89
90 // Close possible open connections
91 close();
92
93 // Reset the connection status
94 fConnectionStatus = kDisconnected;
95
96 // Stop deadline counters
97 fInTimeout.cancel();
98 fOutTimeout.cancel();
99
100 // Empty output queue
101 fOutQueue.clear();
102
103 if (!restart || IsConnecting())
104 return;
105
106 // We need some timeout before reconnecting!
107 // And we have to check if we are alreayd trying to connect
108 // We shoudl wait until all operations in progress were canceled
109
110 // Start trying to reconnect
111 fMsgConnect = "";
112 fErrConnect = "";
113 StartConnect();
114}
115
116void Connection::PostClose(bool restart)
117{
118 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
119}
120
121// ------------------------ write --------------------------
122void Connection::HandleWriteTimeout(const bs::error_code &error)
123{
124 if (error==ba::error::basic_errors::operation_aborted)
125 return;
126
127 // 125: Operation canceled (bs::error_code(125, bs::system_category))
128 if (error)
129 {
130 ostringstream str;
131 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
132 Error(str);
133
134 CloseImp();
135 return;
136 }
137
138 if (!is_open())
139 {
140 // For example: Here we could schedule a new accept if we
141 // would not want to allow two connections at the same time.
142 return;
143 }
144
145 // Check whether the deadline has passed. We compare the deadline
146 // against the current time since a new asynchronous operation
147 // may have moved the deadline before this actor had a chance
148 // to run.
149 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
150 return;
151
152 Error("fOutTimeout has expired, writing data to "+URL());
153
154 CloseImp();
155}
156
157void Connection::HandleSentData(const bs::error_code& error, size_t n)
158{
159 if (error && error != ba::error::not_connected)
160 {
161 ostringstream str;
162 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
163 Error(str);
164
165 CloseImp();
166 return;
167 }
168
169 if (error == ba::error::not_connected)
170 {
171 ostringstream msg;
172 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
173 Warn(msg);
174 }
175 else
176 {
177 if (fDebugTx)
178 {
179 ostringstream msg;
180 msg << n << " bytes successfully sent to " << URL();
181 Message(msg);
182 }
183 }
184
185 // This is "thread" safe because SendMessage and HandleSentMessage
186 // are serialized in the EventQueue. Note: Do not call these
187 // functions directly from any other place then Handlers, use
188 // PostMessage instead
189 fOutQueue.pop_front();
190
191 if (fOutQueue.empty())
192 {
193 // Queue went empty, remove deadline
194 fOutTimeout.cancel();
195 return;
196 }
197
198 // AsyncWrite + Deadline
199 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
200 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
201}
202
203// It is important that when SendMessageImp is called, or to be more
204// precise boost::bind is called, teh data is copied!
205void Connection::SendMessageImp(const vector<char> msg)
206{
207 /*
208 if (!fConnectionEstablished)
209 {
210 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
211 return;
212 }*/
213
214 const bool first_message_in_queue = fOutQueue.empty();
215
216 // This is "thread" safe because SendMessage and HandleSentMessage
217 // are serialized in the EventQueue. Note: Do not call these
218 // functions directly from any other place then Handlers, use
219 // PostMessage instead
220 fOutQueue.push_back(msg);
221
222 if (!first_message_in_queue)
223 return;
224
225 // AsyncWrite + Deadline
226 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
227 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
228}
229
230void Connection::PostMessage(const void *ptr, size_t max)
231{
232 const vector<char> msg(reinterpret_cast<const char*>(ptr),
233 reinterpret_cast<const char*>(ptr)+max);
234
235 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
236}
237
238void Connection::PostMessage(const string &cmd, size_t max)
239{
240 if (max==size_t(-1))
241 max = cmd.length()+1;
242
243 vector <char>msg(max);
244
245 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
246
247 PostMessage(msg);
248}
249
250void Connection::HandleConnectionTimer(const bs::error_code &error)
251{
252 if (error==ba::error::basic_errors::operation_aborted)
253 return;
254
255 if (error)
256 {
257 ostringstream str;
258 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
259 Error(str);
260 }
261
262 if (is_open())
263 {
264 // For example: Here we could schedule a new accept if we
265 // would not want to allow two connections at the same time.
266 return;
267 }
268
269 // Check whether the deadline has passed. We compare the deadline
270 // against the current time since a new asynchronous operation
271 // may have moved the deadline before this actor had a chance
272 // to run.
273 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
274 StartConnect();
275}
276
277void Connection::ConnectImp(const bs::error_code& error,
278 tcp::resolver::iterator iterator)
279{
280 tcp::endpoint endpoint = *iterator;
281
282 const string host = endpoint.port()==0 ? "" :
283 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
284
285 // Connection established
286 if (!error)
287 {
288 set_option(socket_base::keep_alive(true));
289
290 const int optval = 10;
291 // First keep alive after 10s
292 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
293 // New keep alive after 10s
294 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
295
296 Info("Connection established to "+host+"...");
297
298 fConnectionStatus = kConnected;
299
300 ConnectionEstablished();
301 return;
302 }
303
304 // If returning from run will lead to deletion of this
305 // instance, close() is not needed (maybe implicitly called).
306 // If run is called again, close() is needed here. Otherwise:
307 // Software caused connection abort when we try to resolve
308 // the endpoint again.
309 CloseImp(false);
310
311 ostringstream msg;
312 if (!host.empty())
313 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
314
315 if (fErrConnect!=msg.str())
316 {
317 if (error!=ba::error::basic_errors::connection_refused)
318 fMsgConnect = "";
319 fErrConnect = msg.str();
320 Warn(fErrConnect);
321 }
322
323 if (error==ba::error::basic_errors::operation_aborted)
324 return;
325
326 fConnectionStatus = kConnecting;
327
328 // Go on with the next
329 if (++iterator != tcp::resolver::iterator())
330 {
331 AsyncConnect(iterator);
332 return;
333 }
334
335 // No more entries to try, if we would not put anything else
336 // into the queue anymore it would now return (run() would return)
337
338 // Since we don't want to block the main loop, we wait using an
339 // asnychronous timer
340
341 // FIXME: Should we move this before AsyncConnect() ?
342 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
343}
344
345// FIXME: Async connect should get address and port as an argument
346void Connection::StartConnect()
347{
348 fConnectionStatus = kConnecting;
349
350 tcp::resolver resolver(get_io_service());
351
352 boost::system::error_code ec;
353
354 tcp::resolver::query query(fAddress, fPort);
355 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
356
357 ostringstream msg;
358 if (!fAddress.empty() || !fPort.empty() || ec)
359 msg << "Trying to connect to " << URL() << "...";
360
361 if (ec)
362 msg << " " << ec.message() << " (" << ec << ")";
363
364 // Only output message if it has changed
365 if (fMsgConnect!=msg.str())
366 {
367 fMsgConnect = msg.str();
368 ec ? Error(msg) : Info(msg);
369 }
370
371 if (ec)
372 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
373 else
374 // Start connection attempts (will also reset deadline counter)
375 AsyncConnect(iterator);
376}
377
378void Connection::SetEndpoint(const string &addr, int port)
379{
380 if (fConnectionStatus>=1)
381 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
382
383 fAddress = addr;
384 fPort = lexical_cast<string>(port);
385}
386
387void Connection::SetEndpoint(const string &addr, const string &port)
388{
389 if (fConnectionStatus>=1 && URL()!=":")
390 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
391
392 fAddress = addr;
393 fPort = port;
394}
395
396void Connection::SetEndpoint(const string &addr)
397{
398 const size_t p0 = addr.find_first_of(':');
399 const size_t p1 = addr.find_last_of(':');
400
401 if (p0==string::npos || p0!=p1)
402 {
403 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
404 return;
405 }
406
407 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
408}
409
410
411
412Connection::Connection(ba::io_service& ioservice, ostream &out) :
413MessageImp(out), tcp::socket(ioservice), fLog(0),
414fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
415fConnectionStatus(kDisconnected)
416{
417}
Note: See TracBrowser for help on using the repository browser.