source: trunk/FACT++/src/EventBuilderWrapper.h@ 11866

Last change on this file since 11866 was 11837, checked in by tbretz, 15 years ago
Changed Dim services to ensure they send the time.
File size: 38.0 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/date_time/posix_time/posix_time_types.hpp>
13
14#include "DimWriteStatistics.h"
15
16#include "DataCalib.h"
17#include "DataWriteRaw.h"
18
19#ifdef HAVE_FITS
20#include "DataWriteFits.h"
21#else
22#define DataWriteFits DataWriteRaw
23#endif
24
25namespace ba = boost::asio;
26namespace bs = boost::system;
27
28using ba::ip::tcp;
29
30using namespace std;
31
32// ========================================================================
33
34#include "EventBuilder.h"
35
36extern "C" {
37 extern void StartEvtBuild();
38 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
39}
40
41// ========================================================================
42
43class EventBuilderWrapper
44{
45public:
46 // FIXME
47 static EventBuilderWrapper *This;
48
49 MessageImp &fMsg;
50
51private:
52 boost::thread fThread;
53
54 enum CommandStates_t // g_runStat
55 {
56 kAbort = -2, // quit as soon as possible ('abort')
57 kExit = -1, // stop reading, quit when buffered events done ('exit')
58 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
59 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
60 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
61 kModeFlush = 10, // read data from camera, but skip them ('flush')
62 kModeTest = 20, // read data and process them, but do not write to disk ('test')
63 kModeFlag = 30, // read data, process and write all to disk ('flag')
64 kModeRun = 40, // read data, process and write selected to disk ('run')
65 };
66
67 enum
68 {
69 kCurrent = 0,
70 kTotal = 1,
71 kEventId = 2,
72 kTriggerId = 3,
73 };
74
75 enum FileFormat_t
76 {
77 kNone = 0,
78 kDebug,
79 kFits,
80 kRaw,
81 kCalib
82 };
83
84 FileFormat_t fFileFormat;
85
86
87 uint32_t fMaxRun;
88 uint32_t fLastOpened;
89 uint32_t fLastClosed;
90 uint32_t fNumEvts[4];
91
92 DimWriteStatistics fDimWriteStats;
93 DimDescribedService fDimRuns;
94 DimDescribedService fDimEvents;
95 DimDescribedService fDimRawData;
96 DimDescribedService fDimEventData;
97 DimDescribedService fDimFwVersion;
98 DimDescribedService fDimRunNumber;
99 DimDescribedService fDimStatus;
100 DimDescribedService fDimDNA;
101 DimDescribedService fDimTemperature;
102 DimDescribedService fDimPrescaler;
103 DimDescribedService fDimRefClock;
104 DimDescribedService fDimRoi;
105 DimDescribedService fDimDac;
106 DimDescribedService fDimDrsCalibration;
107 DimDescribedService fDimStatistics1;
108 DimDescribedService fDimStatistics2;
109
110 bool fDebugStream;
111 bool fDebugRead;
112 bool fDebugLog;
113
114 uint32_t fRunNumber;
115
116 void InitRunNumber()
117 {
118 // FIXME: Add a check that we are not too close to noon!
119 //const int night = Time().NightAsInt();
120
121 fRunNumber = 1000;
122
123 while (--fRunNumber>0)
124 {
125 const string name = DataProcessorImp::FormFileName(fRunNumber, "");
126
127 if (access((name+"bin").c_str(), F_OK) == 0)
128 break;
129 if (access((name+"fits").c_str(), F_OK) == 0)
130 break;
131 if (access((name+"drs.fits").c_str(), F_OK) == 0)
132 break;
133 }
134
135 fRunNumber++;
136
137 ostringstream str;
138 str << "Starting with run number " << fRunNumber;
139 fMsg.Message(str);
140
141 fMsg.Info(" ==> TODO: Run-number detection doesn't work when noon passes!");
142 fMsg.Info(" ==> TODO: Crosscheck with database!");
143 }
144
145public:
146 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
147 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
148 fDimWriteStats ("FAD_CONTROL", imp),
149 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
150 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
151 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
152 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
153 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
154 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
155 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
156 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
157 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
158 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
159 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
160 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
161 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
162 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
163 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
164 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
165 fDebugStream(false), fDebugRead(false), fDebugLog(false)
166 {
167 if (This)
168 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
169
170 This = this;
171
172 memset(fNumEvts, 0, sizeof(fNumEvts));
173
174 fDimEvents.Update(fNumEvts);
175
176 for (size_t i=0; i<40; i++)
177 ConnectSlot(i, tcp::endpoint());
178
179 InitRunNumber();
180 }
181 virtual ~EventBuilderWrapper()
182 {
183 Abort();
184
185 // FIXME: Used timed_join and abort afterwards
186 // What's the maximum time the eb need to abort?
187 fThread.join();
188 //ffMsg.Info("EventBuilder stopped.");
189
190 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
191 delete *it;
192 }
193
194 struct RunDescription
195 {
196 uint32_t maxtime;
197 uint32_t maxevt;
198
199 string name;
200
201 FAD::Configuration reference;
202
203 bool started;
204 };
205
206 map<uint32_t, RunDescription> fExpectedRuns;
207
208 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
209 {
210 if (maxtime<=0 || maxtime>24*60*60)
211 maxtime = 24*60*60;
212 if (maxevt<=0 || maxevt>INT32_MAX)
213 maxevt = INT32_MAX;
214
215 const RunDescription descr =
216 {
217 uint32_t(maxtime),
218 uint32_t(maxevt),
219 ref.first,
220 ref.second,
221 false
222 };
223
224 // FIMXE: Maybe reset an event counter so that the mcp can count events?
225
226 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
227
228 fExpectedRuns[fRunNumber] = descr;
229 return fRunNumber++;
230 }
231
232 bool IsThreadRunning()
233 {
234 return !fThread.timed_join(boost::posix_time::microseconds(0));
235 }
236
237 void SetMaxMemory(unsigned int mb) const
238 {
239 /*
240 if (mb*1000000<GetUsedMemory())
241 {
242 // ffMsg.Warn("...");
243 return;
244 }*/
245
246 g_maxMem = size_t(mb)*1000000;
247 }
248
249 void StartThread(const vector<tcp::endpoint> &addr)
250 {
251 if (IsThreadRunning())
252 {
253 fMsg.Warn("Start - EventBuilder still running");
254 return;
255 }
256
257 fLastMessage.clear();
258
259 for (size_t i=0; i<40; i++)
260 ConnectSlot(i, addr[i]);
261
262 g_runStat = kModeRun;
263
264 fMsg.Message("Starting EventBuilder thread");
265
266 fThread = boost::thread(StartEvtBuild);
267 }
268 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
269 {
270 if (i>39)
271 return;
272
273 if (addr==tcp::endpoint())
274 {
275 DisconnectSlot(i);
276 return;
277 }
278
279 g_port[i].sockAddr.sin_family = AF_INET;
280 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
281 g_port[i].sockAddr.sin_port = htons(addr.port());
282 // In this order
283 g_port[i].sockDef = 1;
284 }
285 void DisconnectSlot(unsigned int i)
286 {
287 if (i>39)
288 return;
289
290 g_port[i].sockDef = 0;
291 // In this order
292 g_port[i].sockAddr.sin_family = AF_INET;
293 g_port[i].sockAddr.sin_addr.s_addr = 0;
294 g_port[i].sockAddr.sin_port = 0;
295 }
296 void IgnoreSlot(unsigned int i)
297 {
298 if (i>39)
299 return;
300 if (g_port[i].sockAddr.sin_port==0)
301 return;
302
303 g_port[i].sockDef = -1;
304 }
305
306
307 void Abort()
308 {
309 fMsg.Message("Signal abort to EventBuilder thread...");
310 g_runStat = kAbort;
311 }
312
313 void ResetThread(bool soft)
314 {
315 /*
316 if (g_reset > 0)
317
318 * suspend reading
319 * reset = g_reset;
320 * g_reset=0
321
322 * reset% 10
323 == 0 leave event Buffers as they are
324 == 1 let all buffers drain (write (incomplete) events)
325 > 1 flush all buffers (do not write buffered events)
326
327 * (reset/10)%10
328 > 0 close all sockets and destroy them (also free the
329 allocated read-buffers)
330 recreate before resuming operation
331 [ this is more than just close/open that can be
332 triggered by e.g. close/open the base-socket ]
333
334 * (reset/100)%10
335 > 0 close all open run-files
336
337 * (reset/1000)
338 sleep so many seconds before resuming operation
339 (does not (yet) take into account time left when waiting
340 for buffers getting empty ...)
341
342 * resume_reading
343
344 */
345 fMsg.Message("Signal reset to EventBuilder thread...");
346 g_reset = soft ? 101 : 102;
347 }
348
349 void Exit()
350 {
351 fMsg.Message("Signal exit to EventBuilder thread...");
352 g_runStat = kExit;
353 }
354
355 /*
356 void Wait()
357 {
358 fThread.join();
359 ffMsg.Message("EventBuilder stopped.");
360 }*/
361
362 void Hybernate() const { g_runStat = kHybernate; }
363 void Sleep() const { g_runStat = kSleep; }
364 void FlushMode() const { g_runStat = kModeFlush; }
365 void TestMode() const { g_runStat = kModeTest; }
366 void FlagMode() const { g_runStat = kModeFlag; }
367 void RunMode() const { g_runStat = kModeRun; }
368
369 // FIXME: To be removed
370 void SetMode(int mode) const { g_runStat = mode; }
371
372 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
373 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
374 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
375 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
376
377 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
378 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
379
380 void SetOutputFormat(FileFormat_t f)
381 {
382 fFileFormat = f;
383 if (fFileFormat==kCalib)
384 {
385 DataCalib::Restart();
386 DataCalib::Update(fDimDrsCalibration);
387 fMsg.Message("Resetted DRS calibration.");
388 }
389 }
390
391 void SetDebugLog(bool b) { fDebugLog = b; }
392
393 void SetDebugStream(bool b)
394 {
395 fDebugStream = b;
396 if (b)
397 return;
398
399 for (int i=0; i<40; i++)
400 {
401 if (!fDumpStream[i].is_open())
402 continue;
403
404 fDumpStream[i].close();
405
406 ostringstream name;
407 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
408 fMsg.Message("Closed file '"+name.str()+"'");
409 }
410 }
411
412 void SetDebugRead(bool b)
413 {
414 fDebugRead = b;
415 if (b || !fDumpRead.is_open())
416 return;
417
418 fDumpRead.close();
419 fMsg.Message("Closed file 'socket_events.txt'");
420 }
421
422// size_t GetUsedMemory() const { return gi_usedMem; }
423
424 void LoadDrsCalibration(const char *fname)
425 {
426 if (!DataCalib::ReadFits(fname, fMsg))
427 return;
428 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
429 DataCalib::Update(fDimDrsCalibration);
430 }
431
432 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
433
434
435 /*
436 struct OpenFileToDim
437 {
438 int code;
439 char fileName[FILENAME_MAX];
440 };
441
442 SignalRunOpened(runid, filename);
443 // Send num open files
444 // Send runid, (more info about the run?), filename via dim
445
446 SignalEvtWritten(runid);
447 // Send num events written of newest file
448
449 SignalRunClose(runid);
450 // Send new num open files
451 // Send empty file-name if no file is open
452
453 */
454
455 // -------------- Mapped event builder callbacks ------------------
456
457 void UpdateRuns(const string &fname="")
458 {
459 uint32_t values[5] =
460 {
461 static_cast<uint32_t>(fFiles.size()),
462 0xffffffff,
463 0,
464 fLastOpened,
465 fLastClosed
466 };
467
468 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
469 it!=fFiles.end(); it++)
470 {
471 const DataProcessorImp *file = *it;
472
473 if (file->GetRunId()<values[1])
474 values[1] = file->GetRunId();
475
476 if (file->GetRunId()>values[2])
477 values[2] = file->GetRunId();
478 }
479
480 fMaxRun = values[2];
481
482 vector<char> data(sizeof(values)+fname.size()+1);
483 memcpy(data.data(), values, sizeof(values));
484 strcpy(data.data()+sizeof(values), fname.c_str());
485
486 fDimRuns.Update(data);
487 }
488
489 vector<DataProcessorImp*> fFiles;
490
491 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
492 {
493 fMsg.Info(" ==> TODO: Update run configuration in database!");
494 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
495 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
496
497 // Check if file already exists...
498 DataProcessorImp *file = 0;
499 switch (fFileFormat)
500 {
501 case kNone: file = new DataDump(runid, fMsg); break;
502 case kDebug: file = new DataDebug(runid, fMsg); break;
503 case kFits: file = new DataWriteFits(runid, fMsg); break;
504 case kRaw: file = new DataWriteRaw(runid, fMsg); break;
505 case kCalib: file = new DataCalib(runid, fDimDrsCalibration, fMsg); break;
506 }
507
508 try
509 {
510 if (!file->Open(h))
511 return 0;
512 }
513 catch (const exception &e)
514 {
515 fMsg.Error("Exception trying to open file: "+string(e.what()));
516 return 0;
517 }
518
519 fFiles.push_back(file);
520
521 ostringstream str;
522 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
523 fMsg.Info(str);
524
525 fDimWriteStats.FileOpened(file->GetFileName());
526
527 fLastOpened = runid;
528 UpdateRuns(file->GetFileName());
529
530 fNumEvts[kEventId] = 0;
531 fNumEvts[kTriggerId] = 0;
532
533 fNumEvts[kCurrent] = 0;
534 fDimEvents.Update(fNumEvts);
535 // fDimCurrentEvent.Update(uint32_t(0));
536
537 return reinterpret_cast<FileHandle_t>(file);
538 }
539
540 int runWrite(FileHandle_t handler, EVENT *e, size_t)
541 {
542 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
543
544 if (!file->WriteEvt(e))
545 return -1;
546
547 if (file->GetRunId()==fMaxRun)
548 {
549 fNumEvts[kCurrent]++;
550 fNumEvts[kEventId] = e->EventNum;
551 fNumEvts[kTriggerId] = e->TriggerNum;
552 }
553
554 fNumEvts[kTotal]++;
555
556 static Time oldt(boost::date_time::neg_infin);
557 Time newt;
558 if (newt>oldt+boost::posix_time::seconds(1))
559 {
560 fDimEvents.Update(fNumEvts);
561 oldt = newt;
562 }
563
564
565 // ===> SignalEvtWritten(runid);
566 // Send num events written of newest file
567
568 /* close run runId (all all runs if runId=0) */
569 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
570 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
571
572 return 0;
573 }
574
575 virtual void CloseRun(uint32_t runid) { }
576
577 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
578 {
579 fMsg.Info(" ==> TODO: Update run configuration in database!");
580 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
581
582 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
583
584 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
585 if (it==fFiles.end())
586 {
587 ostringstream str;
588 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
589 fMsg.Fatal(str);
590 return -1;
591 }
592
593 fFiles.erase(it);
594
595 fLastClosed = file->GetRunId();
596 CloseRun(fLastClosed);
597 UpdateRuns();
598
599 fDimEvents.Update(fNumEvts);
600
601 const bool rc = file->Close(tail);
602 if (!rc)
603 {
604 // Error message
605 }
606
607 ostringstream str;
608 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
609 fMsg.Info(str);
610
611 delete file;
612
613 // ==> SignalRunClose(runid);
614 // Send new num open files
615 // Send empty file-name if no file is open
616
617 return rc ? 0 : -1;
618 }
619
620 ofstream fDumpStream[40];
621
622 void debugStream(int isock, void *buf, int len)
623 {
624 if (!fDebugStream)
625 return;
626
627 const int slot = isock/7;
628 if (slot<0 || slot>39)
629 return;
630
631 if (!fDumpStream[slot].is_open())
632 {
633 ostringstream name;
634 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
635
636 fDumpStream[slot].open(name.str().c_str(), ios::app);
637 if (!fDumpStream[slot])
638 {
639 ostringstream str;
640 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
641 fMsg.Error(str);
642
643 return;
644 }
645
646 fMsg.Message("Opened file '"+name.str()+"' for writing.");
647 }
648
649 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
650 }
651
652 ofstream fDumpRead; // Stream to possibly dump docket events
653
654 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
655 {
656 // isock = socketID (0-279)
657 // ibyte = #bytes gelesen
658 // event = eventId (oder 0 wenn noch nicht bekannt)
659 // state : 1=finished reading data
660 // 0=reading data
661 // -1=start reading data (header)
662 // -2=start reading data,
663 // eventId not known yet (too little data)
664 // tsec, tusec = time when reading seconds, microseconds
665 //
666 if (!fDebugRead || ibyte==0)
667 return;
668
669 if (!fDumpRead.is_open())
670 {
671 fDumpRead.open("socket_events.txt", ios::app);
672 if (!fDumpRead)
673 {
674 ostringstream str;
675 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
676 fMsg.Error(str);
677
678 return;
679 }
680
681 fMsg.Message("Opened file 'socket_events.txt' for writing.");
682
683 fDumpRead << "# START: " << Time().GetAsStr() << endl;
684 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
685 }
686
687 fDumpRead
688 << setw(2) << state << " "
689 << setw(8) << tsec << " "
690 << setw(9) << tusec << " "
691 << setw(3) << isock << " "
692 << setw(2) << isock/7 << " "
693 << runno << " "
694 << event << " "
695 << ftmevt << " "
696 << ibyte << endl;
697 }
698
699 array<uint16_t,2> fVecRoi;
700
701 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
702 {
703 /*
704 fadhd[i] ist ein array mit den 40 fad-headers
705 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
706
707 event ist die Struktur, die auch die write routine erhaelt;
708 darin sind im header die 'soll-werte' fuer z.B. eventID
709 als auch die ADC-Werte (falls Du die brauchst)
710
711 Wenn die routine einen negativen Wert liefert, wird das event
712 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
713 */
714
715 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
716
717 if (roi!=fVecRoi)
718 {
719 Update(fDimRoi, roi);
720 fVecRoi = roi;
721 }
722
723 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
724 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
725
726 // FIMXE: Compare with target configuration
727
728 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
729 {
730 // FIXME: Compare with expectations!!!
731 if (ptr->fStartDelimiter==0)
732 {
733 if (ptr==beg)
734 beg++;
735 continue;
736 }
737
738 if (beg->fStatus != ptr->fStatus)
739 {
740 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
741 CloseRunFile(runNr, 0, 0);
742 return -1;
743 }
744
745 if (beg->fRunNumber != ptr->fRunNumber)
746 {
747 fMsg.Error("Inconsistent run number detected.... closing run.");
748 CloseRunFile(runNr, 0, 0);
749 return -1;
750 }
751
752 /*
753 if (beg->fVersion != ptr->fVersion)
754 {
755 Error("Inconsist firmware version detected.... closing run.");
756 CloseRunFile(runNr, 0, 0);
757 break;
758 }
759 if (beg->fEventCounter != ptr->fEventCounter)
760 {
761 Error("Inconsist run number detected.... closing run.");
762 CloseRunFile(runNr, 0, 0);
763 break;
764 }
765 if (beg->fTriggerCounter != ptr->fTriggerCounter)
766 {
767 Error("Inconsist trigger number detected.... closing run.");
768 CloseRunFile(runNr, 0, 0);
769 break;
770 }*/
771
772 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
773 {
774 fMsg.Error("Inconsistent phase shift detected.... closing run.");
775 CloseRunFile(runNr, 0, 0);
776 return -1;
777 }
778
779 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
780 {
781 fMsg.Error("Inconsistent DAC values detected.... closing run.");
782 CloseRunFile(runNr, 0, 0);
783 return -1;
784 }
785
786 if (beg->fTriggerType != ptr->fTriggerType)
787 {
788 fMsg.Error("Inconsistent trigger type detected.... closing run.");
789 CloseRunFile(runNr, 0, 0);
790 return -1;
791 }
792 }
793
794 // check REFCLK_frequency
795 // check consistency with command configuration
796 // how to log errors?
797 // need gotNewRun/closedRun to know it is finished
798
799 static Time oldt(boost::date_time::neg_infin);
800 Time newt;
801
802 // FIXME: Only send events if the have newer run-numbers
803 if (newt<oldt+boost::posix_time::seconds(1))
804 return 0;
805
806 oldt = newt;
807
808 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
809
810 vector<char> data(sz+event->Roi*2*1440);
811 memcpy(data.data(), event, sizeof(EVENT));
812
813 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
814
815 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
816 fDimRawData.Update(data);
817
818 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
819 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
820
821 fDimEventData.Update(data2);
822
823
824
825
826 return 0;
827 }
828
829 bool IsRunStarted() const
830 {
831 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
832 return it==fExpectedRuns.end();// ? true : it->second.started;
833 }
834
835 uint32_t GetRunNumber() const
836 {
837 return fRunNumber;
838 }
839
840 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
841 {
842 // This function is called even when writing is switched off
843 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
844 if (it==fExpectedRuns.end())
845 {
846 ostringstream str;
847 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
848 return;
849 }
850
851 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
852 // return: 0=close scheduled / >0 already closed / <0 does not exist
853
854 // FIXME: Move configuration from expected runs to runs which will soon
855 // be opened/closed
856
857 it->second.started = true;
858
859 fExpectedRuns.erase(it);
860 }
861
862 map<boost::thread::id, string> fLastMessage;
863
864 void factOut(int severity, int err, const char *message)
865 {
866 if (!fDebugLog && severity==99)
867 return;
868
869 ostringstream str;
870 //str << boost::this_thread::get_id() << " ";
871 str << "EventBuilder(";
872 if (err<0)
873 str << "---";
874 else
875 str << err;
876 str << "): " << message;
877
878 string &old = fLastMessage[boost::this_thread::get_id()];
879
880 if (str.str()==old)
881 return;
882 old = str.str();
883
884 fMsg.Update(str, severity);
885 }
886/*
887 void factStat(int64_t *stat, int len)
888 {
889 if (len!=7)
890 {
891 fMsg.Warn("factStat received unknown number of values.");
892 return;
893 }
894
895 vector<int64_t> data(1, g_maxMem);
896 data.insert(data.end(), stat, stat+len);
897
898 static vector<int64_t> last(8);
899 if (data==last)
900 return;
901 last = data;
902
903 fDimStatistics.Update(data);
904
905 // len ist die Laenge des arrays.
906 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
907 // kannst Du pruefen, ob die 100MB voll sind ....
908
909 ostringstream str;
910 str
911 << "Wait=" << stat[0] << " "
912 << "Skip=" << stat[1] << " "
913 << "Del=" << stat[2] << " "
914 << "Tot=" << stat[3] << " "
915 << "Mem=" << stat[4] << "/" << g_maxMem << " "
916 << "Read=" << stat[5] << " "
917 << "Conn=" << stat[6];
918
919 fMsg.Info(str);
920 }
921 */
922
923 void factStat(const EVT_STAT &stat)
924 {
925 fDimStatistics2.Update(stat);
926 /*
927 //some info about what happened since start of program (or last 'reset')
928 uint32_t reset ; //#if increased, reset all counters
929 uint32_t numRead[MAX_SOCK] ; //how often succesfull read from N sockets per loop
930
931 uint64_t gotByte[NBOARDS] ; //#Bytes read per Board
932 uint32_t gotErr[NBOARDS] ; //#Communication Errors per Board
933 uint32_t evtGet; //#new Start of Events read
934 uint32_t evtTot; //#complete Events read
935 uint32_t evtErr; //#Events with Errors
936 uint32_t evtSkp; //#Events incomplete (timeout)
937
938 uint32_t procTot; //#Events processed
939 uint32_t procErr; //#Events showed problem in processing
940 uint32_t procTrg; //#Events accepted by SW trigger
941 uint32_t procSkp; //#Events rejected by SW trigger
942
943 uint32_t feedTot; //#Events used for feedBack system
944 uint32_t feedErr; //#Events rejected by feedBack
945
946 uint32_t wrtTot; //#Events written to disk
947 uint32_t wrtErr; //#Events with write-error
948
949 uint32_t runOpen; //#Runs opened
950 uint32_t runClose; //#Runs closed
951 uint32_t runErr; //#Runs with open/close errors
952
953
954 //info about current connection status
955 uint8_t numConn[NBOARDS] ; //#Sockets succesfully open per board
956 */
957 }
958
959 void factStat(const GUI_STAT &stat)
960 {
961 fDimStatistics1.Update(stat);
962 /*
963 //info about status of the main threads
964 int32_t readStat ; //read thread
965 int32_t procStat ; //processing thread(s)
966 int32_t writStat ; //write thread
967
968 //info about some rates
969 int32_t deltaT ; //time in milli-seconds for rates
970 int32_t readEvt ; //#events read
971 int32_t procEvt ; //#events processed
972 int32_t writEvt ; //#events written
973 int32_t skipEvt ; //#events skipped
974
975 //some info about current state of event buffer (snapspot)
976 int32_t evtBuf; //#Events currently waiting in Buffer
977 uint64_t totMem; //#Bytes available in Buffer
978 uint64_t usdMem; //#Bytes currently used
979 uint64_t maxMem; //max #Bytes used during past Second
980 */
981 }
982
983
984 array<FAD::EventHeader, 40> fVecHeader;
985
986 template<typename T, class S>
987 array<T, 42> Compare(const S *vec, const T *t)
988 {
989 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
990
991 const T *min = NULL;
992 const T *val = NULL;
993 const T *max = NULL;
994
995 array<T, 42> arr;
996
997 bool rc = true;
998 for (int i=0; i<40; i++)
999 {
1000 const char *base = reinterpret_cast<const char*>(vec+i);
1001 const T *ref = reinterpret_cast<const T*>(base+offset);
1002
1003 arr[i] = *ref;
1004
1005 if (gi_NumConnect[i]!=7)
1006 {
1007 arr[i] = 0;
1008 continue;
1009 }
1010
1011 if (!val)
1012 {
1013 min = ref;
1014 val = ref;
1015 max = ref;
1016 }
1017
1018 if (*ref<*min)
1019 min = ref;
1020
1021 if (*ref>*max)
1022 max = ref;
1023
1024 if (*val!=*ref)
1025 rc = false;
1026 }
1027
1028 arr[40] = val ? *min : 1;
1029 arr[41] = val ? *max : 0;
1030
1031 return arr;
1032 }
1033
1034 template<typename T>
1035 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1036 {
1037 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1038
1039 T val = 0;
1040 T rc = 0;
1041
1042 array<T, 42> vec;
1043
1044 bool first = true;
1045
1046 for (int i=0; i<40; i++)
1047 {
1048 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1049 const T *ref = reinterpret_cast<const T*>(base+offset);
1050
1051 vec[i+2] = *ref;
1052
1053 if (gi_NumConnect[i]!=7)
1054 {
1055 vec[i+2] = 0;
1056 continue;
1057 }
1058
1059 if (first)
1060 {
1061 first = false;
1062 val = *ref;
1063 rc = 0;
1064 }
1065
1066 rc |= val^*ref;
1067 }
1068
1069 vec[0] = rc;
1070 vec[1] = val;
1071
1072 return vec;
1073 }
1074
1075 template<typename T, size_t N>
1076 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1077 {
1078// svc.setQuality(vec[40]<=vec[41]);
1079 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1080 svc.Update();
1081 }
1082
1083 template<typename T>
1084 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1085 {
1086 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1087 for (int i=0; i<40;i++)
1088 cout << " " << data.second[i+3];
1089 cout << endl;
1090 }
1091
1092 vector<uint> fNumConnected;
1093
1094 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1095 {
1096 const uint16_t id = h.Id();
1097 if (id>39)
1098 return;
1099
1100 if (fNumConnected.size()!=40)
1101 fNumConnected.resize(40);
1102
1103 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1104
1105 const bool changed = con!=fNumConnected || !IsThreadRunning();
1106
1107 fNumConnected = con;
1108
1109 const FAD::EventHeader old = fVecHeader[id];
1110 fVecHeader[id] = h;
1111
1112 if (old.fVersion != h.fVersion || changed)
1113 {
1114 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1115
1116 array<float,42> data;
1117 for (int i=0; i<42; i++)
1118 {
1119 ostringstream str;
1120 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1121 data[i] = stof(str.str());
1122 }
1123 Update(fDimFwVersion, data);
1124 }
1125
1126 if (old.fRunNumber != h.fRunNumber || changed)
1127 {
1128 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1129 fDimRunNumber.Update(run);
1130 }
1131
1132 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1133 {
1134 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1135 fDimPrescaler.Update(pre);
1136 }
1137
1138 if (old.fDNA != h.fDNA || changed)
1139 {
1140 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1141 Update(fDimDNA, dna, 40);
1142 }
1143
1144 if (old.fStatus != h.fStatus || changed)
1145 {
1146 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1147 Update(fDimStatus, sts);
1148 }
1149
1150 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1151 {
1152 array<uint16_t, FAD::kNumDac*42> dacs;
1153
1154 for (int i=0; i<FAD::kNumDac; i++)
1155 {
1156 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1157 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1158 }
1159
1160 Update(fDimDac, dacs);
1161 }
1162
1163 // -----------
1164
1165 static Time oldt(boost::date_time::neg_infin);
1166 Time newt;
1167
1168 if (newt>oldt+boost::posix_time::seconds(1))
1169 {
1170 oldt = newt;
1171
1172 // --- RefClock
1173
1174 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1175 Update(fDimRefClock, clk);
1176
1177 // --- Temperatures
1178
1179 const array<int16_t,42> tmp[4] =
1180 {
1181 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1182 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1183 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1184 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1185 };
1186
1187 vector<int16_t> data;
1188 data.reserve(82);
1189 data.push_back(tmp[0][40]); // min: 0
1190 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1191 data.push_back(tmp[0][41]); // max: 41
1192 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1193
1194 for (int j=1; j<=3; j++)
1195 {
1196 const array<int16_t,42> &ref = tmp[j];
1197
1198 // Gloabl min
1199 if (ref[40]<data[0]) // 40=min
1200 data[0] = ref[40];
1201
1202 // Global max
1203 if (ref[41]>data[41]) // 41=max
1204 data[41] = ref[41];
1205
1206 for (int i=0; i<40; i++)
1207 {
1208 // min per board
1209 if (ref[i]<data[i+1]) // data: 1-40
1210 data[i+1] = ref[i]; // ref: 0-39
1211
1212 // max per board
1213 if (ref[i]>data[i+42]) // data: 42-81
1214 data[i+42] = ref[i]; // ref: 0-39
1215 }
1216
1217
1218 }
1219
1220 vector<float> deg(82); // 0: global min, 1-40: min
1221 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1222 deg[i] = data[i]/16.;
1223 fDimTemperature.Update(deg);
1224 }
1225
1226 /*
1227 uint16_t fTriggerType;
1228 uint32_t fTriggerId;
1229 uint32_t fEventCounter;
1230 uint16_t fAdcClockPhaseShift;
1231 uint16_t fNumTriggersToGenerate;
1232 uint16_t fTriggerGeneratorPrescaler;
1233 uint32_t fTimeStamp;
1234 int16_t fTempDrs[kNumTemp]; // In units of 1/16 deg(?)
1235 uint16_t fDac[kNumDac];
1236 */
1237 }
1238};
1239
1240EventBuilderWrapper *EventBuilderWrapper::This = 0;
1241
1242// ----------- Event builder callbacks implementation ---------------
1243extern "C"
1244{
1245 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1246 {
1247 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1248 }
1249
1250 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1251 {
1252 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1253 }
1254
1255 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1256 {
1257 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1258 }
1259
1260 void factOut(int severity, int err, const char *message)
1261 {
1262 EventBuilderWrapper::This->factOut(severity, err, message);
1263 }
1264
1265 void factStat(GUI_STAT stat)
1266 {
1267 EventBuilderWrapper::This->factStat(stat);
1268 }
1269
1270 void factStatNew(EVT_STAT stat)
1271 {
1272 EventBuilderWrapper::This->factStat(stat);
1273 }
1274
1275 void debugHead(int socket, int/*board*/, void *buf)
1276 {
1277 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1278
1279 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1280 }
1281
1282 void debugStream(int isock, void *buf, int len)
1283 {
1284 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1285 }
1286
1287 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1288 {
1289 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1290 }
1291
1292 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1293 {
1294 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1295 }
1296
1297 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1298 {
1299 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1300 }
1301
1302 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1303 {
1304 return 100;
1305 }
1306
1307}
1308
1309#endif
Note: See TracBrowser for help on using the repository browser.