source: trunk/FACT++/src/EventBuilder.c@ 11169

Last change on this file since 11169 was 11134, checked in by tbretz, 13 years ago
Latest Update.
File size: 45.0 KB
Line 
1
2
3#define PX8 99 //simulator does not create double-length roi for pixel 8
4 //for real data, set PX8 = 8 ==> ask for double roi=TM
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
26enum Severity
27{
28 kMessage = 10, ///< Just a message, usually obsolete
29 kInfo = 20, ///< An info telling something which can be interesting to know
30 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
31 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
32 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
33 kDebug = 99, ///< A message used for debugging only
34};
35
36#define MIN_LEN 32 // min #bytes needed to interpret FADheader
37#define MAX_LEN 64*1024 // size of read-buffer per socket
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43extern void factStat(int64_t *array , int len ) ;
44
45extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
46
47
48extern void debugHead(int i, void *buf);
49
50extern void debugRead(int isock, int ibyte, int32_t event, int state,
51 uint32_t tsec, uint32_t tusec ) ;
52extern void debugStream(int isock, void *buf, int len) ;
53
54
55
56int g_actTime = 0 ;
57int g_runStat = 40 ;
58size_t g_maxMem ; //maximum memory allowed for buffer
59
60//no longer needed ...
61 int g_maxBoards ; //maximum number of boards to be initialized
62 int g_actBoards ;
63//
64
65FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
66
67
68int gi_runStat ;
69int gp_runStat ;
70int gw_runStat ;
71
72
73uint gi_SecRate[MAX_SOCK] ;
74uint gi_S10Rate[MAX_SOCK] ;
75uint gi_MinRate[MAX_SOCK] ;
76uint gi_ErrCnt[MAX_SOCK] ;
77
78uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
79
80uint gi_SecTime, gi_S10Time, gi_MinTime ;
81uint gi_EvtStart= 0 ;
82uint gi_EvtRead = 0 ;
83uint gi_EvtBad = 0 ;
84uint gi_EvtTot = 0 ;
85size_t gi_usedMem = 0 ;
86
87uint gw_EvtTot = 0 ;
88uint gp_EvtTot = 0 ;
89
90PIX_MAP g_pixMap[NPIX] ;
91
92EVT_CTRL evtCtrl ; //control of events during processing
93int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
94
95WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
96
97
98
99
100RUN_HEAD actRun ;
101
102RUN_CTRL runCtrl[MAX_RUN] ;
103
104RUN_TAIL runTail[MAX_RUN] ;
105
106
107/*
108*** Definition of rdBuffer to read in IP packets; keep it global !!!!
109 */
110
111
112typedef union {
113 int8_t B[MAX_LEN/8];
114 int16_t S[MAX_LEN/4];
115 int32_t I[MAX_LEN/2];
116 int64_t L[MAX_LEN ];
117} CNV_FACT ;
118
119typedef struct {
120 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
121 int32_t bufPos ; //next byte to read to the buffer next
122 int32_t bufLen ; //number of bytes left to read
123 int32_t skip ; //number of bytes skipped before start of event
124
125 int sockStat ; //-1 if socket not yet connected , 99 if not exist
126 int socket ; //contains the sockets
127 struct sockaddr_in SockAddr ; //IP for each socket
128
129 int evtID ; // event ID of event currently read
130 int runID ; // run "
131 uint fadLen ; // FADlength of event currently read
132 int fadVers ; // Version of FAD
133 int board ; // boardID (softwareID: 0..40 )
134 int Port ;
135
136 CNV_FACT *rBuf ;
137
138} READ_STRUCT ;
139
140
141typedef union {
142 int8_t B[2];
143 int16_t S ;
144} SHORT_BYTE ;
145
146
147
148
149
150#define MXSTR 1000
151char str[MXSTR] ;
152
153SHORT_BYTE start, stop;
154
155READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
156
157
158
159/*-----------------------------------------------------------------*/
160
161
162/*-----------------------------------------------------------------*/
163
164
165
166int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
167/*
168*** generate Address, create sockets and allocates readbuffer for it
169***
170*** if flag==0 generate socket and buffer
171*** <0 destroy socket and buffer
172*** >0 close and redo socket
173***
174*** sid : board*7 + port id
175 */
176
177 int j ;
178
179 if (rd->sockStat ==0 ) { //close socket if open
180 j=close(rd->socket) ;
181 if (j>0) {
182 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
183 factOut(kFatal,771, str ) ;
184 } else {
185 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
186 factOut(kInfo,771, str ) ;
187 }
188 }
189
190
191 if (flag < 0) {
192 free(rd->rBuf) ; //and never open again
193 rd->rBuf = NULL ;
194 rd->sockStat = 99 ;
195 return 0 ;
196 }
197
198
199 if (flag == 0) { //generate address and buffer ...
200 rd->Port = port ;
201 rd->SockAddr.sin_family = sockAddr->sin_family;
202 rd->SockAddr.sin_port = htons(port) ;
203 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
204
205 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
206 if ( rd->rBuf == NULL ) {
207 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
208 factOut(kFatal,774, str ) ;
209 rd->sockStat = 77 ;
210 return -3 ;
211 }
212 }
213
214
215 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
216 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
217 factOut(kFatal,773, str ) ;
218 rd->sockStat = 88 ;
219 return -2 ;
220 }
221
222 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
223 factOut(kInfo,773, str ) ;
224 rd->sockStat = -1 ; //try to (re)open socket
225 return 0 ;
226
227} /*-----------------------------------------------------------------*/
228
229 /*-----------------------------------------------------------------*/
230
231
232
233
234int mBufInit() {
235// initialize mBuffer (mark all entries as unused\empty)
236
237 int i ;
238 uint32_t actime ;
239
240 actime = g_actTime + 50000000 ;
241
242 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
243 mBuffer[i].evNum = mBuffer[i].runNum = -1;
244
245 evtCtrl.evtBuf[ i] = -1 ;
246 evtCtrl.evtStat[ i] = -1 ;
247 evtCtrl.pcTime[ i] = actime ; //initiate to far future
248
249 }
250
251
252 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
253
254 evtCtrl.frstPtr = 0 ;
255 evtCtrl.lastPtr = 0 ;
256
257 return 0 ;
258
259} /*-----------------------------------------------------------------*/
260
261
262
263
264int mBufEvt(uint evID, uint runID, uint nRoi) {
265// generate a new Event into mBuffer:
266// make sure only complete Event are possible, so 'free' will always work
267// returns index into mBuffer[], or negative value in case of error
268
269
270 int i, k, evFree ;
271 int headmem=0 ;
272 size_t needmem = 0 ;
273
274 if (nRoi <=0 || nRoi > 1024) {
275 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
276 factOut(kError, 1, str ) ;
277 return -9999 ;
278 }
279
280 i = evID % MAX_EVT ;
281 evFree = -1 ;
282
283 for ( k=0; k<MAX_RUN; k++) {
284 if ( mBuffer[i].evNum == evID
285 && mBuffer[i].runNum== runID ) {
286 return i ;
287 }
288 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
289 i += MAX_EVT ;
290 }
291
292
293 //event does not yet exist; create
294 if (evFree < 0 ) { //no space available in ctrl
295 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
296 factOut(kError,881, str ) ;
297 return -1 ;
298 }
299 i = evFree ; //found free entry; use it ...
300
301
302 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
303
304 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
305
306 if ( gi_usedMem + needmem + headmem > g_maxMem) {
307 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
308 factOut(kError,882, str ) ;
309 return -11 ;
310 }
311
312 mBuffer[i].FADhead = malloc( headmem ) ;
313 if (mBuffer[i].FADhead == NULL) {
314 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
315 factOut(kError,882, str ) ;
316 return -12;
317 }
318
319 mBuffer[i].fEvent = malloc( needmem ) ;
320 if (mBuffer[i].fEvent == NULL) {
321 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
322 factOut(kError,882, str ) ;
323 free(mBuffer[i].fEvent) ;
324 mBuffer[i].fEvent = NULL ;
325 return -22;
326 }
327
328 //flag all boards as unused
329 mBuffer[i].nBoard = 0 ;
330 for (k=0; k<NBOARDS; k++ ) {
331 mBuffer[i].board[k] = -1;
332 }
333
334 //flag all pixels as unused
335 for (k=0; k<NPIX; k++ ) {
336 mBuffer[i].fEvent->StartPix[k] = -1 ;
337 }
338
339 //flag all TMark as unused
340 for (k=0; k<NTMARK; k++ ) {
341 mBuffer[i].fEvent->StartTM[k] = -1 ;
342 }
343
344 mBuffer[i].pcTime = g_actTime ;
345 mBuffer[i].nRoi = nRoi ;
346 mBuffer[i].evNum = evID ;
347 mBuffer[i].runNum = runID ;
348 mBuffer[i].evtLen = needmem ;
349
350 gi_usedMem += needmem + headmem;
351
352 //register event in 'active list (reading)'
353
354 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
355 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
356 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
357 evtIdx[i] = evtCtrl.lastPtr ;
358snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
359factOut(kDebug,-11, str ) ;
360 evtCtrl.lastPtr++ ;
361 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
362
363
364
365
366 gi_EvtStart++ ;
367
368 //check if runId already registered in runCtrl
369 evFree = -1 ;
370 for (k=0; k<MAX_RUN; k++) {
371 if (runCtrl[k].runId == runID ) return i ;//run exists already
372 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
373 }
374
375 if (evFree <0 ) {
376 snprintf(str,MXSTR,"not able to register the new run %d",runID);
377 factOut(kFatal,883, str ) ;
378 } else {
379 runCtrl[evFree].runId = runID ;
380 }
381
382 return i ;
383
384} /*-----------------------------------------------------------------*/
385
386
387int mBufFree(int i) {
388//delete entry [i] from mBuffer:
389//(and make sure multiple calls do no harm ....)
390
391 int headmem=0 ;
392 size_t freemem = 0 ;
393
394 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
395 freemem = mBuffer[i].evtLen ;
396 free(mBuffer[i].fEvent ) ;
397 mBuffer[i].fEvent = NULL ;
398
399 free(mBuffer[i].FADhead ) ;
400 mBuffer[i].FADhead = NULL ;
401
402 }
403 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
404 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
405
406 gi_usedMem = gi_usedMem - freemem - headmem;
407
408
409 return 0 ;
410
411} /*-----------------------------------------------------------------*/
412
413
414 /*-----------------------------------------------------------------*/
415
416
417
418void initReadFAD() {
419return ;
420} /*-----------------------------------------------------------------*/
421
422
423
424void *readFAD( void *ptr ) {
425/* *** main loop reading FAD data and sorting them to complete events */
426 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
427 int actBoards = 0, minLen ;
428 int32_t jrd ;
429 int32_t myRun ;
430 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
431 uint qtot = 0, qread = 0, qconn = 0 ;
432 int errcnt0 = 0 ;
433
434 int goodhed=0;
435
436 struct timespec xwait ;
437
438 int nokCnt[MAX_SOCK],loopCnt=0;
439 int sokCnt[MAX_SOCK];
440 int sockDef[NBOARDS];
441
442 struct timeval *tv, atv;
443 tv=&atv;
444 uint32_t tsec, tusec ;
445
446
447 snprintf(str,MXSTR,"start initializing");
448 factOut(kInfo,-1, str ) ;
449
450 int cpu = 7 ; //read thread
451 cpu_set_t mask;
452
453/* CPU_ZERO initializes all the bits in the mask to zero. */
454 CPU_ZERO( &mask );
455/* CPU_SET sets only the bit corresponding to cpu. */
456 cpu = 7 ;
457 CPU_SET( cpu, &mask );
458
459/* sched_setaffinity returns 0 in success */
460 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
461 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
462 factOut(kWarn,-1, str ) ;
463 }
464
465
466 //make sure all sockets are preallocated as 'not exist'
467 for (i=0; i<MAX_SOCK; i++) {
468 rd[i].socket = -1 ;
469 rd[i].sockStat = 99 ;
470 }
471 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
472
473
474 g_actTime = time(NULL) ;
475 for (k=0; k<MAX_SOCK; k++)
476 gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
477
478 for (k=0; k<NBOARDS; k++)
479 gi_NumConnect[k]=0;
480
481
482 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
483
484 mBufInit() ; //initialize buffers
485
486 snprintf(str,MXSTR,"end initializing");
487 factOut(kInfo,-1, str ) ;
488
489
490 for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
491
492 head_len = sizeof(PEVNT_HEADER) ;
493//frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header
494 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
495
496//minLen = MIN_LEN ; //min #bytes needed to check header
497 minLen = head_len ; //min #bytes needed to check header: full header for debug
498
499 numok = numok2 = 0 ;
500
501 start.S=0xFB01;
502 stop.S= 0x04FE;
503
504 myRun = g_actTime ;
505
506 gi_runStat = g_runStat ;
507
508
509 while (g_runStat >=0) { //loop until global variable g_runStat claims stop
510
511 gi_runStat = g_runStat ;
512
513 int b,p,p0,s0,nch;
514 nch = 0 ;
515 for (b=0; b<NBOARDS; b++ ) {
516 k = b*7 ;
517 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
518 nch++ ;
519 gi_NumConnect[ b ] = 0 ; //must close all connections
520 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
521 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
522 else s0=+1 ; //sockets to be closed and reopened
523
524 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
525 else p0=0 ;
526
527 for (p=p0+1; p<p0+8; p++) {
528 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
529 k++ ;
530 }
531 sockDef[b] = g_port[b].sockDef ;
532 }
533 }
534
535 if (nch > 0 ) {
536 actBoards = 0 ;
537 for (b=0; b<NBOARDS; b++ ) {
538 if ( sockDef[b] > 0 ) actBoards++ ;
539 }
540 }
541
542
543 g_actTime = time(NULL) ;
544 nokCnt[numok]++;
545
546 loopCnt++ ;
547
548 numok = 0 ; //count number of succesfull actions
549
550 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
551 b = i / 7 ;
552 if (sockDef[b] > 0) s0=+1 ;
553 else s0=-1 ;
554
555 gettimeofday( tv, NULL);
556 tsec = atv.tv_sec ;
557 tusec= atv.tv_usec ;
558
559 if (rd[i].sockStat <0 ) { //try to connect if not yet done
560 rd[i].sockStat=connect(rd[i].socket,
561 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
562 if (rd[i].sockStat ==0 ) { //successfull ==>
563 if (sockDef[b] > 0) {
564 rd[i].bufTyp = 0 ; // expect a header
565 rd[i].bufLen = frst_len ; // max size to read at begining
566 } else {
567 rd[i].bufTyp = -1 ; // full data to be skipped
568 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
569 }
570 rd[i].bufPos = 0 ; // no byte read so far
571 rd[i].skip = 0 ; // start empty
572 gi_NumConnect[ b ]++ ;
573 numok++ ; //make sure next round will execute
574 snprintf(str,MXSTR,"+++connect %d %d",b,gi_NumConnect[ b ]);
575 factOut(kInfo,-1, str ) ;
576 }
577 }
578
579 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
580 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
581 numok++ ;
582 sokCnt[i]++;
583 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
584
585 if (jrd >0 ) {
586 qread+=jrd ;
587 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
588 }
589
590 if (jrd == 0) { //connection has closed ...
591 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
592 factOut(kInfo,441, str ) ;
593 GenSock(s0, i, 0,NULL, &rd[i]) ;
594 gi_ErrCnt[i]++ ;
595 gi_NumConnect[ b ]-- ;
596
597 } else if ( jrd<0 ) { //did not read anything
598 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
599 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
600 factOut(kError,442, str ) ;
601 gi_ErrCnt[i]++ ;
602 } else numok-- ; //else nothing waiting to be read
603 jrd = 0 ;
604 }
605 } else jrd = 0 ; //did read nothing as requested
606
607 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
608// just do nothing
609
610 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
611 if ( jrd < rd[i].bufLen ) { //not yet all read
612 rd[i].bufPos += jrd ; //==> prepare for continuation
613 rd[i].bufLen -= jrd ;
614 debugRead(i,jrd,rd[i].evtID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
615 } else { //full dataset read
616 rd[i].bufLen = 0 ;
617 rd[i].bufPos = rd[i].fadLen ;
618 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
619 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
620 gi_ErrCnt[i]++ ;
621 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
622 factOut(kError,301, str ) ;
623 goto EndBuf ;
624
625 }
626 debugRead(i,jrd,rd[i].evtID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
627
628 //we have a complete buffer, copy to WORK area
629 gi_SecRate[i]++ ;
630
631 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
632 //get index into mBuffer for this event (create if needed)
633 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
634
635 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
636 if (evID < 0) {
637 xwait.tv_sec = 0;
638 xwait.tv_nsec= 20000000 ; // sleep for ~20 msec
639 nanosleep( &xwait , NULL ) ;
640 goto EndBuf1 ; //hope there is free space next round
641 }
642
643
644 //we have a valid entry in mBuffer[]; fill it
645
646 boardId = b ;
647 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
648 int fadCrate = fadBoard/256 ;
649 if (boardId != (fadCrate*10 + fadBoard%256) ) {
650 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
651 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
652// } else {
653// snprintf(str,MXSTR,"correct Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
654// if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
655 }
656 if ( mBuffer[evID].board[ boardId ] != -1) {
657 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
658 factOut(kWarn,501, str ) ;
659 goto EndBuf ; //--> skip Board
660 }
661
662 int iDx = evtIdx[evID] ; //index into evtCtrl
663
664 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
665 &rd[i].rBuf->S[0], head_len) ;
666 roi = mBuffer[evID].nRoi ;
667
668 pixS = boardId*36 -1 ; //
669 tmS = boardId*4 -1 ; //
670 src = head_len/2 ;
671 for ( drs=0; drs<4; drs++ ) {
672 for ( px=0; px<9; px++ ) {
673 pixH= ntohs(rd[i].rBuf->S[src++]) ;
674 pixC= ntohs(rd[i].rBuf->S[src++]) ;
675 pixR= ntohs(rd[i].rBuf->S[src++]) ;
676
677 src++ ;
678 pixS++ ; //pixS = pixH2S[pixH] ;
679 if ( ( px < PX8 && pixR == roi )
680 || ( px ==PX8 && pixR == 2*roi )
681 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
682 // correct roi
683 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
684 dest= pixS * roi ;
685 memcpy(
686 &mBuffer[evID].fEvent->Adc_Data[dest],
687 &rd[i].rBuf->S[src], roi * 2) ;
688 src+= roi ;
689 if ( px==PX8 ) {
690 tmS++; // tmS = tmH2S[pixH]
691 dest= tmS * roi + NPIX* roi ;
692 if ( roi <=512 ) {
693 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
694 memcpy(
695 &mBuffer[evID].fEvent->Adc_Data[dest],
696 &rd[i].rBuf->S[src], roi * 2) ;
697 src+=roi ;
698 } else {
699 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
700 }
701 }
702 } else {
703 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
704 factOut(kError,202, str ) ;
705 goto EndBuf ;
706 }
707 }
708 }// now we have stored a new board contents into Event structure
709 mBuffer[evID].board[ boardId ] = boardId ;
710 evtCtrl.evtStat[ iDx ]++ ;
711 evtCtrl.pcTime[ iDx ] = g_actTime ;
712
713 if (++mBuffer[evID].nBoard >= actBoards ) {
714 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
715 factOut(kDebug,-1, str ) ;
716 //complete event read ---> flag for next processing
717 evtCtrl.evtStat[ iDx ] = 99;
718 gi_EvtRead++ ;
719 gi_EvtTot++ ;
720 }
721
722EndBuf:
723 rd[i].bufTyp = 0 ; //ready to read next header
724 rd[i].bufLen = frst_len ;
725 rd[i].bufPos = 0 ;
726EndBuf1:
727 ;
728 }
729
730 } else { //we are reading event header
731 rd[i].bufPos += jrd ;
732 rd[i].bufLen -= jrd ;
733 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
734 //check if startflag correct; else shift block ....
735 for (k=0; k<rd[i].bufPos -1 ; k++) {
736 if (rd[i].rBuf->B[k ] == start.B[1]
737 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
738 }
739 rd[i].skip += k ;
740
741 if (k >= rd[i].bufPos-1 ) { //no start of header found
742// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
743// factOut(kWarn,666, str ) ;
744
745 rd[i].bufPos = 0 ;
746 rd[i].bufLen = head_len ;
747 } else if ( k>0 ) {
748 rd[i].bufPos -= k ;
749 rd[i].bufLen += k ;
750 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
751 }
752 if ( rd[i].bufPos >= minLen ) {
753 if ( rd[i].skip >0 ) {
754 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
755 factOut(kInfo,666, str ) ;
756 rd[i].skip = 0 ;
757 }
758 goodhed++;
759 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
760 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
761 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
762 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
763 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
764 rd[i].bufTyp = 1 ; //ready to read full record
765 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
766 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
767 debugHead(i,rd[i].rBuf);
768 debugRead(i,jrd,rd[i].evtID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
769 } else {
770 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
771 }
772 } else {
773 debugRead(i,jrd,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
774 }
775
776 } //end interpreting last read
777 } //end of successful read anything
778 } //finished trying to read all sockets
779
780 int qwait=0, qdel=0, qskip=0 ;
781 g_actTime = time(NULL) ;
782 if ( g_actTime > gi_SecTime ) {
783 gi_SecTime = g_actTime ;
784
785
786 //loop over all active events and flag those older than read-timeout
787 //delete those that are written to disk ....
788
789 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
790 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
791
792 int k1=evtCtrl.frstPtr;
793 for ( k=k1; k<(k1+kd); k++ ) {
794 int k0 = k % (MAX_EVT*MAX_RUN) ;
795//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
796 if (evtCtrl.evtStat[k0] > 0
797 && evtCtrl.evtStat[k0] < 90 ) {
798
799 qwait++;
800
801 if( evtCtrl.pcTime[k0] < g_actTime-10 ) {
802 int id =evtCtrl.evtBuf[k0] ;
803 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
804 factOut(kWarn,601, str ) ;
805 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
806 gi_EvtBad++ ;
807 gi_EvtTot++ ;
808 qskip++;
809 }
810
811
812 } else if (evtCtrl.evtStat[k0] >= 900 ) {
813
814 int id =evtCtrl.evtBuf[k0] ;
815 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
816 factOut(kDebug,-1, str ) ;
817 mBufFree(id) ; //event written--> free memory
818 evtCtrl.evtStat[k0] = -1;
819 qdel++;
820 qtot++;
821 }
822
823 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
824 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
825 }
826 }
827
828qconn=0 ;
829int ib ;
830for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
831int64_t stat[9] ;
832 stat[0]= qwait;
833 stat[1]= qskip;
834 stat[2]= qdel ;
835 stat[3]= qtot ;
836 stat[4]= gi_usedMem ;
837 stat[5]= qread;
838 stat[6]= qconn;
839
840factStat(stat,7);
841qread=0 ;
842 }
843
844
845
846
847 if (numok > 0 ) numok2=0;
848 else if (numok2++ > 3) {
849 if (g_runStat == 1) {
850 xwait.tv_sec = 1;
851 xwait.tv_nsec= 0 ; // hibernate for 1 sec
852 } else {
853 xwait.tv_sec = 0;
854 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
855 }
856 nanosleep( &xwait , NULL ) ;
857 }
858
859 } //and do next loop over all sockets ...
860
861 //must quit eventbuilding
862 snprintf(str,MXSTR,"stop reading ...");
863 factOut(kInfo,-1, str ) ;
864
865 //flag all events as 'read finished'
866 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
867 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
868
869 int k1=evtCtrl.frstPtr;
870
871 for ( k=k1; k<(k1+kd); k++ ) {
872 int k0 = k % (MAX_EVT*MAX_RUN) ;
873 if (evtCtrl.evtStat[k0] > 0
874 && evtCtrl.evtStat[k0] < 90 ) {
875 evtCtrl.evtStat[k0] = 91 ;
876 gi_EvtBad++ ;
877 gi_EvtTot++ ;
878 }
879 }
880
881 //must close all open sockets ...
882 snprintf(str,MXSTR,"close all sockets ...");
883 factOut(kInfo,-1, str ) ;
884 for (i=0; i<MAX_SOCK; i++) {
885 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
886 gi_NumConnect[ i/7 ]-- ;
887 }
888
889 xwait.tv_sec = 0;
890 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
891 nanosleep( &xwait , NULL ) ;
892 gi_runStat = -11 ; //inform all that no update to happen any more
893
894
895 int minclear = 900 ; //usually wait until writing finished (stat 900)
896 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all
897
898
899 //and clear all buffers (might have to wait until all others are done)
900 snprintf(str,MXSTR,"clear all buffers ...");
901 factOut(kInfo,-1, str ) ;
902 int numclear=1 ;
903 while (numclear > 0 ) {
904 numclear = 0 ;
905 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
906 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
907
908 int k1=evtCtrl.frstPtr;
909 for ( k=k1; k<(k1+kd); k++ ) {
910 int k0 = k % (MAX_EVT*MAX_RUN) ;
911 if (evtCtrl.evtStat[k0] > minclear ) {
912 int id =evtCtrl.evtBuf[k0] ;
913 mBufFree(id) ; //event written--> free memory
914 evtCtrl.evtStat[k0] = -1;
915 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
916
917 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
918 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
919 }
920
921 xwait.tv_sec = 0;
922 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
923 nanosleep( &xwait , NULL ) ;
924 }
925
926 snprintf(str,MXSTR,"Exit read Process ...");
927 factOut(kInfo,-1, str ) ;
928 gi_runStat = -99 ;
929 return 0;
930
931} /*-----------------------------------------------------------------*/
932
933
934void *procEvt( void *ptr ) {
935/* *** main loop processing file, including SW-trigger */
936 int numProc, numWait ;
937 int k ;
938 struct timespec xwait ;
939 char str[MXSTR] ;
940
941 cpu_set_t mask;
942 int cpu = 5 ; //process thread (will be several in final version)
943
944 snprintf(str,MXSTR,"Starting process-thread");
945 factOut(kInfo,-1, str ) ;
946
947/* CPU_ZERO initializes all the bits in the mask to zero. */
948 CPU_ZERO( &mask );
949/* CPU_SET sets only the bit corresponding to cpu. */
950 CPU_SET( cpu, &mask );
951/* sched_setaffinity returns 0 in success */
952 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
953 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
954 factOut(kWarn,-1, str ) ;
955 }
956
957
958 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
959
960 numWait = numProc = 0 ;
961 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
962 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
963
964 int k1=evtCtrl.frstPtr;
965 for ( k=k1; k<(k1+kd); k++ ) {
966 int k0 = k % (MAX_EVT*MAX_RUN) ;
967//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
968 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
969 int id = evtCtrl.evtBuf[k0] ;
970 int ievt = mBuffer[id].evNum ;
971 int roi = mBuffer[id].nRoi ;
972// uint32_t irun = mBuffer[id].runNum ;
973//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
974//factOut(kDebug,-1, str ) ;
975
976//make sure unused pixels/tmarks are cleared to zero
977 int ip,it,dest,ib;
978 for (ip=0; ip<NPIX; ip++) {
979 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
980 dest= ip*roi ;
981 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
982 }
983 }
984 for (it=0; it<NTMARK; it++) {
985 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
986 dest= it*roi + NPIX*roi ;
987 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
988 }
989 }
990
991
992//and set correct event header ; also check for consistency in event (not yet)
993 mBuffer[id].fEvent->Roi = roi ;
994 mBuffer[id].fEvent->EventNum = ievt ;
995 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
996 mBuffer[id].fEvent->SoftTrig = 0 ;
997 for (ib=0; ib<NBOARDS; ib++) {
998 if (mBuffer[id].board[ib] == -1 ) { //board is not read
999 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1000 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1001 } else {
1002 mBuffer[id].fEvent->BoardTime[ib] =
1003 ntohl(mBuffer[id].FADhead[ib].time) ;
1004 }
1005 }
1006
1007 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1008
1009 if (i<0) evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1010
1011 numProc++ ;
1012 evtCtrl.evtStat[k0] = 520 ;
1013 gp_EvtTot++ ;
1014 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1015 numWait++ ;
1016 }
1017 }
1018
1019 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1020 snprintf(str,MXSTR,"Exit Processing Process ...");
1021 factOut(kInfo,-1, str ) ;
1022 gp_runStat = -22 ; //==> we should exit
1023 return 0 ;
1024 }
1025
1026 if (numProc == 0) {
1027 //seems we have nothing to do, so sleep a little
1028 xwait.tv_sec = 0;
1029 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1030 nanosleep( &xwait , NULL ) ;
1031 }
1032 gp_runStat = gi_runStat ;
1033
1034 }
1035
1036 //we are asked to abort asap ==> must flag all remaining events
1037 // when gi_runStat claims that all events are in the buffer...
1038
1039 snprintf(str,MXSTR,"Abort Processing Process ...");
1040 factOut(kInfo,-1, str ) ;
1041 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1042 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1043
1044 int k1=evtCtrl.frstPtr;
1045 for ( k=k1; k<(k1+kd); k++ ) {
1046 int k0 = k % (MAX_EVT*MAX_RUN) ;
1047 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1048 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1049 }
1050 }
1051
1052 gp_runStat = -99 ;
1053
1054 return 0;
1055
1056} /*-----------------------------------------------------------------*/
1057
1058int CloseRunFile(uint32_t runId, uint32_t closeTime) {
1059/* close run runId (all all runs if runId=0) */
1060/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1061 int i, j ;
1062
1063 if (runId == 0 ) {
1064 for ( j=0; j<MAX_RUN; j++) {
1065 if ( runCtrl[j].fileId == 0 ) { //run is open
1066 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1067 if (i<0) {
1068 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1069 factOut(kError,506, str ) ;
1070 runCtrl[j].fileId = 888 ;
1071 } else {
1072 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1073 factOut(kInfo,507, str ) ;
1074 runCtrl[j].fileId = 7777 ;
1075 }
1076 runCtrl[j].closeTime = closeTime ;
1077 }
1078 }
1079 return 0 ;
1080 }
1081
1082
1083 for ( j=0; j<MAX_RUN; j++) {
1084 if ( runCtrl[j].runId == runId ) {
1085 if ( runCtrl[j].fileId == 0 ) { //run is open
1086 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1087 if (i<0) {
1088 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1089 factOut(kError,506, str ) ;
1090 runCtrl[j].fileId = 888 ;
1091 } else {
1092 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1093 factOut(kInfo,507, str ) ;
1094 runCtrl[j].fileId = 7777 ;
1095 }
1096 runCtrl[j].closeTime = closeTime ;
1097 return 0;
1098 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1099 runCtrl[j].closeTime = closeTime ;
1100 return +1;
1101 } else { // run already closed
1102 return +2;
1103 }
1104 }
1105 } //we only reach here if the run was never created
1106 return -1;
1107
1108} /*-----------------------------------------------------------------*/
1109
1110
1111void *writeEvt( void *ptr ) {
1112/* *** main loop writing event (including opening and closing run-files */
1113
1114 int numWrite, numWait ;
1115 int k,j ;
1116 struct timespec xwait ;
1117 char str[MXSTR] ;
1118
1119 cpu_set_t mask;
1120 int cpu = 3 ; //write thread
1121
1122 snprintf(str,MXSTR,"Starting write-thread");
1123 factOut(kInfo,-1, str ) ;
1124
1125/* CPU_ZERO initializes all the bits in the mask to zero. */
1126 CPU_ZERO( &mask );
1127/* CPU_SET sets only the bit corresponding to cpu. */
1128 CPU_SET( cpu, &mask );
1129/* sched_setaffinity returns 0 in success */
1130 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1131 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1132 }
1133
1134 int lastRun = 0 ; //usually run from last event still valid
1135
1136 while (g_runStat >-2) {
1137
1138 numWait = numWrite = 0 ;
1139 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1140 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1141
1142 int k1=evtCtrl.frstPtr;
1143 for ( k=k1; k<(k1+kd); k++ ) {
1144 int k0 = k % (MAX_EVT*MAX_RUN) ;
1145//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1146 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1147 int id = evtCtrl.evtBuf[k0] ;
1148 uint32_t irun = mBuffer[id].runNum ;
1149 int ievt = mBuffer[id].evNum ;
1150
1151 if (runCtrl[lastRun].runId == irun ) {
1152 j = lastRun ;
1153 } else {
1154 //check which fileID to use (or open if needed)
1155 for ( j=0; j<MAX_RUN; j++) {
1156 if ( runCtrl[j].runId == irun ) break ;
1157 }
1158 if ( j >= MAX_RUN ) {
1159 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1160 factOut(kFatal,901, str ) ;
1161for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1162exit(111);
1163 }
1164 lastRun = j ;
1165 }
1166
1167 if (runCtrl[j].fileId < 0 ) {
1168 actRun.Version = 1 ;
1169 actRun.RunType = -1 ;
1170 actRun.NBoard = NBOARDS ;
1171 actRun.NPix = NPIX ;
1172 actRun.NTm = NTMARK ;
1173 actRun.Nroi = mBuffer[id].nRoi ;
1174// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1175 runCtrl[j].nextEvt= 0;
1176 runCtrl[j].lastTime=g_actTime ;
1177 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1178 if (runCtrl[j].fileHd == NULL ) {
1179 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1180 factOut(kError,502, str ) ;
1181 runCtrl[j].fileId = 99 ;
1182 } else {
1183 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1184 factOut(kInfo,-1, str ) ;
1185 runCtrl[j].fileId = 0 ;
1186 }
1187
1188 }
1189
1190 if (runCtrl[j].fileId > 0 ) {
1191 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1192 factOut(kDebug,123,str) ;
1193 evtCtrl.evtStat[k0] = 902 ;
1194 } else {
1195 int i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1196 if (i<0) {
1197 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1198 factOut(kError,503, str ) ;
1199 evtCtrl.evtStat[k0] = 901 ;
1200 //close run
1201 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1202 if (i<0) {
1203 snprintf(str,MXSTR,"W error closing run %d",irun) ;
1204 factOut(kError,503, str ) ;
1205 } else {
1206 snprintf(str,MXSTR,"W closed run %d because of write error",irun) ;
1207 factOut(kInfo,503, str ) ;
1208 }
1209 runCtrl[j].fileId = 9999 ;
1210 } else {
1211 runCtrl[j].lastTime = g_actTime;
1212 evtCtrl.evtStat[k0] = 901 ;
1213 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1214 factOut(kDebug,504, str ) ;
1215 }
1216 }
1217 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1218 }
1219
1220 //check if we should close a run ...
1221 for ( j=0; j<MAX_RUN; j++) {
1222 if ( runCtrl[j].fileId==0
1223 && ( runCtrl[j].closeTime < g_actTime
1224 ||runCtrl[j].lastTime < g_actTime-120) ) {
1225 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1226 if (i<0) {
1227 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1228 factOut(kError,506, str ) ;
1229 runCtrl[j].fileId = 888 ;
1230 } else {
1231 snprintf(str,MXSTR,"closing run %d ok BBB",runCtrl[j].runId);
1232 factOut(kInfo,507, str ) ;
1233 runCtrl[j].fileId = 7777 ;
1234 }
1235 }
1236 }
1237
1238 if (numWrite == 0) {
1239 //seems we have nothing to do, so sleep a little
1240 xwait.tv_sec = 0;
1241 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1242 nanosleep( &xwait , NULL ) ;
1243 }
1244
1245 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1246 snprintf(str,MXSTR,"Finish Write Process ...");
1247 factOut(kInfo,-1, str ) ;
1248 gw_runStat = -22 ; //==> we should exit
1249 goto closerun ;
1250 }
1251 gw_runStat = gi_runStat ;
1252
1253 }
1254
1255 //must close all open files ....
1256 snprintf(str,MXSTR,"Abort Writing Process ...");
1257 factOut(kInfo,-1, str ) ;
1258closerun:
1259 snprintf(str,MXSTR,"Close all open files ...");
1260 factOut(kInfo,-1, str ) ;
1261 for ( j=0; j<MAX_RUN; j++)
1262 if ( runCtrl[j].fileId ==0 ) {
1263 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1264 if (i<0) {
1265 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1266 factOut(kError,506, str ) ;
1267 runCtrl[j].fileId = 888 ;
1268 } else {
1269 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1270 factOut(kInfo,507, str ) ;
1271 runCtrl[j].fileId = 7777 ;
1272 }
1273 }
1274
1275 gw_runStat = -99;
1276 snprintf(str,MXSTR,"Exit Writing Process ...");
1277 factOut(kInfo,-1, str ) ;
1278 return 0;
1279
1280
1281
1282
1283} /*-----------------------------------------------------------------*/
1284
1285
1286
1287
1288void StartEvtBuild() {
1289
1290 int i,j,imax,status,th_ret[50] ;
1291 pthread_t thread[50] ;
1292 struct timespec xwait ;
1293 uint32_t actime ;
1294
1295 gi_runStat = gp_runStat = gw_runStat = 0 ;
1296
1297 snprintf(str,MXSTR,"Starting EventBuilder");
1298 factOut(kInfo,-1, str ) ;
1299
1300
1301 evtCtrl.frstPtr = 0 ;
1302 evtCtrl.lastPtr = 0 ;
1303
1304 actime = g_actTime + 50000000 ;
1305/* initialize run control logics */
1306 for (i=0; i<MAX_RUN; i++) {
1307 runCtrl[i].runId = 0 ;
1308 runCtrl[i].lastTime = 0 ;
1309 runCtrl[i].closeTime = time(NULL) + 3600*24*7;
1310
1311 runCtrl[i].nextEvt = 0 ;
1312 runCtrl[i].fileId = -2 ;
1313
1314 runTail[i].nEventsOk =
1315 runTail[i].nEventsRej =
1316 runTail[i].nEventsBad =
1317 runTail[i].PCtime0 =
1318 runTail[i].PCtimeX = 0 ;
1319 }
1320
1321//start all threads (more to come) when we are allowed to ....
1322 while (g_runStat == 0 ) {
1323 xwait.tv_sec = 0;
1324 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1325 nanosleep( &xwait , NULL ) ;
1326 }
1327
1328 i=0 ;
1329 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1330 i++;
1331 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1332 i++;
1333 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1334 i++;
1335 imax=i ;
1336
1337
1338
1339
1340
1341/*
1342 xwait.tv_sec = 20;;
1343 xwait.tv_nsec= 0 ; // sleep for ~20sec
1344 nanosleep( &xwait , NULL ) ;
1345
1346
1347 printf("close all runs in 2 seconds\n");
1348
1349
1350 CloseRunFile( 0, time(NULL)+2) ;
1351
1352 xwait.tv_sec = 5;;
1353 xwait.tv_nsec= 0 ; // sleep for ~20sec
1354 nanosleep( &xwait , NULL ) ;
1355
1356 printf("setting g_runstat to -1\n");
1357
1358 g_runStat = -1 ;
1359*/
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371//wait for all threads to finish
1372 for (i=0; i<imax; i++) {
1373 j = pthread_join ( thread[i], (void **)&status) ;
1374 }
1375
1376} /*-----------------------------------------------------------------*/
1377
1378
1379
1380
1381
1382 /*-----------------------------------------------------------------*/
1383 /*-----------------------------------------------------------------*/
1384 /*-----------------------------------------------------------------*/
1385 /*-----------------------------------------------------------------*/
1386 /*-----------------------------------------------------------------*/
1387 /*-----------------------------------------------------------------*/
1388 /*-----------------------------------------------------------------*/
1389 /*-----------------------------------------------------------------*/
1390 /*-----------------------------------------------------------------*/
1391 /*-----------------------------------------------------------------*/
1392
1393
1394/*
1395
1396FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1397{ return 1; } ;
1398
1399int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1400{ return 1; } ;
1401
1402int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1403{ return 1; } ;
1404
1405
1406
1407int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1408{
1409 int i=0;
1410
1411 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1412 for (i=0; i<NBOARDS; i++) {
1413 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1414 }
1415 return 0;
1416}
1417
1418
1419void factStat(int64_t *array, int len ) {
1420 printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1421 array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1422}
1423
1424
1425void debugRead(int isock, int ibyte, int32_t event, int state, uint32_t tsec, uint32_t tusec ) {
1426// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1427}
1428
1429
1430
1431void debugStream(int isock, void *buf, int len) {
1432}
1433
1434void debugHead(int i, void *buf) {
1435}
1436
1437
1438void factOut(int severity, int err, char* message ) {
1439static FILE * fd ;
1440static int file=0 ;
1441
1442 if (file==0) {
1443 printf("open file\n");
1444 fd=fopen("x.out","w+") ;
1445 file=999;
1446 }
1447
1448 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1449
1450 if (severity != kDebug)
1451 printf("%3d %3d | %s\n",severity,err,message) ;
1452}
1453
1454
1455
1456int main() {
1457 int i,b,c,p ;
1458 char ipStr[100] ;
1459 struct in_addr IPaddr ;
1460
1461 g_maxMem = 1024*1024 ; //MBytes
1462 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1463
1464
1465 g_runStat = 40 ;
1466
1467 i=0 ;
1468
1469// version for standard crates
1470//for (c=0; c<4,c++) {
1471// for (b=0; b<10; b++) {
1472// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1473//
1474// inet_pton(PF_INET, ipStr, &IPaddr) ;
1475//
1476// g_port[i].sockAddr.sin_family = PF_INET;
1477// g_port[i].sockAddr.sin_port = htons(5000) ;
1478// g_port[i].sockAddr.sin_addr = IPaddr ;
1479// g_port[i].sockDef = 1 ;
1480// i++ ;
1481// }
1482//}
1483//
1484//version for PC-test
1485 for (c=0; c<4; c++) {
1486 for (b=0; b<10; b++) {
1487 sprintf(ipStr,"10.0.%d.11",128+c) ;
1488 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1489
1490 inet_pton(PF_INET, ipStr, &IPaddr) ;
1491 p = 31919+100*c+10*b;
1492
1493
1494 g_port[i].sockAddr.sin_family = PF_INET;
1495 g_port[i].sockAddr.sin_port = htons(p) ;
1496 g_port[i].sockAddr.sin_addr = IPaddr ;
1497 g_port[i].sockDef = 1 ;
1498
1499 i++ ;
1500 }
1501 }
1502
1503
1504//g_port[17].sockDef =-1 ;
1505//g_actBoards-- ;
1506
1507 StartEvtBuild() ;
1508
1509 return 0;
1510
1511}
1512*/
Note: See TracBrowser for help on using the repository browser.