Changeset 15483
- Timestamp:
- 05/03/13 22:30:18 (12 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r15476 r15483 69 69 int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 70 70 71 int evtCtrl_frstPtr; 72 int evtCtrl_lastPtr; 71 int evtCtrl_frstPtr; // First event in queue 72 int evtCtrl_lastPtr; // pointer to next free slot 73 73 74 74 int g_maxProc; … … 105 105 GUI_STAT gj; 106 106 107 EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing 107 #define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 ) 108 #define MAX_RUN 8 // Number of concurrent runs 109 110 EVT_CTRL evtCtrl[MAX_EVT]; //control of events during processing 108 111 109 112 void factPrintf(int severity, int id, const char *fmt, ...) … … 377 380 int mBufEvt(const READ_STRUCT *rs) 378 381 { 379 // generate a new Event into mBuffer:380 // make sure only complete Event are possible, so 'free' will always work381 // returns index into mBuffer[], or negative value in case of error382 // error: <-9000 if roi screwed up (not consistent with run)383 // <-8000 (not consistent with event)384 // <-7000 (not consistent with board)385 // < 0 if no space left386 387 382 int nRoi[9]; 388 383 if (!checkRoiConsistency(rs->rBuf, nRoi)) … … 395 390 const int fadNum = rs->evtID; 396 391 397 const int beg = (evtCtrl_lastPtr + MAX_EVT *MAX_RUN - 1) % (MAX_EVT*MAX_RUN);398 const int end = (evtCtrl_frstPtr + MAX_EVT *MAX_RUN - 1) % (MAX_EVT*MAX_RUN);399 400 for (int k=beg; k!=end; k=(k+MAX_EVT *MAX_RUN-1)%(MAX_EVT*MAX_RUN))392 const int beg = (evtCtrl_lastPtr + MAX_EVT - 1) % MAX_EVT; 393 const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT; 394 395 for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT) 401 396 { 402 397 // If the run is different, go on searching. … … 437 432 438 433 // Check if the control structure still has space left 439 if (end-beg==1 || (end==0 && beg==MAX_EVT *MAX_RUN-1))434 if (end-beg==1 || (end==0 && beg==MAX_EVT-1)) 440 435 { 441 436 factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end); … … 537 532 evtCtrl[k].FADhead = NULL; 538 533 534 // -1: kInValid 535 // 0: kValid 536 // 1-40: kIncomplete 537 // 90: kIncompleteReported 538 // 100: kCompleteEventInBuffer 539 // 1000+x: kToBeProcessedByThreadX 540 // 5000: kToBeWritten 541 // 10000: kToBeDeleted 542 539 543 evtCtrl[k].evtStat = 0; 540 544 … … 543 547 // in two instructions. Must be done only _after_ the contents 544 548 // have been initialized 545 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % (MAX_EVT*MAX_RUN);549 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % MAX_EVT; 546 550 547 551 return k; … … 638 642 }*/ /*-----------------------------------------------------------------*/ 639 643 640 void reportIncomplete(int id)641 { 642 factPrintf(kWarn, 601, " %5d skip incomplete evt %8d",643 evtCtrl[id]. evNum, id);644 uint64_t reportIncomplete(int id, const char *txt) 645 { 646 factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)", 647 evtCtrl[id].runNum, evtCtrl[id].evNum, txt); 644 648 645 649 uint64_t report = 0; … … 660 664 } 661 665 662 // FIXME: Is that really 'b' or should that be 'ib' ? 666 // FIXME: This is not synchronous... it reports 667 // accoridng to the current connection status, not w.r.t. to the 668 // one when the event was taken. 663 669 if (gi_NumConnect[ib]<=0) // board not connected 664 670 { … … 677 683 factOut(kWarn, 601, str); 678 684 679 factReportIncomplete(report);685 return report; 680 686 } 681 687 … … 808 814 809 815 // initialize mBuffer (mark all entries as unused\empty) 810 for (int i = 0; i < MAX_EVT * MAX_RUN; i++)816 for (int i = 0; i < MAX_EVT; i++) 811 817 { 812 818 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1; … … 1201 1207 } 1202 1208 1209 const int beg = (idx + MAX_EVT - 1) % MAX_EVT; 1210 const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT; 1211 1212 // we have just completed an event... so all previous events 1213 // must have been completed already. If they are not, there 1214 // is no need to wait for the timeout, because they will never 1215 // get completed. We can just ensure that if we check for the previous 1216 // event to be complete every time we receive a new complete event. 1217 // If we find an incomplete one, we remove all consecutive 1218 // incomplete ones. 1219 for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT) 1220 { 1221 // We are done if we find a complete or fully processed event 1222 if (evtCtrl[k].evtStat>=100) 1223 break; 1224 1225 // we do not call factReportIncomplete here, because by starting a new 1226 // run and having received the first complete event from that run 1227 // the user has expressed that the old events are obsolste now 1228 // and the run will be closed anyway 1229 if (evtCtrl[k].evtStat>=0 && evtCtrl[k].evtStat<90) 1230 { 1231 reportIncomplete(k, "expired"); 1232 evtCtrl[k].evtStat = 90; 1233 } 1234 } 1235 1203 1236 // Flag that the event is ready for processing 1204 evtCtrl[idx].evtStat = 99;1237 evtCtrl[idx].evtStat = 100; 1205 1238 1206 1239 } // end for loop over all sockets … … 1219 1252 //delete those that are written to disk .... 1220 1253 1221 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT *MAX_RUN)%(MAX_EVT*MAX_RUN);1222 1223 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT *MAX_RUN)1254 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; 1255 1256 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1224 1257 { 1225 1258 // Check the more likely case first: incomplete events 1226 if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat< 92)1259 if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<100) 1227 1260 { 1228 1261 gj.bufNew++; //incomplete event in Buffer 1229 1262 1230 1263 // Event has not yet timed out or was reported already 1231 if (evtCtrl[k0].evtStat >=90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)1264 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30) 1232 1265 continue; 1233 1266 … … 1235 1268 // It doesn't matter if that takes comparably long, 1236 1269 // because we have to stop the run anyway. 1237 reportIncomplete(k0); 1270 const uint64_t rep = reportIncomplete(k0, "timeout"); 1271 factReportIncomplete(rep); 1238 1272 1239 1273 //timeout for incomplete events 1240 evtCtrl[k0].evtStat = 9 1;1274 evtCtrl[k0].evtStat = 90; 1241 1275 gj.evtSkip++; 1242 1276 1243 1277 continue; 1244 1278 } 1245 1246 // complete event in Buffer1247 //if (evtCtrl[k0].evtStat >= 95)1248 // gj.bufEvt++;1249 1279 1250 1280 // Check the less likely case: 'useless' or 'delete' 1251 1281 // evtState==0 can happen if the event was initialized (some data received) 1252 1282 // but the data did not make sense (e.g. inconsistent rois) 1253 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000)1283 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat == 10000) 1254 1284 { 1255 1285 mBufFree(k0); //event written--> free memory 1256 1286 evtCtrl[k0].evtStat = -1; 1257 1287 1258 if (k0==evtCtrl_frstPtr)1259 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);1260 else1261 factPrintf(kError, -1, "Freed a non-first slot");1262 1263 1288 gj.evtWrite++; 1264 1289 gj.rateWrite++; 1265 1290 } 1291 1292 // Remove leading invalidated slots from queue 1293 if (evtCtrl[evtCtrl_frstPtr].evtStat==-1 && k0==evtCtrl_frstPtr) 1294 { 1295 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; 1266 1296 continue; 1267 1297 } … … 1271 1301 // first complete events which processing is pending, we can stop. 1272 1302 // All other events (if everything works well) must have the same state. 1273 break; 1303 // FIXME: This only works if we start from lastPtr and go down to frstPtr 1304 //break; 1274 1305 } 1275 1306 … … 1353 1384 1354 1385 // flag incomplete events as 'read finished' 1355 if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat < 90) 1356 evtCtrl[k0].evtStat = 91; 1357 1358 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat>900) 1386 // We cannot just detele all events, because some might currently being processed, 1387 // so we have to wait until the processing thread currently processing the event 1388 // signals that the event can be deleted. (Note, that there are currently never 1389 // two threads processing the same event at the same time) 1390 if ((evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<90) || evtCtrl[k0].evtStat==10000) 1359 1391 { 1360 1392 mBufFree(k0); //event written--> free memory 1393 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; 1361 1394 evtCtrl[k0].evtStat = -1; 1362 1363 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN); 1364 } 1395 } 1365 1396 1366 1397 usleep(1); … … 1410 1441 { 1411 1442 int numWait = 0; 1412 int numProc = 0; 1413 1414 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) 1443 1444 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1415 1445 { 1416 1446 if (evtCtrl[k0].evtStat != 1000 + threadID) … … 1422 1452 } 1423 1453 1424 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)1425 1426 if (gi_resetR <=1)1454 /* 1455 // gi_resetR>1 : flush buffers asap 1456 if (gi_resetR>1) 1427 1457 { 1428 jret = subProcEvt(threadID, evtCtrl[k0].FADhead, 1429 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/); 1430 1431 1432 if (jret <= threadID) { 1433 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); 1434 jret = 5300; 1435 } else if (jret <= 0) 1436 jret = 9200 + threadID; // flag as 'to be deleted' 1437 else if (jret >= gi_maxProc) 1438 jret = 5200 + threadID; // flag as 'to be written' 1439 else 1440 jret = 1000 + jret; // flag for next proces 1458 evtCtrl[k].evtStat = 9000; // flag as 'to be deleted' 1459 continue; 1460 1461 }*/ 1462 1463 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, 1464 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/); 1465 1466 if (jret>0 && jret<=threadID) 1467 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); 1468 1469 if (jret<=threadID) 1470 { 1471 evtCtrl[k0].evtStat = 10000; // flag as 'to be deleted' 1472 continue; 1441 1473 } 1442 1474 1443 evtCtrl[k0].evtStat = jret; 1444 numProc++; 1475 if (jret>=gi_maxProc) 1476 { 1477 evtCtrl[k0].evtStat = 5000; // flag as 'to be written' 1478 continue; 1479 } 1480 1481 evtCtrl[k0].evtStat = 1000 + jret; // flag for next proces 1445 1482 } 1446 1483 … … 1450 1487 } 1451 1488 1452 //seems we have nothing to do, so sleep a little 1453 if (numProc == 0) 1454 usleep(1); 1489 usleep(1); 1455 1490 } 1456 1491 … … 1507 1542 int numProc = 0; 1508 1543 1509 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT *MAX_RUN)1544 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1510 1545 { 1511 if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >=1000)1546 if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat>=1000) 1512 1547 { 1513 if ( evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat<90)1548 if (/*evtCtrl[k0].evtStat>=0 &&*/ evtCtrl[k0].evtStat<90) 1514 1549 numWait++; 1515 1550 … … 1517 1552 } 1518 1553 1554 /* 1519 1555 //we are asked to flush buffers asap 1520 1556 if (gi_resetR > 1) 1521 1557 { 1522 evtCtrl[k0].evtStat = 9 991;1558 evtCtrl[k0].evtStat = 9000; 1523 1559 continue; 1524 } 1560 }*/ 1525 1561 1526 1562 //-------- it is better to open the run already here, so call can be used to initialize … … 1611 1647 if (runCtrl[lastRun].procId != 0) 1612 1648 { 1613 evtCtrl[k0].evtStat = 9091;1649 evtCtrl[k0].evtStat = 10000; // flag 'to be deleted' 1614 1650 continue; 1615 1651 } … … 1645 1681 if (rc < 0) 1646 1682 { 1647 evtCtrl[k0].evtStat = 9999; //flag event to be skipped1683 evtCtrl[k0].evtStat = 10000; // flag event to be deleted 1648 1684 //gi.procErr++; 1649 1685 } 1650 1686 else 1651 1687 { 1652 evtCtrl[k0].evtStat = 1000; 1688 evtCtrl[k0].evtStat = 1000; // flag 'start processing' 1653 1689 runCtrl[lastRun].procEvt++; 1654 1690 } … … 1680 1716 } 1681 1717 1682 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) 1718 /* 1719 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1683 1720 { 1684 1721 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000) 1685 evtCtrl[k0].evtStat = 9 800; //flag event as 'processed'1686 } 1722 evtCtrl[k0].evtStat = 9000; //flag event as 'processed' 1723 }*/ 1687 1724 1688 1725 gp_runStat = -99; … … 1800 1837 int numWait = 0; 1801 1838 1802 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) 1839 // Note that the current loop does not at all gurantee that 1840 // the events are written in the correct order. 1841 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) 1803 1842 { 1804 if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000)1843 if (evtCtrl[k0].evtStat<5000 || evtCtrl[k0].evtStat >= 10000) 1805 1844 { 1806 if ( evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000)1845 if (/*evtCtrl[k0].evtStat > 0 &&*/ evtCtrl[k0].evtStat < 5000) 1807 1846 numWait++; 1808 1847 … … 1811 1850 1812 1851 //we must drain the buffer asap 1852 /* 1813 1853 if (gi_resetR > 1) 1814 1854 { 1815 evtCtrl[k0].evtStat = 9 904;1855 evtCtrl[k0].evtStat = 9000; 1816 1856 continue; 1817 } 1857 }*/ 1818 1858 1819 1859 const uint32_t irun = evtCtrl[k0].runNum; … … 1868 1908 } 1869 1909 1910 /* 1870 1911 if (runCtrl[lastRun].fileId > 0) 1871 1912 { 1872 1913 // There is an event but file is already closed 1873 /* 1874 if (runCtrl[j].fileId < 100) 1875 { 1876 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 1877 runCtrl[j].fileId += 100; 1878 }*/ 1879 1880 evtCtrl[k0].evtStat = 9903; 1881 } 1914 //if (runCtrl[j].fileId < 100) 1915 //{ 1916 // factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 1917 // runCtrl[j].fileId += 100; 1918 //} 1919 1920 evtCtrl[k0].evtStat = 9000; 1921 }*/ 1882 1922 1883 1923 // File is open … … 1891 1931 runCtrl[lastRun].lastTime = g_actTime; 1892 1932 runCtrl[lastRun].actEvt++; 1893 1894 evtCtrl[k0].evtStat = 9901;1895 1933 } 1896 1934 else 1897 {1898 1935 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); 1899 evtCtrl[k0].evtStat = 9902;1900 }1901 1936 1902 1937 checkAndCloseRun(lastRun, irun, rc<0, 1); 1903 1938 } 1939 1940 evtCtrl[k0].evtStat = 10000; // event written (or has to be discarded) -> delete 1904 1941 } 1905 1942 /* -
trunk/FACT++/src/FAD.h
r15474 r15483 177 177 178 178 //--------------------------------------------------------------- 179 180 #define MAX_RUN 8181 #define MAX_EVT 32768 //don't worry, for events is MAX_RUN*MAX_EVT182 179 183 180 typedef void* FileHandle_t ;
Note:
See TracChangeset
for help on using the changeset viewer.