source: trunk/FACT++/src/Connection.cc@ 10420

Last change on this file since 10420 was 10373, checked in by tbretz, 14 years ago
Changed basis of MessageImp::Write function from const char* to std::string& to be able to boost::bind the function, because boost::bind will hold an internal copy of the string.
File size: 11.3 KB
Line 
1// **************************************************************************
2/** @class Connection
3
@brief Maintains an asynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const string &txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectImp,
70 this, ba::placeholders::error,
71 iterator));
72
73 // We will get a "Connection timeout anyway"
74 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
75}
76
77// ------------------------ close --------------------------
78// close from another thread
79void Connection::CloseImp(bool restart)
80{
81 if (IsConnected())
82 {
83 stringstream str;
84 str << "Connection closed to " << URL() << ".";
85 Message(str);
86 }
87
88 // Stop any pending connection attempt
89 fConnectionTimer.cancel();
90
91 // Close possible open connections
92 close();
93
94 // Reset the connection status
95 fConnectionStatus = kDisconnected;
96
97 // Stop deadline counters
98 fInTimeout.cancel();
99 fOutTimeout.cancel();
100
101 // Empty output queue
102 fOutQueue.clear();
103
104 if (!restart || IsConnecting())
105 return;
106
107 // We need some timeout before reconnecting!
108 // And we have to check if we are alreayd trying to connect
109 // We shoudl wait until all operations in progress were canceled
110
111 // Start trying to reconnect
112 fMsgConnect = "";
113 fErrConnect = "";
114 StartConnect();
115}
116
117void Connection::PostClose(bool restart)
118{
119 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
120}
121
122// ------------------------ write --------------------------
123void Connection::HandleWriteTimeout(const bs::error_code &error)
124{
125 // 125: Operation canceled (bs::error_code(125, bs::system_category))
126 if (error && error!=ba::error::basic_errors::operation_aborted)
127 {
128 stringstream str;
129 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
130 Error(str);
131
132 CloseImp();
133 return;
134 }
135
136 if (!is_open())
137 {
138 // For example: Here we could schedule a new accept if we
139 // would not want to allow two connections at the same time.
140 return;
141 }
142
143 // Check whether the deadline has passed. We compare the deadline
144 // against the current time since a new asynchronous operation
145 // may have moved the deadline before this actor had a chance
146 // to run.
147 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
148 return;
149
150 Error("fOutTimeout has expired, writing data to "+URL());
151
152 CloseImp();
153}
154
155void Connection::HandleSentData(const bs::error_code& error, size_t n)
156{
157 if (error)
158 {
159 stringstream str;
160 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
161 Error(str);
162
163 CloseImp();
164 return;
165 }
166
167 stringstream msg;
168 msg << n << " bytes successfully sent to " << URL();
169 Message(msg);
170
171 // This is "thread" safe because SendMessage and HandleSentMessage
172 // are serialized in the EventQueue. Note: Do not call these
173 // functions directly from any other place then Handlers, use
174 // PostMessage instead
175 fOutQueue.pop_front();
176
177 if (fOutQueue.empty())
178 {
179 // Queue went empty, remove deadline
180 fOutTimeout.cancel();
181 return;
182 }
183
184 // AsyncWrite + Deadline
185 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
186 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
187}
188
189// It is important that when SendMessageImp is called, or to be more
190// precise boost::bind is called, teh data is copied!
191void Connection::SendMessageImp(const vector<char> msg)
192{
193 /*
194 if (!fConnectionEstablished)
195 {
196 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
197 return;
198 }*/
199
200 const bool first_message_in_queue = fOutQueue.empty();
201
202 // This is "thread" safe because SendMessage and HandleSentMessage
203 // are serialized in the EventQueue. Note: Do not call these
204 // functions directly from any other place then Handlers, use
205 // PostMessage instead
206 fOutQueue.push_back(msg);
207
208 if (!first_message_in_queue)
209 return;
210
211 // AsyncWrite + Deadline
212 AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
214}
215
216void Connection::PostMessage(const void *ptr, size_t max)
217{
218 const vector<char> msg(reinterpret_cast<const char*>(ptr),
219 reinterpret_cast<const char*>(ptr)+max);
220
221 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
222}
223
224void Connection::PostMessage(const string &cmd, size_t max)
225{
226 if (max==size_t(-1))
227 max = cmd.length()+1;
228
229 vector <char>msg(max);
230
231 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
232
233 PostMessage(msg);
234}
235
236void Connection::HandleConnectionTimer(const bs::error_code &error)
237{
238 // 125: Operation canceled
239 if (error && error!=ba::error::basic_errors::operation_aborted)
240 {
241 stringstream str;
242 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
243 Error(str);
244 }
245
246 if (is_open())
247 {
248 // For example: Here we could schedule a new accept if we
249 // would not want to allow two connections at the same time.
250 return;
251 }
252
253 // Check whether the deadline has passed. We compare the deadline
254 // against the current time since a new asynchronous operation
255 // may have moved the deadline before this actor had a chance
256 // to run.
257 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
258 StartConnect();
259}
260
261void Connection::ConnectImp(const bs::error_code& error,
262 tcp::resolver::iterator iterator)
263{
264 tcp::endpoint endpoint = *iterator;
265
266 const string host = endpoint.port()==0 ? "" :
267 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
268
269 // Connection established
270 if (!error)
271 {
272 Message("Connection established to "+host+"...");
273
274 fConnectionStatus = kConnected;
275
276 ConnectionEstablished();
277 return;
278 }
279
280 // If returning from run will lead to deletion of this
281 // instance, close() is not needed (maybe implicitly called).
282 // If run is called again, close() is needed here. Otherwise:
283 // Software caused connection abort when we try to resolve
284 // the endpoint again.
285 CloseImp(false);
286
287 stringstream msg;
288 if (!host.empty())
289 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
290
291 if (fErrConnect!=msg.str())
292 {
293 if (error!=ba::error::basic_errors::connection_refused)
294 fMsgConnect = "";
295 fErrConnect = msg.str();
296 Warn(fErrConnect);
297 }
298
299 // Go on with the next
300 if (++iterator != tcp::resolver::iterator())
301 {
302 AsyncConnect(iterator);
303 return;
304 }
305
306 // No more entries to try, if we would not put anything else
307 // into the queue anymore it would now return (run() would return)
308
309 // Since we don't want to block the main loop, we wait using an
310 // asnychronous timer
311
312 // FIXME: Should we move this before AsyncConnect() ?
313 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
314}
315
316// FIXME: Async connect should get address and port as an argument
317void Connection::StartConnect()
318{
319 fConnectionStatus = kConnecting;
320
321 tcp::resolver resolver(get_io_service());
322
323 boost::system::error_code ec;
324
325 tcp::resolver::query query(fAddress, fPort);
326 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
327
328 stringstream msg;
329 if (!fAddress.empty() || !fPort.empty() || ec)
330 msg << "Trying to connect to " << URL() << "...";
331
332 if (ec)
333 msg << " " << ec.message() << " (" << ec << ")";
334
335 // Only output message if it has changed
336 if (fMsgConnect!=msg.str())
337 {
338 fMsgConnect = msg.str();
339 ec ? Error(msg) : Message(msg);
340 }
341
342 if (ec)
343 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
344 else
345 // Start connection attempts (will also reset deadline counter)
346 AsyncConnect(iterator);
347}
348
349void Connection::SetEndpoint(const string &addr, int port)
350{
351 if (fConnectionStatus>=1)
352 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
353
354 fAddress = addr;
355 fPort = lexical_cast<string>(port);
356}
357
358void Connection::SetEndpoint(const string &addr, const string &port)
359{
360 if (fConnectionStatus>=1 && URL()!=":")
361 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
362
363 fAddress = addr;
364 fPort = port;
365}
366
367void Connection::SetEndpoint(const string &addr)
368{
369 const size_t p0 = addr.find_first_of(':');
370 const size_t p1 = addr.find_last_of(':');
371
372 if (p0==string::npos || p0!=p1)
373 {
374 Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)");
375 return;
376 }
377
378 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
379}
380
381
382
383Connection::Connection(ba::io_service& ioservice, ostream &out) :
384MessageImp(out), tcp::socket(ioservice), fLog(0),
385fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
386fConnectionStatus(kDisconnected)
387{
388}
Note: See TracBrowser for help on using the repository browser.