Changeset 10442


Ignore:
Timestamp:
04/21/11 12:27:37 (14 years ago)
Author:
lyard
Message:
Added dataLogger services and use only one file for fits runs
Location:
trunk/FACT++/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Fits.cc

    r10380 r10442  
    1818#include "Time.h"
    1919#include "Converter.h"
     20
     21//for file stats
     22#include <sys/stat.h>
    2023
    2124using namespace std;
     
    3235void Fits::AddStandardColumn(Description& desc, std::string dataFormat, void* dataPointer, long unsigned int numDataBytes)
    3336{
     37        //check if entry already exist
     38        for (std::vector<Description>::iterator it=fStandardColDesc.begin(); it != fStandardColDesc.end(); it++)
     39                if (it->name == desc.name)
     40                        return;
    3441        fStandardColDesc.push_back(desc);
    3542        fStandardFormats.push_back(dataFormat);
     
    5360        else
    5461        {
     62                fDataColDesc.clear();
    5563                for (unsigned int i=0;i<dataFormat.size();i++)
    5664                {
     
    6977//! @param fileName the filename with complete or relative path of the file to open
    7078//! @param tableName the name of the table that will receive the logged data.
    71 //
    72 void Fits::Open(const std::string& fileName, const std::string& tableName)
     79//! @param file a pointer to an existing FITS file. If NULL, file will be opened and managed internally
     80//
     81void Fits::Open(const std::string& fileName, const std::string& tableName, FITS* file)
    7382{               
    74         try
    75         {
    76                 fFile = new FITS(fileName, RWmode::Write);
    77         }
    78         catch (FITS::CantOpen)
    79         {
    80                 std::ostringstream err;
    81                 err << "Could not open " << fileName << ".Skipping it.";
    82                 throw runtime_error(err.str());
    83         }       
     83        fFileName = fileName;
     84        if (file == NULL)
     85        {
     86                try
     87                {
     88                        fFile = new FITS(fileName, RWmode::Write);
     89                }
     90                catch (FITS::CantOpen)
     91                {
     92                        std::ostringstream err;
     93                        err << "Could not open " << fileName << ".Skipping it.";
     94                        throw runtime_error(err.str());
     95                }       
     96                fOwner = true;
     97        }
     98        else
     99        {
     100                fFile = file;
     101                fOwner = false;
     102        }
    84103        //concatenate the standard and data columns
    85104        //do it the inneficient way first: its easier and faster to code.
     
    206225void Fits::Write(Converter* conv)
    207226{
     227
     228        fTable->makeThisCurrent();
    208229        try
    209230        {
     
    240261
    241262        //This forces the writting of each row to the disk. Otherwise all rows are written when the file is closed.
     263        ///TODO check whether this consumes too much resources or not. If it does, flush every N write operations instead
    242264        fFile->flush();
    243265}
     
    252274//                      if (fTable != NULL)
    253275//                              delete fTable;
     276
    254277        std::string name = "TEND";
    255278        double doubleValue = fEndMjD;
    256279        std::string comment = "Time of the last received data";
    257280        fTable->addKey(name, doubleValue, comment);
    258 
    259         if (fFile != NULL)
     281       
     282        if (fFile != NULL && fOwner)
    260283                delete fFile;
    261284        fFile = NULL;
     
    264287        fCopyBuffer = NULL;
    265288}
     289
     290// --------------------------------------------------------------------------
     291//
     292//! This closes the currently openned FITS file.
     293//! it also updates the header to reflect the time of the last logged row
     294//     
     295int Fits::GetWrittenSize()
     296{
     297        if (!IsOpen())
     298                return 0;
     299               
     300        struct stat st;
     301        if (stat(fFileName.c_str(), &st))
     302                return 0;
     303        else
     304                return st.st_size;
     305}
  • trunk/FACT++/src/Fits.h

    r10367 r10442  
    1616                ///The CCfits object to the FITS file
    1717                FITS* fFile;
     18                ///Flag indicating whether the FITS object should be managed internally or not.
     19                bool fOwner;
    1820                ///The CCfits Table
    1921                Table* fTable;
     
    4951        public:
    5052               
    51                 Fits() : fFile(NULL),
    52                                          fTable(NULL),
     53                Fits() :  fFile(NULL),
     54                                  fOwner(false),
     55                                  fTable(NULL),
    5356                                         fNumRows(0),
    5457                                         fDataPointer(NULL),
     
    5659                                         fCopyBuffer(NULL),
    5760                                         fTotalNumBytes(0),
    58                                          fEndMjD(0.0)
     61                                         fEndMjD(0.0),
     62                                         fFileName("")
    5963                 {}
    6064               
     
    7478
    7579                ///Opens a FITS file
    76                 void Open(const std::string& fileName, const std::string& tableName);
     80                void Open(const std::string& fileName, const std::string& tableName, FITS* file);
    7781
    7882                ///Write one line of data. Use the given converter.
     
    8185                ///Close the currently opened file.
    8286                void Close();
     87               
     88                ///Get the size currently written on the disk
     89                int GetWrittenSize();
     90                ///Name of the openned file. For querying stats
     91                std::string fFileName;
    8392
    8493};//Fits
    8594
     95
    8696#endif /*FITS_H_*/
    8797
  • trunk/FACT++/src/dataLogger.cc

    r10421 r10442  
    5454#include "Description.h"
    5555
    56 #define HAS_FITS
     56//for getting stat of opened files
     57#include <unistd.h>
     58//for getting disk free space
     59#include <sys/statvfs.h>
     60//for getting files sizes
     61#include <sys/stat.h>
     62
     63//#define HAS_FITS
    5764
    5865#include <fstream>
    5966
    6067#include <boost/bind.hpp>
     68#include <boost/thread.hpp>
    6169
    6270#ifdef HAS_FITS
     
    259267        ///Allocate the buffers required for fits
    260268        void AllocateFITSBuffers(SubscriptionType& sub);
     269        ///FITS file for runs. only one, hence dealt with in the dataLogger itself
     270        FITS* fRunFitsFile;
    261271#endif
    262272public:
    263273        ///checks with fServiceList whether or not the services got updated
    264274        void CheckForServicesUpdate();
     275
     276private:       
     277        ///monitoring notification loop
     278        void ServicesMonitoring();
     279        ///services notification thread
     280        boost::thread fMonitoringThread;
     281        ///end of the monitoring
     282        bool fContinueMonitoring;
     283        ///required for accurate monitoring
     284        std::map<std::string, long long> fFileSizesMap;
     285        std::string fFullDailyLogFileName;
     286        std::string fFullDailyReportFileName;
     287        std::string fFullRunLogFileName;
     288        std::string fFullRunReportFileName;
     289        long long fBaseSizeDaily;
     290        long long fPreviousSize;
     291        long long fBaseSizeRun;
     292        ///Service for opened files
     293        DimService* fOpenedFiles;
     294        char* fDimBuffer;
     295        inline void NotifyOpenedFile(std::string name, int type);
    265296       
    266297}; //DataLogger
     
    280311const char* DataLogger::fRunNumberInfo = "RUN_NUMBER";
    281312
     313void DataLogger::ServicesMonitoring()
     314{
     315                //create the DIM service
     316                int dataSize = 2*sizeof(long long) + sizeof(long);
     317                char* data = new char[dataSize];
     318                memset(data, 0, dataSize);
     319                DimService srvc("DATALOGGER/STATS", "x:2;l:1", static_cast<void*>(data), dataSize);
     320                double deltaT = 1;
     321                fPreviousSize = 0;
     322
     323                long long& targetSize  = reinterpret_cast<long long*>(data)[0];
     324                long long& targetFreeSpace  = reinterpret_cast<long long*>(data)[1];
     325                long& targetRate = reinterpret_cast<long*>(data + 2*sizeof(long long))[0];
     326                //loop-wait for broadcast
     327                while (fContinueMonitoring)
     328                {
     329                        sleep(deltaT);
     330                        //update the fits files sizes
     331#ifdef HAS_FITS
     332                        SubscriptionsListType::iterator x;
     333                        std::map<std::string, SubscriptionType>::iterator y;
     334                        bool runFileDone = false;
     335                        for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
     336                        {
     337                                for (y=x->second.begin(); y != x->second.end(); y++)
     338                                {
     339                                        if (y->second.runFile.IsOpen() && !runFileDone)
     340                                        {
     341                                                        fFileSizesMap[y->second.runFile.fFileName] = y->second.runFile.GetWrittenSize();
     342                                                        runFileDone = true;
     343                                        }
     344                                        if (y->second.dailyFile.IsOpen())
     345                                                fFileSizesMap[y->second.dailyFile.fFileName] = y->second.dailyFile.GetWrittenSize();
     346                                }
     347                        }
     348#endif
     349                        struct stat st;
     350                        //gather log and report files sizes on disk
     351                        if (fDailyLogFile.is_open())
     352                        {
     353                                stat(fFullDailyLogFileName.c_str(), &st);
     354                                fFileSizesMap[fFullDailyLogFileName] = st.st_size;     
     355                        }
     356                        if (fDailyReportFile.is_open())
     357                        {
     358                                stat(fFullDailyReportFileName.c_str(), &st);
     359                                fFileSizesMap[fFullDailyReportFileName] = st.st_size;   
     360                        }
     361                        if (fRunLogFile.is_open())
     362                        {
     363                                stat(fFullRunLogFileName.c_str(), &st);
     364                                fFileSizesMap[fFullRunLogFileName] = st.st_size;       
     365                        }
     366                        if (fRunReportFile.is_open())
     367                        {
     368                                stat(fFullRunReportFileName.c_str(), &st);
     369                                fFileSizesMap[fFullRunReportFileName] = st.st_size;
     370                        }       
     371                        if (fDailyLogFile.is_open())
     372                        {
     373                                struct statvfs vfs;
     374                                statvfs(fDailyFileName.c_str(), &vfs);
     375                                targetFreeSpace = vfs.f_bsize*vfs.f_bavail;
     376                        }
     377                        //sum up all the file sizes. past and present
     378                        targetSize = 0;
     379                        for (std::map<std::string, long long>::iterator it=fFileSizesMap.begin(); it != fFileSizesMap.end();  it++)
     380                                targetSize += it->second;
     381                        targetSize -= fBaseSizeDaily;
     382                        targetSize -= fBaseSizeRun;
     383                        //FIXME get the actual time elapsed
     384                        targetRate = (targetSize - fPreviousSize)/deltaT; 
     385                        fPreviousSize = targetSize;
     386                        if (targetRate != 0) //if data has been written
     387                        {
     388std::cout << "fBaseSizeDaily: " << fBaseSizeDaily << " fBaseSizeRun: " << fBaseSizeRun << std::endl;
     389//std::cout << "Written Size: " << targetSize << ", free space: " << targetFreeSpace << ", data rate: " << targetRate << " Bytes/s" << std::endl;
     390
     391                                srvc.updateService(static_cast<void*>(data), dataSize);
     392                        }
     393                }
     394                delete[] data;
     395}
     396
     397
    282398// --------------------------------------------------------------------------
    283399//
     
    293409                fRunFileName = "/home/lyard/log";
    294410                fRunNumber = 12345;
     411#ifdef HAS_FITS
     412                fRunFitsFile = NULL;
     413#endif
    295414                //Give a name to this machine's specific states
    296415                AddStateName(kSM_DailyOpen,      "DailyFileOpen");
     
    344463                fServiceList.SetHandler(this);
    345464                CheckForServicesUpdate();
     465               
     466                //start the monitoring service
     467                fContinueMonitoring = true;
     468                fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
     469                fBaseSizeDaily = 0;
     470                fBaseSizeRun = 0;
     471                //start the open files service
     472                fDimBuffer = new char[256];
     473                memset(fDimBuffer, 0, sizeof(int));
     474                fDimBuffer[sizeof(int)] = '\0';
     475                //gives the entire buffer size. Otherwise, dim overwrites memory at bizarre locations if smaller size is given at creation time.
     476                fOpenedFiles =  new DimService("DATALOGGER/FILENAME", "i:1;C", static_cast<void*>(fDimBuffer), 256);
     477               
    346478               
    347479}
     
    433565DataLogger::~DataLogger()
    434566{
     567        //exit the monitoring loop
     568        fContinueMonitoring = false;
     569        delete[] fDimBuffer;
     570        delete fOpenedFiles;
     571        fMonitoringThread.join();
    435572        //close the files
    436573        if (fDailyLogFile.is_open())
     
    444581        //release the services subscriptions
    445582        fServiceSubscriptions.clear();
     583#ifdef HAS_FITS
     584        if (fRunFitsFile != NULL)
     585                delete fRunFitsFile;
     586        fRunFitsFile = NULL;
     587#endif
    446588}
    447589// --------------------------------------------------------------------------
     
    602744        CheckForRunNumber(I);
    603745        ReportPlease(I, y->second);
     746
    604747}
    605748
     
    613756void DataLogger::CheckForRunNumber(DimInfo* I)
    614757{
    615         return;
    616758        if (strstr(I->getName(), fRunNumberInfo) != NULL)
    617759        {//assumes that the run number is an integer
     
    822964// --------------------------------------------------------------------------
    823965//
     966//! Notifies the DIM service that a particular file was opened
     967//! @ param name the base name of the opened file, i.e. without path nor extension.
     968//!     WARNING: use string instead of string& because I pass values that do not convert to string&.
     969//!             this is not a problem though because file are not opened so often.
     970//! @ param type the type of the opened file. 0 = none open, 1 = log, 2 = text, 4 = fits
     971inline void DataLogger::NotifyOpenedFile(std::string name, int type)
     972{
     973//std::cout << "name: " << name << " size: " << name.size() << std::endl;
     974        reinterpret_cast<int*>(fDimBuffer)[0] = type;
     975        strcpy(&fDimBuffer[sizeof(int)], name.c_str());
     976        fOpenedFiles->updateService(static_cast<void*>(fDimBuffer), name.size() + 1 + sizeof(int));
     977}
     978// --------------------------------------------------------------------------
     979//
    824980//! Implements the Start transition.
    825981//! Concatenates the given path for the daily file and the filename itself (based on the day),
     
    833989        std::stringstream sTime;
    834990        sTime << time.Y() << "_" << time.M() << "_" << time.D();
    835         std::string fullName = fDailyFileName + '/' + sTime.str() + ".log";
    836        
    837         fDailyLogFile.open(fullName.c_str(), std::ios_base::out | std::ios_base::app); //maybe should be "app" instead of "ate" ??
    838         fullName = fDailyFileName + '/' + sTime.str() + ".rep";
    839         fDailyReportFile.open(fullName.c_str(), std::ios_base::out | std::ios_base::app);
     991        fFullDailyLogFileName = fDailyFileName + '/' + sTime.str() + ".log";
     992       
     993        fDailyLogFile.open(fFullDailyLogFileName.c_str(), std::ios_base::out | std::ios_base::app); //maybe should be "app" instead of "ate" ??
     994        fFullDailyReportFileName = fDailyFileName + '/' + sTime.str() + ".rep";
     995        fDailyReportFile.open(fFullDailyReportFileName.c_str(), std::ios_base::out | std::ios_base::app);
    840996       
    841997        if (!fDailyLogFile.is_open() || !fDailyReportFile.is_open())
     
    8441000            return kSM_BadDailyConfig;
    8451001        }
    846 
     1002        //get the size of the newly opened file.
     1003        struct stat st;
     1004        stat(fFullDailyLogFileName.c_str(), &st);
     1005        fBaseSizeDaily = st.st_size;   
     1006        stat(fFullDailyReportFileName.c_str(), &st);
     1007        fBaseSizeDaily += st.st_size;
     1008        fFileSizesMap.clear();
     1009        fBaseSizeRun = 0;
     1010        fPreviousSize = 0;
     1011        //notify that files were opened
     1012        NotifyOpenedFile(sTime.str(), 3);
     1013       
     1014       
    8471015        return kSM_DailyOpen;   
    8481016}
     
    8731041                std::string partialName = fDailyFileName + '/' + sTime.str() + '_' + serviceName + ".fits";
    8741042                AllocateFITSBuffers(sub);
    875                 sub.dailyFile.Open(partialName, serviceName);
     1043                //get the size of the file we're about to open
     1044                if (fFileSizesMap.find(partialName) == fFileSizesMap.end())
     1045                {
     1046                        struct stat st;
     1047                        if (!stat(partialName.c_str(), &st))
     1048                                fBaseSizeDaily += st.st_size;
     1049                        fFileSizesMap[partialName] = 0;
     1050                }
     1051                sub.dailyFile.Open(partialName, serviceName, NULL);
     1052                //notify the opening
     1053                NotifyOpenedFile(sTime.str() + '_' + serviceName, 4);
     1054               
    8761055        }
    8771056        if (!sub.runFile.IsOpen() && (GetCurrentState() == kSM_Logging))
     
    8791058                std::stringstream sRun;
    8801059                sRun << fRunNumber;
    881                 std::string partialName = fRunFileName + '/' + sRun.str() + '_' + serviceName + ".fits";
    882                 sub.runFile.Open(partialName, serviceName);
     1060                std::string partialName = fRunFileName + '/' + sRun.str() + ".fits";//'_' + serviceName + ".fits";
     1061                if (fRunFitsFile == NULL)
     1062                {
     1063                        //get the size of the file we're about to open
     1064                        if (fFileSizesMap.find(partialName) == fFileSizesMap.end())
     1065                        {
     1066                                struct stat st;
     1067                                if (!stat(partialName.c_str(), &st))
     1068                                        fBaseSizeRun += st.st_size;
     1069                                else
     1070                                        fBaseSizeRun = 0;
     1071                                fFileSizesMap[partialName] = 0;
     1072                        }
     1073                        try
     1074                        {
     1075                                fRunFitsFile = new FITS(partialName, RWmode::Write);   
     1076                        }       
     1077                        catch (CCfits::FitsError e)
     1078                        {
     1079                                std::ostringstream err;
     1080                                err << "Could not open run file " << partialName;
     1081                                throw runtime_error(err.str());
     1082                        }
     1083                        NotifyOpenedFile(sRun.str(), 4);// + '_' + serviceName, 4);
     1084                }
     1085                sub.runFile.Open(partialName, serviceName, fRunFitsFile);
    8831086        }
    8841087}       
     
    9151118                dataQualifier << flist[i].second.first;
    9161119                switch (flist[i].first.first->name()[0])
    917                 {
     1120                {//TODO handle all the data format cases
    9181121                        case 'c':
    9191122                                dataQualifier <<  "S";
     
    9361139                        break;
    9371140                        case 'x':
     1141                        case 'X':
    9381142                                dataQualifier << "K";
    9391143                        break;
     
    9811185        std::stringstream sRun;
    9821186        sRun << fRunNumber;
    983         std::string fullName = fRunFileName + '/' + sRun.str() + ".log";
    984         fRunLogFile.open(fullName.c_str(), std::ios_base::out | std::ios_base::app); //maybe should be app instead of ate
    985 
    986         fullName = fRunFileName + '/' + sRun.str() + ".rep";
    987         fRunReportFile.open(fullName.c_str(), std::ios_base::out | std::ios_base::app);
     1187        fFullRunLogFileName = fRunFileName + '/' + sRun.str() + ".log";
     1188        fRunLogFile.open(fFullRunLogFileName.c_str(), std::ios_base::out | std::ios_base::app); //maybe should be app instead of ate
     1189
     1190        fFullRunReportFileName = fRunFileName + '/' + sRun.str() + ".rep";
     1191        fRunReportFile.open(fFullRunReportFileName.c_str(), std::ios_base::out | std::ios_base::app);
    9881192       
    9891193        if (!fRunLogFile.is_open() || !fRunReportFile.is_open())
     
    9921196                return kSM_BadRunConfig;       
    9931197        }
     1198        //get the size of the newly opened file.
     1199        struct stat st;
     1200        fBaseSizeRun = 0;
     1201        if (fFileSizesMap.find(fFullRunLogFileName) == fFileSizesMap.end())
     1202        {
     1203                stat(fFullRunLogFileName.c_str(), &st);
     1204                fBaseSizeRun += st.st_size;
     1205                fFileSizesMap[fFullRunLogFileName] = 0;
     1206        }
     1207        if (fFileSizesMap.find(fFullRunReportFileName) == fFileSizesMap.end())
     1208        {
     1209                stat(fFullRunReportFileName.c_str(), &st);
     1210                fBaseSizeRun += st.st_size;
     1211                fFileSizesMap[fFullRunReportFileName] = 0;
     1212        }
     1213        NotifyOpenedFile(sRun.str(), 3);
    9941214       
    9951215        return kSM_Logging;
     
    10151235                                        j->second.runFile.Close();     
    10161236                }
     1237        if (fRunFitsFile != NULL)
     1238        {
     1239                delete fRunFitsFile;
     1240                fRunFitsFile = NULL;   
     1241        }
    10171242#endif
    10181243        return kSM_WaitingRun;
     
    10461271                                        j->second.runFile.Close();     
    10471272                }
     1273        if (fRunFitsFile != NULL)
     1274        {
     1275                delete fRunFitsFile;
     1276                fRunFitsFile = NULL;   
     1277        }
    10481278#endif
    10491279        return kSM_Ready;
Note: See TracChangeset for help on using the changeset viewer.