source: trunk/FACT++/src/ConnectionUSB.cc@ 20019

Last change on this file since 20019 was 20019, checked in by tbretz, 4 years ago
In favor of boost 1.70, replaced get_io_context
File size: 9.6 KB
Line 
1// **************************************************************************
2/** @class ConnectionUSB
3
4@brief Maintains an asynchronous serial-port (USB) client connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21//#define DEBUG
22
23#ifdef DEBUG
24#include <fstream>
25#include <iomanip>
26#include "Time.h"
27#endif
28
29// -------- Abbreviations for starting async tasks ---------
30
31int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32{
33 if (fLog)
34 return fLog->Write(t, txt, qos);
35
36 return MessageImp::Write(t, txt, qos);
37}
38
39void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40{
41 ba::async_read(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 dummy::error, dummy::bytes_transferred, type, counter));
44}
45
46void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47{
48 ba::async_write(*this, buffers,
49 boost::bind(&ConnectionUSB::HandleSentData, this,
50 dummy::error, dummy::bytes_transferred));
51}
52
53void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 void (ConnectionUSB::*handler)(const bs::error_code&))
55{
56 // - The boost::asio::basic_deadline_timer::expires_from_now()
57 // function cancels any pending asynchronous waits, and returns
58 // the number of asynchronous waits that were cancelled. If it
59 // returns 0 then you were too late and the wait handler has
60 // already been executed, or will soon be executed. If it
61 // returns 1 then the wait handler was successfully cancelled.
62 // - If a wait handler is cancelled, the bs::error_code passed to
63 // it contains the value bs::error::operation_aborted.
64 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65
66 timer.async_wait(boost::bind(handler, this, dummy::error));
67}
68
69// ------------------------ close --------------------------
70// close from another thread
71void ConnectionUSB::CloseImp(int64_t delay)
72{
73 if (IsConnected())
74 Info("Closing connection to "+URL()+".");
75
76 // Close possible open connections
77 bs::error_code ec;
78 cancel(ec);
79 if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80 {
81 ostringstream msg;
82 msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83 Error(msg);
84 }
85
86 if (IsConnected())
87 {
88 close(ec);
89 if (ec)
90 {
91 ostringstream msg;
92 msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93 Error(msg);
94 }
95 else
96 Info("Closed connection to "+URL()+" succesfully.");
97 }
98
99 // Stop deadline counters
100 fInTimeout.cancel();
101 fOutTimeout.cancel();
102 fConnectTimeout.cancel();
103
104 // Reset the connection status
105 fQueueSize = 0;
106 fConnectionStatus = kDisconnected;
107
108#ifdef DEBUG
109 ofstream fout1("transmitted.txt", ios::app);
110 ofstream fout2("received.txt", ios::app);
111 ofstream fout3("send.txt", ios::app);
112 fout1 << Time() << ": ---" << endl;
113 fout2 << Time() << ": ---" << endl;
114 fout3 << Time() << ": ---" << endl;
115#endif
116
117 if (delay<0 || IsConnecting())
118 return;
119
120 // We need some timeout before reconnecting!
121 // And we have to check if we are alreayd trying to connect
122 // We should wait until all operations in progress were canceled
123 fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
124 fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
125}
126
127void ConnectionUSB::PostClose(int64_t delay)
128{
129#if BOOST_VERSION < 107000
130 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
131#else
132 ba::post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
133#endif
134}
135
136void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
137{
138 if (error==ba::error::basic_errors::operation_aborted)
139 return;
140
141 // 125: Operation canceled (bs::error_code(125, bs::system_category))
142 if (error)
143 {
144 ostringstream str;
145 str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
146 Error(str);
147
148 CloseImp(-1);
149 return;
150 }
151
152
153 if (is_open())
154 {
155 Error("HandleReconnectTimeout - "+URL()+" is already open.");
156 return;
157 }
158
159 // Check whether the deadline has passed. We compare the deadline
160 // against the current time since a new asynchronous operation
161 // may have moved the deadline before this actor had a chance
162 // to run.
163 if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
164 return;
165
166 // Start trying to reconnect
167 Connect();
168}
169
170
171// ------------------------ write --------------------------
172void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
173{
174 if (error==ba::error::basic_errors::operation_aborted)
175 return;
176
177 // 125: Operation canceled (bs::error_code(125, bs::system_category))
178 if (error)
179 {
180 ostringstream str;
181 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
182 Error(str);
183
184 CloseImp(-1);
185 return;
186 }
187
188 if (!is_open())
189 {
190 // For example: Here we could schedule a new accept if we
191 // would not want to allow two connections at the same time.
192 return;
193 }
194
195 // Check whether the deadline has passed. We compare the deadline
196 // against the current time since a new asynchronous operation
197 // may have moved the deadline before this actor had a chance
198 // to run.
199 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
200 return;
201
202 Error("fOutTimeout has expired, writing data to "+URL());
203
204 CloseImp(-1);
205}
206
207void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
208{
209 if (error==ba::error::basic_errors::operation_aborted)
210 return;
211
212 if (error && error != ba::error::not_connected)
213 {
214 ostringstream str;
215 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
216 Error(str);
217
218 CloseImp(-1);
219 return;
220 }
221
222 if (error == ba::error::not_connected)
223 {
224 ostringstream msg;
225 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
226 Warn(msg);
227 return;
228 }
229
230 if (--fQueueSize==0)
231 fOutTimeout.cancel();
232
233#ifdef DEBUG_TX
234 ostringstream msg;
235 msg << n << " bytes successfully sent to " << URL();
236 Message(msg);
237#endif
238
239#ifdef DEBUG
240 ofstream fout("transmitted.txt", ios::app);
241 fout << Time() << ": ";
242 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
243 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
244 fout << endl;
245#endif
246
247 HandleTransmittedData(n);
248}
249
250void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
251{
252 // This function can be called from a different thread...
253 if (!is_open())
254 return;
255
256 // ... this is why we have to increase fQueueSize first
257 fQueueSize++;
258
259 // ... and shift the deadline timer
260 // This is not ideal, because if we are continously
261 // filling the buffer, it will never timeout
262 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
263
264 // Now we can schedule the buffer to be sent
265 AsyncWrite(ba::const_buffers_1(ptr, sz));
266}
267
268void ConnectionUSB::PostMessage(const string &cmd, size_t max)
269{
270 if (max==size_t(-1))
271 max = cmd.length()+1;
272
273 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
274}
275
276void ConnectionUSB::Connect(int _baud_rate, int _character_size,
277 parity::type _parity, stop_bits::type _stop_bits,
278 flow_control::type _flow_control)
279{
280 fConnectionStatus = kConnecting;
281
282 Info("Connecting to "+URL()+".");
283
284 bs::error_code ec;
285 open(URL(), ec);
286
287 if (ec)
288 {
289 ostringstream msg;
290 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
291 Error(msg);
292 fConnectionStatus = kDisconnected;
293 return;
294 }
295
296 Info("Connection established.");
297
298 try
299 {
300 Debug("Setting Baud Rate");
301 set_option(baud_rate(_baud_rate));
302
303 Debug("Setting Character Size");
304 set_option(character_size(_character_size));
305
306 Debug("Setting Parity");
307 set_option(parity(_parity));
308
309 Debug("Setting Sop Bits");
310 set_option(stop_bits(_stop_bits));
311
312 Debug("Setting Flow control");
313 set_option(flow_control(_flow_control));
314 }
315 catch (const bs::system_error &erc)
316 {
317 Error(string("Setting connection options: ")+erc.what());
318 close();
319 return;
320 }
321
322 fQueueSize = 0;
323 fConnectionStatus = kConnected;
324
325 ConnectionEstablished();
326}
327
328void ConnectionUSB::SetEndpoint(const string &addr)
329{
330 if (fConnectionStatus>=1)
331 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
332
333 fAddress = "/dev/"+addr;
334}
335
336
337ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
338MessageImp(out), ba::serial_port(ioservice), fLog(0),
339/*
340 fBaudRate(115200),
341fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
342fFlowControl(flow_control::none),
343*/
344fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
345fQueueSize(0), fConnectionStatus(kDisconnected)
346{
347}
Note: See TracBrowser for help on using the repository browser.