source: trunk/FACT++/src/ConnectionUSB.cc@ 11323

Last change on this file since 11323 was 11314, checked in by tbretz, 13 years ago
File size: 8.3 KB
Line 
1// **************************************************************************
/** @class ConnectionUSB

@brief Maintains an asynchronous serial-port (USB) client connection

*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11#include <boost/lexical_cast.hpp>
12
13using namespace std;
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17namespace dummy = ba::placeholders;
18
19using boost::lexical_cast;
20using ba::serial_port_base;
21
22#define DEBUG_TX
23
24 // -------- Abbreviations for starting async tasks ---------
25
26int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
27{
28 if (fLog)
29 return fLog->Write(t, txt, qos);
30
31 return MessageImp::Write(t, txt, qos);
32}
33
34void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
35{
36 ba::async_read(*this, buffers,
37 boost::bind(&ConnectionUSB::HandleReceivedData, this,
38 dummy::error, dummy::bytes_transferred, type));
39}
40
41void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
42{
43 ba::async_write(*this, buffers,
44 boost::bind(&ConnectionUSB::HandleSentData, this,
45 dummy::error, dummy::bytes_transferred));
46}
47
48void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
49 void (ConnectionUSB::*handler)(const bs::error_code&))
50{
51 // - The boost::asio::basic_deadline_timer::expires_from_now()
52 // function cancels any pending asynchronous waits, and returns
53 // the number of asynchronous waits that were cancelled. If it
54 // returns 0 then you were too late and the wait handler has
55 // already been executed, or will soon be executed. If it
56 // returns 1 then the wait handler was successfully cancelled.
57 // - If a wait handler is cancelled, the bs::error_code passed to
58 // it contains the value bs::error::operation_aborted.
59 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
60
61 timer.async_wait(boost::bind(handler, this, dummy::error));
62}
63
64// ------------------------ close --------------------------
65// close from another thread
66void ConnectionUSB::CloseImp(bool restart)
67{
68 if (IsConnected())
69 {
70 ostringstream str;
71 str << "Connection closed to " << URL() << ".";
72 Info(str);
73 }
74
75 // Close possible open connections
76 bs::error_code ec;
77 cancel(ec);
78 if (!ec)
79 Error("Cancel async requests on "+URL()+": "+ec.message());
80
81 close(ec);
82 if (!ec)
83 Error("Closing "+URL()+": "+ec.message());
84
85 // Reset the connection status
86 fConnectionStatus = kDisconnected;
87
88 // Stop deadline counters
89 fInTimeout.cancel();
90 fOutTimeout.cancel();
91
92 // Empty output queue
93 fOutQueue.clear();
94
95 if (!restart || IsConnecting())
96 return;
97
98 // We need some timeout before reconnecting!
99 // And we have to check if we are alreayd trying to connect
100 // We shoudl wait until all operations in progress were canceled
101
102 // Start trying to reconnect
103 Connect();
104}
105
106void ConnectionUSB::PostClose(bool restart)
107{
108 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
109}
110
111// ------------------------ write --------------------------
112void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
113{
114 if (error==ba::error::basic_errors::operation_aborted)
115 return;
116
117 // 125: Operation canceled (bs::error_code(125, bs::system_category))
118 if (error)
119 {
120 ostringstream str;
121 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
122 Error(str);
123
124 CloseImp();
125 return;
126 }
127
128 if (!is_open())
129 {
130 // For example: Here we could schedule a new accept if we
131 // would not want to allow two connections at the same time.
132 return;
133 }
134
135 // Check whether the deadline has passed. We compare the deadline
136 // against the current time since a new asynchronous operation
137 // may have moved the deadline before this actor had a chance
138 // to run.
139 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
140 return;
141
142 Error("fOutTimeout has expired, writing data to "+URL());
143
144 CloseImp();
145}
146
147void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
148{
149 if (error && error != ba::error::not_connected)
150 {
151 ostringstream str;
152 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
153 Error(str);
154
155 CloseImp();
156 return;
157 }
158
159 if (error == ba::error::not_connected)
160 {
161 ostringstream msg;
162 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
163 Warn(msg);
164 }
165 else
166 {
167#ifdef DEBUG_TX
168 ostringstream msg;
169 msg << n << " bytes successfully sent to " << URL();
170 Message(msg);
171#endif
172 }
173
174 HandleTransmittedData(n);
175
176 // This is "thread" safe because SendMessage and HandleSentMessage
177 // are serialized in the EventQueue. Note: Do not call these
178 // functions directly from any other place then Handlers, use
179 // PostMessage instead
180 fOutQueue.pop_front();
181
182 if (fOutQueue.empty())
183 {
184 // Queue went empty, remove deadline
185 fOutTimeout.cancel();
186 return;
187 }
188
189 // AsyncWrite + Deadline
190 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
191 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
192}
193
// It is important that when SendMessageImp is called, or to be more
// precise when boost::bind is called, the data is copied!
196void ConnectionUSB::SendMessageImp(const vector<char> msg)
197{
198 /*
199 if (!fConnectionEstablished)
200 {
201 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
202 return;
203 }*/
204
205 const bool first_message_in_queue = fOutQueue.empty();
206
207 // This is "thread" safe because SendMessage and HandleSentMessage
208 // are serialized in the EventQueue. Note: Do not call these
209 // functions directly from any other place then Handlers, use
210 // PostMessage instead
211 fOutQueue.push_back(msg);
212
213 if (!first_message_in_queue)
214 return;
215
216 // AsyncWrite + Deadline
217 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
218 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
219}
220
221void ConnectionUSB::PostMessage(const void *ptr, size_t max)
222{
223 const vector<char> msg(reinterpret_cast<const char*>(ptr),
224 reinterpret_cast<const char*>(ptr)+max);
225
226 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
227}
228
229void ConnectionUSB::PostMessage(const string &cmd, size_t max)
230{
231 if (max==size_t(-1))
232 max = cmd.length()+1;
233
234 vector <char>msg(max);
235
236 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
237
238 PostMessage(msg);
239}
240
241void ConnectionUSB::Connect()
242{
243 fConnectionStatus = kConnecting;
244
245 bs::error_code ec;
246 open(URL(), ec);
247
248 ostringstream msg;
249 msg << "Connecting to " << URL() << "...";
250 if (ec)
251 msg << " " << ec.message() << " (" << ec << ")";
252 else
253 msg << "success.";
254
255 if (ec)
256 {
257 Error(msg);
258 fConnectionStatus = kDisconnected;
259 return;
260 }
261
262 Info(msg);
263
264 try
265 {
266 set_option(fBaudRate);
267 set_option(fCharacterSize);
268 set_option(fParity);
269 set_option(fStopBits);
270 set_option(fFlowControl);
271 }
272 catch (const bs::system_error &erc)
273 {
274 Error(string("Setting connection options: ")+erc.what());
275 // CLOSE
276 return;
277 }
278
279 fConnectionStatus = kConnected;
280
281 ConnectionEstablished();
282}
283
284void ConnectionUSB::SetEndpoint(const string &addr)
285{
286 if (fConnectionStatus>=1)
287 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
288
289 fAddress = "/dev/"+addr;
290}
291
292
293ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
294MessageImp(out), ba::serial_port(ioservice), fLog(0),
295fBaudRate(115200),
296fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
297fFlowControl(flow_control::none),
298fInTimeout(ioservice), fOutTimeout(ioservice),
299fConnectionStatus(kDisconnected)
300{
301}
Note: See TracBrowser for help on using the repository browser.