source: trunk/FACT++/src/EventBuilderWrapper.h@ 16388

Last change on this file since 16388 was 16381, checked in by tbretz, 11 years ago
There is no need to copy shared_ptrs when the pointer is kept valid outside of the scope of the function; updated applyCalib to support also step removal; replaced post by emplace where applicable; removed the obsolete DisconnectSlot; reset data-taking status (fRunInProgress) when the current run is closed manually; to improve the performance of calibApply one would need to buffer the dim service updates in its own queue
File size: 41.3 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteFits2
24#endif
25
26#include "DataWriteFits2.h"
27
28#include "queue.h"
29
30namespace ba = boost::asio;
31namespace bs = boost::system;
32namespace fs = boost::filesystem;
33
34using ba::ip::tcp;
35
36using namespace std;
37
38// ========================================================================
39
40#include "EventBuilder.h"
41
42void StartEvtBuild();
43void CloseRunFile();
44
45// ========================================================================
46
47class EventBuilderWrapper
48{
49public:
50 // FIXME
51 static EventBuilderWrapper *This;
52
53 MessageImp &fMsg;
54
55private:
56 boost::thread fThreadMain;
57
58 enum
59 {
60 kCurrent = 0,
61 kTotal = 1,
62 kEventId = 2,
63 kTriggerId = 3,
64 };
65
66 FAD::FileFormat_t fFileFormat;
67
68 //uint32_t fMaxRun;
69 uint32_t fLastOpened;
70 uint32_t fLastClosed;
71 array<uint32_t,4> fNumEvts;
72
73 DimWriteStatistics fDimWriteStats;
74 DimDescribedService fDimRuns;
75 DimDescribedService fDimEvents;
76 DimDescribedService fDimRawData;
77 DimDescribedService fDimEventData;
78 DimDescribedService fDimFeedbackData;
79 DimDescribedService fDimFwVersion;
80 DimDescribedService fDimRunNumber;
81 DimDescribedService fDimStatus;
82 DimDescribedService fDimDNA;
83 DimDescribedService fDimTemperature;
84 DimDescribedService fDimPrescaler;
85 DimDescribedService fDimRefClock;
86 DimDescribedService fDimRoi;
87 DimDescribedService fDimDac;
88 DimDescribedService fDimDrsRuns;
89 DimDescribedService fDimDrsCalibration;
90 DimDescribedService fDimStatistics1;
91 //DimDescribedService fDimStatistics2;
92 DimDescribedService fDimFileFormat;
93 DimDescribedService fDimIncomplete;
94
95 Queue<pair<Time,GUI_STAT>> fDimQueue1;
96 Queue<tuple<Time,bool,FAD::EventHeader>> fDimQueue2;
97 Queue<pair<Time,array<uint32_t,4>>> fDimQueue3;
98 Queue<pair<Time,array<uint16_t,2>>> fDimQueue4;
99
100 string fPath;
101 uint32_t fNightAsInt;
102 uint32_t fRunNumber;
103 int64_t fRunInProgress;
104
105 array<uint16_t,2> fVecRoi;
106
107protected:
108 bool InitRunNumber(const string &path="")
109 {
110 if (!path.empty())
111 {
112 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
113 {
114 fMsg.Error("Data path "+path+" does not exist!");
115 return false;
116 }
117
118 fPath = path;
119 fDimWriteStats.SetCurrentFolder(fPath);
120
121 fMsg.Info("Data path set to "+path+".");
122 }
123
124 // Get current night
125 const uint32_t night = Time().NightAsInt();
126 if (night==fNightAsInt)
127 return true;
128
129 // Check for run numbers
130 fRunNumber = 1000;
131
132 while (--fRunNumber>0)
133 {
134 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
135
136 if (access((name+"bin").c_str(), F_OK) == 0)
137 break;
138 if (access((name+"fits").c_str(), F_OK) == 0)
139 break;
140 if (access((name+"drs.fits").c_str(), F_OK) == 0)
141 break;
142 }
143
144 // This is now the first file which does not exist
145 fRunNumber++;
146 fLastOpened = 0;
147
148 // Check if we have exceeded the maximum
149 if (fRunNumber==1000)
150 {
151 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
152 return false;
153 }
154
155 ostringstream str;
156 if (fNightAsInt==0)
157 str << "First night...";
158 else
159 str << "Night has changd from " << fNightAsInt << "... new";
160 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
161 fMsg.Message(str);
162
163 fNightAsInt = night;
164
165 return true;
166 }
167
168public:
169 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
170 fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0),
171 fDimWriteStats ("FAD_CONTROL", imp),
172 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
173 "Run files statistics"
174 "|stats[int]:num of open files, min/max run no, last opened or closed run"
175 "|file[string]:filename of last opened file"),
176 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
177 "Event counts"
178 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
179 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;C:4;I:1;I:2;I:40;S:1440;S:160;F",
180 "|roi[uint16]:number of samples per pixel"
181 "|roi_tm[uint16]:number of samples per time-marker channel"
182 "|num_fad[uint32]:event number from FADs"
183 "|num_ftm[uint32]:trigger number from FTM"
184 "|type[uint16]:trigger type from FTM"
185 "|num_boards[uint32]:number of active boards"
186 "|error[uint8]:event builder error counters"
187 "|dummy[]:"
188 "|time[uint32]:PC time as unix time stamp"
189 "|time_board[uint32]:Time stamp of FAD boards"
190 "|start_pix[int16]:start sample of pixels"
191 "|start_tm[int16]:start sample of time marker channels"
192 "|adc[int16]:adc data"),
193 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
194 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
195 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
196 "Firmware version number of fad boards"
197 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
198 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
199 "Run numbers coming from FAD boards"
200 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
201 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
202 "Status of FAD boards"
203 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
204 fDimDNA ("FAD_CONTROL/DNA", "X:40",
205 "DNA of FAD boards"
206 "|DNA[hex]:Hex identifier of each FAD board"),
207 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
208 "FADs temperatures"
209 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
210 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
211 "Trigger generator prescaler of fad boards"
212 "|prescaler[int]:Trigger generator prescaler value, for each board"),
213 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
214 "Reference clock of FAD boards"
215 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
216 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
217 fDimDac ("FAD_CONTROL/DAC", "S:336",
218 "DAC settings of each FAD board"
219 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
220 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
221 "|roi:Region of interest of secondary baseline"
222 "|run:Run numbers of DRS runs (0=none)"),
223 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
224 "|roi:Region of interest of secondary baseline"
225 "|run:Run numbers of DRS runs (0=none)"),
226 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;X:40",
227 "Event Builder status for GUI display"
228 "|threadInfo[int]:Number of read, proc and writes"
229 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
230 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
231 "|EvtCnt[int]:Number of events skipped, written, with errors"
232 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
233 "|badRoiBoard[int]:Num boards with wrong ROI"
234 "|deltaT[ms]:Time in ms for rates"
235 "|rateNew[int]:Number of new start events received"
236 "|numConn[int]:Number of connections per board"
237 "|rateBytes[int]:Bytes read this cycle"
238 "|totBytes[int]:Bytes read (counter)"),
239 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
240 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
241 // It is important to instantiate them after the DimServices
242 fDimQueue1(std::bind(&EventBuilderWrapper::factStatSend, this, placeholders::_1)),
243 fDimQueue2(std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
244 fDimQueue3(std::bind(&EventBuilderWrapper::updateEvents, this, placeholders::_1)),
245 fDimQueue4(std::bind(&EventBuilderWrapper::updateRoi, this, placeholders::_1)),
246 fNightAsInt(0), fRunInProgress(-1)
247 {
248 if (This)
249 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
250
251 This = this;
252
253 fVecRoi.fill(0);
254
255 memset(fNumEvts.data(), 0, sizeof(fNumEvts));
256 fDimEvents.Update(fNumEvts);
257
258 for (size_t i=0; i<40; i++)
259 ConnectSlot(i, tcp::endpoint());
260 }
261
262 virtual ~EventBuilderWrapper()
263 {
264 Abort();
265
266 // FIXME: Used timed_join and abort afterwards
267 // What's the maximum time the eb need to abort?
268 fThreadMain.join();
269 }
270
271 map<uint32_t, FAD::RunDescription> fExpectedRuns;
272
273 mutex mtx_newrun;
274
275 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
276 {
277 if (maxtime<=0 || maxtime>24*60*60)
278 maxtime = 24*60*60;
279 if (maxevt<=0 || maxevt>INT32_MAX)
280 maxevt = INT32_MAX;
281
282 if (!InitRunNumber())
283 return 0;
284
285 const FAD::RunDescription descr =
286 {
287 uint32_t(maxtime),
288 uint32_t(maxevt),
289 fNightAsInt,
290 ref.first,
291 ref.second,
292 };
293
294 const lock_guard<mutex> lock(mtx_newrun);
295 fExpectedRuns[fRunNumber] = descr;
296 return fRunNumber++;
297 }
298
299 bool IsThreadRunning()
300 {
301 if (fThreadMain.get_id()==boost::this_thread::get_id())
302 return true;
303 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
304 }
305
306 void SetMaxMemory(unsigned int mb) const
307 {
308 g_maxMem = size_t(mb)*1000000;
309 }
310
311 void StartThread(const vector<tcp::endpoint> &addr)
312 {
313 if (IsThreadRunning())
314 {
315 fMsg.Warn("Start - EventBuilder still running");
316 return;
317 }
318
319 //fLastMessage.clear();
320
321 for (size_t i=0; i<40; i++)
322 ConnectSlot(i, addr[i]);
323
324 fMsg.Message("Starting EventBuilder thread");
325
326 fThreadMain = boost::thread(StartEvtBuild);
327 }
328
329 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
330 {
331 if (i>39)
332 return;
333
334 fRunInProgress = -1;
335
336 if (addr==tcp::endpoint())
337 {
338 // In this order
339 g_port[i].sockDef = 0;
340
341 fDimIncomplete.setQuality(0);
342 fDimIncomplete.Update(uint64_t(0));
343 return;
344 }
345
346 struct sockaddr_in sockaddr; //IP for each socket
347 sockaddr.sin_family = AF_INET;
348 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
349 sockaddr.sin_port = htons(addr.port());
350 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
351
352 // In this order
353 g_port[i].sockDef = 1;
354
355 fDimIncomplete.setQuality(0);
356 fDimIncomplete.Update(uint64_t(0));
357 }
358
359 void IgnoreSlot(unsigned int i)
360 {
361 if (i>39)
362 return;
363
364 if (g_port[i].sockAddr.sin_port==0)
365 return;
366
367 g_port[i].sockDef = -1;
368 }
369
370
371 void Abort()
372 {
373 fMsg.Message("Signal abort to EventBuilder thread...");
374 g_reset = 2;
375 }
376
377 void ResetThread(bool soft)
378 {
379 fMsg.Message("Signal reset to EventBuilder thread...");
380 g_reset = soft ? 101 : 102;
381 }
382
383 void Exit()
384 {
385 fMsg.Message("Signal exit to EventBuilder thread...");
386 g_reset = 1;
387 }
388
389 bool IsConnected(int i) const { return gi_NumConnect[i]==1; }
390 bool IsConnecting(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; }
391 bool IsDisconnected(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; }
392 bool IsRunInProgress() const { return fRunInProgress>=0; }
393
394 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
395 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
396
397 void SetOutputFormat(FAD::FileFormat_t f)
398 {
399 const bool changed = f!=fFileFormat;
400
401 fFileFormat = f;
402 fDimFileFormat.Update(uint16_t(f));
403
404 string msg = "File format set to: ";
405 switch (f)
406 {
407 case FAD::kNone: msg += "kNone."; break;
408 case FAD::kDebug: msg += "kDebug."; break;
409 case FAD::kFits: msg += "kFits."; break;
410 case FAD::kCfitsio: msg += "kCfitsio"; break;
411 case FAD::kRaw: msg += "kRaw"; break;
412 case FAD::kCalib:
413 DataCalib::Restart();
414 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
415 fMsg.Message("Resetted DRS calibration.");
416 return;
417 }
418
419 if (changed)
420 fMsg.Message(msg);
421 }
422
423 virtual int ResetSecondaryDrsBaseline()
424 {
425 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
426 {
427 fFileFormat = FAD::kCalib;
428 fDimFileFormat.Update(uint16_t(fFileFormat));
429 fMsg.Message("Resetted DRS calibration for secondary baseline.");
430 }
431 else
432 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
433
434 return 0;
435 }
436
437 void LoadDrsCalibration(const char *fname)
438 {
439 if (!DataCalib::ReadFits(fname, fMsg))
440 return;
441 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
442 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
443 }
444
445 virtual int CloseOpenFiles() { CloseRunFile(); fRunInProgress = -1; return 0; }
446
447
448 // -------------- Mapped event builder callbacks ------------------
449
450 void UpdateRuns(const string &fname="")
451 {
452 uint32_t values[5] =
453 {
454 !fFile ? 0 : 1,
455 fFile ? fFile->GetRunId() : 0,
456 fFile ? fFile->GetRunId() : 0,
457 fLastOpened,
458 fLastClosed
459 };
460
461 vector<char> data(sizeof(values)+fname.size()+1);
462 memcpy(data.data(), values, sizeof(values));
463 strcpy(data.data()+sizeof(values), fname.c_str());
464 fDimRuns.Update(data);
465 }
466
467 shared_ptr<DataProcessorImp> fFile;
468
469 void updateEvents(const pair<Time,array<uint32_t,4>> &stat)
470 {
471 fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
472 fDimEvents.Update(stat.first);
473 }
474
475 bool runOpen(const EVT_CTRL2 &evt)
476 {
477 const uint32_t night = evt.runCtrl->night;
478 const uint32_t runid = evt.runNum>0 ? evt.runNum : time(NULL);
479
480 // If there is still an open file: close it
481 if (fFile)
482 runClose();
483
484 // Keep a copy of the currently valid drs calibration
485 // and associate it to the run control structure
486 evt.runCtrl->calib = shared_ptr<DrsCalibration>(new DrsCalibration(DataCalib::GetCalibration()));
487
488 // FIMXE: Check if file already exists...
489
490 // Crate the file
491 DataProcessorImp *file = 0;
492 switch (fFileFormat)
493 {
494 case FAD::kNone: file = new DataDump(fPath, night, runid, fMsg); break;
495 case FAD::kDebug: file = new DataDebug(fPath, night, runid, fMsg); break;
496 case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid, fMsg); break;
497 case FAD::kFits: file = new DataWriteFits2(fPath, night, runid, fMsg); break;
498 case FAD::kRaw: file = new DataWriteRaw(fPath, night, runid, fMsg); break;
499 case FAD::kCalib: file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
500 }
501
502 try
503 {
504 // Try to open the file
505 FAD::RunDescription desc;
506 desc.name = evt.runCtrl->runType;
507
508 if (!file->Open(evt, desc))
509 return false;
510 }
511 catch (const exception &e)
512 {
513 fMsg.Error("Exception trying to open file: "+string(e.what()));
514 return false;
515 }
516
517 fLastOpened = runid;
518
519 // Signal that a file is open
520 fFile = shared_ptr<DataProcessorImp>(file);
521
522 // Now do all the calls which potentially block (dim)
523
524 // Time for update runs before time for update events
525 UpdateRuns(file->GetFileName());
526 fNumEvts[kEventId] = 0;
527 fNumEvts[kTriggerId] = 0;
528 fNumEvts[kCurrent] = 0;
529 fDimQueue3.emplace(Time(), fNumEvts);
530
531 fDimWriteStats.FileOpened(file->GetFileName());
532
533 ostringstream str;
534 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
535 fMsg.Info(str);
536
537 return true;
538 }
539
540 bool runWrite(const EVT_CTRL2 &e)
541 {
542 const EVENT &evt = *e.fEvent;
543 if (!fFile->WriteEvt(evt))
544 return false;
545
546 fNumEvts[kCurrent]++;
547 fNumEvts[kEventId] = evt.EventNum;
548 fNumEvts[kTriggerId] = evt.TriggerNum;
549 fNumEvts[kTotal]++;
550
551 static Time oldt(boost::date_time::neg_infin);
552 Time newt;
553 if (newt>oldt+boost::posix_time::seconds(1))
554 {
555 fDimQueue3.emplace(Time(), fNumEvts);
556 oldt = newt;
557 }
558
559 return true;
560 }
561
562 void runClose()
563 {
564 if (!fFile)
565 return;
566
567 // It can happen that runFinished was never called
568 // (e.g. runWrite failed)
569 if (fRunInProgress==fFile->GetRunId())
570 fRunInProgress = -1;
571
572 // Close the file
573 const bool rc = fFile->Close(NULL);
574
575 fLastClosed = fFile->GetRunId();
576
577 ostringstream str;
578 str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")";
579 if (!rc)
580 str << "... failed!";
581
582 // Signal that the file is closed
583
584 fFile.reset();
585
586 // Now do all the calls which can potentially block (dim)
587
588 CloseRun(fLastClosed);
589
590 // Time for update events before time for update runs
591 fDimQueue3.emplace(Time(), fNumEvts);
592 UpdateRuns();
593
594 // Do the potentially blocking call after all others
595 rc ? fMsg.Info(str) : fMsg.Error(str);
596 }
597
598 virtual void CloseRun(uint32_t /*runid*/) { }
599
600 void updateRoi(const pair<Time, array<uint16_t,2>> &roi)
601 {
602 fDimRoi.setData(roi.second.data(), sizeof(uint16_t)*2);
603 fDimRoi.Update(roi.first);
604 }
605
606 bool eventCheck(const EVT_CTRL2 &evt)
607 {
608 const EVENT *event = evt.fEvent;
609
610 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
611
612 if (roi!=fVecRoi)
613 {
614 fDimQueue4.emplace(Time(), roi);
615 fVecRoi = roi;
616 }
617
618 const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead);
619 const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead)+40;
620
621 // FIMXE: Compare with target configuration
622
623 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
624 {
625 // FIXME: Compare with expectations!!!
626 if (ptr->fStartDelimiter==0)
627 {
628 if (ptr==beg)
629 beg++;
630 continue;
631 }
632
633 if (beg->fStatus != ptr->fStatus)
634 {
635 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
636 return false;
637 }
638
639 if (beg->fRunNumber != ptr->fRunNumber)
640 {
641 fMsg.Error("Inconsistent run number detected.... closing run.");
642 return false;
643 }
644
645 /*
646 if (beg->fVersion != ptr->fVersion)
647 {
648 Error("Inconsist firmware version detected.... closing run.");
649 CloseRunFile(runNr, 0, 0);
650 break;
651 }
652 */
653 if (beg->fEventCounter != ptr->fEventCounter)
654 {
655 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
656 return false;
657 }
658
659 if (beg->fTriggerCounter != ptr->fTriggerCounter)
660 {
661 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
662 return false;
663 }
664
665 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
666 {
667 fMsg.Error("Inconsistent phase shift detected.... closing run.");
668 return false;
669 }
670
671 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
672 {
673 fMsg.Error("Inconsistent DAC values detected.... closing run.");
674 return false;
675 }
676
677 if (beg->fTriggerType != ptr->fTriggerType)
678 {
679 fMsg.Error("Inconsistent trigger type detected.... closing run.");
680 return false;
681 }
682 }
683
684 // check REFCLK_frequency
685 // check consistency with command configuration
686 // how to log errors?
687 // need gotNewRun/closedRun to know it is finished
688
689 return true;
690 }
691
692 map<float,array<float, 1440*4>> evtList;
693
694 void applyCalib(const shared_ptr<EVT_CTRL2> &evt)
695 {
696 const EVENT *event = evt->fEvent;
697 const int16_t *start = event->StartPix;
698
699 // Get the reference to the run associated information
700 RUN_CTRL2 &run = *evt->runCtrl;
701
702 // Currently we send any event no matter what its trigger id is...
703 // To be changed.
704 static Time oldt(boost::date_time::neg_infin);
705
706 static uint64_t cnt = 0;
707
708 // In case of a need of more performance this cold be split to several threads
709 Time newt;
710 if (newt>=oldt+boost::posix_time::milliseconds(100))
711 {
712 // Keep the last update time
713 oldt = newt;
714
715 // ------------------- Copy event data to new memory --------------------
716 // (to make it thread safe; a static buffer might improve memory handling)
717 const uint16_t roi = event->Roi;
718
719 // ------------------- Apply full DRS calibration ------------------------
720 // (Is that necessray, or would a simple offset correct do well already?)
721
722 // There seems to be a problem using std::array... maybe the size is too big?
723 // array<float, (1440+160)*1024> vec2;
724 vector<float> vec((1440+160)*roi);
725 run.calib->Apply(vec.data(), event->Adc_Data, start, roi);
726
727 // ------------------- Appy DRS-step correction --------------------------
728 for (auto it=run.prevStart.begin(); it!=run.prevStart.end(); it++)
729 {
730 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, roi+10);
731 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, 3);
732 }
733
734 // ------------------------- Remove spikes --------------------------------
735 DrsCalibrate::RemoveSpikes3(vec.data(), roi);
736
737 // -------------- Update raw data dim sevice (VERY SLOW) -----------------
738 const Time tm = evt->time;
739 if (++cnt%50==0) // Once every 5s
740 {
741 //array<char,sizeof(EVENT)+(1440+160)*1024*sizeof(float)> data1;
742 vector<char> data1(sizeof(EVENT)+vec.size()*sizeof(float));
743 memcpy(data1.data(), event, sizeof(EVENT));
744 memcpy(data1.data()+sizeof(EVENT), vec.data(), vec.size()*sizeof(float));
745
746 // This needs to be decoupled for optimum performance!
747 // Make a queue which does not overfill by maintaining queue size!
748 fDimRawData.setQuality(evt->trgTyp);
749 fDimRawData.setData(data1.data(), sizeof(EVENT)+vec.size()*sizeof(float));
750 fDimRawData.Update(tm);
751 }
752
753 // ------------------------- Basic statistics -----------------------------
754 DrsCalibrate::SlidingAverage(vec.data(), roi, 10);
755
756 array<float, 1440*4> stats; // Mean, RMS, Max, Pos
757 const double max = DrsCalibrate::GetPixelStats(stats.data(), vec.data(), roi);
758
759 // ------------------ Update dim service (statistics) ---------------------
760 if (evt->trgTyp==0)
761 evtList[max] = stats;
762
763 if (cnt%50==0)
764 {
765 if (!evtList.empty())
766 stats = evtList.rbegin()->second;
767
768 // This needs to be decoupled for optimum performance!
769 // Make a queue which does not overfill by maintaining queue size!
770 fDimEventData.setQuality(evtList.empty() ? evt->trgTyp : 0);
771 fDimEventData.setData(stats.data(), 1440*4*sizeof(float));
772 fDimEventData.Update(tm);
773
774 evtList.clear();
775 }
776
777 // === SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
778 //
779 // if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
780 // return;
781 //
782 // vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
783 // DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
784 //
785 // fDimFeedbackData.Update(data2);
786 }
787
788 // Keep the start cells of the last five events for further corrections
789 // As a performance improvement we could also just store the
790 // pointers to the last five events...
791 // What if a new run is started? Do we mind?
792 run.prevStart.emplace_front();
793 memcpy(run.prevStart.front().data(), start, 1440*sizeof(int16_t));
794 if (run.prevStart.size()>5)
795 run.prevStart.pop_back();
796 }
797
798 bool IsRunWaiting()
799 {
800 const lock_guard<mutex> lock(mtx_newrun);
801 return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end();
802 }
803
804 uint32_t GetRunNumber() const
805 {
806 return fRunNumber;
807 }
808
809 bool IncreaseRunNumber(uint32_t run)
810 {
811 if (!InitRunNumber())
812 return false;
813
814 if (run<fRunNumber)
815 {
816 ostringstream msg;
817 msg <<
818 "Run number " << run << " smaller than next available "
819 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
820 fMsg.Error(msg);
821 return false;
822 }
823
824 fRunNumber = run;
825
826 return true;
827 }
828
829 void gotNewRun(RUN_CTRL2 &run)
830 {
831 // This is to secure iteration over fExpectedRuns
832 const lock_guard<mutex> lock(mtx_newrun);
833
834 map<uint32_t,FAD::RunDescription>::const_iterator it = fExpectedRuns.begin();
835 while (it!=fExpectedRuns.end())
836 {
837 if (it->first<run.runId)
838 {
839 ostringstream str;
840 str << "runOpen - Missed run " << it->first << ".";
841 fMsg.Info(str);
842
843 // Increase the iterator first, it becomes invalid with the next call
844 const auto is = it++;
845 fExpectedRuns.erase(is);
846 continue;
847 }
848
849 if (it->first==run.runId)
850 break;
851
852 it++;
853 }
854
855 if (it==fExpectedRuns.end())
856 {
857 ostringstream str;
858 str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)";
859 fMsg.Warn(str);
860
861 // This is not ideal, but the best we can do
862 run.night = fNightAsInt;
863
864 return;
865 }
866
867 const FAD::RunDescription &conf = it->second;
868
869 run.runType = conf.name;
870 run.maxEvt = conf.maxevt;
871 run.closeTime = conf.maxtime + run.openTime;
872 run.night = conf.night;
873
874 fExpectedRuns.erase(it);
875
876 // Now signal the fadctrl (configuration process that a run is in progress)
877 // Maybe this could be done earlier, but we are talking about a
878 // negligible time scale here.
879 fRunInProgress = run.runId;
880 }
881
882 void runFinished()
883 {
884 // This is called when the last event of a run (run time exceeded or
885 // max number of events exceeded) has been received.
886 fRunInProgress = -1;
887 }
888
889 //map<boost::thread::id, string> fLastMessage;
890
891 void factOut(int severity, const char *message)
892 {
893 ostringstream str;
894 str << "EventBuilder: " << message;
895
896 /*
897 string &old = fLastMessage[boost::this_thread::get_id()];
898
899 if (str.str()==old)
900 return;
901 old = str.str();
902 */
903
904 fMsg.Update(str, severity);
905 }
906
907/*
908 void factStat(int64_t *stat, int len)
909 {
910 if (len!=7)
911 {
912 fMsg.Warn("factStat received unknown number of values.");
913 return;
914 }
915
916 vector<int64_t> data(1, g_maxMem);
917 data.insert(data.end(), stat, stat+len);
918
919 static vector<int64_t> last(8);
920 if (data==last)
921 return;
922 last = data;
923
924 fDimStatistics.Update(data);
925
926 // len ist die Laenge des arrays.
927 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
928 // kannst Du pruefen, ob die 100MB voll sind ....
929
930 ostringstream str;
931 str
932 << "Wait=" << stat[0] << " "
933 << "Skip=" << stat[1] << " "
934 << "Del=" << stat[2] << " "
935 << "Tot=" << stat[3] << " "
936 << "Mem=" << stat[4] << "/" << g_maxMem << " "
937 << "Read=" << stat[5] << " "
938 << "Conn=" << stat[6];
939
940 fMsg.Info(str);
941 }
942 */
943
944 void factStatSend(const pair<Time,GUI_STAT> &stat)
945 {
946 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
947 fDimStatistics1.Update(stat.first);
948 }
949
950 void factStat(const GUI_STAT &stat)
951 {
952 fDimQueue1.emplace(Time(), stat);
953 }
954
955 void factReportIncomplete(uint64_t rep)
956 {
957 fDimIncomplete.setQuality(1);
958 fDimIncomplete.Update(rep);
959 }
960
961 array<FAD::EventHeader, 40> fVecHeader;
962
963 template<typename T, class S>
964 array<T, 42> Compare(const S *vec, const T *t)
965 {
966 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
967
968 const T *min = NULL;
969 const T *val = NULL;
970 const T *max = NULL;
971
972 array<T, 42> arr;
973
974 // bool rc = true;
975 for (int i=0; i<40; i++)
976 {
977 const char *base = reinterpret_cast<const char*>(vec+i);
978 const T *ref = reinterpret_cast<const T*>(base+offset);
979
980 arr[i] = *ref;
981
982 if (gi_NumConnect[i]==0)
983 {
984 arr[i] = 0;
985 continue;
986 }
987
988 if (!val)
989 {
990 min = ref;
991 val = ref;
992 max = ref;
993 }
994
995 if (*ref<*min)
996 min = ref;
997
998 if (*ref>*max)
999 max = ref;
1000
1001 // if (*val!=*ref)
1002 // rc = false;
1003 }
1004
1005 arr[40] = val ? *min : 1;
1006 arr[41] = val ? *max : 0;
1007
1008 return arr;
1009 }
1010
1011 template<typename T>
1012 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1013 {
1014 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1015
1016 T val = 0;
1017 T rc = 0;
1018
1019 array<T, 42> vec;
1020
1021 bool first = true;
1022
1023 for (int i=0; i<40; i++)
1024 {
1025 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1026 const T *ref = reinterpret_cast<const T*>(base+offset);
1027
1028 vec[i+2] = *ref;
1029
1030 if (gi_NumConnect[i]==0)
1031 {
1032 vec[i+2] = 0;
1033 continue;
1034 }
1035
1036 if (first)
1037 {
1038 first = false;
1039 val = *ref;
1040 rc = 0;
1041 }
1042
1043 rc |= val^*ref;
1044 }
1045
1046 vec[0] = rc;
1047 vec[1] = val;
1048
1049 return vec;
1050 }
1051
1052 template<typename T, size_t N>
1053 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1054 {
1055 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1056 svc.Update(t);
1057 }
1058
// Debug output of a flag and a 43 element array to cout.
//
// Format: name|flag|[1]|[0]<x<[1]: followed by the elements [3..42],
// each preceded by a blank, and a final endl.
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &values = data.second;

    cout << name << "|" << data.first << "|" << values[1] << "|";
    cout << values[0] << "<x<" << values[1] << ":";

    for (size_t i=3; i<43; i++)
        cout << " " << values[i];

    cout << endl;
}
1067
1068 vector<uint> fNumConnected;
1069
1070 void procHeader(const tuple<Time,bool,FAD::EventHeader> &dat)
1071 {
1072 const Time &t = get<0>(dat);
1073 const bool changed = get<1>(dat);
1074 const FAD::EventHeader &h = get<2>(dat);
1075
1076 const FAD::EventHeader old = fVecHeader[h.Id()];
1077 fVecHeader[h.Id()] = h;
1078
1079 if (old.fVersion != h.fVersion || changed)
1080 {
1081 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1082
1083 array<float,42> data;
1084 for (int i=0; i<42; i++)
1085 {
1086 ostringstream str;
1087 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1088 data[i] = stof(str.str());
1089 }
1090 Update(fDimFwVersion, data, t);
1091 }
1092
1093 if (old.fRunNumber != h.fRunNumber || changed)
1094 {
1095 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1096 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1097 fDimRunNumber.Update(t);
1098 }
1099
1100 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1101 {
1102 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1103 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1104 fDimPrescaler.Update(t);
1105 }
1106
1107 if (old.fDNA != h.fDNA || changed)
1108 {
1109 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1110 Update(fDimDNA, dna, t, 40);
1111 }
1112
1113 if (old.fStatus != h.fStatus || changed)
1114 {
1115 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1116 Update(fDimStatus, sts, t);
1117 }
1118
1119 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1120 {
1121 array<uint16_t, FAD::kNumDac*42> dacs;
1122
1123 for (int i=0; i<FAD::kNumDac; i++)
1124 {
1125 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1126 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1127 }
1128
1129 Update(fDimDac, dacs, t);
1130 }
1131
1132 // -----------
1133
1134 static Time oldt(boost::date_time::neg_infin);
1135 Time newt;
1136
1137 if (newt>oldt+boost::posix_time::seconds(1))
1138 {
1139 oldt = newt;
1140
1141 // --- RefClock
1142
1143 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1144 Update(fDimRefClock, clk, t);
1145
1146 // --- Temperatures
1147
1148 const array<int16_t,42> tmp[4] =
1149 {
1150 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1151 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1152 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1153 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1154 };
1155
1156 vector<int16_t> data;
1157 data.reserve(82);
1158 data.push_back(tmp[0][40]); // min: 0
1159 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1160 data.push_back(tmp[0][41]); // max: 41
1161 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1162
1163 for (int j=1; j<=3; j++)
1164 {
1165 const array<int16_t,42> &ref = tmp[j];
1166
1167 // Gloabl min
1168 if (ref[40]<data[0]) // 40=min
1169 data[0] = ref[40];
1170
1171 // Global max
1172 if (ref[41]>data[41]) // 41=max
1173 data[41] = ref[41];
1174
1175 for (int i=0; i<40; i++)
1176 {
1177 // min per board
1178 if (ref[i]<data[i+1]) // data: 1-40
1179 data[i+1] = ref[i]; // ref: 0-39
1180
1181 // max per board
1182 if (ref[i]>data[i+42]) // data: 42-81
1183 data[i+42] = ref[i]; // ref: 0-39
1184 }
1185 }
1186
1187 vector<float> deg(82); // 0: global min, 1-40: min
1188 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1189 deg[i] = data[i]/16.;
1190
1191 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1192 fDimTemperature.Update(t);
1193 }
1194 }
1195
1196 void debugHead(const FAD::EventHeader &h)
1197 {
1198 const uint16_t id = h.Id();
1199 if (id>39)
1200 return;
1201
1202 if (fNumConnected.size()!=40)
1203 fNumConnected.resize(40);
1204
1205 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1206
1207 const bool changed = con!=fNumConnected || !IsThreadRunning();
1208
1209 fNumConnected = con;
1210
1211 fDimQueue2.emplace(Time(), changed, h);
1212 }
1213};
1214
1215EventBuilderWrapper *EventBuilderWrapper::This = 0;
1216
1217// ----------- Event builder callbacks implementation ---------------
1218bool runOpen(const EVT_CTRL2 &evt)
1219{
1220 return EventBuilderWrapper::This->runOpen(evt);
1221}
1222
1223bool runWrite(const EVT_CTRL2 &evt)
1224{
1225 return EventBuilderWrapper::This->runWrite(evt);
1226}
1227
1228void runClose()
1229{
1230 EventBuilderWrapper::This->runClose();
1231}
1232
1233bool eventCheck(const EVT_CTRL2 &evt)
1234{
1235 return EventBuilderWrapper::This->eventCheck(evt);
1236}
1237
1238void gotNewRun(RUN_CTRL2 &run)
1239{
1240 EventBuilderWrapper::This->gotNewRun(run);
1241}
1242
1243void runFinished()
1244{
1245 EventBuilderWrapper::This->runFinished();
1246}
1247
1248void applyCalib(const shared_ptr<EVT_CTRL2> &evt)
1249{
1250 EventBuilderWrapper::This->applyCalib(evt);
1251}
1252
1253void factOut(int severity, const char *message)
1254{
1255 EventBuilderWrapper::This->factOut(severity, message);
1256}
1257
1258void factStat(GUI_STAT stat)
1259{
1260 EventBuilderWrapper::This->factStat(stat);
1261}
1262
1263void factReportIncomplete(uint64_t rep)
1264{
1265 EventBuilderWrapper::This->factReportIncomplete(rep);
1266}
1267
1268// ------
1269
1270void debugHead(void *buf)
1271{
1272 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1273 EventBuilderWrapper::This->debugHead(h);
1274}
1275
1276#endif
Note: See TracBrowser for help on using the repository browser.