source: trunk/FACT++/src/EventBuilder.c @ 10796

Last change on this file since 10796 was 10773, checked in by tbretz, 9 years ago
File size: 27.4 KB
Line 
1
2
3
4
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdio.h>
9#include <time.h>
10#include <arpa/inet.h>
11#include <string.h>
12#include <math.h>
13#include <error.h>
14#include <errno.h>
15#include <unistd.h>
16#include <sys/types.h> 
17#include <sys/socket.h>
18#include <pthread.h>
19#include <sched.h>
20
21
22#include "EventBuilder.h"
23
24#define ETHTEST   0
25#define MIN_LEN  32        // min #bytes needed to interpret FADheader
26#define MAX_LEN  64*1024   // size of read-buffer per socket
27   
28int g_actTime   =  0 ;
29int g_runStat   = 40 ;
30int g_actBoards = 20 ;
31
32uint gi_SecRate[MAX_SOCK] ;
33uint gi_S10Rate[MAX_SOCK] ;
34uint gi_MinRate[MAX_SOCK] ;
35uint gi_ErrCnt[MAX_SOCK] ;
36
37uint gi_NumConnect[NBOARDS];   //4 crates * 10 boards
38
39uint gi_SecTime, gi_S10Time, gi_MinTime ;
40uint gi_EvtStart= 0 ;
41uint gi_EvtRead = 0 ;
42uint gi_EvtBad  = 0 ;
43uint gi_EvtTot  = 0 ;
44uint gw_EvtTot  = 0 ;
45uint gp_EvtTot  = 0 ;
46
47
48EVT_CTRL    evtCtrl ;       //control of events during processing
49int         evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
50
51WRK_DATA    mBuffer[MAX_EVT*MAX_RUN]; //local working space
52
53
54
55
56RUN_HEAD    actRun ;
57
58RUN_CTRL    runCtrl[MAX_RUN] ;
59
60RUN_TAIL    runTail[MAX_RUN] ;
61
62
63/*
64*** Definition of rdBuffer to read in IP packets; keep it global !!!!
65 */
66
67
68typedef union {
69  int8_t  B[MAX_LEN/8];
70  int16_t S[MAX_LEN/4];
71  int32_t I[MAX_LEN/2];
72  int64_t L[MAX_LEN  ];
73} CNV_FACT ;
74
75typedef struct {
76  int  bufTyp ;          //what are we reading at the moment: 0=header 1=data -1=skip ...
77  int32_t  bufPos ;          //next byte to read to the buffer next
78  int32_t  bufLen ;          //number of bytes left to read
79
80  int  sockStat   ;      //-1 if socket not yet connected
81  int  socket     ;      //contains the sockets
82  struct sockaddr_in SockAddr ; //IP for each socket
83
84  int  evtID  ;          // event ID of event currently read
85  int  runID  ;          // run       "
86//int  evtPtr ;          // index into evtCtrl structure
87  uint  fadLen ;          // FADlength of event currently read
88  int  fadVers ;         // Version of FAD
89  int  board ;           // boardID (softwareID: 0..40 )
90  int  Port ;
91
92//  int8_t *rBuf;          //local buffer to be used when no event defined yet
93  CNV_FACT *rBuf ;
94
95} READ_STRUCT ;
96
97
98typedef union {
99  int8_t  B[2];
100  int16_t S  ;
101} SHORT_BYTE ;
102
103
104
105struct timespec xwait ;
106
107
108SHORT_BYTE  start, stop;
109
110READ_STRUCT rd[MAX_SOCK] ;  //buffer to read IP and afterwards store in mBuffer
111
112
113
114/*-----------------------------------------------------------------*/
115
116
117/*-----------------------------------------------------------------*/
118
119
120
121int GenSock(int flag, int crate0, int board0, int port0, READ_STRUCT *rd) {
122/*
123*** generate Address, create sockets and allocates readbuffer for it
124***
125*** if flag!=0 only close and redo the socket
126 */
127
128  int crate, board, port ;
129  char IPstr[100] ;
130  struct in_addr IPaddr ;
131
132  rd->sockStat = -1 ;
133
134
135  crate = crate0;
136  board = board0;
137  port  = port0 ;
138
139
140  if (flag !=0 ) {
141     close(rd->socket) ;
142     if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
143        error(1,errno,"Could not generate socket\n");
144        return -2 ;
145     }
146     return 0 ;
147  }
148
149
150  if (ETHTEST >0) {
151     port = port0+100*crate0+10*board0 ;
152     sprintf(IPstr,"10.0.%d.11",128+crate); // test on fact1
153//      if (board==3) sprintf(IPstr,"10.0.100.11");
154
155//     sprintf(IPstr,"10.0.131.11"); // test on fact1
156     inet_pton(PF_INET, IPstr, &IPaddr) ;
157     port = port0+100*crate0+10*board0 ;
158  } else {
159
160
161     sprintf(IPstr,"10.0.%d.%d",128+crate,128+board); // real environment
162     if ( inet_pton(PF_INET, IPstr, &IPaddr) <=0 ) {
163        error(1,errno,"Error: bad address c=%d b=%d '%s'\n", crate, board, IPstr);
164        return -1 ;
165     }
166  } 
167
168     rd->Port  = port ;
169     rd->board = crate0*10+board0 ;
170     rd->SockAddr.sin_family = PF_INET;
171     rd->SockAddr.sin_port = htons(port) ;
172     rd->SockAddr.sin_addr = IPaddr ;
173
174     if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
175        error(1,errno,"Could not generate socket\n");
176        return -2 ;
177     } else {
178       rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
179       if ( rd->rBuf == NULL ) {
180          error(1,errno,"Could not create local buffer\n");
181          return -3 ;
182       }
183     }
184
185  return 0 ;
186
187} /*-----------------------------------------------------------------*/
188
189
190int PrintErr() {
191
192  int k,c,b,p,s ;
193
194
195       k=0 ;
196       printf("Errors:\n");
197       for (c=0; c<4; c++) {
198          for (b=0; b<10; b++) {
199             s=0 ;
200             printf("c%d b%d: ",c,b);
201             for (p=1; p<8; p++) {
202                printf("%7d",gi_ErrCnt[k]);
203                s+=gi_ErrCnt[k];
204             }
205             printf("%8d\n",s);
206          }
207       }
208
209       return 0;
210} /*-----------------------------------------------------------------*/
211
212
213int PrintRate() {
214
215  int k,c,b,p,s ;
216
217
218    if (g_actTime > gi_SecTime) {
219       gi_SecTime = g_actTime ;
220       printf("Nr Ev start %d compl %d bad %d\n",gi_EvtStart,gi_EvtRead,gi_EvtBad) ;
221
222
223       k=0 ;
224       printf("Rate/Second:\n");
225       for (c=0; c<4; c++) {
226          for (b=0; b<10; b++) {
227             s=0 ;
228             printf("c%d b%d: ",c,b);
229             for (p=1; p<8; p++) {
230                printf("%7d",gi_SecRate[k]);
231                s+=            gi_SecRate[k];
232                gi_S10Rate[k]+=gi_SecRate[k];
233                gi_SecRate[k++]=0 ;
234             }
235             printf("%8d\n",s);
236          }
237       }
238       for (b=0; b<NBOARDS; b++)
239          printf("%d ",gi_NumConnect[b]) ;
240       printf("\n");
241    }
242
243
244    if ( g_actTime%10 == 0 && g_actTime > gi_S10Time) {
245       gi_S10Time = g_actTime ;
246       k=0 ;
247       printf("Rate/10Second:\n");
248       for (c=0; c<4; c++) {
249          for (b=0; b<10; b++) {
250             s=0 ;
251             printf("c%d b%d: ",c,b);
252             for (p=1; p<8; p++) {
253                printf("%7d",gi_S10Rate[k]);
254                s+=            gi_S10Rate[k];
255                gi_MinRate[k]+=gi_S10Rate[k];
256                gi_S10Rate[k++]=0 ;
257             }
258             printf("%8d\n",s);
259          }
260       }
261    }
262
263
264    if ( g_actTime%60 == 0 && g_actTime > gi_MinTime) {
265       gi_MinTime = g_actTime ;
266       k=0 ;
267       printf("Rate/Minute:\n");
268       for (c=0; c<4; c++) {
269          for (b=0; b<10; b++) {
270             printf("c%d b%d: ",c,b);
271             s=0 ;
272             for (p=1; p<8; p++) {
273                printf("%7d",gi_MinRate[k]);
274                s+=          gi_MinRate[k];
275                gi_MinRate[k++]=0 ;
276             }
277             printf("%8d\n",s);
278          }
279       }
280    }
281 
282      return 0;
283
284} /*-----------------------------------------------------------------*/
285
286
287
288
289int mBufInit() {
290// initialize mBuffer (mark all entries as unused\empty)
291
292   int i,j,k ;
293
294   for (i=0; i<MAX_EVT*MAX_RUN; i++) {
295      mBuffer[i].evNum = mBuffer[i].runNum = -1;
296
297      evtCtrl.evtBuf[  i] = -1 ;
298      evtCtrl.evtStat[ i] = -1 ;
299      evtCtrl.pcTime[  i] = g_actTime + 50000000 ;  //initiate to far future
300   }
301
302   for (i=0; i<MAX_RUN; i++) {
303      runCtrl[i].runId = 
304      runCtrl[i].lastTime = 
305      runCtrl[i].nextEvt = 0 ;
306      runCtrl[i].fileId = -2 ;
307
308      for (k=0; k<MAX_EVT; k++) runCtrl[i].buffId[k] = 0 ;
309
310      runTail[i].nEventsOk =
311      runTail[i].nEventsRej =
312      runTail[i].nEventsBad =
313      runTail[i].PCtime0 =
314      runTail[i].PCtimeX = 0 ;
315   }
316
317
318   evtCtrl.readPtr =  0 ;
319   evtCtrl.writePtr=  0 ;
320
321   return 0 ;
322
323} /*-----------------------------------------------------------------*/
324
325
326
327
328int mBufEvt(int evID, int runID, int nRoi) {
329// generate a new Event into mBuffer:   
330// make sure only complete Event are possible, so 'free' will always work
331// returns index into mBuffer[], or negative value in case of error
332
333
334   int i, k, evFree ;
335
336   if (nRoi < 0 || nRoi > 1024) {
337      printf("illegal nRoi %d\n",nRoi) ;
338      return 99 ;
339   }
340
341   i = evID % MAX_EVT ;
342   evFree = -1 ;
343
344   for ( k=0; k<MAX_RUN; k++) {
345      if ( mBuffer[i].evNum == evID
346        && mBuffer[i].runNum== runID ) {
347         return i ; 
348      }
349      if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
350      i += MAX_EVT ;
351   }
352
353
354   //event does not yet exist; create
355   if (evFree < 0 ) {        //no space available in ctrl
356        error(0,0, "no space left to keep event...") ;
357        return -1 ;
358   }
359
360   
361
362   i = evFree ;   //found free entry; use it ...
363
364   mBuffer[i].fEvent  = malloc( sizeof(EVENT) ) ;
365   if (mBuffer[i].fEvent  == NULL) return -12;
366
367
368   mBuffer[i].fEvent->StartPix  = malloc( NPIX * sizeof(int16_t) ) ;
369   if (mBuffer[i].fEvent->StartPix  == NULL) {
370      free(mBuffer[i].fEvent) ;
371      mBuffer[i].fEvent = NULL ;
372      mBuffer[i].nRoi = -3 ;
373      return -13;
374   }
375   for (k=0; k<NPIX; k++) mBuffer[i].fEvent->StartPix[k] = -1 ;
376
377   mBuffer[i].fEvent->StartTM   = malloc( NTMARK * sizeof(int16_t) ) ;
378   if (mBuffer[i].fEvent->StartTM  == NULL) {
379      free(mBuffer[i].fEvent->StartPix ) ;
380      free(mBuffer[i].fEvent) ;
381      mBuffer[i].fEvent = NULL ;
382      mBuffer[i].nRoi = -4 ;
383      return -14;
384   }
385   for (k=0; k<NTMARK; k++) mBuffer[i].fEvent->StartTM[k] = -1 ;
386
387   mBuffer[i].fEvent->Adc_Data = malloc( NPIX  * nRoi * sizeof(int16_t) ) ;
388   if (mBuffer[i].fEvent->Adc_Data == NULL) {
389      free(mBuffer[i].fEvent->StartTM  ) ;
390      free(mBuffer[i].fEvent->StartPix ) ;
391      free(mBuffer[i].fEvent) ;
392      mBuffer[i].fEvent = NULL ;
393      mBuffer[i].nRoi = -5 ;
394      return -15;
395   }
396
397   mBuffer[i].fEvent->Adc_Tmark= malloc( NTMARK* nRoi * sizeof(int16_t) ) ;
398   if (mBuffer[i].fEvent->Adc_Tmark== NULL) {
399      free(mBuffer[i].fEvent->Adc_Data ) ;
400      free(mBuffer[i].fEvent->StartTM  ) ;
401      free(mBuffer[i].fEvent->StartPix ) ;
402      free(mBuffer[i].fEvent) ;
403      mBuffer[i].fEvent = NULL ;
404      mBuffer[i].nRoi = -6 ;
405      return -16;
406   }
407
408   mBuffer[i].fEvent->FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
409   if (mBuffer[i].fEvent->FADhead== NULL) {
410      free(mBuffer[i].fEvent->Adc_Tmark ) ;
411      free(mBuffer[i].fEvent->Adc_Data ) ;
412      free(mBuffer[i].fEvent->StartTM  ) ;
413      free(mBuffer[i].fEvent->StartPix ) ;
414      free(mBuffer[i].fEvent) ;
415      mBuffer[i].fEvent = NULL ;
416      mBuffer[i].nRoi = -7 ;
417      return -17;
418   }
419
420   mBuffer[i].nBoard = 0 ;
421   for (k=0; k<NBOARDS; k++ ) {
422      mBuffer[i].board[k] = -1;
423   }
424
425   mBuffer[i].pcTime  = g_actTime ;
426   mBuffer[i].nRoi    = nRoi ;
427   mBuffer[i].evNum   = evID  ;
428   mBuffer[i].runNum  = runID ;
429
430
431   //register event in 'active list (reading)'
432
433   evtCtrl.evtBuf[  evtCtrl.readPtr] = i ;
434   evtCtrl.evtStat[ evtCtrl.readPtr] = 0 ;
435   evtCtrl.pcTime[  evtCtrl.readPtr] = g_actTime ;
436   evtIdx[i] = evtCtrl.readPtr ;
437
438   evtCtrl.readPtr++ ;
439   if (evtCtrl.readPtr == MAX_EVT*MAX_RUN ) evtCtrl.readPtr = 0;
440
441   gi_EvtStart++ ;
442
443   //check if runId already registered in runCtrl
444   evFree = -1 ;
445   for (k=0; k<MAX_RUN; k++) {
446      if (runCtrl[k].runId == runID ) return i ;//run exists already
447      else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
448   }
449   
450   if (evFree <0 ) {
451      error(0,0, "not able to register the new run %d\n",runID);
452   } else {
453      runCtrl[evFree].runId = runID ;
454   }
455
456
457   return i ;
458 
459} /*-----------------------------------------------------------------*/
460
461
462int mBufFree(int i) {
463//delete entry [i] from mBuffer:
464//(and make sure multiple calls do no harm ....)
465
466   if ( mBuffer[i].nRoi > 0) {      //have an fEvent structure generated ...
467      free(mBuffer[i].fEvent->Adc_Tmark) ;
468      free(mBuffer[i].fEvent->Adc_Data ) ;
469      free(mBuffer[i].fEvent->StartTM  ) ;
470      free(mBuffer[i].fEvent->StartPix ) ;
471      free(mBuffer[i].fEvent ) ;
472      mBuffer[i].fEvent = NULL ;
473   }
474   mBuffer[i].evNum   = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
475   
476   return 0 ;
477 
478} /*-----------------------------------------------------------------*/
479
480
481  /*-----------------------------------------------------------------*/
482
483
484
485int initReadFAD() {
486/* *** initialize reading of FAD data */
487  int32_t i,j,k ;
488  int c,b,p ;
489
490
491  g_actTime = time(NULL) ;
492
493  k = 0 ;
494  for ( c=0; c<4; c++ )
495     for (b=0; b<10; b++ )
496        for (p=5001; p<5008; p++) {
497          j = GenSock(0,c,b,p, &rd[k]) ;
498          if ( j != 0 ) printf("problem with c%d b%d p%d\n",c,b,p);
499//        else          printf("ok socket %d = %d\n",k,rd[k].socket) ;
500          k++ ;
501        }
502
503  for (k=0; k<MAX_SOCK; k++)
504     gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
505
506  for (k=0; k<NBOARDS; k++)
507     gi_NumConnect[k]=0;
508
509
510  gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
511     
512         return NULL;
513
514} /*-----------------------------------------------------------------*/
515
516
517
518void *readFAD( void *ptr ) {
519/* *** main loop reading FAD data and sorting them to complete events */
520  int head_len,frst_len,numok,numok2,evFree,dest,evID,i,j,k ;
521  int32_t jrd ;
522  int8_t FADbyte0, FADbyte1, FADbyteX0, FADbyteX1 ;
523  int32_t myRun, cleanTime ;
524  int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
525  int reqBoards = 40 ;
526
527  int goodevt=0;
528  int goodhed=0;
529  int nbuf=0;
530  int ret ;
531
532  int waitTime = 10 ; //maximum nr of seconds wait for delayed packets
533
534  int nokCnt[MAX_SOCK],loopCnt=0;
535  int sokCnt[MAX_SOCK];
536
537  for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
538
539
540  head_len = sizeof(PEVNT_HEADER) ;
541  frst_len = head_len + 36 * 12 ;
542  if (head_len < MIN_LEN) { printf("headLen ...\n"); exit(99);}
543
544  numok = numok2   = 0 ;
545
546  start.S=0xFB01;
547  stop.S= 0x04FE;
548
549  myRun = g_actTime ;
550  cleanTime = g_actTime ;     //once per second cleanup buffers from too old data
551
552
553
554
555  mBufInit() ;
556
557
558  while (g_runStat > 0) {           //loop until global variable g_stop is set
559
560
561    g_actTime = time(NULL) ;
562    nokCnt[numok]++;
563
564    loopCnt++ ;
565
566    numok = 0 ;                       //count number of succesfull actions
567
568    for (i=0; i<MAX_SOCK; i++) {      //check all sockets if something to read
569
570      if (rd[i].sockStat <0 ) {         //try to connect if not yet done
571        rd[i].sockStat=connect(rd[i].socket,
572            (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
573        if (rd[i].sockStat >=0 ) {      //successfull ==>
574          rd[i].bufTyp = 0 ;            //  expect a header
575          rd[i].bufLen = frst_len ;     //  max size to read at begining
576          rd[i].bufPos = 0 ;            //  no byte read so far
577 gi_NumConnect[ rd[i].board ]++ ;
578 printf("+++connect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
579        }
580      }
581
582      if (rd[i].sockStat >=0) {     //we have a connection ==> try to read
583        numok++ ;
584        sokCnt[i]++;
585        jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
586
587        if (jrd == 0) {                 //connection has closed ...
588           rd[i].sockStat = -1 ;        //flag (try to reopen next round)
589           error(0,errno,"Socket %d closed",i);
590           j = GenSock(1,0,0,0, &rd[i]) ;
591           gi_ErrCnt[i]++ ;
592 gi_NumConnect[ rd[i].board ]-- ;
593 printf("disconnect %d %d\n",rd[i].board,gi_NumConnect[ rd[i].board ]);
594        } else if ( jrd<0 ) {           //did not read anything
595           if (errno != EAGAIN && errno != EWOULDBLOCK ) {
596              error(1,errno,"Error Reading from %d",i);
597              gi_ErrCnt[i]++ ;
598           } else  numok-- ;            //else nothing waiting to be read
599
600        } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
601//printf("received data %d %d\n", i,jrd);
602
603            if ( jrd < rd[i].bufLen ) {    //not yet all read
604             rd[i].bufPos += jrd ;        //==> prepare for continuation
605             rd[i].bufLen -= jrd ;
606           } else {                     //full dataset read
607             rd[i].bufLen  = rd[i].bufPos + j ;
608             rd[i].bufPos  = rd[i].fadLen ;
609             if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
610               && rd[i].rBuf->B[ rd[i].bufPos  ] != stop.B[1]) {
611                gi_ErrCnt[i]++ ;
612                printf( "wrong end of buffer found %d\n",rd[i].bufPos);
613 exit(1) ;
614                goto EndBuf ;
615
616             } 
617
618             //we have a complete buffer, copy to WORK area
619             gi_SecRate[i]++ ;
620
621             roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ; 
622             //get index into mBuffer for this event (create if needed)
623             evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
624
625             if (evID < 0) {
626                printf("no space left ...%d\n",evID) ;
627 exit(2) ;
628                goto EndBuf ;
629             } 
630
631             //we have a valid entry in mBuffer[]; fill it
632
633             boardId = rd[i].board  ;
634             if ( mBuffer[evID].board[ boardId ] != -1) {   //this board already stored ...
635                printf( "board of this event already stored ...") ;
636             } else {
637
638                int iDx = evtIdx[evID] ;   //index into evtCtrl
639
640                memcpy( &mBuffer[evID].fEvent->FADhead[boardId].start_package_flag,
641                        &rd[i].rBuf->S[0], head_len) ;
642                mBuffer[evID].board[ boardId ] = boardId ;
643                roi  = mBuffer[evID].nRoi ;
644
645                pixS = boardId*36 -1 ;   //
646                tmS  = boardId*4  -1 ;   //
647                src  = head_len/2 ;
648                for ( drs=0; drs<4; drs++ ) {
649                   for ( px=0; px<9; px++ ) {
650                      pixH= ntohs(rd[i].rBuf->S[src++]) ;
651                      pixC= ntohs(rd[i].rBuf->S[src++]) ;
652                      pixR= ntohs(rd[i].rBuf->S[src++]) ;
653 
654                      src++  ;
655                      pixS++ ; //pixS = pixH2S[pixH] ;
656                      if (pixR != roi ) {
657                         if (px == 8 && pixR == 2*roi ) {
658                         } else {
659                            printf("wrong roi %d %d %d %d\n",px,pixR,roi,src-2);
660//exit(66);
661                         }
662//                         goto EndBuf ;
663                      }
664
665                      mBuffer[evID].fEvent->StartPix[pixS] =pixC;
666                      dest= pixS * roi ;
667                      memcpy(
668                            &mBuffer[evID].fEvent->Adc_Data[dest],
669                            &rd[i].rBuf->S[src],  roi * 2) ;
670                      src+= roi ;
671
672        //            if (px==8 && roi < 512 ) {
673        //               tmS++ ;
674        //               dest= tmS * roi ;
675        //               mBuffer[evID].fEvent->StartTM[pixS] =pixC+roi;
676        //               memcpy(
677        //                     &mBuffer[evID].fEvent->Adc_Tmark[dest],
678        //                     &rd[i].rBuf.S[src],  roi * 2) ;
679// ?? not in the simulator ...                      src+= roi ;
680        //            }
681                   }
682                }
683                evtCtrl.evtStat[ iDx ]++ ;
684                evtCtrl.pcTime[ iDx ] = g_actTime ;
685
686                if (++mBuffer[evID].nBoard == 19 ) {
687                   //complete event read ---> flag for next processing
688                   evtCtrl.evtStat[ iDx ] = 99;
689                   gi_EvtRead++ ;
690                   gi_EvtTot++ ;   
691 printf("complete event --------------------------------------------------\n");
692                }
693             }// now we have stored a new board contents into Event structure
694
695EndBuf:
696             rd[i].bufTyp = 0 ;           //ready to read next header
697             rd[i].bufLen = frst_len ;
698             rd[i].bufPos = 0 ;         
699           }
700
701        } else {                        //we are reading event header
702           rd[i].bufPos += jrd ;
703           rd[i].bufLen -= jrd ;
704           if ( rd[i].bufPos > MIN_LEN ){ //sufficient data to take action
705              //check if startflag correct; else shift block ....
706              for (k=0; k<rd[i].bufPos -1 ; k++) {
707                 if (rd[i].rBuf->B[] == start.B[1]
708                  && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
709              }
710              if (k >= rd[i].bufPos-1 ) {   //no start of header found
711 printf("no start of header found !!!!\n");
712                 rd[i].bufPos = 0 ;
713                 rd[i].bufLen = head_len ;
714              } else if ( k>0 ) {
715                 rd[i].bufPos -= k ;
716                 rd[i].bufLen += k ;
717                 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
718              }
719              if ( rd[i].bufPos > MIN_LEN ) {
720                 goodhed++;
721                 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ; ///???
722                 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
723                 rd[i].evtID  = ntohl(rd[i].rBuf->I[4]) ;
724                 rd[i].runID  = ntohl(rd[i].rBuf->I[11]) ;
725 printf("received event %d %d\n",rd[i].evtID,i);
726                    if (rd[i].runID ==0 ) rd[i].runID = myRun ;
727                 rd[i].bufTyp = 1 ;       //ready to read full record
728                 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
729                 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ;
730              }
731           }
732        } //end interpreting last read
733      } //end of successful read anything
734    } //finished trying to read all sockets
735
736
737    g_actTime = time(NULL) ;
738    if ( g_actTime > gi_SecTime ) {
739//       PrintRate() ;
740
741       //loop over all active events and flag those older than read-timeout
742
743       int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
744       if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
745       for ( k=evtCtrl.writePtr; k<(evtCtrl.writePtr+kd); k++ ) {
746          int k0 = k % (MAX_EVT*MAX_RUN) ;
747
748          if (evtCtrl.evtStat[k0] > 0
749           && evtCtrl.evtStat[k0] < 90
750           && evtCtrl.pcTime[k0] < g_actTime-10 ) {
751             evtCtrl.evtStat[k0] = 91 ;
752             gi_EvtBad++ ;
753             gi_EvtTot++ ;   
754          }
755       }
756    }
757
758
759    if (numok > 0 ) numok2=0;
760    else if (numok2++ > 3) {
761       if (g_runStat == 1) {
762          xwait.tv_sec = 1;
763          xwait.tv_nsec= 0 ;        // hibernate for 1 sec
764       } else {
765          xwait.tv_sec = 0;
766          xwait.tv_nsec= 2000000 ;  // sleep for ~2 msec
767//          xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
768       }
769//       printf("sleeping ...\n");
770       nanosleep( &xwait , NULL ) ;
771    }
772
773
774
775
776 } //and do next loop over all sockets ...
777 
778      return NULL;
779} /*-----------------------------------------------------------------*/
780
781
782void *procEvt( void *ptr ) {
783/* *** main loop processing file, including SW-trigger */
784  int numProc ;
785  int k,k1,k2,kd ;
786
787  while (g_runStat > 0) {
788
789
790     kd = evtCtrl.readPtr - evtCtrl.writePtr ;
791     if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
792     k1=evtCtrl.writePtr;
793     k2=evtCtrl.writePtr+kd;
794
795     numProc = 0 ;
796     if (gp_EvtTot < gi_EvtTot) {
797        for ( k=k1; k<k2; k++ ) {
798           int k0 = k % (MAX_EVT*MAX_RUN) ;
799
800           if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
801              //ready to be processed ...
802              int      id   = evtCtrl.evtBuf[k0] ;
803              uint32_t irun = mBuffer[id].runNum ;
804              int      ievt = mBuffer[id].evNum ;
805printf("processing %d %d %d %d\n",ievt,k,evtCtrl.evtStat[k0],evtCtrl.writePtr) ;
806              numProc++ ;
807              evtCtrl.evtStat[k0] = 501 ;
808              gp_EvtTot++ ;
809           }
810        }
811     }
812     if (numProc == 0) {
813        //seems we have nothing to do, so sleep a little
814        xwait.tv_sec = 0;
815        xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
816        nanosleep( &xwait , NULL ) ;
817     }
818
819  }
820  return NULL;
821 
822} /*-----------------------------------------------------------------*/
823
824
825void *writeEvt( void *ptr ) {
826/* *** main loop writing event (including opening and closing run-files */
827
828  int  numWrite = 0 ;
829  int j,id,irun,ievt ;
830
831  while (g_runStat > 0) {           //loop until global variable g_stop is set
832
833    //loop over buffered events and check if something to write ...
834
835    if ( gp_EvtTot == gw_EvtTot ) {
836       //there is for sure nothing to do --> sleep a little
837       xwait.tv_sec = 0;
838       xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
839       nanosleep( &xwait , NULL ) ;
840
841    } else {  //go through evtCtrl list to check if there might be something
842
843       //if run-file not yet opened==> open runfile (better to store headers in own structure ?)
844
845       //if eventid == next event for run ==> write it (or flag it)
846       //if eventid > next event exists, and nothing new for >time out ==> write it
847       //if nothing for this run for >timeout ==> close run
848
849       int kd = evtCtrl.readPtr - evtCtrl.writePtr ;
850       if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
851       int k,k1,k2 ;
852
853
854       k1=evtCtrl.writePtr;
855       k2=evtCtrl.writePtr+kd;
856
857       int evtTot=gw_EvtTot ;
858       for ( k=k1; k<k2; k++ ) {
859          int k0 = k % (MAX_EVT*MAX_RUN) ;
860
861          if (evtCtrl.evtStat[k0] > 500 ) { //ready to be written ...
862             id   = evtCtrl.evtBuf[k0] ;
863             irun = mBuffer[id].runNum ;
864             ievt = mBuffer[id].evNum ;
865
866             for ( j=0; j<MAX_RUN; j++) {
867                if ( runCtrl[j].runId == irun ) break ;
868             }
869             if ( j >= MAX_RUN ) {
870 printf("error: can not find run %d\n", irun);
871 exit(111);
872             }
873
874             if (runCtrl[j].fileId < 0 ) {
875 printf("open new run_file %d\n",irun) ;
876                runCtrl[j].fileId = 999  ; // should be a function call
877                runCtrl[j].nextEvt= 0;
878                runCtrl[j].lastTime=g_actTime ;
879             }
880
881             if (runCtrl[j].nextEvt == ievt ) {  //write this event
882 printf("write event %d (run %d %d)\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
883                runCtrl[j].nextEvt= ievt+1;
884                runCtrl[j].lastTime=g_actTime ;
885                evtCtrl.evtStat[k0]= -1 ;
886                gw_EvtTot++ ;
887                numWrite++ ;
888//                evtCtrl.writePtr=k+1;
889             } else if ( ievt < runCtrl[j].nextEvt ) {
890 printf("delayed event (run %d %d %d) skipped\n",ievt,irun,evtCtrl.evtStat[k0] ) ;
891                evtCtrl.evtStat[k0]= -1 ;
892//                evtCtrl.writePtr=k+1;
893                gw_EvtTot++ ;
894                numWrite++ ;
895             } 
896          }
897
898
899
900          if ( runCtrl[j].lastTime < g_actTime-15) {
901 printf("non existing event skip %d (run %d -> %d)\n",runCtrl[j].nextEvt,irun,ievt) ;
902             runCtrl[j].nextEvt++;
903             numWrite++;
904          } 
905
906          for ( j=0; j<MAX_RUN; j++) {
907             if ( runCtrl[j].runId >0 && runCtrl[j].lastTime < g_actTime-120) {
908 printf("close run %d (timeout)\n",irun) ;
909                runCtrl[j].fileId = -2 ;
910                runCtrl[j].runId  =  0 ;
911             }
912          }
913          if (numWrite == 0 ) {
914             //nothing to do at the moment ==> sleep a little
915             xwait.tv_sec = 0;
916             xwait.tv_nsec= 10000000 ;  // sleep for ~10 msec
917             nanosleep( &xwait , NULL ) ;
918          }
919
920       }
921     }
922
923
924
925
926
927
928
929
930
931
932
933
934  return NULL;
935  }
936
937  return NULL;
938
939} /*-----------------------------------------------------------------*/
940
941
942
943/*
944int main() {
945  int i,th_ret[50] ;
946  pthread_t thread[50] ;
947
948  initReadFAD() ;
949  i=0 ;
950  th_ret[i] = pthread_create( &thread[i], NULL, readFAD,  (void*) i++ );
951  th_ret[i] = pthread_create( &thread[i], NULL, procEvt,  (void*) i++ );
952  th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, (void*) i++ );
953
954  for(;;) { sleep(1); }
955
956
957}
958*/
Note: See TracBrowser for help on using the repository browser.