Changeset 11090 for trunk/FACT++/src/EventBuilder.c
- Timestamp:
- 06/21/11 15:40:04 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11082 r11090 52 52 int g_actTime = 0 ; 53 53 int g_runStat = 40 ; 54 int g_actBoards = 40 ;55 54 size_t g_maxMem ; //maximum memory allowed for buffer 56 55 57 int g_maxBoards ; //maximum number of boards to be initialized 56 //no longer needed ... 57 int g_maxBoards ; //maximum number of boards to be initialized 58 int g_actBoards ; 59 // 60 58 61 FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 59 62 … … 64 67 65 68 66 int gi_maxSocks = 0 ;67 69 uint gi_SecRate[MAX_SOCK] ; 68 70 uint gi_S10Rate[MAX_SOCK] ; … … 157 159 158 160 159 int GenSock(int flag, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {161 int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) { 160 162 /* 161 163 *** generate Address, create sockets and allocates readbuffer for it 162 164 *** 163 *** if flag!=0 only close and redo the socket 165 *** if flag==0 generate socket and buffer 166 *** <0 destroy socket and buffer 167 *** >0 close and redo socket 168 *** 169 *** sid : board*7 + port id 164 170 */ 165 171 166 167 rd->sockStat = -1 ; 168 169 170 if (flag !=0 ) { 171 close(rd->socket) ; 172 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 173 snprintf(str,MXSTR,"Could not generate socket | %m"); 172 int j ; 173 174 if (rd->sockStat ==0 ) { //close socket if open 175 j=close(rd->socket) ; 176 if (j>0) { 177 snprintf(str,MXSTR,"Error closing socket %d | %m",sid); 174 178 factOut(kFatal,771, str ) ; 175 return -2 ; 179 } else { 180 snprintf(str,MXSTR,"Succesfully closed socket %d",sid); 181 factOut(kInfo,771, str ) ; 176 182 } 183 } 184 185 186 if (flag < 0) { 187 free(rd->rBuf) ; //and never open again 188 rd->rBuf = NULL ; 189 rd->sockStat = 99 ; 177 190 return 0 ; 178 191 } 179 192 193 194 if (flag == 0) { //generate address and buffer ... 180 195 rd->Port = port ; 181 196 rd->SockAddr.sin_family = sockAddr->sin_family; … … 183 198 rd->SockAddr.sin_addr = sockAddr->sin_addr ; 184 199 185 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 186 snprintf(str,MXSTR,"Could not generate socket | %m"); 187 factOut(kFatal,773, str ) ; 188 return -2 ; 189 } else { 190 rd->rBuf = malloc(sizeof(CNV_FACT) ) ; 191 if ( rd->rBuf == NULL ) { 192 snprintf(str,MXSTR,"Could not create local buffer"); 193 factOut(kFatal,774, str ) ; 194 return -3 ; 195 } 200 rd->rBuf = malloc(sizeof(CNV_FACT) ) ; 201 if ( rd->rBuf == NULL ) { 202 snprintf(str,MXSTR,"Could not create local buffer %d",sid); 203 factOut(kFatal,774, str ) ; 204 rd->sockStat = 77 ; 205 return -3 ; 196 206 } 207 } 208 209 210 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 211 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid); 212 factOut(kFatal,773, str ) ; 213 rd->sockStat = 88 ; 214 return -2 ; 215 } 216 217 snprintf(str,MXSTR,"Successfully generated socket %d ",sid); 218 factOut(kInfo,773, str ) ; 219 rd->sockStat = -1 ; //try to (re)open socket 197 220 return 0 ; 198 221 … … 272 295 273 296 274 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2 ;297 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2 ; 275 298 276 299 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; … … 393 416 /* *** main loop reading FAD data and sorting them to complete events */ 394 417 int head_len,frst_len,numok,numok2,dest,evID,i,j,k ; 418 int actBoards = 0; 395 419 int32_t jrd ; 396 420 int32_t myRun ; … … 405 429 int nokCnt[MAX_SOCK],loopCnt=0; 406 430 int sokCnt[MAX_SOCK]; 431 int sockDef[NBOARDS]; 407 432 408 433 struct timeval *tv, atv; … … 422 447 cpu = 7 ; 423 448 CPU_SET( cpu, &mask ); 424 // cpu = 6 ;425 // CPU_SET( cpu, &mask );426 449 427 450 /* sched_setaffinity returns 0 in success */ … … 431 454 } 432 455 433 434 gi_maxSocks = 0 ;435 456 436 457 //make sure all sockets are preallocated as 'not exist' … … 439 460 rd[i].sockStat = 99 ; 440 461 } 441 442 int b,p,p0 ; 443 k = 0 ; 444 for (b=0; b<NBOARDS; b++ ) { 445 if ( g_port[b].sockDef >=0 ) { 446 p0=ntohs(g_port[b].sockAddr.sin_port); 447 for (p=p0+1; p<p0+8; p++) { 448 j = GenSock(0,p, &g_port[b].sockAddr, &rd[k]) ; 449 if ( j != 0 ) { 450 snprintf(str,MXSTR,"problem with Address board %d port %d",b,p); 451 factOut(kFatal,101, str ) ; 452 } else { 453 rd[k].board = b ; 454 k++ ; 455 gi_maxSocks++ ; 456 } 457 } 458 } 459 } 462 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ; 463 460 464 461 465 g_actTime = time(NULL) ; … … 469 473 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ; 470 474 471 472 473 475 mBufInit() ; //initialize buffers 474 476 … … 494 496 495 497 496 while (g_runStat >=0) { //loop until global variable g_ stop is set498 while (g_runStat >=0) { //loop until global variable g_runStat claims stop 497 499 498 500 gi_runStat = g_runStat ; 501 502 int b,p,p0,s0,nch; 503 nch = 0 ; 504 for (b=0; b<NBOARDS; b++ ) { 505 k = b*7 ; 506 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ... 507 nch++ ; 508 gi_NumConnect[ b ] = 0 ; //must close all connections 509 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened 510 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed 511 else s0=+1 ; //sockets to be closed and reopened 512 513 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port); 514 else p0=0 ; 515 516 for (p=p0+1; p<p0+8; p++) { 517 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket 518 k++ ; 519 } 520 sockDef[b] = g_port[b].sockDef ; 521 } 522 } 523 524 if (nch > 0 ) { 525 actBoards = 0 ; 526 for (b=0; b<NBOARDS; b++ ) { 527 if ( sockDef[b] > 0 ) actBoards++ ; 528 } 529 } 530 499 531 500 532 g_actTime = time(NULL) ; … … 505 537 numok = 0 ; //count number of succesfull actions 506 538 507 for (i=0; i<gi_maxSocks; i++) { //check all sockets if something to read 508 509 gettimeofday( tv, NULL); 510 tsec = atv.tv_sec ; 511 tusec= atv.tv_usec ; 539 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read 540 b = i / 7 ; 541 542 gettimeofday( tv, NULL); 543 tsec = atv.tv_sec ; 544 tusec= atv.tv_usec ; 512 545 513 546 if (rd[i].sockStat <0 ) { //try to connect if not yet done … … 515 548 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ; 516 549 if (rd[i].sockStat ==0 ) { //successfull ==> 517 rd[i].bufTyp = 0 ; // expect a header 518 rd[i].bufLen = frst_len ; // max size to read at begining 550 if (sockDef[b] > 0) { 551 rd[i].bufTyp = 0 ; // expect a header 552 rd[i].bufLen = frst_len ; // max size to read at begining 553 } else { 554 rd[i].bufTyp = -1 ; // data to be skipped 555 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping 556 } 519 557 rd[i].bufPos = 0 ; // no byte read so far 520 gi_NumConnect[ rd[i].board]++ ;558 gi_NumConnect[ b ]++ ; 521 559 numok++ ; //make sure next round will execute 522 snprintf(str,MXSTR,"+++connect %d %d", rd[i].board,gi_NumConnect[ rd[i].board]);560 snprintf(str,MXSTR,"+++connect %d %d",b,gi_NumConnect[ b ]); 523 561 factOut(kInfo,-1, str ) ; 524 562 } … … 530 568 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); 531 569 532 533 if (jrd >0 ) { 534 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ; 535 } 536 537 570 if (jrd >0 ) { 571 qread+=jrd ; 572 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ; 573 } 538 574 539 575 if (jrd == 0) { //connection has closed ... 540 rd[i].sockStat = -1 ; //flag (try to reopen next round)541 576 snprintf(str,MXSTR,"Socket %d closed by FAD",i); 542 577 factOut(kInfo,441, str ) ; 543 j = GenSock(1, 0,NULL, &rd[i]) ;578 j = GenSock(1, i, 0,NULL, &rd[i]) ; 544 579 gi_ErrCnt[i]++ ; 545 gi_NumConnect[ rd[i].board ]-- ; 580 gi_NumConnect[ b ]-- ; 581 546 582 } else if ( jrd<0 ) { //did not read anything 547 583 if (errno != EAGAIN && errno != EWOULDBLOCK ) { … … 551 587 } else numok-- ; //else nothing waiting to be read 552 588 589 } else if ( rd[i].bufTyp <0 ) { // we are skipping this board ... 590 // just do nothing 591 553 592 } else if ( rd[i].bufTyp >0 ) { // we are reading data ... 554 qread+=jrd ;555 593 if ( jrd < rd[i].bufLen ) { //not yet all read 556 594 rd[i].bufPos += jrd ; //==> prepare for continuation 557 595 rd[i].bufLen -= jrd ; 558 debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data596 debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 559 597 } else { //full dataset read 560 598 rd[i].bufLen = rd[i].bufPos + j ; … … 568 606 569 607 } 570 debugRead(i,jrd,rd[i].evtID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event608 debugRead(i,jrd,rd[i].evtID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 571 609 572 610 //we have a complete buffer, copy to WORK area … … 585 623 //we have a valid entry in mBuffer[]; fill it 586 624 587 boardId = rd[i].board;625 boardId = b ; 588 626 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ; 589 627 int fadCrate = fadBoard/256 ; … … 652 690 evtCtrl.pcTime[ iDx ] = g_actTime ; 653 691 654 if (++mBuffer[evID].nBoard == g_actBoards ) {692 if (++mBuffer[evID].nBoard >= actBoards ) { 655 693 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]); 656 694 factOut(kDebug,-1, str ) ; … … 668 706 669 707 } else { //we are reading event header 670 qread+=jrd ;671 708 rd[i].bufPos += jrd ; 672 709 rd[i].bufLen -= jrd ; … … 704 741 rd[i].bufTyp = 1 ; //ready to read full record 705 742 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ; 706 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; 707 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event 743 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //? 744 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event 745 } else { 746 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 708 747 } 709 else { 710 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 711 } 712 748 } else { 749 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 713 750 } 714 715 else {716 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet717 }718 751 719 752 } //end interpreting last read … … 721 754 } //finished trying to read all sockets 722 755 723 int qwait=0, qdel=0, qskip=0 ;756 int qwait=0, qdel=0, qskip=0 ; 724 757 g_actTime = time(NULL) ; 725 758 if ( g_actTime > gi_SecTime ) { … … 827 860 } 828 861 rd[i].sockStat = -1 ; //flag (try to reopen next round) 829 gi_NumConnect[ rd[i].board]-- ;862 gi_NumConnect[ i/7 ]-- ; 830 863 } 831 864 … … 1325 1358 /*-----------------------------------------------------------------*/ 1326 1359 1360 1361 1327 1362 /* 1328 1329 1330 1363 FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) 1331 1364 { return 1; } ; … … 1379 1412 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes 1380 1413 1381 g_maxBoards = 40 ;1382 1383 g_actBoards = g_maxBoards;1384 1414 1385 1415 g_runStat = 40 ; … … 1430 1460 1431 1461 } 1432 1433 */ 1462 */
Note:
See TracChangeset
for help on using the changeset viewer.