// // // #define EVTDEBUG #define NUMSOCK 1 //set to 7 for old configuration #define MAXREAD 65536 //64kB wiznet buffer #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "EventBuilder.h" enum Severity { kMessage = 10, ///< Just a message, usually obsolete kInfo = 20, ///< An info telling something which can be interesting to know kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour kError = 40, ///< Error, something unexpected happened, but can still be handled by the program kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination kDebug = 99, ///< A message used for debugging only }; #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN 256*1024 // size of read-buffer per socket //#define nanosleep(x,y) extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); //extern int runFinish (uint32_t runnr); extern void factOut (int severity, int err, char *message); extern void factReportIncomplete (uint64_t rep); extern void gotNewRun (int runnr, PEVNT_HEADER * headers); extern void factStat (GUI_STAT gj); extern void factStatNew (EVT_STAT gi); extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer); extern void debugHead (int i, int j, void *buf); extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec); extern void debugStream (int isock, void *buf, int len); int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); int evtCtrl_frstPtr; int evtCtrl_lastPtr; int g_maxProc; int g_maxSize; int gi_maxSize; int gi_maxProc; uint g_actTime; uint g_actUsec; int g_runStat; int g_reset; int g_useFTM; int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; size_t g_maxMem; //maximum memory allowed for buffer //no longer needed ... int g_maxBoards; //maximum number of boards to be initialized int g_actBoards; // FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" int gi_runStat; int gp_runStat; int gw_runStat; //int gi_memStat = +1; uint32_t actrun = 0; uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards //uint gi_EvtStart= 0 ; //uint gi_EvtRead = 0 ; //uint gi_EvtBad = 0 ; //uint gi_EvtTot = 0 ; //size_t gi_usedMem = 0 ; //uint gw_EvtTot = 0 ; //uint gp_EvtTot = 0 ; //PIX_MAP g_pixMap[NPIX]; EVT_STAT gi; GUI_STAT gj; EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing //#define MXSTR 1000 //char str[MXSTR]; void factPrintf(int severity, int id, const char *fmt, ...) { char str[1000]; va_list ap; va_start(ap, fmt); vsnprintf(str, 1000, fmt, ap); va_end(ap); factOut(severity, id, str); } #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) typedef struct TGB_struct { struct TGB_struct *prev; void *mem; } TGB_entry; TGB_entry *tgb_last = NULL; uint64_t tgb_memory = 0; uint64_t tgb_inuse = 0; void *TGB_Malloc() { // No free slot available, next alloc would exceed max memory if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem) return NULL; // We will return this amount of memory tgb_inuse += MAX_TOT_MEM; // No free slot available, allocate a new one if (!tgb_last) { tgb_memory += MAX_TOT_MEM; return malloc(MAX_TOT_MEM); } // Get the next free slot from the stack and return it TGB_entry *last = tgb_last; void *mem = last->mem; tgb_last = last->prev; free(last); return mem; }; void TGB_free(void *mem) { // Add the last free slot to the stack TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); entry->prev = tgb_last; entry->mem = mem; tgb_last = entry; // Decrease the amont of memory in use accordingly tgb_inuse -= MAX_TOT_MEM; } RUN_CTRL runCtrl[MAX_RUN]; //RUN_TAIL runTail[MAX_RUN]; /* *** Definition of rdBuffer to read in IP packets; keep it global !!!! */ typedef union { int8_t B[MAX_LEN]; int16_t S[MAX_LEN / 2]; int32_t I[MAX_LEN / 4]; int64_t L[MAX_LEN / 8]; } CNV_FACT; typedef struct { int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... int32_t bufPos; //next byte to read to the buffer next int32_t bufLen; //number of bytes left to read // size_t bufLen; //number of bytes left to read size_t might be better int32_t skip; //number of bytes skipped before start of event int errCnt; //how often connect failed since last successful int sockStat; //-1 if socket not yet connected , 99 if not exist int socket; //contains the sockets struct sockaddr_in SockAddr; //IP for each socket int evtID; // event ID of event currently read int runID; // run " int ftmID; // event ID from FTM uint fadLen; // FADlength of event currently read int fadVers; // Version of FAD int ftmTyp; // trigger type int board; // boardID (softwareID: 0..40 ) int Port; CNV_FACT *rBuf; } READ_STRUCT; typedef union { int8_t B[2]; int16_t S; } SHORT_BYTE; SHORT_BYTE start, stop; READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int runFinish1 (uint32_t runnr) { factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr); return 0; } int runFinish (uint32_t runnr) { factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr); return 0; } int GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT * rs) { /* *** generate Address, create sockets and allocates readbuffer for it *** *** if flag==0 generate socket and buffer *** <0 destroy socket and buffer *** >0 close and redo socket *** *** sid : board*7 + port id */ int j; int optval = 1; //activate keepalive socklen_t optlen = sizeof (optval); if (sid % 7 >= NUMSOCK) { //this is a not used socket, so do nothing ... rs->sockStat = 77; rs->rBuf = NULL ; return 0; } if (rs->sockStat == 0) { //close socket if open j = close (rs->socket); if (j > 0) { factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno); } else { factPrintf(kInfo, 771, "Succesfully closed socket %d", sid); } } rs->sockStat = 99; if (flag < 0) { free (rs->rBuf); //and never open again rs->rBuf = NULL; return 0; } if (flag == 0) { //generate address and buffer ... rs->Port = port; rs->SockAddr.sin_family = sockAddr->sin_family; rs->SockAddr.sin_port = htons (port); rs->SockAddr.sin_addr = sockAddr->sin_addr; rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT)); if (rs->rBuf == NULL) { factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid); rs->sockStat = 77; return -3; } } if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno); rs->sockStat = 88; return -2; } optval = 1; if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //start after 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //do every 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 2; //close after 2 unsuccessful tries if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } factPrintf(kInfo, 773, "Successfully generated socket %d", sid); rs->sockStat = -1; //try to (re)open socket rs->errCnt = 0; return 0; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int mBufInit () { // initialize mBuffer (mark all entries as unused\empty) //uint32_t actime = g_actTime + 50000000; for (int i = 0; i < MAX_EVT * MAX_RUN; i++) { evtCtrl[i].evNum = evtCtrl[i].nRoi = -1; evtCtrl[i].runNum = 0; //evtCtrl[i].mBuffer_idx = -1; evtCtrl[i].evtStat = -1; } //actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER)); evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; return 0; } /*-----------------------------------------------------------------*/ int checkRoiConsistency(int i, int roi[]); int mBufEvt(int sk) { // generate a new Event into mBuffer: // make sure only complete Event are possible, so 'free' will always work // returns index into mBuffer[], or negative value in case of error // error: <-9000 if roi screwed up (not consistent with run) // <-8000 (not consistent with event) // <-7000 (not consistent with board) // < 0 if no space left // int evFree; // int headmem = 0; // size_t needmem = 0; int nRoi[9]; if (!checkRoiConsistency(sk, nRoi)) return -9999; //const int b = sk / 7; const int evID = rd[sk].evtID; const uint runID = rd[sk].runID; const int trgTyp = rd[sk].ftmTyp; const int trgNum = rd[sk].ftmID; const int fadNum = rd[sk].evtID; int beg = (evtCtrl_lastPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN); int end = (evtCtrl_frstPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN); for (int k=beg; k!=end; k--, k += MAX_EVT*MAX_RUN, k %= MAX_EVT*MAX_RUN) { // If the run is different, go on searching. // We cannot stop searching if a lower run-id is found as in // the case of the events, because theoretically, there // can be the same run on two different days. if (runID != evtCtrl[k].runNum) continue; // The event in the control structure has an event id with // a lower event id than the current one. All previous events // should have an even lower event id, so there is no way it // can be found in the structure. if (evtCtrl[k].evNum < evID/* && runID == evtCtrl[k].runNum*/) break; if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/) continue; // is it ok ???? if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8]) { factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]); return -8201; } // count for inconsistencies if (evtCtrl[k].trgNum != trgNum) evtCtrl[k].Errors[0]++; if (evtCtrl[k].fadNum != fadNum) evtCtrl[k].Errors[1]++; if (evtCtrl[k].trgTyp != trgTyp) evtCtrl[k].Errors[2]++; //everything seems fine so far ==> use this slot .... return k; } //event does not yet exist; create it if (end-beg==1 || (end==0 && beg==MAX_EVT*MAX_RUN-1)) //no space available in ctrl { factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end); return -1; } // FIXME: This should be the time of the first receiped board struct timeval tv; gettimeofday (&tv, NULL); const uint32_t tsec = tv.tv_sec; const uint32_t tusec = tv.tv_usec; //check if runId already registered in runCtrl uint oldest = g_actTime + 1000; int jold = -1; int found = 0; // fileId==-2: not yet used or run assigned but not open // fileId== 0: file open // fileId>0: run closed for (int k=0; kAdc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi); //flag all pixels as unused for (int k = 0; k < NPIX; k++) evtCtrl[i].fEvent->StartPix[k] = -1; //flag all TMark as unused for (int k = 0; k < NTMARK; k++) evtCtrl[i].fEvent->StartTM[k] = -1; evtCtrl[i].fEvent->NumBoards = 0; evtCtrl[i].fEvent->SoftTrig = 0; evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0]; evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1]; evtCtrl[i].fEvent->Roi = evtCtrl[i].nRoi; evtCtrl[i].fEvent->RoiTM = evtCtrl[i].nRoiTM; evtCtrl[i].fEvent->EventNum = evtCtrl[i].evNum; evtCtrl[i].fEvent->TriggerNum = evtCtrl[i].trgNum; evtCtrl[i].fEvent->TriggerType = evtCtrl[i].trgTyp; } int mBufFree (int i) { //delete entry [i] from mBuffer: //(and make sure multiple calls do no harm ....) TGB_free(evtCtrl[i].FADhead); evtCtrl[i].fEvent = NULL; evtCtrl[i].FADhead = NULL; evtCtrl[i].evNum = evtCtrl[i].nRoi = -1; evtCtrl[i].runNum = 0; gj.usdMem = tgb_inuse; gj.bufTot--; /*if (gi_memStat < 0) { if (gj.usdMem <= 0.75 * gj.maxMem) gi_memStat = +1; }*/ return 0; } /*-----------------------------------------------------------------*/ /* void resetEvtStat () { for (int i = 0; i < MAX_SOCK; i++) gi.numRead[i] = 0; for (int i = 0; i < NBOARDS; i++) { gi.gotByte[i] = 0; gi.gotErr[i] = 0; } gi.evtGet = 0; //#new Start of Events read gi.evtTot = 0; //#complete Events read gi.evtErr = 0; //#Events with Errors gi.evtSkp = 0; //#Events incomplete (timeout) gi.procTot = 0; //#Events processed gi.procErr = 0; //#Events showed problem in processing gi.procTrg = 0; //#Events accepted by SW trigger gi.procSkp = 0; //#Events rejected by SW trigger gi.feedTot = 0; //#Events used for feedBack system gi.feedErr = 0; //#Events rejected by feedBack gi.wrtTot = 0; //#Events written to disk gi.wrtErr = 0; //#Events with write-error gi.runOpen = 0; //#Runs opened gi.runClose = 0; //#Runs closed gi.runErr = 0; //#Runs with open/close errors return; }*/ /*-----------------------------------------------------------------*/ void reportIncomplete(int id) { factPrintf(kWarn, 601, "%5d skip incomplete evt %8d", evtCtrl[id].evNum, id); uint64_t report = 0; char str[1000]; int ik=0; for (int ib=0; ib=0) // data received from that board { str[ik++] = '0'+(jb%10); continue; } // FIXME: Is that really 'b' or should that be 'ib' ? if (gi_NumConnect[ib]<=0) // board not connected { str[ik++] = 'x'; continue; } // data from this board lost str[ik++] = '.'; report |= ((uint64_t)1)<S[roiPtr]); for (int jr = 0; jr < 9; jr++) { roi[jr] = ntohs(rd[i].rBuf->S[roiPtr]); if (roi[jr]<0 || roi[jr]>1024) { factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]); return 0; } // Check that the roi of pixels jr are compatible with the one of pixel 0 if (jr!=8 && roi[jr]!=roi[0]) { xjr = jr; break; } // Check that the roi of all other DRS chips on boards are compatible for (int kr = 1; kr < 4; kr++) { const int kroi = ntohs(rd[i].rBuf->S[roiPtr]); if (kroi != roi[jr]) { xjr = jr; xkr = kr; break; } roiPtr += kroi+4; } } if (xjr>=0) { if (xkr<0) factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); else factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd[i].rBuf->S[roiPtr])); return 0; } //const int b = i / 7; /* if (roi[0]<0 || roi[0] > 1024) { factPrintf(kError, 999, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", roi[0]); gj.badRoiR++; gj.badRoi[b]++; return 0; } */ /* for (int jr = 1; jr < 8; jr++) { if (roi[jr] != roi[0]) { factPrintf(kError, 711, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", roi[jr], jr, roi[0]); gj.badRoiB++; gj.badRoi[b]++; return 0; } } */ if (roi[8] < roi[0]) { factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); //gj.badRoiB++; //gj.badRoi[b]++; return 0; } return 1; } void swapEventHeaderBytes(int i) { //End of the header. to channels now int eStart = 36; for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++) { rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data } } void copyData(int i, int evID, /*int roi,*/ int boardId) { swapEventHeaderBytes(i); memcpy(&evtCtrl[evID].FADhead[boardId].start_package_flag, &rd[i].rBuf->S[0], sizeof(PEVNT_HEADER)); int src = sizeof(PEVNT_HEADER) / 2; // consistency of ROIs have been checked already (is it all correct?) const int roi = rd[i].rBuf->S[src+2]; // different sort in FAD board..... for (int px = 0; px < 9; px++) { for (int drs = 0; drs < 4; drs++) { // pixH = rd[i].rBuf->S[src++]; // ID src++; const int pixC = rd[i].rBuf->S[src++]; // start-cell const int pixR = rd[i].rBuf->S[src++]; // roi //here we should check if pixH is correct .... const int pixS = boardId * 36 + drs * 9 + px; src++; evtCtrl[evID].fEvent->StartPix[pixS] = pixC; const int dest1 = pixS * roi; memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1], &rd[i].rBuf->S[src], roi * 2); src += pixR; if (px == 8) { const int tmS = boardId * 4 + drs; //and we have additional TM info if (pixR > roi) { const int dest2 = tmS * roi + NPIX * roi; const int srcT = src - roi; evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2], &rd[i].rBuf->S[srcT], roi * 2); } else { evtCtrl[evID].fEvent->StartTM[tmS] = -1; //ETIENNE because the TM channels are always processed during drs calib, //set them to zero if they are not present //I suspect that it may be more efficient to set all the allocated mem to //zero when allocating it // dest = tmS*roi[0] + NPIX*roi[0]; // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2); } } } } } void initReadFAD () { return; } /*-----------------------------------------------------------------*/ //struct rnd //{ // int val; // int idx; //}; // //struct rnd random_arr[MAX_SOCK]; // //int compare(const void *f1, const void *f2) //{ // struct rnd *r1 = (struct rnd*)f1; // struct rnd *r2 = (struct rnd*)f2; // return r1->val - r2->val; //} void *readFAD (void *ptr) { /* *** main loop reading FAD data and sorting them to complete events */ factPrintf(kInfo, -1, "Start initializing (readFAD)"); // int cpu = 7; //read thread // cpu_set_t mask; /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // cpu = 7; // CPU_SET (cpu, &mask); /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); // factOut (kWarn, -1, str); // } const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug start.S = 0xFB01; stop.S = 0x04FE; /* initialize run control logics */ for (int i = 0; i < MAX_RUN; i++) { runCtrl[i].runId = 0; runCtrl[i].fileId = -2; runCtrl[i].procId = -2; } gi_resetS = gi_resetR = 9; int sockDef[NBOARDS]; //internal state of sockets memset(sockDef, 0, NBOARDS*sizeof(int)); START: evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; //time in seconds uint gi_SecTime = time(NULL);; const int cntsock = 8 - NUMSOCK ; if (gi_resetS > 0) { //make sure all sockets are preallocated as 'not exist' for (int i = 0; i < MAX_SOCK; i++) { rd[i].socket = -1; rd[i].sockStat = 99; } for (int k = 0; k < NBOARDS; k++) { gi_NumConnect[k] = 0; //gi.numConn[k] = 0; gj.numConn[k] = 0; //gj.errConn[k] = 0; gj.rateBytes[k] = 0; gj.totBytes[k] = 0; } } if (gi_resetR > 0) { //resetEvtStat (); gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; gj.usdMem = gj.maxMem = gj.xxxMem = 0; gj.totMem = tgb_memory; gj.bufNew = gj.bufEvt = 0; //gj.badRoiE = gj.badRoiR = gj.badRoiB = 0; gj.evtSkip = gj.evtWrite = gj.evtErr = 0; //for (int b = 0; b < NBOARDS; b++) // gj.badRoi[b] = 0; mBufInit (); //initialize buffers factPrintf(kInfo, -1, "End initializing (readFAD)"); } gi_reset = gi_resetR = gi_resetS = gi_resetW = 0; //loop until global variable g_runStat claims stop while (g_runStat >= 0 && g_reset == 0) { gi_runStat = g_runStat; gj.readStat = g_runStat; struct timeval tv; gettimeofday (&tv, NULL); g_actTime = tv.tv_sec; g_actUsec = tv.tv_usec; for (int b = 0; b < NBOARDS; b++) { // Nothing has changed if (g_port[b].sockDef == sockDef[b]) continue; gi_NumConnect[b] = 0; //must close all connections //gi.numConn[b] = 0; gj.numConn[b] = 0; // s0 = 0: sockets to be defined and opened // s0 = -1: sockets to be destroyed // s0 = +1: sockets to be closed and reopened int s0 = 0; if (sockDef[b] != 0) s0 = g_port[b].sockDef==0 ? -1 : +1; const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0; int k = b * 7; for (int p = p0 + 1; p < p0 + 8; p++, k++) GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket sockDef[b] = g_port[b].sockDef; } // count the number of active boards int actBoards = 0; for (int b = 0; b < NBOARDS; b++) if (sockDef[b] > 0) actBoards++; //count number of succesfull actions // int numok = 0; /* for (i=0; i 0) continue; const int board = i / 7 ; //const int p = i % 7 ; if (rd[i].sockStat == -1) { //try to connect if not yet done rd[i].sockStat = connect (rd[i].socket, (struct sockaddr *) &rd[i].SockAddr, sizeof (rd[i].SockAddr)); // Failed if (rd[i].sockStat == -1) { rd[i].errCnt++; usleep(25000); continue; } // Success (rd[i].sockStat == 0) if (sockDef[board] > 0) { rd[i].bufTyp = 0; // expect a header rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining } else { rd[i].bufTyp = -1; // full data to be skipped rd[i].bufLen = MAX_LEN; // huge for skipping } rd[i].bufPos = 0; // no byte read so far rd[i].skip = 0; // start empty gi_NumConnect[board] += cntsock; //gi.numConn[b]++; gj.numConn[board]++; factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", board, gj.numConn[board]); } // Do not read from this socket if (rd[i].bufLen<0) continue; //numok++; if (rd[i].bufLen>0) { const int32_t jrd = recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); // recv failed if (jrd<0) { // There was just nothing waiting if (errno==EWOULDBLOCK || errno==EAGAIN) { //numok--; continue; } factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); //gi.gotErr[b]++; continue; } // connection was closed ... if (jrd==0) { factPrintf(kInfo, 441, "Socket %d closed by FAD", i); const int s0 = sockDef[board] > 0 ? +1 : -1; GenSock(s0, i, 0, NULL, &rd[i]); //gi.gotErr[b]++; gi_NumConnect[board]-= cntsock ; //gi.numConn[b]--; gj.numConn[board]--; continue; } // Success (jrd > 0) gj.rateBytes[board] += jrd; // are we skipping this board ... if (rd[i].bufTyp < 0) continue; rd[i].bufPos += jrd; //==> prepare for continuation rd[i].bufLen -= jrd; #ifdef EVTDEBUG debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event #endif } //we are reading event header if (rd[i].bufTyp <= 0) { //not yet sufficient data to take action if (rd[i].bufPos < minLen) continue; //check if startflag correct; else shift block .... // FIXME: This is not enough... this combination of // bytes can be anywhere... at least the end bytes // must be checked somewhere, too. int k; for (k = 0; k < rd[i].bufPos - 1; k++) { if (rd[i].rBuf->B[k] == start.B[1] && rd[i].rBuf->B[k+1] == start.B[0]) break; } rd[i].skip += k; //no start of header found if (k >= rd[i].bufPos - 1) { rd[i].bufPos = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); continue; } if (k > 0) { rd[i].bufPos -= k; rd[i].bufLen += k; memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos); } if (rd[i].bufPos < minLen) continue; if (rd[i].skip > 0) { factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i); rd[i].skip = 0; } // TGB: This needs much more checks than just the first two bytes! // Swap everything except start_package_flag. // It is to difficult to find out where it is used how, // but it doesn't really matter because it is not really // used anywehere else // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber; rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time; for (int s=24;s<24+NTemp+NDAC;s++) rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2; rd[i].fadVers = rd[i].rBuf->S[2]; rd[i].ftmTyp = rd[i].rBuf->S[5]; rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt) rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt) rd[i].runID = rd[i].rBuf->I[11]==0 ? (int)g_actTime : rd[i].rBuf->I[11]; rd[i].bufTyp = 1; //ready to read full record rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; const int fadBoard = rd[i].rBuf->S[12]; debugHead(i, fadBoard, rd[i].rBuf); continue; } // are we reading data // not yet all read if (rd[i].bufLen > 0) continue; if (rd[i].rBuf->B[rd[i].fadLen - 1] != stop.B[0] || rd[i].rBuf->B[rd[i].fadLen - 2] != stop.B[1]) { //gi.evtErr++; factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d", i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1], rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]); // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; continue; } // int actid; // if (g_useFTM > 0) // actid = rd[i].evtID; // else // actid = rd[i].ftmID; //get index into mBuffer for this event (create if needed) const int idx = mBufEvt(i); // no free entry in mBuffer, retry later if (idx == -1) continue; // We have a valid entry, but no memory has yet been allocated if (idx >= 0 && evtCtrl[idx].FADhead == NULL) { // Try to get memory from the big buffer evtCtrl[idx].FADhead = (PEVNT_HEADER*)TGB_Malloc(); if (evtCtrl[idx].FADhead == NULL) { // If this works properly, this is a hack which can be removed, or // replaced by a signal or dim message if (rd[i].bufTyp==2) factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evtCtrl[idx].evNum, evtCtrl[idx].runNum); rd[i].bufTyp = 2; continue; } // Initialise contents of mBuffer[evID]->fEvent initEvent(idx); // Some statistics gj.usdMem = tgb_inuse; if (gj.usdMem > gj.maxMem) gj.maxMem = gj.usdMem; gj.rateNew++; gj.bufTot++; if (gj.bufTot > gj.maxEvt) gj.maxEvt = gj.bufTot; } // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. if (idx < -1000) continue; //we have a valid entry in mBuffer[]; fill it const int fadBoard = rd[i].rBuf->S[12]; const int fadCrate = fadBoard>>8; if (board != (fadCrate * 10 + (fadBoard&0xff))) { factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)", board, fadBoard, fadCrate, fadBoard&0xff); } if (evtCtrl[idx].board[board] != -1) { factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d", evtCtrl[idx].evNum, board, i, rd[i].fadLen, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[rd[i].fadLen - 2], rd[i].rBuf->B[rd[i].fadLen - 1]); continue; // Continue reading next header } // Copy data from rd[i] to mBuffer[evID] copyData(i, idx, board); // now we have stored a new board contents into Event structure evtCtrl[idx].fEvent->NumBoards++; evtCtrl[idx].board[board] = board; evtCtrl[idx].nBoard++; evtCtrl[idx].evtStat = evtCtrl[idx].nBoard; // have we already reported first (partial) event of this run ??? if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun) { // Signal the fadctrl that a new run has been started gotNewRun(evtCtrl[idx].runNum, NULL); factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d", actrun, evtCtrl[idx].runNum, evtCtrl[idx].evNum); for (int j=0; j0) runCtrl[ir].runId = 0; } // Flag that the event is ready for processing evtCtrl[idx].evtStat = 99; } // end for loop over all sockets g_actTime = time (NULL); if (g_actTime <= gi_SecTime) { usleep(1); continue; } gi_SecTime = g_actTime; gj.bufNew = gj.bufEvt = 0; //loop over all active events and flag those older than read-timeout //delete those that are written to disk .... for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { // Check the more likely case first: incomplete events if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<92) { gj.bufNew++; //incomplete event in Buffer // Event has not yet timed out or was reported already if (evtCtrl[k0].evtStat>=90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30) continue; reportIncomplete(k0); //timeout for incomplete events evtCtrl[k0].evtStat = 91; gj.evtSkip++; continue; } // complete event in Buffer if (evtCtrl[k0].evtStat >= 95) gj.bufEvt++; // Check the less likely case: 'useless' or 'delete' if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000) { mBufFree(k0); //event written--> free memory evtCtrl[k0].evtStat = -1; if (k0==evtCtrl_frstPtr) { evtCtrl_frstPtr++; evtCtrl_frstPtr %= MAX_EVT * MAX_RUN; } else factPrintf(kDebug, -1, "Freed a non-first slot"); gj.evtWrite++; gj.rateWrite++; } } gj.deltaT = 1000; //temporary, must be improved for (int ib = 0; ib < NBOARDS; ib++) gj.totBytes[ib] += gj.rateBytes[ib]; gj.totMem = tgb_memory; if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem; if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt; factStat (gj); factStatNew (gi); gj.rateNew = gj.rateWrite = 0; gj.maxMem = gj.usdMem; gj.maxEvt = gj.bufTot; for (int b = 0; b < NBOARDS; b++) gj.rateBytes[b] = 0; } // while (g_runStat >= 0 && g_reset == 0) factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset); if (g_reset > 0) { gi_reset = g_reset; gi_resetR = gi_reset % 10; //shall we stop reading ? gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? gi_resetW = (gi_reset / 100) % 10; //shall we close files ? gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? g_reset = 0; } else { gi_reset = 0; gi_resetR = g_runStat == -1 ? 1 : 7; gi_resetS = 7; //close all sockets gi_resetW = 7; //close all files gi_resetX = 0; //inform others we have to quit .... gi_runStat = -11; //inform all that no update to happen any more gj.readStat = -11; //inform all that no update to happen any more } if (gi_resetS > 0) { //must close all open sockets ... factPrintf(kInfo, -1, "Close all sockets..."); for (int i = 0; i < MAX_SOCK; i++) { if (rd[i].sockStat != 0) continue; GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket if (i%7) continue; gi_NumConnect[i / 7]-= cntsock ; //gi.numConn[i / 7]--; gj.numConn[i / 7]--; sockDef[i / 7] = 0; //flag ro recreate the sockets ... rd[i / 7].sockStat = -1; //and try to open asap } } if (gi_resetR > 0) { //flag all events as 'read finished' for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 90) { evtCtrl[k0].evtStat = 91; //gi.evtSkp++; //gi.evtTot++; } } //and clear all buffers (might have to wait until all others are done) // If minclear is 0, an event could be deleted while writing is still ongoing /* int minclear; if (gi_resetR == 1) { minclear = 900; factPrintf(kInfo, -1, "Drain all buffers ..."); } else { minclear = 0; factPrintf(kInfo, -1, "Flush all buffers ..."); }*/ const int minclear = 900; int numclear = 1; while (numclear > 0) { numclear = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat > minclear) { mBufFree(k0); //event written--> free memory evtCtrl[k0].evtStat = -1; if (k0==evtCtrl_frstPtr) { evtCtrl_frstPtr++; evtCtrl_frstPtr %= MAX_EVT * MAX_RUN; } else factPrintf(kDebug, -1, "Freed a non-first slot"); } else if (evtCtrl[k0].evtStat > 0) numclear++; //writing is still ongoing... } usleep(1); } } if (gi_reset > 0) { if (gi_resetW > 0) CloseRunFile (0, 0, 0); //ask all Runs to be closed if (gi_resetX > 0) { struct timespec xwait; xwait.tv_sec = gi_resetX; xwait.tv_nsec = 0; nanosleep (&xwait, NULL); } factPrintf(kInfo, -1, "Continue read Process ..."); gi_reset = 0; goto START; } factPrintf(kInfo, -1, "Exit read Process..."); factPrintf(kInfo, -1, "%ld Bytes flaged as in-use.", tgb_inuse); gi_runStat = -99; gj.readStat = -99; factStat (gj); factStatNew (gi); return 0; } /*-----------------------------------------------------------------*/ void *subProc(void *thrid) { const int64_t threadID = (int64_t)thrid; factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); while (g_runStat > -2) //in case of 'exit' we still must process pending events { int numWait = 0; int numProc = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat != 1000 + threadID) { if (evtCtrl[k0].evtStat < 1000 + threadID) numWait++; continue; } int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap) if (gi_resetR<=1) { jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/); if (jret <= threadID) { factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); jret = 5300; } else if (jret <= 0) jret = 9200 + threadID; // flag as 'to be deleted' else if (jret >= gi_maxProc) jret = 5200 + threadID; // flag as 'to be written' else jret = 1000 + jret; // flag for next proces } evtCtrl[k0].evtStat = jret; numProc++; } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); return 0; } //seems we have nothing to do, so sleep a little if (numProc == 0) usleep(1); } factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); return 0; } /*-----------------------------------------------------------------*/ void * procEvt (void *ptr) { /* *** main loop processing file, including SW-trigger */ int status; int lastRun = 0; //usually run from last event still valid // cpu_set_t mask; // int cpu = 1; //process thread (will be several in final version) factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // CPU_SET( 0 , &mask ); leave for system // CPU_SET( 1 , &mask ); used by write process // CPU_SET (2, &mask); // CPU_SET (3, &mask); // CPU_SET (4, &mask); // CPU_SET (5, &mask); // CPU_SET (6, &mask); // CPU_SET( 7 , &mask ); used by read process /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu); // factOut (kWarn, -1, str); // } pthread_t thread[100]; // int th_ret[100]; for (long long k = 0; k < gi_maxProc; k++) { /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k); } // in case of 'exit' we still must process pending events while (g_runStat > -2) { int numWait = 0; int numProc = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >= 1000) { if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat< 90) numWait++; continue; } //we are asked to flush buffers asap if (gi_resetR > 1) { evtCtrl[k0].evtStat = 9991; continue; } //-------- it is better to open the run already here, so call can be used to initialize //-------- buffers etc. needed to interprete run (e.g. DRS calibration) const uint32_t irun = evtCtrl[k0].runNum; const int32_t ievt = evtCtrl[k0].evNum; // Find entry in runCtrl which belongs to the event mBuffer[id] // (only check if there is a need to check) if (runCtrl[lastRun].runId != irun) { //check which fileID to use (or open if needed) int j; for (j=0;j=MAX_RUN) { factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0); // FIXME: What is the right action? (Flag event for deletion?) continue; } lastRun = j; } // File not yet open if (runCtrl[lastRun].fileId < 0) { //---- we need to open a new run ==> make sure all older runs are //---- finished and marked to be closed .... // This loop is unique to procEvt for (int j=0; j do no longer accept events for processing //---- problem: processing still going on ==> must wait for closing .... factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId); runFinish1(runCtrl[j].runId); } } RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0; actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8; actRun.RunTime = evtCtrl[k0].pcTime[0]; //runCtrl[lastRun].firstTime; actRun.RunUsec = evtCtrl[k0].pcTime[1]; //runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); if (runCtrl[lastRun].fileHd == NULL) { factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); runCtrl[lastRun].fileId = 91; runCtrl[lastRun].procId = 91; // Is not set in writeEvt continue; } runCtrl[lastRun].fileId = 0; runCtrl[lastRun].procId = 0; // Is not set in writeEvt factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); } //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) if (runCtrl[lastRun].procId == 0) { if (runCtrl[lastRun].closeTime < g_actTime || runCtrl[lastRun].lastTime < g_actTime - 300 || runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt) { factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); runFinish1 (runCtrl[lastRun].runId); runCtrl[lastRun].procId = 1; } } // Skip event because of no active run if (runCtrl[lastRun].procId != 0) { evtCtrl[k0].evtStat = 9091; continue; } //-------- //-------- //and set correct event header ; also check for consistency in event (not yet) evtCtrl[k0].fEvent->Errors[0] = evtCtrl[k0].Errors[0]; evtCtrl[k0].fEvent->Errors[1] = evtCtrl[k0].Errors[1]; evtCtrl[k0].fEvent->Errors[2] = evtCtrl[k0].Errors[2]; evtCtrl[k0].fEvent->Errors[3] = evtCtrl[k0].Errors[3]; for (int ib=0; ibBoardTime[ib] = 0; } else { evtCtrl[k0].fEvent->BoardTime[ib] = evtCtrl[k0].FADhead[ib].time; } } const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent); //gi.procTot++; numProc++; if (rc < 0) { evtCtrl[k0].evtStat = 9999; //flag event to be skipped //gi.procErr++; } else { evtCtrl[k0].evtStat = 1000; runCtrl[lastRun].procEvt++; } } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit Processing Process ..."); gp_runStat = -22; //==> we should exit gj.procStat = -22; //==> we should exit return 0; } //seems we have nothing to do, so sleep a little if (numProc == 0) usleep(1); gp_runStat = gi_runStat; gj.procStat = gj.readStat; } //we are asked to abort asap ==> must flag all remaining events // when gi_runStat claims that all events are in the buffer... factPrintf(kInfo, -1, "Abort Processing Process ..."); for (int k = 0; k < gi_maxProc; k++) { pthread_join (thread[k], (void **) &status); } for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000) evtCtrl[k0].evtStat = 9800; //flag event as 'processed' } gp_runStat = -99; gj.procStat = -99; return 0; } /*-----------------------------------------------------------------*/ int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) { /* close run runId (all all runs if runId=0) */ /* return: 0=close scheduled / >0 already closed / <0 does not exist */ int j; if (runId == 0) { for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; } } return 0; } for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].runId == runId) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return 0; } else if (runCtrl[j].fileId < 0) { //run not yet opened runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return +1; } else { // run already closed return +2; } } } //we only reach here if the run was never created return -1; } void checkAndCloseRun(int j, int irun, int cond, int where) { if (!cond && runCtrl[j].closeTime >= g_actTime && runCtrl[j].lastTime >= g_actTime - 300 && runCtrl[j].maxEvt > runCtrl[j].actEvt) return; //close run for whatever reason int ii = 0; if (cond) ii = 1; if (runCtrl[j].closeTime < g_actTime) ii |= 2; // = 2; if (runCtrl[j].lastTime < g_actTime - 300) ii |= 4; // = 3; if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) ii |= 8; // = 4; if (runCtrl[j].procId == 0) { runFinish1(runCtrl[j].runId); runCtrl[j].procId = 92; } runCtrl[j].closeTime = g_actTime - 1; const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j])); if (rc<0) { factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)", where, runCtrl[j].runId, rc); runCtrl[j].fileId = 92+where*2; } else { factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)", where, irun, ii); runCtrl[j].fileId = 93+where*2; } } /*-----------------------------------------------------------------*/ void *writeEvt (void *ptr) { /* *** main loop writing event (including opening and closing run-files */ // cpu_set_t mask; // int cpu = 1; //write thread factPrintf(kInfo, -1, "Starting write-thread"); /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // CPU_SET (cpu, &mask); /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); // } int lastRun = 0; //usually run from last event still valid while (g_runStat > -2) { int numWrite = 0; int numWait = 0; for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000) { if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000) numWait++; continue; } //we must drain the buffer asap if (gi_resetR > 1) { evtCtrl[k0].evtStat = 9904; continue; } const int id = k0;//evtCtrl[k0].mBuffer_idx; const uint32_t irun = evtCtrl[id].runNum; const int32_t ievt = evtCtrl[id].evNum; // Find entry in runCtrl which belongs to the event mBuffer[id] // (only check if there is a need to check) if (runCtrl[lastRun].runId != irun) { //check which fileID to use (or open if needed) int j; for (j=0;j=MAX_RUN) { factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id); // FIXME: What is the right action? continue; } lastRun = j; } // File not yet open if (runCtrl[lastRun].fileId < 0) { RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = evtCtrl[id].nRoi; //runCtrl[lastRun].roi0; actRun.NroiTM = evtCtrl[id].nRoiTM; //runCtrl[lastRun].roi8; actRun.RunTime = evtCtrl[id].pcTime[0];//runCtrl[lastRun].firstTime; actRun.RunUsec = evtCtrl[id].pcTime[1];//runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; memcpy(actRun.FADhead, evtCtrl[id].FADhead, NBOARDS * sizeof (PEVNT_HEADER)); runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); if (runCtrl[lastRun].fileHd == NULL) { factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); runCtrl[lastRun].fileId = 91; continue; } runCtrl[lastRun].fileId = 0; factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); } if (runCtrl[lastRun].fileId > 0) { // There is an event but file is already closed /* if (runCtrl[j].fileId < 100) { factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); runCtrl[j].fileId += 100; }*/ evtCtrl[k0].evtStat = 9903; } // File is open if (runCtrl[lastRun].fileId==0) { const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[id].fEvent, sizeof (evtCtrl[id])); if (rc >= 0) { // Sucessfully wrote event runCtrl[lastRun].lastTime = g_actTime; runCtrl[lastRun].actEvt++; evtCtrl[k0].evtStat = 9901; } else { factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); evtCtrl[k0].evtStat = 9902; } checkAndCloseRun(lastRun, irun, rc<0, 1); } } /* //check if we should close a run (mainly when no event pending) //ETIENNE but first figure out which one is the latest run with a complete event. //i.e. max run Id and lastEvt >= 0 //this condition is sufficient because all pending events were written already in the loop just above //actrun uint32_t lastStartedTime = 0; uint32_t runIdFound = 1; //If we have an active run, look for its start time if (actrun != 0) { runIdfound = 0; for (int j=0;j we should exit gj.writStat = -22; //==> we should exit break; } gw_runStat = gi_runStat; gj.writStat = gj.readStat; } factPrintf(kInfo, -1, "Close all open files ..."); for (int j=0; j 90) { factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc); gi_maxProc = 1; } //partially initialize event control logics evtCtrl_frstPtr = 0; evtCtrl_lastPtr = 0; //start all threads (more to come) when we are allowed to .... while (g_runStat == 0) { xwait.tv_sec = 0; xwait.tv_nsec = 10000000; // sleep for ~10 msec nanosleep (&xwait, NULL); } i = 0; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL); i++; imax = i; #ifdef BILAND xwait.tv_sec = 30;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("close all runs in 2 seconds\n"); CloseRunFile (0, time (NULL) + 2, 0); xwait.tv_sec = 1;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("setting g_runstat to -1\n"); g_runStat = -1; #endif //wait for all threads to finish for (i = 0; i < imax; i++) { /*j =*/ pthread_join (thread[i], (void **) &status); } } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ #ifdef BILAND int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer) { printf ("called subproc %d\n", threadID); return threadID + 1; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) { return 1; }; int runWrite (FileHandle_t fileHd, EVENT * event, size_t len) { return 1; usleep (10000); return 1; } //{ return 1; } ; int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) { return 1; }; int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) { int i = 0; // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); // for (i=0; i