Index: /trunk/FACT++/src/EventBuilder.cc
===================================================================
--- /trunk/FACT++/src/EventBuilder.cc	(revision 16039)
+++ /trunk/FACT++/src/EventBuilder.cc	(revision 16040)
@@ -1,85 +1,42 @@
-// #define EVTDEBUG
-
-#define NUMSOCK   1          //set to 7 for old configuration
-#define MAXREAD  65536       //64kB wiznet buffer
-
-#include <stdlib.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <unistd.h>
-#include <stdio.h>
 #include <sys/time.h>
-#include <arpa/inet.h>
-#include <string.h>
-#include <math.h>
-#include <error.h>
-#include <errno.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
+#include <sys/epoll.h>
 #include <netinet/tcp.h>
-#include <pthread.h>
-#include <sched.h>                              
-
-#include <memory>
-#include <deque>
-#include <map>
+
+#include <cstring>
+#include <cstdarg>
+#include <list>
+#include <forward_list>
 
 #include "queue.h"
 
+#include "MessageImp.h"
+
+using namespace std;
+
 #include "EventBuilder.h"
 
-enum Severity
-{
-   kMessage = 10,               ///< Just a message, usually obsolete
-   kInfo = 20,                  ///< An info telling something which can be interesting to know
-   kWarn = 30,                  ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
-   kError = 40,                 ///< Error, something unexpected happened, but can still be handled by the program
-   kFatal = 50,                 ///< An error which cannot be handled at all happend, the only solution is program termination
-   kDebug = 99,                 ///< A message used for debugging only
-};
-
-using namespace std;
-
-#define MIN_LEN  32             // min #bytes needed to interpret FADheader
-#define MAX_LEN 256*1024        // size of read-buffer per socket
-
-//#define nanosleep(x,y)
-
-extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
-extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
-extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
-//extern int runFinish (uint32_t runnr);
-
-extern "C" void factOut (int severity, int err, char *message);
-extern void factReportIncomplete (uint64_t rep);
-
-extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers);
-
-
-extern void factStat (GUI_STAT gj);
-
-extern void factStatNew (EVT_STAT gi);
-
-extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
-
-extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
-                       int8_t * buffer);
-
-extern void debugHead (int i, int j, void *buf);
-
-extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
-                       int32_t runnr, int state, uint32_t tsec,
-                       uint32_t tusec);
-extern void debugStream (int isock, void *buf, int len);
-
-int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
-
-int g_maxProc;
-int gi_maxProc;
-
-uint g_actTime;
-int g_runStat;
+#define MIN_LEN  32           // min #bytes needed to interpret FADheader
+#define MAX_LEN (36*3*1024)   // (data+header)*num channels
+
+#define COMPLETE_EVENTS
+//#define USE_EPOLL
+//#define USE_SELECT
+
+// ==========================================================================
+
+bool runOpen(const shared_ptr<EVT_CTRL2> &evt);
+bool runWrite(const shared_ptr<EVT_CTRL2> &evt);
+void runClose();
+void applyCalib(const shared_ptr<EVT_CTRL2> &evt);
+void factOut(int severity, const char *message);
+void factReportIncomplete (uint64_t rep);
+void gotNewRun(RUN_CTRL2 &run);
+void runFinished();
+void factStat(GUI_STAT gj);
+int  eventCheck(const shared_ptr<EVT_CTRL2> &evt);
+void debugHead(void *buf);
+
+// ==========================================================================
+
 int g_reset;
 
@@ -90,11 +47,9 @@
 uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards
 
-//EVT_STAT gi;
 GUI_STAT gj;
 
-#define MAX_EVT   65536  // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
-#define MAX_RUN       8  // Number of concurrent runs
-
-void factPrintf(int severity, int id, const char *fmt, ...)
+// ==========================================================================
+
+void factPrintf(int severity, const char *fmt, ...)
 {
     char str[1000];
@@ -105,196 +60,399 @@
     va_end(ap);
 
-    factOut(severity, id, str);
-}
-
+    factOut(severity, str);
+}
+
+// ==========================================================================
 
 #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
 #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
