source: trunk/FACT++/src/EventBuilderWrapper.h@ 12472

Last change on this file since 12472 was 12470, checked in by tbretz, 14 years ago
Adapted the length of the fDimDrsCalibration format; added a proper error message if the path for InitRunNumber doesn't exist.
File size: 38.6 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 enum FileFormat_t
78 {
79 kNone = 0,
80 kDebug,
81 kFits,
82 kRaw,
83 kCalib
84 };
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 {
165 fMsg.Error("Data path "+path+" does not exist!");
166 return -1;
167 }
168
169 //const fs::path fullPath = fs::system_complete(fs::path(path));
170
171 fPath = path;
172
173 fDimWriteStats.SetCurrentFolder(fPath);
174
175 return InitRunNumber();
176 }
177
178public:
179 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
180 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
181 fDimWriteStats ("FAD_CONTROL", imp),
182 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
183 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
184 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
185 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
186 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
187 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
188 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
189 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
190 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
191 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
192 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
193 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
194 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
195 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
196 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:3;I:1;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
197 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
198 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
199 fDebugStream(false), fDebugRead(false), fDebugLog(false)
200 {
201 if (This)
202 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
203
204 This = this;
205
206 memset(fNumEvts, 0, sizeof(fNumEvts));
207 fDimEvents.Update(fNumEvts);
208
209 for (size_t i=0; i<40; i++)
210 ConnectSlot(i, tcp::endpoint());
211 }
212 virtual ~EventBuilderWrapper()
213 {
214 Abort();
215
216 // FIXME: Used timed_join and abort afterwards
217 // What's the maximum time the eb need to abort?
218 fThread.join();
219 //ffMsg.Info("EventBuilder stopped.");
220
221 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
222 delete *it;
223 }
224
225 struct RunDescription
226 {
227 uint32_t maxtime;
228 uint32_t maxevt;
229
230 string name;
231
232 FAD::Configuration reference;
233
234 bool started;
235 };
236
237 map<uint32_t, RunDescription> fExpectedRuns;
238
239 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
240 {
241 if (maxtime<=0 || maxtime>24*60*60)
242 maxtime = 24*60*60;
243 if (maxevt<=0 || maxevt>INT32_MAX)
244 maxevt = INT32_MAX;
245
246 const RunDescription descr =
247 {
248 uint32_t(maxtime),
249 uint32_t(maxevt),
250 ref.first,
251 ref.second,
252 false
253 };
254
255 // FIMXE: Maybe reset an event counter so that the mcp can count events?
256
257 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
258
259 fExpectedRuns[fRunNumber] = descr;
260 return fRunNumber++;
261 }
262
263 bool IsThreadRunning()
264 {
265 return !fThread.timed_join(boost::posix_time::microseconds(0));
266 }
267
268 void SetMaxMemory(unsigned int mb) const
269 {
270 /*
271 if (mb*1000000<GetUsedMemory())
272 {
273 // ffMsg.Warn("...");
274 return;
275 }*/
276
277 g_maxMem = size_t(mb)*1000000;
278 }
279
280 void StartThread(const vector<tcp::endpoint> &addr)
281 {
282 if (IsThreadRunning())
283 {
284 fMsg.Warn("Start - EventBuilder still running");
285 return;
286 }
287
288 fLastMessage.clear();
289
290 for (size_t i=0; i<40; i++)
291 ConnectSlot(i, addr[i]);
292
293 g_runStat = kModeRun;
294 g_maxProc = 3;
295
296 fMsg.Message("Starting EventBuilder thread");
297
298 fThread = boost::thread(StartEvtBuild);
299 }
300 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
301 {
302 if (i>39)
303 return;
304
305 if (addr==tcp::endpoint())
306 {
307 DisconnectSlot(i);
308 return;
309 }
310
311 g_port[i].sockAddr.sin_family = AF_INET;
312 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
313 g_port[i].sockAddr.sin_port = htons(addr.port());
314 // In this order
315 g_port[i].sockDef = 1;
316 }
317 void DisconnectSlot(unsigned int i)
318 {
319 if (i>39)
320 return;
321
322 g_port[i].sockDef = 0;
323 // In this order
324 g_port[i].sockAddr.sin_family = AF_INET;
325 g_port[i].sockAddr.sin_addr.s_addr = 0;
326 g_port[i].sockAddr.sin_port = 0;
327 }
328 void IgnoreSlot(unsigned int i)
329 {
330 if (i>39)
331 return;
332 if (g_port[i].sockAddr.sin_port==0)
333 return;
334
335 g_port[i].sockDef = -1;
336 }
337
338
339 void Abort()
340 {
341 fMsg.Message("Signal abort to EventBuilder thread...");
342 g_runStat = kAbort;
343 }
344
345 void ResetThread(bool soft)
346 {
347 /*
348 if (g_reset > 0)
349
350 * suspend reading
351 * reset = g_reset;
352 * g_reset=0
353
354 * reset% 10
355 == 0 leave event Buffers as they are
356 == 1 let all buffers drain (write (incomplete) events)
357 > 1 flush all buffers (do not write buffered events)
358
359 * (reset/10)%10
360 > 0 close all sockets and destroy them (also free the
361 allocated read-buffers)
362 recreate before resuming operation
363 [ this is more than just close/open that can be
364 triggered by e.g. close/open the base-socket ]
365
366 * (reset/100)%10
367 > 0 close all open run-files
368
369 * (reset/1000)
370 sleep so many seconds before resuming operation
371 (does not (yet) take into account time left when waiting
372 for buffers getting empty ...)
373
374 * resume_reading
375
376 */
377 fMsg.Message("Signal reset to EventBuilder thread...");
378 g_reset = soft ? 101 : 102;
379 }
380
381 void Exit()
382 {
383 fMsg.Message("Signal exit to EventBuilder thread...");
384 g_runStat = kExit;
385 }
386
387 /*
388 void Wait()
389 {
390 fThread.join();
391 ffMsg.Message("EventBuilder stopped.");
392 }*/
393
394 void Hybernate() const { g_runStat = kHybernate; }
395 void Sleep() const { g_runStat = kSleep; }
396 void FlushMode() const { g_runStat = kModeFlush; }
397 void TestMode() const { g_runStat = kModeTest; }
398 void FlagMode() const { g_runStat = kModeFlag; }
399 void RunMode() const { g_runStat = kModeRun; }
400
401 // FIXME: To be removed
402 void SetMode(int mode) const { g_runStat = mode; }
403
404 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
405 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
406 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
407 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
408 int GetNumFilesOpen() const { return fFiles.size(); }
409
410 /*
411 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
412 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
413 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
414 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
415 */
416
417 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
418 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
419
420 void SetOutputFormat(FileFormat_t f)
421 {
422 fFileFormat = f;
423 if (fFileFormat==kCalib)
424 {
425 DataCalib::Restart();
426 DataCalib::Update(fDimDrsCalibration);
427 fMsg.Message("Resetted DRS calibration.");
428 }
429 }
430
431 void SetDebugLog(bool b) { fDebugLog = b; }
432
433 void SetDebugStream(bool b)
434 {
435 fDebugStream = b;
436 if (b)
437 return;
438
439 for (int i=0; i<40; i++)
440 {
441 if (!fDumpStream[i].is_open())
442 continue;
443
444 fDumpStream[i].close();
445
446 ostringstream name;
447 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
448 fMsg.Message("Closed file '"+name.str()+"'");
449 }
450 }
451
452 void SetDebugRead(bool b)
453 {
454 fDebugRead = b;
455 if (b || !fDumpRead.is_open())
456 return;
457
458 fDumpRead.close();
459 fMsg.Message("Closed file 'socket_events.txt'");
460 }
461
462// size_t GetUsedMemory() const { return gi_usedMem; }
463
464 void LoadDrsCalibration(const char *fname)
465 {
466 if (!DataCalib::ReadFits(fname, fMsg))
467 return;
468 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
469 DataCalib::Update(fDimDrsCalibration);
470 }
471
472 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
473
474
475 /*
476 struct OpenFileToDim
477 {
478 int code;
479 char fileName[FILENAME_MAX];
480 };
481
482 SignalRunOpened(runid, filename);
483 // Send num open files
484 // Send runid, (more info about the run?), filename via dim
485
486 SignalEvtWritten(runid);
487 // Send num events written of newest file
488
489 SignalRunClose(runid);
490 // Send new num open files
491 // Send empty file-name if no file is open
492
493 */
494
495 // -------------- Mapped event builder callbacks ------------------
496
497 void UpdateRuns(const string &fname="")
498 {
499 uint32_t values[5] =
500 {
501 static_cast<uint32_t>(fFiles.size()),
502 0xffffffff,
503 0,
504 fLastOpened,
505 fLastClosed
506 };
507
508 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
509 it!=fFiles.end(); it++)
510 {
511 const DataProcessorImp *file = *it;
512
513 if (file->GetRunId()<values[1])
514 values[1] = file->GetRunId();
515
516 if (file->GetRunId()>values[2])
517 values[2] = file->GetRunId();
518 }
519
520 fMaxRun = values[2];
521
522 vector<char> data(sizeof(values)+fname.size()+1);
523 memcpy(data.data(), values, sizeof(values));
524 strcpy(data.data()+sizeof(values), fname.c_str());
525
526 fDimRuns.Update(data);
527 }
528
529 vector<DataProcessorImp*> fFiles;
530
531 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
532 {
533 fMsg.Info(" ==> TODO: Update run configuration in database!");
534 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
535 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
536
537 // Check if file already exists...
538 DataProcessorImp *file = 0;
539 switch (fFileFormat)
540 {
541 case kNone: file = new DataDump(fPath, runid, fMsg); break;
542 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
543 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
544 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
545 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
546 }
547
548 try
549 {
550 if (!file->Open(h))
551 return 0;
552 }
553 catch (const exception &e)
554 {
555 fMsg.Error("Exception trying to open file: "+string(e.what()));
556 return 0;
557 }
558
559 fFiles.push_back(file);
560
561 ostringstream str;
562 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
563 fMsg.Info(str);
564
565 fDimWriteStats.FileOpened(file->GetFileName());
566
567 fLastOpened = runid;
568 UpdateRuns(file->GetFileName());
569
570 fNumEvts[kEventId] = 0;
571 fNumEvts[kTriggerId] = 0;
572
573 fNumEvts[kCurrent] = 0;
574 fDimEvents.Update(fNumEvts);
575 // fDimCurrentEvent.Update(uint32_t(0));
576
577 return reinterpret_cast<FileHandle_t>(file);
578 }
579
580 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
581 {
582 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
583
584 if (!file->WriteEvt(e))
585 return -1;
586
587 if (file->GetRunId()==fMaxRun)
588 {
589 fNumEvts[kCurrent]++;
590 fNumEvts[kEventId] = e->EventNum;
591 fNumEvts[kTriggerId] = e->TriggerNum;
592 }
593
594 fNumEvts[kTotal]++;
595
596 static Time oldt(boost::date_time::neg_infin);
597 Time newt;
598 if (newt>oldt+boost::posix_time::seconds(1))
599 {
600 fDimEvents.Update(fNumEvts);
601 oldt = newt;
602 }
603
604
605 // ===> SignalEvtWritten(runid);
606 // Send num events written of newest file
607
608 /* close run runId (all all runs if runId=0) */
609 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
610 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
611
612 return 0;
613 }
614
615 virtual void CloseRun(uint32_t runid) { }
616
617 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
618 {
619 fMsg.Info(" ==> TODO: Update run configuration in database!");
620 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
621
622 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
623
624 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
625 if (it==fFiles.end())
626 {
627 ostringstream str;
628 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
629 fMsg.Fatal(str);
630 return -1;
631 }
632
633 fFiles.erase(it);
634
635 fLastClosed = file->GetRunId();
636 CloseRun(fLastClosed);
637 UpdateRuns();
638
639 fDimEvents.Update(fNumEvts);
640
641 const bool rc = file->Close(tail);
642 if (!rc)
643 {
644 // Error message
645 }
646
647 ostringstream str;
648 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
649 fMsg.Info(str);
650
651 delete file;
652
653 // ==> SignalRunClose(runid);
654 // Send new num open files
655 // Send empty file-name if no file is open
656
657 return rc ? 0 : -1;
658 }
659
660 ofstream fDumpStream[40];
661
662 void debugStream(int isock, void *buf, int len)
663 {
664 if (!fDebugStream)
665 return;
666
667 const int slot = isock/7;
668 if (slot<0 || slot>39)
669 return;
670
671 if (!fDumpStream[slot].is_open())
672 {
673 ostringstream name;
674 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
675
676 fDumpStream[slot].open(name.str().c_str(), ios::app);
677 if (!fDumpStream[slot])
678 {
679 ostringstream str;
680 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
681 fMsg.Error(str);
682
683 return;
684 }
685
686 fMsg.Message("Opened file '"+name.str()+"' for writing.");
687 }
688
689 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
690 }
691
692 ofstream fDumpRead; // Stream to possibly dump docket events
693
694 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
695 {
696 // isock = socketID (0-279)
697 // ibyte = #bytes gelesen
698 // event = eventId (oder 0 wenn noch nicht bekannt)
699 // state : 1=finished reading data
700 // 0=reading data
701 // -1=start reading data (header)
702 // -2=start reading data,
703 // eventId not known yet (too little data)
704 // tsec, tusec = time when reading seconds, microseconds
705 //
706 if (!fDebugRead || ibyte==0)
707 return;
708
709 if (!fDumpRead.is_open())
710 {
711 fDumpRead.open("socket_events.txt", ios::app);
712 if (!fDumpRead)
713 {
714 ostringstream str;
715 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
716 fMsg.Error(str);
717
718 return;
719 }
720
721 fMsg.Message("Opened file 'socket_events.txt' for writing.");
722
723 fDumpRead << "# START: " << Time().GetAsStr() << endl;
724 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
725 }
726
727 fDumpRead
728 << setw(2) << state << " "
729 << setw(8) << tsec << " "
730 << setw(9) << tusec << " "
731 << setw(3) << isock << " "
732 << setw(2) << isock/7 << " "
733 << runno << " "
734 << event << " "
735 << ftmevt << " "
736 << ibyte << endl;
737 }
738
739 array<uint16_t,2> fVecRoi;
740
741 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
742 {
743 /*
744 fadhd[i] ist ein array mit den 40 fad-headers
745 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
746
747 event ist die Struktur, die auch die write routine erhaelt;
748 darin sind im header die 'soll-werte' fuer z.B. eventID
749 als auch die ADC-Werte (falls Du die brauchst)
750
751 Wenn die routine einen negativen Wert liefert, wird das event
752 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
753 */
754
755 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
756
757 if (roi!=fVecRoi)
758 {
759 Update(fDimRoi, roi);
760 fVecRoi = roi;
761 }
762
763 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
764 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
765
766 // FIMXE: Compare with target configuration
767
768 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
769 {
770 // FIXME: Compare with expectations!!!
771 if (ptr->fStartDelimiter==0)
772 {
773 if (ptr==beg)
774 beg++;
775 continue;
776 }
777
778 if (beg->fStatus != ptr->fStatus)
779 {
780 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
781 CloseRunFile(runNr, 0, 0);
782 return -1;
783 }
784
785 if (beg->fRunNumber != ptr->fRunNumber)
786 {
787 fMsg.Error("Inconsistent run number detected.... closing run.");
788 CloseRunFile(runNr, 0, 0);
789 return -1;
790 }
791
792 /*
793 if (beg->fVersion != ptr->fVersion)
794 {
795 Error("Inconsist firmware version detected.... closing run.");
796 CloseRunFile(runNr, 0, 0);
797 break;
798 }
799 if (beg->fEventCounter != ptr->fEventCounter)
800 {
801 Error("Inconsist run number detected.... closing run.");
802 CloseRunFile(runNr, 0, 0);
803 break;
804 }
805 if (beg->fTriggerCounter != ptr->fTriggerCounter)
806 {
807 Error("Inconsist trigger number detected.... closing run.");
808 CloseRunFile(runNr, 0, 0);
809 break;
810 }*/
811
812 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
813 {
814 fMsg.Error("Inconsistent phase shift detected.... closing run.");
815 CloseRunFile(runNr, 0, 0);
816 return -1;
817 }
818
819 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
820 {
821 fMsg.Error("Inconsistent DAC values detected.... closing run.");
822 CloseRunFile(runNr, 0, 0);
823 return -1;
824 }
825
826 if (beg->fTriggerType != ptr->fTriggerType)
827 {
828 fMsg.Error("Inconsistent trigger type detected.... closing run.");
829 CloseRunFile(runNr, 0, 0);
830 return -1;
831 }
832 }
833
834 // check REFCLK_frequency
835 // check consistency with command configuration
836 // how to log errors?
837 // need gotNewRun/closedRun to know it is finished
838
839 return 0;
840 }
841
842 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
843 {
844 // Currently we send any event no matter what its trigger id is...
845 // To be changed.
846 static Time oldt(boost::date_time::neg_infin);
847 Time newt;
848
849 // FIXME: Only send events if the have newer run-numbers
850 if (newt<oldt+boost::posix_time::seconds(1))
851 return;
852
853 oldt = newt;
854
855 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
856 memcpy(data.data(), event, sizeof(EVENT));
857
858 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
859
860 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
861 fDimRawData.Update(data);
862
863 DrsCalibrate::RemoveSpikes(vec, event->Roi);
864
865 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
866 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
867
868 fDimEventData.Update(data2);
869 }
870
871 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
872 {
873 if (!DataCalib::IsValid())
874 return;
875
876 // Workaround to find a valid header.....
877 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
878 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
879
880 // FIMXE: Compare with target configuration
881
882 const FAD::EventHeader *ptr=beg;
883 for (; ptr!=end; ptr++)
884 {
885 if (ptr->fStartDelimiter==0)
886 continue;
887
888 // ptr->fTriggerType = htons(ptr->fTriggerType)
889
890 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
891 return;
892 }
893
894 if (ptr->fStartDelimiter==0)
895 return;
896
897 vector<float> data(event->Roi*1440);
898 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
899
900 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
901
902 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
903 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
904
905 fDimFeedbackData.Update(data2);
906 }
907
908 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
909 {
910 switch (threadID)
911 {
912 case 0:
913 SendRawData(fadhd, event);
914 return 1;
915 case 1:
916 SendFeedbackData(fadhd, event);
917 return 2;
918 }
919 return 100;
920 }
921
922
923 bool IsRunStarted() const
924 {
925 const map<uint32_t,RunDescription>::const_iterator it = fExpectedRuns.find(fRunNumber-1);
926 return it==fExpectedRuns.end();// ? true : it->second.started;
927 }
928
929 uint32_t GetRunNumber() const
930 {
931 return fRunNumber;
932 }
933
934 void IncreaseRunNumber(uint32_t run)
935 {
936 if (run>fRunNumber)
937 fRunNumber = run;
938 }
939
940 void gotNewRun(int runnr, PEVNT_HEADER */*headers*/)
941 {
942 // This function is called even when writing is switched off
943 const map<uint32_t,RunDescription>::iterator it = fExpectedRuns.find(runnr);
944 if (it==fExpectedRuns.end())
945 {
946 ostringstream str;
947 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
948 return;
949 }
950
951 CloseRunFile(runnr, time(NULL)+it->second.maxtime, it->second.maxevt);
952 // return: 0=close scheduled / >0 already closed / <0 does not exist
953
954 // FIXME: Move configuration from expected runs to runs which will soon
955 // be opened/closed
956
957 it->second.started = true;
958
959 fExpectedRuns.erase(it);
960 }
961
962 map<boost::thread::id, string> fLastMessage;
963
964 void factOut(int severity, int err, const char *message)
965 {
966 if (!fDebugLog && severity==99)
967 return;
968
969 ostringstream str;
970 //str << boost::this_thread::get_id() << " ";
971 str << "EventBuilder(";
972 if (err<0)
973 str << "---";
974 else
975 str << err;
976 str << "): " << message;
977
978 string &old = fLastMessage[boost::this_thread::get_id()];
979
980 if (str.str()==old)
981 return;
982 old = str.str();
983
984 fMsg.Update(str, severity);
985 }
986/*
987 void factStat(int64_t *stat, int len)
988 {
989 if (len!=7)
990 {
991 fMsg.Warn("factStat received unknown number of values.");
992 return;
993 }
994
995 vector<int64_t> data(1, g_maxMem);
996 data.insert(data.end(), stat, stat+len);
997
998 static vector<int64_t> last(8);
999 if (data==last)
1000 return;
1001 last = data;
1002
1003 fDimStatistics.Update(data);
1004
1005 // len ist die Laenge des arrays.
1006 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1007 // kannst Du pruefen, ob die 100MB voll sind ....
1008
1009 ostringstream str;
1010 str
1011 << "Wait=" << stat[0] << " "
1012 << "Skip=" << stat[1] << " "
1013 << "Del=" << stat[2] << " "
1014 << "Tot=" << stat[3] << " "
1015 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1016 << "Read=" << stat[5] << " "
1017 << "Conn=" << stat[6];
1018
1019 fMsg.Info(str);
1020 }
1021 */
1022
1023 void factStat(const EVT_STAT &stat)
1024 {
1025 fDimStatistics2.Update(stat);
1026 }
1027
1028 void factStat(const GUI_STAT &stat)
1029 {
1030 fDimStatistics1.Update(stat);
1031 }
1032
1033
1034 array<FAD::EventHeader, 40> fVecHeader;
1035
1036 template<typename T, class S>
1037 array<T, 42> Compare(const S *vec, const T *t)
1038 {
1039 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1040
1041 const T *min = NULL;
1042 const T *val = NULL;
1043 const T *max = NULL;
1044
1045 array<T, 42> arr;
1046
1047 bool rc = true;
1048 for (int i=0; i<40; i++)
1049 {
1050 const char *base = reinterpret_cast<const char*>(vec+i);
1051 const T *ref = reinterpret_cast<const T*>(base+offset);
1052
1053 arr[i] = *ref;
1054
1055 if (gi_NumConnect[i]!=7)
1056 {
1057 arr[i] = 0;
1058 continue;
1059 }
1060
1061 if (!val)
1062 {
1063 min = ref;
1064 val = ref;
1065 max = ref;
1066 }
1067
1068 if (*ref<*min)
1069 min = ref;
1070
1071 if (*ref>*max)
1072 max = ref;
1073
1074 if (*val!=*ref)
1075 rc = false;
1076 }
1077
1078 arr[40] = val ? *min : 1;
1079 arr[41] = val ? *max : 0;
1080
1081 return arr;
1082 }
1083
1084 template<typename T>
1085 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1086 {
1087 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1088
1089 T val = 0;
1090 T rc = 0;
1091
1092 array<T, 42> vec;
1093
1094 bool first = true;
1095
1096 for (int i=0; i<40; i++)
1097 {
1098 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1099 const T *ref = reinterpret_cast<const T*>(base+offset);
1100
1101 vec[i+2] = *ref;
1102
1103 if (gi_NumConnect[i]!=7)
1104 {
1105 vec[i+2] = 0;
1106 continue;
1107 }
1108
1109 if (first)
1110 {
1111 first = false;
1112 val = *ref;
1113 rc = 0;
1114 }
1115
1116 rc |= val^*ref;
1117 }
1118
1119 vec[0] = rc;
1120 vec[1] = val;
1121
1122 return vec;
1123 }
1124
1125 template<typename T, size_t N>
1126 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1127 {
1128// svc.setQuality(vec[40]<=vec[41]);
1129 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1130 svc.Update();
1131 }
1132
1133 template<typename T>
1134 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1135 {
1136 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1137 for (int i=0; i<40;i++)
1138 cout << " " << data.second[i+3];
1139 cout << endl;
1140 }
1141
1142 vector<uint> fNumConnected;
1143
1144 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1145 {
1146 const uint16_t id = h.Id();
1147 if (id>39)
1148 return;
1149
1150 if (fNumConnected.size()!=40)
1151 fNumConnected.resize(40);
1152
1153 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1154
1155 const bool changed = con!=fNumConnected || !IsThreadRunning();
1156
1157 fNumConnected = con;
1158
1159 const FAD::EventHeader old = fVecHeader[id];
1160 fVecHeader[id] = h;
1161
1162 if (old.fVersion != h.fVersion || changed)
1163 {
1164 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1165
1166 array<float,42> data;
1167 for (int i=0; i<42; i++)
1168 {
1169 ostringstream str;
1170 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1171 data[i] = stof(str.str());
1172 }
1173 Update(fDimFwVersion, data);
1174 }
1175
1176 if (old.fRunNumber != h.fRunNumber || changed)
1177 {
1178 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1179 fDimRunNumber.Update(run);
1180 }
1181
1182 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1183 {
1184 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1185 fDimPrescaler.Update(pre);
1186 }
1187
1188 if (old.fDNA != h.fDNA || changed)
1189 {
1190 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1191 Update(fDimDNA, dna, 40);
1192 }
1193
1194 if (old.fStatus != h.fStatus || changed)
1195 {
1196 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1197 Update(fDimStatus, sts);
1198 }
1199
1200 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1201 {
1202 array<uint16_t, FAD::kNumDac*42> dacs;
1203
1204 for (int i=0; i<FAD::kNumDac; i++)
1205 {
1206 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1207 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1208 }
1209
1210 Update(fDimDac, dacs);
1211 }
1212
1213 // -----------
1214
1215 static Time oldt(boost::date_time::neg_infin);
1216 Time newt;
1217
1218 if (newt>oldt+boost::posix_time::seconds(1))
1219 {
1220 oldt = newt;
1221
1222 // --- RefClock
1223
1224 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1225 Update(fDimRefClock, clk);
1226
1227 // --- Temperatures
1228
1229 const array<int16_t,42> tmp[4] =
1230 {
1231 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1232 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1233 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1234 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1235 };
1236
1237 vector<int16_t> data;
1238 data.reserve(82);
1239 data.push_back(tmp[0][40]); // min: 0
1240 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1241 data.push_back(tmp[0][41]); // max: 41
1242 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1243
1244 for (int j=1; j<=3; j++)
1245 {
1246 const array<int16_t,42> &ref = tmp[j];
1247
1248 // Gloabl min
1249 if (ref[40]<data[0]) // 40=min
1250 data[0] = ref[40];
1251
1252 // Global max
1253 if (ref[41]>data[41]) // 41=max
1254 data[41] = ref[41];
1255
1256 for (int i=0; i<40; i++)
1257 {
1258 // min per board
1259 if (ref[i]<data[i+1]) // data: 1-40
1260 data[i+1] = ref[i]; // ref: 0-39
1261
1262 // max per board
1263 if (ref[i]>data[i+42]) // data: 42-81
1264 data[i+42] = ref[i]; // ref: 0-39
1265 }
1266
1267
1268 }
1269
1270 vector<float> deg(82); // 0: global min, 1-40: min
1271 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1272 deg[i] = data[i]/16.;
1273 fDimTemperature.Update(deg);
1274 }
1275 }
1276};
1277
1278EventBuilderWrapper *EventBuilderWrapper::This = 0;
1279
1280// ----------- Event builder callbacks implementation ---------------
1281extern "C"
1282{
1283 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1284 {
1285 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1286 }
1287
1288 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1289 {
1290 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1291 }
1292
1293 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1294 {
1295 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1296 }
1297
1298 // -----
1299
1300 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1301 {
1302 return NULL;
1303 }
1304
1305 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1306 {
1307 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1308 }
1309
1310 int runEnd(uint32_t, void *runPtr)
1311 {
1312 return 0;
1313 }
1314
1315 // -----
1316
1317 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1318 {
1319 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1320 }
1321
1322 void gotNewRun(int runnr, PEVNT_HEADER *headers)
1323 {
1324 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1325 }
1326
1327 // -----
1328
1329 void factOut(int severity, int err, const char *message)
1330 {
1331 EventBuilderWrapper::This->factOut(severity, err, message);
1332 }
1333
1334 void factStat(GUI_STAT stat)
1335 {
1336 EventBuilderWrapper::This->factStat(stat);
1337 }
1338
1339 void factStatNew(EVT_STAT stat)
1340 {
1341 EventBuilderWrapper::This->factStat(stat);
1342 }
1343
1344 // ------
1345
1346 void debugHead(int socket, int/*board*/, void *buf)
1347 {
1348 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1349
1350 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1351 }
1352
1353 void debugStream(int isock, void *buf, int len)
1354 {
1355 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1356 }
1357
1358 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1359 {
1360 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1361 }
1362}
1363
1364#endif
Note: See TracBrowser for help on using the repository browser.