source: trunk/FACT++/src/Connection.cc@ 10551

Last change on this file since 10551 was 10512, checked in by tbretz, 14 years ago
replace the address of the first element by the data() member function
File size: 11.3 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const string &txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectImp,
70 this, ba::placeholders::error,
71 iterator));
72
73 // We will get a "Connection timeout anyway"
74 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
75}
76
77// ------------------------ close --------------------------
78// close from another thread
79void Connection::CloseImp(bool restart)
80{
81 if (IsConnected())
82 {
83 stringstream str;
84 str << "Connection closed to " << URL() << ".";
85 Message(str);
86 }
87
88 // Stop any pending connection attempt
89 fConnectionTimer.cancel();
90
91 // Close possible open connections
92 close();
93
94 // Reset the connection status
95 fConnectionStatus = kDisconnected;
96
97 // Stop deadline counters
98 fInTimeout.cancel();
99 fOutTimeout.cancel();
100
101 // Empty output queue
102 fOutQueue.clear();
103
104 if (!restart || IsConnecting())
105 return;
106
107 // We need some timeout before reconnecting!
108 // And we have to check if we are alreayd trying to connect
109 // We shoudl wait until all operations in progress were canceled
110
111 // Start trying to reconnect
112 fMsgConnect = "";
113 fErrConnect = "";
114 StartConnect();
115}
116
117void Connection::PostClose(bool restart)
118{
119 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
120}
121
122// ------------------------ write --------------------------
123void Connection::HandleWriteTimeout(const bs::error_code &error)
124{
125 // 125: Operation canceled (bs::error_code(125, bs::system_category))
126 if (error && error!=ba::error::basic_errors::operation_aborted)
127 {
128 stringstream str;
129 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
130 Error(str);
131
132 CloseImp();
133 return;
134 }
135
136 if (!is_open())
137 {
138 // For example: Here we could schedule a new accept if we
139 // would not want to allow two connections at the same time.
140 return;
141 }
142
143 // Check whether the deadline has passed. We compare the deadline
144 // against the current time since a new asynchronous operation
145 // may have moved the deadline before this actor had a chance
146 // to run.
147 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
148 return;
149
150 Error("fOutTimeout has expired, writing data to "+URL());
151
152 CloseImp();
153}
154
155void Connection::HandleSentData(const bs::error_code& error, size_t n)
156{
157 if (error)
158 {
159 stringstream str;
160 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
161 Error(str);
162
163 CloseImp();
164 return;
165 }
166
167 stringstream msg;
168 msg << n << " bytes successfully sent to " << URL();
169 Message(msg);
170
171 // This is "thread" safe because SendMessage and HandleSentMessage
172 // are serialized in the EventQueue. Note: Do not call these
173 // functions directly from any other place then Handlers, use
174 // PostMessage instead
175 fOutQueue.pop_front();
176
177 if (fOutQueue.empty())
178 {
179 // Queue went empty, remove deadline
180 fOutTimeout.cancel();
181 return;
182 }
183
184 // AsyncWrite + Deadline
185 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
186 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
187}
188
189// It is important that when SendMessageImp is called, or to be more
190// precise boost::bind is called, teh data is copied!
191void Connection::SendMessageImp(const vector<char> msg)
192{
193 /*
194 if (!fConnectionEstablished)
195 {
196 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
197 return;
198 }*/
199
200 const bool first_message_in_queue = fOutQueue.empty();
201
202 // This is "thread" safe because SendMessage and HandleSentMessage
203 // are serialized in the EventQueue. Note: Do not call these
204 // functions directly from any other place then Handlers, use
205 // PostMessage instead
206 fOutQueue.push_back(msg);
207
208 if (!first_message_in_queue)
209 return;
210
211 // AsyncWrite + Deadline
212 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
213 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
214}
215
216void Connection::PostMessage(const void *ptr, size_t max)
217{
218 const vector<char> msg(reinterpret_cast<const char*>(ptr),
219 reinterpret_cast<const char*>(ptr)+max);
220
221 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
222}
223
224void Connection::PostMessage(const string &cmd, size_t max)
225{
226 if (max==size_t(-1))
227 max = cmd.length()+1;
228
229 vector <char>msg(max);
230
231 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
232
233 PostMessage(msg);
234}
235
236void Connection::HandleConnectionTimer(const bs::error_code &error)
237{
238 // 125: Operation canceled
239 if (error && error!=ba::error::basic_errors::operation_aborted)
240 {
241 stringstream str;
242 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
243 Error(str);
244 }
245
246 if (is_open())
247 {
248 // For example: Here we could schedule a new accept if we
249 // would not want to allow two connections at the same time.
250 return;
251 }
252
253 // Check whether the deadline has passed. We compare the deadline
254 // against the current time since a new asynchronous operation
255 // may have moved the deadline before this actor had a chance
256 // to run.
257 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
258 StartConnect();
259}
260
261void Connection::ConnectImp(const bs::error_code& error,
262 tcp::resolver::iterator iterator)
263{
264 tcp::endpoint endpoint = *iterator;
265
266 const string host = endpoint.port()==0 ? "" :
267 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
268
269 // Connection established
270 if (!error)
271 {
272 Message("Connection established to "+host+"...");
273
274 fConnectionStatus = kConnected;
275
276 ConnectionEstablished();
277 return;
278 }
279
280 // If returning from run will lead to deletion of this
281 // instance, close() is not needed (maybe implicitly called).
282 // If run is called again, close() is needed here. Otherwise:
283 // Software caused connection abort when we try to resolve
284 // the endpoint again.
285 CloseImp(false);
286
287 stringstream msg;
288 if (!host.empty())
289 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
290
291 if (fErrConnect!=msg.str())
292 {
293 if (error!=ba::error::basic_errors::connection_refused)
294 fMsgConnect = "";
295 fErrConnect = msg.str();
296 Warn(fErrConnect);
297 }
298
299 // Go on with the next
300 if (++iterator != tcp::resolver::iterator())
301 {
302 AsyncConnect(iterator);
303 return;
304 }
305
306 // No more entries to try, if we would not put anything else
307 // into the queue anymore it would now return (run() would return)
308
309 // Since we don't want to block the main loop, we wait using an
310 // asnychronous timer
311
312 // FIXME: Should we move this before AsyncConnect() ?
313 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
314}
315
316// FIXME: Async connect should get address and port as an argument
317void Connection::StartConnect()
318{
319 fConnectionStatus = kConnecting;
320
321 tcp::resolver resolver(get_io_service());
322
323 boost::system::error_code ec;
324
325 tcp::resolver::query query(fAddress, fPort);
326 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
327
328 stringstream msg;
329 if (!fAddress.empty() || !fPort.empty() || ec)
330 msg << "Trying to connect to " << URL() << "...";
331
332 if (ec)
333 msg << " " << ec.message() << " (" << ec << ")";
334
335 // Only output message if it has changed
336 if (fMsgConnect!=msg.str())
337 {
338 fMsgConnect = msg.str();
339 ec ? Error(msg) : Message(msg);
340 }
341
342 if (ec)
343 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
344 else
345 // Start connection attempts (will also reset deadline counter)
346 AsyncConnect(iterator);
347}
348
349void Connection::SetEndpoint(const string &addr, int port)
350{
351 if (fConnectionStatus>=1)
352 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
353
354 fAddress = addr;
355 fPort = lexical_cast<string>(port);
356}
357
358void Connection::SetEndpoint(const string &addr, const string &port)
359{
360 if (fConnectionStatus>=1 && URL()!=":")
361 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
362
363 fAddress = addr;
364 fPort = port;
365}
366
367void Connection::SetEndpoint(const string &addr)
368{
369 const size_t p0 = addr.find_first_of(':');
370 const size_t p1 = addr.find_last_of(':');
371
372 if (p0==string::npos || p0!=p1)
373 {
374 Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)");
375 return;
376 }
377
378 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
379}
380
381
382
383Connection::Connection(ba::io_service& ioservice, ostream &out) :
384MessageImp(out), tcp::socket(ioservice), fLog(0),
385fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
386fConnectionStatus(kDisconnected)
387{
388}
Note: See TracBrowser for help on using the repository browser.