source: trunk/FACT++/src/EventBuilder.c@ 11519

Last change on this file since 11519 was 11511, checked in by tbretz, 14 years ago
Updated to latest version of event builder.
File size: 55.2 KB
Line 
1
2
3
4
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <unistd.h>
9#include <stdio.h>
10#include <sys/time.h>
11#include <arpa/inet.h>
12#include <string.h>
13#include <math.h>
14#include <error.h>
15#include <errno.h>
16#include <unistd.h>
17#include <sys/types.h>
18#include <sys/socket.h>
19#include <pthread.h>
20#include <sched.h>
21
22#include "EventBuilder.h"
23
/// Severity levels; passed as the first argument of factOut().
enum Severity
{
   kMessage = 10,   ///< Plain message, usually obsolete
   kInfo    = 20,   ///< Information that may be interesting to know
   kWarn    = 30,   ///< Warning: might lead to unexpected or unwanted behaviour
   kError   = 40,   ///< Something unexpected happened, but the program can still handle it
   kFatal   = 50,   ///< Cannot be handled at all; the only solution is program termination
   kDebug   = 99    ///< Message used for debugging only
};
33
#define MIN_LEN 32             // min #bytes needed to interpret FADheader
// size of read-buffer per socket; the expansion is parenthesized so that
// expressions such as 'x / MAX_LEN' or 'x % MAX_LEN' evaluate as intended
#define MAX_LEN (256*1024)
36
37extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
38extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
39extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
40extern void factOut(int severity, int err, char* message ) ;
41extern void gotNewRun( int runnr, PEVNT_HEADER *headers );
42
43
44extern void factStat(GUI_STAT gj) ;
45
46extern void factStatNew(EVT_STAT gi) ;
47
48extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
49
50extern void debugHead(int i, int j, void *buf);
51
52extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
53 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
54extern void debugStream(int isock, void *buf, int len) ;
55
56int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
57
58
59uint g_actTime ;
60uint g_actUsec ;
61int g_runStat ;
62int g_reset ;
63int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX ;
64size_t g_maxMem ; //maximum memory allowed for buffer
65
66//no longer needed ...
67 int g_maxBoards ; //maximum number of boards to be initialized
68 int g_actBoards ;
69//
70
71FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
72
73
74 int gi_runStat ;
75 int gp_runStat ;
76 int gw_runStat ;
77
78 int gi_memStat = +1 ;
79
80 uint32_t gi_myRun = 0 ;
81 uint32_t actrun = 0 ;
82
83
84uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
85
86//uint gi_EvtStart= 0 ;
87//uint gi_EvtRead = 0 ;
88//uint gi_EvtBad = 0 ;
89//uint gi_EvtTot = 0 ;
90//size_t gi_usedMem = 0 ;
91
92//uint gw_EvtTot = 0 ;
93//uint gp_EvtTot = 0 ;
94
95PIX_MAP g_pixMap[NPIX] ;
96
97EVT_STAT gi ;
98GUI_STAT gj ;
99
100EVT_CTRL evtCtrl ; //control of events during processing
101int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
102
103WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
104
105
106
107
108RUN_HEAD actRun ;
109
110RUN_CTRL runCtrl[MAX_RUN] ;
111
112RUN_TAIL runTail[MAX_RUN] ;
113
114
115/*
116*** Definition of rdBuffer to read in IP packets; keep it global !!!!
117 */
118
119
120typedef union {
121 int8_t B[MAX_LEN ];
122 int16_t S[MAX_LEN/2];
123 int32_t I[MAX_LEN/4];
124 int64_t L[MAX_LEN/8];
125} CNV_FACT ;
126
127typedef struct {
128 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
129 int32_t bufPos ; //next byte to read to the buffer next
130 int32_t bufLen ; //number of bytes left to read
131 int32_t skip ; //number of bytes skipped before start of event
132
133 int sockStat ; //-1 if socket not yet connected , 99 if not exist
134 int socket ; //contains the sockets
135 struct sockaddr_in SockAddr ; //IP for each socket
136
137 int evtID ; // event ID of event currently read
138 int runID ; // run "
139 int ftmID ; // event ID from FTM
140 uint fadLen ; // FADlength of event currently read
141 int fadVers ; // Version of FAD
142 int board ; // boardID (softwareID: 0..40 )
143 int Port ;
144
145 CNV_FACT *xBuf ; //a copy of rBuf (temporary for debuging)
146 CNV_FACT *rBuf ;
147
148
149} READ_STRUCT ;
150
151
// A 16-bit value that can also be accessed as its two individual bytes.
typedef union {
   int8_t  B[2] ;   // byte-wise view
   int16_t S ;      // 16-bit view
} SHORT_BYTE ;
156
157
158
159
160
161#define MXSTR 1000
162char str[MXSTR] ;
163
164SHORT_BYTE start, stop;
165
166READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
167
168
169
170/*-----------------------------------------------------------------*/
171
172
173/*-----------------------------------------------------------------*/
174
175
176
177int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
178/*
179*** generate Address, create sockets and allocates readbuffer for it
180***
181*** if flag==0 generate socket and buffer
182*** <0 destroy socket and buffer
183*** >0 close and redo socket
184***
185*** sid : board*7 + port id
186 */
187
188 int j ;
189
190 if (rd->sockStat ==0 ) { //close socket if open
191 j=close(rd->socket) ;
192 if (j>0) {
193 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
194 factOut(kFatal,771, str ) ;
195 } else {
196 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
197 factOut(kInfo,771, str ) ;
198 }
199 }
200
201
202 if (flag < 0) {
203 free(rd->rBuf) ; //and never open again
204 free(rd->xBuf) ; //and never open again
205 rd->rBuf = NULL ;
206 rd->sockStat = 99 ;
207 return 0 ;
208 }
209
210
211 if (flag == 0) { //generate address and buffer ...
212 rd->Port = port ;
213 rd->SockAddr.sin_family = sockAddr->sin_family;
214 rd->SockAddr.sin_port = htons(port) ;
215 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
216
217 rd->xBuf = malloc(sizeof(CNV_FACT) ) ;
218 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
219 if ( rd->rBuf == NULL ) {
220 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
221 factOut(kFatal,774, str ) ;
222 rd->sockStat = 77 ;
223 return -3 ;
224 }
225 }
226
227
228 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
229 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
230 factOut(kFatal,773, str ) ;
231 rd->sockStat = 88 ;
232 return -2 ;
233 }
234
235 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
236 factOut(kInfo,773, str ) ;
237 rd->sockStat = -1 ; //try to (re)open socket
238 return 0 ;
239
240} /*-----------------------------------------------------------------*/
241
242 /*-----------------------------------------------------------------*/
243
244
245
246
247int mBufInit() {
248// initialize mBuffer (mark all entries as unused\empty)
249
250 int i ;
251 uint32_t actime ;
252
253 actime = g_actTime + 50000000 ;
254
255 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
256 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
257 mBuffer[i].runNum= 0 ;
258
259 evtCtrl.evtBuf[ i] = -1 ;
260 evtCtrl.evtStat[ i] = -1 ;
261 evtCtrl.pcTime[ i] = actime ; //initiate to far future
262 }
263
264
265 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
266
267 evtCtrl.frstPtr = 0 ;
268 evtCtrl.lastPtr = 0 ;
269
270 return 0 ;
271
272} /*-----------------------------------------------------------------*/
273
274
275
276
277int mBufEvt( int evID, uint runID, int nRoi, int sk, int fadlen) {
278// generate a new Event into mBuffer:
279// make sure only complete Event are possible, so 'free' will always work
280// returns index into mBuffer[], or negative value in case of error
281
282
283 int i, k, evFree ;
284 int headmem=0 ;
285 size_t needmem = 0 ;
286
287 if (nRoi <0 || nRoi > 1024) {
288 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
289 factOut(kError, 1, str ) ;
290 return -9999 ;
291 }
292
293 i = evID % MAX_EVT ;
294 evFree = -1 ;
295
296 for ( k=0; k<MAX_RUN; k++) {
297 if ( mBuffer[i].evNum == evID
298 && mBuffer[i].runNum== runID ) {
299 return i ;
300 }
301 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
302 i += MAX_EVT ;
303 }
304
305
306 //event does not yet exist; create
307 if (evFree < 0 ) { //no space available in ctrl
308 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
309 factOut(kError,881, str ) ;
310 return -1 ;
311 }
312 i = evFree ; //found free entry; use it ...
313
314
315 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
316
317 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
318
319 if ( gj.usdMem + needmem + headmem > g_maxMem) {
320 gj.maxMem = gj.usdMem + needmem + headmem ;
321 if (gi_memStat>0 ) {
322 gi_memStat = -99 ;
323 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ;
324 factOut(kError,882, str ) ;
325 } else {
326 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ;
327 factOut(kDebug,882, str ) ;
328 }
329 return -11 ;
330 }
331
332 mBuffer[i].FADhead = malloc( headmem ) ;
333 if (mBuffer[i].FADhead == NULL) {
334 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
335 factOut(kError,882, str ) ;
336 return -12;
337 }
338
339 mBuffer[i].fEvent = malloc( needmem ) ;
340 if (mBuffer[i].fEvent == NULL) {
341 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
342 factOut(kError,882, str ) ;
343 free(mBuffer[i].fEvent) ;
344 mBuffer[i].fEvent = NULL ;
345 return -22;
346 }
347
348 //flag all boards as unused
349 mBuffer[i].nBoard = 0 ;
350 for (k=0; k<NBOARDS; k++ ) {
351 mBuffer[i].board[k] = -1;
352 }
353
354 //flag all pixels as unused
355 for (k=0; k<NPIX; k++ ) {
356 mBuffer[i].fEvent->StartPix[k] = -1 ;
357 }
358
359 //flag all TMark as unused
360 for (k=0; k<NTMARK; k++ ) {
361 mBuffer[i].fEvent->StartTM[k] = -1 ;
362 }
363
364 mBuffer[i].fEvent->PCUsec = g_actUsec ;
365 mBuffer[i].fEvent->PCTime =
366 mBuffer[i].pcTime = g_actTime ;
367 mBuffer[i].nRoi = nRoi ;
368 mBuffer[i].evNum = evID ;
369 mBuffer[i].runNum = runID ;
370 mBuffer[i].evtLen = needmem ;
371
372 gj.usdMem += needmem + headmem;
373 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
374
375 gj.rateNew++ ;
376
377 //register event in 'active list (reading)'
378
379 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
380 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
381 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
382 evtIdx[i] = evtCtrl.lastPtr ;
383
384
385 snprintf(str,MXSTR,"%5d start new evt %8d %8d sock %3d len %5d t %10d",
386 evID,i,evtCtrl.lastPtr,sk,fadlen,mBuffer[i].pcTime);
387 factOut(kDebug,-11, str ) ;
388 evtCtrl.lastPtr++ ;
389 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
390
391 gi.evtGet++ ;
392
393 //check if runId already registered in runCtrl
394 evFree = -1 ;
395 for (k=0; k<MAX_RUN; k++) {
396 if (runCtrl[k].runId == runID ) return i ;//run exists already
397 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
398 }
399
400 if (evFree <0 ) {
401 snprintf(str,MXSTR,"not able to register the new run %d",runID);
402 factOut(kFatal,883, str ) ;
403 } else {
404 runCtrl[evFree].runId = runID ;
405 runCtrl[evFree].fileId = -2 ;
406 runCtrl[evFree].lastEvt= -1 ;
407 runCtrl[evFree].nextEvt= 0;
408 runCtrl[evFree].actEvt = 0;
409 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
410 runCtrl[evFree].firstUsec=g_actUsec ;
411 runCtrl[evFree].firstTime=
412 runCtrl[evFree].lastTime=g_actTime ;
413 runCtrl[evFree].closeTime=g_actTime + 3600*24 ; //max time allowed
414 runCtrl[evFree].lastTime = 0 ;
415
416 runTail[evFree].nEventsOk =
417 runTail[evFree].nEventsRej =
418 runTail[evFree].nEventsBad =
419 runTail[evFree].PCtime0 =
420 runTail[evFree].PCtimeX = 0 ;
421 }
422
423 return i ;
424
425} /*-----------------------------------------------------------------*/
426
427
428
429
430int mBufFree(int i) {
431//delete entry [i] from mBuffer:
432//(and make sure multiple calls do no harm ....)
433
434 int headmem=0 ;
435 int evid ;
436 size_t freemem = 0 ;
437
438 evid = mBuffer[i].evNum ;
439 freemem = mBuffer[i].evtLen ;
440
441 free(mBuffer[i].fEvent ) ;
442 mBuffer[i].fEvent = NULL ;
443
444 free(mBuffer[i].FADhead ) ;
445 mBuffer[i].FADhead = NULL ;
446
447 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
448 mBuffer[i].evNum = mBuffer[i].nRoi= -1;
449 mBuffer[i].runNum = 0;
450
451 gj.usdMem = gj.usdMem - freemem - headmem;
452
453 if (gi_memStat < 0 ) {
454 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ;
455 }
456
457
458 return 0 ;
459
460} /*-----------------------------------------------------------------*/
461
462
463void resetEvtStat() {
464 int i ;
465
466 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
467
468 for (i=0; i<NBOARDS; i++ ) {
469 gi.gotByte[i] = 0 ;
470 gi.gotErr[i] = 0 ;
471 }
472
473 gi.evtGet = 0 ; //#new Start of Events read
474 gi.evtTot = 0 ; //#complete Events read
475 gi.evtErr = 0 ; //#Events with Errors
476 gi.evtSkp = 0 ; //#Events incomplete (timeout)
477
478 gi.procTot = 0 ; //#Events processed
479 gi.procErr = 0 ; //#Events showed problem in processing
480 gi.procTrg = 0 ; //#Events accepted by SW trigger
481 gi.procSkp = 0 ; //#Events rejected by SW trigger
482
483 gi.feedTot = 0 ; //#Events used for feedBack system
484 gi.feedErr = 0 ; //#Events rejected by feedBack
485
486 gi.wrtTot = 0 ; //#Events written to disk
487 gi.wrtErr = 0 ; //#Events with write-error
488
489 gi.runOpen = 0 ; //#Runs opened
490 gi.runClose= 0 ; //#Runs closed
491 gi.runErr = 0 ; //#Runs with open/close errors
492
493return ;
494} /*-----------------------------------------------------------------*/
495
496
497
void initReadFAD() {
// empty stub: performs no action
} /*-----------------------------------------------------------------*/
501
502
503
504void *readFAD( void *ptr ) {
505/* *** main loop reading FAD data and sorting them to complete events */
506 int head_len,frst_len,numok,numok2,numokx,dest,evID,i,k ;
507 int actBoards = 0, minLen ;
508 int32_t jrd ;
509 uint gi_SecTime ; //time in seconds
510 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
511
512 int goodhed = 0 ;
513 int errcnt0 = 0 ;
514
515 int sockDef[NBOARDS]; //internal state of sockets
516 int jrdx ;
517
518
519 struct timespec xwait ;
520
521
522 struct timeval *tv, atv;
523 tv=&atv;
524 uint32_t tsec, tusec ;
525
526
527 gi_myRun = g_actTime ;
528 snprintf(str,MXSTR,"start initializing");
529 factOut(kInfo,-1, str ) ;
530
531 int cpu = 7 ; //read thread
532 cpu_set_t mask;
533
534/* CPU_ZERO initializes all the bits in the mask to zero. */
535 CPU_ZERO( &mask );
536/* CPU_SET sets only the bit corresponding to cpu. */
537 cpu = 7 ;
538 CPU_SET( cpu, &mask );
539
540/* sched_setaffinity returns 0 in success */
541 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
542 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
543 factOut(kWarn,-1, str ) ;
544 }
545
546 head_len = sizeof(PEVNT_HEADER) ;
547 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
548 minLen = head_len ; //min #bytes needed to check header: full header for debug
549
550 start.S=0xFB01;
551 stop.S= 0x04FE;
552
553/* initialize run control logics */
554 for (i=0; i<MAX_RUN; i++) {
555 runCtrl[i].runId = 0 ;
556 runCtrl[i].fileId = -2 ;
557 }
558 gi_resetS = gi_resetR = 9;
559 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
560
561START:
562 evtCtrl.frstPtr = 0 ;
563 evtCtrl.lastPtr = 0 ;
564
565 gi_SecTime = g_actTime ;
566 gi_runStat = g_runStat ;
567 gj.readStat= g_runStat ;
568 numok = numok2 = 0 ;
569
570 if ( gi_resetS > 0) {
571 //make sure all sockets are preallocated as 'not exist'
572 for (i=0; i<MAX_SOCK; i++) {
573 rd[i].socket = -1 ;
574 rd[i].sockStat = 99 ;
575 }
576
577 for (k=0; k<NBOARDS; k++) {
578 gi_NumConnect[k]=0;
579 gi.numConn[k] =0;
580 gj.numConn[k] =0;
581 gj.errConn[k] =0;
582 gj.rateBytes[k] =0;
583 gj.totBytes[k] =0;
584 }
585
586 }
587
588
589 if ( gi_resetR > 0) {
590 resetEvtStat();
591 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
592 gj.totMem = g_maxMem ;
593 gj.bufNew = gj.bufEvt = 0 ;
594 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
595
596
597 mBufInit() ; //initialize buffers
598
599 snprintf(str,MXSTR,"end initializing");
600 factOut(kInfo,-1, str ) ;
601 }
602
603
604 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0 ;
605
606 while (g_runStat >=0 && g_reset ==0 ) { //loop until global variable g_runStat claims stop
607
608 gi_runStat = g_runStat;
609 gj.readStat= g_runStat;
610
611 int b,p,p0,s0,nch;
612 nch = 0 ;
613 for (b=0; b<NBOARDS; b++ ) {
614 k = b*7 ;
615 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
616 nch++ ;
617 gi_NumConnect[ b ] = 0 ; //must close all connections
618 gi.numConn[ b ] = 0;
619 gj.numConn[ b ] = 0;
620 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
621 else if (g_port[b].sockDef == 0) s0= -1 ; //sockets to be destroyed
622 else s0= +1 ; //sockets to be closed and reopened
623
624 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
625 else p0=0 ;
626
627 for (p=p0+1; p<p0+8; p++) {
628 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
629 k++ ;
630 }
631 sockDef[b] = g_port[b].sockDef ;
632 }
633 }
634
635 if (nch > 0 ) {
636 actBoards = 0 ;
637 for (b=0; b<NBOARDS; b++ ) {
638 if ( sockDef[b] > 0 ) actBoards++ ;
639 }
640 }
641
642
643 jrdx = 0;
644 numokx= 0;
645 numok = 0 ; //count number of succesfull actions
646
647 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
648 b = i / 7 ;
649 if (sockDef[b] > 0) s0= +1 ;
650 else s0= -1 ;
651
652 gettimeofday( tv, NULL);
653 g_actTime = tsec = atv.tv_sec ;
654 g_actUsec = tusec= atv.tv_usec ;
655
656 if (rd[i].sockStat <0 ) { //try to connect if not yet done
657 rd[i].sockStat=connect(rd[i].socket,
658 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
659 if (rd[i].sockStat ==0 ) { //successfull ==>
660 if (sockDef[b] > 0) {
661 rd[i].bufTyp = 0 ; // expect a header
662 rd[i].bufLen = frst_len ; // max size to read at begining
663 } else {
664 rd[i].bufTyp = -1 ; // full data to be skipped
665 rd[i].bufLen = MAX_LEN ; //huge for skipping
666 }
667 rd[i].bufPos = 0 ; // no byte read so far
668 rd[i].skip = 0 ; // start empty
669 gi_NumConnect[ b ]++ ;
670 gi.numConn[ b ]++ ;
671 gj.numConn[ b ]++ ;
672 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
673 factOut(kInfo,-1, str ) ;
674 }
675 }
676
677 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
678 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
679 numok++ ;
680 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
681
682 if (jrd >0 ) {
683 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
684 memcpy(&rd[i].xBuf->B[ rd[i].bufPos], &rd[i].rBuf->B[ rd[i].bufPos], jrd) ;
685 snprintf(str,MXSTR,"read sock %3d bytes %5d len %5d first %d %d",i,jrd,rd[i].bufLen,
686 rd[i].rBuf->B[ rd[i].bufPos ],
687 rd[i].rBuf->B[ rd[i].bufPos +1] );
688 factOut(kDebug,301, str ) ;
689 }
690
691 if (jrd == 0) { //connection has closed ...
692 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
693 factOut(kInfo,441, str ) ;
694 GenSock(s0, i, 0,NULL, &rd[i]) ;
695 gi.gotErr[ b ]++ ;
696 gi_NumConnect[ b ]-- ;
697 gi.numConn[ b ]-- ;
698 gj.numConn[ b ]-- ;
699
700 } else if ( jrd<0 ) { //did not read anything
701 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
702 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
703 factOut(kError,442, str ) ;
704 gi.gotErr[ b ]++ ;
705 } else numok-- ; //else nothing waiting to be read
706 jrd = 0 ;
707 }
708 } else {
709 jrd = 0 ; //did read nothing as requested
710 snprintf(str,MXSTR,"do not read from socket %d %d",i,rd[i].bufLen ) ;
711 factOut(kDebug,301, str ) ;
712 }
713
714 gi.gotByte[ b ] += jrd ;
715 gj.rateBytes[b] += jrd ;
716
717 if (jrd>0) {numokx++ ; jrdx+= jrd; }
718
719
720 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
721// just do nothing
722
723 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
724 if ( jrd < rd[i].bufLen ) { //not yet all read
725 rd[i].bufPos += jrd ; //==> prepare for continuation
726 rd[i].bufLen -= jrd ;
727 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
728 } else { //full dataset read
729 rd[i].bufLen = 0 ;
730 rd[i].bufPos = rd[i].fadLen ;
731 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
732 || rd[i].rBuf->B[ rd[i].bufPos-2] != stop.B[1]) {
733 gi.evtErr++ ;
734 snprintf(str,MXSTR,"wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
735 i,rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2],
736 rd[i].rBuf->B[ rd[i].bufPos-1]);
737 factOut(kError,301, str ) ;
738 goto EndBuf ;
739
740// } else {
741// snprintf(str,MXSTR,"good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
742// i,rd[i].fadLen,
743// rd[i].rBuf->B[ 0 ], rd[i].rBuf->B[ 1 ], start.B[1],start.B[0],
744// rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1], stop.B[1], stop.B[0]);
745// factOut(kDebug,301, str ) ;
746 }
747
748 if (jrd>0) debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
749
750 //we have a complete buffer, copy to WORK area
751 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
752 //get index into mBuffer for this event (create if needed)
753 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi, i, rd[i].fadLen ) ;
754
755 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
756 if (evID < 0) {
757
758 if ( rd[i].bufLen != 0) {
759 snprintf(str,MXSTR,"something screwed up");
760 factOut(kFatal, 1, str ) ;
761 }
762 xwait.tv_sec = 0;
763 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
764 nanosleep( &xwait , NULL ) ;
765 goto EndBuf1 ; //hope there is free space next round
766 }
767
768
769 //we have a valid entry in mBuffer[]; fill it
770
771int xchk = memcmp(&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], rd[i].fadLen ) ;
772if (xchk != 0) {
773 snprintf(str,MXSTR,"ERROR OVERWRITE %d %d on port %d",xchk,rd[i].fadLen,i) ;
774 factOut(kFatal, 1, str ) ;
775
776 uint iq;
777 for (iq=0; iq < rd[i].fadLen ; iq++) {
778 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq] ) {
779 snprintf(str,MXSTR,"ERROR %4d %4d %x %x",i,iq,rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
780 factOut(kFatal, 1, str ) ;
781 }
782 }
783}
784
785
786 boardId = b ;
787 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
788 int fadCrate = fadBoard/256 ;
789 if (boardId != (fadCrate*10 + fadBoard%256) ) {
790 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
791 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
792 }
793 if ( mBuffer[evID].board[ boardId ] != -1) {
794 snprintf(str,MXSTR,"double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
795 evID,boardId,i,rd[i].fadLen, rd[i].rBuf->B[0],
796 rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1]);
797 factOut(kWarn,501, str ) ;
798 goto EndBuf ; //--> skip Board
799 }
800
801 int iDx = evtIdx[evID] ; //index into evtCtrl
802
803 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
804 &rd[i].rBuf->S[0], head_len) ;
805 roi = mBuffer[evID].nRoi ;
806
807 src = head_len/2 ;
808 for ( px=0; px<9; px++ ) { //different sort in FAD board.....
809 for ( drs=0; drs<4; drs++ ) {
810 pixH= ntohs(rd[i].rBuf->S[src++]) ; // ID
811 pixC= ntohs(rd[i].rBuf->S[src++]) ; // start-cell
812 pixR= ntohs(rd[i].rBuf->S[src++]) ; // roi
813//here we should check if pixH is correct ....
814
815 pixS = boardId*36 + drs*9 + px ;
816 src++ ;
817
818 if ( ( px != 8 && pixR == roi )
819 || ( px == 8 && pixR >= roi ) ) { //we have a reasonable roi
820
821 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
822 dest= pixS * roi ;
823 memcpy(
824 &mBuffer[evID].fEvent->Adc_Data[dest],
825 &rd[i].rBuf->S[src], roi * 2) ;
826 src+= pixR ;
827
828 if ( px==8 ) {
829 if ( pixR > roi) { //and we have additional TM info
830 tmS =boardId*4 + drs ;
831 dest= tmS * roi + NPIX* roi ;
832 int srcT= src - roi ;
833 mBuffer[evID].fEvent->StartTM[tmS] = (pixC+pixR-roi)%1024 ;
834 memcpy(
835 &mBuffer[evID].fEvent->Adc_Data[dest],
836 &rd[i].rBuf->S[srcT], roi * 2) ;
837 } else {
838 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
839 }
840 }
841 } else {
842 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
843 gi.evtErr++ ;
844 factOut(kError,202, str ) ;
845 goto EndBuf ;
846 }
847 }
848 }// now we have stored a new board contents into Event structure
849
850 mBuffer[evID].board[ boardId ] = boardId ;
851 evtCtrl.evtStat[ iDx ]++ ;
852 evtCtrl.pcTime[ iDx ] = g_actTime ;
853
854 if (++mBuffer[evID].nBoard >= actBoards ) {
855 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
856 factOut(kDebug,-1, str ) ;
857
858 if (mBuffer[evID].runNum != actrun ) { // have we already reported first event of this run ???
859 actrun = mBuffer[i].runNum ;
860 int ir ;
861 for ( ir=0; ir<MAX_RUN; ir++) {
862 if ( runCtrl[ir].runId == actrun) {
863 if ( runCtrl[ir].lastEvt <0 ) {
864 runCtrl[ir].lastEvt = +1 ;
865 gotNewRun( actrun, mBuffer[evID].FADhead );
866 snprintf(str,MXSTR,"gotNewRun %d (ev %d)",mBuffer[evID].runNum,mBuffer[evID].evNum);
867 factOut(kInfo,1, str ) ;
868 break ;
869 }
870 }
871 }
872 }
873
874 //complete event read ---> flag for next processing
875 evtCtrl.evtStat[ iDx ] = 99;
876 gi.evtTot++ ;
877 }
878
879EndBuf:
880 rd[i].bufTyp = 0 ; //ready to read next header
881 rd[i].bufLen = frst_len ;
882 rd[i].bufPos = 0 ;
883EndBuf1:
884 ;
885 }
886
887 } else { //we are reading event header
888 rd[i].bufPos += jrd ;
889 rd[i].bufLen -= jrd ;
890 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
891 //check if startflag correct; else shift block ....
892 for (k=0; k<rd[i].bufPos -1 ; k++) {
893 if (rd[i].rBuf->B[k ] == start.B[1]
894 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
895 }
896 rd[i].skip += k ;
897
898 if (k >= rd[i].bufPos-1 ) { //no start of header found
899 rd[i].bufPos = 0 ;
900 rd[i].bufLen = head_len ;
901 } else if ( k>0 ) {
902 rd[i].bufPos -= k ;
903 rd[i].bufLen += k ;
904 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
905 memcpy(&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], rd[i].bufPos ) ;
906 }
907 if ( rd[i].bufPos >= minLen ) {
908 if ( rd[i].skip >0 ) {
909 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
910 factOut(kInfo,666, str ) ;
911 rd[i].skip = 0 ;
912 }
913 goodhed++;
914 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
915 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
916 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
917 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
918 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
919 rd[i].bufTyp = 1 ; //ready to read full record
920 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
921
922 int fadboard = ntohs(rd[i].rBuf->S[12] ) ;
923 int fadcrate = fadboard/256 ;
924 fadboard = (fadcrate*10 + fadboard%256) ;
925snprintf(str,MXSTR,"sk %3d head: %5d %5d %5d %10d %4d %6d",i,rd[i].fadLen,rd[i].evtID,rd[i].ftmID,rd[i].runID,fadboard,jrd) ;
926factOut(kDebug,1, str ) ;
927
928 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
929
930 if (rd[i].bufLen <=head_len || rd[i].bufLen > MAX_LEN ) {
931 snprintf(str,MXSTR,"illegal event-length on port %d",i) ;
932 factOut(kFatal,881, str ) ;
933 rd[i].bufLen = 100000 ; //?
934 }
935 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
936 debugHead(i,fadBoard,rd[i].rBuf);
937 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
938 } else {
939 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
940 }
941 } else {
942 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
943 }
944
945 } //end interpreting last read
946 } //end of successful read anything
947 } //finished trying to read all sockets
948
949snprintf(str,MXSTR,"Loop ---- %3d --- %8d",numokx,jrdx);
950factOut(kDebug,-1, str ) ;
951 gi.numRead[ numok ] ++ ;
952
953 g_actTime = time(NULL) ;
954 if ( g_actTime > gi_SecTime ) {
955 gi_SecTime = g_actTime ;
956
957
958 //loop over all active events and flag those older than read-timeout
959 //delete those that are written to disk ....
960
961 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
962 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
963
964 gj.bufNew = gj.bufEvt = 0 ;
965 int k1=evtCtrl.frstPtr;
966 for ( k=k1; k<(k1+kd); k++ ) {
967 int k0 = k % (MAX_EVT*MAX_RUN) ;
968//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
969
970 if (evtCtrl.evtStat[k0] > 0
971 && evtCtrl.evtStat[k0] < 92 ) {
972 gj.bufNew++ ; //incomplete event in Buffer
973 if ( evtCtrl.evtStat[k0] < 90
974 && evtCtrl.pcTime[k0] < g_actTime-10 ) {
975 int id =evtCtrl.evtBuf[k0] ;
976 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
977 factOut(kWarn,601, str ) ;
978 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
979 gi.evtSkp++ ;
980 gi.evtTot++ ;
981 gj.evtSkip++;
982 }
983 } else if (evtCtrl.evtStat[k0] >= 900 //'delete'
984 || evtCtrl.evtStat[k0] == 0 ) { //'useless'
985
986 int id =evtCtrl.evtBuf[k0] ;
987 snprintf(str,MXSTR,"%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
988 factOut(kDebug,-1, str ) ;
989 mBufFree(id) ; //event written--> free memory
990 evtCtrl.evtStat[k0] = -1;
991 gj.evtWrite++ ;
992 gj.rateWrite++ ;
993 } else if (evtCtrl.evtStat[k0] >= 95 ) {
994 gj.bufEvt++ ; //complete event in Buffer
995 }
996
997 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
998 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
999 }
1000 }
1001
1002
1003 gj.deltaT = 1000 ; //temporary, must be improved
1004
1005 int b;
1006 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ;
1007 gj.totMem = g_maxMem ;
1008 if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ;
1009
1010 factStat(gj);
1011 factStatNew(gi) ;
1012
1013 gj.rateNew = gj.rateWrite = 0 ;
1014 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ;
1015 }
1016
1017 if (numok > 0 ) numok2=0;
1018 else if (numok2++ > 3) {
1019 if (g_runStat == 1) {
1020 xwait.tv_sec = 1;
1021 xwait.tv_nsec= 0 ; // hibernate for 1 sec
1022 } else {
1023 xwait.tv_sec = 0;
1024 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1025 }
1026 nanosleep( &xwait , NULL ) ;
1027 }
1028
1029 } //and do next loop over all sockets ...
1030
1031
1032 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset);
1033 factOut(kInfo,-1, str ) ;
1034
1035 if (g_reset >0 ) {
1036 gi_reset = g_reset ;
1037 gi_resetR = gi_reset%10 ; //shall we stop reading ?
1038 gi_resetS = (gi_reset/10)%10 ; //shall we close sockets ?
1039 gi_resetW = (gi_reset/100)%10 ; //shall we close files ?
1040 gi_resetX = gi_reset/1000 ; //shall we simply wait resetX seconds ?
1041 g_reset= 0;
1042 } else {
1043 gi_reset = 0;
1044 if ( g_runStat== -1 ) gi_resetR = 1 ;
1045 else gi_resetR = 7 ;
1046 gi_resetS = 7 ; //close all sockets
1047 gi_resetW = 7 ; //close all files
1048 gi_resetX = 0 ;
1049
1050 //inform others we have to quit ....
1051 gi_runStat = -11 ; //inform all that no update to happen any more
1052 gj.readStat= -11 ; //inform all that no update to happen any more
1053 }
1054
1055 if (gi_resetS > 0) {
1056 //must close all open sockets ...
1057 snprintf(str,MXSTR,"close all sockets ...");
1058 factOut(kInfo,-1, str ) ;
1059 for (i=0; i<MAX_SOCK; i++) {
1060 if (rd[i].sockStat ==0 ) {
1061 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy open socket
1062 if ( i%7 == 0 ) {
1063 gi_NumConnect[ i/7 ]-- ;
1064 gi.numConn[ i/7 ]-- ;
1065 gj.numConn[ i/7 ]-- ;
1066 sockDef[i/7] = 0 ; //flag ro recreate the sockets ...
1067 rd[i/7].sockStat = -1; //and try to open asap
1068 }
1069 }
1070 }
1071 }
1072
1073
1074 if (gi_resetR > 0) {
1075 //flag all events as 'read finished'
1076 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1077 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1078
1079 int k1=evtCtrl.frstPtr;
1080 for ( k=k1; k<(k1+kd); k++ ) {
1081 int k0 = k % (MAX_EVT*MAX_RUN) ;
1082 if (evtCtrl.evtStat[k0] > 0
1083 && evtCtrl.evtStat[k0] < 90 ) {
1084 evtCtrl.evtStat[k0] = 91 ;
1085 gi.evtSkp++ ;
1086 gi.evtTot++ ;
1087 }
1088 }
1089
1090 xwait.tv_sec = 0;
1091 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1092 nanosleep( &xwait , NULL ) ;
1093
1094 //and clear all buffers (might have to wait until all others are done)
1095 int minclear ;
1096 if (gi_resetR == 1) {
1097 minclear = 900 ;
1098 snprintf(str,MXSTR,"drain all buffers ...");
1099 } else {
1100 minclear = 0 ;
1101 snprintf(str,MXSTR,"flush all buffers ...");
1102 }
1103 factOut(kInfo,-1, str ) ;
1104
1105 int numclear=1 ;
1106 while (numclear > 0 ) {
1107 numclear = 0 ;
1108 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1109 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1110
1111 int k1=evtCtrl.frstPtr;
1112 for ( k=k1; k<(k1+kd); k++ ) {
1113 int k0 = k % (MAX_EVT*MAX_RUN) ;
1114 if (evtCtrl.evtStat[k0] > minclear ) {
1115 int id =evtCtrl.evtBuf[k0] ;
1116 snprintf(str,MXSTR,"ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
1117 factOut(kDebug,-1, str ) ;
1118 mBufFree(id) ; //event written--> free memory
1119 evtCtrl.evtStat[k0] = -1;
1120 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
1121
1122 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
1123 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
1124 }
1125
1126 xwait.tv_sec = 0;
1127 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1128 nanosleep( &xwait , NULL ) ;
1129 }
1130 }
1131
1132 if (gi_reset > 0) {
1133 if (gi_resetW > 0) {
1134 CloseRunFile(0,0,0) ; //ask all Runs to be closed
1135 }
1136 if (gi_resetX > 0) {
1137 xwait.tv_sec = gi_resetX;
1138 xwait.tv_nsec= 0 ;
1139 nanosleep( &xwait , NULL ) ;
1140 }
1141
1142 snprintf(str,MXSTR,"Continue read Process ...");
1143 factOut(kInfo,-1, str ) ;
1144 gi_reset = 0 ;
1145 goto START ;
1146 }
1147
1148
1149
1150 snprintf(str,MXSTR,"Exit read Process ...");
1151 factOut(kInfo,-1, str ) ;
1152 gi_runStat = -99 ;
1153 gj.readStat= -99 ;
1154 factStat(gj);
1155 factStatNew(gi) ;
1156 return 0;
1157
1158} /*-----------------------------------------------------------------*/
1159
1160
1161void *procEvt( void *ptr ) {
1162/* *** main loop processing file, including SW-trigger */
1163 int numProc, numWait ;
1164 int k ;
1165 struct timespec xwait ;
1166 char str[MXSTR] ;
1167
1168 cpu_set_t mask;
1169 int cpu = 5 ; //process thread (will be several in final version)
1170
1171 snprintf(str,MXSTR,"Starting process-thread");
1172 factOut(kInfo,-1, str ) ;
1173
1174/* CPU_ZERO initializes all the bits in the mask to zero. */
1175 CPU_ZERO( &mask );
1176/* CPU_SET sets only the bit corresponding to cpu. */
1177 CPU_SET( cpu, &mask );
1178/* sched_setaffinity returns 0 in success */
1179 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1180 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1181 factOut(kWarn,-1, str ) ;
1182 }
1183
1184
1185 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1186
1187 numWait = numProc = 0 ;
1188 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1189 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1190
1191 int k1=evtCtrl.frstPtr;
1192 for ( k=k1; k<(k1+kd); k++ ) {
1193 int k0 = k % (MAX_EVT*MAX_RUN) ;
1194//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1195 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1196
1197 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap
1198 evtCtrl.evtStat[k0] = 991 ;
1199 } else {
1200
1201 int id = evtCtrl.evtBuf[k0] ;
1202 int ievt = mBuffer[id].evNum ;
1203 int roi = mBuffer[id].nRoi ;
1204// uint32_t irun = mBuffer[id].runNum ;
1205//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1206//factOut(kDebug,-1, str ) ;
1207
1208//make sure unused pixels/tmarks are cleared to zero
1209 int ip,it,dest,ib;
1210 for (ip=0; ip<NPIX; ip++) {
1211 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1212 dest= ip*roi ;
1213 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1214 }
1215 }
1216 for (it=0; it<NTMARK; it++) {
1217 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1218 dest= it*roi + NPIX*roi ;
1219 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1220 }
1221 }
1222
1223
1224//and set correct event header ; also check for consistency in event (not yet)
1225 mBuffer[id].fEvent->Roi = roi ;
1226 mBuffer[id].fEvent->EventNum = ievt ;
1227 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
1228 mBuffer[id].fEvent->SoftTrig = 0 ;
1229 for (ib=0; ib<NBOARDS; ib++) {
1230 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1231 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1232 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1233 } else {
1234 mBuffer[id].fEvent->BoardTime[ib] =
1235 ntohl(mBuffer[id].FADhead[ib].time) ;
1236 }
1237 }
1238
1239 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1240// gj.procEvt++ ;
1241 gi.procTot++ ;
1242 numProc++ ;
1243
1244 if (i<0) {
1245 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1246 gi.procErr++ ;
1247 } else {
1248 evtCtrl.evtStat[k0] = 520 ;
1249 }
1250 }
1251 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1252 numWait++ ;
1253 }
1254 }
1255
1256 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1257 snprintf(str,MXSTR,"Exit Processing Process ...");
1258 factOut(kInfo,-1, str ) ;
1259 gp_runStat = -22 ; //==> we should exit
1260 gj.procStat= -22 ; //==> we should exit
1261 return 0 ;
1262 }
1263
1264 if (numProc == 0) {
1265 //seems we have nothing to do, so sleep a little
1266 xwait.tv_sec = 0;
1267 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1268 nanosleep( &xwait , NULL ) ;
1269 }
1270 gp_runStat = gi_runStat ;
1271 gj.procStat= gj.readStat ;
1272
1273 }
1274
1275 //we are asked to abort asap ==> must flag all remaining events
1276 // when gi_runStat claims that all events are in the buffer...
1277
1278 snprintf(str,MXSTR,"Abort Processing Process ...");
1279 factOut(kInfo,-1, str ) ;
1280 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1281 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1282
1283 int k1=evtCtrl.frstPtr;
1284 for ( k=k1; k<(k1+kd); k++ ) {
1285 int k0 = k % (MAX_EVT*MAX_RUN) ;
1286 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1287 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1288 }
1289 }
1290
1291 gp_runStat = -99 ;
1292 gj.procStat= -99 ;
1293
1294 return 0;
1295
1296} /*-----------------------------------------------------------------*/
1297
1298int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1299/* close run runId (all all runs if runId=0) */
1300/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1301 int j ;
1302
1303
1304 if ( runId == 0 ) {
1305 for ( j=0; j<MAX_RUN; j++) {
1306 if ( runCtrl[j].fileId == 0 ) { //run is open
1307 runCtrl[j].closeTime = closeTime ;
1308 runCtrl[j].maxEvt = maxEvt ;
1309 }
1310 }
1311 return 0;
1312 }
1313
1314 for ( j=0; j<MAX_RUN; j++) {
1315 if ( runCtrl[j].runId == runId ) {
1316 if ( runCtrl[j].fileId == 0 ) { //run is open
1317 runCtrl[j].closeTime = closeTime ;
1318 runCtrl[j].maxEvt = maxEvt ;
1319 return 0;
1320 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1321 runCtrl[j].closeTime = closeTime ;
1322 runCtrl[j].maxEvt = maxEvt ;
1323 return +1;
1324 } else { // run already closed
1325 return +2;
1326 }
1327 }
1328 } //we only reach here if the run was never created
1329 return -1;
1330
1331} /*-----------------------------------------------------------------*/
1332
1333
1334void *writeEvt( void *ptr ) {
1335/* *** main loop writing event (including opening and closing run-files */
1336
1337 int numWrite, numWait ;
1338 int k,j,i ;
1339 struct timespec xwait ;
1340 char str[MXSTR] ;
1341
1342 cpu_set_t mask;
1343 int cpu = 3 ; //write thread
1344
1345 snprintf(str,MXSTR,"Starting write-thread");
1346 factOut(kInfo,-1, str ) ;
1347
1348/* CPU_ZERO initializes all the bits in the mask to zero. */
1349 CPU_ZERO( &mask );
1350/* CPU_SET sets only the bit corresponding to cpu. */
1351 CPU_SET( cpu, &mask );
1352/* sched_setaffinity returns 0 in success */
1353 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1354 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1355 }
1356
1357 int lastRun = 0 ; //usually run from last event still valid
1358
1359 while (g_runStat >-2) {
1360
1361 numWait = numWrite = 0 ;
1362 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1363 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1364
1365 int k1=evtCtrl.frstPtr;
1366 for ( k=k1; k<(k1+kd); k++ ) {
1367 int k0 = k % (MAX_EVT*MAX_RUN) ;
1368//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1369 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1370
1371 if ( gi_resetR > 1 ) { //we must drain the buffer asap
1372 evtCtrl.evtStat[k0] = 904 ;
1373 } else {
1374
1375
1376 int id = evtCtrl.evtBuf[k0] ;
1377 uint32_t irun = mBuffer[id].runNum ;
1378 int ievt = mBuffer[id].evNum ;
1379
1380 gi.wrtTot++ ;
1381 if (runCtrl[lastRun].runId == irun ) {
1382 j = lastRun ;
1383 } else {
1384 //check which fileID to use (or open if needed)
1385 for ( j=0; j<MAX_RUN; j++) {
1386 if ( runCtrl[j].runId == irun ) break ;
1387 }
1388 if ( j >= MAX_RUN ) {
1389 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1390 factOut(kFatal,901, str ) ;
1391 gi.wrtErr++ ;
1392for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1393exit(111);
1394 }
1395 lastRun = j ;
1396 }
1397
1398 if (runCtrl[j].fileId < 0 ) {
1399 actRun.Version = 1 ;
1400 actRun.RunType = -1 ;
1401
1402 actRun.RunTime = runCtrl[j].firstTime ;
1403 actRun.RunUsec = runCtrl[j].firstTime ;
1404 actRun.NBoard = NBOARDS ;
1405 actRun.NPix = NPIX ;
1406 actRun.NTm = NTMARK ;
1407 actRun.Nroi = mBuffer[id].nRoi ;
1408 memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS* sizeof(PEVNT_HEADER) ) ;
1409
1410 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1411 if (runCtrl[j].fileHd == NULL ) {
1412 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1413 factOut(kError,502, str ) ;
1414 runCtrl[j].fileId = 91 ;
1415 } else {
1416 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1417 factOut(kInfo,-1, str ) ;
1418 runCtrl[j].fileId = 0 ;
1419 }
1420
1421 }
1422
1423 if (runCtrl[j].fileId != 0 ) {
1424 if (runCtrl[j].fileId < 0 ) {
1425 snprintf(str,MXSTR,"W never opened file for this run %d",irun) ;
1426 factOut(kError,123,str) ;
1427 } else if (runCtrl[j].fileId < 100 ) {
1428 snprintf(str,MXSTR,"W.file for this run is closed %d",irun) ;
1429 factOut(kWarn,123,str) ;
1430 runCtrl[j].fileId += 100 ;
1431 } else {
1432 snprintf(str,MXSTR,"W:file for this run is closed %d",irun) ;
1433 factOut(kDebug,123,str) ;
1434 }
1435 evtCtrl.evtStat[k0] = 903 ;
1436 gi.wrtErr++ ;
1437 } else {
1438 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1439 if ( i>=0 ) {
1440 runCtrl[j].lastTime = g_actTime;
1441 runCtrl[j].actEvt++ ;
1442 evtCtrl.evtStat[k0] = 901 ;
1443 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1444 factOut(kDebug,504, str ) ;
1445// gj.writEvt++ ;
1446 } else {
1447 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1448 factOut(kError,503, str ) ;
1449 evtCtrl.evtStat[k0] = 902 ;
1450 gi.wrtErr++ ;
1451 }
1452
1453 if ( i < 0
1454 || runCtrl[j].lastTime < g_actTime-300
1455 || runCtrl[j].closeTime < g_actTime
1456 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1457int ii =0 ;
1458if ( i < 0 ) ii=1 ;
1459else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1460else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1461else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1462
1463
1464
1465 //close run for whatever reason
1466 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1467 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1468 if (i<0) {
1469 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1470 factOut(kError,503, str ) ;
1471 runCtrl[j].fileId = 92 ;
1472 } else {
1473 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1474 factOut(kInfo,503, str ) ;
1475 runCtrl[j].fileId = 93 ;
1476 }
1477 }
1478 }
1479 }
1480 } else if (evtCtrl.evtStat[k0] > 0
1481 && evtCtrl.evtStat[k0] < 900 ) numWait++ ;
1482 }
1483
1484 //check if we should close a run (mainly when no event pending)
1485 for ( j=0; j<MAX_RUN; j++) {
1486 if ( runCtrl[j].fileId==0
1487 && ( runCtrl[j].closeTime < g_actTime
1488 ||runCtrl[j].lastTime < g_actTime-300
1489 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1490 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1491int ii =0 ;
1492 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1493else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1494else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1495
1496
1497 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1498 if (i<0) {
1499 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1500 factOut(kError,506, str ) ;
1501 runCtrl[j].fileId = 94 ;
1502 } else {
1503 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1504 factOut(kInfo,507, str ) ;
1505 runCtrl[j].fileId = 95 ;
1506 }
1507 }
1508 }
1509
1510 if (numWrite == 0) {
1511 //seems we have nothing to do, so sleep a little
1512 xwait.tv_sec = 0;
1513 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1514 nanosleep( &xwait , NULL ) ;
1515 }
1516
1517 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1518 snprintf(str,MXSTR,"Finish Write Process ...");
1519 factOut(kInfo,-1, str ) ;
1520 gw_runStat = -22 ; //==> we should exit
1521 gj.writStat= -22 ; //==> we should exit
1522 goto closerun ;
1523 }
1524 gw_runStat = gi_runStat ;
1525 gj.writStat= gj.readStat ;
1526
1527 }
1528
1529 //must close all open files ....
1530 snprintf(str,MXSTR,"Abort Writing Process ...");
1531 factOut(kInfo,-1, str ) ;
1532
1533closerun:
1534 snprintf(str,MXSTR,"Close all open files ...");
1535 factOut(kInfo,-1, str ) ;
1536 for ( j=0; j<MAX_RUN; j++)
1537 if ( runCtrl[j].fileId ==0 ) {
1538 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1539 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1540int ii =0 ;
1541 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1542else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1543else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1544 if (i<0) {
1545 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1546 factOut(kError,506, str ) ;
1547 runCtrl[j].fileId = 96 ;
1548 } else {
1549 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1550 factOut(kInfo,507, str ) ;
1551 runCtrl[j].fileId = 97 ;
1552 }
1553 }
1554
1555 gw_runStat = -99;
1556 gj.writStat= -99;
1557 snprintf(str,MXSTR,"Exit Writing Process ...");
1558 factOut(kInfo,-1, str ) ;
1559 return 0;
1560
1561
1562
1563
1564} /*-----------------------------------------------------------------*/
1565
1566
1567
1568
1569void StartEvtBuild() {
1570
1571 int i,j,imax,status,th_ret[50] ;
1572 pthread_t thread[50] ;
1573 struct timespec xwait ;
1574
1575 gi_runStat = gp_runStat = gw_runStat = 0 ;
1576 gj.readStat= gj.procStat= gj.writStat= 0 ;
1577
1578 snprintf(str,MXSTR,"Starting EventBuilder V15.07 A");
1579 factOut(kInfo,-1, str ) ;
1580
1581//initialize run control logics
1582 for (i=0; i<MAX_RUN; i++) {
1583 runCtrl[i].runId = 0 ;
1584 runCtrl[i].fileId = -2 ;
1585 }
1586
1587//partially initialize event control logics
1588 evtCtrl.frstPtr = 0 ;
1589 evtCtrl.lastPtr = 0 ;
1590
1591//start all threads (more to come) when we are allowed to ....
1592 while (g_runStat == 0 ) {
1593 xwait.tv_sec = 0;
1594 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
1595 nanosleep( &xwait , NULL ) ;
1596 }
1597
1598 i=0 ;
1599 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1600 i++;
1601 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1602 i++;
1603 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1604 i++;
1605 imax=i ;
1606
1607
1608#ifdef BILAND
1609 xwait.tv_sec = 30;;
1610 xwait.tv_nsec= 0 ; // sleep for ~20sec
1611 nanosleep( &xwait , NULL ) ;
1612
1613 printf("close all runs in 2 seconds\n");
1614
1615 CloseRunFile( 0, time(NULL)+2, 0) ;
1616
1617 xwait.tv_sec = 1;;
1618 xwait.tv_nsec= 0 ; // sleep for ~20sec
1619 nanosleep( &xwait , NULL ) ;
1620
1621 printf("setting g_runstat to -1\n");
1622
1623 g_runStat = -1 ;
1624#endif
1625
1626
1627//wait for all threads to finish
1628 for (i=0; i<imax; i++) {
1629 j = pthread_join ( thread[i], (void **)&status) ;
1630 }
1631
1632} /*-----------------------------------------------------------------*/
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648 /*-----------------------------------------------------------------*/
1649 /*-----------------------------------------------------------------*/
1650 /*-----------------------------------------------------------------*/
1651 /*-----------------------------------------------------------------*/
1652 /*-----------------------------------------------------------------*/
1653
1654
1655#ifdef BILAND
1656
1657
1658 /*-----------------------------------------------------------------*/
1659 /*-----------------------------------------------------------------*/
1660 /*-----------------------------------------------------------------*/
1661 /*-----------------------------------------------------------------*/
1662 /*-----------------------------------------------------------------*/
1663
1664
1665
1666
1667FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1668{ return 1; } ;
1669
1670int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1671{ return 1; usleep(10000); return 1; }
1672
1673
1674//{ return 1; } ;
1675
1676int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1677{ return 1; } ;
1678
1679
1680
1681int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1682{
1683 int i=0;
1684
1685// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1686// for (i=0; i<NBOARDS; i++) {
1687// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1688// }
1689 return 0;
1690}
1691
1692
1693void factStatNew(EVT_STAT gi) {
1694 int i ;
1695
1696//for (i=0;i<MAX_SOCK;i++) {
1697// printf("%4d",gi.numRead[i]);
1698// if (i%20 == 0 ) printf("\n");
1699//}
1700}
1701
1702void gotNewRun( int runnr, PEVNT_HEADER *headers )
1703{ printf("got new run %d\n",runnr); return; }
1704
1705void factStat(GUI_STAT gj) {
1706// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1707// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1708}
1709
1710
/* Test stub (BILAND build only): debug hook of the read thread.
 * All arguments are ignored; nothing is printed. */
void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
   (void)isock ;  (void)ibyte ;
   (void)event ;  (void)ftmevt ;  (void)runnr ;
   (void)state ;
   (void)tsec ;   (void)tusec ;
   // former debug output:
   //   printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
1714
1715
1716
/* Test stub (BILAND build only): debug hook for the raw data stream of one
 * socket.  All arguments are ignored; the buffer is not touched. */
void debugStream(int isock, void *buf, int len) {
   (void)isock ;
   (void)buf ;
   (void)len ;
}
1719
/* Test stub (BILAND build only): debug hook for a received board header.
 * All arguments are ignored; the buffer is not touched. */
void debugHead(int i, int j, void *buf) {
   (void)i ;
   (void)j ;
   (void)buf ;
}
1722
1723
1724void factOut(int severity, int err, char* message ) {
1725static FILE * fd ;
1726static int file=0 ;
1727
1728 if (file==0) {
1729 printf("open file\n");
1730 fd=fopen("x.out","w+") ;
1731 file=999;
1732 }
1733
1734 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1735
1736 if (severity != kDebug)
1737 printf("%3d %3d | %s\n",severity,err,message) ;
1738}
1739
1740
1741
1742int main() {
1743 int i,b,c,p ;
1744 char ipStr[100] ;
1745 struct in_addr IPaddr ;
1746
1747 g_maxMem = 1024*1024 ; //MBytes
1748//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1749 g_maxMem = g_maxMem * 200; //100MBytes
1750
1751
1752 g_runStat = 40 ;
1753
1754 i=0 ;
1755
1756// version for standard crates
1757//for (c=0; c<4,c++) {
1758// for (b=0; b<10; b++) {
1759// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1760//
1761// inet_pton(PF_INET, ipStr, &IPaddr) ;
1762//
1763// g_port[i].sockAddr.sin_family = PF_INET;
1764// g_port[i].sockAddr.sin_port = htons(5000) ;
1765// g_port[i].sockAddr.sin_addr = IPaddr ;
1766// g_port[i].sockDef = 1 ;
1767// i++ ;
1768// }
1769//}
1770//
1771//version for PC-test *
1772 for (c=0; c<4; c++) {
1773 for (b=0; b<10; b++) {
1774 sprintf(ipStr,"10.0.%d.11",128+c) ;
1775 if (c<2) sprintf(ipStr,"10.0.%d.11",128) ;
1776 else sprintf(ipStr,"10.0.%d.11",131) ;
1777// if (c==0) sprintf(ipStr,"10.0.100.11") ;
1778
1779 inet_pton(PF_INET, ipStr, &IPaddr) ;
1780 p = 31919+100*c+10*b;
1781
1782
1783 g_port[i].sockAddr.sin_family = PF_INET;
1784 g_port[i].sockAddr.sin_port = htons(p) ;
1785 g_port[i].sockAddr.sin_addr = IPaddr ;
1786 g_port[i].sockDef = 1 ;
1787
1788 i++ ;
1789 }
1790 }
1791
1792
1793//g_port[17].sockDef =-1 ;
1794//g_actBoards-- ;
1795
1796 StartEvtBuild() ;
1797
1798 return 0;
1799
1800}
1801#endif
Note: See TracBrowser for help on using the repository browser.