source: trunk/FACT++/src/EventBuilderWrapper.h@ 12040

Last change on this file since 12040 was 12040, checked in by tbretz, 13 years ago
Added the correct path to DimWriteStatistics; fixed the extraction region in the call of CalibData::GetPixelMax; do not limit sending rate of DRS calib result for feedback; set g_maxProc to 3
File size: 38.3 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405
406 /*
407 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
408 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
409 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
410 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
411 */
412
413 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
414 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
415
416 void SetOutputFormat(FileFormat_t f)
417 {
418 fFileFormat = f;
419 if (fFileFormat==kCalib)
420 {
421 DataCalib::Restart();
422 DataCalib::Update(fDimDrsCalibration);
423 fMsg.Message("Resetted DRS calibration.");
424 }
425 }
426
427 void SetDebugLog(bool b) { fDebugLog = b; }
428
429 void SetDebugStream(bool b)
430 {
431 fDebugStream = b;
432 if (b)
433 return;
434
435 for (int i=0; i<40; i++)
436 {
437 if (!fDumpStream[i].is_open())
438 continue;
439
440 fDumpStream[i].close();
441
442 ostringstream name;
443 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
444 fMsg.Message("Closed file '"+name.str()+"'");
445 }
446 }
447
448 void SetDebugRead(bool b)
449 {
450 fDebugRead = b;
451 if (b || !fDumpRead.is_open())
452 return;
453
454 fDumpRead.close();
455 fMsg.Message("Closed file 'socket_events.txt'");
456 }
457
458// size_t GetUsedMemory() const { return gi_usedMem; }
459
460 void LoadDrsCalibration(const char *fname)
461 {
462 if (!DataCalib::ReadFits(fname, fMsg))
463 return;
464 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
465 DataCalib::Update(fDimDrsCalibration);
466 }
467
468 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
469
470
471 /*
472 struct OpenFileToDim
473 {
474 int code;
475 char fileName[FILENAME_MAX];
476 };
477
478 SignalRunOpened(runid, filename);
479 // Send num open files
480 // Send runid, (more info about the run?), filename via dim
481
482 SignalEvtWritten(runid);
483 // Send num events written of newest file
484
485 SignalRunClose(runid);
486 // Send new num open files
487 // Send empty file-name if no file is open
488
489 */
490
491 // -------------- Mapped event builder callbacks ------------------
492
493 void UpdateRuns(const string &fname="")
494 {
495 uint32_t values[5] =
496 {
497 static_cast<uint32_t>(fFiles.size()),
498 0xffffffff,
499 0,
500 fLastOpened,
501 fLastClosed
502 };
503
504 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
505 it!=fFiles.end(); it++)
506 {
507 const DataProcessorImp *file = *it;
508
509 if (file->GetRunId()<values[1])
510 values[1] = file->GetRunId();
511
512 if (file->GetRunId()>values[2])
513 values[2] = file->GetRunId();
514 }
515
516 fMaxRun = values[2];
517
518 vector<char> data(sizeof(values)+fname.size()+1);
519 memcpy(data.data(), values, sizeof(values));
520 strcpy(data.data()+sizeof(values), fname.c_str());
521
522 fDimRuns.Update(data);
523 }
524
525 vector<DataProcessorImp*> fFiles;
526
527 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
528 {
529 fMsg.Info(" ==> TODO: Update run configuration in database!");
530 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
531 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
532
533 // Check if file already exists...
534 DataProcessorImp *file = 0;
535 switch (fFileFormat)
536 {
537 case kNone: file = new DataDump(fPath, runid, fMsg); break;
538 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
539 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
540 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
541 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
542 }
543
544 try
545 {
546 if (!file->Open(h))
547 return 0;
548 }
549 catch (const exception &e)
550 {
551 fMsg.Error("Exception trying to open file: "+string(e.what()));
552 return 0;
553 }
554
555 fFiles.push_back(file);
556
557 ostringstream str;
558 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
559 fMsg.Info(str);
560
561 fDimWriteStats.FileOpened(file->GetFileName());
562
563 fLastOpened = runid;
564 UpdateRuns(file->GetFileName());
565
566 fNumEvts[kEventId] = 0;
567 fNumEvts[kTriggerId] = 0;
568
569 fNumEvts[kCurrent] = 0;
570 fDimEvents.Update(fNumEvts);
571 // fDimCurrentEvent.Update(uint32_t(0));
572
573 return reinterpret_cast<FileHandle_t>(file);
574 }
575
576 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
577 {
578 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
579
580 if (!file->WriteEvt(e))
581 return -1;
582
583 if (file->GetRunId()==fMaxRun)
584 {
585 fNumEvts[kCurrent]++;
586 fNumEvts[kEventId] = e->EventNum;
587 fNumEvts[kTriggerId] = e->TriggerNum;
588 }
589
590 fNumEvts[kTotal]++;
591
592 static Time oldt(boost::date_time::neg_infin);
593 Time newt;
594 if (newt>oldt+boost::posix_time::seconds(1))
595 {
596 fDimEvents.Update(fNumEvts);
597 oldt = newt;
598 }
599
600
601 // ===> SignalEvtWritten(runid);
602 // Send num events written of newest file
603
604 /* close run runId (all all runs if runId=0) */
605 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
606 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
607
608 return 0;
609 }
610
611 virtual void CloseRun(uint32_t runid) { }
612
613 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
614 {
615 fMsg.Info(" ==> TODO: Update run configuration in database!");
616 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
617
618 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
619
620 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
621 if (it==fFiles.end())
622 {
623 ostringstream str;
624 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
625 fMsg.Fatal(str);
626 return -1;
627 }
628
629 fFiles.erase(it);
630
631 fLastClosed = file->GetRunId();
632 CloseRun(fLastClosed);
633 UpdateRuns();
634
635 fDimEvents.Update(fNumEvts);
636
637 const bool rc = file->Close(tail);
638 if (!rc)
639 {
640 // Error message
641 }
642
643 ostringstream str;
644 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
645 fMsg.Info(str);
646
647 delete file;
648
649 // ==> SignalRunClose(runid);
650 // Send new num open files
651 // Send empty file-name if no file is open
652
653 return rc ? 0 : -1;
654 }
655
656 ofstream fDumpStream[40];
657
658 void debugStream(int isock, void *buf, int len)
659 {
660 if (!fDebugStream)
661 return;
662
663 const int slot = isock/7;
664 if (slot<0 || slot>39)
665 return;
666
667 if (!fDumpStream[slot].is_open())
668 {
669 ostringstream name;
670 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
671
672 fDumpStream[slot].open(name.str().c_str(), ios::app);
673 if (!fDumpStream[slot])
674 {
675 ostringstream str;
676 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
677 fMsg.Error(str);
678
679 return;
680 }
681
682 fMsg.Message("Opened file '"+name.str()+"' for writing.");
683 }
684
685 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
686 }
687
688 ofstream fDumpRead; // Stream to possibly dump docket events
689
690 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
691 {
692 // isock = socketID (0-279)
693 // ibyte = #bytes gelesen
694 // event = eventId (oder 0 wenn noch nicht bekannt)
695 // state : 1=finished reading data
696 // 0=reading data
697 // -1=start reading data (header)
698 // -2=start reading data,
699 // eventId not known yet (too little data)
700 // tsec, tusec = time when reading seconds, microseconds
701 //
702 if (!fDebugRead || ibyte==0)
703 return;
704
705 if (!fDumpRead.is_open())
706 {
707 fDumpRead.open("socket_events.txt", ios::app);
708 if (!fDumpRead)
709 {
710 ostringstream str;
711 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
712 fMsg.Error(str);
713
714 return;
715 }
716
717 fMsg.Message("Opened file 'socket_events.txt' for writing.");
718
719 fDumpRead << "# START: " << Time().GetAsStr() << endl;
720 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
721 }
722
723 fDumpRead
724 << setw(2) << state << " "
725 << setw(8) << tsec << " "
726 << setw(9) << tusec << " "
727 << setw(3) << isock << " "
728 << setw(2) << isock/7 << " "
729 << runno << " "
730 << event << " "
731 << ftmevt << " "
732 << ibyte << endl;
733 }
734
735 array<uint16_t,2> fVecRoi;
736
737 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
738 {
739 /*
740 fadhd[i] ist ein array mit den 40 fad-headers
741 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
742
743 event ist die Struktur, die auch die write routine erhaelt;
744 darin sind im header die 'soll-werte' fuer z.B. eventID
745 als auch die ADC-Werte (falls Du die brauchst)
746
747 Wenn die routine einen negativen Wert liefert, wird das event
748 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
749 */
750
751 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
752
753 if (roi!=fVecRoi)
754 {
755 Update(fDimRoi, roi);
756 fVecRoi = roi;
757 }
758
759 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
760 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
761
762 // FIMXE: Compare with target configuration
763
764 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
765 {
766 // FIXME: Compare with expectations!!!
767 if (ptr->fStartDelimiter==0)
768 {
769 if (ptr==beg)
770 beg++;
771 continue;
772 }
773
774 if (beg->fStatus != ptr->fStatus)
775 {
776 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
777 CloseRunFile(runNr, 0, 0);
778 return -1;
779 }
780
781 if (beg->fRunNumber != ptr->fRunNumber)
782 {
783 fMsg.Error("Inconsistent run number detected.... closing run.");
784 CloseRunFile(runNr, 0, 0);
785 return -1;
786 }
787
788 /*
789 if (beg->fVersion != ptr->fVersion)
790 {
791 Error("Inconsist firmware version detected.... closing run.");
792 CloseRunFile(runNr, 0, 0);
793 break;
794 }
795 if (beg->fEventCounter != ptr->fEventCounter)
796 {
797 Error("Inconsist run number detected.... closing run.");
798 CloseRunFile(runNr, 0, 0);
799 break;
800 }
801 if (beg->fTriggerCounter != ptr->fTriggerCounter)
802 {
803 Error("Inconsist trigger number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 break;
806 }*/
807
808 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
809 {
810 fMsg.Error("Inconsistent phase shift detected.... closing run.");
811 CloseRunFile(runNr, 0, 0);
812 return -1;
813 }
814
815 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
816 {
817 fMsg.Error("Inconsistent DAC values detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 return -1;
820 }
821
822 if (beg->fTriggerType != ptr->fTriggerType)
823 {
824 fMsg.Error("Inconsistent trigger type detected.... closing run.");
825 CloseRunFile(runNr, 0, 0);
826 return -1;
827 }
828 }
829
830 // check REFCLK_frequency
831 // check consistency with command configuration
832 // how to log errors?
833 // need gotNewRun/closedRun to know it is finished
834
835 return 0;
836 }
837
838 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
839 {
840 // Currently we send any event no matter what its trigger id is...
841 // To be changed.
842 static Time oldt(boost::date_time::neg_infin);
843 Time newt;
844
845 // FIXME: Only send events if the have newer run-numbers
846 if (newt<oldt+boost::posix_time::seconds(1))
847 return;
848
849 oldt = newt;
850
851 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
852
853 vector<char> data(sz+event->Roi*2*1440);
854 memcpy(data.data(), event, sizeof(EVENT));
855
856 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
857
858 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
859 fDimRawData.Update(data);
860
861 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
862 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
863
864 fDimEventData.Update(data2);
865 }
866
867 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
868 {
869 if (!DataCalib::IsValid())
870 return;
871
872 // Workaround to find a valid header.....
873 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
874 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
875
876 // FIMXE: Compare with target configuration
877
878 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++, beg++)
879 if (ptr->fStartDelimiter!=0)
880 break;
881
882 /*
883 // FIXME: Time limit?!
884 static Time oldt(boost::date_time::neg_infin);
885 Time newt;
886
887 // FIXME: Only send events if the have newer run-numbers
888 if (newt<oldt+boost::posix_time::milliseconds(100))
889 return;
890
891 oldt = newt;
892 */
893
894 // FIXME: Check event type here
895
896 const size_t sz = sizeof(EVENT)+event->Roi*2*1440;
897
898 vector<char> data(sz+event->Roi*2*1440);
899 memcpy(data.data(), event, sizeof(EVENT));
900
901 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
902
903 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
904
905 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
906 CalibData::GetPixelMax(data2.data(), vec, event->Roi, 0, event->Roi-1);
907
908// dim_lock();
909 fDimFeedbackData.Update(data2);
910// dim_unlock();
911 }
912
913 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t */*buffer*/)
914 {
915 switch (threadID)
916 {
917 case 0:
918 SendRawData(fadhd, event);
919 return 1;
920 case 1:
921 SendFeedbackData(fadhd, event);
922 return 2;
923 }
924 return 100;
925 }
926
927
928 bool IsRunStarted() const
929 {
930 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
931 return it==fExpectedRuns.end();// ? true : it->second.started;
932 }
933
934 uint32_t GetRunNumber() const
935 {
936 return fRunNumber;
937 }
938
939 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
940 {
941 // This function is called even when writing is switched off
942 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
943 if (it==fExpectedRuns.end())
944 {
945 ostringstream str;
946 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
947 return;
948 }
949
950 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
951 // return: 0=close scheduled / >0 already closed / <0 does not exist
952
953 // FIXME: Move configuration from expected runs to runs which will soon
954 // be opened/closed
955
956 it->second.started = true;
957
958 fExpectedRuns.erase(it);
959 }
960
961 map<boost::thread::id, string> fLastMessage;
962
963 void factOut(int severity, int err, const char *message)
964 {
965 if (!fDebugLog && severity==99)
966 return;
967
968 ostringstream str;
969 //str << boost::this_thread::get_id() << " ";
970 str << "EventBuilder(";
971 if (err<0)
972 str << "---";
973 else
974 str << err;
975 str << "): " << message;
976
977 string &old = fLastMessage[boost::this_thread::get_id()];
978
979 if (str.str()==old)
980 return;
981 old = str.str();
982
983 fMsg.Update(str, severity);
984 }
985/*
986 void factStat(int64_t *stat, int len)
987 {
988 if (len!=7)
989 {
990 fMsg.Warn("factStat received unknown number of values.");
991 return;
992 }
993
994 vector<int64_t> data(1, g_maxMem);
995 data.insert(data.end(), stat, stat+len);
996
997 static vector<int64_t> last(8);
998 if (data==last)
999 return;
1000 last = data;
1001
1002 fDimStatistics.Update(data);
1003
1004 // len ist die Laenge des arrays.
1005 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1006 // kannst Du pruefen, ob die 100MB voll sind ....
1007
1008 ostringstream str;
1009 str
1010 << "Wait=" << stat[0] << " "
1011 << "Skip=" << stat[1] << " "
1012 << "Del=" << stat[2] << " "
1013 << "Tot=" << stat[3] << " "
1014 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1015 << "Read=" << stat[5] << " "
1016 << "Conn=" << stat[6];
1017
1018 fMsg.Info(str);
1019 }
1020 */
1021
1022 void factStat(const EVT_STAT &stat)
1023 {
1024 fDimStatistics2.Update(stat);
1025 }
1026
1027 void factStat(const GUI_STAT &stat)
1028 {
1029 fDimStatistics1.Update(stat);
1030 }
1031
1032
1033 array<FAD::EventHeader, 40> fVecHeader;
1034
1035 template<typename T, class S>
1036 array<T, 42> Compare(const S *vec, const T *t)
1037 {
1038 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1039
1040 const T *min = NULL;
1041 const T *val = NULL;
1042 const T *max = NULL;
1043
1044 array<T, 42> arr;
1045
1046 bool rc = true;
1047 for (int i=0; i<40; i++)
1048 {
1049 const char *base = reinterpret_cast<const char*>(vec+i);
1050 const T *ref = reinterpret_cast<const T*>(base+offset);
1051
1052 arr[i] = *ref;
1053
1054 if (gi_NumConnect[i]!=7)
1055 {
1056 arr[i] = 0;
1057 continue;
1058 }
1059
1060 if (!val)
1061 {
1062 min = ref;
1063 val = ref;
1064 max = ref;
1065 }
1066
1067 if (*ref<*min)
1068 min = ref;
1069
1070 if (*ref>*max)
1071 max = ref;
1072
1073 if (*val!=*ref)
1074 rc = false;
1075 }
1076
1077 arr[40] = val ? *min : 1;
1078 arr[41] = val ? *max : 0;
1079
1080 return arr;
1081 }
1082
1083 template<typename T>
1084 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1085 {
1086 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1087
1088 T val = 0;
1089 T rc = 0;
1090
1091 array<T, 42> vec;
1092
1093 bool first = true;
1094
1095 for (int i=0; i<40; i++)
1096 {
1097 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1098 const T *ref = reinterpret_cast<const T*>(base+offset);
1099
1100 vec[i+2] = *ref;
1101
1102 if (gi_NumConnect[i]!=7)
1103 {
1104 vec[i+2] = 0;
1105 continue;
1106 }
1107
1108 if (first)
1109 {
1110 first = false;
1111 val = *ref;
1112 rc = 0;
1113 }
1114
1115 rc |= val^*ref;
1116 }
1117
1118 vec[0] = rc;
1119 vec[1] = val;
1120
1121 return vec;
1122 }
1123
1124 template<typename T, size_t N>
1125 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1126 {
1127// svc.setQuality(vec[40]<=vec[41]);
1128 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1129 svc.Update();
1130 }
1131
1132 template<typename T>
1133 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1134 {
1135 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1136 for (int i=0; i<40;i++)
1137 cout << " " << data.second[i+3];
1138 cout << endl;
1139 }
1140
1141 vector<uint> fNumConnected;
1142
1143 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1144 {
1145 const uint16_t id = h.Id();
1146 if (id>39)
1147 return;
1148
1149 if (fNumConnected.size()!=40)
1150 fNumConnected.resize(40);
1151
1152 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1153
1154 const bool changed = con!=fNumConnected || !IsThreadRunning();
1155
1156 fNumConnected = con;
1157
1158 const FAD::EventHeader old = fVecHeader[id];
1159 fVecHeader[id] = h;
1160
1161 if (old.fVersion != h.fVersion || changed)
1162 {
1163 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1164
1165 array<float,42> data;
1166 for (int i=0; i<42; i++)
1167 {
1168 ostringstream str;
1169 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1170 data[i] = stof(str.str());
1171 }
1172 Update(fDimFwVersion, data);
1173 }
1174
1175 if (old.fRunNumber != h.fRunNumber || changed)
1176 {
1177 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1178 fDimRunNumber.Update(run);
1179 }
1180
1181 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1182 {
1183 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1184 fDimPrescaler.Update(pre);
1185 }
1186
1187 if (old.fDNA != h.fDNA || changed)
1188 {
1189 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1190 Update(fDimDNA, dna, 40);
1191 }
1192
1193 if (old.fStatus != h.fStatus || changed)
1194 {
1195 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1196 Update(fDimStatus, sts);
1197 }
1198
1199 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1200 {
1201 array<uint16_t, FAD::kNumDac*42> dacs;
1202
1203 for (int i=0; i<FAD::kNumDac; i++)
1204 {
1205 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1206 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1207 }
1208
1209 Update(fDimDac, dacs);
1210 }
1211
1212 // -----------
1213
1214 static Time oldt(boost::date_time::neg_infin);
1215 Time newt;
1216
1217 if (newt>oldt+boost::posix_time::seconds(1))
1218 {
1219 oldt = newt;
1220
1221 // --- RefClock
1222
1223 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1224 Update(fDimRefClock, clk);
1225
1226 // --- Temperatures
1227
1228 const array<int16_t,42> tmp[4] =
1229 {
1230 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1231 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1232 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1233 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1234 };
1235
1236 vector<int16_t> data;
1237 data.reserve(82);
1238 data.push_back(tmp[0][40]); // min: 0
1239 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1240 data.push_back(tmp[0][41]); // max: 41
1241 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1242
1243 for (int j=1; j<=3; j++)
1244 {
1245 const array<int16_t,42> &ref = tmp[j];
1246
1247 // Gloabl min
1248 if (ref[40]<data[0]) // 40=min
1249 data[0] = ref[40];
1250
1251 // Global max
1252 if (ref[41]>data[41]) // 41=max
1253 data[41] = ref[41];
1254
1255 for (int i=0; i<40; i++)
1256 {
1257 // min per board
1258 if (ref[i]<data[i+1]) // data: 1-40
1259 data[i+1] = ref[i]; // ref: 0-39
1260
1261 // max per board
1262 if (ref[i]>data[i+42]) // data: 42-81
1263 data[i+42] = ref[i]; // ref: 0-39
1264 }
1265
1266
1267 }
1268
1269 vector<float> deg(82); // 0: global min, 1-40: min
1270 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1271 deg[i] = data[i]/16.;
1272 fDimTemperature.Update(deg);
1273 }
1274 }
1275};
1276
1277EventBuilderWrapper *EventBuilderWrapper::This = 0;
1278
1279// ----------- Event builder callbacks implementation ---------------
1280extern "C"
1281{
1282 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1283 {
1284 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1285 }
1286
1287 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1288 {
1289 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1290 }
1291
1292 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1293 {
1294 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1295 }
1296
1297 void factOut(int severity, int err, const char *message)
1298 {
1299 EventBuilderWrapper::This->factOut(severity, err, message);
1300 }
1301
1302 void factStat(GUI_STAT stat)
1303 {
1304 EventBuilderWrapper::This->factStat(stat);
1305 }
1306
1307 void factStatNew(EVT_STAT stat)
1308 {
1309 EventBuilderWrapper::This->factStat(stat);
1310 }
1311
1312 void debugHead(int socket, int/*board*/, void *buf)
1313 {
1314 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1315
1316 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1317 }
1318
1319 void debugStream(int isock, void *buf, int len)
1320 {
1321 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1322 }
1323
1324 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1325 {
1326 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1327 }
1328
1329 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
1330 {
1331 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
1332 }
1333
1334 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1335 {
1336 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1337 }
1338
1339 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
1340 {
1341 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, buffer);
1342 }
1343
1344}
1345
1346#endif
Note: See TracBrowser for help on using the repository browser.