Changeset 11134


Ignore:
Timestamp:
06/23/11 15:54:33 (13 years ago)
Author:
tbretz
Message:
Latest Update.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r11096 r11134  
    4040extern int  runWrite(FileHandle_t fileHd ,  EVENT    *event, size_t len ) ;
    4141extern int  runClose(FileHandle_t fileHd ,  RUN_TAIL *runth, size_t len ) ;
    42 extern void factOut(int severity, int err, const char* message ) ;
    43 extern void factStat(int severity, int err, const char* message ) ;
     42extern void factOut(int severity, int err, char* message ) ;
     43extern void factStat(int64_t *array , int len ) ;
    4444
    4545extern int  eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
     46
     47
     48extern void debugHead(int i, void *buf);
    4649
    4750extern void debugRead(int isock, int ibyte, int32_t event, int state,
     
    118121  int32_t  bufPos ;          //next byte to read to the buffer next
    119122  int32_t  bufLen ;          //number of bytes left to read
     123  int32_t  skip ;            //number of bytes skipped before start of event
    120124
    121125  int  sockStat   ;      //-1 if socket not yet connected  , 99 if not exist
     
    271275      snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
    272276      factOut(kError, 1, str ) ;
    273       return 99 ;
     277      return -9999 ;
    274278   }
    275279
     
    289293   //event does not yet exist; create
    290294   if (evFree < 0 ) {        //no space available in ctrl
    291         snprintf(str,MXSTR,"no control slot to keep event...") ;
     295        snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
    292296        factOut(kError,881, str ) ;
    293297        return -1 ;
     
    301305
    302306   if ( gi_usedMem + needmem + headmem > g_maxMem) {
    303         snprintf(str,MXSTR,"no memory left to keep event...") ;
     307        snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
    304308        factOut(kError,882, str ) ;
    305         return -1 ;
     309        return -11 ;
    306310   }
    307311
    308312   mBuffer[i].FADhead = malloc( headmem ) ;
    309313   if (mBuffer[i].FADhead == NULL) {
     314        snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
     315        factOut(kError,882, str ) ;
    310316      return -12;
    311317   }
     
    313319   mBuffer[i].fEvent  = malloc( needmem ) ;
    314320   if (mBuffer[i].fEvent  == NULL) {
     321        snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
     322        factOut(kError,882, str ) ;
    315323      free(mBuffer[i].fEvent) ;
    316324      mBuffer[i].fEvent = NULL ;
     
    367375   if (evFree <0 ) {
    368376      snprintf(str,MXSTR,"not able to register the new run %d",runID);
    369       factOut(kError,883, str ) ;
     377      factOut(kFatal,883, str ) ;
    370378   } else {
    371379      runCtrl[evFree].runId = runID ;
     
    416424void *readFAD( void *ptr ) {
    417425/* *** main loop reading FAD data and sorting them to complete events */
    418   int head_len,frst_len,numok,numok2,dest,evID,i,j,k ;
    419   int actBoards = 0;
     426  int head_len,frst_len,numok,numok2,dest,evID,i,k ;
     427  int actBoards = 0, minLen ;
    420428  int32_t jrd ;
    421429  int32_t myRun ;
     
    484492  head_len = sizeof(PEVNT_HEADER) ;
    485493//frst_len = head_len + 36 * 12 ;   //fad_header plus 36*pix_header
    486   frst_len = head_len ;             //fad_header only, so each event must be longer, even for roi=0
    487 
     494  frst_len = head_len ;   //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
     495
     496//minLen   = MIN_LEN  ;   //min #bytes needed to check header
     497  minLen   = head_len ;   //min #bytes needed to check header: full header for debug
    488498
    489499  numok = numok2   = 0 ;
     
    540550    for (i=0; i<MAX_SOCK; i++) {         //check all sockets if something to read
    541551      b = i / 7 ;
     552      if (sockDef[b] > 0) s0=+1 ;
     553      else                s0=-1 ;
    542554
    543555      gettimeofday( tv, NULL);
     
    553565             rd[i].bufLen = frst_len ;     //  max size to read at begining
    554566          } else {
    555              rd[i].bufTyp = -1 ;           //  data to be skipped
     567             rd[i].bufTyp = -1 ;           //  full data to be skipped
    556568             rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
    557569          }
    558570          rd[i].bufPos = 0 ;            //  no byte read so far
     571          rd[i].skip   = 0 ;            //  start empty
    559572          gi_NumConnect[ b ]++ ;
    560573          numok++ ;                     //make sure next round will execute
     
    564577      }
    565578
    566       if (rd[i].sockStat ==0) {     //we have a connection ==> try to read
    567         numok++ ;
    568         sokCnt[i]++;
    569         jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
    570 
    571         if (jrd >0 ) {
    572            qread+=jrd ;
    573            debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
    574         }
    575 
    576         if (jrd == 0) {                 //connection has closed ...
    577            snprintf(str,MXSTR,"Socket %d closed by FAD",i);
    578            factOut(kInfo,441, str ) ;
    579            GenSock(1, i, 0,NULL, &rd[i]) ;
    580            gi_ErrCnt[i]++ ;
    581            gi_NumConnect[ b ]-- ;
    582 
    583         } else if ( jrd<0 ) {           //did not read anything
    584            if (errno != EAGAIN && errno != EWOULDBLOCK ) {
    585               snprintf(str,MXSTR,"Error Reading from %d | %m",i);
    586               factOut(kError,442, str ) ;
    587               gi_ErrCnt[i]++ ;
    588            } else  numok-- ;            //else nothing waiting to be read
    589 
    590         } else if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
     579      if (rd[i].sockStat ==0 ) {     //we have a connection ==> try to read
     580        if (rd[i].bufLen > 0) {      //might be nothing to read [buffer full]
     581          numok++ ;
     582          sokCnt[i]++;
     583          jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
     584
     585          if (jrd >0 ) {
     586             qread+=jrd ;
     587             debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
     588          }
     589
     590          if (jrd == 0) {                 //connection has closed ...
     591             snprintf(str,MXSTR,"Socket %d closed by FAD",i);
     592             factOut(kInfo,441, str ) ;
     593             GenSock(s0, i, 0,NULL, &rd[i]) ;
     594             gi_ErrCnt[i]++ ;
     595             gi_NumConnect[ b ]-- ;
     596
     597          } else if ( jrd<0 ) {           //did not read anything
     598             if (errno != EAGAIN && errno != EWOULDBLOCK ) {
     599                snprintf(str,MXSTR,"Error Reading from %d | %m",i);
     600                factOut(kError,442, str ) ;
     601                gi_ErrCnt[i]++ ;
     602             } else  numok-- ;            //else nothing waiting to be read
     603             jrd = 0 ;
     604          }
     605        } else jrd = 0 ;                //did read nothing as requested
     606
     607        if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
    591608//         just do nothing
    592609
     
    597614             debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
    598615           } else {                     //full dataset read
    599              rd[i].bufLen  = rd[i].bufPos + j ;
     616             rd[i].bufLen  = 0 ;
    600617             rd[i].bufPos  = rd[i].fadLen ;
    601618             if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
     
    616633             evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
    617634
     635             if (evID <-9000) goto EndBuf ;  //illegal event, skip it ...
    618636             if (evID < 0) {
    619                 snprintf(str,MXSTR,"no space left ...%d",evID) ;
    620                 factOut(kError,201, str ) ;
    621                 goto EndBuf ; //--> skip event (and hope it will improve)
     637                xwait.tv_sec = 0;
     638                xwait.tv_nsec= 20000000 ;  // sleep for ~20 msec
     639                nanosleep( &xwait , NULL ) ;
     640                goto EndBuf1 ; //hope there is free space next round
    622641             }
     642
    623643
    624644             //we have a valid entry in mBuffer[]; fill it
     
    704724             rd[i].bufLen = frst_len ;
    705725             rd[i].bufPos = 0 ;         
     726EndBuf1:
     727             ;
    706728           }
    707729
     
    709731           rd[i].bufPos += jrd ;
    710732           rd[i].bufLen -= jrd ;
    711            if ( rd[i].bufPos > MIN_LEN ){ //sufficient data to take action
     733           if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
    712734              //check if startflag correct; else shift block ....
    713735              for (k=0; k<rd[i].bufPos -1 ; k++) {
     
    715737                  && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
    716738              }
    717 
    718 
    719               for (k=0; k<rd[i].bufPos -1 ; k++) {
    720                  if (rd[i].rBuf->B[k  ] == start.B[1]
    721                   && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
    722               }
     739              rd[i].skip += k ;
    723740
    724741              if (k >= rd[i].bufPos-1 ) {   //no start of header found
    725                  snprintf(str,MXSTR,"no start of header on port%d", i ) ;
    726                  factOut(kWarn,666, str ) ;
     742//                 snprintf(str,MXSTR,"no start of header on port%d", i ) ;
     743//                 factOut(kWarn,666, str ) ;
    727744
    728745                 rd[i].bufPos = 0 ;
     
    733750                 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
    734751              }
    735               if ( rd[i].bufPos > MIN_LEN ) {
     752              if ( rd[i].bufPos >= minLen ) {
     753                 if ( rd[i].skip >0 ) {
     754                    snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
     755                    factOut(kInfo,666, str ) ;
     756                    rd[i].skip = 0 ;
     757                 }
    736758                 goodhed++;
    737759                 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
     
    739761                 rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
    740762                 rd[i].runID  = ntohl(rd[i].rBuf->I[11]) ;
    741 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
     763                 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
    742764                 rd[i].bufTyp = 1 ;       //ready to read full record
    743765                 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
    744766                 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ;    //?
     767                 debugHead(i,rd[i].rBuf);
    745768                 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
    746769              } else {
     
    759782    if ( g_actTime > gi_SecTime ) {
    760783         gi_SecTime = g_actTime ;
    761 //       PrintRate() ;
    762784
    763785
     
    807829int ib ;
    808830for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
    809 
    810 snprintf(str,MXSTR,"bfr%5d skp%4d free%4d (tot%7d) mem%9lu rd%10d %3d",qwait,qskip,qdel,qtot,gi_usedMem,qread,qconn);
    811 factStat(kInfo,-1, str ) ;
     831int64_t stat[9] ;
     832        stat[0]= qwait;
     833        stat[1]= qskip;
     834        stat[2]= qdel ;
     835        stat[3]= qtot ;
     836        stat[4]= gi_usedMem ;
     837        stat[5]= qread;
     838        stat[6]= qconn;
     839
     840factStat(stat,7);
    812841qread=0 ;
    813842    }
     
    855884 for (i=0; i<MAX_SOCK; i++) { 
    856885    GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket   
    857     if (gi_NumConnect[ i/7 ]>0)
    858         gi_NumConnect[ i/7 ]-- ;
     886    gi_NumConnect[ i/7 ]-- ;
    859887 }
    860888
     
    9731001              } else {
    9741002                 mBuffer[id].fEvent->BoardTime[ib] =
    975                  ntohl(mBuffer[id].FADhead[ib].time) ;
     1003                   ntohl(mBuffer[id].FADhead[ib].time) ;
    9761004              }
    9771005           }
     
    13641392
    13651393 
    1366 
    13671394/*
     1395
    13681396FileHandle_t  runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
    13691397{ return 1; } ;
     
    13891417
    13901418
    1391 void factStat(int severity, int err, char* message ) {
    1392   printf("%3d %3d : %s\n",severity,err,message) ;
     1419void factStat(int64_t *array, int len ) {
     1420  printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
     1421    array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
    13931422}
    13941423
     
    14011430
    14021431void debugStream(int isock, void *buf, int len) {
     1432}
     1433
     1434void debugHead(int i, void *buf) {
    14031435}
    14041436
     
    14501482//}
    14511483//
    1452 //version for PC-test
     1484//version for PC-test 
    14531485  for (c=0; c<4; c++) {
    14541486     for (b=0; b<10; b++) {
     
    14781510
    14791511}
    1480 */
     1512*/ 
Note: See TracChangeset for help on using the changeset viewer.