source: trunk/FACT++/src/dclient5.cc@ 10329

Last change on this file since 10329 was 10292, checked in by tbretz, 14 years ago
Replaced -> by . in AddTransition and AddConfiguration.
File size: 18.3 KB
Line 
1#include <boost/bind.hpp>
2#include <boost/thread.hpp>
3#include <boost/asio/deadline_timer.hpp>
4
5#include "Event.h"
6#include "Shell.h"
7#include "StateMachineDim.h"
8#include "Connection.h"
9#include "Configuration.h"
10#include "Timers.h"
11#include "Console.h"
12
13#include "tools.h"
14
15namespace ba = boost::asio;
16namespace bs = boost::system;
17
18using ba::deadline_timer;
19using ba::ip::tcp;
20
21using namespace std;
22
23
24// ------------------------------------------------------------------------
25
26#include "LocalControl.h"
27
28// ------------------------------------------------------------------------
29
30class ConnectionFAD : public Connection
31{
32 MessageImp &fMsg;
33
34 int state;
35
36 char fReadBuffer[1000];
37
38public:
39 void ConnectionEstablished()
40 {
41 StartAsyncRead();
42 }
43
44 void HandleReadTimeout(const bs::error_code &error)
45 {
46 return;
47 if (!is_open())
48 {
49 // For example: Here we could schedule a new accept if we
50 // would not want to allow two connections at the same time.
51 return;
52 }
53
54 // 125: Operation canceled
55
56 if (error && error!=bs::error_code(125, bs::system_category))
57 {
58 stringstream str;
59
60 str << "HandleReadTimeout: " << error.message() << " (" << error << ")";// << endl;
61 if (error==bs::error_code(2, ba::error::misc_category))
62 Warn(str); // Connection: EOF (closed by remote host)
63 else
64 Error(str);
65 }
66
67 // Check whether the deadline has passed. We compare the deadline
68 // against the current time since a new asynchronous operation
69 // may have moved the deadline before this actor had a chance
70 // to run.
71 if (fInTimeout.expires_at() > deadline_timer::traits_type::now())
72 return;
73
74 Error("fInTimeout has expired...");
75
76 PostClose();
77 }
78
79 void HandleReceivedData(const bs::error_code& error, size_t bytes_received, int type)
80 {
81 // Do not schedule a new read if the connection failed.
82 if (bytes_received==0 || error)
83 {
84 // 107: Transport endpoint is not connected
85 // 125: Operation canceled
86 if (error && error!=bs::error_code(107, bs::system_category))
87 {
88 stringstream str;
89 str << "Reading from " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
90 Error(str);
91 }
92 PostClose(error!=bs::error_code(125, bs::system_category));
93 return;
94 }
95
96 string txt;
97
98 if (bytes_received==2)
99 {
100 txt = string(fReadBuffer, bytes_received);
101 //std::vector<char> buf(128);
102 //bytes_transferred = sock.receive(boost::asio::buffer(d3));
103
104 fMsg() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
105
106 if (fReadBuffer[0]=='T')
107 {
108 // AsyncRead + Deadline
109 // Do all manipulation to the buffer BEFORE this call!
110 AsyncRead(ba::buffer(fReadBuffer+2, 21)/*,
111 &Connection::HandleReceivedData*/);
112 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
113 }
114 else
115 {
116 // AsyncRead + Deadline
117 // Do all manipulation to the buffer BEFORE this call!
118 AsyncRead(ba::buffer(fReadBuffer+2, 35)/*,
119 &Connection::HandleReceivedData*/);
120 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
121 }
122 }
123 else
124 {
125 txt = string(fReadBuffer, bytes_received+2);
126 const int s = atoi(fReadBuffer+35);
127 if (s==9)
128 Info("Requested time received: "+txt);
129 else
130 state = s;
131
132 Out() << "Received b=" << bytes_received << ": " << (int)fReadBuffer[0] << " " << (int)txt[0] << " '" << txt << "' " << " " << error.message() << " (" << error << ")" << endl;
133 memset(fReadBuffer, 0, 100);
134
135 // Do all manipulation to the buffer BEFORE this call!
136 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
137 &Connection::HandleReceivedData*/);
138
139
140 }
141 }
142
143 int GetState() const { return state; }
144
145 void StartAsyncRead()
146 {
147 // Start also a dealine_time for a proper timeout
148 // Therefore we must know how often we expect messages
149 // FIXME: Add deadline_counter
150
151 memset(fReadBuffer, 0, 100);
152
153 // AsyncRead + Deadline
154 AsyncRead(ba::buffer(fReadBuffer, 2)/*,
155 &Connection::HandleReceivedData*/);
156 AsyncWait(fInTimeout, 5000, &Connection::HandleReadTimeout);
157 }
158
159 /*
160 ConnectionFAD(ba::io_service& io_service, const string &addr, int port) :
161 Connection(io_service, addr, port), state(0) { }
162 ConnectionFAD(ba::io_service& io_service, const string &addr, const string &port) :
163 Connection(io_service, addr, port), state(0) { }
164 */
165
166 ConnectionFAD(ba::io_service& ioservice, MessageImp &imp) :
167 Connection(ioservice, imp()), fMsg(imp), state(0)
168 {
169 }
170};
171
172template <class T>
173class StateMachineFAD : public T, public ba::io_service
174{
175public:
176 enum states_t
177 {
178 kSM_Disconnected = 1,
179 kSM_Connecting,
180 kSM_Connected,
181 kSM_Running,
182 kSM_SomeRunning,
183 kSM_Starting,
184 kSM_Stopping,
185 kSM_Reconnect,
186 kSM_SetUrl,
187 };
188
189 ConnectionFAD c1;
190 ConnectionFAD c2;
191 ConnectionFAD c3;
192 ConnectionFAD c4;
193 ConnectionFAD c5;
194 ConnectionFAD c6;
195 ConnectionFAD c7;
196 ConnectionFAD c8;
197 ConnectionFAD c9;
198
199 /*
200 int Write(const Time &time, const char *txt, int qos)
201 {
202 return T::Write(time, txt, qos);
203 }
204 */
205 Timers fTimers;
206
207 StateMachineFAD(const string &name="", ostream &out=cout) :
208 T(out, name),
209 c1(*this, *this), c2(*this, *this), c3(*this, *this), c4(*this, *this),
210 c5(*this, *this), c6(*this, *this), c7(*this, *this), c8(*this, *this),
211 c9(*this, *this), fTimers(out)
212 {
213// c1.SetEndpoint();
214 c2.SetEndpoint("localhost", 4001);
215 c3.SetEndpoint("ftmboard1.ethz.ch", 5000);
216 c4.SetEndpoint("localhost", 4003);
217 c5.SetEndpoint("localhost", 4004);
218 c6.SetEndpoint("localhost", 4005);
219 c7.SetEndpoint("localhost", 4006);
220 c8.SetEndpoint("localhost", 4007);
221 c9.SetEndpoint("localhost", 4008);
222
223 c1.SetLogStream(this);
224 c2.SetLogStream(this);
225 c3.SetLogStream(this);
226 c4.SetLogStream(this);
227 c5.SetLogStream(this);
228 c6.SetLogStream(this);
229 c7.SetLogStream(this);
230 c8.SetLogStream(this);
231 c9.SetLogStream(this);
232
233 c1.StartConnect(); // This sets the connection to "open"
234 c2.StartConnect(); // This sets the connection to "open"
235 c3.StartConnect(); // This sets the connection to "open"
236 //c4.StartConnect(); // This sets the connection to "open"
237 //c5.StartConnect(); // This sets the connection to "open"
238 //c6.StartConnect(); // This sets the connection to "open"
239 //c7.StartConnect(); // This sets the connection to "open"
240 //c8.StartConnect(); // This sets the connection to "open"
241 //c9.StartConnect(); // This sets the connection to "open"
242
243 AddStateName(kSM_Disconnected, "Disconnected");
244 AddStateName(kSM_Connecting, "Connecting"); // Some connected
245 AddStateName(kSM_Connected, "Connected");
246 AddStateName(kSM_Running, "Running");
247 AddStateName(kSM_SomeRunning, "SomeRunning");
248 AddStateName(kSM_Starting, "Starting");
249 AddStateName(kSM_Stopping, "Stopping");
250
251 AddTransition(kSM_Running, "START", kSM_Connected).
252 AssignFunction(boost::bind(&StateMachineFAD::Start, this, _1, 5));
253 AddTransition(kSM_Connected, "STOP", kSM_Running);
254
255 AddConfiguration("TIME", kSM_Running);
256 AddConfiguration("LED", kSM_Connected);
257
258 T::AddConfiguration("TESTI", "I");
259 T::AddConfiguration("TESTI2", "I:2");
260 T::AddConfiguration("TESTIF", "I:2;F:2");
261 T::AddConfiguration("TESTIC", "I:2;C");
262
263 T::AddConfiguration("CMD", "C").
264 AssignFunction(boost::bind(&StateMachineFAD::Command, this, _1));
265
266 AddTransition(kSM_Reconnect, "RECONNECT");
267
268 AddTransition(kSM_SetUrl, "SETURL", "C");
269
270 T::PrintListOfEvents();
271 }
272
273 int Command(const EventImp &evt)
274 {
275 string cmd = evt.GetText();
276
277 size_t p0 = cmd.find_first_of(' ');
278 if (p0==string::npos)
279 p0 = cmd.length();
280
281 T::Out() << "\nCommand: '" << cmd.substr(0, p0) << "'" << cmd.substr(p0)<< "'" << endl;
282 /*
283 const Converter c(T::Out(), "B:5;I:2;F;W;O;C", "yes no false 0 1 31 42 11.12 \"test hallo\" ");
284
285 T::Out() << c.GetRc() << endl;
286 T::Out() << c.N() << endl;
287 T::Out() << c.Get<bool>(0) << endl;
288 T::Out() << c.Get<bool>(1) << endl;
289 T::Out() << c.Get<bool>(2) << endl;
290 T::Out() << c.Get<bool>(3) << endl;
291 T::Out() << c.Get<bool>(4) << endl;
292 T::Out() << c.Get<int>(5) << endl;
293 T::Out() << c.Get<int>(6) << endl;
294 T::Out() << c.Get<float>(7) << endl;
295 T::Out() << c.Get<int>(7) << endl;
296 T::Out() << c.Get<string>(8) << endl;
297 T::Out() << c.Get<string>(9) << endl;
298 T::Out() << c.Get<string>(10) << endl;
299 */
300 return T::GetCurrentState();
301 }
302 int Start(const EventImp &evt, int i)
303 {
304 switch (evt.GetTargetState())
305 {
306 case kSM_Running: // We are coming from kRunning
307 case kSM_Starting: // We are coming from kConnected
308 T::Out() << "Received Start(" << i << ")" << endl;
309 c1.PostMessage("START", 10);
310 c2.PostMessage("START", 10);
311 // We could introduce a "waiting for execution" state
312 return T::GetCurrentState();
313 }
314 return T::kSM_FatalError;
315 }
316
317 void Close()
318 {
319 c1.PostClose();
320 c2.PostClose();
321 c3.PostClose();
322 c4.PostClose();
323 c5.PostClose();
324 c6.PostClose();
325 c7.PostClose();
326 c8.PostClose();
327 c9.PostClose();
328 }
329
330
331 int Execute()
332 {
333 // Dispatch at most one handler from the queue. In contrary
334 // to run_run(), it doesn't wait until a handler is available
335 // which can be dispatched, so poll_one() might return with 0
336 // handlers dispatched. The handlers are always dispatched
337 // synchronously.
338
339 fTimers.SetT();
340 const int n = poll_one();
341 fTimers.Proc(n==0 && T::IsQueueEmpty());
342
343// return c3.IsConnected() ? kSM_Connected : kSM_Disconnected;
344
345
346 // None is connected
347 if (!c1.IsConnected() && !c2.IsConnected())
348 return kSM_Disconnected;
349
350 // Some are connected
351 if (c1.IsConnected()!=c2.IsConnected())
352 return kSM_Connecting;
353
354 if (c1.GetState()==0 && c2.GetState()==0 && T::GetCurrentState()!=kSM_Starting)
355 return kSM_Connected;
356
357 if (c1.GetState()==1 && c2.GetState()==1 && T::GetCurrentState()!=kSM_Stopping)
358 return kSM_Running;
359
360 return kSM_SomeRunning;//GetCurrentState();
361 }
362
363 int Transition(const Event &evt)
364 {
365 ConnectionFAD *con1 = &c1;
366 ConnectionFAD *con2 = &c2;
367
368 switch (evt.GetTargetState())
369 {
370 case kSM_SetUrl:
371 T::Out() << evt.GetText() << endl;
372 c1.SetEndpoint(evt.GetText());
373 return T::GetCurrentState();
374 case kSM_Reconnect:
375 // Close all connections
376 c1.PostClose(false);
377 c2.PostClose(false);
378 c3.PostClose(false);
379
380 // Now wait until all connection have been closed and
381 // all pending handlers have been processed
382 poll();
383
384 // Now we can reopen the connection
385 c1.PostClose(true);
386 c2.PostClose(true);
387 c3.PostClose(true);
388
389
390 //c4.PostClose(true);
391 //c5.PostClose(true);
392 //c6.PostClose(true);
393 //c7.PostClose(true);
394 //c8.PostClose(true);
395 //c9.PostClose(true);
396 return T::GetCurrentState();
397 case kSM_Running: // We are coming from kRunning
398 case kSM_Starting: // We are coming from kConnected
399 T::Out() << "Received START" << endl;
400 con1->PostMessage("START", 10);
401 con2->PostMessage("START", 10);
402 // We could introduce a "waiting for execution" state
403 return T::GetCurrentState();
404 return kSM_Starting; //GetCurrentState();
405
406 case kSM_Connected: // We are coming from kConnected
407 case kSM_Stopping: // We are coming from kRunning
408 T::Out() << "Received STOP" << endl;
409 con1->PostMessage("STOP", 10);
410 con2->PostMessage("STOP", 10);
411 // We could introduce a "waiting for execution" state
412 return T::GetCurrentState();
413 return kSM_Stopping;//GetCurrentState();
414 }
415
416 return T::kSM_FatalError; //evt.GetTargetState();
417 }
418 int Configure(const Event &evt)
419 {
420 if (evt.GetName()=="TIME")
421 {
422 c1.PostMessage("TIME", 10);
423 c2.PostMessage("TIME", 10);
424 }
425
426 vector<char> v(2);
427 v[0] = 0xc0;
428 v[1] = 0x00;
429
430 if (evt.GetName()=="LED")
431 c3.PostMessage(v);
432
433 return T::GetCurrentState();
434 }
435};
436
437// ------------------------------------------------------------------------
438
439template<class S>
440int RunDim(Configuration &conf)
441{
442 /*
443 initscr(); // Start curses mode
444 cbreak(); // Line buffering disabled, Pass on
445 intrflush(stdscr, FALSE);
446 start_color(); // Initialize ncurses colors
447 use_default_colors(); // Assign terminal default colors to -1
448 for (int i=1; i<8; i++)
449 init_pair(i, i, -1); // -1: def background
450 scrollok(stdscr, true);
451 */
452
453 WindowLog wout;
454
455 //log.SetWindow(stdscr);
456 if (conf.Has("log"))
457 if (!wout.OpenLogFile(conf.Get<string>("log")))
458 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
459
460 // Start io_service.Run to use the StateMachineImp::Run() loop
461 // Start io_service.run to only use the commandHandler command detaching
462 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
463 io_service.Run();
464
465 return 0;
466}
467
468template<class T, class S>
469int RunShell(Configuration &conf)
470{
471 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
472
473 WindowLog &win = shell.GetStreamIn();
474 WindowLog &wout = shell.GetStreamOut();
475
476 if (conf.Has("log"))
477 if (!wout.OpenLogFile(conf.Get<string>("log")))
478 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
479
480 StateMachineFAD<S> io_service("DATA_LOGGER", wout);
481 shell.SetReceiver(io_service);
482
483 boost::thread t(boost::bind(&StateMachineFAD<S>::Run, &io_service));
484
485 //io_service.SetReady();
486
487 shell.Run(); // Run the shell
488 io_service.Stop(); // Signal Loop-thread to stop
489 // io_service.Close(); // Obsolete, done by the destructor
490 // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
491
492 // Wait until the StateMachine has finished its thread
493 // before returning and destroying the dim objects which might
494 // still be in use.
495 t.join();
496
497 return 0;
498}
499
500void SetupConfiguration(Configuration &conf)
501{
502 const string n = conf.GetName()+".log";
503
504 po::options_description config("Program options");
505 config.add_options()
506 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
507 ("log,l", var<string>(n), "Write log-file")
508 ("no-dim,d", po_switch(), "Disable dim services")
509 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
510 ;
511
512 conf.AddEnv("dns", "DIM_DNS_NODE");
513
514 conf.AddOptions(config);
515}
516
517int main(int argc, char* argv[])
518{
519 Configuration conf(argv[0]);
520 SetupConfiguration(conf);
521
522 po::variables_map vm;
523 try
524 {
525 vm = conf.Parse(argc, argv);
526 }
527 catch (std::exception &e)
528 {
529#if BOOST_VERSION > 104000
530 po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
531 if (MO)
532 cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
533 else
534#endif
535 cout << "Error: " << e.what() << endl;
536 cout << endl;
537
538 return -1;
539 }
540
541 if (conf.HasHelp() || conf.HasPrint())
542 return -1;
543
544 // To allow overwriting of DIM_DNS_NODE set 0 to 1
545 setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
546
547 try
548 {
549 // No console access at all
550 if (!conf.Has("console"))
551 {
552 if (conf.Get<bool>("no-dim"))
553 return RunDim<StateMachine>(conf);
554 else
555 return RunDim<StateMachineDim>(conf);
556 }
557 // Cosole access w/ and w/o Dim
558 if (conf.Get<bool>("no-dim"))
559 {
560 if (conf.Get<int>("console")==0)
561 return RunShell<LocalShell, StateMachine>(conf);
562 else
563 return RunShell<LocalConsole, StateMachine>(conf);
564 }
565 else
566 {
567 if (conf.Get<int>("console")==0)
568 return RunShell<LocalShell, StateMachineDim>(conf);
569 else
570 return RunShell<LocalConsole, StateMachineDim>(conf);
571 }
572 }
573 catch (std::exception& e)
574 {
575 std::cerr << "Exception: " << e.what() << "\n";
576 }
577
578 return 0;
579}
580
581/*
582class FADctrlDim : public StateMachineFAD<StateMachineDim>
583{
584public:
585FADctrlDim(const std::string &name="DATA_LOGGER", std::ostream &out=std::cout)
586: StateMachineFAD<StateMachineDim>(out, name) { }
587};
588
589 class FADctrlLocalShell : public StateMachineFAD<StateMachine>
590{
591public:
592 ostream &win;
593
594 FADctrlLocalShell(std::ostream &out, std::ostream &out2)
595 : StateMachineFAD<StateMachine>(out), win(out2) { }
596
597 FADctrlLocalShell(std::ostream &out=std::cout)
598 : StateMachineFAD<StateMachine>(out), win(out) { }
599
600};
601*/
Note: See TracBrowser for help on using the repository browser.