-typedef struct TGB_struct
-{
-    struct TGB_struct *prev;
-    void *mem;
-} TGB_entry;
-
-TGB_entry  *tgb_last = NULL;
-uint64_t    tgb_memory = 0;
-uint64_t    tgb_inuse  = 0;
-
-void *TGB_Malloc()
-{
-    // No free slot available, next alloc would exceed max memory
-    if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
-        return NULL;
-
-    // We will return this amount of memory
-    tgb_inuse += MAX_TOT_MEM;
-
-    // No free slot available, allocate a new one
-    if (!tgb_last)
-    {
-        tgb_memory += MAX_TOT_MEM;
-        return malloc(MAX_TOT_MEM);
-    }
-
-    // Get the next free slot from the stack and return it
-    TGB_entry *last = tgb_last;
-
-    void *mem = last->mem;
-    tgb_last  = last->prev;
-
-    free(last);
-
-    return mem;
+
+namespace Memory
+{
+    uint64_t inuse     = 0;
+    uint64_t allocated = 0;
+
+    uint64_t max_inuse = 0;
+
+    mutex mtx;
+
+    forward_list<void*> memory;
+
+    void *malloc()
+    {
+        // No free slot available, next alloc would exceed max memory
+        if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
+            return NULL;
+
+        // We will return this amount of memory
+        // This is not 100% thread safe, but it is not a super accurate measure anyway
+        inuse += MAX_TOT_MEM;
+        if (inuse>max_inuse)
+            max_inuse = inuse;
+
+        void *mem = NULL;
+
+        if (memory.empty())
+        {
+            // No free slot available, allocate a new one
+            allocated += MAX_TOT_MEM;
+            mem = new char[MAX_TOT_MEM];
+        }
+        else
+        {
+            // Get the next free slot from the stack and return it
+            const lock_guard<mutex> lock(mtx);
+            mem = memory.front();
+            memory.pop_front();
+        }
+
+        memset(mem, 0, MAX_HEAD_MEM);
+        return mem;
+    };
+
+    void free(void *mem)
+    {
+        if (!mem)
+            return;
+
+        // Decrease the amont of memory in use accordingly
+        inuse -= MAX_TOT_MEM;
+
+        // If the maximum memory has changed, we might be over the limit.
+        // In this case: free a slot
+        if (allocated>g_maxMem)
+        {
+            delete [] (char*)mem;
+            allocated -= MAX_TOT_MEM;
+            return;
+        }
+
+        const lock_guard<mutex> lock(mtx);
+        memory.push_front(mem);
+    }
 };
 
-void TGB_free(void *mem)
-{
-    if (!mem)
+// ==========================================================================
+
+struct READ_STRUCT
+{
+    enum buftyp_t
+    {
+        kStream,
+        kHeader,
+        kData,
+#ifdef COMPLETE_EVENTS
+        kWait
+#endif
+    };
+
+    // ---------- connection ----------
+
+    static uint activeSockets;
+
+    int  sockId;       // socket id (board number)
+    int  socket;       // socket handle
+    bool connected;    // is this socket connected?
+
+    struct sockaddr_in SockAddr;  // Socket address copied from wrapper during socket creation
+
+    // ------------ epoll -------------
+
+    static int  fd_epoll;
+    static epoll_event events[NBOARDS];
+
+    static void init();
+    static void close();
+    static int  wait();
+    static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
+
+    // ------------ buffer ------------
+
+    buftyp_t  bufTyp;  // what are we reading at the moment: 0=header 1=data -1=skip ...
+
+    uint32_t  bufLen;  // number of bytes left to read
+    uint8_t  *bufPos;  // next byte to read to the buffer next
+
+    union
+    {
+        uint8_t  B[MAX_LEN];
+        uint16_t S[MAX_LEN / 2];
+        uint32_t I[MAX_LEN / 4];
+        uint64_t L[MAX_LEN / 8];
+        PEVNT_HEADER H;
+    };
+
+    uint64_t rateBytes;
+    uint32_t skip;     // number of bytes skipped before start of event
+    bool     repmem;   // reportet no mmemory free
+
+    uint32_t len() const { return uint32_t(H.package_length)*2; }
+
+    void swapHeader();
+    void swapData();
+
+    // --------------------------------
+
+    READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
+    {
+        if (fd_epoll<0)
+            init();
+    }
+    ~READ_STRUCT()
+    {
+        destroy();
+    }
+
+    void destroy();
+    bool create(sockaddr_in addr);
+    void check(int, sockaddr_in addr);
+    bool read();
+};
+
+int READ_STRUCT::wait()
+{
+    // wait for something to do...
+    const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms]
+    if (rc>=0)
+        return rc;
+
+    if (errno==EINTR) // timout or signal interruption
+        return 0;
+
+    factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
+    return -1;
+}
+
+uint READ_STRUCT::activeSockets = 0;
+int READ_STRUCT::fd_epoll = -1;
+epoll_event READ_STRUCT::events[NBOARDS];
+
+void READ_STRUCT::init()
+{
+    if (fd_epoll>=0)
         return;
 
-    // Add the last free slot to the stack
-    TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
-
-    // FIXME: Really free memory if memory usuage exceeds g_maxMem
-
-    entry->prev = tgb_last;
-    entry->mem  = mem;
-
-    tgb_last = entry;
-
-    // Decrease the amont of memory in use accordingly
-    tgb_inuse -= MAX_TOT_MEM;
-
-    gj.usdMem = tgb_inuse;
-    gj.bufTot--;
-}
-
-//RUN_CTRL runCtrl[MAX_RUN];
-
-/*
-*** Definition of rdBuffer to read in IP packets; keep it global !!!!
- */
-
-typedef union
-{
-   uint8_t B[MAX_LEN];
-   uint16_t S[MAX_LEN / 2];
-   uint32_t I[MAX_LEN / 4];
-   uint64_t L[MAX_LEN / 8];
-} CNV_FACT;
-
-typedef struct
-{
-   int bufTyp;                  //what are we reading at the moment: 0=header 1=data -1=skip ...
-   int32_t bufPos;              //next byte to read to the buffer next
-   int32_t bufLen;              //number of bytes left to read
-   int32_t skip;                //number of bytes skipped before start of event
-
-   int errCnt;                  //how often connect failed since last successful
-   int sockStat;                //-1 if socket not yet connected  , 99 if not exist
-   int socket;                  //contains the sockets
-
-   struct sockaddr_in SockAddr; //IP for each socket
-
-   int evtID;                   // event ID of event currently read
-   int runID;                   // run       "
-   int ftmID;                   // event ID from FTM
-   uint fadLen;                 // FADlength of event currently read
-   int fadVers;                 // Version of FAD
-   int ftmTyp;                  // trigger type
-   int Port;
-
-   CNV_FACT *rBuf;
-
-} READ_STRUCT;
-
-/*-----------------------------------------------------------------*/
-
-
-/*-----------------------------------------------------------------*/
-
-
-int
-GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
-         READ_STRUCT * rs)
-{
-/*
-*** generate Address, create sockets and allocates readbuffer for it
-*** 
-*** if flag==0 generate socket and buffer
-***         <0 destroy socket and buffer
-***         >0 close and redo socket
-***
-*** sid : board*7 + port id
- */
-
-   //close socket if open
-   if (rs->sockStat == 0)
-   {
-      if (close (rs->socket) > 0) {
-          factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
-      } else {
-          factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
-      }
-   }
-
-   rs->sockStat = 99;
-
-   if (flag < 0) {
-      free (rs->rBuf);          //and never open again
-      rs->rBuf = NULL;
-      return 0;
-   }
-
-
-   if (flag == 0) {             //generate address and buffer ...
-      rs->Port = port;
-      rs->SockAddr.sin_family = sockAddr->sin_family;
-      rs->SockAddr.sin_port = htons (port);
-      rs->SockAddr.sin_addr = sockAddr->sin_addr;
-
-      rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
-      if (rs->rBuf == NULL) {
-         factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
-         rs->sockStat = 77;
-         return -3;
-      }
-   }
-
-
-   if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
-       factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
-      rs->sockStat = 88;
-      return -2;
-   }
-
-   int optval = 1;
-   if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
-       factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
-   }
-   optval = 10;                 //start after 10 seconds
-   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
-       factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
-   }
-   optval = 10;                 //do every 10 seconds
-   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
-      factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
-   }
-   optval = 2;                  //close after 2 unsuccessful tries
-   if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
-      factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
-   }
-
-   factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
-
-   rs->sockStat = -1;           //try to (re)open socket
-   rs->errCnt = 0;
-   return 0;
-
-} /*-----------------------------------------------------------------*/
-
-  /*-----------------------------------------------------------------*/
-
-int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
+#ifdef USE_EPOLL
+    fd_epoll = epoll_create(NBOARDS);
+    if (fd_epoll<0)
+    {
+        factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
+        return;
+    }
+#endif
+}
+
+void READ_STRUCT::close()
+{
+#ifdef USE_EPOLL
+    if (::close(fd_epoll) > 0)
+        factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno);
+    else
+        factPrintf(MessageImp::kInfo, "Succesfully closed epoll");
+#endif
+
+    fd_epoll = -1;
+}
+
+bool READ_STRUCT::create(sockaddr_in sockAddr)
+{
+    if (socket>=0)
+        return false;
+
+    const int port = ntohs(sockAddr.sin_port) + 1;
+
+    SockAddr.sin_family = sockAddr.sin_family;
+    SockAddr.sin_addr   = sockAddr.sin_addr;
+    SockAddr.sin_port   = htons(port);
+
+    if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
+    {
+        factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
+        socket = -1;
+        return false;
+    }
+
+    int optval = 1;
+    if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
+        factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
+
+    optval = 10;                 //start after 10 seconds
+    if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
+        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
+
+    optval = 10;                 //do every 10 seconds
+    if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
+        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
+
+    optval = 2;                  //close after 2 unsuccessful tries
+    if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
+        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
+
+    factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId);
+
+    //connected = false;
+    activeSockets++;
+
+    return true;
+}
+
+void READ_STRUCT::destroy()
+{
+    if (socket==-1)
+        return;
+
+#ifdef USE_EPOLL
+    // strictly speaking this should not be necessary
+    if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
+        factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
+#endif
+
+    if (::close(socket) > 0)
+        factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
+    else
+        factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId);
+
+    socket = -1;
+    connected = false;
+    activeSockets--;
+}
+
+void READ_STRUCT::check(int sockDef, sockaddr_in addr)
+{
+    // Continue in the most most likely case (performance)
+    //if (socket>=0 && sockDef!=0 && connected)
+    //    return;
+
+    // socket open, but should not be open
+    if (socket>=0 && sockDef==0)
+        destroy();
+
+    // Socket closed, but should be open
+    if (socket<0 && sockDef!=0)
+        create(addr); //generate address and socket
+
+    // Socket closed
+    if (socket<0)
+        return;
+
+    // Socket open and connected: Nothing to do
+    if (connected)
+        return;
+
+    //try to connect if not yet done
+    const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
+    if (rc == -1)
+        return;
+
+    connected = true;
+
+    if (sockDef<0)
+    {
+        bufTyp = READ_STRUCT::kStream; // full data to be skipped
+        bufLen = MAX_LEN;              // huge for skipping
+    }
+    else
+    {
+        bufTyp = READ_STRUCT::kHeader;  // expect a header
+        bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
+    }
+
+    bufPos = B;  // no byte read so far
+    skip   = 0;       // start empty
+    repmem = false;
+
+    factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket);
+
+#ifdef USE_EPOLL
+    epoll_event ev;
+    ev.events = EPOLLIN;
+    ev.data.ptr = this;  // user data (union: ev.ptr)
+    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
+        factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
+#endif
+}
+
+bool READ_STRUCT::read()
+{
+    if (bufLen==0)
+        return true;
+
+    const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
+    // recv failed
+    if (jrd<0)
+    {
+        // There was just nothing waiting
+        if (errno==EWOULDBLOCK || errno==EAGAIN)
+            return false;
+
+        factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
+        return false;
+    }
+
+    // connection was closed ...
+    if (jrd==0)
+    {
+        factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
+
+        destroy();//DestroySocket(rd[i]); //generate address and socket
+        return false;
+    }
+
+    rateBytes += jrd;
+
+    // are we skipping this board ...
+    if (bufTyp==kStream)
+        return false;
+
+    bufPos += jrd;  //==> prepare for continuation
+    bufLen -= jrd;
+
+    // not yet all read
+    return bufLen==0;
+}
+
+void READ_STRUCT::swapHeader()
+{
+    S[1]  = ntohs(S[1]);    // package_length (bytes not swapped!)
+    S[2]  = ntohs(S[2]);    // version_no
+    S[3]  = ntohs(S[3]);    // PLLLCK
+    S[4]  = ntohs(S[4]);    // trigger_crc
+    S[5]  = ntohs(S[5]);    // trigger_type
+
+    I[3]  = ntohl(I[3]);    // trigger_id
+    I[4]  = ntohl(I[4]);    // fad_evt_counter
+    I[5]  = ntohl(I[5]);    // REFCLK_frequency
+
+    S[12] = ntohs(S[12]);   // board id
+    S[13] = ntohs(S[13]);   // adc_clock_phase_shift
+    S[14] = ntohs(S[14]);   // number_of_triggers_to_generate
+    S[15] = ntohs(S[15]);   // trigger_generator_prescaler
+
+    I[10] = ntohl(I[10]);   // runnumber;
+    I[11] = ntohl(I[11]);   // time;
+
+    for (int s=24; s<24+NTemp+NDAC; s++)
+        S[s] = ntohs(S[s]); // drs_temperature / dac
+}
+
+void READ_STRUCT::swapData()
+{
+    // swapEventHeaderBytes: End of the header. to channels now
+
+    int i = 36;
+    for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
+    {
+        S[i+0] = ntohs(S[i+0]);//id
+        S[i+1] = ntohs(S[i+1]);//start_cell
+        S[i+2] = ntohs(S[i+2]);//roi
+        S[i+3] = ntohs(S[i+3]);//filling
+
+        i += 4+S[i+2];//skip the pixel data
+    }
+}
+
+// ==========================================================================
+
+bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
 {
     int xjr = -1;
@@ -304,14 +462,14 @@
     int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
 
-    roi[0] = ntohs(rbuf->S[roiPtr]);
+    roi[0] = ntohs(rd.S[roiPtr]);
 
     for (int jr = 0; jr < 9; jr++)
     {
-        roi[jr] = ntohs(rbuf->S[roiPtr]);
-
-        if (roi[jr]<0 || roi[jr]>1024)
-        {
-            factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
-            return 0;
+        roi[jr] = ntohs(rd.S[roiPtr]);
+
+        if (roi[jr]>1024)
+        {
+            factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
+            return false;
         }
 
@@ -326,5 +484,5 @@
         for (int kr = 1; kr < 4; kr++)
         {
-            const int kroi = ntohs(rbuf->S[roiPtr]);
+            const int kroi = ntohs(rd.S[roiPtr]);
             if (kroi != roi[jr])
             {
@@ -340,45 +498,32 @@
     {
         if (xkr<0)
-            factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
+            factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
         else
-            factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
-
-        return 0;
+            factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
+
+        return false;
     }
 
     if (roi[8] < roi[0])
     {
-        factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
-        //gj.badRoiB++;
-        //gj.badRoi[b]++;
-        return 0;
-    }
-
-    return 1;
-}
-
-deque<shared_ptr<EVT_CTRL>> evtCtrl;
-map<int, shared_ptr<RUN_CTRL>> runCtrl;
-
-void mBufFree(EVT_CTRL *evt)
-{
-    TGB_free(evt->FADhead);
-}
-
-shared_ptr<EVT_CTRL> mBufEvt(const READ_STRUCT *rs)
-{
-    int nRoi[9];
-    if (!checkRoiConsistency(rs->rBuf, nRoi))
-        return shared_ptr<EVT_CTRL>();
-
-    const int  evID   = rs->evtID;
-    const uint runID  = rs->runID;
-    const int  trgTyp = rs->ftmTyp;
-    const int  trgNum = rs->ftmID;
-    const int  fadNum = rs->evtID;
+        factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
+        return false;
+    }
+
+    return true;
+}
+
+list<shared_ptr<EVT_CTRL2>> evtCtrl;
+
+shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
+{
+    uint16_t nRoi[9];
+    if (!checkRoiConsistency(rd, nRoi))
+        return shared_ptr<EVT_CTRL2>();
 
     for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
     {
-        const shared_ptr<EVT_CTRL> evt = *it;
+        // A reference is enough because the evtCtrl holds the shared_ptr anyway
+        const shared_ptr<EVT_CTRL2> &evt = *it;
 
         // If the run is different, go on searching.
@@ -386,13 +531,13 @@
         // the case of the events, because theoretically, there
         // can be the same run on two different days.
-        if (runID != evt->runNum)
+        if (rd.H.runnumber != evt->runNum)
             continue;
 
         // If the ID of the new event if higher than the last one stored
         // in that run, we have to assign a new slot (leave the loop)
-        if (evID > evt->evNum)
+        if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
             break;
 
-        if (evID != evt->evNum)
+        if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
             continue;
 
@@ -401,15 +546,13 @@
         if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
         {
-            factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
+            factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
                        evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
-            return shared_ptr<EVT_CTRL>();
+            return shared_ptr<EVT_CTRL2>();
         }
 
         // count for inconsistencies
-        if (evt->trgNum != trgNum)
+        if (evt->trgNum != rd.H.trigger_id)
             evt->Errors[0]++;
-        if (evt->fadNum != fadNum)
-            evt->Errors[1]++;
-        if (evt->trgTyp != trgTyp)
+        if (evt->trgTyp != rd.H.trigger_type)
             evt->Errors[2]++;
 
@@ -418,106 +561,134 @@
     }
 
-    struct timeval tv;
-    gettimeofday(&tv, NULL);
-
-    auto ir = runCtrl.find(runID);
-    if (ir==runCtrl.end())
-    {
-        shared_ptr<RUN_CTRL> run(new RUN_CTRL);
-
-        run->runId      = runID;
-        run->roi0       = nRoi[0];  // FIXME: Make obsolete!
-        run->roi8       = nRoi[8];  // FIXME: Make obsolete!
-        run->fileId     = -2;
-        run->lastEvt    = 1;         // Number of events partially started to read
-        run->actEvt     = 0;         // Number of written events (write)
-        run->procEvt    = 0;         // Number of successfully checked events (checkEvent)
-        run->maxEvt     = 999999999; // max number events allowed
-        run->lastTime   = tv.tv_sec;      // Time when the last event was written
-        run->closeTime  = tv.tv_sec + 3600 * 24;     //max time allowed
-
-        ir = runCtrl.insert(make_pair(runID, run)).first;
-    }
-
-    const shared_ptr<RUN_CTRL> run = ir->second;
-
-    if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8])
-    {
-        factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
-                   run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID);
-        return shared_ptr<EVT_CTRL>();
-    }
-
-    const shared_ptr<EVT_CTRL> evt(new EVT_CTRL, mBufFree);
-
-    //flag all boards as unused
-    evt->nBoard = 0;
-    for (int b=0; b<NBOARDS; b++)
-        evt->board[b] = -1;
-
-    evt->run       = run;
-    evt->pcTime[0] = tv.tv_sec;
-    evt->pcTime[1] = tv.tv_usec;
-    evt->nRoi      = nRoi[0];
-    evt->nRoiTM    = nRoi[8];
-    evt->evNum     = evID;
-    evt->runNum    = runID;
-    evt->fadNum    = fadNum;
-    evt->trgNum    = trgNum;
-    evt->trgTyp    = trgTyp;
-    evt->Errors[0] = 0;
-    evt->Errors[1] = 0;
-    evt->Errors[2] = 0;
-    evt->Errors[3] = 0;
-    evt->fEvent    = NULL;
-    evt->FADhead   = NULL;
-
-    //    -1:   kInValid
-    //     0:   kValid
-    //  1-40:   kIncomplete
-    //    90:   kIncompleteReported
-    //   100:   kCompleteEventInBuffer
-    //  1000+x: kToBeProcessedByThreadX
-    //  5000:   kToBeWritten
-    // 10000:   kToBeDeleted
-
-    evt->evtStat = 0;
-
+    if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
+    {
+        factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
+                   actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
+        return shared_ptr<EVT_CTRL2>();
+    }
+
+    shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);
+
+    gettimeofday(&evt->time, NULL);
+
+    evt->runNum = rd.H.runnumber;
+    evt->evNum  = rd.H.fad_evt_counter;
+
+    evt->trgNum = rd.H.trigger_id;
+    evt->trgTyp = rd.H.trigger_type;
+
+    evt->nRoi   = nRoi[0];
+    evt->nRoiTM = nRoi[8];
+
+    const bool newrun = actrun->runId != rd.H.runnumber;
+    if (newrun)
+    {
+        // Since we have started a new run, we know already when to close the
+        // previous run in terms of number of events
+        actrun->maxEvt = actrun->lastEvt;
+
+        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",
+                   rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
+
+        // The new run is the active run now
+        actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
+
+        const time_t &tsec = evt->time.tv_sec;
+
+        actrun->openTime  = tsec;
+        actrun->closeTime = tsec + 3600 * 24; // max time allowed
+        actrun->runId     = rd.H.runnumber;
+        actrun->roi0      = nRoi[0];  // FIXME: Make obsolete!
+        actrun->roi8      = nRoi[8];  // FIXME: Make obsolete!
+    }
+
+    // Increase the number of events we have started to receive in this run
+    actrun->lastTime = evt->time.tv_sec;  // Time when the last event was received
+    actrun->lastEvt++;
+
+    // Keep pointer to run of this event
+    evt->runCtrl = actrun;
+
+    // Secure access to evtCtrl against access in CloseRunFile
+    // This should be the last... otherwise we can run into threading issues
+    // if the event is accessed before it is fully initialized.
     evtCtrl.push_back(evt);
 
