/*
 * Source: trunk/FACT++/src/EventBuilder.c @ 11299
 *
 * Last change on this file since 11299 was 11293, checked in by tbretz, 14 years ago:
 *   "Updated statistics."
 * File size: 48.4 KB
 */

/* Index of the pixel (within one DRS chip) that is expected to carry a
 * double-length ROI.  The simulator does not create a double-length roi
 * for pixel 8, so the value 99 is used there; for real data set
 * PX8 = 8 ==> ask for double roi = TM (time marker). */
#define PX8 99
5
6
7
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <stdio.h>
#include <time.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <string.h>
#include <math.h>
#include <error.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <sched.h>

#include "EventBuilder.h"
25
/* Severity levels passed as first argument to factOut(). */
enum Severity
{
   kMessage = 10,   ///< Just a message, usually obsolete
   kInfo    = 20,   ///< An info telling something which can be interesting to know
   kWarn    = 30,   ///< A warning, things that somehow might result in unexpected or unwanted behaviour
   kError   = 40,   ///< Error, something unexpected happened, but can still be handled by the program
   kFatal   = 50,   ///< An error which cannot be handled at all happened, the only solution is program termination
   kDebug   = 99,   ///< A message used for debugging only
};
35
#define MIN_LEN 32             // min #bytes needed to interpret FADheader
// size of read-buffer per socket; parenthesized so that the macro behaves
// as a single value inside larger expressions (e.g. "x % MAX_LEN")
#define MAX_LEN (256*1024)
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43
44extern void factStat(GUI_STAT gj) ;
45
46extern void factStatNew(EVT_STAT gi) ;
47
48extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
49
50extern void debugHead(int i, int j, void *buf);
51
52extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
53 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
54extern void debugStream(int isock, void *buf, int len) ;
55
56
57
58int g_actTime ;
59int g_runStat ;
60size_t g_maxMem ; //maximum memory allowed for buffer
61
62//no longer needed ...
63 int g_maxBoards ; //maximum number of boards to be initialized
64 int g_actBoards ;
65//
66
67FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
68
69
70 int gi_runStat ;
71 int gp_runStat ;
72 int gw_runStat ;
73
74 int gi_memStat = +1 ;
75
76 int32_t gi_myRun ;
77
78
79
80uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
81
82//uint gi_EvtStart= 0 ;
83//uint gi_EvtRead = 0 ;
84//uint gi_EvtBad = 0 ;
85//uint gi_EvtTot = 0 ;
86//size_t gi_usedMem = 0 ;
87
88//uint gw_EvtTot = 0 ;
89//uint gp_EvtTot = 0 ;
90
91PIX_MAP g_pixMap[NPIX] ;
92
93EVT_STAT gi ;
94GUI_STAT gj ;
95
96EVT_CTRL evtCtrl ; //control of events during processing
97int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
98
99WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
100
101
102
103
104RUN_HEAD actRun ;
105
106RUN_CTRL runCtrl[MAX_RUN] ;
107
108RUN_TAIL runTail[MAX_RUN] ;
109
110
111/*
112*** Definition of rdBuffer to read in IP packets; keep it global !!!!
113 */
114
115
116typedef union {
117 int8_t B[MAX_LEN/8];
118 int16_t S[MAX_LEN/4];
119 int32_t I[MAX_LEN/2];
120 int64_t L[MAX_LEN ];
121} CNV_FACT ;
122
123typedef struct {
124 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
125 int32_t bufPos ; //next byte to read to the buffer next
126 int32_t bufLen ; //number of bytes left to read
127 int32_t skip ; //number of bytes skipped before start of event
128
129 int sockStat ; //-1 if socket not yet connected , 99 if not exist
130 int socket ; //contains the sockets
131 struct sockaddr_in SockAddr ; //IP for each socket
132
133 int evtID ; // event ID of event currently read
134 int runID ; // run "
135 int ftmID ; // event ID from FTM
136 uint fadLen ; // FADlength of event currently read
137 int fadVers ; // Version of FAD
138 int board ; // boardID (softwareID: 0..40 )
139 int Port ;
140
141 CNV_FACT *rBuf ;
142
143} READ_STRUCT ;
144
145
/* A 16-bit value that can also be accessed as its two individual bytes
 * (used for the start/stop flags of a package). */
