Changeset 15494 for trunk/FACT++/src


Ignore:
Timestamp:
05/04/13 11:22:54 (12 years ago)
Author:
tbretz
Message:
Reverting to last revision.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.c

    r15490 r15494  
    7373
    7474int g_maxProc;
     75int g_maxSize;
     76int gi_maxSize;
    7577int gi_maxProc;
    7678
    7779uint g_actTime;
     80uint g_actUsec;
    7881int g_runStat;
    7982int g_reset;
    80 
     83int g_useFTM;
     84
     85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
    8186size_t g_maxMem;                //maximum memory allowed for buffer
    8287
     88//no longer needed ...
     89int g_maxBoards;                //maximum number of boards to be initialized
     90int g_actBoards;
     91//
     92
    8393FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
    8494
     95
     96int gi_runStat;
     97int gp_runStat;
     98int gw_runStat;
     99
     100uint32_t actrun = 0;
     101
    85102uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards
    86103
    87 //EVT_STAT gi;
     104EVT_STAT gi;
    88105GUI_STAT gj;
    89106
     
    149166    // Add the last free slot to the stack
    150167    TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
    151 
    152     // FIXME: Really free memory if memory usuage exceeds g_maxMem
    153168
    154169    entry->prev = tgb_last;
     
    205220/*-----------------------------------------------------------------*/
    206221
     222
     223
     224int
     225runFinish1 (uint32_t runnr)
     226{
     227   factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
     228   return 0;
     229}
     230int
     231runFinish (uint32_t runnr)
     232{
     233    factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
     234    return 0;
     235}
    207236
    208237int
     
    409438    }
    410439
    411     // If we have already queued at least one event,
    412     // check the roi if the previous event
    413     // already belongs to the same run.
    414 
    415     // Get the runCtrl entry of the previous event.
    416     // If none is in the queue (we are anyhow super fast)
    417     // just get the correct entry from the runCtrl array.
    418     int idx = evtCtrl[beg].runCtrl_idx;
    419 
    420     // If there is an event in the queue and it has the same runID, we can use
    421     // that event to check for the roi consistency throughout the run
    422     if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID)
     440    // FIXME: This should be the time of the first receiped board
     441    struct timeval tv;
     442    gettimeofday (&tv, NULL);
     443
     444    const uint32_t tsec = tv.tv_sec;
     445    const uint32_t tusec = tv.tv_usec;
     446
     447    //check if runId already registered in runCtrl
     448
     449    uint oldest = g_actTime + 1000;
     450    int jold = -1;
     451
     452    int found = 0;
     453
     454    // fileId==-2: not yet used or run assigned but not open
     455    // fileId== 0: file open
     456    // fileId>0:   run closed
     457
     458    for (int k=0; k<MAX_RUN; k++)
    423459    {
    424460        // Check if run already registered (old entries should have runId==-1)
    425         if (runCtrl[idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8])
     461        if (runCtrl[k].runId == runID)
    426462        {
    427             factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
    428                        runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID);
    429             return -9301;
     463            // FIXME: Compare to previous event
     464            if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8])
     465            {
     466                factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
     467                           runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID);
     468                return -9301;
     469            }
     470
     471            found = 1;
     472            break;
     473        }
     474
     475        // This is just for sanity. We use the oldest free entry (until
     476        // we have understood the concept and can use "just" a free entry
     477        if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
     478        {
     479            oldest = runCtrl[k].closeTime;
     480            jold = k;
    430481        }
    431482    }
    432483
    433     // If there is none in the queue, we have to search for the correct entry
    434     if (evtCtrl_frstPtr==evtCtrl_lastPtr)
     484    if (!found) // Run not yet registered, register run
    435485    {
    436         idx = -1;
    437 
    438         for (int k=0; k<MAX_RUN; k++)
    439         {
    440             if (runCtrl[k].runId==runID)
    441             {
    442                 idx = k;
    443                 break;
    444             }
    445         }
    446     }
    447 
    448     struct timeval tv;
    449     gettimeofday(&tv, NULL);
    450 
    451     const uint32_t tsec = tv.tv_sec;
    452     const uint32_t tusec = tv.tv_usec;
    453 
    454     // Run not yet registered, register run
    455     // If we haven't found a corresponding entry in the queue, or the runId has changed
    456     // we find the oldest empty entry in the runCtrl array and create a new runCtrl entry
    457     if (idx<0 || runCtrl[idx].runId!=runID)
    458     {
    459         // If there is none in the queue or
    460         idx = -1;
    461 
    462         uint oldest = g_actTime + 1000;
    463         for (int k=0; k<MAX_RUN; k++)
    464         {
    465             // This is just for sanity. We use the oldest free entry (until
    466             // we have understood the concept and can use "just" a free entry
    467             if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
    468             {
    469                 oldest = runCtrl[k].closeTime;
    470                 idx = k;
    471             }
    472         }
    473 
    474         if (idx<0)
     486        if (jold < 0)
    475487        {
    476488            factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
     
    478490        }
    479491
    480         factPrintf(kInfo, 503, "New run %d (evt=%d, idx=%d) registered with roi=%d and roi_tm=%d",
    481                    runID, evID, idx, nRoi[0], nRoi[8]);
    482 
    483         runCtrl[idx].runId      = runID;
    484         runCtrl[idx].roi0       = nRoi[0];  // FIXME: Make obsolete!
    485         runCtrl[idx].roi8       = nRoi[8];  // FIXME: Make obsolete!
    486         runCtrl[idx].fileId     = -2;
    487         runCtrl[idx].lastEvt    = 1;         // Number of events partially started to read
    488         runCtrl[idx].actEvt     = 0;         // Number of written events (write)
    489         runCtrl[idx].procEvt    = 0;         // Number of successfully checked events (checkEvent)
    490         runCtrl[idx].maxEvt     = 999999999; // max number events allowed
    491         runCtrl[idx].lastTime   = tsec;      // Time when the last event was written
    492         runCtrl[idx].closeTime  = tsec + 3600 * 24;     //max time allowed
     492        const int evFree = jold;
     493
     494        factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d",
     495                   runID, evID, evFree, nRoi[0], nRoi[8]);
     496
     497        runCtrl[evFree].runId      = runID;
     498        runCtrl[evFree].roi0       = nRoi[0];  // FIXME: Make obsolete!
     499        runCtrl[evFree].roi8       = nRoi[8];  // FIXME: Make obsolete!
     500        runCtrl[evFree].fileId     = -2;
     501        runCtrl[evFree].procId     = -2;
     502        runCtrl[evFree].lastEvt    = 1;         // Number of events partially started to read
     503        runCtrl[evFree].actEvt     = 0;         // Number of written events (write)
     504        runCtrl[evFree].procEvt    = 0;         // Number of successfully checked events (checkEvent)
     505        runCtrl[evFree].maxEvt     = 999999999; // max number events allowed
     506        runCtrl[evFree].lastTime   = tsec;      // Time when the last event was written
     507        runCtrl[evFree].closeTime  = tsec + 3600 * 24;     //max time allowed
    493508    }
    494509
     
    501516        evtCtrl[k].board[b] = -1;
    502517
    503     evtCtrl[k].runCtrl_idx = idx;
    504518    evtCtrl[k].pcTime[0] = tsec;
    505519    evtCtrl[k].pcTime[1] = tusec;
     
    568582mBufFree (int i)
    569583{
     584//delete entry [i] from mBuffer:
     585//(and make sure multiple calls do no harm ....)
     586
    570587   TGB_free(evtCtrl[i].FADhead);
    571588
     
    580597   gj.bufTot--;
    581598
     599   /*if (gi_memStat < 0) {
     600      if (gj.usdMem <= 0.75 * gj.maxMem)
     601         gi_memStat = +1;
     602   }*/
     603
    582604   return 0;
    583605
    584606} /*-----------------------------------------------------------------*/
     607
     608/*
     609void
     610resetEvtStat ()
     611{
     612   for (int i = 0; i < MAX_SOCK; i++)
     613      gi.numRead[i] = 0;
     614
     615   for (int i = 0; i < NBOARDS; i++) {
     616      gi.gotByte[i] = 0;
     617      gi.gotErr[i] = 0;
     618
     619   }
     620
     621   gi.evtGet = 0;               //#new Start of Events read
     622   gi.evtTot = 0;               //#complete Events read
     623   gi.evtErr = 0;               //#Events with Errors
     624   gi.evtSkp = 0;               //#Events incomplete (timeout)
     625
     626   gi.procTot = 0;              //#Events processed
     627   gi.procErr = 0;              //#Events showed problem in processing
     628   gi.procTrg = 0;              //#Events accepted by SW trigger
     629   gi.procSkp = 0;              //#Events rejected by SW trigger
     630
     631   gi.feedTot = 0;              //#Events used for feedBack system
     632   gi.feedErr = 0;              //#Events rejected by feedBack
     633
     634   gi.wrtTot = 0;               //#Events written to disk
     635   gi.wrtErr = 0;               //#Events with write-error
     636
     637   gi.runOpen = 0;              //#Runs opened
     638   gi.runClose = 0;             //#Runs closed
     639   gi.runErr = 0;               //#Runs with open/close errors
     640
     641   return;
     642}*/ /*-----------------------------------------------------------------*/
    585643
    586644uint64_t reportIncomplete(int id, const char *txt)
     
    704762    READ_STRUCT rd[NBOARDS];       //buffer to read IP and afterwards store in mBuffer
    705763
    706     uint32_t actrun = 0;
    707 
    708764   const int minLen = sizeof(PEVNT_HEADER);  //min #bytes needed to check header: full header for debug
    709765
     766   //start.S = 0xFB01;
     767   //stop.S = 0x04FE;
    710768
    711769/* initialize run control logics */
     
    713771      runCtrl[i].runId  =  0;
    714772      runCtrl[i].fileId = -2;
    715    }
    716 
    717    int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
     773      runCtrl[i].procId = -2;
     774   }
    718775   gi_resetS = gi_resetR = 9;
    719776
     
    727784   //time in seconds
    728785   uint gi_SecTime = time(NULL);;
    729    g_actTime = gi_SecTime;
    730786
    731787   const int cntsock = 8 - NUMSOCK ;
     
    777833   while (g_runStat >= 0 && g_reset == 0)
    778834   {
     835      gi_runStat = g_runStat;
    779836      gj.readStat = g_runStat;
     837
     838      struct timeval tv;
     839      gettimeofday (&tv, NULL);
     840      g_actTime = tv.tv_sec;
     841      g_actUsec = tv.tv_usec;
     842
    780843
    781844      for (int b = 0; b < NBOARDS; b++)
     
    786849
    787850          gi_NumConnect[b] = 0;       //must close all connections
     851          //gi.numConn[b] = 0;
    788852          gj.numConn[b] = 0;
    789853
     
    856920              continue;
    857921
     922          //numok++;
     923
    858924          if (rd[i].bufLen>0)
    859925          {
     
    867933                  // There was just nothing waiting
    868934                  if (errno==EWOULDBLOCK || errno==EAGAIN)
     935                  {
     936                      //numok--;
    869937                      continue;
     938                  }
    870939
    871940                  factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
     941                  //gi.gotErr[b]++;
    872942                  continue;
    873943              }
     
    881951                  GenSock(s0, i, 0, NULL, &rd[i]);
    882952
     953                  //gi.gotErr[b]++;
     954
    883955                  gi_NumConnect[i]-= cntsock ;
     956                  //gi.numConn[b]--;
    884957                  gj.numConn[i]--;
    885958
    886959                  continue;
    887960              }
     961              // Success (jrd > 0)
    888962
    889963              gj.rateBytes[i] += jrd;
     
    9991073              rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
    10001074          {
     1075              //gi.evtErr++;
    10011076              factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
    10021077                         i, rd[i].evtID, rd[i].fadLen,
     
    10961171          if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun)
    10971172          {
    1098              // Signal the fadctrl that a new run has been started
     1173              // Signal the fadctrl that a new run has been started
    10991174              gotNewRun(evtCtrl[idx].runNum, NULL);
    11001175
     
    11251200          // This is a non-ideal hack to lower the probability that
    11261201          // in mBufEvt the search for correct entry in runCtrl
    1127           // will not return a super-old entry. I don't want
    1128           // to manipulate that in another thread.
     1202          // will not return a super-old entry
    11291203          for (int ir=0; ir<MAX_RUN; ir++)
    11301204          {
     
    11801254      const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
    11811255
    1182       // This could be improved having the pointer which separates the queue with
    1183       // the incomplete events from the queue with the complete events
    11841256      for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    11851257      {
     
    11901262
    11911263              // Event has not yet timed out or was reported already
    1192               if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]>=g_actTime - 30)
     1264              if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
    11931265                  continue;
    11941266
     
    12241296              continue;
    12251297          }
     1298
     1299          // The first events in the queue are either incomplete or
     1300          // can be deleted (processing finished). As soon as we reach the
     1301          // first complete events which processing is pending, we can stop.
     1302          // All other events (if everything works well) must have the same state.
     1303          // FIXME: This only works if we start from lastPtr and go down to frstPtr
     1304          //break;
    12261305      }
    12271306
     
    12731352
    12741353       //inform others we have to quit ....
     1354       gi_runStat  = -11;        //inform all that no update to happen any more
    12751355       gj.readStat = -11;        //inform all that no update to happen any more
    12761356   }
     
    13131393               evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
    13141394               evtCtrl[k0].evtStat = -1;
    1315            }
     1395          }
    13161396
    13171397           usleep(1);
     
    13411421   factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
    13421422
     1423   gi_runStat = -99;
    13431424   gj.readStat = -99;
    13441425
    13451426   factStat (gj);
    1346    //factStatNew (gi);
     1427   factStatNew (gi);
    13471428
    13481429   return 0;
     
    13631444        for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    13641445        {
    1365            // This is a threading issue... the evtStat might have been invalid
    1366            // but the frstPtr is not yet updated
    1367            if (evtCtrl[k0].evtStat==-1)
    1368                continue;
    1369 
    1370            // If we find the first event still waiting for processing
    1371            // there will be only unprocessed events after this one in the queue
    1372            if (evtCtrl[k0].evtStat<1000+threadID)
    1373            {
    1374                numWait = 1;
    1375                break;
    1376            }
    1377 
    1378            // If the event was processed already, skip it
    1379            // We could replace that to a moving pointer pointing to the first
    1380            // non-processed event
    1381            if (evtCtrl[k0].evtStat!=1000+threadID)
    1382                continue;
    1383 
    1384             const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0);
     1446            if (evtCtrl[k0].evtStat != 1000 + threadID)
     1447            {
     1448                if (evtCtrl[k0].evtStat < 1000 + threadID)
     1449                    numWait++;
     1450
     1451                continue;
     1452            }
     1453
     1454            /*
     1455            // gi_resetR>1 : flush buffers asap
     1456            if (gi_resetR>1)
     1457            {
     1458                evtCtrl[k].evtStat = 9000;              // flag as 'to be deleted'
     1459                continue;
     1460
     1461            }*/
     1462
     1463            const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
     1464                                        evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
    13851465
    13861466            if (jret>0 && jret<=threadID)
     
    14241504   int status;
    14251505
     1506   int lastRun = 0;             //usually run from last event still valid
     1507
     1508//   cpu_set_t mask;
     1509//   int cpu = 1;                 //process thread  (will be several in final version)
     1510
    14261511   factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
     1512
     1513/* CPU_ZERO initializes all the bits in the mask to zero. */
     1514//   CPU_ZERO (&mask);
     1515/* CPU_SET sets only the bit corresponding to cpu. */
     1516// CPU_SET(  0 , &mask );  leave for system
     1517// CPU_SET(  1 , &mask );  used by write process
     1518//   CPU_SET (2, &mask);
     1519//   CPU_SET (3, &mask);
     1520//   CPU_SET (4, &mask);
     1521//   CPU_SET (5, &mask);
     1522//   CPU_SET (6, &mask);
     1523// CPU_SET(  7 , &mask );  used by read process
     1524/* sched_setaffinity returns 0 in success */
     1525//   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
     1526//      snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
     1527//      factOut (kWarn, -1, str);
     1528//   }
     1529
    14271530
    14281531   pthread_t thread[100];
     
    14371540   {
    14381541       int numWait = 0;
     1542       int numProc = 0;
    14391543
    14401544       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    14411545       {
    1442            // This is a threading issue... the evtStat might have been invalid
    1443            // but the frstPtr is not yet updated
    1444            if (evtCtrl[k0].evtStat==-1)
     1546           if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000)
     1547           {
     1548               if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90)
     1549                   numWait++;
     1550
    14451551               continue;
    1446 
    1447            // If we find the first incomplete event which is not supposed to
    1448            // be processed, there are only more incomplete events in the queue
    1449            if (evtCtrl[k0].evtStat<90)
     1552           }
     1553
     1554           /*
     1555           //we are asked to flush buffers asap
     1556           if (gi_resetR > 1)
    14501557           {
    1451                numWait = 1;
    1452                break;
    1453            }
    1454 
    1455            // If the event was processed already, skip it.
    1456            // We could replace that to a moving pointer pointing to the first
    1457            // non-processed event
    1458            if (evtCtrl[k0].evtStat>=1000)
     1558               evtCtrl[k0].evtStat = 9000;
    14591559               continue;
     1560           }*/
    14601561
    14611562           //-------- it is better to open the run already here, so call can be used to initialize
     
    14641565           const int32_t  ievt = evtCtrl[k0].evNum;
    14651566
    1466            const int idx = evtCtrl[k0].runCtrl_idx;
    1467            if (runCtrl[idx].runId!=irun)
     1567           // Find entry in runCtrl which belongs to the event mBuffer[id]
     1568           // (only check if there is a need to check)
     1569           if (runCtrl[lastRun].runId != irun)
    14681570           {
    1469                //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
    1470                // FIXME: What is the right action? (Flag event for deletion?)
    1471                continue;
     1571               //check which fileID to use (or open if needed)
     1572               int j;
     1573               for (j=0;j<MAX_RUN; j++)
     1574                   if (runCtrl[j].runId == irun)
     1575                       break;
     1576
     1577               if (j>=MAX_RUN)
     1578               {
     1579                   factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
     1580                   // FIXME: What is the right action? (Flag event for deletion?)
     1581                   continue;
     1582               }
     1583
     1584               lastRun = j;
    14721585           }
    14731586
    14741587           // File not yet open
    1475            if (runCtrl[idx].fileId < 0)
     1588           if (runCtrl[lastRun].fileId < 0)
    14761589           {
     1590               //----            we need to open a new run ==> make sure all older runs are
     1591               //----            finished and marked to be closed ....
     1592               // This loop is unique to procEvt
     1593               for (int j=0; j<MAX_RUN; j++)
     1594               {
     1595                   if (runCtrl[j].fileId == 0)
     1596                   {
     1597                       runCtrl[j].procId = 2; //--> do no longer accept events for processing
     1598
     1599                       //----                  problem: processing still going on ==> must wait for closing ....
     1600                       factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);
     1601                       runFinish1(runCtrl[j].runId);
     1602                   }
     1603               }
     1604
    14771605               RUN_HEAD actRun;
    14781606               actRun.Version =  1;
     
    14881616               memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
    14891617
    1490                runCtrl[idx].fileHd = runOpen(irun, &actRun, sizeof (actRun));
    1491                if (runCtrl[idx].fileHd == NULL)
     1618               runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
     1619               if (runCtrl[lastRun].fileHd == NULL)
    14921620               {
    1493                    factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt);
    1494                    runCtrl[idx].fileId = 91;
     1621                   factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
     1622                   runCtrl[lastRun].fileId = 91;
     1623                   runCtrl[lastRun].procId = 91;  // Is not set in writeEvt
    14951624                   continue;
    14961625               }
    14971626
    1498                runCtrl[idx].fileId = 0;
    1499 
    1500                factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt);
     1627               runCtrl[lastRun].fileId = 0;
     1628               runCtrl[lastRun].procId = 0;  // Is not set in writeEvt
     1629
     1630               factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
     1631           }
     1632
     1633           //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
     1634           if (runCtrl[lastRun].procId == 0)
     1635           {
     1636               if (runCtrl[lastRun].closeTime < g_actTime ||
     1637                   runCtrl[lastRun].lastTime < g_actTime - 300 ||
     1638                   runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt)
     1639               {
     1640                   factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
     1641                   runFinish1 (runCtrl[lastRun].runId);
     1642                   runCtrl[lastRun].procId = 1;
     1643               }
     1644           }
     1645
     1646           // Skip event because of no active run
     1647           if (runCtrl[lastRun].procId != 0)
     1648           {
     1649               evtCtrl[k0].evtStat = 10000; // flag 'to be deleted'
     1650               continue;
    15011651           }
    15021652
     
    15241674           }
    15251675
    1526            const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent);
     1676           const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead,
     1677                                     evtCtrl[k0].fEvent);
     1678           //gi.procTot++;
     1679           numProc++;
     1680
    15271681           if (rc < 0)
    15281682           {
    15291683               evtCtrl[k0].evtStat = 10000;        // flag event to be deleted
     1684               //gi.procErr++;
    15301685           }
    15311686           else
    15321687           {
    15331688               evtCtrl[k0].evtStat = 1000;       // flag 'start processing'
    1534                runCtrl[idx].procEvt++;
     1689               runCtrl[lastRun].procEvt++;
    15351690           }
    15361691       }
     
    15381693       if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
    15391694           factPrintf(kInfo, -1, "Exit Processing Process ...");
     1695           gp_runStat = -22;      //==> we should exit
    15401696           gj.procStat = -22;     //==> we should exit
    15411697           return 0;
    15421698       }
    15431699
    1544        usleep(1);
    1545 
     1700       //seems we have nothing to do, so sleep a little
     1701       if (numProc == 0)
     1702           usleep(1);
     1703
     1704       gp_runStat = gi_runStat;
    15461705       gj.procStat = gj.readStat;
     1706
    15471707   }
    15481708
     
    15561716   }
    15571717
     1718   /*
     1719   for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
     1720   {
     1721       if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
     1722           evtCtrl[k0].evtStat = 9000;    //flag event as 'processed'
     1723   }*/
     1724
     1725   gp_runStat = -99;
    15581726   gj.procStat = -99;
    15591727
     
    16181786        ii |= 8; // = 4;
    16191787
     1788    if (runCtrl[j].procId == 0)
     1789    {
     1790        runFinish1(runCtrl[j].runId);
     1791        runCtrl[j].procId = 92;
     1792    }
     1793
    16201794    runCtrl[j].closeTime = g_actTime - 1;
    16211795
     
    16421816/* *** main loop writing event (including opening and closing run-files */
    16431817
     1818//   cpu_set_t mask;
     1819//   int cpu = 1;                 //write thread
     1820
    16441821   factPrintf(kInfo, -1, "Starting write-thread");
     1822
     1823/* CPU_ZERO initializes all the bits in the mask to zero. */
     1824//   CPU_ZERO (&mask);
     1825/* CPU_SET sets only the bit corresponding to cpu. */
     1826//   CPU_SET (cpu, &mask);
     1827/* sched_setaffinity returns 0 in success */
     1828//   if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
     1829//      snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
     1830//   }
     1831
     1832   int lastRun = 0;             //usually run from last event still valid
    16451833
    16461834   while (g_runStat > -2)
    16471835   {
    1648        //int numWrite = 0;
     1836       int numWrite = 0;
    16491837       int numWait  = 0;
    16501838
     
    16531841       for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
    16541842       {
    1655            // This is a threading issue... the evtStat might have been invalid
    1656            // but the frstPtr is not yet updated
    1657            if (evtCtrl[k0].evtStat==-1)
     1843           if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000)
     1844           {
     1845               if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000)
     1846                   numWait++;
     1847
    16581848               continue;
    1659 
    1660            // If we find the first non-written event which is not supposed to
    1661            // be written, there are only more incomplete events in the queue
    1662            if (evtCtrl[k0].evtStat<5000)
     1849           }
     1850
     1851           //we must drain the buffer asap
     1852           /*
     1853           if (gi_resetR > 1)
    16631854           {
    1664                numWait = 1;
    1665                break;
    1666            }
    1667 
    1668            // If the event was written already already, skip it
    1669            // We could replace that to a moving pointer pointing to the first
    1670            // non-processed event
    1671            if (evtCtrl[k0].evtStat!=5000)
     1855               evtCtrl[k0].evtStat = 9000;
    16721856               continue;
     1857           }*/
    16731858
    16741859           const uint32_t irun = evtCtrl[k0].runNum;
    16751860           const int32_t  ievt = evtCtrl[k0].evNum;
    16761861
    1677            const int idx = evtCtrl[k0].runCtrl_idx;
    1678 
    1679            if (runCtrl[idx].runId!=irun)
     1862           // Find entry in runCtrl which belongs to the event mBuffer[id]
     1863           // (only check if there is a need to check)
     1864           if (runCtrl[lastRun].runId != irun)
    16801865           {
    1681                //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
    1682                // FIXME: What is the right action? (Flag event for deletion?)
    1683                continue;
     1866               //check which fileID to use (or open if needed)
     1867               int j;
     1868               for (j=0;j<MAX_RUN; j++)
     1869                   if (runCtrl[j].runId == irun)
     1870                       break;
     1871
     1872               if (j>=MAX_RUN)
     1873               {
     1874                   factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
     1875                   // FIXME: What is the right action?
     1876                   continue;
     1877               }
     1878
     1879               lastRun = j;
    16841880           }
    16851881
     1882           // File not yet open
     1883           if (runCtrl[lastRun].fileId < 0)
     1884           {
     1885               RUN_HEAD actRun;
     1886               actRun.Version =  1;
     1887               actRun.RunType = -1;  //to be adapted
     1888               actRun.Nroi    = evtCtrl[k0].nRoi;     //runCtrl[lastRun].roi0;
     1889               actRun.NroiTM  = evtCtrl[k0].nRoiTM;   //runCtrl[lastRun].roi8;
     1890               actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime;
     1891               actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec;
     1892               actRun.NBoard  = NBOARDS;
     1893               actRun.NPix    = NPIX;
     1894               actRun.NTm     = NTMARK;
     1895
     1896               memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER));
     1897
     1898               runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
     1899               if (runCtrl[lastRun].fileHd == NULL)
     1900               {
     1901                   factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
     1902                   runCtrl[lastRun].fileId = 91;
     1903                   continue;
     1904               }
     1905
     1906               runCtrl[lastRun].fileId = 0;
     1907               factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
     1908           }
     1909
     1910           /*
     1911           if (runCtrl[lastRun].fileId > 0)
     1912           {
     1913               // There is an event but file is already closed
     1914               //if (runCtrl[j].fileId < 100)
     1915               //{
     1916               //    factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
     1917               //    runCtrl[j].fileId += 100;
     1918               //}
     1919
     1920               evtCtrl[k0].evtStat = 9000;
     1921           }*/
     1922
    16861923           // File is open
    1687            if (runCtrl[idx].fileId==0)
     1924           if (runCtrl[lastRun].fileId==0)
    16881925           {
    1689                const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0);
     1926               const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent,
     1927                                       0/*sizeof (evtCtrl[k0])*/);
    16901928               if (rc >= 0)
    16911929               {
    16921930                   // Sucessfully wrote event
    1693                    runCtrl[idx].lastTime = g_actTime;
    1694                    runCtrl[idx].actEvt++;
     1931                   runCtrl[lastRun].lastTime = g_actTime;
     1932                   runCtrl[lastRun].actEvt++;
    16951933               }
    16961934               else
    16971935                   factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
    16981936
    1699                checkAndCloseRun(idx, irun, rc<0, 1);
     1937               checkAndCloseRun(lastRun, irun, rc<0, 1);
    17001938           }
    17011939
    17021940           evtCtrl[k0].evtStat = 10000;  // event written (or has to be discarded) -> delete
    17031941       }
     1942/*
     1943       //check if we should close a run (mainly when no event pending)
     1944       //ETIENNE but first figure out which one is the latest run with a complete event.
     1945       //i.e. max run Id and lastEvt >= 0
     1946       //this condition is sufficient because all pending events were written already in the loop just above
     1947       //actrun
     1948       uint32_t lastStartedTime = 0;
     1949       uint32_t runIdFound = 1;
     1950
     1951       //If we have an active run, look for its start time
     1952       if (actrun != 0)
     1953       {
     1954           runIdfound = 0;
     1955           for (int j=0;j<MAX_RUN;j++)
     1956           {
     1957               if (runCtrl[j].runId == actrun)
     1958               {
     1959                   lastStartedTime = runCtrl[j].lastTime;
     1960                   runIdFound = 1;
     1961               }
     1962           }
     1963       }
     1964
     1965      if (runIdFound == 0)
     1966      {
     1967          factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
     1968      }
     1969
     1970       //Also check if some files will never be opened
     1971       //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
     1972       //I will never get run numbers for which no file is to be opened
     1973       for (int j=0;j<MAX_RUN;j++)
     1974       {
     1975           if ((runCtrl[j].fileId < 0) &&
     1976               (runCtrl[j].lastTime < lastStartedTime) &&
     1977               (runCtrl[j].runId != 0))
     1978           {
     1979               factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
     1980               ;//TODO notify that this run will never be opened
     1981           }
     1982       }
     1983       */
    17041984
    17051985       // Although the are no pending events, we have to check if a run should be closed (timeout)
     
    17141994       }
    17151995
    1716        usleep(1);
     1996      //seems we have nothing to do, so sleep a little
     1997      if (numWrite == 0)
     1998          usleep(1);
    17171999
    17182000       //nothing left to do
     
    17202002       {
    17212003           factPrintf(kInfo, -1, "Finish Write Process ...");
     2004           gw_runStat = -22;      //==> we should exit
    17222005           gj.writStat = -22;     //==> we should exit
    17232006           break;
    17242007       }
    17252008
     2009       gw_runStat = gi_runStat;
    17262010       gj.writStat = gj.readStat;
    17272011   }
     
    17342018   }
    17352019
     2020   gw_runStat = -99;
    17362021   gj.writStat = -99;
    17372022
     
    17522037   struct timespec xwait;
    17532038
     2039   gi_runStat = gp_runStat = gw_runStat = 0;
    17542040   gj.readStat = gj.procStat = gj.writStat = 0;
    17552041
     
    17612047      runCtrl[i].fileId = -2;
    17622048   }
     2049
     2050//prepare for subProcesses
     2051   gi_maxSize = g_maxSize;
     2052   if (gi_maxSize <= 0)
     2053      gi_maxSize = 1;
    17632054
    17642055   gi_maxProc = g_maxProc;
     
    19692260
    19702261   g_maxMem = 1024 * 1024;      //MBytes
     2262//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
    19712263   g_maxMem = g_maxMem * 200;   //100MBytes
    19722264
    19732265   g_maxProc = 20;
     2266   g_maxSize = 30000;
    19742267
    19752268   g_runStat = 40;
Note: See TracChangeset for help on using the changeset viewer.