source: trunk/FACT++/src/sqmctrl.cc@ 17936

Last change on this file since 17936 was 17932, checked in by tbretz, 11 years ago
Revived the valid state; make sure the first message (which might still be junk from a non empty buffer) is properly treated; make sure that an automatic re-connect properly works
File size: 16.2 KB
Line 
1#include <boost/algorithm/string.hpp>
2
3#include "FACT.h"
4#include "Dim.h"
5#include "Event.h"
6#include "StateMachineDim.h"
7#include "StateMachineAsio.h"
8#include "Connection.h"
9#include "LocalControl.h"
10#include "Configuration.h"
11#include "Console.h"
12
13#include "tools.h"
14
15#include "HeadersSQM.h"
16
17namespace ba = boost::asio;
18namespace bs = boost::system;
19namespace dummy = ba::placeholders;
20
21using namespace std;
22
23class ConnectionSQM : public Connection
24{
25protected:
26 virtual void Update(const SQM::Data &)
27 {
28 }
29
30private:
31 bool fIsVerbose;
32 bool fFirstMessage;
33 bool fValid;
34 uint16_t fTimeout;
35
36 boost::asio::streambuf fBuffer;
37
38 boost::asio::deadline_timer fTrigger;
39
40 void HandleRead(const boost::system::error_code& err, size_t bytes_received)
41 {
42 // Do not schedule a new read if the connection failed.
43 if (bytes_received==0 || err)
44 {
45 if (err==ba::error::eof)
46 Warn("Connection closed by remote host.");
47
48 // 107: Transport endpoint is not connected (bs::error_code(107, bs::system_category))
49 // 125: Operation canceled
50 if (err && err!=ba::error::eof && // Connection closed by remote host
51 err!=ba::error::basic_errors::not_connected && // Connection closed by remote host
52 err!=ba::error::basic_errors::operation_aborted) // Connection closed by us
53 {
54 ostringstream str;
55 str << "Reading from " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
56 Error(str);
57 }
58 PostClose(false);//err!=ba::error::basic_errors::operation_aborted);
59 return;
60 }
61
62 istream is(&fBuffer);
63
64 string buffer;
65 if (!getline(is, buffer, '\n'))
66 {
67 Fatal("Received message does not contain \\n... closing connection.");
68 PostClose(false);
69 return;
70 }
71
72 buffer = buffer.substr(0, buffer.size()-1);
73
74 if (fIsVerbose)
75 {
76 Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << "|" << endl;
77 // Out() << Time().GetAsStr("%H:%M:%S.%f") << "[ " << vec.size() << "]: ";
78 // for (auto it=vec.begin(); it!=vec.end(); it++)
79 // Out() << *it << "|";
80 // Out() << endl;
81 }
82
83 vector<string> vec;
84 boost::split(vec, buffer, boost::is_any_of(","));
85
86 try
87 {
88 if (vec.size()!=6)
89 throw runtime_error("Unknown number of fields in received data");
90
91 if (vec[0]!="r")
92 throw runtime_error("Not a proper answer["+to_string(vec[0].size())+"]: "+vec[0]);
93
94 SQM::Data data;
95
96 data.mag = stof(vec[1]);
97 data.freq = stol(vec[2]);
98 data.counts = stol(vec[3]);
99 data.period = stof(vec[4]);
100 data.temp = stof(vec[5]);
101
102 Update(data);
103
104 fValid = true;
105 }
106 catch (const exception &e)
107 {
108 if (fFirstMessage)
109 Warn("Parsing first message failed ["+string(e.what())+"]");
110 else
111 {
112 Error("Parsing received message failed ["+string(e.what())+"]");
113 Error("Received: "+buffer);
114 PostClose(false);
115 return;
116 }
117 }
118
119 // Send next request in fTimeout milliseconds calculated from
120 // the last request onwards.
121 fTrigger.expires_at(fTrigger.expires_at()+boost::posix_time::milliseconds(fTimeout));
122 fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
123 this, dummy::error));
124
125 fFirstMessage = false;
126 }
127
128 void HandleReadTimeout(const bs::error_code &error)
129 {
130 // 125: Operation canceled (bs::error_code(125, bs::system_category))
131 if (error && error!=ba::error::basic_errors::operation_aborted)
132 {
133 ostringstream str;
134 str << "ReadTimeout of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
135 Error(str);
136
137 PostClose(false);
138 return;
139 }
140
141 if (!is_open())
142 {
143 // For example: Here we could schedule a new accept if we
144 // would not want to allow two connections at the same time.
145 PostClose(true);
146 return;
147 }
148
149 // This is called if the deadline has been shifted
150 if (error==ba::error::basic_errors::operation_aborted)
151 return;
152
153 // Check whether the deadline has passed. We compare the deadline
154 // against the current time since a new asynchronous operation
155 // may have moved the deadline before this actor had a chance
156 // to run.
157 if (fInTimeout.expires_at() > ba::deadline_timer::traits_type::now())
158 return;
159
160 ostringstream str;
161 str << "No valid answer received from " << URL() << " within " << ceil(fTimeout*1.5) << "ms";
162 Error(str);
163
164 PostClose(false);
165
166 fInTimeout.expires_from_now(boost::posix_time::milliseconds(1000));
167 fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
168 this, dummy::error));
169 }
170
171 void HandleRequestTrigger(const bs::error_code &error)
172 {
173
174 // 125: Operation canceled (bs::error_code(125, bs::system_category))
175 if (error && error!=ba::error::basic_errors::operation_aborted)
176 {
177 ostringstream str;
178 str << "RequestTrigger failed of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
179 Error(str);
180
181 PostClose(false);
182 return;
183 }
184
185 if (!is_open())
186 {
187 // For example: Here we could schedule a new accept if we
188 // would not want to allow two connections at the same time.
189 //PostClose(true);
190 return;
191 }
192
193 // Check whether the deadline has passed. We compare the deadline
194 // against the current time since a new asynchronous operation
195 // may have moved the deadline before this actor had a chance
196 // to run.
197 if (fTrigger.expires_at() > ba::deadline_timer::traits_type::now())
198 return;
199
200 StartReadReport();
201 }
202
203 void StartReadReport()
204 {
205 PostMessage(string("rx\n"), 3);
206
207 // Do not schedule two reads
208 if (!fFirstMessage)
209 {
210 async_read_until(*this, fBuffer, '\n',
211 boost::bind(&ConnectionSQM::HandleRead, this,
212 dummy::error, dummy::bytes_transferred));
213 }
214
215 fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*1.5));
216 fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
217 this, dummy::error));
218 }
219
220private:
221 // This is called when a connection was established
222 void ConnectionEstablished()
223 {
224 fValid = false;
225 fFirstMessage = true;
226
227 // Empty a possible buffer first before we start reading
228 // otherwise reading and writing might not be consecutive
229 async_read_until(*this, fBuffer, '\n',
230 boost::bind(&ConnectionSQM::HandleRead, this,
231 dummy::error, dummy::bytes_transferred));
232
233 // If there was no immediate answer, send a request
234 fTrigger.expires_at(Time()+boost::posix_time::milliseconds(1000));
235 fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
236 this, dummy::error));
237 }
238
239public:
240 static const uint16_t kMaxAddr;
241
242public:
243 ConnectionSQM(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
244 fIsVerbose(true), fTimeout(0), fTrigger(ioservice)
245 {
246 SetLogStream(&imp);
247 }
248
249 void SetVerbose(bool b)
250 {
251 fIsVerbose = b;
252 Connection::SetVerbose(b);
253 }
254
255 void SetTimeout(uint16_t t)
256 {
257 fTimeout = t;
258 }
259
260 int GetState() const
261 {
262 if (!is_open())
263 return SQM::State::kDisconnected;
264
265 return fValid ? SQM::State::kValid : SQM::State::kConnected;
266 }
267};
268
269// ------------------------------------------------------------------------
270
271#include "DimDescriptionService.h"
272
273class ConnectionDimWeather : public ConnectionSQM
274{
275private:
276 DimDescribedService fDim;
277
278public:
279 ConnectionDimWeather(ba::io_service& ioservice, MessageImp &imp) :
280 ConnectionSQM(ioservice, imp),
281 fDim("SQM_CONTROL/DATA", "F:1;I:1;I:1;F:1;F:1",
282 "Data received from sky quality meter"
283 "|mag[mag/arcsec^2]:Magnitude (0 means upper brightness limit)"
284 "|freq[Hz]:Frequency of sensor"
285 "|counts:Period of sensor (counts occur at 14.7456MHz/32)"
286 "|period[s]:Period of sensor"
287 "|temp[deg C]:Sensor temperature in deg C")
288 {
289 }
290
291 void Update(const SQM::Data &data)
292 {
293 fDim.Update(data);
294 }
295};
296
297// ------------------------------------------------------------------------
298
299template <class T, class S>
300class StateMachineSQMControl : public StateMachineAsio<T>
301{
302private:
303 S fSQM;
304
305 bool CheckEventSize(size_t has, const char *name, size_t size)
306 {
307 if (has==size)
308 return true;
309
310 ostringstream msg;
311 msg << name << " - Received event has " << has << " bytes, but expected " << size << ".";
312 T::Fatal(msg);
313 return false;
314 }
315
316 int Disconnect()
317 {
318 // Close all connections
319 fSQM.PostClose(false);
320
321 return T::GetCurrentState();
322 }
323
324 int Reconnect(const EventImp &evt)
325 {
326 // Close all connections to supress the warning in SetEndpoint
327 fSQM.PostClose(false);
328
329 // Now wait until all connection have been closed and
330 // all pending handlers have been processed
331 ba::io_service::poll();
332
333 if (evt.GetBool())
334 fSQM.SetEndpoint(evt.GetString());
335
336 // Now we can reopen the connection
337 fSQM.PostClose(true);
338
339 return T::GetCurrentState();
340 }
341
342 int SetVerbosity(const EventImp &evt)
343 {
344 if (!CheckEventSize(evt.GetSize(), "SetVerbosity", 1))
345 return T::kSM_FatalError;
346
347 fSQM.SetVerbose(evt.GetBool());
348
349 return T::GetCurrentState();
350 }
351
352 int Send(const string &cmd)
353 {
354 const string tx = cmd+"\r\n";
355 fSQM.PostMessage(tx, tx.size());
356 return T::GetCurrentState();
357 }
358
359 int SendCommand(const EventImp &evt)
360 {
361 return Send(evt.GetString());
362 }
363
364 int Execute()
365 {
366 return fSQM.GetState();
367 }
368
369
370public:
371 StateMachineSQMControl(ostream &out=cout) :
372 StateMachineAsio<T>(out, "SQM_CONTROL"), fSQM(*this, *this)
373 {
374 // State names
375 T::AddStateName(SQM::State::kDisconnected, "Disconnected",
376 "No connection to Sky Quality Meter");
377
378 T::AddStateName(SQM::State::kConnected, "Connected",
379 "Connection established, but no valid message received");
380
381 T::AddStateName(SQM::State::kValid, "Valid",
382 "Valid message received");
383
384 // Commands
385 //T::AddEvent("SEND_COMMAND", "C")
386 // (bind(&StateMachineSQMControl::SendCommand, this, placeholders::_1))
387 // ("Send command to SQM");
388
389 // Verbosity commands
390 T::AddEvent("SET_VERBOSE", "B")
391 (bind(&StateMachineSQMControl::SetVerbosity, this, placeholders::_1))
392 ("set verbosity state"
393 "|verbosity[bool]:disable or enable verbosity for received data (yes/no), except dynamic data");
394
395 //T::AddEvent("ENABLE")
396 // (bind(&StateMachineSQMControl::Send, this, "veto_60"))
397 // ("Enable trigger signal once a second vetoed at every exact minute");
398
399 //T::AddEvent("DISABLE")
400 // (bind(&StateMachineSQMControl::Send, this, "veto_on"))
401 // ("Diable trigger output");
402
403 // Conenction commands
404 T::AddEvent("DISCONNECT")
405 (bind(&StateMachineSQMControl::Disconnect, this))
406 ("disconnect from ethernet");
407
408 T::AddEvent("RECONNECT", "O")
409 (bind(&StateMachineSQMControl::Reconnect, this, placeholders::_1))
410 ("(Re)connect ethernet connection to SQM, a new address can be given"
411 "|[host][string]:new ethernet address in the form <host:port>");
412
413 }
414
415 int EvalOptions(Configuration &conf)
416 {
417 fSQM.SetVerbose(!conf.Get<bool>("quiet"));
418 fSQM.SetTimeout(conf.Get<uint16_t>("request-interval"));
419 fSQM.SetDebugTx(conf.Get<bool>("debug-tx"));
420 fSQM.SetEndpoint(conf.Get<string>("addr"));
421 fSQM.StartConnect();
422
423 return -1;
424 }
425};
426
427// ------------------------------------------------------------------------
428
429#include "Main.h"
430
431
432template<class T, class S, class R>
433int RunShell(Configuration &conf)
434{
435 return Main::execute<T, StateMachineSQMControl<S, R>>(conf);
436}
437
438void SetupConfiguration(Configuration &conf)
439{
440 po::options_description control("SQM control");
441 control.add_options()
442 ("no-dim,d", po_switch(), "Disable dim services")
443 ("addr,a", var<string>("10.0.100.208:10001"), "Network address of the lid controling Arduino including port")
444 ("quiet,q", po_bool(true), "Disable printing contents of all received messages (except dynamic data) in clear text.")
445 ("debug-tx", po_bool(), "Enable debugging of ethernet transmission.")
446 ("request-interval", var<uint16_t>(5000), "How often to request a report [milliseconds].")
447 ;
448
449 conf.AddOptions(control);
450}
451
452/*
453 Extract usage clause(s) [if any] for SYNOPSIS.
454 Translators: "Usage" and "or" here are patterns (regular expressions) which
455 are used to match the usage synopsis in program output. An example from cp
456 (GNU coreutils) which contains both strings:
457 Usage: cp [OPTION]... [-T] SOURCE DEST
458 or: cp [OPTION]... SOURCE... DIRECTORY
459 or: cp [OPTION]... -t DIRECTORY SOURCE...
460 */
461void PrintUsage()
462{
463 cout <<
464 "The sqmctrl is an interface to the Sky Quality Meter.\n"
465 "\n"
466 "The default is that the program is started without user intercation. "
467 "All actions are supposed to arrive as DimCommands. Using the -c "
468 "option, a local shell can be initialized. With h or help a short "
469 "help message about the usuage can be brought to the screen.\n"
470 "\n"
471 "Usage: sqmctrl [-c type] [OPTIONS]\n"
472 " or: sqmctrl [OPTIONS]\n";
473 cout << endl;
474}
475
476void PrintHelp()
477{
478// Main::PrintHelp<StateMachineFTM<StateMachine, ConnectionFTM>>();
479
480 /* Additional help text which is printed after the configuration
481 options goes here */
482
483 /*
484 cout << "bla bla bla" << endl << endl;
485 cout << endl;
486 cout << "Environment:" << endl;
487 cout << "environment" << endl;
488 cout << endl;
489 cout << "Examples:" << endl;
490 cout << "test exam" << endl;
491 cout << endl;
492 cout << "Files:" << endl;
493 cout << "files" << endl;
494 cout << endl;
495 */
496}
497
498int main(int argc, const char* argv[])
499{
500 Configuration conf(argv[0]);
501 conf.SetPrintUsage(PrintUsage);
502 Main::SetupConfiguration(conf);
503 SetupConfiguration(conf);
504
505 if (!conf.DoParse(argc, argv, PrintHelp))
506 return 127;
507
508 // No console access at all
509 if (!conf.Has("console"))
510 {
511 if (conf.Get<bool>("no-dim"))
512 return RunShell<LocalStream, StateMachine, ConnectionSQM>(conf);
513 else
514 return RunShell<LocalStream, StateMachineDim, ConnectionDimWeather>(conf);
515 }
516 // Cosole access w/ and w/o Dim
517 if (conf.Get<bool>("no-dim"))
518 {
519 if (conf.Get<int>("console")==0)
520 return RunShell<LocalShell, StateMachine, ConnectionSQM>(conf);
521 else
522 return RunShell<LocalConsole, StateMachine, ConnectionSQM>(conf);
523 }
524 else
525 {
526 if (conf.Get<int>("console")==0)
527 return RunShell<LocalShell, StateMachineDim, ConnectionDimWeather>(conf);
528 else
529 return RunShell<LocalConsole, StateMachineDim, ConnectionDimWeather>(conf);
530 }
531
532 return 0;
533}
Note: See TracBrowser for help on using the repository browser.