// #define EVTDEBUG #define NUMSOCK 1 //set to 7 for old configuration #define MAXREAD 65536 //64kB wiznet buffer #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "queue.h" #include "EventBuilder.h" enum Severity { kMessage = 10, ///< Just a message, usually obsolete kInfo = 20, ///< An info telling something which can be interesting to know kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour kError = 40, ///< Error, something unexpected happened, but can still be handled by the program kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination kDebug = 99, ///< A message used for debugging only }; using namespace std; #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN 256*1024 // size of read-buffer per socket //#define nanosleep(x,y) extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); //extern int runFinish (uint32_t runnr); extern "C" void factOut (int severity, int err, char *message); extern void factReportIncomplete (uint64_t rep); extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers); extern void factStat (GUI_STAT gj); extern void factStatNew (EVT_STAT gi); extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer); extern void debugHead (int i, int j, void *buf); extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec); extern void debugStream (int isock, void *buf, int len); int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); int g_maxProc; int gi_maxProc; uint g_actTime; int g_runStat; int g_reset; size_t g_maxMem; //maximum memory allowed for buffer FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards //EVT_STAT gi; GUI_STAT gj; #define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 ) #define MAX_RUN 8 // Number of concurrent runs void factPrintf(int severity, int id, const char *fmt, ...) { char str[1000]; va_list ap; va_start(ap, fmt); vsnprintf(str, 1000, fmt, ap); va_end(ap); factOut(severity, id, str); } #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) typedef struct TGB_struct { struct TGB_struct *prev; void *mem; } TGB_entry; TGB_entry *tgb_last = NULL; uint64_t tgb_memory = 0; uint64_t tgb_inuse = 0; void *TGB_Malloc() { // No free slot available, next alloc would exceed max memory if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem) return NULL; // We will return this amount of memory tgb_inuse += MAX_TOT_MEM; // No free slot available, allocate a new one if (!tgb_last) { tgb_memory += MAX_TOT_MEM; return malloc(MAX_TOT_MEM); } // Get the next free slot from the stack and return it TGB_entry *last = tgb_last; void *mem = last->mem; tgb_last = last->prev; free(last); return mem; }; void TGB_free(void *mem) { if (!mem) return; // Add the last free slot to the stack TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); // FIXME: Really free memory if memory usuage exceeds g_maxMem entry->prev = tgb_last; entry->mem = mem; tgb_last = entry; // Decrease the amont of memory in use accordingly tgb_inuse -= MAX_TOT_MEM; gj.usdMem = tgb_inuse; gj.bufTot--; } //RUN_CTRL runCtrl[MAX_RUN]; /* *** Definition of rdBuffer to read in IP packets; keep it global !!!! */ typedef union { uint8_t B[MAX_LEN]; uint16_t S[MAX_LEN / 2]; uint32_t I[MAX_LEN / 4]; uint64_t L[MAX_LEN / 8]; } CNV_FACT; typedef struct { int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... int32_t bufPos; //next byte to read to the buffer next int32_t bufLen; //number of bytes left to read int32_t skip; //number of bytes skipped before start of event int errCnt; //how often connect failed since last successful int sockStat; //-1 if socket not yet connected , 99 if not exist int socket; //contains the sockets struct sockaddr_in SockAddr; //IP for each socket int evtID; // event ID of event currently read int runID; // run " int ftmID; // event ID from FTM uint fadLen; // FADlength of event currently read int fadVers; // Version of FAD int ftmTyp; // trigger type int Port; CNV_FACT *rBuf; } READ_STRUCT; /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT * rs) { /* *** generate Address, create sockets and allocates readbuffer for it *** *** if flag==0 generate socket and buffer *** <0 destroy socket and buffer *** >0 close and redo socket *** *** sid : board*7 + port id */ //close socket if open if (rs->sockStat == 0) { if (close (rs->socket) > 0) { factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno); } else { factPrintf(kInfo, 771, "Succesfully closed socket %d", sid); } } rs->sockStat = 99; if (flag < 0) { free (rs->rBuf); //and never open again rs->rBuf = NULL; return 0; } if (flag == 0) { //generate address and buffer ... rs->Port = port; rs->SockAddr.sin_family = sockAddr->sin_family; rs->SockAddr.sin_port = htons (port); rs->SockAddr.sin_addr = sockAddr->sin_addr; rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT)); if (rs->rBuf == NULL) { factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid); rs->sockStat = 77; return -3; } } if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno); rs->sockStat = 88; return -2; } int optval = 1; if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //start after 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 10; //do every 10 seconds if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } optval = 2; //close after 2 unsuccessful tries if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) { factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); } factPrintf(kInfo, 773, "Successfully generated socket %d", sid); rs->sockStat = -1; //try to (re)open socket rs->errCnt = 0; return 0; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ int checkRoiConsistency(const CNV_FACT *rbuf, int roi[]) { int xjr = -1; int xkr = -1; //points to the very first roi int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; roi[0] = ntohs(rbuf->S[roiPtr]); for (int jr = 0; jr < 9; jr++) { roi[jr] = ntohs(rbuf->S[roiPtr]); if (roi[jr]<0 || roi[jr]>1024) { factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]); return 0; } // Check that the roi of pixels jr are compatible with the one of pixel 0 if (jr!=8 && roi[jr]!=roi[0]) { xjr = jr; break; } // Check that the roi of all other DRS chips on boards are compatible for (int kr = 1; kr < 4; kr++) { const int kroi = ntohs(rbuf->S[roiPtr]); if (kroi != roi[jr]) { xjr = jr; xkr = kr; break; } roiPtr += kroi+4; } } if (xjr>=0) { if (xkr<0) factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); else factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr])); return 0; } if (roi[8] < roi[0]) { factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); //gj.badRoiB++; //gj.badRoi[b]++; return 0; } return 1; } deque> evtCtrl; map> runCtrl; void mBufFree(EVT_CTRL *evt) { TGB_free(evt->FADhead); } shared_ptr mBufEvt(const READ_STRUCT *rs) { int nRoi[9]; if (!checkRoiConsistency(rs->rBuf, nRoi)) return shared_ptr(); const int evID = rs->evtID; const uint runID = rs->runID; const int trgTyp = rs->ftmTyp; const int trgNum = rs->ftmID; const int fadNum = rs->evtID; for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++) { const shared_ptr evt = *it; // If the run is different, go on searching. // We cannot stop searching if a lower run-id is found as in // the case of the events, because theoretically, there // can be the same run on two different days. if (runID != evt->runNum) continue; // If the ID of the new event if higher than the last one stored // in that run, we have to assign a new slot (leave the loop) if (evID > evt->evNum) break; if (evID != evt->evNum) continue; // We have found an entry with the same runID and evtID // Check if ROI is consistent if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8]) { factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]); return shared_ptr(); } // count for inconsistencies if (evt->trgNum != trgNum) evt->Errors[0]++; if (evt->fadNum != fadNum) evt->Errors[1]++; if (evt->trgTyp != trgTyp) evt->Errors[2]++; //everything seems fine so far ==> use this slot .... return evt; } struct timeval tv; gettimeofday(&tv, NULL); auto ir = runCtrl.find(runID); if (ir==runCtrl.end()) { shared_ptr run(new RUN_CTRL); run->runId = runID; run->roi0 = nRoi[0]; // FIXME: Make obsolete! run->roi8 = nRoi[8]; // FIXME: Make obsolete! run->fileId = -2; run->lastEvt = 1; // Number of events partially started to read run->actEvt = 0; // Number of written events (write) run->procEvt = 0; // Number of successfully checked events (checkEvent) run->maxEvt = 999999999; // max number events allowed run->lastTime = tv.tv_sec; // Time when the last event was written run->closeTime = tv.tv_sec + 3600 * 24; //max time allowed ir = runCtrl.insert(make_pair(runID, run)).first; } const shared_ptr run = ir->second; if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8]) { factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID); return shared_ptr(); } const shared_ptr evt(new EVT_CTRL, mBufFree); //flag all boards as unused evt->nBoard = 0; for (int b=0; bboard[b] = -1; evt->run = run; evt->pcTime[0] = tv.tv_sec; evt->pcTime[1] = tv.tv_usec; evt->nRoi = nRoi[0]; evt->nRoiTM = nRoi[8]; evt->evNum = evID; evt->runNum = runID; evt->fadNum = fadNum; evt->trgNum = trgNum; evt->trgTyp = trgTyp; evt->Errors[0] = 0; evt->Errors[1] = 0; evt->Errors[2] = 0; evt->Errors[3] = 0; evt->fEvent = NULL; evt->FADhead = NULL; // -1: kInValid // 0: kValid // 1-40: kIncomplete // 90: kIncompleteReported // 100: kCompleteEventInBuffer // 1000+x: kToBeProcessedByThreadX // 5000: kToBeWritten // 10000: kToBeDeleted evt->evtStat = 0; evtCtrl.push_back(evt); return evt; } /*-----------------------------------------------------------------*/ void initEvent(const shared_ptr &evt) { evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM); memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi); //flag all pixels as unused for (int k = 0; k < NPIX; k++) evt->fEvent->StartPix[k] = -1; //flag all TMark as unused for (int k = 0; k < NTMARK; k++) evt->fEvent->StartTM[k] = -1; evt->fEvent->NumBoards = 0; evt->fEvent->SoftTrig = 0; evt->fEvent->PCTime = evt->pcTime[0]; evt->fEvent->PCUsec = evt->pcTime[1]; evt->fEvent->Roi = evt->nRoi; evt->fEvent->RoiTM = evt->nRoiTM; evt->fEvent->EventNum = evt->evNum; evt->fEvent->TriggerNum = evt->trgNum; evt->fEvent->TriggerType = evt->trgTyp; } uint64_t reportIncomplete(const shared_ptr &evt, const char *txt) { factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)", evt->runNum, evt->evNum, txt); uint64_t report = 0; char str[1000]; int ik=0; for (int ib=0; ibboard[ib]; if (jb>=0) // data received from that board { str[ik++] = '0'+(jb%10); continue; } // FIXME: This is not synchronous... it reports // accoridng to the current connection status, not w.r.t. to the // one when the event was taken. if (gi_NumConnect[ib]<=0) // board not connected { str[ik++] = 'x'; continue; } // data from this board lost str[ik++] = '.'; report |= ((uint64_t)1)< &evt) { // swapEventHeaderBytes: End of the header. to channels now int eStart = 36; for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) { rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling eStart += 4+rbuf->S[eStart+2];//skip the pixel data } memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER)); int src = sizeof(PEVNT_HEADER) / 2; // consistency of ROIs have been checked already (is it all correct?) const int roi = rbuf->S[src+2]; // different sort in FAD board..... for (int px = 0; px < 9; px++) { for (int drs = 0; drs < 4; drs++) { // pixH = rd[i].rBuf->S[src++]; // ID src++; const int pixC = rbuf->S[src++]; // start-cell const int pixR = rbuf->S[src++]; // roi //here we should check if pixH is correct .... const int pixS = i * 36 + drs * 9 + px; src++; evt->fEvent->StartPix[pixS] = pixC; const int dest1 = pixS * roi; memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2); src += pixR; if (px == 8) { const int tmS = i * 4 + drs; //and we have additional TM info if (pixR > roi) { const int dest2 = tmS * roi + NPIX * roi; const int srcT = src - roi; evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2); } else { evt->fEvent->StartTM[tmS] = -1; } } } } } void doProcess(const shared_ptr &evt); void doWrite(const shared_ptr &evt); void checkAndCloseRun(const shared_ptr &run, int cond, int where); Queue> process(bind(doProcess, placeholders::_1)); Queue> write_queue(bind(doWrite, placeholders::_1)); void preProcess(const shared_ptr &evt) { //-------- it is better to open the run already here, so call can be used to initialize //-------- buffers etc. needed to interprete run (e.g. DRS calibration) const shared_ptr run = evt->run; if (run->runId==0) return; // File not yet open if (run->fileId < 0) { RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0; actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8; actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime; actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER)); run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun)); if (run->fileHd == NULL) { factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); run->fileId = 91; // No further processing of this event return; } run->fileId = 0; factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); } //and set correct event header ; also check for consistency in event (not yet) evt->fEvent->Errors[0] = evt->Errors[0]; evt->fEvent->Errors[1] = evt->Errors[1]; evt->fEvent->Errors[2] = evt->Errors[2]; evt->fEvent->Errors[3] = evt->Errors[3]; for (int ib=0; ibboard[ib] == -1) { evt->FADhead[ib].start_package_flag = 0; evt->fEvent->BoardTime[ib] = 0; } else { evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time; } } const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent); // no further processing of event ('delete') if (rc < 0) return; //evt->evtStat = 1000; // flag 'start processing' run->procEvt++; process.post(evt); } void doProcess(const shared_ptr &evt) { const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0); if (jret>0 && jret<=1) factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret); // flag as 'to be written' if (jret<=1) return; //evt->evtStat = 5000; write_queue.post(evt); } void doWrite(const shared_ptr &evt) { const shared_ptr run = evt->run; if (run->runId==0) return; // File is not open if (run->fileId!=0) return; const int rc = runWrite(run->fileHd, evt->fEvent, 0); if (rc >= 0) { // Sucessfully wrote event run->lastTime = g_actTime; run->actEvt++; } else factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum); checkAndCloseRun(run, rc<0, 1); /* // Although the are no pending events, we have to check if a run should be closed (timeout) for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) { if (ir->second->fileId == 0) { //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it const int cond = ir->second->runId == 0; checkAndCloseRun(ir->second, cond, 2); } } */ } void *readFAD (void *ptr) { /* *** main loop reading FAD data and sorting them to complete events */ Queue> queue(bind(preProcess, placeholders::_1)); factPrintf(kInfo, -1, "Start initializing (readFAD)"); READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer uint32_t actrun = 0; const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX; gi_resetS = gi_resetR = 9; int sockDef[NBOARDS]; //internal state of sockets memset(sockDef, 0, NBOARDS*sizeof(int)); START: //time in seconds uint gi_SecTime = time(NULL);; g_actTime = gi_SecTime; const int cntsock = 8 - NUMSOCK ; if (gi_resetS > 0) { //make sure all sockets are preallocated as 'not exist' for (int i = 0; i < NBOARDS; i++) { rd[i].socket = -1; rd[i].sockStat = 99; } for (int k = 0; k < NBOARDS; k++) { gi_NumConnect[k] = 0; //gi.numConn[k] = 0; gj.numConn[k] = 0; //gj.errConn[k] = 0; gj.rateBytes[k] = 0; gj.totBytes[k] = 0; } } if (gi_resetR > 0) { gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; gj.usdMem = gj.maxMem = gj.xxxMem = 0; gj.totMem = tgb_memory; gj.bufNew = gj.bufEvt = 0; gj.evtSkip = gj.evtWrite = gj.evtErr = 0; factPrintf(kInfo, -1, "End initializing (readFAD)"); } gi_reset = gi_resetR = gi_resetS = gi_resetW = 0; //loop until global variable g_runStat claims stop while (g_runStat >= 0 && g_reset == 0) { gj.readStat = g_runStat; for (int b = 0; b < NBOARDS; b++) { // Nothing has changed if (g_port[b].sockDef == sockDef[b]) continue; gi_NumConnect[b] = 0; //must close all connections gj.numConn[b] = 0; // s0 = 0: sockets to be defined and opened // s0 = -1: sockets to be destroyed // s0 = +1: sockets to be closed and reopened int s0 = 0; if (sockDef[b] != 0) s0 = g_port[b].sockDef==0 ? -1 : +1; const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0; GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket sockDef[b] = g_port[b].sockDef; } // count the number of active boards int actBoards = 0; for (int b = 0; b < NBOARDS; b++) if (sockDef[b] > 0) actBoards++; //check all sockets if something to read for (int i = 0; i < NBOARDS; i++) { // Do not try to connect this socket if (rd[i].sockStat > 0) continue; if (rd[i].sockStat == -1) { //try to connect if not yet done rd[i].sockStat = connect (rd[i].socket, (struct sockaddr *) &rd[i].SockAddr, sizeof (rd[i].SockAddr)); // Failed if (rd[i].sockStat == -1) { rd[i].errCnt++; usleep(25000); continue; } // Success (rd[i].sockStat == 0) if (sockDef[i] > 0) { rd[i].bufTyp = 0; // expect a header rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining } else { rd[i].bufTyp = -1; // full data to be skipped rd[i].bufLen = MAX_LEN; // huge for skipping } rd[i].bufPos = 0; // no byte read so far rd[i].skip = 0; // start empty gi_NumConnect[i] += cntsock; gj.numConn[i]++; factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]); } // Do not read from this socket if (rd[i].bufLen<0) continue; if (rd[i].bufLen>0) { const int32_t jrd = recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT); // recv failed if (jrd<0) { // There was just nothing waiting if (errno==EWOULDBLOCK || errno==EAGAIN) continue; factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno); continue; } // connection was closed ... if (jrd==0) { factPrintf(kInfo, 441, "Socket %d closed by FAD", i); const int s0 = sockDef[i] > 0 ? +1 : -1; GenSock(s0, i, 0, NULL, &rd[i]); gi_NumConnect[i]-= cntsock ; gj.numConn[i]--; continue; } gj.rateBytes[i] += jrd; // are we skipping this board ... if (rd[i].bufTyp < 0) continue; rd[i].bufPos += jrd; //==> prepare for continuation rd[i].bufLen -= jrd; #ifdef EVTDEBUG debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event #endif } //we are reading event header if (rd[i].bufTyp <= 0) { //not yet sufficient data to take action if (rd[i].bufPos < minLen) continue; //check if startflag correct; else shift block .... // FIXME: This is not enough... this combination of // bytes can be anywhere... at least the end bytes // must be checked somewhere, too. int k; for (k = 0; k < rd[i].bufPos - 1; k++) { //start.S = 0xFB01; if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01) break; } rd[i].skip += k; //no start of header found if (k >= rd[i].bufPos - 1) { rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos]; rd[i].bufPos = 1; rd[i].bufLen = sizeof(PEVNT_HEADER)-1; continue; } if (k > 0) { rd[i].bufPos -= k; rd[i].bufLen += k; memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos); } if (rd[i].bufPos < minLen) continue; if (rd[i].skip > 0) { factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i); rd[i].skip = 0; } // TGB: This needs much more checks than just the first two bytes! // Swap everything except start_package_flag. // It is to difficult to find out where it is used how, // but it doesn't really matter because it is not really // used anywehere else // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber; rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time; for (int s=24;s<24+NTemp+NDAC;s++) rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2; rd[i].fadVers = rd[i].rBuf->S[2]; rd[i].ftmTyp = rd[i].rBuf->S[5]; rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt) rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt) rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11]; rd[i].bufTyp = 1; //ready to read full record rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; const int fadBoard = rd[i].rBuf->S[12]; debugHead(i, fadBoard, rd[i].rBuf); continue; } // are we reading data // not yet all read if (rd[i].bufLen > 0) continue; // stop.S = 0x04FE; if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe || rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04) { factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d", i, rd[i].evtID, rd[i].fadLen, rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]); // ready to read next header rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; continue; } // int actid; // if (g_useFTM > 0) // actid = rd[i].evtID; // else // actid = rd[i].ftmID; //get index into mBuffer for this event (create if needed) const shared_ptr evt = mBufEvt(&rd[i]); // We have a valid entry, but no memory has yet been allocated if (evt && evt->FADhead == NULL) { // Try to get memory from the big buffer evt->FADhead = (PEVNT_HEADER*)TGB_Malloc(); if (evt->FADhead == NULL) { // If this works properly, this is a hack which can be removed, or // replaced by a signal or dim message if (rd[i].bufTyp==2) factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum); rd[i].bufTyp = 2; continue; } // Initialise contents of mBuffer[evID]->fEvent initEvent(evt); // Some statistics gj.usdMem = tgb_inuse; if (gj.usdMem > gj.maxMem) gj.maxMem = gj.usdMem; gj.rateNew++; gj.bufTot++; if (gj.bufTot > gj.maxEvt) gj.maxEvt = gj.bufTot; } rd[i].bufTyp = 0; rd[i].bufLen = sizeof(PEVNT_HEADER); rd[i].bufPos = 0; // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. if (!evt) continue; //we have a valid entry in mBuffer[]; fill it const int fadBoard = rd[i].rBuf->S[12]; const int fadCrate = fadBoard>>8; if (i != (fadCrate * 10 + (fadBoard&0xff))) { factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)", i, fadBoard, fadCrate, fadBoard&0xff); } if (evt->board[i] != -1) { factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d", evt->evNum, i, i, rd[i].fadLen, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[rd[i].fadLen - 2], rd[i].rBuf->B[rd[i].fadLen - 1]); continue; // Continue reading next header } // Copy data from rd[i] to mBuffer[evID] copyData(rd[i].rBuf, i, evt); // now we have stored a new board contents into Event structure evt->fEvent->NumBoards++; evt->board[i] = i; evt->nBoard++; evt->evtStat = evt->nBoard; // have we already reported first (partial) event of this run ??? if (evt->nBoard==1 && evt->runNum != actrun) { // Signal the fadctrl that a new run has been started gotNewRun(evt->runNum, NULL); factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d", actrun, evt->runNum, evt->evNum); // We got the first part of this event, so this is // the number of events we expect for this run evt->run->lastEvt++; // Since we have started a new run, we know already when to close the // previous run in terms of number of events const auto ir = runCtrl.find(actrun); if (ir!=runCtrl.end()) ir->second->maxEvt = ir->second->lastEvt; // Change 'actrun' the the new runnumber actrun = evt->runNum; } // event not yet complete if (evt->nBoard < actBoards) continue; // GARBAGE COLLECTION // This is a non-ideal hack to lower the probability that // in mBufEvt the search for correct entry in runCtrl // will not return a super-old entry. I don't want // to manipulate that in another thread. for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) { if (ir->runId==evt->runNum) break; if (ir->second->fileId>0) runCtrl.erase(ir); } // we have just completed an event... so all previous events // must have been completed already. If they are not, there // is no need to wait for the timeout, because they will never // get completed. We can just ensure that if we check for the previous // event to be complete every time we receive a new complete event. // If we find an incomplete one, we remove all consecutive // incomplete ones. for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++) { const shared_ptr e = *it; if (e.get()==evt.get()) { queue.post(e); evtCtrl.erase(it); break; } reportIncomplete(e, "expired"); evtCtrl.pop_front(); } } // end for loop over all sockets g_actTime = time (NULL); if (g_actTime <= gi_SecTime) { usleep(1); continue; } gi_SecTime = g_actTime; gj.bufNew = 0; //loop over all active events and flag those older than read-timeout //delete those that are written to disk .... const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; // This could be improved having the pointer which separates the queue with // the incomplete events from the queue with the complete events for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) { const shared_ptr evt = *it; // Check the more likely case first: incomplete events if (evt->evtStat>=0 && evt->evtStat<100) { gj.bufNew++; //incomplete event in Buffer // Event has not yet timed out or was reported already if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30) continue; // This will result in the emission of a dim service. // It doesn't matter if that takes comparably long, // because we have to stop the run anyway. const uint64_t rep = reportIncomplete(evt, "timeout"); factReportIncomplete(rep); //timeout for incomplete events evt->evtStat = 90; gj.evtSkip++; continue; } // Check the less likely case: 'useless' or 'delete' // evtState==0 can happen if the event was initialized (some data received) // but the data did not make sense (e.g. inconsistent rois) if (evt->evtStat==0 || evt->evtStat == 10000) { evtCtrl.erase(it); //event written--> free memory gj.evtWrite++; gj.rateWrite++; } // Remove leading invalidated slots from queue // Do they exist at all? if (evt->evtStat==-1) evtCtrl.erase(it); } // The number of complete events in the buffer is the total number of // events in the buffer minus the number of incomplete events. gj.bufEvt = count - gj.bufNew; gj.deltaT = 1000; //temporary, must be improved for (int ib = 0; ib < NBOARDS; ib++) gj.totBytes[ib] += gj.rateBytes[ib]; gj.totMem = tgb_memory; if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem; if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt; factStat (gj); //factStatNew (gi); gj.rateNew = gj.rateWrite = 0; gj.maxMem = gj.usdMem; gj.maxEvt = gj.bufTot; for (int b = 0; b < NBOARDS; b++) gj.rateBytes[b] = 0; } // while (g_runStat >= 0 && g_reset == 0) factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset); if (g_reset > 0) { gi_reset = g_reset; gi_resetR = gi_reset % 10; //shall we stop reading ? gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? gi_resetW = (gi_reset / 100) % 10; //shall we close files ? gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? g_reset = 0; } else { gi_reset = 0; gi_resetR = g_runStat == -1 ? 1 : 7; gi_resetS = 7; //close all sockets gi_resetW = 7; //close all files gi_resetX = 0; //inform others we have to quit .... gj.readStat = -11; //inform all that no update to happen any more } if (gi_resetS > 0) { //must close all open sockets ... factPrintf(kInfo, -1, "Close all sockets..."); for (int i = 0; i < NBOARDS; i++) { if (rd[i].sockStat != 0) continue; GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket gi_NumConnect[i]-= cntsock ; gj.numConn[i]--; sockDef[i] = 0; //flag ro recreate the sockets ... rd[i].sockStat = -1; //and try to open asap } } if (gi_resetR > 0) { //and clear all buffers (might have to wait until all others are done) while (evtCtrl.size()) { const shared_ptr evt = evtCtrl.front(); // flag incomplete events as 'read finished' // We cannot just detele all events, because some might currently being processed, // so we have to wait until the processing thread currently processing the event // signals that the event can be deleted. (Note, that there are currently never // two threads processing the same event at the same time) if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000) evtCtrl.pop_front(); usleep(1); } } //queue.wait(); //queue.join(); if (gi_reset > 0) { if (gi_resetW > 0) CloseRunFile (0, 0, 0); //ask all Runs to be closed if (gi_resetX > 0) { struct timespec xwait; xwait.tv_sec = gi_resetX; xwait.tv_nsec = 0; nanosleep (&xwait, NULL); } factPrintf(kInfo, -1, "Continue read Process ..."); gi_reset = 0; goto START; } factPrintf(kInfo, -1, "Exit read Process..."); factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse); gj.readStat = -99; factStat (gj); //factStatNew (gi); return 0; } /*-----------------------------------------------------------------*/ /* void *subProc(void *thrid) { const int64_t threadID = (int64_t)thrid; factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); while (g_runStat > -2) //in case of 'exit' we still must process pending events { int numWait = 0; for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) { const shared_ptr evt = *it; // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evt->evtStat==-1) continue; // If we find the first event still waiting for processing // there will be only unprocessed events after this one in the queue if (evt->evtStat<1000+threadID) { numWait = 1; break; } // If the event was processed already, skip it // We could replace that to a moving pointer pointing to the first // non-processed event if (evt->evtStat!=1000+threadID) continue; const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0); if (jret>0 && jret<=threadID) factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); if (jret<=threadID) { evt->evtStat = 10000; // flag as 'to be deleted' continue; } if (jret>=gi_maxProc) { evt->evtStat = 5000; // flag as 'to be written' continue; } evt->evtStat = 1000 + jret; // flag for next proces } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); return 0; } usleep(1); } factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); return 0; } void * procEvt (void *ptr) { int status; factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); pthread_t thread[100]; // int th_ret[100]; for (long long k = 0; k < gi_maxProc; k++) { pthread_create (&thread[k], NULL, subProc, (void *) k); } // in case of 'exit' we still must process pending events while (g_runStat > -2) { int numWait = 0; for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) { const shared_ptr evt = *it; // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evt->evtStat==-1) continue; // If we find the first incomplete event which is not supposed to // be processed, there are only more incomplete events in the queue if (evt->evtStat<90) { numWait = 1; break; } // If the event was processed already, skip it. // We could replace that to a moving pointer pointing to the first // non-processed event if (evt->evtStat>=1000) continue; //-------- it is better to open the run already here, so call can be used to initialize //-------- buffers etc. needed to interprete run (e.g. DRS calibration) const uint32_t irun = evt->runNum; const int32_t ievt = evt->evNum; const shared_ptr run = evt->run; if (run->runId==0) continue; // File not yet open if (run->fileId < 0) { RUN_HEAD actRun; actRun.Version = 1; actRun.RunType = -1; //to be adapted actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0; actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8; actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime; actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec; actRun.NBoard = NBOARDS; actRun.NPix = NPIX; actRun.NTm = NTMARK; memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER)); run->fileHd = runOpen(irun, &actRun, sizeof (actRun)); if (run->fileHd == NULL) { factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt); run->fileId = 91; continue; } run->fileId = 0; factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); } //-------- //-------- //and set correct event header ; also check for consistency in event (not yet) evt->fEvent->Errors[0] = evt->Errors[0]; evt->fEvent->Errors[1] = evt->Errors[1]; evt->fEvent->Errors[2] = evt->Errors[2]; evt->fEvent->Errors[3] = evt->Errors[3]; for (int ib=0; ibboard[ib] == -1) { evt->FADhead[ib].start_package_flag = 0; evt->fEvent->BoardTime[ib] = 0; } else { evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time; } } const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent); if (rc < 0) { evt->evtStat = 10000; // flag event to be deleted } else { evt->evtStat = 1000; // flag 'start processing' run->procEvt++; } } if (gj.readStat < -10 && numWait == 0) { //nothing left to do factPrintf(kInfo, -1, "Exit Processing Process ..."); gj.procStat = -22; //==> we should exit return 0; } usleep(1); gj.procStat = gj.readStat; } //we are asked to abort asap ==> must flag all remaining events // when gi_runStat claims that all events are in the buffer... factPrintf(kInfo, -1, "Abort Processing Process ..."); for (int k = 0; k < gi_maxProc; k++) { pthread_join (thread[k], (void **) &status); } gj.procStat = -99; return 0; } */ int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) { /* close run runId (all all runs if runId=0) */ /* return: 0=close scheduled / >0 already closed / <0 does not exist */ if (runId == 0) { for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++) { const shared_ptr run = it->second; //run is open if (run->fileId == 0) { run->closeTime = closeTime; run->maxEvt = maxEvt; } } return 0; } auto it=runCtrl.find(runId); if (it==runCtrl.end()) return -1; const shared_ptr run = it->second; // run already closed if (run->fileId>0) return +2; run->closeTime = closeTime; run->maxEvt = maxEvt; return run->fileId==0 ? 0 : 1; } void checkAndCloseRun(const shared_ptr &run, int cond, int where) { if (!cond && run->closeTime >= g_actTime && run->lastTime >= g_actTime - 300 && run->maxEvt > run->actEvt) return; //close run for whatever reason int ii = 0; if (cond) ii = 1; if (run->closeTime < g_actTime) ii |= 2; // = 2; if (run->lastTime < g_actTime - 300) ii |= 4; // = 3; if (run->maxEvt <= run->actEvt) ii |= 8; // = 4; run->closeTime = g_actTime - 1; const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j])); if (rc<0) { factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)", where, run->runId, rc); run->fileId = 92+where*2; } else { factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)", where, run->runId, ii); run->fileId = 93+where*2; } } /*-----------------------------------------------------------------*/ /* void *writeEvt (void *ptr) { factPrintf(kInfo, -1, "Starting write-thread"); while (g_runStat > -2) { //int numWrite = 0; int numWait = 0; // Note that the current loop does not at all gurantee that // the events are written in the correct order. for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) { const shared_ptr evt = *it; // This is a threading issue... the evtStat might have been invalid // but the frstPtr is not yet updated if (evt->evtStat==-1) continue; // If we find the first non-written event which is not supposed to // be written, there are only more incomplete events in the queue if (evt->evtStat<5000) { numWait = 1; break; } // If the event was written already already, skip it // We could replace that to a moving pointer pointing to the first // non-processed event if (evt->evtStat!=5000) continue; const shared_ptr run = evt->run; if (run->runId==0) continue; // File is open if (run->fileId==0) { const int rc = runWrite(run->fileHd, evt->fEvent, 0); if (rc >= 0) { // Sucessfully wrote event run->lastTime = g_actTime; run->actEvt++; } else factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum); checkAndCloseRun(run, rc<0, 1); } evt->evtStat = 10000; // event written (or has to be discarded) -> delete } // Although the are no pending events, we have to check if a run should be closed (timeout) for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) { if (ir->second->fileId == 0) { //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it const int cond = ir->second->runId == 0; checkAndCloseRun(ir->second, cond, 2); } } usleep(1); //nothing left to do if (gj.readStat < -10 && numWait == 0) { factPrintf(kInfo, -1, "Finish Write Process ..."); gj.writStat = -22; //==> we should exit break; } gj.writStat = gj.readStat; } factPrintf(kInfo, -1, "Close all open files ..."); for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) { if (ir->second->fileId == 0) checkAndCloseRun(ir->second, 1, 3); } gj.writStat = -99; factPrintf(kInfo, -1, "Exit Writing Process ..."); return 0; } */ void StartEvtBuild () { int i, /*j,*/ imax, status/*, th_ret[50]*/; pthread_t thread[50]; struct timespec xwait; gj.readStat = gj.procStat = gj.writStat = 0; factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A"); gi_maxProc = g_maxProc; if (gi_maxProc <= 0 || gi_maxProc > 90) { factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc); gi_maxProc = 1; } //start all threads (more to come) when we are allowed to .... while (g_runStat == 0) { xwait.tv_sec = 0; xwait.tv_nsec = 10000000; // sleep for ~10 msec nanosleep (&xwait, NULL); } i = 0; /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL); i++; ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL); //i++; ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL); //i++; imax = i; #ifdef BILAND xwait.tv_sec = 30;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("close all runs in 2 seconds\n"); CloseRunFile (0, time (NULL) + 2, 0); xwait.tv_sec = 1;; xwait.tv_nsec = 0; // sleep for ~20sec nanosleep (&xwait, NULL); printf ("setting g_runstat to -1\n"); g_runStat = -1; #endif //wait for all threads to finish for (i = 0; i < imax; i++) { /*j =*/ pthread_join (thread[i], (void **) &status); } } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ #ifdef BILAND int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, int8_t * buffer) { printf ("called subproc %d\n", threadID); return threadID + 1; } /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ /*-----------------------------------------------------------------*/ FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) { return 1; }; int runWrite (FileHandle_t fileHd, EVENT * event, size_t len) { return 1; usleep (10000); return 1; } //{ return 1; } ; int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) { return 1; }; int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) { int i = 0; // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); // for (i=0; i