source: trunk/FACT++/src/EventBuilderWrapper.h@ 11966

Last change on this file since 11966 was 11893, checked in by tbretz, 13 years ago
Implemented a new determination of the run-number after noon in the fadctrl; created the possibility to add the date to the raw-data path; allowed setting the path as a program option; for all this, moved some code from the datalogger to DimWriteStatistics
File size: 35.8 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFwVersion;
100 DimDescribedService fDimRunNumber;
101 DimDescribedService fDimStatus;
102 DimDescribedService fDimDNA;
103 DimDescribedService fDimTemperature;
104 DimDescribedService fDimPrescaler;
105 DimDescribedService fDimRefClock;
106 DimDescribedService fDimRoi;
107 DimDescribedService fDimDac;
108 DimDescribedService fDimDrsCalibration;
109 DimDescribedService fDimStatistics1;
110 DimDescribedService fDimStatistics2;
111
112 bool fDebugStream;
113 bool fDebugRead;
114 bool fDebugLog;
115
116 string fPath;
117 uint32_t fRunNumber;
118
119protected:
120 int64_t InitRunNumber()
121 {
122 fRunNumber = 1000;
123
124 // Ensure that the night doesn't change during our check
125 const int check = Time().NightAsInt();
126
127 while (--fRunNumber>0)
128 {
129 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
130
131 if (access((name+"bin").c_str(), F_OK) == 0)
132 break;
133 if (access((name+"fits").c_str(), F_OK) == 0)
134 break;
135 if (access((name+"drs.fits").c_str(), F_OK) == 0)
136 break;
137 }
138
139 if (check != Time().NightAsInt())
140 return InitRunNumber();
141
142 fRunNumber++;
143
144 if (fRunNumber==1000)
145 {
146 fMsg.Error("You have a file with run-number 1000 in "+fPath);
147 return -1;
148 }
149
150 ostringstream str;
151 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
152 fMsg.Message(str);
153
154 fMsg.Info(" ==> TODO: Crosscheck with database!");
155
156 return check;
157 }
158
159 int64_t InitRunNumber(const string &path)
160 {
161 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
162 return -1;
163
164 //const fs::path fullPath = fs::system_complete(fs::path(path));
165
166 fPath = path;
167
168 return InitRunNumber();
169 }
170
171public:
172 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
173 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
174 fDimWriteStats ("FAD_CONTROL", imp),
175 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
176 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
177 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
178 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
179 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
180 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
181 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
182 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
183 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
184 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
185 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
186 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
187 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
188 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
189 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
190 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
191 fDebugStream(false), fDebugRead(false), fDebugLog(false)
192 {
193 if (This)
194 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
195
196 This = this;
197
198 memset(fNumEvts, 0, sizeof(fNumEvts));
199
200 fDimEvents.Update(fNumEvts);
201
202 for (size_t i=0; i<40; i++)
203 ConnectSlot(i, tcp::endpoint());
204 }
205 virtual ~EventBuilderWrapper()
206 {
207 Abort();
208
209 // FIXME: Used timed_join and abort afterwards
210 // What's the maximum time the eb need to abort?
211 fThread.join();
212 //ffMsg.Info("EventBuilder stopped.");
213
214 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
215 delete *it;
216 }
217
// Description of a run which has been announced via StartNewRun() but
// for which no data has been received yet. Kept as a plain aggregate
// because StartNewRun() fills it with a brace initializer (member order
// matters).
struct RunDescription
{
    uint32_t maxtime; // maximum run duration; added to time(NULL) in gotNewRun()
    uint32_t maxevt;  // maximum number of events, passed to CloseRunFile()

    string name;      // first element of the pair passed to StartNewRun()

    FAD::Configuration reference; // second element of the pair passed to StartNewRun()

