Changeset 11335 for trunk/FACT++/src/EventBuilder.c
- Timestamp:
- 07/11/11 11:08:16 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11323 r11335 54 54 extern void debugStream(int isock, void *buf, int len) ; 55 55 56 57 58 int g_actTime ; 59 int g_runStat ; 56 int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 57 58 59 uint g_actTime ; 60 int g_runStat ; 61 int g_reset ; 60 62 size_t g_maxMem ; //maximum memory allowed for buffer 61 63 … … 251 253 evtCtrl.evtStat[ i] = -1 ; 252 254 evtCtrl.pcTime[ i] = actime ; //initiate to far future 253 254 255 } 255 256 … … 488 489 int actBoards = 0, minLen ; 489 490 int32_t jrd ; 491 uint gi_SecTime ; //time in seconds 490 492 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ; 491 493 492 494 int goodhed = 0 ; 493 495 int errcnt0 = 0 ; 496 497 int reset, resetR, resetS, resetW, resetX ; 498 int sockDef[NBOARDS]; //internal state of sockets 499 500 494 501 495 502 struct timespec xwait ; … … 508 515 509 516 /* CPU_ZERO initializes all the bits in the mask to zero. */ 510 517 CPU_ZERO( &mask ); 511 518 /* CPU_SET sets only the bit corresponding to cpu. */ 512 513 519 cpu = 7 ; 520 CPU_SET( cpu, &mask ); 514 521 515 522 /* sched_setaffinity returns 0 in success */ 516 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 517 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu); 518 factOut(kWarn,-1, str ) ; 519 } 520 521 522 //make sure all sockets are preallocated as 'not exist' 523 for (i=0; i<MAX_SOCK; i++) { 524 rd[i].socket = -1 ; 525 rd[i].sockStat = 99 ; 523 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 524 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu); 525 factOut(kWarn,-1, str ) ; 526 526 } 527 527 528 int sockDef[NBOARDS]; //internal state of sockets529 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;530 531 532 resetEvtStat();533 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;534 gj.totMem = g_maxMem ;535 gj.bufNew = gj.bufEvt = 0 ;536 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;537 538 for (k=0; k<NBOARDS; k++) {539 gi_NumConnect[k]=0;540 gi.numConn[k] =0;541 gj.numConn[k] =0;542 gj.errConn[k] =0;543 gj.rateBytes[k] =0;544 gj.totBytes[k] =0;545 }546 547 548 uint gi_SecTime ; //time in seconds549 gi_SecTime= g_actTime ;550 551 mBufInit() ; //initialize buffers552 553 snprintf(str,MXSTR,"end initializing");554 factOut(kInfo,-1, str ) ;555 556 557 558 528 head_len = sizeof(PEVNT_HEADER) ; 559 //frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header560 529 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0 561 562 //minLen = MIN_LEN ; //min #bytes needed to check header563 530 minLen = head_len ; //min #bytes needed to check header: full header for debug 564 565 numok = numok2 = 0 ;566 531 567 532 start.S=0xFB01; 568 533 stop.S= 0x04FE; 569 534 570 gi_myRun = g_actTime ; 535 /* initialize run control logics */ 536 for (i=0; i<MAX_RUN; i++) { 537 runCtrl[i].runId = 0 ; 538 runCtrl[i].fileId = -2 ; 539 } 540 resetS = resetR = 9; 541 542 543 START: 544 evtCtrl.frstPtr = 0 ; 545 evtCtrl.lastPtr = 0 ; 546 547 gi_myRun = g_actTime ; 548 gi_SecTime = g_actTime ; 571 549 gi_runStat = g_runStat ; 572 550 gj.readStat= g_runStat ; 573 574 575 while (g_runStat >=0) { //loop until global variable g_runStat claims stop 551 numok = numok2 = 0 ; 552 553 if ( resetS > 0) { 554 //make sure all sockets are preallocated as 'not exist' 555 for (i=0; i<MAX_SOCK; i++) { 556 rd[i].socket = -1 ; 557 rd[i].sockStat = 99 ; 558 } 559 560 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ; 561 562 } 563 564 565 if ( resetR > 0) { 566 resetEvtStat(); 567 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ; 568 gj.totMem = g_maxMem ; 569 gj.bufNew = gj.bufEvt = 0 ; 570 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ; 571 572 for (k=0; k<NBOARDS; k++) { 573 gi_NumConnect[k]=0; 574 gi.numConn[k] =0; 575 gj.numConn[k] =0; 576 gj.errConn[k] =0; 577 gj.rateBytes[k] =0; 578 gj.totBytes[k] =0; 579 } 580 581 mBufInit() ; //initialize buffers 582 583 snprintf(str,MXSTR,"end initializing"); 584 factOut(kInfo,-1, str ) ; 585 } 586 587 588 reset = resetR = resetS = resetW = 0 ; 589 590 while (g_runStat >=0 || g_reset ==0 ) { //loop until global variable g_runStat claims stop 576 591 577 592 gi_runStat = g_runStat; 578 593 gj.readStat= g_runStat; 579 // if (gi.totMem < 50000000 ) gi.totMem = 500000000 ;580 594 581 595 int b,p,p0,s0,nch; … … 867 881 868 882 if (evtCtrl.evtStat[k0] > 0 869 && evtCtrl.evtStat[k0] < 9 0 ) {883 && evtCtrl.evtStat[k0] < 92 ) { 870 884 gj.bufNew++ ; //incomplete event in Buffer 871 872 if (evtCtrl.pcTime[k0] < g_actTime-10 ) {885 if ( evtCtrl.evtStat[k0] < 90 886 && evtCtrl.pcTime[k0] < g_actTime-10 ) { 873 887 int id =evtCtrl.evtBuf[k0] ; 874 888 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]); … … 881 895 } else if (evtCtrl.evtStat[k0] >= 900 ) { 882 896 883 884 885 886 887 888 889 890 } else if (evtCtrl.evtStat[k0] >= 9 00) {897 int id =evtCtrl.evtBuf[k0] ; 898 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ; 899 factOut(kDebug,-1, str ) ; 900 mBufFree(id) ; //event written--> free memory 901 evtCtrl.evtStat[k0] = -1; 902 gj.evtWrite++ ; 903 gj.rateWrite++ ; 904 } else if (evtCtrl.evtStat[k0] >= 95 ) { 891 905 gj.bufEvt++ ; //complete event in Buffer 892 906 } … … 901 915 902 916 int b; 903 for ( b=0; b<NBOARDS; b++) gj.totBytes[ k] +=gj.rateBytes[b] ;917 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ; 904 918 gj.totMem = g_maxMem ; 905 919 if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ; … … 911 925 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ; 912 926 } 913 914 915 916 927 917 928 if (numok > 0 ) numok2=0; … … 929 940 } //and do next loop over all sockets ... 930 941 931 //must quit eventbuilding 932 snprintf(str,MXSTR,"stop reading ... ");942 943 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset); 933 944 factOut(kInfo,-1, str ) ; 934 945 935 //flag all events as 'read finished' 936 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 937 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 938 939 int k1=evtCtrl.frstPtr; 940 941 for ( k=k1; k<(k1+kd); k++ ) { 942 int k0 = k % (MAX_EVT*MAX_RUN) ; 943 if (evtCtrl.evtStat[k0] > 0 944 && evtCtrl.evtStat[k0] < 90 ) { 945 evtCtrl.evtStat[k0] = 91 ; 946 gi.evtSkp++ ; 947 gi.evtTot++ ; 946 if (g_reset >0 ) { 947 reset = g_reset ; 948 resetR = reset%10 ; //shall we stop reading ? 949 resetS = (reset/10)%10 ; //shall we close sockets ? 950 resetW = (reset/100)%10 ; //shall we close files ? 951 resetX = reset/1000 ; //shall we simply wait resetX seconds ? 952 g_reset= 0; 953 } else { 954 reset = 0; 955 if ( g_runStat==-1 ) resetR = 1 ; 956 else resetR = 7 ; 957 resetS = 7 ; //close all sockets 958 resetW = 7 ; //close all files 959 resetX = 0 ; 960 961 //inform others we have to quit .... 962 gi_runStat = -11 ; //inform all that no update to happen any more 963 gj.readStat= -11 ; //inform all that no update to happen any more 964 } 965 966 if (resetS > 0) { 967 //must close all open sockets ... 968 snprintf(str,MXSTR,"close all sockets ..."); 969 factOut(kInfo,-1, str ) ; 970 for (i=0; i<MAX_SOCK; i++) { 971 if (rd[i].sockStat ==0 ) { 972 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket 973 gi_NumConnect[ i/7 ]-- ; 974 gi.numConn[ i/7 ]-- ; 975 gj.numConn[ i/7 ]-- ; 976 } 948 977 } 949 978 } 950 979 951 //must close all open sockets ... 952 snprintf(str,MXSTR,"close all sockets ..."); 953 factOut(kInfo,-1, str ) ; 954 for (i=0; i<MAX_SOCK; i++) { 955 if (rd[i].sockStat ==0 ) { 956 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket 957 gi_NumConnect[ i/7 ]-- ; 958 gi.numConn[ i/7 ]-- ; 959 gj.numConn[ i/7 ]-- ; 960 } 961 } 962 963 xwait.tv_sec = 0; 964 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 965 nanosleep( &xwait , NULL ) ; 966 gi_runStat = -11 ; //inform all that no update to happen any more 967 gj.readStat= -11 ; //inform all that no update to happen any more 968 969 970 int minclear = 900 ; //usually wait until writing finished (stat 900) 971 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all 972 973 974 //and clear all buffers (might have to wait until all others are done) 975 snprintf(str,MXSTR,"clear all buffers ..."); 976 factOut(kInfo,-1, str ) ; 977 int numclear=1 ; 978 while (numclear > 0 ) { 979 numclear = 0 ; 980 981 if (resetR > 0) { 982 //flag all events as 'read finished' 980 983 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 981 984 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; … … 984 987 for ( k=k1; k<(k1+kd); k++ ) { 985 988 int k0 = k % (MAX_EVT*MAX_RUN) ; 986 if (evtCtrl.evtStat[k0] > minclear ) { 987 int id =evtCtrl.evtBuf[k0] ; 988 mBufFree(id) ; //event written--> free memory 989 evtCtrl.evtStat[k0] = -1; 990 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing... 991 992 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) 993 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 989 if (evtCtrl.evtStat[k0] > 0 990 && evtCtrl.evtStat[k0] < 90 ) { 991 evtCtrl.evtStat[k0] = 91 ; 992 gi.evtSkp++ ; 993 gi.evtTot++ ; 994 } 994 995 } 995 996 … … 997 998 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 998 999 nanosleep( &xwait , NULL ) ; 1000 1001 //and clear all buffers (might have to wait until all others are done) 1002 int minclear ; 1003 if (resetR == 1) { 1004 minclear = 900 ; 1005 snprintf(str,MXSTR,"drain all buffers ..."); 1006 } else { 1007 minclear = 0 ; 1008 snprintf(str,MXSTR,"flush all buffers ..."); 1009 } 1010 factOut(kInfo,-1, str ) ; 1011 1012 int numclear=1 ; 1013 while (numclear > 0 ) { 1014 numclear = 0 ; 1015 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1016 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1017 1018 int k1=evtCtrl.frstPtr; 1019 for ( k=k1; k<(k1+kd); k++ ) { 1020 int k0 = k % (MAX_EVT*MAX_RUN) ; 1021 if (evtCtrl.evtStat[k0] > minclear ) { 1022 int id =evtCtrl.evtBuf[k0] ; 1023 mBufFree(id) ; //event written--> free memory 1024 evtCtrl.evtStat[k0] = -1; 1025 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing... 1026 1027 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) 1028 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 1029 } 1030 1031 xwait.tv_sec = 0; 1032 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1033 nanosleep( &xwait , NULL ) ; 1034 } 999 1035 } 1036 1037 if (reset > 0) { 1038 if (resetW > 0) { 1039 CloseRunFile(0,0,0) ; //ask all Runs to be closed 1040 } 1041 if (resetX > 0) { 1042 xwait.tv_sec = resetX; 1043 xwait.tv_nsec= 0 ; 1044 nanosleep( &xwait , NULL ) ; 1045 } 1046 1047 snprintf(str,MXSTR,"Continue read Process ..."); 1048 factOut(kInfo,-1, str ) ; 1049 reset = 0 ; 1050 goto START ; 1051 } 1052 1053 1000 1054 1001 1055 snprintf(str,MXSTR,"Exit read Process ..."); … … 1314 1368 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ; 1315 1369 int ii =0 ; 1316 if ( i < 0 ) ii=1 ; 1317 else if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1370 if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1318 1371 else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ; 1319 1372 else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ; … … 1363 1416 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1364 1417 int ii =0 ; 1365 if ( i < 0 ) ii=1 ; 1366 else if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1418 if (runCtrl[j].closeTime < g_actTime ) ii=2 ; 1367 1419 else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ; 1368 1420 else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ; … … 1397 1449 pthread_t thread[50] ; 1398 1450 struct timespec xwait ; 1399 uint32_t actime ;1400 1451 1401 1452 gi_runStat = gp_runStat = gw_runStat = 0 ; 1402 1453 gj.readStat= gj.procStat= gj.writStat= 0 ; 1403 1454 1404 1405 1455 snprintf(str,MXSTR,"Starting EventBuilder"); 1406 1456 factOut(kInfo,-1, str ) ; 1407 1408 1409 evtCtrl.frstPtr = 0 ;1410 evtCtrl.lastPtr = 0 ;1411 1412 actime = g_actTime + 50000000 ;1413 /* initialize run control logics */1414 for (i=0; i<MAX_RUN; i++) {1415 runCtrl[i].runId = 0 ;1416 runCtrl[i].fileId = -2 ;1417 }1418 1419 1457 1420 1458 //start all threads (more to come) when we are allowed to .... 1421 1459 while (g_runStat == 0 ) { 1422 1460 xwait.tv_sec = 0; 1423 xwait.tv_nsec= 2000000 ; // sleep for ~2msec1461 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec 1424 1462 nanosleep( &xwait , NULL ) ; 1425 1463 } … … 1434 1472 imax=i ; 1435 1473 1474 1436 1475 #ifdef BILAND 1437 xwait.tv_sec = 20;;1438 xwait.tv_nsec= 0 ; // sleep for ~20sec1439 nanosleep( &xwait , NULL ) ;1440 1441 printf("close all runs in 2 seconds\n");1442 1443 CloseRunFile( 0, time(NULL)+2, 0) ;1444 1445 xwait.tv_sec = 5;;1446 xwait.tv_nsec= 0 ; // sleep for ~20sec1447 nanosleep( &xwait , NULL ) ;1448 1449 printf("setting g_runstat to -1\n");1450 1451 g_runStat = -1 ;1476 xwait.tv_sec = 20;; 1477 xwait.tv_nsec= 0 ; // sleep for ~20sec 1478 nanosleep( &xwait , NULL ) ; 1479 1480 printf("close all runs in 2 seconds\n"); 1481 1482 CloseRunFile( 0, time(NULL)+2, 0) ; 1483 1484 xwait.tv_sec = 5;; 1485 xwait.tv_nsec= 0 ; // sleep for ~20sec 1486 nanosleep( &xwait , NULL ) ; 1487 1488 printf("setting g_runstat to -1\n"); 1489 1490 g_runStat = -1 ; 1452 1491 #endif 1453 1492
Note:
See TracChangeset
for help on using the changeset viewer.