source: trunk/FACT++/src/ConnectionUSB.cc@ 17437

Last change on this file since 17437 was 16768, checked in by tbretz, 12 years ago
There is no need to schedule the async write asynchronously, so the async write is now scheduled directly. This also removed the send queue, which was primarily introduced to serialize the transmission to the FTM, but this is not needed anymore. To keep track of the send queue size and for debugging, a counter for the messages in the send buffer has been introduced.
File size: 9.1 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an asynchronous serial-port (USB) connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21//#define DEBUG
22
23#ifdef DEBUG
24#include <fstream>
25#include <iomanip>
26#include "Time.h"
27#endif
28
29// -------- Abbreviations for starting async tasks ---------
30
31int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32{
33 if (fLog)
34 return fLog->Write(t, txt, qos);
35
36 return MessageImp::Write(t, txt, qos);
37}
38
39void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40{
41 ba::async_read(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 dummy::error, dummy::bytes_transferred, type, counter));
44}
45
46void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47{
48 ba::async_write(*this, buffers,
49 boost::bind(&ConnectionUSB::HandleSentData, this,
50 dummy::error, dummy::bytes_transferred));
51}
52
53void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 void (ConnectionUSB::*handler)(const bs::error_code&))
55{
56 // - The boost::asio::basic_deadline_timer::expires_from_now()
57 // function cancels any pending asynchronous waits, and returns
58 // the number of asynchronous waits that were cancelled. If it
59 // returns 0 then you were too late and the wait handler has
60 // already been executed, or will soon be executed. If it
61 // returns 1 then the wait handler was successfully cancelled.
62 // - If a wait handler is cancelled, the bs::error_code passed to
63 // it contains the value bs::error::operation_aborted.
64 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65
66 timer.async_wait(boost::bind(handler, this, dummy::error));
67}
68
69// ------------------------ close --------------------------
70// close from another thread
71void ConnectionUSB::CloseImp(int64_t delay)
72{
73 if (IsConnected())
74 Info("Closing connection to "+URL()+".");
75
76 // Close possible open connections
77 bs::error_code ec;
78 cancel(ec);
79 if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80 {
81 ostringstream msg;
82 msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83 Error(msg);
84 }
85
86 if (IsConnected())
87 {
88 close(ec);
89 if (ec)
90 {
91 ostringstream msg;
92 msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93 Error(msg);
94 }
95 else
96 Info("Closed connection to "+URL()+" succesfully.");
97 }
98
99 // Stop deadline counters
100 fInTimeout.cancel();
101 fOutTimeout.cancel();
102 fConnectTimeout.cancel();
103
104 // Reset the connection status
105 fQueueSize = 0;
106 fConnectionStatus = kDisconnected;
107
108#ifdef DEBUG
109 ofstream fout1("transmitted.txt", ios::app);
110 ofstream fout2("received.txt", ios::app);
111 ofstream fout3("send.txt", ios::app);
112 fout1 << Time() << ": ---" << endl;
113 fout2 << Time() << ": ---" << endl;
114 fout3 << Time() << ": ---" << endl;
115#endif
116
117 if (delay<0 || IsConnecting())
118 return;
119
120 // We need some timeout before reconnecting!
121 // And we have to check if we are alreayd trying to connect
122 // We should wait until all operations in progress were canceled
123 fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
124 fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
125}
126
127void ConnectionUSB::PostClose(int64_t delay)
128{
129 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
130}
131
132void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
133{
134 if (error==ba::error::basic_errors::operation_aborted)
135 return;
136
137 // 125: Operation canceled (bs::error_code(125, bs::system_category))
138 if (error)
139 {
140 ostringstream str;
141 str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
142 Error(str);
143
144 CloseImp(-1);
145 return;
146 }
147
148
149 if (is_open())
150 {
151 Error("HandleReconnectTimeout - "+URL()+" is already open.");
152 return;
153 }
154
155 // Check whether the deadline has passed. We compare the deadline
156 // against the current time since a new asynchronous operation
157 // may have moved the deadline before this actor had a chance
158 // to run.
159 if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
160 return;
161
162 // Start trying to reconnect
163 Connect();
164}
165
166
167// ------------------------ write --------------------------
168void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
169{
170 if (error==ba::error::basic_errors::operation_aborted)
171 return;
172
173 // 125: Operation canceled (bs::error_code(125, bs::system_category))
174 if (error)
175 {
176 ostringstream str;
177 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178 Error(str);
179
180 CloseImp(-1);
181 return;
182 }
183
184 if (!is_open())
185 {
186 // For example: Here we could schedule a new accept if we
187 // would not want to allow two connections at the same time.
188 return;
189 }
190
191 // Check whether the deadline has passed. We compare the deadline
192 // against the current time since a new asynchronous operation
193 // may have moved the deadline before this actor had a chance
194 // to run.
195 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
196 return;
197
198 Error("fOutTimeout has expired, writing data to "+URL());
199
200 CloseImp(-1);
201}
202
203void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
204{
205 if (error==ba::error::basic_errors::operation_aborted)
206 return;
207
208 if (error && error != ba::error::not_connected)
209 {
210 ostringstream str;
211 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
212 Error(str);
213
214 CloseImp(-1);
215 return;
216 }
217
218 if (error == ba::error::not_connected)
219 {
220 ostringstream msg;
221 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
222 Warn(msg);
223 return;
224 }
225
226 if (--fQueueSize==0)
227 fOutTimeout.cancel();
228
229#ifdef DEBUG_TX
230 ostringstream msg;
231 msg << n << " bytes successfully sent to " << URL();
232 Message(msg);
233#endif
234
235#ifdef DEBUG
236 ofstream fout("transmitted.txt", ios::app);
237 fout << Time() << ": ";
238 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
239 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
240 fout << endl;
241#endif
242
243 HandleTransmittedData(n);
244}
245
246void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
247{
248 // This function can be called from a different thread...
249 if (!is_open())
250 return;
251
252 // ... this is why we have to increase fQueueSize first
253 fQueueSize++;
254
255 // ... and shift the deadline timer
256 // This is not ideal, because if we are continously
257 // filling the buffer, it will never timeout
258 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
259
260 // Now we can schedule the buffer to be sent
261 AsyncWrite(ba::const_buffers_1(ptr, sz));
262}
263
264void ConnectionUSB::PostMessage(const string &cmd, size_t max)
265{
266 if (max==size_t(-1))
267 max = cmd.length()+1;
268
269 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
270}
271
272void ConnectionUSB::Connect()
273{
274 fConnectionStatus = kConnecting;
275
276 Info("Connecting to "+URL()+".");
277
278 bs::error_code ec;
279 open(URL(), ec);
280
281 if (ec)
282 {
283 ostringstream msg;
284 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
285 Error(msg);
286 fConnectionStatus = kDisconnected;
287 return;
288 }
289
290 Info("Connection established.");
291
292 try
293 {
294 set_option(fBaudRate);
295 set_option(fCharacterSize);
296 set_option(fParity);
297 set_option(fStopBits);
298 set_option(fFlowControl);
299 }
300 catch (const bs::system_error &erc)
301 {
302 Error(string("Setting connection options: ")+erc.what());
303 // CLOSE
304 return;
305 }
306
307 fQueueSize = 0;
308 fConnectionStatus = kConnected;
309
310 ConnectionEstablished();
311}
312
313void ConnectionUSB::SetEndpoint(const string &addr)
314{
315 if (fConnectionStatus>=1)
316 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
317
318 fAddress = "/dev/"+addr;
319}
320
321
322ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
323MessageImp(out), ba::serial_port(ioservice), fLog(0),
324fBaudRate(115200),
325fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
326fFlowControl(flow_control::hardware),
327fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
328fQueueSize(0), fConnectionStatus(kDisconnected)
329{
330}
Note: See TracBrowser for help on using the repository browser.