source: trunk/FACT++/src/EventBuilderWrapper.h@ 11827

Last change on this file since 11827 was 11824, checked in by tbretz, 15 years ago
Display some statistics in the FAD tab with th ehelp of current-run/next-run from the fadctrl -- in the way the users gets the most important infomration of what is going on.
File size: 37.2 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/date_time/posix_time/posix_time_types.hpp>
13
14#include "DimWriteStatistics.h"
15
16#include "DataCalib.h"
17#include "DataWriteRaw.h"
18
19#ifdef HAVE_FITS
20#include "DataWriteFits.h"
21#else
22#define DataWriteFits DataWriteRaw
23#endif
24
25namespace ba = boost::asio;
26namespace bs = boost::system;
27
28using ba::ip::tcp;
29
30using namespace std;
31
32// ========================================================================
33
34#include "EventBuilder.h"
35
36extern "C" {
37 extern void StartEvtBuild();
38 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
39}
40
41// ========================================================================
42
43class EventBuilderWrapper
44{
45public:
46 // FIXME
47 static EventBuilderWrapper *This;
48
49 MessageImp &fMsg;
50
51private:
52 boost::thread fThread;
53
54 enum CommandStates_t // g_runStat
55 {
56 kAbort = -2, // quit as soon as possible ('abort')
57 kExit = -1, // stop reading, quit when buffered events done ('exit')
58 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
59 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
60 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
61 kModeFlush = 10, // read data from camera, but skip them ('flush')
62 kModeTest = 20, // read data and process them, but do not write to disk ('test')
63 kModeFlag = 30, // read data, process and write all to disk ('flag')
64 kModeRun = 40, // read data, process and write selected to disk ('run')
65 };
66
67 enum
68 {
69 kCurrent = 0,
70 kTotal = 1,
71 kEventId = 2,
72 kTriggerId = 3,
73 };
74
75 enum FileFormat_t
76 {
77 kNone = 0,
78 kDebug,
79 kFits,
80 kRaw,
81 kCalib
82 };
83
84 FileFormat_t fFileFormat;
85
86
87 uint32_t fMaxRun;
88 uint32_t fLastOpened;
89 uint32_t fLastClosed;
90 uint32_t fNumEvts[4];
91
92 DimWriteStatistics fDimWriteStats;
93 DimDescribedService fDimRuns;
94 DimDescribedService fDimEvents;
95 DimDescribedService fDimRawData;
96 DimDescribedService fDimEventData;
97 DimDescribedService fDimFwVersion;
98 DimDescribedService fDimRunNumber;
99 DimDescribedService fDimStatus;
100 DimDescribedService fDimDNA;
101 DimDescribedService fDimTemperature;
102 DimDescribedService fDimPrescaler;
103 DimDescribedService fDimRefClock;
104 DimDescribedService fDimRoi;
105 DimDescribedService fDimDac;
106 DimDescribedService fDimDrsCalibration;
107 DimDescribedService fDimStatistics1;
108 DimDescribedService fDimStatistics2;
109
110 bool fDebugStream;
111 bool fDebugRead;
112 bool fDebugLog;
113
114 uint32_t fRunNumber;
115
116 void InitRunNumber()
117 {
118 // FIXME: Add a check that we are not too close to noon!
119 //const int night = Time().NightAsInt();
120
121 fRunNumber = 1000;
122
123 while (--fRunNumber>0)
124 {
125 const string name = DataProcessorImp::FormFileName(fRunNumber, "");
126
127 if (access((name+"bin").c_str(), F_OK) == 0)
128 break;
129 if (access((name+"fits").c_str(), F_OK) == 0)
130 break;
131 if (access((name+"drs.fits").c_str(), F_OK) == 0)
132 break;
133 }
134
135 fRunNumber++;
136
137 ostringstream str;
138 str << "Starting with run number " << fRunNumber;
139 fMsg.Message(str);
140
141 fMsg.Info(" ==> TODO: Run-number detection doesn't work when noon passes!");
142 fMsg.Info(" ==> TODO: Crosscheck with database!");
143 }
144
145public:
146 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
147 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
148 fDimWriteStats ("FAD_CONTROL", imp),
149 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
150 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
151 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
152 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
153 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
154 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
155 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
156 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
157 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
158 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
159 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
160 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
161 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
162 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
163 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
164 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
165 fDebugStream(false), fDebugRead(false), fDebugLog(false)
166 {
167 if (This)
168 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
169
170 This = this;
171
172 memset(fNumEvts, 0, sizeof(fNumEvts));
173
174 fDimEvents.Update(fNumEvts);
175
176 for (size_t i=0; i<40; i++)
177 ConnectSlot(i, tcp::endpoint());
178
179 InitRunNumber();
180 }
181 virtual ~EventBuilderWrapper()
182 {
183 Abort();
184
185 // FIXME: Used timed_join and abort afterwards
186 // What's the maximum time the eb need to abort?
187 fThread.join();
188 //ffMsg.Info("EventBuilder stopped.");
189
190 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
191 delete *it;
192 }
193
194 struct RunDescription
195 {
196 uint32_t maxtime;
197 uint32_t maxevt;
198
199 FAD::Configuration reference;
200 };
201
202 map<uint32_t, RunDescription> fExpectedRuns;
203
204 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const FAD::Configuration &ref)
205 {
206 if (maxtime<=0 || maxtime>24*60*60)
207 maxtime = 24*60*60;
208 if (maxevt<=0 || maxevt>INT32_MAX)
209 maxevt = INT32_MAX;
210
211 const RunDescription descr =
212 {
213 uint32_t(maxtime),
214 uint32_t(maxevt),
215 ref
216 };
217
218 // FIMXE: Maybe reset an event counter so that the mcp can count events?
219
220 fExpectedRuns[fRunNumber] = descr;
221 return fRunNumber++;
222 }
223
224 bool IsThreadRunning()
225 {
226 return !fThread.timed_join(boost::posix_time::microseconds(0));
227 }
228
229 void SetMaxMemory(unsigned int mb) const
230 {
231 /*
232 if (mb*1000000<GetUsedMemory())
233 {
234 // ffMsg.Warn("...");
235 return;
236 }*/
237
238 g_maxMem = size_t(mb)*1000000;
239 }
240
241 void StartThread(const vector<tcp::endpoint> &addr)
242 {
243 if (IsThreadRunning())
244 {
245 fMsg.Warn("Start - EventBuilder still running");
246 return;
247 }
248
249 fLastMessage.clear();
250
251 for (size_t i=0; i<40; i++)
252 ConnectSlot(i, addr[i]);
253
254 g_runStat = kModeRun;
255
256 fMsg.Message("Starting EventBuilder thread");
257
258 fThread = boost::thread(StartEvtBuild);
259 }
260 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
261 {
262 if (i>39)
263 return;
264
265 if (addr==tcp::endpoint())
266 {
267 DisconnectSlot(i);
268 return;
269 }
270
271 g_port[i].sockAddr.sin_family = AF_INET;
272 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
273 g_port[i].sockAddr.sin_port = htons(addr.port());
274 // In this order
275 g_port[i].sockDef = 1;
276 }
277 void DisconnectSlot(unsigned int i)
278 {
279 if (i>39)
280 return;
281
282 g_port[i].sockDef = 0;
283 // In this order
284 g_port[i].sockAddr.sin_family = AF_INET;
285 g_port[i].sockAddr.sin_addr.s_addr = 0;
286 g_port[i].sockAddr.sin_port = 0;
287 }
288 void IgnoreSlot(unsigned int i)
289 {
290 if (i>39)
291 return;
292 if (g_port[i].sockAddr.sin_port==0)
293 return;
294
295 g_port[i].sockDef = -1;
296 }
297
298
299 void Abort()
300 {
301 fMsg.Message("Signal abort to EventBuilder thread...");
302 g_runStat = kAbort;
303 }
304
305 void ResetThread(bool soft)
306 {
307 /*
308 if (g_reset > 0)
309
310 * suspend reading
311 * reset = g_reset;
312 * g_reset=0
313
314 * reset% 10
315 == 0 leave event Buffers as they are
316 == 1 let all buffers drain (write (incomplete) events)
317 > 1 flush all buffers (do not write buffered events)
318
319 * (reset/10)%10
320 > 0 close all sockets and destroy them (also free the
321 allocated read-buffers)
322 recreate before resuming operation
323 [ this is more than just close/open that can be
324 triggered by e.g. close/open the base-socket ]
325
326 * (reset/100)%10
327 > 0 close all open run-files
328
329 * (reset/1000)
330 sleep so many seconds before resuming operation
331 (does not (yet) take into account time left when waiting
332 for buffers getting empty ...)
333
334 * resume_reading
335
336 */
337 fMsg.Message("Signal reset to EventBuilder thread...");
338 g_reset = soft ? 101 : 102;
339 }
340
341 void Exit()
342 {
343 fMsg.Message("Signal exit to EventBuilder thread...");
344 g_runStat = kExit;
345 }
346
347 /*
348 void Wait()
349 {
350 fThread.join();
351 ffMsg.Message("EventBuilder stopped.");
352 }*/
353
354 void Hybernate() const { g_runStat = kHybernate; }
355 void Sleep() const { g_runStat = kSleep; }
356 void FlushMode() const { g_runStat = kModeFlush; }
357 void TestMode() const { g_runStat = kModeTest; }
358 void FlagMode() const { g_runStat = kModeFlag; }
359 void RunMode() const { g_runStat = kModeRun; }
360
361 // FIXME: To be removed
362 void SetMode(int mode) const { g_runStat = mode; }
363
364 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
365 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
366 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
367 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
368
369 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
370 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
371
372 void SetOutputFormat(FileFormat_t f)
373 {
374 fFileFormat = f;
375 if (fFileFormat==kCalib)
376 {
377 DataCalib::Restart();
378 DataCalib::Update(fDimDrsCalibration);
379 fMsg.Message("Resetted DRS calibration.");
380 }
381 }
382
383 void SetDebugLog(bool b) { fDebugLog = b; }
384
385 void SetDebugStream(bool b)
386 {
387 fDebugStream = b;
388 if (b)
389 return;
390
391 for (int i=0; i<40; i++)
392 {
393 if (!fDumpStream[i].is_open())
394 continue;
395
396 fDumpStream[i].close();
397
398 ostringstream name;
399 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
400 fMsg.Message("Closed file '"+name.str()+"'");
401 }
402 }
403
404 void SetDebugRead(bool b)
405 {
406 fDebugRead = b;
407 if (b || !fDumpRead.is_open())
408 return;
409
410 fDumpRead.close();
411 fMsg.Message("Closed file 'socket_events.txt'");
412 }
413
414// size_t GetUsedMemory() const { return gi_usedMem; }
415
416 void LoadDrsCalibration(const char *fname)
417 {
418 if (!DataCalib::ReadFits(fname, fMsg))
419 return;
420 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
421 DataCalib::Update(fDimDrsCalibration);
422 }
423
424 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
425
426
427 /*
428 struct OpenFileToDim
429 {
430 int code;
431 char fileName[FILENAME_MAX];
432 };
433
434 SignalRunOpened(runid, filename);
435 // Send num open files
436 // Send runid, (more info about the run?), filename via dim
437
438 SignalEvtWritten(runid);
439 // Send num events written of newest file
440
441 SignalRunClose(runid);
442 // Send new num open files
443 // Send empty file-name if no file is open
444
445 */
446
447 // -------------- Mapped event builder callbacks ------------------
448
449 void UpdateRuns(const string &fname="")
450 {
451 uint32_t values[5] =
452 {
453 static_cast<uint32_t>(fFiles.size()),
454 0xffffffff,
455 0,
456 fLastOpened,
457 fLastClosed
458 };
459
460 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
461 it!=fFiles.end(); it++)
462 {
463 const DataProcessorImp *file = *it;
464
465 if (file->GetRunId()<values[1])
466 values[1] = file->GetRunId();
467
468 if (file->GetRunId()>values[2])
469 values[2] = file->GetRunId();
470 }
471
472 fMaxRun = values[2];
473
474 vector<char> data(sizeof(values)+fname.size()+1);
475 memcpy(data.data(), values, sizeof(values));
476 strcpy(data.data()+sizeof(values), fname.c_str());
477
478 fDimRuns.Update(data);
479 }
480
481 vector<DataProcessorImp*> fFiles;
482
483 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
484 {
485 fMsg.Info(" ==> TODO: Update run configuration in database!");
486
487 // Check if file already exists...
488 DataProcessorImp *file = 0;
489 switch (fFileFormat)
490 {
491 case kNone: file = new DataDump(runid, fMsg); break;
492 case kDebug: file = new DataDebug(runid, fMsg); break;
493 case kFits: file = new DataWriteFits(runid, fMsg); break;
494 case kRaw: file = new DataWriteRaw(runid, fMsg); break;
495 case kCalib: file = new DataCalib(runid, fDimDrsCalibration, fMsg); break;
496 }
497
498 try
499 {
500 if (!file->Open(h))
501 return 0;
502 }
503 catch (const exception &e)
504 {
505 fMsg.Error("Exception trying to open file: "+string(e.what()));
506 return 0;
507 }
508
509 fFiles.push_back(file);
510
511 ostringstream str;
512 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
513 fMsg.Info(str);
514
515 fDimWriteStats.FileOpened(file->GetFileName());
516
517 fLastOpened = runid;
518 UpdateRuns(file->GetFileName());
519
520 fNumEvts[kEventId] = 0;
521 fNumEvts[kTriggerId] = 0;
522
523 fNumEvts[kCurrent] = 0;
524 fDimEvents.Update(fNumEvts);
525 // fDimCurrentEvent.Update(uint32_t(0));
526
527 return reinterpret_cast<FileHandle_t>(file);
528 }
529
530 int runWrite(FileHandle_t handler, EVENT *e, size_t)
531 {
532 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
533
534 if (!file->WriteEvt(e))
535 return -1;
536
537 if (file->GetRunId()==fMaxRun)
538 {
539 fNumEvts[kCurrent]++;
540 fNumEvts[kEventId] = e->EventNum;
541 fNumEvts[kTriggerId] = e->TriggerNum;
542 }
543
544 fNumEvts[kTotal]++;
545
546 static Time oldt(boost::date_time::neg_infin);
547 Time newt;
548 if (newt>oldt+boost::posix_time::seconds(1))
549 {
550 fDimEvents.Update(fNumEvts);
551 oldt = newt;
552 }
553
554
555 // ===> SignalEvtWritten(runid);
556 // Send num events written of newest file
557
558 /* close run runId (all all runs if runId=0) */
559 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
560 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
561
562 return 0;
563 }
564
565 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
566 {
567 fMsg.Info(" ==> TODO: Update run configuration in database!");
568
569 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
570
571 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
572 if (it==fFiles.end())
573 {
574 ostringstream str;
575 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
576 fMsg.Fatal(str);
577 return -1;
578 }
579
580 fFiles.erase(it);
581
582 fLastClosed = file->GetRunId();
583 UpdateRuns();
584
585 fDimEvents.Update(fNumEvts);
586
587 const bool rc = file->Close(tail);
588 if (!rc)
589 {
590 // Error message
591 }
592
593 ostringstream str;
594 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
595 fMsg.Info(str);
596
597 delete file;
598
599 // ==> SignalRunClose(runid);
600 // Send new num open files
601 // Send empty file-name if no file is open
602
603 return rc ? 0 : -1;
604 }
605
606 ofstream fDumpStream[40];
607
608 void debugStream(int isock, void *buf, int len)
609 {
610 if (!fDebugStream)
611 return;
612
613 const int slot = isock/7;
614 if (slot<0 || slot>39)
615 return;
616
617 if (!fDumpStream[slot].is_open())
618 {
619 ostringstream name;
620 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
621
622 fDumpStream[slot].open(name.str().c_str(), ios::app);
623 if (!fDumpStream[slot])
624 {
625 ostringstream str;
626 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
627 fMsg.Error(str);
628
629 return;
630 }
631
632 fMsg.Message("Opened file '"+name.str()+"' for writing.");
633 }
634
635 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
636 }
637
638 ofstream fDumpRead; // Stream to possibly dump docket events
639
640 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
641 {
642 // isock = socketID (0-279)
643 // ibyte = #bytes gelesen
644 // event = eventId (oder 0 wenn noch nicht bekannt)
645 // state : 1=finished reading data
646 // 0=reading data
647 // -1=start reading data (header)
648 // -2=start reading data,
649 // eventId not known yet (too little data)
650 // tsec, tusec = time when reading seconds, microseconds
651 //
652 if (!fDebugRead || ibyte==0)
653 return;
654
655 if (!fDumpRead.is_open())
656 {
657 fDumpRead.open("socket_events.txt", ios::app);
658 if (!fDumpRead)
659 {
660 ostringstream str;
661 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
662 fMsg.Error(str);
663
664 return;
665 }
666
667 fMsg.Message("Opened file 'socket_events.txt' for writing.");
668
669 fDumpRead << "# START: " << Time().GetAsStr() << endl;
670 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
671 }
672
673 fDumpRead
674 << setw(2) << state << " "
675 << setw(8) << tsec << " "
676 << setw(9) << tusec << " "
677 << setw(3) << isock << " "
678 << setw(2) << isock/7 << " "
679 << runno << " "
680 << event << " "
681 << ftmevt << " "
682 << ibyte << endl;
683 }
684
685 array<uint16_t,2> fVecRoi;
686
687 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
688 {
689 /*
690 fadhd[i] ist ein array mit den 40 fad-headers
691 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
692
693 event ist die Struktur, die auch die write routine erhaelt;
694 darin sind im header die 'soll-werte' fuer z.B. eventID
695 als auch die ADC-Werte (falls Du die brauchst)
696
697 Wenn die routine einen negativen Wert liefert, wird das event
698 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
699 */
700
701 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
702
703 if (roi!=fVecRoi)
704 {
705 Update(fDimRoi, roi);
706 fVecRoi = roi;
707 }
708
709 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
710 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
711
712 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
713 {
714 // FIXME: Compare with expectations!!!
715 if (ptr->fStartDelimiter==0)
716 {
717 if (ptr==beg)
718 beg++;
719 continue;
720 }
721
722 if (beg->fStatus != ptr->fStatus)
723 {
724 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
725 CloseRunFile(runNr, 0, 0);
726 return -1;
727 }
728
729 if (beg->fRunNumber != ptr->fRunNumber)
730 {
731 fMsg.Error("Inconsistent run number detected.... closing run.");
732 CloseRunFile(runNr, 0, 0);
733 return -1;
734 }
735
736 /*
737 if (beg->fVersion != ptr->fVersion)
738 {
739 Error("Inconsist firmware version detected.... closing run.");
740 CloseRunFile(runNr, 0, 0);
741 break;
742 }
743 if (beg->fEventCounter != ptr->fEventCounter)
744 {
745 Error("Inconsist run number detected.... closing run.");
746 CloseRunFile(runNr, 0, 0);
747 break;
748 }
749 if (beg->fTriggerCounter != ptr->fTriggerCounter)
750 {
751 Error("Inconsist trigger number detected.... closing run.");
752 CloseRunFile(runNr, 0, 0);
753 break;
754 }*/
755
756 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
757 {
758 fMsg.Error("Inconsistent phase shift detected.... closing run.");
759 CloseRunFile(runNr, 0, 0);
760 return -1;
761 }
762
763 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
764 {
765 fMsg.Error("Inconsistent DAC values detected.... closing run.");
766 CloseRunFile(runNr, 0, 0);
767 return -1;
768 }
769
770 if (beg->fTriggerType != ptr->fTriggerType)
771 {
772 fMsg.Error("Inconsistent trigger type detected.... closing run.");
773 CloseRunFile(runNr, 0, 0);
774 return -1;
775 }
776 }
777
778 // check REFCLK_frequency
779 // check consistency with command configuration
780 // how to log errors?
781 // need gotNewRun/closedRun to know it is finished
782
783 static Time oldt(boost::date_time::neg_infin);
784 Time newt;
785
786 // FIXME: Only send events if the have newer run-numbers
787 if (newt<oldt+boost::posix_time::seconds(1))
788 return 0;
789
790 oldt = newt;
791
792 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
793
794 vector<char> data(sz+event->Roi*2*1440);
795 memcpy(data.data(), event, sizeof(EVENT));
796
797 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
798
799 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
800 fDimRawData.Update(data);
801
802 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
803 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
804
805 fDimEventData.Update(data2);
806
807
808
809
810 return 0;
811 }
812
813 bool IsRunStarted() const
814 {
815 return fExpectedRuns.find(fRunNumber-1)==fExpectedRuns.end();
816 }
817
818 uint32_t GetRunNumber() const
819 {
820 return fRunNumber;
821 }
822
823 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
824 {
825 // This function is called even when writing is switched off
826 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
827 if (it==fExpectedRuns.end())
828 {
829 ostringstream str;
830 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
831 return;
832 }
833
834 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
835 // return: 0=close scheduled / >0 already closed / <0 does not exist
836
837 fExpectedRuns.erase(it);
838 }
839
840 map<boost::thread::id, string> fLastMessage;
841
842 void factOut(int severity, int err, const char *message)
843 {
844 if (!fDebugLog && severity==99)
845 return;
846
847 ostringstream str;
848 //str << boost::this_thread::get_id() << " ";
849 str << "EventBuilder(";
850 if (err<0)
851 str << "---";
852 else
853 str << err;
854 str << "): " << message;
855
856 string &old = fLastMessage[boost::this_thread::get_id()];
857
858 if (str.str()==old)
859 return;
860 old = str.str();
861
862 fMsg.Update(str, severity);
863 }
864/*
865 void factStat(int64_t *stat, int len)
866 {
867 if (len!=7)
868 {
869 fMsg.Warn("factStat received unknown number of values.");
870 return;
871 }
872
873 vector<int64_t> data(1, g_maxMem);
874 data.insert(data.end(), stat, stat+len);
875
876 static vector<int64_t> last(8);
877 if (data==last)
878 return;
879 last = data;
880
881 fDimStatistics.Update(data);
882
883 // len ist die Laenge des arrays.
884 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
885 // kannst Du pruefen, ob die 100MB voll sind ....
886
887 ostringstream str;
888 str
889 << "Wait=" << stat[0] << " "
890 << "Skip=" << stat[1] << " "
891 << "Del=" << stat[2] << " "
892 << "Tot=" << stat[3] << " "
893 << "Mem=" << stat[4] << "/" << g_maxMem << " "
894 << "Read=" << stat[5] << " "
895 << "Conn=" << stat[6];
896
897 fMsg.Info(str);
898 }
899 */
900
901 void factStat(const EVT_STAT &stat)
902 {
903 fDimStatistics2.Update(stat);
904 /*
905 //some info about what happened since start of program (or last 'reset')
906 uint32_t reset ; //#if increased, reset all counters
907 uint32_t numRead[MAX_SOCK] ; //how often succesfull read from N sockets per loop
908
909 uint64_t gotByte[NBOARDS] ; //#Bytes read per Board
910 uint32_t gotErr[NBOARDS] ; //#Communication Errors per Board
911 uint32_t evtGet; //#new Start of Events read
912 uint32_t evtTot; //#complete Events read
913 uint32_t evtErr; //#Events with Errors
914 uint32_t evtSkp; //#Events incomplete (timeout)
915
916 uint32_t procTot; //#Events processed
917 uint32_t procErr; //#Events showed problem in processing
918 uint32_t procTrg; //#Events accepted by SW trigger
919 uint32_t procSkp; //#Events rejected by SW trigger
920
921 uint32_t feedTot; //#Events used for feedBack system
922 uint32_t feedErr; //#Events rejected by feedBack
923
924 uint32_t wrtTot; //#Events written to disk
925 uint32_t wrtErr; //#Events with write-error
926
927 uint32_t runOpen; //#Runs opened
928 uint32_t runClose; //#Runs closed
929 uint32_t runErr; //#Runs with open/close errors
930
931
932 //info about current connection status
933 uint8_t numConn[NBOARDS] ; //#Sockets succesfully open per board
934 */
935 }
936
937 void factStat(const GUI_STAT &stat)
938 {
939 fDimStatistics1.Update(stat);
940 /*
941 //info about status of the main threads
942 int32_t readStat ; //read thread
943 int32_t procStat ; //processing thread(s)
944 int32_t writStat ; //write thread
945
946 //info about some rates
947 int32_t deltaT ; //time in milli-seconds for rates
948 int32_t readEvt ; //#events read
949 int32_t procEvt ; //#events processed
950 int32_t writEvt ; //#events written
951 int32_t skipEvt ; //#events skipped
952
953 //some info about current state of event buffer (snapspot)
954 int32_t evtBuf; //#Events currently waiting in Buffer
955 uint64_t totMem; //#Bytes available in Buffer
956 uint64_t usdMem; //#Bytes currently used
957 uint64_t maxMem; //max #Bytes used during past Second
958 */
959 }
960
961
962 array<FAD::EventHeader, 40> fVecHeader;
963
964 template<typename T, class S>
965 array<T, 42> Compare(const S *vec, const T *t)
966 {
967 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
968
969 const T *min = NULL;
970 const T *val = NULL;
971 const T *max = NULL;
972
973 array<T, 42> arr;
974
975 bool rc = true;
976 for (int i=0; i<40; i++)
977 {
978 const char *base = reinterpret_cast<const char*>(vec+i);
979 const T *ref = reinterpret_cast<const T*>(base+offset);
980
981 arr[i] = *ref;
982
983 if (gi_NumConnect[i]!=7)
984 {
985 arr[i] = 0;
986 continue;
987 }
988
989 if (!val)
990 {
991 min = ref;
992 val = ref;
993 max = ref;
994 }
995
996 if (*ref<*min)
997 min = ref;
998
999 if (*ref>*max)
1000 max = ref;
1001
1002 if (*val!=*ref)
1003 rc = false;
1004 }
1005
1006 arr[40] = val ? *min : 1;
1007 arr[41] = val ? *max : 0;
1008
1009 return arr;
1010 }
1011
1012 template<typename T>
1013 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1014 {
1015 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1016
1017 T val = 0;
1018 T rc = 0;
1019
1020 array<T, 42> vec;
1021
1022 bool first = true;
1023
1024 for (int i=0; i<40; i++)
1025 {
1026 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1027 const T *ref = reinterpret_cast<const T*>(base+offset);
1028
1029 vec[i+2] = *ref;
1030
1031 if (gi_NumConnect[i]!=7)
1032 {
1033 vec[i+2] = 0;
1034 continue;
1035 }
1036
1037 if (first)
1038 {
1039 first = false;
1040 val = *ref;
1041 rc = 0;
1042 }
1043
1044 rc |= val^*ref;
1045 }
1046
1047 vec[0] = rc;
1048 vec[1] = val;
1049
1050 return vec;
1051 }
1052
1053 template<typename T, size_t N>
1054 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1055 {
1056// svc.setQuality(vec[40]<=vec[41]);
1057 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1058 svc.updateService();
1059 }
1060
1061 template<typename T>
1062 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1063 {
1064 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1065 for (int i=0; i<40;i++)
1066 cout << " " << data.second[i+3];
1067 cout << endl;
1068 }
1069
1070 vector<uint> fNumConnected;
1071
1072 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1073 {
1074 const uint16_t id = h.Id();
1075 if (id>39)
1076 return;
1077
1078 if (fNumConnected.size()!=40)
1079 fNumConnected.resize(40);
1080
1081 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1082
1083 const bool changed = con!=fNumConnected || !IsThreadRunning();
1084
1085 fNumConnected = con;
1086
1087 const FAD::EventHeader old = fVecHeader[id];
1088 fVecHeader[id] = h;
1089
1090 if (old.fVersion != h.fVersion || changed)
1091 {
1092 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1093
1094 array<float,42> data;
1095 for (int i=0; i<42; i++)
1096 {
1097 ostringstream str;
1098 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1099 data[i] = stof(str.str());
1100 }
1101 Update(fDimFwVersion, data);
1102 }
1103
1104 if (old.fRunNumber != h.fRunNumber || changed)
1105 {
1106 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1107 fDimRunNumber.Update(run);
1108 }
1109
1110 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1111 {
1112 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1113 fDimPrescaler.Update(pre);
1114 }
1115
1116 if (old.fDNA != h.fDNA || changed)
1117 {
1118 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1119 Update(fDimDNA, dna, 40);
1120 }
1121
1122 if (old.fStatus != h.fStatus || changed)
1123 {
1124 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1125 Update(fDimStatus, sts);
1126 }
1127
1128 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1129 {
1130 array<uint16_t, FAD::kNumDac*42> dacs;
1131
1132 for (int i=0; i<FAD::kNumDac; i++)
1133 {
1134 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1135 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1136 }
1137
1138 Update(fDimDac, dacs);
1139 }
1140
1141 // -----------
1142
1143 static Time oldt(boost::date_time::neg_infin);
1144 Time newt;
1145
1146 if (newt>oldt+boost::posix_time::seconds(1))
1147 {
1148 oldt = newt;
1149
1150 // --- RefClock
1151
1152 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1153 Update(fDimRefClock, clk);
1154
1155 // --- Temperatures
1156
1157 const array<int16_t,42> tmp[4] =
1158 {
1159 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1160 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1161 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1162 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1163 };
1164
1165 vector<int16_t> data;
1166 data.reserve(82);
1167 data.push_back(tmp[0][40]); // min: 0
1168 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1169 data.push_back(tmp[0][41]); // max: 41
1170 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1171
1172 for (int j=1; j<=3; j++)
1173 {
1174 const array<int16_t,42> &ref = tmp[j];
1175
1176 // Gloabl min
1177 if (ref[40]<data[0]) // 40=min
1178 data[0] = ref[40];
1179
1180 // Global max
1181 if (ref[41]>data[41]) // 41=max
1182 data[41] = ref[41];
1183
1184 for (int i=0; i<40; i++)
1185 {
1186 // min per board
1187 if (ref[i]<data[i+1]) // data: 1-40
1188 data[i+1] = ref[i]; // ref: 0-39
1189
1190 // max per board
1191 if (ref[i]>data[i+42]) // data: 42-81
1192 data[i+42] = ref[i]; // ref: 0-39
1193 }
1194
1195
1196 }
1197
1198 vector<float> deg(82); // 0: global min, 1-40: min
1199 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1200 deg[i] = data[i]/16.;
1201 fDimTemperature.Update(deg);
1202 }
1203
1204 /*
1205 uint16_t fTriggerType;
1206 uint32_t fTriggerId;
1207 uint32_t fEventCounter;
1208 uint16_t fAdcClockPhaseShift;
1209 uint16_t fNumTriggersToGenerate;
1210 uint16_t fTriggerGeneratorPrescaler;
1211 uint32_t fTimeStamp;
1212 int16_t fTempDrs[kNumTemp]; // In units of 1/16 deg(?)
1213 uint16_t fDac[kNumDac];
1214 */
1215 }
1216};
1217
1218EventBuilderWrapper *EventBuilderWrapper::This = 0;
1219
1220// ----------- Event builder callbacks implementation ---------------
1221extern "C"
1222{
1223 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1224 {
1225 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1226 }
1227
1228 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1229 {
1230 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1231 }
1232
1233 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1234 {
1235 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1236 }
1237
1238 void factOut(int severity, int err, const char *message)
1239 {
1240 EventBuilderWrapper::This->factOut(severity, err, message);
1241 }
1242
1243 void factStat(GUI_STAT stat)
1244 {
1245 EventBuilderWrapper::This->factStat(stat);
1246 }
1247
1248 void factStatNew(EVT_STAT stat)
1249 {
1250 EventBuilderWrapper::This->factStat(stat);
1251 }
1252
1253 void debugHead(int socket, int/*board*/, void *buf)
1254 {
1255 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1256
1257 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1258 }
1259
1260 void debugStream(int isock, void *buf, int len)
1261 {
1262 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1263 }
1264
1265 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1266 {
1267 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1268 }
1269
1270 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1271 {
1272 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1273 }
1274
1275 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1276 {
1277 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1278 }
1279
1280 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1281 {
1282 return 100;
1283 }
1284
1285}
1286
1287#endif
Note: See TracBrowser for help on using the repository browser.