source: trunk/FACT++/src/EventBuilderWrapper.h@ 12523

Last change on this file since 12523 was 12522, checked in by tbretz, 14 years ago
Added a new service FILE_FORMAT to distribute the file format currently switched on; added controlos and displays to set and see the file format.
File size: 40.3 KB
Line 
1#ifndef FACT_EventBuilderWrapper
2#define FACT_EventBuilderWrapper
3
4#include <sstream>
5
6#if BOOST_VERSION < 104400
7#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 4))
8#undef BOOST_HAS_RVALUE_REFS
9#endif
10#endif
11#include <boost/thread.hpp>
12#include <boost/filesystem.hpp>
13#include <boost/date_time/posix_time/posix_time_types.hpp>
14
15#include "DimWriteStatistics.h"
16
17#include "DataCalib.h"
18#include "DataWriteRaw.h"
19
20#ifdef HAVE_FITS
21#include "DataWriteFits.h"
22#else
23#define DataWriteFits DataWriteRaw
24#endif
25
26namespace ba = boost::asio;
27namespace bs = boost::system;
28namespace fs = boost::filesystem;
29
30using ba::ip::tcp;
31
32using namespace std;
33
34// ========================================================================
35
36#include "EventBuilder.h"
37
38extern "C" {
39 extern void StartEvtBuild();
40 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
41}
42
43// ========================================================================
44
45class EventBuilderWrapper
46{
47public:
48 // FIXME
49 static EventBuilderWrapper *This;
50
51 MessageImp &fMsg;
52
53private:
54 boost::thread fThread;
55
56 enum CommandStates_t // g_runStat
57 {
58 kAbort = -2, // quit as soon as possible ('abort')
59 kExit = -1, // stop reading, quit when buffered events done ('exit')
60 kInitialize = 0, // 'initialize' (e.g. dim not yet started)
61 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]
62 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]
63 kModeFlush = 10, // read data from camera, but skip them ('flush')
64 kModeTest = 20, // read data and process them, but do not write to disk ('test')
65 kModeFlag = 30, // read data, process and write all to disk ('flag')
66 kModeRun = 40, // read data, process and write selected to disk ('run')
67 };
68
69 enum
70 {
71 kCurrent = 0,
72 kTotal = 1,
73 kEventId = 2,
74 kTriggerId = 3,
75 };
76
77 FAD::FileFormat_t fFileFormat;
78
79 uint32_t fMaxRun;
80 uint32_t fLastOpened;
81 uint32_t fLastClosed;
82 uint32_t fNumEvts[4];
83
84 DimWriteStatistics fDimWriteStats;
85 DimDescribedService fDimRuns;
86 DimDescribedService fDimEvents;
87 DimDescribedService fDimRawData;
88 DimDescribedService fDimEventData;
89 DimDescribedService fDimFeedbackData;
90 DimDescribedService fDimFwVersion;
91 DimDescribedService fDimRunNumber;
92 DimDescribedService fDimStatus;
93 DimDescribedService fDimDNA;
94 DimDescribedService fDimTemperature;
95 DimDescribedService fDimPrescaler;
96 DimDescribedService fDimRefClock;
97 DimDescribedService fDimRoi;
98 DimDescribedService fDimDac;
99 DimDescribedService fDimDrsCalibration;
100 DimDescribedService fDimStatistics1;
101 DimDescribedService fDimStatistics2;
102 DimDescribedService fDimFileFormat;
103
104 bool fDebugStream;
105 bool fDebugRead;
106 bool fDebugLog;
107
108 string fPath;
109 uint32_t fRunNumber;
110
111protected:
112 int64_t InitRunNumber()
113 {
114 fRunNumber = 1000;
115
116 // Ensure that the night doesn't change during our check
117 const int check = Time().NightAsInt();
118
119 while (--fRunNumber>0)
120 {
121 const string name = DataProcessorImp::FormFileName(fPath, fRunNumber, "");
122
123 if (access((name+"bin").c_str(), F_OK) == 0)
124 break;
125 if (access((name+"fits").c_str(), F_OK) == 0)
126 break;
127 if (access((name+"drs.fits").c_str(), F_OK) == 0)
128 break;
129
130 }
131
132 if (check != Time().NightAsInt())
133 return InitRunNumber();
134
135 fRunNumber++;
136
137 if (fRunNumber==1000)
138 {
139 fMsg.Error("You have a file with run-number 1000 in "+fPath);
140 return -1;
141 }
142
143 ostringstream str;
144 str << "Set next run-number to " << fRunNumber << " determined from " << fPath;
145 fMsg.Message(str);
146
147 fMsg.Info(" ==> TODO: Crosscheck with database!");
148
149 return check;
150 }
151
152 int64_t InitRunNumber(const string &path)
153 {
154 if (!DimWriteStatistics::DoesPathExist(path, fMsg))
155 {
156 fMsg.Error("Data path "+path+" does not exist!");
157 return -1;
158 }
159
160 //const fs::path fullPath = fs::system_complete(fs::path(path));
161
162 fPath = path;
163
164 fDimWriteStats.SetCurrentFolder(fPath);
165
166 return InitRunNumber();
167 }
168
169public:
170 EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
171 fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
172 fDimWriteStats ("FAD_CONTROL", imp),
173 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", ""),
174 fDimEvents ("FAD_CONTROL/EVENTS", "I:4", ""),
175 fDimRawData ("FAD_CONTROL/RAW_DATA", "S:1;I:1;S:1;I:1;I:2;I:40;S:1440;S:160;F", ""),
176 fDimEventData ("FAD_CONTROL/EVENT_DATA", "F:1440;F:1440;F:1440;F:1440", ""),
177 fDimFeedbackData("FAD_CONTROL/FEEDBACK_DATA", "F:1440", ""),
178 fDimFwVersion ("FAD_CONTROL/FIRMWARE_VERSION", "F:42", ""),
179 fDimRunNumber ("FAD_CONTROL/RUN_NUMBER", "I:42", ""),
180 fDimStatus ("FAD_CONTROL/STATUS", "S:42", ""),
181 fDimDNA ("FAD_CONTROL/DNA", "X:40", ""),
182 fDimTemperature ("FAD_CONTROL/TEMPERATURE", "F:82", ""),
183 fDimPrescaler ("FAD_CONTROL/PRESCALER", "S:42", ""),
184 fDimRefClock ("FAD_CONTROL/REFERENCE_CLOCK", "I:42", ""),
185 fDimRoi ("FAD_CONTROL/REGION_OF_INTEREST", "S:2", ""),
186 fDimDac ("FAD_CONTROL/DAC", "S:336", ""),
187 fDimDrsCalibration("FAD_CONTROL/DRS_CALIBRATION", "I:1;I:3;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:1474560;F:163840;F:163840", ""),
188 fDimStatistics1 ("FAD_CONTROL/STATISTICS1", "I:3;I:5;X:4;I:3;I:3;I:40;I:1;I:2;C:40;I:40;I:40;X:40", ""),
189 fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40", ""),
190 fDimFileFormat ("FAD_CONTROL/FILE_FORMAT", "S:1", ""),
191 fDebugStream(false), fDebugRead(false), fDebugLog(false)
192 {
193 if (This)
194 throw logic_error("EventBuilderWrapper cannot be instantiated twice.");
195
196 This = this;
197
198 memset(fNumEvts, 0, sizeof(fNumEvts));
199 fDimEvents.Update(fNumEvts);
200
201 for (size_t i=0; i<40; i++)
202 ConnectSlot(i, tcp::endpoint());
203 }
204 virtual ~EventBuilderWrapper()
205 {
206 Abort();
207
208 // FIXME: Used timed_join and abort afterwards
209 // What's the maximum time the eb need to abort?
210 fThread.join();
211 //ffMsg.Info("EventBuilder stopped.");
212
213 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++)
214 delete *it;
215 }
216
217 set<uint32_t> fIsRunStarted;
218 map<uint32_t, FAD::RunDescription> fExpectedRuns;
219
220 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref)
221 {
222 if (maxtime<=0 || maxtime>24*60*60)
223 maxtime = 24*60*60;
224 if (maxevt<=0 || maxevt>INT32_MAX)
225 maxevt = INT32_MAX;
226
227 const FAD::RunDescription descr =
228 {
229 uint32_t(maxtime),
230 uint32_t(maxevt),
231 ref.first,
232 ref.second,
233 };
234
235 // FIMXE: Maybe reset an event counter so that the mcp can count events?
236
237 fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!");
238
239 fExpectedRuns[fRunNumber] = descr;
240 fIsRunStarted.insert(fRunNumber);
241 return fRunNumber++;
242 }
243
244 bool IsThreadRunning()
245 {
246 return !fThread.timed_join(boost::posix_time::microseconds(0));
247 }
248
249 void SetMaxMemory(unsigned int mb) const
250 {
251 /*
252 if (mb*1000000<GetUsedMemory())
253 {
254 // ffMsg.Warn("...");
255 return;
256 }*/
257
258 g_maxMem = size_t(mb)*1000000;
259 }
260
261 void StartThread(const vector<tcp::endpoint> &addr)
262 {
263 if (IsThreadRunning())
264 {
265 fMsg.Warn("Start - EventBuilder still running");
266 return;
267 }
268
269 fLastMessage.clear();
270
271 for (size_t i=0; i<40; i++)
272 ConnectSlot(i, addr[i]);
273
274 g_runStat = kModeRun;
275 g_maxProc = 3;
276
277 fMsg.Message("Starting EventBuilder thread");
278
279 fThread = boost::thread(StartEvtBuild);
280 }
281 void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
282 {
283 if (i>39)
284 return;
285
286 if (addr==tcp::endpoint())
287 {
288 DisconnectSlot(i);
289 return;
290 }
291
292 g_port[i].sockAddr.sin_family = AF_INET;
293 g_port[i].sockAddr.sin_addr.s_addr = htonl(addr.address().to_v4().to_ulong());
294 g_port[i].sockAddr.sin_port = htons(addr.port());
295 // In this order
296 g_port[i].sockDef = 1;
297 }
298 void DisconnectSlot(unsigned int i)
299 {
300 if (i>39)
301 return;
302
303 g_port[i].sockDef = 0;
304 // In this order
305 g_port[i].sockAddr.sin_family = AF_INET;
306 g_port[i].sockAddr.sin_addr.s_addr = 0;
307 g_port[i].sockAddr.sin_port = 0;
308 }
309 void IgnoreSlot(unsigned int i)
310 {
311 if (i>39)
312 return;
313 if (g_port[i].sockAddr.sin_port==0)
314 return;
315
316 g_port[i].sockDef = -1;
317 }
318
319
320 void Abort()
321 {
322 fMsg.Message("Signal abort to EventBuilder thread...");
323 g_runStat = kAbort;
324 }
325
326 void ResetThread(bool soft)
327 {
328 /*
329 if (g_reset > 0)
330
331 * suspend reading
332 * reset = g_reset;
333 * g_reset=0
334
335 * reset% 10
336 == 0 leave event Buffers as they are
337 == 1 let all buffers drain (write (incomplete) events)
338 > 1 flush all buffers (do not write buffered events)
339
340 * (reset/10)%10
341 > 0 close all sockets and destroy them (also free the
342 allocated read-buffers)
343 recreate before resuming operation
344 [ this is more than just close/open that can be
345 triggered by e.g. close/open the base-socket ]
346
347 * (reset/100)%10
348 > 0 close all open run-files
349
350 * (reset/1000)
351 sleep so many seconds before resuming operation
352 (does not (yet) take into account time left when waiting
353 for buffers getting empty ...)
354
355 * resume_reading
356
357 */
358 fMsg.Message("Signal reset to EventBuilder thread...");
359 g_reset = soft ? 101 : 102;
360 }
361
362 void Exit()
363 {
364 fMsg.Message("Signal exit to EventBuilder thread...");
365 g_runStat = kExit;
366 }
367
368 /*
369 void Wait()
370 {
371 fThread.join();
372 ffMsg.Message("EventBuilder stopped.");
373 }*/
374
375 void Hybernate() const { g_runStat = kHybernate; }
376 void Sleep() const { g_runStat = kSleep; }
377 void FlushMode() const { g_runStat = kModeFlush; }
378 void TestMode() const { g_runStat = kModeTest; }
379 void FlagMode() const { g_runStat = kModeFlag; }
380 void RunMode() const { g_runStat = kModeRun; }
381
382 // FIXME: To be removed
383 void SetMode(int mode) const { g_runStat = mode; }
384
385 bool IsConnected(int i) const { return gi_NumConnect[i]==7; }
386 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
387 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
388 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
389 int GetNumFilesOpen() const { return fFiles.size(); }
390
391 /*
392 bool IsConnected(int i) const { return gi_NumConnect[i]>0; }
393 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); }
394 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; }
395 int GetNumConnected(int i) const { return gi_NumConnect[i]; }
396 */
397
398 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; }
399 bool IsIgnored(int i) const { return g_port[i].sockDef==-1; }
400
401 void SetOutputFormat(FAD::FileFormat_t f)
402 {
403 fFileFormat = f;
404 fDimFileFormat.Update(uint16_t(f));
405 if (fFileFormat==FAD::kCalib)
406 {
407 DataCalib::Restart();
408 DataCalib::Update(fDimDrsCalibration);
409 fMsg.Message("Resetted DRS calibration.");
410 }
411 }
412
413 virtual int ResetSecondaryDrsBaseline()
414 {
415 if (DataCalib::ResetTrgOff(fDimDrsCalibration))
416 fMsg.Message("Resetted DRS calibration for secondary baseline.");
417 else
418 fMsg.Warn("Could not reset DRS calibration of secondary baseline.");
419
420 return 0;
421 }
422
423 void SetDebugLog(bool b) { fDebugLog = b; }
424
425 void SetDebugStream(bool b)
426 {
427 fDebugStream = b;
428 if (b)
429 return;
430
431 for (int i=0; i<40; i++)
432 {
433 if (!fDumpStream[i].is_open())
434 continue;
435
436 fDumpStream[i].close();
437
438 ostringstream name;
439 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";
440 fMsg.Message("Closed file '"+name.str()+"'");
441 }
442 }
443
444 void SetDebugRead(bool b)
445 {
446 fDebugRead = b;
447 if (b || !fDumpRead.is_open())
448 return;
449
450 fDumpRead.close();
451 fMsg.Message("Closed file 'socket_events.txt'");
452 }
453
454// size_t GetUsedMemory() const { return gi_usedMem; }
455
456 void LoadDrsCalibration(const char *fname)
457 {
458 if (!DataCalib::ReadFits(fname, fMsg))
459 return;
460 fMsg.Info("Successfully loaded DRS calibration from "+string(fname));
461 DataCalib::Update(fDimDrsCalibration);
462 }
463
464 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; }
465
466
467 /*
468 struct OpenFileToDim
469 {
470 int code;
471 char fileName[FILENAME_MAX];
472 };
473
474 SignalRunOpened(runid, filename);
475 // Send num open files
476 // Send runid, (more info about the run?), filename via dim
477
478 SignalEvtWritten(runid);
479 // Send num events written of newest file
480
481 SignalRunClose(runid);
482 // Send new num open files
483 // Send empty file-name if no file is open
484
485 */
486
487 // -------------- Mapped event builder callbacks ------------------
488
489 void UpdateRuns(const string &fname="")
490 {
491 uint32_t values[5] =
492 {
493 static_cast<uint32_t>(fFiles.size()),
494 0xffffffff,
495 0,
496 fLastOpened,
497 fLastClosed
498 };
499
500 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();
501 it!=fFiles.end(); it++)
502 {
503 const DataProcessorImp *file = *it;
504
505 if (file->GetRunId()<values[1])
506 values[1] = file->GetRunId();
507
508 if (file->GetRunId()>values[2])
509 values[2] = file->GetRunId();
510 }
511
512 fMaxRun = values[2];
513
514 vector<char> data(sizeof(values)+fname.size()+1);
515 memcpy(data.data(), values, sizeof(values));
516 strcpy(data.data()+sizeof(values), fname.c_str());
517
518 fDimRuns.Update(data);
519 }
520
521 vector<DataProcessorImp*> fFiles;
522
523 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t)
524 {
525 fMsg.Info(" ==> TODO: Update run configuration in database!");
526 fMsg.Info(" ==> TODO: Remove run from fExpected runs in case of failure!");
527 fMsg.Info(" ==> TODO: Write information from fTargetConfig to header!");
528
529 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin();
530 while (it!=fExpectedRuns.end())
531 {
532 if (it->first<runid)
533 {
534 ostringstream str;
535 str << "runOpen - Missed run " << it->first << "." << endl;
536 fMsg.Info(str);
537
538 fExpectedRuns.erase(it++);
539 continue;
540 }
541 if (it->first==runid)
542 break;
543 it++;
544 }
545 if (it==fExpectedRuns.end())
546 {
547 ostringstream str;
548 str << "runOpen - Run " << runid << " wasn't expected." << endl;
549 fMsg.Warn(str);
550 }
551
552 const FAD::RunDescription desc = it->second;
553
554 fExpectedRuns.erase(it);
555
556 // Check if file already exists...
557 DataProcessorImp *file = 0;
558 switch (fFileFormat)
559 {
560 case FAD::kNone: file = new DataDump(fPath, runid, fMsg); break;
561 case FAD::kDebug: file = new DataDebug(fPath, runid, fMsg); break;
562 case FAD::kFits: file = new DataWriteFits(fPath, runid, fMsg); break;
563 case FAD::kRaw: file = new DataWriteRaw(fPath, runid, fMsg); break;
564 case FAD::kCalib: file = new DataCalib(fPath, runid, fDimDrsCalibration, fMsg); break;
565 }
566
567 try
568 {
569 if (!file->Open(h, desc))
570 return 0;
571 }
572 catch (const exception &e)
573 {
574 fMsg.Error("Exception trying to open file: "+string(e.what()));
575 return 0;
576 }
577
578 fFiles.push_back(file);
579
580 ostringstream str;
581 str << "Opened: " << file->GetFileName() << " (" << file->GetRunId() << ")";
582 fMsg.Info(str);
583
584 fDimWriteStats.FileOpened(file->GetFileName());
585
586 fLastOpened = runid;
587 UpdateRuns(file->GetFileName());
588
589 fNumEvts[kEventId] = 0;
590 fNumEvts[kTriggerId] = 0;
591
592 fNumEvts[kCurrent] = 0;
593 fDimEvents.Update(fNumEvts);
594 // fDimCurrentEvent.Update(uint32_t(0));
595
596 return reinterpret_cast<FileHandle_t>(file);
597 }
598
599 int runWrite(FileHandle_t handler, EVENT *e, size_t sz)
600 {
601 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
602
603 if (!file->WriteEvt(e))
604 return -1;
605
606 if (file->GetRunId()==fMaxRun)
607 {
608 fNumEvts[kCurrent]++;
609 fNumEvts[kEventId] = e->EventNum;
610 fNumEvts[kTriggerId] = e->TriggerNum;
611 }
612
613 fNumEvts[kTotal]++;
614
615 static Time oldt(boost::date_time::neg_infin);
616 Time newt;
617 if (newt>oldt+boost::posix_time::seconds(1))
618 {
619 fDimEvents.Update(fNumEvts);
620 oldt = newt;
621 }
622
623
624 // ===> SignalEvtWritten(runid);
625 // Send num events written of newest file
626
627 /* close run runId (all all runs if runId=0) */
628 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
629 //CloseRunFile(file->GetRunId(), time(NULL)+2) ;
630
631 return 0;
632 }
633
634 virtual void CloseRun(uint32_t runid) { }
635
636 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)
637 {
638 fMsg.Info(" ==> TODO: Update run configuration in database!");
639 fMsg.Info(" ==> TODO: Remove run from fExpected runs!");
640
641 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);
642
643 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);
644 if (it==fFiles.end())
645 {
646 ostringstream str;
647 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";
648 fMsg.Fatal(str);
649 return -1;
650 }
651
652 fFiles.erase(it);
653
654 fLastClosed = file->GetRunId();
655 CloseRun(fLastClosed);
656 UpdateRuns();
657
658 fDimEvents.Update(fNumEvts);
659
660 const bool rc = file->Close(tail);
661 if (!rc)
662 {
663 // Error message
664 }
665
666 ostringstream str;
667 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";
668 fMsg.Info(str);
669
670 delete file;
671
672 // ==> SignalRunClose(runid);
673 // Send new num open files
674 // Send empty file-name if no file is open
675
676 return rc ? 0 : -1;
677 }
678
679 ofstream fDumpStream[40];
680
681 void debugStream(int isock, void *buf, int len)
682 {
683 if (!fDebugStream)
684 return;
685
686 const int slot = isock/7;
687 if (slot<0 || slot>39)
688 return;
689
690 if (!fDumpStream[slot].is_open())
691 {
692 ostringstream name;
693 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";
694
695 fDumpStream[slot].open(name.str().c_str(), ios::app);
696 if (!fDumpStream[slot])
697 {
698 ostringstream str;
699 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";
700 fMsg.Error(str);
701
702 return;
703 }
704
705 fMsg.Message("Opened file '"+name.str()+"' for writing.");
706 }
707
708 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);
709 }
710
711 ofstream fDumpRead; // Stream to possibly dump docket events
712
713 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)
714 {
715 // isock = socketID (0-279)
716 // ibyte = #bytes gelesen
717 // event = eventId (oder 0 wenn noch nicht bekannt)
718 // state : 1=finished reading data
719 // 0=reading data
720 // -1=start reading data (header)
721 // -2=start reading data,
722 // eventId not known yet (too little data)
723 // tsec, tusec = time when reading seconds, microseconds
724 //
725 if (!fDebugRead || ibyte==0)
726 return;
727
728 if (!fDumpRead.is_open())
729 {
730 fDumpRead.open("socket_events.txt", ios::app);
731 if (!fDumpRead)
732 {
733 ostringstream str;
734 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";
735 fMsg.Error(str);
736
737 return;
738 }
739
740 fMsg.Message("Opened file 'socket_events.txt' for writing.");
741
742 fDumpRead << "# START: " << Time().GetAsStr() << endl;
743 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;
744 }
745
746 fDumpRead
747 << setw(2) << state << " "
748 << setw(8) << tsec << " "
749 << setw(9) << tusec << " "
750 << setw(3) << isock << " "
751 << setw(2) << isock/7 << " "
752 << runno << " "
753 << event << " "
754 << ftmevt << " "
755 << ibyte << endl;
756 }
757
758 array<uint16_t,2> fVecRoi;
759
760 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
761 {
762 /*
763 fadhd[i] ist ein array mit den 40 fad-headers
764 (falls ein board nicht gelesen wurde, ist start_package_flag =0 )
765
766 event ist die Struktur, die auch die write routine erhaelt;
767 darin sind im header die 'soll-werte' fuer z.B. eventID
768 als auch die ADC-Werte (falls Du die brauchst)
769
770 Wenn die routine einen negativen Wert liefert, wird das event
771 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip]
772 */
773
774 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }};
775
776 if (roi!=fVecRoi)
777 {
778 Update(fDimRoi, roi);
779 fVecRoi = roi;
780 }
781
782 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
783 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
784
785 // FIMXE: Compare with target configuration
786
787 for (const FAD::EventHeader *ptr=beg; ptr!=end; ptr++)
788 {
789 // FIXME: Compare with expectations!!!
790 if (ptr->fStartDelimiter==0)
791 {
792 if (ptr==beg)
793 beg++;
794 continue;
795 }
796
797 if (beg->fStatus != ptr->fStatus)
798 {
799 fMsg.Error("Inconsistency in FAD status detected.... closing run.");
800 CloseRunFile(runNr, 0, 0);
801 return -1;
802 }
803
804 if (beg->fRunNumber != ptr->fRunNumber)
805 {
806 fMsg.Error("Inconsistent run number detected.... closing run.");
807 CloseRunFile(runNr, 0, 0);
808 return -1;
809 }
810
811 /*
812 if (beg->fVersion != ptr->fVersion)
813 {
814 Error("Inconsist firmware version detected.... closing run.");
815 CloseRunFile(runNr, 0, 0);
816 break;
817 }
818 if (beg->fEventCounter != ptr->fEventCounter)
819 {
820 Error("Inconsist run number detected.... closing run.");
821 CloseRunFile(runNr, 0, 0);
822 break;
823 }
824 if (beg->fTriggerCounter != ptr->fTriggerCounter)
825 {
826 Error("Inconsist trigger number detected.... closing run.");
827 CloseRunFile(runNr, 0, 0);
828 break;
829 }*/
830
831 if (beg->fAdcClockPhaseShift != ptr->fAdcClockPhaseShift)
832 {
833 fMsg.Error("Inconsistent phase shift detected.... closing run.");
834 CloseRunFile(runNr, 0, 0);
835 return -1;
836 }
837
838 if (memcmp(beg->fDac, ptr->fDac, sizeof(beg->fDac)))
839 {
840 fMsg.Error("Inconsistent DAC values detected.... closing run.");
841 CloseRunFile(runNr, 0, 0);
842 return -1;
843 }
844
845 if (beg->fTriggerType != ptr->fTriggerType)
846 {
847 fMsg.Error("Inconsistent trigger type detected.... closing run.");
848 CloseRunFile(runNr, 0, 0);
849 return -1;
850 }
851 }
852
853 // check REFCLK_frequency
854 // check consistency with command configuration
855 // how to log errors?
856 // need gotNewRun/closedRun to know it is finished
857
858 return 0;
859 }
860
861 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event)
862 {
863 // Currently we send any event no matter what its trigger id is...
864 // To be changed.
865 static Time oldt(boost::date_time::neg_infin);
866 Time newt;
867
868 // FIXME: Only send events if the have newer run-numbers
869 if (newt<oldt+boost::posix_time::seconds(1))
870 return;
871
872 oldt = newt;
873
874 vector<char> data(sizeof(EVENT)+event->Roi*sizeof(float)*(1440+160));
875 memcpy(data.data(), event, sizeof(EVENT));
876
877 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT));
878
879 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);
880 fDimRawData.Update(data);
881
882 DrsCalibrate::RemoveSpikes(vec, event->Roi);
883
884 vector<float> data2(1440*4); // Mean, RMS, Max, Pos
885 DrsCalibrate::GetPixelStats(data2.data(), vec, event->Roi);
886
887 fDimEventData.Update(data2);
888 }
889
890 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event)
891 {
892 if (!DataCalib::IsValid())
893 return;
894
895 // Workaround to find a valid header.....
896 const FAD::EventHeader *beg = reinterpret_cast<FAD::EventHeader*>(fadhd);
897 const FAD::EventHeader *end = reinterpret_cast<FAD::EventHeader*>(fadhd)+40;
898
899 // FIMXE: Compare with target configuration
900
901 const FAD::EventHeader *ptr=beg;
902 for (; ptr!=end; ptr++)
903 {
904 if (ptr->fStartDelimiter==0)
905 continue;
906
907 if (!ptr->HasTriggerLPext() && !ptr->HasTriggerLPint())
908 return;
909 }
910
911 if (ptr->fStartDelimiter==0)
912 return;
913
914 vector<float> data(event->Roi*1440);
915 DataCalib::Apply(data.data(), event->Adc_Data, event->StartPix, event->Roi);
916
917 DrsCalibrate::RemoveSpikes(data.data(), event->Roi);
918
919 vector<float> data2(1440); // Mean, RMS, Max, Pos, first, last
920 DrsCalibrate::GetPixelMax(data2.data(), data.data(), event->Roi, 0, event->Roi-1);
921
922 fDimFeedbackData.Update(data2);
923 }
924
925 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
926 {
927 switch (threadID)
928 {
929 case 0:
930 SendRawData(fadhd, event);
931 return 1;
932 case 1:
933 SendFeedbackData(fadhd, event);
934 return 2;
935 }
936 return 100;
937 }
938
939
940 bool IsRunStarted() const
941 {
942 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1);
943 return it==fIsRunStarted.end();// ? true : it->second.started;
944 }
945
946 uint32_t GetRunNumber() const
947 {
948 return fRunNumber;
949 }
950
951 void IncreaseRunNumber(uint32_t run)
952 {
953 if (run>fRunNumber)
954 fRunNumber = run;
955 }
956
957 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/)
958 {
959 // This function is called even when writing is switched off
960 set<uint32_t>::iterator it = fIsRunStarted.begin();
961 while (it!=fIsRunStarted.end())
962 {
963 if (*it<runnr)
964 {
965 ostringstream str;
966 str << "gotNewRun - Missed run " << *it << "." << endl;
967 fMsg.Info(str);
968
969 fIsRunStarted.erase(it++);
970 continue;
971 }
972 if (*it==runnr)
973 break;
974 it++;
975 }
976 if (it==fIsRunStarted.end())
977 {
978 ostringstream str;
979 str << "gotNewRun - Not waiting for run " << runnr << "." << endl;
980 fMsg.Warn(str);
981 return;
982 }
983
984 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr);
985 if (i2==fExpectedRuns.end())
986 {
987 ostringstream str;
988 str << "gotNewRun - Run " << runnr << " wasn't expected." << endl;
989 fMsg.Warn(str);
990 return;
991 }
992
993 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt);
994 // return: 0=close scheduled / >0 already closed / <0 does not exist
995
996 // FIXME: Move configuration from expected runs to runs which will soon
997 // be opened/closed
998
999 fIsRunStarted.erase(it);
1000 }
1001
1002 map<boost::thread::id, string> fLastMessage;
1003
1004 void factOut(int severity, int err, const char *message)
1005 {
1006 if (!fDebugLog && severity==99)
1007 return;
1008
1009 ostringstream str;
1010 //str << boost::this_thread::get_id() << " ";
1011 str << "EventBuilder(";
1012 if (err<0)
1013 str << "---";
1014 else
1015 str << err;
1016 str << "): " << message;
1017
1018 string &old = fLastMessage[boost::this_thread::get_id()];
1019
1020 if (str.str()==old)
1021 return;
1022 old = str.str();
1023
1024 fMsg.Update(str, severity);
1025 }
1026/*
1027 void factStat(int64_t *stat, int len)
1028 {
1029 if (len!=7)
1030 {
1031 fMsg.Warn("factStat received unknown number of values.");
1032 return;
1033 }
1034
1035 vector<int64_t> data(1, g_maxMem);
1036 data.insert(data.end(), stat, stat+len);
1037
1038 static vector<int64_t> last(8);
1039 if (data==last)
1040 return;
1041 last = data;
1042
1043 fDimStatistics.Update(data);
1044
1045 // len ist die Laenge des arrays.
1046 // array[4] enthaelt wieviele bytes im Buffer aktuell belegt sind; daran
1047 // kannst Du pruefen, ob die 100MB voll sind ....
1048
1049 ostringstream str;
1050 str
1051 << "Wait=" << stat[0] << " "
1052 << "Skip=" << stat[1] << " "
1053 << "Del=" << stat[2] << " "
1054 << "Tot=" << stat[3] << " "
1055 << "Mem=" << stat[4] << "/" << g_maxMem << " "
1056 << "Read=" << stat[5] << " "
1057 << "Conn=" << stat[6];
1058
1059 fMsg.Info(str);
1060 }
1061 */
1062
1063 void factStat(const EVT_STAT &stat)
1064 {
1065 fDimStatistics2.Update(stat);
1066 }
1067
1068 void factStat(const GUI_STAT &stat)
1069 {
1070 fDimStatistics1.Update(stat);
1071 }
1072
1073
1074 array<FAD::EventHeader, 40> fVecHeader;
1075
1076 template<typename T, class S>
1077 array<T, 42> Compare(const S *vec, const T *t)
1078 {
1079 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(vec);
1080
1081 const T *min = NULL;
1082 const T *val = NULL;
1083 const T *max = NULL;
1084
1085 array<T, 42> arr;
1086
1087 bool rc = true;
1088 for (int i=0; i<40; i++)
1089 {
1090 const char *base = reinterpret_cast<const char*>(vec+i);
1091 const T *ref = reinterpret_cast<const T*>(base+offset);
1092
1093 arr[i] = *ref;
1094
1095 if (gi_NumConnect[i]!=7)
1096 {
1097 arr[i] = 0;
1098 continue;
1099 }
1100
1101 if (!val)
1102 {
1103 min = ref;
1104 val = ref;
1105 max = ref;
1106 }
1107
1108 if (*ref<*min)
1109 min = ref;
1110
1111 if (*ref>*max)
1112 max = ref;
1113
1114 if (*val!=*ref)
1115 rc = false;
1116 }
1117
1118 arr[40] = val ? *min : 1;
1119 arr[41] = val ? *max : 0;
1120
1121 return arr;
1122 }
1123
1124 template<typename T>
1125 array<T, 42> CompareBits(const FAD::EventHeader *h, const T *t)
1126 {
1127 const int offset = reinterpret_cast<const char *>(t) - reinterpret_cast<const char *>(h);
1128
1129 T val = 0;
1130 T rc = 0;
1131
1132 array<T, 42> vec;
1133
1134 bool first = true;
1135
1136 for (int i=0; i<40; i++)
1137 {
1138 const char *base = reinterpret_cast<const char*>(&fVecHeader[i]);
1139 const T *ref = reinterpret_cast<const T*>(base+offset);
1140
1141 vec[i+2] = *ref;
1142
1143 if (gi_NumConnect[i]!=7)
1144 {
1145 vec[i+2] = 0;
1146 continue;
1147 }
1148
1149 if (first)
1150 {
1151 first = false;
1152 val = *ref;
1153 rc = 0;
1154 }
1155
1156 rc |= val^*ref;
1157 }
1158
1159 vec[0] = rc;
1160 vec[1] = val;
1161
1162 return vec;
1163 }
1164
1165 template<typename T, size_t N>
1166 void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
1167 {
1168// svc.setQuality(vec[40]<=vec[41]);
1169 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
1170 svc.Update();
1171 }
1172
1173 template<typename T>
1174 void Print(const char *name, const pair<bool,array<T, 43>> &data)
1175 {
1176 cout << name << "|" << data.first << "|" << data.second[1] << "|" << data.second[0] << "<x<" << data.second[1] << ":";
1177 for (int i=0; i<40;i++)
1178 cout << " " << data.second[i+3];
1179 cout << endl;
1180 }
1181
1182 vector<uint> fNumConnected;
1183
1184 void debugHead(int /*socket*/, const FAD::EventHeader &h)
1185 {
1186 const uint16_t id = h.Id();
1187 if (id>39)
1188 return;
1189
1190 if (fNumConnected.size()!=40)
1191 fNumConnected.resize(40);
1192
1193 const vector<uint> con(gi_NumConnect, gi_NumConnect+40);
1194
1195 const bool changed = con!=fNumConnected || !IsThreadRunning();
1196
1197 fNumConnected = con;
1198
1199 const FAD::EventHeader old = fVecHeader[id];
1200 fVecHeader[id] = h;
1201
1202 if (old.fVersion != h.fVersion || changed)
1203 {
1204 const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
1205
1206 array<float,42> data;
1207 for (int i=0; i<42; i++)
1208 {
1209 ostringstream str;
1210 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
1211 data[i] = stof(str.str());
1212 }
1213 Update(fDimFwVersion, data);
1214 }
1215
1216 if (old.fRunNumber != h.fRunNumber || changed)
1217 {
1218 const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
1219 fDimRunNumber.Update(run);
1220 }
1221
1222 if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
1223 {
1224 const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
1225 fDimPrescaler.Update(pre);
1226 }
1227
1228 if (old.fDNA != h.fDNA || changed)
1229 {
1230 const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
1231 Update(fDimDNA, dna, 40);
1232 }
1233
1234 if (old.fStatus != h.fStatus || changed)
1235 {
1236 const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
1237 Update(fDimStatus, sts);
1238 }
1239
1240 if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
1241 {
1242 array<uint16_t, FAD::kNumDac*42> dacs;
1243
1244 for (int i=0; i<FAD::kNumDac; i++)
1245 {
1246 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
1247 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
1248 }
1249
1250 Update(fDimDac, dacs);
1251 }
1252
1253 // -----------
1254
1255 static Time oldt(boost::date_time::neg_infin);
1256 Time newt;
1257
1258 if (newt>oldt+boost::posix_time::seconds(1))
1259 {
1260 oldt = newt;
1261
1262 // --- RefClock
1263
1264 const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
1265 Update(fDimRefClock, clk);
1266
1267 // --- Temperatures
1268
1269 const array<int16_t,42> tmp[4] =
1270 {
1271 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]), // 0-39:val, 40:min, 41:max
1272 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]), // 0-39:val, 40:min, 41:max
1273 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]), // 0-39:val, 40:min, 41:max
1274 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3]) // 0-39:val, 40:min, 41:max
1275 };
1276
1277 vector<int16_t> data;
1278 data.reserve(82);
1279 data.push_back(tmp[0][40]); // min: 0
1280 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 1-40
1281 data.push_back(tmp[0][41]); // max: 41
1282 data.insert(data.end(), tmp[0].data(), tmp[0].data()+40); // val: 42-81
1283
1284 for (int j=1; j<=3; j++)
1285 {
1286 const array<int16_t,42> &ref = tmp[j];
1287
1288 // Gloabl min
1289 if (ref[40]<data[0]) // 40=min
1290 data[0] = ref[40];
1291
1292 // Global max
1293 if (ref[41]>data[41]) // 41=max
1294 data[41] = ref[41];
1295
1296 for (int i=0; i<40; i++)
1297 {
1298 // min per board
1299 if (ref[i]<data[i+1]) // data: 1-40
1300 data[i+1] = ref[i]; // ref: 0-39
1301
1302 // max per board
1303 if (ref[i]>data[i+42]) // data: 42-81
1304 data[i+42] = ref[i]; // ref: 0-39
1305 }
1306
1307
1308 }
1309
1310 vector<float> deg(82); // 0: global min, 1-40: min
1311 for (int i=0; i<82; i++) // 41: global max, 42-81: max
1312 deg[i] = data[i]/16.;
1313 fDimTemperature.Update(deg);
1314 }
1315 }
1316};
1317
1318EventBuilderWrapper *EventBuilderWrapper::This = 0;
1319
1320// ----------- Event builder callbacks implementation ---------------
1321extern "C"
1322{
1323 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len)
1324 {
1325 return EventBuilderWrapper::This->runOpen(irun, runhd, len);
1326 }
1327
1328 int runWrite(FileHandle_t fileId, EVENT *event, size_t len)
1329 {
1330 return EventBuilderWrapper::This->runWrite(fileId, event, len);
1331 }
1332
1333 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len)
1334 {
1335 return EventBuilderWrapper::This->runClose(fileId, runth, len);
1336 }
1337
1338 // -----
1339
1340 void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
1341 {
1342 return NULL;
1343 }
1344
1345 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
1346 {
1347 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
1348 }
1349
1350 int runEnd(uint32_t, void *runPtr)
1351 {
1352 return 0;
1353 }
1354
1355 // -----
1356
1357 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
1358 {
1359 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
1360 }
1361
1362 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers)
1363 {
1364 return EventBuilderWrapper::This->gotNewRun(runnr, headers);
1365 }
1366
1367 // -----
1368
1369 void factOut(int severity, int err, const char *message)
1370 {
1371 EventBuilderWrapper::This->factOut(severity, err, message);
1372 }
1373
1374 void factStat(GUI_STAT stat)
1375 {
1376 EventBuilderWrapper::This->factStat(stat);
1377 }
1378
1379 void factStatNew(EVT_STAT stat)
1380 {
1381 EventBuilderWrapper::This->factStat(stat);
1382 }
1383
1384 // ------
1385
1386 void debugHead(int socket, int/*board*/, void *buf)
1387 {
1388 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf);
1389 EventBuilderWrapper::This->debugHead(socket, h);
1390 }
1391
1392 void debugStream(int isock, void *buf, int len)
1393 {
1394 return EventBuilderWrapper::This->debugStream(isock, buf, len);
1395 }
1396
1397 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec)
1398 {
1399 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
1400 }
1401}
1402
1403#endif
Note: See TracBrowser for help on using the repository browser.