Changeset 16103
- Timestamp:
- 05/24/13 13:53:35 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.cc
r16040 r16103 8 8 #include <forward_list> 9 9 10 #include <boost/algorithm/string/join.hpp> 11 10 12 #include "queue.h" 11 13 12 14 #include "MessageImp.h" 15 #include "EventBuilder.h" 13 16 14 17 using namespace std; 15 18 16 #include "EventBuilder.h" 17 18 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 19 #define MAX_LEN (36*3*1024) // (data+header)*num channels 20 21 #define COMPLETE_EVENTS 19 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 20 #define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag) 21 22 //#define COMPLETE_EVENTS 22 23 //#define USE_EPOLL 23 24 //#define USE_SELECT 25 //#define COMPLETE_EPOLL 24 26 25 27 // ========================================================================== … … 206 208 void destroy(); 207 209 bool create(sockaddr_in addr); 208 voidcheck(int, sockaddr_in addr);210 bool check(int, sockaddr_in addr); 209 211 bool read(); 210 212 }; … … 246 248 { 247 249 #ifdef USE_EPOLL 248 if (::close(fd_epoll) > 0) 249 factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno); 250 else 251 factPrintf(MessageImp::kInfo, "Succesfully closed epoll"); 250 if (fd_epoll>=0 && ::close(fd_epoll)>0) 251 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno); 252 252 #endif 253 253 … … 289 289 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); 290 290 291 factPrintf(MessageImp::kInfo, " Successfully generated socket %d", sockId);291 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket); 292 292 293 293 //connected = false; … … 299 299 void READ_STRUCT::destroy() 300 300 { 301 if (socket ==-1)301 if (socket<0) 302 302 return; 303 303 … … 311 311 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno); 312 312 else 313 factPrintf(MessageImp::kInfo, " Succesfully closed socket %d", sockId);313 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket); 314 314 315 315 socket = -1; … … 318 318 } 319 319 320 voidREAD_STRUCT::check(int sockDef, sockaddr_in addr)320 bool READ_STRUCT::check(int sockDef, sockaddr_in addr) 321 321 { 322 322 // Continue in the most most likely case (performance) 323 323 //if (socket>=0 && sockDef!=0 && connected) 324 324 // return; 325 const int old = socket; 325 326 326 327 // socket open, but should not be open … … 332 333 create(addr); //generate address and socket 333 334 335 const bool retval = old!=socket; 336 334 337 // Socket closed 335 338 if (socket<0) 336 return ;339 return retval; 337 340 338 341 // Socket open and connected: Nothing to do 339 342 if (connected) 340 return ;343 return retval; 341 344 342 345 //try to connect if not yet done 343 346 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr)); 344 347 if (rc == -1) 345 return ;348 return retval; 346 349 347 350 connected = true; … … 362 365 repmem = false; 363 366 364 factPrintf(MessageImp::kInfo, " New connection%d (%d)", sockId, socket);367 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket); 365 368 366 369 #ifdef USE_EPOLL … … 371 374 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); 372 375 #endif 376 377 return retval; 373 378 } 374 379 … … 588 593 actrun->maxEvt = actrun->lastEvt; 589 594 590 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",595 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d", 591 596 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId); 592 597 … … 601 606 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete! 602 607 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete! 608 609 // Signal the fadctrl that a new run has been started 610 // Note this is the only place at which we can ensure that 611 // gotnewRun is called only once 612 gotNewRun(*actrun); 603 613 } 604 614 … … 614 624 // if the event is accessed before it is fully initialized. 615 625 evtCtrl.push_back(evt); 616 617 // Signal the fadctrl that a new run has been started618 // Note this is the only place at which we can ensure that619 // gotnewRun is called only once620 // Note that this will callback CloseRunFile, therefor the event621 // must already be in the evtCtrl structure622 if (newrun)623 gotNewRun(*actrun);624 626 625 627 // An event can be the first and the last, but not the last and the first. … … 743 745 const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; 744 746 745 bool rc1 = true;746 747 747 // Is this a valid event or just an empty event to trigger run close? 748 748 // If this is not an empty event open the new run-file … … 756 756 if (!runOpen(evt)) 757 757 { 758 factPrintf(MessageImp::kError, " writeEvt:Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);758 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); 759 759 run->fileStat = kFileClosed; 760 760 return; 761 761 } 762 762 763 factPrintf(MessageImp::kInfo, " writeEvt:Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);763 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); 764 764 run->fileStat = kFileOpen; 765 765 } … … 767 767 // Here we have a valid calibration and can go on with that. 768 768 processingQueue1.post(evt); 769 770 // File already closed 771 if (run->fileStat==kFileClosed) 772 return; 773 769 } 770 771 // File already closed 772 if (run->fileStat==kFileClosed) 773 return; 774 775 bool rc1 = true; 776 if (evt->runNum>=0) 777 { 774 778 rc1 = runWrite(evt); 775 779 if (!rc1) 776 factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); 777 } 778 779 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached 780 const bool cond2 = run->lastTime < run->closeTime; // max time not reached 781 const bool cond3 = rc1; // Write successfull 780 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); 781 } 782 783 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached 784 const bool cond2 = run->lastTime < run->closeTime; // max time not reached 785 const bool cond3 = run->closeRequest==kRequestNone; // file signaled to be closed 786 const bool cond4 = rc1; // Write successfull 782 787 783 788 // File is not yet to be closed. 784 if (cond1 && cond2 && cond3 )789 if (cond1 && cond2 && cond3 && cond4) 785 790 return; 786 791 … … 788 793 run->fileStat = kFileClosed; 789 794 790 string str; 791 if (!cond1) str += to_string(run->maxEvt)+" evts reached"; 792 if (!cond1 && (!cond2 || !cond3)) str += ", "; 793 if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached"; 794 if ((!cond1 || !cond2) && !cond3) str += ", "; 795 if (!cond3) str += "runWrite failed"; 795 vector<string> reason; 796 if (!cond1) 797 reason.push_back(to_string(run->maxEvt)+" evts reached"); 798 if (!cond2) 799 reason.push_back(to_string(run->closeTime-run->openTime)+"s reached"); 800 if (!cond3) 801 { 802 if (run->closeRequest&kRequestManual) 803 reason.push_back("close requested"); 804 if (run->closeRequest&kRequestTimeout) 805 reason.push_back("receive timeout"); 806 if (run->closeRequest&kRequestConnectionChange) 807 reason.push_back("connection changed"); 808 } 809 if (!cond4) 810 reason.push_back("runWrite failed"); 811 812 const string str = boost::algorithm::join(reason, ", "); 796 813 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str()); 797 814 } … … 921 938 // is not replaced in the middle of the action 922 939 const shared_ptr<RUN_CTRL2> run = actrun; 923 run->maxEvt = run->lastEvt; 940 if (run) 941 run->closeRequest |= kRequestManual; 924 942 } 925 943 … … 1024 1042 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++) 1025 1043 { 1026 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)1027 //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)1044 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01) 1045 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb) 1028 1046 break; 1029 1047 } … … 1143 1161 evt->nBoard++; 1144 1162 1163 #ifdef COMPLETE_EPOLL 1164 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0) 1165 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); 1166 #endif 1145 1167 // event not yet complete 1146 1168 if (evt->nBoard < READ_STRUCT::activeSockets) … … 1166 1188 } 1167 1189 1190 #ifdef COMPLETE_EPOLL 1191 for (int j=0; j<40; j++) 1192 { 1193 epoll_event ev; 1194 ev.events = EPOLLIN; 1195 ev.data.ptr = &rd[j]; // user data (union: ev.ptr) 1196 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0) 1197 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); 1198 } 1199 #endif 1200 1168 1201 #ifdef COMPLETE_EVENTS 1169 1202 for (int j=0; j<40; j++) … … 1181 1214 // ================================================================== 1182 1215 1183 // +1 -> idx=0 1184 // -1 -> idx=0 1185 // +2 -> idx=0 1186 // -2 -> idx=0 1187 // +3 -> idx=0 1188 // -3 -> idx=0 1189 // +4 -> idx=0 1190 // -4 -> idx=0 1191 // +5 -> idx=0 1192 // -5 -> idx=0 1193 // +6 -> idx=0 1194 // -6 -> idx=0 1195 // 1216 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer 1217 gj.bufEvt = primaryQueue.size(); //# complete events in buffer 1218 gj.bufTot = gj.bufNew+gj.bufEvt; //# total events currently in buffer 1219 if (gj.bufNew>gj.maxEvt) //# maximum events in buffer past cycle 1220 gj.maxEvt = gj.bufNew; 1196 1221 1197 1222 // ================================================================== … … 1201 1226 { 1202 1227 #if !defined(USE_SELECT) && !defined(USE_EPOLL) 1203 if (evtCtrl. size()==0)1228 if (evtCtrl.empty()) 1204 1229 usleep(1); 1205 1230 #endif … … 1211 1236 //loop over all active events and flag those older than read-timeout 1212 1237 //delete those that are written to disk .... 1213 //const int count = evtCtrl.size();1214 1238 1215 1239 // This could be improved having the pointer which separates the queue with … … 1235 1259 } 1236 1260 1237 // =================================================================1238 1239 1261 // If nothing was received for more than 5min, close file 1240 1262 if (actTime-actrun->lastTime>300) 1241 actrun->maxEvt = actrun->lastEvt; 1263 actrun->closeRequest |= kRequestTimeout; 1264 1265 // ================================================================= 1266 1267 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM; 1268 gj.usdMem = Memory::max_inuse; 1269 gj.totMem = Memory::allocated; 1270 1271 gj.deltaT = 1000; // temporary, must be improved 1272 1273 for (int ib=0; ib<NBOARDS; ib++) 1274 { 1275 gj.rateBytes[ib] = rd[ib].rateBytes; 1276 gj.totBytes[ib] += rd[ib].rateBytes; 1277 1278 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr)) 1279 actrun->closeRequest |= kRequestConnectionChange; 1280 1281 gi_NumConnect[ib] = rd[ib].connected; 1282 gj.numConn[ib] = rd[ib].connected; 1283 } 1284 1285 1286 factStat(gj); 1287 1288 Memory::max_inuse = 0; 1289 gj.maxEvt = 0; 1290 for (int ib=0; ib<NBOARDS; ib++) 1291 rd[ib].rateBytes = 0; 1292 1293 // ================================================================= 1242 1294 1243 1295 // This is a fake event to trigger possible run-closing conditions once a second … … 1247 1299 if (actrun->fileStat==kFileOpen) 1248 1300 primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun))); 1249 1250 // =================================================================1251 1252 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;1253 gj.usdMem = Memory::max_inuse;1254 gj.totMem = Memory::allocated;1255 1256 gj.deltaT = 1000; // temporary, must be improved1257 1258 for (int ib=0; ib<NBOARDS; ib++)1259 {1260 gj.rateBytes[ib] = rd[ib].rateBytes;1261 gj.totBytes[ib] += rd[ib].rateBytes;1262 1263 rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);1264 1265 gi_NumConnect[ib] = rd[ib].connected;1266 gj.numConn[ib] = rd[ib].connected;1267 }1268 1269 factStat(gj);1270 1271 Memory::max_inuse = 0;1272 1273 for (int ib=0; ib<NBOARDS; ib++)1274 rd[ib].rateBytes = 0;1275 1301 } 1276 1302 … … 1288 1314 primaryQueue.wait(abort); 1289 1315 secondaryQueue.wait(abort); 1316 processingQueue1.wait(abort); 1290 1317 1291 1318 // Here we also destroy all runCtrl structures and hence close all open files
Note:
See TracChangeset
for help on using the changeset viewer.