source: trunk/FACT++/src/EventBuilder.c@ 11185

Last change on this file since 11185 was 11181, checked in by tbretz, 13 years ago
Added more infos to debugRead; some fixed to CompareBit
File size: 45.2 KB
Line 
1
2
/*
 * PX8: index (within one DRS chip, 0..8) of the pixel channel that is expected
 * to deliver a double-length region of interest (second half = time marker).
 * The simulator does not create a double-length roi for pixel 8, so the value
 * 99 (never matched by the 0..8 pixel loop) disables that handling.
 * For real data, set PX8 = 8 ==> ask for double roi = TM.
 */
#define PX8 99
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
/** Severity levels passed as first argument to factOut(). */
enum Severity
{
    /** Just a message, usually obsolete */
    kMessage = 10,
    /** An info telling something which can be interesting to know */
    kInfo    = 20,
    /** A warning, things that somehow might result in unexpected or unwanted behaviour */
    kWarn    = 30,
    /** Error, something unexpected happened, but can still be handled by the program */
    kError   = 40,
    /** An error which cannot be handled at all happened, the only solution is program termination */
    kFatal   = 50,
    /** A message used for debugging only */
    kDebug   = 99,
};
35
#define MIN_LEN 32            // min #bytes needed to interpret FADheader
// Base unit used to dimension the per-socket read buffer (see CNV_FACT).
// Parenthesized so the macro behaves as a single value in any expression
// (e.g. x % MAX_LEN or x / MAX_LEN).
#define MAX_LEN (64*1024)
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43extern void factStat(int64_t *array , int len ) ;
44
45extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
46
47
48extern void debugHead(int i, void *buf);
49
50extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt, int32_t runnr, int state,
51 uint32_t tsec, uint32_t tusec ) ;
52extern void debugStream(int isock, void *buf, int len) ;
53
54
55
56int g_actTime = 0 ;
57int g_runStat = 40 ;
58size_t g_maxMem ; //maximum memory allowed for buffer
59
60//no longer needed ...
61 int g_maxBoards ; //maximum number of boards to be initialized
62 int g_actBoards ;
63//
64
65FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
66
67
68int gi_runStat ;
69int gp_runStat ;
70int gw_runStat ;
71
72
73uint gi_SecRate[MAX_SOCK] ;
74uint gi_S10Rate[MAX_SOCK] ;
75uint gi_MinRate[MAX_SOCK] ;
76uint gi_ErrCnt[MAX_SOCK] ;
77
78uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
79
80uint gi_SecTime, gi_S10Time, gi_MinTime ;
81uint gi_EvtStart= 0 ;
82uint gi_EvtRead = 0 ;
83uint gi_EvtBad = 0 ;
84uint gi_EvtTot = 0 ;
85size_t gi_usedMem = 0 ;
86
87uint gw_EvtTot = 0 ;
88uint gp_EvtTot = 0 ;
89
90PIX_MAP g_pixMap[NPIX] ;
91
92EVT_CTRL evtCtrl ; //control of events during processing
93int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
94
95WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
96
97
98
99
100RUN_HEAD actRun ;
101
102RUN_CTRL runCtrl[MAX_RUN] ;
103
104RUN_TAIL runTail[MAX_RUN] ;
105
106
107/*
108*** Definition of rdBuffer to read in IP packets; keep it global !!!!
109 */
110
111
112typedef union {
113 int8_t B[MAX_LEN/8];
114 int16_t S[MAX_LEN/4];
115 int32_t I[MAX_LEN/2];
116 int64_t L[MAX_LEN ];
117} CNV_FACT ;
118
119typedef struct {
120 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
121 int32_t bufPos ; //next byte to read to the buffer next
122 int32_t bufLen ; //number of bytes left to read
123 int32_t skip ; //number of bytes skipped before start of event
124
125 int sockStat ; //-1 if socket not yet connected , 99 if not exist
126 int socket ; //contains the sockets
127 struct sockaddr_in SockAddr ; //IP for each socket
128
129 int evtID ; // event ID of event currently read
130 int runID ; // run "
131 int ftmID ; // event ID from FTM
132 uint fadLen ; // FADlength of event currently read
133 int fadVers ; // Version of FAD
134 int board ; // boardID (softwareID: 0..40 )
135 int Port ;
136
137 CNV_FACT *rBuf ;
138
139} READ_STRUCT ;
140
141
/* A 16-bit value that can also be accessed as its two bytes in memory order
 * (used for the start/stop flags of a FAD packet). */
