source: trunk/FACT++/src/ConnectionUSB.cc@ 12951

Last change on this file since 12951 was 12540, checked in by tbretz, 13 years ago
Fixed a bug which could cause crashes when a transmission is interrupted at the moment the connection is opened and e.g. the bias crate is not yet switched on.
File size: 9.3 KB
Line 
1// **************************************************************************
2/** @class ConnectionUSB
3
4@brief Maintains an asynchronous serial-port (USB) client connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21//#define DEBUG
22
23#ifdef DEBUG
24#include <fstream>
25#include <iomanip>
26#include "Time.h"
27#endif
28
29// -------- Abbreviations for starting async tasks ---------
30
31int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32{
33 if (fLog)
34 return fLog->Write(t, txt, qos);
35
36 return MessageImp::Write(t, txt, qos);
37}
38
39void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40{
41 ba::async_read(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleReceivedData, this,
43 dummy::error, dummy::bytes_transferred, type, counter));
44}
45
46void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47{
48 ba::async_write(*this, buffers,
49 boost::bind(&ConnectionUSB::HandleSentData, this,
50 dummy::error, dummy::bytes_transferred));
51}
52
53void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54 void (ConnectionUSB::*handler)(const bs::error_code&))
55{
56 // - The boost::asio::basic_deadline_timer::expires_from_now()
57 // function cancels any pending asynchronous waits, and returns
58 // the number of asynchronous waits that were cancelled. If it
59 // returns 0 then you were too late and the wait handler has
60 // already been executed, or will soon be executed. If it
61 // returns 1 then the wait handler was successfully cancelled.
62 // - If a wait handler is cancelled, the bs::error_code passed to
63 // it contains the value bs::error::operation_aborted.
64 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65
66 timer.async_wait(boost::bind(handler, this, dummy::error));
67}
68
69// ------------------------ close --------------------------
70// close from another thread
71void ConnectionUSB::CloseImp(bool restart)
72{
73 if (IsConnected())
74 {
75 ostringstream str;
76 str << "Closing connection to " << URL() << ".";
77 Info(str);
78 }
79
80 // Close possible open connections
81 bs::error_code ec;
82 cancel(ec);
83 if (ec)
84 Error("Cancel async requests on "+URL()+": "+ec.message());
85
86 if (IsConnected())
87 {
88 close(ec);
89 if (ec)
90 Error("Closing "+URL()+": "+ec.message());
91 else
92 Info("Connection closed succesfully.");
93 }
94
95 // Stop deadline counters
96 fInTimeout.cancel();
97 fOutTimeout.cancel();
98
99 // Reset the connection status
100 fConnectionStatus = kDisconnected;
101
102 // Empty output queue
103 fOutQueue.clear();
104
105#ifdef DEBUG
106 ofstream fout1("transmitted.txt", ios::app);
107 ofstream fout2("received.txt", ios::app);
108 ofstream fout3("send.txt", ios::app);
109 fout1 << Time() << ": ---" << endl;
110 fout2 << Time() << ": ---" << endl;
111 fout3 << Time() << ": ---" << endl;
112#endif
113
114 if (!restart || IsConnecting())
115 return;
116
117 // We need some timeout before reconnecting!
118 // And we have to check if we are alreayd trying to connect
119 // We shoudl wait until all operations in progress were canceled
120
121 // Start trying to reconnect
122 Connect();
123}
124
125void ConnectionUSB::PostClose(bool restart)
126{
127 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
128}
129
130// ------------------------ write --------------------------
131void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
132{
133 if (error==ba::error::basic_errors::operation_aborted)
134 return;
135
136 // 125: Operation canceled (bs::error_code(125, bs::system_category))
137 if (error)
138 {
139 ostringstream str;
140 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
141 Error(str);
142
143 CloseImp(false);
144 return;
145 }
146
147 if (!is_open())
148 {
149 // For example: Here we could schedule a new accept if we
150 // would not want to allow two connections at the same time.
151 return;
152 }
153
154 // Check whether the deadline has passed. We compare the deadline
155 // against the current time since a new asynchronous operation
156 // may have moved the deadline before this actor had a chance
157 // to run.
158 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
159 return;
160
161 Error("fOutTimeout has expired, writing data to "+URL());
162
163 CloseImp(false);
164}
165
166void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
167{
168 if (error && error != ba::error::not_connected)
169 {
170 ostringstream str;
171 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
172 Error(str);
173
174 CloseImp(false);
175 return;
176 }
177
178 if (error == ba::error::not_connected)
179 {
180 ostringstream msg;
181 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
182 Warn(msg);
183 }
184 else
185 {
186#ifdef DEBUG_TX
187 ostringstream msg;
188 msg << n << " bytes successfully sent to " << URL();
189 Message(msg);
190#endif
191 }
192
193#ifdef DEBUG
194 ofstream fout("transmitted.txt", ios::app);
195 fout << Time() << ": ";
196 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
197 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
198 fout << endl;
199#endif
200
201 HandleTransmittedData(n);
202
203 // This is "thread" safe because SendMessage and HandleSentMessage
204 // are serialized in the EventQueue. Note: Do not call these
205 // functions directly from any other place then Handlers, use
206 // PostMessage instead
207 if (!fOutQueue.empty())
208 fOutQueue.pop_front();
209
210 if (fOutQueue.empty())
211 {
212 // Queue went empty, remove deadline
213 fOutTimeout.cancel();
214 return;
215 }
216
217 // AsyncWrite + Deadline
218 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
219 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
220}
221
222// It is important that when SendMessageImp is called, or to be more
223// precise boost::bind is called, the data is copied!
224void ConnectionUSB::SendMessageImp(const vector<uint8_t> msg)
225{
226 /*
227 if (!fConnectionEstablished)
228 {
229 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
230 return;
231 }*/
232
233 const bool first_message_in_queue = fOutQueue.empty();
234
235 // This is "thread" safe because SendMessage and HandleSentMessage
236 // are serialized in the EventQueue. Note: Do not call these
237 // functions directly from any other place then Handlers, use
238 // PostMessage instead
239 fOutQueue.push_back(msg);
240
241 if (!first_message_in_queue)
242 return;
243
244#ifdef DEBUG
245 ofstream fout("send.txt", ios::app);
246 fout << Time() << ": ";
247 for (unsigned int i=0; i<msg.size(); i++)
248 fout << hex << setfill('0') << setw(2) << (uint32_t)msg[i];
249 fout << endl;
250#endif
251
252 // AsyncWrite + Deadline
253 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
254 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
255}
256
257void ConnectionUSB::PostMessage(const void *ptr, size_t max)
258{
259 const vector<uint8_t> msg(reinterpret_cast<const uint8_t*>(ptr),
260 reinterpret_cast<const uint8_t*>(ptr)+max);
261
262 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
263}
264
265void ConnectionUSB::PostMessage(const string &cmd, size_t max)
266{
267 if (max==size_t(-1))
268 max = cmd.length()+1;
269
270 vector <char>msg(max);
271
272 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
273
274 PostMessage(msg);
275}
276
277void ConnectionUSB::Connect()
278{
279 fConnectionStatus = kConnecting;
280
281 Info("Connecting to "+URL()+".");
282
283 bs::error_code ec;
284 open(URL(), ec);
285
286 if (ec)
287 {
288 ostringstream msg;
289 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
290 Error(msg);
291 fConnectionStatus = kDisconnected;
292 return;
293 }
294
295 Info("Connection established.");
296
297 try
298 {
299 set_option(fBaudRate);
300 set_option(fCharacterSize);
301 set_option(fParity);
302 set_option(fStopBits);
303 set_option(fFlowControl);
304 }
305 catch (const bs::system_error &erc)
306 {
307 Error(string("Setting connection options: ")+erc.what());
308 // CLOSE
309 return;
310 }
311
312 fConnectionStatus = kConnected;
313
314 ConnectionEstablished();
315}
316
317void ConnectionUSB::SetEndpoint(const string &addr)
318{
319 if (fConnectionStatus>=1)
320 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
321
322 fAddress = "/dev/"+addr;
323}
324
325
326ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
327MessageImp(out), ba::serial_port(ioservice), fLog(0),
328fBaudRate(115200),
329fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
330fFlowControl(flow_control::hardware),
331fInTimeout(ioservice), fOutTimeout(ioservice),
332fConnectionStatus(kDisconnected)
333{
334}
Note: See TracBrowser for help on using the repository browser.