#define BOOST_DISABLE_ASSERTS 1 #include #include #include #include #include "Time.h" #include "Event.h" #include "Shell.h" #include "StateMachineDim.h" #include "Connection.h" #include "Configuration.h" #include "Timers.h" #include "Console.h" #include "Converter.h" #include "tools.h" #include namespace ba = boost::asio; namespace bs = boost::system; using ba::deadline_timer; using ba::ip::tcp; using namespace std; #include "LocalControl.h" #include #include // string containing database information string database; // things to be done/checked/changed // * do not use --database, but sth like --database-scheduler // to be independent of the configuration database // * check if the following include is really needed // #include // * string database not as global variable // pass reference to conf and the string as a member of the class // (see chat of 4.5.2011) // * move definition of config parameters to AutoScheduler class // + read in from config // other things to do // // define what is error and warning // config parameters // mintime // runtimec // runtimep // repostime // missing: // from / to for the time range for which the scheduling shall be done // // calculate time for std obs // calculate sun set/rise // // check for double std sources // // return errors // get input from sendcommand // ========================================================================= template class AutoScheduler : public T { bool fNextIsPreview; public: enum states_t { kSM_Scheduling=1, kSM_Comitting, }; struct ObservationParameters { int obskey; int obsmode; int obstype; int splitflag; int telsetup; float fluxweight; float slope; float flux; float ra; float dec; boost::posix_time::ptime starttime; boost::posix_time::ptime stoptime; boost::posix_time::time_duration duration_db; string sourcename; int sourcekey; }; struct FixedObs { int obskey_fixed; int sourcekey_fixed; string sourcename_fixed; int obsmode_fixed; int obstype_fixed; int telsetup_fixed; float ra_fixed; float dec_fixed; boost::posix_time::ptime obsfixedstart; boost::posix_time::ptime obsfixedstop; }; // will need other types of obs // FloatingObs (duration < stop-start + splitflag no) // FloatingSplittedObs (duration < stop-start + splitflag yes) // FixedSlot, i.e. just block a time slot struct StdObs { int obskey_std; int sourcekey_std; string sourcename_std; int obsmode_std; int obstype_std; int telsetup_std; float fluxweight_std; float slope_std; float flux_std; float ra_std; float dec_std; boost::posix_time::ptime obsstdstart; boost::posix_time::ptime obsstdstop; }; struct ScheduledObs { int sourcekey_obs; string sourcename_obs; int obsmode_obs; int obstype_obs; int telsetup_obs; boost::posix_time::ptime obsstart; boost::posix_time::ptime obsstop; }; struct ScheduledRun { //int runnumber; // to be seen, if runnumber is needed int runtype; int sourcekey_run; string sourcename_run;//for convenience int obsmode_run; int obstype_run; int telsetup_run; boost::posix_time::ptime runstart; boost::posix_time::ptime runstop; }; int fSessionId; int Schedule() { boost::posix_time::time_duration runtimec(0, 3, 0); boost::posix_time::time_duration runtimep(0, 3, 0); boost::posix_time::time_duration mintime(1, 0, 0); boost::posix_time::time_duration repostime(0, 5, 0); stringstream str; str << "Scheduling started -> Preview (id=" << fSessionId << ")"; T::Message(str); static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))"); // 2: user // 4: pass // 5: server // 7: port // 9: db boost::smatch what; if (!boost::regex_match(database, what, expr, boost::match_extra)) { cout << "Couldn't parse '" << database << "'." << endl; throw; } if (what.size()!=10) { cout << "Error parsing '" << database << "'." << endl; throw; } const string user = what[2]; const string passwd = what[4]; const string server = what[5]; const string db = what[9]; const int port = atoi(string(what[7]).c_str()); cout << "Connecting to '"; if (!user.empty()) cout << user << "@"; cout << server; if (port) cout << ":" << port; if (!db.empty()) cout << "/" << db; cout << "'" << endl; mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); if (!conn.connected()) { cout << "MySQL connection error: " << conn.error() << endl; throw; } // get observation parameters from DB // maybe order by priority? mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fSlope, fFlux, fRightAscension, fDeclination, fObservationModeKEY, fObservationTypeKEY , fTelescopeSetupKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fStartTime"); mysqlpp::StoreQueryResult res = query.store(); if (!res) { cout << "MySQL query failed: " << query.error() << endl; throw; } cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl; ObservationParameters olist[res.num_rows()]; std::vector obsfixedlist; std::vector obsstdlist; std::vector obslist; std::vector runlist; // loop over observation parameters from DB // fill these parameters into FixedObs and StdObs int counter=0; int counter2=0; int counter3=0; for (vector::iterator v=res.begin(); v("mintime"), 0); t1 >> Time::sql >> olist[counter].starttime; t2 >> Time::sql >> olist[counter].stoptime; t3 >> olist[counter].duration_db; boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime); olist[counter].sourcename=(*v)[4].c_str(); olist[counter].sourcekey=(*v)[5]; if (!(*v)[0].is_null()) olist[counter].obskey=(*v)[0]; //else // cout << "no obskey" << endl; if (!(*v)[12].is_null()) olist[counter].obsmode=(*v)[12]; //else // cout << "no obsmode" << endl; if (!(*v)[13].is_null()) olist[counter].obstype=(*v)[13]; //else // cout << "no obstype" << endl; if (!(*v)[14].is_null()) olist[counter].telsetup=(*v)[14]; //else // cout << "no telsetup" << endl; if (!(*v)[6].is_null()) olist[counter].splitflag=(*v)[6]; //else //{ // cout << "no splitflag" << endl; ////stringstream str; ////str << "no splitflag"; //T::Message("no splitflag"); //T::Error("---> no splitflag"); //T::Warn("+++ no splitflag"); //} if (!(*v)[7].is_null()) olist[counter].fluxweight=(*v)[7]; else { olist[counter].fluxweight=0; // cout << "no fluxweight" << endl; } if (!(*v)[8].is_null()) olist[counter].slope=(*v)[8]; //else // cout << "no slope" << endl; if (!(*v)[9].is_null()) olist[counter].flux=(*v)[9]; //else // cout << "no flux" << endl; if (!(*v)[10].is_null()) olist[counter].ra=(*v)[10]; //else // cout << "no ra" << endl; if (!(*v)[11].is_null()) olist[counter].dec=(*v)[11]; //else // cout << "no dec" << endl; // time_duration cannot be used, as only up to 99 hours are handeled boost::posix_time::time_duration duration = period.length(); /* if (olist[counter].stoptime < olist[counter].starttime+mintime) cout << " ====> WARN: Observation too short. " << endl; if (olist[counter].starttime.is_not_a_date_time()) cout << " WARN: starttime not a date_time. " << endl; else cout << " start: " << Time::sql << olist[counter].starttime << endl; if (olist[counter].stoptime.is_not_a_date_time()) cout << " WARN: stoptime not a date_time. " << endl; else cout << " stop: " << Time::sql << olist[counter].stoptime << endl; if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time())) cout << " diff: " << period << endl; if (olist[counter].stoptime < olist[counter].starttime) cout << " ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl; cout << "diff: " << duration << flush; cout << "dur_db: " << olist[counter].duration_db << endl; */ // always filled: obstype // // fixed observations: // filled: starttime, stoptime // not filled: fluxweight // maybe filled: obsmode, telsetup, source (not filled for FixedSlotObs) // maybe filled: duration (filled for FloatingObs and FloatingSplittedObs) // maybe filled: splitflag (filled for FloatingSplittedObs) // // std observations: // filled: fluxweight, telsetup, obsmore, source // not filled: starttime, stoptime, duration // fixed observations if (!(olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time()) && olist[counter].fluxweight==0 ) { obsfixedlist.resize(counter2+1); obsfixedlist[counter2].obsfixedstart=olist[counter].starttime; obsfixedlist[counter2].obsfixedstop=olist[counter].stoptime; obsfixedlist[counter2].sourcename_fixed=olist[counter].sourcename; obsfixedlist[counter2].obskey_fixed=olist[counter].obskey; obsfixedlist[counter2].obstype_fixed=olist[counter].obstype; obsfixedlist[counter2].obsmode_fixed=olist[counter].obsmode; obsfixedlist[counter2].telsetup_fixed=olist[counter].telsetup; obsfixedlist[counter2].sourcekey_fixed=olist[counter].sourcekey; obsfixedlist[counter2].ra_fixed=olist[counter].ra; obsfixedlist[counter2].dec_fixed=olist[counter].dec; counter2++; } // std obs if (olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time() && olist[counter].fluxweight>0 ) { obsstdlist.resize(counter3+1); obsstdlist[counter3].sourcename_std=olist[counter].sourcename; obsstdlist[counter3].obskey_std=olist[counter].obskey; obsstdlist[counter3].obsmode_std=olist[counter].obsmode; obsstdlist[counter3].obstype_std=olist[counter].obstype; obsstdlist[counter3].telsetup_std=olist[counter].telsetup; obsstdlist[counter3].sourcekey_std=olist[counter].sourcekey; obsstdlist[counter3].fluxweight_std=olist[counter].fluxweight; obsstdlist[counter3].flux_std=olist[counter].flux; obsstdlist[counter3].slope_std=olist[counter].slope; obsstdlist[counter3].ra_std=olist[counter].ra; obsstdlist[counter3].dec_std=olist[counter].dec; counter3++; } counter++; } cout << obsfixedlist.size() << " fixed observations found. " << endl; cout << obsstdlist.size() << " standard observations found. " << endl; // loop to add the fixed observations to the ScheduledObs list // missing: checks and evaluation // * check for sun // * check for moon counter2=0; boost::posix_time::ptime finalobsfixedstart; boost::posix_time::ptime finalobsfixedstop; boost::posix_time::time_duration delta1; boost::posix_time::time_duration delta2; boost::posix_time::time_duration delta0(0,0,0); struct vector::iterator vobs; struct vector::iterator vobs5; cout << "Fixed Observations: " << endl; for (vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++) { counter3=0; delta1=delta0; delta2=delta0; finalobsfixedstart=obsfixedlist[counter2].obsfixedstart; finalobsfixedstop=obsfixedlist[counter2].obsfixedstop; //cout << "final: " << finalobsfixedstart << " - " << finalobsfixedstop << endl; for (vobs5=obsfixedlist.begin(); vobs5!=obsfixedlist.end(); vobs5++) { //cout << "startc" << obsfixedlist[counter2].obsfixedstart << " -- " << flush; //cout << "stopc" << obsfixedlist[counter2].obsfixedstop << " -- " << flush; //cout << "start" << (*vobs5).obsfixedstart << " -- " << flush; //cout << "stop" << (*vobs5).obsfixedstop << " -- " << flush; //if (obsfixedlist[counter2].obsfixedstart < (*vobs5).obsfixedstop // && (*vobs5).obsfixedstop < obsfixedlist[counter2].obsfixedstop // && counter2!=counter3) if ((*vobs5).obsfixedstart < obsfixedlist[counter2].obsfixedstop && obsfixedlist[counter2].obsfixedstop <= (*vobs5).obsfixedstop && obsfixedlist[counter2].obsfixedstart <= (*vobs5).obsfixedstart && counter2!=counter3) { //delta1=((*vobs5).obsfixedstop-obsfixedlist[counter2].obsfixedstart)/2; delta1=(obsfixedlist[counter2].obsfixedstop-(*vobs5).obsfixedstart)/2; finalobsfixedstop=obsfixedlist[counter2].obsfixedstop-delta1; //cout << "delta1 " << delta1 << endl; stringstream warndelta1; warndelta1 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The stoptime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; T::Warn(warndelta1); } //if (obsfixedlist[counter2].obsfixedstart < (*vobs5).obsfixedstart // && (*vobs5).obsfixedstart < obsfixedlist[counter2].obsfixedstop // && counter2!=counter3) if ((*vobs5).obsfixedstart <= obsfixedlist[counter2].obsfixedstart && obsfixedlist[counter2].obsfixedstart < (*vobs5).obsfixedstop && obsfixedlist[counter2].obsfixedstop >= (*vobs5).obsfixedstop && counter2!=counter3) { delta2=((*vobs5).obsfixedstop-obsfixedlist[counter2].obsfixedstart)/2; finalobsfixedstart=obsfixedlist[counter2].obsfixedstart+delta2; //cout << "delta2 " << delta2 << endl; stringstream warndelta2; warndelta2 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The starttime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; T::Warn(warndelta2); } counter3++; //cout << endl; } //cout << "-> final: " << finalobsfixedstart << " - " << finalobsfixedstop << endl; obslist.resize(counter2+1); //obslist[counter2].obsstart=obsfixedlist[counter2].obsfixedstart; obslist[counter2].obsstart=finalobsfixedstart; //obslist[counter2].obsstop=obsfixedlist[counter2].obsfixedstop; obslist[counter2].obsstop=finalobsfixedstop; obslist[counter2].sourcename_obs=obsfixedlist[counter2].sourcename_fixed; obslist[counter2].obsmode_obs=obsfixedlist[counter2].obsmode_fixed; obslist[counter2].obstype_obs=obsfixedlist[counter2].obstype_fixed; obslist[counter2].telsetup_obs=obsfixedlist[counter2].telsetup_fixed; obslist[counter2].sourcekey_obs=obsfixedlist[counter2].sourcekey_fixed; counter2++; //cout << " " << (*vobs).sourcename_fixed << " " << (*vobs).obsfixedstart << flush; //cout << " - " << (*vobs).obsfixedstop << endl; } // in this loop the standard observations shall be // checked, evaluated // the observation times shall be calculated // and the observations added to the ScheduledObs list struct vector::iterator vobs2; cout << "Standard Observations: " << endl; for (vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++) { cout << " " << (*vobs2).sourcename_std << endl; } // in this loop from the scheduled observations the list // of scheduled runs shall be calculated counter2=0; struct vector::iterator vobs3; for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) { // add runtype and runtime calculations runlist.resize(counter2+1); runlist[counter2].runstart=obslist[counter2].obsstart; runlist[counter2].runstop=obslist[counter2].obsstop; runlist[counter2].sourcename_run=obslist[counter2].sourcename_obs; runlist[counter2].obsmode_run=obslist[counter2].obsmode_obs; runlist[counter2].obstype_run=obslist[counter2].obstype_obs; runlist[counter2].telsetup_run=obslist[counter2].telsetup_obs; runlist[counter2].sourcekey_run=obslist[counter2].sourcekey_obs; counter2++; cout << (*vobs3).sourcename_obs << endl; } //delete old scheduled runs from the DB mysqlpp::Query query0 = conn.query("DELETE FROM ScheduledRun"); mysqlpp::SimpleResult res0 = query0.execute(); if (!res0) { cout << "MySQL query failed: " << query0.error() << endl; throw; } counter3=0; boost::posix_time::ptime finalstarttime; boost::posix_time::ptime finalstoptime; struct vector::iterator vobs4; for (vobs4=runlist.begin(); vobs4!=runlist.end(); vobs4++) { for (int i=2; i<5; i++) { switch(i) { case 2: finalstarttime=runlist[counter3].runstart+repostime+runtimec+runtimep; finalstoptime=runlist[counter3].runstop; break; case 3: finalstarttime=runlist[counter3].runstart+repostime; finalstoptime=runlist[counter3].runstart+runtimep+repostime; break; case 4: finalstarttime=runlist[counter3].runstart+runtimep+repostime; finalstoptime=runlist[counter3].runstart+repostime+runtimep+runtimec; break; } stringstream q1; //cout << (*vobs4).sourcename_run << endl; q1 << "INSERT ScheduledRun set fStartTime='" << Time::sql << finalstarttime; q1 << "', fStopTime='" << Time::sql << finalstoptime; q1 << "', fSourceKEY='" << (*vobs4).sourcekey_run; q1 << "', fRunTypeKEY='" << i; q1 << "', fTelescopeSetupKEY='" << (*vobs4).telsetup_run; q1 << "', fObservationTypeKEY='" << (*vobs4).obstype_run; q1 << "', fObservationModeKEY='" << (*vobs4).obsmode_run; q1 << "'"; //cout << "executing query: " << q1.str() << endl; mysqlpp::Query query1 = conn.query(q1.str()); mysqlpp::SimpleResult res1 = query1.execute(); if (!res1) { cout << "MySQL query failed: " << query1.error() << endl; throw; } } counter3++; } //usleep(3000000); T::Message("Scheduling done."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } /* // commit probably done by webinterface int Commit() { stringstream str; str << "Comitting preview (id=" << fSessionId << ")"; T::Message(str); usleep(3000000); T::Message("Comitted."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } */ AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1) { AddStateName(kSM_Scheduling, "Scheduling"); //AddStateName(kSM_Comitting, "Comitting"); AddTransition(kSM_Scheduling, "SCHEDULE", T::kSM_Ready); //AddTransition(kSM_Comitting, "COMMIT", T::kSM_Ready); T::PrintListOfEvents(); } int Execute() { switch (T::GetCurrentState()) { case kSM_Scheduling: return Schedule(); //case kSM_Comitting: // return Commit(); } return T::GetCurrentState(); } int Transition(const Event &evt) { switch (evt.GetTargetState()) { case kSM_Scheduling: //case kSM_Comitting: //fSessionId = evt.GetInt(); break; } return evt.GetTargetState(); } int Configure(const Event &) { return T::GetCurrentState(); } }; // ------------------------------------------------------------------------ template int RunDim(Configuration &conf) { WindowLog wout; //log.SetWindow(stdscr); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) wout << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; // Start io_service.Run to use the StateMachineImp::Run() loop // Start io_service.run to only use the commandHandler command detaching AutoScheduler io_service(wout); io_service.Run(); return 0; } template int RunShell(Configuration &conf) { static T shell(conf.GetName().c_str(), conf.Get("console")!=1); WindowLog &win = shell.GetStreamIn(); WindowLog &wout = shell.GetStreamOut(); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) win << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; AutoScheduler io_service(wout); shell.SetReceiver(io_service); boost::thread t(boost::bind(&AutoScheduler::Run, &io_service)); //io_service.SetReady(); shell.Run(); // Run the shell io_service.Stop(); // Signal Loop-thread to stop // io_service.Close(); // Obsolete, done by the destructor // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl; // Wait until the StateMachine has finished its thread // before returning and destroying the dim objects which might // still be in use. t.join(); return 0; } void SetupConfiguration(Configuration &conf) { const string n = conf.GetName()+".log"; //po::options_description config("Program options"); po::options_description config("Configuration"); config.add_options() ("dns", var("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)") ("log,l", var(n), "Write log-file") ("no-dim,d", po_switch(), "Disable dim services") ("console,c", var(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)") ("mintime", var(), "minimum observation time") ; conf.AddEnv("dns", "DIM_DNS_NODE"); conf.AddOptions(config); conf.AddOptionsDatabase(config); } int main(int argc, const char* argv[]) { Configuration conf(argv[0]); SetupConfiguration(conf); po::variables_map vm; try { vm = conf.Parse(argc, argv); } catch (std::exception &e) { #if BOOST_VERSION > 104000 po::multiple_occurrences *MO = dynamic_cast(&e); if (MO) cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl; else #endif cout << "Error: " << e.what() << endl; cout << endl; return -1; } //if (conf.Has("mintime")) // cout << "has mintime --------------------+" << conf.Get("mintime")<< "+------------" << endl; if (conf.Has("database")) database = conf.Get("database").c_str(); else { cout << "Please provide which database you want to use for scheduling." << endl; return -1; } if (conf.HasHelp() || conf.HasPrint()) return -1; // To allow overwriting of DIM_DNS_NODE set 0 to 1 setenv("DIM_DNS_NODE", conf.Get("dns").c_str(), 1); try { // No console access at all if (!conf.Has("console")) { if (conf.Get("no-dim")) return RunDim(conf); else return RunDim(conf); } // Cosole access w/ and w/o Dim if (conf.Get("no-dim")) { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } else { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }