source: trunk/FACT++/src/Connection.cc @ 10268

Last change on this file since 10268 was 10268, checked in by tbretz, 9 years ago
Some improvements to the connection handling, added some PostMessage functions which automatically convert to network byte order.
File size: 12.8 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba    = boost::asio;
16namespace bs    = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23    // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const char *txt, int qos)
26{
27    if (fLog)
28        return fLog->Write(t, txt, qos);
29
30    return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(ba::mutable_buffers_1 buffers, int type)
34{
35    ba::async_read(*this, buffers,
36                   boost::bind(&Connection::HandleReceivedData, this,
37                               dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(ba::mutable_buffers_1 buffers)
41{
42    ba::async_write(*this, buffers,
43                    boost::bind(&Connection::HandleSentData, this,
44                                dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48                           void (Connection::*handler)(const bs::error_code&))
49{
50    // - The boost::asio::basic_deadline_timer::expires_from_now()
51    //   function cancels any pending asynchronous waits, and returns
52    //   the number of asynchronous waits that were cancelled. If it
53    //   returns 0 then you were too late and the wait handler has
54    //   already been executed, or will soon be executed. If it
55    //   returns 1 then the wait handler was successfully cancelled.
56    // - If a wait handler is cancelled, the bs::error_code passed to
57    //   it contains the value bs::error::operation_aborted.
58    timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60    timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65    //cout << "Start async connect(...)" << endl;
66    fConnectionStatus = 1;
67
68    tcp::endpoint endpoint = *iterator;
69
70    // AsyncConnect + Deadline
71    async_connect(endpoint,
72                  boost::bind(&Connection::ConnectImp,
73                              this, ba::placeholders::error,
74                              ++iterator));
75
76    // We will get a "Connection timeout anyway"
77    //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
78}
79
80// ------------------------ close --------------------------
81// close from another thread
82void Connection::CloseImp(bool restart)
83{
84    if (IsConnected())
85    {
86        stringstream str;
87        str << "Connection closed to " << URL() << ".";// << endl;
88        Message(str);
89    }
90
91    // Stop any pending connection attempt
92    fConnectionTimer.cancel();
93
94    // Close possible open connections
95    close();
96
97    // Reset the connection status
98    fConnectionStatus = 0;
99
100    // Stop deadline counters
101    fInTimeout.cancel();
102    fOutTimeout.cancel();
103
104    if (!restart || IsConnecting())
105        return;
106
107    // We need some timeout before reconnecting!
108    // And we have to check if we are alreayd trying to connect
109    // We shoudl wait until all operations in progress were canceled
110
111    // Start trying to reconnect
112    fMsgConnect = "";
113    StartConnect();
114}
115
116void Connection::PostClose(bool restart)
117{
118    get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
119}
120
121// ------------------------ write --------------------------
122void Connection::HandleWriteTimeout(const bs::error_code &error)
123{
124    // 125: Operation canceled
125    if (error && error!=bs::error_code(125, bs::system_category))
126    {
127        stringstream str;
128        str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
129        Error(str);
130    }
131
132    if (!is_open())
133    {
134        // For example: Here we could schedule a new accept if we
135        // would not want to allow two connections at the same time.
136        return;
137    }
138
139    // Check whether the deadline has passed. We compare the deadline
140    // against the current time since a new asynchronous operation
141    // may have moved the deadline before this actor had a chance
142    // to run.
143    if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
144        return;
145
146    Error("fOutTimeout has expired, writing data to "+URL());
147
148    CloseImp();
149}
150
151void Connection::HandleSentData(const bs::error_code& error, size_t)
152{
153    if (error)
154    {
155        stringstream str;
156        str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
157        Error(str);
158
159        CloseImp();
160        return;
161    }
162
163    Message("Data successfully sent to "+URL());
164
165    // This is "thread" safe because SendMessage and HandleSentMessage
166    // are serialized in the EventQueue. Note: Do not call these
167    // functions directly from any other place then Handlers, use
168    // PostMessage instead
169    fOutQueue.pop_front();
170
171    if (fOutQueue.empty())
172    {
173        // Queue went empty, remove deadline
174        fOutTimeout.cancel();
175        return;
176    }
177
178    // AsyncWrite + Deadline
179    AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
180    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
181}
182
183void Connection::SendMessageImp(const vector<char> &msg)
184{
185    /*
186    if (!fConnectionEstablished)
187    {
188        UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
189        return;
190    }*/
191
192    const bool first_message_in_queue = fOutQueue.empty();
193
194    // This is "thread" safe because SendMessage and HandleSentMessage
195    // are serialized in the EventQueue. Note: Do not call these
196    // functions directly from any other place then Handlers, use
197    // PostMessage instead
198    fOutQueue.push_back(msg);
199
200    if (!first_message_in_queue)
201        return;
202
203    // AsyncWrite + Deadline
204    AsyncWrite(ba::buffer(fOutQueue.front())/*, &Connection::HandleSentData*/);
205    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
206}
207
208void Connection::PostMessage(vector<uint16_t> inp)
209{
210    // Convert to network byte order
211    for_each(inp.begin(), inp.end(), htons);
212
213    // FIXME FIXME
214
215    PostMessage((char*)&inp.front(), sizeof(uint16_t)*inp.size());
216
217//    const vector<char> msg((char*)&inp.front(), (char*)&inp.last()+1);
218//    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
219}
220
221void Connection::PostMessage(const vector<char> &msg)
222{
223    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
224}
225
226void Connection::PostMessage(const void *ptr, size_t max)
227{
228    const vector<char> msg(reinterpret_cast<const char*>(ptr),
229                           reinterpret_cast<const char*>(ptr)+max);
230
231    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
232}
233
234void Connection::PostMessage(const string &cmd, size_t max)
235{
236    if (max==size_t(-1))
237        max = cmd.length()+1;
238
239    vector <char>msg(max);
240
241    for (unsigned int i=0; i<max; i++)
242        msg[i] = 0;
243
244    for (unsigned int i=0; i<min(cmd.length()+1, max); i++)
245        msg[i] = cmd[i];
246
247    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
248}
249
250void Connection::HandleConnectionTimer(const bs::error_code &error)
251{
252    // 125: Operation canceled
253    if (error && error!=bs::error_code(125, bs::system_category))
254    {
255        stringstream str;
256        str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
257        Error(str);
258    }
259
260    if (is_open())
261    {
262        // For example: Here we could schedule a new accept if we
263        // would not want to allow two connections at the same time.
264        return;
265    }
266
267    // Check whether the deadline has passed. We compare the deadline
268    // against the current time since a new asynchronous operation
269    // may have moved the deadline before this actor had a chance
270    // to run.
271    if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
272        StartConnect();
273}
274
275void Connection::ConnectImp(const bs::error_code& error,
276                            tcp::resolver::iterator endpoint_iterator)
277{
278    // Connection established
279    if (!error)
280    {
281        Message("Connection established to "+URL()+"...");
282
283        // Initialize hardware...
284        // Set state to : connected/undefined until we get the first message
285        //                (send request for configuration) ?
286        //                (send configuration?)
287        fMsgConnect = "";
288        fErrConnect = "";
289
290        fConnectionStatus = 2;
291        //StartAsyncRead();
292
293        // We can create the buffer here and delete it in
294        // Handle Read. However, the read buffer is not filled again
295        // before it was not read within HandleReceivedData -- so a single
296        // buffer should be good enough. Before HandleReceivedData
297        // returns the data must be copied to a "safe" place.
298
299        // fSocket.get_io_service()/*fEventQueue*/.stop();
300
301        ConnectionEstablished();
302        return;
303    }
304
305    // If returning from run will lead to deletion of this
306    // instance, close() is not needed (maybe implicitly called).
307    // If run is called again, close() is needed here. Otherwise:
308    // Software caused connection abort when we try to resolve
309    // the endpoint again.
310    CloseImp(false);
311    //fSocket.close();
312
313    // 111: Connection refused
314    if (1/*error!=bs::error_code(111, bs::system_category)*/)
315    {
316        stringstream msg;
317        if (URL()!=":")
318            msg << "Connecting to " << URL() << ": " << error.message() << " (" << error << ")";
319
320        if (fErrConnect!=msg.str())
321        {
322            fMsgConnect = "";
323            fErrConnect = msg.str();
324            Warn(fErrConnect);
325        }
326    }
327
328    // Go on with the next
329    if (endpoint_iterator != tcp::resolver::iterator())
330    {
331        AsyncConnect(endpoint_iterator);
332        return;
333    }
334
335    // No more entries to try, if we would not put anything else
336    // into the queue anymore it would now return (run() would return)
337
338    // Since we don't want to block the main loop, we wait using an
339    // asnychronous timer
340
341    // FIXME: Should we move this before AsyncConnect() ?
342    AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
343}
344
345// FIXME: Async connect should get address and port as an argument
346void Connection::StartConnect()
347{
348    fConnectionStatus = 1;
349
350    tcp::resolver resolver(get_io_service());
351
352    boost::system::error_code ec;
353
354    tcp::resolver::query query(fAddress, fPort);
355    tcp::resolver::iterator iterator = resolver.resolve(query, ec);
356
357    stringstream msg;
358    if (!fAddress.empty() || !fPort.empty() || ec)
359        msg << "Trying to connect to " << URL() << "...";
360
361    if (ec)
362        msg << " " << ec.message() << " (" << ec << ")";
363
364    // Only output message if it has changed
365    if (fMsgConnect!=msg.str())
366    {
367        fMsgConnect = msg.str();
368        ec ? Error(msg) : Message(msg);
369    }
370
371    if (ec)
372        AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
373    else
374        // Start connection attempts (will also reset deadline counter)
375        AsyncConnect(iterator);
376}
377
378void Connection::SetEndpoint(const string &addr, int port)
379{
380    if (fConnectionStatus>=1)
381        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
382
383    fAddress = addr;
384    fPort    = lexical_cast<string>(port);
385}
386
387void Connection::SetEndpoint(const string &addr, const string &port)
388{
389    if (fConnectionStatus>=1 && URL()!=":")
390        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
391
392    fAddress = addr;
393    fPort    = port;
394}
395
396void Connection::SetEndpoint(const string &addr)
397{
398    const size_t p0 = addr.find_first_of(':');
399    const size_t p1 = addr.find_last_of(':');
400
401    if (p0==string::npos || p0!=p1)
402    {
403        Error("Connection::SetEndPoint - Wrong format of argument.");
404        return;
405    }
406
407    SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
408}
409
410
411
412Connection::Connection(ba::io_service& ioservice, ostream &out) :
413MessageImp(out), tcp::socket(ioservice),
414fLog(0), //fAddress("localhost"), fPort("5000"),
415fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
416fConnectionStatus(0)
417{
418}
419/*
420Connection::Connection(ba::io_service& ioservice, const string &addr, int port) :
421tcp::socket(ioservice),
422fLog(0), fAddress(addr), fPort(lexical_cast<string>(port)),
423fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
424fConnectionStatus(0)
425{
426}
427
428Connection::Connection(ba::io_service& ioservice, const string &addr, const string &port) :
429tcp::socket(ioservice), fLog(0), fAddress(addr), fPort(port),
430fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
431fConnectionStatus(0)
432{
433}
434*/
Note: See TracBrowser for help on using the repository browser.