Changeset 15495


Ignore:
Timestamp:
05/04/13 11:25:30 (11 years ago)
Author:
tbretz
Message:
Removed some more obsolete code; improved the logic of the processing loops; improved the performance by keeping the index of the corresponding runCtrl entry in each evtCtrl entry; removed a lot of code from the processing loop, which would never have been called like the runOpen in the writing loop; use the previous event for checking the roi in mBufEvt if available (this will increase the performence especially if the buffer is not empty); removed a couple of obsolete variables
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r15494 r15495  
    7373
    7474int g_maxProc;
    75 int g_maxSize;
    76 int gi_maxSize;
    7775int gi_maxProc;
    7876
    7977uint g_actTime;
    80 uint g_actUsec;
    8178int g_runStat;
    8279int g_reset;
    83 int g_useFTM;
    84 
    85 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
     80
    8681size_t g_maxMem;                //maximum memory allowed for buffer
    8782
    88 //no longer needed ...
    89 int g_maxBoards;                //maximum number of boards to be initialized
    90 int g_actBoards;
    91 //
    92 
    9383FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
    9484
    95 
    96 int gi_runStat;
    97 int gp_runStat;
    98 int gw_runStat;
    99 
    100 uint32_t actrun = 0;
    101 
    10285uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards
    10386
    104 EVT_STAT gi;
     87//EVT_STAT gi;
    10588GUI_STAT gj;
    10689
     
    166149    // Add the last free slot to the stack
    167150    TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
     151
     152    // FIXME: Really free memory if memory usuage exceeds g_maxMem
    168153
    169154    entry->prev = tgb_last;
     
    220205/*-----------------------------------------------------------------*/
    221206
    222 
    223 
    224 int
    225 runFinish1 (uint32_t runnr)
    226 {
    227    factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
    228    return 0;
    229 }
    230 int
    231 runFinish (uint32_t runnr)
    232 {
    233     factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
    234     return 0;
    235 }
    236207
    237208int
     
    438409    }
    439410
    440     // FIXME: This should be the time of the first receiped board
     411    // If we have already queued at least one event,
     412    // check the roi if the previous event
     413    // already belongs to the same run.
     414
     415    // Get the runCtrl entry of the previous event.
     416    // If none is in the queue (we are anyhow super fast)
     417    // just get the correct entry from the runCtrl array.
     418    int idx = evtCtrl[beg].runCtrl_idx;
     419
     420    // If there is an event in the queue and it has the same runID, we can use
     421    // that event to check for the roi consistency throughout the run
     422    if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID)
     423    {
     424        // Check if run already registered (old entries should have runId==-1)
     425        if (runCtrl[idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8])
     426        {
     427            factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
     428                       runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID);
     429            return -9301;
     430        }
     431    }
     432
     433    // If there is none in the queue, we have to search for the correct entry
     434    if (evtCtrl_frstPtr==evtCtrl_lastPtr)
     435    {
     436        idx = -1;
     437
     438        for (int k=0; k<MAX_RUN; k++)
     439        {
     440            if (runCtrl[k].runId==runID)
     441            {
     442                idx = k;
     443                break;
     444            }
     445        }
     446    }
     447
    441448    struct timeval tv;
    442     gettimeofday (&tv, NULL);
     449    gettimeofday(&tv, NULL);
    443450
    444451    const uint32_t tsec = tv.tv_sec;
    445452    const uint32_t tusec = tv.tv_usec;
    446453
    447     //check if runId already registered in runCtrl
    448 
    449     uint oldest = g_actTime + 1000;
    450     int jold = -1;
    451 
    452     int found = 0;
    453 
    454     // fileId==-2: not yet used or run assigned but not open
    455     // fileId== 0: file open
    456     // fileId>0:   run closed
    457 
    458     for (int k=0; k<MAX_RUN; k++)
     454    // Run not yet registered, register run
     455    // If we haven't found a corresponding entry in the queue, or the runId has changed
     456    // we find the oldest empty entry in the runCtrl array and create a new runCtrl entry
     457    if (idx<0 || runCtrl[idx].runId!=runID)
    459458    {
    460         // Check if run already registered (old entries should have runId==-1)
    461         if (runCtrl[k].runId == runID)
     459        // If there is none in the queue or
     460        idx = -1;
     461
     462        uint oldest = g_actTime + 1000;
     463        for (int k=0; k<MAX_RUN; k++)
    462464        {
    463             // FIXME: Compare to previous event
    464             if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8])
     465            // This is just for sanity. We use the oldest free entry (until
     466            // we have understood the concept and can use "just" a free entry
     467            if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
    465468            {
    466                 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
    467                            runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID);
    468                 return -9301;
     469                oldest = runCtrl[k].closeTime;
     470                idx = k;
    469471            }
    470 
    471             found = 1;
    472             break;
    473472        }
    474473
    475         // This is just for sanity. We use the oldest free entry (until
    476         // we have understood the concept and can use "just" a free entry
    477         if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
    478         {
    479             oldest = runCtrl[k].closeTime;
    480             jold = k;
    481         }
    482     }
    483 
    484     if (!found) // Run not yet registered, register run
    485     {
    486         if (jold < 0)
     474        if (idx<0)
    487475        {
    488476            factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
     
    490478        }
    491479
    492         const int evFree = jold;
    493 
    494         factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d",
    495                    runID, evID, evFree, nRoi[0], nRoi[8]);
    496 
    497         runCtrl[evFree].runId      = runID;
    498         runCtrl[evFree].roi0       = nRoi[0];  // FIXME: Make obsolete!
    499         runCtrl[evFree].roi8       = nRoi[8];  // FIXME: Make obsolete!
    500         runCtrl[evFree].fileId     = -2;
    501         runCtrl[evFree].procId     = -2;
    502         runCtrl[evFree].lastEvt    = 1;         // Number of events partially started to read
    503         runCtrl[evFree].actEvt     = 0;         // Number of written events (write)
    504         runCtrl[evFree].procEvt    = 0;         // Number of successfully checked events (checkEvent)
    505         runCtrl[evFree].maxEvt     = 999999999; // max number events allowed
    506         runCtrl[evFree].lastTime   = tsec;      // Time when the last event was written
    507         runCtrl[evFree].closeTime  = tsec + 3600 * 24;     //max time allowed
     480        factPrintf(kInfo, 503, "New run %d (evt=%d, idx=%d) registered with roi=%d and roi_tm=%d",
     481                   runID, evID, idx, nRoi[0], nRoi[8]);
     482
     483        runCtrl[idx].runId      = runID;
     484        runCtrl[idx].roi0       = nRoi[0];  // FIXME: Make obsolete!
     485        runCtrl[idx].roi8       = nRoi[8];  // FIXME: Make obsolete!
     486        runCtrl[idx].fileId     = -2;
     487        runCtrl[idx].lastEvt    = 1;         // Number of events partially started to read
     488        runCtrl[idx].actEvt     = 0;         // Number of written events (write)
     489        runCtrl[idx].procEvt    = 0;         // Number of successfully checked events (checkEvent)
     490        runCtrl[idx].maxEvt     = 999999999; // max number events allowed
     491        runCtrl[idx].lastTime   = tsec;      // Time when the last event was written
     492        runCtrl[idx].closeTime  = tsec + 3600 * 24;     //max time allowed
    508493    }
    509494
     
    516501        evtCtrl[k].board[b] = -1;
    517502
     503    evtCtrl[k].runCtrl_idx = idx;
    518504    evtCtrl[k].pcTime[0] = tsec;
    519505    evtCtrl[k].pcTime[1] = tusec;
     
    582568mBufFree (int i)
    583569{
    584 //delete entry [i] from mBuffer:
    585 //(and make sure multiple calls do no harm ....)
    586 
    587570   TGB_free(evtCtrl[i].FADhead);
    588571
     
    597580   gj.bufTot--;
    598581
    599    /*if (gi_memStat < 0) {
    600       if (gj.usdMem <= 0.75 * gj.maxMem)
    601          gi_memStat = +1;
    602    }*/
    603 
    604582   return 0;
    605583
    606584} /*-----------------------------------------------------------------*/
    607 
    608 /*
    609 void
    610 resetEvtStat ()
    611 {
    612    for (int i = 0; i < MAX_SOCK; i++)
    613       gi.numRead[i] = 0;
    614 
    615    for (int i = 0; i < NBOARDS; i++) {
    616       gi.gotByte[i] = 0;
    617       gi.gotErr[i] = 0;
    618 
    619    }
    620 
    621    gi.evtGet = 0;               //#new Start of Events read
    622    gi.evtTot = 0;               //#complete Events read
    623    gi.evtErr = 0;               //#Events with Errors
    624    gi.evtSkp = 0;               //#Events incomplete (timeout)
    625 
    626    gi.procTot = 0;              //#Events processed
    627    gi.procErr = 0;              //#Events showed problem in processing
    628    gi.procTrg = 0;              //#Events accepted by SW trigger
    629    gi.procSkp = 0;              //#Events rejected by SW trigger
    630 
    631    gi.feedTot = 0;              //#Events used for feedBack system
    632    gi.feedErr = 0;              //#Events rejected by feedBack
    633 
    634    gi.wrtTot = 0;               //#Events written to disk
    635    gi.wrtErr = 0;               //#Events with write-error
    636 
    637    gi.runOpen = 0;              //#Runs opened
    638    gi.runClose = 0;             //#Runs closed
    639    gi.runErr = 0;               //#Runs with open/close errors
    640 
    641    return;
    642 }*/ /*-----------------------------------------------------------------*/
    643585
    644586uint64_t reportIncomplete(int id, const char *txt)
     
    762704    READ_STRUCT rd[NBOARDS];       //buffer to read IP and afterwards store in mBuffer
    763705
     706    uint32_t actrun = 0;
     707
    764708   const int minLen = sizeof(PEVNT_HEADER);  //min #bytes needed to check header: full header for debug
    765709
    766    //start.S = 0xFB01;
    767    //stop.S = 0x04FE;
    768710
    769711/* initialize run control logics */
     
    771713      runCtrl[i].runId  =  0;
    772714      runCtrl[i].fileId = -2;
    773       runCtrl[i].procId = -2;
    774    }
     715   }
     716
     717   int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
    775718   gi_resetS = gi_resetR = 9;
    776719
     
    784727   //time in seconds
    785728   uint gi_SecTime = time(NULL);;
     729   g_actTime = gi_SecTime;
    786730
    787731   const int cntsock = 8 - NUMSOCK ;
     
    833777   while (g_runStat >= 0 && g_reset == 0)
    834778   {
    835       gi_runStat = g_runStat;
    836779      gj.readStat = g_runStat;
    837 
    838       struct timeval tv;
    839       gettimeofday (&tv, NULL);
    840       g_actTime = tv.tv_sec;
    841       g_actUsec = tv.tv_usec;
    842 
    843780
    844781      for (int b = 0; b < NBOARDS; b++)
     
    849786
    850787          gi_NumConnect[b] = 0;       //must close all connections
    851           //gi.numConn[b] = 0;
    852788          gj.numConn[b] = 0;
    853789
     
    920856              continue;
    921857
    922           //numok++;
    923 
    924858          if (rd[i].bufLen>0)
    925859          {
     
    933867                  // There was just nothing waiting
    934868                  if (errno==EWOULDBLOCK || errno==EAGAIN)
    935                   {
    936                       //numok--;
    937869                      continue;
    938                   }
    939870
    940871                  factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
    941                   //gi.gotErr[b]++;
    942872                  continue;
    943873              }
     
    951881                  GenSock(s0, i, 0, NULL, &rd[i]);
    952882
    953                   //gi.gotErr[b]++;
    954 
    955883                  gi_NumConnect[i]-= cntsock ;
    956                   //gi.numConn[b]--;
    957884                  gj.numConn[i]--;
    958885
    959886                  continue;
    960887              }
    961               // Success (jrd > 0)
    962888
    963889              gj.rateBytes[i] += jrd;
     
    1073999              rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
    10741000          {
    1075               //gi.evtErr++;
    10761001              factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
    10771002                         i, rd[i].evtID, rd[i].fadLen,
     
    11711096          if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun)
    11721097          {
    1173               // Signal the fadctrl that a new run has been started
     1098             // Signal the fadctrl that a new run has been started
    11741099              gotNewRun(evtCtrl[idx].runNum, NULL);
    11751100
     
    12001125          // This is a non-ideal hack to lower the probability that
    12011126          // in mBufEvt the search for correct entry in runCtrl
    1202           // will not return a super-old entry
     1127          // will not return a super-old entry. I don't want
     1128          // to manipulate that in another thread.
    12031129          for (int ir=0; ir<MAX_RUN; ir++)
    12041130          {
     
    12541180      const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
    12551181
     1182      // This could be improved having the pointer which separates the queue with
     1183      // the incomplete events from the queue with the complete events
    12561184      for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    12571185      {
     
    12621190
    12631191              // Event has not yet timed out or was reported already
    1264               if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
     1192              if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]>=g_actTime - 30)
    12651193                  continue;
    12661194
     
    12961224              continue;
    12971225          }
    1298 
    1299           // The first events in the queue are either incomplete or
    1300           // can be deleted (processing finished). As soon as we reach the
    1301           // first complete events which processing is pending, we can stop.
    1302           // All other events (if everything works well) must have the same state.
    1303           // FIXME: This only works if we start from lastPtr and go down to frstPtr
    1304           //break;
    13051226      }
    13061227
     
    13521273
    13531274       //inform others we have to quit ....
    1354        gi_runStat  = -11;        //inform all that no update to happen any more
    13551275       gj.readStat = -11;        //inform all that no update to happen any more
    13561276   }
     
    13931313               evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
    13941314               evtCtrl[k0].evtStat = -1;
    1395           }
     1315           }
    13961316
    13971317           usleep(1);
     
    14211341   factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
    14221342
    1423    gi_runStat = -99;
    14241343   gj.readStat = -99;
    14251344
    14261345   factStat (gj);
    1427    factStatNew (gi);
     1346   //factStatNew (gi);
    14281347
    14291348   return 0;
     
    14441363        for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    14451364        {
    1446             if (evtCtrl[k0].evtStat != 1000 + threadID)
    1447             {
    1448                 if (evtCtrl[k0].evtStat < 1000 + threadID)
    1449                     numWait++;
    1450 
    1451                 continue;
    1452             }
    1453 
    1454             /*
    1455             // gi_resetR>1 : flush buffers asap
    1456             if (gi_resetR>1)
    1457             {
    1458                 evtCtrl[k].evtStat = 9000;              // flag as 'to be deleted'
    1459                 continue;
    1460 
    1461             }*/
    1462 
    1463             const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
    1464                                         evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
     1365           // This is a threading issue... the evtStat might have been invalid
     1366           // but the frstPtr is not yet updated
     1367           if (evtCtrl[k0].evtStat==-1)
     1368               continue;
     1369
     1370           // If we find the first event still waiting for processing
     1371           // there will be only unprocessed events after this one in the queue
     1372           if (evtCtrl[k0].evtStat<1000+threadID)
     1373           {
     1374               numWait = 1;
     1375               break;
     1376           }
     1377
     1378           // If the event was processed already, skip it
     1379           // We could replace that to a moving pointer pointing to the first
     1380           // non-processed event
     1381           if (evtCtrl[k0].evtStat!=1000+threadID)
     1382               continue;
     1383
     1384            const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0);
    14651385
    14661386            if (jret>0 && jret<=threadID)
     
    15041424   int status;
    15051425
    1506    int lastRun = 0;             //usually run from last event still valid
    1507 
    1508 //   cpu_set_t mask;
    1509 //   int cpu = 1;                 //process thread  (will be several in final version)
    1510 
    15111426   factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
    1512 
    1513 /* CPU_ZERO initializes all the bits in the mask to zero. */
    1514 //   CPU_ZERO (&mask);
    1515 /* CPU_SET sets only the bit corresponding to cpu. */
    1516 // CPU_SET(  0 , &mask );  leave for system
    1517 // CPU_SET(  1 , &mask );  used by write process
    1518 //   CPU_SET (2, &mask);
    1519 //   CPU_SET (3, &mask);
    1520 //   CPU_SET (4, &mask);
    1521 //   CPU_SET (5, &mask);
    1522 //   CPU_SET (6, &mask);
    1523 // CPU_SET(  7 , &mask );  used by read process
    1524 /* sched_setaffinity returns 0 in success */
    1525 //   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
    1526 //      snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
    1527 //      factOut (kWarn, -1, str);
    1528 //   }
    1529 
    15301427
    15311428   pthread_t thread[100];
     
    15401437   {
    15411438       int numWait = 0;
    1542        int numProc = 0;
    15431439
    15441440       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    15451441       {
    1546            if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000)
     1442           // This is a threading issue... the evtStat might have been invalid
     1443           // but the frstPtr is not yet updated
     1444           if (evtCtrl[k0].evtStat==-1)
     1445               continue;
     1446
     1447           // If we find the first incomplete event which is not supposed to
     1448           // be processed, there are only more incomplete events in the queue
     1449           if (evtCtrl[k0].evtStat<90)
    15471450           {
    1548                if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90)
    1549                    numWait++;
    1550 
     1451               numWait = 1;
     1452               break;
     1453           }
     1454
     1455           // If the event was processed already, skip it.
     1456           // We could replace that to a moving pointer pointing to the first
     1457           // non-processed event
     1458           if (evtCtrl[k0].evtStat>=1000)
    15511459               continue;
    1552            }
    1553 
    1554            /*
    1555            //we are asked to flush buffers asap
    1556            if (gi_resetR > 1)
    1557            {
    1558                evtCtrl[k0].evtStat = 9000;
    1559                continue;
    1560            }*/
    15611460
    15621461           //-------- it is better to open the run already here, so call can be used to initialize
     
    15651464           const int32_t  ievt = evtCtrl[k0].evNum;
    15661465
    1567            // Find entry in runCtrl which belongs to the event mBuffer[id]
    1568            // (only check if there is a need to check)
    1569            if (runCtrl[lastRun].runId != irun)
     1466           const int idx = evtCtrl[k0].runCtrl_idx;
     1467           if (runCtrl[idx].runId!=irun)
    15701468           {
    1571                //check which fileID to use (or open if needed)
    1572                int j;
    1573                for (j=0;j<MAX_RUN; j++)
    1574                    if (runCtrl[j].runId == irun)
    1575                        break;
    1576 
    1577                if (j>=MAX_RUN)
    1578                {
    1579                    factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
    1580                    // FIXME: What is the right action? (Flag event for deletion?)
    1581                    continue;
    1582                }
    1583 
    1584                lastRun = j;
     1469               //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
     1470               // FIXME: What is the right action? (Flag event for deletion?)
     1471               continue;
    15851472           }
    15861473
    15871474           // File not yet open
    1588            if (runCtrl[lastRun].fileId < 0)
     1475           if (runCtrl[idx].fileId < 0)
    15891476           {
    1590                //----            we need to open a new run ==> make sure all older runs are
    1591                //----            finished and marked to be closed ....
    1592                // This loop is unique to procEvt
    1593                for (int j=0; j<MAX_RUN; j++)
    1594                {
    1595                    if (runCtrl[j].fileId == 0)
    1596                    {
    1597                        runCtrl[j].procId = 2; //--> do no longer accept events for processing
    1598 
    1599                        //----                  problem: processing still going on ==> must wait for closing ....
    1600                        factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);
    1601                        runFinish1(runCtrl[j].runId);
    1602                    }
    1603                }
    1604 
    16051477               RUN_HEAD actRun;
    16061478               actRun.Version =  1;
     
    16161488               memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
    16171489
    1618                runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
    1619                if (runCtrl[lastRun].fileHd == NULL)
     1490               runCtrl[idx].fileHd = runOpen(irun, &actRun, sizeof (actRun));
     1491               if (runCtrl[idx].fileHd == NULL)
    16201492               {
    1621                    factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
    1622                    runCtrl[lastRun].fileId = 91;
    1623                    runCtrl[lastRun].procId = 91;  // Is not set in writeEvt
     1493                   factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt);
     1494                   runCtrl[idx].fileId = 91;
    16241495                   continue;
    16251496               }
    16261497
    1627                runCtrl[lastRun].fileId = 0;
    1628                runCtrl[lastRun].procId = 0;  // Is not set in writeEvt
    1629 
    1630                factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
    1631            }
    1632 
    1633            //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
    1634            if (runCtrl[lastRun].procId == 0)
    1635            {
    1636                if (runCtrl[lastRun].closeTime < g_actTime ||
    1637                    runCtrl[lastRun].lastTime < g_actTime - 300 ||
    1638                    runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt)
    1639                {
    1640                    factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
    1641                    runFinish1 (runCtrl[lastRun].runId);
    1642                    runCtrl[lastRun].procId = 1;
    1643                }
    1644            }
    1645 
    1646            // Skip event because of no active run
    1647            if (runCtrl[lastRun].procId != 0)
    1648            {
    1649                evtCtrl[k0].evtStat = 10000; // flag 'to be deleted'
    1650                continue;
     1498               runCtrl[idx].fileId = 0;
     1499
     1500               factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt);
    16511501           }
    16521502
     
    16741524           }
    16751525
    1676            const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead,
    1677                                      evtCtrl[k0].fEvent);
    1678            //gi.procTot++;
    1679            numProc++;
    1680 
     1526           const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent);
    16811527           if (rc < 0)
    16821528           {
    16831529               evtCtrl[k0].evtStat = 10000;        // flag event to be deleted
    1684                //gi.procErr++;
    16851530           }
    16861531           else
    16871532           {
    16881533               evtCtrl[k0].evtStat = 1000;       // flag 'start processing'
    1689                runCtrl[lastRun].procEvt++;
     1534               runCtrl[idx].procEvt++;
    16901535           }
    16911536       }
     
    16931538       if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
    16941539           factPrintf(kInfo, -1, "Exit Processing Process ...");
    1695            gp_runStat = -22;      //==> we should exit
    16961540           gj.procStat = -22;     //==> we should exit
    16971541           return 0;
    16981542       }
    16991543
    1700        //seems we have nothing to do, so sleep a little
    1701        if (numProc == 0)
    1702            usleep(1);
    1703 
    1704        gp_runStat = gi_runStat;
     1544       usleep(1);
     1545
    17051546       gj.procStat = gj.readStat;
    1706 
    17071547   }
    17081548
     
    17161556   }
    17171557
    1718    /*
    1719    for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    1720    {
    1721        if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
    1722            evtCtrl[k0].evtStat = 9000;    //flag event as 'processed'
    1723    }*/
    1724 
    1725    gp_runStat = -99;
    17261558   gj.procStat = -99;
    17271559
     
    17861618        ii |= 8; // = 4;
    17871619
    1788     if (runCtrl[j].procId == 0)
    1789     {
    1790         runFinish1(runCtrl[j].runId);
    1791         runCtrl[j].procId = 92;
    1792     }
    1793 
    17941620    runCtrl[j].closeTime = g_actTime - 1;
    17951621
     
    18161642/* *** main loop writing event (including opening and closing run-files */
    18171643
    1818 //   cpu_set_t mask;
    1819 //   int cpu = 1;                 //write thread
    1820 
    18211644   factPrintf(kInfo, -1, "Starting write-thread");
    1822 
    1823 /* CPU_ZERO initializes all the bits in the mask to zero. */
    1824 //   CPU_ZERO (&mask);
    1825 /* CPU_SET sets only the bit corresponding to cpu. */
    1826 //   CPU_SET (cpu, &mask);
    1827 /* sched_setaffinity returns 0 in success */
    1828 //   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
    1829 //      snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
    1830 //   }
    1831 
    1832    int lastRun = 0;             //usually run from last event still valid
    18331645
    18341646   while (g_runStat > -2)
    18351647   {
    1836        int numWrite = 0;
     1648       //int numWrite = 0;
    18371649       int numWait  = 0;
    18381650
     
    18411653       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    18421654       {
    1843            if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000)
     1655           // This is a threading issue... the evtStat might have been invalid
     1656           // but the frstPtr is not yet updated
     1657           if (evtCtrl[k0].evtStat==-1)
     1658               continue;
     1659
     1660           // If we find the first non-written event which is not supposed to
     1661           // be written, there are only more incomplete events in the queue
     1662           if (evtCtrl[k0].evtStat<5000)
    18441663           {
    1845                if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000)
    1846                    numWait++;
    1847 
     1664               numWait = 1;
     1665               break;
     1666           }
     1667
     1668           // If the event was written already already, skip it
     1669           // We could replace that to a moving pointer pointing to the first
     1670           // non-processed event
     1671           if (evtCtrl[k0].evtStat!=5000)
     1672               continue;
     1673
     1674           const uint32_t irun = evtCtrl[k0].runNum;
     1675           const int32_t  ievt = evtCtrl[k0].evNum;
     1676
     1677           const int idx = evtCtrl[k0].runCtrl_idx;
     1678
     1679           if (runCtrl[idx].runId!=irun)
     1680           {
     1681               //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
     1682               // FIXME: What is the right action? (Flag event for deletion?)
    18481683               continue;
    18491684           }
    18501685
    1851            //we must drain the buffer asap
    1852            /*
    1853            if (gi_resetR > 1)
     1686           // File is open
     1687           if (runCtrl[idx].fileId==0)
    18541688           {
    1855                evtCtrl[k0].evtStat = 9000;
    1856                continue;
    1857            }*/
    1858 
    1859            const uint32_t irun = evtCtrl[k0].runNum;
    1860            const int32_t  ievt = evtCtrl[k0].evNum;
    1861 
    1862            // Find entry in runCtrl which belongs to the event mBuffer[id]
    1863            // (only check if there is a need to check)
    1864            if (runCtrl[lastRun].runId != irun)
    1865            {
    1866                //check which fileID to use (or open if needed)
    1867                int j;
    1868                for (j=0;j<MAX_RUN; j++)
    1869                    if (runCtrl[j].runId == irun)
    1870                        break;
    1871 
    1872                if (j>=MAX_RUN)
    1873                {
    1874                    factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
    1875                    // FIXME: What is the right action?
    1876                    continue;
    1877                }
    1878 
    1879                lastRun = j;
    1880            }
    1881 
    1882            // File not yet open
    1883            if (runCtrl[lastRun].fileId < 0)
    1884            {
    1885                RUN_HEAD actRun;
    1886                actRun.Version =  1;
    1887                actRun.RunType = -1;  //to be adapted
    1888                actRun.Nroi    = evtCtrl[k0].nRoi;     //runCtrl[lastRun].roi0;
    1889                actRun.NroiTM  = evtCtrl[k0].nRoiTM;   //runCtrl[lastRun].roi8;
    1890                actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime;
    1891                actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec;
    1892                actRun.NBoard  = NBOARDS;
    1893                actRun.NPix    = NPIX;
    1894                actRun.NTm     = NTMARK;
    1895 
    1896                memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER));
    1897 
    1898                runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
    1899                if (runCtrl[lastRun].fileHd == NULL)
    1900                {
    1901                    factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
    1902                    runCtrl[lastRun].fileId = 91;
    1903                    continue;
    1904                }
    1905 
    1906                runCtrl[lastRun].fileId = 0;
    1907                factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
    1908            }
    1909 
    1910            /*
    1911            if (runCtrl[lastRun].fileId > 0)
    1912            {
    1913                // There is an event but file is already closed
    1914                //if (runCtrl[j].fileId < 100)
    1915                //{
    1916                //    factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
    1917                //    runCtrl[j].fileId += 100;
    1918                //}
    1919 
    1920                evtCtrl[k0].evtStat = 9000;
    1921            }*/
    1922 
    1923            // File is open
    1924            if (runCtrl[lastRun].fileId==0)
    1925            {
    1926                const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent,
    1927                                        0/*sizeof (evtCtrl[k0])*/);
     1689               const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0);
    19281690               if (rc >= 0)
    19291691               {
    19301692                   // Sucessfully wrote event
    1931                    runCtrl[lastRun].lastTime = g_actTime;
    1932                    runCtrl[lastRun].actEvt++;
     1693                   runCtrl[idx].lastTime = g_actTime;
     1694                   runCtrl[idx].actEvt++;
    19331695               }
    19341696               else
    19351697                   factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
    19361698
    1937                checkAndCloseRun(lastRun, irun, rc<0, 1);
     1699               checkAndCloseRun(idx, irun, rc<0, 1);
    19381700           }
    19391701
    19401702           evtCtrl[k0].evtStat = 10000;  // event written (or has to be discarded) -> delete
    19411703       }
    1942 /*
    1943        //check if we should close a run (mainly when no event pending)
    1944        //ETIENNE but first figure out which one is the latest run with a complete event.
    1945        //i.e. max run Id and lastEvt >= 0
    1946        //this condition is sufficient because all pending events were written already in the loop just above
    1947        //actrun
    1948        uint32_t lastStartedTime = 0;
    1949        uint32_t runIdFound = 1;
    1950 
    1951        //If we have an active run, look for its start time
    1952        if (actrun != 0)
    1953        {
    1954            runIdfound = 0;
    1955            for (int j=0;j<MAX_RUN;j++)
    1956            {
    1957                if (runCtrl[j].runId == actrun)
    1958                {
    1959                    lastStartedTime = runCtrl[j].lastTime;
    1960                    runIdFound = 1;
    1961                }
    1962            }
    1963        }
    1964 
    1965       if (runIdFound == 0)
    1966       {
    1967           factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
    1968       }
    1969 
    1970        //Also check if some files will never be opened
    1971        //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
    1972        //I will never get run numbers for which no file is to be opened
    1973        for (int j=0;j<MAX_RUN;j++)
    1974        {
    1975            if ((runCtrl[j].fileId < 0) &&
    1976                (runCtrl[j].lastTime < lastStartedTime) &&
    1977                (runCtrl[j].runId != 0))
    1978            {
    1979                factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
    1980                ;//TODO notify that this run will never be opened
    1981            }
    1982        }
    1983        */
    19841704
    19851705       // Although the are no pending events, we have to check if a run should be closed (timeout)
     
    19941714       }
    19951715
    1996       //seems we have nothing to do, so sleep a little
    1997       if (numWrite == 0)
    1998           usleep(1);
     1716       usleep(1);
    19991717
    20001718       //nothing left to do
     
    20021720       {
    20031721           factPrintf(kInfo, -1, "Finish Write Process ...");
    2004            gw_runStat = -22;      //==> we should exit
    20051722           gj.writStat = -22;     //==> we should exit
    20061723           break;
    20071724       }
    20081725
    2009        gw_runStat = gi_runStat;
    20101726       gj.writStat = gj.readStat;
    20111727   }
     
    20181734   }
    20191735
    2020    gw_runStat = -99;
    20211736   gj.writStat = -99;
    20221737
     
    20371752   struct timespec xwait;
    20381753
    2039    gi_runStat = gp_runStat = gw_runStat = 0;
    20401754   gj.readStat = gj.procStat = gj.writStat = 0;
    20411755
     
    20471761      runCtrl[i].fileId = -2;
    20481762   }
    2049 
    2050 //prepare for subProcesses
    2051    gi_maxSize = g_maxSize;
    2052    if (gi_maxSize <= 0)
    2053       gi_maxSize = 1;
    20541763
    20551764   gi_maxProc = g_maxProc;
     
    22601969
    22611970   g_maxMem = 1024 * 1024;      //MBytes
    2262 //g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
    22631971   g_maxMem = g_maxMem * 200;   //100MBytes
    22641972
    22651973   g_maxProc = 20;
    2266    g_maxSize = 30000;
    22671974
    22681975   g_runStat = 40;
Note: See TracChangeset for help on using the changeset viewer.