Index: trunk/FACT++/src/EventBuilder.cc
===================================================================
--- trunk/FACT++/src/EventBuilder.cc	(revision 16381)
+++ trunk/FACT++/src/EventBuilder.cc	(revision 16382)
@@ -1,2 +1,3 @@
+#include <poll.h>
 #include <sys/time.h>
 #include <sys/epoll.h>
@@ -6,5 +7,4 @@
 #include <cstdarg>
 #include <list>
-#include <forward_list>
 
 #include <boost/algorithm/string/join.hpp>
@@ -21,12 +21,40 @@
 
 //#define COMPLETE_EVENTS
+//#define USE_POLL
 //#define USE_EPOLL
 //#define USE_SELECT
 //#define COMPLETE_EPOLL
 
+// Reading only 1024: 13:  77Hz, 87%
+// Reading only 1024: 12:  78Hz, 46%
+// Reading only  300:  4: 250Hz, 92%
+// Reading only  300:  3: 258Hz, 40%
+
+// Reading only four threads 1024: 13:  77Hz, 60%
+// Reading only four threads 1024: 12:  78Hz, 46%
+// Reading only four threads  300:  4: 250Hz, 92%
+// Reading only four threads  300:  3: 258Hz, 40%
+
+// Default  300:  4: 249Hz, 92%
+// Default  300:  3: 261Hz, 40%
+// Default 1024: 13:  76Hz, 93%
+// Default 1024: 12:  79Hz, 46%
+
+// Poll [selected] 1024: 13:  63Hz, 45%
+// Poll [selected] 1024: 14:  63Hz, 63%
+// Poll [selected] 1024: 15:  64Hz, 80%
+// Poll [selected]  300:  4: 230Hz, 47%
+// Poll [selected]  300:  3: 200Hz, 94%
+
+// Poll [all]      1024: 13:  65Hz, 47%
+// Poll [all]      1024: 14:  64Hz, 59%
+// Poll [all]      1024: 15:  62Hz, 67%
+// Poll [all]       300:  4: 230Hz, 47%
+// Poll [all]       300:  3: 230Hz, 35%
+
 // ==========================================================================
 
-bool runOpen(const shared_ptr<EVT_CTRL2> &evt);
-bool runWrite(const shared_ptr<EVT_CTRL2> &evt);
+bool runOpen(const EVT_CTRL2 &evt);
+bool runWrite(const EVT_CTRL2 &evt);
 void runClose();
 void applyCalib(const shared_ptr<EVT_CTRL2> &evt);
@@ -36,5 +64,5 @@
 void runFinished();
 void factStat(GUI_STAT gj);
-bool eventCheck(const shared_ptr<EVT_CTRL2> &evt);
+bool eventCheck(const EVT_CTRL2 &evt);
 void debugHead(void *buf);
 
@@ -52,21 +80,4 @@
 
 // ==========================================================================
-
-void factPrintf(int severity, const char *fmt, ...)
-{
-    char str[1000];
-
-    va_list ap;
-    va_start(ap, fmt);
-    vsnprintf(str, 1000, fmt, ap);
-    va_end(ap);
-
-    factOut(severity, str);
-}
-
-// ==========================================================================
-
-#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
-#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
 
 namespace Memory
@@ -77,7 +88,7 @@
     uint64_t max_inuse = 0;
 
-    mutex mtx;
-
-    forward_list<void*> memory;
+    std::mutex mtx;
+
+    std::forward_list<void*> memory;
 
     void *malloc()
@@ -104,5 +115,5 @@
         {
             // Get the next free slot from the stack and return it
-            const lock_guard<mutex> lock(mtx);
+            const std::lock_guard<std::mutex> lock(mtx);
             mem = memory.front();
             memory.pop_front();
@@ -130,8 +141,23 @@
         }
 
-        const lock_guard<mutex> lock(mtx);
+        const std::lock_guard<std::mutex> lock(mtx);
         memory.push_front(mem);
     }
+
 };
