source: trunk/FACT++/src/Connection.cc@ 10278

Last change on this file since 10278 was 10268, checked in by tbretz, 14 years ago
Some improvements to the connection handling, added some PostMessage functions which automatically convert to network byte order.
File size: 12.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const char *txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 //cout << "Start async connect(...)" << endl;
66 fConnectionStatus = 1;
67
68 tcp::endpoint endpoint = *iterator;
69
70 // AsyncConnect + Deadline
71 async_connect(endpoint,
72 boost::bind(&Connection::ConnectImp,
73 this, ba::placeholders::error,
74 ++iterator));
75
76 // We will get a "Connection timeout anyway"
77 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
78}
79
80// ------------------------ close --------------------------
81// close from another thread
82void Connection::CloseImp(bool restart)
83{
84 if (IsConnected())
85 {
86 stringstream str;
87 str << "Connection closed to " << URL() << ".";// << endl;
88 Message(str);
89 }
90
91 // Stop any pending connection attempt
92 fConnectionTimer.cancel();
93
94 // Close possible open connections
95 close();
96
97 // Reset the connection status
98 fConnectionStatus = 0;
99
100 // Stop deadline counters
101 fInTimeout.cancel();
102 fOutTimeout.cancel();
103
104 if (!restart || IsConnecting())
105 return;
106
107 // We need some timeout before reconnecting!
108 // And we have to check if we are alreayd trying to connect
109 // We shoudl wait until all operations in progress were canceled
110
111 // Start trying to reconnect
112 fMsgConnect = "";
113 StartConnect();
114}
115
116void Connection::PostClose(bool restart)
117{
118 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
119}
120
121// ------------------------ write --------------------------
122void Connection::HandleWriteTimeout(const bs::error_code &error)
123{
124 // 125: Operation canceled
125 if (error && error!=bs::error_code(125, bs::system_category))
126 {
127 stringstream str;
128 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
129 Error(str);
130 }
131
132 if (!is_open())
133 {
134 // For example: Here we could schedule a new accept if we
135 // would not want to allow two connections at the same time.
136 return;
137 }
138
139 // Check whether the deadline has passed. We compare the deadline
140 // against the current time since a new asynchronous operation
141 // may have moved the deadline before this actor had a chance
142 // to run.
143 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
144 return;
145
146 Error("fOutTimeout has expired, writing data to "+URL());
147
148 CloseImp();
149}
150
151void Connection::HandleSentData(const bs::error_code& error, size_t)
152{
153 if (error)
154 {
155 stringstream str;
156 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
157 Error(str);
158
159 CloseImp();
160 return;
161 }
162
163 Message("Data successfully sent to "+URL());
164
165 // This is "thread" safe because SendMessage and HandleSentMessage
166 // are serialized in the EventQueue. Note: Do not call these
167 // functions directly from any other place then Handlers, use
168 // PostMessage instead
169 fOutQueue.pop_front();
170
171 if (fOutQueue.empty())
172 {
173 // Queue went empty, remove deadline
174 fOutTimeout.cancel();
175 return;
176 }
177
178 // AsyncWrite + Deadline
179 AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
180 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
181}
182
183void Connection::SendMessageImp(const vector<char> &msg)
184{
185 /*
186 if (!fConnectionEstablished)
187 {
188 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
189 return;
190 }*/
191
192 const bool first_message_in_queue = fOutQueue.empty();
193
194 // This is "thread" safe because SendMessage and HandleSentMessage
195 // are serialized in the EventQueue. Note: Do not call these
196 // functions directly from any other place then Handlers, use
197 // PostMessage instead
198 fOutQueue.push_back(msg);
199
200 if (!first_message_in_queue)
201 return;
202
203 // AsyncWrite + Deadline
204 AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
205 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
206}
207
208void Connection::PostMessage(vector<uint16_t> inp)
209{
210 // Convert to network byte order
211 for_each(inp.begin(), inp.end(), htons);
212
213 // FIXME FIXME
214
215 PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
216
217// const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
218// get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
219}
220
221void Connection::PostMessage(const vector<char> &msg)
222{
223 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
224}
225
226void Connection::PostMessage(const void *ptr, size_t max)
227{
228 const vector<char> msg(reinterpret_cast<const char*>(ptr),
229 reinterpret_cast<const char*>(ptr)+max);
230
231 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
232}
233
234void Connection::PostMessage(const string &cmd, size_t max)
235{
236 if (max==size_t(-1))
237 max = cmd.length()+1;
238
239 vector <char>msg(max);
240
241 for (unsigned int i=0; i<max; i++)
242 msg[i] = 0;
243
244 for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
245 msg[i] = cmd[i];
246
247 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
248}
249
250void Connection::HandleConnectionTimer(const bs::error_code &error)
251{
252 // 125: Operation canceled
253 if (error && error!=bs::error_code(125, bs::system_category))
254 {
255 stringstream str;
256 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
257 Error(str);
258 }
259
260 if (is_open())
261 {
262 // For example: Here we could schedule a new accept if we
263 // would not want to allow two connections at the same time.
264 return;
265 }
266
267 // Check whether the deadline has passed. We compare the deadline
268 // against the current time since a new asynchronous operation
269 // may have moved the deadline before this actor had a chance
270 // to run.
271 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
272 StartConnect();
273}
274
275void Connection::ConnectImp(const bs::error_code& error,
276 tcp::resolver::iterator endpoint_iterator)
277{
278 // Connection established
279 if (!error)
280 {
281 Message("Connection established to "+URL()+"...");
282
283 // Initialize hardware...
284 // Set state to : connected/undefined until we get the first message
285 // (send request for configuration) ?
286 // (send configuration?)
287 fMsgConnect = "";
288 fErrConnect = "";
289
290 fConnectionStatus = 2;
291 //StartAsyncRead();
292
293 // We can create the buffer here and delete it in
294 // Handle Read. However, the read buffer is not filled again
295 // before it was not read within HandleReceivedData -- so a single
296 // buffer should be good enough. Before HandleReceivedData
297 // returns the data must be copied to a "safe" place.
298
299 // fSocket.get_io_service()/*fEventQueue*/.stop();
300
301 ConnectionEstablished();
302 return;
303 }
304
305 // If returning from run will lead to deletion of this
306 // instance, close() is not needed (maybe implicitly called).
307 // If run is called again, close() is needed here. Otherwise:
308 // Software caused connection abort when we try to resolve
309 // the endpoint again.
310 CloseImp(false);
311 //fSocket.close();
312
313 // 111: Connection refused
314 if (1/*error!=bs::error_code(111, bs::system_category)*/)
315 {
316 stringstream msg;
317 if (URL()!=":")
318 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
319
320 if (fErrConnect!=msg.str())
321 {
322 fMsgConnect = "";
323 fErrConnect = msg.str();
324 Warn(fErrConnect);
325 }
326 }
327
328 // Go on with the next
329 if (endpoint_iterator != tcp::resolver::iterator())
330 {
331 AsyncConnect(endpoint_iterator);
332 return;
333 }
334
335 // No more entries to try, if we would not put anything else
336 // into the queue anymore it would now return (run() would return)
337
338 // Since we don't want to block the main loop, we wait using an
339 // asnychronous timer
340
341 // FIXME: Should we move this before AsyncConnect() ?
342 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
343}
344
345// FIXME: Async connect should get address and port as an argument
346void Connection::StartConnect()
347{
348 fConnectionStatus = 1;
349
350 tcp::resolver resolver(get_io_service());
351
352 boost::system::error_code ec;
353
354 tcp::resolver::query query(fAddress, fPort);
355 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
356
357 stringstream msg;
358 if (!fAddress.empty() || !fPort.empty() || ec)
359 msg << "Trying to connect to " << URL() << "...";
360
361 if (ec)
362 msg << " " << ec.message() << " (" << ec << ")";
363
364 // Only output message if it has changed
365 if (fMsgConnect!=msg.str())
366 {
367 fMsgConnect = msg.str();
368 ec ? Error(msg) : Message(msg);
369 }
370
371 if (ec)
372 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
373 else
374 // Start connection attempts (will also reset deadline counter)
375 AsyncConnect(iterator);
376}
377
378void Connection::SetEndpoint(const string &addr, int port)
379{
380 if (fConnectionStatus>=1)
381 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
382
383 fAddress = addr;
384 fPort = lexical_cast<string>(port);
385}
386
387void Connection::SetEndpoint(const string &addr, const string &port)
388{
389 if (fConnectionStatus>=1 && URL()!=":")
390 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
391
392 fAddress = addr;
393 fPort = port;
394}
395
396void Connection::SetEndpoint(const string &addr)
397{
398 const size_t p0 = addr.find_first_of(':');
399 const size_t p1 = addr.find_last_of(':');
400
401 if (p0==string::npos || p0!=p1)
402 {
403 Error("Connection::SetEndPoint - Wrong format of argument.");
404 return;
405 }
406
407 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
408}
409
410
411
412Connection::Connection(ba::io_service& ioservice, ostream &out) :
413MessageImp(out), tcp::socket(ioservice),
414fLog(0), //fAddress("localhost"), fPort("5000"),
415fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
416fConnectionStatus(0)
417{
418}
419/*
420Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
421tcp::socket(ioservice),
422fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
423fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
424fConnectionStatus(0)
425{
426}
427
428Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
429tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
430fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
431fConnectionStatus(0)
432{
433}
434*/
Note: See TracBrowser for help on using the repository browser.