Index: /trunk/FACT++/src/EventBuilder.cc
===================================================================
--- /trunk/FACT++/src/EventBuilder.cc	(revision 16102)
+++ /trunk/FACT++/src/EventBuilder.cc	(revision 16103)
@@ -8,18 +8,20 @@
 #include <forward_list>
 
+#include <boost/algorithm/string/join.hpp>
+
 #include "queue.h"
 
 #include "MessageImp.h"
+#include "EventBuilder.h"
 
 using namespace std;
 
-#include "EventBuilder.h"
-
-#define MIN_LEN  32           // min #bytes needed to interpret FADheader
-#define MAX_LEN (36*3*1024)   // (data+header)*num channels
-
-#define COMPLETE_EVENTS
+#define MIN_LEN  32    // min #bytes needed to interpret FADheader
+#define MAX_LEN  81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092  (data+boardheader+eventheader+endflag)
+
+//#define COMPLETE_EVENTS
 //#define USE_EPOLL
 //#define USE_SELECT
+//#define COMPLETE_EPOLL
 
 // ==========================================================================
@@ -206,5 +208,5 @@
     void destroy();
     bool create(sockaddr_in addr);
-    void check(int, sockaddr_in addr);
+    bool check(int, sockaddr_in addr);
     bool read();
 };
@@ -246,8 +248,6 @@
 {
 #ifdef USE_EPOLL
-    if (::close(fd_epoll) > 0)
-        factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno);
-    else
-        factPrintf(MessageImp::kInfo, "Succesfully closed epoll");
+    if (fd_epoll>=0 && ::close(fd_epoll)>0)
+        factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
 #endif
 
@@ -289,5 +289,5 @@
         factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
 
-    factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId);
+    factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
 
     //connected = false;
@@ -299,5 +299,5 @@
 void READ_STRUCT::destroy()
 {
-    if (socket==-1)
+    if (socket<0)
         return;
 
@@ -311,5 +311,5 @@
         factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
     else
-        factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId);
+        factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
 
     socket = -1;
@@ -318,9 +318,10 @@
 }
 
-void READ_STRUCT::check(int sockDef, sockaddr_in addr)
+bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
 {
     // Continue in the most most likely case (performance)
     //if (socket>=0 && sockDef!=0 && connected)
     //    return;
+    const int old = socket;
 
     // socket open, but should not be open
@@ -332,16 +333,18 @@
         create(addr); //generate address and socket
 
+    const bool retval = old!=socket;
+
     // Socket closed
     if (socket<0)
-        return;
+        return retval;
 
     // Socket open and connected: Nothing to do
     if (connected)
-        return;
+        return retval;
 
     //try to connect if not yet done
     const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
     if (rc == -1)
-        return;
+        return retval;
 
     connected = true;
@@ -362,5 +365,5 @@
     repmem = false;
 
-    factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket);
+    factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
 
 #ifdef USE_EPOLL
@@ -371,4 +374,6 @@
         factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
 #endif
+
+    return retval;
 }
 
@@ -588,5 +593,5 @@
         actrun->maxEvt = actrun->lastEvt;
 
-        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",
+        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
                    rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
 
@@ -601,4 +606,9 @@
         actrun->roi0      = nRoi[0];  // FIXME: Make obsolete!
         actrun->roi8      = nRoi[8];  // FIXME: Make obsolete!
+
+        // Signal the fadctrl that a new run has been started
+        // Note this is the only place at which we can ensure that
+        // gotnewRun is called only once
+        gotNewRun(*actrun);
     }
 
@@ -614,12 +624,4 @@
     // if the event is accessed before it is fully initialized.
     evtCtrl.push_back(evt);
-
-    // Signal the fadctrl that a new run has been started
-    // Note this is the only place at which we can ensure that
-    // gotnewRun is called only once
-    // Note that this will callback CloseRunFile, therefor the event
-    // must already be in the evtCtrl structure
-    if (newrun)
-        gotNewRun(*actrun);
 
     // An event can be the first and the last, but not the last and the first.
