source: trunk/FACT++/src/EventBuilder.c@ 11191

Last change on this file since 11191 was 11191, checked in by tbretz, 13 years ago
Workaroudn for a problem with negative gi_NumConnects
File size: 45.2 KB
Line 
1
2
3#define PX8 99 //simulator does not create double-length roi for pixel 8
4 //for real data, set PX8 = 8 ==> ask for double roi=TM
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
26enum Severity
27{
28 kMessage = 10, ///< Just a message, usually obsolete
29 kInfo = 20, ///< An info telling something which can be interesting to know
30 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
31 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
32 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
33 kDebug = 99, ///< A message used for debugging only
34};
35
36#define MIN_LEN 32 // min #bytes needed to interpret FADheader
37#define MAX_LEN 64*1024 // size of read-buffer per socket
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43extern void factStat(int64_t *array , int len ) ;
44
45extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
46
47
48extern void debugHead(int i, void *buf);
49
50extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt, int32_t runnr, int state,
51 uint32_t tsec, uint32_t tusec ) ;
52extern void debugStream(int isock, void *buf, int len) ;
53
54
55
56int g_actTime = 0 ;
57int g_runStat = 40 ;
58size_t g_maxMem ; //maximum memory allowed for buffer
59
60//no longer needed ...
61 int g_maxBoards ; //maximum number of boards to be initialized
62 int g_actBoards ;
63//
64
65FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
66
67
68int gi_runStat ;
69int gp_runStat ;
70int gw_runStat ;
71
72
73uint gi_SecRate[MAX_SOCK] ;
74uint gi_S10Rate[MAX_SOCK] ;
75uint gi_MinRate[MAX_SOCK] ;
76uint gi_ErrCnt[MAX_SOCK] ;
77
78uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
79
80uint gi_SecTime, gi_S10Time, gi_MinTime ;
81uint gi_EvtStart= 0 ;
82uint gi_EvtRead = 0 ;
83uint gi_EvtBad = 0 ;
84uint gi_EvtTot = 0 ;
85size_t gi_usedMem = 0 ;
86
87uint gw_EvtTot = 0 ;
88uint gp_EvtTot = 0 ;
89
90PIX_MAP g_pixMap[NPIX] ;
91
92EVT_CTRL evtCtrl ; //control of events during processing
93int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
94
95WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
96
97
98
99
100RUN_HEAD actRun ;
101
102RUN_CTRL runCtrl[MAX_RUN] ;
103
104RUN_TAIL runTail[MAX_RUN] ;
105
106
107/*
108*** Definition of rdBuffer to read in IP packets; keep it global !!!!
109 */
110
111
112typedef union {
113 int8_t B[MAX_LEN/8];
114 int16_t S[MAX_LEN/4];
115 int32_t I[MAX_LEN/2];
116 int64_t L[MAX_LEN ];
117} CNV_FACT ;
118
119typedef struct {
120 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
121 int32_t bufPos ; //next byte to read to the buffer next
122 int32_t bufLen ; //number of bytes left to read
123 int32_t skip ; //number of bytes skipped before start of event
124
125 int sockStat ; //-1 if socket not yet connected , 99 if not exist
126 int socket ; //contains the sockets
127 struct sockaddr_in SockAddr ; //IP for each socket
128
129 int evtID ; // event ID of event currently read
130 int runID ; // run "
131 int ftmID ; // event ID from FTM
132 uint fadLen ; // FADlength of event currently read
133 int fadVers ; // Version of FAD
134 int board ; // boardID (softwareID: 0..40 )
135 int Port ;
136
137 CNV_FACT *rBuf ;
138
139} READ_STRUCT ;
140
141
142typedef union {
143 int8_t B[2];
144 int16_t S ;
145} SHORT_BYTE ;
146
147
148
149
150
151#define MXSTR 1000
152char str[MXSTR] ;
153
154SHORT_BYTE start, stop;
155
156READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
157
158
159
160/*-----------------------------------------------------------------*/
161
162
163/*-----------------------------------------------------------------*/
164
165
166
167int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
168/*
169*** generate Address, create sockets and allocates readbuffer for it
170***
171*** if flag==0 generate socket and buffer
172*** <0 destroy socket and buffer
173*** >0 close and redo socket
174***
175*** sid : board*7 + port id
176 */
177
178 int j ;
179
180 if (rd->sockStat ==0 ) { //close socket if open
181 j=close(rd->socket) ;
182 if (j>0) {
183 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
184 factOut(kFatal,771, str ) ;
185 } else {
186 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
187 factOut(kInfo,771, str ) ;
188 }
189 }
190
191
192 if (flag < 0) {
193 free(rd->rBuf) ; //and never open again
194 rd->rBuf = NULL ;
195 rd->sockStat = 99 ;
196 return 0 ;
197 }
198
199
200 if (flag == 0) { //generate address and buffer ...
201 rd->Port = port ;
202 rd->SockAddr.sin_family = sockAddr->sin_family;
203 rd->SockAddr.sin_port = htons(port) ;
204 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
205
206 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
207 if ( rd->rBuf == NULL ) {
208 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
209 factOut(kFatal,774, str ) ;
210 rd->sockStat = 77 ;
211 return -3 ;
212 }
213 }
214
215
216 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
217 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
218 factOut(kFatal,773, str ) ;
219 rd->sockStat = 88 ;
220 return -2 ;
221 }
222
223 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
224 factOut(kInfo,773, str ) ;
225 rd->sockStat = -1 ; //try to (re)open socket
226 return 0 ;
227
228} /*-----------------------------------------------------------------*/
229
230 /*-----------------------------------------------------------------*/
231
232
233
234
235int mBufInit() {
236// initialize mBuffer (mark all entries as unused\empty)
237
238 int i ;
239 uint32_t actime ;
240
241 actime = g_actTime + 50000000 ;
242
243 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
244 mBuffer[i].evNum = mBuffer[i].runNum = -1;
245
246 evtCtrl.evtBuf[ i] = -1 ;
247 evtCtrl.evtStat[ i] = -1 ;
248 evtCtrl.pcTime[ i] = actime ; //initiate to far future
249
250 }
251
252
253 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
254
255 evtCtrl.frstPtr = 0 ;
256 evtCtrl.lastPtr = 0 ;
257
258 return 0 ;
259
260} /*-----------------------------------------------------------------*/
261
262
263
264
265int mBufEvt(uint evID, uint runID, uint nRoi) {
266// generate a new Event into mBuffer:
267// make sure only complete Event are possible, so 'free' will always work
268// returns index into mBuffer[], or negative value in case of error
269
270
271 int i, k, evFree ;
272 int headmem=0 ;
273 size_t needmem = 0 ;
274
275 if (nRoi <=0 || nRoi > 1024) {
276 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
277 factOut(kError, 1, str ) ;
278 return -9999 ;
279 }
280
281 i = evID % MAX_EVT ;
282 evFree = -1 ;
283
284 for ( k=0; k<MAX_RUN; k++) {
285 if ( mBuffer[i].evNum == evID
286 && mBuffer[i].runNum== runID ) {
287 return i ;
288 }
289 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
290 i += MAX_EVT ;
291 }
292
293
294 //event does not yet exist; create
295 if (evFree < 0 ) { //no space available in ctrl
296 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
297 factOut(kError,881, str ) ;
298 return -1 ;
299 }
300 i = evFree ; //found free entry; use it ...
301
302
303 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
304
305 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
306
307 if ( gi_usedMem + needmem + headmem > g_maxMem) {
308 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
309 factOut(kError,882, str ) ;
310 return -11 ;
311 }
312
313 mBuffer[i].FADhead = malloc( headmem ) ;
314 if (mBuffer[i].FADhead == NULL) {
315 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
316 factOut(kError,882, str ) ;
317 return -12;
318 }
319
320 mBuffer[i].fEvent = malloc( needmem ) ;
321 if (mBuffer[i].fEvent == NULL) {
322 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
323 factOut(kError,882, str ) ;
324 free(mBuffer[i].fEvent) ;
325 mBuffer[i].fEvent = NULL ;
326 return -22;
327 }
328
329 //flag all boards as unused
330 mBuffer[i].nBoard = 0 ;
331 for (k=0; k<NBOARDS; k++ ) {
332 mBuffer[i].board[k] = -1;
333 }
334
335 //flag all pixels as unused
336 for (k=0; k<NPIX; k++ ) {
337 mBuffer[i].fEvent->StartPix[k] = -1 ;
338 }
339
340 //flag all TMark as unused
341 for (k=0; k<NTMARK; k++ ) {
342 mBuffer[i].fEvent->StartTM[k] = -1 ;
343 }
344
345 mBuffer[i].pcTime = g_actTime ;
346 mBuffer[i].nRoi = nRoi ;
347 mBuffer[i].evNum = evID ;
348 mBuffer[i].runNum = runID ;
349 mBuffer[i].evtLen = needmem ;
350
351 gi_usedMem += needmem + headmem;
352
353 //register event in 'active list (reading)'
354
355 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
356 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
357 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
358 evtIdx[i] = evtCtrl.lastPtr ;
359snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
360factOut(kDebug,-11, str ) ;
361 evtCtrl.lastPtr++ ;
362 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
363
364
365
366
367 gi_EvtStart++ ;
368
369 //check if runId already registered in runCtrl
370 evFree = -1 ;
371 for (k=0; k<MAX_RUN; k++) {
372 if (runCtrl[k].runId == runID ) return i ;//run exists already
373 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
374 }
375
376 if (evFree <0 ) {
377 snprintf(str,MXSTR,"not able to register the new run %d",runID);
378 factOut(kFatal,883, str ) ;
379 } else {
380 runCtrl[evFree].runId = runID ;
381 }
382
383 return i ;
384
385} /*-----------------------------------------------------------------*/
386
387
388int mBufFree(int i) {
389//delete entry [i] from mBuffer:
390//(and make sure multiple calls do no harm ....)
391
392 int headmem=0 ;
393 size_t freemem = 0 ;
394
395 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
396 freemem = mBuffer[i].evtLen ;
397 free(mBuffer[i].fEvent ) ;
398 mBuffer[i].fEvent = NULL ;
399
400 free(mBuffer[i].FADhead ) ;
401 mBuffer[i].FADhead = NULL ;
402
403 }
404 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
405 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
406
407 gi_usedMem = gi_usedMem - freemem - headmem;
408
409
410 return 0 ;
411
412} /*-----------------------------------------------------------------*/
413
414
415 /*-----------------------------------------------------------------*/
416
417
418
419void initReadFAD() {
420return ;
421} /*-----------------------------------------------------------------*/
422
423
424
425void *readFAD( void *ptr ) {
426/* *** main loop reading FAD data and sorting them to complete events */
427 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
428 int actBoards = 0, minLen ;
429 int32_t jrd ;
430 int32_t myRun ;
431 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
432 uint qtot = 0, qread = 0, qconn = 0 ;
433 int errcnt0 = 0 ;
434
435 int goodhed=0;
436
437 struct timespec xwait ;
438
439 int nokCnt[MAX_SOCK],loopCnt=0;
440 int sokCnt[MAX_SOCK];
441 int sockDef[NBOARDS];
442
443 struct timeval *tv, atv;
444 tv=&atv;
445 uint32_t tsec, tusec ;
446
447
448 snprintf(str,MXSTR,"start initializing");
449 factOut(kInfo,-1, str ) ;
450
451 int cpu = 7 ; //read thread
452 cpu_set_t mask;
453
454/* CPU_ZERO initializes all the bits in the mask to zero. */
455 CPU_ZERO( &mask );
456/* CPU_SET sets only the bit corresponding to cpu. */
457 cpu = 7 ;
458 CPU_SET( cpu, &mask );
459
460/* sched_setaffinity returns 0 in success */
461 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
462 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
463 factOut(kWarn,-1, str ) ;
464 }
465
466
467 //make sure all sockets are preallocated as 'not exist'
468 for (i=0; i<MAX_SOCK; i++) {
469 rd[i].socket = -1 ;
470 rd[i].sockStat = 99 ;
471 }
472 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
473
474
475 g_actTime = time(NULL) ;
476 for (k=0; k<MAX_SOCK; k++)
477 gi_SecRate[k]=gi_S10Rate[k]=gi_MinRate[k]=gi_ErrCnt[k] = 0 ;
478
479 for (k=0; k<NBOARDS; k++)
480 gi_NumConnect[k]=0;
481
482
483 gi_SecTime= gi_S10Time= gi_MinTime= g_actTime ;
484
485 mBufInit() ; //initialize buffers
486
487 snprintf(str,MXSTR,"end initializing");
488 factOut(kInfo,-1, str ) ;
489
490
491 for (k=0; k<MAX_SOCK; k++) sokCnt[k]=nokCnt[k]=0 ;
492
493 head_len = sizeof(PEVNT_HEADER) ;
494//frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header
495 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
496
497//minLen = MIN_LEN ; //min #bytes needed to check header
498 minLen = head_len ; //min #bytes needed to check header: full header for debug
499
500 numok = numok2 = 0 ;
501
502 start.S=0xFB01;
503 stop.S= 0x04FE;
504
505 myRun = g_actTime ;
506
507 gi_runStat = g_runStat ;
508
509
510 while (g_runStat >=0) { //loop until global variable g_runStat claims stop
511
512 gi_runStat = g_runStat ;
513
514 int b,p,p0,s0,nch;
515 nch = 0 ;
516 for (b=0; b<NBOARDS; b++ ) {
517 k = b*7 ;
518 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
519 nch++ ;
520 gi_NumConnect[ b ] = 0 ; //must close all connections
521 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
522 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
523 else s0=+1 ; //sockets to be closed and reopened
524
525 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
526 else p0=0 ;
527
528 for (p=p0+1; p<p0+8; p++) {
529 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
530 k++ ;
531 }
532 sockDef[b] = g_port[b].sockDef ;
533 }
534 }
535
536 if (nch > 0 ) {
537 actBoards = 0 ;
538 for (b=0; b<NBOARDS; b++ ) {
539 if ( sockDef[b] > 0 ) actBoards++ ;
540 }
541 }
542
543
544 g_actTime = time(NULL) ;
545 nokCnt[numok]++;
546
547 loopCnt++ ;
548
549 numok = 0 ; //count number of succesfull actions
550
551 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
552 b = i / 7 ;
553 if (sockDef[b] > 0) s0=+1 ;
554 else s0=-1 ;
555
556 gettimeofday( tv, NULL);
557 tsec = atv.tv_sec ;
558 tusec= atv.tv_usec ;
559
560 if (rd[i].sockStat <0 ) { //try to connect if not yet done
561 rd[i].sockStat=connect(rd[i].socket,
562 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
563 if (rd[i].sockStat ==0 ) { //successfull ==>
564 if (sockDef[b] > 0) {
565 rd[i].bufTyp = 0 ; // expect a header
566 rd[i].bufLen = frst_len ; // max size to read at begining
567 } else {
568 rd[i].bufTyp = -1 ; // full data to be skipped
569 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
570 }
571 rd[i].bufPos = 0 ; // no byte read so far
572 rd[i].skip = 0 ; // start empty
573 gi_NumConnect[ b ]++ ;
574 numok++ ; //make sure next round will execute
575 snprintf(str,MXSTR,"+++connect %d %d",b,gi_NumConnect[ b ]);
576 factOut(kInfo,-1, str ) ;
577 }
578 }
579
580 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
581 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
582 numok++ ;
583 sokCnt[i]++;
584 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
585
586 if (jrd >0 ) {
587 qread+=jrd ;
588 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
589 }
590
591 if (jrd == 0) { //connection has closed ...
592 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
593 factOut(kInfo,441, str ) ;
594 GenSock(s0, i, 0,NULL, &rd[i]) ;
595 gi_ErrCnt[i]++ ;
596 gi_NumConnect[ b ]-- ;
597
598 } else if ( jrd<0 ) { //did not read anything
599 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
600 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
601 factOut(kError,442, str ) ;
602 gi_ErrCnt[i]++ ;
603 } else numok-- ; //else nothing waiting to be read
604 jrd = 0 ;
605 }
606 } else jrd = 0 ; //did read nothing as requested
607
608 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
609// just do nothing
610
611 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
612 if ( jrd < rd[i].bufLen ) { //not yet all read
613 rd[i].bufPos += jrd ; //==> prepare for continuation
614 rd[i].bufLen -= jrd ;
615 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
616 } else { //full dataset read
617 rd[i].bufLen = 0 ;
618 rd[i].bufPos = rd[i].fadLen ;
619 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
620 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
621 gi_ErrCnt[i]++ ;
622 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
623 factOut(kError,301, str ) ;
624 goto EndBuf ;
625
626 }
627 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
628
629 //we have a complete buffer, copy to WORK area
630 gi_SecRate[i]++ ;
631
632 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
633 //get index into mBuffer for this event (create if needed)
634 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
635
636 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
637 if (evID < 0) {
638 xwait.tv_sec = 0;
639 xwait.tv_nsec= 20000000 ; // sleep for ~20 msec
640 nanosleep( &xwait , NULL ) ;
641 goto EndBuf1 ; //hope there is free space next round
642 }
643
644
645 //we have a valid entry in mBuffer[]; fill it
646
647 boardId = b ;
648 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
649 int fadCrate = fadBoard/256 ;
650 if (boardId != (fadCrate*10 + fadBoard%256) ) {
651 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
652 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
653// } else {
654// snprintf(str,MXSTR,"correct Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
655// if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
656 }
657 if ( mBuffer[evID].board[ boardId ] != -1) {
658 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
659 factOut(kWarn,501, str ) ;
660 goto EndBuf ; //--> skip Board
661 }
662
663 int iDx = evtIdx[evID] ; //index into evtCtrl
664
665 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
666 &rd[i].rBuf->S[0], head_len) ;
667 roi = mBuffer[evID].nRoi ;
668
669 pixS = boardId*36 -1 ; //
670 tmS = boardId*4 -1 ; //
671 src = head_len/2 ;
672 for ( drs=0; drs<4; drs++ ) {
673 for ( px=0; px<9; px++ ) {
674 pixH= ntohs(rd[i].rBuf->S[src++]) ;
675 pixC= ntohs(rd[i].rBuf->S[src++]) ;
676 pixR= ntohs(rd[i].rBuf->S[src++]) ;
677
678 src++ ;
679 pixS++ ; //pixS = pixH2S[pixH] ;
680 if ( ( px < PX8 && pixR == roi )
681 || ( px ==PX8 && pixR == 2*roi )
682 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
683 // correct roi
684 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
685 dest= pixS * roi ;
686 memcpy(
687 &mBuffer[evID].fEvent->Adc_Data[dest],
688 &rd[i].rBuf->S[src], roi * 2) ;
689 src+= roi ;
690 if ( px==PX8 ) {
691 tmS++; // tmS = tmH2S[pixH]
692 dest= tmS * roi + NPIX* roi ;
693 if ( roi <=512 ) {
694 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
695 memcpy(
696 &mBuffer[evID].fEvent->Adc_Data[dest],
697 &rd[i].rBuf->S[src], roi * 2) ;
698 src+=roi ;
699 } else {
700 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
701 }
702 }
703 } else {
704 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
705 factOut(kError,202, str ) ;
706 goto EndBuf ;
707 }
708 }
709 }// now we have stored a new board contents into Event structure
710 mBuffer[evID].board[ boardId ] = boardId ;
711 evtCtrl.evtStat[ iDx ]++ ;
712 evtCtrl.pcTime[ iDx ] = g_actTime ;
713
714 if (++mBuffer[evID].nBoard >= actBoards ) {
715 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
716 factOut(kDebug,-1, str ) ;
717 //complete event read ---> flag for next processing
718 evtCtrl.evtStat[ iDx ] = 99;
719 gi_EvtRead++ ;
720 gi_EvtTot++ ;
721 }
722
723EndBuf:
724 rd[i].bufTyp = 0 ; //ready to read next header
725 rd[i].bufLen = frst_len ;
726 rd[i].bufPos = 0 ;
727EndBuf1:
728 ;
729 }
730
731 } else { //we are reading event header
732 rd[i].bufPos += jrd ;
733 rd[i].bufLen -= jrd ;
734 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
735 //check if startflag correct; else shift block ....
736 for (k=0; k<rd[i].bufPos -1 ; k++) {
737 if (rd[i].rBuf->B[k ] == start.B[1]
738 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
739 }
740 rd[i].skip += k ;
741
742 if (k >= rd[i].bufPos-1 ) { //no start of header found
743// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
744// factOut(kWarn,666, str ) ;
745
746 rd[i].bufPos = 0 ;
747 rd[i].bufLen = head_len ;
748 } else if ( k>0 ) {
749 rd[i].bufPos -= k ;
750 rd[i].bufLen += k ;
751 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
752 }
753 if ( rd[i].bufPos >= minLen ) {
754 if ( rd[i].skip >0 ) {
755 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
756 factOut(kInfo,666, str ) ;
757 rd[i].skip = 0 ;
758 }
759 goodhed++;
760 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
761 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
762 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
763 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
764 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
765 if (rd[i].runID ==0 ) rd[i].runID = myRun ;
766 rd[i].bufTyp = 1 ; //ready to read full record
767 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
768 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
769 debugHead(i,rd[i].rBuf);
770 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
771 } else {
772 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
773 }
774 } else {
775 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
776 }
777
778 } //end interpreting last read
779 } //end of successful read anything
780 } //finished trying to read all sockets
781
782 int qwait=0, qdel=0, qskip=0 ;
783 g_actTime = time(NULL) ;
784 if ( g_actTime > gi_SecTime ) {
785 gi_SecTime = g_actTime ;
786
787
788 //loop over all active events and flag those older than read-timeout
789 //delete those that are written to disk ....
790
791 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
792 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
793
794 int k1=evtCtrl.frstPtr;
795 for ( k=k1; k<(k1+kd); k++ ) {
796 int k0 = k % (MAX_EVT*MAX_RUN) ;
797//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
798 if (evtCtrl.evtStat[k0] > 0
799 && evtCtrl.evtStat[k0] < 90 ) {
800
801 qwait++;
802
803 if( evtCtrl.pcTime[k0] < g_actTime-10 ) {
804 int id =evtCtrl.evtBuf[k0] ;
805 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
806 factOut(kWarn,601, str ) ;
807 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
808 gi_EvtBad++ ;
809 gi_EvtTot++ ;
810 qskip++;
811 }
812
813
814 } else if (evtCtrl.evtStat[k0] >= 900 ) {
815
816 int id =evtCtrl.evtBuf[k0] ;
817 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
818 factOut(kDebug,-1, str ) ;
819 mBufFree(id) ; //event written--> free memory
820 evtCtrl.evtStat[k0] = -1;
821 qdel++;
822 qtot++;
823 }
824
825 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
826 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
827 }
828 }
829
830qconn=0 ;
831int ib ;
832for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
833int64_t stat[9] ;
834 stat[0]= qwait;
835 stat[1]= qskip;
836 stat[2]= qdel ;
837 stat[3]= qtot ;
838 stat[4]= gi_usedMem ;
839 stat[5]= qread;
840 stat[6]= qconn;
841
842factStat(stat,7);
843qread=0 ;
844 }
845
846
847
848
849 if (numok > 0 ) numok2=0;
850 else if (numok2++ > 3) {
851 if (g_runStat == 1) {
852 xwait.tv_sec = 1;
853 xwait.tv_nsec= 0 ; // hibernate for 1 sec
854 } else {
855 xwait.tv_sec = 0;
856 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
857 }
858 nanosleep( &xwait , NULL ) ;
859 }
860
861 } //and do next loop over all sockets ...
862
863 //must quit eventbuilding
864 snprintf(str,MXSTR,"stop reading ...");
865 factOut(kInfo,-1, str ) ;
866
867 //flag all events as 'read finished'
868 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
869 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
870
871 int k1=evtCtrl.frstPtr;
872
873 for ( k=k1; k<(k1+kd); k++ ) {
874 int k0 = k % (MAX_EVT*MAX_RUN) ;
875 if (evtCtrl.evtStat[k0] > 0
876 && evtCtrl.evtStat[k0] < 90 ) {
877 evtCtrl.evtStat[k0] = 91 ;
878 gi_EvtBad++ ;
879 gi_EvtTot++ ;
880 }
881 }
882
883 //must close all open sockets ...
884 snprintf(str,MXSTR,"close all sockets ...");
885 factOut(kInfo,-1, str ) ;
886 for (i=0; i<MAX_SOCK; i++) {
887 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
888 if (gi_NumConnect[ i/7 ]>0)
889 gi_NumConnect[ i/7 ]-- ;
890 }
891
892 xwait.tv_sec = 0;
893 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
894 nanosleep( &xwait , NULL ) ;
895 gi_runStat = -11 ; //inform all that no update to happen any more
896
897
898 int minclear = 900 ; //usually wait until writing finished (stat 900)
899 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all
900
901
902 //and clear all buffers (might have to wait until all others are done)
903 snprintf(str,MXSTR,"clear all buffers ...");
904 factOut(kInfo,-1, str ) ;
905 int numclear=1 ;
906 while (numclear > 0 ) {
907 numclear = 0 ;
908 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
909 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
910
911 int k1=evtCtrl.frstPtr;
912 for ( k=k1; k<(k1+kd); k++ ) {
913 int k0 = k % (MAX_EVT*MAX_RUN) ;
914 if (evtCtrl.evtStat[k0] > minclear ) {
915 int id =evtCtrl.evtBuf[k0] ;
916 mBufFree(id) ; //event written--> free memory
917 evtCtrl.evtStat[k0] = -1;
918 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
919
920 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
921 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
922 }
923
924 xwait.tv_sec = 0;
925 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
926 nanosleep( &xwait , NULL ) ;
927 }
928
929 snprintf(str,MXSTR,"Exit read Process ...");
930 factOut(kInfo,-1, str ) ;
931 gi_runStat = -99 ;
932 return 0;
933
934} /*-----------------------------------------------------------------*/
935
936
937void *procEvt( void *ptr ) {
938/* *** main loop processing file, including SW-trigger */
939 int numProc, numWait ;
940 int k ;
941 struct timespec xwait ;
942 char str[MXSTR] ;
943
944 cpu_set_t mask;
945 int cpu = 5 ; //process thread (will be several in final version)
946
947 snprintf(str,MXSTR,"Starting process-thread");
948 factOut(kInfo,-1, str ) ;
949
950/* CPU_ZERO initializes all the bits in the mask to zero. */
951 CPU_ZERO( &mask );
952/* CPU_SET sets only the bit corresponding to cpu. */
953 CPU_SET( cpu, &mask );
954/* sched_setaffinity returns 0 in success */
955 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
956 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
957 factOut(kWarn,-1, str ) ;
958 }
959
960
961 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
962
963 numWait = numProc = 0 ;
964 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
965 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
966
967 int k1=evtCtrl.frstPtr;
968 for ( k=k1; k<(k1+kd); k++ ) {
969 int k0 = k % (MAX_EVT*MAX_RUN) ;
970//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
971 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
972 int id = evtCtrl.evtBuf[k0] ;
973 int ievt = mBuffer[id].evNum ;
974 int roi = mBuffer[id].nRoi ;
975// uint32_t irun = mBuffer[id].runNum ;
976//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
977//factOut(kDebug,-1, str ) ;
978
979//make sure unused pixels/tmarks are cleared to zero
980 int ip,it,dest,ib;
981 for (ip=0; ip<NPIX; ip++) {
982 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
983 dest= ip*roi ;
984 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
985 }
986 }
987 for (it=0; it<NTMARK; it++) {
988 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
989 dest= it*roi + NPIX*roi ;
990 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
991 }
992 }
993
994
995//and set correct event header ; also check for consistency in event (not yet)
996 mBuffer[id].fEvent->Roi = roi ;
997 mBuffer[id].fEvent->EventNum = ievt ;
998 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
999 mBuffer[id].fEvent->SoftTrig = 0 ;
1000 for (ib=0; ib<NBOARDS; ib++) {
1001 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1002 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1003 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1004 } else {
1005 mBuffer[id].fEvent->BoardTime[ib] =
1006 ntohl(mBuffer[id].FADhead[ib].time) ;
1007 }
1008 }
1009
1010 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1011
1012 if (i<0) evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1013
1014 numProc++ ;
1015 evtCtrl.evtStat[k0] = 520 ;
1016 gp_EvtTot++ ;
1017 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1018 numWait++ ;
1019 }
1020 }
1021
1022 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1023 snprintf(str,MXSTR,"Exit Processing Process ...");
1024 factOut(kInfo,-1, str ) ;
1025 gp_runStat = -22 ; //==> we should exit
1026 return 0 ;
1027 }
1028
1029 if (numProc == 0) {
1030 //seems we have nothing to do, so sleep a little
1031 xwait.tv_sec = 0;
1032 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1033 nanosleep( &xwait , NULL ) ;
1034 }
1035 gp_runStat = gi_runStat ;
1036
1037 }
1038
1039 //we are asked to abort asap ==> must flag all remaining events
1040 // when gi_runStat claims that all events are in the buffer...
1041
1042 snprintf(str,MXSTR,"Abort Processing Process ...");
1043 factOut(kInfo,-1, str ) ;
1044 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1045 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1046
1047 int k1=evtCtrl.frstPtr;
1048 for ( k=k1; k<(k1+kd); k++ ) {
1049 int k0 = k % (MAX_EVT*MAX_RUN) ;
1050 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1051 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1052 }
1053 }
1054
1055 gp_runStat = -99 ;
1056
1057 return 0;
1058
1059} /*-----------------------------------------------------------------*/
1060
1061int CloseRunFile(uint32_t runId, uint32_t closeTime) {
1062/* close run runId (all all runs if runId=0) */
1063/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1064 int i, j ;
1065
1066 if (runId == 0 ) {
1067 for ( j=0; j<MAX_RUN; j++) {
1068 if ( runCtrl[j].fileId == 0 ) { //run is open
1069 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1070 if (i<0) {
1071 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1072 factOut(kError,506, str ) ;
1073 runCtrl[j].fileId = 888 ;
1074 } else {
1075 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1076 factOut(kInfo,507, str ) ;
1077 runCtrl[j].fileId = 7777 ;
1078 }
1079 runCtrl[j].closeTime = closeTime ;
1080 }
1081 }
1082 return 0 ;
1083 }
1084
1085
1086 for ( j=0; j<MAX_RUN; j++) {
1087 if ( runCtrl[j].runId == runId ) {
1088 if ( runCtrl[j].fileId == 0 ) { //run is open
1089 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1090 if (i<0) {
1091 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1092 factOut(kError,506, str ) ;
1093 runCtrl[j].fileId = 888 ;
1094 } else {
1095 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1096 factOut(kInfo,507, str ) ;
1097 runCtrl[j].fileId = 7777 ;
1098 }
1099 runCtrl[j].closeTime = closeTime ;
1100 return 0;
1101 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1102 runCtrl[j].closeTime = closeTime ;
1103 return +1;
1104 } else { // run already closed
1105 return +2;
1106 }
1107 }
1108 } //we only reach here if the run was never created
1109 return -1;
1110
1111} /*-----------------------------------------------------------------*/
1112
1113
1114void *writeEvt( void *ptr ) {
1115/* *** main loop writing event (including opening and closing run-files */
1116
1117 int numWrite, numWait ;
1118 int k,j ;
1119 struct timespec xwait ;
1120 char str[MXSTR] ;
1121
1122 cpu_set_t mask;
1123 int cpu = 3 ; //write thread
1124
1125 snprintf(str,MXSTR,"Starting write-thread");
1126 factOut(kInfo,-1, str ) ;
1127
1128/* CPU_ZERO initializes all the bits in the mask to zero. */
1129 CPU_ZERO( &mask );
1130/* CPU_SET sets only the bit corresponding to cpu. */
1131 CPU_SET( cpu, &mask );
1132/* sched_setaffinity returns 0 in success */
1133 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1134 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1135 }
1136
1137 int lastRun = 0 ; //usually run from last event still valid
1138
1139 while (g_runStat >-2) {
1140
1141 numWait = numWrite = 0 ;
1142 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1143 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1144
1145 int k1=evtCtrl.frstPtr;
1146 for ( k=k1; k<(k1+kd); k++ ) {
1147 int k0 = k % (MAX_EVT*MAX_RUN) ;
1148//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1149 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1150 int id = evtCtrl.evtBuf[k0] ;
1151 uint32_t irun = mBuffer[id].runNum ;
1152 int ievt = mBuffer[id].evNum ;
1153
1154 if (runCtrl[lastRun].runId == irun ) {
1155 j = lastRun ;
1156 } else {
1157 //check which fileID to use (or open if needed)
1158 for ( j=0; j<MAX_RUN; j++) {
1159 if ( runCtrl[j].runId == irun ) break ;
1160 }
1161 if ( j >= MAX_RUN ) {
1162 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1163 factOut(kFatal,901, str ) ;
1164for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1165exit(111);
1166 }
1167 lastRun = j ;
1168 }
1169
1170 if (runCtrl[j].fileId < 0 ) {
1171 actRun.Version = 1 ;
1172 actRun.RunType = -1 ;
1173 actRun.NBoard = NBOARDS ;
1174 actRun.NPix = NPIX ;
1175 actRun.NTm = NTMARK ;
1176 actRun.Nroi = mBuffer[id].nRoi ;
1177// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1178 runCtrl[j].nextEvt= 0;
1179 runCtrl[j].lastTime=g_actTime ;
1180 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1181 if (runCtrl[j].fileHd == NULL ) {
1182 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1183 factOut(kError,502, str ) ;
1184 runCtrl[j].fileId = 99 ;
1185 } else {
1186 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1187 factOut(kInfo,-1, str ) ;
1188 runCtrl[j].fileId = 0 ;
1189 }
1190
1191 }
1192
1193 if (runCtrl[j].fileId > 0 ) {
1194 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1195 factOut(kDebug,123,str) ;
1196 evtCtrl.evtStat[k0] = 902 ;
1197 } else {
1198 int i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1199 if (i<0) {
1200 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1201 factOut(kError,503, str ) ;
1202 evtCtrl.evtStat[k0] = 901 ;
1203 //close run
1204 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1205 if (i<0) {
1206 snprintf(str,MXSTR,"W error closing run %d",irun) ;
1207 factOut(kError,503, str ) ;
1208 } else {
1209 snprintf(str,MXSTR,"W closed run %d because of write error",irun) ;
1210 factOut(kInfo,503, str ) ;
1211 }
1212 runCtrl[j].fileId = 9999 ;
1213 } else {
1214 runCtrl[j].lastTime = g_actTime;
1215 evtCtrl.evtStat[k0] = 901 ;
1216 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1217 factOut(kDebug,504, str ) ;
1218 }
1219 }
1220 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1221 }
1222
1223 //check if we should close a run ...
1224 for ( j=0; j<MAX_RUN; j++) {
1225 if ( runCtrl[j].fileId==0
1226 && ( runCtrl[j].closeTime < g_actTime
1227 ||runCtrl[j].lastTime < g_actTime-120) ) {
1228 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1229 if (i<0) {
1230 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1231 factOut(kError,506, str ) ;
1232 runCtrl[j].fileId = 888 ;
1233 } else {
1234 snprintf(str,MXSTR,"closing run %d ok BBB",runCtrl[j].runId);
1235 factOut(kInfo,507, str ) ;
1236 runCtrl[j].fileId = 7777 ;
1237 }
1238 }
1239 }
1240
1241 if (numWrite == 0) {
1242 //seems we have nothing to do, so sleep a little
1243 xwait.tv_sec = 0;
1244 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1245 nanosleep( &xwait , NULL ) ;
1246 }
1247
1248 if ( gi_runStat < -10 && numWait == 0) { //nothing left to do
1249 snprintf(str,MXSTR,"Finish Write Process ...");
1250 factOut(kInfo,-1, str ) ;
1251 gw_runStat = -22 ; //==> we should exit
1252 goto closerun ;
1253 }
1254 gw_runStat = gi_runStat ;
1255
1256 }
1257
1258 //must close all open files ....
1259 snprintf(str,MXSTR,"Abort Writing Process ...");
1260 factOut(kInfo,-1, str ) ;
1261closerun:
1262 snprintf(str,MXSTR,"Close all open files ...");
1263 factOut(kInfo,-1, str ) ;
1264 for ( j=0; j<MAX_RUN; j++)
1265 if ( runCtrl[j].fileId ==0 ) {
1266 int i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1267 if (i<0) {
1268 snprintf(str,MXSTR,"error closing run %d %d",runCtrl[j].runId,i) ;
1269 factOut(kError,506, str ) ;
1270 runCtrl[j].fileId = 888 ;
1271 } else {
1272 snprintf(str,MXSTR,"closing run %d ok AAA",runCtrl[j].runId);
1273 factOut(kInfo,507, str ) ;
1274 runCtrl[j].fileId = 7777 ;
1275 }
1276 }
1277
1278 gw_runStat = -99;
1279 snprintf(str,MXSTR,"Exit Writing Process ...");
1280 factOut(kInfo,-1, str ) ;
1281 return 0;
1282
1283
1284
1285
1286} /*-----------------------------------------------------------------*/
1287
1288
1289
1290
1291void StartEvtBuild() {
1292
1293 int i,j,imax,status,th_ret[50] ;
1294 pthread_t thread[50] ;
1295 struct timespec xwait ;
1296 uint32_t actime ;
1297
1298 gi_runStat = gp_runStat = gw_runStat = 0 ;
1299
1300 snprintf(str,MXSTR,"Starting EventBuilder");
1301 factOut(kInfo,-1, str ) ;
1302
1303
1304 evtCtrl.frstPtr = 0 ;
1305 evtCtrl.lastPtr = 0 ;
1306
1307 actime = g_actTime + 50000000 ;
1308/* initialize run control logics */
1309 for (i=0; i<MAX_RUN; i++) {
1310 runCtrl[i].runId = 0 ;
1311 runCtrl[i].lastTime = 0 ;
1312 runCtrl[i].closeTime = time(NULL) + 3600*24*7;
1313
1314 runCtrl[i].nextEvt = 0 ;
1315 runCtrl[i].fileId = -2 ;
1316
1317 runTail[i].nEventsOk =
1318 runTail[i].nEventsRej =
1319 runTail[i].nEventsBad =
1320 runTail[i].PCtime0 =
1321 runTail[i].PCtimeX = 0 ;
1322 }
1323
1324//start all threads (more to come) when we are allowed to ....
1325 while (g_runStat == 0 ) {
1326 xwait.tv_sec = 0;
1327 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1328 nanosleep( &xwait , NULL ) ;
1329 }
1330
1331 i=0 ;
1332 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1333 i++;
1334 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1335 i++;
1336 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1337 i++;
1338 imax=i ;
1339
1340
1341
1342
1343
1344/*
1345 xwait.tv_sec = 20;;
1346 xwait.tv_nsec= 0 ; // sleep for ~20sec
1347 nanosleep( &xwait , NULL ) ;
1348
1349
1350 printf("close all runs in 2 seconds\n");
1351
1352
1353 CloseRunFile( 0, time(NULL)+2) ;
1354
1355 xwait.tv_sec = 5;;
1356 xwait.tv_nsec= 0 ; // sleep for ~20sec
1357 nanosleep( &xwait , NULL ) ;
1358
1359 printf("setting g_runstat to -1\n");
1360
1361 g_runStat = -1 ;
1362*/
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374//wait for all threads to finish
1375 for (i=0; i<imax; i++) {
1376 j = pthread_join ( thread[i], (void **)&status) ;
1377 }
1378
1379} /*-----------------------------------------------------------------*/
1380
1381
1382
1383
1384
1385 /*-----------------------------------------------------------------*/
1386 /*-----------------------------------------------------------------*/
1387 /*-----------------------------------------------------------------*/
1388 /*-----------------------------------------------------------------*/
1389 /*-----------------------------------------------------------------*/
1390 /*-----------------------------------------------------------------*/
1391 /*-----------------------------------------------------------------*/
1392 /*-----------------------------------------------------------------*/
1393 /*-----------------------------------------------------------------*/
1394 /*-----------------------------------------------------------------*/
1395
1396
1397/*
1398
1399FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1400{ return 1; } ;
1401
1402int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1403{ return 1; } ;
1404
1405int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1406{ return 1; } ;
1407
1408
1409
1410int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1411{
1412 int i=0;
1413
1414 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1415 for (i=0; i<NBOARDS; i++) {
1416 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1417 }
1418 return 0;
1419}
1420
1421
1422void factStat(int64_t *array, int len ) {
1423 printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1424 array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1425}
1426
1427
1428void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
1429// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1430}
1431
1432
1433
1434void debugStream(int isock, void *buf, int len) {
1435}
1436
1437void debugHead(int i, void *buf) {
1438}
1439
1440
1441void factOut(int severity, int err, char* message ) {
1442static FILE * fd ;
1443static int file=0 ;
1444
1445 if (file==0) {
1446 printf("open file\n");
1447 fd=fopen("x.out","w+") ;
1448 file=999;
1449 }
1450
1451 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1452
1453 if (severity != kDebug)
1454 printf("%3d %3d | %s\n",severity,err,message) ;
1455}
1456
1457
1458
1459int main() {
1460 int i,b,c,p ;
1461 char ipStr[100] ;
1462 struct in_addr IPaddr ;
1463
1464 g_maxMem = 1024*1024 ; //MBytes
1465 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1466
1467
1468 g_runStat = 40 ;
1469
1470 i=0 ;
1471
1472// version for standard crates
1473//for (c=0; c<4,c++) {
1474// for (b=0; b<10; b++) {
1475// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1476//
1477// inet_pton(PF_INET, ipStr, &IPaddr) ;
1478//
1479// g_port[i].sockAddr.sin_family = PF_INET;
1480// g_port[i].sockAddr.sin_port = htons(5000) ;
1481// g_port[i].sockAddr.sin_addr = IPaddr ;
1482// g_port[i].sockDef = 1 ;
1483// i++ ;
1484// }
1485//}
1486//
1487//version for PC-test
1488 for (c=0; c<4; c++) {
1489 for (b=0; b<10; b++) {
1490 sprintf(ipStr,"10.0.%d.11",128+c) ;
1491 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1492
1493 inet_pton(PF_INET, ipStr, &IPaddr) ;
1494 p = 31919+100*c+10*b;
1495
1496
1497 g_port[i].sockAddr.sin_family = PF_INET;
1498 g_port[i].sockAddr.sin_port = htons(p) ;
1499 g_port[i].sockAddr.sin_addr = IPaddr ;
1500 g_port[i].sockDef = 1 ;
1501
1502 i++ ;
1503 }
1504 }
1505
1506
1507//g_port[17].sockDef =-1 ;
1508//g_actBoards-- ;
1509
1510 StartEvtBuild() ;
1511
1512 return 0;
1513
1514}
1515*/
Note: See TracBrowser for help on using the repository browser.