source: trunk/FACT++/src/Connection.cc @ 10268

Last change on this file since 10268 was 10268, checked in by tbretz, 10 years ago
Some improvements to the connection handling, added some PostMessage functions which automatically convert to network byte order.
File size: 12.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba    = boost::asio;
16namespace bs    = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23    // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const char *txt, int qos)
26{
27    if (fLog)
28        return fLog->Write(t, txt, qos);
29
30    return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
34{
35    ba::async_read(*this, buffers,
36                   boost::bind(&Connection::HandleReceivedData, this,
37                               dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
41{
42    ba::async_write(*this, buffers,
43                    boost::bind(&Connection::HandleSentData, this,
44                                dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48                           void (Connection::*handler)(const bs::error_code&))
49{
50    // - The boost::asio::basic_deadline_timer::expires_from_now()
51    //   function cancels any pending asynchronous waits, and returns
52    //   the number of asynchronous waits that were cancelled. If it
53    //   returns 0 then you were too late and the wait handler has
54    //   already been executed, or will soon be executed. If it
55    //   returns 1 then the wait handler was successfully cancelled.
56    // - If a wait handler is cancelled, the bs::error_code passed to
57    //   it contains the value bs::error::operation_aborted.
58    timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60    timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65    //cout << "Start async connect(...)" << endl;
66    fConnectionStatus = 1;
67
68    tcp::endpoint endpoint = *iterator;
69
70    // AsyncConnect + Deadline
71    async_connect(endpoint,
72                  boost::bind(&Connection::ConnectImp,
73                              this, ba::placeholders::error,
74                              ++iterator));
75
76    // We will get a "Connection timeout anyway"
77    //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
78}
79
80// ------------------------ close --------------------------
81// close from another thread
82void Connection::CloseImp(bool restart)
83{
84    if (IsConnected())
85    {
86        stringstream str;
87        str << "Connection closed to " << URL() << ".";// << endl;
88        Message(str);
89    }
90
91    // Stop any pending connection attempt
92    fConnectionTimer.cancel();
93
94    // Close possible open connections
95    close();
96
97    // Reset the connection status
98    fConnectionStatus = 0;
99
100    // Stop deadline counters
101    fInTimeout.cancel();
102    fOutTimeout.cancel();
103
104    if (!restart || IsConnecting())
105        return;
106
107    // We need some timeout before reconnecting!
108    // And we have to check if we are alreayd trying to connect
109    // We shoudl wait until all operations in progress were canceled
110
111    // Start trying to reconnect
112    fMsgConnect = "";
113    StartConnect();
114}
115
116void Connection::PostClose(bool restart)
117{
118    get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
119}
120
121// ------------------------ write --------------------------
122void Connection::HandleWriteTimeout(const bs::error_code &error)
123{
124    // 125: Operation canceled
125    if (error && error!=bs::error_code(125, bs::system_category))
126    {
127        stringstream str;
128        str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
129        Error(str);
130    }
131
132    if (!is_open())
133    {
134        // For example: Here we could schedule a new accept if we
135        // would not want to allow two connections at the same time.
136        return;
137    }
138
139    // Check whether the deadline has passed. We compare the deadline
140    // against the current time since a new asynchronous operation
141    // may have moved the deadline before this actor had a chance
142    // to run.
143    if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
144        return;
145
146    Error("fOutTimeout has expired, writing data to "+URL());
147
148    CloseImp();
149}
150
151void Connection::HandleSentData(const bs::error_code& error, size_t)
152{
153    if (error)
154    {
155        stringstream str;
156        str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
157        Error(str);
158
159        CloseImp();
160        return;
161    }
162
163    Message("Data successfully sent to "+URL());
164
165    // This is "thread" safe because SendMessage and HandleSentMessage
166    // are serialized in the EventQueue. Note: Do not call these
167    // functions directly from any other place then Handlers, use
168    // PostMessage instead
169    fOutQueue.pop_front();
170
171    if (fOutQueue.empty())
172    {
173        // Queue went empty, remove deadline
174        fOutTimeout.cancel();
175        return;
176    }
177
178    // AsyncWrite + Deadline
179    AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
180    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
181}
182
183void Connection::SendMessageImp(const vector<char> &msg)
184{
185    /*
186    if (!fConnectionEstablished)
187    {
188        UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
189        return;
190    }*/
191
192    const bool first_message_in_queue = fOutQueue.empty();
193
194    // This is "thread" safe because SendMessage and HandleSentMessage
195    // are serialized in the EventQueue. Note: Do not call these
196    // functions directly from any other place then Handlers, use
197    // PostMessage instead
198    fOutQueue.push_back(msg);
199
200    if (!first_message_in_queue)
201        return;
202
203    // AsyncWrite + Deadline
204    AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
205    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
206}
207
208void Connection::PostMessage(vector<uint16_t> inp)
209{
210    // Convert to network byte order
211    for_each(inp.begin(), inp.end(), htons);
212
213    // FIXME FIXME
214
215    PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
216
217//    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
218//    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
219}
220
221void Connection::PostMessage(const vector<char> &msg)
222{
223    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
224}
225
226void Connection::PostMessage(const void *ptr, size_t max)
227{
228    const vector<char> msg(reinterpret_cast<const char*>(ptr),
229                           reinterpret_cast<const char*>(ptr)+max);
230
231    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
232}
233
234void Connection::PostMessage(const string &cmd, size_t max)
235{
236    if (max==size_t(-1))
237        max = cmd.length()+1;
238
239    vector <char>msg(max);
240
241    for (unsigned int i=0; i<max; i++)
242        msg[i] = 0;
243
244    for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
245        msg[i] = cmd[i];
246
247    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
248}
249
250void Connection::HandleConnectionTimer(const bs::error_code &error)
251{
252    // 125: Operation canceled
253    if (error && error!=bs::error_code(125, bs::system_category))
254    {
255        stringstream str;
256        str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
257        Error(str);
258    }
259
260    if (is_open())
261    {
262        // For example: Here we could schedule a new accept if we
263        // would not want to allow two connections at the same time.
264        return;
265    }
266
267    // Check whether the deadline has passed. We compare the deadline
268    // against the current time since a new asynchronous operation
269    // may have moved the deadline before this actor had a chance
270    // to run.
271    if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
272        StartConnect();
273}
274
275void Connection::ConnectImp(const bs::error_code& error,
276                            tcp::resolver::iterator endpoint_iterator)
277{
278    // Connection established
279    if (!error)
280    {
281        Message("Connection established to "+URL()+"...");
282
283        // Initialize hardware...
284        // Set state to : connected/undefined until we get the first message
285        //                (send request for configuration) ?
286        //                (send configuration?)
287        fMsgConnect = "";
288        fErrConnect = "";
289
290        fConnectionStatus = 2;
291        //StartAsyncRead();
292
293        // We can create the buffer here and delete it in
294        // Handle Read. However, the read buffer is not filled again
295        // before it was not read within HandleReceivedData -- so a single
296        // buffer should be good enough. Before HandleReceivedData
297        // returns the data must be copied to a "safe" place.
298
299        // fSocket.get_io_service()/*fEventQueue*/.stop();
300
301        ConnectionEstablished();
302        return;
303    }
304
305    // If returning from run will lead to deletion of this
306    // instance, close() is not needed (maybe implicitly called).
307    // If run is called again, close() is needed here. Otherwise:
308    // Software caused connection abort when we try to resolve
309    // the endpoint again.
310    CloseImp(false);
311    //fSocket.close();
312
313    // 111: Connection refused
314    if (1/*error!=bs::error_code(111, bs::system_category)*/)
315    {
316        stringstream msg;
317        if (URL()!=":")
318            msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
319
320        if (fErrConnect!=msg.str())
321        {
322            fMsgConnect = "";
323            fErrConnect = msg.str();
324            Warn(fErrConnect);
325        }
326    }
327
328    // Go on with the next
329    if (endpoint_iterator != tcp::resolver::iterator())
330    {
331        AsyncConnect(endpoint_iterator);
332        return;
333    }
334
335    // No more entries to try, if we would not put anything else
336    // into the queue anymore it would now return (run() would return)
337
338    // Since we don't want to block the main loop, we wait using an
339    // asnychronous timer
340
341    // FIXME: Should we move this before AsyncConnect() ?
342    AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
343}
344
345// FIXME: Async connect should get address and port as an argument
346void Connection::StartConnect()
347{
348    fConnectionStatus = 1;
349
350    tcp::resolver resolver(get_io_service());
351
352    boost::system::error_code ec;
353
354    tcp::resolver::query query(fAddress, fPort);
355    tcp::resolver::iterator iterator = resolver.resolve(query, ec);
356
357    stringstream msg;
358    if (!fAddress.empty() || !fPort.empty() || ec)
359        msg << "Trying to connect to " << URL() << "...";
360
361    if (ec)
362        msg << " " << ec.message() << " (" << ec << ")";
363
364    // Only output message if it has changed
365    if (fMsgConnect!=msg.str())
366    {
367        fMsgConnect = msg.str();
368        ec ? Error(msg) : Message(msg);
369    }
370
371    if (ec)
372        AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
373    else
374        // Start connection attempts (will also reset deadline counter)
375        AsyncConnect(iterator);
376}
377
378void Connection::SetEndpoint(const string &addr, int port)
379{
380    if (fConnectionStatus>=1)
381        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
382
383    fAddress = addr;
384    fPort    = lexical_cast<string>(port);
385}
386
387void Connection::SetEndpoint(const string &addr, const string &port)
388{
389    if (fConnectionStatus>=1 && URL()!=":")
390        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
391
392    fAddress = addr;
393    fPort    = port;
394}
395
396void Connection::SetEndpoint(const string &addr)
397{
398    const size_t p0 = addr.find_first_of(':');
399    const size_t p1 = addr.find_last_of(':');
400
401    if (p0==string::npos || p0!=p1)
402    {
403        Error("Connection::SetEndPoint - Wrong format of argument.");
404        return;
405    }
406
407    SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
408}
409
410
411
412Connection::Connection(ba::io_service& ioservice, ostream &out) :
413MessageImp(out), tcp::socket(ioservice),
414fLog(0), //fAddress("localhost"), fPort("5000"),
415fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
416fConnectionStatus(0)
417{
418}
419/*
420Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
421tcp::socket(ioservice),
422fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
423fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
424fConnectionStatus(0)
425{
426}
427
428Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
429tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
430fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
431fConnectionStatus(0)
432{
433}
434*/
Note: See TracBrowser for help on using the repository browser.