// #define EVTDEBUG #define NUMSOCK 1 //set to 7 for old configuration #define MAXREAD 65536 //64kB wiznet buffer #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "EventBuilder.h" enum Severity { kMessage = 10, ///< Just a message, usually obsolete kInfo = 20, ///< An info telling something which can be interesting to know kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour kError = 40, ///< Error, something unexpected happened, but can still be handled by the program kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination kDebug = 99, ///< A message used for debugging only }; #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN 256*1024 // size of read-buffer per socket //#define nanosleep(x,y) extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); //extern int runFinish (uint32_t runnr); extern void factOut (int severity, int err, char *message); extern void factReportIncomplete (uint64_t rep); extern void gotNewRun (int runnr, PEVNT_HEADER * headers); extern void factStat (GUI_STAT gj); extern void factStatNew (EVT_STAT gi); extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer); extern void debugHead (int i, int j, void *buf); extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec); extern void debugStream (int isock, void *buf, int len); int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); int evtCtrl_frstPtr; // First event in queue int evtCtrl_lastPtr; // pointer to next free slot int g_maxProc; int gi_maxProc; uint g_actTime; int g_runStat; int g_reset; size_t g_maxMem; //maximum memory allowed for buffer FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards //EVT_STAT gi; GUI_STAT gj; #define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 ) #define MAX_RUN 8 // Number of concurrent runs EVT_CTRL evtCtrl[MAX_EVT]; //control of events during processing void factPrintf(int severity, int id, const char *fmt, ...) { char str[1000]; va_list ap; va_start(ap, fmt); vsnprintf(str, 1000, fmt, ap); va_end(ap); factOut(severity, id, str); } #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) typedef struct TGB_struct { struct TGB_struct *prev; void *mem; } TGB_entry; TGB_entry *tgb_last = NULL; uint64_t tgb_memory = 0; uint64_t tgb_inuse = 0; void *TGB_Malloc() { // No free slot available, next alloc would exceed max memory if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem) return NULL; // We will return this amount of memory tgb_inuse += MAX_TOT_MEM; gj.bufTot++; // No free slot available, allocate a new one if (!tgb_last) { tgb_memory += MAX_TOT_MEM; return malloc(MAX_TOT_MEM); } // Get the next free slot from the stack and return it TGB_entry *last = tgb_last; void *mem = last->mem; tgb_last = last->prev; free(last); return mem; }; void TGB_free(void *mem) { if (!mem) return; // Add the last free slot to the stack TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); // FIXME: Really free memory if memory usuage exceeds g_maxMem entry->prev = tgb_last; entry->mem = mem; tgb_last = entry; // Decrease the amont of memory in use accordingly tgb_inuse -= MAX_TOT_MEM; } RUN_CTRL runCtrl[MAX_RUN]; /* *** Definition of rdBuffer to read in IP packets; keep it global !!!! */ typedef union { uint8_t B[MAX_LEN]; uint16_t S[MAX_LEN / 2]; uint32_t I[MAX_LEN / 4]; uint64_t L[MAX_LEN / 8]; } CNV_FACT; typedef struct { int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... int32_t bufPos; //next byte to read to the buffer next int32_t bufLen; //number of bytes left to read int32_t skip; //number of bytes skipped before start of event int errCnt; //how often connect failed since last successful int sockStat; //-1 if socket not yet connected , 99 if not exist int socket; //contains the sockets struct sockaddr_in SockAddr; //IP for each socket int evtID; // event ID of event currently read int runID; // run " int ftmID; // event ID from FTM uint fadLen; // FADlength of event currently read int fadVers; // Version of FAD int ftmTyp; // trigger type int Port; CNV_FACT *rBuf; } READ_STRUCT; /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT * rs) { /* *** generate Address, create sockets and allocates readbuffer for it *** *** if flag==0 generate socket and buffer *** <0 destroy socket and buffer *** >0 close and redo socket *** *** sid : board*7 + port id */ //close socket if open if (rs->sockStat == 0) { if (close (rs->socket) > 0) { factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno); } else { factPrintf(kInfo, 771, "Succesfully closed socket %d", sid); } } rs->sockStat = 99; if (flag < 0) { free (rs->rBuf); //and never open again rs->rBuf = NULL; return 0; } if (flag == 0) { //generate address and buffer ... rs->Port = port; rs->SockAddr.sin_family = sockAddr->sin_family; rs->SockAddr.sin_port = htons (port); rs->SockAddr.sin_addr = sockAddr->sin_addr; rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT)); if (rs->rBuf == NULL) { factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid); rs->sockStat = 77; return -3; } } if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno); rs->sockStat = 88; return -2; } int optval = 1; if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //start after 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //do every 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 2; //close after 2 unsuccessful tries if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } factPrintf(kInfo, 773, "Successfully generated socket %d", sid); rs->sockStat = -1; //try to (re)open socket rs->errCnt = 0; return 0; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int checkRoiConsistency(const CNV_FACT *rbuf, int roi[]) { int xjr = -1; int xkr = -1; //points to the very first roi int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; roi[0] = ntohs(rbuf->S[roiPtr]); for (int jr = 0; jr < 9; jr++) { roi[jr] = ntohs(rbuf->S[roiPtr]); if (roi[jr]<0 || roi[jr]>1024) { factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]); return 0; } // Check that the roi of pixels jr are compatible with the one of pixel 0 if (jr!=8 && roi[jr]!=roi[0]) { xjr = jr; break; } // Check that the roi of all other DRS chips on boards are compatible for (int kr = 1; kr < 4; kr++) { const int kroi = ntohs(rbuf->S[roiPtr]); if (kroi != roi[jr]) { xjr = jr; xkr = kr; break; } roiPtr += kroi+4; } } if (xjr>=0) { if (xkr<0) factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); else factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr])); return 0; } if (roi[8] < roi[0]) { factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); //gj.badRoiB++; //gj.badRoi[b]++; return 0; } return 1; } int mBufEvt(const READ_STRUCT *rs) { int nRoi[9]; if (!checkRoiConsistency(rs->rBuf, nRoi)) return -9999; const int evID = rs->evtID; const uint runID = rs->runID; const int trgTyp = rs->ftmTyp; const int trgNum = rs->ftmID; const int fadNum = rs->evtID; const int beg = (evtCtrl_lastPtr + MAX_EVT - 1) % MAX_EVT; const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT; for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT) { // If the run is different, go on searching. // We cannot stop searching if a lower run-id is found as in // the case of the events, because theoretically, there // can be the same run on two different days. if (runID != evtCtrl[k].runNum) continue; // If the ID of the new event if higher than the last one stored // in that run, we have to assign a new slot (leave the loop) if (evID > evtCtrl[k].evNum/* && runID == evtCtrl[k].runNum*/) break; if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/) continue; // We have found an entry with the same runID and evtID // Check if ROI is consistent if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8]) { factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]); return -8201; } // count for inconsistencies if (evtCtrl[k].trgNum != trgNum) evtCtrl[k].Errors[0]++; if (evtCtrl[k].fadNum != fadNum) evtCtrl[k].Errors[1]++; if (evtCtrl[k].trgTyp != trgTyp) evtCtrl[k].Errors[2]++; //everything seems fine so far ==> use this slot .... return k; } // Check if the control structure still has space left if (end-beg==1 || (end==0 && beg==MAX_EVT-1)) { factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end); return -1; } // If we have already queued at least one event, // check the roi if the previous event // already belongs to the same run. // Get the runCtrl entry of the previous event. // If none is in the queue (we are anyhow super fast) // just get the correct entry from the runCtrl array. int idx = evtCtrl[beg].runCtrl_idx; // If there is an event in the queue and it has the same runID, we can use // that event to check for the roi consistency throughout the run if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID) { // Check if run already registered (old entries should have runId==-1) if (runCtrl[idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8]) { factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID); return -9301; } } // If there is none in the queue, we have to search for the correct entry if (evtCtrl_frstPtr==evtCtrl_lastPtr) { idx = -1; for (int k=0; kAdc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi); //flag all pixels as unused for (int k = 0; k < NPIX; k++) evtCtrl[i].fEvent->StartPix[k] = -1; //flag all TMark as unused for (int k = 0; k < NTMARK; k++) evtCtrl[i].fEvent->StartTM[k] = -1; evtCtrl[i].fEvent->NumBoards = 0; evtCtrl[i].fEvent->SoftTrig = 0; evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0]; evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1]; evtCtrl[i].fEvent->Roi = evtCtrl[i].nRoi; evtCtrl[i].fEvent->RoiTM = evtCtrl[i].nRoiTM; evtCtrl[i].fEvent->EventNum = evtCtrl[i].evNum; evtCtrl[i].fEvent->TriggerNum = evtCtrl[i].trgNum; evtCtrl[i].fEvent->TriggerType = evtCtrl[i].trgTyp; } void mBufFree (int i) { TGB_free(evtCtrl[i].FADhead); evtCtrl[i].fEvent = NULL; evtCtrl[i].FADhead = NULL; evtCtrl[i].evNum = evtCtrl[i].nRoi = -1; evtCtrl[i].runNum = 0; gj.usdMem = tgb_inuse; gj.bufTot--; } uint64_t reportIncomplete(int id, const char *txt) { factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)", evtCtrl[id].runNum, evtCtrl[id].evNum, txt); uint64_t report = 0; char str[1000]; int ik=0; for (int ib=0; ib=0) // data received from that board { str[ik++] = '0'+(jb%10); continue; } // FIXME: This is not synchronous... it reports // accoridng to the current connection status, not w.r.t. to the // one when the event was taken. if (gi_NumConnect[ib]<=0) // board not connected { str[ik++] = 'x'; continue; } // data from this board lost str[ik++] = '.'; report |= ((uint64_t)1)<S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling eStart += 4+rbuf->S[eStart+2];//skip the pixel data } memcpy(&evtCtrl[evID].FADhead[i], rbuf, sizeof(PEVNT_HEADER)); int src = sizeof(PEVNT_HEADER) / 2; // consistency of ROIs have been checked already (is it all correct?) const int roi = rbuf->S[src+2]; // different sort in FAD board..... for (int px = 0; px < 9; px++) { for (int drs = 0; drs < 4; drs++) { // pixH = rd[i].rBuf->S[src++]; // ID src++; const int pixC = rbuf->S[src++]; // start-cell const int pixR = rbuf->S[src++]; // roi //here we should check if pixH is correct .... const int pixS = i * 36 + drs * 9 + px; src++; evtCtrl[evID].fEvent->StartPix[pixS] = pixC; const int dest1 = pixS * roi; memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2); src += pixR; if (px == 8) { const int tmS = i * 4 + drs; //and we have additional TM info if (pixR > roi) { const int dest2 = tmS * roi + NPIX * roi; const int srcT = src - roi; evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2); } else { evtCtrl[evID].fEvent->StartTM[tmS] = -1; } } } } } void *readFAD (void *ptr) { /* *** main loop reading FAD data and sorting them to complete events */ factPrintf(kInfo, -1, "Start initializing (readFAD)"); READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer uint32_t actrun = 0; const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug /* initialize run control logics */ for (int i = 0; i < MAX_RUN; i++) { runCtrl[i].runId = 0; runCtrl[i].fileId = -2; } int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; gi_resetS = gi_resetR = 9; int sockDef[NBOARDS]; //internal state of sockets memset(sockDef, 0, NBOARDS*sizeof(int)); START: evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; //time in seconds uint gi_SecTime = time(NULL);; g_actTime = gi_SecTime; const int cntsock = 8 - NUMSOCK ; if (gi_resetS > 0) { //make sure all sockets are preallocated as 'not exist' for (int i = 0; i < NBOARDS; i++) { rd[i].socket = -1; rd[i].sockStat = 99; } for (int k = 0; k < NBOARDS; k++) { gi_NumConnect[k] = 0; //gi.numConn[k] = 0; gj.numConn[k] = 0; //gj.errConn[k] = 0; gj.rateBytes[k] = 0; gj.totBytes[k] = 0; } } if (gi_resetR > 0) { gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; gj.usdMem = gj.maxMem = gj.xxxMem = 0; gj.totMem = tgb_memory; gj.bufNew = gj.bufEvt = 0; gj.evtSkip = gj.evtWrite = gj.evtErr = 0; // initialize mBuffer (mark all entries as unused\empty) for (int i = 0; i < MAX_EVT; i++) { evtCtrl[i].evNum = evtCtrl[i].nRoi = -1; evtCtrl[i].runNum = 0; evtCtrl[i].evtStat = -1; } evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; factPrintf(kInfo, -1, "End initializing (readFAD)"); } gi_reset = gi_resetR = gi_resetS = gi_resetW = 0; //loop until global variable g_runStat claims stop while (g_runStat >= 0 && g_reset == 0) { gj.readStat = g_runStat; for (int b = 0; b < NBOARDS; b++) { // Nothing has changed if (g_port[b].sockDef == sockDef[b]) continue; gi_NumConnect[b] = 0; //must close all connections gj.numConn[b] = 0; // s0 = 0: sockets to be defined and opened // s0 = -1: sockets to be destroyed // s0 = +1: sockets to be closed and reopened int s0 = 0; if (sockDef[b] != 0) s0 = g_port[b].sockDef==0 ? -1 : +1; const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0; GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket sockDef[b] = g_port[b].sockDef; } // count the number of active boards int actBoards = 0; for (int b = 0; b < NBOARDS; b++) if (sockDef[b] > 0) actBoards++; //check all sockets if something to read for (int i = 0; i < NBOARDS; i++) { // Do not try to connect this socket if (rd[i].sockStat > 0) continue; if (rd[i].sockStat == -1) { //try to connect if not yet done rd[i].sockStat = connect (rd[i].socket, (struct sockaddr *) &rd[i].SockAddr, sizeof (rd[i].SockAddr)); // Failed if (rd[i].sockStat == -1) { rd[i].errCnt++; usleep(25000); continue; } // Success (rd[i].sockStat == 0) if (sockDef[i] > 0) { rd[i].bufTyp = 0; // expect a header rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining } else { rd[i].bufTyp = -1; // full data to be skipped rd[i].bufLen = MAX_LEN; // huge for skipping } rd[i].bufPos = 0; // no byte read so far rd[i].skip = 0; // start empty gi_NumConnect[i] += cntsock; gj.numConn[i]++; factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]); } // Do not read from this socket if (rd[i].bufLen<0) continue; if (rd[i].bufLen>0) { const int32_t jrd = recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); // recv failed if (jrd<0) { // There was just nothing waiting if (errno==EWOULDBLOCK || errno==EAGAIN) continue; factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); continue; } // connection was closed ... if (jrd==0) { factPrintf(kInfo, 441, "Socket %d closed by FAD", i); const int s0 = sockDef[i] > 0 ? +1 : -1; GenSock(s0, i, 0, NULL, &rd[i]); gi_NumConnect[i]-= cntsock ; gj.numConn[i]--; continue; } gj.rateBytes[i] += jrd; // are we skipping this board ... if (rd[i].bufTyp < 0) continue; rd[i].bufPos += jrd; //==> prepare for continuation rd[i].bufLen -= jrd; #ifdef EVTDEBUG debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event #endif } //we are reading event header if (rd[i].bufTyp <= 0) { //not yet sufficient data to take action if (rd[i].bufPos < minLen) continue; //check if startflag correct; else shift block .... // FIXME: This is not enough... this combination of // bytes can be anywhere... at least the end bytes // must be checked somewhere, too. int k; for (k = 0; k < rd[i].bufPos - 1; k++) { //start.S = 0xFB01; if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01) break; } rd[i].skip += k; //no start of header found if (k >= rd[i].bufPos - 1) { rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos]; rd[i].bufPos = 1; rd[i].bufLen = sizeof(PEVNT_HEADER)-1; continue; } if (k > 0) { rd[i].bufPos -= k; rd[i].bufLen += k; memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos); } if (rd[i].bufPos < minLen) continue; if (rd[i].skip > 0) { factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i); rd[i].skip = 0; } // TGB: This needs much more checks than just the first two bytes! // Swap everything except start_package_flag. // It is to difficult to find out where it is used how, // but it doesn't really matter because it is not really // used anywehere else // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber; rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time; for (int s=24;s<24+NTemp+NDAC;s++) rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2; rd[i].fadVers = rd[i].rBuf->S[2]; rd[i].ftmTyp = rd[i].rBuf->S[5]; rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt) rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt) rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11]; rd[i].bufTyp = 1; //ready to read full record rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; const int fadBoard = rd[i].rBuf->S[12]; debugHead(i, fadBoard, rd[i].rBuf); continue; } // are we reading data // not yet all read if (rd[i].bufLen > 0) continue; // stop.S = 0x04FE; if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe || rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04) { factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d", i, rd[i].evtID, rd[i].fadLen, rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]); // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; continue; } // int actid; // if (g_useFTM > 0) // actid = rd[i].evtID; // else // actid = rd[i].ftmID; //get index into mBuffer for this event (create if needed) const int idx = mBufEvt(&rd[i]); // no free entry in mBuffer, retry later if (idx == -1) continue; // We have a valid entry, but no memory has yet been allocated if (idx >= 0 && evtCtrl[idx].evtStat==0 && evtCtrl[idx].FADhead==NULL) { // Try to get memory from the big buffer evtCtrl[idx].FADhead = (PEVNT_HEADER*)TGB_Malloc(); if (evtCtrl[idx].FADhead == NULL) { // If this works properly, this is a hack which can be removed, or // replaced by a signal or dim message if (rd[i].bufTyp==1) factPrintf(kError, 882, "No free memory left for %d (run=%d)", evtCtrl[idx].evNum, evtCtrl[idx].runNum); rd[i].bufTyp = 2; continue; } // Initialise contents of mBuffer[evID]->fEvent initEvent(idx); // Some statistics gj.usdMem = tgb_inuse; if (gj.usdMem > gj.maxMem) gj.maxMem = gj.usdMem; gj.rateNew++; if (gj.bufTot > gj.maxEvt) gj.maxEvt = gj.bufTot; } // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. if (idx < -1000) continue; if (evtCtrl[idx].evtStat==-1 || evtCtrl[idx].evtStat>=90) { factPrintf(kError, 882, "Received data of event %d [%d] (run=%d) has already been advanced (stat=%d)... skipping", evtCtrl[idx].evNum, i, evtCtrl[idx].runNum, evtCtrl[idx].evtStat); continue; } //we have a valid entry in mBuffer[]; fill it const int fadBoard = rd[i].rBuf->S[12]; const int fadCrate = fadBoard>>8; if (i != (fadCrate * 10 + (fadBoard&0xff))) { factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)", i, fadBoard, fadCrate, fadBoard&0xff); } if (evtCtrl[idx].board[i] != -1) { factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d", evtCtrl[idx].evNum, i, i, rd[i].fadLen, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[rd[i].fadLen - 2], rd[i].rBuf->B[rd[i].fadLen - 1]); continue; // Continue reading next header } // Copy data from rd[i] to mBuffer[evID] copyData(rd[i].rBuf, i, idx); // now we have stored a new board contents into Event structure evtCtrl[idx].fEvent->NumBoards++; evtCtrl[idx].board[i] = i; evtCtrl[idx].nBoard++; evtCtrl[idx].evtStat = evtCtrl[idx].nBoard; // have we already reported first (partial) event of this run ??? if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun) { // Signal the fadctrl that a new run has been started gotNewRun(evtCtrl[idx].runNum, NULL); factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d", actrun, evtCtrl[idx].runNum, evtCtrl[idx].evNum); for (int j=0; j0) runCtrl[ir].runId = 0; } const int beg = (idx + MAX_EVT - 1) % MAX_EVT; const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT; // we have just completed an event... so all previous events // must have been completed already. If they are not, there // is no need to wait for the timeout, because they will never // get completed. We can just ensure that if we check for the previous // event to be complete every time we receive a new complete event. // If we find an incomplete one, we remove all consecutive // incomplete ones. for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT) { // We are done if we find a complete or fully processed event if (evtCtrl[k].evtStat>=100) break; // we do not call factReportIncomplete here, because by starting a new // run and having received the first complete event from that run // the user has expressed that the old events are obsolste now // and the run will be closed anyway if (evtCtrl[k].evtStat>0 && evtCtrl[k].evtStat<90) { reportIncomplete(k, "expired"); evtCtrl[k].evtStat = 90; } } // Flag that the event is ready for processing evtCtrl[idx].evtStat = 100; } // end for loop over all sockets g_actTime = time (NULL); if (g_actTime <= gi_SecTime) { usleep(1); continue; } gi_SecTime = g_actTime; gj.bufNew = 0; //loop over all active events and flag those older than read-timeout //delete those that are written to disk .... const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; // This could be improved having the pointer which separates the queue with // the incomplete events from the queue with the complete events for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) { // Check the more likely case first: incomplete events if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<100) { gj.bufNew++; //incomplete event in Buffer // Event has not yet timed out or was reported already if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]>=g_actTime - 30) continue; // This will result in the emission of a dim service. // It doesn't matter if that takes comparably long, // because we have to stop the run anyway. const uint64_t rep = reportIncomplete(k0, "timeout"); factReportIncomplete(rep); //timeout for incomplete events evtCtrl[k0].evtStat = 90; gj.evtSkip++; continue; } // Check the less likely case: 'useless' or 'delete' // evtState==0 can happen if the event was initialized (some data received) // but the data did not make sense (e.g. inconsistent rois) if (evtCtrl[k0].evtStat==10000) { evtCtrl[k0].evtStat = -1; mBufFree(k0); //event written--> free memory gj.evtWrite++; gj.rateWrite++; } // Remove leading invalidated slots from queue if (evtCtrl[evtCtrl_frstPtr].evtStat==-1 && k0==evtCtrl_frstPtr) { evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; continue; } } // The number of complete events in the buffer is the total number of // events in the buffer minus the number of incomplete events. gj.bufEvt = count - gj.bufNew; gj.deltaT = 1000; //temporary, must be improved for (int ib = 0; ib < NBOARDS; ib++) gj.totBytes[ib] += gj.rateBytes[ib]; gj.totMem = tgb_memory; if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem; if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt; factStat (gj); //factStatNew (gi); gj.rateNew = gj.rateWrite = 0; gj.maxMem = gj.usdMem; gj.maxEvt = gj.bufTot; for (int b = 0; b < NBOARDS; b++) gj.rateBytes[b] = 0; } // while (g_runStat >= 0 && g_reset == 0) factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset); if (g_reset > 0) { gi_reset = g_reset; gi_resetR = gi_reset % 10; //shall we stop reading ? gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? gi_resetW = (gi_reset / 100) % 10; //shall we close files ? gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? g_reset = 0; } else { gi_reset = 0; gi_resetR = g_runStat == -1 ? 1 : 7; gi_resetS = 7; //close all sockets gi_resetW = 7; //close all files gi_resetX = 0; //inform others we have to quit .... gj.readStat = -11; //inform all that no update to happen any more } if (gi_resetS > 0) { //must close all open sockets ... factPrintf(kInfo, -1, "Close all sockets..."); for (int i = 0; i < NBOARDS; i++) { if (rd[i].sockStat != 0) continue; GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket gi_NumConnect[i]-= cntsock ; gj.numConn[i]--; sockDef[i] = 0; //flag ro recreate the sockets ... rd[i].sockStat = -1; //and try to open asap } } if (gi_resetR > 0) { //and clear all buffers (might have to wait until all others are done) while (evtCtrl_frstPtr!=evtCtrl_lastPtr) { const int k0=evtCtrl_frstPtr; // flag incomplete events as 'read finished' // We cannot just detele all events, because some might currently being processed, // so we have to wait until the processing thread currently processing the event // signals that the event can be deleted. (Note, that there are currently never // two threads processing the same event at the same time) if (evtCtrl[k0].evtStat<90 || evtCtrl[k0].evtStat==10000) { evtCtrl[k0].evtStat = -1; mBufFree(k0); //event written--> free memory evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT; } usleep(1); } } if (gi_reset > 0) { if (gi_resetW > 0) CloseRunFile (0, 0, 0); //ask all Runs to be closed if (gi_resetX > 0) { struct timespec xwait; xwait.tv_sec = gi_resetX; xwait.tv_nsec = 0; nanosleep (&xwait, NULL); } factPrintf(kInfo, -1, "Continue read Process ..."); gi_reset = 0; goto START; } factPrintf(kInfo, -1, "Exit read Process..."); factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse); gj.readStat = -99; factStat (gj); //factStatNew (gi); return 0; } /*-----------------------------------------------------------------*/ void *subProc(void *thrid) { const int64_t threadID = (int64_t)thrid; factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); while (g_runStat > -2) //in case of 'exit' we still must process pending events { int numWait = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) { // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evtCtrl[k0].evtStat==-1) continue; // If we find the first event still waiting for processing // there will be only unprocessed events after this one in the queue if (evtCtrl[k0].evtStat<1000+threadID) { numWait = 1; break; } // If the event was processed already, skip it // We could replace that to a moving pointer pointing to the first // non-processed event if (evtCtrl[k0].evtStat!=1000+threadID) continue; const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0); if (jret>0 && jret<=threadID) factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); if (jret<=threadID) { evtCtrl[k0].evtStat = 10000; // flag as 'to be deleted' continue; } if (jret>=gi_maxProc) { evtCtrl[k0].evtStat = 5000; // flag as 'to be written' continue; } evtCtrl[k0].evtStat = 1000 + jret; // flag for next proces } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); return 0; } usleep(1); } factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); return 0; } /*-----------------------------------------------------------------*/ void * procEvt (void *ptr) { /* *** main loop processing file, including SW-trigger */ int status; factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); pthread_t thread[100]; // int th_ret[100]; for (long long k = 0; k < gi_maxProc; k++) { /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k); } // in case of 'exit' we still must process pending events while (g_runStat > -2) { int numWait = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) { // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evtCtrl[k0].evtStat==-1) continue; // If we find the first incomplete event which is not supposed to // be processed, there are only more incomplete events in the queue if (evtCtrl[k0].evtStat<90) { numWait = 1; break; } // If the event was processed already, skip it. // We could replace that to a moving pointer pointing to the first // non-processed event if (evtCtrl[k0].evtStat>=1000) continue; //-------- it is better to open the run already here, so call can be used to initialize //-------- buffers etc. needed to interprete run (e.g. DRS calibration) const uint32_t irun = evtCtrl[k0].runNum; const int32_t ievt = evtCtrl[k0].evNum; const int idx = evtCtrl[k0].runCtrl_idx; if (runCtrl[idx].runId!=irun) { //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); // FIXME: What is the right action? (Flag event for deletion?) continue; } // File not yet open if (runCtrl[idx].fileId < 0) { RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0; actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8; actRun.RunTime = evtCtrl[k0].pcTime[0]; //runCtrl[lastRun].firstTime; actRun.RunUsec = evtCtrl[k0].pcTime[1]; //runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); runCtrl[idx].fileHd = runOpen(irun, &actRun, sizeof (actRun)); if (runCtrl[idx].fileHd == NULL) { factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt); runCtrl[idx].fileId = 91; // FIXME: What happens to evtStat? Shell we really just try again? continue; } runCtrl[idx].fileId = 0; factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt); } //-------- //-------- //and set correct event header ; also check for consistency in event (not yet) evtCtrl[k0].fEvent->Errors[0] = evtCtrl[k0].Errors[0]; evtCtrl[k0].fEvent->Errors[1] = evtCtrl[k0].Errors[1]; evtCtrl[k0].fEvent->Errors[2] = evtCtrl[k0].Errors[2]; evtCtrl[k0].fEvent->Errors[3] = evtCtrl[k0].Errors[3]; for (int ib=0; ibBoardTime[ib] = 0; } else { evtCtrl[k0].fEvent->BoardTime[ib] = evtCtrl[k0].FADhead[ib].time; } } const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent); if (rc < 0) { evtCtrl[k0].evtStat = 10000; // flag event to be deleted } else { evtCtrl[k0].evtStat = 1000; // flag 'start processing' runCtrl[idx].procEvt++; } } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit Processing Process ..."); gj.procStat = -22; //==> we should exit return 0; } usleep(1); gj.procStat = gj.readStat; } //we are asked to abort asap ==> must flag all remaining events // when gi_runStat claims that all events are in the buffer... factPrintf(kInfo, -1, "Abort Processing Process ..."); for (int k = 0; k < gi_maxProc; k++) { pthread_join (thread[k], (void **) &status); } gj.procStat = -99; return 0; } /*-----------------------------------------------------------------*/ int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) { /* close run runId (all all runs if runId=0) */ /* return: 0=close scheduled / >0 already closed / <0 does not exist */ int j; if (runId == 0) { for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; } } return 0; } for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].runId == runId) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return 0; } else if (runCtrl[j].fileId < 0) { //run not yet opened runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return +1; } else { // run already closed return +2; } } } //we only reach here if the run was never created return -1; } void checkAndCloseRun(int j, int cond, int where) { if (!cond && runCtrl[j].closeTime >= g_actTime && runCtrl[j].lastTime >= g_actTime - 300 && runCtrl[j].maxEvt > runCtrl[j].actEvt) return; //close run for whatever reason int ii = 0; if (cond) ii = 1; if (runCtrl[j].closeTime < g_actTime) ii |= 2; // = 2; if (runCtrl[j].lastTime < g_actTime - 300) ii |= 4; // = 3; if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) ii |= 8; // = 4; runCtrl[j].closeTime = g_actTime - 1; const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j])); if (rc<0) { factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)", where, runCtrl[j].runId, rc); runCtrl[j].fileId = 92+where*2; } else { factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)", where, runCtrl[j].runId, ii); runCtrl[j].fileId = 93+where*2; } } /*-----------------------------------------------------------------*/ void *writeEvt (void *ptr) { /* *** main loop writing event (including opening and closing run-files */ factPrintf(kInfo, -1, "Starting write-thread"); while (g_runStat > -2) { //int numWrite = 0; int numWait = 0; // Note that the current loop does not at all gurantee that // the events are written in the correct order. for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT) { // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evtCtrl[k0].evtStat==-1) continue; // If we find the first non-written event which is not supposed to // be written, there are only more incomplete events in the queue if (evtCtrl[k0].evtStat<5000) { numWait = 1; break; } // If the event was written already already, skip it // We could replace that to a moving pointer pointing to the first // non-processed event if (evtCtrl[k0].evtStat!=5000) continue; const uint32_t irun = evtCtrl[k0].runNum; const int idx = evtCtrl[k0].runCtrl_idx; if (runCtrl[idx].runId!=irun) { //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt); // FIXME: What is the right action? (Flag event for deletion?) continue; } // File is open if (runCtrl[idx].fileId==0) { const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0); if (rc >= 0) { // Sucessfully wrote event runCtrl[idx].lastTime = g_actTime; runCtrl[idx].actEvt++; } else factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); checkAndCloseRun(idx, rc<0, 1); } evtCtrl[k0].evtStat = 10000; // event written (or has to be discarded) -> delete } // Although the are no pending events, we have to check if a run should be closed (timeout) for (int j=0; j we should exit break; } gj.writStat = gj.readStat; } factPrintf(kInfo, -1, "Close all open files ..."); for (int j=0; j 90) { factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc); gi_maxProc = 1; } //partially initialize event control logics evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; //start all threads (more to come) when we are allowed to .... while (g_runStat == 0) { xwait.tv_sec = 0; xwait.tv_nsec = 10000000; // sleep for ~10 msec nanosleep (&xwait, NULL); } i = 0; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL); i++; imax = i; #ifdef BILAND xwait.tv_sec = 30;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("close all runs in 2 seconds\n"); CloseRunFile (0, time (NULL) + 2, 0); xwait.tv_sec = 1;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("setting g_runstat to -1\n"); g_runStat = -1; #endif //wait for all threads to finish for (i = 0; i < imax; i++) { /*j =*/ pthread_join (thread[i], (void **) &status); } } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ #ifdef BILAND int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer) { printf ("called subproc %d\n", threadID); return threadID + 1; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) { return 1; }; int runWrite (FileHandle_t fileHd, EVENT * event, size_t len) { return 1; usleep (10000); return 1; } //{ return 1; } ; int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) { return 1; }; int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) { int i = 0; // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); // for (i=0; i