source: trunk/FACT++/src/EventBuilderWrapper.h@ 12064

Last change on this file since 12064 was 12057, checked in by tbretz, 13 years ago
Simplified SendFeedbackData for the moment
File size: 38.4 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Run states of the event builder; these are the values assigned to g_runStat.
enum CommandStates_t
{
    kAbort      = -2,  // 'abort':      quit as soon as possible
    kExit       = -1,  // 'exit':       stop reading, quit when buffered events are done
    kInitialize =  0,  // 'initialize': e.g. dim not yet started
    kHybernate  =  1,  // 'hybernate':  do nothing for a long time [wakeup within ~1sec]
    kSleep      =  2,  // 'sleep':      do nothing [wakeup within ~10msec]
    kModeFlush  = 10,  // 'flush':      read data from camera, but skip them
    kModeTest   = 20,  // 'test':       read data and process them, but do not write to disk
    kModeFlag   = 30,  // 'flag':       read data, process and write all to disk
    kModeRun    = 40   // 'run':        read data, process and write selected to disk
};
68
// Indices into the fNumEvts counter array
enum
{
    kCurrent   = 0,  // events counted since the last file was opened
    kTotal     = 1,  // events counted in total
    kEventId   = 2,  // EventNum of the last event counted for the newest run
    kTriggerId = 3   // TriggerNum of the last event counted for the newest run
};
76
// Selects which DataProcessorImp implementation runOpen() instantiates
enum FileFormat_t
{
    kNone  = 0,  // DataDump
    kDebug = 1,  // DataDebug
    kFits  = 2,  // DataWriteFits
    kRaw   = 3,  // DataWriteRaw
    kCalib = 4   // DataCalib
};
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405
406 /*
407 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
408 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
409 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
410 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
411 */
412
413 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
414 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
415
416 void SetOutputFormat(FileFormat_t f)
417 {
418 fFileFormat = f;
419 if (fFileFormat==kCalib)
420 {
421 DataCalib::Restart();
422 DataCalib::Update(fDimDrsCalibration);
423 fMsg.Message("Resetted DRS calibration.");
424 }
425 }
426
427 void SetDebugLog(bool b) { fDebugLog = b; }
428
429 void SetDebugStream(bool b)
430 {
431 fDebugStream = b;
432 if (b)
433 return;
434
435 for (int i=0; i<40; i++)
436 {
437 if (!fDumpStream[i].is_open())
438 continue;
439
440 fDumpStream[i].close();
441
442 ostringstream name;
443 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
444 fMsg.Message("Closed file '"+name.str()+"'");
445 }
446 }
447
448 void SetDebugRead(bool b)
449 {
450 fDebugRead = b;
451 if (b || !fDumpRead.is_open())
452 return;
453
454 fDumpRead.close();
455 fMsg.Message("Closed file 'socket_events.txt'");
456 }
457
458// size_t GetUsedMemory() const { return gi_usedMem; }
459
460 void LoadDrsCalibration(const char *fname)
461 {
462 if (!DataCalib::ReadFits(fname, fMsg))
463 return;
464 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
465 DataCalib::Update(fDimDrsCalibration);
466 }
467
468 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
469
470
471 /*
472 struct OpenFileToDim
473 {
474 int code;
475 char fileName[FILENAME_MAX];
476 };
477
478 SignalRunOpened(runid, filename);
479 // Send num open files
480 // Send runid, (more info about the run?), filename via dim
481
482 SignalEvtWritten(runid);
483 // Send num events written of newest file
484
485 SignalRunClose(runid);
486 // Send new num open files
487 // Send empty file-name if no file is open
488
489 */
490
491 // -------------- Mapped event builder callbacks ------------------
492
493 void UpdateRuns(const string &fname="")
494 {
495 uint32_t values[5] =
496 {
497 static_cast<uint32_t>(fFiles.size()),
498 0xffffffff,
499 0,
500 fLastOpened,
501 fLastClosed
502 };
503
504 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
505 it!=fFiles.end(); it++)
506 {
507 const DataProcessorImp *file = *it;
508
509 if (file->GetRunId()<values[1])
510 values[1] = file->GetRunId();
511
512 if (file->GetRunId()>values[2])
513 values[2] = file->GetRunId();
514 }
515
516 fMaxRun = values[2];
517
518 vector<char> data(sizeof(values)+fname.size()+1);
519 memcpy(data.data(), values, sizeof(values));
520 strcpy(data.data()+sizeof(values), fname.c_str());
521
522 fDimRuns.Update(data);
523 }
524
525 vector<DataProcessorImp*> fFiles;
526
527 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
528 {
529 fMsg.Info(" ==> TODO: Update run configuration in database!");
530 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
531 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
532
533 // Check if file already exists...
534 DataProcessorImp *file = 0;
535 switch (fFileFormat)
536 {
537 case kNone: file = new DataDump(fPath, runid, fMsg); break;
538 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
539 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
540 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
541 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
542 }
543
544 try
545 {
546 if (!file->Open(h))
547 return 0;
548 }
549 catch (const exception &e)
550 {
551 fMsg.Error("Exception trying to open file: "+string(e.what()));
552 return 0;
553 }
554
555 fFiles.push_back(file);
556
557 ostringstream str;
558 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
559 fMsg.Info(str);
560
561 fDimWriteStats.FileOpened(file->GetFileName());
562
563 fLastOpened = runid;
564 UpdateRuns(file->GetFileName());
565
566 fNumEvts[kEventId] = 0;
567 fNumEvts[kTriggerId] = 0;
568
569 fNumEvts[kCurrent] = 0;
570 fDimEvents.Update(fNumEvts);
571 // fDimCurrentEvent.Update(uint32_t(0));
572
573 return reinterpret_cast<FileHandle_t>(file);
574 }
575
576 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
577 {
578 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
579
580 if (!file->WriteEvt(e))
581 return -1;
582
583 if (file->GetRunId()==fMaxRun)
584 {
585 fNumEvts[kCurrent]++;
586 fNumEvts[kEventId] = e->EventNum;
587 fNumEvts[kTriggerId] = e->TriggerNum;
588 }
589
590 fNumEvts[kTotal]++;
591
592 static Time oldt(boost::date_time::neg_infin);
593 Time newt;
594 if (newt>oldt+boost::posix_time::seconds(1))
595 {
596 fDimEvents.Update(fNumEvts);
597 oldt = newt;
598 }
599
600
601 // ===> SignalEvtWritten(runid);
602 // Send num events written of newest file
603
604 /* close run runId (all all runs if runId=0) */
605 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
606 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
607
608 return 0;
609 }
610
611 virtual void CloseRun(uint32_t runid) { }
612
613 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
614 {
615 fMsg.Info(" ==> TODO: Update run configuration in database!");
616 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
617
618 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
619
620 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
621 if (it==fFiles.end())
622 {
623 ostringstream str;
624 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
625 fMsg.Fatal(str);
626 return -1;
627 }
628
629 fFiles.erase(it);
630
631 fLastClosed = file->GetRunId();
632 CloseRun(fLastClosed);
633 UpdateRuns();
634
635 fDimEvents.Update(fNumEvts);
636
637 const bool rc = file->Close(tail);
638 if (!rc)
639 {
640 // Error message
641 }
642
643 ostringstream str;
644 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
645 fMsg.Info(str);
646
647 delete file;
648
649 // ==> SignalRunClose(runid);
650 // Send new num open files
651 // Send empty file-name if no file is open
652
653 return rc ? 0 : -1;
654 }
655
656 ofstream fDumpStream[40];
657
658 void debugStream(int isock, void *buf, int len)
659 {
660 if (!fDebugStream)
661 return;
662
663 const int slot = isock/7;
664 if (slot<0 || slot>39)
665 return;
666
667 if (!fDumpStream[slot].is_open())
668 {
669 ostringstream name;
670 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
671
672 fDumpStream[slot].open(name.str().c_str(), ios::app);
673 if (!fDumpStream[slot])
674 {
675 ostringstream str;
676 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
677 fMsg.Error(str);
678
679 return;
680 }
681
682 fMsg.Message("Opened file '"+name.str()+"' for writing.");
683 }
684
685 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
686 }
687
688 ofstream fDumpRead; // Stream to possibly dump docket events
689
690 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
691 {
692 // isock = socketID (0-279)
693 // ibyte = #bytes gelesen
694 // event = eventId (oder 0 wenn noch nicht bekannt)
695 // state : 1=finished reading data
696 // 0=reading data
697 // -1=start reading data (header)
698 // -2=start reading data,
699 // eventId not known yet (too little data)
700 // tsec, tusec = time when reading seconds, microseconds
701 //
702 if (!fDebugRead || ibyte==0)
703 return;
704
705 if (!fDumpRead.is_open())
706 {
707 fDumpRead.open("socket_events.txt", ios::app);
708 if (!fDumpRead)
709 {
710 ostringstream str;
711 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
712 fMsg.Error(str);
713
714 return;
715 }
716
717 fMsg.Message("Opened file 'socket_events.txt' for writing.");
718
719 fDumpRead << "# START: " << Time().GetAsStr() << endl;
720 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
721 }
722
723 fDumpRead
724 << setw(2) << state << " "
725 << setw(8) << tsec << " "
726 << setw(9) << tusec << " "
727 << setw(3) << isock << " "
728 << setw(2) << isock/7 << " "
729 << runno << " "
730 << event << " "
731 << ftmevt << " "
732 << ibyte << endl;
733 }
734
735 array<uint16_t,2> fVecRoi;
736
737 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
738 {
739 /*
740 fadhd[i] ist ein array mit den 40 fad-headers
741 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
742
743 event ist die Struktur, die auch die write routine erhaelt;
744 darin sind im header die 'soll-werte' fuer z.B. eventID
745 als auch die ADC-Werte (falls Du die brauchst)
746
747 Wenn die routine einen negativen Wert liefert, wird das event
748 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
749 */
750
751 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
752
753 if (roi!=fVecRoi)
754 {
755 Update(fDimRoi, roi);
756 fVecRoi = roi;
757 }
758
759 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
760 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
761
762 // FIMXE: Compare with target configuration
763
764 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
765 {
766 // FIXME: Compare with expectations!!!
767 if (ptr->fStartDelimiter==0)
768 {
769 if (ptr==beg)
770 beg++;
771 continue;
772 }
773
774 if (beg->fStatus != ptr->fStatus)
775 {
776 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
777 CloseRunFile(runNr, 0, 0);
778 return -1;
779 }
780
781 if (beg->fRunNumber != ptr->fRunNumber)
782 {
783 fMsg.Error("Inconsistent run number detected.... closing run.");
784 CloseRunFile(runNr, 0, 0);
785 return -1;
786 }
787
788 /*
789 if (beg->fVersion != ptr->fVersion)
790 {
791 Error("Inconsist firmware version detected.... closing run.");
792 CloseRunFile(runNr, 0, 0);
793 break;
794 }
795 if (beg->fEventCounter != ptr->fEventCounter)
796 {
797 Error("Inconsist run number detected.... closing run.");
798 CloseRunFile(runNr, 0, 0);
799 break;
800 }
801 if (beg->fTriggerCounter != ptr->fTriggerCounter)
802 {
803 Error("Inconsist trigger number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 break;
806 }*/
807
808 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
809 {
810 fMsg.Error("Inconsistent phase shift detected.... closing run.");
811 CloseRunFile(runNr, 0, 0);
812 return -1;
813 }
814
815 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
816 {
817 fMsg.Error("Inconsistent DAC values detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 return -1;
820 }
821
822 if (beg->fTriggerType != ptr->fTriggerType)
823 {
824 fMsg.Error("Inconsistent trigger type detected.... closing run.");
825 CloseRunFile(runNr, 0, 0);
826 return -1;
827 }
828 }
829
830 // check REFCLK_frequency
831 // check consistency with command configuration
832 // how to log errors?
833 // need gotNewRun/closedRun to know it is finished
834
835 return 0;
836 }
837
838 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
839 {
840 // Currently we send any event no matter what its trigger id is...
841 // To be changed.
842 static Time oldt(boost::date_time::neg_infin);
843 Time newt;
844
845 // FIXME: Only send events if the have newer run-numbers
846 if (newt<oldt+boost::posix_time::seconds(1))
847 return;
848
849 oldt = newt;
850
851 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
852
853 vector<char> data(sz+event->Roi*2*1440);
854 memcpy(data.data(), event, sizeof(EVENT));
855
856 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
857
858 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
859 fDimRawData.Update(data);
860
861 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
862 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
863
864 fDimEventData.Update(data2);
865 }
866
867 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
868 {
869 if (!DataCalib::IsValid())
870 return;
871
872/*
873 // Workaround to find a valid header.....
874 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
875 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
876
877 // FIMXE: Compare with target configuration
878
879 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++, beg++)
880 if (ptr->fStartDelimiter!=0)
881 break;
882
883 if (ptr->fStartDelimiter==0)
884 return;
885*/
886 /*
887 // FIXME: Time limit?!
888 static Time oldt(boost::date_time::neg_infin);
889 Time newt;
890
891 // FIXME: Only send events if the have newer run-numbers
892 if (newt<oldt+boost::posix_time::milliseconds(100))
893 return;
894
895 oldt = newt;
896 */
897
898 // FIXME: Check event type here
899
900 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
901
902 vector<char> data(sz+event->Roi*2*1440);
903 memcpy(data.data(), event, sizeof(EVENT));
904
905 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
906
907 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
908
909 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
910 CalibData::GetPixelMax(data2.data(), vec, event->Roi, 0, event->Roi-1);
911
912// dim_lock();
913 fDimFeedbackData.Update(data2);
914// dim_unlock();
915 }
916
917 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t */*buffer*/)
918 {
919 switch (threadID)
920 {
921 case 0:
922 SendRawData(fadhd, event);
923 return 1;
924 case 1:
925 SendFeedbackData(fadhd, event);
926 return 2;
927 }
928 return 100;
929 }
930
931
932 bool IsRunStarted() const
933 {
934 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
935 return it==fExpectedRuns.end();// ? true : it->second.started;
936 }
937
938 uint32_t GetRunNumber() const
939 {
940 return fRunNumber;
941 }
942
943 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
944 {
945 // This function is called even when writing is switched off
946 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
947 if (it==fExpectedRuns.end())
948 {
949 ostringstream str;
950 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
951 return;
952 }
953
954 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
955 // return: 0=close scheduled / >0 already closed / <0 does not exist
956
957 // FIXME: Move configuration from expected runs to runs which will soon
958 // be opened/closed
959
960 it->second.started = true;
961
962 fExpectedRuns.erase(it);
963 }
964
965 map<boost::thread::id, string> fLastMessage;
966
967 void factOut(int severity, int err, const char *message)
968 {
969 if (!fDebugLog && severity==99)
970 return;
971
972 ostringstream str;
973 //str << boost::this_thread::get_id() << " ";
974 str << "EventBuilder(";
975 if (err<0)
976 str << "---";
977 else
978 str << err;
979 str << "): " << message;
980
981 string &old = fLastMessage[boost::this_thread::get_id()];
982
983 if (str.str()==old)
984 return;
985 old = str.str();
986
987 fMsg.Update(str, severity);
988 }
989/*
990 void factStat(int64_t *stat, int len)
991 {
992 if (len!=7)
993 {
994 fMsg.Warn("factStat received unknown number of values.");
995 return;
996 }
997
998 vector<int64_t> data(1, g_maxMem);
999 data.insert(data.end(), stat, stat+len);
1000
1001 static vector<int64_t> last(8);
1002 if (data==last)
1003 return;
1004 last = data;
1005
1006 fDimStatistics.Update(data);
1007
1008 // len ist die Laenge des arrays.
1009 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1010 // kannst Du pruefen, ob die 100MB voll sind ....
1011
1012 ostringstream str;
1013 str
1014 << "Wait=" << stat[0] << " "
1015 << "Skip=" << stat[1] << " "
1016 << "Del=" << stat[2] << " "
1017 << "Tot=" << stat[3] << " "
1018 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1019 << "Read=" << stat[5] << " "
1020 << "Conn=" << stat[6];
1021
1022 fMsg.Info(str);
1023 }
1024 */
1025
1026 void factStat(const EVT_STAT &stat)
1027 {
1028 fDimStatistics2.Update(stat);
1029 }
1030
1031 void factStat(const GUI_STAT &stat)
1032 {
1033 fDimStatistics1.Update(stat);
1034 }
1035
1036
1037 array<FAD::EventHeader, 40> fVecHeader;
1038
1039 template<typename T, class S>
1040 array<T, 42> Compare(const S *vec, const T *t)
1041 {
1042 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1043
1044 const T *min = NULL;
1045 const T *val = NULL;
1046 const T *max = NULL;
1047
1048 array<T, 42> arr;
1049
1050 bool rc = true;
1051 for (int i=0; i<40; i++)
1052 {
1053 const char *base = reinterpret_cast<const char*>(vec+i);
1054 const T *ref = reinterpret_cast<const T*>(base+offset);
1055
1056 arr[i] = *ref;
1057
1058 if (gi_NumConnect[i]!=7)
1059 {
1060 arr[i] = 0;
1061 continue;
1062 }
1063
1064 if (!val)
1065 {
1066 min = ref;
1067 val = ref;
1068 max = ref;
1069 }
1070
1071 if (*ref<*min)
1072 min = ref;
1073
1074 if (*ref>*max)
1075 max = ref;
1076
1077 if (*val!=*ref)
1078 rc = false;
1079 }
1080
1081 arr[40] = val ? *min : 1;
1082 arr[41] = val ? *max : 0;
1083
1084 return arr;
1085 }
1086
1087 template<typename T>
1088 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1089 {
1090 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1091
1092 T val = 0;
1093 T rc = 0;
1094
1095 array<T, 42> vec;
1096
1097 bool first = true;
1098
1099 for (int i=0; i<40; i++)
1100 {
1101 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1102 const T *ref = reinterpret_cast<const T*>(base+offset);
1103
1104 vec[i+2] = *ref;
1105
1106 if (gi_NumConnect[i]!=7)
1107 {
1108 vec[i+2] = 0;
1109 continue;
1110 }
1111
1112 if (first)
1113 {
1114 first = false;
1115 val = *ref;
1116 rc = 0;
1117 }
1118
1119 rc |= val^*ref;
1120 }
1121
1122 vec[0] = rc;
1123 vec[1] = val;
1124
1125 return vec;
1126 }
1127
1128 template<typename T, size_t N>
1129 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1130 {
1131// svc.setQuality(vec[40]<=vec[41]);
1132 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1133 svc.Update();
1134 }
1135
// Print one line to cout:
//   name|first|second[1]|second[0]<x<second[1]: second[3] ... second[42]
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &val = data.second;

    cout << name << "|" << data.first << "|" << val[1] << "|";
    cout << val[0] << "<x<" << val[1] << ":";

    for (int i=3; i<43; i++)
        cout << " " << val[i];

    cout << endl;
}
1144
1145 vector<uint> fNumConnected;
1146
1147 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1148 {
1149 const uint16_t id = h.Id();
1150 if (id>39)
1151 return;
1152
1153 if (fNumConnected.size()!=40)
1154 fNumConnected.resize(40);
1155
1156 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1157
1158 const bool changed = con!=fNumConnected || !IsThreadRunning();
1159
1160 fNumConnected = con;
1161
1162 const FAD::EventHeader old = fVecHeader[id];
1163 fVecHeader[id] = h;
1164
1165 if (old.fVersion != h.fVersion || changed)
1166 {
1167 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1168
1169 array<float,42> data;
1170 for (int i=0; i<42; i++)
1171 {
1172 ostringstream str;
1173 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1174 data[i] = stof(str.str());
1175 }
1176 Update(fDimFwVersion, data);
1177 }
1178
1179 if (old.fRunNumber != h.fRunNumber || changed)
1180 {
1181 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1182 fDimRunNumber.Update(run);
1183 }
1184
1185 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1186 {
1187 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1188 fDimPrescaler.Update(pre);
1189 }
1190
1191 if (old.fDNA != h.fDNA || changed)
1192 {
1193 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1194 Update(fDimDNA, dna, 40);
1195 }
1196
1197 if (old.fStatus != h.fStatus || changed)
1198 {
1199 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1200 Update(fDimStatus, sts);
1201 }
1202
1203 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1204 {
1205 array<uint16_t, FAD::kNumDac*42> dacs;
1206
1207 for (int i=0; i<FAD::kNumDac; i++)
1208 {
1209 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1210 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1211 }
1212
1213 Update(fDimDac, dacs);
1214 }
1215
1216 // -----------
1217
1218 static Time oldt(boost::date_time::neg_infin);
1219 Time newt;
1220
1221 if (newt>oldt+boost::posix_time::seconds(1))
1222 {
1223 oldt = newt;
1224
1225 // --- RefClock
1226
1227 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1228 Update(fDimRefClock, clk);
1229
1230 // --- Temperatures
1231
1232 const array<int16_t,42> tmp[4] =
1233 {
1234 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1235 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1236 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1237 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1238 };
1239
1240 vector<int16_t> data;
1241 data.reserve(82);
1242 data.push_back(tmp[0][40]); // min: 0
1243 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1244 data.push_back(tmp[0][41]); // max: 41
1245 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1246
1247 for (int j=1; j<=3; j++)
1248 {
1249 const array<int16_t,42> &ref = tmp[j];
1250
1251 // Gloabl min
1252 if (ref[40]<data[0]) // 40=min
1253 data[0] = ref[40];
1254
1255 // Global max
1256 if (ref[41]>data[41]) // 41=max
1257 data[41] = ref[41];
1258
1259 for (int i=0; i<40; i++)
1260 {
1261 // min per board
1262 if (ref[i]<data[i+1]) // data: 1-40
1263 data[i+1] = ref[i]; // ref: 0-39
1264
1265 // max per board
1266 if (ref[i]>data[i+42]) // data: 42-81
1267 data[i+42] = ref[i]; // ref: 0-39
1268 }
1269
1270
1271 }
1272
1273 vector<float> deg(82); // 0: global min, 1-40: min
1274 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1275 deg[i] = data[i]/16.;
1276 fDimTemperature.Update(deg);
1277 }
1278 }
1279};
1280
1281EventBuilderWrapper *EventBuilderWrapper::This = 0;
1282
1283// ----------- Event builder callbacks implementation ---------------
1284extern "C"
1285{
1286 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1287 {
1288 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1289 }
1290
1291 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1292 {
1293 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1294 }
1295
1296 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1297 {
1298 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1299 }
1300
1301 void factOut(int severity, int err, const char *message)
1302 {
1303 EventBuilderWrapper::This->factOut(severity, err, message);
1304 }
1305
1306 void factStat(GUI_STAT stat)
1307 {
1308 EventBuilderWrapper::This->factStat(stat);
1309 }
1310
1311 void factStatNew(EVT_STAT stat)
1312 {
1313 EventBuilderWrapper::This->factStat(stat);
1314 }
1315
1316 void debugHead(int socket, int/*board*/, void *buf)
1317 {
1318 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1319
1320 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1321 }
1322
1323 void debugStream(int isock, void *buf, int len)
1324 {
1325 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1326 }
1327
1328 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1329 {
1330 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1331 }
1332
1333 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1334 {
1335 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1336 }
1337
1338 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1339 {
1340 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1341 }
1342
1343 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1344 {
1345 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, buffer);
1346 }
1347
1348}
1349
1350#endif
Note: See TracBrowser for help on using the repository browser.