source: trunk/FACT++/src/ConnectionUSB.cc@ 12229

Last change on this file since 12229 was 12229, checked in by tbretz, 13 years ago
Added some debug out code.
File size: 8.8 KB
Line 
// **************************************************************************
/** @class ConnectionUSB

@brief Maintains an asynchronous serial-port (USB) client connection

*/
// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20//#define DEBUG_TX
21
22 // -------- Abbreviations for starting async tasks ---------
23
24int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
25{
26 if (fLog)
27 return fLog->Write(t, txt, qos);
28
29 return MessageImp::Write(t, txt, qos);
30}
31
32void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
33{
34 ba::async_read(*this, buffers,
35 boost::bind(&ConnectionUSB::HandleReceivedData, this,
36 dummy::error, dummy::bytes_transferred, type, counter));
37}
38
39void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
40{
41 ba::async_write(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleSentData, this,
43 dummy::error, dummy::bytes_transferred));
44}
45
46void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (ConnectionUSB::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61
62// ------------------------ close --------------------------
63// close from another thread
64void ConnectionUSB::CloseImp(bool restart)
65{
66 if (IsConnected())
67 {
68 ostringstream str;
69 str << "Closing connection to " << URL() << ".";
70 Info(str);
71 }
72
73 // Close possible open connections
74 bs::error_code ec;
75 cancel(ec);
76 if (ec)
77 Error("Cancel async requests on "+URL()+": "+ec.message());
78
79 if (IsConnected())
80 {
81 close(ec);
82 if (ec)
83 Error("Closing "+URL()+": "+ec.message());
84 else
85 Info("Connection closed succesfully.");
86 }
87
88 // Stop deadline counters
89 fInTimeout.cancel();
90 fOutTimeout.cancel();
91
92 // Reset the connection status
93 fConnectionStatus = kDisconnected;
94
95 // Empty output queue
96 fOutQueue.clear();
97
98 if (!restart || IsConnecting())
99 return;
100
101 // We need some timeout before reconnecting!
102 // And we have to check if we are alreayd trying to connect
103 // We shoudl wait until all operations in progress were canceled
104
105 // Start trying to reconnect
106 Connect();
107}
108
109void ConnectionUSB::PostClose(bool restart)
110{
111 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
112}
113
114// ------------------------ write --------------------------
115void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
116{
117 if (error==ba::error::basic_errors::operation_aborted)
118 return;
119
120 // 125: Operation canceled (bs::error_code(125, bs::system_category))
121 if (error)
122 {
123 ostringstream str;
124 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
125 Error(str);
126
127 CloseImp(false);
128 return;
129 }
130
131 if (!is_open())
132 {
133 // For example: Here we could schedule a new accept if we
134 // would not want to allow two connections at the same time.
135 return;
136 }
137
138 // Check whether the deadline has passed. We compare the deadline
139 // against the current time since a new asynchronous operation
140 // may have moved the deadline before this actor had a chance
141 // to run.
142 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
143 return;
144
145 Error("fOutTimeout has expired, writing data to "+URL());
146
147 CloseImp(false);
148}
149
150void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
151{
152 if (error && error != ba::error::not_connected)
153 {
154 ostringstream str;
155 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
156 Error(str);
157
158 CloseImp(false);
159 return;
160 }
161
162 if (error == ba::error::not_connected)
163 {
164 ostringstream msg;
165 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
166 Warn(msg);
167 }
168 else
169 {
170#ifdef DEBUG_TX
171 ostringstream msg;
172 msg << n << " bytes successfully sent to " << URL();
173 Message(msg);
174#endif
175 }
176
177#ifdef DEBUG
178 ofstream fout("tansmitted.txt", ios::app);
179 fout << Time() << ": ";
180 for (int i=0; i<fOutQueue.front().size(); i++)
181 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
182 fout << endl;
183#endif
184
185 HandleTransmittedData(n);
186
187 // This is "thread" safe because SendMessage and HandleSentMessage
188 // are serialized in the EventQueue. Note: Do not call these
189 // functions directly from any other place then Handlers, use
190 // PostMessage instead
191 fOutQueue.pop_front();
192
193 if (fOutQueue.empty())
194 {
195 // Queue went empty, remove deadline
196 fOutTimeout.cancel();
197 return;
198 }
199
200 // AsyncWrite + Deadline
201 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
202 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
203}
204
205// It is important that when SendMessageImp is called, or to be more
206// precise boost::bind is called, teh data is copied!
207void ConnectionUSB::SendMessageImp(const vector<char> msg)
208{
209 /*
210 if (!fConnectionEstablished)
211 {
212 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
213 return;
214 }*/
215
216 const bool first_message_in_queue = fOutQueue.empty();
217
218 // This is "thread" safe because SendMessage and HandleSentMessage
219 // are serialized in the EventQueue. Note: Do not call these
220 // functions directly from any other place then Handlers, use
221 // PostMessage instead
222 fOutQueue.push_back(msg);
223
224 if (!first_message_in_queue)
225 return;
226
227#ifdef DEBUG
228 ofstream fout("send.txt", ios::app);
229 fout << Time() << ": ";
230 for (int i=0; i<msg.size(); i++)
231 fout << hex << setfill('0') << setw(2) << (uint32_t)msg[i];
232 fout << endl;
233#endif
234
235 // AsyncWrite + Deadline
236 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
237 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
238}
239
240void ConnectionUSB::PostMessage(const void *ptr, size_t max)
241{
242 const vector<char> msg(reinterpret_cast<const char*>(ptr),
243 reinterpret_cast<const char*>(ptr)+max);
244
245 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
246}
247
248void ConnectionUSB::PostMessage(const string &cmd, size_t max)
249{
250 if (max==size_t(-1))
251 max = cmd.length()+1;
252
253 vector <char>msg(max);
254
255 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
256
257 PostMessage(msg);
258}
259
260void ConnectionUSB::Connect()
261{
262 fConnectionStatus = kConnecting;
263
264 Info("Connecting to "+URL()+".");
265
266 bs::error_code ec;
267 open(URL(), ec);
268
269 if (ec)
270 {
271 ostringstream msg;
272 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
273 Error(msg);
274 fConnectionStatus = kDisconnected;
275 return;
276 }
277
278 Info("Connection established.");
279
280 try
281 {
282 set_option(fBaudRate);
283 set_option(fCharacterSize);
284 set_option(fParity);
285 set_option(fStopBits);
286 set_option(fFlowControl);
287 }
288 catch (const bs::system_error &erc)
289 {
290 Error(string("Setting connection options: ")+erc.what());
291 // CLOSE
292 return;
293 }
294
295 fConnectionStatus = kConnected;
296
297 ConnectionEstablished();
298}
299
300void ConnectionUSB::SetEndpoint(const string &addr)
301{
302 if (fConnectionStatus>=1)
303 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
304
305 fAddress = "/dev/"+addr;
306}
307
308
309ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
310MessageImp(out), ba::serial_port(ioservice), fLog(0),
311fBaudRate(115200),
312fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
313fFlowControl(flow_control::hardware),
314fInTimeout(ioservice), fOutTimeout(ioservice),
315fConnectionStatus(kDisconnected)
316{
317}
Note: See TracBrowser for help on using the repository browser.