Changeset 16040
- Timestamp:
- 05/22/13 18:28:01 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.cc
r15512 r16040 1 // #define EVTDEBUG2 3 #define NUMSOCK 1 //set to 7 for old configuration4 #define MAXREAD 65536 //64kB wiznet buffer5 6 #include <stdlib.h>7 #include <stdint.h>8 #include <stdarg.h>9 #include <unistd.h>10 #include <stdio.h>11 1 #include <sys/time.h> 12 #include <arpa/inet.h> 13 #include <string.h> 14 #include <math.h> 15 #include <error.h> 16 #include <errno.h> 17 #include <unistd.h> 18 #include <sys/types.h> 19 #include <sys/socket.h> 20 #include <netinet/in.h> 2 #include <sys/epoll.h> 21 3 #include <netinet/tcp.h> 22 #include <pthread.h> 23 #include <sched.h> 24 25 #include <memory> 26 #include <deque> 27 #include <map> 4 5 #include <cstring> 6 #include <cstdarg> 7 #include <list> 8 #include <forward_list> 28 9 29 10 #include "queue.h" 30 11 12 #include "MessageImp.h" 13 14 using namespace std; 15 31 16 #include "EventBuilder.h" 32 17 33 enum Severity 34 { 35 kMessage = 10, ///< Just a message, usually obsolete 36 kInfo = 20, ///< An info telling something which can be interesting to know 37 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour 38 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program 39 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination 40 kDebug = 99, ///< A message used for debugging only 41 }; 42 43 using namespace std; 44 45 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 46 #define MAX_LEN 256*1024 // size of read-buffer per socket 47 48 //#define nanosleep(x,y) 49 50 extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len); 51 extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len); 52 extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len); 53 //extern int runFinish (uint32_t runnr); 54 55 extern "C" void factOut (int severity, int err, char *message); 56 extern void factReportIncomplete (uint64_t rep); 57 58 extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers); 59 60 61 extern void factStat (GUI_STAT gj); 62 63 extern void factStatNew (EVT_STAT gi); 64 65 extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event); 66 67 extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 68 int8_t * buffer); 69 70 extern void debugHead (int i, int j, void *buf); 71 72 extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, 73 int32_t runnr, int state, uint32_t tsec, 74 uint32_t tusec); 75 extern void debugStream (int isock, void *buf, int len); 76 77 int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt); 78 79 int g_maxProc; 80 int gi_maxProc; 81 82 uint g_actTime; 83 int g_runStat; 18 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 19 #define MAX_LEN (36*3*1024) // (data+header)*num channels 20 21 #define COMPLETE_EVENTS 22 //#define USE_EPOLL 23 //#define USE_SELECT 24 25 // ========================================================================== 26 27 bool runOpen(const shared_ptr<EVT_CTRL2> &evt); 28 bool runWrite(const shared_ptr<EVT_CTRL2> &evt); 29 void runClose(); 30 void applyCalib(const shared_ptr<EVT_CTRL2> &evt); 31 void factOut(int severity, const char *message); 32 void factReportIncomplete (uint64_t rep); 33 void gotNewRun(RUN_CTRL2 &run); 34 void runFinished(); 35 void factStat(GUI_STAT gj); 36 int eventCheck(const shared_ptr<EVT_CTRL2> &evt); 37 void debugHead(void *buf); 38 39 // ========================================================================== 40 84 41 int g_reset; 85 42 … … 90 47 uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards 91 48 92 //EVT_STAT gi;93 49 GUI_STAT gj; 94 50 95 #define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 ) 96 #define MAX_RUN 8 // Number of concurrent runs 97 98 void factPrintf(int severity, int id, const char *fmt, ...) 51 // ========================================================================== 52 53 void factPrintf(int severity, const char *fmt, ...) 99 54 { 100 55 char str[1000]; … … 105 60 va_end(ap); 106 61 107 factOut(severity, id, str); 108 } 109 62 factOut(severity, str); 63 } 64 65 // ========================================================================== 110 66 111 67 #define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER)) 112 68 #define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM) 113 typedef struct TGB_struct 114 { 115 struct TGB_struct *prev; 116 void *mem; 117 } TGB_entry; 118 119 TGB_entry *tgb_last = NULL; 120 uint64_t tgb_memory = 0; 121 uint64_t tgb_inuse = 0; 122 123 void *TGB_Malloc() 124 { 125 // No free slot available, next alloc would exceed max memory 126 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem) 127 return NULL; 128 129 // We will return this amount of memory 130 tgb_inuse += MAX_TOT_MEM; 131 132 // No free slot available, allocate a new one 133 if (!tgb_last) 134 { 135 tgb_memory += MAX_TOT_MEM; 136 return malloc(MAX_TOT_MEM); 137 } 138 139 // Get the next free slot from the stack and return it 140 TGB_entry *last = tgb_last; 141 142 void *mem = last->mem; 143 tgb_last = last->prev; 144 145 free(last); 146 147 return mem; 69 70 namespace Memory 71 { 72 uint64_t inuse = 0; 73 uint64_t allocated = 0; 74 75 uint64_t max_inuse = 0; 76 77 mutex mtx; 78 79 forward_list<void*> memory; 80 81 void *malloc() 82 { 83 // No free slot available, next alloc would exceed max memory 84 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem) 85 return NULL; 86 87 // We will return this amount of memory 88 // This is not 100% thread safe, but it is not a super accurate measure anyway 89 inuse += MAX_TOT_MEM; 90 if (inuse>max_inuse) 91 max_inuse = inuse; 92 93 void *mem = NULL; 94 95 if (memory.empty()) 96 { 97 // No free slot available, allocate a new one 98 allocated += MAX_TOT_MEM; 99 mem = new char[MAX_TOT_MEM]; 100 } 101 else 102 { 103 // Get the next free slot from the stack and return it 104 const lock_guard<mutex> lock(mtx); 105 mem = memory.front(); 106 memory.pop_front(); 107 } 108 109 memset(mem, 0, MAX_HEAD_MEM); 110 return mem; 111 }; 112 113 void free(void *mem) 114 { 115 if (!mem) 116 return; 117 118 // Decrease the amont of memory in use accordingly 119 inuse -= MAX_TOT_MEM; 120 121 // If the maximum memory has changed, we might be over the limit. 122 // In this case: free a slot 123 if (allocated>g_maxMem) 124 { 125 delete [] (char*)mem; 126 allocated -= MAX_TOT_MEM; 127 return; 128 } 129 130 const lock_guard<mutex> lock(mtx); 131 memory.push_front(mem); 132 } 148 133 }; 149 134 150 void TGB_free(void *mem) 151 { 152 if (!mem) 135 // ========================================================================== 136 137 struct READ_STRUCT 138 { 139 enum buftyp_t 140 { 141 kStream, 142 kHeader, 143 kData, 144 #ifdef COMPLETE_EVENTS 145 kWait 146 #endif 147 }; 148 149 // ---------- connection ---------- 150 151 static uint activeSockets; 152 153 int sockId; // socket id (board number) 154 int socket; // socket handle 155 bool connected; // is this socket connected? 156 157 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation 158 159 // ------------ epoll ------------- 160 161 static int fd_epoll; 162 static epoll_event events[NBOARDS]; 163 164 static void init(); 165 static void close(); 166 static int wait(); 167 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); } 168 169 // ------------ buffer ------------ 170 171 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ... 172 173 uint32_t bufLen; // number of bytes left to read 174 uint8_t *bufPos; // next byte to read to the buffer next 175 176 union 177 { 178 uint8_t B[MAX_LEN]; 179 uint16_t S[MAX_LEN / 2]; 180 uint32_t I[MAX_LEN / 4]; 181 uint64_t L[MAX_LEN / 8]; 182 PEVNT_HEADER H; 183 }; 184 185 uint64_t rateBytes; 186 uint32_t skip; // number of bytes skipped before start of event 187 bool repmem; // reportet no mmemory free 188 189 uint32_t len() const { return uint32_t(H.package_length)*2; } 190 191 void swapHeader(); 192 void swapData(); 193 194 // -------------------------------- 195 196 READ_STRUCT() : socket(-1), connected(false), rateBytes(0) 197 { 198 if (fd_epoll<0) 199 init(); 200 } 201 ~READ_STRUCT() 202 { 203 destroy(); 204 } 205 206 void destroy(); 207 bool create(sockaddr_in addr); 208 void check(int, sockaddr_in addr); 209 bool read(); 210 }; 211 212 int READ_STRUCT::wait() 213 { 214 // wait for something to do... 215 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms] 216 if (rc>=0) 217 return rc; 218 219 if (errno==EINTR) // timout or signal interruption 220 return 0; 221 222 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno); 223 return -1; 224 } 225 226 uint READ_STRUCT::activeSockets = 0; 227 int READ_STRUCT::fd_epoll = -1; 228 epoll_event READ_STRUCT::events[NBOARDS]; 229 230 void READ_STRUCT::init() 231 { 232 if (fd_epoll>=0) 153 233 return; 154 234 155 // Add the last free slot to the stack 156 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry)); 157 158 // FIXME: Really free memory if memory usuage exceeds g_maxMem 159 160 entry->prev = tgb_last; 161 entry->mem = mem; 162 163 tgb_last = entry; 164 165 // Decrease the amont of memory in use accordingly 166 tgb_inuse -= MAX_TOT_MEM; 167 168 gj.usdMem = tgb_inuse; 169 gj.bufTot--; 170 } 171 172 //RUN_CTRL runCtrl[MAX_RUN]; 173 174 /* 175 *** Definition of rdBuffer to read in IP packets; keep it global !!!! 176 */ 177 178 typedef union 179 { 180 uint8_t B[MAX_LEN]; 181 uint16_t S[MAX_LEN / 2]; 182 uint32_t I[MAX_LEN / 4]; 183 uint64_t L[MAX_LEN / 8]; 184 } CNV_FACT; 185 186 typedef struct 187 { 188 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ... 189 int32_t bufPos; //next byte to read to the buffer next 190 int32_t bufLen; //number of bytes left to read 191 int32_t skip; //number of bytes skipped before start of event 192 193 int errCnt; //how often connect failed since last successful 194 int sockStat; //-1 if socket not yet connected , 99 if not exist 195 int socket; //contains the sockets 196 197 struct sockaddr_in SockAddr; //IP for each socket 198 199 int evtID; // event ID of event currently read 200 int runID; // run " 201 int ftmID; // event ID from FTM 202 uint fadLen; // FADlength of event currently read 203 int fadVers; // Version of FAD 204 int ftmTyp; // trigger type 205 int Port; 206 207 CNV_FACT *rBuf; 208 209 } READ_STRUCT; 210 211 /*-----------------------------------------------------------------*/ 212 213 214 /*-----------------------------------------------------------------*/ 215 216 217 int 218 GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr, 219 READ_STRUCT * rs) 220 { 221 /* 222 *** generate Address, create sockets and allocates readbuffer for it 223 *** 224 *** if flag==0 generate socket and buffer 225 *** <0 destroy socket and buffer 226 *** >0 close and redo socket 227 *** 228 *** sid : board*7 + port id 229 */ 230 231 //close socket if open 232 if (rs->sockStat == 0) 233 { 234 if (close (rs->socket) > 0) { 235 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno); 236 } else { 237 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid); 238 } 239 } 240 241 rs->sockStat = 99; 242 243 if (flag < 0) { 244 free (rs->rBuf); //and never open again 245 rs->rBuf = NULL; 246 return 0; 247 } 248 249 250 if (flag == 0) { //generate address and buffer ... 251 rs->Port = port; 252 rs->SockAddr.sin_family = sockAddr->sin_family; 253 rs->SockAddr.sin_port = htons (port); 254 rs->SockAddr.sin_addr = sockAddr->sin_addr; 255 256 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT)); 257 if (rs->rBuf == NULL) { 258 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid); 259 rs->sockStat = 77; 260 return -3; 261 } 262 } 263 264 265 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 266 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno); 267 rs->sockStat = 88; 268 return -2; 269 } 270 271 int optval = 1; 272 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) { 273 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); 274 } 275 optval = 10; //start after 10 seconds 276 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) { 277 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); 278 } 279 optval = 10; //do every 10 seconds 280 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) { 281 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); 282 } 283 optval = 2; //close after 2 unsuccessful tries 284 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) { 285 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno); 286 } 287 288 factPrintf(kInfo, 773, "Successfully generated socket %d", sid); 289 290 rs->sockStat = -1; //try to (re)open socket 291 rs->errCnt = 0; 292 return 0; 293 294 } /*-----------------------------------------------------------------*/ 295 296 /*-----------------------------------------------------------------*/ 297 298 int checkRoiConsistency(const CNV_FACT *rbuf, int roi[]) 235 #ifdef USE_EPOLL 236 fd_epoll = epoll_create(NBOARDS); 237 if (fd_epoll<0) 238 { 239 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno); 240 return; 241 } 242 #endif 243 } 244 245 void READ_STRUCT::close() 246 { 247 #ifdef USE_EPOLL 248 if (::close(fd_epoll) > 0) 249 factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno); 250 else 251 factPrintf(MessageImp::kInfo, "Succesfully closed epoll"); 252 #endif 253 254 fd_epoll = -1; 255 } 256 257 bool READ_STRUCT::create(sockaddr_in sockAddr) 258 { 259 if (socket>=0) 260 return false; 261 262 const int port = ntohs(sockAddr.sin_port) + 1; 263 264 SockAddr.sin_family = sockAddr.sin_family; 265 SockAddr.sin_addr = sockAddr.sin_addr; 266 SockAddr.sin_port = htons(port); 267 268 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) 269 { 270 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno); 271 socket = -1; 272 return false; 273 } 274 275 int optval = 1; 276 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) 277 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); 278 279 optval = 10; //start after 10 seconds 280 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) 281 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); 282 283 optval = 10; //do every 10 seconds 284 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) 285 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); 286 287 optval = 2; //close after 2 unsuccessful tries 288 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) 289 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); 290 291 factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId); 292 293 //connected = false; 294 activeSockets++; 295 296 return true; 297 } 298 299 void READ_STRUCT::destroy() 300 { 301 if (socket==-1) 302 return; 303 304 #ifdef USE_EPOLL 305 // strictly speaking this should not be necessary 306 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) 307 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); 308 #endif 309 310 if (::close(socket) > 0) 311 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno); 312 else 313 factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId); 314 315 socket = -1; 316 connected = false; 317 activeSockets--; 318 } 319 320 void READ_STRUCT::check(int sockDef, sockaddr_in addr) 321 { 322 // Continue in the most most likely case (performance) 323 //if (socket>=0 && sockDef!=0 && connected) 324 // return; 325 326 // socket open, but should not be open 327 if (socket>=0 && sockDef==0) 328 destroy(); 329 330 // Socket closed, but should be open 331 if (socket<0 && sockDef!=0) 332 create(addr); //generate address and socket 333 334 // Socket closed 335 if (socket<0) 336 return; 337 338 // Socket open and connected: Nothing to do 339 if (connected) 340 return; 341 342 //try to connect if not yet done 343 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr)); 344 if (rc == -1) 345 return; 346 347 connected = true; 348 349 if (sockDef<0) 350 { 351 bufTyp = READ_STRUCT::kStream; // full data to be skipped 352 bufLen = MAX_LEN; // huge for skipping 353 } 354 else 355 { 356 bufTyp = READ_STRUCT::kHeader; // expect a header 357 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining 358 } 359 360 bufPos = B; // no byte read so far 361 skip = 0; // start empty 362 repmem = false; 363 364 factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket); 365 366 #ifdef USE_EPOLL 367 epoll_event ev; 368 ev.events = EPOLLIN; 369 ev.data.ptr = this; // user data (union: ev.ptr) 370 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0) 371 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); 372 #endif 373 } 374 375 bool READ_STRUCT::read() 376 { 377 if (bufLen==0) 378 return true; 379 380 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT); 381 // recv failed 382 if (jrd<0) 383 { 384 // There was just nothing waiting 385 if (errno==EWOULDBLOCK || errno==EAGAIN) 386 return false; 387 388 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno); 389 return false; 390 } 391 392 // connection was closed ... 393 if (jrd==0) 394 { 395 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId); 396 397 destroy();//DestroySocket(rd[i]); //generate address and socket 398 return false; 399 } 400 401 rateBytes += jrd; 402 403 // are we skipping this board ... 404 if (bufTyp==kStream) 405 return false; 406 407 bufPos += jrd; //==> prepare for continuation 408 bufLen -= jrd; 409 410 // not yet all read 411 return bufLen==0; 412 } 413 414 void READ_STRUCT::swapHeader() 415 { 416 S[1] = ntohs(S[1]); // package_length (bytes not swapped!) 417 S[2] = ntohs(S[2]); // version_no 418 S[3] = ntohs(S[3]); // PLLLCK 419 S[4] = ntohs(S[4]); // trigger_crc 420 S[5] = ntohs(S[5]); // trigger_type 421 422 I[3] = ntohl(I[3]); // trigger_id 423 I[4] = ntohl(I[4]); // fad_evt_counter 424 I[5] = ntohl(I[5]); // REFCLK_frequency 425 426 S[12] = ntohs(S[12]); // board id 427 S[13] = ntohs(S[13]); // adc_clock_phase_shift 428 S[14] = ntohs(S[14]); // number_of_triggers_to_generate 429 S[15] = ntohs(S[15]); // trigger_generator_prescaler 430 431 I[10] = ntohl(I[10]); // runnumber; 432 I[11] = ntohl(I[11]); // time; 433 434 for (int s=24; s<24+NTemp+NDAC; s++) 435 S[s] = ntohs(S[s]); // drs_temperature / dac 436 } 437 438 void READ_STRUCT::swapData() 439 { 440 // swapEventHeaderBytes: End of the header. to channels now 441 442 int i = 36; 443 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) 444 { 445 S[i+0] = ntohs(S[i+0]);//id 446 S[i+1] = ntohs(S[i+1]);//start_cell 447 S[i+2] = ntohs(S[i+2]);//roi 448 S[i+3] = ntohs(S[i+3]);//filling 449 450 i += 4+S[i+2];//skip the pixel data 451 } 452 } 453 454 // ========================================================================== 455 456 bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[]) 299 457 { 300 458 int xjr = -1; … … 304 462 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; 305 463 306 roi[0] = ntohs(r buf->S[roiPtr]);464 roi[0] = ntohs(rd.S[roiPtr]); 307 465 308 466 for (int jr = 0; jr < 9; jr++) 309 467 { 310 roi[jr] = ntohs(r buf->S[roiPtr]);311 312 if (roi[jr] <0 || roi[jr]>1024)313 { 314 factPrintf( kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);315 return 0;468 roi[jr] = ntohs(rd.S[roiPtr]); 469 470 if (roi[jr]>1024) 471 { 472 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]); 473 return false; 316 474 } 317 475 … … 326 484 for (int kr = 1; kr < 4; kr++) 327 485 { 328 const int kroi = ntohs(r buf->S[roiPtr]);486 const int kroi = ntohs(rd.S[roiPtr]); 329 487 if (kroi != roi[jr]) 330 488 { … … 340 498 { 341 499 if (xkr<0) 342 factPrintf( kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);500 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); 343 501 else 344 factPrintf( kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));345 346 return 0;502 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr])); 503 504 return false; 347 505 } 348 506 349 507 if (roi[8] < roi[0]) 350 508 { 351 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); 352 //gj.badRoiB++; 353 //gj.badRoi[b]++; 354 return 0; 355 } 356 357 return 1; 358 } 359 360 deque<shared_ptr<EVT_CTRL>> evtCtrl; 361 map<int, shared_ptr<RUN_CTRL>> runCtrl; 362 363 void mBufFree(EVT_CTRL *evt) 364 { 365 TGB_free(evt->FADhead); 366 } 367 368 shared_ptr<EVT_CTRL> mBufEvt(const READ_STRUCT *rs) 369 { 370 int nRoi[9]; 371 if (!checkRoiConsistency(rs->rBuf, nRoi)) 372 return shared_ptr<EVT_CTRL>(); 373 374 const int evID = rs->evtID; 375 const uint runID = rs->runID; 376 const int trgTyp = rs->ftmTyp; 377 const int trgNum = rs->ftmID; 378 const int fadNum = rs->evtID; 509 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); 510 return false; 511 } 512 513 return true; 514 } 515 516 list<shared_ptr<EVT_CTRL2>> evtCtrl; 517 518 shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun) 519 { 520 uint16_t nRoi[9]; 521 if (!checkRoiConsistency(rd, nRoi)) 522 return shared_ptr<EVT_CTRL2>(); 379 523 380 524 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++) 381 525 { 382 const shared_ptr<EVT_CTRL> evt = *it; 526 // A reference is enough because the evtCtrl holds the shared_ptr anyway 527 const shared_ptr<EVT_CTRL2> &evt = *it; 383 528 384 529 // If the run is different, go on searching. … … 386 531 // the case of the events, because theoretically, there 387 532 // can be the same run on two different days. 388 if (r unID!= evt->runNum)533 if (rd.H.runnumber != evt->runNum) 389 534 continue; 390 535 391 536 // If the ID of the new event if higher than the last one stored 392 537 // in that run, we have to assign a new slot (leave the loop) 393 if ( evID > evt->evNum)538 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/) 394 539 break; 395 540 396 if ( evID != evt->evNum)541 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/) 397 542 continue; 398 543 … … 401 546 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8]) 402 547 { 403 factPrintf( kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",548 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", 404 549 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]); 405 return shared_ptr<EVT_CTRL >();550 return shared_ptr<EVT_CTRL2>(); 406 551 } 407 552 408 553 // count for inconsistencies 409 if (evt->trgNum != trgNum)554 if (evt->trgNum != rd.H.trigger_id) 410 555 evt->Errors[0]++; 411 if (evt->fadNum != fadNum) 412 evt->Errors[1]++; 413 if (evt->trgTyp != trgTyp) 556 if (evt->trgTyp != rd.H.trigger_type) 414 557 evt->Errors[2]++; 415 558 … … 418 561 } 419 562 420 struct timeval tv; 421 gettimeofday(&tv, NULL); 422 423 auto ir = runCtrl.find(runID); 424 if (ir==runCtrl.end()) 425 { 426 shared_ptr<RUN_CTRL> run(new RUN_CTRL); 427 428 run->runId = runID; 429 run->roi0 = nRoi[0]; // FIXME: Make obsolete! 430 run->roi8 = nRoi[8]; // FIXME: Make obsolete! 431 run->fileId = -2; 432 run->lastEvt = 1; // Number of events partially started to read 433 run->actEvt = 0; // Number of written events (write) 434 run->procEvt = 0; // Number of successfully checked events (checkEvent) 435 run->maxEvt = 999999999; // max number events allowed 436 run->lastTime = tv.tv_sec; // Time when the last event was written 437 run->closeTime = tv.tv_sec + 3600 * 24; //max time allowed 438 439 ir = runCtrl.insert(make_pair(runID, run)).first; 440 } 441 442 const shared_ptr<RUN_CTRL> run = ir->second; 443 444 if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8]) 445 { 446 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 447 run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID); 448 return shared_ptr<EVT_CTRL>(); 449 } 450 451 const shared_ptr<EVT_CTRL> evt(new EVT_CTRL, mBufFree); 452 453 //flag all boards as unused 454 evt->nBoard = 0; 455 for (int b=0; b<NBOARDS; b++) 456 evt->board[b] = -1; 457 458 evt->run = run; 459 evt->pcTime[0] = tv.tv_sec; 460 evt->pcTime[1] = tv.tv_usec; 461 evt->nRoi = nRoi[0]; 462 evt->nRoiTM = nRoi[8]; 463 evt->evNum = evID; 464 evt->runNum = runID; 465 evt->fadNum = fadNum; 466 evt->trgNum = trgNum; 467 evt->trgTyp = trgTyp; 468 evt->Errors[0] = 0; 469 evt->Errors[1] = 0; 470 evt->Errors[2] = 0; 471 evt->Errors[3] = 0; 472 evt->fEvent = NULL; 473 evt->FADhead = NULL; 474 475 // -1: kInValid 476 // 0: kValid 477 // 1-40: kIncomplete 478 // 90: kIncompleteReported 479 // 100: kCompleteEventInBuffer 480 // 1000+x: kToBeProcessedByThreadX 481 // 5000: kToBeWritten 482 // 10000: kToBeDeleted 483 484 evt->evtStat = 0; 485 563 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8])) 564 { 565 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", 566 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter); 567 return shared_ptr<EVT_CTRL2>(); 568 } 569 570 shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2); 571 572 gettimeofday(&evt->time, NULL); 573 574 evt->runNum = rd.H.runnumber; 575 evt->evNum = rd.H.fad_evt_counter; 576 577 evt->trgNum = rd.H.trigger_id; 578 evt->trgTyp = rd.H.trigger_type; 579 580 evt->nRoi = nRoi[0]; 581 evt->nRoiTM = nRoi[8]; 582 583 const bool newrun = actrun->runId != rd.H.runnumber; 584 if (newrun) 585 { 586 // Since we have started a new run, we know already when to close the 587 // previous run in terms of number of events 588 actrun->maxEvt = actrun->lastEvt; 589 590 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d", 591 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId); 592 593 // The new run is the active run now 594 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2); 595 596 const time_t &tsec = evt->time.tv_sec; 597 598 actrun->openTime = tsec; 599 actrun->closeTime = tsec + 3600 * 24; // max time allowed 600 actrun->runId = rd.H.runnumber; 601 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete! 602 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete! 603 } 604 605 // Increase the number of events we have started to receive in this run 606 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received 607 actrun->lastEvt++; 608 609 // Keep pointer to run of this event 610 evt->runCtrl = actrun; 611 612 // Secure access to evtCtrl against access in CloseRunFile 613 // This should be the last... otherwise we can run into threading issues 614 // if the event is accessed before it is fully initialized. 486 615 evtCtrl.push_back(evt); 487 616 617 // Signal the fadctrl that a new run has been started 618 // Note this is the only place at which we can ensure that 619 // gotnewRun is called only once 620 // Note that this will callback CloseRunFile, therefor the event 621 // must already be in the evtCtrl structure 622 if (newrun) 623 gotNewRun(*actrun); 624 625 // An event can be the first and the last, but not the last and the first. 626 // Therefore gotNewRun is called before runFinished. 627 // runFinished signals that the last event of a run was just received. Processing 628 // might still be ongoing, but we can start a new run. 629 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached 630 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached 631 if (!cond1 || !cond2) 632 runFinished(); 633 488 634 return evt; 489 490 } /*-----------------------------------------------------------------*/ 491 492 493 void initEvent(const shared_ptr<EVT_CTRL> &evt) 494 { 495 evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM); 496 memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi); 497 498 //flag all pixels as unused 499 for (int k = 0; k < NPIX; k++) 500 evt->fEvent->StartPix[k] = -1; 501 502 //flag all TMark as unused 503 for (int k = 0; k < NTMARK; k++) 504 evt->fEvent->StartTM[k] = -1; 505 506 evt->fEvent->NumBoards = 0; 507 evt->fEvent->SoftTrig = 0; 508 evt->fEvent->PCTime = evt->pcTime[0]; 509 evt->fEvent->PCUsec = evt->pcTime[1]; 510 evt->fEvent->Roi = evt->nRoi; 511 evt->fEvent->RoiTM = evt->nRoiTM; 512 evt->fEvent->EventNum = evt->evNum; 513 evt->fEvent->TriggerNum = evt->trgNum; 514 evt->fEvent->TriggerType = evt->trgTyp; 515 } 516 517 518 uint64_t reportIncomplete(const shared_ptr<EVT_CTRL> &evt, const char *txt) 519 { 520 factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)", 521 evt->runNum, evt->evNum, txt); 635 } 636 637 638 void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt) 639 { 640 const int i = rBuf.sockId; 641 642 memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER)); 643 644 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts 645 646 // consistency of ROIs have been checked already (is it all correct?) 647 const uint16_t &roi = rBuf.S[src+2]; 648 649 // different sort in FAD board..... 650 for (int px = 0; px < 9; px++) 651 { 652 for (int drs = 0; drs < 4; drs++) 653 { 654 const int16_t pixC = rBuf.S[src+1]; // start-cell 655 const int16_t pixR = rBuf.S[src+2]; // roi 656 //here we should check if pixH is correct .... 657 658 const int pixS = i*36 + drs*9 + px; 659 660 evt->fEvent->StartPix[pixS] = pixC; 661 662 memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2); 663 664 src += 4+pixR; 665 666 // Treatment for ch 9 (TM channel) 667 if (px != 8) 668 continue; 669 670 const int tmS = i*4 + drs; 671 672 //and we have additional TM info 673 if (pixR > roi) 674 { 675 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; 676 677 memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2); 678 } 679 else 680 { 681 evt->fEvent->StartTM[tmS] = -1; 682 } 683 } 684 } 685 } 686 687 // ========================================================================== 688 689 uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt) 690 { 691 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)", 692 evt->runNum, evt->evNum, evtCtrl.size(), txt); 522 693 523 694 uint64_t report = 0; … … 541 712 // accoridng to the current connection status, not w.r.t. to the 542 713 // one when the event was taken. 543 if (gi_NumConnect[ib] <=0) // board not connected714 if (gi_NumConnect[ib]==0) // board not connected 544 715 { 545 716 str[ik++] = 'x'; … … 555 726 str[ik] = 0; 556 727 557 factOut( kWarn, 601, str);728 factOut(MessageImp::kWarn, str); 558 729 559 730 return report; 560 731 } 561 732 562 // i == board 563 void copyData(CNV_FACT *rbuf, int i, const shared_ptr<EVT_CTRL> &evt) 564 { 565 // swapEventHeaderBytes: End of the header. to channels now 566 int eStart = 36; 567 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) 568 { 569 rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id 570 rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell 571 rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi 572 rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling 573 574 eStart += 4+rbuf->S[eStart+2];//skip the pixel data 575 } 576 577 memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER)); 578 579 int src = sizeof(PEVNT_HEADER) / 2; 580 581 // consistency of ROIs have been checked already (is it all correct?) 582 const int roi = rbuf->S[src+2]; 583 584 // different sort in FAD board..... 585 for (int px = 0; px < 9; px++) 586 { 587 for (int drs = 0; drs < 4; drs++) 588 { 589 // pixH = rd[i].rBuf->S[src++]; // ID 590 src++; 591 592 const int pixC = rbuf->S[src++]; // start-cell 593 const int pixR = rbuf->S[src++]; // roi 594 //here we should check if pixH is correct .... 595 596 const int pixS = i * 36 + drs * 9 + px; 597 src++; 598 599 evt->fEvent->StartPix[pixS] = pixC; 600 601 const int dest1 = pixS * roi; 602 memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2); 603 604 src += pixR; 605 606 if (px == 8) 733 // ========================================================================== 734 // ========================================================================== 735 736 Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1)); 737 738 // If this is not convenient anymore, it could be replaced by 739 // a command queue, to which command+data is posted, 740 // (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo) 741 void writeEvt(const shared_ptr<EVT_CTRL2> &evt) 742 { 743 const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; 744 745 bool rc1 = true; 746 747 // Is this a valid event or just an empty event to trigger run close? 748 // If this is not an empty event open the new run-file 749 // Empty events are there to trigger run-closing conditions 750 if (evt->runNum>=0) 751 { 752 // File not yet open 753 if (run->fileStat==kFileNotYetOpen) 754 { 755 // runOpen will close a previous run, if still open 756 if (!runOpen(evt)) 607 757 { 608 const int tmS = i * 4 + drs; 609 610 //and we have additional TM info 611 if (pixR > roi) 758 factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); 759 run->fileStat = kFileClosed; 760 return; 761 } 762 763 factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); 764 run->fileStat = kFileOpen; 765 } 766 767 // Here we have a valid calibration and can go on with that. 768 processingQueue1.post(evt); 769 770 // File already closed 771 if (run->fileStat==kFileClosed) 772 return; 773 774 rc1 = runWrite(evt); 775 if (!rc1) 776 factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); 777 } 778 779 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached 780 const bool cond2 = run->lastTime < run->closeTime; // max time not reached 781 const bool cond3 = rc1; // Write successfull 782 783 // File is not yet to be closed. 784 if (cond1 && cond2 && cond3) 785 return; 786 787 runClose(); 788 run->fileStat = kFileClosed; 789 790 string str; 791 if (!cond1) str += to_string(run->maxEvt)+" evts reached"; 792 if (!cond1 && (!cond2 || !cond3)) str += ", "; 793 if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached"; 794 if ((!cond1 || !cond2) && !cond3) str += ", "; 795 if (!cond3) str += "runWrite failed"; 796 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str()); 797 } 798 799 Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1)); 800 801 void procEvt(const shared_ptr<EVT_CTRL2> &evt) 802 { 803 if (evt->runNum>=0) 804 { 805 evt->fEvent->Errors[0] = evt->Errors[0]; 806 evt->fEvent->Errors[1] = evt->Errors[1]; 807 evt->fEvent->Errors[2] = evt->Errors[2]; 808 evt->fEvent->Errors[3] = evt->Errors[3]; 809 810 for (int ib=0; ib<NBOARDS; ib++) 811 evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time; 812 813 const int rc = eventCheck(evt); 814 if (rc < 0) 815 return; 816 } 817 818 // If file is open post the event for being written 819 secondaryQueue.post(evt); 820 } 821 822 // ========================================================================== 823 // ========================================================================== 824 825 shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile 826 827 /* 828 task 1-4: 829 830 lock1()-lock4(); 831 while (1) 832 { 833 wait for signal [lockN]; // unlocked 834 835 while (n!=10) 836 wait sockets; 837 read; 838 839 lockM(); 840 finished[n] = true; 841 signal(mainloop); 842 unlockM(); 843 } 844 845 846 mainloop: 847 848 while (1) 849 { 850 lockM(); 851 while (!finished[0] || !finished[1] ...) 852 wait for signal [lockM]; // unlocked... signals can be sent 853 finished[0-1] = false; 854 unlockM() 855 856 copy data to queue // locked 857 858 lockN[0-3]; 859 signalN[0-3]; 860 unlockN[0-3]; 861 } 862 863 864 */ 865 866 /* 867 while (g_reset) 868 { 869 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>; 870 871 // Check that all sockets are connected 872 873 for (int i=0; i<40; i++) 874 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0) 875 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); 876 877 while (g_reset) 878 { 879 if (READ_STRUCT::wait()<0) 880 break; 881 882 if (rc_epoll==0) 883 break; 884 885 for (int jj=0; jj<rc_epoll; jj++) 886 { 887 READ_STRUCT *rs = READ_STRUCT::get(jj); 888 if (!rs->connected) 889 continue; 890 891 const bool rc_read = rs->read(); 892 if (!rc_read) 893 continue; 894 895 if (rs->bufTyp==READ_STRUCT::kHeader) 896 { 897 [...] 898 } 899 900 [...] 901 902 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) 903 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); 904 } 905 906 if (once_a_second) 907 { 908 if (evt==timeout) 909 break; 910 } 911 } 912 913 if (evt.nBoards==actBoards) 914 primaryQueue.post(evt); 915 } 916 */ 917 918 void CloseRunFile() 919 { 920 // Create a copy of the shared_ptr to ensure 921 // is not replaced in the middle of the action 922 const shared_ptr<RUN_CTRL2> run = actrun; 923 run->maxEvt = run->lastEvt; 924 } 925 926 bool mainloop(READ_STRUCT *rd) 927 { 928 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop"); 929 930 Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1)); 931 932 primaryQueue.start(); 933 secondaryQueue.start(); 934 935 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2); 936 937 //time in seconds 938 time_t gi_SecTime = time(NULL)-1; 939 940 //loop until global variable g_runStat claims stop 941 g_reset = 0; 942 while (g_reset == 0) 943 { 944 #ifdef USE_SELECT 945 fd_set readfs; 946 FD_ZERO(&readfs); 947 int nfsd = 0; 948 for (int i=0; i<NBOARDS; i++) 949 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) 950 { 951 FD_SET(rd[i].socket, &readfs); 952 if (rd[i].socket>nfsd) 953 nfsd = rd[i].socket; 954 } 955 956 timeval tv; 957 tv.tv_sec = 0; 958 tv.tv_usec = 100; 959 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv); 960 // 0: timeout 961 // -1: error 962 if (rc_select<0) 963 { 964 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno); 965 continue; 966 } 967 #endif 968 969 #ifdef USE_EPOLL 970 const int rc_epoll = READ_STRUCT::wait(); 971 if (rc_epoll<0) 972 break; 973 #endif 974 975 #ifdef USE_EPOLL 976 for (int jj=0; jj<rc_epoll; jj++) 977 #else 978 for (int jj=0; jj<NBOARDS; jj++) 979 #endif 980 { 981 #ifdef USE_EPOLL 982 // FIXME: How to get i? 983 READ_STRUCT *rs = READ_STRUCT::get(jj); 984 #else 985 986 const int i = (jj%4)*10 + (jj/4); 987 READ_STRUCT *rs = &rd[i]; 988 if (!rs->connected) 989 continue; 990 #endif 991 992 #ifdef USE_SELECT 993 if (!FD_ISSET(rs->socket, &readfs)) 994 continue; 995 #endif 996 997 998 #ifdef COMPLETE_EVENTS 999 if (rs->bufTyp==READ_STRUCT::kWait) 1000 continue; 1001 #endif 1002 1003 // ================================================================== 1004 1005 const bool rc_read = rs->read(); 1006 1007 // Connect might have gotten closed during read 1008 gi_NumConnect[rs->sockId] = rs->connected; 1009 gj.numConn[rs->sockId] = rs->connected; 1010 1011 // Read either failed or disconnected, or the buffer is not yet full 1012 if (!rc_read) 1013 continue; 1014 1015 // ================================================================== 1016 1017 if (rs->bufTyp==READ_STRUCT::kHeader) 1018 { 1019 //check if startflag correct; else shift block .... 1020 // FIXME: This is not enough... this combination of 1021 // bytes can be anywhere... at least the end bytes 1022 // must be checked somewhere, too. 1023 uint k; 1024 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++) 612 1025 { 613 const int dest2 = tmS * roi + NPIX * roi; 614 615 const int srcT = src - roi; 616 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024; 617 618 memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2); 1026 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01) 1027 //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01) 1028 break; 619 1029 } 1030 rs->skip += k; 1031 1032 //no start of header found 1033 if (k==sizeof(PEVNT_HEADER)-1) 1034 { 1035 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1]; 1036 rs->bufPos = rs->B+1; 1037 rs->bufLen = sizeof(PEVNT_HEADER)-1; 1038 continue; 1039 } 1040 1041 if (k > 0) 1042 { 1043 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k); 1044 1045 rs->bufPos -= k; 1046 rs->bufLen += k; 1047 1048 continue; // We need to read more (bufLen>0) 1049 } 1050 1051 if (rs->skip>0) 1052 { 1053 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId); 1054 rs->skip = 0; 1055 } 1056 1057 // Swap the header entries from network to host order 1058 rs->swapHeader(); 1059 1060 rs->bufTyp = READ_STRUCT::kData; 1061 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER); 1062 1063 debugHead(rs->B); // i and fadBoard not used 1064 1065 continue; 1066 } 1067 1068 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2); 1069 if (end != 0xfe04) 1070 { 1071 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x", 1072 rs->sockId, rs->H.fad_evt_counter, rs->len(), end); 1073 1074 // ready to read next header 1075 rs->bufTyp = READ_STRUCT::kHeader; 1076 rs->bufLen = sizeof(PEVNT_HEADER); 1077 rs->bufPos = rs->B; 1078 // FIXME: What to do with the validity flag? 1079 continue; 1080 } 1081 1082 // get index into mBuffer for this event (create if needed) 1083 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun); 1084 1085 // We have a valid entry, but no memory has yet been allocated 1086 if (evt && !evt->FADhead) 1087 { 1088 // Try to get memory from the big buffer 1089 PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc(); 1090 if (!mem) 1091 { 1092 // If this works properly, this is a hack which can be removed, or 1093 // replaced by a signal or dim message 1094 if (!rs->repmem) 1095 { 1096 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); 1097 rs->repmem = true; 1098 } 1099 continue; 1100 } 1101 1102 evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free)); 1103 } 1104 1105 // ready to read next header 1106 rs->bufTyp = READ_STRUCT::kHeader; 1107 rs->bufLen = sizeof(PEVNT_HEADER); 1108 rs->bufPos = rs->B; 1109 1110 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. 1111 if (!evt) 1112 continue; 1113 1114 /* 1115 const int fad = (i/10)<<8)|(i%10); 1116 if (fad != rs->H.board_id) 1117 { 1118 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id); 1119 }*/ 1120 1121 // This should never happen 1122 if (evt->board[rs->sockId] != -1) 1123 { 1124 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.", 1125 evt->evNum, rs->sockId, rs->sockId, rs->len()); 1126 // FIXME: What to do with the validity flag? 1127 continue; // Continue reading next header 1128 } 1129 1130 // Swap the data entries (board headers) from network to host order 1131 rs->swapData(); 1132 1133 // Copy data from rd[i] to mBuffer[evID] 1134 copyData(*rs, evt.get()); 1135 1136 #ifdef COMPLETE_EVENTS 1137 // Do not read anmymore from this board until the whole event has been received 1138 rs->bufTyp = READ_STRUCT::kWait; 1139 #endif 1140 // now we have stored a new board contents into Event structure 1141 evt->fEvent->NumBoards++; 1142 evt->board[rs->sockId] = rs->sockId; 1143 evt->nBoard++; 1144 1145 // event not yet complete 1146 if (evt->nBoard < READ_STRUCT::activeSockets) 1147 continue; 1148 1149 // All previous events are now flagged as incomplete ("expired") 1150 // and will be removed. (This is a bit tricky, because pop_front() 1151 // would invalidate the current iterator if not done _after_ the increment) 1152 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) 1153 { 1154 const bool found = it->get()==evt.get(); 1155 if (!found) 1156 reportIncomplete(*it, "expired"); 620 1157 else 1158 primaryQueue.post(evt); 1159 1160 it++; 1161 evtCtrl.pop_front(); 1162 1163 // We reached the current event, so we are done 1164 if (found) 1165 break; 1166 } 1167 1168 #ifdef COMPLETE_EVENTS 1169 for (int j=0; j<40; j++) 1170 { 1171 //if (rs->bufTyp==READ_STRUCT::kWait) 621 1172 { 622 evt->fEvent->StartTM[tmS] = -1; 1173 rs->bufTyp = READ_STRUCT::kHeader; 1174 rs->bufLen = sizeof(PEVNT_HEADER); 1175 rs->bufPos = rs->B; 623 1176 } 624 1177 } 625 }626 }627 }628 629 void doProcess(const shared_ptr<EVT_CTRL> &evt);630 void doWrite(const shared_ptr<EVT_CTRL> &evt);631 632 void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where);633 634 Queue<shared_ptr<EVT_CTRL>> process(bind(doProcess, placeholders::_1));635 Queue<shared_ptr<EVT_CTRL>> write_queue(bind(doWrite, placeholders::_1));636 637 void preProcess(const shared_ptr<EVT_CTRL> &evt)638 {639 //-------- it is better to open the run already here, so call can be used to initialize640 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)641 const shared_ptr<RUN_CTRL> run = evt->run;642 if (run->runId==0)643 return;644 645 // File not yet open646 if (run->fileId < 0)647 {648 RUN_HEAD actRun;649 actRun.Version = 1;650 actRun.RunType = -1; //to be adapted651 actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0;652 actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8;653 actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;654 actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;655 actRun.NBoard = NBOARDS;656 actRun.NPix = NPIX;657 actRun.NTm = NTMARK;658 659 memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));660 661 run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun));662 if (run->fileHd == NULL)663 {664 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);665 run->fileId = 91;666 667 // No further processing of this event668 return;669 }670 671 run->fileId = 0;672 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);673 }674 675 //and set correct event header ; also check for consistency in event (not yet)676 evt->fEvent->Errors[0] = evt->Errors[0];677 evt->fEvent->Errors[1] = evt->Errors[1];678 evt->fEvent->Errors[2] = evt->Errors[2];679 evt->fEvent->Errors[3] = evt->Errors[3];680 681 for (int ib=0; ib<NBOARDS; ib++)682 {683 // board is not read684 if (evt->board[ib] == -1)685 {686 evt->FADhead[ib].start_package_flag = 0;687 evt->fEvent->BoardTime[ib] = 0;688 }689 else690 {691 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;692 }693 }694 695 const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);696 697 // no further processing of event ('delete')698 if (rc < 0)699 return;700 701 //evt->evtStat = 1000; // flag 'start processing'702 run->procEvt++;703 process.post(evt);704 }705 706 void doProcess(const shared_ptr<EVT_CTRL> &evt)707 {708 const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0);709 710 if (jret>0 && jret<=1)711 factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret);712 713 // flag as 'to be written'714 if (jret<=1)715 return;716 717 //evt->evtStat = 5000;718 write_queue.post(evt);719 }720 721 void doWrite(const shared_ptr<EVT_CTRL> &evt)722 {723 const shared_ptr<RUN_CTRL> run = evt->run;724 if (run->runId==0)725 return;726 727 // File is not open728 if (run->fileId!=0)729 return;730 731 const int rc = runWrite(run->fileHd, evt->fEvent, 0);732 if (rc >= 0)733 {734 // Sucessfully wrote event735 run->lastTime = g_actTime;736 run->actEvt++;737 }738 else739 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);740 741 checkAndCloseRun(run, rc<0, 1);742 743 /*744 // Although the are no pending events, we have to check if a run should be closed (timeout)745 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)746 {747 if (ir->second->fileId == 0)748 {749 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it750 const int cond = ir->second->runId == 0;751 checkAndCloseRun(ir->second, cond, 2);752 }753 }754 */755 }756 757 void *readFAD (void *ptr)758 {759 /* *** main loop reading FAD data and sorting them to complete events */760 761 Queue<shared_ptr<EVT_CTRL>> queue(bind(preProcess, placeholders::_1));762 763 factPrintf(kInfo, -1, "Start initializing (readFAD)");764 765 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer766 767 uint32_t actrun = 0;768 769 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug770 771 772 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;773 gi_resetS = gi_resetR = 9;774 775 int sockDef[NBOARDS]; //internal state of sockets776 memset(sockDef, 0, NBOARDS*sizeof(int));777 778 START:779 //time in seconds780 uint gi_SecTime = time(NULL);;781 g_actTime = gi_SecTime;782 783 const int cntsock = 8 - NUMSOCK ;784 785 if (gi_resetS > 0) {786 //make sure all sockets are preallocated as 'not exist'787 for (int i = 0; i < NBOARDS; i++) {788 rd[i].socket = -1;789 rd[i].sockStat = 99;790 }791 792 for (int k = 0; k < NBOARDS; k++) {793 gi_NumConnect[k] = 0;794 //gi.numConn[k] = 0;795 gj.numConn[k] = 0;796 //gj.errConn[k] = 0;797 gj.rateBytes[k] = 0;798 gj.totBytes[k] = 0;799 }800 801 }802 803 if (gi_resetR > 0)804 {805 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;806 gj.usdMem = gj.maxMem = gj.xxxMem = 0;807 gj.totMem = tgb_memory;808 gj.bufNew = gj.bufEvt = 0;809 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;810 811 factPrintf(kInfo, -1, "End initializing (readFAD)");812 }813 814 815 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;816 817 //loop until global variable g_runStat claims stop818 while (g_runStat >= 0 && g_reset == 0)819 {820 gj.readStat = g_runStat;821 822 for (int b = 0; b < NBOARDS; b++)823 {824 // Nothing has changed825 if (g_port[b].sockDef == sockDef[b])826 continue;827 828 gi_NumConnect[b] = 0; //must close all connections829 gj.numConn[b] = 0;830 831 // s0 = 0: sockets to be defined and opened832 // s0 = -1: sockets to be destroyed833 // s0 = +1: sockets to be closed and reopened834 835 int s0 = 0;836 if (sockDef[b] != 0)837 s0 = g_port[b].sockDef==0 ? -1 : +1;838 839 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;840 841 GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket842 843 sockDef[b] = g_port[b].sockDef;844 }845 846 // count the number of active boards847 int actBoards = 0;848 for (int b = 0; b < NBOARDS; b++)849 if (sockDef[b] > 0)850 actBoards++;851 852 //check all sockets if something to read853 for (int i = 0; i < NBOARDS; i++)854 {855 // Do not try to connect this socket856 if (rd[i].sockStat > 0)857 continue;858 859 if (rd[i].sockStat == -1)860 {861 //try to connect if not yet done862 rd[i].sockStat = connect (rd[i].socket,863 (struct sockaddr *) &rd[i].SockAddr,864 sizeof (rd[i].SockAddr));865 // Failed866 if (rd[i].sockStat == -1)867 {868 rd[i].errCnt++;869 usleep(25000);870 continue;871 }872 873 // Success (rd[i].sockStat == 0)874 875 if (sockDef[i] > 0)876 {877 rd[i].bufTyp = 0; // expect a header878 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining879 }880 else881 {882 rd[i].bufTyp = -1; // full data to be skipped883 rd[i].bufLen = MAX_LEN; // huge for skipping884 }885 886 rd[i].bufPos = 0; // no byte read so far887 rd[i].skip = 0; // start empty888 889 gi_NumConnect[i] += cntsock;890 gj.numConn[i]++;891 892 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);893 }894 895 // Do not read from this socket896 if (rd[i].bufLen<0)897 continue;898 899 if (rd[i].bufLen>0)900 {901 const int32_t jrd =902 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],903 rd[i].bufLen, MSG_DONTWAIT);904 905 // recv failed906 if (jrd<0)907 {908 // There was just nothing waiting909 if (errno==EWOULDBLOCK || errno==EAGAIN)910 continue;911 912 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);913 continue;914 }915 916 // connection was closed ...917 if (jrd==0)918 {919 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);920 921 const int s0 = sockDef[i] > 0 ? +1 : -1;922 GenSock(s0, i, 0, NULL, &rd[i]);923 924 gi_NumConnect[i]-= cntsock ;925 gj.numConn[i]--;926 927 continue;928 }929 930 gj.rateBytes[i] += jrd;931 932 // are we skipping this board ...933 if (rd[i].bufTyp < 0)934 continue;935 936 rd[i].bufPos += jrd; //==> prepare for continuation937 rd[i].bufLen -= jrd;938 939 #ifdef EVTDEBUG940 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event941 1178 #endif 942 } 943 944 //we are reading event header 945 if (rd[i].bufTyp <= 0) 946 { 947 //not yet sufficient data to take action 948 if (rd[i].bufPos < minLen) 949 continue; 950 951 //check if startflag correct; else shift block .... 952 // FIXME: This is not enough... this combination of 953 // bytes can be anywhere... at least the end bytes 954 // must be checked somewhere, too. 955 int k; 956 for (k = 0; k < rd[i].bufPos - 1; k++) 957 { 958 //start.S = 0xFB01; 959 if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01) 960 break; 961 } 962 rd[i].skip += k; 963 964 //no start of header found 965 if (k >= rd[i].bufPos - 1) 966 { 967 rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos]; 968 rd[i].bufPos = 1; 969 rd[i].bufLen = sizeof(PEVNT_HEADER)-1; 970 continue; 971 } 972 973 if (k > 0) 974 { 975 rd[i].bufPos -= k; 976 rd[i].bufLen += k; 977 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], 978 rd[i].bufPos); 979 } 980 981 if (rd[i].bufPos < minLen) 982 continue; 983 984 if (rd[i].skip > 0) 985 { 986 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i); 987 rd[i].skip = 0; 988 } 989 990 // TGB: This needs much more checks than just the first two bytes! 991 992 // Swap everything except start_package_flag. 993 // It is to difficult to find out where it is used how, 994 // but it doesn't really matter because it is not really 995 // used anywehere else 996 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length 997 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no 998 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK 999 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc 1000 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type 1001 1002 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id 1003 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift 1004 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate 1005 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler 1006 1007 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id 1008 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter 1009 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency 1010 1011 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber; 1012 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time; 1013 1014 for (int s=24;s<24+NTemp+NDAC;s++) 1015 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac 1016 1017 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2; 1018 rd[i].fadVers = rd[i].rBuf->S[2]; 1019 rd[i].ftmTyp = rd[i].rBuf->S[5]; 1020 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt) 1021 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt) 1022 rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11]; 1023 rd[i].bufTyp = 1; //ready to read full record 1024 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos; 1025 1026 const int fadBoard = rd[i].rBuf->S[12]; 1027 debugHead(i, fadBoard, rd[i].rBuf); 1028 1029 continue; 1030 } 1031 1032 // are we reading data 1033 1034 // not yet all read 1035 if (rd[i].bufLen > 0) 1036 continue; 1037 1038 // stop.S = 0x04FE; 1039 if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe || 1040 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04) 1041 { 1042 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d", 1043 i, rd[i].evtID, rd[i].fadLen, 1044 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]); 1045 1046 // ready to read next header 1047 rd[i].bufTyp = 0; 1048 rd[i].bufLen = sizeof(PEVNT_HEADER); 1049 rd[i].bufPos = 0; 1050 1051 continue; 1052 } 1053 1054 // int actid; 1055 // if (g_useFTM > 0) 1056 // actid = rd[i].evtID; 1057 // else 1058 // actid = rd[i].ftmID; 1059 1060 //get index into mBuffer for this event (create if needed) 1061 const shared_ptr<EVT_CTRL> evt = mBufEvt(&rd[i]); 1062 1063 // We have a valid entry, but no memory has yet been allocated 1064 if (evt && evt->FADhead == NULL) 1065 { 1066 // Try to get memory from the big buffer 1067 evt->FADhead = (PEVNT_HEADER*)TGB_Malloc(); 1068 if (evt->FADhead == NULL) 1069 { 1070 // If this works properly, this is a hack which can be removed, or 1071 // replaced by a signal or dim message 1072 if (rd[i].bufTyp==2) 1073 factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum); 1074 rd[i].bufTyp = 2; 1075 continue; 1076 } 1077 1078 // Initialise contents of mBuffer[evID]->fEvent 1079 initEvent(evt); 1080 1081 // Some statistics 1082 gj.usdMem = tgb_inuse; 1083 1084 if (gj.usdMem > gj.maxMem) 1085 gj.maxMem = gj.usdMem; 1086 1087 gj.rateNew++; 1088 gj.bufTot++; 1089 if (gj.bufTot > gj.maxEvt) 1090 gj.maxEvt = gj.bufTot; 1091 } 1092 1093 rd[i].bufTyp = 0; 1094 rd[i].bufLen = sizeof(PEVNT_HEADER); 1095 rd[i].bufPos = 0; 1096 1097 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. 1098 if (!evt) 1099 continue; 1100 1101 //we have a valid entry in mBuffer[]; fill it 1102 const int fadBoard = rd[i].rBuf->S[12]; 1103 const int fadCrate = fadBoard>>8; 1104 1105 if (i != (fadCrate * 10 + (fadBoard&0xff))) 1106 { 1107 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)", 1108 i, fadBoard, fadCrate, fadBoard&0xff); 1109 } 1110 1111 if (evt->board[i] != -1) 1112 { 1113 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d", 1114 evt->evNum, i, i, rd[i].fadLen, 1115 rd[i].rBuf->B[0], rd[i].rBuf->B[1], 1116 rd[i].rBuf->B[rd[i].fadLen - 2], 1117 rd[i].rBuf->B[rd[i].fadLen - 1]); 1118 continue; // Continue reading next header 1119 } 1120 1121 // Copy data from rd[i] to mBuffer[evID] 1122 copyData(rd[i].rBuf, i, evt); 1123 1124 // now we have stored a new board contents into Event structure 1125 1126 evt->fEvent->NumBoards++; 1127 evt->board[i] = i; 1128 evt->nBoard++; 1129 evt->evtStat = evt->nBoard; 1130 1131 // have we already reported first (partial) event of this run ??? 1132 if (evt->nBoard==1 && evt->runNum != actrun) 1133 { 1134 // Signal the fadctrl that a new run has been started 1135 gotNewRun(evt->runNum, NULL); 1136 1137 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d", 1138 actrun, evt->runNum, evt->evNum); 1139 1140 // We got the first part of this event, so this is 1141 // the number of events we expect for this run 1142 evt->run->lastEvt++; 1143 1144 // Since we have started a new run, we know already when to close the 1145 // previous run in terms of number of events 1146 const auto ir = runCtrl.find(actrun); 1147 if (ir!=runCtrl.end()) 1148 ir->second->maxEvt = ir->second->lastEvt; 1149 1150 // Change 'actrun' the the new runnumber 1151 actrun = evt->runNum; 1152 } 1153 1154 // event not yet complete 1155 if (evt->nBoard < actBoards) 1156 continue; 1157 1158 // GARBAGE COLLECTION 1159 // This is a non-ideal hack to lower the probability that 1160 // in mBufEvt the search for correct entry in runCtrl 1161 // will not return a super-old entry. I don't want 1162 // to manipulate that in another thread. 1163 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) 1164 { 1165 if (ir->runId==evt->runNum) 1166 break; 1167 1168 if (ir->second->fileId>0) 1169 runCtrl.erase(ir); 1170 } 1171 1172 // we have just completed an event... so all previous events 1173 // must have been completed already. If they are not, there 1174 // is no need to wait for the timeout, because they will never 1175 // get completed. We can just ensure that if we check for the previous 1176 // event to be complete every time we receive a new complete event. 1177 // If we find an incomplete one, we remove all consecutive 1178 // incomplete ones. 1179 for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++) 1180 { 1181 const shared_ptr<EVT_CTRL> e = *it; 1182 1183 if (e.get()==evt.get()) 1184 { 1185 queue.post(e); 1186 evtCtrl.erase(it); 1187 break; 1188 } 1189 1190 reportIncomplete(e, "expired"); 1191 evtCtrl.pop_front(); 1192 } 1193 1194 } // end for loop over all sockets 1195 1196 g_actTime = time (NULL); 1197 if (g_actTime <= gi_SecTime) 1198 { 1199 usleep(1); 1200 continue; 1201 } 1202 gi_SecTime = g_actTime; 1203 1204 gj.bufNew = 0; 1205 1206 //loop over all active events and flag those older than read-timeout 1207 //delete those that are written to disk .... 1208 1209 const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT; 1210 1211 // This could be improved having the pointer which separates the queue with 1212 // the incomplete events from the queue with the complete events 1213 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) 1214 { 1215 const shared_ptr<EVT_CTRL> evt = *it; 1216 1217 // Check the more likely case first: incomplete events 1218 if (evt->evtStat>=0 && evt->evtStat<100) 1219 { 1220 gj.bufNew++; //incomplete event in Buffer 1221 1222 // Event has not yet timed out or was reported already 1223 if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30) 1224 continue; 1225 1226 // This will result in the emission of a dim service. 1227 // It doesn't matter if that takes comparably long, 1228 // because we have to stop the run anyway. 1229 const uint64_t rep = reportIncomplete(evt, "timeout"); 1230 factReportIncomplete(rep); 1231 1232 //timeout for incomplete events 1233 evt->evtStat = 90; 1234 gj.evtSkip++; 1235 1236 continue; 1237 } 1238 1239 // Check the less likely case: 'useless' or 'delete' 1240 // evtState==0 can happen if the event was initialized (some data received) 1241 // but the data did not make sense (e.g. inconsistent rois) 1242 if (evt->evtStat==0 || evt->evtStat == 10000) 1243 { 1244 evtCtrl.erase(it); //event written--> free memory 1245 1246 gj.evtWrite++; 1247 gj.rateWrite++; 1248 } 1249 1250 // Remove leading invalidated slots from queue 1251 // Do they exist at all? 1252 if (evt->evtStat==-1) 1253 evtCtrl.erase(it); 1254 } 1255 1256 1257 1258 // The number of complete events in the buffer is the total number of 1259 // events in the buffer minus the number of incomplete events. 1260 gj.bufEvt = count - gj.bufNew; 1261 1262 gj.deltaT = 1000; //temporary, must be improved 1263 1264 for (int ib = 0; ib < NBOARDS; ib++) 1265 gj.totBytes[ib] += gj.rateBytes[ib]; 1266 1267 gj.totMem = tgb_memory; 1268 1269 if (gj.maxMem > gj.xxxMem) 1270 gj.xxxMem = gj.maxMem; 1271 if (gj.maxEvt > gj.xxxEvt) 1272 gj.xxxEvt = gj.maxEvt; 1273 1274 factStat (gj); 1275 //factStatNew (gi); 1276 gj.rateNew = gj.rateWrite = 0; 1277 gj.maxMem = gj.usdMem; 1278 gj.maxEvt = gj.bufTot; 1279 for (int b = 0; b < NBOARDS; b++) 1280 gj.rateBytes[b] = 0; 1281 1282 } // while (g_runStat >= 0 && g_reset == 0) 1283 1284 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset); 1285 1286 if (g_reset > 0) 1287 { 1288 gi_reset = g_reset; 1289 gi_resetR = gi_reset % 10; //shall we stop reading ? 1290 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ? 1291 gi_resetW = (gi_reset / 100) % 10; //shall we close files ? 1292 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ? 1293 g_reset = 0; 1294 } 1295 else 1296 { 1297 gi_reset = 0; 1298 gi_resetR = g_runStat == -1 ? 1 : 7; 1299 1300 gi_resetS = 7; //close all sockets 1301 gi_resetW = 7; //close all files 1302 gi_resetX = 0; 1303 1304 //inform others we have to quit .... 1305 gj.readStat = -11; //inform all that no update to happen any more 1306 } 1307 1308 if (gi_resetS > 0) 1309 { 1310 //must close all open sockets ... 1311 factPrintf(kInfo, -1, "Close all sockets..."); 1312 1313 for (int i = 0; i < NBOARDS; i++) 1314 { 1315 if (rd[i].sockStat != 0) 1316 continue; 1317 1318 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket 1319 1320 gi_NumConnect[i]-= cntsock ; 1321 gj.numConn[i]--; 1322 sockDef[i] = 0; //flag ro recreate the sockets ... 1323 rd[i].sockStat = -1; //and try to open asap 1324 } 1325 } 1326 1327 1328 if (gi_resetR > 0) 1329 { 1330 //and clear all buffers (might have to wait until all others are done) 1331 while (evtCtrl.size()) 1332 { 1333 const shared_ptr<EVT_CTRL> evt = evtCtrl.front(); 1334 1335 // flag incomplete events as 'read finished' 1336 // We cannot just detele all events, because some might currently being processed, 1337 // so we have to wait until the processing thread currently processing the event 1338 // signals that the event can be deleted. (Note, that there are currently never 1339 // two threads processing the same event at the same time) 1340 if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000) 1341 evtCtrl.pop_front(); 1342 1343 usleep(1); 1344 } 1345 } 1346 1347 //queue.wait(); 1348 //queue.join(); 1349 1350 if (gi_reset > 0) 1351 { 1352 if (gi_resetW > 0) 1353 CloseRunFile (0, 0, 0); //ask all Runs to be closed 1354 1355 if (gi_resetX > 0) 1356 { 1357 struct timespec xwait; 1358 xwait.tv_sec = gi_resetX; 1359 xwait.tv_nsec = 0; 1360 nanosleep (&xwait, NULL); 1361 } 1362 1363 factPrintf(kInfo, -1, "Continue read Process ..."); 1364 gi_reset = 0; 1365 goto START; 1366 } 1367 1368 factPrintf(kInfo, -1, "Exit read Process..."); 1369 1370 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse); 1371 1372 gj.readStat = -99; 1373 1374 factStat (gj); 1375 //factStatNew (gi); 1376 1377 return 0; 1378 1379 } /*-----------------------------------------------------------------*/ 1380 /* 1381 1382 void *subProc(void *thrid) 1383 { 1384 const int64_t threadID = (int64_t)thrid; 1385 1386 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID); 1387 1388 while (g_runStat > -2) //in case of 'exit' we still must process pending events 1389 { 1390 int numWait = 0; 1391 1392 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) 1393 { 1394 const shared_ptr<EVT_CTRL> evt = *it; 1395 1396 // This is a threading issue... the evtStat might have been invalid 1397 // but the frstPtr is not yet updated 1398 if (evt->evtStat==-1) 1399 continue; 1400 1401 // If we find the first event still waiting for processing 1402 // there will be only unprocessed events after this one in the queue 1403 if (evt->evtStat<1000+threadID) 1404 { 1405 numWait = 1; 1406 break; 1407 } 1408 1409 // If the event was processed already, skip it 1410 // We could replace that to a moving pointer pointing to the first 1411 // non-processed event 1412 if (evt->evtStat!=1000+threadID) 1413 continue; 1414 1415 const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0); 1416 1417 if (jret>0 && jret<=threadID) 1418 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret); 1419 1420 if (jret<=threadID) 1421 { 1422 evt->evtStat = 10000; // flag as 'to be deleted' 1423 continue; 1424 } 1425 1426 if (jret>=gi_maxProc) 1427 { 1428 evt->evtStat = 5000; // flag as 'to be written' 1429 continue; 1430 } 1431 1432 evt->evtStat = 1000 + jret; // flag for next proces 1433 } 1434 1435 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1436 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID); 1437 return 0; 1438 } 1439 1440 usleep(1); 1441 } 1442 1443 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID); 1444 1445 return 0; 1446 } 1447 1448 1449 void * 1450 procEvt (void *ptr) 1451 { 1452 int status; 1453 1454 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc); 1455 1456 pthread_t thread[100]; 1457 // int th_ret[100]; 1458 1459 for (long long k = 0; k < gi_maxProc; k++) { 1460 pthread_create (&thread[k], NULL, subProc, (void *) k); 1461 } 1462 1463 // in case of 'exit' we still must process pending events 1464 while (g_runStat > -2) 1465 { 1466 int numWait = 0; 1467 1468 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) 1469 { 1470 const shared_ptr<EVT_CTRL> evt = *it; 1471 1472 // This is a threading issue... the evtStat might have been invalid 1473 // but the frstPtr is not yet updated 1474 if (evt->evtStat==-1) 1475 continue; 1476 1477 // If we find the first incomplete event which is not supposed to 1478 // be processed, there are only more incomplete events in the queue 1479 if (evt->evtStat<90) 1480 { 1481 numWait = 1; 1482 break; 1483 } 1484 1485 // If the event was processed already, skip it. 1486 // We could replace that to a moving pointer pointing to the first 1487 // non-processed event 1488 if (evt->evtStat>=1000) 1489 continue; 1490 1491 //-------- it is better to open the run already here, so call can be used to initialize 1492 //-------- buffers etc. needed to interprete run (e.g. DRS calibration) 1493 const uint32_t irun = evt->runNum; 1494 const int32_t ievt = evt->evNum; 1495 1496 const shared_ptr<RUN_CTRL> run = evt->run; 1497 if (run->runId==0) 1498 continue; 1499 1500 // File not yet open 1501 if (run->fileId < 0) 1502 { 1503 RUN_HEAD actRun; 1504 actRun.Version = 1; 1505 actRun.RunType = -1; //to be adapted 1506 actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0; 1507 actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8; 1508 actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime; 1509 actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec; 1510 actRun.NBoard = NBOARDS; 1511 actRun.NPix = NPIX; 1512 actRun.NTm = NTMARK; 1513 1514 memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER)); 1515 1516 run->fileHd = runOpen(irun, &actRun, sizeof (actRun)); 1517 if (run->fileHd == NULL) 1518 { 1519 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt); 1520 run->fileId = 91; 1521 continue; 1522 } 1523 1524 run->fileId = 0; 1525 1526 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt); 1527 } 1528 1529 //-------- 1530 //-------- 1531 1532 //and set correct event header ; also check for consistency in event (not yet) 1533 evt->fEvent->Errors[0] = evt->Errors[0]; 1534 evt->fEvent->Errors[1] = evt->Errors[1]; 1535 evt->fEvent->Errors[2] = evt->Errors[2]; 1536 evt->fEvent->Errors[3] = evt->Errors[3]; 1537 1538 for (int ib=0; ib<NBOARDS; ib++) 1539 { 1540 // board is not read 1541 if (evt->board[ib] == -1) 1542 { 1543 evt->FADhead[ib].start_package_flag = 0; 1544 evt->fEvent->BoardTime[ib] = 0; 1545 } 1546 else 1547 { 1548 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time; 1549 } 1550 } 1551 1552 const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent); 1553 if (rc < 0) 1554 { 1555 evt->evtStat = 10000; // flag event to be deleted 1556 } 1557 else 1558 { 1559 evt->evtStat = 1000; // flag 'start processing' 1560 run->procEvt++; 1561 } 1562 } 1563 1564 if (gj.readStat < -10 && numWait == 0) { //nothing left to do 1565 factPrintf(kInfo, -1, "Exit Processing Process ..."); 1566 gj.procStat = -22; //==> we should exit 1567 return 0; 1568 } 1569 1570 usleep(1); 1571 1572 gj.procStat = gj.readStat; 1573 } 1574 1575 //we are asked to abort asap ==> must flag all remaining events 1576 // when gi_runStat claims that all events are in the buffer... 1577 1578 factPrintf(kInfo, -1, "Abort Processing Process ..."); 1579 1580 for (int k = 0; k < gi_maxProc; k++) { 1581 pthread_join (thread[k], (void **) &status); 1582 } 1583 1584 gj.procStat = -99; 1585 1586 return 0; 1587 1588 } */ 1589 1590 int 1591 CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt) 1592 { 1593 /* close run runId (all all runs if runId=0) */ 1594 /* return: 0=close scheduled / >0 already closed / <0 does not exist */ 1595 1596 if (runId == 0) 1597 { 1598 for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++) 1599 { 1600 const shared_ptr<RUN_CTRL> run = it->second; 1601 1602 //run is open 1603 if (run->fileId == 0) 1604 { 1605 run->closeTime = closeTime; 1606 run->maxEvt = maxEvt; 1607 } 1608 } 1609 return 0; 1610 } 1611 1612 auto it=runCtrl.find(runId); 1613 if (it==runCtrl.end()) 1614 return -1; 1615 1616 const shared_ptr<RUN_CTRL> run = it->second; 1617 1618 // run already closed 1619 if (run->fileId>0) 1620 return +2; 1621 1622 run->closeTime = closeTime; 1623 run->maxEvt = maxEvt; 1624 1625 return run->fileId==0 ? 0 : 1; 1626 1627 } 1628 1629 void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where) 1630 { 1631 if (!cond && 1632 run->closeTime >= g_actTime && 1633 run->lastTime >= g_actTime - 300 && 1634 run->maxEvt > run->actEvt) 1635 return; 1636 1637 //close run for whatever reason 1638 int ii = 0; 1639 if (cond) 1640 ii = 1; 1641 if (run->closeTime < g_actTime) 1642 ii |= 2; // = 2; 1643 if (run->lastTime < g_actTime - 300) 1644 ii |= 4; // = 3; 1645 if (run->maxEvt <= run->actEvt) 1646 ii |= 8; // = 4; 1647 1648 run->closeTime = g_actTime - 1; 1649 1650 const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j])); 1651 if (rc<0) 1652 { 1653 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)", 1654 where, run->runId, rc); 1655 run->fileId = 92+where*2; 1656 } 1657 else 1658 { 1659 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)", 1660 where, run->runId, ii); 1661 run->fileId = 93+where*2; 1662 } 1663 } 1664 1665 /*-----------------------------------------------------------------*/ 1666 1667 /* 1668 void *writeEvt (void *ptr) 1669 { 1670 factPrintf(kInfo, -1, "Starting write-thread"); 1671 1672 while (g_runStat > -2) 1673 { 1674 //int numWrite = 0; 1675 int numWait = 0; 1676 1677 // Note that the current loop does not at all gurantee that 1678 // the events are written in the correct order. 1679 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++) 1680 { 1681 const shared_ptr<EVT_CTRL> evt = *it; 1682 1683 // This is a threading issue... the evtStat might have been invalid 1684 // but the frstPtr is not yet updated 1685 if (evt->evtStat==-1) 1686 continue; 1687 1688 // If we find the first non-written event which is not supposed to 1689 // be written, there are only more incomplete events in the queue 1690 if (evt->evtStat<5000) 1691 { 1692 numWait = 1; 1693 break; 1694 } 1695 1696 // If the event was written already already, skip it 1697 // We could replace that to a moving pointer pointing to the first 1698 // non-processed event 1699 if (evt->evtStat!=5000) 1700 continue; 1701 1702 const shared_ptr<RUN_CTRL> run = evt->run; 1703 if (run->runId==0) 1704 continue; 1705 1706 // File is open 1707 if (run->fileId==0) 1708 { 1709 const int rc = runWrite(run->fileHd, evt->fEvent, 0); 1710 if (rc >= 0) 1711 { 1712 // Sucessfully wrote event 1713 run->lastTime = g_actTime; 1714 run->actEvt++; 1715 } 1716 else 1717 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum); 1718 1719 checkAndCloseRun(run, rc<0, 1); 1720 } 1721 1722 evt->evtStat = 10000; // event written (or has to be discarded) -> delete 1723 } 1724 1725 // Although the are no pending events, we have to check if a run should be closed (timeout) 1726 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) 1727 { 1728 if (ir->second->fileId == 0) 1729 { 1730 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it 1731 const int cond = ir->second->runId == 0; 1732 checkAndCloseRun(ir->second, cond, 2); 1733 } 1734 } 1735 1736 usleep(1); 1737 1738 //nothing left to do 1739 if (gj.readStat < -10 && numWait == 0) 1740 { 1741 factPrintf(kInfo, -1, "Finish Write Process ..."); 1742 gj.writStat = -22; //==> we should exit 1743 break; 1744 } 1745 1746 gj.writStat = gj.readStat; 1747 } 1748 1749 factPrintf(kInfo, -1, "Close all open files ..."); 1750 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++) 1751 { 1752 if (ir->second->fileId == 0) 1753 checkAndCloseRun(ir->second, 1, 3); 1754 } 1755 1756 gj.writStat = -99; 1757 1758 factPrintf(kInfo, -1, "Exit Writing Process ..."); 1759 1760 return 0; 1761 } 1762 */ 1763 1764 1765 1766 1767 void 1768 StartEvtBuild () 1769 { 1770 1771 int i, /*j,*/ imax, status/*, th_ret[50]*/; 1772 pthread_t thread[50]; 1773 struct timespec xwait; 1774 1775 gj.readStat = gj.procStat = gj.writStat = 0; 1776 1777 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A"); 1778 1779 1780 gi_maxProc = g_maxProc; 1781 if (gi_maxProc <= 0 || gi_maxProc > 90) { 1782 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc); 1783 gi_maxProc = 1; 1784 } 1785 1786 //start all threads (more to come) when we are allowed to .... 1787 while (g_runStat == 0) { 1788 xwait.tv_sec = 0; 1789 xwait.tv_nsec = 10000000; // sleep for ~10 msec 1790 nanosleep (&xwait, NULL); 1791 } 1792 1793 i = 0; 1794 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL); 1795 i++; 1796 ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL); 1797 //i++; 1798 ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL); 1799 //i++; 1800 imax = i; 1801 1802 1803 #ifdef BILAND 1804 xwait.tv_sec = 30;; 1805 xwait.tv_nsec = 0; // sleep for ~20sec 1806 nanosleep (&xwait, NULL); 1807 1808 printf ("close all runs in 2 seconds\n"); 1809 1810 CloseRunFile (0, time (NULL) + 2, 0); 1811 1812 xwait.tv_sec = 1;; 1813 xwait.tv_nsec = 0; // sleep for ~20sec 1814 nanosleep (&xwait, NULL); 1815 1816 printf ("setting g_runstat to -1\n"); 1817 1818 g_runStat = -1; 1179 } // end for loop over all sockets 1180 1181 // ================================================================== 1182 1183 // +1 -> idx=0 1184 // -1 -> idx=0 1185 // +2 -> idx=0 1186 // -2 -> idx=0 1187 // +3 -> idx=0 1188 // -3 -> idx=0 1189 // +4 -> idx=0 1190 // -4 -> idx=0 1191 // +5 -> idx=0 1192 // -5 -> idx=0 1193 // +6 -> idx=0 1194 // -6 -> idx=0 1195 // 1196 1197 // ================================================================== 1198 1199 const time_t actTime = time(NULL); 1200 if (actTime == gi_SecTime) 1201 { 1202 #if !defined(USE_SELECT) && !defined(USE_EPOLL) 1203 if (evtCtrl.size()==0) 1204 usleep(1); 1819 1205 #endif 1820 1821 1822 //wait for all threads to finish 1823 for (i = 0; i < imax; i++) { 1824 /*j =*/ pthread_join (thread[i], (void **) &status); 1825 } 1826 1827 } /*-----------------------------------------------------------------*/ 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 /*-----------------------------------------------------------------*/ 1844 /*-----------------------------------------------------------------*/ 1845 /*-----------------------------------------------------------------*/ 1846 /*-----------------------------------------------------------------*/ 1847 /*-----------------------------------------------------------------*/ 1848 1849 #ifdef BILAND 1850 1851 int 1852 subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event, 1853 int8_t * buffer) 1854 { 1855 printf ("called subproc %d\n", threadID); 1856 return threadID + 1; 1857 } 1858 1859 1860 1861 1862 /*-----------------------------------------------------------------*/ 1863 /*-----------------------------------------------------------------*/ 1864 /*-----------------------------------------------------------------*/ 1865 /*-----------------------------------------------------------------*/ 1866 /*-----------------------------------------------------------------*/ 1867 1868 1869 1870 1871 FileHandle_t 1872 runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len) 1873 { 1874 return 1; 1875 }; 1876 1877 int 1878 runWrite (FileHandle_t fileHd, EVENT * event, size_t len) 1879 { 1880 return 1; 1881 usleep (10000); 1882 return 1; 1883 } 1884 1885 1886 //{ return 1; } ; 1887 1888 int 1889 runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len) 1890 { 1891 return 1; 1892 }; 1893 1894 1895 1896 1897 int 1898 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event) 1899 { 1900 int i = 0; 1901 1902 // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) ); 1903 // for (i=0; i<NBOARDS; i++) { 1904 // printf("b=%2d,=%5d\n",i,fadhd[i].board_id); 1905 // } 1906 return 0; 1907 } 1908 1909 1910 void 1911 factStatNew (EVT_STAT gi) 1912 { 1913 int i; 1914 1915 //for (i=0;i<MAX_SOCK;i++) { 1916 // printf("%4d",gi.numRead[i]); 1917 // if (i%20 == 0 ) printf("\n"); 1918 //} 1919 } 1920 1921 void 1922 gotNewRun (int runnr, PEVNT_HEADER * headers) 1923 { 1924 printf ("got new run %d\n", runnr); 1925 return; 1926 } 1927 1928 void 1929 factStat (GUI_STAT gj) 1930 { 1931 // printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n", 1932 // array[0],array[1],array[2],array[3],array[4],array[5],array[6]); 1933 } 1934 1935 1936 void 1937 debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, 1938 int state, uint32_t tsec, uint32_t tusec) 1939 { 1940 // printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ; 1941 } 1942 1943 1944 1945 void 1946 debugStream (int isock, void *buf, int len) 1947 { 1948 } 1949 1950 void 1951 debugHead (int i, int j, void *buf) 1952 { 1953 } 1954 1955 1956 void 1957 factOut (int severity, int err, char *message) 1958 { 1959 static FILE *fd; 1960 static int file = 0; 1961 1962 if (file == 0) { 1963 printf ("open file\n"); 1964 fd = fopen ("x.out", "w+"); 1965 file = 999; 1966 } 1967 1968 fprintf (fd, "%3d %3d | %s \n", severity, err, message); 1969 1970 if (severity != kDebug) 1971 printf ("%3d %3d | %s\n", severity, err, message); 1972 } 1973 1974 1975 1976 int 1977 main () 1978 { 1979 int i, b, c, p; 1980 char ipStr[100]; 1981 struct in_addr IPaddr; 1982 1983 g_maxMem = 1024 * 1024; //MBytes 1984 g_maxMem = g_maxMem * 200; //100MBytes 1985 1986 g_maxProc = 20; 1987 1988 g_runStat = 40; 1989 1990 i = 0; 1991 1992 // version for standard crates 1993 //for (c=0; c<4,c++) { 1994 // for (b=0; b<10; b++) { 1995 // sprintf(ipStr,"10.0.%d.%d",128+c,128+b) 1996 // 1997 // inet_pton(PF_INET, ipStr, &IPaddr) ; 1998 // 1999 // g_port[i].sockAddr.sin_family = PF_INET; 2000 // g_port[i].sockAddr.sin_port = htons(5000) ; 2001 // g_port[i].sockAddr.sin_addr = IPaddr ; 2002 // g_port[i].sockDef = 1 ; 2003 // i++ ; 2004 // } 2005 //} 2006 // 2007 //version for PC-test * 2008 for (c = 0; c < 4; c++) { 2009 for (b = 0; b < 10; b++) { 2010 sprintf (ipStr, "10.0.%d.11", 128 + c); 2011 if (c < 2) 2012 sprintf (ipStr, "10.0.%d.11", 128); 2013 else 2014 sprintf (ipStr, "10.0.%d.11", 131); 2015 // if (c==0) sprintf(ipStr,"10.0.100.11") ; 2016 2017 inet_pton (PF_INET, ipStr, &IPaddr); 2018 p = 31919 + 100 * c + 10 * b; 2019 2020 2021 g_port[i].sockAddr.sin_family = PF_INET; 2022 g_port[i].sockAddr.sin_port = htons (p); 2023 g_port[i].sockAddr.sin_addr = IPaddr; 2024 g_port[i].sockDef = 1; 2025 2026 i++; 2027 } 2028 } 2029 2030 2031 //g_port[17].sockDef =-1 ; 2032 //g_actBoards-- ; 2033 2034 StartEvtBuild (); 2035 2036 return 0; 2037 2038 } 2039 #endif 1206 continue; 1207 } 1208 gi_SecTime = actTime; 1209 1210 // ================================================================== 1211 //loop over all active events and flag those older than read-timeout 1212 //delete those that are written to disk .... 1213 //const int count = evtCtrl.size(); 1214 1215 // This could be improved having the pointer which separates the queue with 1216 // the incomplete events from the queue with the complete events 1217 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) 1218 { 1219 // A reference is enough because the shared_ptr is hold by the evtCtrl 1220 const shared_ptr<EVT_CTRL2> &evt = *it; 1221 1222 // The first event is the oldest. If the first event within the 1223 // timeout window was received, we can stop searchinf further. 1224 if (evt->time.tv_sec>=actTime - 30) 1225 break; 1226 1227 // This will result in the emission of a dim service. 1228 // It doesn't matter if that takes comparably long, 1229 // because we have to stop the run anyway. 1230 const uint64_t rep = reportIncomplete(evt, "timeout"); 1231 factReportIncomplete(rep); 1232 1233 it++; 1234 evtCtrl.pop_front(); 1235 } 1236 1237 // ================================================================= 1238 1239 // If nothing was received for more than 5min, close file 1240 if (actTime-actrun->lastTime>300) 1241 actrun->maxEvt = actrun->lastEvt; 1242 1243 // This is a fake event to trigger possible run-closing conditions once a second 1244 // FIXME: This is not yet ideal because a file would never be closed 1245 // if a new file has been started and no events of the new file 1246 // have been received yet 1247 if (actrun->fileStat==kFileOpen) 1248 primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun))); 1249 1250 // ================================================================= 1251 1252 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM; 1253 gj.usdMem = Memory::max_inuse; 1254 gj.totMem = Memory::allocated; 1255 1256 gj.deltaT = 1000; // temporary, must be improved 1257 1258 for (int ib=0; ib<NBOARDS; ib++) 1259 { 1260 gj.rateBytes[ib] = rd[ib].rateBytes; 1261 gj.totBytes[ib] += rd[ib].rateBytes; 1262 1263 rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr); 1264 1265 gi_NumConnect[ib] = rd[ib].connected; 1266 gj.numConn[ib] = rd[ib].connected; 1267 } 1268 1269 factStat(gj); 1270 1271 Memory::max_inuse = 0; 1272 1273 for (int ib=0; ib<NBOARDS; ib++) 1274 rd[ib].rateBytes = 0; 1275 } 1276 1277 // 1: Stop, wait for event to get processed 1278 // 2: Stop, finish immediately 1279 // 101: Restart, wait for events to get processed 1280 // 101: Restart, finish immediately 1281 // 1282 const int gi_reset = g_reset; 1283 1284 const bool abort = gi_reset%100==2; 1285 1286 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join"); 1287 1288 primaryQueue.wait(abort); 1289 secondaryQueue.wait(abort); 1290 1291 // Here we also destroy all runCtrl structures and hence close all open files 1292 evtCtrl.clear(); 1293 1294 factPrintf(MessageImp::kInfo, "Exit read Process..."); 1295 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse); 1296 1297 factStat(gj); 1298 1299 return gi_reset>=100; 1300 } 1301 1302 // ========================================================================== 1303 // ========================================================================== 1304 1305 void StartEvtBuild() 1306 { 1307 factPrintf(MessageImp::kInfo, "Starting EventBuilder++"); 1308 1309 1310 for (int k=0; k<NBOARDS; k++) 1311 { 1312 gi_NumConnect[k] = 0; 1313 gj.numConn[k] = 0; 1314 gj.totBytes[k] = 0; 1315 } 1316 1317 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0; 1318 gj.maxMem = gj.xxxMem = 0; 1319 1320 gj.usdMem = Memory::inuse; 1321 gj.totMem = Memory::allocated; 1322 1323 gj.bufNew = gj.bufEvt = 0; 1324 gj.evtSkip = gj.evtWrite = gj.evtErr = 0; 1325 gj.readStat = gj.procStat = gj.writStat = 0; 1326 1327 1328 1329 READ_STRUCT rd[NBOARDS]; 1330 1331 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets) 1332 for (int i=0; i<NBOARDS; i++) 1333 rd[i].sockId = i; 1334 1335 while (mainloop(rd)); 1336 1337 //must close all open sockets ... 1338 factPrintf(MessageImp::kInfo, "Close all sockets..."); 1339 1340 READ_STRUCT::close(); 1341 1342 // Now all sockets get closed. This is not reflected in gi_NumConnect 1343 // The current workaround is to count all sockets as closed when the thread is not running 1344 }
Note:
See TracChangeset
for help on using the changeset viewer.