source: trunk/FACT++/src/ConnectionUSB.cc@ 12231

Last change on this file since 12231 was 12231, checked in by tbretz, 13 years ago
Fixed the new DEBUG stuff.
File size: 8.9 KB
Line 
1// **************************************************************************
2/** @class ConnectionUSB
3
4@brief Maintains an asynchronous serial-port (USB) client connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20#include <fstream>
21#include <iomanip>
22#include "Time.h"
23#define DEBUG
24
25//#define DEBUG_TX
26
27 // -------- Abbreviations for starting async tasks ---------
28
29int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
30{
31 if (fLog)
32 return fLog->Write(t, txt, qos);
33
34 return MessageImp::Write(t, txt, qos);
35}
36
37void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
38{
39 ba::async_read(*this, buffers,
40 boost::bind(&ConnectionUSB::HandleReceivedData, this,
41 dummy::error, dummy::bytes_transferred, type, counter));
42}
43
44void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
45{
46 ba::async_write(*this, buffers,
47 boost::bind(&ConnectionUSB::HandleSentData, this,
48 dummy::error, dummy::bytes_transferred));
49}
50
51void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
52 void (ConnectionUSB::*handler)(const bs::error_code&))
53{
54 // - The boost::asio::basic_deadline_timer::expires_from_now()
55 // function cancels any pending asynchronous waits, and returns
56 // the number of asynchronous waits that were cancelled. If it
57 // returns 0 then you were too late and the wait handler has
58 // already been executed, or will soon be executed. If it
59 // returns 1 then the wait handler was successfully cancelled.
60 // - If a wait handler is cancelled, the bs::error_code passed to
61 // it contains the value bs::error::operation_aborted.
62 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
63
64 timer.async_wait(boost::bind(handler, this, dummy::error));
65}
66
67// ------------------------ close --------------------------
68// close from another thread
69void ConnectionUSB::CloseImp(bool restart)
70{
71 if (IsConnected())
72 {
73 ostringstream str;
74 str << "Closing connection to " << URL() << ".";
75 Info(str);
76 }
77
78 // Close possible open connections
79 bs::error_code ec;
80 cancel(ec);
81 if (ec)
82 Error("Cancel async requests on "+URL()+": "+ec.message());
83
84 if (IsConnected())
85 {
86 close(ec);
87 if (ec)
88 Error("Closing "+URL()+": "+ec.message());
89 else
90 Info("Connection closed succesfully.");
91 }
92
93 // Stop deadline counters
94 fInTimeout.cancel();
95 fOutTimeout.cancel();
96
97 // Reset the connection status
98 fConnectionStatus = kDisconnected;
99
100 // Empty output queue
101 fOutQueue.clear();
102
103 if (!restart || IsConnecting())
104 return;
105
106 // We need some timeout before reconnecting!
107 // And we have to check if we are alreayd trying to connect
108 // We shoudl wait until all operations in progress were canceled
109
110 // Start trying to reconnect
111 Connect();
112}
113
114void ConnectionUSB::PostClose(bool restart)
115{
116 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
117}
118
119// ------------------------ write --------------------------
120void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
121{
122 if (error==ba::error::basic_errors::operation_aborted)
123 return;
124
125 // 125: Operation canceled (bs::error_code(125, bs::system_category))
126 if (error)
127 {
128 ostringstream str;
129 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
130 Error(str);
131
132 CloseImp(false);
133 return;
134 }
135
136 if (!is_open())
137 {
138 // For example: Here we could schedule a new accept if we
139 // would not want to allow two connections at the same time.
140 return;
141 }
142
143 // Check whether the deadline has passed. We compare the deadline
144 // against the current time since a new asynchronous operation
145 // may have moved the deadline before this actor had a chance
146 // to run.
147 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
148 return;
149
150 Error("fOutTimeout has expired, writing data to "+URL());
151
152 CloseImp(false);
153}
154
155void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
156{
157 if (error && error != ba::error::not_connected)
158 {
159 ostringstream str;
160 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
161 Error(str);
162
163 CloseImp(false);
164 return;
165 }
166
167 if (error == ba::error::not_connected)
168 {
169 ostringstream msg;
170 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
171 Warn(msg);
172 }
173 else
174 {
175#ifdef DEBUG_TX
176 ostringstream msg;
177 msg << n << " bytes successfully sent to " << URL();
178 Message(msg);
179#endif
180 }
181
182#ifdef DEBUG
183 ofstream fout("tansmitted.txt", ios::app);
184 fout << Time() << ": ";
185 for (unsigned int i=0; i<fOutQueue.front().size(); i++)
186 fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
187 fout << endl;
188#endif
189
190 HandleTransmittedData(n);
191
192 // This is "thread" safe because SendMessage and HandleSentMessage
193 // are serialized in the EventQueue. Note: Do not call these
194 // functions directly from any other place then Handlers, use
195 // PostMessage instead
196 fOutQueue.pop_front();
197
198 if (fOutQueue.empty())
199 {
200 // Queue went empty, remove deadline
201 fOutTimeout.cancel();
202 return;
203 }
204
205 // AsyncWrite + Deadline
206 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
207 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
208}
209
210// It is important that when SendMessageImp is called, or to be more
211// precise boost::bind is called, teh data is copied!
212void ConnectionUSB::SendMessageImp(const vector<char> msg)
213{
214 /*
215 if (!fConnectionEstablished)
216 {
217 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
218 return;
219 }*/
220
221 const bool first_message_in_queue = fOutQueue.empty();
222
223 // This is "thread" safe because SendMessage and HandleSentMessage
224 // are serialized in the EventQueue. Note: Do not call these
225 // functions directly from any other place then Handlers, use
226 // PostMessage instead
227 fOutQueue.push_back(msg);
228
229 if (!first_message_in_queue)
230 return;
231
232#ifdef DEBUG
233 ofstream fout("send.txt", ios::app);
234 fout << Time() << ": ";
235 for (unsigned int i=0; i<msg.size(); i++)
236 fout << hex << setfill('0') << setw(2) << (uint32_t)msg[i];
237 fout << endl;
238#endif
239
240 // AsyncWrite + Deadline
241 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
242 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
243}
244
245void ConnectionUSB::PostMessage(const void *ptr, size_t max)
246{
247 const vector<char> msg(reinterpret_cast<const char*>(ptr),
248 reinterpret_cast<const char*>(ptr)+max);
249
250 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
251}
252
253void ConnectionUSB::PostMessage(const string &cmd, size_t max)
254{
255 if (max==size_t(-1))
256 max = cmd.length()+1;
257
258 vector <char>msg(max);
259
260 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
261
262 PostMessage(msg);
263}
264
265void ConnectionUSB::Connect()
266{
267 fConnectionStatus = kConnecting;
268
269 Info("Connecting to "+URL()+".");
270
271 bs::error_code ec;
272 open(URL(), ec);
273
274 if (ec)
275 {
276 ostringstream msg;
277 msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
278 Error(msg);
279 fConnectionStatus = kDisconnected;
280 return;
281 }
282
283 Info("Connection established.");
284
285 try
286 {
287 set_option(fBaudRate);
288 set_option(fCharacterSize);
289 set_option(fParity);
290 set_option(fStopBits);
291 set_option(fFlowControl);
292 }
293 catch (const bs::system_error &erc)
294 {
295 Error(string("Setting connection options: ")+erc.what());
296 // CLOSE
297 return;
298 }
299
300 fConnectionStatus = kConnected;
301
302 ConnectionEstablished();
303}
304
305void ConnectionUSB::SetEndpoint(const string &addr)
306{
307 if (fConnectionStatus>=1)
308 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
309
310 fAddress = "/dev/"+addr;
311}
312
313
314ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
315MessageImp(out), ba::serial_port(ioservice), fLog(0),
316fBaudRate(115200),
317fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
318fFlowControl(flow_control::hardware),
319fInTimeout(ioservice), fOutTimeout(ioservice),
320fConnectionStatus(kDisconnected)
321{
322}
Note: See TracBrowser for help on using the repository browser.