source: trunk/FACT++/src/scheduler.cc @ 10530

Last change on this file since 10530 was 10530, checked in by dorner, 9 years ago
added program for scheduler
File size: 15.3 KB
Line 
1#define BOOST_DISABLE_ASSERTS 1
2#include <boost/bind.hpp>
3#include <boost/thread.hpp>
4#include <boost/asio/deadline_timer.hpp>
5
6#include "Time.h"
7#include "Event.h"
8#include "Shell.h"
9#include "StateMachineDim.h"
10#include "Connection.h"
11#include "Configuration.h"
12#include "Timers.h"
13#include "Console.h"
14#include "Converter.h"
15
16#include "tools.h"
17
18#include <vector>
19
20namespace ba    = boost::asio;
21namespace bs    = boost::system;
22
23using ba::deadline_timer;
24using ba::ip::tcp;
25
26using namespace std;
27
28
29#include "LocalControl.h"
30#include <boost/date_time/posix_time/posix_time.hpp>
31#include <mysql++/mysql++.h>
32
33// string containing database information
34string database;
35
36
37// =========================================================================
38
39template <class T>
40class AutoScheduler : public T
41{
42    bool fNextIsPreview;
43public:
44    enum states_t
45    {
46        kSM_Scheduling=1,
47        kSM_Comitting,
48    };
49
50    struct ObservationParameters
51    {
52        int obskey;
53        boost::posix_time::ptime starttime;
54        boost::posix_time::ptime stoptime;
55        boost::posix_time::time_duration duration_db;
56        string sourcename;
57    };
58
59    struct FixedObs
60    {
61        int obskey_fixed;
62        int sourcekey_fixed;
63        string sourcename_fixed;
64        int obsmode_fixed;
65        boost::posix_time::ptime obsfixedstart;
66        boost::posix_time::ptime obsfixedstop;
67    };
68
69    struct StdObs
70    {
71        int obskey_std;
72        int sourcekey_std;
73        string sourcename_std;
74        int obsmode_std;
75        boost::posix_time::ptime obsstdstart;
76        boost::posix_time::ptime obsstdstop;
77    };
78
79    struct ScheduledObs
80    {
81        int sourcekey_obs;
82        string sourcename_obs;
83        int obsmode_obs;
84        boost::posix_time::ptime obsstart;
85        boost::posix_time::ptime obsstop;
86    };
87
88    struct ScheduledRun
89    {
90        int runnumber;
91        int runtype;
92        int sourcekey;
93        int obsmode;
94        boost::posix_time::ptime runstart;
95        boost::posix_time::ptime runstop;
96    };
97
98    int fSessionId;
99
100    int Schedule()
101    {
102        stringstream str;
103        str << "Scheduling started -> Preview (id=" << fSessionId << ")";
104        T::Message(str);
105
106        //static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))?");
107        static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))");
108        // 2: user
109        // 4: pass
110        // 5: server
111        // 7: port
112        // 9: db
113
114        //string database="root:Ihtp4aDB@localhost:3306/Scheduling";
115        boost::smatch what;
116        if (!boost::regex_match(database, what, expr, boost::match_extra))
117        {
118            cout << "Couldn't parse '" << database << "'." << endl;
119            throw;
120        }
121
122        if (what.size()!=10)
123        {
124            cout << "Error parsing '" << database << "'." << endl;
125            throw;
126        }
127
128        const string user   = what[2];
129        const string passwd = what[4];
130        const string server = what[5];
131        const string db     = what[9];
132        const int port      = atoi(string(what[7]).c_str());
133
134        cout << "Connecting to '";
135        if (!user.empty())
136            cout << user << "@";
137        cout << server;
138        if (port)
139            cout << ":" << port;
140        if (!db.empty())
141            cout << "/" << db;
142        cout << "'" << endl;
143
144        mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port);
145        if (!conn.connected())
146        {
147            cout << "MySQL connection error: " << conn.error() << endl;
148            throw;
149        }
150        //mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port);
151        //mysqlpp::Connection conn("Scheduling", "localhost", "root", "Ihtp4aDB", 3306);
152        if (!conn.connected())
153        {
154            cout << "MySQL connection error: " << conn.error() << endl;
155            throw;
156        }
157
158        // get observation parameters from DB
159        mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fObservationModeKEY, fObservationTypeKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fObservationKEY");
160        //maybe order by priority?
161
162        mysqlpp::StoreQueryResult res = query.store();
163        if (!res)
164        {
165            cout << "MySQL query failed: " << query.error() << endl;
166            throw;
167        }
168
169        cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl;
170
171        ObservationParameters olist[res.num_rows()];
172        std::vector<FixedObs> obsfixedlist;
173        std::vector<StdObs> obsstdlist;
174        std::vector<ScheduledObs> obslist;
175        std::vector<ScheduledRun> runlist;
176
177        // loop over observation parameters from DB
178        // fill these parameters into FixedObs and StdObs
179        int counter=0;
180        int counter2=0;
181        int counter3=0;
182        for (vector<mysqlpp::Row>::iterator v=res.begin(); v<res.end(); v++)
183        {
184            cout << "Obskey: " << (*v)[0].c_str() << " source: " << (*v)[4].c_str() << "(" << (*v)[5].c_str() << ")" << flush;
185            cout << " T1 " << (*v)[1].c_str() << " T2: " << (*v)[2].c_str() << " (c " << counter << " " << counter2 << ")" << endl;
186
187            stringstream t1;
188            stringstream t2;
189            stringstream t3;
190            t1 << (*v)[1].c_str();
191            t2 << (*v)[2].c_str();
192            t3 << (*v)[3].c_str();
193
194            //boost::posix_time::time_duration mintime(0,conf.Get<int>("mintime"), 0);
195            boost::posix_time::time_duration mintime(1, 0, 0);
196            t1 >> Time::sql >> olist[counter].starttime;
197            t2 >> Time::sql >> olist[counter].stoptime;
198            t3 >> olist[counter].duration_db;
199            boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime);
200            olist[counter].sourcename=(*v)[4].c_str();
201            olist[counter].obskey=(*v)[0];
202
203            // time_duration cannot be used, as only up to 99 hours are handeled
204            boost::posix_time::time_duration duration = period.length();
205
206            /*
207            if (olist[counter].stoptime < olist[counter].starttime+mintime)
208                cout << "  ====> WARN: Observation too short. " << endl;
209
210            if (olist[counter].starttime.is_not_a_date_time())
211                cout << "  WARN: starttime not a date_time. " << endl;
212            else
213                cout << "  start:   " << Time::sql << olist[counter].starttime << endl;
214            if (olist[counter].stoptime.is_not_a_date_time())
215                cout << "  WARN: stoptime not a date_time. " << endl;
216            else
217                cout << "  stop:   " << Time::sql << olist[counter].stoptime << endl;
218            if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time()))
219                cout << "  diff:   " << period << endl;
220            if (olist[counter].stoptime < olist[counter].starttime)
221                cout << "  ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl;
222            cout << "diff:   " << duration << flush;
223            cout << "dur_db:   " << olist[counter].duration_db << endl;
224            */
225
226            // if start and stop time are available, it is a fixed observation
227            if (!(olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time()))
228            {
229                obsfixedlist.resize(counter2+1);
230                obsfixedlist[counter2].obsfixedstart=olist[counter].starttime;
231                obsfixedlist[counter2].obsfixedstop=olist[counter].stoptime;
232                obsfixedlist[counter2].sourcename_fixed=olist[counter].sourcename;
233                obsfixedlist[counter2].obskey_fixed=olist[counter].obskey;
234                counter2++;
235            }
236            else
237            {
238                obsstdlist.resize(counter3+1);
239                obsstdlist[counter3].sourcename_std=olist[counter].sourcename;
240                obsstdlist[counter3].obskey_std=olist[counter].obskey;
241                counter3++;
242            }
243
244            counter++;
245        }
246        cout << obsfixedlist.size() << " fixed observations found. " << endl;
247        cout << obsstdlist.size() << " standard observations found. " << endl;
248
249        // in this loop the fixed observations shall be
250        // checked, evaluated and added to the ScheduledObs list
251        struct vector<FixedObs>::iterator vobs;
252        cout << "Fixed Observations: " << endl;
253        for (vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++)
254        {
255            cout << "  " << (*vobs).sourcename_fixed <<  " " << (*vobs).obsfixedstart << flush;
256            cout << " - " << (*vobs).obsfixedstop << endl;
257        }
258
259        // in this loop the standard observations shall be
260        // checked, evaluated
261        // the observation times shall be calculated
262        // and the observations added to the ScheduledObs list
263        struct vector<StdObs>::iterator vobs2;
264        cout << "Standard Observations: " << endl;
265        for (vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++)
266        {
267            cout << "  " << (*vobs2).sourcename_std << endl;
268        }
269
270        // in this loop from the scheduled observations the list
271        // of scheduled runs shall be calculated
272        struct vector<ScheduledObs>::iterator vobs3;
273        for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++)
274        {
275            cout << (*vobs3).sourcename_obs << endl;
276        }
277
278        //usleep(3000000);
279        T::Message("Scheduling done.");
280
281        fSessionId = -1;
282
283        bool error = false;
284        return error ? T::kSM_Error : T::kSM_Ready;
285    }
286
287    /*
288    // commit probably done by webinterface
289    int Commit()
290    {
291        stringstream str;
292        str << "Comitting preview (id=" << fSessionId << ")";
293        T::Message(str);
294
295        usleep(3000000);
296        T::Message("Comitted.");
297
298        fSessionId = -1;
299
300        bool error = false;
301        return error ? T::kSM_Error : T::kSM_Ready;
302    }
303    */
304
305    AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1)
306    {
307        AddStateName(kSM_Scheduling,  "Scheduling");
308        //AddStateName(kSM_Comitting,   "Comitting");
309
310        AddTransition(kSM_Scheduling, "SCHEDULE", T::kSM_Ready);
311        //AddTransition(kSM_Comitting,  "COMMIT",   T::kSM_Ready);
312
313        T::PrintListOfEvents();
314    }
315
316    int Execute()
317    {
318        switch (T::GetCurrentState())
319        {
320        case kSM_Scheduling:
321            return Schedule();
322
323        //case kSM_Comitting:
324        //    return Commit();
325        }
326        return T::GetCurrentState();
327    }
328
329    int Transition(const Event &evt)
330    {
331        switch (evt.GetTargetState())
332        {
333        case kSM_Scheduling:
334        //case kSM_Comitting:
335            //fSessionId = evt.GetInt();
336            break;
337        }
338
339        return evt.GetTargetState();
340    }
341    int Configure(const Event &)
342    {
343        return T::GetCurrentState();
344    }
345};
346
347// ------------------------------------------------------------------------
348
349template<class S>
350int RunDim(Configuration &conf)
351{
352    WindowLog wout;
353
354    //log.SetWindow(stdscr);
355    if (conf.Has("log"))
356        if (!wout.OpenLogFile(conf.Get<string>("log")))
357            wout << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
358
359    // Start io_service.Run to use the StateMachineImp::Run() loop
360    // Start io_service.run to only use the commandHandler command detaching
361    AutoScheduler<S> io_service(wout);
362    io_service.Run();
363
364    return 0;
365}
366
367template<class T, class S>
368int RunShell(Configuration &conf)
369{
370    static T shell(conf.GetName().c_str(), conf.Get<int>("console")!=1);
371
372    WindowLog &win  = shell.GetStreamIn();
373    WindowLog &wout = shell.GetStreamOut();
374
375    if (conf.Has("log"))
376        if (!wout.OpenLogFile(conf.Get<string>("log")))
377            win << kRed << "ERROR - Couldn't open log-file " << conf.Get<string>("log") << ": " << strerror(errno) << endl;
378
379    AutoScheduler<S> io_service(wout);
380    shell.SetReceiver(io_service);
381
382    boost::thread t(boost::bind(&AutoScheduler<S>::Run, &io_service));
383
384    //io_service.SetReady();
385
386    shell.Run();                 // Run the shell
387    io_service.Stop();           // Signal Loop-thread to stop
388    // io_service.Close();       // Obsolete, done by the destructor
389    // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl;
390
391    // Wait until the StateMachine has finished its thread
392    // before returning and destroying the dim objects which might
393    // still be in use.
394    t.join();
395
396    return 0;
397}
398
399void SetupConfiguration(Configuration &conf)
400{
401    const string n = conf.GetName()+".log";
402
403    //po::options_description config("Program options");
404    po::options_description config("Configuration");
405    config.add_options()
406        ("dns",       var<string>("localhost"),  "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
407        ("log,l",     var<string>(n), "Write log-file")
408        ("no-dim,d",  po_switch(),    "Disable dim services")
409        ("console,c", var<int>(),     "Use console (0=shell, 1=simple buffered, X=simple unbuffered)")
410        ("mintime",   var<int>(),     "minimum observation time")
411        ;
412
413    conf.AddEnv("dns", "DIM_DNS_NODE");
414
415    conf.AddOptions(config);
416    conf.AddOptionsDatabase(config);
417}
418
419int main(int argc, const char* argv[])
420{
421    Configuration conf(argv[0]);
422    SetupConfiguration(conf);
423
424    po::variables_map vm;
425    try
426    {
427        vm = conf.Parse(argc, argv);
428    }
429    catch (std::exception &e)
430    {
431#if BOOST_VERSION > 104000
432        po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e);
433        if (MO)
434            cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl;
435        else
436#endif
437            cout << "Error: " << e.what() << endl;
438        cout << endl;
439
440        return -1;
441    }
442
443    //if (conf.Has("mintime"))
444    //    cout << "has mintime --------------------+" << conf.Get<int>("mintime")<< "+------------" << endl;
445
446    if (conf.Has("database"))
447        database = conf.Get<string>("database").c_str();
448    else
449    {
450        cout << "Please provide which database you want to use for scheduling." << endl;
451        return -1;
452    }
453
454
455    if (conf.HasHelp() || conf.HasPrint())
456        return -1;
457
458    // To allow overwriting of DIM_DNS_NODE set 0 to 1
459    setenv("DIM_DNS_NODE", conf.Get<string>("dns").c_str(), 1);
460
461    try
462    {
463        // No console access at all
464        if (!conf.Has("console"))
465        {
466            if (conf.Get<bool>("no-dim"))
467                return RunDim<StateMachine>(conf);
468            else
469                return RunDim<StateMachineDim>(conf);
470        }
471        // Cosole access w/ and w/o Dim
472        if (conf.Get<bool>("no-dim"))
473        {
474            if (conf.Get<int>("console")==0)
475                return RunShell<LocalShell, StateMachine>(conf);
476            else
477                return RunShell<LocalConsole, StateMachine>(conf);
478        }
479        else
480        {
481            if (conf.Get<int>("console")==0)
482                return RunShell<LocalShell, StateMachineDim>(conf);
483            else
484                return RunShell<LocalConsole, StateMachineDim>(conf);
485        }
486    }
487    catch (std::exception& e)
488    {
489        std::cerr << "Exception: " << e.what() << "\n";
490    }
491
492    return 0;
493}
Note: See TracBrowser for help on using the repository browser.