Changeset 16107 for trunk/FACT++


Ignore:
Timestamp:
05/24/13 14:00:44 (11 years ago)
Author:
tbretz
Message:
Removed the C-interface and added the C++-interface to the event builder; added a mutex to secure fExpectedRun; fixed some potential problems erasing entries from collections within iterators; added fIsRunInProgress to signal which run data is currently received from the FADs; propagate the run-type and the night through the run-ctrl structre; removed the need for fIsRunStarted
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilderWrapper.h

    r15610 r16107  
    1212#include <boost/filesystem.hpp>
    1313#include <boost/date_time/posix_time/posix_time_types.hpp>
    14 #include <condition_variable>
    1514
    1615#include "DimWriteStatistics.h"
     
    4140#include "EventBuilder.h"
    4241
    43 extern "C" {
    44     extern void StartEvtBuild();
    45     extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
    46 }
     42void StartEvtBuild();
     43void CloseRunFile();
    4744
    4845// ========================================================================
     
    5956    boost::thread fThreadMain;
    6057
    61     enum CommandStates_t // g_runStat
    62     {
    63         kAbort      = -2,   // quit as soon as possible ('abort')
    64         kExit       = -1,   // stop reading, quit when buffered events done ('exit')
    65         kInitialize =  0,   // 'initialize' (e.g. dim not yet started)
    66         kHybernate  =  1,   // do nothing for long time ('hybernate') [wakeup within ~1sec]
    67         kSleep      =  2,   // do nothing ('sleep')                   [wakeup within ~10msec]
    68         kModeFlush  = 10,   // read data from camera, but skip them ('flush')
    69         kModeTest   = 20,   // read data and process them, but do not write to disk ('test')
    70         kModeFlag   = 30,   // read data, process and write all to disk ('flag')
    71         kModeRun    = 40,   // read data, process and write selected to disk ('run')
    72     };
    73 
    7458    enum
    7559    {
     
    8266    FAD::FileFormat_t fFileFormat;
    8367
    84     uint32_t fMaxRun;
     68    //uint32_t fMaxRun;
    8569    uint32_t fLastOpened;
    8670    uint32_t fLastClosed;
     
    11397    Queue<pair<Time,array<uint32_t,4>>> fDimQueue3;
    11498    Queue<pair<Time,array<uint16_t,2>>> fDimQueue4;
    115    
    116     bool fDebugStream;
    117     bool fDebugRead;
    118     bool fDebugLog;
    11999
    120100    string   fPath;
    121     uint64_t fNightAsInt;
     101    uint32_t fNightAsInt;
    122102    uint32_t fRunNumber;
     103    int64_t  fRunInProgress;
     104
     105    array<uint16_t,2> fVecRoi;
    123106
    124107protected:
     
    140123
    141124        // Get current night
    142         const uint64_t night = Time().NightAsInt();
     125        const uint32_t night = Time().NightAsInt();
    143126        if (night==fNightAsInt)
    144127            return true;
     
    185168public:
    186169    EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
    187         fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
     170        fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0),
    188171        fDimWriteStats  ("FAD_CONTROL", imp),
    189172        fDimRuns        ("FAD_CONTROL/RUNS",               "I:5;C",
     
    254237                                                           "|rateBytes[int]:Bytes read this cycle"
    255238                                                           "|totBytes[int]:Bytes read (counter)"),
    256         /*fDimStatistics2 ("FAD_CONTROL/STATISTICS2",        "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
    257                                                            "Event Builder status, events oriented"
    258                                                            "|reset[int]:If increased, reset all counters"
    259                                                            "|numRead[int]:How often sucessful read from N sockets per loop"
    260                                                            "|gotByte[int]:number of bytes read per board"
    261                                                            "|gotErr[int]:number of com. errors per board"
    262                                                            "|evtStat[int]:number of evts read, completed, with errors, incomplete"
    263                                                            "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
    264                                                            "|feedStat[int]:number of evts used or rejected by feedback system"
    265                                                            "|wrtStat[int]:number of evts written to disk, with errors"
    266                                                            "|runStat[int]:number of run opened, closed, with open or close errors"
    267                                                            "|numConn[int]:number of sockets successfully opened per board"),*/
    268239        fDimFileFormat("FAD_CONTROL/FILE_FORMAT",          "S:1", "|format[int]:Current file format"),
    269240        fDimIncomplete("FAD_CONTROL/INCOMPLETE",           "X:1", "|incomplete[bits]:One bit per board"),
     
    273244        fDimQueue3(std::bind(&EventBuilderWrapper::updateEvents, this, placeholders::_1)),
    274245        fDimQueue4(std::bind(&EventBuilderWrapper::updateRoi, this, placeholders::_1)),
    275         fDebugStream(false), fDebugRead(false), fDebugLog(false), fNightAsInt(0)
     246        fNightAsInt(0), fRunInProgress(-1)
    276247    {
    277248        if (This)
     
    279250
    280251        This = this;
     252
     253        fVecRoi.fill(0);
    281254
    282255        memset(fNumEvts.data(), 0, sizeof(fNumEvts));
     
    294267        //        What's the maximum time the eb need to abort?
    295268        fThreadMain.join();
    296 
    297         //ffMsg.Info("EventBuilder stopped.");
    298 
    299         for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
    300             delete *it;
    301     }
    302 
    303     set<uint32_t> fIsRunStarted;
     269    }
     270
    304271    map<uint32_t, FAD::RunDescription> fExpectedRuns;
     272
     273    mutex mtx_newrun;
    305274
    306275    uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
     
    311280            maxevt  = INT32_MAX;
    312281
     282        if (!InitRunNumber())
     283            return 0;
     284
    313285        const FAD::RunDescription descr =
    314286        {
    315287            uint32_t(maxtime),
    316288            uint32_t(maxevt),
     289            fNightAsInt,
    317290            ref.first,
    318291            ref.second,
    319292        };
    320293
    321         if (!InitRunNumber())
    322             return 0;
    323 
    324         // FIMXE: Maybe reset an event counter so that the mcp can count events?
    325 
    326         //fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
    327 
     294        const lock_guard<mutex> lock(mtx_newrun);
    328295        fExpectedRuns[fRunNumber] = descr;
    329         fIsRunStarted.insert(fRunNumber);
    330296        return fRunNumber++;
    331297    }
     
    333299    bool IsThreadRunning()
    334300    {
     301        if (fThreadMain.get_id()==boost::this_thread::get_id())
     302            return true;
    335303        return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
    336304    }
     
    338306    void SetMaxMemory(unsigned int mb) const
    339307    {
    340         /*
    341         if (mb*1000000<GetUsedMemory())
    342         {
    343             // ffMsg.Warn("...");
    344             return;
    345         }*/
    346 
    347308        g_maxMem = size_t(mb)*1000000;
    348309    }
     
    356317        }
    357318
    358         fLastMessage.clear();
     319        //fLastMessage.clear();
    359320
    360321        for (size_t i=0; i<40; i++)
    361322            ConnectSlot(i, addr[i]);
    362323
    363         g_runStat = kModeRun;
    364         g_maxProc = 1;
    365 
    366324        fMsg.Message("Starting EventBuilder thread");
    367325
    368326        fThreadMain = boost::thread(StartEvtBuild);
    369327    }
     328
    370329    void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
    371330    {
     
    400359        // In this order
    401360
    402         struct sockaddr_in addr; //IP for each socket
    403         addr.sin_family      = AF_INET;
    404         addr.sin_addr.s_addr = 0;
    405         addr.sin_port        = 0;
    406         memcpy(&g_port[i].sockAddr, &addr, sizeof(struct sockaddr_in));
    407 
    408361        fDimIncomplete.setQuality(0);
    409362        fDimIncomplete.Update(uint64_t(0));
     
    413366        if (i>39)
    414367            return;
     368
    415369        if (g_port[i].sockAddr.sin_port==0)
    416370            return;
     
    423377    {
    424378        fMsg.Message("Signal abort to EventBuilder thread...");
    425         g_reset = 1;
     379        g_reset = 2;
    426380    }
    427381
    428382    void ResetThread(bool soft)
    429383    {
    430         /*
    431          if (g_reset > 0)
    432 
    433             * suspend reading
    434             * reset = g_reset;
    435             * g_reset=0
    436 
    437             * reset% 10
    438                 == 0  leave event Buffers as they are
    439                 == 1  let all buffers drain (write (incomplete) events)
    440                 >  1  flush all buffers (do not write buffered events)
    441 
    442             * (reset/10)%10
    443                 > 0   close all sockets and destroy them (also free the
    444                       allocated read-buffers)
    445                       recreate before resuming operation
    446                       [ this is more than just close/open that can be
    447                         triggered by e.g. close/open the base-socket ]
    448 
    449             * (reset/100)%10
    450                 > 0   close all open run-files
    451 
    452             * (reset/1000)
    453                       sleep so many seconds before resuming operation
    454                       (does not (yet) take into account time left when waiting
    455                       for buffers getting empty ...)
    456 
    457             * resume_reading
    458 
    459          */
    460384        fMsg.Message("Signal reset to EventBuilder thread...");
    461385        g_reset = soft ? 101 : 102;
     
    465389    {
    466390        fMsg.Message("Signal exit to EventBuilder thread...");
    467         g_runStat = kExit;
    468     }
    469 
    470     /*
    471     void Wait()
    472     {
    473         fThread.join();
    474         ffMsg.Message("EventBuilder stopped.");
    475     }*/
    476 
    477     void Hybernate() const { g_runStat = kHybernate; }
    478     void Sleep()     const { g_runStat = kSleep;     }
    479     void FlushMode() const { g_runStat = kModeFlush; }
    480     void TestMode()  const { g_runStat = kModeTest;  }
    481     void FlagMode()  const { g_runStat = kModeFlag;  }
    482     void RunMode()   const { g_runStat = kModeRun;   }
    483 
    484     // FIXME: To be removed
    485     //void SetMode(int mode) const { g_runStat = mode; }
    486 
    487     bool IsConnected(int i) const     { return gi_NumConnect[i]==7; }
    488     bool IsConnecting(int i) const    { return !IsConnected(i) && !IsDisconnected(i); }
    489     bool IsDisconnected(int i) const  { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
    490     int  GetNumConnected(int i) const { return gi_NumConnect[i]; }
    491     int GetNumFilesOpen() const       { return fFiles.size(); }
    492 
    493     /*
    494     bool IsConnected(int i) const     { return gi_NumConnect[i]>0; }
    495     bool IsConnecting(int i) const    { return !IsConnected(i) && !IsDisconnected(i); }
    496     bool IsDisconnected(int i) const  { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
    497     int  GetNumConnected(int i) const { return gi_NumConnect[i]; }
    498     */
     391        g_reset = 1;
     392    }
     393
     394    bool IsConnected(int i) const     { return gi_NumConnect[i]==1; }
     395    bool IsConnecting(int i) const    { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; }
     396    bool IsDisconnected(int i) const  { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; }
     397    bool IsRunInProgress() const { return fRunInProgress>=0; }
    499398
    500399    void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
     
    541440    }
    542441
    543     void SetDebugLog(bool b) { fDebugLog = b; }
    544 
    545     void SetDebugStream(bool b)
    546     {
    547         fDebugStream = b;
    548         if (b)
    549             return;
    550 
    551         for (int i=0; i<40; i++)
    552         {
    553             if (!fDumpStream[i].is_open())
    554                 continue;
    555 
    556             fDumpStream[i].close();
    557 
    558             ostringstream name;
    559             name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
    560             fMsg.Message("Closed file '"+name.str()+"'");
    561         }
    562     }
    563 
    564     void SetDebugRead(bool b)
    565     {
    566         fDebugRead = b;
    567         if (b || !fDumpRead.is_open())
    568             return;
    569 
    570         fDumpRead.close();
    571         fMsg.Message("Closed file 'socket_events.txt'");
    572     }
    573 
    574 //    size_t GetUsedMemory() const { return gi_usedMem; }
    575 
    576442    void LoadDrsCalibration(const char *fname)
    577443    {
     
    582448    }
    583449
    584     virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
    585 
    586 
    587     /*
    588      struct OpenFileToDim
    589      {
    590         int code;
    591         char fileName[FILENAME_MAX];
    592      };
    593 
    594      SignalRunOpened(runid, filename);
    595      // Send num open files
    596      // Send runid, (more info about the run?), filename via dim
    597 
    598      SignalEvtWritten(runid);
    599      // Send num events written of newest file
    600 
    601      SignalRunClose(runid);
    602      // Send new num open files
    603      // Send empty file-name if no file is open
    604 
    605      */
     450    virtual int CloseOpenFiles() { CloseRunFile(); return 0; }
     451
    606452
    607453    // -------------- Mapped event builder callbacks ------------------
     
    611457        uint32_t values[5] =
    612458        {
    613             static_cast<uint32_t>(fFiles.size()),
    614             0xffffffff,
    615             0,
     459            !fFile ? 0 : 1,
     460            fFile ? fFile->GetRunId() : 0,
     461            fFile ? fFile->GetRunId() : 0,
    616462            fLastOpened,
    617463            fLastClosed
    618464        };
    619465
    620         for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
    621              it!=fFiles.end(); it++)
    622         {
    623             const DataProcessorImp *file = *it;
    624 
    625             if (file->GetRunId()<values[1])
    626                 values[1] = file->GetRunId();
    627 
    628             if (file->GetRunId()>values[2])
    629                 values[2] = file->GetRunId();
    630         }
    631 
    632         fMaxRun = values[2];
    633 
    634466        vector<char> data(sizeof(values)+fname.size()+1);
    635467        memcpy(data.data(), values, sizeof(values));
    636468        strcpy(data.data()+sizeof(values), fname.c_str());
    637 
    638469        fDimRuns.Update(data);
    639470    }
    640471
    641     vector<DataProcessorImp*> fFiles;
     472    shared_ptr<DataProcessorImp> fFile;
    642473
    643474    void updateEvents(const pair<Time,array<uint32_t,4>> &stat)
     
    647478    }
    648479
    649     FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
    650     {
    651         //fMsg.Info(" ==> TODO: Update run configuration in database!");
    652 
    653         map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
    654         while (it!=fExpectedRuns.end())
    655         {
    656             if (it->first<runid)
    657             {
    658                 ostringstream str;
    659                 str << "runOpen - Missed run " << it->first << ".";
    660                 fMsg.Info(str);
    661 
    662                 fExpectedRuns.erase(it++);
    663                 continue;
    664             }
    665             if (it->first==runid)
    666                 break;
    667             it++;
    668         }
    669 
    670         FAD::RunDescription desc;
    671 
    672         if (it==fExpectedRuns.end())
    673         {
    674             ostringstream str;
    675             str << "runOpen - Run " << runid << " wasn't expected (maybe manual triggers)";
    676             fMsg.Warn(str);
    677         }
    678         else
    679         {
    680             desc = it->second;
    681             fExpectedRuns.erase(it);
    682         }
    683 
    684         // Check if file already exists...
     480    bool runOpen(const shared_ptr<EVT_CTRL2> &evt)
     481    {
     482        const uint32_t night = evt->runCtrl->night;
     483        const uint32_t runid = evt->runNum>0 ? evt->runNum : time(NULL);
     484
     485        // If there is still an open file: close it
     486        if (fFile)
     487            runClose();
     488
     489        // Keep a copy of the currently valid drs calibration
     490        // and associate it to the run control structure
     491        evt->runCtrl->calib = shared_ptr<DrsCalibration>(new DrsCalibration(DataCalib::GetCalibration()));
     492
     493        // FIMXE: Check if file already exists...
     494
     495        // Crate the file
    685496        DataProcessorImp *file = 0;
    686497        switch (fFileFormat)
    687498        {
    688         case FAD::kNone:    file = new DataDump(fPath, fNightAsInt, runid,  fMsg); break;
    689         case FAD::kDebug:   file = new DataDebug(fPath, fNightAsInt, runid, fMsg); break;
    690         case FAD::kCfitsio: file = new DataWriteFits(fPath, fNightAsInt, runid,  fMsg); break;
    691         case FAD::kFits:    file = new DataWriteFits2(fPath, fNightAsInt, runid, fMsg); break;
    692         case FAD::kRaw:     file = new DataWriteRaw(fPath, fNightAsInt, runid,  fMsg); break;
    693         case FAD::kCalib:   file = new DataCalib(fPath, fNightAsInt, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
     499        case FAD::kNone:    file = new DataDump(fPath, night, runid,  fMsg); break;
     500        case FAD::kDebug:   file = new DataDebug(fPath, night, runid, fMsg); break;
     501        case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid,  fMsg); break;
     502        case FAD::kFits:    file = new DataWriteFits2(fPath, night, runid, fMsg); break;
     503        case FAD::kRaw:     file = new DataWriteRaw(fPath, night, runid, fMsg); break;
     504        case FAD::kCalib:   file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
    694505        }
    695506
    696507        try
    697508        {
    698             if (!file->Open(h, desc))
    699                 return 0;
     509            // Try to open the file
     510            FAD::RunDescription desc;
     511            desc.name = evt->runCtrl->runType;
     512
     513            if (!file->Open(*evt, desc))
     514                return false;
    700515        }
    701516        catch (const exception &e)
    702517        {
    703518            fMsg.Error("Exception trying to open file: "+string(e.what()));
    704             return 0;
    705         }
    706 
    707         fFiles.push_back(file);
     519            return false;
     520        }
     521
     522        fLastOpened = runid;
     523
     524        // Signal that a file is open
     525        fFile = shared_ptr<DataProcessorImp>(file);
     526
     527        // Now do all the calls which potentially block (dim)
     528
     529        // Time for update runs before time for update events
     530        UpdateRuns(file->GetFileName());
     531        fNumEvts[kEventId]   = 0;
     532        fNumEvts[kTriggerId] = 0;
     533        fNumEvts[kCurrent]   = 0;
     534        fDimQueue3.post(make_pair(Time(), fNumEvts));
     535
     536        fDimWriteStats.FileOpened(file->GetFileName());
    708537
    709538        ostringstream str;
     
    711540        fMsg.Info(str);
    712541
    713         fDimWriteStats.FileOpened(file->GetFileName());
    714 
    715         fLastOpened = runid;
    716         UpdateRuns(file->GetFileName());
    717 
    718         fNumEvts[kEventId] = 0;
    719         fNumEvts[kTriggerId] = 0;
    720 
    721         fNumEvts[kCurrent] = 0;
    722         fDimQueue3.post(make_pair(Time(), fNumEvts));
    723         // fDimCurrentEvent.Update(uint32_t(0));
    724 
    725         return reinterpret_cast<FileHandle_t>(file);
    726     }
    727 
    728     int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/)
    729     {
    730         DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
    731 
    732         if (!file->WriteEvt(e))
    733             return -1;
    734 
    735         if (file->GetRunId()==fMaxRun)
    736         {
    737             fNumEvts[kCurrent]++;
    738             fNumEvts[kEventId]   = e->EventNum;
    739             fNumEvts[kTriggerId] = e->TriggerNum;
    740         }
    741 
     542        return true;
     543    }
     544
     545    bool runWrite(const shared_ptr<EVT_CTRL2> &e)
     546    {
     547        const EVENT &evt = *e->fEvent;
     548        if (!fFile->WriteEvt(evt))
     549            return false;
     550
     551        fNumEvts[kCurrent]++;
     552        fNumEvts[kEventId]   = evt.EventNum;
     553        fNumEvts[kTriggerId] = evt.TriggerNum;
    742554        fNumEvts[kTotal]++;
    743555
     
    747559        {
    748560            fDimQueue3.post(make_pair(Time(), fNumEvts));
    749             //fDimEvents.Update(fNumEvts);
    750561            oldt = newt;
    751562        }
    752563
    753 
    754         // ===> SignalEvtWritten(runid);
    755         // Send num events written of newest file
    756 
    757         /* close run runId (all all runs if runId=0) */
    758         /* return: 0=close scheduled / >0 already closed / <0 does not exist */
    759         //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
    760 
    761         return 0;
     564        return true;
     565    }
     566
     567    void runClose()
     568    {
     569        if (!fFile)
     570            return;
     571
     572        // It can happen that runFinished was never called
     573        // (e.g. runWrite failed)
     574        if (fRunInProgress==fFile->GetRunId())
     575            fRunInProgress = -1;
     576
     577        // Close the file
     578        const bool rc = fFile->Close(NULL);
     579
     580        fLastClosed = fFile->GetRunId();
     581
     582        ostringstream str;
     583        str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")";
     584        if (!rc)
     585            str << "... failed!";
     586
     587        // Signal that the file is closed
     588
     589        fFile.reset();
     590
     591        // Now do all the calls which can potentially block (dim)
     592
     593        CloseRun(fLastClosed);
     594
     595        // Time for update events before time for update runs
     596        fDimQueue3.post(make_pair(Time(), fNumEvts));
     597        UpdateRuns();
     598
     599        // Do the potentially blocking call after all others
     600        rc ? fMsg.Info(str) : fMsg.Error(str);
    762601    }
    763602
    764603    virtual void CloseRun(uint32_t /*runid*/) { }
    765 
    766     int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
    767     {
    768         //fMsg.Info(" ==> TODO: Update run configuration in database!");
    769 
    770         DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
    771 
    772         const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
    773         if (it==fFiles.end())
    774         {
    775             ostringstream str;
    776             str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
    777             fMsg.Fatal(str);
    778             return -1;
    779         }
    780 
    781         /*
    782         fFiles.erase(it);
    783 
    784         fLastClosed = file->GetRunId();
    785         CloseRun(fLastClosed);
    786         UpdateRuns();
    787 
    788         fDimEvents.Update(fNumEvts);
    789         */
    790 
    791         const bool rc = file->Close(tail);
    792         if (!rc)
    793         {
    794             // Error message
    795         }
    796 
    797         // Note that this is the signal for the fadctrl to change from
    798         // WritingData back to Connected. If this is done too early,
    799         // a new run might be started before this is closed. This is
    800         // faster, but leads to problems with the DRS calibration
    801         // if the system is fast enough to start the new run before
    802         // this one has really been closed.
    803         fFiles.erase(it);
    804 
    805         fLastClosed = file->GetRunId();
    806         CloseRun(fLastClosed);
    807         UpdateRuns();
    808 
    809         fDimQueue3.post(make_pair(Time(),fNumEvts));
    810         //fDimEvents.Update(fNumEvts);
    811 
    812 
    813         ostringstream str;
    814         str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
    815         fMsg.Info(str);
    816 
    817         delete file;
    818 
    819         // ==> SignalRunClose(runid);
    820         // Send new num open files
    821         // Send empty file-name if no file is open
    822 
    823         return rc ? 0 : -1;
    824     }
    825 
    826     ofstream fDumpStream[40];
    827 
    828     void debugStream(int isock, void *buf, int len)
    829     {
    830         if (!fDebugStream)
    831             return;
    832 
    833         const int slot = isock/7;
    834         if (slot<0 || slot>39)
    835             return;
    836 
    837         if (!fDumpStream[slot].is_open())
    838         {
    839             ostringstream name;
    840             name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
    841 
    842             fDumpStream[slot].open(name.str().c_str(), ios::app);
    843             if (!fDumpStream[slot])
    844             {
    845                 ostringstream str;
    846                 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
    847                 fMsg.Error(str);
    848 
    849                 return;
    850             }
    851 
    852             fMsg.Message("Opened file '"+name.str()+"' for writing.");
    853         }
    854 
    855         fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
    856     }
    857 
    858     ofstream fDumpRead; // Stream to possibly dump docket events
    859 
    860     void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
    861     {
    862         //   isock = socketID (0-279)
    863         //   ibyte = #bytes gelesen
    864         //   event = eventId (oder 0 wenn noch nicht bekannt)
    865         //   state : 1=finished reading data
    866         //           0=reading data
    867         //          -1=start reading data (header)
    868         //          -2=start reading data,
    869         //             eventId not known yet (too little data)
    870         //   tsec, tusec = time when reading seconds, microseconds
    871         //
    872         if (!fDebugRead || ibyte==0)
    873             return;
    874 
    875         if (!fDumpRead.is_open())
    876         {
    877             fDumpRead.open("socket_events.txt", ios::app);
    878             if (!fDumpRead)
    879             {
    880                 ostringstream str;
    881                 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
    882                 fMsg.Error(str);
    883 
    884                 return;
    885             }
    886 
    887             fMsg.Message("Opened file 'socket_events.txt' for writing.");
    888 
    889             fDumpRead << "# START: " << Time().GetAsStr() << endl;
    890             fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
    891         }
    892 
    893         fDumpRead
    894             << setw(2) << state   << " "
    895             << setw(8) << tsec    << " "
    896             << setw(9) << tusec   << " "
    897             << setw(3) << isock   << " "
    898             << setw(2) << isock/7 << " "
    899             << runno  << " "
    900             << event  << " "
    901             << ftmevt << " "
    902             << ibyte << endl;
    903     }
    904 
    905     array<uint16_t,2> fVecRoi;
    906604
    907605    void updateRoi(const pair<Time, array<uint16_t,2>> &roi)
     
    911609    }
    912610
    913     int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/)
    914     {
    915         /*
    916          fadhd[i] ist ein array mit den 40 fad-headers
    917          (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
    918 
    919          event  ist die Struktur, die auch die write routine erhaelt;
    920          darin sind im header die 'soll-werte' fuer z.B. eventID
    921          als auch die ADC-Werte (falls Du die brauchst)
    922 
    923          Wenn die routine einen negativen Wert liefert, wird das event
    924          geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
    925          */
     611    int eventCheck(const shared_ptr<EVT_CTRL2> &evt)
     612    {
     613        const PEVNT_HEADER *fadhd = evt->FADhead.get();
     614        const EVENT *event = evt->fEvent;
    926615
    927616        const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
     
    933622        }
    934623
    935         const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
    936         const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
     624        const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(fadhd);
     625        const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(fadhd)+40;
    937626
    938627        // FIMXE: Compare with target configuration
     
    951640            {
    952641                fMsg.Error("Inconsistency in FAD status detected.... closing run.");
    953                 CloseRunFile(runNr, 0, 0);
     642                evt->runCtrl->maxEvt = 0;
    954643                return -1;
    955644            }
     
    958647            {
    959648                fMsg.Error("Inconsistent run number detected.... closing run.");
    960                 CloseRunFile(runNr, 0, 0);
     649                evt->runCtrl->maxEvt = 0;
    961650                return -1;
    962651            }
     
    973662            {
    974663                fMsg.Error("Inconsistent FAD event number detected.... closing run.");
    975                 CloseRunFile(runNr, 0, 0);
     664                evt->runCtrl->maxEvt = 0;
    976665                return -1;
    977666            }
     
    980669            {
    981670                fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
    982                 CloseRunFile(runNr, 0, 0);
     671                evt->runCtrl->maxEvt = 0;
    983672                return -1;
    984673            }
     
    987676            {
    988677                fMsg.Error("Inconsistent phase shift detected.... closing run.");
    989                 CloseRunFile(runNr, 0, 0);
     678                evt->runCtrl->maxEvt = 0;
    990679                return -1;
    991680            }
     
    994683            {
    995684                fMsg.Error("Inconsistent DAC values detected.... closing run.");
    996                 CloseRunFile(runNr, 0, 0);
     685                evt->runCtrl->maxEvt = 0;
    997686                return -1;
    998687            }
     
    1001690            {
    1002691                fMsg.Error("Inconsistent trigger type detected.... closing run.");
    1003                 CloseRunFile(runNr, 0, 0);
     692                evt->runCtrl->maxEvt = 0;
    1004693                return -1;
    1005694            }
     
    1014703    }
    1015704
    1016     void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
    1017     {
     705    void applyCalib(const shared_ptr<EVT_CTRL2> &evt)
     706    {
     707        const PEVNT_HEADER *fadhd = evt->FADhead.get();
     708        const EVENT *event = evt->fEvent;
     709
    1018710        // Currently we send any event no matter what its trigger id is...
    1019711        // To be changed.
     
    1029721
    1030722        // Workaround to find a valid header.....
    1031         const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
    1032         const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
     723        const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(fadhd);
     724        const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(fadhd)+40;
    1033725
    1034726        // FIMXE: Compare with target configuration
     
    1048740        float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
    1049741
    1050         DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
     742        evt->runCtrl->calib->Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
    1051743        DrsCalibrate::RemoveSpikes(vec, event->Roi);
    1052744
     
    1068760        fDimEventData.setQuality(ptr->fTriggerType);
    1069761        fDimEventData.Update(data2);
    1070     }
    1071 
     762
     763    }
     764
     765    /*
    1072766    void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
    1073767    {
    1074         /*
    1075768        if (!DataCalib::IsValid())
    1076769            return;
     
    1104797
    1105798        fDimFeedbackData.Update(data2);
    1106         */
    1107     }
    1108 
    1109     int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/)
    1110     {
    1111         switch (threadID)
    1112         {
    1113         case 0:
    1114             SendRawData(fadhd, event);
    1115             return 100;
    1116             /*
    1117         case 1:
    1118             SendFeedbackData(fadhd, event);
    1119             return 2;*/
    1120         }
    1121         return 100;
    1122     }
    1123 
    1124 
    1125     bool IsRunStarted() const
    1126     {
    1127         const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
    1128         return it==fIsRunStarted.end();// ? true : it->second.started;
     799    }
     800    */
     801
     802    bool IsRunWaiting()// const
     803    {
     804        const lock_guard<mutex> lock(mtx_newrun);
     805        return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end();
    1129806    }
    1130807
     
    1134811    }
    1135812
     813    /*
    1136814    bool IsRunFileOpen()
    1137815    {
    1138816        return fLastOpened==fRunNumber-1;
    1139     }
     817    }*/
    1140818
    1141819    bool IncreaseRunNumber(uint32_t run)
     
    1159837    }
    1160838
    1161     void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
    1162     {
    1163         // This function is called even when writing is switched off
    1164         set<uint32_t>::iterator it = fIsRunStarted.begin();
    1165         while (it!=fIsRunStarted.end())
    1166         {
    1167             if (*it<runnr)
     839    void gotNewRun(RUN_CTRL2 &run)
     840    {
     841        // This is to secure iteration over fExpectedRuns
     842        const lock_guard<mutex> lock(mtx_newrun);
     843
     844        map<uint32_t,FAD::RunDescription>::const_iterator it = fExpectedRuns.begin();
     845        while (it!=fExpectedRuns.end())
     846        {
     847            if (it->first<run.runId)
    1168848            {
    1169849                ostringstream str;
    1170                 str << "gotNewRun - Missed run " << *it << ".";
     850                str << "runOpen - Missed run " << it->first << ".";
    1171851                fMsg.Info(str);
    1172852
    1173                 fIsRunStarted.erase(it++);
     853                // Increase the iterator first, it becomes invalid with the next call
     854                const auto is = it++;
     855                fExpectedRuns.erase(is);
    1174856                continue;
    1175857            }
    1176             if (*it==runnr)
     858
     859            if (it->first==run.runId)
    1177860                break;
     861
    1178862            it++;
    1179863        }
    1180         if (it==fIsRunStarted.end())
     864
     865        if (it==fExpectedRuns.end())
    1181866        {
    1182867            ostringstream str;
    1183             str << "gotNewRun - Not waiting for run " << runnr << ".";
     868            str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)";
    1184869            fMsg.Warn(str);
    1185             return;
    1186         }
    1187 
    1188         map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
    1189         if (i2==fExpectedRuns.end())
    1190         {
    1191             ostringstream str;
    1192             str << "gotNewRun - Run " << runnr << " wasn't expected.";
    1193             fMsg.Warn(str);
    1194             return;
    1195         }
    1196 
    1197         CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
    1198         // return: 0=close scheduled / >0 already closed / <0 does not exist
    1199 
    1200         // FIXME: Move configuration from expected runs to runs which will soon
    1201         //        be opened/closed
    1202 
    1203         fIsRunStarted.erase(it);
    1204     }
    1205 
    1206     map<boost::thread::id, string> fLastMessage;
    1207 
    1208     void factOut(int severity, int err, const char *message)
    1209     {
    1210         if (!fDebugLog && severity==99)
    1211             return;
    1212 
     870
     871            // This is not ideal, but the best we can do
     872            run.night = fNightAsInt;
     873
     874            return;
     875        }
     876
     877        const FAD::RunDescription &conf = it->second;
     878
     879        run.runType   = conf.name;
     880        run.maxEvt    = conf.maxevt;
     881        run.closeTime = conf.maxtime + run.openTime;
     882        run.night     = conf.night;
     883
     884        fExpectedRuns.erase(it);
     885
     886        // Now signal the fadctrl (configuration process that a run is in progress)
     887        // Maybe this could be done earlier, but we are talking about a
     888        // negligible time scale here.
     889        fRunInProgress = run.runId;
     890    }
     891
     892    void runFinished()
     893    {
     894        // This is called when the last event of a run (run time exceeded or
     895        // max number of events exceeded) has been received.
     896        fRunInProgress = -1;
     897    }
     898
     899    //map<boost::thread::id, string> fLastMessage;
     900
     901    void factOut(int severity, const char *message)
     902    {
    1213903        ostringstream str;
    1214         //str << boost::this_thread::get_id() << " ";
    1215         str << "EventBuilder(";
    1216         if (err<0)
    1217             str << "---";
    1218         else
    1219             str << err;
    1220         str << "): " << message;
    1221 
     904        str << "EventBuilder: " << message;
     905
     906        /*
    1222907        string &old = fLastMessage[boost::this_thread::get_id()];
    1223908
     
    1225910            return;
    1226911        old = str.str();
     912        */
    1227913
    1228914        fMsg.Update(str, severity);
     
    1266952    */
    1267953
    1268     void factStat(const EVT_STAT &/*stat*/)
    1269     {
    1270         //fDimStatistics2.Update(stat);
    1271     }
    1272 
    1273954    void factStatSend(const pair<Time,GUI_STAT> &stat)
    1274955    {
     
    1309990            arr[i] = *ref;
    1310991
    1311             if (gi_NumConnect[i]!=7)
     992            if (gi_NumConnect[i]==0)
    1312993            {
    1313994                arr[i] = 0;
     
    13571038            vec[i+2] = *ref;
    13581039
    1359             if (gi_NumConnect[i]!=7)
     1040            if (gi_NumConnect[i]==0)
    13601041            {
    13611042                vec[i+2] = 0;
     
    13821063    void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
    13831064    {
    1384 //        svc.setQuality(vec[40]<=vec[41]);
    13851065        svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
    13861066        svc.Update(t);
     
    15241204    }
    15251205
    1526     void debugHead(int /*socket*/, const FAD::EventHeader &h)
     1206    void debugHead(const FAD::EventHeader &h)
    15271207    {
    15281208        const uint16_t id = h.Id();
     
    15401220
    15411221        fDimQueue2.post(make_tuple(Time(), changed, h));
    1542 
    1543         //const lock_guard<mutex> guard(fMutexDimQueue2);
    1544         //fDimQueue2.push_back(make_tuple(Time(), changed, h));
    15451222    }
    15461223};
     
    15491226
    15501227// ----------- Event builder callbacks implementation ---------------
    1551 extern "C"
     1228bool runOpen(const shared_ptr<EVT_CTRL2> &evt)
    15521229{
    1553     FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
    1554     {
    1555         return EventBuilderWrapper::This->runOpen(irun, runhd, len);
    1556     }
    1557 
    1558     int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
    1559     {
    1560         return EventBuilderWrapper::This->runWrite(fileId, event, len);
    1561     }
    1562 
    1563     int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
    1564     {
    1565         return EventBuilderWrapper::This->runClose(fileId, runth, len);
    1566     }
    1567 
    1568     // -----
    1569 
    1570     //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/)
    1571     //{
    1572     //    return NULL;
    1573     //}
    1574 
    1575     int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
    1576     {
    1577         return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
    1578     }
    1579 
    1580     int runEnd(uint32_t, void */*runPtr*/)
    1581     {
    1582         return 0;
    1583     }
    1584 
    1585     // -----
    1586 
    1587     int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
    1588     {
    1589         return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
    1590     }
    1591 
    1592     void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
    1593     {
    1594         return EventBuilderWrapper::This->gotNewRun(runnr, headers);
    1595     }
    1596 
    1597     // -----
    1598 
    1599     void factOut(int severity, int err, const char *message)
    1600     {
    1601         EventBuilderWrapper::This->factOut(severity, err, message);
    1602     }
    1603 
    1604     void factStat(GUI_STAT stat)
    1605     {
    1606         EventBuilderWrapper::This->factStat(stat);
    1607     }
    1608 
    1609     void factStatNew(EVT_STAT stat)
    1610     {
    1611         EventBuilderWrapper::This->factStat(stat);
    1612     }
    1613 
    1614     void factReportIncomplete (uint64_t rep)
    1615     {
    1616         EventBuilderWrapper::This->factReportIncomplete(rep);
    1617     }
    1618 
    1619     // ------
    1620 
    1621     void debugHead(int socket, int/*board*/, void *buf)
    1622     {
    1623         const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
    1624         EventBuilderWrapper::This->debugHead(socket, h);
    1625     }
    1626 
    1627     void debugStream(int isock, void *buf, int len)
    1628     {
    1629         return EventBuilderWrapper::This->debugStream(isock, buf, len);
    1630     }
    1631 
    1632     void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
    1633     {
    1634         EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
    1635     }
     1230    return EventBuilderWrapper::This->runOpen(evt);
    16361231}
    16371232
     1233bool runWrite(const shared_ptr<EVT_CTRL2> &evt)
     1234{
     1235    return EventBuilderWrapper::This->runWrite(evt);
     1236}
     1237
     1238void runClose()
     1239{
     1240    EventBuilderWrapper::This->runClose();
     1241}
     1242
     1243int eventCheck(const shared_ptr<EVT_CTRL2> &evt)
     1244{
     1245    return EventBuilderWrapper::This->eventCheck(evt);
     1246}
     1247
     1248void gotNewRun(RUN_CTRL2 &run)
     1249{
     1250    EventBuilderWrapper::This->gotNewRun(run);
     1251}
     1252
     1253void runFinished()
     1254{
     1255    EventBuilderWrapper::This->runFinished();
     1256}
     1257
     1258void applyCalib(const shared_ptr<EVT_CTRL2> &evt)
     1259{
     1260    EventBuilderWrapper::This->applyCalib(evt);
     1261}
     1262
     1263void factOut(int severity, const char *message)
     1264{
     1265    EventBuilderWrapper::This->factOut(severity, message);
     1266}
     1267
     1268void factStat(GUI_STAT stat)
     1269{
     1270    EventBuilderWrapper::This->factStat(stat);
     1271}
     1272
     1273void factReportIncomplete(uint64_t rep)
     1274{
     1275    EventBuilderWrapper::This->factReportIncomplete(rep);
     1276}
     1277
     1278// ------
     1279
     1280void debugHead(void *buf)
     1281{
     1282    const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
     1283    EventBuilderWrapper::This->debugHead(h);
     1284}
     1285
    16381286#endif
Note: See TracChangeset for help on using the changeset viewer.