source: trunk/FACT++/src/fscctrl.cc@ 11194

Last change on this file since 11194 was 11115, checked in by tbretz, 13 years ago
Added DUMP_STREAM command; output number of received bytes
File size: 20.7 KB
Line 
1#include <boost/bind.hpp>
2#include <boost/array.hpp>
3#if BOOST_VERSION < 104400
4#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
5#undef BOOST_HAS_RVALUE_REFS
6#endif
7#endif
8#include <boost/thread.hpp>
9#include <boost/asio/error.hpp>
10#include <boost/asio/deadline_timer.hpp>
11
12#include "Dim.h"
13#include "Event.h"
14#include "Shell.h"
15#include "StateMachineDim.h"
16#include "Connection.h"
17#include "Configuration.h"
18#include "Console.h"
19#include "Converter.h"
20
21#include "tools.h"
22
23#include "LocalControl.h"
24
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace dummy = ba::placeholders;
29
30using namespace std;
31
32// ------------------------------------------------------------------------
33
34class ConnectionFSC : public Connection
35{
36 boost::asio::streambuf fBuffer;
37
38 bool fIsVerbose;
39 bool fDump;
40
41 ofstream fDumpStream;
42
43protected:
44
45 virtual void UpdateTemp(float, const vector<float> &)
46 {
47 }
48
49 virtual void UpdateHum(float, const vector<float>&)
50 {
51 }
52
53 virtual void UpdateVolt(float, const vector<float>&)
54 {
55 }
56
57 virtual void UpdateCur(float, const vector<float>&)
58 {
59 }
60
61 /*
62 virtual void UpdateError()
63 {
64 if (!fIsVerbose)
65 return;
66
67 Out() << endl << kRed << "Error received:" << endl;
68 Out() << fError;
69 if (fIsHexOutput)
70 Out() << Converter::GetHex<uint16_t>(fError, 16) << endl;
71 }
72*/
73
74 void Dump(const string &str)
75 {
76 if (!fDumpStream.is_open())
77 {
78 fDumpStream.open("socket_dump-fsc.txt", ios::app);
79 if (!fDumpStream)
80 {
81 //ostringstream str;
82 //str << "Open file " << name << ": " << strerror(errno) << " (errno=" << errno << ")";
83 //Error(str);
84
85 return;
86 }
87 }
88
89 fDumpStream << str << endl;
90 }
91
92private:
93 void HandleReceivedData(const bs::error_code& err, size_t bytes_received, int /*type*/)
94 {
95 // Do not schedule a new read if the connection failed.
96 if (bytes_received==0 || err)
97 {
98 if (err==ba::error::eof)
99 Warn("Connection closed by remote host (FTM).");
100
101 // 107: Transport endpoint is not connected (bs::error_code(107, bs::system_category))
102 // 125: Operation canceled
103 if (err && err!=ba::error::eof && // Connection closed by remote host
104 err!=ba::error::basic_errors::not_connected && // Connection closed by remote host
105 err!=ba::error::basic_errors::operation_aborted) // Connection closed by us
106 {
107 ostringstream str;
108 str << "Reading from " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
109 Error(str);
110 }
111 PostClose(err!=ba::error::basic_errors::operation_aborted);
112 return;
113 }
114
115 if (fIsVerbose)
116 Out() << kBold << "Received (" << bytes_received << " bytes):" << endl;
117
118 /*
119 "status: 00000538 \n"
120 "time_s: 764.755 \n"
121 "VOLTAGES \n"
122 " \n"
123 "enable:11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 00001111 \n"
124 " done:11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 00001111 \n"
125 "values:0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 00 0 0 0 0 0 0 0 0 \n"
126 "RESISTANCES \n"
127 " \n"
128 "enable:11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 \n"
129 " done:11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111 \n"
130 "values: \n"
131 "1000.16 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
132 "3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
133 "1197.07 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
134 " 558.59 677.92 817.26 989.39 1200.35 1503.06 1799.90 2204.18 \n"
135 "3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
136 "3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
137 "3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
138 "3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 3199.99 \n"
139 "end.\n";
140 */
141/*
142 const unsigned int TIME_OFF = 3;
143 const unsigned int VOLT_OFF = 30;
144 const unsigned int CURR_OFF = 70;
145 const unsigned int HUMI_OFF = 110;
146 const unsigned int TEMP_OFF = 134;
147 */
148
149 if (fDump)
150 {
151 ostringstream msg;
152 msg << "--- " << Time().GetAsStr() << " --- received " << bytes_received << " bytes.";
153 Dump(msg.str());
154 }
155
156
157 istream is(&fBuffer);
158
159 int state = 0;
160 bool values = false;
161
162 vector<int> volt;
163 vector<float> resist;
164 int status=-1;
165 float time=0;
166
167 string buffer;
168 while (getline(is, buffer, '\n'))
169 {
170 if (fIsVerbose)
171 Out() << buffer << endl;
172 if (fDump)
173 Dump(buffer);
174
175 buffer = Tools::Trim(buffer);
176
177 if (buffer.empty())
178 continue;
179
180 if (buffer.substr(0, 4)=="end.")
181 break;
182
183 if (buffer.substr(0, 8)=="status: ")
184 {
185 status = atoi(buffer.c_str()+8);
186 continue;
187 }
188
189 if (buffer.substr(0, 8)=="time_s: ")
190 {
191 time = atof(buffer.c_str()+8);
192 continue;
193 }
194
195 if (buffer.substr(0, 8)=="VOLTAGES")
196 {
197 state = 1;
198 continue;
199 }
200
201 if (buffer.substr(0, 11)=="RESISTANCES")
202 {
203 state = 2;
204 continue;
205 }
206
207 if (state==1 && buffer.substr(0, 7)=="values:")
208 {
209 istringstream in(buffer.substr(7));
210 while (1)
211 {
212 int v;
213 in >> v;
214 if (!in)
215 break;
216
217 volt.push_back(v);
218 }
219 continue;
220 }
221
222 if (state==2 && buffer.substr(0, 7)=="values:")
223 {
224 values = true;
225 continue;
226 }
227
228 if (state==2 && !values)
229 continue;
230
231 istringstream in(buffer);
232 while (1)
233 {
234 float f;
235 in >> f;
236 if (!in)
237 break;
238
239 resist.push_back(f);
240 }
241 }
242
243 StartRead();
244 }
245
246 void StartRead()
247 {
248 ba::async_read_until(*this, fBuffer, "end.\n",
249 boost::bind(&ConnectionFSC::HandleReceivedData, this,
250 dummy::error, dummy::bytes_transferred, 0));
251
252 // FIXME: Add timeout here
253 }
254
255 // This is called when a connection was established
256 void ConnectionEstablished()
257 {
258 PostMessage("m", 1);
259
260 fBuffer.prepare(10000);
261 StartRead();
262 }
263
264/*
265 void HandleReadTimeout(const bs::error_code &error)
266 {
267 if (error==ba::error::basic_errors::operation_aborted)
268 return;
269
270 if (error)
271 {
272 ostringstream str;
273 str << "Read timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
274 Error(str);
275
276 PostClose();
277 return;
278
279 }
280
281 if (!is_open())
282 {
283 // For example: Here we could schedule a new accept if we
284 // would not want to allow two connections at the same time.
285 return;
286 }
287
288 // Check whether the deadline has passed. We compare the deadline
289 // against the current time since a new asynchronous operation
290 // may have moved the deadline before this actor had a chance
291 // to run.
292 if (fInTimeout.expires_at() > ba::deadline_timer::traits_type::now())
293 return;
294
295 Error("Timeout reading data from "+URL());
296
297 PostClose();
298 }
299*/
300
301public:
302 ConnectionFSC(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
303 fIsVerbose(true), fDump(false)
304 {
305 SetLogStream(&imp);
306 }
307
308 void SetVerbose(bool b)
309 {
310 fIsVerbose = b;
311 }
312
313 void SetDumpStream(bool b)
314 {
315 fDump = b;
316 }
317};
318
319// ------------------------------------------------------------------------
320
321#include "DimDescriptionService.h"
322
323class ConnectionDimFSC : public ConnectionFSC
324{
325private:
326
327 DimDescribedService fDimTemp;
328 DimDescribedService fDimHum;
329 DimDescribedService fDimVolt;
330 DimDescribedService fDimCurrent;
331
332 void Update(DimDescribedService &svc, vector<float> data, float time) const
333 {
334 data.insert(data.begin(), time);
335 svc.setData(data.data(), data.size()*sizeof(float));
336 svc.updateService();
337 }
338
339 void UpdateTemp(float time, const vector<float> &temp)
340 {
341 Update(fDimTemp, temp, time);
342 }
343
344 void UpdateHum(float time, const vector<float> &hum)
345 {
346 Update(fDimHum, hum, time);
347 }
348
349 void UpdateVolt(float time, const vector<float> &volt)
350 {
351 Update(fDimVolt, volt, time);
352 }
353
354 void UpdateCur(float time, const vector<float> &curr)
355 {
356 Update(fDimCurrent, curr, time);
357 }
358
359public:
360 ConnectionDimFSC(ba::io_service& ioservice, MessageImp &imp) :
361 ConnectionFSC(ioservice, imp),
362 fDimTemp ("FSC_CONTROL/TEMPERATURE", "F:1;F:64", ""),
363 fDimHum ("FSC_CONTROL/HUMIDITY", "F:1;F:40", ""),
364 fDimVolt ("FSC_CONTROL/VOLTAGE", "F:1;F:40", ""),
365 fDimCurrent("FSC_CONTROL/CURRENT", "F:1;F:4", "")
366 {
367 }
368
369 // A B [C] [D] E [F] G H [I] J K [L] M N O P Q R [S] T U V W [X] Y Z
370};
371
372// ------------------------------------------------------------------------
373
374template <class T, class S>
375class StateMachineFSC : public T, public ba::io_service, public ba::io_service::work
376{
377 int Wrap(boost::function<void()> f)
378 {
379 f();
380 return T::GetCurrentState();
381 }
382
383 boost::function<int(const EventImp &)> Wrapper(boost::function<void()> func)
384 {
385 return boost::bind(&StateMachineFSC::Wrap, this, func);
386 }
387
388private:
389 S fFSC;
390
391 enum states_t
392 {
393 kStateDisconnected = 1,
394 kStateConnected = 2,
395 };
396
397 int Disconnect()
398 {
399 // Close all connections
400 fFSC.PostClose(false);
401
402 /*
403 // Now wait until all connection have been closed and
404 // all pending handlers have been processed
405 poll();
406 */
407
408 return T::GetCurrentState();
409 }
410
411 int Reconnect(const EventImp &evt)
412 {
413 // Close all connections to supress the warning in SetEndpoint
414 fFSC.PostClose(false);
415
416 // Now wait until all connection have been closed and
417 // all pending handlers have been processed
418 poll();
419
420 if (evt.GetBool())
421 fFSC.SetEndpoint(evt.GetString());
422
423 // Now we can reopen the connection
424 fFSC.PostClose(true);
425
426 return T::GetCurrentState();
427 }
428
429 int Execute()
430 {
431 // Dispatch (execute) at most one handler from the queue. In contrary
432 // to run_one(), it doesn't wait until a handler is available
433 // which can be dispatched, so poll_one() might return with 0
434 // handlers dispatched. The handlers are always dispatched/executed
435 // synchronously, i.e. within the call to poll_one()
436 poll_one();
437
438 return fFSC.IsConnected() ? kStateConnected : kStateDisconnected;
439 }
440
441 bool CheckEventSize(size_t has, const char *name, size_t size)
442 {
443 if (has==size)
444 return true;
445
446 ostringstream msg;
447 msg << name << " - Received event has " << has << " bytes, but expected " << size << ".";
448 T::Fatal(msg);
449 return false;
450 }
451
452 int SetVerbosity(const EventImp &evt)
453 {
454 if (!CheckEventSize(evt.GetSize(), "SetVerbosity", 1))
455 return T::kSM_FatalError;
456
457 fFSC.SetVerbose(evt.GetBool());
458
459 return T::GetCurrentState();
460 }
461
462 int SetDumpStream(const EventImp &evt)
463 {
464 if (!CheckEventSize(evt.GetSize(), "SetDumpStream", 1))
465 return T::kSM_FatalError;
466
467 fFSC.SetDumpStream(evt.GetBool());
468
469 return T::GetCurrentState();
470 }
471
472public:
473 StateMachineFSC(ostream &out=cout) :
474 T(out, "FSC_CONTROL"), ba::io_service::work(static_cast<ba::io_service&>(*this)),
475 fFSC(*this, *this)
476 {
477 // ba::io_service::work is a kind of keep_alive for the loop.
478 // It prevents the io_service to go to stopped state, which
479 // would prevent any consecutive calls to run()
480 // or poll() to do nothing. reset() could also revoke to the
481 // previous state but this might introduce some overhead of
482 // deletion and creation of threads and more.
483
484 // State names
485 AddStateName(kStateDisconnected, "Disconnected",
486 "FSC board not connected via ethernet.");
487
488 AddStateName(kStateConnected, "Connected",
489 "Ethernet connection to FSC established.");
490
491 // Verbosity commands
492 T::AddEvent("SET_VERBOSE", "B:1")
493 (boost::bind(&StateMachineFSC::SetVerbosity, this, _1))
494 ("set verbosity state"
495 "|verbosity[bool]:disable or enable verbosity for received data (yes/no), except dynamic data");
496
497 T::AddEvent("DUMP_STREAM", "B:1")
498 (boost::bind(&StateMachineFSC::SetDumpStream, this, _1))
499 (""
500 "");
501
502 // Conenction commands
503 AddEvent("DISCONNECT", kStateConnected)
504 (boost::bind(&StateMachineFSC::Disconnect, this))
505 ("disconnect from ethernet");
506
507 AddEvent("RECONNECT", "O", kStateDisconnected, kStateConnected)
508 (boost::bind(&StateMachineFSC::Reconnect, this, _1))
509 ("(Re)connect ethernet connection to FTM, a new address can be given"
510 "|[host][string]:new ethernet address in the form <host:port>");
511
512 fFSC.StartConnect();
513 }
514
515 void SetEndpoint(const string &url)
516 {
517 fFSC.SetEndpoint(url);
518 }
519
520 bool SetConfiguration(const Configuration &conf)
521 {
522 SetEndpoint(conf.Get<string>("addr"));
523
524 fFSC.SetVerbose(!conf.Get<bool>("quiet"));
525
526 return true;
527 }
528};
529
530// ------------------------------------------------------------------------
531
532void RunThread(StateMachineImp *io_service)
533{
534 // This is necessary so that the StateMachien Thread can signal the
535 // Readline to exit
536 io_service->Run();
537 Readline::Stop();
538}
539
540/*
541template<class S, class T>
542int RunDim(Configuration &conf)
543{
544 WindowLog wout;
545
546 ReadlineColor::PrintBootMsg(wout, conf.GetName(), false);
547
548
549 if (conf.Has("log"))
550 if (!wout.OpenLogFile(conf.Get<string>("log")))
551 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
552
553 // Start io_service.Run to use the StateMachineImp::Run() loop
554 // Start io_service.run to only use the commandHandler command detaching
555 StateMachineFSC<S, T> io_service(wout);
556 if (!io_service.SetConfiguration(conf))
557 return -1;
558
559 io_service.Run();
560
561 return 0;
562}
563*/
564
565template<class T, class S, class R>
566int RunShell(Configuration &conf)
567{
568 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
569
570 WindowLog &win = shell.GetStreamIn();
571 WindowLog &wout = shell.GetStreamOut();
572
573 if (conf.Has("log"))
574 if (!wout.OpenLogFile(conf.Get<string>("log")))
575 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
576
577 StateMachineFSC<S, R> io_service(wout);
578 if (!io_service.SetConfiguration(conf))
579 return -1;
580
581 shell.SetReceiver(io_service);
582
583 boost::thread t(boost::bind(RunThread, &io_service));
584 // boost::thread t(boost::bind(&StateMachineFSC<S>::Run, &io_service));
585
586 if (conf.Has("exec"))
587 {
588 const vector<string> v = conf.Get<vector<string>>("exec");
589 for (vector<string>::const_iterator it=v.begin(); it!=v.end(); it++)
590 shell.Execute(*it);
591 }
592
593 shell.Run(); // Run the shell
594 io_service.Stop(); // Signal Loop-thread to stop
595 // io_service.Close(); // Obsolete, done by the destructor
596
597 // Wait until the StateMachine has finished its thread
598 // before returning and destroying the dim objects which might
599 // still be in use.
600 t.join();
601
602 return 0;
603}
604
605void SetupConfiguration(Configuration &conf)
606{
607 const string n = conf.GetName()+".log";
608
609 po::options_description config("Program options");
610 config.add_options()
611 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
612 ("log,l", var<string>(n), "Write log-file")
613 ("no-dim,d", po_bool(), "Disable dim services")
614 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
615 ("exec,e", vars<string>(), "Execute one or more scrips at startup")
616 ;
617
618 po::options_description control("FTM control options");
619 control.add_options()
620 ("addr,a", var<string>("localhost:5000"), "Network address of FTM")
621 ("quiet,q", po_bool(), "Disable printing contents of all received messages (except dynamic data) in clear text.")
622 ;
623
624 conf.AddEnv("dns", "DIM_DNS_NODE");
625
626 conf.AddOptions(config);
627 conf.AddOptions(control);
628}
629
630/*
631 Extract usage clause(s) [if any] for SYNOPSIS.
632 Translators: "Usage" and "or" here are patterns (regular expressions) which
633 are used to match the usage synopsis in program output. An example from cp
634 (GNU coreutils) which contains both strings:
635 Usage: cp [OPTION]... [-T] SOURCE DEST
636 or: cp [OPTION]... SOURCE... DIRECTORY
637 or: cp [OPTION]... -t DIRECTORY SOURCE...
638 */
/// Print the description and usage synopsis of the program to cout.
void PrintUsage()
{
    cout <<
        "The fscctrl controls the FSC (FACT Slow Control) board.\n"
        "\n"
        "The default is that the program is started without user interaction. "
        "All actions are supposed to arrive as DimCommands. Using the -c "
        "option, a local shell can be initialized. With h or help a short "
        "help message about the usage can be brought to the screen.\n"
        "\n"
        "Usage: fscctrl [-c type] [OPTIONS]\n"
        "  or:  fscctrl [OPTIONS]\n";
    cout << endl;
}
653
/// Placeholder for additional help text which is meant to be printed
/// after the configuration options. Currently nothing is printed.
void PrintHelp()
{
    // Template for future help text:
    //
    //   cout << "bla bla bla" << endl << endl;
    //   cout << endl;
    //   cout << "Environment:" << endl;
    //   cout << "environment" << endl;
    //   cout << endl;
    //   cout << "Examples:" << endl;
    //   cout << "test exam" << endl;
    //   cout << endl;
    //   cout << "Files:" << endl;
    //   cout << "files" << endl;
    //   cout << endl;
}
673
674int main(int argc, const char* argv[])
675{
676 Configuration conf(argv[0]);
677 conf.SetPrintUsage(PrintUsage);
678 SetupConfiguration(conf);
679
680 po::variables_map vm;
681 try
682 {
683 vm = conf.Parse(argc, argv);
684 }
685#if BOOST_VERSION > 104000
686 catch (po::multiple_occurrences &e)
687 {
688 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl;
689 return -1;
690 }
691#endif
692 catch (exception& e)
693 {
694 cerr << "Program options invalid due to: " << e.what() << endl;
695 return -1;
696 }
697
698 if (conf.HasVersion() || conf.HasPrint())
699 return -1;
700
701 if (conf.HasHelp())
702 {
703 PrintHelp();
704 return -1;
705 }
706
707 Dim::Setup(conf.Get<string>("dns"));
708
709 //try
710 {
711 // No console access at all
712 if (!conf.Has("console"))
713 {
714 if (conf.Get<bool>("no-dim"))
715 return RunShell<LocalStream, StateMachine, ConnectionFSC>(conf);
716 else
717 return RunShell<LocalStream, StateMachineDim, ConnectionDimFSC>(conf);
718 }
719 // Cosole access w/ and w/o Dim
720 if (conf.Get<bool>("no-dim"))
721 {
722 if (conf.Get<int>("console")==0)
723 return RunShell<LocalShell, StateMachine, ConnectionFSC>(conf);
724 else
725 return RunShell<LocalConsole, StateMachine, ConnectionFSC>(conf);
726 }
727 else
728 {
729 if (conf.Get<int>("console")==0)
730 return RunShell<LocalShell, StateMachineDim, ConnectionDimFSC>(conf);
731 else
732 return RunShell<LocalConsole, StateMachineDim, ConnectionDimFSC>(conf);
733 }
734 }
735 /*catch (std::exception& e)
736 {
737 cerr << "Exception: " << e.what() << endl;
738 return -1;
739 }*/
740
741 return 0;
742}
Note: See TracBrowser for help on using the repository browser.