Changeset 12091 for trunk/FACT++/src


Ignore:
Timestamp:
09/13/11 14:45:19 (13 years ago)
Author:
tbretz
Message:
Updated to the latest event builder version which includes runEnd and runStart to control the memory of the processing threads.
Location:
trunk/FACT++/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r11895 r12091  
    22// // // #define EVTDEBUG
    33
    4 #define NUMSOCK   1          //set to 7 for old configuration
    5 #define MAXREAD  65536       //64kB wiznet buffer
     4#define NUMSOCK   1             //set to 7 for old configuration
     5#define MAXREAD  65536          //64kB wiznet buffer
     6#define MIN_LEN  32             // min #bytes needed to interpret FADheader
     7#define MAX_LEN 256*1024        // size of read-buffer per socket
    68
    79#include <stdlib.h>
     
    2123#include <netinet/tcp.h>
    2224#include <pthread.h>
    23 #include <sched.h>                             
     25#include <sched.h>
    2426
    2527#include "EventBuilder.h"
     
    3537};
    3638
    37 #define MIN_LEN  32             // min #bytes needed to interpret FADheader
    38 #define MAX_LEN 256*1024        // size of read-buffer per socket
     39
     40void *gi_procPtr;
     41
    3942
    4043//#define nanosleep(x,y)
     
    4548//extern int runFinish (uint32_t runnr);
    4649
     50
     51
     52extern void * runStart (uint32_t irun, RUN_HEAD * runhd, size_t len);
     53
     54extern int  runEnd (uint32_t, void * runPtr );
     55
    4756extern void factOut (int severity, int err, char *message);
    4857
     
    5463extern void factStatNew (EVT_STAT gi);
    5564
    56 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
     65extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event,
     66                       int mboard);
    5767
    5868extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
    59                        int8_t * buffer);
     69                       int16_t mboard, void * buffer);
    6070
    6171extern void debugHead (int i, int j, void *buf);
     
    7181int g_maxProc;
    7282int g_maxSize;
    73 int gi_maxSize;
     83int gi_maxSize =0;
    7484int gi_maxProc;
     85uint32_t gi_actRun;
     86void *gi_runPtr;
    7587
    7688uint g_actTime;
     
    446458      if (runCtrl[k].runId == runID) {
    447459//         if (runCtrl[k].procId > 0) {   //run is closed -> reject
    448 //            snprintf (str, MXSTR, "skip event since run %d finished", runID);
     460//            snprintf (str, MXSTR, "skip event since run %d already closed", runID);
    449461//            factOut (kInfo, 931, str);
    450462//            return -21;
     
    494506      runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
    495507      runCtrl[evFree].closeTime = tsec + 3600 * 24;     //max time allowed
    496 //    runCtrl[evFree].lastTime = 0;
    497508
    498509      runTail[evFree].nEventsOk =
     
    539550   }
    540551
    541    mBuffer[i].buffer = malloc (gi_maxSize);
    542    if (mBuffer[i].buffer == NULL) {
    543       snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
    544       factOut (kError, 882, str);
    545       free (mBuffer[i].FADhead);
    546       mBuffer[i].FADhead = NULL;
    547       free (mBuffer[i].fEvent);
    548       mBuffer[i].fEvent = NULL;
    549       return -32;
    550    }
     552// if (gi_maxSize > 0) {
     553//    mBuffer[i].buffer = malloc (gi_maxSize);
     554//    if (mBuffer[i].buffer == NULL) {
     555//       snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
     556//       factOut (kError, 882, str);
     557//       free (mBuffer[i].FADhead);
     558//       mBuffer[i].FADhead = NULL;
     559//       free (mBuffer[i].fEvent);
     560//       mBuffer[i].fEvent = NULL;
     561//       return -32;
     562//    }
     563// } else {
     564//    mBuffer[i].buffer = NULL;
     565// }
     566
    551567   //flag all boards as unused
    552568   mBuffer[i].nBoard = 0;
     
    631647   mBuffer[i].FADhead = NULL;
    632648
    633    free (mBuffer[i].buffer);
    634    mBuffer[i].buffer = NULL;
     649// if (gi_maxSize > 0) {
     650//    free (mBuffer[i].buffer);
     651//    mBuffer[i].buffer = NULL;
     652// }
    635653
    636654   headmem = NBOARDS * sizeof (PEVNT_HEADER);
     
    711729   int goodhed = 0;
    712730
     731   char str[MXSTR];
    713732   int sockDef[NBOARDS];        //internal state of sockets
    714733   int jrdx;
     
    730749
    731750/* CPU_ZERO initializes all the bits in the mask to zero. */
    732    CPU_ZERO (&mask);
     751//   CPU_ZERO (&mask);
    733752/* CPU_SET sets only the bit corresponding to cpu. */
    734753   cpu = 7;
    735    CPU_SET (cpu, &mask);
     754//   CPU_SET (cpu, &mask);
    736755
    737756/* sched_setaffinity returns 0 in success */
    738    if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
    739       snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
    740       factOut (kWarn, -1, str);
    741    }
     757//   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
     758//      snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
     759//      factOut (kWarn, -1, str);
     760//   }
    742761
    743762
     
    758777   for (i = 0; i < NBOARDS; i++)
    759778      sockDef[i] = 0;
     779
     780   gi_actRun = 0;
     781
    760782
    761783 START:
     
    772794   numok = numok2 = 0;
    773795
    774    int cntsock = 8 - NUMSOCK ;
     796   int cntsock = 8 - NUMSOCK;
    775797
    776798   if (gi_resetS > 0) {
     
    802824         gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
    803825
    804       int b,p;
     826      int b, p;
    805827      for (b = 0; b < NBOARDS; b++)
    806828         gj.badRoi[b] = 0;
     
    846868
    847869            for (p = p0 + 1; p < p0 + 8; p++) {
    848                GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
     870               if (p < NUMSOCK)
     871                  GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]);      //generate address and socket
    849872               k++;
    850873            }
     
    867890
    868891      for (i = 0; i < MAX_SOCK; i++) {  //check all sockets if something to read
    869          b = i / 7 ;
    870          p = i % 7 ;
    871 
    872 //if ( b==32 && p>0) {  ; }
    873 if ( p >= NUMSOCK) { ; }
    874 else {
    875          if (sockDef[b] > 0)
    876             s0 = +1;
    877          else
    878             s0 = -1;
    879 
    880          if (rd[i].sockStat < 0) {      //try to connect if not yet done
    881             rd[i].sockStat = connect (rd[i].socket,
    882                                       (struct sockaddr *) &rd[i].SockAddr,
    883                                       sizeof (rd[i].SockAddr));
    884             if (rd[i].sockStat == 0) {  //successfull ==>
    885                if (sockDef[b] > 0) {
    886                   rd[i].bufTyp = 0;     //  expect a header
    887                   rd[i].bufLen = frst_len;      //  max size to read at begining
    888                } else {
    889                   rd[i].bufTyp = -1;    //  full data to be skipped
    890                   rd[i].bufLen = MAX_LEN;       //huge for skipping
     892         b = i / 7;
     893         p = i % 7;
     894
     895         if (p >= NUMSOCK) {;
     896         } else {
     897            if (sockDef[b] > 0)
     898               s0 = +1;
     899            else
     900               s0 = -1;
     901
     902            if (rd[i].sockStat < 0) {   //try to connect if not yet done
     903               rd[i].sockStat = connect (rd[i].socket,
     904                                         (struct sockaddr *) &rd[i].SockAddr,
     905                                         sizeof (rd[i].SockAddr));
     906               if (rd[i].sockStat == 0) {       //successfull ==>
     907                  if (sockDef[b] > 0) {
     908                     rd[i].bufTyp = 0;  //  expect a header
     909                     rd[i].bufLen = frst_len;   //  max size to read at begining
     910                  } else {
     911                     rd[i].bufTyp = -1; //  full data to be skipped
     912                     rd[i].bufLen = MAX_LEN;    //huge for skipping
     913                  }
     914                  rd[i].bufPos = 0;     //  no byte read so far
     915                  rd[i].skip = 0;       //  start empty
     916                  gi_NumConnect[b] += cntsock;
     917
     918                  gi.numConn[b]++;
     919                  gj.numConn[b]++;
     920                  snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
     921                  factOut (kInfo, -1, str);
    891922               }
    892                rd[i].bufPos = 0;        //  no byte read so far
    893                rd[i].skip = 0;  //  start empty
    894 //             gi_NumConnect[b]++;
    895                gi_NumConnect[b] += cntsock ;
    896 
    897                gi.numConn[b]++;
    898                gj.numConn[b]++;
    899                snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
    900                factOut (kInfo, -1, str);
    901923            }
    902          }
    903 
    904          if (rd[i].sockStat == 0) {     //we have a connection ==> try to read
    905             if (rd[i].bufLen > 0) {     //might be nothing to read [buffer full]
    906                numok++;
    907                size_t maxread = rd[i].bufLen ;
    908                if (maxread > MAXREAD ) maxread=MAXREAD ;
    909 
    910                jrd =
    911                   recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
    912                         maxread, MSG_DONTWAIT);
    913 //                      rd[i].bufLen, MSG_DONTWAIT);
    914 
    915                if (jrd > 0) {
    916                   debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
     924
     925            if (rd[i].sockStat == 0) {  //we have a connection ==> try to read
     926               if (rd[i].bufLen > 0) {  //might be nothing to read [buffer full]
     927                  numok++;
     928                  size_t maxread = rd[i].bufLen;
     929                  if (maxread > MAXREAD)
     930                     maxread = MAXREAD;
     931
     932                  jrd =
     933                     recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
     934                           maxread, MSG_DONTWAIT);
     935
     936                  if (jrd > 0) {
     937                     debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
    917938#ifdef EVTDEBUG
    918                   memcpy (&rd[i].xBuf->B[rd[i].bufPos],
    919                           &rd[i].rBuf->B[rd[i].bufPos], jrd);
    920                   snprintf (str, MXSTR,
    921                             "read sock %3d bytes %5d len %5d first %d %d", i,
    922                             jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
    923                             rd[i].rBuf->B[rd[i].bufPos + 1]);
    924                   factOut (kDebug, 301, str);
    925 #endif
    926                }
    927 
    928                if (jrd == 0) {  //connection has closed ...
    929                   snprintf (str, MXSTR, "Socket %d closed by FAD", i);
    930                   factOut (kInfo, 441, str);
    931                   GenSock (s0, i, 0, NULL, &rd[i]);
    932                   gi.gotErr[b]++;
    933 //                gi_NumConnect[b]--;
    934                   gi_NumConnect[b]-= cntsock ;
    935                   gi.numConn[b]--;
    936                   gj.numConn[b]--;
    937 
    938                } else if (jrd < 0) {    //did not read anything
    939                   if (errno != EAGAIN && errno != EWOULDBLOCK) {
    940                      snprintf (str, MXSTR, "Error Reading from %d | %m", i);
    941                      factOut (kError, 442, str);
    942                      gi.gotErr[b]++;
    943                   } else
    944                      numok--;   //else nothing waiting to be read
    945                   jrd = 0;
    946                }
    947             } else {
    948                jrd = 0;         //did read nothing as requested
    949                snprintf (str, MXSTR, "do not read from socket %d  %d", i,
    950                          rd[i].bufLen);
    951                factOut (kDebug, 301, str);
    952             }
    953 
    954             gi.gotByte[b] += jrd;
    955             gj.rateBytes[b] += jrd;
    956 
    957             if (jrd > 0) {
    958                numokx++;
    959                jrdx += jrd;
    960             }
    961 
    962 
    963             if (rd[i].bufTyp < 0) {     // we are skipping this board ...
    964 //         just do nothing
    965 #ifdef EVTDEBUG
    966                snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
    967                          i);
    968                factOut (kInfo, 301, str);
    969 #endif
    970 
    971             } else if (rd[i].bufTyp > 0) {      // we are reading data ...
    972                if (jrd < rd[i].bufLen) {        //not yet all read
    973                   rd[i].bufPos += jrd;  //==> prepare for continuation
    974                   rd[i].bufLen -= jrd;
    975                   debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec);    // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
    976                } else {         //full dataset read
    977                   rd[i].bufLen = 0;
    978                   rd[i].bufPos = rd[i].fadLen;
    979                   if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
    980                       || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
    981                      gi.evtErr++;
     939                     memcpy (&rd[i].xBuf->B[rd[i].bufPos],
     940                             &rd[i].rBuf->B[rd[i].bufPos], jrd);
    982941                     snprintf (str, MXSTR,
    983                                "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
    984                                i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
    985                                rd[i].rBuf->B[1],
    986                                rd[i].rBuf->B[rd[i].bufPos - 2],
    987                                rd[i].rBuf->B[rd[i].bufPos - 1]);
    988                      factOut (kError, 301, str);
    989                      goto EndBuf;
    990 
    991 #ifdef EVTDEBUG
    992                   } else {
    993                      snprintf (str, MXSTR,
    994                                "good  end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
    995                                i, rd[i].fadLen, rd[i].rBuf->B[0],
    996                                rd[i].rBuf->B[1], start.B[1], start.B[0],
    997                                rd[i].rBuf->B[rd[i].bufPos - 2],
    998                                rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
    999                                stop.B[0]);
     942                               "read sock %3d bytes %5d len %5d first %d %d",
     943                               i, jrd, rd[i].bufLen,
     944                               rd[i].rBuf->B[rd[i].bufPos],
     945                               rd[i].rBuf->B[rd[i].bufPos + 1]);
    1000946                     factOut (kDebug, 301, str);
    1001947#endif
    1002948                  }
    1003949
    1004                   if (jrd > 0)
    1005                      debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
    1006 
    1007                   //we have a complete buffer, copy to WORK area
    1008                   int jr;
    1009                   roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
    1010                   for (jr = 0; jr < 9; jr++) {
    1011                      roi[jr] =
    1012                         ntohs (rd[i].
    1013                                rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]);
     950                  if (jrd == 0) {       //connection has closed ...
     951                     snprintf (str, MXSTR, "Socket %d closed by FAD", i);
     952                     factOut (kInfo, 441, str);
     953                     GenSock (s0, i, 0, NULL, &rd[i]);
     954                     gi.gotErr[b]++;
     955                     gi_NumConnect[b] -= cntsock;
     956                     gi.numConn[b]--;
     957                     gj.numConn[b]--;
     958
     959                  } else if (jrd < 0) { //did not read anything
     960                     if (errno != EAGAIN && errno != EWOULDBLOCK) {
     961                        snprintf (str, MXSTR, "Error Reading from %d | %m",
     962                                  i);
     963                        factOut (kError, 442, str);
     964                        gi.gotErr[b]++;
     965                     } else
     966                        numok--;        //else nothing waiting to be read
     967                     jrd = 0;
    1014968                  }
    1015                   //get index into mBuffer for this event (create if needed)
    1016 
    1017                   int actid;
    1018                   if (g_useFTM > 0)
    1019                      actid = rd[i].evtID;
    1020                   else
    1021                      actid = rd[i].ftmID;
    1022 
    1023                   evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
    1024                                   rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
    1025                                   rd[i].evtID);
    1026 
    1027                   if (evID < -1000) {
    1028                      goto EndBuf;       //not usable board/event/run --> skip it
    1029                   }
    1030                   if (evID < 0) {       //no space left, retry later
     969               } else {
     970                  jrd = 0;      //did read nothing as requested
     971                  snprintf (str, MXSTR, "do not read from socket %d  %d", i,
     972                            rd[i].bufLen);
     973                  factOut (kDebug, 301, str);
     974               }
     975
     976               gi.gotByte[b] += jrd;
     977               gj.rateBytes[b] += jrd;
     978
     979               if (jrd > 0) {
     980                  numokx++;
     981                  jrdx += jrd;
     982               }
     983
     984
     985               if (rd[i].bufTyp < 0) {  // we are skipping this board ...
     986                  //         just do nothing
    1031987#ifdef EVTDEBUG
    1032                      if (rd[i].bufLen != 0) {
    1033                         snprintf (str, MXSTR, "something screwed up");
    1034                         factOut (kFatal, 1, str);
     988                  snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
     989                            i);
     990                  factOut (kInfo, 301, str);
     991#endif
     992
     993               } else if (rd[i].bufTyp > 0) {   // we are reading data ...
     994                  if (jrd < rd[i].bufLen) {     //not yet all read
     995                     rd[i].bufPos += jrd;       //==> prepare for continuation
     996                     rd[i].bufLen -= jrd;
     997                     debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
     998                  } else {      //full dataset read
     999                     rd[i].bufLen = 0;
     1000                     rd[i].bufPos = rd[i].fadLen;
     1001                     if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
     1002                         || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
     1003                        gi.evtErr++;
     1004                        snprintf (str, MXSTR,
     1005                                  "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
     1006                                  i, rd[i].fadLen, rd[i].evtID,
     1007                                  rd[i].rBuf->B[0], rd[i].rBuf->B[1],
     1008                                  rd[i].rBuf->B[rd[i].bufPos - 2],
     1009                                  rd[i].rBuf->B[rd[i].bufPos - 1]);
     1010                        factOut (kError, 301, str);
     1011                        goto EndBuf;
     1012
     1013#ifdef EVTDEBUG
     1014                     } else {
     1015                        snprintf (str, MXSTR,
     1016                                  "good  end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
     1017                                  i, rd[i].fadLen, rd[i].rBuf->B[0],
     1018                                  rd[i].rBuf->B[1], start.B[1], start.B[0],
     1019                                  rd[i].rBuf->B[rd[i].bufPos - 2],
     1020                                  rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
     1021                                  stop.B[0]);
     1022                        factOut (kDebug, 301, str);
     1023#endif
    10351024                     }
    1036 #endif
    1037                      xwait.tv_sec = 0;
    1038                      xwait.tv_nsec = 10000000;  // sleep for ~10 msec
    1039                      nanosleep (&xwait, NULL);
    1040                      goto EndBuf1;      //hope there is free space next round
    1041                   }
    1042                   //we have a valid entry in mBuffer[]; fill it
    1043 
     1025
     1026                     if (jrd > 0)
     1027                        debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec);      // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
     1028
     1029                     //we have a complete buffer, copy to WORK area
     1030                     int jr;
     1031                     roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
     1032                     for (jr = 0; jr < 9; jr++) {
     1033                        roi[jr] =
     1034                           ntohs (rd[i].rBuf->S[head_len / 2 + 2 +
     1035                                          jr * (roi[0] + 4)]);
     1036                     }
     1037                     //get index into mBuffer for this event (create if needed)
     1038
     1039                     int actid;
     1040                     if (g_useFTM > 0)
     1041                        actid = rd[i].evtID;
     1042                     else
     1043                        actid = rd[i].ftmID;
     1044
     1045                     evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
     1046                                     rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
     1047                                     rd[i].evtID);
     1048
     1049                     if (evID < -1000) {
     1050                        goto EndBuf;    //not usable board/event/run --> skip it
     1051                     }
     1052                     if (evID < 0) {    //no space left, retry later
    10441053#ifdef EVTDEBUG
    1045                   int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
    1046                                      rd[i].fadLen);
    1047                   if (xchk != 0) {
    1048                      snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
    1049                                xchk, rd[i].fadLen, i);
    1050                      factOut (kFatal, 1, str);
    1051 
    1052                      uint iq;
    1053                      for (iq = 0; iq < rd[i].fadLen; iq++) {
    1054                         if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
    1055                            snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
    1056                                      rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
     1054                        if (rd[i].bufLen != 0) {
     1055                           snprintf (str, MXSTR, "something screwed up");
    10571056                           factOut (kFatal, 1, str);
    10581057                        }
     1058#endif
     1059                        xwait.tv_sec = 0;
     1060                        xwait.tv_nsec = 10000000;       // sleep for ~10 msec
     1061                        nanosleep (&xwait, NULL);
     1062                        goto EndBuf1;   //hope there is free space next round
    10591063                     }
    1060                   }
    1061 #endif
    1062                   int qncpy = 0;
    1063                   boardId = b;
    1064                   int fadBoard = ntohs (rd[i].rBuf->S[12]);
    1065                   int fadCrate = fadBoard / 256;
    1066                   if (boardId != (fadCrate * 10 + fadBoard % 256)) {
    1067                      snprintf (str, MXSTR, "wrong Board ID %d %d %d",
    1068                                fadCrate, fadBoard % 256, boardId);
    1069                      factOut (kWarn, 301, str);
    1070                   }
    1071                   if (mBuffer[evID].board[boardId] != -1) {
    1072                      snprintf (str, MXSTR,
    1073                                "double board: ev %5d, b %3d, %3d;  len %5d %3d %3d - %3d %3d ",
    1074                                evID, boardId, i, rd[i].fadLen,
    1075                                rd[i].rBuf->B[0], rd[i].rBuf->B[1],
    1076                                rd[i].rBuf->B[rd[i].bufPos - 2],
    1077                                rd[i].rBuf->B[rd[i].bufPos - 1]);
    1078                      factOut (kWarn, 501, str);
    1079                      goto EndBuf;       //--> skip Board
    1080                   }
    1081 
    1082                   int iDx = evtIdx[evID];       //index into evtCtrl
    1083 
    1084                   memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
    1085                           &rd[i].rBuf->S[0], head_len);
    1086                   qncpy += head_len;
    1087 
    1088                   src = head_len / 2;
    1089                   for (px = 0; px < 9; px++) {  //different sort in FAD board.....
    1090                      for (drs = 0; drs < 4; drs++) {
    1091                         pixH = ntohs (rd[i].rBuf->S[src++]);    // ID
    1092                         pixC = ntohs (rd[i].rBuf->S[src++]);    // start-cell
    1093                         pixR = ntohs (rd[i].rBuf->S[src++]);    // roi
    1094 //here we should check if pixH is correct ....
    1095 
    1096                         pixS = boardId * 36 + drs * 9 + px;
    1097                         src++;
    1098 
    1099 
    1100                         mBuffer[evID].fEvent->StartPix[pixS] = pixC;
    1101                         dest = pixS * roi[0];
    1102                         memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
    1103                                 &rd[i].rBuf->S[src], roi[0] * 2);
    1104                         qncpy += roi[0] * 2;
    1105                         src += pixR;
    1106 
    1107                         if (px == 8) {
    1108                            tmS = boardId * 4 + drs;
    1109                            if (pixR > roi[0]) { //and we have additional TM info
    1110                               dest = tmS * roi[0] + NPIX * roi[0];
    1111                               int srcT = src - roi[0];
    1112                               mBuffer[evID].fEvent->StartTM[tmS] =
    1113                                  (pixC + pixR - roi[0]) % 1024;
    1114                               memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
    1115                                       &rd[i].rBuf->S[srcT], roi[0] * 2);
    1116                               qncpy += roi[0] * 2;
    1117                            } else {
    1118                               mBuffer[evID].fEvent->StartTM[tmS] = -1;
     1064                     //we have a valid entry in mBuffer[]; fill it
     1065
     1066#ifdef EVTDEBUG
     1067                     int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
     1068                                        rd[i].fadLen);
     1069                     if (xchk != 0) {
     1070                        snprintf (str, MXSTR,
     1071                                  "ERROR OVERWRITE %d %d on port %d", xchk,
     1072                                  rd[i].fadLen, i);
     1073                        factOut (kFatal, 1, str);
     1074
     1075                        uint iq;
     1076                        for (iq = 0; iq < rd[i].fadLen; iq++) {
     1077                           if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
     1078                              snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i,
     1079                                        iq, rd[i].rBuf->B[iq],
     1080                                        rd[i].xBuf->B[iq]);
     1081                              factOut (kFatal, 1, str);
    11191082                           }
    11201083                        }
    11211084                     }
    1122                   }             // now we have stored a new board contents into Event structure
    1123 
    1124                   mBuffer[evID].fEvent->NumBoards++;
    1125                   mBuffer[evID].board[boardId] = boardId;
    1126                   evtCtrl.evtStat[iDx]++;
    1127                   evtCtrl.pcTime[iDx] = g_actTime;
    1128 
    1129                   if (++mBuffer[evID].nBoard >= actBoards) {
    1130                      int qnrun = 0;
    1131                      if (mBuffer[evID].runNum != actrun) {      // have we already reported first event of this run ???
    1132                         actrun = mBuffer[evID].runNum;
    1133                         int ir;
    1134                         for (ir = 0; ir < MAX_RUN; ir++) {
    1135                            qnrun++;
    1136                            if (runCtrl[ir].runId == actrun) {
    1137                               if (++runCtrl[ir].lastEvt == 0) {
    1138                                  gotNewRun (actrun, mBuffer[evID].FADhead);
    1139                                  snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
    1140                                            mBuffer[evID].runNum,
    1141                                            mBuffer[evID].evNum);
    1142                                  factOut (kInfo, 1, str);
    1143                                  break;
     1085#endif
     1086                     int qncpy = 0;
     1087                     boardId = b;
     1088                     int fadBoard = ntohs (rd[i].rBuf->S[12]);
     1089                     int fadCrate = fadBoard / 256;
     1090                     if (boardId != (fadCrate * 10 + fadBoard % 256)) {
     1091                        snprintf (str, MXSTR, "wrong Board ID %d %d %d",
     1092                                  fadCrate, fadBoard % 256, boardId);
     1093                        factOut (kWarn, 301, str);
     1094                     }
     1095                     if (mBuffer[evID].board[boardId] != -1) {
     1096                        snprintf (str, MXSTR,
     1097                                  "double board: ev %5d, b %3d, %3d;  len %5d %3d %3d - %3d %3d ",
     1098                                  evID, boardId, i, rd[i].fadLen,
     1099                                  rd[i].rBuf->B[0], rd[i].rBuf->B[1],
     1100                                  rd[i].rBuf->B[rd[i].bufPos - 2],
     1101                                  rd[i].rBuf->B[rd[i].bufPos - 1]);
     1102                        factOut (kWarn, 501, str);
     1103                        goto EndBuf;    //--> skip Board
     1104                     }
     1105
     1106                     int iDx = evtIdx[evID];    //index into evtCtrl
     1107
     1108                     memcpy (&mBuffer[evID].FADhead[boardId].
     1109                             start_package_flag, &rd[i].rBuf->S[0], head_len);
     1110                     qncpy += head_len;
     1111
     1112                     src = head_len / 2;
     1113                     for (px = 0; px < 9; px++) {       //different sort in FAD board.....
     1114                        for (drs = 0; drs < 4; drs++) {
     1115                           pixH = ntohs (rd[i].rBuf->S[src++]); // ID
     1116                           pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
     1117                           pixR = ntohs (rd[i].rBuf->S[src++]); // roi
     1118//here we should check if pixH is correct ....
     1119
     1120                           pixS = boardId * 36 + drs * 9 + px;
     1121                           src++;
     1122
     1123
     1124                           mBuffer[evID].fEvent->StartPix[pixS] = pixC;
     1125                           dest = pixS * roi[0];
     1126                           memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
     1127                                   &rd[i].rBuf->S[src], roi[0] * 2);
     1128                           qncpy += roi[0] * 2;
     1129                           src += pixR;
     1130
     1131                           if (px == 8) {
     1132                              tmS = boardId * 4 + drs;
     1133                              if (pixR > roi[0]) {      //and we have additional TM info
     1134                                 dest = tmS * roi[0] + NPIX * roi[0];
     1135                                 int srcT = src - roi[0];
     1136                                 mBuffer[evID].fEvent->StartTM[tmS] =
     1137                                    (pixC + pixR - roi[0]) % 1024;
     1138                                 memcpy (&mBuffer[evID].fEvent->
     1139                                         Adc_Data[dest], &rd[i].rBuf->S[srcT],
     1140                                         roi[0] * 2);
     1141                                 qncpy += roi[0] * 2;
     1142                              } else {
     1143                                 mBuffer[evID].fEvent->StartTM[tmS] = -1;
    11441144                              }
    11451145                           }
    11461146                        }
     1147                     }          // now we have stored a new board contents into Event structure
     1148
     1149                     mBuffer[evID].fEvent->NumBoards++;
     1150                     mBuffer[evID].board[boardId] = boardId;
     1151                     evtCtrl.evtStat[iDx]++;
     1152                     evtCtrl.pcTime[iDx] = g_actTime;
     1153
     1154                     if (++mBuffer[evID].nBoard >= actBoards) {
     1155                        int qnrun = 0;
     1156                        if (mBuffer[evID].runNum != actrun) {   // have we already reported first event of this run ???
     1157                           actrun = mBuffer[evID].runNum;
     1158                           int ir;
     1159                           for (ir = 0; ir < MAX_RUN; ir++) {
     1160                              qnrun++;
     1161                              if (runCtrl[ir].runId == actrun) {
     1162                                 if (++runCtrl[ir].lastEvt == 0) {
     1163                                    gotNewRun (actrun, mBuffer[evID].FADhead);
     1164                                    snprintf (str, MXSTR,
     1165                                              "gotNewRun %d (ev %d)",
     1166                                              mBuffer[evID].runNum,
     1167                                              mBuffer[evID].evNum);
     1168                                    factOut (kInfo, 1, str);
     1169                                    break;
     1170                                 }
     1171                              }
     1172                           }
     1173                        }
     1174                        snprintf (str, MXSTR,
     1175                                  "%5d complete event roi %4d roiTM %d cpy %8d %5d",
     1176                                  mBuffer[evID].evNum, roi[0],
     1177                                  roi[8] - roi[0], qncpy, qnrun);
     1178                        factOut (kDebug, -1, str);
     1179
     1180                        //complete event read ---> flag for next processing
     1181                        evtCtrl.evtStat[iDx] = 99;
     1182                        gi.evtTot++;
    11471183                     }
    1148                      snprintf (str, MXSTR,
    1149                                "%5d complete event roi %4d roiTM %d cpy %8d %5d",
    1150                                mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
    1151                                qncpy, qnrun);
    1152                      factOut (kDebug, -1, str);
    1153 
    1154                      //complete event read ---> flag for next processing
    1155                      evtCtrl.evtStat[iDx] = 99;
    1156                      gi.evtTot++;
     1184
     1185                   EndBuf:
     1186                     rd[i].bufTyp = 0;  //ready to read next header
     1187                     rd[i].bufLen = frst_len;
     1188                     rd[i].bufPos = 0;
     1189                   EndBuf1:
     1190                     ;
    11571191                  }
    11581192
    1159                 EndBuf:
    1160                   rd[i].bufTyp = 0;     //ready to read next header
    1161                   rd[i].bufLen = frst_len;
    1162                   rd[i].bufPos = 0;
    1163                 EndBuf1:
    1164                   ;
    1165                }
    1166 
    1167             } else {            //we are reading event header
    1168                rd[i].bufPos += jrd;
    1169                rd[i].bufLen -= jrd;
    1170                if (rd[i].bufPos >= minLen) {    //sufficient data to take action
    1171                   //check if startflag correct; else shift block ....
    1172                   for (k = 0; k < rd[i].bufPos - 1; k++) {
    1173                      if (rd[i].rBuf->B[k] == start.B[1]
    1174                          && rd[i].rBuf->B[k + 1] == start.B[0])
    1175                         break;
    1176                   }
    1177                   rd[i].skip += k;
    1178 
    1179                   if (k >= rd[i].bufPos - 1) {  //no start of header found
    1180                      rd[i].bufPos = 0;
    1181                      rd[i].bufLen = head_len;
    1182                   } else if (k > 0) {
    1183                      rd[i].bufPos -= k;
    1184                      rd[i].bufLen += k;
    1185                      memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
    1186                              rd[i].bufPos);
     1193               } else {         //we are reading event header
     1194                  rd[i].bufPos += jrd;
     1195                  rd[i].bufLen -= jrd;
     1196                  if (rd[i].bufPos >= minLen) { //sufficient data to take action
     1197                     //check if startflag correct; else shift block ....
     1198                     for (k = 0; k < rd[i].bufPos - 1; k++) {
     1199                        if (rd[i].rBuf->B[k] == start.B[1]
     1200                            && rd[i].rBuf->B[k + 1] == start.B[0])
     1201                           break;
     1202                     }
     1203                     rd[i].skip += k;
     1204
     1205                     if (k >= rd[i].bufPos - 1) {       //no start of header found
     1206                        rd[i].bufPos = 0;
     1207                        rd[i].bufLen = head_len;
     1208                     } else if (k > 0) {
     1209                        rd[i].bufPos -= k;
     1210                        rd[i].bufLen += k;
     1211                        memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
     1212                                rd[i].bufPos);
    11871213#ifdef EVTDEBUG
    1188                      memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
    1189                              rd[i].bufPos);
     1214                        memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
     1215                                rd[i].bufPos);
    11901216#endif
    1191                   }
    1192                   if (rd[i].bufPos >= minLen) {
    1193                      if (rd[i].skip > 0) {
    1194                         snprintf (str, MXSTR, "skipped %d bytes on port%d",
    1195                                   rd[i].skip, i);
    1196                         factOut (kInfo, 666, str);
    1197                         rd[i].skip = 0;
    11981217                     }
    1199                      goodhed++;
    1200                      rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
    1201                      rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
    1202                      rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]);
    1203                      rd[i].ftmID = ntohl (rd[i].rBuf->I[3]);    //(FTMevt)
    1204                      rd[i].evtID = ntohl (rd[i].rBuf->I[4]);    //(FADevt)
    1205                      rd[i].runID = ntohl (rd[i].rBuf->I[11]);
    1206                      rd[i].bufTyp = 1;  //ready to read full record
    1207                      rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
    1208 
    1209                      int fadboard = ntohs (rd[i].rBuf->S[12]);
    1210                      int fadcrate = fadboard / 256;
    1211                      fadboard = (fadcrate * 10 + fadboard % 256);
     1218                     if (rd[i].bufPos >= minLen) {
     1219                        if (rd[i].skip > 0) {
     1220                           snprintf (str, MXSTR, "skipped %d bytes on port%d",
     1221                                     rd[i].skip, i);
     1222                           factOut (kInfo, 666, str);
     1223                           rd[i].skip = 0;
     1224                        }
     1225                        goodhed++;
     1226                        rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
     1227                        rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
     1228                        rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]);
     1229                        rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
     1230                        rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
     1231                        rd[i].runID = ntohl (rd[i].rBuf->I[11]);
     1232                        rd[i].bufTyp = 1;       //ready to read full record
     1233                        rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
     1234
     1235                        int fadboard = ntohs (rd[i].rBuf->S[12]);
     1236                        int fadcrate = fadboard / 256;
     1237                        fadboard = (fadcrate * 10 + fadboard % 256);
    12121238#ifdef EVTDEBUG
    1213                      snprintf (str, MXSTR,
    1214                                "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
    1215                                rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
    1216                                rd[i].runID, fadboard, jrd);
    1217                      factOut (kDebug, 1, str);
     1239                        snprintf (str, MXSTR,
     1240                                  "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
     1241                                  rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
     1242                                  rd[i].runID, fadboard, jrd);
     1243                        factOut (kDebug, 1, str);
    12181244#endif
    12191245
    1220                      if (rd[i].runID == 0)
    1221                         rd[i].runID = gi_myRun;
    1222 
    1223                      if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
    1224                         snprintf (str, MXSTR,
    1225                                   "illegal event-length on port %d", i);
    1226                         factOut (kFatal, 881, str);
    1227                         rd[i].bufLen = 100000;  //?
     1246                        if (rd[i].runID == 0)
     1247                           rd[i].runID = gi_myRun;
     1248
     1249                        if (rd[i].bufLen <= head_len
     1250                            || rd[i].bufLen > MAX_LEN) {
     1251                           snprintf (str, MXSTR,
     1252                                     "illegal event-length on port %d", i);
     1253                           factOut (kFatal, 881, str);
     1254                           rd[i].bufLen = 100000;       //?
     1255                        }
     1256                        int fadBoard = ntohs (rd[i].rBuf->S[12]);
     1257                        debugHead (i, fadBoard, rd[i].rBuf);
     1258                        debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec);     // i=socket; jrd=#bytes; ievt=eventid;-1=start event
     1259                     } else {
     1260                        debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec);   // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    12281261                     }
    1229                      int fadBoard = ntohs (rd[i].rBuf->S[12]);
    1230                      debugHead (i, fadBoard, rd[i].rBuf);
    1231                      debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec);        // i=socket; jrd=#bytes; ievt=eventid;-1=start event
    12321262                  } else {
    12331263                     debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec);      // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    12341264                  }
    1235                } else {
    1236                   debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
    1237                }
    1238 
    1239             }                   //end interpreting last read
    1240 }
     1265
     1266               }                //end interpreting last read
     1267            }
    12411268         }                      //end of successful read anything
    12421269      }                         //finished trying to read all sockets
     
    12721299                   && evtCtrl.pcTime[k0] < g_actTime - 10) {
    12731300                  int id = evtCtrl.evtBuf[k0];
     1301                  int ic,ib,ik,jb;
    12741302                  snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
    12751303                            mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
    12761304                            evtCtrl.evtStat[k0]);
    12771305                  factOut (kWarn, 601, str);
    1278                   evtCtrl.evtStat[k0] = 91;     //timeout for incomplete events
     1306
     1307                  ik=0;
     1308                  for (ib=0; ib<NBOARDS; ib++) {
     1309                     if (ib%10==0) {
     1310                        snprintf (&str[ik], MXSTR, "'");
     1311                        ik++;
     1312                     }
     1313                     jb = mBuffer[id].board[ib];
     1314                     if ( jb<0 ) {
     1315                        snprintf (&str[ik], MXSTR, ".");
     1316                        ik++;
     1317                     } else {
     1318                        snprintf (&str[ik], MXSTR, "%d",jb%10);
     1319                        ik+=1;
     1320                     }
     1321                  }
     1322                  snprintf (&str[ik], MXSTR, "'");
     1323                  factOut (kWarn, 601, str);
     1324
     1325                  evtCtrl.evtStat[k0] = 93;     //timeout for incomplete events
    12791326                  gi.evtSkp++;
    12801327                  gi.evtTot++;
     
    13711418            GenSock (-1, i, 0, NULL, &rd[i]);   //close and destroy open socket   
    13721419            if (i % 7 == 0) {
    1373 //             gi_NumConnect[i / 7]--;
    1374                gi_NumConnect[i / 7]-= cntsock ;
     1420               gi_NumConnect[i / 7] -= cntsock;
    13751421               gi.numConn[i / 7]--;
    13761422               gj.numConn[i / 7]--;
     
    14761522subProc (void *thrid)
    14771523{
     1524   char str[MXSTR];
    14781525   int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
    14791526   struct timespec xwait;
     
    15011548               jret =
    15021549                  subProcEvt (threadID, mBuffer[id].FADhead,
    1503                               mBuffer[id].fEvent, mBuffer[id].buffer);
     1550                              mBuffer[id].fEvent, mBuffer[id].mBoard, gi_runPtr );
     1551
    15041552               if (jret <= threadID) {
    15051553                  snprintf (str, MXSTR,
     
    15441592{
    15451593/* *** main loop processing file, including SW-trigger */
    1546    int numProc, numWait;
     1594   int numProc, numWait, iskip;
    15471595   int k, status, j;
    15481596   struct timespec xwait;
     
    15611609
    15621610/* CPU_ZERO initializes all the bits in the mask to zero. */
    1563    CPU_ZERO (&mask);
     1611//   CPU_ZERO (&mask);
    15641612/* CPU_SET sets only the bit corresponding to cpu. */
    15651613// CPU_SET(  0 , &mask );  leave for system
    15661614// CPU_SET(  1 , &mask );  used by write process
    1567    CPU_SET (2, &mask);
    1568    CPU_SET (3, &mask);
    1569    CPU_SET (4, &mask);
    1570    CPU_SET (5, &mask);
    1571    CPU_SET (6, &mask);
     1615//   CPU_SET (2, &mask);
     1616//   CPU_SET (3, &mask);
     1617//   CPU_SET (4, &mask);
     1618//   CPU_SET (5, &mask);
     1619//   CPU_SET (6, &mask);
    15721620// CPU_SET(  7 , &mask );  used by read process
    15731621/* sched_setaffinity returns 0 in success */
    1574    if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
    1575       snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
    1576       factOut (kWarn, -1, str);
    1577    }
     1622//   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
     1623//      snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
     1624//      factOut (kWarn, -1, str);
     1625//   }
    15781626
    15791627
     
    16021650            } else {
    16031651
    1604 //-------- it is better to open the run already here, so call can be used to initialize
    1605 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
    16061652               int id = evtCtrl.evtBuf[k0];
    16071653               uint32_t irun = mBuffer[id].runNum;
    16081654               int32_t ievt = mBuffer[id].evNum;
    1609                if (runCtrl[lastRun].runId == irun) {
    1610                   j = lastRun;
    1611                } else {
    1612                   //check which fileID to use (or open if needed)
    1613                   for (j = 0; j < MAX_RUN; j++) {
    1614                      if (runCtrl[j].runId == irun)
    1615                         break;
     1655               iskip = 0 ;
     1656               if (irun != gi_actRun) { // we should open a new run ?
     1657                  if (irun < gi_actRun) { //  very old event; skip it
     1658                     iskip = -1 ;         //  but anyhow checkEvent() will be done
     1659                     snprintf (str, MXSTR,
     1660                               "P event %d from old run???", ievt,irun);
     1661                     factOut (kWarn, 901, str);
     1662                  } else if (gi_actRun != 0) { //    but there is still old one open ?
     1663
     1664//  // // // // // //MUST WAIT UNTIL PENDING OLD EVENTS PROCESSED ...
     1665//  loop over all waiting events; and decide what to do ....
     1666                     int q,q0 ;
     1667                     int qcnt=0;
     1668                     for (q=k1; q < (k1 + kd); q++) {
     1669                        q0 = q % (MAX_EVT * MAX_RUN);
     1670                        if (evtCtrl.evtStat[q0] > 0
     1671                         && evtCtrl.evtStat[q0] < 9000) {
     1672                           //this event is still active in the queue
     1673                           int qd = evtCtrl.evtBuf[q0];
     1674                           uint32_t qrun = mBuffer[qd].runNum;
     1675                           if (qrun < irun ) {
     1676                              if (evtCtrl.evtStat[q0] < 92) {
     1677                                 //incomplete old event -> reduce timeout
     1678                                 evtCtrl.pcTime[q0] = 0 ;
     1679                              }
     1680                              qcnt++ ;
     1681                           }
     1682                        }
     1683                     }
     1684                     if (qcnt == 0 ) {
     1685                        runEnd (gi_actRun, gi_runPtr );
     1686                        snprintf (str, MXSTR,
     1687                                  "P run %d ended", gi_actRun);
     1688                        factOut (kWarn, 901, str);
     1689                        gi_actRun = 0;  //there are no events from old runs left
     1690                     }
    16161691                  }
    1617                   if (j >= MAX_RUN) {
    1618                      snprintf (str, MXSTR,
    1619                                "P error: can not find run %d for event %d in %d",
    1620                                irun, ievt, id);
    1621                      factOut (kFatal, 901, str);
    1622                   }
    1623                   lastRun = j;
    1624                }
    1625 
    1626                if (runCtrl[j].fileId < 0) {
    1627 //----            we need to open a new run ==> make sure all older runs are
    1628 //----            finished and marked to be closed ....
    1629                   int j1;
    1630                   for (j1 = 0; j1 < MAX_RUN; j1++) {
    1631                      if (runCtrl[j1].fileId == 0) {
    1632                         runCtrl[j1].procId = 2; //--> do no longer accept events for processing
    1633 //----                  problem: processing still going on ==> must wait for closing ....
     1692
     1693
     1694                  if (gi_actRun == 0) { //    now we can try to open new run
     1695                     for (j = 0; j < MAX_RUN; j++) {
     1696                        if (runCtrl[j].runId == irun)
     1697                           break;
     1698                     }
     1699                     if (j >= MAX_RUN) {
    16341700                        snprintf (str, MXSTR,
    1635                                   "P finish run since new one opened %d",
    1636                                   runCtrl[j1].runId);
    1637                         runFinish1 (runCtrl[j1].runId);
     1701                                  "P error: can not find run %d for event %d in %d",
     1702                                  irun, ievt, id);
     1703                        factOut (kFatal, 901, str);
     1704                        j = 0;
    16381705                     }
    16391706
    1640                   }
    1641 
    1642                   actRun.Version = 1;
    1643                   actRun.RunType = -1;  //to be adapted
    1644 
    1645                   actRun.Nroi = runCtrl[j].roi0;
    1646                   actRun.NroiTM = runCtrl[j].roi8;
    1647                   if (actRun.Nroi == actRun.NroiTM)
    1648                      actRun.NroiTM = 0;
    1649                   actRun.RunTime = runCtrl[j].firstTime;
    1650                   actRun.RunUsec = runCtrl[j].firstTime;
    1651                   actRun.NBoard = NBOARDS;
    1652                   actRun.NPix = NPIX;
    1653                   actRun.NTm = NTMARK;
    1654                   actRun.Nroi = mBuffer[id].nRoi;
    1655                   memcpy (actRun.FADhead, mBuffer[id].FADhead,
    1656                           NBOARDS * sizeof (PEVNT_HEADER));
    1657 
    1658                   runCtrl[j].fileHd =
    1659                      runOpen (irun, &actRun, sizeof (actRun));
    1660                   if (runCtrl[j].fileHd == NULL) {
    1661                      snprintf (str, MXSTR,
    1662                                "P could not open a file for run %d", irun);
    1663                      factOut (kError, 502, str);
    1664                      runCtrl[j].fileId = 91;
    1665                      runCtrl[j].procId = 91;
    1666                   } else {                                                 
    1667                      snprintf (str, MXSTR, "P opened new run_file %d evt %d",
    1668                                irun, ievt);
    1669                      factOut (kInfo, -1, str);
    1670                      runCtrl[j].fileId = 0;
    1671                      runCtrl[j].procId = 0;
    1672                   }
    1673 
    1674                }
    1675 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 
    1676                if (runCtrl[j].procId == 0) {
    1677                   if (runCtrl[j].closeTime < g_actTime
    1678                       || runCtrl[j].lastTime < g_actTime - 300
    1679                       || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
    1680                      snprintf (str, MXSTR,
    1681                                "P reached end of run condition for run %d",
    1682                                irun);
    1683                      factOut (kInfo, 502, str);
    1684                      runFinish1 (runCtrl[j].runId);
    1685                      runCtrl[j].procId = 1;
     1707                     if (runCtrl[j].fileId == 0) {
     1708                        snprintf (str, MXSTR,
     1709                                  "P run %d is already open ???", irun);
     1710                        factOut (kWarn, 901, str);
     1711                     } else if (runCtrl[j].fileId > 0) {
     1712                        snprintf (str, MXSTR,
     1713                                  "P run %d is already closed ???", irun);
     1714                        factOut (kWarn, 901, str);
     1715                     } else {   // now open new run
     1716
     1717                        actRun.Version = 1;
     1718                        actRun.RunType = -1;    //to be adapted
     1719
     1720                        actRun.Nroi = runCtrl[j].roi0;
     1721                        actRun.NroiTM = runCtrl[j].roi8;
     1722                        if (actRun.Nroi == actRun.NroiTM)
     1723                           actRun.NroiTM = 0;
     1724                        actRun.RunTime = runCtrl[j].firstTime;
     1725                        actRun.RunUsec = runCtrl[j].firstTime;
     1726                        actRun.NBoard = NBOARDS;
     1727                        actRun.NPix = NPIX;
     1728                        actRun.NTm = NTMARK;
     1729                        actRun.Nroi = mBuffer[id].nRoi;
     1730                        memcpy (actRun.FADhead, mBuffer[id].FADhead,
     1731                                NBOARDS * sizeof (PEVNT_HEADER));
     1732
     1733                        gi_actRun = irun;
     1734                        gi_runPtr =
     1735                           runStart (irun, &actRun, sizeof (actRun));
     1736                        if (gi_runPtr == NULL) {
     1737                           snprintf (str, MXSTR,
     1738                                     "P could not start run %d", irun);
     1739                           factOut (kFatal, 502, str);
     1740                        } else {
     1741                           snprintf (str, MXSTR, "P started new run %d",
     1742                                     irun);
     1743                           factOut (kInfo, -1, str);
     1744                        }
     1745
     1746                        runCtrl[j].fileHd =
     1747                           runOpen (irun, &actRun, sizeof (actRun));
     1748                        if (runCtrl[j].fileHd == NULL) {
     1749                           snprintf (str, MXSTR,
     1750                                     "P could not open a file for run %d",
     1751                                     irun);
     1752                           factOut (kError, 502, str);
     1753                           runCtrl[j].fileId = 91;
     1754                           runCtrl[j].procId = 91;
     1755                        } else {
     1756                           snprintf (str, MXSTR,
     1757                                     "P opened new run_file %d evt %d", irun,
     1758                                     ievt);
     1759                           factOut (kInfo, -1, str);
     1760                           runCtrl[j].fileId = 0;
     1761                           runCtrl[j].procId = 0;
     1762                        }
     1763                     }
    16861764                  }
    16871765               }
    1688                if (runCtrl[j].procId != 0) {
    1689                   snprintf (str, MXSTR,
    1690                             "P skip event %d because no active run %d", ievt,
    1691                             irun);
    1692                   factOut (kDebug, 502, str);
    1693                   evtCtrl.evtStat[k0] = 9091;
    1694                } else {
    1695 //--------
    1696 //--------
    1697                       id = evtCtrl.evtBuf[k0];
     1766
     1767               if (irun == gi_actRun) {
     1768//                now we are sure the run is ready and we can start processing the event
     1769                  id = evtCtrl.evtBuf[k0];
    16981770                  int itevt = mBuffer[id].trgNum;
    16991771                  int itrg = mBuffer[id].trgTyp;
     
    17011773                  int roiTM = mBuffer[id].nRoiTM;
    17021774
    1703 //make sure unused pixels/tmarks are cleared to zero
     1775                  //make sure unused pixels/tmarks are cleared to zero
    17041776                  if (roiTM == roi)
    17051777                     roiTM = 0;
     
    17191791
    17201792
    1721 //and set correct event header ; also check for consistency in event (not yet)
     1793                  //and set correct event header
    17221794                  mBuffer[id].fEvent->Roi = roi;
    17231795                  mBuffer[id].fEvent->RoiTM = roiTM;
     
    17311803                  mBuffer[id].fEvent->SoftTrig = 0;
    17321804
    1733 
     1805                  int mboard = -1;
    17341806                  for (ib = 0; ib < NBOARDS; ib++) {
    1735                      if (mBuffer[id].board[ib] == -1) { //board is not read
     1807                     if (mBuffer[id].board[ib] == -1) {    //board is not read
    17361808                        mBuffer[id].FADhead[ib].start_package_flag = 0;
    17371809                        mBuffer[id].fEvent->BoardTime[ib] = 0;
     
    17391811                        mBuffer[id].fEvent->BoardTime[ib] =
    17401812                           ntohl (mBuffer[id].FADhead[ib].time);
     1813                        if (mboard < 0)
     1814                           mboard = ib;
    17411815                     }
    17421816                  }
    1743 
     1817                  mBuffer[id].mBoard = mboard;
    17441818                  int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
    1745                                       mBuffer[id].fEvent);
     1819                                      mBuffer[id].fEvent, mboard);
    17461820                  gi.procTot++;
    17471821                  numProc++;
    17481822
    1749                   if (i < 0) {
    1750                      evtCtrl.evtStat[k0] = 9999;        //flag event to be skipped
     1823                  if (i < 0 || iskip < 0 ) {
     1824                     evtCtrl.evtStat[k0] = 9999;   //flag event to be skipped
    17511825                     gi.procErr++;
    17521826                  } else {
    1753                      evtCtrl.evtStat[k0] = 1000;
     1827                     evtCtrl.evtStat[k0] = 1000;   //flag event to be processed
    17541828                     runCtrl[j].procEvt++;
     1829                     if (runCtrl[j].procId != 0)
     1830                        mBuffer[id].toWrite = -1;  //flag event to be processed but now written     
     1831                     else
     1832                        mBuffer[id].toWrite = 0;
    17551833                  }
    17561834               }
     
    18631941
    18641942/* CPU_ZERO initializes all the bits in the mask to zero. */
    1865    CPU_ZERO (&mask);
     1943//   CPU_ZERO (&mask);
    18661944/* CPU_SET sets only the bit corresponding to cpu. */
    1867    CPU_SET (cpu, &mask);
     1945//   CPU_SET (cpu, &mask);
    18681946/* sched_setaffinity returns 0 in success */
    1869    if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
    1870       snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
    1871    }
     1947//   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
     1948//      snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
     1949//   }
    18721950
    18731951   int lastRun = 0;             //usually run from last event still valid
     
    19412019                     snprintf (str, MXSTR, "W opened new run_file %d evt %d",
    19422020                               irun, ievt);
    1943                      factOut (kInfo, -1, str);
     2021                     factOut (kWarn, -1, str);
    19442022                     runCtrl[j].fileId = 0;
    19452023                  }
     
    19472025               }
    19482026
    1949                if (runCtrl[j].fileId != 0) {
     2027               if (runCtrl[j].fileId != 0 || mBuffer[id].toWrite < 0) {
    19502028                  if (runCtrl[j].fileId < 0) {
    19512029                     snprintf (str, MXSTR,
     
    19752053                               ievt, irun, k0);
    19762054                     factOut (kDebug, 504, str);
    1977 //               gj.writEvt++ ;
    19782055                  } else {
    19792056                     snprintf (str, MXSTR, "W error writing event for run %d",
     
    21612238
    21622239//prepare for subProcesses
    2163    gi_maxSize = g_maxSize;
    2164    if (gi_maxSize <= 0)
    2165       gi_maxSize = 1;
     2240//   gi_maxSize = g_maxSize;
     2241//   if (gi_maxSize < 0)
     2242      gi_maxSize = 0;
    21662243
    21672244   gi_maxProc = g_maxProc;
     
    22422319int
    22432320subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
    2244             int8_t * buffer)
     2321            int16_t mboard, void * buffer)
    22452322{
    22462323   printf ("called subproc %d\n", threadID);
     
    22872364
    22882365int
    2289 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
     2366eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event, int mboard)
    22902367{
    22912368   int i = 0;
     
    23772454
    23782455   g_maxProc = 20;
    2379    g_maxSize = 30000;
     2456   g_maxSize = 0;
    23802457
    23812458   g_runStat = 40;
  • trunk/FACT++/src/EventBuilderWrapper.h

    r12070 r12091  
    735735    array<uint16_t,2> fVecRoi;
    736736
    737     int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
     737    int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int iboard)
    738738    {
    739739        /*
     
    908908    }
    909909
    910     int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t */*buffer*/)
     910    int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t iboard, void */*buffer*/)
    911911    {
    912912        switch (threadID)
     
    12921292    }
    12931293
     1294    // -----
     1295
     1296    void *runStart(uint32_t irun, RUN_HEAD *runhd, size_t len)
     1297    {
     1298        return NULL;
     1299    }
     1300
     1301    int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int16_t mboard, void *runPtr)
     1302    {
     1303        return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, mboard, runPtr);
     1304    }
     1305
     1306    int runEnd(uint32_t, void *runPtr)
     1307    {
     1308        return 0;
     1309    }
     1310
     1311    // -----
     1312
     1313    int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event, int mboard)
     1314    {
     1315        return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event, mboard);
     1316    }
     1317
     1318    void gotNewRun(int runnr, PEVNT_HEADER *headers)
     1319    {
     1320        return EventBuilderWrapper::This->gotNewRun(runnr, headers);
     1321    }
     1322
     1323    // -----
     1324
    12941325    void factOut(int severity, int err, const char *message)
    12951326    {
     
    13071338    }
    13081339
     1340    // ------
     1341
    13091342    void debugHead(int socket, int/*board*/, void *buf)
    13101343    {
     
    13231356        EventBuilderWrapper::This->debugRead(isock, ibyte, event, ftmevt, runno, state, tsec, tusec);
    13241357    }
    1325 
    1326     int eventCheck(uint32_t runNr, PEVNT_HEADER *fadhd, EVENT *event)
    1327     {
    1328         return EventBuilderWrapper::This->eventCheck(runNr, fadhd, event);
    1329     }
    1330 
    1331     void gotNewRun(int runnr, PEVNT_HEADER *headers)
    1332     {
    1333         return EventBuilderWrapper::This->gotNewRun(runnr, headers);
    1334     }
    1335 
    1336     int subProcEvt(int threadID, PEVNT_HEADER *fadhd, EVENT *event, int8_t *buffer)
    1337     {
    1338         return EventBuilderWrapper::This->subProcEvt(threadID, fadhd, event, buffer);
    1339     }
    1340 
    13411358}
    13421359
  • trunk/FACT++/src/FAD.h

    r11766 r12091  
    222222  int16_t  nRoi   ;
    223223  int16_t  nRoiTM ;
     224  int16_t  mBoard ;
    224225  uint32_t pcTime ;
    225226  int32_t  evtLen ;
     227  int16_t  toWrite ;
    226228  uint8_t  Errors[4] ;
    227229  EVENT   *fEvent ;
    228230  PEVNT_HEADER *FADhead; //
    229   int8_t  *buffer ;
    230231
    231232} WRK_DATA ;             //internal to eventbuilder
Note: See TracChangeset for help on using the changeset viewer.