Changeset 12227
- Timestamp:
- 10/21/11 18:09:38 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r12091 r12227 2 2 // // // #define EVTDEBUG 3 3 4 #define NUMSOCK 1 //set to 7 for old configuration 5 #define MAXREAD 65536 //64kB wiznet buffer 6 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 7 #define MAX_LEN 256*1024 // size of read-buffer per socket 4 #define NUMSOCK 1 //set to 7 for old configuration 5 #define MAXREAD 65536 //64kB wiznet buffer 8 6 9 7 #include <stdlib.h> … … 23 21 #include <netinet/tcp.h> 24 22 #include <pthread.h> 25 #include <sched.h> 23 #include <sched.h> 26 24 27 25 #include "EventBuilder.h" … … 37 35 }; 38 36 39 40 void *gi_procPtr; 41 37 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 38 #define MAX_LEN 256*1024 // size of read-buffer per socket 42 39 43 40 //#define nanosleep(x,y) … … 48 45 //extern int runFinish (uint32_t runnr); 49 46 50 51 52 extern void * runStart (uint32_t irun, RUN_HEAD * runhd, size_t len);53 54 extern int runEnd (uint32_t, void * runPtr );55 56 47 extern void factOut (int severity, int err, char *message); 57 48 … … 63 54 extern void factStatNew (EVT_STAT gi); 64 55 65 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event, 66 int mboard); 56 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); 67 57 68 58 extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 69 int 16_t mboard, void* buffer);59 int8_t * buffer); 70 60 71 61 extern void debugHead (int i, int j, void *buf); … … 79 69 80 70 71 72 73 81 74 int g_maxProc; 82 75 int g_maxSize; 83 int gi_maxSize =0;76 int gi_maxSize; 84 77 int gi_maxProc; 85 uint32_t gi_actRun;86 void *gi_runPtr;87 78 88 79 uint g_actTime; … … 165 156 int32_t skip; //number of bytes skipped before start of event 166 157 158 int errCnt; //how often connect failed since last successful 167 159 int sockStat; //-1 if socket not yet connected , 99 if not exist 168 160 int socket; //contains the sockets … … 239 231 socklen_t optlen = sizeof (optval); 240 232 233 234 if (sid % 7 >= NUMSOCK) { 235 //this is a not used socket, so do nothing ... 236 rd->sockStat = 77; 237 rd->rBuf == NULL ; 238 return 0; 239 } 240 241 241 if (rd->sockStat == 0) { //close socket if open 242 242 j = close (rd->socket); … … 250 250 } 251 251 252 rd->sockStat = 99; 252 253 253 254 if (flag < 0) { … … 257 258 #endif 258 259 rd->rBuf = NULL; 259 rd->sockStat = 99;260 260 return 0; 261 261 } … … 309 309 310 310 311 312 311 snprintf (str, MXSTR, "Successfully generated socket %d ", sid); 313 312 factOut (kInfo, 773, str); 314 313 rd->sockStat = -1; //try to (re)open socket 314 rd->errCnt = 0; 315 315 return 0; 316 316 … … 458 458 if (runCtrl[k].runId == runID) { 459 459 // if (runCtrl[k].procId > 0) { //run is closed -> reject 460 // snprintf (str, MXSTR, "skip event since run %d already closed", runID);460 // snprintf (str, MXSTR, "skip event since run %d finished", runID); 461 461 // factOut (kInfo, 931, str); 462 462 // return -21; … … 506 506 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec; 507 507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed 508 // runCtrl[evFree].lastTime = 0; 508 509 509 510 runTail[evFree].nEventsOk = … … 550 551 } 551 552 552 // if (gi_maxSize > 0) { 553 // mBuffer[i].buffer = malloc (gi_maxSize); 554 // if (mBuffer[i].buffer == NULL) { 555 // snprintf (str, MXSTR, "malloc buffer failed for event %d", evID); 556 // factOut (kError, 882, str); 557 // free (mBuffer[i].FADhead); 558 // mBuffer[i].FADhead = NULL; 559 // free (mBuffer[i].fEvent); 560 // mBuffer[i].fEvent = NULL; 561 // return -32; 562 // } 563 // } else { 564 // mBuffer[i].buffer = NULL; 565 // } 566 553 mBuffer[i].buffer = malloc (gi_maxSize); 554 if (mBuffer[i].buffer == NULL) { 555 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID); 556 factOut (kError, 882, str); 557 free (mBuffer[i].FADhead); 558 mBuffer[i].FADhead = NULL; 559 free (mBuffer[i].fEvent); 560 mBuffer[i].fEvent = NULL; 561 return -32; 562 } 567 563 //flag all boards as unused 568 564 mBuffer[i].nBoard = 0; … … 647 643 mBuffer[i].FADhead = NULL; 648 644 649 // if (gi_maxSize > 0) { 650 // free (mBuffer[i].buffer); 651 // mBuffer[i].buffer = NULL; 652 // } 645 free (mBuffer[i].buffer); 646 mBuffer[i].buffer = NULL; 653 647 654 648 headmem = NBOARDS * sizeof (PEVNT_HEADER); … … 729 723 int goodhed = 0; 730 724 731 char str[MXSTR];732 725 int sockDef[NBOARDS]; //internal state of sockets 733 726 int jrdx; … … 777 770 for (i = 0; i < NBOARDS; i++) 778 771 sockDef[i] = 0; 779 780 gi_actRun = 0;781 782 772 783 773 START: … … 794 784 numok = numok2 = 0; 795 785 796 int cntsock = 8 - NUMSOCK ;786 int cntsock = 8 - NUMSOCK ; 797 787 798 788 if (gi_resetS > 0) { … … 824 814 gj.evtSkip = gj.evtWrite = gj.evtErr = 0; 825 815 826 int b, 816 int b,p; 827 817 for (b = 0; b < NBOARDS; b++) 828 818 gj.badRoi[b] = 0; … … 868 858 869 859 for (p = p0 + 1; p < p0 + 8; p++) { 870 if (p < NUMSOCK) 871 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket 860 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket 872 861 k++; 873 862 } … … 890 879 891 880 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read 892 b = i / 7; 893 p = i % 7; 894 895 if (p >= NUMSOCK) {; 896 } else { 897 if (sockDef[b] > 0) 898 s0 = +1; 899 else 900 s0 = -1; 901 902 if (rd[i].sockStat < 0) { //try to connect if not yet done 881 b = i / 7 ; 882 p = i % 7 ; 883 884 if ( p >= NUMSOCK) { ; } 885 else { 886 if (sockDef[b] > 0) 887 s0 = +1; 888 else 889 s0 = -1; 890 891 if (rd[i].sockStat < 0) { //try to connect if not yet done 892 if (rd[i].sockStat == -1) { 903 893 rd[i].sockStat = connect (rd[i].socket, 904 (struct sockaddr *) &rd[i].SockAddr, 905 sizeof (rd[i].SockAddr)); 906 if (rd[i].sockStat == 0) { //successfull ==> 907 if (sockDef[b] > 0) { 908 rd[i].bufTyp = 0; // expect a header 909 rd[i].bufLen = frst_len; // max size to read at begining 894 (struct sockaddr *) &rd[i].SockAddr, 895 sizeof (rd[i].SockAddr)); 896 if (rd[i].sockStat == -1) { 897 rd[i].errCnt++ ; 898 // if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ; 899 // else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ; 900 // else rd[i].sockStat = 10000 ; 901 } 902 //printf("try to connect %d -> %d\n",i,rd[i].sockStat); 903 904 } 905 if (rd[i].sockStat < -1 ) { 906 rd[i].sockStat++ ; 907 } 908 if (rd[i].sockStat == 0) { //successfull ==> 909 if (sockDef[b] > 0) { 910 rd[i].bufTyp = 0; // expect a header 911 rd[i].bufLen = frst_len; // max size to read at begining 912 } else { 913 rd[i].bufTyp = -1; // full data to be skipped 914 rd[i].bufLen = MAX_LEN; //huge for skipping 915 } 916 rd[i].bufPos = 0; // no byte read so far 917 rd[i].skip = 0; // start empty 918 // gi_NumConnect[b]++; 919 gi_NumConnect[b] += cntsock ; 920 921 gi.numConn[b]++; 922 gj.numConn[b]++; 923 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]); 924 factOut (kInfo, -1, str); 925 } 926 } 927 928 if (rd[i].sockStat == 0) { //we have a connection ==> try to read 929 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 930 numok++; 931 size_t maxread = rd[i].bufLen ; 932 if (maxread > MAXREAD ) maxread=MAXREAD ; 933 934 jrd = 935 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], 936 maxread, MSG_DONTWAIT); 937 // rd[i].bufLen, MSG_DONTWAIT); 938 939 if (jrd > 0) { 940 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd); 941 #ifdef EVTDEBUG 942 memcpy (&rd[i].xBuf->B[rd[i].bufPos], 943 &rd[i].rBuf->B[rd[i].bufPos], jrd); 944 snprintf (str, MXSTR, 945 "read sock %3d bytes %5d len %5d first %d %d", i, 946 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos], 947 rd[i].rBuf->B[rd[i].bufPos + 1]); 948 factOut (kDebug, 301, str); 949 #endif 950 } 951 952 if (jrd == 0) { //connection has closed ... 953 snprintf (str, MXSTR, "Socket %d closed by FAD", i); 954 factOut (kInfo, 441, str); 955 GenSock (s0, i, 0, NULL, &rd[i]); 956 gi.gotErr[b]++; 957 // gi_NumConnect[b]--; 958 gi_NumConnect[b]-= cntsock ; 959 gi.numConn[b]--; 960 gj.numConn[b]--; 961 962 } else if (jrd < 0) { //did not read anything 963 if (errno != EAGAIN && errno != EWOULDBLOCK) { 964 snprintf (str, MXSTR, "Error Reading from %d | %m", i); 965 factOut (kError, 442, str); 966 gi.gotErr[b]++; 967 } else 968 numok--; //else nothing waiting to be read 969 jrd = 0; 970 } 971 } else { 972 jrd = 0; //did read nothing as requested 973 snprintf (str, MXSTR, "do not read from socket %d %d", i, 974 rd[i].bufLen); 975 factOut (kDebug, 301, str); 976 } 977 978 gi.gotByte[b] += jrd; 979 gj.rateBytes[b] += jrd; 980 981 if (jrd > 0) { 982 numokx++; 983 jrdx += jrd; 984 } 985 986 987 if (rd[i].bufTyp < 0) { // we are skipping this board ... 988 // just do nothing 989 #ifdef EVTDEBUG 990 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd, 991 i); 992 factOut (kInfo, 301, str); 993 #endif 994 995 } else if (rd[i].bufTyp > 0) { // we are reading data ... 996 if (jrd < rd[i].bufLen) { //not yet all read 997 rd[i].bufPos += jrd; //==> prepare for continuation 998 rd[i].bufLen -= jrd; 999 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 1000 } else { //full dataset read 1001 rd[i].bufLen = 0; 1002 rd[i].bufPos = rd[i].fadLen; 1003 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0] 1004 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) { 1005 gi.evtErr++; 1006 snprintf (str, MXSTR, 1007 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 1008 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], 1009 rd[i].rBuf->B[1], 1010 rd[i].rBuf->B[rd[i].bufPos - 2], 1011 rd[i].rBuf->B[rd[i].bufPos - 1]); 1012 factOut (kError, 301, str); 1013 goto EndBuf; 1014 1015 #ifdef EVTDEBUG 910 1016 } else { 911 rd[i].bufTyp = -1; // full data to be skipped912 rd[i].bufLen = MAX_LEN; //huge for skipping913 }914 rd[i].bufPos = 0; // no byte read so far915 rd[i].skip = 0; // start empty916 gi_NumConnect[b] += cntsock;917 918 gi.numConn[b]++;919 gj.numConn[b]++;920 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);921 factOut (kInfo, -1, str);922 }923 }924 925 if (rd[i].sockStat == 0) { //we have a connection ==> try to read926 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]927 numok++;928 size_t maxread = rd[i].bufLen;929 if (maxread > MAXREAD)930 maxread = MAXREAD;931 932 jrd =933 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],934 maxread, MSG_DONTWAIT);935 936 if (jrd > 0) {937 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);938 #ifdef EVTDEBUG939 memcpy (&rd[i].xBuf->B[rd[i].bufPos],940 &rd[i].rBuf->B[rd[i].bufPos], jrd);941 1017 snprintf (str, MXSTR, 942 "read sock %3d bytes %5d len %5d first %d %d", 943 i, jrd, rd[i].bufLen, 944 rd[i].rBuf->B[rd[i].bufPos], 945 rd[i].rBuf->B[rd[i].bufPos + 1]); 1018 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 1019 i, rd[i].fadLen, rd[i].rBuf->B[0], 1020 rd[i].rBuf->B[1], start.B[1], start.B[0], 1021 rd[i].rBuf->B[rd[i].bufPos - 2], 1022 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1], 1023 stop.B[0]); 946 1024 factOut (kDebug, 301, str); 947 1025 #endif 948 1026 } 949 1027 950 if (jrd == 0) { //connection has closed ... 951 snprintf (str, MXSTR, "Socket %d closed by FAD", i); 952 factOut (kInfo, 441, str); 953 GenSock (s0, i, 0, NULL, &rd[i]); 954 gi.gotErr[b]++; 955 gi_NumConnect[b] -= cntsock; 956 gi.numConn[b]--; 957 gj.numConn[b]--; 958 959 } else if (jrd < 0) { //did not read anything 960 if (errno != EAGAIN && errno != EWOULDBLOCK) { 961 snprintf (str, MXSTR, "Error Reading from %d | %m", 962 i); 963 factOut (kError, 442, str); 964 gi.gotErr[b]++; 965 } else 966 numok--; //else nothing waiting to be read 967 jrd = 0; 968 } 969 } else { 970 jrd = 0; //did read nothing as requested 971 snprintf (str, MXSTR, "do not read from socket %d %d", i, 972 rd[i].bufLen); 973 factOut (kDebug, 301, str); 974 } 975 976 gi.gotByte[b] += jrd; 977 gj.rateBytes[b] += jrd; 978 979 if (jrd > 0) { 980 numokx++; 981 jrdx += jrd; 982 } 983 984 985 if (rd[i].bufTyp < 0) { // we are skipping this board ... 986 // just do nothing 1028 if (jrd > 0) 1029 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 1030 1031 //we have a complete buffer, copy to WORK area 1032 int jr; 1033 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]); 1034 for (jr = 0; jr < 9; jr++) { 1035 roi[jr] = 1036 ntohs (rd[i]. 1037 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]); 1038 } 1039 //get index into mBuffer for this event (create if needed) 1040 1041 int actid; 1042 if (g_useFTM > 0) 1043 actid = rd[i].evtID; 1044 else 1045 actid = rd[i].ftmID; 1046 1047 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i, 1048 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, 1049 rd[i].evtID); 1050 1051 if (evID < -1000) { 1052 goto EndBuf; //not usable board/event/run --> skip it 1053 } 1054 if (evID < 0) { //no space left, retry later 987 1055 #ifdef EVTDEBUG 988 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd, 989 i); 990 factOut (kInfo, 301, str); 1056 if (rd[i].bufLen != 0) { 1057 snprintf (str, MXSTR, "something screwed up"); 1058 factOut (kFatal, 1, str); 1059 } 991 1060 #endif 992 993 } else if (rd[i].bufTyp > 0) { // we are reading data ... 994 if (jrd < rd[i].bufLen) { //not yet all read 995 rd[i].bufPos += jrd; //==> prepare for continuation 996 rd[i].bufLen -= jrd; 997 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 998 } else { //full dataset read 999 rd[i].bufLen = 0; 1000 rd[i].bufPos = rd[i].fadLen; 1001 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0] 1002 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) { 1003 gi.evtErr++; 1004 snprintf (str, MXSTR, 1005 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 1006 i, rd[i].fadLen, rd[i].evtID, 1007 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1008 rd[i].rBuf->B[rd[i].bufPos - 2], 1009 rd[i].rBuf->B[rd[i].bufPos - 1]); 1010 factOut (kError, 301, str); 1011 goto EndBuf; 1061 xwait.tv_sec = 0; 1062 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1063 nanosleep (&xwait, NULL); 1064 goto EndBuf1; //hope there is free space next round 1065 } 1066 //we have a valid entry in mBuffer[]; fill it 1012 1067 1013 1068 #ifdef EVTDEBUG 1014 } else { 1015 snprintf (str, MXSTR, 1016 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 1017 i, rd[i].fadLen, rd[i].rBuf->B[0], 1018 rd[i].rBuf->B[1], start.B[1], start.B[0], 1019 rd[i].rBuf->B[rd[i].bufPos - 2], 1020 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1], 1021 stop.B[0]); 1022 factOut (kDebug, 301, str); 1023 #endif 1024 } 1025 1026 if (jrd > 0) 1027 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 1028 1029 //we have a complete buffer, copy to WORK area 1030 int jr; 1031 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]); 1032 for (jr = 0; jr < 9; jr++) { 1033 roi[jr] = 1034 ntohs (rd[i].rBuf->S[head_len / 2 + 2 + 1035 jr * (roi[0] + 4)]); 1036 } 1037 //get index into mBuffer for this event (create if needed) 1038 1039 int actid; 1040 if (g_useFTM > 0) 1041 actid = rd[i].evtID; 1042 else 1043 actid = rd[i].ftmID; 1044 1045 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i, 1046 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, 1047 rd[i].evtID); 1048 1049 if (evID < -1000) { 1050 goto EndBuf; //not usable board/event/run --> skip it 1051 } 1052 if (evID < 0) { //no space left, retry later 1053 #ifdef EVTDEBUG 1054 if (rd[i].bufLen != 0) { 1055 snprintf (str, MXSTR, "something screwed up"); 1069 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], 1070 rd[i].fadLen); 1071 if (xchk != 0) { 1072 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d", 1073 xchk, rd[i].fadLen, i); 1074 factOut (kFatal, 1, str); 1075 1076 uint iq; 1077 for (iq = 0; iq < rd[i].fadLen; iq++) { 1078 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) { 1079 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq, 1080 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]); 1056 1081 factOut (kFatal, 1, str); 1057 1082 } 1083 } 1084 } 1058 1085 #endif 1059 xwait.tv_sec = 0; 1060 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1061 nanosleep (&xwait, NULL); 1062 goto EndBuf1; //hope there is free space next round 1063 } 1064 //we have a valid entry in mBuffer[]; fill it 1065 1066 #ifdef EVTDEBUG 1067 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], 1068 rd[i].fadLen); 1069 if (xchk != 0) { 1070 snprintf (str, MXSTR, 1071 "ERROR OVERWRITE %d %d on port %d", xchk, 1072 rd[i].fadLen, i); 1073 factOut (kFatal, 1, str); 1074 1075 uint iq; 1076 for (iq = 0; iq < rd[i].fadLen; iq++) { 1077 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) { 1078 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, 1079 iq, rd[i].rBuf->B[iq], 1080 rd[i].xBuf->B[iq]); 1081 factOut (kFatal, 1, str); 1086 int qncpy = 0; 1087 boardId = b; 1088 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1089 int fadCrate = fadBoard / 256; 1090 if (boardId != (fadCrate * 10 + fadBoard % 256)) { 1091 snprintf (str, MXSTR, "wrong Board ID %d %d %d", 1092 fadCrate, fadBoard % 256, boardId); 1093 factOut (kWarn, 301, str); 1094 } 1095 if (mBuffer[evID].board[boardId] != -1) { 1096 snprintf (str, MXSTR, 1097 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 1098 evID, boardId, i, rd[i].fadLen, 1099 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1100 rd[i].rBuf->B[rd[i].bufPos - 2], 1101 rd[i].rBuf->B[rd[i].bufPos - 1]); 1102 factOut (kWarn, 501, str); 1103 goto EndBuf; //--> skip Board 1104 } 1105 1106 int iDx = evtIdx[evID]; //index into evtCtrl 1107 1108 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag, 1109 &rd[i].rBuf->S[0], head_len); 1110 qncpy += head_len; 1111 1112 src = head_len / 2; 1113 for (px = 0; px < 9; px++) { //different sort in FAD board..... 1114 for (drs = 0; drs < 4; drs++) { 1115 pixH = ntohs (rd[i].rBuf->S[src++]); // ID 1116 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell 1117 pixR = ntohs (rd[i].rBuf->S[src++]); // roi 1118 //here we should check if pixH is correct .... 1119 1120 pixS = boardId * 36 + drs * 9 + px; 1121 src++; 1122 1123 1124 mBuffer[evID].fEvent->StartPix[pixS] = pixC; 1125 dest = pixS * roi[0]; 1126 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1127 &rd[i].rBuf->S[src], roi[0] * 2); 1128 qncpy += roi[0] * 2; 1129 src += pixR; 1130 1131 if (px == 8) { 1132 tmS = boardId * 4 + drs; 1133 if (pixR > roi[0]) { //and we have additional TM info 1134 dest = tmS * roi[0] + NPIX * roi[0]; 1135 int srcT = src - roi[0]; 1136 mBuffer[evID].fEvent->StartTM[tmS] = 1137 (pixC + pixR - roi[0]) % 1024; 1138 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1139 &rd[i].rBuf->S[srcT], roi[0] * 2); 1140 qncpy += roi[0] * 2; 1141 } else { 1142 mBuffer[evID].fEvent->StartTM[tmS] = -1; 1082 1143 } 1083 1144 } 1084 1145 } 1085 #endif 1086 int qncpy = 0; 1087 boardId = b; 1088 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1089 int fadCrate = fadBoard / 256; 1090 if (boardId != (fadCrate * 10 + fadBoard % 256)) { 1091 snprintf (str, MXSTR, "wrong Board ID %d %d %d", 1092 fadCrate, fadBoard % 256, boardId); 1093 factOut (kWarn, 301, str); 1094 } 1095 if (mBuffer[evID].board[boardId] != -1) { 1096 snprintf (str, MXSTR, 1097 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 1098 evID, boardId, i, rd[i].fadLen, 1099 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1100 rd[i].rBuf->B[rd[i].bufPos - 2], 1101 rd[i].rBuf->B[rd[i].bufPos - 1]); 1102 factOut (kWarn, 501, str); 1103 goto EndBuf; //--> skip Board 1104 } 1105 1106 int iDx = evtIdx[evID]; //index into evtCtrl 1107 1108 memcpy (&mBuffer[evID].FADhead[boardId]. 1109 start_package_flag, &rd[i].rBuf->S[0], head_len); 1110 qncpy += head_len; 1111 1112 src = head_len / 2; 1113 for (px = 0; px < 9; px++) { //different sort in FAD board..... 1114 for (drs = 0; drs < 4; drs++) { 1115 pixH = ntohs (rd[i].rBuf->S[src++]); // ID 1116 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell 1117 pixR = ntohs (rd[i].rBuf->S[src++]); // roi 1118 //here we should check if pixH is correct .... 1119 1120 pixS = boardId * 36 + drs * 9 + px; 1121 src++; 1122 1123 1124 mBuffer[evID].fEvent->StartPix[pixS] = pixC; 1125 dest = pixS * roi[0]; 1126 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1127 &rd[i].rBuf->S[src], roi[0] * 2); 1128 qncpy += roi[0] * 2; 1129 src += pixR; 1130 1131 if (px == 8) { 1132 tmS = boardId * 4 + drs; 1133 if (pixR > roi[0]) { //and we have additional TM info 1134 dest = tmS * roi[0] + NPIX * roi[0]; 1135 int srcT = src - roi[0]; 1136 mBuffer[evID].fEvent->StartTM[tmS] = 1137 (pixC + pixR - roi[0]) % 1024; 1138 memcpy (&mBuffer[evID].fEvent-> 1139 Adc_Data[dest], &rd[i].rBuf->S[srcT], 1140 roi[0] * 2); 1141 qncpy += roi[0] * 2; 1142 } else { 1143 mBuffer[evID].fEvent->StartTM[tmS] = -1; 1146 } // now we have stored a new board contents into Event structure 1147 1148 mBuffer[evID].fEvent->NumBoards++; 1149 mBuffer[evID].board[boardId] = boardId; 1150 evtCtrl.evtStat[iDx]++; 1151 evtCtrl.pcTime[iDx] = g_actTime; 1152 1153 if (++mBuffer[evID].nBoard >= actBoards) { 1154 int qnrun = 0; 1155 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ??? 1156 actrun = mBuffer[evID].runNum; 1157 int ir; 1158 for (ir = 0; ir < MAX_RUN; ir++) { 1159 qnrun++; 1160 if (runCtrl[ir].runId == actrun) { 1161 if (++runCtrl[ir].lastEvt == 0) { 1162 gotNewRun (actrun, mBuffer[evID].FADhead); 1163 snprintf (str, MXSTR, "gotNewRun %d (ev %d)", 1164 mBuffer[evID].runNum, 1165 mBuffer[evID].evNum); 1166 factOut (kInfo, 1, str); 1167 break; 1144 1168 } 1145 1169 } 1146 1170 } 1147 } // now we have stored a new board contents into Event structure 1148 1149 mBuffer[evID].fEvent->NumBoards++; 1150 mBuffer[evID].board[boardId] = boardId; 1151 evtCtrl.evtStat[iDx]++; 1152 evtCtrl.pcTime[iDx] = g_actTime; 1153 1154 if (++mBuffer[evID].nBoard >= actBoards) { 1155 int qnrun = 0; 1156 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ??? 1157 actrun = mBuffer[evID].runNum; 1158 int ir; 1159 for (ir = 0; ir < MAX_RUN; ir++) { 1160 qnrun++; 1161 if (runCtrl[ir].runId == actrun) { 1162 if (++runCtrl[ir].lastEvt == 0) { 1163 gotNewRun (actrun, mBuffer[evID].FADhead); 1164 snprintf (str, MXSTR, 1165 "gotNewRun %d (ev %d)", 1166 mBuffer[evID].runNum, 1167 mBuffer[evID].evNum); 1168 factOut (kInfo, 1, str); 1169 break; 1170 } 1171 } 1172 } 1173 } 1171 } 1172 snprintf (str, MXSTR, 1173 "%5d complete event roi %4d roiTM %d cpy %8d %5d", 1174 mBuffer[evID].evNum, roi[0], roi[8] - roi[0], 1175 qncpy, qnrun); 1176 factOut (kDebug, -1, str); 1177 1178 //complete event read ---> flag for next processing 1179 evtCtrl.evtStat[iDx] = 99; 1180 gi.evtTot++; 1181 } 1182 1183 EndBuf: 1184 rd[i].bufTyp = 0; //ready to read next header 1185 rd[i].bufLen = frst_len; 1186 rd[i].bufPos = 0; 1187 EndBuf1: 1188 ; 1189 } 1190 1191 } else { //we are reading event header 1192 rd[i].bufPos += jrd; 1193 rd[i].bufLen -= jrd; 1194 if (rd[i].bufPos >= minLen) { //sufficient data to take action 1195 //check if startflag correct; else shift block .... 1196 for (k = 0; k < rd[i].bufPos - 1; k++) { 1197 if (rd[i].rBuf->B[k] == start.B[1] 1198 && rd[i].rBuf->B[k + 1] == start.B[0]) 1199 break; 1200 } 1201 rd[i].skip += k; 1202 1203 if (k >= rd[i].bufPos - 1) { //no start of header found 1204 rd[i].bufPos = 0; 1205 rd[i].bufLen = head_len; 1206 } else if (k > 0) { 1207 rd[i].bufPos -= k; 1208 rd[i].bufLen += k; 1209 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 1210 rd[i].bufPos); 1211 #ifdef EVTDEBUG 1212 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], 1213 rd[i].bufPos); 1214 #endif 1215 } 1216 if (rd[i].bufPos >= minLen) { 1217 if (rd[i].skip > 0) { 1218 snprintf (str, MXSTR, "skipped %d bytes on port%d", 1219 rd[i].skip, i); 1220 factOut (kInfo, 666, str); 1221 rd[i].skip = 0; 1222 } 1223 goodhed++; 1224 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2; 1225 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]); 1226 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]); 1227 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt) 1228 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt) 1229 rd[i].runID = ntohl (rd[i].rBuf->I[11]); 1230 rd[i].bufTyp = 1; //ready to read full record 1231 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1232 1233 int fadboard = ntohs (rd[i].rBuf->S[12]); 1234 int fadcrate = fadboard / 256; 1235 fadboard = (fadcrate * 10 + fadboard % 256); 1236 #ifdef EVTDEBUG 1237 snprintf (str, MXSTR, 1238 "sk %3d head: %5d %5d %5d %10d %4d %6d", i, 1239 rd[i].fadLen, rd[i].evtID, rd[i].ftmID, 1240 rd[i].runID, fadboard, jrd); 1241 factOut (kDebug, 1, str); 1242 #endif 1243 1244 if (rd[i].runID == 0) 1245 rd[i].runID = gi_myRun; 1246 1247 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) { 1174 1248 snprintf (str, MXSTR, 1175 "%5d complete event roi %4d roiTM %d cpy %8d %5d", 1176 mBuffer[evID].evNum, roi[0], 1177 roi[8] - roi[0], qncpy, qnrun); 1178 factOut (kDebug, -1, str); 1179 1180 //complete event read ---> flag for next processing 1181 evtCtrl.evtStat[iDx] = 99; 1182 gi.evtTot++; 1249 "illegal event-length on port %d", i); 1250 factOut (kFatal, 881, str); 1251 rd[i].bufLen = 100000; //? 1183 1252 } 1184 1185 EndBuf: 1186 rd[i].bufTyp = 0; //ready to read next header 1187 rd[i].bufLen = frst_len; 1188 rd[i].bufPos = 0; 1189 EndBuf1: 1190 ; 1191 } 1192 1193 } else { //we are reading event header 1194 rd[i].bufPos += jrd; 1195 rd[i].bufLen -= jrd; 1196 if (rd[i].bufPos >= minLen) { //sufficient data to take action 1197 //check if startflag correct; else shift block .... 1198 for (k = 0; k < rd[i].bufPos - 1; k++) { 1199 if (rd[i].rBuf->B[k] == start.B[1] 1200 && rd[i].rBuf->B[k + 1] == start.B[0]) 1201 break; 1202 } 1203 rd[i].skip += k; 1204 1205 if (k >= rd[i].bufPos - 1) { //no start of header found 1206 rd[i].bufPos = 0; 1207 rd[i].bufLen = head_len; 1208 } else if (k > 0) { 1209 rd[i].bufPos -= k; 1210 rd[i].bufLen += k; 1211 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 1212 rd[i].bufPos); 1213 #ifdef EVTDEBUG 1214 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], 1215 rd[i].bufPos); 1216 #endif 1217 } 1218 if (rd[i].bufPos >= minLen) { 1219 if (rd[i].skip > 0) { 1220 snprintf (str, MXSTR, "skipped %d bytes on port%d", 1221 rd[i].skip, i); 1222 factOut (kInfo, 666, str); 1223 rd[i].skip = 0; 1224 } 1225 goodhed++; 1226 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2; 1227 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]); 1228 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]); 1229 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt) 1230 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt) 1231 rd[i].runID = ntohl (rd[i].rBuf->I[11]); 1232 rd[i].bufTyp = 1; //ready to read full record 1233 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1234 1235 int fadboard = ntohs (rd[i].rBuf->S[12]); 1236 int fadcrate = fadboard / 256; 1237 fadboard = (fadcrate * 10 + fadboard % 256); 1238 #ifdef EVTDEBUG 1239 snprintf (str, MXSTR, 1240 "sk %3d head: %5d %5d %5d %10d %4d %6d", i, 1241 rd[i].fadLen, rd[i].evtID, rd[i].ftmID, 1242 rd[i].runID, fadboard, jrd); 1243 factOut (kDebug, 1, str); 1244 #endif 1245 1246 if (rd[i].runID == 0) 1247 rd[i].runID = gi_myRun; 1248 1249 if (rd[i].bufLen <= head_len 1250 || rd[i].bufLen > MAX_LEN) { 1251 snprintf (str, MXSTR, 1252 "illegal event-length on port %d", i); 1253 factOut (kFatal, 881, str); 1254 rd[i].bufLen = 100000; //? 1255 } 1256 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1257 debugHead (i, fadBoard, rd[i].rBuf); 1258 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event 1259 } else { 1260 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1261 } 1253 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1254 debugHead (i, fadBoard, rd[i].rBuf); 1255 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event 1262 1256 } else { 1263 1257 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1264 1258 } 1265 1266 } //end interpreting last read 1267 } 1259 } else { 1260 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1261 } 1262 1263 } //end interpreting last read 1264 } 1268 1265 } //end of successful read anything 1269 1266 } //finished trying to read all sockets … … 1299 1296 && evtCtrl.pcTime[k0] < g_actTime - 10) { 1300 1297 int id = evtCtrl.evtBuf[k0]; 1301 int ic,ib,ik,jb;1302 1298 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d", 1303 1299 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, … … 1305 1301 factOut (kWarn, 601, str); 1306 1302 1303 int ik,ib,jb; 1307 1304 ik=0; 1308 1305 for (ib=0; ib<NBOARDS; ib++) { … … 1313 1310 jb = mBuffer[id].board[ib]; 1314 1311 if ( jb<0 ) { 1315 snprintf (&str[ik], MXSTR, "."); 1316 ik++; 1312 if (gi_NumConnect[b] >0 ) { 1313 snprintf (&str[ik], MXSTR, "."); 1314 } else { 1315 snprintf (&str[ik], MXSTR, "x"); 1316 } 1317 1317 } else { 1318 1318 snprintf (&str[ik], MXSTR, "%d",jb%10); 1319 ik+=1;1320 1319 } 1320 ik++; 1321 1321 } 1322 1322 snprintf (&str[ik], MXSTR, "'"); 1323 1323 factOut (kWarn, 601, str); 1324 1324 1325 evtCtrl.evtStat[k0] = 9 3; //timeout for incomplete events1325 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events 1326 1326 gi.evtSkp++; 1327 1327 gi.evtTot++; … … 1418 1418 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket 1419 1419 if (i % 7 == 0) { 1420 gi_NumConnect[i / 7] -= cntsock; 1420 // gi_NumConnect[i / 7]--; 1421 gi_NumConnect[i / 7]-= cntsock ; 1421 1422 gi.numConn[i / 7]--; 1422 1423 gj.numConn[i / 7]--; … … 1522 1523 subProc (void *thrid) 1523 1524 { 1524 char str[MXSTR];1525 1525 int threadID, status, numWait, numProc, kd, k1, k0, k, jret; 1526 1526 struct timespec xwait; 1527 1528 int32_t cntr ; 1527 1529 1528 1530 threadID = (int) thrid; … … 1546 1548 } else { 1547 1549 int id = evtCtrl.evtBuf[k0]; 1550 1551 1548 1552 jret = 1549 1553 subProcEvt (threadID, mBuffer[id].FADhead, 1550 mBuffer[id].fEvent, mBuffer[id].mBoard, gi_runPtr ); 1554 mBuffer[id].fEvent, mBuffer[id].buffer); 1555 1551 1556 1552 1557 if (jret <= threadID) { … … 1592 1597 { 1593 1598 /* *** main loop processing file, including SW-trigger */ 1594 int numProc, numWait , iskip;1599 int numProc, numWait; 1595 1600 int k, status, j; 1596 1601 struct timespec xwait; … … 1650 1655 } else { 1651 1656 1657 //-------- it is better to open the run already here, so call can be used to initialize 1658 //-------- buffers etc. needed to interprete run (e.g. DRS calibration) 1652 1659 int id = evtCtrl.evtBuf[k0]; 1653 1660 uint32_t irun = mBuffer[id].runNum; 1654 1661 int32_t ievt = mBuffer[id].evNum; 1655 iskip = 0 ; 1656 if (irun != gi_actRun) { // we should open a new run ? 1657 if (irun < gi_actRun) { // very old event; skip it 1658 iskip = -1 ; // but anyhow checkEvent() will be done 1662 if (runCtrl[lastRun].runId == irun) { 1663 j = lastRun; 1664 } else { 1665 //check which fileID to use (or open if needed) 1666 for (j = 0; j < MAX_RUN; j++) { 1667 if (runCtrl[j].runId == irun) 1668 break; 1669 } 1670 if (j >= MAX_RUN) { 1659 1671 snprintf (str, MXSTR, 1660 "P event %d from old run???", ievt,irun); 1661 factOut (kWarn, 901, str); 1662 } else if (gi_actRun != 0) { // but there is still old one open ? 1663 1664 // // // // // // //MUST WAIT UNTIL PENDING OLD EVENTS PROCESSED ... 1665 // loop over all waiting events; and decide what to do .... 1666 int q,q0 ; 1667 int qcnt=0; 1668 for (q=k1; q < (k1 + kd); q++) { 1669 q0 = q % (MAX_EVT * MAX_RUN); 1670 if (evtCtrl.evtStat[q0] > 0 1671 && evtCtrl.evtStat[q0] < 9000) { 1672 //this event is still active in the queue 1673 int qd = evtCtrl.evtBuf[q0]; 1674 uint32_t qrun = mBuffer[qd].runNum; 1675 if (qrun < irun ) { 1676 if (evtCtrl.evtStat[q0] < 92) { 1677 //incomplete old event -> reduce timeout 1678 evtCtrl.pcTime[q0] = 0 ; 1679 } 1680 qcnt++ ; 1681 } 1682 } 1672 "P error: can not find run %d for event %d in %d", 1673 irun, ievt, id); 1674 factOut (kFatal, 901, str); 1675 } 1676 lastRun = j; 1677 } 1678 1679 if (runCtrl[j].fileId < 0) { 1680 //---- we need to open a new run ==> make sure all older runs are 1681 //---- finished and marked to be closed .... 1682 int j1; 1683 for (j1 = 0; j1 < MAX_RUN; j1++) { 1684 if (runCtrl[j1].fileId == 0) { 1685 runCtrl[j1].procId = 2; //--> do no longer accept events for processing 1686 //---- problem: processing still going on ==> must wait for closing .... 1687 snprintf (str, MXSTR, 1688 "P finish run since new one opened %d", 1689 runCtrl[j1].runId); 1690 runFinish1 (runCtrl[j1].runId); 1683 1691 } 1684 if (qcnt == 0 ) { 1685 runEnd (gi_actRun, gi_runPtr ); 1686 snprintf (str, MXSTR, 1687 "P run %d ended", gi_actRun); 1688 factOut (kWarn, 901, str); 1689 gi_actRun = 0; //there are no events from old runs left 1690 } 1691 } 1692 1693 1694 if (gi_actRun == 0) { // now we can try to open new run 1695 for (j = 0; j < MAX_RUN; j++) { 1696 if (runCtrl[j].runId == irun) 1697 break; 1698 } 1699 if (j >= MAX_RUN) { 1700 snprintf (str, MXSTR, 1701 "P error: can not find run %d for event %d in %d", 1702 irun, ievt, id); 1703 factOut (kFatal, 901, str); 1704 j = 0; 1705 } 1706 1707 if (runCtrl[j].fileId == 0) { 1708 snprintf (str, MXSTR, 1709 "P run %d is already open ???", irun); 1710 factOut (kWarn, 901, str); 1711 } else if (runCtrl[j].fileId > 0) { 1712 snprintf (str, MXSTR, 1713 "P run %d is already closed ???", irun); 1714 factOut (kWarn, 901, str); 1715 } else { // now open new run 1716 1717 actRun.Version = 1; 1718 actRun.RunType = -1; //to be adapted 1719 1720 actRun.Nroi = runCtrl[j].roi0; 1721 actRun.NroiTM = runCtrl[j].roi8; 1722 if (actRun.Nroi == actRun.NroiTM) 1723 actRun.NroiTM = 0; 1724 actRun.RunTime = runCtrl[j].firstTime; 1725 actRun.RunUsec = runCtrl[j].firstTime; 1726 actRun.NBoard = NBOARDS; 1727 actRun.NPix = NPIX; 1728 actRun.NTm = NTMARK; 1729 actRun.Nroi = mBuffer[id].nRoi; 1730 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1731 NBOARDS * sizeof (PEVNT_HEADER)); 1732 1733 gi_actRun = irun; 1734 gi_runPtr = 1735 runStart (irun, &actRun, sizeof (actRun)); 1736 if (gi_runPtr == NULL) { 1737 snprintf (str, MXSTR, 1738 "P could not start run %d", irun); 1739 factOut (kFatal, 502, str); 1740 } else { 1741 snprintf (str, MXSTR, "P started new run %d", 1742 irun); 1743 factOut (kInfo, -1, str); 1744 } 1745 1746 runCtrl[j].fileHd = 1747 runOpen (irun, &actRun, sizeof (actRun)); 1748 if (runCtrl[j].fileHd == NULL) { 1749 snprintf (str, MXSTR, 1750 "P could not open a file for run %d", 1751 irun); 1752 factOut (kError, 502, str); 1753 runCtrl[j].fileId = 91; 1754 runCtrl[j].procId = 91; 1755 } else { 1756 snprintf (str, MXSTR, 1757 "P opened new run_file %d evt %d", irun, 1758 ievt); 1759 factOut (kInfo, -1, str); 1760 runCtrl[j].fileId = 0; 1761 runCtrl[j].procId = 0; 1762 } 1763 } 1764 } 1692 1693 } 1694 1695 actRun.Version = 1; 1696 actRun.RunType = -1; //to be adapted 1697 1698 actRun.Nroi = runCtrl[j].roi0; 1699 actRun.NroiTM = runCtrl[j].roi8; 1700 if (actRun.Nroi == actRun.NroiTM) 1701 actRun.NroiTM = 0; 1702 actRun.RunTime = runCtrl[j].firstTime; 1703 actRun.RunUsec = runCtrl[j].firstTime; 1704 actRun.NBoard = NBOARDS; 1705 actRun.NPix = NPIX; 1706 actRun.NTm = NTMARK; 1707 actRun.Nroi = mBuffer[id].nRoi; 1708 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1709 NBOARDS * sizeof (PEVNT_HEADER)); 1710 1711 runCtrl[j].fileHd = 1712 runOpen (irun, &actRun, sizeof (actRun)); 1713 if (runCtrl[j].fileHd == NULL) { 1714 snprintf (str, MXSTR, 1715 "P could not open a file for run %d", irun); 1716 factOut (kError, 502, str); 1717 runCtrl[j].fileId = 91; 1718 runCtrl[j].procId = 91; 1719 } else { 1720 snprintf (str, MXSTR, "P opened new run_file %d evt %d", 1721 irun, ievt); 1722 factOut (kInfo, -1, str); 1723 runCtrl[j].fileId = 0; 1724 runCtrl[j].procId = 0; 1725 } 1726 1765 1727 } 1766 1767 if (irun == gi_actRun) { 1768 // now we are sure the run is ready and we can start processing the event 1769 id = evtCtrl.evtBuf[k0]; 1728 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 1729 if (runCtrl[j].procId == 0) { 1730 if (runCtrl[j].closeTime < g_actTime 1731 || runCtrl[j].lastTime < g_actTime - 300 1732 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) { 1733 snprintf (str, MXSTR, 1734 "P reached end of run condition for run %d", 1735 irun); 1736 factOut (kInfo, 502, str); 1737 runFinish1 (runCtrl[j].runId); 1738 runCtrl[j].procId = 1; 1739 } 1740 } 1741 if (runCtrl[j].procId != 0) { 1742 snprintf (str, MXSTR, 1743 "P skip event %d because no active run %d", ievt, 1744 irun); 1745 factOut (kDebug, 502, str); 1746 evtCtrl.evtStat[k0] = 9091; 1747 } else { 1748 //-------- 1749 //-------- 1750 id = evtCtrl.evtBuf[k0]; 1770 1751 int itevt = mBuffer[id].trgNum; 1771 1752 int itrg = mBuffer[id].trgTyp; … … 1773 1754 int roiTM = mBuffer[id].nRoiTM; 1774 1755 1775 1756 //make sure unused pixels/tmarks are cleared to zero 1776 1757 if (roiTM == roi) 1777 1758 roiTM = 0; … … 1791 1772 1792 1773 1793 //and set correct event header 1774 //and set correct event header ; also check for consistency in event (not yet) 1794 1775 mBuffer[id].fEvent->Roi = roi; 1795 1776 mBuffer[id].fEvent->RoiTM = roiTM; … … 1803 1784 mBuffer[id].fEvent->SoftTrig = 0; 1804 1785 1805 int mboard = -1; 1786 1806 1787 for (ib = 0; ib < NBOARDS; ib++) { 1807 if (mBuffer[id].board[ib] == -1) { 1788 if (mBuffer[id].board[ib] == -1) { //board is not read 1808 1789 mBuffer[id].FADhead[ib].start_package_flag = 0; 1809 1790 mBuffer[id].fEvent->BoardTime[ib] = 0; … … 1811 1792 mBuffer[id].fEvent->BoardTime[ib] = 1812 1793 ntohl (mBuffer[id].FADhead[ib].time); 1813 if (mboard < 0)1814 mboard = ib;1815 1794 } 1816 1795 } 1817 mBuffer[id].mBoard = mboard; 1796 1818 1797 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead, 1819 mBuffer[id].fEvent , mboard);1798 mBuffer[id].fEvent); 1820 1799 gi.procTot++; 1821 1800 numProc++; 1822 1801 1823 if (i < 0 || iskip < 0) {1824 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped1802 if (i < 0) { 1803 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped 1825 1804 gi.procErr++; 1826 1805 } else { 1827 evtCtrl.evtStat[k0] = 1000; //flag event to be processed1806 evtCtrl.evtStat[k0] = 1000; 1828 1807 runCtrl[j].procEvt++; 1829 if (runCtrl[j].procId != 0)1830 mBuffer[id].toWrite = -1; //flag event to be processed but now written1831 else1832 mBuffer[id].toWrite = 0;1833 1808 } 1834 1809 } … … 1930 1905 1931 1906 int numWrite, numWait; 1932 int k, j , i;1907 int k, j ; 1933 1908 struct timespec xwait; 1934 1909 char str[MXSTR]; … … 1962 1937 int k0 = k % (MAX_EVT * MAX_RUN); 1963 1938 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1964 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9 900) {1939 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) { 1965 1940 1966 1941 if (gi_resetR > 1) { //we must drain the buffer asap … … 2019 1994 snprintf (str, MXSTR, "W opened new run_file %d evt %d", 2020 1995 irun, ievt); 2021 factOut (k Warn, -1, str);1996 factOut (kInfo, -1, str); 2022 1997 runCtrl[j].fileId = 0; 2023 1998 } … … 2025 2000 } 2026 2001 2027 if (runCtrl[j].fileId != 0 || mBuffer[id].toWrite < 0) {2002 if (runCtrl[j].fileId != 0) { 2028 2003 if (runCtrl[j].fileId < 0) { 2029 2004 snprintf (str, MXSTR, … … 2043 2018 gi.wrtErr++; 2044 2019 } else { 2045 i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent, 2020 // snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id])); 2021 // factOut (kInfo, 504, str); 2022 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent, 2046 2023 sizeof (mBuffer[id])); 2047 2024 if (i >= 0) { … … 2053 2030 ievt, irun, k0); 2054 2031 factOut (kDebug, 504, str); 2032 // gj.writEvt++ ; 2055 2033 } else { 2056 2034 snprintf (str, MXSTR, "W error writing event for run %d", … … 2129 2107 2130 2108 runCtrl[j].closeTime = g_actTime - 1; 2131 i = runClose (runCtrl[j].fileHd, &runTail[j],2109 int i = runClose (runCtrl[j].fileHd, &runTail[j], 2132 2110 sizeof (runTail[j])); 2133 2111 if (i < 0) { … … 2182 2160 2183 2161 runCtrl[j].closeTime = g_actTime - 1; 2184 i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));2162 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j])); 2185 2163 int ii = 0; 2186 2164 if (runCtrl[j].closeTime < g_actTime) … … 2238 2216 2239 2217 //prepare for subProcesses 2240 //gi_maxSize = g_maxSize;2241 // if (gi_maxSize <0)2242 gi_maxSize = 0;2218 gi_maxSize = g_maxSize; 2219 if (gi_maxSize <= 0) 2220 gi_maxSize = 1; 2243 2221 2244 2222 gi_maxProc = g_maxProc; … … 2319 2297 int 2320 2298 subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 2321 int 16_t mboard, void* buffer)2299 int8_t * buffer) 2322 2300 { 2323 2301 printf ("called subproc %d\n", threadID); … … 2364 2342 2365 2343 int 2366 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event , int mboard)2344 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) 2367 2345 { 2368 2346 int i = 0; … … 2454 2432 2455 2433 g_maxProc = 20; 2456 g_maxSize = 0;2434 g_maxSize = 30000; 2457 2435 2458 2436 g_runStat = 40;
Note:
See TracChangeset
for help on using the changeset viewer.