source: trunk/FACT++/src/EventBuilderWrapper.h@ 15606

Last change on this file since 15606 was 15606, checked in by tbretz, 11 years ago
Fixed some typos in the new code.
File size: 52.2 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14#include <condition_variable>
15
16#include "DimWriteStatistics.h"
17
18#include "DataCalib.h"
19#include "DataWriteRaw.h"
20
21#ifdef HAVE_FITS
22#include "DataWriteFits.h"
23#else
24#define DataWriteFits DataWriteFits2
25#endif
26
27#include "DataWriteFits2.h"
28
29#include "queue.h"
30
31namespace ba = boost::asio;
32namespace bs = boost::system;
33namespace fs = boost::filesystem;
34
35using ba::ip::tcp;
36
37using namespace std;
38
39// ========================================================================
40
41#include "EventBuilder.h"
42
43extern "C" {
44 extern void StartEvtBuild();
45 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
46}
47
48// ========================================================================
49
50class EventBuilderWrapper
51{
52public:
53 // FIXME
54 static EventBuilderWrapper *This;
55
56 MessageImp &fMsg;
57
58private:
59 boost::thread fThreadMain;
60
61 enum CommandStates_t // g_runStat
62 {
63 kAbort = -2, // quit as soon as possible ('abort')
64 kExit = -1, // stop reading, quit when buffered events done ('exit')
65 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
66 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
67 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
68 kModeFlush = 10, // read data from camera, but skip them ('flush')
69 kModeTest = 20, // read data and process them, but do not write to disk ('test')
70 kModeFlag = 30, // read data, process and write all to disk ('flag')
71 kModeRun = 40, // read data, process and write selected to disk ('run')
72 };
73
74 enum
75 {
76 kCurrent = 0,
77 kTotal = 1,
78 kEventId = 2,
79 kTriggerId = 3,
80 };
81
82 FAD::FileFormat_t fFileFormat;
83
84 uint32_t fMaxRun;
85 uint32_t fLastOpened;
86 uint32_t fLastClosed;
87 array<uint32_t,4> fNumEvts;
88
89 DimWriteStatistics fDimWriteStats;
90 DimDescribedService fDimRuns;
91 DimDescribedService fDimEvents;
92 DimDescribedService fDimRawData;
93 DimDescribedService fDimEventData;
94 DimDescribedService fDimFeedbackData;
95 DimDescribedService fDimFwVersion;
96 DimDescribedService fDimRunNumber;
97 DimDescribedService fDimStatus;
98 DimDescribedService fDimDNA;
99 DimDescribedService fDimTemperature;
100 DimDescribedService fDimPrescaler;
101 DimDescribedService fDimRefClock;
102 DimDescribedService fDimRoi;
103 DimDescribedService fDimDac;
104 DimDescribedService fDimDrsRuns;
105 DimDescribedService fDimDrsCalibration;
106 DimDescribedService fDimStatistics1;
107 //DimDescribedService fDimStatistics2;
108 DimDescribedService fDimFileFormat;
109 DimDescribedService fDimIncomplete;
110
111 Queue<pair<Time,GUI_STAT>> fDimQueue1;
112 Queue<tuple<Time,bool,FAD::EventHeader>> fDimQueue2;
113 Queue<pair<Time,array<uint32_t,4>>> fDimQueue3;
114
115 bool fDebugStream;
116 bool fDebugRead;
117 bool fDebugLog;
118
119 string fPath;
120 uint64_t fNightAsInt;
121 uint32_t fRunNumber;
122
123protected:
124 bool InitRunNumber(const string &path="")
125 {
126 if (!path.empty())
127 {
128 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
129 {
130 fMsg.Error("Data path "+path+" does not exist!");
131 return false;
132 }
133
134 fPath = path;
135 fDimWriteStats.SetCurrentFolder(fPath);
136
137 fMsg.Info("Data path set to "+path+".");
138 }
139
140 // Get current night
141 const uint64_t night = Time().NightAsInt();
142 if (night==fNightAsInt)
143 return true;
144
145 // Check for run numbers
146 fRunNumber = 1000;
147
148 while (--fRunNumber>0)
149 {
150 const string name = DataProcessorImp::FormFileName(fPath, night, fRunNumber, "");
151
152 if (access((name+"bin").c_str(), F_OK) == 0)
153 break;
154 if (access((name+"fits").c_str(), F_OK) == 0)
155 break;
156 if (access((name+"drs.fits").c_str(), F_OK) == 0)
157 break;
158 }
159
160 // This is now the first file which does not exist
161 fRunNumber++;
162 fLastOpened = 0;
163
164 // Check if we have exceeded the maximum
165 if (fRunNumber==1000)
166 {
167 fMsg.Error("You have a file with run-number 1000 in "+fPath+" ["+to_string(night)+"]");
168 return false;
169 }
170
171 ostringstream str;
172 if (fNightAsInt==0)
173 str << "First night...";
174 else
175 str << "Night has changd from " << fNightAsInt << "... new";
176 str << " run-number is " << night << "-" << setfill('0') << setw(3) << fRunNumber << " [" << (fPath.empty()?".":fPath) << "]";
177 fMsg.Message(str);
178
179 fNightAsInt = night;
180
181 return true;
182 }
183
184public:
185 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
186 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
187 fDimWriteStats ("FAD_CONTROL", imp),
188 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C",
189 "Run files statistics"
190 "|stats[int]:num of open files, min/max run no, last opened or closed run"
191 "|file[string]:filename of last opened file"),
192 fDimEvents ("FAD_CONTROL/EVENTS", "I:4",
193 "Event counts"
194 "|evtsCount[int]:Num evts cur. run, total (all run), evt ID, trig. Num"),
195 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;S:1;I:1;I:1;S:1;I:1;C:4;I:1;I:2;I:40;S:1440;S:160;F",
196 "|roi[uint16]:number of samples per pixel"
197 "|roi_tm[uint16]:number of samples per time-marker channel"
198 "|num_fad[uint32]:event number from FADs"
199 "|num_ftm[uint32]:trigger number from FTM"
200 "|type[uint16]:trigger type from FTM"
201 "|num_boards[uint32]:number of active boards"
202 "|error[uint8]:event builder error counters"
203 "|dummy[]:"
204 "|time[uint32]:PC time as unix time stamp"
205 "|time_board[uint32]:Time stamp of FAD boards"
206 "|start_pix[int16]:start sample of pixels"
207 "|start_tm[int16]:start sample of time marker channels"
208 "|adc[int16]:adc data"),
209 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", "|avg:|rms:|max:|pos"),
210 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
211 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42",
212 "Firmware version number of fad boards"
213 "|firmware[float]:Version number of firmware, for each board. 40=min, 41=max"),
214 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42",
215 "Run numbers coming from FAD boards"
216 "|runNumbers[int]:current run number of each FAD board. 40=min, 41=max"),
217 fDimStatus ("FAD_CONTROL/STATUS", "S:42",
218 "Status of FAD boards"
219 "|status[bitpattern]:Status of each FAD board. Maybe buggy"),
220 fDimDNA ("FAD_CONTROL/DNA", "X:40",
221 "DNA of FAD boards"
222 "|DNA[hex]:Hex identifier of each FAD board"),
223 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82",
224 "FADs temperatures"
225 "|temp[deg. C]:0 global min, 1-40 min, 41 global max, 42-81 max"),
226 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42",
227 "Trigger generator prescaler of fad boards"
228 "|prescaler[int]:Trigger generator prescaler value, for each board"),
229 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42",
230 "Reference clock of FAD boards"
231 "|refClocks[t]:ref clocks of FAD boards. 40=min, 41=max"),
232 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", "roi:|roi_rm:"),
233 fDimDac ("FAD_CONTROL/DAC", "S:336",
234 "DAC settings of each FAD board"
235 "|DAC[int]:DAC counts, sequentially DAC 0 board 0, 0/1, 0/2... (plus min max)"),
236 fDimDrsRuns ("FAD_CONTROL/DRS_RUNS", "I:1;I:3",
237 "|roi:Region of interest of secondary baseline"
238 "|run:Run numbers of DRS runs (0=none)"),
239 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840",
240 "|roi:Region of interest of secondary baseline"
241 "|run:Run numbers of DRS runs (0=none)"),
242 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;X:40",
243 "Event Builder status for GUI display"
244 "|threadInfo[int]:Number of read, proc and writes"
245 "|bufferInfo[int]:Events in buffer, incomp., comp., tot., max past cycle, total"
246 "|memInfo[int]:total buf. mem, used mem, max used, max past cycle"
247 "|EvtCnt[int]:Number of events skipped, written, with errors"
248 "|badRoi[int]:Num boards with wrong ROI in event, run or board"
249 "|badRoiBoard[int]:Num boards with wrong ROI"
250 "|deltaT[ms]:Time in ms for rates"
251 "|rateNew[int]:Number of new start events received"
252 "|numConn[int]:Number of connections per board"
253 "|rateBytes[int]:Bytes read this cycle"
254 "|totBytes[int]:Bytes read (counter)"),
255 /*fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",
256 "Event Builder status, events oriented"
257 "|reset[int]:If increased, reset all counters"
258 "|numRead[int]:How often sucessful read from N sockets per loop"
259 "|gotByte[int]:number of bytes read per board"
260 "|gotErr[int]:number of com. errors per board"
261 "|evtStat[int]:number of evts read, completed, with errors, incomplete"
262 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"
263 "|feedStat[int]:number of evts used or rejected by feedback system"
264 "|wrtStat[int]:number of evts written to disk, with errors"
265 "|runStat[int]:number of run opened, closed, with open or close errors"
266 "|numConn[int]:number of sockets successfully opened per board"),*/
267 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"),
268 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"),
269 // It is important to instantiate them after the DimServices
270 fDimQueue1(std::bind(&EventBuilderWrapper::factStatSend, this, placeholders::_1)),
271 fDimQueue2(std::bind(&EventBuilderWrapper::procHeader, this, placeholders::_1)),
272 fDimQueue3(std::bind(&EventBuilderWrapper::updateEvents, this, placeholders::_1)),
273 fDebugStream(false), fDebugRead(false), fDebugLog(false), fNightAsInt(0)
274 {
275 if (This)
276 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
277
278 This = this;
279
280 memset(fNumEvts.data(), 0, sizeof(fNumEvts));
281 fDimEvents.Update(fNumEvts);
282
283 for (size_t i=0; i<40; i++)
284 ConnectSlot(i, tcp::endpoint());
285 }
286
287 virtual ~EventBuilderWrapper()
288 {
289 Abort();
290
291 // FIXME: Used timed_join and abort afterwards
292 // What's the maximum time the eb need to abort?
293 fThreadMain.join();
294
295 //ffMsg.Info("EventBuilder stopped.");
296
297 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
298 delete *it;
299 }
300
301 set<uint32_t> fIsRunStarted;
302 map<uint32_t, FAD::RunDescription> fExpectedRuns;
303
304 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
305 {
306 if (maxtime<=0 || maxtime>24*60*60)
307 maxtime = 24*60*60;
308 if (maxevt<=0 || maxevt>INT32_MAX)
309 maxevt = INT32_MAX;
310
311 const FAD::RunDescription descr =
312 {
313 uint32_t(maxtime),
314 uint32_t(maxevt),
315 ref.first,
316 ref.second,
317 };
318
319 if (!InitRunNumber())
320 return 0;
321
322 // FIMXE: Maybe reset an event counter so that the mcp can count events?
323
324 //fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
325
326 fExpectedRuns[fRunNumber] = descr;
327 fIsRunStarted.insert(fRunNumber);
328 return fRunNumber++;
329 }
330
331 bool IsThreadRunning()
332 {
333 return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
334 }
335
336 void SetMaxMemory(unsigned int mb) const
337 {
338 /*
339 if (mb*1000000<GetUsedMemory())
340 {
341 // ffMsg.Warn("...");
342 return;
343 }*/
344
345 g_maxMem = size_t(mb)*1000000;
346 }
347
348 void StartThread(const vector<tcp::endpoint> &addr)
349 {
350 if (IsThreadRunning())
351 {
352 fMsg.Warn("Start - EventBuilder still running");
353 return;
354 }
355
356 fLastMessage.clear();
357
358 for (size_t i=0; i<40; i++)
359 ConnectSlot(i, addr[i]);
360
361 g_runStat = kModeRun;
362 g_maxProc = 1;
363
364 fMsg.Message("Starting EventBuilder thread");
365
366 fThreadMain = boost::thread(StartEvtBuild);
367 }
368 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
369 {
370 if (i>39)
371 return;
372
373 if (addr==tcp::endpoint())
374 {
375 DisconnectSlot(i);
376 return;
377 }
378
379 struct sockaddr_in sockaddr; //IP for each socket
380 sockaddr.sin_family = AF_INET;
381 sockaddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
382 sockaddr.sin_port = htons(addr.port());
383 memcpy(&g_port[i].sockAddr, &sockaddr, sizeof(struct sockaddr_in));
384
385 // In this order
386 g_port[i].sockDef = 1;
387
388 fDimIncomplete.setQuality(0);
389 fDimIncomplete.Update(uint64_t(0));
390 }
391
392 void DisconnectSlot(unsigned int i)
393 {
394 if (i>39)
395 return;
396
397 g_port[i].sockDef = 0;
398 // In this order
399
400 struct sockaddr_in addr; //IP for each socket
401 addr.sin_family = AF_INET;
402 addr.sin_addr.s_addr = 0;
403 addr.sin_port = 0;
404 memcpy(&g_port[i].sockAddr, &addr, sizeof(struct sockaddr_in));
405
406 fDimIncomplete.setQuality(0);
407 fDimIncomplete.Update(uint64_t(0));
408 }
409 void IgnoreSlot(unsigned int i)
410 {
411 if (i>39)
412 return;
413 if (g_port[i].sockAddr.sin_port==0)
414 return;
415
416 g_port[i].sockDef = -1;
417 }
418
419
420 void Abort()
421 {
422 fMsg.Message("Signal abort to EventBuilder thread...");
423 g_runStat = kAbort;
424 }
425
426 void ResetThread(bool soft)
427 {
428 /*
429 if (g_reset > 0)
430
431 * suspend reading
432 * reset = g_reset;
433 * g_reset=0
434
435 * reset% 10
436 == 0 leave event Buffers as they are
437 == 1 let all buffers drain (write (incomplete) events)
438 > 1 flush all buffers (do not write buffered events)
439
440 * (reset/10)%10
441 > 0 close all sockets and destroy them (also free the
442 allocated read-buffers)
443 recreate before resuming operation
444 [ this is more than just close/open that can be
445 triggered by e.g. close/open the base-socket ]
446
447 * (reset/100)%10
448 > 0 close all open run-files
449
450 * (reset/1000)
451 sleep so many seconds before resuming operation
452 (does not (yet) take into account time left when waiting
453 for buffers getting empty ...)
454
455 * resume_reading
456
457 */
458 fMsg.Message("Signal reset to EventBuilder thread...");
459 g_reset = soft ? 101 : 102;
460 }
461
462 void Exit()
463 {
464 fMsg.Message("Signal exit to EventBuilder thread...");
465 g_runStat = kExit;
466 }
467
468 /*
469 void Wait()
470 {
471 fThread.join();
472 ffMsg.Message("EventBuilder stopped.");
473 }*/
474
475 void Hybernate() const { g_runStat = kHybernate; }
476 void Sleep() const { g_runStat = kSleep; }
477 void FlushMode() const { g_runStat = kModeFlush; }
478 void TestMode() const { g_runStat = kModeTest; }
479 void FlagMode() const { g_runStat = kModeFlag; }
480 void RunMode() const { g_runStat = kModeRun; }
481
482 // FIXME: To be removed
483 //void SetMode(int mode) const { g_runStat = mode; }
484
485 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
486 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
487 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
488 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
489 int GetNumFilesOpen() const { return fFiles.size(); }
490
491 /*
492 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
493 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
494 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
495 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
496 */
497
498 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
499 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
500
501 void SetOutputFormat(FAD::FileFormat_t f)
502 {
503 const bool changed = f!=fFileFormat;
504
505 fFileFormat = f;
506 fDimFileFormat.Update(uint16_t(f));
507
508 string msg = "File format set to: ";
509 switch (f)
510 {
511 case FAD::kNone: msg += "kNone."; break;
512 case FAD::kDebug: msg += "kDebug."; break;
513 case FAD::kFits: msg += "kFits."; break;
514 case FAD::kCfitsio: msg += "kCfitsio"; break;
515 case FAD::kRaw: msg += "kRaw"; break;
516 case FAD::kCalib:
517 DataCalib::Restart();
518 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
519 fMsg.Message("Resetted DRS calibration.");
520 return;
521 }
522
523 if (changed)
524 fMsg.Message(msg);
525 }
526
527 virtual int ResetSecondaryDrsBaseline()
528 {
529 if (DataCalib::ResetTrgOff(fDimDrsCalibration, fDimDrsRuns))
530 {
531 fFileFormat = FAD::kCalib;
532 fDimFileFormat.Update(uint16_t(fFileFormat));
533 fMsg.Message("Resetted DRS calibration for secondary baseline.");
534 }
535 else
536 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
537
538 return 0;
539 }
540
541 void SetDebugLog(bool b) { fDebugLog = b; }
542
543 void SetDebugStream(bool b)
544 {
545 fDebugStream = b;
546 if (b)
547 return;
548
549 for (int i=0; i<40; i++)
550 {
551 if (!fDumpStream[i].is_open())
552 continue;
553
554 fDumpStream[i].close();
555
556 ostringstream name;
557 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
558 fMsg.Message("Closed file '"+name.str()+"'");
559 }
560 }
561
562 void SetDebugRead(bool b)
563 {
564 fDebugRead = b;
565 if (b || !fDumpRead.is_open())
566 return;
567
568 fDumpRead.close();
569 fMsg.Message("Closed file 'socket_events.txt'");
570 }
571
572// size_t GetUsedMemory() const { return gi_usedMem; }
573
574 void LoadDrsCalibration(const char *fname)
575 {
576 if (!DataCalib::ReadFits(fname, fMsg))
577 return;
578 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
579 DataCalib::Update(fDimDrsCalibration, fDimDrsRuns);
580 }
581
582 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
583
584
585 /*
586 struct OpenFileToDim
587 {
588 int code;
589 char fileName[FILENAME_MAX];
590 };
591
592 SignalRunOpened(runid, filename);
593 // Send num open files
594 // Send runid, (more info about the run?), filename via dim
595
596 SignalEvtWritten(runid);
597 // Send num events written of newest file
598
599 SignalRunClose(runid);
600 // Send new num open files
601 // Send empty file-name if no file is open
602
603 */
604
605 // -------------- Mapped event builder callbacks ------------------
606
607 void UpdateRuns(const string &fname="")
608 {
609 uint32_t values[5] =
610 {
611 static_cast<uint32_t>(fFiles.size()),
612 0xffffffff,
613 0,
614 fLastOpened,
615 fLastClosed
616 };
617
618 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
619 it!=fFiles.end(); it++)
620 {
621 const DataProcessorImp *file = *it;
622
623 if (file->GetRunId()<values[1])
624 values[1] = file->GetRunId();
625
626 if (file->GetRunId()>values[2])
627 values[2] = file->GetRunId();
628 }
629
630 fMaxRun = values[2];
631
632 vector<char> data(sizeof(values)+fname.size()+1);
633 memcpy(data.data(), values, sizeof(values));
634 strcpy(data.data()+sizeof(values), fname.c_str());
635
636 fDimRuns.Update(data);
637 }
638
639 vector<DataProcessorImp*> fFiles;
640
641 void updateEvents(const pair<Time,array<uint32_t,4>> &stat)
642 {
643 fDimEvents.setData(stat.second.data(), sizeof(uint32_t)*4);
644 fDimEvents.Update(stat.first);
645 }
646
647 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
648 {
649 //fMsg.Info(" ==> TODO: Update run configuration in database!");
650
651 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
652 while (it!=fExpectedRuns.end())
653 {
654 if (it->first<runid)
655 {
656 ostringstream str;
657 str << "runOpen - Missed run " << it->first << ".";
658 fMsg.Info(str);
659
660 fExpectedRuns.erase(it++);
661 continue;
662 }
663 if (it->first==runid)
664 break;
665 it++;
666 }
667
668 FAD::RunDescription desc;
669
670 if (it==fExpectedRuns.end())
671 {
672 ostringstream str;
673 str << "runOpen - Run " << runid << " wasn't expected (maybe manual triggers)";
674 fMsg.Warn(str);
675 }
676 else
677 {
678 desc = it->second;
679 fExpectedRuns.erase(it);
680 }
681
682 // Check if file already exists...
683 DataProcessorImp *file = 0;
684 switch (fFileFormat)
685 {
686 case FAD::kNone: file = new DataDump(fPath, fNightAsInt, runid, fMsg); break;
687 case FAD::kDebug: file = new DataDebug(fPath, fNightAsInt, runid, fMsg); break;
688 case FAD::kCfitsio: file = new DataWriteFits(fPath, fNightAsInt, runid, fMsg); break;
689 case FAD::kFits: file = new DataWriteFits2(fPath, fNightAsInt, runid, fMsg); break;
690 case FAD::kRaw: file = new DataWriteRaw(fPath, fNightAsInt, runid, fMsg); break;
691 case FAD::kCalib: file = new DataCalib(fPath, fNightAsInt, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;
692 }
693
694 try
695 {
696 if (!file->Open(h, desc))
697 return 0;
698 }
699 catch (const exception &e)
700 {
701 fMsg.Error("Exception trying to open file: "+string(e.what()));
702 return 0;
703 }
704
705 fFiles.push_back(file);
706
707 ostringstream str;
708 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
709 fMsg.Info(str);
710
711 fDimWriteStats.FileOpened(file->GetFileName());
712
713 fLastOpened = runid;
714 UpdateRuns(file->GetFileName());
715
716 fNumEvts[kEventId] = 0;
717 fNumEvts[kTriggerId] = 0;
718
719 fNumEvts[kCurrent] = 0;
720 fDimQueue3.post(make_pair(Time(), fNumEvts));
721 // fDimCurrentEvent.Update(uint32_t(0));
722
723 return reinterpret_cast<FileHandle_t>(file);
724 }
725
726 int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/)
727 {
728 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
729
730 if (!file->WriteEvt(e))
731 return -1;
732
733 if (file->GetRunId()==fMaxRun)
734 {
735 fNumEvts[kCurrent]++;
736 fNumEvts[kEventId] = e->EventNum;
737 fNumEvts[kTriggerId] = e->TriggerNum;
738 }
739
740 fNumEvts[kTotal]++;
741
742 static Time oldt(boost::date_time::neg_infin);
743 Time newt;
744 if (newt>oldt+boost::posix_time::seconds(1))
745 {
746 fDimQueue3.post(make_pair(Time(), fNumEvts));
747 //fDimEvents.Update(fNumEvts);
748 oldt = newt;
749 }
750
751
752 // ===> SignalEvtWritten(runid);
753 // Send num events written of newest file
754
755 /* close run runId (all all runs if runId=0) */
756 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
757 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
758
759 return 0;
760 }
761
762 virtual void CloseRun(uint32_t /*runid*/) { }
763
764 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
765 {
766 //fMsg.Info(" ==> TODO: Update run configuration in database!");
767
768 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
769
770 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
771 if (it==fFiles.end())
772 {
773 ostringstream str;
774 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
775 fMsg.Fatal(str);
776 return -1;
777 }
778
779 /*
780 fFiles.erase(it);
781
782 fLastClosed = file->GetRunId();
783 CloseRun(fLastClosed);
784 UpdateRuns();
785
786 fDimEvents.Update(fNumEvts);
787 */
788
789 const bool rc = file->Close(tail);
790 if (!rc)
791 {
792 // Error message
793 }
794
795 // Note that this is the signal for the fadctrl to change from
796 // WritingData back to Connected. If this is done too early,
797 // a new run might be started before this is closed. This is
798 // faster, but leads to problems with the DRS calibration
799 // if the system is fast enough to start the new run before
800 // this one has really been closed.
801 fFiles.erase(it);
802
803 fLastClosed = file->GetRunId();
804 CloseRun(fLastClosed);
805 UpdateRuns();
806
807 fDimQueue3.post(make_pair(Time(),fNumEvts));
808 //fDimEvents.Update(fNumEvts);
809
810
811 ostringstream str;
812 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
813 fMsg.Info(str);
814
815 delete file;
816
817 // ==> SignalRunClose(runid);
818 // Send new num open files
819 // Send empty file-name if no file is open
820
821 return rc ? 0 : -1;
822 }
823
824 ofstream fDumpStream[40];
825
826 void debugStream(int isock, void *buf, int len)
827 {
828 if (!fDebugStream)
829 return;
830
831 const int slot = isock/7;
832 if (slot<0 || slot>39)
833 return;
834
835 if (!fDumpStream[slot].is_open())
836 {
837 ostringstream name;
838 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
839
840 fDumpStream[slot].open(name.str().c_str(), ios::app);
841 if (!fDumpStream[slot])
842 {
843 ostringstream str;
844 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
845 fMsg.Error(str);
846
847 return;
848 }
849
850 fMsg.Message("Opened file '"+name.str()+"' for writing.");
851 }
852
853 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
854 }
855
856 ofstream fDumpRead; // Stream to possibly dump docket events
857
858 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
859 {
860 // isock = socketID (0-279)
861 // ibyte = #bytes gelesen
862 // event = eventId (oder 0 wenn noch nicht bekannt)
863 // state : 1=finished reading data
864 // 0=reading data
865 // -1=start reading data (header)
866 // -2=start reading data,
867 // eventId not known yet (too little data)
868 // tsec, tusec = time when reading seconds, microseconds
869 //
870 if (!fDebugRead || ibyte==0)
871 return;
872
873 if (!fDumpRead.is_open())
874 {
875 fDumpRead.open("socket_events.txt", ios::app);
876 if (!fDumpRead)
877 {
878 ostringstream str;
879 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
880 fMsg.Error(str);
881
882 return;
883 }
884
885 fMsg.Message("Opened file 'socket_events.txt' for writing.");
886
887 fDumpRead << "# START: " << Time().GetAsStr() << endl;
888 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
889 }
890
891 fDumpRead
892 << setw(2) << state << " "
893 << setw(8) << tsec << " "
894 << setw(9) << tusec << " "
895 << setw(3) << isock << " "
896 << setw(2) << isock/7 << " "
897 << runno << " "
898 << event << " "
899 << ftmevt << " "
900 << ibyte << endl;
901 }
902
903 array<uint16_t,2> fVecRoi;
904
905 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/)
906 {
907 /*
908 fadhd[i] ist ein array mit den 40 fad-headers
909 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
910
911 event ist die Struktur, die auch die write routine erhaelt;
912 darin sind im header die 'soll-werte' fuer z.B. eventID
913 als auch die ADC-Werte (falls Du die brauchst)
914
915 Wenn die routine einen negativen Wert liefert, wird das event
916 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
917 */
918
919 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
920
921 if (roi!=fVecRoi)
922 {
923 Update(fDimRoi, roi);
924 fVecRoi = roi;
925 }
926
927 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
928 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
929
930 // FIMXE: Compare with target configuration
931
932 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
933 {
934 // FIXME: Compare with expectations!!!
935 if (ptr->fStartDelimiter==0)
936 {
937 if (ptr==beg)
938 beg++;
939 continue;
940 }
941
942 if (beg->fStatus != ptr->fStatus)
943 {
944 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
945 CloseRunFile(runNr, 0, 0);
946 return -1;
947 }
948
949 if (beg->fRunNumber != ptr->fRunNumber)
950 {
951 fMsg.Error("Inconsistent run number detected.... closing run.");
952 CloseRunFile(runNr, 0, 0);
953 return -1;
954 }
955
956 /*
957 if (beg->fVersion != ptr->fVersion)
958 {
959 Error("Inconsist firmware version detected.... closing run.");
960 CloseRunFile(runNr, 0, 0);
961 break;
962 }
963 */
964 if (beg->fEventCounter != ptr->fEventCounter)
965 {
966 fMsg.Error("Inconsistent FAD event number detected.... closing run.");
967 CloseRunFile(runNr, 0, 0);
968 return -1;
969 }
970
971 if (beg->fTriggerCounter != ptr->fTriggerCounter)
972 {
973 fMsg.Error("Inconsistent FTM trigger number detected.... closing run.");
974 CloseRunFile(runNr, 0, 0);
975 return -1;
976 }
977
978 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
979 {
980 fMsg.Error("Inconsistent phase shift detected.... closing run.");
981 CloseRunFile(runNr, 0, 0);
982 return -1;
983 }
984
985 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
986 {
987 fMsg.Error("Inconsistent DAC values detected.... closing run.");
988 CloseRunFile(runNr, 0, 0);
989 return -1;
990 }
991
992 if (beg->fTriggerType != ptr->fTriggerType)
993 {
994 fMsg.Error("Inconsistent trigger type detected.... closing run.");
995 CloseRunFile(runNr, 0, 0);
996 return -1;
997 }
998 }
999
1000 // check REFCLK_frequency
1001 // check consistency with command configuration
1002 // how to log errors?
1003 // need gotNewRun/closedRun to know it is finished
1004
1005 return 0;
1006 }
1007
1008 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
1009 {
1010 // Currently we send any event no matter what its trigger id is...
1011 // To be changed.
1012 static Time oldt(boost::date_time::neg_infin);
1013 Time newt;
1014
1015 static int skip = 0;
1016
1017 // FIXME: Only send events if the have newer run-numbers
1018 if (newt<oldt+boost::posix_time::milliseconds(skip>0 ? 100 : 1000))
1019 return;
1020 oldt = newt;
1021
1022 // Workaround to find a valid header.....
1023 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1024 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1025
1026 // FIMXE: Compare with target configuration
1027 const FAD::EventHeader *ptr=beg;
1028 for (; ptr!=end; ptr++)
1029 {
1030 if (ptr->fStartDelimiter!=0)
1031 break;
1032 }
1033 if (ptr==end)
1034 return;
1035
1036
1037 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
1038 memcpy(data.data(), event, sizeof(EVENT));
1039
1040 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
1041
1042 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
1043 DrsCalibrate::RemoveSpikes(vec, event->Roi);
1044
1045 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
1046 const double max = DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
1047
1048 // Maximum above roughly 5pe
1049 if (ptr->IsTriggerPhys() && max<100 && skip<10)
1050 {
1051 skip++;
1052 return;
1053 }
1054
1055 skip = 0;
1056
1057 fDimRawData.setQuality(ptr->fTriggerType);
1058 fDimRawData.Update(data);
1059
1060 fDimEventData.setQuality(ptr->fTriggerType);
1061 fDimEventData.Update(data2);
1062 }
1063
1064 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
1065 {
1066 /*
1067 if (!DataCalib::IsValid())
1068 return;
1069
1070 // Workaround to find a valid header.....
1071 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
1072 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
1073
1074 // FIMXE: Compare with target configuration
1075
1076 const FAD::EventHeader *ptr=beg;
1077 for (; ptr<end; ptr++)
1078 {
1079 if (ptr->fStartDelimiter!=0)
1080 break;
1081 }
1082
1083 if (ptr==end)
1084 return;
1085
1086 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
1087 return;
1088
1089 vector<float> data(event->Roi*1440);
1090 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
1091
1092 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
1093
1094 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
1095 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
1096
1097 fDimFeedbackData.Update(data2);
1098 */
1099 }
1100
1101 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/)
1102 {
1103 switch (threadID)
1104 {
1105 case 0:
1106 SendRawData(fadhd, event);
1107 return 100;
1108 /*
1109 case 1:
1110 SendFeedbackData(fadhd, event);
1111 return 2;*/
1112 }
1113 return 100;
1114 }
1115
1116
1117 bool IsRunStarted() const
1118 {
1119 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
1120 return it==fIsRunStarted.end();// ? true : it->second.started;
1121 }
1122
1123 uint32_t GetRunNumber() const
1124 {
1125 return fRunNumber;
1126 }
1127
1128 bool IsRunFileOpen()
1129 {
1130 return fLastOpened==fRunNumber-1;
1131 }
1132
1133 bool IncreaseRunNumber(uint32_t run)
1134 {
1135 if (!InitRunNumber())
1136 return false;
1137
1138 if (run<fRunNumber)
1139 {
1140 ostringstream msg;
1141 msg <<
1142 "Run number " << run << " smaller than next available "
1143 "run number " << fRunNumber << " in " << fPath << " [" << fNightAsInt << "]";
1144 fMsg.Error(msg);
1145 return false;
1146 }
1147
1148 fRunNumber = run;
1149
1150 return true;
1151 }
1152
1153 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
1154 {
1155 // This function is called even when writing is switched off
1156 set<uint32_t>::iterator it = fIsRunStarted.begin();
1157 while (it!=fIsRunStarted.end())
1158 {
1159 if (*it<runnr)
1160 {
1161 ostringstream str;
1162 str << "gotNewRun - Missed run " << *it << ".";
1163 fMsg.Info(str);
1164
1165 fIsRunStarted.erase(it++);
1166 continue;
1167 }
1168 if (*it==runnr)
1169 break;
1170 it++;
1171 }
1172 if (it==fIsRunStarted.end())
1173 {
1174 ostringstream str;
1175 str << "gotNewRun - Not waiting for run " << runnr << ".";
1176 fMsg.Warn(str);
1177 return;
1178 }
1179
1180 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
1181 if (i2==fExpectedRuns.end())
1182 {
1183 ostringstream str;
1184 str << "gotNewRun - Run " << runnr << " wasn't expected.";
1185 fMsg.Warn(str);
1186 return;
1187 }
1188
1189 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
1190 // return: 0=close scheduled / >0 already closed / <0 does not exist
1191
1192 // FIXME: Move configuration from expected runs to runs which will soon
1193 // be opened/closed
1194
1195 fIsRunStarted.erase(it);
1196 }
1197
1198 map<boost::thread::id, string> fLastMessage;
1199
1200 void factOut(int severity, int err, const char *message)
1201 {
1202 if (!fDebugLog && severity==99)
1203 return;
1204
1205 ostringstream str;
1206 //str << boost::this_thread::get_id() << " ";
1207 str << "EventBuilder(";
1208 if (err<0)
1209 str << "---";
1210 else
1211 str << err;
1212 str << "): " << message;
1213
1214 string &old = fLastMessage[boost::this_thread::get_id()];
1215
1216 if (str.str()==old)
1217 return;
1218 old = str.str();
1219
1220 fMsg.Update(str, severity);
1221 }
1222
1223/*
1224 void factStat(int64_t *stat, int len)
1225 {
1226 if (len!=7)
1227 {
1228 fMsg.Warn("factStat received unknown number of values.");
1229 return;
1230 }
1231
1232 vector<int64_t> data(1, g_maxMem);
1233 data.insert(data.end(), stat, stat+len);
1234
1235 static vector<int64_t> last(8);
1236 if (data==last)
1237 return;
1238 last = data;
1239
1240 fDimStatistics.Update(data);
1241
1242 // len ist die Laenge des arrays.
1243 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1244 // kannst Du pruefen, ob die 100MB voll sind ....
1245
1246 ostringstream str;
1247 str
1248 << "Wait=" << stat[0] << " "
1249 << "Skip=" << stat[1] << " "
1250 << "Del=" << stat[2] << " "
1251 << "Tot=" << stat[3] << " "
1252 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1253 << "Read=" << stat[5] << " "
1254 << "Conn=" << stat[6];
1255
1256 fMsg.Info(str);
1257 }
1258 */
1259
1260 void factStat(const EVT_STAT &/*stat*/)
1261 {
1262 //fDimStatistics2.Update(stat);
1263 }
1264
1265 void factStatSend(const pair<Time,GUI_STAT> &stat)
1266 {
1267 fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
1268 fDimStatistics1.Update(stat.first);
1269 }
1270
1271 void factStat(const GUI_STAT &stat)
1272 {
1273 fDimQueue1.post(make_pair(Time(), stat));
1274 }
1275
1276 void factReportIncomplete(uint64_t rep)
1277 {
1278 fDimIncomplete.setQuality(1);
1279 fDimIncomplete.Update(rep);
1280 }
1281
1282 array<FAD::EventHeader, 40> fVecHeader;
1283
1284 template<typename T, class S>
1285 array<T, 42> Compare(const S *vec, const T *t)
1286 {
1287 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1288
1289 const T *min = NULL;
1290 const T *val = NULL;
1291 const T *max = NULL;
1292
1293 array<T, 42> arr;
1294
1295 // bool rc = true;
1296 for (int i=0; i<40; i++)
1297 {
1298 const char *base = reinterpret_cast<const char*>(vec+i);
1299 const T *ref = reinterpret_cast<const T*>(base+offset);
1300
1301 arr[i] = *ref;
1302
1303 if (gi_NumConnect[i]!=7)
1304 {
1305 arr[i] = 0;
1306 continue;
1307 }
1308
1309 if (!val)
1310 {
1311 min = ref;
1312 val = ref;
1313 max = ref;
1314 }
1315
1316 if (*ref<*min)
1317 min = ref;
1318
1319 if (*ref>*max)
1320 max = ref;
1321
1322 // if (*val!=*ref)
1323 // rc = false;
1324 }
1325
1326 arr[40] = val ? *min : 1;
1327 arr[41] = val ? *max : 0;
1328
1329 return arr;
1330 }
1331
1332 template<typename T>
1333 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1334 {
1335 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1336
1337 T val = 0;
1338 T rc = 0;
1339
1340 array<T, 42> vec;
1341
1342 bool first = true;
1343
1344 for (int i=0; i<40; i++)
1345 {
1346 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1347 const T *ref = reinterpret_cast<const T*>(base+offset);
1348
1349 vec[i+2] = *ref;
1350
1351 if (gi_NumConnect[i]!=7)
1352 {
1353 vec[i+2] = 0;
1354 continue;
1355 }
1356
1357 if (first)
1358 {
1359 first = false;
1360 val = *ref;
1361 rc = 0;
1362 }
1363
1364 rc |= val^*ref;
1365 }
1366
1367 vec[0] = rc;
1368 vec[1] = val;
1369
1370 return vec;
1371 }
1372
1373 template<typename T, size_t N>
1374 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
1375 {
1376// svc.setQuality(vec[40]<=vec[41]);
1377 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1378 svc.Update(t);
1379 }
1380
1381 template<typename T>
1382 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1383 {
1384 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1385 for (int i=0; i<40;i++)
1386 cout << " " << data.second[i+3];
1387 cout << endl;
1388 }
1389
1390 vector<uint> fNumConnected;
1391
1392 void procHeader(const tuple<Time,bool,FAD::EventHeader> &dat)
1393 {
1394 const Time &t = get<0>(dat);
1395 const bool changed = get<1>(dat);
1396 const FAD::EventHeader &h = get<2>(dat);
1397
1398 const FAD::EventHeader old = fVecHeader[h.Id()];
1399 fVecHeader[h.Id()] = h;
1400
1401 if (old.fVersion != h.fVersion || changed)
1402 {
1403 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1404
1405 array<float,42> data;
1406 for (int i=0; i<42; i++)
1407 {
1408 ostringstream str;
1409 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1410 data[i] = stof(str.str());
1411 }
1412 Update(fDimFwVersion, data, t);
1413 }
1414
1415 if (old.fRunNumber != h.fRunNumber || changed)
1416 {
1417 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1418 fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
1419 fDimRunNumber.Update(t);
1420 }
1421
1422 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1423 {
1424 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1425 fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
1426 fDimPrescaler.Update(t);
1427 }
1428
1429 if (old.fDNA != h.fDNA || changed)
1430 {
1431 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1432 Update(fDimDNA, dna, t, 40);
1433 }
1434
1435 if (old.fStatus != h.fStatus || changed)
1436 {
1437 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1438 Update(fDimStatus, sts, t);
1439 }
1440
1441 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1442 {
1443 array<uint16_t, FAD::kNumDac*42> dacs;
1444
1445 for (int i=0; i<FAD::kNumDac; i++)
1446 {
1447 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1448 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1449 }
1450
1451 Update(fDimDac, dacs, t);
1452 }
1453
1454 // -----------
1455
1456 static Time oldt(boost::date_time::neg_infin);
1457 Time newt;
1458
1459 if (newt>oldt+boost::posix_time::seconds(1))
1460 {
1461 oldt = newt;
1462
1463 // --- RefClock
1464
1465 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1466 Update(fDimRefClock, clk, t);
1467
1468 // --- Temperatures
1469
1470 const array<int16_t,42> tmp[4] =
1471 {
1472 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1473 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1474 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1475 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1476 };
1477
1478 vector<int16_t> data;
1479 data.reserve(82);
1480 data.push_back(tmp[0][40]); // min: 0
1481 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1482 data.push_back(tmp[0][41]); // max: 41
1483 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1484
1485 for (int j=1; j<=3; j++)
1486 {
1487 const array<int16_t,42> &ref = tmp[j];
1488
1489 // Gloabl min
1490 if (ref[40]<data[0]) // 40=min
1491 data[0] = ref[40];
1492
1493 // Global max
1494 if (ref[41]>data[41]) // 41=max
1495 data[41] = ref[41];
1496
1497 for (int i=0; i<40; i++)
1498 {
1499 // min per board
1500 if (ref[i]<data[i+1]) // data: 1-40
1501 data[i+1] = ref[i]; // ref: 0-39
1502
1503 // max per board
1504 if (ref[i]>data[i+42]) // data: 42-81
1505 data[i+42] = ref[i]; // ref: 0-39
1506 }
1507 }
1508
1509 vector<float> deg(82); // 0: global min, 1-40: min
1510 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1511 deg[i] = data[i]/16.;
1512
1513 fDimTemperature.setData(deg.data(), 82*sizeof(float));
1514 fDimTemperature.Update(t);
1515 }
1516 }
1517
1518 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1519 {
1520 const uint16_t id = h.Id();
1521 if (id>39)
1522 return;
1523
1524 if (fNumConnected.size()!=40)
1525 fNumConnected.resize(40);
1526
1527 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1528
1529 const bool changed = con!=fNumConnected || !IsThreadRunning();
1530
1531 fNumConnected = con;
1532
1533 fDimQueue2.post(make_tuple(Time(), changed, h));
1534
1535 //const lock_guard<mutex> guard(fMutexDimQueue2);
1536 //fDimQueue2.push_back(make_tuple(Time(), changed, h));
1537 }
1538};
1539
1540EventBuilderWrapper *EventBuilderWrapper::This = 0;
1541
1542// ----------- Event builder callbacks implementation ---------------
1543extern "C"
1544{
1545 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1546 {
1547 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1548 }
1549
1550 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1551 {
1552 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1553 }
1554
1555 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1556 {
1557 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1558 }
1559
1560 // -----
1561
1562 //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/)
1563 //{
1564 // return NULL;
1565 //}
1566
1567 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1568 {
1569 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1570 }
1571
1572 int runEnd(uint32_t, void */*runPtr*/)
1573 {
1574 return 0;
1575 }
1576
1577 // -----
1578
1579 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1580 {
1581 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1582 }
1583
1584 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1585 {
1586 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1587 }
1588
1589 // -----
1590
1591 void factOut(int severity, int err, const char *message)
1592 {
1593 EventBuilderWrapper::This->factOut(severity, err, message);
1594 }
1595
1596 void factStat(GUI_STAT stat)
1597 {
1598 EventBuilderWrapper::This->factStat(stat);
1599 }
1600
1601 void factStatNew(EVT_STAT stat)
1602 {
1603 EventBuilderWrapper::This->factStat(stat);
1604 }
1605
1606 void factReportIncomplete (uint64_t rep)
1607 {
1608 EventBuilderWrapper::This->factReportIncomplete(rep);
1609 }
1610
1611 // ------
1612
1613 void debugHead(int socket, int/*board*/, void *buf)
1614 {
1615 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1616 EventBuilderWrapper::This->debugHead(socket, h);
1617 }
1618
1619 void debugStream(int isock, void *buf, int len)
1620 {
1621 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1622 }
1623
1624 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1625 {
1626 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1627 }
1628}
1629
1630#endif
Note: See TracBrowser for help on using the repository browser.