source: trunk/FACT++/src/ConnectionUSB.cc@ 11969

Last change on this file since 11969 was 11963, checked in by tbretz, 14 years ago
Fixed two wrong conditional checks if there was an error or not.
File size: 8.3 KB
Line 
1// **************************************************************************
2/** @class Connection
3
4@brief Maintains an ansynchronous TCP/IP client connection
5
6*/
7// **************************************************************************
8#include "ConnectionUSB.h"
9
10#include <boost/bind.hpp>
11
12using namespace std;
13
14namespace ba = boost::asio;
15namespace bs = boost::system;
16namespace dummy = ba::placeholders;
17
18using ba::serial_port_base;
19
20#define DEBUG_TX
21
22 // -------- Abbreviations for starting async tasks ---------
23
24int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
25{
26 if (fLog)
27 return fLog->Write(t, txt, qos);
28
29 return MessageImp::Write(t, txt, qos);
30}
31
32void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
33{
34 ba::async_read(*this, buffers,
35 boost::bind(&ConnectionUSB::HandleReceivedData, this,
36 dummy::error, dummy::bytes_transferred, type, counter));
37}
38
39void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
40{
41 ba::async_write(*this, buffers,
42 boost::bind(&ConnectionUSB::HandleSentData, this,
43 dummy::error, dummy::bytes_transferred));
44}
45
46void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
47 void (ConnectionUSB::*handler)(const bs::error_code&))
48{
49 // - The boost::asio::basic_deadline_timer::expires_from_now()
50 // function cancels any pending asynchronous waits, and returns
51 // the number of asynchronous waits that were cancelled. If it
52 // returns 0 then you were too late and the wait handler has
53 // already been executed, or will soon be executed. If it
54 // returns 1 then the wait handler was successfully cancelled.
55 // - If a wait handler is cancelled, the bs::error_code passed to
56 // it contains the value bs::error::operation_aborted.
57 timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58
59 timer.async_wait(boost::bind(handler, this, dummy::error));
60}
61
62// ------------------------ close --------------------------
63// close from another thread
64void ConnectionUSB::CloseImp(bool restart)
65{
66 if (IsConnected())
67 {
68 ostringstream str;
69 str << "Closing connection to " << URL() << ".";
70 Info(str);
71 }
72
73 // Close possible open connections
74 bs::error_code ec;
75 cancel(ec);
76 if (ec)
77 Error("Cancel async requests on "+URL()+": "+ec.message());
78
79 if (IsConnected())
80 {
81 close(ec);
82 if (ec)
83 Error("Closing "+URL()+": "+ec.message());
84 }
85
86 // Reset the connection status
87 fConnectionStatus = kDisconnected;
88
89 // Stop deadline counters
90 fInTimeout.cancel();
91 fOutTimeout.cancel();
92
93 // Empty output queue
94 fOutQueue.clear();
95
96 if (!restart || IsConnecting())
97 return;
98
99 // We need some timeout before reconnecting!
100 // And we have to check if we are alreayd trying to connect
101 // We shoudl wait until all operations in progress were canceled
102
103 // Start trying to reconnect
104 Connect();
105}
106
107void ConnectionUSB::PostClose(bool restart)
108{
109 get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, restart));
110}
111
112// ------------------------ write --------------------------
113void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
114{
115 if (error==ba::error::basic_errors::operation_aborted)
116 return;
117
118 // 125: Operation canceled (bs::error_code(125, bs::system_category))
119 if (error)
120 {
121 ostringstream str;
122 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
123 Error(str);
124
125 CloseImp();
126 return;
127 }
128
129 if (!is_open())
130 {
131 // For example: Here we could schedule a new accept if we
132 // would not want to allow two connections at the same time.
133 return;
134 }
135
136 // Check whether the deadline has passed. We compare the deadline
137 // against the current time since a new asynchronous operation
138 // may have moved the deadline before this actor had a chance
139 // to run.
140 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
141 return;
142
143 Error("fOutTimeout has expired, writing data to "+URL());
144
145 CloseImp();
146}
147
148void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
149{
150 if (error && error != ba::error::not_connected)
151 {
152 ostringstream str;
153 str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
154 Error(str);
155
156 CloseImp();
157 return;
158 }
159
160 if (error == ba::error::not_connected)
161 {
162 ostringstream msg;
163 msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
164 Warn(msg);
165 }
166 else
167 {
168#ifdef DEBUG_TX
169 ostringstream msg;
170 msg << n << " bytes successfully sent to " << URL();
171 Message(msg);
172#endif
173 }
174
175 HandleTransmittedData(n);
176
177 // This is "thread" safe because SendMessage and HandleSentMessage
178 // are serialized in the EventQueue. Note: Do not call these
179 // functions directly from any other place then Handlers, use
180 // PostMessage instead
181 fOutQueue.pop_front();
182
183 if (fOutQueue.empty())
184 {
185 // Queue went empty, remove deadline
186 fOutTimeout.cancel();
187 return;
188 }
189
190 // AsyncWrite + Deadline
191 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
192 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
193}
194
195// It is important that when SendMessageImp is called, or to be more
196// precise boost::bind is called, teh data is copied!
197void ConnectionUSB::SendMessageImp(const vector<char> msg)
198{
199 /*
200 if (!fConnectionEstablished)
201 {
202 UpdateWarn("SendMessageImp, but no connection to "+fAddress+":"+fPort+".");
203 return;
204 }*/
205
206 const bool first_message_in_queue = fOutQueue.empty();
207
208 // This is "thread" safe because SendMessage and HandleSentMessage
209 // are serialized in the EventQueue. Note: Do not call these
210 // functions directly from any other place then Handlers, use
211 // PostMessage instead
212 fOutQueue.push_back(msg);
213
214 if (!first_message_in_queue)
215 return;
216
217 // AsyncWrite + Deadline
218 AsyncWrite(ba::const_buffers_1(fOutQueue.front().data(), fOutQueue.front().size())/*, &ConnectionUSB::HandleSentData*/);
219 AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
220}
221
222void ConnectionUSB::PostMessage(const void *ptr, size_t max)
223{
224 const vector<char> msg(reinterpret_cast<const char*>(ptr),
225 reinterpret_cast<const char*>(ptr)+max);
226
227 get_io_service().post(boost::bind(&ConnectionUSB::SendMessageImp, this, msg));
228}
229
230void ConnectionUSB::PostMessage(const string &cmd, size_t max)
231{
232 if (max==size_t(-1))
233 max = cmd.length()+1;
234
235 vector <char>msg(max);
236
237 copy(cmd.begin(), cmd.begin()+min(cmd.length()+1, max), msg.begin());
238
239 PostMessage(msg);
240}
241
242void ConnectionUSB::Connect()
243{
244 fConnectionStatus = kConnecting;
245
246 bs::error_code ec;
247 open(URL(), ec);
248
249 ostringstream msg;
250 msg << "Connecting to " << URL() << "... ";
251 if (ec)
252 msg << " " << ec.message() << " (" << ec << ")";
253 else
254 msg << "success.";
255
256 if (ec)
257 {
258 Error(msg);
259 fConnectionStatus = kDisconnected;
260 return;
261 }
262
263 Info(msg);
264
265 try
266 {
267 set_option(fBaudRate);
268 set_option(fCharacterSize);
269 set_option(fParity);
270 set_option(fStopBits);
271 set_option(fFlowControl);
272 }
273 catch (const bs::system_error &erc)
274 {
275 Error(string("Setting connection options: ")+erc.what());
276 // CLOSE
277 return;
278 }
279
280 fConnectionStatus = kConnected;
281
282 ConnectionEstablished();
283}
284
285void ConnectionUSB::SetEndpoint(const string &addr)
286{
287 if (fConnectionStatus>=1)
288 Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
289
290 fAddress = "/dev/"+addr;
291}
292
293
294ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
295MessageImp(out), ba::serial_port(ioservice), fLog(0),
296fBaudRate(115200),
297fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
298fFlowControl(flow_control::none),
299fInTimeout(ioservice), fOutTimeout(ioservice),
300fConnectionStatus(kDisconnected)
301{
302}
Note: See TracBrowser for help on using the repository browser.