typedef union {
    int8_t  B[2];
    int16_t S;
} SHORT_BYTE ;
146
147
148
149
150
151#define MXSTR 1000
152char str[MXSTR] ;
153
154SHORT_BYTE start, stop;
155
156READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
157
158
159
160/*-----------------------------------------------------------------*/
161
162
163/*-----------------------------------------------------------------*/
164
165
166
167int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
168/*
169*** generate Address, create sockets and allocates readbuffer for it
170***
171*** if flag==0 generate socket and buffer
172*** <0 destroy socket and buffer
173*** >0 close and redo socket
174***
175*** sid : board*7 + port id
176 */
177
178 int j ;
179
180 if (rd->sockStat ==0 ) { //close socket if open
181 j=close(rd->socket) ;
182 if (j>0) {
183 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
184 factOut(kFatal,771, str ) ;
185 } else {
186 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
187 factOut(kInfo,771, str ) ;
188 }
189 }
190
191
192 if (flag < 0) {
193 free(rd->rBuf) ; //and never open again
194 rd->rBuf = NULL ;
195 rd->sockStat = 99 ;
196 return 0 ;
197 }
198
199
200 if (flag == 0) { //generate address and buffer ...
201 rd->Port = port ;
202 rd->SockAddr.sin_family = sockAddr->sin_family;
203 rd->SockAddr.sin_port = htons(port) ;
204 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
205
206 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
207 if ( rd->rBuf == NULL ) {
208 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
209 factOut(kFatal,774, str ) ;
210 rd->sockStat = 77 ;
211 return -3 ;
212 }
213 }
214
215
216 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
217 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
218 factOut(kFatal,773, str ) ;
219 rd->sockStat = 88 ;
220 return -2 ;
221 }
222
223 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
224 factOut(kInfo,773, str ) ;
225 rd->sockStat = -1 ; //try to (re)open socket
226 return 0 ;
227
228} /*-----------------------------------------------------------------*/
229
230 /*-----------------------------------------------------------------*/
231
232
233
234
235int mBufInit() {
236// initialize mBuffer (mark all entries as unused\empty)
237
238 int i ;
239 uint32_t actime ;
240
241 actime = g_actTime + 50000000 ;
242
243 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
244 mBuffer[i].evNum = mBuffer[i].runNum = -1;
245
246 evtCtrl.evtBuf[ i] = -1 ;
247 evtCtrl.evtStat[ i] = -1 ;
248 evtCtrl.pcTime[ i] = actime ; //initiate to far future
249
250 }
251
252
253 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
254
255 evtCtrl.frstPtr = 0 ;
256 evtCtrl.lastPtr = 0 ;
257
258 return 0 ;
259
260} /*-----------------------------------------------------------------*/
261
262
263
264
265int mBufEvt(uint evID, uint runID, uint nRoi) {
266// generate a new Event into mBuffer:
267// make sure only complete Event are possible, so 'free' will always work
268// returns index into mBuffer[], or negative value in case of error
269
270
271 int i, k, evFree ;
272 int headmem=0 ;
273 size_t needmem = 0 ;
274
275 if (nRoi <=0 || nRoi > 1024) {
276 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
277 factOut(kError, 1, str ) ;
278 return -9999 ;
279 }
280
281 i = evID % MAX_EVT ;
282 evFree = -1 ;
283
284 for ( k=0; k<MAX_RUN; k++) {
285 if ( mBuffer[i].evNum == evID
286 && mBuffer[i].runNum== runID ) {
287 return i ;
288 }
289 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
290 i += MAX_EVT ;
291 }
292
293
294 //event does not yet exist; create
295 if (evFree < 0 ) { //no space available in ctrl
296 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
297 factOut(kError,881, str ) ;
298 return -1 ;
299 }
300 i = evFree ; //found free entry; use it ...
301
302
303 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
304
305 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
306
307 if ( gi_usedMem + needmem + headmem > g_maxMem) {
308 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
309 factOut(kError,882, str ) ;
310 return -11 ;
311 }
312
313 mBuffer[i].FADhead = malloc( headmem ) ;
314 if (mBuffer[i].FADhead == NULL) {
315 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
316 factOut(kError,882, str ) ;
317 return -12;
318 }
319
320 mBuffer[i].fEvent = malloc( needmem ) ;
321 if (mBuffer[i].fEvent == NULL) {
322 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
323 factOut(kError,882, str ) ;
324 free(mBuffer[i].fEvent) ;
325 mBuffer[i].fEvent = NULL ;
326 return -22;
327 }
328
329 //flag all boards as unused
330 mBuffer[i].nBoard = 0 ;
331 for (k=0; k<NBOARDS; k++ ) {
332 mBuffer[i].board[k] = -1;
333 }
334
335 //flag all pixels as unused
336 for (k=0; k<NPIX; k++ ) {
337 mBuffer[i].fEvent->StartPix[k] = -1 ;
338 }
339
340 //flag all TMark as unused
341 for (k=0; k<NTMARK; k++ ) {
342 mBuffer[i].fEvent->StartTM[k] = -1 ;
343 }
344
345 mBuffer[i].pcTime = g_actTime ;
346 mBuffer[i].nRoi = nRoi ;
347 mBuffer[i].evNum = evID ;
348 mBuffer[i].runNum = runID ;
349 mBuffer[i].evtLen = needmem ;
350
351 gi_usedMem += needmem + headmem;
352
353 //register event in 'active list (reading)'
354
355 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
356 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
357 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
358 evtIdx[i] = evtCtrl.lastPtr ;
359snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
360factOut(kDebug,-11, str ) ;
361 evtCtrl.lastPtr++ ;
362 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
363
364
365
366
367 gi_EvtStart++ ;
368
369 //check if runId already registered in runCtrl
370 evFree = -1 ;
371 for (k=0; k<MAX_RUN; k++) {
372 if (runCtrl[k].runId == runID ) return i ;//run exists already
373 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
374 }
375
376 if (evFree <0 ) {
377 snprintf(str,MXSTR,"not able to register the new run %d",runID);
378 factOut(kFatal,883, str ) ;
379 } else {
380 runCtrl[evFree].runId = runID ;
381 }
382
383 return i ;
384
385} /*-----------------------------------------------------------------*/
386
387
388int mBufFree(int i) {
389//delete entry [i] from mBuffer:
390//(and make sure multiple calls do no harm ....)
391
392 int headmem=0 ;
393 size_t freemem = 0 ;
394
395 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
396 freemem = mBuffer[i].evtLen ;
397 free(mBuffer[i].fEvent ) ;
398 mBuffer[i].fEvent = NULL ;
399
400 free(mBuffer[i].FADhead ) ;
401 mBuffer[i].FADhead = NULL ;
402
403 }
404 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
405 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
406
407 gi_usedMem = gi_usedMem - freemem - headmem;
408
409
410 return 0 ;
411
412} /*-----------------------------------------------------------------*/
413
414
415 /*-----------------------------------------------------------------*/
416
417
418
/* Placeholder: nothing to initialize here (readFAD does its own setup). */
void initReadFAD() {
} /*-----------------------------------------------------------------*/
422
423
424
425void *readFAD( void *ptr ) {
426/* *** main loop reading FAD data and sorting them to complete events */
427 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
428 int actBoards = 0, minLen ;
429 int32_t jrd ;
430 int32_t myRun ;
431 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
432 uint qtot = 0, qread = 0, qconn = 0 ;
433 int errcnt0 = 0 ;
434
435 int goodhed=0;
436
437 struct timespec xwait ;
438
439 int nokCnt[MAX_SOCK],loopCnt=0;
440 int sokCnt[MAX_SOCK];
441 int sockDef[NBOARDS];
442
443 struct timeval *tv, atv;
444 tv=&atv;
445 uint32_t tsec, tusec ;
446
447
448 snprintf(str,MXSTR,"start initializing");
449 factOut(kInfo,-1, str ) ;
450
451 int cpu = 7 ; //read thread
452 cpu_set_t mask;
453
454/* CPU_ZERO initializes all the bits in the mask to zero. */
455 CPU_ZERO( &mask );
456/* CPU_SET sets only the bit corresponding to cpu. */
457 cpu = 7 ;
458 CPU_SET( cpu, &mask );
459
460/* sched_setaffinity returns 0 in success */
461 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
462 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
463 factOut(kWarn,-1, str ) ;
464 }
465
466
467 //make sure all sockets are preallocated as 'not exist'
468 for (i=0; i<MAX_SOCK; i++) {
469 rd[i].socket = -1 ;
470 rd[i].sockStat = 99 ;
471 }
472 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
473
474
475 g_actTime = time(NULL) ;
476 for (k=0; k<MAX_SOCK; k++)
477 gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
478
479 for (k=0; k<NBOARDS; k++)
480 gi_NumConnect[k]=0;
481
482
483 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
484
485 mBufInit() ; //initialize buffers
486
487 snprintf(str,MXSTR,"end initializing");
488 factOut(kInfo,-1, str ) ;
489
490
491 for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
492
493 head_len = sizeof(PEVNT_HEADER) ;
494//frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header
495 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
496
497//minLen = MIN_LEN ; //min #bytes needed to check header
498 minLen = head_len ; //min #bytes needed to check header: full header for debug
499
500 numok = numok2 = 0 ;
501
502 start.S=0xFB01;
503 stop.S= 0x04FE;
504
505 myRun = g_actTime ;
506
507 gi_runStat = g_runStat ;
508
509
510 while (g_runStat >=0) { //loop until global variable g_runStat claims stop
511
512 gi_runStat = g_runStat ;
513
514 int b,p,p0,s0,nch;
515 nch = 0 ;
516 for (b=0; b<NBOARDS; b++ ) {
517 k = b*7 ;
518 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
519 nch++ ;
520 gi_NumConnect[ b ] = 0 ; //must close all connections
521 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
522 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
523 else s0=+1 ; //sockets to be closed and reopened
524
525 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
526 else p0=0 ;
527
528 for (p=p0+1; p<p0+8; p++) {
529 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
530 k++ ;
531 }
532 sockDef[b] = g_port[b].sockDef ;
533 }
534 }
535
536 if (nch > 0 ) {
537 actBoards = 0 ;
538 for (b=0; b<NBOARDS; b++ ) {
539 if ( sockDef[b] > 0 ) actBoards++ ;
540 }
541 }
542
543
544 g_actTime = time(NULL) ;
545 nokCnt[numok]++;
546
547 loopCnt++ ;
548
549 numok = 0 ; //count number of succesfull actions
550
551 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
552 b = i / 7 ;
553 if (sockDef[b] > 0) s0=+1 ;
554 else s0=-1 ;
555
556 gettimeofday( tv, NULL);
557 tsec = atv.tv_sec ;
558 tusec= atv.tv_usec ;
559
560 if (rd[i].sockStat <0 ) { //try to connect if not yet done
561 rd[i].sockStat=connect(rd[i].socket,
562 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
563 if (rd[i].sockStat ==0 ) { //successfull ==>
564 if (sockDef[b] > 0) {
565 rd[i].bufTyp = 0 ; // expect a header
566 rd[i].bufLen = frst_len ; // max size to read at begining
567 } else {
568 rd[i].bufTyp = -1 ; // full data to be skipped
569 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
570 }
571 rd[i].bufPos = 0 ; // no byte read so far
572 rd[i].skip = 0 ; // start empty
573 gi_NumConnect[ b ]++ ;
574 numok++ ; //make sure next round will execute
575 snprintf(str,MXSTR,"+++connect %d %d",b,gi_NumConnect[ b ]);
576 factOut(kInfo,-1, str ) ;
577 }
578 }
579
580 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
581 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
582 numok++ ;
583 sokCnt[i]++;
584 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
585
586 if (jrd >0 ) {
587 qread+=jrd ;
588 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
589 }
590
591 if (jrd == 0) { //connection has closed ...
592 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
593 factOut(kInfo,441, str ) ;
594 GenSock(s0, i, 0,NULL, &rd[i]) ;
595 gi_ErrCnt[i]++ ;
596 gi_NumConnect[ b ]-- ;
597
598 } else if ( jrd<0 ) { //did not read anything
599 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
600 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
601 factOut(kError,442, str ) ;
602 gi_ErrCnt[i]++ ;
603 } else numok-- ; //else nothing waiting to be read
604 jrd = 0 ;
605 }
606 } else jrd = 0 ; //did read nothing as requested
607
608 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
609// just do nothing
610
611 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
612 if ( jrd < rd[i].bufLen ) { //not yet all read
613 rd[i].bufPos += jrd ; //==> prepare for continuation
614 rd[i].bufLen -= jrd ;
615 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
616 } else { //full dataset read
617 rd[i].bufLen = 0 ;
618 rd[i].bufPos = rd[i].fadLen ;
619 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
620 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
621 gi_ErrCnt[i]++ ;
622 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
623 factOut(kError,301, str ) ;
624 goto EndBuf ;
625
626 }
627 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
628
629 //we have a complete buffer, copy to WORK area
630 gi_SecRate[i]++ ;
631
632 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
633 //get index into mBuffer for this event (create if needed)
634 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
635
636 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
637 if (evID < 0) {
638 xwait.tv_sec = 0;
639 xwait.tv_nsec= 20000000 ; // sleep for ~20 msec
640 nanosleep( &xwait , NULL ) ;
641 goto EndBuf1 ; //hope there is free space next round
642 }
643
644
645 //we have a valid entry in mBuffer[]; fill it
646
647 boardId = b ;
648 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
649 int fadCrate = fadBoard/256 ;
650 if (boardId != (fadCrate*10 + fadBoard%256) ) {
651 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
652 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
653// } else {
654// snprintf(str,MXSTR,"correct Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
655// if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
656 }
657 if ( mBuffer[evID].board[ boardId ] != -1) {
658 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
659 factOut(kWarn,501, str ) ;
660 goto EndBuf ; //--> skip Board
661 }
662
663 int iDx = evtIdx[evID] ; //index into evtCtrl
664
665 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
666 &rd[i].rBuf->S[0], head_len) ;
667 roi = mBuffer[evID].nRoi ;
668
669 pixS = boardId*36 -1 ; //
670 tmS = boardId*4 -1 ; //
671 src = head_len/2 ;
672 for ( drs=0; drs<4; drs++ ) {
673 for ( px=0; px<9; px++ ) {
674 pixH= ntohs(rd[i].rBuf->S[src++]) ;
675 pixC= ntohs(rd[i].rBuf->S[src++]) ;
676 pixR= ntohs(rd[i].rBuf->S[src++]) ;
677
678 src++ ;
679 pixS++ ; //pixS = pixH2S[pixH] ;
680 if ( ( px < PX8 && pixR == roi )
681 || ( px ==PX8 && pixR == 2*roi )
682 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
683 // correct roi
684 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
685 dest= pixS * roi ;
686 memcpy(
687 &mBuffer[evID].fEvent->Adc_Data[dest],
688 &rd[i].rBuf->S[src], roi * 2) ;
689 src+= roi ;
690 if ( px==PX8 ) {
691 tmS++; // tmS = tmH2S[pixH]
692 dest= tmS * roi + NPIX* roi ;
693 if ( roi <=512 ) {
694 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
695 memcpy(
696 &mBuffer[evID].fEvent->Adc_Data[dest],
697 &rd[i].rBuf->S[src], roi * 2) ;
698 src+=roi ;
699 } else {
700 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
701 }
702 }
703 } else {
704 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
705 factOut(kError,202, str ) ;
706 goto EndBuf ;
707 }
708 }
709 }// now we have stored a new board contents into Event structure
710 mBuffer[evID].board[ boardId ] = boardId ;
711 evtCtrl.evtStat[ iDx ]++ ;
712 evtCtrl.pcTime[ iDx ] = g_actTime ;
713
714 if (++mBuffer[evID].nBoard >= actBoards ) {
715 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
716 factOut(kDebug,-1, str ) ;
717 //complete event read ---> flag for next processing
718 evtCtrl.evtStat[ iDx ] = 99;
719 gi_EvtRead++ ;
720 gi_EvtTot++ ;
721 }
722
723EndBuf:
724 rd[i].bufTyp = 0 ; //ready to read next header
725 rd[i].bufLen = frst_len ;
726 rd[i].bufPos = 0 ;
727EndBuf1:
728 ;
729 }
730
731 } else { //we are reading event header
732 rd[i].bufPos += jrd ;
733 rd[i].bufLen -= jrd ;
734 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
735 //check if startflag correct; else shift block ....
736 for (k=0; k<rd[i].bufPos -1 ; k++) {
737 if (rd[i].rBuf->B[k ] == start.B[1]
738 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
739 }
740 rd[i].skip += k ;
741
742 if (k >= rd[i].bufPos-1 ) { //no start of header found
743// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
744// factOut(kWarn,666, str ) ;
745
746 rd[i].bufPos = 0 ;
747 rd[i].bufLen = head_len ;
748 } else if ( k>0 ) {
749 rd[i].bufPos -= k ;
750 rd[i].bufLen += k ;
751 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
752 }
753 if ( rd[i].bufPos >= minLen ) {
754 if ( rd[i].skip >0 ) {
755 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
756 factOut(kInfo,666, str ) ;
757 rd[i].skip = 0 ;
758 }
759 goodhed++;
760 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
761 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
762 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
763 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
764 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
765 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
766 rd[i].bufTyp = 1 ; //ready to read full record
767 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
768 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
769 debugHead(i,rd[i].rBuf);
770 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
771 } else {
772 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
773 }
774 } else {
775 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
776 }
777
778 } //end interpreting last read
779 } //end of successful read anything
780 } //finished trying to read all sockets
781
782 int qwait=0, qdel=0, qskip=0 ;
783 g_actTime = time(NULL) ;
784 if ( g_actTime > gi_SecTime ) {
785 gi_SecTime = g_actTime ;
786
787
788 //loop over all active events and flag those older than read-timeout
789 //delete those that are written to disk ....
790
791 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
792 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
793
794 int k1=evtCtrl.frstPtr;
795 for ( k=k1; k<(k1+kd); k++ ) {
796 int k0 = k % (MAX_EVT*MAX_RUN) ;
797//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
798 if (evtCtrl.evtStat[k0] > 0
799 && evtCtrl.evtStat[k0] < 90 ) {
800
801 qwait++;
802
803 if( evtCtrl.pcTime[k0] < g_actTime-10 ) {
804 int id =evtCtrl.evtBuf[k0] ;
805 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
806 factOut(kWarn,601, str ) ;
807 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
808 gi_EvtBad++ ;
809 gi_EvtTot++ ;
810 qskip++;
811 }
812
813
814 } else if (evtCtrl.evtStat[k0] >= 900 ) {
815
816 int id =evtCtrl.evtBuf[k0] ;
817 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
818 factOut(kDebug,-1, str ) ;
819 mBufFree(id) ; //event written--> free memory
820 evtCtrl.evtStat[k0] = -1;
821 qdel++;
822 qtot++;
823 }
824
825 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
826 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
827 }
828 }
829
830qconn=0 ;
831int ib ;
832for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
833int64_t stat[9] ;
834 stat[0]= qwait;
835 stat[1]= qskip;
836 stat[2]= qdel ;
837 stat[3]= qtot ;
838 stat[4]= gi_usedMem ;
839 stat[5]= qread;
840 stat[6]= qconn;
841
842factStat(stat,7);
843qread=0 ;
844 }
845
846
847
848
849 if (numok > 0 ) numok2=0;
850 else if (numok2++ > 3) {
851 if (g_runStat == 1) {
852 xwait.tv_sec = 1;
853 xwait.tv_nsec= 0 ; // hibernate for 1 sec
854 } else {
855 xwait.tv_sec = 0;
856 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
857 }
858 nanosleep( &xwait , NULL ) ;
859 }
860
861 } //and do next loop over all sockets ...
862
863 //must quit eventbuilding
864 snprintf(str,MXSTR,"stop reading ...");
865 factOut(kInfo,-1, str ) ;
866
867 //flag all events as 'read finished'
868 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
869 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
870
871 int k1=evtCtrl.frstPtr;
872
873 for ( k=k1; k<(k1+kd); k++ ) {
874 int k0 = k % (MAX_EVT*MAX_RUN) ;
875 if (evtCtrl.evtStat[k0] > 0
876 && evtCtrl.evtStat[k0] < 90 ) {
877 evtCtrl.evtStat[k0] = 91 ;
878 gi_EvtBad++ ;
879 gi_EvtTot++ ;
880 }
881 }
882
883 //must close all open sockets ...
884 snprintf(str,MXSTR,"close all sockets ...");
885 factOut(kInfo,-1, str ) ;
886 for (i=0; i<MAX_SOCK; i++) {
887 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
888 gi_NumConnect[ i/7 ]-- ;
889 }
890
891 xwait.tv_sec = 0;
892 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
893 nanosleep( &xwait , NULL ) ;
894 gi_runStat = -11 ; //inform all that no update to happen any more
895
896
897 int minclear = 900 ; //usually wait until writing finished (stat 900)
898 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all
899
900
901 //and clear all buffers (might have to wait until all others are done)
902 snprintf(str,MXSTR,"clear all buffers ...");
903 factOut(kInfo,-1, str ) ;
904 int numclear=1 ;
905 while (numclear > 0 ) {
906 numclear = 0 ;
907 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
908 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
909
910 int k1=evtCtrl.frstPtr;
911 for ( k=k1; k<(k1+kd); k++ ) {
912 int k0 = k % (MAX_EVT*MAX_RUN) ;
913 if (evtCtrl.evtStat[k0] > minclear ) {
914 int id =evtCtrl.evtBuf[k0] ;
915 mBufFree(id) ; //event written--> free memory
916 evtCtrl.evtStat[k0] = -1;
917 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
918
919 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
920 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
921 }
922
923 xwait.tv_sec = 0;
924 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
925 nanosleep( &xwait , NULL ) ;
926 }
927
928 snprintf(str,MXSTR,"Exit read Process ...");
929 factOut(kInfo,-1, str ) ;
930 gi_runStat = -99 ;
931 return 0;
932
933} /*-----------------------------------------------------------------*/
934
935
936void *procEvt( void *ptr ) {
937/* *** main loop processing file, including SW-trigger */
938 int numProc, numWait ;
939 int k ;
940 struct timespec xwait ;
941 char str[MXSTR] ;
942
943 cpu_set_t mask;
944 int cpu = 5 ; //process thread (will be several in final version)
945
946 snprintf(str,MXSTR,"Starting process-thread");
947 factOut(kInfo,-1, str ) ;
948
949/* CPU_ZERO initializes all the bits in the mask to zero. */
950 CPU_ZERO( &mask );
951/* CPU_SET sets only the bit corresponding to cpu. */
952 CPU_SET( cpu, &mask );
953/* sched_setaffinity returns 0 in success */
954 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
955 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
956 factOut(kWarn,-1, str ) ;
957 }
958
959
960 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
961
962 numWait = numProc = 0 ;
963 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
964 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
965
966 int k1=evtCtrl.frstPtr;
967 for ( k=k1; k<(k1+kd); k++ ) {
968 int k0 = k % (MAX_EVT*MAX_RUN) ;
969//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
970 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
971 int id = evtCtrl.evtBuf[k0] ;
972 int ievt = mBuffer[id].evNum ;
973 int roi = mBuffer[id].nRoi ;
974// uint32_t irun = mBuffer[id].runNum ;
975//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
976//factOut(kDebug,-1, str ) ;
977
978//make sure unused pixels/tmarks are cleared to zero
979 int ip,it,dest,ib;
980 for (ip=0; ip<NPIX; ip++) {
981 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
982 dest= ip*roi ;
983 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
984 }
985 }
986 for (it=0; it<NTMARK; it++) {
987 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
988 dest= it*roi + NPIX*roi ;
989 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
990 }
991 }
992
993
994//and set correct event header ; also check for consistency in event (not yet)
995 mBuffer[id].fEvent->Roi = roi ;
996 mBuffer[id].fEvent->EventNum = ievt ;
997 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
998 mBuffer[id].fEvent->SoftTrig = 0 ;
999 for (ib=0; ib<NBOARDS; ib++) {
1000 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1001 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1002 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1003 } else {
1004 mBuffer[id].fEvent->BoardTime[ib] =
1005 ntohl(mBuffer[id].FADhead[ib].time) ;
1006 }
1007 }
1008
1009 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1010
1011 if (i<0) evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1012
1013 numProc++ ;
1014 evtCtrl.evtStat[k0] = 520 ;
1015 gp_EvtTot++ ;
1016 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1017 numWait++ ;
1018 }
1019 }
1020
1021 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1022 snprintf(str,MXSTR,"Exit Processing Process ...");
1023 factOut(kInfo,-1, str ) ;
1024 gp_runStat = -22 ; //==> we should exit
1025 return 0 ;
1026 }
1027
1028 if (numProc == 0) {
1029 //seems we have nothing to do, so sleep a little
1030 xwait.tv_sec = 0;
1031 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1032 nanosleep( &xwait , NULL ) ;
1033 }
1034 gp_runStat = gi_runStat ;
1035
1036 }
1037
1038 //we are asked to abort asap ==> must flag all remaining events
1039 // when gi_runStat claims that all events are in the buffer...
1040
1041 snprintf(str,MXSTR,"Abort Processing Process ...");
1042 factOut(kInfo,-1, str ) ;
1043 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1044 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1045
1046 int k1=evtCtrl.frstPtr;
1047 for ( k=k1; k<(k1+kd); k++ ) {
1048 int k0 = k % (MAX_EVT*MAX_RUN) ;
1049 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1050 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1051 }
1052 }
1053
1054 gp_runStat = -99 ;
1055
1056 return 0;
1057
1058} /*-----------------------------------------------------------------*/
1059
1060int CloseRunFile(uint32_t runId, uint32_t closeTime) {
1061/* close run runId (all all runs if runId=0) */
1062/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1063 int i, j ;
1064
1065 if (runId == 0 ) {
1066 for ( j=0; j<MAX_RUN; j++) {
1067 if ( runCtrl[j].fileId == 0 ) { //run is open
1068 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1069 if (i<0) {
1070 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1071 factOut(kError,506, str ) ;
1072 runCtrl[j].fileId = 888 ;
1073 } else {
1074 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1075 factOut(kInfo,507, str ) ;
1076 runCtrl[j].fileId = 7777 ;
1077 }
1078 runCtrl[j].closeTime = closeTime ;
1079 }
1080 }
1081 return 0 ;
1082 }
1083
1084
1085 for ( j=0; j<MAX_RUN; j++) {
1086 if ( runCtrl[j].runId == runId ) {
1087 if ( runCtrl[j].fileId == 0 ) { //run is open
1088 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1089 if (i<0) {
1090 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1091 factOut(kError,506, str ) ;
1092 runCtrl[j].fileId = 888 ;
1093 } else {
1094 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1095 factOut(kInfo,507, str ) ;
1096 runCtrl[j].fileId = 7777 ;
1097 }
1098 runCtrl[j].closeTime = closeTime ;
1099 return 0;
1100 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1101 runCtrl[j].closeTime = closeTime ;
1102 return +1;
1103 } else { // run already closed
1104 return +2;
1105 }
1106 }
1107 } //we only reach here if the run was never created
1108 return -1;
1109
1110} /*-----------------------------------------------------------------*/
1111
1112
1113void *writeEvt( void *ptr ) {
1114/* *** main loop writing event (including opening and closing run-files */
1115
1116 int numWrite, numWait ;
1117 int k,j ;
1118 struct timespec xwait ;
1119 char str[MXSTR] ;
1120
1121 cpu_set_t mask;
1122 int cpu = 3 ; //write thread
1123
1124 snprintf(str,MXSTR,"Starting write-thread");
1125 factOut(kInfo,-1, str ) ;
1126
1127/* CPU_ZERO initializes all the bits in the mask to zero. */
1128 CPU_ZERO( &mask );
1129/* CPU_SET sets only the bit corresponding to cpu. */
1130 CPU_SET( cpu, &mask );
1131/* sched_setaffinity returns 0 in success */
1132 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1133 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1134 }
1135
1136 int lastRun = 0 ; //usually run from last event still valid
1137
1138 while (g_runStat >-2) {
1139
1140 numWait = numWrite = 0 ;
1141 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1142 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1143
1144 int k1=evtCtrl.frstPtr;
1145 for ( k=k1; k<(k1+kd); k++ ) {
1146 int k0 = k % (MAX_EVT*MAX_RUN) ;
1147//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1148 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1149 int id = evtCtrl.evtBuf[k0] ;
1150 uint32_t irun = mBuffer[id].runNum ;
1151 int ievt = mBuffer[id].evNum ;
1152
1153 if (runCtrl[lastRun].runId == irun ) {
1154 j = lastRun ;
1155 } else {
1156 //check which fileID to use (or open if needed)
1157 for ( j=0; j<MAX_RUN; j++) {
1158 if ( runCtrl[j].runId == irun ) break ;
1159 }
1160 if ( j >= MAX_RUN ) {
1161 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1162 factOut(kFatal,901, str ) ;
1163for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1164exit(111);
1165 }
1166 lastRun = j ;
1167 }
1168
1169 if (runCtrl[j].fileId < 0 ) {
1170 actRun.Version = 1 ;
1171 actRun.RunType = -1 ;
1172 actRun.NBoard = NBOARDS ;
1173 actRun.NPix = NPIX ;
1174 actRun.NTm = NTMARK ;
1175 actRun.Nroi = mBuffer[id].nRoi ;
1176// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1177 runCtrl[j].nextEvt= 0;
1178 runCtrl[j].lastTime=g_actTime ;
1179 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1180 if (runCtrl[j].fileHd == NULL ) {
1181 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1182 factOut(kError,502, str ) ;
1183 runCtrl[j].fileId = 99 ;
1184 } else {
1185 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1186 factOut(kInfo,-1, str ) ;
1187 runCtrl[j].fileId = 0 ;
1188 }
1189
1190 }
1191
1192 if (runCtrl[j].fileId > 0 ) {
1193 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1194 factOut(kDebug,123,str) ;
1195 evtCtrl.evtStat[k0] = 902 ;
1196 } else {
1197 int i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1198 if (i<0) {
1199 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1200 factOut(kError,503, str ) ;
1201 evtCtrl.evtStat[k0] = 901 ;
1202 //close run
1203 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1204 if (i<0) {
1205 snprintf(str,MXSTR,"W error closing run %d",irun) ;
1206 factOut(kError,503, str ) ;
1207 } else {
1208 snprintf(str,MXSTR,"W closed run %d because of write error",irun) ;
1209 factOut(kInfo,503, str ) ;
1210 }
1211 runCtrl[j].fileId = 9999 ;
1212 } else {
1213 runCtrl[j].lastTime = g_actTime;
1214 evtCtrl.evtStat[k0] = 901 ;
1215 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1216 factOut(kDebug,504, str ) ;
1217 }
1218 }
1219 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1220 }
1221
1222 //check if we should close a run ...
1223 for ( j=0; j<MAX_RUN; j++) {
1224 if ( runCtrl[j].fileId==0
1225 && ( runCtrl[j].closeTime < g_actTime
1226 ||runCtrl[j].lastTime < g_actTime-120) ) {
1227 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1228 if (i<0) {
1229 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1230 factOut(kError,506, str ) ;
1231 runCtrl[j].fileId = 888 ;
1232 } else {
1233 snprintf(str,MXSTR,"closing run %d ok BBB",runCtrl[j].runId);
1234 factOut(kInfo,507, str ) ;
1235 runCtrl[j].fileId = 7777 ;
1236 }
1237 }
1238 }
1239
1240 if (numWrite == 0) {
1241 //seems we have nothing to do, so sleep a little
1242 xwait.tv_sec = 0;
1243 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1244 nanosleep( &xwait , NULL ) ;
1245 }
1246
1247 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1248 snprintf(str,MXSTR,"Finish Write Process ...");
1249 factOut(kInfo,-1, str ) ;
1250 gw_runStat = -22 ; //==> we should exit
1251 goto closerun ;
1252 }
1253 gw_runStat = gi_runStat ;
1254
1255 }
1256
1257 //must close all open files ....
1258 snprintf(str,MXSTR,"Abort Writing Process ...");
1259 factOut(kInfo,-1, str ) ;
1260closerun:
1261 snprintf(str,MXSTR,"Close all open files ...");
1262 factOut(kInfo,-1, str ) ;
1263 for ( j=0; j<MAX_RUN; j++)
1264 if ( runCtrl[j].fileId ==0 ) {
1265 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1266 if (i<0) {
1267 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1268 factOut(kError,506, str ) ;
1269 runCtrl[j].fileId = 888 ;
1270 } else {
1271 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1272 factOut(kInfo,507, str ) ;
1273 runCtrl[j].fileId = 7777 ;
1274 }
1275 }
1276
1277 gw_runStat = -99;
1278 snprintf(str,MXSTR,"Exit Writing Process ...");
1279 factOut(kInfo,-1, str ) ;
1280 return 0;
1281
1282
1283
1284
1285} /*-----------------------------------------------------------------*/
1286
1287
1288
1289
1290void StartEvtBuild() {
1291
1292 int i,j,imax,status,th_ret[50] ;
1293 pthread_t thread[50] ;
1294 struct timespec xwait ;
1295 uint32_t actime ;
1296
1297 gi_runStat = gp_runStat = gw_runStat = 0 ;
1298
1299 snprintf(str,MXSTR,"Starting EventBuilder");
1300 factOut(kInfo,-1, str ) ;
1301
1302
1303 evtCtrl.frstPtr = 0 ;
1304 evtCtrl.lastPtr = 0 ;
1305
1306 actime = g_actTime + 50000000 ;
1307/* initialize run control logics */
1308 for (i=0; i<MAX_RUN; i++) {
1309 runCtrl[i].runId = 0 ;
1310 runCtrl[i].lastTime = 0 ;
1311 runCtrl[i].closeTime = time(NULL) + 3600*24*7;
1312
1313 runCtrl[i].nextEvt = 0 ;
1314 runCtrl[i].fileId = -2 ;
1315
1316 runTail[i].nEventsOk =
1317 runTail[i].nEventsRej =
1318 runTail[i].nEventsBad =
1319 runTail[i].PCtime0 =
1320 runTail[i].PCtimeX = 0 ;
1321 }
1322
1323//start all threads (more to come) when we are allowed to ....
1324 while (g_runStat == 0 ) {
1325 xwait.tv_sec = 0;
1326 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1327 nanosleep( &xwait , NULL ) ;
1328 }
1329
1330 i=0 ;
1331 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1332 i++;
1333 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1334 i++;
1335 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1336 i++;
1337 imax=i ;
1338
1339
1340
1341
1342
1343
1344 xwait.tv_sec = 20;;
1345 xwait.tv_nsec= 0 ; // sleep for ~20sec
1346 nanosleep( &xwait , NULL ) ;
1347
1348
1349 printf("close all runs in 2 seconds\n");
1350
1351
1352 CloseRunFile( 0, time(NULL)+2) ;
1353
1354 xwait.tv_sec = 5;;
1355 xwait.tv_nsec= 0 ; // sleep for ~20sec
1356 nanosleep( &xwait , NULL ) ;
1357
1358 printf("setting g_runstat to -1\n");
1359
1360 g_runStat = -1 ;
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373//wait for all threads to finish
1374 for (i=0; i<imax; i++) {
1375 j = pthread_join ( thread[i], (void **)&status) ;
1376 }
1377
1378} /*-----------------------------------------------------------------*/
1379
1380
1381
1382
1383
1384 /*-----------------------------------------------------------------*/
1385 /*-----------------------------------------------------------------*/
1386 /*-----------------------------------------------------------------*/
1387 /*-----------------------------------------------------------------*/
1388 /*-----------------------------------------------------------------*/
1389 /*-----------------------------------------------------------------*/
1390 /*-----------------------------------------------------------------*/
1391 /*-----------------------------------------------------------------*/
1392 /*-----------------------------------------------------------------*/
1393 /*-----------------------------------------------------------------*/
1394
1395
1396/*
1397
1398FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1399{ return 1; } ;
1400
1401int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1402{ return 1; } ;
1403
1404int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1405{ return 1; } ;
1406
1407
1408
1409int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1410{
1411 int i=0;
1412
1413 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1414 for (i=0; i<NBOARDS; i++) {
1415 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1416 }
1417 return 0;
1418}
1419
1420
1421void factStat(int64_t *array, int len ) {
1422 printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1423 array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1424}
1425
1426
1427void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
1428// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1429}
1430
1431
1432
1433void debugStream(int isock, void *buf, int len) {
1434}
1435
1436void debugHead(int i, void *buf) {
1437}
1438
1439
1440void factOut(int severity, int err, char* message ) {
1441static FILE * fd ;
1442static int file=0 ;
1443
1444 if (file==0) {
1445 printf("open file\n");
1446 fd=fopen("x.out","w+") ;
1447 file=999;
1448 }
1449
1450 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1451
1452 if (severity != kDebug)
1453 printf("%3d %3d | %s\n",severity,err,message) ;
1454}
1455
1456
1457
1458int main() {
1459 int i,b,c,p ;
1460 char ipStr[100] ;
1461 struct in_addr IPaddr ;
1462
1463 g_maxMem = 1024*1024 ; //MBytes
1464 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1465
1466
1467 g_runStat = 40 ;
1468
1469 i=0 ;
1470
1471// version for standard crates
1472//for (c=0; c<4,c++) {
1473// for (b=0; b<10; b++) {
1474// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1475//
1476// inet_pton(PF_INET, ipStr, &IPaddr) ;
1477//
1478// g_port[i].sockAddr.sin_family = PF_INET;
1479// g_port[i].sockAddr.sin_port = htons(5000) ;
1480// g_port[i].sockAddr.sin_addr = IPaddr ;
1481// g_port[i].sockDef = 1 ;
1482// i++ ;
1483// }
1484//}
1485//
1486//version for PC-test
1487 for (c=0; c<4; c++) {
1488 for (b=0; b<10; b++) {
1489 sprintf(ipStr,"10.0.%d.11",128+c) ;
1490 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1491
1492 inet_pton(PF_INET, ipStr, &IPaddr) ;
1493 p = 31919+100*c+10*b;
1494
1495
1496 g_port[i].sockAddr.sin_family = PF_INET;
1497 g_port[i].sockAddr.sin_port = htons(p) ;
1498 g_port[i].sockAddr.sin_addr = IPaddr ;
1499 g_port[i].sockDef = 1 ;
1500
1501 i++ ;
1502 }
1503 }
1504
1505
1506//g_port[17].sockDef =-1 ;
1507//g_actBoards-- ;
1508
1509 StartEvtBuild() ;
1510
1511 return 0;
1512
1513}
1514*/
Note: See TracBrowser for help on using the repository browser.