// // // #define EVTDEBUG #define NUMSOCK 1 //set to 7 for old configuration #define MAXREAD 65536 //64kB wiznet buffer #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "EventBuilder.h" enum Severity { kMessage = 10, ///< Just a message, usually obsolete kInfo = 20, ///< An info telling something which can be interesting to know kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour kError = 40, ///< Error, something unexpected happened, but can still be handled by the program kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination kDebug = 99, ///< A message used for debugging only }; #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN 256*1024 // size of read-buffer per socket //#define nanosleep(x,y) extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); //extern int runFinish (uint32_t runnr); extern void factOut (int severity, int err, char *message); extern void factReportIncomplete (uint64_t rep); extern void gotNewRun (int runnr, PEVNT_HEADER * headers); extern void factStat (GUI_STAT gj); extern void factStatNew (EVT_STAT gi); extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer); extern void debugHead (int i, int j, void *buf); extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec); extern void debugStream (int isock, void *buf, int len); int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); int g_maxProc; int g_maxSize; int gi_maxSize; int gi_maxProc; uint g_actTime; uint g_actUsec; int g_runStat; int g_reset; int g_useFTM; int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; size_t g_maxMem; //maximum memory allowed for buffer //no longer needed ... int g_maxBoards; //maximum number of boards to be initialized int g_actBoards; // FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" int gi_runStat; int gp_runStat; int gw_runStat; //int gi_memStat = +1; uint32_t actrun = 0; uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards //uint gi_EvtStart= 0 ; //uint gi_EvtRead = 0 ; //uint gi_EvtBad = 0 ; //uint gi_EvtTot = 0 ; //size_t gi_usedMem = 0 ; //uint gw_EvtTot = 0 ; //uint gp_EvtTot = 0 ; PIX_MAP g_pixMap[NPIX]; EVT_STAT gi; GUI_STAT gj; EVT_CTRL evtCtrl; //control of events during processing int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space //#define MXSTR 1000 //char str[MXSTR]; void factPrintf(int severity, int id, const char *fmt, ...) { char str[1000]; va_list ap; va_start(ap, fmt); vsnprintf(str, 1000, fmt, ap); va_end(ap); factOut(severity, id, str); } #define THOMAS_MALLOC #ifdef THOMAS_MALLOC #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) typedef struct TGB_struct { struct TGB_struct *prev; void *mem; } TGB_entry; TGB_entry *tgb_last = NULL; uint64_t tgb_memory = 0; uint64_t tgb_inuse = 0; void *TGB_Malloc() { // No free slot available, next alloc would exceed max memory if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem) return NULL; // We will return this amount of memory tgb_inuse += MAX_TOT_MEM; // No free slot available, allocate a new one if (!tgb_last) { tgb_memory += MAX_TOT_MEM; return malloc(MAX_TOT_MEM); } // Get the next free slot from the stack and return it TGB_entry *last = tgb_last; void *mem = last->mem; tgb_last = last->prev; free(last); return mem; }; void TGB_free(void *mem) { // Add the last free slot to the stack TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); entry->prev = tgb_last; entry->mem = mem; tgb_last = entry; // Decrease the amont of memory in use accordingly tgb_inuse -= MAX_TOT_MEM; } #endif #ifdef ETIENNE_MALLOC //ETIENNE #define MAX_SLOTS_PER_CHUNK 100 typedef struct { int32_t eventNumber; int32_t chunk; int32_t slot; } CHUNK_MAPPING; CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN]; #define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2) #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM) #define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK) typedef struct { void * pointers[MAX_SLOTS_PER_CHUNK]; int32_t events[MAX_SLOTS_PER_CHUNK]; int32_t nFreeSlots; int32_t nSlots; } ETI_CHUNK; int32_t numAllocatedChunks = 0; #define MAX_CHUNKS 8096 ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS]; void* ETI_Malloc(int evtId, int evtIndex) { for (int i=0;i 0) { for (int j=0;j= g_maxMem) return NULL; EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK); if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL) { factPrintf(kError, 0, "Allocation of %lu bytes failed. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem); return NULL; } EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots; EtiMemoryChunks[numAllocatedChunks].events[0] = evtId; EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1; mBufferMapping[evtIndex].eventNumber = evtId; mBufferMapping[evtIndex].chunk = numAllocatedChunks; mBufferMapping[evtIndex].slot = 0; for (int i=1;ievents[mBufferMapping[evtIndex].slot] != evtId) { factPrintf(kError, 0, "Mismatch in chunk mapping table. Expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]); return; } currentChunk->events[mBufferMapping[evtIndex].slot] = -1; currentChunk->nFreeSlots++; return; /* TEST */ int chunkIndex = mBufferMapping[evtIndex].chunk; if (chunkIndex != numAllocatedChunks-1) return; while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots) {//free this chunk if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL) { factPrintf(kError, 0, "Chunk %d not allocated as it ought to be. Skipping memory release.", chunkIndex); return; } free(EtiMemoryChunks[chunkIndex].pointers[0]); EtiMemoryChunks[chunkIndex].pointers[0] = NULL; EtiMemoryChunks[chunkIndex].nSlots = 0; numAllocatedChunks--; chunkIndex--; if (numAllocatedChunks == 0) break; } } //END ETIENNE #endif RUN_CTRL runCtrl[MAX_RUN]; RUN_TAIL runTail[MAX_RUN]; /* *** Definition of rdBuffer to read in IP packets; keep it global !!!! */ typedef union { int8_t B[MAX_LEN]; int16_t S[MAX_LEN / 2]; int32_t I[MAX_LEN / 4]; int64_t L[MAX_LEN / 8]; } CNV_FACT; typedef struct { int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... int32_t bufPos; //next byte to read to the buffer next int32_t bufLen; //number of bytes left to read // size_t bufLen; //number of bytes left to read size_t might be better int32_t skip; //number of bytes skipped before start of event int errCnt; //how often connect failed since last successful int sockStat; //-1 if socket not yet connected , 99 if not exist int socket; //contains the sockets struct sockaddr_in SockAddr; //IP for each socket int evtID; // event ID of event currently read int runID; // run " int ftmID; // event ID from FTM uint fadLen; // FADlength of event currently read int fadVers; // Version of FAD int ftmTyp; // trigger type int board; // boardID (softwareID: 0..40 ) int Port; CNV_FACT *rBuf; } READ_STRUCT; typedef union { int8_t B[2]; int16_t S; } SHORT_BYTE; SHORT_BYTE start, stop; READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int runFinish1 (uint32_t runnr) { factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr); return 0; } int runFinish (uint32_t runnr) { factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr); return 0; } int GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT * rs) { /* *** generate Address, create sockets and allocates readbuffer for it *** *** if flag==0 generate socket and buffer *** <0 destroy socket and buffer *** >0 close and redo socket *** *** sid : board*7 + port id */ int j; int optval = 1; //activate keepalive socklen_t optlen = sizeof (optval); if (sid % 7 >= NUMSOCK) { //this is a not used socket, so do nothing ... rs->sockStat = 77; rs->rBuf = NULL ; return 0; } if (rs->sockStat == 0) { //close socket if open j = close (rs->socket); if (j > 0) { factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno); } else { factPrintf(kInfo, 771, "Succesfully closed socket %d", sid); } } rs->sockStat = 99; if (flag < 0) { free (rs->rBuf); //and never open again rs->rBuf = NULL; return 0; } if (flag == 0) { //generate address and buffer ... rs->Port = port; rs->SockAddr.sin_family = sockAddr->sin_family; rs->SockAddr.sin_port = htons (port); rs->SockAddr.sin_addr = sockAddr->sin_addr; rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT)); if (rs->rBuf == NULL) { factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid); rs->sockStat = 77; return -3; } } if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno); rd->sockStat = 88; return -2; } optval = 1; if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //start after 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //do every 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 2; //close after 2 unsuccessful tries if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } factPrintf(kInfo, 773, "Successfully generated socket %d", sid); rs->sockStat = -1; //try to (re)open socket rs->errCnt = 0; return 0; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int mBufInit () { // initialize mBuffer (mark all entries as unused\empty) uint32_t actime = g_actTime + 50000000; for (int i = 0; i < MAX_EVT * MAX_RUN; i++) { mBuffer[i].evNum = mBuffer[i].nRoi = -1; mBuffer[i].runNum = 0; evtCtrl.evtBuf[i] = -1; evtCtrl.evtStat[i] = -1; evtCtrl.pcTime[i] = actime; //initiate to far future #ifdef ETIENNE_MALLOC //ETIENNE mBufferMapping[i].chunk = -1; mBufferMapping[i].eventNumber = -1; mBufferMapping[i].slot = -1; //END ETIENNE #endif } #ifdef ETIENNE_MALLOC for (int j=0;j use this slot .... return i; } if (evFree < 0 && mBuffer[i].evNum < 0) evFree = i; i += MAX_EVT; } //event does not yet exist; create it if (evFree < 0) //no space available in ctrl { factPrintf(kError, 881, "No control slot to keep event %d", evID); return -1; } i = evFree; //found free entry; use it ... // FIXME: This should be the time of the first receiped board struct timeval tv; gettimeofday (&tv, NULL); const uint32_t tsec = tv.tv_sec; const uint32_t tusec = tv.tv_usec; //check if runId already registered in runCtrl evFree = -1; uint oldest = g_actTime + 1000; int jold = -1; int found = 0; // fileId==-2: not yet used or run assigned but not open // fileId== 0: file open // fileId>0: run closed for (int k = 0; k < MAX_RUN; k++) { // Check if run already registered (old entries should have runId==-1) if (runCtrl[k].runId == runID) { if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8]) { factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID); return -9301; } found = 1; break; } // This is just for sanity. We use the oldest free entry (until // we have understood the concept and can use "just" a free entry if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest) { oldest = runCtrl[k].closeTime; jold = k; } /* // Empty slot already found? if (evFree>=0) continue; */ /* if (runCtrl[k].fileId!=0) { evFree = k; continue; }*/ /* // Slot not yet used if (runCtrl[k].fileId < 0) { evFree = k; continue; } // Slot already closed if (runCtrl[k].fileId > 0 && runCtrl[k].closeTime < oldest) { oldest = runCtrl[k].closeTime; jold = k; }*/ } if (!found) // Run not yet registered, register run { if (evFree < 0 && jold < 0) { factPrintf(kFatal, 883, "Not able to register the new run %d", runID); return -1001; } if (evFree < 0) evFree = jold; factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d", runID, evID, evFree, nRoi[0], nRoi[8]); runCtrl[evFree].runId = runID; runCtrl[evFree].roi0 = nRoi[0]; runCtrl[evFree].roi8 = nRoi[8]; runCtrl[evFree].fileId = -2; runCtrl[evFree].procId = -2; runCtrl[evFree].lastEvt = 1; // Number of events partially started to read runCtrl[evFree].actEvt = 0; // Number of written events (write) runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent) runCtrl[evFree].maxEvt = 999999999; // max number events allowed runCtrl[evFree].firstTime = tsec; runCtrl[evFree].firstUsec = tusec; runCtrl[evFree].lastTime = tsec; // Time when the last event was written runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed runTail[evFree].nEventsOk = 0; runTail[evFree].nEventsRej = 0; runTail[evFree].nEventsBad = 0; runTail[evFree].PCtime0 = 0; runTail[evFree].PCtimeX = 0; } //flag all boards as unused mBuffer[i].nBoard = 0; for (int k = 0; k < NBOARDS; k++) mBuffer[i].board[k] = -1; mBuffer[i].pcTime[0] = tsec; mBuffer[i].pcTime[1] = tusec; mBuffer[i].nRoi = nRoi[0]; mBuffer[i].nRoiTM = nRoi[8]; mBuffer[i].evNum = evID; mBuffer[i].runNum = runID; mBuffer[i].fadNum = fadNum; mBuffer[i].trgNum = trgNum; mBuffer[i].trgTyp = trgTyp; mBuffer[i].Errors[0] = 0; mBuffer[i].Errors[1] = 0; mBuffer[i].Errors[2] = 0; mBuffer[i].Errors[3] = 0; mBuffer[i].fEvent = NULL; mBuffer[i].buffer = NULL; mBuffer[i].FADhead = NULL; /* #ifdef ETIENNE_MALLOC mBuffer[i].FADhead = ETI_Malloc(evID, i); mBuffer[i].buffer = NULL; if (mBuffer[i].FADhead != NULL) mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]); else { mBuffer[i].fEvent = NULL; gj.usdMem = 0; for (int k=0;k gj.maxMem) gj.maxMem = gj.usdMem; else { factPrintf(kDebug, 882, "No memory left to keep event %6d sock %3d", evID, sk); } return -11; } #endif #ifdef STANDARD_MALLOC mBuffer[i].FADhead = malloc (MAX_HEAD_MEM+MAX_EVT_MEM); mBuffer[i].fEvent = NULL; mBuffer[i].buffer = NULL; if (mBuffer[i].FADhead == NULL) { factPrintf(kError, 882, "malloc header failed for event %d", evID); return -12; } mBuffer[i].fEvent = (void*)((char*)mBuffer[i].FADhead+MAX_HEAD_MEM); #endif #ifdef ETIENNE_MALLOC //ETIENNE //gj.usdMem += needmem + headmem + gi_maxSize; gj.usdMem = 0; for (int k=0;kStartPix[k] = -1; //flag all TMark as unused for (int k = 0; k < NTMARK; k++) mBuffer[i].fEvent->StartTM[k] = -1; mBuffer[i].fEvent->NumBoards = 0; mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime[0]; mBuffer[i].fEvent->PCUsec = mBuffer[i].pcTime[1]; } int mBufFree (int i) { //delete entry [i] from mBuffer: //(and make sure multiple calls do no harm ....) // int headmem = 0; // size_t freemem = 0; #ifdef ETIENNE_MALLOC int evid; evid = mBuffer[i].evNum; ETI_Free(evid, i); #endif // freemem = mBuffer[i].evtLen; //ETIENNE #ifdef THOMAS_MALLOC TGB_free(mBuffer[i].FADhead); #endif #ifdef STANDARD_MALLOC free (mBuffer[i].FADhead); #endif // free (mBuffer[i].fEvent); mBuffer[i].fEvent = NULL; // free (mBuffer[i].FADhead); mBuffer[i].FADhead = NULL; // free (mBuffer[i].buffer); mBuffer[i].buffer = NULL; //END ETIENNE // headmem = NBOARDS * sizeof (PEVNT_HEADER); mBuffer[i].evNum = mBuffer[i].nRoi = -1; mBuffer[i].runNum = 0; #ifdef ETIENNE_MALLOC //ETIENNE // gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize; gj.usdMem = 0; for (int k=0;k=0) // data received from that board { str[ik++] = '0'+(jb%10); continue; } // FIXME: Is that really 'b' or should that be 'ib' ? if (gi_NumConnect[ib]<=0) // board not connected { str[ik++] = 'x'; continue; } // data from this board lost str[ik++] = '.'; report |= ((uint64_t)1)<S[roiPtr]); for (int jr = 0; jr < 9; jr++) { roi[jr] = ntohs(rd[i].rBuf->S[roiPtr]); if (roi[jr]<0 || roi[jr]>1024) { factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]); return 0; } // Check that the roi of pixels jr are compatible with the one of pixel 0 if (jr!=8 && roi[jr]!=roi[0]) { xjr = jr; break; } // Check that the roi of all other DRS chips on boards are compatible for (int kr = 1; kr < 4; kr++) { const int kroi = ntohs(rd[i].rBuf->S[roiPtr]); if (kroi != roi[jr]) { xjr = jr; xkr = kr; break; } roiPtr += kroi+4; } } if (xjr>=0) { if (xkr<0) factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); else factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd[i].rBuf->S[roiPtr])); return 0; } //const int b = i / 7; /* if (roi[0]<0 || roi[0] > 1024) { factPrintf(kError, 999, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", roi[0]); gj.badRoiR++; gj.badRoi[b]++; return 0; } */ /* for (int jr = 1; jr < 8; jr++) { if (roi[jr] != roi[0]) { factPrintf(kError, 711, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", roi[jr], jr, roi[0]); gj.badRoiB++; gj.badRoi[b]++; return 0; } } */ if (roi[8] < roi[0]) { factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); //gj.badRoiB++; //gj.badRoi[b]++; return 0; } return 1; } void swapEventHeaderBytes(int i) { //End of the header. to channels now int eStart = 36; for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++) { rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data } } void copyData(int i, int evID, /*int roi,*/ int boardId) { swapEventHeaderBytes(i); memcpy(&mBuffer[evID].FADhead[boardId].start_package_flag, &rd[i].rBuf->S[0], sizeof(PEVNT_HEADER)); int src = sizeof(PEVNT_HEADER) / 2; // consistency of ROIs have been checked already (is it all correct?) const int roi = rd[i].rBuf->S[src+2]; // different sort in FAD board..... for (int px = 0; px < 9; px++) { for (int drs = 0; drs < 4; drs++) { // pixH = rd[i].rBuf->S[src++]; // ID src++; const int pixC = rd[i].rBuf->S[src++]; // start-cell const int pixR = rd[i].rBuf->S[src++]; // roi //here we should check if pixH is correct .... const int pixS = boardId * 36 + drs * 9 + px; src++; mBuffer[evID].fEvent->StartPix[pixS] = pixC; const int dest1 = pixS * roi; memcpy(&mBuffer[evID].fEvent->Adc_Data[dest1], &rd[i].rBuf->S[src], roi * 2); src += pixR; if (px == 8) { const int tmS = boardId * 4 + drs; //and we have additional TM info if (pixR > roi) { const int dest2 = tmS * roi + NPIX * roi; const int srcT = src - roi; mBuffer[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; memcpy(&mBuffer[evID].fEvent->Adc_Data[dest2], &rd[i].rBuf->S[srcT], roi * 2); } else { mBuffer[evID].fEvent->StartTM[tmS] = -1; //ETIENNE because the TM channels are always processed during drs calib, //set them to zero if they are not present //I suspect that it may be more efficient to set all the allocated mem to //zero when allocating it // dest = tmS*roi[0] + NPIX*roi[0]; // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2); } } } } } void initReadFAD () { return; } /*-----------------------------------------------------------------*/ //struct rnd //{ // int val; // int idx; //}; // //struct rnd random_arr[MAX_SOCK]; // //int compare(const void *f1, const void *f2) //{ // struct rnd *r1 = (struct rnd*)f1; // struct rnd *r2 = (struct rnd*)f2; // return r1->val - r2->val; //} void *readFAD (void *ptr) { /* *** main loop reading FAD data and sorting them to complete events */ factPrintf(kInfo, -1, "Start initializing (readFAD)"); // int cpu = 7; //read thread // cpu_set_t mask; /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // cpu = 7; // CPU_SET (cpu, &mask); /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); // factOut (kWarn, -1, str); // } const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug start.S = 0xFB01; stop.S = 0x04FE; /* initialize run control logics */ for (int i = 0; i < MAX_RUN; i++) { runCtrl[i].runId = 0; runCtrl[i].fileId = -2; runCtrl[i].procId = -2; } gi_resetS = gi_resetR = 9; int sockDef[NBOARDS]; //internal state of sockets memset(sockDef, 0, NBOARDS*sizeof(int)); START: evtCtrl.frstPtr = 0; evtCtrl.lastPtr = 0; //time in seconds uint gi_SecTime = time(NULL);; const int cntsock = 8 - NUMSOCK ; if (gi_resetS > 0) { //make sure all sockets are preallocated as 'not exist' for (int i = 0; i < MAX_SOCK; i++) { rd[i].socket = -1; rd[i].sockStat = 99; } for (int k = 0; k < NBOARDS; k++) { gi_NumConnect[k] = 0; //gi.numConn[k] = 0; gj.numConn[k] = 0; //gj.errConn[k] = 0; gj.rateBytes[k] = 0; gj.totBytes[k] = 0; } } if (gi_resetR > 0) { //resetEvtStat (); gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; gj.usdMem = gj.maxMem = gj.xxxMem = 0; #ifdef THOMAS_MALLOC gj.totMem = tgb_memory; #else gj.totMem = g_maxMem; #endif gj.bufNew = gj.bufEvt = 0; //gj.badRoiE = gj.badRoiR = gj.badRoiB = 0; gj.evtSkip = gj.evtWrite = gj.evtErr = 0; //for (int b = 0; b < NBOARDS; b++) // gj.badRoi[b] = 0; mBufInit (); //initialize buffers factPrintf(kInfo, -1, "End initializing (readFAD)"); } gi_reset = gi_resetR = gi_resetS = gi_resetW = 0; //loop until global variable g_runStat claims stop while (g_runStat >= 0 && g_reset == 0) { gi_runStat = g_runStat; gj.readStat = g_runStat; struct timeval tv; gettimeofday (&tv, NULL); g_actTime = tv.tv_sec; g_actUsec = tv.tv_usec; for (int b = 0; b < NBOARDS; b++) { // Nothing has changed if (g_port[b].sockDef == sockDef[b]) continue; gi_NumConnect[b] = 0; //must close all connections //gi.numConn[b] = 0; gj.numConn[b] = 0; // s0 = 0: sockets to be defined and opened // s0 = -1: sockets to be destroyed // s0 = +1: sockets to be closed and reopened int s0 = 0; if (sockDef[b] != 0) s0 = g_port[b].sockDef==0 ? -1 : +1; const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0; int k = b * 7; for (int p = p0 + 1; p < p0 + 8; p++, k++) GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket sockDef[b] = g_port[b].sockDef; } // count the number of active boards int actBoards = 0; for (int b = 0; b < NBOARDS; b++) if (sockDef[b] > 0) actBoards++; //count number of succesfull actions // int numok = 0; /* for (i=0; i 0) continue; const int board = i / 7 ; //const int p = i % 7 ; if (rd[i].sockStat == -1) { //try to connect if not yet done rd[i].sockStat = connect (rd[i].socket, (struct sockaddr *) &rd[i].SockAddr, sizeof (rd[i].SockAddr)); // Failed if (rd[i].sockStat == -1) { rd[i].errCnt++; usleep(25000); continue; } // Success (rd[i].sockStat == 0) if (sockDef[board] > 0) { rd[i].bufTyp = 0; // expect a header rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining } else { rd[i].bufTyp = -1; // full data to be skipped rd[i].bufLen = MAX_LEN; // huge for skipping } rd[i].bufPos = 0; // no byte read so far rd[i].skip = 0; // start empty gi_NumConnect[board] += cntsock; //gi.numConn[b]++; gj.numConn[board]++; factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", board, gj.numConn[board]); } // Do not read from this socket if (rd[i].bufLen<0) continue; //numok++; if (rd[i].bufLen>0) { const int32_t jrd = recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); // recv failed if (jrd<0) { // There was just nothing waiting if (errno==EWOULDBLOCK || errno==EAGAIN) { //numok--; continue; } factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); //gi.gotErr[b]++; continue; } // connection was closed ... if (jrd==0) { factPrintf(kInfo, 441, "Socket %d closed by FAD", i); const int s0 = sockDef[board] > 0 ? +1 : -1; GenSock(s0, i, 0, NULL, &rd[i]); //gi.gotErr[b]++; gi_NumConnect[board]-= cntsock ; //gi.numConn[b]--; gj.numConn[board]--; continue; } // Success (jrd > 0) gj.rateBytes[board] += jrd; // are we skipping this board ... if (rd[i].bufTyp < 0) continue; rd[i].bufPos += jrd; //==> prepare for continuation rd[i].bufLen -= jrd; #ifdef EVTDEBUG debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event #endif } //we are reading event header if (rd[i].bufTyp <= 0) { //not yet sufficient data to take action if (rd[i].bufPos < minLen) continue; //check if startflag correct; else shift block .... // FIXME: This is not enough... this combination of // bytes can be anywhere... at least the end bytes // must be checked somewhere, too. int k; for (k = 0; k < rd[i].bufPos - 1; k++) { if (rd[i].rBuf->B[k] == start.B[1] && rd[i].rBuf->B[k+1] == start.B[0]) break; } rd[i].skip += k; //no start of header found if (k >= rd[i].bufPos - 1) { rd[i].bufPos = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); continue; } if (k > 0) { rd[i].bufPos -= k; rd[i].bufLen += k; memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos); } if (rd[i].bufPos < minLen) continue; if (rd[i].skip > 0) { factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i); rd[i].skip = 0; } // TGB: This needs much more checks than just the first two bytes! // Swap everything except start_package_flag. // It is to difficult to find out where it is used how, // but it doesn't really matter because it is not really // used anywehere else // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber; rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time; for (int s=24;s<24+NTemp+NDAC;s++) rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2; rd[i].fadVers = rd[i].rBuf->S[2]; rd[i].ftmTyp = rd[i].rBuf->S[5]; rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt) rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt) rd[i].runID = rd[i].rBuf->I[11]==0 ? (int)g_actTime : rd[i].rBuf->I[11]; rd[i].bufTyp = 1; //ready to read full record rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; const int fadBoard = rd[i].rBuf->S[12]; debugHead(i, fadBoard, rd[i].rBuf); continue; } // are we reading data // not yet all read if (rd[i].bufLen > 0) continue; if (rd[i].rBuf->B[rd[i].fadLen - 1] != stop.B[0] || rd[i].rBuf->B[rd[i].fadLen - 2] != stop.B[1]) { //gi.evtErr++; factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d", i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1], rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]); // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; continue; } // int actid; // if (g_useFTM > 0) // actid = rd[i].evtID; // else // actid = rd[i].ftmID; //get index into mBuffer for this event (create if needed) const int evID = mBufEvt(i); // no free entry in mBuffer, retry later if (evID == -1) continue; // We have a valid entry, but no memory has yet been allocated if (evID >= 0 && mBuffer[evID].FADhead == NULL) { // Try to get memory from the big buffer mBuffer[evID].FADhead = (PEVT_HEADER*)TGB_Malloc(); if (mBuffer[evID].FADhead == NULL) { // If this works properly, this is a hack which can be removed, or // replaced by a signal or dim message if (rd[i].bufTyp==2) factPrintf(kError, 882, "malloc failed for event %d", evID); rd[i].bufTyp = 2; continue; } // Initialice mBuffer[evID]->fEvent initEvent(evID); // Some statistics gj.usdMem = tgb_inuse; if (gj.usdMem > gj.maxMem) gj.maxMem = gj.usdMem; gj.rateNew++; gj.bufTot++; if (gj.bufTot > gj.maxEvt) gj.maxEvt = gj.bufTot; //register event in 'active list (reading)' evtIdx[evID] = evtCtrl.lastPtr; evtCtrl.evtBuf[evtCtrl.lastPtr] = evID; evtCtrl.evtStat[evtCtrl.lastPtr] = 0; evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime; evtCtrl.lastPtr++; evtCtrl.lastPtr %= MAX_EVT * MAX_RUN; } // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. if (evID < -1000) continue; //we have a valid entry in mBuffer[]; fill it const int fadBoard = rd[i].rBuf->S[12]; const int fadCrate = fadBoard>>8; if (board != (fadCrate * 10 + (fadBoard&0xff))) { factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)", board, fadBoard, fadCrate, fadBoard&0xff); } if (mBuffer[evID].board[board] != -1) { factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d", evID, board, i, rd[i].fadLen, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[rd[i].fadLen - 2], rd[i].rBuf->B[rd[i].fadLen - 1]); continue; // Continue reading next header } // Copy data from rd[i] to mBuffer[evID] copyData(i, evID, board); // now we have stored a new board contents into Event structure const int iDx = evtIdx[evID]; //index into evtCtrl mBuffer[evID].fEvent->NumBoards++; mBuffer[evID].board[board] = board; mBuffer[evID].nBoard++; evtCtrl.evtStat[iDx] = mBuffer[evID].nBoard; evtCtrl.pcTime[iDx] = g_actTime; // have we already reported first (partial) event of this run ??? if (mBuffer[evID].nBoard==1 && mBuffer[evID].runNum != actrun) { // Signal the fadctrl that a new run has been started gotNewRun(mBuffer[evID].runNum, NULL); factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d", actrun, mBuffer[evID].runNum, mBuffer[evID].evNum); for (int j=0; j0) runCtrl[ir].runId = 0; } // Flag that the event is ready for processing evtCtrl.evtStat[iDx] = 99; } // end for loop over all sockets g_actTime = time (NULL); if (g_actTime <= gi_SecTime) { usleep(1); continue; } gi_SecTime = g_actTime; gj.bufNew = gj.bufEvt = 0; //loop over all active events and flag those older than read-timeout //delete those that are written to disk .... for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (k0==evtCtrl.frstPtr && evtCtrl.evtStat[k0]<0) { evtCtrl.frstPtr++; evtCtrl.frstPtr %= MAX_EVT * MAX_RUN; // Continue because evtCtrl.evtStat[k0] must be <0 continue; } // Check the more likely case first: incomplete events if (evtCtrl.evtStat[k0]>0 && evtCtrl.evtStat[k0]<92) { gj.bufNew++; //incomplete event in Buffer // Event has not yet timed out or was reported already if (evtCtrl.evtStat[k0]>=90 || evtCtrl.pcTime[k0]>=g_actTime - 30) continue; reportIncomplete(k0); //timeout for incomplete events evtCtrl.evtStat[k0] = 91; gj.evtSkip++; continue; } // complete event in Buffer if (evtCtrl.evtStat[k0] >= 95) gj.bufEvt++; // Check the less likely case: 'useless' or 'delete' if (evtCtrl.evtStat[k0]==0 || evtCtrl.evtStat[k0] >= 9000) { const int id = evtCtrl.evtBuf[k0]; #ifdef EVTDEBUG factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); #endif mBufFree (id); //event written--> free memory evtCtrl.evtStat[k0] = -1; gj.evtWrite++; gj.rateWrite++; } } gj.deltaT = 1000; //temporary, must be improved for (int ib = 0; ib < NBOARDS; ib++) gj.totBytes[ib] += gj.rateBytes[ib]; #ifdef THOMAS_MALLOC gj.totMem = tgb_memory; #else gj.totMem = g_maxMem; #endif if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem; if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt; factStat (gj); factStatNew (gi); gj.rateNew = gj.rateWrite = 0; gj.maxMem = gj.usdMem; gj.maxEvt = gj.bufTot; for (int b = 0; b < NBOARDS; b++) gj.rateBytes[b] = 0; } // while (g_runStat >= 0 && g_reset == 0) factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset); if (g_reset > 0) { gi_reset = g_reset; gi_resetR = gi_reset % 10; //shall we stop reading ? gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? gi_resetW = (gi_reset / 100) % 10; //shall we close files ? gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? g_reset = 0; } else { gi_reset = 0; gi_resetR = g_runStat == -1 ? 1 : 7; gi_resetS = 7; //close all sockets gi_resetW = 7; //close all files gi_resetX = 0; //inform others we have to quit .... gi_runStat = -11; //inform all that no update to happen any more gj.readStat = -11; //inform all that no update to happen any more } if (gi_resetS > 0) { //must close all open sockets ... factPrintf(kInfo, -1, "Close all sockets..."); for (int i = 0; i < MAX_SOCK; i++) { if (rd[i].sockStat != 0) continue; GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket if (i%7) continue; gi_NumConnect[i / 7]-= cntsock ; //gi.numConn[i / 7]--; gj.numConn[i / 7]--; sockDef[i / 7] = 0; //flag ro recreate the sockets ... rd[i / 7].sockStat = -1; //and try to open asap } } if (gi_resetR > 0) { //flag all events as 'read finished' for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) { evtCtrl.evtStat[k0] = 91; //gi.evtSkp++; //gi.evtTot++; } } //and clear all buffers (might have to wait until all others are done) // If minclear is 0, an event could be deleted while writing is still ongoing /* int minclear; if (gi_resetR == 1) { minclear = 900; factPrintf(kInfo, -1, "Drain all buffers ..."); } else { minclear = 0; factPrintf(kInfo, -1, "Flush all buffers ..."); }*/ const int minclear = 900; int numclear = 1; while (numclear > 0) { numclear = 0; for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl.evtStat[k0] > minclear) { const int id = evtCtrl.evtBuf[k0]; #ifdef EVTDEBUG factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard); #endif mBufFree (id); //event written--> free memory evtCtrl.evtStat[k0] = -1; } else if (evtCtrl.evtStat[k0] > 0) numclear++; //writing is still ongoing... if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) { evtCtrl.frstPtr++; evtCtrl.frstPtr %= MAX_EVT * MAX_RUN; } } usleep(1); } } if (gi_reset > 0) { if (gi_resetW > 0) CloseRunFile (0, 0, 0); //ask all Runs to be closed if (gi_resetX > 0) { struct timespec xwait; xwait.tv_sec = gi_resetX; xwait.tv_nsec = 0; nanosleep (&xwait, NULL); } factPrintf(kInfo, -1, "Continue read Process ..."); gi_reset = 0; goto START; } factPrintf(kInfo, -1, "Exit read Process..."); #ifdef THOMAS_MALLOC factPrintf(kInfo, -1, "%ld Bytes flaged as in-use.", tgb_inuse); #endif gi_runStat = -99; gj.readStat = -99; factStat (gj); factStatNew (gi); return 0; } /*-----------------------------------------------------------------*/ void *subProc(void *thrid) { const int64_t threadID = (int64_t)thrid; factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); while (g_runStat > -2) //in case of 'exit' we still must process pending events { int numWait = 0; int numProc = 0; for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (!(evtCtrl.evtStat[k0] == 1000 + threadID)) { if (evtCtrl.evtStat[k0] < 1000 + threadID) numWait++; continue; } /*** if (evtCtrl.evtStat[k0] == 1000 + threadID) ****/ int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap) if (gi_resetR<=1) { const int id = evtCtrl.evtBuf[k0]; jret = subProcEvt(threadID, mBuffer[id].FADhead, mBuffer[id].fEvent, mBuffer[id].buffer); if (jret <= threadID) { factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); jret = 5300; } else if (jret <= 0) jret = 9200 + threadID; // flag as 'to be deleted' else if (jret >= gi_maxProc) jret = 5200 + threadID; // flag as 'to be written' else jret = 1000 + jret; // flag for next proces } evtCtrl.evtStat[k0] = jret; numProc++; } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); return 0; } //seems we have nothing to do, so sleep a little if (numProc == 0) usleep(1); } factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); return 0; } /*-----------------------------------------------------------------*/ void * procEvt (void *ptr) { /* *** main loop processing file, including SW-trigger */ int status; int lastRun = 0; //usually run from last event still valid // cpu_set_t mask; // int cpu = 1; //process thread (will be several in final version) factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // CPU_SET( 0 , &mask ); leave for system // CPU_SET( 1 , &mask ); used by write process // CPU_SET (2, &mask); // CPU_SET (3, &mask); // CPU_SET (4, &mask); // CPU_SET (5, &mask); // CPU_SET (6, &mask); // CPU_SET( 7 , &mask ); used by read process /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu); // factOut (kWarn, -1, str); // } pthread_t thread[100]; // int th_ret[100]; for (long long k = 0; k < gi_maxProc; k++) { /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k); } // in case of 'exit' we still must process pending events while (g_runStat > -2) { int numWait = 0; int numProc = 0; for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl.evtStat[k0] <= 90 || evtCtrl.evtStat[k0] >= 1000) { if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) numWait++; continue; } //we are asked to flush buffers asap if (gi_resetR > 1) { evtCtrl.evtStat[k0] = 9991; continue; } //-------- it is better to open the run already here, so call can be used to initialize //-------- buffers etc. needed to interprete run (e.g. DRS calibration) const int id = evtCtrl.evtBuf[k0]; const uint32_t irun = mBuffer[id].runNum; const int32_t ievt = mBuffer[id].evNum; // Find entry in runCtrl which belongs to the event mBuffer[id] // (only check if there is a need to check) if (runCtrl[lastRun].runId != irun) { //check which fileID to use (or open if needed) int j; for (j=0;j=MAX_RUN) { factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id); // FIXME: What is the right action? (Flag event for deletion?) continue; } lastRun = j; } // File not yet open if (runCtrl[lastRun].fileId < 0) { //---- we need to open a new run ==> make sure all older runs are //---- finished and marked to be closed .... // This loop is unique to procEvt for (int j=0; j do no longer accept events for processing //---- problem: processing still going on ==> must wait for closing .... factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId); runFinish1(runCtrl[j].runId); } } RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = runCtrl[lastRun].roi0; actRun.NroiTM = runCtrl[lastRun].roi8; //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits // if (actRun.Nroi == actRun.NroiTM) // actRun.NroiTM = 0; actRun.RunTime = runCtrl[lastRun].firstTime; actRun.RunUsec = runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; actRun.Nroi = mBuffer[id].nRoi; memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS*sizeof(PEVNT_HEADER)); runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); if (runCtrl[lastRun].fileHd == NULL) { factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun); runCtrl[lastRun].fileId = 91; runCtrl[lastRun].procId = 91; // Is not set in writeEvt continue; } runCtrl[lastRun].fileId = 0; runCtrl[lastRun].procId = 0; // Is not set in writeEvt factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); } //-------- also check if run shall be closed (==> skip event, but do not close the file !!! ) if (runCtrl[lastRun].procId == 0) { if (runCtrl[lastRun].closeTime < g_actTime || runCtrl[lastRun].lastTime < g_actTime - 300 || runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt) { factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun); runFinish1 (runCtrl[lastRun].runId); runCtrl[lastRun].procId = 1; } } // Skip event because of no active run if (runCtrl[lastRun].procId != 0) { evtCtrl.evtStat[k0] = 9091; continue; } //-------- //-------- const int roi = mBuffer[id].nRoi; //const int roiTM = mBuffer[id].nRoiTM; //make sure unused pixels/tmarks are cleared to zero //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits // if (roiTM == roi) // roiTM = 0; for (int ip=0; ipStartPix[ip] == -1) { const int dest = ip * roi; memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); } } for (int it=0; itStartTM[it] == -1) { const int dest = it * roi + NPIX * roi; memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2); } } //and set correct event header ; also check for consistency in event (not yet) mBuffer[id].fEvent->Roi = mBuffer[id].nRoi; mBuffer[id].fEvent->RoiTM = mBuffer[id].nRoiTM; mBuffer[id].fEvent->EventNum = mBuffer[id].evNum; mBuffer[id].fEvent->TriggerNum = mBuffer[id].trgNum; mBuffer[id].fEvent->TriggerType = mBuffer[id].trgTyp; mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0]; mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1]; mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2]; mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3]; mBuffer[id].fEvent->SoftTrig = 0; for (int ib=0; ibBoardTime[ib] = 0; } else { mBuffer[id].fEvent->BoardTime[ib] = mBuffer[id].FADhead[ib].time; } } const int rc = eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead, mBuffer[id].fEvent); //gi.procTot++; numProc++; if (rc < 0) { evtCtrl.evtStat[k0] = 9999; //flag event to be skipped //gi.procErr++; } else { evtCtrl.evtStat[k0] = 1000; runCtrl[lastRun].procEvt++; } } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit Processing Process ..."); gp_runStat = -22; //==> we should exit gj.procStat = -22; //==> we should exit return 0; } //seems we have nothing to do, so sleep a little if (numProc == 0) usleep(1); gp_runStat = gi_runStat; gj.procStat = gj.readStat; } //we are asked to abort asap ==> must flag all remaining events // when gi_runStat claims that all events are in the buffer... factPrintf(kInfo, -1, "Abort Processing Process ..."); for (int k = 0; k < gi_maxProc; k++) { pthread_join (thread[k], (void **) &status); } for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) evtCtrl.evtStat[k0] = 9800; //flag event as 'processed' } gp_runStat = -99; gj.procStat = -99; return 0; } /*-----------------------------------------------------------------*/ int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) { /* close run runId (all all runs if runId=0) */ /* return: 0=close scheduled / >0 already closed / <0 does not exist */ int j; if (runId == 0) { for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; } } return 0; } for (j = 0; j < MAX_RUN; j++) { if (runCtrl[j].runId == runId) { if (runCtrl[j].fileId == 0) { //run is open runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return 0; } else if (runCtrl[j].fileId < 0) { //run not yet opened runCtrl[j].closeTime = closeTime; runCtrl[j].maxEvt = maxEvt; return +1; } else { // run already closed return +2; } } } //we only reach here if the run was never created return -1; } void checkAndCloseRun(int j, int irun, int cond, int where) { if (!cond && runCtrl[j].closeTime >= g_actTime && runCtrl[j].lastTime >= g_actTime - 300 && runCtrl[j].maxEvt > runCtrl[j].actEvt) return; //close run for whatever reason int ii = 0; if (cond) ii = 1; if (runCtrl[j].closeTime < g_actTime) ii |= 2; // = 2; if (runCtrl[j].lastTime < g_actTime - 300) ii |= 4; // = 3; if (runCtrl[j].maxEvt <= runCtrl[j].actEvt) ii |= 8; // = 4; if (runCtrl[j].procId == 0) { runFinish1(runCtrl[j].runId); runCtrl[j].procId = 92; } runCtrl[j].closeTime = g_actTime - 1; const int rc = runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j])); if (rc<0) { factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)", where, runCtrl[j].runId, rc); runCtrl[j].fileId = 92+where*2; } else { factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)", where, irun, ii); runCtrl[j].fileId = 93+where*2; } } /*-----------------------------------------------------------------*/ void *writeEvt (void *ptr) { /* *** main loop writing event (including opening and closing run-files */ // cpu_set_t mask; // int cpu = 1; //write thread factPrintf(kInfo, -1, "Starting write-thread"); /* CPU_ZERO initializes all the bits in the mask to zero. */ // CPU_ZERO (&mask); /* CPU_SET sets only the bit corresponding to cpu. */ // CPU_SET (cpu, &mask); /* sched_setaffinity returns 0 in success */ // if (sched_setaffinity (0, sizeof (mask), &mask) == -1) { // snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu); // } int lastRun = 0; //usually run from last event still valid while (g_runStat > -2) { int numWrite = 0; int numWait = 0; for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN) { if (evtCtrl.evtStat[k0] <= 5000 || evtCtrl.evtStat[k0] >= 9000) { if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000) numWait++; continue; } //we must drain the buffer asap if (gi_resetR > 1) { evtCtrl.evtStat[k0] = 9904; continue; } const int id = evtCtrl.evtBuf[k0]; const uint32_t irun = mBuffer[id].runNum; const int32_t ievt = mBuffer[id].evNum; // Find entry in runCtrl which belongs to the event mBuffer[id] // (only check if there is a need to check) if (runCtrl[lastRun].runId != irun) { //check which fileID to use (or open if needed) int j; for (j=0;j=MAX_RUN) { factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id); // FIXME: What is the right action? continue; } lastRun = j; } // File not yet open if (runCtrl[lastRun].fileId < 0) { RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = runCtrl[lastRun].roi0; actRun.NroiTM = runCtrl[lastRun].roi8; //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits // if (actRun.Nroi == actRun.NroiTM) // actRun.NroiTM = 0; actRun.RunTime = runCtrl[lastRun].firstTime; actRun.RunUsec = runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; actRun.Nroi = mBuffer[id].nRoi; memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS * sizeof (PEVNT_HEADER)); runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun)); if (runCtrl[lastRun].fileHd == NULL) { factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun); runCtrl[lastRun].fileId = 91; continue; } runCtrl[lastRun].fileId = 0; factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt); } if (runCtrl[lastRun].fileId > 0) { // There is an event but file is already closed /* if (runCtrl[j].fileId < 100) { factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun); runCtrl[j].fileId += 100; }*/ evtCtrl.evtStat[k0] = 9903; } // File is open if (runCtrl[lastRun].fileId==0) { const int rc = runWrite(runCtrl[lastRun].fileHd, mBuffer[id].fEvent, sizeof (mBuffer[id])); if (rc >= 0) { // Sucessfully wrote event runCtrl[lastRun].lastTime = g_actTime; runCtrl[lastRun].actEvt++; evtCtrl.evtStat[k0] = 9901; } else { factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun); evtCtrl.evtStat[k0] = 9902; } checkAndCloseRun(lastRun, irun, rc<0, 1); } } /* //check if we should close a run (mainly when no event pending) //ETIENNE but first figure out which one is the latest run with a complete event. //i.e. max run Id and lastEvt >= 0 //this condition is sufficient because all pending events were written already in the loop just above //actrun uint32_t lastStartedTime = 0; uint32_t runIdFound = 1; //If we have an active run, look for its start time if (actrun != 0) { runIdfound = 0; for (int j=0;j we should exit gj.writStat = -22; //==> we should exit break; } gw_runStat = gi_runStat; gj.writStat = gj.readStat; } factPrintf(kInfo, -1, "Close all open files ..."); for (int j=0; j 90) { factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc); gi_maxProc = 1; } //partially initialize event control logics evtCtrl.frstPtr = 0; evtCtrl.lastPtr = 0; //start all threads (more to come) when we are allowed to .... while (g_runStat == 0) { xwait.tv_sec = 0; xwait.tv_nsec = 10000000; // sleep for ~10 msec nanosleep (&xwait, NULL); } i = 0; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL); i++; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL); i++; imax = i; #ifdef BILAND xwait.tv_sec = 30;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("close all runs in 2 seconds\n"); CloseRunFile (0, time (NULL) + 2, 0); xwait.tv_sec = 1;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("setting g_runstat to -1\n"); g_runStat = -1; #endif //wait for all threads to finish for (i = 0; i < imax; i++) { /*j =*/ pthread_join (thread[i], (void **) &status); } } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ #ifdef BILAND int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer) { printf ("called subproc %d\n", threadID); return threadID + 1; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) { return 1; }; int runWrite (FileHandle_t fileHd, EVENT * event, size_t len) { return 1; usleep (10000); return 1; } //{ return 1; } ; int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) { return 1; }; int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) { int i = 0; // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); // for (i=0; i