source: trunk/FACT++/src/EventBuilderWrapper.h@ 12641

Last change on this file since 12641 was 12620, checked in by tbretz, 13 years ago
Removed some stray endl in messages.
File size: 44.9 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Command codes for the event builder; this wrapper writes them to g_runStat.
enum CommandStates_t
{
    // --- shutdown requests ---
    kAbort      = -2,  // 'abort': quit as soon as possible
    kExit       = -1,  // 'exit': stop reading, quit when buffered events are done

    // --- idle states ---
    kInitialize =  0,  // 'initialize' (e.g. dim not yet started)
    kHybernate  =  1,  // 'hybernate': do nothing for a long time [wakeup within ~1sec]
    kSleep      =  2,  // 'sleep': do nothing [wakeup within ~10msec]

    // --- data taking modes ---
    kModeFlush  = 10,  // 'flush': read data from camera, but skip them
    kModeTest   = 20,  // 'test': read data and process them, but do not write to disk
    kModeFlag   = 30,  // 'flag': read data, process and write all to disk
    kModeRun    = 40   // 'run': read data, process and write selected to disk
};
68
// Slot indices into the fNumEvts counter array.
enum
{
    kCurrent   = 0,  // events counted for the current run
    kTotal     = 1,  // events counted over all runs
    kEventId   = 2,  // event id of the most recently counted event
    kTriggerId = 3   // trigger number of the most recently counted event
};
76
77 FAD::FileFormat_t fFileFormat;
78
79 uint32_t fMaxRun;
80 uint32_t fLastOpened;
81 uint32_t fLastClosed;
82 uint32_t fNumEvts[4];
83
84 DimWriteStatistics fDimWriteStats;
85 DimDescribedService fDimRuns;
86 DimDescribedService fDimEvents;
87 DimDescribedService fDimRawData;
88 DimDescribedService fDimEventData;
89 DimDescribedService fDimFeedbackData;
90 DimDescribedService fDimFwVersion;
91 DimDescribedService fDimRunNumber;
92 DimDescribedService fDimStatus;
93 DimDescribedService fDimDNA;
94 DimDescribedService fDimTemperature;
95 DimDescribedService fDimPrescaler;
96 DimDescribedService fDimRefClock;
97 DimDescribedService fDimRoi;
98 DimDescribedService fDimDac;
99 DimDescribedService fDimDrsCalibration;
100 DimDescribedService fDimStatistics1;
101 DimDescribedService fDimStatistics2;
102 DimDescribedService fDimFileFormat;
103
104 bool fDebugStream;
105 bool fDebugRead;
106 bool fDebugLog;
107
108 string fPath;
109 uint32_t fRunNumber;
110
111protected:
112 int64_t InitRunNumber()
113 {
114 fRunNumber = 1000;
115
116 // Ensure that the night doesn't change during our check
117 const int check = Time().NightAsInt();
118
119 while (--fRunNumber>0)
120 {
121 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
122
123 if (access((name+"bin").c_str(), F_OK) == 0)
124 break;
125 if (access((name+"fits").c_str(), F_OK) == 0)
126 break;
127 if (access((name+"drs.fits").c_str(), F_OK) == 0)
128 break;
129
130 }
131
132 if (check != Time().NightAsInt())
133 return InitRunNumber();
134
135 fRunNumber++;
136
137 if (fRunNumber==1000)
138 {
139 fMsg.Error("You have a file with run-number 1000 in "+fPath);
140 return -1;
141 }
142
143 ostringstream str;
144 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
145 fMsg.Message(str);
146
147 fMsg.Info(" ==> TODO: Crosscheck with database!");
148
149 return check;
150 }
151
152 int64_t InitRunNumber(const string &path)
153 {
154 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
155 {
156 fMsg.Error("Data path "+path+" does not exist!");
157 return -1;
158 }
159
160 //const fs::path fullPath = fs::system_complete(fs::path(path));
161
162 fPath = path;
163
164 fDimWriteStats.SetCurrentFolder(fPath);
165
166 return InitRunNumber();
167 }
168
169public:
170 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
171 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
172 fDimWriteStats ("FAD_CONTROL", imp),
173 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
174 "Run files statistics"
175 "|stats[int]:num. of open run files, min run num., max run num., lastest opened or closed run"
176 "|file[string]:filename of last opened file"),
177 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
178 "Event counts"
179 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
180 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
181 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
182 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
183 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
184 "Firmware version number of fad boards"
185 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
186 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
187 "Run numbers coming from FAD boards"
188 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
189 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
190 "Status of FAD boards"
191 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
192 fDimDNA ("FAD_CONTROL/DNA", "X:40",
193 "DNA of FAD boards"
194 "|DNA[hex]:Hex identifier of each FAD board"),
195 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
196 "FADs temperatures"
197 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
198 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
199 "Trigger generator prescaler of fad boards"
200 "|prescaler[int]:Trigger generator prescaler value, for each board"),
201 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
202 "Reference clock of FAD boards"
203 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
204 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
205 fDimDac ("FAD_CONTROL/DAC", "S:336",
206 "DAC counts of each FAD board"
207 "|DAC[int]:DAC counts, sequentally DAC 0 board 0, DAC 0 board 1... (plus min max)"),
208 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
209 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40",
210 "Event Builder status for GUI display"
211 "|threadInfo[int]:Number of read, proc and writes"
212 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
213 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
214 "|EvtCnt[int]:Number of events skipped, written, with errors"
215 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
216 "|badRoiBoard[int]:Num boards with wrong ROI"
217 "|deltaT[ms]:Time in ms for rates"
218 "|rateNew[int]:Number of new start events received"
219 "|numConn[int]:Number of connections per board"
220 "|errConn[int]:IO errors per board"
221 "|rateBytes[int]:Bytes read this cycle"
222 "|totBytes[int]:Bytes read (counter)"),
223 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
224 "Event Builder status, events oriented"
225 "|reset[int]:If increased, reset all counters"
226 "|numRead[int]:How often sucessful read from N sockets per loop"
227 "|gotByte[int]:number of bytes read per board"
228 "|gotErr[int]:number of com. errors per board"
229 "|evtStat[int]:number of evts read, completed, with errors, incomplete"
230 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
231 "|feedStat[int]:number of evts used or rejected by feedback system"
232 "|wrtStat[int]:number of evts written to disk, with errors"
233 "|runStat[int]:number of run opened, closed, with open or close errors"
234 "|numConn[int]:number of sockets successfully opened per board"),
235 fDimFileFormat ("FAD_CONTROL/FILE_FORMAT", "S:1", ""),
236 fDebugStream(false), fDebugRead(false), fDebugLog(false)
237 {
238 if (This)
239 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
240
241 This = this;
242
243 memset(fNumEvts, 0, sizeof(fNumEvts));
244 fDimEvents.Update(fNumEvts);
245
246 for (size_t i=0; i<40; i++)
247 ConnectSlot(i, tcp::endpoint());
248 }
249 virtual ~EventBuilderWrapper()
250 {
251 Abort();
252
253 // FIXME: Used timed_join and abort afterwards
254 // What's the maximum time the eb need to abort?
255 fThread.join();
256 //ffMsg.Info("EventBuilder stopped.");
257
258 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
259 delete *it;
260 }
261
262 set<uint32_t> fIsRunStarted;
263 map<uint32_t, FAD::RunDescription> fExpectedRuns;
264
265 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
266 {
267 if (maxtime<=0 || maxtime>24*60*60)
268 maxtime = 24*60*60;
269 if (maxevt<=0 || maxevt>INT32_MAX)
270 maxevt = INT32_MAX;
271
272 const FAD::RunDescription descr =
273 {
274 uint32_t(maxtime),
275 uint32_t(maxevt),
276 ref.first,
277 ref.second,
278 };
279
280 // FIMXE: Maybe reset an event counter so that the mcp can count events?
281
282 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
283
284 fExpectedRuns[fRunNumber] = descr;
285 fIsRunStarted.insert(fRunNumber);
286 return fRunNumber++;
287 }
288
289 bool IsThreadRunning()
290 {
291 return !fThread.timed_join(boost::posix_time::microseconds(0));
292 }
293
294 void SetMaxMemory(unsigned int mb) const
295 {
296 /*
297 if (mb*1000000<GetUsedMemory())
298 {
299 // ffMsg.Warn("...");
300 return;
301 }*/
302
303 g_maxMem = size_t(mb)*1000000;
304 }
305
306 void StartThread(const vector<tcp::endpoint> &addr)
307 {
308 if (IsThreadRunning())
309 {
310 fMsg.Warn("Start - EventBuilder still running");
311 return;
312 }
313
314 fLastMessage.clear();
315
316 for (size_t i=0; i<40; i++)
317 ConnectSlot(i, addr[i]);
318
319 g_runStat = kModeRun;
320 g_maxProc = 3;
321
322 fMsg.Message("Starting EventBuilder thread");
323
324 fThread = boost::thread(StartEvtBuild);
325 }
326 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
327 {
328 if (i>39)
329 return;
330
331 if (addr==tcp::endpoint())
332 {
333 DisconnectSlot(i);
334 return;
335 }
336
337 g_port[i].sockAddr.sin_family = AF_INET;
338 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
339 g_port[i].sockAddr.sin_port = htons(addr.port());
340 // In this order
341 g_port[i].sockDef = 1;
342 }
343 void DisconnectSlot(unsigned int i)
344 {
345 if (i>39)
346 return;
347
348 g_port[i].sockDef = 0;
349 // In this order
350 g_port[i].sockAddr.sin_family = AF_INET;
351 g_port[i].sockAddr.sin_addr.s_addr = 0;
352 g_port[i].sockAddr.sin_port = 0;
353 }
354 void IgnoreSlot(unsigned int i)
355 {
356 if (i>39)
357 return;
358 if (g_port[i].sockAddr.sin_port==0)
359 return;
360
361 g_port[i].sockDef = -1;
362 }
363
364
365 void Abort()
366 {
367 fMsg.Message("Signal abort to EventBuilder thread...");
368 g_runStat = kAbort;
369 }
370
371 void ResetThread(bool soft)
372 {
373 /*
374 if (g_reset > 0)
375
376 * suspend reading
377 * reset = g_reset;
378 * g_reset=0
379
380 * reset% 10
381 == 0 leave event Buffers as they are
382 == 1 let all buffers drain (write (incomplete) events)
383 > 1 flush all buffers (do not write buffered events)
384
385 * (reset/10)%10
386 > 0 close all sockets and destroy them (also free the
387 allocated read-buffers)
388 recreate before resuming operation
389 [ this is more than just close/open that can be
390 triggered by e.g. close/open the base-socket ]
391
392 * (reset/100)%10
393 > 0 close all open run-files
394
395 * (reset/1000)
396 sleep so many seconds before resuming operation
397 (does not (yet) take into account time left when waiting
398 for buffers getting empty ...)
399
400 * resume_reading
401
402 */
403 fMsg.Message("Signal reset to EventBuilder thread...");
404 g_reset = soft ? 101 : 102;
405 }
406
407 void Exit()
408 {
409 fMsg.Message("Signal exit to EventBuilder thread...");
410 g_runStat = kExit;
411 }
412
413 /*
414 void Wait()
415 {
416 fThread.join();
417 ffMsg.Message("EventBuilder stopped.");
418 }*/
419
420 void Hybernate() const { g_runStat = kHybernate; }
421 void Sleep() const { g_runStat = kSleep; }
422 void FlushMode() const { g_runStat = kModeFlush; }
423 void TestMode() const { g_runStat = kModeTest; }
424 void FlagMode() const { g_runStat = kModeFlag; }
425 void RunMode() const { g_runStat = kModeRun; }
426
427 // FIXME: To be removed
428 void SetMode(int mode) const { g_runStat = mode; }
429
430 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
431 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
432 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
433 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
434 int GetNumFilesOpen() const { return fFiles.size(); }
435
436 /*
437 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
438 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
439 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
440 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
441 */
442
443 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
444 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
445
446 void SetOutputFormat(FAD::FileFormat_t f)
447 {
448 fFileFormat = f;
449 fDimFileFormat.Update(uint16_t(f));
450 if (fFileFormat==FAD::kCalib)
451 {
452 DataCalib::Restart();
453 DataCalib::Update(fDimDrsCalibration);
454 fMsg.Message("Resetted DRS calibration.");
455 }
456 }
457
458 virtual int ResetSecondaryDrsBaseline()
459 {
460 if (DataCalib::ResetTrgOff(fDimDrsCalibration))
461 {
462 fFileFormat = FAD::kCalib;
463 fDimFileFormat.Update(uint16_t(fFileFormat));
464 fMsg.Message("Resetted DRS calibration for secondary baseline.");
465 }
466 else
467 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
468
469 return 0;
470 }
471
472 void SetDebugLog(bool b) { fDebugLog = b; }
473
474 void SetDebugStream(bool b)
475 {
476 fDebugStream = b;
477 if (b)
478 return;
479
480 for (int i=0; i<40; i++)
481 {
482 if (!fDumpStream[i].is_open())
483 continue;
484
485 fDumpStream[i].close();
486
487 ostringstream name;
488 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
489 fMsg.Message("Closed file '"+name.str()+"'");
490 }
491 }
492
493 void SetDebugRead(bool b)
494 {
495 fDebugRead = b;
496 if (b || !fDumpRead.is_open())
497 return;
498
499 fDumpRead.close();
500 fMsg.Message("Closed file 'socket_events.txt'");
501 }
502
503// size_t GetUsedMemory() const { return gi_usedMem; }
504
505 void LoadDrsCalibration(const char *fname)
506 {
507 if (!DataCalib::ReadFits(fname, fMsg))
508 return;
509 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
510 DataCalib::Update(fDimDrsCalibration);
511 }
512
513 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
514
515
516 /*
517 struct OpenFileToDim
518 {
519 int code;
520 char fileName[FILENAME_MAX];
521 };
522
523 SignalRunOpened(runid, filename);
524 // Send num open files
525 // Send runid, (more info about the run?), filename via dim
526
527 SignalEvtWritten(runid);
528 // Send num events written of newest file
529
530 SignalRunClose(runid);
531 // Send new num open files
532 // Send empty file-name if no file is open
533
534 */
535
536 // -------------- Mapped event builder callbacks ------------------
537
538 void UpdateRuns(const string &fname="")
539 {
540 uint32_t values[5] =
541 {
542 static_cast<uint32_t>(fFiles.size()),
543 0xffffffff,
544 0,
545 fLastOpened,
546 fLastClosed
547 };
548
549 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
550 it!=fFiles.end(); it++)
551 {
552 const DataProcessorImp *file = *it;
553
554 if (file->GetRunId()<values[1])
555 values[1] = file->GetRunId();
556
557 if (file->GetRunId()>values[2])
558 values[2] = file->GetRunId();
559 }
560
561 fMaxRun = values[2];
562
563 vector<char> data(sizeof(values)+fname.size()+1);
564 memcpy(data.data(), values, sizeof(values));
565 strcpy(data.data()+sizeof(values), fname.c_str());
566
567 fDimRuns.Update(data);
568 }
569
570 vector<DataProcessorImp*> fFiles;
571
572 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
573 {
574 fMsg.Info(" ==> TODO: Update run configuration in database!");
575
576 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
577 while (it!=fExpectedRuns.end())
578 {
579 if (it->first<runid)
580 {
581 ostringstream str;
582 str << "runOpen - Missed run " << it->first << ".";
583 fMsg.Info(str);
584
585 fExpectedRuns.erase(it++);
586 continue;
587 }
588 if (it->first==runid)
589 break;
590 it++;
591 }
592 if (it==fExpectedRuns.end())
593 {
594 ostringstream str;
595 str << "runOpen - Run " << runid << " wasn't expected.";
596 fMsg.Warn(str);
597 }
598
599 const FAD::RunDescription desc = it->second;
600
601 fExpectedRuns.erase(it);
602
603 // Check if file already exists...
604 DataProcessorImp *file = 0;
605 switch (fFileFormat)
606 {
607 case FAD::kNone: file = new DataDump(fPath, runid, fMsg); break;
608 case FAD::kDebug: file = new DataDebug(fPath, runid, fMsg); break;
609 case FAD::kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
610 case FAD::kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
611 case FAD::kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
612 }
613
614 try
615 {
616 if (!file->Open(h, desc))
617 return 0;
618 }
619 catch (const exception &e)
620 {
621 fMsg.Error("Exception trying to open file: "+string(e.what()));
622 return 0;
623 }
624
625 fFiles.push_back(file);
626
627 ostringstream str;
628 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
629 fMsg.Info(str);
630
631 fDimWriteStats.FileOpened(file->GetFileName());
632
633 fLastOpened = runid;
634 UpdateRuns(file->GetFileName());
635
636 fNumEvts[kEventId] = 0;
637 fNumEvts[kTriggerId] = 0;
638
639 fNumEvts[kCurrent] = 0;
640 fDimEvents.Update(fNumEvts);
641 // fDimCurrentEvent.Update(uint32_t(0));
642
643 return reinterpret_cast<FileHandle_t>(file);
644 }
645
646 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
647 {
648 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
649
650 if (!file->WriteEvt(e))
651 return -1;
652
653 if (file->GetRunId()==fMaxRun)
654 {
655 fNumEvts[kCurrent]++;
656 fNumEvts[kEventId] = e->EventNum;
657 fNumEvts[kTriggerId] = e->TriggerNum;
658 }
659
660 fNumEvts[kTotal]++;
661
662 static Time oldt(boost::date_time::neg_infin);
663 Time newt;
664 if (newt>oldt+boost::posix_time::seconds(1))
665 {
666 fDimEvents.Update(fNumEvts);
667 oldt = newt;
668 }
669
670
671 // ===> SignalEvtWritten(runid);
672 // Send num events written of newest file
673
674 /* close run runId (all all runs if runId=0) */
675 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
676 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
677
678 return 0;
679 }
680
681 virtual void CloseRun(uint32_t runid) { }
682
683 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
684 {
685 fMsg.Info(" ==> TODO: Update run configuration in database!");
686
687 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
688
689 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
690 if (it==fFiles.end())
691 {
692 ostringstream str;
693 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
694 fMsg.Fatal(str);
695 return -1;
696 }
697
698 fFiles.erase(it);
699
700 fLastClosed = file->GetRunId();
701 CloseRun(fLastClosed);
702 UpdateRuns();
703
704 fDimEvents.Update(fNumEvts);
705
706 const bool rc = file->Close(tail);
707 if (!rc)
708 {
709 // Error message
710 }
711
712 ostringstream str;
713 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
714 fMsg.Info(str);
715
716 delete file;
717
718 // ==> SignalRunClose(runid);
719 // Send new num open files
720 // Send empty file-name if no file is open
721
722 return rc ? 0 : -1;
723 }
724
725 ofstream fDumpStream[40];
726
727 void debugStream(int isock, void *buf, int len)
728 {
729 if (!fDebugStream)
730 return;
731
732 const int slot = isock/7;
733 if (slot<0 || slot>39)
734 return;
735
736 if (!fDumpStream[slot].is_open())
737 {
738 ostringstream name;
739 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
740
741 fDumpStream[slot].open(name.str().c_str(), ios::app);
742 if (!fDumpStream[slot])
743 {
744 ostringstream str;
745 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
746 fMsg.Error(str);
747
748 return;
749 }
750
751 fMsg.Message("Opened file '"+name.str()+"' for writing.");
752 }
753
754 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
755 }
756
757 ofstream fDumpRead; // Stream to possibly dump docket events
758
759 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
760 {
761 // isock = socketID (0-279)
762 // ibyte = #bytes gelesen
763 // event = eventId (oder 0 wenn noch nicht bekannt)
764 // state : 1=finished reading data
765 // 0=reading data
766 // -1=start reading data (header)
767 // -2=start reading data,
768 // eventId not known yet (too little data)
769 // tsec, tusec = time when reading seconds, microseconds
770 //
771 if (!fDebugRead || ibyte==0)
772 return;
773
774 if (!fDumpRead.is_open())
775 {
776 fDumpRead.open("socket_events.txt", ios::app);
777 if (!fDumpRead)
778 {
779 ostringstream str;
780 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
781 fMsg.Error(str);
782
783 return;
784 }
785
786 fMsg.Message("Opened file 'socket_events.txt' for writing.");
787
788 fDumpRead << "# START: " << Time().GetAsStr() << endl;
789 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
790 }
791
792 fDumpRead
793 << setw(2) << state << " "
794 << setw(8) << tsec << " "
795 << setw(9) << tusec << " "
796 << setw(3) << isock << " "
797 << setw(2) << isock/7 << " "
798 << runno << " "
799 << event << " "
800 << ftmevt << " "
801 << ibyte << endl;
802 }
803
804 array<uint16_t,2> fVecRoi;
805
806 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
807 {
808 /*
809 fadhd[i] ist ein array mit den 40 fad-headers
810 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
811
812 event ist die Struktur, die auch die write routine erhaelt;
813 darin sind im header die 'soll-werte' fuer z.B. eventID
814 als auch die ADC-Werte (falls Du die brauchst)
815
816 Wenn die routine einen negativen Wert liefert, wird das event
817 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
818 */
819
820 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
821
822 if (roi!=fVecRoi)
823 {
824 Update(fDimRoi, roi);
825 fVecRoi = roi;
826 }
827
828 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
829 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
830
831 // FIMXE: Compare with target configuration
832
833 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
834 {
835 // FIXME: Compare with expectations!!!
836 if (ptr->fStartDelimiter==0)
837 {
838 if (ptr==beg)
839 beg++;
840 continue;
841 }
842
843 if (beg->fStatus != ptr->fStatus)
844 {
845 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
846 CloseRunFile(runNr, 0, 0);
847 return -1;
848 }
849
850 if (beg->fRunNumber != ptr->fRunNumber)
851 {
852 fMsg.Error("Inconsistent run number detected.... closing run.");
853 CloseRunFile(runNr, 0, 0);
854 return -1;
855 }
856
857 /*
858 if (beg->fVersion != ptr->fVersion)
859 {
860 Error("Inconsist firmware version detected.... closing run.");
861 CloseRunFile(runNr, 0, 0);
862 break;
863 }
864 if (beg->fEventCounter != ptr->fEventCounter)
865 {
866 Error("Inconsist run number detected.... closing run.");
867 CloseRunFile(runNr, 0, 0);
868 break;
869 }
870 if (beg->fTriggerCounter != ptr->fTriggerCounter)
871 {
872 Error("Inconsist trigger number detected.... closing run.");
873 CloseRunFile(runNr, 0, 0);
874 break;
875 }*/
876
877 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
878 {
879 fMsg.Error("Inconsistent phase shift detected.... closing run.");
880 CloseRunFile(runNr, 0, 0);
881 return -1;
882 }
883
884 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
885 {
886 fMsg.Error("Inconsistent DAC values detected.... closing run.");
887 CloseRunFile(runNr, 0, 0);
888 return -1;
889 }
890
891 if (beg->fTriggerType != ptr->fTriggerType)
892 {
893 fMsg.Error("Inconsistent trigger type detected.... closing run.");
894 CloseRunFile(runNr, 0, 0);
895 return -1;
896 }
897 }
898
899 // check REFCLK_frequency
900 // check consistency with command configuration
901 // how to log errors?
902 // need gotNewRun/closedRun to know it is finished
903
904 return 0;
905 }
906
907 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
908 {
909 // Currently we send any event no matter what its trigger id is...
910 // To be changed.
911 static Time oldt(boost::date_time::neg_infin);
912 Time newt;
913
914 // FIXME: Only send events if the have newer run-numbers
915 if (newt<oldt+boost::posix_time::seconds(1))
916 return;
917
918 oldt = newt;
919
920 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
921 memcpy(data.data(), event, sizeof(EVENT));
922
923 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
924
925 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
926 fDimRawData.Update(data);
927
928 DrsCalibrate::RemoveSpikes(vec, event->Roi);
929
930 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
931 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
932
933 fDimEventData.Update(data2);
934 }
935
936 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
937 {
938 if (!DataCalib::IsValid())
939 return;
940
941 // Workaround to find a valid header.....
942 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
943 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
944
945 // FIMXE: Compare with target configuration
946
947 const FAD::EventHeader *ptr=beg;
948 for (; ptr!=end; ptr++)
949 {
950 if (ptr->fStartDelimiter==0)
951 continue;
952
953 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
954 return;
955 }
956
957 if (ptr->fStartDelimiter==0)
958 return;
959
960 vector<float> data(event->Roi*1440);
961 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
962
963 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
964
965 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
966 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
967
968 fDimFeedbackData.Update(data2);
969 }
970
971 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
972 {
973 switch (threadID)
974 {
975 case 0:
976 SendRawData(fadhd, event);
977 return 1;
978 case 1:
979 SendFeedbackData(fadhd, event);
980 return 2;
981 }
982 return 100;
983 }
984
985
986 bool IsRunStarted() const
987 {
988 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
989 return it==fIsRunStarted.end();// ? true : it->second.started;
990 }
991
992 uint32_t GetRunNumber() const
993 {
994 return fRunNumber;
995 }
996
997 void IncreaseRunNumber(uint32_t run)
998 {
999 if (run>fRunNumber)
1000 fRunNumber = run;
1001 }
1002
1003 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
1004 {
1005 // This function is called even when writing is switched off
1006 set<uint32_t>::iterator it = fIsRunStarted.begin();
1007 while (it!=fIsRunStarted.end())
1008 {
1009 if (*it<runnr)
1010 {
1011 ostringstream str;
1012 str << "gotNewRun - Missed run " << *it << ".";
1013 fMsg.Info(str);
1014
1015 fIsRunStarted.erase(it++);
1016 continue;
1017 }
1018 if (*it==runnr)
1019 break;
1020 it++;
1021 }
1022 if (it==fIsRunStarted.end())
1023 {
1024 ostringstream str;
1025 str << "gotNewRun - Not waiting for run " << runnr << ".";
1026 fMsg.Warn(str);
1027 return;
1028 }
1029
1030 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
1031 if (i2==fExpectedRuns.end())
1032 {
1033 ostringstream str;
1034 str << "gotNewRun - Run " << runnr << " wasn't expected.";
1035 fMsg.Warn(str);
1036 return;
1037 }
1038
1039 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
1040 // return: 0=close scheduled / >0 already closed / <0 does not exist
1041
1042 // FIXME: Move configuration from expected runs to runs which will soon
1043 // be opened/closed
1044
1045 fIsRunStarted.erase(it);
1046 }
1047
1048 map<boost::thread::id, string> fLastMessage;
1049
1050 void factOut(int severity, int err, const char *message)
1051 {
1052 if (!fDebugLog && severity==99)
1053 return;
1054
1055 ostringstream str;
1056 //str << boost::this_thread::get_id() << " ";
1057 str << "EventBuilder(";
1058 if (err<0)
1059 str << "---";
1060 else
1061 str << err;
1062 str << "): " << message;
1063
1064 string &old = fLastMessage[boost::this_thread::get_id()];
1065
1066 if (str.str()==old)
1067 return;
1068 old = str.str();
1069
1070 fMsg.Update(str, severity);
1071 }
1072/*
1073 void factStat(int64_t *stat, int len)
1074 {
1075 if (len!=7)
1076 {
1077 fMsg.Warn("factStat received unknown number of values.");
1078 return;
1079 }
1080
1081 vector<int64_t> data(1, g_maxMem);
1082 data.insert(data.end(), stat, stat+len);
1083
1084 static vector<int64_t> last(8);
1085 if (data==last)
1086 return;
1087 last = data;
1088
1089 fDimStatistics.Update(data);
1090
1091 // len ist die Laenge des arrays.
1092 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1093 // kannst Du pruefen, ob die 100MB voll sind ....
1094
1095 ostringstream str;
1096 str
1097 << "Wait=" << stat[0] << " "
1098 << "Skip=" << stat[1] << " "
1099 << "Del=" << stat[2] << " "
1100 << "Tot=" << stat[3] << " "
1101 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1102 << "Read=" << stat[5] << " "
1103 << "Conn=" << stat[6];
1104
1105 fMsg.Info(str);
1106 }
1107 */
1108
1109 void factStat(const EVT_STAT &stat)
1110 {
1111 fDimStatistics2.Update(stat);
1112 }
1113
1114 void factStat(const GUI_STAT &stat)
1115 {
1116 fDimStatistics1.Update(stat);
1117 }
1118
1119
1120 array<FAD::EventHeader, 40> fVecHeader;
1121
1122 template<typename T, class S>
1123 array<T, 42> Compare(const S *vec, const T *t)
1124 {
1125 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1126
1127 const T *min = NULL;
1128 const T *val = NULL;
1129 const T *max = NULL;
1130
1131 array<T, 42> arr;
1132
1133 bool rc = true;
1134 for (int i=0; i<40; i++)
1135 {
1136 const char *base = reinterpret_cast<const char*>(vec+i);
1137 const T *ref = reinterpret_cast<const T*>(base+offset);
1138
1139 arr[i] = *ref;
1140
1141 if (gi_NumConnect[i]!=7)
1142 {
1143 arr[i] = 0;
1144 continue;
1145 }
1146
1147 if (!val)
1148 {
1149 min = ref;
1150 val = ref;
1151 max = ref;
1152 }
1153
1154 if (*ref<*min)
1155 min = ref;
1156
1157 if (*ref>*max)
1158 max = ref;
1159
1160 if (*val!=*ref)
1161 rc = false;
1162 }
1163
1164 arr[40] = val ? *min : 1;
1165 arr[41] = val ? *max : 0;
1166
1167 return arr;
1168 }
1169
1170 template<typename T>
1171 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1172 {
1173 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1174
1175 T val = 0;
1176 T rc = 0;
1177
1178 array<T, 42> vec;
1179
1180 bool first = true;
1181
1182 for (int i=0; i<40; i++)
1183 {
1184 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1185 const T *ref = reinterpret_cast<const T*>(base+offset);
1186
1187 vec[i+2] = *ref;
1188
1189 if (gi_NumConnect[i]!=7)
1190 {
1191 vec[i+2] = 0;
1192 continue;
1193 }
1194
1195 if (first)
1196 {
1197 first = false;
1198 val = *ref;
1199 rc = 0;
1200 }
1201
1202 rc |= val^*ref;
1203 }
1204
1205 vec[0] = rc;
1206 vec[1] = val;
1207
1208 return vec;
1209 }
1210
1211 template<typename T, size_t N>
1212 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1213 {
1214// svc.setQuality(vec[40]<=vec[41]);
1215 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1216 svc.Update();
1217 }
1218
// Debugging helper: write one line to cout of the form
//   name|first|v[1]|v[0]<x<v[1]: v[3] v[4] ... v[42]
// where v is data.second.
// NOTE(review): v[1] is printed twice and v[2] is never printed;
// this may not be intended -- confirm against the producer of the data.
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &values = data.second;

    cout << name << "|" << data.first << "|" << values[1] << "|";
    cout << values[0] << "<x<" << values[1] << ":";

    for (int idx=3; idx<43; idx++)
        cout << " " << values[idx];

    cout << endl;
}
1227
1228 vector<uint> fNumConnected;
1229
1230 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1231 {
1232 const uint16_t id = h.Id();
1233 if (id>39)
1234 return;
1235
1236 if (fNumConnected.size()!=40)
1237 fNumConnected.resize(40);
1238
1239 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1240
1241 const bool changed = con!=fNumConnected || !IsThreadRunning();
1242
1243 fNumConnected = con;
1244
1245 const FAD::EventHeader old = fVecHeader[id];
1246 fVecHeader[id] = h;
1247
1248 if (old.fVersion != h.fVersion || changed)
1249 {
1250 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1251
1252 array<float,42> data;
1253 for (int i=0; i<42; i++)
1254 {
1255 ostringstream str;
1256 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1257 data[i] = stof(str.str());
1258 }
1259 Update(fDimFwVersion, data);
1260 }
1261
1262 if (old.fRunNumber != h.fRunNumber || changed)
1263 {
1264 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1265 fDimRunNumber.Update(run);
1266 }
1267
1268 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1269 {
1270 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1271 fDimPrescaler.Update(pre);
1272 }
1273
1274 if (old.fDNA != h.fDNA || changed)
1275 {
1276 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1277 Update(fDimDNA, dna, 40);
1278 }
1279
1280 if (old.fStatus != h.fStatus || changed)
1281 {
1282 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1283 Update(fDimStatus, sts);
1284 }
1285
1286 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1287 {
1288 array<uint16_t, FAD::kNumDac*42> dacs;
1289
1290 for (int i=0; i<FAD::kNumDac; i++)
1291 {
1292 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1293 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1294 }
1295
1296 Update(fDimDac, dacs);
1297 }
1298
1299 // -----------
1300
1301 static Time oldt(boost::date_time::neg_infin);
1302 Time newt;
1303
1304 if (newt>oldt+boost::posix_time::seconds(1))
1305 {
1306 oldt = newt;
1307
1308 // --- RefClock
1309
1310 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1311 Update(fDimRefClock, clk);
1312
1313 // --- Temperatures
1314
1315 const array<int16_t,42> tmp[4] =
1316 {
1317 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1318 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1319 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1320 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1321 };
1322
1323 vector<int16_t> data;
1324 data.reserve(82);
1325 data.push_back(tmp[0][40]); // min: 0
1326 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1327 data.push_back(tmp[0][41]); // max: 41
1328 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1329
1330 for (int j=1; j<=3; j++)
1331 {
1332 const array<int16_t,42> &ref = tmp[j];
1333
1334 // Gloabl min
1335 if (ref[40]<data[0]) // 40=min
1336 data[0] = ref[40];
1337
1338 // Global max
1339 if (ref[41]>data[41]) // 41=max
1340 data[41] = ref[41];
1341
1342 for (int i=0; i<40; i++)
1343 {
1344 // min per board
1345 if (ref[i]<data[i+1]) // data: 1-40
1346 data[i+1] = ref[i]; // ref: 0-39
1347
1348 // max per board
1349 if (ref[i]>data[i+42]) // data: 42-81
1350 data[i+42] = ref[i]; // ref: 0-39
1351 }
1352
1353
1354 }
1355
1356 vector<float> deg(82); // 0: global min, 1-40: min
1357 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1358 deg[i] = data[i]/16.;
1359 fDimTemperature.Update(deg);
1360 }
1361 }
1362};
1363
1364EventBuilderWrapper *EventBuilderWrapper::This = 0;
1365
1366// ----------- Event builder callbacks implementation ---------------
1367extern "C"
1368{
1369 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1370 {
1371 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1372 }
1373
1374 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1375 {
1376 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1377 }
1378
1379 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1380 {
1381 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1382 }
1383
1384 // -----
1385
1386 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1387 {
1388 return NULL;
1389 }
1390
1391 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1392 {
1393 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1394 }
1395
1396 int runEnd(uint32_t, void *runPtr)
1397 {
1398 return 0;
1399 }
1400
1401 // -----
1402
1403 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1404 {
1405 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1406 }
1407
1408 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1409 {
1410 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1411 }
1412
1413 // -----
1414
1415 void factOut(int severity, int err, const char *message)
1416 {
1417 EventBuilderWrapper::This->factOut(severity, err, message);
1418 }
1419
1420 void factStat(GUI_STAT stat)
1421 {
1422 EventBuilderWrapper::This->factStat(stat);
1423 }
1424
1425 void factStatNew(EVT_STAT stat)
1426 {
1427 EventBuilderWrapper::This->factStat(stat);
1428 }
1429
1430 // ------
1431
1432 void debugHead(int socket, int/*board*/, void *buf)
1433 {
1434 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1435 EventBuilderWrapper::This->debugHead(socket, h);
1436 }
1437
1438 void debugStream(int isock, void *buf, int len)
1439 {
1440 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1441 }
1442
1443 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1444 {
1445 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1446 }
1447}
1448
1449#endif
Note: See TracBrowser for help on using the repository browser.