source: trunk/FACT++/src/EventBuilderWrapper.h@ 15494

Last change on this file since 15494 was 15477, checked in by tbretz, 12 years ago
Moved all Dim emitting services called from the event builder#s main loop to their own threads.
File size: 53.0 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteFits2
24#endif
25
26#include "DataWriteFits2.h"
27
28namespace ba = boost::asio;
29namespace bs = boost::system;
30namespace fs = boost::filesystem;
31
32using ba::ip::tcp;
33
34using namespace std;
35
36// ========================================================================
37
38#include "EventBuilder.h"
39
40extern "C" {
41 extern void StartEvtBuild();
42 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
43}
44
45// ========================================================================
46
47class EventBuilderWrapper
48{
49public:
50 // FIXME
51 static EventBuilderWrapper *This;
52
53 MessageImp &fMsg;
54
55private:
56 boost::thread fThreadMain;
57 boost::thread fThreadDimQueue1;
58 boost::thread fThreadDimQueue2;
59
60 bool fThreadDimQueueStop;
61
62 deque<pair<Time,GUI_STAT>> fDimQueue1;
63 deque<tuple<Time,bool,FAD::EventHeader>> fDimQueue2;
64
65 mutex fMutexDimQueue1;
66 mutex fMutexDimQueue2;
67
68
69 enum CommandStates_t // g_runStat
70 {
71 kAbort = -2, // quit as soon as possible ('abort')
72 kExit = -1, // stop reading, quit when buffered events done ('exit')
73 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
74 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
75 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
76 kModeFlush = 10, // read data from camera, but skip them ('flush')
77 kModeTest = 20, // read data and process them, but do not write to disk ('test')
78 kModeFlag = 30, // read data, process and write all to disk ('flag')
79 kModeRun = 40, // read data, process and write selected to disk ('run')
80 };
81
82 enum
83 {
84 kCurrent = 0,
85 kTotal = 1,
86 kEventId = 2,
87 kTriggerId = 3,
88 };
89
90 FAD::FileFormat_t fFileFormat;
91
92 uint32_t fMaxRun;
93 uint32_t fLastOpened;
94 uint32_t fLastClosed;
95 uint32_t fNumEvts[4];
96
97 DimWriteStatistics fDimWriteStats;
98 DimDescribedService fDimRuns;
99 DimDescribedService fDimEvents;
100 DimDescribedService fDimRawData;
101 DimDescribedService fDimEventData;
102 DimDescribedService fDimFeedbackData;
103 DimDescribedService fDimFwVersion;
104 DimDescribedService fDimRunNumber;
105 DimDescribedService fDimStatus;
106 DimDescribedService fDimDNA;
107 DimDescribedService fDimTemperature;
108 DimDescribedService fDimPrescaler;
109 DimDescribedService fDimRefClock;
110 DimDescribedService fDimRoi;
111 DimDescribedService fDimDac;
112 DimDescribedService fDimDrsRuns;
113 DimDescribedService fDimDrsCalibration;
114 DimDescribedService fDimStatistics1;
115 //DimDescribedService fDimStatistics2;
116 DimDescribedService fDimFileFormat;
117 DimDescribedService fDimIncomplete;
118
119 bool fDebugStream;
120 bool fDebugRead;
121 bool fDebugLog;
122
123 string fPath;
124 uint64_t fNightAsInt;
125 uint32_t fRunNumber;
126
127protected:
128 bool InitRunNumber(const string &path="")
129 {
130 if (!path.empty())
131 {
132 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
133 {
134 fMsg.Error("Data path "+path+" does not exist!");
135 return false;
136 }
137
138 fPath = path;
139 fDimWriteStats.SetCurrentFolder(fPath);
140
141 fMsg.Info("Data path set to "+path+".");
142 }
143
144 // Get current night
145 const uint64_t night = Time().NightAsInt();
146 if (night==fNightAsInt)
147 return true;
148
149 // Check for run numbers
150 fRunNumber = 1000;
151
152 while (--fRunNumber>0)
153 {
154 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
155
156 if (access((name+"bin").c_str(), F_OK) == 0)
157 break;
158 if (access((name+"fits").c_str(), F_OK) == 0)
159 break;
160 if (access((name+"drs.fits").c_str(), F_OK) == 0)
161 break;
162 }
163
164 // This is now the first file which does not exist
165 fRunNumber++;
166 fLastOpened = 0;
167
168 // Check if we have exceeded the maximum
169 if (fRunNumber==1000)
170 {
171 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
172 return false;
173 }
174
175 ostringstream str;
176 if (fNightAsInt==0)
177 str << "First night...";
178 else
179 str << "Night has changd from " << fNightAsInt << "... new";
180 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
181 fMsg.Message(str);
182
183 fNightAsInt = night;
184
185 return true;
186 }
187
188public:
189 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
190 fThreadDimQueueStop(false),
191 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
192 fDimWriteStats ("FAD_CONTROL", imp),
193 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
194 "Run files statistics"
195 "|stats[int]:num of open files, min/max run no, last opened or closed run"
196 "|file[string]:filename of last opened file"),
197 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
198 "Event counts"
199 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
200 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;C:4;I:1;I:2;I:40;S:1440;S:160;F",
201 "|roi[uint16]:number of samples per pixel"
202 "|roi_tm[uint16]:number of samples per time-marker channel"
203 "|num_fad[uint32]:event number from FADs"
204 "|num_ftm[uint32]:trigger number from FTM"
205 "|type[uint16]:trigger type from FTM"
206 "|num_boards[uint32]:number of active boards"
207 "|error[uint8]:event builder error counters"
208 "|dummy[]:"
209 "|time[uint32]:PC time as unix time stamp"
210 "|time_board[uint32]:Time stamp of FAD boards"
211 "|start_pix[int16]:start sample of pixels"
212 "|start_tm[int16]:start sample of time marker channels"
213 "|adc[int16]:adc data"),
214 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
215 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
216 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
217 "Firmware version number of fad boards"
218 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
219 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
220 "Run numbers coming from FAD boards"
221 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
222 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
223 "Status of FAD boards"
224 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
225 fDimDNA ("FAD_CONTROL/DNA", "X:40",
226 "DNA of FAD boards"
227 "|DNA[hex]:Hex identifier of each FAD board"),
228 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
229 "FADs temperatures"
230 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
231 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
232 "Trigger generator prescaler of fad boards"
233 "|prescaler[int]:Trigger generator prescaler value, for each board"),
234 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
235 "Reference clock of FAD boards"
236 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
237 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
238 fDimDac ("FAD_CONTROL/DAC", "S:336",
239 "DAC settings of each FAD board"
240 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
241 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
242 "|roi:Region of interest of secondary baseline"
243 "|run:Run numbers of DRS runs (0=none)"),
244 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
245 "|roi:Region of interest of secondary baseline"
246 "|run:Run numbers of DRS runs (0=none)"),
247 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;X:40",
248 "Event Builder status for GUI display"
249 "|threadInfo[int]:Number of read, proc and writes"
250 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
251 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
252 "|EvtCnt[int]:Number of events skipped, written, with errors"
253 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
254 "|badRoiBoard[int]:Num boards with wrong ROI"
255 "|deltaT[ms]:Time in ms for rates"
256 "|rateNew[int]:Number of new start events received"
257 "|numConn[int]:Number of connections per board"
258 "|rateBytes[int]:Bytes read this cycle"
259 "|totBytes[int]:Bytes read (counter)"),
260 /*fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
261 "Event Builder status, events oriented"
262 "|reset[int]:If increased, reset all counters"
263 "|numRead[int]:How often sucessful read from N sockets per loop"
264 "|gotByte[int]:number of bytes read per board"
265 "|gotErr[int]:number of com. errors per board"
266 "|evtStat[int]:number of evts read, completed, with errors, incomplete"
267 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
268 "|feedStat[int]:number of evts used or rejected by feedback system"
269 "|wrtStat[int]:number of evts written to disk, with errors"
270 "|runStat[int]:number of run opened, closed, with open or close errors"
271 "|numConn[int]:number of sockets successfully opened per board"),*/
272 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
273 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
274 fDebugStream(false), fDebugRead(false), fDebugLog(false), fNightAsInt(0)
275 {
276 if (This)
277 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
278
279 This = this;
280
281 memset(fNumEvts, 0, sizeof(fNumEvts));
282 fDimEvents.Update(fNumEvts);
283
284 for (size_t i=0; i<40; i++)
285 ConnectSlot(i, tcp::endpoint());
286
287 fThreadDimQueue1 = boost::thread(boost::bind(&EventBuilderWrapper::SendStat, this));
288 fThreadDimQueue2 = boost::thread(boost::bind(&EventBuilderWrapper::ProcHeader, this));
289 }
290
291 virtual ~EventBuilderWrapper()
292 {
293 Abort();
294 fThreadDimQueueStop = true;
295
296 // FIXME: Used timed_join and abort afterwards
297 // What's the maximum time the eb need to abort?
298 fThreadMain.join();
299 fThreadDimQueue1.join();
300 fThreadDimQueue2.join();
301
302 //ffMsg.Info("EventBuilder stopped.");
303
304 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
305 delete *it;
306 }
307
308 set<uint32_t> fIsRunStarted;
309 map<uint32_t, FAD::RunDescription> fExpectedRuns;
310
311 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
312 {
313 if (maxtime<=0 || maxtime>24*60*60)
314 maxtime = 24*60*60;
315 if (maxevt<=0 || maxevt>INT32_MAX)
316 maxevt = INT32_MAX;
317
318 const FAD::RunDescription descr =
319 {
320 uint32_t(maxtime),
321 uint32_t(maxevt),
322 ref.first,
323 ref.second,
324 };
325
326 if (!InitRunNumber())
327 return 0;
328
329 // FIMXE: Maybe reset an event counter so that the mcp can count events?
330
331 //fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
332
333 fExpectedRuns[fRunNumber] = descr;
334 fIsRunStarted.insert(fRunNumber);
335 return fRunNumber++;
336 }
337
338 bool IsThreadRunning()
339 {
340 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
341 }
342
343 void SetMaxMemory(unsigned int mb) const
344 {
345 /*
346 if (mb*1000000<GetUsedMemory())
347 {
348 // ffMsg.Warn("...");
349 return;
350 }*/
351
352 g_maxMem = size_t(mb)*1000000;
353 }
354
355 void StartThread(const vector<tcp::endpoint> &addr)
356 {
357 if (IsThreadRunning())
358 {
359 fMsg.Warn("Start - EventBuilder still running");
360 return;
361 }
362
363 fLastMessage.clear();
364
365 for (size_t i=0; i<40; i++)
366 ConnectSlot(i, addr[i]);
367
368 g_runStat = kModeRun;
369 g_maxProc = 3;
370
371 fMsg.Message("Starting EventBuilder thread");
372
373 fThreadMain = boost::thread(StartEvtBuild);
374 }
375 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
376 {
377 if (i>39)
378 return;
379
380 if (addr==tcp::endpoint())
381 {
382 DisconnectSlot(i);
383 return;
384 }
385
386 struct sockaddr_in sockaddr; //IP for each socket
387 sockaddr.sin_family = AF_INET;
388 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
389 sockaddr.sin_port = htons(addr.port());
390 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
391
392 // In this order
393 g_port[i].sockDef = 1;
394
395 fDimIncomplete.setQuality(0);
396 fDimIncomplete.Update(uint64_t(0));
397 }
398
399 void DisconnectSlot(unsigned int i)
400 {
401 if (i>39)
402 return;
403
404 g_port[i].sockDef = 0;
405 // In this order
406
407 struct sockaddr_in addr; //IP for each socket
408 addr.sin_family = AF_INET;
409 addr.sin_addr.s_addr = 0;
410 addr.sin_port = 0;
411 memcpy(&g_port[i].sockAddr, &addr, sizeof(struct sockaddr_in));
412
413 fDimIncomplete.setQuality(0);
414 fDimIncomplete.Update(uint64_t(0));
415 }
416 void IgnoreSlot(unsigned int i)
417 {
418 if (i>39)
419 return;
420 if (g_port[i].sockAddr.sin_port==0)
421 return;
422
423 g_port[i].sockDef = -1;
424 }
425
426
427 void Abort()
428 {
429 fMsg.Message("Signal abort to EventBuilder thread...");
430 g_runStat = kAbort;
431 }
432
433 void ResetThread(bool soft)
434 {
435 /*
436 if (g_reset > 0)
437
438 * suspend reading
439 * reset = g_reset;
440 * g_reset=0
441
442 * reset% 10
443 == 0 leave event Buffers as they are
444 == 1 let all buffers drain (write (incomplete) events)
445 > 1 flush all buffers (do not write buffered events)
446
447 * (reset/10)%10
448 > 0 close all sockets and destroy them (also free the
449 allocated read-buffers)
450 recreate before resuming operation
451 [ this is more than just close/open that can be
452 triggered by e.g. close/open the base-socket ]
453
454 * (reset/100)%10
455 > 0 close all open run-files
456
457 * (reset/1000)
458 sleep so many seconds before resuming operation
459 (does not (yet) take into account time left when waiting
460 for buffers getting empty ...)
461
462 * resume_reading
463
464 */
465 fMsg.Message("Signal reset to EventBuilder thread...");
466 g_reset = soft ? 101 : 102;
467 }
468
469 void Exit()
470 {
471 fMsg.Message("Signal exit to EventBuilder thread...");
472 g_runStat = kExit;
473 }
474
475 /*
476 void Wait()
477 {
478 fThread.join();
479 ffMsg.Message("EventBuilder stopped.");
480 }*/
481
482 void Hybernate() const { g_runStat = kHybernate; }
483 void Sleep() const { g_runStat = kSleep; }
484 void FlushMode() const { g_runStat = kModeFlush; }
485 void TestMode() const { g_runStat = kModeTest; }
486 void FlagMode() const { g_runStat = kModeFlag; }
487 void RunMode() const { g_runStat = kModeRun; }
488
489 // FIXME: To be removed
490 //void SetMode(int mode) const { g_runStat = mode; }
491
492 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
493 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
494 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
495 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
496 int GetNumFilesOpen() const { return fFiles.size(); }
497
498 /*
499 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
500 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
501 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
502 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
503 */
504
505 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
506 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
507
508 void SetOutputFormat(FAD::FileFormat_t f)
509 {
510 const bool changed = f!=fFileFormat;
511
512 fFileFormat = f;
513 fDimFileFormat.Update(uint16_t(f));
514
515 string msg = "File format set to: ";
516 switch (f)
517 {
518 case FAD::kNone: msg += "kNone."; break;
519 case FAD::kDebug: msg += "kDebug."; break;
520 case FAD::kFits: msg += "kFits."; break;
521 case FAD::kCfitsio: msg += "kCfitsio"; break;
522 case FAD::kRaw: msg += "kRaw"; break;
523 case FAD::kCalib:
524 DataCalib::Restart();
525 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
526 fMsg.Message("Resetted DRS calibration.");
527 return;
528 }
529
530 if (changed)
531 fMsg.Message(msg);
532 }
533
534 virtual int ResetSecondaryDrsBaseline()
535 {
536 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
537 {
538 fFileFormat = FAD::kCalib;
539 fDimFileFormat.Update(uint16_t(fFileFormat));
540 fMsg.Message("Resetted DRS calibration for secondary baseline.");
541 }
542 else
543 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
544
545 return 0;
546 }
547
548 void SetDebugLog(bool b) { fDebugLog = b; }
549
550 void SetDebugStream(bool b)
551 {
552 fDebugStream = b;
553 if (b)
554 return;
555
556 for (int i=0; i<40; i++)
557 {
558 if (!fDumpStream[i].is_open())
559 continue;
560
561 fDumpStream[i].close();
562
563 ostringstream name;
564 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
565 fMsg.Message("Closed file '"+name.str()+"'");
566 }
567 }
568
569 void SetDebugRead(bool b)
570 {
571 fDebugRead = b;
572 if (b || !fDumpRead.is_open())
573 return;
574
575 fDumpRead.close();
576 fMsg.Message("Closed file 'socket_events.txt'");
577 }
578
579// size_t GetUsedMemory() const { return gi_usedMem; }
580
581 void LoadDrsCalibration(const char *fname)
582 {
583 if (!DataCalib::ReadFits(fname, fMsg))
584 return;
585 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
586 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
587 }
588
589 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
590
591
592 /*
593 struct OpenFileToDim
594 {
595 int code;
596 char fileName[FILENAME_MAX];
597 };
598
599 SignalRunOpened(runid, filename);
600 // Send num open files
601 // Send runid, (more info about the run?), filename via dim
602
603 SignalEvtWritten(runid);
604 // Send num events written of newest file
605
606 SignalRunClose(runid);
607 // Send new num open files
608 // Send empty file-name if no file is open
609
610 */
611
612 // -------------- Mapped event builder callbacks ------------------
613
614 void UpdateRuns(const string &fname="")
615 {
616 uint32_t values[5] =
617 {
618 static_cast<uint32_t>(fFiles.size()),
619 0xffffffff,
620 0,
621 fLastOpened,
622 fLastClosed
623 };
624
625 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
626 it!=fFiles.end(); it++)
627 {
628 const DataProcessorImp *file = *it;
629
630 if (file->GetRunId()<values[1])
631 values[1] = file->GetRunId();
632
633 if (file->GetRunId()>values[2])
634 values[2] = file->GetRunId();
635 }
636
637 fMaxRun = values[2];
638
639 vector<char> data(sizeof(values)+fname.size()+1);
640 memcpy(data.data(), values, sizeof(values));
641 strcpy(data.data()+sizeof(values), fname.c_str());
642
643 fDimRuns.Update(data);
644 }
645
646 vector<DataProcessorImp*> fFiles;
647
648 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
649 {
650 //fMsg.Info(" ==> TODO: Update run configuration in database!");
651
652 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
653 while (it!=fExpectedRuns.end())
654 {
655 if (it->first<runid)
656 {
657 ostringstream str;
658 str << "runOpen - Missed run " << it->first << ".";
659 fMsg.Info(str);
660
661 fExpectedRuns.erase(it++);
662 continue;
663 }
664 if (it->first==runid)
665 break;
666 it++;
667 }
668
669 FAD::RunDescription desc;
670
671 if (it==fExpectedRuns.end())
672 {
673 ostringstream str;
674 str << "runOpen - Run " << runid << " wasn't expected (maybe manual triggers)";
675 fMsg.Warn(str);
676 }
677 else
678 {
679 desc = it->second;
680 fExpectedRuns.erase(it);
681 }
682
683 // Check if file already exists...
684 DataProcessorImp *file = 0;
685 switch (fFileFormat)
686 {
687 case FAD::kNone: file = new DataDump(fPath, fNightAsInt, runid, fMsg); break;
688 case FAD::kDebug: file = new DataDebug(fPath, fNightAsInt, runid, fMsg); break;
689 case FAD::kCfitsio: file = new DataWriteFits(fPath, fNightAsInt, runid, fMsg); break;
690 case FAD::kFits: file = new DataWriteFits2(fPath, fNightAsInt, runid, fMsg); break;
691 case FAD::kRaw: file = new DataWriteRaw(fPath, fNightAsInt, runid, fMsg); break;
692 case FAD::kCalib: file = new DataCalib(fPath, fNightAsInt, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
693 }
694
695 try
696 {
697 if (!file->Open(h, desc))
698 return 0;
699 }
700 catch (const exception &e)
701 {
702 fMsg.Error("Exception trying to open file: "+string(e.what()));
703 return 0;
704 }
705
706 fFiles.push_back(file);
707
708 ostringstream str;
709 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
710 fMsg.Info(str);
711
712 fDimWriteStats.FileOpened(file->GetFileName());
713
714 fLastOpened = runid;
715 UpdateRuns(file->GetFileName());
716
717 fNumEvts[kEventId] = 0;
718 fNumEvts[kTriggerId] = 0;
719
720 fNumEvts[kCurrent] = 0;
721 fDimEvents.Update(fNumEvts);
722 // fDimCurrentEvent.Update(uint32_t(0));
723
724 return reinterpret_cast<FileHandle_t>(file);
725 }
726
727 int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/)
728 {
729 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
730
731 if (!file->WriteEvt(e))
732 return -1;
733
734 if (file->GetRunId()==fMaxRun)
735 {
736 fNumEvts[kCurrent]++;
737 fNumEvts[kEventId] = e->EventNum;
738 fNumEvts[kTriggerId] = e->TriggerNum;
739 }
740
741 fNumEvts[kTotal]++;
742
743 static Time oldt(boost::date_time::neg_infin);
744 Time newt;
745 if (newt>oldt+boost::posix_time::seconds(1))
746 {
747 fDimEvents.Update(fNumEvts);
748 oldt = newt;
749 }
750
751
752 // ===> SignalEvtWritten(runid);
753 // Send num events written of newest file
754
755 /* close run runId (all all runs if runId=0) */
756 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
757 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
758
759 return 0;
760 }
761
762 virtual void CloseRun(uint32_t /*runid*/) { }
763
764 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
765 {
766 //fMsg.Info(" ==> TODO: Update run configuration in database!");
767
768 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
769
770 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
771 if (it==fFiles.end())
772 {
773 ostringstream str;
774 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
775 fMsg.Fatal(str);
776 return -1;
777 }
778
779 /*
780 fFiles.erase(it);
781
782 fLastClosed = file->GetRunId();
783 CloseRun(fLastClosed);
784 UpdateRuns();
785
786 fDimEvents.Update(fNumEvts);
787 */
788
789 const bool rc = file->Close(tail);
790 if (!rc)
791 {
792 // Error message
793 }
794
795 // Note that this is the signal for the fadctrl to change from
796 // WritingData back to Connected. If this is done too early,
797 // a new run might be started before this is closed. This is
798 // faster, but leads to problems with the DRS calibration
799 // if the system is fast enough to start the new run before
800 // this one has really been closed.
801 fFiles.erase(it);
802
803 fLastClosed = file->GetRunId();
804 CloseRun(fLastClosed);
805 UpdateRuns();
806
807 fDimEvents.Update(fNumEvts);
808
809
810 ostringstream str;
811 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
812 fMsg.Info(str);
813
814 delete file;
815
816 // ==> SignalRunClose(runid);
817 // Send new num open files
818 // Send empty file-name if no file is open
819
820 return rc ? 0 : -1;
821 }
822
823 ofstream fDumpStream[40];
824
825 void debugStream(int isock, void *buf, int len)
826 {
827 if (!fDebugStream)
828 return;
829
830 const int slot = isock/7;
831 if (slot<0 || slot>39)
832 return;
833
834 if (!fDumpStream[slot].is_open())
835 {
836 ostringstream name;
837 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
838
839 fDumpStream[slot].open(name.str().c_str(), ios::app);
840 if (!fDumpStream[slot])
841 {
842 ostringstream str;
843 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
844 fMsg.Error(str);
845
846 return;
847 }
848
849 fMsg.Message("Opened file '"+name.str()+"' for writing.");
850 }
851
852 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
853 }
854
855 ofstream fDumpRead; // Stream to possibly dump docket events
856
857 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
858 {
859 // isock = socketID (0-279)
860 // ibyte = #bytes gelesen
861 // event = eventId (oder 0 wenn noch nicht bekannt)
862 // state : 1=finished reading data
863 // 0=reading data
864 // -1=start reading data (header)
865 // -2=start reading data,
866 // eventId not known yet (too little data)
867 // tsec, tusec = time when reading seconds, microseconds
868 //
869 if (!fDebugRead || ibyte==0)
870 return;
871
872 if (!fDumpRead.is_open())
873 {
874 fDumpRead.open("socket_events.txt", ios::app);
875 if (!fDumpRead)
876 {
877 ostringstream str;
878 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
879 fMsg.Error(str);
880
881 return;
882 }
883
884 fMsg.Message("Opened file 'socket_events.txt' for writing.");
885
886 fDumpRead << "# START: " << Time().GetAsStr() << endl;
887 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
888 }
889
890 fDumpRead
891 << setw(2) << state << " "
892 << setw(8) << tsec << " "
893 << setw(9) << tusec << " "
894 << setw(3) << isock << " "
895 << setw(2) << isock/7 << " "
896 << runno << " "
897 << event << " "
898 << ftmevt << " "
899 << ibyte << endl;
900 }
901
902 array<uint16_t,2> fVecRoi;
903
904 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/)
905 {
906 /*
907 fadhd[i] ist ein array mit den 40 fad-headers
908 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
909
910 event ist die Struktur, die auch die write routine erhaelt;
911 darin sind im header die 'soll-werte' fuer z.B. eventID
912 als auch die ADC-Werte (falls Du die brauchst)
913
914 Wenn die routine einen negativen Wert liefert, wird das event
915 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
916 */
917
918 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
919
920 if (roi!=fVecRoi)
921 {
922 Update(fDimRoi, roi);
923 fVecRoi = roi;
924 }
925
926 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
927 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
928
929 // FIMXE: Compare with target configuration
930
931 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
932 {
933 // FIXME: Compare with expectations!!!
934 if (ptr->fStartDelimiter==0)
935 {
936 if (ptr==beg)
937 beg++;
938 continue;
939 }
940
941 if (beg->fStatus != ptr->fStatus)
942 {
943 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
944 CloseRunFile(runNr, 0, 0);
945 return -1;
946 }
947
948 if (beg->fRunNumber != ptr->fRunNumber)
949 {
950 fMsg.Error("Inconsistent run number detected.... closing run.");
951 CloseRunFile(runNr, 0, 0);
952 return -1;
953 }
954
955 /*
956 if (beg->fVersion != ptr->fVersion)
957 {
958 Error("Inconsist firmware version detected.... closing run.");
959 CloseRunFile(runNr, 0, 0);
960 break;
961 }
962 */
963 if (beg->fEventCounter != ptr->fEventCounter)
964 {
965 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
966 CloseRunFile(runNr, 0, 0);
967 return -1;
968 }
969
970 if (beg->fTriggerCounter != ptr->fTriggerCounter)
971 {
972 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
973 CloseRunFile(runNr, 0, 0);
974 return -1;
975 }
976
977 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
978 {
979 fMsg.Error("Inconsistent phase shift detected.... closing run.");
980 CloseRunFile(runNr, 0, 0);
981 return -1;
982 }
983
984 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
985 {
986 fMsg.Error("Inconsistent DAC values detected.... closing run.");
987 CloseRunFile(runNr, 0, 0);
988 return -1;
989 }
990
991 if (beg->fTriggerType != ptr->fTriggerType)
992 {
993 fMsg.Error("Inconsistent trigger type detected.... closing run.");
994 CloseRunFile(runNr, 0, 0);
995 return -1;
996 }
997 }
998
999 // check REFCLK_frequency
1000 // check consistency with command configuration
1001 // how to log errors?
1002 // need gotNewRun/closedRun to know it is finished
1003
1004 return 0;
1005 }
1006
1007 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
1008 {
1009 // Currently we send any event no matter what its trigger id is...
1010 // To be changed.
1011 static Time oldt(boost::date_time::neg_infin);
1012 Time newt;
1013
1014 static int skip = 0;
1015
1016 // FIXME: Only send events if the have newer run-numbers
1017 if (newt<oldt+boost::posix_time::milliseconds(skip>0 ? 100 : 1000))
1018 return;
1019 oldt = newt;
1020
1021 // Workaround to find a valid header.....
1022 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1023 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1024
1025 // FIMXE: Compare with target configuration
1026 const FAD::EventHeader *ptr=beg;
1027 for (; ptr!=end; ptr++)
1028 {
1029 if (ptr->fStartDelimiter!=0)
1030 break;
1031 }
1032 if (ptr==end)
1033 return;
1034
1035
1036 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
1037 memcpy(data.data(), event, sizeof(EVENT));
1038
1039 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
1040
1041 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
1042 DrsCalibrate::RemoveSpikes(vec, event->Roi);
1043
1044 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
1045 const double max = DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
1046
1047 // Maximum above roughly 5pe
1048 if (ptr->IsTriggerPhys() && max<100 && skip<10)
1049 {
1050 skip++;
1051 return;
1052 }
1053
1054 skip = 0;
1055
1056 fDimRawData.setQuality(ptr->fTriggerType);
1057 fDimRawData.Update(data);
1058
1059 fDimEventData.setQuality(ptr->fTriggerType);
1060 fDimEventData.Update(data2);
1061 }
1062
1063 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
1064 {
1065 /*
1066 if (!DataCalib::IsValid())
1067 return;
1068
1069 // Workaround to find a valid header.....
1070 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1071 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1072
1073 // FIMXE: Compare with target configuration
1074
1075 const FAD::EventHeader *ptr=beg;
1076 for (; ptr<end; ptr++)
1077 {
1078 if (ptr->fStartDelimiter!=0)
1079 break;
1080 }
1081
1082 if (ptr==end)
1083 return;
1084
1085 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
1086 return;
1087
1088 vector<float> data(event->Roi*1440);
1089 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
1090
1091 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
1092
1093 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
1094 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
1095
1096 fDimFeedbackData.Update(data2);
1097 */
1098 }
1099
1100 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/)
1101 {
1102 switch (threadID)
1103 {
1104 case 0:
1105 SendRawData(fadhd, event);
1106 return 1;
1107 case 1:
1108 SendFeedbackData(fadhd, event);
1109 return 2;
1110 }
1111 return 100;
1112 }
1113
1114
1115 bool IsRunStarted() const
1116 {
1117 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
1118 return it==fIsRunStarted.end();// ? true : it->second.started;
1119 }
1120
1121 uint32_t GetRunNumber() const
1122 {
1123 return fRunNumber;
1124 }
1125
1126 bool IsRunFileOpen()
1127 {
1128 return fLastOpened==fRunNumber-1;
1129 }
1130
1131 bool IncreaseRunNumber(uint32_t run)
1132 {
1133 if (!InitRunNumber())
1134 return false;
1135
1136 if (run<fRunNumber)
1137 {
1138 ostringstream msg;
1139 msg <<
1140 "Run number " << run << " smaller than next available "
1141 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
1142 fMsg.Error(msg);
1143 return false;
1144 }
1145
1146 fRunNumber = run;
1147
1148 return true;
1149 }
1150
1151 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
1152 {
1153 // This function is called even when writing is switched off
1154 set<uint32_t>::iterator it = fIsRunStarted.begin();
1155 while (it!=fIsRunStarted.end())
1156 {
1157 if (*it<runnr)
1158 {
1159 ostringstream str;
1160 str << "gotNewRun - Missed run " << *it << ".";
1161 fMsg.Info(str);
1162
1163 fIsRunStarted.erase(it++);
1164 continue;
1165 }
1166 if (*it==runnr)
1167 break;
1168 it++;
1169 }
1170 if (it==fIsRunStarted.end())
1171 {
1172 ostringstream str;
1173 str << "gotNewRun - Not waiting for run " << runnr << ".";
1174 fMsg.Warn(str);
1175 return;
1176 }
1177
1178 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
1179 if (i2==fExpectedRuns.end())
1180 {
1181 ostringstream str;
1182 str << "gotNewRun - Run " << runnr << " wasn't expected.";
1183 fMsg.Warn(str);
1184 return;
1185 }
1186
1187 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
1188 // return: 0=close scheduled / >0 already closed / <0 does not exist
1189
1190 // FIXME: Move configuration from expected runs to runs which will soon
1191 // be opened/closed
1192
1193 fIsRunStarted.erase(it);
1194 }
1195
1196 map<boost::thread::id, string> fLastMessage;
1197
1198 void factOut(int severity, int err, const char *message)
1199 {
1200 if (!fDebugLog && severity==99)
1201 return;
1202
1203 ostringstream str;
1204 //str << boost::this_thread::get_id() << " ";
1205 str << "EventBuilder(";
1206 if (err<0)
1207 str << "---";
1208 else
1209 str << err;
1210 str << "): " << message;
1211
1212 string &old = fLastMessage[boost::this_thread::get_id()];
1213
1214 if (str.str()==old)
1215 return;
1216 old = str.str();
1217
1218 fMsg.Update(str, severity);
1219 }
1220
1221/*
1222 void factStat(int64_t *stat, int len)
1223 {
1224 if (len!=7)
1225 {
1226 fMsg.Warn("factStat received unknown number of values.");
1227 return;
1228 }
1229
1230 vector<int64_t> data(1, g_maxMem);
1231 data.insert(data.end(), stat, stat+len);
1232
1233 static vector<int64_t> last(8);
1234 if (data==last)
1235 return;
1236 last = data;
1237
1238 fDimStatistics.Update(data);
1239
1240 // len ist die Laenge des arrays.
1241 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1242 // kannst Du pruefen, ob die 100MB voll sind ....
1243
1244 ostringstream str;
1245 str
1246 << "Wait=" << stat[0] << " "
1247 << "Skip=" << stat[1] << " "
1248 << "Del=" << stat[2] << " "
1249 << "Tot=" << stat[3] << " "
1250 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1251 << "Read=" << stat[5] << " "
1252 << "Conn=" << stat[6];
1253
1254 fMsg.Info(str);
1255 }
1256 */
1257
1258 void factStat(const EVT_STAT &/*stat*/)
1259 {
1260 //fDimStatistics2.Update(stat);
1261 }
1262
1263 void SendStat()
1264 {
1265 // Performance could be increased signaling the thread to wake up
1266 while (!fThreadDimQueueStop)
1267 {
1268 if (fDimQueue1.size()==0)
1269 {
1270 usleep(1000);
1271 continue;
1272 }
1273
1274 fMutexDimQueue1.lock();
1275 const pair<Time,GUI_STAT> stat = fDimQueue1.front();
1276 fDimQueue1.pop_front();
1277 fMutexDimQueue1.unlock();
1278
1279 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
1280 fDimStatistics1.Update(stat.first);
1281 }
1282 }
1283
1284 void factStat(const GUI_STAT &stat)
1285 {
1286 const lock_guard<mutex> guard(fMutexDimQueue1);
1287 fDimQueue1.push_back(make_pair(Time(), stat));
1288 }
1289
1290 void factReportIncomplete(uint64_t rep)
1291 {
1292 fDimIncomplete.setQuality(1);
1293 fDimIncomplete.Update(rep);
1294 }
1295
1296 array<FAD::EventHeader, 40> fVecHeader;
1297
1298 template<typename T, class S>
1299 array<T, 42> Compare(const S *vec, const T *t)
1300 {
1301 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1302
1303 const T *min = NULL;
1304 const T *val = NULL;
1305 const T *max = NULL;
1306
1307 array<T, 42> arr;
1308
1309 // bool rc = true;
1310 for (int i=0; i<40; i++)
1311 {
1312 const char *base = reinterpret_cast<const char*>(vec+i);
1313 const T *ref = reinterpret_cast<const T*>(base+offset);
1314
1315 arr[i] = *ref;
1316
1317 if (gi_NumConnect[i]!=7)
1318 {
1319 arr[i] = 0;
1320 continue;
1321 }
1322
1323 if (!val)
1324 {
1325 min = ref;
1326 val = ref;
1327 max = ref;
1328 }
1329
1330 if (*ref<*min)
1331 min = ref;
1332
1333 if (*ref>*max)
1334 max = ref;
1335
1336 // if (*val!=*ref)
1337 // rc = false;
1338 }
1339
1340 arr[40] = val ? *min : 1;
1341 arr[41] = val ? *max : 0;
1342
1343 return arr;
1344 }
1345
1346 template<typename T>
1347 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1348 {
1349 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1350
1351 T val = 0;
1352 T rc = 0;
1353
1354 array<T, 42> vec;
1355
1356 bool first = true;
1357
1358 for (int i=0; i<40; i++)
1359 {
1360 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1361 const T *ref = reinterpret_cast<const T*>(base+offset);
1362
1363 vec[i+2] = *ref;
1364
1365 if (gi_NumConnect[i]!=7)
1366 {
1367 vec[i+2] = 0;
1368 continue;
1369 }
1370
1371 if (first)
1372 {
1373 first = false;
1374 val = *ref;
1375 rc = 0;
1376 }
1377
1378 rc |= val^*ref;
1379 }
1380
1381 vec[0] = rc;
1382 vec[1] = val;
1383
1384 return vec;
1385 }
1386
1387 template<typename T, size_t N>
1388 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1389 {
1390// svc.setQuality(vec[40]<=vec[41]);
1391 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1392 svc.Update(t);
1393 }
1394
1395 template<typename T>
1396 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1397 {
1398 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1399 for (int i=0; i<40;i++)
1400 cout << " " << data.second[i+3];
1401 cout << endl;
1402 }
1403
1404 vector<uint> fNumConnected;
1405
1406 void ProcHeader()
1407 {
1408 // Performance could be increased signaling the thread to wake up
1409 while (!fThreadDimQueueStop)
1410 {
1411 if (fDimQueue2.size()==0)
1412 {
1413 usleep(100);
1414 continue;
1415 }
1416
1417 fMutexDimQueue2.lock();
1418 const auto dat = fDimQueue2.front();
1419 fDimQueue2.pop_front();
1420 fMutexDimQueue2.unlock();
1421
1422 const Time &t = get<0>(dat);
1423 const bool changed = get<1>(dat);
1424 const FAD::EventHeader &h = get<2>(dat);
1425
1426 const FAD::EventHeader old = fVecHeader[h.Id()];
1427 fVecHeader[h.Id()] = h;
1428
1429 if (old.fVersion != h.fVersion || changed)
1430 {
1431 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1432
1433 array<float,42> data;
1434 for (int i=0; i<42; i++)
1435 {
1436 ostringstream str;
1437 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1438 data[i] = stof(str.str());
1439 }
1440 Update(fDimFwVersion, data, t);
1441 }
1442
1443 if (old.fRunNumber != h.fRunNumber || changed)
1444 {
1445 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1446 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1447 fDimRunNumber.Update(t);
1448 }
1449
1450 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1451 {
1452 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1453 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1454 fDimPrescaler.Update(t);
1455 }
1456
1457 if (old.fDNA != h.fDNA || changed)
1458 {
1459 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1460 Update(fDimDNA, dna, t, 40);
1461 }
1462
1463 if (old.fStatus != h.fStatus || changed)
1464 {
1465 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1466 Update(fDimStatus, sts, t);
1467 }
1468
1469 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1470 {
1471 array<uint16_t, FAD::kNumDac*42> dacs;
1472
1473 for (int i=0; i<FAD::kNumDac; i++)
1474 {
1475 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1476 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1477 }
1478
1479 Update(fDimDac, dacs, t);
1480 }
1481
1482 // -----------
1483
1484 static Time oldt(boost::date_time::neg_infin);
1485 Time newt;
1486
1487 if (newt>oldt+boost::posix_time::seconds(1))
1488 {
1489 oldt = newt;
1490
1491 // --- RefClock
1492
1493 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1494 Update(fDimRefClock, clk, t);
1495
1496 // --- Temperatures
1497
1498 const array<int16_t,42> tmp[4] =
1499 {
1500 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1501 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1502 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1503 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1504 };
1505
1506 vector<int16_t> data;
1507 data.reserve(82);
1508 data.push_back(tmp[0][40]); // min: 0
1509 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1510 data.push_back(tmp[0][41]); // max: 41
1511 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1512
1513 for (int j=1; j<=3; j++)
1514 {
1515 const array<int16_t,42> &ref = tmp[j];
1516
1517 // Gloabl min
1518 if (ref[40]<data[0]) // 40=min
1519 data[0] = ref[40];
1520
1521 // Global max
1522 if (ref[41]>data[41]) // 41=max
1523 data[41] = ref[41];
1524
1525 for (int i=0; i<40; i++)
1526 {
1527 // min per board
1528 if (ref[i]<data[i+1]) // data: 1-40
1529 data[i+1] = ref[i]; // ref: 0-39
1530
1531 // max per board
1532 if (ref[i]>data[i+42]) // data: 42-81
1533 data[i+42] = ref[i]; // ref: 0-39
1534 }
1535 }
1536
1537 vector<float> deg(82); // 0: global min, 1-40: min
1538 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1539 deg[i] = data[i]/16.;
1540
1541 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1542 fDimTemperature.Update(t);
1543 }
1544
1545 }
1546 }
1547
1548 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1549 {
1550 const uint16_t id = h.Id();
1551 if (id>39)
1552 return;
1553
1554 if (fNumConnected.size()!=40)
1555 fNumConnected.resize(40);
1556
1557 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1558
1559 const bool changed = con!=fNumConnected || !IsThreadRunning();
1560
1561 fNumConnected = con;
1562
1563 const lock_guard<mutex> guard(fMutexDimQueue2);
1564 fDimQueue2.push_back(make_tuple(Time(), changed, h));
1565 }
1566};
1567
1568EventBuilderWrapper *EventBuilderWrapper::This = 0;
1569
1570// ----------- Event builder callbacks implementation ---------------
1571extern "C"
1572{
1573 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1574 {
1575 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1576 }
1577
1578 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1579 {
1580 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1581 }
1582
1583 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1584 {
1585 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1586 }
1587
1588 // -----
1589
1590 //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/)
1591 //{
1592 // return NULL;
1593 //}
1594
1595 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1596 {
1597 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1598 }
1599
1600 int runEnd(uint32_t, void */*runPtr*/)
1601 {
1602 return 0;
1603 }
1604
1605 // -----
1606
1607 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1608 {
1609 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1610 }
1611
1612 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1613 {
1614 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1615 }
1616
1617 // -----
1618
1619 void factOut(int severity, int err, const char *message)
1620 {
1621 EventBuilderWrapper::This->factOut(severity, err, message);
1622 }
1623
1624 void factStat(GUI_STAT stat)
1625 {
1626 EventBuilderWrapper::This->factStat(stat);
1627 }
1628
1629 void factStatNew(EVT_STAT stat)
1630 {
1631 EventBuilderWrapper::This->factStat(stat);
1632 }
1633
1634 void factReportIncomplete (uint64_t rep)
1635 {
1636 EventBuilderWrapper::This->factReportIncomplete(rep);
1637 }
1638
1639 // ------
1640
1641 void debugHead(int socket, int/*board*/, void *buf)
1642 {
1643 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1644 EventBuilderWrapper::This->debugHead(socket, h);
1645 }
1646
1647 void debugStream(int isock, void *buf, int len)
1648 {
1649 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1650 }
1651
1652 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1653 {
1654 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1655 }
1656}
1657
1658#endif
Note: See TracBrowser for help on using the repository browser.