source: trunk/FACT++/src/EventBuilderWrapper.h@ 12491

Last change on this file since 12491 was 12489, checked in by tbretz, 13 years ago
Fixed some mistakes in the new algorithms in gotNewRun and runOpen
File size: 40.1 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Commands for the event builder thread. The selected value is written
// to the global g_runStat by the methods of this class.
enum CommandStates_t
{
    kAbort      = -2,  // 'abort':      quit as soon as possible
    kExit       = -1,  // 'exit':       stop reading, quit when buffered events are done
    kInitialize =  0,  // 'initialize': e.g. dim not yet started
    kHybernate  =  1,  // 'hybernate':  do nothing for a long time [wakeup within ~1sec]
    kSleep      =  2,  // 'sleep':      do nothing [wakeup within ~10msec]
    kModeFlush  = 10,  // 'flush':      read data from camera, but skip them
    kModeTest   = 20,  // 'test':       read and process data, but do not write to disk
    kModeFlag   = 30,  // 'flag':       read, process and write all data to disk
    kModeRun    = 40   // 'run':        read, process and write selected data to disk
};
68
// Indices into the fNumEvts[4] counter array
enum
{
    kCurrent   = 0,  // events written to the run with the highest run-id
    kTotal     = 1,  // events written in total
    kEventId   = 2,  // event number of the last event counted in kCurrent
    kTriggerId = 3   // trigger number of the last event counted in kCurrent
};
76
// Output format; selects the DataProcessorImp implementation which is
// instantiated in runOpen
enum FileFormat_t
{
    kNone  = 0,  // DataDump
    kDebug = 1,  // DataDebug
    kFits  = 2,  // DataWriteFits
    kRaw   = 3,  // DataWriteRaw
    kCalib = 4   // DataCalib
};
85
86 FileFormat_t fFileFormat;
87
88
89 uint32_t fMaxRun;
90 uint32_t fLastOpened;
91 uint32_t fLastClosed;
92 uint32_t fNumEvts[4];
93
94 DimWriteStatistics fDimWriteStats;
95 DimDescribedService fDimRuns;
96 DimDescribedService fDimEvents;
97 DimDescribedService fDimRawData;
98 DimDescribedService fDimEventData;
99 DimDescribedService fDimFeedbackData;
100 DimDescribedService fDimFwVersion;
101 DimDescribedService fDimRunNumber;
102 DimDescribedService fDimStatus;
103 DimDescribedService fDimDNA;
104 DimDescribedService fDimTemperature;
105 DimDescribedService fDimPrescaler;
106 DimDescribedService fDimRefClock;
107 DimDescribedService fDimRoi;
108 DimDescribedService fDimDac;
109 DimDescribedService fDimDrsCalibration;
110 DimDescribedService fDimStatistics1;
111 DimDescribedService fDimStatistics2;
112
113 bool fDebugStream;
114 bool fDebugRead;
115 bool fDebugLog;
116
117 string fPath;
118 uint32_t fRunNumber;
119
120protected:
121 int64_t InitRunNumber()
122 {
123 fRunNumber = 1000;
124
125 // Ensure that the night doesn't change during our check
126 const int check = Time().NightAsInt();
127
128 while (--fRunNumber>0)
129 {
130 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
131
132 if (access((name+"bin").c_str(), F_OK) == 0)
133 break;
134 if (access((name+"fits").c_str(), F_OK) == 0)
135 break;
136 if (access((name+"drs.fits").c_str(), F_OK) == 0)
137 break;
138
139 }
140
141 if (check != Time().NightAsInt())
142 return InitRunNumber();
143
144 fRunNumber++;
145
146 if (fRunNumber==1000)
147 {
148 fMsg.Error("You have a file with run-number 1000 in "+fPath);
149 return -1;
150 }
151
152 ostringstream str;
153 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
154 fMsg.Message(str);
155
156 fMsg.Info(" ==> TODO: Crosscheck with database!");
157
158 return check;
159 }
160
161 int64_t InitRunNumber(const string &path)
162 {
163 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
164 {
165 fMsg.Error("Data path "+path+" does not exist!");
166 return -1;
167 }
168
169 //const fs::path fullPath = fs::system_complete(fs::path(path));
170
171 fPath = path;
172
173 fDimWriteStats.SetCurrentFolder(fPath);
174
175 return InitRunNumber();
176 }
177
178public:
179 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
180 fFileFormat(kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
181 fDimWriteStats ("FAD_CONTROL", imp),
182 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
183 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
184 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
185 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
186 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
187 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
188 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
189 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
190 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
191 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
192 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
193 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
194 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
195 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
196 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
197 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
198 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
199 fDebugStream(false), fDebugRead(false), fDebugLog(false)
200 {
201 if (This)
202 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
203
204 This = this;
205
206 memset(fNumEvts, 0, sizeof(fNumEvts));
207 fDimEvents.Update(fNumEvts);
208
209 for (size_t i=0; i<40; i++)
210 ConnectSlot(i, tcp::endpoint());
211 }
212 virtual ~EventBuilderWrapper()
213 {
214 Abort();
215
216 // FIXME: Used timed_join and abort afterwards
217 // What's the maximum time the eb need to abort?
218 fThread.join();
219 //ffMsg.Info("EventBuilder stopped.");
220
221 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
222 delete *it;
223 }
224
225 set<uint32_t> fIsRunStarted;
226 map<uint32_t, FAD::RunDescription> fExpectedRuns;
227
228 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
229 {
230 if (maxtime<=0 || maxtime>24*60*60)
231 maxtime = 24*60*60;
232 if (maxevt<=0 || maxevt>INT32_MAX)
233 maxevt = INT32_MAX;
234
235 const FAD::RunDescription descr =
236 {
237 uint32_t(maxtime),
238 uint32_t(maxevt),
239 ref.first,
240 ref.second,
241 };
242
243 // FIMXE: Maybe reset an event counter so that the mcp can count events?
244
245 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
246
247 fExpectedRuns[fRunNumber] = descr;
248 fIsRunStarted.insert(fRunNumber);
249 return fRunNumber++;
250 }
251
252 bool IsThreadRunning()
253 {
254 return !fThread.timed_join(boost::posix_time::microseconds(0));
255 }
256
257 void SetMaxMemory(unsigned int mb) const
258 {
259 /*
260 if (mb*1000000<GetUsedMemory())
261 {
262 // ffMsg.Warn("...");
263 return;
264 }*/
265
266 g_maxMem = size_t(mb)*1000000;
267 }
268
269 void StartThread(const vector<tcp::endpoint> &addr)
270 {
271 if (IsThreadRunning())
272 {
273 fMsg.Warn("Start - EventBuilder still running");
274 return;
275 }
276
277 fLastMessage.clear();
278
279 for (size_t i=0; i<40; i++)
280 ConnectSlot(i, addr[i]);
281
282 g_runStat = kModeRun;
283 g_maxProc = 3;
284
285 fMsg.Message("Starting EventBuilder thread");
286
287 fThread = boost::thread(StartEvtBuild);
288 }
289 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
290 {
291 if (i>39)
292 return;
293
294 if (addr==tcp::endpoint())
295 {
296 DisconnectSlot(i);
297 return;
298 }
299
300 g_port[i].sockAddr.sin_family = AF_INET;
301 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
302 g_port[i].sockAddr.sin_port = htons(addr.port());
303 // In this order
304 g_port[i].sockDef = 1;
305 }
306 void DisconnectSlot(unsigned int i)
307 {
308 if (i>39)
309 return;
310
311 g_port[i].sockDef = 0;
312 // In this order
313 g_port[i].sockAddr.sin_family = AF_INET;
314 g_port[i].sockAddr.sin_addr.s_addr = 0;
315 g_port[i].sockAddr.sin_port = 0;
316 }
317 void IgnoreSlot(unsigned int i)
318 {
319 if (i>39)
320 return;
321 if (g_port[i].sockAddr.sin_port==0)
322 return;
323
324 g_port[i].sockDef = -1;
325 }
326
327
328 void Abort()
329 {
330 fMsg.Message("Signal abort to EventBuilder thread...");
331 g_runStat = kAbort;
332 }
333
334 void ResetThread(bool soft)
335 {
336 /*
337 if (g_reset > 0)
338
339 * suspend reading
340 * reset = g_reset;
341 * g_reset=0
342
343 * reset% 10
344 == 0 leave event Buffers as they are
345 == 1 let all buffers drain (write (incomplete) events)
346 > 1 flush all buffers (do not write buffered events)
347
348 * (reset/10)%10
349 > 0 close all sockets and destroy them (also free the
350 allocated read-buffers)
351 recreate before resuming operation
352 [ this is more than just close/open that can be
353 triggered by e.g. close/open the base-socket ]
354
355 * (reset/100)%10
356 > 0 close all open run-files
357
358 * (reset/1000)
359 sleep so many seconds before resuming operation
360 (does not (yet) take into account time left when waiting
361 for buffers getting empty ...)
362
363 * resume_reading
364
365 */
366 fMsg.Message("Signal reset to EventBuilder thread...");
367 g_reset = soft ? 101 : 102;
368 }
369
370 void Exit()
371 {
372 fMsg.Message("Signal exit to EventBuilder thread...");
373 g_runStat = kExit;
374 }
375
376 /*
377 void Wait()
378 {
379 fThread.join();
380 ffMsg.Message("EventBuilder stopped.");
381 }*/
382
383 void Hybernate() const { g_runStat = kHybernate; }
384 void Sleep() const { g_runStat = kSleep; }
385 void FlushMode() const { g_runStat = kModeFlush; }
386 void TestMode() const { g_runStat = kModeTest; }
387 void FlagMode() const { g_runStat = kModeFlag; }
388 void RunMode() const { g_runStat = kModeRun; }
389
390 // FIXME: To be removed
391 void SetMode(int mode) const { g_runStat = mode; }
392
393 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
394 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
395 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
396 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
397 int GetNumFilesOpen() const { return fFiles.size(); }
398
399 /*
400 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
401 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
402 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
403 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
404 */
405
406 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
407 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
408
409 void SetOutputFormat(FileFormat_t f)
410 {
411 fFileFormat = f;
412 if (fFileFormat==kCalib)
413 {
414 DataCalib::Restart();
415 DataCalib::Update(fDimDrsCalibration);
416 fMsg.Message("Resetted DRS calibration.");
417 }
418 }
419
420 void SetDebugLog(bool b) { fDebugLog = b; }
421
422 void SetDebugStream(bool b)
423 {
424 fDebugStream = b;
425 if (b)
426 return;
427
428 for (int i=0; i<40; i++)
429 {
430 if (!fDumpStream[i].is_open())
431 continue;
432
433 fDumpStream[i].close();
434
435 ostringstream name;
436 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
437 fMsg.Message("Closed file '"+name.str()+"'");
438 }
439 }
440
441 void SetDebugRead(bool b)
442 {
443 fDebugRead = b;
444 if (b || !fDumpRead.is_open())
445 return;
446
447 fDumpRead.close();
448 fMsg.Message("Closed file 'socket_events.txt'");
449 }
450
451// size_t GetUsedMemory() const { return gi_usedMem; }
452
453 void LoadDrsCalibration(const char *fname)
454 {
455 if (!DataCalib::ReadFits(fname, fMsg))
456 return;
457 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
458 DataCalib::Update(fDimDrsCalibration);
459 }
460
461 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
462
463
464 /*
465 struct OpenFileToDim
466 {
467 int code;
468 char fileName[FILENAME_MAX];
469 };
470
471 SignalRunOpened(runid, filename);
472 // Send num open files
473 // Send runid, (more info about the run?), filename via dim
474
475 SignalEvtWritten(runid);
476 // Send num events written of newest file
477
478 SignalRunClose(runid);
479 // Send new num open files
480 // Send empty file-name if no file is open
481
482 */
483
484 // -------------- Mapped event builder callbacks ------------------
485
486 void UpdateRuns(const string &fname="")
487 {
488 uint32_t values[5] =
489 {
490 static_cast<uint32_t>(fFiles.size()),
491 0xffffffff,
492 0,
493 fLastOpened,
494 fLastClosed
495 };
496
497 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
498 it!=fFiles.end(); it++)
499 {
500 const DataProcessorImp *file = *it;
501
502 if (file->GetRunId()<values[1])
503 values[1] = file->GetRunId();
504
505 if (file->GetRunId()>values[2])
506 values[2] = file->GetRunId();
507 }
508
509 fMaxRun = values[2];
510
511 vector<char> data(sizeof(values)+fname.size()+1);
512 memcpy(data.data(), values, sizeof(values));
513 strcpy(data.data()+sizeof(values), fname.c_str());
514
515 fDimRuns.Update(data);
516 }
517
518 vector<DataProcessorImp*> fFiles;
519
520 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
521 {
522 fMsg.Info(" ==> TODO: Update run configuration in database!");
523 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
524 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
525
526 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
527 while (it!=fExpectedRuns.end())
528 {
529 if (it->first<runid)
530 {
531 ostringstream str;
532 str << "runOpen - Missed run " << it->first << "." << endl;
533 fMsg.Info(str);
534
535 fExpectedRuns.erase(it++);
536 continue;
537 }
538 if (it->first==runid)
539 break;
540 it++;
541 }
542 if (it==fExpectedRuns.end())
543 {
544 ostringstream str;
545 str << "runOpen - Run " << runid << " wasn't expected." << endl;
546 fMsg.Warn(str);
547 }
548
549 const FAD::RunDescription desc = it->second;
550
551 fExpectedRuns.erase(it);
552
553 // Check if file already exists...
554 DataProcessorImp *file = 0;
555 switch (fFileFormat)
556 {
557 case kNone: file = new DataDump(fPath, runid, fMsg); break;
558 case kDebug: file = new DataDebug(fPath, runid, fMsg); break;
559 case kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
560 case kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
561 case kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
562 }
563
564 try
565 {
566 if (!file->Open(h, desc))
567 return 0;
568 }
569 catch (const exception &e)
570 {
571 fMsg.Error("Exception trying to open file: "+string(e.what()));
572 return 0;
573 }
574
575 fFiles.push_back(file);
576
577 ostringstream str;
578 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
579 fMsg.Info(str);
580
581 fDimWriteStats.FileOpened(file->GetFileName());
582
583 fLastOpened = runid;
584 UpdateRuns(file->GetFileName());
585
586 fNumEvts[kEventId] = 0;
587 fNumEvts[kTriggerId] = 0;
588
589 fNumEvts[kCurrent] = 0;
590 fDimEvents.Update(fNumEvts);
591 // fDimCurrentEvent.Update(uint32_t(0));
592
593 return reinterpret_cast<FileHandle_t>(file);
594 }
595
596 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
597 {
598 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
599
600 if (!file->WriteEvt(e))
601 return -1;
602
603 if (file->GetRunId()==fMaxRun)
604 {
605 fNumEvts[kCurrent]++;
606 fNumEvts[kEventId] = e->EventNum;
607 fNumEvts[kTriggerId] = e->TriggerNum;
608 }
609
610 fNumEvts[kTotal]++;
611
612 static Time oldt(boost::date_time::neg_infin);
613 Time newt;
614 if (newt>oldt+boost::posix_time::seconds(1))
615 {
616 fDimEvents.Update(fNumEvts);
617 oldt = newt;
618 }
619
620
621 // ===> SignalEvtWritten(runid);
622 // Send num events written of newest file
623
624 /* close run runId (all all runs if runId=0) */
625 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
626 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
627
628 return 0;
629 }
630
631 virtual void CloseRun(uint32_t runid) { }
632
633 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
634 {
635 fMsg.Info(" ==> TODO: Update run configuration in database!");
636 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
637
638 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
639
640 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
641 if (it==fFiles.end())
642 {
643 ostringstream str;
644 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
645 fMsg.Fatal(str);
646 return -1;
647 }
648
649 fFiles.erase(it);
650
651 fLastClosed = file->GetRunId();
652 CloseRun(fLastClosed);
653 UpdateRuns();
654
655 fDimEvents.Update(fNumEvts);
656
657 const bool rc = file->Close(tail);
658 if (!rc)
659 {
660 // Error message
661 }
662
663 ostringstream str;
664 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
665 fMsg.Info(str);
666
667 delete file;
668
669 // ==> SignalRunClose(runid);
670 // Send new num open files
671 // Send empty file-name if no file is open
672
673 return rc ? 0 : -1;
674 }
675
676 ofstream fDumpStream[40];
677
678 void debugStream(int isock, void *buf, int len)
679 {
680 if (!fDebugStream)
681 return;
682
683 const int slot = isock/7;
684 if (slot<0 || slot>39)
685 return;
686
687 if (!fDumpStream[slot].is_open())
688 {
689 ostringstream name;
690 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
691
692 fDumpStream[slot].open(name.str().c_str(), ios::app);
693 if (!fDumpStream[slot])
694 {
695 ostringstream str;
696 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
697 fMsg.Error(str);
698
699 return;
700 }
701
702 fMsg.Message("Opened file '"+name.str()+"' for writing.");
703 }
704
705 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
706 }
707
708 ofstream fDumpRead; // Stream to possibly dump docket events
709
710 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
711 {
712 // isock = socketID (0-279)
713 // ibyte = #bytes gelesen
714 // event = eventId (oder 0 wenn noch nicht bekannt)
715 // state : 1=finished reading data
716 // 0=reading data
717 // -1=start reading data (header)
718 // -2=start reading data,
719 // eventId not known yet (too little data)
720 // tsec, tusec = time when reading seconds, microseconds
721 //
722 if (!fDebugRead || ibyte==0)
723 return;
724
725 if (!fDumpRead.is_open())
726 {
727 fDumpRead.open("socket_events.txt", ios::app);
728 if (!fDumpRead)
729 {
730 ostringstream str;
731 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
732 fMsg.Error(str);
733
734 return;
735 }
736
737 fMsg.Message("Opened file 'socket_events.txt' for writing.");
738
739 fDumpRead << "# START: " << Time().GetAsStr() << endl;
740 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
741 }
742
743 fDumpRead
744 << setw(2) << state << " "
745 << setw(8) << tsec << " "
746 << setw(9) << tusec << " "
747 << setw(3) << isock << " "
748 << setw(2) << isock/7 << " "
749 << runno << " "
750 << event << " "
751 << ftmevt << " "
752 << ibyte << endl;
753 }
754
755 array<uint16_t,2> fVecRoi;
756
757 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
758 {
759 /*
760 fadhd[i] ist ein array mit den 40 fad-headers
761 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
762
763 event ist die Struktur, die auch die write routine erhaelt;
764 darin sind im header die 'soll-werte' fuer z.B. eventID
765 als auch die ADC-Werte (falls Du die brauchst)
766
767 Wenn die routine einen negativen Wert liefert, wird das event
768 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
769 */
770
771 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
772
773 if (roi!=fVecRoi)
774 {
775 Update(fDimRoi, roi);
776 fVecRoi = roi;
777 }
778
779 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
780 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
781
782 // FIMXE: Compare with target configuration
783
784 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
785 {
786 // FIXME: Compare with expectations!!!
787 if (ptr->fStartDelimiter==0)
788 {
789 if (ptr==beg)
790 beg++;
791 continue;
792 }
793
794 if (beg->fStatus != ptr->fStatus)
795 {
796 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
797 CloseRunFile(runNr, 0, 0);
798 return -1;
799 }
800
801 if (beg->fRunNumber != ptr->fRunNumber)
802 {
803 fMsg.Error("Inconsistent run number detected.... closing run.");
804 CloseRunFile(runNr, 0, 0);
805 return -1;
806 }
807
808 /*
809 if (beg->fVersion != ptr->fVersion)
810 {
811 Error("Inconsist firmware version detected.... closing run.");
812 CloseRunFile(runNr, 0, 0);
813 break;
814 }
815 if (beg->fEventCounter != ptr->fEventCounter)
816 {
817 Error("Inconsist run number detected.... closing run.");
818 CloseRunFile(runNr, 0, 0);
819 break;
820 }
821 if (beg->fTriggerCounter != ptr->fTriggerCounter)
822 {
823 Error("Inconsist trigger number detected.... closing run.");
824 CloseRunFile(runNr, 0, 0);
825 break;
826 }*/
827
828 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
829 {
830 fMsg.Error("Inconsistent phase shift detected.... closing run.");
831 CloseRunFile(runNr, 0, 0);
832 return -1;
833 }
834
835 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
836 {
837 fMsg.Error("Inconsistent DAC values detected.... closing run.");
838 CloseRunFile(runNr, 0, 0);
839 return -1;
840 }
841
842 if (beg->fTriggerType != ptr->fTriggerType)
843 {
844 fMsg.Error("Inconsistent trigger type detected.... closing run.");
845 CloseRunFile(runNr, 0, 0);
846 return -1;
847 }
848 }
849
850 // check REFCLK_frequency
851 // check consistency with command configuration
852 // how to log errors?
853 // need gotNewRun/closedRun to know it is finished
854
855 return 0;
856 }
857
858 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
859 {
860 // Currently we send any event no matter what its trigger id is...
861 // To be changed.
862 static Time oldt(boost::date_time::neg_infin);
863 Time newt;
864
865 // FIXME: Only send events if the have newer run-numbers
866 if (newt<oldt+boost::posix_time::seconds(1))
867 return;
868
869 oldt = newt;
870
871 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
872 memcpy(data.data(), event, sizeof(EVENT));
873
874 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
875
876 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
877 fDimRawData.Update(data);
878
879 DrsCalibrate::RemoveSpikes(vec, event->Roi);
880
881 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
882 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
883
884 fDimEventData.Update(data2);
885 }
886
887 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
888 {
889 if (!DataCalib::IsValid())
890 return;
891
892 // Workaround to find a valid header.....
893 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
894 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
895
896 // FIMXE: Compare with target configuration
897
898 const FAD::EventHeader *ptr=beg;
899 for (; ptr!=end; ptr++)
900 {
901 if (ptr->fStartDelimiter==0)
902 continue;
903
904 // ptr->fTriggerType = htons(ptr->fTriggerType)
905
906 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
907 return;
908 }
909
910 if (ptr->fStartDelimiter==0)
911 return;
912
913 vector<float> data(event->Roi*1440);
914 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
915
916 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
917
918 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
919 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
920
921 fDimFeedbackData.Update(data2);
922 }
923
924 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
925 {
926 switch (threadID)
927 {
928 case 0:
929 SendRawData(fadhd, event);
930 return 1;
931 case 1:
932 SendFeedbackData(fadhd, event);
933 return 2;
934 }
935 return 100;
936 }
937
938
939 bool IsRunStarted() const
940 {
941 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
942 return it==fIsRunStarted.end();// ? true : it->second.started;
943 }
944
945 uint32_t GetRunNumber() const
946 {
947 return fRunNumber;
948 }
949
950 void IncreaseRunNumber(uint32_t run)
951 {
952 if (run>fRunNumber)
953 fRunNumber = run;
954 }
955
956 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
957 {
958 // This function is called even when writing is switched off
959 set<uint32_t>::iterator it = fIsRunStarted.begin();
960 while (it!=fIsRunStarted.end())
961 {
962 if (*it<runnr)
963 {
964 ostringstream str;
965 str << "gotNewRun - Missed run " << *it << "." << endl;
966 fMsg.Info(str);
967
968 fIsRunStarted.erase(it++);
969 continue;
970 }
971 if (*it==runnr)
972 break;
973 it++;
974 }
975 if (it==fIsRunStarted.end())
976 {
977 ostringstream str;
978 str << "gotNewRun - Not waiting for run " << runnr << "." << endl;
979 fMsg.Warn(str);
980 return;
981 }
982
983 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
984 if (i2==fExpectedRuns.end())
985 {
986 ostringstream str;
987 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
988 fMsg.Warn(str);
989 return;
990 }
991
992 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
993 // return: 0=close scheduled / >0 already closed / <0 does not exist
994
995 // FIXME: Move configuration from expected runs to runs which will soon
996 // be opened/closed
997
998 fIsRunStarted.erase(it);
999 }
1000
1001 map<boost::thread::id, string> fLastMessage;
1002
1003 void factOut(int severity, int err, const char *message)
1004 {
1005 if (!fDebugLog && severity==99)
1006 return;
1007
1008 ostringstream str;
1009 //str << boost::this_thread::get_id() << " ";
1010 str << "EventBuilder(";
1011 if (err<0)
1012 str << "---";
1013 else
1014 str << err;
1015 str << "): " << message;
1016
1017 string &old = fLastMessage[boost::this_thread::get_id()];
1018
1019 if (str.str()==old)
1020 return;
1021 old = str.str();
1022
1023 fMsg.Update(str, severity);
1024 }
1025/*
1026 void factStat(int64_t *stat, int len)
1027 {
1028 if (len!=7)
1029 {
1030 fMsg.Warn("factStat received unknown number of values.");
1031 return;
1032 }
1033
1034 vector<int64_t> data(1, g_maxMem);
1035 data.insert(data.end(), stat, stat+len);
1036
1037 static vector<int64_t> last(8);
1038 if (data==last)
1039 return;
1040 last = data;
1041
1042 fDimStatistics.Update(data);
1043
1044 // len ist die Laenge des arrays.
1045 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1046 // kannst Du pruefen, ob die 100MB voll sind ....
1047
1048 ostringstream str;
1049 str
1050 << "Wait=" << stat[0] << " "
1051 << "Skip=" << stat[1] << " "
1052 << "Del=" << stat[2] << " "
1053 << "Tot=" << stat[3] << " "
1054 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1055 << "Read=" << stat[5] << " "
1056 << "Conn=" << stat[6];
1057
1058 fMsg.Info(str);
1059 }
1060 */
1061
1062 void factStat(const EVT_STAT &stat)
1063 {
1064 fDimStatistics2.Update(stat);
1065 }
1066
1067 void factStat(const GUI_STAT &stat)
1068 {
1069 fDimStatistics1.Update(stat);
1070 }
1071
1072
1073 array<FAD::EventHeader, 40> fVecHeader;
1074
1075 template<typename T, class S>
1076 array<T, 42> Compare(const S *vec, const T *t)
1077 {
1078 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1079
1080 const T *min = NULL;
1081 const T *val = NULL;
1082 const T *max = NULL;
1083
1084 array<T, 42> arr;
1085
1086 bool rc = true;
1087 for (int i=0; i<40; i++)
1088 {
1089 const char *base = reinterpret_cast<const char*>(vec+i);
1090 const T *ref = reinterpret_cast<const T*>(base+offset);
1091
1092 arr[i] = *ref;
1093
1094 if (gi_NumConnect[i]!=7)
1095 {
1096 arr[i] = 0;
1097 continue;
1098 }
1099
1100 if (!val)
1101 {
1102 min = ref;
1103 val = ref;
1104 max = ref;
1105 }
1106
1107 if (*ref<*min)
1108 min = ref;
1109
1110 if (*ref>*max)
1111 max = ref;
1112
1113 if (*val!=*ref)
1114 rc = false;
1115 }
1116
1117 arr[40] = val ? *min : 1;
1118 arr[41] = val ? *max : 0;
1119
1120 return arr;
1121 }
1122
1123 template<typename T>
1124 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1125 {
1126 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1127
1128 T val = 0;
1129 T rc = 0;
1130
1131 array<T, 42> vec;
1132
1133 bool first = true;
1134
1135 for (int i=0; i<40; i++)
1136 {
1137 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1138 const T *ref = reinterpret_cast<const T*>(base+offset);
1139
1140 vec[i+2] = *ref;
1141
1142 if (gi_NumConnect[i]!=7)
1143 {
1144 vec[i+2] = 0;
1145 continue;
1146 }
1147
1148 if (first)
1149 {
1150 first = false;
1151 val = *ref;
1152 rc = 0;
1153 }
1154
1155 rc |= val^*ref;
1156 }
1157
1158 vec[0] = rc;
1159 vec[1] = val;
1160
1161 return vec;
1162 }
1163
1164 template<typename T, size_t N>
1165 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1166 {
1167// svc.setQuality(vec[40]<=vec[41]);
1168 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1169 svc.Update();
1170 }
1171
// Print one line to cout (debugging aid):
//   name|first|[1]|[0]<x<[1]: [3] [4] ... [42]
// where [i] denotes element i of data.second.
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &val = data.second;

    cout << name << "|" << data.first << "|" << val[1] << "|" << val[0] << "<x<" << val[1] << ":";

    for (size_t idx=3; idx<43; idx++)
        cout << " " << val[idx];

    cout << endl;
}
1180
1181 vector<uint> fNumConnected;
1182
1183 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1184 {
1185 const uint16_t id = h.Id();
1186 if (id>39)
1187 return;
1188
1189 if (fNumConnected.size()!=40)
1190 fNumConnected.resize(40);
1191
1192 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1193
1194 const bool changed = con!=fNumConnected || !IsThreadRunning();
1195
1196 fNumConnected = con;
1197
1198 const FAD::EventHeader old = fVecHeader[id];
1199 fVecHeader[id] = h;
1200
1201 if (old.fVersion != h.fVersion || changed)
1202 {
1203 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1204
1205 array<float,42> data;
1206 for (int i=0; i<42; i++)
1207 {
1208 ostringstream str;
1209 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1210 data[i] = stof(str.str());
1211 }
1212 Update(fDimFwVersion, data);
1213 }
1214
1215 if (old.fRunNumber != h.fRunNumber || changed)
1216 {
1217 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1218 fDimRunNumber.Update(run);
1219 }
1220
1221 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1222 {
1223 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1224 fDimPrescaler.Update(pre);
1225 }
1226
1227 if (old.fDNA != h.fDNA || changed)
1228 {
1229 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1230 Update(fDimDNA, dna, 40);
1231 }
1232
1233 if (old.fStatus != h.fStatus || changed)
1234 {
1235 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1236 Update(fDimStatus, sts);
1237 }
1238
1239 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1240 {
1241 array<uint16_t, FAD::kNumDac*42> dacs;
1242
1243 for (int i=0; i<FAD::kNumDac; i++)
1244 {
1245 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1246 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1247 }
1248
1249 Update(fDimDac, dacs);
1250 }
1251
1252 // -----------
1253
1254 static Time oldt(boost::date_time::neg_infin);
1255 Time newt;
1256
1257 if (newt>oldt+boost::posix_time::seconds(1))
1258 {
1259 oldt = newt;
1260
1261 // --- RefClock
1262
1263 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1264 Update(fDimRefClock, clk);
1265
1266 // --- Temperatures
1267
1268 const array<int16_t,42> tmp[4] =
1269 {
1270 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1271 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1272 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1273 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1274 };
1275
1276 vector<int16_t> data;
1277 data.reserve(82);
1278 data.push_back(tmp[0][40]); // min: 0
1279 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1280 data.push_back(tmp[0][41]); // max: 41
1281 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1282
1283 for (int j=1; j<=3; j++)
1284 {
1285 const array<int16_t,42> &ref = tmp[j];
1286
1287 // Gloabl min
1288 if (ref[40]<data[0]) // 40=min
1289 data[0] = ref[40];
1290
1291 // Global max
1292 if (ref[41]>data[41]) // 41=max
1293 data[41] = ref[41];
1294
1295 for (int i=0; i<40; i++)
1296 {
1297 // min per board
1298 if (ref[i]<data[i+1]) // data: 1-40
1299 data[i+1] = ref[i]; // ref: 0-39
1300
1301 // max per board
1302 if (ref[i]>data[i+42]) // data: 42-81
1303 data[i+42] = ref[i]; // ref: 0-39
1304 }
1305
1306
1307 }
1308
1309 vector<float> deg(82); // 0: global min, 1-40: min
1310 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1311 deg[i] = data[i]/16.;
1312 fDimTemperature.Update(deg);
1313 }
1314 }
1315};
1316
1317EventBuilderWrapper *EventBuilderWrapper::This = 0;
1318
1319// ----------- Event builder callbacks implementation ---------------
1320extern "C"
1321{
1322 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1323 {
1324 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1325 }
1326
1327 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1328 {
1329 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1330 }
1331
1332 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1333 {
1334 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1335 }
1336
1337 // -----
1338
1339 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1340 {
1341 return NULL;
1342 }
1343
1344 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1345 {
1346 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1347 }
1348
// C callback: nothing to clean up here; both arguments are ignored and
// 0 is returned.
int runEnd(uint32_t /*irun*/, void * /*runPtr*/)
{
    const int result = 0;
    return result;
}
1353
1354 // -----
1355
1356 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1357 {
1358 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1359 }
1360
1361 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1362 {
1363 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1364 }
1365
1366 // -----
1367
1368 void factOut(int severity, int err, const char *message)
1369 {
1370 EventBuilderWrapper::This->factOut(severity, err, message);
1371 }
1372
1373 void factStat(GUI_STAT stat)
1374 {
1375 EventBuilderWrapper::This->factStat(stat);
1376 }
1377
1378 void factStatNew(EVT_STAT stat)
1379 {
1380 EventBuilderWrapper::This->factStat(stat);
1381 }
1382
1383 // ------
1384
1385 void debugHead(int socket, int/*board*/, void *buf)
1386 {
1387 const uint16_t *ptr = reinterpret_cast<uint16_t*>(buf);
1388 EventBuilderWrapper::This->debugHead(socket, FAD::EventHeader(ptr));
1389
1390 // const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1391 // EventBuilderWrapper::This->debugHead(socket, h);
1392
1393 }
1394
1395 void debugStream(int isock, void *buf, int len)
1396 {
1397 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1398 }
1399
1400 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1401 {
1402 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1403 }
1404}
1405
1406#endif
Note: See TracBrowser for help on using the repository browser.