#ifndef FACT_EventBuilderWrapper
#define FACT_EventBuilderWrapper

// Wrapper around the event-builder thread (see EventBuilder.h): it owns the
// builder thread, the DIM services published under FAD_CONTROL/..., the
// worker queues that feed those services and the currently open output file.
//
// NOTE(review): this copy of the file is damaged. Text that stood between an
// opening angle bracket followed by a letter and the next closing angle
// bracket is missing: the targets of the bare #include directives, every
// template argument list, and a few stretches of live code. Each affected
// place is marked below. The code is reproduced exactly as found and will not
// compile until the missing text is restored from version control.

// NOTE(review): header name missing.
#include

#if BOOST_VERSION < 104400
#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
#undef BOOST_HAS_RVALUE_REFS
#endif
#endif

// NOTE(review): header names missing on the next three directives.
#include
#include
#include

#include "DimWriteStatistics.h"

#include "DataCalib.h"
#include "DataWriteRaw.h"

// Without HAVE_FITS the name DataWriteFits is mapped onto DataWriteFits2.
#ifdef HAVE_FITS
#include "DataWriteFits.h"
#else
#define DataWriteFits DataWriteFits2
#endif

#include "DataWriteFits2.h"

#include "queue.h"

namespace ba = boost::asio;
namespace bs = boost::system;
namespace fs = boost::filesystem;

using ba::ip::tcp;

using namespace std;

// ========================================================================

#include "EventBuilder.h"

// Declared here, defined elsewhere (not visible in this file).
void StartEvtBuild();
void CloseRunFile();

// ========================================================================

class EventBuilderWrapper
{
public:
    // FIXME
    // The one and only instance; set by the constructor and used by the
    // free-function callbacks at the end of this file.
    static EventBuilderWrapper *This;

    MessageImp &fMsg;

private:
    boost::thread fThreadMain;

    // Indices into fNumEvts
    enum
    {
        kCurrent = 0,
        kTotal = 1,
        kEventId = 2,
        kTriggerId = 3,
    };

    FAD::FileFormat_t fFileFormat;

    //uint32_t fMaxRun;
    uint32_t fLastOpened;
    uint32_t fLastClosed;
    // NOTE(review): template arguments missing. The code below indexes it
    // with kCurrent..kTriggerId and publishes sizeof(uint32_t)*4 bytes.
    array fNumEvts;

    DimWriteStatistics fDimWriteStats;
    DimDescribedService fDimRuns;
    DimDescribedService fDimEvents;
    DimDescribedService fDimRawData;
    DimDescribedService fDimEventData;
    DimDescribedService fDimFeedbackData;
    DimDescribedService fDimFwVersion;
    DimDescribedService fDimRunNumber;
    DimDescribedService fDimStatus;
    DimDescribedService fDimDNA;
    DimDescribedService fDimTemperature;
    DimDescribedService fDimPrescaler;
    DimDescribedService fDimRefClock;
    DimDescribedService fDimRoi;
    DimDescribedService fDimDac;
    DimDescribedService fDimDrsRuns;
    DimDescribedService fDimDrsCalibration;
    DimDescribedService fDimStatistics1;
    //DimDescribedService fDimStatistics2;
    DimDescribedService fDimFileFormat;
    DimDescribedService fDimIncomplete;

    // Work queues; each one is bound in the constructor to the UpdateDim.../
    // procHeader member that consumes its entries.
    // NOTE(review): template arguments missing on all seven declarations.
    Queue> fQueueStatistics1;
    Queue> fQueueProcHeader;
    Queue>> fQueueEvents;
    Queue>> fQueueRoi;
    Queue> fQueueRawData;
    Queue>> fQueueEventData;
    Queue, array>> fQueueTempRefClk;

    string fPath;
    uint32_t fNightAsInt;
    uint32_t fRunNumber;
    int64_t fRunInProgress;

