source: trunk/FACT++/src/EventBuilder.c@ 11343

Last change on this file since 11343 was 11343, checked in by tbretz, 13 years ago
Some fixes to the initialization procedure.
File size: 50.2 KB
Line 
1
2
3#define PX8 99 //simulator does not create double-length roi for pixel 8
4 //for real data, set PX8 = 8 ==> ask for double roi=TM
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
26enum Severity
27{
28 kMessage = 10, ///< Just a message, usually obsolete
29 kInfo = 20, ///< An info telling something which can be interesting to know
30 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
31 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
32 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
33 kDebug = 99, ///< A message used for debugging only
34};
35
36#define MIN_LEN 32 // min #bytes needed to interpret FADheader
37#define MAX_LEN 256*1024 // size of read-buffer per socket
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43
44extern void factStat(GUI_STAT gj) ;
45
46extern void factStatNew(EVT_STAT gi) ;
47
48extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
49
50extern void debugHead(int i, int j, void *buf);
51
52extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
53 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
54extern void debugStream(int isock, void *buf, int len) ;
55
56int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
57
58
59uint g_actTime ;
60int g_runStat ;
61int g_reset ;
62int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX ;
63size_t g_maxMem ; //maximum memory allowed for buffer
64
65//no longer needed ...
66 int g_maxBoards ; //maximum number of boards to be initialized
67 int g_actBoards ;
68//
69
70FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
71
72
73 int gi_runStat ;
74 int gp_runStat ;
75 int gw_runStat ;
76
77 int gi_memStat = +1 ;
78
79 uint32_t gi_myRun ;
80
81
82
83uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
84
85//uint gi_EvtStart= 0 ;
86//uint gi_EvtRead = 0 ;
87//uint gi_EvtBad = 0 ;
88//uint gi_EvtTot = 0 ;
89//size_t gi_usedMem = 0 ;
90
91//uint gw_EvtTot = 0 ;
92//uint gp_EvtTot = 0 ;
93
94PIX_MAP g_pixMap[NPIX] ;
95
96EVT_STAT gi ;
97GUI_STAT gj ;
98
99EVT_CTRL evtCtrl ; //control of events during processing
100int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
101
102WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
103
104
105
106
107RUN_HEAD actRun ;
108
109RUN_CTRL runCtrl[MAX_RUN] ;
110
111RUN_TAIL runTail[MAX_RUN] ;
112
113
114/*
115*** Definition of rdBuffer to read in IP packets; keep it global !!!!
116 */
117
118
119typedef union {
120 int8_t B[MAX_LEN/8];
121 int16_t S[MAX_LEN/4];
122 int32_t I[MAX_LEN/2];
123 int64_t L[MAX_LEN ];
124} CNV_FACT ;
125
126typedef struct {
127 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
128 int32_t bufPos ; //next byte to read to the buffer next
129 int32_t bufLen ; //number of bytes left to read
130 int32_t skip ; //number of bytes skipped before start of event
131
132 int sockStat ; //-1 if socket not yet connected , 99 if not exist
133 int socket ; //contains the sockets
134 struct sockaddr_in SockAddr ; //IP for each socket
135
136 int evtID ; // event ID of event currently read
137 int runID ; // run "
138 int ftmID ; // event ID from FTM
139 uint fadLen ; // FADlength of event currently read
140 int fadVers ; // Version of FAD
141 int board ; // boardID (softwareID: 0..40 )
142 int Port ;
143
144 CNV_FACT *rBuf ;
145
146} READ_STRUCT ;
147
148
149typedef union {
150 int8_t B[2];
151 int16_t S ;
152} SHORT_BYTE ;
153
154
155
156
157
158#define MXSTR 1000
159char str[MXSTR] ;
160
161SHORT_BYTE start, stop;
162
163READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
164
165
166
167/*-----------------------------------------------------------------*/
168
169
170/*-----------------------------------------------------------------*/
171
172
173
174int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
175/*
176*** generate Address, create sockets and allocates readbuffer for it
177***
178*** if flag==0 generate socket and buffer
179*** <0 destroy socket and buffer
180*** >0 close and redo socket
181***
182*** sid : board*7 + port id
183 */
184
185 int j ;
186
187 if (rd->sockStat ==0 ) { //close socket if open
188 j=close(rd->socket) ;
189 if (j>0) {
190 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
191 factOut(kFatal,771, str ) ;
192 } else {
193 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
194 factOut(kInfo,771, str ) ;
195 }
196 }
197
198
199 if (flag < 0) {
200 free(rd->rBuf) ; //and never open again
201 rd->rBuf = NULL ;
202 rd->sockStat = 99 ;
203 return 0 ;
204 }
205
206
207 if (flag == 0) { //generate address and buffer ...
208 rd->Port = port ;
209 rd->SockAddr.sin_family = sockAddr->sin_family;
210 rd->SockAddr.sin_port = htons(port) ;
211 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
212
213 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
214 if ( rd->rBuf == NULL ) {
215 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
216 factOut(kFatal,774, str ) ;
217 rd->sockStat = 77 ;
218 return -3 ;
219 }
220 }
221
222
223 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
224 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
225 factOut(kFatal,773, str ) ;
226 rd->sockStat = 88 ;
227 return -2 ;
228 }
229
230 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
231 factOut(kInfo,773, str ) ;
232 rd->sockStat = -1 ; //try to (re)open socket
233 return 0 ;
234
235} /*-----------------------------------------------------------------*/
236
237 /*-----------------------------------------------------------------*/
238
239
240
241
242int mBufInit() {
243// initialize mBuffer (mark all entries as unused\empty)
244
245 int i ;
246 uint32_t actime ;
247
248 actime = g_actTime + 50000000 ;
249
250 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
251 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
252 mBuffer[i].runNum= 0 ;
253
254 evtCtrl.evtBuf[ i] = -1 ;
255 evtCtrl.evtStat[ i] = -1 ;
256 evtCtrl.pcTime[ i] = actime ; //initiate to far future
257 }
258
259
260 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
261
262 evtCtrl.frstPtr = 0 ;
263 evtCtrl.lastPtr = 0 ;
264
265 return 0 ;
266
267} /*-----------------------------------------------------------------*/
268
269
270
271
272int mBufEvt( int evID, uint runID, int nRoi) {
273// generate a new Event into mBuffer:
274// make sure only complete Event are possible, so 'free' will always work
275// returns index into mBuffer[], or negative value in case of error
276
277
278 int i, k, evFree ;
279 int headmem=0 ;
280 size_t needmem = 0 ;
281
282 if (nRoi <0 || nRoi > 1024) {
283 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
284 factOut(kError, 1, str ) ;
285 return -9999 ;
286 }
287
288 i = evID % MAX_EVT ;
289 evFree = -1 ;
290
291 for ( k=0; k<MAX_RUN; k++) {
292 if ( mBuffer[i].evNum == evID
293 && mBuffer[i].runNum== runID ) {
294 return i ;
295 }
296 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
297 i += MAX_EVT ;
298 }
299
300
301 //event does not yet exist; create
302 if (evFree < 0 ) { //no space available in ctrl
303 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
304 factOut(kError,881, str ) ;
305 return -1 ;
306 }
307 i = evFree ; //found free entry; use it ...
308
309
310 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
311
312 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
313
314 if ( gj.usdMem + needmem + headmem > g_maxMem) {
315 gj.maxMem = gj.usdMem + needmem + headmem ;
316 if (gi_memStat>0 ) {
317 gi_memStat = -99 ;
318 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
319 factOut(kError,882, str ) ;
320 }
321 return -11 ;
322 }
323
324 mBuffer[i].FADhead = malloc( headmem ) ;
325 if (mBuffer[i].FADhead == NULL) {
326 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
327 factOut(kError,882, str ) ;
328 return -12;
329 }
330
331 mBuffer[i].fEvent = malloc( needmem ) ;
332 if (mBuffer[i].fEvent == NULL) {
333 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
334 factOut(kError,882, str ) ;
335 free(mBuffer[i].fEvent) ;
336 mBuffer[i].fEvent = NULL ;
337 return -22;
338 }
339
340 //flag all boards as unused
341 mBuffer[i].nBoard = 0 ;
342 for (k=0; k<NBOARDS; k++ ) {
343 mBuffer[i].board[k] = -1;
344 }
345
346 //flag all pixels as unused
347 for (k=0; k<NPIX; k++ ) {
348 mBuffer[i].fEvent->StartPix[k] = -1 ;
349 }
350
351 //flag all TMark as unused
352 for (k=0; k<NTMARK; k++ ) {
353 mBuffer[i].fEvent->StartTM[k] = -1 ;
354 }
355
356 mBuffer[i].pcTime = g_actTime ;
357 mBuffer[i].nRoi = nRoi ;
358 mBuffer[i].evNum = evID ;
359 mBuffer[i].runNum = runID ;
360 mBuffer[i].evtLen = needmem ;
361
362 gj.usdMem += needmem + headmem;
363 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
364
365 gj.rateNew++ ;
366
367 //register event in 'active list (reading)'
368
369 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
370 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
371 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
372 evtIdx[i] = evtCtrl.lastPtr ;
373
374
375snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
376factOut(kDebug,-11, str ) ;
377 evtCtrl.lastPtr++ ;
378 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
379
380 gi.evtGet++ ;
381
382 //check if runId already registered in runCtrl
383 evFree = -1 ;
384 for (k=0; k<MAX_RUN; k++) {
385 if (runCtrl[k].runId == runID ) return i ;//run exists already
386 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
387 }
388
389 if (evFree <0 ) {
390 snprintf(str,MXSTR,"not able to register the new run %d",runID);
391 factOut(kFatal,883, str ) ;
392 } else {
393 runCtrl[evFree].runId = runID ;
394 runCtrl[evFree].fileId = -2 ;
395 runCtrl[evFree].nextEvt= 0;
396 runCtrl[evFree].actEvt = 0;
397 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
398 runCtrl[evFree].lastTime=g_actTime ;
399 runCtrl[evFree].closeTime=g_actTime + 3600*24 ; //max time allowed
400 runCtrl[evFree].lastTime = 0 ;
401
402 runTail[evFree].nEventsOk =
403 runTail[evFree].nEventsRej =
404 runTail[evFree].nEventsBad =
405 runTail[evFree].PCtime0 =
406 runTail[evFree].PCtimeX = 0 ;
407 }
408
409 return i ;
410
411} /*-----------------------------------------------------------------*/
412
413
414
415
416int mBufFree(int i) {
417//delete entry [i] from mBuffer:
418//(and make sure multiple calls do no harm ....)
419
420 int headmem=0 ;
421 size_t freemem = 0 ;
422
423 freemem = mBuffer[i].evtLen ;
424
425 free(mBuffer[i].fEvent ) ;
426 mBuffer[i].fEvent = NULL ;
427
428 free(mBuffer[i].FADhead ) ;
429 mBuffer[i].FADhead = NULL ;
430
431 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
432 mBuffer[i].evNum = mBuffer[i].nRoi= -1;
433 mBuffer[i].runNum = 0;
434
435 gj.usdMem = gj.usdMem - freemem - headmem;
436
437 if (gi_memStat < 0 ) {
438 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ;
439 }
440
441
442 return 0 ;
443
444} /*-----------------------------------------------------------------*/
445
446
447void resetEvtStat() {
448 int i ;
449
450 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
451
452 for (i=0; i<NBOARDS; i++ ) {
453 gi.gotByte[i] = 0 ;
454 gi.gotErr[i] = 0 ;
455 }
456
457 gi.evtGet = 0 ; //#new Start of Events read
458 gi.evtTot = 0 ; //#complete Events read
459 gi.evtErr = 0 ; //#Events with Errors
460 gi.evtSkp = 0 ; //#Events incomplete (timeout)
461
462 gi.procTot = 0 ; //#Events processed
463 gi.procErr = 0 ; //#Events showed problem in processing
464 gi.procTrg = 0 ; //#Events accepted by SW trigger
465 gi.procSkp = 0 ; //#Events rejected by SW trigger
466
467 gi.feedTot = 0 ; //#Events used for feedBack system
468 gi.feedErr = 0 ; //#Events rejected by feedBack
469
470 gi.wrtTot = 0 ; //#Events written to disk
471 gi.wrtErr = 0 ; //#Events with write-error
472
473 gi.runOpen = 0 ; //#Runs opened
474 gi.runClose= 0 ; //#Runs closed
475 gi.runErr = 0 ; //#Runs with open/close errors
476
477return ;
478} /*-----------------------------------------------------------------*/
479
480
481
482void initReadFAD() {
483return ;
484} /*-----------------------------------------------------------------*/
485
486
487
488void *readFAD( void *ptr ) {
489/* *** main loop reading FAD data and sorting them to complete events */
490 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
491 int actBoards = 0, minLen ;
492 int32_t jrd ;
493 uint gi_SecTime ; //time in seconds
494 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
495
496 int goodhed = 0 ;
497 int errcnt0 = 0 ;
498
499 int sockDef[NBOARDS]; //internal state of sockets
500
501
502
503 struct timespec xwait ;
504
505
506 struct timeval *tv, atv;
507 tv=&atv;
508 uint32_t tsec, tusec ;
509
510
511 snprintf(str,MXSTR,"start initializing");
512 factOut(kInfo,-1, str ) ;
513
514 int cpu = 7 ; //read thread
515 cpu_set_t mask;
516
517/* CPU_ZERO initializes all the bits in the mask to zero. */
518 CPU_ZERO( &mask );
519/* CPU_SET sets only the bit corresponding to cpu. */
520 cpu = 7 ;
521 CPU_SET( cpu, &mask );
522
523/* sched_setaffinity returns 0 in success */
524 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
525 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
526 factOut(kWarn,-1, str ) ;
527 }
528
529 head_len = sizeof(PEVNT_HEADER) ;
530 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
531 minLen = head_len ; //min #bytes needed to check header: full header for debug
532
533 start.S=0xFB01;
534 stop.S= 0x04FE;
535
536/* initialize run control logics */
537 for (i=0; i<MAX_RUN; i++) {
538 runCtrl[i].runId = 0 ;
539 runCtrl[i].fileId = -2 ;
540 }
541 gi_resetS = gi_resetR = 9;
542 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
543
544START:
545 evtCtrl.frstPtr = 0 ;
546 evtCtrl.lastPtr = 0 ;
547
548 gi_myRun = g_actTime ;
549 gi_SecTime = g_actTime ;
550 gi_runStat = g_runStat ;
551 gj.readStat= g_runStat ;
552 numok = numok2 = 0 ;
553
554 if ( gi_resetS > 0) {
555 //make sure all sockets are preallocated as 'not exist'
556 for (i=0; i<MAX_SOCK; i++) {
557 rd[i].socket = -1 ;
558 rd[i].sockStat = 99 ;
559 }
560
561 for (k=0; k<NBOARDS; k++) {
562 gi_NumConnect[k]=0;
563 gi.numConn[k] =0;
564 gj.numConn[k] =0;
565 gj.errConn[k] =0;
566 gj.rateBytes[k] =0;
567 gj.totBytes[k] =0;
568 }
569
570 }
571
572
573 if ( gi_resetR > 0) {
574 resetEvtStat();
575 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
576 gj.totMem = g_maxMem ;
577 gj.bufNew = gj.bufEvt = 0 ;
578 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
579
580
581 mBufInit() ; //initialize buffers
582
583 snprintf(str,MXSTR,"end initializing");
584 factOut(kInfo,-1, str ) ;
585 }
586
587
588 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0 ;
589
590 while (g_runStat >=0 && g_reset ==0 ) { //loop until global variable g_runStat claims stop
591
592 gi_runStat = g_runStat;
593 gj.readStat= g_runStat;
594
595 int b,p,p0,s0,nch;
596 nch = 0 ;
597 for (b=0; b<NBOARDS; b++ ) {
598 k = b*7 ;
599 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
600 nch++ ;
601 gi_NumConnect[ b ] = 0 ; //must close all connections
602 gi.numConn[ b ] = 0;
603 gj.numConn[ b ] = 0;
604 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
605 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
606 else s0=+1 ; //sockets to be closed and reopened
607
608 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
609 else p0=0 ;
610
611 for (p=p0+1; p<p0+8; p++) {
612 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
613 k++ ;
614 }
615 sockDef[b] = g_port[b].sockDef ;
616 }
617 }
618
619 if (nch > 0 ) {
620 actBoards = 0 ;
621 for (b=0; b<NBOARDS; b++ ) {
622 if ( sockDef[b] > 0 ) actBoards++ ;
623 }
624 }
625
626
627
628
629 numok = 0 ; //count number of succesfull actions
630
631 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
632 b = i / 7 ;
633 if (sockDef[b] > 0) s0=+1 ;
634 else s0=-1 ;
635
636 gettimeofday( tv, NULL);
637 g_actTime = tsec = atv.tv_sec ;
638 tusec= atv.tv_usec ;
639
640 if (rd[i].sockStat <0 ) { //try to connect if not yet done
641 rd[i].sockStat=connect(rd[i].socket,
642 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
643 if (rd[i].sockStat ==0 ) { //successfull ==>
644 if (sockDef[b] > 0) {
645 rd[i].bufTyp = 0 ; // expect a header
646 rd[i].bufLen = frst_len ; // max size to read at begining
647 } else {
648 rd[i].bufTyp = -1 ; // full data to be skipped
649 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
650 }
651 rd[i].bufPos = 0 ; // no byte read so far
652 rd[i].skip = 0 ; // start empty
653 gi_NumConnect[ b ]++ ;
654 gi.numConn[ b ]++ ;
655 gj.numConn[ b ]++ ;
656 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
657 factOut(kInfo,-1, str ) ;
658 }
659 }
660
661 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
662 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
663 numok++ ;
664 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
665
666 if (jrd >0 ) {
667 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
668 }
669
670 if (jrd == 0) { //connection has closed ...
671 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
672 factOut(kInfo,441, str ) ;
673 GenSock(s0, i, 0,NULL, &rd[i]) ;
674 gi.gotErr[ b ]++ ;
675 gi_NumConnect[ b ]-- ;
676 gi.numConn[ b ]-- ;
677 gj.numConn[ b ]-- ;
678
679 } else if ( jrd<0 ) { //did not read anything
680 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
681 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
682 factOut(kError,442, str ) ;
683 gi.gotErr[ b ]++ ;
684 } else numok-- ; //else nothing waiting to be read
685 jrd = 0 ;
686 }
687 } else jrd = 0 ; //did read nothing as requested
688
689 gi.gotByte[ b ] += jrd ;
690 gj.rateBytes[b] += jrd ;
691
692 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
693// just do nothing
694
695 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
696 if ( jrd < rd[i].bufLen ) { //not yet all read
697 rd[i].bufPos += jrd ; //==> prepare for continuation
698 rd[i].bufLen -= jrd ;
699 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
700 } else { //full dataset read
701 rd[i].bufLen = 0 ;
702 rd[i].bufPos = rd[i].fadLen ;
703 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
704 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
705 gi.evtErr++ ;
706 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
707 factOut(kError,301, str ) ;
708 goto EndBuf ;
709
710 }
711 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
712
713 //we have a complete buffer, copy to WORK area
714 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
715 //get index into mBuffer for this event (create if needed)
716 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
717
718 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
719 if (evID < 0) {
720 xwait.tv_sec = 0;
721 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
722 nanosleep( &xwait , NULL ) ;
723 goto EndBuf1 ; //hope there is free space next round
724 }
725
726
727 //we have a valid entry in mBuffer[]; fill it
728
729 boardId = b ;
730 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
731 int fadCrate = fadBoard/256 ;
732 if (boardId != (fadCrate*10 + fadBoard%256) ) {
733 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
734 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
735 }
736 if ( mBuffer[evID].board[ boardId ] != -1) {
737 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
738 factOut(kWarn,501, str ) ;
739 goto EndBuf ; //--> skip Board
740 }
741
742 int iDx = evtIdx[evID] ; //index into evtCtrl
743
744 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
745 &rd[i].rBuf->S[0], head_len) ;
746 roi = mBuffer[evID].nRoi ;
747
748 pixS = boardId*36 -1 ; //
749 tmS = boardId*4 -1 ; //
750 src = head_len/2 ;
751 for ( drs=0; drs<4; drs++ ) {
752 for ( px=0; px<9; px++ ) {
753 pixH= ntohs(rd[i].rBuf->S[src++]) ;
754 pixC= ntohs(rd[i].rBuf->S[src++]) ;
755 pixR= ntohs(rd[i].rBuf->S[src++]) ;
756
757 src++ ;
758 pixS++ ; //pixS = pixH2S[pixH] ;
759 if ( ( px < PX8 && pixR == roi )
760 || ( px ==PX8 && pixR == 2*roi )
761 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
762 // correct roi
763 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
764 dest= pixS * roi ;
765 memcpy(
766 &mBuffer[evID].fEvent->Adc_Data[dest],
767 &rd[i].rBuf->S[src], roi * 2) ;
768 src+= roi ;
769 if ( px==PX8 ) {
770 tmS++; // tmS = tmH2S[pixH]
771 dest= tmS * roi + NPIX* roi ;
772 if ( roi <=512 ) {
773 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
774 memcpy(
775 &mBuffer[evID].fEvent->Adc_Data[dest],
776 &rd[i].rBuf->S[src], roi * 2) ;
777 src+=roi ;
778 } else {
779 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
780 }
781 }
782 } else {
783 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
784 gi.evtErr++ ;
785 factOut(kError,202, str ) ;
786 goto EndBuf ;
787 }
788 }
789 }// now we have stored a new board contents into Event structure
790
791 mBuffer[evID].board[ boardId ] = boardId ;
792 evtCtrl.evtStat[ iDx ]++ ;
793 evtCtrl.pcTime[ iDx ] = g_actTime ;
794
795 if (++mBuffer[evID].nBoard >= actBoards ) {
796 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
797 factOut(kDebug,-1, str ) ;
798 //complete event read ---> flag for next processing
799 evtCtrl.evtStat[ iDx ] = 99;
800 gi.evtTot++ ;
801 }
802
803EndBuf:
804 rd[i].bufTyp = 0 ; //ready to read next header
805 rd[i].bufLen = frst_len ;
806 rd[i].bufPos = 0 ;
807EndBuf1:
808 ;
809 }
810
811 } else { //we are reading event header
812 rd[i].bufPos += jrd ;
813 rd[i].bufLen -= jrd ;
814 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
815 //check if startflag correct; else shift block ....
816 for (k=0; k<rd[i].bufPos -1 ; k++) {
817 if (rd[i].rBuf->B[k ] == start.B[1]
818 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
819 }
820 rd[i].skip += k ;
821
822 if (k >= rd[i].bufPos-1 ) { //no start of header found
823// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
824// factOut(kWarn,666, str ) ;
825
826 rd[i].bufPos = 0 ;
827 rd[i].bufLen = head_len ;
828 } else if ( k>0 ) {
829 rd[i].bufPos -= k ;
830 rd[i].bufLen += k ;
831 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
832 }
833 if ( rd[i].bufPos >= minLen ) {
834 if ( rd[i].skip >0 ) {
835 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
836 factOut(kInfo,666, str ) ;
837 rd[i].skip = 0 ;
838 }
839 goodhed++;
840 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
841 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
842 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
843 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
844 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
845 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
846 rd[i].bufTyp = 1 ; //ready to read full record
847 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
848 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
849 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
850 debugHead(i,fadBoard,rd[i].rBuf);
851 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
852 } else {
853 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
854 }
855 } else {
856 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
857 }
858
859 } //end interpreting last read
860 } //end of successful read anything
861 } //finished trying to read all sockets
862
863 gi.numRead[ numok ] ++ ;
864
865 g_actTime = time(NULL) ;
866 if ( g_actTime > gi_SecTime ) {
867 gi_SecTime = g_actTime ;
868
869
870 //loop over all active events and flag those older than read-timeout
871 //delete those that are written to disk ....
872
873 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
874 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
875
876 gj.bufNew = gj.bufEvt = 0 ;
877 int k1=evtCtrl.frstPtr;
878 for ( k=k1; k<(k1+kd); k++ ) {
879 int k0 = k % (MAX_EVT*MAX_RUN) ;
880//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
881
882 if (evtCtrl.evtStat[k0] > 0
883 && evtCtrl.evtStat[k0] < 92 ) {
884 gj.bufNew++ ; //incomplete event in Buffer
885 if ( evtCtrl.evtStat[k0] < 90
886 && evtCtrl.pcTime[k0] < g_actTime-10 ) {
887 int id =evtCtrl.evtBuf[k0] ;
888 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
889 factOut(kWarn,601, str ) ;
890 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
891 gi.evtSkp++ ;
892 gi.evtTot++ ;
893 gj.evtSkip++;
894 }
895 } else if (evtCtrl.evtStat[k0] >= 900 //'delete'
896 || evtCtrl.evtStat[k0] == 0 ) { //'useless'
897
898 int id =evtCtrl.evtBuf[k0] ;
899 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
900 factOut(kDebug,-1, str ) ;
901 mBufFree(id) ; //event written--> free memory
902 evtCtrl.evtStat[k0] = -1;
903 gj.evtWrite++ ;
904 gj.rateWrite++ ;
905 } else if (evtCtrl.evtStat[k0] >= 95 ) {
906 gj.bufEvt++ ; //complete event in Buffer
907 }
908
909 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
910 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
911 }
912 }
913
914
915 gj.deltaT = 1000 ; //temporary, must be improved
916
917 int b;
918 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ;
919 gj.totMem = g_maxMem ;
920 if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ;
921
922 factStat(gj);
923 factStatNew(gi) ;
924
925 gj.rateNew = gj.rateWrite = 0 ;
926 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ;
927 }
928
929 if (numok > 0 ) numok2=0;
930 else if (numok2++ > 3) {
931 if (g_runStat == 1) {
932 xwait.tv_sec = 1;
933 xwait.tv_nsec= 0 ; // hibernate for 1 sec
934 } else {
935 xwait.tv_sec = 0;
936 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
937 }
938 nanosleep( &xwait , NULL ) ;
939 }
940
941 } //and do next loop over all sockets ...
942
943
944 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset);
945 factOut(kInfo,-1, str ) ;
946
947 if (g_reset >0 ) {
948 gi_reset = g_reset ;
949 gi_resetR = gi_reset%10 ; //shall we stop reading ?
950 gi_resetS = (gi_reset/10)%10 ; //shall we close sockets ?
951 gi_resetW = (gi_reset/100)%10 ; //shall we close files ?
952 gi_resetX = gi_reset/1000 ; //shall we simply wait resetX seconds ?
953 g_reset= 0;
954 } else {
955 gi_reset = 0;
956 if ( g_runStat==-1 ) gi_resetR = 1 ;
957 else gi_resetR = 7 ;
958 gi_resetS = 7 ; //close all sockets
959 gi_resetW = 7 ; //close all files
960 gi_resetX = 0 ;
961
962 //inform others we have to quit ....
963 gi_runStat = -11 ; //inform all that no update to happen any more
964 gj.readStat= -11 ; //inform all that no update to happen any more
965 }
966
967 if (gi_resetS > 0) {
968 //must close all open sockets ...
969 snprintf(str,MXSTR,"close all sockets ...");
970 factOut(kInfo,-1, str ) ;
971 for (i=0; i<MAX_SOCK; i++) {
972 if (rd[i].sockStat ==0 ) {
973 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
974 gi_NumConnect[ i/7 ]-- ;
975 gi.numConn[ i/7 ]-- ;
976 gj.numConn[ i/7 ]-- ;
977 }
978 }
979 }
980
981
982 if (gi_resetR > 0) {
983 //flag all events as 'read finished'
984 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
985 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
986
987 int k1=evtCtrl.frstPtr;
988 for ( k=k1; k<(k1+kd); k++ ) {
989 int k0 = k % (MAX_EVT*MAX_RUN) ;
990 if (evtCtrl.evtStat[k0] > 0
991 && evtCtrl.evtStat[k0] < 90 ) {
992 evtCtrl.evtStat[k0] = 91 ;
993 gi.evtSkp++ ;
994 gi.evtTot++ ;
995 }
996 }
997
998 xwait.tv_sec = 0;
999 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1000 nanosleep( &xwait , NULL ) ;
1001
1002 //and clear all buffers (might have to wait until all others are done)
1003 int minclear ;
1004 if (gi_resetR == 1) {
1005 minclear = 900 ;
1006 snprintf(str,MXSTR,"drain all buffers ...");
1007 } else {
1008 minclear = 0 ;
1009 snprintf(str,MXSTR,"flush all buffers ...");
1010 }
1011 factOut(kInfo,-1, str ) ;
1012
1013 int numclear=1 ;
1014 while (numclear > 0 ) {
1015 numclear = 0 ;
1016 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1017 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1018
1019 int k1=evtCtrl.frstPtr;
1020 for ( k=k1; k<(k1+kd); k++ ) {
1021 int k0 = k % (MAX_EVT*MAX_RUN) ;
1022 if (evtCtrl.evtStat[k0] > minclear ) {
1023 int id =evtCtrl.evtBuf[k0] ;
1024 mBufFree(id) ; //event written--> free memory
1025 evtCtrl.evtStat[k0] = -1;
1026 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
1027
1028 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
1029 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
1030 }
1031
1032 xwait.tv_sec = 0;
1033 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1034 nanosleep( &xwait , NULL ) ;
1035 }
1036 }
1037
1038 if (gi_reset > 0) {
1039 if (gi_resetW > 0) {
1040 CloseRunFile(0,0,0) ; //ask all Runs to be closed
1041 }
1042 if (gi_resetX > 0) {
1043 xwait.tv_sec = gi_resetX;
1044 xwait.tv_nsec= 0 ;
1045 nanosleep( &xwait , NULL ) ;
1046 }
1047
1048 snprintf(str,MXSTR,"Continue read Process ...");
1049 factOut(kInfo,-1, str ) ;
1050 gi_reset = 0 ;
1051 goto START ;
1052 }
1053
1054
1055
1056 snprintf(str,MXSTR,"Exit read Process ...");
1057 factOut(kInfo,-1, str ) ;
1058 gi_runStat = -99 ;
1059 gj.readStat= -99 ;
1060 factStat(gj);
1061 factStatNew(gi) ;
1062 return 0;
1063
1064} /*-----------------------------------------------------------------*/
1065
1066
1067void *procEvt( void *ptr ) {
1068/* *** main loop processing file, including SW-trigger */
1069 int numProc, numWait ;
1070 int k ;
1071 struct timespec xwait ;
1072 char str[MXSTR] ;
1073
1074 cpu_set_t mask;
1075 int cpu = 5 ; //process thread (will be several in final version)
1076
1077 snprintf(str,MXSTR,"Starting process-thread");
1078 factOut(kInfo,-1, str ) ;
1079
1080/* CPU_ZERO initializes all the bits in the mask to zero. */
1081 CPU_ZERO( &mask );
1082/* CPU_SET sets only the bit corresponding to cpu. */
1083 CPU_SET( cpu, &mask );
1084/* sched_setaffinity returns 0 in success */
1085 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1086 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1087 factOut(kWarn,-1, str ) ;
1088 }
1089
1090
1091 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1092
1093 numWait = numProc = 0 ;
1094 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1095 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1096
1097 int k1=evtCtrl.frstPtr;
1098 for ( k=k1; k<(k1+kd); k++ ) {
1099 int k0 = k % (MAX_EVT*MAX_RUN) ;
1100//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1101 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1102
1103 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap
1104 evtCtrl.evtStat[k0] = 991 ;
1105 } else {
1106
1107 int id = evtCtrl.evtBuf[k0] ;
1108 int ievt = mBuffer[id].evNum ;
1109 int roi = mBuffer[id].nRoi ;
1110// uint32_t irun = mBuffer[id].runNum ;
1111//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1112//factOut(kDebug,-1, str ) ;
1113
1114//make sure unused pixels/tmarks are cleared to zero
1115 int ip,it,dest,ib;
1116 for (ip=0; ip<NPIX; ip++) {
1117 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1118 dest= ip*roi ;
1119 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1120 }
1121 }
1122 for (it=0; it<NTMARK; it++) {
1123 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1124 dest= it*roi + NPIX*roi ;
1125 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1126 }
1127 }
1128
1129
1130//and set correct event header ; also check for consistency in event (not yet)
1131 mBuffer[id].fEvent->Roi = roi ;
1132 mBuffer[id].fEvent->EventNum = ievt ;
1133 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
1134 mBuffer[id].fEvent->SoftTrig = 0 ;
1135 for (ib=0; ib<NBOARDS; ib++) {
1136 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1137 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1138 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1139 } else {
1140 mBuffer[id].fEvent->BoardTime[ib] =
1141 ntohl(mBuffer[id].FADhead[ib].time) ;
1142 }
1143 }
1144
1145 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1146// gj.procEvt++ ;
1147 gi.procTot++ ;
1148 numProc++ ;
1149
1150 if (i<0) {
1151 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1152 gi.procErr++ ;
1153 } else {
1154 evtCtrl.evtStat[k0] = 520 ;
1155 }
1156 }
1157 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1158 numWait++ ;
1159 }
1160 }
1161
1162 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1163 snprintf(str,MXSTR,"Exit Processing Process ...");
1164 factOut(kInfo,-1, str ) ;
1165 gp_runStat = -22 ; //==> we should exit
1166 gj.procStat= -22 ; //==> we should exit
1167 return 0 ;
1168 }
1169
1170 if (numProc == 0) {
1171 //seems we have nothing to do, so sleep a little
1172 xwait.tv_sec = 0;
1173 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1174 nanosleep( &xwait , NULL ) ;
1175 }
1176 gp_runStat = gi_runStat ;
1177 gj.procStat= gj.readStat ;
1178
1179 }
1180
1181 //we are asked to abort asap ==> must flag all remaining events
1182 // when gi_runStat claims that all events are in the buffer...
1183
1184 snprintf(str,MXSTR,"Abort Processing Process ...");
1185 factOut(kInfo,-1, str ) ;
1186 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1187 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1188
1189 int k1=evtCtrl.frstPtr;
1190 for ( k=k1; k<(k1+kd); k++ ) {
1191 int k0 = k % (MAX_EVT*MAX_RUN) ;
1192 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1193 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1194 }
1195 }
1196
1197 gp_runStat = -99 ;
1198 gj.procStat= -99 ;
1199
1200 return 0;
1201
1202} /*-----------------------------------------------------------------*/
1203
1204int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1205/* close run runId (all all runs if runId=0) */
1206/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1207 int j ;
1208
1209
1210 if ( runId == 0 ) {
1211 for ( j=0; j<MAX_RUN; j++) {
1212 if ( runCtrl[j].fileId == 0 ) { //run is open
1213 runCtrl[j].closeTime = closeTime ;
1214 runCtrl[j].maxEvt = maxEvt ;
1215 }
1216 }
1217 return 0;
1218 }
1219
1220 for ( j=0; j<MAX_RUN; j++) {
1221 if ( runCtrl[j].runId == runId ) {
1222 if ( runCtrl[j].fileId == 0 ) { //run is open
1223 runCtrl[j].closeTime = closeTime ;
1224 runCtrl[j].maxEvt = maxEvt ;
1225 return 0;
1226 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1227 runCtrl[j].closeTime = closeTime ;
1228 runCtrl[j].maxEvt = maxEvt ;
1229 return +1;
1230 } else { // run already closed
1231 return +2;
1232 }
1233 }
1234 } //we only reach here if the run was never created
1235 return -1;
1236
1237} /*-----------------------------------------------------------------*/
1238
1239
1240void *writeEvt( void *ptr ) {
1241/* *** main loop writing event (including opening and closing run-files */
1242
1243 int numWrite, numWait ;
1244 int k,j,i ;
1245 struct timespec xwait ;
1246 char str[MXSTR] ;
1247
1248 cpu_set_t mask;
1249 int cpu = 3 ; //write thread
1250
1251 snprintf(str,MXSTR,"Starting write-thread");
1252 factOut(kInfo,-1, str ) ;
1253
1254/* CPU_ZERO initializes all the bits in the mask to zero. */
1255 CPU_ZERO( &mask );
1256/* CPU_SET sets only the bit corresponding to cpu. */
1257 CPU_SET( cpu, &mask );
1258/* sched_setaffinity returns 0 in success */
1259 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1260 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1261 }
1262
1263 int lastRun = 0 ; //usually run from last event still valid
1264
1265 while (g_runStat >-2) {
1266
1267 numWait = numWrite = 0 ;
1268 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1269 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1270
1271 int k1=evtCtrl.frstPtr;
1272 for ( k=k1; k<(k1+kd); k++ ) {
1273 int k0 = k % (MAX_EVT*MAX_RUN) ;
1274//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1275 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1276
1277 if ( gi_resetR > 1 ) { //we must drain the buffer asap
1278 evtCtrl.evtStat[k0] = 904 ;
1279 } else {
1280
1281
1282 int id = evtCtrl.evtBuf[k0] ;
1283 uint32_t irun = mBuffer[id].runNum ;
1284 int ievt = mBuffer[id].evNum ;
1285
1286 gi.wrtTot++ ;
1287 if (runCtrl[lastRun].runId == irun ) {
1288 j = lastRun ;
1289 } else {
1290 //check which fileID to use (or open if needed)
1291 for ( j=0; j<MAX_RUN; j++) {
1292 if ( runCtrl[j].runId == irun ) break ;
1293 }
1294 if ( j >= MAX_RUN ) {
1295 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1296 factOut(kFatal,901, str ) ;
1297 gi.wrtErr++ ;
1298for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1299exit(111);
1300 }
1301 lastRun = j ;
1302 }
1303
1304 if (runCtrl[j].fileId < 0 ) {
1305 actRun.Version = 1 ;
1306 actRun.RunType = -1 ;
1307 actRun.NBoard = NBOARDS ;
1308 actRun.NPix = NPIX ;
1309 actRun.NTm = NTMARK ;
1310 actRun.Nroi = mBuffer[id].nRoi ;
1311// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1312
1313 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1314 if (runCtrl[j].fileHd == NULL ) {
1315 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1316 factOut(kError,502, str ) ;
1317 runCtrl[j].fileId = 99 ;
1318 } else {
1319 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1320 factOut(kInfo,-1, str ) ;
1321 runCtrl[j].fileId = 0 ;
1322 }
1323
1324 }
1325
1326 if (runCtrl[j].fileId != 0 ) {
1327 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1328 factOut(kWarn,123,str) ;
1329 evtCtrl.evtStat[k0] = 903 ;
1330 gi.wrtErr++ ;
1331 } else {
1332 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1333 if ( i>=0 ) {
1334 runCtrl[j].lastTime = g_actTime;
1335 runCtrl[j].actEvt++ ;
1336 evtCtrl.evtStat[k0] = 901 ;
1337 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1338 factOut(kDebug,504, str ) ;
1339// gj.writEvt++ ;
1340 } else {
1341 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1342 factOut(kError,503, str ) ;
1343 evtCtrl.evtStat[k0] = 902 ;
1344 gi.wrtErr++ ;
1345 }
1346
1347 if ( i < 0
1348 || runCtrl[j].lastTime < g_actTime-300
1349 || runCtrl[j].closeTime < g_actTime
1350 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1351int ii =0 ;
1352if ( i < 0 ) ii=1 ;
1353else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1354else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1355else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1356
1357
1358
1359 //close run for whatever reason
1360 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1361 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1362 if (i<0) {
1363 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1364 factOut(kError,503, str ) ;
1365 runCtrl[j].fileId = 9001 ;
1366 } else {
1367 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1368 factOut(kInfo,503, str ) ;
1369 runCtrl[j].fileId = 901 ;
1370 }
1371 }
1372 }
1373 }
1374 } else if (evtCtrl.evtStat[k0] > 0
1375 && evtCtrl.evtStat[k0] < 900 ) numWait++ ;
1376 }
1377
1378 //check if we should close a run (mainly when no event pending)
1379 for ( j=0; j<MAX_RUN; j++) {
1380 if ( runCtrl[j].fileId==0
1381 && ( runCtrl[j].closeTime < g_actTime
1382 ||runCtrl[j].lastTime < g_actTime-300
1383 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1384 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1385int ii =0 ;
1386 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1387else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1388else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1389
1390
1391 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1392 if (i<0) {
1393 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1394 factOut(kError,506, str ) ;
1395 runCtrl[j].fileId = 9011 ;
1396 } else {
1397 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1398 factOut(kInfo,507, str ) ;
1399 runCtrl[j].fileId = 911 ;
1400 }
1401 }
1402 }
1403
1404 if (numWrite == 0) {
1405 //seems we have nothing to do, so sleep a little
1406 xwait.tv_sec = 0;
1407 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1408 nanosleep( &xwait , NULL ) ;
1409 }
1410
1411 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1412 snprintf(str,MXSTR,"Finish Write Process ...");
1413 factOut(kInfo,-1, str ) ;
1414 gw_runStat = -22 ; //==> we should exit
1415 gj.writStat= -22 ; //==> we should exit
1416 goto closerun ;
1417 }
1418 gw_runStat = gi_runStat ;
1419 gj.writStat= gj.readStat ;
1420
1421 }
1422
1423 //must close all open files ....
1424 snprintf(str,MXSTR,"Abort Writing Process ...");
1425 factOut(kInfo,-1, str ) ;
1426
1427closerun:
1428 snprintf(str,MXSTR,"Close all open files ...");
1429 factOut(kInfo,-1, str ) ;
1430 for ( j=0; j<MAX_RUN; j++)
1431 if ( runCtrl[j].fileId ==0 ) {
1432 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1433 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1434int ii =0 ;
1435 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1436else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1437else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1438 if (i<0) {
1439 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1440 factOut(kError,506, str ) ;
1441 runCtrl[j].fileId = 9021 ;
1442 } else {
1443 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1444 factOut(kInfo,507, str ) ;
1445 runCtrl[j].fileId = 921 ;
1446 }
1447 }
1448
1449 gw_runStat = -99;
1450 gj.writStat= -99;
1451 snprintf(str,MXSTR,"Exit Writing Process ...");
1452 factOut(kInfo,-1, str ) ;
1453 return 0;
1454
1455
1456
1457
1458} /*-----------------------------------------------------------------*/
1459
1460
1461
1462
1463void StartEvtBuild() {
1464
1465 int i,j,imax,status,th_ret[50] ;
1466 pthread_t thread[50] ;
1467 struct timespec xwait ;
1468
1469 gi_runStat = gp_runStat = gw_runStat = 0 ;
1470 gj.readStat= gj.procStat= gj.writStat= 0 ;
1471
1472 snprintf(str,MXSTR,"Starting EventBuilder");
1473 factOut(kInfo,-1, str ) ;
1474
1475/* initialize run control logics */
1476 for (i=0; i<MAX_RUN; i++) {
1477 runCtrl[i].runId = 0 ;
1478 runCtrl[i].fileId = -2 ;
1479 }
1480
1481/* partially initialize event control logics */
1482 evtCtrl.frstPtr = 0 ;
1483 evtCtrl.lastPtr = 0 ;
1484
1485//start all threads (more to come) when we are allowed to ....
1486 while (g_runStat == 0 ) {
1487 xwait.tv_sec = 0;
1488 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
1489 nanosleep( &xwait , NULL ) ;
1490 }
1491
1492 i=0 ;
1493 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1494 i++;
1495 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1496 i++;
1497 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1498 i++;
1499 imax=i ;
1500
1501
1502#ifdef BILAND
1503 xwait.tv_sec = 20;;
1504 xwait.tv_nsec= 0 ; // sleep for ~20sec
1505 nanosleep( &xwait , NULL ) ;
1506
1507 printf("close all runs in 2 seconds\n");
1508
1509 CloseRunFile( 0, time(NULL)+2, 0) ;
1510
1511 xwait.tv_sec = 5;;
1512 xwait.tv_nsec= 0 ; // sleep for ~20sec
1513 nanosleep( &xwait , NULL ) ;
1514
1515 printf("setting g_runstat to -1\n");
1516
1517 g_runStat = -1 ;
1518#endif
1519
1520
1521//wait for all threads to finish
1522 for (i=0; i<imax; i++) {
1523 j = pthread_join ( thread[i], (void **)&status) ;
1524 }
1525
1526} /*-----------------------------------------------------------------*/
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542 /*-----------------------------------------------------------------*/
1543 /*-----------------------------------------------------------------*/
1544 /*-----------------------------------------------------------------*/
1545 /*-----------------------------------------------------------------*/
1546 /*-----------------------------------------------------------------*/
1547
1548
1549#ifdef BILAND
1550
1551
1552 /*-----------------------------------------------------------------*/
1553 /*-----------------------------------------------------------------*/
1554 /*-----------------------------------------------------------------*/
1555 /*-----------------------------------------------------------------*/
1556 /*-----------------------------------------------------------------*/
1557
1558
1559
1560
1561FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1562{ return 1; } ;
1563
1564int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1565{ return 1; } ;
1566
1567int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1568{ return 1; } ;
1569
1570
1571
1572int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1573{
1574 int i=0;
1575
1576 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1577 for (i=0; i<NBOARDS; i++) {
1578 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1579 }
1580 return 0;
1581}
1582
1583
1584void factStatNew(EVT_STAT gi) {
1585 int i ;
1586
1587 for (i=0;i<MAX_SOCK;i++) {
1588 printf("%4d",gi.numRead[i]);
1589 if (i%20 == 0 ) printf("\n");
1590 }
1591}
1592
1593
1594void factStat(GUI_STAT gj) {
1595// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1596// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1597}
1598
1599
1600void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
1601// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1602}
1603
1604
1605
1606void debugStream(int isock, void *buf, int len) {
1607}
1608
1609void debugHead(int i, int j, void *buf) {
1610}
1611
1612
1613void factOut(int severity, int err, char* message ) {
1614static FILE * fd ;
1615static int file=0 ;
1616
1617 if (file==0) {
1618 printf("open file\n");
1619 fd=fopen("x.out","w+") ;
1620 file=999;
1621 }
1622
1623 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1624
1625 if (severity != kDebug)
1626 printf("%3d %3d | %s\n",severity,err,message) ;
1627}
1628
1629
1630
1631int main() {
1632 int i,b,c,p ;
1633 char ipStr[100] ;
1634 struct in_addr IPaddr ;
1635
1636 g_maxMem = 1024*1024 ; //MBytes
1637 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1638
1639
1640 g_runStat = 40 ;
1641
1642 i=0 ;
1643
1644// version for standard crates
1645//for (c=0; c<4,c++) {
1646// for (b=0; b<10; b++) {
1647// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1648//
1649// inet_pton(PF_INET, ipStr, &IPaddr) ;
1650//
1651// g_port[i].sockAddr.sin_family = PF_INET;
1652// g_port[i].sockAddr.sin_port = htons(5000) ;
1653// g_port[i].sockAddr.sin_addr = IPaddr ;
1654// g_port[i].sockDef = 1 ;
1655// i++ ;
1656// }
1657//}
1658//
1659//version for PC-test *
1660 for (c=0; c<4; c++) {
1661 for (b=0; b<10; b++) {
1662 sprintf(ipStr,"10.0.%d.11",128+c) ;
1663 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1664
1665 inet_pton(PF_INET, ipStr, &IPaddr) ;
1666 p = 31919+100*c+10*b;
1667
1668
1669 g_port[i].sockAddr.sin_family = PF_INET;
1670 g_port[i].sockAddr.sin_port = htons(p) ;
1671 g_port[i].sockAddr.sin_addr = IPaddr ;
1672 g_port[i].sockDef = 1 ;
1673
1674 i++ ;
1675 }
1676 }
1677
1678
1679//g_port[17].sockDef =-1 ;
1680//g_actBoards-- ;
1681
1682 StartEvtBuild() ;
1683
1684 return 0;
1685
1686}
1687#endif
Note: See TracBrowser for help on using the repository browser.