Changeset 16103 for trunk


Ignore:
Timestamp:
05/24/13 13:53:35 (12 years ago)
Author:
tbretz
Message:
some imporvements of the output; some improvements to the statistics; added a enum for signals to the file status; also wait for the processingQueue1 at the end of mainloop()
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.cc

    r16040 r16103  
    88#include <forward_list>
    99
     10#include <boost/algorithm/string/join.hpp>
     11
    1012#include "queue.h"
    1113
    1214#include "MessageImp.h"
     15#include "EventBuilder.h"
    1316
    1417using namespace std;
    1518
    16 #include "EventBuilder.h"
    17 
    18 #define MIN_LEN  32           // min #bytes needed to interpret FADheader
    19 #define MAX_LEN (36*3*1024)   // (data+header)*num channels
    20 
    21 #define COMPLETE_EVENTS
     19#define MIN_LEN  32    // min #bytes needed to interpret FADheader
     20#define MAX_LEN  81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092  (data+boardheader+eventheader+endflag)
     21
     22//#define COMPLETE_EVENTS
    2223//#define USE_EPOLL
    2324//#define USE_SELECT
     25//#define COMPLETE_EPOLL
    2426
    2527// ==========================================================================
     
    206208    void destroy();
    207209    bool create(sockaddr_in addr);
    208     void check(int, sockaddr_in addr);
     210    bool check(int, sockaddr_in addr);
    209211    bool read();
    210212};
     
    246248{
    247249#ifdef USE_EPOLL
    248     if (::close(fd_epoll) > 0)
    249         factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno);
    250     else
    251         factPrintf(MessageImp::kInfo, "Succesfully closed epoll");
     250    if (fd_epoll>=0 && ::close(fd_epoll)>0)
     251        factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
    252252#endif
    253253
     
    289289        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
    290290
    291     factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId);
     291    factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
    292292
    293293    //connected = false;
     
    299299void READ_STRUCT::destroy()
    300300{
    301     if (socket==-1)
     301    if (socket<0)
    302302        return;
    303303
     
    311311        factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
    312312    else
    313         factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId);
     313        factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
    314314
    315315    socket = -1;
     
    318318}
    319319
    320 void READ_STRUCT::check(int sockDef, sockaddr_in addr)
     320bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
    321321{
    322322    // Continue in the most most likely case (performance)
    323323    //if (socket>=0 && sockDef!=0 && connected)
    324324    //    return;
     325    const int old = socket;
    325326
    326327    // socket open, but should not be open
     
    332333        create(addr); //generate address and socket
    333334
     335    const bool retval = old!=socket;
     336
    334337    // Socket closed
    335338    if (socket<0)
    336         return;
     339        return retval;
    337340
    338341    // Socket open and connected: Nothing to do
    339342    if (connected)
    340         return;
     343        return retval;
    341344
    342345    //try to connect if not yet done
    343346    const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
    344347    if (rc == -1)
    345         return;
     348        return retval;
    346349
    347350    connected = true;
     
    362365    repmem = false;
    363366
    364     factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket);
     367    factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
    365368
    366369#ifdef USE_EPOLL
     
    371374        factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
    372375#endif
     376
     377    return retval;
    373378}
    374379
     
    588593        actrun->maxEvt = actrun->lastEvt;
    589594
    590         factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",
     595        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
    591596                   rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
    592597
     
    601606        actrun->roi0      = nRoi[0];  // FIXME: Make obsolete!
    602607        actrun->roi8      = nRoi[8];  // FIXME: Make obsolete!
     608
     609        // Signal the fadctrl that a new run has been started
     610        // Note this is the only place at which we can ensure that
     611        // gotnewRun is called only once
     612        gotNewRun(*actrun);
    603613    }
    604614
     
    614624    // if the event is accessed before it is fully initialized.
    615625    evtCtrl.push_back(evt);
    616 
    617     // Signal the fadctrl that a new run has been started
    618     // Note this is the only place at which we can ensure that
    619     // gotnewRun is called only once
    620     // Note that this will callback CloseRunFile, therefor the event
    621     // must already be in the evtCtrl structure
    622     if (newrun)
    623         gotNewRun(*actrun);
    624626
    625627    // An event can be the first and the last, but not the last and the first.
     
    743745    const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
    744746
    745     bool rc1 = true;
    746 
    747747    // Is this a valid event or just an empty event to trigger run close?
    748748    // If this is not an empty event open the new run-file
     
    756756            if (!runOpen(evt))
    757757            {
    758                 factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
     758                factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
    759759                run->fileStat = kFileClosed;
    760760                return;
    761761            }
    762762
    763             factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
     763            factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
    764764            run->fileStat = kFileOpen;
    765765        }
     
    767767        // Here we have a valid calibration and can go on with that.
    768768        processingQueue1.post(evt);
    769 
    770         // File already closed
    771         if (run->fileStat==kFileClosed)
    772             return;
    773 
     769    }
     770
     771    // File already closed
     772    if (run->fileStat==kFileClosed)
     773        return;
     774
     775    bool rc1 = true;
     776    if (evt->runNum>=0)
     777    {
    774778        rc1 = runWrite(evt);
    775779        if (!rc1)
    776             factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
    777     }
    778 
    779     const bool cond1 = run->lastEvt < run->maxEvt;      // max number of events not reached
    780     const bool cond2 = run->lastTime < run->closeTime;  // max time not reached
    781     const bool cond3 = rc1;                             // Write successfull
     780            factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
     781    }
     782
     783    const bool cond1 =  run->lastEvt < run->maxEvt;      // max number of events not reached
     784    const bool cond2 =  run->lastTime < run->closeTime;  // max time not reached
     785    const bool cond3 =  run->closeRequest==kRequestNone; // file signaled to be closed
     786    const bool cond4 =  rc1;                             // Write successfull
    782787
    783788    // File is not yet to be closed.
    784     if (cond1 && cond2 && cond3)
     789    if (cond1 && cond2 && cond3 && cond4)
    785790        return;
    786791
     
    788793    run->fileStat = kFileClosed;
    789794
    790     string str;
    791     if (!cond1) str += to_string(run->maxEvt)+" evts reached";
    792     if (!cond1 && (!cond2 || !cond3)) str += ", ";
    793     if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached";
    794     if ((!cond1 || !cond2) && !cond3) str += ", ";
    795     if (!cond3) str += "runWrite failed";
     795    vector<string> reason;
     796    if (!cond1)
     797        reason.push_back(to_string(run->maxEvt)+" evts reached");
     798    if (!cond2)
     799        reason.push_back(to_string(run->closeTime-run->openTime)+"s reached");
     800    if (!cond3)
     801    {
     802        if (run->closeRequest&kRequestManual)
     803            reason.push_back("close requested");
     804        if (run->closeRequest&kRequestTimeout)
     805            reason.push_back("receive timeout");
     806        if (run->closeRequest&kRequestConnectionChange)
     807            reason.push_back("connection changed");
     808    }
     809    if (!cond4)
     810        reason.push_back("runWrite failed");
     811
     812    const string str = boost::algorithm::join(reason, ", ");
    796813    factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str());
    797814}
     
    921938    // is not replaced in the middle of the action
    922939    const shared_ptr<RUN_CTRL2> run = actrun;
    923     run->maxEvt = run->lastEvt;
     940    if (run)
     941        run->closeRequest |= kRequestManual;
    924942}
    925943
     
    10241042                for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
    10251043                {
    1026                     if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
    1027                     //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)
     1044                    //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
     1045                    if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
    10281046                        break;
    10291047                }
     
    11431161            evt->nBoard++;
    11441162
     1163#ifdef COMPLETE_EPOLL
     1164            if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
     1165                factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
     1166#endif
    11451167            // event not yet complete
    11461168            if (evt->nBoard < READ_STRUCT::activeSockets)
     
    11661188            }
    11671189
     1190#ifdef COMPLETE_EPOLL
     1191            for (int j=0; j<40; j++)
     1192            {
     1193                epoll_event ev;
     1194                ev.events = EPOLLIN;
     1195                ev.data.ptr = &rd[j];  // user data (union: ev.ptr)
     1196                if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
     1197                    factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
     1198            }
     1199#endif
     1200
    11681201#ifdef COMPLETE_EVENTS
    11691202            for (int j=0; j<40; j++)
     
    11811214        // ==================================================================
    11821215
    1183         // +1 -> idx=0
    1184         // -1 -> idx=0
    1185         // +2 -> idx=0
    1186         // -2 -> idx=0
    1187         // +3 -> idx=0
    1188         // -3 -> idx=0
    1189         // +4 -> idx=0
    1190         // -4 -> idx=0
    1191         // +5 -> idx=0
    1192         // -5 -> idx=0
    1193         // +6 -> idx=0
    1194         // -6 -> idx=0
    1195         //
     1216        gj.bufNew = evtCtrl.size();      //# incomplete events in buffer
     1217        gj.bufEvt = primaryQueue.size(); //# complete events in buffer
     1218        gj.bufTot = gj.bufNew+gj.bufEvt; //# total events currently in buffer
     1219        if (gj.bufNew>gj.maxEvt)         //# maximum events in buffer past cycle
     1220            gj.maxEvt = gj.bufNew;
    11961221
    11971222        // ==================================================================
     
    12011226        {
    12021227#if !defined(USE_SELECT) && !defined(USE_EPOLL)
    1203             if (evtCtrl.size()==0)
     1228            if (evtCtrl.empty())
    12041229                usleep(1);
    12051230#endif
     
    12111236        //loop over all active events and flag those older than read-timeout
    12121237        //delete those that are written to disk ....
    1213         //const int count = evtCtrl.size();
    12141238
    12151239        // This could be improved having the pointer which separates the queue with
     
    12351259        }
    12361260
    1237         // =================================================================
    1238 
    12391261        // If nothing was received for more than 5min, close file
    12401262        if (actTime-actrun->lastTime>300)
    1241             actrun->maxEvt = actrun->lastEvt;
     1263            actrun->closeRequest |= kRequestTimeout;
     1264
     1265        // =================================================================
     1266
     1267        gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
     1268        gj.usdMem = Memory::max_inuse;
     1269        gj.totMem = Memory::allocated;
     1270
     1271        gj.deltaT = 1000; // temporary, must be improved
     1272
     1273        for (int ib=0; ib<NBOARDS; ib++)
     1274        {
     1275            gj.rateBytes[ib]  = rd[ib].rateBytes;
     1276            gj.totBytes[ib]  += rd[ib].rateBytes;
     1277
     1278            if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
     1279                actrun->closeRequest |= kRequestConnectionChange;
     1280
     1281            gi_NumConnect[ib] = rd[ib].connected;
     1282            gj.numConn[ib]    = rd[ib].connected;
     1283        }
     1284
     1285
     1286        factStat(gj);
     1287
     1288        Memory::max_inuse = 0;
     1289        gj.maxEvt = 0;
     1290        for (int ib=0; ib<NBOARDS; ib++)
     1291            rd[ib].rateBytes = 0;
     1292
     1293        // =================================================================
    12421294
    12431295        // This is a fake event to trigger possible run-closing conditions once a second
     
    12471299        if (actrun->fileStat==kFileOpen)
    12481300            primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
    1249 
    1250         // =================================================================
    1251 
    1252         gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
    1253         gj.usdMem = Memory::max_inuse;
    1254         gj.totMem = Memory::allocated;
    1255 
    1256         gj.deltaT = 1000; // temporary, must be improved
    1257 
    1258         for (int ib=0; ib<NBOARDS; ib++)
    1259         {
    1260             gj.rateBytes[ib]  = rd[ib].rateBytes;
    1261             gj.totBytes[ib]  += rd[ib].rateBytes;
    1262 
    1263             rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);
    1264 
    1265             gi_NumConnect[ib] = rd[ib].connected;
    1266             gj.numConn[ib]    = rd[ib].connected;
    1267         }
    1268 
    1269         factStat(gj);
    1270 
    1271         Memory::max_inuse = 0;
    1272 
    1273         for (int ib=0; ib<NBOARDS; ib++)
    1274             rd[ib].rateBytes = 0;
    12751301    }
    12761302
     
    12881314    primaryQueue.wait(abort);
    12891315    secondaryQueue.wait(abort);
     1316    processingQueue1.wait(abort);
    12901317
    12911318    // Here we also destroy all runCtrl structures and hence close all open files
Note: See TracChangeset for help on using the changeset viewer.