Ignore:
Timestamp:
06/09/11 21:15:53 (13 years ago)
Author:
tbretz
Message:
Some new version -- now chnagelog informations available.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r10773 r10963  
    11
    22
     3#define PX8   99  //simulator does not create double-length roi for pixel 8 
     4                  //for real data, set PX8 = 8 ==> ask for double roi=TM
    35
    46
     
    68#include <stdlib.h>
    79#include <stdint.h>
     10#include <unistd.h>
    811#include <stdio.h>
    912#include <time.h>
     
    1922#include <sched.h>
    2023
    21 
    2224#include "EventBuilder.h"
    2325
    24 #define ETHTEST   0
     26enum Severity
     27{
     28        kMessage = 10, ///< Just a message, usually obsolete
     29        kInfo    = 20, ///< An info telling something which can be interesting to know
     30        kWarn    = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
     31        kError   = 40, ///< Error, something unexpected happened, but can still be handled by the program
     32        kFatal   = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
     33        kDebug   = 99, ///< A message used for debugging only
     34};
     35
    2536#define MIN_LEN  32        // min #bytes needed to interpret FADheader
    2637#define MAX_LEN  64*1024   // size of read-buffer per socket
    2738   
     39extern FileHandle_t  runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
     40extern int  runWrite(FileHandle_t fileHd ,  EVENT    *event, size_t len ) ;
     41extern int  runClose(FileHandle_t fileHd ,  RUN_TAIL *runth, size_t len ) ;
     42extern void factOut(int severity, int err, char* message ) ;
     43extern void factStat(int severity, int err, char* message ) ;
     44
    2845int g_actTime   =  0 ;
    2946int g_runStat   = 40 ;
    30 int g_actBoards = 20 ;
    31 
     47int g_actBoards = 40 ;
     48size_t g_maxMem  ;  //maximum memory allowed for buffer
     49
     50int     g_maxBoards ;    //maximum number of boards to be initialized
     51FACT_SOCK g_port[NBOARDS] ;  // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
     52
     53
     54int gi_runStat ;
     55int gp_runStat ;
     56int gw_runStat ;
     57
     58
     59int  gi_maxSocks = 0 ; 
    3260uint gi_SecRate[MAX_SOCK] ;
    3361uint gi_S10Rate[MAX_SOCK] ;
     
    4270uint gi_EvtBad  = 0 ;
    4371uint gi_EvtTot  = 0 ;
     72size_t gi_usedMem = 0 ;
     73
    4474uint gw_EvtTot  = 0 ;
    4575uint gp_EvtTot  = 0 ;
    4676
     77PIX_MAP g_pixMap[NPIX] ;
    4778
    4879EVT_CTRL    evtCtrl ;       //control of events during processing
     
    78109  int32_t  bufLen ;          //number of bytes left to read
    79110
    80   int  sockStat   ;      //-1 if socket not yet connected
     111  int  sockStat   ;      //-1 if socket not yet connected  , 99 if not exist
    81112  int  socket     ;      //contains the sockets
    82113  struct sockaddr_in SockAddr ; //IP for each socket
     
    84115  int  evtID  ;          // event ID of event currently read
    85116  int  runID  ;          // run       "
    86 //int  evtPtr ;          // index into evtCtrl structure
    87117  uint  fadLen ;          // FADlength of event currently read
    88118  int  fadVers ;         // Version of FAD
     
    90120  int  Port ;
    91121
    92 //  int8_t *rBuf;          //local buffer to be used when no event defined yet
    93122  CNV_FACT *rBuf ;
    94123
     
    102131
    103132
    104 
    105 struct timespec xwait ;
    106 
     133#define MXSTR 1000
     134char str[MXSTR] ;
    107135
    108136SHORT_BYTE  start, stop;
     
    119147
    120148
    121 int GenSock(int flag, int crate0, int board0, int port0, READ_STRUCT *rd) {
     149int GenSock(int flag, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
    122150/*
    123151*** generate Address, create sockets and allocates readbuffer for it
     
    126154 */
    127155
    128   int crate, board, port ;
    129   char IPstr[100] ;
    130   struct in_addr IPaddr ;
    131156
    132157  rd->sockStat = -1 ;
    133 
    134 
    135   crate = crate0;
    136   board = board0;
    137   port  = port0 ;
    138158
    139159
     
    141161     close(rd->socket) ;
    142162     if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
    143         error(1,errno,"Could not generate socket\n");
     163        snprintf(str,MXSTR,"Could not generate socket | %m");
     164        factOut(kFatal,771, str ) ;
    144165        return -2 ;
    145166     }
     
    147168  }
    148169
    149 
    150   if (ETHTEST >0) {
    151      port = port0+100*crate0+10*board0 ;
    152      sprintf(IPstr,"10.0.%d.11",128+crate); // test on fact1
    153 //      if (board==3) sprintf(IPstr,"10.0.100.11");
    154 
    155 //     sprintf(IPstr,"10.0.131.11"); // test on fact1
    156      inet_pton(PF_INET, IPstr, &IPaddr) ;
    157      port = port0+100*crate0+10*board0 ;
    158   } else {
    159 
    160 
    161      sprintf(IPstr,"10.0.%d.%d",128+crate,128+board); // real environment
    162      if ( inet_pton(PF_INET, IPstr, &IPaddr) <=0 ) {
    163         error(1,errno,"Error: bad address c=%d b=%d '%s'\n", crate, board, IPstr);
    164         return -1 ;
    165      }
    166   }
    167 
    168170     rd->Port  = port ;
    169      rd->board = crate0*10+board0 ;
    170      rd->SockAddr.sin_family = PF_INET;
     171     rd->SockAddr.sin_family = sockAddr->sin_family;
    171172     rd->SockAddr.sin_port = htons(port) ;
    172      rd->SockAddr.sin_addr = IPaddr ;
     173     rd->SockAddr.sin_addr = sockAddr->sin_addr ;
    173174
    174175     if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
    175         error(1,errno,"Could not generate socket\n");
     176        snprintf(str,MXSTR,"Could not generate socket | %m");
     177        factOut(kFatal,773, str ) ;
    176178        return -2 ;
    177179     } else {
    178180       rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
    179181       if ( rd->rBuf == NULL ) {
    180           error(1,errno,"Could not create local buffer\n");
     182          snprintf(str,MXSTR,"Could not create local buffer");
     183          factOut(kFatal,774, str ) ;
    181184          return -3 ;
    182185       }
    183186     }
    184 
    185187  return 0 ;
    186188
    187189} /*-----------------------------------------------------------------*/
    188190
    189 
    190 int PrintErr() {
    191 
    192   int k,c,b,p,s ;
    193 
    194 
    195        k=0 ;
    196        printf("Errors:\n");
    197        for (c=0; c<4; c++) {
    198           for (b=0; b<10; b++) {
    199              s=0 ;
    200              printf("c%d b%d: ",c,b);
    201              for (p=1; p<8; p++) {
    202                 printf("%7d",gi_ErrCnt[k]);
    203                 s+=gi_ErrCnt[k];
    204              }
    205              printf("%8d\n",s);
    206           }
    207        }
    208 
    209        return 0;
    210 } /*-----------------------------------------------------------------*/
    211 
    212 
    213 int PrintRate() {
    214 
    215   int k,c,b,p,s ;
    216 
    217 
    218     if (g_actTime > gi_SecTime) {
    219        gi_SecTime = g_actTime ;
    220        printf("Nr Ev start %d compl %d bad %d\n",gi_EvtStart,gi_EvtRead,gi_EvtBad) ;
    221 
    222 
    223        k=0 ;
    224        printf("Rate/Second:\n");
    225        for (c=0; c<4; c++) {
    226           for (b=0; b<10; b++) {
    227              s=0 ;
    228              printf("c%d b%d: ",c,b);
    229              for (p=1; p<8; p++) {
    230                 printf("%7d",gi_SecRate[k]);
    231                 s+=            gi_SecRate[k];
    232                 gi_S10Rate[k]+=gi_SecRate[k];
    233                 gi_SecRate[k++]=0 ;
    234              }
    235              printf("%8d\n",s);
    236           }
    237        }
    238        for (b=0; b<NBOARDS; b++)
    239           printf("%d ",gi_NumConnect[b]) ;
    240        printf("\n");
    241     }
    242 
    243 
    244     if ( g_actTime%10 == 0 && g_actTime > gi_S10Time) {
    245        gi_S10Time = g_actTime ;
    246        k=0 ;
    247        printf("Rate/10Second:\n");
    248        for (c=0; c<4; c++) {
    249           for (b=0; b<10; b++) {
    250              s=0 ;
    251              printf("c%d b%d: ",c,b);
    252              for (p=1; p<8; p++) {
    253                 printf("%7d",gi_S10Rate[k]);
    254                 s+=            gi_S10Rate[k];
    255                 gi_MinRate[k]+=gi_S10Rate[k];
    256                 gi_S10Rate[k++]=0 ;
    257              }
    258              printf("%8d\n",s);
    259           }
    260        }
    261     }
    262 
    263 
    264     if ( g_actTime%60 == 0 && g_actTime > gi_MinTime) {
    265        gi_MinTime = g_actTime ;
    266        k=0 ;
    267        printf("Rate/Minute:\n");
    268        for (c=0; c<4; c++) {
    269           for (b=0; b<10; b++) {
    270              printf("c%d b%d: ",c,b);
    271              s=0 ;
    272              for (p=1; p<8; p++) {
    273                 printf("%7d",gi_MinRate[k]);
    274                 s+=          gi_MinRate[k];
    275                 gi_MinRate[k++]=0 ;
    276              }
    277              printf("%8d\n",s);
    278           }
    279        }
    280     }
    281  
    282       return 0;
    283 
    284 } /*-----------------------------------------------------------------*/
     191  /*-----------------------------------------------------------------*/
    285192
    286193
     
    290197// initialize mBuffer (mark all entries as unused\empty)
    291198
    292    int i,j,k ;
     199   int i ;
     200   uint32_t actime ;
     201
     202   actime = g_actTime + 50000000 ;
    293203
    294204   for (i=0; i<MAX_EVT*MAX_RUN; i++) {
     
    297207      evtCtrl.evtBuf[  i] = -1 ;
    298208      evtCtrl.evtStat[ i] = -1 ;
    299       evtCtrl.pcTime[  i] = g_actTime + 50000000 ;  //initiate to far future
    300    }
    301 
    302    for (i=0; i<MAX_RUN; i++) {
    303       runCtrl[i].runId =
    304       runCtrl[i].lastTime =
    305       runCtrl[i].nextEvt = 0 ;
    306       runCtrl[i].fileId = -2 ;
    307 
    308       for (k=0; k<MAX_EVT; k++) runCtrl[i].buffId[k] = 0 ;
    309 
    310       runTail[i].nEventsOk =
    311       runTail[i].nEventsRej =
    312       runTail[i].nEventsBad =
    313       runTail[i].PCtime0 =
    314       runTail[i].PCtimeX = 0 ;
    315    }
    316 
    317 
    318    evtCtrl.readPtr =  0 ;
    319    evtCtrl.writePtr=  0 ;
     209      evtCtrl.pcTime[  i] = actime ;  //initiate to far future
     210
     211   }
     212
     213
     214   actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
     215
     216   evtCtrl.frstPtr = 0 ;
     217   evtCtrl.lastPtr = 0 ;
    320218
    321219   return 0 ;
     
    326224
    327225
    328 int mBufEvt(int evID, int runID, int nRoi) {
     226int mBufEvt(uint evID, uint runID, uint nRoi) {
    329227// generate a new Event into mBuffer:   
    330228// make sure only complete Event are possible, so 'free' will always work
     
    333231
    334232   int i, k, evFree ;
    335 
    336    if (nRoi < 0 || nRoi > 1024) {
    337       printf("illegal nRoi %d\n",nRoi) ;
     233   int headmem=0 ;
     234   size_t needmem = 0 ;
     235
     236   if (nRoi <=0 || nRoi > 1024) {
     237      snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
     238      factOut(kError, 1, str ) ;
    338239      return 99 ;
    339240   }
     
    354255   //event does not yet exist; create
    355256   if (evFree < 0 ) {        //no space available in ctrl
    356         error(0,0, "no space left to keep event...") ;
     257        snprintf(str,MXSTR,"no control slot to keep event...") ;
     258        factOut(kError,881, str ) ;
    357259        return -1 ;
    358260   }
     261   i = evFree ;   //found free entry; use it ...
    359262
    360263   
    361 
    362    i = evFree ;   //found free entry; use it ...
    363 
    364    mBuffer[i].fEvent  = malloc( sizeof(EVENT) ) ;
    365    if (mBuffer[i].fEvent  == NULL) return -12;
    366 
    367 
    368    mBuffer[i].fEvent->StartPix  = malloc( NPIX * sizeof(int16_t) ) ;
    369    if (mBuffer[i].fEvent->StartPix  == NULL) {
     264   needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2 - 2 ; //-2 because of dummy adc_data[1]
     265
     266   headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
     267
     268   if ( gi_usedMem + needmem + headmem > g_maxMem) {
     269        snprintf(str,MXSTR,"no memory left to keep event...") ;
     270        factOut(kError,882, str ) ;
     271        return -1 ;
     272   }
     273
     274   mBuffer[i].FADhead = malloc( headmem ) ;
     275   if (mBuffer[i].FADhead == NULL) {
     276      return -12;
     277   }
     278
     279   mBuffer[i].fEvent  = malloc( needmem ) ;
     280   if (mBuffer[i].fEvent  == NULL) {
    370281      free(mBuffer[i].fEvent) ;
    371282      mBuffer[i].fEvent = NULL ;
    372       mBuffer[i].nRoi = -3 ;
    373       return -13;
    374    }
    375    for (k=0; k<NPIX; k++) mBuffer[i].fEvent->StartPix[k] = -1 ;
    376 
    377    mBuffer[i].fEvent->StartTM   = malloc( NTMARK * sizeof(int16_t) ) ;
    378    if (mBuffer[i].fEvent->StartTM  == NULL) {
    379       free(mBuffer[i].fEvent->StartPix ) ;
    380       free(mBuffer[i].fEvent) ;
    381       mBuffer[i].fEvent = NULL ;
    382       mBuffer[i].nRoi = -4 ;
    383       return -14;
    384    }
    385    for (k=0; k<NTMARK; k++) mBuffer[i].fEvent->StartTM[k] = -1 ;
    386 
    387    mBuffer[i].fEvent->Adc_Data = malloc( NPIX  * nRoi * sizeof(int16_t) ) ;
    388    if (mBuffer[i].fEvent->Adc_Data == NULL) {
    389       free(mBuffer[i].fEvent->StartTM  ) ;
    390       free(mBuffer[i].fEvent->StartPix ) ;
    391       free(mBuffer[i].fEvent) ;
    392       mBuffer[i].fEvent = NULL ;
    393       mBuffer[i].nRoi = -5 ;
    394       return -15;
    395    }
    396 
    397    mBuffer[i].fEvent->Adc_Tmark= malloc( NTMARK* nRoi * sizeof(int16_t) ) ;
    398    if (mBuffer[i].fEvent->Adc_Tmark== NULL) {
    399       free(mBuffer[i].fEvent->Adc_Data ) ;
    400       free(mBuffer[i].fEvent->StartTM  ) ;
    401       free(mBuffer[i].fEvent->StartPix ) ;
    402       free(mBuffer[i].fEvent) ;
    403       mBuffer[i].fEvent = NULL ;
    404       mBuffer[i].nRoi = -6 ;
    405       return -16;
    406    }
    407 
    408    mBuffer[i].fEvent->FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
    409    if (mBuffer[i].fEvent->FADhead== NULL) {
    410       free(mBuffer[i].fEvent->Adc_Tmark ) ;
    411       free(mBuffer[i].fEvent->Adc_Data ) ;
    412       free(mBuffer[i].fEvent->StartTM  ) ;
    413       free(mBuffer[i].fEvent->StartPix ) ;
    414       free(mBuffer[i].fEvent) ;
    415       mBuffer[i].fEvent = NULL ;
    416       mBuffer[i].nRoi = -7 ;
    417       return -17;
    418    }
    419 
     283      return -22;
     284   }
     285
     286   //flag all boards as unused
    420287   mBuffer[i].nBoard = 0 ;
    421288   for (k=0; k<NBOARDS; k++ ) {
    422289      mBuffer[i].board[k] = -1;
     290   }
     291
     292   //flag all pixels as unused
     293   for (k=0; k<NPIX; k++ ) {
     294      mBuffer[i].fEvent->StartPix[k] = -1 ;
     295   }
     296
     297   //flag all TMark as unused
     298   for (k=0; k<NTMARK; k++ ) {
     299      mBuffer[i].fEvent->StartTM[k] = -1 ;
    423300   }
    424301
     
    427304   mBuffer[i].evNum   = evID  ;
    428305   mBuffer[i].runNum  = runID ;
    429 
     306   mBuffer[i].evtLen  = needmem ;
     307
     308   gi_usedMem += needmem + headmem;
    430309
    431310   //register event in 'active list (reading)'
    432311
    433    evtCtrl.evtBuf[  evtCtrl.readPtr] = i ;
    434    evtCtrl.evtStat[ evtCtrl.readPtr] = 0 ;
    435    evtCtrl.pcTime[  evtCtrl.readPtr] = g_actTime ;
    436    evtIdx[i] = evtCtrl.readPtr ;
    437 
    438    evtCtrl.readPtr++ ;
    439    if (evtCtrl.readPtr == MAX_EVT*MAX_RUN ) evtCtrl.readPtr = 0;
     312   evtCtrl.evtBuf[  evtCtrl.lastPtr] = i ;
     313   evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
     314   evtCtrl.pcTime[  evtCtrl.lastPtr] = g_actTime ;
     315   evtIdx[i] = evtCtrl.lastPtr ;
     316snprintf(str,MXSTR,"%5d start new evt  %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
     317factOut(kDebug,-11, str ) ;
     318   evtCtrl.lastPtr++ ;
     319   if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
     320
     321
     322
    440323
    441324   gi_EvtStart++ ;
     
    449332   
    450333   if (evFree <0 ) {
    451       error(0,0, "not able to register the new run %d\n",runID);
     334      snprintf(str,MXSTR,"not able to register the new run %d",runID);
     335      factOut(kError,883, str ) ;
    452336   } else {
    453337      runCtrl[evFree].runId = runID ;
    454338   }
    455 
    456339
    457340   return i ;
     
    464347//(and make sure multiple calls do no harm ....)
    465348
     349   int headmem=0 ;
     350   size_t freemem = 0 ;
     351
    466352   if ( mBuffer[i].nRoi > 0) {      //have an fEvent structure generated ...
    467       free(mBuffer[i].fEvent->Adc_Tmark) ;
    468       free(mBuffer[i].fEvent->Adc_Data ) ;
    469       free(mBuffer[i].fEvent->StartTM  ) ;
    470       free(mBuffer[i].fEvent->StartPix ) ;
     353      freemem = mBuffer[i].evtLen ;
    471354      free(mBuffer[i].fEvent ) ;
    472355      mBuffer[i].fEvent = NULL ;
    473    }
     356
     357      free(mBuffer[i].FADhead ) ;
     358      mBuffer[i].FADhead = NULL ;
     359
     360   }
     361   headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
    474362   mBuffer[i].evNum   = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
    475    
     363
     364   gi_usedMem = gi_usedMem - freemem - headmem;
     365
     366
    476367   return 0 ;
    477368 
     
    483374
    484375
    485 int initReadFAD() {
    486 /* *** initialize reading of FAD data */
    487   int32_t i,j,k ;
    488   int c,b,p ;
    489 
     376void initReadFAD() {
     377return ;
     378} /*-----------------------------------------------------------------*/
     379
     380
     381
     382void *readFAD( void *ptr ) {
     383/* *** main loop reading FAD data and sorting them to complete events */
     384  int head_len,frst_len,numok,numok2,dest,evID,i,j,k ;
     385  int32_t jrd ;
     386  int32_t myRun ;
     387  int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
     388  uint qtot = 0, qread = 0, qconn = 0 ;
     389  int errcnt0 = 0 ;
     390
     391  int goodhed=0;
     392
     393  struct timespec xwait ;
     394
     395  int nokCnt[MAX_SOCK],loopCnt=0;
     396  int sokCnt[MAX_SOCK];
     397
     398
     399  snprintf(str,MXSTR,"start initializing");
     400  factOut(kInfo,-1, str ) ;
     401
     402  int cpu = 7 ;   //read thread
     403  cpu_set_t mask;
     404
     405/* CPU_ZERO initializes all the bits in the mask to zero. */
     406   CPU_ZERO( &mask );
     407/* CPU_SET sets only the bit corresponding to cpu. */
     408   cpu = 7 ;
     409   CPU_SET( cpu, &mask );
     410// cpu = 6 ;
     411// CPU_SET( cpu, &mask );
     412
     413/* sched_setaffinity returns 0 in success */
     414   if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
     415      snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
     416      factOut(kWarn,-1, str ) ;
     417   }
     418
     419
     420  gi_maxSocks = 0 ;
     421
     422  //make sure all sockets are preallocated as 'not exist'
     423  for (i=0; i<MAX_SOCK; i++) {
     424     rd[i].socket   = -1 ;
     425     rd[i].sockStat = 99 ;
     426  }
     427
     428  int b,p,p0 ;
     429  k = 0 ;
     430  for (b=0; b<NBOARDS; b++ ) {
     431     if ( g_port[b].sockDef >=0 ) {
     432        p0=ntohs(g_port[b].sockAddr.sin_port);
     433        for (p=p0+1; p<p0+8; p++) {
     434           j = GenSock(0,p, &g_port[b].sockAddr, &rd[k]) ;
     435           if ( j != 0 ) {
     436              snprintf(str,MXSTR,"problem with Address board %d port %d",b,p);
     437              factOut(kFatal,101, str ) ;
     438           } else {
     439              rd[k].board = b ;
     440              k++ ;
     441              gi_maxSocks++ ;
     442           }
     443        }
     444     }
     445  }
    490446
    491447  g_actTime = time(NULL) ;
    492 
    493   k = 0 ;
    494   for ( c=0; c<4; c++ )
    495      for (b=0; b<10; b++ )
    496         for (p=5001; p<5008; p++) {
    497           j = GenSock(0,c,b,p, &rd[k]) ;
    498           if ( j != 0 ) printf("problem with c%d b%d p%d\n",c,b,p);
    499 //        else          printf("ok socket %d = %d\n",k,rd[k].socket) ;
    500           k++ ;
    501         }
    502 
    503448  for (k=0; k<MAX_SOCK; k++)
    504449     gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
     
    509454
    510455  gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
    511      
    512          return NULL;
    513 
    514 } /*-----------------------------------------------------------------*/
    515 
    516 
    517 
    518 void *readFAD( void *ptr ) {
    519 /* *** main loop reading FAD data and sorting them to complete events */
    520   int head_len,frst_len,numok,numok2,evFree,dest,evID,i,j,k ;
    521   int32_t jrd ;
    522   int8_t FADbyte0, FADbyte1, FADbyteX0, FADbyteX1 ;
    523   int32_t myRun, cleanTime ;
    524   int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
    525   int reqBoards = 40 ;
    526 
    527   int goodevt=0;
    528   int goodhed=0;
    529   int nbuf=0;
    530   int ret ;
    531 
    532   int waitTime = 10 ; //maximum nr of seconds wait for delayed packets
    533 
    534   int nokCnt[MAX_SOCK],loopCnt=0;
    535   int sokCnt[MAX_SOCK];
     456
     457
     458
     459  mBufInit() ;    //initialize buffers
     460
     461  snprintf(str,MXSTR,"end   initializing");
     462  factOut(kInfo,-1, str ) ;
     463
    536464
    537465  for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
    538466
    539 
    540467  head_len = sizeof(PEVNT_HEADER) ;
    541   frst_len = head_len + 36 * 12 ;
    542   if (head_len < MIN_LEN) { printf("headLen ...\n"); exit(99);}
     468  frst_len = head_len + 36 * 12 ;   //fad_header plus 36*pix_header
    543469
    544470  numok = numok2   = 0 ;
     
    548474
    549475  myRun = g_actTime ;
    550   cleanTime = g_actTime ;     //once per second cleanup buffers from too old data
    551 
    552 
    553 
    554 
    555   mBufInit() ;
    556 
    557 
    558   while (g_runStat > 0) {           //loop until global variable g_stop is set
    559 
     476
     477  gi_runStat = g_runStat ;
     478
     479
     480  while (g_runStat >=0) {           //loop until global variable g_stop is set
     481
     482    gi_runStat = g_runStat ;
    560483
    561484    g_actTime = time(NULL) ;
     
    566489    numok = 0 ;                       //count number of succesfull actions
    567490
    568     for (i=0; i<MAX_SOCK; i++) {      //check all sockets if something to read
    569 
     491    for (i=0; i<gi_maxSocks; i++) {      //check all sockets if something to read
    570492      if (rd[i].sockStat <0 ) {         //try to connect if not yet done
    571493        rd[i].sockStat=connect(rd[i].socket,
    572494            (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
    573         if (rd[i].sockStat >=0 ) {      //successfull ==>
     495        if (rd[i].sockStat ==0 ) {      //successfull ==>
    574496          rd[i].bufTyp = 0 ;            //  expect a header
    575497          rd[i].bufLen = frst_len ;     //  max size to read at begining
    576498          rd[i].bufPos = 0 ;            //  no byte read so far
    577  gi_NumConnect[ rd[i].board ]++ ;
    578  printf("+++connect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
     499          gi_NumConnect[ rd[i].board ]++ ;
     500          numok++ ;                     //make sure next round will execute
     501          snprintf(str,MXSTR,"+++connect %d %d",rd[i].board,gi_NumConnect[ rd[i].board ]);
     502          factOut(kInfo,-1, str ) ;
    579503        }
    580504      }
    581505
    582       if (rd[i].sockStat >=0) {     //we have a connection ==> try to read
     506      if (rd[i].sockStat ==0) {     //we have a connection ==> try to read
    583507        numok++ ;
    584508        sokCnt[i]++;
     
    587511        if (jrd == 0) {                 //connection has closed ...
    588512           rd[i].sockStat = -1 ;        //flag (try to reopen next round)
    589            error(0,errno,"Socket %d closed",i);
    590            j = GenSock(1,0,0,0, &rd[i]) ;
     513           snprintf(str,MXSTR,"Socket %d closed by FAD",i);
     514           factOut(kInfo,441, str ) ;
     515           j = GenSock(1,0,NULL, &rd[i]) ;
    591516           gi_ErrCnt[i]++ ;
    592  gi_NumConnect[ rd[i].board ]-- ;
    593  printf("disconnect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
     517           gi_NumConnect[ rd[i].board ]-- ;
    594518        } else if ( jrd<0 ) {           //did not read anything
    595519           if (errno != EAGAIN && errno != EWOULDBLOCK ) {
    596               error(1,errno,"Error Reading from %d",i);
     520              snprintf(str,MXSTR,"Error Reading from %d | %m",i);
     521              factOut(kError,442, str ) ;
    597522              gi_ErrCnt[i]++ ;
    598523           } else  numok-- ;            //else nothing waiting to be read
    599524
    600525        } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
    601 //printf("received data %d %d\n", i,jrd);
    602 
    603             if ( jrd < rd[i].bufLen ) {    //not yet all read
     526           qread+=jrd ;
     527           if ( jrd < rd[i].bufLen ) {    //not yet all read
    604528             rd[i].bufPos += jrd ;        //==> prepare for continuation
    605529             rd[i].bufLen -= jrd ;
     
    610534               && rd[i].rBuf->B[ rd[i].bufPos  ] != stop.B[1]) {
    611535                gi_ErrCnt[i]++ ;
    612                 printf( "wrong end of buffer found %d\n",rd[i].bufPos);
    613  exit(1) ;
     536                snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
     537                factOut(kError,301, str ) ;
    614538                goto EndBuf ;
    615539
     
    624548
    625549             if (evID < 0) {
    626                 printf("no space left ...%d\n",evID) ;
    627  exit(2) ;
    628                 goto EndBuf ;
     550                snprintf(str,MXSTR,"no space left ...%d",evID) ;
     551                factOut(kError,201, str ) ;
     552                goto EndBuf ; //--> skip event (and hope it will improve)
    629553             }
    630554
     
    632556
    633557             boardId = rd[i].board  ;
    634              if ( mBuffer[evID].board[ boardId ] != -1) {   //this board already stored ...
    635                 printf( "board of this event already stored ...") ;
    636              } else {
    637 
    638                 int iDx = evtIdx[evID] ;   //index into evtCtrl
    639 
    640                 memcpy( &mBuffer[evID].fEvent->FADhead[boardId].start_package_flag,
     558             int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
     559             int fadCrate = fadBoard/256 ;
     560             if (boardId != (fadCrate*10 + fadBoard%256) ) {
     561                snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
     562                if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ;  //print only few times
     563//           } else {
     564//              snprintf(str,MXSTR,"correct Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
     565//              if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ;  //print only few times
     566             }
     567             if ( mBuffer[evID].board[ boardId ] != -1) {   
     568                snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
     569                factOut(kWarn,501, str ) ;
     570                goto EndBuf ; //--> skip Board
     571             }
     572
     573             int iDx = evtIdx[evID] ;   //index into evtCtrl
     574
     575             memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
    641576                        &rd[i].rBuf->S[0], head_len) ;
    642                 mBuffer[evID].board[ boardId ] = boardId ;
    643                 roi  = mBuffer[evID].nRoi ;
    644 
    645                 pixS = boardId*36 -1 ;   //
    646                 tmS  = boardId*4  -1 ;   //
    647                 src  = head_len/2 ;
    648                 for ( drs=0; drs<4; drs++ ) {
    649                    for ( px=0; px<9; px++ ) {
    650                       pixH= ntohs(rd[i].rBuf->S[src++]) ;
    651                       pixC= ntohs(rd[i].rBuf->S[src++]) ;
    652                       pixR= ntohs(rd[i].rBuf->S[src++]) ;
    653  
    654                       src++  ;
    655                       pixS++ ; //pixS = pixH2S[pixH] ;
    656                       if (pixR != roi ) {
    657                          if (px == 8 && pixR == 2*roi ) {
    658                          } else {
    659                             printf("wrong roi %d %d %d %d\n",px,pixR,roi,src-2);
    660 //exit(66);
    661                          }
    662 //                         goto EndBuf ;
    663                       }
    664 
     577             roi  = mBuffer[evID].nRoi ;
     578
     579             pixS = boardId*36 -1 ;   //
     580             tmS  = boardId*4  -1 ;   //
     581             src  = head_len/2 ;
     582             for ( drs=0; drs<4; drs++ ) {
     583                for ( px=0; px<9; px++ ) {
     584                   pixH= ntohs(rd[i].rBuf->S[src++]) ;
     585                   pixC= ntohs(rd[i].rBuf->S[src++]) ;
     586                   pixR= ntohs(rd[i].rBuf->S[src++]) ;
     587
     588                   src++  ;
     589                   pixS++ ; //pixS = pixH2S[pixH] ;
     590                   if ( ( px < PX8 && pixR == roi )
     591                     || ( px ==PX8 && pixR == 2*roi )
     592                     || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
     593                       // correct roi
    665594                      mBuffer[evID].fEvent->StartPix[pixS] =pixC;
    666595                      dest= pixS * roi ;
    667596                      memcpy(
    668                             &mBuffer[evID].fEvent->Adc_Data[dest],
    669                             &rd[i].rBuf->S[src],  roi * 2) ;
     597                           &mBuffer[evID].fEvent->Adc_Data[dest],
     598                           &rd[i].rBuf->S[src],  roi * 2) ;
    670599                      src+= roi ;
    671 
    672         //            if (px==8 && roi < 512 ) {
    673         //               tmS++ ;
    674         //               dest= tmS * roi ;
    675         //               mBuffer[evID].fEvent->StartTM[pixS] =pixC+roi;
    676         //               memcpy(
    677         //                     &mBuffer[evID].fEvent->Adc_Tmark[dest],
    678         //                     &rd[i].rBuf.S[src],  roi * 2) ;
    679 // ?? not in the simulator ...                      src+= roi ;
    680         //            }
     600                      if ( px==PX8 ) {
     601                         tmS++; // tmS = tmH2S[pixH]
     602                         dest= tmS * roi + NPIX* roi ;
     603                         if ( roi <=512 ) {
     604                            mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
     605                            memcpy(
     606                              &mBuffer[evID].fEvent->Adc_Data[dest],
     607                              &rd[i].rBuf->S[src],  roi * 2) ;
     608                            src+=roi ;
     609                         } else {
     610                             mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
     611                         }
     612                      }
     613                   } else {
     614                      snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
     615                      factOut(kError,202, str ) ;
     616                      goto EndBuf ;
    681617                   }
    682618                }
    683                 evtCtrl.evtStat[ iDx ]++ ;
    684                 evtCtrl.pcTime[ iDx ] = g_actTime ;
    685 
    686                 if (++mBuffer[evID].nBoard == 19 ) {
    687                    //complete event read ---> flag for next processing
    688                    evtCtrl.evtStat[ iDx ] = 99;
    689                    gi_EvtRead++ ;
    690                    gi_EvtTot++ ;   
    691  printf("complete event --------------------------------------------------\n");
    692                 }
    693619             }// now we have stored a new board contents into Event structure
     620             mBuffer[evID].board[ boardId ] = boardId ;
     621             evtCtrl.evtStat[ iDx ]++ ;
     622             evtCtrl.pcTime[ iDx ] = g_actTime ;
     623
     624             if (++mBuffer[evID].nBoard == g_actBoards ) {
     625                snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
     626                factOut(kDebug,-1, str ) ;
     627                //complete event read ---> flag for next processing
     628                evtCtrl.evtStat[ iDx ] = 99;
     629                gi_EvtRead++ ;
     630                gi_EvtTot++ ;   
     631             }
    694632
    695633EndBuf:
     
    700638
    701639        } else {                        //we are reading event header
     640           qread+=jrd ;
    702641           rd[i].bufPos += jrd ;
    703642           rd[i].bufLen -= jrd ;
     
    708647                  && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
    709648              }
     649
     650
     651              for (k=0; k<rd[i].bufPos -1 ; k++) {
     652                 if (rd[i].rBuf->B[k  ] == start.B[1]
     653                  && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
     654              }
     655
    710656              if (k >= rd[i].bufPos-1 ) {   //no start of header found
    711  printf("no start of header found !!!!\n");
     657                 snprintf(str,MXSTR,"no start of header on port%d", i ) ;
     658                 factOut(kWarn,666, str ) ;
     659
    712660                 rd[i].bufPos = 0 ;
    713661                 rd[i].bufLen = head_len ;
     
    719667              if ( rd[i].bufPos > MIN_LEN ) {
    720668                 goodhed++;
    721                  rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; ///???
     669                 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
    722670                 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
    723                  rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ;
     671                 rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
    724672                 rd[i].runID  = ntohl(rd[i].rBuf->I[11]) ;
    725  printf("received event %d %d\n",rd[i].evtID,i);
    726                     if (rd[i].runID ==0 ) rd[i].runID = myRun ;
     673if (rd[i].runID ==0 ) rd[i].runID = myRun ;
    727674                 rd[i].bufTyp = 1 ;       //ready to read full record
    728675                 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
     
    734681    } //finished trying to read all sockets
    735682
    736 
     683int qwait=0, qdel=0, qskip=0 ;
    737684    g_actTime = time(NULL) ;
    738685    if ( g_actTime > gi_SecTime ) {
     686         gi_SecTime = g_actTime ;
    739687//       PrintRate() ;
    740688
     689
    741690       //loop over all active events and flag those older than read-timeout
    742 
    743        int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
     691       //delete those that are written to disk ....
     692
     693       int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
    744694       if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
    745        for ( k=evtCtrl.writePtr; k<(evtCtrl.writePtr+kd); k++ ) {
     695
     696       int k1=evtCtrl.frstPtr;
     697       for ( k=k1; k<(k1+kd); k++ ) {
    746698          int k0 = k % (MAX_EVT*MAX_RUN) ;
    747 
     699//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
    748700          if (evtCtrl.evtStat[k0] > 0
    749            && evtCtrl.evtStat[k0] < 90
    750            && evtCtrl.pcTime[k0] < g_actTime-10 ) {
    751              evtCtrl.evtStat[k0] = 91 ;
     701           && evtCtrl.evtStat[k0] < 90 ) {
     702
     703           qwait++;
     704
     705           if( evtCtrl.pcTime[k0] < g_actTime-10 ) {
     706             int id =evtCtrl.evtBuf[k0] ;
     707             snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
     708             factOut(kWarn,601, str ) ;
     709             evtCtrl.evtStat[k0] = 91 ;      //timeout for incomplete events
    752710             gi_EvtBad++ ;
    753711             gi_EvtTot++ ;   
     712             qskip++;
     713           }
     714
     715
     716          } else if (evtCtrl.evtStat[k0] >= 900 ) {
     717
     718              int id =evtCtrl.evtBuf[k0] ;
     719              snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
     720              factOut(kDebug,-1, str ) ;
     721              mBufFree(id) ;               //event written--> free memory
     722              evtCtrl.evtStat[k0] = -1;
     723              qdel++;
     724              qtot++;
     725          }
     726
     727          if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
     728             evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
    754729          }
    755730       }
     731
     732qconn=0 ;
     733int ib ;
     734for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
     735
     736snprintf(str,MXSTR,"bfr%5d skp%4d free%4d (tot%7d) mem%9lu rd%10d %3d",qwait,qskip,qdel,qtot,gi_usedMem,qread,qconn);
     737factStat(kInfo,-1, str ) ;
     738qread=0 ;
    756739    }
     740
     741
    757742
    758743
     
    765750          xwait.tv_sec = 0;
    766751          xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
    767 //          xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
    768752       }
    769 //       printf("sleeping ...\n");
    770753       nanosleep( &xwait , NULL ) ;
    771754    }
    772755
    773 
    774 
    775 
    776756 } //and do next loop over all sockets ...
    777  
    778       return NULL;
     757
     758 //must quit eventbuilding
     759 snprintf(str,MXSTR,"stop reading ...");
     760 factOut(kInfo,-1, str ) ;
     761
     762 //flag all events as 'read finished'
     763 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     764 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     765
     766 int k1=evtCtrl.frstPtr;
     767
     768 for ( k=k1; k<(k1+kd); k++ ) {
     769    int k0 = k % (MAX_EVT*MAX_RUN) ;
     770    if (evtCtrl.evtStat[k0] > 0
     771     && evtCtrl.evtStat[k0] < 90 ) {
     772       evtCtrl.evtStat[k0] = 91 ;   
     773       gi_EvtBad++ ;
     774       gi_EvtTot++ ;   
     775    }
     776 }
     777
     778 //must close all open sockets ...
     779 snprintf(str,MXSTR,"close all sockets ...");
     780 factOut(kInfo,-1, str ) ;
     781 for (i=0; i<MAX_SOCK; i++)   
     782    if (rd[i].sockStat ==0 ) { 
     783       j=close(rd[i].socket) ;
     784       if (j>0) {
     785          snprintf(str,MXSTR,"Error closing socket %d | %m",i);
     786          factOut(kFatal,771, str ) ;
     787       }
     788       rd[i].sockStat = -1 ;        //flag (try to reopen next round)
     789       gi_NumConnect[ rd[i].board ]-- ;
     790    }
     791
     792 xwait.tv_sec = 0;
     793 xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     794 nanosleep( &xwait , NULL ) ;
     795 gi_runStat = -11 ;  //inform all that no update to happen any more
     796
     797
     798 int minclear = 900 ; //usually wait until writing finished (stat 900)
     799 if (g_runStat <-1 ) minclear = 0 ;  //in case of abort clear all
     800
     801
     802 //and clear all buffers (might have to wait until all others are done)
     803 snprintf(str,MXSTR,"clear all buffers ...");
     804 factOut(kInfo,-1, str ) ;
     805 int numclear=1 ;
     806 while (numclear > 0 ) {
     807    numclear = 0 ;
     808    int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     809    if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     810
     811    int k1=evtCtrl.frstPtr;
     812    for ( k=k1; k<(k1+kd); k++ ) {
     813       int k0 = k % (MAX_EVT*MAX_RUN) ;
     814       if (evtCtrl.evtStat[k0] > minclear ) {
     815         int id =evtCtrl.evtBuf[k0] ;
     816          mBufFree(id) ;               //event written--> free memory
     817          evtCtrl.evtStat[k0] = -1;
     818       } else if (evtCtrl.evtStat[k0] > 0) numclear++ ;  //writing is still ongoing...
     819
     820       if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
     821          evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
     822    }
     823
     824    xwait.tv_sec = 0;
     825    xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     826    nanosleep( &xwait , NULL ) ;
     827 }
     828
     829 snprintf(str,MXSTR,"Exit read Process ...");
     830 factOut(kInfo,-1, str ) ;
     831 gi_runStat = -99 ;
     832 return 0;
     833
    779834} /*-----------------------------------------------------------------*/
    780835
     
    782837void *procEvt( void *ptr ) {
    783838/* *** main loop processing file, including SW-trigger */
    784   int numProc ;
    785   int k,k1,k2,kd ;
    786 
    787   while (g_runStat > 0) {
    788 
    789 
    790      kd = evtCtrl.readPtr - evtCtrl.writePtr ;
     839  int numProc, numWait ;
     840  int k ;
     841  struct timespec xwait ;
     842  char str[MXSTR] ;
     843
     844  cpu_set_t mask;
     845  int cpu = 5 ;   //process thread  (will be several in final version)
     846
     847  snprintf(str,MXSTR,"Starting process-thread");
     848  factOut(kInfo,-1, str ) ;
     849
     850/* CPU_ZERO initializes all the bits in the mask to zero. */
     851   CPU_ZERO( &mask );
     852/* CPU_SET sets only the bit corresponding to cpu. */
     853   CPU_SET( cpu, &mask );
     854/* sched_setaffinity returns 0 in success */
     855   if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
     856      snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
     857      factOut(kWarn,-1, str ) ;
     858   }
     859
     860
     861  while (g_runStat > -2) {   //in case of 'exit' we still must process pending events
     862
     863     numWait = numProc = 0 ;
     864     int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
    791865     if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
    792      k1=evtCtrl.writePtr;
    793      k2=evtCtrl.writePtr+kd;
    794 
    795      numProc = 0 ;
    796      if (gp_EvtTot < gi_EvtTot) {
    797         for ( k=k1; k<k2; k++ ) {
    798            int k0 = k % (MAX_EVT*MAX_RUN) ;
    799 
    800            if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
    801               //ready to be processed ...
    802               int      id   = evtCtrl.evtBuf[k0] ;
    803               uint32_t irun = mBuffer[id].runNum ;
    804               int      ievt = mBuffer[id].evNum ;
    805 printf("processing %d %d %d %d\n",ievt,k,evtCtrl.evtStat[k0],evtCtrl.writePtr) ;
    806               numProc++ ;
    807               evtCtrl.evtStat[k0] = 501 ;
    808               gp_EvtTot++ ;
     866
     867     int k1=evtCtrl.frstPtr;
     868     for ( k=k1; k<(k1+kd); k++ ) {
     869        int k0 = k % (MAX_EVT*MAX_RUN) ;
     870//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
     871        if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
     872           int      id   = evtCtrl.evtBuf[k0] ;
     873           uint32_t irun = mBuffer[id].runNum ;
     874           int      ievt = mBuffer[id].evNum ;
     875           int      roi  = mBuffer[id].nRoi ;
     876//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
     877//factOut(kDebug,-1, str ) ;
     878
     879//make sure unused pixels/tmarks are cleared to zero
     880           int ip,it,dest,ib;
     881           for (ip=0; ip<NPIX; ip++) {
     882             if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
     883                dest= ip*roi ;
     884                bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
     885             }
    809886           }
     887           for (it=0; it<NTMARK; it++) {
     888             if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
     889                dest= it*roi + NPIX*roi ;
     890                bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
     891             }
     892           }
     893//and set correct event header ; also check for consistency in event
     894          mBuffer[id].fEvent->Roi = roi ;
     895          mBuffer[id].fEvent->EventNum = ievt ;
     896          mBuffer[id].fEvent->TriggerType = 0 ; // TBD
     897          mBuffer[id].fEvent->SoftTrig = 0 ;
     898          for (ib=0; ib<NBOARDS; ib++) {
     899             mBuffer[id].fEvent->BoardTime[ib] = 123 ;
     900
     901
     902
     903
     904           }
     905           numProc++ ;
     906           evtCtrl.evtStat[k0] = 520 ;
     907           gp_EvtTot++ ;
     908        } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
     909           numWait++ ;
    810910        }
    811911     }
     912
     913     if ( gi_runStat < -10 && numWait == 0) {  //nothing left to do
     914        snprintf(str,MXSTR,"Exit Processing Process ...");
     915        factOut(kInfo,-1, str ) ;
     916        gp_runStat = -22 ;                     //==> we should exit
     917        return 0 ;
     918     }
     919
    812920     if (numProc == 0) {
    813921        //seems we have nothing to do, so sleep a little
    814922        xwait.tv_sec = 0;
    815         xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
     923        xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
    816924        nanosleep( &xwait , NULL ) ;
    817925     }
     926     gp_runStat = gi_runStat ;
    818927
    819928  }
    820   return NULL;
     929
     930  //we are asked to abort asap ==> must flag all remaining events
     931  //   when gi_runStat claims that all events are in the buffer...
     932
     933  snprintf(str,MXSTR,"Abort Processing Process ...");
     934  factOut(kInfo,-1, str ) ;
     935  int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     936  if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     937
     938  int k1=evtCtrl.frstPtr;
     939  for ( k=k1; k<(k1+kd); k++ ) {
     940     int k0 = k % (MAX_EVT*MAX_RUN) ;
     941     if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
     942        evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
     943     }
     944  }
     945
     946  gp_runStat = -99 ;
     947
     948  return 0;
    821949 
    822950} /*-----------------------------------------------------------------*/
    823951
     952int CloseRunFile(uint32_t runId, uint32_t closeTime) {
     953/* close run runId (all all runs if runId=0) */
     954/* return: 0=close scheduled / >0 already closed / <0 does not exist */
     955  int j ;
     956
     957  if (runId == 0 ) {
     958     for ( j=0; j<MAX_RUN; j++) {
     959        if ( runCtrl[j].fileId == 0 ) {  //run is open
     960           runCtrl[j].closeTime = closeTime ;
     961        }
     962     }
     963     return 0 ;
     964  }
     965
     966
     967  for ( j=0; j<MAX_RUN; j++) {
     968     if ( runCtrl[j].runId == runId ) {
     969        if ( runCtrl[j].fileId == 0 ) {  //run is open
     970           runCtrl[j].closeTime = closeTime ;
     971           return 0;
     972        } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
     973           runCtrl[j].closeTime = closeTime ;
     974           return 0;
     975        } else {     // run already closed
     976           return +1;
     977        }
     978     }
     979  }   //we only reach here if the run was never created
     980  return -1;
     981
     982} /*-----------------------------------------------------------------*/
     983
    824984
    825985void *writeEvt( void *ptr ) {
    826986/* *** main loop writing event (including opening and closing run-files */
    827987
    828   int  numWrite = 0 ;
    829   int j,id,irun,ievt ;
    830 
    831   while (g_runStat > 0) {           //loop until global variable g_stop is set
    832 
    833     //loop over buffered events and check if something to write ...
    834 
    835     if ( gp_EvtTot == gw_EvtTot ) {
    836        //there is for sure nothing to do --> sleep a little
    837        xwait.tv_sec = 0;
    838        xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
    839        nanosleep( &xwait , NULL ) ;
    840 
    841     } else {  //go through evtCtrl list to check if there might be something
    842 
    843        //if run-file not yet opened==> open runfile (better to store headers in own structure ?)
    844 
    845        //if eventid == next event for run ==> write it (or flag it)
    846        //if eventid > next event exists, and nothing new for >time out ==> write it
    847        //if nothing for this run for >timeout ==> close run
    848 
    849        int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
    850        if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
    851        int k,k1,k2 ;
    852 
    853 
    854        k1=evtCtrl.writePtr;
    855        k2=evtCtrl.writePtr+kd;
    856 
    857        int evtTot=gw_EvtTot ;
    858        for ( k=k1; k<k2; k++ ) {
    859           int k0 = k % (MAX_EVT*MAX_RUN) ;
    860 
    861           if (evtCtrl.evtStat[k0] > 500 ) { //ready to be written ...
    862              id   = evtCtrl.evtBuf[k0] ;
    863              irun = mBuffer[id].runNum ;
    864              ievt = mBuffer[id].evNum ;
    865 
    866              for ( j=0; j<MAX_RUN; j++) {
    867                 if ( runCtrl[j].runId == irun ) break ;
    868              }
    869              if ( j >= MAX_RUN ) {
    870  printf("error: can not find run %d\n", irun);
    871  exit(111);
    872              }
    873 
    874              if (runCtrl[j].fileId < 0 ) {
    875  printf("open new run_file %d\n",irun) ;
    876                 runCtrl[j].fileId = 999  ; // should be a function call
    877                 runCtrl[j].nextEvt= 0;
    878                 runCtrl[j].lastTime=g_actTime ;
    879              }
    880 
    881              if (runCtrl[j].nextEvt == ievt ) {  //write this event
    882  printf("write event %d (run %d %d)\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
    883                 runCtrl[j].nextEvt= ievt+1;
    884                 runCtrl[j].lastTime=g_actTime ;
    885                 evtCtrl.evtStat[k0]= -1 ;
    886                 gw_EvtTot++ ;
    887                 numWrite++ ;
    888 //                evtCtrl.writePtr=k+1;
    889              } else if ( ievt < runCtrl[j].nextEvt ) {
    890  printf("delayed event (run %d %d %d) skipped\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
    891                 evtCtrl.evtStat[k0]= -1 ;
    892 //                evtCtrl.writePtr=k+1;
    893                 gw_EvtTot++ ;
    894                 numWrite++ ;
    895              }
    896           }
    897 
    898 
    899 
    900           if ( runCtrl[j].lastTime < g_actTime-15) {
    901  printf("non existing event skip %d (run %d -> %d)\n",runCtrl[j].nextEvt,irun,ievt) ;
    902              runCtrl[j].nextEvt++;
    903              numWrite++;
    904           }
    905 
    906           for ( j=0; j<MAX_RUN; j++) {
    907              if ( runCtrl[j].runId >0 && runCtrl[j].lastTime < g_actTime-120) {
    908  printf("close run %d (timeout)\n",irun) ;
    909                 runCtrl[j].fileId = -2 ;
    910                 runCtrl[j].runId  =  0 ;
    911              }
    912           }
    913           if (numWrite == 0 ) {
    914              //nothing to do at the moment ==> sleep a little
    915              xwait.tv_sec = 0;
    916              xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
    917              nanosleep( &xwait , NULL ) ;
    918           }
    919 
    920        }
    921      }
    922 
    923 
    924 
    925 
    926 
    927 
    928 
    929 
    930 
    931 
    932 
    933 
    934   return NULL;
     988  int  numWrite, numWait ;
     989  int k,j ;
     990  struct timespec xwait ;
     991  char str[MXSTR] ;
     992
     993  cpu_set_t mask;
     994  int cpu = 3 ;   //write thread
     995
     996  snprintf(str,MXSTR,"Starting write-thread");
     997  factOut(kInfo,-1, str ) ;
     998
     999/* CPU_ZERO initializes all the bits in the mask to zero. */
     1000   CPU_ZERO( &mask );
     1001/* CPU_SET sets only the bit corresponding to cpu. */
     1002   CPU_SET( cpu, &mask );
     1003/* sched_setaffinity returns 0 in success */
     1004   if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
     1005      snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
     1006   }
     1007
     1008  int lastRun = 0 ; //usually run from last event still valid
     1009
     1010  while (g_runStat >-2) {
     1011
     1012     numWait = numWrite = 0 ;
     1013     int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
     1014     if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
     1015
     1016     int k1=evtCtrl.frstPtr;
     1017     for ( k=k1; k<(k1+kd); k++ ) {
     1018        int k0 = k % (MAX_EVT*MAX_RUN) ;
     1019//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
     1020        if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
     1021           int      id   = evtCtrl.evtBuf[k0] ;
     1022           uint32_t irun = mBuffer[id].runNum ;
     1023           int      ievt = mBuffer[id].evNum ;
     1024
     1025           if (runCtrl[lastRun].runId == irun ) {
     1026              j = lastRun ;
     1027           } else {
     1028              //check which fileID to use (or open if needed)
     1029              for ( j=0; j<MAX_RUN; j++) {
     1030                 if ( runCtrl[j].runId == irun ) break ;
     1031              }
     1032              if ( j >= MAX_RUN ) {
     1033                 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
     1034                 factOut(kFatal,901, str ) ;
     1035for ( j=0; j<MAX_RUN; j++) printf("j %d   run.j %d   run %d\n",j,runCtrl[j].runId,irun );
     1036exit(111);
     1037              }
     1038              lastRun = j ;
     1039           }
     1040
     1041           if (runCtrl[j].fileId < 0 ) {
     1042              actRun.Version =  1 ;
     1043              actRun.RunType = -1 ;
     1044              actRun.NBoard  = NBOARDS ;
     1045              actRun.NPix    = NPIX ;
     1046              actRun.NTm     = NTMARK ;
     1047              actRun.Nroi    = mBuffer[id].nRoi ;
     1048//            actRun.FADhead = mBuffer[id].FADhead ;  //to be corrected
     1049              runCtrl[j].nextEvt= 0;
     1050              runCtrl[j].lastTime=g_actTime ;
     1051              runCtrl[j].fileHd = runOpen(irun,  &actRun, sizeof(actRun) ) ;
     1052              if (runCtrl[j].fileHd == NULL ) {
     1053                 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
     1054                 factOut(kError,502, str ) ;
     1055                 runCtrl[j].fileId = 99 ;
     1056              } else {
     1057                 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
     1058                 factOut(kInfo,-1, str ) ;
     1059                 runCtrl[j].fileId = 0 ;
     1060              }
     1061
     1062           }
     1063
     1064           if (runCtrl[j].fileId > 0 ) {
     1065              snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
     1066              factOut(kDebug,123,str) ;
     1067              evtCtrl.evtStat[k0] = 902 ;
     1068           } else {
     1069              int i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
     1070              if (i<0) {
     1071                 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
     1072                 factOut(kError,503, str ) ;
     1073                 evtCtrl.evtStat[k0] = 901 ;
     1074                 //close run
     1075                 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
     1076                 if (i<0) {
     1077                    snprintf(str,MXSTR,"W error closing run %d",irun) ;
     1078                    factOut(kError,503, str ) ;
     1079                 } else {
     1080                    snprintf(str,MXSTR,"W closed run %d because of write error",irun) ;
     1081                    factOut(kInfo,503, str ) ;
     1082                 }
     1083                 runCtrl[j].fileId = 9999 ;
     1084              } else {
     1085                 runCtrl[j].lastTime = g_actTime;
     1086                 evtCtrl.evtStat[k0] = 901 ;
     1087                 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
     1088                 factOut(kDebug,504, str ) ;
     1089              }
     1090           }
     1091        } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
     1092     }
     1093
     1094     //check if we should close a run ...
     1095     for ( j=0; j<MAX_RUN; j++) {
     1096        if ( runCtrl[j].fileId==0   
     1097          && (  runCtrl[j].closeTime < g_actTime 
     1098              ||runCtrl[j].lastTime  < g_actTime-120) ) {
     1099           int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
     1100           if (i<0) {
     1101              snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
     1102              factOut(kError,506, str ) ;
     1103              runCtrl[j].fileId = 888 ;
     1104           } else {
     1105              snprintf(str,MXSTR,"closing run %d ok  BBB",runCtrl[j].runId);
     1106              factOut(kInfo,507, str ) ;
     1107              runCtrl[j].fileId = 7777 ;
     1108           }
     1109        }
     1110     }
     1111
     1112     if (numWrite == 0) {
     1113        //seems we have nothing to do, so sleep a little
     1114        xwait.tv_sec = 0;
     1115        xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     1116        nanosleep( &xwait , NULL ) ;
     1117     }
     1118
     1119     if ( gi_runStat < -10 && numWait == 0) {  //nothing left to do
     1120        snprintf(str,MXSTR,"Finish Write Process ...");
     1121        factOut(kInfo,-1, str ) ;
     1122        gw_runStat = -22 ;                     //==> we should exit
     1123        goto closerun ;
     1124     }
     1125     gw_runStat = gi_runStat ;
     1126
    9351127  }
    9361128
    937   return NULL;
     1129  //must close all open files ....
     1130  snprintf(str,MXSTR,"Abort Writing Process ...");
     1131  factOut(kInfo,-1, str ) ;
     1132closerun:
     1133  snprintf(str,MXSTR,"Close all open files ...");
     1134  factOut(kInfo,-1, str ) ;
     1135  for ( j=0; j<MAX_RUN; j++)
     1136     if ( runCtrl[j].runId >0 ) {
     1137        int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
     1138        if (i<0) {
     1139           snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
     1140           factOut(kError,506, str ) ;
     1141           runCtrl[j].fileId = 888 ;
     1142        } else {
     1143           snprintf(str,MXSTR,"closing run %d ok  AAA",runCtrl[j].runId);
     1144           factOut(kInfo,507, str ) ;
     1145           runCtrl[j].fileId = 7777 ;
     1146        }
     1147     }
     1148
     1149  gw_runStat = -99;
     1150  snprintf(str,MXSTR,"Exit Writing Process ...");
     1151  factOut(kInfo,-1, str ) ;
     1152  return 0;
     1153
     1154
     1155 
    9381156
    9391157} /*-----------------------------------------------------------------*/
     
    9411159
    9421160
     1161
     1162void StartEvtBuild() {
     1163
     1164  int i,j,imax,status,th_ret[50] ;
     1165  pthread_t thread[50] ;
     1166  struct timespec xwait ;
     1167  uint32_t actime ;
     1168
     1169  gi_runStat = gp_runStat = gw_runStat = 0 ;
     1170
     1171  snprintf(str,MXSTR,"Starting EventBuilder");
     1172  factOut(kInfo,-1, str ) ;
     1173
     1174
     1175   evtCtrl.frstPtr = 0 ;
     1176   evtCtrl.lastPtr = 0 ;
     1177
     1178   actime = g_actTime + 50000000 ;
     1179/* initialize run control logics */
     1180   for (i=0; i<MAX_RUN; i++) {
     1181      runCtrl[i].runId = 0 ;
     1182      runCtrl[i].lastTime = 0 ;
     1183      runCtrl[i].closeTime = time(NULL) + 3600*24*7;
     1184
     1185      runCtrl[i].nextEvt = 0 ;
     1186      runCtrl[i].fileId = -2 ;
     1187
     1188      runTail[i].nEventsOk =
     1189      runTail[i].nEventsRej =
     1190      runTail[i].nEventsBad =
     1191      runTail[i].PCtime0 =
     1192      runTail[i].PCtimeX = 0 ;
     1193   }
     1194
     1195//start all threads (more to come) when we are allowed to ....
     1196  while (g_runStat == 0 ) {
     1197     xwait.tv_sec = 0;
     1198     xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
     1199     nanosleep( &xwait , NULL ) ;
     1200  }
     1201
     1202  i=0 ;
     1203  th_ret[i] = pthread_create( &thread[i], NULL, readFAD,  NULL );
     1204  i++;
     1205  th_ret[i] = pthread_create( &thread[i], NULL, procEvt,  NULL );
     1206  i++;
     1207  th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
     1208  i++;
     1209  imax=i ;
     1210
     1211
     1212
     1213
    9431214/*
    944 int main() {
    945   int i,th_ret[50] ;
    946   pthread_t thread[50] ;
    947 
    948   initReadFAD() ;
    949   i=0 ;
    950   th_ret[i] = pthread_create( &thread[i], NULL, readFAD,  (void*) i++ );
    951   th_ret[i] = pthread_create( &thread[i], NULL, procEvt,  (void*) i++ );
    952   th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, (void*) i++ );
    953 
    954   for(;;) { sleep(1); }
    955 
    956 
    957 }
    958 */
     1215
     1216     xwait.tv_sec = 20;;
     1217     xwait.tv_nsec= 0 ;  // sleep for ~20sec
     1218     nanosleep( &xwait , NULL ) ;
     1219
     1220
     1221     printf("close all runs in 2 seconds\n");
     1222
     1223
     1224     CloseRunFile( 0, time(NULL)+2) ;
     1225
     1226     xwait.tv_sec = 5;;
     1227     xwait.tv_nsec= 0 ;  // sleep for ~20sec
     1228     nanosleep( &xwait , NULL ) ;
     1229
     1230     printf("setting g_runstat to -1\n");
     1231
     1232     g_runStat = -1 ;
     1233
     1234
     1235 */
     1236
     1237
     1238
     1239
     1240
     1241
     1242
     1243
     1244
     1245//wait for all threads to finish
     1246  for (i=0; i<imax; i++) {
     1247     j = pthread_join ( thread[i], (void **)&status) ;
     1248  }
     1249
     1250} /*-----------------------------------------------------------------*/
     1251
Note: See TracChangeset for help on using the changeset viewer.