source: trunk/FACT++/src/EventBuilder.c@ 11292

Last change on this file since 11292 was 11282, checked in by tbretz, 13 years ago
Mainly new statistics.
File size: 47.9 KB
Line 
1
2
3#define PX8 99 //simulator does not create double-length roi for pixel 8
4 //for real data, set PX8 = 8 ==> ask for double roi=TM
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
26enum Severity
27{
28 kMessage = 10, ///< Just a message, usually obsolete
29 kInfo = 20, ///< An info telling something which can be interesting to know
30 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
31 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
32 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
33 kDebug = 99, ///< A message used for debugging only
34};
35
36#define MIN_LEN 32 // min #bytes needed to interpret FADheader
37#define MAX_LEN 256*1024 // size of read-buffer per socket
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43
44extern void factStat(GUI_STAT gj) ;
45
46extern void factStatNew(EVT_STAT gi) ;
47
48extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
49
50extern void debugHead(int i, int j, void *buf);
51
52extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
53 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
54extern void debugStream(int isock, void *buf, int len) ;
55
56
57
58int g_actTime ;
59int g_runStat ;
60size_t g_maxMem ; //maximum memory allowed for buffer
61
62//no longer needed ...
63 int g_maxBoards ; //maximum number of boards to be initialized
64 int g_actBoards ;
65//
66
67FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
68
69
70 int gi_runStat ;
71 int gp_runStat ;
72 int gw_runStat ;
73 int32_t gi_myRun ;
74
75
76
77uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
78
79//uint gi_EvtStart= 0 ;
80//uint gi_EvtRead = 0 ;
81//uint gi_EvtBad = 0 ;
82//uint gi_EvtTot = 0 ;
83//size_t gi_usedMem = 0 ;
84
85//uint gw_EvtTot = 0 ;
86//uint gp_EvtTot = 0 ;
87
88PIX_MAP g_pixMap[NPIX] ;
89
90EVT_STAT gi ;
91GUI_STAT gj ;
92
93EVT_CTRL evtCtrl ; //control of events during processing
94int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
95
96WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
97
98
99
100
101RUN_HEAD actRun ;
102
103RUN_CTRL runCtrl[MAX_RUN] ;
104
105RUN_TAIL runTail[MAX_RUN] ;
106
107
108/*
109*** Definition of rdBuffer to read in IP packets; keep it global !!!!
110 */
111
112
113typedef union {
114 int8_t B[MAX_LEN/8];
115 int16_t S[MAX_LEN/4];
116 int32_t I[MAX_LEN/2];
117 int64_t L[MAX_LEN ];
118} CNV_FACT ;
119
120typedef struct {
121 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
122 int32_t bufPos ; //next byte to read to the buffer next
123 int32_t bufLen ; //number of bytes left to read
124 int32_t skip ; //number of bytes skipped before start of event
125
126 int sockStat ; //-1 if socket not yet connected , 99 if not exist
127 int socket ; //contains the sockets
128 struct sockaddr_in SockAddr ; //IP for each socket
129
130 int evtID ; // event ID of event currently read
131 int runID ; // run "
132 int ftmID ; // event ID from FTM
133 uint fadLen ; // FADlength of event currently read
134 int fadVers ; // Version of FAD
135 int board ; // boardID (softwareID: 0..40 )
136 int Port ;
137
138 CNV_FACT *rBuf ;
139
140} READ_STRUCT ;
141
142
143typedef union {
144 int8_t B[2];
145 int16_t S ;
146} SHORT_BYTE ;
147
148
149
150
151
152#define MXSTR 1000
153char str[MXSTR] ;
154
155SHORT_BYTE start, stop;
156
157READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
158
159
160
161/*-----------------------------------------------------------------*/
162
163
164/*-----------------------------------------------------------------*/
165
166
167
168int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
169/*
170*** generate Address, create sockets and allocates readbuffer for it
171***
172*** if flag==0 generate socket and buffer
173*** <0 destroy socket and buffer
174*** >0 close and redo socket
175***
176*** sid : board*7 + port id
177 */
178
179 int j ;
180
181 if (rd->sockStat ==0 ) { //close socket if open
182 j=close(rd->socket) ;
183 if (j>0) {
184 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
185 factOut(kFatal,771, str ) ;
186 } else {
187 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
188 factOut(kInfo,771, str ) ;
189 }
190 }
191
192
193 if (flag < 0) {
194 free(rd->rBuf) ; //and never open again
195 rd->rBuf = NULL ;
196 rd->sockStat = 99 ;
197 return 0 ;
198 }
199
200
201 if (flag == 0) { //generate address and buffer ...
202 rd->Port = port ;
203 rd->SockAddr.sin_family = sockAddr->sin_family;
204 rd->SockAddr.sin_port = htons(port) ;
205 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
206
207 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
208 if ( rd->rBuf == NULL ) {
209 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
210 factOut(kFatal,774, str ) ;
211 rd->sockStat = 77 ;
212 return -3 ;
213 }
214 }
215
216
217 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
218 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
219 factOut(kFatal,773, str ) ;
220 rd->sockStat = 88 ;
221 return -2 ;
222 }
223
224 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
225 factOut(kInfo,773, str ) ;
226 rd->sockStat = -1 ; //try to (re)open socket
227 return 0 ;
228
229} /*-----------------------------------------------------------------*/
230
231 /*-----------------------------------------------------------------*/
232
233
234
235
236int mBufInit() {
237// initialize mBuffer (mark all entries as unused\empty)
238
239 int i ;
240 uint32_t actime ;
241
242 actime = g_actTime + 50000000 ;
243
244 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
245 mBuffer[i].evNum = mBuffer[i].runNum = -1;
246
247 evtCtrl.evtBuf[ i] = -1 ;
248 evtCtrl.evtStat[ i] = -1 ;
249 evtCtrl.pcTime[ i] = actime ; //initiate to far future
250
251 }
252
253
254 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
255
256 evtCtrl.frstPtr = 0 ;
257 evtCtrl.lastPtr = 0 ;
258
259 return 0 ;
260
261} /*-----------------------------------------------------------------*/
262
263
264
265
266int mBufEvt(uint evID, uint runID, uint nRoi) {
267// generate a new Event into mBuffer:
268// make sure only complete Event are possible, so 'free' will always work
269// returns index into mBuffer[], or negative value in case of error
270
271
272 int i, k, evFree ;
273 int headmem=0 ;
274 size_t needmem = 0 ;
275
276 if (nRoi <=0 || nRoi > 1024) {
277 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
278 factOut(kError, 1, str ) ;
279 return -9999 ;
280 }
281
282 i = evID % MAX_EVT ;
283 evFree = -1 ;
284
285 for ( k=0; k<MAX_RUN; k++) {
286 if ( mBuffer[i].evNum == evID
287 && mBuffer[i].runNum== runID ) {
288 return i ;
289 }
290 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
291 i += MAX_EVT ;
292 }
293
294
295 //event does not yet exist; create
296 if (evFree < 0 ) { //no space available in ctrl
297 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
298 factOut(kError,881, str ) ;
299 return -1 ;
300 }
301 i = evFree ; //found free entry; use it ...
302
303
304 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
305
306 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
307
308 if ( gj.usdMem + needmem + headmem > g_maxMem) {
309 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
310 factOut(kError,882, str ) ;
311 return -11 ;
312 }
313
314 mBuffer[i].FADhead = malloc( headmem ) ;
315 if (mBuffer[i].FADhead == NULL) {
316 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
317 factOut(kError,882, str ) ;
318 return -12;
319 }
320
321 mBuffer[i].fEvent = malloc( needmem ) ;
322 if (mBuffer[i].fEvent == NULL) {
323 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
324 factOut(kError,882, str ) ;
325 free(mBuffer[i].fEvent) ;
326 mBuffer[i].fEvent = NULL ;
327 return -22;
328 }
329
330 //flag all boards as unused
331 mBuffer[i].nBoard = 0 ;
332 for (k=0; k<NBOARDS; k++ ) {
333 mBuffer[i].board[k] = -1;
334 }
335
336 //flag all pixels as unused
337 for (k=0; k<NPIX; k++ ) {
338 mBuffer[i].fEvent->StartPix[k] = -1 ;
339 }
340
341 //flag all TMark as unused
342 for (k=0; k<NTMARK; k++ ) {
343 mBuffer[i].fEvent->StartTM[k] = -1 ;
344 }
345
346 mBuffer[i].pcTime = g_actTime ;
347 mBuffer[i].nRoi = nRoi ;
348 mBuffer[i].evNum = evID ;
349 mBuffer[i].runNum = runID ;
350 mBuffer[i].evtLen = needmem ;
351
352 gj.usdMem += needmem + headmem;
353 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
354
355 //register event in 'active list (reading)'
356
357 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
358 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
359 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
360 evtIdx[i] = evtCtrl.lastPtr ;
361
362 gj.readEvt++ ; //#events read
363 gj.evtBuf++ ;
364
365
366snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
367factOut(kDebug,-11, str ) ;
368 evtCtrl.lastPtr++ ;
369 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
370
371
372 gi.evtGet++ ;
373
374 //check if runId already registered in runCtrl
375 evFree = -1 ;
376 for (k=0; k<MAX_RUN; k++) {
377 if (runCtrl[k].runId == runID ) return i ;//run exists already
378 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
379 }
380
381 if (evFree <0 ) {
382 snprintf(str,MXSTR,"not able to register the new run %d",runID);
383 factOut(kFatal,883, str ) ;
384 } else {
385 runCtrl[evFree].runId = runID ;
386 runCtrl[evFree].fileId = -2 ;
387 runCtrl[evFree].nextEvt= 0;
388 runCtrl[evFree].actEvt = 0;
389 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
390 runCtrl[evFree].lastTime=g_actTime ;
391 runCtrl[evFree].closeTime=g_actTime + 3600*24 ; //max time allowed
392 runCtrl[evFree].lastTime = 0 ;
393
394 runTail[evFree].nEventsOk =
395 runTail[evFree].nEventsRej =
396 runTail[evFree].nEventsBad =
397 runTail[evFree].PCtime0 =
398 runTail[evFree].PCtimeX = 0 ;
399 }
400
401 return i ;
402
403} /*-----------------------------------------------------------------*/
404
405
406
407
408int mBufFree(int i) {
409//delete entry [i] from mBuffer:
410//(and make sure multiple calls do no harm ....)
411
412 int headmem=0 ;
413 size_t freemem = 0 ;
414
415 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
416 freemem = mBuffer[i].evtLen ;
417 free(mBuffer[i].fEvent ) ;
418 mBuffer[i].fEvent = NULL ;
419
420 free(mBuffer[i].FADhead ) ;
421 mBuffer[i].FADhead = NULL ;
422
423 }
424 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
425 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
426
427 gj.usdMem = gj.usdMem - freemem - headmem;
428 gj.evtBuf-- ;
429
430 return 0 ;
431
432} /*-----------------------------------------------------------------*/
433
434
435void resetEvtStat() {
436 int i ;
437
438 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
439
440 for (i=0; i<NBOARDS; i++ ) {
441 gi.gotByte[i] = 0 ;
442 gi.gotErr[i] = 0 ;
443 }
444
445 gi.evtGet = 0 ; //#new Start of Events read
446 gi.evtTot = 0 ; //#complete Events read
447 gi.evtErr = 0 ; //#Events with Errors
448 gi.evtSkp = 0 ; //#Events incomplete (timeout)
449
450 gi.procTot = 0 ; //#Events processed
451 gi.procErr = 0 ; //#Events showed problem in processing
452 gi.procTrg = 0 ; //#Events accepted by SW trigger
453 gi.procSkp = 0 ; //#Events rejected by SW trigger
454
455 gi.feedTot = 0 ; //#Events used for feedBack system
456 gi.feedErr = 0 ; //#Events rejected by feedBack
457
458 gi.wrtTot = 0 ; //#Events written to disk
459 gi.wrtErr = 0 ; //#Events with write-error
460
461 gi.runOpen = 0 ; //#Runs opened
462 gi.runClose= 0 ; //#Runs closed
463 gi.runErr = 0 ; //#Runs with open/close errors
464
465return ;
466} /*-----------------------------------------------------------------*/
467
468
469
470void initReadFAD() {
471return ;
472} /*-----------------------------------------------------------------*/
473
474
475
476void *readFAD( void *ptr ) {
477/* *** main loop reading FAD data and sorting them to complete events */
478 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
479 int actBoards = 0, minLen ;
480 int32_t jrd ;
481 int64_t stat[9] ;
482 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
483 uint qtot = 0, qread = 0, qconn = 0 ;
484
485 int goodhed = 0 ;
486 int errcnt0 = 0 ;
487
488 struct timespec xwait ;
489
490
491 struct timeval *tv, atv;
492 tv=&atv;
493 uint32_t tsec, tusec ;
494
495
496 snprintf(str,MXSTR,"start initializing");
497 factOut(kInfo,-1, str ) ;
498
499 int cpu = 7 ; //read thread
500 cpu_set_t mask;
501
502/* CPU_ZERO initializes all the bits in the mask to zero. */
503 CPU_ZERO( &mask );
504/* CPU_SET sets only the bit corresponding to cpu. */
505 cpu = 7 ;
506 CPU_SET( cpu, &mask );
507
508/* sched_setaffinity returns 0 in success */
509 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
510 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
511 factOut(kWarn,-1, str ) ;
512 }
513
514
515 //make sure all sockets are preallocated as 'not exist'
516 for (i=0; i<MAX_SOCK; i++) {
517 rd[i].socket = -1 ;
518 rd[i].sockStat = 99 ;
519 }
520
521 int sockDef[NBOARDS]; //internal state of sockets
522 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
523
524
525 resetEvtStat();
526 gj.usdMem = gj.maxMem = 0 ;
527 gj.evtBuf = 0 ;
528
529 for (k=0; k<NBOARDS; k++) {
530 gi_NumConnect[k]=0;
531 gi.numConn[k] =0;
532 }
533
534
535 uint gi_SecTime ; //time in seconds
536 gi_SecTime= g_actTime ;
537
538 mBufInit() ; //initialize buffers
539
540 snprintf(str,MXSTR,"end initializing");
541 factOut(kInfo,-1, str ) ;
542
543
544
545 head_len = sizeof(PEVNT_HEADER) ;
546//frst_len = head_len + 36 * 12 ; //fad_header plus 36*pix_header
547 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
548
549//minLen = MIN_LEN ; //min #bytes needed to check header
550 minLen = head_len ; //min #bytes needed to check header: full header for debug
551
552 numok = numok2 = 0 ;
553
554 start.S=0xFB01;
555 stop.S= 0x04FE;
556
557 gi_myRun = g_actTime ;
558 gi_runStat = g_runStat ;
559 gj.readStat= g_runStat ;
560
561
562 while (g_runStat >=0) { //loop until global variable g_runStat claims stop
563
564 gi_runStat = g_runStat;
565 gj.readStat= g_runStat;
566 gj.totMem = g_maxMem ;
567// if (gi.totMem < 50000000 ) gi.totMem = 500000000 ;
568
569 int b,p,p0,s0,nch;
570 nch = 0 ;
571 for (b=0; b<NBOARDS; b++ ) {
572 k = b*7 ;
573 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
574 nch++ ;
575 gi_NumConnect[ b ] = 0 ; //must close all connections
576 gi.numConn[ b ] = 0;
577 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
578 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
579 else s0=+1 ; //sockets to be closed and reopened
580
581 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
582 else p0=0 ;
583
584 for (p=p0+1; p<p0+8; p++) {
585 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
586 k++ ;
587 }
588 sockDef[b] = g_port[b].sockDef ;
589 }
590 }
591
592 if (nch > 0 ) {
593 actBoards = 0 ;
594 for (b=0; b<NBOARDS; b++ ) {
595 if ( sockDef[b] > 0 ) actBoards++ ;
596 }
597 }
598
599
600
601
602 numok = 0 ; //count number of succesfull actions
603
604 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
605 b = i / 7 ;
606 if (sockDef[b] > 0) s0=+1 ;
607 else s0=-1 ;
608
609 gettimeofday( tv, NULL);
610 g_actTime = tsec = atv.tv_sec ;
611 tusec= atv.tv_usec ;
612
613 if (rd[i].sockStat <0 ) { //try to connect if not yet done
614 rd[i].sockStat=connect(rd[i].socket,
615 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
616 if (rd[i].sockStat ==0 ) { //successfull ==>
617 if (sockDef[b] > 0) {
618 rd[i].bufTyp = 0 ; // expect a header
619 rd[i].bufLen = frst_len ; // max size to read at begining
620 } else {
621 rd[i].bufTyp = -1 ; // full data to be skipped
622 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
623 }
624 rd[i].bufPos = 0 ; // no byte read so far
625 rd[i].skip = 0 ; // start empty
626 gi_NumConnect[ b ]++ ;
627 gi.numConn[ b ]++ ;
628 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
629 factOut(kInfo,-1, str ) ;
630 }
631 }
632
633 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
634 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
635 numok++ ;
636 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
637
638 if (jrd >0 ) {
639 qread+=jrd ;
640 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
641 }
642
643 if (jrd == 0) { //connection has closed ...
644 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
645 factOut(kInfo,441, str ) ;
646 GenSock(s0, i, 0,NULL, &rd[i]) ;
647 gi.gotErr[ b ]++ ;
648 gi_NumConnect[ b ]-- ;
649 gi.numConn[ b ]-- ;
650
651 } else if ( jrd<0 ) { //did not read anything
652 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
653 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
654 factOut(kError,442, str ) ;
655 gi.gotErr[ b ]++ ;
656 } else numok-- ; //else nothing waiting to be read
657 jrd = 0 ;
658 }
659 } else jrd = 0 ; //did read nothing as requested
660
661 gi.gotByte[ b ] += jrd ;
662
663 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
664// just do nothing
665
666 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
667 if ( jrd < rd[i].bufLen ) { //not yet all read
668 rd[i].bufPos += jrd ; //==> prepare for continuation
669 rd[i].bufLen -= jrd ;
670 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
671 } else { //full dataset read
672 rd[i].bufLen = 0 ;
673 rd[i].bufPos = rd[i].fadLen ;
674 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
675 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
676 gi.evtErr++ ;
677 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
678 factOut(kError,301, str ) ;
679 goto EndBuf ;
680
681 }
682 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
683
684 //we have a complete buffer, copy to WORK area
685 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
686 //get index into mBuffer for this event (create if needed)
687 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
688
689 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
690 if (evID < 0) {
691 xwait.tv_sec = 0;
692 xwait.tv_nsec= 20000000 ; // sleep for ~20 msec
693 nanosleep( &xwait , NULL ) ;
694 goto EndBuf1 ; //hope there is free space next round
695 }
696
697
698 //we have a valid entry in mBuffer[]; fill it
699
700 boardId = b ;
701 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
702 int fadCrate = fadBoard/256 ;
703 if (boardId != (fadCrate*10 + fadBoard%256) ) {
704 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
705 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
706 }
707 if ( mBuffer[evID].board[ boardId ] != -1) {
708 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
709 factOut(kWarn,501, str ) ;
710 goto EndBuf ; //--> skip Board
711 }
712
713 int iDx = evtIdx[evID] ; //index into evtCtrl
714
715 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
716 &rd[i].rBuf->S[0], head_len) ;
717 roi = mBuffer[evID].nRoi ;
718
719 pixS = boardId*36 -1 ; //
720 tmS = boardId*4 -1 ; //
721 src = head_len/2 ;
722 for ( drs=0; drs<4; drs++ ) {
723 for ( px=0; px<9; px++ ) {
724 pixH= ntohs(rd[i].rBuf->S[src++]) ;
725 pixC= ntohs(rd[i].rBuf->S[src++]) ;
726 pixR= ntohs(rd[i].rBuf->S[src++]) ;
727
728 src++ ;
729 pixS++ ; //pixS = pixH2S[pixH] ;
730 if ( ( px < PX8 && pixR == roi )
731 || ( px ==PX8 && pixR == 2*roi )
732 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
733 // correct roi
734 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
735 dest= pixS * roi ;
736 memcpy(
737 &mBuffer[evID].fEvent->Adc_Data[dest],
738 &rd[i].rBuf->S[src], roi * 2) ;
739 src+= roi ;
740 if ( px==PX8 ) {
741 tmS++; // tmS = tmH2S[pixH]
742 dest= tmS * roi + NPIX* roi ;
743 if ( roi <=512 ) {
744 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
745 memcpy(
746 &mBuffer[evID].fEvent->Adc_Data[dest],
747 &rd[i].rBuf->S[src], roi * 2) ;
748 src+=roi ;
749 } else {
750 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
751 }
752 }
753 } else {
754 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
755 gi.evtErr++ ;
756 factOut(kError,202, str ) ;
757 goto EndBuf ;
758 }
759 }
760 }// now we have stored a new board contents into Event structure
761
762 mBuffer[evID].board[ boardId ] = boardId ;
763 evtCtrl.evtStat[ iDx ]++ ;
764 evtCtrl.pcTime[ iDx ] = g_actTime ;
765
766 if (++mBuffer[evID].nBoard >= actBoards ) {
767 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
768 factOut(kDebug,-1, str ) ;
769 //complete event read ---> flag for next processing
770 evtCtrl.evtStat[ iDx ] = 99;
771 gi.evtTot++ ;
772 }
773
774EndBuf:
775 rd[i].bufTyp = 0 ; //ready to read next header
776 rd[i].bufLen = frst_len ;
777 rd[i].bufPos = 0 ;
778EndBuf1:
779 ;
780 }
781
782 } else { //we are reading event header
783 rd[i].bufPos += jrd ;
784 rd[i].bufLen -= jrd ;
785 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
786 //check if startflag correct; else shift block ....
787 for (k=0; k<rd[i].bufPos -1 ; k++) {
788 if (rd[i].rBuf->B[k ] == start.B[1]
789 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
790 }
791 rd[i].skip += k ;
792
793 if (k >= rd[i].bufPos-1 ) { //no start of header found
794// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
795// factOut(kWarn,666, str ) ;
796
797 rd[i].bufPos = 0 ;
798 rd[i].bufLen = head_len ;
799 } else if ( k>0 ) {
800 rd[i].bufPos -= k ;
801 rd[i].bufLen += k ;
802 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
803 }
804 if ( rd[i].bufPos >= minLen ) {
805 if ( rd[i].skip >0 ) {
806 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
807 factOut(kInfo,666, str ) ;
808 rd[i].skip = 0 ;
809 }
810 goodhed++;
811 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
812 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
813 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
814 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
815 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
816 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
817 rd[i].bufTyp = 1 ; //ready to read full record
818 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
819 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
820 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
821 debugHead(i,fadBoard,rd[i].rBuf);
822 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
823 } else {
824 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
825 }
826 } else {
827 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
828 }
829
830 } //end interpreting last read
831 } //end of successful read anything
832 } //finished trying to read all sockets
833
834 gi.numRead[ numok ] ++ ;
835
836 int qwait=0, qdel=0, qskip=0 ;
837 g_actTime = time(NULL) ;
838 if ( g_actTime > gi_SecTime ) {
839 gi_SecTime = g_actTime ;
840
841
842 //loop over all active events and flag those older than read-timeout
843 //delete those that are written to disk ....
844
845 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
846 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
847
848 int k1=evtCtrl.frstPtr;
849 for ( k=k1; k<(k1+kd); k++ ) {
850 int k0 = k % (MAX_EVT*MAX_RUN) ;
851//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
852 if (evtCtrl.evtStat[k0] > 0
853 && evtCtrl.evtStat[k0] < 90 ) {
854
855 qwait++;
856
857 if( evtCtrl.pcTime[k0] < g_actTime-10 ) {
858 int id =evtCtrl.evtBuf[k0] ;
859 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
860 factOut(kWarn,601, str ) ;
861 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
862 gi.evtSkp++ ;
863 gi.evtTot++ ;
864 gj.skipEvt++;
865 qskip++;
866 }
867
868
869 } else if (evtCtrl.evtStat[k0] >= 900 ) {
870
871 int id =evtCtrl.evtBuf[k0] ;
872 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
873 factOut(kDebug,-1, str ) ;
874 mBufFree(id) ; //event written--> free memory
875 evtCtrl.evtStat[k0] = -1;
876 qdel++;
877 qtot++;
878 }
879
880 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
881 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
882 }
883 }
884
885qconn=0 ;
886int ib ;
887for (ib=0; ib<NBOARDS; ib++) qconn+=gi_NumConnect[ib] ;
888 stat[0]= qwait;
889 stat[1]= qskip;
890 stat[2]= qdel ;
891 stat[3]= qtot ;
892 stat[4]= gj.usdMem ;
893 stat[5]= qread;
894 stat[6]= qconn;
895
896qread=0 ;
897 gj.deltaT = 1000 ; //temporary, must be improved
898
899 factStat(gj);
900 factStatNew(gi) ;
901
902 gj.readEvt = gj.procEvt = gj.writEvt = gj.skipEvt = 0 ;
903 }
904
905
906
907
908 if (numok > 0 ) numok2=0;
909 else if (numok2++ > 3) {
910 if (g_runStat == 1) {
911 xwait.tv_sec = 1;
912 xwait.tv_nsec= 0 ; // hibernate for 1 sec
913 } else {
914 xwait.tv_sec = 0;
915 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
916 }
917 nanosleep( &xwait , NULL ) ;
918 }
919
920 } //and do next loop over all sockets ...
921
922 //must quit eventbuilding
923 snprintf(str,MXSTR,"stop reading ...");
924 factOut(kInfo,-1, str ) ;
925
926 //flag all events as 'read finished'
927 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
928 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
929
930 int k1=evtCtrl.frstPtr;
931
932 for ( k=k1; k<(k1+kd); k++ ) {
933 int k0 = k % (MAX_EVT*MAX_RUN) ;
934 if (evtCtrl.evtStat[k0] > 0
935 && evtCtrl.evtStat[k0] < 90 ) {
936 evtCtrl.evtStat[k0] = 91 ;
937 gi.evtSkp++ ;
938 gi.evtTot++ ;
939 }
940 }
941
942 //must close all open sockets ...
943 snprintf(str,MXSTR,"close all sockets ...");
944 factOut(kInfo,-1, str ) ;
945 for (i=0; i<MAX_SOCK; i++) {
946 if (rd[i].sockStat ==0 ) {
947 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
948 gi_NumConnect[ i/7 ]-- ;
949 gi.numConn[ i/7 ]-- ;
950 }
951 }
952
953 xwait.tv_sec = 0;
954 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
955 nanosleep( &xwait , NULL ) ;
956 gi_runStat = -11 ; //inform all that no update to happen any more
957 gj.readStat= -11 ; //inform all that no update to happen any more
958
959
960 int minclear = 900 ; //usually wait until writing finished (stat 900)
961 if (g_runStat <-1 ) minclear = 0 ; //in case of abort clear all
962
963
964 //and clear all buffers (might have to wait until all others are done)
965 snprintf(str,MXSTR,"clear all buffers ...");
966 factOut(kInfo,-1, str ) ;
967 int numclear=1 ;
968 while (numclear > 0 ) {
969 numclear = 0 ;
970 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
971 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
972
973 int k1=evtCtrl.frstPtr;
974 for ( k=k1; k<(k1+kd); k++ ) {
975 int k0 = k % (MAX_EVT*MAX_RUN) ;
976 if (evtCtrl.evtStat[k0] > minclear ) {
977 int id =evtCtrl.evtBuf[k0] ;
978 mBufFree(id) ; //event written--> free memory
979 evtCtrl.evtStat[k0] = -1;
980 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
981
982 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
983 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
984 }
985
986 xwait.tv_sec = 0;
987 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
988 nanosleep( &xwait , NULL ) ;
989 }
990
991 snprintf(str,MXSTR,"Exit read Process ...");
992 factOut(kInfo,-1, str ) ;
993 gi_runStat = -99 ;
994 gj.readStat= -99 ;
995 factStat(gj);
996 factStatNew(gi) ;
997 return 0;
998
999} /*-----------------------------------------------------------------*/
1000
1001
1002void *procEvt( void *ptr ) {
1003/* *** main loop processing file, including SW-trigger */
1004 int numProc, numWait ;
1005 int k ;
1006 struct timespec xwait ;
1007 char str[MXSTR] ;
1008
1009 cpu_set_t mask;
1010 int cpu = 5 ; //process thread (will be several in final version)
1011
1012 snprintf(str,MXSTR,"Starting process-thread");
1013 factOut(kInfo,-1, str ) ;
1014
1015/* CPU_ZERO initializes all the bits in the mask to zero. */
1016 CPU_ZERO( &mask );
1017/* CPU_SET sets only the bit corresponding to cpu. */
1018 CPU_SET( cpu, &mask );
1019/* sched_setaffinity returns 0 in success */
1020 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1021 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1022 factOut(kWarn,-1, str ) ;
1023 }
1024
1025
1026 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1027
1028 numWait = numProc = 0 ;
1029 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1030 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1031
1032 int k1=evtCtrl.frstPtr;
1033 for ( k=k1; k<(k1+kd); k++ ) {
1034 int k0 = k % (MAX_EVT*MAX_RUN) ;
1035//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1036 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1037 int id = evtCtrl.evtBuf[k0] ;
1038 int ievt = mBuffer[id].evNum ;
1039 int roi = mBuffer[id].nRoi ;
1040// uint32_t irun = mBuffer[id].runNum ;
1041//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1042//factOut(kDebug,-1, str ) ;
1043
1044//make sure unused pixels/tmarks are cleared to zero
1045 int ip,it,dest,ib;
1046 for (ip=0; ip<NPIX; ip++) {
1047 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1048 dest= ip*roi ;
1049 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1050 }
1051 }
1052 for (it=0; it<NTMARK; it++) {
1053 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1054 dest= it*roi + NPIX*roi ;
1055 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1056 }
1057 }
1058
1059
1060//and set correct event header ; also check for consistency in event (not yet)
1061 mBuffer[id].fEvent->Roi = roi ;
1062 mBuffer[id].fEvent->EventNum = ievt ;
1063 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
1064 mBuffer[id].fEvent->SoftTrig = 0 ;
1065 for (ib=0; ib<NBOARDS; ib++) {
1066 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1067 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1068 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1069 } else {
1070 mBuffer[id].fEvent->BoardTime[ib] =
1071 ntohl(mBuffer[id].FADhead[ib].time) ;
1072 }
1073 }
1074
1075 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1076 gj.procEvt++ ;
1077 gi.procTot++ ;
1078 numProc++ ;
1079 evtCtrl.evtStat[k0] = 520 ;
1080
1081 if (i<0) {
1082 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1083 gi.procErr++ ;
1084 }
1085 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1086 numWait++ ;
1087 }
1088 }
1089
1090 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1091 snprintf(str,MXSTR,"Exit Processing Process ...");
1092 factOut(kInfo,-1, str ) ;
1093 gp_runStat = -22 ; //==> we should exit
1094 gj.procStat= -22 ; //==> we should exit
1095 return 0 ;
1096 }
1097
1098 if (numProc == 0) {
1099 //seems we have nothing to do, so sleep a little
1100 xwait.tv_sec = 0;
1101 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1102 nanosleep( &xwait , NULL ) ;
1103 }
1104 gp_runStat = gi_runStat ;
1105 gj.procStat= gj.readStat ;
1106
1107 }
1108
1109 //we are asked to abort asap ==> must flag all remaining events
1110 // when gi_runStat claims that all events are in the buffer...
1111
1112 snprintf(str,MXSTR,"Abort Processing Process ...");
1113 factOut(kInfo,-1, str ) ;
1114 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1115 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1116
1117 int k1=evtCtrl.frstPtr;
1118 for ( k=k1; k<(k1+kd); k++ ) {
1119 int k0 = k % (MAX_EVT*MAX_RUN) ;
1120 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1121 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1122 }
1123 }
1124
1125 gp_runStat = -99 ;
1126 gj.procStat= -99 ;
1127
1128 return 0;
1129
1130} /*-----------------------------------------------------------------*/
1131
1132int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1133/* close run runId (all all runs if runId=0) */
1134/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1135 int j ;
1136
1137
1138 if ( runId == 0 ) {
1139 for ( j=0; j<MAX_RUN; j++) {
1140 if ( runCtrl[j].fileId == 0 ) { //run is open
1141 runCtrl[j].closeTime = closeTime ;
1142 runCtrl[j].maxEvt = maxEvt ;
1143 }
1144 }
1145 return 0;
1146 }
1147
1148 for ( j=0; j<MAX_RUN; j++) {
1149 if ( runCtrl[j].runId == runId ) {
1150 if ( runCtrl[j].fileId == 0 ) { //run is open
1151 runCtrl[j].closeTime = closeTime ;
1152 runCtrl[j].maxEvt = maxEvt ;
1153 return 0;
1154 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1155 runCtrl[j].closeTime = closeTime ;
1156 runCtrl[j].maxEvt = maxEvt ;
1157 return +1;
1158 } else { // run already closed
1159 return +2;
1160 }
1161 }
1162 } //we only reach here if the run was never created
1163 return -1;
1164
1165} /*-----------------------------------------------------------------*/
1166
1167
1168void *writeEvt( void *ptr ) {
1169/* *** main loop writing event (including opening and closing run-files */
1170
1171 int numWrite, numWait ;
1172 int k,j,i ;
1173 struct timespec xwait ;
1174 char str[MXSTR] ;
1175
1176 cpu_set_t mask;
1177 int cpu = 3 ; //write thread
1178
1179 snprintf(str,MXSTR,"Starting write-thread");
1180 factOut(kInfo,-1, str ) ;
1181
1182/* CPU_ZERO initializes all the bits in the mask to zero. */
1183 CPU_ZERO( &mask );
1184/* CPU_SET sets only the bit corresponding to cpu. */
1185 CPU_SET( cpu, &mask );
1186/* sched_setaffinity returns 0 in success */
1187 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1188 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1189 }
1190
1191 int lastRun = 0 ; //usually run from last event still valid
1192
1193 while (g_runStat >-2) {
1194
1195 numWait = numWrite = 0 ;
1196 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1197 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1198
1199 int k1=evtCtrl.frstPtr;
1200 for ( k=k1; k<(k1+kd); k++ ) {
1201 int k0 = k % (MAX_EVT*MAX_RUN) ;
1202//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1203 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1204 int id = evtCtrl.evtBuf[k0] ;
1205 uint32_t irun = mBuffer[id].runNum ;
1206 int ievt = mBuffer[id].evNum ;
1207
1208 gi.wrtTot++ ;
1209 if (runCtrl[lastRun].runId == irun ) {
1210 j = lastRun ;
1211 } else {
1212 //check which fileID to use (or open if needed)
1213 for ( j=0; j<MAX_RUN; j++) {
1214 if ( runCtrl[j].runId == irun ) break ;
1215 }
1216 if ( j >= MAX_RUN ) {
1217 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1218 factOut(kFatal,901, str ) ;
1219 gi.wrtErr++ ;
1220for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1221exit(111);
1222 }
1223 lastRun = j ;
1224 }
1225
1226 if (runCtrl[j].fileId < 0 ) {
1227 actRun.Version = 1 ;
1228 actRun.RunType = -1 ;
1229 actRun.NBoard = NBOARDS ;
1230 actRun.NPix = NPIX ;
1231 actRun.NTm = NTMARK ;
1232 actRun.Nroi = mBuffer[id].nRoi ;
1233// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1234
1235 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1236 if (runCtrl[j].fileHd == NULL ) {
1237 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1238 factOut(kError,502, str ) ;
1239 runCtrl[j].fileId = 99 ;
1240 } else {
1241 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1242 factOut(kInfo,-1, str ) ;
1243 runCtrl[j].fileId = 0 ;
1244 }
1245
1246 }
1247
1248 if (runCtrl[j].fileId != 0 ) {
1249 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1250 factOut(kWarn,123,str) ;
1251 evtCtrl.evtStat[k0] = 902 ;
1252 gi.wrtErr++ ;
1253 } else {
1254 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1255 if ( i>=0 ) {
1256 runCtrl[j].lastTime = g_actTime;
1257 runCtrl[j].actEvt++ ;
1258 evtCtrl.evtStat[k0] = 901 ;
1259 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1260 factOut(kDebug,504, str ) ;
1261 gj.writEvt++ ;
1262 } else {
1263 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1264 factOut(kError,503, str ) ;
1265 evtCtrl.evtStat[k0] = 901 ;
1266 gi.wrtErr++ ;
1267 }
1268
1269 if ( i < 0
1270 || runCtrl[j].closeTime < g_actTime
1271 || runCtrl[j].lastTime < g_actTime-300
1272 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1273int ii =0 ;
1274if ( i < 0 ) ii=1 ;
1275else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1276else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1277else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1278
1279
1280
1281 //close run for whatever reason
1282 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1283 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1284 if (i<0) {
1285 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1286 factOut(kError,503, str ) ;
1287 runCtrl[j].fileId = 9001 ;
1288 } else {
1289 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1290 factOut(kInfo,503, str ) ;
1291 runCtrl[j].fileId = 901 ;
1292 }
1293 }
1294 }
1295 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1296 }
1297
1298 //check if we should close a run (mainly when no event pending)
1299 for ( j=0; j<MAX_RUN; j++) {
1300 if ( runCtrl[j].fileId==0
1301 && ( runCtrl[j].closeTime < g_actTime
1302 ||runCtrl[j].lastTime < g_actTime-300
1303 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1304 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1305int ii =0 ;
1306if ( i < 0 ) ii=1 ;
1307else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1308else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1309else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1310
1311
1312 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1313 if (i<0) {
1314 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1315 factOut(kError,506, str ) ;
1316 runCtrl[j].fileId = 9011 ;
1317 } else {
1318 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1319 factOut(kInfo,507, str ) ;
1320 runCtrl[j].fileId = 911 ;
1321 }
1322 }
1323 }
1324
1325 if (numWrite == 0) {
1326 //seems we have nothing to do, so sleep a little
1327 xwait.tv_sec = 0;
1328 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1329 nanosleep( &xwait , NULL ) ;
1330 }
1331
1332 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1333 snprintf(str,MXSTR,"Finish Write Process ...");
1334 factOut(kInfo,-1, str ) ;
1335 gw_runStat = -22 ; //==> we should exit
1336 gj.writStat= -22 ; //==> we should exit
1337 goto closerun ;
1338 }
1339 gw_runStat = gi_runStat ;
1340 gj.writStat= gj.readStat ;
1341
1342 }
1343
1344 //must close all open files ....
1345 snprintf(str,MXSTR,"Abort Writing Process ...");
1346 factOut(kInfo,-1, str ) ;
1347closerun:
1348 snprintf(str,MXSTR,"Close all open files ...");
1349 factOut(kInfo,-1, str ) ;
1350 for ( j=0; j<MAX_RUN; j++)
1351 if ( runCtrl[j].fileId ==0 ) {
1352 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1353 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1354int ii =0 ;
1355if ( i < 0 ) ii=1 ;
1356else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1357else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1358else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1359 if (i<0) {
1360 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1361 factOut(kError,506, str ) ;
1362 runCtrl[j].fileId = 9021 ;
1363 } else {
1364 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1365 factOut(kInfo,507, str ) ;
1366 runCtrl[j].fileId = 921 ;
1367 }
1368 }
1369
1370 gw_runStat = -99;
1371 gj.writStat= -99;
1372 snprintf(str,MXSTR,"Exit Writing Process ...");
1373 factOut(kInfo,-1, str ) ;
1374 return 0;
1375
1376
1377
1378
1379} /*-----------------------------------------------------------------*/
1380
1381
1382
1383
1384void StartEvtBuild() {
1385
1386 int i,j,imax,status,th_ret[50] ;
1387 pthread_t thread[50] ;
1388 struct timespec xwait ;
1389 uint32_t actime ;
1390
1391 gi_runStat = gp_runStat = gw_runStat = 0 ;
1392 gj.readStat= gj.procStat= gj.writStat= 0 ;
1393
1394
1395 snprintf(str,MXSTR,"Starting EventBuilder");
1396 factOut(kInfo,-1, str ) ;
1397
1398
1399 evtCtrl.frstPtr = 0 ;
1400 evtCtrl.lastPtr = 0 ;
1401
1402 actime = g_actTime + 50000000 ;
1403/* initialize run control logics */
1404 for (i=0; i<MAX_RUN; i++) {
1405 runCtrl[i].runId = 0 ;
1406 runCtrl[i].fileId = -2 ;
1407 }
1408
1409 gj.evtBuf =
1410 gj.readEvt =
1411 gj.procEvt =
1412 gj.writEvt =
1413 gj.skipEvt = 0 ;
1414
1415//start all threads (more to come) when we are allowed to ....
1416 while (g_runStat == 0 ) {
1417 xwait.tv_sec = 0;
1418 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1419 nanosleep( &xwait , NULL ) ;
1420 }
1421
1422 i=0 ;
1423 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1424 i++;
1425 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1426 i++;
1427 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1428 i++;
1429 imax=i ;
1430
1431#ifdef BILAND
1432 xwait.tv_sec = 20;;
1433 xwait.tv_nsec= 0 ; // sleep for ~20sec
1434 nanosleep( &xwait , NULL ) ;
1435
1436 printf("close all runs in 2 seconds\n");
1437
1438 CloseRunFile( 0, time(NULL)+2, 0) ;
1439
1440 xwait.tv_sec = 5;;
1441 xwait.tv_nsec= 0 ; // sleep for ~20sec
1442 nanosleep( &xwait , NULL ) ;
1443
1444 printf("setting g_runstat to -1\n");
1445
1446 g_runStat = -1 ;
1447#endif
1448
1449
1450//wait for all threads to finish
1451 for (i=0; i<imax; i++) {
1452 j = pthread_join ( thread[i], (void **)&status) ;
1453 }
1454
1455} /*-----------------------------------------------------------------*/
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471 /*-----------------------------------------------------------------*/
1472 /*-----------------------------------------------------------------*/
1473 /*-----------------------------------------------------------------*/
1474 /*-----------------------------------------------------------------*/
1475 /*-----------------------------------------------------------------*/
1476
1477
1478#ifdef BILAND
1479
1480
1481 /*-----------------------------------------------------------------*/
1482 /*-----------------------------------------------------------------*/
1483 /*-----------------------------------------------------------------*/
1484 /*-----------------------------------------------------------------*/
1485 /*-----------------------------------------------------------------*/
1486
1487
1488
1489
1490FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1491{ return 1; } ;
1492
1493int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1494{ return 1; } ;
1495
1496int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1497{ return 1; } ;
1498
1499
1500
1501int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1502{
1503 int i=0;
1504
1505 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1506 for (i=0; i<NBOARDS; i++) {
1507 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1508 }
1509 return 0;
1510}
1511
1512
1513void factStatNew(EVT_STAT gi) {
1514 int i ;
1515
1516 for (i=0;i<MAX_SOCK;i++) {
1517 printf("%4d",gi.numRead[i]);
1518 if (i%20 == 0 ) printf("\n");
1519 }
1520}
1521
1522
1523void factStat(GUI_STAT gj) {
1524// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1525// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1526}
1527
1528
1529void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
1530// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1531}
1532
1533
1534
1535void debugStream(int isock, void *buf, int len) {
1536}
1537
1538void debugHead(int i, int j, void *buf) {
1539}
1540
1541
1542void factOut(int severity, int err, char* message ) {
1543static FILE * fd ;
1544static int file=0 ;
1545
1546 if (file==0) {
1547 printf("open file\n");
1548 fd=fopen("x.out","w+") ;
1549 file=999;
1550 }
1551
1552 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1553
1554 if (severity != kDebug)
1555 printf("%3d %3d | %s\n",severity,err,message) ;
1556}
1557
1558
1559
1560int main() {
1561 int i,b,c,p ;
1562 char ipStr[100] ;
1563 struct in_addr IPaddr ;
1564
1565 g_maxMem = 1024*1024 ; //MBytes
1566 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1567
1568
1569 g_runStat = 40 ;
1570
1571 i=0 ;
1572
1573// version for standard crates
1574//for (c=0; c<4,c++) {
1575// for (b=0; b<10; b++) {
1576// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1577//
1578// inet_pton(PF_INET, ipStr, &IPaddr) ;
1579//
1580// g_port[i].sockAddr.sin_family = PF_INET;
1581// g_port[i].sockAddr.sin_port = htons(5000) ;
1582// g_port[i].sockAddr.sin_addr = IPaddr ;
1583// g_port[i].sockDef = 1 ;
1584// i++ ;
1585// }
1586//}
1587//
1588//version for PC-test *
1589 for (c=0; c<4; c++) {
1590 for (b=0; b<10; b++) {
1591 sprintf(ipStr,"10.0.%d.11",128+c) ;
1592 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1593
1594 inet_pton(PF_INET, ipStr, &IPaddr) ;
1595 p = 31919+100*c+10*b;
1596
1597
1598 g_port[i].sockAddr.sin_family = PF_INET;
1599 g_port[i].sockAddr.sin_port = htons(p) ;
1600 g_port[i].sockAddr.sin_addr = IPaddr ;
1601 g_port[i].sockDef = 1 ;
1602
1603 i++ ;
1604 }
1605 }
1606
1607
1608//g_port[17].sockDef =-1 ;
1609//g_actBoards-- ;
1610
1611 StartEvtBuild() ;
1612
1613 return 0;
1614
1615}
1616#endif
Note: See TracBrowser for help on using the repository browser.