source: trunk/FACT++/src/EventBuilderWrapper.h@ 16533

Last change on this file since 16533 was 16530, checked in by tbretz, 12 years ago
Let applyCalib know how big the current queue is to optimize cpu usage; renamed the fDimQueue* to something more related; disentangled the service update from the processing in applyCalib, to avoid them increasing memory consumption by blocking the thread, only update them ifnothing else is in their own queue anymore on only on certain time scales - I hope this solves recently seen problems with Dim blocking the network; Updated the description of STATISTICS1
File size: 42.2 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteFits2
24#endif
25
26#include "DataWriteFits2.h"
27
28#include "queue.h"
29
30namespace ba = boost::asio;
31namespace bs = boost::system;
32namespace fs = boost::filesystem;
33
34using ba::ip::tcp;
35
36using namespace std;
37
38// ========================================================================
39
40#include "EventBuilder.h"
41
42void StartEvtBuild();
43void CloseRunFile();
44
45// ========================================================================
46
47class EventBuilderWrapper
48{
49public:
50 // FIXME
51 static EventBuilderWrapper *This;
52
53 MessageImp &fMsg;
54
55private:
56 boost::thread fThreadMain;
57
58 enum
59 {
60 kCurrent = 0,
61 kTotal = 1,
62 kEventId = 2,
63 kTriggerId = 3,
64 };
65
66 FAD::FileFormat_t fFileFormat;
67
68 //uint32_t fMaxRun;
69 uint32_t fLastOpened;
70 uint32_t fLastClosed;
71 array<uint32_t,4> fNumEvts;
72
73 DimWriteStatistics fDimWriteStats;
74 DimDescribedService fDimRuns;
75 DimDescribedService fDimEvents;
76 DimDescribedService fDimRawData;
77 DimDescribedService fDimEventData;
78 DimDescribedService fDimFeedbackData;
79 DimDescribedService fDimFwVersion;
80 DimDescribedService fDimRunNumber;
81 DimDescribedService fDimStatus;
82 DimDescribedService fDimDNA;
83 DimDescribedService fDimTemperature;
84 DimDescribedService fDimPrescaler;
85 DimDescribedService fDimRefClock;
86 DimDescribedService fDimRoi;
87 DimDescribedService fDimDac;
88 DimDescribedService fDimDrsRuns;
89 DimDescribedService fDimDrsCalibration;
90 DimDescribedService fDimStatistics1;
91 //DimDescribedService fDimStatistics2;
92 DimDescribedService fDimFileFormat;
93 DimDescribedService fDimIncomplete;
94
95 Queue<pair<Time,GUI_STAT>> fQueueStatistics1;
96 Queue<tuple<Time,bool,FAD::EventHeader>> fQueueProcHeader;
97 Queue<pair<Time,array<uint32_t,4>>> fQueueEvents;
98 Queue<pair<Time,array<uint16_t,2>>> fQueueRoi;
99 Queue<vector<char>> fQueueRawData;
100 Queue<tuple<Time,uint32_t,array<float,1440*4>>> fQueueEventData;
101
102 string fPath;
103 uint32_t fNightAsInt;
104 uint32_t fRunNumber;
105 int64_t fRunInProgress;
106
107 array<uint16_t,2> fVecRoi;
108 pair<float,array<float, 1440*4>> fMaxEvent; // Maximum event from applyCalib
109
110protected:
111 bool InitRunNumber(const string &path="")
112 {
113 if (!path.empty())
114 {
115 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
116 {
117 fMsg.Error("Data path "+path+" does not exist!");
118 return false;
119 }
120
121 fPath = path;
122 fDimWriteStats.SetCurrentFolder(fPath);
123
124 fMsg.Info("Data path set to "+path+".");
125 }
126
127 // Get current night
128 const uint32_t night = Time().NightAsInt();
129 if (night==fNightAsInt)
130 return true;
131
132 // Check for run numbers
133 fRunNumber = 1000;
134
135 while (--fRunNumber>0)
136 {
137 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
138
139 if (access((name+"bin").c_str(), F_OK) == 0)
140 break;
141 if (access((name+"fits").c_str(), F_OK) == 0)
142 break;
143 if (access((name+"drs.fits").c_str(), F_OK) == 0)
144 break;
145 }
146
147 // This is now the first file which does not exist
148 fRunNumber++;
149 fLastOpened = 0;
150
151 // Check if we have exceeded the maximum
152 if (fRunNumber==1000)
153 {
154 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
155 return false;
156 }
157
158 ostringstream str;
159 if (fNightAsInt==0)
160 str << "First night...";
161 else
162 str << "Night has changd from " << fNightAsInt << "... new";
163 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
164 fMsg.Message(str);
165
166 fNightAsInt = night;
167
168 return true;
169 }
170
171public:
172 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
173 fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0),
174 fDimWriteStats ("FAD_CONTROL", imp),
175 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
176 "Run files statistics"
177 "|stats[int]:num of open files, min/max run no, last opened or closed run"
178 "|file[string]:filename of last opened file"),
179 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
180 "Event counts"
181 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
182 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;C:4;I:1;I:2;I:40;S:1440;S:160;F",
183 "|roi[uint16]:number of samples per pixel"
184 "|roi_tm[uint16]:number of samples per time-marker channel"
185 "|num_fad[uint32]:event number from FADs"
186 "|num_ftm[uint32]:trigger number from FTM"
187 "|type[uint16]:trigger type from FTM"
188 "|num_boards[uint32]:number of active boards"
189 "|error[uint8]:event builder error counters"
190 "|dummy[]:"
191 "|time[uint32]:PC time as unix time stamp"
192 "|time_board[uint32]:Time stamp of FAD boards"
193 "|start_pix[int16]:start sample of pixels"
194 "|start_tm[int16]:start sample of time marker channels"
195 "|adc[int16]:adc data"),
196 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
197 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
198 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
199 "Firmware version number of fad boards"
200 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
201 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
202 "Run numbers coming from FAD boards"
203 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
204 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
205 "Status of FAD boards"
206 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
207 fDimDNA ("FAD_CONTROL/DNA", "X:40",
208 "DNA of FAD boards"
209 "|DNA[hex]:Hex identifier of each FAD board"),
210 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
211 "FADs temperatures"
212 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
213 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
214 "Trigger generator prescaler of fad boards"
215 "|prescaler[int]:Trigger generator prescaler value, for each board"),
216 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
217 "Reference clock of FAD boards"
218 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
219 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
220 fDimDac ("FAD_CONTROL/DAC", "S:336",
221 "DAC settings of each FAD board"
222 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
223 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
224 "|roi:Region of interest of secondary baseline"
225 "|run:Run numbers of DRS runs (0=none)"),
226 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
227 "|roi:Region of interest of secondary baseline"
228 "|run:Run numbers of DRS runs (0=none)"),
229 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:5;X:3;I:1;I:2;C:40;I:40;X:40",
230 "Event Builder status for GUI display"
231 "|bufferInfo[int]:Events in buffer, incomp., comp., write, proc., tot."
232 "|memInfo[int]:total mem allocated, used mem, max memory"
233 "|deltaT[ms]:Time in ms for rates"
234 "|rateNew[int]:Number of new start events received"
235 "|numConn[int]:Number of connections per board"
236 "|rateBytes[int]:Bytes read this cycle"
237 "|totBytes[int]:Bytes read (counter)"),
238 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
239 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
240 // It is important to instantiate them after the DimServices
241 fQueueStatistics1(std::bind(&EventBuilderWrapper::UpdateDimStatistics1, this, placeholders::_1)),
242 fQueueProcHeader( std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
243 fQueueEvents( std::bind(&EventBuilderWrapper::UpdateDimEvents, this, placeholders::_1)),
244 fQueueRoi( std::bind(&EventBuilderWrapper::UpdateDimRoi, this, placeholders::_1)),
245 fQueueRawData( std::bind(&EventBuilderWrapper::UpdateDimRawData, this, placeholders::_1)),
246 fQueueEventData( std::bind(&EventBuilderWrapper::UpdateDimEventData, this, placeholders::_1)),
247 fNightAsInt(0), fRunInProgress(-1),
248 fMaxEvent(make_pair(-FLT_MAX, array<float,1440*4>()))
249 {
250 if (This)
251 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
252
253 This = this;
254
255 fVecRoi.fill(0);
256
257 memset(fNumEvts.data(), 0, sizeof(fNumEvts));
258 fDimEvents.Update(fNumEvts);
259
260 for (size_t i=0; i<40; i++)
261 ConnectSlot(i, tcp::endpoint());
262 }
263
264 virtual ~EventBuilderWrapper()
265 {
266 Abort();
267
268 // FIXME: Used timed_join and abort afterwards
269 // What's the maximum time the eb need to abort?
270 fThreadMain.join();
271 }
272
273 map<uint32_t, FAD::RunDescription> fExpectedRuns;
274
275 mutex mtx_newrun;
276
277 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
278 {
279 if (maxtime<=0 || maxtime>24*60*60)
280 maxtime = 24*60*60;
281 if (maxevt<=0 || maxevt>INT32_MAX)
282 maxevt = INT32_MAX;
283
284 if (!InitRunNumber())
285 return 0;
286
287 const FAD::RunDescription descr =
288 {
289 uint32_t(maxtime),
290 uint32_t(maxevt),
291 fNightAsInt,
292 ref.first,
293 ref.second,
294 };
295
296 const lock_guard<mutex> lock(mtx_newrun);
297 fExpectedRuns[fRunNumber] = descr;
298 return fRunNumber++;
299 }
300
301 bool IsThreadRunning()
302 {
303 if (fThreadMain.get_id()==boost::this_thread::get_id())
304 return true;
305 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
306 }
307
308 void SetMaxMemory(unsigned int mb) const
309 {
310 g_maxMem = size_t(mb)*1000000;
311 }
312
313 void StartThread(const vector<tcp::endpoint> &addr)
314 {
315 if (IsThreadRunning())
316 {
317 fMsg.Warn("Start - EventBuilder still running");
318 return;
319 }
320
321 //fLastMessage.clear();
322
323 for (size_t i=0; i<40; i++)
324 ConnectSlot(i, addr[i]);
325
326 fMsg.Message("Starting EventBuilder thread");
327
328 fThreadMain = boost::thread(StartEvtBuild);
329 }
330
331 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
332 {
333 if (i>39)
334 return;
335
336 fRunInProgress = -1;
337
338 if (addr==tcp::endpoint())
339 {
340 // In this order
341 g_port[i].sockDef = 0;
342
343 fDimIncomplete.setQuality(0);
344 fDimIncomplete.Update(uint64_t(0));
345 return;
346 }
347
348 struct sockaddr_in sockaddr; //IP for each socket
349 sockaddr.sin_family = AF_INET;
350 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
351 sockaddr.sin_port = htons(addr.port());
352 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
353
354 // In this order
355 g_port[i].sockDef = 1;
356
357 fDimIncomplete.setQuality(0);
358 fDimIncomplete.Update(uint64_t(0));
359 }
360
361 void IgnoreSlot(unsigned int i)
362 {
363 if (i>39)
364 return;
365
366 if (g_port[i].sockAddr.sin_port==0)
367 return;
368
369 g_port[i].sockDef = -1;
370 }
371
372
373 void Abort()
374 {
375 fMsg.Message("Signal abort to EventBuilder thread...");
376 g_reset = 2;
377 }
378
379 void ResetThread(bool soft)
380 {
381 fMsg.Message("Signal reset to EventBuilder thread...");
382 g_reset = soft ? 101 : 102;
383 }
384
385 void Exit()
386 {
387 fMsg.Message("Signal exit to EventBuilder thread...");
388 g_reset = 1;
389 }
390
391 bool IsConnected(int i) const { return gi_NumConnect[i]==1; }
392 bool IsConnecting(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; }
393 bool IsDisconnected(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; }
394 bool IsRunInProgress() const { return fRunInProgress>=0; }
395
396 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
397 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
398
399 void SetOutputFormat(FAD::FileFormat_t f)
400 {
401 const bool changed = f!=fFileFormat;
402
403 fFileFormat = f;
404 fDimFileFormat.Update(uint16_t(f));
405
406 string msg = "File format set to: ";
407 switch (f)
408 {
409 case FAD::kNone: msg += "kNone."; break;
410 case FAD::kDebug: msg += "kDebug."; break;
411 case FAD::kFits: msg += "kFits."; break;
412 case FAD::kCfitsio: msg += "kCfitsio"; break;
413 case FAD::kRaw: msg += "kRaw"; break;
414 case FAD::kCalib:
415 DataCalib::Restart();
416 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
417 fMsg.Message("Resetted DRS calibration.");
418 return;
419 }
420
421 if (changed)
422 fMsg.Message(msg);
423 }
424
425 virtual int ResetSecondaryDrsBaseline()
426 {
427 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
428 {
429 fFileFormat = FAD::kCalib;
430 fDimFileFormat.Update(uint16_t(fFileFormat));
431 fMsg.Message("Resetted DRS calibration for secondary baseline.");
432 }
433 else
434 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
435
436 return 0;
437 }
438
439 void LoadDrsCalibration(const char *fname)
440 {
441 if (!DataCalib::ReadFits(fname, fMsg))
442 return;
443 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
444 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
445 }
446
447 virtual int CloseOpenFiles() { CloseRunFile(); fRunInProgress = -1; return 0; }
448
449
450 // -------------- Mapped event builder callbacks ------------------
451
452 void UpdateRuns(const string &fname="")
453 {
454 uint32_t values[5] =
455 {
456 !fFile ? 0 : 1,
457 fFile ? fFile->GetRunId() : 0,
458 fFile ? fFile->GetRunId() : 0,
459 fLastOpened,
460 fLastClosed
461 };
462
463 vector<char> data(sizeof(values)+fname.size()+1);
464 memcpy(data.data(), values, sizeof(values));
465 strcpy(data.data()+sizeof(values), fname.c_str());
466 fDimRuns.Update(data);
467 }
468
469 shared_ptr<DataProcessorImp> fFile;
470
471 void UpdateDimEvents(const pair<Time,array<uint32_t,4>> &stat)
472 {
473 fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
474 fDimEvents.Update(stat.first);
475 }
476
477 bool runOpen(const EVT_CTRL2 &evt)
478 {
479 const uint32_t night = evt.runCtrl->night;
480 const uint32_t runid = evt.runNum>0 ? evt.runNum : time(NULL);
481
482 // If there is still an open file: close it
483 if (fFile)
484 runClose();
485
486 // Keep a copy of the currently valid drs calibration
487 // and associate it to the run control structure
488 evt.runCtrl->calib = shared_ptr<DrsCalibration>(new DrsCalibration(DataCalib::GetCalibration()));
489
490 /*
491 evt.runCtrl->calibInt.resize(1024*1440);
492
493 const int16_t *off = evt.runCtrl->zcalib.data();
494 int32_t *ptr = evt.runCtrl->calib.data();
495
496 const uint64_t num = evt.runCtrl->calib.fNumOffset;
497 for (int i=0; i<1024*1440)
498 ptr[i] = off[i]/num;
499 */
500
501 // FIMXE: Check if file already exists...
502
503 // Crate the file
504 DataProcessorImp *file = 0;
505 switch (fFileFormat)
506 {
507 case FAD::kNone: file = new DataDump(fPath, night, runid, fMsg); break;
508 case FAD::kDebug: file = new DataDebug(fPath, night, runid, fMsg); break;
509 case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid, fMsg); break;
510 case FAD::kFits: file = new DataWriteFits2(fPath, night, runid, fMsg); break;
511 case FAD::kRaw: file = new DataWriteRaw(fPath, night, runid, fMsg); break;
512 case FAD::kCalib: file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
513 }
514
515 try
516 {
517 // Try to open the file
518 FAD::RunDescription desc;
519 desc.name = evt.runCtrl->runType;
520
521 if (!file->Open(evt, desc))
522 return false;
523 }
524 catch (const exception &e)
525 {
526 fMsg.Error("Exception trying to open file: "+string(e.what()));
527 return false;
528 }
529
530 fLastOpened = runid;
531
532 // Signal that a file is open
533 fFile = shared_ptr<DataProcessorImp>(file);
534
535 // Now do all the calls which potentially block (dim)
536
537 // Time for update runs before time for update events
538 UpdateRuns(file->GetFileName());
539 fNumEvts[kEventId] = 0;
540 fNumEvts[kTriggerId] = 0;
541 fNumEvts[kCurrent] = 0;
542 fQueueEvents.emplace(Time(), fNumEvts);
543
544 fDimWriteStats.FileOpened(file->GetFileName());
545
546 ostringstream str;
547 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
548 fMsg.Info(str);
549
550 return true;
551 }
552
553 bool runWrite(const EVT_CTRL2 &e)
554 {
555 /*
556 const size_t size = sizeof(EVENT)+1440*(evt.Roi+evt.RoiTM)*2;
557 vector evt(e.fEvent, e.fEvent+size);
558
559 const EVENT &evt = *reinterpret_cast<EVENT*>(evt.data());
560
561 int16_t *val = evt.Adc_Data;
562 const int16_t *off = e.runCtrl->zcalib.data();
563 for (const int16_t *start=evt.StartPix; start<evt.StartPix+1440; val+=1024, off+=1024, start++)
564 {
565 if (*start<0)
566 continue;
567
568 for (size_t i=0; i<roi; i++)
569 val[i] -= offset[(*start+i)%1024];
570 }*/
571
572 const EVENT &evt = *e.fEvent;
573 if (!fFile->WriteEvt(evt))
574 return false;
575
576 fNumEvts[kCurrent]++;
577 fNumEvts[kEventId] = evt.EventNum;
578 fNumEvts[kTriggerId] = evt.TriggerNum;
579 fNumEvts[kTotal]++;
580
581 static Time oldt(boost::date_time::neg_infin);
582 Time newt;
583 if (newt>oldt+boost::posix_time::seconds(1))
584 {
585 fQueueEvents.emplace(Time(), fNumEvts);
586 oldt = newt;
587 }
588
589 return true;
590 }
591
592 void runClose()
593 {
594 if (!fFile)
595 return;
596
597 // It can happen that runFinished was never called
598 // (e.g. runWrite failed)
599 if (fRunInProgress==fFile->GetRunId())
600 fRunInProgress = -1;
601
602 // Close the file
603 const bool rc = fFile->Close(NULL);
604
605 fLastClosed = fFile->GetRunId();
606
607 ostringstream str;
608 str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")";
609 if (!rc)
610 str << "... failed!";
611
612 // Signal that the file is closed
613
614 fFile.reset();
615
616 // Now do all the calls which can potentially block (dim)
617
618 CloseRun(fLastClosed);
619
620 // Time for update events before time for update runs
621 fQueueEvents.emplace(Time(), fNumEvts);
622 UpdateRuns();
623
624 // Do the potentially blocking call after all others
625 rc ? fMsg.Info(str) : fMsg.Error(str);
626 }
627
628 virtual void CloseRun(uint32_t /*runid*/) { }
629
630 void UpdateDimRoi(const pair<Time, array<uint16_t,2>> &roi)
631 {
632 fDimRoi.setData(roi.second.data(), sizeof(uint16_t)*2);
633 fDimRoi.Update(roi.first);
634 }
635
636 bool eventCheck(const EVT_CTRL2 &evt)
637 {
638 const EVENT *event = evt.fEvent;
639
640 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
641
642 if (roi!=fVecRoi)
643 {
644 fQueueRoi.emplace(Time(), roi);
645 fVecRoi = roi;
646 }
647
648 const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead);
649 const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(evt.FADhead)+40;
650
651 // FIMXE: Compare with target configuration
652
653 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
654 {
655 // FIXME: Compare with expectations!!!
656 if (ptr->fStartDelimiter==0)
657 {
658 if (ptr==beg)
659 beg++;
660 continue;
661 }
662
663 if (beg->fStatus != ptr->fStatus)
664 {
665 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
666 return false;
667 }
668
669 if (beg->fRunNumber != ptr->fRunNumber)
670 {
671 fMsg.Error("Inconsistent run number detected.... closing run.");
672 return false;
673 }
674
675 /*
676 if (beg->fVersion != ptr->fVersion)
677 {
678 Error("Inconsist firmware version detected.... closing run.");
679 CloseRunFile(runNr, 0, 0);
680 break;
681 }
682 */
683 if (beg->fEventCounter != ptr->fEventCounter)
684 {
685 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
686 return false;
687 }
688
689 if (beg->fTriggerCounter != ptr->fTriggerCounter)
690 {
691 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
692 return false;
693 }
694
695 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
696 {
697 fMsg.Error("Inconsistent phase shift detected.... closing run.");
698 return false;
699 }
700
701 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
702 {
703 fMsg.Error("Inconsistent DAC values detected.... closing run.");
704 return false;
705 }
706
707 if (beg->fTriggerType != ptr->fTriggerType)
708 {
709 fMsg.Error("Inconsistent trigger type detected.... closing run.");
710 return false;
711 }
712 }
713
714 // check REFCLK_frequency
715 // check consistency with command configuration
716 // how to log errors?
717 // need gotNewRun/closedRun to know it is finished
718
719 return true;
720 }
721
722 Time fLastDimRawData;
723 Time fLastDimEventData;
724
725 void UpdateDimRawData(const vector<char> &v)
726 {
727 const EVENT *evt = reinterpret_cast<const EVENT*>(v.data());
728
729 fDimRawData.setData(v.data());
730 fDimRawData.setQuality(evt->TriggerType);
731 fDimRawData.Update(Time(evt->PCTime, evt->PCUsec));
732 }
733 void UpdateDimEventData(const tuple<Time,uint32_t,array<float, 1440*4>> &tup)
734 {
735 fDimEventData.setQuality(get<1>(tup));
736 fDimEventData.setData(get<2>(tup));
737 fDimEventData.Update(get<0>(tup));
738 }
739
740 void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
741 {
742 const EVENT *event = evt.fEvent;
743 const int16_t *start = event->StartPix;
744
745 // Get the reference to the run associated information
746 RUN_CTRL2 &run = *evt.runCtrl;
747
748 if (size==1) // If there is more than one event waiting (including this one), throw them away
749 {
750 Time now;
751
752 // ------------------- Copy event data to new memory --------------------
753 // (to make it thread safe; a static buffer might improve memory handling)
754 const uint16_t roi = event->Roi;
755
756 // ------------------- Apply full DRS calibration ------------------------
757 // (Is that necessray, or would a simple offset correct do well already?)
758
759 // There seems to be a problem using std::array... maybe the size is too big?
760 // array<float, (1440+160)*1024> vec2;
761 vector<float> vec((1440+160)*roi);
762 run.calib->Apply(vec.data(), event->Adc_Data, start, roi);
763
764 // ------------------- Appy DRS-step correction --------------------------
765 for (auto it=run.prevStart.begin(); it!=run.prevStart.end(); it++)
766 {
767 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, roi+10);
768 DrsCalibrate::CorrectStep(vec.data(), 1440, roi, it->data(), start, 3);
769 }
770
771 // ------------------------- Remove spikes --------------------------------
772 DrsCalibrate::RemoveSpikes3(vec.data(), roi);
773
774 // -------------- Update raw data dim sevice (VERY SLOW) -----------------
775 if (fQueueRawData.empty() && now>fLastDimRawData+boost::posix_time::seconds(5))
776 {
777 vector<char> data1(sizeof(EVENT)+vec.size()*sizeof(float));
778 memcpy(data1.data(), event, sizeof(EVENT));
779 memcpy(data1.data()+sizeof(EVENT), vec.data(), vec.size()*sizeof(float));
780 fQueueRawData.emplace(data1);
781
782 fLastDimRawData = now;
783 }
784
785 // ------------------------- Basic statistics -----------------------------
786 DrsCalibrate::SlidingAverage(vec.data(), roi, 10);
787
788 // If this is a cosmic event
789 array<float, 1440*4> stats; // Mean, RMS, Max, Pos
790 const float max = DrsCalibrate::GetPixelStats(stats.data(), vec.data(), roi);
791 if (evt.trgTyp==0 && max>fMaxEvent.first)
792 fMaxEvent = make_pair(max, stats);
793
794 // ------------------ Update dim service (statistics) ---------------------
795
796 if (fQueueEventData.empty() && now>fLastDimEventData+boost::posix_time::seconds(3))
797 {
798 fQueueEventData.emplace(evt.time, evt.trgTyp, evt.trgTyp==0 ? fMaxEvent.second : stats);
799 if (evt.trgTyp==0)
800 fMaxEvent.first = -FLT_MAX;
801
802 fLastDimEventData = now;
803 }
804
805 // === SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
806 //
807 // if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
808 // return;
809 //
810 // vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
811 // DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
812 //
813 // fDimFeedbackData.Update(data2);
814 }
815
816 // Keep the start cells of the last five events for further corrections
817 // As a performance improvement we could also just store the
818 // pointers to the last five events...
819 // What if a new run is started? Do we mind?
820 auto &l = run.prevStart; // History for start cells of previous events (for step calibration)
821
822 if (l.size()<5)
823 l.emplace_front();
824 else
825 {
826 auto it = l.end();
827 l.splice(l.begin(), l, --it);
828 }
829
830 memcpy(l.front().data(), start, 1440*sizeof(int16_t));
831 }
832
833 bool IsRunWaiting()
834 {
835 const lock_guard<mutex> lock(mtx_newrun);
836 return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end();
837 }
838
839 uint32_t GetRunNumber() const
840 {
841 return fRunNumber;
842 }
843
844 bool IncreaseRunNumber(uint32_t run)
845 {
846 if (!InitRunNumber())
847 return false;
848
849 if (run<fRunNumber)
850 {
851 ostringstream msg;
852 msg <<
853 "Run number " << run << " smaller than next available "
854 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
855 fMsg.Error(msg);
856 return false;
857 }
858
859 fRunNumber = run;
860
861 return true;
862 }
863
864 void gotNewRun(RUN_CTRL2 &run)
865 {
866 // This is to secure iteration over fExpectedRuns
867 const lock_guard<mutex> lock(mtx_newrun);
868
869 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
870 while (it!=fExpectedRuns.end())
871 {
872 if (it->first<run.runId)
873 {
874 ostringstream str;
875 str << "runOpen - Missed run " << it->first << ".";
876 fMsg.Info(str);
877
878 // Increase the iterator first, it becomes invalid with the next call
879 const auto is = it++;
880 fExpectedRuns.erase(is);
881 continue;
882 }
883
884 if (it->first==run.runId)
885 break;
886
887 it++;
888 }
889
890 if (it==fExpectedRuns.end())
891 {
892 ostringstream str;
893 str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)";
894 fMsg.Warn(str);
895
896 // This is not ideal, but the best we can do
897 run.night = fNightAsInt;
898
899 return;
900 }
901
902 const FAD::RunDescription &conf = it->second;
903
904 run.runType = conf.name;
905 run.maxEvt = conf.maxevt;
906 run.closeTime = conf.maxtime + run.openTime;
907 run.night = conf.night;
908
909 fExpectedRuns.erase(it);
910
911 // Now signal the fadctrl (configuration process that a run is in progress)
912 // Maybe this could be done earlier, but we are talking about a
913 // negligible time scale here.
914 fRunInProgress = run.runId;
915 }
916
917 void runFinished()
918 {
919 // This is called when the last event of a run (run time exceeded or
920 // max number of events exceeded) has been received.
921 fRunInProgress = -1;
922 }
923
924 //map<boost::thread::id, string> fLastMessage;
925
926 void factOut(int severity, const char *message)
927 {
928 ostringstream str;
929 str << "EventBuilder: " << message;
930
931 /*
932 string &old = fLastMessage[boost::this_thread::get_id()];
933
934 if (str.str()==old)
935 return;
936 old = str.str();
937 */
938
939 fMsg.Update(str, severity);
940 }
941
942/*
943 void factStat(int64_t *stat, int len)
944 {
945 if (len!=7)
946 {
947 fMsg.Warn("factStat received unknown number of values.");
948 return;
949 }
950
951 vector<int64_t> data(1, g_maxMem);
952 data.insert(data.end(), stat, stat+len);
953
954 static vector<int64_t> last(8);
955 if (data==last)
956 return;
957 last = data;
958
959 fDimStatistics.Update(data);
960
961 // len ist die Laenge des arrays.
962 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
963 // kannst Du pruefen, ob die 100MB voll sind ....
964
965 ostringstream str;
966 str
967 << "Wait=" << stat[0] << " "
968 << "Skip=" << stat[1] << " "
969 << "Del=" << stat[2] << " "
970 << "Tot=" << stat[3] << " "
971 << "Mem=" << stat[4] << "/" << g_maxMem << " "
972 << "Read=" << stat[5] << " "
973 << "Conn=" << stat[6];
974
975 fMsg.Info(str);
976 }
977 */
978
979 void UpdateDimStatistics1(const pair<Time,GUI_STAT> &stat)
980 {
981 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
982 fDimStatistics1.Update(stat.first);
983 }
984
985 void factStat(const GUI_STAT &stat)
986 {
987 fQueueStatistics1.emplace(Time(), stat);
988 }
989
990 void factReportIncomplete(uint64_t rep)
991 {
992 fDimIncomplete.setQuality(1);
993 fDimIncomplete.Update(rep);
994 }
995
996 array<FAD::EventHeader, 40> fVecHeader;
997
998 template<typename T, class S>
999 array<T, 42> Compare(const S *vec, const T *t)
1000 {
1001 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1002
1003 const T *min = NULL;
1004 const T *val = NULL;
1005 const T *max = NULL;
1006
1007 array<T, 42> arr;
1008
1009 // bool rc = true;
1010 for (int i=0; i<40; i++)
1011 {
1012 const char *base = reinterpret_cast<const char*>(vec+i);
1013 const T *ref = reinterpret_cast<const T*>(base+offset);
1014
1015 arr[i] = *ref;
1016
1017 if (gi_NumConnect[i]==0)
1018 {
1019 arr[i] = 0;
1020 continue;
1021 }
1022
1023 if (!val)
1024 {
1025 min = ref;
1026 val = ref;
1027 max = ref;
1028 }
1029
1030 if (*ref<*min)
1031 min = ref;
1032
1033 if (*ref>*max)
1034 max = ref;
1035
1036 // if (*val!=*ref)
1037 // rc = false;
1038 }
1039
1040 arr[40] = val ? *min : 1;
1041 arr[41] = val ? *max : 0;
1042
1043 return arr;
1044 }
1045
1046 template<typename T>
1047 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1048 {
1049 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1050
1051 T val = 0;
1052 T rc = 0;
1053
1054 array<T, 42> vec;
1055
1056 bool first = true;
1057
1058 for (int i=0; i<40; i++)
1059 {
1060 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1061 const T *ref = reinterpret_cast<const T*>(base+offset);
1062
1063 vec[i+2] = *ref;
1064
1065 if (gi_NumConnect[i]==0)
1066 {
1067 vec[i+2] = 0;
1068 continue;
1069 }
1070
1071 if (first)
1072 {
1073 first = false;
1074 val = *ref;
1075 rc = 0;
1076 }
1077
1078 rc |= val^*ref;
1079 }
1080
1081 vec[0] = rc;
1082 vec[1] = val;
1083
1084 return vec;
1085 }
1086
1087 template<typename T, size_t N>
1088 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1089 {
1090 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1091 svc.Update(t);
1092 }
1093
1094 template<typename T>
1095 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1096 {
1097 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1098 for (int i=0; i<40;i++)
1099 cout << " " << data.second[i+3];
1100 cout << endl;
1101 }
1102
1103 vector<uint> fNumConnected;
1104
1105 void procHeader(const tuple<Time,bool,FAD::EventHeader> &dat)
1106 {
1107 const Time &t = get<0>(dat);
1108 const bool changed = get<1>(dat);
1109 const FAD::EventHeader &h = get<2>(dat);
1110
1111 const FAD::EventHeader old = fVecHeader[h.Id()];
1112 fVecHeader[h.Id()] = h;
1113
1114 if (old.fVersion != h.fVersion || changed)
1115 {
1116 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1117
1118 array<float,42> data;
1119 for (int i=0; i<42; i++)
1120 {
1121 ostringstream str;
1122 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1123 data[i] = stof(str.str());
1124 }
1125 Update(fDimFwVersion, data, t);
1126 }
1127
1128 if (old.fRunNumber != h.fRunNumber || changed)
1129 {
1130 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1131 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1132 fDimRunNumber.Update(t);
1133 }
1134
1135 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1136 {
1137 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1138 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1139 fDimPrescaler.Update(t);
1140 }
1141
1142 if (old.fDNA != h.fDNA || changed)
1143 {
1144 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1145 Update(fDimDNA, dna, t, 40);
1146 }
1147
1148 if (old.fStatus != h.fStatus || changed)
1149 {
1150 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1151 Update(fDimStatus, sts, t);
1152 }
1153
1154 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1155 {
1156 array<uint16_t, FAD::kNumDac*42> dacs;
1157
1158 for (int i=0; i<FAD::kNumDac; i++)
1159 {
1160 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1161 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1162 }
1163
1164 Update(fDimDac, dacs, t);
1165 }
1166
1167 // -----------
1168
1169 static Time oldt(boost::date_time::neg_infin);
1170 Time newt;
1171
1172 if (newt>oldt+boost::posix_time::seconds(1))
1173 {
1174 oldt = newt;
1175
1176 // --- RefClock
1177
1178 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1179 Update(fDimRefClock, clk, t);
1180
1181 // --- Temperatures
1182
1183 const array<int16_t,42> tmp[4] =
1184 {
1185 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1186 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1187 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1188 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1189 };
1190
1191 vector<int16_t> data;
1192 data.reserve(82);
1193 data.push_back(tmp[0][40]); // min: 0
1194 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1195 data.push_back(tmp[0][41]); // max: 41
1196 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1197
1198 for (int j=1; j<=3; j++)
1199 {
1200 const array<int16_t,42> &ref = tmp[j];
1201
1202 // Gloabl min
1203 if (ref[40]<data[0]) // 40=min
1204 data[0] = ref[40];
1205
1206 // Global max
1207 if (ref[41]>data[41]) // 41=max
1208 data[41] = ref[41];
1209
1210 for (int i=0; i<40; i++)
1211 {
1212 // min per board
1213 if (ref[i]<data[i+1]) // data: 1-40
1214 data[i+1] = ref[i]; // ref: 0-39
1215
1216 // max per board
1217 if (ref[i]>data[i+42]) // data: 42-81
1218 data[i+42] = ref[i]; // ref: 0-39
1219 }
1220 }
1221
1222 vector<float> deg(82); // 0: global min, 1-40: min
1223 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1224 deg[i] = data[i]/16.;
1225
1226 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1227 fDimTemperature.Update(t);
1228 }
1229 }
1230
1231 void debugHead(const FAD::EventHeader &h)
1232 {
1233 const uint16_t id = h.Id();
1234 if (id>39)
1235 return;
1236
1237 if (fNumConnected.size()!=40)
1238 fNumConnected.resize(40);
1239
1240 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1241
1242 const bool changed = con!=fNumConnected || !IsThreadRunning();
1243
1244 fNumConnected = con;
1245
1246 fQueueProcHeader.emplace(Time(), changed, h);
1247 }
1248};
1249
1250EventBuilderWrapper *EventBuilderWrapper::This = 0;
1251
1252// ----------- Event builder callbacks implementation ---------------
1253bool runOpen(const EVT_CTRL2 &evt)
1254{
1255 return EventBuilderWrapper::This->runOpen(evt);
1256}
1257
1258bool runWrite(const EVT_CTRL2 &evt)
1259{
1260 return EventBuilderWrapper::This->runWrite(evt);
1261}
1262
1263void runClose()
1264{
1265 EventBuilderWrapper::This->runClose();
1266}
1267
1268bool eventCheck(const EVT_CTRL2 &evt)
1269{
1270 return EventBuilderWrapper::This->eventCheck(evt);
1271}
1272
1273void gotNewRun(RUN_CTRL2 &run)
1274{
1275 EventBuilderWrapper::This->gotNewRun(run);
1276}
1277
1278void runFinished()
1279{
1280 EventBuilderWrapper::This->runFinished();
1281}
1282
1283void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
1284{
1285 EventBuilderWrapper::This->applyCalib(evt, size);
1286}
1287
1288void factOut(int severity, const char *message)
1289{
1290 EventBuilderWrapper::This->factOut(severity, message);
1291}
1292
1293void factStat(const GUI_STAT &stat)
1294{
1295 EventBuilderWrapper::This->factStat(stat);
1296}
1297
1298void factReportIncomplete(uint64_t rep)
1299{
1300 EventBuilderWrapper::This->factReportIncomplete(rep);
1301}
1302
1303// ------
1304
1305void debugHead(void *buf)
1306{
1307 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1308 EventBuilderWrapper::This->debugHead(h);
1309}
1310
1311#endif
Note: See TracBrowser for help on using the repository browser.