- Timestamp:
- 05/28/13 13:50:50 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.cc
r16108 r16382 1 #include <poll.h> 1 2 #include <sys/time.h> 2 3 #include <sys/epoll.h> … … 6 7 #include <cstdarg> 7 8 #include <list> 8 #include <forward_list>9 9 10 10 #include <boost/algorithm/string/join.hpp> … … 21 21 22 22 //#define COMPLETE_EVENTS 23 //#define USE_POLL 23 24 //#define USE_EPOLL 24 25 //#define USE_SELECT 25 26 //#define COMPLETE_EPOLL 26 27 28 // Reading only 1024: 13: 77Hz, 87% 29 // Reading only 1024: 12: 78Hz, 46% 30 // Reading only 300: 4: 250Hz, 92% 31 // Reading only 300: 3: 258Hz, 40% 32 33 // Reading only four threads 1024: 13: 77Hz, 60% 34 // Reading only four threads 1024: 12: 78Hz, 46% 35 // Reading only four threads 300: 4: 250Hz, 92% 36 // Reading only four threads 300: 3: 258Hz, 40% 37 38 // Default 300: 4: 249Hz, 92% 39 // Default 300: 3: 261Hz, 40% 40 // Default 1024: 13: 76Hz, 93% 41 // Default 1024: 12: 79Hz, 46% 42 43 // Poll [selected] 1024: 13: 63Hz, 45% 44 // Poll [selected] 1024: 14: 63Hz, 63% 45 // Poll [selected] 1024: 15: 64Hz, 80% 46 // Poll [selected] 300: 4: 230Hz, 47% 47 // Poll [selected] 300: 3: 200Hz, 94% 48 49 // Poll [all] 1024: 13: 65Hz, 47% 50 // Poll [all] 1024: 14: 64Hz, 59% 51 // Poll [all] 1024: 15: 62Hz, 67% 52 // Poll [all] 300: 4: 230Hz, 47% 53 // Poll [all] 300: 3: 230Hz, 35% 54 27 55 // ========================================================================== 28 56 29 bool runOpen(const shared_ptr<EVT_CTRL2>&evt);30 bool runWrite(const shared_ptr<EVT_CTRL2>&evt);57 bool runOpen(const EVT_CTRL2 &evt); 58 bool runWrite(const EVT_CTRL2 &evt); 31 59 void runClose(); 32 60 void applyCalib(const shared_ptr<EVT_CTRL2> &evt); … … 36 64 void runFinished(); 37 65 void factStat(GUI_STAT gj); 38 bool eventCheck(const shared_ptr<EVT_CTRL2>&evt);66 bool eventCheck(const EVT_CTRL2 &evt); 39 67 void debugHead(void *buf); 40 68 … … 52 80 53 81 // ========================================================================== 54 55 void factPrintf(int severity, const char *fmt, ...)56 {57 char str[1000];58 59 va_list ap;60 va_start(ap, fmt);61 vsnprintf(str, 1000, fmt, ap);62 va_end(ap);63 64 factOut(severity, str);65 }66 67 // ==========================================================================68 69 #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))70 #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)71 82 72 83 namespace Memory … … 77 88 uint64_t max_inuse = 0; 78 89 79 mutex mtx;80 81 forward_list<void*> memory;90 std::mutex mtx; 91 92 std::forward_list<void*> memory; 82 93 83 94 void *malloc() … … 104 115 { 105 116 // Get the next free slot from the stack and return it 106 const lock_guard<mutex> lock(mtx);117 const std::lock_guard<std::mutex> lock(mtx); 107 118 mem = memory.front(); 108 119 memory.pop_front(); … … 130 141 } 131 142 132 const lock_guard<mutex> lock(mtx);143 const std::lock_guard<std::mutex> lock(mtx); 133 144 memory.push_front(mem); 134 145 } 146 135 147 }; 148 149 // ========================================================================== 150 151 void factPrintf(int severity, const char *fmt, ...) 152 { 153 char str[1000]; 154 155 va_list ap; 156 va_start(ap, fmt); 157 vsnprintf(str, 1000, fmt, ap); 158 va_end(ap); 159 160 factOut(severity, str); 161 } 136 162 137 163 // ========================================================================== … … 185 211 }; 186 212 213 timeval time; 187 214 uint64_t rateBytes; 188 215 uint32_t skip; // number of bytes skipped before start of event 189 bool repmem; // reportet no mmemory free190 216 191 217 uint32_t len() const { return uint32_t(H.package_length)*2; } … … 215 241 { 216 242 // wait for something to do... 217 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10 ); // max, timeout[ms]243 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms] 218 244 if (rc>=0) 219 245 return rc; … … 363 389 bufPos = B; // no byte read so far 364 390 skip = 0; // start empty 365 repmem = false;366 391 367 392 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket); … … 380 405 bool READ_STRUCT::read() 381 406 { 407 if (!connected) 408 return false; 409 382 410 if (bufLen==0) 383 411 return true; … … 409 437 if (bufTyp==kStream) 410 438 return false; 439 440 if (bufPos==B) 441 gettimeofday(&time, NULL); 411 442 412 443 bufPos += jrd; //==> prepare for continuation … … 437 468 I[11] = ntohl(I[11]); // time; 438 469 470 // Use back inserter?? 439 471 for (int s=24; s<24+NTemp+NDAC; s++) 440 472 S[s] = ntohs(S[s]); // drs_temperature / dac … … 523 555 shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun) 524 556 { 557 /* 558 checkroi consistence 559 find existing entry 560 if no entry, try to allocate memory 561 if entry and memory, init event structure 562 */ 563 525 564 uint16_t nRoi[9]; 526 565 if (!checkRoiConsistency(rd, nRoi)) … … 562 601 evt->Errors[2]++; 563 602 603 // It is maybe not likely, but the header of this board might have 604 // arrived earlier. (We could also update the run-info, but 605 // this should not make a difference here) 606 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) || 607 rd.time.tv_sec<evt->time.tv_sec) 608 evt->time = rd.time; 609 564 610 //everything seems fine so far ==> use this slot .... 565 611 return evt; … … 573 619 } 574 620 575 shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);576 577 gettimeofday(&evt->time, NULL);621 EVT_CTRL2 *evt = new EVT_CTRL2; 622 623 evt->time = rd.time; 578 624 579 625 evt->runNum = rd.H.runnumber; … … 613 659 } 614 660 661 // Keep pointer to run of this event 662 evt->runCtrl = actrun; 663 615 664 // Increase the number of events we have started to receive in this run 616 665 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received 617 666 actrun->lastEvt++; 618 619 // Keep pointer to run of this event620 evt->runCtrl = actrun;621 622 // Secure access to evtCtrl against access in CloseRunFile623 // This should be the last... otherwise we can run into threading issues624 // if the event is accessed before it is fully initialized.625 evtCtrl.push_back(evt);626 667 627 668 // An event can be the first and the last, but not the last and the first. … … 629 670 // runFinished signals that the last event of a run was just received. Processing 630 671 // might still be ongoing, but we can start a new run. 631 const bool cond1 = actrun->lastEvt < actrun->maxEvt;// max number of events not reached672 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached 632 673 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached 633 674 if (!cond1 || !cond2) 634 675 runFinished(); 635 676 636 return evt; 677 // We don't mind here that this is not common to all events, 678 // because every coming event will fullfil the condition as well. 679 if (!cond1) 680 evt->closeRequest |= kRequestMaxEvtsReached; 681 if (!cond2) 682 evt->closeRequest |= kRequestMaxTimeReached; 683 684 // Secure access to evtCtrl against access in CloseRunFile 685 // This should be the last... otherwise we can run into threading issues 686 // if the event is accessed before it is fully initialized. 687 evtCtrl.emplace_back(evt); 688 return evtCtrl.back(); 637 689 } 638 690 … … 642 694 const int i = rBuf.sockId; 643 695 644 memcpy(evt->FADhead .get()+i, &rBuf.H, sizeof(PEVNT_HEADER));696 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER)); 645 697 646 698 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts … … 743 795 void writeEvt(const shared_ptr<EVT_CTRL2> &evt) 744 796 { 745 const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; 797 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; 798 RUN_CTRL2 &run = *evt->runCtrl; 746 799 747 800 // Is this a valid event or just an empty event to trigger run close? 748 801 // If this is not an empty event open the new run-file 749 802 // Empty events are there to trigger run-closing conditions 750 if (evt-> runNum>=0)803 if (evt->valid()) 751 804 { 752 805 // File not yet open 753 if (run ->fileStat==kFileNotYetOpen)806 if (run.fileStat==kFileNotYetOpen) 754 807 { 755 808 // runOpen will close a previous run, if still open 756 if (!runOpen( evt))809 if (!runOpen(*evt)) 757 810 { 758 811 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); 759 run ->fileStat = kFileClosed;812 run.fileStat = kFileClosed; 760 813 return; 761 814 } 762 815 763 816 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); 764 run ->fileStat = kFileOpen;817 run.fileStat = kFileOpen; 765 818 } 766 819 767 820 // Here we have a valid calibration and can go on with that. 821 // It is important that _all_ events are sent for calibration (except broken ones) 768 822 processingQueue1.post(evt); 769 823 } 770 824 771 825 // File already closed 772 if (run ->fileStat==kFileClosed)826 if (run.fileStat==kFileClosed) 773 827 return; 774 828 775 829 bool rc1 = true; 776 if (evt-> runNum>=0)777 { 778 rc1 = runWrite( evt);830 if (evt->valid()) 831 { 832 rc1 = runWrite(*evt); 779 833 if (!rc1) 780 834 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); 781 835 } 782 836 783 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached784 const bool cond2 = run->lastTime < run->closeTime; // max time not reached785 const bool cond3 = run->closeRequest==kRequestNone; // file signaled to be closed786 const bool cond4 = rc1; // Write successfull837 // File not open... no need to close or to check for close 838 // ... this is the case if CloseRunFile was called before any file was opened. 839 if (run.fileStat!=kFileOpen) 840 return; 787 841 788 842 // File is not yet to be closed. 789 if ( cond1 && cond2 && cond3 && cond4)843 if (rc1 && evt->closeRequest==kRequestNone) 790 844 return; 791 845 792 846 runClose(); 793 run ->fileStat = kFileClosed;847 run.fileStat = kFileClosed; 794 848 795 849 vector<string> reason; 796 if (!cond1) 797 reason.push_back(to_string(run->maxEvt)+" evts reached"); 798 if (!cond2) 799 reason.push_back(to_string(run->closeTime-run->openTime)+"s reached"); 800 if (!cond3) 801 { 802 if (run->closeRequest&kRequestManual) 803 reason.push_back("close requested"); 804 if (run->closeRequest&kRequestTimeout) 805 reason.push_back("receive timeout"); 806 if (run->closeRequest&kRequestConnectionChange) 807 reason.push_back("connection changed"); 808 if (run->closeRequest&kRequestEventCheckFailed) 809 reason.push_back("event check failed"); 810 } 811 if (!cond4) 812 reason.push_back("runWrite failed"); 850 if (evt->closeRequest&kRequestManual) 851 reason.emplace_back("close requested"); 852 if (evt->closeRequest&kRequestTimeout) 853 reason.emplace_back("receive timeout"); 854 if (evt->closeRequest&kRequestConnectionChange) 855 reason.emplace_back("connection changed"); 856 if (evt->closeRequest&kRequestEventCheckFailed) 857 reason.emplace_back("event check failed"); 858 if (evt->closeRequest&kRequestMaxTimeReached) 859 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached"); 860 if (evt->closeRequest&kRequestMaxEvtsReached) 861 reason.push_back(to_string(run.maxEvt)+" evts reached"); 862 if (!rc1) 863 reason.emplace_back("runWrite failed"); 813 864 814 865 const string str = boost::algorithm::join(reason, ", "); … … 820 871 void procEvt(const shared_ptr<EVT_CTRL2> &evt) 821 872 { 822 if (evt-> runNum>=0)873 if (evt->valid()) 823 874 { 824 875 evt->fEvent->Errors[0] = evt->Errors[0]; … … 827 878 evt->fEvent->Errors[3] = evt->Errors[3]; 828 879 880 evt->fEvent->PCTime = evt->time.tv_sec; 881 evt->fEvent->PCUsec = evt->time.tv_usec; 882 883 evt->fEvent->NumBoards = evt->nBoard; 884 829 885 for (int ib=0; ib<NBOARDS; ib++) 830 evt->fEvent->BoardTime[ib] = evt->FADhead .get()[ib].time;831 832 if (!eventCheck( evt))833 { 834 evt->runCtrl->closeRequest = kRequestEventCheckFailed;886 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time; 887 888 if (!eventCheck(*evt)) 889 { 890 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl)); 835 891 return; 836 892 } … … 843 899 // ========================================================================== 844 900 // ========================================================================== 845 846 shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile847 901 848 902 /* … … 937 991 */ 938 992 993 Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1)); 994 995 // This corresponds more or less to fFile... should we merge both? 996 shared_ptr<RUN_CTRL2> actrun; 997 939 998 void CloseRunFile() 940 999 { 941 // Create a copy of the shared_ptr to ensure 942 // is not replaced in the middle of the action 943 const shared_ptr<RUN_CTRL2> run = actrun; 944 if (run) 945 run->closeRequest |= kRequestManual; 1000 // Currently we need actrun here, to be able to set kFileClosed. 1001 // Apart from that we have to ensure that there is an open file at all 1002 // which we can close. 1003 // Submission to the primary queue ensures that the event 1004 // is placed at the right place in the processing chain. 1005 // (Corresponds to the correct run) 1006 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun)); 946 1007 } 947 1008 … … 949 1010 { 950 1011 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop"); 951 952 Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));953 1012 954 1013 primaryQueue.start(); 955 1014 secondaryQueue.start(); 1015 processingQueue1.start();; 956 1016 957 1017 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2); … … 964 1024 while (g_reset == 0) 965 1025 { 1026 #ifdef USE_POLL 1027 int pp[40]; 1028 int nn = 0; 1029 pollfd fds[40]; 1030 for (int i=0; i<40; i++) 1031 { 1032 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) 1033 { 1034 fds[nn].fd = rd[i].socket; 1035 fds[nn].events = POLLIN; 1036 pp[nn] = i; 1037 nn++; 1038 } 1039 } 1040 1041 const int rc_epoll = poll(fds, nn, 100); 1042 if (rc_epoll<0) 1043 break; 1044 #endif 1045 966 1046 #ifdef USE_SELECT 967 1047 fd_set readfs; … … 978 1058 timeval tv; 979 1059 tv.tv_sec = 0; 980 tv.tv_usec = 100 ;1060 tv.tv_usec = 100000; 981 1061 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv); 982 1062 // 0: timeout … … 995 1075 #endif 996 1076 997 #ifdef USE_EPOLL 1077 #if defined(USE_POLL) 1078 for (int jj=0; jj<nn; jj++) 1079 #endif 1080 #if defined(USE_EPOLL) 998 1081 for (int jj=0; jj<rc_epoll; jj++) 999 #else 1082 #endif 1083 #if !defined(USE_EPOLL) && !defined(USE_POLL) 1000 1084 for (int jj=0; jj<NBOARDS; jj++) 1001 1085 #endif 1002 1086 { 1087 #ifdef USE_SELECT 1088 if (!FD_ISSET(rs->socket, &readfs)) 1089 continue; 1090 #endif 1091 1092 #ifdef USE_POLL 1093 if ((fds[jj].revents&POLLIN)==0) 1094 continue; 1095 #endif 1096 1003 1097 #ifdef USE_EPOLL 1004 1098 // FIXME: How to get i? 1005 1099 READ_STRUCT *rs = READ_STRUCT::get(jj); 1006 #else 1007 1100 #endif 1101 1102 #ifdef USE_POLL 1103 // FIXME: How to get i? 1104 READ_STRUCT *rs = &rd[pp[jj]]; 1105 #endif 1106 1107 #if !defined(USE_POLL) && !defined(USE_EPOLL) 1008 1108 const int i = (jj%4)*10 + (jj/4); 1009 1109 READ_STRUCT *rs = &rd[i]; 1010 if (!rs->connected) 1011 continue; 1012 #endif 1013 1014 #ifdef USE_SELECT 1015 if (!FD_ISSET(rs->socket, &readfs)) 1016 continue; 1017 #endif 1018 1110 #endif 1019 1111 1020 1112 #ifdef COMPLETE_EVENTS … … 1106 1198 1107 1199 // We have a valid entry, but no memory has yet been allocated 1108 if (evt && !evt-> FADhead)1200 if (evt && !evt->initMemory()) 1109 1201 { 1110 // Try to get memory from the big buffer 1111 PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc(); 1112 if (!mem) 1113 { 1114 // If this works properly, this is a hack which can be removed, or 1115 // replaced by a signal or dim message 1116 if (!rs->repmem) 1117 { 1118 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); 1119 rs->repmem = true; 1120 } 1202 if (evt->reportMem) 1121 1203 continue; 1122 } 1123 1124 evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free)); 1204 1205 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); 1206 evt->reportMem = true; 1207 continue; 1125 1208 } 1126 1209 … … 1133 1216 if (!evt) 1134 1217 continue; 1135 1136 /*1137 const int fad = (i/10)<<8)|(i%10);1138 if (fad != rs->H.board_id)1139 {1140 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);1141 }*/1142 1218 1143 1219 // This should never happen … … 1161 1237 #endif 1162 1238 // now we have stored a new board contents into Event structure 1163 evt->fEvent->NumBoards++;1164 1239 evt->board[rs->sockId] = rs->sockId; 1240 evt->header = evt->FADhead+rs->sockId; 1165 1241 evt->nBoard++; 1166 1242 1167 1243 #ifdef COMPLETE_EPOLL 1168 1244 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0) 1245 { 1169 1246 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); 1247 break; 1248 } 1170 1249 #endif 1171 1250 // event not yet complete … … 1199 1278 ev.data.ptr = &rd[j]; // user data (union: ev.ptr) 1200 1279 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0) 1280 { 1201 1281 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); 1282 return; 1283 } 1202 1284 } 1203 1285 #endif … … 1229 1311 if (actTime == gi_SecTime) 1230 1312 { 1231 #if !defined(USE_SELECT) && !defined(USE_EPOLL) 1313 #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL) 1232 1314 if (evtCtrl.empty()) 1233 1315 usleep(1); … … 1263 1345 } 1264 1346 1265 // If nothing was received for more than 5min, close file1266 if (actTime-actrun->lastTime>300)1267 actrun->closeRequest |= kRequestTimeout;1268 1269 1347 // ================================================================= 1270 1348 … … 1275 1353 gj.deltaT = 1000; // temporary, must be improved 1276 1354 1355 bool changed = false; 1356 1277 1357 for (int ib=0; ib<NBOARDS; ib++) 1278 1358 { … … 1281 1361 1282 1362 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr)) 1283 actrun->closeRequest |= kRequestConnectionChange;1363 changed = true; 1284 1364 1285 1365 gi_NumConnect[ib] = rd[ib].connected; … … 1287 1367 } 1288 1368 1289 1290 1369 factStat(gj); 1291 1370 1292 1371 Memory::max_inuse = 0; 1293 1372 gj.maxEvt = 0; 1373 1294 1374 for (int ib=0; ib<NBOARDS; ib++) 1295 1375 rd[ib].rateBytes = 0; … … 1301 1381 // if a new file has been started and no events of the new file 1302 1382 // have been received yet 1383 int request = kRequestNone; 1384 1385 // If nothing was received for more than 5min, close file 1386 if (actTime-actrun->lastTime>300) 1387 request |= kRequestTimeout; 1388 1389 // If connection status has changed 1390 if (changed) 1391 request |= kRequestConnectionChange; 1392 1393 if (request!=kRequestNone) 1394 runFinished(); 1395 1303 1396 if (actrun->fileStat==kFileOpen) 1304 primaryQueue. post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));1397 primaryQueue.emplace(new EVT_CTRL2(request, actrun)); 1305 1398 } 1306 1399 … … 1322 1415 // Here we also destroy all runCtrl structures and hence close all open files 1323 1416 evtCtrl.clear(); 1417 actrun.reset(); 1324 1418 1325 1419 factPrintf(MessageImp::kInfo, "Exit read Process..."); … … 1338 1432 factPrintf(MessageImp::kInfo, "Starting EventBuilder++"); 1339 1433 1340 1341 for (int k=0; k<NBOARDS; k++) 1342 { 1343 gi_NumConnect[k] = 0; 1344 gj.numConn[k] = 0; 1345 gj.totBytes[k] = 0; 1346 } 1434 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect)); 1435 memset(gj.numConn, 0, NBOARDS*sizeof(*gj.numConn)); 1436 memset(gj.totBytes, 0, NBOARDS*sizeof(*gj.totBytes)); 1347 1437 1348 1438 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
Note:
See TracChangeset
for help on using the changeset viewer.