Changeset 11134 for trunk/FACT++/src/EventBuilder.c
- Timestamp:
- 06/23/11 15:54:33 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11096 r11134 40 40 extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ; 41 41 extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ; 42 extern void factOut(int severity, int err, c onst char* message ) ;43 extern void factStat(int severity, int err, const char* message) ;42 extern void factOut(int severity, int err, char* message ) ; 43 extern void factStat(int64_t *array , int len ) ; 44 44 45 45 extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ; 46 47 48 extern void debugHead(int i, void *buf); 46 49 47 50 extern void debugRead(int isock, int ibyte, int32_t event, int state, … … 118 121 int32_t bufPos ; //next byte to read to the buffer next 119 122 int32_t bufLen ; //number of bytes left to read 123 int32_t skip ; //number of bytes skipped before start of event 120 124 121 125 int sockStat ; //-1 if socket not yet connected , 99 if not exist … … 271 275 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ; 272 276 factOut(kError, 1, str ) ; 273 return 99 ;277 return -9999 ; 274 278 } 275 279 … … 289 293 //event does not yet exist; create 290 294 if (evFree < 0 ) { //no space available in ctrl 291 snprintf(str,MXSTR,"no control slot to keep event ...") ;295 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ; 292 296 factOut(kError,881, str ) ; 293 297 return -1 ; … … 301 305 302 306 if ( gi_usedMem + needmem + headmem > g_maxMem) { 303 snprintf(str,MXSTR,"no memory left to keep event ...") ;307 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ; 304 308 factOut(kError,882, str ) ; 305 return -1 ;309 return -11 ; 306 310 } 307 311 308 312 mBuffer[i].FADhead = malloc( headmem ) ; 309 313 if (mBuffer[i].FADhead == NULL) { 314 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ; 315 factOut(kError,882, str ) ; 310 316 return -12; 311 317 } … … 313 319 mBuffer[i].fEvent = malloc( needmem ) ; 314 320 if (mBuffer[i].fEvent == NULL) { 321 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ; 322 factOut(kError,882, str ) ; 315 323 free(mBuffer[i].fEvent) ; 316 324 mBuffer[i].fEvent = NULL ; … … 367 375 if (evFree <0 ) { 368 376 snprintf(str,MXSTR,"not able to register the new run %d",runID); 369 factOut(k Error,883, str ) ;377 factOut(kFatal,883, str ) ; 370 378 } else { 371 379 runCtrl[evFree].runId = runID ; … … 416 424 void *readFAD( void *ptr ) { 417 425 /* *** main loop reading FAD data and sorting them to complete events */ 418 int head_len,frst_len,numok,numok2,dest,evID,i, j,k ;419 int actBoards = 0 ;426 int head_len,frst_len,numok,numok2,dest,evID,i,k ; 427 int actBoards = 0, minLen ; 420 428 int32_t jrd ; 421 429 int32_t myRun ; … … 484 492 head_len = sizeof(PEVNT_HEADER) ; 485 493 //frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header 486 frst_len = head_len ; //fad_header only, so each event must be longer, even for roi=0 487 494 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0 495 496 //minLen = MIN_LEN ; //min #bytes needed to check header 497 minLen = head_len ; //min #bytes needed to check header: full header for debug 488 498 489 499 numok = numok2 = 0 ; … … 540 550 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read 541 551 b = i / 7 ; 552 if (sockDef[b] > 0) s0=+1 ; 553 else s0=-1 ; 542 554 543 555 gettimeofday( tv, NULL); … … 553 565 rd[i].bufLen = frst_len ; // max size to read at begining 554 566 } else { 555 rd[i].bufTyp = -1 ; // data to be skipped567 rd[i].bufTyp = -1 ; // full data to be skipped 556 568 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping 557 569 } 558 570 rd[i].bufPos = 0 ; // no byte read so far 571 rd[i].skip = 0 ; // start empty 559 572 gi_NumConnect[ b ]++ ; 560 573 numok++ ; //make sure next round will execute … … 564 577 } 565 578 566 if (rd[i].sockStat ==0) { //we have a connection ==> try to read 567 numok++ ; 568 sokCnt[i]++; 569 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); 570 571 if (jrd >0 ) { 572 qread+=jrd ; 573 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ; 574 } 575 576 if (jrd == 0) { //connection has closed ... 577 snprintf(str,MXSTR,"Socket %d closed by FAD",i); 578 factOut(kInfo,441, str ) ; 579 GenSock(1, i, 0,NULL, &rd[i]) ; 580 gi_ErrCnt[i]++ ; 581 gi_NumConnect[ b ]-- ; 582 583 } else if ( jrd<0 ) { //did not read anything 584 if (errno != EAGAIN && errno != EWOULDBLOCK ) { 585 snprintf(str,MXSTR,"Error Reading from %d | %m",i); 586 factOut(kError,442, str ) ; 587 gi_ErrCnt[i]++ ; 588 } else numok-- ; //else nothing waiting to be read 589 590 } else if ( rd[i].bufTyp <0 ) { // we are skipping this board ... 579 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read 580 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 581 numok++ ; 582 sokCnt[i]++; 583 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); 584 585 if (jrd >0 ) { 586 qread+=jrd ; 587 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ; 588 } 589 590 if (jrd == 0) { //connection has closed ... 591 snprintf(str,MXSTR,"Socket %d closed by FAD",i); 592 factOut(kInfo,441, str ) ; 593 GenSock(s0, i, 0,NULL, &rd[i]) ; 594 gi_ErrCnt[i]++ ; 595 gi_NumConnect[ b ]-- ; 596 597 } else if ( jrd<0 ) { //did not read anything 598 if (errno != EAGAIN && errno != EWOULDBLOCK ) { 599 snprintf(str,MXSTR,"Error Reading from %d | %m",i); 600 factOut(kError,442, str ) ; 601 gi_ErrCnt[i]++ ; 602 } else numok-- ; //else nothing waiting to be read 603 jrd = 0 ; 604 } 605 } else jrd = 0 ; //did read nothing as requested 606 607 if ( rd[i].bufTyp <0 ) { // we are skipping this board ... 591 608 // just do nothing 592 609 … … 597 614 debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 598 615 } else { //full dataset read 599 rd[i].bufLen = rd[i].bufPos + j;616 rd[i].bufLen = 0 ; 600 617 rd[i].bufPos = rd[i].fadLen ; 601 618 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0] … … 616 633 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ; 617 634 635 if (evID <-9000) goto EndBuf ; //illegal event, skip it ... 618 636 if (evID < 0) { 619 snprintf(str,MXSTR,"no space left ...%d",evID) ; 620 factOut(kError,201, str ) ; 621 goto EndBuf ; //--> skip event (and hope it will improve) 637 xwait.tv_sec = 0; 638 xwait.tv_nsec= 20000000 ; // sleep for ~20 msec 639 nanosleep( &xwait , NULL ) ; 640 goto EndBuf1 ; //hope there is free space next round 622 641 } 642 623 643 624 644 //we have a valid entry in mBuffer[]; fill it … … 704 724 rd[i].bufLen = frst_len ; 705 725 rd[i].bufPos = 0 ; 726 EndBuf1: 727 ; 706 728 } 707 729 … … 709 731 rd[i].bufPos += jrd ; 710 732 rd[i].bufLen -= jrd ; 711 if ( rd[i].bufPos > MIN_LEN){ //sufficient data to take action733 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action 712 734 //check if startflag correct; else shift block .... 713 735 for (k=0; k<rd[i].bufPos -1 ; k++) { … … 715 737 && rd[i].rBuf->B[k+1] == start.B[0] ) break ; 716 738 } 717 718 719 for (k=0; k<rd[i].bufPos -1 ; k++) { 720 if (rd[i].rBuf->B[k ] == start.B[1] 721 && rd[i].rBuf->B[k+1] == start.B[0] ) break ; 722 } 739 rd[i].skip += k ; 723 740 724 741 if (k >= rd[i].bufPos-1 ) { //no start of header found 725 snprintf(str,MXSTR,"no start of header on port%d", i ) ;726 factOut(kWarn,666, str ) ;742 // snprintf(str,MXSTR,"no start of header on port%d", i ) ; 743 // factOut(kWarn,666, str ) ; 727 744 728 745 rd[i].bufPos = 0 ; … … 733 750 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ; 734 751 } 735 if ( rd[i].bufPos > MIN_LEN ) { 752 if ( rd[i].bufPos >= minLen ) { 753 if ( rd[i].skip >0 ) { 754 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ; 755 factOut(kInfo,666, str ) ; 756 rd[i].skip = 0 ; 757 } 736 758 goodhed++; 737 759 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; … … 739 761 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt) 740 762 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ; 741 if (rd[i].runID ==0 ) rd[i].runID = myRun ;763 if (rd[i].runID ==0 ) rd[i].runID = myRun ; 742 764 rd[i].bufTyp = 1 ; //ready to read full record 743 765 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ; 744 766 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //? 767 debugHead(i,rd[i].rBuf); 745 768 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event 746 769 } else { … … 759 782 if ( g_actTime > gi_SecTime ) { 760 783 gi_SecTime = g_actTime ; 761 // PrintRate() ;762 784 763 785 … … 807 829 int ib ; 808 830 for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ; 809 810 snprintf(str,MXSTR,"bfr%5d skp%4d free%4d (tot%7d) mem%9lu rd%10d %3d",qwait,qskip,qdel,qtot,gi_usedMem,qread,qconn); 811 factStat(kInfo,-1, str ) ; 831 int64_t stat[9] ; 832 stat[0]= qwait; 833 stat[1]= qskip; 834 stat[2]= qdel ; 835 stat[3]= qtot ; 836 stat[4]= gi_usedMem ; 837 stat[5]= qread; 838 stat[6]= qconn; 839 840 factStat(stat,7); 812 841 qread=0 ; 813 842 } … … 855 884 for (i=0; i<MAX_SOCK; i++) { 856 885 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket 857 if (gi_NumConnect[ i/7 ]>0) 858 gi_NumConnect[ i/7 ]-- ; 886 gi_NumConnect[ i/7 ]-- ; 859 887 } 860 888 … … 973 1001 } else { 974 1002 mBuffer[id].fEvent->BoardTime[ib] = 975 ntohl(mBuffer[id].FADhead[ib].time) ;1003 ntohl(mBuffer[id].FADhead[ib].time) ; 976 1004 } 977 1005 } … … 1364 1392 1365 1393 1366 1367 1394 /* 1395 1368 1396 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) 1369 1397 { return 1; } ; … … 1389 1417 1390 1418 1391 void factStat(int severity, int err, char* message ) { 1392 printf("%3d %3d : %s\n",severity,err,message) ; 1419 void factStat(int64_t *array, int len ) { 1420 printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n", 1421 array[0],array[1],array[2],array[3],array[4],array[5],array[6]); 1393 1422 } 1394 1423 … … 1401 1430 1402 1431 void debugStream(int isock, void *buf, int len) { 1432 } 1433 1434 void debugHead(int i, void *buf) { 1403 1435 } 1404 1436 … … 1450 1482 //} 1451 1483 // 1452 //version for PC-test 1484 //version for PC-test 1453 1485 for (c=0; c<4; c++) { 1454 1486 for (b=0; b<10; b++) { … … 1478 1510 1479 1511 } 1480 */ 1512 */
Note:
See TracChangeset
for help on using the changeset viewer.