Changeset 15494 for trunk/FACT++/src
- Timestamp:
- 05/04/13 11:22:54 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r15490 r15494 73 73 74 74 int g_maxProc; 75 int g_maxSize; 76 int gi_maxSize; 75 77 int gi_maxProc; 76 78 77 79 uint g_actTime; 80 uint g_actUsec; 78 81 int g_runStat; 79 82 int g_reset; 80 83 int g_useFTM; 84 85 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; 81 86 size_t g_maxMem; //maximum memory allowed for buffer 82 87 88 //no longer needed ... 89 int g_maxBoards; //maximum number of boards to be initialized 90 int g_actBoards; 91 // 92 83 93 FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 84 94 95 96 int gi_runStat; 97 int gp_runStat; 98 int gw_runStat; 99 100 uint32_t actrun = 0; 101 85 102 uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards 86 103 87 //EVT_STAT gi;104 EVT_STAT gi; 88 105 GUI_STAT gj; 89 106 … … 149 166 // Add the last free slot to the stack 150 167 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); 151 152 // FIXME: Really free memory if memory usuage exceeds g_maxMem153 168 154 169 entry->prev = tgb_last; … … 205 220 /*-----------------------------------------------------------------*/ 206 221 222 223 224 int 225 runFinish1 (uint32_t runnr) 226 { 227 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr); 228 return 0; 229 } 230 int 231 runFinish (uint32_t runnr) 232 { 233 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr); 234 return 0; 235 } 207 236 208 237 int … … 409 438 } 410 439 411 // If we have already queued at least one event, 412 // check the roi if the previous event 413 // already belongs to the same run. 414 415 // Get the runCtrl entry of the previous event. 416 // If none is in the queue (we are anyhow super fast) 417 // just get the correct entry from the runCtrl array. 418 int idx = evtCtrl[beg].runCtrl_idx; 419 420 // If there is an event in the queue and it has the same runID, we can use 421 // that event to check for the roi consistency throughout the run 422 if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID) 440 // FIXME: This should be the time of the first receiped board 441 struct timeval tv; 442 gettimeofday (&tv, NULL); 443 444 const uint32_t tsec = tv.tv_sec; 445 const uint32_t tusec = tv.tv_usec; 446 447 //check if runId already registered in runCtrl 448 449 uint oldest = g_actTime + 1000; 450 int jold = -1; 451 452 int found = 0; 453 454 // fileId==-2: not yet used or run assigned but not open 455 // fileId== 0: file open 456 // fileId>0: run closed 457 458 for (int k=0; k<MAX_RUN; k++) 423 459 { 424 460 // Check if run already registered (old entries should have runId==-1) 425 if (runCtrl[ idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8])461 if (runCtrl[k].runId == runID) 426 462 { 427 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 428 runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID); 429 return -9301; 463 // FIXME: Compare to previous event 464 if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8]) 465 { 466 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 467 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID); 468 return -9301; 469 } 470 471 found = 1; 472 break; 473 } 474 475 // This is just for sanity. We use the oldest free entry (until 476 // we have understood the concept and can use "just" a free entry 477 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest) 478 { 479 oldest = runCtrl[k].closeTime; 480 jold = k; 430 481 } 431 482 } 432 483 433 // If there is none in the queue, we have to search for the correct entry 434 if (evtCtrl_frstPtr==evtCtrl_lastPtr) 484 if (!found) // Run not yet registered, register run 435 485 { 436 idx = -1; 437 438 for (int k=0; k<MAX_RUN; k++) 439 { 440 if (runCtrl[k].runId==runID) 441 { 442 idx = k; 443 break; 444 } 445 } 446 } 447 448 struct timeval tv; 449 gettimeofday(&tv, NULL); 450 451 const uint32_t tsec = tv.tv_sec; 452 const uint32_t tusec = tv.tv_usec; 453 454 // Run not yet registered, register run 455 // If we haven't found a corresponding entry in the queue, or the runId has changed 456 // we find the oldest empty entry in the runCtrl array and create a new runCtrl entry 457 if (idx<0 || runCtrl[idx].runId!=runID) 458 { 459 // If there is none in the queue or 460 idx = -1; 461 462 uint oldest = g_actTime + 1000; 463 for (int k=0; k<MAX_RUN; k++) 464 { 465 // This is just for sanity. We use the oldest free entry (until 466 // we have understood the concept and can use "just" a free entry 467 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest) 468 { 469 oldest = runCtrl[k].closeTime; 470 idx = k; 471 } 472 } 473 474 if (idx<0) 486 if (jold < 0) 475 487 { 476 488 factPrintf(kFatal, 883, "Not able to register the new run %d", runID); … … 478 490 } 479 491 480 factPrintf(kInfo, 503, "New run %d (evt=%d, idx=%d) registered with roi=%d and roi_tm=%d", 481 runID, evID, idx, nRoi[0], nRoi[8]); 482 483 runCtrl[idx].runId = runID; 484 runCtrl[idx].roi0 = nRoi[0]; // FIXME: Make obsolete! 485 runCtrl[idx].roi8 = nRoi[8]; // FIXME: Make obsolete! 486 runCtrl[idx].fileId = -2; 487 runCtrl[idx].lastEvt = 1; // Number of events partially started to read 488 runCtrl[idx].actEvt = 0; // Number of written events (write) 489 runCtrl[idx].procEvt = 0; // Number of successfully checked events (checkEvent) 490 runCtrl[idx].maxEvt = 999999999; // max number events allowed 491 runCtrl[idx].lastTime = tsec; // Time when the last event was written 492 runCtrl[idx].closeTime = tsec + 3600 * 24; //max time allowed 492 const int evFree = jold; 493 494 factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d", 495 runID, evID, evFree, nRoi[0], nRoi[8]); 496 497 runCtrl[evFree].runId = runID; 498 runCtrl[evFree].roi0 = nRoi[0]; // FIXME: Make obsolete! 499 runCtrl[evFree].roi8 = nRoi[8]; // FIXME: Make obsolete! 500 runCtrl[evFree].fileId = -2; 501 runCtrl[evFree].procId = -2; 502 runCtrl[evFree].lastEvt = 1; // Number of events partially started to read 503 runCtrl[evFree].actEvt = 0; // Number of written events (write) 504 runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent) 505 runCtrl[evFree].maxEvt = 999999999; // max number events allowed 506 runCtrl[evFree].lastTime = tsec; // Time when the last event was written 507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed 493 508 } 494 509 … … 501 516 evtCtrl[k].board[b] = -1; 502 517 503 evtCtrl[k].runCtrl_idx = idx;504 518 evtCtrl[k].pcTime[0] = tsec; 505 519 evtCtrl[k].pcTime[1] = tusec; … … 568 582 mBufFree (int i) 569 583 { 584 //delete entry [i] from mBuffer: 585 //(and make sure multiple calls do no harm ....) 586 570 587 TGB_free(evtCtrl[i].FADhead); 571 588 … … 580 597 gj.bufTot--; 581 598 599 /*if (gi_memStat < 0) { 600 if (gj.usdMem <= 0.75 * gj.maxMem) 601 gi_memStat = +1; 602 }*/ 603 582 604 return 0; 583 605 584 606 } /*-----------------------------------------------------------------*/ 607 608 /* 609 void 610 resetEvtStat () 611 { 612 for (int i = 0; i < MAX_SOCK; i++) 613 gi.numRead[i] = 0; 614 615 for (int i = 0; i < NBOARDS; i++) { 616 gi.gotByte[i] = 0; 617 gi.gotErr[i] = 0; 618 619 } 620 621 gi.evtGet = 0; //#new Start of Events read 622 gi.evtTot = 0; //#complete Events read 623 gi.evtErr = 0; //#Events with Errors 624 gi.evtSkp = 0; //#Events incomplete (timeout) 625 626 gi.procTot = 0; //#Events processed 627 gi.procErr = 0; //#Events showed problem in processing 628 gi.procTrg = 0; //#Events accepted by SW trigger 629 gi.procSkp = 0; //#Events rejected by SW trigger 630 631 gi.feedTot = 0; //#Events used for feedBack system 632 gi.feedErr = 0; //#Events rejected by feedBack 633 634 gi.wrtTot = 0; //#Events written to disk 635 gi.wrtErr = 0; //#Events with write-error 636 637 gi.runOpen = 0; //#Runs opened 638 gi.runClose = 0; //#Runs closed 639 gi.runErr = 0; //#Runs with open/close errors 640 641 return; 642 }*/ /*-----------------------------------------------------------------*/ 585 643 586 644 uint64_t reportIncomplete(int id, const char *txt) … … 704 762 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer 705 763 706 uint32_t actrun = 0;707 708 764 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug 709 765 766 //start.S = 0xFB01; 767 //stop.S = 0x04FE; 710 768 711 769 /* initialize run control logics */ … … 713 771 runCtrl[i].runId = 0; 714 772 runCtrl[i].fileId = -2; 715 } 716 717 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; 773 runCtrl[i].procId = -2; 774 } 718 775 gi_resetS = gi_resetR = 9; 719 776 … … 727 784 //time in seconds 728 785 uint gi_SecTime = time(NULL);; 729 g_actTime = gi_SecTime;730 786 731 787 const int cntsock = 8 - NUMSOCK ; … … 777 833 while (g_runStat >= 0 && g_reset == 0) 778 834 { 835 gi_runStat = g_runStat; 779 836 gj.readStat = g_runStat; 837 838 struct timeval tv; 839 gettimeofday (&tv, NULL); 840 g_actTime = tv.tv_sec; 841 g_actUsec = tv.tv_usec; 842 780 843 781 844 for (int b = 0; b < NBOARDS; b++) … … 786 849 787 850 gi_NumConnect[b] = 0; //must close all connections 851 //gi.numConn[b] = 0; 788 852 gj.numConn[b] = 0; 789 853 … … 856 920 continue; 857 921 922 //numok++; 923 858 924 if (rd[i].bufLen>0) 859 925 { … … 867 933 // There was just nothing waiting 868 934 if (errno==EWOULDBLOCK || errno==EAGAIN) 935 { 936 //numok--; 869 937 continue; 938 } 870 939 871 940 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); 941 //gi.gotErr[b]++; 872 942 continue; 873 943 } … … 881 951 GenSock(s0, i, 0, NULL, &rd[i]); 882 952 953 //gi.gotErr[b]++; 954 883 955 gi_NumConnect[i]-= cntsock ; 956 //gi.numConn[b]--; 884 957 gj.numConn[i]--; 885 958 886 959 continue; 887 960 } 961 // Success (jrd > 0) 888 962 889 963 gj.rateBytes[i] += jrd; … … 999 1073 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04) 1000 1074 { 1075 //gi.evtErr++; 1001 1076 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d", 1002 1077 i, rd[i].evtID, rd[i].fadLen, … … 1096 1171 if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun) 1097 1172 { 1098 // Signal the fadctrl that a new run has been started1173 // Signal the fadctrl that a new run has been started 1099 1174 gotNewRun(evtCtrl[idx].runNum, NULL); 1100 1175 … … 1125 1200 // This is a non-ideal hack to lower the probability that 1126 1201 // in mBufEvt the search for correct entry in runCtrl 1127 // will not return a super-old entry. I don't want 1128 // to manipulate that in another thread. 1202 // will not return a super-old entry 1129 1203 for (int ir=0; ir<MAX_RUN; ir++) 1130 1204 { … … 1180 1254 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; 1181 1255 1182 // This could be improved having the pointer which separates the queue with1183 // the incomplete events from the queue with the complete events1184 1256 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1185 1257 { … … 1190 1262 1191 1263 // Event has not yet timed out or was reported already 1192 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0] >=g_actTime - 30)1264 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30) 1193 1265 continue; 1194 1266 … … 1224 1296 continue; 1225 1297 } 1298 1299 // The first events in the queue are either incomplete or 1300 // can be deleted (processing finished). As soon as we reach the 1301 // first complete events which processing is pending, we can stop. 1302 // All other events (if everything works well) must have the same state. 1303 // FIXME: This only works if we start from lastPtr and go down to frstPtr 1304 //break; 1226 1305 } 1227 1306 … … 1273 1352 1274 1353 //inform others we have to quit .... 1354 gi_runStat = -11; //inform all that no update to happen any more 1275 1355 gj.readStat = -11; //inform all that no update to happen any more 1276 1356 } … … 1313 1393 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; 1314 1394 evtCtrl[k0].evtStat = -1; 1315 1395 } 1316 1396 1317 1397 usleep(1); … … 1341 1421 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse); 1342 1422 1423 gi_runStat = -99; 1343 1424 gj.readStat = -99; 1344 1425 1345 1426 factStat (gj); 1346 //factStatNew (gi);1427 factStatNew (gi); 1347 1428 1348 1429 return 0; … … 1363 1444 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1364 1445 { 1365 // This is a threading issue... the evtStat might have been invalid 1366 // but the frstPtr is not yet updated 1367 if (evtCtrl[k0].evtStat==-1) 1368 continue; 1369 1370 // If we find the first event still waiting for processing 1371 // there will be only unprocessed events after this one in the queue 1372 if (evtCtrl[k0].evtStat<1000+threadID) 1373 { 1374 numWait = 1; 1375 break; 1376 } 1377 1378 // If the event was processed already, skip it 1379 // We could replace that to a moving pointer pointing to the first 1380 // non-processed event 1381 if (evtCtrl[k0].evtStat!=1000+threadID) 1382 continue; 1383 1384 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0); 1446 if (evtCtrl[k0].evtStat != 1000 + threadID) 1447 { 1448 if (evtCtrl[k0].evtStat < 1000 + threadID) 1449 numWait++; 1450 1451 continue; 1452 } 1453 1454 /* 1455 // gi_resetR>1 : flush buffers asap 1456 if (gi_resetR>1) 1457 { 1458 evtCtrl[k].evtStat = 9000; // flag as 'to be deleted' 1459 continue; 1460 1461 }*/ 1462 1463 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, 1464 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/); 1385 1465 1386 1466 if (jret>0 && jret<=threadID) … … 1424 1504 int status; 1425 1505 1506 int lastRun = 0; //usually run from last event still valid 1507 1508 // cpu_set_t mask; 1509 // int cpu = 1; //process thread (will be several in final version) 1510 1426 1511 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); 1512 1513 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1514 // CPU_ZERO (&mask); 1515 /* CPU_SET sets only the bit corresponding to cpu. */ 1516 // CPU_SET( 0 , &mask ); leave for system 1517 // CPU_SET( 1 , &mask ); used by write process 1518 // CPU_SET (2, &mask); 1519 // CPU_SET (3, &mask); 1520 // CPU_SET (4, &mask); 1521 // CPU_SET (5, &mask); 1522 // CPU_SET (6, &mask); 1523 // CPU_SET( 7 , &mask ); used by read process 1524 /* sched_setaffinity returns 0 in success */ 1525 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1526 // snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu); 1527 // factOut (kWarn, -1, str); 1528 // } 1529 1427 1530 1428 1531 pthread_t thread[100]; … … 1437 1540 { 1438 1541 int numWait = 0; 1542 int numProc = 0; 1439 1543 1440 1544 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1441 1545 { 1442 // This is a threading issue... the evtStat might have been invalid 1443 // but the frstPtr is not yet updated 1444 if (evtCtrl[k0].evtStat==-1) 1546 if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000) 1547 { 1548 if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90) 1549 numWait++; 1550 1445 1551 continue; 1446 1447 // If we find the first incomplete event which is not supposed to 1448 // be processed, there are only more incomplete events in the queue 1449 if (evtCtrl[k0].evtStat<90) 1552 } 1553 1554 /* 1555 //we are asked to flush buffers asap 1556 if (gi_resetR > 1) 1450 1557 { 1451 numWait = 1; 1452 break; 1453 } 1454 1455 // If the event was processed already, skip it. 1456 // We could replace that to a moving pointer pointing to the first 1457 // non-processed event 1458 if (evtCtrl[k0].evtStat>=1000) 1558 evtCtrl[k0].evtStat = 9000; 1459 1559 continue; 1560 }*/ 1460 1561 1461 1562 //-------- it is better to open the run already here, so call can be used to initialize … … 1464 1565 const int32_t ievt = evtCtrl[k0].evNum; 1465 1566 1466 const int idx = evtCtrl[k0].runCtrl_idx; 1467 if (runCtrl[idx].runId!=irun) 1567 // Find entry in runCtrl which belongs to the event mBuffer[id] 1568 // (only check if there is a need to check) 1569 if (runCtrl[lastRun].runId != irun) 1468 1570 { 1469 //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); 1470 // FIXME: What is the right action? (Flag event for deletion?) 1471 continue; 1571 //check which fileID to use (or open if needed) 1572 int j; 1573 for (j=0;j<MAX_RUN; j++) 1574 if (runCtrl[j].runId == irun) 1575 break; 1576 1577 if (j>=MAX_RUN) 1578 { 1579 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0); 1580 // FIXME: What is the right action? (Flag event for deletion?) 1581 continue; 1582 } 1583 1584 lastRun = j; 1472 1585 } 1473 1586 1474 1587 // File not yet open 1475 if (runCtrl[ idx].fileId < 0)1588 if (runCtrl[lastRun].fileId < 0) 1476 1589 { 1590 //---- we need to open a new run ==> make sure all older runs are 1591 //---- finished and marked to be closed .... 1592 // This loop is unique to procEvt 1593 for (int j=0; j<MAX_RUN; j++) 1594 { 1595 if (runCtrl[j].fileId == 0) 1596 { 1597 runCtrl[j].procId = 2; //--> do no longer accept events for processing 1598 1599 //---- problem: processing still going on ==> must wait for closing .... 1600 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId); 1601 runFinish1(runCtrl[j].runId); 1602 } 1603 } 1604 1477 1605 RUN_HEAD actRun; 1478 1606 actRun.Version = 1; … … 1488 1616 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); 1489 1617 1490 runCtrl[ idx].fileHd = runOpen(irun, &actRun, sizeof (actRun));1491 if (runCtrl[ idx].fileHd == NULL)1618 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); 1619 if (runCtrl[lastRun].fileHd == NULL) 1492 1620 { 1493 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt); 1494 runCtrl[idx].fileId = 91; 1621 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); 1622 runCtrl[lastRun].fileId = 91; 1623 runCtrl[lastRun].procId = 91; // Is not set in writeEvt 1495 1624 continue; 1496 1625 } 1497 1626 1498 runCtrl[idx].fileId = 0; 1499 1500 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt); 1627 runCtrl[lastRun].fileId = 0; 1628 runCtrl[lastRun].procId = 0; // Is not set in writeEvt 1629 1630 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); 1631 } 1632 1633 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 1634 if (runCtrl[lastRun].procId == 0) 1635 { 1636 if (runCtrl[lastRun].closeTime < g_actTime || 1637 runCtrl[lastRun].lastTime < g_actTime - 300 || 1638 runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt) 1639 { 1640 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); 1641 runFinish1 (runCtrl[lastRun].runId); 1642 runCtrl[lastRun].procId = 1; 1643 } 1644 } 1645 1646 // Skip event because of no active run 1647 if (runCtrl[lastRun].procId != 0) 1648 { 1649 evtCtrl[k0].evtStat = 10000; // flag 'to be deleted' 1650 continue; 1501 1651 } 1502 1652 … … 1524 1674 } 1525 1675 1526 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent); 1676 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, 1677 evtCtrl[k0].fEvent); 1678 //gi.procTot++; 1679 numProc++; 1680 1527 1681 if (rc < 0) 1528 1682 { 1529 1683 evtCtrl[k0].evtStat = 10000; // flag event to be deleted 1684 //gi.procErr++; 1530 1685 } 1531 1686 else 1532 1687 { 1533 1688 evtCtrl[k0].evtStat = 1000; // flag 'start processing' 1534 runCtrl[ idx].procEvt++;1689 runCtrl[lastRun].procEvt++; 1535 1690 } 1536 1691 } … … 1538 1693 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1539 1694 factPrintf(kInfo, -1, "Exit Processing Process ..."); 1695 gp_runStat = -22; //==> we should exit 1540 1696 gj.procStat = -22; //==> we should exit 1541 1697 return 0; 1542 1698 } 1543 1699 1544 usleep(1); 1545 1700 //seems we have nothing to do, so sleep a little 1701 if (numProc == 0) 1702 usleep(1); 1703 1704 gp_runStat = gi_runStat; 1546 1705 gj.procStat = gj.readStat; 1706 1547 1707 } 1548 1708 … … 1556 1716 } 1557 1717 1718 /* 1719 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1720 { 1721 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000) 1722 evtCtrl[k0].evtStat = 9000; //flag event as 'processed' 1723 }*/ 1724 1725 gp_runStat = -99; 1558 1726 gj.procStat = -99; 1559 1727 … … 1618 1786 ii |= 8; // = 4; 1619 1787 1788 if (runCtrl[j].procId == 0) 1789 { 1790 runFinish1(runCtrl[j].runId); 1791 runCtrl[j].procId = 92; 1792 } 1793 1620 1794 runCtrl[j].closeTime = g_actTime - 1; 1621 1795 … … 1642 1816 /* *** main loop writing event (including opening and closing run-files */ 1643 1817 1818 // cpu_set_t mask; 1819 // int cpu = 1; //write thread 1820 1644 1821 factPrintf(kInfo, -1, "Starting write-thread"); 1822 1823 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1824 // CPU_ZERO (&mask); 1825 /* CPU_SET sets only the bit corresponding to cpu. */ 1826 // CPU_SET (cpu, &mask); 1827 /* sched_setaffinity returns 0 in success */ 1828 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1829 // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); 1830 // } 1831 1832 int lastRun = 0; //usually run from last event still valid 1645 1833 1646 1834 while (g_runStat > -2) 1647 1835 { 1648 //int numWrite = 0;1836 int numWrite = 0; 1649 1837 int numWait = 0; 1650 1838 … … 1653 1841 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1654 1842 { 1655 // This is a threading issue... the evtStat might have been invalid 1656 // but the frstPtr is not yet updated 1657 if (evtCtrl[k0].evtStat==-1) 1843 if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000) 1844 { 1845 if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000) 1846 numWait++; 1847 1658 1848 continue; 1659 1660 // If we find the first non-written event which is not supposed to 1661 // be written, there are only more incomplete events in the queue 1662 if (evtCtrl[k0].evtStat<5000) 1849 } 1850 1851 //we must drain the buffer asap 1852 /* 1853 if (gi_resetR > 1) 1663 1854 { 1664 numWait = 1; 1665 break; 1666 } 1667 1668 // If the event was written already already, skip it 1669 // We could replace that to a moving pointer pointing to the first 1670 // non-processed event 1671 if (evtCtrl[k0].evtStat!=5000) 1855 evtCtrl[k0].evtStat = 9000; 1672 1856 continue; 1857 }*/ 1673 1858 1674 1859 const uint32_t irun = evtCtrl[k0].runNum; 1675 1860 const int32_t ievt = evtCtrl[k0].evNum; 1676 1861 1677 const int idx = evtCtrl[k0].runCtrl_idx;1678 1679 if (runCtrl[ idx].runId!=irun)1862 // Find entry in runCtrl which belongs to the event mBuffer[id] 1863 // (only check if there is a need to check) 1864 if (runCtrl[lastRun].runId != irun) 1680 1865 { 1681 //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); 1682 // FIXME: What is the right action? (Flag event for deletion?) 1683 continue; 1866 //check which fileID to use (or open if needed) 1867 int j; 1868 for (j=0;j<MAX_RUN; j++) 1869 if (runCtrl[j].runId == irun) 1870 break; 1871 1872 if (j>=MAX_RUN) 1873 { 1874 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0); 1875 // FIXME: What is the right action? 1876 continue; 1877 } 1878 1879 lastRun = j; 1684 1880 } 1685 1881 1882 // File not yet open 1883 if (runCtrl[lastRun].fileId < 0) 1884 { 1885 RUN_HEAD actRun; 1886 actRun.Version = 1; 1887 actRun.RunType = -1; //to be adapted 1888 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0; 1889 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8; 1890 actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime; 1891 actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec; 1892 actRun.NBoard = NBOARDS; 1893 actRun.NPix = NPIX; 1894 actRun.NTm = NTMARK; 1895 1896 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER)); 1897 1898 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); 1899 if (runCtrl[lastRun].fileHd == NULL) 1900 { 1901 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); 1902 runCtrl[lastRun].fileId = 91; 1903 continue; 1904 } 1905 1906 runCtrl[lastRun].fileId = 0; 1907 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); 1908 } 1909 1910 /* 1911 if (runCtrl[lastRun].fileId > 0) 1912 { 1913 // There is an event but file is already closed 1914 //if (runCtrl[j].fileId < 100) 1915 //{ 1916 // factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 1917 // runCtrl[j].fileId += 100; 1918 //} 1919 1920 evtCtrl[k0].evtStat = 9000; 1921 }*/ 1922 1686 1923 // File is open 1687 if (runCtrl[ idx].fileId==0)1924 if (runCtrl[lastRun].fileId==0) 1688 1925 { 1689 const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0); 1926 const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent, 1927 0/*sizeof (evtCtrl[k0])*/); 1690 1928 if (rc >= 0) 1691 1929 { 1692 1930 // Sucessfully wrote event 1693 runCtrl[ idx].lastTime = g_actTime;1694 runCtrl[ idx].actEvt++;1931 runCtrl[lastRun].lastTime = g_actTime; 1932 runCtrl[lastRun].actEvt++; 1695 1933 } 1696 1934 else 1697 1935 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); 1698 1936 1699 checkAndCloseRun( idx, irun, rc<0, 1);1937 checkAndCloseRun(lastRun, irun, rc<0, 1); 1700 1938 } 1701 1939 1702 1940 evtCtrl[k0].evtStat = 10000; // event written (or has to be discarded) -> delete 1703 1941 } 1942 /* 1943 //check if we should close a run (mainly when no event pending) 1944 //ETIENNE but first figure out which one is the latest run with a complete event. 1945 //i.e. max run Id and lastEvt >= 0 1946 //this condition is sufficient because all pending events were written already in the loop just above 1947 //actrun 1948 uint32_t lastStartedTime = 0; 1949 uint32_t runIdFound = 1; 1950 1951 //If we have an active run, look for its start time 1952 if (actrun != 0) 1953 { 1954 runIdfound = 0; 1955 for (int j=0;j<MAX_RUN;j++) 1956 { 1957 if (runCtrl[j].runId == actrun) 1958 { 1959 lastStartedTime = runCtrl[j].lastTime; 1960 runIdFound = 1; 1961 } 1962 } 1963 } 1964 1965 if (runIdFound == 0) 1966 { 1967 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun); 1968 } 1969 1970 //Also check if some files will never be opened 1971 //EDIT: this is completely useless, because as run Numbers are taken from FADs board, 1972 //I will never get run numbers for which no file is to be opened 1973 for (int j=0;j<MAX_RUN;j++) 1974 { 1975 if ((runCtrl[j].fileId < 0) && 1976 (runCtrl[j].lastTime < lastStartedTime) && 1977 (runCtrl[j].runId != 0)) 1978 { 1979 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun); 1980 ;//TODO notify that this run will never be opened 1981 } 1982 } 1983 */ 1704 1984 1705 1985 // Although the are no pending events, we have to check if a run should be closed (timeout) … … 1714 1994 } 1715 1995 1716 usleep(1); 1996 //seems we have nothing to do, so sleep a little 1997 if (numWrite == 0) 1998 usleep(1); 1717 1999 1718 2000 //nothing left to do … … 1720 2002 { 1721 2003 factPrintf(kInfo, -1, "Finish Write Process ..."); 2004 gw_runStat = -22; //==> we should exit 1722 2005 gj.writStat = -22; //==> we should exit 1723 2006 break; 1724 2007 } 1725 2008 2009 gw_runStat = gi_runStat; 1726 2010 gj.writStat = gj.readStat; 1727 2011 } … … 1734 2018 } 1735 2019 2020 gw_runStat = -99; 1736 2021 gj.writStat = -99; 1737 2022 … … 1752 2037 struct timespec xwait; 1753 2038 2039 gi_runStat = gp_runStat = gw_runStat = 0; 1754 2040 gj.readStat = gj.procStat = gj.writStat = 0; 1755 2041 … … 1761 2047 runCtrl[i].fileId = -2; 1762 2048 } 2049 2050 //prepare for subProcesses 2051 gi_maxSize = g_maxSize; 2052 if (gi_maxSize <= 0) 2053 gi_maxSize = 1; 1763 2054 1764 2055 gi_maxProc = g_maxProc; … … 1969 2260 1970 2261 g_maxMem = 1024 * 1024; //MBytes 2262 //g_maxMem = g_maxMem * 1024 *10 ; //10GBytes 1971 2263 g_maxMem = g_maxMem * 200; //100MBytes 1972 2264 1973 2265 g_maxProc = 20; 2266 g_maxSize = 30000; 1974 2267 1975 2268 g_runStat = 40;
Note:
See TracChangeset
for help on using the changeset viewer.