//**************************************************************** /** @class DataLogger @brief Logs all message and infos between the services This is the main logging class facility. It derives from StateMachineDim and DimInfoHandler. the first parent is here to enforce a state machine behaviour, while the second one is meant to make the dataLogger receive dim services to which it subscribed from. The possible states and transitions of the machine are: \dot digraph datalogger { node [shape=record, fontname=Helvetica, fontsize=10]; e [label="Error" color="red"]; r [label="Ready"] d [label="NightlyOpen"] w [label="WaitingRun"] l [label="Logging"] b [label="BadNightlyconfig" color="red"] c [label="BadRunConfig" color="red"] e -> r r -> e r -> d r -> b d -> w d -> r w -> r l -> r l -> w b -> d w -> c w -> l b -> r c -> r c -> l } \enddot */ //**************************************************************** #include "FACT.h" #include "Dim.h" #include "Event.h" #include "Time.h" #include "StateMachineDim.h" #include "WindowLog.h" #include "Configuration.h" #include "ServiceList.h" #include "Converter.h" #include "MessageImp.h" #include "LocalControl.h" #include "DimDescriptionService.h" #include "Description.h" #include "DimServiceInfoList.h" //for getting stat of opened files #include //for getting disk free space #include //for getting files sizes #include #include #include #if BOOST_VERSION < 104400 #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4)) #undef BOOST_HAS_RVALUE_REFS #endif #endif #include #ifdef HAVE_FITS #include "Fits.h" #endif //Dim structures struct DataLoggerStats { long sizeWritten; long freeSpace; long writingRate; }; struct NumSubAndFitsType { int numSubscriptions; int numOpenFits; }; struct OpenFileToDim { int code; char fileName[FILENAME_MAX]; }; class DataLogger : public StateMachineDim, DimInfoHandler { public: /// The list of existing states specific to the DataLogger enum { kSM_NightlyOpen = 20, ///< Nightly file openned and writing kSM_WaitingRun = 30, ///< waiting for the run number to open the run file kSM_Logging = 40, ///< both files openned and writing kSM_BadNightlyConfig = 0x101, ///< the folder specified for Nightly logging does not exist or has bad permissions kSM_BadRunConfig = 0x102, ///< the folder specified for the run logging does not exist or has wrong permissions or no run number } localstates_t; DataLogger(ostream &out); ~DataLogger(); private: //Define all the data structure specific to the DataLogger here /// ofstream for the NightlyLogfile ofstream fNightlyLogFile; /// ofstream for the run-specific Log file ofstream fRunLogFile; /// ofstream for the Nightly report file ofstream fNightlyReportFile; /// ofstream for the run-specific report file ofstream fRunReportFile; /// base path of the Nightlyfile string fNightlyFileName; ///base path of the run file string fRunFileName; ///run number (-1 means no run number specified) int fRunNumber; ///previous run number. to check if changed while logging int fPreviousRunNumber; ///Current Service Quality int fQuality; ///Modified Julian Date double fMjD; public: ///Define all the static names static const char* fConfigDay; static const char* fConfigRun; static const char* fConfigRunNumber; static const char* fConfigLog; static const char* fTransStart; static const char* fTransStop; static const char* fTransStartRun; static const char* fTransStopRun; static const char* fTransReset; static const char* fTransWait; static const char* fRunNumberInfo; ///< This is the name of the dimInfo received to specify the run number. It must be updated once the final name will be defined static const char* fPrintCommand; static const char* fDebugOnOff; static const char* fStatsPeriod; static const char* fStartStopOpenedFiles; static const char* fStartStopNumSubsAndFits; private: //overloading of DIM's infoHandler function void infoHandler(); ///for obtaining the name of the existing services ServiceList fServiceList; ///A std pair to store both the DimInfo pointer and the corresponding outputted fits file struct SubscriptionType { #ifdef HAVE_FITS ///Nightly FITS output file Fits nightlyFile; ///run-specific FITS output file Fits runFile; #endif ///the actual dimInfo pointer DimStampedInfo* dimInfo; ///the converter for outputting the data according to the format Converter* fConv; ///the number of existing handlers to this structure. ///This is required otherwise I MUST handle the deleting of dimInfo outside from the destructor int* numCopies; void operator = (const SubscriptionType& other) { #ifdef HAVE_FITS nightlyFile = other.nightlyFile; runFile = other.runFile; #endif dimInfo = other.dimInfo; numCopies = other.numCopies; fConv = other.fConv; (*numCopies)++; } SubscriptionType(const SubscriptionType& other) { #ifdef HAVE_FITS nightlyFile = other.nightlyFile; runFile = other.runFile; #endif dimInfo = other.dimInfo; numCopies = other.numCopies; fConv = other.fConv; (*numCopies)++; } SubscriptionType(DimStampedInfo* info) { dimInfo = info; fConv = NULL; numCopies = new int(1); } SubscriptionType() { dimInfo = NULL; fConv = NULL; numCopies = new int(1); } ~SubscriptionType() { if (numCopies) (*numCopies)--; if (numCopies) if (*numCopies < 1) { if (dimInfo) delete dimInfo; #ifdef HAVE_FITS if (nightlyFile.IsOpen()) nightlyFile.Close(); if (runFile.IsOpen()) runFile.Close(); #endif if (numCopies) delete numCopies; delete fConv; fConv = NULL; dimInfo = NULL; numCopies = NULL; } } }; typedef map > SubscriptionsListType; ///All the services to which we have subscribed to, sorted by server name. SubscriptionsListType fServiceSubscriptions; ///Reporting method for the services info received void ReportPlease(DimInfo* I, SubscriptionType& sub); ///Configuration of the nightly file path int ConfigureNightlyFileName(const Event& evt); ///Configuration fo the file name int ConfigureRunFileName(const Event& evt); ///DEPREC - configuration of the run number int ConfigureRunNumber(const Event& evt); ///logging method for the messages int LogMessagePlease(const Event& evt); ///print the current state of the dataLogger int PrintStatePlease(const Event& evt); ///checks whether or not the current info being treated is a run number void CheckForRunNumber(DimInfo* I); /// start transition int StartPlease(); ///from waiting to logging transition int StartRunPlease(); /// from logging to waiting transition int StopRunPlease(); ///stop and reset transition int GoToReadyPlease(); ///from NightlyOpen to waiting transition int NightlyToWaitRunPlease(); #ifdef HAVE_FITS ///Open fits files void OpenFITSFilesPlease(SubscriptionType& sub); ///Write data to FITS files void WriteToFITS(SubscriptionType& sub); ///Allocate the buffers required for fits void AllocateFITSBuffers(SubscriptionType& sub); ///FITS file for runs grouping. only one, hence dealt with in the dataLogger itself CCfits::FITS* fRunFitsFile; #endif//has_fits public: ///checks with fServiceList whether or not the services got updated bool CheckForServicesUpdate(); private: ///monitoring notification loop void ServicesMonitoring(); ///services notification thread boost::thread fMonitoringThread; ///end of the monitoring bool fContinueMonitoring; ///required for accurate monitoring map fFileSizesMap; string fFullNightlyLogFileName; string fFullNightlyReportFileName; string fFullRunLogFileName; string fFullRunReportFileName; long fBaseSizeNightly; long fPreviousSize; long fBaseSizeRun; ///Service for opened files DimDescribedService* fOpenedNightlyFiles; DimDescribedService* fOpenedRunFiles; DimDescribedService* fNumSubAndFits; NumSubAndFitsType fNumSubAndFitsData; inline void NotifyOpenedFile(string name, int type, DimDescribedService* service); public: bool SetConfiguration(Configuration& conf); private: set fBlackList; set fWhiteList; set fGrouping; bool fHasBlackList; bool fHasWhiteList; bool fDebugIsOn; float fStatsPeriodDuration; bool fOpenedFilesIsOn; bool fNumSubAndFitsIsOn; //functions for controlling the services behavior int SetDebugOnOff(const Event& evt); int SetStatsPeriod(const Event& evt); int SetOpenedFilesOnOff(const Event& evt); int SetNumSubsAndFitsOnOff(const Event& evt); ///boolean to prevent DIM update while desctructing the dataLogger bool fDestructing; ///Small function for calculating the total size written so far void calculateTotalSizeWritten(DataLoggerStats& statVar, bool& shouldWarn, bool isPrinting); ///vectors to keep track of opened Fits files, for grouping purposes. //This cannot be done otherwise, as services may disapear before the files are nicely closed. Hence which files were opened must be remembered. map > fOpenedRunFits; map > fOpenedNightlyFits; void CreateFitsGrouping(bool runGroup); }; //DataLogger void DataLogger::calculateTotalSizeWritten(DataLoggerStats& statVar, bool& shouldWarn, bool isPrinting) { #ifdef HAVE_FITS if (isPrinting) { ostringstream str; str.str(""); str << "There are " << fNumSubAndFitsData.numOpenFits << " FITS files open:"; Message(str.str()); } SubscriptionsListType::iterator x; map::iterator y; ///TODO the grouping file is dealt with several times. This should not be a problem but well, better to fix it I guess. for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++) { for (y=x->second.begin(); y != x->second.end(); y++) { if (y->second.runFile.IsOpen()) { fFileSizesMap[y->second.runFile.fFileName] = y->second.runFile.GetWrittenSize(); if (isPrinting) Message("-> "+y->second.runFile.fFileName); } if (y->second.nightlyFile.IsOpen()) { fFileSizesMap[y->second.nightlyFile.fFileName] = y->second.nightlyFile.GetWrittenSize(); if (isPrinting) Message("-> "+y->second.nightlyFile.fFileName); } } } #else if (isPrinting) Message("FITS output disabled at compilation"); #endif struct stat st; //gather log and report files sizes on disk if (fNightlyLogFile.is_open()) { stat(fFullNightlyLogFileName.c_str(), &st); fFileSizesMap[fFullNightlyLogFileName] = st.st_size; } if (fNightlyReportFile.is_open()) { stat(fFullNightlyReportFileName.c_str(), &st); fFileSizesMap[fFullNightlyReportFileName] = st.st_size; } if (fRunLogFile.is_open()) { stat(fFullRunLogFileName.c_str(), &st); fFileSizesMap[fFullRunLogFileName] = st.st_size; } if (fRunReportFile.is_open()) { stat(fFullRunReportFileName.c_str(), &st); fFileSizesMap[fFullRunReportFileName] = st.st_size; } struct statvfs vfs; if (!statvfs(fNightlyFileName.c_str(), &vfs)) { statVar.freeSpace = vfs.f_bsize*vfs.f_bavail; shouldWarn = false; } else { ostringstream str; str.str(""); str << "Unable to retrieve stats for " << fNightlyFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; if (!shouldWarn) Error(str); shouldWarn = true; statVar.freeSpace = -1; } //sum up all the file sizes. past and present statVar.sizeWritten = 0; for (map::iterator it=fFileSizesMap.begin(); it != fFileSizesMap.end(); it++) statVar.sizeWritten += it->second; statVar.sizeWritten -= fBaseSizeNightly; statVar.sizeWritten -= fBaseSizeRun; } //static members initialization //since I do not check the transition/config names any longer, indeed maybe these could be hard-coded... but who knows what will happen in the future ? const char* DataLogger::fConfigDay = "CONFIG_DAY"; const char* DataLogger::fConfigRun = "CONFIG_RUN"; const char* DataLogger::fConfigRunNumber = "CONFIG_RUN_NUMBER"; const char* DataLogger::fConfigLog = "LOG"; const char* DataLogger::fTransStart = "START"; const char* DataLogger::fTransStop = "STOP"; const char* DataLogger::fTransStartRun = "START_RUN"; const char* DataLogger::fTransStopRun = "STOP_RUN"; const char* DataLogger::fTransReset = "RESET"; const char* DataLogger::fTransWait = "WAIT_RUN_NUMBER"; const char* DataLogger::fRunNumberInfo = "RUN_NUMBER"; const char* DataLogger::fPrintCommand = "PRINT"; const char* DataLogger::fDebugOnOff = "DEBUG"; const char* DataLogger::fStatsPeriod = "STATS_PERIOD"; const char* DataLogger::fStartStopOpenedFiles = "OPENED_FILES_SRVC"; const char* DataLogger::fStartStopNumSubsAndFits = "NUM_SUBS_SRVC"; void DataLogger::ServicesMonitoring() { DataLoggerStats statVar; statVar.sizeWritten = 0; statVar.freeSpace = 0; statVar.writingRate = 0; struct statvfs vfs; if (!statvfs(fNightlyFileName.c_str(), &vfs)) statVar.freeSpace = vfs.f_bsize*vfs.f_bavail; else statVar.freeSpace = -1; DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", statVar, "Add description here"); fPreviousSize = 0; bool statWarning = false; //loop-wait for broadcast while (fContinueMonitoring) { if (fStatsPeriodDuration == 0.0f) { sleep(0.1f); continue; } else sleep(fStatsPeriodDuration); //update the fits files sizes calculateTotalSizeWritten(statVar, statWarning, false); if (fStatsPeriodDuration == 0.0f) continue; statVar.writingRate = (statVar.sizeWritten - fPreviousSize)/fStatsPeriodDuration; fPreviousSize = statVar.sizeWritten; if (statVar.writingRate != 0) //if data has been written { srvc.updateService(); if(fDebugIsOn) { ostringstream str; str << "Size written: " << statVar.sizeWritten/1024 << " KB; writting rate: "; str << statVar.writingRate/1024 << " KB/s; free space: "; str << statVar.freeSpace/(1024*1024) << " MB"; Debug(str.str()); } } } } // -------------------------------------------------------------------------- // //! Default constructor. The name of the machine is given DATA_LOGGER //! and the state is set to kSM_Ready at the end of the function. // //!Setup the allows states, configs and transitions for the data logger // DataLogger::DataLogger(ostream &out) : StateMachineDim(out, "DATA_LOGGER") { dic_disable_padding(); dis_disable_padding(); //initialize member data fNightlyFileName = "."; fRunFileName = "."; fRunNumber = -1; fPreviousRunNumber = fRunNumber; #ifdef HAVE_FITS fRunFitsFile = NULL; #endif //Give a name to this machine's specific states AddStateName(kSM_NightlyOpen, "NightlyFileOpen", "The summary files for the night are open."); AddStateName(kSM_WaitingRun, "WaitForRun", "The summary files for the night are open and we wait for a run to be started."); AddStateName(kSM_Logging, "Logging", "The summary files for the night and the files for a single run are open."); AddStateName(kSM_BadNightlyConfig, "ErrNightlyFolder", "The folder for the nighly summary files is invalid."); AddStateName(kSM_BadRunConfig, "ErrRunFolder", "The folder for the run files is invalid."); /*Add the possible transitions for this machine*/ AddEvent(kSM_NightlyOpen, fTransStart, kSM_Ready, kSM_BadNightlyConfig) (boost::bind(&DataLogger::StartPlease, this)) ("Start the nightly logging. Nightly file location must be specified already"); AddEvent(kSM_Ready, fTransStop, kSM_NightlyOpen, kSM_WaitingRun, kSM_Logging) (boost::bind(&DataLogger::GoToReadyPlease, this)) ("Stop all data logging, close all files."); AddEvent(kSM_Logging, fTransStartRun, kSM_WaitingRun, kSM_BadRunConfig) (boost::bind(&DataLogger::StartRunPlease, this)) ("Start the run logging. Run file location must be specified already."); AddEvent(kSM_WaitingRun, fTransStopRun, kSM_Logging) (boost::bind(&DataLogger::StopRunPlease, this)) ("Wait for a run to be started, open run-files as soon as a run number arrives."); AddEvent(kSM_Ready, fTransReset, kSM_Error, kSM_BadNightlyConfig, kSM_BadRunConfig, kSM_Error) (boost::bind(&DataLogger::GoToReadyPlease, this)) ("Transition to exit error states. Closes the nightly file if already opened."); AddEvent(kSM_WaitingRun, fTransWait, kSM_NightlyOpen) (boost::bind(&DataLogger::NightlyToWaitRunPlease, this)); /*Add the possible configurations for this machine*/ AddEvent(fConfigDay, "C", kSM_Ready, kSM_BadNightlyConfig) (boost::bind(&DataLogger::ConfigureNightlyFileName, this, _1)) ("Configure the folder for the nightly files." "|Path[string]:Absolute or relative path name where the nightly files should be stored."); AddEvent(fConfigRun, "C", kSM_Ready, kSM_BadNightlyConfig, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig) (boost::bind(&DataLogger::ConfigureRunFileName, this, _1)) ("Configure the folder for the run files." "|Path[string]:Absolute or relative path name where the run files should be stored."); AddEvent(fConfigRunNumber, "I", kSM_Ready, kSM_BadNightlyConfig, kSM_NightlyOpen, kSM_WaitingRun, kSM_BadRunConfig) (boost::bind(&DataLogger::ConfigureRunNumber, this, _1)) ("configure the run number. cannot be done in logging state"); //Provide a logging command //I get the feeling that I should be going through the EventImp //instead of DimCommand directly, mainly because the commandHandler //is already done in StateMachineImp.cc //Thus I'll simply add a configuration, which I will treat as the logging command AddEvent(fConfigLog, "C", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_BadRunConfig) (boost::bind(&DataLogger::LogMessagePlease, this, _1)) ("Log a single message to the log-files." "|Message[string]:Message to be logged."); //Provide a print command ostringstream str; str << kSM_Ready << " " << kSM_NightlyOpen << " " << kSM_WaitingRun << " " << kSM_Logging << " " << kSM_BadNightlyConfig; str << " " << kSM_BadRunConfig; AddEvent(fPrintCommand, str.str().c_str(), "") (boost::bind(&DataLogger::PrintStatePlease, this, _1)) ("Print information about the internal status of the data logger."); fServiceList.SetHandler(this); CheckForServicesUpdate(); //start the monitoring service fContinueMonitoring = true; fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this)); fBaseSizeNightly = 0; fBaseSizeRun = 0; OpenFileToDim fToDim; fToDim.code = 0; fToDim.fileName[0] = '\0'; fOpenedNightlyFiles = new DimDescribedService(GetName() + "/FILENAME_NIGHTLY", "I:1;C", fToDim, "Path and base name which is used to compile the filenames for the nightly files." "|Type[int]:type of open files (1=log, 2=rep, 4=fits)" "|Name[string]:path and base file name"); fOpenedRunFiles = new DimDescribedService(GetName() + "/FILENAME_RUN", "I:1;C", fToDim, "Path and base name which is used to compile the filenames for the run files." "|Type[int]:type of open files (1=log, 2=rep, 4=fits)" "|Name[string]:path and base file name"); fNumSubAndFitsData.numSubscriptions = 0; fNumSubAndFitsData.numOpenFits = 0; fNumSubAndFits = new DimDescribedService(GetName() + "/NUM_SUBS", "I:2", fNumSubAndFitsData, "Shows number of services to which the data logger is currently subscribed and the total number of open files." "|Subscriptions[int]:number of dim services to which the data logger is currently subscribed." "|NumOpenFiles[int]:number of files currently open by the data logger"); //black/white list fHasBlackList = false; fHasWhiteList = false; fBlackList.clear(); fWhiteList.clear(); //services parameters fDebugIsOn = false; fStatsPeriodDuration = 1.0f; fOpenedFilesIsOn = true; fNumSubAndFitsIsOn = true; //provide services control commands AddEvent(fDebugOnOff, "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready) (boost::bind(&DataLogger::SetDebugOnOff, this, _1)) ("Switch debug mode on off. Debug mode prints ifnormation about every service written to a file." "|Enable[bool]:Enable of disable debuig mode (yes/no)."); AddEvent(fStatsPeriod, "F", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready) (boost::bind(&DataLogger::SetStatsPeriod, this, _1)) ("Interval in which the data-logger statitistics service (STATS) is updated." "Interval[s]:Floating point value in seconds."); AddEvent(fStartStopOpenedFiles, "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready) (boost::bind(&DataLogger::SetOpenedFilesOnOff ,this, _1)) ("Can be used to switch the service off which distributes information about the open files."); AddEvent(fStartStopNumSubsAndFits, "B:1", kSM_NightlyOpen, kSM_Logging, kSM_WaitingRun, kSM_Ready) (boost::bind(&DataLogger::SetNumSubsAndFitsOnOff, this, _1)) ("Can be used to switch the service off which distributes information about the number of subscriptions and open files."); fDestructing = false; if(fDebugIsOn) { Debug("DataLogger Init Done."); } } // -------------------------------------------------------------------------- // //! Checks for changes in the existing services. //! Any new service will be added to the service list, while the ones which disappeared are removed. // //FIXME The service must be udpated so that I get the first notification. This should not be bool DataLogger::CheckForServicesUpdate() { bool serviceUpdated = false; //get the current server list const vector serverList = fServiceList.GetServerList(); //first let's remove the servers that may have disapeared //can't treat the erase on maps the same way as for vectors. Do it the safe way instead vector toBeDeleted; for (SubscriptionsListType::iterator cListe = fServiceSubscriptions.begin(); cListe != fServiceSubscriptions.end(); cListe++) { vector::const_iterator givenServers; for (givenServers=serverList.begin(); givenServers!= serverList.end(); givenServers++) if (cListe->first == *givenServers) break; if (givenServers == serverList.end())//server vanished. Remove it { toBeDeleted.push_back(cListe->first); serviceUpdated = true; } } for (vector::const_iterator it = toBeDeleted.begin(); it != toBeDeleted.end(); it++) fServiceSubscriptions.erase(*it); //now crawl through the list of servers, and see if there was some updates for (vector::const_iterator i=serverList.begin(); i!=serverList.end();i++) { //skip the two de-fact excluded services //Dim crashes if the publisher subscribes to its own service. This sounds weird, I agree. if ((i->find("DIS_DNS") != string::npos) || (i->find("DATA_LOGGER") != string::npos)) continue; //find the current server in our subscription list SubscriptionsListType::iterator cSubs = fServiceSubscriptions.find(*i); //get the service list of the current server vector cServicesList = fServiceList.GetServiceList(*i); if (cSubs != fServiceSubscriptions.end())//if the current server already is in our subscriptions { //then check and update our list of subscriptions //first, remove the services that may have dissapeared. map::iterator serverSubs; vector::const_iterator givenSubs; toBeDeleted.clear(); for (serverSubs=cSubs->second.begin(); serverSubs != cSubs->second.end(); serverSubs++) { for (givenSubs = cServicesList.begin(); givenSubs != cServicesList.end(); givenSubs++) if (serverSubs->first == *givenSubs) break; if (givenSubs == cServicesList.end()) { toBeDeleted.push_back(serverSubs->first); serviceUpdated = true; } } for (vector::const_iterator it = toBeDeleted.begin(); it != toBeDeleted.end(); it++) cSubs->second.erase(*it); //now check for new services for (givenSubs = cServicesList.begin(); givenSubs != cServicesList.end(); givenSubs++) { if (*givenSubs == "SERVICE_LIST") continue; if (fHasWhiteList && (fWhiteList.find(*i + "/") == fWhiteList.end()) && (fWhiteList.find(*i + "/" + *givenSubs) == fWhiteList.end()) && (fWhiteList.find("/" + *givenSubs) == fWhiteList.end())) continue; if (fHasBlackList && ((fBlackList.find(*i + "/") != fBlackList.end()) || (fBlackList.find(*i + "/" + *givenSubs) != fBlackList.end()) || (fBlackList.find("/" + *givenSubs) != fBlackList.end()))) continue; if (cSubs->second.find(*givenSubs) == cSubs->second.end()) {//service not found. Add it cSubs->second[*givenSubs].dimInfo = new DimStampedInfo(((*i) + "/" + *givenSubs).c_str(), const_cast(""), this); serviceUpdated = true; if(fDebugIsOn) { ostringstream str; str << "Subscribing to service " << *i << "/" << *givenSubs; Debug(str.str()); } } } } else //server not found in our list. Create its entry { fServiceSubscriptions[*i] = map(); map& liste = fServiceSubscriptions[*i]; for (vector::const_iterator j = cServicesList.begin(); j!= cServicesList.end(); j++) { if (*j == "SERVICE_LIST") continue; if (fHasWhiteList && (fWhiteList.find(*i + "/") == fWhiteList.end()) && (fWhiteList.find(*i + "/" + *j) == fWhiteList.end()) && (fWhiteList.find("/" + *j) == fWhiteList.end())) continue; if (fHasBlackList && ((fBlackList.find(*i + "/") != fBlackList.end()) || (fBlackList.find(*i + "/" + *j) != fBlackList.end()) || (fBlackList.find("/" + *j) != fBlackList.end()))) continue; liste[*j].dimInfo = new DimStampedInfo(((*i) + "/" + (*j)).c_str(), const_cast(""), this); serviceUpdated = true; if(fDebugIsOn) { ostringstream str; str << "Subscribing to service " << *i << "/" << *j; Debug(str.str()); } } } } return serviceUpdated; } // -------------------------------------------------------------------------- // //! Destructor // DataLogger::~DataLogger() { if (fDebugIsOn) { Debug("DataLogger destruction starts"); } fDestructing = true; //first let's go to the ready state GoToReadyPlease(); //release the services subscriptions fServiceSubscriptions.clear(); //exit the monitoring loop fContinueMonitoring = false; fMonitoringThread.join(); //close the files if (fNightlyLogFile.is_open()) fNightlyLogFile.close(); if (fNightlyReportFile.is_open()) fNightlyReportFile.close(); if (fRunLogFile.is_open()) fRunLogFile.close(); if (fRunReportFile.is_open()) fRunReportFile.close(); delete fOpenedNightlyFiles; delete fOpenedRunFiles; delete fNumSubAndFits; #ifdef HAVE_FITS if (fRunFitsFile != NULL) delete fRunFitsFile; fRunFitsFile = NULL; #endif if (fDebugIsOn) { Debug("DataLogger desctruction ends"); } } // -------------------------------------------------------------------------- // //! Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them // void DataLogger::infoHandler() { // Make sure getTimestamp is called _before_ getTimestampMillisecs if (fDestructing) return; DimInfo* I = getInfo(); SubscriptionsListType::iterator x; map::iterator y; if (I==NULL) { if (CheckForServicesUpdate()) { //services were updated. Notify fNumSubAndFitsData.numSubscriptions = 0; for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++) fNumSubAndFitsData.numSubscriptions += x->second.size(); if (fNumSubAndFitsIsOn) { if (fDebugIsOn) { ostringstream str; str << "Updating number of subscriptions service: Num Subs=" << fNumSubAndFitsData.numSubscriptions << " Num open FITS=" << fNumSubAndFitsData.numOpenFits; Debug(str.str()); } fNumSubAndFits->updateService(); } } return; } //check if the service pointer corresponds to something that we subscribed to //this is a fix for a bug that provides bad Infos when a server starts bool found = false; for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++) {//find current service is subscriptions for (y=x->second.begin(); y!=x->second.end();y++) if (y->second.dimInfo == I) { found = true; break; } if (found) break; } if (!found) return; if (I->getSize() <= 0) return; // Make sure that getTimestampMillisecs is NEVER called before // getTimestamp is properly called // check that the message has been updated by something, i.e. must be different from its initial value if (I->getTimestamp() == 0) return; CheckForRunNumber(I); if (fPreviousRunNumber != fRunNumber) {//run number has changed. close and reopen run files. StopRunPlease(); StartRunPlease(); fPreviousRunNumber = fRunNumber; } ReportPlease(I, y->second); } // -------------------------------------------------------------------------- // //! Checks whether or not the current info is a run number. //! If so, then remember it. A run number is required to open the run-log file //! @param I //! the current DimInfo // void DataLogger::CheckForRunNumber(DimInfo* I) { if (strstr(I->getName(), fRunNumberInfo) != NULL) {//assumes that the run number is an integer fRunNumber = I->getInt(); ostringstream str; str << "New run number is " << fRunNumber; Message(str.str()); } } // -------------------------------------------------------------------------- // //! write infos to log files. //! @param I //! The current DimInfo //! @param sub //! The dataLogger's subscription corresponding to this DimInfo // void DataLogger::ReportPlease(DimInfo* I, SubscriptionType& sub) { //should we log or report this info ? (i.e. is it a message ?) bool isItaReport = ((strstr(I->getName(), "Message") == NULL) && (strstr(I->getName(), "MESSAGE") == NULL)); if (I->getFormat()[0] == 'C') isItaReport = false; if (!fNightlyReportFile.is_open()) return; //create the converter for that service if (sub.fConv == NULL && isItaReport) { //trick the converter in case of 'C'. why do I do this ? well simple: the converter checks that the right number //of bytes was written. because I skip 'C' with fits, the bytes will not be allocated, hence the "size copied ckeck" //of the converter will fail, hence throwing an exception. string fakeFormat(I->getFormat()); if (fakeFormat[fakeFormat.size()-1] == 'C') fakeFormat = fakeFormat.substr(0, fakeFormat.size()-1); sub.fConv = new Converter(Out(), I->getFormat()); if (!sub.fConv) { ostringstream str; str << "Couldn't properly parse the format... service " << sub.dimInfo->getName() << " ignored."; Error(str); return; } } //construct the header ostringstream header; Time cTime(I->getTimestamp(), I->getTimestampMillisecs()*1000); fQuality = I->getQuality(); fMjD = cTime.Mjd(); if (isItaReport) { //write text header header << I->getName() << " " << fQuality << " "; header << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " "; header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " "; header << cTime.ms() << " " << I->getTimestamp() << " "; string text; try { text = sub.fConv->GetString(I->getData(), I->getSize()); } catch (const runtime_error &e) { Out() << kRed << e.what() << endl; ostringstream str; str << "Could not properly parse the data for service " << sub.dimInfo->getName(); str << " reason: " << e.what() << ". Entry ignored"; Error(str); return; } if (text.empty()) { ostringstream str; str << "Service " << sub.dimInfo->getName() << " sent an empty string"; Info(str); return; } //replace bizarre characters by white space replace(text.begin(), text.end(), '\n', '\\'); replace_if(text.begin(), text.end(), ptr_fun(&iscntrl), ' '); //write entry to Nightly report if (fNightlyReportFile.is_open()) { if (fDebugIsOn) { ostringstream str; str << "Writing: \"" << header.str() << text << "\" to Nightly report file"; Debug(str.str()); } fNightlyReportFile << header.str() << text << endl; //check if either eof, bailbit or batbit are set if (!fNightlyReportFile.good()) { Error("An error occured while writing to the nightly report file. Closing it"); if (fNightlyReportFile.is_open()) fNightlyReportFile.close(); } } //write entry to run-report if (fRunReportFile.is_open()) { if (fDebugIsOn) { ostringstream str; str << "Writing: \"" << header.str() << text << "\" to Run report file"; Debug(str.str()); } fRunReportFile << header.str() << text << endl; if (!fRunReportFile.good()) { Error("An error occured while writing to the run report file. Closing it."); if (fRunReportFile.is_open()) fRunReportFile.close(); } } } else {//write entry to both Nightly and run logs string n = I->getName(); ostringstream msg; msg << n << ": " << I->getString();//n.substr(0, n.find_first_of('/')) << ": " << I->getString(); if (fNightlyLogFile.is_open()) { if (fDebugIsOn) { ostringstream str; str << "Writing: \"" << msg.str() << "\" to Nightly log file"; Debug(str.str()); } MessageImp nightlyMess(fNightlyLogFile); nightlyMess.Write(cTime, msg.str().c_str(), fQuality); if (!fNightlyLogFile.good()) { Error("An error occured while writing to the nightly log file. Closing it."); if (fNightlyLogFile.is_open()) fNightlyLogFile.close(); } } if (fRunLogFile.is_open()) { if (fDebugIsOn) { ostringstream str; str << "Writing: \"" << msg.str() << "\" to Run log file"; Debug(str.str()); } MessageImp runMess(fRunLogFile); runMess.Write(cTime, msg.str().c_str(), fQuality); if (!fRunLogFile.good()) { Error("An error occured while writing to the run log file. Closing it."); if (fRunLogFile.is_open()) fRunLogFile.close(); } } } #ifdef HAVE_FITS if (isItaReport) { if (!sub.nightlyFile.IsOpen() || !sub.runFile.IsOpen()) OpenFITSFilesPlease(sub); WriteToFITS(sub); } #endif } // -------------------------------------------------------------------------- // //! write messages to logs. //! @param evt //! the current event to log //! @returns //! the new state. Currently, always the current state //! //! @deprecated //! I guess that this function should not be any longer // //Otherwise re-write it properly with the MessageImp class int DataLogger::LogMessagePlease(const Event& evt) { if (!fNightlyLogFile.is_open()) return GetCurrentState(); Warn("LogMessagePlease has not been checked nor updated since a long while. Undefined behavior to be expected"); ostringstream header; const Time& cTime = evt.GetTime(); header << evt.GetName() << " " << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " "; header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " "; header << cTime.ms() << " "; const Converter conv(Out(), evt.GetFormat()); if (!conv) { Error("Couldn't properly parse the format... ignored."); return GetCurrentState(); } string text; try { text = conv.GetString(evt.GetData(), evt.GetSize()); } catch (const runtime_error &e) { Out() << kRed << e.what() << endl; Error("Couldn't properly parse the data... ignored."); return GetCurrentState(); } if (text.empty()) return GetCurrentState(); //replace bizarre characters by white space replace(text.begin(), text.end(), '\n', '\\'); replace_if(text.begin(), text.end(), ptr_fun(&iscntrl), ' '); if (fDebugIsOn) { ostringstream str; str << "Logging: \"" << header << text << "\""; Debug(str.str()); } if (fNightlyLogFile.is_open()) { fNightlyLogFile << header; if (!fNightlyLogFile.good()) { Error("An error occured while writing to the run log file. Closing it."); if (fNightlyLogFile.is_open()) fNightlyLogFile.close(); } } if (fRunLogFile.is_open()) { fRunLogFile << header; if (!fRunLogFile.good()) { Error("An error occured while writing to the run log file. Closing it."); if (fRunLogFile.is_open()) fRunLogFile.close(); } } if (fNightlyLogFile.is_open()) { fNightlyLogFile << text; if (!fNightlyLogFile.good()) { Error("An error occured while writing to the run log file. Closing it."); if (fNightlyLogFile.is_open()) fNightlyLogFile.close(); } } if (fRunLogFile.is_open()) { fRunLogFile << text; if (!fRunLogFile.good()) { Error("An error occured while writing to the run log file. Closing it."); if (fRunLogFile.is_open()) fRunLogFile.close(); } } return GetCurrentState(); } // -------------------------------------------------------------------------- // //! print the dataLogger's current state. invoked by the PRINT command //! @param evt //! the current event. Not used by the method //! @returns //! the new state. Which, in that case, is the current state //! int DataLogger::PrintStatePlease(const Event& ) { Message("-----------------------------------------"); Message("------ DATA LOGGER CURRENT STATE --------"); Message("-----------------------------------------"); //print the path configuration string actualTargetDir; if (fNightlyFileName == ".") { char currentPath[FILENAME_MAX]; if (getcwd(currentPath, sizeof(currentPath))) actualTargetDir = currentPath; } else actualTargetDir = fNightlyFileName; Message("Nightly Path: " + actualTargetDir); if (fRunFileName == ".") { char currentPath[FILENAME_MAX]; if (getcwd(currentPath, sizeof(currentPath))) actualTargetDir = currentPath; } else actualTargetDir = fRunFileName; Message("Run Path: " + actualTargetDir); ostringstream str; str << "Run Number: " << fRunNumber; Message(str.str()); Message("----------- OPENED FILES ----------------"); //print all the open files. if (fNightlyLogFile.is_open()) Message("Nightly Log..........OPEN"); else Message("Nightly log........CLOSED"); if (fNightlyReportFile.is_open()) Message("Nightly Report.......OPEN"); else Message("Nightly Report.....CLOSED"); if (fRunLogFile.is_open()) Message("Run Log..............OPEN"); else Message("Run Log............CLOSED"); if (fRunReportFile.is_open()) Message("Run Report...........OPEN"); else Message("Run Report.........CLOSED"); bool statWarning = false; DataLoggerStats statVar; calculateTotalSizeWritten(statVar, statWarning, false); Message("---------------- STATS ------------------"); str.str(""); str << "Total Size written: " << statVar.sizeWritten << " bytes."; Message(str.str()); str.str(""); str << "Disk free space: " << statVar.freeSpace << " bytes."; Message(str.str()); str.str(""); str << "Statistics are updated every " << fStatsPeriodDuration << " seconds"; if (fStatsPeriodDuration != 0) Message(str); else Message("Statistics updates are currently disabled"); Message("----------- DIM SUBSCRIPTIONS -----------"); str.str(""); str << "There are " << fNumSubAndFitsData.numSubscriptions << " active DIM subscriptions:"; Message(str.str()); for (map >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++) { Message("Server "+it->first); for (map::const_iterator it2=it->second.begin(); it2!=it->second.end(); it2++) Message(" -> "+it2->first); } if (fHasBlackList) { Message("------------- BLOCK LIST ----------------"); for (set::iterator it=fBlackList.begin(); it != fBlackList.end(); it++) Message(*it); } if (fHasWhiteList) { Message("----------- ALLOW LIST ------------------"); for (set::iterator it=fWhiteList.begin(); it != fWhiteList.end(); it++) Message(*it); } if (fGrouping.size() != 0) { Message("--------- GROUPING LIST -----------------"); Message("The following servers and/or services will be grouping under a single run fits file:"); for (set::iterator it=fGrouping.begin(); it != fGrouping.end(); it++) Message(*it); } Message("-----------------------------------------"); Message("------ END OF DATA LOGGER STATE ---------"); Message("-----------------------------------------"); return GetCurrentState(); } // -------------------------------------------------------------------------- // //! turn debug mode on and off //! @param evt //! the current event. contains the instruction string: On, Off, on, off, ON, OFF, 0 or 1 //! @returns //! the new state. Which, in that case, is the current state //! int DataLogger::SetDebugOnOff(const Event& evt) { bool backupDebug = fDebugIsOn; fDebugIsOn = evt.GetBool(); if (fDebugIsOn == backupDebug) Warn("Warning: debug mode was already in the requested state"); else { ostringstream str; str << "Debug mode is now " << fDebugIsOn; Message(str.str()); } return GetCurrentState(); } // -------------------------------------------------------------------------- // //! set the statistics update period duration. 0 disables the statistics //! @param evt //! the current event. contains the new duration. //! @returns //! the new state. Which, in that case, is the current state //! int DataLogger::SetStatsPeriod(const Event& evt) { float backupDuration = fStatsPeriodDuration; fStatsPeriodDuration = evt.GetFloat(); if (fStatsPeriodDuration < 0) { Error("Statistics period duration should be greater than zero. Discarding provided value."); fStatsPeriodDuration = backupDuration; return GetCurrentState(); } if (fStatsPeriodDuration != fStatsPeriodDuration) { Error("Provided duration does not appear to be a valid float. discarding it."); fStatsPeriodDuration = backupDuration; return GetCurrentState(); } if (backupDuration == fStatsPeriodDuration) Warn("Warning: statistics period was not modified: supplied value already in use"); else { if (fStatsPeriodDuration == 0.0f) Message("Statistics are now OFF"); else { ostringstream str; str << "Statistics period is now " << fStatsPeriodDuration << " seconds"; Message(str.str()); } } return GetCurrentState(); } // -------------------------------------------------------------------------- // //! set the opened files service on or off. //! @param evt //! the current event. contains the instruction string. similar to setdebugonoff //! @returns //! the new state. Which, in that case, is the current state //! int DataLogger::SetOpenedFilesOnOff(const Event& evt) { bool backupOpened = fOpenedFilesIsOn; fOpenedFilesIsOn = evt.GetBool(); if (fOpenedFilesIsOn == backupOpened) Warn("Warning: opened files service mode was already in the requested state"); else { ostringstream str; str << "Opened files service mode is now " << fOpenedFilesIsOn; Message(str.str()); } return GetCurrentState(); } // -------------------------------------------------------------------------- // //! set the number of subscriptions and opened fits on and off //! @param evt //! the current event. contains the instruction string. similar to setdebugonoff //! @returns //! the new state. Which, in that case, is the current state //! int DataLogger::SetNumSubsAndFitsOnOff(const Event& evt) { bool backupSubs = fNumSubAndFitsIsOn; fNumSubAndFitsIsOn = evt.GetBool(); if (fNumSubAndFitsIsOn == backupSubs) Warn("Warning: Number of subscriptions service mode was already in the requested state"); else { ostringstream str; str << "Number of subscriptions service mode is now " << fNumSubAndFitsIsOn; Message(str.str()); } return GetCurrentState(); } // -------------------------------------------------------------------------- // //! Sets the path to use for the Nightly log file. //! @param evt //! the event transporting the path //! @returns //! currently only the current state. // int DataLogger::ConfigureNightlyFileName(const Event& evt) { if (evt.GetText() != NULL) { fNightlyFileName = string(evt.GetText()); Message("New Nightly folder specified: " + fNightlyFileName); } else Error("Empty Nightly folder given. Please specify a valid path."); return GetCurrentState(); } // -------------------------------------------------------------------------- // //! Sets the path to use for the run log file. //! @param evt //! the event transporting the path //! @returns //! currently only the current state int DataLogger::ConfigureRunFileName(const Event& evt) { if (evt.GetText() != NULL) { fRunFileName = string(evt.GetText()); Message("New Run folder specified: " + fRunFileName); } else Error("Empty Nightly folder given. Please specify a valid path"); return GetCurrentState(); } // -------------------------------------------------------------------------- // //! Sets the run number. //! @param evt //! the event transporting the run number //! @returns //! currently only the current state int DataLogger::ConfigureRunNumber(const Event& evt) { fRunNumber = evt.GetInt(); ostringstream str; str << "The new run number is: " << fRunNumber; Message(str.str()); return GetCurrentState(); } // -------------------------------------------------------------------------- // //! Notifies the DIM service that a particular file was opened //! @ param name the base name of the opened file, i.e. without path nor extension. //! WARNING: use string instead of string& because I pass values that do not convert to string&. //! this is not a problem though because file are not opened so often. //! @ param type the type of the opened file. 0 = none open, 1 = log, 2 = text, 4 = fits inline void DataLogger::NotifyOpenedFile(string name, int type, DimDescribedService* service) { if (fOpenedFilesIsOn) { if (fDebugIsOn) { ostringstream str; str << "Updating files service " << service->getName() << "with code: " << type << " and file: " << name; Debug(str.str()); str.str(""); str << "Num subs: " << fNumSubAndFitsData.numSubscriptions << " Num open FITS: " << fNumSubAndFitsData.numOpenFits; Debug(str.str()); } OpenFileToDim fToDim; fToDim.code = type; memcpy(fToDim.fileName, name.c_str(), name.size()+1); service->setData(reinterpret_cast(&fToDim), name.size()+1+sizeof(int)); service->setQuality(0); service->updateService(); } } // -------------------------------------------------------------------------- // //! Implements the Start transition. //! Concatenates the given path for the Nightly file and the filename itself (based on the day), //! and tries to open it. //! @returns //! kSM_NightlyOpen if success, kSM_BadNightlyConfig if failure int DataLogger::StartPlease() { if (fDebugIsOn) { Debug("Starting..."); } Time time; ostringstream sTime; sTime << time.Y() << "_" << time.M() << "_" << time.D(); fFullNightlyLogFileName = fNightlyFileName + '/' + sTime.str() + ".log"; fNightlyLogFile.open(fFullNightlyLogFileName.c_str(), ios_base::out | ios_base::app); if (errno != 0) { ostringstream str; str << "Unable to open Nightly Log " << fFullNightlyLogFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } fFullNightlyReportFileName = fNightlyFileName + '/' + sTime.str() + ".rep"; fNightlyReportFile.open(fFullNightlyReportFileName.c_str(), ios_base::out | ios_base::app); if (errno != 0) { ostringstream str; str << "Unable to open Nightly Report " << fFullNightlyReportFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } if (!fNightlyLogFile.is_open() || !fNightlyReportFile.is_open()) { ostringstream str; str << "Something went wrong while openning nightly files " << fFullNightlyLogFileName << " and " << fFullNightlyReportFileName; Error(str.str()); return kSM_BadNightlyConfig; } //get the size of the newly opened file. struct stat st; stat(fFullNightlyLogFileName.c_str(), &st); fBaseSizeNightly = st.st_size; stat(fFullNightlyReportFileName.c_str(), &st); fBaseSizeNightly += st.st_size; fFileSizesMap.clear(); fBaseSizeRun = 0; fPreviousSize = 0; //notify that files were opened string actualTargetDir; if (fNightlyFileName == ".") { char currentPath[FILENAME_MAX]; if (!getcwd(currentPath, sizeof(currentPath))) { if (errno != 0) { ostringstream str; str << "Unable retrieve current path" << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } } actualTargetDir = currentPath; } else { actualTargetDir = fNightlyFileName; } //notify that a new file has been opened. NotifyOpenedFile(actualTargetDir + '/' + sTime.str(), 3, fOpenedNightlyFiles); fOpenedNightlyFits.clear(); return kSM_NightlyOpen; } #ifdef HAVE_FITS // -------------------------------------------------------------------------- // //! open if required a the FITS files corresponding to a given subscription //! @param sub //! the current DimInfo subscription being examined void DataLogger::OpenFITSFilesPlease(SubscriptionType& sub) { string serviceName(sub.dimInfo->getName()); //we must check if we should group this service subscription to a single fits file before we replace the / by _ bool hasGrouping = false; if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging)) {//will we find this service in the grouping list ? for (set::iterator it=fGrouping.begin(); it!=fGrouping.end(); it++) { if (serviceName.find(*it) != string::npos) { hasGrouping = true; break; } } } for (unsigned int i=0;iupdateService(); if (fDebugIsOn) { ostringstream str; str << "Opened Nightly FITS: " << partialName << " and table: FACT-" << serviceName << ".current number of opened FITS: " << fNumSubAndFitsData.numOpenFits; Debug(str.str()); } } if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging)) {//buffer for the run file have already been allocated when doing the Nightly file ostringstream sRun; sRun << fRunNumber; string fileNameOnly; string partialName; if (hasGrouping) { fileNameOnly = sRun.str() + "_group.fits"; partialName = fRunFileName + '/' + fileNameOnly; } else { fileNameOnly = sRun.str() + '_' + serviceName + ".fits"; partialName = fRunFileName + '/' + fileNameOnly; } //get the size of the file we're about to open if (fFileSizesMap.find(partialName) == fFileSizesMap.end()) { struct stat st; if (!stat(partialName.c_str(), &st)) fBaseSizeRun += st.st_size; else fBaseSizeRun = 0; fFileSizesMap[partialName] = 0; fOpenedRunFits[fileNameOnly].push_back(serviceName); } else if (hasGrouping) {//most likely I should add this service name. //the only case for which I should not add it is if a service disapeared, hence the file was closed //and reopened again. Unlikely to happen, but well it may bool found = false; for (vector::iterator it=fOpenedRunFits[fileNameOnly].begin(); it!=fOpenedRunFits[fileNameOnly].end(); it++) if (*it == serviceName) { found = true; break; } if (!found) fOpenedRunFits[fileNameOnly].push_back(serviceName); } if (hasGrouping && fRunFitsFile == NULL) try { fRunFitsFile = new CCfits::FITS(partialName, CCfits::RWmode::Write); (fNumSubAndFitsData.numOpenFits)++; } catch (CCfits::FitsException e) { ostringstream str; str << "Could not open FITS Run file " << partialName << " reason: " << e.message(); Error(str); fRunFitsFile = NULL; } string actualTargetDir; if (fRunFileName == ".") { char currentPath[FILENAME_MAX]; if (getcwd(currentPath, sizeof(currentPath))) actualTargetDir = currentPath; } else { actualTargetDir = fRunFileName; } NotifyOpenedFile(actualTargetDir + '/' + sRun.str(), 7, fOpenedRunFiles);// + '_' + serviceName, 4); if (hasGrouping) sub.runFile.Open(partialName, serviceName, fRunFitsFile, &fNumSubAndFitsData.numOpenFits, this);//Out()); else sub.runFile.Open(partialName, serviceName, NULL, &fNumSubAndFitsData.numOpenFits, this);//Out()); if (fNumSubAndFitsIsOn) fNumSubAndFits->updateService(); if (fDebugIsOn) { ostringstream str; str << "Opened Run FITS: " << partialName << " and table: FACT-" << serviceName << ".current number of opened FITS: " << fNumSubAndFitsData.numOpenFits; Debug(str.str()); } } } // -------------------------------------------------------------------------- // void DataLogger::AllocateFITSBuffers(SubscriptionType& sub) { int size = sub.dimInfo->getSize(); //Init the time columns of the file Description dateDesc(string("Time"), string("Modified Julian Date"), string("MjD")); sub.nightlyFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double)); sub.runFile.AddStandardColumn(dateDesc, "1D", &fMjD, sizeof(double)); Description QoSDesc("Qos", "Quality of service", "None"); sub.nightlyFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int)); sub.runFile.AddStandardColumn(QoSDesc, "1J", &fQuality, sizeof(int)); const Converter::FormatList flist = sub.fConv->GetList(); // Compilation failed if (flist.empty() || flist.back().first.second!=0) { Error("Compilation of format string failed."); return; } //we've got a nice structure describing the format of this service's messages. //Let's create the appropriate FITS columns vector dataFormatsLocal; for (unsigned int i=0;iname()[0]) { case 'c': case 'C': dataQualifier.str("S"); break; case 's': dataQualifier << "I"; break; case 'i': case 'I': dataQualifier << "J"; break; case 'l': case 'L': dataQualifier << "J"; break; case 'f': case 'F': dataQualifier << "E"; break; case 'd': case 'D': dataQualifier << "D"; break; case 'x': case 'X': dataQualifier << "K"; break; case 'S': //for strings, the number of elements I get is wrong. Correct it dataQualifier.str(""); //clear dataQualifier << size-1 << "A"; size = size-1; break; default: Fatal("THIS SHOULD NEVER BE REACHED. dataLogger.cc ln 1198."); }; //we skip the variable length strings for now (in fits only) if (dataQualifier.str() != "S") dataFormatsLocal.push_back(dataQualifier.str()); } sub.nightlyFile.InitDataColumns(fServiceList.GetDescriptions(sub.dimInfo->getName()), dataFormatsLocal, sub.dimInfo->getData(), size); sub.runFile.InitDataColumns(fServiceList.GetDescriptions(sub.dimInfo->getName()), dataFormatsLocal, sub.dimInfo->getData(), size); } // -------------------------------------------------------------------------- // //! write a dimInfo data to its corresponding FITS files // void DataLogger::WriteToFITS(SubscriptionType& sub) { //nightly File status (open or not) already checked if (sub.nightlyFile.IsOpen()) { sub.nightlyFile.Write(sub.fConv); if (fDebugIsOn) { Debug("Writing to nightly FITS " + sub.nightlyFile.fFileName); } } if (sub.runFile.IsOpen()) { sub.runFile.Write(sub.fConv); if (fDebugIsOn) { Debug("Writing to Run FITS " + sub.runFile.fFileName); } } } #endif //if has_fits // -------------------------------------------------------------------------- // //! Implements the StartRun transition. //! Concatenates the given path for the run file and the filename itself (based on the run number), //! and tries to open it. //! @returns //! kSM_Logging if success, kSM_BadRunConfig if failure. int DataLogger::StartRunPlease() { if (fDebugIsOn) { Debug("Starting Run Logging..."); } //attempt to open run file with current parameters // if (fRunNumber == -1) // return kSM_BadRunConfig; ostringstream sRun; sRun << fRunNumber; fFullRunLogFileName = fRunFileName + '/' + sRun.str() + ".log"; fRunLogFile.open(fFullRunLogFileName.c_str(), ios_base::out | ios_base::app); //maybe should be app instead of ate if (errno != 0) { ostringstream str; str << "Unable to open run Log " << fFullRunLogFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } fFullRunReportFileName = fRunFileName + '/' + sRun.str() + ".rep"; fRunReportFile.open(fFullRunReportFileName.c_str(), ios_base::out | ios_base::app); if (errno != 0) { ostringstream str; str << "Unable to open run report " << fFullRunReportFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } if (!fRunLogFile.is_open() || !fRunReportFile.is_open()) { ostringstream str; str << "Something went wrong while openning nightly files " << fFullRunLogFileName << " and " << fFullRunReportFileName; Error(str.str()); return kSM_BadRunConfig; } //get the size of the newly opened file. struct stat st; fBaseSizeRun = 0; if (fFileSizesMap.find(fFullRunLogFileName) == fFileSizesMap.end()) { stat(fFullRunLogFileName.c_str(), &st); if (errno != 0) { ostringstream str; str << "Unable to stat " << fFullRunLogFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } else fBaseSizeRun += st.st_size; fFileSizesMap[fFullRunLogFileName] = 0; } if (fFileSizesMap.find(fFullRunReportFileName) == fFileSizesMap.end()) { stat(fFullRunReportFileName.c_str(), &st); if (errno != 0) { ostringstream str; str << "Unable to stat " << fFullRunReportFileName << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } else fBaseSizeRun += st.st_size; fFileSizesMap[fFullRunReportFileName] = 0; } string actualTargetDir; if (fRunFileName == ".") { char currentPath[FILENAME_MAX]; if (!getcwd(currentPath, sizeof(currentPath))) { if (errno != 0) { ostringstream str; str << "Unable to retrieve the current path" << ". Reason: " << strerror(errno) << " [" << errno << "]"; Error(str); } } actualTargetDir = currentPath; } else { actualTargetDir = fRunFileName; } NotifyOpenedFile(actualTargetDir + '/' + sRun.str(), 3, fOpenedRunFiles); fOpenedRunFits.clear(); return kSM_Logging; } #ifdef HAVE_FITS void DataLogger::CreateFitsGrouping(bool runGroup) { if (fDebugIsOn) { ostringstream str; str << "Creating fits group for "; if (runGroup) str << "run files"; else str << "nightly files"; Debug(str.str()); } //create the FITS group corresponding to the ending run. CCfits::FITS* groupFile; map >& filesToGroup = runGroup? fOpenedRunFits : fOpenedNightlyFits; unsigned int numFilesToGroup = 0; for (map >::iterator it=filesToGroup.begin(); it != filesToGroup.end(); it++) { numFilesToGroup += it->second.size(); } if (fDebugIsOn) { ostringstream str; str << "There are " << numFilesToGroup << " tables to group"; Debug(str.str()); } if (numFilesToGroup <= 1) { filesToGroup.clear(); return; } ostringstream groupName; if (runGroup) { groupName << fRunFileName << '/' << fRunNumber << ".fits"; } else { Time time; ostringstream sTime; sTime << time.Y() << "_" << time.M() << "_" << time.D(); groupName << fNightlyFileName << '/' << sTime.str() << ".fits"; } CCfits::Table* groupTable; int maxCharLength = 50;//FILENAME_MAX; try { groupFile = new CCfits::FITS(groupName.str(), CCfits::RWmode::Write); //setup the column names ostringstream pathTypeName; pathTypeName << maxCharLength << "A"; vector names; vector dataTypes; names.push_back("MEMBER_XTENSION"); dataTypes.push_back("8A"); names.push_back("MEMBER_URI_TYPE"); dataTypes.push_back("3A"); names.push_back("MEMBER_LOCATION"); dataTypes.push_back(pathTypeName.str()); names.push_back("MEMBER_NAME"); dataTypes.push_back(pathTypeName.str()); groupTable = groupFile->addTable("GROUPING", numFilesToGroup, names, dataTypes); } catch (CCfits::FitsException e) { ostringstream str; str << "Could not open or create FITS table GROUPING in file " << groupName.str() << " reason: " << e.message(); Error(str); return; } //CCfits seems to be buggy somehow: can't use the column's function "write": it create a compilation error: maybe strings were not thought about. //use cfitsio routines instead groupTable->makeThisCurrent(); //create appropriate buffer. unsigned char* fitsBuffer = new unsigned char[8 + 3 + 2*maxCharLength + 1]; //+1 for trailling character memset(fitsBuffer, 0, 8 + 3 + 2*maxCharLength + 1); char* startOfExtension = reinterpret_cast(fitsBuffer); char* startOfURI = reinterpret_cast(&fitsBuffer[8]); char* startOfLocation = reinterpret_cast(&fitsBuffer[8 + 3]); char* startOfName = reinterpret_cast(&fitsBuffer[8+3+maxCharLength]); // char* startOfNameVisible = reinterpret_cast(&fitsBuffer[8 + 3 + 2*maxCharLength]); sprintf(startOfExtension, "%s", "BINTABLE"); sprintf(startOfURI, "%s", "URL"); int i=1; for (map >::iterator it=filesToGroup.begin(); it!=filesToGroup.end(); it++) for (vector::iterator jt=it->second.begin(); jt != it->second.end(); jt++, i++) { strcpy(startOfLocation, it->first.c_str()); strcpy(startOfName, jt->c_str()); if (fDebugIsOn) { ostringstream str; str << "Grouping " << it->first << " " << *jt; Debug(str.str()); } // strcpy(startOfNameVisible, jt->c_str()); int status = 0; fits_write_tblbytes(groupFile->fitsPointer(), i, 1, 8+3+2*maxCharLength, fitsBuffer, &status); if (status) { ostringstream str; str << "Could not write row #" << i << "In the fits grouping file " << groupName << ". Cfitsio error code: " << status; Error(str.str()); } } filesToGroup.clear(); delete groupFile; } #endif //HAVE_FITS // -------------------------------------------------------------------------- // //! Implements the StopRun transition. //! Attempts to close the run file. //! @returns //! kSM_WaitingRun if success, kSM_FatalError otherwise int DataLogger::StopRunPlease() { if (fDebugIsOn) { Debug("Stopping Run Logging..."); } if (!fRunLogFile.is_open() || !fRunReportFile.is_open()) return kSM_FatalError; if (fRunLogFile.is_open()) fRunLogFile.close(); if (fRunReportFile.is_open()) fRunReportFile.close(); #ifdef HAVE_FITS for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++) for (map::iterator j = i->second.begin(); j != i->second.end(); j++) { if (j->second.runFile.IsOpen()) j->second.runFile.Close(); } if (fRunFitsFile != NULL) { delete fRunFitsFile; fRunFitsFile = NULL; (fNumSubAndFitsData.numOpenFits)--; } #endif NotifyOpenedFile("", 0, fOpenedRunFiles); if (fNumSubAndFitsIsOn) fNumSubAndFits->updateService(); CreateFitsGrouping(true); return kSM_WaitingRun; } // -------------------------------------------------------------------------- // //! Implements the Stop and Reset transitions. //! Attempts to close any openned file. //! @returns //! kSM_Ready int DataLogger::GoToReadyPlease() { if (fDebugIsOn) { Debug("Going to the Ready state..."); } if (fNightlyLogFile.is_open()) fNightlyLogFile.close(); if (fNightlyReportFile.is_open()) fNightlyReportFile.close(); if (fRunLogFile.is_open()) fRunLogFile.close(); if (fRunReportFile.is_open()) fRunReportFile.close(); #ifdef HAVE_FITS for (SubscriptionsListType::iterator i = fServiceSubscriptions.begin(); i != fServiceSubscriptions.end(); i++) for (map::iterator j = i->second.begin(); j != i->second.end(); j++) { if (j->second.nightlyFile.IsOpen()) j->second.nightlyFile.Close(); if (j->second.runFile.IsOpen()) j->second.runFile.Close(); } if (fRunFitsFile != NULL) { delete fRunFitsFile; fRunFitsFile = NULL; (fNumSubAndFitsData.numOpenFits)--; } #endif if (GetCurrentState() == kSM_Logging) NotifyOpenedFile("", 0, fOpenedRunFiles); if (GetCurrentState() == kSM_Logging || GetCurrentState() == kSM_WaitingRun || GetCurrentState() == kSM_NightlyOpen) { NotifyOpenedFile("", 0, fOpenedNightlyFiles); if (fNumSubAndFitsIsOn) fNumSubAndFits->updateService(); } CreateFitsGrouping(true); CreateFitsGrouping(false); return kSM_Ready; } // -------------------------------------------------------------------------- // //! Implements the transition towards kSM_WaitingRun //! Does nothing really. //! @returns //! kSM_WaitingRun int DataLogger::NightlyToWaitRunPlease() { if (fDebugIsOn) { Debug("Going to Wait Run Number state..."); } return kSM_WaitingRun; } bool DataLogger::SetConfiguration(Configuration& conf) { fDebugIsOn = conf.Get("debug"); //Set the block or allow list fBlackList.clear(); fWhiteList.clear(); if (conf.Has("block")) { vector vec = conf.Get>("block"); if (vec.size() != 0) { fHasBlackList = true; if (fDebugIsOn) Debug("Setting BLOCK list:"); } for (vector::iterator it = vec.begin(); it != vec.end(); it++) { fBlackList.insert(*it); if (fDebugIsOn) Debug(" " + *it); } } if (conf.Has("allow")) { vector vec = conf.Get>("allow"); if (vec.size() != 0) { fHasWhiteList = true; if (fDebugIsOn) Debug("Setting ALLOW list:"); } for (vector::iterator it=vec.begin(); it != vec.end(); it++) { fWhiteList.insert(*it); if (fDebugIsOn) Debug(" " + *it); } } //Set the grouping if (conf.Has("group")) { vector vec = conf.Get>("group"); if (vec.size() != 0) if (fDebugIsOn) Debug("Setting GROUPING list:"); for (vector::iterator it=vec.begin(); it != vec.end(); it++) { fGrouping.insert(*it); if (fDebugIsOn) Debug(" " + *it); } } return true; } // -------------------------------------------------------------------------- int RunDim(Configuration &conf) { WindowLog wout; //log.SetWindow(stdscr); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) wout << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; // Start io_service.Run to use the StateMachineImp::Run() loop // Start io_service.run to only use the commandHandler command detaching DataLogger logger(wout); if (!logger.SetConfiguration(conf)) return -1; logger.Run(true); return 0; } void RunThread(DataLogger* logger) { // This is necessary so that the StateMachine Thread can signal the // Readline thread to exit logger->Run(true); Readline::Stop(); } template int RunShell(Configuration &conf) { static T shell(conf.GetName().c_str(), conf.Get("console")!=1); WindowLog &win = shell.GetStreamIn(); WindowLog &wout = shell.GetStreamOut(); if (conf.Has("log")) if (!wout.OpenLogFile(conf.Get("log"))) win << kRed << "ERROR - Couldn't open log-file " << conf.Get("log") << ": " << strerror(errno) << endl; DataLogger logger(wout); if (!logger.SetConfiguration(conf)) return -1; shell.SetReceiver(logger); boost::thread t(boost::bind(RunThread, &logger)); shell.Run(); // Run the shell logger.Stop(); //Wait until the StateMachine has finished its thread //before returning and destroyinng the dim objects which might //still be in use. t.join(); return 0; } /* Extract usage clause(s) [if any] for SYNOPSIS. Translators: "Usage" and "or" here are patterns (regular expressions) which are used to match the usage synopsis in program output. An example from cp (GNU coreutils) which contains both strings: Usage: cp [OPTION]... [-T] SOURCE DEST or: cp [OPTION]... SOURCE... DIRECTORY or: cp [OPTION]... -t DIRECTORY SOURCE... */ void PrintUsage() { cout << "\n" "The data logger connects to all available Dim services and " "writes them to ascii and fits files.\n" "\n" "The default is that the program is started without user interaction. " "All actions are supposed to arrive as DimCommands. Using the -c " "option, a local shell can be initialized. With h or help a short " "help message about the usage can be brought to the screen.\n" "\n" "Usage: dataLogger [-c type] [OPTIONS]\n" " or: dataLogger [OPTIONS]\n"; cout << endl; } void PrintHelp() { /* Additional help text which is printed after the configuration options goes here */ cout << "\n" "The block option has priority over the allow, " "i.e. if both are present, only the block list is kept. " "If only a server name or service without its server prefix is given " "then all the services of that server, or all the services that " "correspond to the given suffix are ignored or considered.\n" "\n" "For example, block=DIS_DNS will skip all the services offered by " "the DIS_DNS server, while block=SERVICE_LIST will skip all the " "SERVICE_LIST services offered by any server.\n" "\n" "The commands offered by the dataLoger are the following: \n"; cout << setw(20) << DataLogger::fConfigDay << " : specify the path where to put the nightly files\n"; cout << setw(20) << DataLogger::fConfigRun << " : specify the path where to put the run files\n"; cout << setw(20) << DataLogger::fConfigRunNumber << " : specify the run number\n"; cout << setw(20) << DataLogger::fConfigLog << " : log a particular message\n"; cout << setw(20) << DataLogger::fTransStart << " : start the nightly logging\n"; cout << setw(20) << DataLogger::fTransStop << " : stop the nightly logging\n"; cout << setw(20) << DataLogger::fTransStartRun << " : start the run logging\n"; cout << setw(20) << DataLogger::fTransStopRun << " : stop the run logging\n"; cout << setw(20) << DataLogger::fTransReset << " : stop any logging and/or recover from an error state\n"; cout << setw(20) << DataLogger::fTransWait << " : go to the wait for run number state\n"; cout << setw(20) << DataLogger::fPrintCommand << " : print the current state of the logger to the shell\n"; cout << setw(20) << DataLogger::fDebugOnOff << " : turn on or off the debug mode\n"; cout << setw(20) << DataLogger::fStatsPeriod << " : set the periodicity of the statistics. 0 disable them\n"; cout << endl; } void SetupConfiguration(Configuration &conf) { const string n = conf.GetName()+".log"; po::options_description configp("Program options"); configp.add_options() ("dns", var("localhost"), "Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)") ("log,l", var(n), "Write log-file") ("console,c", var(), "Use console (0=shell, 1=simple buffered, X=simple unbuffered)") ; po::options_description configs("DataLogger options"); configs.add_options() ("block,b", vars(), "Black-list of services") ("allow,a", vars(), "White-list of services") ("debug", po_bool(), "Debug mode. Print clear text of received service reports to log-stream") ("group,g", vars(), "Grouping of services into a single run-Fits") ; conf.AddEnv("dns", "DIM_DNS_NODE"); conf.AddOptions(configp); conf.AddOptions(configs); } int main(int argc, const char* argv[]) { Configuration conf(argv[0]); conf.SetPrintUsage(PrintUsage); SetupConfiguration(conf); po::variables_map vm; try { vm = conf.Parse(argc, argv); } catch (exception &e) { #if BOOST_VERSION > 104000 po::multiple_occurrences *MO = dynamic_cast(&e); if (MO) cout << "Error: " << e.what() << " of '" << MO->get_option_name() << "' option." << endl; else #endif cout << "Error: " << e.what() << endl; cout << endl; return -1; } if (conf.HasPrint()) return -1; if (conf.HasVersion()) { FACT::PrintVersion(argv[0]); return -1; } if (conf.HasHelp()) { PrintHelp(); return -1; } Dim::Setup(conf.Get("dns")); // try { // No console access at all if (!conf.Has("console")) return RunDim(conf); // Console access w/ and w/o Dim if (conf.Get("console")==0) return RunShell(conf); else return RunShell(conf); } /* catch (exception& e) { cerr << "Exception: " << e.what() << endl; return -1; }*/ return 0; }