source: trunk/FACT++/src/EventBuilderWrapper.h@ 16620

Last change on this file since 16620 was 16619, checked in by tbretz, 12 years ago
The incomplete event timeout can now be set as a configuration value
File size: 42.5 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteFits2
24#endif
25
26#include "DataWriteFits2.h"
27
28#include "queue.h"
29
30namespace ba = boost::asio;
31namespace bs = boost::system;
32namespace fs = boost::filesystem;
33
34using ba::ip::tcp;
35
36using namespace std;
37
38// ========================================================================
39
40#include "EventBuilder.h"
41
42void StartEvtBuild();
43void CloseRunFile();
44
45// ========================================================================
46
47class EventBuilderWrapper
48{
49public:
50 // FIXME
51 static EventBuilderWrapper *This;
52
53 MessageImp &fMsg;
54
55private:
56 boost::thread fThreadMain;
57
58 enum
59 {
60 kCurrent = 0,
61 kTotal = 1,
62 kEventId = 2,
63 kTriggerId = 3,
64 };
65
66 FAD::FileFormat_t fFileFormat;
67
68 //uint32_t fMaxRun;
69 uint32_t fLastOpened;
70 uint32_t fLastClosed;
71 array<uint32_t,4> fNumEvts;
72
73 DimWriteStatistics fDimWriteStats;
74 DimDescribedService fDimRuns;
75 DimDescribedService fDimEvents;
76 DimDescribedService fDimRawData;
77 DimDescribedService fDimEventData;
78 DimDescribedService fDimFeedbackData;
79 DimDescribedService fDimFwVersion;
80 DimDescribedService fDimRunNumber;
81 DimDescribedService fDimStatus;
82 DimDescribedService fDimDNA;
83 DimDescribedService fDimTemperature;
84 DimDescribedService fDimPrescaler;
85 DimDescribedService fDimRefClock;
86 DimDescribedService fDimRoi;
87 DimDescribedService fDimDac;
88 DimDescribedService fDimDrsRuns;
89 DimDescribedService fDimDrsCalibration;
90 DimDescribedService fDimStatistics1;
91 //DimDescribedService fDimStatistics2;
92 DimDescribedService fDimFileFormat;
93 DimDescribedService fDimIncomplete;
94
95 Queue<pair<Time,GUI_STAT>> fQueueStatistics1;
96 Queue<tuple<Time,bool,FAD::EventHeader>> fQueueProcHeader;
97 Queue<pair<Time,array<uint32_t,4>>> fQueueEvents;
98 Queue<pair<Time,array<uint16_t,2>>> fQueueRoi;
99 Queue<vector<char>> fQueueRawData;
100 Queue<tuple<Time,uint32_t,array<float,1440*4>>> fQueueEventData;
101
102 string fPath;
103 uint32_t fNightAsInt;
104 uint32_t fRunNumber;
105 int64_t fRunInProgress;
106
107 array<uint16_t,2> fVecRoi;
108 pair<float,array<float, 1440*4>> fMaxEvent; // Maximum event from applyCalib
109
110protected:
111 bool InitRunNumber(const string &path="")
112 {
113 if (!path.empty())
114 {
115 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
116 {
117 fMsg.Error("Data path "+path+" does not exist!");
118 return false;
119 }
120
121 fPath = path;
122 fDimWriteStats.SetCurrentFolder(fPath);
123
124 fMsg.Info("Data path set to "+path+".");
125 }
126
127 // Get current night
128 const uint32_t night = Time().NightAsInt();
129 if (night==fNightAsInt)
130 return true;
131
132 // Check for run numbers
133 fRunNumber = 1000;
134
135 while (--fRunNumber>0)
136 {
137 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
138
139 if (access((name+"bin").c_str(), F_OK) == 0)
140 break;
141 if (access((name+"fits").c_str(), F_OK) == 0)
142 break;
143 if (access((name+"drs.fits").c_str(), F_OK) == 0)
144 break;
145 }
146
147 // This is now the first file which does not exist
148 fRunNumber++;
149 fLastOpened = 0;
150
151 // Check if we have exceeded the maximum
152 if (fRunNumber==1000)
153 {
154 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
155 return false;
156 }
157
158 ostringstream str;
159 if (fNightAsInt==0)
160 str << "First night...";
161 else
162 str << "Night has changd from " << fNightAsInt << "... new";
163 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
164 fMsg.Message(str);
165
166 fNightAsInt = night;
167
168 return true;
169 }
170
171public:
172 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
173 fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0),
174 fDimWriteStats ("FAD_CONTROL", imp),
175 fDimRuns ("FAD_CONTROL/RUNS", "I:2;C",
176 "Run files statistics"
177 "|stats[int]:last opened or closed run"
178 "|file[string]:filename of last opened file"),
179 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
180 "Event counts"
181 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
182 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F",
183 "|roi[uint16]:number of samples per pixel"
184 "|roi_tm[uint16]:number of samples per time-marker channel"
185 "|num_fad[uint32]:event number from FADs"
186 "|num_ftm[uint32]:trigger number from FTM"
187 "|type[uint16]:trigger type from FTM"
188 "|num_boards[uint32]:number of active boards"
189 "|time[uint32]:PC time as unix time stamp"
190 "|time_board[uint32]:Time stamp of FAD boards"
191 "|start_pix[int16]:start sample of pixels"
192 "|start_tm[int16]:start sample of time marker channels"
193 "|adc[int16]:adc data"),
194 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
195 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
196 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
197 "Firmware version number of fad boards"
198 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
199 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
200 "Run numbers coming from FAD boards"
201 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
202 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
203 "Status of FAD boards"
204 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
205 fDimDNA ("FAD_CONTROL/DNA", "X:40",
206 "DNA of FAD boards"
207 "|DNA[hex]:Hex identifier of each FAD board"),
208 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
209 "FADs temperatures"
210 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
211 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
212 "Trigger generator prescaler of fad boards"
213 "|prescaler[int]:Trigger generator prescaler value, for each board"),
214 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
215 "Reference clock of FAD boards"
216 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
217 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
218 fDimDac ("FAD_CONTROL/DAC", "S:336",
219 "DAC settings of each FAD board"
220 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
221 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
222 "|roi:Region of interest of secondary baseline"
223 "|run:Run numbers of DRS runs (0=none)"),
224 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
225 "|roi:Region of interest of secondary baseline"
226 "|run:Run numbers of DRS runs (0=none)"),
227 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:5;X:3;I:1;I:2;C:40;I:40;I:40",
228 "Event Builder status for GUI display"
229 "|bufferInfo[int]:Events in buffer, incomp., comp., write, proc., tot."
230 "|memInfo[int]:total mem allocated, used mem, max memory"
231 "|deltaT[ms]:Time in ms for rates"
232 "|rateNew[int]:Number of new start events received"
233 "|numConn[int]:Number of connections per board"
234 "|rateBytes[int]:Bytes read during last cylce"
235 "|relBytes[int]:Relative number of total bytes received (received - released)"),
236 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
237 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
238 // It is important to instantiate them after the DimServices
239 fQueueStatistics1(std::bind(&EventBuilderWrapper::UpdateDimStatistics1, this, placeholders::_1)),
240 fQueueProcHeader( std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
241 fQueueEvents( std::bind(&EventBuilderWrapper::UpdateDimEvents, this, placeholders::_1)),
242 fQueueRoi( std::bind(&EventBuilderWrapper::UpdateDimRoi, this, placeholders::_1)),
243 fQueueRawData( std::bind(&EventBuilderWrapper::UpdateDimRawData, this, placeholders::_1)),
244 fQueueEventData( std::bind(&EventBuilderWrapper::UpdateDimEventData, this, placeholders::_1)),
245 fNightAsInt(0), fRunInProgress(-1),
246 fMaxEvent(make_pair(-FLT_MAX, array<float,1440*4>()))
247 {
248 if (This)
249 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
250
251 This = this;
252
253 fVecRoi.fill(0);
254
255 memset(fNumEvts.data(), 0, sizeof(fNumEvts));
256 fDimEvents.Update(fNumEvts);
257
258 for (size_t i=0; i<40; i++)
259 ConnectSlot(i, tcp::endpoint());
260 }
261
262 virtual ~EventBuilderWrapper()
263 {
264 Abort();
265
266 // FIXME: Used timed_join and abort afterwards
267 // What's the maximum time the eb need to abort?
268 fThreadMain.join();
269 }
270
271 map<uint32_t, FAD::RunDescription> fExpectedRuns;
272
273 mutex mtx_newrun;
274
275 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
276 {
277 if (maxtime<=0 || maxtime>24*60*60)
278 maxtime = 24*60*60;
279 if (maxevt<=0 || maxevt>INT32_MAX)
280 maxevt = INT32_MAX;
281
282 if (!InitRunNumber())
283 return 0;
284
285 const FAD::RunDescription descr =
286 {
287 uint32_t(maxtime),
288 uint32_t(maxevt),
289 fNightAsInt,
290 ref.first,
291 ref.second,
292 };
293
294 const lock_guard<mutex> lock(mtx_newrun);
295 fExpectedRuns[fRunNumber] = descr;
296 return fRunNumber++;
297 }
298
299 bool IsThreadRunning()
300 {
301 if (fThreadMain.get_id()==boost::this_thread::get_id())
302 return true;
303 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
304 }
305
306 void SetMaxMemory(unsigned int mb) const
307 {
308 g_maxMem = size_t(mb)*1000000;
309 }
310 void SetEventTimeout(uint16_t to) const
311 {
312 g_evtTimeout = to;
313 }
314
315 void StartThread(const vector<tcp::endpoint> &addr)
316 {
317 if (IsThreadRunning())
318 {
319 fMsg.Warn("Start - EventBuilder still running");
320 return;
321 }
322
323 //fLastMessage.clear();
324
325 for (size_t i=0; i<40; i++)
326 ConnectSlot(i, addr[i]);
327
328 fMsg.Message("Starting EventBuilder thread");
329
330 fThreadMain = boost::thread(StartEvtBuild);
331 }
332
333 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
334 {
335 if (i>39)
336 return;
337
338 fRunInProgress = -1;
339
340 if (addr==tcp::endpoint())
341 {
342 // In this order
343 g_port[i].sockDef = 0;
344
345 fDimIncomplete.setQuality(0);
346 fDimIncomplete.Update(uint64_t(0));
347 return;
348 }
349
350 struct sockaddr_in sockaddr; //IP for each socket
351 sockaddr.sin_family = AF_INET;
352 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
353 sockaddr.sin_port = htons(addr.port());
354 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
355
356 // In this order
357 g_port[i].sockDef = 1;
358
359 fDimIncomplete.setQuality(0);
360 fDimIncomplete.Update(uint64_t(0));
361 }
362
363 void IgnoreSlot(unsigned int i)
364 {
365 if (i>39)
366 return;
367
368 if (g_port[i].sockAddr.sin_port==0)
369 return;
370
371 g_port[i].sockDef = -1;
372 }
373
374
375 void Abort()
376 {
377 fMsg.Message("Signal abort to EventBuilder thread...");
378 g_reset = 2;
379 }
380
381 void ResetThread(bool soft)
382 {
383 fMsg.Message("Signal reset to EventBuilder thread...");
384 g_reset = soft ? 101 : 102;
385 }
386
387 void Exit()
388 {
389 fMsg.Message("Signal exit to EventBuilder thread...");
390 g_reset = 1;
391 }
392
393 bool IsConnected(int i) const { return gi_NumConnect[i]==1; }
394 bool IsConnecting(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; }
395 bool IsDisconnected(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; }
396 bool IsRunInProgress() const { return fRunInProgress>=0; }
397
398 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
399 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
400
401 void SetOutputFormat(FAD::FileFormat_t f)
402 {
403 const bool changed = f!=fFileFormat;
404
405 fFileFormat = f;
406 fDimFileFormat.Update(uint16_t(f));
407
408 string msg = "File format set to: ";
409 switch (f)
410 {
411 case FAD::kNone: msg += "kNone."; break;
412 case FAD::kDebug: msg += "kDebug."; break;
413 case FAD::kFits: msg += "kFits."; break;
414 case FAD::kCfitsio: msg += "kCfitsio"; break;
415 case FAD::kRaw: msg += "kRaw"; break;
416 case FAD::kCalib:
417 DataCalib::Restart();
418 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
419 fMsg.Message("Resetted DRS calibration.");
420 return;
421 }
422
423 if (changed)
424 fMsg.Message(msg);
425 }
426
427 virtual int ResetSecondaryDrsBaseline()
428 {
429 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
430 {
431 fFileFormat = FAD::kCalib;
432 fDimFileFormat.Update(uint16_t(fFileFormat));
433 fMsg.Message("Resetted DRS calibration for secondary baseline.");
434 }
435 else
436 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
437
438 return 0;
439 }
440
441 void LoadDrsCalibration(const char *fname)
442 {
443 if (!DataCalib::ReadFits(fname, fMsg))
444 return;
445 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
446 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
447 }
448
449 virtual int CloseOpenFiles() { CloseRunFile(); fRunInProgress = -1; return 0; }
450
451
452 // -------------- Mapped event builder callbacks ------------------
453
454 void UpdateRuns(const string &fname="")
455 {
456 uint32_t values[2] =
457 {
458 fLastOpened,
459 fLastClosed
460 };
461
462 vector<char> data(sizeof(values)+fname.size()+1);
463 memcpy(data.data(), values, sizeof(values));
464 strcpy(data.data()+sizeof(values), fname.c_str());
465 fDimRuns.setQuality((bool)fFile);
466 fDimRuns.Update(data);
467
468 if (!fname.empty())
469 fDimWriteStats.FileOpened(fname);
470 }
471
472 shared_ptr<DataProcessorImp> fFile;
473
474 void UpdateDimEvents(const pair<Time,array<uint32_t,4>> &stat)
475 {
476 fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
477 fDimEvents.Update(stat.first);
478 }
479
480 bool runOpen(const EVT_CTRL2 &evt)
481 {
482 const uint32_t night = evt.runCtrl->night;
483 const uint32_t runid = evt.runNum>0 ? evt.runNum : time(NULL);
484
485 // If there is still an open file: close it
486 if (fFile)
487 runClose(*evt.runCtrl);
488
489 // Keep a copy of the currently valid drs calibration
490 // and associate it to the run control structure
491 evt.runCtrl->calib = shared_ptr<DrsCalibration>(new DrsCalibration(DataCalib::GetCalibration()));
492
493 /*
494 evt.runCtrl->calibInt.resize(1024*1440);
495
496 const int16_t *off = evt.runCtrl->zcalib.data();
497 int32_t *ptr = evt.runCtrl->calib.data();
498
499 const uint64_t num = evt.runCtrl->calib.fNumOffset;
500 for (int i=0; i<1024*1440)
501 ptr[i] = off[i]/num;
502 */
503
504 // FIMXE: Check if file already exists...
505
506 // Crate the file
507 DataProcessorImp *file = 0;
508 switch (fFileFormat)
509 {
510 case FAD::kNone: file = new DataDump(fPath, night, runid, fMsg); break;
511 case FAD::kDebug: file = new DataDebug(fPath, night, runid, fMsg); break;
512 case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid, fMsg); break;
513 case FAD::kFits: file = new DataWriteFits2(fPath, night, runid, fMsg); break;
514 case FAD::kRaw: file = new DataWriteRaw(fPath, night, runid, fMsg); break;
515 case FAD::kCalib: file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
516 }
517
518 try
519 {
520 // Try to open the file
521 FAD::RunDescription desc;
522 desc.name = evt.runCtrl->runType;
523
524 if (!file->Open(evt, desc))
525 return false;
526 }
527 catch (const exception &e)
528 {
529 fMsg.Error("Exception trying to open file: "+string(e.what()));
530 return false;
531 }
532
533 fLastOpened = runid;
534
535 // Signal that a file is open
536 fFile = shared_ptr<DataProcessorImp>(file);
537
538 // Now do all the calls which potentially block (dim)
539
540 // Time for update runs before time for update events
541 UpdateRuns(file->GetFileName());
542 fNumEvts[kEventId] = 0;
543 fNumEvts[kTriggerId] = 0;
544 fNumEvts[kCurrent] = 0;
545 fQueueEvents.emplace(Time(), fNumEvts);
546
547 ostringstream str;
548 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
549 fMsg.Info(str);
550
551 return true;
552 }
553
554 bool runWrite(const EVT_CTRL2 &e)
555 {
556 /*
557 const size_t size = sizeof(EVENT)+1440*(evt.Roi+evt.RoiTM)*2;
558 vector evt(e.fEvent, e.fEvent+size);
559
560 const EVENT &evt = *reinterpret_cast<EVENT*>(evt.data());
561
562 int16_t *val = evt.Adc_Data;
563 const int16_t *off = e.runCtrl->zcalib.data();
564 for (const int16_t *start=evt.StartPix; start<evt.StartPix+1440; val+=1024, off+=1024, start++)
565 {
566 if (*start<0)
567 continue;
568
569 for (size_t i=0; i<roi; i++)
570 val[i] -= offset[(*start+i)%1024];
571 }*/
572
573 const EVENT &evt = *e.fEvent;
574 if (!fFile->WriteEvt(evt))
575 return false;
576
577 fNumEvts[kCurrent]++;
578 fNumEvts[kEventId] = evt.EventNum;
579 fNumEvts[kTriggerId] = evt.TriggerNum;
580 fNumEvts[kTotal]++;
581
582 static Time oldt(boost::date_time::neg_infin);
583 Time newt;
584 if (newt>oldt+boost::posix_time::seconds(1))
585 {
586 fQueueEvents.emplace(newt, fNumEvts);
587 oldt = newt;
588 }
589
590 return true;
591 }
592
593 void runClose(RUN_CTRL2 &run)
594 {
595 if (!fFile)
596 return;
597
598 // It can happen that runFinished was never called
599 // (e.g. runWrite failed)
600 if (fRunInProgress==fFile->GetRunId())
601 fRunInProgress = -1;
602
603 // Close the file
604 const bool rc = fFile->Close(NULL);
605
606 fLastClosed = fFile->GetRunId();
607
608 ostringstream str;
609 str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")";
610 if (!rc)
611 str << "... failed!";
612
613 // Signal that the file is closed
614
615 fFile.reset();
616
617 // Now do all the calls which can potentially block (dim)
618
619 CloseRun(fLastClosed);
620
621 // Time for update events before time for update runs
622 fQueueEvents.emplace(Time(), fNumEvts);
623 UpdateRuns();
624
625 // Do the potentially blocking call after all others
626 rc ? fMsg.Info(str) : fMsg.Error(str);
627
628 // If a Drs Calibration has just been finished, all following events
629 // should also be processed with this calibration
630 const DrsCalibration &cal = DataCalib::GetCalibration();
631 if (!run.calib || run.calib->fStep != cal.fStep || run.calib->fRoi!=cal.fRoi)
632 run.calib = shared_ptr<DrsCalibration>(new DrsCalibration(cal));
633 }
634
635 virtual void CloseRun(uint32_t /*runid*/) { }
636
637 void UpdateDimRoi(const pair<Time, array<uint16_t,2>> &roi)
638 {
639 fDimRoi.setData(roi.second.data(), sizeof(uint16_t)*2);
640 fDimRoi.Update(roi.first);
641 }
642
643 bool eventCheck(const EVT_CTRL2 &evt)
644 {
645 const EVENT *event = evt.fEvent;
646
647 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
648
649 if (roi!=fVecRoi)
650 {
651 fQueueRoi.emplace(Time(), roi);
652 fVecRoi = roi;
653 }
654
655 const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead);
656 const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead)+40;
657
658 // FIMXE: Compare with target configuration
659
660 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
661 {
662 // FIXME: Compare with expectations!!!
663 if (ptr->fStartDelimiter==0)
664 {
665 if (ptr==beg)
666 beg++;
667 continue;
668 }
669
670 if (beg->fStatus != ptr->fStatus)
671 {
672 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
673 return false;
674 }
675
676 if (beg->fRunNumber != ptr->fRunNumber)
677 {
678 fMsg.Error("Inconsistent run number detected.... closing run.");
679 return false;
680 }
681
682 /*
683 if (beg->fVersion != ptr->fVersion)
684 {
685 Error("Inconsist firmware version detected.... closing run.");
686 CloseRunFile(runNr, 0, 0);
687 break;
688 }
689 */
690 if (beg->fEventCounter != ptr->fEventCounter)
691 {
692 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
693 return false;
694 }
695
696 if (beg->fTriggerCounter != ptr->fTriggerCounter)
697 {
698 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
699 return false;
700 }
701
702 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
703 {
704 fMsg.Error("Inconsistent phase shift detected.... closing run.");
705 return false;
706 }
707
708 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
709 {
710 fMsg.Error("Inconsistent DAC values detected.... closing run.");
711 return false;
712 }
713
714 if (beg->fTriggerType != ptr->fTriggerType)
715 {
716 fMsg.Error("Inconsistent trigger type detected.... closing run.");
717 return false;
718 }
719 }
720
721 // check REFCLK_frequency
722 // check consistency with command configuration
723 // how to log errors?
724 // need gotNewRun/closedRun to know it is finished
725
726 return true;
727 }
728
729 Time fLastDimRawData;
730 Time fLastDimEventData;
731
732 void UpdateDimRawData(const vector<char> &v)
733 {
734 const EVENT *evt = reinterpret_cast<const EVENT*>(v.data());
735
736 fDimRawData.setData(v);
737 fDimRawData.setQuality(evt->TriggerType);
738 fDimRawData.Update(Time(evt->PCTime, evt->PCUsec));
739 }
740 void UpdateDimEventData(const tuple<Time,uint32_t,array<float, 1440*4>> &tup)
741 {
742 fDimEventData.setQuality(get<1>(tup));
743 fDimEventData.setData(get<2>(tup));
744 fDimEventData.Update(get<0>(tup));
745 }
746
747 void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
748 {
749 const EVENT *event = evt.fEvent;
750 const int16_t *start = event->StartPix;
751
752 // Get the reference to the run associated information
753 RUN_CTRL2 &run = *evt.runCtrl;
754
755 if (size==1) // If there is more than one event waiting (including this one), throw them away
756 {
757 Time now;
758
759 // ------------------- Copy event data to new memory --------------------
760 // (to make it thread safe; a static buffer might improve memory handling)
761 const uint16_t roi = event->Roi;
762
763 // ------------------- Apply full DRS calibration ------------------------
764 // (Is that necessray, or would a simple offset correct do well already?)
765
766 // There seems to be a problem using std::array... maybe the size is too big?
767 // array<float, (1440+160)*1024> vec2;
768 vector<float> vec((1440+160)*roi);
769 run.calib->Apply(vec.data(), event->Adc_Data, start, roi);
770
771 // ------------------- Appy DRS-step correction --------------------------
772 for (auto it=run.prevStart.begin(); it!=run.prevStart.end(); it++)
773 {
774 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, roi+10);
775 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, 3);
776 }
777
778 // ------------------------- Remove spikes --------------------------------
779 DrsCalibrate::RemoveSpikes3(vec.data(), roi);
780
781 // -------------- Update raw data dim sevice (VERY SLOW) -----------------
782 if (fQueueRawData.empty() && now>fLastDimRawData+boost::posix_time::seconds(5))
783 {
784 vector<char> data1(sizeof(EVENT)+vec.size()*sizeof(float));
785 memcpy(data1.data(), event, sizeof(EVENT));
786 memcpy(data1.data()+sizeof(EVENT), vec.data(), vec.size()*sizeof(float));
787 fQueueRawData.emplace(data1);
788
789 fLastDimRawData = now;
790 }
791
792 // ------------------------- Basic statistics -----------------------------
793 DrsCalibrate::SlidingAverage(vec.data(), roi, 10);
794
795 // If this is a cosmic event
796 array<float, 1440*4> stats; // Mean, RMS, Max, Pos
797 const float max = DrsCalibrate::GetPixelStats(stats.data(), vec.data(), roi);
798 if (evt.trgTyp==0 && max>fMaxEvent.first)
799 fMaxEvent = make_pair(max, stats);
800
801 // ------------------ Update dim service (statistics) ---------------------
802
803 if (fQueueEventData.empty() && now>fLastDimEventData+boost::posix_time::seconds(3))
804 {
805 fQueueEventData.emplace(evt.time, evt.trgTyp, evt.trgTyp==0 ? fMaxEvent.second : stats);
806 if (evt.trgTyp==0)
807 fMaxEvent.first = -FLT_MAX;
808
809 fLastDimEventData = now;
810 }
811
812 // === SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
813 //
814 // if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
815 // return;
816 //
817 // vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
818 // DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
819 //
820 // fDimFeedbackData.Update(data2);
821 }
822
823 // Keep the start cells of the last five events for further corrections
824 // As a performance improvement we could also just store the
825 // pointers to the last five events...
826 // What if a new run is started? Do we mind?
827 auto &l = run.prevStart; // History for start cells of previous events (for step calibration)
828
829 if (l.size()<5)
830 l.emplace_front();
831 else
832 {
833 auto it = l.end();
834 l.splice(l.begin(), l, --it);
835 }
836
837 memcpy(l.front().data(), start, 1440*sizeof(int16_t));
838 }
839
840 bool IsRunWaiting()
841 {
842 const lock_guard<mutex> lock(mtx_newrun);
843 return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end();
844 }
845
846 uint32_t GetRunNumber() const
847 {
848 return fRunNumber;
849 }
850
851 bool IncreaseRunNumber(uint32_t run)
852 {
853 if (!InitRunNumber())
854 return false;
855
856 if (run<fRunNumber)
857 {
858 ostringstream msg;
859 msg <<
860 "Run number " << run << " smaller than next available "
861 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
862 fMsg.Error(msg);
863 return false;
864 }
865
866 fRunNumber = run;
867
868 return true;
869 }
870
871 void gotNewRun(RUN_CTRL2 &run)
872 {
873 // This is to secure iteration over fExpectedRuns
874 const lock_guard<mutex> lock(mtx_newrun);
875
876 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
877 while (it!=fExpectedRuns.end())
878 {
879 if (it->first<run.runId)
880 {
881 ostringstream str;
882 str << "runOpen - Missed run " << it->first << ".";
883 fMsg.Info(str);
884
885 // Increase the iterator first, it becomes invalid with the next call
886 const auto is = it++;
887 fExpectedRuns.erase(is);
888 continue;
889 }
890
891 if (it->first==run.runId)
892 break;
893
894 it++;
895 }
896
897 if (it==fExpectedRuns.end())
898 {
899 ostringstream str;
900 str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)";
901 fMsg.Warn(str);
902
903 // This is not ideal, but the best we can do
904 run.night = fNightAsInt;
905
906 return;
907 }
908
909 const FAD::RunDescription &conf = it->second;
910
911 run.runType = conf.name;
912 run.maxEvt = conf.maxevt;
913 run.closeTime = conf.maxtime + run.openTime;
914 run.night = conf.night;
915
916 fExpectedRuns.erase(it);
917
918 // Now signal the fadctrl (configuration process that a run is in progress)
919 // Maybe this could be done earlier, but we are talking about a
920 // negligible time scale here.
921 fRunInProgress = run.runId;
922 }
923
924 void runFinished()
925 {
926 // This is called when the last event of a run (run time exceeded or
927 // max number of events exceeded) has been received.
928 fRunInProgress = -1;
929 }
930
931 //map<boost::thread::id, string> fLastMessage;
932
933 void factOut(int severity, const char *message)
934 {
935 ostringstream str;
936 str << "EventBuilder: " << message;
937
938 /*
939 string &old = fLastMessage[boost::this_thread::get_id()];
940
941 if (str.str()==old)
942 return;
943 old = str.str();
944 */
945
946 fMsg.Update(str, severity);
947 }
948
949/*
950 void factStat(int64_t *stat, int len)
951 {
952 if (len!=7)
953 {
954 fMsg.Warn("factStat received unknown number of values.");
955 return;
956 }
957
958 vector<int64_t> data(1, g_maxMem);
959 data.insert(data.end(), stat, stat+len);
960
961 static vector<int64_t> last(8);
962 if (data==last)
963 return;
964 last = data;
965
966 fDimStatistics.Update(data);
967
968 // len ist die Laenge des arrays.
969 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
970 // kannst Du pruefen, ob die 100MB voll sind ....
971
972 ostringstream str;
973 str
974 << "Wait=" << stat[0] << " "
975 << "Skip=" << stat[1] << " "
976 << "Del=" << stat[2] << " "
977 << "Tot=" << stat[3] << " "
978 << "Mem=" << stat[4] << "/" << g_maxMem << " "
979 << "Read=" << stat[5] << " "
980 << "Conn=" << stat[6];
981
982 fMsg.Info(str);
983 }
984 */
985
986 void UpdateDimStatistics1(const pair<Time,GUI_STAT> &stat)
987 {
988 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
989 fDimStatistics1.Update(stat.first);
990 }
991
992 void factStat(const GUI_STAT &stat)
993 {
994 fQueueStatistics1.emplace(Time(), stat);
995 }
996
997 void factReportIncomplete(uint64_t rep)
998 {
999 fDimIncomplete.setQuality(1);
1000 fDimIncomplete.Update(rep);
1001 }
1002
1003 array<FAD::EventHeader, 40> fVecHeader;
1004
1005 template<typename T, class S>
1006 array<T, 42> Compare(const S *vec, const T *t)
1007 {
1008 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1009
1010 const T *min = NULL;
1011 const T *val = NULL;
1012 const T *max = NULL;
1013
1014 array<T, 42> arr;
1015
1016 // bool rc = true;
1017 for (int i=0; i<40; i++)
1018 {
1019 const char *base = reinterpret_cast<const char*>(vec+i);
1020 const T *ref = reinterpret_cast<const T*>(base+offset);
1021
1022 arr[i] = *ref;
1023
1024 if (gi_NumConnect[i]==0)
1025 {
1026 arr[i] = 0;
1027 continue;
1028 }
1029
1030 if (!val)
1031 {
1032 min = ref;
1033 val = ref;
1034 max = ref;
1035 }
1036
1037 if (*ref<*min)
1038 min = ref;
1039
1040 if (*ref>*max)
1041 max = ref;
1042
1043 // if (*val!=*ref)
1044 // rc = false;
1045 }
1046
1047 arr[40] = val ? *min : 1;
1048 arr[41] = val ? *max : 0;
1049
1050 return arr;
1051 }
1052
1053 template<typename T>
1054 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1055 {
1056 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1057
1058 T val = 0;
1059 T rc = 0;
1060
1061 array<T, 42> vec;
1062
1063 bool first = true;
1064
1065 for (int i=0; i<40; i++)
1066 {
1067 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1068 const T *ref = reinterpret_cast<const T*>(base+offset);
1069
1070 vec[i+2] = *ref;
1071
1072 if (gi_NumConnect[i]==0)
1073 {
1074 vec[i+2] = 0;
1075 continue;
1076 }
1077
1078 if (first)
1079 {
1080 first = false;
1081 val = *ref;
1082 rc = 0;
1083 }
1084
1085 rc |= val^*ref;
1086 }
1087
1088 vec[0] = rc;
1089 vec[1] = val;
1090
1091 return vec;
1092 }
1093
1094 template<typename T, size_t N>
1095 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1096 {
1097 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1098 svc.Update(t);
1099 }
1100
// Debug output to stdout in the form
//   name|first|v[1]|v[0]<x<v[1]: v[3] v[4] ... v[42]
// where v is data.second. Entry 2 is not printed.
template<typename T>
void Print(const char *name, const std::pair<bool,std::array<T, 43>> &data)
{
    const std::array<T, 43> &v = data.second;

    std::cout << name << "|" << data.first << "|" << v[1] << "|" << v[0] << "<x<" << v[1] << ":";

    for (size_t idx=3; idx<43; idx++)
        std::cout << " " << v[idx];

    std::cout << std::endl;
}
1109
1110 vector<uint> fNumConnected;
1111
1112 void procHeader(const tuple<Time,bool,FAD::EventHeader> &dat)
1113 {
1114 const Time &t = get<0>(dat);
1115 const bool changed = get<1>(dat);
1116 const FAD::EventHeader &h = get<2>(dat);
1117
1118 const FAD::EventHeader old = fVecHeader[h.Id()];
1119 fVecHeader[h.Id()] = h;
1120
1121 if (old.fVersion != h.fVersion || changed)
1122 {
1123 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1124
1125 array<float,42> data;
1126 for (int i=0; i<42; i++)
1127 {
1128 ostringstream str;
1129 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1130 data[i] = stof(str.str());
1131 }
1132 Update(fDimFwVersion, data, t);
1133 }
1134
1135 if (old.fRunNumber != h.fRunNumber || changed)
1136 {
1137 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1138 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1139 fDimRunNumber.Update(t);
1140 }
1141
1142 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1143 {
1144 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1145 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1146 fDimPrescaler.Update(t);
1147 }
1148
1149 if (old.fDNA != h.fDNA || changed)
1150 {
1151 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1152 Update(fDimDNA, dna, t, 40);
1153 }
1154
1155 if (old.fStatus != h.fStatus || changed)
1156 {
1157 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1158 Update(fDimStatus, sts, t);
1159 }
1160
1161 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1162 {
1163 array<uint16_t, FAD::kNumDac*42> dacs;
1164
1165 for (int i=0; i<FAD::kNumDac; i++)
1166 {
1167 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1168 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1169 }
1170
1171 Update(fDimDac, dacs, t);
1172 }
1173
1174 // -----------
1175
1176 static Time oldt(boost::date_time::neg_infin);
1177 Time newt;
1178
1179 if (newt>oldt+boost::posix_time::seconds(1))
1180 {
1181 oldt = newt;
1182
1183 // --- RefClock
1184
1185 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1186 Update(fDimRefClock, clk, t);
1187
1188 // --- Temperatures
1189
1190 const array<int16_t,42> tmp[4] =
1191 {
1192 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1193 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1194 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1195 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1196 };
1197
1198 vector<int16_t> data;
1199 data.reserve(82);
1200 data.push_back(tmp[0][40]); // min: 0
1201 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1202 data.push_back(tmp[0][41]); // max: 41
1203 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1204
1205 for (int j=1; j<=3; j++)
1206 {
1207 const array<int16_t,42> &ref = tmp[j];
1208
1209 // Gloabl min
1210 if (ref[40]<data[0]) // 40=min
1211 data[0] = ref[40];
1212
1213 // Global max
1214 if (ref[41]>data[41]) // 41=max
1215 data[41] = ref[41];
1216
1217 for (int i=0; i<40; i++)
1218 {
1219 // min per board
1220 if (ref[i]<data[i+1]) // data: 1-40
1221 data[i+1] = ref[i]; // ref: 0-39
1222
1223 // max per board
1224 if (ref[i]>data[i+42]) // data: 42-81
1225 data[i+42] = ref[i]; // ref: 0-39
1226 }
1227 }
1228
1229 vector<float> deg(82); // 0: global min, 1-40: min
1230 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1231 deg[i] = data[i]/16.;
1232
1233 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1234 fDimTemperature.Update(t);
1235 }
1236 }
1237
1238 void debugHead(const FAD::EventHeader &h)
1239 {
1240 const uint16_t id = h.Id();
1241 if (id>39)
1242 return;
1243
1244 if (fNumConnected.size()!=40)
1245 fNumConnected.resize(40);
1246
1247 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1248
1249 const bool changed = con!=fNumConnected || !IsThreadRunning();
1250
1251 fNumConnected = con;
1252
1253 fQueueProcHeader.emplace(Time(), changed, h);
1254 }
1255};
1256
1257EventBuilderWrapper *EventBuilderWrapper::This = 0;
1258
1259// ----------- Event builder callbacks implementation ---------------
1260bool runOpen(const EVT_CTRL2 &evt)
1261{
1262 return EventBuilderWrapper::This->runOpen(evt);
1263}
1264
1265bool runWrite(const EVT_CTRL2 &evt)
1266{
1267 return EventBuilderWrapper::This->runWrite(evt);
1268}
1269
1270void runClose(RUN_CTRL2 &run)
1271{
1272 EventBuilderWrapper::This->runClose(run);
1273}
1274
1275bool eventCheck(const EVT_CTRL2 &evt)
1276{
1277 return EventBuilderWrapper::This->eventCheck(evt);
1278}
1279
1280void gotNewRun(RUN_CTRL2 &run)
1281{
1282 EventBuilderWrapper::This->gotNewRun(run);
1283}
1284
1285void runFinished()
1286{
1287 EventBuilderWrapper::This->runFinished();
1288}
1289
1290void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
1291{
1292 EventBuilderWrapper::This->applyCalib(evt, size);
1293}
1294
1295void factOut(int severity, const char *message)
1296{
1297 EventBuilderWrapper::This->factOut(severity, message);
1298}
1299
1300void factStat(const GUI_STAT &stat)
1301{
1302 EventBuilderWrapper::This->factStat(stat);
1303}
1304
1305void factReportIncomplete(uint64_t rep)
1306{
1307 EventBuilderWrapper::This->factReportIncomplete(rep);
1308}
1309
1310// ------
1311
1312void debugHead(void *buf)
1313{
1314 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1315 EventBuilderWrapper::This->debugHead(h);
1316}
1317
1318#endif
Note: See TracBrowser for help on using the repository browser.