    // NOTE(review): template arguments missing on the next two declarations.
    array fVecRoi;
    pair> fMaxEvent; // Maximum event from applyCalib

protected:
    // Optionally switch the data path and, if the night has changed since
    // the last call, determine the next free run number.
    //
    // path: new data path; empty (the default) keeps the current one.
    //
    // Returns false if a non-empty path does not exist or if the computed
    // run number is 1000; true otherwise (including the case that the night
    // is unchanged and nothing had to be done).
    bool InitRunNumber(const string &path="")
    {
        if (!path.empty())
        {
            if (!DimWriteStatistics::DoesPathExist(path, fMsg))
            {
                fMsg.Error("Data path "+path+" does not exist!");
                return false;
            }

            fPath = path;
            fDimWriteStats.SetCurrentFolder(fPath);

            fMsg.Info("Data path set to "+path+".");
        }

        // Get current night
        const uint32_t night = Time().NightAsInt();
        if (night==fNightAsInt)
            return true;

        // Check for run numbers: scan downwards from 999 and stop at the
        // first number for which a bin, fits or drs.fits file exists
        fRunNumber = 1000;

        while (--fRunNumber>0)
        {
            const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");

            if (access((name+"bin").c_str(), F_OK) == 0)
                break;
            if (access((name+"fits").c_str(), F_OK) == 0)
                break;
            if (access((name+"drs.fits").c_str(), F_OK) == 0)
                break;
        }

        // This is now the first file which does not exist
        fRunNumber++;
        fLastOpened = 0;

        // Check if we have exceeded the maximum
        // NOTE(review): the scan starts at 999, so this fires when a file
        // for run 999 exists, although the message speaks of run 1000.
        if (fRunNumber==1000)
        {
            fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
            return false;
        }

        ostringstream str;
        if (fNightAsInt==0)
            str << "First night...";
        else
            str << "Night has changd from " << fNightAsInt << "... new";
        str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
        fMsg.Message(str);

        fNightAsInt = night;

        return true;
    }

public:
    // Create the DIM services and worker queues and register this object
    // as the single instance in This.
    //
    // Throws logic_error if This is already set. All 40 slots are put into
    // the unconnected state via ConnectSlot(i, tcp::endpoint()).
    //
    // NOTE(review): fRunNumber is not in the initializer list; it is first
    // assigned in InitRunNumber(), so GetRunNumber()/IsRunWaiting() called
    // before that would read an uninitialized value.
    EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
        fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0),
        fDimWriteStats ("FAD_CONTROL", imp),
        fDimRuns ("FAD_CONTROL/RUNS", "I:2;C",
                  "Run files statistics"
                  "|stats[int]:last opened or closed run"
                  "|file[string]:filename of last opened file"),
        fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
                    "Event counts"
                    "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
        fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F",
                     "|roi[uint16]:number of samples per pixel"
                     "|roi_tm[uint16]:number of samples per time-marker channel"
                     "|num_fad[uint32]:event number from FADs"
                     "|num_ftm[uint32]:trigger number from FTM"
                     "|type[uint16]:trigger type from FTM"
                     "|num_boards[uint32]:number of active boards"
                     "|time[uint32]:PC time as unix time stamp"
                     "|time_board[uint32]:Time stamp of FAD boards"
                     "|start_pix[int16]:start sample of pixels"
                     "|start_tm[int16]:start sample of time marker channels"
                     "|adc[int16]:adc data"),
        fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440",
                       "|avg:|rms:|max:|pos"),
        fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
        fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
                       "Firmware version number of fad boards"
                       "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
        fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
                       "Run numbers coming from FAD boards"
                       "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
        fDimStatus ("FAD_CONTROL/STATUS", "S:42",
                    "Status of FAD boards"
                    "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
        fDimDNA ("FAD_CONTROL/DNA", "X:40",
                 "DNA of FAD boards"
                 "|DNA[hex]:Hex identifier of each FAD board"),
        fDimTemperature ("FAD_CONTROL/TEMPERATURE", "S:1;F:160",
                         "DRS temperatures"
                         "|cnt[uint16]:Counter of averaged values"
                         "|temp[deg C]:average temp of all DRS chips"),
        fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
                       "Trigger generator prescaler of fad boards"
                       "|prescaler[int]:Trigger generator prescaler value, for each board"),
        fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "S:1;F:40",
                      "Reference clock of FAD boards"
                      "|cnt[uint16]:Counter of averaged values"
                      "|clk[Hz]:Averaged clock of ref clocks of FAD boards"),
        fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
        fDimDac ("FAD_CONTROL/DAC", "S:336",
                 "DAC settings of each FAD board"
                 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
        fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
                     "|roi:Region of interest of secondary baseline"
                     "|run:Run numbers of DRS runs (0=none)"),
        fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
                           "|roi:Region of interest of secondary baseline"
                           "|run:Run numbers of DRS runs (0=none)"),
        fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:5;X:3;I:1;I:2;C:40;I:40;I:40",
                         "Event Builder status for GUI display"
                         "|bufferInfo[int]:Events in buffer, incomp., comp., write, proc., tot."
                         "|memInfo[int]:total mem allocated, used mem, max memory"
                         "|deltaT[ms]:Time in ms for rates"
                         "|rateNew[int]:Number of new start events received"
                         "|numConn[int]:Number of connections per board"
                         "|rateBytes[int]:Bytes read during last cylce"
                         "|relBytes[int]:Relative number of total bytes received (received - released)"),
        fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
        fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
        // It is important to instantiate them after the DimServices
        // (members are initialized in declaration order, and the queues
        // are declared after the services)
        fQueueStatistics1(std::bind(&EventBuilderWrapper::UpdateDimStatistics1, this, placeholders::_1)),
        fQueueProcHeader( std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
        fQueueEvents( std::bind(&EventBuilderWrapper::UpdateDimEvents, this, placeholders::_1)),
        fQueueRoi( std::bind(&EventBuilderWrapper::UpdateDimRoi, this, placeholders::_1)),
        fQueueRawData( std::bind(&EventBuilderWrapper::UpdateDimRawData, this, placeholders::_1)),
        fQueueEventData( std::bind(&EventBuilderWrapper::UpdateDimEventData, this, placeholders::_1)),
        fQueueTempRefClk( std::bind(&EventBuilderWrapper::UpdateDimTempRefClk, this, placeholders::_1)),
        fNightAsInt(0), fRunInProgress(-1),
        // NOTE(review): template arguments of array missing.
        fMaxEvent(make_pair(-FLT_MAX, array()))
    {
        if (This)
            throw logic_error("EventBuilderWrapper cannot be instantiated twice.");

        This = this;

        fVecRoi.fill(0);

        memset(fNumEvts.data(), 0, sizeof(fNumEvts));
        fDimEvents.Update(fNumEvts);

        for (size_t i=0; i<40; i++)
            ConnectSlot(i, tcp::endpoint());
    }

    // Signal abort to the builder thread and wait for it to finish.
    //
    // NOTE(review): This is not reset here, so after destruction it still
    // points at the destroyed object and a second instance can never be
    // created (the constructor throws while This is non-null).
    virtual ~EventBuilderWrapper()
    {
        Abort();

        // FIXME: Use timed_join and abort afterwards
        //        What's the maximum time the eb need to abort?
        fThreadMain.join();
    }

    // Runs announced by StartNewRun() and not yet picked up; keyed by run
    // number (see fExpectedRuns[fRunNumber] below). Guarded by mtx_newrun.
    // NOTE(review): template arguments missing.
    map fExpectedRuns;

    mutex mtx_newrun;

    // Announce a new run with the given limits and configuration.
    //
    // maxtime: values <=0 or above 24*60*60 are replaced by 24*60*60.
    // maxevt:  values <=0 or above INT32_MAX are replaced by INT32_MAX.
    // ref:     its two members become the 4th and 5th field of the stored
    //          FAD::RunDescription (NOTE(review): template arguments of
    //          pair missing).
    //
    // Returns the run number assigned to the run (fRunNumber is incremented
    // afterwards), or 0 if InitRunNumber() failed.
    uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair &ref)
    {
        if (maxtime<=0 || maxtime>24*60*60)
            maxtime = 24*60*60;
        if (maxevt<=0 || maxevt>INT32_MAX)
            maxevt = INT32_MAX;

        if (!InitRunNumber())
            return 0;

        const FAD::RunDescription descr =
        {
            uint32_t(maxtime),
            uint32_t(maxevt),
            fNightAsInt,
            ref.first,
            ref.second,
        };

        const lock_guard lock(mtx_newrun);
        fExpectedRuns[fRunNumber] = descr;
        return fRunNumber++;
    }

    // True if called from the builder thread itself, or if a zero-timeout
    // timed_join on it does not succeed.
    bool IsThreadRunning()
    {
        if (fThreadMain.get_id()==boost::this_thread::get_id())
            return true;
        return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
    }

    // Store mb*1000000 in the global g_maxMem.
    void SetMaxMemory(unsigned int mb) const
    {
        g_maxMem = size_t(mb)*1000000;
    }

    // Store the value in the global g_evtTimeout (unit not visible here).
    void SetEventTimeout(uint16_t to) const
    {
        g_evtTimeout = to;
    }

    // Configure all 40 slots from addr and launch StartEvtBuild in a new
    // thread. Does nothing but warn if the thread is still running.
    //
    // NOTE(review): addr[0..39] is read without checking addr.size();
    // callers presumably always pass 40 entries - confirm.
    // NOTE(review): template argument of vector missing.
    void StartThread(const vector &addr)
    {
        if (IsThreadRunning())
        {
            fMsg.Warn("Start - EventBuilder still running");
            return;
        }

        //fLastMessage.clear();

        for (size_t i=0; i<40; i++)
            ConnectSlot(i, addr[i]);

        fMsg.Message("Starting EventBuilder thread");

        fThreadMain = boost::thread(StartEvtBuild);
    }

    // Set the address of slot i (0..39; other values are ignored).
    //
    // A default-constructed endpoint marks the slot as undefined
    // (sockDef=0); any other endpoint is written to g_port[i].sockAddr and
    // the slot is marked defined (sockDef=1). In both cases fRunInProgress
    // is reset to -1 and the INCOMPLETE service is reset to 0.
    void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
    {
        if (i>39)
            return;

        fRunInProgress = -1;

        if (addr==tcp::endpoint())
        {
            // In this order
            g_port[i].sockDef = 0;

            fDimIncomplete.setQuality(0);
            fDimIncomplete.Update(uint64_t(0));
            return;
        }

        struct sockaddr_in sockaddr; //IP for each socket
        sockaddr.sin_family = AF_INET;
        sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
        sockaddr.sin_port = htons(addr.port());
        memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));

        // In this order
        g_port[i].sockDef = 1;

        fDimIncomplete.setQuality(0);
        fDimIncomplete.Update(uint64_t(0));
    }

    // Mark slot i (0..39) as ignored (sockDef=-1) unless its port is 0.
    void IgnoreSlot(unsigned int i)
    {
        if (i>39)
            return;

        if (g_port[i].sockAddr.sin_port==0)
            return;

        g_port[i].sockDef = -1;
    }

    // The next three functions only log a message and write a request code
    // to the global g_reset: 2 (abort), 101/102 (soft/hard reset), 1 (exit).
    void Abort()
    {
        fMsg.Message("Signal abort to EventBuilder thread...");
        g_reset = 2;
    }

    void ResetThread(bool soft)
    {
        fMsg.Message("Signal reset to EventBuilder thread...");
        g_reset = soft ? 101 : 102;
    }

    void Exit()
    {
        fMsg.Message("Signal exit to EventBuilder thread...");
        g_reset = 1;
    }

    // Slot state queries based on gi_NumConnect[i] and g_port[i].sockDef.
    // NOTE(review): unlike ConnectSlot/IgnoreSlot, i is not range-checked
    // in these five functions.
    bool IsConnected(int i) const { return gi_NumConnect[i]==1; }
    bool IsConnecting(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; }
    bool IsDisconnected(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; }
    bool IsRunInProgress() const { return fRunInProgress>=0; }

    // Switch a defined slot between ignored (-1) and active (1); slots with
    // sockDef==0 are left untouched.
    void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
    bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }

    // Select the output format used by the next runOpen() and publish it.
    //
    // For FAD::kCalib the DRS calibration is restarted and republished and
    // the function returns after its own message. For all other formats a
    // message is emitted only if the format actually changed.
    // NOTE(review): the kCfitsio and kRaw texts lack the trailing period
    // the other three have.
    void SetOutputFormat(FAD::FileFormat_t f)
    {
        const bool changed = f!=fFileFormat;

        fFileFormat = f;
        fDimFileFormat.Update(uint16_t(f));

        string msg = "File format set to: ";
        switch (f)
        {
        case FAD::kNone: msg += "kNone."; break;
        case FAD::kDebug: msg += "kDebug."; break;
        case FAD::kFits: msg += "kFits."; break;
        case FAD::kCfitsio: msg += "kCfitsio"; break;
        case FAD::kRaw: msg += "kRaw"; break;
        case FAD::kCalib:
            DataCalib::Restart();
            DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
            fMsg.Message("Resetted DRS calibration.");
            return;
        }

        if (changed)
            fMsg.Message(msg);
    }

    // Ask DataCalib to reset the secondary baseline; on success the file
    // format is switched to kCalib and published. Always returns 0.
    virtual int ResetSecondaryDrsBaseline()
    {
        if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
        {
            fFileFormat = FAD::kCalib;
            fDimFileFormat.Update(uint16_t(fFileFormat));
            fMsg.Message("Resetted DRS calibration for secondary baseline.");
        }
        else
            fMsg.Warn("Could not reset DRS calibration of secondary baseline.");

        return 0;
    }

    // Load a DRS calibration from a FITS file and republish it; silently
    // returns if DataCalib::ReadFits reports failure.
    void LoadDrsCalibration(const char *fname)
    {
        if (!DataCalib::ReadFits(fname, fMsg))
            return;

        fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
        DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
    }

    // Calls CloseRunFile(), clears the run-in-progress marker, returns 0.
    virtual int CloseOpenFiles() { CloseRunFile(); fRunInProgress = -1; return 0; }

    // -------------- Mapped event builder callbacks ------------------

    // Publish { fLastOpened, fLastClosed } followed by the zero-terminated
    // file name on the RUNS service; the quality is 1 while a file is open.
    // A non-empty name is additionally reported to the write statistics.
    void UpdateRuns(const string &fname="")
    {
        uint32_t values[2] =
        {
            fLastOpened,
            fLastClosed
        };

        // NOTE(review): template argument of vector missing.
        vector data(sizeof(values)+fname.size()+1);
        memcpy(data.data(), values, sizeof(values));
        strcpy(data.data()+sizeof(values), fname.c_str());
        fDimRuns.setQuality((bool)fFile);
        fDimRuns.Update(data);

        if (!fname.empty())
            fDimWriteStats.FileOpened(fname);
    }

    // Currently open output file; empty when no file is open.
    // NOTE(review): template argument missing; runOpen() stores a
    // DataProcessorImp* in it.
    shared_ptr fFile;

    // Queue handler: publish the four event counters with the given time.
    // NOTE(review): template arguments of pair missing.
    void UpdateDimEvents(const pair> &stat)
    {
        fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
        fDimEvents.Update(stat.first);
    }

    // Open a new output file of the currently selected format for the run
    // described by evt. A still-open file is closed first.
    //
    // Returns true on success; false if Open() fails or throws.
    //
    // NOTE(review): file is a raw owning pointer; both `return false` paths
    // leave it allocated. If fFileFormat matches none of the cases, file
    // stays null and file->Open() dereferences it.
    bool runOpen(const EVT_CTRL2 &evt)
    {
        const uint32_t night = evt.runCtrl->night;
        // Fall back to the current unix time if no run number is set
        const uint32_t runid = evt.runNum>0 ? evt.runNum : time(NULL);

        // If there is still an open file: close it
        if (fFile)
            runClose(*evt.runCtrl);

        // Keep a copy of the currently valid drs calibration
        // and associate it to the run control structure
        // NOTE(review): template argument of make_shared missing.
        evt.runCtrl->calib = make_shared(DataCalib::GetCalibration());

        /*
         evt.runCtrl->calibInt.resize(1024*1440);

         const int16_t *off = evt.runCtrl->zcalib.data();
               int32_t *ptr = evt.runCtrl->calib.data();
         const uint64_t num = evt.runCtrl->calib.fNumOffset;
         for (int i=0; i<1024*1440)
             ptr[i] = off[i]/num;
         */

        // FIXME: Check if file already exists...

        // Create the file
        DataProcessorImp *file = 0;
        switch (fFileFormat)
        {
        case FAD::kNone: file = new DataDump(fPath, night, runid, fMsg); break;
        case FAD::kDebug: file = new DataDebug(fPath, night, runid, fMsg); break;
        case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid, fMsg); break;
        case FAD::kFits: file = new DataWriteFits2(fPath, night, runid, fMsg); break;
        case FAD::kRaw: file = new DataWriteRaw(fPath, night, runid, fMsg); break;
        case FAD::kCalib: file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
        }

        try
        {
            // Try to open the file
            FAD::RunDescription desc;
            desc.name = evt.runCtrl->runType;

            if (!file->Open(evt, desc))
                return false;
        }
        catch (const exception &e)
        {
            fMsg.Error("Exception trying to open file: "+string(e.what()));
            return false;
        }

        fLastOpened = runid;

        // Signal that a file is open
        // NOTE(review): template argument of shared_ptr missing.
        fFile = shared_ptr(file);

        // Now do all the calls which potentially block (dim)

        // Time for update runs before time for update events
        UpdateRuns(file->GetFileName());

        // The total counter (kTotal) is deliberately not reset here
        fNumEvts[kEventId] = 0;
        fNumEvts[kTriggerId] = 0;
        fNumEvts[kCurrent] = 0;
        fQueueEvents.emplace(Time(), fNumEvts);

        ostringstream str;
        str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
        fMsg.Info(str);

        return true;
    }

    // Write one event to the open file and update the event counters; the
    // counters are queued for publication at most once per second.
    //
    // NOTE(review): text is missing in the middle of this function. The
    // comment opened below is never closed in this copy, and the first
    // surviving live code starts in the middle of a call to WriteEvt(evt).
    // The end of the comment and whatever defined `evt` for the statements
    // that follow are presumably in the missing part - restore from
    // version control.
    bool runWrite(const EVT_CTRL2 &e)
    {
        /*
        const size_t size = sizeof(EVENT)+1440*(evt.Roi+evt.RoiTM)*2;
        vector evt(e.fEvent, e.fEvent+size);

        const EVENT &evt = *reinterpret_cast(evt.data());

        int16_t *val = evt.Adc_Data;
        const int16_t *off = e.runCtrl->zcalib.data();
        for (const int16_t *start=evt.StartPix; startWriteEvt(evt))
            return false;

        fNumEvts[kCurrent]++;
        fNumEvts[kEventId] = evt.EventNum;
        fNumEvts[kTriggerId] = evt.TriggerNum;
        fNumEvts[kTotal]++;

        // Throttle: queue the counters only if more than a second has
        // passed since the last time they were queued
        static Time oldt(boost::date_time::neg_infin);
        Time newt;
        if (newt>oldt+boost::posix_time::seconds(1))
        {
            fQueueEvents.emplace(newt, fNumEvts);
            oldt = newt;
        }

        return true;
    }

    // Close the open file (no-op if none is open), publish the new state
    // and make sure run.calib refers to the calibration currently held by
    // DataCalib.
    void runClose(RUN_CTRL2 &run)
    {
        if (!fFile)
            return;

        // It can happen that runFinished was never called
        // (e.g. runWrite failed)
        if (fRunInProgress==fFile->GetRunId())
            fRunInProgress = -1;

        // Close the file
        const bool rc = fFile->Close(NULL);

        fLastClosed = fFile->GetRunId();

        // Build the log message now; fFile is released before it is sent
        ostringstream str;
        str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")";
        if (!rc)
            str << "... failed!";

        // Signal that the file is closed
        fFile.reset();

        // Now do all the calls which can potentially block (dim)

        CloseRun(fLastClosed);

        // Time for update events before time for update runs
        fQueueEvents.emplace(Time(), fNumEvts);
        UpdateRuns();

        // Do the potentially blocking call after all others
        rc ? fMsg.Info(str) : fMsg.Error(str);

        // If a Drs Calibration has just been finished, all following events
        // should also be processed with this calibration.
        // Note that this is a generally dangerous operation. Here, the previous
        // DRS calibration shared_ptr gets freed and if it is the last in use,
        // the memory will vanish. If another thread accesses that pointer,
        // it _must_ make a copy of the shared_ptr first to ensure that
        // the memory will stay in scope until the end of its operation.
        const DrsCalibration &cal = DataCalib::GetCalibration();

        // NOTE(review): template argument of make_shared missing.
        if (!run.calib || run.calib->fStep != cal.fStep || run.calib->fRoi!=cal.fRoi)
            run.calib = make_shared(cal);
    }

    // Hook called by runClose() after the file has been released; empty
    // here, virtual so that it can be overridden.
    virtual void CloseRun(uint32_t /*runid*/) { }

    // Queue handler: publish the two region-of-interest values.
    // NOTE(review): template arguments of pair missing.
    void UpdateDimRoi(const pair> &roi)
    {
        fDimRoi.setData(roi.second.data(), sizeof(uint16_t)*2);
        fDimRoi.Update(roi.first);
    }

    // Queue handler: collect reference-clock and temperature readings in
    // two function-local histories and publish their per-channel averages.
    //
    // NOTE(review): template arguments are missing throughout, and so is
    // the code between `newt` and `first+delay` below - presumably the rest
    // of the throttle test and the head of the loop that removes expired
    // reference-clock entries (compare the surviving temperature loop).
    // NOTE(review): both histories are cleared after every publication,
    // which appears to make the expiry loops redundant - confirm.
    void UpdateDimTempRefClk(const tuple, array> &dat)
    {
        const auto delay = boost::posix_time::seconds(5);

        const Time &tm = get<0>(dat);

        const array &clk = get<1>(dat);
        const array &tmp = get<2>(dat);

        // --------------- RefClock ---------------

        // history, add current data to history
        static list>> listclk;
        listclk.emplace_back(tm, clk);

        // --------------- Temperatures ---------------

        // history, add current data to history
        static list>> listtmp;
        listtmp.emplace_back(tm, tmp);

        // ========== Update dim services once a second =========

        static Time oldt(boost::date_time::neg_infin);
        Time newt;
        if (newtfirst+delay>tm)
                break;
            listclk.pop_front();
        }

        // Structure for dim service
        struct Clock
        {
            uint16_t num;
            float val[40];
            Clock() { memset(this, 0, sizeof(Clock)); }
        } __attribute__((__packed__));

        // Calculate average and fill structure
        vector clknum(40);

        Clock avgclk;
        avgclk.num = listclk.size();
        // UINT32_MAX marks "no value for this board" (see eventCheck)
        for (auto it=listclk.begin(); it!=listclk.end(); it++)
            for (int i=0; i<40; i++)
                if (it->second[i]!=UINT32_MAX)
                {
                    avgclk.val[i] += it->second[i];
                    clknum[i]++;
                }

        // NOTE(review): clknum[i] is 0 for a board without any valid entry,
        // so this divides by zero and the published value is not finite.
        for (int i=0; i<40; i++)
            avgclk.val[i] *= 2.048/clknum[i];

        // Update dim service
        fDimRefClock.setData(avgclk);
        fDimRefClock.Update(tm);

        listclk.clear();

        // --------------- Temperatures ---------------

        // remove expired data from history
        while (1)
        {
            auto it=listtmp.begin();
            if (it==listtmp.end() || it->first+delay>tm)
                break;
            listtmp.pop_front();
        }

        // Structure for dim service
        struct Temp
        {
            uint16_t num;
            float val[160];
            Temp() { memset(this, 0, sizeof(Temp)); }
        } __attribute__((__packed__));

        // Calculate average and fill structure
        vector tmpnum(160);

        Temp avgtmp;
        avgtmp.num = listtmp.size();
        // INT16_MIN marks "no value for this chip" (see eventCheck)
        for (auto it=listtmp.begin(); it!=listtmp.end(); it++)
            for (int i=0; i<160; i++)
                if (it->second[i]!=INT16_MIN)
                {
                    avgtmp.val[i] += it->second[i];
                    tmpnum[i]++;
                }

        // NOTE(review): same zero-count issue as for the clocks above.
        for (int i=0; i<160; i++)
            avgtmp.val[i] /= tmpnum[i]*16;

        // Update dim service
        fDimTemperature.setData(avgtmp);
        fDimTemperature.Update(tm);

        listtmp.clear();
    }

    // Check the 40 FAD board headers of an event for mutual consistency.
    //
    // Queues a ROI update if the ROI changed and always queues the
    // reference clocks and DRS temperatures of the event on success.
    //
    // Returns false (after logging an error) as soon as a header differs
    // from the first valid one in status, run number, event counter,
    // trigger counter, ADC clock phase shift, DAC values or trigger type;
    // true otherwise.
    //
    // NOTE(review): template arguments are missing on the array
    // declarations and on the reinterpret_casts.
    bool eventCheck(const EVT_CTRL2 &evt)
    {
        const EVENT *event = evt.fEvent;

        const Time tm(evt.time);

        const array roi = {{ event->Roi, event->RoiTM }};

        if (roi!=fVecRoi)
        {
            fQueueRoi.emplace(tm, roi);
            fVecRoi = roi;
        }

        const FAD::EventHeader *beg = reinterpret_cast(evt.FADhead);
        const FAD::EventHeader *end = reinterpret_cast(evt.FADhead)+40;

        // FIXME: Compare with target configuration

        // Copy data to array
        array clk;
        array tmp;

        // Pre-fill with the "no value" markers
        for (int i=0; i<40; i++)
            clk[i] = UINT32_MAX;

        for (int i=0; i<160; i++)
            tmp[i] = INT16_MIN;

        //fill(clk.data(), clk.data()+ 40, UINT32_MAX);
        //fill(tmp.data(), tmp.data()+160, INT16_MIN);

        for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
        {
            // FIXME: Compare with expectations!!!
            // Headers with a zero start delimiter are skipped; beg is
            // advanced past leading ones so that it ends up at the first
            // header that is compared against
            if (ptr->fStartDelimiter==0)
            {
                if (ptr==beg)
                    beg++;
                continue;
            }

            // NOTE(review): Id() is used as an index without a range
            // check here (debugHead() rejects ids above 39).
            clk[ptr->Id()] = ptr->fFreqRefClock;
            for (int i=0; i<4; i++)
                tmp[ptr->Id()*4+i] = ptr->fTempDrs[i];

            if (beg->fStatus != ptr->fStatus)
            {
                fMsg.Error("Inconsistency in FAD status detected.... closing run.");
                return false;
            }

            if (beg->fRunNumber != ptr->fRunNumber)
            {
                fMsg.Error("Inconsistent run number detected.... closing run.");
                return false;
            }

            /*
            if (beg->fVersion != ptr->fVersion)
            {
                Error("Inconsist firmware version detected.... closing run.");
                CloseRunFile(runNr, 0, 0);
                break;
            }
            */
            if (beg->fEventCounter != ptr->fEventCounter)
            {
                fMsg.Error("Inconsistent FAD event number detected.... closing run.");
                return false;
            }

            if (beg->fTriggerCounter != ptr->fTriggerCounter)
            {
                fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
                return false;
            }

            // FIXME: Check with first event!
            if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
            {
                fMsg.Error("Inconsistent phase shift detected.... closing run.");
                return false;
            }

            // FIXME: Check with first event!
            if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
            {
                fMsg.Error("Inconsistent DAC values detected.... closing run.");
                return false;
            }

            if (beg->fTriggerType != ptr->fTriggerType)
            {
                fMsg.Error("Inconsistent trigger type detected.... closing run.");
                return false;
            }
        }

        fQueueTempRefClk.emplace(tm, clk, tmp);

        // check REFCLK_frequency
        // check consistency with command configuration
        // how to log errors?
        // need gotNewRun/closedRun to know it is finished

        return true;
    }

    // Time of the last raw-data / event-data entry queued by applyCalib()
    Time fLastDimRawData;
    Time fLastDimEventData;

    // Queue handler: publish a buffer that starts with an EVENT structure;
    // quality and time stamp are taken from that structure.
    // NOTE(review): template arguments of vector and of the cast missing.
    void UpdateDimRawData(const vector &v)
    {
        const EVENT *evt = reinterpret_cast(v.data());

        fDimRawData.setData(v);
        fDimRawData.setQuality(evt->TriggerType);
        fDimRawData.Update(Time(evt->PCTime, evt->PCUsec));
    }

    // Queue handler: publish (time, quality, data) = (get<0>, get<1>,
    // get<2>) of the tuple.
    // NOTE(review): template arguments of tuple missing.
    void UpdateDimEventData(const tuple> &tup)
    {
        fDimEventData.setQuality(get<1>(tup));
        fDimEventData.setData(get<2>(tup));
        fDimEventData.Update(get<0>(tup));
    }

    // Calibrate one event for display purposes and remember its start
    // cells for the step correction of later events.
    //
    // size: the calibration/display part runs only when size is exactly 1.
    //       Going by the comment on the condition, size is the number of
    //       events waiting including this one.
    //
    // NOTE(review): template arguments are missing on shared_ptr, vector
    // and array below.
    void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
    {
        const EVENT *event = evt.fEvent;
        const int16_t *start = event->StartPix;

        // Get the reference to the run associated information
        RUN_CTRL2 &run = *evt.runCtrl;

        if (size==1) // If there is more than one event waiting (including this one), throw them away
        {
            Time now;

            // ------------------- Copy event data to new memory --------------------
            // (to make it thread safe; a static buffer might improve memory handling)
            const uint16_t roi = event->Roi;

            // ------------------- Apply full DRS calibration ------------------------
            // (Is that necessary, or would a simple offset correct do well already?)

            // This is a very important step. Making a copy of the shared pointer ensures
            // that another thread (here: runClose) can set a new shared_ptr with new
            // data without this thread being affected. If we just did run.calib->Apply
            // the shared_pointer in use here might vanish during the processing, the
            // memory is freed and we access invalid memory. It is not important
            // which memory we access (the old or the new one) because it is just for
            // display purpose anyway.
            const shared_ptr cal = run.calib;

            // There seems to be a problem using std::array... maybe the size is too big?
            // array vec2;
            vector vec((1440+160)*roi);
            cal->Apply(vec.data(), event->Adc_Data, start, roi);

            // ------------------- Apply DRS-step correction --------------------------
            for (auto it=run.prevStart.begin(); it!=run.prevStart.end(); it++)
            {
                DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, roi+10);
                DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, 3);
            }

            // ------------------------- Remove spikes --------------------------------
            DrsCalibrate::RemoveSpikes3(vec.data(), roi);

            // -------------- Update raw data dim service (VERY SLOW) -----------------
            // Only if the queue is idle and more than 5s have passed
            if (fQueueRawData.empty() && now>fLastDimRawData+boost::posix_time::seconds(5))
            {
                // Buffer layout: EVENT header followed by the calibrated floats
                vector data1(sizeof(EVENT)+vec.size()*sizeof(float));
                memcpy(data1.data(), event, sizeof(EVENT));
                memcpy(data1.data()+sizeof(EVENT), vec.data(), vec.size()*sizeof(float));
                fQueueRawData.emplace(data1);

                fLastDimRawData = now;
            }

            // ------------------------- Basic statistics -----------------------------
            DrsCalibrate::SlidingAverage(vec.data(), roi, 10);

            // If this is a cosmic event
            array stats; // Mean, RMS, Max, Pos
            const float max = DrsCalibrate::GetPixelStats(stats.data(), vec.data(), roi);
            // For trigger type 0 remember the event with the largest maximum
            if (evt.trgTyp==0 && max>fMaxEvent.first)
                fMaxEvent = make_pair(max, stats);

            // ------------------ Update dim service (statistics) ---------------------

            if (fQueueEventData.empty() && now>fLastDimEventData+boost::posix_time::microseconds(3141593))
            {
                fQueueEventData.emplace(evt.time, evt.trgTyp, evt.trgTyp==0 ? fMaxEvent.second : stats);
                // Restart the search for the maximum event
                if (evt.trgTyp==0)
                    fMaxEvent.first = -FLT_MAX;

                fLastDimEventData = now;
            }

            // === SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
            //
            //    if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
            //        return;
            //
            //    vector data2(1440); // Mean, RMS, Max, Pos, first, last
            //    DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
            //
            //    fDimFeedbackData.Update(data2);
        }

        // Keep the start cells of the last five events for further corrections
        // As a performance improvement we could also just store the
        // pointers to the last five events...
        // What if a new run is started? Do we mind?
        auto &l = run.prevStart; // History for start cells of previous events (for step calibration)

        // Grow the history up to five entries; afterwards recycle the
        // oldest entry by moving it to the front
        if (l.size()<5)
            l.emplace_front();
        else
        {
            auto it = l.end();
            l.splice(l.begin(), l, --it);
        }

        memcpy(l.front().data(), start, 1440*sizeof(int16_t));
    }

    // True while the most recently announced run (fRunNumber-1) is still
    // listed in fExpectedRuns.
    bool IsRunWaiting()
    {
        const lock_guard lock(mtx_newrun);
        return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end();
    }

    // Run number that the next StartNewRun() call will hand out.
    uint32_t GetRunNumber() const
    {
        return fRunNumber;
    }

    // NOTE(review): text is missing between `run` and `lock(mtx_newrun)`
    // below: the rest of IncreaseRunNumber() and the head of the following
    // member function are gone. Judging from the use of run.runId and from
    // the gotNewRun() callback at the end of this file, what follows the
    // gap is presumably the body of gotNewRun(RUN_CTRL2 &run) - confirm
    // against version control. A second gap sits inside the "missed run"
    // branch (`it->firstfirst`).
    //
    // What the surviving part does: under mtx_newrun it walks the expected
    // runs in key order, logs and erases some of them (the condition is
    // lost in the second gap) and stops at the entry for run.runId. If
    // there is none, a warning is logged and only run.night is set.
    // Otherwise run type, limits and night are copied from the stored
    // description, the entry is erased and fRunInProgress is set.
    bool IncreaseRunNumber(uint32_t run)
    {
        if (!InitRunNumber())
            return false;

        if (run lock(mtx_newrun);

        map::iterator it = fExpectedRuns.begin();
        while (it!=fExpectedRuns.end())
        {
            if (it->firstfirst << ".";
                fMsg.Info(str);

                // Increase the iterator first, it becomes invalid with the next call
                const auto is = it++;
                fExpectedRuns.erase(is);
                continue;
            }

            if (it->first==run.runId)
                break;

            it++;
        }

        if (it==fExpectedRuns.end())
        {
            ostringstream str;
            str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)";
            fMsg.Warn(str);

            // This is not ideal, but the best we can do
            run.night = fNightAsInt;

            return;
        }

        const FAD::RunDescription &conf = it->second;

        run.runType = conf.name;
        run.maxEvt = conf.maxevt;
        run.closeTime = conf.maxtime + run.openTime;
        run.night = conf.night;

        fExpectedRuns.erase(it);

        // Now signal the fadctrl (configuration process that a run is in progress)
        // Maybe this could be done earlier, but we are talking about a
        // negligible time scale here.
        fRunInProgress = run.runId;
    }

    void runFinished()
    {
        // This is called when the last event of a run (run time exceeded or
        // max number of events exceeded) has been received.
        fRunInProgress = -1;
    }

    //map fLastMessage;

    // Forward a message of the event builder to fMsg, prefixed with
    // "EventBuilder: ".
    void factOut(int severity, const char *message)
    {
        ostringstream str;
        str << "EventBuilder: " << message;

        /*
        string &old = fLastMessage[boost::this_thread::get_id()];

        if (str.str()==old)
            return;
        old = str.str();
        */

        fMsg.Update(str, severity);
    }

    /*
    void factStat(int64_t *stat, int len)
    {
        if (len!=7)
        {
            fMsg.Warn("factStat received unknown number of values.");
            return;
        }

        vector data(1, g_maxMem);
        data.insert(data.end(), stat, stat+len);

        static vector last(8);
        if (data==last)
            return;
        last = data;

        fDimStatistics.Update(data);

        // len is the length of the array.
        // array[4] holds how many bytes of the buffer are currently in use;
        // this lets you check whether the 100MB are full ....

        ostringstream str;
        str
            << "Wait=" << stat[0] << " "
            << "Skip=" << stat[1] << " "
            << "Del=" << stat[2] << " "
            << "Tot=" << stat[3] << " "
            << "Mem=" << stat[4] << "/" << g_maxMem << " "
            << "Read=" << stat[5] << " "
            << "Conn=" << stat[6];

        fMsg.Info(str);
    }
    */

    // Queue handler: publish one GUI_STAT structure with its time stamp.
    // NOTE(review): template arguments of pair missing.
    void UpdateDimStatistics1(const pair &stat)
    {
        fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
        fDimStatistics1.Update(stat.first);
    }

    // Callback: queue the statistics together with the current time.
    void factStat(const GUI_STAT &stat)
    {
        fQueueStatistics1.emplace(Time(), stat);
    }

    // Callback: publish the report value on the INCOMPLETE service with
    // quality 1.
    void factReportIncomplete(uint64_t rep)
    {
        fDimIncomplete.setQuality(1);
        fDimIncomplete.Update(rep);
    }

    // Last header received from each board, indexed by board id (see
    // procHeader). NOTE(review): template arguments missing.
    array fVecHeader;

    // Collect one member from each of the 40 elements starting at vec.
    //
    // vec: first element of the sequence.
    // t:   address of the member of interest inside *vec; only its byte
    //      offset from vec is used.
    //
    // Returns an array with the 40 values in [0..39] (0 for boards with
    // gi_NumConnect[i]==0), the minimum over the connected boards in [40]
    // and the maximum in [41]. Without any connected board, [40] is 1 and
    // [41] is 0.
    //
    // NOTE(review): the template parameter list and the template arguments
    // of array and of the casts are missing.
    template
    array Compare(const S *vec, const T *t)
    {
        const int offset = reinterpret_cast(t) - reinterpret_cast(vec);

        const T *min = NULL;
        const T *val = NULL;
        const T *max = NULL;

        array arr;

        // bool rc = true;
        for (int i=0; i<40; i++)
        {
            const char *base = reinterpret_cast(vec+i);
            const T *ref = reinterpret_cast(base+offset);

            arr[i] = *ref;

            if (gi_NumConnect[i]==0)
            {
                arr[i] = 0;
                continue;
            }

            // First connected board: seed min/max with it
            if (!val)
            {
                min = ref;
                val = ref;
                max = ref;
            }

            if (*ref<*min)
                min = ref;

            if (*ref>*max)
                max = ref;

            // if (*val!=*ref)
            //     rc = false;
        }

        arr[40] = val ? *min : 1;
        arr[41] = val ? *max : 0;

        return arr;
    }

    // Like Compare(), but for bit patterns and always reading from
    // fVecHeader (h is used only to compute the byte offset of t).
    //
    // Returns an array with the value of the first connected board in [1],
    // the OR of all bits in which any connected board differs from it in
    // [0], and the 40 per-board values in [2..41] (0 if not connected).
    //
    // NOTE(review): template parameter list and template arguments missing.
    template
    array CompareBits(const FAD::EventHeader *h, const T *t)
    {
        const int offset = reinterpret_cast(t) - reinterpret_cast(h);

        T val = 0;
        T rc = 0;

        array vec;

        bool first = true;

        for (int i=0; i<40; i++)
        {
            const char *base = reinterpret_cast(&fVecHeader[i]);
            const T *ref = reinterpret_cast(base+offset);

            vec[i+2] = *ref;

            if (gi_NumConnect[i]==0)
            {
                vec[i+2] = 0;
                continue;
            }

            if (first)
            {
                first = false;
                val = *ref;
                rc = 0;
            }

            rc |= val^*ref;
        }

        vec[0] = rc;
        vec[1] = val;

        return vec;
    }

    // Publish the first n elements of data on svc with time stamp t.
    // NOTE(review): template parameter list and template arguments missing;
    // the default n=N refers to one of the lost template parameters.
    template
    void Update(DimDescribedService &svc, const array &data, const Time &t=Time(), int n=N)
    {
        svc.setData(const_cast(data.data()), sizeof(T)*n);
        svc.Update(t);
    }

    // NOTE(review): the end of Print() is missing - its output statement
    // breaks off inside a string literal - and so is the beginning of the
    // declaration of fNumConnected (used by debugHead() below).
    template
    void Print(const char *name, const pair> &data)
    {
        cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << " fNumConnected;

    // Queue handler: store the header of one board in fVecHeader and
    // republish every header-derived service whose value changed for this
    // board, or all of them if the `changed` flag (get<1>) is set.
    //
    // NOTE(review): template arguments are missing throughout, and the
    // header of the loop over the DAC channels is lost (`for (int i=0; i`).
    void procHeader(const tuple &dat)
    {
        const Time &t = get<0>(dat);
        const bool changed = get<1>(dat);
        const FAD::EventHeader &h = get<2>(dat);

        const FAD::EventHeader old = fVecHeader[h.Id()];
        fVecHeader[h.Id()] = h;

        if (old.fVersion != h.fVersion || changed)
        {
            const array ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);

            // Convert each version word to a float "high byte.low byte"
            // by formatting it as text and parsing it back
            array data;
            for (int i=0; i<42; i++)
            {
                ostringstream str;
                str << (ver[i]>>8) << '.' << (ver[i]&0xff);
                data[i] = stof(str.str());
            }
            Update(fDimFwVersion, data, t);
        }

        if (old.fRunNumber != h.fRunNumber || changed)
        {
            const array run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
            fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
            fDimRunNumber.Update(t);
        }

        if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
        {
            const array pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
            fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
            fDimPrescaler.Update(t);
        }

        if (old.fDNA != h.fDNA || changed)
        {
            const array dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
            // Only the 40 per-board values are published, not min/max
            Update(fDimDNA, dna, t, 40);
        }

        if (old.fStatus != h.fStatus || changed)
        {
            const array sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
            Update(fDimStatus, sts, t);
        }

        if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
        {
            array dacs;

            for (int i=0; i dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);

                memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
            }

            Update(fDimDac, dacs, t);
        }
    }

    // Callback: queue a received board header for procHeader(). Headers
    // with an id above 39 are dropped. The `changed` flag is set if the
    // connection counters differ from the previous call or the builder
    // thread is not running.
    // NOTE(review): template argument of vector missing.
    void debugHead(const FAD::EventHeader &h)
    {
        const uint16_t id = h.Id();
        if (id>39)
            return;

        if (fNumConnected.size()!=40)
            fNumConnected.resize(40);

        const vector con(gi_NumConnect, gi_NumConnect+40);

        const bool changed = con!=fNumConnected || !IsThreadRunning();

        fNumConnected = con;

        fQueueProcHeader.emplace(Time(), changed, h);
    }
};

