source: trunk/FACT++/src/sqmctrl.cc @ 19459

Last change on this file since 19459 was 19459, checked in by tbretz, 8 weeks ago
Use IsConnected rather than is_open to avoid toggling when no connection is possible.
File size: 16.2 KB
Line 
1#include <boost/algorithm/string.hpp>
2
3#include "FACT.h"
4#include "Dim.h"
5#include "Event.h"
6#include "StateMachineDim.h"
7#include "StateMachineAsio.h"
8#include "Connection.h"
9#include "LocalControl.h"
10#include "Configuration.h"
11#include "Console.h"
12
13#include "tools.h"
14
15#include "HeadersSQM.h"
16
17namespace ba = boost::asio;
18namespace bs = boost::system;
19namespace dummy = ba::placeholders;
20
21using namespace std;
22
23class ConnectionSQM : public Connection
24{
25protected:
26    virtual void Update(const SQM::Data &)
27    {
28    }
29
30private:
31    bool     fIsVerbose;
32    bool     fFirstMessage;
33    bool     fValid;
34    uint16_t fTimeout;
35
36    boost::asio::streambuf fBuffer;
37
38    boost::asio::deadline_timer fTrigger;
39
40    void HandleRead(const boost::system::error_code& err, size_t bytes_received)
41    {
42        // Do not schedule a new read if the connection failed.
43        if (bytes_received==0 || err)
44        {
45            if (err==ba::error::eof)
46                Warn("Connection closed by remote host.");
47
48            // 107: Transport endpoint is not connected (bs::error_code(107, bs::system_category))
49            // 125: Operation canceled
50            if (err && err!=ba::error::eof &&                     // Connection closed by remote host
51                err!=ba::error::basic_errors::not_connected &&    // Connection closed by remote host
52                err!=ba::error::basic_errors::operation_aborted)  // Connection closed by us
53            {
54                ostringstream str;
55                str << "Reading from " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
56                Error(str);
57            }
58            PostClose(false);//err!=ba::error::basic_errors::operation_aborted);
59            return;
60        }
61
62        istream is(&fBuffer);
63
64        string buffer;
65        if (!getline(is, buffer, '\n'))
66        {
67            Fatal("Received message does not contain \\n... closing connection.");
68            PostClose(false);
69            return;
70        }
71
72        buffer = buffer.substr(0, buffer.size()-1);
73
74        if (fIsVerbose)
75        {
76            Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << "|" << endl;
77            // Out() << Time().GetAsStr("%H:%M:%S.%f") << "[ " << vec.size() << "]: ";
78            // for (auto it=vec.begin(); it!=vec.end(); it++)
79            //     Out() << *it << "|";
80            // Out() << endl;
81        }
82
83        vector<string> vec;
84        boost::split(vec, buffer, boost::is_any_of(","));
85
86        try
87        {
88            if (vec.size()!=6)
89                throw runtime_error("Unknown number of fields in received data");
90
91            if (vec[0]!="r")
92                throw runtime_error("Not a proper answer");
93
94            SQM::Data data;
95
96            data.mag    = stof(vec[1]);
97            data.freq   = stol(vec[2]);
98            data.counts = stol(vec[3]);
99            data.period = stof(vec[4]);
100            data.temp   = stof(vec[5]);
101
102            Update(data);
103
104            fValid = true;
105        }
106        catch (const exception &e)
107        {
108            if (fFirstMessage)
109                Warn("Parsing first message failed ["+string(e.what())+"]");
110            else
111            {
112                Error("Parsing received message failed ["+string(e.what())+"]");
113                Error("Received: "+buffer);
114                PostClose(false);
115                return;
116            }
117        }
118
119        // Send next request in fTimeout milliseconds calculated from
120        // the last request onwards.
121        fTrigger.expires_at(fTrigger.expires_at()+boost::posix_time::milliseconds(fTimeout));
122        fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
123                                        this, dummy::error));
124
125        fFirstMessage = false;
126    }
127
128    void HandleReadTimeout(const bs::error_code &error)
129    {
130        // 125: Operation canceled (bs::error_code(125, bs::system_category))
131        if (error && error!=ba::error::basic_errors::operation_aborted)
132        {
133            ostringstream str;
134            str << "ReadTimeout of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
135            Error(str);
136
137            PostClose(false);
138            return;
139        }
140
141        if (!is_open())
142        {
143            // For example: Here we could schedule a new accept if we
144            // would not want to allow two connections at the same time.
145            fValid = false;
146            PostClose(true);
147            return;
148        }
149
150        // This is called if the deadline has been shifted
151        if (error==ba::error::basic_errors::operation_aborted)
152            return;
153
154        // Check whether the deadline has passed. We compare the deadline
155        // against the current time since a new asynchronous operation
156        // may have moved the deadline before this actor had a chance
157        // to run.
158        if (fInTimeout.expires_at() > ba::deadline_timer::traits_type::now())
159            return;
160
161        ostringstream str;
162        str << "No valid answer received from " << URL() << " within " << ceil(fTimeout*1.5) << "ms";
163        Error(str);
164
165        PostClose(false);
166
167        fInTimeout.expires_from_now(boost::posix_time::milliseconds(1000));
168        fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
169                                          this, dummy::error));
170    }
171
172    void HandleRequestTrigger(const bs::error_code &error)
173    {
174
175        // 125: Operation canceled (bs::error_code(125, bs::system_category))
176        if (error && error!=ba::error::basic_errors::operation_aborted)
177        {
178            ostringstream str;
179            str << "RequestTrigger failed of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
180            Error(str);
181
182            PostClose(false);
183            return;
184        }
185
186        if (!is_open())
187        {
188            // For example: Here we could schedule a new accept if we
189            // would not want to allow two connections at the same time.
190            //PostClose(true);
191            return;
192        }
193
194        // Check whether the deadline has passed. We compare the deadline
195        // against the current time since a new asynchronous operation
196        // may have moved the deadline before this actor had a chance
197        // to run.
198        if (fTrigger.expires_at() > ba::deadline_timer::traits_type::now())
199            return;
200
201        StartReadReport();
202    }
203
204    void StartReadReport()
205    {
206        PostMessage(string("rx\n"), 3);
207
208        // Do not schedule two reads
209        if (!fFirstMessage)
210        {
211            async_read_until(*this, fBuffer, '\n',
212                             boost::bind(&ConnectionSQM::HandleRead, this,
213                                         dummy::error, dummy::bytes_transferred));
214        }
215
216        fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*1.5));
217        fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
218                                          this, dummy::error));
219    }
220
221private:
222    // This is called when a connection was established
223    void ConnectionEstablished()
224    {
225        fValid = false;
226        fFirstMessage = true;
227
228        // Empty a possible buffer first before we start reading
229        // otherwise reading and writing might not be consecutive
230        async_read_until(*this, fBuffer, '\n',
231                         boost::bind(&ConnectionSQM::HandleRead, this,
232                                     dummy::error, dummy::bytes_transferred));
233
234        // If there was no immediate answer, send a request
235        fTrigger.expires_at(Time()+boost::posix_time::milliseconds(1000));
236        fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
237                                        this, dummy::error));
238    }
239
240public:
241    static const uint16_t kMaxAddr;
242
243public:
244    ConnectionSQM(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
245        fIsVerbose(true), fTimeout(0), fTrigger(ioservice)
246    {
247        SetLogStream(&imp);
248    }
249
250    void SetVerbose(bool b)
251    {
252        fIsVerbose = b;
253        Connection::SetVerbose(b);
254    }
255
256    void SetTimeout(uint16_t t)
257    {
258        fTimeout = t;
259    }
260
261    int GetState() const
262    {
263        if (!IsConnected())
264            return  SQM::State::kDisconnected;
265
266        return fValid ? SQM::State::kValid : SQM::State::kConnected;
267    }
268};
269
270// ------------------------------------------------------------------------
271
272#include "DimDescriptionService.h"
273
274class ConnectionDimWeather : public ConnectionSQM
275{
276private:
277    DimDescribedService fDim;
278
279public:
280    ConnectionDimWeather(ba::io_service& ioservice, MessageImp &imp) :
281        ConnectionSQM(ioservice, imp),
282        fDim("SQM_CONTROL/DATA", "F:1;I:1;I:1;F:1;F:1",
283             "Data received from sky quality meter"
284             "|Mag[mag/arcsec^2]:Magnitude (0 means upper brightness limit)"
285             "|Freq[Hz]:Frequency of sensor"
286             "|Counts:Period of sensor (counts occur at 14.7456MHz/32)"
287             "|Period[s]:Period of sensor"
288             "|Temp[deg C]:Sensor temperature in deg C")
289    {
290    }
291
292    void Update(const SQM::Data &data)
293    {
294        fDim.Update(data);
295    }
296};
297
298// ------------------------------------------------------------------------
299
300template <class T, class S>
301class StateMachineSQMControl : public StateMachineAsio<T>
302{
303private:
304    S fSQM;
305
306    bool CheckEventSize(size_t has, const char *name, size_t size)
307    {
308        if (has==size)
309            return true;
310
311        ostringstream msg;
312        msg << name << " - Received event has " << has << " bytes, but expected " << size << ".";
313        T::Fatal(msg);
314        return false;
315    }
316
317    int Disconnect()
318    {
319        // Close all connections
320        fSQM.PostClose(false);
321
322        return T::GetCurrentState();
323    }
324
325    int Reconnect(const EventImp &evt)
326    {
327        // Close all connections to supress the warning in SetEndpoint
328        fSQM.PostClose(false);
329
330        // Now wait until all connection have been closed and
331        // all pending handlers have been processed
332        ba::io_service::poll();
333
334        if (evt.GetBool())
335            fSQM.SetEndpoint(evt.GetString());
336
337        // Now we can reopen the connection
338        fSQM.PostClose(true);
339
340        return T::GetCurrentState();
341    }
342
343    int SetVerbosity(const EventImp &evt)
344    {
345        if (!CheckEventSize(evt.GetSize(), "SetVerbosity", 1))
346            return T::kSM_FatalError;
347
348        fSQM.SetVerbose(evt.GetBool());
349
350        return T::GetCurrentState();
351    }
352
353    int Send(const string &cmd)
354    {
355        const string tx = cmd+"\r\n";
356        fSQM.PostMessage(tx, tx.size());
357        return T::GetCurrentState();
358    }
359
360    int SendCommand(const EventImp &evt)
361    {
362        return Send(evt.GetString());
363    }
364
365    int Execute()
366    {
367        return fSQM.GetState();
368    }
369
370
371public:
372    StateMachineSQMControl(ostream &out=cout) :
373        StateMachineAsio<T>(out, "SQM_CONTROL"), fSQM(*this, *this)
374    {
375        // State names
376        T::AddStateName(SQM::State::kDisconnected, "Disconnected",
377                        "No connection to Sky Quality Meter");
378
379        T::AddStateName(SQM::State::kConnected, "Connected",
380                        "Connection established, but no valid message received");
381
382        T::AddStateName(SQM::State::kValid, "Valid",
383                        "Valid message received");
384
385        // Commands
386        //T::AddEvent("SEND_COMMAND", "C")
387        //    (bind(&StateMachineSQMControl::SendCommand, this, placeholders::_1))
388        //    ("Send command to SQM");
389
390        // Verbosity commands
391        T::AddEvent("SET_VERBOSE", "B")
392            (bind(&StateMachineSQMControl::SetVerbosity, this, placeholders::_1))
393            ("set verbosity state"
394             "|verbosity[bool]:disable or enable verbosity for received data (yes/no), except dynamic data");
395
396        //T::AddEvent("ENABLE")
397        //    (bind(&StateMachineSQMControl::Send, this, "veto_60"))
398        //    ("Enable trigger signal once a second vetoed at every exact minute");
399
400        //T::AddEvent("DISABLE")
401        //    (bind(&StateMachineSQMControl::Send, this, "veto_on"))
402        //    ("Diable trigger output");
403
404        // Conenction commands
405        T::AddEvent("DISCONNECT")
406            (bind(&StateMachineSQMControl::Disconnect, this))
407            ("disconnect from ethernet");
408
409         T::AddEvent("RECONNECT", "O")
410            (bind(&StateMachineSQMControl::Reconnect, this, placeholders::_1))
411            ("(Re)connect ethernet connection to SQM, a new address can be given"
412             "|[host][string]:new ethernet address in the form <host:port>");
413
414    }
415
416    int EvalOptions(Configuration &conf)
417    {
418        fSQM.SetVerbose(!conf.Get<bool>("quiet"));
419        fSQM.SetTimeout(conf.Get<uint16_t>("request-interval"));
420        fSQM.SetDebugTx(conf.Get<bool>("debug-tx"));
421        fSQM.SetEndpoint(conf.Get<string>("addr"));
422        fSQM.StartConnect();
423
424        return -1;
425    }
426};
427
428// ------------------------------------------------------------------------
429
430#include "Main.h"
431
432
433template<class T, class S, class R>
434int RunShell(Configuration &conf)
435{
436    return Main::execute<T, StateMachineSQMControl<S, R>>(conf);
437}
438
439void SetupConfiguration(Configuration &conf)
440{
441    po::options_description control("SQM control");
442    control.add_options()
443        ("no-dim,d",         po_switch(),         "Disable dim services")
444        ("addr,a",           var<string>("10.0.100.208:10001"), "Network address of the lid controling Arduino including port")
445        ("quiet,q",          po_bool(true),       "Disable printing contents of all received messages (except dynamic data) in clear text.")
446        ("debug-tx",         po_bool(),           "Enable debugging of ethernet transmission.")
447        ("request-interval", var<uint16_t>(5000), "How often to request a report [milliseconds].")
448        ;
449
450    conf.AddOptions(control);
451}
452
453/*
454 Extract usage clause(s) [if any] for SYNOPSIS.
455 Translators: "Usage" and "or" here are patterns (regular expressions) which
456 are used to match the usage synopsis in program output.  An example from cp
457 (GNU coreutils) which contains both strings:
458  Usage: cp [OPTION]... [-T] SOURCE DEST
459    or:  cp [OPTION]... SOURCE... DIRECTORY
460    or:  cp [OPTION]... -t DIRECTORY SOURCE...
461 */
// Print the program description and the usage synopsis to std::cout,
// followed by an empty line. The "Usage:" / "or:" lines are kept in the
// form expected by the synopsis extraction described above.
void PrintUsage()
{
    std::cout <<
        "The sqmctrl is an interface to the Sky Quality Meter.\n"
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usage can be brought to the screen.\n"
        "\n"
        "Usage: sqmctrl [-c type] [OPTIONS]\n"
        "  or:  sqmctrl [OPTIONS]\n";
    std::cout << std::endl;
}
476
// Help callback handed to Configuration::DoParse() in main().
// Currently prints nothing: the body consists of comments only.
void PrintHelp()
{
    // Disabled call. NOTE(review): it names the FTM state machine and
    // connection, not the SQM classes of this file -- looks like a
    // left-over from another program; adapt before enabling.
//    Main::PrintHelp<StateMachineFTM<StateMachine, ConnectionFTM>>();

    /* Additional help text which is printed after the configuration
     options goes here */

    /* Template for such additional help text:
     cout << "bla bla bla" << endl << endl;
     cout << endl;
     cout << "Environment:" << endl;
     cout << "environment" << endl;
     cout << endl;
     cout << "Examples:" << endl;
     cout << "test exam" << endl;
     cout << endl;
     cout << "Files:" << endl;
     cout << "files" << endl;
     cout << endl;
     */
}
498
499int main(int argc, const char* argv[])
500{
501    Configuration conf(argv[0]);
502    conf.SetPrintUsage(PrintUsage);
503    Main::SetupConfiguration(conf);
504    SetupConfiguration(conf);
505
506    if (!conf.DoParse(argc, argv, PrintHelp))
507        return 127;
508
509    // No console access at all
510    if (!conf.Has("console"))
511    {
512        if (conf.Get<bool>("no-dim"))
513            return RunShell<LocalStream, StateMachine, ConnectionSQM>(conf);
514        else
515            return RunShell<LocalStream, StateMachineDim, ConnectionDimWeather>(conf);
516    }
517    // Cosole access w/ and w/o Dim
518    if (conf.Get<bool>("no-dim"))
519    {
520        if (conf.Get<int>("console")==0)
521            return RunShell<LocalShell, StateMachine, ConnectionSQM>(conf);
522        else
523            return RunShell<LocalConsole, StateMachine, ConnectionSQM>(conf);
524    }
525    else
526    {
527        if (conf.Get<int>("console")==0)
528            return RunShell<LocalShell, StateMachineDim, ConnectionDimWeather>(conf);
529        else
530            return RunShell<LocalConsole, StateMachineDim, ConnectionDimWeather>(conf);
531    }
532
533    return 0;
534}
Note: See TracBrowser for help on using the repository browser.