source: trunk/FACT++/src/ConnectionUSB.cc@ 13414

Last change on this file since 13414 was 13294, checked in by tbretz, 13 years ago
Added an automatic reconnect under certain conditions.
File size: 10.5 KB
Line 
// **************************************************************************
/** @class ConnectionUSB

@brief Maintains an asynchronous serial-port (USB) client connection

*/
// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21//#define DEBUG
22
23#ifdef DEBUG
24#include <fstream>
25#include <iomanip>
26#include "Time.h"
27#endif
28
29// -------- Abbreviations for starting async tasks ---------
30
31int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32{
33 if (fLog)
34 return fLog->Write(t, txt, qos);
35
36 return MessageImp::Write(t, txt, qos);
37}
38
39void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40{
41 ba::async_read(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 dummy::error, dummy::bytes_transferred, type, counter));
44}
45
46void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47{
48 ba::async_write(*this, buffers,
49 boost::bind(&ConnectionUSB::HandleSentData, this,
50 dummy::error, dummy::bytes_transferred));
51}
52
53void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 void (ConnectionUSB::*handler)(const bs::error_code&))
55{
56 // - The boost::asio::basic_deadline_timer::expires_from_now()
57 // function cancels any pending asynchronous waits, and returns
58 // the number of asynchronous waits that were cancelled. If it
59 // returns 0 then you were too late and the wait handler has
60 // already been executed, or will soon be executed. If it
61 // returns 1 then the wait handler was successfully cancelled.
62 // - If a wait handler is cancelled, the bs::error_code passed to
63 // it contains the value bs::error::operation_aborted.
64 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65
66 timer.async_wait(boost::bind(handler, this, dummy::error));
67}
68
69// ------------------------ close --------------------------
70// close from another thread
71void ConnectionUSB::CloseImp(int64_t delay)
72{
73 if (IsConnected())
74 Info("Closing connection to "+URL()+".");
75
76 // Close possible open connections
77 bs::error_code ec;
78 cancel(ec);
79 if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80 {
81 ostringstream msg;
82 msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83 Error(msg);
84 }
85
86 if (IsConnected())
87 {
88 close(ec);
89 if (ec)
90 {
91 ostringstream msg;
92 msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93 Error(msg);
94 }
95 else
96 Info("Closed connection to "+URL()+" succesfully.");
97 }
98
99 // Stop deadline counters
100 fInTimeout.cancel();
101 fOutTimeout.cancel();
102 fConnectTimeout.cancel();
103
104 // Reset the connection status
105 fConnectionStatus = kDisconnected;
106
107 // Empty output queue
108 fOutQueue.clear();
109
110#ifdef DEBUG
111 ofstream fout1("transmitted.txt", ios::app);
112 ofstream fout2("received.txt", ios::app);
113 ofstream fout3("send.txt", ios::app);
114 fout1 << Time() << ": ---" << endl;
115 fout2 << Time() << ": ---" << endl;
116 fout3 << Time() << ": ---" << endl;
117#endif
118
119 if (delay<0 || IsConnecting())
120 return;
121
122 // We need some timeout before reconnecting!
123 // And we have to check if we are alreayd trying to connect
124 // We should wait until all operations in progress were canceled
125 fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
126 fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
127}
128
129void ConnectionUSB::PostClose(int64_t delay)
130{
131 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
132}
133
134void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
135{
136 if (error==ba::error::basic_errors::operation_aborted)
137 return;
138
139 // 125: Operation canceled (bs::error_code(125, bs::system_category))
140 if (error)
141 {
142 ostringstream str;
143 str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
144 Error(str);
145
146 CloseImp(-1);
147 return;
148 }
149
150
151 if (is_open())
152 {
153 Error("HandleReconnectTimeout - "+URL()+" is already open.");
154 return;
155 }
156
157 // Check whether the deadline has passed. We compare the deadline
158 // against the current time since a new asynchronous operation
159 // may have moved the deadline before this actor had a chance
160 // to run.
161 if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
162 return;
163
164 // Start trying to reconnect
165 Connect();
166}
167
168
169// ------------------------ write --------------------------
170void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
171{
172 if (error==ba::error::basic_errors::operation_aborted)
173 return;
174
175 // 125: Operation canceled (bs::error_code(125, bs::system_category))
176 if (error)
177 {
178 ostringstream str;
179 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
180 Error(str);
181
182 CloseImp(-1);
183 return;
184 }
185
186 if (!is_open())
187 {
188 // For example: Here we could schedule a new accept if we
189 // would not want to allow two connections at the same time.
190 return;
191 }
192
193 // Check whether the deadline has passed. We compare the deadline
194 // against the current time since a new asynchronous operation
195 // may have moved the deadline before this actor had a chance
196 // to run.
197 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
198 return;
199
200 Error("fOutTimeout has expired, writing data to "+URL());
201
202 CloseImp(-1);
203}
204
205void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
206{
207 if (error && error != ba::error::not_connected)
208 {
209 ostringstream str;
210 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
211 Error(str);
212
213 CloseImp(-1);
214 return;
215 }
216
217 if (error == ba::error::not_connected)
218 {
219 ostringstream msg;
220 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
221 Warn(msg);
222 }
223 else
224 {
225#ifdef DEBUG_TX
226 ostringstream msg;
227 msg << n << " bytes successfully sent to " << URL();
228 Message(msg);
229#endif
230 }
231
232#ifdef DEBUG
233 ofstream fout("transmitted.txt", ios::app);
234 fout << Time() << ": ";
235 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
236 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
237 fout << endl;
238#endif
239
240 HandleTransmittedData(n);
241
242 // This is "thread" safe because SendMessage and HandleSentMessage
243 // are serialized in the EventQueue. Note: Do not call these
244 // functions directly from any other place then Handlers, use
245 // PostMessage instead
246 if (!fOutQueue.empty())
247 fOutQueue.pop_front();
248
249 if (fOutQueue.empty())
250 {
251 // Queue went empty, remove deadline
252 fOutTimeout.cancel();
253 return;
254 }
255
256 // AsyncWrite + Deadline
257 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
258 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
259}
260
// It is important that when SendMessageImp is called, or to be more
// precise when boost::bind is called, the data is copied!
263void ConnectionUSB::SendMessageImp(const vector<uint8_t> msg)
264{
265 /*
266 if (!fConnectionEstablished)
267 {
268 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
269 return;
270 }*/
271
272 const bool first_message_in_queue = fOutQueue.empty();
273
274 // This is "thread" safe because SendMessage and HandleSentMessage
275 // are serialized in the EventQueue. Note: Do not call these
276 // functions directly from any other place then Handlers, use
277 // PostMessage instead
278 fOutQueue.push_back(msg);
279
280 if (!first_message_in_queue)
281 return;
282
283#ifdef DEBUG
284 ofstream fout("send.txt", ios::app);
285 fout << Time() << ": ";
286 for (unsigned int i=0; i<msg.size(); i++)
287 fout << hex << setfill('0') << setw(2) << (uint32_t)msg[i];
288 fout << endl;
289#endif
290
291 // AsyncWrite + Deadline
292 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
293 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
294}
295
296void ConnectionUSB::PostMessage(const void *ptr, size_t max)
297{
298 const vector<uint8_t> msg(reinterpret_cast<const uint8_t*>(ptr),
299 reinterpret_cast<const uint8_t*>(ptr)+max);
300
301 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
302}
303
304void ConnectionUSB::PostMessage(const string &cmd, size_t max)
305{
306 if (max==size_t(-1))
307 max = cmd.length()+1;
308
309 vector <char>msg(max);
310
311 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
312
313 PostMessage(msg);
314}
315
316void ConnectionUSB::Connect()
317{
318 fConnectionStatus = kConnecting;
319
320 Info("Connecting to "+URL()+".");
321
322 bs::error_code ec;
323 open(URL(), ec);
324
325 if (ec)
326 {
327 ostringstream msg;
328 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
329 Error(msg);
330 fConnectionStatus = kDisconnected;
331 return;
332 }
333
334 Info("Connection established.");
335
336 try
337 {
338 set_option(fBaudRate);
339 set_option(fCharacterSize);
340 set_option(fParity);
341 set_option(fStopBits);
342 set_option(fFlowControl);
343 }
344 catch (const bs::system_error &erc)
345 {
346 Error(string("Setting connection options: ")+erc.what());
347 // CLOSE
348 return;
349 }
350
351 fConnectionStatus = kConnected;
352
353 ConnectionEstablished();
354}
355
356void ConnectionUSB::SetEndpoint(const string &addr)
357{
358 if (fConnectionStatus>=1)
359 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
360
361 fAddress = "/dev/"+addr;
362}
363
364
365ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
366MessageImp(out), ba::serial_port(ioservice), fLog(0),
367fBaudRate(115200),
368fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
369fFlowControl(flow_control::hardware),
370fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
371fConnectionStatus(kDisconnected)
372{
373}
Note: See TracBrowser for help on using the repository browser.