#include #include #if BOOST_VERSION < 104400 #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4)) #undef BOOST_HAS_RVALUE_REFS #endif #endif #include #include #include #include #include #include "Dim.h" #include "Time.h" #include "Event.h" #include "Shell.h" #include "StateMachineDim.h" #include "Connection.h" #include "Configuration.h" #include "Timers.h" #include "Console.h" #include "Converter.h" #include "LocalControl.h" #include "tools.h" namespace ba = boost::asio; namespace bs = boost::system; using ba::deadline_timer; using ba::ip::tcp; using namespace std; using namespace boost::gregorian; using namespace boost::posix_time; // things to be done/checked/changed // * do not use --database, but sth like --database-scheduler // to be independent of the configuration database // * check if the following include is really needed // #include // * string database not as global variable // pass reference to conf and the string as a member of the class // (see chat of 4.5.2011) // * move definition of config parameters to AutoScheduler class // + read in from config // other things to do // // define what to transmit as info/warn/error // config parameters: // mintime // runtimec // runtimep // repostime // missing: // // calculate time for std obs // calculate sun set/rise // // return errors and other otherput from sendcommand to webinterface // in which cases should the scheduler go in error state? // when db is unavailable // does one also need a 'set scheduler to ready' function then? // do we want any error state at all? // ========================================================================= template class AutoScheduler : public T { bool fNextIsPreview; public: enum states_t { kSM_Scheduling=1, kSM_Comitting, }; struct ObservationParameters { int obskey; int obsmode; int obstype; int splitflag; int telsetup; float fluxweight; float slope; float flux; float ra; float dec; boost::posix_time::ptime starttime; boost::posix_time::ptime stoptime; boost::posix_time::time_duration duration_db; string sourcename; int sourcekey; }; struct FixedObs { int obskey_fixed; int sourcekey_fixed; string sourcename_fixed; int obsmode_fixed; int obstype_fixed; int telsetup_fixed; float ra_fixed; float dec_fixed; boost::posix_time::ptime obsfixedstart; boost::posix_time::ptime obsfixedstop; }; // will need other types of obs // FloatingObs (duration < stop-start + splitflag no) // FloatingSplittedObs (duration < stop-start + splitflag yes) // FixedSlot, i.e. just block a time slot struct StdObs { int obskey_std; int sourcekey_std; string sourcename_std; int obsmode_std; int obstype_std; int telsetup_std; float fluxweight_std; float slope_std; float flux_std; float ra_std; float dec_std; boost::posix_time::ptime obsstdstart; boost::posix_time::ptime obsstdstop; }; struct ScheduledObs { int sourcekey_obs; string sourcename_obs; int obsmode_obs; int obstype_obs; int telsetup_obs; boost::posix_time::ptime obsstart; boost::posix_time::ptime obsstop; }; struct ScheduledRun { //int runnumber; // to be seen, if runnumber is needed int runtype; int sourcekey_run; string sourcename_run;//for convenience int obsmode_run; int obstype_run; int telsetup_run; boost::posix_time::ptime runstart; boost::posix_time::ptime runstop; }; int fSessionId; string fDatabase; string fDBName; int Schedule() { bool error = false; boost::posix_time::time_duration runtimec(0, 3, 0); boost::posix_time::time_duration runtimep(0, 3, 0); boost::posix_time::time_duration mintime(1, 0, 0); boost::posix_time::time_duration repostime(0, 5, 0); //boost::posix_time::ptime startsched=microsec_clock::local_time(); boost::posix_time::ptime startsched(microsec_clock::local_time()); boost::posix_time::ptime stopsched=startsched+years(1); cout << "Scheduling for the period from " << startsched << " to " << stopsched << endl; static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))"); // 2: user // 4: pass // 5: server // 7: port // 9: db boost::smatch what; if (!boost::regex_match(fDatabase, what, expr, boost::match_extra)) { cout << "Couldn't parse '" << fDatabase << "'." << endl; throw; } if (what.size()!=10) { cout << "Error parsing '" << fDatabase << "'." << endl; throw; } const string user = what[2]; const string passwd = what[4]; const string server = what[5]; string db = what[9]; if (fDBName.size()!=0) db = fDBName; const int port = atoi(string(what[7]).c_str()); ostringstream dbnamemsg; dbnamemsg << "Scheduling started -> using database " << fDBName << "."; T::Message(dbnamemsg); cout << "Connecting to '"; if (!user.empty()) cout << user << "@"; cout << server; if (port) cout << ":" << port; if (!db.empty()) cout << "/" << db; cout << "'" << endl; mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); if (!conn.connected()) { cout << "MySQL connection error: " << conn.error() << endl; throw; } // get observation parameters from DB // maybe order by priority? mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fSlope, fFlux, fRightAscension, fDeclination, fObservationModeKEY, fObservationTypeKEY , fTelescopeSetupKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fStartTime"); mysqlpp::StoreQueryResult res = query.store(); if (!res) { cout << "MySQL query failed: " << query.error() << endl; throw; } cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl; ObservationParameters olist[res.num_rows()]; std::vector obsfixedlist; std::vector obsstdlist; std::vector obslist; std::vector runlist; // loop over observation parameters from DB // fill these parameters into FixedObs and StdObs int counter=0; int counter2=0; int counter3=0; cout << "Obs: (, ) from to " << endl; for (vector::iterator v=res.begin(); v("mintime"), 0); t1 >> Time::sql >> olist[counter].starttime; t2 >> Time::sql >> olist[counter].stoptime; t3 >> olist[counter].duration_db; boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime); olist[counter].sourcename=(*v)[4].c_str(); olist[counter].sourcekey=(*v)[5]; if (!(*v)[0].is_null()) olist[counter].obskey=(*v)[0]; if (!(*v)[12].is_null()) olist[counter].obsmode=(*v)[12]; if (!(*v)[13].is_null()) olist[counter].obstype=(*v)[13]; if (!(*v)[14].is_null()) olist[counter].telsetup=(*v)[14]; if (!(*v)[6].is_null()) olist[counter].splitflag=(*v)[6]; if (!(*v)[7].is_null()) olist[counter].fluxweight=(*v)[7]; else olist[counter].fluxweight=0;//set fluxweight to 0 for check below if (!(*v)[8].is_null()) olist[counter].slope=(*v)[8]; if (!(*v)[9].is_null()) olist[counter].flux=(*v)[9]; if (!(*v)[10].is_null()) olist[counter].ra=(*v)[10]; if (!(*v)[11].is_null()) olist[counter].dec=(*v)[11]; // time_duration cannot be used, as only up to 99 hours are handeled boost::posix_time::time_duration duration = period.length(); /* if (olist[counter].stoptime < olist[counter].starttime+mintime) cout << " ====> WARN: Observation too short. " << endl; if (olist[counter].starttime.is_not_a_date_time()) cout << " WARN: starttime not a date_time. " << endl; else cout << " start: " << Time::sql << olist[counter].starttime << endl; if (olist[counter].stoptime.is_not_a_date_time()) cout << " WARN: stoptime not a date_time. " << endl; else cout << " stop: " << Time::sql << olist[counter].stoptime << endl; if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time())) cout << " diff: " << period << endl; if (olist[counter].stoptime < olist[counter].starttime) cout << " ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl; cout << "diff: " << duration << flush; cout << "dur_db: " << olist[counter].duration_db << endl; */ // always filled: obstype // // fixed observations: // filled: starttime, stoptime // not filled: fluxweight // maybe filled: obsmode, telsetup, source (not filled for FixedSlotObs) // maybe filled: duration (filled for FloatingObs and FloatingSplittedObs) // maybe filled: splitflag (filled for FloatingSplittedObs) // // std observations: // filled: fluxweight, telsetup, obsmore, source // not filled: starttime, stoptime, duration // fixed observations if (!(olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time()) && olist[counter].fluxweight==0 ) { obsfixedlist.resize(counter2+1); obsfixedlist[counter2].obsfixedstart=olist[counter].starttime; obsfixedlist[counter2].obsfixedstop=olist[counter].stoptime; obsfixedlist[counter2].sourcename_fixed=olist[counter].sourcename; obsfixedlist[counter2].obskey_fixed=olist[counter].obskey; obsfixedlist[counter2].obstype_fixed=olist[counter].obstype; obsfixedlist[counter2].obsmode_fixed=olist[counter].obsmode; obsfixedlist[counter2].telsetup_fixed=olist[counter].telsetup; obsfixedlist[counter2].sourcekey_fixed=olist[counter].sourcekey; obsfixedlist[counter2].ra_fixed=olist[counter].ra; obsfixedlist[counter2].dec_fixed=olist[counter].dec; counter2++; } // std obs if (olist[counter].stoptime.is_not_a_date_time() && olist[counter].starttime.is_not_a_date_time() && olist[counter].fluxweight>0 ) { obsstdlist.resize(counter3+1); obsstdlist[counter3].sourcename_std=olist[counter].sourcename; obsstdlist[counter3].obskey_std=olist[counter].obskey; obsstdlist[counter3].obsmode_std=olist[counter].obsmode; obsstdlist[counter3].obstype_std=olist[counter].obstype; obsstdlist[counter3].telsetup_std=olist[counter].telsetup; obsstdlist[counter3].sourcekey_std=olist[counter].sourcekey; obsstdlist[counter3].fluxweight_std=olist[counter].fluxweight; obsstdlist[counter3].flux_std=olist[counter].flux; obsstdlist[counter3].slope_std=olist[counter].slope; obsstdlist[counter3].ra_std=olist[counter].ra; obsstdlist[counter3].dec_std=olist[counter].dec; counter3++; } counter++; } ostringstream fixedobsmsg; fixedobsmsg << obsfixedlist.size() << " fixed observations found. "; T::Message(fixedobsmsg); cout << obsfixedlist.size() << " fixed observations found. " << endl; ostringstream stdobsmsg; stdobsmsg << obsstdlist.size() << " standard observations found. "; T::Message(stdobsmsg); cout << obsstdlist.size() << " standard observations found. " << endl; // loop to add the fixed observations to the ScheduledObs list // performed checks: // * overlap of fixed observations: the overlap is split half-half // * check for scheduling time range: only take into account fixed obs within the range // missing checks and evaluation // * check for mintime (pb with overlap checks) // * check for sun // * check for moon counter2=0; int skipcounter=0; boost::posix_time::ptime finalobsfixedstart; boost::posix_time::ptime finalobsfixedstop; boost::posix_time::time_duration delta1; boost::posix_time::time_duration delta2; boost::posix_time::time_duration delta0(0,0,0); struct vector::iterator vobs; struct vector::iterator vobs5; cout << "Fixed Observations: " << endl; for (vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++) { if (obsfixedlist[counter2].obsfixedstart < startsched || obsfixedlist[counter2].obsfixedstop > stopsched) { ostringstream skipfixedobsmsg; skipfixedobsmsg << "Skip 1 fixed observation (obskey " << obsfixedlist[counter2].obskey_fixed << ") as it is out of scheduling time range."; T::Message(skipfixedobsmsg); counter2++; skipcounter++; continue; } counter3=0; delta1=delta0; delta2=delta0; finalobsfixedstart=obsfixedlist[counter2].obsfixedstart; finalobsfixedstop=obsfixedlist[counter2].obsfixedstop; for (vobs5=obsfixedlist.begin(); vobs5!=obsfixedlist.end(); vobs5++) { if ((*vobs5).obsfixedstart < obsfixedlist[counter2].obsfixedstop && obsfixedlist[counter2].obsfixedstop <= (*vobs5).obsfixedstop && obsfixedlist[counter2].obsfixedstart <= (*vobs5).obsfixedstart && counter2!=counter3) { delta1=(obsfixedlist[counter2].obsfixedstop-(*vobs5).obsfixedstart)/2; finalobsfixedstop=obsfixedlist[counter2].obsfixedstop-delta1; ostringstream warndelta1; warndelta1 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The stoptime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; T::Warn(warndelta1); } if ((*vobs5).obsfixedstart <= obsfixedlist[counter2].obsfixedstart && obsfixedlist[counter2].obsfixedstart < (*vobs5).obsfixedstop && obsfixedlist[counter2].obsfixedstop >= (*vobs5).obsfixedstop && counter2!=counter3) { delta2=((*vobs5).obsfixedstop-obsfixedlist[counter2].obsfixedstart)/2; finalobsfixedstart=obsfixedlist[counter2].obsfixedstart+delta2; ostringstream warndelta2; warndelta2 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The starttime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; T::Warn(warndelta2); } counter3++; } int num=counter2-skipcounter; obslist.resize(num+1); obslist[num].obsstart=finalobsfixedstart; obslist[num].obsstop=finalobsfixedstop; obslist[num].sourcename_obs=obsfixedlist[counter2].sourcename_fixed; obslist[num].obsmode_obs=obsfixedlist[counter2].obsmode_fixed; obslist[num].obstype_obs=obsfixedlist[counter2].obstype_fixed; obslist[num].telsetup_obs=obsfixedlist[counter2].telsetup_fixed; obslist[num].sourcekey_obs=obsfixedlist[counter2].sourcekey_fixed; counter2++; cout << " " << (*vobs).sourcename_fixed << " " << (*vobs).obsfixedstart << flush; cout << " - " << (*vobs).obsfixedstop << endl; } ostringstream obsmsg; obsmsg << "Added " << obslist.size() << " fixed observations to ScheduledObs. "; T::Message(obsmsg); cout << "Added " << obslist.size() << " fixed observations to ScheduledObs. " << endl; for (int i=0; i<(int)obsstdlist.size(); i++) { for (int j=0; j<(int)obsstdlist.size(); j++) { if (obsstdlist[i].sourcekey_std == obsstdlist[j].sourcekey_std && i!=j) { cout << "One double sourcekey in std observations: " << obsstdlist[j].sourcekey_std << endl; ostringstream errdoublestd; errdoublestd << "One double sourcekey in std observations: " << obsstdlist[j].sourcekey_std << " (" << obsstdlist[j].sourcename_std << ")."; T::Error(errdoublestd); T::Message("Scheduling stopped."); return error ? T::kSM_Error : T::kSM_Ready; } } } // loop over nights // calculate sunset and sunrise // check if there is already scheduled obs in that night // // in this loop the standard observations shall be // checked, evaluated // the observation times shall be calculated // and the observations added to the ScheduledObs list struct vector::iterator vobs2; cout << "Standard Observations: " << endl; for (vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++) { cout << " " << (*vobs2).sourcename_std << endl; } // in this loop the ScheduledRuns are filled // (only data runs -> no runtype yet) // might be merged with next loop counter2=0; struct vector::iterator vobs3; for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) { runlist.resize(counter2+1); runlist[counter2].runstart=obslist[counter2].obsstart; runlist[counter2].runstop=obslist[counter2].obsstop; runlist[counter2].sourcename_run=obslist[counter2].sourcename_obs; runlist[counter2].obsmode_run=obslist[counter2].obsmode_obs; runlist[counter2].obstype_run=obslist[counter2].obstype_obs; runlist[counter2].telsetup_run=obslist[counter2].telsetup_obs; runlist[counter2].sourcekey_run=obslist[counter2].sourcekey_obs; counter2++; //cout << (*vobs3).sourcename_obs << endl; } //delete old scheduled runs from the DB mysqlpp::Query query0 = conn.query("DELETE FROM ScheduledRun"); mysqlpp::SimpleResult res0 = query0.execute(); if (!res0) { cout << "MySQL query failed: " << query0.error() << endl; throw; } // in this loop the ScheduledRuns are inserted to the DB // before the runtimes are adapted according to // duration of P-Run, C-Run and repositioning counter3=0; int insertcount=0; boost::posix_time::ptime finalstarttime; boost::posix_time::ptime finalstoptime; struct vector::iterator vobs4; for (vobs4=runlist.begin(); vobs4!=runlist.end(); vobs4++) { for (int i=2; i<5; i++) { switch(i) { case 2: finalstarttime=runlist[counter3].runstart+repostime+runtimec+runtimep; finalstoptime=runlist[counter3].runstop; break; case 3: finalstarttime=runlist[counter3].runstart+repostime; finalstoptime=runlist[counter3].runstart+runtimep+repostime; break; case 4: finalstarttime=runlist[counter3].runstart+runtimep+repostime; finalstoptime=runlist[counter3].runstart+repostime+runtimep+runtimec; break; } ostringstream q1; //cout << (*vobs4).sourcename_run << endl; q1 << "INSERT ScheduledRun set fStartTime='" << Time::sql << finalstarttime; q1 << "', fStopTime='" << Time::sql << finalstoptime; q1 << "', fSourceKEY='" << (*vobs4).sourcekey_run; q1 << "', fRunTypeKEY='" << i; q1 << "', fTelescopeSetupKEY='" << (*vobs4).telsetup_run; q1 << "', fObservationTypeKEY='" << (*vobs4).obstype_run; q1 << "', fObservationModeKEY='" << (*vobs4).obsmode_run; q1 << "'"; //cout << "executing query: " << q1.str() << endl; mysqlpp::Query query1 = conn.query(q1.str()); mysqlpp::SimpleResult res1 = query1.execute(); if (!res1) { cout << "MySQL query failed: " << query1.error() << endl; throw; } insertcount++; } counter3++; } ostringstream insertmsg; insertmsg << "Inserted " << insertcount << " runs into the DB."; T::Message(insertmsg); //usleep(3000000); T::Message("Scheduling done."); fSessionId = -1; //bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } /* // commit probably done by webinterface int Commit() { ostringstream str; str << "Comitting preview (id=" << fSessionId << ")"; T::Message(str); usleep(3000000); T::Message("Comitted."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } */ AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1), fDBName("") { AddStateName(kSM_Scheduling, "Scheduling"); //AddStateName(kSM_Comitting, "Comitting"); AddEvent(kSM_Scheduling, "SCHEDULE", T::kSM_Ready); //AddEvent(kSM_Comitting, "COMMIT", T::kSM_Ready); T::PrintListOfEvents(); } int Execute() { switch (T::GetCurrentState()) { case kSM_Scheduling: return Schedule(); //case kSM_Comitting: // return Commit(); } return T::GetCurrentState(); } int Transition(const Event &evt) { switch (evt.GetTargetState()) { case kSM_Scheduling: if (evt.GetSize()>0) fDBName = evt.GetString(); //case kSM_Comitting: //fSessionId = evt.GetInt(); break; } return evt.GetTargetState(); } int Configure(const Event &) { return T::GetCurrentState(); } bool SetConfiguration(const Configuration &conf) { fDatabase = conf.Get("schedule-database"); return true; } }; // ------------------------------------------------------------------------ void RunThread(StateMachineImp *io_service) { // This is necessary so that the StateMachien Thread can signal the // Readline to exit io_service->Run(); Readline::Stop(); } template int RunDim(Configuration &conf) { WindowLog wout; //log.SetWindow(stdscr); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) wout << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; // Start io_service.Run to use the StateMachineImp::Run() loop // Start io_service.run to only use the commandHandler command detaching AutoScheduler io_service(wout); if (!io_service.SetConfiguration(conf)) return -1; io_service.Run(); return 0; } template int RunShell(Configuration &conf) { static T shell(conf.GetName().c_str(), conf.Get("console")!=1); WindowLog &win = shell.GetStreamIn(); WindowLog &wout = shell.GetStreamOut(); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) win << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; AutoScheduler io_service(wout); if (!io_service.SetConfiguration(conf)) return -1; shell.SetReceiver(io_service); // boost::thread t(boost::bind(&AutoScheduler::Run, &io_service)); boost::thread t(boost::bind(RunThread, &io_service)); //io_service.SetReady(); shell.Run(); // Run the shell io_service.Stop(); // Signal Loop-thread to stop // io_service.Close(); // Obsolete, done by the destructor // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl; // Wait until the StateMachine has finished its thread // before returning and destroying the dim objects which might // still be in use. t.join(); return 0; } void SetupConfiguration(Configuration &conf) { const string n = conf.GetName()+".log"; //po::options_description config("Program options"); po::options_description config("Configuration"); config.add_options() ("dns", var("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)") ("log,l", var(n), "Write log-file") ("no-dim,d", po_switch(), "Disable dim services") ("console,c", var(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)") ("schedule-database", var() #if BOOST_VERSION >= 104200 ->required() #endif , "Database link as in\n\t[user:[password]@][server][:port][/database]\nOverwrites options from the default configuration file.") ("mintime", var(), "minimum observation time") ; po::positional_options_description p; p.add("schedule-database", 1); // The first positional options conf.AddEnv("dns", "DIM_DNS_NODE"); conf.AddOptions(config); conf.SetArgumentPositions(p); } void PrintUsage() { cout << "The scheduler... TEXT MISSING\n" "\n" "The default is that the program is started without user intercation. " "All actions are supposed to arrive as DimCommands. Using the -c " "option, a local shell can be initialized. With h or help a short " "help message about the usuage can be brought to the screen.\n" "\n" "Usage: scheduler [-c type] [OPTIONS] \n" " or: scheduler [OPTIONS] \n"; cout << endl; } void PrintHelp() { /* Additional help text which is printed after the configuration options goes here */ } int main(int argc, const char* argv[]) { Configuration conf(argv[0]); conf.SetPrintUsage(PrintUsage); SetupConfiguration(conf); po::variables_map vm; try { vm = conf.Parse(argc, argv); } catch (std::exception &e) { #if BOOST_VERSION > 104000 po::multiple_occurrences *MO = dynamic_cast(&e); if (MO) cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl; else #endif cout << "Error: " << e.what() << endl; cout << endl; return -1; } if (conf.HasVersion() || conf.HasPrint()) return -1; if (conf.HasHelp()) { PrintHelp(); return -1; } Dim::Setup(conf.Get("dns")); // try { // No console access at all if (!conf.Has("console")) { if (conf.Get("no-dim")) return RunDim(conf); else return RunDim(conf); } // Cosole access w/ and w/o Dim if (conf.Get("no-dim")) { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } else { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } } /* catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; }*/ return 0; }