+
+// ==========================================================================
+
+void factPrintf(int severity, const char *fmt, ...)
+{
+    char str[1000];
+
+    va_list ap;
+    va_start(ap, fmt);
+    vsnprintf(str, 1000, fmt, ap);
+    va_end(ap);
+
+    factOut(severity, str);
+}
 
 // ==========================================================================
@@ -185,7 +211,7 @@
     };
 
+    timeval  time;
     uint64_t rateBytes;
     uint32_t skip;     // number of bytes skipped before start of event
-    bool     repmem;   // reportet no mmemory free
 
     uint32_t len() const { return uint32_t(H.package_length)*2; }
@@ -215,5 +241,5 @@
 {
     // wait for something to do...
-    const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms]
+    const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
     if (rc>=0)
         return rc;
@@ -363,5 +389,4 @@
     bufPos = B;  // no byte read so far
     skip   = 0;       // start empty
-    repmem = false;
 
     factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
@@ -380,4 +405,7 @@
 bool READ_STRUCT::read()
 {
+    if (!connected)
+        return false;
+
     if (bufLen==0)
         return true;
@@ -409,4 +437,7 @@
     if (bufTyp==kStream)
         return false;
+
+    if (bufPos==B)
+        gettimeofday(&time, NULL);
 
     bufPos += jrd;  //==> prepare for continuation
@@ -437,4 +468,5 @@
     I[11] = ntohl(I[11]);   // time;
 
+    // Use back inserter??
     for (int s=24; s<24+NTemp+NDAC; s++)
         S[s] = ntohs(S[s]); // drs_temperature / dac
@@ -523,4 +555,11 @@
 shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
 {
+    /*
+     checkroi consistence
+     find existing entry
+     if no entry, try to allocate memory
+     if entry and memory, init event structure
+     */
+
     uint16_t nRoi[9];
     if (!checkRoiConsistency(rd, nRoi))
@@ -562,4 +601,11 @@
             evt->Errors[2]++;
 
+        // It is maybe not likely, but the header of this board might have
+        // arrived earlier. (We could also update the run-info, but
+        // this should not make a difference here)
+        if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
+            rd.time.tv_sec<evt->time.tv_sec)
+            evt->time = rd.time;
+
         //everything seems fine so far ==> use this slot ....
         return evt;
@@ -573,7 +619,7 @@
     }
 
-    shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);
-
-    gettimeofday(&evt->time, NULL);
+    EVT_CTRL2 *evt = new EVT_CTRL2;
+
+    evt->time   = rd.time;
 
     evt->runNum = rd.H.runnumber;
@@ -613,15 +659,10 @@
     }
 
+    // Keep pointer to run of this event
+    evt->runCtrl = actrun;
+
     // Increase the number of events we have started to receive in this run
     actrun->lastTime = evt->time.tv_sec;  // Time when the last event was received
     actrun->lastEvt++;
-
-    // Keep pointer to run of this event
-    evt->runCtrl = actrun;
-
-    // Secure access to evtCtrl against access in CloseRunFile
-    // This should be the last... otherwise we can run into threading issues
-    // if the event is accessed before it is fully initialized.
-    evtCtrl.push_back(evt);
 
     // An event can be the first and the last, but not the last and the first.
@@ -629,10 +670,21 @@
     // runFinished signals that the last event of a run was just received. Processing
     // might still be ongoing, but we can start a new run.
-    const bool cond1 = actrun->lastEvt < actrun->maxEvt;      // max number of events not reached
+    const bool cond1 = actrun->lastEvt  < actrun->maxEvt;     // max number of events not reached
     const bool cond2 = actrun->lastTime < actrun->closeTime;  // max time not reached
     if (!cond1 || !cond2)
         runFinished();
 
-    return evt;
+    // We don't mind here that this is not common to all events,
+    // because every coming event will fullfil the condition as well.
+    if (!cond1)
+        evt->closeRequest |= kRequestMaxEvtsReached;
+    if (!cond2)
+        evt->closeRequest |= kRequestMaxTimeReached;
+
+    // Secure access to evtCtrl against access in CloseRunFile
+    // This should be the last... otherwise we can run into threading issues
+    // if the event is accessed before it is fully initialized.
+    evtCtrl.emplace_back(evt);
+    return evtCtrl.back();
 }
 
