source: trunk/FACT++/src/ConnectionUSB.cc@ 11609

Last change on this file since 11609 was 11479, checked in by tbretz, 13 years ago
Replaces ato/atof by stoi/stof; replaced boost::lexical_cast by to_string
File size: 8.2 KB
Line 
// **************************************************************************
/** @class ConnectionUSB

@brief Maintains an asynchronous serial (USB) client connection

*/
// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20#define DEBUG_TX
21
22 // -------- Abbreviations for starting async tasks ---------
23
24int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
25{
26 if (fLog)
27 return fLog->Write(t, txt, qos);
28
29 return MessageImp::Write(t, txt, qos);
30}
31
32void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
33{
34 ba::async_read(*this, buffers,
35 boost::bind(&ConnectionUSB::HandleReceivedData, this,
36 dummy::error, dummy::bytes_transferred, type));
37}
38
39void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
40{
41 ba::async_write(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleSentData, this,
43 dummy::error, dummy::bytes_transferred));
44}
45
46void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (ConnectionUSB::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61
62// ------------------------ close --------------------------
63// close from another thread
64void ConnectionUSB::CloseImp(bool restart)
65{
66 if (IsConnected())
67 {
68 ostringstream str;
69 str << "Connection closed to " << URL() << ".";
70 Info(str);
71 }
72
73 // Close possible open connections
74 bs::error_code ec;
75 cancel(ec);
76 if (!ec)
77 Error("Cancel async requests on "+URL()+": "+ec.message());
78
79 close(ec);
80 if (!ec)
81 Error("Closing "+URL()+": "+ec.message());
82
83 // Reset the connection status
84 fConnectionStatus = kDisconnected;
85
86 // Stop deadline counters
87 fInTimeout.cancel();
88 fOutTimeout.cancel();
89
90 // Empty output queue
91 fOutQueue.clear();
92
93 if (!restart || IsConnecting())
94 return;
95
96 // We need some timeout before reconnecting!
97 // And we have to check if we are alreayd trying to connect
98 // We shoudl wait until all operations in progress were canceled
99
100 // Start trying to reconnect
101 Connect();
102}
103
104void ConnectionUSB::PostClose(bool restart)
105{
106 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
107}
108
109// ------------------------ write --------------------------
110void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
111{
112 if (error==ba::error::basic_errors::operation_aborted)
113 return;
114
115 // 125: Operation canceled (bs::error_code(125, bs::system_category))
116 if (error)
117 {
118 ostringstream str;
119 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
120 Error(str);
121
122 CloseImp();
123 return;
124 }
125
126 if (!is_open())
127 {
128 // For example: Here we could schedule a new accept if we
129 // would not want to allow two connections at the same time.
130 return;
131 }
132
133 // Check whether the deadline has passed. We compare the deadline
134 // against the current time since a new asynchronous operation
135 // may have moved the deadline before this actor had a chance
136 // to run.
137 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
138 return;
139
140 Error("fOutTimeout has expired, writing data to "+URL());
141
142 CloseImp();
143}
144
145void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
146{
147 if (error && error != ba::error::not_connected)
148 {
149 ostringstream str;
150 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
151 Error(str);
152
153 CloseImp();
154 return;
155 }
156
157 if (error == ba::error::not_connected)
158 {
159 ostringstream msg;
160 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
161 Warn(msg);
162 }
163 else
164 {
165#ifdef DEBUG_TX
166 ostringstream msg;
167 msg << n << " bytes successfully sent to " << URL();
168 Message(msg);
169#endif
170 }
171
172 HandleTransmittedData(n);
173
174 // This is "thread" safe because SendMessage and HandleSentMessage
175 // are serialized in the EventQueue. Note: Do not call these
176 // functions directly from any other place then Handlers, use
177 // PostMessage instead
178 fOutQueue.pop_front();
179
180 if (fOutQueue.empty())
181 {
182 // Queue went empty, remove deadline
183 fOutTimeout.cancel();
184 return;
185 }
186
187 // AsyncWrite + Deadline
188 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
189 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
190}
191
192// It is important that when SendMessageImp is called, or to be more
193// precise boost::bind is called, teh data is copied!
194void ConnectionUSB::SendMessageImp(const vector<char> msg)
195{
196 /*
197 if (!fConnectionEstablished)
198 {
199 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
200 return;
201 }*/
202
203 const bool first_message_in_queue = fOutQueue.empty();
204
205 // This is "thread" safe because SendMessage and HandleSentMessage
206 // are serialized in the EventQueue. Note: Do not call these
207 // functions directly from any other place then Handlers, use
208 // PostMessage instead
209 fOutQueue.push_back(msg);
210
211 if (!first_message_in_queue)
212 return;
213
214 // AsyncWrite + Deadline
215 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
216 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
217}
218
219void ConnectionUSB::PostMessage(const void *ptr, size_t max)
220{
221 const vector<char> msg(reinterpret_cast<const char*>(ptr),
222 reinterpret_cast<const char*>(ptr)+max);
223
224 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
225}
226
227void ConnectionUSB::PostMessage(const string &cmd, size_t max)
228{
229 if (max==size_t(-1))
230 max = cmd.length()+1;
231
232 vector <char>msg(max);
233
234 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
235
236 PostMessage(msg);
237}
238
239void ConnectionUSB::Connect()
240{
241 fConnectionStatus = kConnecting;
242
243 bs::error_code ec;
244 open(URL(), ec);
245
246 ostringstream msg;
247 msg << "Connecting to " << URL() << "...";
248 if (ec)
249 msg << " " << ec.message() << " (" << ec << ")";
250 else
251 msg << "success.";
252
253 if (ec)
254 {
255 Error(msg);
256 fConnectionStatus = kDisconnected;
257 return;
258 }
259
260 Info(msg);
261
262 try
263 {
264 set_option(fBaudRate);
265 set_option(fCharacterSize);
266 set_option(fParity);
267 set_option(fStopBits);
268 set_option(fFlowControl);
269 }
270 catch (const bs::system_error &erc)
271 {
272 Error(string("Setting connection options: ")+erc.what());
273 // CLOSE
274 return;
275 }
276
277 fConnectionStatus = kConnected;
278
279 ConnectionEstablished();
280}
281
282void ConnectionUSB::SetEndpoint(const string &addr)
283{
284 if (fConnectionStatus>=1)
285 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
286
287 fAddress = "/dev/"+addr;
288}
289
290
291ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
292MessageImp(out), ba::serial_port(ioservice), fLog(0),
293fBaudRate(115200),
294fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
295fFlowControl(flow_control::none),
296fInTimeout(ioservice), fOutTimeout(ioservice),
297fConnectionStatus(kDisconnected)
298{
299}
Note: See TracBrowser for help on using the repository browser.