typedef union {
   int8_t  B[2];
   int16_t S;
} SHORT_BYTE ;
150
151
152
153
154
155#define MXSTR 1000
156char str[MXSTR] ;
157
158SHORT_BYTE start, stop;
159
160READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
161
162
163
164/*-----------------------------------------------------------------*/
165
166
167/*-----------------------------------------------------------------*/
168
169
170
171int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
172/*
173*** generate Address, create sockets and allocates readbuffer for it
174***
175*** if flag==0 generate socket and buffer
176*** <0 destroy socket and buffer
177*** >0 close and redo socket
178***
179*** sid : board*7 + port id
180 */
181
182 int j ;
183
184 if (rd->sockStat ==0 ) { //close socket if open
185 j=close(rd->socket) ;
186 if (j>0) {
187 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
188 factOut(kFatal,771, str ) ;
189 } else {
190 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
191 factOut(kInfo,771, str ) ;
192 }
193 }
194
195
196 if (flag < 0) {
197 free(rd->rBuf) ; //and never open again
198 rd->rBuf = NULL ;
199 rd->sockStat = 99 ;
200 return 0 ;
201 }
202
203
204 if (flag == 0) { //generate address and buffer ...
205 rd->Port = port ;
206 rd->SockAddr.sin_family = sockAddr->sin_family;
207 rd->SockAddr.sin_port = htons(port) ;
208 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
209
210 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
211 if ( rd->rBuf == NULL ) {
212 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
213 factOut(kFatal,774, str ) ;
214 rd->sockStat = 77 ;
215 return -3 ;
216 }
217 }
218
219
220 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
221 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
222 factOut(kFatal,773, str ) ;
223 rd->sockStat = 88 ;
224 return -2 ;
225 }
226
227 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
228 factOut(kInfo,773, str ) ;
229 rd->sockStat = -1 ; //try to (re)open socket
230 return 0 ;
231
232} /*-----------------------------------------------------------------*/
233
234 /*-----------------------------------------------------------------*/
235
236
237
238
239int mBufInit() {
240// initialize mBuffer (mark all entries as unused\empty)
241
242 int i ;
243 uint32_t actime ;
244
245 actime = g_actTime + 50000000 ;
246
247 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
248 mBuffer[i].evNum = mBuffer[i].runNum = -1;
249
250 evtCtrl.evtBuf[ i] = -1 ;
251 evtCtrl.evtStat[ i] = -1 ;
252 evtCtrl.pcTime[ i] = actime ; //initiate to far future
253
254 }
255
256
257 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
258
259 evtCtrl.frstPtr = 0 ;
260 evtCtrl.lastPtr = 0 ;
261
262 return 0 ;
263
264} /*-----------------------------------------------------------------*/
265
266
267
268
269int mBufEvt(uint evID, uint runID, uint nRoi) {
270// generate a new Event into mBuffer:
271// make sure only complete Event are possible, so 'free' will always work
272// returns index into mBuffer[], or negative value in case of error
273
274
275 int i, k, evFree ;
276 int headmem=0 ;
277 size_t needmem = 0 ;
278
279 if (nRoi <=0 || nRoi > 1024) {
280 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
281 factOut(kError, 1, str ) ;
282 return -9999 ;
283 }
284
285 i = evID % MAX_EVT ;
286 evFree = -1 ;
287
288 for ( k=0; k<MAX_RUN; k++) {
289 if ( mBuffer[i].evNum == evID
290 && mBuffer[i].runNum== runID ) {
291 return i ;
292 }
293 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
294 i += MAX_EVT ;
295 }
296
297
298 //event does not yet exist; create
299 if (evFree < 0 ) { //no space available in ctrl
300 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
301 factOut(kError,881, str ) ;
302 return -1 ;
303 }
304 i = evFree ; //found free entry; use it ...
305
306
307 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
308
309 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
310
311 if ( gj.usdMem + needmem + headmem > g_maxMem) {
312 gj.maxMem = gj.usdMem + needmem + headmem ;
313 if (gi_memStat>0 ) {
314 gi_memStat = -99 ;
315 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
316 factOut(kError,882, str ) ;
317 }
318 return -11 ;
319 }
320
321 mBuffer[i].FADhead = malloc( headmem ) ;
322 if (mBuffer[i].FADhead == NULL) {
323 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
324 factOut(kError,882, str ) ;
325 return -12;
326 }
327
328 mBuffer[i].fEvent = malloc( needmem ) ;
329 if (mBuffer[i].fEvent == NULL) {
330 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
331 factOut(kError,882, str ) ;
332 free(mBuffer[i].fEvent) ;
333 mBuffer[i].fEvent = NULL ;
334 return -22;
335 }
336
337 //flag all boards as unused
338 mBuffer[i].nBoard = 0 ;
339 for (k=0; k<NBOARDS; k++ ) {
340 mBuffer[i].board[k] = -1;
341 }
342
343 //flag all pixels as unused
344 for (k=0; k<NPIX; k++ ) {
345 mBuffer[i].fEvent->StartPix[k] = -1 ;
346 }
347
348 //flag all TMark as unused
349 for (k=0; k<NTMARK; k++ ) {
350 mBuffer[i].fEvent->StartTM[k] = -1 ;
351 }
352
353 mBuffer[i].pcTime = g_actTime ;
354 mBuffer[i].nRoi = nRoi ;
355 mBuffer[i].evNum = evID ;
356 mBuffer[i].runNum = runID ;
357 mBuffer[i].evtLen = needmem ;
358
359 gj.usdMem += needmem + headmem;
360 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
361
362 gj.rateNew++ ;
363
364 //register event in 'active list (reading)'
365
366 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
367 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
368 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
369 evtIdx[i] = evtCtrl.lastPtr ;
370
371
372snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
373factOut(kDebug,-11, str ) ;
374 evtCtrl.lastPtr++ ;
375 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
376
377 gi.evtGet++ ;
378
379 //check if runId already registered in runCtrl
380 evFree = -1 ;
381 for (k=0; k<MAX_RUN; k++) {
382 if (runCtrl[k].runId == runID ) return i ;//run exists already
383 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
384 }
385
386 if (evFree <0 ) {
387 snprintf(str,MXSTR,"not able to register the new run %d",runID);
388 factOut(kFatal,883, str ) ;
389 } else {
390 runCtrl[evFree].runId = runID ;
391 runCtrl[evFree].fileId = -2 ;
392 runCtrl[evFree].nextEvt= 0;
393 runCtrl[evFree].actEvt = 0;
394 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
395 runCtrl[evFree].lastTime=g_actTime ;
396 runCtrl[evFree].closeTime=g_actTime + 3600*24 ; //max time allowed
397 runCtrl[evFree].lastTime = 0 ;
398
399 runTail[evFree].nEventsOk =
400 runTail[evFree].nEventsRej =
401 runTail[evFree].nEventsBad =
402 runTail[evFree].PCtime0 =
403 runTail[evFree].PCtimeX = 0 ;
404 }
405
406 return i ;
407
408} /*-----------------------------------------------------------------*/
409
410
411
412
413int mBufFree(int i) {
414//delete entry [i] from mBuffer:
415//(and make sure multiple calls do no harm ....)
416
417 int headmem=0 ;
418 size_t freemem = 0 ;
419
420 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
421 freemem = mBuffer[i].evtLen ;
422 free(mBuffer[i].fEvent ) ;
423 mBuffer[i].fEvent = NULL ;
424
425 free(mBuffer[i].FADhead ) ;
426 mBuffer[i].FADhead = NULL ;
427
428 }
429 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
430 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
431
432 gj.usdMem = gj.usdMem - freemem - headmem;
433
434 if (gi_memStat < 0 ) {
435 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ;
436 }
437
438
439 return 0 ;
440
441} /*-----------------------------------------------------------------*/
442
443
444void resetEvtStat() {
445 int i ;
446
447 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
448
449 for (i=0; i<NBOARDS; i++ ) {
450 gi.gotByte[i] = 0 ;
451 gi.gotErr[i] = 0 ;
452 }
453
454 gi.evtGet = 0 ; //#new Start of Events read
455 gi.evtTot = 0 ; //#complete Events read
456 gi.evtErr = 0 ; //#Events with Errors
457 gi.evtSkp = 0 ; //#Events incomplete (timeout)
458
459 gi.procTot = 0 ; //#Events processed
460 gi.procErr = 0 ; //#Events showed problem in processing
461 gi.procTrg = 0 ; //#Events accepted by SW trigger
462 gi.procSkp = 0 ; //#Events rejected by SW trigger
463
464 gi.feedTot = 0 ; //#Events used for feedBack system
465 gi.feedErr = 0 ; //#Events rejected by feedBack
466
467 gi.wrtTot = 0 ; //#Events written to disk
468 gi.wrtErr = 0 ; //#Events with write-error
469
470 gi.runOpen = 0 ; //#Runs opened
471 gi.runClose= 0 ; //#Runs closed
472 gi.runErr = 0 ; //#Runs with open/close errors
473
474return ;
475} /*-----------------------------------------------------------------*/
476
477
478
/* Placeholder: currently does nothing. */
void initReadFAD() {
} /*-----------------------------------------------------------------*/
482
483
484
485void *readFAD( void *ptr ) {
486/* *** main loop reading FAD data and sorting them to complete events */
487 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
488 int actBoards = 0, minLen ;
489 int32_t jrd ;
490 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
491
492 int goodhed = 0 ;
493 int errcnt0 = 0 ;
494
495 struct timespec xwait ;
496
497
498 struct timeval *tv, atv;
499 tv=&atv;
500 uint32_t tsec, tusec ;
501
502
503 snprintf(str,MXSTR,"start initializing");
504 factOut(kInfo,-1, str ) ;
505
506 int cpu = 7 ; //read thread
507 cpu_set_t mask;
508
509/* CPU_ZERO initializes all the bits in the mask to zero. */
510 CPU_ZERO( &mask );
511/* CPU_SET sets only the bit corresponding to cpu. */
512 cpu = 7 ;
513 CPU_SET( cpu, &mask );
514
515/* sched_setaffinity returns 0 in success */
516 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
517 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
518 factOut(kWarn,-1, str ) ;
519 }
520
521
522 //make sure all sockets are preallocated as 'not exist'
523 for (i=0; i<MAX_SOCK; i++) {
524 rd[i].socket = -1 ;
525 rd[i].sockStat = 99 ;
526 }
527
528 int sockDef[NBOARDS]; //internal state of sockets
529 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
530
531
532 resetEvtStat();
533 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
534 gj.totMem = g_maxMem ;
535 gj.bufNew = gj.bufEvt = 0 ;
536 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
537
538 for (k=0; k<NBOARDS; k++) {
539 gi_NumConnect[k]=0;
540 gi.numConn[k] =0;
541 gj.numConn[k] =0;
542 gj.errConn[k] =0;
543 gj.rateBytes[k] =0;
544 gj.totBytes[k] =0;
545 }
546
547
548 uint gi_SecTime ; //time in seconds
549 gi_SecTime= g_actTime ;
550
551 mBufInit() ; //initialize buffers
552
553 snprintf(str,MXSTR,"end initializing");
554 factOut(kInfo,-1, str ) ;
555
556
557
558 head_len = sizeof(PEVNT_HEADER) ;
559//frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header
560 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
561
562//minLen = MIN_LEN ; //min #bytes needed to check header
563 minLen = head_len ; //min #bytes needed to check header: full header for debug
564
565 numok = numok2 = 0 ;
566
567 start.S=0xFB01;
568 stop.S= 0x04FE;
569
570 gi_myRun = g_actTime ;
571 gi_runStat = g_runStat ;
572 gj.readStat= g_runStat ;
573
574
575 while (g_runStat >=0) { //loop until global variable g_runStat claims stop
576
577 gi_runStat = g_runStat;
578 gj.readStat= g_runStat;
579// if (gi.totMem < 50000000 ) gi.totMem = 500000000 ;
580
581 int b,p,p0,s0,nch;
582 nch = 0 ;
583 for (b=0; b<NBOARDS; b++ ) {
584 k = b*7 ;
585 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
586 nch++ ;
587 gi_NumConnect[ b ] = 0 ; //must close all connections
588 gi.numConn[ b ] = 0;
589 gj.numConn[ b ] = 0;
590 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
591 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
592 else s0=+1 ; //sockets to be closed and reopened
593
594 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
595 else p0=0 ;
596
597 for (p=p0+1; p<p0+8; p++) {
598 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
599 k++ ;
600 }
601 sockDef[b] = g_port[b].sockDef ;
602 }
603 }
604
605 if (nch > 0 ) {
606 actBoards = 0 ;
607 for (b=0; b<NBOARDS; b++ ) {
608 if ( sockDef[b] > 0 ) actBoards++ ;
609 }
610 }
611
612
613
614
615 numok = 0 ; //count number of succesfull actions
616
617 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
618 b = i / 7 ;
619 if (sockDef[b] > 0) s0=+1 ;
620 else s0=-1 ;
621
622 gettimeofday( tv, NULL);
623 g_actTime = tsec = atv.tv_sec ;
624 tusec= atv.tv_usec ;
625
626 if (rd[i].sockStat <0 ) { //try to connect if not yet done
627 rd[i].sockStat=connect(rd[i].socket,
628 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
629 if (rd[i].sockStat ==0 ) { //successfull ==>
630 if (sockDef[b] > 0) {
631 rd[i].bufTyp = 0 ; // expect a header
632 rd[i].bufLen = frst_len ; // max size to read at begining
633 } else {
634 rd[i].bufTyp = -1 ; // full data to be skipped
635 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
636 }
637 rd[i].bufPos = 0 ; // no byte read so far
638 rd[i].skip = 0 ; // start empty
639 gi_NumConnect[ b ]++ ;
640 gi.numConn[ b ]++ ;
641 gj.numConn[ b ]++ ;
642 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
643 factOut(kInfo,-1, str ) ;
644 }
645 }
646
647 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
648 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
649 numok++ ;
650 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
651
652 if (jrd >0 ) {
653 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
654 }
655
656 if (jrd == 0) { //connection has closed ...
657 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
658 factOut(kInfo,441, str ) ;
659 GenSock(s0, i, 0,NULL, &rd[i]) ;
660 gi.gotErr[ b ]++ ;
661 gi_NumConnect[ b ]-- ;
662 gi.numConn[ b ]-- ;
663 gj.numConn[ b ]-- ;
664
665 } else if ( jrd<0 ) { //did not read anything
666 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
667 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
668 factOut(kError,442, str ) ;
669 gi.gotErr[ b ]++ ;
670 } else numok-- ; //else nothing waiting to be read
671 jrd = 0 ;
672 }
673 } else jrd = 0 ; //did read nothing as requested
674
675 gi.gotByte[ b ] += jrd ;
676 gj.rateBytes[b] += jrd ;
677
678 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
679// just do nothing
680
681 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
682 if ( jrd < rd[i].bufLen ) { //not yet all read
683 rd[i].bufPos += jrd ; //==> prepare for continuation
684 rd[i].bufLen -= jrd ;
685 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
686 } else { //full dataset read
687 rd[i].bufLen = 0 ;
688 rd[i].bufPos = rd[i].fadLen ;
689 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
690 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
691 gi.evtErr++ ;
692 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
693 factOut(kError,301, str ) ;
694 goto EndBuf ;
695
696 }
697 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
698
699 //we have a complete buffer, copy to WORK area
700 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
701 //get index into mBuffer for this event (create if needed)
702 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
703
704 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
705 if (evID < 0) {
706 xwait.tv_sec = 0;
707 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
708 nanosleep( &xwait , NULL ) ;
709 goto EndBuf1 ; //hope there is free space next round
710 }
711
712
713 //we have a valid entry in mBuffer[]; fill it
714
715 boardId = b ;
716 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
717 int fadCrate = fadBoard/256 ;
718 if (boardId != (fadCrate*10 + fadBoard%256) ) {
719 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
720 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
721 }
722 if ( mBuffer[evID].board[ boardId ] != -1) {
723 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
724 factOut(kWarn,501, str ) ;
725 goto EndBuf ; //--> skip Board
726 }
727
728 int iDx = evtIdx[evID] ; //index into evtCtrl
729
730 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
731 &rd[i].rBuf->S[0], head_len) ;
732 roi = mBuffer[evID].nRoi ;
733
734 pixS = boardId*36 -1 ; //
735 tmS = boardId*4 -1 ; //
736 src = head_len/2 ;
737 for ( drs=0; drs<4; drs++ ) {
738 for ( px=0; px<9; px++ ) {
739 pixH= ntohs(rd[i].rBuf->S[src++]) ;
740 pixC= ntohs(rd[i].rBuf->S[src++]) ;
741 pixR= ntohs(rd[i].rBuf->S[src++]) ;
742
743 src++ ;
744 pixS++ ; //pixS = pixH2S[pixH] ;
745 if ( ( px < PX8 && pixR == roi )
746 || ( px ==PX8 && pixR == 2*roi )
747 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
748 // correct roi
749 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
750 dest= pixS * roi ;
751 memcpy(
752 &mBuffer[evID].fEvent->Adc_Data[dest],
753 &rd[i].rBuf->S[src], roi * 2) ;
754 src+= roi ;
755 if ( px==PX8 ) {
756 tmS++; // tmS = tmH2S[pixH]
757 dest= tmS * roi + NPIX* roi ;
758 if ( roi <=512 ) {
759 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
760 memcpy(
761 &mBuffer[evID].fEvent->Adc_Data[dest],
762 &rd[i].rBuf->S[src], roi * 2) ;
763 src+=roi ;
764 } else {
765 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
766 }
767 }
768 } else {
769 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
770 gi.evtErr++ ;
771 factOut(kError,202, str ) ;
772 goto EndBuf ;
773 }
774 }
775 }// now we have stored a new board contents into Event structure
776
777 mBuffer[evID].board[ boardId ] = boardId ;
778 evtCtrl.evtStat[ iDx ]++ ;
779 evtCtrl.pcTime[ iDx ] = g_actTime ;
780
781 if (++mBuffer[evID].nBoard >= actBoards ) {
782 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
783 factOut(kDebug,-1, str ) ;
784 //complete event read ---> flag for next processing
785 evtCtrl.evtStat[ iDx ] = 99;
786 gi.evtTot++ ;
787 }
788
789EndBuf:
790 rd[i].bufTyp = 0 ; //ready to read next header
791 rd[i].bufLen = frst_len ;
792 rd[i].bufPos = 0 ;
793EndBuf1:
794 ;
795 }
796
797 } else { //we are reading event header
798 rd[i].bufPos += jrd ;
799 rd[i].bufLen -= jrd ;
800 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
801 //check if startflag correct; else shift block ....
802 for (k=0; k<rd[i].bufPos -1 ; k++) {
803 if (rd[i].rBuf->B[k ] == start.B[1]
804 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
805 }
806 rd[i].skip += k ;
807
808 if (k >= rd[i].bufPos-1 ) { //no start of header found
809// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
810// factOut(kWarn,666, str ) ;
811
812 rd[i].bufPos = 0 ;
813 rd[i].bufLen = head_len ;
814 } else if ( k>0 ) {
815 rd[i].bufPos -= k ;
816 rd[i].bufLen += k ;
817 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
818 }
819 if ( rd[i].bufPos >= minLen ) {
820 if ( rd[i].skip >0 ) {
821 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
822 factOut(kInfo,666, str ) ;
823 rd[i].skip = 0 ;
824 }
825 goodhed++;
826 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
827 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
828 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
829 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
830 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
831 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
832 rd[i].bufTyp = 1 ; //ready to read full record
833 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
834 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
835 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
836 debugHead(i,fadBoard,rd[i].rBuf);
837 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
838 } else {
839 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
840 }
841 } else {
842 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
843 }
844
845 } //end interpreting last read
846 } //end of successful read anything
847 } //finished trying to read all sockets
848
849 gi.numRead[ numok ] ++ ;
850
851 g_actTime = time(NULL) ;
852 if ( g_actTime > gi_SecTime ) {
853 gi_SecTime = g_actTime ;
854
855
856 //loop over all active events and flag those older than read-timeout
857 //delete those that are written to disk ....
858
859 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
860 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
861
862 gj.bufNew = gj.bufEvt = 0 ;
863 int k1=evtCtrl.frstPtr;
864 for ( k=k1; k<(k1+kd); k++ ) {
865 int k0 = k % (MAX_EVT*MAX_RUN) ;
866//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
867
868 if (evtCtrl.evtStat[k0] > 0
869 && evtCtrl.evtStat[k0] < 90 ) {
870 gj.bufNew++ ; //incomplete event in Buffer
871
872 if ( evtCtrl.pcTime[k0] < g_actTime-10 ) {
873 int id =evtCtrl.evtBuf[k0] ;
874 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
875 factOut(kWarn,601, str ) ;
876 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
877 gi.evtSkp++ ;
878 gi.evtTot++ ;
879 gj.evtSkip++;
880 }
881 } else if (evtCtrl.evtStat[k0] >= 900 ) {
882
883 int id =evtCtrl.evtBuf[k0] ;
884 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
885 factOut(kDebug,-1, str ) ;
886 mBufFree(id) ; //event written--> free memory
887 evtCtrl.evtStat[k0] = -1;
888 gj.evtWrite++ ;
889 gj.rateWrite++ ;
890 } else if (evtCtrl.evtStat[k0] >= 900 ) {
891 gj.bufEvt++ ; //complete event in Buffer
892 }
893
894 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
895 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
896 }
897 }
898
899
900 gj.deltaT = 1000 ; //temporary, must be improved
901
902 int b;
903 for ( b=0; b<NBOARDS; b++) gj.totBytes[k] +=gj.rateBytes[b] ;
904 gj.totMem = g_maxMem ;
905 if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ;
906
907 factStat(gj);
908 factStatNew(gi) ;
909
910 gj.rateNew = gj.rateWrite = 0 ;
911 for ( b=0; b<NBOARDS; b++) gj.rateBytes[k] =0 ;
912 }
913
914
915
916
917 if (numok > 0 ) numok2=0;
918 else if (numok2++ > 3) {
919 if (g_runStat == 1) {
920 xwait.tv_sec = 1;
921 xwait.tv_nsec= 0 ; // hibernate for 1 sec
922 } else {
923 xwait.tv_sec = 0;
924 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
925 }
926 nanosleep( &xwait , NULL ) ;
927 }
928
929 } //and do next loop over all sockets ...
930
931 //must quit eventbuilding
932 snprintf(str,MXSTR,"stop reading ...");
933 factOut(kInfo,-1, str ) ;
934
935 //flag all events as 'read finished'
936 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
937 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
938
939 int k1=evtCtrl.frstPtr;
940
941 for ( k=k1; k<(k1+kd); k++ ) {
942 int k0 = k % (MAX_EVT*MAX_RUN) ;
943 if (evtCtrl.evtStat[k0] > 0
944 && evtCtrl.evtStat[k0] < 90 ) {
945 evtCtrl.evtStat[k0] = 91 ;
946 gi.evtSkp++ ;
947 gi.evtTot++ ;
948 }
949 }
950
951 //must close all open sockets ...
952 snprintf(str,MXSTR,"close all sockets ...");
953 factOut(kInfo,-1, str ) ;
954 for (i=0; i<MAX_SOCK; i++) {
955 if (rd[i].sockStat ==0 ) {
956 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
957 gi_NumConnect[ i/7 ]-- ;
958 gi.numConn[ i/7 ]-- ;
959 gj.numConn[ i/7 ]-- ;
960 }
961 }
962
963 xwait.tv_sec = 0;
964 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
965 nanosleep( &xwait , NULL ) ;
966 gi_runStat = -11 ; //inform all that no update to happen any more
967 gj.readStat= -11 ; //inform all that no update to happen any more
968
969
970 int minclear = 900 ; //usually wait until writing finished (stat 900)
971 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all
972
973
974 //and clear all buffers (might have to wait until all others are done)
975 snprintf(str,MXSTR,"clear all buffers ...");
976 factOut(kInfo,-1, str ) ;
977 int numclear=1 ;
978 while (numclear > 0 ) {
979 numclear = 0 ;
980 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
981 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
982
983 int k1=evtCtrl.frstPtr;
984 for ( k=k1; k<(k1+kd); k++ ) {
985 int k0 = k % (MAX_EVT*MAX_RUN) ;
986 if (evtCtrl.evtStat[k0] > minclear ) {
987 int id =evtCtrl.evtBuf[k0] ;
988 mBufFree(id) ; //event written--> free memory
989 evtCtrl.evtStat[k0] = -1;
990 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
991
992 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
993 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
994 }
995
996 xwait.tv_sec = 0;
997 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
998 nanosleep( &xwait , NULL ) ;
999 }
1000
1001 snprintf(str,MXSTR,"Exit read Process ...");
1002 factOut(kInfo,-1, str ) ;
1003 gi_runStat = -99 ;
1004 gj.readStat= -99 ;
1005 factStat(gj);
1006 factStatNew(gi) ;
1007 return 0;
1008
1009} /*-----------------------------------------------------------------*/
1010
1011
1012void *procEvt( void *ptr ) {
1013/* *** main loop processing file, including SW-trigger */
1014 int numProc, numWait ;
1015 int k ;
1016 struct timespec xwait ;
1017 char str[MXSTR] ;
1018
1019 cpu_set_t mask;
1020 int cpu = 5 ; //process thread (will be several in final version)
1021
1022 snprintf(str,MXSTR,"Starting process-thread");
1023 factOut(kInfo,-1, str ) ;
1024
1025/* CPU_ZERO initializes all the bits in the mask to zero. */
1026 CPU_ZERO( &mask );
1027/* CPU_SET sets only the bit corresponding to cpu. */
1028 CPU_SET( cpu, &mask );
1029/* sched_setaffinity returns 0 in success */
1030 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1031 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1032 factOut(kWarn,-1, str ) ;
1033 }
1034
1035
1036 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1037
1038 numWait = numProc = 0 ;
1039 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1040 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1041
1042 int k1=evtCtrl.frstPtr;
1043 for ( k=k1; k<(k1+kd); k++ ) {
1044 int k0 = k % (MAX_EVT*MAX_RUN) ;
1045//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1046 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1047 int id = evtCtrl.evtBuf[k0] ;
1048 int ievt = mBuffer[id].evNum ;
1049 int roi = mBuffer[id].nRoi ;
1050// uint32_t irun = mBuffer[id].runNum ;
1051//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1052//factOut(kDebug,-1, str ) ;
1053
1054//make sure unused pixels/tmarks are cleared to zero
1055 int ip,it,dest,ib;
1056 for (ip=0; ip<NPIX; ip++) {
1057 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1058 dest= ip*roi ;
1059 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1060 }
1061 }
1062 for (it=0; it<NTMARK; it++) {
1063 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1064 dest= it*roi + NPIX*roi ;
1065 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1066 }
1067 }
1068
1069
1070//and set correct event header ; also check for consistency in event (not yet)
1071 mBuffer[id].fEvent->Roi = roi ;
1072 mBuffer[id].fEvent->EventNum = ievt ;
1073 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
1074 mBuffer[id].fEvent->SoftTrig = 0 ;
1075 for (ib=0; ib<NBOARDS; ib++) {
1076 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1077 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1078 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1079 } else {
1080 mBuffer[id].fEvent->BoardTime[ib] =
1081 ntohl(mBuffer[id].FADhead[ib].time) ;
1082 }
1083 }
1084
1085 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1086// gj.procEvt++ ;
1087 gi.procTot++ ;
1088 numProc++ ;
1089 evtCtrl.evtStat[k0] = 520 ;
1090
1091 if (i<0) {
1092 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1093 gi.procErr++ ;
1094 }
1095 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1096 numWait++ ;
1097 }
1098 }
1099
1100 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1101 snprintf(str,MXSTR,"Exit Processing Process ...");
1102 factOut(kInfo,-1, str ) ;
1103 gp_runStat = -22 ; //==> we should exit
1104 gj.procStat= -22 ; //==> we should exit
1105 return 0 ;
1106 }
1107
1108 if (numProc == 0) {
1109 //seems we have nothing to do, so sleep a little
1110 xwait.tv_sec = 0;
1111 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1112 nanosleep( &xwait , NULL ) ;
1113 }
1114 gp_runStat = gi_runStat ;
1115 gj.procStat= gj.readStat ;
1116
1117 }
1118
1119 //we are asked to abort asap ==> must flag all remaining events
1120 // when gi_runStat claims that all events are in the buffer...
1121
1122 snprintf(str,MXSTR,"Abort Processing Process ...");
1123 factOut(kInfo,-1, str ) ;
1124 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1125 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1126
1127 int k1=evtCtrl.frstPtr;
1128 for ( k=k1; k<(k1+kd); k++ ) {
1129 int k0 = k % (MAX_EVT*MAX_RUN) ;
1130 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1131 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1132 }
1133 }
1134
1135 gp_runStat = -99 ;
1136 gj.procStat= -99 ;
1137
1138 return 0;
1139
1140} /*-----------------------------------------------------------------*/
1141
1142int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1143/* close run runId (all all runs if runId=0) */
1144/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1145 int j ;
1146
1147
1148 if ( runId == 0 ) {
1149 for ( j=0; j<MAX_RUN; j++) {
1150 if ( runCtrl[j].fileId == 0 ) { //run is open
1151 runCtrl[j].closeTime = closeTime ;
1152 runCtrl[j].maxEvt = maxEvt ;
1153 }
1154 }
1155 return 0;
1156 }
1157
1158 for ( j=0; j<MAX_RUN; j++) {
1159 if ( runCtrl[j].runId == runId ) {
1160 if ( runCtrl[j].fileId == 0 ) { //run is open
1161 runCtrl[j].closeTime = closeTime ;
1162 runCtrl[j].maxEvt = maxEvt ;
1163 return 0;
1164 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1165 runCtrl[j].closeTime = closeTime ;
1166 runCtrl[j].maxEvt = maxEvt ;
1167 return +1;
1168 } else { // run already closed
1169 return +2;
1170 }
1171 }
1172 } //we only reach here if the run was never created
1173 return -1;
1174
1175} /*-----------------------------------------------------------------*/
1176
1177
1178void *writeEvt( void *ptr ) {
1179/* *** main loop writing event (including opening and closing run-files */
1180
1181 int numWrite, numWait ;
1182 int k,j,i ;
1183 struct timespec xwait ;
1184 char str[MXSTR] ;
1185
1186 cpu_set_t mask;
1187 int cpu = 3 ; //write thread
1188
1189 snprintf(str,MXSTR,"Starting write-thread");
1190 factOut(kInfo,-1, str ) ;
1191
1192/* CPU_ZERO initializes all the bits in the mask to zero. */
1193 CPU_ZERO( &mask );
1194/* CPU_SET sets only the bit corresponding to cpu. */
1195 CPU_SET( cpu, &mask );
1196/* sched_setaffinity returns 0 in success */
1197 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1198 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1199 }
1200
1201 int lastRun = 0 ; //usually run from last event still valid
1202
1203 while (g_runStat >-2) {
1204
1205 numWait = numWrite = 0 ;
1206 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1207 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1208
1209 int k1=evtCtrl.frstPtr;
1210 for ( k=k1; k<(k1+kd); k++ ) {
1211 int k0 = k % (MAX_EVT*MAX_RUN) ;
1212//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1213 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1214 int id = evtCtrl.evtBuf[k0] ;
1215 uint32_t irun = mBuffer[id].runNum ;
1216 int ievt = mBuffer[id].evNum ;
1217
1218 gi.wrtTot++ ;
1219 if (runCtrl[lastRun].runId == irun ) {
1220 j = lastRun ;
1221 } else {
1222 //check which fileID to use (or open if needed)
1223 for ( j=0; j<MAX_RUN; j++) {
1224 if ( runCtrl[j].runId == irun ) break ;
1225 }
1226 if ( j >= MAX_RUN ) {
1227 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1228 factOut(kFatal,901, str ) ;
1229 gi.wrtErr++ ;
1230for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1231exit(111);
1232 }
1233 lastRun = j ;
1234 }
1235
1236 if (runCtrl[j].fileId < 0 ) {
1237 actRun.Version = 1 ;
1238 actRun.RunType = -1 ;
1239 actRun.NBoard = NBOARDS ;
1240 actRun.NPix = NPIX ;
1241 actRun.NTm = NTMARK ;
1242 actRun.Nroi = mBuffer[id].nRoi ;
1243// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1244
1245 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1246 if (runCtrl[j].fileHd == NULL ) {
1247 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1248 factOut(kError,502, str ) ;
1249 runCtrl[j].fileId = 99 ;
1250 } else {
1251 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1252 factOut(kInfo,-1, str ) ;
1253 runCtrl[j].fileId = 0 ;
1254 }
1255
1256 }
1257
1258 if (runCtrl[j].fileId != 0 ) {
1259 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1260 factOut(kWarn,123,str) ;
1261 evtCtrl.evtStat[k0] = 902 ;
1262 gi.wrtErr++ ;
1263 } else {
1264 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1265 if ( i>=0 ) {
1266 runCtrl[j].lastTime = g_actTime;
1267 runCtrl[j].actEvt++ ;
1268 evtCtrl.evtStat[k0] = 901 ;
1269 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1270 factOut(kDebug,504, str ) ;
1271// gj.writEvt++ ;
1272 } else {
1273 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1274 factOut(kError,503, str ) ;
1275 evtCtrl.evtStat[k0] = 901 ;
1276 gi.wrtErr++ ;
1277 }
1278
1279 if ( i < 0
1280 || runCtrl[j].closeTime < g_actTime
1281 || runCtrl[j].lastTime < g_actTime-300
1282 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1283int ii =0 ;
1284if ( i < 0 ) ii=1 ;
1285else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1286else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1287else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1288
1289
1290
1291 //close run for whatever reason
1292 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1293 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1294 if (i<0) {
1295 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1296 factOut(kError,503, str ) ;
1297 runCtrl[j].fileId = 9001 ;
1298 } else {
1299 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1300 factOut(kInfo,503, str ) ;
1301 runCtrl[j].fileId = 901 ;
1302 }
1303 }
1304 }
1305 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1306 }
1307
1308 //check if we should close a run (mainly when no event pending)
1309 for ( j=0; j<MAX_RUN; j++) {
1310 if ( runCtrl[j].fileId==0
1311 && ( runCtrl[j].closeTime < g_actTime
1312 ||runCtrl[j].lastTime < g_actTime-300
1313 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1314 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1315int ii =0 ;
1316if ( i < 0 ) ii=1 ;
1317else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1318else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1319else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1320
1321
1322 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1323 if (i<0) {
1324 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1325 factOut(kError,506, str ) ;
1326 runCtrl[j].fileId = 9011 ;
1327 } else {
1328 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1329 factOut(kInfo,507, str ) ;
1330 runCtrl[j].fileId = 911 ;
1331 }
1332 }
1333 }
1334
1335 if (numWrite == 0) {
1336 //seems we have nothing to do, so sleep a little
1337 xwait.tv_sec = 0;
1338 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1339 nanosleep( &xwait , NULL ) ;
1340 }
1341
1342 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1343 snprintf(str,MXSTR,"Finish Write Process ...");
1344 factOut(kInfo,-1, str ) ;
1345 gw_runStat = -22 ; //==> we should exit
1346 gj.writStat= -22 ; //==> we should exit
1347 goto closerun ;
1348 }
1349 gw_runStat = gi_runStat ;
1350 gj.writStat= gj.readStat ;
1351
1352 }
1353
1354 //must close all open files ....
1355 snprintf(str,MXSTR,"Abort Writing Process ...");
1356 factOut(kInfo,-1, str ) ;
1357closerun:
1358 snprintf(str,MXSTR,"Close all open files ...");
1359 factOut(kInfo,-1, str ) ;
1360 for ( j=0; j<MAX_RUN; j++)
1361 if ( runCtrl[j].fileId ==0 ) {
1362 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1363 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1364int ii =0 ;
1365if ( i < 0 ) ii=1 ;
1366else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1367else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1368else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1369 if (i<0) {
1370 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1371 factOut(kError,506, str ) ;
1372 runCtrl[j].fileId = 9021 ;
1373 } else {
1374 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1375 factOut(kInfo,507, str ) ;
1376 runCtrl[j].fileId = 921 ;
1377 }
1378 }
1379
1380 gw_runStat = -99;
1381 gj.writStat= -99;
1382 snprintf(str,MXSTR,"Exit Writing Process ...");
1383 factOut(kInfo,-1, str ) ;
1384 return 0;
1385
1386
1387
1388
1389} /*-----------------------------------------------------------------*/
1390
1391
1392
1393
1394void StartEvtBuild() {
1395
1396 int i,j,imax,status,th_ret[50] ;
1397 pthread_t thread[50] ;
1398 struct timespec xwait ;
1399 uint32_t actime ;
1400
1401 gi_runStat = gp_runStat = gw_runStat = 0 ;
1402 gj.readStat= gj.procStat= gj.writStat= 0 ;
1403
1404
1405 snprintf(str,MXSTR,"Starting EventBuilder");
1406 factOut(kInfo,-1, str ) ;
1407
1408
1409 evtCtrl.frstPtr = 0 ;
1410 evtCtrl.lastPtr = 0 ;
1411
1412 actime = g_actTime + 50000000 ;
1413/* initialize run control logics */
1414 for (i=0; i<MAX_RUN; i++) {
1415 runCtrl[i].runId = 0 ;
1416 runCtrl[i].fileId = -2 ;
1417 }
1418
1419
1420//start all threads (more to come) when we are allowed to ....
1421 while (g_runStat == 0 ) {
1422 xwait.tv_sec = 0;
1423 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1424 nanosleep( &xwait , NULL ) ;
1425 }
1426
1427 i=0 ;
1428 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1429 i++;
1430 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1431 i++;
1432 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1433 i++;
1434 imax=i ;
1435
1436#ifdef BILAND
1437 xwait.tv_sec = 20;;
1438 xwait.tv_nsec= 0 ; // sleep for ~20sec
1439 nanosleep( &xwait , NULL ) ;
1440
1441 printf("close all runs in 2 seconds\n");
1442
1443 CloseRunFile( 0, time(NULL)+2, 0) ;
1444
1445 xwait.tv_sec = 5;;
1446 xwait.tv_nsec= 0 ; // sleep for ~20sec
1447 nanosleep( &xwait , NULL ) ;
1448
1449 printf("setting g_runstat to -1\n");
1450
1451 g_runStat = -1 ;
1452#endif
1453
1454
1455//wait for all threads to finish
1456 for (i=0; i<imax; i++) {
1457 j = pthread_join ( thread[i], (void **)&status) ;
1458 }
1459
1460} /*-----------------------------------------------------------------*/
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476 /*-----------------------------------------------------------------*/
1477 /*-----------------------------------------------------------------*/
1478 /*-----------------------------------------------------------------*/
1479 /*-----------------------------------------------------------------*/
1480 /*-----------------------------------------------------------------*/
1481
1482
1483#ifdef BILAND
1484
1485
1486 /*-----------------------------------------------------------------*/
1487 /*-----------------------------------------------------------------*/
1488 /*-----------------------------------------------------------------*/
1489 /*-----------------------------------------------------------------*/
1490 /*-----------------------------------------------------------------*/
1491
1492
1493
1494
1495FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1496{ return 1; } ;
1497
1498int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1499{ return 1; } ;
1500
1501int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1502{ return 1; } ;
1503
1504
1505
1506int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1507{
1508 int i=0;
1509
1510 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1511 for (i=0; i<NBOARDS; i++) {
1512 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1513 }
1514 return 0;
1515}
1516
1517
1518void factStatNew(EVT_STAT gi) {
1519 int i ;
1520
1521 for (i=0;i<MAX_SOCK;i++) {
1522 printf("%4d",gi.numRead[i]);
1523 if (i%20 == 0 ) printf("\n");
1524 }
1525}
1526
1527
1528void factStat(GUI_STAT gj) {
1529// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1530// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1531}
1532
1533
/* Stand-alone test stub: the read-debug hook does nothing, all arguments
   are ignored. */
void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt,
               int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
  (void) isock ;
  (void) ibyte ;
  (void) event ;
  (void) ftmevt ;
  (void) runnr ;
  (void) state ;
  (void) tsec ;
  (void) tusec ;
}
1537
1538
1539
/* Stand-alone test stub: the stream-debug hook does nothing, the buffer is
   neither read nor modified. */
