source: trunk/FACT++/src/EventBuilderWrapper.h@ 11746

Last change on this file since 11746 was 11725, checked in by tbretz, 13 years ago
Renamed the data processors and moved them to their own files.
File size: 35.2 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/date_time/posix_time/posix_time_types.hpp>
13
14#include "DimWriteStatistics.h"
15
16#include "DataCalib.h"
17#include "DataWriteRaw.h"
18
19#ifdef HAVE_FITS
20#include "DataWriteFits.h"
21#else
22#define DataWriteFits DataWriteRaw
23#endif
24
25namespace ba = boost::asio;
26namespace bs = boost::system;
27
28using ba::ip::tcp;
29
30using namespace std;
31
32// ========================================================================
33
34#include "EventBuilder.h"
35
36extern "C" {
37 extern void StartEvtBuild();
38 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
39}
40
41// ========================================================================
42
43class EventBuilderWrapper
44{
45public:
46 // FIXME
47 static EventBuilderWrapper *This;
48
49 MessageImp &fMsg;
50
51private:
52 boost::thread fThread;
53
// Run states of the event builder. The state-changing methods of this
// class store one of these values in the global g_runStat.
enum CommandStates_t
{
    // quit as soon as possible ('abort')
    kAbort      = -2,
    // stop reading, quit when buffered events done ('exit')
    kExit       = -1,
    // 'initialize' (e.g. dim not yet started)
    kInitialize =  0,
    // do nothing for long time ('hybernate') [wakeup within ~1sec]
    kHybernate  =  1,
    // do nothing ('sleep') [wakeup within ~10msec]
    kSleep      =  2,
    // read data from camera, but skip them ('flush')
    kModeFlush  = 10,
    // read data and process them, but do not write to disk ('test')
    kModeTest   = 20,
    // read data, process and write all to disk ('flag')
    kModeFlag   = 30,
    // read data, process and write selected to disk ('run')
    kModeRun    = 40,
};
66
// Positions of the individual counters inside fNumEvts.
enum
{
    kCurrent,    // 0: events written to the newest run (reset in runOpen)
    kTotal,      // 1: events written in total
    kEventId,    // 2: EventNum of the last event written to the newest run
    kTriggerId,  // 3: TriggerNum of the last event written to the newest run
};
74
// Output format; runOpen() picks the data processor class from this value.
enum FileFormat_t
{
    kNone  = 0,  // DataDump
    kDebug = 1,  // DataDebug
    kFits  = 2,  // DataWriteFits
    kRaw   = 3,  // DataWriteRaw
    kCalib = 4   // DataCalib
};
83
84 FileFormat_t fFileFormat;
85
86
87 uint32_t fMaxRun;
88 uint32_t fLastOpened;
89 uint32_t fLastClosed;
90 uint32_t fNumEvts[4];
91
92 DimWriteStatistics fDimWriteStats;
93 DimDescribedService fDimRuns;
94 DimDescribedService fDimEvents;
95 DimDescribedService fDimEventData;
96 DimDescribedService fDimFwVersion;
97 DimDescribedService fDimRunNumber;
98 DimDescribedService fDimStatus;
99 DimDescribedService fDimDNA;
100 DimDescribedService fDimTemperature;
101 DimDescribedService fDimPrescaler;
102 DimDescribedService fDimRefClock;
103 DimDescribedService fDimRoi;
104 DimDescribedService fDimDac;
105 DimDescribedService fDimDrsCalibration;
106 DimDescribedService fDimStatistics1;
107 DimDescribedService fDimStatistics2;
108
109 bool fDebugStream;
110 bool fDebugRead;
111 bool fDebugLog;
112
113 uint32_t fRunNumber;
114
115 void InitRunNumber()
116 {
117 // FIXME: Add a check that we are not too close to noon!
118 //const int night = Time().NightAsInt();
119
120 fRunNumber = 1000;
121
122 while (--fRunNumber>0)
123 {
124 const string name = DataProcessorImp::FormFileName(fRunNumber, "");
125
126 if (access((name+"bin").c_str(), F_OK) == 0)
127 break;
128 if (access((name+"fits").c_str(), F_OK) == 0)
129 break;
130 if (access((name+"drs.fits").c_str(), F_OK) == 0)
131 break;
132 }
133
134 fRunNumber++;
135
136 ostringstream str;
137 str << "Starting with run number " << fRunNumber;
138 fMsg.Message(str);
139
140 fMsg.Info(" ==> TODO: Run-number detection doesn't work when noon passes!");
141 fMsg.Info(" ==> TODO: Crosscheck with database!");
142 }
143
144public:
145 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
146 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
147 fDimWriteStats ("FAD_CONTROL", imp),
148 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
149 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
150 fDimEventData ("FAD_CONTROL/EVENT_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
151 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
152 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
153 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
154 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
155 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
156 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
157 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
158 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
159 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
160 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
161 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
162 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
163 fDebugStream(false), fDebugRead(false), fDebugLog(false)
164 {
165 if (This)
166 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
167
168 This = this;
169
170 memset(fNumEvts, 0, sizeof(fNumEvts));
171
172 fDimEvents.Update(fNumEvts);
173
174 for (size_t i=0; i<40; i++)
175 ConnectSlot(i, tcp::endpoint());
176
177 InitRunNumber();
178 }
179 virtual ~EventBuilderWrapper()
180 {
181 Abort();
182
183 // FIXME: Used timed_join and abort afterwards
184 // What's the maximum time the eb need to abort?
185 fThread.join();
186 //ffMsg.Info("EventBuilder stopped.");
187
188 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
189 delete *it;
190 }
191
192 struct RunDescription
193 {
194 uint32_t maxtime;
195 uint32_t maxevt;
196
197 FAD::Configuration reference;
198 };
199
200 map<uint32_t, RunDescription> fExpectedRuns;
201
202 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const FAD::Configuration &ref)
203 {
204 if (maxtime<=0 || maxtime>24*60*60)
205 maxtime = 24*60*60;
206 if (maxevt<=0 || maxevt>INT32_MAX)
207 maxevt = INT32_MAX;
208
209 const RunDescription descr =
210 {
211 uint32_t(maxtime),
212 uint32_t(maxevt),
213 ref
214 };
215
216 // FIMXE: Maybe reset an event counter so that the mcp can count events?
217
218 fExpectedRuns[fRunNumber] = descr;
219 return fRunNumber++;
220 }
221
222 bool IsThreadRunning()
223 {
224 return !fThread.timed_join(boost::posix_time::microseconds(0));
225 }
226
227 void SetMaxMemory(unsigned int mb) const
228 {
229 /*
230 if (mb*1000000<GetUsedMemory())
231 {
232 // ffMsg.Warn("...");
233 return;
234 }*/
235
236 g_maxMem = size_t(mb)*1000000;
237 }
238
239 void StartThread(const vector<tcp::endpoint> &addr)
240 {
241 if (IsThreadRunning())
242 {
243 fMsg.Warn("Start - EventBuilder still running");
244 return;
245 }
246
247 fLastMessage.clear();
248
249 for (size_t i=0; i<40; i++)
250 ConnectSlot(i, addr[i]);
251
252 g_runStat = kModeRun;
253
254 fMsg.Message("Starting EventBuilder thread");
255
256 fThread = boost::thread(StartEvtBuild);
257 }
258 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
259 {
260 if (i>39)
261 return;
262
263 if (addr==tcp::endpoint())
264 {
265 DisconnectSlot(i);
266 return;
267 }
268
269 g_port[i].sockAddr.sin_family = AF_INET;
270 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
271 g_port[i].sockAddr.sin_port = htons(addr.port());
272 // In this order
273 g_port[i].sockDef = 1;
274 }
275 void DisconnectSlot(unsigned int i)
276 {
277 if (i>39)
278 return;
279
280 g_port[i].sockDef = 0;
281 // In this order
282 g_port[i].sockAddr.sin_family = AF_INET;
283 g_port[i].sockAddr.sin_addr.s_addr = 0;
284 g_port[i].sockAddr.sin_port = 0;
285 }
286 void IgnoreSlot(unsigned int i)
287 {
288 if (i>39)
289 return;
290 if (g_port[i].sockAddr.sin_port==0)
291 return;
292
293 g_port[i].sockDef = -1;
294 }
295
296
297 void Abort()
298 {
299 fMsg.Message("Signal abort to EventBuilder thread...");
300 g_runStat = kAbort;
301 }
302
303 void ResetThread(bool soft)
304 {
305 /*
306 if (g_reset > 0)
307
308 * suspend reading
309 * reset = g_reset;
310 * g_reset=0
311
312 * reset% 10
313 == 0 leave event Buffers as they are
314 == 1 let all buffers drain (write (incomplete) events)
315 > 1 flush all buffers (do not write buffered events)
316
317 * (reset/10)%10
318 > 0 close all sockets and destroy them (also free the
319 allocated read-buffers)
320 recreate before resuming operation
321 [ this is more than just close/open that can be
322 triggered by e.g. close/open the base-socket ]
323
324 * (reset/100)%10
325 > 0 close all open run-files
326
327 * (reset/1000)
328 sleep so many seconds before resuming operation
329 (does not (yet) take into account time left when waiting
330 for buffers getting empty ...)
331
332 * resume_reading
333
334 */
335 fMsg.Message("Signal reset to EventBuilder thread...");
336 g_reset = soft ? 101 : 102;
337 }
338
339 void Exit()
340 {
341 fMsg.Message("Signal exit to EventBuilder thread...");
342 g_runStat = kExit;
343 }
344
345 /*
346 void Wait()
347 {
348 fThread.join();
349 ffMsg.Message("EventBuilder stopped.");
350 }*/
351
352 void Hybernate() const { g_runStat = kHybernate; }
353 void Sleep() const { g_runStat = kSleep; }
354 void FlushMode() const { g_runStat = kModeFlush; }
355 void TestMode() const { g_runStat = kModeTest; }
356 void FlagMode() const { g_runStat = kModeFlag; }
357 void RunMode() const { g_runStat = kModeRun; }
358
359 // FIXME: To be removed
360 void SetMode(int mode) const { g_runStat = mode; }
361
362 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
363 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
364 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
365 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
366
367 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
368 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
369
370 void SetOutputFormat(FileFormat_t f)
371 {
372 fFileFormat = f;
373 if (fFileFormat==kCalib)
374 {
375 DataCalib::Restart();
376 DataCalib::Update(fDimDrsCalibration);
377 fMsg.Message("Resetted DRS calibration.");
378 }
379 }
380
381 void SetDebugLog(bool b) { fDebugLog = b; }
382
383 void SetDebugStream(bool b)
384 {
385 fDebugStream = b;
386 if (b)
387 return;
388
389 for (int i=0; i<40; i++)
390 {
391 if (!fDumpStream[i].is_open())
392 continue;
393
394 fDumpStream[i].close();
395
396 ostringstream name;
397 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
398 fMsg.Message("Closed file '"+name.str()+"'");
399 }
400 }
401
402 void SetDebugRead(bool b)
403 {
404 fDebugRead = b;
405 if (b || !fDumpRead.is_open())
406 return;
407
408 fDumpRead.close();
409 fMsg.Message("Closed file 'socket_events.txt'");
410 }
411
412// size_t GetUsedMemory() const { return gi_usedMem; }
413
414 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
415
416
417 /*
418 struct OpenFileToDim
419 {
420 int code;
421 char fileName[FILENAME_MAX];
422 };
423
424 SignalRunOpened(runid, filename);
425 // Send num open files
426 // Send runid, (more info about the run?), filename via dim
427
428 SignalEvtWritten(runid);
429 // Send num events written of newest file
430
431 SignalRunClose(runid);
432 // Send new num open files
433 // Send empty file-name if no file is open
434
435 */
436
437 // -------------- Mapped event builder callbacks ------------------
438
439 void UpdateRuns(const string &fname="")
440 {
441 uint32_t values[5] =
442 {
443 static_cast<uint32_t>(fFiles.size()),
444 0xffffffff,
445 0,
446 fLastOpened,
447 fLastClosed
448 };
449
450 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
451 it!=fFiles.end(); it++)
452 {
453 const DataProcessorImp *file = *it;
454
455 if (file->GetRunId()<values[1])
456 values[1] = file->GetRunId();
457
458 if (file->GetRunId()>values[2])
459 values[2] = file->GetRunId();
460 }
461
462 fMaxRun = values[2];
463
464 vector<char> data(sizeof(values)+fname.size()+1);
465 memcpy(data.data(), values, sizeof(values));
466 strcpy(data.data()+sizeof(values), fname.c_str());
467
468 fDimRuns.Update(data);
469 }
470
471 vector<DataProcessorImp*> fFiles;
472
473 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
474 {
475 fMsg.Info(" ==> TODO: Update run configuration in database!");
476
477 // Check if file already exists...
478 DataProcessorImp *file = 0;
479 switch (fFileFormat)
480 {
481 case kNone: file = new DataDump(runid, fMsg); break;
482 case kDebug: file = new DataDebug(runid, fMsg); break;
483 case kFits: file = new DataWriteFits(runid, fMsg); break;
484 case kRaw: file = new DataWriteRaw(runid, fMsg); break;
485 case kCalib: file = new DataCalib(runid, fDimDrsCalibration, fMsg); break;
486 }
487
488 try
489 {
490 if (!file->Open(h))
491 return 0;
492 }
493 catch (const exception &e)
494 {
495 fMsg.Error("Exception trying to open file: "+string(e.what()));
496 return 0;
497 }
498
499 fFiles.push_back(file);
500
501 ostringstream str;
502 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
503 fMsg.Info(str);
504
505 fDimWriteStats.FileOpened(file->GetFileName());
506
507 fLastOpened = runid;
508 UpdateRuns(file->GetFileName());
509
510 fNumEvts[kEventId] = 0;
511 fNumEvts[kTriggerId] = 0;
512
513 fNumEvts[kCurrent] = 0;
514 fDimEvents.Update(fNumEvts);
515 // fDimCurrentEvent.Update(uint32_t(0));
516
517 return reinterpret_cast<FileHandle_t>(file);
518 }
519
520 int runWrite(FileHandle_t handler, EVENT *e, size_t)
521 {
522 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
523
524 if (!file->WriteEvt(e))
525 return -1;
526
527 if (file->GetRunId()==fMaxRun)
528 {
529 fNumEvts[kCurrent]++;
530 fNumEvts[kEventId] = e->EventNum;
531 fNumEvts[kTriggerId] = e->TriggerNum;
532 }
533
534 fNumEvts[kTotal]++;
535
536 static Time oldt(boost::date_time::neg_infin);
537 Time newt;
538 if (newt>oldt+boost::posix_time::seconds(1))
539 {
540 fDimEvents.Update(fNumEvts);
541 oldt = newt;
542 }
543
544
545 // ===> SignalEvtWritten(runid);
546 // Send num events written of newest file
547
548 /* close run runId (all all runs if runId=0) */
549 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
550 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
551
552 return 0;
553 }
554
555 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
556 {
557 fMsg.Info(" ==> TODO: Update run configuration in database!");
558
559 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
560
561 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
562 if (it==fFiles.end())
563 {
564 ostringstream str;
565 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
566 fMsg.Fatal(str);
567 return -1;
568 }
569
570 fFiles.erase(it);
571
572 fLastClosed = file->GetRunId();
573 UpdateRuns();
574
575 fDimEvents.Update(fNumEvts);
576
577 const bool rc = file->Close(tail);
578 if (!rc)
579 {
580 // Error message
581 }
582
583 ostringstream str;
584 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
585 fMsg.Info(str);
586
587 delete file;
588
589 // ==> SignalRunClose(runid);
590 // Send new num open files
591 // Send empty file-name if no file is open
592
593 return rc ? 0 : -1;
594 }
595
596 ofstream fDumpStream[40];
597
598 void debugStream(int isock, void *buf, int len)
599 {
600 if (!fDebugStream)
601 return;
602
603 const int slot = isock/7;
604 if (slot<0 || slot>39)
605 return;
606
607 if (!fDumpStream[slot].is_open())
608 {
609 ostringstream name;
610 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
611
612 fDumpStream[slot].open(name.str().c_str(), ios::app);
613 if (!fDumpStream[slot])
614 {
615 ostringstream str;
616 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
617 fMsg.Error(str);
618
619 return;
620 }
621
622 fMsg.Message("Opened file '"+name.str()+"' for writing.");
623 }
624
625 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
626 }
627
628 ofstream fDumpRead; // Stream to possibly dump docket events
629
630 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
631 {
632 // isock = socketID (0-279)
633 // ibyte = #bytes gelesen
634 // event = eventId (oder 0 wenn noch nicht bekannt)
635 // state : 1=finished reading data
636 // 0=reading data
637 // -1=start reading data (header)
638 // -2=start reading data,
639 // eventId not known yet (too little data)
640 // tsec, tusec = time when reading seconds, microseconds
641 //
642 if (!fDebugRead || ibyte==0)
643 return;
644
645 if (!fDumpRead.is_open())
646 {
647 fDumpRead.open("socket_events.txt", ios::app);
648 if (!fDumpRead)
649 {
650 ostringstream str;
651 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
652 fMsg.Error(str);
653
654 return;
655 }
656
657 fMsg.Message("Opened file 'socket_events.txt' for writing.");
658
659 fDumpRead << "# START: " << Time().GetAsStr() << endl;
660 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
661 }
662
663 fDumpRead
664 << setw(2) << state << " "
665 << setw(8) << tsec << " "
666 << setw(9) << tusec << " "
667 << setw(3) << isock << " "
668 << setw(2) << isock/7 << " "
669 << runno << " "
670 << event << " "
671 << ftmevt << " "
672 << ibyte << endl;
673 }
674
675 array<uint16_t,2> fVecRoi;
676
677 int eventCheck(PEVNT_HEADER *fadhd, EVENT *event)
678 {
679 /*
680 fadhd[i] ist ein array mit den 40 fad-headers
681 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
682
683 event ist die Struktur, die auch die write routine erhaelt;
684 darin sind im header die 'soll-werte' fuer z.B. eventID
685 als auch die ADC-Werte (falls Du die brauchst)
686
687 Wenn die routine einen negativen Wert liefert, wird das event
688 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
689 */
690
691 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
692
693 if (roi!=fVecRoi)
694 {
695 Update(fDimRoi, roi);
696 fVecRoi = roi;
697 }
698
699 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
700 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
701
702 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
703 {
704 // Event incomplete
705 //if (ptr->fStartDelimiter==0)
706 // return -1;
707
708 // Either one of
709 // * fStatus
710 // * fRunNumber
711 // * fEventCounter
712 // * fAdcClockPhaseShift
713 // * fTriggerGeneratorPrescaler
714 // * fDac
715 // inconsistent
716
717 // FIXME: Produce some output, only once per run or
718 // minute
719
720 /*
721 if (*ptr != *beg)
722 return -1;
723
724 if (ptr->fTriggerType != beg->fTriggerType)
725 return -1;
726 if (ptr->fTriggerId != beg->fTriggerId)
727 return -1;
728 if (ptr->fVersion != beg->fVersion)
729 return -1;
730 */
731 }
732
733 // check REFCLK_frequency
734 // check consistency with command configuration
735 // how to log errors?
736 // need gotNewRun/closedRun to know it is finished
737
738 static Time oldt(boost::date_time::neg_infin);
739 Time newt;
740
741 // FIXME: Only send events if the have newer run-numbers
742 if (newt<oldt+boost::posix_time::seconds(1))
743 return 0;
744
745 oldt = newt;
746
747 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
748
749 vector<char> data(sz+event->Roi*2*1440);
750 memcpy(data.data(), event, sizeof(EVENT));
751
752 DataCalib::Apply(reinterpret_cast<float*>(data.data()+sizeof(EVENT)),
753 event->Adc_Data, event->StartPix, event->Roi);
754
755 fDimEventData.Update(data);
756
757 return 0;
758 }
759
760 bool IsRunStarted() const
761 {
762 return fExpectedRuns.find(fRunNumber-1)==fExpectedRuns.end();
763 }
764
765 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
766 {
767 // This function is called even when writing is switched off
768 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
769 if (it==fExpectedRuns.end())
770 {
771 ostringstream str;
772 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
773 return;
774 }
775
776 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
777 // return: 0=close scheduled / >0 already closed / <0 does not exist
778
779 fExpectedRuns.erase(it);
780 }
781
782 map<boost::thread::id, string> fLastMessage;
783
784 void factOut(int severity, int err, const char *message)
785 {
786 if (!fDebugLog && severity==99)
787 return;
788
789 ostringstream str;
790 //str << boost::this_thread::get_id() << " ";
791 str << "EventBuilder(";
792 if (err<0)
793 str << "---";
794 else
795 str << err;
796 str << "): " << message;
797
798 string &old = fLastMessage[boost::this_thread::get_id()];
799
800 if (str.str()==old)
801 return;
802 old = str.str();
803
804 fMsg.Update(str, severity);
805 }
806/*
807 void factStat(int64_t *stat, int len)
808 {
809 if (len!=7)
810 {
811 fMsg.Warn("factStat received unknown number of values.");
812 return;
813 }
814
815 vector<int64_t> data(1, g_maxMem);
816 data.insert(data.end(), stat, stat+len);
817
818 static vector<int64_t> last(8);
819 if (data==last)
820 return;
821 last = data;
822
823 fDimStatistics.Update(data);
824
825 // len ist die Laenge des arrays.
826 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
827 // kannst Du pruefen, ob die 100MB voll sind ....
828
829 ostringstream str;
830 str
831 << "Wait=" << stat[0] << " "
832 << "Skip=" << stat[1] << " "
833 << "Del=" << stat[2] << " "
834 << "Tot=" << stat[3] << " "
835 << "Mem=" << stat[4] << "/" << g_maxMem << " "
836 << "Read=" << stat[5] << " "
837 << "Conn=" << stat[6];
838
839 fMsg.Info(str);
840 }
841 */
842
843 void factStat(const EVT_STAT &stat)
844 {
845 fDimStatistics2.Update(stat);
846 /*
847 //some info about what happened since start of program (or last 'reset')
848 uint32_t reset ; //#if increased, reset all counters
849 uint32_t numRead[MAX_SOCK] ; //how often succesfull read from N sockets per loop
850
851 uint64_t gotByte[NBOARDS] ; //#Bytes read per Board
852 uint32_t gotErr[NBOARDS] ; //#Communication Errors per Board
853 uint32_t evtGet; //#new Start of Events read
854 uint32_t evtTot; //#complete Events read
855 uint32_t evtErr; //#Events with Errors
856 uint32_t evtSkp; //#Events incomplete (timeout)
857
858 uint32_t procTot; //#Events processed
859 uint32_t procErr; //#Events showed problem in processing
860 uint32_t procTrg; //#Events accepted by SW trigger
861 uint32_t procSkp; //#Events rejected by SW trigger
862
863 uint32_t feedTot; //#Events used for feedBack system
864 uint32_t feedErr; //#Events rejected by feedBack
865
866 uint32_t wrtTot; //#Events written to disk
867 uint32_t wrtErr; //#Events with write-error
868
869 uint32_t runOpen; //#Runs opened
870 uint32_t runClose; //#Runs closed
871 uint32_t runErr; //#Runs with open/close errors
872
873
874 //info about current connection status
875 uint8_t numConn[NBOARDS] ; //#Sockets succesfully open per board
876 */
877 }
878
879 void factStat(const GUI_STAT &stat)
880 {
881 fDimStatistics1.Update(stat);
882 /*
883 //info about status of the main threads
884 int32_t readStat ; //read thread
885 int32_t procStat ; //processing thread(s)
886 int32_t writStat ; //write thread
887
888 //info about some rates
889 int32_t deltaT ; //time in milli-seconds for rates
890 int32_t readEvt ; //#events read
891 int32_t procEvt ; //#events processed
892 int32_t writEvt ; //#events written
893 int32_t skipEvt ; //#events skipped
894
895 //some info about current state of event buffer (snapspot)
896 int32_t evtBuf; //#Events currently waiting in Buffer
897 uint64_t totMem; //#Bytes available in Buffer
898 uint64_t usdMem; //#Bytes currently used
899 uint64_t maxMem; //max #Bytes used during past Second
900 */
901 }
902
903
904 array<FAD::EventHeader, 40> fVecHeader;
905
906 template<typename T, class S>
907 array<T, 42> Compare(const S *vec, const T *t)
908 {
909 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
910
911 const T *min = NULL;
912 const T *val = NULL;
913 const T *max = NULL;
914
915 array<T, 42> arr;
916
917 bool rc = true;
918 for (int i=0; i<40; i++)
919 {
920 const char *base = reinterpret_cast<const char*>(vec+i);
921 const T *ref = reinterpret_cast<const T*>(base+offset);
922
923 arr[i] = *ref;
924
925 if (gi_NumConnect[i]!=7)
926 {
927 arr[i] = 0;
928 continue;
929 }
930
931 if (!val)
932 {
933 min = ref;
934 val = ref;
935 max = ref;
936 }
937
938 if (*ref<*min)
939 min = ref;
940
941 if (*ref>*max)
942 max = ref;
943
944 if (*val!=*ref)
945 rc = false;
946 }
947
948 arr[40] = val ? *min : 1;
949 arr[41] = val ? *max : 0;
950
951 return arr;
952 }
953
954 template<typename T>
955 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
956 {
957 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
958
959 T val = 0;
960 T rc = 0;
961
962 array<T, 42> vec;
963
964 bool first = true;
965
966 for (int i=0; i<40; i++)
967 {
968 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
969 const T *ref = reinterpret_cast<const T*>(base+offset);
970
971 vec[i+2] = *ref;
972
973 if (gi_NumConnect[i]!=7)
974 {
975 vec[i+2] = 0;
976 continue;
977 }
978
979 if (first)
980 {
981 first = false;
982 val = *ref;
983 rc = 0;
984 }
985
986 rc |= val^*ref;
987 }
988
989 vec[0] = rc;
990 vec[1] = val;
991
992 return vec;
993 }
994
995 template<typename T, size_t N>
996 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
997 {
998// svc.setQuality(vec[40]<=vec[41]);
999 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1000 svc.updateService();
1001 }
1002
// Debugging aid: print one line to cout in the form
//   name|first|second[1]|second[0]<x<second[1]: second[3] ... second[42]
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &values = data.second;

    cout << name << "|" << data.first << "|" << values[1] << "|";
    cout << values[0] << "<x<" << values[1] << ":";

    for (int i=3; i<43; i++)
        cout << " " << values[i];

    cout << endl;
}
1011
1012 vector<uint> fNumConnected;
1013
1014 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1015 {
1016 const uint16_t id = h.Id();
1017 if (id>39)
1018 return;
1019
1020 if (fNumConnected.size()!=40)
1021 fNumConnected.resize(40);
1022
1023 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1024
1025 const bool changed = con!=fNumConnected || !IsThreadRunning();
1026
1027 fNumConnected = con;
1028
1029 const FAD::EventHeader old = fVecHeader[id];
1030 fVecHeader[id] = h;
1031
1032 if (old.fVersion != h.fVersion || changed)
1033 {
1034 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1035
1036 array<float,42> data;
1037 for (int i=0; i<42; i++)
1038 {
1039 ostringstream str;
1040 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1041 data[i] = stof(str.str());
1042 }
1043 Update(fDimFwVersion, data);
1044 }
1045
1046 if (old.fRunNumber != h.fRunNumber || changed)
1047 {
1048 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1049 fDimRunNumber.Update(run);
1050 }
1051
1052 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1053 {
1054 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1055 fDimPrescaler.Update(pre);
1056 }
1057
1058 if (old.fDNA != h.fDNA || changed)
1059 {
1060 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1061 Update(fDimDNA, dna, 40);
1062 }
1063
1064 if (old.fStatus != h.fStatus || changed)
1065 {
1066 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1067 Update(fDimStatus, sts);
1068 }
1069
1070 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1071 {
1072 array<uint16_t, FAD::kNumDac*42> dacs;
1073
1074 for (int i=0; i<FAD::kNumDac; i++)
1075 {
1076 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1077 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1078 }
1079
1080 Update(fDimDac, dacs);
1081 }
1082
1083 // -----------
1084
1085 static Time oldt(boost::date_time::neg_infin);
1086 Time newt;
1087
1088 if (newt>oldt+boost::posix_time::seconds(1))
1089 {
1090 oldt = newt;
1091
1092 // --- RefClock
1093
1094 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1095 Update(fDimRefClock, clk);
1096
1097 // --- Temperatures
1098
1099 const array<int16_t,42> tmp[4] =
1100 {
1101 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1102 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1103 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1104 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1105 };
1106
1107 vector<int16_t> data;
1108 data.reserve(82);
1109 data.push_back(tmp[0][40]); // min: 0
1110 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1111 data.push_back(tmp[0][41]); // max: 41
1112 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1113
1114 for (int j=1; j<=3; j++)
1115 {
1116 const array<int16_t,42> &ref = tmp[j];
1117
1118 // Gloabl min
1119 if (ref[40]<data[0]) // 40=min
1120 data[0] = ref[40];
1121
1122 // Global max
1123 if (ref[41]>data[41]) // 41=max
1124 data[41] = ref[41];
1125
1126 for (int i=0; i<40; i++)
1127 {
1128 // min per board
1129 if (ref[i]<data[i+1]) // data: 1-40
1130 data[i+1] = ref[i]; // ref: 0-39
1131
1132 // max per board
1133 if (ref[i]>data[i+42]) // data: 42-81
1134 data[i+42] = ref[i]; // ref: 0-39
1135 }
1136
1137
1138 }
1139
1140 vector<float> deg(82); // 0: global min, 1-40: min
1141 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1142 deg[i] = data[i]/16.;
1143 fDimTemperature.Update(deg);
1144 }
1145
1146 /*
1147 uint16_t fTriggerType;
1148 uint32_t fTriggerId;
1149 uint32_t fEventCounter;
1150 uint16_t fAdcClockPhaseShift;
1151 uint16_t fNumTriggersToGenerate;
1152 uint16_t fTriggerGeneratorPrescaler;
1153 uint32_t fTimeStamp;
1154 int16_t fTempDrs[kNumTemp]; // In units of 1/16 deg(?)
1155 uint16_t fDac[kNumDac];
1156 */
1157 }
1158};
1159
1160EventBuilderWrapper *EventBuilderWrapper::This = 0;
1161
1162// ----------- Event builder callbacks implementation ---------------
1163extern "C"
1164{
1165 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1166 {
1167 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1168 }
1169
1170 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1171 {
1172 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1173 }
1174
1175 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1176 {
1177 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1178 }
1179
1180 void factOut(int severity, int err, const char *message)
1181 {
1182 EventBuilderWrapper::This->factOut(severity, err, message);
1183 }
1184
1185 void factStat(GUI_STAT stat)
1186 {
1187 EventBuilderWrapper::This->factStat(stat);
1188 }
1189
1190 void factStatNew(EVT_STAT stat)
1191 {
1192 EventBuilderWrapper::This->factStat(stat);
1193 }
1194
1195 void debugHead(int socket, int/*board*/, void *buf)
1196 {
1197 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1198
1199 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1200 }
1201
1202 void debugStream(int isock, void *buf, int len)
1203 {
1204 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1205 }
1206
1207 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1208 {
1209 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1210 }
1211
1212 int eventCheck(PEVNT_HEADER *fadhd, EVENT *event)
1213 {
1214 return EventBuilderWrapper::This->eventCheck(fadhd, event);
1215 }
1216
1217 void gotNewRun( int runnr, PEVNT_HEADER *headers )
1218 {
1219 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1220 }
1221}
1222
1223#endif
Note: See TracBrowser for help on using the repository browser.