Changeset 15346
- Timestamp:
- 04/15/13 15:33:10 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r15345 r15346 866 866 #endif 867 867 evtCtrl.lastPtr++; 868 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN) 869 evtCtrl.lastPtr = 0; 868 evtCtrl.lastPtr %= MAX_EVT * MAX_RUN; 870 869 871 870 gi.evtGet++; … … 977 976 return; 978 977 } /*-----------------------------------------------------------------*/ 978 979 void reportIncomplete(int k0) 980 { 981 const int id = evtCtrl.evtBuf[k0]; 982 983 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d %8d %2d", 984 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, evtCtrl.evtStat[k0]); 985 986 uint64_t report = 0; 987 988 char str[1000]; 989 990 int ik=0; 991 for (int ib=0; ib<NBOARDS; ib++) 992 { 993 if (ib%10==0) 994 str[ik++] = '|'; 995 996 const int jb = mBuffer[id].board[ib]; 997 if (jb>=0) // data received from that board 998 { 999 str[ik++] = '0'+(jb%10); 1000 continue; 1001 } 1002 1003 // FIXME: Is that really 'b' or should that be 'ib' ? 1004 if (gi_NumConnect[ib]<=0) // board not connected 1005 { 1006 str[ik++] = 'x'; 1007 continue; 1008 } 1009 1010 // data from this board lost 1011 str[ik++] = '.'; 1012 report |= ((uint64_t)1)<<ib; 1013 } 1014 1015 str[ik++] = '|'; 1016 str[ik] = 0; 1017 1018 factOut(kWarn, 601, str); 1019 1020 factReportIncomplete(report); 1021 } 979 1022 980 1023 … … 1721 1764 } 1722 1765 1723 gi_SecTime = g_actTime; 1724 1725 1726 //loop over all active events and flag those older than read-timeout 1727 //delete those that are written to disk .... 1728 1729 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1730 if (kd < 0) 1731 kd += (MAX_EVT * MAX_RUN); 1732 1733 gj.bufNew = gj.bufEvt = 0; 1734 int k1 = evtCtrl.frstPtr; 1735 for (k = k1; k < (k1 + kd); k++) { 1736 int k0 = k % (MAX_EVT * MAX_RUN); 1737 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1738 1739 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) { 1740 gj.bufNew++; //incomplete event in Buffer 1741 if (evtCtrl.evtStat[k0] < 90 1742 && evtCtrl.pcTime[k0] < g_actTime - 30) { 1743 int id = evtCtrl.evtBuf[k0]; 1744 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d %8d %2d", 1745 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, evtCtrl.evtStat[k0]); 1746 1747 uint64_t report = 0; 1748 1749 char str[1000]; 1750 1751 int ik,ib,jb; 1752 ik=0; 1753 for (ib=0; ib<NBOARDS; ib++) { 1754 if (ib%10==0) { 1755 str[ik++] = '|'; 1756 } 1757 jb = mBuffer[id].board[ib]; 1758 if ( jb<0 ) { 1759 if (gi_NumConnect[b] >0 ) { 1760 str[ik++] = '.'; 1761 report |= ((uint64_t)1)<<ib; 1762 } else { 1763 str[ik++] = 'x'; 1764 } 1765 } else { 1766 str[ik++] = '0'+(jb%10); 1767 } 1768 } 1769 str[ik++] = '|'; 1770 str[ik] = 0; 1771 factOut(kWarn, 601, str); 1772 1773 factReportIncomplete(report); 1774 1775 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events 1776 gi.evtSkp++; 1777 gi.evtTot++; 1778 gj.evtSkip++; 1779 } 1780 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete' 1781 || evtCtrl.evtStat[k0] == 0) { //'useless' 1782 1783 int id = evtCtrl.evtBuf[k0]; 1766 gi_SecTime = g_actTime; 1767 1768 1769 //loop over all active events and flag those older than read-timeout 1770 //delete those that are written to disk .... 1771 1772 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1773 if (kd < 0) 1774 kd += (MAX_EVT * MAX_RUN); 1775 1776 gj.bufNew = gj.bufEvt = 0; 1777 1778 const int k1 = evtCtrl.frstPtr; 1779 for (int k=k1; k<k1+kd; k++) 1780 { 1781 const int k0 = k % (MAX_EVT * MAX_RUN); 1782 1783 if (k0==evtCtrl.frstPtr && evtCtrl.evtStat[k0]<0) 1784 { 1785 evtCtrl.frstPtr++; 1786 evtCtrl.frstPtr %= MAX_EVT * MAX_RUN; 1787 1788 // Continue because evtCtrl.evtStat[k0] must be <0 1789 continue; 1790 } 1791 1792 // Check the more likely case first: incomplete events 1793 if (evtCtrl.evtStat[k0]>0 && evtCtrl.evtStat[k0]<92) 1794 { 1795 gj.bufNew++; //incomplete event in Buffer 1796 1797 // Event has not yet timed out or was reported already 1798 if (evtCtrl.evtStat[k0]>=90 || evtCtrl.pcTime[k0]>=g_actTime - 30) 1799 continue; 1800 1801 reportIncomplete(k0); 1802 1803 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events 1804 gi.evtSkp++; 1805 gi.evtTot++; 1806 gj.evtSkip++; 1807 1808 continue; 1809 } 1810 1811 // complete event in Buffer 1812 if (evtCtrl.evtStat[k0] >= 95) 1813 gj.bufEvt++; 1814 1815 // Check the less likely case: 'useless' or 'delete' 1816 if (evtCtrl.evtStat[k0]==0 || evtCtrl.evtStat[k0] >= 9000) 1817 { 1818 const int id = evtCtrl.evtBuf[k0]; 1784 1819 #ifdef EVTDEBUG 1785 factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); 1786 #endif 1787 mBufFree (id); //event written--> free memory 1788 evtCtrl.evtStat[k0] = -1; 1789 gj.evtWrite++; 1790 gj.rateWrite++; 1791 } else if (evtCtrl.evtStat[k0] >= 95) { 1792 gj.bufEvt++; //complete event in Buffer 1793 } 1794 1795 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) { 1796 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN); 1797 } 1798 } 1820 factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); 1821 #endif 1822 mBufFree (id); //event written--> free memory 1823 evtCtrl.evtStat[k0] = -1; 1824 gj.evtWrite++; 1825 gj.rateWrite++; 1826 } 1827 1828 } 1799 1829 1800 1830 … … 1880 1910 1881 1911 1882 if (gi_resetR > 0) { 1883 //flag all events as 'read finished' 1884 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1885 if (kd < 0) 1886 kd += (MAX_EVT * MAX_RUN); 1887 1888 int k1 = evtCtrl.frstPtr; 1889 for (k = k1; k < (k1 + kd); k++) { 1890 int k0 = k % (MAX_EVT * MAX_RUN); 1891 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) { 1892 evtCtrl.evtStat[k0] = 91; 1893 gi.evtSkp++; 1894 gi.evtTot++; 1895 } 1896 } 1897 1898 xwait.tv_sec = 0; 1899 xwait.tv_nsec = 1000; // sleep for ~1 usec 1900 nanosleep (&xwait, NULL); 1912 if (gi_resetR > 0) 1913 { 1914 //flag all events as 'read finished' 1915 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1916 if (kd < 0) 1917 kd += (MAX_EVT * MAX_RUN); 1918 1919 const int k1 = evtCtrl.frstPtr; 1920 for (int k=k1; k<k1+kd; k++) 1921 { 1922 const int k0 = k % (MAX_EVT * MAX_RUN); 1923 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) 1924 { 1925 evtCtrl.evtStat[k0] = 91; 1926 gi.evtSkp++; 1927 gi.evtTot++; 1928 } 1929 } 1901 1930 1902 1931 //and clear all buffers (might have to wait until all others are done) … … 1911 1940 1912 1941 int numclear = 1; 1913 while (numclear > 0) { 1942 while (numclear > 0) 1943 { 1914 1944 numclear = 0; 1945 1915 1946 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1916 1947 if (kd < 0) 1917 kd += (MAX_EVT * MAX_RUN); 1918 1919 int k1 = evtCtrl.frstPtr; 1920 for (k = k1; k < (k1 + kd); k++) { 1921 int k0 = k % (MAX_EVT * MAX_RUN); 1922 if (evtCtrl.evtStat[k0] > minclear) { 1923 int id = evtCtrl.evtBuf[k0]; 1948 kd += (MAX_EVT * MAX_RUN); 1949 1950 const int k1 = evtCtrl.frstPtr; 1951 for (int k=k1; k<k1+kd; k++) 1952 { 1953 const int k0 = k % (MAX_EVT * MAX_RUN); 1954 if (evtCtrl.evtStat[k0] > minclear) 1955 { 1956 const int id = evtCtrl.evtBuf[k0]; 1924 1957 #ifdef EVTDEBUG 1925 factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); 1926 #endif 1927 mBufFree (id); //event written--> free memory 1928 evtCtrl.evtStat[k0] = -1; 1929 } else if (evtCtrl.evtStat[k0] > 0) 1930 numclear++; //writing is still ongoing... 1931 1932 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) 1933 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN); 1958 factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); 1959 #endif 1960 mBufFree (id); //event written--> free memory 1961 evtCtrl.evtStat[k0] = -1; 1962 } 1963 else 1964 if (evtCtrl.evtStat[k0] > 0) 1965 numclear++; //writing is still ongoing... 1966 1967 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) 1968 { 1969 evtCtrl.frstPtr++; 1970 evtCtrl.frstPtr %= MAX_EVT * MAX_RUN; 1971 } 1934 1972 } 1935 1973 1936 xwait.tv_sec = 0; 1937 xwait.tv_nsec = 1000; // sleep for ~1 umsec 1938 nanosleep (&xwait, NULL); 1974 usleep(1); 1939 1975 } 1940 1976 } … … 1972 2008 1973 2009 1974 void * 1975 subProc (void *thrid) 1976 { 1977 int64_t threadID; 1978 int numWait, numProc, k, jret; 1979 struct timespec xwait; 1980 1981 // int32_t cntr ; 1982 1983 threadID = (int64_t)thrid; 1984 1985 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); 1986 1987 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 1988 numWait = numProc = 0; 1989 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 1990 if (kd < 0) 1991 kd += (MAX_EVT * MAX_RUN); 1992 1993 int k1 = evtCtrl.frstPtr; 1994 for (k = k1; k < (k1 + kd); k++) { 1995 int k0 = k % (MAX_EVT * MAX_RUN); 1996 1997 if (evtCtrl.evtStat[k0] == 1000 + threadID) { 1998 if (gi_resetR > 1) { //we are asked to flush buffers asap 1999 jret = 9100; //flag to be deleted 2000 } else { 2001 int id = evtCtrl.evtBuf[k0]; 2002 2003 2004 jret = 2005 subProcEvt (threadID, mBuffer[id].FADhead, 2006 mBuffer[id].fEvent, mBuffer[id].buffer); 2007 2008 2009 if (jret <= threadID) { 2010 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); 2011 jret = 5300; 2012 } else if (jret <= 0) 2013 jret = 9200 + threadID; //flag as 'to be deleted' 2014 else if (jret >= gi_maxProc) 2015 jret = 5200 + threadID; //flag as 'to be written' 2016 else 2017 jret = 1000 + jret; //flag for next proces 2010 void *subProc(void *thrid) 2011 { 2012 const int64_t threadID = (int64_t)thrid; 2013 2014 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); 2015 2016 while (g_runStat > -2) //in case of 'exit' we still must process pending events 2017 { 2018 int numWait = 0; 2019 int numProc = 0; 2020 2021 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 2022 if (kd < 0) 2023 kd += (MAX_EVT * MAX_RUN); 2024 2025 const int k1 = evtCtrl.frstPtr; 2026 for (int k=k1; k<k1+kd; k++) 2027 { 2028 const int k0 = k % (MAX_EVT * MAX_RUN); 2029 2030 if (!(evtCtrl.evtStat[k0] == 1000 + threadID)) 2031 { 2032 if (evtCtrl.evtStat[k0] < 1000 + threadID) 2033 numWait++; 2034 2035 continue; 2018 2036 } 2037 2038 /*** if (evtCtrl.evtStat[k0] == 1000 + threadID) ****/ 2039 2040 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap) 2041 2042 if (gi_resetR<=1) 2043 { 2044 const int id = evtCtrl.evtBuf[k0]; 2045 2046 jret = subProcEvt(threadID, mBuffer[id].FADhead, 2047 mBuffer[id].fEvent, mBuffer[id].buffer); 2048 2049 2050 if (jret <= threadID) { 2051 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); 2052 jret = 5300; 2053 } else if (jret <= 0) 2054 jret = 9200 + threadID; // flag as 'to be deleted' 2055 else if (jret >= gi_maxProc) 2056 jret = 5200 + threadID; // flag as 'to be written' 2057 else 2058 jret = 1000 + jret; // flag for next proces 2059 } 2060 2019 2061 evtCtrl.evtStat[k0] = jret; 2020 2062 numProc++; 2021 } else if (evtCtrl.evtStat[k0] < 1000 + threadID) 2022 numWait++; 2023 } 2024 2025 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 2026 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); 2027 return 0; 2028 } 2029 if (numProc == 0) { 2030 //seems we have nothing to do, so sleep a little 2031 xwait.tv_sec = 0; 2032 xwait.tv_nsec = 1000; // sleep for ~1 usec 2033 nanosleep (&xwait, NULL); 2034 } 2035 } 2036 2037 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); 2038 return 0; 2039 } /*-----------------------------------------------------------------*/ 2063 } 2064 2065 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 2066 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); 2067 return 0; 2068 } 2069 2070 //seems we have nothing to do, so sleep a little 2071 if (numProc == 0) 2072 usleep(1); 2073 } 2074 2075 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); 2076 2077 return 0; 2078 } 2079 2080 /*-----------------------------------------------------------------*/ 2040 2081 2041 2082 … … 2044 2085 { 2045 2086 /* *** main loop processing file, including SW-trigger */ 2046 int numProc, numWait;2047 2087 int status, j; 2048 struct timespec xwait;2049 2088 2050 2089 int lastRun = 0; //usually run from last event still valid … … 2080 2119 } 2081 2120 2082 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 2083 2084 numWait = numProc = 0; 2085 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 2086 if (kd < 0) 2087 kd += (MAX_EVT * MAX_RUN); 2088 2089 int k1 = evtCtrl.frstPtr; 2090 for (int k = k1; k < (k1 + kd); k++) { 2091 int k0 = k % (MAX_EVT * MAX_RUN); 2092 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 2093 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) { 2094 2095 if (gi_resetR > 1) { //we are asked to flush buffers asap 2121 // in case of 'exit' we still must process pending events 2122 while (g_runStat > -2) 2123 { 2124 int numWait = 0; 2125 int numProc = 0; 2126 2127 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 2128 if (kd < 0) 2129 kd += (MAX_EVT * MAX_RUN); 2130 2131 const int k1 = evtCtrl.frstPtr; 2132 for (int k=k1; k<k1+kd; k++) 2133 { 2134 const int k0 = k % (MAX_EVT * MAX_RUN); 2135 if (!(evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000)) 2136 { 2137 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) 2138 numWait++; 2139 2140 continue; 2141 } 2142 2143 /*** if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) ***/ 2144 2145 //we are asked to flush buffers asap 2146 if (gi_resetR > 1) 2147 { 2096 2148 evtCtrl.evtStat[k0] = 9991; 2097 } else { 2098 2099 //-------- it is better to open the run already here, so call can be used to initialize 2100 //-------- buffers etc. needed to interprete run (e.g. DRS calibration) 2101 int id = evtCtrl.evtBuf[k0]; 2102 uint32_t irun = mBuffer[id].runNum; 2103 int32_t ievt = mBuffer[id].evNum; 2104 if (runCtrl[lastRun].runId == irun) { 2105 j = lastRun; 2106 } else { 2107 //check which fileID to use (or open if needed) 2108 for (j = 0; j < MAX_RUN; j++) { 2109 if (runCtrl[j].runId == irun) 2110 break; 2111 } 2112 if (j >= MAX_RUN) { 2113 factPrintf(kFatal, 901, "procEvt: Can not find run %d for event %d in %d", irun, ievt, id); 2114 } 2115 lastRun = j; 2149 continue; 2150 } 2151 2152 //-------- it is better to open the run already here, so call can be used to initialize 2153 //-------- buffers etc. needed to interprete run (e.g. DRS calibration) 2154 const int id = evtCtrl.evtBuf[k0]; 2155 2156 const uint32_t irun = mBuffer[id].runNum; 2157 const int32_t ievt = mBuffer[id].evNum; 2158 2159 int j = lastRun; 2160 if (runCtrl[lastRun].runId != irun) 2161 { 2162 //check which fileID to use (or open if needed) 2163 for (j = 0; j < MAX_RUN; j++) { 2164 if (runCtrl[j].runId == irun) 2165 break; 2116 2166 } 2117 2118 if (runCtrl[j].fileId < 0) { 2119 //---- we need to open a new run ==> make sure all older runs are 2120 //---- finished and marked to be closed .... 2121 int j1; 2122 for (j1 = 0; j1 < MAX_RUN; j1++) { 2123 if (runCtrl[j1].fileId == 0) { 2124 runCtrl[j1].procId = 2; //--> do no longer accept events for processing 2125 //---- problem: processing still going on ==> must wait for closing .... 2126 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j1].runId); 2127 runFinish1 (runCtrl[j1].runId); 2128 } 2129 2130 } 2131 2132 actRun.Version = 1; 2133 actRun.RunType = -1; //to be adapted 2134 2135 actRun.Nroi = runCtrl[j].roi0; 2136 actRun.NroiTM = runCtrl[j].roi8; 2137 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2138 // if (actRun.Nroi == actRun.NroiTM) 2139 // actRun.NroiTM = 0; 2140 actRun.RunTime = runCtrl[j].firstTime; 2141 actRun.RunUsec = runCtrl[j].firstTime; 2142 actRun.NBoard = NBOARDS; 2143 actRun.NPix = NPIX; 2144 actRun.NTm = NTMARK; 2145 actRun.Nroi = mBuffer[id].nRoi; 2146 memcpy (actRun.FADhead, mBuffer[id].FADhead, 2147 NBOARDS * sizeof (PEVNT_HEADER)); 2148 2149 runCtrl[j].fileHd = 2150 runOpen (irun, &actRun, sizeof (actRun)); 2151 if (runCtrl[j].fileHd == NULL) { 2152 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); 2153 runCtrl[j].fileId = 91; 2154 runCtrl[j].procId = 91; 2155 } else { 2156 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); 2157 runCtrl[j].fileId = 0; 2158 runCtrl[j].procId = 0; 2159 } 2160 2167 if (j >= MAX_RUN) { 2168 factPrintf(kFatal, 901, "procEvt: Can not find run %d for event %d in %d", irun, ievt, id); 2161 2169 } 2162 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 2163 if (runCtrl[j].procId == 0) { 2164 if (runCtrl[j].closeTime < g_actTime 2165 || runCtrl[j].lastTime < g_actTime - 300 2166 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) { 2167 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); 2168 runFinish1 (runCtrl[j].runId); 2169 runCtrl[j].procId = 1; 2170 } 2170 lastRun = j; 2171 } 2172 2173 if (runCtrl[j].fileId < 0) 2174 { 2175 //---- we need to open a new run ==> make sure all older runs are 2176 //---- finished and marked to be closed .... 2177 int j1; 2178 for (j1 = 0; j1 < MAX_RUN; j1++) { 2179 if (runCtrl[j1].fileId == 0) { 2180 runCtrl[j1].procId = 2; //--> do no longer accept events for processing 2181 //---- problem: processing still going on ==> must wait for closing .... 2182 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j1].runId); 2183 runFinish1(runCtrl[j1].runId); 2184 } 2171 2185 } 2172 if (runCtrl[j].procId != 0) { 2186 2187 actRun.Version = 1; 2188 actRun.RunType = -1; //to be adapted 2189 2190 actRun.Nroi = runCtrl[j].roi0; 2191 actRun.NroiTM = runCtrl[j].roi8; 2192 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2193 // if (actRun.Nroi == actRun.NroiTM) 2194 // actRun.NroiTM = 0; 2195 actRun.RunTime = runCtrl[j].firstTime; 2196 actRun.RunUsec = runCtrl[j].firstTime; 2197 actRun.NBoard = NBOARDS; 2198 actRun.NPix = NPIX; 2199 actRun.NTm = NTMARK; 2200 actRun.Nroi = mBuffer[id].nRoi; 2201 2202 memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); 2203 2204 runCtrl[j].fileHd = runOpen (irun, &actRun, sizeof (actRun)); 2205 if (runCtrl[j].fileHd == NULL) 2206 { 2207 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); 2208 runCtrl[j].fileId = 91; 2209 runCtrl[j].procId = 91; 2210 } 2211 else 2212 { 2213 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); 2214 runCtrl[j].fileId = 0; 2215 runCtrl[j].procId = 0; 2216 } 2217 } 2218 2219 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 2220 if (runCtrl[j].procId == 0) 2221 { 2222 if (runCtrl[j].closeTime < g_actTime || 2223 runCtrl[j].lastTime < g_actTime - 300 || 2224 runCtrl[j].maxEvt <= runCtrl[j].procEvt) 2225 { 2226 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); 2227 runFinish1 (runCtrl[j].runId); 2228 runCtrl[j].procId = 1; 2229 } 2230 } 2231 if (runCtrl[j].procId != 0) { 2173 2232 #ifdef EVTDEBUG 2174 fcatPrintf(kDebug, 502, "procEvt: Skip event %d because no active run %d", ievt, irun); 2175 #endif 2176 evtCtrl.evtStat[k0] = 9091; 2177 } else { 2178 //-------- 2179 //-------- 2180 id = evtCtrl.evtBuf[k0]; 2181 int itevt = mBuffer[id].trgNum; 2182 int itrg = mBuffer[id].trgTyp; 2183 int roi = mBuffer[id].nRoi; 2184 int roiTM = mBuffer[id].nRoiTM; 2185 2186 //make sure unused pixels/tmarks are cleared to zero 2187 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2188 // if (roiTM == roi) 2189 // roiTM = 0; 2190 int ip, it, dest, ib; 2191 for (ip = 0; ip < NPIX; ip++) { 2192 if (mBuffer[id].fEvent->StartPix[ip] == -1) { 2193 dest = ip * roi; 2194 memset (&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); 2195 } 2196 } 2197 2198 for (it = 0; it < NTMARK; it++) { 2199 if (mBuffer[id].fEvent->StartTM[it] == -1) { 2200 dest = it * roi + NPIX * roi; 2201 memset (&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); 2202 } 2203 } 2204 2205 2206 //and set correct event header ; also check for consistency in event (not yet) 2207 mBuffer[id].fEvent->Roi = roi; 2208 mBuffer[id].fEvent->RoiTM = roiTM; 2209 mBuffer[id].fEvent->EventNum = ievt; 2210 mBuffer[id].fEvent->TriggerNum = itevt; 2211 mBuffer[id].fEvent->TriggerType = itrg; 2212 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0]; 2213 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1]; 2214 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2]; 2215 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3]; 2216 mBuffer[id].fEvent->SoftTrig = 0; 2217 2218 2219 for (ib = 0; ib < NBOARDS; ib++) { 2220 if (mBuffer[id].board[ib] == -1) { //board is not read 2221 mBuffer[id].FADhead[ib].start_package_flag = 0; 2222 mBuffer[id].fEvent->BoardTime[ib] = 0; 2223 } else { 2224 mBuffer[id].fEvent->BoardTime[ib] = 2225 mBuffer[id].FADhead[ib].time; 2226 } 2227 } 2228 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead, 2229 mBuffer[id].fEvent); 2230 gi.procTot++; 2231 numProc++; 2232 2233 if (i < 0) { 2234 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped 2235 gi.procErr++; 2236 } else { 2237 evtCtrl.evtStat[k0] = 1000; 2238 runCtrl[j].procEvt++; 2239 } 2233 factPrintf(kDebug, 502, "procEvt: Skip event %d because no active run %d", ievt, irun); 2234 #endif 2235 evtCtrl.evtStat[k0] = 9091; 2236 continue; 2237 } 2238 2239 //-------- 2240 //-------- 2241 2242 const int roi = mBuffer[id].nRoi; 2243 //const int roiTM = mBuffer[id].nRoiTM; 2244 2245 //make sure unused pixels/tmarks are cleared to zero 2246 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2247 // if (roiTM == roi) 2248 // roiTM = 0; 2249 for (int ip=0; ip<NPIX; ip++) 2250 { 2251 if (mBuffer[id].fEvent->StartPix[ip] == -1) 2252 { 2253 const int dest = ip * roi; 2254 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); 2240 2255 } 2241 } 2242 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) { 2243 numWait++; 2244 } 2245 } 2246 2247 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 2248 factPrintf(kInfo, -1, "Exit Processing Process ..."); 2249 gp_runStat = -22; //==> we should exit 2250 gj.procStat = -22; //==> we should exit 2251 return 0; 2252 } 2253 2254 if (numProc == 0) { 2255 //seems we have nothing to do, so sleep a little 2256 xwait.tv_sec = 0; 2257 xwait.tv_nsec = 1000; // sleep for ~1 usec 2258 nanosleep (&xwait, NULL); 2259 } 2260 gp_runStat = gi_runStat; 2261 gj.procStat = gj.readStat; 2262 2263 } 2264 2265 //we are asked to abort asap ==> must flag all remaining events 2256 } 2257 2258 for (int it=0; it<NTMARK; it++) 2259 { 2260 if (mBuffer[id].fEvent->StartTM[it] == -1) 2261 { 2262 const int dest = it * roi + NPIX * roi; 2263 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); 2264 } 2265 } 2266 2267 //and set correct event header ; also check for consistency in event (not yet) 2268 mBuffer[id].fEvent->Roi = mBuffer[id].nRoi; 2269 mBuffer[id].fEvent->RoiTM = mBuffer[id].nRoiTM; 2270 mBuffer[id].fEvent->EventNum = mBuffer[id].evNum; 2271 mBuffer[id].fEvent->TriggerNum = mBuffer[id].trgNum; 2272 mBuffer[id].fEvent->TriggerType = mBuffer[id].trgTyp; 2273 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0]; 2274 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1]; 2275 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2]; 2276 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3]; 2277 mBuffer[id].fEvent->SoftTrig = 0; 2278 2279 2280 for (int ib=0; ib<NBOARDS; ib++) 2281 { 2282 // board is not read 2283 if (mBuffer[id].board[ib] == -1) 2284 { 2285 mBuffer[id].FADhead[ib].start_package_flag = 0; 2286 mBuffer[id].fEvent->BoardTime[ib] = 0; 2287 } 2288 else 2289 { 2290 mBuffer[id].fEvent->BoardTime[ib] = mBuffer[id].FADhead[ib].time; 2291 } 2292 } 2293 2294 const int rc = eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead, 2295 mBuffer[id].fEvent); 2296 gi.procTot++; 2297 numProc++; 2298 2299 if (rc < 0) 2300 { 2301 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped 2302 gi.procErr++; 2303 } 2304 else 2305 { 2306 evtCtrl.evtStat[k0] = 1000; 2307 runCtrl[j].procEvt++; 2308 } 2309 } 2310 2311 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 2312 factPrintf(kInfo, -1, "Exit Processing Process ..."); 2313 gp_runStat = -22; //==> we should exit 2314 gj.procStat = -22; //==> we should exit 2315 return 0; 2316 } 2317 2318 //seems we have nothing to do, so sleep a little 2319 if (numProc == 0) 2320 usleep(1); 2321 2322 gp_runStat = gi_runStat; 2323 gj.procStat = gj.readStat; 2324 2325 } 2326 2327 //we are asked to abort asap ==> must flag all remaining events 2266 2328 // when gi_runStat claims that all events are in the buffer... 2267 2329 … … 2275 2337 } 2276 2338 2277 int k1 = evtCtrl.frstPtr;2278 for (int k = k1; k < (k1 + kd); k++) {2279 int k0 = k % (MAX_EVT * MAX_RUN);2280 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {2281 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'2282 }2339 const int k1 = evtCtrl.frstPtr; 2340 for (int k=k1; k<k1+kd; k++) 2341 { 2342 const int k0 = k % (MAX_EVT * MAX_RUN); 2343 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) 2344 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed' 2283 2345 } 2284 2346 … … 2332 2394 runCtrl[j].closeTime >= g_actTime && 2333 2395 runCtrl[j].lastTime >= g_actTime - 300 && 2334 runCtrl[j].maxEvt > =runCtrl[j].actEvt)2396 runCtrl[j].maxEvt > runCtrl[j].actEvt) 2335 2397 return; 2336 2398 … … 2375 2437 2376 2438 2377 void * 2378 writeEvt (void *ptr) 2439 void *writeEvt (void *ptr) 2379 2440 { 2380 2441 /* *** main loop writing event (including opening and closing run-files */ 2381 2382 int numWrite, numWait;2383 int k, j ;2384 struct timespec xwait;2385 2442 2386 2443 // cpu_set_t mask; … … 2400 2457 int lastRun = 0; //usually run from last event still valid 2401 2458 2402 while (g_runStat > -2) { 2403 2404 numWait = numWrite = 0; 2405 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 2406 if (kd < 0) 2407 kd += (MAX_EVT * MAX_RUN); 2408 2409 int k1 = evtCtrl.frstPtr; 2410 for (k = k1; k < (k1 + kd); k++) { 2411 int k0 = k % (MAX_EVT * MAX_RUN); 2412 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 2413 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) { 2414 2415 if (gi_resetR > 1) { //we must drain the buffer asap 2459 while (g_runStat > -2) 2460 { 2461 int numWrite = 0; 2462 int numWait = 0; 2463 2464 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr; 2465 if (kd < 0) 2466 kd += (MAX_EVT * MAX_RUN); 2467 2468 const int k1 = evtCtrl.frstPtr; 2469 for (int k=k1; k<k1+kd; k++) 2470 { 2471 const int k0 = k % (MAX_EVT * MAX_RUN); 2472 2473 if (evtCtrl.evtStat[k0] <= 5000 || evtCtrl.evtStat[k0] >= 9000) 2474 { 2475 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000) 2476 numWait++; 2477 2478 continue; 2479 } 2480 2481 /*** if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) ***/ 2482 2483 //we must drain the buffer asap 2484 if (gi_resetR > 1) 2485 { 2416 2486 evtCtrl.evtStat[k0] = 9904; 2417 } else { 2418 2419 2420 int id = evtCtrl.evtBuf[k0]; 2421 uint32_t irun = mBuffer[id].runNum; 2422 int32_t ievt = mBuffer[id].evNum; 2423 2424 gi.wrtTot++; 2425 if (runCtrl[lastRun].runId == irun) { 2426 j = lastRun; 2487 continue; 2488 } 2489 2490 const int id = evtCtrl.evtBuf[k0]; 2491 2492 const uint32_t irun = mBuffer[id].runNum; 2493 const int32_t ievt = mBuffer[id].evNum; 2494 2495 gi.wrtTot++; 2496 2497 int j = lastRun; 2498 2499 if (runCtrl[lastRun].runId != irun) 2500 { 2501 //check which fileID to use (or open if needed) 2502 for (j = 0; j < MAX_RUN; j++) 2503 { 2504 if (runCtrl[j].runId == irun) 2505 break; 2506 } 2507 2508 if (j >= MAX_RUN) 2509 { 2510 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id); 2511 gi.wrtErr++; 2512 } 2513 2514 lastRun = j; 2515 } 2516 2517 if (runCtrl[j].fileId < 0) { 2518 actRun.Version = 1; 2519 actRun.RunType = -1; //to be adapted 2520 2521 actRun.Nroi = runCtrl[j].roi0; 2522 actRun.NroiTM = runCtrl[j].roi8; 2523 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2524 // if (actRun.Nroi == actRun.NroiTM) 2525 // actRun.NroiTM = 0; 2526 actRun.RunTime = runCtrl[j].firstTime; 2527 actRun.RunUsec = runCtrl[j].firstTime; 2528 actRun.NBoard = NBOARDS; 2529 actRun.NPix = NPIX; 2530 actRun.NTm = NTMARK; 2531 actRun.Nroi = mBuffer[id].nRoi; 2532 memcpy (actRun.FADhead, mBuffer[id].FADhead, 2533 NBOARDS * sizeof (PEVNT_HEADER)); 2534 2535 runCtrl[j].fileHd = 2536 runOpen (irun, &actRun, sizeof (actRun)); 2537 if (runCtrl[j].fileHd == NULL) { 2538 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); 2539 runCtrl[j].fileId = 91; 2427 2540 } else { 2428 //check which fileID to use (or open if needed) 2429 for (j = 0; j < MAX_RUN; j++) { 2430 if (runCtrl[j].runId == irun) 2431 break; 2432 } 2433 if (j >= MAX_RUN) { 2434 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id); 2435 gi.wrtErr++; 2436 } 2437 lastRun = j; 2541 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); 2542 runCtrl[j].fileId = 0; 2438 2543 } 2439 2544 2545 } 2546 2547 if (runCtrl[j].fileId != 0) { 2440 2548 if (runCtrl[j].fileId < 0) { 2441 actRun.Version = 1; 2442 actRun.RunType = -1; //to be adapted 2443 2444 actRun.Nroi = runCtrl[j].roi0; 2445 actRun.NroiTM = runCtrl[j].roi8; 2446 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits 2447 // if (actRun.Nroi == actRun.NroiTM) 2448 // actRun.NroiTM = 0; 2449 actRun.RunTime = runCtrl[j].firstTime; 2450 actRun.RunUsec = runCtrl[j].firstTime; 2451 actRun.NBoard = NBOARDS; 2452 actRun.NPix = NPIX; 2453 actRun.NTm = NTMARK; 2454 actRun.Nroi = mBuffer[id].nRoi; 2455 memcpy (actRun.FADhead, mBuffer[id].FADhead, 2456 NBOARDS * sizeof (PEVNT_HEADER)); 2457 2458 runCtrl[j].fileHd = 2459 runOpen (irun, &actRun, sizeof (actRun)); 2460 if (runCtrl[j].fileHd == NULL) { 2461 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); 2462 runCtrl[j].fileId = 91; 2463 } else { 2464 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); 2465 runCtrl[j].fileId = 0; 2466 } 2467 2549 factPrintf(kError, 123, "writeEvt: Never opened file for run %d", irun); 2550 } else if (runCtrl[j].fileId < 100) { 2551 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 2552 runCtrl[j].fileId += 100; 2553 } else { 2554 #ifdef EVTDEBUG 2555 factPrintf(kDebug, 123, "writeEvt: File for run %d is closed", irun); 2556 #endif 2468 2557 } 2469 2470 if (runCtrl[j].fileId != 0) { 2471 if (runCtrl[j].fileId < 0) { 2472 factPrintf(kError, 123, "writeEvt: Never opened file for run %d", irun); 2473 } else if (runCtrl[j].fileId < 100) { 2474 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); 2475 runCtrl[j].fileId += 100; 2476 } else { 2558 evtCtrl.evtStat[k0] = 9903; 2559 gi.wrtErr++; 2560 } else { 2561 // snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id])); 2562 // factOut (kInfo, 504, str); 2563 const int rc = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent, 2564 sizeof (mBuffer[id])); 2565 if (rc >= 0) { 2566 runCtrl[j].lastTime = g_actTime; 2567 runCtrl[j].actEvt++; 2568 evtCtrl.evtStat[k0] = 9901; 2477 2569 #ifdef EVTDEBUG 2478 factPrintf(kDebug, 123, "writeEvt: File for run %d is closed", irun); 2479 #endif 2480 } 2481 evtCtrl.evtStat[k0] = 9903; 2482 gi.wrtErr++; 2570 factPrintf(kDebug, 504, "%5d successfully wrote for run %d id %5d", ievt, irun, k0); 2571 #endif 2572 // gj.writEvt++ ; 2483 2573 } else { 2484 // snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id])); 2485 // factOut (kInfo, 504, str); 2486 const int rc = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent, 2487 sizeof (mBuffer[id])); 2488 if (rc >= 0) { 2489 runCtrl[j].lastTime = g_actTime; 2490 runCtrl[j].actEvt++; 2491 evtCtrl.evtStat[k0] = 9901; 2492 #ifdef EVTDEBUG 2493 factPrintf(kDebug, 504, "%5d successfully wrote for run %d id %5d", ievt, irun, k0); 2494 #endif 2495 // gj.writEvt++ ; 2496 } else { 2497 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); 2498 evtCtrl.evtStat[k0] = 9902; 2499 gi.wrtErr++; 2500 } 2501 2502 checkAndCloseRun(j, irun, rc<0, 1); 2574 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); 2575 evtCtrl.evtStat[k0] = 9902; 2576 gi.wrtErr++; 2503 2577 } 2504 } 2505 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)2506 numWait++;2507 }2578 2579 checkAndCloseRun(j, irun, rc<0, 1); 2580 } 2581 } 2508 2582 2509 2583 //check if we should close a run (mainly when no event pending) … … 2516 2590 if (actrun != 0) 2517 2591 {//If we have an active run, look for its start time 2518 for ( j=0;j<MAX_RUN;j++)2592 for (int j=0;j<MAX_RUN;j++) 2519 2593 { 2520 2594 if (runCtrl[j].runId == actrun) … … 2537 2611 //EDIT: this is completely useless, because as run Numbers are taken from FADs board, 2538 2612 //I will never get run numbers for which no file is to be opened 2539 for ( j=0;j<MAX_RUN;j++)2613 for (int j=0;j<MAX_RUN;j++) 2540 2614 { 2541 2615 if ((runCtrl[j].fileId < 0) && … … 2547 2621 } 2548 2622 } 2549 for ( j = 0; j <MAX_RUN; j++)2623 for (int j=0; j<MAX_RUN; j++) 2550 2624 { 2551 2625 if (runCtrl[j].fileId == 0) … … 2557 2631 } 2558 2632 2559 if (numWrite == 0) { 2560 //seems we have nothing to do, so sleep a little 2561 xwait.tv_sec = 0; 2562 xwait.tv_nsec = 1000; // sleep for ~1 usec 2563 nanosleep (&xwait, NULL); 2564 } 2633 //seems we have nothing to do, so sleep a little 2634 if (numWrite == 0) 2635 usleep(1); 2565 2636 2566 2637 if (gj.readStat < -10 && numWait == 0) { //nothing left to do … … 2580 2651 closerun: 2581 2652 factPrintf(kInfo, -1, "Close all open files ..."); 2582 for ( j = 0; j <MAX_RUN; j++)2653 for (int j=0; j<MAX_RUN; j++) 2583 2654 { 2584 2655 if (runCtrl[j].fileId == 0)
Note:
See TracChangeset
for help on using the changeset viewer.