#include #include #include #include #include #include #include #include "queue.h" #include "MessageImp.h" using namespace std; #include "EventBuilder.h" #define MIN_LEN 32 // min #bytes needed to interpret FADheader #define MAX_LEN (36*3*1024) // (data+header)*num channels #define COMPLETE_EVENTS //#define USE_EPOLL //#define USE_SELECT // ========================================================================== bool runOpen(const shared_ptr &evt); bool runWrite(const shared_ptr &evt); void runClose(); void applyCalib(const shared_ptr &evt); void factOut(int severity, const char *message); void factReportIncomplete (uint64_t rep); void gotNewRun(RUN_CTRL2 &run); void runFinished(); void factStat(GUI_STAT gj); int eventCheck(const shared_ptr &evt); void debugHead(void *buf); // ========================================================================== int g_reset; size_t g_maxMem; //maximum memory allowed for buffer FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards GUI_STAT gj; // ========================================================================== void factPrintf(int severity, const char *fmt, ...) { char str[1000]; va_list ap; va_start(ap, fmt); vsnprintf(str, 1000, fmt, ap); va_end(ap); factOut(severity, str); } // ========================================================================== #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) namespace Memory { uint64_t inuse = 0; uint64_t allocated = 0; uint64_t max_inuse = 0; mutex mtx; forward_list memory; void *malloc() { // No free slot available, next alloc would exceed max memory if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem) return NULL; // We will return this amount of memory // This is not 100% thread safe, but it is not a super accurate measure anyway inuse += MAX_TOT_MEM; if (inuse>max_inuse) max_inuse = inuse; void *mem = NULL; if (memory.empty()) { // No free slot available, allocate a new one allocated += MAX_TOT_MEM; mem = new char[MAX_TOT_MEM]; } else { // Get the next free slot from the stack and return it const lock_guard lock(mtx); mem = memory.front(); memory.pop_front(); } memset(mem, 0, MAX_HEAD_MEM); return mem; }; void free(void *mem) { if (!mem) return; // Decrease the amont of memory in use accordingly inuse -= MAX_TOT_MEM; // If the maximum memory has changed, we might be over the limit. // In this case: free a slot if (allocated>g_maxMem) { delete [] (char*)mem; allocated -= MAX_TOT_MEM; return; } const lock_guard lock(mtx); memory.push_front(mem); } }; // ========================================================================== struct READ_STRUCT { enum buftyp_t { kStream, kHeader, kData, #ifdef COMPLETE_EVENTS kWait #endif }; // ---------- connection ---------- static uint activeSockets; int sockId; // socket id (board number) int socket; // socket handle bool connected; // is this socket connected? struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation // ------------ epoll ------------- static int fd_epoll; static epoll_event events[NBOARDS]; static void init(); static void close(); static int wait(); static READ_STRUCT *get(int i) { return reinterpret_cast(events[i].data.ptr); } // ------------ buffer ------------ buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ... uint32_t bufLen; // number of bytes left to read uint8_t *bufPos; // next byte to read to the buffer next union { uint8_t B[MAX_LEN]; uint16_t S[MAX_LEN / 2]; uint32_t I[MAX_LEN / 4]; uint64_t L[MAX_LEN / 8]; PEVNT_HEADER H; }; uint64_t rateBytes; uint32_t skip; // number of bytes skipped before start of event bool repmem; // reportet no mmemory free uint32_t len() const { return uint32_t(H.package_length)*2; } void swapHeader(); void swapData(); // -------------------------------- READ_STRUCT() : socket(-1), connected(false), rateBytes(0) { if (fd_epoll<0) init(); } ~READ_STRUCT() { destroy(); } void destroy(); bool create(sockaddr_in addr); void check(int, sockaddr_in addr); bool read(); }; int READ_STRUCT::wait() { // wait for something to do... const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms] if (rc>=0) return rc; if (errno==EINTR) // timout or signal interruption return 0; factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno); return -1; } uint READ_STRUCT::activeSockets = 0; int READ_STRUCT::fd_epoll = -1; epoll_event READ_STRUCT::events[NBOARDS]; void READ_STRUCT::init() { if (fd_epoll>=0) return; #ifdef USE_EPOLL fd_epoll = epoll_create(NBOARDS); if (fd_epoll<0) { factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno); return; } #endif } void READ_STRUCT::close() { #ifdef USE_EPOLL if (::close(fd_epoll) > 0) factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno); else factPrintf(MessageImp::kInfo, "Succesfully closed epoll"); #endif fd_epoll = -1; } bool READ_STRUCT::create(sockaddr_in sockAddr) { if (socket>=0) return false; const int port = ntohs(sockAddr.sin_port) + 1; SockAddr.sin_family = sockAddr.sin_family; SockAddr.sin_addr = sockAddr.sin_addr; SockAddr.sin_port = htons(port); if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno); socket = -1; return false; } int optval = 1; if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); optval = 10; //start after 10 seconds if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); optval = 10; //do every 10 seconds if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); optval = 2; //close after 2 unsuccessful tries if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId); //connected = false; activeSockets++; return true; } void READ_STRUCT::destroy() { if (socket==-1) return; #ifdef USE_EPOLL // strictly speaking this should not be necessary if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); #endif if (::close(socket) > 0) factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno); else factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId); socket = -1; connected = false; activeSockets--; } void READ_STRUCT::check(int sockDef, sockaddr_in addr) { // Continue in the most most likely case (performance) //if (socket>=0 && sockDef!=0 && connected) // return; // socket open, but should not be open if (socket>=0 && sockDef==0) destroy(); // Socket closed, but should be open if (socket<0 && sockDef!=0) create(addr); //generate address and socket // Socket closed if (socket<0) return; // Socket open and connected: Nothing to do if (connected) return; //try to connect if not yet done const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr)); if (rc == -1) return; connected = true; if (sockDef<0) { bufTyp = READ_STRUCT::kStream; // full data to be skipped bufLen = MAX_LEN; // huge for skipping } else { bufTyp = READ_STRUCT::kHeader; // expect a header bufLen = sizeof(PEVNT_HEADER); // max size to read at begining } bufPos = B; // no byte read so far skip = 0; // start empty repmem = false; factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket); #ifdef USE_EPOLL epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = this; // user data (union: ev.ptr) if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0) factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); #endif } bool READ_STRUCT::read() { if (bufLen==0) return true; const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT); // recv failed if (jrd<0) { // There was just nothing waiting if (errno==EWOULDBLOCK || errno==EAGAIN) return false; factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno); return false; } // connection was closed ... if (jrd==0) { factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId); destroy();//DestroySocket(rd[i]); //generate address and socket return false; } rateBytes += jrd; // are we skipping this board ... if (bufTyp==kStream) return false; bufPos += jrd; //==> prepare for continuation bufLen -= jrd; // not yet all read return bufLen==0; } void READ_STRUCT::swapHeader() { S[1] = ntohs(S[1]); // package_length (bytes not swapped!) S[2] = ntohs(S[2]); // version_no S[3] = ntohs(S[3]); // PLLLCK S[4] = ntohs(S[4]); // trigger_crc S[5] = ntohs(S[5]); // trigger_type I[3] = ntohl(I[3]); // trigger_id I[4] = ntohl(I[4]); // fad_evt_counter I[5] = ntohl(I[5]); // REFCLK_frequency S[12] = ntohs(S[12]); // board id S[13] = ntohs(S[13]); // adc_clock_phase_shift S[14] = ntohs(S[14]); // number_of_triggers_to_generate S[15] = ntohs(S[15]); // trigger_generator_prescaler I[10] = ntohl(I[10]); // runnumber; I[11] = ntohl(I[11]); // time; for (int s=24; s<24+NTemp+NDAC; s++) S[s] = ntohs(S[s]); // drs_temperature / dac } void READ_STRUCT::swapData() { // swapEventHeaderBytes: End of the header. to channels now int i = 36; for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) { S[i+0] = ntohs(S[i+0]);//id S[i+1] = ntohs(S[i+1]);//start_cell S[i+2] = ntohs(S[i+2]);//roi S[i+3] = ntohs(S[i+3]);//filling i += 4+S[i+2];//skip the pixel data } } // ========================================================================== bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[]) { int xjr = -1; int xkr = -1; //points to the very first roi int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; roi[0] = ntohs(rd.S[roiPtr]); for (int jr = 0; jr < 9; jr++) { roi[jr] = ntohs(rd.S[roiPtr]); if (roi[jr]>1024) { factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]); return false; } // Check that the roi of pixels jr are compatible with the one of pixel 0 if (jr!=8 && roi[jr]!=roi[0]) { xjr = jr; break; } // Check that the roi of all other DRS chips on boards are compatible for (int kr = 1; kr < 4; kr++) { const int kroi = ntohs(rd.S[roiPtr]); if (kroi != roi[jr]) { xjr = jr; xkr = kr; break; } roiPtr += kroi+4; } } if (xjr>=0) { if (xkr<0) factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); else factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr])); return false; } if (roi[8] < roi[0]) { factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); return false; } return true; } list> evtCtrl; shared_ptr mBufEvt(const READ_STRUCT &rd, shared_ptr &actrun) { uint16_t nRoi[9]; if (!checkRoiConsistency(rd, nRoi)) return shared_ptr(); for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++) { // A reference is enough because the evtCtrl holds the shared_ptr anyway const shared_ptr &evt = *it; // If the run is different, go on searching. // We cannot stop searching if a lower run-id is found as in // the case of the events, because theoretically, there // can be the same run on two different days. if (rd.H.runnumber != evt->runNum) continue; // If the ID of the new event if higher than the last one stored // in that run, we have to assign a new slot (leave the loop) if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/) break; if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/) continue; // We have found an entry with the same runID and evtID // Check if ROI is consistent if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8]) { factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]); return shared_ptr(); } // count for inconsistencies if (evt->trgNum != rd.H.trigger_id) evt->Errors[0]++; if (evt->trgTyp != rd.H.trigger_type) evt->Errors[2]++; //everything seems fine so far ==> use this slot .... return evt; } if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8])) { factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter); return shared_ptr(); } shared_ptr evt(new EVT_CTRL2); gettimeofday(&evt->time, NULL); evt->runNum = rd.H.runnumber; evt->evNum = rd.H.fad_evt_counter; evt->trgNum = rd.H.trigger_id; evt->trgTyp = rd.H.trigger_type; evt->nRoi = nRoi[0]; evt->nRoiTM = nRoi[8]; const bool newrun = actrun->runId != rd.H.runnumber; if (newrun) { // Since we have started a new run, we know already when to close the // previous run in terms of number of events actrun->maxEvt = actrun->lastEvt; factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d", rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId); // The new run is the active run now actrun = shared_ptr(new RUN_CTRL2); const time_t &tsec = evt->time.tv_sec; actrun->openTime = tsec; actrun->closeTime = tsec + 3600 * 24; // max time allowed actrun->runId = rd.H.runnumber; actrun->roi0 = nRoi[0]; // FIXME: Make obsolete! actrun->roi8 = nRoi[8]; // FIXME: Make obsolete! } // Increase the number of events we have started to receive in this run actrun->lastTime = evt->time.tv_sec; // Time when the last event was received actrun->lastEvt++; // Keep pointer to run of this event evt->runCtrl = actrun; // Secure access to evtCtrl against access in CloseRunFile // This should be the last... otherwise we can run into threading issues // if the event is accessed before it is fully initialized. evtCtrl.push_back(evt); // Signal the fadctrl that a new run has been started // Note this is the only place at which we can ensure that // gotnewRun is called only once // Note that this will callback CloseRunFile, therefor the event // must already be in the evtCtrl structure if (newrun) gotNewRun(*actrun); // An event can be the first and the last, but not the last and the first. // Therefore gotNewRun is called before runFinished. // runFinished signals that the last event of a run was just received. Processing // might still be ongoing, but we can start a new run. const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached if (!cond1 || !cond2) runFinished(); return evt; } void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt) { const int i = rBuf.sockId; memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER)); int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts // consistency of ROIs have been checked already (is it all correct?) const uint16_t &roi = rBuf.S[src+2]; // different sort in FAD board..... for (int px = 0; px < 9; px++) { for (int drs = 0; drs < 4; drs++) { const int16_t pixC = rBuf.S[src+1]; // start-cell const int16_t pixR = rBuf.S[src+2]; // roi //here we should check if pixH is correct .... const int pixS = i*36 + drs*9 + px; evt->fEvent->StartPix[pixS] = pixC; memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2); src += 4+pixR; // Treatment for ch 9 (TM channel) if (px != 8) continue; const int tmS = i*4 + drs; //and we have additional TM info if (pixR > roi) { evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2); } else { evt->fEvent->StartTM[tmS] = -1; } } } } // ========================================================================== uint64_t reportIncomplete(const shared_ptr &evt, const char *txt) { factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)", evt->runNum, evt->evNum, evtCtrl.size(), txt); uint64_t report = 0; char str[1000]; int ik=0; for (int ib=0; ibboard[ib]; if (jb>=0) // data received from that board { str[ik++] = '0'+(jb%10); continue; } // FIXME: This is not synchronous... it reports // accoridng to the current connection status, not w.r.t. to the // one when the event was taken. if (gi_NumConnect[ib]==0) // board not connected { str[ik++] = 'x'; continue; } // data from this board lost str[ik++] = '.'; report |= ((uint64_t)1)<> processingQueue1(bind(&applyCalib, placeholders::_1)); // If this is not convenient anymore, it could be replaced by // a command queue, to which command+data is posted, // (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo) void writeEvt(const shared_ptr &evt) { const shared_ptr &run = evt->runCtrl; bool rc1 = true; // Is this a valid event or just an empty event to trigger run close? // If this is not an empty event open the new run-file // Empty events are there to trigger run-closing conditions if (evt->runNum>=0) { // File not yet open if (run->fileStat==kFileNotYetOpen) { // runOpen will close a previous run, if still open if (!runOpen(evt)) { factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); run->fileStat = kFileClosed; return; } factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); run->fileStat = kFileOpen; } // Here we have a valid calibration and can go on with that. processingQueue1.post(evt); // File already closed if (run->fileStat==kFileClosed) return; rc1 = runWrite(evt); if (!rc1) factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); } const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached const bool cond2 = run->lastTime < run->closeTime; // max time not reached const bool cond3 = rc1; // Write successfull // File is not yet to be closed. if (cond1 && cond2 && cond3) return; runClose(); run->fileStat = kFileClosed; string str; if (!cond1) str += to_string(run->maxEvt)+" evts reached"; if (!cond1 && (!cond2 || !cond3)) str += ", "; if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached"; if ((!cond1 || !cond2) && !cond3) str += ", "; if (!cond3) str += "runWrite failed"; factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str()); } Queue> secondaryQueue(bind(&writeEvt, placeholders::_1)); void procEvt(const shared_ptr &evt) { if (evt->runNum>=0) { evt->fEvent->Errors[0] = evt->Errors[0]; evt->fEvent->Errors[1] = evt->Errors[1]; evt->fEvent->Errors[2] = evt->Errors[2]; evt->fEvent->Errors[3] = evt->Errors[3]; for (int ib=0; ibfEvent->BoardTime[ib] = evt->FADhead.get()[ib].time; const int rc = eventCheck(evt); if (rc < 0) return; } // If file is open post the event for being written secondaryQueue.post(evt); } // ========================================================================== // ========================================================================== shared_ptr actrun; // needed in CloseRunFile /* task 1-4: lock1()-lock4(); while (1) { wait for signal [lockN]; // unlocked while (n!=10) wait sockets; read; lockM(); finished[n] = true; signal(mainloop); unlockM(); } mainloop: while (1) { lockM(); while (!finished[0] || !finished[1] ...) wait for signal [lockM]; // unlocked... signals can be sent finished[0-1] = false; unlockM() copy data to queue // locked lockN[0-3]; signalN[0-3]; unlockN[0-3]; } */ /* while (g_reset) { shared_ptr evt = new shared_ptr<>; // Check that all sockets are connected for (int i=0; i<40; i++) if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0) factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); while (g_reset) { if (READ_STRUCT::wait()<0) break; if (rc_epoll==0) break; for (int jj=0; jjconnected) continue; const bool rc_read = rs->read(); if (!rc_read) continue; if (rs->bufTyp==READ_STRUCT::kHeader) { [...] } [...] if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); } if (once_a_second) { if (evt==timeout) break; } } if (evt.nBoards==actBoards) primaryQueue.post(evt); } */ void CloseRunFile() { // Create a copy of the shared_ptr to ensure // is not replaced in the middle of the action const shared_ptr run = actrun; run->maxEvt = run->lastEvt; } bool mainloop(READ_STRUCT *rd) { factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop"); Queue> primaryQueue(bind(&procEvt, placeholders::_1)); primaryQueue.start(); secondaryQueue.start(); actrun = shared_ptr(new RUN_CTRL2); //time in seconds time_t gi_SecTime = time(NULL)-1; //loop until global variable g_runStat claims stop g_reset = 0; while (g_reset == 0) { #ifdef USE_SELECT fd_set readfs; FD_ZERO(&readfs); int nfsd = 0; for (int i=0; i=0 && rd[i].connected && rd[i].bufLen>0) { FD_SET(rd[i].socket, &readfs); if (rd[i].socket>nfsd) nfsd = rd[i].socket; } timeval tv; tv.tv_sec = 0; tv.tv_usec = 100; const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv); // 0: timeout // -1: error if (rc_select<0) { factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno); continue; } #endif #ifdef USE_EPOLL const int rc_epoll = READ_STRUCT::wait(); if (rc_epoll<0) break; #endif #ifdef USE_EPOLL for (int jj=0; jjconnected) continue; #endif #ifdef USE_SELECT if (!FD_ISSET(rs->socket, &readfs)) continue; #endif #ifdef COMPLETE_EVENTS if (rs->bufTyp==READ_STRUCT::kWait) continue; #endif // ================================================================== const bool rc_read = rs->read(); // Connect might have gotten closed during read gi_NumConnect[rs->sockId] = rs->connected; gj.numConn[rs->sockId] = rs->connected; // Read either failed or disconnected, or the buffer is not yet full if (!rc_read) continue; // ================================================================== if (rs->bufTyp==READ_STRUCT::kHeader) { //check if startflag correct; else shift block .... // FIXME: This is not enough... this combination of // bytes can be anywhere... at least the end bytes // must be checked somewhere, too. uint k; for (k=0; kB[k]==0xfb && rs->B[k+1] == 0x01) //if (*reinterpret_cast(rs->B+k) == 0xfb01) break; } rs->skip += k; //no start of header found if (k==sizeof(PEVNT_HEADER)-1) { rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1]; rs->bufPos = rs->B+1; rs->bufLen = sizeof(PEVNT_HEADER)-1; continue; } if (k > 0) { memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k); rs->bufPos -= k; rs->bufLen += k; continue; // We need to read more (bufLen>0) } if (rs->skip>0) { factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId); rs->skip = 0; } // Swap the header entries from network to host order rs->swapHeader(); rs->bufTyp = READ_STRUCT::kData; rs->bufLen = rs->len() - sizeof(PEVNT_HEADER); debugHead(rs->B); // i and fadBoard not used continue; } const uint16_t &end = *reinterpret_cast(rs->bufPos-2); if (end != 0xfe04) { factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x", rs->sockId, rs->H.fad_evt_counter, rs->len(), end); // ready to read next header rs->bufTyp = READ_STRUCT::kHeader; rs->bufLen = sizeof(PEVNT_HEADER); rs->bufPos = rs->B; // FIXME: What to do with the validity flag? continue; } // get index into mBuffer for this event (create if needed) const shared_ptr evt = mBufEvt(*rs, actrun); // We have a valid entry, but no memory has yet been allocated if (evt && !evt->FADhead) { // Try to get memory from the big buffer PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc(); if (!mem) { // If this works properly, this is a hack which can be removed, or // replaced by a signal or dim message if (!rs->repmem) { factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); rs->repmem = true; } continue; } evt->initEvent(shared_ptr(mem, Memory::free)); } // ready to read next header rs->bufTyp = READ_STRUCT::kHeader; rs->bufLen = sizeof(PEVNT_HEADER); rs->bufPos = rs->B; // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. if (!evt) continue; /* const int fad = (i/10)<<8)|(i%10); if (fad != rs->H.board_id) { factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id); }*/ // This should never happen if (evt->board[rs->sockId] != -1) { factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.", evt->evNum, rs->sockId, rs->sockId, rs->len()); // FIXME: What to do with the validity flag? continue; // Continue reading next header } // Swap the data entries (board headers) from network to host order rs->swapData(); // Copy data from rd[i] to mBuffer[evID] copyData(*rs, evt.get()); #ifdef COMPLETE_EVENTS // Do not read anmymore from this board until the whole event has been received rs->bufTyp = READ_STRUCT::kWait; #endif // now we have stored a new board contents into Event structure evt->fEvent->NumBoards++; evt->board[rs->sockId] = rs->sockId; evt->nBoard++; // event not yet complete if (evt->nBoard < READ_STRUCT::activeSockets) continue; // All previous events are now flagged as incomplete ("expired") // and will be removed. (This is a bit tricky, because pop_front() // would invalidate the current iterator if not done _after_ the increment) for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) { const bool found = it->get()==evt.get(); if (!found) reportIncomplete(*it, "expired"); else primaryQueue.post(evt); it++; evtCtrl.pop_front(); // We reached the current event, so we are done if (found) break; } #ifdef COMPLETE_EVENTS for (int j=0; j<40; j++) { //if (rs->bufTyp==READ_STRUCT::kWait) { rs->bufTyp = READ_STRUCT::kHeader; rs->bufLen = sizeof(PEVNT_HEADER); rs->bufPos = rs->B; } } #endif } // end for loop over all sockets // ================================================================== // +1 -> idx=0 // -1 -> idx=0 // +2 -> idx=0 // -2 -> idx=0 // +3 -> idx=0 // -3 -> idx=0 // +4 -> idx=0 // -4 -> idx=0 // +5 -> idx=0 // -5 -> idx=0 // +6 -> idx=0 // -6 -> idx=0 // // ================================================================== const time_t actTime = time(NULL); if (actTime == gi_SecTime) { #if !defined(USE_SELECT) && !defined(USE_EPOLL) if (evtCtrl.size()==0) usleep(1); #endif continue; } gi_SecTime = actTime; // ================================================================== //loop over all active events and flag those older than read-timeout //delete those that are written to disk .... //const int count = evtCtrl.size(); // This could be improved having the pointer which separates the queue with // the incomplete events from the queue with the complete events for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) { // A reference is enough because the shared_ptr is hold by the evtCtrl const shared_ptr &evt = *it; // The first event is the oldest. If the first event within the // timeout window was received, we can stop searchinf further. if (evt->time.tv_sec>=actTime - 30) break; // This will result in the emission of a dim service. // It doesn't matter if that takes comparably long, // because we have to stop the run anyway. const uint64_t rep = reportIncomplete(evt, "timeout"); factReportIncomplete(rep); it++; evtCtrl.pop_front(); } // ================================================================= // If nothing was received for more than 5min, close file if (actTime-actrun->lastTime>300) actrun->maxEvt = actrun->lastEvt; // This is a fake event to trigger possible run-closing conditions once a second // FIXME: This is not yet ideal because a file would never be closed // if a new file has been started and no events of the new file // have been received yet if (actrun->fileStat==kFileOpen) primaryQueue.post(shared_ptr(new EVT_CTRL2(actrun))); // ================================================================= gj.bufTot = Memory::max_inuse/MAX_TOT_MEM; gj.usdMem = Memory::max_inuse; gj.totMem = Memory::allocated; gj.deltaT = 1000; // temporary, must be improved for (int ib=0; ib=100; } // ========================================================================== // ========================================================================== void StartEvtBuild() { factPrintf(MessageImp::kInfo, "Starting EventBuilder++"); for (int k=0; k