Changeset 11748 for trunk/FACT++/src/EventBuilder.c
- Timestamp:
- 08/02/11 20:32:38 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11689 r11748 1 1 2 2 // // // #define EVTDEBUG 3 4 5 3 6 4 #include <stdlib.h> … … 17 15 #include <sys/types.h> 18 16 #include <sys/socket.h> 17 #include <netinet/in.h> 18 #include <netinet/tcp.h> 19 19 #include <pthread.h> 20 20 #include <sched.h> … … 48 48 extern void factStatNew(EVT_STAT gi) ; 49 49 50 extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ; 50 extern int eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) ; 51 52 extern int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) ; 51 53 52 54 extern void debugHead(int i, int j, void *buf); … … 58 60 int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 59 61 62 63 int g_maxProc ; 64 int g_maxSize ; 65 int gi_maxSize ; 66 int gi_maxProc ; 60 67 61 68 uint g_actTime ; … … 194 201 195 202 int j ; 203 int optval = 1 ; //activate keepalive 204 socklen_t optlen = sizeof(optval); 196 205 197 206 if (rd->sockStat ==0 ) { //close socket if open … … 243 252 return -2 ; 244 253 } 254 optval=1; 255 if ( setsockopt(rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { 256 snprintf(str,MXSTR,"Could not set keepalive %d | %m",sid); 257 factOut(kInfo,173, str ) ; //but continue anyhow 258 } 259 optval=10; //start after 10 seconds 260 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { 261 snprintf(str,MXSTR,"Could not set keepidle %d | %m",sid); 262 factOut(kInfo,173, str ) ; //but continue anyhow 263 } 264 optval=10; //do every 10 seconds 265 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { 266 snprintf(str,MXSTR,"Could not set keepintvl %d | %m",sid); 267 factOut(kInfo,173, str ) ; //but continue anyhow 268 } 269 optval=2; //close after 2 unsuccessful tries 270 if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { 271 snprintf(str,MXSTR,"Could not set keepalive probes %d | %m",sid); 272 factOut(kInfo,173, str ) ; //but continue anyhow 273 } 274 275 245 276 246 277 snprintf(str,MXSTR,"Successfully generated socket %d ",sid); … … 352 383 // count for inconsistencies 353 384 354 if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors ++;355 if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors +=100;356 if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors +=10000;385 if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors[0]++ ; 386 if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors[1]++ ; 387 if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors[2]++ ; 357 388 358 389 //everything seems fine so far ==> use this slot .... … … 428 459 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 429 460 430 if ( gj.usdMem + needmem + headmem > g_maxMem) {431 gj.maxMem = gj.usdMem + needmem + headmem ;461 if ( gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) { 462 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize ; 432 463 if (gi_memStat>0 ) { 433 464 gi_memStat = -99 ; … … 452 483 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ; 453 484 factOut(kError,882, str ) ; 485 free(mBuffer[i].FADhead) ; 486 mBuffer[i].FADhead = NULL ; 487 return -22; 488 } 489 490 mBuffer[i].buffer = malloc( gi_maxSize ) ; 491 if (mBuffer[i].buffer == NULL) { 492 snprintf(str,MXSTR,"malloc buffer failed for event %d",evID) ; 493 factOut(kError,882, str ) ; 494 free(mBuffer[i].FADhead) ; 495 mBuffer[i].FADhead = NULL ; 454 496 free(mBuffer[i].fEvent) ; 455 497 mBuffer[i].fEvent = NULL ; 456 return - 22;498 return -32; 457 499 } 458 500 … … 485 527 mBuffer[i].trgTyp = trgTyp ; 486 528 mBuffer[i].evtLen = needmem ; 487 mBuffer[i].Errors = 0 ; 488 489 gj.usdMem += needmem + headmem; 529 mBuffer[i].Errors[0] = 530 mBuffer[i].Errors[1] = 531 mBuffer[i].Errors[2] = 532 mBuffer[i].Errors[3] = 0 ; 533 534 gj.usdMem += needmem + headmem + gi_maxSize ; 490 535 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ; 491 536 … … 535 580 mBuffer[i].FADhead = NULL ; 536 581 582 free(mBuffer[i].buffer ) ; 583 mBuffer[i].buffer = NULL ; 584 537 585 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 538 586 mBuffer[i].evNum = mBuffer[i].nRoi= -1; 539 587 mBuffer[i].runNum = 0; 540 588 541 gj.usdMem = gj.usdMem - freemem - headmem ;589 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize ; 542 590 gj.bufTot-- ; 543 591 … … 634 682 factOut(kWarn,-1, str ) ; 635 683 } 684 636 685 637 686 head_len = sizeof(PEVNT_HEADER) ; … … 988 1037 mBuffer[evID].evNum,roi[0],roi[8]-roi[0],qncpy,qnrun); 989 1038 factOut(kDebug,-1, str ) ; 990 factOut(kInfo,-1, str ) ;991 1039 992 1040 //complete event read ---> flag for next processing … … 1035 1083 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ; 1036 1084 rd[i].ftmTyp = ntohl(rd[i].rBuf->S[5]) ; 1085 rd[i].ftmID = ntohl(rd[i].rBuf->I[3]) ; //(FTMevt) 1037 1086 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt) 1038 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)1039 1087 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ; 1040 1088 rd[i].bufTyp = 1 ; //ready to read full record … … 1107 1155 gj.evtSkip++; 1108 1156 } 1109 } else if (evtCtrl.evtStat[k0] >= 900 1157 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete' 1110 1158 || evtCtrl.evtStat[k0] == 0 ) { //'useless' 1111 1159 … … 1287 1335 1288 1336 1337 void *subProc( void *thrid ) { 1338 int threadID,status,numWait,numProc,kd,k1,k0,k,jret; 1339 struct timespec xwait ; 1340 1341 threadID= (int) thrid; 1342 1343 snprintf(str,MXSTR,"Starting sub-process-thread %d",threadID); 1344 factOut(kInfo,-1, str ) ; 1345 1346 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1347 numWait = numProc = 0 ; 1348 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1349 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1350 1351 int k1=evtCtrl.frstPtr; 1352 for ( k=k1; k<(k1+kd); k++ ) { 1353 int k0 = k % (MAX_EVT*MAX_RUN) ; 1354 1355 if (evtCtrl.evtStat[k0] ==1000+threadID) { 1356 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap 1357 jret= 9100 ; //flag to be deleted 1358 } else { 1359 int id = evtCtrl.evtBuf[k0] ; 1360 jret=subProcEvt(threadID, mBuffer[id].FADhead, mBuffer[id].fEvent, mBuffer[id].buffer) ; 1361 if (jret <= threadID) { 1362 snprintf(str,MXSTR,"process %d wants to send to process %d",threadID,jret) ; 1363 factOut(kError,-1, str ) ; 1364 jret = 5300; 1365 } else if ( jret <=0 ) jret = 9200+threadID ; //flag as 'to be deleted' 1366 else if ( jret >=gi_maxProc ) jret = 5200+threadID ; //flag as 'to be written' 1367 else jret = 1000+jret ; //flag for next proces 1368 } 1369 evtCtrl.evtStat[k0] = jret ; 1370 numProc++ ; 1371 } else if (evtCtrl.evtStat[k0] <1000+threadID) numWait++ ; 1372 } 1373 1374 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do 1375 snprintf(str,MXSTR,"Exit subProcessing Process %d",threadID); 1376 factOut(kInfo,-1, str ) ; 1377 return 0 ; 1378 } 1379 if (numProc == 0) { 1380 //seems we have nothing to do, so sleep a little 1381 xwait.tv_sec = 0; 1382 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1383 nanosleep( &xwait , NULL ) ; 1384 } 1385 } 1386 1387 snprintf(str,MXSTR,"Ending sub-process-thread %d",threadID); 1388 factOut(kInfo,-1, str ) ; 1389 return ; 1390 } /*-----------------------------------------------------------------*/ 1391 1392 1289 1393 void *procEvt( void *ptr ) { 1290 1394 /* *** main loop processing file, including SW-trigger */ 1291 1395 int numProc, numWait ; 1292 int k ;1396 int k, status ; 1293 1397 struct timespec xwait ; 1294 1398 char str[MXSTR] ; 1295 1399 1400 1401 1402 1296 1403 cpu_set_t mask; 1297 int cpu = 5; //process thread (will be several in final version)1298 1299 snprintf(str,MXSTR,"Starting process-thread ");1404 int cpu = 1 ; //process thread (will be several in final version) 1405 1406 snprintf(str,MXSTR,"Starting process-thread with %d subprocess",gi_maxProc); 1300 1407 factOut(kInfo,-1, str ) ; 1301 1408 … … 1303 1410 CPU_ZERO( &mask ); 1304 1411 /* CPU_SET sets only the bit corresponding to cpu. */ 1305 CPU_SET( cpu, &mask ); 1412 // CPU_SET( 0 , &mask ); leave for system 1413 // CPU_SET( 1 , &mask ); used by write process 1414 CPU_SET( 2 , &mask ); 1415 CPU_SET( 3 , &mask ); 1416 CPU_SET( 4 , &mask ); 1417 CPU_SET( 5 , &mask ); 1418 CPU_SET( 6 , &mask ); 1419 // CPU_SET( 7 , &mask ); used by read process 1306 1420 /* sched_setaffinity returns 0 in success */ 1307 1421 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { … … 1311 1425 1312 1426 1427 pthread_t thread[100] ; 1428 int th_ret[100]; 1429 1430 for (k=0; k < gi_maxProc; k++) { 1431 th_ret[k] = pthread_create( &thread[k], NULL, subProc, (void *)k ); 1432 } 1433 1313 1434 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1314 1435 … … 1321 1442 int k0 = k % (MAX_EVT*MAX_RUN) ; 1322 1443 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1323 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 500) {1444 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <1000) { 1324 1445 1325 1446 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap 1326 evtCtrl.evtStat[k0] = 99 1 ;1447 evtCtrl.evtStat[k0] = 9991 ; 1327 1448 } else { 1328 1449 … … 1333 1454 int roi = mBuffer[id].nRoi ; 1334 1455 int roiTM= mBuffer[id].nRoiTM ; 1335 int Errors=mBuffer[id].Errors ;1336 1456 // uint32_t irun = mBuffer[id].runNum ; 1337 1457 //snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ; … … 1361 1481 mBuffer[id].fEvent->TriggerNum = itevt ; 1362 1482 mBuffer[id].fEvent->TriggerType = itrg ; 1363 mBuffer[id].fEvent->Errors = Errors ; 1483 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0] ; 1484 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1] ; 1485 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2] ; 1486 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3] ; 1364 1487 mBuffer[id].fEvent->SoftTrig = 0 ; 1365 1488 … … 1375 1498 } 1376 1499 1377 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ; 1500 int i=eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead, 1501 mBuffer[id].fEvent) ; 1378 1502 // gj.procEvt++ ; 1379 1503 gi.procTot++ ; … … 1381 1505 1382 1506 if (i<0) { 1383 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped1507 evtCtrl.evtStat[k0] = 9999 ; //flag event to be skipped 1384 1508 gi.procErr++ ; 1385 1509 } else { 1386 evtCtrl.evtStat[k0] = 520 ;1510 evtCtrl.evtStat[k0] = 1000 ; 1387 1511 } 1388 1512 } … … 1419 1543 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1420 1544 1545 for (k=0; k<gi_maxProc; k++) { 1546 pthread_join( thread[k], (void **)&status) ; 1547 } 1548 1421 1549 int k1=evtCtrl.frstPtr; 1422 1550 for ( k=k1; k<(k1+kd); k++ ) { 1423 1551 int k0 = k % (MAX_EVT*MAX_RUN) ; 1424 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 500) {1425 evtCtrl.evtStat[k0] = 555; //flag event as 'processed'1552 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <1000) { 1553 evtCtrl.evtStat[k0] = 9800 ; //flag event as 'processed' 1426 1554 } 1427 1555 } … … 1479 1607 1480 1608 cpu_set_t mask; 1481 int cpu = 3; //write thread1609 int cpu = 1 ; //write thread 1482 1610 1483 1611 snprintf(str,MXSTR,"Starting write-thread"); … … 1505 1633 int k0 = k % (MAX_EVT*MAX_RUN) ; 1506 1634 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1507 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] <900) {1635 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) { 1508 1636 1509 1637 if ( gi_resetR > 1 ) { //we must drain the buffer asap 1510 evtCtrl.evtStat[k0] = 9 04 ;1638 evtCtrl.evtStat[k0] = 9904 ; 1511 1639 } else { 1512 1640 … … 1572 1700 factOut(kDebug,123,str) ; 1573 1701 } 1574 evtCtrl.evtStat[k0] = 9 03 ;1702 evtCtrl.evtStat[k0] = 9903 ; 1575 1703 gi.wrtErr++ ; 1576 1704 } else { … … 1579 1707 runCtrl[j].lastTime = g_actTime; 1580 1708 runCtrl[j].actEvt++ ; 1581 evtCtrl.evtStat[k0] = 9 01 ;1709 evtCtrl.evtStat[k0] = 9901 ; 1582 1710 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0); 1583 1711 factOut(kDebug,504, str ) ; … … 1586 1714 snprintf(str,MXSTR,"W error writing event for run %d",irun) ; 1587 1715 factOut(kError,503, str ) ; 1588 evtCtrl.evtStat[k0] = 9 02 ;1716 evtCtrl.evtStat[k0] = 9902 ; 1589 1717 gi.wrtErr++ ; 1590 1718 } … … 1618 1746 } 1619 1747 } else if (evtCtrl.evtStat[k0] > 0 1620 && evtCtrl.evtStat[k0] < 900 ) numWait++ ;1748 && evtCtrl.evtStat[k0] < 9000 ) numWait++ ; 1621 1749 } 1622 1750 … … 1724 1852 } 1725 1853 1854 //prepare for subProcesses 1855 gi_maxSize = g_maxSize ; 1856 if (gi_maxSize <=0 ) gi_maxSize = 1 ; 1857 1858 gi_maxProc = g_maxProc ; 1859 if (gi_maxProc <=0 || gi_maxProc>90) { 1860 snprintf(str,MXSTR,"illegal number of processes %d",gi_maxProc ) ; 1861 factOut(kFatal,301, str ) ; 1862 gi_maxProc=1; 1863 } 1864 1726 1865 //partially initialize event control logics 1727 1866 evtCtrl.frstPtr = 0 ; … … 1791 1930 /*-----------------------------------------------------------------*/ 1792 1931 1793 1794 1932 #ifdef BILAND 1933 1934 int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) 1935 { 1936 printf("called subproc %d\n",threadID) ; 1937 return threadID+1 ; 1938 } 1939 1940 1795 1941 1796 1942 … … 1818 1964 1819 1965 1820 int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) 1966 1967 int eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) 1821 1968 { 1822 1969 int i=0; … … 1888 2035 g_maxMem = g_maxMem * 200; //100MBytes 1889 2036 2037 g_maxProc = 20 ; 2038 g_maxSize = 30000 ; 1890 2039 1891 2040 g_runStat = 40 ;
Note:
See TracChangeset
for help on using the changeset viewer.