Changeset 10951


Ignore:
Timestamp:
06/09/11 15:44:01 (13 years ago)
Author:
lyard
Message:
moved various processed from secondary thread to main thread
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/dataLogger.cc

    r10943 r10951  
    6363
    6464#include <fstream>
     65#include <mutex>
    6566
    6667#include <boost/bind.hpp>
     
    135136
    136137    }
    137     ///copy operator
    138     /*
    139     void operator = (const RunNumberType& other)
    140     {
    141 #ifdef RUN_LOGS
    142         logFile = other.logFile;
    143         logName = other.logName;
    144 #endif
    145         reportFile = other.reportFile;
    146         reportName = other.reportName;
    147         runNumber = other.runNumber;
    148         time = other.time;
    149 #ifdef HAVE_FITS
    150         runFitsFile = other.runFitsFile;
    151 #endif
    152 }*/
    153 
    154     ///copy constructor
    155     RunNumberType(const RunNumberType& other)
    156     {
    157 #ifdef RUN_LOGS
    158         logFile = other.logFile;
    159         logName = other.logName;
    160 #endif
    161         reportFile = other.reportFile;
    162         reportName = other.reportName;
    163         runNumber = other.runNumber;
    164         time = other.time;
    165 #ifdef HAVE_FITS
    166         runFitsFile = other.runFitsFile;
    167 #endif
    168     }
    169138
    170139    void addServiceToOpenedFits(const string& fileName, const string& serviceName)
     
    198167    ///the current run number used by this subscription
    199168    uint32_t runNumber;
    200     ///copy operator
    201     /*
    202     void operator = (const SubscriptionType& other)
    203     {
    204 #ifdef HAVE_FITS
    205         nightlyFile = other.nightlyFile;
    206         runFile = other.runFile;
    207 #endif
    208         dimInfo = other.dimInfo;
    209         server = other.server;
    210         service = other.service;
    211         fConv = other.fConv;
    212         runNumber = other.runNumber;
    213         }*/
    214 
    215 /*    ///copy constructor
    216     SubscriptionType(const SubscriptionType& other)
    217     {
    218 #ifdef HAVE_FITS
    219         nightlyFile = other.nightlyFile;
    220         runFile = other.runFile;
    221 #endif
    222         dimInfo = other.dimInfo;
    223         server = other.server;
    224         service = other.service;
    225         fConv = other.fConv;
    226         runNumber = other.runNumber;
    227     }*/
     169    ///time of the latest received event
     170    Time lastReceivedEvent;
     171    ///whether or not the fits buffer was allocated already
     172    bool fitsBufferAllocated;
     173
    228174    ///Dim info constructor
    229175    SubscriptionType(DimStampedInfo* info=NULL)
     
    232178        fConv = shared_ptr<Converter>();
    233179        runNumber = 0;
    234     }
    235     ///default constructor
    236 /*    SubscriptionType()
    237     {
    238         dimInfo = shared_ptr<DimStampedInfo>();
    239         fConv = shared_ptr<Converter>();
    240         runNumber = 0;
    241     }*/
     180        lastReceivedEvent = Time::None;
     181        fitsBufferAllocated = false;
     182    }
     183
    242184    ///default destructor
    243185    ~SubscriptionType()
     
    293235    ///full name of the nightly report file
    294236    string fFullNightlyReportFileName;
    295 
     237    ///variables for computing statistics
     238    DataLoggerStats fStatVar;
     239    ///variable to track when the statistic were last calculated
     240    double fPreviousStatsUpdateTime;
     241    double fPreviousOldRunNumberCheck;
     242    ///mutex to make sure that the Stats are not accessed while updating
     243    mutex fStatsMutex;
     244    ///boolean to know whether we should close and reopen daily files or not
     245    bool fDailyFileDayChangedAlready;
    296246public:
    297247    /***************************************************
     
    556506    list[service].server  = server;
    557507    list[service].service = service;
    558 
     508    fNumSubAndFitsData.numSubscriptions++;
    559509    if (fDebugIsOn)
    560510        Debug("Added subscription to " + server + "/" + service);
     
    571521    if (isCmd)
    572522        return;
    573 
    574     fServiceSubscriptions[server].erase(service);
     523    if (fServiceSubscriptions[server].erase(service) != 1)
     524    {
     525        ostringstream str;
     526        str << "Subscription " << server << "/" << service << "could not be removed as it is not present";
     527        Error(str.str());
     528        return;
     529    }
     530    fNumSubAndFitsData.numSubscriptions--;
    575531    if (fDebugIsOn)
    576532    {
     
    585541void DataLogger::RemoveAllServices(const string& server)
    586542{
     543    fNumSubAndFitsData.numSubscriptions -= fServiceSubscriptions[server].size();
    587544    fServiceSubscriptions[server].clear();
    588545    if (fDebugIsOn)
     
    813770bool DataLogger::calculateTotalSizeWritten(DataLoggerStats& statVar, bool isPrinting)
    814771{
     772    if (!isPrinting)
     773        fStatsMutex.lock();
    815774#ifdef HAVE_FITS
    816775    if (isPrinting)
     
    879838    statVar.sizeWritten -= fBaseSizeNightly;
    880839    statVar.sizeWritten -= fBaseSizeRun;
     840
     841    if (!isPrinting)
     842        fStatsMutex.unlock();
    881843
    882844    return shouldWarn;
     
    908870void DataLogger::ServicesMonitoring()
    909871{
    910         DataLoggerStats statVar;
    911         statVar.sizeWritten = 0;
    912         statVar.freeSpace = 0;
    913         statVar.writingRate = 0;
    914 
    915         struct statvfs vfs;
     872       struct statvfs vfs;
    916873        if (!statvfs(fNightlyFilePath.c_str(), &vfs))
    917             statVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
     874            fStatVar.freeSpace = vfs.f_bsize*vfs.f_bavail;
    918875        else
    919             statVar.freeSpace = -1;
    920 
    921         DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", statVar, "Add description here");
     876            fStatVar.freeSpace = -1;
     877
     878        DimDescribedService srvc ("DATA_LOGGER/STATS", "X:3", fStatVar, "Add description here");
    922879        fPreviousSize = 0;
    923         //bool statWarning = false;
    924         bool resetDone = false;
    925880        //loop-wait for broadcast
    926881        while (fContinueMonitoring)
    927882        {
    928             //check if some run number entries can be deleted
    929             while (fRunNumber.size() > 1 && (Time() - fRunNumber.front().time) > boost::posix_time::time_duration(0,0,10,0))
    930             {
    931                 RemoveOldestRunNumber();
    932             }
    933             //check if daily files should be closed and reopened.
    934             //FIXME when Time().h() == 12, the actual time is 2pm
    935             if (Time().h() == 12 && !resetDone)
    936             {
    937                 if (fDebugIsOn)
    938                     Debug("Change of day detected. Closing daily files and restarting the dataLogger");
    939                 int cState = GetCurrentState();
    940                 GoToReadyPlease();
    941                 //it's noon: no run number should survive
    942                 //this I'm not too sure about...
    943 //                while (fRunNumber.size() > 0)
    944 //                    RemoveOldestRunNumber();
    945 
    946                 if (cState >= kSM_NightlyOpen)
    947                     StartPlease();
    948                 if (cState >= kSM_WaitingRun)
    949                     NightlyToWaitRunPlease();
    950                 if (cState >= kSM_Logging)
    951                     StartRunPlease();
    952 
    953                 resetDone = true;
    954             }
    955             else
    956  //           {
    957  //               if (Time().h() != 12 && resetDone)
    958                     resetDone = false;
    959 //            }
    960883            if (fStatsPeriodDuration == 0.0f)
    961884            {
     
    966889            sleep(fStatsPeriodDuration);
    967890
    968             //update the fits files sizes
    969             /*statWarning =*/ calculateTotalSizeWritten(statVar, false);
    970             if (fStatsPeriodDuration == 0.0f)
    971                 continue;
    972             statVar.writingRate = (statVar.sizeWritten - fPreviousSize)/fStatsPeriodDuration; 
    973 
    974             fPreviousSize = statVar.sizeWritten;
    975             if (statVar.writingRate != 0) //if data has been written
     891
     892            fStatsMutex.lock();
     893
     894            if (fStatVar.writingRate != 0) //if data has been written
    976895            {
    977896                srvc.updateService();
     
    986905                }
    987906            }
     907            fStatsMutex.unlock();
    988908        }
    989909}
     
    1056976             ("Print information about the internal status of the data logger.");
    1057977
    1058      //start the monitoring service
    1059      fContinueMonitoring = true;
    1060      fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
    1061      fBaseSizeNightly = 0;
    1062      fBaseSizeRun = 0;
    1063978     OpenFileToDim fToDim;
    1064979     fToDim.code = 0;
     
    11081023
    11091024     fDestructing = false;
     1025
     1026     //start the monitoring service
     1027     fStatVar.sizeWritten = 0;
     1028     fStatVar.freeSpace = 0;
     1029     fStatVar.writingRate = 0;
     1030     fPreviousStatsUpdateTime = Time().Mjd();
     1031     fPreviousOldRunNumberCheck = Time().Mjd();
     1032     fContinueMonitoring = true;
     1033     fMonitoringThread = boost::thread(boost::bind(&DataLogger::ServicesMonitoring, this));
     1034     fBaseSizeNightly = 0;
     1035     fBaseSizeRun = 0;
     1036     fDailyFileDayChangedAlready = true;
    11101037     if(fDebugIsOn)
    11111038     {
     
    11971124    ReportPlease(I, y->second);
    11981125
     1126    //update the fits files sizes
     1127    Time cTime = Time();
     1128    if ((fStatsPeriodDuration != 0) && ((cTime.Mjd() - fPreviousStatsUpdateTime)*24*60*60 > fStatsPeriodDuration))
     1129    {
     1130        calculateTotalSizeWritten(fStatVar, false);
     1131        fStatVar.writingRate = (fStatVar.sizeWritten - fPreviousSize)/((cTime.Mjd() - fPreviousStatsUpdateTime)*24*60*60);
     1132        fPreviousSize = fStatVar.sizeWritten;
     1133        fPreviousStatsUpdateTime = cTime.Mjd();
     1134    }
     1135    if ((cTime.Mjd() - fPreviousOldRunNumberCheck)*24*60*60 > 10.0)
     1136    {
     1137        while (fRunNumber.size() > 1 && (cTime - fRunNumber.back().time) > boost::posix_time::time_duration(0,0,10,0))
     1138        {
     1139             RemoveOldestRunNumber();
     1140        }
     1141        fPreviousOldRunNumberCheck = cTime.Mjd();
     1142    }
    11991143}
    12001144// --------------------------------------------------------------------------
     
    13131257    if (strstr(I->getName(), fRunNumberInfo) != NULL)
    13141258    {//assumes that the run number is an integer
     1259        //check if some run number entries can be deleted leave one so that two remain after adding the new one
    13151260        AddNewRunNumber(I->getLonglong(), Time(I->getTimestamp(), I->getTimestampMillisecs()*1000));
    13161261    }
     
    13271272void DataLogger::ReportPlease(DimInfo* I, SubscriptionType& sub)
    13281273{
     1274
     1275
    13291276    //should we log or report this info ? (i.e. is it a message ?)
    13301277    bool isItaReport = ((strstr(I->getName(), "Message") == NULL) && (strstr(I->getName(), "MESSAGE") == NULL));
    13311278    if (I->getFormat()[0] == 'C')
    13321279        isItaReport = false;
    1333    
     1280
    13341281    if (!fNightlyReportFile.is_open())
    13351282        return;
     1283
     1284    //Check whether we should close and reopen daily text files or not
     1285    //This should work in any case base of the following:
     1286    // - fDailyFileDayChangedAlready is initialized to true. So if the dataLogger is started around noon, no file will be closed
     1287    // - fDailyFileDayChangedAlready is set to false if (time != 12), so the file will be closed and reopened only if the logger runs since before noon (which is the required behavior)
     1288    //This only applies to text files. Fits are closed and reopened based on the last and current service received time.
     1289    //this was not applicable to text files, because as they gather several services, we have no guarantee that the received time will be greater than the previous one,
     1290    //which could lead to several close/reopen instead of only one.
     1291    if (Time().h() == 12 && !fDailyFileDayChangedAlready)
     1292    {
     1293        if (fDebugIsOn)
     1294            Debug("Its Noon! Closing and reopening daily text files");
     1295
     1296        fNightlyLogFile.close();
     1297        fNightlyReportFile.close();
     1298
     1299        fFullNightlyLogFileName = CompileFileName(fNightlyFilePath, "", "log");
     1300        OpenTextFilePlease(fNightlyLogFile, fFullNightlyLogFileName);
     1301
     1302        fFullNightlyReportFileName = CompileFileName(fNightlyFilePath, "", "rep");
     1303        OpenTextFilePlease(fNightlyReportFile, fFullNightlyReportFileName);
     1304
     1305        fDailyFileDayChangedAlready = true;
     1306    }
     1307    if (Time().h() != 12 && fDailyFileDayChangedAlready)
     1308        fDailyFileDayChangedAlready = false;
    13361309
    13371310    //create the converter for that service
     
    14661439    if (isItaReport)
    14671440    {
     1441        //check if the last received event was before noon and if current one is after noon.
     1442        //if so, close the file so that it gets reopened.
     1443        if (sub.nightlyFile.IsOpen())
     1444            if ((sub.lastReceivedEvent != Time::None) && (sub.lastReceivedEvent.h() < 12) && (cTime.h() >= 12))
     1445            {
     1446                sub.nightlyFile.Close();
     1447            }
     1448        sub.lastReceivedEvent = cTime;
    14681449        if (!sub.nightlyFile.IsOpen() || !sub.runFile.IsOpen() || sub.runNumber != sub.runFile.fRunNumber)
    14691450            OpenFITSFilesPlease(sub, cRunNumber);
     
    15201501
    15211502    DataLoggerStats statVar;
    1522     /*const bool statWarning =*/ calculateTotalSizeWritten(statVar, false);
     1503    /*const bool statWarning =*/ calculateTotalSizeWritten(statVar, true);
    15231504
    15241505    Message("----------------- STATS ------------------");
     
    15401521    str << "There are " << fNumSubAndFitsData.numSubscriptions << " active DIM subscriptions.";
    15411522    Message(str);
    1542 
    15431523    for (map<const string, map<string, SubscriptionType> >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++)
    15441524    {
     
    15471527            Message(" -> "+it2->first);
    15481528    }
    1549 
    15501529    Message("--------------- BLOCK LIST ---------------");
    15511530    for (set<string>::const_iterator it=fBlackList.begin(); it != fBlackList.end(); it++)
     
    17421721{
    17431722    AddNewRunNumber(evt.GetXtra(), evt.GetTime());
    1744 //    fRunNumber = evt.GetInt();
    17451723    return GetCurrentState();
    17461724}
     
    18671845        partialName = CompileFileName(fNightlyFilePath, serviceName, "fits");
    18681846        fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
    1869         AllocateFITSBuffers(sub);
     1847        if (!sub.fitsBufferAllocated)
     1848            AllocateFITSBuffers(sub);
    18701849        //get the size of the file we're about to open
    18711850        if (RememberFileOrigSizePlease(partialName, true))//and remember that the file was opened (i.e. not an update)
     
    20201999     sub.nightlyFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
    20212000     sub.runFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), size);
     2001     sub.fitsBufferAllocated = true;
    20222002}
    20232003// --------------------------------------------------------------------------
     
    22432223{
    22442224   if (fDebugIsOn)
    2245     {
     2225   {
    22462226        Debug("Going to the Ready state...");
    2247     }   
     2227   }
    22482228   if (GetCurrentState() == kSM_Logging)
    22492229       StopRunPlease();
Note: See TracChangeset for help on using the changeset viewer.