Changeset 15495 for trunk/FACT++
- Timestamp:
- 05/04/13 11:25:30 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified trunk/FACT++/src/EventBuilder.c ¶
r15494 r15495 73 73 74 74 int g_maxProc; 75 int g_maxSize;76 int gi_maxSize;77 75 int gi_maxProc; 78 76 79 77 uint g_actTime; 80 uint g_actUsec;81 78 int g_runStat; 82 79 int g_reset; 83 int g_useFTM; 84 85 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; 80 86 81 size_t g_maxMem; //maximum memory allowed for buffer 87 82 88 //no longer needed ...89 int g_maxBoards; //maximum number of boards to be initialized90 int g_actBoards;91 //92 93 83 FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 94 84 95 96 int gi_runStat;97 int gp_runStat;98 int gw_runStat;99 100 uint32_t actrun = 0;101 102 85 uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards 103 86 104 EVT_STAT gi;87 //EVT_STAT gi; 105 88 GUI_STAT gj; 106 89 … … 166 149 // Add the last free slot to the stack 167 150 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); 151 152 // FIXME: Really free memory if memory usuage exceeds g_maxMem 168 153 169 154 entry->prev = tgb_last; … … 220 205 /*-----------------------------------------------------------------*/ 221 206 222 223 224 int225 runFinish1 (uint32_t runnr)226 {227 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);228 return 0;229 }230 int231 runFinish (uint32_t runnr)232 {233 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);234 return 0;235 }236 207 237 208 int … … 438 409 } 439 410 440 // FIXME: This should be the time of the first receiped board 411 // If we have already queued at least one event, 412 // check the roi if the previous event 413 // already belongs to the same run. 414 415 // Get the runCtrl entry of the previous event. 416 // If none is in the queue (we are anyhow super fast) 417 // just get the correct entry from the runCtrl array. 418 int idx = evtCtrl[beg].runCtrl_idx; 419 420 // If there is an event in the queue and it has the same runID, we can use 421 // that event to check for the roi consistency throughout the run 422 if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID) 423 { 424 // Check if run already registered (old entries should have runId==-1) 425 if (runCtrl[idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8]) 426 { 427 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 428 runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID); 429 return -9301; 430 } 431 } 432 433 // If there is none in the queue, we have to search for the correct entry 434 if (evtCtrl_frstPtr==evtCtrl_lastPtr) 435 { 436 idx = -1; 437 438 for (int k=0; k<MAX_RUN; k++) 439 { 440 if (runCtrl[k].runId==runID) 441 { 442 idx = k; 443 break; 444 } 445 } 446 } 447 441 448 struct timeval tv; 442 gettimeofday 449 gettimeofday(&tv, NULL); 443 450 444 451 const uint32_t tsec = tv.tv_sec; 445 452 const uint32_t tusec = tv.tv_usec; 446 453 447 //check if runId already registered in runCtrl 448 449 uint oldest = g_actTime + 1000; 450 int jold = -1; 451 452 int found = 0; 453 454 // fileId==-2: not yet used or run assigned but not open 455 // fileId== 0: file open 456 // fileId>0: run closed 457 458 for (int k=0; k<MAX_RUN; k++) 454 // Run not yet registered, register run 455 // If we haven't found a corresponding entry in the queue, or the runId has changed 456 // we find the oldest empty entry in the runCtrl array and create a new runCtrl entry 457 if (idx<0 || runCtrl[idx].runId!=runID) 459 458 { 460 // Check if run already registered (old entries should have runId==-1) 461 if (runCtrl[k].runId == runID) 459 // If there is none in the queue or 460 idx = -1; 461 462 uint oldest = g_actTime + 1000; 463 for (int k=0; k<MAX_RUN; k++) 462 464 { 463 // FIXME: Compare to previous event 464 if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8]) 465 // This is just for sanity. We use the oldest free entry (until 466 // we have understood the concept and can use "just" a free entry 467 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest) 465 468 { 466 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 467 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID); 468 return -9301; 469 oldest = runCtrl[k].closeTime; 470 idx = k; 469 471 } 470 471 found = 1;472 break;473 472 } 474 473 475 // This is just for sanity. We use the oldest free entry (until 476 // we have understood the concept and can use "just" a free entry 477 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest) 478 { 479 oldest = runCtrl[k].closeTime; 480 jold = k; 481 } 482 } 483 484 if (!found) // Run not yet registered, register run 485 { 486 if (jold < 0) 474 if (idx<0) 487 475 { 488 476 factPrintf(kFatal, 883, "Not able to register the new run %d", runID); … … 490 478 } 491 479 492 const int evFree = jold; 493 494 factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d", 495 runID, evID, evFree, nRoi[0], nRoi[8]); 496 497 runCtrl[evFree].runId = runID; 498 runCtrl[evFree].roi0 = nRoi[0]; // FIXME: Make obsolete! 499 runCtrl[evFree].roi8 = nRoi[8]; // FIXME: Make obsolete! 500 runCtrl[evFree].fileId = -2; 501 runCtrl[evFree].procId = -2; 502 runCtrl[evFree].lastEvt = 1; // Number of events partially started to read 503 runCtrl[evFree].actEvt = 0; // Number of written events (write) 504 runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent) 505 runCtrl[evFree].maxEvt = 999999999; // max number events allowed 506 runCtrl[evFree].lastTime = tsec; // Time when the last event was written 507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed 480 factPrintf(kInfo, 503, "New run %d (evt=%d, idx=%d) registered with roi=%d and roi_tm=%d", 481 runID, evID, idx, nRoi[0], nRoi[8]); 482 483 runCtrl[idx].runId = runID; 484 runCtrl[idx].roi0 = nRoi[0]; // FIXME: Make obsolete! 485 runCtrl[idx].roi8 = nRoi[8]; // FIXME: Make obsolete! 486 runCtrl[idx].fileId = -2; 487 runCtrl[idx].lastEvt = 1; // Number of events partially started to read 488 runCtrl[idx].actEvt = 0; // Number of written events (write) 489 runCtrl[idx].procEvt = 0; // Number of successfully checked events (checkEvent) 490 runCtrl[idx].maxEvt = 999999999; // max number events allowed 491 runCtrl[idx].lastTime = tsec; // Time when the last event was written 492 runCtrl[idx].closeTime = tsec + 3600 * 24; //max time allowed 508 493 } 509 494 … … 516 501 evtCtrl[k].board[b] = -1; 517 502 503 evtCtrl[k].runCtrl_idx = idx; 518 504 evtCtrl[k].pcTime[0] = tsec; 519 505 evtCtrl[k].pcTime[1] = tusec; … … 582 568 mBufFree (int i) 583 569 { 584 //delete entry [i] from mBuffer:585 //(and make sure multiple calls do no harm ....)586 587 570 TGB_free(evtCtrl[i].FADhead); 588 571 … … 597 580 gj.bufTot--; 598 581 599 /*if (gi_memStat < 0) {600 if (gj.usdMem <= 0.75 * gj.maxMem)601 gi_memStat = +1;602 }*/603 604 582 return 0; 605 583 606 584 } /*-----------------------------------------------------------------*/ 607 608 /*609 void610 resetEvtStat ()611 {612 for (int i = 0; i < MAX_SOCK; i++)613 gi.numRead[i] = 0;614 615 for (int i = 0; i < NBOARDS; i++) {616 gi.gotByte[i] = 0;617 gi.gotErr[i] = 0;618 619 }620 621 gi.evtGet = 0; //#new Start of Events read622 gi.evtTot = 0; //#complete Events read623 gi.evtErr = 0; //#Events with Errors624 gi.evtSkp = 0; //#Events incomplete (timeout)625 626 gi.procTot = 0; //#Events processed627 gi.procErr = 0; //#Events showed problem in processing628 gi.procTrg = 0; //#Events accepted by SW trigger629 gi.procSkp = 0; //#Events rejected by SW trigger630 631 gi.feedTot = 0; //#Events used for feedBack system632 gi.feedErr = 0; //#Events rejected by feedBack633 634 gi.wrtTot = 0; //#Events written to disk635 gi.wrtErr = 0; //#Events with write-error636 637 gi.runOpen = 0; //#Runs opened638 gi.runClose = 0; //#Runs closed639 gi.runErr = 0; //#Runs with open/close errors640 641 return;642 }*/ /*-----------------------------------------------------------------*/643 585 644 586 uint64_t reportIncomplete(int id, const char *txt) … … 762 704 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer 763 705 706 uint32_t actrun = 0; 707 764 708 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug 765 709 766 //start.S = 0xFB01;767 //stop.S = 0x04FE;768 710 769 711 /* initialize run control logics */ … … 771 713 runCtrl[i].runId = 0; 772 714 runCtrl[i].fileId = -2; 773 runCtrl[i].procId = -2; 774 } 715 } 716 717 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; 775 718 gi_resetS = gi_resetR = 9; 776 719 … … 784 727 //time in seconds 785 728 uint gi_SecTime = time(NULL);; 729 g_actTime = gi_SecTime; 786 730 787 731 const int cntsock = 8 - NUMSOCK ; … … 833 777 while (g_runStat >= 0 && g_reset == 0) 834 778 { 835 gi_runStat = g_runStat;836 779 gj.readStat = g_runStat; 837 838 struct timeval tv;839 gettimeofday (&tv, NULL);840 g_actTime = tv.tv_sec;841 g_actUsec = tv.tv_usec;842 843 780 844 781 for (int b = 0; b < NBOARDS; b++) … … 849 786 850 787 gi_NumConnect[b] = 0; //must close all connections 851 //gi.numConn[b] = 0;852 788 gj.numConn[b] = 0; 853 789 … … 920 856 continue; 921 857 922 //numok++;923 924 858 if (rd[i].bufLen>0) 925 859 { … … 933 867 // There was just nothing waiting 934 868 if (errno==EWOULDBLOCK || errno==EAGAIN) 935 {936 //numok--;937 869 continue; 938 }939 870 940 871 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); 941 //gi.gotErr[b]++;942 872 continue; 943 873 } … … 951 881 GenSock(s0, i, 0, NULL, &rd[i]); 952 882 953 //gi.gotErr[b]++;954 955 883 gi_NumConnect[i]-= cntsock ; 956 //gi.numConn[b]--;957 884 gj.numConn[i]--; 958 885 959 886 continue; 960 887 } 961 // Success (jrd > 0)962 888 963 889 gj.rateBytes[i] += jrd; … … 1073 999 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04) 1074 1000 { 1075 //gi.evtErr++;1076 1001 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d", 1077 1002 i, rd[i].evtID, rd[i].fadLen, … … 1171 1096 if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun) 1172 1097 { 1173 1098 // Signal the fadctrl that a new run has been started 1174 1099 gotNewRun(evtCtrl[idx].runNum, NULL); 1175 1100 … … 1200 1125 // This is a non-ideal hack to lower the probability that 1201 1126 // in mBufEvt the search for correct entry in runCtrl 1202 // will not return a super-old entry 1127 // will not return a super-old entry. I don't want 1128 // to manipulate that in another thread. 1203 1129 for (int ir=0; ir<MAX_RUN; ir++) 1204 1130 { … … 1254 1180 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; 1255 1181 1182 // This could be improved having the pointer which separates the queue with 1183 // the incomplete events from the queue with the complete events 1256 1184 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1257 1185 { … … 1262 1190 1263 1191 // Event has not yet timed out or was reported already 1264 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0] /*evtCtrl[k0].lastRecv*/>=g_actTime - 30)1192 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]>=g_actTime - 30) 1265 1193 continue; 1266 1194 … … 1296 1224 continue; 1297 1225 } 1298 1299 // The first events in the queue are either incomplete or1300 // can be deleted (processing finished). As soon as we reach the1301 // first complete events which processing is pending, we can stop.1302 // All other events (if everything works well) must have the same state.1303 // FIXME: This only works if we start from lastPtr and go down to frstPtr1304 //break;1305 1226 } 1306 1227 … … 1352 1273 1353 1274 //inform others we have to quit .... 1354 gi_runStat = -11; //inform all that no update to happen any more1355 1275 gj.readStat = -11; //inform all that no update to happen any more 1356 1276 } … … 1393 1313 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; 1394 1314 evtCtrl[k0].evtStat = -1; 1395 }1315 } 1396 1316 1397 1317 usleep(1); … … 1421 1341 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse); 1422 1342 1423 gi_runStat = -99;1424 1343 gj.readStat = -99; 1425 1344 1426 1345 factStat (gj); 1427 factStatNew (gi);1346 //factStatNew (gi); 1428 1347 1429 1348 return 0; … … 1444 1363 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1445 1364 { 1446 if (evtCtrl[k0].evtStat != 1000 + threadID) 1447 { 1448 if (evtCtrl[k0].evtStat < 1000 + threadID) 1449 numWait++; 1450 1451 continue; 1452 } 1453 1454 /* 1455 // gi_resetR>1 : flush buffers asap 1456 if (gi_resetR>1) 1457 { 1458 evtCtrl[k].evtStat = 9000; // flag as 'to be deleted' 1459 continue; 1460 1461 }*/ 1462 1463 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, 1464 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/); 1365 // This is a threading issue... the evtStat might have been invalid 1366 // but the frstPtr is not yet updated 1367 if (evtCtrl[k0].evtStat==-1) 1368 continue; 1369 1370 // If we find the first event still waiting for processing 1371 // there will be only unprocessed events after this one in the queue 1372 if (evtCtrl[k0].evtStat<1000+threadID) 1373 { 1374 numWait = 1; 1375 break; 1376 } 1377 1378 // If the event was processed already, skip it 1379 // We could replace that to a moving pointer pointing to the first 1380 // non-processed event 1381 if (evtCtrl[k0].evtStat!=1000+threadID) 1382 continue; 1383 1384 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0); 1465 1385 1466 1386 if (jret>0 && jret<=threadID) … … 1504 1424 int status; 1505 1425 1506 int lastRun = 0; //usually run from last event still valid1507 1508 // cpu_set_t mask;1509 // int cpu = 1; //process thread (will be several in final version)1510 1511 1426 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); 1512 1513 /* CPU_ZERO initializes all the bits in the mask to zero. */1514 // CPU_ZERO (&mask);1515 /* CPU_SET sets only the bit corresponding to cpu. */1516 // CPU_SET( 0 , &mask ); leave for system1517 // CPU_SET( 1 , &mask ); used by write process1518 // CPU_SET (2, &mask);1519 // CPU_SET (3, &mask);1520 // CPU_SET (4, &mask);1521 // CPU_SET (5, &mask);1522 // CPU_SET (6, &mask);1523 // CPU_SET( 7 , &mask ); used by read process1524 /* sched_setaffinity returns 0 in success */1525 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {1526 // snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);1527 // factOut (kWarn, -1, str);1528 // }1529 1530 1427 1531 1428 pthread_t thread[100]; … … 1540 1437 { 1541 1438 int numWait = 0; 1542 int numProc = 0;1543 1439 1544 1440 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1545 1441 { 1546 if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000) 1442 // This is a threading issue... the evtStat might have been invalid 1443 // but the frstPtr is not yet updated 1444 if (evtCtrl[k0].evtStat==-1) 1445 continue; 1446 1447 // If we find the first incomplete event which is not supposed to 1448 // be processed, there are only more incomplete events in the queue 1449 if (evtCtrl[k0].evtStat<90) 1547 1450 { 1548 if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90) 1549 numWait++; 1550 1451 numWait = 1; 1452 break; 1453 } 1454 1455 // If the event was processed already, skip it. 1456 // We could replace that to a moving pointer pointing to the first 1457 // non-processed event 1458 if (evtCtrl[k0].evtStat>=1000) 1551 1459 continue; 1552 }1553 1554 /*1555 //we are asked to flush buffers asap1556 if (gi_resetR > 1)1557 {1558 evtCtrl[k0].evtStat = 9000;1559 continue;1560 }*/1561 1460 1562 1461 //-------- it is better to open the run already here, so call can be used to initialize … … 1565 1464 const int32_t ievt = evtCtrl[k0].evNum; 1566 1465 1567 // Find entry in runCtrl which belongs to the event mBuffer[id] 1568 // (only check if there is a need to check) 1569 if (runCtrl[lastRun].runId != irun) 1466 const int idx = evtCtrl[k0].runCtrl_idx; 1467 if (runCtrl[idx].runId!=irun) 1570 1468 { 1571 //check which fileID to use (or open if needed) 1572 int j; 1573 for (j=0;j<MAX_RUN; j++) 1574 if (runCtrl[j].runId == irun) 1575 break; 1576 1577 if (j>=MAX_RUN) 1578 { 1579 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0); 1580 // FIXME: What is the right action? (Flag event for deletion?) 1581 continue; 1582 } 1583 1584 lastRun = j; 1469 //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); 1470 // FIXME: What is the right action? (Flag event for deletion?) 1471 continue; 1585 1472 } 1586 1473 1587 1474 // File not yet open 1588 if (runCtrl[ lastRun].fileId < 0)1475 if (runCtrl[idx].fileId < 0) 1589 1476 { 1590 //---- we need to open a new run ==> make sure all older runs are1591 //---- finished and marked to be closed ....1592 // This loop is unique to procEvt1593 for (int j=0; j<MAX_RUN; j++)1594 {1595 if (runCtrl[j].fileId == 0)1596 {1597 runCtrl[j].procId = 2; //--> do no longer accept events for processing1598 1599 //---- problem: processing still going on ==> must wait for closing ....1600 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);1601 runFinish1(runCtrl[j].runId);1602 }1603 }1604 1605 1477 RUN_HEAD actRun; 1606 1478 actRun.Version = 1; … … 1616 1488 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); 1617 1489 1618 runCtrl[ lastRun].fileHd = runOpen(irun, &actRun, sizeof (actRun));1619 if (runCtrl[ lastRun].fileHd == NULL)1490 runCtrl[idx].fileHd = runOpen(irun, &actRun, sizeof (actRun)); 1491 if (runCtrl[idx].fileHd == NULL) 1620 1492 { 1621 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); 1622 runCtrl[lastRun].fileId = 91; 1623 runCtrl[lastRun].procId = 91; // Is not set in writeEvt 1493 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt); 1494 runCtrl[idx].fileId = 91; 1624 1495 continue; 1625 1496 } 1626 1497 1627 runCtrl[lastRun].fileId = 0; 1628 runCtrl[lastRun].procId = 0; // Is not set in writeEvt 1629 1630 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); 1631 } 1632 1633 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 1634 if (runCtrl[lastRun].procId == 0) 1635 { 1636 if (runCtrl[lastRun].closeTime < g_actTime || 1637 runCtrl[lastRun].lastTime < g_actTime - 300 || 1638 runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt) 1639 { 1640 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); 1641 runFinish1 (runCtrl[lastRun].runId); 1642 runCtrl[lastRun].procId = 1; 1643 } 1644 } 1645 1646 // Skip event because of no active run 1647 if (runCtrl[lastRun].procId != 0) 1648 { 1649 evtCtrl[k0].evtStat = 10000; // flag 'to be deleted' 1650 continue; 1498 runCtrl[idx].fileId = 0; 1499 1500 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt); 1651 1501 } 1652 1502 … … 1674 1524 } 1675 1525 1676 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, 1677 evtCtrl[k0].fEvent); 1678 //gi.procTot++; 1679 numProc++; 1680 1526 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent); 1681 1527 if (rc < 0) 1682 1528 { 1683 1529 evtCtrl[k0].evtStat = 10000; // flag event to be deleted 1684 //gi.procErr++;1685 1530 } 1686 1531 else 1687 1532 { 1688 1533 evtCtrl[k0].evtStat = 1000; // flag 'start processing' 1689 runCtrl[ lastRun].procEvt++;1534 runCtrl[idx].procEvt++; 1690 1535 } 1691 1536 } … … 1693 1538 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1694 1539 factPrintf(kInfo, -1, "Exit Processing Process ..."); 1695 gp_runStat = -22; //==> we should exit1696 1540 gj.procStat = -22; //==> we should exit 1697 1541 return 0; 1698 1542 } 1699 1543 1700 //seems we have nothing to do, so sleep a little 1701 if (numProc == 0) 1702 usleep(1); 1703 1704 gp_runStat = gi_runStat; 1544 usleep(1); 1545 1705 1546 gj.procStat = gj.readStat; 1706 1707 1547 } 1708 1548 … … 1716 1556 } 1717 1557 1718 /*1719 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)1720 {1721 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)1722 evtCtrl[k0].evtStat = 9000; //flag event as 'processed'1723 }*/1724 1725 gp_runStat = -99;1726 1558 gj.procStat = -99; 1727 1559 … … 1786 1618 ii |= 8; // = 4; 1787 1619 1788 if (runCtrl[j].procId == 0)1789 {1790 runFinish1(runCtrl[j].runId);1791 runCtrl[j].procId = 92;1792 }1793 1794 1620 runCtrl[j].closeTime = g_actTime - 1; 1795 1621 … … 1816 1642 /* *** main loop writing event (including opening and closing run-files */ 1817 1643 1818 // cpu_set_t mask;1819 // int cpu = 1; //write thread1820 1821 1644 factPrintf(kInfo, -1, "Starting write-thread"); 1822 1823 /* CPU_ZERO initializes all the bits in the mask to zero. */1824 // CPU_ZERO (&mask);1825 /* CPU_SET sets only the bit corresponding to cpu. */1826 // CPU_SET (cpu, &mask);1827 /* sched_setaffinity returns 0 in success */1828 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {1829 // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);1830 // }1831 1832 int lastRun = 0; //usually run from last event still valid1833 1645 1834 1646 while (g_runStat > -2) 1835 1647 { 1836 int numWrite = 0;1648 //int numWrite = 0; 1837 1649 int numWait = 0; 1838 1650 … … 1841 1653 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1842 1654 { 1843 if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000) 1655 // This is a threading issue... the evtStat might have been invalid 1656 // but the frstPtr is not yet updated 1657 if (evtCtrl[k0].evtStat==-1) 1658 continue; 1659 1660 // If we find the first non-written event which is not supposed to 1661 // be written, there are only more incomplete events in the queue 1662 if (evtCtrl[k0].evtStat<5000) 1844 1663 { 1845 if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000) 1846 numWait++; 1847 1664 numWait = 1; 1665 break; 1666 } 1667 1668 // If the event was written already already, skip it 1669 // We could replace that to a moving pointer pointing to the first 1670 // non-processed event 1671 if (evtCtrl[k0].evtStat!=5000) 1672 continue; 1673 1674 const uint32_t irun = evtCtrl[k0].runNum; 1675 const int32_t ievt = evtCtrl[k0].evNum; 1676 1677 const int idx = evtCtrl[k0].runCtrl_idx; 1678 1679 if (runCtrl[idx].runId!=irun) 1680 { 1681 //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); 1682 // FIXME: What is the right action? (Flag event for deletion?) 1848 1683 continue; 1849 1684 } 1850 1685 1851 //we must drain the buffer asap 1852 /* 1853 if (gi_resetR > 1) 1686 // File is open 1687 if (runCtrl[idx].fileId==0) 1854 1688 { 1855 evtCtrl[k0].evtStat = 9000; 1856 continue; 1857 }*/ 1858 1859 const uint32_t irun = evtCtrl[k0].runNum; 1860 const int32_t ievt = evtCtrl[k0].evNum; 1861 1862 // Find entry in runCtrl which belongs to the event mBuffer[id] 1863 // (only check if there is a need to check) 1864 if (runCtrl[lastRun].runId != irun) 1865 { 1866 //check which fileID to use (or open if needed) 1867 int j; 1868 for (j=0;j<MAX_RUN; j++) 1869 if (runCtrl[j].runId == irun) 1870 break; 1871 1872 if (j>=MAX_RUN) 1873 { 1874 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0); 1875 // FIXME: What is the right action? 1876 continue; 1877 } 1878 1879 lastRun = j; 1880 } 1881 1882 // File not yet open 1883 if (runCtrl[lastRun].fileId < 0) 1884 { 1885 RUN_HEAD actRun; 1886 actRun.Version = 1; 1887 actRun.RunType = -1; //to be adapted 1888 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0; 1889 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8; 1890 actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime; 1891 actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec; 1892 actRun.NBoard = NBOARDS; 1893 actRun.NPix = NPIX; 1894 actRun.NTm = NTMARK; 1895 1896 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER)); 1897 1898 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); 1899 if (runCtrl[lastRun].fileHd == NULL) 1900 { 1901 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); 1902 runCtrl[lastRun].fileId = 91; 1903 continue; 1904 } 1905 1906 runCtrl[lastRun].fileId = 0; 1907 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); 1908 } 1909 1910 /* 1911 if (runCtrl[lastRun].fileId > 0) 1912 { 1913 // There is an event but file is already closed 1914 //if (runCtrl[j].fileId < 100) 1915 //{ 1916 // factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 1917 // runCtrl[j].fileId += 100; 1918 //} 1919 1920 evtCtrl[k0].evtStat = 9000; 1921 }*/ 1922 1923 // File is open 1924 if (runCtrl[lastRun].fileId==0) 1925 { 1926 const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent, 1927 0/*sizeof (evtCtrl[k0])*/); 1689 const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0); 1928 1690 if (rc >= 0) 1929 1691 { 1930 1692 // Sucessfully wrote event 1931 runCtrl[ lastRun].lastTime = g_actTime;1932 runCtrl[ lastRun].actEvt++;1693 runCtrl[idx].lastTime = g_actTime; 1694 runCtrl[idx].actEvt++; 1933 1695 } 1934 1696 else 1935 1697 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); 1936 1698 1937 checkAndCloseRun( lastRun, irun, rc<0, 1);1699 checkAndCloseRun(idx, irun, rc<0, 1); 1938 1700 } 1939 1701 1940 1702 evtCtrl[k0].evtStat = 10000; // event written (or has to be discarded) -> delete 1941 1703 } 1942 /*1943 //check if we should close a run (mainly when no event pending)1944 //ETIENNE but first figure out which one is the latest run with a complete event.1945 //i.e. max run Id and lastEvt >= 01946 //this condition is sufficient because all pending events were written already in the loop just above1947 //actrun1948 uint32_t lastStartedTime = 0;1949 uint32_t runIdFound = 1;1950 1951 //If we have an active run, look for its start time1952 if (actrun != 0)1953 {1954 runIdfound = 0;1955 for (int j=0;j<MAX_RUN;j++)1956 {1957 if (runCtrl[j].runId == actrun)1958 {1959 lastStartedTime = runCtrl[j].lastTime;1960 runIdFound = 1;1961 }1962 }1963 }1964 1965 if (runIdFound == 0)1966 {1967 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);1968 }1969 1970 //Also check if some files will never be opened1971 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,1972 //I will never get run numbers for which no file is to be opened1973 for (int j=0;j<MAX_RUN;j++)1974 {1975 if ((runCtrl[j].fileId < 0) &&1976 (runCtrl[j].lastTime < lastStartedTime) &&1977 (runCtrl[j].runId != 0))1978 {1979 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);1980 ;//TODO notify that this run will never be opened1981 }1982 }1983 */1984 1704 1985 1705 // Although the are no pending events, we have to check if a run should be closed (timeout) … … 1994 1714 } 1995 1715 1996 //seems we have nothing to do, so sleep a little 1997 if (numWrite == 0) 1998 usleep(1); 1716 usleep(1); 1999 1717 2000 1718 //nothing left to do … … 2002 1720 { 2003 1721 factPrintf(kInfo, -1, "Finish Write Process ..."); 2004 gw_runStat = -22; //==> we should exit2005 1722 gj.writStat = -22; //==> we should exit 2006 1723 break; 2007 1724 } 2008 1725 2009 gw_runStat = gi_runStat;2010 1726 gj.writStat = gj.readStat; 2011 1727 } … … 2018 1734 } 2019 1735 2020 gw_runStat = -99;2021 1736 gj.writStat = -99; 2022 1737 … … 2037 1752 struct timespec xwait; 2038 1753 2039 gi_runStat = gp_runStat = gw_runStat = 0;2040 1754 gj.readStat = gj.procStat = gj.writStat = 0; 2041 1755 … … 2047 1761 runCtrl[i].fileId = -2; 2048 1762 } 2049 2050 //prepare for subProcesses2051 gi_maxSize = g_maxSize;2052 if (gi_maxSize <= 0)2053 gi_maxSize = 1;2054 1763 2055 1764 gi_maxProc = g_maxProc; … … 2260 1969 2261 1970 g_maxMem = 1024 * 1024; //MBytes 2262 //g_maxMem = g_maxMem * 1024 *10 ; //10GBytes2263 1971 g_maxMem = g_maxMem * 200; //100MBytes 2264 1972 2265 1973 g_maxProc = 20; 2266 g_maxSize = 30000;2267 1974 2268 1975 g_runStat = 40;
Note:
See TracChangeset
for help on using the changeset viewer.