    bool started;     // set to true in gotNewRun() when the run shows up
};
229
230 map<uint32_t, RunDescription> fExpectedRuns;
231
232 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
233 {
234 if (maxtime<=0 || maxtime>24*60*60)
235 maxtime = 24*60*60;
236 if (maxevt<=0 || maxevt>INT32_MAX)
237 maxevt = INT32_MAX;
238
239 const RunDescription descr =
240 {
241 uint32_t(maxtime),
242 uint32_t(maxevt),
243 ref.first,
244 ref.second,
245 false
246 };
247
248 // FIMXE: Maybe reset an event counter so that the mcp can count events?
249
250 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
251
252 fExpectedRuns[fRunNumber] = descr;
253 return fRunNumber++;
254 }
255
256 bool IsThreadRunning()
257 {
258 return !fThread.timed_join(boost::posix_time::microseconds(0));
259 }
260
261 void SetMaxMemory(unsigned int mb) const
262 {
263 /*
264 if (mb*1000000<GetUsedMemory())
265 {
266 // ffMsg.Warn("...");
267 return;
268 }*/
269
270 g_maxMem = size_t(mb)*1000000;
271 }
272
273 void StartThread(const vector<tcp::endpoint> &addr)
274 {
275 if (IsThreadRunning())
276 {
277 fMsg.Warn("Start - EventBuilder still running");
278 return;
279 }
280
281 fLastMessage.clear();
282
283 for (size_t i=0; i<40; i++)
284 ConnectSlot(i, addr[i]);
285
286 g_runStat = kModeRun;
287
288 fMsg.Message("Starting EventBuilder thread");
289
290 fThread = boost::thread(StartEvtBuild);
291 }
// Assign an IPv4 endpoint to slot i (0-39) and mark the slot as defined.
// Indices above 39 are ignored. A default constructed endpoint
// disconnects the slot instead.
void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
{
    if (i>39)
        return;

    if (addr==tcp::endpoint())
    {
        DisconnectSlot(i);
        return;
    }

    g_port[i].sockAddr.sin_family = AF_INET;
    g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
    g_port[i].sockAddr.sin_port = htons(addr.port());
    // In this order
    // NOTE(review): presumably sockDef is the flag the event builder
    // checks, so the address must be complete before it is set - confirm
    g_port[i].sockDef = 1;
}
// Mark slot i (0-39) as undefined and reset its address and port to 0.
// Indices above 39 are ignored.
void DisconnectSlot(unsigned int i)
{
    if (i>39)
        return;

    g_port[i].sockDef = 0;
    // In this order
    // NOTE(review): mirror image of ConnectSlot - the flag is cleared
    // before the address is touched; presumably for the same reason
    g_port[i].sockAddr.sin_family = AF_INET;
    g_port[i].sockAddr.sin_addr.s_addr = 0;
    g_port[i].sockAddr.sin_port = 0;
}
320 void IgnoreSlot(unsigned int i)
321 {
322 if (i>39)
323 return;
324 if (g_port[i].sockAddr.sin_port==0)
325 return;
326
327 g_port[i].sockDef = -1;
328 }
329
330
// Request the event builder thread to quit as soon as possible by
// setting the run state to kAbort. Does not wait for the thread.
void Abort()
{
    fMsg.Message("Signal abort to EventBuilder thread...");
    g_runStat = kAbort;
}
336
// Request a reset from the event builder thread by setting g_reset.
//
// soft: true  -> g_reset=101 (let buffers drain, close all run-files)
//       false -> g_reset=102 (flush buffers, close all run-files)
//
// The meaning of the individual decimal digits of g_reset is described
// in the comment below.
void ResetThread(bool soft)
{
    /*
     if (g_reset > 0)

     * suspend reading
     * reset = g_reset;
     * g_reset=0

     * reset% 10
     == 0 leave event Buffers as they are
     == 1 let all buffers drain (write (incomplete) events)
     > 1 flush all buffers (do not write buffered events)

     * (reset/10)%10
     > 0 close all sockets and destroy them (also free the
     allocated read-buffers)
     recreate before resuming operation
     [ this is more than just close/open that can be
     triggered by e.g. close/open the base-socket ]

     * (reset/100)%10
     > 0 close all open run-files

     * (reset/1000)
     sleep so many seconds before resuming operation
     (does not (yet) take into account time left when waiting
     for buffers getting empty ...)

     * resume_reading

     */
    fMsg.Message("Signal reset to EventBuilder thread...");
    g_reset = soft ? 101 : 102;
}
372
// Request the event builder thread to stop reading and to quit once the
// buffered events are done, by setting the run state to kExit. Does not
// wait for the thread.
void Exit()
{
    fMsg.Message("Signal exit to EventBuilder thread...");
    g_runStat = kExit;
}
378
379 /*
380 void Wait()
381 {
382 fThread.join();
383 ffMsg.Message("EventBuilder stopped.");
384 }*/
385
// Switch the run state (g_runStat) of the event builder. The meaning of
// the individual states is documented with CommandStates_t.
void Hybernate() const { g_runStat = kHybernate; } // do nothing for long time
void Sleep() const { g_runStat = kSleep; }         // do nothing
void FlushMode() const { g_runStat = kModeFlush; } // read data, but skip them
void TestMode() const { g_runStat = kModeTest; }   // read and process, do not write
void FlagMode() const { g_runStat = kModeFlag; }   // read, process and write all
void RunMode() const { g_runStat = kModeRun; }     // read, process and write selected

