source: trunk/FACT++/src/EventBuilderWrapper.h@ 15851

Last change on this file since 15851 was 15610, checked in by tbretz, 11 years ago
Decoupled updating ROI from eventCheck.
File size: 52.5 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14#include <condition_variable>
15
16#include "DimWriteStatistics.h"
17
18#include "DataCalib.h"
19#include "DataWriteRaw.h"
20
21#ifdef HAVE_FITS
22#include "DataWriteFits.h"
23#else
24#define DataWriteFits DataWriteFits2
25#endif
26
27#include "DataWriteFits2.h"
28
29#include "queue.h"
30
31namespace ba = boost::asio;
32namespace bs = boost::system;
33namespace fs = boost::filesystem;
34
35using ba::ip::tcp;
36
37using namespace std;
38
39// ========================================================================
40
41#include "EventBuilder.h"
42
43extern "C" {
44 extern void StartEvtBuild();
45 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
46}
47
48// ========================================================================
49
50class EventBuilderWrapper
51{
52public:
53 // FIXME
54 static EventBuilderWrapper *This;
55
56 MessageImp &fMsg;
57
58private:
59 boost::thread fThreadMain;
60
// Commands/states written to g_runStat to steer the event builder.
enum CommandStates_t
{
    // Termination requests
    kAbort      = -2,   // 'abort': quit as soon as possible
    kExit       = -1,   // 'exit': stop reading, quit when buffered events are done

    // Idle states
    kInitialize =  0,   // 'initialize' (e.g. dim not yet started)
    kHybernate  =  1,   // 'hybernate': do nothing for a long time [wakeup within ~1sec]
    kSleep      =  2,   // 'sleep': do nothing [wakeup within ~10msec]

    // Read-out modes
    kModeFlush  = 10,   // 'flush': read data from camera, but skip them
    kModeTest   = 20,   // 'test': read data and process them, but do not write to disk
    kModeFlag   = 30,   // 'flag': read data, process and write all to disk
    kModeRun    = 40,   // 'run': read data, process and write selected to disk
};
73
// Positions of the individual counters inside fNumEvts.
enum
{
    kCurrent = 0,   // events of the current run
    kTotal,         // events of all runs
    kEventId,       // last event id
    kTriggerId,     // last trigger id
};
81
// Output format used by runOpen() for newly opened files; changed via SetOutputFormat().
82 FAD::FileFormat_t fFileFormat;
83
// Run bookkeeping: highest run id among the open files (set in UpdateRuns), the
// run ids most recently opened/closed, and the event counters which are indexed
// by kCurrent/kTotal/kEventId/kTriggerId.
84 uint32_t fMaxRun;
85 uint32_t fLastOpened;
86 uint32_t fLastClosed;
87 array<uint32_t,4> fNumEvts;
88
// DIM services. Their names, formats and descriptions are assigned in the
// constructor's initializer list.
89 DimWriteStatistics fDimWriteStats;
90 DimDescribedService fDimRuns;
91 DimDescribedService fDimEvents;
92 DimDescribedService fDimRawData;
93 DimDescribedService fDimEventData;
94 DimDescribedService fDimFeedbackData;
95 DimDescribedService fDimFwVersion;
96 DimDescribedService fDimRunNumber;
97 DimDescribedService fDimStatus;
98 DimDescribedService fDimDNA;
99 DimDescribedService fDimTemperature;
100 DimDescribedService fDimPrescaler;
101 DimDescribedService fDimRefClock;
102 DimDescribedService fDimRoi;
103 DimDescribedService fDimDac;
104 DimDescribedService fDimDrsRuns;
105 DimDescribedService fDimDrsCalibration;
106 DimDescribedService fDimStatistics1;
107 //DimDescribedService fDimStatistics2;
108 DimDescribedService fDimFileFormat;
109 DimDescribedService fDimIncomplete;
110
// Queues bound in the constructor to factStatSend, procHeader, updateEvents and
// updateRoi (in this order). The constructor notes that they have to be
// instantiated after the DIM services, hence their position here.
111 Queue<pair<Time,GUI_STAT>> fDimQueue1;
112 Queue<tuple<Time,bool,FAD::EventHeader>> fDimQueue2;
113 Queue<pair<Time,array<uint32_t,4>>> fDimQueue3;
114 Queue<pair<Time,array<uint16_t,2>>> fDimQueue4;
115
// Debug switches set through SetDebugStream(), SetDebugRead() and SetDebugLog().
116 bool fDebugStream;
117 bool fDebugRead;
118 bool fDebugLog;
119
// Data directory, the night (as integer) for which the run number was last
// initialised, and the next run number to hand out (see InitRunNumber).
120 string fPath;
121 uint64_t fNightAsInt;
122 uint32_t fRunNumber;
123
124protected:
125 bool InitRunNumber(const string &path="")
126 {
127 if (!path.empty())
128 {
129 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
130 {
131 fMsg.Error("Data path "+path+" does not exist!");
132 return false;
133 }
134
135 fPath = path;
136 fDimWriteStats.SetCurrentFolder(fPath);
137
138 fMsg.Info("Data path set to "+path+".");
139 }
140
141 // Get current night
142 const uint64_t night = Time().NightAsInt();
143 if (night==fNightAsInt)
144 return true;
145
146 // Check for run numbers
147 fRunNumber = 1000;
148
149 while (--fRunNumber>0)
150 {
151 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
152
153 if (access((name+"bin").c_str(), F_OK) == 0)
154 break;
155 if (access((name+"fits").c_str(), F_OK) == 0)
156 break;
157 if (access((name+"drs.fits").c_str(), F_OK) == 0)
158 break;
159 }
160
161 // This is now the first file which does not exist
162 fRunNumber++;
163 fLastOpened = 0;
164
165 // Check if we have exceeded the maximum
166 if (fRunNumber==1000)
167 {
168 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
169 return false;
170 }
171
172 ostringstream str;
173 if (fNightAsInt==0)
174 str << "First night...";
175 else
176 str << "Night has changd from " << fNightAsInt << "... new";
177 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
178 fMsg.Message(str);
179
180 fNightAsInt = night;
181
182 return true;
183 }
184
185public:
186 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
187 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
188 fDimWriteStats ("FAD_CONTROL", imp),
189 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
190 "Run files statistics"
191 "|stats[int]:num of open files, min/max run no, last opened or closed run"
192 "|file[string]:filename of last opened file"),
193 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
194 "Event counts"
195 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
196 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;C:4;I:1;I:2;I:40;S:1440;S:160;F",
197 "|roi[uint16]:number of samples per pixel"
198 "|roi_tm[uint16]:number of samples per time-marker channel"
199 "|num_fad[uint32]:event number from FADs"
200 "|num_ftm[uint32]:trigger number from FTM"
201 "|type[uint16]:trigger type from FTM"
202 "|num_boards[uint32]:number of active boards"
203 "|error[uint8]:event builder error counters"
204 "|dummy[]:"
205 "|time[uint32]:PC time as unix time stamp"
206 "|time_board[uint32]:Time stamp of FAD boards"
207 "|start_pix[int16]:start sample of pixels"
208 "|start_tm[int16]:start sample of time marker channels"
209 "|adc[int16]:adc data"),
210 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
211 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
212 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
213 "Firmware version number of fad boards"
214 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
215 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
216 "Run numbers coming from FAD boards"
217 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
218 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
219 "Status of FAD boards"
220 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
221 fDimDNA ("FAD_CONTROL/DNA", "X:40",
222 "DNA of FAD boards"
223 "|DNA[hex]:Hex identifier of each FAD board"),
224 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
225 "FADs temperatures"
226 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
227 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
228 "Trigger generator prescaler of fad boards"
229 "|prescaler[int]:Trigger generator prescaler value, for each board"),
230 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
231 "Reference clock of FAD boards"
232 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
233 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
234 fDimDac ("FAD_CONTROL/DAC", "S:336",
235 "DAC settings of each FAD board"
236 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
237 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
238 "|roi:Region of interest of secondary baseline"
239 "|run:Run numbers of DRS runs (0=none)"),
240 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
241 "|roi:Region of interest of secondary baseline"
242 "|run:Run numbers of DRS runs (0=none)"),
243 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;X:40",
244 "Event Builder status for GUI display"
245 "|threadInfo[int]:Number of read, proc and writes"
246 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
247 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
248 "|EvtCnt[int]:Number of events skipped, written, with errors"
249 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
250 "|badRoiBoard[int]:Num boards with wrong ROI"
251 "|deltaT[ms]:Time in ms for rates"
252 "|rateNew[int]:Number of new start events received"
253 "|numConn[int]:Number of connections per board"
254 "|rateBytes[int]:Bytes read this cycle"
255 "|totBytes[int]:Bytes read (counter)"),
256 /*fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
257 "Event Builder status, events oriented"
258 "|reset[int]:If increased, reset all counters"
259 "|numRead[int]:How often sucessful read from N sockets per loop"
260 "|gotByte[int]:number of bytes read per board"
261 "|gotErr[int]:number of com. errors per board"
262 "|evtStat[int]:number of evts read, completed, with errors, incomplete"
263 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
264 "|feedStat[int]:number of evts used or rejected by feedback system"
265 "|wrtStat[int]:number of evts written to disk, with errors"
266 "|runStat[int]:number of run opened, closed, with open or close errors"
267 "|numConn[int]:number of sockets successfully opened per board"),*/
268 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
269 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
270 // It is important to instantiate them after the DimServices
271 fDimQueue1(std::bind(&EventBuilderWrapper::factStatSend, this, placeholders::_1)),
272 fDimQueue2(std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
273 fDimQueue3(std::bind(&EventBuilderWrapper::updateEvents, this, placeholders::_1)),
274 fDimQueue4(std::bind(&EventBuilderWrapper::updateRoi, this, placeholders::_1)),
275 fDebugStream(false), fDebugRead(false), fDebugLog(false), fNightAsInt(0)
276 {
277 if (This)
278 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
279
280 This = this;
281
282 memset(fNumEvts.data(), 0, sizeof(fNumEvts));
283 fDimEvents.Update(fNumEvts);
284
285 for (size_t i=0; i<40; i++)
286 ConnectSlot(i, tcp::endpoint());
287 }
288
289 virtual ~EventBuilderWrapper()
290 {
291 Abort();
292
293 // FIXME: Used timed_join and abort afterwards
294 // What's the maximum time the eb need to abort?
295 fThreadMain.join();
296
297 //ffMsg.Info("EventBuilder stopped.");
298
299 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
300 delete *it;
301 }
302
// Run numbers handed out by StartNewRun(); gotNewRun() removes entries again,
// IsRunStarted() reports true once the last handed-out number is gone.
303 set<uint32_t> fIsRunStarted;
// Run descriptions registered by StartNewRun(), consumed (or dropped as missed) by runOpen().
304 map<uint32_t, FAD::RunDescription> fExpectedRuns;
305
306 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
307 {
308 if (maxtime<=0 || maxtime>24*60*60)
309 maxtime = 24*60*60;
310 if (maxevt<=0 || maxevt>INT32_MAX)
311 maxevt = INT32_MAX;
312
313 const FAD::RunDescription descr =
314 {
315 uint32_t(maxtime),
316 uint32_t(maxevt),
317 ref.first,
318 ref.second,
319 };
320
321 if (!InitRunNumber())
322 return 0;
323
324 // FIMXE: Maybe reset an event counter so that the mcp can count events?
325
326 //fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
327
328 fExpectedRuns[fRunNumber] = descr;
329 fIsRunStarted.insert(fRunNumber);
330 return fRunNumber++;
331 }
332
333 bool IsThreadRunning()
334 {
335 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
336 }
337
338 void SetMaxMemory(unsigned int mb) const
339 {
340 /*
341 if (mb*1000000<GetUsedMemory())
342 {
343 // ffMsg.Warn("...");
344 return;
345 }*/
346
347 g_maxMem = size_t(mb)*1000000;
348 }
349
350 void StartThread(const vector<tcp::endpoint> &addr)
351 {
352 if (IsThreadRunning())
353 {
354 fMsg.Warn("Start - EventBuilder still running");
355 return;
356 }
357
358 fLastMessage.clear();
359
360 for (size_t i=0; i<40; i++)
361 ConnectSlot(i, addr[i]);
362
363 g_runStat = kModeRun;
364 g_maxProc = 1;
365
366 fMsg.Message("Starting EventBuilder thread");
367
368 fThreadMain = boost::thread(StartEvtBuild);
369 }
370 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
371 {
372 if (i>39)
373 return;
374
375 if (addr==tcp::endpoint())
376 {
377 DisconnectSlot(i);
378 return;
379 }
380
381 struct sockaddr_in sockaddr; //IP for each socket
382 sockaddr.sin_family = AF_INET;
383 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
384 sockaddr.sin_port = htons(addr.port());
385 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
386
387 // In this order
388 g_port[i].sockDef = 1;
389
390 fDimIncomplete.setQuality(0);
391 fDimIncomplete.Update(uint64_t(0));
392 }
393
394 void DisconnectSlot(unsigned int i)
395 {
396 if (i>39)
397 return;
398
399 g_port[i].sockDef = 0;
400 // In this order
401
402 struct sockaddr_in addr; //IP for each socket
403 addr.sin_family = AF_INET;
404 addr.sin_addr.s_addr = 0;
405 addr.sin_port = 0;
406 memcpy(&g_port[i].sockAddr, &addr, sizeof(struct sockaddr_in));
407
408 fDimIncomplete.setQuality(0);
409 fDimIncomplete.Update(uint64_t(0));
410 }
411 void IgnoreSlot(unsigned int i)
412 {
413 if (i>39)
414 return;
415 if (g_port[i].sockAddr.sin_port==0)
416 return;
417
418 g_port[i].sockDef = -1;
419 }
420
421
422 void Abort()
423 {
424 fMsg.Message("Signal abort to EventBuilder thread...");
425 g_reset = 1;
426 }
427
428 void ResetThread(bool soft)
429 {
430 /*
431 if (g_reset > 0)
432
433 * suspend reading
434 * reset = g_reset;
435 * g_reset=0
436
437 * reset% 10
438 == 0 leave event Buffers as they are
439 == 1 let all buffers drain (write (incomplete) events)
440 > 1 flush all buffers (do not write buffered events)
441
442 * (reset/10)%10
443 > 0 close all sockets and destroy them (also free the
444 allocated read-buffers)
445 recreate before resuming operation
446 [ this is more than just close/open that can be
447 triggered by e.g. close/open the base-socket ]
448
449 * (reset/100)%10
450 > 0 close all open run-files
451
452 * (reset/1000)
453 sleep so many seconds before resuming operation
454 (does not (yet) take into account time left when waiting
455 for buffers getting empty ...)
456
457 * resume_reading
458
459 */
460 fMsg.Message("Signal reset to EventBuilder thread...");
461 g_reset = soft ? 101 : 102;
462 }
463
464 void Exit()
465 {
466 fMsg.Message("Signal exit to EventBuilder thread...");
467 g_runStat = kExit;
468 }
469
470 /*
471 void Wait()
472 {
473 fThread.join();
474 ffMsg.Message("EventBuilder stopped.");
475 }*/
476
477 void Hybernate() const { g_runStat = kHybernate; }
478 void Sleep() const { g_runStat = kSleep; }
479 void FlushMode() const { g_runStat = kModeFlush; }
480 void TestMode() const { g_runStat = kModeTest; }
481 void FlagMode() const { g_runStat = kModeFlag; }
482 void RunMode() const { g_runStat = kModeRun; }
483
484 // FIXME: To be removed
485 //void SetMode(int mode) const { g_runStat = mode; }
486
487 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
488 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
489 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
490 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
491 int GetNumFilesOpen() const { return fFiles.size(); }
492
493 /*
494 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
495 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
496 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
497 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
498 */
499
500 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
501 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
502
503 void SetOutputFormat(FAD::FileFormat_t f)
504 {
505 const bool changed = f!=fFileFormat;
506
507 fFileFormat = f;
508 fDimFileFormat.Update(uint16_t(f));
509
510 string msg = "File format set to: ";
511 switch (f)
512 {
513 case FAD::kNone: msg += "kNone."; break;
514 case FAD::kDebug: msg += "kDebug."; break;
515 case FAD::kFits: msg += "kFits."; break;
516 case FAD::kCfitsio: msg += "kCfitsio"; break;
517 case FAD::kRaw: msg += "kRaw"; break;
518 case FAD::kCalib:
519 DataCalib::Restart();
520 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
521 fMsg.Message("Resetted DRS calibration.");
522 return;
523 }
524
525 if (changed)
526 fMsg.Message(msg);
527 }
528
529 virtual int ResetSecondaryDrsBaseline()
530 {
531 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
532 {
533 fFileFormat = FAD::kCalib;
534 fDimFileFormat.Update(uint16_t(fFileFormat));
535 fMsg.Message("Resetted DRS calibration for secondary baseline.");
536 }
537 else
538 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
539
540 return 0;
541 }
542
543 void SetDebugLog(bool b) { fDebugLog = b; }
544
545 void SetDebugStream(bool b)
546 {
547 fDebugStream = b;
548 if (b)
549 return;
550
551 for (int i=0; i<40; i++)
552 {
553 if (!fDumpStream[i].is_open())
554 continue;
555
556 fDumpStream[i].close();
557
558 ostringstream name;
559 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
560 fMsg.Message("Closed file '"+name.str()+"'");
561 }
562 }
563
564 void SetDebugRead(bool b)
565 {
566 fDebugRead = b;
567 if (b || !fDumpRead.is_open())
568 return;
569
570 fDumpRead.close();
571 fMsg.Message("Closed file 'socket_events.txt'");
572 }
573
574// size_t GetUsedMemory() const { return gi_usedMem; }
575
576 void LoadDrsCalibration(const char *fname)
577 {
578 if (!DataCalib::ReadFits(fname, fMsg))
579 return;
580 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
581 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
582 }
583
584 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
585
586
587 /*
588 struct OpenFileToDim
589 {
590 int code;
591 char fileName[FILENAME_MAX];
592 };
593
594 SignalRunOpened(runid, filename);
595 // Send num open files
596 // Send runid, (more info about the run?), filename via dim
597
598 SignalEvtWritten(runid);
599 // Send num events written of newest file
600
601 SignalRunClose(runid);
602 // Send new num open files
603 // Send empty file-name if no file is open
604
605 */
606
607 // -------------- Mapped event builder callbacks ------------------
608
609 void UpdateRuns(const string &fname="")
610 {
611 uint32_t values[5] =
612 {
613 static_cast<uint32_t>(fFiles.size()),
614 0xffffffff,
615 0,
616 fLastOpened,
617 fLastClosed
618 };
619
620 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
621 it!=fFiles.end(); it++)
622 {
623 const DataProcessorImp *file = *it;
624
625 if (file->GetRunId()<values[1])
626 values[1] = file->GetRunId();
627
628 if (file->GetRunId()>values[2])
629 values[2] = file->GetRunId();
630 }
631
632 fMaxRun = values[2];
633
634 vector<char> data(sizeof(values)+fname.size()+1);
635 memcpy(data.data(), values, sizeof(values));
636 strcpy(data.data()+sizeof(values), fname.c_str());
637
638 fDimRuns.Update(data);
639 }
640
// Currently open files: entries are added by runOpen(), removed and deleted by
// runClose(); whatever is left is deleted in the destructor.
641 vector<DataProcessorImp*> fFiles;
642
643 void updateEvents(const pair<Time,array<uint32_t,4>> &stat)
644 {
645 fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
646 fDimEvents.Update(stat.first);
647 }
648
649 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
650 {
651 //fMsg.Info(" ==> TODO: Update run configuration in database!");
652
653 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
654 while (it!=fExpectedRuns.end())
655 {
656 if (it->first<runid)
657 {
658 ostringstream str;
659 str << "runOpen - Missed run " << it->first << ".";
660 fMsg.Info(str);
661
662 fExpectedRuns.erase(it++);
663 continue;
664 }
665 if (it->first==runid)
666 break;
667 it++;
668 }
669
670 FAD::RunDescription desc;
671
672 if (it==fExpectedRuns.end())
673 {
674 ostringstream str;
675 str << "runOpen - Run " << runid << " wasn't expected (maybe manual triggers)";
676 fMsg.Warn(str);
677 }
678 else
679 {
680 desc = it->second;
681 fExpectedRuns.erase(it);
682 }
683
684 // Check if file already exists...
685 DataProcessorImp *file = 0;
686 switch (fFileFormat)
687 {
688 case FAD::kNone: file = new DataDump(fPath, fNightAsInt, runid, fMsg); break;
689 case FAD::kDebug: file = new DataDebug(fPath, fNightAsInt, runid, fMsg); break;
690 case FAD::kCfitsio: file = new DataWriteFits(fPath, fNightAsInt, runid, fMsg); break;
691 case FAD::kFits: file = new DataWriteFits2(fPath, fNightAsInt, runid, fMsg); break;
692 case FAD::kRaw: file = new DataWriteRaw(fPath, fNightAsInt, runid, fMsg); break;
693 case FAD::kCalib: file = new DataCalib(fPath, fNightAsInt, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
694 }
695
696 try
697 {
698 if (!file->Open(h, desc))
699 return 0;
700 }
701 catch (const exception &e)
702 {
703 fMsg.Error("Exception trying to open file: "+string(e.what()));
704 return 0;
705 }
706
707 fFiles.push_back(file);
708
709 ostringstream str;
710 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
711 fMsg.Info(str);
712
713 fDimWriteStats.FileOpened(file->GetFileName());
714
715 fLastOpened = runid;
716 UpdateRuns(file->GetFileName());
717
718 fNumEvts[kEventId] = 0;
719 fNumEvts[kTriggerId] = 0;
720
721 fNumEvts[kCurrent] = 0;
722 fDimQueue3.post(make_pair(Time(), fNumEvts));
723 // fDimCurrentEvent.Update(uint32_t(0));
724
725 return reinterpret_cast<FileHandle_t>(file);
726 }
727
728 int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/)
729 {
730 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
731
732 if (!file->WriteEvt(e))
733 return -1;
734
735 if (file->GetRunId()==fMaxRun)
736 {
737 fNumEvts[kCurrent]++;
738 fNumEvts[kEventId] = e->EventNum;
739 fNumEvts[kTriggerId] = e->TriggerNum;
740 }
741
742 fNumEvts[kTotal]++;
743
744 static Time oldt(boost::date_time::neg_infin);
745 Time newt;
746 if (newt>oldt+boost::posix_time::seconds(1))
747 {
748 fDimQueue3.post(make_pair(Time(), fNumEvts));
749 //fDimEvents.Update(fNumEvts);
750 oldt = newt;
751 }
752
753
754 // ===> SignalEvtWritten(runid);
755 // Send num events written of newest file
756
757 /* close run runId (all all runs if runId=0) */
758 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
759 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
760
761 return 0;
762 }
763
764 virtual void CloseRun(uint32_t /*runid*/) { }
765
766 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
767 {
768 //fMsg.Info(" ==> TODO: Update run configuration in database!");
769
770 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
771
772 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
773 if (it==fFiles.end())
774 {
775 ostringstream str;
776 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
777 fMsg.Fatal(str);
778 return -1;
779 }
780
781 /*
782 fFiles.erase(it);
783
784 fLastClosed = file->GetRunId();
785 CloseRun(fLastClosed);
786 UpdateRuns();
787
788 fDimEvents.Update(fNumEvts);
789 */
790
791 const bool rc = file->Close(tail);
792 if (!rc)
793 {
794 // Error message
795 }
796
797 // Note that this is the signal for the fadctrl to change from
798 // WritingData back to Connected. If this is done too early,
799 // a new run might be started before this is closed. This is
800 // faster, but leads to problems with the DRS calibration
801 // if the system is fast enough to start the new run before
802 // this one has really been closed.
803 fFiles.erase(it);
804
805 fLastClosed = file->GetRunId();
806 CloseRun(fLastClosed);
807 UpdateRuns();
808
809 fDimQueue3.post(make_pair(Time(),fNumEvts));
810 //fDimEvents.Update(fNumEvts);
811
812
813 ostringstream str;
814 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
815 fMsg.Info(str);
816
817 delete file;
818
819 // ==> SignalRunClose(runid);
820 // Send new num open files
821 // Send empty file-name if no file is open
822
823 return rc ? 0 : -1;
824 }
825
// One dump file per slot ("socket_dump-NN.bin"), opened on demand by debugStream()
// and closed by SetDebugStream(false).
826 ofstream fDumpStream[40];
827
828 void debugStream(int isock, void *buf, int len)
829 {
830 if (!fDebugStream)
831 return;
832
833 const int slot = isock/7;
834 if (slot<0 || slot>39)
835 return;
836
837 if (!fDumpStream[slot].is_open())
838 {
839 ostringstream name;
840 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
841
842 fDumpStream[slot].open(name.str().c_str(), ios::app);
843 if (!fDumpStream[slot])
844 {
845 ostringstream str;
846 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
847 fMsg.Error(str);
848
849 return;
850 }
851
852 fMsg.Message("Opened file '"+name.str()+"' for writing.");
853 }
854
855 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
856 }
857
// Text log of socket read events ("socket_events.txt"), opened on demand by
// debugRead() and closed by SetDebugRead(false).
858 ofstream fDumpRead; // Stream to possibly dump docket events
859
860 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
861 {
862 // isock = socketID (0-279)
863 // ibyte = #bytes gelesen
864 // event = eventId (oder 0 wenn noch nicht bekannt)
865 // state : 1=finished reading data
866 // 0=reading data
867 // -1=start reading data (header)
868 // -2=start reading data,
869 // eventId not known yet (too little data)
870 // tsec, tusec = time when reading seconds, microseconds
871 //
872 if (!fDebugRead || ibyte==0)
873 return;
874
875 if (!fDumpRead.is_open())
876 {
877 fDumpRead.open("socket_events.txt", ios::app);
878 if (!fDumpRead)
879 {
880 ostringstream str;
881 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
882 fMsg.Error(str);
883
884 return;
885 }
886
887 fMsg.Message("Opened file 'socket_events.txt' for writing.");
888
889 fDumpRead << "# START: " << Time().GetAsStr() << endl;
890 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
891 }
892
893 fDumpRead
894 << setw(2) << state << " "
895 << setw(8) << tsec << " "
896 << setw(9) << tusec << " "
897 << setw(3) << isock << " "
898 << setw(2) << isock/7 << " "
899 << runno << " "
900 << event << " "
901 << ftmevt << " "
902 << ibyte << endl;
903 }
904
// Last region of interest (Roi, RoiTM) seen by eventCheck(); used there to
// queue an update of the ROI service only when the values change.
905 array<uint16_t,2> fVecRoi;
906
907 void updateRoi(const pair<Time, array<uint16_t,2>> &roi)
908 {
909 fDimRoi.setData(roi.second.data(), sizeof(uint16_t)*2);
910 fDimRoi.Update(roi.first);
911 }
912
913 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/)
914 {
915 /*
916 fadhd[i] ist ein array mit den 40 fad-headers
917 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
918
919 event ist die Struktur, die auch die write routine erhaelt;
920 darin sind im header die 'soll-werte' fuer z.B. eventID
921 als auch die ADC-Werte (falls Du die brauchst)
922
923 Wenn die routine einen negativen Wert liefert, wird das event
924 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
925 */
926
927 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
928
929 if (roi!=fVecRoi)
930 {
931 fDimQueue4.post(make_pair(Time(), roi));
932 fVecRoi = roi;
933 }
934
935 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
936 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
937
938 // FIMXE: Compare with target configuration
939
940 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
941 {
942 // FIXME: Compare with expectations!!!
943 if (ptr->fStartDelimiter==0)
944 {
945 if (ptr==beg)
946 beg++;
947 continue;
948 }
949
950 if (beg->fStatus != ptr->fStatus)
951 {
952 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
953 CloseRunFile(runNr, 0, 0);
954 return -1;
955 }
956
957 if (beg->fRunNumber != ptr->fRunNumber)
958 {
959 fMsg.Error("Inconsistent run number detected.... closing run.");
960 CloseRunFile(runNr, 0, 0);
961 return -1;
962 }
963
964 /*
965 if (beg->fVersion != ptr->fVersion)
966 {
967 Error("Inconsist firmware version detected.... closing run.");
968 CloseRunFile(runNr, 0, 0);
969 break;
970 }
971 */
972 if (beg->fEventCounter != ptr->fEventCounter)
973 {
974 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
975 CloseRunFile(runNr, 0, 0);
976 return -1;
977 }
978
979 if (beg->fTriggerCounter != ptr->fTriggerCounter)
980 {
981 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
982 CloseRunFile(runNr, 0, 0);
983 return -1;
984 }
985
986 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
987 {
988 fMsg.Error("Inconsistent phase shift detected.... closing run.");
989 CloseRunFile(runNr, 0, 0);
990 return -1;
991 }
992
993 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
994 {
995 fMsg.Error("Inconsistent DAC values detected.... closing run.");
996 CloseRunFile(runNr, 0, 0);
997 return -1;
998 }
999
1000 if (beg->fTriggerType != ptr->fTriggerType)
1001 {
1002 fMsg.Error("Inconsistent trigger type detected.... closing run.");
1003 CloseRunFile(runNr, 0, 0);
1004 return -1;
1005 }
1006 }
1007
1008 // check REFCLK_frequency
1009 // check consistency with command configuration
1010 // how to log errors?
1011 // need gotNewRun/closedRun to know it is finished
1012
1013 return 0;
1014 }
1015
1016 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
1017 {
1018 // Currently we send any event no matter what its trigger id is...
1019 // To be changed.
1020 static Time oldt(boost::date_time::neg_infin);
1021 Time newt;
1022
1023 static int skip = 0;
1024
1025 // FIXME: Only send events if the have newer run-numbers
1026 if (newt<oldt+boost::posix_time::milliseconds(skip>0 ? 100 : 1000))
1027 return;
1028 oldt = newt;
1029
1030 // Workaround to find a valid header.....
1031 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1032 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1033
1034 // FIMXE: Compare with target configuration
1035 const FAD::EventHeader *ptr=beg;
1036 for (; ptr!=end; ptr++)
1037 {
1038 if (ptr->fStartDelimiter!=0)
1039 break;
1040 }
1041 if (ptr==end)
1042 return;
1043
1044
1045 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
1046 memcpy(data.data(), event, sizeof(EVENT));
1047
1048 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
1049
1050 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
1051 DrsCalibrate::RemoveSpikes(vec, event->Roi);
1052
1053 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
1054 const double max = DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
1055
1056 // Maximum above roughly 5pe
1057 if (ptr->IsTriggerPhys() && max<100 && skip<10)
1058 {
1059 skip++;
1060 return;
1061 }
1062
1063 skip = 0;
1064
1065 fDimRawData.setQuality(ptr->fTriggerType);
1066 fDimRawData.Update(data);
1067
1068 fDimEventData.setQuality(ptr->fTriggerType);
1069 fDimEventData.Update(data2);
1070 }
1071
1072 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
1073 {
1074 /*
1075 if (!DataCalib::IsValid())
1076 return;
1077
1078 // Workaround to find a valid header.....
1079 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1080 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1081
1082 // FIMXE: Compare with target configuration
1083
1084 const FAD::EventHeader *ptr=beg;
1085 for (; ptr<end; ptr++)
1086 {
1087 if (ptr->fStartDelimiter!=0)
1088 break;
1089 }
1090
1091 if (ptr==end)
1092 return;
1093
1094 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
1095 return;
1096
1097 vector<float> data(event->Roi*1440);
1098 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
1099
1100 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
1101
1102 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
1103 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
1104
1105 fDimFeedbackData.Update(data2);
1106 */
1107 }
1108
1109 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/)
1110 {
1111 switch (threadID)
1112 {
1113 case 0:
1114 SendRawData(fadhd, event);
1115 return 100;
1116 /*
1117 case 1:
1118 SendFeedbackData(fadhd, event);
1119 return 2;*/
1120 }
1121 return 100;
1122 }
1123
1124
1125 bool IsRunStarted() const
1126 {
1127 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
1128 return it==fIsRunStarted.end();// ? true : it->second.started;
1129 }
1130
1131 uint32_t GetRunNumber() const
1132 {
1133 return fRunNumber;
1134 }
1135
1136 bool IsRunFileOpen()
1137 {
1138 return fLastOpened==fRunNumber-1;
1139 }
1140
1141 bool IncreaseRunNumber(uint32_t run)
1142 {
1143 if (!InitRunNumber())
1144 return false;
1145
1146 if (run<fRunNumber)
1147 {
1148 ostringstream msg;
1149 msg <<
1150 "Run number " << run << " smaller than next available "
1151 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
1152 fMsg.Error(msg);
1153 return false;
1154 }
1155
1156 fRunNumber = run;
1157
1158 return true;
1159 }
1160
1161 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
1162 {
1163 // This function is called even when writing is switched off
1164 set<uint32_t>::iterator it = fIsRunStarted.begin();
1165 while (it!=fIsRunStarted.end())
1166 {
1167 if (*it<runnr)
1168 {
1169 ostringstream str;
1170 str << "gotNewRun - Missed run " << *it << ".";
1171 fMsg.Info(str);
1172
1173 fIsRunStarted.erase(it++);
1174 continue;
1175 }
1176 if (*it==runnr)
1177 break;
1178 it++;
1179 }
1180 if (it==fIsRunStarted.end())
1181 {
1182 ostringstream str;
1183 str << "gotNewRun - Not waiting for run " << runnr << ".";
1184 fMsg.Warn(str);
1185 return;
1186 }
1187
1188 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
1189 if (i2==fExpectedRuns.end())
1190 {
1191 ostringstream str;
1192 str << "gotNewRun - Run " << runnr << " wasn't expected.";
1193 fMsg.Warn(str);
1194 return;
1195 }
1196
1197 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
1198 // return: 0=close scheduled / >0 already closed / <0 does not exist
1199
1200 // FIXME: Move configuration from expected runs to runs which will soon
1201 // be opened/closed
1202
1203 fIsRunStarted.erase(it);
1204 }
1205
1206 map<boost::thread::id, string> fLastMessage;
1207
1208 void factOut(int severity, int err, const char *message)
1209 {
1210 if (!fDebugLog && severity==99)
1211 return;
1212
1213 ostringstream str;
1214 //str << boost::this_thread::get_id() << " ";
1215 str << "EventBuilder(";
1216 if (err<0)
1217 str << "---";
1218 else
1219 str << err;
1220 str << "): " << message;
1221
1222 string &old = fLastMessage[boost::this_thread::get_id()];
1223
1224 if (str.str()==old)
1225 return;
1226 old = str.str();
1227
1228 fMsg.Update(str, severity);
1229 }
1230
1231/*
1232 void factStat(int64_t *stat, int len)
1233 {
1234 if (len!=7)
1235 {
1236 fMsg.Warn("factStat received unknown number of values.");
1237 return;
1238 }
1239
1240 vector<int64_t> data(1, g_maxMem);
1241 data.insert(data.end(), stat, stat+len);
1242
1243 static vector<int64_t> last(8);
1244 if (data==last)
1245 return;
1246 last = data;
1247
1248 fDimStatistics.Update(data);
1249
        // len is the length of the array.
        // array[4] holds how many bytes of the buffer are currently in use;
        // with that you can check whether the 100MB are full ....
1253
1254 ostringstream str;
1255 str
1256 << "Wait=" << stat[0] << " "
1257 << "Skip=" << stat[1] << " "
1258 << "Del=" << stat[2] << " "
1259 << "Tot=" << stat[3] << " "
1260 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1261 << "Read=" << stat[5] << " "
1262 << "Conn=" << stat[6];
1263
1264 fMsg.Info(str);
1265 }
1266 */
1267
1268 void factStat(const EVT_STAT &/*stat*/)
1269 {
1270 //fDimStatistics2.Update(stat);
1271 }
1272
1273 void factStatSend(const pair<Time,GUI_STAT> &stat)
1274 {
1275 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
1276 fDimStatistics1.Update(stat.first);
1277 }
1278
1279 void factStat(const GUI_STAT &stat)
1280 {
1281 fDimQueue1.post(make_pair(Time(), stat));
1282 }
1283
1284 void factReportIncomplete(uint64_t rep)
1285 {
1286 fDimIncomplete.setQuality(1);
1287 fDimIncomplete.Update(rep);
1288 }
1289
1290 array<FAD::EventHeader, 40> fVecHeader;
1291
1292 template<typename T, class S>
1293 array<T, 42> Compare(const S *vec, const T *t)
1294 {
1295 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1296
1297 const T *min = NULL;
1298 const T *val = NULL;
1299 const T *max = NULL;
1300
1301 array<T, 42> arr;
1302
1303 // bool rc = true;
1304 for (int i=0; i<40; i++)
1305 {
1306 const char *base = reinterpret_cast<const char*>(vec+i);
1307 const T *ref = reinterpret_cast<const T*>(base+offset);
1308
1309 arr[i] = *ref;
1310
1311 if (gi_NumConnect[i]!=7)
1312 {
1313 arr[i] = 0;
1314 continue;
1315 }
1316
1317 if (!val)
1318 {
1319 min = ref;
1320 val = ref;
1321 max = ref;
1322 }
1323
1324 if (*ref<*min)
1325 min = ref;
1326
1327 if (*ref>*max)
1328 max = ref;
1329
1330 // if (*val!=*ref)
1331 // rc = false;
1332 }
1333
1334 arr[40] = val ? *min : 1;
1335 arr[41] = val ? *max : 0;
1336
1337 return arr;
1338 }
1339
1340 template<typename T>
1341 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1342 {
1343 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1344
1345 T val = 0;
1346 T rc = 0;
1347
1348 array<T, 42> vec;
1349
1350 bool first = true;
1351
1352 for (int i=0; i<40; i++)
1353 {
1354 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1355 const T *ref = reinterpret_cast<const T*>(base+offset);
1356
1357 vec[i+2] = *ref;
1358
1359 if (gi_NumConnect[i]!=7)
1360 {
1361 vec[i+2] = 0;
1362 continue;
1363 }
1364
1365 if (first)
1366 {
1367 first = false;
1368 val = *ref;
1369 rc = 0;
1370 }
1371
1372 rc |= val^*ref;
1373 }
1374
1375 vec[0] = rc;
1376 vec[1] = val;
1377
1378 return vec;
1379 }
1380
1381 template<typename T, size_t N>
1382 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1383 {
1384// svc.setQuality(vec[40]<=vec[41]);
1385 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1386 svc.Update(t);
1387 }
1388
// Debug output to cout in a single line:
//   <name>|<first>|<[1]>|<[0]><x<<[1]>: <[3]> <[4]> ... <[42]>
// where [i] denotes element i of data.second.
template<typename T>
void Print(const char *name, const pair<bool,array<T, 43>> &data)
{
    const array<T, 43> &val = data.second;

    cout << name << "|" << data.first << "|" << val[1] << "|";
    cout << val[0] << "<x<" << val[1] << ":";

    for (int idx=3; idx<43; idx++)
        cout << " " << val[idx];

    cout << endl;
}
1397
1398 vector<uint> fNumConnected;
1399
1400 void procHeader(const tuple<Time,bool,FAD::EventHeader> &dat)
1401 {
1402 const Time &t = get<0>(dat);
1403 const bool changed = get<1>(dat);
1404 const FAD::EventHeader &h = get<2>(dat);
1405
1406 const FAD::EventHeader old = fVecHeader[h.Id()];
1407 fVecHeader[h.Id()] = h;
1408
1409 if (old.fVersion != h.fVersion || changed)
1410 {
1411 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1412
1413 array<float,42> data;
1414 for (int i=0; i<42; i++)
1415 {
1416 ostringstream str;
1417 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1418 data[i] = stof(str.str());
1419 }
1420 Update(fDimFwVersion, data, t);
1421 }
1422
1423 if (old.fRunNumber != h.fRunNumber || changed)
1424 {
1425 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1426 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1427 fDimRunNumber.Update(t);
1428 }
1429
1430 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1431 {
1432 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1433 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1434 fDimPrescaler.Update(t);
1435 }
1436
1437 if (old.fDNA != h.fDNA || changed)
1438 {
1439 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1440 Update(fDimDNA, dna, t, 40);
1441 }
1442
1443 if (old.fStatus != h.fStatus || changed)
1444 {
1445 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1446 Update(fDimStatus, sts, t);
1447 }
1448
1449 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1450 {
1451 array<uint16_t, FAD::kNumDac*42> dacs;
1452
1453 for (int i=0; i<FAD::kNumDac; i++)
1454 {
1455 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1456 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1457 }
1458
1459 Update(fDimDac, dacs, t);
1460 }
1461
1462 // -----------
1463
1464 static Time oldt(boost::date_time::neg_infin);
1465 Time newt;
1466
1467 if (newt>oldt+boost::posix_time::seconds(1))
1468 {
1469 oldt = newt;
1470
1471 // --- RefClock
1472
1473 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1474 Update(fDimRefClock, clk, t);
1475
1476 // --- Temperatures
1477
1478 const array<int16_t,42> tmp[4] =
1479 {
1480 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1481 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1482 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1483 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1484 };
1485
1486 vector<int16_t> data;
1487 data.reserve(82);
1488 data.push_back(tmp[0][40]); // min: 0
1489 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1490 data.push_back(tmp[0][41]); // max: 41
1491 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1492
1493 for (int j=1; j<=3; j++)
1494 {
1495 const array<int16_t,42> &ref = tmp[j];
1496
1497 // Gloabl min
1498 if (ref[40]<data[0]) // 40=min
1499 data[0] = ref[40];
1500
1501 // Global max
1502 if (ref[41]>data[41]) // 41=max
1503 data[41] = ref[41];
1504
1505 for (int i=0; i<40; i++)
1506 {
1507 // min per board
1508 if (ref[i]<data[i+1]) // data: 1-40
1509 data[i+1] = ref[i]; // ref: 0-39
1510
1511 // max per board
1512 if (ref[i]>data[i+42]) // data: 42-81
1513 data[i+42] = ref[i]; // ref: 0-39
1514 }
1515 }
1516
1517 vector<float> deg(82); // 0: global min, 1-40: min
1518 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1519 deg[i] = data[i]/16.;
1520
1521 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1522 fDimTemperature.Update(t);
1523 }
1524 }
1525
1526 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1527 {
1528 const uint16_t id = h.Id();
1529 if (id>39)
1530 return;
1531
1532 if (fNumConnected.size()!=40)
1533 fNumConnected.resize(40);
1534
1535 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1536
1537 const bool changed = con!=fNumConnected || !IsThreadRunning();
1538
1539 fNumConnected = con;
1540
1541 fDimQueue2.post(make_tuple(Time(), changed, h));
1542
1543 //const lock_guard<mutex> guard(fMutexDimQueue2);
1544 //fDimQueue2.push_back(make_tuple(Time(), changed, h));
1545 }
1546};
1547
1548EventBuilderWrapper *EventBuilderWrapper::This = 0;
1549
1550// ----------- Event builder callbacks implementation ---------------
1551extern "C"
1552{
1553 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1554 {
1555 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1556 }
1557
1558 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1559 {
1560 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1561 }
1562
1563 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1564 {
1565 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1566 }
1567
1568 // -----
1569
1570 //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/)
1571 //{
1572 // return NULL;
1573 //}
1574
1575 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1576 {
1577 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1578 }
1579
1580 int runEnd(uint32_t, void */*runPtr*/)
1581 {
1582 return 0;
1583 }
1584
1585 // -----
1586
1587 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1588 {
1589 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1590 }
1591
1592 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1593 {
1594 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1595 }
1596
1597 // -----
1598
1599 void factOut(int severity, int err, const char *message)
1600 {
1601 EventBuilderWrapper::This->factOut(severity, err, message);
1602 }
1603
1604 void factStat(GUI_STAT stat)
1605 {
1606 EventBuilderWrapper::This->factStat(stat);
1607 }
1608
1609 void factStatNew(EVT_STAT stat)
1610 {
1611 EventBuilderWrapper::This->factStat(stat);
1612 }
1613
1614 void factReportIncomplete (uint64_t rep)
1615 {
1616 EventBuilderWrapper::This->factReportIncomplete(rep);
1617 }
1618
1619 // ------
1620
1621 void debugHead(int socket, int/*board*/, void *buf)
1622 {
1623 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1624 EventBuilderWrapper::This->debugHead(socket, h);
1625 }
1626
1627 void debugStream(int isock, void *buf, int len)
1628 {
1629 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1630 }
1631
1632 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1633 {
1634 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1635 }
1636}
1637
1638#endif
Note: See TracBrowser for help on using the repository browser.