Changeset 14233 for trunk


Ignore:
Timestamp:
06/26/12 12:37:25 (12 years ago)
Author:
lyard
Message:
replaced DimServiceInfoListImp with EventImps. Warning, currently the logger only subscribes to services that exist when it is launched
Location:
trunk/FACT++/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/Fits.cc

    r13205 r14233  
    6060//! @param out Message object to use for propagating messages
    6161//     
    62 void Fits::InitDataColumns(const vector<Description> &desc, const vector<string>& dataFormat, void* dataPointer, MessageImp* out)
     62void Fits::InitDataColumns(const vector<Description> &desc, const vector<string>& dataFormat, const void* dataPointer, MessageImp* out)
    6363{
    6464    fDataFormats = dataFormat;
    65     fDataPointer = dataPointer;
     65//FIXME I removed the static assignment of the data pointer because it changes from event to event now. Remove the arg of this function
     66//    fDataPointer = dataPointer;
    6667
    6768    if ((desc.size() == 0) && (dataFormat.size() == 0))
     
    314315//! @param conv the converter corresponding to the service being logged
    315316//
    316 bool Fits::Write(const Converter &conv)
     317bool Fits::Write(const Converter &conv, const void* data)
    317318{
    318319    //first copy the standard variables to the copy buffer
     
    328329    {
    329330        //now take care of the DIM data. The Converter is here for that purpose
    330         conv.ToFits(fCopyBuffer.data()+shift, fDataPointer, fCopyBuffer.size()-shift);
     331        conv.ToFits(fCopyBuffer.data()+shift, data, fCopyBuffer.size()-shift);
    331332    }
    332333    catch (const runtime_error &e)
  • trunk/FACT++/src/Fits.h

    r12536 r14233  
    3939
    4040    ///the pointer to the contiguous memory location where the data is stored (i.e. the dim data pointer)
    41     void* fDataPointer;
     41//    void* fDataPointer;
    4242    ///the copy buffer. Required to put the standard and data variable in contguous memory
    4343    vector<char> fCopyBuffer;
     
    6161
    6262    Fits() : fFile(NULL),
    63         fDataPointer(NULL),
    6463        fEndMjD(0.0),
    6564        fNumOpenFitsFiles(NULL),
     
    8079
    8180    ///Adds columns specific to the service being logged.
    82     void InitDataColumns(const vector<Description> &desc, const vector<string>& dataFormat, void* dataPointer, MessageImp* out);
     81    void InitDataColumns(const vector<Description> &desc, const vector<string>& dataFormat, const void* dataPointer, MessageImp* out);
    8382
    8483    ///Opens a FITS file
     
    8685
    8786    ///Write one line of data. Use the given converter.
    88     bool Write(const Converter &conv);
     87    bool Write(const Converter &conv, const void* data);
    8988
    9089    ///Close the currently opened file.
  • trunk/FACT++/src/datalogger.cc

    r14217 r14233  
    8484#endif
    8585
     86#include "DimState.h"
    8687
    8788//Dim structures
     
    115116    }
    116117};
     118
     119EventImp nullEventImp;
    117120///Dim subscription type. Stores all the relevant info to handle a Dim subscription
    118121struct SubscriptionType
     
    137140    /// that it is the first which is deleted -- and consequently none of
    138141    /// the other members can still be in use in an infoHandler)
    139     shared_ptr<DimStampedInfo> dimInfo;
     142    //DIM_REPLACE
     143    //shared_ptr<DimStampedInfo> dimInfo;
     144    unsigned int index;
     145
    140146    ///Dim info constructor
    141     SubscriptionType(DimStampedInfo* info=NULL)
     147    //DIM_REPLACE
     148//    SubscriptionType(DimStampedInfo* info=NULL)
     149    SubscriptionType()
    142150    {
    143151        fConv = shared_ptr<Converter>();
     
    145153        lastReceivedEvent = Time::None;
    146154        fitsBufferAllocated = false;
    147 
    148155        // Should be the last instantiated to make sure that all other
    149156        // variables which might be used are already initialized
    150         dimInfo = shared_ptr<DimStampedInfo>(info);
     157        //DIM_REPLACE
     158        //dimInfo = shared_ptr<DimStampedInfo>(info);
     159        index = 0;
    151160    }
    152161    ///default destructor
     
    156165};
    157166
    158 class DataLogger : public StateMachineDim, DimServiceInfoListImp
     167class DataLogger : public StateMachineDim
     168//DIM_REPLACE
     169//, DimServiceInfoListImp
    159170{
    160171public:
     
    216227     ***************************************************/
    217228    //overloading of DIM's infoHandler function
    218     void infoHandler();
     229    int infoCallback(const EventImp& evt, unsigned int infoIndex);
    219230
    220231    /***************************************************
     
    222233     ***************************************************/
    223234    ///Reporting method for the services info received
    224     void Report(DimInfo* I, SubscriptionType& sub);
     235    void Report(const EventImp& evt, SubscriptionType& sub);
    225236
    226237    ///Configuration of the nightly file path
     
    229240    int PrintState(const Event& evt);
    230241    ///checks whether or not the current info being treated is a run number
    231     void CheckForRunNumber(DimInfo* I);
     242    void CheckForRunNumber(const EventImp& evt, unsigned int index);
    232243    /// start transition
    233244    int Start();
     
    244255#ifdef HAVE_FITS
    245256    ///Open fits files
    246     void OpenFITSFiles(SubscriptionType& sub);
     257    void OpenFITSFiles(SubscriptionType& sub, const EventImp& evt);
    247258    ///Write data to FITS files
    248     void WriteToFITS(SubscriptionType& sub);
     259    void WriteToFITS(SubscriptionType& sub, const void* data);
    249260    ///Allocate the buffers required for fits
    250     void AllocateFITSBuffers(SubscriptionType& sub);
     261    void AllocateFITSBuffers(SubscriptionType& sub, const EventImp& evt);
    251262#endif//has_fits
    252263
     
    314325    bool ShouldSubscribe(const string& server, const string& service);
    315326    ///Subscribe to a given server and service
    316     DimStampedInfo* SubscribeTo(const string& server, const string& service);
     327//    EventImp& SubscribeTo(const string& server, const string& service);
    317328    ///Open a text file and checks for ofstream status
    318329    bool OpenTextFile(ofstream& stream, const string& name);
     
    331342    * INHERITED FROM DimServiceInfoList
    332343    ***************************************************/
     344    ///Add a new server subscription
     345    void AddServer(const string& server);
    333346    ///Add a new service subscription
    334     void AddService(const string&, const string&, const string&, bool);
     347    void AddService(const Service& svc);
    335348    ///Remove a given service subscription
     349    //FIXME unused
    336350    void RemoveService(const string, const string, bool);
    337351    ///Remove all the services associated with a given server
     352    //FIXME unused
    338353    void RemoveAllServices(const string&);
    339354    ///pointer to the dim's subscription that should distribute the run numbers.
    340     DimInfo* fRunNumberService;
     355    //DIM_REPLACE
     356    //DimInfo* fRunNumberService;
     357    unsigned int fRunNumberService;
    341358    /***************************************************
    342359     * Overwritten from MessageImp
     
    350367    int fCurrentDay;
    351368    Time lastFlush;
     369
     370    DimDnsServiceList fDimList;
     371    vector<DimDescriptions*> fServerDescriptionsList;
     372
     373    //counter for keeping tracker of services
     374    unsigned int servicesCounter;
    352375public:
    353376    int Write(const Time &time, const std::string &txt, int qos=kMessage);
     
    412435}
    413436
     437
     438void DataLogger::AddServer(const string& server)
     439{
     440    Info("Got request to add server " + server );
     441    if (server != "DIS_DNS")
     442    {
     443        DimDescriptions* d = new DimDescriptions(server);
     444        d->Subscribe(*this);
     445        fServerDescriptionsList.push_back(d);
     446    }
     447
     448}
     449
    414450// --------------------------------------------------------------------------
    415451//
     
    419455//! @param isCmd whether this is a Dim Command or not. Commands are not logged
    420456//
    421 void DataLogger::AddService(const string& server, const string& service, const string&, bool isCmd)
    422 {
    423     //dataLogger does not subscribe to commands
     457void DataLogger::AddService(const Service& svc)
     458{
     459    const string& server = svc.server;
     460    const string& service = svc.service;
     461    const bool isCmd = svc.iscmd;
     462
     463    Info("Got request to add service: "+server+"/"+service);
     464   //dataLogger does not subscribe to commands
    424465    if (isCmd)
    425466        return;
    426467
    427     Info("Got request to add service: "+server+"/"+service);
    428468
    429469    //check the given subscription against black and white lists
     
    438478        return;
    439479    }
    440 
    441     list[service].dimInfo.reset(SubscribeTo(server, service));
     480    //DIM_REPLACE
     481//    list[service].dimInfo.reset(SubscribeTo(server, service));
     482    if (fDebugIsOn)
     483        Debug("Subscribing to service "+server+"/"+service);
     484    Subscribe(server + "/" + service)
     485        (bind(&DataLogger::infoCallback, this, placeholders::_1, servicesCounter));
    442486    list[service].server  = server;
    443487    list[service].service = service;
     488    list[service].index = servicesCounter;
    444489    fNumSubAndFitsData.numSubscriptions++;
    445490    //check if this is the run numbers service
    446491    if ((server == "FAD_CONTROL") && (service == "START_RUN"))
    447         fRunNumberService = list[service].dimInfo.get();
     492        fRunNumberService = servicesCounter;
     493    servicesCounter++;
    448494    Info("Added subscription to " + server + "/" + service);
    449495}
     
    457503void DataLogger::RemoveService(string server, string service, bool isCmd)
    458504{
     505
    459506    Info("Got request to remove service: "+server+"/"+service);
    460507    if (fDestructing)//this function is called by the super class, after the destructor has deleted its own subscriptions
    461508        return;
     509//FIXME unused
     510    return;
    462511
    463512    if (isCmd)
     
    482531
    483532    if ((server == "FAD_CONTROL") && (service == "START_RUN"))
    484         fRunNumberService = NULL;
     533        fRunNumberService = 0;
    485534
    486535    Info("Removed subscription to " + server + "/" + service);
     
    499548        return;
    500549    }
    501 
     550//FIXME unused
     551    return;
    502552    fNumSubAndFitsData.numSubscriptions -= fServiceSubscriptions[server].size();
    503553
     
    506556
    507557    if (server == "FAD_CONTROL")
    508         fRunNumberService = NULL;
     558        fRunNumberService = 0;
    509559
    510560    if (fDebugIsOn)
     
    585635//! @param service the service name
    586636//
    587 DimStampedInfo* DataLogger::SubscribeTo(const string& server, const string& service)
    588 {
    589     if (fDebugIsOn)
    590         Debug("Subscribing to service "+server+"/"+service);
    591 
    592     return new DimStampedInfo((server + "/" + service).c_str(), (void*)NULL, 0, this);
    593 }
     637/*EventImp& DataLogger::SubscribeTo(const string& server, const string& service)
     638{
     639
     640    //DIM_REPLACE
     641    //return new DimStampedInfo((server + "/" + service).c_str(), (void*)NULL, 0, this);
     642    EventImp& newSubscription = Subscribe(server + "/" + service);
     643    newSubscription.bind(&infoHandler, this, placeholders::_1);
     644    return newSubscription;
     645}*/
    594646// --------------------------------------------------------------------------
    595647//
     
    703755{
    704756    shouldBackLog = true;
     757
     758    servicesCounter=1;
     759
    705760    //initialize member data
    706761    fFilePath = ".";
     762
     763    fDimList.Subscribe(*this);
     764    fDimList.SetCallbackServerAdd(bind(&DataLogger::AddServer, this, placeholders::_1));
     765    fDimList.SetCallbackServiceAdd(bind(&DataLogger::AddService, this, placeholders::_1));
    707766
    708767    //calculate time "centered" around noon instead of midnight
     
    809868     NotifyOpenedFile("", 0, fOpenedRunFiles);
    810869
    811      fRunNumberService = NULL;
     870     fRunNumberService = 0;
     871
    812872     fShouldAutoStart = false;
    813873     fAutoStarted = false;
     874
     875
    814876     if(fDebugIsOn)
    815877     {
     
    855917    }
    856918
     919    for (auto it=fServerDescriptionsList.begin(); it!= fServerDescriptionsList.end(); it++)
     920        delete *it;
     921
    857922    if (fDebugIsOn)
    858923        Debug("DataLogger desctruction ends");   
     
    880945//! Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them
    881946//
    882 void DataLogger::infoHandler()
    883 {
     947int DataLogger::infoCallback(const EventImp& evt, unsigned int subIndex)
     948{
     949//    if (fDebugIsOn)
     950//    {
     951//        ostringstream str;
     952//        str << "Got infoCallback called with service index= " << subIndex;
     953//        Debug(str.str());
     954//    }
    884955    if ((GetCurrentState() == kSM_Ready) &&  (!fAutoStarted) && fShouldAutoStart)
    885956    {
     
    892963            fAutoStarted = true;
    893964    }
    894     DimInfo* I = getInfo();
    895 
    896     if (I==NULL)
    897         return;
     965
    898966
    899967    //check if the service pointer corresponds to something that we subscribed to
     
    904972    for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
    905973    {//find current service is subscriptions
     974     //Edit: this should be useless now... remove it sometimes ?
    906975        for (y=x->second.begin(); y!=x->second.end();y++)
    907             if ((y->second.dimInfo).get() == I)
     976            if (y->second.index == subIndex)
    908977            {
    909978                found = true;   
     
    914983    }
    915984    if (!found)
    916         return;
    917 
    918     if (I->getSize() <= 0 || I->getData()==NULL)
    919     {
    920         return;
    921     }
    922     if (strlen(I->getFormat()) == 0)
    923     {
    924         ostringstream str;
    925         str << "Format of " << I->getName() << " is empty (ptr=" << I->getData() << ", size=" << I->getSize() << ")... ignoring it.";
    926         Warn(str);
    927         return;
    928     }
    929     // Make sure that getTimestampMillisecs is NEVER called before
    930     // getTimestamp is properly called
    931     // check that the message has been updated by something, i.e. must be different from its initial value
    932     if (I->getTimestamp() == 0)
    933     {
    934         return;
    935     }
     985        return GetCurrentState();
     986
     987    if (evt.GetSize() == 0)
     988        return GetCurrentState();
     989    if (evt.GetFormat() == "")
     990        return GetCurrentState();
     991
    936992//    cout.precision(20);
    937993//    cout << "Orig timestamp: " << Time(I->getTimestamp(), I->getTimestampMillisecs()*1000).Mjd() << endl;
     
    941997    //        subscribe to this service anyway and hence we have the pointer
    942998    //        (no need to check for the name)
    943     CheckForRunNumber(I);
    944 
    945     Report(I, y->second);
     999    CheckForRunNumber(evt, subIndex);
     1000
     1001    Report(evt, y->second);
    9461002
    9471003    //remove old run numbers
    9481004    TrimOldRunNumbers();
     1005
     1006    return GetCurrentState();
    9491007}
    9501008
     
    10021060//!        the current DimInfo
    10031061//
    1004 void DataLogger::CheckForRunNumber(DimInfo* I)
    1005 {
    1006     if (I != fRunNumberService)
     1062void DataLogger::CheckForRunNumber(const EventImp& evt, unsigned int index)
     1063{
     1064    if (index != fRunNumberService)
    10071065        return;
    1008 
    1009     AddNewRunNumber(I->getLonglong(), Time(I->getTimestamp(), I->getTimestampMillisecs()*1000));
     1066//    int64_t newRun = reinterpret_cast<const uint64_t*>(evt.GetData())[0];
     1067    AddNewRunNumber(evt.GetXtra(), evt.GetTime());
    10101068}
    10111069
     
    10181076//!        The dataLogger's subscription corresponding to this DimInfo
    10191077//
    1020 void DataLogger::Report(DimInfo* I, SubscriptionType& sub)
    1021 {
    1022     const string fmt(I->getFormat());
     1078void DataLogger::Report(const EventImp& evt, SubscriptionType& sub)
     1079{
     1080    const string fmt(evt.GetFormat());
    10231081
    10241082    const bool isItaReport = fmt!="C";
     
    10271085        return;
    10281086
    1029     if (fDebugIsOn && string(I->getName())!="DATA_LOGGER/MESSAGE")
     1087    if (fDebugIsOn && string(evt.GetName())!="DATA_LOGGER/MESSAGE")
    10301088    {
    10311089        ostringstream str;
    1032         str << "Logging " << I->getName() << " [" << I->getFormat() << "] (" << I->getSize() << ")";
     1090        str << "Logging " << evt.GetName() << " [" << evt.GetFormat() << "] (" << evt.GetSize() << ")";
    10331091        Debug(str);
    10341092    }
     
    11051163    if (!sub.fConv)
    11061164    {
    1107         sub.fConv = shared_ptr<Converter>(new Converter(Out(), I->getFormat()));
     1165        sub.fConv = shared_ptr<Converter>(new Converter(Out(), evt.GetFormat()));
    11081166        if (!sub.fConv->valid())
    11091167        {
    11101168            ostringstream str;
    1111             str << "Couldn't properly parse the format... service " << sub.dimInfo->getName() << " ignored.";
     1169            str << "Couldn't properly parse the format... service " << evt.GetName() << " ignored.";
    11121170            Error(str);
    11131171            return;   
     
    11161174    //construct the header
    11171175    ostringstream header;
    1118     const Time cTime(I->getTimestamp(), I->getTimestampMillisecs()*1000);
    1119     fQuality = I->getQuality();
     1176    const Time cTime(evt.GetTime());
     1177    fQuality = evt.GetQoS();
    11201178    //I had strange surprises with the quality from Dim before. Double check that the value is indeed valid.
    11211179    if (fQuality != kMessage &&
     
    11321190    {
    11331191        //write text header
    1134         header << I->getName() << " " << fQuality << " ";
     1192        header << evt.GetName() << " " << fQuality << " ";
    11351193        header << cTime.Y() << " " << cTime.M() << " " << cTime.D() << " ";
    11361194        header << cTime.h() << " " << cTime.m() << " " << cTime.s() << " ";
    1137         header << cTime.ms() << " " << I->getTimestamp() << " ";
     1195        header << cTime.ms() << " " << evt.GetTime() << " ";
    11381196
    11391197        string text;
    11401198        try
    11411199        {
    1142             text = sub.fConv->GetString(I->getData(), I->getSize());
     1200            text = sub.fConv->GetString(evt.GetData(), evt.GetSize());
    11431201        }
    11441202        catch (const runtime_error &e)
    11451203        {
    11461204            ostringstream str;
    1147             str << "Parsing service " << sub.dimInfo->getName();
     1205            str << "Parsing service " << evt.GetName();
    11481206            str << " failed: " << e.what() << " removing the subscription for now.";
    11491207            Error(str);
     
    11571215        {
    11581216            ostringstream str;
    1159             str << "Service " << sub.dimInfo->getName() << " sent an empty string";
     1217            str << "Service " << evt.GetName() << " sent an empty string";
    11601218            Info(str);
    11611219            return;
     
    11781236        if (!sub.nightlyFile.IsOpen())
    11791237            if (GetCurrentState() != kSM_Ready)
    1180                 OpenFITSFiles(sub);
    1181         WriteToFITS(sub);
     1238                OpenFITSFiles(sub, evt);
     1239        WriteToFITS(sub, evt.GetData());
    11821240#endif
    11831241    }
     
    11871245        try
    11881246        {
    1189            strings = sub.fConv->ToStrings(I->getData());
     1247           strings = sub.fConv->ToStrings(evt.GetData());
    11901248        }
    11911249        catch (const runtime_error &e)
    11921250        {
    11931251            ostringstream str;
    1194             str << "Parsing service " << sub.dimInfo->getName();
     1252            str << "Parsing service " << evt.GetName();
    11951253            str << " failed: " << e.what() << " removing the subscription for now.";
    11961254            Error(str);
     
    12031261        {
    12041262            ostringstream err;
    1205             err << "There was more than one string message in service " << I->getName() << " going to fatal error state";
     1263            err << "There was more than one string message in service " << evt.GetName() << " going to fatal error state";
    12061264            Error(err.str());
    12071265        }
    12081266        ostringstream msg;
    1209         msg << I->getName() << ": " << strings[0];
     1267        msg << evt.GetName() << ": " << strings[0];
    12101268
    12111269        if (fNightlyLogFile.is_open())
     
    12191277        if (!sub.nightlyFile.IsOpen())
    12201278            if (GetCurrentState() != kSM_Ready)
    1221                 OpenFITSFiles(sub);
    1222         WriteToFITS(sub);
     1279                OpenFITSFiles(sub, evt);
     1280        WriteToFITS(sub, evt.GetData());
    12231281    }
    12241282
     
    15251583//! @param sub
    15261584//!     the current DimInfo subscription being examined
    1527 void DataLogger::OpenFITSFiles(SubscriptionType& sub)
    1528 {
    1529     string serviceName(sub.dimInfo->getName());
     1585void DataLogger::OpenFITSFiles(SubscriptionType& sub, const EventImp& evt)
     1586{
     1587    string serviceName(sub.server + "_" + sub.service);//evt.GetName());
    15301588
    15311589    for (unsigned int i=0;i<serviceName.size(); i++)
     
    15441602        const string fileNameOnly = partialName.substr(partialName.find_last_of('/')+1, partialName.size());
    15451603        if (!sub.fitsBufferAllocated)
    1546             AllocateFITSBuffers(sub);
     1604            AllocateFITSBuffers(sub, evt);
    15471605        //get the size of the file we're about to open
    15481606        if (fFilesStats.FileOpened(partialName))
     
    15721630//! @param sub the subscription of interest.
    15731631//
    1574 void DataLogger::AllocateFITSBuffers(SubscriptionType& sub)
     1632void DataLogger::AllocateFITSBuffers(SubscriptionType& sub, const EventImp& evt)
    15751633{
    15761634    //Init the time columns of the file
     
    15951653    str << "Initializing data columns for service " << sub.server << "/" << sub.service;
    15961654    Info(str);
    1597     sub.nightlyFile.InitDataColumns(GetDescription(sub.server, sub.service), dataFormatsLocal, sub.dimInfo->getData(), this);
     1655    sub.nightlyFile.InitDataColumns(Description::SplitDescription(evt.GetDescription()), dataFormatsLocal, evt.GetData(), this);
    15981656
    15991657    sub.fitsBufferAllocated = true;
     
    16031661//! write a dimInfo data to its corresponding FITS files
    16041662//
    1605 void DataLogger::WriteToFITS(SubscriptionType& sub)
     1663//FIXME: DO I REALLY NEED THE EVENT IMP HERE ???
     1664void DataLogger::WriteToFITS(SubscriptionType& sub, const void* data)
    16061665{
    16071666        //nightly File status (open or not) already checked
    16081667        if (sub.nightlyFile.IsOpen())
    16091668        {
    1610             if (!sub.nightlyFile.Write(*sub.fConv.get()))
     1669            if (!sub.nightlyFile.Write(*sub.fConv.get(), data))
    16111670            {
    16121671                RemoveService(sub.server, sub.service, false);
     
    19762035int RunShell(Configuration &conf)
    19772036{
    1978     return Main::execute<T, DataLogger>(conf, true);
     2037    return Main::execute<T, DataLogger>(conf);//, true);
    19792038}
    19802039
Note: See TracChangeset for help on using the changeset viewer.