source: trunk/FACT++/src/EventBuilderWrapper.h@ 12105

Last change on this file since 12105 was 12091, checked in by tbretz, 13 years ago
Updated to the latest event builder version which includes runEnd and runStart to control the memory of the processing threads.
File size: 38.4 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 return -1;
165
166 //const fs::path fullPath = fs::system_complete(fs::path(path));
167
168 fPath = path;
169
170 fDimWriteStats.SetCurrentFolder(fPath);
171
172 return InitRunNumber();
173 }
174
175public:
176 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
177 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
178 fDimWriteStats ("FAD_CONTROL", imp),
179 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
180 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
181 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
182 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
183 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
184 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
185 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
186 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
187 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
188 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
189 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
190 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
191 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
192 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
193 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560", ""),
194 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
195 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
196 fDebugStream(false), fDebugRead(false), fDebugLog(false)
197 {
198 if (This)
199 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
200
201 This = this;
202
203 memset(fNumEvts, 0, sizeof(fNumEvts));
204 fDimEvents.Update(fNumEvts);
205
206 for (size_t i=0; i<40; i++)
207 ConnectSlot(i, tcp::endpoint());
208 }
209 virtual ~EventBuilderWrapper()
210 {
211 Abort();
212
213 // FIXME: Used timed_join and abort afterwards
214 // What's the maximum time the eb need to abort?
215 fThread.join();
216 //ffMsg.Info("EventBuilder stopped.");
217
218 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
219 delete *it;
220 }
221
222 struct RunDescription
223 {
224 uint32_t maxtime;
225 uint32_t maxevt;
226
227 string name;
228
229 FAD::Configuration reference;
230
231 bool started;
232 };
233
234 map<uint32_t, RunDescription> fExpectedRuns;
235
236 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
237 {
238 if (maxtime<=0 || maxtime>24*60*60)
239 maxtime = 24*60*60;
240 if (maxevt<=0 || maxevt>INT32_MAX)
241 maxevt = INT32_MAX;
242
243 const RunDescription descr =
244 {
245 uint32_t(maxtime),
246 uint32_t(maxevt),
247 ref.first,
248 ref.second,
249 false
250 };
251
252 // FIMXE: Maybe reset an event counter so that the mcp can count events?
253
254 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
255
256 fExpectedRuns[fRunNumber] = descr;
257 return fRunNumber++;
258 }
259
260 bool IsThreadRunning()
261 {
262 return !fThread.timed_join(boost::posix_time::microseconds(0));
263 }
264
265 void SetMaxMemory(unsigned int mb) const
266 {
267 /*
268 if (mb*1000000<GetUsedMemory())
269 {
270 // ffMsg.Warn("...");
271 return;
272 }*/
273
274 g_maxMem = size_t(mb)*1000000;
275 }
276
277 void StartThread(const vector<tcp::endpoint> &addr)
278 {
279 if (IsThreadRunning())
280 {
281 fMsg.Warn("Start - EventBuilder still running");
282 return;
283 }
284
285 fLastMessage.clear();
286
287 for (size_t i=0; i<40; i++)
288 ConnectSlot(i, addr[i]);
289
290 g_runStat = kModeRun;
291 g_maxProc = 3;
292
293 fMsg.Message("Starting EventBuilder thread");
294
295 fThread = boost::thread(StartEvtBuild);
296 }
297 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
298 {
299 if (i>39)
300 return;
301
302 if (addr==tcp::endpoint())
303 {
304 DisconnectSlot(i);
305 return;
306 }
307
308 g_port[i].sockAddr.sin_family = AF_INET;
309 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
310 g_port[i].sockAddr.sin_port = htons(addr.port());
311 // In this order
312 g_port[i].sockDef = 1;
313 }
314 void DisconnectSlot(unsigned int i)
315 {
316 if (i>39)
317 return;
318
319 g_port[i].sockDef = 0;
320 // In this order
321 g_port[i].sockAddr.sin_family = AF_INET;
322 g_port[i].sockAddr.sin_addr.s_addr = 0;
323 g_port[i].sockAddr.sin_port = 0;
324 }
325 void IgnoreSlot(unsigned int i)
326 {
327 if (i>39)
328 return;
329 if (g_port[i].sockAddr.sin_port==0)
330 return;
331
332 g_port[i].sockDef = -1;
333 }
334
335
336 void Abort()
337 {
338 fMsg.Message("Signal abort to EventBuilder thread...");
339 g_runStat = kAbort;
340 }
341
342 void ResetThread(bool soft)
343 {
344 /*
345 if (g_reset > 0)
346
347 * suspend reading
348 * reset = g_reset;
349 * g_reset=0
350
351 * reset% 10
352 == 0 leave event Buffers as they are
353 == 1 let all buffers drain (write (incomplete) events)
354 > 1 flush all buffers (do not write buffered events)
355
356 * (reset/10)%10
357 > 0 close all sockets and destroy them (also free the
358 allocated read-buffers)
359 recreate before resuming operation
360 [ this is more than just close/open that can be
361 triggered by e.g. close/open the base-socket ]
362
363 * (reset/100)%10
364 > 0 close all open run-files
365
366 * (reset/1000)
367 sleep so many seconds before resuming operation
368 (does not (yet) take into account time left when waiting
369 for buffers getting empty ...)
370
371 * resume_reading
372
373 */
374 fMsg.Message("Signal reset to EventBuilder thread...");
375 g_reset = soft ? 101 : 102;
376 }
377
378 void Exit()
379 {
380 fMsg.Message("Signal exit to EventBuilder thread...");
381 g_runStat = kExit;
382 }
383
384 /*
385 void Wait()
386 {
387 fThread.join();
388 ffMsg.Message("EventBuilder stopped.");
389 }*/
390
391 void Hybernate() const { g_runStat = kHybernate; }
392 void Sleep() const { g_runStat = kSleep; }
393 void FlushMode() const { g_runStat = kModeFlush; }
394 void TestMode() const { g_runStat = kModeTest; }
395 void FlagMode() const { g_runStat = kModeFlag; }
396 void RunMode() const { g_runStat = kModeRun; }
397
398 // FIXME: To be removed
399 void SetMode(int mode) const { g_runStat = mode; }
400
401 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
402 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
403 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
404 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
405
406 /*
407 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
408 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
409 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
410 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
411 */
412
413 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
414 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
415
416 void SetOutputFormat(FileFormat_t f)
417 {
418 fFileFormat = f;
419 if (fFileFormat==kCalib)
420 {
421 DataCalib::Restart();
422 DataCalib::Update(fDimDrsCalibration);
423 fMsg.Message("Resetted DRS calibration.");
424 }
425 }
426
427 void SetDebugLog(bool b) { fDebugLog = b; }
428
429 void SetDebugStream(bool b)
430 {
431 fDebugStream = b;
432 if (b)
433 return;
434
435 for (int i=0; i<40; i++)
436 {
437 if (!fDumpStream[i].is_open())
438 continue;
439
440 fDumpStream[i].close();
441
442 ostringstream name;
443 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
444 fMsg.Message("Closed file '"+name.str()+"'");
445 }
446 }
447
448 void SetDebugRead(bool b)
449 {
450 fDebugRead = b;
451 if (b || !fDumpRead.is_open())
452 return;
453
454 fDumpRead.close();
455 fMsg.Message("Closed file 'socket_events.txt'");
456 }
457
458// size_t GetUsedMemory() const { return gi_usedMem; }
459
460 void LoadDrsCalibration(const char *fname)
461 {
462 if (!DataCalib::ReadFits(fname, fMsg))
463 return;
464 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
465 DataCalib::Update(fDimDrsCalibration);
466 }
467
468 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
469
470
471 /*
472 struct OpenFileToDim
473 {
474 int code;
475 char fileName[FILENAME_MAX];
476 };
477
478 SignalRunOpened(runid, filename);
479 // Send num open files
480 // Send runid, (more info about the run?), filename via dim
481
482 SignalEvtWritten(runid);
483 // Send num events written of newest file
484
485 SignalRunClose(runid);
486 // Send new num open files
487 // Send empty file-name if no file is open
488
489 */
490
491 // -------------- Mapped event builder callbacks ------------------
492
493 void UpdateRuns(const string &fname="")
494 {
495 uint32_t values[5] =
496 {
497 static_cast<uint32_t>(fFiles.size()),
498 0xffffffff,
499 0,
500 fLastOpened,
501 fLastClosed
502 };
503
504 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
505 it!=fFiles.end(); it++)
506 {
507 const DataProcessorImp *file = *it;
508
509 if (file->GetRunId()<values[1])
510 values[1] = file->GetRunId();
511
512 if (file->GetRunId()>values[2])
513 values[2] = file->GetRunId();
514 }
515
516 fMaxRun = values[2];
517
518 vector<char> data(sizeof(values)+fname.size()+1);
519 memcpy(data.data(), values, sizeof(values));
520 strcpy(data.data()+sizeof(values), fname.c_str());
521
522 fDimRuns.Update(data);
523 }
524
525 vector<DataProcessorImp*> fFiles;
526
527 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
528 {
529 fMsg.Info(" ==> TODO: Update run configuration in database!");
530 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
531 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
532
533 // Check if file already exists...
534 DataProcessorImp *file = 0;
535 switch (fFileFormat)
536 {
537 case kNone: file = new DataDump(fPath, runid, fMsg); break;
538 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
539 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
540 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
541 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
542 }
543
544 try
545 {
546 if (!file->Open(h))
547 return 0;
548 }
549 catch (const exception &e)
550 {
551 fMsg.Error("Exception trying to open file: "+string(e.what()));
552 return 0;
553 }
554
555 fFiles.push_back(file);
556
557 ostringstream str;
558 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
559 fMsg.Info(str);
560
561 fDimWriteStats.FileOpened(file->GetFileName());
562
563 fLastOpened = runid;
564 UpdateRuns(file->GetFileName());
565
566 fNumEvts[kEventId] = 0;
567 fNumEvts[kTriggerId] = 0;
568
569 fNumEvts[kCurrent] = 0;
570 fDimEvents.Update(fNumEvts);
571 // fDimCurrentEvent.Update(uint32_t(0));
572
573 return reinterpret_cast<FileHandle_t>(file);
574 }
575
576 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
577 {
578 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
579
580 if (!file->WriteEvt(e))
581 return -1;
582
583 if (file->GetRunId()==fMaxRun)
584 {
585 fNumEvts[kCurrent]++;
586 fNumEvts[kEventId] = e->EventNum;
587 fNumEvts[kTriggerId] = e->TriggerNum;
588 }
589
590 fNumEvts[kTotal]++;
591
592 static Time oldt(boost::date_time::neg_infin);
593 Time newt;
594 if (newt>oldt+boost::posix_time::seconds(1))
595 {
596 fDimEvents.Update(fNumEvts);
597 oldt = newt;
598 }
599
600
601 // ===> SignalEvtWritten(runid);
602 // Send num events written of newest file
603
604 /* close run runId (all all runs if runId=0) */
605 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
606 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
607
608 return 0;
609 }
610
611 virtual void CloseRun(uint32_t runid) { }
612
613 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
614 {
615 fMsg.Info(" ==> TODO: Update run configuration in database!");
616 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
617
618 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
619
620 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
621 if (it==fFiles.end())
622 {
623 ostringstream str;
624 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
625 fMsg.Fatal(str);
626 return -1;
627 }
628
629 fFiles.erase(it);
630
631 fLastClosed = file->GetRunId();
632 CloseRun(fLastClosed);
633 UpdateRuns();
634
635 fDimEvents.Update(fNumEvts);
636
637 const bool rc = file->Close(tail);
638 if (!rc)
639 {
640 // Error message
641 }
642
643 ostringstream str;
644 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
645 fMsg.Info(str);
646
647 delete file;
648
649 // ==> SignalRunClose(runid);
650 // Send new num open files
651 // Send empty file-name if no file is open
652
653 return rc ? 0 : -1;
654 }
655
656 ofstream fDumpStream[40];
657
658 void debugStream(int isock, void *buf, int len)
659 {
660 if (!fDebugStream)
661 return;
662
663 const int slot = isock/7;
664 if (slot<0 || slot>39)
665 return;
666
667 if (!fDumpStream[slot].is_open())
668 {
669 ostringstream name;
670 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
671
672 fDumpStream[slot].open(name.str().c_str(), ios::app);
673 if (!fDumpStream[slot])
674 {
675 ostringstream str;
676 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
677 fMsg.Error(str);
678
679 return;
680 }
681
682 fMsg.Message("Opened file '"+name.str()+"' for writing.");
683 }
684
685 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
686 }
687
688 ofstream fDumpRead; // Stream to possibly dump docket events
689
690 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
691 {
692 // isock = socketID (0-279)
693 // ibyte = #bytes gelesen
694 // event = eventId (oder 0 wenn noch nicht bekannt)
695 // state : 1=finished reading data
696 // 0=reading data
697 // -1=start reading data (header)
698 // -2=start reading data,
699 // eventId not known yet (too little data)
700 // tsec, tusec = time when reading seconds, microseconds
701 //
702 if (!fDebugRead || ibyte==0)
703 return;
704
705 if (!fDumpRead.is_open())
706 {
707 fDumpRead.open("socket_events.txt", ios::app);
708 if (!fDumpRead)
709 {
710 ostringstream str;
711 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
712 fMsg.Error(str);
713
714 return;
715 }
716
717 fMsg.Message("Opened file 'socket_events.txt' for writing.");
718
719 fDumpRead << "# START: " << Time().GetAsStr() << endl;
720 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
721 }
722
723 fDumpRead
724 << setw(2) << state << " "
725 << setw(8) << tsec << " "
726 << setw(9) << tusec << " "
727 << setw(3) << isock << " "
728 << setw(2) << isock/7 << " "
729 << runno << " "
730 << event << " "
731 << ftmevt << " "
732 << ibyte << endl;
733 }
734
735 array<uint16_t,2> fVecRoi;
736
737 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
738 {
739 /*
740 fadhd[i] ist ein array mit den 40 fad-headers
741 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
742
743 event ist die Struktur, die auch die write routine erhaelt;
744 darin sind im header die 'soll-werte' fuer z.B. eventID
745 als auch die ADC-Werte (falls Du die brauchst)
746
747 Wenn die routine einen negativen Wert liefert, wird das event
748 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
749 */
750
751 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
752
753 if (roi!=fVecRoi)
754 {
755 Update(fDimRoi, roi);
756 fVecRoi = roi;
757 }
758
759 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
760 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
761
762 // FIMXE: Compare with target configuration
763
764 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
765 {
766 // FIXME: Compare with expectations!!!
767 if (ptr->fStartDelimiter==0)
768 {
769 if (ptr==beg)
770 beg++;
771 continue;
772 }
773
774 if (beg->fStatus != ptr->fStatus)
775 {
776 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
777 CloseRunFile(runNr, 0, 0);
778 return -1;
779 }
780
781 if (beg->fRunNumber != ptr->fRunNumber)
782 {
783 fMsg.Error("Inconsistent run number detected.... closing run.");
784 CloseRunFile(runNr, 0, 0);
785 return -1;
786 }
787
788 /*
789 if (beg->fVersion != ptr->fVersion)
790 {
791 Error("Inconsist firmware version detected.... closing run.");
792 CloseRunFile(runNr, 0, 0);
793 break;
794 }
795 if (beg->fEventCounter != ptr->fEventCounter)
796 {
797 Error("Inconsist run number detected.... closing run.");
798 CloseRunFile(runNr, 0, 0);
799 break;
800 }
801 if (beg->fTriggerCounter != ptr->fTriggerCounter)
802 {
803 Error("Inconsist trigger number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 break;
806 }*/
807
808 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
809 {
810 fMsg.Error("Inconsistent phase shift detected.... closing run.");
811 CloseRunFile(runNr, 0, 0);
812 return -1;
813 }
814
815 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
816 {
817 fMsg.Error("Inconsistent DAC values detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 return -1;
820 }
821
822 if (beg->fTriggerType != ptr->fTriggerType)
823 {
824 fMsg.Error("Inconsistent trigger type detected.... closing run.");
825 CloseRunFile(runNr, 0, 0);
826 return -1;
827 }
828 }
829
830 // check REFCLK_frequency
831 // check consistency with command configuration
832 // how to log errors?
833 // need gotNewRun/closedRun to know it is finished
834
835 return 0;
836 }
837
838 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
839 {
840 // Currently we send any event no matter what its trigger id is...
841 // To be changed.
842 static Time oldt(boost::date_time::neg_infin);
843 Time newt;
844
845 // FIXME: Only send events if the have newer run-numbers
846 if (newt<oldt+boost::posix_time::seconds(1))
847 return;
848
849 oldt = newt;
850
851 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*1440);
852 memcpy(data.data(), event, sizeof(EVENT));
853
854 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
855
856 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
857 fDimRawData.Update(data);
858
859 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
860 CalibData::GetPixelStats(data2.data(), vec, event->Roi);
861
862 fDimEventData.Update(data2);
863 }
864
865 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
866 {
867 if (!DataCalib::IsValid())
868 return;
869
870/*
871 // Workaround to find a valid header.....
872 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
873 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
874
875 // FIMXE: Compare with target configuration
876
877 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++, beg++)
878 if (ptr->fStartDelimiter!=0)
879 break;
880
881 if (ptr->fStartDelimiter==0)
882 return;
883*/
884 /*
885 // FIXME: Time limit?!
886 static Time oldt(boost::date_time::neg_infin);
887 Time newt;
888
889 // FIXME: Only send events if the have newer run-numbers
890 if (newt<oldt+boost::posix_time::milliseconds(100))
891 return;
892
893 oldt = newt;
894 */
895
896 // FIXME: Check event type here
897
898 vector<float> data(event->Roi*1440);
899
900 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
901
902 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
903 CalibData::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
904
905// dim_lock();
906 fDimFeedbackData.Update(data2);
907// dim_unlock();
908 }
909
910 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
911 {
912 switch (threadID)
913 {
914 case 0:
915 SendRawData(fadhd, event);
916 return 1;
917 case 1:
918 SendFeedbackData(fadhd, event);
919 return 2;
920 }
921 return 100;
922 }
923
924
925 bool IsRunStarted() const
926 {
927 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
928 return it==fExpectedRuns.end();// ? true : it->second.started;
929 }
930
931 uint32_t GetRunNumber() const
932 {
933 return fRunNumber;
934 }
935
936 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
937 {
938 // This function is called even when writing is switched off
939 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
940 if (it==fExpectedRuns.end())
941 {
942 ostringstream str;
943 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
944 return;
945 }
946
947 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
948 // return: 0=close scheduled / >0 already closed / <0 does not exist
949
950 // FIXME: Move configuration from expected runs to runs which will soon
951 // be opened/closed
952
953 it->second.started = true;
954
955 fExpectedRuns.erase(it);
956 }
957
958 map<boost::thread::id, string> fLastMessage;
959
960 void factOut(int severity, int err, const char *message)
961 {
962 if (!fDebugLog && severity==99)
963 return;
964
965 ostringstream str;
966 //str << boost::this_thread::get_id() << " ";
967 str << "EventBuilder(";
968 if (err<0)
969 str << "---";
970 else
971 str << err;
972 str << "): " << message;
973
974 string &old = fLastMessage[boost::this_thread::get_id()];
975
976 if (str.str()==old)
977 return;
978 old = str.str();
979
980 fMsg.Update(str, severity);
981 }
982/*
983 void factStat(int64_t *stat, int len)
984 {
985 if (len!=7)
986 {
987 fMsg.Warn("factStat received unknown number of values.");
988 return;
989 }
990
991 vector<int64_t> data(1, g_maxMem);
992 data.insert(data.end(), stat, stat+len);
993
994 static vector<int64_t> last(8);
995 if (data==last)
996 return;
997 last = data;
998
999 fDimStatistics.Update(data);
1000
1001 // len ist die Laenge des arrays.
1002 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1003 // kannst Du pruefen, ob die 100MB voll sind ....
1004
1005 ostringstream str;
1006 str
1007 << "Wait=" << stat[0] << " "
1008 << "Skip=" << stat[1] << " "
1009 << "Del=" << stat[2] << " "
1010 << "Tot=" << stat[3] << " "
1011 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1012 << "Read=" << stat[5] << " "
1013 << "Conn=" << stat[6];
1014
1015 fMsg.Info(str);
1016 }
1017 */
1018
1019 void factStat(const EVT_STAT &stat)
1020 {
1021 fDimStatistics2.Update(stat);
1022 }
1023
1024 void factStat(const GUI_STAT &stat)
1025 {
1026 fDimStatistics1.Update(stat);
1027 }
1028
1029
1030 array<FAD::EventHeader, 40> fVecHeader;
1031
1032 template<typename T, class S>
1033 array<T, 42> Compare(const S *vec, const T *t)
1034 {
1035 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1036
1037 const T *min = NULL;
1038 const T *val = NULL;
1039 const T *max = NULL;
1040
1041 array<T, 42> arr;
1042
1043 bool rc = true;
1044 for (int i=0; i<40; i++)
1045 {
1046 const char *base = reinterpret_cast<const char*>(vec+i);
1047 const T *ref = reinterpret_cast<const T*>(base+offset);
1048
1049 arr[i] = *ref;
1050
1051 if (gi_NumConnect[i]!=7)
1052 {
1053 arr[i] = 0;
1054 continue;
1055 }
1056
1057 if (!val)
1058 {
1059 min = ref;
1060 val = ref;
1061 max = ref;
1062 }
1063
1064 if (*ref<*min)
1065 min = ref;
1066
1067 if (*ref>*max)
1068 max = ref;
1069
1070 if (*val!=*ref)
1071 rc = false;
1072 }
1073
1074 arr[40] = val ? *min : 1;
1075 arr[41] = val ? *max : 0;
1076
1077 return arr;
1078 }
1079
1080 template<typename T>
1081 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1082 {
1083 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1084
1085 T val = 0;
1086 T rc = 0;
1087
1088 array<T, 42> vec;
1089
1090 bool first = true;
1091
1092 for (int i=0; i<40; i++)
1093 {
1094 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1095 const T *ref = reinterpret_cast<const T*>(base+offset);
1096
1097 vec[i+2] = *ref;
1098
1099 if (gi_NumConnect[i]!=7)
1100 {
1101 vec[i+2] = 0;
1102 continue;
1103 }
1104
1105 if (first)
1106 {
1107 first = false;
1108 val = *ref;
1109 rc = 0;
1110 }
1111
1112 rc |= val^*ref;
1113 }
1114
1115 vec[0] = rc;
1116 vec[1] = val;
1117
1118 return vec;
1119 }
1120
1121 template<typename T, size_t N>
1122 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1123 {
1124// svc.setQuality(vec[40]<=vec[41]);
1125 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1126 svc.Update();
1127 }
1128
// Debug helper: print a name, a flag and the contents of an array to
// the standard output in a single line:
//
//   name|flag|[1]|[0]<x<[1]: [3] [4] ... [42]
template<typename T>
void Print(const char *name, const std::pair<bool,std::array<T, 43>> &data)
{
    const std::array<T, 43> &values = data.second;

    std::cout << name << "|" << data.first << "|" << values[1] << "|";
    std::cout << values[0] << "<x<" << values[1] << ":";

    // 40 entries starting at index 3
    for (int idx=3; idx<43; idx++)
        std::cout << " " << values[idx];

    std::cout << std::endl;
}
1137
1138 vector<uint> fNumConnected;
1139
1140 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1141 {
1142 const uint16_t id = h.Id();
1143 if (id>39)
1144 return;
1145
1146 if (fNumConnected.size()!=40)
1147 fNumConnected.resize(40);
1148
1149 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1150
1151 const bool changed = con!=fNumConnected || !IsThreadRunning();
1152
1153 fNumConnected = con;
1154
1155 const FAD::EventHeader old = fVecHeader[id];
1156 fVecHeader[id] = h;
1157
1158 if (old.fVersion != h.fVersion || changed)
1159 {
1160 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1161
1162 array<float,42> data;
1163 for (int i=0; i<42; i++)
1164 {
1165 ostringstream str;
1166 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1167 data[i] = stof(str.str());
1168 }
1169 Update(fDimFwVersion, data);
1170 }
1171
1172 if (old.fRunNumber != h.fRunNumber || changed)
1173 {
1174 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1175 fDimRunNumber.Update(run);
1176 }
1177
1178 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1179 {
1180 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1181 fDimPrescaler.Update(pre);
1182 }
1183
1184 if (old.fDNA != h.fDNA || changed)
1185 {
1186 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1187 Update(fDimDNA, dna, 40);
1188 }
1189
1190 if (old.fStatus != h.fStatus || changed)
1191 {
1192 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1193 Update(fDimStatus, sts);
1194 }
1195
1196 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1197 {
1198 array<uint16_t, FAD::kNumDac*42> dacs;
1199
1200 for (int i=0; i<FAD::kNumDac; i++)
1201 {
1202 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1203 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1204 }
1205
1206 Update(fDimDac, dacs);
1207 }
1208
1209 // -----------
1210
1211 static Time oldt(boost::date_time::neg_infin);
1212 Time newt;
1213
1214 if (newt>oldt+boost::posix_time::seconds(1))
1215 {
1216 oldt = newt;
1217
1218 // --- RefClock
1219
1220 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1221 Update(fDimRefClock, clk);
1222
1223 // --- Temperatures
1224
1225 const array<int16_t,42> tmp[4] =
1226 {
1227 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1228 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1229 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1230 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1231 };
1232
1233 vector<int16_t> data;
1234 data.reserve(82);
1235 data.push_back(tmp[0][40]); // min: 0
1236 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1237 data.push_back(tmp[0][41]); // max: 41
1238 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1239
1240 for (int j=1; j<=3; j++)
1241 {
1242 const array<int16_t,42> &ref = tmp[j];
1243
1244 // Gloabl min
1245 if (ref[40]<data[0]) // 40=min
1246 data[0] = ref[40];
1247
1248 // Global max
1249 if (ref[41]>data[41]) // 41=max
1250 data[41] = ref[41];
1251
1252 for (int i=0; i<40; i++)
1253 {
1254 // min per board
1255 if (ref[i]<data[i+1]) // data: 1-40
1256 data[i+1] = ref[i]; // ref: 0-39
1257
1258 // max per board
1259 if (ref[i]>data[i+42]) // data: 42-81
1260 data[i+42] = ref[i]; // ref: 0-39
1261 }
1262
1263
1264 }
1265
1266 vector<float> deg(82); // 0: global min, 1-40: min
1267 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1268 deg[i] = data[i]/16.;
1269 fDimTemperature.Update(deg);
1270 }
1271 }
1272};
1273
1274EventBuilderWrapper *EventBuilderWrapper::This = 0;
1275
1276// ----------- Event builder callbacks implementation ---------------
1277extern "C"
1278{
1279 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1280 {
1281 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1282 }
1283
1284 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1285 {
1286 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1287 }
1288
1289 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1290 {
1291 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1292 }
1293
1294 // -----
1295
1296 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1297 {
1298 return NULL;
1299 }
1300
1301 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1302 {
1303 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1304 }
1305
1306 int runEnd(uint32_t, void *runPtr)
1307 {
1308 return 0;
1309 }
1310
1311 // -----
1312
1313 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1314 {
1315 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1316 }
1317
1318 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1319 {
1320 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1321 }
1322
1323 // -----
1324
1325 void factOut(int severity, int err, const char *message)
1326 {
1327 EventBuilderWrapper::This->factOut(severity, err, message);
1328 }
1329
1330 void factStat(GUI_STAT stat)
1331 {
1332 EventBuilderWrapper::This->factStat(stat);
1333 }
1334
1335 void factStatNew(EVT_STAT stat)
1336 {
1337 EventBuilderWrapper::This->factStat(stat);
1338 }
1339
1340 // ------
1341
1342 void debugHead(int socket, int/*board*/, void *buf)
1343 {
1344 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1345
1346 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1347 }
1348
1349 void debugStream(int isock, void *buf, int len)
1350 {
1351 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1352 }
1353
1354 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1355 {
1356 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1357 }
1358}
1359
1360#endif
Note: See TracBrowser for help on using the repository browser.