source: trunk/FACT++/src/tngweather.cc@ 14814

Last change on this file since 14814 was 14615, checked in by tbretz, 12 years ago
Removed obsolete includes.
File size: 19.7 KB
Line 
1#include <boost/bind.hpp>
2
3#include <string> // std::string
4#include <algorithm> // std::transform
5#include <cctype> // std::tolower
6
7#include "FACT.h"
8#include "Dim.h"
9#include "Event.h"
10#include "Shell.h"
11#include "StateMachineDim.h"
12#include "Connection.h"
13#include "LocalControl.h"
14#include "Configuration.h"
15#include "Timers.h"
16#include "Console.h"
17
18#include "tools.h"
19
20#include "HeadersTNGWeather.h"
21
22#include <Soprano/Soprano>
23
24namespace ba = boost::asio;
25namespace bs = boost::system;
26namespace dummy = ba::placeholders;
27
28using namespace std;
29using namespace TNGWeather;
30
31
32class ConnectionWeather : public Connection
33{
34 uint16_t fInterval;
35
36 bool fIsVerbose;
37
38 string fSite;
39
40 virtual void UpdateWeather(const Time &, const DimWeather &)
41 {
42 }
43
44 virtual void UpdateDust(const Time &, const float &)
45 {
46 }
47
48 string fRdfData;
49 float fDust;
50
51protected:
52
53 boost::array<char, 4096> fArray;
54
55 Time fLastReport;
56 Time fLastReception;
57
58 void HandleRead(const boost::system::error_code& err, size_t bytes_received)
59 {
60 // Do not schedule a new read if the connection failed.
61 if (bytes_received==0 || err)
62 {
63 if (err==ba::error::eof)
64 Warn("Connection closed by remote host.");
65
66 // 107: Transport endpoint is not connected (bs::error_code(107, bs::system_category))
67 // 125: Operation canceled
68 if (err && err!=ba::error::eof && // Connection closed by remote host
69 err!=ba::error::basic_errors::not_connected && // Connection closed by remote host
70 err!=ba::error::basic_errors::operation_aborted) // Connection closed by us
71 {
72 ostringstream str;
73 str << "Reading from " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
74 Error(str);
75 }
76 PostClose(err!=ba::error::basic_errors::operation_aborted);
77
78 fRdfData = "";
79 return;
80 }
81
82 fRdfData += string(fArray.data(), bytes_received);
83
84 const size_t end = fRdfData.find("\r\n\r\n");
85 if (end==string::npos)
86 {
87 Out() << "Received data corrupted [1]." << endl;
88 Out() << fRdfData << endl;
89 return;
90 }
91
92 string data(fRdfData);
93 data.erase(0, end+4);
94
95 size_t pos = 0;
96 while (1)
97 {
98 const size_t chunk = data.find("\r\n", pos);
99 if (chunk==0 || chunk==string::npos)
100 {
101 StartReadReport();
102 return;
103 }
104
105 size_t len = 0;
106 stringstream val(data.substr(pos, chunk-pos));
107 val >> hex >> len;
108
109 data.erase(pos, chunk-pos+2);
110 if (len==0)
111 break;
112
113 pos += len+2; // Count trailing \r\n of chunk
114 }
115
116
117 fLastReception = Time();
118 fRdfData = "";
119 PostClose(false);
120
121 if (fIsVerbose)
122 {
123 Out() << "------------------------------------------------------" << endl;
124 Out() << data << endl;
125 Out() << "------------------------------------------------------" << endl;
126 }
127
128 const Soprano::Parser* p = Soprano::PluginManager::instance()->discoverParserForSerialization( Soprano::SerializationRdfXml );
129 Soprano::StatementIterator it = p->parseString(QString(data.c_str()), QUrl(""), Soprano::SerializationRdfXml );
130
131
132 DimWeather w;
133 Time time(Time::none);
134 try
135 {
136 while (it.next())
137 {
138 const string pre = (*it).predicate().toString().toStdString();
139 const string obj = (*it).object().toString().toStdString();
140
141 const size_t slash = pre.find_last_of('/');
142 if (slash==string::npos)
143 continue;
144
145 const string id = pre.substr(slash+1);
146
147 if (obj=="N/A")
148 continue;
149
150 if (id=="dimmSeeing")
151 w.fSeeing = stof(obj);
152 if (id=="dustTotal")
153 w.fDustTotal = stof(obj);
154 if (id=="deltaM1")
155 w.fDeltaM1 = stof(obj);
156 if (id=="airPressure")
157 w.fAirPressure = stof(obj);
158 if (id=="dewPoint")
159 w.fDewPoint = stof(obj);
160 if (id=="windDirection")
161 w.fWindDirection = stof(obj);
162 if (id=="windSpeed")
163 w.fWindSpeed = stof(obj)*3.6;
164 if (id=="hum")
165 w.fHumidity = stof(obj);
166 if (id=="tempGround")
167 w.fTempGround = stof(obj);
168 if (id=="temp2M")
169 w.fTemp2M = stof(obj);
170 if (id=="temp5M")
171 w.fTemp5M = stof(obj);
172 if (id=="temp10M")
173 w.fTemp10M = stof(obj);
174 if (id=="date")
175 {
176 time.SetFromStr(obj, "%Y-%m-%dT%H:%M:%S");
177
178 if (!time.IsValid())
179 {
180 struct tm tm;
181
182 vector<char> buf(255);
183 if (strptime(obj.c_str(), "%c", &tm))
184 time = Time(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday,
185 tm.tm_hour, tm.tm_min, tm.tm_sec);
186 }
187 }
188
189 /*
190 Out() << "S: " << (*it).subject().toString().toStdString() << endl;
191 Out() << "P: " << (*it).predicate().toString().toStdString() << endl;
192 Out() << "O: " << (*it).object().toString().toStdString() << endl;
193 Out() << "C: " << (*it).context().toString().toStdString() << endl;
194 */
195 }
196
197 if (!time.IsValid())
198 throw runtime_error("time invalid");
199
200 if (time!=fLastReport && fIsVerbose)
201 {
202 Out() << endl;
203 Out() << "Date: " << time << endl;
204 Out() << "Seeing: " << w.fSeeing << endl;
205 Out() << "DustTotal: " << w.fDustTotal << endl;
206 Out() << "DeltaM1: " << w.fDeltaM1 << endl;
207 Out() << "AirPressure: " << w.fAirPressure << endl;
208 Out() << "DewPoint: " << w.fDewPoint << endl;
209 Out() << "WindDirection: " << w.fWindDirection << endl;
210 Out() << "WindSpeed: " << w.fWindSpeed << endl;
211 Out() << "Humidity: " << w.fHumidity << endl;
212 Out() << "TempGround: " << w.fTempGround << endl;
213 Out() << "Temp2M: " << w.fTemp2M << endl;
214 Out() << "Temp5M: " << w.fTemp5M << endl;
215 Out() << "Temp10M: " << w.fTemp10M << endl;
216 Out() << endl;
217 }
218
219 fLastReport = time;
220
221 UpdateWeather(time, w);
222
223 if (fDust==w.fDustTotal)
224 return;
225
226 UpdateDust(time, w.fDustTotal);
227 fDust = w.fDustTotal;
228
229 ostringstream out;
230 out << setprecision(3) << "Dust: " << fDust << "ug/m^3 [" << time << "]";
231 Message(out);
232 }
233
234 catch (const exception &e)
235 {
236 Out() << "Corrupted data received: " << e.what() << endl;
237 fLastReport = Time(Time::none);
238 return;
239 }
240 }
241
242 void StartReadReport()
243 {
244 async_read_some(ba::buffer(fArray),
245 boost::bind(&ConnectionWeather::HandleRead, this,
246 dummy::error, dummy::bytes_transferred));
247 }
248
249 boost::asio::deadline_timer fKeepAlive;
250
251 void PostRequest()
252 {
253 const string cmd =
254 "GET "+fSite+" HTTP/1.1\r\n"
255 "Accept: */*\r\n"
256 "Content-Type: application/octet-stream\r\n"
257 "User-Agent: FACT\r\n"
258 "Host: www.fact-project.org\r\n"
259 "Pragma: no-cache\r\n"
260 "Cache-Control: no-cache\r\n"
261 "Expires: 0\r\n"
262 "Connection: Keep-Alive\r\n"
263 "Cache-Control: max-age=0\r\n"
264 "\r\n";
265
266 PostMessage(cmd);
267 }
268
269 void Request()
270 {
271 PostRequest();
272
273 fKeepAlive.expires_from_now(boost::posix_time::seconds(fInterval));
274 fKeepAlive.async_wait(boost::bind(&ConnectionWeather::HandleRequest,
275 this, dummy::error));
276 }
277
278 void HandleRequest(const bs::error_code &error)
279 {
280 // 125: Operation canceled (bs::error_code(125, bs::system_category))
281 if (error && error!=ba::error::basic_errors::operation_aborted)
282 {
283 ostringstream str;
284 str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
285 Error(str);
286
287 PostClose(false);
288 return;
289 }
290
291 if (!is_open())
292 {
293 // For example: Here we could schedule a new accept if we
294 // would not want to allow two connections at the same time.
295 PostClose(true);
296 return;
297 }
298
299 // Check whether the deadline has passed. We compare the deadline
300 // against the current time since a new asynchronous operation
301 // may have moved the deadline before this actor had a chance
302 // to run.
303 if (fKeepAlive.expires_at() > ba::deadline_timer::traits_type::now())
304 return;
305
306 Request();
307 }
308
309
310private:
311 // This is called when a connection was established
312 void ConnectionEstablished()
313 {
314 Request();
315 StartReadReport();
316 }
317
318public:
319
320 static const uint16_t kMaxAddr;
321
322public:
323 ConnectionWeather(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
324 fIsVerbose(true), fDust(-1),
325 fLastReport(Time::none), fLastReception(Time::none),
326 fKeepAlive(ioservice)
327 {
328 SetLogStream(&imp);
329 }
330
331 void SetVerbose(bool b)
332 {
333 fIsVerbose = b;
334 Connection::SetVerbose(b);
335 }
336
337 void SetInterval(uint16_t i)
338 {
339 fInterval = i;
340 }
341
342 void SetSite(const string &site)
343 {
344 fSite = site;
345 }
346
347 int GetState() const
348 {
349 if (fLastReport.IsValid() && fLastReport+boost::posix_time::seconds(fInterval*2)>Time())
350 return 3; // receiving
351
352 if (fLastReception.IsValid() && fLastReception+boost::posix_time::seconds(fInterval*2)>Time())
353 return 2; // connected
354
355 return 1; // Disconnected
356 }
357};
358
359const uint16_t ConnectionWeather::kMaxAddr = 0xfff;
360
361// ------------------------------------------------------------------------
362
363#include "DimDescriptionService.h"
364
365class ConnectionDimWeather : public ConnectionWeather
366{
367private:
368 DimDescribedService fDimWeather;
369 DimDescribedService fDimAtmosphere;
370
371 virtual void UpdateWeather(const Time &t, const DimWeather &data)
372 {
373 fDimWeather.setData(&data, sizeof(DimWeather));
374 fDimWeather.Update(t);
375 }
376
377 virtual void UpdateDust(const Time &t, const float &dust)
378 {
379 fDimAtmosphere.setData(&dust, sizeof(float));
380 fDimAtmosphere.Update(t);
381 }
382
383public:
384 ConnectionDimWeather(ba::io_service& ioservice, MessageImp &imp) :
385 ConnectionWeather(ioservice, imp),
386 fDimWeather("TNG_WEATHER/DATA", "F:1;F:1;F:1;F:1;F:1;F:1;F:1;F:1;F:1;F:1;F:1;F:1",
387 "|T_10M[deg C]:Temperature 10m above ground"
388 "|T_5M[deg C]:Temperature 5m above ground"
389 "|T_2M[deg C]:Temperature 2m above ground"
390 "|T_0[deg C]:Temperature at ground"
391 "|T_dew[deg C]:Dew point"
392 "|H[%]:Humidity"
393 "|P[mbar]:Air pressure"
394 "|v[km/h]:Wind speed"
395 "|d[deg]:Wind direction (N-E)"
396 "|DeltaM1"
397 "|Dust[ug/m^3]:Dust (total)"
398 "|Seeing[W/m^2]:Seeing"),
399 fDimAtmosphere("TNG_WEATHER/DUST", "F:1",
400 "|Dust[ug/m^3]:Dust (total)")
401 {
402 }
403};
404
405// ------------------------------------------------------------------------
406
407template <class T, class S>
408class StateMachineWeather : public T, public ba::io_service, public ba::io_service::work
409{
410private:
411 S fWeather;
412
413 bool CheckEventSize(size_t has, const char *name, size_t size)
414 {
415 if (has==size)
416 return true;
417
418 ostringstream msg;
419 msg << name << " - Received event has " << has << " bytes, but expected " << size << ".";
420 T::Fatal(msg);
421 return false;
422 }
423
424 int SetVerbosity(const EventImp &evt)
425 {
426 if (!CheckEventSize(evt.GetSize(), "SetVerbosity", 1))
427 return T::kSM_FatalError;
428
429 fWeather.SetVerbose(evt.GetBool());
430
431 return T::GetCurrentState();
432 }
433/*
434 int Disconnect()
435 {
436 // Close all connections
437 fWeather.PostClose(false);
438
439 return T::GetCurrentState();
440 }
441
442 int Reconnect(const EventImp &evt)
443 {
444 // Close all connections to supress the warning in SetEndpoint
445 fWeather.PostClose(false);
446
447 // Now wait until all connection have been closed and
448 // all pending handlers have been processed
449 poll();
450
451 if (evt.GetBool())
452 fWeather.SetEndpoint(evt.GetString());
453
454 // Now we can reopen the connection
455 fWeather.PostClose(true);
456
457 return T::GetCurrentState();
458 }
459*/
460 int Execute()
461 {
462 // Dispatch (execute) at most one handler from the queue. In contrary
463 // to run_one(), it doesn't wait until a handler is available
464 // which can be dispatched, so poll_one() might return with 0
465 // handlers dispatched. The handlers are always dispatched/executed
466 // synchronously, i.e. within the call to poll_one()
467 poll_one();
468
469 return fWeather.GetState();
470 }
471
472
473public:
474 StateMachineWeather(ostream &out=cout) :
475 T(out, "TNG_WEATHER"), ba::io_service::work(static_cast<ba::io_service&>(*this)),
476 fWeather(*this, *this)
477 {
478 // ba::io_service::work is a kind of keep_alive for the loop.
479 // It prevents the io_service to go to stopped state, which
480 // would prevent any consecutive calls to run()
481 // or poll() to do nothing. reset() could also revoke to the
482 // previous state but this might introduce some overhead of
483 // deletion and creation of threads and more.
484
485 // State names
486 T::AddStateName(State::kDisconnected, "NoConnection",
487 "No connection to web-server could be established recently");
488
489 T::AddStateName(State::kConnected, "Invalid",
490 "Connection to webserver can be established, but received data is not recent or invalid");
491
492 T::AddStateName(State::kReceiving, "Valid",
493 "Connection to webserver can be established, receint data received");
494
495 // Verbosity commands
496 T::AddEvent("SET_VERBOSE", "B")
497 (bind(&StateMachineWeather::SetVerbosity, this, placeholders::_1))
498 ("set verbosity state"
499 "|verbosity[bool]:disable or enable verbosity for received data (yes/no), except dynamic data");
500/*
501 // Conenction commands
502 AddEvent("DISCONNECT")
503 (bind(&StateMachineWeather::Disconnect, this))
504 ("disconnect from ethernet");
505
506 AddEvent("RECONNECT", "O")
507 (bind(&StateMachineWeather::Reconnect, this, placeholders::_1))
508 ("(Re)connect ethernet connection to FTM, a new address can be given"
509 "|[host][string]:new ethernet address in the form <host:port>");
510*/
511 }
512
513 int EvalOptions(Configuration &conf)
514 {
515 fWeather.SetVerbose(!conf.Get<bool>("quiet"));
516 fWeather.SetInterval(conf.Get<uint16_t>("interval"));
517 fWeather.SetDebugTx(conf.Get<bool>("debug-tx"));
518 fWeather.SetSite(conf.Get<string>("url"));
519 fWeather.SetEndpoint(conf.Get<string>("addr"));
520 fWeather.StartConnect();
521
522 return -1;
523 }
524};
525
526// ------------------------------------------------------------------------
527
528#include "Main.h"
529
530
531template<class T, class S, class R>
532int RunShell(Configuration &conf)
533{
534 return Main::execute<T, StateMachineWeather<S, R>>(conf);
535}
536
537void SetupConfiguration(Configuration &conf)
538{
539 po::options_description control("TNG weather control options");
540 control.add_options()
541 ("no-dim,d", po_switch(), "Disable dim services")
542 ("addr,a", var<string>("tngweb.tng.iac.es:80"), "Network address of Cosy")
543 ("url,u", var<string>("/weather/rss/"), "File name and path to load")
544 ("quiet,q", po_bool(true), "Disable printing contents of all received messages (except dynamic data) in clear text.")
545 ("interval,i", var<uint16_t>(300), "Interval between two updates on the server in seconds")
546 ("debug-tx", po_bool(), "Enable debugging of ethernet transmission.")
547 ;
548
549 conf.AddOptions(control);
550}
551
552/*
553 Extract usage clause(s) [if any] for SYNOPSIS.
554 Translators: "Usage" and "or" here are patterns (regular expressions) which
555 are used to match the usage synopsis in program output. An example from cp
556 (GNU coreutils) which contains both strings:
557 Usage: cp [OPTION]... [-T] SOURCE DEST
558 or: cp [OPTION]... SOURCE... DIRECTORY
559 or: cp [OPTION]... -t DIRECTORY SOURCE...
560 */
561void PrintUsage()
562{
563 cout <<
564 "The tngweather is an interface to the TNG weather data.\n"
565 "\n"
566 "The default is that the program is started without user intercation. "
567 "All actions are supposed to arrive as DimCommands. Using the -c "
568 "option, a local shell can be initialized. With h or help a short "
569 "help message about the usuage can be brought to the screen.\n"
570 "\n"
571 "Usage: tngweather [-c type] [OPTIONS]\n"
572 " or: tngweather [OPTIONS]\n";
573 cout << endl;
574}
575
576void PrintHelp()
577{
578// Main::PrintHelp<StateMachineFTM<StateMachine, ConnectionFTM>>();
579
580 /* Additional help text which is printed after the configuration
581 options goes here */
582
583 /*
584 cout << "bla bla bla" << endl << endl;
585 cout << endl;
586 cout << "Environment:" << endl;
587 cout << "environment" << endl;
588 cout << endl;
589 cout << "Examples:" << endl;
590 cout << "test exam" << endl;
591 cout << endl;
592 cout << "Files:" << endl;
593 cout << "files" << endl;
594 cout << endl;
595 */
596}
597
598int main(int argc, const char* argv[])
599{
600 Configuration conf(argv[0]);
601 conf.SetPrintUsage(PrintUsage);
602 Main::SetupConfiguration(conf);
603 SetupConfiguration(conf);
604
605 if (!conf.DoParse(argc, argv, PrintHelp))
606 return 127;
607
608 //try
609 {
610 // No console access at all
611 if (!conf.Has("console"))
612 {
613 if (conf.Get<bool>("no-dim"))
614 return RunShell<LocalStream, StateMachine, ConnectionWeather>(conf);
615 else
616 return RunShell<LocalStream, StateMachineDim, ConnectionDimWeather>(conf);
617 }
618 // Cosole access w/ and w/o Dim
619 if (conf.Get<bool>("no-dim"))
620 {
621 if (conf.Get<int>("console")==0)
622 return RunShell<LocalShell, StateMachine, ConnectionWeather>(conf);
623 else
624 return RunShell<LocalConsole, StateMachine, ConnectionWeather>(conf);
625 }
626 else
627 {
628 if (conf.Get<int>("console")==0)
629 return RunShell<LocalShell, StateMachineDim, ConnectionDimWeather>(conf);
630 else
631 return RunShell<LocalConsole, StateMachineDim, ConnectionDimWeather>(conf);
632 }
633 }
634 /*catch (std::exception& e)
635 {
636 cerr << "Exception: " << e.what() << endl;
637 return -1;
638 }*/
639
640 return 0;
641}
Note: See TracBrowser for help on using the repository browser.