+    // Signal the fadctrl that a new run has been started
+    // Note this is the only place at which we can ensure that
+    // gotnewRun is called only once
+    // Note that this will callback CloseRunFile, therefor the event
+    // must already be in the evtCtrl structure
+    if (newrun)
+        gotNewRun(*actrun);
+
+    // An event can be the first and the last, but not the last and the first.
+    // Therefore gotNewRun is called before runFinished.
+    // runFinished signals that the last event of a run was just received. Processing
+    // might still be ongoing, but we can start a new run.
+    const bool cond1 = actrun->lastEvt < actrun->maxEvt;      // max number of events not reached
+    const bool cond2 = actrun->lastTime < actrun->closeTime;  // max time not reached
+    if (!cond1 || !cond2)
+        runFinished();
+
     return evt;
-
-} /*-----------------------------------------------------------------*/
-
-
-void initEvent(const shared_ptr<EVT_CTRL> &evt)
-{
-    evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM);
-    memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi);
-
-    //flag all pixels as unused
-    for (int k = 0; k < NPIX; k++)
-        evt->fEvent->StartPix[k] = -1;
-
-    //flag all TMark as unused
-    for (int k = 0; k < NTMARK; k++)
-        evt->fEvent->StartTM[k] = -1;
-
-    evt->fEvent->NumBoards   = 0;
-    evt->fEvent->SoftTrig    = 0;
-    evt->fEvent->PCTime      = evt->pcTime[0];
-    evt->fEvent->PCUsec      = evt->pcTime[1];
-    evt->fEvent->Roi         = evt->nRoi;
-    evt->fEvent->RoiTM       = evt->nRoiTM;
-    evt->fEvent->EventNum    = evt->evNum;
-    evt->fEvent->TriggerNum  = evt->trgNum;
-    evt->fEvent->TriggerType = evt->trgTyp;
-}
-
-
-uint64_t reportIncomplete(const shared_ptr<EVT_CTRL> &evt, const char *txt)
-{
-    factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
-               evt->runNum, evt->evNum, txt);
+}
+
+
+void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
+{
+    const int i = rBuf.sockId;
+
+    memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER));
+
+    int src = sizeof(PEVNT_HEADER) / 2;  // Header is 72 byte = 36 shorts
+
+    // consistency of ROIs have been checked already (is it all correct?)
+    const uint16_t &roi = rBuf.S[src+2];
+
+    // different sort in FAD board.....
+    for (int px = 0; px < 9; px++)
+    {
+        for (int drs = 0; drs < 4; drs++)
+        {
+            const int16_t pixC = rBuf.S[src+1];    // start-cell
+            const int16_t pixR = rBuf.S[src+2];    // roi
+            //here we should check if pixH is correct ....
+
+            const int pixS = i*36 + drs*9 + px;
+
+            evt->fEvent->StartPix[pixS] = pixC;
+
+            memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
+
+            src += 4+pixR;
+
+            // Treatment for ch 9 (TM channel)
+            if (px != 8)
+                continue;
+
+            const int tmS = i*4 + drs;
+
+            //and we have additional TM info
+            if (pixR > roi)
+            {
+                evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
+
+                memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
+            }
+            else
+            {
+                evt->fEvent->StartTM[tmS] = -1;
+            }
+        }
+    }
+}
+
+// ==========================================================================
+
+uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
+{
+    factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
+               evt->runNum, evt->evNum, evtCtrl.size(), txt);
 
     uint64_t report = 0;
@@ -541,5 +712,5 @@
         // accoridng to the current connection status, not w.r.t. to the
         // one when the event was taken.
-        if (gi_NumConnect[ib]<=0) // board not connected
+        if (gi_NumConnect[ib]==0) // board not connected
         {
             str[ik++] = 'x';
@@ -555,1485 +726,619 @@
     str[ik]   = 0;
 
-    factOut(kWarn, 601, str);
+    factOut(MessageImp::kWarn, str);
 
     return report;
 }
 
