Ignore:
Timestamp:
06/21/11 15:40:04 (13 years ago)
Author:
tbretz
Message:
Changed bahaviour of opening closing sockets
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r11082 r11090  
    5252int g_actTime   =  0 ;
    5353int g_runStat   = 40 ;
    54 int g_actBoards = 40 ;
    5554size_t g_maxMem  ;  //maximum memory allowed for buffer
    5655
    57 int     g_maxBoards ;    //maximum number of boards to be initialized
     56//no longer needed ...
     57    int     g_maxBoards ;    //maximum number of boards to be initialized
     58    int     g_actBoards  ;
     59//
     60
    5861FACT_SOCK g_port[NBOARDS] ;  // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
    5962
     
    6467
    6568
    66 int  gi_maxSocks = 0 ; 
    6769uint gi_SecRate[MAX_SOCK] ;
    6870uint gi_S10Rate[MAX_SOCK] ;
     
    157159
    158160
    159 int GenSock(int flag, int port, struct sockaddr_in *sockAddr,  READ_STRUCT *rd) {
     161int GenSock(int flag, int sid,  int port, struct sockaddr_in *sockAddr,  READ_STRUCT *rd) {
    160162/*
    161163*** generate Address, create sockets and allocates readbuffer for it
    162164***
    163 *** if flag!=0 only close and redo the socket
     165*** if flag==0 generate socket and buffer
     166***         <0 destroy socket and buffer
     167***         >0 close and redo socket
     168***
     169*** sid : board*7 + port id
    164170 */
    165171
    166 
    167   rd->sockStat = -1 ;
    168 
    169 
    170   if (flag !=0 ) {
    171      close(rd->socket) ;
    172      if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
    173         snprintf(str,MXSTR,"Could not generate socket | %m");
     172  int j ;
     173
     174  if (rd->sockStat ==0 ) {   //close socket if open
     175     j=close(rd->socket) ;
     176     if (j>0) {
     177        snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
    174178        factOut(kFatal,771, str ) ;
    175         return -2 ;
     179     } else {
     180        snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
     181        factOut(kInfo,771, str ) ;
    176182     }
     183  }
     184
     185
     186  if (flag < 0) {
     187     free(rd->rBuf) ;   //and never open again
     188     rd->rBuf = NULL ;     
     189     rd->sockStat = 99 ; 
    177190     return 0 ;
    178191  }
    179192
     193
     194  if (flag == 0) {    //generate address and buffer ...
    180195     rd->Port  = port ;
    181196     rd->SockAddr.sin_family = sockAddr->sin_family;
     
    183198     rd->SockAddr.sin_addr = sockAddr->sin_addr ;
    184199
    185      if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
    186         snprintf(str,MXSTR,"Could not generate socket | %m");
    187         factOut(kFatal,773, str ) ;
    188         return -2 ;
    189      } else {
    190        rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
    191        if ( rd->rBuf == NULL ) {
    192           snprintf(str,MXSTR,"Could not create local buffer");
    193           factOut(kFatal,774, str ) ;
    194           return -3 ;
    195        }
     200     rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
     201     if ( rd->rBuf == NULL ) {
     202        snprintf(str,MXSTR,"Could not create local buffer %d",sid);
     203        factOut(kFatal,774, str ) ;
     204        rd->sockStat = 77 ;
     205        return -3 ;
    196206     }
     207  }
     208
     209
     210  if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
     211     snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
     212     factOut(kFatal,773, str ) ;
     213     rd->sockStat = 88 ;
     214     return -2 ;
     215  }
     216
     217  snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
     218  factOut(kInfo,773, str ) ;
     219  rd->sockStat = -1 ;    //try to (re)open socket
    197220  return 0 ;
    198221
     
    272295
    273296   
    274    needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2;
     297   needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2 ;
    275298
    276299   headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
     
    393416/* *** main loop reading FAD data and sorting them to complete events */
    394417  int head_len,frst_len,numok,numok2,dest,evID,i,j,k ;
     418  int actBoards = 0;
    395419  int32_t jrd ;
    396420  int32_t myRun ;
     
    405429  int nokCnt[MAX_SOCK],loopCnt=0;
    406430  int sokCnt[MAX_SOCK];
     431  int sockDef[NBOARDS];
    407432
    408433  struct timeval  *tv, atv;
     
    422447   cpu = 7 ;
    423448   CPU_SET( cpu, &mask );
    424 // cpu = 6 ;
    425 // CPU_SET( cpu, &mask );
    426449
    427450/* sched_setaffinity returns 0 in success */
     
    431454   }
    432455
    433 
    434   gi_maxSocks = 0 ;
    435456
    436457  //make sure all sockets are preallocated as 'not exist'
     
    439460     rd[i].sockStat = 99 ;
    440461  }
    441 
    442   int b,p,p0 ;
    443   k = 0 ;
    444   for (b=0; b<NBOARDS; b++ ) {
    445      if ( g_port[b].sockDef >=0 ) {
    446         p0=ntohs(g_port[b].sockAddr.sin_port);
    447         for (p=p0+1; p<p0+8; p++) {
    448            j = GenSock(0,p, &g_port[b].sockAddr, &rd[k]) ;
    449            if ( j != 0 ) {
    450               snprintf(str,MXSTR,"problem with Address board %d port %d",b,p);
    451               factOut(kFatal,101, str ) ;
    452            } else {
    453               rd[k].board = b ;
    454               k++ ;
    455               gi_maxSocks++ ;
    456            }
    457         }
    458      }
    459   }
     462  for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
     463
    460464
    461465  g_actTime = time(NULL) ;
     
    469473  gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
    470474
    471 
    472 
    473475  mBufInit() ;    //initialize buffers
    474476
     
    494496
    495497
    496   while (g_runStat >=0) {           //loop until global variable g_stop is set
     498  while (g_runStat >=0) {           //loop until global variable g_runStat claims stop
    497499
    498500    gi_runStat = g_runStat ;
     501
     502    int b,p,p0,s0,nch;
     503    nch = 0 ;
     504    for (b=0; b<NBOARDS; b++ ) {
     505       k = b*7 ;
     506       if ( g_port[b].sockDef != sockDef[b] ) {   //something has changed ...
     507          nch++ ;
     508          gi_NumConnect[ b ] = 0 ;                  //must close all connections
     509          if (            sockDef[b] == 0) s0= 0 ;  //sockets to be defined and opened   
     510          else if (g_port[b].sockDef == 0) s0=-1 ;  //sockets to be destroyed
     511          else                             s0=+1 ;  //sockets to be closed and reopened
     512
     513          if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
     514          else         p0=0 ;
     515
     516          for (p=p0+1; p<p0+8; p++) {
     517             GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
     518             k++ ;
     519          }
     520          sockDef[b] = g_port[b].sockDef ;
     521       }
     522    }
     523
     524    if (nch > 0 ) {
     525       actBoards = 0 ;
     526       for (b=0; b<NBOARDS; b++ ) {
     527          if ( sockDef[b] > 0 ) actBoards++ ;
     528       }
     529    }
     530
    499531
    500532    g_actTime = time(NULL) ;
     
    505537    numok = 0 ;                       //count number of succesfull actions
    506538
    507     for (i=0; i<gi_maxSocks; i++) {      //check all sockets if something to read
    508 
    509 gettimeofday( tv, NULL);
    510 tsec = atv.tv_sec ;
    511 tusec= atv.tv_usec ;
     539    for (i=0; i<MAX_SOCK; i++) {         //check all sockets if something to read
     540      b = i / 7 ;
     541
     542      gettimeofday( tv, NULL);
     543      tsec = atv.tv_sec ;
     544      tusec= atv.tv_usec ;
    512545
    513546      if (rd[i].sockStat <0 ) {         //try to connect if not yet done
     
    515548            (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
    516549        if (rd[i].sockStat ==0 ) {      //successfull ==>
    517           rd[i].bufTyp = 0 ;            //  expect a header
    518           rd[i].bufLen = frst_len ;     //  max size to read at begining
     550          if (sockDef[b] > 0) {
     551             rd[i].bufTyp = 0 ;            //  expect a header
     552             rd[i].bufLen = frst_len ;     //  max size to read at begining
     553          } else {
     554             rd[i].bufTyp = -1 ;           //  data to be skipped
     555             rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
     556          }
    519557          rd[i].bufPos = 0 ;            //  no byte read so far
    520           gi_NumConnect[ rd[i].board ]++ ;
     558          gi_NumConnect[ b ]++ ;
    521559          numok++ ;                     //make sure next round will execute
    522           snprintf(str,MXSTR,"+++connect %d %d",rd[i].board,gi_NumConnect[ rd[i].board ]);
     560          snprintf(str,MXSTR,"+++connect %d %d",b,gi_NumConnect[ b ]);
    523561          factOut(kInfo,-1, str ) ;
    524562        }
     
    530568        jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
    531569
    532 
    533 if (jrd >0 ) {
    534   debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
    535 }
    536 
    537 
     570        if (jrd >0 ) {
     571           qread+=jrd ;
     572           debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
     573        }
    538574
    539575        if (jrd == 0) {                 //connection has closed ...
    540            rd[i].sockStat = -1 ;        //flag (try to reopen next round)
    541576           snprintf(str,MXSTR,"Socket %d closed by FAD",i);
    542577           factOut(kInfo,441, str ) ;
    543            j = GenSock(1,0,NULL, &rd[i]) ;
     578           j = GenSock(1, i, 0,NULL, &rd[i]) ;
    544579           gi_ErrCnt[i]++ ;
    545            gi_NumConnect[ rd[i].board ]-- ;
     580           gi_NumConnect[ b ]-- ;
     581
    546582        } else if ( jrd<0 ) {           //did not read anything
    547583           if (errno != EAGAIN && errno != EWOULDBLOCK ) {
     
    551587           } else  numok-- ;            //else nothing waiting to be read
    552588
     589        } else if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
     590//         just do nothing
     591
    553592        } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
    554            qread+=jrd ;
    555593           if ( jrd < rd[i].bufLen ) {    //not yet all read
    556594             rd[i].bufPos += jrd ;        //==> prepare for continuation
    557595             rd[i].bufLen -= jrd ;
    558 debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
     596             debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
    559597           } else {                     //full dataset read
    560598             rd[i].bufLen  = rd[i].bufPos + j ;
     
    568606
    569607             }
    570 debugRead(i,jrd,rd[i].evtID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
     608             debugRead(i,jrd,rd[i].evtID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
    571609
    572610             //we have a complete buffer, copy to WORK area
     
    585623             //we have a valid entry in mBuffer[]; fill it
    586624
    587              boardId = rd[i].board ;
     625             boardId = b ;
    588626             int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
    589627             int fadCrate = fadBoard/256 ;
     
    652690             evtCtrl.pcTime[ iDx ] = g_actTime ;
    653691
    654              if (++mBuffer[evID].nBoard == g_actBoards ) {
     692             if (++mBuffer[evID].nBoard >= actBoards ) {
    655693                snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
    656694                factOut(kDebug,-1, str ) ;
     
    668706
    669707        } else {                        //we are reading event header
    670            qread+=jrd ;
    671708           rd[i].bufPos += jrd ;
    672709           rd[i].bufLen -= jrd ;
     
    704741                 rd[i].bufTyp = 1 ;       //ready to read full record
    705742                 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
    706                  if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ;
    707 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
     743                 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ;    //?
     744                 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
     745              } else {
     746                 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    708747              }
    709   else {
    710 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    711   }
    712 
     748           } else {
     749              debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    713750           }
    714 
    715   else {
    716 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    717   }
    718751
    719752        } //end interpreting last read
     
    721754    } //finished trying to read all sockets
    722755
    723 int qwait=0, qdel=0, qskip=0 ;
     756    int qwait=0, qdel=0, qskip=0 ;
    724757    g_actTime = time(NULL) ;
    725758    if ( g_actTime > gi_SecTime ) {
     
    827860       }
    828861       rd[i].sockStat = -1 ;        //flag (try to reopen next round)
    829        gi_NumConnect[ rd[i].board ]-- ;
     862       gi_NumConnect[ i/7 ]-- ;
    830863    }
    831864
     
    13251358  /*-----------------------------------------------------------------*/
    13261359
     1360 
     1361
    13271362/*
    1328 
    1329 
    13301363FileHandle_t  runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
    13311364{ return 1; } ;
     
    13791412  g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
    13801413
    1381   g_maxBoards = 40 ;
    1382 
    1383   g_actBoards = g_maxBoards;
    13841414
    13851415  g_runStat = 40 ;
     
    14301460
    14311461}
    1432    
    1433    */
     1462*/
Note: See TracChangeset for help on using the changeset viewer.