Index: trunk/FACT++/src/EventBuilder.c
===================================================================
--- trunk/FACT++/src/EventBuilder.c	(revision 11747)
+++ trunk/FACT++/src/EventBuilder.c	(revision 11748)
@@ -1,6 +1,4 @@
 
 // // // #define EVTDEBUG
-
-
 
 #include <stdlib.h>
@@ -17,4 +15,6 @@
 #include <sys/types.h> 
 #include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 #include <pthread.h>
 #include <sched.h>
@@ -48,5 +48,7 @@
 extern void factStatNew(EVT_STAT gi) ;
 
-extern int  eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
+extern int  eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) ;
+
+extern int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) ;
 
 extern void debugHead(int i, int j, void *buf); 
@@ -58,4 +60,9 @@
 int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
 
+
+int g_maxProc   ;
+int g_maxSize   ;
+int gi_maxSize  ;
+int gi_maxProc  ;
 
 uint g_actTime   ;
@@ -194,4 +201,6 @@
 
   int j ;
+  int optval = 1 ;                       //activate keepalive
+  socklen_t optlen = sizeof(optval);
 
   if (rd->sockStat ==0 ) {   //close socket if open
@@ -243,4 +252,26 @@
      return -2 ;
   }
+  optval=1;
+  if ( setsockopt(rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
+     snprintf(str,MXSTR,"Could not set keepalive %d | %m",sid);
+     factOut(kInfo,173, str ) ;  //but continue anyhow
+  }
+  optval=10; //start after 10 seconds
+  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
+     snprintf(str,MXSTR,"Could not set keepidle %d | %m",sid);
+     factOut(kInfo,173, str ) ;  //but continue anyhow
+  }
+  optval=10; //do every 10 seconds
+  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
+     snprintf(str,MXSTR,"Could not set keepintvl %d | %m",sid);
+     factOut(kInfo,173, str ) ;  //but continue anyhow
+  }
+  optval=2;  //close after 2 unsuccessful tries
+  if ( setsockopt(rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
+     snprintf(str,MXSTR,"Could not set keepalive probes %d | %m",sid);
+     factOut(kInfo,173, str ) ;  //but continue anyhow
+  }
+
+
 
   snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
@@ -352,7 +383,7 @@
 //       count for inconsistencies
 
-         if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors++   ;
-         if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors+=100 ;
-         if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors+=10000 ;
+         if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors[0]++ ;
+         if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors[1]++ ;
+         if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors[2]++ ;
 
          //everything seems fine so far ==> use this slot ....
@@ -428,6 +459,6 @@
    headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
 
-   if ( gj.usdMem + needmem + headmem > g_maxMem) {
-        gj.maxMem = gj.usdMem + needmem + headmem ;
+   if ( gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
+        gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize ;
         if (gi_memStat>0 ) {
            gi_memStat = -99 ;
@@ -452,7 +483,18 @@
         snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
         factOut(kError,882, str ) ;
+      free(mBuffer[i].FADhead) ;
+      mBuffer[i].FADhead = NULL ;
+      return -22;
+   }
+
+   mBuffer[i].buffer  = malloc( gi_maxSize ) ;
+   if (mBuffer[i].buffer == NULL) {
+        snprintf(str,MXSTR,"malloc buffer failed for event %d",evID) ;
+        factOut(kError,882, str ) ;
+      free(mBuffer[i].FADhead) ;
+      mBuffer[i].FADhead = NULL ;
       free(mBuffer[i].fEvent) ;
       mBuffer[i].fEvent = NULL ;
-      return -22;
+      return -32;
    }
 
@@ -485,7 +527,10 @@
    mBuffer[i].trgTyp  = trgTyp ;
    mBuffer[i].evtLen  = needmem ;
-   mBuffer[i].Errors  = 0 ;
-
-   gj.usdMem += needmem + headmem;
+   mBuffer[i].Errors[0] =
+   mBuffer[i].Errors[1] =
+   mBuffer[i].Errors[2] =
+   mBuffer[i].Errors[3] = 0 ;
+
+   gj.usdMem += needmem + headmem + gi_maxSize ;
    if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ; 
 
@@ -535,9 +580,12 @@
    mBuffer[i].FADhead = NULL ;
 
+   free(mBuffer[i].buffer ) ;
+   mBuffer[i].buffer = NULL ;
+
    headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
    mBuffer[i].evNum   = mBuffer[i].nRoi= -1;
    mBuffer[i].runNum  = 0;
 
-   gj.usdMem = gj.usdMem - freemem - headmem;
+   gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize ;
    gj.bufTot-- ;
 
@@ -634,4 +682,5 @@
      factOut(kWarn,-1, str ) ;
   }
+
 
   head_len = sizeof(PEVNT_HEADER) ;
@@ -988,5 +1037,4 @@
                    mBuffer[evID].evNum,roi[0],roi[8]-roi[0],qncpy,qnrun);
                 factOut(kDebug,-1, str ) ;
-factOut(kInfo,-1, str ) ;
 
                 //complete event read ---> flag for next processing
@@ -1035,6 +1083,6 @@
                  rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
                  rd[i].ftmTyp = ntohl(rd[i].rBuf->S[5]) ; 
+                 rd[i].ftmID  = ntohl(rd[i].rBuf->I[3]) ; //(FTMevt)
                  rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
-                 rd[i].ftmID  = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
                  rd[i].runID  = ntohl(rd[i].rBuf->I[11]) ;
                  rd[i].bufTyp = 1 ;       //ready to read full record
@@ -1107,5 +1155,5 @@
                 gj.evtSkip++; 
              } 
