1 | // **************************************************************************
/** @class ConnectionUSB

@brief Maintains an asynchronous serial-port (USB) client connection
5 |
6 | */
7 | // **************************************************************************
8 | #include "ConnectionUSB.h"
9 |
10 | #include <boost/bind.hpp>
11 |
12 | using namespace std;
13 |
14 | namespace ba = boost::asio;
15 | namespace bs = boost::system;
16 | namespace dummy = ba::placeholders;
17 |
18 | using ba::serial_port_base;
19 |
20 | //#define DEBUG_TX
21 | //#define DEBUG
22 |
23 | #ifdef DEBUG
24 | #include <fstream>
25 | #include <iomanip>
26 | #include "Time.h"
27 | #endif
28 |
29 | // -------- Abbreviations for starting async tasks ---------
30 |
31 | int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32 | {
33 | if (fLog)
34 | return fLog->Write(t, txt, qos);
35 |
36 | return MessageImp::Write(t, txt, qos);
37 | }
38 |
39 | void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40 | {
41 | ba::async_read(*this, buffers,
42 | boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 | dummy::error, dummy::bytes_transferred, type, counter));
44 | }
45 |
46 | void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47 | {
48 | ba::async_write(*this, buffers,
49 | boost::bind(&ConnectionUSB::HandleSentData, this,
50 | dummy::error, dummy::bytes_transferred));
51 | }
52 |
53 | void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 | void (ConnectionUSB::*handler)(const bs::error_code&))
55 | {
56 | // - The boost::asio::basic_deadline_timer::expires_from_now()
57 | // function cancels any pending asynchronous waits, and returns
58 | // the number of asynchronous waits that were cancelled. If it
59 | // returns 0 then you were too late and the wait handler has
60 | // already been executed, or will soon be executed. If it
61 | // returns 1 then the wait handler was successfully cancelled.
62 | // - If a wait handler is cancelled, the bs::error_code passed to
63 | // it contains the value bs::error::operation_aborted.
64 | timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65 |
66 | timer.async_wait(boost::bind(handler, this, dummy::error));
67 | }
68 |
69 | // ------------------------ close --------------------------
70 | // close from another thread
71 | void ConnectionUSB::CloseImp(int64_t delay)
72 | {
73 | if (IsConnected())
74 | Info("Closing connection to "+URL()+".");
75 |
76 | // Close possible open connections
77 | bs::error_code ec;
78 | cancel(ec);
79 | if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80 | {
81 | ostringstream msg;
82 | msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83 | Error(msg);
84 | }
85 |
86 | if (IsConnected())
87 | {
88 | close(ec);
89 | if (ec)
90 | {
91 | ostringstream msg;
92 | msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93 | Error(msg);
94 | }
95 | else
96 | Info("Closed connection to "+URL()+" succesfully.");
97 | }
98 |
99 | // Stop deadline counters
100 | fInTimeout.cancel();
101 | fOutTimeout.cancel();
102 | fConnectTimeout.cancel();
103 |
104 | // Reset the connection status
105 | fQueueSize = 0;
106 | fConnectionStatus = kDisconnected;
107 |
108 | #ifdef DEBUG
109 | ofstream fout1("transmitted.txt", ios::app);
110 | ofstream fout2("received.txt", ios::app);
111 | ofstream fout3("send.txt", ios::app);
112 | fout1 << Time() << ": ---" << endl;
113 | fout2 << Time() << ": ---" << endl;
114 | fout3 << Time() << ": ---" << endl;
115 | #endif
116 |
117 | if (delay<0 || IsConnecting())
118 | return;
119 |
120 | // We need some timeout before reconnecting!
121 | // And we have to check if we are alreayd trying to connect
122 | // We should wait until all operations in progress were canceled
123 | fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
124 | fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
125 | }
126 |
127 | void ConnectionUSB::PostClose(int64_t delay)
128 | {
129 | get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
130 | }
131 |
132 | void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
133 | {
134 | if (error==ba::error::basic_errors::operation_aborted)
135 | return;
136 |
137 | // 125: Operation canceled (bs::error_code(125, bs::system_category))
138 | if (error)
139 | {
140 | ostringstream str;
141 | str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
142 | Error(str);
143 |
144 | CloseImp(-1);
145 | return;
146 | }
147 |
148 |
149 | if (is_open())
150 | {
151 | Error("HandleReconnectTimeout - "+URL()+" is already open.");
152 | return;
153 | }
154 |
155 | // Check whether the deadline has passed. We compare the deadline
156 | // against the current time since a new asynchronous operation
157 | // may have moved the deadline before this actor had a chance
158 | // to run.
159 | if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
160 | return;
161 |
162 | // Start trying to reconnect
163 | Connect();
164 | }
165 |
166 |
167 | // ------------------------ write --------------------------
168 | void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
169 | {
170 | if (error==ba::error::basic_errors::operation_aborted)
171 | return;
172 |
173 | // 125: Operation canceled (bs::error_code(125, bs::system_category))
174 | if (error)
175 | {
176 | ostringstream str;
177 | str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178 | Error(str);
179 |
180 | CloseImp(-1);
181 | return;
182 | }
183 |
184 | if (!is_open())
185 | {
186 | // For example: Here we could schedule a new accept if we
187 | // would not want to allow two connections at the same time.
188 | return;
189 | }
190 |
191 | // Check whether the deadline has passed. We compare the deadline
192 | // against the current time since a new asynchronous operation
193 | // may have moved the deadline before this actor had a chance
194 | // to run.
195 | if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
196 | return;
197 |
198 | Error("fOutTimeout has expired, writing data to "+URL());
199 |
200 | CloseImp(-1);
201 | }
202 |
203 | void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
204 | {
205 | if (error==ba::error::basic_errors::operation_aborted)
206 | return;
207 |
208 | if (error && error != ba::error::not_connected)
209 | {
210 | ostringstream str;
211 | str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
212 | Error(str);
213 |
214 | CloseImp(-1);
215 | return;
216 | }
217 |
218 | if (error == ba::error::not_connected)
219 | {
220 | ostringstream msg;
221 | msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
222 | Warn(msg);
223 | return;
224 | }
225 |
226 | if (--fQueueSize==0)
227 | fOutTimeout.cancel();
228 |
229 | #ifdef DEBUG_TX
230 | ostringstream msg;
231 | msg << n << " bytes successfully sent to " << URL();
232 | Message(msg);
233 | #endif
234 |
235 | #ifdef DEBUG
236 | ofstream fout("transmitted.txt", ios::app);
237 | fout << Time() << ": ";
238 | for (unsigned int i=0; i<fOutQueue.front().size(); i++)
239 | fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
240 | fout << endl;
241 | #endif
242 |
243 | HandleTransmittedData(n);
244 | }
245 |
246 | void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
247 | {
248 | // This function can be called from a different thread...
249 | if (!is_open())
250 | return;
251 |
252 | // ... this is why we have to increase fQueueSize first
253 | fQueueSize++;
254 |
255 | // ... and shift the deadline timer
256 | // This is not ideal, because if we are continously
257 | // filling the buffer, it will never timeout
258 | AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
259 |
260 | // Now we can schedule the buffer to be sent
261 | AsyncWrite(ba::const_buffers_1(ptr, sz));
262 | }
263 |
264 | void ConnectionUSB::PostMessage(const string &cmd, size_t max)
265 | {
266 | if (max==size_t(-1))
267 | max = cmd.length()+1;
268 |
269 | PostMessage(cmd.c_str(), min(cmd.length()+1, max));
270 | }
271 |
272 | void ConnectionUSB::Connect(int _baud_rate, int _character_size,
273 | parity::type _parity, stop_bits::type _stop_bits,
274 | flow_control::type _flow_control)
275 | {
276 | fConnectionStatus = kConnecting;
277 |
278 | Info("Connecting to "+URL()+".");
279 |
280 | bs::error_code ec;
281 | open(URL(), ec);
282 |
283 | if (ec)
284 | {
285 | ostringstream msg;
286 | msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
287 | Error(msg);
288 | fConnectionStatus = kDisconnected;
289 | return;
290 | }
291 |
292 | Info("Connection established.");
293 |
294 | try
295 | {
296 | set_option(baud_rate(_baud_rate));
297 | set_option(character_size(_character_size));
298 | set_option(parity(_parity));
299 | set_option(stop_bits(_stop_bits));
300 | set_option(flow_control(_flow_control));
301 | /*
302 | Info("Setting Baud Rate");
303 | set_option(fBaudRate);
304 | Info("Setting Character Size");
305 | set_option(fCharacterSize);
306 | Info("Setting Parity");
307 | set_option(fParity);
308 | Info("Setting Sop Bits");
309 | set_option(fStopBits);
310 | Info("Setting Flow control");
311 | set_option(fFlowControl);
312 | */
313 | }
314 | catch (const bs::system_error &erc)
315 | {
316 | Error(string("Setting connection options: ")+erc.what());
317 | close();
318 | return;
319 | }
320 |
321 | fQueueSize = 0;
322 | fConnectionStatus = kConnected;
323 |
324 | ConnectionEstablished();
325 | }
326 |
327 | void ConnectionUSB::SetEndpoint(const string &addr)
328 | {
329 | if (fConnectionStatus>=1)
330 | Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
331 |
332 | fAddress = "/dev/"+addr;
333 | }
334 |
335 |
336 | ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
337 | MessageImp(out), ba::serial_port(ioservice), fLog(0),
338 | /*
339 | fBaudRate(115200),
340 | fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
341 | fFlowControl(flow_control::none),
342 | */
343 | fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
344 | fQueueSize(0), fConnectionStatus(kDisconnected)
345 | {
346 | }