source: trunk/FACT++/src/EventBuilderWrapper.h@ 12000

Last change on this file since 12000 was 11968, checked in by tbretz, 13 years ago
Moved sending of Event Data to new threads (subProcEvt); added fDimFeedbackData
File size: 37.8 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
/// Run states of the event builder. The control methods of this class
/// write one of these values to the global g_runStat.
enum CommandStates_t
{
    kAbort      = -2,  ///< 'abort':      quit as soon as possible
    kExit       = -1,  ///< 'exit':       stop reading, quit when buffered events are done
    kInitialize =  0,  ///< 'initialize': e.g. dim not yet started
    kHybernate  =  1,  ///< 'hybernate':  do nothing for a long time [wakeup within ~1sec]
    kSleep      =  2,  ///< 'sleep':      do nothing [wakeup within ~10msec]
    kModeFlush  = 10,  ///< 'flush':      read data from camera, but skip them
    kModeTest   = 20,  ///< 'test':       read and process data, but do not write to disk
    kModeFlag   = 30,  ///< 'flag':       read data, process and write all to disk
    kModeRun    = 40,  ///< 'run':        read data, process and write selected to disk
};
68
/// Indices into the event counter array fNumEvts.
enum
{
    kCurrent   = 0,  ///< events written to the file with the highest run id
    kTotal     = 1,  ///< events written in total
    kEventId   = 2,  ///< EventNum of the last event counted in kCurrent
    kTriggerId = 3,  ///< TriggerNum of the last event counted in kCurrent
};
76
/// Output format; selects which DataProcessorImp implementation
/// runOpen() instantiates for a new run.
enum FileFormat_t
{
    kNone = 0,  ///< DataDump
    kDebug,     ///< DataDebug
    kFits,      ///< DataWriteFits
    kRaw,       ///< DataWriteRaw
    kCalib      ///< DataCalib (DRS calibration)
};
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138 }
139
140 if (check != Time().NightAsInt())
141 return InitRunNumber();
142
143 fRunNumber++;
144
145 if (fRunNumber==1000)
146 {
147 fMsg.Error("You have a file with run-number 1000 in "+fPath);
148 return -1;
149 }
150
151 ostringstream str;
152 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
153 fMsg.Message(str);
154
155 fMsg.Info(" ==> TODO: Crosscheck with database!");
156
157 return check;
158 }
159
160 int64_t InitRunNumber(const string &path)
161 {
162 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
163 return -1;
164
165 //const fs::path fullPath = fs::system_complete(fs::path(path));
166
167 fPath = path;
168
169 return InitRunNumber();
170 }
171
172public:
173 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
174 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
175 fDimWriteStats ("FAD_CONTROL", imp),
176 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
177 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
178 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
179 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
180 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
181 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
182 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
183 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
184 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
185 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
186 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
187 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
188 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
189 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
190 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
191 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
192 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
193 fDebugStream(false), fDebugRead(false), fDebugLog(false)
194 {
195 if (This)
196 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
197
198 This = this;
199
200 memset(fNumEvts, 0, sizeof(fNumEvts));
201
202 fDimEvents.Update(fNumEvts);
203
204 for (size_t i=0; i<40; i++)
205 ConnectSlot(i, tcp::endpoint());
206 }
207 virtual ~EventBuilderWrapper()
208 {
209 Abort();
210
211 // FIXME: Used timed_join and abort afterwards
212 // What's the maximum time the eb need to abort?
213 fThread.join();
214 //ffMsg.Info("EventBuilder stopped.");
215
216 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
217 delete *it;
218 }
219
220 struct RunDescription
221 {
222 uint32_t maxtime;
223 uint32_t maxevt;
224
225 string name;
226
227 FAD::Configuration reference;
228
229 bool started;
230 };
231
232 map<uint32_t, RunDescription> fExpectedRuns;
233
234 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
235 {
236 if (maxtime<=0 || maxtime>24*60*60)
237 maxtime = 24*60*60;
238 if (maxevt<=0 || maxevt>INT32_MAX)
239 maxevt = INT32_MAX;
240
241 const RunDescription descr =
242 {
243 uint32_t(maxtime),
244 uint32_t(maxevt),
245 ref.first,
246 ref.second,
247 false
248 };
249
250 // FIMXE: Maybe reset an event counter so that the mcp can count events?
251
252 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
253
254 fExpectedRuns[fRunNumber] = descr;
255 return fRunNumber++;
256 }
257
258 bool IsThreadRunning()
259 {
260 return !fThread.timed_join(boost::posix_time::microseconds(0));
261 }
262
263 void SetMaxMemory(unsigned int mb) const
264 {
265 /*
266 if (mb*1000000<GetUsedMemory())
267 {
268 // ffMsg.Warn("...");
269 return;
270 }*/
271
272 g_maxMem = size_t(mb)*1000000;
273 }
274
275 void StartThread(const vector<tcp::endpoint> &addr)
276 {
277 if (IsThreadRunning())
278 {
279 fMsg.Warn("Start - EventBuilder still running");
280 return;
281 }
282
283 fLastMessage.clear();
284
285 for (size_t i=0; i<40; i++)
286 ConnectSlot(i, addr[i]);
287
288 g_runStat = kModeRun;
289
290 fMsg.Message("Starting EventBuilder thread");
291
292 fThread = boost::thread(StartEvtBuild);
293 }
294 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
295 {
296 if (i>39)
297 return;
298
299 if (addr==tcp::endpoint())
300 {
301 DisconnectSlot(i);
302 return;
303 }
304
305 g_port[i].sockAddr.sin_family = AF_INET;
306 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
307 g_port[i].sockAddr.sin_port = htons(addr.port());
308 // In this order
309 g_port[i].sockDef = 1;
310 }
311 void DisconnectSlot(unsigned int i)
312 {
313 if (i>39)
314 return;
315
316 g_port[i].sockDef = 0;
317 // In this order
318 g_port[i].sockAddr.sin_family = AF_INET;
319 g_port[i].sockAddr.sin_addr.s_addr = 0;
320 g_port[i].sockAddr.sin_port = 0;
321 }
322 void IgnoreSlot(unsigned int i)
323 {
324 if (i>39)
325 return;
326 if (g_port[i].sockAddr.sin_port==0)
327 return;
328
329 g_port[i].sockDef = -1;
330 }
331
332
333 void Abort()
334 {
335 fMsg.Message("Signal abort to EventBuilder thread...");
336 g_runStat = kAbort;
337 }
338
339 void ResetThread(bool soft)
340 {
341 /*
342 if (g_reset > 0)
343
344 * suspend reading
345 * reset = g_reset;
346 * g_reset=0
347
348 * reset% 10
349 == 0 leave event Buffers as they are
350 == 1 let all buffers drain (write (incomplete) events)
351 > 1 flush all buffers (do not write buffered events)
352
353 * (reset/10)%10
354 > 0 close all sockets and destroy them (also free the
355 allocated read-buffers)
356 recreate before resuming operation
357 [ this is more than just close/open that can be
358 triggered by e.g. close/open the base-socket ]
359
360 * (reset/100)%10
361 > 0 close all open run-files
362
363 * (reset/1000)
364 sleep so many seconds before resuming operation
365 (does not (yet) take into account time left when waiting
366 for buffers getting empty ...)
367
368 * resume_reading
369
370 */
371 fMsg.Message("Signal reset to EventBuilder thread...");
372 g_reset = soft ? 101 : 102;
373 }
374
375 void Exit()
376 {
377 fMsg.Message("Signal exit to EventBuilder thread...");
378 g_runStat = kExit;
379 }
380
381 /*
382 void Wait()
383 {
384 fThread.join();
385 ffMsg.Message("EventBuilder stopped.");
386 }*/
387
388 void Hybernate() const { g_runStat = kHybernate; }
389 void Sleep() const { g_runStat = kSleep; }
390 void FlushMode() const { g_runStat = kModeFlush; }
391 void TestMode() const { g_runStat = kModeTest; }
392 void FlagMode() const { g_runStat = kModeFlag; }
393 void RunMode() const { g_runStat = kModeRun; }
394
395 // FIXME: To be removed
396 void SetMode(int mode) const { g_runStat = mode; }
397
398 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
399 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
400 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
401 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
402
403 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
404 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
405
406 void SetOutputFormat(FileFormat_t f)
407 {
408 fFileFormat = f;
409 if (fFileFormat==kCalib)
410 {
411 DataCalib::Restart();
412 DataCalib::Update(fDimDrsCalibration);
413 fMsg.Message("Resetted DRS calibration.");
414 }
415 }
416
417 void SetDebugLog(bool b) { fDebugLog = b; }
418
419 void SetDebugStream(bool b)
420 {
421 fDebugStream = b;
422 if (b)
423 return;
424
425 for (int i=0; i<40; i++)
426 {
427 if (!fDumpStream[i].is_open())
428 continue;
429
430 fDumpStream[i].close();
431
432 ostringstream name;
433 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
434 fMsg.Message("Closed file '"+name.str()+"'");
435 }
436 }
437
438 void SetDebugRead(bool b)
439 {
440 fDebugRead = b;
441 if (b || !fDumpRead.is_open())
442 return;
443
444 fDumpRead.close();
445 fMsg.Message("Closed file 'socket_events.txt'");
446 }
447
448// size_t GetUsedMemory() const { return gi_usedMem; }
449
450 void LoadDrsCalibration(const char *fname)
451 {
452 if (!DataCalib::ReadFits(fname, fMsg))
453 return;
454 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
455 DataCalib::Update(fDimDrsCalibration);
456 }
457
458 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
459
460
461 /*
462 struct OpenFileToDim
463 {
464 int code;
465 char fileName[FILENAME_MAX];
466 };
467
468 SignalRunOpened(runid, filename);
469 // Send num open files
470 // Send runid, (more info about the run?), filename via dim
471
472 SignalEvtWritten(runid);
473 // Send num events written of newest file
474
475 SignalRunClose(runid);
476 // Send new num open files
477 // Send empty file-name if no file is open
478
479 */
480
481 // -------------- Mapped event builder callbacks ------------------
482
483 void UpdateRuns(const string &fname="")
484 {
485 uint32_t values[5] =
486 {
487 static_cast<uint32_t>(fFiles.size()),
488 0xffffffff,
489 0,
490 fLastOpened,
491 fLastClosed
492 };
493
494 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
495 it!=fFiles.end(); it++)
496 {
497 const DataProcessorImp *file = *it;
498
499 if (file->GetRunId()<values[1])
500 values[1] = file->GetRunId();
501
502 if (file->GetRunId()>values[2])
503 values[2] = file->GetRunId();
504 }
505
506 fMaxRun = values[2];
507
508 vector<char> data(sizeof(values)+fname.size()+1);
509 memcpy(data.data(), values, sizeof(values));
510 strcpy(data.data()+sizeof(values), fname.c_str());
511
512 fDimRuns.Update(data);
513 }
514
515 vector<DataProcessorImp*> fFiles;
516
517 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
518 {
519 fMsg.Info(" ==> TODO: Update run configuration in database!");
520 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
521 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
522
523 // Check if file already exists...
524 DataProcessorImp *file = 0;
525 switch (fFileFormat)
526 {
527 case kNone: file = new DataDump(fPath, runid, fMsg); break;
528 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
529 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
530 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
531 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
532 }
533
534 try
535 {
536 if (!file->Open(h))
537 return 0;
538 }
539 catch (const exception &e)
540 {
541 fMsg.Error("Exception trying to open file: "+string(e.what()));
542 return 0;
543 }
544
545 fFiles.push_back(file);
546
547 ostringstream str;
548 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
549 fMsg.Info(str);
550
551 fDimWriteStats.FileOpened(file->GetFileName());
552
553 fLastOpened = runid;
554 UpdateRuns(file->GetFileName());
555
556 fNumEvts[kEventId] = 0;
557 fNumEvts[kTriggerId] = 0;
558
559 fNumEvts[kCurrent] = 0;
560 fDimEvents.Update(fNumEvts);
561 // fDimCurrentEvent.Update(uint32_t(0));
562
563 return reinterpret_cast<FileHandle_t>(file);
564 }
565
566 int runWrite(FileHandle_t handler, EVENT *e, size_t)
567 {
568 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
569
570 if (!file->WriteEvt(e))
571 return -1;
572
573 if (file->GetRunId()==fMaxRun)
574 {
575 fNumEvts[kCurrent]++;
576 fNumEvts[kEventId] = e->EventNum;
577 fNumEvts[kTriggerId] = e->TriggerNum;
578 }
579
580 fNumEvts[kTotal]++;
581
582 static Time oldt(boost::date_time::neg_infin);
583 Time newt;
584 if (newt>oldt+boost::posix_time::seconds(1))
585 {
586 fDimEvents.Update(fNumEvts);
587 oldt = newt;
588 }
589
590
591 // ===> SignalEvtWritten(runid);
592 // Send num events written of newest file
593
594 /* close run runId (all all runs if runId=0) */
595 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
596 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
597
598 return 0;
599 }
600
601 virtual void CloseRun(uint32_t runid) { }
602
603 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
604 {
605 fMsg.Info(" ==> TODO: Update run configuration in database!");
606 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
607
608 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
609
610 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
611 if (it==fFiles.end())
612 {
613 ostringstream str;
614 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
615 fMsg.Fatal(str);
616 return -1;
617 }
618
619 fFiles.erase(it);
620
621 fLastClosed = file->GetRunId();
622 CloseRun(fLastClosed);
623 UpdateRuns();
624
625 fDimEvents.Update(fNumEvts);
626
627 const bool rc = file->Close(tail);
628 if (!rc)
629 {
630 // Error message
631 }
632
633 ostringstream str;
634 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
635 fMsg.Info(str);
636
637 delete file;
638
639 // ==> SignalRunClose(runid);
640 // Send new num open files
641 // Send empty file-name if no file is open
642
643 return rc ? 0 : -1;
644 }
645
646 ofstream fDumpStream[40];
647
648 void debugStream(int isock, void *buf, int len)
649 {
650 if (!fDebugStream)
651 return;
652
653 const int slot = isock/7;
654 if (slot<0 || slot>39)
655 return;
656
657 if (!fDumpStream[slot].is_open())
658 {
659 ostringstream name;
660 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
661
662 fDumpStream[slot].open(name.str().c_str(), ios::app);
663 if (!fDumpStream[slot])
664 {
665 ostringstream str;
666 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
667 fMsg.Error(str);
668
669 return;
670 }
671
672 fMsg.Message("Opened file '"+name.str()+"' for writing.");
673 }
674
675 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
676 }
677
678 ofstream fDumpRead; // Stream to possibly dump docket events
679
680 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
681 {
682 // isock = socketID (0-279)
683 // ibyte = #bytes gelesen
684 // event = eventId (oder 0 wenn noch nicht bekannt)
685 // state : 1=finished reading data
686 // 0=reading data
687 // -1=start reading data (header)
688 // -2=start reading data,
689 // eventId not known yet (too little data)
690 // tsec, tusec = time when reading seconds, microseconds
691 //
692 if (!fDebugRead || ibyte==0)
693 return;
694
695 if (!fDumpRead.is_open())
696 {
697 fDumpRead.open("socket_events.txt", ios::app);
698 if (!fDumpRead)
699 {
700 ostringstream str;
701 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
702 fMsg.Error(str);
703
704 return;
705 }
706
707 fMsg.Message("Opened file 'socket_events.txt' for writing.");
708
709 fDumpRead << "# START: " << Time().GetAsStr() << endl;
710 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
711 }
712
713 fDumpRead
714 << setw(2) << state << " "
715 << setw(8) << tsec << " "
716 << setw(9) << tusec << " "
717 << setw(3) << isock << " "
718 << setw(2) << isock/7 << " "
719 << runno << " "
720 << event << " "
721 << ftmevt << " "
722 << ibyte << endl;
723 }
724
725 array<uint16_t,2> fVecRoi;
726
727 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
728 {
729 /*
730 fadhd[i] ist ein array mit den 40 fad-headers
731 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
732
733 event ist die Struktur, die auch die write routine erhaelt;
734 darin sind im header die 'soll-werte' fuer z.B. eventID
735 als auch die ADC-Werte (falls Du die brauchst)
736
737 Wenn die routine einen negativen Wert liefert, wird das event
738 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
739 */
740
741 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
742
743 if (roi!=fVecRoi)
744 {
745 Update(fDimRoi, roi);
746 fVecRoi = roi;
747 }
748
749 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
750 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
751
752 // FIMXE: Compare with target configuration
753
754 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
755 {
756 // FIXME: Compare with expectations!!!
757 if (ptr->fStartDelimiter==0)
758 {
759 if (ptr==beg)
760 beg++;
761 continue;
762 }
763
764 if (beg->fStatus != ptr->fStatus)
765 {
766 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
767 CloseRunFile(runNr, 0, 0);
768 return -1;
769 }
770
771 if (beg->fRunNumber != ptr->fRunNumber)
772 {
773 fMsg.Error("Inconsistent run number detected.... closing run.");
774 CloseRunFile(runNr, 0, 0);
775 return -1;
776 }
777
778 /*
779 if (beg->fVersion != ptr->fVersion)
780 {
781 Error("Inconsist firmware version detected.... closing run.");
782 CloseRunFile(runNr, 0, 0);
783 break;
784 }
785 if (beg->fEventCounter != ptr->fEventCounter)
786 {
787 Error("Inconsist run number detected.... closing run.");
788 CloseRunFile(runNr, 0, 0);
789 break;
790 }
791 if (beg->fTriggerCounter != ptr->fTriggerCounter)
792 {
793 Error("Inconsist trigger number detected.... closing run.");
794 CloseRunFile(runNr, 0, 0);
795 break;
796 }*/
797
798 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
799 {
800 fMsg.Error("Inconsistent phase shift detected.... closing run.");
801 CloseRunFile(runNr, 0, 0);
802 return -1;
803 }
804
805 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
806 {
807 fMsg.Error("Inconsistent DAC values detected.... closing run.");
808 CloseRunFile(runNr, 0, 0);
809 return -1;
810 }
811
812 if (beg->fTriggerType != ptr->fTriggerType)
813 {
814 fMsg.Error("Inconsistent trigger type detected.... closing run.");
815 CloseRunFile(runNr, 0, 0);
816 return -1;
817 }
818 }
819
820 // check REFCLK_frequency
821 // check consistency with command configuration
822 // how to log errors?
823 // need gotNewRun/closedRun to know it is finished
824
825 return 0;
826 }
827
828 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
829 {
830 // Currently we send any event no matter what its trigger id is...
831 // To be changed.
832 static Time oldt(boost::date_time::neg_infin);
833 Time newt;
834
835 // FIXME: Only send events if the have newer run-numbers
836 if (newt<oldt+boost::posix_time::seconds(1))
837 return;
838
839 oldt = newt;
840
841 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
842
843 vector<char> data(sz+event->Roi*2*1440);
844 memcpy(data.data(), event, sizeof(EVENT));
845
846 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
847
848 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
849 fDimRawData.Update(data);
850
851 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
852 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
853
854 fDimEventData.Update(data2);
855 }
856
// Intended to publish, through fDimFeedbackData, the result of
// CalibData::GetPixelMax applied to the event data processed by
// DataCalib::Apply.
//
// NOTE: The function is effectively disabled at the moment. After the
// check for a valid calibration and the search for the first header
// with a non-zero start delimiter it returns unconditionally (see
// "FIXME: Check event type here"), so everything below that return is
// unreachable and nothing is ever sent.
//
// fadhd: board headers, treated as an array of 40 FAD::EventHeader
// event: event the data would be taken from
857 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
858 {
// Nothing can be done without a valid calibration
859 if (!DataCalib::IsValid())
860 return;
861
862 // Workaround to find a valid header.....
863 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
864 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
865
866 // FIXME: Compare with target configuration
867
// Advance beg to the first header with a non-zero start delimiter (beg
// equals end if there is none); beg is not used afterwards yet
868 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++, beg++)
869 if (ptr->fStartDelimiter!=0)
870 break;
871
872 // FIXME: Time limit?!
873 /*
874 static Time oldt(boost::date_time::neg_infin);
875 Time newt;
876
877 // FIXME: Only send events if the have newer run-numbers
878 if (newt<oldt+boost::posix_time::seconds(1))
879 return 0;
880
881 oldt = newt;
882 */
883
884 // FIXME: Check event type here
// Unconditional return: the code below is never executed
885 return;
886
// Buffer layout as in SendRawData: event header followed by one float
// per sample
887 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
888
889 vector<char> data(sz+event->Roi*2*1440);
890 memcpy(data.data(), event, sizeof(EVENT));
891
892 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
893
894 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
895
896 vector<float> data2(1440); // One value per channel, filled by GetPixelMax
897 CalibData::GetPixelMax(data2.data(), vec, event->Roi, 0, event->Roi);
898
899 fDimFeedbackData.Update(data2);
900 }
901
902 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t */*buffer*/)
903 {
904 switch (threadID)
905 {
906 case 0:
907 SendRawData(fadhd, event);
908 return 1;
909 case 1:
910 SendFeedbackData(fadhd, event);
911 return 2;
912 }
913 return 100;
914 }
915
916
917 bool IsRunStarted() const
918 {
919 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
920 return it==fExpectedRuns.end();// ? true : it->second.started;
921 }
922
923 uint32_t GetRunNumber() const
924 {
925 return fRunNumber;
926 }
927
928 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
929 {
930 // This function is called even when writing is switched off
931 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
932 if (it==fExpectedRuns.end())
933 {
934 ostringstream str;
935 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
936 return;
937 }
938
939 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
940 // return: 0=close scheduled / >0 already closed / <0 does not exist
941
942 // FIXME: Move configuration from expected runs to runs which will soon
943 // be opened/closed
944
945 it->second.started = true;
946
947 fExpectedRuns.erase(it);
948 }
949
950 map<boost::thread::id, string> fLastMessage;
951
952 void factOut(int severity, int err, const char *message)
953 {
954 if (!fDebugLog && severity==99)
955 return;
956
957 ostringstream str;
958 //str << boost::this_thread::get_id() << " ";
959 str << "EventBuilder(";
960 if (err<0)
961 str << "---";
962 else
963 str << err;
964 str << "): " << message;
965
966 string &old = fLastMessage[boost::this_thread::get_id()];
967
968 if (str.str()==old)
969 return;
970 old = str.str();
971
972 fMsg.Update(str, severity);
973 }
974/*
975 void factStat(int64_t *stat, int len)
976 {
977 if (len!=7)
978 {
979 fMsg.Warn("factStat received unknown number of values.");
980 return;
981 }
982
983 vector<int64_t> data(1, g_maxMem);
984 data.insert(data.end(), stat, stat+len);
985
986 static vector<int64_t> last(8);
987 if (data==last)
988 return;
989 last = data;
990
991 fDimStatistics.Update(data);
992
993 // len is the length of the array.
994 // array[4] contains how many bytes of the buffer are currently in use; with
995 // it you can check whether the 100MB are full ....
996
997 ostringstream str;
998 str
999 << "Wait=" << stat[0] << " "
1000 << "Skip=" << stat[1] << " "
1001 << "Del=" << stat[2] << " "
1002 << "Tot=" << stat[3] << " "
1003 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1004 << "Read=" << stat[5] << " "
1005 << "Conn=" << stat[6];
1006
1007 fMsg.Info(str);
1008 }
1009 */
1010
1011 void factStat(const EVT_STAT &stat)
1012 {
1013 fDimStatistics2.Update(stat);
1014 }
1015
1016 void factStat(const GUI_STAT &stat)
1017 {
1018 fDimStatistics1.Update(stat);
1019 }
1020
1021
1022 array<FAD::EventHeader, 40> fVecHeader;
1023
1024 template<typename T, class S>
1025 array<T, 42> Compare(const S *vec, const T *t)
1026 {
1027 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1028
1029 const T *min = NULL;
1030 const T *val = NULL;
1031 const T *max = NULL;
1032
1033 array<T, 42> arr;
1034
1035 bool rc = true;
1036 for (int i=0; i<40; i++)
1037 {
1038 const char *base = reinterpret_cast<const char*>(vec+i);
1039 const T *ref = reinterpret_cast<const T*>(base+offset);
1040
1041 arr[i] = *ref;
1042
1043 if (gi_NumConnect[i]!=7)
1044 {
1045 arr[i] = 0;
1046 continue;
1047 }
1048
1049 if (!val)
1050 {
1051 min = ref;
1052 val = ref;
1053 max = ref;
1054 }
1055
1056 if (*ref<*min)
1057 min = ref;
1058
1059 if (*ref>*max)
1060 max = ref;
1061
1062 if (*val!=*ref)
1063 rc = false;
1064 }
1065
1066 arr[40] = val ? *min : 1;
1067 arr[41] = val ? *max : 0;
1068
1069 return arr;
1070 }
1071
1072 template<typename T>
1073 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1074 {
1075 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1076
1077 T val = 0;
1078 T rc = 0;
1079
1080 array<T, 42> vec;
1081
1082 bool first = true;
1083
1084 for (int i=0; i<40; i++)
1085 {
1086 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1087 const T *ref = reinterpret_cast<const T*>(base+offset);
1088
1089 vec[i+2] = *ref;
1090
1091 if (gi_NumConnect[i]!=7)
1092 {
1093 vec[i+2] = 0;
1094 continue;
1095 }
1096
1097 if (first)
1098 {
1099 first = false;
1100 val = *ref;
1101 rc = 0;
1102 }
1103
1104 rc |= val^*ref;
1105 }
1106
1107 vec[0] = rc;
1108 vec[1] = val;
1109
1110 return vec;
1111 }
1112
1113 template<typename T, size_t N>
1114 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1115 {
1116// svc.setQuality(vec[40]<=vec[41]);
1117 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1118 svc.Update();
1119 }
1120
1121 template<typename T>
1122 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1123 {
1124 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1125 for (int i=0; i<40;i++)
1126 cout << " " << data.second[i+3];
1127 cout << endl;
1128 }
1129
1130 vector<uint> fNumConnected;
1131
1132 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1133 {
1134 const uint16_t id = h.Id();
1135 if (id>39)
1136 return;
1137
1138 if (fNumConnected.size()!=40)
1139 fNumConnected.resize(40);
1140
1141 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1142
1143 const bool changed = con!=fNumConnected || !IsThreadRunning();
1144
1145 fNumConnected = con;
1146
1147 const FAD::EventHeader old = fVecHeader[id];
1148 fVecHeader[id] = h;
1149
1150 if (old.fVersion != h.fVersion || changed)
1151 {
1152 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1153
1154 array<float,42> data;
1155 for (int i=0; i<42; i++)
1156 {
1157 ostringstream str;
1158 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1159 data[i] = stof(str.str());
1160 }
1161 Update(fDimFwVersion, data);
1162 }
1163
1164 if (old.fRunNumber != h.fRunNumber || changed)
1165 {
1166 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1167 fDimRunNumber.Update(run);
1168 }
1169
1170 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1171 {
1172 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1173 fDimPrescaler.Update(pre);
1174 }
1175
1176 if (old.fDNA != h.fDNA || changed)
1177 {
1178 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1179 Update(fDimDNA, dna, 40);
1180 }
1181
1182 if (old.fStatus != h.fStatus || changed)
1183 {
1184 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1185 Update(fDimStatus, sts);
1186 }
1187
1188 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1189 {
1190 array<uint16_t, FAD::kNumDac*42> dacs;
1191
1192 for (int i=0; i<FAD::kNumDac; i++)
1193 {
1194 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1195 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1196 }
1197
1198 Update(fDimDac, dacs);
1199 }
1200
1201 // -----------
1202
1203 static Time oldt(boost::date_time::neg_infin);
1204 Time newt;
1205
1206 if (newt>oldt+boost::posix_time::seconds(1))
1207 {
1208 oldt = newt;
1209
1210 // --- RefClock
1211
1212 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1213 Update(fDimRefClock, clk);
1214
1215 // --- Temperatures
1216
1217 const array<int16_t,42> tmp[4] =
1218 {
1219 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1220 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1221 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1222 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1223 };
1224
1225 vector<int16_t> data;
1226 data.reserve(82);
1227 data.push_back(tmp[0][40]); // min: 0
1228 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1229 data.push_back(tmp[0][41]); // max: 41
1230 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1231
1232 for (int j=1; j<=3; j++)
1233 {
1234 const array<int16_t,42> &ref = tmp[j];
1235
1236 // Gloabl min
1237 if (ref[40]<data[0]) // 40=min
1238 data[0] = ref[40];
1239
1240 // Global max
1241 if (ref[41]>data[41]) // 41=max
1242 data[41] = ref[41];
1243
1244 for (int i=0; i<40; i++)
1245 {
1246 // min per board
1247 if (ref[i]<data[i+1]) // data: 1-40
1248 data[i+1] = ref[i]; // ref: 0-39
1249
1250 // max per board
1251 if (ref[i]>data[i+42]) // data: 42-81
1252 data[i+42] = ref[i]; // ref: 0-39
1253 }
1254
1255
1256 }
1257
1258 vector<float> deg(82); // 0: global min, 1-40: min
1259 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1260 deg[i] = data[i]/16.;
1261 fDimTemperature.Update(deg);
1262 }
1263 }
1264};
1265
1266EventBuilderWrapper *EventBuilderWrapper::This = 0;
1267
1268// ----------- Event builder callbacks implementation ---------------
1269extern "C"
1270{
1271 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1272 {
1273 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1274 }
1275
1276 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1277 {
1278 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1279 }
1280
1281 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1282 {
1283 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1284 }
1285
1286 void factOut(int severity, int err, const char *message)
1287 {
1288 EventBuilderWrapper::This->factOut(severity, err, message);
1289 }
1290
1291 void factStat(GUI_STAT stat)
1292 {
1293 EventBuilderWrapper::This->factStat(stat);
1294 }
1295
1296 void factStatNew(EVT_STAT stat)
1297 {
1298 EventBuilderWrapper::This->factStat(stat);
1299 }
1300
1301 void debugHead(int socket, int/*board*/, void *buf)
1302 {
1303 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1304
1305 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1306 }
1307
1308 void debugStream(int isock, void *buf, int len)
1309 {
1310 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1311 }
1312
1313 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1314 {
1315 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1316 }
1317
1318 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1319 {
1320 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1321 }
1322
1323 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1324 {
1325 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1326 }
1327
1328 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1329 {
1330 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, buffer);
1331 }
1332
1333}
1334
1335#endif
Note: See TracBrowser for help on using the repository browser.