source: trunk/FACT++/src/ConnectionUSB.cc@ 18998

Last change on this file since 18998 was 18996, checked in by tbretz, 7 years ago
Connection must be closed when setting a parameter fails, it is unusable. Made Connect() virtual to allow the derived class to access the parameters when a connection gets reconnected.
File size: 9.6 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21//#define DEBUG
22
23#ifdef DEBUG
24#include <fstream>
25#include <iomanip>
26#include "Time.h"
27#endif
28
29// -------- Abbreviations for starting async tasks ---------
30
31int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32{
33 if (fLog)
34 return fLog->Write(t, txt, qos);
35
36 return MessageImp::Write(t, txt, qos);
37}
38
39void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40{
41 ba::async_read(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 dummy::error, dummy::bytes_transferred, type, counter));
44}
45
46void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47{
48 ba::async_write(*this, buffers,
49 boost::bind(&ConnectionUSB::HandleSentData, this,
50 dummy::error, dummy::bytes_transferred));
51}
52
53void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 void (ConnectionUSB::*handler)(const bs::error_code&))
55{
56 // - The boost::asio::basic_deadline_timer::expires_from_now()
57 // function cancels any pending asynchronous waits, and returns
58 // the number of asynchronous waits that were cancelled. If it
59 // returns 0 then you were too late and the wait handler has
60 // already been executed, or will soon be executed. If it
61 // returns 1 then the wait handler was successfully cancelled.
62 // - If a wait handler is cancelled, the bs::error_code passed to
63 // it contains the value bs::error::operation_aborted.
64 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65
66 timer.async_wait(boost::bind(handler, this, dummy::error));
67}
68
69// ------------------------ close --------------------------
70// close from another thread
71void ConnectionUSB::CloseImp(int64_t delay)
72{
73 if (IsConnected())
74 Info("Closing connection to "+URL()+".");
75
76 // Close possible open connections
77 bs::error_code ec;
78 cancel(ec);
79 if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80 {
81 ostringstream msg;
82 msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83 Error(msg);
84 }
85
86 if (IsConnected())
87 {
88 close(ec);
89 if (ec)
90 {
91 ostringstream msg;
92 msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93 Error(msg);
94 }
95 else
96 Info("Closed connection to "+URL()+" succesfully.");
97 }
98
99 // Stop deadline counters
100 fInTimeout.cancel();
101 fOutTimeout.cancel();
102 fConnectTimeout.cancel();
103
104 // Reset the connection status
105 fQueueSize = 0;
106 fConnectionStatus = kDisconnected;
107
108#ifdef DEBUG
109 ofstream fout1("transmitted.txt", ios::app);
110 ofstream fout2("received.txt", ios::app);
111 ofstream fout3("send.txt", ios::app);
112 fout1 << Time() << ": ---" << endl;
113 fout2 << Time() << ": ---" << endl;
114 fout3 << Time() << ": ---" << endl;
115#endif
116
117 if (delay<0 || IsConnecting())
118 return;
119
120 // We need some timeout before reconnecting!
121 // And we have to check if we are alreayd trying to connect
122 // We should wait until all operations in progress were canceled
123 fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
124 fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
125}
126
127void ConnectionUSB::PostClose(int64_t delay)
128{
129 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
130}
131
132void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
133{
134 if (error==ba::error::basic_errors::operation_aborted)
135 return;
136
137 // 125: Operation canceled (bs::error_code(125, bs::system_category))
138 if (error)
139 {
140 ostringstream str;
141 str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
142 Error(str);
143
144 CloseImp(-1);
145 return;
146 }
147
148
149 if (is_open())
150 {
151 Error("HandleReconnectTimeout - "+URL()+" is already open.");
152 return;
153 }
154
155 // Check whether the deadline has passed. We compare the deadline
156 // against the current time since a new asynchronous operation
157 // may have moved the deadline before this actor had a chance
158 // to run.
159 if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
160 return;
161
162 // Start trying to reconnect
163 Connect();
164}
165
166
167// ------------------------ write --------------------------
168void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
169{
170 if (error==ba::error::basic_errors::operation_aborted)
171 return;
172
173 // 125: Operation canceled (bs::error_code(125, bs::system_category))
174 if (error)
175 {
176 ostringstream str;
177 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178 Error(str);
179
180 CloseImp(-1);
181 return;
182 }
183
184 if (!is_open())
185 {
186 // For example: Here we could schedule a new accept if we
187 // would not want to allow two connections at the same time.
188 return;
189 }
190
191 // Check whether the deadline has passed. We compare the deadline
192 // against the current time since a new asynchronous operation
193 // may have moved the deadline before this actor had a chance
194 // to run.
195 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
196 return;
197
198 Error("fOutTimeout has expired, writing data to "+URL());
199
200 CloseImp(-1);
201}
202
203void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
204{
205 if (error==ba::error::basic_errors::operation_aborted)
206 return;
207
208 if (error && error != ba::error::not_connected)
209 {
210 ostringstream str;
211 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
212 Error(str);
213
214 CloseImp(-1);
215 return;
216 }
217
218 if (error == ba::error::not_connected)
219 {
220 ostringstream msg;
221 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
222 Warn(msg);
223 return;
224 }
225
226 if (--fQueueSize==0)
227 fOutTimeout.cancel();
228
229#ifdef DEBUG_TX
230 ostringstream msg;
231 msg << n << " bytes successfully sent to " << URL();
232 Message(msg);
233#endif
234
235#ifdef DEBUG
236 ofstream fout("transmitted.txt", ios::app);
237 fout << Time() << ": ";
238 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
239 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
240 fout << endl;
241#endif
242
243 HandleTransmittedData(n);
244}
245
246void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
247{
248 // This function can be called from a different thread...
249 if (!is_open())
250 return;
251
252 // ... this is why we have to increase fQueueSize first
253 fQueueSize++;
254
255 // ... and shift the deadline timer
256 // This is not ideal, because if we are continously
257 // filling the buffer, it will never timeout
258 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
259
260 // Now we can schedule the buffer to be sent
261 AsyncWrite(ba::const_buffers_1(ptr, sz));
262}
263
264void ConnectionUSB::PostMessage(const string &cmd, size_t max)
265{
266 if (max==size_t(-1))
267 max = cmd.length()+1;
268
269 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
270}
271
272void ConnectionUSB::Connect(int _baud_rate, int _character_size,
273 parity::type _parity, stop_bits::type _stop_bits,
274 flow_control::type _flow_control)
275{
276 fConnectionStatus = kConnecting;
277
278 Info("Connecting to "+URL()+".");
279
280 bs::error_code ec;
281 open(URL(), ec);
282
283 if (ec)
284 {
285 ostringstream msg;
286 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
287 Error(msg);
288 fConnectionStatus = kDisconnected;
289 return;
290 }
291
292 Info("Connection established.");
293
294 try
295 {
296 set_option(baud_rate(_baud_rate));
297 set_option(character_size(_character_size));
298 set_option(parity(_parity));
299 set_option(stop_bits(_stop_bits));
300 set_option(flow_control(_flow_control));
301/*
302 Info("Setting Baud Rate");
303 set_option(fBaudRate);
304 Info("Setting Character Size");
305 set_option(fCharacterSize);
306 Info("Setting Parity");
307 set_option(fParity);
308 Info("Setting Sop Bits");
309 set_option(fStopBits);
310 Info("Setting Flow control");
311 set_option(fFlowControl);
312 */
313 }
314 catch (const bs::system_error &erc)
315 {
316 Error(string("Setting connection options: ")+erc.what());
317 close();
318 return;
319 }
320
321 fQueueSize = 0;
322 fConnectionStatus = kConnected;
323
324 ConnectionEstablished();
325}
326
327void ConnectionUSB::SetEndpoint(const string &addr)
328{
329 if (fConnectionStatus>=1)
330 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
331
332 fAddress = "/dev/"+addr;
333}
334
335
336ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
337MessageImp(out), ba::serial_port(ioservice), fLog(0),
338/*
339 fBaudRate(115200),
340fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
341fFlowControl(flow_control::none),
342*/
343fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
344fQueueSize(0), fConnectionStatus(kDisconnected)
345{
346}
Note: See TracBrowser for help on using the repository browser.