Changeset 11019
- Timestamp:
- 06/14/11 18:04:59 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/scheduler.cc
r10997 r11019 9 9 #include <boost/thread.hpp> 10 10 #include <boost/regex.hpp> 11 #include <boost/asio/deadline_timer.hpp>12 #include <boost/date_time/posix_time/posix_time.hpp>13 11 14 12 #include <mysql++/mysql++.h> … … 28 26 #include "tools.h" 29 27 30 namespace ba = boost::asio;31 namespace bs = boost::system;32 33 using ba::deadline_timer;34 using ba::ip::tcp;35 36 28 using namespace std; 37 29 using namespace boost::gregorian; … … 41 33 // * do not use --database, but sth like --database-scheduler 42 34 // to be independent of the configuration database 43 // * check if the following include is really needed44 // #include <boost/date_time/posix_time/posix_time.hpp>45 // * string database not as global variable46 // pass reference to conf and the string as a member of the class47 // (see chat of 4.5.2011)48 35 // * move definition of config parameters to AutoScheduler class 49 36 // + read in from config … … 98 85 float ra; 99 86 float dec; 100 boost::posix_time::ptime starttime;101 boost::posix_time::ptime stoptime;102 boost::posix_time::time_duration duration_db;87 ptime start; 88 ptime stop; 89 time_duration duration_db; 103 90 string sourcename; 104 91 int sourcekey; … … 107 94 struct FixedObs 108 95 { 109 int obskey _fixed;110 int sourcekey _fixed;111 string sourcename _fixed;112 int obsmode _fixed;113 int obstype _fixed;114 int telsetup _fixed;115 float ra _fixed;116 float dec _fixed;117 boost::posix_time::ptime obsfixedstart;118 boost::posix_time::ptime obsfixedstop;96 int obskey; 97 int sourcekey; 98 string sourcename; 99 int obsmode; 100 int obstype; 101 int telsetup; 102 float ra; 103 float dec; 104 ptime start; 105 ptime stop; 119 106 }; 120 107 … … 137 124 float ra_std; 138 125 float dec_std; 139 boost::posix_time::ptime obsstdstart;140 boost::posix_time::ptime obsstdstop;126 ptime obsstdstart; 127 ptime obsstdstop; 141 128 }; 142 129 … … 148 135 int obstype_obs; 149 136 int telsetup_obs; 150 boost::posix_time::ptime obsstart;151 boost::posix_time::ptime obsstop;137 ptime obsstart; 138 ptime obsstop; 152 139 }; 153 140 … … 161 148 int obstype_run; 162 149 int telsetup_run; 163 boost::posix_time::ptime runstart;164 boost::posix_time::ptime runstop;150 ptime runstart; 151 ptime runstop; 165 152 }; 166 153 … … 175 162 bool error = false; 176 163 177 boost::posix_time::time_duration runtimec(0, 3, 0); 178 boost::posix_time::time_duration runtimep(0, 3, 0); 179 boost::posix_time::time_duration mintime(1, 0, 0); 180 boost::posix_time::time_duration repostime(0, 5, 0); 181 //boost::posix_time::ptime startsched=microsec_clock::local_time(); 182 boost::posix_time::ptime startsched(microsec_clock::local_time()); 183 boost::posix_time::ptime stopsched=startsched+years(1); 184 cout << "Scheduling for the period from " << startsched << " to " << stopsched << endl; 164 time_duration runtimec(0, 3, 0); 165 time_duration runtimep(0, 3, 0); 166 time_duration mintime(1, 0, 0); 167 time_duration repostime(0, 5, 0); 168 //ptime startsched=microsec_clock::local_time(); 169 ptime startsched(microsec_clock::local_time()); 170 ptime stopsched=startsched+years(1); 171 172 ostringstream str; 173 str << "Scheduling for the period from " << startsched << " to " << stopsched; 174 T::Message(str); 185 175 186 176 static const boost::regex expr("(([[:word:].-]+)(:(.+))?@)?([[:word:].-]+)(:([[:digit:]]+))?(/([[:word:].-]+))"); … … 194 184 if (!boost::regex_match(fDatabase, what, expr, boost::match_extra)) 195 185 { 196 cout << "Couldn't parse '" << fDatabase << "'." << endl; 197 throw; 186 ostringstream msg; 187 msg << "Regex to parse database '" << fDatabase << "' empty."; 188 T::Error(msg); 189 return T::kSM_Error; 198 190 } 199 191 200 192 if (what.size()!=10) 201 193 { 202 cout << "Error parsing '" << fDatabase << "'." << endl; 203 throw; 194 ostringstream msg; 195 msg << "Parsing database name failed: '" << fDatabase << "'"; 196 T::Error(msg); 197 return T::kSM_Error; 204 198 } 205 199 … … 216 210 T::Message(dbnamemsg); 217 211 218 cout << "Connecting to '"; 212 str.str(""); 213 str << "Connecting to '"; 219 214 if (!user.empty()) 220 cout<< user << "@";221 cout<< server;215 str << user << "@"; 216 str << server; 222 217 if (port) 223 cout<< ":" << port;218 str << ":" << port; 224 219 if (!db.empty()) 225 cout << "/" << db; 226 cout << "'" << endl; 220 str << "/" << db; 221 str << "'"; 222 T::Info(str); 227 223 228 224 mysqlpp::Connection conn(db.c_str(), server.c_str(), user.c_str(), passwd.c_str(), port); 225 /* throws exceptions 229 226 if (!conn.connected()) 230 227 { 231 cout << "MySQL connection error: " << conn.error() << endl; 232 throw; 233 } 228 ostringstream msg; 229 msg << "MySQL connection error: " << conn.error(); 230 T::Error(msg); 231 return T::kSM_Error; 232 }*/ 234 233 235 234 // get observation parameters from DB 236 235 // maybe order by priority? 237 mysqlpp::Query query = conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fSlope, fFlux, fRightAscension, fDeclination, fObservationModeKEY, fObservationTypeKEY , fTelescopeSetupKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fStartTime"); 238 239 mysqlpp::StoreQueryResult res = query.store(); 236 const mysqlpp::StoreQueryResult res = 237 conn.query("SELECT fObservationKEY, fStartTime, fStopTime, fDuration, fSourceName, fSourceKEY, fSplitFlag, fFluxWeight, fSlope, fFlux, fRightAscension, fDeclination, fObservationModeKEY, fObservationTypeKEY , fTelescopeSetupKEY FROM ObservationParameters LEFT JOIN Source USING(fSourceKEY) ORDER BY fStartTime").store(); 238 // FIXME: Maybe we have to check for a successfull 239 // query but an empty result 240 /* thorws exceptions? 240 241 if (!res) 241 242 { 242 cout << "MySQL query failed: " << query.error() << endl; 243 throw; 244 } 245 246 cout << "Found " << res.num_rows() << " Observation Parameter sets." << endl; 243 ostringstream msg; 244 msg << "MySQL query failed: " << query.error(); 245 T::Error(msg); 246 return T::kSM_Error; 247 }*/ 248 249 str.str(""); 250 str << "Found " << res.num_rows() << " Observation Parameter sets."; 251 T::Debug(str); 247 252 248 253 ObservationParameters olist[res.num_rows()]; 249 std::vector<FixedObs> obsfixedlist;250 std::vector<StdObs> obsstdlist;251 std::vector<ScheduledObs> obslist;252 std::vector<ScheduledRun> runlist;254 vector<FixedObs> obsfixedlist; 255 vector<StdObs> obsstdlist; 256 vector<ScheduledObs> obslist; 257 vector<ScheduledRun> runlist; 253 258 254 259 // loop over observation parameters from DB … … 258 263 int counter3=0; 259 264 cout << "Obs: <obskey> <sourcename>(<sourcekey>, <fluxweight>) from <starttime> to <stoptime>" << endl; 260 for (vector<mysqlpp::Row>:: iterator v=res.begin(); v<res.end(); v++)265 for (vector<mysqlpp::Row>::const_iterator v=res.begin(); v<res.end(); v++) 261 266 { 262 267 cout << " Obs: " << (*v)[0].c_str() << " " << (*v)[4].c_str() << "(" << (*v)[5].c_str() << flush; … … 287 292 288 293 //boost::posix_time::time_duration mintime(0,conf.Get<int>("mintime"), 0); 289 t1 >> Time::sql >> olist[counter].start time;290 t2 >> Time::sql >> olist[counter].stop time;294 t1 >> Time::sql >> olist[counter].start; 295 t2 >> Time::sql >> olist[counter].stop; 291 296 t3 >> olist[counter].duration_db; 292 boost::posix_time::time_period period(olist[counter].starttime, olist[counter].stoptime); 297 const time_period period(olist[counter].start, olist[counter].stop); 298 293 299 olist[counter].sourcename=(*v)[4].c_str(); 294 300 olist[counter].sourcekey=(*v)[5]; … … 318 324 319 325 // time_duration cannot be used, as only up to 99 hours are handeled 320 boost::posix_time::time_duration duration = period.length();326 const time_duration duration = period.length(); 321 327 322 328 /* … … 354 360 355 361 // fixed observations 356 if (!(olist[counter].stop time.is_not_a_date_time()357 && olist[counter].start time.is_not_a_date_time())362 if (!(olist[counter].stop.is_not_a_date_time() 363 && olist[counter].start.is_not_a_date_time()) 358 364 && olist[counter].fluxweight==0 359 365 ) 360 366 { 361 367 obsfixedlist.resize(counter2+1); 362 obsfixedlist[counter2]. obsfixedstart=olist[counter].starttime;363 obsfixedlist[counter2]. obsfixedstop=olist[counter].stoptime;364 obsfixedlist[counter2].sourcename _fixed=olist[counter].sourcename;365 obsfixedlist[counter2].obskey _fixed=olist[counter].obskey;366 obsfixedlist[counter2].obstype _fixed=olist[counter].obstype;367 obsfixedlist[counter2].obsmode _fixed=olist[counter].obsmode;368 obsfixedlist[counter2].telsetup _fixed=olist[counter].telsetup;369 obsfixedlist[counter2].sourcekey _fixed=olist[counter].sourcekey;370 obsfixedlist[counter2].ra _fixed=olist[counter].ra;371 obsfixedlist[counter2].dec _fixed=olist[counter].dec;368 obsfixedlist[counter2].start=olist[counter].start; 369 obsfixedlist[counter2].stop=olist[counter].stop; 370 obsfixedlist[counter2].sourcename=olist[counter].sourcename; 371 obsfixedlist[counter2].obskey=olist[counter].obskey; 372 obsfixedlist[counter2].obstype=olist[counter].obstype; 373 obsfixedlist[counter2].obsmode=olist[counter].obsmode; 374 obsfixedlist[counter2].telsetup=olist[counter].telsetup; 375 obsfixedlist[counter2].sourcekey=olist[counter].sourcekey; 376 obsfixedlist[counter2].ra=olist[counter].ra; 377 obsfixedlist[counter2].dec=olist[counter].dec; 372 378 counter2++; 373 379 } 374 380 375 381 // std obs 376 if (olist[counter].stop time.is_not_a_date_time()377 && olist[counter].start time.is_not_a_date_time()382 if (olist[counter].stop.is_not_a_date_time() 383 && olist[counter].start.is_not_a_date_time() 378 384 && olist[counter].fluxweight>0 379 385 ) … … 416 422 counter2=0; 417 423 int skipcounter=0; 418 boost::posix_time::ptime finalobsfixedstart; 419 boost::posix_time::ptime finalobsfixedstop; 420 boost::posix_time::time_duration delta1; 421 boost::posix_time::time_duration delta2; 422 boost::posix_time::time_duration delta0(0,0,0); 423 struct vector<FixedObs>::iterator vobs; 424 struct vector<FixedObs>::iterator vobs5; 424 ptime finalobsfixedstart; 425 ptime finalobsfixedstop; 426 time_duration delta0(0,0,0); 427 425 428 cout << "Fixed Observations: " << endl; 426 for ( vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++)427 { 428 if (obsfixedlist[counter2]. obsfixedstart < startsched429 || obsfixedlist[counter2]. obsfixedstop > stopsched)429 for (struct vector<FixedObs>::const_iterator vobs=obsfixedlist.begin(); vobs!=obsfixedlist.end(); vobs++) 430 { 431 if (obsfixedlist[counter2].start < startsched 432 || obsfixedlist[counter2].stop > stopsched) 430 433 { 431 434 ostringstream skipfixedobsmsg; 432 skipfixedobsmsg << "Skip 1 fixed observation (obskey " << obsfixedlist[counter2].obskey_fixed << ") as it is out of scheduling time range."; 435 skipfixedobsmsg << "Skip 1 fixed observation (obskey "; 436 skipfixedobsmsg << obsfixedlist[counter2].obskey; 437 skipfixedobsmsg << ") as it is out of scheduling time range."; 433 438 T::Message(skipfixedobsmsg); 439 434 440 counter2++; 435 441 skipcounter++; … … 437 443 } 438 444 counter3=0; 439 delta1=delta0; 440 delta2=delta0; 441 finalobsfixedstart=obsfixedlist[counter2].obsfixedstart; 442 finalobsfixedstop=obsfixedlist[counter2].obsfixedstop; 443 444 for (vobs5=obsfixedlist.begin(); vobs5!=obsfixedlist.end(); vobs5++) 445 446 time_duration delta1=delta0; 447 time_duration delta2=delta0; 448 449 finalobsfixedstart=obsfixedlist[counter2].start; 450 finalobsfixedstop=obsfixedlist[counter2].stop; 451 452 for (struct vector<FixedObs>::const_iterator vobs5=obsfixedlist.begin(); vobs5!=obsfixedlist.end(); vobs5++) 445 453 { 446 if ( (*vobs5).obsfixedstart < obsfixedlist[counter2].obsfixedstop447 && obsfixedlist[counter2]. obsfixedstop <= (*vobs5).obsfixedstop448 && obsfixedlist[counter2]. obsfixedstart <= (*vobs5).obsfixedstart454 if (vobs5->start < obsfixedlist[counter2].stop 455 && obsfixedlist[counter2].stop <= vobs5->stop 456 && obsfixedlist[counter2].start <= vobs5->start 449 457 && counter2!=counter3) 450 458 { 451 delta1=(obsfixedlist[counter2].obsfixedstop-(*vobs5).obsfixedstart)/2; 452 finalobsfixedstop=obsfixedlist[counter2].obsfixedstop-delta1; 459 delta1=(obsfixedlist[counter2].stop-vobs5->start)/2; 460 finalobsfixedstop=obsfixedlist[counter2].stop-delta1; 461 453 462 ostringstream warndelta1; 454 warndelta1 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The stoptime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; 463 warndelta1 << "Overlap between two fixed observations ("; 464 warndelta1 << obsfixedlist[counter2].obskey << " "; 465 warndelta1 << vobs5->obskey << "). The stoptime of "; 466 warndelta1 << obsfixedlist[counter2].obskey << " has been changed."; 455 467 T::Warn(warndelta1); 456 468 } 457 if ( (*vobs5).obsfixedstart <= obsfixedlist[counter2].obsfixedstart458 && obsfixedlist[counter2]. obsfixedstart < (*vobs5).obsfixedstop459 && obsfixedlist[counter2]. obsfixedstop >= (*vobs5).obsfixedstop469 if (vobs5->start <= obsfixedlist[counter2].start 470 && obsfixedlist[counter2].start < vobs5->stop 471 && obsfixedlist[counter2].stop >= vobs5->stop 460 472 && counter2!=counter3) 461 473 { 462 delta2=((*vobs5).obsfixedstop-obsfixedlist[counter2].obsfixedstart)/2; 463 finalobsfixedstart=obsfixedlist[counter2].obsfixedstart+delta2; 474 delta2=(vobs5->stop-obsfixedlist[counter2].start)/2; 475 finalobsfixedstart=obsfixedlist[counter2].start+delta2; 476 464 477 ostringstream warndelta2; 465 warndelta2 << "Overlap between two fixed observations (" << obsfixedlist[counter2].obskey_fixed << " " << (*vobs5).obskey_fixed << "). The starttime of " << obsfixedlist[counter2].obskey_fixed << " has been changed."; 478 warndelta2 << "Overlap between two fixed observations ("; 479 warndelta2 << obsfixedlist[counter2].obskey << " "; 480 warndelta2 << vobs5->obskey << "). The starttime of "; 481 warndelta2 << obsfixedlist[counter2].obskey << " has been changed."; 482 466 483 T::Warn(warndelta2); 467 484 } … … 469 486 } 470 487 471 int num=counter2-skipcounter;488 const int num=counter2-skipcounter; 472 489 obslist.resize(num+1); 473 490 obslist[num].obsstart=finalobsfixedstart; 474 491 obslist[num].obsstop=finalobsfixedstop; 475 obslist[num].sourcename_obs=obsfixedlist[counter2].sourcename _fixed;476 obslist[num].obsmode_obs=obsfixedlist[counter2].obsmode _fixed;477 obslist[num].obstype_obs=obsfixedlist[counter2].obstype _fixed;478 obslist[num].telsetup_obs=obsfixedlist[counter2].telsetup _fixed;479 obslist[num].sourcekey_obs=obsfixedlist[counter2].sourcekey _fixed;492 obslist[num].sourcename_obs=obsfixedlist[counter2].sourcename; 493 obslist[num].obsmode_obs=obsfixedlist[counter2].obsmode; 494 obslist[num].obstype_obs=obsfixedlist[counter2].obstype; 495 obslist[num].telsetup_obs=obsfixedlist[counter2].telsetup; 496 obslist[num].sourcekey_obs=obsfixedlist[counter2].sourcekey; 480 497 counter2++; 481 cout << " " << (*vobs).sourcename_fixed << " " << (*vobs).obsfixedstart << flush; 482 cout << " - " << (*vobs).obsfixedstop << endl; 498 499 cout << " " << vobs->sourcename << " " << vobs->start; 500 cout << " - " << vobs->stop << endl; 483 501 } 484 502 ostringstream obsmsg; … … 512 530 // the observation times shall be calculated 513 531 // and the observations added to the ScheduledObs list 514 struct vector<StdObs>::iterator vobs2;515 532 cout << "Standard Observations: " << endl; 516 for ( vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++)517 { 518 cout << " " << (*vobs2).sourcename_std << endl;533 for (struct vector<StdObs>::const_iterator vobs2=obsstdlist.begin(); vobs2!=obsstdlist.end(); vobs2++) 534 { 535 cout << " " << vobs2->sourcename_std << endl; 519 536 } 520 537 … … 523 540 // might be merged with next loop 524 541 counter2=0; 525 struct vector<ScheduledObs>::iterator vobs3; 526 for (vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) 542 for (struct vector<ScheduledObs>::const_iterator vobs3=obslist.begin(); vobs3!=obslist.end(); vobs3++) 527 543 { 528 544 runlist.resize(counter2+1); … … 539 555 540 556 //delete old scheduled runs from the DB 541 mysqlpp::Query query0 = conn.query("DELETE FROM ScheduledRun"); 542 543 mysqlpp::SimpleResult res0 = query0.execute(); 557 const mysqlpp::SimpleResult res0 = 558 conn.query("DELETE FROM ScheduledRun").execute(); 559 // FIXME: Maybe we have to check for a successfull 560 // query but an empty result 561 /* throws exceptions 544 562 if (!res0) 545 563 { 546 cout << "MySQL query failed: " << query0.error() << endl; 547 throw; 548 } 564 ostringstream msg; 565 msg << "MySQL query failed: " << query0.error(); 566 T::Error(msg); 567 return T::kSM_Error; 568 }*/ 549 569 550 570 // in this loop the ScheduledRuns are inserted to the DB … … 553 573 counter3=0; 554 574 int insertcount=0; 555 boost::posix_time::ptime finalstarttime; 556 boost::posix_time::ptime finalstoptime; 557 struct vector<ScheduledRun>::iterator vobs4; 558 for (vobs4=runlist.begin(); vobs4!=runlist.end(); vobs4++) 575 ptime finalstarttime; 576 ptime finalstoptime; 577 for (struct vector<ScheduledRun>::const_iterator vobs4=runlist.begin(); vobs4!=runlist.end(); vobs4++) 559 578 { 560 579 for (int i=2; i<5; i++) … … 588 607 //cout << "executing query: " << q1.str() << endl; 589 608 590 mysqlpp::Query query1 = conn.query(q1.str()); 591 592 mysqlpp::SimpleResult res1 = query1.execute(); 609 const mysqlpp::SimpleResult res1 = conn.query(q1.str()).execute(); 610 // FIXME: Maybe we have to check for a successfull 611 // query but an empty result 612 /* throws exceptions 593 613 if (!res1) 594 614 { 595 cout << "MySQL query failed: " << query1.error() << endl; 596 throw; 597 } 615 ostringstream msg; 616 msg << "MySQL query failed: " << query1.error(); 617 T::Error(str); 618 return T::kSM_Error; 619 }*/ 598 620 insertcount++; 599 621 } … … 608 630 fSessionId = -1; 609 631 610 //bool error = false;611 632 return error ? T::kSM_Error : T::kSM_Ready; 612 633 } … … 632 653 AutoScheduler(ostream &out=cout) : T(out, "SCHEDULER"), fNextIsPreview(true), fSessionId(-1), fDBName("") 633 654 { 634 AddStateName(kSM_Scheduling, "Scheduling"); 635 //AddStateName(kSM_Comitting, "Comitting"); 636 637 AddEvent(kSM_Scheduling, "SCHEDULE", T::kSM_Ready); 655 AddStateName(kSM_Scheduling, "Scheduling", "Scheduling in progress."); 656 657 AddEvent(kSM_Scheduling, "SCHEDULE", "C", T::kSM_Ready) 658 ("FIXME FIXME FIXME (explanation for the command)" 659 "|database[string]:FIXME FIXME FIMXE (meaning and format)"); 660 661 AddEvent(T::kSM_Ready, "RESET", T::kSM_Error) 662 ("Reset command to get out of the error state"); 663 638 664 //AddEvent(kSM_Comitting, "COMMIT", T::kSM_Ready); 639 665 … … 646 672 { 647 673 case kSM_Scheduling: 648 return Schedule(); 674 try 675 { 676 return Schedule(); 677 } 678 catch (const mysqlpp::Exception &e) 679 { 680 T::Error(string("MySQL: ")+e.what()); 681 return T::kSM_Error; 682 } 683 684 // This does an autmatic reset (FOR TESTING ONLY) 685 case T::kSM_Error: 686 return T::kSM_Ready; 649 687 650 688 //case kSM_Comitting: … … 811 849 vm = conf.Parse(argc, argv); 812 850 } 813 catch (std::exception &e)814 {815 851 #if BOOST_VERSION > 104000 816 po::multiple_occurrences *MO = dynamic_cast<po::multiple_occurrences*>(&e); 817 if (MO) 818 cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl; 819 else 852 catch (po::multiple_occurrences &e) 853 { 854 cerr << "Program options invalid due to: " << e.what() << " of '" << e.get_option_name() << "'." << endl; 855 return -1; 856 } 820 857 #endif 821 cout << "Error: " << e.what() << endl;822 cout << endl;823 858 catch (exception& e) 859 { 860 cerr << "Program options invalid due to: " << e.what() << endl; 824 861 return -1; 825 862 }
Note:
See TracChangeset
for help on using the changeset viewer.