@@ -642,5 +694,5 @@
     const int i = rBuf.sockId;
 
-    memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER));
+    memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
 
     int src = sizeof(PEVNT_HEADER) / 2;  // Header is 72 byte = 36 shorts
@@ -743,72 +795,71 @@
 void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
 {
-    const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
+    //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
+    RUN_CTRL2 &run = *evt->runCtrl;
 
     // Is this a valid event or just an empty event to trigger run close?
     // If this is not an empty event open the new run-file
     // Empty events are there to trigger run-closing conditions
-    if (evt->runNum>=0)
+    if (evt->valid())
     {
         // File not yet open
-        if (run->fileStat==kFileNotYetOpen)
+        if (run.fileStat==kFileNotYetOpen)
         {
             // runOpen will close a previous run, if still open
-            if (!runOpen(evt))
+            if (!runOpen(*evt))
             {
                 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
-                run->fileStat = kFileClosed;
+                run.fileStat = kFileClosed;
                 return;
             }
 
             factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
-            run->fileStat = kFileOpen;
+            run.fileStat = kFileOpen;
         }
 
         // Here we have a valid calibration and can go on with that.
+        // It is important that _all_ events are sent for calibration (except broken ones)
         processingQueue1.post(evt);
     }
 
     // File already closed
-    if (run->fileStat==kFileClosed)
+    if (run.fileStat==kFileClosed)
         return;
 
     bool rc1 = true;
-    if (evt->runNum>=0)
-    {
-        rc1 = runWrite(evt);
+    if (evt->valid())
+    {
+        rc1 = runWrite(*evt);
         if (!rc1)
             factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
     }
 
-    const bool cond1 =  run->lastEvt < run->maxEvt;      // max number of events not reached
-    const bool cond2 =  run->lastTime < run->closeTime;  // max time not reached
-    const bool cond3 =  run->closeRequest==kRequestNone; // file signaled to be closed
-    const bool cond4 =  rc1;                             // Write successfull
+    // File not open... no need to close or to check for close
+    // ... this is the case if CloseRunFile was called before any file was opened.
+    if (run.fileStat!=kFileOpen)
+        return;
 
     // File is not yet to be closed.
-    if (cond1 && cond2 && cond3 && cond4)
+    if (rc1 && evt->closeRequest==kRequestNone)
         return;
 
     runClose();
-    run->fileStat = kFileClosed;
+    run.fileStat = kFileClosed;
 
     vector<string> reason;
-    if (!cond1)
-        reason.push_back(to_string(run->maxEvt)+" evts reached");
-    if (!cond2)
-        reason.push_back(to_string(run->closeTime-run->openTime)+"s reached");
-    if (!cond3)
-    {
-        if (run->closeRequest&kRequestManual)
-            reason.push_back("close requested");
-        if (run->closeRequest&kRequestTimeout)
-            reason.push_back("receive timeout");
-        if (run->closeRequest&kRequestConnectionChange)
-            reason.push_back("connection changed");
-        if (run->closeRequest&kRequestEventCheckFailed)
-            reason.push_back("event check failed");
-    }
-    if (!cond4)
-        reason.push_back("runWrite failed");
+    if (evt->closeRequest&kRequestManual)
+        reason.emplace_back("close requested");
+    if (evt->closeRequest&kRequestTimeout)
+        reason.emplace_back("receive timeout");
+    if (evt->closeRequest&kRequestConnectionChange)
+        reason.emplace_back("connection changed");
+    if (evt->closeRequest&kRequestEventCheckFailed)
+        reason.emplace_back("event check failed");
+    if (evt->closeRequest&kRequestMaxTimeReached)
+        reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
+    if (evt->closeRequest&kRequestMaxEvtsReached)
+        reason.push_back(to_string(run.maxEvt)+" evts reached");
+    if (!rc1)
+        reason.emplace_back("runWrite failed");
 
     const string str = boost::algorithm::join(reason, ", ");
@@ -820,5 +871,5 @@
 void procEvt(const shared_ptr<EVT_CTRL2> &evt)
 {
-    if (evt->runNum>=0)
+    if (evt->valid())
     {
         evt->fEvent->Errors[0] = evt->Errors[0];
@@ -827,10 +878,15 @@
         evt->fEvent->Errors[3] = evt->Errors[3];
 
+        evt->fEvent->PCTime = evt->time.tv_sec;
+        evt->fEvent->PCUsec = evt->time.tv_usec;
+
+        evt->fEvent->NumBoards = evt->nBoard;
+
         for (int ib=0; ib<NBOARDS; ib++)
-            evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time;
-
-        if (!eventCheck(evt))
-        {
-            evt->runCtrl->closeRequest = kRequestEventCheckFailed;
+            evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
+
+        if (!eventCheck(*evt))
+        {
+            secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
             return;
         }
@@ -843,6 +899,4 @@
 // ==========================================================================
 // ==========================================================================
-
-shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile
 
 /*
@@ -937,11 +991,18 @@
 */
 
+Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
+
+// This corresponds more or less to fFile... should we merge both?
+shared_ptr<RUN_CTRL2> actrun;
+
 void CloseRunFile()
 {
-    // Create a copy of the shared_ptr to ensure
-    // is not replaced in the middle of the action
-    const shared_ptr<RUN_CTRL2> run = actrun;
-    if (run)
-        run->closeRequest |= kRequestManual;
+    // Currently we need actrun here, to be able to set kFileClosed.
+    // Apart from that we have to ensure that there is an open file at all
+    // which we can close.
+    // Submission to the primary queue ensures that the event
+    // is placed at the right place in the processing chain.
+    // (Corresponds to the correct run)
+    primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
 }
 
@@ -949,9 +1010,8 @@
 {
     factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
-
-    Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
 
     primaryQueue.start();
     secondaryQueue.start();
+    processingQueue1.start();;
 
     actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
@@ -964,4 +1024,24 @@
     while (g_reset == 0)
     {
+#ifdef USE_POLL
+        int    pp[40];
+        int    nn = 0;
+        pollfd fds[40];
+        for (int i=0; i<40; i++)
+        {
+            if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
+            {
+                fds[nn].fd = rd[i].socket;
+                fds[nn].events = POLLIN;
+                pp[nn] = i;
+                nn++;
+            }
+        }
+
+        const int rc_epoll = poll(fds, nn, 100);
+        if (rc_epoll<0)
+            break;
+#endif
+
 #ifdef USE_SELECT
         fd_set readfs;
@@ -978,5 +1058,5 @@
         timeval tv;
         tv.tv_sec = 0;
-        tv.tv_usec = 100;
+        tv.tv_usec = 100000;
         const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
         // 0: timeout
@@ -995,26 +1075,38 @@
 #endif
 
-#ifdef USE_EPOLL
+#if defined(USE_POLL)
+        for (int jj=0; jj<nn; jj++)
+#endif
+#if defined(USE_EPOLL)
         for (int jj=0; jj<rc_epoll; jj++)
-#else
+#endif
+#if !defined(USE_EPOLL) && !defined(USE_POLL)
         for (int jj=0; jj<NBOARDS; jj++)
 #endif
         {
+#ifdef USE_SELECT
+            if (!FD_ISSET(rs->socket, &readfs))
+                continue;
+#endif
+
+#ifdef USE_POLL
+            if ((fds[jj].revents&POLLIN)==0)
+                continue;
+#endif
+
 #ifdef USE_EPOLL
             // FIXME: How to get i?
             READ_STRUCT *rs = READ_STRUCT::get(jj);
-#else
-
+#endif
+
+#ifdef USE_POLL
+            // FIXME: How to get i?
+            READ_STRUCT *rs = &rd[pp[jj]];
+#endif
+
+#if !defined(USE_POLL) && !defined(USE_EPOLL)
             const int i = (jj%4)*10 + (jj/4);
             READ_STRUCT *rs = &rd[i];
-            if (!rs->connected)
-                continue;
-#endif
-
-#ifdef USE_SELECT
-            if (!FD_ISSET(rs->socket, &readfs))
-                continue;
-#endif
-
+#endif
 
 #ifdef COMPLETE_EVENTS
@@ -1106,21 +1198,12 @@
 
             // We have a valid entry, but no memory has yet been allocated
-            if (evt && !evt->FADhead)
+            if (evt && !evt->initMemory())
             {
-                // Try to get memory from the big buffer
-                PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc();
-                if (!mem)
-                {
-                    // If this works properly, this is a hack which can be removed, or
-                    // replaced by a signal or dim message
-                    if (!rs->repmem)
-                    {
-                        factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
-                        rs->repmem = true;
-                    }
+                if (evt->reportMem)
                     continue;
-                }
-
-                evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free));
+
+                factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
+                evt->reportMem = true;
+                continue;
             }
 
@@ -1133,11 +1216,4 @@
             if (!evt)
                 continue;
-
-            /*
-             const int fad = (i/10)<<8)|(i%10);
-             if (fad != rs->H.board_id)
-             {
-                 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);
-             }*/
 
             // This should never happen
@@ -1161,11 +1237,14 @@
 #endif
             // now we have stored a new board contents into Event structure
-            evt->fEvent->NumBoards++;
             evt->board[rs->sockId] = rs->sockId;
+            evt->header = evt->FADhead+rs->sockId;
             evt->nBoard++;
 
 #ifdef COMPLETE_EPOLL
             if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
+            {
                 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
+                break;
+            }
 #endif
             // event not yet complete
@@ -1199,5 +1278,8 @@
                 ev.data.ptr = &rd[j];  // user data (union: ev.ptr)
                 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
+                {
                     factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
+                    return;
+                }
             }
 #endif
@@ -1229,5 +1311,5 @@
         if (actTime == gi_SecTime)
         {
-#if !defined(USE_SELECT) && !defined(USE_EPOLL)
+#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
             if (evtCtrl.empty())
                 usleep(1);
@@ -1263,8 +1345,4 @@
         }
 
-        // If nothing was received for more than 5min, close file
-        if (actTime-actrun->lastTime>300)
-            actrun->closeRequest |= kRequestTimeout;
-
         // =================================================================
 
@@ -1275,4 +1353,6 @@
         gj.deltaT = 1000; // temporary, must be improved
 
+        bool changed = false;
+
         for (int ib=0; ib<NBOARDS; ib++)
         {
@@ -1281,5 +1361,5 @@
 
             if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
-                actrun->closeRequest |= kRequestConnectionChange;
+                changed = true;
 
             gi_NumConnect[ib] = rd[ib].connected;
@@ -1287,9 +1367,9 @@
         }
 
-
         factStat(gj);
 
         Memory::max_inuse = 0;
         gj.maxEvt = 0;
+
         for (int ib=0; ib<NBOARDS; ib++)
             rd[ib].rateBytes = 0;
@@ -1301,6 +1381,19 @@
         //        if a new file has been started and no events of the new file
         //        have been received yet
+        int request = kRequestNone;
+
+        // If nothing was received for more than 5min, close file
+        if (actTime-actrun->lastTime>300)
+            request |= kRequestTimeout;
+
+        // If connection status has changed
+        if (changed)
+            request |= kRequestConnectionChange;
+
+        if (request!=kRequestNone)
+            runFinished();
+
         if (actrun->fileStat==kFileOpen)
-            primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
+            primaryQueue.emplace(new EVT_CTRL2(request, actrun));
     }
 
@@ -1322,4 +1415,5 @@
     // Here we also destroy all runCtrl structures and hence close all open files
     evtCtrl.clear();
+    actrun.reset();
 
     factPrintf(MessageImp::kInfo, "Exit read Process...");
@@ -1338,11 +1432,7 @@
     factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
 
-
-    for (int k=0; k<NBOARDS; k++)
-    {
-        gi_NumConnect[k] = 0;
-        gj.numConn[k] = 0;
-        gj.totBytes[k] = 0;
-    }
+    memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
+    memset(gj.numConn,    0, NBOARDS*sizeof(*gj.numConn));
+    memset(gj.totBytes,   0, NBOARDS*sizeof(*gj.totBytes));
 
     gj.bufTot   = gj.maxEvt = gj.xxxEvt = 0;
