source: trunk/FACT++/src/dclient5.cc@ 10608

Last change on this file since 10608 was 10600, checked in by tbretz, 14 years ago
Added fix because gcc 4.5 is not supported by boost 1.42 which are both part of natty.
File size: 21.1 KB
Line 
1#include <boost/bind.hpp>
2#if BOOST_VERSION<1440
3#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
4#undef BOOST_HAS_RVALUE_REFS
5#error test
6#endif
7#endif
8#include <boost/thread.hpp>
9#include <boost/asio/error.hpp>
10#include <boost/asio/deadline_timer.hpp>
11
12#include "Event.h"
13#include "Shell.h"
14#include "StateMachineDim.h"
15#include "Connection.h"
16#include "Configuration.h"
17#include "Timers.h"
18#include "Console.h"
19
20#include "tools.h"
21
22namespace ba = boost::asio;
23namespace bs = boost::system;
24
25using ba::deadline_timer;
26using ba::ip::tcp;
27
28using namespace std;
29
30
31// ------------------------------------------------------------------------
32
33#include "LocalControl.h"
34
35// ------------------------------------------------------------------------
36
37class ConnectionFAD : public Connection
38{
39 MessageImp &fMsg;
40
41 int state;
42
43 char fReadBuffer[1000];
44
45public:
46 void ConnectionEstablished()
47 {
48 StartAsyncRead();
49 }
50
51 void HandleReadTimeout(const bs::error_code &error)
52 {
53 return;
54 if (!is_open())
55 {
56 // For example: Here we could schedule a new accept if we
57 // would not want to allow two connections at the same time.
58 return;
59 }
60
61 // 125: Operation canceled
62
63 if (error && error!=ba::error::basic_errors::operation_aborted)
64 {
65 stringstream str;
66
67 str << "HandleReadTimeout: " << error.message() << " (" << error << ")";// << endl;
68 if (error==ba::error::misc_errors::eof)
69 Warn(str); // Connection: EOF (closed by remote host)
70 else
71 Error(str);
72 }
73
74 // Check whether the deadline has passed. We compare the deadline
75 // against the current time since a new asynchronous operation
76 // may have moved the deadline before this actor had a chance
77 // to run.
78 if (fInTimeout.expires_at() > deadline_timer::traits_type::now())
79 return;
80
81 Error("fInTimeout has expired...");
82
83 PostClose();
84 }
85
86 void HandleReceivedData(const bs::error_code& error, size_t bytes_received, int type)
87 {
88 // Do not schedule a new read if the connection failed.
89 if (bytes_received==0 || error)
90 {
91 // 107: Transport endpoint is not connected
92 // 125: Operation canceled
93 if (error && error!=ba::error::basic_errors::not_connected)
94 {
95 stringstream str;
96 str << "Reading from " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
97 Error(str);
98 }
99 PostClose(error!=ba::error::basic_errors::operation_aborted);
100 return;
101 }
102
103 string txt;
104
105 if (bytes_received==2)
106 {
107 txt = string(fReadBuffer, bytes_received);
108 //std::vector<char> buf(128);
109 //bytes_transferred = sock.receive(boost::asio::buffer(d3));
110
111 fMsg() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
112
113 if (fReadBuffer[0]=='T')
114 {
115 // AsyncRead + Deadline
116 // Do all manipulation to the buffer BEFORE this call!
117 AsyncRead(ba::buffer(fReadBuffer+2, 21)/*,
118 &Connection::HandleReceivedData*/);
119 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
120 }
121 else
122 {
123 // AsyncRead + Deadline
124 // Do all manipulation to the buffer BEFORE this call!
125 AsyncRead(ba::buffer(fReadBuffer+2, 35)/*,
126 &Connection::HandleReceivedData*/);
127 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
128 }
129 }
130 else
131 {
132 txt = string(fReadBuffer, bytes_received+2);
133 const int s = atoi(fReadBuffer+35);
134 if (s==9)
135 Info("Requested time received: "+txt);
136 else
137 state = s;
138
139 Out() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
140 memset(fReadBuffer, 0, 100);
141
142 // Do all manipulation to the buffer BEFORE this call!
143 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
144 &Connection::HandleReceivedData*/);
145
146
147 }
148 }
149
150 int GetState() const { return state; }
151
152 void StartAsyncRead()
153 {
154 // Start also a dealine_time for a proper timeout
155 // Therefore we must know how often we expect messages
156 // FIXME: Add deadline_counter
157
158 memset(fReadBuffer, 0, 100);
159
160 // AsyncRead + Deadline
161 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
162 &Connection::HandleReceivedData*/);
163 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
164 }
165
166 /*
167 ConnectionFAD(ba::io_service& io_service, const string &addr, int port) :
168 Connection(io_service, addr, port), state(0) { }
169 ConnectionFAD(ba::io_service& io_service, const string &addr, const string &port) :
170 Connection(io_service, addr, port), state(0) { }
171 */
172
173 ConnectionFAD(ba::io_service& ioservice, MessageImp &imp) :
174 Connection(ioservice, imp()), fMsg(imp), state(0)
175 {
176 }
177};
178
179template <class T>
180class StateMachineFAD : public T, public ba::io_service
181{
182public:
183 enum states_t
184 {
185 kSM_Disconnected = 1,
186 kSM_Connecting,
187 kSM_Connected,
188 kSM_Running,
189 kSM_SomeRunning,
190 kSM_Starting,
191 kSM_Stopping,
192 kSM_Reconnect,
193 kSM_SetUrl,
194 };
195
196 ConnectionFAD c1;
197 ConnectionFAD c2;
198 ConnectionFAD c3;
199 ConnectionFAD c4;
200 ConnectionFAD c5;
201 ConnectionFAD c6;
202 ConnectionFAD c7;
203 ConnectionFAD c8;
204 ConnectionFAD c9;
205
206 /*
207 int Write(const Time &time, const char *txt, int qos)
208 {
209 return T::Write(time, txt, qos);
210 }
211 */
212 Timers fTimers;
213
214 StateMachineFAD(const string &name="", ostream &out=cout) :
215 T(out, name),
216 c1(*this, *this), c2(*this, *this), c3(*this, *this), c4(*this, *this),
217 c5(*this, *this), c6(*this, *this), c7(*this, *this), c8(*this, *this),
218 c9(*this, *this), fTimers(out)
219 {
220// c1.SetEndpoint();
221 c2.SetEndpoint("localhost", 4001);
222 c3.SetEndpoint("ftmboard1.ethz.ch", 5000);
223 c4.SetEndpoint("localhost", 4003);
224 c5.SetEndpoint("localhost", 4004);
225 c6.SetEndpoint("localhost", 4005);
226 c7.SetEndpoint("localhost", 4006);
227 c8.SetEndpoint("localhost", 4007);
228 c9.SetEndpoint("localhost", 4008);
229
230 c1.SetLogStream(this);
231 c2.SetLogStream(this);
232 c3.SetLogStream(this);
233 c4.SetLogStream(this);
234 c5.SetLogStream(this);
235 c6.SetLogStream(this);
236 c7.SetLogStream(this);
237 c8.SetLogStream(this);
238 c9.SetLogStream(this);
239
240 c1.StartConnect(); // This sets the connection to "open"
241 c2.StartConnect(); // This sets the connection to "open"
242 c3.StartConnect(); // This sets the connection to "open"
243 //c4.StartConnect(); // This sets the connection to "open"
244 //c5.StartConnect(); // This sets the connection to "open"
245 //c6.StartConnect(); // This sets the connection to "open"
246 //c7.StartConnect(); // This sets the connection to "open"
247 //c8.StartConnect(); // This sets the connection to "open"
248 //c9.StartConnect(); // This sets the connection to "open"
249
250 AddStateName(kSM_Disconnected, "Disconnected");
251 AddStateName(kSM_Connecting, "Connecting"); // Some connected
252 AddStateName(kSM_Connected, "Connected");
253 AddStateName(kSM_Running, "Running");
254 AddStateName(kSM_SomeRunning, "SomeRunning");
255 AddStateName(kSM_Starting, "Starting");
256 AddStateName(kSM_Stopping, "Stopping");
257
258 AddTransition(kSM_Running, "START", kSM_Connected).
259 AssignFunction(boost::bind(&StateMachineFAD::Start, this, _1, 5));
260 AddTransition(kSM_Connected, "STOP", kSM_Running);
261
262 AddConfiguration("TIME", kSM_Running);
263 AddConfiguration("LED", kSM_Connected);
264
265 T::AddConfiguration("TESTI", "I");
266 T::AddConfiguration("TESTI2", "I:2");
267 T::AddConfiguration("TESTIF", "I:2;F:2");
268 T::AddConfiguration("TESTIC", "I:2;C");
269
270 T::AddConfiguration("CMD", "C").
271 AssignFunction(boost::bind(&StateMachineFAD::Command, this, _1));
272
273 AddTransition(kSM_Reconnect, "RECONNECT");
274
275 AddTransition(kSM_SetUrl, "SETURL", "C");
276 }
277
278 int Command(const EventImp &evt)
279 {
280 string cmd = evt.GetText();
281
282 size_t p0 = cmd.find_first_of(' ');
283 if (p0==string::npos)
284 p0 = cmd.length();
285
286 T::Out() << "\nCommand: '" << cmd.substr(0, p0) << "'" << cmd.substr(p0)<< "'" << endl;
287 /*
288 const Converter c(T::Out(), "B:5;I:2;F;W;O;C", "yes no false 0 1 31 42 11.12 \"test hallo\" ");
289
290 T::Out() << c.GetRc() << endl;
291 T::Out() << c.N() << endl;
292 T::Out() << c.Get<bool>(0) << endl;
293 T::Out() << c.Get<bool>(1) << endl;
294 T::Out() << c.Get<bool>(2) << endl;
295 T::Out() << c.Get<bool>(3) << endl;
296 T::Out() << c.Get<bool>(4) << endl;
297 T::Out() << c.Get<int>(5) << endl;
298 T::Out() << c.Get<int>(6) << endl;
299 T::Out() << c.Get<float>(7) << endl;
300 T::Out() << c.Get<int>(7) << endl;
301 T::Out() << c.Get<string>(8) << endl;
302 T::Out() << c.Get<string>(9) << endl;
303 T::Out() << c.Get<string>(10) << endl;
304 */
305 return T::GetCurrentState();
306 }
307 int Start(const EventImp &evt, int i)
308 {
309 switch (evt.GetTargetState())
310 {
311 case kSM_Running: // We are coming from kRunning
312 case kSM_Starting: // We are coming from kConnected
313 T::Out() << "Received Start(" << i << ")" << endl;
314 c1.PostMessage("START", 10);
315 c2.PostMessage("START", 10);
316 // We could introduce a "waiting for execution" state
317 return T::GetCurrentState();
318 }
319 return T::kSM_FatalError;
320 }
321
322 void Close()
323 {
324 c1.PostClose();
325 c2.PostClose();
326 c3.PostClose();
327 c4.PostClose();
328 c5.PostClose();
329 c6.PostClose();
330 c7.PostClose();
331 c8.PostClose();
332 c9.PostClose();
333 }
334
335
336 int Execute()
337 {
338 // Dispatch at most one handler from the queue. In contrary
339 // to run_run(), it doesn't wait until a handler is available
340 // which can be dispatched, so poll_one() might return with 0
341 // handlers dispatched. The handlers are always dispatched
342 // synchronously.
343
344 fTimers.SetT();
345 const int n = poll_one();
346 fTimers.Proc(n==0 && T::IsQueueEmpty());
347
348// return c3.IsConnected() ? kSM_Connected : kSM_Disconnected;
349
350
351 // None is connected
352 if (!c1.IsConnected() && !c2.IsConnected())
353 return kSM_Disconnected;
354
355 // Some are connected
356 if (c1.IsConnected()!=c2.IsConnected())
357 return kSM_Connecting;
358
359 if (c1.GetState()==0 && c2.GetState()==0 && T::GetCurrentState()!=kSM_Starting)
360 return kSM_Connected;
361
362 if (c1.GetState()==1 && c2.GetState()==1 && T::GetCurrentState()!=kSM_Stopping)
363 return kSM_Running;
364
365 return kSM_SomeRunning;//GetCurrentState();
366 }
367
368 int Transition(const Event &evt)
369 {
370 ConnectionFAD *con1 = &c1;
371 ConnectionFAD *con2 = &c2;
372
373 switch (evt.GetTargetState())
374 {
375 case kSM_SetUrl:
376 T::Out() << evt.GetText() << endl;
377 c1.SetEndpoint(evt.GetText());
378 return T::GetCurrentState();
379 case kSM_Reconnect:
380 // Close all connections
381 c1.PostClose(false);
382 c2.PostClose(false);
383 c3.PostClose(false);
384
385 // Now wait until all connection have been closed and
386 // all pending handlers have been processed
387 poll();
388
389 // Now we can reopen the connection
390 c1.PostClose(true);
391 c2.PostClose(true);
392 c3.PostClose(true);
393
394
395 //c4.PostClose(true);
396 //c5.PostClose(true);
397 //c6.PostClose(true);
398 //c7.PostClose(true);
399 //c8.PostClose(true);
400 //c9.PostClose(true);
401 return T::GetCurrentState();
402 case kSM_Running: // We are coming from kRunning
403 case kSM_Starting: // We are coming from kConnected
404 T::Out() << "Received START" << endl;
405 con1->PostMessage("START", 10);
406 con2->PostMessage("START", 10);
407 // We could introduce a "waiting for execution" state
408 return T::GetCurrentState();
409 return kSM_Starting; //GetCurrentState();
410
411 case kSM_Connected: // We are coming from kConnected
412 case kSM_Stopping: // We are coming from kRunning
413 T::Out() << "Received STOP" << endl;
414 con1->PostMessage("STOP", 10);
415 con2->PostMessage("STOP", 10);
416 // We could introduce a "waiting for execution" state
417 return T::GetCurrentState();
418 return kSM_Stopping;//GetCurrentState();
419 }
420
421 return T::kSM_FatalError; //evt.GetTargetState();
422 }
423 int Configure(const Event &evt)
424 {
425 if (evt.GetName()=="TIME")
426 {
427 c1.PostMessage("TIME", 10);
428 c2.PostMessage("TIME", 10);
429 }
430
431 vector<char> v(2);
432 v[0] = 0xc0;
433 v[1] = 0x00;
434
435 if (evt.GetName()=="LED")
436 c3.PostMessage(v);
437
438 return T::GetCurrentState();
439 }
440};
441
442// ------------------------------------------------------------------------
443
444template<class S>
445int RunDim(Configuration &conf)
446{
447 /*
448 initscr(); // Start curses mode
449 cbreak(); // Line buffering disabled, Pass on
450 intrflush(stdscr, FALSE);
451 start_color(); // Initialize ncurses colors
452 use_default_colors(); // Assign terminal default colors to -1
453 for (int i=1; i<8; i++)
454 init_pair(i, i, -1); // -1: def background
455 scrollok(stdscr, true);
456 */
457
458 WindowLog wout;
459
460 //log.SetWindow(stdscr);
461 if (conf.Has("log"))
462 if (!wout.OpenLogFile(conf.Get<string>("log")))
463 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
464
465 // Start io_service.Run to use the StateMachineImp::Run() loop
466 // Start io_service.run to only use the commandHandler command detaching
467 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
468 io_service.Run();
469
470 return 0;
471}
472
473template<class T, class S>
474int RunShell(Configuration &conf)
475{
476 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
477
478 WindowLog &win = shell.GetStreamIn();
479 WindowLog &wout = shell.GetStreamOut();
480
481 if (conf.Has("log"))
482 if (!wout.OpenLogFile(conf.Get<string>("log")))
483 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
484
485 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
486 shell.SetReceiver(io_service);
487
488 boost::thread t(boost::bind(&StateMachineFAD<S>::Run, &io_service));
489
490 //io_service.SetReady();
491
492 shell.Run(); // Run the shell
493 io_service.Stop(); // Signal Loop-thread to stop
494 // io_service.Close(); // Obsolete, done by the destructor
495 // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
496
497 // Wait until the StateMachine has finished its thread
498 // before returning and destroying the dim objects which might
499 // still be in use.
500 t.join();
501
502 return 0;
503}
504
505/*
506 Extract usage clause(s) [if any] for SYNOPSIS.
507 Translators: "Usage" and "or" here are patterns (regular expressions) which
508 are used to match the usage synopsis in program output. An example from cp
509 (GNU coreutils) which contains both strings:
510 Usage: cp [OPTION]... [-T] SOURCE DEST
511 or: cp [OPTION]... SOURCE... DIRECTORY
512 or: cp [OPTION]... -t DIRECTORY SOURCE...
513 */
/// Writes the usage synopsis and the introduction to the option
/// description to the standard output.
void PrintUsage()
{
    static const char usage[] =
        "\n"
        "The console connects to all available Dim Servers and allows to "
        "easily access all of their commands.\n"
        "\n"
        "Usage: test3 [-c type] [OPTIONS]\n"
        " or: test3 [OPTIONS]\n"
        "\n"
        "Options:\n"
        "The following describes the available commandline options. "
        "For further details on how command line option are parsed "
        "and in which order which configuration sources are accessed "
        "please refer to the class reference of the Configuration class.";

    std::cout << usage;
    std::cout << std::endl;
}
531
/// Writes a short description of the program's mode of operation to the
/// standard output.
void PrintHelp()
{
    static const char help[] =
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usuage can be brought to the screen.";

    std::cout << help << std::endl;

    /*
     cout << "bla bla bla" << endl << endl;
     cout << endl;
     cout << "Environment:" << endl;
     cout << "environment" << endl;
     cout << endl;
     cout << "Examples:" << endl;
     cout << "test exam" << endl;
     cout << endl;
     cout << "Files:" << endl;
     cout << "files" << endl;
     cout << endl;
     */
}
555
556/*
557 The first line of the --version information is assumed to be in one
558 of the following formats:
559
560 <version>
561 <program> <version>
562 {GNU,Free} <program> <version>
563 <program> ({GNU,Free} <package>) <version>
564 <program> - {GNU,Free} <package> <version>
565
566 and separated from any copyright/author details by a blank line.
567
568 Handle multi-line bug reporting sections of the form:
569
570 Report <program> bugs to <addr>
571 GNU <package> home page: <url>
572 ...
573*/
574void PrintVersion(const char *name)
575{
576 cout <<
577 name << " - "PACKAGE_STRING"\n"
578 "\n"
579 "Written by Thomas Bretz et al.\n"
580 "\n"
581 "Report bugs to <"PACKAGE_BUGREPORT">\n"
582 "Home page: "PACKAGE_URL"\n"
583 "\n"
584 "Copyright (C) 2011 by the FACT Collaboration.\n"
585 "This is free software; see the source for copying conditions.\n"
586 << endl;
587}
588
589
590void SetupConfiguration(Configuration &conf)
591{
592 const string n = conf.GetName()+".log";
593
594 po::options_description config("Program options");
595 config.add_options()
596 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
597 ("log,l", var<string>(n), "Write log-file")
598 ("no-dim,d", po_switch(), "Disable dim services")
599 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
600 ;
601
602 conf.AddEnv("dns", "DIM_DNS_NODE");
603
604 conf.AddOptions(config);
605}
606
607int main(int argc, const char* argv[])
608{
609 Configuration conf(argv[0]);
610 conf.SetPrintUsage(PrintUsage);
611 SetupConfiguration(conf);
612
613 po::variables_map vm;
614 try
615 {
616 vm = conf.Parse(argc, argv);
617 }
618 catch (std::exception &e)
619 {
620#if BOOST_VERSION > 104000
621 po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
622 if (MO)
623 cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
624 else
625#endif
626 cout << "Error: " << e.what() << endl;
627 cout << endl;
628
629 return -1;
630 }
631
632 if (conf.HasPrint())
633 return -1;
634
635 if (conf.HasVersion())
636 {
637 PrintVersion(argv[0]);
638 return -1;
639 }
640
641 if (conf.HasHelp())
642 {
643 PrintHelp();
644 return -1;
645 }
646
647 // To allow overwriting of DIM_DNS_NODE set 0 to 1
648 setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
649
650 try
651 {
652 // No console access at all
653 if (!conf.Has("console"))
654 {
655 if (conf.Get<bool>("no-dim"))
656 return RunDim<StateMachine>(conf);
657 else
658 return RunDim<StateMachineDim>(conf);
659 }
660 // Cosole access w/ and w/o Dim
661 if (conf.Get<bool>("no-dim"))
662 {
663 if (conf.Get<int>("console")==0)
664 return RunShell<LocalShell, StateMachine>(conf);
665 else
666 return RunShell<LocalConsole, StateMachine>(conf);
667 }
668 else
669 {
670 if (conf.Get<int>("console")==0)
671 return RunShell<LocalShell, StateMachineDim>(conf);
672 else
673 return RunShell<LocalConsole, StateMachineDim>(conf);
674 }
675 }
676 catch (std::exception& e)
677 {
678 std::cerr << "Exception: " << e.what() << "\n";
679 }
680
681 return 0;
682}
683
684/*
685class FADctrlDim : public StateMachineFAD<StateMachineDim>
686{
687public:
688FADctrlDim(const std::string &name="DATA_LOGGER", std::ostream &out=std::cout)
689: StateMachineFAD<StateMachineDim>(out, name) { }
690};
691
692 class FADctrlLocalShell : public StateMachineFAD<StateMachine>
693{
694public:
695 ostream &win;
696
697 FADctrlLocalShell(std::ostream &out, std::ostream &out2)
698 : StateMachineFAD<StateMachine>(out), win(out2) { }
699
700 FADctrlLocalShell(std::ostream &out=std::cout)
701 : StateMachineFAD<StateMachine>(out), win(out) { }
702
703};
704*/
Note: See TracBrowser for help on using the repository browser.