source: trunk/FACT++/src/Connection.cc @ 10284

Last change on this file since 10284 was 10284, checked in by tbretz, 9 years ago
- Added enum ConnectionStatus_t - change fConnectionStatus type to ConnectionStatus_t - removed obsolete fReadBuffer - added const qualifiers to const_buffer_1 in AsyncWrite - added Out() member function - (temporarily?) removed some PostMessage member functions - removed host and port from constructors for simplification - improved how connections are established - improved output of host name during connection attempts
File size: 11.3 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "Connection.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba    = boost::asio;
16namespace bs    = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::ip::tcp;
21
22
23    // -------- Abbreviations for starting async tasks ---------
24
25int Connection::Write(const Time &t, const char *txt, int qos)
26{
27    if (fLog)
28        return fLog->Write(t, txt, qos);
29
30    return MessageImp::Write(t, txt, qos);
31}
32
33void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
34{
35    ba::async_read(*this, buffers,
36                   boost::bind(&Connection::HandleReceivedData, this,
37                               dummy::error, dummy::bytes_transferred, type));
38}
39
40void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
41{
42    ba::async_write(*this, buffers,
43                    boost::bind(&Connection::HandleSentData, this,
44                                dummy::error, dummy::bytes_transferred));
45}
46
47void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
48                           void (Connection::*handler)(const bs::error_code&))
49{
50    // - The boost::asio::basic_deadline_timer::expires_from_now()
51    //   function cancels any pending asynchronous waits, and returns
52    //   the number of asynchronous waits that were cancelled. If it
53    //   returns 0 then you were too late and the wait handler has
54    //   already been executed, or will soon be executed. If it
55    //   returns 1 then the wait handler was successfully cancelled.
56    // - If a wait handler is cancelled, the bs::error_code passed to
57    //   it contains the value bs::error::operation_aborted.
58    timer.expires_from_now(boost::posix_time::milliseconds(millisec));
59
60    timer.async_wait(boost::bind(handler, this, dummy::error));
61}
62
63void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64{
65    tcp::endpoint endpoint = *iterator;
66
67    // AsyncConnect + Deadline
68    async_connect(endpoint,
69                  boost::bind(&Connection::ConnectImp,
70                              this, ba::placeholders::error,
71                              iterator));
72
73    // We will get a "Connection timeout anyway"
74    //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
75}
76
77// ------------------------ close --------------------------
78// close from another thread
79void Connection::CloseImp(bool restart)
80{
81    if (IsConnected())
82    {
83        stringstream str;
84        str << "Connection closed to " << URL() << ".";
85        Message(str);
86    }
87
88    // Stop any pending connection attempt
89    fConnectionTimer.cancel();
90
91    // Close possible open connections
92    close();
93
94    // Reset the connection status
95    fConnectionStatus = kDisconnected;
96
97    // Stop deadline counters
98    fInTimeout.cancel();
99    fOutTimeout.cancel();
100
101    // Empty output queue
102    fOutQueue.clear();
103
104    if (!restart || IsConnecting())
105        return;
106
107    // We need some timeout before reconnecting!
108    // And we have to check if we are alreayd trying to connect
109    // We shoudl wait until all operations in progress were canceled
110
111    // Start trying to reconnect
112    fMsgConnect = "";
113    fErrConnect = "";
114    StartConnect();
115}
116
117void Connection::PostClose(bool restart)
118{
119    get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
120}
121
122// ------------------------ write --------------------------
123void Connection::HandleWriteTimeout(const bs::error_code &error)
124{
125    // 125: Operation canceled (bs::error_code(125, bs::system_category))
126    if (error && error!=ba::error::basic_errors::operation_aborted)
127    {
128        stringstream str;
129        str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
130        Error(str);
131
132        CloseImp();
133        return;
134    }
135
136    if (!is_open())
137    {
138        // For example: Here we could schedule a new accept if we
139        // would not want to allow two connections at the same time.
140        return;
141    }
142
143    // Check whether the deadline has passed. We compare the deadline
144    // against the current time since a new asynchronous operation
145    // may have moved the deadline before this actor had a chance
146    // to run.
147    if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
148        return;
149
150    Error("fOutTimeout has expired, writing data to "+URL());
151
152    CloseImp();
153}
154
155void Connection::HandleSentData(const bs::error_code& error, size_t n)
156{
157    if (error)
158    {
159        stringstream str;
160        str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
161        Error(str);
162
163        CloseImp();
164        return;
165    }
166
167    stringstream msg;
168    msg << n << " bytes successfully sent to " << URL();
169    Message(msg);
170
171    // This is "thread" safe because SendMessage and HandleSentMessage
172    // are serialized in the EventQueue. Note: Do not call these
173    // functions directly from any other place then Handlers, use
174    // PostMessage instead
175    fOutQueue.pop_front();
176
177    if (fOutQueue.empty())
178    {
179        // Queue went empty, remove deadline
180        fOutTimeout.cancel();
181        return;
182    }
183
184    // AsyncWrite + Deadline
185    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
186    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
187}
188
189// It is important that when SendMessageImp is called, or to be more
190// precise boost::bind is called, teh data is copied!
191void Connection::SendMessageImp(const vector<char> msg)
192{
193    /*
194    if (!fConnectionEstablished)
195    {
196        UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
197        return;
198    }*/
199
200    const bool first_message_in_queue = fOutQueue.empty();
201
202    // This is "thread" safe because SendMessage and HandleSentMessage
203    // are serialized in the EventQueue. Note: Do not call these
204    // functions directly from any other place then Handlers, use
205    // PostMessage instead
206    fOutQueue.push_back(msg);
207
208    if (!first_message_in_queue)
209        return;
210
211    // AsyncWrite + Deadline
212    AsyncWrite(ba::const_buffers_1(&fOutQueue.front()[0],fOutQueue.front().size())/*, &Connection::HandleSentData*/);
213    AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
214}
215
216void Connection::PostMessage(const void *ptr, size_t max)
217{
218    const vector<char> msg(reinterpret_cast<const char*>(ptr),
219                           reinterpret_cast<const char*>(ptr)+max);
220
221    get_io_service().post(boost::bind(&Connection::SendMessageImp, this, msg));
222}
223
224void Connection::PostMessage(const string &cmd, size_t max)
225{
226    if (max==size_t(-1))
227        max = cmd.length()+1;
228
229    vector <char>msg(max);
230
231    copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
232
233    PostMessage(msg);
234}
235
236void Connection::HandleConnectionTimer(const bs::error_code &error)
237{
238    // 125: Operation canceled
239    if (error && error!=ba::error::basic_errors::operation_aborted)
240    {
241        stringstream str;
242        str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
243        Error(str);
244    }
245
246    if (is_open())
247    {
248        // For example: Here we could schedule a new accept if we
249        // would not want to allow two connections at the same time.
250        return;
251    }
252
253    // Check whether the deadline has passed. We compare the deadline
254    // against the current time since a new asynchronous operation
255    // may have moved the deadline before this actor had a chance
256    // to run.
257    if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
258        StartConnect();
259}
260
261void Connection::ConnectImp(const bs::error_code& error,
262                            tcp::resolver::iterator iterator)
263{
264    tcp::endpoint endpoint = *iterator;
265
266    const string host = endpoint.port()==0 ? "" :
267        endpoint.address().to_string()+":"+lexical_cast<string>(endpoint.port());
268
269    // Connection established
270    if (!error)
271    {
272        Message("Connection established to "+host+"...");
273
274        fConnectionStatus = kConnected;
275
276        ConnectionEstablished();
277        return;
278    }
279
280    // If returning from run will lead to deletion of this
281    // instance, close() is not needed (maybe implicitly called).
282    // If run is called again, close() is needed here. Otherwise:
283    // Software caused connection abort when we try to resolve
284    // the endpoint again.
285    CloseImp(false);
286
287    stringstream msg;
288    if (!host.empty())
289        msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
290
291    if (fErrConnect!=msg.str())
292    {
293        if (error!=ba::error::basic_errors::connection_refused)
294            fMsgConnect = "";
295        fErrConnect = msg.str();
296        Warn(fErrConnect);
297    }
298
299    // Go on with the next
300    if (++iterator != tcp::resolver::iterator())
301    {
302        AsyncConnect(iterator);
303        return;
304    }
305
306    // No more entries to try, if we would not put anything else
307    // into the queue anymore it would now return (run() would return)
308
309    // Since we don't want to block the main loop, we wait using an
310    // asnychronous timer
311
312    // FIXME: Should we move this before AsyncConnect() ?
313    AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
314}
315
316// FIXME: Async connect should get address and port as an argument
317void Connection::StartConnect()
318{
319    fConnectionStatus = kConnecting;
320
321    tcp::resolver resolver(get_io_service());
322
323    boost::system::error_code ec;
324
325    tcp::resolver::query query(fAddress, fPort);
326    tcp::resolver::iterator iterator = resolver.resolve(query, ec);
327
328    stringstream msg;
329    if (!fAddress.empty() || !fPort.empty() || ec)
330        msg << "Trying to connect to " << URL() << "...";
331
332    if (ec)
333        msg << " " << ec.message() << " (" << ec << ")";
334
335    // Only output message if it has changed
336    if (fMsgConnect!=msg.str())
337    {
338        fMsgConnect = msg.str();
339        ec ? Error(msg) : Message(msg);
340    }
341
342    if (ec)
343        AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
344    else
345        // Start connection attempts (will also reset deadline counter)
346        AsyncConnect(iterator);
347}
348
349void Connection::SetEndpoint(const string &addr, int port)
350{
351    if (fConnectionStatus>=1)
352        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
353
354    fAddress = addr;
355    fPort    = lexical_cast<string>(port);
356}
357
358void Connection::SetEndpoint(const string &addr, const string &port)
359{
360    if (fConnectionStatus>=1 && URL()!=":")
361        Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
362
363    fAddress = addr;
364    fPort    = port;
365}
366
367void Connection::SetEndpoint(const string &addr)
368{
369    const size_t p0 = addr.find_first_of(':');
370    const size_t p1 = addr.find_last_of(':');
371
372    if (p0==string::npos || p0!=p1)
373    {
374        Error("Connection::SetEndPoint - Wrong format of argument ('host:port' expected)");
375        return;
376    }
377
378    SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
379}
380
381
382
383Connection::Connection(ba::io_service& ioservice, ostream &out) :
384MessageImp(out), tcp::socket(ioservice), fLog(0),
385fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
386fConnectionStatus(kDisconnected)
387{
388}
Note: See TracBrowser for help on using the repository browser.