Changeset 15483


Ignore:
Timestamp:
05/03/13 22:30:18 (12 years ago)
Author:
tbretz
Message:
Replaced MAX_EVT*MAX_RUN by just MAX_EVT; simplified evtStat to have only the minimum amount of states (easier to read); remove incomplete events which are superseeded by complete events (this might happen if a new run was started)
Location:
trunk/FACT++/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r15476 r15483  
    6969int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
    7070
    71 int evtCtrl_frstPtr;
    72 int evtCtrl_lastPtr;
     71int evtCtrl_frstPtr;   // First event in queue
     72int evtCtrl_lastPtr;   // pointer to next free slot
    7373
    7474int g_maxProc;
     
    105105GUI_STAT gj;
    106106
    107 EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing
     107#define MAX_EVT   65536  // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
     108#define MAX_RUN       8  // Number of concurrent runs
     109
     110EVT_CTRL evtCtrl[MAX_EVT]; //control of events during processing
    108111
    109112void factPrintf(int severity, int id, const char *fmt, ...)
     
    377380int mBufEvt(const READ_STRUCT *rs)
    378381{
    379 // generate a new Event into mBuffer:   
    380 // make sure only complete Event are possible, so 'free' will always work
    381 // returns index into mBuffer[], or negative value in case of error
    382 // error: <-9000 if roi screwed up (not consistent with run)
    383 //        <-8000                   (not consistent with event)
    384 //        <-7000                   (not consistent with board)
    385 //        < 0    if no space left
    386 
    387382    int nRoi[9];
    388383    if (!checkRoiConsistency(rs->rBuf, nRoi))
     
    395390    const int  fadNum = rs->evtID;
    396391
    397     const int beg = (evtCtrl_lastPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
    398     const int end = (evtCtrl_frstPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
    399 
    400     for (int k=beg; k!=end; k=(k+MAX_EVT*MAX_RUN-1)%(MAX_EVT*MAX_RUN))
     392    const int beg = (evtCtrl_lastPtr + MAX_EVT - 1) % MAX_EVT;
     393    const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT;
     394
     395    for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT)
    401396    {
    402397        // If the run is different, go on searching.
     
    437432
    438433    // Check if the control structure still has space left
    439     if (end-beg==1 || (end==0 && beg==MAX_EVT*MAX_RUN-1))
     434    if (end-beg==1 || (end==0 && beg==MAX_EVT-1))
    440435    {
    441436        factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end);
     
    537532    evtCtrl[k].FADhead   = NULL;
    538533
     534    //    -1:   kInValid
     535    //     0:   kValid
     536    //  1-40:   kIncomplete
     537    //    90:   kIncompleteReported
     538    //   100:   kCompleteEventInBuffer
     539    //  1000+x: kToBeProcessedByThreadX
     540    //  5000:   kToBeWritten
     541    // 10000:   kToBeDeleted
     542
    539543    evtCtrl[k].evtStat = 0;
    540544
     
    543547    // in two instructions. Must be done only _after_ the contents
    544548    // have been initialized
    545     evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % (MAX_EVT*MAX_RUN);
     549    evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % MAX_EVT;
    546550
    547551    return k;
     
    638642}*/ /*-----------------------------------------------------------------*/
    639643
    640 void reportIncomplete(int id)
    641 {
    642     factPrintf(kWarn, 601, "%5d skip incomplete evt %8d",
    643                evtCtrl[id].evNum, id);
     644uint64_t reportIncomplete(int id, const char *txt)
     645{
     646    factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
     647               evtCtrl[id].runNum, evtCtrl[id].evNum, txt);
    644648
    645649    uint64_t report = 0;
     
    660664        }
    661665
    662         // FIXME: Is that really 'b' or should that be 'ib' ?
     666        // FIXME: This is not synchronous... it reports
     667        // accoridng to the current connection status, not w.r.t. to the
     668        // one when the event was taken.
    663669        if (gi_NumConnect[ib]<=0) // board not connected
    664670        {
     
    677683    factOut(kWarn, 601, str);
    678684
    679     factReportIncomplete(report);
     685    return report;
    680686}
    681687
     
    808814
    809815      // initialize mBuffer (mark all entries as unused\empty)
    810       for (int i = 0; i < MAX_EVT * MAX_RUN; i++)
     816      for (int i = 0; i < MAX_EVT; i++)
    811817      {
    812818          evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
     
    12011207          }
    12021208
     1209          const int beg = (idx             + MAX_EVT - 1) % MAX_EVT;
     1210          const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT;
     1211
     1212          // we have just completed an event... so all previous events
     1213          // must have been completed already. If they are not, there
     1214          // is no need to wait for the timeout, because they will never
     1215          // get completed. We can just ensure that if we check for the previous
     1216          // event to be complete every time we receive a new complete event.
     1217          // If we find an incomplete one, we remove all consecutive
     1218          // incomplete ones.
     1219          for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT)
     1220          {
     1221              // We are done if we find a complete or fully processed event
     1222              if (evtCtrl[k].evtStat>=100)
     1223                  break;
     1224
     1225              // we do not call factReportIncomplete here, because by starting a new
     1226              // run and having received the first complete event from that run
     1227              // the user has expressed that the old events are obsolste now
     1228              // and the run will be closed anyway
     1229              if (evtCtrl[k].evtStat>=0 && evtCtrl[k].evtStat<90)
     1230              {
     1231                  reportIncomplete(k, "expired");
     1232                  evtCtrl[k].evtStat = 90;
     1233              }
     1234          }
     1235
    12031236          // Flag that the event is ready for processing
    1204           evtCtrl[idx].evtStat = 99;
     1237          evtCtrl[idx].evtStat = 100;
    12051238
    12061239      } // end for loop over all sockets
     
    12191252      //delete those that are written to disk ....
    12201253
    1221       const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT*MAX_RUN)%(MAX_EVT*MAX_RUN);
    1222 
    1223       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
     1254      const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
     1255
     1256      for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    12241257      {
    12251258          // Check the more likely case first: incomplete events
    1226           if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<92)
     1259          if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<100)
    12271260          {
    12281261              gj.bufNew++;     //incomplete event in Buffer
    12291262
    12301263              // Event has not yet timed out or was reported already
    1231               if (evtCtrl[k0].evtStat>=90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
     1264              if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
    12321265                  continue;
    12331266
     
    12351268              // It doesn't matter if that takes comparably long,
    12361269              // because we have to stop the run anyway.
    1237               reportIncomplete(k0);
     1270              const uint64_t rep = reportIncomplete(k0, "timeout");
     1271              factReportIncomplete(rep);
    12381272
    12391273              //timeout for incomplete events
    1240               evtCtrl[k0].evtStat = 91;
     1274              evtCtrl[k0].evtStat = 90;
    12411275              gj.evtSkip++;
    12421276
    12431277              continue;
    12441278          }
    1245 
    1246           // complete event in Buffer
    1247           //if (evtCtrl[k0].evtStat >= 95)
    1248           //    gj.bufEvt++;
    12491279
    12501280          // Check the less likely case: 'useless' or 'delete'
    12511281          // evtState==0 can happen if the event was initialized (some data received)
    12521282          // but the data did not make sense (e.g. inconsistent rois)
    1253           if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000)
     1283          if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat == 10000)
    12541284          {
    12551285              mBufFree(k0);   //event written--> free memory
    12561286              evtCtrl[k0].evtStat = -1;
    12571287
    1258               if (k0==evtCtrl_frstPtr)
    1259                   evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
    1260               else
    1261                   factPrintf(kError, -1, "Freed a non-first slot");
    1262 
    12631288              gj.evtWrite++;
    12641289              gj.rateWrite++;
    1265 
     1290          }
     1291
     1292          // Remove leading invalidated slots from queue
     1293          if (evtCtrl[evtCtrl_frstPtr].evtStat==-1 && k0==evtCtrl_frstPtr)
     1294          {
     1295              evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
    12661296              continue;
    12671297          }
     
    12711301          // first complete events which processing is pending, we can stop.
    12721302          // All other events (if everything works well) must have the same state.
    1273           break;
     1303          // FIXME: This only works if we start from lastPtr and go down to frstPtr
     1304          //break;
    12741305      }
    12751306
     
    13531384
    13541385           // flag incomplete events as 'read finished'
    1355            if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat < 90)
    1356                evtCtrl[k0].evtStat = 91;
    1357 
    1358            if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat>900)
     1386           // We cannot just detele all events, because some might currently being processed,
     1387           // so we have to wait until the processing thread currently processing the event
     1388           // signals that the event can be deleted. (Note, that there are currently never
     1389           // two threads processing the same event at the same time)
     1390           if ((evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<90) || evtCtrl[k0].evtStat==10000)
    13591391           {
    13601392               mBufFree(k0);   //event written--> free memory
     1393               evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
    13611394               evtCtrl[k0].evtStat = -1;
    1362 
    1363                evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
    1364            }
     1395          }
    13651396
    13661397           usleep(1);
     
    14101441    {
    14111442        int numWait = 0;
    1412         int numProc = 0;
    1413 
    1414         for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
     1443
     1444        for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    14151445        {
    14161446            if (evtCtrl[k0].evtStat != 1000 + threadID)
     
    14221452            }
    14231453
    1424             int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)
    1425 
    1426             if (gi_resetR<=1)
     1454            /*
     1455            // gi_resetR>1 : flush buffers asap
     1456            if (gi_resetR>1)
    14271457            {
    1428                 jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
    1429                                   evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
    1430 
    1431 
    1432                 if (jret <= threadID) {
    1433                     factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
    1434                     jret = 5300;
    1435                 } else if (jret <= 0)
    1436                     jret = 9200 + threadID;   // flag as 'to be deleted'
    1437                 else if (jret >= gi_maxProc)
    1438                     jret = 5200 + threadID;   // flag as 'to be written'
    1439                 else
    1440                     jret = 1000 + jret;       // flag for next proces
     1458                evtCtrl[k].evtStat = 9000;              // flag as 'to be deleted'
     1459                continue;
     1460
     1461            }*/
     1462
     1463            const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
     1464                                        evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
     1465
     1466            if (jret>0 && jret<=threadID)
     1467                factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
     1468
     1469            if (jret<=threadID)
     1470            {
     1471                evtCtrl[k0].evtStat = 10000;            // flag as 'to be deleted'
     1472                continue;
    14411473            }
    14421474
    1443             evtCtrl[k0].evtStat = jret;
    1444             numProc++;
     1475            if (jret>=gi_maxProc)
     1476            {
     1477                evtCtrl[k0].evtStat = 5000;            // flag as 'to be written'
     1478                continue;
     1479            }
     1480
     1481            evtCtrl[k0].evtStat = 1000 + jret;       // flag for next proces
    14451482        }
    14461483
     
    14501487        }
    14511488
    1452         //seems we have nothing to do, so sleep a little
    1453         if (numProc == 0)
    1454             usleep(1);
     1489        usleep(1);
    14551490    }
    14561491
     
    15071542       int numProc = 0;
    15081543
    1509        for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
     1544       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    15101545       {
    1511            if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >= 1000)
     1546           if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000)
    15121547           {
    1513                if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat< 90)
     1548               if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90)
    15141549                   numWait++;
    15151550
     
    15171552           }
    15181553
     1554           /*
    15191555           //we are asked to flush buffers asap
    15201556           if (gi_resetR > 1)
    15211557           {
    1522                evtCtrl[k0].evtStat = 9991;
     1558               evtCtrl[k0].evtStat = 9000;
    15231559               continue;
    1524            }
     1560           }*/
    15251561
    15261562           //-------- it is better to open the run already here, so call can be used to initialize
     
    16111647           if (runCtrl[lastRun].procId != 0)
    16121648           {
    1613                evtCtrl[k0].evtStat = 9091;
     1649               evtCtrl[k0].evtStat = 10000; // flag 'to be deleted'
    16141650               continue;
    16151651           }
     
    16451681           if (rc < 0)
    16461682           {
    1647                evtCtrl[k0].evtStat = 9999;        //flag event to be skipped
     1683               evtCtrl[k0].evtStat = 10000;        // flag event to be deleted
    16481684               //gi.procErr++;
    16491685           }
    16501686           else
    16511687           {
    1652                evtCtrl[k0].evtStat = 1000;
     1688               evtCtrl[k0].evtStat = 1000;       // flag 'start processing'
    16531689               runCtrl[lastRun].procEvt++;
    16541690           }
     
    16801716   }
    16811717
    1682    for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
     1718   /*
     1719   for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    16831720   {
    16841721       if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
    1685            evtCtrl[k0].evtStat = 9800;    //flag event as 'processed'
    1686    }
     1722           evtCtrl[k0].evtStat = 9000;    //flag event as 'processed'
     1723   }*/
    16871724
    16881725   gp_runStat = -99;
     
    18001837       int numWait  = 0;
    18011838
    1802        for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
     1839       // Note that the current loop does not at all gurantee that
     1840       // the events are written in the correct order.
     1841       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    18031842       {
    1804            if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000)
     1843           if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000)
    18051844           {
    1806                if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000)
     1845               if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000)
    18071846                   numWait++;
    18081847
     
    18111850
    18121851           //we must drain the buffer asap
     1852           /*
    18131853           if (gi_resetR > 1)
    18141854           {
    1815                evtCtrl[k0].evtStat = 9904;
     1855               evtCtrl[k0].evtStat = 9000;
    18161856               continue;
    1817            }
     1857           }*/
    18181858
    18191859           const uint32_t irun = evtCtrl[k0].runNum;
     
    18681908           }
    18691909
     1910           /*
    18701911           if (runCtrl[lastRun].fileId > 0)
    18711912           {
    18721913               // There is an event but file is already closed
    1873                /*
    1874                if (runCtrl[j].fileId < 100)
    1875                {
    1876                    factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
    1877                    runCtrl[j].fileId += 100;
    1878                }*/
    1879 
    1880                evtCtrl[k0].evtStat = 9903;
    1881            }
     1914               //if (runCtrl[j].fileId < 100)
     1915               //{
     1916               //    factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
     1917               //    runCtrl[j].fileId += 100;
     1918               //}
     1919
     1920               evtCtrl[k0].evtStat = 9000;
     1921           }*/
    18821922
    18831923           // File is open
     
    18911931                   runCtrl[lastRun].lastTime = g_actTime;
    18921932                   runCtrl[lastRun].actEvt++;
    1893 
    1894                    evtCtrl[k0].evtStat = 9901;
    18951933               }
    18961934               else
    1897                {
    18981935                   factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
    1899                    evtCtrl[k0].evtStat = 9902;
    1900                }
    19011936
    19021937               checkAndCloseRun(lastRun, irun, rc<0, 1);
    19031938           }
     1939
     1940           evtCtrl[k0].evtStat = 10000;  // event written (or has to be discarded) -> delete
    19041941       }
    19051942/*
  • trunk/FACT++/src/FAD.h

    r15474 r15483  
    177177
    178178//---------------------------------------------------------------
    179 
    180 #define MAX_RUN     8
    181 #define MAX_EVT   32768  //don't worry, for events is MAX_RUN*MAX_EVT
    182179
    183180typedef void* FileHandle_t ;
Note: See TracChangeset for help on using the changeset viewer.