source: trunk/FACT++/src/EventBuilderWrapper.h@ 11822

Last change on this file since 11822 was 11819, checked in by tbretz, 14 years ago
Added code for eventCheck; added command to load a DRS calibration file.
File size: 37.0 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/date_time/posix_time/posix_time_types.hpp>
13
14#include "DimWriteStatistics.h"
15
16#include "DataCalib.h"
17#include "DataWriteRaw.h"
18
19#ifdef HAVE_FITS
20#include "DataWriteFits.h"
21#else
22#define DataWriteFits DataWriteRaw
23#endif
24
25namespace ba = boost::asio;
26namespace bs = boost::system;
27
28using ba::ip::tcp;
29
30using namespace std;
31
32// ========================================================================
33
34#include "EventBuilder.h"
35
36extern "C" {
37 extern void StartEvtBuild();
38 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
39}
40
41// ========================================================================
42
43class EventBuilderWrapper
44{
45public:
46 // FIXME
47 static EventBuilderWrapper *This;
48
49 MessageImp &fMsg;
50
51private:
52 boost::thread fThread;
53
54 enum CommandStates_t // g_runStat
55 {
56 kAbort = -2, // quit as soon as possible ('abort')
57 kExit = -1, // stop reading, quit when buffered events done ('exit')
58 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
59 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
60 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
61 kModeFlush = 10, // read data from camera, but skip them ('flush')
62 kModeTest = 20, // read data and process them, but do not write to disk ('test')
63 kModeFlag = 30, // read data, process and write all to disk ('flag')
64 kModeRun = 40, // read data, process and write selected to disk ('run')
65 };
66
67 enum
68 {
69 kCurrent = 0,
70 kTotal = 1,
71 kEventId = 2,
72 kTriggerId = 3,
73 };
74
75 enum FileFormat_t
76 {
77 kNone = 0,
78 kDebug,
79 kFits,
80 kRaw,
81 kCalib
82 };
83
84 FileFormat_t fFileFormat;
85
86
87 uint32_t fMaxRun;
88 uint32_t fLastOpened;
89 uint32_t fLastClosed;
90 uint32_t fNumEvts[4];
91
92 DimWriteStatistics fDimWriteStats;
93 DimDescribedService fDimRuns;
94 DimDescribedService fDimEvents;
95 DimDescribedService fDimRawData;
96 DimDescribedService fDimEventData;
97 DimDescribedService fDimFwVersion;
98 DimDescribedService fDimRunNumber;
99 DimDescribedService fDimStatus;
100 DimDescribedService fDimDNA;
101 DimDescribedService fDimTemperature;
102 DimDescribedService fDimPrescaler;
103 DimDescribedService fDimRefClock;
104 DimDescribedService fDimRoi;
105 DimDescribedService fDimDac;
106 DimDescribedService fDimDrsCalibration;
107 DimDescribedService fDimStatistics1;
108 DimDescribedService fDimStatistics2;
109
110 bool fDebugStream;
111 bool fDebugRead;
112 bool fDebugLog;
113
114 uint32_t fRunNumber;
115
116 void InitRunNumber()
117 {
118 // FIXME: Add a check that we are not too close to noon!
119 //const int night = Time().NightAsInt();
120
121 fRunNumber = 1000;
122
123 while (--fRunNumber>0)
124 {
125 const string name = DataProcessorImp::FormFileName(fRunNumber, "");
126
127 if (access((name+"bin").c_str(), F_OK) == 0)
128 break;
129 if (access((name+"fits").c_str(), F_OK) == 0)
130 break;
131 if (access((name+"drs.fits").c_str(), F_OK) == 0)
132 break;
133 }
134
135 fRunNumber++;
136
137 ostringstream str;
138 str << "Starting with run number " << fRunNumber;
139 fMsg.Message(str);
140
141 fMsg.Info(" ==> TODO: Run-number detection doesn't work when noon passes!");
142 fMsg.Info(" ==> TODO: Crosscheck with database!");
143 }
144
145public:
146 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
147 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
148 fDimWriteStats ("FAD_CONTROL", imp),
149 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
150 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
151 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
152 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
153 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
154 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
155 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
156 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
157 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
158 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
159 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
160 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
161 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
162 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
163 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
164 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
165 fDebugStream(false), fDebugRead(false), fDebugLog(false)
166 {
167 if (This)
168 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
169
170 This = this;
171
172 memset(fNumEvts, 0, sizeof(fNumEvts));
173
174 fDimEvents.Update(fNumEvts);
175
176 for (size_t i=0; i<40; i++)
177 ConnectSlot(i, tcp::endpoint());
178
179 InitRunNumber();
180 }
181 virtual ~EventBuilderWrapper()
182 {
183 Abort();
184
185 // FIXME: Used timed_join and abort afterwards
186 // What's the maximum time the eb need to abort?
187 fThread.join();
188 //ffMsg.Info("EventBuilder stopped.");
189
190 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
191 delete *it;
192 }
193
194 struct RunDescription
195 {
196 uint32_t maxtime;
197 uint32_t maxevt;
198
199 FAD::Configuration reference;
200 };
201
202 map<uint32_t, RunDescription> fExpectedRuns;
203
204 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const FAD::Configuration &ref)
205 {
206 if (maxtime<=0 || maxtime>24*60*60)
207 maxtime = 24*60*60;
208 if (maxevt<=0 || maxevt>INT32_MAX)
209 maxevt = INT32_MAX;
210
211 const RunDescription descr =
212 {
213 uint32_t(maxtime),
214 uint32_t(maxevt),
215 ref
216 };
217
218 // FIMXE: Maybe reset an event counter so that the mcp can count events?
219
220 fExpectedRuns[fRunNumber] = descr;
221 return fRunNumber++;
222 }
223
224 bool IsThreadRunning()
225 {
226 return !fThread.timed_join(boost::posix_time::microseconds(0));
227 }
228
229 void SetMaxMemory(unsigned int mb) const
230 {
231 /*
232 if (mb*1000000<GetUsedMemory())
233 {
234 // ffMsg.Warn("...");
235 return;
236 }*/
237
238 g_maxMem = size_t(mb)*1000000;
239 }
240
241 void StartThread(const vector<tcp::endpoint> &addr)
242 {
243 if (IsThreadRunning())
244 {
245 fMsg.Warn("Start - EventBuilder still running");
246 return;
247 }
248
249 fLastMessage.clear();
250
251 for (size_t i=0; i<40; i++)
252 ConnectSlot(i, addr[i]);
253
254 g_runStat = kModeRun;
255
256 fMsg.Message("Starting EventBuilder thread");
257
258 fThread = boost::thread(StartEvtBuild);
259 }
260 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
261 {
262 if (i>39)
263 return;
264
265 if (addr==tcp::endpoint())
266 {
267 DisconnectSlot(i);
268 return;
269 }
270
271 g_port[i].sockAddr.sin_family = AF_INET;
272 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
273 g_port[i].sockAddr.sin_port = htons(addr.port());
274 // In this order
275 g_port[i].sockDef = 1;
276 }
277 void DisconnectSlot(unsigned int i)
278 {
279 if (i>39)
280 return;
281
282 g_port[i].sockDef = 0;
283 // In this order
284 g_port[i].sockAddr.sin_family = AF_INET;
285 g_port[i].sockAddr.sin_addr.s_addr = 0;
286 g_port[i].sockAddr.sin_port = 0;
287 }
288 void IgnoreSlot(unsigned int i)
289 {
290 if (i>39)
291 return;
292 if (g_port[i].sockAddr.sin_port==0)
293 return;
294
295 g_port[i].sockDef = -1;
296 }
297
298
299 void Abort()
300 {
301 fMsg.Message("Signal abort to EventBuilder thread...");
302 g_runStat = kAbort;
303 }
304
305 void ResetThread(bool soft)
306 {
307 /*
308 if (g_reset > 0)
309
310 * suspend reading
311 * reset = g_reset;
312 * g_reset=0
313
314 * reset% 10
315 == 0 leave event Buffers as they are
316 == 1 let all buffers drain (write (incomplete) events)
317 > 1 flush all buffers (do not write buffered events)
318
319 * (reset/10)%10
320 > 0 close all sockets and destroy them (also free the
321 allocated read-buffers)
322 recreate before resuming operation
323 [ this is more than just close/open that can be
324 triggered by e.g. close/open the base-socket ]
325
326 * (reset/100)%10
327 > 0 close all open run-files
328
329 * (reset/1000)
330 sleep so many seconds before resuming operation
331 (does not (yet) take into account time left when waiting
332 for buffers getting empty ...)
333
334 * resume_reading
335
336 */
337 fMsg.Message("Signal reset to EventBuilder thread...");
338 g_reset = soft ? 101 : 102;
339 }
340
341 void Exit()
342 {
343 fMsg.Message("Signal exit to EventBuilder thread...");
344 g_runStat = kExit;
345 }
346
347 /*
348 void Wait()
349 {
350 fThread.join();
351 ffMsg.Message("EventBuilder stopped.");
352 }*/
353
354 void Hybernate() const { g_runStat = kHybernate; }
355 void Sleep() const { g_runStat = kSleep; }
356 void FlushMode() const { g_runStat = kModeFlush; }
357 void TestMode() const { g_runStat = kModeTest; }
358 void FlagMode() const { g_runStat = kModeFlag; }
359 void RunMode() const { g_runStat = kModeRun; }
360
361 // FIXME: To be removed
362 void SetMode(int mode) const { g_runStat = mode; }
363
364 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
365 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
366 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
367 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
368
369 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
370 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
371
372 void SetOutputFormat(FileFormat_t f)
373 {
374 fFileFormat = f;
375 if (fFileFormat==kCalib)
376 {
377 DataCalib::Restart();
378 DataCalib::Update(fDimDrsCalibration);
379 fMsg.Message("Resetted DRS calibration.");
380 }
381 }
382
383 void SetDebugLog(bool b) { fDebugLog = b; }
384
385 void SetDebugStream(bool b)
386 {
387 fDebugStream = b;
388 if (b)
389 return;
390
391 for (int i=0; i<40; i++)
392 {
393 if (!fDumpStream[i].is_open())
394 continue;
395
396 fDumpStream[i].close();
397
398 ostringstream name;
399 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
400 fMsg.Message("Closed file '"+name.str()+"'");
401 }
402 }
403
404 void SetDebugRead(bool b)
405 {
406 fDebugRead = b;
407 if (b || !fDumpRead.is_open())
408 return;
409
410 fDumpRead.close();
411 fMsg.Message("Closed file 'socket_events.txt'");
412 }
413
414// size_t GetUsedMemory() const { return gi_usedMem; }
415
416 void LoadDrsCalibration(const char *fname)
417 {
418 DataCalib::ReadFits(fname, fMsg);
419 }
420
421 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
422
423
424 /*
425 struct OpenFileToDim
426 {
427 int code;
428 char fileName[FILENAME_MAX];
429 };
430
431 SignalRunOpened(runid, filename);
432 // Send num open files
433 // Send runid, (more info about the run?), filename via dim
434
435 SignalEvtWritten(runid);
436 // Send num events written of newest file
437
438 SignalRunClose(runid);
439 // Send new num open files
440 // Send empty file-name if no file is open
441
442 */
443
444 // -------------- Mapped event builder callbacks ------------------
445
446 void UpdateRuns(const string &fname="")
447 {
448 uint32_t values[5] =
449 {
450 static_cast<uint32_t>(fFiles.size()),
451 0xffffffff,
452 0,
453 fLastOpened,
454 fLastClosed
455 };
456
457 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
458 it!=fFiles.end(); it++)
459 {
460 const DataProcessorImp *file = *it;
461
462 if (file->GetRunId()<values[1])
463 values[1] = file->GetRunId();
464
465 if (file->GetRunId()>values[2])
466 values[2] = file->GetRunId();
467 }
468
469 fMaxRun = values[2];
470
471 vector<char> data(sizeof(values)+fname.size()+1);
472 memcpy(data.data(), values, sizeof(values));
473 strcpy(data.data()+sizeof(values), fname.c_str());
474
475 fDimRuns.Update(data);
476 }
477
478 vector<DataProcessorImp*> fFiles;
479
480 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
481 {
482 fMsg.Info(" ==> TODO: Update run configuration in database!");
483
484 // Check if file already exists...
485 DataProcessorImp *file = 0;
486 switch (fFileFormat)
487 {
488 case kNone: file = new DataDump(runid, fMsg); break;
489 case kDebug: file = new DataDebug(runid, fMsg); break;
490 case kFits: file = new DataWriteFits(runid, fMsg); break;
491 case kRaw: file = new DataWriteRaw(runid, fMsg); break;
492 case kCalib: file = new DataCalib(runid, fDimDrsCalibration, fMsg); break;
493 }
494
495 try
496 {
497 if (!file->Open(h))
498 return 0;
499 }
500 catch (const exception &e)
501 {
502 fMsg.Error("Exception trying to open file: "+string(e.what()));
503 return 0;
504 }
505
506 fFiles.push_back(file);
507
508 ostringstream str;
509 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
510 fMsg.Info(str);
511
512 fDimWriteStats.FileOpened(file->GetFileName());
513
514 fLastOpened = runid;
515 UpdateRuns(file->GetFileName());
516
517 fNumEvts[kEventId] = 0;
518 fNumEvts[kTriggerId] = 0;
519
520 fNumEvts[kCurrent] = 0;
521 fDimEvents.Update(fNumEvts);
522 // fDimCurrentEvent.Update(uint32_t(0));
523
524 return reinterpret_cast<FileHandle_t>(file);
525 }
526
527 int runWrite(FileHandle_t handler, EVENT *e, size_t)
528 {
529 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
530
531 if (!file->WriteEvt(e))
532 return -1;
533
534 if (file->GetRunId()==fMaxRun)
535 {
536 fNumEvts[kCurrent]++;
537 fNumEvts[kEventId] = e->EventNum;
538 fNumEvts[kTriggerId] = e->TriggerNum;
539 }
540
541 fNumEvts[kTotal]++;
542
543 static Time oldt(boost::date_time::neg_infin);
544 Time newt;
545 if (newt>oldt+boost::posix_time::seconds(1))
546 {
547 fDimEvents.Update(fNumEvts);
548 oldt = newt;
549 }
550
551
552 // ===> SignalEvtWritten(runid);
553 // Send num events written of newest file
554
555 /* close run runId (all all runs if runId=0) */
556 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
557 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
558
559 return 0;
560 }
561
562 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
563 {
564 fMsg.Info(" ==> TODO: Update run configuration in database!");
565
566 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
567
568 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
569 if (it==fFiles.end())
570 {
571 ostringstream str;
572 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
573 fMsg.Fatal(str);
574 return -1;
575 }
576
577 fFiles.erase(it);
578
579 fLastClosed = file->GetRunId();
580 UpdateRuns();
581
582 fDimEvents.Update(fNumEvts);
583
584 const bool rc = file->Close(tail);
585 if (!rc)
586 {
587 // Error message
588 }
589
590 ostringstream str;
591 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
592 fMsg.Info(str);
593
594 delete file;
595
596 // ==> SignalRunClose(runid);
597 // Send new num open files
598 // Send empty file-name if no file is open
599
600 return rc ? 0 : -1;
601 }
602
603 ofstream fDumpStream[40];
604
605 void debugStream(int isock, void *buf, int len)
606 {
607 if (!fDebugStream)
608 return;
609
610 const int slot = isock/7;
611 if (slot<0 || slot>39)
612 return;
613
614 if (!fDumpStream[slot].is_open())
615 {
616 ostringstream name;
617 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
618
619 fDumpStream[slot].open(name.str().c_str(), ios::app);
620 if (!fDumpStream[slot])
621 {
622 ostringstream str;
623 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
624 fMsg.Error(str);
625
626 return;
627 }
628
629 fMsg.Message("Opened file '"+name.str()+"' for writing.");
630 }
631
632 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
633 }
634
635 ofstream fDumpRead; // Stream to possibly dump docket events
636
637 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
638 {
639 // isock = socketID (0-279)
640 // ibyte = #bytes gelesen
641 // event = eventId (oder 0 wenn noch nicht bekannt)
642 // state : 1=finished reading data
643 // 0=reading data
644 // -1=start reading data (header)
645 // -2=start reading data,
646 // eventId not known yet (too little data)
647 // tsec, tusec = time when reading seconds, microseconds
648 //
649 if (!fDebugRead || ibyte==0)
650 return;
651
652 if (!fDumpRead.is_open())
653 {
654 fDumpRead.open("socket_events.txt", ios::app);
655 if (!fDumpRead)
656 {
657 ostringstream str;
658 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
659 fMsg.Error(str);
660
661 return;
662 }
663
664 fMsg.Message("Opened file 'socket_events.txt' for writing.");
665
666 fDumpRead << "# START: " << Time().GetAsStr() << endl;
667 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
668 }
669
670 fDumpRead
671 << setw(2) << state << " "
672 << setw(8) << tsec << " "
673 << setw(9) << tusec << " "
674 << setw(3) << isock << " "
675 << setw(2) << isock/7 << " "
676 << runno << " "
677 << event << " "
678 << ftmevt << " "
679 << ibyte << endl;
680 }
681
682 array<uint16_t,2> fVecRoi;
683
684 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
685 {
686 /*
687 fadhd[i] ist ein array mit den 40 fad-headers
688 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
689
690 event ist die Struktur, die auch die write routine erhaelt;
691 darin sind im header die 'soll-werte' fuer z.B. eventID
692 als auch die ADC-Werte (falls Du die brauchst)
693
694 Wenn die routine einen negativen Wert liefert, wird das event
695 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
696 */
697
698 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
699
700 if (roi!=fVecRoi)
701 {
702 Update(fDimRoi, roi);
703 fVecRoi = roi;
704 }
705
706 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
707 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
708
709 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
710 {
711 // FIXME: Compare with expectations!!!
712 if (ptr->fStartDelimiter==0)
713 {
714 if (ptr==beg)
715 beg++;
716 continue;
717 }
718
719 if (beg->fStatus != ptr->fStatus)
720 {
721 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
722 CloseRunFile(runNr, 0, 0);
723 return -1;
724 }
725
726 if (beg->fRunNumber != ptr->fRunNumber)
727 {
728 fMsg.Error("Inconsistent run number detected.... closing run.");
729 CloseRunFile(runNr, 0, 0);
730 return -1;
731 }
732
733 /*
734 if (beg->fVersion != ptr->fVersion)
735 {
736 Error("Inconsist firmware version detected.... closing run.");
737 CloseRunFile(runNr, 0, 0);
738 break;
739 }
740 if (beg->fEventCounter != ptr->fEventCounter)
741 {
742 Error("Inconsist run number detected.... closing run.");
743 CloseRunFile(runNr, 0, 0);
744 break;
745 }
746 if (beg->fTriggerCounter != ptr->fTriggerCounter)
747 {
748 Error("Inconsist trigger number detected.... closing run.");
749 CloseRunFile(runNr, 0, 0);
750 break;
751 }*/
752
753 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
754 {
755 fMsg.Error("Inconsistent phase shift detected.... closing run.");
756 CloseRunFile(runNr, 0, 0);
757 return -1;
758 }
759
760 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
761 {
762 fMsg.Error("Inconsistent DAC values detected.... closing run.");
763 CloseRunFile(runNr, 0, 0);
764 return -1;
765 }
766
767 if (beg->fTriggerType != ptr->fTriggerType)
768 {
769 fMsg.Error("Inconsistent trigger type detected.... closing run.");
770 CloseRunFile(runNr, 0, 0);
771 return -1;
772 }
773 }
774
775 // check REFCLK_frequency
776 // check consistency with command configuration
777 // how to log errors?
778 // need gotNewRun/closedRun to know it is finished
779
780 static Time oldt(boost::date_time::neg_infin);
781 Time newt;
782
783 // FIXME: Only send events if the have newer run-numbers
784 if (newt<oldt+boost::posix_time::seconds(1))
785 return 0;
786
787 oldt = newt;
788
789 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
790
791 vector<char> data(sz+event->Roi*2*1440);
792 memcpy(data.data(), event, sizeof(EVENT));
793
794 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
795
796 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
797 fDimRawData.Update(data);
798
799 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
800 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
801
802 fDimEventData.Update(data2);
803
804
805
806
807 return 0;
808 }
809
810 bool IsRunStarted() const
811 {
812 return fExpectedRuns.find(fRunNumber-1)==fExpectedRuns.end();
813 }
814
815 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
816 {
817 // This function is called even when writing is switched off
818 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
819 if (it==fExpectedRuns.end())
820 {
821 ostringstream str;
822 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
823 return;
824 }
825
826 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
827 // return: 0=close scheduled / >0 already closed / <0 does not exist
828
829 fExpectedRuns.erase(it);
830 }
831
832 map<boost::thread::id, string> fLastMessage;
833
834 void factOut(int severity, int err, const char *message)
835 {
836 if (!fDebugLog && severity==99)
837 return;
838
839 ostringstream str;
840 //str << boost::this_thread::get_id() << " ";
841 str << "EventBuilder(";
842 if (err<0)
843 str << "---";
844 else
845 str << err;
846 str << "): " << message;
847
848 string &old = fLastMessage[boost::this_thread::get_id()];
849
850 if (str.str()==old)
851 return;
852 old = str.str();
853
854 fMsg.Update(str, severity);
855 }
856/*
857 void factStat(int64_t *stat, int len)
858 {
859 if (len!=7)
860 {
861 fMsg.Warn("factStat received unknown number of values.");
862 return;
863 }
864
865 vector<int64_t> data(1, g_maxMem);
866 data.insert(data.end(), stat, stat+len);
867
868 static vector<int64_t> last(8);
869 if (data==last)
870 return;
871 last = data;
872
873 fDimStatistics.Update(data);
874
875 // len ist die Laenge des arrays.
876 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
877 // kannst Du pruefen, ob die 100MB voll sind ....
878
879 ostringstream str;
880 str
881 << "Wait=" << stat[0] << " "
882 << "Skip=" << stat[1] << " "
883 << "Del=" << stat[2] << " "
884 << "Tot=" << stat[3] << " "
885 << "Mem=" << stat[4] << "/" << g_maxMem << " "
886 << "Read=" << stat[5] << " "
887 << "Conn=" << stat[6];
888
889 fMsg.Info(str);
890 }
891 */
892
893 void factStat(const EVT_STAT &stat)
894 {
895 fDimStatistics2.Update(stat);
896 /*
897 //some info about what happened since start of program (or last 'reset')
898 uint32_t reset ; //#if increased, reset all counters
899 uint32_t numRead[MAX_SOCK] ; //how often succesfull read from N sockets per loop
900
901 uint64_t gotByte[NBOARDS] ; //#Bytes read per Board
902 uint32_t gotErr[NBOARDS] ; //#Communication Errors per Board
903 uint32_t evtGet; //#new Start of Events read
904 uint32_t evtTot; //#complete Events read
905 uint32_t evtErr; //#Events with Errors
906 uint32_t evtSkp; //#Events incomplete (timeout)
907
908 uint32_t procTot; //#Events processed
909 uint32_t procErr; //#Events showed problem in processing
910 uint32_t procTrg; //#Events accepted by SW trigger
911 uint32_t procSkp; //#Events rejected by SW trigger
912
913 uint32_t feedTot; //#Events used for feedBack system
914 uint32_t feedErr; //#Events rejected by feedBack
915
916 uint32_t wrtTot; //#Events written to disk
917 uint32_t wrtErr; //#Events with write-error
918
919 uint32_t runOpen; //#Runs opened
920 uint32_t runClose; //#Runs closed
921 uint32_t runErr; //#Runs with open/close errors
922
923
924 //info about current connection status
925 uint8_t numConn[NBOARDS] ; //#Sockets succesfully open per board
926 */
927 }
928
929 void factStat(const GUI_STAT &stat)
930 {
931 fDimStatistics1.Update(stat);
932 /*
933 //info about status of the main threads
934 int32_t readStat ; //read thread
935 int32_t procStat ; //processing thread(s)
936 int32_t writStat ; //write thread
937
938 //info about some rates
939 int32_t deltaT ; //time in milli-seconds for rates
940 int32_t readEvt ; //#events read
941 int32_t procEvt ; //#events processed
942 int32_t writEvt ; //#events written
943 int32_t skipEvt ; //#events skipped
944
945 //some info about current state of event buffer (snapspot)
946 int32_t evtBuf; //#Events currently waiting in Buffer
947 uint64_t totMem; //#Bytes available in Buffer
948 uint64_t usdMem; //#Bytes currently used
949 uint64_t maxMem; //max #Bytes used during past Second
950 */
951 }
952
953
954 array<FAD::EventHeader, 40> fVecHeader;
955
956 template<typename T, class S>
957 array<T, 42> Compare(const S *vec, const T *t)
958 {
959 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
960
961 const T *min = NULL;
962 const T *val = NULL;
963 const T *max = NULL;
964
965 array<T, 42> arr;
966
967 bool rc = true;
968 for (int i=0; i<40; i++)
969 {
970 const char *base = reinterpret_cast<const char*>(vec+i);
971 const T *ref = reinterpret_cast<const T*>(base+offset);
972
973 arr[i] = *ref;
974
975 if (gi_NumConnect[i]!=7)
976 {
977 arr[i] = 0;
978 continue;
979 }
980
981 if (!val)
982 {
983 min = ref;
984 val = ref;
985 max = ref;
986 }
987
988 if (*ref<*min)
989 min = ref;
990
991 if (*ref>*max)
992 max = ref;
993
994 if (*val!=*ref)
995 rc = false;
996 }
997
998 arr[40] = val ? *min : 1;
999 arr[41] = val ? *max : 0;
1000
1001 return arr;
1002 }
1003
1004 template<typename T>
1005 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1006 {
1007 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1008
1009 T val = 0;
1010 T rc = 0;
1011
1012 array<T, 42> vec;
1013
1014 bool first = true;
1015
1016 for (int i=0; i<40; i++)
1017 {
1018 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1019 const T *ref = reinterpret_cast<const T*>(base+offset);
1020
1021 vec[i+2] = *ref;
1022
1023 if (gi_NumConnect[i]!=7)
1024 {
1025 vec[i+2] = 0;
1026 continue;
1027 }
1028
1029 if (first)
1030 {
1031 first = false;
1032 val = *ref;
1033 rc = 0;
1034 }
1035
1036 rc |= val^*ref;
1037 }
1038
1039 vec[0] = rc;
1040 vec[1] = val;
1041
1042 return vec;
1043 }
1044
1045 template<typename T, size_t N>
1046 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1047 {
1048// svc.setQuality(vec[40]<=vec[41]);
1049 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1050 svc.updateService();
1051 }
1052
1053 template<typename T>
1054 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1055 {
1056 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1057 for (int i=0; i<40;i++)
1058 cout << " " << data.second[i+3];
1059 cout << endl;
1060 }
1061
1062 vector<uint> fNumConnected;
1063
1064 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1065 {
1066 const uint16_t id = h.Id();
1067 if (id>39)
1068 return;
1069
1070 if (fNumConnected.size()!=40)
1071 fNumConnected.resize(40);
1072
1073 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1074
1075 const bool changed = con!=fNumConnected || !IsThreadRunning();
1076
1077 fNumConnected = con;
1078
1079 const FAD::EventHeader old = fVecHeader[id];
1080 fVecHeader[id] = h;
1081
1082 if (old.fVersion != h.fVersion || changed)
1083 {
1084 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1085
1086 array<float,42> data;
1087 for (int i=0; i<42; i++)
1088 {
1089 ostringstream str;
1090 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1091 data[i] = stof(str.str());
1092 }
1093 Update(fDimFwVersion, data);
1094 }
1095
1096 if (old.fRunNumber != h.fRunNumber || changed)
1097 {
1098 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1099 fDimRunNumber.Update(run);
1100 }
1101
1102 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1103 {
1104 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1105 fDimPrescaler.Update(pre);
1106 }
1107
1108 if (old.fDNA != h.fDNA || changed)
1109 {
1110 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1111 Update(fDimDNA, dna, 40);
1112 }
1113
1114 if (old.fStatus != h.fStatus || changed)
1115 {
1116 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1117 Update(fDimStatus, sts);
1118 }
1119
1120 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1121 {
1122 array<uint16_t, FAD::kNumDac*42> dacs;
1123
1124 for (int i=0; i<FAD::kNumDac; i++)
1125 {
1126 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1127 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1128 }
1129
1130 Update(fDimDac, dacs);
1131 }
1132
1133 // -----------
1134
1135 static Time oldt(boost::date_time::neg_infin);
1136 Time newt;
1137
1138 if (newt>oldt+boost::posix_time::seconds(1))
1139 {
1140 oldt = newt;
1141
1142 // --- RefClock
1143
1144 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1145 Update(fDimRefClock, clk);
1146
1147 // --- Temperatures
1148
1149 const array<int16_t,42> tmp[4] =
1150 {
1151 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1152 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1153 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1154 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1155 };
1156
1157 vector<int16_t> data;
1158 data.reserve(82);
1159 data.push_back(tmp[0][40]); // min: 0
1160 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1161 data.push_back(tmp[0][41]); // max: 41
1162 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1163
1164 for (int j=1; j<=3; j++)
1165 {
1166 const array<int16_t,42> &ref = tmp[j];
1167
1168 // Gloabl min
1169 if (ref[40]<data[0]) // 40=min
1170 data[0] = ref[40];
1171
1172 // Global max
1173 if (ref[41]>data[41]) // 41=max
1174 data[41] = ref[41];
1175
1176 for (int i=0; i<40; i++)
1177 {
1178 // min per board
1179 if (ref[i]<data[i+1]) // data: 1-40
1180 data[i+1] = ref[i]; // ref: 0-39
1181
1182 // max per board
1183 if (ref[i]>data[i+42]) // data: 42-81
1184 data[i+42] = ref[i]; // ref: 0-39
1185 }
1186
1187
1188 }
1189
1190 vector<float> deg(82); // 0: global min, 1-40: min
1191 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1192 deg[i] = data[i]/16.;
1193 fDimTemperature.Update(deg);
1194 }
1195
1196 /*
1197 uint16_t fTriggerType;
1198 uint32_t fTriggerId;
1199 uint32_t fEventCounter;
1200 uint16_t fAdcClockPhaseShift;
1201 uint16_t fNumTriggersToGenerate;
1202 uint16_t fTriggerGeneratorPrescaler;
1203 uint32_t fTimeStamp;
1204 int16_t fTempDrs[kNumTemp]; // In units of 1/16 deg(?)
1205 uint16_t fDac[kNumDac];
1206 */
1207 }
1208};
1209
1210EventBuilderWrapper *EventBuilderWrapper::This = 0;
1211
1212// ----------- Event builder callbacks implementation ---------------
1213extern "C"
1214{
1215 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1216 {
1217 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1218 }
1219
1220 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1221 {
1222 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1223 }
1224
1225 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1226 {
1227 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1228 }
1229
1230 void factOut(int severity, int err, const char *message)
1231 {
1232 EventBuilderWrapper::This->factOut(severity, err, message);
1233 }
1234
1235 void factStat(GUI_STAT stat)
1236 {
1237 EventBuilderWrapper::This->factStat(stat);
1238 }
1239
1240 void factStatNew(EVT_STAT stat)
1241 {
1242 EventBuilderWrapper::This->factStat(stat);
1243 }
1244
1245 void debugHead(int socket, int/*board*/, void *buf)
1246 {
1247 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1248
1249 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1250 }
1251
1252 void debugStream(int isock, void *buf, int len)
1253 {
1254 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1255 }
1256
1257 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1258 {
1259 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1260 }
1261
1262 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1263 {
1264 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1265 }
1266
1267 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1268 {
1269 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1270 }
1271
1272 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1273 {
1274 return 100;
1275 }
1276
1277}
1278
1279#endif
Note: See TracBrowser for help on using the repository browser.