Changeset 11335


Ignore:
Timestamp:
07/11/11 11:08:16 (13 years ago)
Author:
tbretz
Message:
Added g_reset and some other updates and minor fixes.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r11323 r11335  
    5454extern void debugStream(int isock, void *buf, int len) ;
    5555
    56 
    57 
    58 int g_actTime   ;
    59 int g_runStat   ;
     56int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
     57
     58
     59uint g_actTime   ;
     60int  g_runStat   ;
     61int  g_reset     ;
    6062size_t g_maxMem  ;  //maximum memory allowed for buffer
    6163
     
    251253      evtCtrl.evtStat[ i] = -1 ;
    252254      evtCtrl.pcTime[  i] = actime ;  //initiate to far future
    253 
    254255   }
    255256
     
    488489  int actBoards = 0, minLen ;
    489490  int32_t jrd ;
     491  uint gi_SecTime ;        //time in seconds
    490492  int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
    491493
    492494  int goodhed = 0 ;
    493495  int errcnt0 = 0 ;
     496
     497  int reset, resetR, resetS, resetW, resetX ;
     498  int sockDef[NBOARDS];    //internal state of sockets
     499
     500
    494501
    495502  struct timespec xwait ;
     
    508515
    509516/* CPU_ZERO initializes all the bits in the mask to zero. */
    510    CPU_ZERO( &mask );
     517  CPU_ZERO( &mask );
    511518/* CPU_SET sets only the bit corresponding to cpu. */
    512    cpu = 7 ;
    513    CPU_SET( cpu, &mask );
     519  cpu = 7 ;
     520  CPU_SET( cpu, &mask );
    514521
    515522/* sched_setaffinity returns 0 in success */
    516    if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
    517       snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
    518       factOut(kWarn,-1, str ) ;
    519    }
    520 
    521 
    522   //make sure all sockets are preallocated as 'not exist'
    523   for (i=0; i<MAX_SOCK; i++) {
    524      rd[i].socket   = -1 ;
    525      rd[i].sockStat = 99 ;
     523  if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
     524     snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
     525     factOut(kWarn,-1, str ) ;
    526526  }
    527527
    528   int sockDef[NBOARDS];    //internal state of sockets
    529   for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
    530 
    531 
    532   resetEvtStat();
    533   gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
    534   gj.totMem = g_maxMem ;
    535   gj.bufNew = gj.bufEvt = 0 ;
    536   gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
    537 
    538   for (k=0; k<NBOARDS; k++) {
    539      gi_NumConnect[k]=0;
    540      gi.numConn[k]   =0;
    541      gj.numConn[k]   =0;
    542      gj.errConn[k]   =0;
    543      gj.rateBytes[k] =0;
    544      gj.totBytes[k]  =0;
    545   }
    546 
    547 
    548   uint gi_SecTime ;        //time in seconds
    549   gi_SecTime= g_actTime ;
    550 
    551   mBufInit() ;    //initialize buffers
    552 
    553   snprintf(str,MXSTR,"end   initializing");
    554   factOut(kInfo,-1, str ) ;
    555 
    556 
    557 
    558528  head_len = sizeof(PEVNT_HEADER) ;
    559 //frst_len = head_len + 36 * 12 ;   //fad_header plus 36*pix_header
    560529  frst_len = head_len ;   //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
    561 
    562 //minLen   = MIN_LEN  ;   //min #bytes needed to check header
    563530  minLen   = head_len ;   //min #bytes needed to check header: full header for debug
    564 
    565   numok = numok2   = 0 ;
    566531
    567532  start.S=0xFB01;
    568533  stop.S= 0x04FE;
    569534
    570   gi_myRun = g_actTime ;
     535/* initialize run control logics */
     536  for (i=0; i<MAX_RUN; i++) {
     537     runCtrl[i].runId = 0 ;
     538     runCtrl[i].fileId = -2 ;
     539  }
     540  resetS = resetR = 9;
     541
     542
     543START:
     544  evtCtrl.frstPtr = 0 ;
     545  evtCtrl.lastPtr = 0 ;
     546
     547  gi_myRun   = g_actTime ;
     548  gi_SecTime = g_actTime ;
    571549  gi_runStat = g_runStat ;
    572550  gj.readStat= g_runStat ;
    573 
    574 
    575   while (g_runStat >=0) {           //loop until global variable g_runStat claims stop
     551  numok = numok2   = 0 ;
     552
     553  if ( resetS > 0) {
     554     //make sure all sockets are preallocated as 'not exist'
     555     for (i=0; i<MAX_SOCK; i++) {
     556        rd[i].socket   = -1 ;
     557        rd[i].sockStat = 99 ;
     558     }
     559
     560     for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
     561
     562  }
     563
     564
     565  if ( resetR > 0) {
     566     resetEvtStat();
     567     gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
     568     gj.totMem = g_maxMem ;
     569     gj.bufNew = gj.bufEvt = 0 ;
     570     gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
     571
     572     for (k=0; k<NBOARDS; k++) {
     573        gi_NumConnect[k]=0;
     574        gi.numConn[k]   =0;
     575        gj.numConn[k]   =0;
     576        gj.errConn[k]   =0;
     577        gj.rateBytes[k] =0;
     578        gj.totBytes[k]  =0;
     579     }
     580
     581     mBufInit() ;    //initialize buffers
     582
     583     snprintf(str,MXSTR,"end   initializing");
     584     factOut(kInfo,-1, str ) ;
     585  }
     586
     587
     588  reset = resetR = resetS = resetW = 0 ;
     589
     590  while (g_runStat >=0 || g_reset ==0 ) {  //loop until global variable g_runStat claims stop
    576591
    577592    gi_runStat = g_runStat;
    578593    gj.readStat= g_runStat;
    579 //  if (gi.totMem < 50000000 ) gi.totMem = 500000000 ;
    580594
    581595    int b,p,p0,s0,nch;
     
    867881
    868882          if (evtCtrl.evtStat[k0] > 0
    869            && evtCtrl.evtStat[k0] < 90 ) {
     883           && evtCtrl.evtStat[k0] < 92 ) {   
    870884             gj.bufNew++ ;   //incomplete event in Buffer
    871 
    872              if ( evtCtrl.pcTime[k0] < g_actTime-10 ) {
     885             if ( evtCtrl.evtStat[k0] < 90
     886               && evtCtrl.pcTime[k0] < g_actTime-10 ) {
    873887                int id =evtCtrl.evtBuf[k0] ;
    874888                snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
     
    881895          } else if (evtCtrl.evtStat[k0] >= 900 ) {
    882896
    883               int id =evtCtrl.evtBuf[k0] ;
    884               snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
    885               factOut(kDebug,-1, str ) ;
    886               mBufFree(id) ;               //event written--> free memory
    887               evtCtrl.evtStat[k0] = -1;
    888               gj.evtWrite++ ;
    889               gj.rateWrite++ ;
    890           } else if (evtCtrl.evtStat[k0] >= 900 ) {
     897             int id =evtCtrl.evtBuf[k0] ;
     898             snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
     899             factOut(kDebug,-1, str ) ;
     900             mBufFree(id) ;               //event written--> free memory
     901             evtCtrl.evtStat[k0] = -1;
     902             gj.evtWrite++ ;
     903             gj.rateWrite++ ;
     904          } else if (evtCtrl.evtStat[k0] >= 95 ) {
    891905             gj.bufEvt++ ;   //complete event in Buffer
    892906          }
     
    901915
    902916       int b;
    903        for ( b=0; b<NBOARDS; b++) gj.totBytes[k] +=gj.rateBytes[b] ;
     917       for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ;
    904918       gj.totMem  = g_maxMem ;
    905919       if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ;
     
    911925       for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ;
    912926    }
    913 
    914 
    915 
    916927
    917928    if (numok > 0 ) numok2=0;
     
    929940 } //and do next loop over all sockets ...
    930941
    931  //must quit eventbuilding
    932  snprintf(str,MXSTR,"stop reading ...");
     942
     943 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset);
    933944 factOut(kInfo,-1, str ) ;
    934945
    935  //flag all events as 'read finished'
    936  int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
    937  if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
    938 
    939  int k1=evtCtrl.frstPtr;
    940 
    941  for ( k=k1; k<(k1+kd); k++ ) {
    942     int k0 = k % (MAX_EVT*MAX_RUN) ;
    943     if (evtCtrl.evtStat[k0] > 0
    944      && evtCtrl.evtStat[k0] < 90 ) {
    945        evtCtrl.evtStat[k0] = 91 ;   
    946        gi.evtSkp++ ;
    947        gi.evtTot++ ;   
     946 if (g_reset >0 ) {
     947   reset  = g_reset ;
     948   resetR = reset%10 ;        //shall we stop reading ?
     949   resetS = (reset/10)%10 ;   //shall we close sockets ?
     950   resetW = (reset/100)%10 ;  //shall we close files ?
     951   resetX = reset/1000 ;      //shall we simply wait resetX seconds ?
     952   g_reset= 0;
     953 } else {
     954   reset  = 0;
     955   if ( g_runStat==-1 ) resetR = 1 ;
     956   else                 resetR = 7 ;
     957   resetS = 7 ; //close all sockets
     958   resetW = 7 ; //close all files
     959   resetX = 0 ;
     960
     961   //inform others we have to quit ....
     962   gi_runStat = -11 ;  //inform all that no update to happen any more
     963   gj.readStat= -11 ;  //inform all that no update to happen any more
     964 }
     965
     966 if (resetS > 0) {
     967    //must close all open sockets ...
     968    snprintf(str,MXSTR,"close all sockets ...");
     969    factOut(kInfo,-1, str ) ;
     970    for (i=0; i<MAX_SOCK; i++) { 
     971       if (rd[i].sockStat ==0 ) {
     972          GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket   
     973          gi_NumConnect[ i/7 ]-- ;
     974          gi.numConn[ i/7 ]-- ;
     975          gj.numConn[ i/7 ]-- ;
     976       }
    948977    }
    949978 }
    950979
    951  //must close all open sockets ...
    952  snprintf(str,MXSTR,"close all sockets ...");
    953  factOut(kInfo,-1, str ) ;
    954  for (i=0; i<MAX_SOCK; i++) { 
    955     if (rd[i].sockStat ==0 ) {
    956        GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket   
    957        gi_NumConnect[ i/7 ]-- ;
    958        gi.numConn[ i/7 ]-- ;
    959        gj.numConn[ i/7 ]-- ;
    960     }
    961  }
    962 
    963  xwait.tv_sec = 0;
    964  xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
    965  nanosleep( &xwait , NULL ) ;
    966  gi_runStat = -11 ;  //inform all that no update to happen any more
    967  gj.readStat= -11 ;  //inform all that no update to happen any more
    968 
    969 
    970  int minclear = 900 ; //usually wait until writing finished (stat 900)
    971  if (g_runStat <-1 ) minclear = 0 ;  //in case of abort clear all
    972 
    973 
    974  //and clear all buffers (might have to wait until all others are done)
    975  snprintf(str,MXSTR,"clear all buffers ...");
    976  factOut(kInfo,-1, str ) ;
    977  int numclear=1 ;
    978  while (numclear > 0 ) {
    979     numclear = 0 ;
     980
     981 if (resetR > 0) {
     982    //flag all events as 'read finished'
    980983    int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
    981984    if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     
    984987    for ( k=k1; k<(k1+kd); k++ ) {
    985988       int k0 = k % (MAX_EVT*MAX_RUN) ;
    986        if (evtCtrl.evtStat[k0] > minclear ) {
    987          int id =evtCtrl.evtBuf[k0] ;
    988           mBufFree(id) ;               //event written--> free memory
    989           evtCtrl.evtStat[k0] = -1;
    990        } else if (evtCtrl.evtStat[k0] > 0) numclear++ ;  //writing is still ongoing...
    991 
    992        if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
    993           evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
     989       if (evtCtrl.evtStat[k0] > 0
     990        && evtCtrl.evtStat[k0] < 90 ) {
     991          evtCtrl.evtStat[k0] = 91 ;   
     992          gi.evtSkp++ ;
     993          gi.evtTot++ ;   
     994       }
    994995    }
    995996
     
    997998    xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
    998999    nanosleep( &xwait , NULL ) ;
     1000 
     1001    //and clear all buffers (might have to wait until all others are done)
     1002    int minclear ;
     1003    if (resetR == 1) {
     1004       minclear = 900 ;
     1005       snprintf(str,MXSTR,"drain all buffers ...");
     1006    } else {
     1007       minclear =   0 ;
     1008       snprintf(str,MXSTR,"flush all buffers ...");
     1009    }
     1010    factOut(kInfo,-1, str ) ;
     1011
     1012    int numclear=1 ;
     1013    while (numclear > 0 ) {
     1014       numclear = 0 ;
     1015       int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     1016       if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     1017
     1018       int k1=evtCtrl.frstPtr;
     1019       for ( k=k1; k<(k1+kd); k++ ) {
     1020          int k0 = k % (MAX_EVT*MAX_RUN) ;
     1021          if (evtCtrl.evtStat[k0] > minclear ) {
     1022            int id =evtCtrl.evtBuf[k0] ;
     1023             mBufFree(id) ;               //event written--> free memory
     1024             evtCtrl.evtStat[k0] = -1;
     1025          } else if (evtCtrl.evtStat[k0] > 0) numclear++ ;  //writing is still ongoing...
     1026
     1027          if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
     1028             evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
     1029       }
     1030
     1031       xwait.tv_sec = 0;
     1032       xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     1033       nanosleep( &xwait , NULL ) ;
     1034    }
    9991035 }
     1036
     1037 if (reset > 0) {
     1038    if (resetW > 0) {
     1039       CloseRunFile(0,0,0) ; //ask all Runs to be closed
     1040    }
     1041    if (resetX > 0) {
     1042       xwait.tv_sec = resetX;
     1043       xwait.tv_nsec= 0 ;       
     1044       nanosleep( &xwait , NULL ) ;
     1045    }
     1046
     1047    snprintf(str,MXSTR,"Continue read Process ...");
     1048    factOut(kInfo,-1, str ) ;
     1049    reset = 0 ;
     1050    goto START ;
     1051 }
     1052
     1053
    10001054
    10011055 snprintf(str,MXSTR,"Exit read Process ...");
     
    13141368           if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
    13151369int ii =0 ;
    1316 if ( i < 0 ) ii=1 ;
    1317 else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
     1370     if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
    13181371else if (runCtrl[j].lastTime  < g_actTime-300 ) ii=3 ;
    13191372else if (runCtrl[j].maxEvt    < runCtrl[j].actEvt ) ii=4 ;
     
    13631416        i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
    13641417int ii =0 ;
    1365 if ( i < 0 ) ii=1 ;
    1366 else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
     1418     if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
    13671419else if (runCtrl[j].lastTime  < g_actTime-300 ) ii=3 ;
    13681420else if (runCtrl[j].maxEvt    < runCtrl[j].actEvt ) ii=4 ;
     
    13971449  pthread_t thread[50] ;
    13981450  struct timespec xwait ;
    1399   uint32_t actime ;
    14001451
    14011452  gi_runStat = gp_runStat = gw_runStat = 0 ;
    14021453  gj.readStat= gj.procStat= gj.writStat= 0 ;
    14031454
    1404 
    14051455  snprintf(str,MXSTR,"Starting EventBuilder");
    14061456  factOut(kInfo,-1, str ) ;
    1407 
    1408 
    1409    evtCtrl.frstPtr = 0 ;
    1410    evtCtrl.lastPtr = 0 ;
    1411 
    1412    actime = g_actTime + 50000000 ;
    1413 /* initialize run control logics */
    1414    for (i=0; i<MAX_RUN; i++) {
    1415       runCtrl[i].runId = 0 ;
    1416       runCtrl[i].fileId = -2 ;
    1417    }
    1418 
    14191457
    14201458//start all threads (more to come) when we are allowed to ....
    14211459  while (g_runStat == 0 ) {
    14221460     xwait.tv_sec = 0;
    1423      xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     1461     xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
    14241462     nanosleep( &xwait , NULL ) ;
    14251463  }
     
    14341472  imax=i ;
    14351473
     1474
    14361475#ifdef BILAND
    1437      xwait.tv_sec = 20;;
    1438      xwait.tv_nsec= 0 ;  // sleep for ~20sec
    1439      nanosleep( &xwait , NULL ) ;
    1440 
    1441      printf("close all runs in 2 seconds\n");
    1442 
    1443      CloseRunFile( 0, time(NULL)+2, 0) ;
    1444 
    1445      xwait.tv_sec = 5;;
    1446      xwait.tv_nsec= 0 ;  // sleep for ~20sec
    1447      nanosleep( &xwait , NULL ) ;
    1448 
    1449      printf("setting g_runstat to -1\n");
    1450 
    1451      g_runStat = -1 ;
     1476         xwait.tv_sec = 20;;
     1477         xwait.tv_nsec= 0 ;  // sleep for ~20sec
     1478         nanosleep( &xwait , NULL ) ;
     1479
     1480         printf("close all runs in 2 seconds\n");
     1481
     1482         CloseRunFile( 0, time(NULL)+2, 0) ;
     1483
     1484         xwait.tv_sec = 5;;
     1485         xwait.tv_nsec= 0 ;  // sleep for ~20sec
     1486         nanosleep( &xwait , NULL ) ;
     1487
     1488         printf("setting g_runstat to -1\n");
     1489
     1490         g_runStat = -1 ;
    14521491#endif
    14531492
Note: See TracChangeset for help on using the changeset viewer.