-// i == board
-void copyData(CNV_FACT *rbuf, int i, const shared_ptr<EVT_CTRL> &evt)
-{
-    // swapEventHeaderBytes: End of the header. to channels now
-    int eStart = 36;
-    for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
-    {
-        rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
-        rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
-        rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
-        rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
-
-        eStart += 4+rbuf->S[eStart+2];//skip the pixel data
-    }
-
-    memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER));
-
-    int src = sizeof(PEVNT_HEADER) / 2;
-
-    // consistency of ROIs have been checked already (is it all correct?)
-    const int roi = rbuf->S[src+2];
-
-    // different sort in FAD board.....
-    for (int px = 0; px < 9; px++)
-    {
-        for (int drs = 0; drs < 4; drs++)
-        {
-            // pixH = rd[i].rBuf->S[src++];    // ID
-            src++;
-
-            const int pixC = rbuf->S[src++];    // start-cell
-            const int pixR = rbuf->S[src++];    // roi
-            //here we should check if pixH is correct ....
-
-            const int pixS = i * 36 + drs * 9 + px;
-            src++;
-
-            evt->fEvent->StartPix[pixS] = pixC;
-
-            const int dest1 = pixS * roi;
-            memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
-
-            src += pixR;
-
-            if (px == 8)
+// ==========================================================================
+// ==========================================================================
+
+Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1));
+
+// If this is not convenient anymore, it could be replaced by
+// a command queue, to which command+data is posted,
+// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
+void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
+{
+    const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
+
+    bool rc1 = true;
+
+    // Is this a valid event or just an empty event to trigger run close?
+    // If this is not an empty event open the new run-file
+    // Empty events are there to trigger run-closing conditions
+    if (evt->runNum>=0)
+    {
+        // File not yet open
+        if (run->fileStat==kFileNotYetOpen)
+        {
+            // runOpen will close a previous run, if still open
+            if (!runOpen(evt))
             {
-                const int tmS = i * 4 + drs;
-
-                //and we have additional TM info
-                if (pixR > roi)
+                factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
+                run->fileStat = kFileClosed;
+                return;
+            }
+
+            factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
+            run->fileStat = kFileOpen;
+        }
+
+        // Here we have a valid calibration and can go on with that.
+        processingQueue1.post(evt);
+
+        // File already closed
+        if (run->fileStat==kFileClosed)
+            return;
+
+        rc1 = runWrite(evt);
+        if (!rc1)
+            factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
+    }
+
+    const bool cond1 = run->lastEvt < run->maxEvt;      // max number of events not reached
+    const bool cond2 = run->lastTime < run->closeTime;  // max time not reached
+    const bool cond3 = rc1;                             // Write successfull
+
+    // File is not yet to be closed.
+    if (cond1 && cond2 && cond3)
+        return;
+
+    runClose();
+    run->fileStat = kFileClosed;
+
+    string str;
+    if (!cond1) str += to_string(run->maxEvt)+" evts reached";
+    if (!cond1 && (!cond2 || !cond3)) str += ", ";
+    if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached";
+    if ((!cond1 || !cond2) && !cond3) str += ", ";
+    if (!cond3) str += "runWrite failed";
+    factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str());
+}
+
+Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
+
+void procEvt(const shared_ptr<EVT_CTRL2> &evt)
+{
+    if (evt->runNum>=0)
+    {
+        evt->fEvent->Errors[0] = evt->Errors[0];
+        evt->fEvent->Errors[1] = evt->Errors[1];
+        evt->fEvent->Errors[2] = evt->Errors[2];
+        evt->fEvent->Errors[3] = evt->Errors[3];
+
+        for (int ib=0; ib<NBOARDS; ib++)
+            evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time;
+
+        const int rc = eventCheck(evt);
+        if (rc < 0)
+            return;
+    }
+
+    // If file is open post the event for being written
+    secondaryQueue.post(evt);
+}
+
+// ==========================================================================
+// ==========================================================================
+
+shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile
+
+/*
+ task 1-4:
+
+ lock1()-lock4();
+ while (1)
+ {
+       wait for signal [lockN];  // unlocked
+
+       while (n!=10)
+         wait sockets;
+         read;
+
+       lockM();
+       finished[n] = true;
+       signal(mainloop);
+       unlockM();
+ }
+
+
+ mainloop:
+
+ while (1)
+ {
+       lockM();
+       while (!finished[0] || !finished[1] ...)
+          wait for signal [lockM];  // unlocked... signals can be sent
+       finished[0-1] = false;
+       unlockM()
+
+       copy data to queue    // locked
+
+       lockN[0-3];
+       signalN[0-3];
+       unlockN[0-3];
+ }
+
+
+ */
+
+/*
+    while (g_reset)
+    {
+        shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
+
+        // Check that all sockets are connected
+
+        for (int i=0; i<40; i++)
+            if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
+               factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
+
+        while (g_reset)
+        {
+           if (READ_STRUCT::wait()<0)
+              break;
+
+           if (rc_epoll==0)
+              break;
+
+           for (int jj=0; jj<rc_epoll; jj++)
+           {
+              READ_STRUCT *rs = READ_STRUCT::get(jj);
+              if (!rs->connected)
+                  continue;
+
+              const bool rc_read = rs->read();
+              if (!rc_read)
+                  continue;
+
+              if (rs->bufTyp==READ_STRUCT::kHeader)
+              {
+                  [...]
+              }
+
+              [...]
+
+              if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
+                 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
+           }
+
+           if (once_a_second)
+           {
+              if (evt==timeout)
+                  break;
+           }
+        }
+
+        if (evt.nBoards==actBoards)
+            primaryQueue.post(evt);
+    }
+*/
+
+void CloseRunFile()
+{
+    // Create a copy of the shared_ptr to ensure
+    // is not replaced in the middle of the action
+    const shared_ptr<RUN_CTRL2> run = actrun;
+    run->maxEvt = run->lastEvt;
+}
+
+bool mainloop(READ_STRUCT *rd)
+{
+    factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
+
+    Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
+
+    primaryQueue.start();
+    secondaryQueue.start();
+
+    actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
+
+    //time in seconds
+    time_t gi_SecTime = time(NULL)-1;
+
+    //loop until global variable g_runStat claims stop
+    g_reset = 0;
+    while (g_reset == 0)
+    {
+#ifdef USE_SELECT
+        fd_set readfs;
+        FD_ZERO(&readfs);
+        int nfsd = 0;
+        for (int i=0; i<NBOARDS; i++)
+            if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
+            {
+                FD_SET(rd[i].socket, &readfs);
+                if (rd[i].socket>nfsd)
+                    nfsd = rd[i].socket;
+            }
+
+        timeval tv;
+        tv.tv_sec = 0;
+        tv.tv_usec = 100;
+        const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
+        // 0: timeout
+        // -1: error
+        if (rc_select<0)
+        {
+            factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
+            continue;
+        }
+#endif
+
+#ifdef USE_EPOLL
+        const int rc_epoll = READ_STRUCT::wait();
+        if (rc_epoll<0)
+            break;
+#endif
+
+#ifdef USE_EPOLL
+        for (int jj=0; jj<rc_epoll; jj++)
+#else
+        for (int jj=0; jj<NBOARDS; jj++)
+#endif
+        {
+#ifdef USE_EPOLL
+            // FIXME: How to get i?
+            READ_STRUCT *rs = READ_STRUCT::get(jj);
+#else
+
+            const int i = (jj%4)*10 + (jj/4);
+            READ_STRUCT *rs = &rd[i];
+            if (!rs->connected)
+                continue;
+#endif
+
+#ifdef USE_SELECT
+            if (!FD_ISSET(rs->socket, &readfs))
+                continue;
+#endif
+
+
+#ifdef COMPLETE_EVENTS
+            if (rs->bufTyp==READ_STRUCT::kWait)
+                continue;
+#endif
+
+            // ==================================================================
+
+            const bool rc_read = rs->read();
+
+            // Connect might have gotten closed during read
+            gi_NumConnect[rs->sockId] = rs->connected;
+            gj.numConn[rs->sockId]    = rs->connected;
+
+            // Read either failed or disconnected, or the buffer is not yet full
+            if (!rc_read)
+                continue;
+
+            // ==================================================================
+
+            if (rs->bufTyp==READ_STRUCT::kHeader)
+            {
+                //check if startflag correct; else shift block ....
+                // FIXME: This is not enough... this combination of
+                //        bytes can be anywhere... at least the end bytes
+                //        must be checked somewhere, too.
+                uint k;
+                for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
                 {
-                    const int dest2 = tmS * roi + NPIX * roi;
-
-                    const int srcT = src - roi;
-                    evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
-
-                    memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
+                    if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
+                    //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)
+                        break;
                 }
+                rs->skip += k;
+
+                //no start of header found
+                if (k==sizeof(PEVNT_HEADER)-1)
+                {
+                    rs->B[0]   = rs->B[sizeof(PEVNT_HEADER)-1];
+                    rs->bufPos = rs->B+1;
+                    rs->bufLen = sizeof(PEVNT_HEADER)-1;
+                    continue;
+                }
+
+                if (k > 0)
+                {
+                    memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
+
+                    rs->bufPos -= k;
+                    rs->bufLen += k;
+
+                    continue; // We need to read more (bufLen>0)
+                }
+
+                if (rs->skip>0)
+                {
+                    factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
+                    rs->skip = 0;
+                }
+
+                // Swap the header entries from network to host order
+                rs->swapHeader();
+
+                rs->bufTyp = READ_STRUCT::kData;
+                rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
+
+                debugHead(rs->B);  // i and fadBoard not used
+
+                continue;
+            }
+
+            const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
+            if (end != 0xfe04)
+            {
+                factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
+                           rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
+
+                // ready to read next header
+                rs->bufTyp = READ_STRUCT::kHeader;
+                rs->bufLen = sizeof(PEVNT_HEADER);
+                rs->bufPos = rs->B;
+                // FIXME: What to do with the validity flag?
+                continue;
+            }
+
+            // get index into mBuffer for this event (create if needed)
+            const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
+
+            // We have a valid entry, but no memory has yet been allocated
+            if (evt && !evt->FADhead)
+            {
+                // Try to get memory from the big buffer
+                PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc();
+                if (!mem)
+                {
+                    // If this works properly, this is a hack which can be removed, or
+                    // replaced by a signal or dim message
+                    if (!rs->repmem)
+                    {
+                        factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
+                        rs->repmem = true;
+                    }
+                    continue;
+                }
+
+                evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free));
+            }
+
+            // ready to read next header
+            rs->bufTyp = READ_STRUCT::kHeader;
+            rs->bufLen = sizeof(PEVNT_HEADER);
+            rs->bufPos = rs->B;
+
+            // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
+            if (!evt)
+                continue;
+
+            /*
+             const int fad = (i/10)<<8)|(i%10);
+             if (fad != rs->H.board_id)
+             {
+                 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);
+             }*/
+
+            // This should never happen
+            if (evt->board[rs->sockId] != -1)
+            {
+                factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
+                           evt->evNum, rs->sockId, rs->sockId, rs->len());
+                // FIXME: What to do with the validity flag?
+                continue; // Continue reading next header
+            }
+
+            // Swap the data entries (board headers) from network to host order
+            rs->swapData();
+
+            // Copy data from rd[i] to mBuffer[evID]
+            copyData(*rs, evt.get());
+
+#ifdef COMPLETE_EVENTS
+            // Do not read anmymore from this board until the whole event has been received
+            rs->bufTyp = READ_STRUCT::kWait;
+#endif
+            // now we have stored a new board contents into Event structure
+            evt->fEvent->NumBoards++;
+            evt->board[rs->sockId] = rs->sockId;
+            evt->nBoard++;
+
+            // event not yet complete
+            if (evt->nBoard < READ_STRUCT::activeSockets)
+                continue;
+
+            // All previous events are now flagged as incomplete ("expired")
+            // and will be removed. (This is a bit tricky, because pop_front()
+            // would invalidate the current iterator if not done _after_ the increment)
+            for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
+            {
+                const bool found = it->get()==evt.get();
+                if (!found)
+                    reportIncomplete(*it, "expired");
                 else
+                    primaryQueue.post(evt);
+
+                it++;
+                evtCtrl.pop_front();
+
+                // We reached the current event, so we are done
+                if (found)
+                    break;
+            }
+
+#ifdef COMPLETE_EVENTS
+            for (int j=0; j<40; j++)
+            {
+                //if (rs->bufTyp==READ_STRUCT::kWait)
                 {
-                    evt->fEvent->StartTM[tmS] = -1;
+                    rs->bufTyp = READ_STRUCT::kHeader;
+                    rs->bufLen = sizeof(PEVNT_HEADER);
+                    rs->bufPos = rs->B;
                 }
             }
