#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "EventBuilder.h" #define ETHTEST 0 #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN 64*1024 // size of read-buffer per socket int g_actTime = 0 ; int g_runStat = 40 ; int g_actBoards = 20 ; uint gi_SecRate[MAX_SOCK] ; uint gi_S10Rate[MAX_SOCK] ; uint gi_MinRate[MAX_SOCK] ; uint gi_ErrCnt[MAX_SOCK] ; uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards uint gi_SecTime, gi_S10Time, gi_MinTime ; uint gi_EvtStart= 0 ; uint gi_EvtRead = 0 ; uint gi_EvtBad = 0 ; uint gi_EvtTot = 0 ; uint gw_EvtTot = 0 ; uint gp_EvtTot = 0 ; EVT_CTRL evtCtrl ; //control of events during processing int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space RUN_HEAD actRun ; RUN_CTRL runCtrl[MAX_RUN] ; RUN_TAIL runTail[MAX_RUN] ; /* *** Definition of rdBuffer to read in IP packets; keep it global !!!! */ typedef union { int8_t B[MAX_LEN/8]; int16_t S[MAX_LEN/4]; int32_t I[MAX_LEN/2]; int64_t L[MAX_LEN ]; } CNV_FACT ; typedef struct { int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ... int32_t bufPos ; //next byte to read to the buffer next int32_t bufLen ; //number of bytes left to read int sockStat ; //-1 if socket not yet connected int socket ; //contains the sockets struct sockaddr_in SockAddr ; //IP for each socket int evtID ; // event ID of event currently read int runID ; // run " //int evtPtr ; // index into evtCtrl structure uint fadLen ; // FADlength of event currently read int fadVers ; // Version of FAD int board ; // boardID (softwareID: 0..40 ) int Port ; // int8_t *rBuf; //local buffer to be used when no event defined yet CNV_FACT *rBuf ; } READ_STRUCT ; typedef union { int8_t B[2]; int16_t S ; } SHORT_BYTE ; struct timespec xwait ; SHORT_BYTE start, stop; READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int GenSock(int flag, int crate0, int board0, int port0, READ_STRUCT *rd) { /* *** generate Address, create sockets and allocates readbuffer for it *** *** if flag!=0 only close and redo the socket */ int crate, board, port ; char IPstr[100] ; struct in_addr IPaddr ; rd->sockStat = -1 ; crate = crate0; board = board0; port = port0 ; if (flag !=0 ) { close(rd->socket) ; if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { error(1,errno,"Could not generate socket\n"); return -2 ; } return 0 ; } if (ETHTEST >0) { port = port0+100*crate0+10*board0 ; sprintf(IPstr,"10.0.%d.11",128+crate); // test on fact1 // if (board==3) sprintf(IPstr,"10.0.100.11"); // sprintf(IPstr,"10.0.131.11"); // test on fact1 inet_pton(PF_INET, IPstr, &IPaddr) ; port = port0+100*crate0+10*board0 ; } else { sprintf(IPstr,"10.0.%d.%d",128+crate,128+board); // real environment if ( inet_pton(PF_INET, IPstr, &IPaddr) <=0 ) { error(1,errno,"Error: bad address c=%d b=%d '%s'\n", crate, board, IPstr); return -1 ; } } rd->Port = port ; rd->board = crate0*10+board0 ; rd->SockAddr.sin_family = PF_INET; rd->SockAddr.sin_port = htons(port) ; rd->SockAddr.sin_addr = IPaddr ; if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { error(1,errno,"Could not generate socket\n"); return -2 ; } else { rd->rBuf = malloc(sizeof(CNV_FACT) ) ; if ( rd->rBuf == NULL ) { error(1,errno,"Could not create local buffer\n"); return -3 ; } } return 0 ; } /*-----------------------------------------------------------------*/ int PrintErr() { int k,c,b,p,s ; k=0 ; printf("Errors:\n"); for (c=0; c<4; c++) { for (b=0; b<10; b++) { s=0 ; printf("c%d b%d: ",c,b); for (p=1; p<8; p++) { printf("%7d",gi_ErrCnt[k]); s+=gi_ErrCnt[k]; } printf("%8d\n",s); } } return 0; } /*-----------------------------------------------------------------*/ int PrintRate() { int k,c,b,p,s ; if (g_actTime > gi_SecTime) { gi_SecTime = g_actTime ; printf("Nr Ev start %d compl %d bad %d\n",gi_EvtStart,gi_EvtRead,gi_EvtBad) ; k=0 ; printf("Rate/Second:\n"); for (c=0; c<4; c++) { for (b=0; b<10; b++) { s=0 ; printf("c%d b%d: ",c,b); for (p=1; p<8; p++) { printf("%7d",gi_SecRate[k]); s+= gi_SecRate[k]; gi_S10Rate[k]+=gi_SecRate[k]; gi_SecRate[k++]=0 ; } printf("%8d\n",s); } } for (b=0; b gi_S10Time) { gi_S10Time = g_actTime ; k=0 ; printf("Rate/10Second:\n"); for (c=0; c<4; c++) { for (b=0; b<10; b++) { s=0 ; printf("c%d b%d: ",c,b); for (p=1; p<8; p++) { printf("%7d",gi_S10Rate[k]); s+= gi_S10Rate[k]; gi_MinRate[k]+=gi_S10Rate[k]; gi_S10Rate[k++]=0 ; } printf("%8d\n",s); } } } if ( g_actTime%60 == 0 && g_actTime > gi_MinTime) { gi_MinTime = g_actTime ; k=0 ; printf("Rate/Minute:\n"); for (c=0; c<4; c++) { for (b=0; b<10; b++) { printf("c%d b%d: ",c,b); s=0 ; for (p=1; p<8; p++) { printf("%7d",gi_MinRate[k]); s+= gi_MinRate[k]; gi_MinRate[k++]=0 ; } printf("%8d\n",s); } } } return 0; } /*-----------------------------------------------------------------*/ int mBufInit() { // initialize mBuffer (mark all entries as unused\empty) int i,j,k ; for (i=0; i 1024) { printf("illegal nRoi %d\n",nRoi) ; return 99 ; } i = evID % MAX_EVT ; evFree = -1 ; for ( k=0; kStartPix = malloc( NPIX * sizeof(int16_t) ) ; if (mBuffer[i].fEvent->StartPix == NULL) { free(mBuffer[i].fEvent) ; mBuffer[i].fEvent = NULL ; mBuffer[i].nRoi = -3 ; return -13; } for (k=0; kStartPix[k] = -1 ; mBuffer[i].fEvent->StartTM = malloc( NTMARK * sizeof(int16_t) ) ; if (mBuffer[i].fEvent->StartTM == NULL) { free(mBuffer[i].fEvent->StartPix ) ; free(mBuffer[i].fEvent) ; mBuffer[i].fEvent = NULL ; mBuffer[i].nRoi = -4 ; return -14; } for (k=0; kStartTM[k] = -1 ; mBuffer[i].fEvent->Adc_Data = malloc( NPIX * nRoi * sizeof(int16_t) ) ; if (mBuffer[i].fEvent->Adc_Data == NULL) { free(mBuffer[i].fEvent->StartTM ) ; free(mBuffer[i].fEvent->StartPix ) ; free(mBuffer[i].fEvent) ; mBuffer[i].fEvent = NULL ; mBuffer[i].nRoi = -5 ; return -15; } mBuffer[i].fEvent->Adc_Tmark= malloc( NTMARK* nRoi * sizeof(int16_t) ) ; if (mBuffer[i].fEvent->Adc_Tmark== NULL) { free(mBuffer[i].fEvent->Adc_Data ) ; free(mBuffer[i].fEvent->StartTM ) ; free(mBuffer[i].fEvent->StartPix ) ; free(mBuffer[i].fEvent) ; mBuffer[i].fEvent = NULL ; mBuffer[i].nRoi = -6 ; return -16; } mBuffer[i].fEvent->FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ; if (mBuffer[i].fEvent->FADhead== NULL) { free(mBuffer[i].fEvent->Adc_Tmark ) ; free(mBuffer[i].fEvent->Adc_Data ) ; free(mBuffer[i].fEvent->StartTM ) ; free(mBuffer[i].fEvent->StartPix ) ; free(mBuffer[i].fEvent) ; mBuffer[i].fEvent = NULL ; mBuffer[i].nRoi = -7 ; return -17; } mBuffer[i].nBoard = 0 ; for (k=0; k 0) { //have an fEvent structure generated ... free(mBuffer[i].fEvent->Adc_Tmark) ; free(mBuffer[i].fEvent->Adc_Data ) ; free(mBuffer[i].fEvent->StartTM ) ; free(mBuffer[i].fEvent->StartPix ) ; free(mBuffer[i].fEvent ) ; mBuffer[i].fEvent = NULL ; } mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1; return 0 ; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int initReadFAD() { /* *** initialize reading of FAD data */ int32_t i,j,k ; int c,b,p ; g_actTime = time(NULL) ; k = 0 ; for ( c=0; c<4; c++ ) for (b=0; b<10; b++ ) for (p=5001; p<5008; p++) { j = GenSock(0,c,b,p, &rd[k]) ; if ( j != 0 ) printf("problem with c%d b%d p%d\n",c,b,p); // else printf("ok socket %d = %d\n",k,rd[k].socket) ; k++ ; } for (k=0; k 0) { //loop until global variable g_stop is set g_actTime = time(NULL) ; nokCnt[numok]++; loopCnt++ ; numok = 0 ; //count number of succesfull actions for (i=0; i=0 ) { //successfull ==> rd[i].bufTyp = 0 ; // expect a header rd[i].bufLen = frst_len ; // max size to read at begining rd[i].bufPos = 0 ; // no byte read so far gi_NumConnect[ rd[i].board ]++ ; printf("+++connect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]); } } if (rd[i].sockStat >=0) { //we have a connection ==> try to read numok++ ; sokCnt[i]++; jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); if (jrd == 0) { //connection has closed ... rd[i].sockStat = -1 ; //flag (try to reopen next round) error(0,errno,"Socket %d closed",i); j = GenSock(1,0,0,0, &rd[i]) ; gi_ErrCnt[i]++ ; gi_NumConnect[ rd[i].board ]-- ; printf("disconnect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]); } else if ( jrd<0 ) { //did not read anything if (errno != EAGAIN && errno != EWOULDBLOCK ) { error(1,errno,"Error Reading from %d",i); gi_ErrCnt[i]++ ; } else numok-- ; //else nothing waiting to be read } else if ( rd[i].bufTyp >0 ) { // we are reading data ... //printf("received data %d %d\n", i,jrd); if ( jrd < rd[i].bufLen ) { //not yet all read rd[i].bufPos += jrd ; //==> prepare for continuation rd[i].bufLen -= jrd ; } else { //full dataset read rd[i].bufLen = rd[i].bufPos + j ; rd[i].bufPos = rd[i].fadLen ; if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0] && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) { gi_ErrCnt[i]++ ; printf( "wrong end of buffer found %d\n",rd[i].bufPos); exit(1) ; goto EndBuf ; } //we have a complete buffer, copy to WORK area gi_SecRate[i]++ ; roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ; //get index into mBuffer for this event (create if needed) evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ; if (evID < 0) { printf("no space left ...%d\n",evID) ; exit(2) ; goto EndBuf ; } //we have a valid entry in mBuffer[]; fill it boardId = rd[i].board ; if ( mBuffer[evID].board[ boardId ] != -1) { //this board already stored ... printf( "board of this event already stored ...") ; } else { int iDx = evtIdx[evID] ; //index into evtCtrl memcpy( &mBuffer[evID].fEvent->FADhead[boardId].start_package_flag, &rd[i].rBuf->S[0], head_len) ; mBuffer[evID].board[ boardId ] = boardId ; roi = mBuffer[evID].nRoi ; pixS = boardId*36 -1 ; // tmS = boardId*4 -1 ; // src = head_len/2 ; for ( drs=0; drs<4; drs++ ) { for ( px=0; px<9; px++ ) { pixH= ntohs(rd[i].rBuf->S[src++]) ; pixC= ntohs(rd[i].rBuf->S[src++]) ; pixR= ntohs(rd[i].rBuf->S[src++]) ; src++ ; pixS++ ; //pixS = pixH2S[pixH] ; if (pixR != roi ) { if (px == 8 && pixR == 2*roi ) { } else { printf("wrong roi %d %d %d %d\n",px,pixR,roi,src-2); //exit(66); } // goto EndBuf ; } mBuffer[evID].fEvent->StartPix[pixS] =pixC; dest= pixS * roi ; memcpy( &mBuffer[evID].fEvent->Adc_Data[dest], &rd[i].rBuf->S[src], roi * 2) ; src+= roi ; // if (px==8 && roi < 512 ) { // tmS++ ; // dest= tmS * roi ; // mBuffer[evID].fEvent->StartTM[pixS] =pixC+roi; // memcpy( // &mBuffer[evID].fEvent->Adc_Tmark[dest], // &rd[i].rBuf.S[src], roi * 2) ; // ?? not in the simulator ... src+= roi ; // } } } evtCtrl.evtStat[ iDx ]++ ; evtCtrl.pcTime[ iDx ] = g_actTime ; if (++mBuffer[evID].nBoard == 19 ) { //complete event read ---> flag for next processing evtCtrl.evtStat[ iDx ] = 99; gi_EvtRead++ ; gi_EvtTot++ ; printf("complete event --------------------------------------------------\n"); } }// now we have stored a new board contents into Event structure EndBuf: rd[i].bufTyp = 0 ; //ready to read next header rd[i].bufLen = frst_len ; rd[i].bufPos = 0 ; } } else { //we are reading event header rd[i].bufPos += jrd ; rd[i].bufLen -= jrd ; if ( rd[i].bufPos > MIN_LEN ){ //sufficient data to take action //check if startflag correct; else shift block .... for (k=0; kB[k ] == start.B[1] && rd[i].rBuf->B[k+1] == start.B[0] ) break ; } if (k >= rd[i].bufPos-1 ) { //no start of header found printf("no start of header found !!!!\n"); rd[i].bufPos = 0 ; rd[i].bufLen = head_len ; } else if ( k>0 ) { rd[i].bufPos -= k ; rd[i].bufLen += k ; memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ; } if ( rd[i].bufPos > MIN_LEN ) { goodhed++; rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; ///??? rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ; rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; rd[i].runID = ntohl(rd[i].rBuf->I[11]) ; printf("received event %d %d\n",rd[i].evtID,i); if (rd[i].runID ==0 ) rd[i].runID = myRun ; rd[i].bufTyp = 1 ; //ready to read full record rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ; if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; } } } //end interpreting last read } //end of successful read anything } //finished trying to read all sockets g_actTime = time(NULL) ; if ( g_actTime > gi_SecTime ) { // PrintRate() ; //loop over all active events and flag those older than read-timeout int kd = evtCtrl.readPtr - evtCtrl.writePtr ; if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; for ( k=evtCtrl.writePtr; k<(evtCtrl.writePtr+kd); k++ ) { int k0 = k % (MAX_EVT*MAX_RUN) ; if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90 && evtCtrl.pcTime[k0] < g_actTime-10 ) { evtCtrl.evtStat[k0] = 91 ; gi_EvtBad++ ; gi_EvtTot++ ; } } } if (numok > 0 ) numok2=0; else if (numok2++ > 3) { if (g_runStat == 1) { xwait.tv_sec = 1; xwait.tv_nsec= 0 ; // hibernate for 1 sec } else { xwait.tv_sec = 0; xwait.tv_nsec= 2000000 ; // sleep for ~2 msec // xwait.tv_nsec= 10000000 ; // sleep for ~10 msec } // printf("sleeping ...\n"); nanosleep( &xwait , NULL ) ; } } //and do next loop over all sockets ... return NULL; } /*-----------------------------------------------------------------*/ void *procEvt( void *ptr ) { /* *** main loop processing file, including SW-trigger */ int numProc ; int k,k1,k2,kd ; while (g_runStat > 0) { kd = evtCtrl.readPtr - evtCtrl.writePtr ; if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; k1=evtCtrl.writePtr; k2=evtCtrl.writePtr+kd; numProc = 0 ; if (gp_EvtTot < gi_EvtTot) { for ( k=k1; k 90 && evtCtrl.evtStat[k0] <500) { //ready to be processed ... int id = evtCtrl.evtBuf[k0] ; uint32_t irun = mBuffer[id].runNum ; int ievt = mBuffer[id].evNum ; printf("processing %d %d %d %d\n",ievt,k,evtCtrl.evtStat[k0],evtCtrl.writePtr) ; numProc++ ; evtCtrl.evtStat[k0] = 501 ; gp_EvtTot++ ; } } } if (numProc == 0) { //seems we have nothing to do, so sleep a little xwait.tv_sec = 0; xwait.tv_nsec= 10000000 ; // sleep for ~10 msec nanosleep( &xwait , NULL ) ; } } return NULL; } /*-----------------------------------------------------------------*/ void *writeEvt( void *ptr ) { /* *** main loop writing event (including opening and closing run-files */ int numWrite = 0 ; int j,id,irun,ievt ; while (g_runStat > 0) { //loop until global variable g_stop is set //loop over buffered events and check if something to write ... if ( gp_EvtTot == gw_EvtTot ) { //there is for sure nothing to do --> sleep a little xwait.tv_sec = 0; xwait.tv_nsec= 10000000 ; // sleep for ~10 msec nanosleep( &xwait , NULL ) ; } else { //go through evtCtrl list to check if there might be something //if run-file not yet opened==> open runfile (better to store headers in own structure ?) //if eventid == next event for run ==> write it (or flag it) //if eventid > next event exists, and nothing new for >time out ==> write it //if nothing for this run for >timeout ==> close run int kd = evtCtrl.readPtr - evtCtrl.writePtr ; if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; int k,k1,k2 ; k1=evtCtrl.writePtr; k2=evtCtrl.writePtr+kd; int evtTot=gw_EvtTot ; for ( k=k1; k 500 ) { //ready to be written ... id = evtCtrl.evtBuf[k0] ; irun = mBuffer[id].runNum ; ievt = mBuffer[id].evNum ; for ( j=0; j= MAX_RUN ) { printf("error: can not find run %d\n", irun); exit(111); } if (runCtrl[j].fileId < 0 ) { printf("open new run_file %d\n",irun) ; runCtrl[j].fileId = 999 ; // should be a function call runCtrl[j].nextEvt= 0; runCtrl[j].lastTime=g_actTime ; } if (runCtrl[j].nextEvt == ievt ) { //write this event printf("write event %d (run %d %d)\n",ievt,irun,evtCtrl.evtStat[k0] ) ; runCtrl[j].nextEvt= ievt+1; runCtrl[j].lastTime=g_actTime ; evtCtrl.evtStat[k0]= -1 ; gw_EvtTot++ ; numWrite++ ; // evtCtrl.writePtr=k+1; } else if ( ievt < runCtrl[j].nextEvt ) { printf("delayed event (run %d %d %d) skipped\n",ievt,irun,evtCtrl.evtStat[k0] ) ; evtCtrl.evtStat[k0]= -1 ; // evtCtrl.writePtr=k+1; gw_EvtTot++ ; numWrite++ ; } } if ( runCtrl[j].lastTime < g_actTime-15) { printf("non existing event skip %d (run %d -> %d)\n",runCtrl[j].nextEvt,irun,ievt) ; runCtrl[j].nextEvt++; numWrite++; } for ( j=0; j0 && runCtrl[j].lastTime < g_actTime-120) { printf("close run %d (timeout)\n",irun) ; runCtrl[j].fileId = -2 ; runCtrl[j].runId = 0 ; } } if (numWrite == 0 ) { //nothing to do at the moment ==> sleep a little xwait.tv_sec = 0; xwait.tv_nsec= 10000000 ; // sleep for ~10 msec nanosleep( &xwait , NULL ) ; } } } return NULL; } return NULL; } /*-----------------------------------------------------------------*/ /* int main() { int i,th_ret[50] ; pthread_t thread[50] ; initReadFAD() ; i=0 ; th_ret[i] = pthread_create( &thread[i], NULL, readFAD, (void*) i++ ); th_ret[i] = pthread_create( &thread[i], NULL, procEvt, (void*) i++ ); th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, (void*) i++ ); for(;;) { sleep(1); } } */