source: trunk/FACT++/src/EventBuilderWrapper.h@ 12333

Last change on this file since 12333 was 12333, checked in by tbretz, 13 years ago
Added GetNumFilesOpen and IncreaseRunNumber.
File size: 38.5 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405 int GetNumFilesOpen() const { return fFiles.size(); }
406
407 /*
408 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
409 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
410 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
411 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
412 */
413
414 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
415 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
416
417 void SetOutputFormat(FileFormat_t f)
418 {
419 fFileFormat = f;
420 if (fFileFormat==kCalib)
421 {
422 DataCalib::Restart();
423 DataCalib::Update(fDimDrsCalibration);
424 fMsg.Message("Resetted DRS calibration.");
425 }
426 }
427
428 void SetDebugLog(bool b) { fDebugLog = b; }
429
430 void SetDebugStream(bool b)
431 {
432 fDebugStream = b;
433 if (b)
434 return;
435
436 for (int i=0; i<40; i++)
437 {
438 if (!fDumpStream[i].is_open())
439 continue;
440
441 fDumpStream[i].close();
442
443 ostringstream name;
444 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
445 fMsg.Message("Closed file '"+name.str()+"'");
446 }
447 }
448
449 void SetDebugRead(bool b)
450 {
451 fDebugRead = b;
452 if (b || !fDumpRead.is_open())
453 return;
454
455 fDumpRead.close();
456 fMsg.Message("Closed file 'socket_events.txt'");
457 }
458
459// size_t GetUsedMemory() const { return gi_usedMem; }
460
461 void LoadDrsCalibration(const char *fname)
462 {
463 if (!DataCalib::ReadFits(fname, fMsg))
464 return;
465 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
466 DataCalib::Update(fDimDrsCalibration);
467 }
468
469 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
470
471
472 /*
473 struct OpenFileToDim
474 {
475 int code;
476 char fileName[FILENAME_MAX];
477 };
478
479 SignalRunOpened(runid, filename);
480 // Send num open files
481 // Send runid, (more info about the run?), filename via dim
482
483 SignalEvtWritten(runid);
484 // Send num events written of newest file
485
486 SignalRunClose(runid);
487 // Send new num open files
488 // Send empty file-name if no file is open
489
490 */
491
492 // -------------- Mapped event builder callbacks ------------------
493
494 void UpdateRuns(const string &fname="")
495 {
496 uint32_t values[5] =
497 {
498 static_cast<uint32_t>(fFiles.size()),
499 0xffffffff,
500 0,
501 fLastOpened,
502 fLastClosed
503 };
504
505 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
506 it!=fFiles.end(); it++)
507 {
508 const DataProcessorImp *file = *it;
509
510 if (file->GetRunId()<values[1])
511 values[1] = file->GetRunId();
512
513 if (file->GetRunId()>values[2])
514 values[2] = file->GetRunId();
515 }
516
517 fMaxRun = values[2];
518
519 vector<char> data(sizeof(values)+fname.size()+1);
520 memcpy(data.data(), values, sizeof(values));
521 strcpy(data.data()+sizeof(values), fname.c_str());
522
523 fDimRuns.Update(data);
524 }
525
526 vector<DataProcessorImp*> fFiles;
527
528 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
529 {
530 fMsg.Info(" ==> TODO: Update run configuration in database!");
531 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
532 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
533
534 // Check if file already exists...
535 DataProcessorImp *file = 0;
536 switch (fFileFormat)
537 {
538 case kNone: file = new DataDump(fPath, runid, fMsg); break;
539 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
540 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
541 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
542 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
543 }
544
545 try
546 {
547 if (!file->Open(h))
548 return 0;
549 }
550 catch (const exception &e)
551 {
552 fMsg.Error("Exception trying to open file: "+string(e.what()));
553 return 0;
554 }
555
556 fFiles.push_back(file);
557
558 ostringstream str;
559 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
560 fMsg.Info(str);
561
562 fDimWriteStats.FileOpened(file->GetFileName());
563
564 fLastOpened = runid;
565 UpdateRuns(file->GetFileName());
566
567 fNumEvts[kEventId] = 0;
568 fNumEvts[kTriggerId] = 0;
569
570 fNumEvts[kCurrent] = 0;
571 fDimEvents.Update(fNumEvts);
572 // fDimCurrentEvent.Update(uint32_t(0));
573
574 return reinterpret_cast<FileHandle_t>(file);
575 }
576
577 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
578 {
579 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
580
581 if (!file->WriteEvt(e))
582 return -1;
583
584 if (file->GetRunId()==fMaxRun)
585 {
586 fNumEvts[kCurrent]++;
587 fNumEvts[kEventId] = e->EventNum;
588 fNumEvts[kTriggerId] = e->TriggerNum;
589 }
590
591 fNumEvts[kTotal]++;
592
593 static Time oldt(boost::date_time::neg_infin);
594 Time newt;
595 if (newt>oldt+boost::posix_time::seconds(1))
596 {
597 fDimEvents.Update(fNumEvts);
598 oldt = newt;
599 }
600
601
602 // ===> SignalEvtWritten(runid);
603 // Send num events written of newest file
604
605 /* close run runId (all all runs if runId=0) */
606 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
607 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
608
609 return 0;
610 }
611
612 virtual void CloseRun(uint32_t runid) { }
613
614 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
615 {
616 fMsg.Info(" ==> TODO: Update run configuration in database!");
617 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
618
619 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
620
621 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
622 if (it==fFiles.end())
623 {
624 ostringstream str;
625 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
626 fMsg.Fatal(str);
627 return -1;
628 }
629
630 fFiles.erase(it);
631
632 fLastClosed = file->GetRunId();
633 CloseRun(fLastClosed);
634 UpdateRuns();
635
636 fDimEvents.Update(fNumEvts);
637
638 const bool rc = file->Close(tail);
639 if (!rc)
640 {
641 // Error message
642 }
643
644 ostringstream str;
645 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
646 fMsg.Info(str);
647
648 delete file;
649
650 // ==> SignalRunClose(runid);
651 // Send new num open files
652 // Send empty file-name if no file is open
653
654 return rc ? 0 : -1;
655 }
656
657 ofstream fDumpStream[40];
658
659 void debugStream(int isock, void *buf, int len)
660 {
661 if (!fDebugStream)
662 return;
663
664 const int slot = isock/7;
665 if (slot<0 || slot>39)
666 return;
667
668 if (!fDumpStream[slot].is_open())
669 {
670 ostringstream name;
671 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
672
673 fDumpStream[slot].open(name.str().c_str(), ios::app);
674 if (!fDumpStream[slot])
675 {
676 ostringstream str;
677 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
678 fMsg.Error(str);
679
680 return;
681 }
682
683 fMsg.Message("Opened file '"+name.str()+"' for writing.");
684 }
685
686 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
687 }
688
689 ofstream fDumpRead; // Stream to possibly dump docket events
690
691 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
692 {
693 // isock = socketID (0-279)
694 // ibyte = #bytes gelesen
695 // event = eventId (oder 0 wenn noch nicht bekannt)
696 // state : 1=finished reading data
697 // 0=reading data
698 // -1=start reading data (header)
699 // -2=start reading data,
700 // eventId not known yet (too little data)
701 // tsec, tusec = time when reading seconds, microseconds
702 //
703 if (!fDebugRead || ibyte==0)
704 return;
705
706 if (!fDumpRead.is_open())
707 {
708 fDumpRead.open("socket_events.txt", ios::app);
709 if (!fDumpRead)
710 {
711 ostringstream str;
712 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
713 fMsg.Error(str);
714
715 return;
716 }
717
718 fMsg.Message("Opened file 'socket_events.txt' for writing.");
719
720 fDumpRead << "# START: " << Time().GetAsStr() << endl;
721 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
722 }
723
724 fDumpRead
725 << setw(2) << state << " "
726 << setw(8) << tsec << " "
727 << setw(9) << tusec << " "
728 << setw(3) << isock << " "
729 << setw(2) << isock/7 << " "
730 << runno << " "
731 << event << " "
732 << ftmevt << " "
733 << ibyte << endl;
734 }
735
736 array<uint16_t,2> fVecRoi;
737
738 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
739 {
740 /*
741 fadhd[i] ist ein array mit den 40 fad-headers
742 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
743
744 event ist die Struktur, die auch die write routine erhaelt;
745 darin sind im header die 'soll-werte' fuer z.B. eventID
746 als auch die ADC-Werte (falls Du die brauchst)
747
748 Wenn die routine einen negativen Wert liefert, wird das event
749 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
750 */
751
752 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
753
754 if (roi!=fVecRoi)
755 {
756 Update(fDimRoi, roi);
757 fVecRoi = roi;
758 }
759
760 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
761 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
762
763 // FIMXE: Compare with target configuration
764
765 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
766 {
767 // FIXME: Compare with expectations!!!
768 if (ptr->fStartDelimiter==0)
769 {
770 if (ptr==beg)
771 beg++;
772 continue;
773 }
774
775 if (beg->fStatus != ptr->fStatus)
776 {
777 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
778 CloseRunFile(runNr, 0, 0);
779 return -1;
780 }
781
782 if (beg->fRunNumber != ptr->fRunNumber)
783 {
784 fMsg.Error("Inconsistent run number detected.... closing run.");
785 CloseRunFile(runNr, 0, 0);
786 return -1;
787 }
788
789 /*
790 if (beg->fVersion != ptr->fVersion)
791 {
792 Error("Inconsist firmware version detected.... closing run.");
793 CloseRunFile(runNr, 0, 0);
794 break;
795 }
796 if (beg->fEventCounter != ptr->fEventCounter)
797 {
798 Error("Inconsist run number detected.... closing run.");
799 CloseRunFile(runNr, 0, 0);
800 break;
801 }
802 if (beg->fTriggerCounter != ptr->fTriggerCounter)
803 {
804 Error("Inconsist trigger number detected.... closing run.");
805 CloseRunFile(runNr, 0, 0);
806 break;
807 }*/
808
809 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
810 {
811 fMsg.Error("Inconsistent phase shift detected.... closing run.");
812 CloseRunFile(runNr, 0, 0);
813 return -1;
814 }
815
816 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
817 {
818 fMsg.Error("Inconsistent DAC values detected.... closing run.");
819 CloseRunFile(runNr, 0, 0);
820 return -1;
821 }
822
823 if (beg->fTriggerType != ptr->fTriggerType)
824 {
825 fMsg.Error("Inconsistent trigger type detected.... closing run.");
826 CloseRunFile(runNr, 0, 0);
827 return -1;
828 }
829 }
830
831 // check REFCLK_frequency
832 // check consistency with command configuration
833 // how to log errors?
834 // need gotNewRun/closedRun to know it is finished
835
836 return 0;
837 }
838
839 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
840 {
841 // Currently we send any event no matter what its trigger id is...
842 // To be changed.
843 static Time oldt(boost::date_time::neg_infin);
844 Time newt;
845
846 // FIXME: Only send events if the have newer run-numbers
847 if (newt<oldt+boost::posix_time::seconds(1))
848 return;
849
850 oldt = newt;
851
852 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*1440);
853 memcpy(data.data(), event, sizeof(EVENT));
854
855 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
856
857 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
858 fDimRawData.Update(data);
859
860 CalibData::RemoveSpikes(vec, event->Roi);
861
862 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
863 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
864
865 fDimEventData.Update(data2);
866 }
867
868 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
869 {
870 if (!DataCalib::IsValid())
871 return;
872
873 // Workaround to find a valid header.....
874 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
875 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
876
877 // FIMXE: Compare with target configuration
878
879 const FAD::EventHeader *ptr=beg;
880 for (; ptr!=end; ptr++)
881 {
882 if (ptr->fStartDelimiter==0)
883 continue;
884
885 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
886 return;
887 }
888
889 if (ptr->fStartDelimiter==0)
890 return;
891
892 vector<float> data(event->Roi*1440);
893 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
894
895 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
896 CalibData::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
897
898// dim_lock();
899 fDimFeedbackData.Update(data2);
900// dim_unlock();
901 }
902
903 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
904 {
905 switch (threadID)
906 {
907 case 0:
908 SendRawData(fadhd, event);
909 return 1;
910 case 1:
911 SendFeedbackData(fadhd, event);
912 return 2;
913 }
914 return 100;
915 }
916
917
918 bool IsRunStarted() const
919 {
920 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
921 return it==fExpectedRuns.end();// ? true : it->second.started;
922 }
923
924 uint32_t GetRunNumber() const
925 {
926 return fRunNumber;
927 }
928
929 void IncreaseRunNumber(uint32_t run)
930 {
931 if (run>fRunNumber)
932 fRunNumber = run;
933 }
934
935 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
936 {
937 // This function is called even when writing is switched off
938 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
939 if (it==fExpectedRuns.end())
940 {
941 ostringstream str;
942 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
943 return;
944 }
945
946 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
947 // return: 0=close scheduled / >0 already closed / <0 does not exist
948
949 // FIXME: Move configuration from expected runs to runs which will soon
950 // be opened/closed
951
952 it->second.started = true;
953
954 fExpectedRuns.erase(it);
955 }
956
957 map<boost::thread::id, string> fLastMessage;
958
959 void factOut(int severity, int err, const char *message)
960 {
961 if (!fDebugLog && severity==99)
962 return;
963
964 ostringstream str;
965 //str << boost::this_thread::get_id() << " ";
966 str << "EventBuilder(";
967 if (err<0)
968 str << "---";
969 else
970 str << err;
971 str << "): " << message;
972
973 string &old = fLastMessage[boost::this_thread::get_id()];
974
975 if (str.str()==old)
976 return;
977 old = str.str();
978
979 fMsg.Update(str, severity);
980 }
981/*
982 void factStat(int64_t *stat, int len)
983 {
984 if (len!=7)
985 {
986 fMsg.Warn("factStat received unknown number of values.");
987 return;
988 }
989
990 vector<int64_t> data(1, g_maxMem);
991 data.insert(data.end(), stat, stat+len);
992
993 static vector<int64_t> last(8);
994 if (data==last)
995 return;
996 last = data;
997
998 fDimStatistics.Update(data);
999
1000 // len ist die Laenge des arrays.
1001 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1002 // kannst Du pruefen, ob die 100MB voll sind ....
1003
1004 ostringstream str;
1005 str
1006 << "Wait=" << stat[0] << " "
1007 << "Skip=" << stat[1] << " "
1008 << "Del=" << stat[2] << " "
1009 << "Tot=" << stat[3] << " "
1010 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1011 << "Read=" << stat[5] << " "
1012 << "Conn=" << stat[6];
1013
1014 fMsg.Info(str);
1015 }
1016 */
1017
1018 void factStat(const EVT_STAT &stat)
1019 {
1020 fDimStatistics2.Update(stat);
1021 }
1022
1023 void factStat(const GUI_STAT &stat)
1024 {
1025 fDimStatistics1.Update(stat);
1026 }
1027
1028
1029 array<FAD::EventHeader, 40> fVecHeader;
1030
1031 template<typename T, class S>
1032 array<T, 42> Compare(const S *vec, const T *t)
1033 {
1034 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1035
1036 const T *min = NULL;
1037 const T *val = NULL;
1038 const T *max = NULL;
1039
1040 array<T, 42> arr;
1041
1042 bool rc = true;
1043 for (int i=0; i<40; i++)
1044 {
1045 const char *base = reinterpret_cast<const char*>(vec+i);
1046 const T *ref = reinterpret_cast<const T*>(base+offset);
1047
1048 arr[i] = *ref;
1049
1050 if (gi_NumConnect[i]!=7)
1051 {
1052 arr[i] = 0;
1053 continue;
1054 }
1055
1056 if (!val)
1057 {
1058 min = ref;
1059 val = ref;
1060 max = ref;
1061 }
1062
1063 if (*ref<*min)
1064 min = ref;
1065
1066 if (*ref>*max)
1067 max = ref;
1068
1069 if (*val!=*ref)
1070 rc = false;
1071 }
1072
1073 arr[40] = val ? *min : 1;
1074 arr[41] = val ? *max : 0;
1075
1076 return arr;
1077 }
1078
1079 template<typename T>
1080 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1081 {
1082 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1083
1084 T val = 0;
1085 T rc = 0;
1086
1087 array<T, 42> vec;
1088
1089 bool first = true;
1090
1091 for (int i=0; i<40; i++)
1092 {
1093 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1094 const T *ref = reinterpret_cast<const T*>(base+offset);
1095
1096 vec[i+2] = *ref;
1097
1098 if (gi_NumConnect[i]!=7)
1099 {
1100 vec[i+2] = 0;
1101 continue;
1102 }
1103
1104 if (first)
1105 {
1106 first = false;
1107 val = *ref;
1108 rc = 0;
1109 }
1110
1111 rc |= val^*ref;
1112 }
1113
1114 vec[0] = rc;
1115 vec[1] = val;
1116
1117 return vec;
1118 }
1119
1120 template<typename T, size_t N>
1121 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1122 {
1123// svc.setQuality(vec[40]<=vec[41]);
1124 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1125 svc.Update();
1126 }
1127
1128 template<typename T>
1129 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1130 {
1131 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1132 for (int i=0; i<40;i++)
1133 cout << " " << data.second[i+3];
1134 cout << endl;
1135 }
1136
1137 vector<uint> fNumConnected;
1138
1139 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1140 {
1141 const uint16_t id = h.Id();
1142 if (id>39)
1143 return;
1144
1145 if (fNumConnected.size()!=40)
1146 fNumConnected.resize(40);
1147
1148 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1149
1150 const bool changed = con!=fNumConnected || !IsThreadRunning();
1151
1152 fNumConnected = con;
1153
1154 const FAD::EventHeader old = fVecHeader[id];
1155 fVecHeader[id] = h;
1156
1157 if (old.fVersion != h.fVersion || changed)
1158 {
1159 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1160
1161 array<float,42> data;
1162 for (int i=0; i<42; i++)
1163 {
1164 ostringstream str;
1165 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1166 data[i] = stof(str.str());
1167 }
1168 Update(fDimFwVersion, data);
1169 }
1170
1171 if (old.fRunNumber != h.fRunNumber || changed)
1172 {
1173 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1174 fDimRunNumber.Update(run);
1175 }
1176
1177 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1178 {
1179 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1180 fDimPrescaler.Update(pre);
1181 }
1182
1183 if (old.fDNA != h.fDNA || changed)
1184 {
1185 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1186 Update(fDimDNA, dna, 40);
1187 }
1188
1189 if (old.fStatus != h.fStatus || changed)
1190 {
1191 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1192 Update(fDimStatus, sts);
1193 }
1194
1195 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1196 {
1197 array<uint16_t, FAD::kNumDac*42> dacs;
1198
1199 for (int i=0; i<FAD::kNumDac; i++)
1200 {
1201 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1202 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1203 }
1204
1205 Update(fDimDac, dacs);
1206 }
1207
1208 // -----------
1209
1210 static Time oldt(boost::date_time::neg_infin);
1211 Time newt;
1212
1213 if (newt>oldt+boost::posix_time::seconds(1))
1214 {
1215 oldt = newt;
1216
1217 // --- RefClock
1218
1219 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1220 Update(fDimRefClock, clk);
1221
1222 // --- Temperatures
1223
1224 const array<int16_t,42> tmp[4] =
1225 {
1226 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1227 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1228 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1229 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1230 };
1231
1232 vector<int16_t> data;
1233 data.reserve(82);
1234 data.push_back(tmp[0][40]); // min: 0
1235 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1236 data.push_back(tmp[0][41]); // max: 41
1237 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1238
1239 for (int j=1; j<=3; j++)
1240 {
1241 const array<int16_t,42> &ref = tmp[j];
1242
1243 // Gloabl min
1244 if (ref[40]<data[0]) // 40=min
1245 data[0] = ref[40];
1246
1247 // Global max
1248 if (ref[41]>data[41]) // 41=max
1249 data[41] = ref[41];
1250
1251 for (int i=0; i<40; i++)
1252 {
1253 // min per board
1254 if (ref[i]<data[i+1]) // data: 1-40
1255 data[i+1] = ref[i]; // ref: 0-39
1256
1257 // max per board
1258 if (ref[i]>data[i+42]) // data: 42-81
1259 data[i+42] = ref[i]; // ref: 0-39
1260 }
1261
1262
1263 }
1264
1265 vector<float> deg(82); // 0: global min, 1-40: min
1266 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1267 deg[i] = data[i]/16.;
1268 fDimTemperature.Update(deg);
1269 }
1270 }
1271};
1272
1273EventBuilderWrapper *EventBuilderWrapper::This = 0;
1274
1275// ----------- Event builder callbacks implementation ---------------
1276extern "C"
1277{
1278 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1279 {
1280 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1281 }
1282
1283 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1284 {
1285 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1286 }
1287
1288 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1289 {
1290 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1291 }
1292
1293 // -----
1294
1295 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1296 {
1297 return NULL;
1298 }
1299
1300 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1301 {
1302 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1303 }
1304
1305 int runEnd(uint32_t, void *runPtr)
1306 {
1307 return 0;
1308 }
1309
1310 // -----
1311
1312 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1313 {
1314 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1315 }
1316
1317 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1318 {
1319 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1320 }
1321
1322 // -----
1323
1324 void factOut(int severity, int err, const char *message)
1325 {
1326 EventBuilderWrapper::This->factOut(severity, err, message);
1327 }
1328
1329 void factStat(GUI_STAT stat)
1330 {
1331 EventBuilderWrapper::This->factStat(stat);
1332 }
1333
1334 void factStatNew(EVT_STAT stat)
1335 {
1336 EventBuilderWrapper::This->factStat(stat);
1337 }
1338
1339 // ------
1340
1341 void debugHead(int socket, int/*board*/, void *buf)
1342 {
1343 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1344
1345 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1346 }
1347
1348 void debugStream(int isock, void *buf, int len)
1349 {
1350 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1351 }
1352
1353 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1354 {
1355 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1356 }
1357}
1358
1359#endif
Note: See TracBrowser for help on using the repository browser.