source: trunk/FACT++/src/Connection.cc@ 10979

Last change on this file since 10979 was 10925, checked in by tbretz, 13 years ago
Suppress notice about sent data by define.
File size: 11.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const string &txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectImp,
70 this, ba::placeholders::error,
71 iterator));
72
73 // We will get a "Connection timeout anyway"
74 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
75}
76
77// ------------------------ close --------------------------
78// close from another thread
79void Connection::CloseImp(bool restart)
80{
81 if (IsConnected())
82 {
83 ostringstream str;
84 str << "Connection closed to " << URL() << ".";
85 Info(str);
86 }
87
88 // Stop any pending connection attempt
89 fConnectionTimer.cancel();
90
91 // Close possible open connections
92 close();
93
94 // Reset the connection status
95 fConnectionStatus = kDisconnected;
96
97 // Stop deadline counters
98 fInTimeout.cancel();
99 fOutTimeout.cancel();
100
101 // Empty output queue
102 fOutQueue.clear();
103
104 if (!restart || IsConnecting())
105 return;
106
107 // We need some timeout before reconnecting!
108 // And we have to check if we are alreayd trying to connect
109 // We shoudl wait until all operations in progress were canceled
110
111 // Start trying to reconnect
112 fMsgConnect = "";
113 fErrConnect = "";
114 StartConnect();
115}
116
117void Connection::PostClose(bool restart)
118{
119 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
120}
121
122// ------------------------ write --------------------------
123void Connection::HandleWriteTimeout(const bs::error_code &error)
124{
125 if (error==ba::error::basic_errors::operation_aborted)
126 return;
127
128 // 125: Operation canceled (bs::error_code(125, bs::system_category))
129 if (error)
130 {
131 ostringstream str;
132 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
133 Error(str);
134
135 CloseImp();
136 return;
137 }
138
139 if (!is_open())
140 {
141 // For example: Here we could schedule a new accept if we
142 // would not want to allow two connections at the same time.
143 return;
144 }
145
146 // Check whether the deadline has passed. We compare the deadline
147 // against the current time since a new asynchronous operation
148 // may have moved the deadline before this actor had a chance
149 // to run.
150 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
151 return;
152
153 Error("fOutTimeout has expired, writing data to "+URL());
154
155 CloseImp();
156}
157
158void Connection::HandleSentData(const bs::error_code& error, size_t n)
159{
160 if (error && error != ba::error::not_connected)
161 {
162 ostringstream str;
163 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
164 Error(str);
165
166 CloseImp();
167 return;
168 }
169
170 if (error == ba::error::not_connected)
171 {
172 ostringstream msg;
173 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
174 Warn(msg);
175 }
176 else
177 {
178#ifdef DEBUG_TX
179 ostringstream msg;
180 msg << n << " bytes successfully sent to " << URL();
181 Message(msg);
182#endif
183 }
184
185 // This is "thread" safe because SendMessage and HandleSentMessage
186 // are serialized in the EventQueue. Note: Do not call these
187 // functions directly from any other place then Handlers, use
188 // PostMessage instead
189 fOutQueue.pop_front();
190
191 if (fOutQueue.empty())
192 {
193 // Queue went empty, remove deadline
194 fOutTimeout.cancel();
195 return;
196 }
197
198 // AsyncWrite + Deadline
199 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
200 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
201}
202
203// It is important that when SendMessageImp is called, or to be more
204// precise boost::bind is called, teh data is copied!
205void Connection::SendMessageImp(const vector<char> msg)
206{
207 /*
208 if (!fConnectionEstablished)
209 {
210 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
211 return;
212 }*/
213
214 const bool first_message_in_queue = fOutQueue.empty();
215
216 // This is "thread" safe because SendMessage and HandleSentMessage
217 // are serialized in the EventQueue. Note: Do not call these
218 // functions directly from any other place then Handlers, use
219 // PostMessage instead
220 fOutQueue.push_back(msg);
221
222 if (!first_message_in_queue)
223 return;
224
225 // AsyncWrite + Deadline
226 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
227 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
228}
229
230void Connection::PostMessage(const void *ptr, size_t max)
231{
232 const vector<char> msg(reinterpret_cast<const char*>(ptr),
233 reinterpret_cast<const char*>(ptr)+max);
234
235 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
236}
237
238void Connection::PostMessage(const string &cmd, size_t max)
239{
240 if (max==size_t(-1))
241 max = cmd.length()+1;
242
243 vector <char>msg(max);
244
245 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
246
247 PostMessage(msg);
248}
249
250void Connection::HandleConnectionTimer(const bs::error_code &error)
251{
252 if (error==ba::error::basic_errors::operation_aborted)
253 return;
254
255 if (error)
256 {
257 ostringstream str;
258 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
259 Error(str);
260 }
261
262 if (is_open())
263 {
264 // For example: Here we could schedule a new accept if we
265 // would not want to allow two connections at the same time.
266 return;
267 }
268
269 // Check whether the deadline has passed. We compare the deadline
270 // against the current time since a new asynchronous operation
271 // may have moved the deadline before this actor had a chance
272 // to run.
273 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
274 StartConnect();
275}
276
277void Connection::ConnectImp(const bs::error_code& error,
278 tcp::resolver::iterator iterator)
279{
280 tcp::endpoint endpoint = *iterator;
281
282 const string host = endpoint.port()==0 ? "" :
283 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
284
285 // Connection established
286 if (!error)
287 {
288 Info("Connection established to "+host+"...");
289
290 fConnectionStatus = kConnected;
291
292 ConnectionEstablished();
293 return;
294 }
295
296 // If returning from run will lead to deletion of this
297 // instance, close() is not needed (maybe implicitly called).
298 // If run is called again, close() is needed here. Otherwise:
299 // Software caused connection abort when we try to resolve
300 // the endpoint again.
301 CloseImp(false);
302 fConnectionStatus = kConnecting;
303
304 // FIXME: "No route to host (syytem::113)" gives error:"Operation already in progress. (system:114)"
305 ostringstream msg;
306 if (!host.empty())
307 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
308
309 if (fErrConnect!=msg.str())
310 {
311 if (error!=ba::error::basic_errors::connection_refused)
312 fMsgConnect = "";
313 fErrConnect = msg.str();
314 Warn(fErrConnect);
315 }
316
317 // Go on with the next
318 if (++iterator != tcp::resolver::iterator())
319 {
320 AsyncConnect(iterator);
321 return;
322 }
323
324 // No more entries to try, if we would not put anything else
325 // into the queue anymore it would now return (run() would return)
326
327 // Since we don't want to block the main loop, we wait using an
328 // asnychronous timer
329
330 // FIXME: Should we move this before AsyncConnect() ?
331 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
332}
333
334// FIXME: Async connect should get address and port as an argument
335void Connection::StartConnect()
336{
337 fConnectionStatus = kConnecting;
338
339 tcp::resolver resolver(get_io_service());
340
341 boost::system::error_code ec;
342
343 tcp::resolver::query query(fAddress, fPort);
344 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
345
346 ostringstream msg;
347 if (!fAddress.empty() || !fPort.empty() || ec)
348 msg << "Trying to connect to " << URL() << "...";
349
350 if (ec)
351 msg << " " << ec.message() << " (" << ec << ")";
352
353 // Only output message if it has changed
354 if (fMsgConnect!=msg.str())
355 {
356 fMsgConnect = msg.str();
357 ec ? Error(msg) : Info(msg);
358 }
359
360 if (ec)
361 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
362 else
363 // Start connection attempts (will also reset deadline counter)
364 AsyncConnect(iterator);
365}
366
367void Connection::SetEndpoint(const string &addr, int port)
368{
369 if (fConnectionStatus>=1)
370 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
371
372 fAddress = addr;
373 fPort = lexical_cast<string>(port);
374}
375
376void Connection::SetEndpoint(const string &addr, const string &port)
377{
378 if (fConnectionStatus>=1 && URL()!=":")
379 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
380
381 fAddress = addr;
382 fPort = port;
383}
384
385void Connection::SetEndpoint(const string &addr)
386{
387 const size_t p0 = addr.find_first_of(':');
388 const size_t p1 = addr.find_last_of(':');
389
390 if (p0==string::npos || p0!=p1)
391 {
392 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
393 return;
394 }
395
396 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
397}
398
399
400
401Connection::Connection(ba::io_service& ioservice, ostream &out) :
402MessageImp(out), tcp::socket(ioservice), fLog(0),
403fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
404fConnectionStatus(kDisconnected)
405{
406}
Note: See TracBrowser for help on using the repository browser.