source: trunk/FACT++/src/Connection.cc@ 10183

Last change on this file since 10183 was 10183, checked in by tbretz, 14 years ago
New import.
File size: 11.0 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const char *txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(ba::mutable_buffers_1 buffers)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred));
38}
39
40void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 //cout << "Start async connect(...)" << endl;
66 fConnectionStatus = 1;
67
68 tcp::endpoint endpoint = *iterator;
69
70 // AsyncConnect + Deadline
71 async_connect(endpoint,
72 boost::bind(&Connection::ConnectImp,
73 this, ba::placeholders::error,
74 ++iterator));
75
76 // We will get a "Connection timeout anyway"
77 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
78}
79
80// ------------------------ close --------------------------
81// close from another thread
82void Connection::CloseImp(bool restart)
83{
84 if (IsConnected())
85 {
86 stringstream str;
87 str << "Connection closed to " << URL() << ".";// << endl;
88 Message(str);
89 }
90
91 // Close the connection
92 fConnectionStatus = 0;
93 close();
94
95 // Stop deadline counters
96 fInTimeout.cancel();
97 fOutTimeout.cancel();
98 //fConnectTimeout.cancel();
99
100 if (!restart || IsConnecting())
101 return;
102
103 // We need some timeout before reconnecting!
104 // And we have to check if we are alreayd trying to connect
105 // We shoudl wait until all operations in progress were canceled
106
107 // Start trying to reconnect
108 AsyncConnect();
109}
110
111void Connection::PostClose(bool restart)
112{
113 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
114}
115
116// ------------------------ write --------------------------
117void Connection::HandleWriteTimeout(const bs::error_code &error)
118{
119 // 125: Operation canceled
120 if (error && error!=bs::error_code(125, bs::system_category))
121 {
122 stringstream str;
123 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
124 Error(str);
125 }
126
127 if (!is_open())
128 {
129 // For example: Here we could schedule a new accept if we
130 // would not want to allow two connections at the same time.
131 return;
132 }
133
134 // Check whether the deadline has passed. We compare the deadline
135 // against the current time since a new asynchronous operation
136 // may have moved the deadline before this actor had a chance
137 // to run.
138 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
139 return;
140
141 Error("fOutTimeout has expired, writing data to "+URL());
142
143 CloseImp();
144}
145
146void Connection::HandleSentData(const bs::error_code& error, size_t)
147{
148 if (error)
149 {
150 stringstream str;
151 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
152 Error(str);
153
154 CloseImp();
155 return;
156 }
157
158 Message("Data successfully sent to "+URL());
159
160 // This is "thread" safe because SendMessage and HandleSentMessage
161 // are serialized in the EventQueue. Note: Do not call these
162 // functions directly from any other place then Handlers, use
163 // PostMessage instead
164 fOutQueue.pop_front();
165
166 if (fOutQueue.empty())
167 {
168 // Queue went empty, remove deadline
169 fOutTimeout.cancel();
170 return;
171 }
172
173 // AsyncWrite + Deadline
174 AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
175 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
176}
177
178void Connection::SendMessageImp(const vector<char> &msg)
179{
180 /*
181 if (!fConnectionEstablished)
182 {
183 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
184 return;
185 }*/
186
187 const bool first_message_in_queue = fOutQueue.empty();
188
189 // This is "thread" safe because SendMessage and HandleSentMessage
190 // are serialized in the EventQueue. Note: Do not call these
191 // functions directly from any other place then Handlers, use
192 // PostMessage instead
193 fOutQueue.push_back(msg);
194
195 if (!first_message_in_queue)
196 return;
197
198 // AsyncWrite + Deadline
199 AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
200 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
201}
202
203void Connection::PostMessage(const vector<char> &msg)
204{
205 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
206}
207
208void Connection::PostMessage(const string &cmd, size_t max)
209{
210 if (max==size_t(-1))
211 max = cmd.length()+1;
212
213 vector <char>msg(max);
214
215 for (unsigned int i=0; i<max; i++)
216 msg[i] = 0;
217
218 for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
219 msg[i] = cmd[i];
220
221 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
222}
223
224void Connection::HandleConnectionTimer(const bs::error_code &error)
225{
226 // 125: Operation canceled
227 if (error && error!=bs::error_code(125, bs::system_category))
228 {
229 stringstream str;
230 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
231 Error(str);
232 }
233
234 if (is_open())
235 {
236 // For example: Here we could schedule a new accept if we
237 // would not want to allow two connections at the same time.
238 return;
239 }
240
241 // Check whether the deadline has passed. We compare the deadline
242 // against the current time since a new asynchronous operation
243 // may have moved the deadline before this actor had a chance
244 // to run.
245 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
246 AsyncConnect();
247}
248
249void Connection::ConnectImp(const bs::error_code& error,
250 tcp::resolver::iterator endpoint_iterator)
251{
252 // Connection established
253 if (!error)
254 {
255 Message("Connection established to "+URL()+"...");
256
257 // Initialize hardware...
258 // Set state to : connected/undefined until we get the first message
259 // (send request for configuration) ?
260 // (send configuration?)
261 fMsgConnect = "";
262 fErrConnect = "";
263
264 fConnectionStatus = 2;
265 //StartAsyncRead();
266
267 // We can create the buffer here and delete it in
268 // Handle Read. However, the read buffer is not filled again
269 // before it was not read within HandleReceivedData -- so a single
270 // buffer should be good enough. Before HandleReceivedData
271 // returns the data must be copied to a "safe" place.
272
273 // fSocket.get_io_service()/*fEventQueue*/.stop();
274 return;
275 }
276
277 // If returning from run will lead to deletion of this
278 // instance, close() is not needed (maybe implicitly called).
279 // If run is called again, close() is needed here. Otherwise:
280 // Software caused connection abort when we try to resolve
281 // the endpoint again.
282 CloseImp(false);
283 //fSocket.close();
284
285 // 111: Connection refused
286 if (1/*error!=bs::error_code(111, bs::system_category)*/)
287 {
288 stringstream msg;
289 msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
290
291 if (fErrConnect!=msg.str())
292 {
293 fMsgConnect = "";
294 fErrConnect = msg.str();
295 Warn(fErrConnect);
296 }
297 }
298
299 // Go on with the next
300 if (endpoint_iterator != tcp::resolver::iterator())
301 {
302 AsyncConnect(endpoint_iterator);
303 return;
304 }
305
306 // No more entries to try, if we would not put anything else
307 // into the queue anymore it would now return (run() would return)
308
309 // Since we don't want to block the main loop, we wait using an
310 // asnychronous timer
311
312 // FIXME: Should we move this before AsyncConnect() ?
313 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
314}
315
316// FIXME: Async connect should get address and port as an argument
317void Connection::AsyncConnect()
318{
319 fConnectionStatus = 1;
320
321 tcp::resolver resolver(get_io_service());
322
323 tcp::resolver::query query(fAddress, fPort);
324 tcp::resolver::iterator iterator = resolver.resolve(query);
325
326 stringstream msg;
327 msg << "Trying to connect to " << fAddress << ":" << fPort << "...";
328
329 if (fMsgConnect!=msg.str())
330 {
331 fMsgConnect = msg.str();
332 Message(msg);
333 }
334
335 // Start connection attempts (will also reset deadline counter)
336 AsyncConnect(iterator);
337}
338
339void Connection::SetEndpoint(const char *addr, int port)
340{
341 if (fConnectionStatus>=1)
342 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
343
344 fAddress = addr;
345 fPort = lexical_cast<string>(port);
346}
347
348
349Connection::Connection(ba::io_service& ioservice, ostream &out) :
350MessageImp(out), tcp::socket(ioservice),
351fLog(0), fAddress("localhost"), fPort("5000"),
352fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
353fConnectionStatus(0)
354{
355}
356
357Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
358tcp::socket(ioservice),
359fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
360fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
361fConnectionStatus(0)
362{
363}
364
365Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
366tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
367fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
368fConnectionStatus(0)
369{
370}
371
Note: See TracBrowser for help on using the repository browser.