// FIXME: To be removed
// Sets the run state to an arbitrary, unchecked value
void SetMode(int mode) const { g_runStat = mode; }
395
// Connection state of slot i. None of these accessors checks the range
// of i; callers must pass a valid index.
// NOTE(review): 7 is presumably the number of sockets per board
// (debugStream maps socket ids to slots via isock/7) - confirm
bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
int GetNumConnected(int i) const { return gi_NumConnect[i]; }

// Ignore (sockDef=-1) or un-ignore (sockDef=1) slot i; slots which are
// not defined (sockDef==0) are left untouched
void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
403
404 void SetOutputFormat(FileFormat_t f)
405 {
406 fFileFormat = f;
407 if (fFileFormat==kCalib)
408 {
409 DataCalib::Restart();
410 DataCalib::Update(fDimDrsCalibration);
411 fMsg.Message("Resetted DRS calibration.");
412 }
413 }
414
// Enable/disable forwarding of event builder messages with severity 99
// (see factOut)
void SetDebugLog(bool b) { fDebugLog = b; }
416
417 void SetDebugStream(bool b)
418 {
419 fDebugStream = b;
420 if (b)
421 return;
422
423 for (int i=0; i<40; i++)
424 {
425 if (!fDumpStream[i].is_open())
426 continue;
427
428 fDumpStream[i].close();
429
430 ostringstream name;
431 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
432 fMsg.Message("Closed file '"+name.str()+"'");
433 }
434 }
435
436 void SetDebugRead(bool b)
437 {
438 fDebugRead = b;
439 if (b || !fDumpRead.is_open())
440 return;
441
442 fDumpRead.close();
443 fMsg.Message("Closed file 'socket_events.txt'");
444 }
445
446// size_t GetUsedMemory() const { return gi_usedMem; }
447
448 void LoadDrsCalibration(const char *fname)
449 {
450 if (!DataCalib::ReadFits(fname, fMsg))
451 return;
452 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
453 DataCalib::Update(fDimDrsCalibration);
454 }
455
// Ask the event builder to close the run files of all runs (runId=0,
// see the comment in runWrite). The return code of CloseRunFile is
// discarded; always returns 0.
virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
457
458
459 /*
460 struct OpenFileToDim
461 {
462 int code;
463 char fileName[FILENAME_MAX];
464 };
465
466 SignalRunOpened(runid, filename);
467 // Send num open files
468 // Send runid, (more info about the run?), filename via dim
469
470 SignalEvtWritten(runid);
471 // Send num events written of newest file
472
473 SignalRunClose(runid);
474 // Send new num open files
475 // Send empty file-name if no file is open
476
477 */
478
479 // -------------- Mapped event builder callbacks ------------------
480
481 void UpdateRuns(const string &fname="")
482 {
483 uint32_t values[5] =
484 {
485 static_cast<uint32_t>(fFiles.size()),
486 0xffffffff,
487 0,
488 fLastOpened,
489 fLastClosed
490 };
491
492 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
493 it!=fFiles.end(); it++)
494 {
495 const DataProcessorImp *file = *it;
496
497 if (file->GetRunId()<values[1])
498 values[1] = file->GetRunId();
499
500 if (file->GetRunId()>values[2])
501 values[2] = file->GetRunId();
502 }
503
504 fMaxRun = values[2];
505
506 vector<char> data(sizeof(values)+fname.size()+1);
507 memcpy(data.data(), values, sizeof(values));
508 strcpy(data.data()+sizeof(values), fname.c_str());
509
510 fDimRuns.Update(data);
511 }
512
513 vector<DataProcessorImp*> fFiles;
514
515 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
516 {
517 fMsg.Info(" ==> TODO: Update run configuration in database!");
518 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
519 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
520
521 // Check if file already exists...
522 DataProcessorImp *file = 0;
523 switch (fFileFormat)
524 {
525 case kNone: file = new DataDump(fPath, runid, fMsg); break;
526 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
527 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
528 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
529 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
530 }
531
532 try
533 {
534 if (!file->Open(h))
535 return 0;
536 }
537 catch (const exception &e)
538 {
539 fMsg.Error("Exception trying to open file: "+string(e.what()));
540 return 0;
541 }
542
543 fFiles.push_back(file);
544
545 ostringstream str;
546 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
547 fMsg.Info(str);
548
549 fDimWriteStats.FileOpened(file->GetFileName());
550
551 fLastOpened = runid;
552 UpdateRuns(file->GetFileName());
553
554 fNumEvts[kEventId] = 0;
555 fNumEvts[kTriggerId] = 0;
556
557 fNumEvts[kCurrent] = 0;
558 fDimEvents.Update(fNumEvts);
559 // fDimCurrentEvent.Update(uint32_t(0));
560
561 return reinterpret_cast<FileHandle_t>(file);
562 }
563
// Event builder callback: write one event to the file identified by
// 'handler' (a handle returned by runOpen) and update the event
// counters.
//
// Returns 0 on success, -1 if writing the event failed.
int runWrite(FileHandle_t handler, EVENT *e, size_t)
{
    DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);

    if (!file->WriteEvt(e))
        return -1;

    // The "current" counters follow only the file with the highest
    // run id (fMaxRun is maintained by UpdateRuns)
    if (file->GetRunId()==fMaxRun)
    {
        fNumEvts[kCurrent]++;
        fNumEvts[kEventId] = e->EventNum;
        fNumEvts[kTriggerId] = e->TriggerNum;
    }

    fNumEvts[kTotal]++;

    // Publish the counters only if more than one second has passed
    // since the last update; the time stamp is shared by all calls
    static Time oldt(boost::date_time::neg_infin);
    Time newt;
    if (newt>oldt+boost::posix_time::seconds(1))
    {
        fDimEvents.Update(fNumEvts);
        oldt = newt;
    }


    // ===> SignalEvtWritten(runid);
    // Send num events written of newest file

    /* close run runId (all all runs if runId=0) */
    /* return: 0=close scheduled / >0 already closed / <0 does not exist */
    //CloseRunFile(file->GetRunId(), time(NULL)+2) ;

    return 0;
}
598
// Hook called from runClose() with the id of the run being closed.
// The default implementation does nothing; derived classes may override.
virtual void CloseRun(uint32_t runid) { }
600
601 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
602 {
603 fMsg.Info(" ==> TODO: Update run configuration in database!");
604 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
605
606 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
607
608 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
609 if (it==fFiles.end())
610 {
611 ostringstream str;
612 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
613 fMsg.Fatal(str);
614 return -1;
615 }
616
617 fFiles.erase(it);
618
619 fLastClosed = file->GetRunId();
620 CloseRun(fLastClosed);
621 UpdateRuns();
622
623 fDimEvents.Update(fNumEvts);
624
625 const bool rc = file->Close(tail);
626 if (!rc)
627 {
628 // Error message
629 }
630
631 ostringstream str;
632 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
633 fMsg.Info(str);
634
635 delete file;
636
637 // ==> SignalRunClose(runid);
638 // Send new num open files
639 // Send empty file-name if no file is open
640
641 return rc ? 0 : -1;
642 }
643
644 ofstream fDumpStream[40];
645
646 void debugStream(int isock, void *buf, int len)
647 {
648 if (!fDebugStream)
649 return;
650
651 const int slot = isock/7;
652 if (slot<0 || slot>39)
653 return;
654
655 if (!fDumpStream[slot].is_open())
656 {
657 ostringstream name;
658 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
659
660 fDumpStream[slot].open(name.str().c_str(), ios::app);
661 if (!fDumpStream[slot])
662 {
663 ostringstream str;
664 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
665 fMsg.Error(str);
666
667 return;
668 }
669
670 fMsg.Message("Opened file '"+name.str()+"' for writing.");
671 }
672
673 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
674 }
675
676 ofstream fDumpRead; // Stream to possibly dump docket events
677
// Event builder callback: append one line describing a read operation
// to socket_events.txt. The file is opened (in append mode) on first
// use and a header is written. Does nothing unless read debugging is
// enabled (SetDebugRead) or if no bytes were read.
void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
{
    // isock = socketID (0-279)
    // ibyte = number of bytes read
    // event = eventId (or 0 if not yet known)
    // state : 1=finished reading data
    // 0=reading data
    // -1=start reading data (header)
    // -2=start reading data,
    // eventId not known yet (too little data)
    // tsec, tusec = time when reading seconds, microseconds
    //
    if (!fDebugRead || ibyte==0)
        return;

    if (!fDumpRead.is_open())
    {
        fDumpRead.open("socket_events.txt", ios::app);
        if (!fDumpRead)
        {
            ostringstream str;
            str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
            fMsg.Error(str);

            return;
        }

        fMsg.Message("Opened file 'socket_events.txt' for writing.");

        fDumpRead << "# START: " << Time().GetAsStr() << endl;
        fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
    }

    // One line per call, columns as announced in the header above;
    // the slot is derived from the socket id (isock/7)
    fDumpRead
        << setw(2) << state << " "
        << setw(8) << tsec << " "
        << setw(9) << tusec << " "
        << setw(3) << isock << " "
        << setw(2) << isock/7 << " "
        << runno << " "
        << event << " "
        << ftmevt << " "
        << ibyte << endl;
}
722
723 array<uint16_t,2> fVecRoi;
724
// Event builder callback: check the consistency of the FAD headers of
// one event and, at most once per second, publish the calibrated event
// data and per-pixel statistics via DIM.
//
// runNr: run number; used to close the run if an inconsistency is found
// fadhd: headers of the 40 boards (see comment below)
// event: the assembled event
//
// Returns 0 if the event is accepted, -1 if an inconsistency was found
// (in which case closing of the run has been requested).
int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
{
    /*
     fadhd[i] is an array with the 40 FAD headers
     (if a board was not read, start_package_flag = 0)

     event is the structure which is also passed to the write routine;
     its header contains the nominal values for e.g. the eventID
     as well as the ADC values (in case you need them)

     If the routine returns a negative value, the event is deleted
     (not forwarded to the write routine [at least in principle])
     */

    // Publish the region-of-interest only when it has changed
    const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};

    if (roi!=fVecRoi)
    {
        Update(fDimRoi, roi);
        fVecRoi = roi;
    }

    // NOTE(review): the headers are reinterpreted as FAD::EventHeader;
    // this assumes both types share the same layout - confirm
    const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
    const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;

    // FIXME: Compare with target configuration

    // Compare every board which delivered data with the first board
    // which delivered data (beg)
    for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
    {
        // FIXME: Compare with expectations!!!
        if (ptr->fStartDelimiter==0)
        {
            // Skip boards without data; as long as no board with data
            // has been found, move the reference along
            if (ptr==beg)
                beg++;
            continue;
        }

        if (beg->fStatus != ptr->fStatus)
        {
            fMsg.Error("Inconsistency in FAD status detected.... closing run.");
            CloseRunFile(runNr, 0, 0);
            return -1;
        }

        if (beg->fRunNumber != ptr->fRunNumber)
        {
            fMsg.Error("Inconsistent run number detected.... closing run.");
            CloseRunFile(runNr, 0, 0);
            return -1;
        }

        /*
         if (beg->fVersion != ptr->fVersion)
         {
         Error("Inconsist firmware version detected.... closing run.");
         CloseRunFile(runNr, 0, 0);
         break;
         }
         if (beg->fEventCounter != ptr->fEventCounter)
         {
         Error("Inconsist run number detected.... closing run.");
         CloseRunFile(runNr, 0, 0);
         break;
         }
         if (beg->fTriggerCounter != ptr->fTriggerCounter)
         {
         Error("Inconsist trigger number detected.... closing run.");
         CloseRunFile(runNr, 0, 0);
         break;
         }*/

        if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
        {
            fMsg.Error("Inconsistent phase shift detected.... closing run.");
            CloseRunFile(runNr, 0, 0);
            return -1;
        }

        if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
        {
            fMsg.Error("Inconsistent DAC values detected.... closing run.");
            CloseRunFile(runNr, 0, 0);
            return -1;
        }

        if (beg->fTriggerType != ptr->fTriggerType)
        {
            fMsg.Error("Inconsistent trigger type detected.... closing run.");
            CloseRunFile(runNr, 0, 0);
            return -1;
        }
    }

    // check REFCLK_frequency
    // check consistency with command configuration
    // how to log errors?
    // need gotNewRun/closedRun to know it is finished

    // Publish event data at most once per second; the time stamp is
    // shared by all calls
    static Time oldt(boost::date_time::neg_infin);
    Time newt;

    // FIXME: Only send events if the have newer run-numbers
    if (newt<oldt+boost::posix_time::seconds(1))
        return 0;

    oldt = newt;

    // Buffer layout: a copy of the EVENT structure followed by the
    // calibrated samples. The two summands of Roi*2*1440 bytes add up
    // to Roi*1440*4 bytes.
    // NOTE(review): presumably one 4-byte float per sample for 1440
    // pixels - confirm against DataCalib::Apply
    const size_t sz = sizeof(EVENT)+event->Roi*2*1440;

    vector<char> data(sz+event->Roi*2*1440);
    memcpy(data.data(), event, sizeof(EVENT));

    float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));

    DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
    fDimRawData.Update(data);

    vector<float> data2(1440*4); // Mean, RMS, Max, Pos
    CalibData::GetPixelStats(data2.data(), vec, event->Roi);

    fDimEventData.Update(data2);




    return 0;
}
852
// Returns true if the run announced last (run number fRunNumber-1) is
// no longer in the list of expected runs. gotNewRun() removes a run
// from that list once it shows up in the data. Note that this is also
// true if no such run has ever been announced.
bool IsRunStarted() const
{
    const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
    return it==fExpectedRuns.end();// ? true : it->second.started;
}
858
// Returns the run number which will be assigned by the next call to
// StartNewRun()
uint32_t GetRunNumber() const
{
    return fRunNumber;
}
863
864 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
865 {
866 // This function is called even when writing is switched off
867 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
868 if (it==fExpectedRuns.end())
869 {
870 ostringstream str;
871 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
872 return;
873 }
874
875 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
876 // return: 0=close scheduled / >0 already closed / <0 does not exist
877
878 // FIXME: Move configuration from expected runs to runs which will soon
879 // be opened/closed
880
881 it->second.started = true;
882
883 fExpectedRuns.erase(it);
884 }
885
886 map<boost::thread::id, string> fLastMessage;
887
888 void factOut(int severity, int err, const char *message)
889 {
890 if (!fDebugLog && severity==99)
891 return;
892
893 ostringstream str;
894 //str << boost::this_thread::get_id() << " ";
895 str << "EventBuilder(";
896 if (err<0)
897 str << "---";
898 else
899 str << err;
900 str << "): " << message;
901
902 string &old = fLastMessage[boost::this_thread::get_id()];
903
904 if (str.str()==old)
905 return;
906 old = str.str();
907
908 fMsg.Update(str, severity);
909 }
910/*
911 void factStat(int64_t *stat, int len)
912 {
913 if (len!=7)
914 {
915 fMsg.Warn("factStat received unknown number of values.");
916 return;
917 }
918
919 vector<int64_t> data(1, g_maxMem);
920 data.insert(data.end(), stat, stat+len);
921
922 static vector<int64_t> last(8);
923 if (data==last)
924 return;
925 last = data;
926
927 fDimStatistics.Update(data);
928
929 // len ist die Laenge des arrays.
930 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
931 // kannst Du pruefen, ob die 100MB voll sind ....
932
933 ostringstream str;
934 str
935 << "Wait=" << stat[0] << " "
936 << "Skip=" << stat[1] << " "
937 << "Del=" << stat[2] << " "
938 << "Tot=" << stat[3] << " "
939 << "Mem=" << stat[4] << "/" << g_maxMem << " "
940 << "Read=" << stat[5] << " "
941 << "Conn=" << stat[6];
942
943 fMsg.Info(str);
944 }
945 */
946
// Event builder callback: publish the event statistics structure via
// the STATISTICS2 DIM service
void factStat(const EVT_STAT &stat)
{
    fDimStatistics2.Update(stat);
}