-          } else if (evtCtrl.evtStat[k0] >= 900     //'delete'
+          } else if (evtCtrl.evtStat[k0] >= 9000    //'delete'
                   || evtCtrl.evtStat[k0] == 0 ) {   //'useless'
 
@@ -1287,15 +1335,74 @@
 
 
+void *subProc( void *thrid ) {
+  int threadID,status,numWait,numProc,kd,k1,k0,k,jret;
+  struct timespec xwait ;
+
+  threadID= (int) thrid; 
+
+  snprintf(str,MXSTR,"Starting sub-process-thread %d",threadID);
+  factOut(kInfo,-1, str ) ;
+
+  while (g_runStat > -2) {   //in case of 'exit' we still must process pending events
+     numWait = numProc = 0 ;
+     int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
+     if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
+
+     int k1=evtCtrl.frstPtr;
+     for ( k=k1; k<(k1+kd); k++ ) {
+        int k0 = k % (MAX_EVT*MAX_RUN) ;
+
+        if (evtCtrl.evtStat[k0] ==1000+threadID) {
+           if ( gi_resetR > 1 ) {    //we are asked to flush buffers asap
+              jret= 9100 ;     //flag to be deleted
+           } else {
+              int id = evtCtrl.evtBuf[k0] ;
+              jret=subProcEvt(threadID, mBuffer[id].FADhead, mBuffer[id].fEvent, mBuffer[id].buffer) ;
+              if (jret <= threadID) {
+                 snprintf(str,MXSTR,"process %d wants to send to process %d",threadID,jret) ;
+                 factOut(kError,-1, str ) ;
+                 jret = 5300;
+              } else if ( jret <=0 )          jret = 9200+threadID ;  //flag as 'to be deleted'
+                else if ( jret >=gi_maxProc ) jret = 5200+threadID ;  //flag as 'to be written'
+                else                          jret = 1000+jret ;      //flag for next proces
+           }
+           evtCtrl.evtStat[k0] = jret ;
+           numProc++ ;
+        } else if (evtCtrl.evtStat[k0] <1000+threadID) numWait++ ;
+     }
+
+     if ( gj.readStat < -10 && numWait == 0) {  //nothing left to do
+        snprintf(str,MXSTR,"Exit subProcessing Process %d",threadID);
+        factOut(kInfo,-1, str ) ;
+        return 0 ;
+     }
+     if (numProc == 0) {
+        //seems we have nothing to do, so sleep a little
+        xwait.tv_sec = 0;
+        xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
+        nanosleep( &xwait , NULL ) ;
+     }
+  }
+
+  snprintf(str,MXSTR,"Ending sub-process-thread %d",threadID);
+  factOut(kInfo,-1, str ) ;
+  return ; 
+} /*-----------------------------------------------------------------*/
+
+
 void *procEvt( void *ptr ) {
 /* *** main loop processing file, including SW-trigger */
   int numProc, numWait ;
-  int k ;
+  int k, status ;
   struct timespec xwait ;
   char str[MXSTR] ;
 
+
+
+
   cpu_set_t mask;
-  int cpu = 5 ;   //process thread  (will be several in final version)
-
-  snprintf(str,MXSTR,"Starting process-thread");
+  int cpu = 1 ;   //process thread  (will be several in final version)
+
+  snprintf(str,MXSTR,"Starting process-thread with %d subprocess",gi_maxProc);
   factOut(kInfo,-1, str ) ;
 
@@ -1303,5 +1410,12 @@
    CPU_ZERO( &mask );
 /* CPU_SET sets only the bit corresponding to cpu. */
-   CPU_SET( cpu, &mask );
+// CPU_SET(  0 , &mask );  leave for system
+// CPU_SET(  1 , &mask );  used by write process
+   CPU_SET(  2 , &mask );
+   CPU_SET(  3 , &mask );
+   CPU_SET(  4 , &mask );
+   CPU_SET(  5 , &mask );
+   CPU_SET(  6 , &mask );
+// CPU_SET(  7 , &mask );  used by read process
 /* sched_setaffinity returns 0 in success */
    if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
@@ -1311,4 +1425,11 @@
 
 
+  pthread_t thread[100] ;
+  int th_ret[100];
+
+  for (k=0; k < gi_maxProc; k++) {
+     th_ret[k] = pthread_create( &thread[k], NULL, subProc,  (void *)k );
+  }
+
   while (g_runStat > -2) {   //in case of 'exit' we still must process pending events
 
@@ -1321,8 +1442,8 @@
         int k0 = k % (MAX_EVT*MAX_RUN) ;
 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
-        if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
+        if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <1000) {
 
          if ( gi_resetR > 1 ) {    //we are asked to flush buffers asap
-           evtCtrl.evtStat[k0] = 991 ; 
+           evtCtrl.evtStat[k0] = 9991 ; 
          } else {
 
@@ -1333,5 +1454,4 @@
            int      roi  = mBuffer[id].nRoi ;
            int      roiTM= mBuffer[id].nRoiTM ;
-           int      Errors=mBuffer[id].Errors ;
 //         uint32_t irun = mBuffer[id].runNum ;
 //snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
@@ -1361,5 +1481,8 @@
            mBuffer[id].fEvent->TriggerNum = itevt ;
            mBuffer[id].fEvent->TriggerType = itrg ;
-           mBuffer[id].fEvent->Errors = Errors ;
+           mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0] ;
+           mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1] ;
+           mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2] ;
+           mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3] ;
            mBuffer[id].fEvent->SoftTrig = 0 ;
 
