source: trunk/FACT++/src/dclient5.cc@ 10372

Last change on this file since 10372 was 10372, checked in by tbretz, 10 years ago
Changed second argument in Parse to const as well as in all main functions.
File size: 21.0 KB
Line 
1#include <boost/bind.hpp>
2#include <boost/thread.hpp>
3#include <boost/asio/deadline_timer.hpp>
4
5#include "Event.h"
6#include "Shell.h"
7#include "StateMachineDim.h"
8#include "Connection.h"
9#include "Configuration.h"
10#include "Timers.h"
11#include "Console.h"
12
13#include "tools.h"
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17
18using ba::deadline_timer;
19using ba::ip::tcp;
20
21using namespace std;
22
23
24// ------------------------------------------------------------------------
25
26#include "LocalControl.h"
27
28// ------------------------------------------------------------------------
29
30class ConnectionFAD : public Connection
31{
32 MessageImp &fMsg;
33
34 int state;
35
36 char fReadBuffer[1000];
37
38public:
39 void ConnectionEstablished()
40 {
41 StartAsyncRead();
42 }
43
44 void HandleReadTimeout(const bs::error_code &error)
45 {
46 return;
47 if (!is_open())
48 {
49 // For example: Here we could schedule a new accept if we
50 // would not want to allow two connections at the same time.
51 return;
52 }
53
54 // 125: Operation canceled
55
56 if (error && error!=bs::error_code(125, bs::system_category))
57 {
58 stringstream str;
59
60 str << "HandleReadTimeout: " << error.message() << " (" << error << ")";// << endl;
61 if (error==bs::error_code(2, ba::error::misc_category))
62 Warn(str); // Connection: EOF (closed by remote host)
63 else
64 Error(str);
65 }
66
67 // Check whether the deadline has passed. We compare the deadline
68 // against the current time since a new asynchronous operation
69 // may have moved the deadline before this actor had a chance
70 // to run.
71 if (fInTimeout.expires_at() > deadline_timer::traits_type::now())
72 return;
73
74 Error("fInTimeout has expired...");
75
76 PostClose();
77 }
78
79 void HandleReceivedData(const bs::error_code& error, size_t bytes_received, int type)
80 {
81 // Do not schedule a new read if the connection failed.
82 if (bytes_received==0 || error)
83 {
84 // 107: Transport endpoint is not connected
85 // 125: Operation canceled
86 if (error && error!=bs::error_code(107, bs::system_category))
87 {
88 stringstream str;
89 str << "Reading from " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
90 Error(str);
91 }
92 PostClose(error!=bs::error_code(125, bs::system_category));
93 return;
94 }
95
96 string txt;
97
98 if (bytes_received==2)
99 {
100 txt = string(fReadBuffer, bytes_received);
101 //std::vector<char> buf(128);
102 //bytes_transferred = sock.receive(boost::asio::buffer(d3));
103
104 fMsg() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
105
106 if (fReadBuffer[0]=='T')
107 {
108 // AsyncRead + Deadline
109 // Do all manipulation to the buffer BEFORE this call!
110 AsyncRead(ba::buffer(fReadBuffer+2, 21)/*,
111 &Connection::HandleReceivedData*/);
112 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
113 }
114 else
115 {
116 // AsyncRead + Deadline
117 // Do all manipulation to the buffer BEFORE this call!
118 AsyncRead(ba::buffer(fReadBuffer+2, 35)/*,
119 &Connection::HandleReceivedData*/);
120 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
121 }
122 }
123 else
124 {
125 txt = string(fReadBuffer, bytes_received+2);
126 const int s = atoi(fReadBuffer+35);
127 if (s==9)
128 Info("Requested time received: "+txt);
129 else
130 state = s;
131
132 Out() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
133 memset(fReadBuffer, 0, 100);
134
135 // Do all manipulation to the buffer BEFORE this call!
136 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
137 &Connection::HandleReceivedData*/);
138
139
140 }
141 }
142
143 int GetState() const { return state; }
144
145 void StartAsyncRead()
146 {
147 // Start also a dealine_time for a proper timeout
148 // Therefore we must know how often we expect messages
149 // FIXME: Add deadline_counter
150
151 memset(fReadBuffer, 0, 100);
152
153 // AsyncRead + Deadline
154 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
155 &Connection::HandleReceivedData*/);
156 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
157 }
158
159 /*
160 ConnectionFAD(ba::io_service& io_service, const string &addr, int port) :
161 Connection(io_service, addr, port), state(0) { }
162 ConnectionFAD(ba::io_service& io_service, const string &addr, const string &port) :
163 Connection(io_service, addr, port), state(0) { }
164 */
165
166 ConnectionFAD(ba::io_service& ioservice, MessageImp &imp) :
167 Connection(ioservice, imp()), fMsg(imp), state(0)
168 {
169 }
170};
171
172template <class T>
173class StateMachineFAD : public T, public ba::io_service
174{
175public:
176 enum states_t
177 {
178 kSM_Disconnected = 1,
179 kSM_Connecting,
180 kSM_Connected,
181 kSM_Running,
182 kSM_SomeRunning,
183 kSM_Starting,
184 kSM_Stopping,
185 kSM_Reconnect,
186 kSM_SetUrl,
187 };
188
189 ConnectionFAD c1;
190 ConnectionFAD c2;
191 ConnectionFAD c3;
192 ConnectionFAD c4;
193 ConnectionFAD c5;
194 ConnectionFAD c6;
195 ConnectionFAD c7;
196 ConnectionFAD c8;
197 ConnectionFAD c9;
198
199 /*
200 int Write(const Time &time, const char *txt, int qos)
201 {
202 return T::Write(time, txt, qos);
203 }
204 */
205 Timers fTimers;
206
207 StateMachineFAD(const string &name="", ostream &out=cout) :
208 T(out, name),
209 c1(*this, *this), c2(*this, *this), c3(*this, *this), c4(*this, *this),
210 c5(*this, *this), c6(*this, *this), c7(*this, *this), c8(*this, *this),
211 c9(*this, *this), fTimers(out)
212 {
213// c1.SetEndpoint();
214 c2.SetEndpoint("localhost", 4001);
215 c3.SetEndpoint("ftmboard1.ethz.ch", 5000);
216 c4.SetEndpoint("localhost", 4003);
217 c5.SetEndpoint("localhost", 4004);
218 c6.SetEndpoint("localhost", 4005);
219 c7.SetEndpoint("localhost", 4006);
220 c8.SetEndpoint("localhost", 4007);
221 c9.SetEndpoint("localhost", 4008);
222
223 c1.SetLogStream(this);
224 c2.SetLogStream(this);
225 c3.SetLogStream(this);
226 c4.SetLogStream(this);
227 c5.SetLogStream(this);
228 c6.SetLogStream(this);
229 c7.SetLogStream(this);
230 c8.SetLogStream(this);
231 c9.SetLogStream(this);
232
233 c1.StartConnect(); // This sets the connection to "open"
234 c2.StartConnect(); // This sets the connection to "open"
235 c3.StartConnect(); // This sets the connection to "open"
236 //c4.StartConnect(); // This sets the connection to "open"
237 //c5.StartConnect(); // This sets the connection to "open"
238 //c6.StartConnect(); // This sets the connection to "open"
239 //c7.StartConnect(); // This sets the connection to "open"
240 //c8.StartConnect(); // This sets the connection to "open"
241 //c9.StartConnect(); // This sets the connection to "open"
242
243 AddStateName(kSM_Disconnected, "Disconnected");
244 AddStateName(kSM_Connecting, "Connecting"); // Some connected
245 AddStateName(kSM_Connected, "Connected");
246 AddStateName(kSM_Running, "Running");
247 AddStateName(kSM_SomeRunning, "SomeRunning");
248 AddStateName(kSM_Starting, "Starting");
249 AddStateName(kSM_Stopping, "Stopping");
250
251 AddTransition(kSM_Running, "START", kSM_Connected).
252 AssignFunction(boost::bind(&StateMachineFAD::Start, this, _1, 5));
253 AddTransition(kSM_Connected, "STOP", kSM_Running);
254
255 AddConfiguration("TIME", kSM_Running);
256 AddConfiguration("LED", kSM_Connected);
257
258 T::AddConfiguration("TESTI", "I");
259 T::AddConfiguration("TESTI2", "I:2");
260 T::AddConfiguration("TESTIF", "I:2;F:2");
261 T::AddConfiguration("TESTIC", "I:2;C");
262
263 T::AddConfiguration("CMD", "C").
264 AssignFunction(boost::bind(&StateMachineFAD::Command, this, _1));
265
266 AddTransition(kSM_Reconnect, "RECONNECT");
267
268 AddTransition(kSM_SetUrl, "SETURL", "C");
269 }
270
271 int Command(const EventImp &evt)
272 {
273 string cmd = evt.GetText();
274
275 size_t p0 = cmd.find_first_of(' ');
276 if (p0==string::npos)
277 p0 = cmd.length();
278
279 T::Out() << "\nCommand: '" << cmd.substr(0, p0) << "'" << cmd.substr(p0)<< "'" << endl;
280 /*
281 const Converter c(T::Out(), "B:5;I:2;F;W;O;C", "yes no false 0 1 31 42 11.12 \"test hallo\" ");
282
283 T::Out() << c.GetRc() << endl;
284 T::Out() << c.N() << endl;
285 T::Out() << c.Get<bool>(0) << endl;
286 T::Out() << c.Get<bool>(1) << endl;
287 T::Out() << c.Get<bool>(2) << endl;
288 T::Out() << c.Get<bool>(3) << endl;
289 T::Out() << c.Get<bool>(4) << endl;
290 T::Out() << c.Get<int>(5) << endl;
291 T::Out() << c.Get<int>(6) << endl;
292 T::Out() << c.Get<float>(7) << endl;
293 T::Out() << c.Get<int>(7) << endl;
294 T::Out() << c.Get<string>(8) << endl;
295 T::Out() << c.Get<string>(9) << endl;
296 T::Out() << c.Get<string>(10) << endl;
297 */
298 return T::GetCurrentState();
299 }
300 int Start(const EventImp &evt, int i)
301 {
302 switch (evt.GetTargetState())
303 {
304 case kSM_Running: // We are coming from kRunning
305 case kSM_Starting: // We are coming from kConnected
306 T::Out() << "Received Start(" << i << ")" << endl;
307 c1.PostMessage("START", 10);
308 c2.PostMessage("START", 10);
309 // We could introduce a "waiting for execution" state
310 return T::GetCurrentState();
311 }
312 return T::kSM_FatalError;
313 }
314
315 void Close()
316 {
317 c1.PostClose();
318 c2.PostClose();
319 c3.PostClose();
320 c4.PostClose();
321 c5.PostClose();
322 c6.PostClose();
323 c7.PostClose();
324 c8.PostClose();
325 c9.PostClose();
326 }
327
328
329 int Execute()
330 {
331 // Dispatch at most one handler from the queue. In contrary
332 // to run_run(), it doesn't wait until a handler is available
333 // which can be dispatched, so poll_one() might return with 0
334 // handlers dispatched. The handlers are always dispatched
335 // synchronously.
336
337 fTimers.SetT();
338 const int n = poll_one();
339 fTimers.Proc(n==0 && T::IsQueueEmpty());
340
341// return c3.IsConnected() ? kSM_Connected : kSM_Disconnected;
342
343
344 // None is connected
345 if (!c1.IsConnected() && !c2.IsConnected())
346 return kSM_Disconnected;
347
348 // Some are connected
349 if (c1.IsConnected()!=c2.IsConnected())
350 return kSM_Connecting;
351
352 if (c1.GetState()==0 && c2.GetState()==0 && T::GetCurrentState()!=kSM_Starting)
353 return kSM_Connected;
354
355 if (c1.GetState()==1 && c2.GetState()==1 && T::GetCurrentState()!=kSM_Stopping)
356 return kSM_Running;
357
358 return kSM_SomeRunning;//GetCurrentState();
359 }
360
361 int Transition(const Event &evt)
362 {
363 ConnectionFAD *con1 = &c1;
364 ConnectionFAD *con2 = &c2;
365
366 switch (evt.GetTargetState())
367 {
368 case kSM_SetUrl:
369 T::Out() << evt.GetText() << endl;
370 c1.SetEndpoint(evt.GetText());
371 return T::GetCurrentState();
372 case kSM_Reconnect:
373 // Close all connections
374 c1.PostClose(false);
375 c2.PostClose(false);
376 c3.PostClose(false);
377
378 // Now wait until all connection have been closed and
379 // all pending handlers have been processed
380 poll();
381
382 // Now we can reopen the connection
383 c1.PostClose(true);
384 c2.PostClose(true);
385 c3.PostClose(true);
386
387
388 //c4.PostClose(true);
389 //c5.PostClose(true);
390 //c6.PostClose(true);
391 //c7.PostClose(true);
392 //c8.PostClose(true);
393 //c9.PostClose(true);
394 return T::GetCurrentState();
395 case kSM_Running: // We are coming from kRunning
396 case kSM_Starting: // We are coming from kConnected
397 T::Out() << "Received START" << endl;
398 con1->PostMessage("START", 10);
399 con2->PostMessage("START", 10);
400 // We could introduce a "waiting for execution" state
401 return T::GetCurrentState();
402 return kSM_Starting; //GetCurrentState();
403
404 case kSM_Connected: // We are coming from kConnected
405 case kSM_Stopping: // We are coming from kRunning
406 T::Out() << "Received STOP" << endl;
407 con1->PostMessage("STOP", 10);
408 con2->PostMessage("STOP", 10);
409 // We could introduce a "waiting for execution" state
410 return T::GetCurrentState();
411 return kSM_Stopping;//GetCurrentState();
412 }
413
414 return T::kSM_FatalError; //evt.GetTargetState();
415 }
416 int Configure(const Event &evt)
417 {
418 if (evt.GetName()=="TIME")
419 {
420 c1.PostMessage("TIME", 10);
421 c2.PostMessage("TIME", 10);
422 }
423
424 vector<char> v(2);
425 v[0] = 0xc0;
426 v[1] = 0x00;
427
428 if (evt.GetName()=="LED")
429 c3.PostMessage(v);
430
431 return T::GetCurrentState();
432 }
433};
434
435// ------------------------------------------------------------------------
436
437template<class S>
438int RunDim(Configuration &conf)
439{
440 /*
441 initscr(); // Start curses mode
442 cbreak(); // Line buffering disabled, Pass on
443 intrflush(stdscr, FALSE);
444 start_color(); // Initialize ncurses colors
445 use_default_colors(); // Assign terminal default colors to -1
446 for (int i=1; i<8; i++)
447 init_pair(i, i, -1); // -1: def background
448 scrollok(stdscr, true);
449 */
450
451 WindowLog wout;
452
453 //log.SetWindow(stdscr);
454 if (conf.Has("log"))
455 if (!wout.OpenLogFile(conf.Get<string>("log")))
456 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
457
458 // Start io_service.Run to use the StateMachineImp::Run() loop
459 // Start io_service.run to only use the commandHandler command detaching
460 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
461 io_service.Run();
462
463 return 0;
464}
465
466template<class T, class S>
467int RunShell(Configuration &conf)
468{
469 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
470
471 WindowLog &win = shell.GetStreamIn();
472 WindowLog &wout = shell.GetStreamOut();
473
474 if (conf.Has("log"))
475 if (!wout.OpenLogFile(conf.Get<string>("log")))
476 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
477
478 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
479 shell.SetReceiver(io_service);
480
481 boost::thread t(boost::bind(&StateMachineFAD<S>::Run, &io_service));
482
483 //io_service.SetReady();
484
485 shell.Run(); // Run the shell
486 io_service.Stop(); // Signal Loop-thread to stop
487 // io_service.Close(); // Obsolete, done by the destructor
488 // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
489
490 // Wait until the StateMachine has finished its thread
491 // before returning and destroying the dim objects which might
492 // still be in use.
493 t.join();
494
495 return 0;
496}
497
498/*
499 Extract usage clause(s) [if any] for SYNOPSIS.
500 Translators: "Usage" and "or" here are patterns (regular expressions) which
501 are used to match the usage synopsis in program output. An example from cp
502 (GNU coreutils) which contains both strings:
503 Usage: cp [OPTION]... [-T] SOURCE DEST
504 or: cp [OPTION]... SOURCE... DIRECTORY
505 or: cp [OPTION]... -t DIRECTORY SOURCE...
506 */
/// Prints the description of the program and its usage synopsis to
/// stdout, followed by the introduction to the list of options.
void PrintUsage()
{
    // Description
    std::cout <<
        "\n"
        "The console connects to all available Dim Servers and allows to "
        "easily access all of their commands.\n"
        "\n";

    // Synopsis
    std::cout <<
        "Usage: test3 [-c type] [OPTIONS]\n"
        " or: test3 [OPTIONS]\n"
        "\n";

    // Introduction to the options
    std::cout <<
        "Options:\n"
        "The following describes the available commandline options. "
        "For further details on how command line option are parsed "
        "and in which order which configuration sources are accessed "
        "please refer to the class reference of the Configuration class."
        << std::endl;
}
524
/// Prints a short description of how the program is operated to stdout.
void PrintHelp()
{
    static const char *const text =
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usuage can be brought to the screen.";

    std::cout << text << std::endl;

    /* Template for further sections:
    cout << "bla bla bla" << endl << endl;
    cout << endl;
    cout << "Environment:" << endl;
    cout << "environment" << endl;
    cout << endl;
    cout << "Examples:" << endl;
    cout << "test exam" << endl;
    cout << endl;
    cout << "Files:" << endl;
    cout << "files" << endl;
    cout << endl;
    */
}
548
549/*
550 The first line of the --version information is assumed to be in one
551 of the following formats:
552
553 <version>
554 <program> <version>
555 {GNU,Free} <program> <version>
556 <program> ({GNU,Free} <package>) <version>
557 <program> - {GNU,Free} <package> <version>
558
559 and separated from any copyright/author details by a blank line.
560
561 Handle multi-line bug reporting sections of the form:
562
563 Report <program> bugs to <addr>
564 GNU <package> home page: <url>
565 ...
566*/
/// Prints the version information to stdout. The first line has the
/// format "<program> - <package> <version>".
///
/// @param name  name of the program, printed at the start of the first line
void PrintVersion(const char *name)
{
    // First line, separated from the rest by an empty line
    std::cout << name << " - FACT++ 1.0\n\n";

    // Authors, bug reports and home page
    std::cout <<
        "Written by Thomas Bretz <thomas.bretz@epfl.ch> et al.\n"
        "\n"
        "Report bugs to Thomas Bretz <thomas.bretz@epfl.ch>\n"
        "FACT++ home page: http://www.xxx.com\n"
        "\n";

    // Copyright
    std::cout <<
        "Copyright (C) 2011 by the FACT Collaboration.\n"
        "This is free software; see the source for copying conditions.\n"
        << std::endl;
}
581
582
583void SetupConfiguration(Configuration &conf)
584{
585 const string n = conf.GetName()+".log";
586
587 po::options_description config("Program options");
588 config.add_options()
589 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
590 ("log,l", var<string>(n), "Write log-file")
591 ("no-dim,d", po_switch(), "Disable dim services")
592 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
593 ;
594
595 conf.AddEnv("dns", "DIM_DNS_NODE");
596
597 conf.AddOptions(config);
598}
599
600int main(int argc, const char* argv[])
601{
602 Configuration conf(argv[0]);
603 conf.SetPrintUsage(PrintUsage);
604 SetupConfiguration(conf);
605
606 po::variables_map vm;
607 try
608 {
609 vm = conf.Parse(argc, argv);
610 }
611 catch (std::exception &e)
612 {
613#if BOOST_VERSION > 104000
614 po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
615 if (MO)
616 cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
617 else
618#endif
619 cout << "Error: " << e.what() << endl;
620 cout << endl;
621
622 return -1;
623 }
624
625 if (conf.HasPrint())
626 return -1;
627
628 if (conf.HasVersion())
629 {
630 PrintVersion(argv[0]);
631 return -1;
632 }
633
634 if (conf.HasHelp())
635 {
636 PrintHelp();
637 return -1;
638 }
639
640 // To allow overwriting of DIM_DNS_NODE set 0 to 1
641 setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
642
643 try
644 {
645 // No console access at all
646 if (!conf.Has("console"))
647 {
648 if (conf.Get<bool>("no-dim"))
649 return RunDim<StateMachine>(conf);
650 else
651 return RunDim<StateMachineDim>(conf);
652 }
653 // Cosole access w/ and w/o Dim
654 if (conf.Get<bool>("no-dim"))
655 {
656 if (conf.Get<int>("console")==0)
657 return RunShell<LocalShell, StateMachine>(conf);
658 else
659 return RunShell<LocalConsole, StateMachine>(conf);
660 }
661 else
662 {
663 if (conf.Get<int>("console")==0)
664 return RunShell<LocalShell, StateMachineDim>(conf);
665 else
666 return RunShell<LocalConsole, StateMachineDim>(conf);
667 }
668 }
669 catch (std::exception& e)
670 {
671 std::cerr << "Exception: " << e.what() << "\n";
672 }
673
674 return 0;
675}
676
677/*
678class FADctrlDim : public StateMachineFAD<StateMachineDim>
679{
680public:
681FADctrlDim(const std::string &name="DATA_LOGGER", std::ostream &out=std::cout)
682: StateMachineFAD<StateMachineDim>(out, name) { }
683};
684
685 class FADctrlLocalShell : public StateMachineFAD<StateMachine>
686{
687public:
688 ostream &win;
689
690 FADctrlLocalShell(std::ostream &out, std::ostream &out2)
691 : StateMachineFAD<StateMachine>(out), win(out2) { }
692
693 FADctrlLocalShell(std::ostream &out=std::cout)
694 : StateMachineFAD<StateMachine>(out), win(out) { }
695
696};
697*/
Note: See TracBrowser for help on using the repository browser.