void debugStream(int isock, void *buf, int len) {
  (void) isock ;
  (void) buf ;
  (void) len ;
}
1542
/* Stand-alone test stub: the header-debug hook does nothing, the buffer is
   neither read nor modified. */
void debugHead(int i, int j, void *buf) {
  (void) i ;
  (void) j ;
  (void) buf ;
}
1545
1546
1547void factOut(int severity, int err, char* message ) {
1548static FILE * fd ;
1549static int file=0 ;
1550
1551 if (file==0) {
1552 printf("open file\n");
1553 fd=fopen("x.out","w+") ;
1554 file=999;
1555 }
1556
1557 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1558
1559 if (severity != kDebug)
1560 printf("%3d %3d | %s\n",severity,err,message) ;
1561}
1562
1563
1564
1565int main() {
1566 int i,b,c,p ;
1567 char ipStr[100] ;
1568 struct in_addr IPaddr ;
1569
1570 g_maxMem = 1024*1024 ; //MBytes
1571 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1572
1573
1574 g_runStat = 40 ;
1575
1576 i=0 ;
1577
1578// version for standard crates
1579//for (c=0; c<4,c++) {
1580// for (b=0; b<10; b++) {
1581// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1582//
1583// inet_pton(PF_INET, ipStr, &IPaddr) ;
1584//
1585// g_port[i].sockAddr.sin_family = PF_INET;
1586// g_port[i].sockAddr.sin_port = htons(5000) ;
1587// g_port[i].sockAddr.sin_addr = IPaddr ;
1588// g_port[i].sockDef = 1 ;
1589// i++ ;
1590// }
1591//}
1592//
1593//version for PC-test *
1594 for (c=0; c<4; c++) {
1595 for (b=0; b<10; b++) {
1596 sprintf(ipStr,"10.0.%d.11",128+c) ;
1597 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1598
1599 inet_pton(PF_INET, ipStr, &IPaddr) ;
1600 p = 31919+100*c+10*b;
1601
1602
1603 g_port[i].sockAddr.sin_family = PF_INET;
1604 g_port[i].sockAddr.sin_port = htons(p) ;
1605 g_port[i].sockAddr.sin_addr = IPaddr ;
1606 g_port[i].sockDef = 1 ;
1607
1608 i++ ;
1609 }
1610 }
1611
1612
1613//g_port[17].sockDef =-1 ;
1614//g_actBoards-- ;
1615
1616 StartEvtBuild() ;
1617
1618 return 0;
1619
1620}
1621#endif
Note: See TracBrowser for help on using the repository browser.