source: trunk/FACT++/src/EventBuilderWrapper.h@ 12905

Last change on this file since 12905 was 12831, checked in by tbretz, 13 years ago
Removed variables which were defined but never really used in any reasonable context
File size: 45.0 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
// C-linkage functions of the event builder which are called from this wrapper.

// Used as the entry function of the event builder thread.
extern "C" void StartEvtBuild();

// Close run runId (all runs if runId=0).
// Return: 0=close scheduled / >0 already closed / <0 does not exist
extern "C" int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
// Command states which are written to g_runStat
enum CommandStates_t
{
    // quit as soon as possible ('abort')
    kAbort      = -2,
    // stop reading, quit when buffered events done ('exit')
    kExit       = -1,
    // 'initialize' (e.g. dim not yet started)
    kInitialize =  0,
    // do nothing for long time ('hybernate') [wakeup within ~1sec]
    kHybernate  =  1,
    // do nothing ('sleep') [wakeup within ~10msec]
    kSleep      =  2,
    // read data from camera, but skip them ('flush')
    kModeFlush  = 10,
    // read data and process them, but do not write to disk ('test')
    kModeTest   = 20,
    // read data, process and write all to disk ('flag')
    kModeFlag   = 30,
    // read data, process and write selected to disk ('run')
    kModeRun    = 40,
};
68
// Indices into the event counter array fNumEvts
enum
{
    kCurrent,   // = 0
    kTotal,     // = 1
    kEventId,   // = 2
    kTriggerId, // = 3
};
76
77 FAD::FileFormat_t fFileFormat;
78
79 uint32_t fMaxRun;
80 uint32_t fLastOpened;
81 uint32_t fLastClosed;
82 uint32_t fNumEvts[4];
83
84 DimWriteStatistics fDimWriteStats;
85 DimDescribedService fDimRuns;
86 DimDescribedService fDimEvents;
87 DimDescribedService fDimRawData;
88 DimDescribedService fDimEventData;
89 DimDescribedService fDimFeedbackData;
90 DimDescribedService fDimFwVersion;
91 DimDescribedService fDimRunNumber;
92 DimDescribedService fDimStatus;
93 DimDescribedService fDimDNA;
94 DimDescribedService fDimTemperature;
95 DimDescribedService fDimPrescaler;
96 DimDescribedService fDimRefClock;
97 DimDescribedService fDimRoi;
98 DimDescribedService fDimDac;
99 DimDescribedService fDimDrsCalibration;
100 DimDescribedService fDimStatistics1;
101 DimDescribedService fDimStatistics2;
102 DimDescribedService fDimFileFormat;
103
104 bool fDebugStream;
105 bool fDebugRead;
106 bool fDebugLog;
107
108 string fPath;
109 uint32_t fRunNumber;
110
111protected:
112 int64_t InitRunNumber()
113 {
114 fRunNumber = 1000;
115
116 // Ensure that the night doesn't change during our check
117 const int check = Time().NightAsInt();
118
119 while (--fRunNumber>0)
120 {
121 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
122
123 if (access((name+"bin").c_str(), F_OK) == 0)
124 break;
125 if (access((name+"fits").c_str(), F_OK) == 0)
126 break;
127 if (access((name+"drs.fits").c_str(), F_OK) == 0)
128 break;
129
130 }
131
132 if (check != Time().NightAsInt())
133 return InitRunNumber();
134
135 fRunNumber++;
136
137 if (fRunNumber==1000)
138 {
139 fMsg.Error("You have a file with run-number 1000 in "+fPath);
140 return -1;
141 }
142
143 ostringstream str;
144 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
145 fMsg.Message(str);
146
147 fMsg.Info(" ==> TODO: Crosscheck with database!");
148
149 return check;
150 }
151
152 int64_t InitRunNumber(const string &path)
153 {
154 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
155 {
156 fMsg.Error("Data path "+path+" does not exist!");
157 return -1;
158 }
159
160 //const fs::path fullPath = fs::system_complete(fs::path(path));
161
162 fPath = path;
163
164 fDimWriteStats.SetCurrentFolder(fPath);
165
166 return InitRunNumber();
167 }
168
169public:
170 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
171 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
172 fDimWriteStats ("FAD_CONTROL", imp),
173 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
174 "Run files statistics"
175 "|stats[int]:num. of open run files, min run num., max run num., lastest opened or closed run"
176 "|file[string]:filename of last opened file"),
177 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
178 "Event counts"
179 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
180 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
181 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
182 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
183 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
184 "Firmware version number of fad boards"
185 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
186 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
187 "Run numbers coming from FAD boards"
188 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
189 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
190 "Status of FAD boards"
191 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
192 fDimDNA ("FAD_CONTROL/DNA", "X:40",
193 "DNA of FAD boards"
194 "|DNA[hex]:Hex identifier of each FAD board"),
195 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
196 "FADs temperatures"
197 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
198 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
199 "Trigger generator prescaler of fad boards"
200 "|prescaler[int]:Trigger generator prescaler value, for each board"),
201 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
202 "Reference clock of FAD boards"
203 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
204 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
205 fDimDac ("FAD_CONTROL/DAC", "S:336",
206 "DAC counts of each FAD board"
207 "|DAC[int]:DAC counts, sequentally DAC 0 board 0, DAC 0 board 1... (plus min max)"),
208 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
209 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40",
210 "Event Builder status for GUI display"
211 "|threadInfo[int]:Number of read, proc and writes"
212 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
213 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
214 "|EvtCnt[int]:Number of events skipped, written, with errors"
215 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
216 "|badRoiBoard[int]:Num boards with wrong ROI"
217 "|deltaT[ms]:Time in ms for rates"
218 "|rateNew[int]:Number of new start events received"
219 "|numConn[int]:Number of connections per board"
220 "|errConn[int]:IO errors per board"
221 "|rateBytes[int]:Bytes read this cycle"
222 "|totBytes[int]:Bytes read (counter)"),
223 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
224 "Event Builder status, events oriented"
225 "|reset[int]:If increased, reset all counters"
226 "|numRead[int]:How often sucessful read from N sockets per loop"
227 "|gotByte[int]:number of bytes read per board"
228 "|gotErr[int]:number of com. errors per board"
229 "|evtStat[int]:number of evts read, completed, with errors, incomplete"
230 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
231 "|feedStat[int]:number of evts used or rejected by feedback system"
232 "|wrtStat[int]:number of evts written to disk, with errors"
233 "|runStat[int]:number of run opened, closed, with open or close errors"
234 "|numConn[int]:number of sockets successfully opened per board"),
235 fDimFileFormat ("FAD_CONTROL/FILE_FORMAT", "S:1", ""),
236 fDebugStream(false), fDebugRead(false), fDebugLog(false)
237 {
238 if (This)
239 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
240
241 This = this;
242
243 memset(fNumEvts, 0, sizeof(fNumEvts));
244 fDimEvents.Update(fNumEvts);
245
246 for (size_t i=0; i<40; i++)
247 ConnectSlot(i, tcp::endpoint());
248 }
249 virtual ~EventBuilderWrapper()
250 {
251 Abort();
252
253 // FIXME: Used timed_join and abort afterwards
254 // What's the maximum time the eb need to abort?
255 fThread.join();
256 //ffMsg.Info("EventBuilder stopped.");
257
258 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
259 delete *it;
260 }
261
262 set<uint32_t> fIsRunStarted;
263 map<uint32_t, FAD::RunDescription> fExpectedRuns;
264
265 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
266 {
267 if (maxtime<=0 || maxtime>24*60*60)
268 maxtime = 24*60*60;
269 if (maxevt<=0 || maxevt>INT32_MAX)
270 maxevt = INT32_MAX;
271
272 const FAD::RunDescription descr =
273 {
274 uint32_t(maxtime),
275 uint32_t(maxevt),
276 ref.first,
277 ref.second,
278 };
279
280 // FIMXE: Maybe reset an event counter so that the mcp can count events?
281
282 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
283
284 fExpectedRuns[fRunNumber] = descr;
285 fIsRunStarted.insert(fRunNumber);
286 return fRunNumber++;
287 }
288
289 bool IsThreadRunning()
290 {
291 return !fThread.timed_join(boost::posix_time::microseconds(0));
292 }
293
294 void SetMaxMemory(unsigned int mb) const
295 {
296 /*
297 if (mb*1000000<GetUsedMemory())
298 {
299 // ffMsg.Warn("...");
300 return;
301 }*/
302
303 g_maxMem = size_t(mb)*1000000;
304 }
305
306 void StartThread(const vector<tcp::endpoint> &addr)
307 {
308 if (IsThreadRunning())
309 {
310 fMsg.Warn("Start - EventBuilder still running");
311 return;
312 }
313
314 fLastMessage.clear();
315
316 for (size_t i=0; i<40; i++)
317 ConnectSlot(i, addr[i]);
318
319 g_runStat = kModeRun;
320 g_maxProc = 3;
321
322 fMsg.Message("Starting EventBuilder thread");
323
324 fThread = boost::thread(StartEvtBuild);
325 }
326 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
327 {
328 if (i>39)
329 return;
330
331 if (addr==tcp::endpoint())
332 {
333 DisconnectSlot(i);
334 return;
335 }
336
337 g_port[i].sockAddr.sin_family = AF_INET;
338 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
339 g_port[i].sockAddr.sin_port = htons(addr.port());
340 // In this order
341 g_port[i].sockDef = 1;
342 }
343 void DisconnectSlot(unsigned int i)
344 {
345 if (i>39)
346 return;
347
348 g_port[i].sockDef = 0;
349 // In this order
350 g_port[i].sockAddr.sin_family = AF_INET;
351 g_port[i].sockAddr.sin_addr.s_addr = 0;
352 g_port[i].sockAddr.sin_port = 0;
353 }
354 void IgnoreSlot(unsigned int i)
355 {
356 if (i>39)
357 return;
358 if (g_port[i].sockAddr.sin_port==0)
359 return;
360
361 g_port[i].sockDef = -1;
362 }
363
364
365 void Abort()
366 {
367 fMsg.Message("Signal abort to EventBuilder thread...");
368 g_runStat = kAbort;
369 }
370
371 void ResetThread(bool soft)
372 {
373 /*
374 if (g_reset > 0)
375
376 * suspend reading
377 * reset = g_reset;
378 * g_reset=0
379
380 * reset% 10
381 == 0 leave event Buffers as they are
382 == 1 let all buffers drain (write (incomplete) events)
383 > 1 flush all buffers (do not write buffered events)
384
385 * (reset/10)%10
386 > 0 close all sockets and destroy them (also free the
387 allocated read-buffers)
388 recreate before resuming operation
389 [ this is more than just close/open that can be
390 triggered by e.g. close/open the base-socket ]
391
392 * (reset/100)%10
393 > 0 close all open run-files
394
395 * (reset/1000)
396 sleep so many seconds before resuming operation
397 (does not (yet) take into account time left when waiting
398 for buffers getting empty ...)
399
400 * resume_reading
401
402 */
403 fMsg.Message("Signal reset to EventBuilder thread...");
404 g_reset = soft ? 101 : 102;
405 }
406
407 void Exit()
408 {
409 fMsg.Message("Signal exit to EventBuilder thread...");
410 g_runStat = kExit;
411 }
412
413 /*
414 void Wait()
415 {
416 fThread.join();
417 ffMsg.Message("EventBuilder stopped.");
418 }*/
419
420 void Hybernate() const { g_runStat = kHybernate; }
421 void Sleep() const { g_runStat = kSleep; }
422 void FlushMode() const { g_runStat = kModeFlush; }
423 void TestMode() const { g_runStat = kModeTest; }
424 void FlagMode() const { g_runStat = kModeFlag; }
425 void RunMode() const { g_runStat = kModeRun; }
426
427 // FIXME: To be removed
428 void SetMode(int mode) const { g_runStat = mode; }
429
430 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
431 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
432 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
433 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
434 int GetNumFilesOpen() const { return fFiles.size(); }
435
436 /*
437 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
438 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
439 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
440 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
441 */
442
443 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
444 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
445
446 void SetOutputFormat(FAD::FileFormat_t f)
447 {
448 fFileFormat = f;
449 fDimFileFormat.Update(uint16_t(f));
450 if (fFileFormat==FAD::kCalib)
451 {
452 DataCalib::Restart();
453 DataCalib::Update(fDimDrsCalibration);
454 fMsg.Message("Resetted DRS calibration.");
455 }
456 }
457
458 virtual int ResetSecondaryDrsBaseline()
459 {
460 if (DataCalib::ResetTrgOff(fDimDrsCalibration))
461 {
462 fFileFormat = FAD::kCalib;
463 fDimFileFormat.Update(uint16_t(fFileFormat));
464 fMsg.Message("Resetted DRS calibration for secondary baseline.");
465 }
466 else
467 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
468
469 return 0;
470 }
471
472 void SetDebugLog(bool b) { fDebugLog = b; }
473
474 void SetDebugStream(bool b)
475 {
476 fDebugStream = b;
477 if (b)
478 return;
479
480 for (int i=0; i<40; i++)
481 {
482 if (!fDumpStream[i].is_open())
483 continue;
484
485 fDumpStream[i].close();
486
487 ostringstream name;
488 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
489 fMsg.Message("Closed file '"+name.str()+"'");
490 }
491 }
492
493 void SetDebugRead(bool b)
494 {
495 fDebugRead = b;
496 if (b || !fDumpRead.is_open())
497 return;
498
499 fDumpRead.close();
500 fMsg.Message("Closed file 'socket_events.txt'");
501 }
502
503// size_t GetUsedMemory() const { return gi_usedMem; }
504
505 void LoadDrsCalibration(const char *fname)
506 {
507 if (!DataCalib::ReadFits(fname, fMsg))
508 return;
509 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
510 DataCalib::Update(fDimDrsCalibration);
511 }
512
513 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
514
515
516 /*
517 struct OpenFileToDim
518 {
519 int code;
520 char fileName[FILENAME_MAX];
521 };
522
523 SignalRunOpened(runid, filename);
524 // Send num open files
525 // Send runid, (more info about the run?), filename via dim
526
527 SignalEvtWritten(runid);
528 // Send num events written of newest file
529
530 SignalRunClose(runid);
531 // Send new num open files
532 // Send empty file-name if no file is open
533
534 */
535
536 // -------------- Mapped event builder callbacks ------------------
537
538 void UpdateRuns(const string &fname="")
539 {
540 uint32_t values[5] =
541 {
542 static_cast<uint32_t>(fFiles.size()),
543 0xffffffff,
544 0,
545 fLastOpened,
546 fLastClosed
547 };
548
549 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
550 it!=fFiles.end(); it++)
551 {
552 const DataProcessorImp *file = *it;
553
554 if (file->GetRunId()<values[1])
555 values[1] = file->GetRunId();
556
557 if (file->GetRunId()>values[2])
558 values[2] = file->GetRunId();
559 }
560
561 fMaxRun = values[2];
562
563 vector<char> data(sizeof(values)+fname.size()+1);
564 memcpy(data.data(), values, sizeof(values));
565 strcpy(data.data()+sizeof(values), fname.c_str());
566
567 fDimRuns.Update(data);
568 }
569
570 vector<DataProcessorImp*> fFiles;
571
572 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
573 {
574 fMsg.Info(" ==> TODO: Update run configuration in database!");
575
576 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
577 while (it!=fExpectedRuns.end())
578 {
579 if (it->first<runid)
580 {
581 ostringstream str;
582 str << "runOpen - Missed run " << it->first << ".";
583 fMsg.Info(str);
584
585 fExpectedRuns.erase(it++);
586 continue;
587 }
588 if (it->first==runid)
589 break;
590 it++;
591 }
592 if (it==fExpectedRuns.end())
593 {
594 ostringstream str;
595 str << "runOpen - Run " << runid << " wasn't expected.";
596 fMsg.Warn(str);
597 }
598
599 const FAD::RunDescription desc = it->second;
600
601 fExpectedRuns.erase(it);
602
603 // Check if file already exists...
604 DataProcessorImp *file = 0;
605 switch (fFileFormat)
606 {
607 case FAD::kNone: file = new DataDump(fPath, runid, fMsg); break;
608 case FAD::kDebug: file = new DataDebug(fPath, runid, fMsg); break;
609 case FAD::kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
610 case FAD::kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
611 case FAD::kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
612 }
613
614 try
615 {
616 if (!file->Open(h, desc))
617 return 0;
618 }
619 catch (const exception &e)
620 {
621 fMsg.Error("Exception trying to open file: "+string(e.what()));
622 return 0;
623 }
624
625 fFiles.push_back(file);
626
627 ostringstream str;
628 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
629 fMsg.Info(str);
630
631 fDimWriteStats.FileOpened(file->GetFileName());
632
633 fLastOpened = runid;
634 UpdateRuns(file->GetFileName());
635
636 fNumEvts[kEventId] = 0;
637 fNumEvts[kTriggerId] = 0;
638
639 fNumEvts[kCurrent] = 0;
640 fDimEvents.Update(fNumEvts);
641 // fDimCurrentEvent.Update(uint32_t(0));
642
643 return reinterpret_cast<FileHandle_t>(file);
644 }
645
646 int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/)
647 {
648 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
649
650 if (!file->WriteEvt(e))
651 return -1;
652
653 if (file->GetRunId()==fMaxRun)
654 {
655 fNumEvts[kCurrent]++;
656 fNumEvts[kEventId] = e->EventNum;
657 fNumEvts[kTriggerId] = e->TriggerNum;
658 }
659
660 fNumEvts[kTotal]++;
661
662 static Time oldt(boost::date_time::neg_infin);
663 Time newt;
664 if (newt>oldt+boost::posix_time::seconds(1))
665 {
666 fDimEvents.Update(fNumEvts);
667 oldt = newt;
668 }
669
670
671 // ===> SignalEvtWritten(runid);
672 // Send num events written of newest file
673
674 /* close run runId (all all runs if runId=0) */
675 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
676 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
677
678 return 0;
679 }
680
681 virtual void CloseRun(uint32_t /*runid*/) { }
682
683 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
684 {
685 fMsg.Info(" ==> TODO: Update run configuration in database!");
686
687 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
688
689 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
690 if (it==fFiles.end())
691 {
692 ostringstream str;
693 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
694 fMsg.Fatal(str);
695 return -1;
696 }
697
698 fFiles.erase(it);
699
700 fLastClosed = file->GetRunId();
701 CloseRun(fLastClosed);
702 UpdateRuns();
703
704 fDimEvents.Update(fNumEvts);
705
706 const bool rc = file->Close(tail);
707 if (!rc)
708 {
709 // Error message
710 }
711
712 ostringstream str;
713 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
714 fMsg.Info(str);
715
716 delete file;
717
718 // ==> SignalRunClose(runid);
719 // Send new num open files
720 // Send empty file-name if no file is open
721
722 return rc ? 0 : -1;
723 }
724
725 ofstream fDumpStream[40];
726
727 void debugStream(int isock, void *buf, int len)
728 {
729 if (!fDebugStream)
730 return;
731
732 const int slot = isock/7;
733 if (slot<0 || slot>39)
734 return;
735
736 if (!fDumpStream[slot].is_open())
737 {
738 ostringstream name;
739 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
740
741 fDumpStream[slot].open(name.str().c_str(), ios::app);
742 if (!fDumpStream[slot])
743 {
744 ostringstream str;
745 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
746 fMsg.Error(str);
747
748 return;
749 }
750
751 fMsg.Message("Opened file '"+name.str()+"' for writing.");
752 }
753
754 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
755 }
756
757 ofstream fDumpRead; // Stream to possibly dump docket events
758
759 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
760 {
761 // isock = socketID (0-279)
762 // ibyte = #bytes gelesen
763 // event = eventId (oder 0 wenn noch nicht bekannt)
764 // state : 1=finished reading data
765 // 0=reading data
766 // -1=start reading data (header)
767 // -2=start reading data,
768 // eventId not known yet (too little data)
769 // tsec, tusec = time when reading seconds, microseconds
770 //
771 if (!fDebugRead || ibyte==0)
772 return;
773
774 if (!fDumpRead.is_open())
775 {
776 fDumpRead.open("socket_events.txt", ios::app);
777 if (!fDumpRead)
778 {
779 ostringstream str;
780 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
781 fMsg.Error(str);
782
783 return;
784 }
785
786 fMsg.Message("Opened file 'socket_events.txt' for writing.");
787
788 fDumpRead << "# START: " << Time().GetAsStr() << endl;
789 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
790 }
791
792 fDumpRead
793 << setw(2) << state << " "
794 << setw(8) << tsec << " "
795 << setw(9) << tusec << " "
796 << setw(3) << isock << " "
797 << setw(2) << isock/7 << " "
798 << runno << " "
799 << event << " "
800 << ftmevt << " "
801 << ibyte << endl;
802 }
803
804 array<uint16_t,2> fVecRoi;
805
806 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/)
807 {
808 /*
809 fadhd[i] ist ein array mit den 40 fad-headers
810 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
811
812 event ist die Struktur, die auch die write routine erhaelt;
813 darin sind im header die 'soll-werte' fuer z.B. eventID
814 als auch die ADC-Werte (falls Du die brauchst)
815
816 Wenn die routine einen negativen Wert liefert, wird das event
817 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
818 */
819
820 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
821
822 if (roi!=fVecRoi)
823 {
824 Update(fDimRoi, roi);
825 fVecRoi = roi;
826 }
827
828 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
829 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
830
831 // FIMXE: Compare with target configuration
832
833 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
834 {
835 // FIXME: Compare with expectations!!!
836 if (ptr->fStartDelimiter==0)
837 {
838 if (ptr==beg)
839 beg++;
840 continue;
841 }
842
843 if (beg->fStatus != ptr->fStatus)
844 {
845 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
846 CloseRunFile(runNr, 0, 0);
847 return -1;
848 }
849
850 if (beg->fRunNumber != ptr->fRunNumber)
851 {
852 fMsg.Error("Inconsistent run number detected.... closing run.");
853 CloseRunFile(runNr, 0, 0);
854 return -1;
855 }
856
857 /*
858 if (beg->fVersion != ptr->fVersion)
859 {
860 Error("Inconsist firmware version detected.... closing run.");
861 CloseRunFile(runNr, 0, 0);
862 break;
863 }
864 if (beg->fEventCounter != ptr->fEventCounter)
865 {
866 Error("Inconsist run number detected.... closing run.");
867 CloseRunFile(runNr, 0, 0);
868 break;
869 }
870 if (beg->fTriggerCounter != ptr->fTriggerCounter)
871 {
872 Error("Inconsist trigger number detected.... closing run.");
873 CloseRunFile(runNr, 0, 0);
874 break;
875 }*/
876
877 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
878 {
879 fMsg.Error("Inconsistent phase shift detected.... closing run.");
880 CloseRunFile(runNr, 0, 0);
881 return -1;
882 }
883
884 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
885 {
886 fMsg.Error("Inconsistent DAC values detected.... closing run.");
887 CloseRunFile(runNr, 0, 0);
888 return -1;
889 }
890
891 if (beg->fTriggerType != ptr->fTriggerType)
892 {
893 fMsg.Error("Inconsistent trigger type detected.... closing run.");
894 CloseRunFile(runNr, 0, 0);
895 return -1;
896 }
897 }
898
899 // check REFCLK_frequency
900 // check consistency with command configuration
901 // how to log errors?
902 // need gotNewRun/closedRun to know it is finished
903
904 return 0;
905 }
906
907 void SendRawData(PEVNT_HEADER */*fadhd*/, EVENT *event)
908 {
909 // Currently we send any event no matter what its trigger id is...
910 // To be changed.
911 static Time oldt(boost::date_time::neg_infin);
912 Time newt;
913
914 // FIXME: Only send events if the have newer run-numbers
915 if (newt<oldt+boost::posix_time::seconds(1))
916 return;
917
918 oldt = newt;
919
920 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
921 memcpy(data.data(), event, sizeof(EVENT));
922
923 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
924
925 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
926 fDimRawData.Update(data);
927
928 DrsCalibrate::RemoveSpikes(vec, event->Roi);
929
930 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
931 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
932
933 fDimEventData.Update(data2);
934 }
935
936 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
937 {
938 if (!DataCalib::IsValid())
939 return;
940
941 // Workaround to find a valid header.....
942 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
943 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
944
945 // FIMXE: Compare with target configuration
946
947 const FAD::EventHeader *ptr=beg;
948 for (; ptr!=end; ptr++)
949 {
950 if (ptr->fStartDelimiter==0)
951 continue;
952
953 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
954 return;
955 }
956
957 if (ptr->fStartDelimiter==0)
958 return;
959
960 vector<float> data(event->Roi*1440);
961 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
962
963 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
964
965 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
966 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
967
968 fDimFeedbackData.Update(data2);
969 }
970
971 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/)
972 {
973 switch (threadID)
974 {
975 case 0:
976 SendRawData(fadhd, event);
977 return 1;
978 case 1:
979 SendFeedbackData(fadhd, event);
980 return 2;
981 }
982 return 100;
983 }
984
985
986 bool IsRunStarted() const
987 {
988 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
989 return it==fIsRunStarted.end();// ? true : it->second.started;
990 }
991
992 uint32_t GetRunNumber() const
993 {
994 return fRunNumber;
995 }
996
997 void IncreaseRunNumber(uint32_t run)
998 {
999 if (run>fRunNumber)
1000 fRunNumber = run;
1001 }
1002
1003 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
1004 {
1005 // This function is called even when writing is switched off
1006 set<uint32_t>::iterator it = fIsRunStarted.begin();
1007 while (it!=fIsRunStarted.end())
1008 {
1009 if (*it<runnr)
1010 {
1011 ostringstream str;
1012 str << "gotNewRun - Missed run " << *it << ".";
1013 fMsg.Info(str);
1014
1015 fIsRunStarted.erase(it++);
1016 continue;
1017 }
1018 if (*it==runnr)
1019 break;
1020 it++;
1021 }
1022 if (it==fIsRunStarted.end())
1023 {
1024 ostringstream str;
1025 str << "gotNewRun - Not waiting for run " << runnr << ".";
1026 fMsg.Warn(str);
1027 return;
1028 }
1029
1030 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
1031 if (i2==fExpectedRuns.end())
1032 {
1033 ostringstream str;
1034 str << "gotNewRun - Run " << runnr << " wasn't expected.";
1035 fMsg.Warn(str);
1036 return;
1037 }
1038
1039 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
1040 // return: 0=close scheduled / >0 already closed / <0 does not exist
1041
1042 // FIXME: Move configuration from expected runs to runs which will soon
1043 // be opened/closed
1044
1045 fIsRunStarted.erase(it);
1046 }
1047
1048 map<boost::thread::id, string> fLastMessage;
1049
1050 void factOut(int severity, int err, const char *message)
1051 {
1052 if (!fDebugLog && severity==99)
1053 return;
1054
1055 ostringstream str;
1056 //str << boost::this_thread::get_id() << " ";
1057 str << "EventBuilder(";
1058 if (err<0)
1059 str << "---";
1060 else
1061 str << err;
1062 str << "): " << message;
1063
1064 string &old = fLastMessage[boost::this_thread::get_id()];
1065
1066 if (str.str()==old)
1067 return;
1068 old = str.str();
1069
1070 fMsg.Update(str, severity);
1071 }
1072/*
1073 void factStat(int64_t *stat, int len)
1074 {
1075 if (len!=7)
1076 {
1077 fMsg.Warn("factStat received unknown number of values.");
1078 return;
1079 }
1080
1081 vector<int64_t> data(1, g_maxMem);
1082 data.insert(data.end(), stat, stat+len);
1083
1084 static vector<int64_t> last(8);
1085 if (data==last)
1086 return;
1087 last = data;
1088
1089 fDimStatistics.Update(data);
1090
        // len is the length of the array.
        // array[4] holds how many bytes of the buffer are currently in use;
        // with it you can check whether the 100MB are full ....
1094
1095 ostringstream str;
1096 str
1097 << "Wait=" << stat[0] << " "
1098 << "Skip=" << stat[1] << " "
1099 << "Del=" << stat[2] << " "
1100 << "Tot=" << stat[3] << " "
1101 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1102 << "Read=" << stat[5] << " "
1103 << "Conn=" << stat[6];
1104
1105 fMsg.Info(str);
1106 }
1107 */
1108
1109 void factStat(const EVT_STAT &stat)
1110 {
1111 fDimStatistics2.Update(stat);
1112 }
1113
1114 void factStat(const GUI_STAT &stat)
1115 {
1116 fDimStatistics1.Update(stat);
1117 }
1118
1119
1120 array<FAD::EventHeader, 40> fVecHeader;
1121
1122 template<typename T, class S>
1123 array<T, 42> Compare(const S *vec, const T *t)
1124 {
1125 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1126
1127 const T *min = NULL;
1128 const T *val = NULL;
1129 const T *max = NULL;
1130
1131 array<T, 42> arr;
1132
1133 // bool rc = true;
1134 for (int i=0; i<40; i++)
1135 {
1136 const char *base = reinterpret_cast<const char*>(vec+i);
1137 const T *ref = reinterpret_cast<const T*>(base+offset);
1138
1139 arr[i] = *ref;
1140
1141 if (gi_NumConnect[i]!=7)
1142 {
1143 arr[i] = 0;
1144 continue;
1145 }
1146
1147 if (!val)
1148 {
1149 min = ref;
1150 val = ref;
1151 max = ref;
1152 }
1153
1154 if (*ref<*min)
1155 min = ref;
1156
1157 if (*ref>*max)
1158 max = ref;
1159
1160 // if (*val!=*ref)
1161 // rc = false;
1162 }
1163
1164 arr[40] = val ? *min : 1;
1165 arr[41] = val ? *max : 0;
1166
1167 return arr;
1168 }
1169
1170 template<typename T>
1171 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1172 {
1173 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1174
1175 T val = 0;
1176 T rc = 0;
1177
1178 array<T, 42> vec;
1179
1180 bool first = true;
1181
1182 for (int i=0; i<40; i++)
1183 {
1184 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1185 const T *ref = reinterpret_cast<const T*>(base+offset);
1186
1187 vec[i+2] = *ref;
1188
1189 if (gi_NumConnect[i]!=7)
1190 {
1191 vec[i+2] = 0;
1192 continue;
1193 }
1194
1195 if (first)
1196 {
1197 first = false;
1198 val = *ref;
1199 rc = 0;
1200 }
1201
1202 rc |= val^*ref;
1203 }
1204
1205 vec[0] = rc;
1206 vec[1] = val;
1207
1208 return vec;
1209 }
1210
1211 template<typename T, size_t N>
1212 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1213 {
1214// svc.setQuality(vec[40]<=vec[41]);
1215 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1216 svc.Update();
1217 }
1218
// Write one line to cout with the contents of data:
//
//   name|data.first|[1]|[0]<x<[1]: [3] [4] ... [42]
//
// where [i] denotes data.second[i].
//
// NOTE(review): element [1] is printed twice and element [2] is never
// printed -- confirm that this matches the intended array layout.
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &values = data.second;

    cout << name << "|" << data.first << "|" << values[1] << "|";
    cout << values[0] << "<x<" << values[1] << ":";

    // The 40 trailing entries
    for (int idx=3; idx<43; idx++)
        cout << " " << values[idx];

    cout << endl;
}
1227
1228 vector<uint> fNumConnected;
1229
1230 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1231 {
1232 const uint16_t id = h.Id();
1233 if (id>39)
1234 return;
1235
1236 if (fNumConnected.size()!=40)
1237 fNumConnected.resize(40);
1238
1239 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1240
1241 const bool changed = con!=fNumConnected || !IsThreadRunning();
1242
1243 fNumConnected = con;
1244
1245 const FAD::EventHeader old = fVecHeader[id];
1246 fVecHeader[id] = h;
1247
1248 if (old.fVersion != h.fVersion || changed)
1249 {
1250 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1251
1252 array<float,42> data;
1253 for (int i=0; i<42; i++)
1254 {
1255 ostringstream str;
1256 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1257 data[i] = stof(str.str());
1258 }
1259 Update(fDimFwVersion, data);
1260 }
1261
1262 if (old.fRunNumber != h.fRunNumber || changed)
1263 {
1264 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1265 fDimRunNumber.Update(run);
1266 }
1267
1268 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1269 {
1270 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1271 fDimPrescaler.Update(pre);
1272 }
1273
1274 if (old.fDNA != h.fDNA || changed)
1275 {
1276 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1277 Update(fDimDNA, dna, 40);
1278 }
1279
1280 if (old.fStatus != h.fStatus || changed)
1281 {
1282 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1283 Update(fDimStatus, sts);
1284 }
1285
1286 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1287 {
1288 array<uint16_t, FAD::kNumDac*42> dacs;
1289
1290 for (int i=0; i<FAD::kNumDac; i++)
1291 {
1292 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1293 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1294 }
1295
1296 Update(fDimDac, dacs);
1297 }
1298
1299 // -----------
1300
1301 static Time oldt(boost::date_time::neg_infin);
1302 Time newt;
1303
1304 if (newt>oldt+boost::posix_time::seconds(1))
1305 {
1306 oldt = newt;
1307
1308 // --- RefClock
1309
1310 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1311 Update(fDimRefClock, clk);
1312
1313 // --- Temperatures
1314
1315 const array<int16_t,42> tmp[4] =
1316 {
1317 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1318 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1319 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1320 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1321 };
1322
1323 vector<int16_t> data;
1324 data.reserve(82);
1325 data.push_back(tmp[0][40]); // min: 0
1326 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1327 data.push_back(tmp[0][41]); // max: 41
1328 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1329
1330 for (int j=1; j<=3; j++)
1331 {
1332 const array<int16_t,42> &ref = tmp[j];
1333
1334 // Gloabl min
1335 if (ref[40]<data[0]) // 40=min
1336 data[0] = ref[40];
1337
1338 // Global max
1339 if (ref[41]>data[41]) // 41=max
1340 data[41] = ref[41];
1341
1342 for (int i=0; i<40; i++)
1343 {
1344 // min per board
1345 if (ref[i]<data[i+1]) // data: 1-40
1346 data[i+1] = ref[i]; // ref: 0-39
1347
1348 // max per board
1349 if (ref[i]>data[i+42]) // data: 42-81
1350 data[i+42] = ref[i]; // ref: 0-39
1351 }
1352
1353
1354 }
1355
1356 vector<float> deg(82); // 0: global min, 1-40: min
1357 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1358 deg[i] = data[i]/16.;
1359 fDimTemperature.Update(deg);
1360 }
1361 }
1362};
1363
1364EventBuilderWrapper *EventBuilderWrapper::This = 0;
1365
1366// ----------- Event builder callbacks implementation ---------------
1367extern "C"
1368{
1369 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1370 {
1371 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1372 }
1373
1374 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1375 {
1376 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1377 }
1378
1379 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1380 {
1381 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1382 }
1383
1384 // -----
1385
1386 //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/)
1387 //{
1388 // return NULL;
1389 //}
1390
1391 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1392 {
1393 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1394 }
1395
1396 int runEnd(uint32_t, void */*runPtr*/)
1397 {
1398 return 0;
1399 }
1400
1401 // -----
1402
1403 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1404 {
1405 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1406 }
1407
1408 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1409 {
1410 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1411 }
1412
1413 // -----
1414
1415 void factOut(int severity, int err, const char *message)
1416 {
1417 EventBuilderWrapper::This->factOut(severity, err, message);
1418 }
1419
1420 void factStat(GUI_STAT stat)
1421 {
1422 EventBuilderWrapper::This->factStat(stat);
1423 }
1424
1425 void factStatNew(EVT_STAT stat)
1426 {
1427 EventBuilderWrapper::This->factStat(stat);
1428 }
1429
1430 // ------
1431
1432 void debugHead(int socket, int/*board*/, void *buf)
1433 {
1434 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1435 EventBuilderWrapper::This->debugHead(socket, h);
1436 }
1437
1438 void debugStream(int isock, void *buf, int len)
1439 {
1440 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1441 }
1442
1443 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1444 {
1445 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1446 }
1447}
1448
1449#endif
Note: See TracBrowser for help on using the repository browser.