source: trunk/FACT++/src/EventBuilder.c@ 10934

Last change on this file since 10934 was 10773, checked in by tbretz, 13 years ago
File size: 27.4 KB
Line 
1
2
3
4
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdio.h>
9#include <time.h>
10#include <arpa/inet.h>
11#include <string.h>
12#include <math.h>
13#include <error.h>
14#include <errno.h>
15#include <unistd.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18#include <pthread.h>
19#include <sched.h>
20
21
22#include "EventBuilder.h"
23
/* When > 0, GenSock() addresses the test setup ("test on fact1") instead of the real boards. */
#define ETHTEST 0
/* min #bytes needed to interpret FADheader */
#define MIN_LEN 32
/*
 * Size constant of the read-buffer per socket.
 * The expansion is parenthesized so that the macro behaves as a single value
 * inside any expression (e.g. `x % MAX_LEN`).
 */
#define MAX_LEN (64 * 1024)
27
28int g_actTime = 0 ;
29int g_runStat = 40 ;
30int g_actBoards = 20 ;
31
32uint gi_SecRate[MAX_SOCK] ;
33uint gi_S10Rate[MAX_SOCK] ;
34uint gi_MinRate[MAX_SOCK] ;
35uint gi_ErrCnt[MAX_SOCK] ;
36
37uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
38
39uint gi_SecTime, gi_S10Time, gi_MinTime ;
40uint gi_EvtStart= 0 ;
41uint gi_EvtRead = 0 ;
42uint gi_EvtBad = 0 ;
43uint gi_EvtTot = 0 ;
44uint gw_EvtTot = 0 ;
45uint gp_EvtTot = 0 ;
46
47
48EVT_CTRL evtCtrl ; //control of events during processing
49int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
50
51WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
52
53
54
55
56RUN_HEAD actRun ;
57
58RUN_CTRL runCtrl[MAX_RUN] ;
59
60RUN_TAIL runTail[MAX_RUN] ;
61
62
63/*
64*** Definition of rdBuffer to read in IP packets; keep it global !!!!
65 */
66
67
68typedef union {
69 int8_t B[MAX_LEN/8];
70 int16_t S[MAX_LEN/4];
71 int32_t I[MAX_LEN/2];
72 int64_t L[MAX_LEN ];
73} CNV_FACT ;
74
75typedef struct {
76 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
77 int32_t bufPos ; //next byte to read to the buffer next
78 int32_t bufLen ; //number of bytes left to read
79
80 int sockStat ; //-1 if socket not yet connected
81 int socket ; //contains the sockets
82 struct sockaddr_in SockAddr ; //IP for each socket
83
84 int evtID ; // event ID of event currently read
85 int runID ; // run "
86//int evtPtr ; // index into evtCtrl structure
87 uint fadLen ; // FADlength of event currently read
88 int fadVers ; // Version of FAD
89 int board ; // boardID (softwareID: 0..40 )
90 int Port ;
91
92// int8_t *rBuf; //local buffer to be used when no event defined yet
93 CNV_FACT *rBuf ;
94
95} READ_STRUCT ;
96
97
/* A 16-bit word overlaid with its two bytes in memory order (used for the start/stop flags). */
typedef union {
    int8_t  B[2];   /* the two bytes of S as they lie in memory */
    int16_t S;      /* the same storage seen as one 16-bit value */
} SHORT_BYTE ;
102
103
104
105struct timespec xwait ;
106
107
108SHORT_BYTE start, stop;
109
110READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
111
112
113
114/*-----------------------------------------------------------------*/
115
116
117/*-----------------------------------------------------------------*/
118
119
120
121int GenSock(int flag, int crate0, int board0, int port0, READ_STRUCT *rd) {
122/*
123*** generate Address, create sockets and allocates readbuffer for it
124***
125*** if flag!=0 only close and redo the socket
126 */
127
128 int crate, board, port ;
129 char IPstr[100] ;
130 struct in_addr IPaddr ;
131
132 rd->sockStat = -1 ;
133
134
135 crate = crate0;
136 board = board0;
137 port = port0 ;
138
139
140 if (flag !=0 ) {
141 close(rd->socket) ;
142 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
143 error(1,errno,"Could not generate socket\n");
144 return -2 ;
145 }
146 return 0 ;
147 }
148
149
150 if (ETHTEST >0) {
151 port = port0+100*crate0+10*board0 ;
152 sprintf(IPstr,"10.0.%d.11",128+crate); // test on fact1
153// if (board==3) sprintf(IPstr,"10.0.100.11");
154
155// sprintf(IPstr,"10.0.131.11"); // test on fact1
156 inet_pton(PF_INET, IPstr, &IPaddr) ;
157 port = port0+100*crate0+10*board0 ;
158 } else {
159
160
161 sprintf(IPstr,"10.0.%d.%d",128+crate,128+board); // real environment
162 if ( inet_pton(PF_INET, IPstr, &IPaddr) <=0 ) {
163 error(1,errno,"Error: bad address c=%d b=%d '%s'\n", crate, board, IPstr);
164 return -1 ;
165 }
166 }
167
168 rd->Port = port ;
169 rd->board = crate0*10+board0 ;
170 rd->SockAddr.sin_family = PF_INET;
171 rd->SockAddr.sin_port = htons(port) ;
172 rd->SockAddr.sin_addr = IPaddr ;
173
174 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
175 error(1,errno,"Could not generate socket\n");
176 return -2 ;
177 } else {
178 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
179 if ( rd->rBuf == NULL ) {
180 error(1,errno,"Could not create local buffer\n");
181 return -3 ;
182 }
183 }
184
185 return 0 ;
186
187} /*-----------------------------------------------------------------*/
188
189
190int PrintErr() {
191
192 int k,c,b,p,s ;
193
194
195 k=0 ;
196 printf("Errors:\n");
197 for (c=0; c<4; c++) {
198 for (b=0; b<10; b++) {
199 s=0 ;
200 printf("c%d b%d: ",c,b);
201 for (p=1; p<8; p++) {
202 printf("%7d",gi_ErrCnt[k]);
203 s+=gi_ErrCnt[k];
204 }
205 printf("%8d\n",s);
206 }
207 }
208
209 return 0;
210} /*-----------------------------------------------------------------*/
211
212
213int PrintRate() {
214
215 int k,c,b,p,s ;
216
217
218 if (g_actTime > gi_SecTime) {
219 gi_SecTime = g_actTime ;
220 printf("Nr Ev start %d compl %d bad %d\n",gi_EvtStart,gi_EvtRead,gi_EvtBad) ;
221
222
223 k=0 ;
224 printf("Rate/Second:\n");
225 for (c=0; c<4; c++) {
226 for (b=0; b<10; b++) {
227 s=0 ;
228 printf("c%d b%d: ",c,b);
229 for (p=1; p<8; p++) {
230 printf("%7d",gi_SecRate[k]);
231 s+= gi_SecRate[k];
232 gi_S10Rate[k]+=gi_SecRate[k];
233 gi_SecRate[k++]=0 ;
234 }
235 printf("%8d\n",s);
236 }
237 }
238 for (b=0; b<NBOARDS; b++)
239 printf("%d ",gi_NumConnect[b]) ;
240 printf("\n");
241 }
242
243
244 if ( g_actTime%10 == 0 && g_actTime > gi_S10Time) {
245 gi_S10Time = g_actTime ;
246 k=0 ;
247 printf("Rate/10Second:\n");
248 for (c=0; c<4; c++) {
249 for (b=0; b<10; b++) {
250 s=0 ;
251 printf("c%d b%d: ",c,b);
252 for (p=1; p<8; p++) {
253 printf("%7d",gi_S10Rate[k]);
254 s+= gi_S10Rate[k];
255 gi_MinRate[k]+=gi_S10Rate[k];
256 gi_S10Rate[k++]=0 ;
257 }
258 printf("%8d\n",s);
259 }
260 }
261 }
262
263
264 if ( g_actTime%60 == 0 && g_actTime > gi_MinTime) {
265 gi_MinTime = g_actTime ;
266 k=0 ;
267 printf("Rate/Minute:\n");
268 for (c=0; c<4; c++) {
269 for (b=0; b<10; b++) {
270 printf("c%d b%d: ",c,b);
271 s=0 ;
272 for (p=1; p<8; p++) {
273 printf("%7d",gi_MinRate[k]);
274 s+= gi_MinRate[k];
275 gi_MinRate[k++]=0 ;
276 }
277 printf("%8d\n",s);
278 }
279 }
280 }
281
282 return 0;
283
284} /*-----------------------------------------------------------------*/
285
286
287
288
289int mBufInit() {
290// initialize mBuffer (mark all entries as unused\empty)
291
292 int i,j,k ;
293
294 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
295 mBuffer[i].evNum = mBuffer[i].runNum = -1;
296
297 evtCtrl.evtBuf[ i] = -1 ;
298 evtCtrl.evtStat[ i] = -1 ;
299 evtCtrl.pcTime[ i] = g_actTime + 50000000 ; //initiate to far future
300 }
301
302 for (i=0; i<MAX_RUN; i++) {
303 runCtrl[i].runId =
304 runCtrl[i].lastTime =
305 runCtrl[i].nextEvt = 0 ;
306 runCtrl[i].fileId = -2 ;
307
308 for (k=0; k<MAX_EVT; k++) runCtrl[i].buffId[k] = 0 ;
309
310 runTail[i].nEventsOk =
311 runTail[i].nEventsRej =
312 runTail[i].nEventsBad =
313 runTail[i].PCtime0 =
314 runTail[i].PCtimeX = 0 ;
315 }
316
317
318 evtCtrl.readPtr = 0 ;
319 evtCtrl.writePtr= 0 ;
320
321 return 0 ;
322
323} /*-----------------------------------------------------------------*/
324
325
326
327
328int mBufEvt(int evID, int runID, int nRoi) {
329// generate a new Event into mBuffer:
330// make sure only complete Event are possible, so 'free' will always work
331// returns index into mBuffer[], or negative value in case of error
332
333
334 int i, k, evFree ;
335
336 if (nRoi < 0 || nRoi > 1024) {
337 printf("illegal nRoi %d\n",nRoi) ;
338 return 99 ;
339 }
340
341 i = evID % MAX_EVT ;
342 evFree = -1 ;
343
344 for ( k=0; k<MAX_RUN; k++) {
345 if ( mBuffer[i].evNum == evID
346 && mBuffer[i].runNum== runID ) {
347 return i ;
348 }
349 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
350 i += MAX_EVT ;
351 }
352
353
354 //event does not yet exist; create
355 if (evFree < 0 ) { //no space available in ctrl
356 error(0,0, "no space left to keep event...") ;
357 return -1 ;
358 }
359
360
361
362 i = evFree ; //found free entry; use it ...
363
364 mBuffer[i].fEvent = malloc( sizeof(EVENT) ) ;
365 if (mBuffer[i].fEvent == NULL) return -12;
366
367
368 mBuffer[i].fEvent->StartPix = malloc( NPIX * sizeof(int16_t) ) ;
369 if (mBuffer[i].fEvent->StartPix == NULL) {
370 free(mBuffer[i].fEvent) ;
371 mBuffer[i].fEvent = NULL ;
372 mBuffer[i].nRoi = -3 ;
373 return -13;
374 }
375 for (k=0; k<NPIX; k++) mBuffer[i].fEvent->StartPix[k] = -1 ;
376
377 mBuffer[i].fEvent->StartTM = malloc( NTMARK * sizeof(int16_t) ) ;
378 if (mBuffer[i].fEvent->StartTM == NULL) {
379 free(mBuffer[i].fEvent->StartPix ) ;
380 free(mBuffer[i].fEvent) ;
381 mBuffer[i].fEvent = NULL ;
382 mBuffer[i].nRoi = -4 ;
383 return -14;
384 }
385 for (k=0; k<NTMARK; k++) mBuffer[i].fEvent->StartTM[k] = -1 ;
386
387 mBuffer[i].fEvent->Adc_Data = malloc( NPIX * nRoi * sizeof(int16_t) ) ;
388 if (mBuffer[i].fEvent->Adc_Data == NULL) {
389 free(mBuffer[i].fEvent->StartTM ) ;
390 free(mBuffer[i].fEvent->StartPix ) ;
391 free(mBuffer[i].fEvent) ;
392 mBuffer[i].fEvent = NULL ;
393 mBuffer[i].nRoi = -5 ;
394 return -15;
395 }
396
397 mBuffer[i].fEvent->Adc_Tmark= malloc( NTMARK* nRoi * sizeof(int16_t) ) ;
398 if (mBuffer[i].fEvent->Adc_Tmark== NULL) {
399 free(mBuffer[i].fEvent->Adc_Data ) ;
400 free(mBuffer[i].fEvent->StartTM ) ;
401 free(mBuffer[i].fEvent->StartPix ) ;
402 free(mBuffer[i].fEvent) ;
403 mBuffer[i].fEvent = NULL ;
404 mBuffer[i].nRoi = -6 ;
405 return -16;
406 }
407
408 mBuffer[i].fEvent->FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
409 if (mBuffer[i].fEvent->FADhead== NULL) {
410 free(mBuffer[i].fEvent->Adc_Tmark ) ;
411 free(mBuffer[i].fEvent->Adc_Data ) ;
412 free(mBuffer[i].fEvent->StartTM ) ;
413 free(mBuffer[i].fEvent->StartPix ) ;
414 free(mBuffer[i].fEvent) ;
415 mBuffer[i].fEvent = NULL ;
416 mBuffer[i].nRoi = -7 ;
417 return -17;
418 }
419
420 mBuffer[i].nBoard = 0 ;
421 for (k=0; k<NBOARDS; k++ ) {
422 mBuffer[i].board[k] = -1;
423 }
424
425 mBuffer[i].pcTime = g_actTime ;
426 mBuffer[i].nRoi = nRoi ;
427 mBuffer[i].evNum = evID ;
428 mBuffer[i].runNum = runID ;
429
430
431 //register event in 'active list (reading)'
432
433 evtCtrl.evtBuf[ evtCtrl.readPtr] = i ;
434 evtCtrl.evtStat[ evtCtrl.readPtr] = 0 ;
435 evtCtrl.pcTime[ evtCtrl.readPtr] = g_actTime ;
436 evtIdx[i] = evtCtrl.readPtr ;
437
438 evtCtrl.readPtr++ ;
439 if (evtCtrl.readPtr == MAX_EVT*MAX_RUN ) evtCtrl.readPtr = 0;
440
441 gi_EvtStart++ ;
442
443 //check if runId already registered in runCtrl
444 evFree = -1 ;
445 for (k=0; k<MAX_RUN; k++) {
446 if (runCtrl[k].runId == runID ) return i ;//run exists already
447 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
448 }
449
450 if (evFree <0 ) {
451 error(0,0, "not able to register the new run %d\n",runID);
452 } else {
453 runCtrl[evFree].runId = runID ;
454 }
455
456
457 return i ;
458
459} /*-----------------------------------------------------------------*/
460
461
462int mBufFree(int i) {
463//delete entry [i] from mBuffer:
464//(and make sure multiple calls do no harm ....)
465
466 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
467 free(mBuffer[i].fEvent->Adc_Tmark) ;
468 free(mBuffer[i].fEvent->Adc_Data ) ;
469 free(mBuffer[i].fEvent->StartTM ) ;
470 free(mBuffer[i].fEvent->StartPix ) ;
471 free(mBuffer[i].fEvent ) ;
472 mBuffer[i].fEvent = NULL ;
473 }
474 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
475
476 return 0 ;
477
478} /*-----------------------------------------------------------------*/
479
480
481 /*-----------------------------------------------------------------*/
482
483
484
485int initReadFAD() {
486/* *** initialize reading of FAD data */
487 int32_t i,j,k ;
488 int c,b,p ;
489
490
491 g_actTime = time(NULL) ;
492
493 k = 0 ;
494 for ( c=0; c<4; c++ )
495 for (b=0; b<10; b++ )
496 for (p=5001; p<5008; p++) {
497 j = GenSock(0,c,b,p, &rd[k]) ;
498 if ( j != 0 ) printf("problem with c%d b%d p%d\n",c,b,p);
499// else printf("ok socket %d = %d\n",k,rd[k].socket) ;
500 k++ ;
501 }
502
503 for (k=0; k<MAX_SOCK; k++)
504 gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
505
506 for (k=0; k<NBOARDS; k++)
507 gi_NumConnect[k]=0;
508
509
510 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
511
512 return NULL;
513
514} /*-----------------------------------------------------------------*/
515
516
517
518void *readFAD( void *ptr ) {
519/* *** main loop reading FAD data and sorting them to complete events */
520 int head_len,frst_len,numok,numok2,evFree,dest,evID,i,j,k ;
521 int32_t jrd ;
522 int8_t FADbyte0, FADbyte1, FADbyteX0, FADbyteX1 ;
523 int32_t myRun, cleanTime ;
524 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
525 int reqBoards = 40 ;
526
527 int goodevt=0;
528 int goodhed=0;
529 int nbuf=0;
530 int ret ;
531
532 int waitTime = 10 ; //maximum nr of seconds wait for delayed packets
533
534 int nokCnt[MAX_SOCK],loopCnt=0;
535 int sokCnt[MAX_SOCK];
536
537 for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
538
539
540 head_len = sizeof(PEVNT_HEADER) ;
541 frst_len = head_len + 36 * 12 ;
542 if (head_len < MIN_LEN) { printf("headLen ...\n"); exit(99);}
543
544 numok = numok2 = 0 ;
545
546 start.S=0xFB01;
547 stop.S= 0x04FE;
548
549 myRun = g_actTime ;
550 cleanTime = g_actTime ; //once per second cleanup buffers from too old data
551
552
553
554
555 mBufInit() ;
556
557
558 while (g_runStat > 0) { //loop until global variable g_stop is set
559
560
561 g_actTime = time(NULL) ;
562 nokCnt[numok]++;
563
564 loopCnt++ ;
565
566 numok = 0 ; //count number of succesfull actions
567
568 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
569
570 if (rd[i].sockStat <0 ) { //try to connect if not yet done
571 rd[i].sockStat=connect(rd[i].socket,
572 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
573 if (rd[i].sockStat >=0 ) { //successfull ==>
574 rd[i].bufTyp = 0 ; // expect a header
575 rd[i].bufLen = frst_len ; // max size to read at begining
576 rd[i].bufPos = 0 ; // no byte read so far
577 gi_NumConnect[ rd[i].board ]++ ;
578 printf("+++connect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
579 }
580 }
581
582 if (rd[i].sockStat >=0) { //we have a connection ==> try to read
583 numok++ ;
584 sokCnt[i]++;
585 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
586
587 if (jrd == 0) { //connection has closed ...
588 rd[i].sockStat = -1 ; //flag (try to reopen next round)
589 error(0,errno,"Socket %d closed",i);
590 j = GenSock(1,0,0,0, &rd[i]) ;
591 gi_ErrCnt[i]++ ;
592 gi_NumConnect[ rd[i].board ]-- ;
593 printf("disconnect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
594 } else if ( jrd<0 ) { //did not read anything
595 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
596 error(1,errno,"Error Reading from %d",i);
597 gi_ErrCnt[i]++ ;
598 } else numok-- ; //else nothing waiting to be read
599
600 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
601//printf("received data %d %d\n", i,jrd);
602
603 if ( jrd < rd[i].bufLen ) { //not yet all read
604 rd[i].bufPos += jrd ; //==> prepare for continuation
605 rd[i].bufLen -= jrd ;
606 } else { //full dataset read
607 rd[i].bufLen = rd[i].bufPos + j ;
608 rd[i].bufPos = rd[i].fadLen ;
609 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
610 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
611 gi_ErrCnt[i]++ ;
612 printf( "wrong end of buffer found %d\n",rd[i].bufPos);
613 exit(1) ;
614 goto EndBuf ;
615
616 }
617
618 //we have a complete buffer, copy to WORK area
619 gi_SecRate[i]++ ;
620
621 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
622 //get index into mBuffer for this event (create if needed)
623 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
624
625 if (evID < 0) {
626 printf("no space left ...%d\n",evID) ;
627 exit(2) ;
628 goto EndBuf ;
629 }
630
631 //we have a valid entry in mBuffer[]; fill it
632
633 boardId = rd[i].board ;
634 if ( mBuffer[evID].board[ boardId ] != -1) { //this board already stored ...
635 printf( "board of this event already stored ...") ;
636 } else {
637
638 int iDx = evtIdx[evID] ; //index into evtCtrl
639
640 memcpy( &mBuffer[evID].fEvent->FADhead[boardId].start_package_flag,
641 &rd[i].rBuf->S[0], head_len) ;
642 mBuffer[evID].board[ boardId ] = boardId ;
643 roi = mBuffer[evID].nRoi ;
644
645 pixS = boardId*36 -1 ; //
646 tmS = boardId*4 -1 ; //
647 src = head_len/2 ;
648 for ( drs=0; drs<4; drs++ ) {
649 for ( px=0; px<9; px++ ) {
650 pixH= ntohs(rd[i].rBuf->S[src++]) ;
651 pixC= ntohs(rd[i].rBuf->S[src++]) ;
652 pixR= ntohs(rd[i].rBuf->S[src++]) ;
653
654 src++ ;
655 pixS++ ; //pixS = pixH2S[pixH] ;
656 if (pixR != roi ) {
657 if (px == 8 && pixR == 2*roi ) {
658 } else {
659 printf("wrong roi %d %d %d %d\n",px,pixR,roi,src-2);
660//exit(66);
661 }
662// goto EndBuf ;
663 }
664
665 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
666 dest= pixS * roi ;
667 memcpy(
668 &mBuffer[evID].fEvent->Adc_Data[dest],
669 &rd[i].rBuf->S[src], roi * 2) ;
670 src+= roi ;
671
672 // if (px==8 && roi < 512 ) {
673 // tmS++ ;
674 // dest= tmS * roi ;
675 // mBuffer[evID].fEvent->StartTM[pixS] =pixC+roi;
676 // memcpy(
677 // &mBuffer[evID].fEvent->Adc_Tmark[dest],
678 // &rd[i].rBuf.S[src], roi * 2) ;
679// ?? not in the simulator ... src+= roi ;
680 // }
681 }
682 }
683 evtCtrl.evtStat[ iDx ]++ ;
684 evtCtrl.pcTime[ iDx ] = g_actTime ;
685
686 if (++mBuffer[evID].nBoard == 19 ) {
687 //complete event read ---> flag for next processing
688 evtCtrl.evtStat[ iDx ] = 99;
689 gi_EvtRead++ ;
690 gi_EvtTot++ ;
691 printf("complete event --------------------------------------------------\n");
692 }
693 }// now we have stored a new board contents into Event structure
694
695EndBuf:
696 rd[i].bufTyp = 0 ; //ready to read next header
697 rd[i].bufLen = frst_len ;
698 rd[i].bufPos = 0 ;
699 }
700
701 } else { //we are reading event header
702 rd[i].bufPos += jrd ;
703 rd[i].bufLen -= jrd ;
704 if ( rd[i].bufPos > MIN_LEN ){ //sufficient data to take action
705 //check if startflag correct; else shift block ....
706 for (k=0; k<rd[i].bufPos -1 ; k++) {
707 if (rd[i].rBuf->B[k ] == start.B[1]
708 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
709 }
710 if (k >= rd[i].bufPos-1 ) { //no start of header found
711 printf("no start of header found !!!!\n");
712 rd[i].bufPos = 0 ;
713 rd[i].bufLen = head_len ;
714 } else if ( k>0 ) {
715 rd[i].bufPos -= k ;
716 rd[i].bufLen += k ;
717 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
718 }
719 if ( rd[i].bufPos > MIN_LEN ) {
720 goodhed++;
721 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; ///???
722 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
723 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ;
724 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
725 printf("received event %d %d\n",rd[i].evtID,i);
726 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
727 rd[i].bufTyp = 1 ; //ready to read full record
728 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
729 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ;
730 }
731 }
732 } //end interpreting last read
733 } //end of successful read anything
734 } //finished trying to read all sockets
735
736
737 g_actTime = time(NULL) ;
738 if ( g_actTime > gi_SecTime ) {
739// PrintRate() ;
740
741 //loop over all active events and flag those older than read-timeout
742
743 int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
744 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
745 for ( k=evtCtrl.writePtr; k<(evtCtrl.writePtr+kd); k++ ) {
746 int k0 = k % (MAX_EVT*MAX_RUN) ;
747
748 if (evtCtrl.evtStat[k0] > 0
749 && evtCtrl.evtStat[k0] < 90
750 && evtCtrl.pcTime[k0] < g_actTime-10 ) {
751 evtCtrl.evtStat[k0] = 91 ;
752 gi_EvtBad++ ;
753 gi_EvtTot++ ;
754 }
755 }
756 }
757
758
759 if (numok > 0 ) numok2=0;
760 else if (numok2++ > 3) {
761 if (g_runStat == 1) {
762 xwait.tv_sec = 1;
763 xwait.tv_nsec= 0 ; // hibernate for 1 sec
764 } else {
765 xwait.tv_sec = 0;
766 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
767// xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
768 }
769// printf("sleeping ...\n");
770 nanosleep( &xwait , NULL ) ;
771 }
772
773
774
775
776 } //and do next loop over all sockets ...
777
778 return NULL;
779} /*-----------------------------------------------------------------*/
780
781
782void *procEvt( void *ptr ) {
783/* *** main loop processing file, including SW-trigger */
784 int numProc ;
785 int k,k1,k2,kd ;
786
787 while (g_runStat > 0) {
788
789
790 kd = evtCtrl.readPtr - evtCtrl.writePtr ;
791 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
792 k1=evtCtrl.writePtr;
793 k2=evtCtrl.writePtr+kd;
794
795 numProc = 0 ;
796 if (gp_EvtTot < gi_EvtTot) {
797 for ( k=k1; k<k2; k++ ) {
798 int k0 = k % (MAX_EVT*MAX_RUN) ;
799
800 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
801 //ready to be processed ...
802 int id = evtCtrl.evtBuf[k0] ;
803 uint32_t irun = mBuffer[id].runNum ;
804 int ievt = mBuffer[id].evNum ;
805printf("processing %d %d %d %d\n",ievt,k,evtCtrl.evtStat[k0],evtCtrl.writePtr) ;
806 numProc++ ;
807 evtCtrl.evtStat[k0] = 501 ;
808 gp_EvtTot++ ;
809 }
810 }
811 }
812 if (numProc == 0) {
813 //seems we have nothing to do, so sleep a little
814 xwait.tv_sec = 0;
815 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
816 nanosleep( &xwait , NULL ) ;
817 }
818
819 }
820 return NULL;
821
822} /*-----------------------------------------------------------------*/
823
824
825void *writeEvt( void *ptr ) {
826/* *** main loop writing event (including opening and closing run-files */
827
828 int numWrite = 0 ;
829 int j,id,irun,ievt ;
830
831 while (g_runStat > 0) { //loop until global variable g_stop is set
832
833 //loop over buffered events and check if something to write ...
834
835 if ( gp_EvtTot == gw_EvtTot ) {
836 //there is for sure nothing to do --> sleep a little
837 xwait.tv_sec = 0;
838 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
839 nanosleep( &xwait , NULL ) ;
840
841 } else { //go through evtCtrl list to check if there might be something
842
843 //if run-file not yet opened==> open runfile (better to store headers in own structure ?)
844
845 //if eventid == next event for run ==> write it (or flag it)
846 //if eventid > next event exists, and nothing new for >time out ==> write it
847 //if nothing for this run for >timeout ==> close run
848
849 int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
850 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
851 int k,k1,k2 ;
852
853
854 k1=evtCtrl.writePtr;
855 k2=evtCtrl.writePtr+kd;
856
857 int evtTot=gw_EvtTot ;
858 for ( k=k1; k<k2; k++ ) {
859 int k0 = k % (MAX_EVT*MAX_RUN) ;
860
861 if (evtCtrl.evtStat[k0] > 500 ) { //ready to be written ...
862 id = evtCtrl.evtBuf[k0] ;
863 irun = mBuffer[id].runNum ;
864 ievt = mBuffer[id].evNum ;
865
866 for ( j=0; j<MAX_RUN; j++) {
867 if ( runCtrl[j].runId == irun ) break ;
868 }
869 if ( j >= MAX_RUN ) {
870 printf("error: can not find run %d\n", irun);
871 exit(111);
872 }
873
874 if (runCtrl[j].fileId < 0 ) {
875 printf("open new run_file %d\n",irun) ;
876 runCtrl[j].fileId = 999 ; // should be a function call
877 runCtrl[j].nextEvt= 0;
878 runCtrl[j].lastTime=g_actTime ;
879 }
880
881 if (runCtrl[j].nextEvt == ievt ) { //write this event
882 printf("write event %d (run %d %d)\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
883 runCtrl[j].nextEvt= ievt+1;
884 runCtrl[j].lastTime=g_actTime ;
885 evtCtrl.evtStat[k0]= -1 ;
886 gw_EvtTot++ ;
887 numWrite++ ;
888// evtCtrl.writePtr=k+1;
889 } else if ( ievt < runCtrl[j].nextEvt ) {
890 printf("delayed event (run %d %d %d) skipped\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
891 evtCtrl.evtStat[k0]= -1 ;
892// evtCtrl.writePtr=k+1;
893 gw_EvtTot++ ;
894 numWrite++ ;
895 }
896 }
897
898
899
900 if ( runCtrl[j].lastTime < g_actTime-15) {
901 printf("non existing event skip %d (run %d -> %d)\n",runCtrl[j].nextEvt,irun,ievt) ;
902 runCtrl[j].nextEvt++;
903 numWrite++;
904 }
905
906 for ( j=0; j<MAX_RUN; j++) {
907 if ( runCtrl[j].runId >0 && runCtrl[j].lastTime < g_actTime-120) {
908 printf("close run %d (timeout)\n",irun) ;
909 runCtrl[j].fileId = -2 ;
910 runCtrl[j].runId = 0 ;
911 }
912 }
913 if (numWrite == 0 ) {
914 //nothing to do at the moment ==> sleep a little
915 xwait.tv_sec = 0;
916 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
917 nanosleep( &xwait , NULL ) ;
918 }
919
920 }
921 }
922
923
924
925
926
927
928
929
930
931
932
933
934 return NULL;
935 }
936
937 return NULL;
938
939} /*-----------------------------------------------------------------*/
940
941
942
943/*
944int main() {
945 int i,th_ret[50] ;
946 pthread_t thread[50] ;
947
948 initReadFAD() ;
949 i=0 ;
950 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, (void*) i++ );
951 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, (void*) i++ );
952 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, (void*) i++ );
953
954 for(;;) { sleep(1); }
955
956
957}
958*/
Note: See TracBrowser for help on using the repository browser.