// NOTE(review): this is a non-inline definition in a header; including the
// header from more than one translation unit would define This twice -
// presumably it is included only once, confirm.
EventBuilderWrapper *EventBuilderWrapper::This = 0;

// ----------- Event builder callbacks implementation ---------------
// Free functions that forward to the single instance. None of them checks
// This for null, so an instance must exist before they are called.
bool runOpen(const EVT_CTRL2 &evt)
{
    return EventBuilderWrapper::This->runOpen(evt);
}

bool runWrite(const EVT_CTRL2 &evt)
{
    return EventBuilderWrapper::This->runWrite(evt);
}

void runClose(RUN_CTRL2 &run)
{
    EventBuilderWrapper::This->runClose(run);
}

bool eventCheck(const EVT_CTRL2 &evt)
{
    return EventBuilderWrapper::This->eventCheck(evt);
}

void gotNewRun(RUN_CTRL2 &run)
{
    EventBuilderWrapper::This->gotNewRun(run);
}

void runFinished()
{
    EventBuilderWrapper::This->runFinished();
}

void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
{
    EventBuilderWrapper::This->applyCalib(evt, size);
}

void factOut(int severity, const char *message)
{
    EventBuilderWrapper::This->factOut(severity, message);
}

void factStat(const GUI_STAT &stat)
{
    EventBuilderWrapper::This->factStat(stat);
}

void factReportIncomplete(uint64_t rep)
{
    EventBuilderWrapper::This->factReportIncomplete(rep);
}

// ------

// buf is interpreted as a FAD::EventHeader.
// NOTE(review): template argument of the cast missing.
void debugHead(void *buf)
{
    const FAD::EventHeader &h = *reinterpret_cast(buf);
    EventBuilderWrapper::This->debugHead(h);
}

#endif