Index: /trunk/FACT++/src/EventBuilder.cc
===================================================================
--- /trunk/FACT++/src/EventBuilder.cc	(revision 15512)
+++ /trunk/FACT++/src/EventBuilder.cc	(revision 15512)
@@ -0,0 +1,2039 @@
+// #define EVTDEBUG
+
+#define NUMSOCK   1          //set to 7 for old configuration
+#define MAXREAD  65536       //64kB wiznet buffer
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <sys/time.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <math.h>
+#include <error.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <pthread.h>
+#include <sched.h>                              
+
+#include <memory>
+#include <deque>
+#include <map>
+
+#include "queue.h"
+
+#include "EventBuilder.h"
+
+enum Severity
+{
+   kMessage = 10,               ///< Just a message, usually obsolete
+   kInfo = 20,                  ///< An info telling something which can be interesting to know
+   kWarn = 30,                  ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
+   kError = 40,                 ///< Error, something unexpected happened, but can still be handled by the program
+   kFatal = 50,                 ///< An error which cannot be handled at all happend, the only solution is program termination
+   kDebug = 99,                 ///< A message used for debugging only
+};
+
+using namespace std;
+
+#define MIN_LEN  32             // min #bytes needed to interpret FADheader
+#define MAX_LEN 256*1024        // size of read-buffer per socket
+
+//#define nanosleep(x,y)
+
+extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
+extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
+extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
+//extern int runFinish (uint32_t runnr);
+
+extern "C" void factOut (int severity, int err, char *message);
+extern void factReportIncomplete (uint64_t rep);
+
+extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers);
+
+
+extern void factStat (GUI_STAT gj);
+
+extern void factStatNew (EVT_STAT gi);
+
+extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
+
+extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
+                       int8_t * buffer);
+
+extern void debugHead (int i, int j, void *buf);
+
+extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
+                       int32_t runnr, int state, uint32_t tsec,
+                       uint32_t tusec);
+extern void debugStream (int isock, void *buf, int len);
+
+int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
+
+int g_maxProc;
+int gi_maxProc;
+
+uint g_actTime;
+int g_runStat;
+int g_reset;
+
+size_t g_maxMem;                //maximum memory allowed for buffer
+
+FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
+
+uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards
+
+//EVT_STAT gi;
+GUI_STAT gj;
+
+#define MAX_EVT   65536  // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
+#define MAX_RUN       8  // Number of concurrent runs
+
+void factPrintf(int severity, int id, const char *fmt, ...)
+{
+    char str[1000];
+
+    va_list ap;
+    va_start(ap, fmt);
+    vsnprintf(str, 1000, fmt, ap);
+    va_end(ap);
+
+    factOut(severity, id, str);
+}
+
+
+#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
+#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
+typedef struct TGB_struct
+{
+    struct TGB_struct *prev;
+    void *mem;
+} TGB_entry;
+
+TGB_entry  *tgb_last = NULL;
+uint64_t    tgb_memory = 0;
+uint64_t    tgb_inuse  = 0;
+
+void *TGB_Malloc()
+{
+    // No free slot available, next alloc would exceed max memory
+    if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
+        return NULL;
+
+    // We will return this amount of memory
+    tgb_inuse += MAX_TOT_MEM;
+
+    // No free slot available, allocate a new one
+    if (!tgb_last)
+    {
+        tgb_memory += MAX_TOT_MEM;
+        return malloc(MAX_TOT_MEM);
+    }
+
+    // Get the next free slot from the stack and return it
+    TGB_entry *last = tgb_last;
+
+    void *mem = last->mem;
+    tgb_last  = last->prev;
+
+    free(last);
+
+    return mem;
+};
+
+void TGB_free(void *mem)
+{
+    if (!mem)
+        return;
+
+    // Add the last free slot to the stack
+    TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
+
+    // FIXME: Really free memory if memory usuage exceeds g_maxMem
+
+    entry->prev = tgb_last;
+    entry->mem  = mem;
+
+    tgb_last = entry;
+
+    // Decrease the amont of memory in use accordingly
+    tgb_inuse -= MAX_TOT_MEM;
+
+    gj.usdMem = tgb_inuse;
+    gj.bufTot--;
+}
+
+//RUN_CTRL runCtrl[MAX_RUN];
+
+/*
+*** Definition of rdBuffer to read in IP packets; keep it global !!!!
+ */
+
+typedef union
+{
+   uint8_t B[MAX_LEN];
+   uint16_t S[MAX_LEN / 2];
+   uint32_t I[MAX_LEN / 4];
+   uint64_t L[MAX_LEN / 8];
+} CNV_FACT;
+
+typedef struct
+{
+   int bufTyp;                  //what are we reading at the moment: 0=header 1=data -1=skip ...
+   int32_t bufPos;              //next byte to read to the buffer next
+   int32_t bufLen;              //number of bytes left to read
+   int32_t skip;                //number of bytes skipped before start of event
+
+   int errCnt;                  //how often connect failed since last successful
+   int sockStat;                //-1 if socket not yet connected  , 99 if not exist
+   int socket;                  //contains the sockets
+
+   struct sockaddr_in SockAddr; //IP for each socket
+
+   int evtID;                   // event ID of event currently read
+   int runID;                   // run       "
+   int ftmID;                   // event ID from FTM
+   uint fadLen;                 // FADlength of event currently read
+   int fadVers;                 // Version of FAD
+   int ftmTyp;                  // trigger type
+   int Port;
+
+   CNV_FACT *rBuf;
+
+} READ_STRUCT;
+
+/*-----------------------------------------------------------------*/
+
+
+/*-----------------------------------------------------------------*/
+
+
+int
+GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
+         READ_STRUCT * rs)
+{
+/*
+*** generate Address, create sockets and allocates readbuffer for it
+*** 
+*** if flag==0 generate socket and buffer
+***         <0 destroy socket and buffer
+***         >0 close and redo socket
+***
+*** sid : board*7 + port id
+ */
+
+   //close socket if open
+   if (rs->sockStat == 0)
+   {
+      if (close (rs->socket) > 0) {
+          factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
+      } else {
+          factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
+      }
+   }
+
+   rs->sockStat = 99;
+
+   if (flag < 0) {
+      free (rs->rBuf);          //and never open again
+      rs->rBuf = NULL;
+      return 0;
+   }
+
+
+   if (flag == 0) {             //generate address and buffer ...
+      rs->Port = port;
+      rs->SockAddr.sin_family = sockAddr->sin_family;
+      rs->SockAddr.sin_port = htons (port);
+      rs->SockAddr.sin_addr = sockAddr->sin_addr;
+
+      rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
+      if (rs->rBuf == NULL) {
+         factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
+         rs->sockStat = 77;
+         return -3;
+      }
+   }
+
+
+   if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
+       factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
+      rs->sockStat = 88;
+      return -2;
+   }
+
+   int optval = 1;
+   if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
+       factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
+   }
+   optval = 10;                 //start after 10 seconds
+   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
+       factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
+   }
+   optval = 10;                 //do every 10 seconds
+   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
+      factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
+   }
+   optval = 2;                  //close after 2 unsuccessful tries
+   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
+      factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
+   }
+
+   factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
+
+   rs->sockStat = -1;           //try to (re)open socket
+   rs->errCnt = 0;
+   return 0;
+
+} /*-----------------------------------------------------------------*/
+
+  /*-----------------------------------------------------------------*/
+
+int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
+{
+    int xjr = -1;
+    int xkr = -1;
+
+    //points to the very first roi
+    int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
+
+    roi[0] = ntohs(rbuf->S[roiPtr]);
+
+    for (int jr = 0; jr < 9; jr++)
+    {
+        roi[jr] = ntohs(rbuf->S[roiPtr]);
+
+        if (roi[jr]<0 || roi[jr]>1024)
+        {
+            factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
+            return 0;
+        }
+
+        // Check that the roi of pixels jr are compatible with the one of pixel 0
+        if (jr!=8 && roi[jr]!=roi[0])
+        {
+            xjr = jr;
+            break;
+        }
+
+        // Check that the roi of all other DRS chips on boards are compatible
+        for (int kr = 1; kr < 4; kr++)
+        {
+            const int kroi = ntohs(rbuf->S[roiPtr]);
+            if (kroi != roi[jr])
+            {
+                xjr = jr;
+                xkr = kr;
+                break;
+            }
+            roiPtr += kroi+4;
+        }
+    }
+
+    if (xjr>=0)
+    {
+        if (xkr<0)
+            factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
+        else
+            factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
+
+        return 0;
+    }
+
+    if (roi[8] < roi[0])
+    {
+        factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
+        //gj.badRoiB++;
+        //gj.badRoi[b]++;
+        return 0;
+    }
+
+    return 1;
+}
+
+deque<shared_ptr<EVT_CTRL>> evtCtrl;
+map<int, shared_ptr<RUN_CTRL>> runCtrl;
+
+void mBufFree(EVT_CTRL *evt)
+{
+    TGB_free(evt->FADhead);
+}
+
+shared_ptr<EVT_CTRL> mBufEvt(const READ_STRUCT *rs)
+{
+    int nRoi[9];
+    if (!checkRoiConsistency(rs->rBuf, nRoi))
+        return shared_ptr<EVT_CTRL>();
+
+    const int  evID   = rs->evtID;
+    const uint runID  = rs->runID;
+    const int  trgTyp = rs->ftmTyp;
+    const int  trgNum = rs->ftmID;
+    const int  fadNum = rs->evtID;
+
+    for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
+    {
+        const shared_ptr<EVT_CTRL> evt = *it;
+
+        // If the run is different, go on searching.
+        // We cannot stop searching if a lower run-id is found as in
+        // the case of the events, because theoretically, there
+        // can be the same run on two different days.
+        if (runID != evt->runNum)
+            continue;
+
+        // If the ID of the new event if higher than the last one stored
+        // in that run, we have to assign a new slot (leave the loop)
+        if (evID > evt->evNum)
+            break;
+
+        if (evID != evt->evNum)
+            continue;
+
+        // We have found an entry with the same runID and evtID
+        // Check if ROI is consistent
+        if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
+        {
+            factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
+                       evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
+            return shared_ptr<EVT_CTRL>();
+        }
+
+        // count for inconsistencies
+        if (evt->trgNum != trgNum)
+            evt->Errors[0]++;
+        if (evt->fadNum != fadNum)
+            evt->Errors[1]++;
+        if (evt->trgTyp != trgTyp)
+            evt->Errors[2]++;
+
+        //everything seems fine so far ==> use this slot ....
+        return evt;
+    }
+
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+
+    auto ir = runCtrl.find(runID);
+    if (ir==runCtrl.end())
+    {
+        shared_ptr<RUN_CTRL> run(new RUN_CTRL);
+
+        run->runId      = runID;
+        run->roi0       = nRoi[0];  // FIXME: Make obsolete!
+        run->roi8       = nRoi[8];  // FIXME: Make obsolete!
+        run->fileId     = -2;
+        run->lastEvt    = 1;         // Number of events partially started to read
+        run->actEvt     = 0;         // Number of written events (write)
+        run->procEvt    = 0;         // Number of successfully checked events (checkEvent)
+        run->maxEvt     = 999999999; // max number events allowed
+        run->lastTime   = tv.tv_sec;      // Time when the last event was written
+        run->closeTime  = tv.tv_sec + 3600 * 24;     //max time allowed
+
+        ir = runCtrl.insert(make_pair(runID, run)).first;
+    }
+
+    const shared_ptr<RUN_CTRL> run = ir->second;
+
+    if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8])
+    {
+        factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
+                   run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID);
+        return shared_ptr<EVT_CTRL>();
+    }
+
+    const shared_ptr<EVT_CTRL> evt(new EVT_CTRL, mBufFree);
+
+    //flag all boards as unused
+    evt->nBoard = 0;
+    for (int b=0; b<NBOARDS; b++)
+        evt->board[b] = -1;
+
+    evt->run       = run;
+    evt->pcTime[0] = tv.tv_sec;
+    evt->pcTime[1] = tv.tv_usec;
+    evt->nRoi      = nRoi[0];
+    evt->nRoiTM    = nRoi[8];
+    evt->evNum     = evID;
+    evt->runNum    = runID;
+    evt->fadNum    = fadNum;
+    evt->trgNum    = trgNum;
+    evt->trgTyp    = trgTyp;
+    evt->Errors[0] = 0;
+    evt->Errors[1] = 0;
+    evt->Errors[2] = 0;
+    evt->Errors[3] = 0;
+    evt->fEvent    = NULL;
+    evt->FADhead   = NULL;
+
+    //    -1:   kInValid
+    //     0:   kValid
+    //  1-40:   kIncomplete
+    //    90:   kIncompleteReported
+    //   100:   kCompleteEventInBuffer
+    //  1000+x: kToBeProcessedByThreadX
+    //  5000:   kToBeWritten
+    // 10000:   kToBeDeleted
+
+    evt->evtStat = 0;
+
+    evtCtrl.push_back(evt);
+
+    return evt;
+
+} /*-----------------------------------------------------------------*/
+
+
+void initEvent(const shared_ptr<EVT_CTRL> &evt)
+{
+    evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM);
+    memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi);
+
+    //flag all pixels as unused
+    for (int k = 0; k < NPIX; k++)
+        evt->fEvent->StartPix[k] = -1;
+
+    //flag all TMark as unused
+    for (int k = 0; k < NTMARK; k++)
+        evt->fEvent->StartTM[k] = -1;
+
+    evt->fEvent->NumBoards   = 0;
+    evt->fEvent->SoftTrig    = 0;
+    evt->fEvent->PCTime      = evt->pcTime[0];
+    evt->fEvent->PCUsec      = evt->pcTime[1];
+    evt->fEvent->Roi         = evt->nRoi;
+    evt->fEvent->RoiTM       = evt->nRoiTM;
+    evt->fEvent->EventNum    = evt->evNum;
+    evt->fEvent->TriggerNum  = evt->trgNum;
+    evt->fEvent->TriggerType = evt->trgTyp;
+}
+
+
+uint64_t reportIncomplete(const shared_ptr<EVT_CTRL> &evt, const char *txt)
+{
+    factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
+               evt->runNum, evt->evNum, txt);
+
+    uint64_t report = 0;
+
+    char str[1000];
+
+    int ik=0;
+    for (int ib=0; ib<NBOARDS; ib++)
+    {
+        if (ib%10==0)
+            str[ik++] = '|';
+
+        const int jb = evt->board[ib];
+        if (jb>=0) // data received from that board
+        {
+            str[ik++] = '0'+(jb%10);
+            continue;
+        }
+
+        // FIXME: This is not synchronous... it reports
+        // accoridng to the current connection status, not w.r.t. to the
+        // one when the event was taken.
+        if (gi_NumConnect[ib]<=0) // board not connected
+        {
+            str[ik++] = 'x';
+            continue;
+        }
+
+        // data from this board lost
+        str[ik++] = '.';
+        report |= ((uint64_t)1)<<ib;
+    }
+
+    str[ik++] = '|';
+    str[ik]   = 0;
+
+    factOut(kWarn, 601, str);
+
+    return report;
+}
+
+// i == board
+void copyData(CNV_FACT *rbuf, int i, const shared_ptr<EVT_CTRL> &evt)
+{
+    // swapEventHeaderBytes: End of the header. to channels now
+    int eStart = 36;
+    for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
+    {
+        rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
+        rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
+        rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
+        rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
+
+        eStart += 4+rbuf->S[eStart+2];//skip the pixel data
+    }
+
+    memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER));
+
+    int src = sizeof(PEVNT_HEADER) / 2;
+
+    // consistency of ROIs have been checked already (is it all correct?)
+    const int roi = rbuf->S[src+2];
+
+    // different sort in FAD board.....
+    for (int px = 0; px < 9; px++)
+    {
+        for (int drs = 0; drs < 4; drs++)
+        {
+            // pixH = rd[i].rBuf->S[src++];    // ID
+            src++;
+
+            const int pixC = rbuf->S[src++];    // start-cell
+            const int pixR = rbuf->S[src++];    // roi
+            //here we should check if pixH is correct ....
+
+            const int pixS = i * 36 + drs * 9 + px;
+            src++;
+
+            evt->fEvent->StartPix[pixS] = pixC;
+
+            const int dest1 = pixS * roi;
+            memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
+
+            src += pixR;
+
+            if (px == 8)
+            {
+                const int tmS = i * 4 + drs;
+
+                //and we have additional TM info
+                if (pixR > roi)
+                {
+                    const int dest2 = tmS * roi + NPIX * roi;
+
+                    const int srcT = src - roi;
+                    evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
+
+                    memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
+                }
+                else
+                {
+                    evt->fEvent->StartTM[tmS] = -1;
+                }
+            }
+        }
+    }
+}
+
+void doProcess(const shared_ptr<EVT_CTRL> &evt);
+void doWrite(const shared_ptr<EVT_CTRL> &evt);
+
+void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where);
+
+Queue<shared_ptr<EVT_CTRL>> process(bind(doProcess, placeholders::_1));
+Queue<shared_ptr<EVT_CTRL>> write_queue(bind(doWrite, placeholders::_1));
+
+void preProcess(const shared_ptr<EVT_CTRL> &evt)
+{
+    //-------- it is better to open the run already here, so call can be used to initialize
+    //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
+    const shared_ptr<RUN_CTRL> run = evt->run;
+    if (run->runId==0)
+        return;
+
+    // File not yet open
+    if (run->fileId < 0)
+    {
+        RUN_HEAD actRun;
+        actRun.Version =  1;
+        actRun.RunType = -1;  //to be adapted
+        actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
+        actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
+        actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
+        actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
+        actRun.NBoard  = NBOARDS;
+        actRun.NPix    = NPIX;
+        actRun.NTm     = NTMARK;
+
+        memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
+
+        run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun));
+        if (run->fileHd == NULL)
+        {
+            factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
+            run->fileId = 91;
+
+            // No further processing of this event
+            return;
+        }
+
+        run->fileId = 0;
+        factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
+    }
+
+    //and set correct event header ; also check for consistency in event (not yet)
+    evt->fEvent->Errors[0] = evt->Errors[0];
+    evt->fEvent->Errors[1] = evt->Errors[1];
+    evt->fEvent->Errors[2] = evt->Errors[2];
+    evt->fEvent->Errors[3] = evt->Errors[3];
+
+    for (int ib=0; ib<NBOARDS; ib++)
+    {
+        // board is not read
+        if (evt->board[ib] == -1)
+        {
+            evt->FADhead[ib].start_package_flag = 0;
+            evt->fEvent->BoardTime[ib] = 0;
+        }
+        else
+        {
+            evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
+        }
+    }
+
+    const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
+
+    // no further processing of event ('delete')
+    if (rc < 0)
+        return;
+
+    //evt->evtStat = 1000;       // flag 'start processing'
+    run->procEvt++;
+    process.post(evt);
+}
+
+void doProcess(const shared_ptr<EVT_CTRL> &evt)
+{
+    const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0);
+
+    if (jret>0 && jret<=1)
+        factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret);
+
+    // flag as 'to be written'
+    if (jret<=1)
+        return;
+
+    //evt->evtStat = 5000;
+    write_queue.post(evt);
+}
+
+void doWrite(const shared_ptr<EVT_CTRL> &evt)
+{
+    const shared_ptr<RUN_CTRL> run = evt->run;
+    if (run->runId==0)
+        return;
+
+    // File is not open
+    if (run->fileId!=0)
+        return;
+
+    const int rc = runWrite(run->fileHd, evt->fEvent, 0);
+    if (rc >= 0)
+    {
+        // Sucessfully wrote event
+        run->lastTime = g_actTime;
+        run->actEvt++;
+    }
+    else
+        factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
+
+    checkAndCloseRun(run, rc<0, 1);
+
+    /*
+    // Although the are no pending events, we have to check if a run should be closed (timeout)
+    for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
+    {
+        if (ir->second->fileId == 0)
+        {
+            //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
+            const int cond = ir->second->runId == 0;
+            checkAndCloseRun(ir->second, cond, 2);
+        }
+    }
+    */
+}
+
+void *readFAD (void *ptr)
+{
+/* *** main loop reading FAD data and sorting them to complete events */
+
+    Queue<shared_ptr<EVT_CTRL>> queue(bind(preProcess, placeholders::_1));
+
+    factPrintf(kInfo, -1, "Start initializing (readFAD)");
+
+    READ_STRUCT rd[NBOARDS];       //buffer to read IP and afterwards store in mBuffer
+
+    uint32_t actrun = 0;
+
+   const int minLen = sizeof(PEVNT_HEADER);  //min #bytes needed to check header: full header for debug
+
+
+   int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
+   gi_resetS = gi_resetR = 9;
+
+   int sockDef[NBOARDS];        //internal state of sockets
+   memset(sockDef, 0, NBOARDS*sizeof(int));
+
+ START:
+   //time in seconds
+   uint gi_SecTime = time(NULL);;
+   g_actTime = gi_SecTime;
+
+   const int cntsock = 8 - NUMSOCK ;
+
+   if (gi_resetS > 0) {
+      //make sure all sockets are preallocated as 'not exist'
+      for (int i = 0; i < NBOARDS; i++) {
+         rd[i].socket = -1;
+         rd[i].sockStat = 99;
+      }
+
+      for (int k = 0; k < NBOARDS; k++) {
+         gi_NumConnect[k] = 0;
+         //gi.numConn[k] = 0;
+         gj.numConn[k] = 0;
+         //gj.errConn[k] = 0;
+         gj.rateBytes[k] = 0;
+         gj.totBytes[k] = 0;
+      }
+
+   }
+
+   if (gi_resetR > 0)
+   {
+      gj.bufTot  = gj.maxEvt = gj.xxxEvt = 0;
+      gj.usdMem  = gj.maxMem = gj.xxxMem = 0;
+      gj.totMem  = tgb_memory;
+      gj.bufNew  = gj.bufEvt = 0;
+      gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
+
+      factPrintf(kInfo, -1, "End   initializing (readFAD)");
+   }
+
+
+   gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
+
+   //loop until global variable g_runStat claims stop
+   while (g_runStat >= 0 && g_reset == 0)
+   {
+      gj.readStat = g_runStat;
+
+      for (int b = 0; b < NBOARDS; b++)
+      {
+          // Nothing has changed
+          if (g_port[b].sockDef == sockDef[b])
+              continue;
+
+          gi_NumConnect[b] = 0;       //must close all connections
+          gj.numConn[b] = 0;
+
+          // s0 =  0: sockets to be defined and opened
+          // s0 = -1: sockets to be destroyed
+          // s0 = +1: sockets to be closed and reopened
+
+          int s0 = 0;
+          if (sockDef[b] != 0)
+              s0 = g_port[b].sockDef==0 ? -1 : +1;
+
+          const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
+
+          GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
+
+          sockDef[b] = g_port[b].sockDef;
+      }
+
+      // count the number of active boards
+      int actBoards = 0;
+      for (int b = 0; b < NBOARDS; b++)
+          if (sockDef[b] > 0)
+              actBoards++;
+
+      //check all sockets if something to read
+      for (int i = 0; i < NBOARDS; i++)
+      {
+          // Do not try to connect this socket
+          if (rd[i].sockStat > 0)
+              continue;
+
+          if (rd[i].sockStat == -1)
+          {
+              //try to connect if not yet done
+              rd[i].sockStat = connect (rd[i].socket,
+                                        (struct sockaddr *) &rd[i].SockAddr,
+                                        sizeof (rd[i].SockAddr));
+              // Failed
+              if (rd[i].sockStat == -1)
+              {
+                  rd[i].errCnt++;
+                  usleep(25000);
+                  continue;
+              }
+
+              // Success (rd[i].sockStat == 0)
+
+              if (sockDef[i] > 0)
+              {
+                  rd[i].bufTyp = 0;                     // expect a header
+                  rd[i].bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
+              }
+              else
+              {
+                  rd[i].bufTyp = -1;       // full data to be skipped
+                  rd[i].bufLen = MAX_LEN;  // huge for skipping
+              }
+
+              rd[i].bufPos = 0;  //  no byte read so far
+              rd[i].skip   = 0;  //  start empty
+
+              gi_NumConnect[i] += cntsock;
+              gj.numConn[i]++;
+
+              factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
+          }
+
+          // Do not read from this socket
+          if (rd[i].bufLen<0)
+              continue;
+
+          if (rd[i].bufLen>0)
+          {
+              const int32_t jrd =
+                  recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
+                       rd[i].bufLen, MSG_DONTWAIT);
+
+              // recv failed
+              if (jrd<0)
+              {
+                  // There was just nothing waiting
+                  if (errno==EWOULDBLOCK || errno==EAGAIN)
+                      continue;
+
+                  factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
+                  continue;
+              }
+
+              // connection was closed ...
+              if (jrd==0)
+              {
+                  factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
+
+                  const int s0 = sockDef[i] > 0 ? +1 : -1;
+                  GenSock(s0, i, 0, NULL, &rd[i]);
+
+                  gi_NumConnect[i]-= cntsock ;
+                  gj.numConn[i]--;
+
+                  continue;
+              }
+
+              gj.rateBytes[i] += jrd;
+
+              // are we skipping this board ...
+              if (rd[i].bufTyp < 0)
+                  continue;
+
+              rd[i].bufPos += jrd;  //==> prepare for continuation
+              rd[i].bufLen -= jrd;
+
+#ifdef EVTDEBUG
+              debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
+#endif
+          }
+
+          //we are reading event header
+          if (rd[i].bufTyp <= 0)
+          {
+              //not yet sufficient data to take action
+              if (rd[i].bufPos < minLen)
+                  continue;
+
+              //check if startflag correct; else shift block ....
+              // FIXME: This is not enough... this combination of
+              //        bytes can be anywhere... at least the end bytes
+              //        must be checked somewhere, too.
+              int k;
+              for (k = 0; k < rd[i].bufPos - 1; k++)
+              {
+                  //start.S = 0xFB01;
+                  if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
+                      break;
+              }
+              rd[i].skip += k;
+
+              //no start of header found
+              if (k >= rd[i].bufPos - 1)
+              {
+                  rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
+                  rd[i].bufPos = 1;
+                  rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
+                  continue;
+              }
+
+              if (k > 0)
+              {
+                  rd[i].bufPos -= k;
+                  rd[i].bufLen += k;
+                  memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
+                           rd[i].bufPos);
+              }
+
+              if (rd[i].bufPos < minLen)
+                  continue;
+
+              if (rd[i].skip > 0)
+              {
+                  factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
+                  rd[i].skip = 0;
+              }
+
+              // TGB: This needs much more checks than just the first two bytes!
+
+              // Swap everything except start_package_flag.
+              // It is to difficult to find out where it is used how,
+              // but it doesn't really matter because it is not really
+              // used anywehere else
+              //   rd[i].rBuf->S[1]  = ntohs(rd[i].rBuf->S[1]);    // package_length
+              rd[i].rBuf->S[2]  = ntohs(rd[i].rBuf->S[2]);    // version_no
+              rd[i].rBuf->S[3]  = ntohs(rd[i].rBuf->S[3]);    // PLLLCK
+              rd[i].rBuf->S[4]  = ntohs(rd[i].rBuf->S[4]);    // trigger_crc
+              rd[i].rBuf->S[5]  = ntohs(rd[i].rBuf->S[5]);    // trigger_type
+
+              rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]);   // board id
+              rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]);   // adc_clock_phase_shift
+              rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]);   // number_of_triggers_to_generate
+              rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]);   // trigger_generator_prescaler
+
+              rd[i].rBuf->I[3]  = ntohl(rd[i].rBuf->I[3]);    // trigger_id
+              rd[i].rBuf->I[4]  = ntohl(rd[i].rBuf->I[4]);    // fad_evt_counter
+              rd[i].rBuf->I[5]  = ntohl(rd[i].rBuf->I[5]);    // REFCLK_frequency
+
+              rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]);   // runnumber;
+              rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]);   // time;
+
+              for (int s=24;s<24+NTemp+NDAC;s++)
+                  rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
+
+              rd[i].fadLen  = ntohs(rd[i].rBuf->S[1]) * 2;
+              rd[i].fadVers = rd[i].rBuf->S[2];
+              rd[i].ftmTyp  = rd[i].rBuf->S[5];
+              rd[i].ftmID   = rd[i].rBuf->I[3];    //(FTMevt)
+              rd[i].evtID   = rd[i].rBuf->I[4];    //(FADevt)
+              rd[i].runID   = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
+              rd[i].bufTyp  = 1;  //ready to read full record
+              rd[i].bufLen  = rd[i].fadLen - rd[i].bufPos;
+
+              const int fadBoard = rd[i].rBuf->S[12];
+              debugHead(i, fadBoard, rd[i].rBuf);
+
+              continue;
+          }
+
+          // are we reading data
+
+          // not yet all read
+          if (rd[i].bufLen > 0)
+              continue;
+
+          // stop.S = 0x04FE;
+          if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
+              rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
+          {
+              factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
+                         i, rd[i].evtID, rd[i].fadLen,
+                         rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
+
+              // ready to read next header
+              rd[i].bufTyp = 0;
+              rd[i].bufLen = sizeof(PEVNT_HEADER);
+              rd[i].bufPos = 0;
+
+              continue;
+          }
+
+          //  int actid;
+          //  if (g_useFTM > 0)
+          //     actid = rd[i].evtID;
+          //  else
+          //     actid = rd[i].ftmID;
+
+          //get index into mBuffer for this event (create if needed)
+          const shared_ptr<EVT_CTRL> evt = mBufEvt(&rd[i]);
+
+          // We have a valid entry, but no memory has yet been allocated
+          if (evt && evt->FADhead == NULL)
+          {
+              // Try to get memory from the big buffer
+              evt->FADhead = (PEVNT_HEADER*)TGB_Malloc();
+              if (evt->FADhead == NULL)
+              {
+                  // If this works properly, this is a hack which can be removed, or
+                  // replaced by a signal or dim message
+                  if (rd[i].bufTyp==2)
+                      factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum);
+                  rd[i].bufTyp = 2;
+                  continue;
+              }
+
+              // Initialise contents of mBuffer[evID]->fEvent
+              initEvent(evt);
+
+              // Some statistics
+              gj.usdMem = tgb_inuse;
+
+              if (gj.usdMem > gj.maxMem)
+                  gj.maxMem = gj.usdMem;
+
+              gj.rateNew++;
+              gj.bufTot++;
+              if (gj.bufTot > gj.maxEvt)
+                  gj.maxEvt = gj.bufTot;
+          }
+
+          rd[i].bufTyp = 0;
+          rd[i].bufLen = sizeof(PEVNT_HEADER);
+          rd[i].bufPos = 0;
+
+          // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
+          if (!evt)
+              continue;
+
+          //we have a valid entry in mBuffer[]; fill it
+          const int fadBoard = rd[i].rBuf->S[12];
+          const int fadCrate = fadBoard>>8;
+
+          if (i != (fadCrate * 10 + (fadBoard&0xff)))
+          {
+              factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
+                         i, fadBoard, fadCrate, fadBoard&0xff);
+          }
+
+          if (evt->board[i] != -1)
+          {
+              factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
+                         evt->evNum, i, i, rd[i].fadLen,
+                         rd[i].rBuf->B[0], rd[i].rBuf->B[1],
+                         rd[i].rBuf->B[rd[i].fadLen - 2],
+                         rd[i].rBuf->B[rd[i].fadLen - 1]);
+              continue; // Continue reading next header
+          }
+
+          // Copy data from rd[i] to mBuffer[evID]
+          copyData(rd[i].rBuf, i, evt);
+
+          // now we have stored a new board contents into Event structure
+
+          evt->fEvent->NumBoards++;
+          evt->board[i] = i;
+          evt->nBoard++;
+          evt->evtStat = evt->nBoard;
+
+          // have we already reported first (partial) event of this run ???
+          if (evt->nBoard==1 && evt->runNum != actrun)
+          {
+             // Signal the fadctrl that a new run has been started
+              gotNewRun(evt->runNum, NULL);
+
+              factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
+                         actrun, evt->runNum, evt->evNum);
+
+              // We got the first part of this event, so this is
+              // the number of events we expect for this run
+              evt->run->lastEvt++;
+
+              // Since we have started a new run, we know already when to close the
+              // previous run in terms of number of events
+              const auto ir = runCtrl.find(actrun);
+              if (ir!=runCtrl.end())
+                  ir->second->maxEvt = ir->second->lastEvt;
+
+              // Change 'actrun' the the new runnumber
+              actrun = evt->runNum;
+          }
+
+          // event not yet complete
+          if (evt->nBoard < actBoards)
+              continue;
+
+          // GARBAGE COLLECTION
+          // This is a non-ideal hack to lower the probability that
+          // in mBufEvt the search for correct entry in runCtrl
+          // will not return a super-old entry. I don't want
+          // to manipulate that in another thread.
+          for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
+          {
+              if (ir->runId==evt->runNum)
+                  break;
+
+              if (ir->second->fileId>0)
+                  runCtrl.erase(ir);
+          }
+
+          // we have just completed an event... so all previous events
+          // must have been completed already. If they are not, there
+          // is no need to wait for the timeout, because they will never
+          // get completed. We can just ensure that if we check for the previous
+          // event to be complete every time we receive a new complete event.
+          // If we find an incomplete one, we remove all consecutive
+          // incomplete ones.
+          for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++)
+          {
+              const shared_ptr<EVT_CTRL> e = *it;
+
+              if (e.get()==evt.get())
+              {
+                  queue.post(e);
+                  evtCtrl.erase(it);
+                  break;
+              }
+
+              reportIncomplete(e, "expired");
+              evtCtrl.pop_front();
+          }
+
+      } // end for loop over all sockets
+
+      g_actTime = time (NULL);
+      if (g_actTime <= gi_SecTime)
+      {
+          usleep(1);
+          continue;
+      }
+      gi_SecTime = g_actTime;
+
+      gj.bufNew = 0;
+
+      //loop over all active events and flag those older than read-timeout
+      //delete those that are written to disk ....
+
+      const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
+
+      // This could be improved having the pointer which separates the queue with
+      // the incomplete events from the queue with the complete events
+      for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
+      {
+          const shared_ptr<EVT_CTRL> evt = *it;
+
+          // Check the more likely case first: incomplete events
+          if (evt->evtStat>=0 && evt->evtStat<100)
+          {
+              gj.bufNew++;     //incomplete event in Buffer
+
+              // Event has not yet timed out or was reported already
+              if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30)
+                  continue;
+
+              // This will result in the emission of a dim service.
+              // It doesn't matter if that takes comparably long,
+              // because we have to stop the run anyway.
+              const uint64_t rep = reportIncomplete(evt, "timeout");
+              factReportIncomplete(rep);
+
+              //timeout for incomplete events
+              evt->evtStat = 90;
+              gj.evtSkip++;
+
+              continue;
+          }
+
+          // Check the less likely case: 'useless' or 'delete'
+          // evtState==0 can happen if the event was initialized (some data received)
+          // but the data did not make sense (e.g. inconsistent rois)
+          if (evt->evtStat==0 || evt->evtStat == 10000)
+          {
+              evtCtrl.erase(it); //event written--> free memory
+
+              gj.evtWrite++;
+              gj.rateWrite++;
+          }
+
+          // Remove leading invalidated slots from queue
+          // Do they exist at all?
+          if (evt->evtStat==-1)
+              evtCtrl.erase(it);
+      }
+
+
+
+      // The number of complete events in the buffer is the total number of
+      // events in the buffer minus the number of incomplete events.
+      gj.bufEvt = count - gj.bufNew;
+
+      gj.deltaT = 1000;      //temporary, must be improved
+
+      for (int ib = 0; ib < NBOARDS; ib++)
+          gj.totBytes[ib] += gj.rateBytes[ib];
+
+      gj.totMem = tgb_memory;
+
+      if (gj.maxMem > gj.xxxMem)
+          gj.xxxMem = gj.maxMem;
+      if (gj.maxEvt > gj.xxxEvt)
+          gj.xxxEvt = gj.maxEvt;
+
+      factStat (gj);
+      //factStatNew (gi);
+      gj.rateNew = gj.rateWrite = 0;
+      gj.maxMem = gj.usdMem;
+      gj.maxEvt = gj.bufTot;
+      for (int b = 0; b < NBOARDS; b++)
+          gj.rateBytes[b] = 0;
+
+   } // while (g_runStat >= 0 && g_reset == 0)
+
+   factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
+
+   if (g_reset > 0)
+   {
+       gi_reset  = g_reset;
+       gi_resetR = gi_reset % 10;          //shall we stop reading ?
+       gi_resetS = (gi_reset / 10) % 10;   //shall we close sockets ?
+       gi_resetW = (gi_reset / 100) % 10;  //shall we close files ?
+       gi_resetX = gi_reset / 1000;        //shall we simply wait resetX seconds ?
+       g_reset   = 0;
+   }
+   else
+   {
+       gi_reset  = 0;
+       gi_resetR = g_runStat == -1 ? 1 : 7;
+
+       gi_resetS = 7;            //close all sockets
+       gi_resetW = 7;            //close all files
+       gi_resetX = 0;
+
+       //inform others we have to quit ....
+       gj.readStat = -11;        //inform all that no update to happen any more
+   }
+
+   if (gi_resetS > 0)
+   {
+       //must close all open sockets ...
+       factPrintf(kInfo, -1, "Close all sockets...");
+
+       for (int i = 0; i < NBOARDS; i++)
+       {
+           if (rd[i].sockStat != 0)
+               continue;
+
+           GenSock(-1, i, 0, NULL, &rd[i]);   //close and destroy open socket
+
+           gi_NumConnect[i]-= cntsock ;
+           gj.numConn[i]--;
+           sockDef[i] = 0;      //flag ro recreate the sockets ...
+           rd[i].sockStat = -1; //and try to open asap
+       }
+   }
+
+
+   if (gi_resetR > 0)
+   {
+       //and clear all buffers (might have to wait until all others are done)
+       while (evtCtrl.size())
+       {
+           const shared_ptr<EVT_CTRL> evt = evtCtrl.front();
+
+           // flag incomplete events as 'read finished'
+           // We cannot just detele all events, because some might currently being processed,
+           // so we have to wait until the processing thread currently processing the event
+           // signals that the event can be deleted. (Note, that there are currently never
+           // two threads processing the same event at the same time)
+           if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000)
+               evtCtrl.pop_front();
+
+           usleep(1);
+       }
+   }
+
+   //queue.wait();
+   //queue.join();
+
+   if (gi_reset > 0)
+   {
+      if (gi_resetW > 0)
+         CloseRunFile (0, 0, 0);        //ask all Runs to be closed
+
+      if (gi_resetX > 0)
+      {
+          struct timespec xwait;
+          xwait.tv_sec = gi_resetX;
+          xwait.tv_nsec = 0;
+          nanosleep (&xwait, NULL);
+      }
+
+      factPrintf(kInfo, -1, "Continue read Process ...");
+      gi_reset = 0;
+      goto START;
+   }
+
+   factPrintf(kInfo, -1, "Exit read Process...");
+
+   factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
+
+   gj.readStat = -99;
+
+   factStat (gj);
+   //factStatNew (gi);
+
+   return 0;
+
+} /*-----------------------------------------------------------------*/
+/*
+
+void *subProc(void *thrid)
+{
+    const int64_t threadID = (int64_t)thrid;
+
+    factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
+
+    while (g_runStat > -2) //in case of 'exit' we still must process pending events
+    {
+        int numWait = 0;
+
+        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
+        {
+            const shared_ptr<EVT_CTRL> evt = *it;
+
+           // This is a threading issue... the evtStat might have been invalid
+           // but the frstPtr is not yet updated
+           if (evt->evtStat==-1)
+               continue;
+
+           // If we find the first event still waiting for processing
+           // there will be only unprocessed events after this one in the queue
+           if (evt->evtStat<1000+threadID)
+           {
+               numWait = 1;
+               break;
+           }
+
+           // If the event was processed already, skip it
+           // We could replace that to a moving pointer pointing to the first
+           // non-processed event
+           if (evt->evtStat!=1000+threadID)
+               continue;
+
+            const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0);
+
+            if (jret>0 && jret<=threadID)
+                factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
+
+            if (jret<=threadID)
+            {
+                evt->evtStat = 10000;            // flag as 'to be deleted'
+                continue;
+            }
+
+            if (jret>=gi_maxProc)
+            {
+                evt->evtStat = 5000;            // flag as 'to be written'
+                continue;
+            }
+
+            evt->evtStat = 1000 + jret;       // flag for next proces
+        }
+
+        if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
+            factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
+            return 0;
+        }
+
+        usleep(1);
+    }
+
+    factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
+
+    return 0;
+}
+
+
+void *
+procEvt (void *ptr)
+{
+   int status;
+
+   factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
+
+   pthread_t thread[100];
+//   int th_ret[100];
+
+   for (long long k = 0; k < gi_maxProc; k++) {
+       pthread_create (&thread[k], NULL, subProc, (void *) k);
+   }
+
+   // in case of 'exit' we still must process pending events
+   while (g_runStat > -2)
+   {
+       int numWait = 0;
+
+       for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
+       {
+           const shared_ptr<EVT_CTRL> evt = *it;
+
+           // This is a threading issue... the evtStat might have been invalid
+           // but the frstPtr is not yet updated
+           if (evt->evtStat==-1)
+               continue;
+
+           // If we find the first incomplete event which is not supposed to
+           // be processed, there are only more incomplete events in the queue
+           if (evt->evtStat<90)
+           {
+               numWait = 1;
+               break;
+           }
+
+           // If the event was processed already, skip it.
+           // We could replace that to a moving pointer pointing to the first
+           // non-processed event
+           if (evt->evtStat>=1000)
+               continue;
+
+           //-------- it is better to open the run already here, so call can be used to initialize
+           //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
+           const uint32_t irun = evt->runNum;
+           const int32_t  ievt = evt->evNum;
+
+           const shared_ptr<RUN_CTRL> run = evt->run;
+           if (run->runId==0)
+               continue;
+
+           // File not yet open
+           if (run->fileId < 0)
+           {
+               RUN_HEAD actRun;
+               actRun.Version =  1;
+               actRun.RunType = -1;  //to be adapted
+               actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
+               actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
+               actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
+               actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
+               actRun.NBoard  = NBOARDS;
+               actRun.NPix    = NPIX;
+               actRun.NTm     = NTMARK;
+
+               memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
+
+               run->fileHd = runOpen(irun, &actRun, sizeof (actRun));
+               if (run->fileHd == NULL)
+               {
+                   factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt);
+                   run->fileId = 91;
+                   continue;
+               }
+
+               run->fileId = 0;
+
+               factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
+           }
+
+           //--------
+           //--------
+
+           //and set correct event header ; also check for consistency in event (not yet)
+           evt->fEvent->Errors[0] = evt->Errors[0];
+           evt->fEvent->Errors[1] = evt->Errors[1];
+           evt->fEvent->Errors[2] = evt->Errors[2];
+           evt->fEvent->Errors[3] = evt->Errors[3];
+
+           for (int ib=0; ib<NBOARDS; ib++)
+           {
+               // board is not read
+               if (evt->board[ib] == -1)
+               {
+                   evt->FADhead[ib].start_package_flag = 0;
+                   evt->fEvent->BoardTime[ib] = 0;
+               }
+               else
+               {
+                   evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
+               }
+           }
+
+           const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
+           if (rc < 0)
+           {
+               evt->evtStat = 10000;        // flag event to be deleted
+           }
+           else
+           {
+               evt->evtStat = 1000;       // flag 'start processing'
+               run->procEvt++;
+           }
+       }
+
+       if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
+           factPrintf(kInfo, -1, "Exit Processing Process ...");
+           gj.procStat = -22;     //==> we should exit
+           return 0;
+       }
+
+       usleep(1);
+
+       gj.procStat = gj.readStat;
+   }
+
+   //we are asked to abort asap ==> must flag all remaining events
+   //   when gi_runStat claims that all events are in the buffer...
+
+   factPrintf(kInfo, -1, "Abort Processing Process ...");
+
+   for (int k = 0; k < gi_maxProc; k++) {
+      pthread_join (thread[k], (void **) &status);
+   }
+
+   gj.procStat = -99;
+
+   return 0;
+
+} */
+
+int
+CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
+{
+/* close run runId (all all runs if runId=0) */
+/* return: 0=close scheduled / >0 already closed / <0 does not exist */
+
+   if (runId == 0)
+   {
+       for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++)
+       {
+           const shared_ptr<RUN_CTRL> run = it->second;
+
+           //run is open
+           if (run->fileId == 0)
+           {
+               run->closeTime = closeTime;
+               run->maxEvt = maxEvt;
+           }
+       }
+       return 0;
+   }
+
+   auto it=runCtrl.find(runId);
+   if (it==runCtrl.end())
+       return -1;
+
+   const shared_ptr<RUN_CTRL> run = it->second;
+
+   // run already closed
+   if (run->fileId>0)
+       return +2;
+
+   run->closeTime = closeTime;
+   run->maxEvt = maxEvt;
+
+   return run->fileId==0 ? 0 : 1;
+
+}
+
+void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where)
+{
+    if (!cond &&
+        run->closeTime >= g_actTime &&
+        run->lastTime >= g_actTime - 300 &&
+        run->maxEvt > run->actEvt)
+        return;
+
+    //close run for whatever reason
+    int ii = 0;
+    if (cond)
+        ii = 1;
+    if (run->closeTime < g_actTime)
+        ii |= 2; // = 2;
+    if (run->lastTime < g_actTime - 300)
+        ii |= 4; // = 3;
+    if (run->maxEvt <= run->actEvt)
+        ii |= 8; // = 4;
+
+    run->closeTime = g_actTime - 1;
+
+    const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
+    if (rc<0)
+    {
+        factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
+                   where, run->runId, rc);
+        run->fileId = 92+where*2;
+    }
+    else
+    {
+        factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
+                   where, run->runId, ii);
+        run->fileId = 93+where*2;
+    }
+}
+
+/*-----------------------------------------------------------------*/
+
+/*
+void *writeEvt (void *ptr)
+{
+   factPrintf(kInfo, -1, "Starting write-thread");
+
+   while (g_runStat > -2)
+   {
+       //int numWrite = 0;
+       int numWait  = 0;
+
+       // Note that the current loop does not at all gurantee that
+       // the events are written in the correct order.
+       for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
+       {
+           const shared_ptr<EVT_CTRL> evt = *it;
+
+           // This is a threading issue... the evtStat might have been invalid
+           // but the frstPtr is not yet updated
+           if (evt->evtStat==-1)
+               continue;
+
+           // If we find the first non-written event which is not supposed to
+           // be written, there are only more incomplete events in the queue
+           if (evt->evtStat<5000)
+           {
+               numWait = 1;
+               break;
+           }
+
+           // If the event was written already already, skip it
+           // We could replace that to a moving pointer pointing to the first
+           // non-processed event
+           if (evt->evtStat!=5000)
+               continue;
+
+           const shared_ptr<RUN_CTRL> run = evt->run;
+           if (run->runId==0)
+               continue;
+
+           // File is open
+           if (run->fileId==0)
+           {
+               const int rc = runWrite(run->fileHd, evt->fEvent, 0);
+               if (rc >= 0)
+               {
+                   // Sucessfully wrote event
+                   run->lastTime = g_actTime;
+                   run->actEvt++;
+               }
+               else
+                   factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
+
+               checkAndCloseRun(run, rc<0, 1);
+           }
+
+           evt->evtStat = 10000;  // event written (or has to be discarded) -> delete
+       }
+
+       // Although the are no pending events, we have to check if a run should be closed (timeout)
+       for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
+       {
+           if (ir->second->fileId == 0)
+           {
+               //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
+               const int cond = ir->second->runId == 0;
+               checkAndCloseRun(ir->second, cond, 2);
+           }
+       }
+
+       usleep(1);
+
+       //nothing left to do
+       if (gj.readStat < -10 && numWait == 0)
+       {
+           factPrintf(kInfo, -1, "Finish Write Process ...");
+           gj.writStat = -22;     //==> we should exit
+           break;
+       }
+
+       gj.writStat = gj.readStat;
+   }
+
+   factPrintf(kInfo, -1, "Close all open files ...");
+   for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
+   {
+       if (ir->second->fileId == 0)
+           checkAndCloseRun(ir->second, 1, 3);
+   }
+
+   gj.writStat = -99;
+
+   factPrintf(kInfo, -1, "Exit Writing Process ...");
+
+   return 0;
+}
+   */
+
+
+
+
+void
+StartEvtBuild ()
+{
+
+   int i, /*j,*/ imax, status/*, th_ret[50]*/;
+   pthread_t thread[50];
+   struct timespec xwait;
+
+   gj.readStat = gj.procStat = gj.writStat = 0;
+
+   factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
+
+
+   gi_maxProc = g_maxProc;
+   if (gi_maxProc <= 0 || gi_maxProc > 90) {
+      factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
+      gi_maxProc = 1;
+   }
+
+//start all threads (more to come) when we are allowed to ....
+   while (g_runStat == 0) {
+      xwait.tv_sec = 0;
+      xwait.tv_nsec = 10000000; // sleep for ~10 msec
+      nanosleep (&xwait, NULL);
+   }
+
+   i = 0;
+   /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
+   i++;
+   ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
+   //i++;
+   ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
+   //i++;
+   imax = i;
+
+
+#ifdef BILAND
+   xwait.tv_sec = 30;;
+   xwait.tv_nsec = 0;           // sleep for ~20sec
+   nanosleep (&xwait, NULL);
+
+   printf ("close all runs in 2 seconds\n");
+
+   CloseRunFile (0, time (NULL) + 2, 0);
+
+   xwait.tv_sec = 1;;
+   xwait.tv_nsec = 0;           // sleep for ~20sec
+   nanosleep (&xwait, NULL);
+
+   printf ("setting g_runstat to -1\n");
+
+   g_runStat = -1;
+#endif
+
+
+//wait for all threads to finish
+   for (i = 0; i < imax; i++) {
+      /*j =*/ pthread_join (thread[i], (void **) &status);
+   }
+
+} /*-----------------------------------------------------------------*/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+
+#ifdef BILAND
+
+int
+subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
+            int8_t * buffer)
+{
+   printf ("called subproc %d\n", threadID);
+   return threadID + 1;
+}
+
+
+
+
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+  /*-----------------------------------------------------------------*/
+
+
+
+
+FileHandle_t
+runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
+{
+   return 1;
+};
+
+int
+runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
+{
+   return 1;
+   usleep (10000);
+   return 1;
+}
+
+
+//{ return 1; } ;
+
+int
+runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
+{
+   return 1;
+};
+
+
+
+
+int
+eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
+{
+   int i = 0;
+
+// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
+// for (i=0; i<NBOARDS; i++) {
+//    printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
+// }
+   return 0;
+}
+
+
+void
+factStatNew (EVT_STAT gi)
+{
+   int i;
+
+//for (i=0;i<MAX_SOCK;i++) {
+//   printf("%4d",gi.numRead[i]);
+//   if (i%20 == 0 ) printf("\n");
+//}
+}
+
+void
+gotNewRun (int runnr, PEVNT_HEADER * headers)
+{
+   printf ("got new run %d\n", runnr);
+   return;
+}
+
+void
+factStat (GUI_STAT gj)
+{
+//  printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
+//    array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
+}
+
+
+void
+debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
+           int state, uint32_t tsec, uint32_t tusec)
+{
+//  printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
+}
+
+
+
+void
+debugStream (int isock, void *buf, int len)
+{
+}
+
+void
+debugHead (int i, int j, void *buf)
+{
+}
+
+
+void
+factOut (int severity, int err, char *message)
+{
+   static FILE *fd;
+   static int file = 0;
+
+   if (file == 0) {
+      printf ("open file\n");
+      fd = fopen ("x.out", "w+");
+      file = 999;
+   }
+
+   fprintf (fd, "%3d %3d | %s \n", severity, err, message);
+
+   if (severity != kDebug)
+      printf ("%3d %3d | %s\n", severity, err, message);
+}
+
+
+
+int
+main ()
+{
+   int i, b, c, p;
+   char ipStr[100];
+   struct in_addr IPaddr;
+
+   g_maxMem = 1024 * 1024;      //MBytes
+   g_maxMem = g_maxMem * 200;   //100MBytes
+
+   g_maxProc = 20;
+
+   g_runStat = 40;
+
+   i = 0;
+
+// version for standard crates
+//for (c=0; c<4,c++) {
+//   for (b=0; b<10; b++) {
+//      sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
+//
+//      inet_pton(PF_INET, ipStr, &IPaddr) ;
+//
+//      g_port[i].sockAddr.sin_family = PF_INET;
+//      g_port[i].sockAddr.sin_port = htons(5000) ;
+//      g_port[i].sockAddr.sin_addr = IPaddr ;
+//      g_port[i].sockDef = 1 ;
+//      i++ ;
+//   }
+//}
+//
+//version for PC-test * 
+   for (c = 0; c < 4; c++) {
+      for (b = 0; b < 10; b++) {
+         sprintf (ipStr, "10.0.%d.11", 128 + c);
+         if (c < 2)
+            sprintf (ipStr, "10.0.%d.11", 128);
+         else
+            sprintf (ipStr, "10.0.%d.11", 131);
+//      if (c==0) sprintf(ipStr,"10.0.100.11") ;
+
+         inet_pton (PF_INET, ipStr, &IPaddr);
+         p = 31919 + 100 * c + 10 * b;
+
+
+         g_port[i].sockAddr.sin_family = PF_INET;
+         g_port[i].sockAddr.sin_port = htons (p);
+         g_port[i].sockAddr.sin_addr = IPaddr;
+         g_port[i].sockDef = 1;
+
+         i++;
+      }
+   }
+
+
+//g_port[17].sockDef =-1 ;
+//g_actBoards-- ; 
+
+   StartEvtBuild ();
+
+   return 0;
+
+}
+#endif
