Changeset 10963
- Timestamp:
- 06/09/11 21:15:53 (13 years ago)
- Location:
- trunk/FACT++/src
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/FACT++/src/EventBuilder.c
r10773 r10963 1 1 2 2 3 #define PX8 99 //simulator does not create double-length roi for pixel 8 4 //for real data, set PX8 = 8 ==> ask for double roi=TM 3 5 4 6 … … 6 8 #include <stdlib.h> 7 9 #include <stdint.h> 10 #include <unistd.h> 8 11 #include <stdio.h> 9 12 #include <time.h> … … 19 22 #include <sched.h> 20 23 21 22 24 #include "EventBuilder.h" 23 25 24 #define ETHTEST 0 26 enum Severity 27 { 28 kMessage = 10, ///< Just a message, usually obsolete 29 kInfo = 20, ///< An info telling something which can be interesting to know 30 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour 31 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program 32 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination 33 kDebug = 99, ///< A message used for debugging only 34 }; 35 25 36 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 26 37 #define MAX_LEN 64*1024 // size of read-buffer per socket 27 38 39 extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ; 40 extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ; 41 extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ; 42 extern void factOut(int severity, int err, char* message ) ; 43 extern void factStat(int severity, int err, char* message ) ; 44 28 45 int g_actTime = 0 ; 29 46 int g_runStat = 40 ; 30 int g_actBoards = 20 ; 31 47 int g_actBoards = 40 ; 48 size_t g_maxMem ; //maximum memory allowed for buffer 49 50 int g_maxBoards ; //maximum number of boards to be initialized 51 FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 52 53 54 int gi_runStat ; 55 int gp_runStat ; 56 int gw_runStat ; 57 58 59 int gi_maxSocks = 0 ; 32 60 uint gi_SecRate[MAX_SOCK] ; 33 61 uint gi_S10Rate[MAX_SOCK] ; … … 42 70 uint gi_EvtBad = 0 ; 43 71 uint gi_EvtTot = 0 ; 72 size_t gi_usedMem = 0 ; 73 44 74 uint gw_EvtTot = 0 ; 45 75 uint gp_EvtTot = 0 ; 46 76 77 PIX_MAP g_pixMap[NPIX] ; 47 78 48 79 EVT_CTRL evtCtrl ; //control of events during processing … … 78 109 int32_t bufLen ; //number of bytes left to read 79 110 80 int sockStat ; //-1 if socket not yet connected 111 int sockStat ; //-1 if socket not yet connected , 99 if not exist 81 112 int socket ; //contains the sockets 82 113 struct sockaddr_in SockAddr ; //IP for each socket … … 84 115 int evtID ; // event ID of event currently read 85 116 int runID ; // run " 86 //int evtPtr ; // index into evtCtrl structure87 117 uint fadLen ; // FADlength of event currently read 88 118 int fadVers ; // Version of FAD … … 90 120 int Port ; 91 121 92 // int8_t *rBuf; //local buffer to be used when no event defined yet93 122 CNV_FACT *rBuf ; 94 123 … … 102 131 103 132 104 105 struct timespec xwait ; 106 133 #define MXSTR 1000 134 char str[MXSTR] ; 107 135 108 136 SHORT_BYTE start, stop; … … 119 147 120 148 121 int GenSock(int flag, int crate0, int board0, int port0,READ_STRUCT *rd) {149 int GenSock(int flag, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) { 122 150 /* 123 151 *** generate Address, create sockets and allocates readbuffer for it … … 126 154 */ 127 155 128 int crate, board, port ;129 char IPstr[100] ;130 struct in_addr IPaddr ;131 156 132 157 rd->sockStat = -1 ; 133 134 135 crate = crate0;136 board = board0;137 port = port0 ;138 158 139 159 … … 141 161 close(rd->socket) ; 142 162 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 143 error(1,errno,"Could not generate socket\n"); 163 snprintf(str,MXSTR,"Could not generate socket | %m"); 164 factOut(kFatal,771, str ) ; 144 165 return -2 ; 145 166 } … … 147 168 } 148 169 149 150 if (ETHTEST >0) {151 port = port0+100*crate0+10*board0 ;152 sprintf(IPstr,"10.0.%d.11",128+crate); // test on fact1153 // if (board==3) sprintf(IPstr,"10.0.100.11");154 155 // sprintf(IPstr,"10.0.131.11"); // test on fact1156 inet_pton(PF_INET, IPstr, &IPaddr) ;157 port = port0+100*crate0+10*board0 ;158 } else {159 160 161 sprintf(IPstr,"10.0.%d.%d",128+crate,128+board); // real environment162 if ( inet_pton(PF_INET, IPstr, &IPaddr) <=0 ) {163 error(1,errno,"Error: bad address c=%d b=%d '%s'\n", crate, board, IPstr);164 return -1 ;165 }166 }167 168 170 rd->Port = port ; 169 rd->board = crate0*10+board0 ; 170 rd->SockAddr.sin_family = PF_INET; 171 rd->SockAddr.sin_family = sockAddr->sin_family; 171 172 rd->SockAddr.sin_port = htons(port) ; 172 rd->SockAddr.sin_addr = IPaddr ;173 rd->SockAddr.sin_addr = sockAddr->sin_addr ; 173 174 174 175 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) { 175 error(1,errno,"Could not generate socket\n"); 176 snprintf(str,MXSTR,"Could not generate socket | %m"); 177 factOut(kFatal,773, str ) ; 176 178 return -2 ; 177 179 } else { 178 180 rd->rBuf = malloc(sizeof(CNV_FACT) ) ; 179 181 if ( rd->rBuf == NULL ) { 180 error(1,errno,"Could not create local buffer\n"); 182 snprintf(str,MXSTR,"Could not create local buffer"); 183 factOut(kFatal,774, str ) ; 181 184 return -3 ; 182 185 } 183 186 } 184 185 187 return 0 ; 186 188 187 189 } /*-----------------------------------------------------------------*/ 188 190 189 190 int PrintErr() { 191 192 int k,c,b,p,s ; 193 194 195 k=0 ; 196 printf("Errors:\n"); 197 for (c=0; c<4; c++) { 198 for (b=0; b<10; b++) { 199 s=0 ; 200 printf("c%d b%d: ",c,b); 201 for (p=1; p<8; p++) { 202 printf("%7d",gi_ErrCnt[k]); 203 s+=gi_ErrCnt[k]; 204 } 205 printf("%8d\n",s); 206 } 207 } 208 209 return 0; 210 } /*-----------------------------------------------------------------*/ 211 212 213 int PrintRate() { 214 215 int k,c,b,p,s ; 216 217 218 if (g_actTime > gi_SecTime) { 219 gi_SecTime = g_actTime ; 220 printf("Nr Ev start %d compl %d bad %d\n",gi_EvtStart,gi_EvtRead,gi_EvtBad) ; 221 222 223 k=0 ; 224 printf("Rate/Second:\n"); 225 for (c=0; c<4; c++) { 226 for (b=0; b<10; b++) { 227 s=0 ; 228 printf("c%d b%d: ",c,b); 229 for (p=1; p<8; p++) { 230 printf("%7d",gi_SecRate[k]); 231 s+= gi_SecRate[k]; 232 gi_S10Rate[k]+=gi_SecRate[k]; 233 gi_SecRate[k++]=0 ; 234 } 235 printf("%8d\n",s); 236 } 237 } 238 for (b=0; b<NBOARDS; b++) 239 printf("%d ",gi_NumConnect[b]) ; 240 printf("\n"); 241 } 242 243 244 if ( g_actTime%10 == 0 && g_actTime > gi_S10Time) { 245 gi_S10Time = g_actTime ; 246 k=0 ; 247 printf("Rate/10Second:\n"); 248 for (c=0; c<4; c++) { 249 for (b=0; b<10; b++) { 250 s=0 ; 251 printf("c%d b%d: ",c,b); 252 for (p=1; p<8; p++) { 253 printf("%7d",gi_S10Rate[k]); 254 s+= gi_S10Rate[k]; 255 gi_MinRate[k]+=gi_S10Rate[k]; 256 gi_S10Rate[k++]=0 ; 257 } 258 printf("%8d\n",s); 259 } 260 } 261 } 262 263 264 if ( g_actTime%60 == 0 && g_actTime > gi_MinTime) { 265 gi_MinTime = g_actTime ; 266 k=0 ; 267 printf("Rate/Minute:\n"); 268 for (c=0; c<4; c++) { 269 for (b=0; b<10; b++) { 270 printf("c%d b%d: ",c,b); 271 s=0 ; 272 for (p=1; p<8; p++) { 273 printf("%7d",gi_MinRate[k]); 274 s+= gi_MinRate[k]; 275 gi_MinRate[k++]=0 ; 276 } 277 printf("%8d\n",s); 278 } 279 } 280 } 281 282 return 0; 283 284 } /*-----------------------------------------------------------------*/ 191 /*-----------------------------------------------------------------*/ 285 192 286 193 … … 290 197 // initialize mBuffer (mark all entries as unused\empty) 291 198 292 int i,j,k ; 199 int i ; 200 uint32_t actime ; 201 202 actime = g_actTime + 50000000 ; 293 203 294 204 for (i=0; i<MAX_EVT*MAX_RUN; i++) { … … 297 207 evtCtrl.evtBuf[ i] = -1 ; 298 208 evtCtrl.evtStat[ i] = -1 ; 299 evtCtrl.pcTime[ i] = g_actTime + 50000000 ; //initiate to far future 300 } 301 302 for (i=0; i<MAX_RUN; i++) { 303 runCtrl[i].runId = 304 runCtrl[i].lastTime = 305 runCtrl[i].nextEvt = 0 ; 306 runCtrl[i].fileId = -2 ; 307 308 for (k=0; k<MAX_EVT; k++) runCtrl[i].buffId[k] = 0 ; 309 310 runTail[i].nEventsOk = 311 runTail[i].nEventsRej = 312 runTail[i].nEventsBad = 313 runTail[i].PCtime0 = 314 runTail[i].PCtimeX = 0 ; 315 } 316 317 318 evtCtrl.readPtr = 0 ; 319 evtCtrl.writePtr= 0 ; 209 evtCtrl.pcTime[ i] = actime ; //initiate to far future 210 211 } 212 213 214 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ; 215 216 evtCtrl.frstPtr = 0 ; 217 evtCtrl.lastPtr = 0 ; 320 218 321 219 return 0 ; … … 326 224 327 225 328 int mBufEvt( int evID, int runID,int nRoi) {226 int mBufEvt(uint evID, uint runID, uint nRoi) { 329 227 // generate a new Event into mBuffer: 330 228 // make sure only complete Event are possible, so 'free' will always work … … 333 231 334 232 int i, k, evFree ; 335 336 if (nRoi < 0 || nRoi > 1024) { 337 printf("illegal nRoi %d\n",nRoi) ; 233 int headmem=0 ; 234 size_t needmem = 0 ; 235 236 if (nRoi <=0 || nRoi > 1024) { 237 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ; 238 factOut(kError, 1, str ) ; 338 239 return 99 ; 339 240 } … … 354 255 //event does not yet exist; create 355 256 if (evFree < 0 ) { //no space available in ctrl 356 error(0,0, "no space left to keep event...") ; 257 snprintf(str,MXSTR,"no control slot to keep event...") ; 258 factOut(kError,881, str ) ; 357 259 return -1 ; 358 260 } 261 i = evFree ; //found free entry; use it ... 359 262 360 263 361 362 i = evFree ; //found free entry; use it ... 363 364 mBuffer[i].fEvent = malloc( sizeof(EVENT) ) ; 365 if (mBuffer[i].fEvent == NULL) return -12; 366 367 368 mBuffer[i].fEvent->StartPix = malloc( NPIX * sizeof(int16_t) ) ; 369 if (mBuffer[i].fEvent->StartPix == NULL) { 264 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2 - 2 ; //-2 because of dummy adc_data[1] 265 266 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 267 268 if ( gi_usedMem + needmem + headmem > g_maxMem) { 269 snprintf(str,MXSTR,"no memory left to keep event...") ; 270 factOut(kError,882, str ) ; 271 return -1 ; 272 } 273 274 mBuffer[i].FADhead = malloc( headmem ) ; 275 if (mBuffer[i].FADhead == NULL) { 276 return -12; 277 } 278 279 mBuffer[i].fEvent = malloc( needmem ) ; 280 if (mBuffer[i].fEvent == NULL) { 370 281 free(mBuffer[i].fEvent) ; 371 282 mBuffer[i].fEvent = NULL ; 372 mBuffer[i].nRoi = -3 ; 373 return -13; 374 } 375 for (k=0; k<NPIX; k++) mBuffer[i].fEvent->StartPix[k] = -1 ; 376 377 mBuffer[i].fEvent->StartTM = malloc( NTMARK * sizeof(int16_t) ) ; 378 if (mBuffer[i].fEvent->StartTM == NULL) { 379 free(mBuffer[i].fEvent->StartPix ) ; 380 free(mBuffer[i].fEvent) ; 381 mBuffer[i].fEvent = NULL ; 382 mBuffer[i].nRoi = -4 ; 383 return -14; 384 } 385 for (k=0; k<NTMARK; k++) mBuffer[i].fEvent->StartTM[k] = -1 ; 386 387 mBuffer[i].fEvent->Adc_Data = malloc( NPIX * nRoi * sizeof(int16_t) ) ; 388 if (mBuffer[i].fEvent->Adc_Data == NULL) { 389 free(mBuffer[i].fEvent->StartTM ) ; 390 free(mBuffer[i].fEvent->StartPix ) ; 391 free(mBuffer[i].fEvent) ; 392 mBuffer[i].fEvent = NULL ; 393 mBuffer[i].nRoi = -5 ; 394 return -15; 395 } 396 397 mBuffer[i].fEvent->Adc_Tmark= malloc( NTMARK* nRoi * sizeof(int16_t) ) ; 398 if (mBuffer[i].fEvent->Adc_Tmark== NULL) { 399 free(mBuffer[i].fEvent->Adc_Data ) ; 400 free(mBuffer[i].fEvent->StartTM ) ; 401 free(mBuffer[i].fEvent->StartPix ) ; 402 free(mBuffer[i].fEvent) ; 403 mBuffer[i].fEvent = NULL ; 404 mBuffer[i].nRoi = -6 ; 405 return -16; 406 } 407 408 mBuffer[i].fEvent->FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ; 409 if (mBuffer[i].fEvent->FADhead== NULL) { 410 free(mBuffer[i].fEvent->Adc_Tmark ) ; 411 free(mBuffer[i].fEvent->Adc_Data ) ; 412 free(mBuffer[i].fEvent->StartTM ) ; 413 free(mBuffer[i].fEvent->StartPix ) ; 414 free(mBuffer[i].fEvent) ; 415 mBuffer[i].fEvent = NULL ; 416 mBuffer[i].nRoi = -7 ; 417 return -17; 418 } 419 283 return -22; 284 } 285 286 //flag all boards as unused 420 287 mBuffer[i].nBoard = 0 ; 421 288 for (k=0; k<NBOARDS; k++ ) { 422 289 mBuffer[i].board[k] = -1; 290 } 291 292 //flag all pixels as unused 293 for (k=0; k<NPIX; k++ ) { 294 mBuffer[i].fEvent->StartPix[k] = -1 ; 295 } 296 297 //flag all TMark as unused 298 for (k=0; k<NTMARK; k++ ) { 299 mBuffer[i].fEvent->StartTM[k] = -1 ; 423 300 } 424 301 … … 427 304 mBuffer[i].evNum = evID ; 428 305 mBuffer[i].runNum = runID ; 429 306 mBuffer[i].evtLen = needmem ; 307 308 gi_usedMem += needmem + headmem; 430 309 431 310 //register event in 'active list (reading)' 432 311 433 evtCtrl.evtBuf[ evtCtrl.readPtr] = i ; 434 evtCtrl.evtStat[ evtCtrl.readPtr] = 0 ; 435 evtCtrl.pcTime[ evtCtrl.readPtr] = g_actTime ; 436 evtIdx[i] = evtCtrl.readPtr ; 437 438 evtCtrl.readPtr++ ; 439 if (evtCtrl.readPtr == MAX_EVT*MAX_RUN ) evtCtrl.readPtr = 0; 312 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ; 313 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ; 314 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ; 315 evtIdx[i] = evtCtrl.lastPtr ; 316 snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0); 317 factOut(kDebug,-11, str ) ; 318 evtCtrl.lastPtr++ ; 319 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0; 320 321 322 440 323 441 324 gi_EvtStart++ ; … … 449 332 450 333 if (evFree <0 ) { 451 error(0,0, "not able to register the new run %d\n",runID); 334 snprintf(str,MXSTR,"not able to register the new run %d",runID); 335 factOut(kError,883, str ) ; 452 336 } else { 453 337 runCtrl[evFree].runId = runID ; 454 338 } 455 456 339 457 340 return i ; … … 464 347 //(and make sure multiple calls do no harm ....) 465 348 349 int headmem=0 ; 350 size_t freemem = 0 ; 351 466 352 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ... 467 free(mBuffer[i].fEvent->Adc_Tmark) ; 468 free(mBuffer[i].fEvent->Adc_Data ) ; 469 free(mBuffer[i].fEvent->StartTM ) ; 470 free(mBuffer[i].fEvent->StartPix ) ; 353 freemem = mBuffer[i].evtLen ; 471 354 free(mBuffer[i].fEvent ) ; 472 355 mBuffer[i].fEvent = NULL ; 473 } 356 357 free(mBuffer[i].FADhead ) ; 358 mBuffer[i].FADhead = NULL ; 359 360 } 361 headmem = NBOARDS* sizeof(PEVNT_HEADER) ; 474 362 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1; 475 363 364 gi_usedMem = gi_usedMem - freemem - headmem; 365 366 476 367 return 0 ; 477 368 … … 483 374 484 375 485 int initReadFAD() { 486 /* *** initialize reading of FAD data */ 487 int32_t i,j,k ; 488 int c,b,p ; 489 376 void initReadFAD() { 377 return ; 378 } /*-----------------------------------------------------------------*/ 379 380 381 382 void *readFAD( void *ptr ) { 383 /* *** main loop reading FAD data and sorting them to complete events */ 384 int head_len,frst_len,numok,numok2,dest,evID,i,j,k ; 385 int32_t jrd ; 386 int32_t myRun ; 387 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ; 388 uint qtot = 0, qread = 0, qconn = 0 ; 389 int errcnt0 = 0 ; 390 391 int goodhed=0; 392 393 struct timespec xwait ; 394 395 int nokCnt[MAX_SOCK],loopCnt=0; 396 int sokCnt[MAX_SOCK]; 397 398 399 snprintf(str,MXSTR,"start initializing"); 400 factOut(kInfo,-1, str ) ; 401 402 int cpu = 7 ; //read thread 403 cpu_set_t mask; 404 405 /* CPU_ZERO initializes all the bits in the mask to zero. */ 406 CPU_ZERO( &mask ); 407 /* CPU_SET sets only the bit corresponding to cpu. */ 408 cpu = 7 ; 409 CPU_SET( cpu, &mask ); 410 // cpu = 6 ; 411 // CPU_SET( cpu, &mask ); 412 413 /* sched_setaffinity returns 0 in success */ 414 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 415 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu); 416 factOut(kWarn,-1, str ) ; 417 } 418 419 420 gi_maxSocks = 0 ; 421 422 //make sure all sockets are preallocated as 'not exist' 423 for (i=0; i<MAX_SOCK; i++) { 424 rd[i].socket = -1 ; 425 rd[i].sockStat = 99 ; 426 } 427 428 int b,p,p0 ; 429 k = 0 ; 430 for (b=0; b<NBOARDS; b++ ) { 431 if ( g_port[b].sockDef >=0 ) { 432 p0=ntohs(g_port[b].sockAddr.sin_port); 433 for (p=p0+1; p<p0+8; p++) { 434 j = GenSock(0,p, &g_port[b].sockAddr, &rd[k]) ; 435 if ( j != 0 ) { 436 snprintf(str,MXSTR,"problem with Address board %d port %d",b,p); 437 factOut(kFatal,101, str ) ; 438 } else { 439 rd[k].board = b ; 440 k++ ; 441 gi_maxSocks++ ; 442 } 443 } 444 } 445 } 490 446 491 447 g_actTime = time(NULL) ; 492 493 k = 0 ;494 for ( c=0; c<4; c++ )495 for (b=0; b<10; b++ )496 for (p=5001; p<5008; p++) {497 j = GenSock(0,c,b,p, &rd[k]) ;498 if ( j != 0 ) printf("problem with c%d b%d p%d\n",c,b,p);499 // else printf("ok socket %d = %d\n",k,rd[k].socket) ;500 k++ ;501 }502 503 448 for (k=0; k<MAX_SOCK; k++) 504 449 gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ; … … 509 454 510 455 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ; 511 512 return NULL; 513 514 } /*-----------------------------------------------------------------*/ 515 516 517 518 void *readFAD( void *ptr ) { 519 /* *** main loop reading FAD data and sorting them to complete events */ 520 int head_len,frst_len,numok,numok2,evFree,dest,evID,i,j,k ; 521 int32_t jrd ; 522 int8_t FADbyte0, FADbyte1, FADbyteX0, FADbyteX1 ; 523 int32_t myRun, cleanTime ; 524 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ; 525 int reqBoards = 40 ; 526 527 int goodevt=0; 528 int goodhed=0; 529 int nbuf=0; 530 int ret ; 531 532 int waitTime = 10 ; //maximum nr of seconds wait for delayed packets 533 534 int nokCnt[MAX_SOCK],loopCnt=0; 535 int sokCnt[MAX_SOCK]; 456 457 458 459 mBufInit() ; //initialize buffers 460 461 snprintf(str,MXSTR,"end initializing"); 462 factOut(kInfo,-1, str ) ; 463 536 464 537 465 for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ; 538 466 539 540 467 head_len = sizeof(PEVNT_HEADER) ; 541 frst_len = head_len + 36 * 12 ; 542 if (head_len < MIN_LEN) { printf("headLen ...\n"); exit(99);} 468 frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header 543 469 544 470 numok = numok2 = 0 ; … … 548 474 549 475 myRun = g_actTime ; 550 cleanTime = g_actTime ; //once per second cleanup buffers from too old data 551 552 553 554 555 mBufInit() ; 556 557 558 while (g_runStat > 0) { //loop until global variable g_stop is set 559 476 477 gi_runStat = g_runStat ; 478 479 480 while (g_runStat >=0) { //loop until global variable g_stop is set 481 482 gi_runStat = g_runStat ; 560 483 561 484 g_actTime = time(NULL) ; … … 566 489 numok = 0 ; //count number of succesfull actions 567 490 568 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read 569 491 for (i=0; i<gi_maxSocks; i++) { //check all sockets if something to read 570 492 if (rd[i].sockStat <0 ) { //try to connect if not yet done 571 493 rd[i].sockStat=connect(rd[i].socket, 572 494 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ; 573 if (rd[i].sockStat >=0 ) { //successfull ==>495 if (rd[i].sockStat ==0 ) { //successfull ==> 574 496 rd[i].bufTyp = 0 ; // expect a header 575 497 rd[i].bufLen = frst_len ; // max size to read at begining 576 498 rd[i].bufPos = 0 ; // no byte read so far 577 gi_NumConnect[ rd[i].board ]++ ; 578 printf("+++connect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]); 499 gi_NumConnect[ rd[i].board ]++ ; 500 numok++ ; //make sure next round will execute 501 snprintf(str,MXSTR,"+++connect %d %d",rd[i].board,gi_NumConnect[ rd[i].board ]); 502 factOut(kInfo,-1, str ) ; 579 503 } 580 504 } 581 505 582 if (rd[i].sockStat >=0) { //we have a connection ==> try to read506 if (rd[i].sockStat ==0) { //we have a connection ==> try to read 583 507 numok++ ; 584 508 sokCnt[i]++; … … 587 511 if (jrd == 0) { //connection has closed ... 588 512 rd[i].sockStat = -1 ; //flag (try to reopen next round) 589 error(0,errno,"Socket %d closed",i); 590 j = GenSock(1,0,0,0, &rd[i]) ; 513 snprintf(str,MXSTR,"Socket %d closed by FAD",i); 514 factOut(kInfo,441, str ) ; 515 j = GenSock(1,0,NULL, &rd[i]) ; 591 516 gi_ErrCnt[i]++ ; 592 gi_NumConnect[ rd[i].board ]-- ; 593 printf("disconnect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]); 517 gi_NumConnect[ rd[i].board ]-- ; 594 518 } else if ( jrd<0 ) { //did not read anything 595 519 if (errno != EAGAIN && errno != EWOULDBLOCK ) { 596 error(1,errno,"Error Reading from %d",i); 520 snprintf(str,MXSTR,"Error Reading from %d | %m",i); 521 factOut(kError,442, str ) ; 597 522 gi_ErrCnt[i]++ ; 598 523 } else numok-- ; //else nothing waiting to be read 599 524 600 525 } else if ( rd[i].bufTyp >0 ) { // we are reading data ... 601 //printf("received data %d %d\n", i,jrd); 602 603 if ( jrd < rd[i].bufLen ) { //not yet all read 526 qread+=jrd ; 527 if ( jrd < rd[i].bufLen ) { //not yet all read 604 528 rd[i].bufPos += jrd ; //==> prepare for continuation 605 529 rd[i].bufLen -= jrd ; … … 610 534 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) { 611 535 gi_ErrCnt[i]++ ; 612 printf( "wrong end of buffer found %d\n",rd[i].bufPos);613 exit(1) ;536 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos); 537 factOut(kError,301, str ) ; 614 538 goto EndBuf ; 615 539 … … 624 548 625 549 if (evID < 0) { 626 printf("no space left ...%d\n",evID) ;627 exit(2) ;628 goto EndBuf ; 550 snprintf(str,MXSTR,"no space left ...%d",evID) ; 551 factOut(kError,201, str ) ; 552 goto EndBuf ; //--> skip event (and hope it will improve) 629 553 } 630 554 … … 632 556 633 557 boardId = rd[i].board ; 634 if ( mBuffer[evID].board[ boardId ] != -1) { //this board already stored ... 635 printf( "board of this event already stored ...") ; 636 } else { 637 638 int iDx = evtIdx[evID] ; //index into evtCtrl 639 640 memcpy( &mBuffer[evID].fEvent->FADhead[boardId].start_package_flag, 558 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ; 559 int fadCrate = fadBoard/256 ; 560 if (boardId != (fadCrate*10 + fadBoard%256) ) { 561 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ; 562 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times 563 // } else { 564 // snprintf(str,MXSTR,"correct Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ; 565 // if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times 566 } 567 if ( mBuffer[evID].board[ boardId ] != -1) { 568 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ; 569 factOut(kWarn,501, str ) ; 570 goto EndBuf ; //--> skip Board 571 } 572 573 int iDx = evtIdx[evID] ; //index into evtCtrl 574 575 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag, 641 576 &rd[i].rBuf->S[0], head_len) ; 642 mBuffer[evID].board[ boardId ] = boardId ; 643 roi = mBuffer[evID].nRoi ; 644 645 pixS = boardId*36 -1 ; // 646 tmS = boardId*4 -1 ; // 647 src = head_len/2 ; 648 for ( drs=0; drs<4; drs++ ) { 649 for ( px=0; px<9; px++ ) { 650 pixH= ntohs(rd[i].rBuf->S[src++]) ; 651 pixC= ntohs(rd[i].rBuf->S[src++]) ; 652 pixR= ntohs(rd[i].rBuf->S[src++]) ; 653 654 src++ ; 655 pixS++ ; //pixS = pixH2S[pixH] ; 656 if (pixR != roi ) { 657 if (px == 8 && pixR == 2*roi ) { 658 } else { 659 printf("wrong roi %d %d %d %d\n",px,pixR,roi,src-2); 660 //exit(66); 661 } 662 // goto EndBuf ; 663 } 664 577 roi = mBuffer[evID].nRoi ; 578 579 pixS = boardId*36 -1 ; // 580 tmS = boardId*4 -1 ; // 581 src = head_len/2 ; 582 for ( drs=0; drs<4; drs++ ) { 583 for ( px=0; px<9; px++ ) { 584 pixH= ntohs(rd[i].rBuf->S[src++]) ; 585 pixC= ntohs(rd[i].rBuf->S[src++]) ; 586 pixR= ntohs(rd[i].rBuf->S[src++]) ; 587 588 src++ ; 589 pixS++ ; //pixS = pixH2S[pixH] ; 590 if ( ( px < PX8 && pixR == roi ) 591 || ( px ==PX8 && pixR == 2*roi ) 592 || ( px ==PX8 && pixR == roi && roi > 512 ) ) { 593 // correct roi 665 594 mBuffer[evID].fEvent->StartPix[pixS] =pixC; 666 595 dest= pixS * roi ; 667 596 memcpy( 668 669 597 &mBuffer[evID].fEvent->Adc_Data[dest], 598 &rd[i].rBuf->S[src], roi * 2) ; 670 599 src+= roi ; 671 672 // if (px==8 && roi < 512 ) { 673 // tmS++ ; 674 // dest= tmS * roi ; 675 // mBuffer[evID].fEvent->StartTM[pixS] =pixC+roi; 676 // memcpy( 677 // &mBuffer[evID].fEvent->Adc_Tmark[dest], 678 // &rd[i].rBuf.S[src], roi * 2) ; 679 // ?? not in the simulator ... src+= roi ; 680 // } 600 if ( px==PX8 ) { 601 tmS++; // tmS = tmH2S[pixH] 602 dest= tmS * roi + NPIX* roi ; 603 if ( roi <=512 ) { 604 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ; 605 memcpy( 606 &mBuffer[evID].fEvent->Adc_Data[dest], 607 &rd[i].rBuf->S[src], roi * 2) ; 608 src+=roi ; 609 } else { 610 mBuffer[evID].fEvent->StartTM[tmS] = -1 ; 611 } 612 } 613 } else { 614 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2); 615 factOut(kError,202, str ) ; 616 goto EndBuf ; 681 617 } 682 618 } 683 evtCtrl.evtStat[ iDx ]++ ;684 evtCtrl.pcTime[ iDx ] = g_actTime ;685 686 if (++mBuffer[evID].nBoard == 19 ) {687 //complete event read ---> flag for next processing688 evtCtrl.evtStat[ iDx ] = 99;689 gi_EvtRead++ ;690 gi_EvtTot++ ;691 printf("complete event --------------------------------------------------\n");692 }693 619 }// now we have stored a new board contents into Event structure 620 mBuffer[evID].board[ boardId ] = boardId ; 621 evtCtrl.evtStat[ iDx ]++ ; 622 evtCtrl.pcTime[ iDx ] = g_actTime ; 623 624 if (++mBuffer[evID].nBoard == g_actBoards ) { 625 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]); 626 factOut(kDebug,-1, str ) ; 627 //complete event read ---> flag for next processing 628 evtCtrl.evtStat[ iDx ] = 99; 629 gi_EvtRead++ ; 630 gi_EvtTot++ ; 631 } 694 632 695 633 EndBuf: … … 700 638 701 639 } else { //we are reading event header 640 qread+=jrd ; 702 641 rd[i].bufPos += jrd ; 703 642 rd[i].bufLen -= jrd ; … … 708 647 && rd[i].rBuf->B[k+1] == start.B[0] ) break ; 709 648 } 649 650 651 for (k=0; k<rd[i].bufPos -1 ; k++) { 652 if (rd[i].rBuf->B[k ] == start.B[1] 653 && rd[i].rBuf->B[k+1] == start.B[0] ) break ; 654 } 655 710 656 if (k >= rd[i].bufPos-1 ) { //no start of header found 711 printf("no start of header found !!!!\n"); 657 snprintf(str,MXSTR,"no start of header on port%d", i ) ; 658 factOut(kWarn,666, str ) ; 659 712 660 rd[i].bufPos = 0 ; 713 661 rd[i].bufLen = head_len ; … … 719 667 if ( rd[i].bufPos > MIN_LEN ) { 720 668 goodhed++; 721 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; ///???669 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; 722 670 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ; 723 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; 671 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt) 724 672 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ; 725 printf("received event %d %d\n",rd[i].evtID,i); 726 if (rd[i].runID ==0 ) rd[i].runID = myRun ; 673 if (rd[i].runID ==0 ) rd[i].runID = myRun ; 727 674 rd[i].bufTyp = 1 ; //ready to read full record 728 675 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ; … … 734 681 } //finished trying to read all sockets 735 682 736 683 int qwait=0, qdel=0, qskip=0 ; 737 684 g_actTime = time(NULL) ; 738 685 if ( g_actTime > gi_SecTime ) { 686 gi_SecTime = g_actTime ; 739 687 // PrintRate() ; 740 688 689 741 690 //loop over all active events and flag those older than read-timeout 742 743 int kd = evtCtrl.readPtr - evtCtrl.writePtr ; 691 //delete those that are written to disk .... 692 693 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 744 694 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 745 for ( k=evtCtrl.writePtr; k<(evtCtrl.writePtr+kd); k++ ) { 695 696 int k1=evtCtrl.frstPtr; 697 for ( k=k1; k<(k1+kd); k++ ) { 746 698 int k0 = k % (MAX_EVT*MAX_RUN) ; 747 699 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 748 700 if (evtCtrl.evtStat[k0] > 0 749 && evtCtrl.evtStat[k0] < 90 750 && evtCtrl.pcTime[k0] < g_actTime-10 ) { 751 evtCtrl.evtStat[k0] = 91 ; 701 && evtCtrl.evtStat[k0] < 90 ) { 702 703 qwait++; 704 705 if( evtCtrl.pcTime[k0] < g_actTime-10 ) { 706 int id =evtCtrl.evtBuf[k0] ; 707 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]); 708 factOut(kWarn,601, str ) ; 709 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events 752 710 gi_EvtBad++ ; 753 711 gi_EvtTot++ ; 712 qskip++; 713 } 714 715 716 } else if (evtCtrl.evtStat[k0] >= 900 ) { 717 718 int id =evtCtrl.evtBuf[k0] ; 719 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ; 720 factOut(kDebug,-1, str ) ; 721 mBufFree(id) ; //event written--> free memory 722 evtCtrl.evtStat[k0] = -1; 723 qdel++; 724 qtot++; 725 } 726 727 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) { 728 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 754 729 } 755 730 } 731 732 qconn=0 ; 733 int ib ; 734 for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ; 735 736 snprintf(str,MXSTR,"bfr%5d skp%4d free%4d (tot%7d) mem%9lu rd%10d %3d",qwait,qskip,qdel,qtot,gi_usedMem,qread,qconn); 737 factStat(kInfo,-1, str ) ; 738 qread=0 ; 756 739 } 740 741 757 742 758 743 … … 765 750 xwait.tv_sec = 0; 766 751 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 767 // xwait.tv_nsec= 10000000 ; // sleep for ~10 msec768 752 } 769 // printf("sleeping ...\n");770 753 nanosleep( &xwait , NULL ) ; 771 754 } 772 755 773 774 775 776 756 } //and do next loop over all sockets ... 777 778 return NULL; 757 758 //must quit eventbuilding 759 snprintf(str,MXSTR,"stop reading ..."); 760 factOut(kInfo,-1, str ) ; 761 762 //flag all events as 'read finished' 763 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 764 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 765 766 int k1=evtCtrl.frstPtr; 767 768 for ( k=k1; k<(k1+kd); k++ ) { 769 int k0 = k % (MAX_EVT*MAX_RUN) ; 770 if (evtCtrl.evtStat[k0] > 0 771 && evtCtrl.evtStat[k0] < 90 ) { 772 evtCtrl.evtStat[k0] = 91 ; 773 gi_EvtBad++ ; 774 gi_EvtTot++ ; 775 } 776 } 777 778 //must close all open sockets ... 779 snprintf(str,MXSTR,"close all sockets ..."); 780 factOut(kInfo,-1, str ) ; 781 for (i=0; i<MAX_SOCK; i++) 782 if (rd[i].sockStat ==0 ) { 783 j=close(rd[i].socket) ; 784 if (j>0) { 785 snprintf(str,MXSTR,"Error closing socket %d | %m",i); 786 factOut(kFatal,771, str ) ; 787 } 788 rd[i].sockStat = -1 ; //flag (try to reopen next round) 789 gi_NumConnect[ rd[i].board ]-- ; 790 } 791 792 xwait.tv_sec = 0; 793 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 794 nanosleep( &xwait , NULL ) ; 795 gi_runStat = -11 ; //inform all that no update to happen any more 796 797 798 int minclear = 900 ; //usually wait until writing finished (stat 900) 799 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all 800 801 802 //and clear all buffers (might have to wait until all others are done) 803 snprintf(str,MXSTR,"clear all buffers ..."); 804 factOut(kInfo,-1, str ) ; 805 int numclear=1 ; 806 while (numclear > 0 ) { 807 numclear = 0 ; 808 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 809 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 810 811 int k1=evtCtrl.frstPtr; 812 for ( k=k1; k<(k1+kd); k++ ) { 813 int k0 = k % (MAX_EVT*MAX_RUN) ; 814 if (evtCtrl.evtStat[k0] > minclear ) { 815 int id =evtCtrl.evtBuf[k0] ; 816 mBufFree(id) ; //event written--> free memory 817 evtCtrl.evtStat[k0] = -1; 818 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing... 819 820 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) 821 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ; 822 } 823 824 xwait.tv_sec = 0; 825 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 826 nanosleep( &xwait , NULL ) ; 827 } 828 829 snprintf(str,MXSTR,"Exit read Process ..."); 830 factOut(kInfo,-1, str ) ; 831 gi_runStat = -99 ; 832 return 0; 833 779 834 } /*-----------------------------------------------------------------*/ 780 835 … … 782 837 void *procEvt( void *ptr ) { 783 838 /* *** main loop processing file, including SW-trigger */ 784 int numProc ; 785 int k,k1,k2,kd ; 786 787 while (g_runStat > 0) { 788 789 790 kd = evtCtrl.readPtr - evtCtrl.writePtr ; 839 int numProc, numWait ; 840 int k ; 841 struct timespec xwait ; 842 char str[MXSTR] ; 843 844 cpu_set_t mask; 845 int cpu = 5 ; //process thread (will be several in final version) 846 847 snprintf(str,MXSTR,"Starting process-thread"); 848 factOut(kInfo,-1, str ) ; 849 850 /* CPU_ZERO initializes all the bits in the mask to zero. */ 851 CPU_ZERO( &mask ); 852 /* CPU_SET sets only the bit corresponding to cpu. */ 853 CPU_SET( cpu, &mask ); 854 /* sched_setaffinity returns 0 in success */ 855 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 856 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu); 857 factOut(kWarn,-1, str ) ; 858 } 859 860 861 while (g_runStat > -2) { //in case of 'exit' we still must process pending events 862 863 numWait = numProc = 0 ; 864 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 791 865 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 792 k1=evtCtrl.writePtr; 793 k2=evtCtrl.writePtr+kd; 794 795 numProc = 0 ; 796 if (gp_EvtTot < gi_EvtTot) { 797 for ( k=k1; k<k2; k++ ) { 798 int k0 = k % (MAX_EVT*MAX_RUN) ; 799 800 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) { 801 //ready to be processed ... 802 int id = evtCtrl.evtBuf[k0] ; 803 uint32_t irun = mBuffer[id].runNum ; 804 int ievt = mBuffer[id].evNum ; 805 printf("processing %d %d %d %d\n",ievt,k,evtCtrl.evtStat[k0],evtCtrl.writePtr) ; 806 numProc++ ; 807 evtCtrl.evtStat[k0] = 501 ; 808 gp_EvtTot++ ; 866 867 int k1=evtCtrl.frstPtr; 868 for ( k=k1; k<(k1+kd); k++ ) { 869 int k0 = k % (MAX_EVT*MAX_RUN) ; 870 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 871 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) { 872 int id = evtCtrl.evtBuf[k0] ; 873 uint32_t irun = mBuffer[id].runNum ; 874 int ievt = mBuffer[id].evNum ; 875 int roi = mBuffer[id].nRoi ; 876 //snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ; 877 //factOut(kDebug,-1, str ) ; 878 879 //make sure unused pixels/tmarks are cleared to zero 880 int ip,it,dest,ib; 881 for (ip=0; ip<NPIX; ip++) { 882 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) { 883 dest= ip*roi ; 884 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ; 885 } 809 886 } 887 for (it=0; it<NTMARK; it++) { 888 if (mBuffer[id].fEvent->StartTM[it] == -1 ) { 889 dest= it*roi + NPIX*roi ; 890 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ; 891 } 892 } 893 //and set correct event header ; also check for consistency in event 894 mBuffer[id].fEvent->Roi = roi ; 895 mBuffer[id].fEvent->EventNum = ievt ; 896 mBuffer[id].fEvent->TriggerType = 0 ; // TBD 897 mBuffer[id].fEvent->SoftTrig = 0 ; 898 for (ib=0; ib<NBOARDS; ib++) { 899 mBuffer[id].fEvent->BoardTime[ib] = 123 ; 900 901 902 903 904 } 905 numProc++ ; 906 evtCtrl.evtStat[k0] = 520 ; 907 gp_EvtTot++ ; 908 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) { 909 numWait++ ; 810 910 } 811 911 } 912 913 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do 914 snprintf(str,MXSTR,"Exit Processing Process ..."); 915 factOut(kInfo,-1, str ) ; 916 gp_runStat = -22 ; //==> we should exit 917 return 0 ; 918 } 919 812 920 if (numProc == 0) { 813 921 //seems we have nothing to do, so sleep a little 814 922 xwait.tv_sec = 0; 815 xwait.tv_nsec= 10000000 ; // sleep for ~10msec923 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 816 924 nanosleep( &xwait , NULL ) ; 817 925 } 926 gp_runStat = gi_runStat ; 818 927 819 928 } 820 return NULL; 929 930 //we are asked to abort asap ==> must flag all remaining events 931 // when gi_runStat claims that all events are in the buffer... 932 933 snprintf(str,MXSTR,"Abort Processing Process ..."); 934 factOut(kInfo,-1, str ) ; 935 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 936 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 937 938 int k1=evtCtrl.frstPtr; 939 for ( k=k1; k<(k1+kd); k++ ) { 940 int k0 = k % (MAX_EVT*MAX_RUN) ; 941 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) { 942 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed' 943 } 944 } 945 946 gp_runStat = -99 ; 947 948 return 0; 821 949 822 950 } /*-----------------------------------------------------------------*/ 823 951 952 int CloseRunFile(uint32_t runId, uint32_t closeTime) { 953 /* close run runId (all all runs if runId=0) */ 954 /* return: 0=close scheduled / >0 already closed / <0 does not exist */ 955 int j ; 956 957 if (runId == 0 ) { 958 for ( j=0; j<MAX_RUN; j++) { 959 if ( runCtrl[j].fileId == 0 ) { //run is open 960 runCtrl[j].closeTime = closeTime ; 961 } 962 } 963 return 0 ; 964 } 965 966 967 for ( j=0; j<MAX_RUN; j++) { 968 if ( runCtrl[j].runId == runId ) { 969 if ( runCtrl[j].fileId == 0 ) { //run is open 970 runCtrl[j].closeTime = closeTime ; 971 return 0; 972 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened 973 runCtrl[j].closeTime = closeTime ; 974 return 0; 975 } else { // run already closed 976 return +1; 977 } 978 } 979 } //we only reach here if the run was never created 980 return -1; 981 982 } /*-----------------------------------------------------------------*/ 983 824 984 825 985 void *writeEvt( void *ptr ) { 826 986 /* *** main loop writing event (including opening and closing run-files */ 827 987 828 int numWrite = 0 ; 829 int j,id,irun,ievt ; 830 831 while (g_runStat > 0) { //loop until global variable g_stop is set 832 833 //loop over buffered events and check if something to write ... 834 835 if ( gp_EvtTot == gw_EvtTot ) { 836 //there is for sure nothing to do --> sleep a little 837 xwait.tv_sec = 0; 838 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec 839 nanosleep( &xwait , NULL ) ; 840 841 } else { //go through evtCtrl list to check if there might be something 842 843 //if run-file not yet opened==> open runfile (better to store headers in own structure ?) 844 845 //if eventid == next event for run ==> write it (or flag it) 846 //if eventid > next event exists, and nothing new for >time out ==> write it 847 //if nothing for this run for >timeout ==> close run 848 849 int kd = evtCtrl.readPtr - evtCtrl.writePtr ; 850 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 851 int k,k1,k2 ; 852 853 854 k1=evtCtrl.writePtr; 855 k2=evtCtrl.writePtr+kd; 856 857 int evtTot=gw_EvtTot ; 858 for ( k=k1; k<k2; k++ ) { 859 int k0 = k % (MAX_EVT*MAX_RUN) ; 860 861 if (evtCtrl.evtStat[k0] > 500 ) { //ready to be written ... 862 id = evtCtrl.evtBuf[k0] ; 863 irun = mBuffer[id].runNum ; 864 ievt = mBuffer[id].evNum ; 865 866 for ( j=0; j<MAX_RUN; j++) { 867 if ( runCtrl[j].runId == irun ) break ; 868 } 869 if ( j >= MAX_RUN ) { 870 printf("error: can not find run %d\n", irun); 871 exit(111); 872 } 873 874 if (runCtrl[j].fileId < 0 ) { 875 printf("open new run_file %d\n",irun) ; 876 runCtrl[j].fileId = 999 ; // should be a function call 877 runCtrl[j].nextEvt= 0; 878 runCtrl[j].lastTime=g_actTime ; 879 } 880 881 if (runCtrl[j].nextEvt == ievt ) { //write this event 882 printf("write event %d (run %d %d)\n",ievt,irun,evtCtrl.evtStat[k0] ) ; 883 runCtrl[j].nextEvt= ievt+1; 884 runCtrl[j].lastTime=g_actTime ; 885 evtCtrl.evtStat[k0]= -1 ; 886 gw_EvtTot++ ; 887 numWrite++ ; 888 // evtCtrl.writePtr=k+1; 889 } else if ( ievt < runCtrl[j].nextEvt ) { 890 printf("delayed event (run %d %d %d) skipped\n",ievt,irun,evtCtrl.evtStat[k0] ) ; 891 evtCtrl.evtStat[k0]= -1 ; 892 // evtCtrl.writePtr=k+1; 893 gw_EvtTot++ ; 894 numWrite++ ; 895 } 896 } 897 898 899 900 if ( runCtrl[j].lastTime < g_actTime-15) { 901 printf("non existing event skip %d (run %d -> %d)\n",runCtrl[j].nextEvt,irun,ievt) ; 902 runCtrl[j].nextEvt++; 903 numWrite++; 904 } 905 906 for ( j=0; j<MAX_RUN; j++) { 907 if ( runCtrl[j].runId >0 && runCtrl[j].lastTime < g_actTime-120) { 908 printf("close run %d (timeout)\n",irun) ; 909 runCtrl[j].fileId = -2 ; 910 runCtrl[j].runId = 0 ; 911 } 912 } 913 if (numWrite == 0 ) { 914 //nothing to do at the moment ==> sleep a little 915 xwait.tv_sec = 0; 916 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec 917 nanosleep( &xwait , NULL ) ; 918 } 919 920 } 921 } 922 923 924 925 926 927 928 929 930 931 932 933 934 return NULL; 988 int numWrite, numWait ; 989 int k,j ; 990 struct timespec xwait ; 991 char str[MXSTR] ; 992 993 cpu_set_t mask; 994 int cpu = 3 ; //write thread 995 996 snprintf(str,MXSTR,"Starting write-thread"); 997 factOut(kInfo,-1, str ) ; 998 999 /* CPU_ZERO initializes all the bits in the mask to zero. */ 1000 CPU_ZERO( &mask ); 1001 /* CPU_SET sets only the bit corresponding to cpu. */ 1002 CPU_SET( cpu, &mask ); 1003 /* sched_setaffinity returns 0 in success */ 1004 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) { 1005 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu); 1006 } 1007 1008 int lastRun = 0 ; //usually run from last event still valid 1009 1010 while (g_runStat >-2) { 1011 1012 numWait = numWrite = 0 ; 1013 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ; 1014 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ; 1015 1016 int k1=evtCtrl.frstPtr; 1017 for ( k=k1; k<(k1+kd); k++ ) { 1018 int k0 = k % (MAX_EVT*MAX_RUN) ; 1019 //would be better to use bitmaps for evtStat (allow '&' instead of multi-if) 1020 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) { 1021 int id = evtCtrl.evtBuf[k0] ; 1022 uint32_t irun = mBuffer[id].runNum ; 1023 int ievt = mBuffer[id].evNum ; 1024 1025 if (runCtrl[lastRun].runId == irun ) { 1026 j = lastRun ; 1027 } else { 1028 //check which fileID to use (or open if needed) 1029 for ( j=0; j<MAX_RUN; j++) { 1030 if ( runCtrl[j].runId == irun ) break ; 1031 } 1032 if ( j >= MAX_RUN ) { 1033 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id); 1034 factOut(kFatal,901, str ) ; 1035 for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun ); 1036 exit(111); 1037 } 1038 lastRun = j ; 1039 } 1040 1041 if (runCtrl[j].fileId < 0 ) { 1042 actRun.Version = 1 ; 1043 actRun.RunType = -1 ; 1044 actRun.NBoard = NBOARDS ; 1045 actRun.NPix = NPIX ; 1046 actRun.NTm = NTMARK ; 1047 actRun.Nroi = mBuffer[id].nRoi ; 1048 // actRun.FADhead = mBuffer[id].FADhead ; //to be corrected 1049 runCtrl[j].nextEvt= 0; 1050 runCtrl[j].lastTime=g_actTime ; 1051 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ; 1052 if (runCtrl[j].fileHd == NULL ) { 1053 snprintf(str,MXSTR,"W could not open a file for run %d",irun); 1054 factOut(kError,502, str ) ; 1055 runCtrl[j].fileId = 99 ; 1056 } else { 1057 snprintf(str,MXSTR,"W opened new run_file %d",irun) ; 1058 factOut(kInfo,-1, str ) ; 1059 runCtrl[j].fileId = 0 ; 1060 } 1061 1062 } 1063 1064 if (runCtrl[j].fileId > 0 ) { 1065 snprintf(str,MXSTR,"W no open file for this run %d",irun) ; 1066 factOut(kDebug,123,str) ; 1067 evtCtrl.evtStat[k0] = 902 ; 1068 } else { 1069 int i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) ); 1070 if (i<0) { 1071 snprintf(str,MXSTR,"W error writing event for run %d",irun) ; 1072 factOut(kError,503, str ) ; 1073 evtCtrl.evtStat[k0] = 901 ; 1074 //close run 1075 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1076 if (i<0) { 1077 snprintf(str,MXSTR,"W error closing run %d",irun) ; 1078 factOut(kError,503, str ) ; 1079 } else { 1080 snprintf(str,MXSTR,"W closed run %d because of write error",irun) ; 1081 factOut(kInfo,503, str ) ; 1082 } 1083 runCtrl[j].fileId = 9999 ; 1084 } else { 1085 runCtrl[j].lastTime = g_actTime; 1086 evtCtrl.evtStat[k0] = 901 ; 1087 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0); 1088 factOut(kDebug,504, str ) ; 1089 } 1090 } 1091 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ; 1092 } 1093 1094 //check if we should close a run ... 1095 for ( j=0; j<MAX_RUN; j++) { 1096 if ( runCtrl[j].fileId==0 1097 && ( runCtrl[j].closeTime < g_actTime 1098 ||runCtrl[j].lastTime < g_actTime-120) ) { 1099 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1100 if (i<0) { 1101 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ; 1102 factOut(kError,506, str ) ; 1103 runCtrl[j].fileId = 888 ; 1104 } else { 1105 snprintf(str,MXSTR,"closing run %d ok BBB",runCtrl[j].runId); 1106 factOut(kInfo,507, str ) ; 1107 runCtrl[j].fileId = 7777 ; 1108 } 1109 } 1110 } 1111 1112 if (numWrite == 0) { 1113 //seems we have nothing to do, so sleep a little 1114 xwait.tv_sec = 0; 1115 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1116 nanosleep( &xwait , NULL ) ; 1117 } 1118 1119 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do 1120 snprintf(str,MXSTR,"Finish Write Process ..."); 1121 factOut(kInfo,-1, str ) ; 1122 gw_runStat = -22 ; //==> we should exit 1123 goto closerun ; 1124 } 1125 gw_runStat = gi_runStat ; 1126 935 1127 } 936 1128 937 return NULL; 1129 //must close all open files .... 1130 snprintf(str,MXSTR,"Abort Writing Process ..."); 1131 factOut(kInfo,-1, str ) ; 1132 closerun: 1133 snprintf(str,MXSTR,"Close all open files ..."); 1134 factOut(kInfo,-1, str ) ; 1135 for ( j=0; j<MAX_RUN; j++) 1136 if ( runCtrl[j].runId >0 ) { 1137 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) ); 1138 if (i<0) { 1139 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ; 1140 factOut(kError,506, str ) ; 1141 runCtrl[j].fileId = 888 ; 1142 } else { 1143 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId); 1144 factOut(kInfo,507, str ) ; 1145 runCtrl[j].fileId = 7777 ; 1146 } 1147 } 1148 1149 gw_runStat = -99; 1150 snprintf(str,MXSTR,"Exit Writing Process ..."); 1151 factOut(kInfo,-1, str ) ; 1152 return 0; 1153 1154 1155 938 1156 939 1157 } /*-----------------------------------------------------------------*/ … … 941 1159 942 1160 1161 1162 void StartEvtBuild() { 1163 1164 int i,j,imax,status,th_ret[50] ; 1165 pthread_t thread[50] ; 1166 struct timespec xwait ; 1167 uint32_t actime ; 1168 1169 gi_runStat = gp_runStat = gw_runStat = 0 ; 1170 1171 snprintf(str,MXSTR,"Starting EventBuilder"); 1172 factOut(kInfo,-1, str ) ; 1173 1174 1175 evtCtrl.frstPtr = 0 ; 1176 evtCtrl.lastPtr = 0 ; 1177 1178 actime = g_actTime + 50000000 ; 1179 /* initialize run control logics */ 1180 for (i=0; i<MAX_RUN; i++) { 1181 runCtrl[i].runId = 0 ; 1182 runCtrl[i].lastTime = 0 ; 1183 runCtrl[i].closeTime = time(NULL) + 3600*24*7; 1184 1185 runCtrl[i].nextEvt = 0 ; 1186 runCtrl[i].fileId = -2 ; 1187 1188 runTail[i].nEventsOk = 1189 runTail[i].nEventsRej = 1190 runTail[i].nEventsBad = 1191 runTail[i].PCtime0 = 1192 runTail[i].PCtimeX = 0 ; 1193 } 1194 1195 //start all threads (more to come) when we are allowed to .... 1196 while (g_runStat == 0 ) { 1197 xwait.tv_sec = 0; 1198 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec 1199 nanosleep( &xwait , NULL ) ; 1200 } 1201 1202 i=0 ; 1203 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL ); 1204 i++; 1205 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL ); 1206 i++; 1207 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL ); 1208 i++; 1209 imax=i ; 1210 1211 1212 1213 943 1214 /* 944 int main() { 945 int i,th_ret[50] ; 946 pthread_t thread[50] ; 947 948 initReadFAD() ; 949 i=0 ; 950 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, (void*) i++ ); 951 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, (void*) i++ ); 952 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, (void*) i++ ); 953 954 for(;;) { sleep(1); } 955 956 957 } 958 */ 1215 1216 xwait.tv_sec = 20;; 1217 xwait.tv_nsec= 0 ; // sleep for ~20sec 1218 nanosleep( &xwait , NULL ) ; 1219 1220 1221 printf("close all runs in 2 seconds\n"); 1222 1223 1224 CloseRunFile( 0, time(NULL)+2) ; 1225 1226 xwait.tv_sec = 5;; 1227 xwait.tv_nsec= 0 ; // sleep for ~20sec 1228 nanosleep( &xwait , NULL ) ; 1229 1230 printf("setting g_runstat to -1\n"); 1231 1232 g_runStat = -1 ; 1233 1234 1235 */ 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 //wait for all threads to finish 1246 for (i=0; i<imax; i++) { 1247 j = pthread_join ( thread[i], (void **)&status) ; 1248 } 1249 1250 } /*-----------------------------------------------------------------*/ 1251 -
trunk/FACT++/src/EventBuilder.h
r10784 r10963 20 20 extern int g_actTime ; //actual time, to be updated regularily 21 21 extern int g_runStat ; //main steering variable 22 extern int g_actBoards ; //number of boards that should exist 22 extern int g_actBoards ; //number of boards that should exist at the moment 23 extern size_t g_maxMem ; //maximum memory allowed for buffer 24 25 extern PIX_MAP g_pixMap[NPIX] ; 26 27 extern int g_maxBoards ; //maximum number of boards to be initialized 28 extern FACT_SOCK g_port[NBOARDS] ; // .port = baseport, .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" 23 29 24 30 extern uint gi_SecRate[MAX_SOCK] ; //MAX_SOCK defined in FACTEvent.h … … 32 38 extern uint gi_EvtBad ; 33 39 extern uint gi_EvtTot ; 40 extern int gi_maxSocks ; //maximum sockets that will be tried 41 extern size_t gi_usedMem ; 42 extern int gi_runStat ; 43 44 extern uint gp_EvtTot ; 45 extern int gp_runStat ; 34 46 extern uint gw_EvtTot ; 35 extern uint gp_EvtTot;47 extern int gw_runStat ; 36 48 37 //#define error Error 38 //extern void Error(int severity, int errnum, const char *fmt, ...); 49 50 39 51 40 52 #ifdef __cplusplus -
trunk/FACT++/src/FAD.h
r10773 r10963 29 29 // Data structures 30 30 // 31 32 typedef struct{ 33 int hardID ; //hardware ID 34 int pos_i ; //geometrical positon row 35 int pos_j ; // column 36 int G_APD ; //G-APD identifier 37 double V_op ; //operation voltage 38 int HV_B ; //HV Board 39 int HV_C ; //HV Channel 40 double pos_X ; //geometrical position in pixel units 41 double pos_Y ; // 42 } PIX_MAP ; 43 44 31 45 typedef struct { 32 46 … … 76 90 #define NBOARDS 40 // max. number of boards 77 91 #define NPIX 1440 // max. number of pixels 78 #define NTMARK 1 80 // max. number of timeMarker signals92 #define NTMARK 160 // max. number of timeMarker signals 79 93 #define MAX_SOCK 280 // NBOARDS * 7 80 94 … … 101 115 102 116 typedef struct { 117 uint16_t Roi ; // #slices per pixel (same for all pixels and tmarks) 103 118 uint32_t EventNum ; // EventNumber as from FTM 104 119 uint16_t TriggerType ; // Trigger Type from FTM 105 uint16_t Roi ; // #slices per pixel (same for all pixels and tmarks)106 120 107 121 uint32_t SoftTrig ; // SoftTrigger Info (TBD) 108 122 uint32_t PCTime ; // when did event start to arrive at PC 109 123 110 int16_t *StartPix ; //First Channel per Pixel (Pixels sorted according Software ID) ; -1 if not filled 111 112 int16_t *StartTM ; //First Channel for TimeMark (sorted Hardware ID) ; -1 if not filled 113 114 uint16_t *Adc_Data ; // [ NPixels ] [ ROI ] sorted softID 115 116 uint16_t *Adc_Tmark ; // [ NTmark ] [ ROI ] sorted hardID 117 118 //this is highly redundant and should be reduced to usefull info [if any] 119 PEVNT_HEADER *FADhead; // [ NBoards ] sorted Board Headers (according Hardware ID) 124 uint32_t BoardTime[NBOARDS];// 125 126 int16_t StartPix[NPIX]; // First Channel per Pixel (Pixels sorted according Software ID) ; -1 if not filled 127 128 int16_t StartTM[NTMARK]; // First Channel for TimeMark (sorted Hardware ID) ; -1 if not filled 129 130 uint16_t Adc_Data[1]; // final length defined by malloc .... 120 131 121 132 } EVENT ; … … 162 173 //--------------------------------------------------------------- 163 174 164 #define MAX_RUN 64 165 #define MAX_EVT 131072 175 #define MAX_RUN 256 176 #define MAX_EVT 32768 //don't worry, for events is MAX_RUN*MAX_EVT 177 178 typedef void* FileHandle_t ; 166 179 167 180 typedef struct { 168 181 uint32_t runId ; //run number 169 182 uint32_t lastTime ; //time when last event written so far 183 uint32_t closeTime ; //time when run should be closed 170 184 uint32_t nextEvt ; //next event number to be written 171 185 uint32_t waitEvt ; //event that would be ready to be written 172 int32_t fileId ; //id of open file 186 int32_t fileId ; //<0 never opened, 0=open, >0 closed 187 FileHandle_t fileHd ; //fileHandle (NULL if not open) 173 188 int16_t ctrlId[MAX_EVT] ; //index to buffId (sorted list; -1 =end) 174 189 uint16_t buffId[MAX_EVT] ; //index to mBuffer(buffered raw data) … … 188 203 int32_t nRoi ; 189 204 int32_t pcTime ; 205 int32_t evtLen ; 190 206 EVENT *fEvent ; 207 PEVNT_HEADER *FADhead; // 191 208 192 209 } WRK_DATA ; //internal to eventbuilder … … 197 214 198 215 typedef struct { 199 int readPtr ; //index of reading200 int writePtr ; //index of writing216 int frstPtr ; //first used index 217 int lastPtr ; //last used index 201 218 int evtBuf[MAX_EVT*MAX_RUN] ; //index of event in mBuffer 202 219 int evtStat[MAX_EVT*MAX_RUN] ; //status of event: … … 215 232 } EVT_CTRL ; 216 233 234 //--------------------------------------------------------------- 235 236 237 typedef struct { 238 struct sockaddr_in sockAddr ; 239 int sockDef ; //<0 not defined/ ==0 not to be used/ >0 used 240 } FACT_SOCK ; 217 241 218 242
Note:
See TracChangeset
for help on using the changeset viewer.