source: trunk/FACT++/src/EventBuilderWrapper.h@ 11788

Last change on this file since 11788 was 11758, checked in by tbretz, 13 years ago
Renamed EVENT_DATA to RAW_EVENT; added new EVENT_DATA
File size: 35.7 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/date_time/posix_time/posix_time_types.hpp>
13
14#include "DimWriteStatistics.h"
15
16#include "DataCalib.h"
17#include "DataWriteRaw.h"
18
19#ifdef HAVE_FITS
20#include "DataWriteFits.h"
21#else
22#define DataWriteFits DataWriteRaw
23#endif
24
25namespace ba = boost::asio;
26namespace bs = boost::system;
27
28using ba::ip::tcp;
29
30using namespace std;
31
32// ========================================================================
33
34#include "EventBuilder.h"
35
36extern "C" {
37 extern void StartEvtBuild();
38 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
39}
40
41// ========================================================================
42
43class EventBuilderWrapper
44{
45public:
46 // FIXME
47 static EventBuilderWrapper *This;
48
49 MessageImp &fMsg;
50
51private:
52 boost::thread fThread;
53
54 enum CommandStates_t // g_runStat
55 {
56 kAbort = -2, // quit as soon as possible ('abort')
57 kExit = -1, // stop reading, quit when buffered events done ('exit')
58 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
59 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
60 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
61 kModeFlush = 10, // read data from camera, but skip them ('flush')
62 kModeTest = 20, // read data and process them, but do not write to disk ('test')
63 kModeFlag = 30, // read data, process and write all to disk ('flag')
64 kModeRun = 40, // read data, process and write selected to disk ('run')
65 };
66
67 enum
68 {
69 kCurrent = 0,
70 kTotal = 1,
71 kEventId = 2,
72 kTriggerId = 3,
73 };
74
75 enum FileFormat_t
76 {
77 kNone = 0,
78 kDebug,
79 kFits,
80 kRaw,
81 kCalib
82 };
83
84 FileFormat_t fFileFormat;
85
86
87 uint32_t fMaxRun;
88 uint32_t fLastOpened;
89 uint32_t fLastClosed;
90 uint32_t fNumEvts[4];
91
92 DimWriteStatistics fDimWriteStats;
93 DimDescribedService fDimRuns;
94 DimDescribedService fDimEvents;
95 DimDescribedService fDimRawData;
96 DimDescribedService fDimEventData;
97 DimDescribedService fDimFwVersion;
98 DimDescribedService fDimRunNumber;
99 DimDescribedService fDimStatus;
100 DimDescribedService fDimDNA;
101 DimDescribedService fDimTemperature;
102 DimDescribedService fDimPrescaler;
103 DimDescribedService fDimRefClock;
104 DimDescribedService fDimRoi;
105 DimDescribedService fDimDac;
106 DimDescribedService fDimDrsCalibration;
107 DimDescribedService fDimStatistics1;
108 DimDescribedService fDimStatistics2;
109
110 bool fDebugStream;
111 bool fDebugRead;
112 bool fDebugLog;
113
114 uint32_t fRunNumber;
115
116 void InitRunNumber()
117 {
118 // FIXME: Add a check that we are not too close to noon!
119 //const int night = Time().NightAsInt();
120
121 fRunNumber = 1000;
122
123 while (--fRunNumber>0)
124 {
125 const string name = DataProcessorImp::FormFileName(fRunNumber, "");
126
127 if (access((name+"bin").c_str(), F_OK) == 0)
128 break;
129 if (access((name+"fits").c_str(), F_OK) == 0)
130 break;
131 if (access((name+"drs.fits").c_str(), F_OK) == 0)
132 break;
133 }
134
135 fRunNumber++;
136
137 ostringstream str;
138 str << "Starting with run number " << fRunNumber;
139 fMsg.Message(str);
140
141 fMsg.Info(" ==> TODO: Run-number detection doesn't work when noon passes!");
142 fMsg.Info(" ==> TODO: Crosscheck with database!");
143 }
144
145public:
146 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
147 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
148 fDimWriteStats ("FAD_CONTROL", imp),
149 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
150 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
151 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
152 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
153 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
154 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
155 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
156 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
157 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
158 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
159 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
160 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
161 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
162 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
163 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
164 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
165 fDebugStream(false), fDebugRead(false), fDebugLog(false)
166 {
167 if (This)
168 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
169
170 This = this;
171
172 memset(fNumEvts, 0, sizeof(fNumEvts));
173
174 fDimEvents.Update(fNumEvts);
175
176 for (size_t i=0; i<40; i++)
177 ConnectSlot(i, tcp::endpoint());
178
179 InitRunNumber();
180 }
181 virtual ~EventBuilderWrapper()
182 {
183 Abort();
184
185 // FIXME: Used timed_join and abort afterwards
186 // What's the maximum time the eb need to abort?
187 fThread.join();
188 //ffMsg.Info("EventBuilder stopped.");
189
190 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
191 delete *it;
192 }
193
194 struct RunDescription
195 {
196 uint32_t maxtime;
197 uint32_t maxevt;
198
199 FAD::Configuration reference;
200 };
201
202 map<uint32_t, RunDescription> fExpectedRuns;
203
204 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const FAD::Configuration &ref)
205 {
206 if (maxtime<=0 || maxtime>24*60*60)
207 maxtime = 24*60*60;
208 if (maxevt<=0 || maxevt>INT32_MAX)
209 maxevt = INT32_MAX;
210
211 const RunDescription descr =
212 {
213 uint32_t(maxtime),
214 uint32_t(maxevt),
215 ref
216 };
217
218 // FIMXE: Maybe reset an event counter so that the mcp can count events?
219
220 fExpectedRuns[fRunNumber] = descr;
221 return fRunNumber++;
222 }
223
224 bool IsThreadRunning()
225 {
226 return !fThread.timed_join(boost::posix_time::microseconds(0));
227 }
228
229 void SetMaxMemory(unsigned int mb) const
230 {
231 /*
232 if (mb*1000000<GetUsedMemory())
233 {
234 // ffMsg.Warn("...");
235 return;
236 }*/
237
238 g_maxMem = size_t(mb)*1000000;
239 }
240
241 void StartThread(const vector<tcp::endpoint> &addr)
242 {
243 if (IsThreadRunning())
244 {
245 fMsg.Warn("Start - EventBuilder still running");
246 return;
247 }
248
249 fLastMessage.clear();
250
251 for (size_t i=0; i<40; i++)
252 ConnectSlot(i, addr[i]);
253
254 g_runStat = kModeRun;
255
256 fMsg.Message("Starting EventBuilder thread");
257
258 fThread = boost::thread(StartEvtBuild);
259 }
260 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
261 {
262 if (i>39)
263 return;
264
265 if (addr==tcp::endpoint())
266 {
267 DisconnectSlot(i);
268 return;
269 }
270
271 g_port[i].sockAddr.sin_family = AF_INET;
272 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
273 g_port[i].sockAddr.sin_port = htons(addr.port());
274 // In this order
275 g_port[i].sockDef = 1;
276 }
277 void DisconnectSlot(unsigned int i)
278 {
279 if (i>39)
280 return;
281
282 g_port[i].sockDef = 0;
283 // In this order
284 g_port[i].sockAddr.sin_family = AF_INET;
285 g_port[i].sockAddr.sin_addr.s_addr = 0;
286 g_port[i].sockAddr.sin_port = 0;
287 }
288 void IgnoreSlot(unsigned int i)
289 {
290 if (i>39)
291 return;
292 if (g_port[i].sockAddr.sin_port==0)
293 return;
294
295 g_port[i].sockDef = -1;
296 }
297
298
299 void Abort()
300 {
301 fMsg.Message("Signal abort to EventBuilder thread...");
302 g_runStat = kAbort;
303 }
304
305 void ResetThread(bool soft)
306 {
307 /*
308 if (g_reset > 0)
309
310 * suspend reading
311 * reset = g_reset;
312 * g_reset=0
313
314 * reset% 10
315 == 0 leave event Buffers as they are
316 == 1 let all buffers drain (write (incomplete) events)
317 > 1 flush all buffers (do not write buffered events)
318
319 * (reset/10)%10
320 > 0 close all sockets and destroy them (also free the
321 allocated read-buffers)
322 recreate before resuming operation
323 [ this is more than just close/open that can be
324 triggered by e.g. close/open the base-socket ]
325
326 * (reset/100)%10
327 > 0 close all open run-files
328
329 * (reset/1000)
330 sleep so many seconds before resuming operation
331 (does not (yet) take into account time left when waiting
332 for buffers getting empty ...)
333
334 * resume_reading
335
336 */
337 fMsg.Message("Signal reset to EventBuilder thread...");
338 g_reset = soft ? 101 : 102;
339 }
340
341 void Exit()
342 {
343 fMsg.Message("Signal exit to EventBuilder thread...");
344 g_runStat = kExit;
345 }
346
347 /*
348 void Wait()
349 {
350 fThread.join();
351 ffMsg.Message("EventBuilder stopped.");
352 }*/
353
354 void Hybernate() const { g_runStat = kHybernate; }
355 void Sleep() const { g_runStat = kSleep; }
356 void FlushMode() const { g_runStat = kModeFlush; }
357 void TestMode() const { g_runStat = kModeTest; }
358 void FlagMode() const { g_runStat = kModeFlag; }
359 void RunMode() const { g_runStat = kModeRun; }
360
361 // FIXME: To be removed
362 void SetMode(int mode) const { g_runStat = mode; }
363
364 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
365 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
366 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
367 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
368
369 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
370 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
371
372 void SetOutputFormat(FileFormat_t f)
373 {
374 fFileFormat = f;
375 if (fFileFormat==kCalib)
376 {
377 DataCalib::Restart();
378 DataCalib::Update(fDimDrsCalibration);
379 fMsg.Message("Resetted DRS calibration.");
380 }
381 }
382
383 void SetDebugLog(bool b) { fDebugLog = b; }
384
385 void SetDebugStream(bool b)
386 {
387 fDebugStream = b;
388 if (b)
389 return;
390
391 for (int i=0; i<40; i++)
392 {
393 if (!fDumpStream[i].is_open())
394 continue;
395
396 fDumpStream[i].close();
397
398 ostringstream name;
399 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
400 fMsg.Message("Closed file '"+name.str()+"'");
401 }
402 }
403
404 void SetDebugRead(bool b)
405 {
406 fDebugRead = b;
407 if (b || !fDumpRead.is_open())
408 return;
409
410 fDumpRead.close();
411 fMsg.Message("Closed file 'socket_events.txt'");
412 }
413
414// size_t GetUsedMemory() const { return gi_usedMem; }
415
416 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
417
418
419 /*
420 struct OpenFileToDim
421 {
422 int code;
423 char fileName[FILENAME_MAX];
424 };
425
426 SignalRunOpened(runid, filename);
427 // Send num open files
428 // Send runid, (more info about the run?), filename via dim
429
430 SignalEvtWritten(runid);
431 // Send num events written of newest file
432
433 SignalRunClose(runid);
434 // Send new num open files
435 // Send empty file-name if no file is open
436
437 */
438
439 // -------------- Mapped event builder callbacks ------------------
440
441 void UpdateRuns(const string &fname="")
442 {
443 uint32_t values[5] =
444 {
445 static_cast<uint32_t>(fFiles.size()),
446 0xffffffff,
447 0,
448 fLastOpened,
449 fLastClosed
450 };
451
452 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
453 it!=fFiles.end(); it++)
454 {
455 const DataProcessorImp *file = *it;
456
457 if (file->GetRunId()<values[1])
458 values[1] = file->GetRunId();
459
460 if (file->GetRunId()>values[2])
461 values[2] = file->GetRunId();
462 }
463
464 fMaxRun = values[2];
465
466 vector<char> data(sizeof(values)+fname.size()+1);
467 memcpy(data.data(), values, sizeof(values));
468 strcpy(data.data()+sizeof(values), fname.c_str());
469
470 fDimRuns.Update(data);
471 }
472
473 vector<DataProcessorImp*> fFiles;
474
475 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
476 {
477 fMsg.Info(" ==> TODO: Update run configuration in database!");
478
479 // Check if file already exists...
480 DataProcessorImp *file = 0;
481 switch (fFileFormat)
482 {
483 case kNone: file = new DataDump(runid, fMsg); break;
484 case kDebug: file = new DataDebug(runid, fMsg); break;
485 case kFits: file = new DataWriteFits(runid, fMsg); break;
486 case kRaw: file = new DataWriteRaw(runid, fMsg); break;
487 case kCalib: file = new DataCalib(runid, fDimDrsCalibration, fMsg); break;
488 }
489
490 try
491 {
492 if (!file->Open(h))
493 return 0;
494 }
495 catch (const exception &e)
496 {
497 fMsg.Error("Exception trying to open file: "+string(e.what()));
498 return 0;
499 }
500
501 fFiles.push_back(file);
502
503 ostringstream str;
504 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
505 fMsg.Info(str);
506
507 fDimWriteStats.FileOpened(file->GetFileName());
508
509 fLastOpened = runid;
510 UpdateRuns(file->GetFileName());
511
512 fNumEvts[kEventId] = 0;
513 fNumEvts[kTriggerId] = 0;
514
515 fNumEvts[kCurrent] = 0;
516 fDimEvents.Update(fNumEvts);
517 // fDimCurrentEvent.Update(uint32_t(0));
518
519 return reinterpret_cast<FileHandle_t>(file);
520 }
521
522 int runWrite(FileHandle_t handler, EVENT *e, size_t)
523 {
524 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
525
526 if (!file->WriteEvt(e))
527 return -1;
528
529 if (file->GetRunId()==fMaxRun)
530 {
531 fNumEvts[kCurrent]++;
532 fNumEvts[kEventId] = e->EventNum;
533 fNumEvts[kTriggerId] = e->TriggerNum;
534 }
535
536 fNumEvts[kTotal]++;
537
538 static Time oldt(boost::date_time::neg_infin);
539 Time newt;
540 if (newt>oldt+boost::posix_time::seconds(1))
541 {
542 fDimEvents.Update(fNumEvts);
543 oldt = newt;
544 }
545
546
547 // ===> SignalEvtWritten(runid);
548 // Send num events written of newest file
549
550 /* close run runId (all all runs if runId=0) */
551 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
552 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
553
554 return 0;
555 }
556
557 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
558 {
559 fMsg.Info(" ==> TODO: Update run configuration in database!");
560
561 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
562
563 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
564 if (it==fFiles.end())
565 {
566 ostringstream str;
567 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
568 fMsg.Fatal(str);
569 return -1;
570 }
571
572 fFiles.erase(it);
573
574 fLastClosed = file->GetRunId();
575 UpdateRuns();
576
577 fDimEvents.Update(fNumEvts);
578
579 const bool rc = file->Close(tail);
580 if (!rc)
581 {
582 // Error message
583 }
584
585 ostringstream str;
586 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
587 fMsg.Info(str);
588
589 delete file;
590
591 // ==> SignalRunClose(runid);
592 // Send new num open files
593 // Send empty file-name if no file is open
594
595 return rc ? 0 : -1;
596 }
597
598 ofstream fDumpStream[40];
599
600 void debugStream(int isock, void *buf, int len)
601 {
602 if (!fDebugStream)
603 return;
604
605 const int slot = isock/7;
606 if (slot<0 || slot>39)
607 return;
608
609 if (!fDumpStream[slot].is_open())
610 {
611 ostringstream name;
612 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
613
614 fDumpStream[slot].open(name.str().c_str(), ios::app);
615 if (!fDumpStream[slot])
616 {
617 ostringstream str;
618 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
619 fMsg.Error(str);
620
621 return;
622 }
623
624 fMsg.Message("Opened file '"+name.str()+"' for writing.");
625 }
626
627 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
628 }
629
630 ofstream fDumpRead; // Stream to possibly dump docket events
631
632 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
633 {
634 // isock = socketID (0-279)
635 // ibyte = #bytes gelesen
636 // event = eventId (oder 0 wenn noch nicht bekannt)
637 // state : 1=finished reading data
638 // 0=reading data
639 // -1=start reading data (header)
640 // -2=start reading data,
641 // eventId not known yet (too little data)
642 // tsec, tusec = time when reading seconds, microseconds
643 //
644 if (!fDebugRead || ibyte==0)
645 return;
646
647 if (!fDumpRead.is_open())
648 {
649 fDumpRead.open("socket_events.txt", ios::app);
650 if (!fDumpRead)
651 {
652 ostringstream str;
653 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
654 fMsg.Error(str);
655
656 return;
657 }
658
659 fMsg.Message("Opened file 'socket_events.txt' for writing.");
660
661 fDumpRead << "# START: " << Time().GetAsStr() << endl;
662 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
663 }
664
665 fDumpRead
666 << setw(2) << state << " "
667 << setw(8) << tsec << " "
668 << setw(9) << tusec << " "
669 << setw(3) << isock << " "
670 << setw(2) << isock/7 << " "
671 << runno << " "
672 << event << " "
673 << ftmevt << " "
674 << ibyte << endl;
675 }
676
677 array<uint16_t,2> fVecRoi;
678
679 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
680 {
681 /*
682 fadhd[i] ist ein array mit den 40 fad-headers
683 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
684
685 event ist die Struktur, die auch die write routine erhaelt;
686 darin sind im header die 'soll-werte' fuer z.B. eventID
687 als auch die ADC-Werte (falls Du die brauchst)
688
689 Wenn die routine einen negativen Wert liefert, wird das event
690 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
691 */
692
693 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
694
695 if (roi!=fVecRoi)
696 {
697 Update(fDimRoi, roi);
698 fVecRoi = roi;
699 }
700
701 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
702 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
703
704 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
705 {
706 // Event incomplete
707 //if (ptr->fStartDelimiter==0)
708 // return -1;
709
710 // Either one of
711 // * fStatus
712 // * fRunNumber
713 // * fEventCounter
714 // * fAdcClockPhaseShift
715 // * fTriggerGeneratorPrescaler
716 // * fDac
717 // inconsistent
718
719 // FIXME: Produce some output, only once per run or
720 // minute
721
722 /*
723 if (*ptr != *beg)
724 return -1;
725
726 if (ptr->fTriggerType != beg->fTriggerType)
727 return -1;
728 if (ptr->fTriggerId != beg->fTriggerId)
729 return -1;
730 if (ptr->fVersion != beg->fVersion)
731 return -1;
732 */
733 }
734
735 // check REFCLK_frequency
736 // check consistency with command configuration
737 // how to log errors?
738 // need gotNewRun/closedRun to know it is finished
739
740 static Time oldt(boost::date_time::neg_infin);
741 Time newt;
742
743 // FIXME: Only send events if the have newer run-numbers
744 if (newt<oldt+boost::posix_time::seconds(1))
745 return 0;
746
747 oldt = newt;
748
749 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
750
751 vector<char> data(sz+event->Roi*2*1440);
752 memcpy(data.data(), event, sizeof(EVENT));
753
754 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
755
756 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
757 fDimRawData.Update(data);
758
759 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
760 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
761
762 fDimEventData.Update(data2);
763
764
765
766
767 return 0;
768 }
769
770 bool IsRunStarted() const
771 {
772 return fExpectedRuns.find(fRunNumber-1)==fExpectedRuns.end();
773 }
774
775 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
776 {
777 // This function is called even when writing is switched off
778 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
779 if (it==fExpectedRuns.end())
780 {
781 ostringstream str;
782 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
783 return;
784 }
785
786 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
787 // return: 0=close scheduled / >0 already closed / <0 does not exist
788
789 fExpectedRuns.erase(it);
790 }
791
792 map<boost::thread::id, string> fLastMessage;
793
794 void factOut(int severity, int err, const char *message)
795 {
796 if (!fDebugLog && severity==99)
797 return;
798
799 ostringstream str;
800 //str << boost::this_thread::get_id() << " ";
801 str << "EventBuilder(";
802 if (err<0)
803 str << "---";
804 else
805 str << err;
806 str << "): " << message;
807
808 string &old = fLastMessage[boost::this_thread::get_id()];
809
810 if (str.str()==old)
811 return;
812 old = str.str();
813
814 fMsg.Update(str, severity);
815 }
816/*
817 void factStat(int64_t *stat, int len)
818 {
819 if (len!=7)
820 {
821 fMsg.Warn("factStat received unknown number of values.");
822 return;
823 }
824
825 vector<int64_t> data(1, g_maxMem);
826 data.insert(data.end(), stat, stat+len);
827
828 static vector<int64_t> last(8);
829 if (data==last)
830 return;
831 last = data;
832
833 fDimStatistics.Update(data);
834
835 // len ist die Laenge des arrays.
836 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
837 // kannst Du pruefen, ob die 100MB voll sind ....
838
839 ostringstream str;
840 str
841 << "Wait=" << stat[0] << " "
842 << "Skip=" << stat[1] << " "
843 << "Del=" << stat[2] << " "
844 << "Tot=" << stat[3] << " "
845 << "Mem=" << stat[4] << "/" << g_maxMem << " "
846 << "Read=" << stat[5] << " "
847 << "Conn=" << stat[6];
848
849 fMsg.Info(str);
850 }
851 */
852
853 void factStat(const EVT_STAT &stat)
854 {
855 fDimStatistics2.Update(stat);
856 /*
857 //some info about what happened since start of program (or last 'reset')
858 uint32_t reset ; //#if increased, reset all counters
859 uint32_t numRead[MAX_SOCK] ; //how often succesfull read from N sockets per loop
860
861 uint64_t gotByte[NBOARDS] ; //#Bytes read per Board
862 uint32_t gotErr[NBOARDS] ; //#Communication Errors per Board
863 uint32_t evtGet; //#new Start of Events read
864 uint32_t evtTot; //#complete Events read
865 uint32_t evtErr; //#Events with Errors
866 uint32_t evtSkp; //#Events incomplete (timeout)
867
868 uint32_t procTot; //#Events processed
869 uint32_t procErr; //#Events showed problem in processing
870 uint32_t procTrg; //#Events accepted by SW trigger
871 uint32_t procSkp; //#Events rejected by SW trigger
872
873 uint32_t feedTot; //#Events used for feedBack system
874 uint32_t feedErr; //#Events rejected by feedBack
875
876 uint32_t wrtTot; //#Events written to disk
877 uint32_t wrtErr; //#Events with write-error
878
879 uint32_t runOpen; //#Runs opened
880 uint32_t runClose; //#Runs closed
881 uint32_t runErr; //#Runs with open/close errors
882
883
884 //info about current connection status
885 uint8_t numConn[NBOARDS] ; //#Sockets succesfully open per board
886 */
887 }
888
889 void factStat(const GUI_STAT &stat)
890 {
891 fDimStatistics1.Update(stat);
892 /*
893 //info about status of the main threads
894 int32_t readStat ; //read thread
895 int32_t procStat ; //processing thread(s)
896 int32_t writStat ; //write thread
897
898 //info about some rates
899 int32_t deltaT ; //time in milli-seconds for rates
900 int32_t readEvt ; //#events read
901 int32_t procEvt ; //#events processed
902 int32_t writEvt ; //#events written
903 int32_t skipEvt ; //#events skipped
904
905 //some info about current state of event buffer (snapspot)
906 int32_t evtBuf; //#Events currently waiting in Buffer
907 uint64_t totMem; //#Bytes available in Buffer
908 uint64_t usdMem; //#Bytes currently used
909 uint64_t maxMem; //max #Bytes used during past Second
910 */
911 }
912
913
914 array<FAD::EventHeader, 40> fVecHeader;
915
916 template<typename T, class S>
917 array<T, 42> Compare(const S *vec, const T *t)
918 {
919 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
920
921 const T *min = NULL;
922 const T *val = NULL;
923 const T *max = NULL;
924
925 array<T, 42> arr;
926
927 bool rc = true;
928 for (int i=0; i<40; i++)
929 {
930 const char *base = reinterpret_cast<const char*>(vec+i);
931 const T *ref = reinterpret_cast<const T*>(base+offset);
932
933 arr[i] = *ref;
934
935 if (gi_NumConnect[i]!=7)
936 {
937 arr[i] = 0;
938 continue;
939 }
940
941 if (!val)
942 {
943 min = ref;
944 val = ref;
945 max = ref;
946 }
947
948 if (*ref<*min)
949 min = ref;
950
951 if (*ref>*max)
952 max = ref;
953
954 if (*val!=*ref)
955 rc = false;
956 }
957
958 arr[40] = val ? *min : 1;
959 arr[41] = val ? *max : 0;
960
961 return arr;
962 }
963
964 template<typename T>
965 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
966 {
967 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
968
969 T val = 0;
970 T rc = 0;
971
972 array<T, 42> vec;
973
974 bool first = true;
975
976 for (int i=0; i<40; i++)
977 {
978 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
979 const T *ref = reinterpret_cast<const T*>(base+offset);
980
981 vec[i+2] = *ref;
982
983 if (gi_NumConnect[i]!=7)
984 {
985 vec[i+2] = 0;
986 continue;
987 }
988
989 if (first)
990 {
991 first = false;
992 val = *ref;
993 rc = 0;
994 }
995
996 rc |= val^*ref;
997 }
998
999 vec[0] = rc;
1000 vec[1] = val;
1001
1002 return vec;
1003 }
1004
1005 template<typename T, size_t N>
1006 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1007 {
1008// svc.setQuality(vec[40]<=vec[41]);
1009 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1010 svc.updateService();
1011 }
1012
1013 template<typename T>
1014 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1015 {
1016 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1017 for (int i=0; i<40;i++)
1018 cout << " " << data.second[i+3];
1019 cout << endl;
1020 }
1021
1022 vector<uint> fNumConnected;
1023
1024 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1025 {
1026 const uint16_t id = h.Id();
1027 if (id>39)
1028 return;
1029
1030 if (fNumConnected.size()!=40)
1031 fNumConnected.resize(40);
1032
1033 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1034
1035 const bool changed = con!=fNumConnected || !IsThreadRunning();
1036
1037 fNumConnected = con;
1038
1039 const FAD::EventHeader old = fVecHeader[id];
1040 fVecHeader[id] = h;
1041
1042 if (old.fVersion != h.fVersion || changed)
1043 {
1044 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1045
1046 array<float,42> data;
1047 for (int i=0; i<42; i++)
1048 {
1049 ostringstream str;
1050 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1051 data[i] = stof(str.str());
1052 }
1053 Update(fDimFwVersion, data);
1054 }
1055
1056 if (old.fRunNumber != h.fRunNumber || changed)
1057 {
1058 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1059 fDimRunNumber.Update(run);
1060 }
1061
1062 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1063 {
1064 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1065 fDimPrescaler.Update(pre);
1066 }
1067
1068 if (old.fDNA != h.fDNA || changed)
1069 {
1070 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1071 Update(fDimDNA, dna, 40);
1072 }
1073
1074 if (old.fStatus != h.fStatus || changed)
1075 {
1076 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1077 Update(fDimStatus, sts);
1078 }
1079
1080 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1081 {
1082 array<uint16_t, FAD::kNumDac*42> dacs;
1083
1084 for (int i=0; i<FAD::kNumDac; i++)
1085 {
1086 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1087 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1088 }
1089
1090 Update(fDimDac, dacs);
1091 }
1092
1093 // -----------
1094
1095 static Time oldt(boost::date_time::neg_infin);
1096 Time newt;
1097
1098 if (newt>oldt+boost::posix_time::seconds(1))
1099 {
1100 oldt = newt;
1101
1102 // --- RefClock
1103
1104 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1105 Update(fDimRefClock, clk);
1106
1107 // --- Temperatures
1108
1109 const array<int16_t,42> tmp[4] =
1110 {
1111 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1112 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1113 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1114 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1115 };
1116
1117 vector<int16_t> data;
1118 data.reserve(82);
1119 data.push_back(tmp[0][40]); // min: 0
1120 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1121 data.push_back(tmp[0][41]); // max: 41
1122 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1123
1124 for (int j=1; j<=3; j++)
1125 {
1126 const array<int16_t,42> &ref = tmp[j];
1127
1128 // Gloabl min
1129 if (ref[40]<data[0]) // 40=min
1130 data[0] = ref[40];
1131
1132 // Global max
1133 if (ref[41]>data[41]) // 41=max
1134 data[41] = ref[41];
1135
1136 for (int i=0; i<40; i++)
1137 {
1138 // min per board
1139 if (ref[i]<data[i+1]) // data: 1-40
1140 data[i+1] = ref[i]; // ref: 0-39
1141
1142 // max per board
1143 if (ref[i]>data[i+42]) // data: 42-81
1144 data[i+42] = ref[i]; // ref: 0-39
1145 }
1146
1147
1148 }
1149
1150 vector<float> deg(82); // 0: global min, 1-40: min
1151 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1152 deg[i] = data[i]/16.;
1153 fDimTemperature.Update(deg);
1154 }
1155
1156 /*
1157 uint16_t fTriggerType;
1158 uint32_t fTriggerId;
1159 uint32_t fEventCounter;
1160 uint16_t fAdcClockPhaseShift;
1161 uint16_t fNumTriggersToGenerate;
1162 uint16_t fTriggerGeneratorPrescaler;
1163 uint32_t fTimeStamp;
1164 int16_t fTempDrs[kNumTemp]; // In units of 1/16 deg(?)
1165 uint16_t fDac[kNumDac];
1166 */
1167 }
1168};
1169
1170EventBuilderWrapper *EventBuilderWrapper::This = 0;
1171
1172// ----------- Event builder callbacks implementation ---------------
1173extern "C"
1174{
1175 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1176 {
1177 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1178 }
1179
1180 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1181 {
1182 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1183 }
1184
1185 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1186 {
1187 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1188 }
1189
1190 void factOut(int severity, int err, const char *message)
1191 {
1192 EventBuilderWrapper::This->factOut(severity, err, message);
1193 }
1194
1195 void factStat(GUI_STAT stat)
1196 {
1197 EventBuilderWrapper::This->factStat(stat);
1198 }
1199
1200 void factStatNew(EVT_STAT stat)
1201 {
1202 EventBuilderWrapper::This->factStat(stat);
1203 }
1204
1205 void debugHead(int socket, int/*board*/, void *buf)
1206 {
1207 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1208
1209 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1210 }
1211
1212 void debugStream(int isock, void *buf, int len)
1213 {
1214 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1215 }
1216
1217 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1218 {
1219 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1220 }
1221
1222 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1223 {
1224 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1225 }
1226
1227 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1228 {
1229 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1230 }
1231
1232 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1233 {
1234 return 100;
1235 }
1236
1237}
1238
1239#endif
Note: See TracBrowser for help on using the repository browser.