// Event builder callback: publish the GUI statistics structure via
// the STATISTICS1 DIM service
void factStat(const GUI_STAT &stat)
{
    fDimStatistics1.Update(stat);
}
956
957
958 array<FAD::EventHeader, 40> fVecHeader;
959
960 template<typename T, class S>
961 array<T, 42> Compare(const S *vec, const T *t)
962 {
963 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
964
965 const T *min = NULL;
966 const T *val = NULL;
967 const T *max = NULL;
968
969 array<T, 42> arr;
970
971 bool rc = true;
972 for (int i=0; i<40; i++)
973 {
974 const char *base = reinterpret_cast<const char*>(vec+i);
975 const T *ref = reinterpret_cast<const T*>(base+offset);
976
977 arr[i] = *ref;
978
979 if (gi_NumConnect[i]!=7)
980 {
981 arr[i] = 0;
982 continue;
983 }
984
985 if (!val)
986 {
987 min = ref;
988 val = ref;
989 max = ref;
990 }
991
992 if (*ref<*min)
993 min = ref;
994
995 if (*ref>*max)
996 max = ref;
997
998 if (*val!=*ref)
999 rc = false;
1000 }
1001
1002 arr[40] = val ? *min : 1;
1003 arr[41] = val ? *max : 0;
1004
1005 return arr;
1006 }
1007
// Collect one member of type T from the 40 stored headers (fVecHeader)
// and determine which of its bits differ between the boards.
//
// h: pointer to a header; together with t only used to determine the
//    byte offset of the member (the values are always read from
//    fVecHeader, not from h)
// t: pointer to the member of interest within *h
//
// Returns an array with
//   0:    bit mask of all bits which differ from the first board with
//         gi_NumConnect==7
//   1:    the value of that first board (0 if there is none)
//   2-41: the value of each board (0 for boards with gi_NumConnect!=7)
template<typename T>
array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
{
    const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);

    T val = 0;
    T rc = 0;

    array<T, 42> vec;

    bool first = true;

    for (int i=0; i<40; i++)
    {
        // Address of the same member in the i-th stored header
        const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
        const T *ref = reinterpret_cast<const T*>(base+offset);

        vec[i+2] = *ref;

        if (gi_NumConnect[i]!=7)
        {
            vec[i+2] = 0;
            continue;
        }

        if (first)
        {
            first = false;
            val = *ref;
            rc = 0;
        }

        // Accumulate all bits which differ from the reference value
        rc |= val^*ref;
    }

    vec[0] = rc;
    vec[1] = val;

    return vec;
}
1048
// Publish the first n elements (default: all N) of an array via the
// given DIM service.
// The const_cast is needed because setData is called with a non-const
// pointer here.
// NOTE(review): presumably setData does not modify the data - confirm
template<typename T, size_t N>
void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
{
// svc.setQuality(vec[40]<=vec[41]);
    svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
    svc.Update();
}
1056
// Debugging helper: print a flag and a 43 element array to cout in the
// form "name|flag|[1]|[0]<x<[1]: [3] ... [42]".
// NOTE(review): no function visible in this file produces this
// 43-element layout (Compare and CompareBits return 42 elements), and
// element 1 is printed twice - check whether this is still in use
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
    for (int i=0; i<40;i++)
        cout << " " << data.second[i+3];
    cout << endl;
}
1065
1066 vector<uint> fNumConnected;
1067
// Event builder callback: store the header received from one board and
// publish every quantity derived from the headers which has changed
// (firmware version, run number, prescaler, DNA, status, DAC values).
// Reference clock and temperatures are published at most once per
// second. Headers with a board id above 39 are ignored.
void debugHead(int /*socket*/, const FAD::EventHeader &h)
{
    const uint16_t id = h.Id();
    if (id>39)
        return;

    if (fNumConnected.size()!=40)
        fNumConnected.resize(40);

    // Snapshot of the connection counters of all 40 boards
    const vector<uint> con(gi_NumConnect, gi_NumConnect+40);

    // Everything is re-published if the connection status of any board
    // has changed or if the event builder thread is not running
    const bool changed = con!=fNumConnected || !IsThreadRunning();

    fNumConnected = con;

    // Keep the previous header to detect which quantities have changed
    const FAD::EventHeader old = fVecHeader[id];
    fVecHeader[id] = h;

    if (old.fVersion != h.fVersion || changed)
    {
        const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);

        // Convert the version (high byte, low byte) into a float by
        // formatting it as "<high>.<low>" and parsing that string
        array<float,42> data;
        for (int i=0; i<42; i++)
        {
            ostringstream str;
            str << (ver[i]>>8) << '.' << (ver[i]&0xff);
            data[i] = stof(str.str());
        }
        Update(fDimFwVersion, data);
    }

    if (old.fRunNumber != h.fRunNumber || changed)
    {
        const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
        fDimRunNumber.Update(run);
    }

    if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
    {
        const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
        fDimPrescaler.Update(pre);
    }

    if (old.fDNA != h.fDNA || changed)
    {
        // Only the 40 board values are published, not min/max
        const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
        Update(fDimDNA, dna, 40);
    }

    if (old.fStatus != h.fStatus || changed)
    {
        const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
        Update(fDimStatus, sts);
    }

    if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
    {
        // One block of 42 values (40 boards, min, max) per DAC channel
        array<uint16_t, FAD::kNumDac*42> dacs;

        for (int i=0; i<FAD::kNumDac; i++)
        {
            const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
            memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
        }

        Update(fDimDac, dacs);
    }

    // -----------

    // The remaining quantities are published at most once per second;
    // the time stamp is shared by all calls
    static Time oldt(boost::date_time::neg_infin);
    Time newt;

    if (newt>oldt+boost::posix_time::seconds(1))
    {
        oldt = newt;

        // --- RefClock

        const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
        Update(fDimRefClock, clk);

        // --- Temperatures

        const array<int16_t,42> tmp[4] =
        {
            Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
            Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
            Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
            Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
        };

        // Start with the values of the first sensor for both the
        // minimum and the maximum section ...
        vector<int16_t> data;
        data.reserve(82);
        data.push_back(tmp[0][40]); // min: 0
        data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
        data.push_back(tmp[0][41]); // max: 41
        data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81

        // ... and merge the three remaining sensors into them
        for (int j=1; j<=3; j++)
        {
            const array<int16_t,42> &ref = tmp[j];

            // Global min
            if (ref[40]<data[0]) // 40=min
                data[0] = ref[40];

            // Global max
            if (ref[41]>data[41]) // 41=max
                data[41] = ref[41];

            for (int i=0; i<40; i++)
            {
                // min per board
                if (ref[i]<data[i+1]) // data: 1-40
                    data[i+1] = ref[i]; // ref: 0-39

                // max per board
                if (ref[i]>data[i+42]) // data: 42-81
                    data[i+42] = ref[i]; // ref: 0-39
            }


        }

        // NOTE(review): the division by 16 presumably converts raw
        // units of 1/16 degree into degrees - confirm
        vector<float> deg(82); // 0: global min, 1-40: min
        for (int i=0; i<82; i++) // 41: global max, 42-81: max
            deg[i] = data[i]/16.;
        fDimTemperature.Update(deg);
    }
}
1200};
1201
1202EventBuilderWrapper *EventBuilderWrapper::This = 0;
1203
1204// ----------- Event builder callbacks implementation ---------------
// C entry points called by the event builder. Each of them forwards to
// the member function of the same name of the single wrapper instance
// (EventBuilderWrapper::This). None of them checks whether This is set.
extern "C"
{
    // A run file has to be opened
    FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
    {
        return EventBuilderWrapper::This->runOpen(irun, runhd, len);
    }

    // An event has to be written to an open run file
    int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
    {
        return EventBuilderWrapper::This->runWrite(fileId, event, len);
    }

    // A run file has to be closed
    int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
    {
        return EventBuilderWrapper::This->runClose(fileId, runth, len);
    }

    // Log message from the event builder
    void factOut(int severity, int err, const char *message)
    {
        EventBuilderWrapper::This->factOut(severity, err, message);
    }

    // GUI statistics (published as STATISTICS1)
    void factStat(GUI_STAT stat)
    {
        EventBuilderWrapper::This->factStat(stat);
    }

    // Event statistics (published as STATISTICS2)
    void factStatNew(EVT_STAT stat)
    {
        EventBuilderWrapper::This->factStat(stat);
    }

    // A board header has been received; buf is converted into a
    // FAD::EventHeader by interpreting it as 16-bit words
    void debugHead(int socket, int/*board*/, void *buf)
    {
        const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);

        EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
    }

    // Raw data has been received on a socket
    void debugStream(int isock, void *buf, int len)
    {
        return EventBuilderWrapper::This->debugStream(isock, buf, len);
    }

    // Progress of reading an event from a socket; note that the member
    // function takes event, ftmevt and runno as unsigned values
    void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
    {
        EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
    }

    // Consistency check of an assembled event
    int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
    {
        return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
    }

    // Data of a new run has been received
    void gotNewRun(int runnr, PEVNT_HEADER *headers)
    {
        return EventBuilderWrapper::This->gotNewRun(runnr, headers);
    }

    // Not forwarded to the wrapper; always returns 100
    // NOTE(review): the meaning of the return value is defined by the
    // event builder - confirm
    int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
    {
        return 100;
    }

}
1270
1271#endif
Note: See TracBrowser for help on using the repository browser.