Changeset 11748 for trunk


Ignore:
Timestamp:
08/02/11 20:32:38 (13 years ago)
Author:
tbretz
Message:
Latest updates.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r11689 r11748  
    11
    22// // // #define EVTDEBUG
    3 
    4 
    53
    64#include <stdlib.h>
     
    1715#include <sys/types.h>
    1816#include <sys/socket.h>
     17#include <netinet/in.h>
     18#include <netinet/tcp.h>
    1919#include <pthread.h>
    2020#include <sched.h>
     
    4848extern void factStatNew(EVT_STAT gi) ;
    4949
    50 extern int  eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
     50extern int  eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) ;
     51
     52extern int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) ;
    5153
    5254extern void debugHead(int i, int j, void *buf);
     
    5860int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
    5961
     62
     63int g_maxProc   ;
     64int g_maxSize   ;
     65int gi_maxSize  ;
     66int gi_maxProc  ;
    6067
    6168uint g_actTime   ;
     
    194201
    195202  int j ;
     203  int optval = 1 ;                       //activate keepalive
     204  socklen_t optlen = sizeof(optval);
    196205
    197206  if (rd->sockStat ==0 ) {   //close socket if open
     
    243252     return -2 ;
    244253  }
     254  optval=1;
     255  if ( setsockopt(rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
     256     snprintf(str,MXSTR,"Could not set keepalive %d | %m",sid);
     257     factOut(kInfo,173, str ) ;  //but continue anyhow
     258  }
     259  optval=10; //start after 10 seconds
     260  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
     261     snprintf(str,MXSTR,"Could not set keepidle %d | %m",sid);
     262     factOut(kInfo,173, str ) ;  //but continue anyhow
     263  }
     264  optval=10; //do every 10 seconds
     265  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
     266     snprintf(str,MXSTR,"Could not set keepintvl %d | %m",sid);
     267     factOut(kInfo,173, str ) ;  //but continue anyhow
     268  }
     269  optval=2;  //close after 2 unsuccessful tries
     270  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
     271     snprintf(str,MXSTR,"Could not set keepalive probes %d | %m",sid);
     272     factOut(kInfo,173, str ) ;  //but continue anyhow
     273  }
     274
     275
    245276
    246277  snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
     
    352383//       count for inconsistencies
    353384
    354          if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors++  ;
    355          if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors+=100 ;
    356          if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors+=10000 ;
     385         if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors[0]++ ;
     386         if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors[1]++ ;
     387         if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors[2]++ ;
    357388
    358389         //everything seems fine so far ==> use this slot ....
     
    428459   headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
    429460
    430    if ( gj.usdMem + needmem + headmem > g_maxMem) {
    431         gj.maxMem = gj.usdMem + needmem + headmem ;
     461   if ( gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
     462        gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize ;
    432463        if (gi_memStat>0 ) {
    433464           gi_memStat = -99 ;
     
    452483        snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
    453484        factOut(kError,882, str ) ;
     485      free(mBuffer[i].FADhead) ;
     486      mBuffer[i].FADhead = NULL ;
     487      return -22;
     488   }
     489
     490   mBuffer[i].buffer  = malloc( gi_maxSize ) ;
     491   if (mBuffer[i].buffer == NULL) {
     492        snprintf(str,MXSTR,"malloc buffer failed for event %d",evID) ;
     493        factOut(kError,882, str ) ;
     494      free(mBuffer[i].FADhead) ;
     495      mBuffer[i].FADhead = NULL ;
    454496      free(mBuffer[i].fEvent) ;
    455497      mBuffer[i].fEvent = NULL ;
    456       return -22;
     498      return -32;
    457499   }
    458500
     
    485527   mBuffer[i].trgTyp  = trgTyp ;
    486528   mBuffer[i].evtLen  = needmem ;
    487    mBuffer[i].Errors  = 0 ;
    488 
    489    gj.usdMem += needmem + headmem;
     529   mBuffer[i].Errors[0] =
     530   mBuffer[i].Errors[1] =
     531   mBuffer[i].Errors[2] =
     532   mBuffer[i].Errors[3] = 0 ;
     533
     534   gj.usdMem += needmem + headmem + gi_maxSize ;
    490535   if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
    491536
     
    535580   mBuffer[i].FADhead = NULL ;
    536581
     582   free(mBuffer[i].buffer ) ;
     583   mBuffer[i].buffer = NULL ;
     584
    537585   headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
    538586   mBuffer[i].evNum   = mBuffer[i].nRoi= -1;
    539587   mBuffer[i].runNum  = 0;
    540588
    541    gj.usdMem = gj.usdMem - freemem - headmem;
     589   gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize ;
    542590   gj.bufTot-- ;
    543591
     
    634682     factOut(kWarn,-1, str ) ;
    635683  }
     684
    636685
    637686  head_len = sizeof(PEVNT_HEADER) ;
     
    9881037                   mBuffer[evID].evNum,roi[0],roi[8]-roi[0],qncpy,qnrun);
    9891038                factOut(kDebug,-1, str ) ;
    990 factOut(kInfo,-1, str ) ;
    9911039
    9921040                //complete event read ---> flag for next processing
     
    10351083                 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
    10361084                 rd[i].ftmTyp = ntohl(rd[i].rBuf->S[5]) ;
     1085                 rd[i].ftmID  = ntohl(rd[i].rBuf->I[3]) ; //(FTMevt)
    10371086                 rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
    1038                  rd[i].ftmID  = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
    10391087                 rd[i].runID  = ntohl(rd[i].rBuf->I[11]) ;
    10401088                 rd[i].bufTyp = 1 ;       //ready to read full record
     
    11071155                gj.evtSkip++;
    11081156             }
    1109           } else if (evtCtrl.evtStat[k0] >= 900     //'delete'
     1157          } else if (evtCtrl.evtStat[k0] >= 9000    //'delete'
    11101158                  || evtCtrl.evtStat[k0] == 0 ) {   //'useless'
    11111159
     
    12871335
    12881336
     1337void *subProc( void *thrid ) {
     1338  int threadID,status,numWait,numProc,kd,k1,k0,k,jret;
     1339  struct timespec xwait ;
     1340
     1341  threadID= (int) thrid;
     1342
     1343  snprintf(str,MXSTR,"Starting sub-process-thread %d",threadID);
     1344  factOut(kInfo,-1, str ) ;
     1345
     1346  while (g_runStat > -2) {   //in case of 'exit' we still must process pending events
     1347     numWait = numProc = 0 ;
     1348     int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     1349     if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     1350
     1351     int k1=evtCtrl.frstPtr;
     1352     for ( k=k1; k<(k1+kd); k++ ) {
     1353        int k0 = k % (MAX_EVT*MAX_RUN) ;
     1354
     1355        if (evtCtrl.evtStat[k0] ==1000+threadID) {
     1356           if ( gi_resetR > 1 ) {    //we are asked to flush buffers asap
     1357              jret= 9100 ;     //flag to be deleted
     1358           } else {
     1359              int id = evtCtrl.evtBuf[k0] ;
     1360              jret=subProcEvt(threadID, mBuffer[id].FADhead, mBuffer[id].fEvent, mBuffer[id].buffer) ;
     1361              if (jret <= threadID) {
     1362                 snprintf(str,MXSTR,"process %d wants to send to process %d",threadID,jret) ;
     1363                 factOut(kError,-1, str ) ;
     1364                 jret = 5300;
     1365              } else if ( jret <=0 )          jret = 9200+threadID ;  //flag as 'to be deleted'
     1366                else if ( jret >=gi_maxProc ) jret = 5200+threadID ;  //flag as 'to be written'
     1367                else                          jret = 1000+jret ;      //flag for next proces
     1368           }
     1369           evtCtrl.evtStat[k0] = jret ;
     1370           numProc++ ;
     1371        } else if (evtCtrl.evtStat[k0] <1000+threadID) numWait++ ;
     1372     }
     1373
     1374     if ( gj.readStat < -10 && numWait == 0) {  //nothing left to do
     1375        snprintf(str,MXSTR,"Exit subProcessing Process %d",threadID);
     1376        factOut(kInfo,-1, str ) ;
     1377        return 0 ;
     1378     }
     1379     if (numProc == 0) {
     1380        //seems we have nothing to do, so sleep a little
     1381        xwait.tv_sec = 0;
     1382        xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     1383        nanosleep( &xwait , NULL ) ;
     1384     }
     1385  }
     1386
     1387  snprintf(str,MXSTR,"Ending sub-process-thread %d",threadID);
     1388  factOut(kInfo,-1, str ) ;
     1389  return ;
     1390} /*-----------------------------------------------------------------*/
     1391
     1392
    12891393void *procEvt( void *ptr ) {
    12901394/* *** main loop processing file, including SW-trigger */
    12911395  int numProc, numWait ;
    1292   int k ;
     1396  int k, status ;
    12931397  struct timespec xwait ;
    12941398  char str[MXSTR] ;
    12951399
     1400
     1401
     1402
    12961403  cpu_set_t mask;
    1297   int cpu = 5 ;   //process thread  (will be several in final version)
    1298 
    1299   snprintf(str,MXSTR,"Starting process-thread");
     1404  int cpu = 1 ;   //process thread  (will be several in final version)
     1405
     1406  snprintf(str,MXSTR,"Starting process-thread with %d subprocess",gi_maxProc);
    13001407  factOut(kInfo,-1, str ) ;
    13011408
     
    13031410   CPU_ZERO( &mask );
    13041411/* CPU_SET sets only the bit corresponding to cpu. */
    1305    CPU_SET( cpu, &mask );
     1412// CPU_SET(  0 , &mask );  leave for system
     1413// CPU_SET(  1 , &mask );  used by write process
     1414   CPU_SET(  2 , &mask );
     1415   CPU_SET(  3 , &mask );
     1416   CPU_SET(  4 , &mask );
     1417   CPU_SET(  5 , &mask );
     1418   CPU_SET(  6 , &mask );
     1419// CPU_SET(  7 , &mask );  used by read process
    13061420/* sched_setaffinity returns 0 in success */
    13071421   if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
     
    13111425
    13121426
     1427  pthread_t thread[100] ;
     1428  int th_ret[100];
     1429
     1430  for (k=0; k < gi_maxProc; k++) {
     1431     th_ret[k] = pthread_create( &thread[k], NULL, subProc,  (void *)k );
     1432  }
     1433
    13131434  while (g_runStat > -2) {   //in case of 'exit' we still must process pending events
    13141435
     
    13211442        int k0 = k % (MAX_EVT*MAX_RUN) ;
    13221443//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
    1323         if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
     1444        if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <1000) {
    13241445
    13251446         if ( gi_resetR > 1 ) {    //we are asked to flush buffers asap
    1326            evtCtrl.evtStat[k0] = 991 ;
     1447           evtCtrl.evtStat[k0] = 9991 ;
    13271448         } else {
    13281449
     
    13331454           int      roi  = mBuffer[id].nRoi ;
    13341455           int      roiTM= mBuffer[id].nRoiTM ;
    1335            int      Errors=mBuffer[id].Errors ;
    13361456//         uint32_t irun = mBuffer[id].runNum ;
    13371457//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
     
    13611481           mBuffer[id].fEvent->TriggerNum = itevt ;
    13621482           mBuffer[id].fEvent->TriggerType = itrg ;
    1363            mBuffer[id].fEvent->Errors = Errors ;
     1483           mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0] ;
     1484           mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1] ;
     1485           mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2] ;
     1486           mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3] ;
    13641487           mBuffer[id].fEvent->SoftTrig = 0 ;
    13651488
     
    13751498           }
    13761499
    1377            int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
     1500           int i=eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead,
     1501               mBuffer[id].fEvent) ;
    13781502//         gj.procEvt++ ;
    13791503           gi.procTot++ ;
     
    13811505           
    13821506           if (i<0) {
    1383               evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
     1507              evtCtrl.evtStat[k0] = 9999 ; //flag event to be skipped
    13841508              gi.procErr++ ;
    13851509           } else {
    1386               evtCtrl.evtStat[k0] = 520 ;
     1510              evtCtrl.evtStat[k0] = 1000 ;
    13871511           }
    13881512         }
     
    14191543  if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
    14201544
     1545  for (k=0; k<gi_maxProc; k++) {
     1546     pthread_join( thread[k], (void **)&status) ;
     1547  }
     1548
    14211549  int k1=evtCtrl.frstPtr;
    14221550  for ( k=k1; k<(k1+kd); k++ ) {
    14231551     int k0 = k % (MAX_EVT*MAX_RUN) ;
    1424      if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
    1425         evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
     1552     if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <1000) {
     1553        evtCtrl.evtStat[k0] = 9800 ; //flag event as 'processed'
    14261554     }
    14271555  }
     
    14791607
    14801608  cpu_set_t mask;
    1481   int cpu = 3 ;   //write thread
     1609  int cpu = 1 ;   //write thread
    14821610
    14831611  snprintf(str,MXSTR,"Starting write-thread");
     
    15051633        int k0 = k % (MAX_EVT*MAX_RUN) ;
    15061634//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
    1507         if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
     1635        if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) {
    15081636
    15091637         if ( gi_resetR > 1 ) {        //we must drain the buffer asap
    1510            evtCtrl.evtStat[k0] = 904 ;
     1638           evtCtrl.evtStat[k0] = 9904 ;
    15111639         } else {
    15121640
     
    15721700                 factOut(kDebug,123,str) ;
    15731701              }
    1574               evtCtrl.evtStat[k0] = 903 ;
     1702              evtCtrl.evtStat[k0] = 9903 ;
    15751703              gi.wrtErr++ ;
    15761704           } else {
     
    15791707                 runCtrl[j].lastTime = g_actTime;
    15801708                 runCtrl[j].actEvt++ ;
    1581                  evtCtrl.evtStat[k0] = 901 ;
     1709                 evtCtrl.evtStat[k0] = 9901 ;
    15821710                 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
    15831711                 factOut(kDebug,504, str ) ;
     
    15861714                 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
    15871715                 factOut(kError,503, str ) ;
    1588                  evtCtrl.evtStat[k0] = 902 ;
     1716                 evtCtrl.evtStat[k0] = 9902 ;
    15891717                 gi.wrtErr++ ;
    15901718              }
     
    16181746         }
    16191747        } else if (evtCtrl.evtStat[k0] > 0
    1620                 && evtCtrl.evtStat[k0] < 900 ) numWait++ ;
     1748                && evtCtrl.evtStat[k0] < 9000 ) numWait++ ;
    16211749     }
    16221750
     
    17241852  }
    17251853
     1854//prepare for subProcesses
     1855  gi_maxSize = g_maxSize ;
     1856  if (gi_maxSize <=0 ) gi_maxSize = 1 ;
     1857
     1858  gi_maxProc = g_maxProc ;
     1859  if (gi_maxProc <=0  || gi_maxProc>90) {
     1860     snprintf(str,MXSTR,"illegal number of processes %d",gi_maxProc ) ;
     1861     factOut(kFatal,301, str ) ;
     1862     gi_maxProc=1;
     1863  }
     1864
    17261865//partially initialize event control logics
    17271866  evtCtrl.frstPtr = 0 ;
     
    17911930  /*-----------------------------------------------------------------*/
    17921931
    1793 
    17941932#ifdef BILAND
     1933
     1934int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
     1935{
     1936  printf("called subproc %d\n",threadID) ;
     1937  return threadID+1 ;
     1938}
     1939
     1940
    17951941
    17961942
     
    18181964
    18191965
    1820 int  eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
     1966
     1967int  eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
    18211968{
    18221969   int i=0;
     
    18882035  g_maxMem = g_maxMem *  200;      //100MBytes
    18892036
     2037  g_maxProc = 20  ;
     2038  g_maxSize = 30000  ;
    18902039
    18912040  g_runStat = 40 ;
Note: See TracChangeset for help on using the changeset viewer.