source: trunk/FACT++/src/Connection.cc@ 10876

Last change on this file since 10876 was 10831, checked in by tbretz, 14 years ago
Replaces some Message calls by Info
File size: 11.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23 // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const string &txt, int qos)
26{
27 if (fLog)
28 return fLog->Write(t, txt, qos);
29
30 return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
34{
35 ba::async_read(*this, buffers,
36 boost::bind(&Connection::HandleReceivedData, this,
37 dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
41{
42 ba::async_write(*this, buffers,
43 boost::bind(&Connection::HandleSentData, this,
44 dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48 void (Connection::*handler)(const bs::error_code&))
49{
50 // - The boost::asio::basic_deadline_timer::expires_from_now()
51 // function cancels any pending asynchronous waits, and returns
52 // the number of asynchronous waits that were cancelled. If it
53 // returns 0 then you were too late and the wait handler has
54 // already been executed, or will soon be executed. If it
55 // returns 1 then the wait handler was successfully cancelled.
56 // - If a wait handler is cancelled, the bs::error_code passed to
57 // it contains the value bs::error::operation_aborted.
58 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60 timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65 tcp::endpoint endpoint = *iterator;
66
67 // AsyncConnect + Deadline
68 async_connect(endpoint,
69 boost::bind(&Connection::ConnectImp,
70 this, ba::placeholders::error,
71 iterator));
72
73 // We will get a "Connection timeout anyway"
74 //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
75}
76
77// ------------------------ close --------------------------
78// close from another thread
79void Connection::CloseImp(bool restart)
80{
81 if (IsConnected())
82 {
83 ostringstream str;
84 str << "Connection closed to " << URL() << ".";
85 Info(str);
86 }
87
88 // Stop any pending connection attempt
89 fConnectionTimer.cancel();
90
91 // Close possible open connections
92 close();
93
94 // Reset the connection status
95 fConnectionStatus = kDisconnected;
96
97 // Stop deadline counters
98 fInTimeout.cancel();
99 fOutTimeout.cancel();
100
101 // Empty output queue
102 fOutQueue.clear();
103
104 if (!restart || IsConnecting())
105 return;
106
107 // We need some timeout before reconnecting!
108 // And we have to check if we are alreayd trying to connect
109 // We shoudl wait until all operations in progress were canceled
110
111 // Start trying to reconnect
112 fMsgConnect = "";
113 fErrConnect = "";
114 StartConnect();
115}
116
117void Connection::PostClose(bool restart)
118{
119 get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
120}
121
122// ------------------------ write --------------------------
123void Connection::HandleWriteTimeout(const bs::error_code &error)
124{
125 if (error==ba::error::basic_errors::operation_aborted)
126 return;
127
128 // 125: Operation canceled (bs::error_code(125, bs::system_category))
129 if (error)
130 {
131 ostringstream str;
132 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
133 Error(str);
134
135 CloseImp();
136 return;
137 }
138
139 if (!is_open())
140 {
141 // For example: Here we could schedule a new accept if we
142 // would not want to allow two connections at the same time.
143 return;
144 }
145
146 // Check whether the deadline has passed. We compare the deadline
147 // against the current time since a new asynchronous operation
148 // may have moved the deadline before this actor had a chance
149 // to run.
150 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
151 return;
152
153 Error("fOutTimeout has expired, writing data to "+URL());
154
155 CloseImp();
156}
157
158void Connection::HandleSentData(const bs::error_code& error, size_t n)
159{
160 if (error && error != ba::error::not_connected)
161 {
162 ostringstream str;
163 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
164 Error(str);
165
166 CloseImp();
167 return;
168 }
169
170 if (error == ba::error::not_connected)
171 {
172 ostringstream msg;
173 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
174 Warn(msg);
175 }
176 else
177 {
178 ostringstream msg;
179 msg << n << " bytes successfully sent to " << URL();
180 Message(msg);
181 }
182
183 // This is "thread" safe because SendMessage and HandleSentMessage
184 // are serialized in the EventQueue. Note: Do not call these
185 // functions directly from any other place then Handlers, use
186 // PostMessage instead
187 fOutQueue.pop_front();
188
189 if (fOutQueue.empty())
190 {
191 // Queue went empty, remove deadline
192 fOutTimeout.cancel();
193 return;
194 }
195
196 // AsyncWrite + Deadline
197 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
198 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
199}
200
201// It is important that when SendMessageImp is called, or to be more
202// precise boost::bind is called, teh data is copied!
203void Connection::SendMessageImp(const vector<char> msg)
204{
205 /*
206 if (!fConnectionEstablished)
207 {
208 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
209 return;
210 }*/
211
212 const bool first_message_in_queue = fOutQueue.empty();
213
214 // This is "thread" safe because SendMessage and HandleSentMessage
215 // are serialized in the EventQueue. Note: Do not call these
216 // functions directly from any other place then Handlers, use
217 // PostMessage instead
218 fOutQueue.push_back(msg);
219
220 if (!first_message_in_queue)
221 return;
222
223 // AsyncWrite + Deadline
224 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &Connection::HandleSentData*/);
225 AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
226}
227
228void Connection::PostMessage(const void *ptr, size_t max)
229{
230 const vector<char> msg(reinterpret_cast<const char*>(ptr),
231 reinterpret_cast<const char*>(ptr)+max);
232
233 get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
234}
235
236void Connection::PostMessage(const string &cmd, size_t max)
237{
238 if (max==size_t(-1))
239 max = cmd.length()+1;
240
241 vector <char>msg(max);
242
243 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
244
245 PostMessage(msg);
246}
247
248void Connection::HandleConnectionTimer(const bs::error_code &error)
249{
250 if (error==ba::error::basic_errors::operation_aborted)
251 return;
252
253 if (error)
254 {
255 ostringstream str;
256 str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
257 Error(str);
258 }
259
260 if (is_open())
261 {
262 // For example: Here we could schedule a new accept if we
263 // would not want to allow two connections at the same time.
264 return;
265 }
266
267 // Check whether the deadline has passed. We compare the deadline
268 // against the current time since a new asynchronous operation
269 // may have moved the deadline before this actor had a chance
270 // to run.
271 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
272 StartConnect();
273}
274
275void Connection::ConnectImp(const bs::error_code& error,
276 tcp::resolver::iterator iterator)
277{
278 tcp::endpoint endpoint = *iterator;
279
280 const string host = endpoint.port()==0 ? "" :
281 endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
282
283 // Connection established
284 if (!error)
285 {
286 Info("Connection established to "+host+"...");
287
288 fConnectionStatus = kConnected;
289
290 ConnectionEstablished();
291 return;
292 }
293
294 // If returning from run will lead to deletion of this
295 // instance, close() is not needed (maybe implicitly called).
296 // If run is called again, close() is needed here. Otherwise:
297 // Software caused connection abort when we try to resolve
298 // the endpoint again.
299 CloseImp(false);
300 fConnectionStatus = kConnecting;
301
302 // FIXME: "No route to host (syytem::113)" gives error:"Operation already in progress. (system:114)"
303 ostringstream msg;
304 if (!host.empty())
305 msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
306
307 if (fErrConnect!=msg.str())
308 {
309 if (error!=ba::error::basic_errors::connection_refused)
310 fMsgConnect = "";
311 fErrConnect = msg.str();
312 Warn(fErrConnect);
313 }
314
315 // Go on with the next
316 if (++iterator != tcp::resolver::iterator())
317 {
318 AsyncConnect(iterator);
319 return;
320 }
321
322 // No more entries to try, if we would not put anything else
323 // into the queue anymore it would now return (run() would return)
324
325 // Since we don't want to block the main loop, we wait using an
326 // asnychronous timer
327
328 // FIXME: Should we move this before AsyncConnect() ?
329 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
330}
331
332// FIXME: Async connect should get address and port as an argument
333void Connection::StartConnect()
334{
335 fConnectionStatus = kConnecting;
336
337 tcp::resolver resolver(get_io_service());
338
339 boost::system::error_code ec;
340
341 tcp::resolver::query query(fAddress, fPort);
342 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
343
344 ostringstream msg;
345 if (!fAddress.empty() || !fPort.empty() || ec)
346 msg << "Trying to connect to " << URL() << "...";
347
348 if (ec)
349 msg << " " << ec.message() << " (" << ec << ")";
350
351 // Only output message if it has changed
352 if (fMsgConnect!=msg.str())
353 {
354 fMsgConnect = msg.str();
355 ec ? Error(msg) : Info(msg);
356 }
357
358 if (ec)
359 AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
360 else
361 // Start connection attempts (will also reset deadline counter)
362 AsyncConnect(iterator);
363}
364
365void Connection::SetEndpoint(const string &addr, int port)
366{
367 if (fConnectionStatus>=1)
368 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
369
370 fAddress = addr;
371 fPort = lexical_cast<string>(port);
372}
373
374void Connection::SetEndpoint(const string &addr, const string &port)
375{
376 if (fConnectionStatus>=1 && URL()!=":")
377 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
378
379 fAddress = addr;
380 fPort = port;
381}
382
383void Connection::SetEndpoint(const string &addr)
384{
385 const size_t p0 = addr.find_first_of(':');
386 const size_t p1 = addr.find_last_of(':');
387
388 if (p0==string::npos || p0!=p1)
389 {
390 Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
391 return;
392 }
393
394 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
395}
396
397
398
399Connection::Connection(ba::io_service& ioservice, ostream &out) :
400MessageImp(out), tcp::socket(ioservice), fLog(0),
401fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
402fConnectionStatus(kDisconnected)
403{
404}
Note: See TracBrowser for help on using the repository browser.