Changeset 12091 for trunk/FACT++/src/EventBuilder.c
- Timestamp:
- 09/13/11 14:45:19 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r11895 r12091 2 2 // // // #define EVTDEBUG 3 3 4 #define NUMSOCK 1 //set to 7 for old configuration 5 #define MAXREAD 65536 //64kB wiznet buffer 4 #define NUMSOCK 1 //set to 7 for old configuration 5 #define MAXREAD 65536 //64kB wiznet buffer 6 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 7 #define MAX_LEN 256*1024 // size of read-buffer per socket 6 8 7 9 #include <stdlib.h> … … 21 23 #include <netinet/tcp.h> 22 24 #include <pthread.h> 23 #include <sched.h> 25 #include <sched.h> 24 26 25 27 #include "EventBuilder.h" … … 35 37 }; 36 38 37 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 38 #define MAX_LEN 256*1024 // size of read-buffer per socket 39 40 void *gi_procPtr; 41 39 42 40 43 //#define nanosleep(x,y) … … 45 48 //extern int runFinish (uint32_t runnr); 46 49 50 51 52 extern void * runStart (uint32_t irun, RUN_HEAD * runhd, size_t len); 53 54 extern int runEnd (uint32_t, void * runPtr ); 55 47 56 extern void factOut (int severity, int err, char *message); 48 57 … … 54 63 extern void factStatNew (EVT_STAT gi); 55 64 56 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); 65 extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event, 66 int mboard); 57 67 58 68 extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 59 int 8_t* buffer);69 int16_t mboard, void * buffer); 60 70 61 71 extern void debugHead (int i, int j, void *buf); … … 71 81 int g_maxProc; 72 82 int g_maxSize; 73 int gi_maxSize ;83 int gi_maxSize =0; 74 84 int gi_maxProc; 85 uint32_t gi_actRun; 86 void *gi_runPtr; 75 87 76 88 uint g_actTime; … … 446 458 if (runCtrl[k].runId == runID) { 447 459 // if (runCtrl[k].procId > 0) { //run is closed -> reject 448 // snprintf (str, MXSTR, "skip event since run %d finished", runID);460 // snprintf (str, MXSTR, "skip event since run %d already closed", runID); 449 461 // factOut (kInfo, 931, str); 450 462 // return -21; … … 494 506 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec; 495 507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed 496 // runCtrl[evFree].lastTime = 0;497 508 498 509 runTail[evFree].nEventsOk = … … 539 550 } 540 551 541 mBuffer[i].buffer = malloc (gi_maxSize); 542 if (mBuffer[i].buffer == NULL) { 543 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID); 544 factOut (kError, 882, str); 545 free (mBuffer[i].FADhead); 546 mBuffer[i].FADhead = NULL; 547 free (mBuffer[i].fEvent); 548 mBuffer[i].fEvent = NULL; 549 return -32; 550 } 552 // if (gi_maxSize > 0) { 553 // mBuffer[i].buffer = malloc (gi_maxSize); 554 // if (mBuffer[i].buffer == NULL) { 555 // snprintf (str, MXSTR, "malloc buffer failed for event %d", evID); 556 // factOut (kError, 882, str); 557 // free (mBuffer[i].FADhead); 558 // mBuffer[i].FADhead = NULL; 559 // free (mBuffer[i].fEvent); 560 // mBuffer[i].fEvent = NULL; 561 // return -32; 562 // } 563 // } else { 564 // mBuffer[i].buffer = NULL; 565 // } 566 551 567 //flag all boards as unused 552 568 mBuffer[i].nBoard = 0; … … 631 647 mBuffer[i].FADhead = NULL; 632 648 633 free (mBuffer[i].buffer); 634 mBuffer[i].buffer = NULL; 649 // if (gi_maxSize > 0) { 650 // free (mBuffer[i].buffer); 651 // mBuffer[i].buffer = NULL; 652 // } 635 653 636 654 headmem = NBOARDS * sizeof (PEVNT_HEADER); … … 711 729 int goodhed = 0; 712 730 731 char str[MXSTR]; 713 732 int sockDef[NBOARDS]; //internal state of sockets 714 733 int jrdx; … … 730 749 731 750 /* CPU_ZERO initializes all the bits in the mask to zero. */ 732 CPU_ZERO (&mask);751 // CPU_ZERO (&mask); 733 752 /* CPU_SET sets only the bit corresponding to cpu. */ 734 753 cpu = 7; 735 CPU_SET (cpu, &mask);754 // CPU_SET (cpu, &mask); 736 755 737 756 /* sched_setaffinity returns 0 in success */ 738 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {739 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);740 factOut (kWarn, -1, str);741 }757 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 758 // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); 759 // factOut (kWarn, -1, str); 760 // } 742 761 743 762 … … 758 777 for (i = 0; i < NBOARDS; i++) 759 778 sockDef[i] = 0; 779 780 gi_actRun = 0; 781 760 782 761 783 START: … … 772 794 numok = numok2 = 0; 773 795 774 int cntsock = 8 - NUMSOCK 796 int cntsock = 8 - NUMSOCK; 775 797 776 798 if (gi_resetS > 0) { … … 802 824 gj.evtSkip = gj.evtWrite = gj.evtErr = 0; 803 825 804 int b, p;826 int b, p; 805 827 for (b = 0; b < NBOARDS; b++) 806 828 gj.badRoi[b] = 0; … … 846 868 847 869 for (p = p0 + 1; p < p0 + 8; p++) { 848 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket 870 if (p < NUMSOCK) 871 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket 849 872 k++; 850 873 } … … 867 890 868 891 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read 869 b = i / 7 ; 870 p = i % 7 ; 871 872 //if ( b==32 && p>0) { ; } 873 if ( p >= NUMSOCK) { ; } 874 else { 875 if (sockDef[b] > 0) 876 s0 = +1; 877 else 878 s0 = -1; 879 880 if (rd[i].sockStat < 0) { //try to connect if not yet done 881 rd[i].sockStat = connect (rd[i].socket, 882 (struct sockaddr *) &rd[i].SockAddr, 883 sizeof (rd[i].SockAddr)); 884 if (rd[i].sockStat == 0) { //successfull ==> 885 if (sockDef[b] > 0) { 886 rd[i].bufTyp = 0; // expect a header 887 rd[i].bufLen = frst_len; // max size to read at begining 888 } else { 889 rd[i].bufTyp = -1; // full data to be skipped 890 rd[i].bufLen = MAX_LEN; //huge for skipping 892 b = i / 7; 893 p = i % 7; 894 895 if (p >= NUMSOCK) {; 896 } else { 897 if (sockDef[b] > 0) 898 s0 = +1; 899 else 900 s0 = -1; 901 902 if (rd[i].sockStat < 0) { //try to connect if not yet done 903 rd[i].sockStat = connect (rd[i].socket, 904 (struct sockaddr *) &rd[i].SockAddr, 905 sizeof (rd[i].SockAddr)); 906 if (rd[i].sockStat == 0) { //successfull ==> 907 if (sockDef[b] > 0) { 908 rd[i].bufTyp = 0; // expect a header 909 rd[i].bufLen = frst_len; // max size to read at begining 910 } else { 911 rd[i].bufTyp = -1; // full data to be skipped 912 rd[i].bufLen = MAX_LEN; //huge for skipping 913 } 914 rd[i].bufPos = 0; // no byte read so far 915 rd[i].skip = 0; // start empty 916 gi_NumConnect[b] += cntsock; 917 918 gi.numConn[b]++; 919 gj.numConn[b]++; 920 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]); 921 factOut (kInfo, -1, str); 891 922 } 892 rd[i].bufPos = 0; // no byte read so far893 rd[i].skip = 0; // start empty894 // gi_NumConnect[b]++;895 gi_NumConnect[b] += cntsock ;896 897 gi.numConn[b]++;898 gj.numConn[b]++;899 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);900 factOut (kInfo, -1, str);901 923 } 902 } 903 904 if (rd[i].sockStat == 0) { //we have a connection ==> try to read 905 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 906 numok++; 907 size_t maxread = rd[i].bufLen ; 908 if (maxread > MAXREAD ) maxread=MAXREAD ; 909 910 jrd = 911 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], 912 maxread, MSG_DONTWAIT); 913 // rd[i].bufLen, MSG_DONTWAIT); 914 915 if (jrd > 0) { 916 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd); 924 925 if (rd[i].sockStat == 0) { //we have a connection ==> try to read 926 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full] 927 numok++; 928 size_t maxread = rd[i].bufLen; 929 if (maxread > MAXREAD) 930 maxread = MAXREAD; 931 932 jrd = 933 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], 934 maxread, MSG_DONTWAIT); 935 936 if (jrd > 0) { 937 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd); 917 938 #ifdef EVTDEBUG 918 memcpy (&rd[i].xBuf->B[rd[i].bufPos], 919 &rd[i].rBuf->B[rd[i].bufPos], jrd); 920 snprintf (str, MXSTR, 921 "read sock %3d bytes %5d len %5d first %d %d", i, 922 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos], 923 rd[i].rBuf->B[rd[i].bufPos + 1]); 924 factOut (kDebug, 301, str); 925 #endif 926 } 927 928 if (jrd == 0) { //connection has closed ... 929 snprintf (str, MXSTR, "Socket %d closed by FAD", i); 930 factOut (kInfo, 441, str); 931 GenSock (s0, i, 0, NULL, &rd[i]); 932 gi.gotErr[b]++; 933 // gi_NumConnect[b]--; 934 gi_NumConnect[b]-= cntsock ; 935 gi.numConn[b]--; 936 gj.numConn[b]--; 937 938 } else if (jrd < 0) { //did not read anything 939 if (errno != EAGAIN && errno != EWOULDBLOCK) { 940 snprintf (str, MXSTR, "Error Reading from %d | %m", i); 941 factOut (kError, 442, str); 942 gi.gotErr[b]++; 943 } else 944 numok--; //else nothing waiting to be read 945 jrd = 0; 946 } 947 } else { 948 jrd = 0; //did read nothing as requested 949 snprintf (str, MXSTR, "do not read from socket %d %d", i, 950 rd[i].bufLen); 951 factOut (kDebug, 301, str); 952 } 953 954 gi.gotByte[b] += jrd; 955 gj.rateBytes[b] += jrd; 956 957 if (jrd > 0) { 958 numokx++; 959 jrdx += jrd; 960 } 961 962 963 if (rd[i].bufTyp < 0) { // we are skipping this board ... 964 // just do nothing 965 #ifdef EVTDEBUG 966 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd, 967 i); 968 factOut (kInfo, 301, str); 969 #endif 970 971 } else if (rd[i].bufTyp > 0) { // we are reading data ... 972 if (jrd < rd[i].bufLen) { //not yet all read 973 rd[i].bufPos += jrd; //==> prepare for continuation 974 rd[i].bufLen -= jrd; 975 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 976 } else { //full dataset read 977 rd[i].bufLen = 0; 978 rd[i].bufPos = rd[i].fadLen; 979 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0] 980 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) { 981 gi.evtErr++; 939 memcpy (&rd[i].xBuf->B[rd[i].bufPos], 940 &rd[i].rBuf->B[rd[i].bufPos], jrd); 982 941 snprintf (str, MXSTR, 983 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 984 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], 985 rd[i].rBuf->B[1], 986 rd[i].rBuf->B[rd[i].bufPos - 2], 987 rd[i].rBuf->B[rd[i].bufPos - 1]); 988 factOut (kError, 301, str); 989 goto EndBuf; 990 991 #ifdef EVTDEBUG 992 } else { 993 snprintf (str, MXSTR, 994 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 995 i, rd[i].fadLen, rd[i].rBuf->B[0], 996 rd[i].rBuf->B[1], start.B[1], start.B[0], 997 rd[i].rBuf->B[rd[i].bufPos - 2], 998 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1], 999 stop.B[0]); 942 "read sock %3d bytes %5d len %5d first %d %d", 943 i, jrd, rd[i].bufLen, 944 rd[i].rBuf->B[rd[i].bufPos], 945 rd[i].rBuf->B[rd[i].bufPos + 1]); 1000 946 factOut (kDebug, 301, str); 1001 947 #endif 1002 948 } 1003 949 1004 if (jrd > 0) 1005 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 1006 1007 //we have a complete buffer, copy to WORK area 1008 int jr; 1009 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]); 1010 for (jr = 0; jr < 9; jr++) { 1011 roi[jr] = 1012 ntohs (rd[i]. 1013 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]); 950 if (jrd == 0) { //connection has closed ... 951 snprintf (str, MXSTR, "Socket %d closed by FAD", i); 952 factOut (kInfo, 441, str); 953 GenSock (s0, i, 0, NULL, &rd[i]); 954 gi.gotErr[b]++; 955 gi_NumConnect[b] -= cntsock; 956 gi.numConn[b]--; 957 gj.numConn[b]--; 958 959 } else if (jrd < 0) { //did not read anything 960 if (errno != EAGAIN && errno != EWOULDBLOCK) { 961 snprintf (str, MXSTR, "Error Reading from %d | %m", 962 i); 963 factOut (kError, 442, str); 964 gi.gotErr[b]++; 965 } else 966 numok--; //else nothing waiting to be read 967 jrd = 0; 1014 968 } 1015 //get index into mBuffer for this event (create if needed) 1016 1017 int actid; 1018 if (g_useFTM > 0) 1019 actid = rd[i].evtID; 1020 else 1021 actid = rd[i].ftmID; 1022 1023 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i, 1024 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, 1025 rd[i].evtID); 1026 1027 if (evID < -1000) { 1028 goto EndBuf; //not usable board/event/run --> skip it 1029 } 1030 if (evID < 0) { //no space left, retry later 969 } else { 970 jrd = 0; //did read nothing as requested 971 snprintf (str, MXSTR, "do not read from socket %d %d", i, 972 rd[i].bufLen); 973 factOut (kDebug, 301, str); 974 } 975 976 gi.gotByte[b] += jrd; 977 gj.rateBytes[b] += jrd; 978 979 if (jrd > 0) { 980 numokx++; 981 jrdx += jrd; 982 } 983 984 985 if (rd[i].bufTyp < 0) { // we are skipping this board ... 986 // just do nothing 1031 987 #ifdef EVTDEBUG 1032 if (rd[i].bufLen != 0) { 1033 snprintf (str, MXSTR, "something screwed up"); 1034 factOut (kFatal, 1, str); 988 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd, 989 i); 990 factOut (kInfo, 301, str); 991 #endif 992 993 } else if (rd[i].bufTyp > 0) { // we are reading data ... 994 if (jrd < rd[i].bufLen) { //not yet all read 995 rd[i].bufPos += jrd; //==> prepare for continuation 996 rd[i].bufLen -= jrd; 997 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data 998 } else { //full dataset read 999 rd[i].bufLen = 0; 1000 rd[i].bufPos = rd[i].fadLen; 1001 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0] 1002 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) { 1003 gi.evtErr++; 1004 snprintf (str, MXSTR, 1005 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ", 1006 i, rd[i].fadLen, rd[i].evtID, 1007 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1008 rd[i].rBuf->B[rd[i].bufPos - 2], 1009 rd[i].rBuf->B[rd[i].bufPos - 1]); 1010 factOut (kError, 301, str); 1011 goto EndBuf; 1012 1013 #ifdef EVTDEBUG 1014 } else { 1015 snprintf (str, MXSTR, 1016 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d", 1017 i, rd[i].fadLen, rd[i].rBuf->B[0], 1018 rd[i].rBuf->B[1], start.B[1], start.B[0], 1019 rd[i].rBuf->B[rd[i].bufPos - 2], 1020 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1], 1021 stop.B[0]); 1022 factOut (kDebug, 301, str); 1023 #endif 1035 1024 } 1036 #endif 1037 xwait.tv_sec = 0; 1038 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1039 nanosleep (&xwait, NULL); 1040 goto EndBuf1; //hope there is free space next round 1041 } 1042 //we have a valid entry in mBuffer[]; fill it 1043 1025 1026 if (jrd > 0) 1027 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event 1028 1029 //we have a complete buffer, copy to WORK area 1030 int jr; 1031 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]); 1032 for (jr = 0; jr < 9; jr++) { 1033 roi[jr] = 1034 ntohs (rd[i].rBuf->S[head_len / 2 + 2 + 1035 jr * (roi[0] + 4)]); 1036 } 1037 //get index into mBuffer for this event (create if needed) 1038 1039 int actid; 1040 if (g_useFTM > 0) 1041 actid = rd[i].evtID; 1042 else 1043 actid = rd[i].ftmID; 1044 1045 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i, 1046 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, 1047 rd[i].evtID); 1048 1049 if (evID < -1000) { 1050 goto EndBuf; //not usable board/event/run --> skip it 1051 } 1052 if (evID < 0) { //no space left, retry later 1044 1053 #ifdef EVTDEBUG 1045 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], 1046 rd[i].fadLen); 1047 if (xchk != 0) { 1048 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d", 1049 xchk, rd[i].fadLen, i); 1050 factOut (kFatal, 1, str); 1051 1052 uint iq; 1053 for (iq = 0; iq < rd[i].fadLen; iq++) { 1054 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) { 1055 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq, 1056 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]); 1054 if (rd[i].bufLen != 0) { 1055 snprintf (str, MXSTR, "something screwed up"); 1057 1056 factOut (kFatal, 1, str); 1058 1057 } 1058 #endif 1059 xwait.tv_sec = 0; 1060 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1061 nanosleep (&xwait, NULL); 1062 goto EndBuf1; //hope there is free space next round 1059 1063 } 1060 } 1061 #endif 1062 int qncpy = 0; 1063 boardId = b; 1064 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1065 int fadCrate = fadBoard / 256; 1066 if (boardId != (fadCrate * 10 + fadBoard % 256)) { 1067 snprintf (str, MXSTR, "wrong Board ID %d %d %d", 1068 fadCrate, fadBoard % 256, boardId); 1069 factOut (kWarn, 301, str); 1070 } 1071 if (mBuffer[evID].board[boardId] != -1) { 1072 snprintf (str, MXSTR, 1073 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 1074 evID, boardId, i, rd[i].fadLen, 1075 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1076 rd[i].rBuf->B[rd[i].bufPos - 2], 1077 rd[i].rBuf->B[rd[i].bufPos - 1]); 1078 factOut (kWarn, 501, str); 1079 goto EndBuf; //--> skip Board 1080 } 1081 1082 int iDx = evtIdx[evID]; //index into evtCtrl 1083 1084 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag, 1085 &rd[i].rBuf->S[0], head_len); 1086 qncpy += head_len; 1087 1088 src = head_len / 2; 1089 for (px = 0; px < 9; px++) { //different sort in FAD board..... 1090 for (drs = 0; drs < 4; drs++) { 1091 pixH = ntohs (rd[i].rBuf->S[src++]); // ID 1092 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell 1093 pixR = ntohs (rd[i].rBuf->S[src++]); // roi 1094 //here we should check if pixH is correct .... 1095 1096 pixS = boardId * 36 + drs * 9 + px; 1097 src++; 1098 1099 1100 mBuffer[evID].fEvent->StartPix[pixS] = pixC; 1101 dest = pixS * roi[0]; 1102 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1103 &rd[i].rBuf->S[src], roi[0] * 2); 1104 qncpy += roi[0] * 2; 1105 src += pixR; 1106 1107 if (px == 8) { 1108 tmS = boardId * 4 + drs; 1109 if (pixR > roi[0]) { //and we have additional TM info 1110 dest = tmS * roi[0] + NPIX * roi[0]; 1111 int srcT = src - roi[0]; 1112 mBuffer[evID].fEvent->StartTM[tmS] = 1113 (pixC + pixR - roi[0]) % 1024; 1114 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1115 &rd[i].rBuf->S[srcT], roi[0] * 2); 1116 qncpy += roi[0] * 2; 1117 } else { 1118 mBuffer[evID].fEvent->StartTM[tmS] = -1; 1064 //we have a valid entry in mBuffer[]; fill it 1065 1066 #ifdef EVTDEBUG 1067 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], 1068 rd[i].fadLen); 1069 if (xchk != 0) { 1070 snprintf (str, MXSTR, 1071 "ERROR OVERWRITE %d %d on port %d", xchk, 1072 rd[i].fadLen, i); 1073 factOut (kFatal, 1, str); 1074 1075 uint iq; 1076 for (iq = 0; iq < rd[i].fadLen; iq++) { 1077 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) { 1078 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, 1079 iq, rd[i].rBuf->B[iq], 1080 rd[i].xBuf->B[iq]); 1081 factOut (kFatal, 1, str); 1119 1082 } 1120 1083 } 1121 1084 } 1122 } // now we have stored a new board contents into Event structure 1123 1124 mBuffer[evID].fEvent->NumBoards++; 1125 mBuffer[evID].board[boardId] = boardId; 1126 evtCtrl.evtStat[iDx]++; 1127 evtCtrl.pcTime[iDx] = g_actTime; 1128 1129 if (++mBuffer[evID].nBoard >= actBoards) { 1130 int qnrun = 0; 1131 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ??? 1132 actrun = mBuffer[evID].runNum; 1133 int ir; 1134 for (ir = 0; ir < MAX_RUN; ir++) { 1135 qnrun++; 1136 if (runCtrl[ir].runId == actrun) { 1137 if (++runCtrl[ir].lastEvt == 0) { 1138 gotNewRun (actrun, mBuffer[evID].FADhead); 1139 snprintf (str, MXSTR, "gotNewRun %d (ev %d)", 1140 mBuffer[evID].runNum, 1141 mBuffer[evID].evNum); 1142 factOut (kInfo, 1, str); 1143 break; 1085 #endif 1086 int qncpy = 0; 1087 boardId = b; 1088 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1089 int fadCrate = fadBoard / 256; 1090 if (boardId != (fadCrate * 10 + fadBoard % 256)) { 1091 snprintf (str, MXSTR, "wrong Board ID %d %d %d", 1092 fadCrate, fadBoard % 256, boardId); 1093 factOut (kWarn, 301, str); 1094 } 1095 if (mBuffer[evID].board[boardId] != -1) { 1096 snprintf (str, MXSTR, 1097 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ", 1098 evID, boardId, i, rd[i].fadLen, 1099 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1100 rd[i].rBuf->B[rd[i].bufPos - 2], 1101 rd[i].rBuf->B[rd[i].bufPos - 1]); 1102 factOut (kWarn, 501, str); 1103 goto EndBuf; //--> skip Board 1104 } 1105 1106 int iDx = evtIdx[evID]; //index into evtCtrl 1107 1108 memcpy (&mBuffer[evID].FADhead[boardId]. 1109 start_package_flag, &rd[i].rBuf->S[0], head_len); 1110 qncpy += head_len; 1111 1112 src = head_len / 2; 1113 for (px = 0; px < 9; px++) { //different sort in FAD board..... 1114 for (drs = 0; drs < 4; drs++) { 1115 pixH = ntohs (rd[i].rBuf->S[src++]); // ID 1116 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell 1117 pixR = ntohs (rd[i].rBuf->S[src++]); // roi 1118 //here we should check if pixH is correct .... 1119 1120 pixS = boardId * 36 + drs * 9 + px; 1121 src++; 1122 1123 1124 mBuffer[evID].fEvent->StartPix[pixS] = pixC; 1125 dest = pixS * roi[0]; 1126 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest], 1127 &rd[i].rBuf->S[src], roi[0] * 2); 1128 qncpy += roi[0] * 2; 1129 src += pixR; 1130 1131 if (px == 8) { 1132 tmS = boardId * 4 + drs; 1133 if (pixR > roi[0]) { //and we have additional TM info 1134 dest = tmS * roi[0] + NPIX * roi[0]; 1135 int srcT = src - roi[0]; 1136 mBuffer[evID].fEvent->StartTM[tmS] = 1137 (pixC + pixR - roi[0]) % 1024; 1138 memcpy (&mBuffer[evID].fEvent-> 1139 Adc_Data[dest], &rd[i].rBuf->S[srcT], 1140 roi[0] * 2); 1141 qncpy += roi[0] * 2; 1142 } else { 1143 mBuffer[evID].fEvent->StartTM[tmS] = -1; 1144 1144 } 1145 1145 } 1146 1146 } 1147 } // now we have stored a new board contents into Event structure 1148 1149 mBuffer[evID].fEvent->NumBoards++; 1150 mBuffer[evID].board[boardId] = boardId; 1151 evtCtrl.evtStat[iDx]++; 1152 evtCtrl.pcTime[iDx] = g_actTime; 1153 1154 if (++mBuffer[evID].nBoard >= actBoards) { 1155 int qnrun = 0; 1156 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ??? 1157 actrun = mBuffer[evID].runNum; 1158 int ir; 1159 for (ir = 0; ir < MAX_RUN; ir++) { 1160 qnrun++; 1161 if (runCtrl[ir].runId == actrun) { 1162 if (++runCtrl[ir].lastEvt == 0) { 1163 gotNewRun (actrun, mBuffer[evID].FADhead); 1164 snprintf (str, MXSTR, 1165 "gotNewRun %d (ev %d)", 1166 mBuffer[evID].runNum, 1167 mBuffer[evID].evNum); 1168 factOut (kInfo, 1, str); 1169 break; 1170 } 1171 } 1172 } 1173 } 1174 snprintf (str, MXSTR, 1175 "%5d complete event roi %4d roiTM %d cpy %8d %5d", 1176 mBuffer[evID].evNum, roi[0], 1177 roi[8] - roi[0], qncpy, qnrun); 1178 factOut (kDebug, -1, str); 1179 1180 //complete event read ---> flag for next processing 1181 evtCtrl.evtStat[iDx] = 99; 1182 gi.evtTot++; 1147 1183 } 1148 snprintf (str, MXSTR, 1149 "%5d complete event roi %4d roiTM %d cpy %8d %5d", 1150 mBuffer[evID].evNum, roi[0], roi[8] - roi[0], 1151 qncpy, qnrun); 1152 factOut (kDebug, -1, str); 1153 1154 //complete event read ---> flag for next processing 1155 evtCtrl.evtStat[iDx] = 99; 1156 gi.evtTot++; 1184 1185 EndBuf: 1186 rd[i].bufTyp = 0; //ready to read next header 1187 rd[i].bufLen = frst_len; 1188 rd[i].bufPos = 0; 1189 EndBuf1: 1190 ; 1157 1191 } 1158 1192 1159 EndBuf: 1160 rd[i].bufTyp = 0; //ready to read next header 1161 rd[i].bufLen = frst_len; 1162 rd[i].bufPos = 0; 1163 EndBuf1: 1164 ; 1165 } 1166 1167 } else { //we are reading event header 1168 rd[i].bufPos += jrd; 1169 rd[i].bufLen -= jrd; 1170 if (rd[i].bufPos >= minLen) { //sufficient data to take action 1171 //check if startflag correct; else shift block .... 1172 for (k = 0; k < rd[i].bufPos - 1; k++) { 1173 if (rd[i].rBuf->B[k] == start.B[1] 1174 && rd[i].rBuf->B[k + 1] == start.B[0]) 1175 break; 1176 } 1177 rd[i].skip += k; 1178 1179 if (k >= rd[i].bufPos - 1) { //no start of header found 1180 rd[i].bufPos = 0; 1181 rd[i].bufLen = head_len; 1182 } else if (k > 0) { 1183 rd[i].bufPos -= k; 1184 rd[i].bufLen += k; 1185 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 1186 rd[i].bufPos); 1193 } else { //we are reading event header 1194 rd[i].bufPos += jrd; 1195 rd[i].bufLen -= jrd; 1196 if (rd[i].bufPos >= minLen) { //sufficient data to take action 1197 //check if startflag correct; else shift block .... 1198 for (k = 0; k < rd[i].bufPos - 1; k++) { 1199 if (rd[i].rBuf->B[k] == start.B[1] 1200 && rd[i].rBuf->B[k + 1] == start.B[0]) 1201 break; 1202 } 1203 rd[i].skip += k; 1204 1205 if (k >= rd[i].bufPos - 1) { //no start of header found 1206 rd[i].bufPos = 0; 1207 rd[i].bufLen = head_len; 1208 } else if (k > 0) { 1209 rd[i].bufPos -= k; 1210 rd[i].bufLen += k; 1211 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 1212 rd[i].bufPos); 1187 1213 #ifdef EVTDEBUG 1188 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],1189 rd[i].bufPos);1214 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], 1215 rd[i].bufPos); 1190 1216 #endif 1191 }1192 if (rd[i].bufPos >= minLen) {1193 if (rd[i].skip > 0) {1194 snprintf (str, MXSTR, "skipped %d bytes on port%d",1195 rd[i].skip, i);1196 factOut (kInfo, 666, str);1197 rd[i].skip = 0;1198 1217 } 1199 goodhed++; 1200 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2; 1201 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]); 1202 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]); 1203 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt) 1204 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt) 1205 rd[i].runID = ntohl (rd[i].rBuf->I[11]); 1206 rd[i].bufTyp = 1; //ready to read full record 1207 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1208 1209 int fadboard = ntohs (rd[i].rBuf->S[12]); 1210 int fadcrate = fadboard / 256; 1211 fadboard = (fadcrate * 10 + fadboard % 256); 1218 if (rd[i].bufPos >= minLen) { 1219 if (rd[i].skip > 0) { 1220 snprintf (str, MXSTR, "skipped %d bytes on port%d", 1221 rd[i].skip, i); 1222 factOut (kInfo, 666, str); 1223 rd[i].skip = 0; 1224 } 1225 goodhed++; 1226 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2; 1227 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]); 1228 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]); 1229 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt) 1230 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt) 1231 rd[i].runID = ntohl (rd[i].rBuf->I[11]); 1232 rd[i].bufTyp = 1; //ready to read full record 1233 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1234 1235 int fadboard = ntohs (rd[i].rBuf->S[12]); 1236 int fadcrate = fadboard / 256; 1237 fadboard = (fadcrate * 10 + fadboard % 256); 1212 1238 #ifdef EVTDEBUG 1213 snprintf (str, MXSTR,1214 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,1215 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,1216 rd[i].runID, fadboard, jrd);1217 factOut (kDebug, 1, str);1239 snprintf (str, MXSTR, 1240 "sk %3d head: %5d %5d %5d %10d %4d %6d", i, 1241 rd[i].fadLen, rd[i].evtID, rd[i].ftmID, 1242 rd[i].runID, fadboard, jrd); 1243 factOut (kDebug, 1, str); 1218 1244 #endif 1219 1245 1220 if (rd[i].runID == 0) 1221 rd[i].runID = gi_myRun; 1222 1223 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) { 1224 snprintf (str, MXSTR, 1225 "illegal event-length on port %d", i); 1226 factOut (kFatal, 881, str); 1227 rd[i].bufLen = 100000; //? 1246 if (rd[i].runID == 0) 1247 rd[i].runID = gi_myRun; 1248 1249 if (rd[i].bufLen <= head_len 1250 || rd[i].bufLen > MAX_LEN) { 1251 snprintf (str, MXSTR, 1252 "illegal event-length on port %d", i); 1253 factOut (kFatal, 881, str); 1254 rd[i].bufLen = 100000; //? 1255 } 1256 int fadBoard = ntohs (rd[i].rBuf->S[12]); 1257 debugHead (i, fadBoard, rd[i].rBuf); 1258 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event 1259 } else { 1260 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1228 1261 } 1229 int fadBoard = ntohs (rd[i].rBuf->S[12]);1230 debugHead (i, fadBoard, rd[i].rBuf);1231 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event1232 1262 } else { 1233 1263 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1234 1264 } 1235 } else { 1236 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet 1237 } 1238 1239 } //end interpreting last read 1240 } 1265 1266 } //end interpreting last read 1267 } 1241 1268 } //end of successful read anything 1242 1269 } //finished trying to read all sockets … … 1272 1299 && evtCtrl.pcTime[k0] < g_actTime - 10) { 1273 1300 int id = evtCtrl.evtBuf[k0]; 1301 int ic,ib,ik,jb; 1274 1302 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d", 1275 1303 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, 1276 1304 evtCtrl.evtStat[k0]); 1277 1305 factOut (kWarn, 601, str); 1278 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events 1306 1307 ik=0; 1308 for (ib=0; ib<NBOARDS; ib++) { 1309 if (ib%10==0) { 1310 snprintf (&str[ik], MXSTR, "'"); 1311 ik++; 1312 } 1313 jb = mBuffer[id].board[ib]; 1314 if ( jb<0 ) { 1315 snprintf (&str[ik], MXSTR, "."); 1316 ik++; 1317 } else { 1318 snprintf (&str[ik], MXSTR, "%d",jb%10); 1319 ik+=1; 1320 } 1321 } 1322 snprintf (&str[ik], MXSTR, "'"); 1323 factOut (kWarn, 601, str); 1324 1325 evtCtrl.evtStat[k0] = 93; //timeout for incomplete events 1279 1326 gi.evtSkp++; 1280 1327 gi.evtTot++; … … 1371 1418 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket 1372 1419 if (i % 7 == 0) { 1373 // gi_NumConnect[i / 7]--; 1374 gi_NumConnect[i / 7]-= cntsock ; 1420 gi_NumConnect[i / 7] -= cntsock; 1375 1421 gi.numConn[i / 7]--; 1376 1422 gj.numConn[i / 7]--; … … 1476 1522 subProc (void *thrid) 1477 1523 { 1524 char str[MXSTR]; 1478 1525 int threadID, status, numWait, numProc, kd, k1, k0, k, jret; 1479 1526 struct timespec xwait; … … 1501 1548 jret = 1502 1549 subProcEvt (threadID, mBuffer[id].FADhead, 1503 mBuffer[id].fEvent, mBuffer[id].buffer); 1550 mBuffer[id].fEvent, mBuffer[id].mBoard, gi_runPtr ); 1551 1504 1552 if (jret <= threadID) { 1505 1553 snprintf (str, MXSTR, … … 1544 1592 { 1545 1593 /* *** main loop processing file, including SW-trigger */ 1546 int numProc, numWait ;1594 int numProc, numWait, iskip; 1547 1595 int k, status, j; 1548 1596 struct timespec xwait; … … 1561 1609 1562 1610 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1563 CPU_ZERO (&mask);1611 // CPU_ZERO (&mask); 1564 1612 /* CPU_SET sets only the bit corresponding to cpu. */ 1565 1613 // CPU_SET( 0 , &mask ); leave for system 1566 1614 // CPU_SET( 1 , &mask ); used by write process 1567 CPU_SET (2, &mask);1568 CPU_SET (3, &mask);1569 CPU_SET (4, &mask);1570 CPU_SET (5, &mask);1571 CPU_SET (6, &mask);1615 // CPU_SET (2, &mask); 1616 // CPU_SET (3, &mask); 1617 // CPU_SET (4, &mask); 1618 // CPU_SET (5, &mask); 1619 // CPU_SET (6, &mask); 1572 1620 // CPU_SET( 7 , &mask ); used by read process 1573 1621 /* sched_setaffinity returns 0 in success */ 1574 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {1575 snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);1576 factOut (kWarn, -1, str);1577 }1622 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1623 // snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu); 1624 // factOut (kWarn, -1, str); 1625 // } 1578 1626 1579 1627 … … 1602 1650 } else { 1603 1651 1604 //-------- it is better to open the run already here, so call can be used to initialize1605 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)1606 1652 int id = evtCtrl.evtBuf[k0]; 1607 1653 uint32_t irun = mBuffer[id].runNum; 1608 1654 int32_t ievt = mBuffer[id].evNum; 1609 if (runCtrl[lastRun].runId == irun) { 1610 j = lastRun; 1611 } else { 1612 //check which fileID to use (or open if needed) 1613 for (j = 0; j < MAX_RUN; j++) { 1614 if (runCtrl[j].runId == irun) 1615 break; 1655 iskip = 0 ; 1656 if (irun != gi_actRun) { // we should open a new run ? 1657 if (irun < gi_actRun) { // very old event; skip it 1658 iskip = -1 ; // but anyhow checkEvent() will be done 1659 snprintf (str, MXSTR, 1660 "P event %d from old run???", ievt,irun); 1661 factOut (kWarn, 901, str); 1662 } else if (gi_actRun != 0) { // but there is still old one open ? 1663 1664 // // // // // // //MUST WAIT UNTIL PENDING OLD EVENTS PROCESSED ... 1665 // loop over all waiting events; and decide what to do .... 1666 int q,q0 ; 1667 int qcnt=0; 1668 for (q=k1; q < (k1 + kd); q++) { 1669 q0 = q % (MAX_EVT * MAX_RUN); 1670 if (evtCtrl.evtStat[q0] > 0 1671 && evtCtrl.evtStat[q0] < 9000) { 1672 //this event is still active in the queue 1673 int qd = evtCtrl.evtBuf[q0]; 1674 uint32_t qrun = mBuffer[qd].runNum; 1675 if (qrun < irun ) { 1676 if (evtCtrl.evtStat[q0] < 92) { 1677 //incomplete old event -> reduce timeout 1678 evtCtrl.pcTime[q0] = 0 ; 1679 } 1680 qcnt++ ; 1681 } 1682 } 1683 } 1684 if (qcnt == 0 ) { 1685 runEnd (gi_actRun, gi_runPtr ); 1686 snprintf (str, MXSTR, 1687 "P run %d ended", gi_actRun); 1688 factOut (kWarn, 901, str); 1689 gi_actRun = 0; //there are no events from old runs left 1690 } 1616 1691 } 1617 if (j >= MAX_RUN) { 1618 snprintf (str, MXSTR, 1619 "P error: can not find run %d for event %d in %d", 1620 irun, ievt, id); 1621 factOut (kFatal, 901, str); 1622 } 1623 lastRun = j; 1624 } 1625 1626 if (runCtrl[j].fileId < 0) { 1627 //---- we need to open a new run ==> make sure all older runs are 1628 //---- finished and marked to be closed .... 1629 int j1; 1630 for (j1 = 0; j1 < MAX_RUN; j1++) { 1631 if (runCtrl[j1].fileId == 0) { 1632 runCtrl[j1].procId = 2; //--> do no longer accept events for processing 1633 //---- problem: processing still going on ==> must wait for closing .... 1692 1693 1694 if (gi_actRun == 0) { // now we can try to open new run 1695 for (j = 0; j < MAX_RUN; j++) { 1696 if (runCtrl[j].runId == irun) 1697 break; 1698 } 1699 if (j >= MAX_RUN) { 1634 1700 snprintf (str, MXSTR, 1635 "P finish run since new one opened %d", 1636 runCtrl[j1].runId); 1637 runFinish1 (runCtrl[j1].runId); 1701 "P error: can not find run %d for event %d in %d", 1702 irun, ievt, id); 1703 factOut (kFatal, 901, str); 1704 j = 0; 1638 1705 } 1639 1706 1640 } 1641 1642 actRun.Version = 1; 1643 actRun.RunType = -1; //to be adapted 1644 1645 actRun.Nroi = runCtrl[j].roi0; 1646 actRun.NroiTM = runCtrl[j].roi8; 1647 if (actRun.Nroi == actRun.NroiTM) 1648 actRun.NroiTM = 0; 1649 actRun.RunTime = runCtrl[j].firstTime; 1650 actRun.RunUsec = runCtrl[j].firstTime; 1651 actRun.NBoard = NBOARDS; 1652 actRun.NPix = NPIX; 1653 actRun.NTm = NTMARK; 1654 actRun.Nroi = mBuffer[id].nRoi; 1655 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1656 NBOARDS * sizeof (PEVNT_HEADER)); 1657 1658 runCtrl[j].fileHd = 1659 runOpen (irun, &actRun, sizeof (actRun)); 1660 if (runCtrl[j].fileHd == NULL) { 1661 snprintf (str, MXSTR, 1662 "P could not open a file for run %d", irun); 1663 factOut (kError, 502, str); 1664 runCtrl[j].fileId = 91; 1665 runCtrl[j].procId = 91; 1666 } else { 1667 snprintf (str, MXSTR, "P opened new run_file %d evt %d", 1668 irun, ievt); 1669 factOut (kInfo, -1, str); 1670 runCtrl[j].fileId = 0; 1671 runCtrl[j].procId = 0; 1672 } 1673 1674 } 1675 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) 1676 if (runCtrl[j].procId == 0) { 1677 if (runCtrl[j].closeTime < g_actTime 1678 || runCtrl[j].lastTime < g_actTime - 300 1679 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) { 1680 snprintf (str, MXSTR, 1681 "P reached end of run condition for run %d", 1682 irun); 1683 factOut (kInfo, 502, str); 1684 runFinish1 (runCtrl[j].runId); 1685 runCtrl[j].procId = 1; 1707 if (runCtrl[j].fileId == 0) { 1708 snprintf (str, MXSTR, 1709 "P run %d is already open ???", irun); 1710 factOut (kWarn, 901, str); 1711 } else if (runCtrl[j].fileId > 0) { 1712 snprintf (str, MXSTR, 1713 "P run %d is already closed ???", irun); 1714 factOut (kWarn, 901, str); 1715 } else { // now open new run 1716 1717 actRun.Version = 1; 1718 actRun.RunType = -1; //to be adapted 1719 1720 actRun.Nroi = runCtrl[j].roi0; 1721 actRun.NroiTM = runCtrl[j].roi8; 1722 if (actRun.Nroi == actRun.NroiTM) 1723 actRun.NroiTM = 0; 1724 actRun.RunTime = runCtrl[j].firstTime; 1725 actRun.RunUsec = runCtrl[j].firstTime; 1726 actRun.NBoard = NBOARDS; 1727 actRun.NPix = NPIX; 1728 actRun.NTm = NTMARK; 1729 actRun.Nroi = mBuffer[id].nRoi; 1730 memcpy (actRun.FADhead, mBuffer[id].FADhead, 1731 NBOARDS * sizeof (PEVNT_HEADER)); 1732 1733 gi_actRun = irun; 1734 gi_runPtr = 1735 runStart (irun, &actRun, sizeof (actRun)); 1736 if (gi_runPtr == NULL) { 1737 snprintf (str, MXSTR, 1738 "P could not start run %d", irun); 1739 factOut (kFatal, 502, str); 1740 } else { 1741 snprintf (str, MXSTR, "P started new run %d", 1742 irun); 1743 factOut (kInfo, -1, str); 1744 } 1745 1746 runCtrl[j].fileHd = 1747 runOpen (irun, &actRun, sizeof (actRun)); 1748 if (runCtrl[j].fileHd == NULL) { 1749 snprintf (str, MXSTR, 1750 "P could not open a file for run %d", 1751 irun); 1752 factOut (kError, 502, str); 1753 runCtrl[j].fileId = 91; 1754 runCtrl[j].procId = 91; 1755 } else { 1756 snprintf (str, MXSTR, 1757 "P opened new run_file %d evt %d", irun, 1758 ievt); 1759 factOut (kInfo, -1, str); 1760 runCtrl[j].fileId = 0; 1761 runCtrl[j].procId = 0; 1762 } 1763 } 1686 1764 } 1687 1765 } 1688 if (runCtrl[j].procId != 0) { 1689 snprintf (str, MXSTR, 1690 "P skip event %d because no active run %d", ievt, 1691 irun); 1692 factOut (kDebug, 502, str); 1693 evtCtrl.evtStat[k0] = 9091; 1694 } else { 1695 //-------- 1696 //-------- 1697 id = evtCtrl.evtBuf[k0]; 1766 1767 if (irun == gi_actRun) { 1768 // now we are sure the run is ready and we can start processing the event 1769 id = evtCtrl.evtBuf[k0]; 1698 1770 int itevt = mBuffer[id].trgNum; 1699 1771 int itrg = mBuffer[id].trgTyp; … … 1701 1773 int roiTM = mBuffer[id].nRoiTM; 1702 1774 1703 //make sure unused pixels/tmarks are cleared to zero1775 //make sure unused pixels/tmarks are cleared to zero 1704 1776 if (roiTM == roi) 1705 1777 roiTM = 0; … … 1719 1791 1720 1792 1721 //and set correct event header ; also check for consistency in event (not yet) 1793 //and set correct event header 1722 1794 mBuffer[id].fEvent->Roi = roi; 1723 1795 mBuffer[id].fEvent->RoiTM = roiTM; … … 1731 1803 mBuffer[id].fEvent->SoftTrig = 0; 1732 1804 1733 1805 int mboard = -1; 1734 1806 for (ib = 0; ib < NBOARDS; ib++) { 1735 if (mBuffer[id].board[ib] == -1) { //board is not read1807 if (mBuffer[id].board[ib] == -1) { //board is not read 1736 1808 mBuffer[id].FADhead[ib].start_package_flag = 0; 1737 1809 mBuffer[id].fEvent->BoardTime[ib] = 0; … … 1739 1811 mBuffer[id].fEvent->BoardTime[ib] = 1740 1812 ntohl (mBuffer[id].FADhead[ib].time); 1813 if (mboard < 0) 1814 mboard = ib; 1741 1815 } 1742 1816 } 1743 1817 mBuffer[id].mBoard = mboard; 1744 1818 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead, 1745 mBuffer[id].fEvent );1819 mBuffer[id].fEvent, mboard); 1746 1820 gi.procTot++; 1747 1821 numProc++; 1748 1822 1749 if (i < 0 ) {1750 evtCtrl.evtStat[k0] = 9999; 1823 if (i < 0 || iskip < 0 ) { 1824 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped 1751 1825 gi.procErr++; 1752 1826 } else { 1753 evtCtrl.evtStat[k0] = 1000; 1827 evtCtrl.evtStat[k0] = 1000; //flag event to be processed 1754 1828 runCtrl[j].procEvt++; 1829 if (runCtrl[j].procId != 0) 1830 mBuffer[id].toWrite = -1; //flag event to be processed but now written 1831 else 1832 mBuffer[id].toWrite = 0; 1755 1833 } 1756 1834 } … … 1863 1941 1864 1942 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1865 CPU_ZERO (&mask);1943 // CPU_ZERO (&mask); 1866 1944 /* CPU_SET sets only the bit corresponding to cpu. */ 1867 CPU_SET (cpu, &mask);1945 // CPU_SET (cpu, &mask); 1868 1946 /* sched_setaffinity returns 0 in success */ 1869 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {1870 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);1871 }1947 // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { 1948 // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); 1949 // } 1872 1950 1873 1951 int lastRun = 0; //usually run from last event still valid … … 1941 2019 snprintf (str, MXSTR, "W opened new run_file %d evt %d", 1942 2020 irun, ievt); 1943 factOut (k Info, -1, str);2021 factOut (kWarn, -1, str); 1944 2022 runCtrl[j].fileId = 0; 1945 2023 } … … 1947 2025 } 1948 2026 1949 if (runCtrl[j].fileId != 0 ) {2027 if (runCtrl[j].fileId != 0 || mBuffer[id].toWrite < 0) { 1950 2028 if (runCtrl[j].fileId < 0) { 1951 2029 snprintf (str, MXSTR, … … 1975 2053 ievt, irun, k0); 1976 2054 factOut (kDebug, 504, str); 1977 // gj.writEvt++ ;1978 2055 } else { 1979 2056 snprintf (str, MXSTR, "W error writing event for run %d", … … 2161 2238 2162 2239 //prepare for subProcesses 2163 gi_maxSize = g_maxSize;2164 if (gi_maxSize <=0)2165 gi_maxSize = 1;2240 // gi_maxSize = g_maxSize; 2241 // if (gi_maxSize < 0) 2242 gi_maxSize = 0; 2166 2243 2167 2244 gi_maxProc = g_maxProc; … … 2242 2319 int 2243 2320 subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 2244 int 8_t* buffer)2321 int16_t mboard, void * buffer) 2245 2322 { 2246 2323 printf ("called subproc %d\n", threadID); … … 2287 2364 2288 2365 int 2289 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event )2366 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event, int mboard) 2290 2367 { 2291 2368 int i = 0; … … 2377 2454 2378 2455 g_maxProc = 20; 2379 g_maxSize = 30000;2456 g_maxSize = 0; 2380 2457 2381 2458 g_runStat = 40;
Note:
See TracChangeset
for help on using the changeset viewer.