source: trunk/FACT++/src/EventBuilderWrapper.h@ 12309

Last change on this file since 12309 was 12286, checked in by tbretz, 13 years ago
Fixed a misspelled class name.
File size: 38.3 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405
406 /*
407 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
408 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
409 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
410 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
411 */
412
413 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
414 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
415
416 void SetOutputFormat(FileFormat_t f)
417 {
418 fFileFormat = f;
419 if (fFileFormat==kCalib)
420 {
421 DataCalib::Restart();
422 DataCalib::Update(fDimDrsCalibration);
423 fMsg.Message("Resetted DRS calibration.");
424 }
425 }
426
427 void SetDebugLog(bool b) { fDebugLog = b; }
428
429 void SetDebugStream(bool b)
430 {
431 fDebugStream = b;
432 if (b)
433 return;
434
435 for (int i=0; i<40; i++)
436 {
437 if (!fDumpStream[i].is_open())
438 continue;
439
440 fDumpStream[i].close();
441
442 ostringstream name;
443 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
444 fMsg.Message("Closed file '"+name.str()+"'");
445 }
446 }
447
448 void SetDebugRead(bool b)
449 {
450 fDebugRead = b;
451 if (b || !fDumpRead.is_open())
452 return;
453
454 fDumpRead.close();
455 fMsg.Message("Closed file 'socket_events.txt'");
456 }
457
458// size_t GetUsedMemory() const { return gi_usedMem; }
459
460 void LoadDrsCalibration(const char *fname)
461 {
462 if (!DataCalib::ReadFits(fname, fMsg))
463 return;
464 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
465 DataCalib::Update(fDimDrsCalibration);
466 }
467
468 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
469
470
471 /*
472 struct OpenFileToDim
473 {
474 int code;
475 char fileName[FILENAME_MAX];
476 };
477
478 SignalRunOpened(runid, filename);
479 // Send num open files
480 // Send runid, (more info about the run?), filename via dim
481
482 SignalEvtWritten(runid);
483 // Send num events written of newest file
484
485 SignalRunClose(runid);
486 // Send new num open files
487 // Send empty file-name if no file is open
488
489 */
490
491 // -------------- Mapped event builder callbacks ------------------
492
493 void UpdateRuns(const string &fname="")
494 {
495 uint32_t values[5] =
496 {
497 static_cast<uint32_t>(fFiles.size()),
498 0xffffffff,
499 0,
500 fLastOpened,
501 fLastClosed
502 };
503
504 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
505 it!=fFiles.end(); it++)
506 {
507 const DataProcessorImp *file = *it;
508
509 if (file->GetRunId()<values[1])
510 values[1] = file->GetRunId();
511
512 if (file->GetRunId()>values[2])
513 values[2] = file->GetRunId();
514 }
515
516 fMaxRun = values[2];
517
518 vector<char> data(sizeof(values)+fname.size()+1);
519 memcpy(data.data(), values, sizeof(values));
520 strcpy(data.data()+sizeof(values), fname.c_str());
521
522 fDimRuns.Update(data);
523 }
524
525 vector<DataProcessorImp*> fFiles;
526
527 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
528 {
529 fMsg.Info(" ==> TODO: Update run configuration in database!");
530 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
531 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
532
533 // Check if file already exists...
534 DataProcessorImp *file = 0;
535 switch (fFileFormat)
536 {
537 case kNone: file = new DataDump(fPath, runid, fMsg); break;
538 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
539 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
540 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
541 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
542 }
543
544 try
545 {
546 if (!file->Open(h))
547 return 0;
548 }
549 catch (const exception &e)
550 {
551 fMsg.Error("Exception trying to open file: "+string(e.what()));
552 return 0;
553 }
554
555 fFiles.push_back(file);
556
557 ostringstream str;
558 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
559 fMsg.Info(str);
560
561 fDimWriteStats.FileOpened(file->GetFileName());
562
563 fLastOpened = runid;
564 UpdateRuns(file->GetFileName());
565
566 fNumEvts[kEventId] = 0;
567 fNumEvts[kTriggerId] = 0;
568
569 fNumEvts[kCurrent] = 0;
570 fDimEvents.Update(fNumEvts);
571 // fDimCurrentEvent.Update(uint32_t(0));
572
573 return reinterpret_cast<FileHandle_t>(file);
574 }
575
576 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
577 {
578 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
579
580 if (!file->WriteEvt(e))
581 return -1;
582
583 if (file->GetRunId()==fMaxRun)
584 {
585 fNumEvts[kCurrent]++;
586 fNumEvts[kEventId] = e->EventNum;
587 fNumEvts[kTriggerId] = e->TriggerNum;
588 }
589
590 fNumEvts[kTotal]++;
591
592 static Time oldt(boost::date_time::neg_infin);
593 Time newt;
594 if (newt>oldt+boost::posix_time::seconds(1))
595 {
596 fDimEvents.Update(fNumEvts);
597 oldt = newt;
598 }
599
600
601 // ===> SignalEvtWritten(runid);
602 // Send num events written of newest file
603
604 /* close run runId (all all runs if runId=0) */
605 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
606 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
607
608 return 0;
609 }
610
611 virtual void CloseRun(uint32_t runid) { }
612
613 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
614 {
615 fMsg.Info(" ==> TODO: Update run configuration in database!");
616 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
617
618 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
619
620 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
621 if (it==fFiles.end())
622 {
623 ostringstream str;
624 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
625 fMsg.Fatal(str);
626 return -1;
627 }
628
629 fFiles.erase(it);
630
631 fLastClosed = file->GetRunId();
632 CloseRun(fLastClosed);
633 UpdateRuns();
634
635 fDimEvents.Update(fNumEvts);
636
637 const bool rc = file->Close(tail);
638 if (!rc)
639 {
640 // Error message
641 }
642
643 ostringstream str;
644 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
645 fMsg.Info(str);
646
647 delete file;
648
649 // ==> SignalRunClose(runid);
650 // Send new num open files
651 // Send empty file-name if no file is open
652
653 return rc ? 0 : -1;
654 }
655
656 ofstream fDumpStream[40];
657
658 void debugStream(int isock, void *buf, int len)
659 {
660 if (!fDebugStream)
661 return;
662
663 const int slot = isock/7;
664 if (slot<0 || slot>39)
665 return;
666
667 if (!fDumpStream[slot].is_open())
668 {
669 ostringstream name;
670 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
671
672 fDumpStream[slot].open(name.str().c_str(), ios::app);
673 if (!fDumpStream[slot])
674 {
675 ostringstream str;
676 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
677 fMsg.Error(str);
678
679 return;
680 }
681
682 fMsg.Message("Opened file '"+name.str()+"' for writing.");
683 }
684
685 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
686 }
687
688 ofstream fDumpRead; // Stream to possibly dump docket events
689
690 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
691 {
692 // isock = socketID (0-279)
693 // ibyte = #bytes gelesen
694 // event = eventId (oder 0 wenn noch nicht bekannt)
695 // state : 1=finished reading data
696 // 0=reading data
697 // -1=start reading data (header)
698 // -2=start reading data,
699 // eventId not known yet (too little data)
700 // tsec, tusec = time when reading seconds, microseconds
701 //
702 if (!fDebugRead || ibyte==0)
703 return;
704
705 if (!fDumpRead.is_open())
706 {
707 fDumpRead.open("socket_events.txt", ios::app);
708 if (!fDumpRead)
709 {
710 ostringstream str;
711 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
712 fMsg.Error(str);
713
714 return;
715 }
716
717 fMsg.Message("Opened file 'socket_events.txt' for writing.");
718
719 fDumpRead << "# START: " << Time().GetAsStr() << endl;
720 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
721 }
722
723 fDumpRead
724 << setw(2) << state << " "
725 << setw(8) << tsec << " "
726 << setw(9) << tusec << " "
727 << setw(3) << isock << " "
728 << setw(2) << isock/7 << " "
729 << runno << " "
730 << event << " "
731 << ftmevt << " "
732 << ibyte << endl;
733 }
734
735 array<uint16_t,2> fVecRoi;
736
737 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
738 {
739 /*
740 fadhd[i] ist ein array mit den 40 fad-headers
741 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
742
743 event ist die Struktur, die auch die write routine erhaelt;
744 darin sind im header die 'soll-werte' fuer z.B. eventID
745 als auch die ADC-Werte (falls Du die brauchst)
746
747 Wenn die routine einen negativen Wert liefert, wird das event
748 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
749 */
750
751 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
752
753 if (roi!=fVecRoi)
754 {
755 Update(fDimRoi, roi);
756 fVecRoi = roi;
757 }
758
759 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
760 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
761
762 // FIMXE: Compare with target configuration
763
764 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
765 {
766 // FIXME: Compare with expectations!!!
767 if (ptr->fStartDelimiter==0)
768 {
769 if (ptr==beg)
770 beg++;
771 continue;
772 }
773
774 if (beg->fStatus != ptr->fStatus)
775 {
776 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
777 CloseRunFile(runNr, 0, 0);
778 return -1;
779 }
780
781 if (beg->fRunNumber != ptr->fRunNumber)
782 {
783 fMsg.Error("Inconsistent run number detected.... closing run.");
784 CloseRunFile(runNr, 0, 0);
785 return -1;
786 }
787
788 /*
789 if (beg->fVersion != ptr->fVersion)
790 {
791 Error("Inconsist firmware version detected.... closing run.");
792 CloseRunFile(runNr, 0, 0);
793 break;
794 }
795 if (beg->fEventCounter != ptr->fEventCounter)
796 {
797 Error("Inconsist run number detected.... closing run.");
798 CloseRunFile(runNr, 0, 0);
799 break;
800 }
801 if (beg->fTriggerCounter != ptr->fTriggerCounter)
802 {
803 Error("Inconsist trigger number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 break;
806 }*/
807
808 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
809 {
810 fMsg.Error("Inconsistent phase shift detected.... closing run.");
811 CloseRunFile(runNr, 0, 0);
812 return -1;
813 }
814
815 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
816 {
817 fMsg.Error("Inconsistent DAC values detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 return -1;
820 }
821
822 if (beg->fTriggerType != ptr->fTriggerType)
823 {
824 fMsg.Error("Inconsistent trigger type detected.... closing run.");
825 CloseRunFile(runNr, 0, 0);
826 return -1;
827 }
828 }
829
830 // check REFCLK_frequency
831 // check consistency with command configuration
832 // how to log errors?
833 // need gotNewRun/closedRun to know it is finished
834
835 return 0;
836 }
837
838 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
839 {
840 // Currently we send any event no matter what its trigger id is...
841 // To be changed.
842 static Time oldt(boost::date_time::neg_infin);
843 Time newt;
844
845 // FIXME: Only send events if the have newer run-numbers
846 if (newt<oldt+boost::posix_time::seconds(1))
847 return;
848
849 oldt = newt;
850
851 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*1440);
852 memcpy(data.data(), event, sizeof(EVENT));
853
854 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
855
856 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
857 fDimRawData.Update(data);
858
859 CalibData::RemoveSpikes(vec, event->Roi);
860
861 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
862 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
863
864 fDimEventData.Update(data2);
865 }
866
867 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
868 {
869 if (!DataCalib::IsValid())
870 return;
871
872 // Workaround to find a valid header.....
873 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
874 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
875
876 // FIMXE: Compare with target configuration
877
878 const FAD::EventHeader *ptr=beg;
879 for (; ptr!=end; ptr++)
880 {
881 if (ptr->fStartDelimiter==0)
882 continue;
883
884 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
885 return;
886 }
887
888 if (ptr->fStartDelimiter==0)
889 return;
890
891 vector<float> data(event->Roi*1440);
892 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
893
894 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
895 CalibData::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
896
897// dim_lock();
898 fDimFeedbackData.Update(data2);
899// dim_unlock();
900 }
901
902 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
903 {
904 switch (threadID)
905 {
906 case 0:
907 SendRawData(fadhd, event);
908 return 1;
909 case 1:
910 SendFeedbackData(fadhd, event);
911 return 2;
912 }
913 return 100;
914 }
915
916
917 bool IsRunStarted() const
918 {
919 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
920 return it==fExpectedRuns.end();// ? true : it->second.started;
921 }
922
923 uint32_t GetRunNumber() const
924 {
925 return fRunNumber;
926 }
927
928 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
929 {
930 // This function is called even when writing is switched off
931 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
932 if (it==fExpectedRuns.end())
933 {
934 ostringstream str;
935 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
936 return;
937 }
938
939 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
940 // return: 0=close scheduled / >0 already closed / <0 does not exist
941
942 // FIXME: Move configuration from expected runs to runs which will soon
943 // be opened/closed
944
945 it->second.started = true;
946
947 fExpectedRuns.erase(it);
948 }
949
950 map<boost::thread::id, string> fLastMessage;
951
952 void factOut(int severity, int err, const char *message)
953 {
954 if (!fDebugLog && severity==99)
955 return;
956
957 ostringstream str;
958 //str << boost::this_thread::get_id() << " ";
959 str << "EventBuilder(";
960 if (err<0)
961 str << "---";
962 else
963 str << err;
964 str << "): " << message;
965
966 string &old = fLastMessage[boost::this_thread::get_id()];
967
968 if (str.str()==old)
969 return;
970 old = str.str();
971
972 fMsg.Update(str, severity);
973 }
974/*
975 void factStat(int64_t *stat, int len)
976 {
977 if (len!=7)
978 {
979 fMsg.Warn("factStat received unknown number of values.");
980 return;
981 }
982
983 vector<int64_t> data(1, g_maxMem);
984 data.insert(data.end(), stat, stat+len);
985
986 static vector<int64_t> last(8);
987 if (data==last)
988 return;
989 last = data;
990
991 fDimStatistics.Update(data);
992
993 // len ist die Laenge des arrays.
994 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
995 // kannst Du pruefen, ob die 100MB voll sind ....
996
997 ostringstream str;
998 str
999 << "Wait=" << stat[0] << " "
1000 << "Skip=" << stat[1] << " "
1001 << "Del=" << stat[2] << " "
1002 << "Tot=" << stat[3] << " "
1003 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1004 << "Read=" << stat[5] << " "
1005 << "Conn=" << stat[6];
1006
1007 fMsg.Info(str);
1008 }
1009 */
1010
1011 void factStat(const EVT_STAT &stat)
1012 {
1013 fDimStatistics2.Update(stat);
1014 }
1015
1016 void factStat(const GUI_STAT &stat)
1017 {
1018 fDimStatistics1.Update(stat);
1019 }
1020
1021
1022 array<FAD::EventHeader, 40> fVecHeader;
1023
1024 template<typename T, class S>
1025 array<T, 42> Compare(const S *vec, const T *t)
1026 {
1027 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1028
1029 const T *min = NULL;
1030 const T *val = NULL;
1031 const T *max = NULL;
1032
1033 array<T, 42> arr;
1034
1035 bool rc = true;
1036 for (int i=0; i<40; i++)
1037 {
1038 const char *base = reinterpret_cast<const char*>(vec+i);
1039 const T *ref = reinterpret_cast<const T*>(base+offset);
1040
1041 arr[i] = *ref;
1042
1043 if (gi_NumConnect[i]!=7)
1044 {
1045 arr[i] = 0;
1046 continue;
1047 }
1048
1049 if (!val)
1050 {
1051 min = ref;
1052 val = ref;
1053 max = ref;
1054 }
1055
1056 if (*ref<*min)
1057 min = ref;
1058
1059 if (*ref>*max)
1060 max = ref;
1061
1062 if (*val!=*ref)
1063 rc = false;
1064 }
1065
1066 arr[40] = val ? *min : 1;
1067 arr[41] = val ? *max : 0;
1068
1069 return arr;
1070 }
1071
1072 template<typename T>
1073 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1074 {
1075 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1076
1077 T val = 0;
1078 T rc = 0;
1079
1080 array<T, 42> vec;
1081
1082 bool first = true;
1083
1084 for (int i=0; i<40; i++)
1085 {
1086 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1087 const T *ref = reinterpret_cast<const T*>(base+offset);
1088
1089 vec[i+2] = *ref;
1090
1091 if (gi_NumConnect[i]!=7)
1092 {
1093 vec[i+2] = 0;
1094 continue;
1095 }
1096
1097 if (first)
1098 {
1099 first = false;
1100 val = *ref;
1101 rc = 0;
1102 }
1103
1104 rc |= val^*ref;
1105 }
1106
1107 vec[0] = rc;
1108 vec[1] = val;
1109
1110 return vec;
1111 }
1112
1113 template<typename T, size_t N>
1114 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1115 {
1116// svc.setQuality(vec[40]<=vec[41]);
1117 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1118 svc.Update();
1119 }
1120
1121 template<typename T>
1122 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1123 {
1124 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1125 for (int i=0; i<40;i++)
1126 cout << " " << data.second[i+3];
1127 cout << endl;
1128 }
1129
1130 vector<uint> fNumConnected;
1131
1132 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1133 {
1134 const uint16_t id = h.Id();
1135 if (id>39)
1136 return;
1137
1138 if (fNumConnected.size()!=40)
1139 fNumConnected.resize(40);
1140
1141 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1142
1143 const bool changed = con!=fNumConnected || !IsThreadRunning();
1144
1145 fNumConnected = con;
1146
1147 const FAD::EventHeader old = fVecHeader[id];
1148 fVecHeader[id] = h;
1149
1150 if (old.fVersion != h.fVersion || changed)
1151 {
1152 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1153
1154 array<float,42> data;
1155 for (int i=0; i<42; i++)
1156 {
1157 ostringstream str;
1158 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1159 data[i] = stof(str.str());
1160 }
1161 Update(fDimFwVersion, data);
1162 }
1163
1164 if (old.fRunNumber != h.fRunNumber || changed)
1165 {
1166 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1167 fDimRunNumber.Update(run);
1168 }
1169
1170 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1171 {
1172 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1173 fDimPrescaler.Update(pre);
1174 }
1175
1176 if (old.fDNA != h.fDNA || changed)
1177 {
1178 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1179 Update(fDimDNA, dna, 40);
1180 }
1181
1182 if (old.fStatus != h.fStatus || changed)
1183 {
1184 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1185 Update(fDimStatus, sts);
1186 }
1187
1188 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1189 {
1190 array<uint16_t, FAD::kNumDac*42> dacs;
1191
1192 for (int i=0; i<FAD::kNumDac; i++)
1193 {
1194 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1195 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1196 }
1197
1198 Update(fDimDac, dacs);
1199 }
1200
1201 // -----------
1202
1203 static Time oldt(boost::date_time::neg_infin);
1204 Time newt;
1205
1206 if (newt>oldt+boost::posix_time::seconds(1))
1207 {
1208 oldt = newt;
1209
1210 // --- RefClock
1211
1212 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1213 Update(fDimRefClock, clk);
1214
1215 // --- Temperatures
1216
1217 const array<int16_t,42> tmp[4] =
1218 {
1219 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1220 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1221 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1222 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1223 };
1224
1225 vector<int16_t> data;
1226 data.reserve(82);
1227 data.push_back(tmp[0][40]); // min: 0
1228 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1229 data.push_back(tmp[0][41]); // max: 41
1230 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1231
1232 for (int j=1; j<=3; j++)
1233 {
1234 const array<int16_t,42> &ref = tmp[j];
1235
1236 // Gloabl min
1237 if (ref[40]<data[0]) // 40=min
1238 data[0] = ref[40];
1239
1240 // Global max
1241 if (ref[41]>data[41]) // 41=max
1242 data[41] = ref[41];
1243
1244 for (int i=0; i<40; i++)
1245 {
1246 // min per board
1247 if (ref[i]<data[i+1]) // data: 1-40
1248 data[i+1] = ref[i]; // ref: 0-39
1249
1250 // max per board
1251 if (ref[i]>data[i+42]) // data: 42-81
1252 data[i+42] = ref[i]; // ref: 0-39
1253 }
1254
1255
1256 }
1257
1258 vector<float> deg(82); // 0: global min, 1-40: min
1259 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1260 deg[i] = data[i]/16.;
1261 fDimTemperature.Update(deg);
1262 }
1263 }
1264};
1265
1266EventBuilderWrapper *EventBuilderWrapper::This = 0;
1267
1268// ----------- Event builder callbacks implementation ---------------
1269extern "C"
1270{
1271 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1272 {
1273 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1274 }
1275
1276 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1277 {
1278 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1279 }
1280
1281 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1282 {
1283 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1284 }
1285
1286 // -----
1287
1288 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1289 {
1290 return NULL;
1291 }
1292
1293 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1294 {
1295 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1296 }
1297
1298 int runEnd(uint32_t, void *runPtr)
1299 {
1300 return 0;
1301 }
1302
1303 // -----
1304
1305 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1306 {
1307 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1308 }
1309
1310 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1311 {
1312 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1313 }
1314
1315 // -----
1316
1317 void factOut(int severity, int err, const char *message)
1318 {
1319 EventBuilderWrapper::This->factOut(severity, err, message);
1320 }
1321
1322 void factStat(GUI_STAT stat)
1323 {
1324 EventBuilderWrapper::This->factStat(stat);
1325 }
1326
1327 void factStatNew(EVT_STAT stat)
1328 {
1329 EventBuilderWrapper::This->factStat(stat);
1330 }
1331
1332 // ------
1333
1334 void debugHead(int socket, int/*board*/, void *buf)
1335 {
1336 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1337
1338 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1339 }
1340
1341 void debugStream(int isock, void *buf, int len)
1342 {
1343 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1344 }
1345
1346 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1347 {
1348 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1349 }
1350}
1351
1352#endif
Note: See TracBrowser for help on using the repository browser.