@@ -743,6 +745,4 @@
     const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
 
-    bool rc1 = true;
-
     // Is this a valid event or just an empty event to trigger run close?
     // If this is not an empty event open the new run-file
@@ -756,10 +756,10 @@
             if (!runOpen(evt))
             {
-                factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
+                factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
                 run->fileStat = kFileClosed;
                 return;
             }
 
-            factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
+            factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
             run->fileStat = kFileOpen;
         }
@@ -767,20 +767,25 @@
         // Here we have a valid calibration and can go on with that.
         processingQueue1.post(evt);
-
-        // File already closed
-        if (run->fileStat==kFileClosed)
-            return;
-
+    }
+
+    // File already closed
+    if (run->fileStat==kFileClosed)
+        return;
+
+    bool rc1 = true;
+    if (evt->runNum>=0)
+    {
         rc1 = runWrite(evt);
         if (!rc1)
-            factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
-    }
-
-    const bool cond1 = run->lastEvt < run->maxEvt;      // max number of events not reached
-    const bool cond2 = run->lastTime < run->closeTime;  // max time not reached
-    const bool cond3 = rc1;                             // Write successfull
+            factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
+    }
+
+    const bool cond1 =  run->lastEvt < run->maxEvt;      // max number of events not reached
+    const bool cond2 =  run->lastTime < run->closeTime;  // max time not reached
+    const bool cond3 =  run->closeRequest==kRequestNone; // file signaled to be closed
+    const bool cond4 =  rc1;                             // Write successfull
 
     // File is not yet to be closed.
-    if (cond1 && cond2 && cond3)
+    if (cond1 && cond2 && cond3 && cond4)
         return;
 
@@ -788,10 +793,22 @@
     run->fileStat = kFileClosed;
 
-    string str;
-    if (!cond1) str += to_string(run->maxEvt)+" evts reached";
-    if (!cond1 && (!cond2 || !cond3)) str += ", ";
-    if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached";
-    if ((!cond1 || !cond2) && !cond3) str += ", ";
-    if (!cond3) str += "runWrite failed";
+    vector<string> reason;
+    if (!cond1)
+        reason.push_back(to_string(run->maxEvt)+" evts reached");
+    if (!cond2)
+        reason.push_back(to_string(run->closeTime-run->openTime)+"s reached");
+    if (!cond3)
+    {
+        if (run->closeRequest&kRequestManual)
+            reason.push_back("close requested");
+        if (run->closeRequest&kRequestTimeout)
+            reason.push_back("receive timeout");
+        if (run->closeRequest&kRequestConnectionChange)
+            reason.push_back("connection changed");
+    }
+    if (!cond4)
+        reason.push_back("runWrite failed");
+
+    const string str = boost::algorithm::join(reason, ", ");
     factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str());
 }
@@ -921,5 +938,6 @@
     // is not replaced in the middle of the action
     const shared_ptr<RUN_CTRL2> run = actrun;
-    run->maxEvt = run->lastEvt;
+    if (run)
+        run->closeRequest |= kRequestManual;
 }
 
@@ -1024,6 +1042,6 @@
                 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
                 {
-                    if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
-                    //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)
+                    //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
+                    if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
                         break;
                 }
@@ -1143,4 +1161,8 @@
             evt->nBoard++;
 
+#ifdef COMPLETE_EPOLL
+            if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
+                factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
+#endif
             // event not yet complete
             if (evt->nBoard < READ_STRUCT::activeSockets)
@@ -1166,4 +1188,15 @@
             }
 
+#ifdef COMPLETE_EPOLL
+            for (int j=0; j<40; j++)
+            {
+                epoll_event ev;
+                ev.events = EPOLLIN;
+                ev.data.ptr = &rd[j];  // user data (union: ev.ptr)
+                if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
+                    factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
+            }
+#endif
+
 #ifdef COMPLETE_EVENTS
             for (int j=0; j<40; j++)
