source: trunk/FACT++/src/dclient5.cc @ 10230

Last change on this file since 10230 was 10230, checked in by tbretz, 10 years ago
Added compatibility with boost V1.40
File size: 17.2 KB
Line 
1#include <boost/bind.hpp>
2#include <boost/thread.hpp>
3#include <boost/asio/deadline_timer.hpp>
4
5#include "Event.h"
6#include "Shell.h"
7#include "StateMachineDim.h"
8#include "Connection.h"
9#include "Configuration.h"
10#include "Timers.h"
11#include "Console.h"
12
13#include "tools.h"
14
15namespace ba    = boost::asio;
16namespace bs    = boost::system;
17
18using ba::deadline_timer;
19using ba::ip::tcp;
20
21using namespace std;
22
23
24// ------------------------------------------------------------------------
25
26#include "LocalControl.h"
27
28// ------------------------------------------------------------------------
29
30class ConnectionFAD : public Connection
31{
32    MessageImp &fMsg;
33
34    int state;
35
36public:
37    void ConnectImp(const bs::error_code& error,
38                    tcp::resolver::iterator endpoint_iterator)
39    {
40        Connection::ConnectImp(error, endpoint_iterator);
41        if (IsConnected())
42            StartAsyncRead();
43    }
44
45    void HandleReadTimeout(const bs::error_code &error)
46    {
47        return;
48        if (!is_open())
49        {
50            // For example: Here we could schedule a new accept if we
51            // would not want to allow two connections at the same time.
52            return;
53        }
54
55        // 125: Operation canceled
56
57        if (error && error!=bs::error_code(125, bs::system_category))
58        {
59            stringstream str;
60
61            str << "HandleReadTimeout: " << error.message() << " (" << error << ")";// << endl;
62            if (error==bs::error_code(2, ba::error::misc_category))
63                Warn(str); // Connection: EOF (closed by remote host)
64            else
65                Error(str);
66        }
67
68        // Check whether the deadline has passed. We compare the deadline
69        // against the current time since a new asynchronous operation
70        // may have moved the deadline before this actor had a chance
71        // to run.
72        if (fInTimeout.expires_at() > deadline_timer::traits_type::now())
73            return;
74
75        Error("fInTimeout has expired...");
76
77        CloseImp();
78    }
79
80    void HandleReceivedData(const bs::error_code& error, size_t bytes_received)
81    {
82        // Do not schedule a new read if the connection failed.
83        if (bytes_received==0 || error)
84        {
85            // 107: Transport endpoint is not connected
86            // 125: Operation canceled
87            if (error && error!=bs::error_code(107, bs::system_category))
88            {
89                stringstream str;
90                str << "Reading from " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
91                Error(str);
92            }
93            CloseImp(error!=bs::error_code(125, bs::system_category));
94            return;
95        }
96
97        string txt;
98
99        if (bytes_received==2)
100        {
101            txt = string(fReadBuffer, bytes_received);
102            //std::vector<char> buf(128);
103            //bytes_transferred = sock.receive(boost::asio::buffer(d3));
104
105            fMsg() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << "  (" << error << ")" << endl;
106
107            if (fReadBuffer[0]=='T')
108            {
109                // AsyncRead + Deadline
110                // Do all manipulation to the buffer BEFORE this call!
111                AsyncRead(ba::buffer(fReadBuffer+2, 21)/*,
112                          &Connection::HandleReceivedData*/);
113                AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
114            }
115            else
116            {
117                // AsyncRead + Deadline
118                // Do all manipulation to the buffer BEFORE this call!
119                AsyncRead(ba::buffer(fReadBuffer+2, 35)/*,
120                          &Connection::HandleReceivedData*/);
121                AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
122            }
123        }
124        else
125        {
126            txt = string(fReadBuffer, bytes_received+2);
127            const int s = atoi(fReadBuffer+35);
128            if (s==9)
129                Info("Requested time received: "+txt);
130            else
131                state = s;
132
133            Out() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << "  (" << error << ")" << endl;
134            memset(fReadBuffer, 0, 100);
135
136            // Do all manipulation to the buffer BEFORE this call!
137            AsyncRead(ba::buffer(fReadBuffer, 2)/*,
138                      &Connection::HandleReceivedData*/);
139
140
141        }
142    }
143
144    int GetState() const { return state; }
145
146    void StartAsyncRead()
147    {
148        // Start also a dealine_time for a proper timeout
149        // Therefore we must know how often we expect messages
150        // FIXME: Add deadline_counter
151
152        memset(fReadBuffer, 0, 100);
153
154        // AsyncRead + Deadline
155        AsyncRead(ba::buffer(fReadBuffer, 2)/*,
156                  &Connection::HandleReceivedData*/);
157        AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
158    }
159
160    /*
161     ConnectionFAD(ba::io_service& io_service, const string &addr, int port) :
162     Connection(io_service, addr, port), state(0) { }
163     ConnectionFAD(ba::io_service& io_service, const string &addr, const string &port) :
164     Connection(io_service, addr, port), state(0) { }
165     */
166
167    ConnectionFAD(ba::io_service& ioservice, MessageImp &imp) :
168    Connection(ioservice, imp()), fMsg(imp), state(0)
169    {
170    }
171};
172
173template <class T>
174class StateMachineFAD : public T, public ba::io_service
175{
176public:
177    enum states_t
178    {
179        kSM_Disconnected = 1,
180        kSM_Connecting,
181        kSM_Connected,
182        kSM_Running,
183        kSM_SomeRunning,
184        kSM_Starting,
185        kSM_Stopping,
186        kSM_Reconnect,
187    };
188
189    ConnectionFAD c1;
190    ConnectionFAD c2;
191    ConnectionFAD c3;
192    ConnectionFAD c4;
193    ConnectionFAD c5;
194    ConnectionFAD c6;
195    ConnectionFAD c7;
196    ConnectionFAD c8;
197    ConnectionFAD c9;
198
199    /*
200    int Write(const Time &time, const char *txt, int qos)
201    {
202        return T::Write(time, txt, qos);
203    }
204    */
205    Timers fTimers;
206
207    StateMachineFAD(const string &name="", ostream &out=cout) :
208        T(out, name),
209        c1(*this, *this), c2(*this, *this), c3(*this, *this), c4(*this, *this),
210        c5(*this, *this), c6(*this, *this), c7(*this, *this), c8(*this, *this),
211        c9(*this, *this), fTimers(out)
212    {
213        c1.SetEndpoint("localhost", 5000);
214        c2.SetEndpoint("localhost", 4001);
215        c3.SetEndpoint("ftmboard1.ethz.ch", 5000);
216        c4.SetEndpoint("localhost", 4003);
217        c5.SetEndpoint("localhost", 4004);
218        c6.SetEndpoint("localhost", 4005);
219        c7.SetEndpoint("localhost", 4006);
220        c8.SetEndpoint("localhost", 4007);
221        c9.SetEndpoint("localhost", 4008);
222
223        c1.SetLogStream(this);
224        c2.SetLogStream(this);
225        c3.SetLogStream(this);
226        c4.SetLogStream(this);
227        c5.SetLogStream(this);
228        c6.SetLogStream(this);
229        c7.SetLogStream(this);
230        c8.SetLogStream(this);
231        c9.SetLogStream(this);
232
233        c1.AsyncConnect(); // This sets the connection to "open"
234        c2.AsyncConnect(); // This sets the connection to "open"
235        c3.AsyncConnect(); // This sets the connection to "open"
236        //c4.AsyncConnect(); // This sets the connection to "open"
237        //c5.AsyncConnect(); // This sets the connection to "open"
238        //c6.AsyncConnect(); // This sets the connection to "open"
239        //c7.AsyncConnect(); // This sets the connection to "open"
240        //c8.AsyncConnect(); // This sets the connection to "open"
241        //c9.AsyncConnect(); // This sets the connection to "open"
242
243        AddStateName(kSM_Disconnected,  "Disconnected");
244        AddStateName(kSM_Connecting,    "Connecting"); // Some connected
245        AddStateName(kSM_Connected,     "Connected");
246        AddStateName(kSM_Running,       "Running");
247        AddStateName(kSM_SomeRunning,   "SomeRunning");
248        AddStateName(kSM_Starting,      "Starting");
249        AddStateName(kSM_Stopping,      "Stopping");
250
251        AddTransition(kSM_Running,   "START", kSM_Connected)
252            ->AssignFunction(boost::bind(&StateMachineFAD::Start, this, _1, 5));
253        AddTransition(kSM_Connected, "STOP",  kSM_Running);
254
255        AddConfiguration("TIME", kSM_Running);
256        AddConfiguration("LED",  kSM_Connected);
257
258        T::AddConfiguration("MYT",  "I:1;C:5;I");
259        T::AddConfiguration("TESTI",            "I");
260        T::AddConfiguration("TESTI:5",          "I:5");
261        T::AddConfiguration("TESTI:5;F:1;D:2",  "I:5;F:1;D:2");
262        T::AddConfiguration("TESTC",            "C");
263        T::AddConfiguration("TESTI:5;C",        "I:5;C");
264
265        AddTransition(kSM_Reconnect, "RECONNECT");
266
267        T::PrintListOfEvents();
268    }
269
270    int Start(const EventImp &evt, int i)
271    {
272        switch (evt.GetTargetState())
273        {
274        case kSM_Running:    // We are coming from kRunning
275        case kSM_Starting:   // We are coming from kConnected
276            T::Out() << "Received Start(" << i << ")" << endl;
277            c1.PostMessage("START", 10);
278            c2.PostMessage("START", 10);
279            // We could introduce a "waiting for execution" state
280            return T::GetCurrentState();
281        }
282        return T::kSM_FatalError;
283    }
284
285    void Close()
286    {
287        c1.PostClose();
288        c2.PostClose();
289        c3.PostClose();
290        c4.PostClose();
291        c5.PostClose();
292        c6.PostClose();
293        c7.PostClose();
294        c8.PostClose();
295        c9.PostClose();
296    }
297
298
299    int Execute()
300    {
301        // Dispatch at most one handler from the queue. In contrary
302        // to run_run(), it doesn't wait until a handler is available
303        // which can be dispatched, so poll_one() might return with 0
304        // handlers dispatched. The handlers are always dispatched
305        // synchronously.
306
307        fTimers.SetT();
308        const int n = poll_one();
309        fTimers.Proc(n==0 && T::IsQueueEmpty());
310
311//        return c3.IsConnected() ? kSM_Connected : kSM_Disconnected;
312
313
314        // None is connected
315        if (!c1.IsConnected() && !c2.IsConnected())
316            return kSM_Disconnected;
317
318        // Some are connected
319        if (c1.IsConnected()!=c2.IsConnected())
320            return kSM_Connecting;
321
322        if (c1.GetState()==0 && c2.GetState()==0 && T::GetCurrentState()!=kSM_Starting)
323            return kSM_Connected;
324
325        if (c1.GetState()==1 && c2.GetState()==1 && T::GetCurrentState()!=kSM_Stopping)
326            return kSM_Running;
327
328        return kSM_SomeRunning;//GetCurrentState();
329    }
330
331    int Transition(const Event &evt)
332    {
333        ConnectionFAD *con1 = &c1;
334        ConnectionFAD *con2 = &c2;
335
336        switch (evt.GetTargetState())
337        {
338        case kSM_Reconnect:
339            // Close all connections
340            c1.PostClose(false);
341            c2.PostClose(false);
342            c3.PostClose(false);
343
344            // Now wait until all connection have been closed and
345            // all pending handlers have been processed
346            poll();
347
348            // Now we can reopen the connection
349            c1.PostClose(true);
350            c2.PostClose(true);
351            c3.PostClose(true);
352
353
354            //c4.PostClose(true);
355            //c5.PostClose(true);
356            //c6.PostClose(true);
357            //c7.PostClose(true);
358            //c8.PostClose(true);
359            //c9.PostClose(true);
360            return T::GetCurrentState();
361        case kSM_Running: // We are coming from kRunning
362        case kSM_Starting:   // We are coming from kConnected
363            T::Out() << "Received START" << endl;
364            con1->PostMessage("START", 10);
365            con2->PostMessage("START", 10);
366            // We could introduce a "waiting for execution" state
367            return T::GetCurrentState();
368            return kSM_Starting; //GetCurrentState();
369
370        case kSM_Connected:   // We are coming from kConnected
371        case kSM_Stopping: // We are coming from kRunning
372            T::Out() << "Received STOP" << endl;
373            con1->PostMessage("STOP", 10);
374            con2->PostMessage("STOP", 10);
375            // We could introduce a "waiting for execution" state
376            return T::GetCurrentState();
377            return kSM_Stopping;//GetCurrentState();
378        }
379
380        return T::kSM_FatalError; //evt.GetTargetState();
381    }
382    int Configure(const Event &evt)
383    {
384        if (evt.GetName()=="TIME")
385        {
386            c1.PostMessage("TIME", 10);
387            c2.PostMessage("TIME", 10);
388        }
389
390        vector<char> v(2);
391        v[0] = 0xc0;
392        v[1] = 0x00;
393
394        if (evt.GetName()=="LED")
395            c3.PostMessage(v);
396
397        return T::GetCurrentState();
398    }
399};
400
401// ------------------------------------------------------------------------
402
403template<class S>
404int RunDim(Configuration &conf)
405{
406    /*
407     initscr();               // Start curses mode
408     cbreak();                // Line buffering disabled, Pass on
409     intrflush(stdscr, FALSE);
410     start_color();            // Initialize ncurses colors
411     use_default_colors();     // Assign terminal default colors to -1
412     for (int i=1; i<8; i++)
413        init_pair(i, i, -1);  // -1: def background
414        scrollok(stdscr, true);
415        */
416
417    WindowLog wout;
418
419    //log.SetWindow(stdscr);
420    if (conf.Has("log"))
421        if (!wout.OpenLogFile(conf.Get<string>("log")))
422            wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
423
424    // Start io_service.Run to use the StateMachineImp::Run() loop
425    // Start io_service.run to only use the commandHandler command detaching
426    StateMachineFAD<S> io_service("DATA_LOGGER", wout);
427    io_service.Run();
428
429    return 0;
430}
431
432template<class T, class S>
433int RunShell(Configuration &conf)
434{
435    static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
436
437    WindowLog &win  = shell.GetStreamIn();
438    WindowLog &wout = shell.GetStreamOut();
439
440    if (conf.Has("log"))
441        if (!wout.OpenLogFile(conf.Get<string>("log")))
442            win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
443
444    StateMachineFAD<S> io_service("DATA_LOGGER", wout);
445    shell.SetReceiver(io_service);
446
447    boost::thread t(boost::bind(&StateMachineFAD<S>::Run, &io_service));
448
449    //io_service.SetReady();
450
451    shell.Run();                 // Run the shell
452    io_service.Stop();           // Signal Loop-thread to stop
453    // io_service.Close();       // Obsolete, done by the destructor
454    // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
455
456    // Wait until the StateMachine has finished its thread
457    // before returning and destroying the dim objects which might
458    // still be in use.
459    t.join();
460
461    return 0;
462}
463
464void SetupConfiguration(Configuration &conf)
465{
466    const string n = conf.GetName()+".log";
467
468    po::options_description config("Program options");
469    config.add_options()
470        ("dns",       var<string>("localhost"),  "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
471        ("log,l",     var<string>(n), "Write log-file")
472        ("no-dim,d",  po_switch(),    "Disable dim services")
473        ("console,c", var<int>(),     "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
474        ;
475
476    conf.AddEnv("dns", "DIM_DNS_NODE");
477
478    conf.AddOptions(config);
479}
480
481int main(int argc, char* argv[])
482{
483    Configuration conf(argv[0]);
484    SetupConfiguration(conf);
485
486    po::variables_map vm;
487    try
488    {
489        vm = conf.Parse(argc, argv);
490    }
491    catch (std::exception &e)
492    {
493#if BOOST_VERSION > 104000
494        po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
495        if (MO)
496            cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
497        else
498#endif
499            cout << "Error: " << e.what() << endl;
500        cout << endl;
501
502        return -1;
503    }
504
505    if (conf.HasHelp() || conf.HasPrint())
506        return -1;
507
508    // To allow overwriting of DIM_DNS_NODE set 0 to 1
509    setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
510
511    try
512    {
513        // No console access at all
514        if (!conf.Has("console"))
515        {
516            if (conf.Get<bool>("no-dim"))
517                return RunDim<StateMachine>(conf);
518            else
519                return RunDim<StateMachineDim>(conf);
520        }
521        // Cosole access w/ and w/o Dim
522        if (conf.Get<bool>("no-dim"))
523        {
524            if (conf.Get<int>("console")==0)
525                return RunShell<LocalShell, StateMachine>(conf);
526            else
527                return RunShell<LocalConsole, StateMachine>(conf);
528        }
529        else
530        {
531            if (conf.Get<int>("console")==0)
532                return RunShell<LocalShell, StateMachineDim>(conf);
533            else
534                return RunShell<LocalConsole, StateMachineDim>(conf);
535        }
536    }
537    catch (std::exception& e)
538    {
539        std::cerr << "Exception: " << e.what() << "\n";
540    }
541
542    return 0;
543}
544
545/*
546class FADctrlDim : public StateMachineFAD<StateMachineDim>
547{
548public:
549FADctrlDim(const std::string &name="DATA_LOGGER", std::ostream &out=std::cout)
550: StateMachineFAD<StateMachineDim>(out, name) { }
551};
552
553 class FADctrlLocalShell : public StateMachineFAD<StateMachine>
554{
555public:
556    ostream &win;
557
558    FADctrlLocalShell(std::ostream &out, std::ostream &out2)
559        : StateMachineFAD<StateMachine>(out), win(out2) { }
560
561    FADctrlLocalShell(std::ostream &out=std::cout)
562        : StateMachineFAD<StateMachine>(out), win(out) { }
563
564};
565*/
Note: See TracBrowser for help on using the repository browser.