source: trunk/FACT++/src/scheduler.cc@ 10560

Last change on this file since 10560 was 10530, checked in by Daniela Dorner, 14 years ago
added program for scheduler
File size: 15.3 KB
Line 
1#define BOOST_DISABLE_ASSERTS 1
2#include <boost/bind.hpp>
3#include <boost/thread.hpp>
4#include <boost/asio/deadline_timer.hpp>
5
6#include "Time.h"
7#include "Event.h"
8#include "Shell.h"
9#include "StateMachineDim.h"
10#include "Connection.h"
11#include "Configuration.h"
12#include "Timers.h"
13#include "Console.h"
14#include "Converter.h"
15
16#include "tools.h"
17
18#include <vector>
19
20namespace ba = boost::asio;
21namespace bs = boost::system;
22
23using ba::deadline_timer;
24using ba::ip::tcp;
25
26using namespace std;
27
28
29#include "LocalControl.h"
30#include <boost/date_time/posix_time/posix_time.hpp>
31#include <mysql++/mysql++.h>
32
33// string containing database information
34string database;
35
36
37// =========================================================================
38
39template <class T>
40class AutoScheduler : public T
41{
42 bool fNextIsPreview;
43public:
44 enum states_t
45 {
46 kSM_Scheduling=1,
47 kSM_Comitting,
48 };
49
50 struct ObservationParameters
51 {
52 int obskey;
53 boost::posix_time::ptime starttime;
54 boost::posix_time::ptime stoptime;
55 boost::posix_time::time_duration duration_db;
56 string sourcename;
57 };
58
59 struct FixedObs
60 {
61 int obskey_fixed;
62 int sourcekey_fixed;
63 string sourcename_fixed;
64 int obsmode_fixed;
65 boost::posix_time::ptime obsfixedstart;
66 boost::posix_time::ptime obsfixedstop;
67 };
68
69 struct StdObs
70 {
71 int obskey_std;
72 int sourcekey_std;
73 string sourcename_std;
74 int obsmode_std;
75 boost::posix_time::ptime obsstdstart;
76 boost::posix_time::ptime obsstdstop;
77 };
78
79 struct ScheduledObs
80 {
81 int sourcekey_obs;
82 string sourcename_obs;
83 int obsmode_obs;
84 boost::posix_time::ptime obsstart;
85 boost::posix_time::ptime obsstop;
86 };
87
88 struct ScheduledRun
89 {
90 int runnumber;
91 int runtype;
92 int sourcekey;
93 int obsmode;
94 boost::posix_time::ptime runstart;
95 boost::posix_time::ptime runstop;
96 };
97
98 int fSessionId;
99
100 int Schedule()
101 {
102 stringstream str;
103 str << "Scheduling started -> Preview (id=" << fSessionId << ")";
104 T::Message(str);
105
106 //static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))?");
107 static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))");
108 // 2: user
109 // 4: pass
110 // 5: server
111 // 7: port
112 // 9: db
113
114 //string database="root:Ihtp4aDB@localhost:3306/Scheduling";
115 boost::smatch what;
116 if (!boost::regex_match(database, what, expr, boost::match_extra))
117 {
118 cout << "Couldn't parse '" << database << "'." << endl;
119 throw;
120 }
121
122 if (what.size()!=10)
123 {
124 cout << "Error parsing '" << database << "'." << endl;
125 throw;
126 }
127
128 const string user = what[2];
129 const string passwd = what[4];
130 const string server = what[5];
131 const string db = what[9];
132 const int port = atoi(string(what[7]).c_str());
133
134 cout << "Connecting to '";
135 if (!user.empty())
136 cout << user << "@";
137 cout << server;
138 if (port)
139 cout << ":" << port;
140 if (!db.empty())
141 cout << "/" << db;
142 cout << "'" << endl;
143
144 mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port);
145 if (!conn.connected())
146 {
147 cout << "MySQL connection error: " << conn.error() << endl;
148 throw;
149 }
150 //mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port);
151 //mysqlpp::Connection conn("Scheduling", "localhost", "root", "Ihtp4aDB", 3306);
152 if (!conn.connected())
153 {
154 cout << "MySQL connection error: " << conn.error() << endl;
155 throw;
156 }
157
158 // get observation parameters from DB
159 mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fObservationModeKEY, fObservationTypeKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fObservationKEY");
160 //maybe order by priority?
161
162 mysqlpp::StoreQueryResult res = query.store();
163 if (!res)
164 {
165 cout << "MySQL query failed: " << query.error() << endl;
166 throw;
167 }
168
169 cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl;
170
171 ObservationParameters olist[res.num_rows()];
172 std::vector<FixedObs> obsfixedlist;
173 std::vector<StdObs> obsstdlist;
174 std::vector<ScheduledObs> obslist;
175 std::vector<ScheduledRun> runlist;
176
177 // loop over observation parameters from DB
178 // fill these parameters into FixedObs and StdObs
179 int counter=0;
180 int counter2=0;
181 int counter3=0;
182 for (vector<mysqlpp::Row>::iterator v=res.begin(); v<res.end(); v++)
183 {
184 cout << "Obskey: " << (*v)[0].c_str() << " source: " << (*v)[4].c_str() << "(" << (*v)[5].c_str() << ")" << flush;
185 cout << " T1 " << (*v)[1].c_str() << " T2: " << (*v)[2].c_str() << " (c " << counter << " " << counter2 << ")" << endl;
186
187 stringstream t1;
188 stringstream t2;
189 stringstream t3;
190 t1 << (*v)[1].c_str();
191 t2 << (*v)[2].c_str();
192 t3 << (*v)[3].c_str();
193
194 //boost::posix_time::time_duration mintime(0,conf.Get<int>("mintime"), 0);
195 boost::posix_time::time_duration mintime(1, 0, 0);
196 t1 >> Time::sql >> olist[counter].starttime;
197 t2 >> Time::sql >> olist[counter].stoptime;
198 t3 >> olist[counter].duration_db;
199 boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime);
200 olist[counter].sourcename=(*v)[4].c_str();
201 olist[counter].obskey=(*v)[0];
202
203 // time_duration cannot be used, as only up to 99 hours are handeled
204 boost::posix_time::time_duration duration = period.length();
205
206 /*
207 if (olist[counter].stoptime < olist[counter].starttime+mintime)
208 cout << " ====> WARN: Observation too short. " << endl;
209
210 if (olist[counter].starttime.is_not_a_date_time())
211 cout << " WARN: starttime not a date_time. " << endl;
212 else
213 cout << " start: " << Time::sql << olist[counter].starttime << endl;
214 if (olist[counter].stoptime.is_not_a_date_time())
215 cout << " WARN: stoptime not a date_time. " << endl;
216 else
217 cout << " stop: " << Time::sql << olist[counter].stoptime << endl;
218 if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time()))
219 cout << " diff: " << period << endl;
220 if (olist[counter].stoptime < olist[counter].starttime)
221 cout << " ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl;
222 cout << "diff: " << duration << flush;
223 cout << "dur_db: " << olist[counter].duration_db << endl;
224 */
225
226 // if start and stop time are available, it is a fixed observation
227 if (!(olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time()))
228 {
229 obsfixedlist.resize(counter2+1);
230 obsfixedlist[counter2].obsfixedstart=olist[counter].starttime;
231 obsfixedlist[counter2].obsfixedstop=olist[counter].stoptime;
232 obsfixedlist[counter2].sourcename_fixed=olist[counter].sourcename;
233 obsfixedlist[counter2].obskey_fixed=olist[counter].obskey;
234 counter2++;
235 }
236 else
237 {
238 obsstdlist.resize(counter3+1);
239 obsstdlist[counter3].sourcename_std=olist[counter].sourcename;
240 obsstdlist[counter3].obskey_std=olist[counter].obskey;
241 counter3++;
242 }
243
244 counter++;
245 }
246 cout << obsfixedlist.size() << " fixed observations found. " << endl;
247 cout << obsstdlist.size() << " standard observations found. " << endl;
248
249 // in this loop the fixed observations shall be
250 // checked, evaluated and added to the ScheduledObs list
251 struct vector<FixedObs>::iterator vobs;
252 cout << "Fixed Observations: " << endl;
253 for (vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++)
254 {
255 cout << " " << (*vobs).sourcename_fixed << " " << (*vobs).obsfixedstart << flush;
256 cout << " - " << (*vobs).obsfixedstop << endl;
257 }
258
259 // in this loop the standard observations shall be
260 // checked, evaluated
261 // the observation times shall be calculated
262 // and the observations added to the ScheduledObs list
263 struct vector<StdObs>::iterator vobs2;
264 cout << "Standard Observations: " << endl;
265 for (vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++)
266 {
267 cout << " " << (*vobs2).sourcename_std << endl;
268 }
269
270 // in this loop from the scheduled observations the list
271 // of scheduled runs shall be calculated
272 struct vector<ScheduledObs>::iterator vobs3;
273 for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++)
274 {
275 cout << (*vobs3).sourcename_obs << endl;
276 }
277
278 //usleep(3000000);
279 T::Message("Scheduling done.");
280
281 fSessionId = -1;
282
283 bool error = false;
284 return error ? T::kSM_Error : T::kSM_Ready;
285 }
286
287 /*
288 // commit probably done by webinterface
289 int Commit()
290 {
291 stringstream str;
292 str << "Comitting preview (id=" << fSessionId << ")";
293 T::Message(str);
294
295 usleep(3000000);
296 T::Message("Comitted.");
297
298 fSessionId = -1;
299
300 bool error = false;
301 return error ? T::kSM_Error : T::kSM_Ready;
302 }
303 */
304
305 AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1)
306 {
307 AddStateName(kSM_Scheduling, "Scheduling");
308 //AddStateName(kSM_Comitting, "Comitting");
309
310 AddTransition(kSM_Scheduling, "SCHEDULE", T::kSM_Ready);
311 //AddTransition(kSM_Comitting, "COMMIT", T::kSM_Ready);
312
313 T::PrintListOfEvents();
314 }
315
316 int Execute()
317 {
318 switch (T::GetCurrentState())
319 {
320 case kSM_Scheduling:
321 return Schedule();
322
323 //case kSM_Comitting:
324 // return Commit();
325 }
326 return T::GetCurrentState();
327 }
328
329 int Transition(const Event &evt)
330 {
331 switch (evt.GetTargetState())
332 {
333 case kSM_Scheduling:
334 //case kSM_Comitting:
335 //fSessionId = evt.GetInt();
336 break;
337 }
338
339 return evt.GetTargetState();
340 }
341 int Configure(const Event &)
342 {
343 return T::GetCurrentState();
344 }
345};
346
347// ------------------------------------------------------------------------
348
349template<class S>
350int RunDim(Configuration &conf)
351{
352 WindowLog wout;
353
354 //log.SetWindow(stdscr);
355 if (conf.Has("log"))
356 if (!wout.OpenLogFile(conf.Get<string>("log")))
357 wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
358
359 // Start io_service.Run to use the StateMachineImp::Run() loop
360 // Start io_service.run to only use the commandHandler command detaching
361 AutoScheduler<S> io_service(wout);
362 io_service.Run();
363
364 return 0;
365}
366
367template<class T, class S>
368int RunShell(Configuration &conf)
369{
370 static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
371
372 WindowLog &win = shell.GetStreamIn();
373 WindowLog &wout = shell.GetStreamOut();
374
375 if (conf.Has("log"))
376 if (!wout.OpenLogFile(conf.Get<string>("log")))
377 win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
378
379 AutoScheduler<S> io_service(wout);
380 shell.SetReceiver(io_service);
381
382 boost::thread t(boost::bind(&AutoScheduler<S>::Run, &io_service));
383
384 //io_service.SetReady();
385
386 shell.Run(); // Run the shell
387 io_service.Stop(); // Signal Loop-thread to stop
388 // io_service.Close(); // Obsolete, done by the destructor
389 // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
390
391 // Wait until the StateMachine has finished its thread
392 // before returning and destroying the dim objects which might
393 // still be in use.
394 t.join();
395
396 return 0;
397}
398
399void SetupConfiguration(Configuration &conf)
400{
401 const string n = conf.GetName()+".log";
402
403 //po::options_description config("Program options");
404 po::options_description config("Configuration");
405 config.add_options()
406 ("dns", var<string>("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
407 ("log,l", var<string>(n), "Write log-file")
408 ("no-dim,d", po_switch(), "Disable dim services")
409 ("console,c", var<int>(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
410 ("mintime", var<int>(), "minimum observation time")
411 ;
412
413 conf.AddEnv("dns", "DIM_DNS_NODE");
414
415 conf.AddOptions(config);
416 conf.AddOptionsDatabase(config);
417}
418
419int main(int argc, const char* argv[])
420{
421 Configuration conf(argv[0]);
422 SetupConfiguration(conf);
423
424 po::variables_map vm;
425 try
426 {
427 vm = conf.Parse(argc, argv);
428 }
429 catch (std::exception &e)
430 {
431#if BOOST_VERSION > 104000
432 po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
433 if (MO)
434 cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
435 else
436#endif
437 cout << "Error: " << e.what() << endl;
438 cout << endl;
439
440 return -1;
441 }
442
443 //if (conf.Has("mintime"))
444 // cout << "has mintime --------------------+" << conf.Get<int>("mintime")<< "+------------" << endl;
445
446 if (conf.Has("database"))
447 database = conf.Get<string>("database").c_str();
448 else
449 {
450 cout << "Please provide which database you want to use for scheduling." << endl;
451 return -1;
452 }
453
454
455 if (conf.HasHelp() || conf.HasPrint())
456 return -1;
457
458 // To allow overwriting of DIM_DNS_NODE set 0 to 1
459 setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
460
461 try
462 {
463 // No console access at all
464 if (!conf.Has("console"))
465 {
466 if (conf.Get<bool>("no-dim"))
467 return RunDim<StateMachine>(conf);
468 else
469 return RunDim<StateMachineDim>(conf);
470 }
471 // Cosole access w/ and w/o Dim
472 if (conf.Get<bool>("no-dim"))
473 {
474 if (conf.Get<int>("console")==0)
475 return RunShell<LocalShell, StateMachine>(conf);
476 else
477 return RunShell<LocalConsole, StateMachine>(conf);
478 }
479 else
480 {
481 if (conf.Get<int>("console")==0)
482 return RunShell<LocalShell, StateMachineDim>(conf);
483 else
484 return RunShell<LocalConsole, StateMachineDim>(conf);
485 }
486 }
487 catch (std::exception& e)
488 {
489 std::cerr << "Exception: " << e.what() << "\n";
490 }
491
492 return 0;
493}
Note: See TracBrowser for help on using the repository browser.