@@ -1181,17 +1214,9 @@
         // ==================================================================
 
-        // +1 -> idx=0
-        // -1 -> idx=0
-        // +2 -> idx=0
-        // -2 -> idx=0
-        // +3 -> idx=0
-        // -3 -> idx=0
-        // +4 -> idx=0
-        // -4 -> idx=0
-        // +5 -> idx=0
-        // -5 -> idx=0
-        // +6 -> idx=0
-        // -6 -> idx=0
-        //
+        gj.bufNew = evtCtrl.size();      //# incomplete events in buffer
+        gj.bufEvt = primaryQueue.size(); //# complete events in buffer
+        gj.bufTot = gj.bufNew+gj.bufEvt; //# total events currently in buffer
+        if (gj.bufNew>gj.maxEvt)         //# maximum events in buffer past cycle
+            gj.maxEvt = gj.bufNew;
 
         // ==================================================================
@@ -1201,5 +1226,5 @@
         {
 #if !defined(USE_SELECT) && !defined(USE_EPOLL)
-            if (evtCtrl.size()==0)
+            if (evtCtrl.empty())
                 usleep(1);
 #endif
@@ -1211,5 +1236,4 @@
         //loop over all active events and flag those older than read-timeout
         //delete those that are written to disk ....
-        //const int count = evtCtrl.size();
 
         // This could be improved having the pointer which separates the queue with
@@ -1235,9 +1259,37 @@
         }
 
-        // =================================================================
-
         // If nothing was received for more than 5min, close file
         if (actTime-actrun->lastTime>300)
-            actrun->maxEvt = actrun->lastEvt;
+            actrun->closeRequest |= kRequestTimeout;
+
+        // =================================================================
+
+        gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
+        gj.usdMem = Memory::max_inuse;
+        gj.totMem = Memory::allocated;
+
+        gj.deltaT = 1000; // temporary, must be improved
+
+        for (int ib=0; ib<NBOARDS; ib++)
+        {
+            gj.rateBytes[ib]  = rd[ib].rateBytes;
+            gj.totBytes[ib]  += rd[ib].rateBytes;
+
+            if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
+                actrun->closeRequest |= kRequestConnectionChange;
+
+            gi_NumConnect[ib] = rd[ib].connected;
+            gj.numConn[ib]    = rd[ib].connected;
+        }
+
+
+        factStat(gj);
+
+        Memory::max_inuse = 0;
+        gj.maxEvt = 0;
+        for (int ib=0; ib<NBOARDS; ib++)
+            rd[ib].rateBytes = 0;
+
+        // =================================================================
 
         // This is a fake event to trigger possible run-closing conditions once a second
@@ -1247,30 +1299,4 @@
         if (actrun->fileStat==kFileOpen)
             primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
-
-        // =================================================================
-
-        gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
-        gj.usdMem = Memory::max_inuse;
-        gj.totMem = Memory::allocated;
-
-        gj.deltaT = 1000; // temporary, must be improved
-
-        for (int ib=0; ib<NBOARDS; ib++)
-        {
-            gj.rateBytes[ib]  = rd[ib].rateBytes;
-            gj.totBytes[ib]  += rd[ib].rateBytes;
-
-            rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);
-
-            gi_NumConnect[ib] = rd[ib].connected;
-            gj.numConn[ib]    = rd[ib].connected;
-        }
-
-        factStat(gj);
-
-        Memory::max_inuse = 0;
-
-        for (int ib=0; ib<NBOARDS; ib++)
-            rd[ib].rateBytes = 0;
     }
 
@@ -1288,4 +1314,5 @@
     primaryQueue.wait(abort);
     secondaryQueue.wait(abort);
+    processingQueue1.wait(abort);
 
     // Here we also destroy all runCtrl structures and hence close all open files
