source: trunk/FACT++/src/Connection.cc@ 11021

Last change on this file since 11021 was 10983, checked in by tbretz, 13 years ago
Canceled conecction atempts now stop the connection atempt.
File size: 11.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22#define DEBUG_TX
23
24 // -------- Abbreviations for starting async tasks ---------
25
26int Connection::Write(const Time &t, const string &txt, int qos)
27{
28 if (fLog)
29 return fLog->Write(t, txt, qos);
30
31 return MessageImp::Write(t, txt, qos);
32}
33
34void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
35{
36 ba::async_read(*this, buffers,
37 boost::bind(&Connection::HandleReceivedData, this,
38 dummy::error, dummy::bytes_transferred, type));
39}
40
41void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
42{
43 ba::async_write(*this, buffers,
44 boost::bind(&Connection::HandleSentData, this,
45 dummy::error, dummy::bytes_transferred));
46}
47
48void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
49 void (Connection::*handler)(const bs::error_code&))
50{
51 // - The boost::asio::basic_deadline_timer::expires_from_now()
52 // function cancels any pending asynchronous waits, and returns
53 // the number of asynchronous waits that were cancelled. If it
54 // returns 0 then you were too late and the wait handler has
55 // already been executed, or will soon be executed. If it
56 // returns 1 then the wait handler was successfully cancelled.
57 // - If a wait handler is cancelled, the bs::error_code passed to
58 // it contains the value bs::error::operation_aborted.
59 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
60
61 timer.async_wait(boost::bind(handler, this, dummy::error));
62}
63
64void Connection::AsyncConnect(tcp::resolver::iterator iterator)
65{
66 tcp::endpoint endpoint = *iterator;
67
68 // AsyncConnect + Deadline
69 async_connect(endpoint,
70 boost::bind(&Connection::ConnectImp,
71 this, ba::placeholders::error,
72 iterator));
73
74 // We will get a "Connection timeout anyway"
75 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
76}
77
78// ------------------------ close --------------------------
79// close from another thread
80void Connection::CloseImp(bool restart)
81{
82 if (IsConnected())
83 {
84 ostringstream str;
85 str << "Connection closed to " << URL() << ".";
86 Info(str);
87 }
88
89 // Stop any pending connection attempt
90 fConnectionTimer.cancel();
91
92 // Close possible open connections
93 close();
94
95 // Reset the connection status
96 fConnectionStatus = kDisconnected;
97
98 // Stop deadline counters
99 fInTimeout.cancel();
100 fOutTimeout.cancel();
101
102 // Empty output queue
103 fOutQueue.clear();
104
105 if (!restart || IsConnecting())
106 return;
107
108 // We need some timeout before reconnecting!
109 // And we have to check if we are alreayd trying to connect
110 // We shoudl wait until all operations in progress were canceled
111
112 // Start trying to reconnect
113 fMsgConnect = "";
114 fErrConnect = "";
115 StartConnect();
116}
117
118void Connection::PostClose(bool restart)
119{
120 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
121}
122
123// ------------------------ write --------------------------
124void Connection::HandleWriteTimeout(const bs::error_code &error)
125{
126 if (error==ba::error::basic_errors::operation_aborted)
127 return;
128
129 // 125: Operation canceled (bs::error_code(125, bs::system_category))
130 if (error)
131 {
132 ostringstream str;
133 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
134 Error(str);
135
136 CloseImp();
137 return;
138 }
139
140 if (!is_open())
141 {
142 // For example: Here we could schedule a new accept if we
143 // would not want to allow two connections at the same time.
144 return;
145 }
146
147 // Check whether the deadline has passed. We compare the deadline
148 // against the current time since a new asynchronous operation
149 // may have moved the deadline before this actor had a chance
150 // to run.
151 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
152 return;
153
154 Error("fOutTimeout has expired, writing data to "+URL());
155
156 CloseImp();
157}
158
159void Connection::HandleSentData(const bs::error_code& error, size_t n)
160{
161 if (error && error != ba::error::not_connected)
162 {
163 ostringstream str;
164 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
165 Error(str);
166
167 CloseImp();
168 return;
169 }
170
171 if (error == ba::error::not_connected)
172 {
173 ostringstream msg;
174 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
175 Warn(msg);
176 }
177 else
178 {
179#ifdef DEBUG_TX
180 ostringstream msg;
181 msg << n << " bytes successfully sent to " << URL();
182 Message(msg);
183#endif
184 }
185
186 // This is "thread" safe because SendMessage and HandleSentMessage
187 // are serialized in the EventQueue. Note: Do not call these
188 // functions directly from any other place then Handlers, use
189 // PostMessage instead
190 fOutQueue.pop_front();
191
192 if (fOutQueue.empty())
193 {
194 // Queue went empty, remove deadline
195 fOutTimeout.cancel();
196 return;
197 }
198
199 // AsyncWrite + Deadline
200 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
201 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
202}
203
204// It is important that when SendMessageImp is called, or to be more
205// precise boost::bind is called, teh data is copied!
206void Connection::SendMessageImp(const vector<char> msg)
207{
208 /*
209 if (!fConnectionEstablished)
210 {
211 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
212 return;
213 }*/
214
215 const bool first_message_in_queue = fOutQueue.empty();
216
217 // This is "thread" safe because SendMessage and HandleSentMessage
218 // are serialized in the EventQueue. Note: Do not call these
219 // functions directly from any other place then Handlers, use
220 // PostMessage instead
221 fOutQueue.push_back(msg);
222
223 if (!first_message_in_queue)
224 return;
225
226 // AsyncWrite + Deadline
227 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
228 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
229}
230
231void Connection::PostMessage(const void *ptr, size_t max)
232{
233 const vector<char> msg(reinterpret_cast<const char*>(ptr),
234 reinterpret_cast<const char*>(ptr)+max);
235
236 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
237}
238
239void Connection::PostMessage(const string &cmd, size_t max)
240{
241 if (max==size_t(-1))
242 max = cmd.length()+1;
243
244 vector <char>msg(max);
245
246 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
247
248 PostMessage(msg);
249}
250
251void Connection::HandleConnectionTimer(const bs::error_code &error)
252{
253 if (error==ba::error::basic_errors::operation_aborted)
254 return;
255
256 if (error)
257 {
258 ostringstream str;
259 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
260 Error(str);
261 }
262
263 if (is_open())
264 {
265 // For example: Here we could schedule a new accept if we
266 // would not want to allow two connections at the same time.
267 return;
268 }
269
270 // Check whether the deadline has passed. We compare the deadline
271 // against the current time since a new asynchronous operation
272 // may have moved the deadline before this actor had a chance
273 // to run.
274 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
275 StartConnect();
276}
277
278void Connection::ConnectImp(const bs::error_code& error,
279 tcp::resolver::iterator iterator)
280{
281 tcp::endpoint endpoint = *iterator;
282
283 const string host = endpoint.port()==0 ? "" :
284 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
285
286 // Connection established
287 if (!error)
288 {
289 Info("Connection established to "+host+"...");
290
291 fConnectionStatus = kConnected;
292
293 ConnectionEstablished();
294 return;
295 }
296
297 // If returning from run will lead to deletion of this
298 // instance, close() is not needed (maybe implicitly called).
299 // If run is called again, close() is needed here. Otherwise:
300 // Software caused connection abort when we try to resolve
301 // the endpoint again.
302 CloseImp(false);
303
304 ostringstream msg;
305 if (!host.empty())
306 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
307
308 if (fErrConnect!=msg.str())
309 {
310 if (error!=ba::error::basic_errors::connection_refused)
311 fMsgConnect = "";
312 fErrConnect = msg.str();
313 Warn(fErrConnect);
314 }
315
316 if (error==ba::error::basic_errors::operation_aborted)
317 return;
318
319 fConnectionStatus = kConnecting;
320
321 // Go on with the next
322 if (++iterator != tcp::resolver::iterator())
323 {
324 AsyncConnect(iterator);
325 return;
326 }
327
328 // No more entries to try, if we would not put anything else
329 // into the queue anymore it would now return (run() would return)
330
331 // Since we don't want to block the main loop, we wait using an
332 // asnychronous timer
333
334 // FIXME: Should we move this before AsyncConnect() ?
335 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
336}
337
338// FIXME: Async connect should get address and port as an argument
339void Connection::StartConnect()
340{
341 fConnectionStatus = kConnecting;
342
343 tcp::resolver resolver(get_io_service());
344
345 boost::system::error_code ec;
346
347 tcp::resolver::query query(fAddress, fPort);
348 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
349
350 ostringstream msg;
351 if (!fAddress.empty() || !fPort.empty() || ec)
352 msg << "Trying to connect to " << URL() << "...";
353
354 if (ec)
355 msg << " " << ec.message() << " (" << ec << ")";
356
357 // Only output message if it has changed
358 if (fMsgConnect!=msg.str())
359 {
360 fMsgConnect = msg.str();
361 ec ? Error(msg) : Info(msg);
362 }
363
364 if (ec)
365 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
366 else
367 // Start connection attempts (will also reset deadline counter)
368 AsyncConnect(iterator);
369}
370
371void Connection::SetEndpoint(const string &addr, int port)
372{
373 if (fConnectionStatus>=1)
374 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
375
376 fAddress = addr;
377 fPort = lexical_cast<string>(port);
378}
379
380void Connection::SetEndpoint(const string &addr, const string &port)
381{
382 if (fConnectionStatus>=1 && URL()!=":")
383 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
384
385 fAddress = addr;
386 fPort = port;
387}
388
389void Connection::SetEndpoint(const string &addr)
390{
391 const size_t p0 = addr.find_first_of(':');
392 const size_t p1 = addr.find_last_of(':');
393
394 if (p0==string::npos || p0!=p1)
395 {
396 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
397 return;
398 }
399
400 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
401}
402
403
404
405Connection::Connection(ba::io_service& ioservice, ostream &out) :
406MessageImp(out), tcp::socket(ioservice), fLog(0),
407fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
408fConnectionStatus(kDisconnected)
409{
410}
Note: See TracBrowser for help on using the repository browser.