source: trunk/FACT++/src/EventBuilderWrapper.h@ 12493

Last change on this file since 12493 was 12492, checked in by tbretz, 14 years ago
Removed hacked byte swapping from debugHead
File size: 39.9 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Command states of the event builder; these are the values written to
// the global g_runStat.
enum CommandStates_t
{
    // quit as soon as possible ('abort')
    kAbort      = -2,
    // stop reading, quit when buffered events done ('exit')
    kExit       = -1,
    // 'initialize' (e.g. dim not yet started)
    kInitialize =  0,
    // do nothing for long time ('hybernate') [wakeup within ~1sec]
    kHybernate  =  1,
    // do nothing ('sleep') [wakeup within ~10msec]
    kSleep      =  2,
    // read data from camera, but skip them ('flush')
    kModeFlush  = 10,
    // read data and process them, but do not write to disk ('test')
    kModeTest   = 20,
    // read data, process and write all to disk ('flag')
    kModeFlag   = 30,
    // read data, process and write selected to disk ('run')
    kModeRun    = 40
};
68
// Indices into the fNumEvts counter array.
enum
{
    kCurrent   = 0,  // events counted for the most recent run (reset in runOpen)
    kTotal     = 1,  // events counted over all runs
    kEventId   = 2,  // EventNum of the last counted event
    kTriggerId = 3   // TriggerNum of the last counted event
};
76
// Output format selector; runOpen() picks the data processor class
// according to this value.
enum FileFormat_t
{
    kNone  = 0,  // DataDump
    kDebug = 1,  // DataDebug
    kFits  = 2,  // DataWriteFits
    kRaw   = 3,  // DataWriteRaw
    kCalib = 4   // DataCalib
};
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 {
165 fMsg.Error("Data path "+path+" does not exist!");
166 return -1;
167 }
168
169 //const fs::path fullPath = fs::system_complete(fs::path(path));
170
171 fPath = path;
172
173 fDimWriteStats.SetCurrentFolder(fPath);
174
175 return InitRunNumber();
176 }
177
178public:
179 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
180 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
181 fDimWriteStats ("FAD_CONTROL", imp),
182 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
183 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
184 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
185 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
186 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
187 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
188 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
189 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
190 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
191 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
192 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
193 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
194 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
195 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
196 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
197 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
198 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
199 fDebugStream(false), fDebugRead(false), fDebugLog(false)
200 {
201 if (This)
202 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
203
204 This = this;
205
206 memset(fNumEvts, 0, sizeof(fNumEvts));
207 fDimEvents.Update(fNumEvts);
208
209 for (size_t i=0; i<40; i++)
210 ConnectSlot(i, tcp::endpoint());
211 }
212 virtual ~EventBuilderWrapper()
213 {
214 Abort();
215
216 // FIXME: Used timed_join and abort afterwards
217 // What's the maximum time the eb need to abort?
218 fThread.join();
219 //ffMsg.Info("EventBuilder stopped.");
220
221 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
222 delete *it;
223 }
224
225 set<uint32_t> fIsRunStarted;
226 map<uint32_t, FAD::RunDescription> fExpectedRuns;
227
228 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
229 {
230 if (maxtime<=0 || maxtime>24*60*60)
231 maxtime = 24*60*60;
232 if (maxevt<=0 || maxevt>INT32_MAX)
233 maxevt = INT32_MAX;
234
235 const FAD::RunDescription descr =
236 {
237 uint32_t(maxtime),
238 uint32_t(maxevt),
239 ref.first,
240 ref.second,
241 };
242
243 // FIMXE: Maybe reset an event counter so that the mcp can count events?
244
245 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
246
247 fExpectedRuns[fRunNumber] = descr;
248 fIsRunStarted.insert(fRunNumber);
249 return fRunNumber++;
250 }
251
252 bool IsThreadRunning()
253 {
254 return !fThread.timed_join(boost::posix_time::microseconds(0));
255 }
256
257 void SetMaxMemory(unsigned int mb) const
258 {
259 /*
260 if (mb*1000000<GetUsedMemory())
261 {
262 // ffMsg.Warn("...");
263 return;
264 }*/
265
266 g_maxMem = size_t(mb)*1000000;
267 }
268
269 void StartThread(const vector<tcp::endpoint> &addr)
270 {
271 if (IsThreadRunning())
272 {
273 fMsg.Warn("Start - EventBuilder still running");
274 return;
275 }
276
277 fLastMessage.clear();
278
279 for (size_t i=0; i<40; i++)
280 ConnectSlot(i, addr[i]);
281
282 g_runStat = kModeRun;
283 g_maxProc = 3;
284
285 fMsg.Message("Starting EventBuilder thread");
286
287 fThread = boost::thread(StartEvtBuild);
288 }
289 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
290 {
291 if (i>39)
292 return;
293
294 if (addr==tcp::endpoint())
295 {
296 DisconnectSlot(i);
297 return;
298 }
299
300 g_port[i].sockAddr.sin_family = AF_INET;
301 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
302 g_port[i].sockAddr.sin_port = htons(addr.port());
303 // In this order
304 g_port[i].sockDef = 1;
305 }
306 void DisconnectSlot(unsigned int i)
307 {
308 if (i>39)
309 return;
310
311 g_port[i].sockDef = 0;
312 // In this order
313 g_port[i].sockAddr.sin_family = AF_INET;
314 g_port[i].sockAddr.sin_addr.s_addr = 0;
315 g_port[i].sockAddr.sin_port = 0;
316 }
317 void IgnoreSlot(unsigned int i)
318 {
319 if (i>39)
320 return;
321 if (g_port[i].sockAddr.sin_port==0)
322 return;
323
324 g_port[i].sockDef = -1;
325 }
326
327
328 void Abort()
329 {
330 fMsg.Message("Signal abort to EventBuilder thread...");
331 g_runStat = kAbort;
332 }
333
334 void ResetThread(bool soft)
335 {
336 /*
337 if (g_reset > 0)
338
339 * suspend reading
340 * reset = g_reset;
341 * g_reset=0
342
343 * reset% 10
344 == 0 leave event Buffers as they are
345 == 1 let all buffers drain (write (incomplete) events)
346 > 1 flush all buffers (do not write buffered events)
347
348 * (reset/10)%10
349 > 0 close all sockets and destroy them (also free the
350 allocated read-buffers)
351 recreate before resuming operation
352 [ this is more than just close/open that can be
353 triggered by e.g. close/open the base-socket ]
354
355 * (reset/100)%10
356 > 0 close all open run-files
357
358 * (reset/1000)
359 sleep so many seconds before resuming operation
360 (does not (yet) take into account time left when waiting
361 for buffers getting empty ...)
362
363 * resume_reading
364
365 */
366 fMsg.Message("Signal reset to EventBuilder thread...");
367 g_reset = soft ? 101 : 102;
368 }
369
370 void Exit()
371 {
372 fMsg.Message("Signal exit to EventBuilder thread...");
373 g_runStat = kExit;
374 }
375
376 /*
377 void Wait()
378 {
379 fThread.join();
380 ffMsg.Message("EventBuilder stopped.");
381 }*/
382
383 void Hybernate() const { g_runStat = kHybernate; }
384 void Sleep() const { g_runStat = kSleep; }
385 void FlushMode() const { g_runStat = kModeFlush; }
386 void TestMode() const { g_runStat = kModeTest; }
387 void FlagMode() const { g_runStat = kModeFlag; }
388 void RunMode() const { g_runStat = kModeRun; }
389
390 // FIXME: To be removed
391 void SetMode(int mode) const { g_runStat = mode; }
392
393 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
394 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
395 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
396 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
397 int GetNumFilesOpen() const { return fFiles.size(); }
398
399 /*
400 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
401 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
402 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
403 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
404 */
405
406 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
407 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
408
409 void SetOutputFormat(FileFormat_t f)
410 {
411 fFileFormat = f;
412 if (fFileFormat==kCalib)
413 {
414 DataCalib::Restart();
415 DataCalib::Update(fDimDrsCalibration);
416 fMsg.Message("Resetted DRS calibration.");
417 }
418 }
419
420 void SetDebugLog(bool b) { fDebugLog = b; }
421
422 void SetDebugStream(bool b)
423 {
424 fDebugStream = b;
425 if (b)
426 return;
427
428 for (int i=0; i<40; i++)
429 {
430 if (!fDumpStream[i].is_open())
431 continue;
432
433 fDumpStream[i].close();
434
435 ostringstream name;
436 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
437 fMsg.Message("Closed file '"+name.str()+"'");
438 }
439 }
440
441 void SetDebugRead(bool b)
442 {
443 fDebugRead = b;
444 if (b || !fDumpRead.is_open())
445 return;
446
447 fDumpRead.close();
448 fMsg.Message("Closed file 'socket_events.txt'");
449 }
450
451// size_t GetUsedMemory() const { return gi_usedMem; }
452
453 void LoadDrsCalibration(const char *fname)
454 {
455 if (!DataCalib::ReadFits(fname, fMsg))
456 return;
457 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
458 DataCalib::Update(fDimDrsCalibration);
459 }
460
461 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
462
463
464 /*
465 struct OpenFileToDim
466 {
467 int code;
468 char fileName[FILENAME_MAX];
469 };
470
471 SignalRunOpened(runid, filename);
472 // Send num open files
473 // Send runid, (more info about the run?), filename via dim
474
475 SignalEvtWritten(runid);
476 // Send num events written of newest file
477
478 SignalRunClose(runid);
479 // Send new num open files
480 // Send empty file-name if no file is open
481
482 */
483
484 // -------------- Mapped event builder callbacks ------------------
485
486 void UpdateRuns(const string &fname="")
487 {
488 uint32_t values[5] =
489 {
490 static_cast<uint32_t>(fFiles.size()),
491 0xffffffff,
492 0,
493 fLastOpened,
494 fLastClosed
495 };
496
497 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
498 it!=fFiles.end(); it++)
499 {
500 const DataProcessorImp *file = *it;
501
502 if (file->GetRunId()<values[1])
503 values[1] = file->GetRunId();
504
505 if (file->GetRunId()>values[2])
506 values[2] = file->GetRunId();
507 }
508
509 fMaxRun = values[2];
510
511 vector<char> data(sizeof(values)+fname.size()+1);
512 memcpy(data.data(), values, sizeof(values));
513 strcpy(data.data()+sizeof(values), fname.c_str());
514
515 fDimRuns.Update(data);
516 }
517
518 vector<DataProcessorImp*> fFiles;
519
520 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
521 {
522 fMsg.Info(" ==> TODO: Update run configuration in database!");
523 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
524 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
525
526 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
527 while (it!=fExpectedRuns.end())
528 {
529 if (it->first<runid)
530 {
531 ostringstream str;
532 str << "runOpen - Missed run " << it->first << "." << endl;
533 fMsg.Info(str);
534
535 fExpectedRuns.erase(it++);
536 continue;
537 }
538 if (it->first==runid)
539 break;
540 it++;
541 }
542 if (it==fExpectedRuns.end())
543 {
544 ostringstream str;
545 str << "runOpen - Run " << runid << " wasn't expected." << endl;
546 fMsg.Warn(str);
547 }
548
549 const FAD::RunDescription desc = it->second;
550
551 fExpectedRuns.erase(it);
552
553 // Check if file already exists...
554 DataProcessorImp *file = 0;
555 switch (fFileFormat)
556 {
557 case kNone: file = new DataDump(fPath, runid, fMsg); break;
558 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
559 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
560 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
561 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
562 }
563
564 try
565 {
566 if (!file->Open(h, desc))
567 return 0;
568 }
569 catch (const exception &e)
570 {
571 fMsg.Error("Exception trying to open file: "+string(e.what()));
572 return 0;
573 }
574
575 fFiles.push_back(file);
576
577 ostringstream str;
578 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
579 fMsg.Info(str);
580
581 fDimWriteStats.FileOpened(file->GetFileName());
582
583 fLastOpened = runid;
584 UpdateRuns(file->GetFileName());
585
586 fNumEvts[kEventId] = 0;
587 fNumEvts[kTriggerId] = 0;
588
589 fNumEvts[kCurrent] = 0;
590 fDimEvents.Update(fNumEvts);
591 // fDimCurrentEvent.Update(uint32_t(0));
592
593 return reinterpret_cast<FileHandle_t>(file);
594 }
595
596 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
597 {
598 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
599
600 if (!file->WriteEvt(e))
601 return -1;
602
603 if (file->GetRunId()==fMaxRun)
604 {
605 fNumEvts[kCurrent]++;
606 fNumEvts[kEventId] = e->EventNum;
607 fNumEvts[kTriggerId] = e->TriggerNum;
608 }
609
610 fNumEvts[kTotal]++;
611
612 static Time oldt(boost::date_time::neg_infin);
613 Time newt;
614 if (newt>oldt+boost::posix_time::seconds(1))
615 {
616 fDimEvents.Update(fNumEvts);
617 oldt = newt;
618 }
619
620
621 // ===> SignalEvtWritten(runid);
622 // Send num events written of newest file
623
624 /* close run runId (all all runs if runId=0) */
625 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
626 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
627
628 return 0;
629 }
630
631 virtual void CloseRun(uint32_t runid) { }
632
633 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
634 {
635 fMsg.Info(" ==> TODO: Update run configuration in database!");
636 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
637
638 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
639
640 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
641 if (it==fFiles.end())
642 {
643 ostringstream str;
644 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
645 fMsg.Fatal(str);
646 return -1;
647 }
648
649 fFiles.erase(it);
650
651 fLastClosed = file->GetRunId();
652 CloseRun(fLastClosed);
653 UpdateRuns();
654
655 fDimEvents.Update(fNumEvts);
656
657 const bool rc = file->Close(tail);
658 if (!rc)
659 {
660 // Error message
661 }
662
663 ostringstream str;
664 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
665 fMsg.Info(str);
666
667 delete file;
668
669 // ==> SignalRunClose(runid);
670 // Send new num open files
671 // Send empty file-name if no file is open
672
673 return rc ? 0 : -1;
674 }
675
676 ofstream fDumpStream[40];
677
678 void debugStream(int isock, void *buf, int len)
679 {
680 if (!fDebugStream)
681 return;
682
683 const int slot = isock/7;
684 if (slot<0 || slot>39)
685 return;
686
687 if (!fDumpStream[slot].is_open())
688 {
689 ostringstream name;
690 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
691
692 fDumpStream[slot].open(name.str().c_str(), ios::app);
693 if (!fDumpStream[slot])
694 {
695 ostringstream str;
696 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
697 fMsg.Error(str);
698
699 return;
700 }
701
702 fMsg.Message("Opened file '"+name.str()+"' for writing.");
703 }
704
705 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
706 }
707
708 ofstream fDumpRead; // Stream to possibly dump docket events
709
710 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
711 {
712 // isock = socketID (0-279)
713 // ibyte = #bytes gelesen
714 // event = eventId (oder 0 wenn noch nicht bekannt)
715 // state : 1=finished reading data
716 // 0=reading data
717 // -1=start reading data (header)
718 // -2=start reading data,
719 // eventId not known yet (too little data)
720 // tsec, tusec = time when reading seconds, microseconds
721 //
722 if (!fDebugRead || ibyte==0)
723 return;
724
725 if (!fDumpRead.is_open())
726 {
727 fDumpRead.open("socket_events.txt", ios::app);
728 if (!fDumpRead)
729 {
730 ostringstream str;
731 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
732 fMsg.Error(str);
733
734 return;
735 }
736
737 fMsg.Message("Opened file 'socket_events.txt' for writing.");
738
739 fDumpRead << "# START: " << Time().GetAsStr() << endl;
740 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
741 }
742
743 fDumpRead
744 << setw(2) << state << " "
745 << setw(8) << tsec << " "
746 << setw(9) << tusec << " "
747 << setw(3) << isock << " "
748 << setw(2) << isock/7 << " "
749 << runno << " "
750 << event << " "
751 << ftmevt << " "
752 << ibyte << endl;
753 }
754
755 array<uint16_t,2> fVecRoi;
756
757 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
758 {
759 /*
760 fadhd[i] ist ein array mit den 40 fad-headers
761 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
762
763 event ist die Struktur, die auch die write routine erhaelt;
764 darin sind im header die 'soll-werte' fuer z.B. eventID
765 als auch die ADC-Werte (falls Du die brauchst)
766
767 Wenn die routine einen negativen Wert liefert, wird das event
768 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
769 */
770
771 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
772
773 if (roi!=fVecRoi)
774 {
775 Update(fDimRoi, roi);
776 fVecRoi = roi;
777 }
778
779 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
780 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
781
782 // FIMXE: Compare with target configuration
783
784 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
785 {
786 // FIXME: Compare with expectations!!!
787 if (ptr->fStartDelimiter==0)
788 {
789 if (ptr==beg)
790 beg++;
791 continue;
792 }
793
794 if (beg->fStatus != ptr->fStatus)
795 {
796 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
797 CloseRunFile(runNr, 0, 0);
798 return -1;
799 }
800
801 if (beg->fRunNumber != ptr->fRunNumber)
802 {
803 fMsg.Error("Inconsistent run number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 return -1;
806 }
807
808 /*
809 if (beg->fVersion != ptr->fVersion)
810 {
811 Error("Inconsist firmware version detected.... closing run.");
812 CloseRunFile(runNr, 0, 0);
813 break;
814 }
815 if (beg->fEventCounter != ptr->fEventCounter)
816 {
817 Error("Inconsist run number detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 break;
820 }
821 if (beg->fTriggerCounter != ptr->fTriggerCounter)
822 {
823 Error("Inconsist trigger number detected.... closing run.");
824 CloseRunFile(runNr, 0, 0);
825 break;
826 }*/
827
828 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
829 {
830 fMsg.Error("Inconsistent phase shift detected.... closing run.");
831 CloseRunFile(runNr, 0, 0);
832 return -1;
833 }
834
835 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
836 {
837 fMsg.Error("Inconsistent DAC values detected.... closing run.");
838 CloseRunFile(runNr, 0, 0);
839 return -1;
840 }
841
842 if (beg->fTriggerType != ptr->fTriggerType)
843 {
844 fMsg.Error("Inconsistent trigger type detected.... closing run.");
845 CloseRunFile(runNr, 0, 0);
846 return -1;
847 }
848 }
849
850 // check REFCLK_frequency
851 // check consistency with command configuration
852 // how to log errors?
853 // need gotNewRun/closedRun to know it is finished
854
855 return 0;
856 }
857
858 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
859 {
860 // Currently we send any event no matter what its trigger id is...
861 // To be changed.
862 static Time oldt(boost::date_time::neg_infin);
863 Time newt;
864
865 // FIXME: Only send events if the have newer run-numbers
866 if (newt<oldt+boost::posix_time::seconds(1))
867 return;
868
869 oldt = newt;
870
871 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
872 memcpy(data.data(), event, sizeof(EVENT));
873
874 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
875
876 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
877 fDimRawData.Update(data);
878
879 DrsCalibrate::RemoveSpikes(vec, event->Roi);
880
881 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
882 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
883
884 fDimEventData.Update(data2);
885 }
886
887 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
888 {
889 if (!DataCalib::IsValid())
890 return;
891
892 // Workaround to find a valid header.....
893 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
894 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
895
896 // FIMXE: Compare with target configuration
897
898 const FAD::EventHeader *ptr=beg;
899 for (; ptr!=end; ptr++)
900 {
901 if (ptr->fStartDelimiter==0)
902 continue;
903
904 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
905 return;
906 }
907
908 if (ptr->fStartDelimiter==0)
909 return;
910
911 vector<float> data(event->Roi*1440);
912 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
913
914 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
915
916 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
917 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
918
919 fDimFeedbackData.Update(data2);
920 }
921
922 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
923 {
924 switch (threadID)
925 {
926 case 0:
927 SendRawData(fadhd, event);
928 return 1;
929 case 1:
930 SendFeedbackData(fadhd, event);
931 return 2;
932 }
933 return 100;
934 }
935
936
937 bool IsRunStarted() const
938 {
939 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
940 return it==fIsRunStarted.end();// ? true : it->second.started;
941 }
942
943 uint32_t GetRunNumber() const
944 {
945 return fRunNumber;
946 }
947
948 void IncreaseRunNumber(uint32_t run)
949 {
950 if (run>fRunNumber)
951 fRunNumber = run;
952 }
953
954 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
955 {
956 // This function is called even when writing is switched off
957 set<uint32_t>::iterator it = fIsRunStarted.begin();
958 while (it!=fIsRunStarted.end())
959 {
960 if (*it<runnr)
961 {
962 ostringstream str;
963 str << "gotNewRun - Missed run " << *it << "." << endl;
964 fMsg.Info(str);
965
966 fIsRunStarted.erase(it++);
967 continue;
968 }
969 if (*it==runnr)
970 break;
971 it++;
972 }
973 if (it==fIsRunStarted.end())
974 {
975 ostringstream str;
976 str << "gotNewRun - Not waiting for run " << runnr << "." << endl;
977 fMsg.Warn(str);
978 return;
979 }
980
981 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
982 if (i2==fExpectedRuns.end())
983 {
984 ostringstream str;
985 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
986 fMsg.Warn(str);
987 return;
988 }
989
990 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
991 // return: 0=close scheduled / >0 already closed / <0 does not exist
992
993 // FIXME: Move configuration from expected runs to runs which will soon
994 // be opened/closed
995
996 fIsRunStarted.erase(it);
997 }
998
999 map<boost::thread::id, string> fLastMessage;
1000
1001 void factOut(int severity, int err, const char *message)
1002 {
1003 if (!fDebugLog && severity==99)
1004 return;
1005
1006 ostringstream str;
1007 //str << boost::this_thread::get_id() << " ";
1008 str << "EventBuilder(";
1009 if (err<0)
1010 str << "---";
1011 else
1012 str << err;
1013 str << "): " << message;
1014
1015 string &old = fLastMessage[boost::this_thread::get_id()];
1016
1017 if (str.str()==old)
1018 return;
1019 old = str.str();
1020
1021 fMsg.Update(str, severity);
1022 }
1023/*
1024 void factStat(int64_t *stat, int len)
1025 {
1026 if (len!=7)
1027 {
1028 fMsg.Warn("factStat received unknown number of values.");
1029 return;
1030 }
1031
1032 vector<int64_t> data(1, g_maxMem);
1033 data.insert(data.end(), stat, stat+len);
1034
1035 static vector<int64_t> last(8);
1036 if (data==last)
1037 return;
1038 last = data;
1039
1040 fDimStatistics.Update(data);
1041
1042 // len ist die Laenge des arrays.
1043 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1044 // kannst Du pruefen, ob die 100MB voll sind ....
1045
1046 ostringstream str;
1047 str
1048 << "Wait=" << stat[0] << " "
1049 << "Skip=" << stat[1] << " "
1050 << "Del=" << stat[2] << " "
1051 << "Tot=" << stat[3] << " "
1052 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1053 << "Read=" << stat[5] << " "
1054 << "Conn=" << stat[6];
1055
1056 fMsg.Info(str);
1057 }
1058 */
1059
1060 void factStat(const EVT_STAT &stat)
1061 {
1062 fDimStatistics2.Update(stat);
1063 }
1064
1065 void factStat(const GUI_STAT &stat)
1066 {
1067 fDimStatistics1.Update(stat);
1068 }
1069
1070
1071 array<FAD::EventHeader, 40> fVecHeader;
1072
1073 template<typename T, class S>
1074 array<T, 42> Compare(const S *vec, const T *t)
1075 {
1076 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1077
1078 const T *min = NULL;
1079 const T *val = NULL;
1080 const T *max = NULL;
1081
1082 array<T, 42> arr;
1083
1084 bool rc = true;
1085 for (int i=0; i<40; i++)
1086 {
1087 const char *base = reinterpret_cast<const char*>(vec+i);
1088 const T *ref = reinterpret_cast<const T*>(base+offset);
1089
1090 arr[i] = *ref;
1091
1092 if (gi_NumConnect[i]!=7)
1093 {
1094 arr[i] = 0;
1095 continue;
1096 }
1097
1098 if (!val)
1099 {
1100 min = ref;
1101 val = ref;
1102 max = ref;
1103 }
1104
1105 if (*ref<*min)
1106 min = ref;
1107
1108 if (*ref>*max)
1109 max = ref;
1110
1111 if (*val!=*ref)
1112 rc = false;
1113 }
1114
1115 arr[40] = val ? *min : 1;
1116 arr[41] = val ? *max : 0;
1117
1118 return arr;
1119 }
1120
1121 template<typename T>
1122 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1123 {
1124 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1125
1126 T val = 0;
1127 T rc = 0;
1128
1129 array<T, 42> vec;
1130
1131 bool first = true;
1132
1133 for (int i=0; i<40; i++)
1134 {
1135 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1136 const T *ref = reinterpret_cast<const T*>(base+offset);
1137
1138 vec[i+2] = *ref;
1139
1140 if (gi_NumConnect[i]!=7)
1141 {
1142 vec[i+2] = 0;
1143 continue;
1144 }
1145
1146 if (first)
1147 {
1148 first = false;
1149 val = *ref;
1150 rc = 0;
1151 }
1152
1153 rc |= val^*ref;
1154 }
1155
1156 vec[0] = rc;
1157 vec[1] = val;
1158
1159 return vec;
1160 }
1161
1162 template<typename T, size_t N>
1163 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1164 {
1165// svc.setQuality(vec[40]<=vec[41]);
1166 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1167 svc.Update();
1168 }
1169
1170 template<typename T>
1171 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1172 {
1173 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1174 for (int i=0; i<40;i++)
1175 cout << " " << data.second[i+3];
1176 cout << endl;
1177 }
1178
1179 vector<uint> fNumConnected;
1180
1181 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1182 {
1183 const uint16_t id = h.Id();
1184 if (id>39)
1185 return;
1186
1187 if (fNumConnected.size()!=40)
1188 fNumConnected.resize(40);
1189
1190 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1191
1192 const bool changed = con!=fNumConnected || !IsThreadRunning();
1193
1194 fNumConnected = con;
1195
1196 const FAD::EventHeader old = fVecHeader[id];
1197 fVecHeader[id] = h;
1198
1199 if (old.fVersion != h.fVersion || changed)
1200 {
1201 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1202
1203 array<float,42> data;
1204 for (int i=0; i<42; i++)
1205 {
1206 ostringstream str;
1207 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1208 data[i] = stof(str.str());
1209 }
1210 Update(fDimFwVersion, data);
1211 }
1212
1213 if (old.fRunNumber != h.fRunNumber || changed)
1214 {
1215 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1216 fDimRunNumber.Update(run);
1217 }
1218
1219 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1220 {
1221 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1222 fDimPrescaler.Update(pre);
1223 }
1224
1225 if (old.fDNA != h.fDNA || changed)
1226 {
1227 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1228 Update(fDimDNA, dna, 40);
1229 }
1230
1231 if (old.fStatus != h.fStatus || changed)
1232 {
1233 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1234 Update(fDimStatus, sts);
1235 }
1236
1237 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1238 {
1239 array<uint16_t, FAD::kNumDac*42> dacs;
1240
1241 for (int i=0; i<FAD::kNumDac; i++)
1242 {
1243 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1244 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1245 }
1246
1247 Update(fDimDac, dacs);
1248 }
1249
1250 // -----------
1251
1252 static Time oldt(boost::date_time::neg_infin);
1253 Time newt;
1254
1255 if (newt>oldt+boost::posix_time::seconds(1))
1256 {
1257 oldt = newt;
1258
1259 // --- RefClock
1260
1261 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1262 Update(fDimRefClock, clk);
1263
1264 // --- Temperatures
1265
1266 const array<int16_t,42> tmp[4] =
1267 {
1268 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1269 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1270 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1271 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1272 };
1273
1274 vector<int16_t> data;
1275 data.reserve(82);
1276 data.push_back(tmp[0][40]); // min: 0
1277 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1278 data.push_back(tmp[0][41]); // max: 41
1279 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1280
1281 for (int j=1; j<=3; j++)
1282 {
1283 const array<int16_t,42> &ref = tmp[j];
1284
1285 // Gloabl min
1286 if (ref[40]<data[0]) // 40=min
1287 data[0] = ref[40];
1288
1289 // Global max
1290 if (ref[41]>data[41]) // 41=max
1291 data[41] = ref[41];
1292
1293 for (int i=0; i<40; i++)
1294 {
1295 // min per board
1296 if (ref[i]<data[i+1]) // data: 1-40
1297 data[i+1] = ref[i]; // ref: 0-39
1298
1299 // max per board
1300 if (ref[i]>data[i+42]) // data: 42-81
1301 data[i+42] = ref[i]; // ref: 0-39
1302 }
1303
1304
1305 }
1306
1307 vector<float> deg(82); // 0: global min, 1-40: min
1308 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1309 deg[i] = data[i]/16.;
1310 fDimTemperature.Update(deg);
1311 }
1312 }
1313};
1314
1315EventBuilderWrapper *EventBuilderWrapper::This = 0;
1316
1317// ----------- Event builder callbacks implementation ---------------
1318extern "C"
1319{
1320 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1321 {
1322 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1323 }
1324
1325 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1326 {
1327 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1328 }
1329
1330 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1331 {
1332 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1333 }
1334
1335 // -----
1336
1337 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1338 {
1339 return NULL;
1340 }
1341
1342 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1343 {
1344 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1345 }
1346
1347 int runEnd(uint32_t, void *runPtr)
1348 {
1349 return 0;
1350 }
1351
1352 // -----
1353
1354 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1355 {
1356 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1357 }
1358
1359 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1360 {
1361 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1362 }
1363
1364 // -----
1365
1366 void factOut(int severity, int err, const char *message)
1367 {
1368 EventBuilderWrapper::This->factOut(severity, err, message);
1369 }
1370
1371 void factStat(GUI_STAT stat)
1372 {
1373 EventBuilderWrapper::This->factStat(stat);
1374 }
1375
1376 void factStatNew(EVT_STAT stat)
1377 {
1378 EventBuilderWrapper::This->factStat(stat);
1379 }
1380
1381 // ------
1382
1383 void debugHead(int socket, int/*board*/, void *buf)
1384 {
1385 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1386 EventBuilderWrapper::This->debugHead(socket, h);
1387 }
1388
1389 void debugStream(int isock, void *buf, int len)
1390 {
1391 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1392 }
1393
1394 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1395 {
1396 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1397 }
1398}
1399
1400#endif
Note: See TracBrowser for help on using the repository browser.