-        }
-    }
-}
-
-void doProcess(const shared_ptr<EVT_CTRL> &evt);
-void doWrite(const shared_ptr<EVT_CTRL> &evt);
-
-void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where);
-
-Queue<shared_ptr<EVT_CTRL>> process(bind(doProcess, placeholders::_1));
-Queue<shared_ptr<EVT_CTRL>> write_queue(bind(doWrite, placeholders::_1));
-
-void preProcess(const shared_ptr<EVT_CTRL> &evt)
-{
-    //-------- it is better to open the run already here, so call can be used to initialize
-    //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
-    const shared_ptr<RUN_CTRL> run = evt->run;
-    if (run->runId==0)
-        return;
-
-    // File not yet open
-    if (run->fileId < 0)
-    {
-        RUN_HEAD actRun;
-        actRun.Version =  1;
-        actRun.RunType = -1;  //to be adapted
-        actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
-        actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
-        actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
-        actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
-        actRun.NBoard  = NBOARDS;
-        actRun.NPix    = NPIX;
-        actRun.NTm     = NTMARK;
-
-        memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
-
-        run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun));
-        if (run->fileHd == NULL)
-        {
-            factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
-            run->fileId = 91;
-
-            // No further processing of this event
-            return;
-        }
-
-        run->fileId = 0;
-        factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
-    }
-
-    //and set correct event header ; also check for consistency in event (not yet)
-    evt->fEvent->Errors[0] = evt->Errors[0];
-    evt->fEvent->Errors[1] = evt->Errors[1];
-    evt->fEvent->Errors[2] = evt->Errors[2];
-    evt->fEvent->Errors[3] = evt->Errors[3];
-
-    for (int ib=0; ib<NBOARDS; ib++)
-    {
-        // board is not read
-        if (evt->board[ib] == -1)
-        {
-            evt->FADhead[ib].start_package_flag = 0;
-            evt->fEvent->BoardTime[ib] = 0;
-        }
-        else
-        {
-            evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
-        }
-    }
-
-    const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
-
-    // no further processing of event ('delete')
-    if (rc < 0)
-        return;
-
-    //evt->evtStat = 1000;       // flag 'start processing'
-    run->procEvt++;
-    process.post(evt);
-}
-
-void doProcess(const shared_ptr<EVT_CTRL> &evt)
-{
-    const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0);
-
-    if (jret>0 && jret<=1)
-        factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret);
-
-    // flag as 'to be written'
-    if (jret<=1)
-        return;
-
-    //evt->evtStat = 5000;
-    write_queue.post(evt);
-}
-
-void doWrite(const shared_ptr<EVT_CTRL> &evt)
-{
-    const shared_ptr<RUN_CTRL> run = evt->run;
-    if (run->runId==0)
-        return;
-
-    // File is not open
-    if (run->fileId!=0)
-        return;
-
-    const int rc = runWrite(run->fileHd, evt->fEvent, 0);
-    if (rc >= 0)
-    {
-        // Sucessfully wrote event
-        run->lastTime = g_actTime;
-        run->actEvt++;
-    }
-    else
-        factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
-
-    checkAndCloseRun(run, rc<0, 1);
-
-    /*
-    // Although the are no pending events, we have to check if a run should be closed (timeout)
-    for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
-    {
-        if (ir->second->fileId == 0)
-        {
-            //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
-            const int cond = ir->second->runId == 0;
-            checkAndCloseRun(ir->second, cond, 2);
-        }
-    }
-    */
-}
-
-void *readFAD (void *ptr)
-{
-/* *** main loop reading FAD data and sorting them to complete events */
-
-    Queue<shared_ptr<EVT_CTRL>> queue(bind(preProcess, placeholders::_1));
-
-    factPrintf(kInfo, -1, "Start initializing (readFAD)");
-
-    READ_STRUCT rd[NBOARDS];       //buffer to read IP and afterwards store in mBuffer
-
-    uint32_t actrun = 0;
-
-   const int minLen = sizeof(PEVNT_HEADER);  //min #bytes needed to check header: full header for debug
-
-
-   int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
-   gi_resetS = gi_resetR = 9;
-
-   int sockDef[NBOARDS];        //internal state of sockets
-   memset(sockDef, 0, NBOARDS*sizeof(int));
-
- START:
-   //time in seconds
-   uint gi_SecTime = time(NULL);;
-   g_actTime = gi_SecTime;
-
-   const int cntsock = 8 - NUMSOCK ;
-
-   if (gi_resetS > 0) {
-      //make sure all sockets are preallocated as 'not exist'
-      for (int i = 0; i < NBOARDS; i++) {
-         rd[i].socket = -1;
-         rd[i].sockStat = 99;
-      }
-
-      for (int k = 0; k < NBOARDS; k++) {
-         gi_NumConnect[k] = 0;
-         //gi.numConn[k] = 0;
-         gj.numConn[k] = 0;
-         //gj.errConn[k] = 0;
-         gj.rateBytes[k] = 0;
-         gj.totBytes[k] = 0;
-      }
-
-   }
-
-   if (gi_resetR > 0)
-   {
-      gj.bufTot  = gj.maxEvt = gj.xxxEvt = 0;
-      gj.usdMem  = gj.maxMem = gj.xxxMem = 0;
-      gj.totMem  = tgb_memory;
-      gj.bufNew  = gj.bufEvt = 0;
-      gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
-
-      factPrintf(kInfo, -1, "End   initializing (readFAD)");
-   }
-
-
-   gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
-
-   //loop until global variable g_runStat claims stop
-   while (g_runStat >= 0 && g_reset == 0)
-   {
-      gj.readStat = g_runStat;
-
-      for (int b = 0; b < NBOARDS; b++)
-      {
-          // Nothing has changed
-          if (g_port[b].sockDef == sockDef[b])
-              continue;
-
-          gi_NumConnect[b] = 0;       //must close all connections
-          gj.numConn[b] = 0;
-
-          // s0 =  0: sockets to be defined and opened
-          // s0 = -1: sockets to be destroyed
-          // s0 = +1: sockets to be closed and reopened
-
-          int s0 = 0;
-          if (sockDef[b] != 0)
-              s0 = g_port[b].sockDef==0 ? -1 : +1;
-
-          const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
-
-          GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
-
-          sockDef[b] = g_port[b].sockDef;
-      }
-
-      // count the number of active boards
-      int actBoards = 0;
-      for (int b = 0; b < NBOARDS; b++)
-          if (sockDef[b] > 0)
-              actBoards++;
-
-      //check all sockets if something to read
-      for (int i = 0; i < NBOARDS; i++)
-      {
-          // Do not try to connect this socket
-          if (rd[i].sockStat > 0)
-              continue;
-
-          if (rd[i].sockStat == -1)
-          {
-              //try to connect if not yet done
-              rd[i].sockStat = connect (rd[i].socket,
-                                        (struct sockaddr *) &rd[i].SockAddr,
-                                        sizeof (rd[i].SockAddr));
-              // Failed
-              if (rd[i].sockStat == -1)
-              {
-                  rd[i].errCnt++;
-                  usleep(25000);
-                  continue;
-              }
-
-              // Success (rd[i].sockStat == 0)
-
-              if (sockDef[i] > 0)
-              {
-                  rd[i].bufTyp = 0;                     // expect a header
-                  rd[i].bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
-              }
-              else
-              {
-                  rd[i].bufTyp = -1;       // full data to be skipped
-                  rd[i].bufLen = MAX_LEN;  // huge for skipping
-              }
-
-              rd[i].bufPos = 0;  //  no byte read so far
-              rd[i].skip   = 0;  //  start empty
-
-              gi_NumConnect[i] += cntsock;
-              gj.numConn[i]++;
-
-              factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
-          }
-
-          // Do not read from this socket
-          if (rd[i].bufLen<0)
-              continue;
-
-          if (rd[i].bufLen>0)
-          {
-              const int32_t jrd =
-                  recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
-                       rd[i].bufLen, MSG_DONTWAIT);
-
-              // recv failed
-              if (jrd<0)
-              {
-                  // There was just nothing waiting
-                  if (errno==EWOULDBLOCK || errno==EAGAIN)
-                      continue;
-
-                  factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
-                  continue;
-              }
-
-              // connection was closed ...
-              if (jrd==0)
-              {
-                  factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
-
-                  const int s0 = sockDef[i] > 0 ? +1 : -1;
-                  GenSock(s0, i, 0, NULL, &rd[i]);
-
-                  gi_NumConnect[i]-= cntsock ;
-                  gj.numConn[i]--;
-
-                  continue;
-              }
-
-              gj.rateBytes[i] += jrd;
-
-              // are we skipping this board ...
-              if (rd[i].bufTyp < 0)
-                  continue;
-
-              rd[i].bufPos += jrd;  //==> prepare for continuation
-              rd[i].bufLen -= jrd;
-
-#ifdef EVTDEBUG
-              debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
 #endif
-          }
-
-          //we are reading event header
-          if (rd[i].bufTyp <= 0)
-          {
-              //not yet sufficient data to take action
-              if (rd[i].bufPos < minLen)
-                  continue;
-
-              //check if startflag correct; else shift block ....
-              // FIXME: This is not enough... this combination of
-              //        bytes can be anywhere... at least the end bytes
-              //        must be checked somewhere, too.
-              int k;
-              for (k = 0; k < rd[i].bufPos - 1; k++)
-              {
-                  //start.S = 0xFB01;
-                  if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
-                      break;
-              }
-              rd[i].skip += k;
-
-              //no start of header found
-              if (k >= rd[i].bufPos - 1)
-              {
-                  rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
-                  rd[i].bufPos = 1;
-                  rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
-                  continue;
-              }
-
-              if (k > 0)
-              {
-                  rd[i].bufPos -= k;
-                  rd[i].bufLen += k;
-                  memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
-                           rd[i].bufPos);
-              }
-
-              if (rd[i].bufPos < minLen)
-                  continue;
-
-              if (rd[i].skip > 0)
-              {
-                  factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
-                  rd[i].skip = 0;
-              }
-
-              // TGB: This needs much more checks than just the first two bytes!
-
-              // Swap everything except start_package_flag.
-              // It is to difficult to find out where it is used how,
-              // but it doesn't really matter because it is not really
-              // used anywehere else
-              //   rd[i].rBuf->S[1]  = ntohs(rd[i].rBuf->S[1]);    // package_length
-              rd[i].rBuf->S[2]  = ntohs(rd[i].rBuf->S[2]);    // version_no
-              rd[i].rBuf->S[3]  = ntohs(rd[i].rBuf->S[3]);    // PLLLCK
-              rd[i].rBuf->S[4]  = ntohs(rd[i].rBuf->S[4]);    // trigger_crc
-              rd[i].rBuf->S[5]  = ntohs(rd[i].rBuf->S[5]);    // trigger_type
-
-              rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]);   // board id
-              rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]);   // adc_clock_phase_shift
-              rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]);   // number_of_triggers_to_generate
-              rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]);   // trigger_generator_prescaler
-
-              rd[i].rBuf->I[3]  = ntohl(rd[i].rBuf->I[3]);    // trigger_id
-              rd[i].rBuf->I[4]  = ntohl(rd[i].rBuf->I[4]);    // fad_evt_counter
-              rd[i].rBuf->I[5]  = ntohl(rd[i].rBuf->I[5]);    // REFCLK_frequency
-
-              rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]);   // runnumber;
-              rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]);   // time;
-
-              for (int s=24;s<24+NTemp+NDAC;s++)
-                  rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
-
-              rd[i].fadLen  = ntohs(rd[i].rBuf->S[1]) * 2;
-              rd[i].fadVers = rd[i].rBuf->S[2];
-              rd[i].ftmTyp  = rd[i].rBuf->S[5];
-              rd[i].ftmID   = rd[i].rBuf->I[3];    //(FTMevt)
-              rd[i].evtID   = rd[i].rBuf->I[4];    //(FADevt)
-              rd[i].runID   = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
-              rd[i].bufTyp  = 1;  //ready to read full record
-              rd[i].bufLen  = rd[i].fadLen - rd[i].bufPos;
-
-              const int fadBoard = rd[i].rBuf->S[12];
-              debugHead(i, fadBoard, rd[i].rBuf);
-
-              continue;
-          }
-
-          // are we reading data
-
-          // not yet all read
-          if (rd[i].bufLen > 0)
-              continue;
-
-          // stop.S = 0x04FE;
-          if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
-              rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
-          {
-              factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
-                         i, rd[i].evtID, rd[i].fadLen,
-                         rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
-
-              // ready to read next header
-              rd[i].bufTyp = 0;
-              rd[i].bufLen = sizeof(PEVNT_HEADER);
-              rd[i].bufPos = 0;
-
-              continue;
-          }
-
-          //  int actid;
-          //  if (g_useFTM > 0)
-          //     actid = rd[i].evtID;
-          //  else
-          //     actid = rd[i].ftmID;
-
-          //get index into mBuffer for this event (create if needed)
-          const shared_ptr<EVT_CTRL> evt = mBufEvt(&rd[i]);
-
-          // We have a valid entry, but no memory has yet been allocated
-          if (evt && evt->FADhead == NULL)
-          {
-              // Try to get memory from the big buffer
-              evt->FADhead = (PEVNT_HEADER*)TGB_Malloc();
-              if (evt->FADhead == NULL)
-              {
-                  // If this works properly, this is a hack which can be removed, or
-                  // replaced by a signal or dim message
-                  if (rd[i].bufTyp==2)
-                      factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum);
-                  rd[i].bufTyp = 2;
-                  continue;
-              }
-
-              // Initialise contents of mBuffer[evID]->fEvent
-              initEvent(evt);
-
-              // Some statistics
-              gj.usdMem = tgb_inuse;
-
-              if (gj.usdMem > gj.maxMem)
-                  gj.maxMem = gj.usdMem;
-
-              gj.rateNew++;
-              gj.bufTot++;
-              if (gj.bufTot > gj.maxEvt)
-                  gj.maxEvt = gj.bufTot;
-          }
-
-          rd[i].bufTyp = 0;
-          rd[i].bufLen = sizeof(PEVNT_HEADER);
-          rd[i].bufPos = 0;
-
-          // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
-          if (!evt)
-              continue;
-
-          //we have a valid entry in mBuffer[]; fill it
-          const int fadBoard = rd[i].rBuf->S[12];
-          const int fadCrate = fadBoard>>8;
-
-          if (i != (fadCrate * 10 + (fadBoard&0xff)))
-          {
-              factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
-                         i, fadBoard, fadCrate, fadBoard&0xff);
-          }
-
-          if (evt->board[i] != -1)
-          {
-              factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
-                         evt->evNum, i, i, rd[i].fadLen,
-                         rd[i].rBuf->B[0], rd[i].rBuf->B[1],
-                         rd[i].rBuf->B[rd[i].fadLen - 2],
-                         rd[i].rBuf->B[rd[i].fadLen - 1]);
-              continue; // Continue reading next header
-          }
-
-          // Copy data from rd[i] to mBuffer[evID]
-          copyData(rd[i].rBuf, i, evt);
-
-          // now we have stored a new board contents into Event structure
-
-          evt->fEvent->NumBoards++;
-          evt->board[i] = i;
-          evt->nBoard++;
-          evt->evtStat = evt->nBoard;
-
-          // have we already reported first (partial) event of this run ???
-          if (evt->nBoard==1 && evt->runNum != actrun)
-          {
-             // Signal the fadctrl that a new run has been started
-              gotNewRun(evt->runNum, NULL);
-
-              factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
-                         actrun, evt->runNum, evt->evNum);
-
-              // We got the first part of this event, so this is
-              // the number of events we expect for this run
-              evt->run->lastEvt++;
-
-              // Since we have started a new run, we know already when to close the
-              // previous run in terms of number of events
-              const auto ir = runCtrl.find(actrun);
-              if (ir!=runCtrl.end())
-                  ir->second->maxEvt = ir->second->lastEvt;
-
-              // Change 'actrun' the the new runnumber
-              actrun = evt->runNum;
-          }
-
-          // event not yet complete
-          if (evt->nBoard < actBoards)
-              continue;
-
-          // GARBAGE COLLECTION
-          // This is a non-ideal hack to lower the probability that
-          // in mBufEvt the search for correct entry in runCtrl
-          // will not return a super-old entry. I don't want
-          // to manipulate that in another thread.
-          for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
-          {
-              if (ir->runId==evt->runNum)
-                  break;
-
-              if (ir->second->fileId>0)
-                  runCtrl.erase(ir);
-          }
-
-          // we have just completed an event... so all previous events
-          // must have been completed already. If they are not, there
-          // is no need to wait for the timeout, because they will never
-          // get completed. We can just ensure that if we check for the previous
-          // event to be complete every time we receive a new complete event.
-          // If we find an incomplete one, we remove all consecutive
-          // incomplete ones.
-          for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++)
-          {
-              const shared_ptr<EVT_CTRL> e = *it;
-
-              if (e.get()==evt.get())
-              {
-                  queue.post(e);
-                  evtCtrl.erase(it);
-                  break;
-              }
-
-              reportIncomplete(e, "expired");
-              evtCtrl.pop_front();
-          }
-
-      } // end for loop over all sockets
-
-      g_actTime = time (NULL);
-      if (g_actTime <= gi_SecTime)
-      {
-          usleep(1);
-          continue;
-      }
-      gi_SecTime = g_actTime;
-
-      gj.bufNew = 0;
-
-      //loop over all active events and flag those older than read-timeout
-      //delete those that are written to disk ....
-
-      const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
-
-      // This could be improved having the pointer which separates the queue with
-      // the incomplete events from the queue with the complete events
-      for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
-      {
-          const shared_ptr<EVT_CTRL> evt = *it;
-
-          // Check the more likely case first: incomplete events
-          if (evt->evtStat>=0 && evt->evtStat<100)
-          {
-              gj.bufNew++;     //incomplete event in Buffer
-
-              // Event has not yet timed out or was reported already
-              if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30)
-                  continue;
-
-              // This will result in the emission of a dim service.
-              // It doesn't matter if that takes comparably long,
-              // because we have to stop the run anyway.
-              const uint64_t rep = reportIncomplete(evt, "timeout");
-              factReportIncomplete(rep);
-
-              //timeout for incomplete events
-              evt->evtStat = 90;
-              gj.evtSkip++;
-
-              continue;
-          }
-
-          // Check the less likely case: 'useless' or 'delete'
-          // evtState==0 can happen if the event was initialized (some data received)
-          // but the data did not make sense (e.g. inconsistent rois)
-          if (evt->evtStat==0 || evt->evtStat == 10000)
-          {
-              evtCtrl.erase(it); //event written--> free memory
-
-              gj.evtWrite++;
-              gj.rateWrite++;
-          }
-
-          // Remove leading invalidated slots from queue
-          // Do they exist at all?
-          if (evt->evtStat==-1)
-              evtCtrl.erase(it);
-      }
-
-
-
-      // The number of complete events in the buffer is the total number of
-      // events in the buffer minus the number of incomplete events.
-      gj.bufEvt = count - gj.bufNew;
-
-      gj.deltaT = 1000;      //temporary, must be improved
-
-      for (int ib = 0; ib < NBOARDS; ib++)
-          gj.totBytes[ib] += gj.rateBytes[ib];
-
-      gj.totMem = tgb_memory;
-
-      if (gj.maxMem > gj.xxxMem)
-          gj.xxxMem = gj.maxMem;
-      if (gj.maxEvt > gj.xxxEvt)
-          gj.xxxEvt = gj.maxEvt;
-
-      factStat (gj);
-      //factStatNew (gi);
-      gj.rateNew = gj.rateWrite = 0;
-      gj.maxMem = gj.usdMem;
-      gj.maxEvt = gj.bufTot;
-      for (int b = 0; b < NBOARDS; b++)
-          gj.rateBytes[b] = 0;
-
-   } // while (g_runStat >= 0 && g_reset == 0)
-
-   factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
-
-   if (g_reset > 0)
-   {
-       gi_reset  = g_reset;
-       gi_resetR = gi_reset % 10;          //shall we stop reading ?
-       gi_resetS = (gi_reset / 10) % 10;   //shall we close sockets ?
-       gi_resetW = (gi_reset / 100) % 10;  //shall we close files ?
-       gi_resetX = gi_reset / 1000;        //shall we simply wait resetX seconds ?
-       g_reset   = 0;
-   }
-   else
-   {
-       gi_reset  = 0;
-       gi_resetR = g_runStat == -1 ? 1 : 7;
-
-       gi_resetS = 7;            //close all sockets
-       gi_resetW = 7;            //close all files
-       gi_resetX = 0;
-
-       //inform others we have to quit ....
-       gj.readStat = -11;        //inform all that no update to happen any more
-   }
-
-   if (gi_resetS > 0)
-   {
-       //must close all open sockets ...
-       factPrintf(kInfo, -1, "Close all sockets...");
-
-       for (int i = 0; i < NBOARDS; i++)
-       {
-           if (rd[i].sockStat != 0)
-               continue;
-
-           GenSock(-1, i, 0, NULL, &rd[i]);   //close and destroy open socket
-
-           gi_NumConnect[i]-= cntsock ;
-           gj.numConn[i]--;
-           sockDef[i] = 0;      //flag ro recreate the sockets ...
-           rd[i].sockStat = -1; //and try to open asap
-       }
-   }
-
-
-   if (gi_resetR > 0)
-   {
-       //and clear all buffers (might have to wait until all others are done)
-       while (evtCtrl.size())
-       {
-           const shared_ptr<EVT_CTRL> evt = evtCtrl.front();
-
-           // flag incomplete events as 'read finished'
-           // We cannot just detele all events, because some might currently being processed,
-           // so we have to wait until the processing thread currently processing the event
-           // signals that the event can be deleted. (Note, that there are currently never
-           // two threads processing the same event at the same time)
-           if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000)
-               evtCtrl.pop_front();
-
-           usleep(1);
-       }
-   }
-
-   //queue.wait();
-   //queue.join();
-
-   if (gi_reset > 0)
-   {
-      if (gi_resetW > 0)
-         CloseRunFile (0, 0, 0);        //ask all Runs to be closed
-
-      if (gi_resetX > 0)
-      {
-          struct timespec xwait;
-          xwait.tv_sec = gi_resetX;
-          xwait.tv_nsec = 0;
-          nanosleep (&xwait, NULL);
-      }
-
-      factPrintf(kInfo, -1, "Continue read Process ...");
-      gi_reset = 0;
-      goto START;
-   }
-
-   factPrintf(kInfo, -1, "Exit read Process...");
-
-   factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
-
-   gj.readStat = -99;
-
-   factStat (gj);
-   //factStatNew (gi);
-
-   return 0;
-
-} /*-----------------------------------------------------------------*/
-/*
-
-void *subProc(void *thrid)
-{
-    const int64_t threadID = (int64_t)thrid;
-
-    factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
-
-    while (g_runStat > -2) //in case of 'exit' we still must process pending events
-    {
-        int numWait = 0;
-
-        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
-        {
-            const shared_ptr<EVT_CTRL> evt = *it;
-
-           // This is a threading issue... the evtStat might have been invalid
-           // but the frstPtr is not yet updated
-           if (evt->evtStat==-1)
-               continue;
-
-           // If we find the first event still waiting for processing
-           // there will be only unprocessed events after this one in the queue
-           if (evt->evtStat<1000+threadID)
-           {
-               numWait = 1;
-               break;
-           }
-
-           // If the event was processed already, skip it
-           // We could replace that to a moving pointer pointing to the first
-           // non-processed event
-           if (evt->evtStat!=1000+threadID)
-               continue;
-
-            const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0);
-
-            if (jret>0 && jret<=threadID)
-                factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
-
-            if (jret<=threadID)
-            {
-                evt->evtStat = 10000;            // flag as 'to be deleted'
-                continue;
-            }
-
-            if (jret>=gi_maxProc)
-            {
-                evt->evtStat = 5000;            // flag as 'to be written'
-                continue;
-            }
-
-            evt->evtStat = 1000 + jret;       // flag for next proces
-        }
-
-        if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
-            factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
-            return 0;
-        }
-
-        usleep(1);
-    }
-
-    factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
-
-    return 0;
-}
-
-
-void *
-procEvt (void *ptr)
-{
-   int status;
-
-   factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
-
-   pthread_t thread[100];
-//   int th_ret[100];
-
-   for (long long k = 0; k < gi_maxProc; k++) {
-       pthread_create (&thread[k], NULL, subProc, (void *) k);
-   }
-
-   // in case of 'exit' we still must process pending events
-   while (g_runStat > -2)
-   {
-       int numWait = 0;
-
-       for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
-       {
-           const shared_ptr<EVT_CTRL> evt = *it;
-
-           // This is a threading issue... the evtStat might have been invalid
-           // but the frstPtr is not yet updated
-           if (evt->evtStat==-1)
-               continue;
-
-           // If we find the first incomplete event which is not supposed to
-           // be processed, there are only more incomplete events in the queue
-           if (evt->evtStat<90)
-           {
-               numWait = 1;
-               break;
-           }
-
-           // If the event was processed already, skip it.
-           // We could replace that to a moving pointer pointing to the first
-           // non-processed event
-           if (evt->evtStat>=1000)
-               continue;
-
-           //-------- it is better to open the run already here, so call can be used to initialize
-           //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
-           const uint32_t irun = evt->runNum;
-           const int32_t  ievt = evt->evNum;
-
-           const shared_ptr<RUN_CTRL> run = evt->run;
-           if (run->runId==0)
-               continue;
-
-           // File not yet open
-           if (run->fileId < 0)
-           {
-               RUN_HEAD actRun;
-               actRun.Version =  1;
-               actRun.RunType = -1;  //to be adapted
-               actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
-               actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
-               actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
-               actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
-               actRun.NBoard  = NBOARDS;
-               actRun.NPix    = NPIX;
-               actRun.NTm     = NTMARK;
-
-               memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
-
-               run->fileHd = runOpen(irun, &actRun, sizeof (actRun));
-               if (run->fileHd == NULL)
-               {
-                   factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt);
-                   run->fileId = 91;
-                   continue;
-               }
-
-               run->fileId = 0;
-
-               factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
-           }
-
-           //--------
-           //--------
-
-           //and set correct event header ; also check for consistency in event (not yet)
-           evt->fEvent->Errors[0] = evt->Errors[0];
-           evt->fEvent->Errors[1] = evt->Errors[1];
-           evt->fEvent->Errors[2] = evt->Errors[2];
-           evt->fEvent->Errors[3] = evt->Errors[3];
-
-           for (int ib=0; ib<NBOARDS; ib++)
-           {
-               // board is not read
-               if (evt->board[ib] == -1)
-               {
-                   evt->FADhead[ib].start_package_flag = 0;
-                   evt->fEvent->BoardTime[ib] = 0;
-               }
-               else
-               {
-                   evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
-               }
-           }
-
-           const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
-           if (rc < 0)
-           {
-               evt->evtStat = 10000;        // flag event to be deleted
-           }
-           else
-           {
-               evt->evtStat = 1000;       // flag 'start processing'
-               run->procEvt++;
-           }
-       }
-
-       if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
-           factPrintf(kInfo, -1, "Exit Processing Process ...");
-           gj.procStat = -22;     //==> we should exit
-           return 0;
-       }
-
-       usleep(1);
-
-       gj.procStat = gj.readStat;
-   }
-
-   //we are asked to abort asap ==> must flag all remaining events
-   //   when gi_runStat claims that all events are in the buffer...
-
-   factPrintf(kInfo, -1, "Abort Processing Process ...");
-
-   for (int k = 0; k < gi_maxProc; k++) {
-      pthread_join (thread[k], (void **) &status);
-   }
-
-   gj.procStat = -99;
-
-   return 0;
-
-} */
-
-int
-CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
-{
-/* close run runId (all all runs if runId=0) */
-/* return: 0=close scheduled / >0 already closed / <0 does not exist */
-
-   if (runId == 0)
-   {
-       for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++)
-       {
-           const shared_ptr<RUN_CTRL> run = it->second;
-
-           //run is open
-           if (run->fileId == 0)
-           {
-               run->closeTime = closeTime;
-               run->maxEvt = maxEvt;
-           }
-       }
-       return 0;
-   }
-
-   auto it=runCtrl.find(runId);
-   if (it==runCtrl.end())
-       return -1;
-
-   const shared_ptr<RUN_CTRL> run = it->second;
-
-   // run already closed
-   if (run->fileId>0)
-       return +2;
-
-   run->closeTime = closeTime;
-   run->maxEvt = maxEvt;
-
-   return run->fileId==0 ? 0 : 1;
-
-}
-
-void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where)
-{
-    if (!cond &&
-        run->closeTime >= g_actTime &&
-        run->lastTime >= g_actTime - 300 &&
-        run->maxEvt > run->actEvt)
-        return;
-
-    //close run for whatever reason
-    int ii = 0;
-    if (cond)
-        ii = 1;
-    if (run->closeTime < g_actTime)
-        ii |= 2; // = 2;
-    if (run->lastTime < g_actTime - 300)
-        ii |= 4; // = 3;
-    if (run->maxEvt <= run->actEvt)
-        ii |= 8; // = 4;
-
-    run->closeTime = g_actTime - 1;
-
-    const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
-    if (rc<0)
-    {
-        factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
-                   where, run->runId, rc);
-        run->fileId = 92+where*2;
-    }
-    else
-    {
-        factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
-                   where, run->runId, ii);
-        run->fileId = 93+where*2;
-    }
-}
-
-/*-----------------------------------------------------------------*/
-
-/*
-void *writeEvt (void *ptr)
-{
-   factPrintf(kInfo, -1, "Starting write-thread");
-
-   while (g_runStat > -2)
-   {
-       //int numWrite = 0;
-       int numWait  = 0;
-
-       // Note that the current loop does not at all gurantee that
-       // the events are written in the correct order.
-       for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
-       {
-           const shared_ptr<EVT_CTRL> evt = *it;
-
-           // This is a threading issue... the evtStat might have been invalid
-           // but the frstPtr is not yet updated
-           if (evt->evtStat==-1)
-               continue;
-
-           // If we find the first non-written event which is not supposed to
-           // be written, there are only more incomplete events in the queue
-           if (evt->evtStat<5000)
-           {
-               numWait = 1;
-               break;
-           }
-
-           // If the event was written already already, skip it
-           // We could replace that to a moving pointer pointing to the first
-           // non-processed event
-           if (evt->evtStat!=5000)
-               continue;
-
-           const shared_ptr<RUN_CTRL> run = evt->run;
-           if (run->runId==0)
-               continue;
-
-           // File is open
-           if (run->fileId==0)
-           {
-               const int rc = runWrite(run->fileHd, evt->fEvent, 0);
-               if (rc >= 0)
-               {
-                   // Sucessfully wrote event
-                   run->lastTime = g_actTime;
-                   run->actEvt++;
-               }
-               else
-                   factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
-
-               checkAndCloseRun(run, rc<0, 1);
-           }
-
-           evt->evtStat = 10000;  // event written (or has to be discarded) -> delete
-       }
-
-       // Although the are no pending events, we have to check if a run should be closed (timeout)
-       for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
-       {
-           if (ir->second->fileId == 0)
-           {
-               //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
-               const int cond = ir->second->runId == 0;
-               checkAndCloseRun(ir->second, cond, 2);
-           }
-       }
-
-       usleep(1);
-
-       //nothing left to do
-       if (gj.readStat < -10 && numWait == 0)
-       {
-           factPrintf(kInfo, -1, "Finish Write Process ...");
-           gj.writStat = -22;     //==> we should exit
-           break;
-       }
-
-       gj.writStat = gj.readStat;
-   }
-
-   factPrintf(kInfo, -1, "Close all open files ...");
-   for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
-   {
-       if (ir->second->fileId == 0)
-           checkAndCloseRun(ir->second, 1, 3);
-   }
-
-   gj.writStat = -99;
-
-   factPrintf(kInfo, -1, "Exit Writing Process ...");
-
-   return 0;
-}
-   */
-
-
-
-
-void
-StartEvtBuild ()
-{
-
-   int i, /*j,*/ imax, status/*, th_ret[50]*/;
-   pthread_t thread[50];
-   struct timespec xwait;
-
-   gj.readStat = gj.procStat = gj.writStat = 0;
-
-   factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
-
-
-   gi_maxProc = g_maxProc;
-   if (gi_maxProc <= 0 || gi_maxProc > 90) {
-      factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
-      gi_maxProc = 1;
-   }
-
-//start all threads (more to come) when we are allowed to ....
-   while (g_runStat == 0) {
-      xwait.tv_sec = 0;
-      xwait.tv_nsec = 10000000; // sleep for ~10 msec
-      nanosleep (&xwait, NULL);
-   }
-
-   i = 0;
-   /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
-   i++;
-   ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
-   //i++;
-   ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
-   //i++;
-   imax = i;
-
-
-#ifdef BILAND
-   xwait.tv_sec = 30;;
-   xwait.tv_nsec = 0;           // sleep for ~20sec
-   nanosleep (&xwait, NULL);
-
-   printf ("close all runs in 2 seconds\n");
-
-   CloseRunFile (0, time (NULL) + 2, 0);
-
-   xwait.tv_sec = 1;;
-   xwait.tv_nsec = 0;           // sleep for ~20sec
-   nanosleep (&xwait, NULL);
-
-   printf ("setting g_runstat to -1\n");
-
-   g_runStat = -1;
+        } // end for loop over all sockets
+
+        // ==================================================================
+
+        // +1 -> idx=0
+        // -1 -> idx=0
+        // +2 -> idx=0
+        // -2 -> idx=0
+        // +3 -> idx=0
+        // -3 -> idx=0
+        // +4 -> idx=0
+        // -4 -> idx=0
+        // +5 -> idx=0
+        // -5 -> idx=0
+        // +6 -> idx=0
+        // -6 -> idx=0
+        //
+
+        // ==================================================================
+
+        const time_t actTime = time(NULL);
+        if (actTime == gi_SecTime)
+        {
+#if !defined(USE_SELECT) && !defined(USE_EPOLL)
+            if (evtCtrl.size()==0)
+                usleep(1);
 #endif
-
-
-//wait for all threads to finish
-   for (i = 0; i < imax; i++) {
-      /*j =*/ pthread_join (thread[i], (void **) &status);
-   }
-
-} /*-----------------------------------------------------------------*/
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-
-#ifdef BILAND
-
-int
-subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
-            int8_t * buffer)
-{
-   printf ("called subproc %d\n", threadID);
-   return threadID + 1;
-}
-
-
-
-
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-  /*-----------------------------------------------------------------*/
-
-
-
-
-FileHandle_t
-runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
-{
-   return 1;
-};
-
-int
-runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
-{
-   return 1;
-   usleep (10000);
-   return 1;
-}
-
-
-//{ return 1; } ;
-
-int
-runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
-{
-   return 1;
-};
-
-
-
-
-int
-eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
-{
-   int i = 0;
-
-// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
-// for (i=0; i<NBOARDS; i++) {
-//    printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
-// }
-   return 0;
-}
-
-
-void
-factStatNew (EVT_STAT gi)
-{
-   int i;
-
-//for (i=0;i<MAX_SOCK;i++) {
-//   printf("%4d",gi.numRead[i]);
-//   if (i%20 == 0 ) printf("\n");
-//}
-}
-
-void
-gotNewRun (int runnr, PEVNT_HEADER * headers)
-{
-   printf ("got new run %d\n", runnr);
-   return;
-}
-
-void
-factStat (GUI_STAT gj)
-{
-//  printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
-//    array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
-}
-
-
-void
-debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
-           int state, uint32_t tsec, uint32_t tusec)
-{
-//  printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
-}
-
-
-
-void
-debugStream (int isock, void *buf, int len)
-{
-}
-
-void
-debugHead (int i, int j, void *buf)
-{
-}
-
-
-void
-factOut (int severity, int err, char *message)
-{
-   static FILE *fd;
-   static int file = 0;
-
-   if (file == 0) {
-      printf ("open file\n");
-      fd = fopen ("x.out", "w+");
-      file = 999;
-   }
-
-   fprintf (fd, "%3d %3d | %s \n", severity, err, message);
-
-   if (severity != kDebug)
-      printf ("%3d %3d | %s\n", severity, err, message);
-}
-
-
-
-int
-main ()
-{
-   int i, b, c, p;
-   char ipStr[100];
-   struct in_addr IPaddr;
-
-   g_maxMem = 1024 * 1024;      //MBytes
-   g_maxMem = g_maxMem * 200;   //100MBytes
-
-   g_maxProc = 20;
-
-   g_runStat = 40;
-
-   i = 0;
-
-// version for standard crates
-//for (c=0; c<4,c++) {
-//   for (b=0; b<10; b++) {
-//      sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
-//
-//      inet_pton(PF_INET, ipStr, &IPaddr) ;
-//
-//      g_port[i].sockAddr.sin_family = PF_INET;
-//      g_port[i].sockAddr.sin_port = htons(5000) ;
-//      g_port[i].sockAddr.sin_addr = IPaddr ;
-//      g_port[i].sockDef = 1 ;
-//      i++ ;
-//   }
-//}
-//
-//version for PC-test * 
-   for (c = 0; c < 4; c++) {
-      for (b = 0; b < 10; b++) {
-         sprintf (ipStr, "10.0.%d.11", 128 + c);
-         if (c < 2)
-            sprintf (ipStr, "10.0.%d.11", 128);
-         else
-            sprintf (ipStr, "10.0.%d.11", 131);
-//      if (c==0) sprintf(ipStr,"10.0.100.11") ;
-
-         inet_pton (PF_INET, ipStr, &IPaddr);
-         p = 31919 + 100 * c + 10 * b;
-
-
-         g_port[i].sockAddr.sin_family = PF_INET;
-         g_port[i].sockAddr.sin_port = htons (p);
-         g_port[i].sockAddr.sin_addr = IPaddr;
-         g_port[i].sockDef = 1;
-
-         i++;
-      }
-   }
-
-
-//g_port[17].sockDef =-1 ;
-//g_actBoards-- ; 
-
-   StartEvtBuild ();
-
-   return 0;
-
-}
-#endif
+            continue;
+        }
+        gi_SecTime = actTime;
+
+        // ==================================================================
+        //loop over all active events and flag those older than read-timeout
+        //delete those that are written to disk ....
+        //const int count = evtCtrl.size();
+
+        // This could be improved having the pointer which separates the queue with
+        // the incomplete events from the queue with the complete events
+        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
+        {
+            // A reference is enough because the shared_ptr is hold by the evtCtrl
+            const shared_ptr<EVT_CTRL2> &evt = *it;
+
+            // The first event is the oldest. If the first event within the
+            // timeout window was received, we can stop searchinf further.
+            if (evt->time.tv_sec>=actTime - 30)
+                break;
+
+            // This will result in the emission of a dim service.
+            // It doesn't matter if that takes comparably long,
+            // because we have to stop the run anyway.
+            const uint64_t rep = reportIncomplete(evt, "timeout");
+            factReportIncomplete(rep);
+
+            it++;
+            evtCtrl.pop_front();
+        }
+
+        // =================================================================
+
+        // If nothing was received for more than 5min, close file
+        if (actTime-actrun->lastTime>300)
+            actrun->maxEvt = actrun->lastEvt;
+
+        // This is a fake event to trigger possible run-closing conditions once a second
+        // FIXME: This is not yet ideal because a file would never be closed
+        //        if a new file has been started and no events of the new file
+        //        have been received yet
+        if (actrun->fileStat==kFileOpen)
+            primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
+
+        // =================================================================
+
+        gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
+        gj.usdMem = Memory::max_inuse;
+        gj.totMem = Memory::allocated;
+
+        gj.deltaT = 1000; // temporary, must be improved
+
+        for (int ib=0; ib<NBOARDS; ib++)
+        {
+            gj.rateBytes[ib]  = rd[ib].rateBytes;
+            gj.totBytes[ib]  += rd[ib].rateBytes;
+
+            rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);
+
+            gi_NumConnect[ib] = rd[ib].connected;
+            gj.numConn[ib]    = rd[ib].connected;
+        }
+
+        factStat(gj);
+
+        Memory::max_inuse = 0;
+
+        for (int ib=0; ib<NBOARDS; ib++)
+            rd[ib].rateBytes = 0;
+    }
+
+    //   1: Stop, wait for event to get processed
+    //   2: Stop, finish immediately
+    // 101: Restart, wait for events to get processed
+    // 101: Restart, finish immediately
+    //
+    const int gi_reset = g_reset;
+
+    const bool abort = gi_reset%100==2;
+
+    factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
+
+    primaryQueue.wait(abort);
+    secondaryQueue.wait(abort);
+
+    // Here we also destroy all runCtrl structures and hence close all open files
+    evtCtrl.clear();
+
+    factPrintf(MessageImp::kInfo, "Exit read Process...");
+    factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
+
+    factStat(gj);
+
+    return gi_reset>=100;
+}
+
+// ==========================================================================
+// ==========================================================================
+
+void StartEvtBuild()
+{
+    factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
+
+
+    for (int k=0; k<NBOARDS; k++)
+    {
+        gi_NumConnect[k] = 0;
+        gj.numConn[k] = 0;
+        gj.totBytes[k] = 0;
+    }
+
+    gj.bufTot   = gj.maxEvt = gj.xxxEvt = 0;
+    gj.maxMem   = gj.xxxMem = 0;
+
+    gj.usdMem   = Memory::inuse;
+    gj.totMem   = Memory::allocated;
+
+    gj.bufNew   = gj.bufEvt = 0;
+    gj.evtSkip  = gj.evtWrite = gj.evtErr = 0;
+    gj.readStat = gj.procStat = gj.writStat = 0;
+
+
+
+    READ_STRUCT rd[NBOARDS];
+
+    // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
+    for (int i=0; i<NBOARDS; i++)
+        rd[i].sockId = i;
+
+    while (mainloop(rd));
+
+    //must close all open sockets ...
+    factPrintf(MessageImp::kInfo, "Close all sockets...");
+
+    READ_STRUCT::close();
+
+    // Now all sockets get closed. This is not reflected in gi_NumConnect
+    // The current workaround is to count all sockets as closed when the thread is not running
+}
