- Timestamp:
- 05/24/13 14:00:44 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilderWrapper.h
r15610 r16107 12 12 #include <boost/filesystem.hpp> 13 13 #include <boost/date_time/posix_time/posix_time_types.hpp> 14 #include <condition_variable>15 14 16 15 #include "DimWriteStatistics.h" … … 41 40 #include "EventBuilder.h" 42 41 43 extern "C" { 44 extern void StartEvtBuild(); 45 extern int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 46 } 42 void StartEvtBuild(); 43 void CloseRunFile(); 47 44 48 45 // ======================================================================== … … 59 56 boost::thread fThreadMain; 60 57 61 enum CommandStates_t // g_runStat62 {63 kAbort = -2, // quit as soon as possible ('abort')64 kExit = -1, // stop reading, quit when buffered events done ('exit')65 kInitialize = 0, // 'initialize' (e.g. dim not yet started)66 kHybernate = 1, // do nothing for long time ('hybernate') [wakeup within ~1sec]67 kSleep = 2, // do nothing ('sleep') [wakeup within ~10msec]68 kModeFlush = 10, // read data from camera, but skip them ('flush')69 kModeTest = 20, // read data and process them, but do not write to disk ('test')70 kModeFlag = 30, // read data, process and write all to disk ('flag')71 kModeRun = 40, // read data, process and write selected to disk ('run')72 };73 74 58 enum 75 59 { … … 82 66 FAD::FileFormat_t fFileFormat; 83 67 84 uint32_t fMaxRun;68 //uint32_t fMaxRun; 85 69 uint32_t fLastOpened; 86 70 uint32_t fLastClosed; … … 113 97 Queue<pair<Time,array<uint32_t,4>>> fDimQueue3; 114 98 Queue<pair<Time,array<uint16_t,2>>> fDimQueue4; 115 116 bool fDebugStream;117 bool fDebugRead;118 bool fDebugLog;119 99 120 100 string fPath; 121 uint 64_t fNightAsInt;101 uint32_t fNightAsInt; 122 102 uint32_t fRunNumber; 103 int64_t fRunInProgress; 104 105 array<uint16_t,2> fVecRoi; 123 106 124 107 protected: … … 140 123 141 124 // Get current night 142 const uint 64_t night = Time().NightAsInt();125 const uint32_t night = Time().NightAsInt(); 143 126 if (night==fNightAsInt) 144 127 return true; … … 185 168 public: 186 169 EventBuilderWrapper(MessageImp &imp) : fMsg(imp), 187 fFileFormat(FAD::kNone), fMaxRun(0),fLastOpened(0), fLastClosed(0),170 fFileFormat(FAD::kNone), /*fMaxRun(0),*/ fLastOpened(0), fLastClosed(0), 188 171 fDimWriteStats ("FAD_CONTROL", imp), 189 172 fDimRuns ("FAD_CONTROL/RUNS", "I:5;C", … … 254 237 "|rateBytes[int]:Bytes read this cycle" 255 238 "|totBytes[int]:Bytes read (counter)"), 256 /*fDimStatistics2 ("FAD_CONTROL/STATISTICS2", "I:1;I:280;X:40;I:40;I:4;I:4;I:2;I:2;I:3;C:40",257 "Event Builder status, events oriented"258 "|reset[int]:If increased, reset all counters"259 "|numRead[int]:How often sucessful read from N sockets per loop"260 "|gotByte[int]:number of bytes read per board"261 "|gotErr[int]:number of com. errors per board"262 "|evtStat[int]:number of evts read, completed, with errors, incomplete"263 "|procStat[int]:num. of evts proc., w probs, acc. or rej. by SW trigger"264 "|feedStat[int]:number of evts used or rejected by feedback system"265 "|wrtStat[int]:number of evts written to disk, with errors"266 "|runStat[int]:number of run opened, closed, with open or close errors"267 "|numConn[int]:number of sockets successfully opened per board"),*/268 239 fDimFileFormat("FAD_CONTROL/FILE_FORMAT", "S:1", "|format[int]:Current file format"), 269 240 fDimIncomplete("FAD_CONTROL/INCOMPLETE", "X:1", "|incomplete[bits]:One bit per board"), … … 273 244 fDimQueue3(std::bind(&EventBuilderWrapper::updateEvents, this, placeholders::_1)), 274 245 fDimQueue4(std::bind(&EventBuilderWrapper::updateRoi, this, placeholders::_1)), 275 f DebugStream(false), fDebugRead(false), fDebugLog(false), fNightAsInt(0)246 fNightAsInt(0), fRunInProgress(-1) 276 247 { 277 248 if (This) … … 279 250 280 251 This = this; 252 253 fVecRoi.fill(0); 281 254 282 255 memset(fNumEvts.data(), 0, sizeof(fNumEvts)); … … 294 267 // What's the maximum time the eb need to abort? 295 268 fThreadMain.join(); 296 297 //ffMsg.Info("EventBuilder stopped."); 298 299 for (vector<DataProcessorImp*>::iterator it=fFiles.begin(); it!=fFiles.end(); it++) 300 delete *it; 301 } 302 303 set<uint32_t> fIsRunStarted; 269 } 270 304 271 map<uint32_t, FAD::RunDescription> fExpectedRuns; 272 273 mutex mtx_newrun; 305 274 306 275 uint32_t StartNewRun(int64_t maxtime, int64_t maxevt, const pair<string, FAD::Configuration> &ref) … … 311 280 maxevt = INT32_MAX; 312 281 282 if (!InitRunNumber()) 283 return 0; 284 313 285 const FAD::RunDescription descr = 314 286 { 315 287 uint32_t(maxtime), 316 288 uint32_t(maxevt), 289 fNightAsInt, 317 290 ref.first, 318 291 ref.second, 319 292 }; 320 293 321 if (!InitRunNumber()) 322 return 0; 323 324 // FIMXE: Maybe reset an event counter so that the mcp can count events? 325 326 //fMsg.Info(" ==> TODO: Set a limit on the size of fExpectedRuns!"); 327 294 const lock_guard<mutex> lock(mtx_newrun); 328 295 fExpectedRuns[fRunNumber] = descr; 329 fIsRunStarted.insert(fRunNumber);330 296 return fRunNumber++; 331 297 } … … 333 299 bool IsThreadRunning() 334 300 { 301 if (fThreadMain.get_id()==boost::this_thread::get_id()) 302 return true; 335 303 return !fThreadMain.timed_join(boost::posix_time::microseconds(0)); 336 304 } … … 338 306 void SetMaxMemory(unsigned int mb) const 339 307 { 340 /*341 if (mb*1000000<GetUsedMemory())342 {343 // ffMsg.Warn("...");344 return;345 }*/346 347 308 g_maxMem = size_t(mb)*1000000; 348 309 } … … 356 317 } 357 318 358 fLastMessage.clear();319 //fLastMessage.clear(); 359 320 360 321 for (size_t i=0; i<40; i++) 361 322 ConnectSlot(i, addr[i]); 362 323 363 g_runStat = kModeRun;364 g_maxProc = 1;365 366 324 fMsg.Message("Starting EventBuilder thread"); 367 325 368 326 fThreadMain = boost::thread(StartEvtBuild); 369 327 } 328 370 329 void ConnectSlot(unsigned int i, const tcp::endpoint &addr) 371 330 { … … 400 359 // In this order 401 360 402 struct sockaddr_in addr; //IP for each socket403 addr.sin_family = AF_INET;404 addr.sin_addr.s_addr = 0;405 addr.sin_port = 0;406 memcpy(&g_port[i].sockAddr, &addr, sizeof(struct sockaddr_in));407 408 361 fDimIncomplete.setQuality(0); 409 362 fDimIncomplete.Update(uint64_t(0)); … … 413 366 if (i>39) 414 367 return; 368 415 369 if (g_port[i].sockAddr.sin_port==0) 416 370 return; … … 423 377 { 424 378 fMsg.Message("Signal abort to EventBuilder thread..."); 425 g_reset = 1;379 g_reset = 2; 426 380 } 427 381 428 382 void ResetThread(bool soft) 429 383 { 430 /*431 if (g_reset > 0)432 433 * suspend reading434 * reset = g_reset;435 * g_reset=0436 437 * reset% 10438 == 0 leave event Buffers as they are439 == 1 let all buffers drain (write (incomplete) events)440 > 1 flush all buffers (do not write buffered events)441 442 * (reset/10)%10443 > 0 close all sockets and destroy them (also free the444 allocated read-buffers)445 recreate before resuming operation446 [ this is more than just close/open that can be447 triggered by e.g. close/open the base-socket ]448 449 * (reset/100)%10450 > 0 close all open run-files451 452 * (reset/1000)453 sleep so many seconds before resuming operation454 (does not (yet) take into account time left when waiting455 for buffers getting empty ...)456 457 * resume_reading458 459 */460 384 fMsg.Message("Signal reset to EventBuilder thread..."); 461 385 g_reset = soft ? 101 : 102; … … 465 389 { 466 390 fMsg.Message("Signal exit to EventBuilder thread..."); 467 g_runStat = kExit; 468 } 469 470 /* 471 void Wait() 472 { 473 fThread.join(); 474 ffMsg.Message("EventBuilder stopped."); 475 }*/ 476 477 void Hybernate() const { g_runStat = kHybernate; } 478 void Sleep() const { g_runStat = kSleep; } 479 void FlushMode() const { g_runStat = kModeFlush; } 480 void TestMode() const { g_runStat = kModeTest; } 481 void FlagMode() const { g_runStat = kModeFlag; } 482 void RunMode() const { g_runStat = kModeRun; } 483 484 // FIXME: To be removed 485 //void SetMode(int mode) const { g_runStat = mode; } 486 487 bool IsConnected(int i) const { return gi_NumConnect[i]==7; } 488 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); } 489 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; } 490 int GetNumConnected(int i) const { return gi_NumConnect[i]; } 491 int GetNumFilesOpen() const { return fFiles.size(); } 492 493 /* 494 bool IsConnected(int i) const { return gi_NumConnect[i]>0; } 495 bool IsConnecting(int i) const { return !IsConnected(i) && !IsDisconnected(i); } 496 bool IsDisconnected(int i) const { return gi_NumConnect[i]<=0 && g_port[i].sockDef==0; } 497 int GetNumConnected(int i) const { return gi_NumConnect[i]; } 498 */ 391 g_reset = 1; 392 } 393 394 bool IsConnected(int i) const { return gi_NumConnect[i]==1; } 395 bool IsConnecting(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef!=0; } 396 bool IsDisconnected(int i) const { return gi_NumConnect[i]==0 && g_port[i].sockDef==0; } 397 bool IsRunInProgress() const { return fRunInProgress>=0; } 499 398 500 399 void SetIgnore(int i, bool b) const { if (g_port[i].sockDef!=0) g_port[i].sockDef=b?-1:1; } … … 541 440 } 542 441 543 void SetDebugLog(bool b) { fDebugLog = b; }544 545 void SetDebugStream(bool b)546 {547 fDebugStream = b;548 if (b)549 return;550 551 for (int i=0; i<40; i++)552 {553 if (!fDumpStream[i].is_open())554 continue;555 556 fDumpStream[i].close();557 558 ostringstream name;559 name << "socket_dump-" << setfill('0') << setw(2) << i << ".bin";560 fMsg.Message("Closed file '"+name.str()+"'");561 }562 }563 564 void SetDebugRead(bool b)565 {566 fDebugRead = b;567 if (b || !fDumpRead.is_open())568 return;569 570 fDumpRead.close();571 fMsg.Message("Closed file 'socket_events.txt'");572 }573 574 // size_t GetUsedMemory() const { return gi_usedMem; }575 576 442 void LoadDrsCalibration(const char *fname) 577 443 { … … 582 448 } 583 449 584 virtual int CloseOpenFiles() { CloseRunFile(0, 0, 0); return 0; } 585 586 587 /* 588 struct OpenFileToDim 589 { 590 int code; 591 char fileName[FILENAME_MAX]; 592 }; 593 594 SignalRunOpened(runid, filename); 595 // Send num open files 596 // Send runid, (more info about the run?), filename via dim 597 598 SignalEvtWritten(runid); 599 // Send num events written of newest file 600 601 SignalRunClose(runid); 602 // Send new num open files 603 // Send empty file-name if no file is open 604 605 */ 450 virtual int CloseOpenFiles() { CloseRunFile(); return 0; } 451 606 452 607 453 // -------------- Mapped event builder callbacks ------------------ … … 611 457 uint32_t values[5] = 612 458 { 613 static_cast<uint32_t>(fFiles.size()),614 0xffffffff,615 0,459 !fFile ? 0 : 1, 460 fFile ? fFile->GetRunId() : 0, 461 fFile ? fFile->GetRunId() : 0, 616 462 fLastOpened, 617 463 fLastClosed 618 464 }; 619 465 620 for (vector<DataProcessorImp*>::const_iterator it=fFiles.begin();621 it!=fFiles.end(); it++)622 {623 const DataProcessorImp *file = *it;624 625 if (file->GetRunId()<values[1])626 values[1] = file->GetRunId();627 628 if (file->GetRunId()>values[2])629 values[2] = file->GetRunId();630 }631 632 fMaxRun = values[2];633 634 466 vector<char> data(sizeof(values)+fname.size()+1); 635 467 memcpy(data.data(), values, sizeof(values)); 636 468 strcpy(data.data()+sizeof(values), fname.c_str()); 637 638 469 fDimRuns.Update(data); 639 470 } 640 471 641 vector<DataProcessorImp*> fFiles;472 shared_ptr<DataProcessorImp> fFile; 642 473 643 474 void updateEvents(const pair<Time,array<uint32_t,4>> &stat) … … 647 478 } 648 479 649 FileHandle_t runOpen(uint32_t runid, RUN_HEAD *h, size_t) 650 { 651 //fMsg.Info(" ==> TODO: Update run configuration in database!"); 652 653 map<uint32_t,FAD::RunDescription>::iterator it = fExpectedRuns.begin(); 654 while (it!=fExpectedRuns.end()) 655 { 656 if (it->first<runid) 657 { 658 ostringstream str; 659 str << "runOpen - Missed run " << it->first << "."; 660 fMsg.Info(str); 661 662 fExpectedRuns.erase(it++); 663 continue; 664 } 665 if (it->first==runid) 666 break; 667 it++; 668 } 669 670 FAD::RunDescription desc; 671 672 if (it==fExpectedRuns.end()) 673 { 674 ostringstream str; 675 str << "runOpen - Run " << runid << " wasn't expected (maybe manual triggers)"; 676 fMsg.Warn(str); 677 } 678 else 679 { 680 desc = it->second; 681 fExpectedRuns.erase(it); 682 } 683 684 // Check if file already exists... 480 bool runOpen(const shared_ptr<EVT_CTRL2> &evt) 481 { 482 const uint32_t night = evt->runCtrl->night; 483 const uint32_t runid = evt->runNum>0 ? evt->runNum : time(NULL); 484 485 // If there is still an open file: close it 486 if (fFile) 487 runClose(); 488 489 // Keep a copy of the currently valid drs calibration 490 // and associate it to the run control structure 491 evt->runCtrl->calib = shared_ptr<DrsCalibration>(new DrsCalibration(DataCalib::GetCalibration())); 492 493 // FIMXE: Check if file already exists... 494 495 // Crate the file 685 496 DataProcessorImp *file = 0; 686 497 switch (fFileFormat) 687 498 { 688 case FAD::kNone: file = new DataDump(fPath, fNightAsInt, runid, fMsg); break;689 case FAD::kDebug: file = new DataDebug(fPath, fNightAsInt, runid, fMsg); break;690 case FAD::kCfitsio: file = new DataWriteFits(fPath, fNightAsInt, runid, fMsg); break;691 case FAD::kFits: file = new DataWriteFits2(fPath, fNightAsInt, runid,fMsg); break;692 case FAD::kRaw: file = new DataWriteRaw(fPath, fNightAsInt, runid,fMsg); break;693 case FAD::kCalib: file = new DataCalib(fPath, fNightAsInt, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break;499 case FAD::kNone: file = new DataDump(fPath, night, runid, fMsg); break; 500 case FAD::kDebug: file = new DataDebug(fPath, night, runid, fMsg); break; 501 case FAD::kCfitsio: file = new DataWriteFits(fPath, night, runid, fMsg); break; 502 case FAD::kFits: file = new DataWriteFits2(fPath, night, runid, fMsg); break; 503 case FAD::kRaw: file = new DataWriteRaw(fPath, night, runid, fMsg); break; 504 case FAD::kCalib: file = new DataCalib(fPath, night, runid, fDimDrsCalibration, fDimDrsRuns, fMsg); break; 694 505 } 695 506 696 507 try 697 508 { 698 if (!file->Open(h, desc)) 699 return 0; 509 // Try to open the file 510 FAD::RunDescription desc; 511 desc.name = evt->runCtrl->runType; 512 513 if (!file->Open(*evt, desc)) 514 return false; 700 515 } 701 516 catch (const exception &e) 702 517 { 703 518 fMsg.Error("Exception trying to open file: "+string(e.what())); 704 return 0; 705 } 706 707 fFiles.push_back(file); 519 return false; 520 } 521 522 fLastOpened = runid; 523 524 // Signal that a file is open 525 fFile = shared_ptr<DataProcessorImp>(file); 526 527 // Now do all the calls which potentially block (dim) 528 529 // Time for update runs before time for update events 530 UpdateRuns(file->GetFileName()); 531 fNumEvts[kEventId] = 0; 532 fNumEvts[kTriggerId] = 0; 533 fNumEvts[kCurrent] = 0; 534 fDimQueue3.post(make_pair(Time(), fNumEvts)); 535 536 fDimWriteStats.FileOpened(file->GetFileName()); 708 537 709 538 ostringstream str; … … 711 540 fMsg.Info(str); 712 541 713 fDimWriteStats.FileOpened(file->GetFileName()); 714 715 fLastOpened = runid; 716 UpdateRuns(file->GetFileName()); 717 718 fNumEvts[kEventId] = 0; 719 fNumEvts[kTriggerId] = 0; 720 721 fNumEvts[kCurrent] = 0; 722 fDimQueue3.post(make_pair(Time(), fNumEvts)); 723 // fDimCurrentEvent.Update(uint32_t(0)); 724 725 return reinterpret_cast<FileHandle_t>(file); 726 } 727 728 int runWrite(FileHandle_t handler, EVENT *e, size_t /*sz*/) 729 { 730 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler); 731 732 if (!file->WriteEvt(e)) 733 return -1; 734 735 if (file->GetRunId()==fMaxRun) 736 { 737 fNumEvts[kCurrent]++; 738 fNumEvts[kEventId] = e->EventNum; 739 fNumEvts[kTriggerId] = e->TriggerNum; 740 } 741 542 return true; 543 } 544 545 bool runWrite(const shared_ptr<EVT_CTRL2> &e) 546 { 547 const EVENT &evt = *e->fEvent; 548 if (!fFile->WriteEvt(evt)) 549 return false; 550 551 fNumEvts[kCurrent]++; 552 fNumEvts[kEventId] = evt.EventNum; 553 fNumEvts[kTriggerId] = evt.TriggerNum; 742 554 fNumEvts[kTotal]++; 743 555 … … 747 559 { 748 560 fDimQueue3.post(make_pair(Time(), fNumEvts)); 749 //fDimEvents.Update(fNumEvts);750 561 oldt = newt; 751 562 } 752 563 753 754 // ===> SignalEvtWritten(runid); 755 // Send num events written of newest file 756 757 /* close run runId (all all runs if runId=0) */ 758 /* return: 0=close scheduled / >0 already closed / <0 does not exist */ 759 //CloseRunFile(file->GetRunId(), time(NULL)+2) ; 760 761 return 0; 564 return true; 565 } 566 567 void runClose() 568 { 569 if (!fFile) 570 return; 571 572 // It can happen that runFinished was never called 573 // (e.g. runWrite failed) 574 if (fRunInProgress==fFile->GetRunId()) 575 fRunInProgress = -1; 576 577 // Close the file 578 const bool rc = fFile->Close(NULL); 579 580 fLastClosed = fFile->GetRunId(); 581 582 ostringstream str; 583 str << "Closed: " << fFile->GetFileName() << " (" << fFile->GetRunId() << ")"; 584 if (!rc) 585 str << "... failed!"; 586 587 // Signal that the file is closed 588 589 fFile.reset(); 590 591 // Now do all the calls which can potentially block (dim) 592 593 CloseRun(fLastClosed); 594 595 // Time for update events before time for update runs 596 fDimQueue3.post(make_pair(Time(), fNumEvts)); 597 UpdateRuns(); 598 599 // Do the potentially blocking call after all others 600 rc ? fMsg.Info(str) : fMsg.Error(str); 762 601 } 763 602 764 603 virtual void CloseRun(uint32_t /*runid*/) { } 765 766 int runClose(FileHandle_t handler, RUN_TAIL *tail, size_t)767 {768 //fMsg.Info(" ==> TODO: Update run configuration in database!");769 770 DataProcessorImp *file = reinterpret_cast<DataProcessorImp*>(handler);771 772 const vector<DataProcessorImp*>::iterator it = find(fFiles.begin(), fFiles.end(), file);773 if (it==fFiles.end())774 {775 ostringstream str;776 str << "File handler (" << handler << ") requested to close by event builder doesn't exist.";777 fMsg.Fatal(str);778 return -1;779 }780 781 /*782 fFiles.erase(it);783 784 fLastClosed = file->GetRunId();785 CloseRun(fLastClosed);786 UpdateRuns();787 788 fDimEvents.Update(fNumEvts);789 */790 791 const bool rc = file->Close(tail);792 if (!rc)793 {794 // Error message795 }796 797 // Note that this is the signal for the fadctrl to change from798 // WritingData back to Connected. If this is done too early,799 // a new run might be started before this is closed. This is800 // faster, but leads to problems with the DRS calibration801 // if the system is fast enough to start the new run before802 // this one has really been closed.803 fFiles.erase(it);804 805 fLastClosed = file->GetRunId();806 CloseRun(fLastClosed);807 UpdateRuns();808 809 fDimQueue3.post(make_pair(Time(),fNumEvts));810 //fDimEvents.Update(fNumEvts);811 812 813 ostringstream str;814 str << "Closed: " << file->GetFileName() << " (" << file->GetRunId() << ")";815 fMsg.Info(str);816 817 delete file;818 819 // ==> SignalRunClose(runid);820 // Send new num open files821 // Send empty file-name if no file is open822 823 return rc ? 0 : -1;824 }825 826 ofstream fDumpStream[40];827 828 void debugStream(int isock, void *buf, int len)829 {830 if (!fDebugStream)831 return;832 833 const int slot = isock/7;834 if (slot<0 || slot>39)835 return;836 837 if (!fDumpStream[slot].is_open())838 {839 ostringstream name;840 name << "socket_dump-" << setfill('0') << setw(2) << slot << ".bin";841 842 fDumpStream[slot].open(name.str().c_str(), ios::app);843 if (!fDumpStream[slot])844 {845 ostringstream str;846 str << "Open file '" << name << "': " << strerror(errno) << " (errno=" << errno << ")";847 fMsg.Error(str);848 849 return;850 }851 852 fMsg.Message("Opened file '"+name.str()+"' for writing.");853 }854 855 fDumpStream[slot].write(reinterpret_cast<const char*>(buf), len);856 }857 858 ofstream fDumpRead; // Stream to possibly dump docket events859 860 void debugRead(int isock, int ibyte, uint32_t event, uint32_t ftmevt, uint32_t runno, int state, uint32_t tsec, uint32_t tusec)861 {862 // isock = socketID (0-279)863 // ibyte = #bytes gelesen864 // event = eventId (oder 0 wenn noch nicht bekannt)865 // state : 1=finished reading data866 // 0=reading data867 // -1=start reading data (header)868 // -2=start reading data,869 // eventId not known yet (too little data)870 // tsec, tusec = time when reading seconds, microseconds871 //872 if (!fDebugRead || ibyte==0)873 return;874 875 if (!fDumpRead.is_open())876 {877 fDumpRead.open("socket_events.txt", ios::app);878 if (!fDumpRead)879 {880 ostringstream str;881 str << "Open file 'socket_events.txt': " << strerror(errno) << " (errno=" << errno << ")";882 fMsg.Error(str);883 884 return;885 }886 887 fMsg.Message("Opened file 'socket_events.txt' for writing.");888 889 fDumpRead << "# START: " << Time().GetAsStr() << endl;890 fDumpRead << "# state time_sec time_usec socket slot runno event_id trigger_id bytes_received" << endl;891 }892 893 fDumpRead894 << setw(2) << state << " "895 << setw(8) << tsec << " "896 << setw(9) << tusec << " "897 << setw(3) << isock << " "898 << setw(2) << isock/7 << " "899 << runno << " "900 << event << " "901 << ftmevt << " "902 << ibyte << endl;903 }904 905 array<uint16_t,2> fVecRoi;906 604 907 605 void updateRoi(const pair<Time, array<uint16_t,2>> &roi) … … 911 609 } 912 610 913 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int /*iboard*/) 914 { 915 /* 916 fadhd[i] ist ein array mit den 40 fad-headers 917 (falls ein board nicht gelesen wurde, ist start_package_flag =0 ) 918 919 event ist die Struktur, die auch die write routine erhaelt; 920 darin sind im header die 'soll-werte' fuer z.B. eventID 921 als auch die ADC-Werte (falls Du die brauchst) 922 923 Wenn die routine einen negativen Wert liefert, wird das event 924 geloescht (nicht an die write-routine weitergeleitet [mind. im Prinzip] 925 */ 611 int eventCheck(const shared_ptr<EVT_CTRL2> &evt) 612 { 613 const PEVNT_HEADER *fadhd = evt->FADhead.get(); 614 const EVENT *event = evt->fEvent; 926 615 927 616 const array<uint16_t,2> roi = {{ event->Roi, event->RoiTM }}; … … 933 622 } 934 623 935 const FAD::EventHeader *beg = reinterpret_cast< FAD::EventHeader*>(fadhd);936 const FAD::EventHeader *end = reinterpret_cast< FAD::EventHeader*>(fadhd)+40;624 const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(fadhd); 625 const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(fadhd)+40; 937 626 938 627 // FIMXE: Compare with target configuration … … 951 640 { 952 641 fMsg.Error("Inconsistency in FAD status detected.... closing run."); 953 CloseRunFile(runNr, 0, 0);642 evt->runCtrl->maxEvt = 0; 954 643 return -1; 955 644 } … … 958 647 { 959 648 fMsg.Error("Inconsistent run number detected.... closing run."); 960 CloseRunFile(runNr, 0, 0);649 evt->runCtrl->maxEvt = 0; 961 650 return -1; 962 651 } … … 973 662 { 974 663 fMsg.Error("Inconsistent FAD event number detected.... closing run."); 975 CloseRunFile(runNr, 0, 0);664 evt->runCtrl->maxEvt = 0; 976 665 return -1; 977 666 } … … 980 669 { 981 670 fMsg.Error("Inconsistent FTM trigger number detected.... closing run."); 982 CloseRunFile(runNr, 0, 0);671 evt->runCtrl->maxEvt = 0; 983 672 return -1; 984 673 } … … 987 676 { 988 677 fMsg.Error("Inconsistent phase shift detected.... closing run."); 989 CloseRunFile(runNr, 0, 0);678 evt->runCtrl->maxEvt = 0; 990 679 return -1; 991 680 } … … 994 683 { 995 684 fMsg.Error("Inconsistent DAC values detected.... closing run."); 996 CloseRunFile(runNr, 0, 0);685 evt->runCtrl->maxEvt = 0; 997 686 return -1; 998 687 } … … 1001 690 { 1002 691 fMsg.Error("Inconsistent trigger type detected.... closing run."); 1003 CloseRunFile(runNr, 0, 0);692 evt->runCtrl->maxEvt = 0; 1004 693 return -1; 1005 694 } … … 1014 703 } 1015 704 1016 void SendRawData(PEVNT_HEADER *fadhd, EVENT *event) 1017 { 705 void applyCalib(const shared_ptr<EVT_CTRL2> &evt) 706 { 707 const PEVNT_HEADER *fadhd = evt->FADhead.get(); 708 const EVENT *event = evt->fEvent; 709 1018 710 // Currently we send any event no matter what its trigger id is... 1019 711 // To be changed. … … 1029 721 1030 722 // Workaround to find a valid header..... 1031 const FAD::EventHeader *beg = reinterpret_cast< FAD::EventHeader*>(fadhd);1032 const FAD::EventHeader *end = reinterpret_cast< FAD::EventHeader*>(fadhd)+40;723 const FAD::EventHeader *beg = reinterpret_cast<const FAD::EventHeader*>(fadhd); 724 const FAD::EventHeader *end = reinterpret_cast<const FAD::EventHeader*>(fadhd)+40; 1033 725 1034 726 // FIMXE: Compare with target configuration … … 1048 740 float *vec = reinterpret_cast<float*>(data.data()+sizeof(EVENT)); 1049 741 1050 DataCalib::Apply(vec, event->Adc_Data, event->StartPix, event->Roi);742 evt->runCtrl->calib->Apply(vec, event->Adc_Data, event->StartPix, event->Roi); 1051 743 DrsCalibrate::RemoveSpikes(vec, event->Roi); 1052 744 … … 1068 760 fDimEventData.setQuality(ptr->fTriggerType); 1069 761 fDimEventData.Update(data2); 1070 } 1071 762 763 } 764 765 /* 1072 766 void SendFeedbackData(PEVNT_HEADER *fadhd, EVENT *event) 1073 767 { 1074 /*1075 768 if (!DataCalib::IsValid()) 1076 769 return; … … 1104 797 1105 798 fDimFeedbackData.Update(data2); 1106 */ 1107 } 1108 1109 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t /*iboard*/, void */*buffer*/) 1110 { 1111 switch (threadID) 1112 { 1113 case 0: 1114 SendRawData(fadhd, event); 1115 return 100; 1116 /* 1117 case 1: 1118 SendFeedbackData(fadhd, event); 1119 return 2;*/ 1120 } 1121 return 100; 1122 } 1123 1124 1125 bool IsRunStarted() const 1126 { 1127 const set<uint32_t>::const_iterator it = fIsRunStarted.find(fRunNumber-1); 1128 return it==fIsRunStarted.end();// ? true : it->second.started; 799 } 800 */ 801 802 bool IsRunWaiting()// const 803 { 804 const lock_guard<mutex> lock(mtx_newrun); 805 return fExpectedRuns.find(fRunNumber-1)!=fExpectedRuns.end(); 1129 806 } 1130 807 … … 1134 811 } 1135 812 813 /* 1136 814 bool IsRunFileOpen() 1137 815 { 1138 816 return fLastOpened==fRunNumber-1; 1139 } 817 }*/ 1140 818 1141 819 bool IncreaseRunNumber(uint32_t run) … … 1159 837 } 1160 838 1161 void gotNewRun(uint32_t runnr, PEVNT_HEADER */*headers*/) 1162 { 1163 // This function is called even when writing is switched off 1164 set<uint32_t>::iterator it = fIsRunStarted.begin(); 1165 while (it!=fIsRunStarted.end()) 1166 { 1167 if (*it<runnr) 839 void gotNewRun(RUN_CTRL2 &run) 840 { 841 // This is to secure iteration over fExpectedRuns 842 const lock_guard<mutex> lock(mtx_newrun); 843 844 map<uint32_t,FAD::RunDescription>::const_iterator it = fExpectedRuns.begin(); 845 while (it!=fExpectedRuns.end()) 846 { 847 if (it->first<run.runId) 1168 848 { 1169 849 ostringstream str; 1170 str << " gotNewRun - Missed run " << *it << ".";850 str << "runOpen - Missed run " << it->first << "."; 1171 851 fMsg.Info(str); 1172 852 1173 fIsRunStarted.erase(it++); 853 // Increase the iterator first, it becomes invalid with the next call 854 const auto is = it++; 855 fExpectedRuns.erase(is); 1174 856 continue; 1175 857 } 1176 if (*it==runnr) 858 859 if (it->first==run.runId) 1177 860 break; 861 1178 862 it++; 1179 863 } 1180 if (it==fIsRunStarted.end()) 864 865 if (it==fExpectedRuns.end()) 1181 866 { 1182 867 ostringstream str; 1183 str << " gotNewRun - Not waiting for run " << runnr << ".";868 str << "runOpen - Run " << run.runId << " wasn't expected (maybe manual triggers)"; 1184 869 fMsg.Warn(str); 1185 return; 1186 } 1187 1188 map<uint32_t,FAD::RunDescription>::iterator i2 = fExpectedRuns.find(runnr); 1189 if (i2==fExpectedRuns.end()) 1190 { 1191 ostringstream str; 1192 str << "gotNewRun - Run " << runnr << " wasn't expected."; 1193 fMsg.Warn(str); 1194 return; 1195 } 1196 1197 CloseRunFile(runnr, time(NULL)+i2->second.maxtime, i2->second.maxevt); 1198 // return: 0=close scheduled / >0 already closed / <0 does not exist 1199 1200 // FIXME: Move configuration from expected runs to runs which will soon 1201 // be opened/closed 1202 1203 fIsRunStarted.erase(it); 1204 } 1205 1206 map<boost::thread::id, string> fLastMessage; 1207 1208 void factOut(int severity, int err, const char *message) 1209 { 1210 if (!fDebugLog && severity==99) 1211 return; 1212 870 871 // This is not ideal, but the best we can do 872 run.night = fNightAsInt; 873 874 return; 875 } 876 877 const FAD::RunDescription &conf = it->second; 878 879 run.runType = conf.name; 880 run.maxEvt = conf.maxevt; 881 run.closeTime = conf.maxtime + run.openTime; 882 run.night = conf.night; 883 884 fExpectedRuns.erase(it); 885 886 // Now signal the fadctrl (configuration process that a run is in progress) 887 // Maybe this could be done earlier, but we are talking about a 888 // negligible time scale here. 889 fRunInProgress = run.runId; 890 } 891 892 void runFinished() 893 { 894 // This is called when the last event of a run (run time exceeded or 895 // max number of events exceeded) has been received. 896 fRunInProgress = -1; 897 } 898 899 //map<boost::thread::id, string> fLastMessage; 900 901 void factOut(int severity, const char *message) 902 { 1213 903 ostringstream str; 1214 //str << boost::this_thread::get_id() << " "; 1215 str << "EventBuilder("; 1216 if (err<0) 1217 str << "---"; 1218 else 1219 str << err; 1220 str << "): " << message; 1221 904 str << "EventBuilder: " << message; 905 906 /* 1222 907 string &old = fLastMessage[boost::this_thread::get_id()]; 1223 908 … … 1225 910 return; 1226 911 old = str.str(); 912 */ 1227 913 1228 914 fMsg.Update(str, severity); … … 1266 952 */ 1267 953 1268 void factStat(const EVT_STAT &/*stat*/)1269 {1270 //fDimStatistics2.Update(stat);1271 }1272 1273 954 void factStatSend(const pair<Time,GUI_STAT> &stat) 1274 955 { … … 1309 990 arr[i] = *ref; 1310 991 1311 if (gi_NumConnect[i] !=7)992 if (gi_NumConnect[i]==0) 1312 993 { 1313 994 arr[i] = 0; … … 1357 1038 vec[i+2] = *ref; 1358 1039 1359 if (gi_NumConnect[i] !=7)1040 if (gi_NumConnect[i]==0) 1360 1041 { 1361 1042 vec[i+2] = 0; … … 1382 1063 void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N) 1383 1064 { 1384 // svc.setQuality(vec[40]<=vec[41]);1385 1065 svc.setData(const_cast<T*>(data.data()), sizeof(T)*n); 1386 1066 svc.Update(t); … … 1524 1204 } 1525 1205 1526 void debugHead( int /*socket*/,const FAD::EventHeader &h)1206 void debugHead(const FAD::EventHeader &h) 1527 1207 { 1528 1208 const uint16_t id = h.Id(); … … 1540 1220 1541 1221 fDimQueue2.post(make_tuple(Time(), changed, h)); 1542 1543 //const lock_guard<mutex> guard(fMutexDimQueue2);1544 //fDimQueue2.push_back(make_tuple(Time(), changed, h));1545 1222 } 1546 1223 }; … … 1549 1226 1550 1227 // ----------- Event builder callbacks implementation --------------- 1551 extern "C" 1228 bool runOpen(const shared_ptr<EVT_CTRL2> &evt) 1552 1229 { 1553 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len) 1554 { 1555 return EventBuilderWrapper::This->runOpen(irun, runhd, len); 1556 } 1557 1558 int runWrite(FileHandle_t fileId, EVENT *event, size_t len) 1559 { 1560 return EventBuilderWrapper::This->runWrite(fileId, event, len); 1561 } 1562 1563 int runClose(FileHandle_t fileId, RUN_TAIL *runth, size_t len) 1564 { 1565 return EventBuilderWrapper::This->runClose(fileId, runth, len); 1566 } 1567 1568 // ----- 1569 1570 //void *runStart(uint32_t /*irun*/, RUN_HEAD */*runhd*/, size_t /*len*/) 1571 //{ 1572 // return NULL; 1573 //} 1574 1575 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr) 1576 { 1577 return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr); 1578 } 1579 1580 int runEnd(uint32_t, void */*runPtr*/) 1581 { 1582 return 0; 1583 } 1584 1585 // ----- 1586 1587 int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard) 1588 { 1589 return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard); 1590 } 1591 1592 void gotNewRun(uint32_t runnr, PEVNT_HEADER *headers) 1593 { 1594 return EventBuilderWrapper::This->gotNewRun(runnr, headers); 1595 } 1596 1597 // ----- 1598 1599 void factOut(int severity, int err, const char *message) 1600 { 1601 EventBuilderWrapper::This->factOut(severity, err, message); 1602 } 1603 1604 void factStat(GUI_STAT stat) 1605 { 1606 EventBuilderWrapper::This->factStat(stat); 1607 } 1608 1609 void factStatNew(EVT_STAT stat) 1610 { 1611 EventBuilderWrapper::This->factStat(stat); 1612 } 1613 1614 void factReportIncomplete (uint64_t rep) 1615 { 1616 EventBuilderWrapper::This->factReportIncomplete(rep); 1617 } 1618 1619 // ------ 1620 1621 void debugHead(int socket, int/*board*/, void *buf) 1622 { 1623 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf); 1624 EventBuilderWrapper::This->debugHead(socket, h); 1625 } 1626 1627 void debugStream(int isock, void *buf, int len) 1628 { 1629 return EventBuilderWrapper::This->debugStream(isock, buf, len); 1630 } 1631 1632 void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runno, int state, uint32_t tsec, uint32_t tusec) 1633 { 1634 EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec); 1635 } 1230 return EventBuilderWrapper::This->runOpen(evt); 1636 1231 } 1637 1232 1233 bool runWrite(const shared_ptr<EVT_CTRL2> &evt) 1234 { 1235 return EventBuilderWrapper::This->runWrite(evt); 1236 } 1237 1238 void runClose() 1239 { 1240 EventBuilderWrapper::This->runClose(); 1241 } 1242 1243 int eventCheck(const shared_ptr<EVT_CTRL2> &evt) 1244 { 1245 return EventBuilderWrapper::This->eventCheck(evt); 1246 } 1247 1248 void gotNewRun(RUN_CTRL2 &run) 1249 { 1250 EventBuilderWrapper::This->gotNewRun(run); 1251 } 1252 1253 void runFinished() 1254 { 1255 EventBuilderWrapper::This->runFinished(); 1256 } 1257 1258 void applyCalib(const shared_ptr<EVT_CTRL2> &evt) 1259 { 1260 EventBuilderWrapper::This->applyCalib(evt); 1261 } 1262 1263 void factOut(int severity, const char *message) 1264 { 1265 EventBuilderWrapper::This->factOut(severity, message); 1266 } 1267 1268 void factStat(GUI_STAT stat) 1269 { 1270 EventBuilderWrapper::This->factStat(stat); 1271 } 1272 1273 void factReportIncomplete(uint64_t rep) 1274 { 1275 EventBuilderWrapper::This->factReportIncomplete(rep); 1276 } 1277 1278 // ------ 1279 1280 void debugHead(void *buf) 1281 { 1282 const FAD::EventHeader &h = *reinterpret_cast<FAD::EventHeader*>(buf); 1283 EventBuilderWrapper::This->debugHead(h); 1284 } 1285 1638 1286 #endif
Note:
See TracChangeset
for help on using the changeset viewer.