source: trunk/FACT++/src/EventBuilderWrapper.h@ 12267

Last change on this file since 12267 was 12223, checked in by tbretz, 13 years ago
Fixed a scoping problem.
File size: 38.2 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
// Functions with C linkage which are only declared here; their definitions
// are not part of this header.
extern "C"
{
    void StartEvtBuild();
    int  CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Command states of the event builder (the values assigned to g_runStat)
enum CommandStates_t
{
    kAbort      = -2,  // 'abort':      quit as soon as possible
    kExit       = -1,  // 'exit':       stop reading, quit when buffered events done
    kInitialize =  0,  // 'initialize': e.g. dim not yet started
    kHybernate  =  1,  // 'hybernate':  do nothing for long time [wakeup within ~1sec]
    kSleep      =  2,  // 'sleep':      do nothing [wakeup within ~10msec]
    kModeFlush  = 10,  // 'flush':      read data from camera, but skip them
    kModeTest   = 20,  // 'test':       read data and process them, but do not write to disk
    kModeFlag   = 30,  // 'flag':       read data, process and write all to disk
    kModeRun    = 40   // 'run':        read data, process and write selected to disk
};
68
// Indices into the fNumEvts counter array
enum
{
    kCurrent,    // 0: events written to the run with the highest run id
    kTotal,      // 1: events written in total
    kEventId,    // 2: event number of the last event counted in kCurrent
    kTriggerId   // 3: trigger number of the last event counted in kCurrent
};
76
// Output format; selects the DataProcessorImp implementation created in runOpen()
enum FileFormat_t
{
    kNone  = 0,  // DataDump
    kDebug = 1,  // DataDebug
    kFits  = 2,  // DataWriteFits
    kRaw   = 3,  // DataWriteRaw
    kCalib = 4   // DataCalib
};
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405
406 /*
407 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
408 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
409 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
410 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
411 */
412
413 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
414 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
415
416 void SetOutputFormat(FileFormat_t f)
417 {
418 fFileFormat = f;
419 if (fFileFormat==kCalib)
420 {
421 DataCalib::Restart();
422 DataCalib::Update(fDimDrsCalibration);
423 fMsg.Message("Resetted DRS calibration.");
424 }
425 }
426
427 void SetDebugLog(bool b) { fDebugLog = b; }
428
429 void SetDebugStream(bool b)
430 {
431 fDebugStream = b;
432 if (b)
433 return;
434
435 for (int i=0; i<40; i++)
436 {
437 if (!fDumpStream[i].is_open())
438 continue;
439
440 fDumpStream[i].close();
441
442 ostringstream name;
443 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
444 fMsg.Message("Closed file '"+name.str()+"'");
445 }
446 }
447
448 void SetDebugRead(bool b)
449 {
450 fDebugRead = b;
451 if (b || !fDumpRead.is_open())
452 return;
453
454 fDumpRead.close();
455 fMsg.Message("Closed file 'socket_events.txt'");
456 }
457
458// size_t GetUsedMemory() const { return gi_usedMem; }
459
460 void LoadDrsCalibration(const char *fname)
461 {
462 if (!DataCalib::ReadFits(fname, fMsg))
463 return;
464 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
465 DataCalib::Update(fDimDrsCalibration);
466 }
467
468 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
469
470
471 /*
472 struct OpenFileToDim
473 {
474 int code;
475 char fileName[FILENAME_MAX];
476 };
477
478 SignalRunOpened(runid, filename);
479 // Send num open files
480 // Send runid, (more info about the run?), filename via dim
481
482 SignalEvtWritten(runid);
483 // Send num events written of newest file
484
485 SignalRunClose(runid);
486 // Send new num open files
487 // Send empty file-name if no file is open
488
489 */
490
491 // -------------- Mapped event builder callbacks ------------------
492
493 void UpdateRuns(const string &fname="")
494 {
495 uint32_t values[5] =
496 {
497 static_cast<uint32_t>(fFiles.size()),
498 0xffffffff,
499 0,
500 fLastOpened,
501 fLastClosed
502 };
503
504 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
505 it!=fFiles.end(); it++)
506 {
507 const DataProcessorImp *file = *it;
508
509 if (file->GetRunId()<values[1])
510 values[1] = file->GetRunId();
511
512 if (file->GetRunId()>values[2])
513 values[2] = file->GetRunId();
514 }
515
516 fMaxRun = values[2];
517
518 vector<char> data(sizeof(values)+fname.size()+1);
519 memcpy(data.data(), values, sizeof(values));
520 strcpy(data.data()+sizeof(values), fname.c_str());
521
522 fDimRuns.Update(data);
523 }
524
525 vector<DataProcessorImp*> fFiles;
526
527 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
528 {
529 fMsg.Info(" ==> TODO: Update run configuration in database!");
530 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
531 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
532
533 // Check if file already exists...
534 DataProcessorImp *file = 0;
535 switch (fFileFormat)
536 {
537 case kNone: file = new DataDump(fPath, runid, fMsg); break;
538 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
539 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
540 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
541 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
542 }
543
544 try
545 {
546 if (!file->Open(h))
547 return 0;
548 }
549 catch (const exception &e)
550 {
551 fMsg.Error("Exception trying to open file: "+string(e.what()));
552 return 0;
553 }
554
555 fFiles.push_back(file);
556
557 ostringstream str;
558 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
559 fMsg.Info(str);
560
561 fDimWriteStats.FileOpened(file->GetFileName());
562
563 fLastOpened = runid;
564 UpdateRuns(file->GetFileName());
565
566 fNumEvts[kEventId] = 0;
567 fNumEvts[kTriggerId] = 0;
568
569 fNumEvts[kCurrent] = 0;
570 fDimEvents.Update(fNumEvts);
571 // fDimCurrentEvent.Update(uint32_t(0));
572
573 return reinterpret_cast<FileHandle_t>(file);
574 }
575
576 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
577 {
578 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
579
580 if (!file->WriteEvt(e))
581 return -1;
582
583 if (file->GetRunId()==fMaxRun)
584 {
585 fNumEvts[kCurrent]++;
586 fNumEvts[kEventId] = e->EventNum;
587 fNumEvts[kTriggerId] = e->TriggerNum;
588 }
589
590 fNumEvts[kTotal]++;
591
592 static Time oldt(boost::date_time::neg_infin);
593 Time newt;
594 if (newt>oldt+boost::posix_time::seconds(1))
595 {
596 fDimEvents.Update(fNumEvts);
597 oldt = newt;
598 }
599
600
601 // ===> SignalEvtWritten(runid);
602 // Send num events written of newest file
603
604 /* close run runId (all all runs if runId=0) */
605 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
606 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
607
608 return 0;
609 }
610
611 virtual void CloseRun(uint32_t runid) { }
612
613 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
614 {
615 fMsg.Info(" ==> TODO: Update run configuration in database!");
616 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
617
618 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
619
620 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
621 if (it==fFiles.end())
622 {
623 ostringstream str;
624 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
625 fMsg.Fatal(str);
626 return -1;
627 }
628
629 fFiles.erase(it);
630
631 fLastClosed = file->GetRunId();
632 CloseRun(fLastClosed);
633 UpdateRuns();
634
635 fDimEvents.Update(fNumEvts);
636
637 const bool rc = file->Close(tail);
638 if (!rc)
639 {
640 // Error message
641 }
642
643 ostringstream str;
644 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
645 fMsg.Info(str);
646
647 delete file;
648
649 // ==> SignalRunClose(runid);
650 // Send new num open files
651 // Send empty file-name if no file is open
652
653 return rc ? 0 : -1;
654 }
655
656 ofstream fDumpStream[40];
657
658 void debugStream(int isock, void *buf, int len)
659 {
660 if (!fDebugStream)
661 return;
662
663 const int slot = isock/7;
664 if (slot<0 || slot>39)
665 return;
666
667 if (!fDumpStream[slot].is_open())
668 {
669 ostringstream name;
670 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
671
672 fDumpStream[slot].open(name.str().c_str(), ios::app);
673 if (!fDumpStream[slot])
674 {
675 ostringstream str;
676 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
677 fMsg.Error(str);
678
679 return;
680 }
681
682 fMsg.Message("Opened file '"+name.str()+"' for writing.");
683 }
684
685 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
686 }
687
688 ofstream fDumpRead; // Stream to possibly dump docket events
689
690 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
691 {
692 // isock = socketID (0-279)
693 // ibyte = #bytes gelesen
694 // event = eventId (oder 0 wenn noch nicht bekannt)
695 // state : 1=finished reading data
696 // 0=reading data
697 // -1=start reading data (header)
698 // -2=start reading data,
699 // eventId not known yet (too little data)
700 // tsec, tusec = time when reading seconds, microseconds
701 //
702 if (!fDebugRead || ibyte==0)
703 return;
704
705 if (!fDumpRead.is_open())
706 {
707 fDumpRead.open("socket_events.txt", ios::app);
708 if (!fDumpRead)
709 {
710 ostringstream str;
711 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
712 fMsg.Error(str);
713
714 return;
715 }
716
717 fMsg.Message("Opened file 'socket_events.txt' for writing.");
718
719 fDumpRead << "# START: " << Time().GetAsStr() << endl;
720 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
721 }
722
723 fDumpRead
724 << setw(2) << state << " "
725 << setw(8) << tsec << " "
726 << setw(9) << tusec << " "
727 << setw(3) << isock << " "
728 << setw(2) << isock/7 << " "
729 << runno << " "
730 << event << " "
731 << ftmevt << " "
732 << ibyte << endl;
733 }
734
735 array<uint16_t,2> fVecRoi;
736
737 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
738 {
739 /*
740 fadhd[i] ist ein array mit den 40 fad-headers
741 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
742
743 event ist die Struktur, die auch die write routine erhaelt;
744 darin sind im header die 'soll-werte' fuer z.B. eventID
745 als auch die ADC-Werte (falls Du die brauchst)
746
747 Wenn die routine einen negativen Wert liefert, wird das event
748 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
749 */
750
751 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
752
753 if (roi!=fVecRoi)
754 {
755 Update(fDimRoi, roi);
756 fVecRoi = roi;
757 }
758
759 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
760 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
761
762 // FIMXE: Compare with target configuration
763
764 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
765 {
766 // FIXME: Compare with expectations!!!
767 if (ptr->fStartDelimiter==0)
768 {
769 if (ptr==beg)
770 beg++;
771 continue;
772 }
773
774 if (beg->fStatus != ptr->fStatus)
775 {
776 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
777 CloseRunFile(runNr, 0, 0);
778 return -1;
779 }
780
781 if (beg->fRunNumber != ptr->fRunNumber)
782 {
783 fMsg.Error("Inconsistent run number detected.... closing run.");
784 CloseRunFile(runNr, 0, 0);
785 return -1;
786 }
787
788 /*
789 if (beg->fVersion != ptr->fVersion)
790 {
791 Error("Inconsist firmware version detected.... closing run.");
792 CloseRunFile(runNr, 0, 0);
793 break;
794 }
795 if (beg->fEventCounter != ptr->fEventCounter)
796 {
797 Error("Inconsist run number detected.... closing run.");
798 CloseRunFile(runNr, 0, 0);
799 break;
800 }
801 if (beg->fTriggerCounter != ptr->fTriggerCounter)
802 {
803 Error("Inconsist trigger number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 break;
806 }*/
807
808 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
809 {
810 fMsg.Error("Inconsistent phase shift detected.... closing run.");
811 CloseRunFile(runNr, 0, 0);
812 return -1;
813 }
814
815 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
816 {
817 fMsg.Error("Inconsistent DAC values detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 return -1;
820 }
821
822 if (beg->fTriggerType != ptr->fTriggerType)
823 {
824 fMsg.Error("Inconsistent trigger type detected.... closing run.");
825 CloseRunFile(runNr, 0, 0);
826 return -1;
827 }
828 }
829
830 // check REFCLK_frequency
831 // check consistency with command configuration
832 // how to log errors?
833 // need gotNewRun/closedRun to know it is finished
834
835 return 0;
836 }
837
838 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
839 {
840 // Currently we send any event no matter what its trigger id is...
841 // To be changed.
842 static Time oldt(boost::date_time::neg_infin);
843 Time newt;
844
845 // FIXME: Only send events if the have newer run-numbers
846 if (newt<oldt+boost::posix_time::seconds(1))
847 return;
848
849 oldt = newt;
850
851 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*1440);
852 memcpy(data.data(), event, sizeof(EVENT));
853
854 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
855
856 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
857 fDimRawData.Update(data);
858
859 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
860 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
861
862 fDimEventData.Update(data2);
863 }
864
865 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
866 {
867 if (!DataCalib::IsValid())
868 return;
869
870 // Workaround to find a valid header.....
871 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
872 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
873
874 // FIMXE: Compare with target configuration
875
876 const FAD::EventHeader *ptr=beg;
877 for (; ptr!=end; ptr++)
878 {
879 if (ptr->fStartDelimiter==0)
880 continue;
881
882 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
883 return;
884 }
885
886 if (ptr->fStartDelimiter==0)
887 return;
888
889 vector<float> data(event->Roi*1440);
890 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
891
892 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
893 CalibData::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
894
895// dim_lock();
896 fDimFeedbackData.Update(data2);
897// dim_unlock();
898 }
899
900 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
901 {
902 switch (threadID)
903 {
904 case 0:
905 SendRawData(fadhd, event);
906 return 1;
907 case 1:
908 SendFeedbackData(fadhd, event);
909 return 2;
910 }
911 return 100;
912 }
913
914
915 bool IsRunStarted() const
916 {
917 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
918 return it==fExpectedRuns.end();// ? true : it->second.started;
919 }
920
921 uint32_t GetRunNumber() const
922 {
923 return fRunNumber;
924 }
925
926 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
927 {
928 // This function is called even when writing is switched off
929 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
930 if (it==fExpectedRuns.end())
931 {
932 ostringstream str;
933 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
934 return;
935 }
936
937 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
938 // return: 0=close scheduled / >0 already closed / <0 does not exist
939
940 // FIXME: Move configuration from expected runs to runs which will soon
941 // be opened/closed
942
943 it->second.started = true;
944
945 fExpectedRuns.erase(it);
946 }
947
948 map<boost::thread::id, string> fLastMessage;
949
950 void factOut(int severity, int err, const char *message)
951 {
952 if (!fDebugLog && severity==99)
953 return;
954
955 ostringstream str;
956 //str << boost::this_thread::get_id() << " ";
957 str << "EventBuilder(";
958 if (err<0)
959 str << "---";
960 else
961 str << err;
962 str << "): " << message;
963
964 string &old = fLastMessage[boost::this_thread::get_id()];
965
966 if (str.str()==old)
967 return;
968 old = str.str();
969
970 fMsg.Update(str, severity);
971 }
972/*
973 void factStat(int64_t *stat, int len)
974 {
975 if (len!=7)
976 {
977 fMsg.Warn("factStat received unknown number of values.");
978 return;
979 }
980
981 vector<int64_t> data(1, g_maxMem);
982 data.insert(data.end(), stat, stat+len);
983
984 static vector<int64_t> last(8);
985 if (data==last)
986 return;
987 last = data;
988
989 fDimStatistics.Update(data);
990
991 // len ist die Laenge des arrays.
992 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
993 // kannst Du pruefen, ob die 100MB voll sind ....
994
995 ostringstream str;
996 str
997 << "Wait=" << stat[0] << " "
998 << "Skip=" << stat[1] << " "
999 << "Del=" << stat[2] << " "
1000 << "Tot=" << stat[3] << " "
1001 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1002 << "Read=" << stat[5] << " "
1003 << "Conn=" << stat[6];
1004
1005 fMsg.Info(str);
1006 }
1007 */
1008
1009 void factStat(const EVT_STAT &stat)
1010 {
1011 fDimStatistics2.Update(stat);
1012 }
1013
1014 void factStat(const GUI_STAT &stat)
1015 {
1016 fDimStatistics1.Update(stat);
1017 }
1018
1019
1020 array<FAD::EventHeader, 40> fVecHeader;
1021
1022 template<typename T, class S>
1023 array<T, 42> Compare(const S *vec, const T *t)
1024 {
1025 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1026
1027 const T *min = NULL;
1028 const T *val = NULL;
1029 const T *max = NULL;
1030
1031 array<T, 42> arr;
1032
1033 bool rc = true;
1034 for (int i=0; i<40; i++)
1035 {
1036 const char *base = reinterpret_cast<const char*>(vec+i);
1037 const T *ref = reinterpret_cast<const T*>(base+offset);
1038
1039 arr[i] = *ref;
1040
1041 if (gi_NumConnect[i]!=7)
1042 {
1043 arr[i] = 0;
1044 continue;
1045 }
1046
1047 if (!val)
1048 {
1049 min = ref;
1050 val = ref;
1051 max = ref;
1052 }
1053
1054 if (*ref<*min)
1055 min = ref;
1056
1057 if (*ref>*max)
1058 max = ref;
1059
1060 if (*val!=*ref)
1061 rc = false;
1062 }
1063
1064 arr[40] = val ? *min : 1;
1065 arr[41] = val ? *max : 0;
1066
1067 return arr;
1068 }
1069
1070 template<typename T>
1071 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1072 {
1073 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1074
1075 T val = 0;
1076 T rc = 0;
1077
1078 array<T, 42> vec;
1079
1080 bool first = true;
1081
1082 for (int i=0; i<40; i++)
1083 {
1084 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1085 const T *ref = reinterpret_cast<const T*>(base+offset);
1086
1087 vec[i+2] = *ref;
1088
1089 if (gi_NumConnect[i]!=7)
1090 {
1091 vec[i+2] = 0;
1092 continue;
1093 }
1094
1095 if (first)
1096 {
1097 first = false;
1098 val = *ref;
1099 rc = 0;
1100 }
1101
1102 rc |= val^*ref;
1103 }
1104
1105 vec[0] = rc;
1106 vec[1] = val;
1107
1108 return vec;
1109 }
1110
1111 template<typename T, size_t N>
1112 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1113 {
1114// svc.setQuality(vec[40]<=vec[41]);
1115 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1116 svc.Update();
1117 }
1118
// Debugging aid: print name, the flag data.first, the entries 1, 0 and
// again 1 of the array, and then the entries 3-42 to stdout, in the form
//    name|flag|[1]|[0]<x<[1]: [3] [4] ... [42]
template<typename T>
void Print(const char *name, const std::pair<bool,std::array<T, 43>> &data)
{
    const std::array<T, 43> &values = data.second;

    std::cout << name << "|" << data.first << "|" << values[1] << "|";
    std::cout << values[0] << "<x<" << values[1] << ":";

    for (int i=3; i<43; i++)
        std::cout << " " << values[i];

    std::cout << std::endl;
}
1127
1128 vector<uint> fNumConnected;
1129
1130 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1131 {
1132 const uint16_t id = h.Id();
1133 if (id>39)
1134 return;
1135
1136 if (fNumConnected.size()!=40)
1137 fNumConnected.resize(40);
1138
1139 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1140
1141 const bool changed = con!=fNumConnected || !IsThreadRunning();
1142
1143 fNumConnected = con;
1144
1145 const FAD::EventHeader old = fVecHeader[id];
1146 fVecHeader[id] = h;
1147
1148 if (old.fVersion != h.fVersion || changed)
1149 {
1150 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1151
1152 array<float,42> data;
1153 for (int i=0; i<42; i++)
1154 {
1155 ostringstream str;
1156 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1157 data[i] = stof(str.str());
1158 }
1159 Update(fDimFwVersion, data);
1160 }
1161
1162 if (old.fRunNumber != h.fRunNumber || changed)
1163 {
1164 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1165 fDimRunNumber.Update(run);
1166 }
1167
1168 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1169 {
1170 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1171 fDimPrescaler.Update(pre);
1172 }
1173
1174 if (old.fDNA != h.fDNA || changed)
1175 {
1176 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1177 Update(fDimDNA, dna, 40);
1178 }
1179
1180 if (old.fStatus != h.fStatus || changed)
1181 {
1182 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1183 Update(fDimStatus, sts);
1184 }
1185
1186 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1187 {
1188 array<uint16_t, FAD::kNumDac*42> dacs;
1189
1190 for (int i=0; i<FAD::kNumDac; i++)
1191 {
1192 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1193 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1194 }
1195
1196 Update(fDimDac, dacs);
1197 }
1198
1199 // -----------
1200
1201 static Time oldt(boost::date_time::neg_infin);
1202 Time newt;
1203
1204 if (newt>oldt+boost::posix_time::seconds(1))
1205 {
1206 oldt = newt;
1207
1208 // --- RefClock
1209
1210 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1211 Update(fDimRefClock, clk);
1212
1213 // --- Temperatures
1214
1215 const array<int16_t,42> tmp[4] =
1216 {
1217 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1218 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1219 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1220 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1221 };
1222
1223 vector<int16_t> data;
1224 data.reserve(82);
1225 data.push_back(tmp[0][40]); // min: 0
1226 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1227 data.push_back(tmp[0][41]); // max: 41
1228 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1229
1230 for (int j=1; j<=3; j++)
1231 {
1232 const array<int16_t,42> &ref = tmp[j];
1233
1234 // Gloabl min
1235 if (ref[40]<data[0]) // 40=min
1236 data[0] = ref[40];
1237
1238 // Global max
1239 if (ref[41]>data[41]) // 41=max
1240 data[41] = ref[41];
1241
1242 for (int i=0; i<40; i++)
1243 {
1244 // min per board
1245 if (ref[i]<data[i+1]) // data: 1-40
1246 data[i+1] = ref[i]; // ref: 0-39
1247
1248 // max per board
1249 if (ref[i]>data[i+42]) // data: 42-81
1250 data[i+42] = ref[i]; // ref: 0-39
1251 }
1252
1253
1254 }
1255
1256 vector<float> deg(82); // 0: global min, 1-40: min
1257 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1258 deg[i] = data[i]/16.;
1259 fDimTemperature.Update(deg);
1260 }
1261 }
1262};
1263
1264EventBuilderWrapper *EventBuilderWrapper::This = 0;
1265
1266// ----------- Event builder callbacks implementation ---------------
1267extern "C"
1268{
1269 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1270 {
1271 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1272 }
1273
1274 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1275 {
1276 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1277 }
1278
1279 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1280 {
1281 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1282 }
1283
1284 // -----
1285
1286 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1287 {
1288 return NULL;
1289 }
1290
1291 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1292 {
1293 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1294 }
1295
1296 int runEnd(uint32_t, void *runPtr)
1297 {
1298 return 0;
1299 }
1300
1301 // -----
1302
1303 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1304 {
1305 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1306 }
1307
1308 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1309 {
1310 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1311 }
1312
1313 // -----
1314
1315 void factOut(int severity, int err, const char *message)
1316 {
1317 EventBuilderWrapper::This->factOut(severity, err, message);
1318 }
1319
1320 void factStat(GUI_STAT stat)
1321 {
1322 EventBuilderWrapper::This->factStat(stat);
1323 }
1324
1325 void factStatNew(EVT_STAT stat)
1326 {
1327 EventBuilderWrapper::This->factStat(stat);
1328 }
1329
1330 // ------
1331
1332 void debugHead(int socket, int/*board*/, void *buf)
1333 {
1334 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1335
1336 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1337 }
1338
1339 void debugStream(int isock, void *buf, int len)
1340 {
1341 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1342 }
1343
1344 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1345 {
1346 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1347 }
1348}
1349
1350#endif
Note: See TracBrowser for help on using the repository browser.