#define BOOST_DISABLE_ASSERTS 1 #include #include #include #include "Time.h" #include "Event.h" #include "Shell.h" #include "StateMachineDim.h" #include "Connection.h" #include "Configuration.h" #include "Timers.h" #include "Console.h" #include "Converter.h" #include "tools.h" #include namespace ba = boost::asio; namespace bs = boost::system; using ba::deadline_timer; using ba::ip::tcp; using namespace std; #include "LocalControl.h" #include #include // string containing database information string database; // ========================================================================= template class AutoScheduler : public T { bool fNextIsPreview; public: enum states_t { kSM_Scheduling=1, kSM_Comitting, }; struct ObservationParameters { int obskey; boost::posix_time::ptime starttime; boost::posix_time::ptime stoptime; boost::posix_time::time_duration duration_db; string sourcename; }; struct FixedObs { int obskey_fixed; int sourcekey_fixed; string sourcename_fixed; int obsmode_fixed; boost::posix_time::ptime obsfixedstart; boost::posix_time::ptime obsfixedstop; }; struct StdObs { int obskey_std; int sourcekey_std; string sourcename_std; int obsmode_std; boost::posix_time::ptime obsstdstart; boost::posix_time::ptime obsstdstop; }; struct ScheduledObs { int sourcekey_obs; string sourcename_obs; int obsmode_obs; boost::posix_time::ptime obsstart; boost::posix_time::ptime obsstop; }; struct ScheduledRun { int runnumber; int runtype; int sourcekey; int obsmode; boost::posix_time::ptime runstart; boost::posix_time::ptime runstop; }; int fSessionId; int Schedule() { stringstream str; str << "Scheduling started -> Preview (id=" << fSessionId << ")"; T::Message(str); //static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))?"); static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))"); // 2: user // 4: pass // 5: server // 7: port // 9: db //string database="root:Ihtp4aDB@localhost:3306/Scheduling"; boost::smatch what; if (!boost::regex_match(database, what, expr, boost::match_extra)) { cout << "Couldn't parse '" << database << "'." << endl; throw; } if (what.size()!=10) { cout << "Error parsing '" << database << "'." << endl; throw; } const string user = what[2]; const string passwd = what[4]; const string server = what[5]; const string db = what[9]; const int port = atoi(string(what[7]).c_str()); cout << "Connecting to '"; if (!user.empty()) cout << user << "@"; cout << server; if (port) cout << ":" << port; if (!db.empty()) cout << "/" << db; cout << "'" << endl; mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); if (!conn.connected()) { cout << "MySQL connection error: " << conn.error() << endl; throw; } //mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); //mysqlpp::Connection conn("Scheduling", "localhost", "root", "Ihtp4aDB", 3306); if (!conn.connected()) { cout << "MySQL connection error: " << conn.error() << endl; throw; } // get observation parameters from DB mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fObservationModeKEY, fObservationTypeKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fObservationKEY"); //maybe order by priority? mysqlpp::StoreQueryResult res = query.store(); if (!res) { cout << "MySQL query failed: " << query.error() << endl; throw; } cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl; ObservationParameters olist[res.num_rows()]; std::vector obsfixedlist; std::vector obsstdlist; std::vector obslist; std::vector runlist; // loop over observation parameters from DB // fill these parameters into FixedObs and StdObs int counter=0; int counter2=0; int counter3=0; for (vector::iterator v=res.begin(); v("mintime"), 0); boost::posix_time::time_duration mintime(1, 0, 0); t1 >> Time::sql >> olist[counter].starttime; t2 >> Time::sql >> olist[counter].stoptime; t3 >> olist[counter].duration_db; boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime); olist[counter].sourcename=(*v)[4].c_str(); olist[counter].obskey=(*v)[0]; // time_duration cannot be used, as only up to 99 hours are handeled boost::posix_time::time_duration duration = period.length(); /* if (olist[counter].stoptime < olist[counter].starttime+mintime) cout << " ====> WARN: Observation too short. " << endl; if (olist[counter].starttime.is_not_a_date_time()) cout << " WARN: starttime not a date_time. " << endl; else cout << " start: " << Time::sql << olist[counter].starttime << endl; if (olist[counter].stoptime.is_not_a_date_time()) cout << " WARN: stoptime not a date_time. " << endl; else cout << " stop: " << Time::sql << olist[counter].stoptime << endl; if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time())) cout << " diff: " << period << endl; if (olist[counter].stoptime < olist[counter].starttime) cout << " ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl; cout << "diff: " << duration << flush; cout << "dur_db: " << olist[counter].duration_db << endl; */ // if start and stop time are available, it is a fixed observation if (!(olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time())) { obsfixedlist.resize(counter2+1); obsfixedlist[counter2].obsfixedstart=olist[counter].starttime; obsfixedlist[counter2].obsfixedstop=olist[counter].stoptime; obsfixedlist[counter2].sourcename_fixed=olist[counter].sourcename; obsfixedlist[counter2].obskey_fixed=olist[counter].obskey; counter2++; } else { obsstdlist.resize(counter3+1); obsstdlist[counter3].sourcename_std=olist[counter].sourcename; obsstdlist[counter3].obskey_std=olist[counter].obskey; counter3++; } counter++; } cout << obsfixedlist.size() << " fixed observations found. " << endl; cout << obsstdlist.size() << " standard observations found. " << endl; // in this loop the fixed observations shall be // checked, evaluated and added to the ScheduledObs list struct vector::iterator vobs; cout << "Fixed Observations: " << endl; for (vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++) { cout << " " << (*vobs).sourcename_fixed << " " << (*vobs).obsfixedstart << flush; cout << " - " << (*vobs).obsfixedstop << endl; } // in this loop the standard observations shall be // checked, evaluated // the observation times shall be calculated // and the observations added to the ScheduledObs list struct vector::iterator vobs2; cout << "Standard Observations: " << endl; for (vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++) { cout << " " << (*vobs2).sourcename_std << endl; } // in this loop from the scheduled observations the list // of scheduled runs shall be calculated struct vector::iterator vobs3; for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) { cout << (*vobs3).sourcename_obs << endl; } //usleep(3000000); T::Message("Scheduling done."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } /* // commit probably done by webinterface int Commit() { stringstream str; str << "Comitting preview (id=" << fSessionId << ")"; T::Message(str); usleep(3000000); T::Message("Comitted."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } */ AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1) { AddStateName(kSM_Scheduling, "Scheduling"); //AddStateName(kSM_Comitting, "Comitting"); AddTransition(kSM_Scheduling, "SCHEDULE", T::kSM_Ready); //AddTransition(kSM_Comitting, "COMMIT", T::kSM_Ready); T::PrintListOfEvents(); } int Execute() { switch (T::GetCurrentState()) { case kSM_Scheduling: return Schedule(); //case kSM_Comitting: // return Commit(); } return T::GetCurrentState(); } int Transition(const Event &evt) { switch (evt.GetTargetState()) { case kSM_Scheduling: //case kSM_Comitting: //fSessionId = evt.GetInt(); break; } return evt.GetTargetState(); } int Configure(const Event &) { return T::GetCurrentState(); } }; // ------------------------------------------------------------------------ template int RunDim(Configuration &conf) { WindowLog wout; //log.SetWindow(stdscr); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) wout << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; // Start io_service.Run to use the StateMachineImp::Run() loop // Start io_service.run to only use the commandHandler command detaching AutoScheduler io_service(wout); io_service.Run(); return 0; } template int RunShell(Configuration &conf) { static T shell(conf.GetName().c_str(), conf.Get("console")!=1); WindowLog &win = shell.GetStreamIn(); WindowLog &wout = shell.GetStreamOut(); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) win << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; AutoScheduler io_service(wout); shell.SetReceiver(io_service); boost::thread t(boost::bind(&AutoScheduler::Run, &io_service)); //io_service.SetReady(); shell.Run(); // Run the shell io_service.Stop(); // Signal Loop-thread to stop // io_service.Close(); // Obsolete, done by the destructor // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl; // Wait until the StateMachine has finished its thread // before returning and destroying the dim objects which might // still be in use. t.join(); return 0; } void SetupConfiguration(Configuration &conf) { const string n = conf.GetName()+".log"; //po::options_description config("Program options"); po::options_description config("Configuration"); config.add_options() ("dns", var("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)") ("log,l", var(n), "Write log-file") ("no-dim,d", po_switch(), "Disable dim services") ("console,c", var(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)") ("mintime", var(), "minimum observation time") ; conf.AddEnv("dns", "DIM_DNS_NODE"); conf.AddOptions(config); conf.AddOptionsDatabase(config); } int main(int argc, const char* argv[]) { Configuration conf(argv[0]); SetupConfiguration(conf); po::variables_map vm; try { vm = conf.Parse(argc, argv); } catch (std::exception &e) { #if BOOST_VERSION > 104000 po::multiple_occurrences *MO = dynamic_cast(&e); if (MO) cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl; else #endif cout << "Error: " << e.what() << endl; cout << endl; return -1; } //if (conf.Has("mintime")) // cout << "has mintime --------------------+" << conf.Get("mintime")<< "+------------" << endl; if (conf.Has("database")) database = conf.Get("database").c_str(); else { cout << "Please provide which database you want to use for scheduling." << endl; return -1; } if (conf.HasHelp() || conf.HasPrint()) return -1; // To allow overwriting of DIM_DNS_NODE set 0 to 1 setenv("DIM_DNS_NODE", conf.Get("dns").c_str(), 1); try { // No console access at all if (!conf.Has("console")) { if (conf.Get("no-dim")) return RunDim(conf); else return RunDim(conf); } // Cosole access w/ and w/o Dim if (conf.Get("no-dim")) { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } else { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }