#include #include #if BOOST_VERSION < 104400 #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4)) #undef BOOST_HAS_RVALUE_REFS #endif #endif #include #include #include #include "Dim.h" #include "Time.h" #include "Event.h" #include "Shell.h" #include "StateMachineDim.h" #include "Connection.h" #include "Configuration.h" #include "Timers.h" #include "Console.h" #include "Converter.h" #include "LocalControl.h" #include "tools.h" using namespace std; using namespace boost::gregorian; using namespace boost::posix_time; // things to be done/checked/changed // * do not use --database, but sth like --database-scheduler // to be independent of the configuration database // * move definition of config parameters to AutoScheduler class // + read in from config // other things to do // // define what to transmit as info/warn/error // config parameters: // mintime // runtimec // runtimep // repostime // missing: // // calculate time for std obs // calculate sun set/rise // // return errors and other otherput from sendcommand to webinterface // in which cases should the scheduler go in error state? // when db is unavailable // does one also need a 'set scheduler to ready' function then? // do we want any error state at all? // ========================================================================= template class AutoScheduler : public T { bool fNextIsPreview; public: enum states_t { kSM_Scheduling=1, kSM_Comitting, }; struct ObservationParameters { int obskey; int obsmode; int obstype; int splitflag; int telsetup; float fluxweight; float slope; float flux; float ra; float dec; ptime start; ptime stop; time_duration duration_db; string sourcename; int sourcekey; }; struct FixedObs { int obskey; int sourcekey; string sourcename; int obsmode; int obstype; int telsetup; float ra; float dec; ptime start; ptime stop; }; // will need other types of obs // FloatingObs (duration < stop-start + splitflag no) // FloatingSplittedObs (duration < stop-start + splitflag yes) // FixedSlot, i.e. just block a time slot struct StdObs { int obskey_std; int sourcekey_std; string sourcename_std; int obsmode_std; int obstype_std; int telsetup_std; float fluxweight_std; float slope_std; float flux_std; float ra_std; float dec_std; ptime obsstdstart; ptime obsstdstop; }; struct ScheduledObs { int sourcekey_obs; string sourcename_obs; int obsmode_obs; int obstype_obs; int telsetup_obs; ptime obsstart; ptime obsstop; }; struct ScheduledRun { //int runnumber; // to be seen, if runnumber is needed int runtype; int sourcekey_run; string sourcename_run;//for convenience int obsmode_run; int obstype_run; int telsetup_run; ptime runstart; ptime runstop; }; int fSessionId; string fDatabase; string fDBName; int Schedule() { bool error = false; time_duration runtimec(0, 3, 0); time_duration runtimep(0, 3, 0); time_duration mintime(1, 0, 0); time_duration repostime(0, 5, 0); //ptime startsched=microsec_clock::local_time(); const ptime startsched(microsec_clock::local_time()); const ptime stopsched=startsched+years(1); ostringstream str; str << "Scheduling for the period from " << startsched << " to " << stopsched; T::Message(str); static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))"); // 2: user // 4: pass // 5: server // 7: port // 9: db boost::smatch what; if (!boost::regex_match(fDatabase, what, expr, boost::match_extra)) { ostringstream msg; msg << "Regex to parse database '" << fDatabase << "' empty."; T::Error(msg); return T::kSM_Error; } if (what.size()!=10) { ostringstream msg; msg << "Parsing database name failed: '" << fDatabase << "'"; T::Error(msg); return T::kSM_Error; } cout << (string)what[9] << " | " << fDBName << " | " << endl; const string user = what[2]; const string passwd = what[4]; const string server = what[5]; const string db = fDBName.empty() ? what[9] : fDBName; const int port = atoi(string(what[7]).c_str()); ostringstream dbnamemsg; dbnamemsg << "Scheduling started -> using database " << fDBName << "."; T::Message(dbnamemsg); str.str(""); str << "Connecting to '"; if (!user.empty()) str << user << "@"; str << server; if (port) str << ":" << port; if (!db.empty()) str << "/" << db; str << "'"; T::Info(str); mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); /* throws exceptions if (!conn.connected()) { ostringstream msg; msg << "MySQL connection error: " << conn.error(); T::Error(msg); return T::kSM_Error; }*/ // get observation parameters from DB // maybe order by priority? const mysqlpp::StoreQueryResult res = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fSlope, fFlux, fRightAscension, fDeclination, fObservationModeKEY, fObservationTypeKEY , fTelescopeSetupKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fStartTime").store(); // FIXME: Maybe we have to check for a successfull // query but an empty result /* thorws exceptions? if (!res) { ostringstream msg; msg << "MySQL query failed: " << query.error(); T::Error(msg); return T::kSM_Error; }*/ str.str(""); str << "Found " << res.num_rows() << " Observation Parameter sets."; T::Debug(str); ObservationParameters olist[res.num_rows()]; vector obsfixedlist; vector obsstdlist; vector obslist; vector runlist; // loop over observation parameters from DB // fill these parameters into FixedObs and StdObs int counter=0; int counter2=0; int counter3=0; cout << "Obs: (, ) from to " << endl; for (vector::const_iterator v=res.begin(); v("mintime"), 0); t1 >> Time::sql >> olist[counter].start; t2 >> Time::sql >> olist[counter].stop; t3 >> olist[counter].duration_db; const time_period period(olist[counter].start, olist[counter].stop); olist[counter].sourcename=(*v)[4].c_str(); olist[counter].sourcekey=(*v)[5]; if (!(*v)[0].is_null()) olist[counter].obskey=(*v)[0]; if (!(*v)[12].is_null()) olist[counter].obsmode=(*v)[12]; if (!(*v)[13].is_null()) olist[counter].obstype=(*v)[13]; if (!(*v)[14].is_null()) olist[counter].telsetup=(*v)[14]; if (!(*v)[6].is_null()) olist[counter].splitflag=(*v)[6]; if (!(*v)[7].is_null()) olist[counter].fluxweight=(*v)[7]; else olist[counter].fluxweight=0;//set fluxweight to 0 for check below if (!(*v)[8].is_null()) olist[counter].slope=(*v)[8]; if (!(*v)[9].is_null()) olist[counter].flux=(*v)[9]; if (!(*v)[10].is_null()) olist[counter].ra=(*v)[10]; if (!(*v)[11].is_null()) olist[counter].dec=(*v)[11]; // time_duration cannot be used, as only up to 99 hours are handeled const time_duration duration = period.length(); /* if (olist[counter].stoptime < olist[counter].starttime+mintime) cout << " ====> WARN: Observation too short. " << endl; if (olist[counter].starttime.is_not_a_date_time()) cout << " WARN: starttime not a date_time. " << endl; else cout << " start: " << Time::sql << olist[counter].starttime << endl; if (olist[counter].stoptime.is_not_a_date_time()) cout << " WARN: stoptime not a date_time. " << endl; else cout << " stop: " << Time::sql << olist[counter].stoptime << endl; if (!(olist[counter].starttime.is_not_a_date_time() || olist[counter].stoptime.is_not_a_date_time())) cout << " diff: " << period << endl; if (olist[counter].stoptime < olist[counter].starttime) cout << " ====> WARN: stop time (" << olist[counter].stoptime << ") < start time (" << olist[counter].starttime << "). " << endl; cout << "diff: " << duration << flush; cout << "dur_db: " << olist[counter].duration_db << endl; */ // always filled: obstype // // fixed observations: // filled: starttime, stoptime // not filled: fluxweight // maybe filled: obsmode, telsetup, source (not filled for FixedSlotObs) // maybe filled: duration (filled for FloatingObs and FloatingSplittedObs) // maybe filled: splitflag (filled for FloatingSplittedObs) // // std observations: // filled: fluxweight, telsetup, obsmore, source // not filled: starttime, stoptime, duration // fixed observations if (!(olist[counter].stop.is_not_a_date_time() && olist[counter].start.is_not_a_date_time()) && olist[counter].fluxweight==0 ) { obsfixedlist.resize(counter2+1); obsfixedlist[counter2].start=olist[counter].start; obsfixedlist[counter2].stop=olist[counter].stop; obsfixedlist[counter2].sourcename=olist[counter].sourcename; obsfixedlist[counter2].obskey=olist[counter].obskey; obsfixedlist[counter2].obstype=olist[counter].obstype; obsfixedlist[counter2].obsmode=olist[counter].obsmode; obsfixedlist[counter2].telsetup=olist[counter].telsetup; obsfixedlist[counter2].sourcekey=olist[counter].sourcekey; obsfixedlist[counter2].ra=olist[counter].ra; obsfixedlist[counter2].dec=olist[counter].dec; counter2++; } // std obs if (olist[counter].stop.is_not_a_date_time() && olist[counter].start.is_not_a_date_time() && olist[counter].fluxweight>0 ) { obsstdlist.resize(counter3+1); obsstdlist[counter3].sourcename_std=olist[counter].sourcename; obsstdlist[counter3].obskey_std=olist[counter].obskey; obsstdlist[counter3].obsmode_std=olist[counter].obsmode; obsstdlist[counter3].obstype_std=olist[counter].obstype; obsstdlist[counter3].telsetup_std=olist[counter].telsetup; obsstdlist[counter3].sourcekey_std=olist[counter].sourcekey; obsstdlist[counter3].fluxweight_std=olist[counter].fluxweight; obsstdlist[counter3].flux_std=olist[counter].flux; obsstdlist[counter3].slope_std=olist[counter].slope; obsstdlist[counter3].ra_std=olist[counter].ra; obsstdlist[counter3].dec_std=olist[counter].dec; counter3++; } counter++; } ostringstream fixedobsmsg; fixedobsmsg << obsfixedlist.size() << " fixed observations found. "; T::Message(fixedobsmsg); cout << obsfixedlist.size() << " fixed observations found. " << endl; ostringstream stdobsmsg; stdobsmsg << obsstdlist.size() << " standard observations found. "; T::Message(stdobsmsg); cout << obsstdlist.size() << " standard observations found. " << endl; // loop to add the fixed observations to the ScheduledObs list // performed checks: // * overlap of fixed observations: the overlap is split half-half // * check for scheduling time range: only take into account fixed obs within the range // missing checks and evaluation // * check for mintime (pb with overlap checks) // * check for sun // * check for moon counter2=0; int skipcounter=0; ptime finalobsfixedstart; ptime finalobsfixedstop; time_duration delta0(0,0,0); cout << "Fixed Observations: " << endl; for (struct vector::const_iterator vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++) { if (obsfixedlist[counter2].start < startsched || obsfixedlist[counter2].stop > stopsched) { ostringstream skipfixedobsmsg; skipfixedobsmsg << "Skip 1 fixed observation (obskey "; skipfixedobsmsg << obsfixedlist[counter2].obskey; skipfixedobsmsg << ") as it is out of scheduling time range."; T::Message(skipfixedobsmsg); counter2++; skipcounter++; continue; } counter3=0; time_duration delta1=delta0; time_duration delta2=delta0; finalobsfixedstart=obsfixedlist[counter2].start; finalobsfixedstop=obsfixedlist[counter2].stop; for (struct vector::const_iterator vobs5=obsfixedlist.begin(); vobs5!=obsfixedlist.end(); vobs5++) { if (vobs5->start < obsfixedlist[counter2].stop && obsfixedlist[counter2].stop <= vobs5->stop && obsfixedlist[counter2].start <= vobs5->start && counter2!=counter3) { delta1=(obsfixedlist[counter2].stop-vobs5->start)/2; finalobsfixedstop=obsfixedlist[counter2].stop-delta1; ostringstream warndelta1; warndelta1 << "Overlap between two fixed observations ("; warndelta1 << obsfixedlist[counter2].obskey << " "; warndelta1 << vobs5->obskey << "). The stoptime of "; warndelta1 << obsfixedlist[counter2].obskey << " has been changed."; T::Warn(warndelta1); } if (vobs5->start <= obsfixedlist[counter2].start && obsfixedlist[counter2].start < vobs5->stop && obsfixedlist[counter2].stop >= vobs5->stop && counter2!=counter3) { delta2=(vobs5->stop-obsfixedlist[counter2].start)/2; finalobsfixedstart=obsfixedlist[counter2].start+delta2; ostringstream warndelta2; warndelta2 << "Overlap between two fixed observations ("; warndelta2 << obsfixedlist[counter2].obskey << " "; warndelta2 << vobs5->obskey << "). The starttime of "; warndelta2 << obsfixedlist[counter2].obskey << " has been changed."; T::Warn(warndelta2); } counter3++; } const int num=counter2-skipcounter; obslist.resize(num+1); obslist[num].obsstart=finalobsfixedstart; obslist[num].obsstop=finalobsfixedstop; obslist[num].sourcename_obs=obsfixedlist[counter2].sourcename; obslist[num].obsmode_obs=obsfixedlist[counter2].obsmode; obslist[num].obstype_obs=obsfixedlist[counter2].obstype; obslist[num].telsetup_obs=obsfixedlist[counter2].telsetup; obslist[num].sourcekey_obs=obsfixedlist[counter2].sourcekey; counter2++; cout << " " << vobs->sourcename << " " << vobs->start; cout << " - " << vobs->stop << endl; } ostringstream obsmsg; obsmsg << "Added " << obslist.size() << " fixed observations to ScheduledObs. "; T::Message(obsmsg); cout << "Added " << obslist.size() << " fixed observations to ScheduledObs. " << endl; for (int i=0; i<(int)obsstdlist.size(); i++) { for (int j=0; j<(int)obsstdlist.size(); j++) { if (obsstdlist[i].sourcekey_std == obsstdlist[j].sourcekey_std && i!=j) { cout << "One double sourcekey in std observations: " << obsstdlist[j].sourcekey_std << endl; ostringstream errdoublestd; errdoublestd << "One double sourcekey in std observations: " << obsstdlist[j].sourcekey_std << " (" << obsstdlist[j].sourcename_std << ")."; T::Error(errdoublestd); T::Message("Scheduling stopped."); return error ? T::kSM_Error : T::kSM_Ready; } } } // loop over nights // calculate sunset and sunrise // check if there is already scheduled obs in that night // // in this loop the standard observations shall be // checked, evaluated // the observation times shall be calculated // and the observations added to the ScheduledObs list cout << "Standard Observations: " << endl; for (struct vector::const_iterator vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++) { cout << " " << vobs2->sourcename_std << endl; } // in this loop the ScheduledRuns are filled // (only data runs -> no runtype yet) // might be merged with next loop counter2=0; for (struct vector::const_iterator vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) { runlist.resize(counter2+1); runlist[counter2].runstart=obslist[counter2].obsstart; runlist[counter2].runstop=obslist[counter2].obsstop; runlist[counter2].sourcename_run=obslist[counter2].sourcename_obs; runlist[counter2].obsmode_run=obslist[counter2].obsmode_obs; runlist[counter2].obstype_run=obslist[counter2].obstype_obs; runlist[counter2].telsetup_run=obslist[counter2].telsetup_obs; runlist[counter2].sourcekey_run=obslist[counter2].sourcekey_obs; counter2++; //cout << (*vobs3).sourcename_obs << endl; } //delete old scheduled runs from the DB const mysqlpp::SimpleResult res0 = conn.query("DELETE FROM ScheduledRun").execute(); // FIXME: Maybe we have to check for a successfull // query but an empty result /* throws exceptions if (!res0) { ostringstream msg; msg << "MySQL query failed: " << query0.error(); T::Error(msg); return T::kSM_Error; }*/ // in this loop the ScheduledRuns are inserted to the DB // before the runtimes are adapted according to // duration of P-Run, C-Run and repositioning counter3=0; int insertcount=0; ptime finalstarttime; ptime finalstoptime; for (struct vector::const_iterator vobs4=runlist.begin(); vobs4!=runlist.end(); vobs4++) { for (int i=2; i<5; i++) { switch(i) { case 2: finalstarttime=runlist[counter3].runstart+repostime+runtimec+runtimep; finalstoptime=runlist[counter3].runstop; break; case 3: finalstarttime=runlist[counter3].runstart+repostime; finalstoptime=runlist[counter3].runstart+runtimep+repostime; break; case 4: finalstarttime=runlist[counter3].runstart+runtimep+repostime; finalstoptime=runlist[counter3].runstart+repostime+runtimep+runtimec; break; } ostringstream q1; //cout << (*vobs4).sourcename_run << endl; q1 << "INSERT ScheduledRun set fStartTime='" << Time::sql << finalstarttime; q1 << "', fStopTime='" << Time::sql << finalstoptime; q1 << "', fSourceKEY='" << (*vobs4).sourcekey_run; q1 << "', fRunTypeKEY='" << i; q1 << "', fTelescopeSetupKEY='" << (*vobs4).telsetup_run; q1 << "', fObservationTypeKEY='" << (*vobs4).obstype_run; q1 << "', fObservationModeKEY='" << (*vobs4).obsmode_run; q1 << "'"; //cout << "executing query: " << q1.str() << endl; const mysqlpp::SimpleResult res1 = conn.query(q1.str()).execute(); // FIXME: Maybe we have to check for a successfull // query but an empty result /* throws exceptions if (!res1) { ostringstream msg; msg << "MySQL query failed: " << query1.error(); T::Error(str); return T::kSM_Error; }*/ insertcount++; } counter3++; } ostringstream insertmsg; insertmsg << "Inserted " << insertcount << " runs into the DB."; T::Message(insertmsg); //usleep(3000000); T::Message("Scheduling done."); fSessionId = -1; return error ? T::kSM_Error : T::kSM_Ready; } /* // commit probably done by webinterface int Commit() { ostringstream str; str << "Comitting preview (id=" << fSessionId << ")"; T::Message(str); usleep(3000000); T::Message("Comitted."); fSessionId = -1; bool error = false; return error ? T::kSM_Error : T::kSM_Ready; } */ AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1), fDBName("") { AddStateName(kSM_Scheduling, "Scheduling", "Scheduling in progress."); AddEvent(kSM_Scheduling, "SCHEDULE", "C", T::kSM_Ready) ("FIXME FIXME FIXME (explanation for the command)" "|database[string]:FIXME FIXME FIMXE (meaning and format)"); AddEvent(T::kSM_Ready, "RESET", T::kSM_Error) ("Reset command to get out of the error state"); //AddEvent(kSM_Comitting, "COMMIT", T::kSM_Ready); T::PrintListOfEvents(); } int Execute() { switch (T::GetCurrentState()) { case kSM_Scheduling: try { return Schedule(); } catch (const mysqlpp::Exception &e) { T::Error(string("MySQL: ")+e.what()); return T::kSM_Error; } // This does an autmatic reset (FOR TESTING ONLY) case T::kSM_Error: return T::kSM_Ready; //case kSM_Comitting: // return Commit(); } return T::GetCurrentState(); } int Transition(const Event &evt) { switch (evt.GetTargetState()) { case kSM_Scheduling: if (evt.GetSize()>0) fDBName = evt.GetString(); //case kSM_Comitting: //fSessionId = evt.GetInt(); break; } return evt.GetTargetState(); } int Configure(const Event &) { return T::GetCurrentState(); } bool SetConfiguration(const Configuration &conf) { fDatabase = conf.Get("schedule-database"); return true; } }; // ------------------------------------------------------------------------ void RunThread(StateMachineImp *io_service) { // This is necessary so that the StateMachien Thread can signal the // Readline to exit io_service->Run(); Readline::Stop(); } template int RunDim(Configuration &conf) { WindowLog wout; ReadlineColor::PrintBootMsg(wout, conf.GetName(), false); //log.SetWindow(stdscr); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) wout << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; // Start io_service.Run to use the StateMachineImp::Run() loop // Start io_service.run to only use the commandHandler command detaching AutoScheduler io_service(wout); if (!io_service.SetConfiguration(conf)) return -1; io_service.Run(); return 0; } template int RunShell(Configuration &conf) { static T shell(conf.GetName().c_str(), conf.Get("console")!=1); WindowLog &win = shell.GetStreamIn(); WindowLog &wout = shell.GetStreamOut(); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) win << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; AutoScheduler io_service(wout); if (!io_service.SetConfiguration(conf)) return -1; shell.SetReceiver(io_service); // boost::thread t(boost::bind(&AutoScheduler::Run, &io_service)); boost::thread t(boost::bind(RunThread, &io_service)); //io_service.SetReady(); shell.Run(); // Run the shell io_service.Stop(); // Signal Loop-thread to stop // io_service.Close(); // Obsolete, done by the destructor // wout << "join: " << t.timed_join(boost::posix_time::milliseconds(0)) << endl; // Wait until the StateMachine has finished its thread // before returning and destroying the dim objects which might // still be in use. t.join(); return 0; } void SetupConfiguration(Configuration &conf) { const string n = conf.GetName()+".log"; //po::options_description config("Program options"); po::options_description config("Configuration"); config.add_options() ("dns", var("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)") ("log,l", var(n), "Write log-file") ("no-dim,d", po_switch(), "Disable dim services") ("console,c", var(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)") ("schedule-database", var() #if BOOST_VERSION >= 104200 ->required() #endif , "Database link as in\n\t[user:[password]@][server][:port][/database]\nOverwrites options from the default configuration file.") ("mintime", var(), "minimum observation time") ; po::positional_options_description p; p.add("schedule-database", 1); // The first positional options conf.AddEnv("dns", "DIM_DNS_NODE"); conf.AddOptions(config); conf.SetArgumentPositions(p); } void PrintUsage() { cout << "The scheduler... TEXT MISSING\n" "\n" "The default is that the program is started without user intercation. " "All actions are supposed to arrive as DimCommands. Using the -c " "option, a local shell can be initialized. With h or help a short " "help message about the usuage can be brought to the screen.\n" "\n" "Usage: scheduler [-c type] [OPTIONS] \n" " or: scheduler [OPTIONS] \n"; cout << endl; } void PrintHelp() { /* Additional help text which is printed after the configuration options goes here */ } int main(int argc, const char* argv[]) { Configuration conf(argv[0]); conf.SetPrintUsage(PrintUsage); SetupConfiguration(conf); po::variables_map vm; try { vm = conf.Parse(argc, argv); } #if BOOST_VERSION > 104000 catch (po::multiple_occurrences &e) { cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl; return -1; } #endif catch (exception& e) { cerr << "Program options invalid due to: " << e.what() << endl; return -1; } if (conf.HasVersion() || conf.HasPrint()) return -1; if (conf.HasHelp()) { PrintHelp(); return -1; } Dim::Setup(conf.Get("dns")); // try { // No console access at all if (!conf.Has("console")) { if (conf.Get("no-dim")) return RunDim(conf); else return RunDim(conf); } // Cosole access w/ and w/o Dim if (conf.Get("no-dim")) { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } else { if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } } /* catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; }*/ return 0; }