@@ -1375,5 +1498,6 @@
            }
 
-           int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
+           int i=eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead,
+               mBuffer[id].fEvent) ;
 //         gj.procEvt++ ;
            gi.procTot++ ;
@@ -1381,8 +1505,8 @@
            
            if (i<0) {
-              evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
+              evtCtrl.evtStat[k0] = 9999 ; //flag event to be skipped
               gi.procErr++ ;
            } else {
-              evtCtrl.evtStat[k0] = 520 ;
+              evtCtrl.evtStat[k0] = 1000 ;
            }
          }
@@ -1419,9 +1543,13 @@
   if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
 
+  for (k=0; k<gi_maxProc; k++) {
+     pthread_join( thread[k], (void **)&status) ;
+  }
+
   int k1=evtCtrl.frstPtr;
   for ( k=k1; k<(k1+kd); k++ ) {
      int k0 = k % (MAX_EVT*MAX_RUN) ;
-     if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
-        evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
+     if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <1000) {
+        evtCtrl.evtStat[k0] = 9800 ; //flag event as 'processed'
      }
   }
@@ -1479,5 +1607,5 @@
 
   cpu_set_t mask;
-  int cpu = 3 ;   //write thread
+  int cpu = 1 ;   //write thread
 
   snprintf(str,MXSTR,"Starting write-thread");
@@ -1505,8 +1633,8 @@
         int k0 = k % (MAX_EVT*MAX_RUN) ;
 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
-        if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
+        if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) {
 
          if ( gi_resetR > 1 ) {        //we must drain the buffer asap
-           evtCtrl.evtStat[k0] = 904 ;
+           evtCtrl.evtStat[k0] = 9904 ;
          } else {
 
@@ -1572,5 +1700,5 @@
                  factOut(kDebug,123,str) ;
               }
-              evtCtrl.evtStat[k0] = 903 ;
+              evtCtrl.evtStat[k0] = 9903 ;
               gi.wrtErr++ ;
            } else {
@@ -1579,5 +1707,5 @@
                  runCtrl[j].lastTime = g_actTime; 
                  runCtrl[j].actEvt++ ;
-                 evtCtrl.evtStat[k0] = 901 ;
+                 evtCtrl.evtStat[k0] = 9901 ;
                  snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
                  factOut(kDebug,504, str ) ;
@@ -1586,5 +1714,5 @@
                  snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
                  factOut(kError,503, str ) ;
-                 evtCtrl.evtStat[k0] = 902 ;
+                 evtCtrl.evtStat[k0] = 9902 ;
                  gi.wrtErr++ ;
               }
@@ -1618,5 +1746,5 @@
          }
         } else if (evtCtrl.evtStat[k0] > 0
-                && evtCtrl.evtStat[k0] < 900 ) numWait++ ;
+                && evtCtrl.evtStat[k0] < 9000 ) numWait++ ;
      }
 
@@ -1724,4 +1852,15 @@
   }
 
+//prepare for subProcesses
+  gi_maxSize = g_maxSize ;
+  if (gi_maxSize <=0 ) gi_maxSize = 1 ;
+
+  gi_maxProc = g_maxProc ;
+  if (gi_maxProc <=0  || gi_maxProc>90) {
+     snprintf(str,MXSTR,"illegal number of processes %d",gi_maxProc ) ;
+     factOut(kFatal,301, str ) ;
+     gi_maxProc=1;
+  }
+
 //partially initialize event control logics
   evtCtrl.frstPtr = 0 ;
@@ -1791,6 +1930,13 @@
   /*-----------------------------------------------------------------*/
 
-
 #ifdef BILAND
+
+int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer) 
+{
+  printf("called subproc %d\n",threadID) ;
+  return threadID+1 ;
+}
+
+
 
 
@@ -1818,5 +1964,6 @@
 
 
-int  eventCheck( PEVNT_HEADER *fadhd, EVENT *event) 
+
+int  eventCheck( uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event) 
 {
    int i=0;
@@ -1888,4 +2035,6 @@
   g_maxMem = g_maxMem *  200;      //100MBytes
 
+  g_maxProc = 20  ;
+  g_maxSize = 30000  ;
 
   g_runStat = 40 ;
