source: trunk/FACT++/src/EventBuilder.c@ 11338

Last change on this file since 11338 was 11335, checked in by tbretz, 13 years ago
Added g_reset and some other updates and minor fixes.
File size: 49.5 KB
Line 
1
2
/*
 * PX8: index (within the 9 channels of one DRS chip) of the channel that is
 * expected to deliver a double-length ROI (pixel data followed by the
 * time-marker data).
 *   99 : matches no channel - use with the simulator, which does not create
 *        a double-length roi for pixel 8
 *    8 : real data ==> ask for double roi = TM
 */
#define PX8 99
5
6
7
8#include <stdlib.h>
9#include <stdint.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <pthread.h>
22#include <sched.h>
23
24#include "EventBuilder.h"
25
/* Severity levels passed as first argument to factOut(). */
enum Severity
{
    /* Just a message, usually obsolete */
    kMessage = 10,
    /* An info telling something which can be interesting to know */
    kInfo    = 20,
    /* A warning, things that somehow might result in unexpected or unwanted behaviour */
    kWarn    = 30,
    /* Error, something unexpected happened, but can still be handled by the program */
    kError   = 40,
    /* An error which cannot be handled at all happened, the only solution is program termination */
    kFatal   = 50,
    /* A message used for debugging only */
    kDebug   = 99,
};
35
/* min #bytes needed to interpret FADheader */
#define MIN_LEN 32
/*
 * size of read-buffer per socket [bytes]
 * The expansion is parenthesized so that the macro behaves like a single
 * value in every expression (e.g. `x % MAX_LEN`, `a / MAX_LEN`).
 */
#define MAX_LEN (256*1024)
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43
44extern void factStat(GUI_STAT gj) ;
45
46extern void factStatNew(EVT_STAT gi) ;
47
48extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
49
50extern void debugHead(int i, int j, void *buf);
51
52extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
53 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
54extern void debugStream(int isock, void *buf, int len) ;
55
56int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
57
58
59uint g_actTime ;
60int g_runStat ;
61int g_reset ;
62size_t g_maxMem ; //maximum memory allowed for buffer
63
64//no longer needed ...
65 int g_maxBoards ; //maximum number of boards to be initialized
66 int g_actBoards ;
67//
68
69FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
70
71
72 int gi_runStat ;
73 int gp_runStat ;
74 int gw_runStat ;
75
76 int gi_memStat = +1 ;
77
78 int32_t gi_myRun ;
79
80
81
82uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
83
84//uint gi_EvtStart= 0 ;
85//uint gi_EvtRead = 0 ;
86//uint gi_EvtBad = 0 ;
87//uint gi_EvtTot = 0 ;
88//size_t gi_usedMem = 0 ;
89
90//uint gw_EvtTot = 0 ;
91//uint gp_EvtTot = 0 ;
92
93PIX_MAP g_pixMap[NPIX] ;
94
95EVT_STAT gi ;
96GUI_STAT gj ;
97
98EVT_CTRL evtCtrl ; //control of events during processing
99int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
100
101WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
102
103
104
105
106RUN_HEAD actRun ;
107
108RUN_CTRL runCtrl[MAX_RUN] ;
109
110RUN_TAIL runTail[MAX_RUN] ;
111
112
113/*
114*** Definition of rdBuffer to read in IP packets; keep it global !!!!
115 */
116
117
118typedef union {
119 int8_t B[MAX_LEN/8];
120 int16_t S[MAX_LEN/4];
121 int32_t I[MAX_LEN/2];
122 int64_t L[MAX_LEN ];
123} CNV_FACT ;
124
125typedef struct {
126 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
127 int32_t bufPos ; //next byte to read to the buffer next
128 int32_t bufLen ; //number of bytes left to read
129 int32_t skip ; //number of bytes skipped before start of event
130
131 int sockStat ; //-1 if socket not yet connected , 99 if not exist
132 int socket ; //contains the sockets
133 struct sockaddr_in SockAddr ; //IP for each socket
134
135 int evtID ; // event ID of event currently read
136 int runID ; // run "
137 int ftmID ; // event ID from FTM
138 uint fadLen ; // FADlength of event currently read
139 int fadVers ; // Version of FAD
140 int board ; // boardID (softwareID: 0..40 )
141 int Port ;
142
143 CNV_FACT *rBuf ;
144
145} READ_STRUCT ;
146
147
/* A 16-bit word that can also be accessed as its two individual bytes. */
typedef union {
   int8_t  B[2] ;                  // the two bytes in memory order
   int16_t S ;                     // the same storage as one 16-bit value
} SHORT_BYTE ;
152
153
154
155
156
157#define MXSTR 1000
158char str[MXSTR] ;
159
160SHORT_BYTE start, stop;
161
162READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
163
164
165
166/*-----------------------------------------------------------------*/
167
168
169/*-----------------------------------------------------------------*/
170
171
172
173int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
174/*
175*** generate Address, create sockets and allocates readbuffer for it
176***
177*** if flag==0 generate socket and buffer
178*** <0 destroy socket and buffer
179*** >0 close and redo socket
180***
181*** sid : board*7 + port id
182 */
183
184 int j ;
185
186 if (rd->sockStat ==0 ) { //close socket if open
187 j=close(rd->socket) ;
188 if (j>0) {
189 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
190 factOut(kFatal,771, str ) ;
191 } else {
192 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
193 factOut(kInfo,771, str ) ;
194 }
195 }
196
197
198 if (flag < 0) {
199 free(rd->rBuf) ; //and never open again
200 rd->rBuf = NULL ;
201 rd->sockStat = 99 ;
202 return 0 ;
203 }
204
205
206 if (flag == 0) { //generate address and buffer ...
207 rd->Port = port ;
208 rd->SockAddr.sin_family = sockAddr->sin_family;
209 rd->SockAddr.sin_port = htons(port) ;
210 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
211
212 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
213 if ( rd->rBuf == NULL ) {
214 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
215 factOut(kFatal,774, str ) ;
216 rd->sockStat = 77 ;
217 return -3 ;
218 }
219 }
220
221
222 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
223 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
224 factOut(kFatal,773, str ) ;
225 rd->sockStat = 88 ;
226 return -2 ;
227 }
228
229 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
230 factOut(kInfo,773, str ) ;
231 rd->sockStat = -1 ; //try to (re)open socket
232 return 0 ;
233
234} /*-----------------------------------------------------------------*/
235
236 /*-----------------------------------------------------------------*/
237
238
239
240
241int mBufInit() {
242// initialize mBuffer (mark all entries as unused\empty)
243
244 int i ;
245 uint32_t actime ;
246
247 actime = g_actTime + 50000000 ;
248
249 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
250 mBuffer[i].evNum = mBuffer[i].runNum = -1;
251
252 evtCtrl.evtBuf[ i] = -1 ;
253 evtCtrl.evtStat[ i] = -1 ;
254 evtCtrl.pcTime[ i] = actime ; //initiate to far future
255 }
256
257
258 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
259
260 evtCtrl.frstPtr = 0 ;
261 evtCtrl.lastPtr = 0 ;
262
263 return 0 ;
264
265} /*-----------------------------------------------------------------*/
266
267
268
269
270int mBufEvt(uint evID, uint runID, uint nRoi) {
271// generate a new Event into mBuffer:
272// make sure only complete Event are possible, so 'free' will always work
273// returns index into mBuffer[], or negative value in case of error
274
275
276 int i, k, evFree ;
277 int headmem=0 ;
278 size_t needmem = 0 ;
279
280 if (nRoi <=0 || nRoi > 1024) {
281 snprintf(str,MXSTR,"illegal nRoi: %d",nRoi) ;
282 factOut(kError, 1, str ) ;
283 return -9999 ;
284 }
285
286 i = evID % MAX_EVT ;
287 evFree = -1 ;
288
289 for ( k=0; k<MAX_RUN; k++) {
290 if ( mBuffer[i].evNum == evID
291 && mBuffer[i].runNum== runID ) {
292 return i ;
293 }
294 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
295 i += MAX_EVT ;
296 }
297
298
299 //event does not yet exist; create
300 if (evFree < 0 ) { //no space available in ctrl
301 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
302 factOut(kError,881, str ) ;
303 return -1 ;
304 }
305 i = evFree ; //found free entry; use it ...
306
307
308 needmem = sizeof(EVENT) + NPIX*nRoi*2 + NTMARK*nRoi*2; //
309
310 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
311
312 if ( gj.usdMem + needmem + headmem > g_maxMem) {
313 gj.maxMem = gj.usdMem + needmem + headmem ;
314 if (gi_memStat>0 ) {
315 gi_memStat = -99 ;
316 snprintf(str,MXSTR,"no memory left to keep event %d",evID) ;
317 factOut(kError,882, str ) ;
318 }
319 return -11 ;
320 }
321
322 mBuffer[i].FADhead = malloc( headmem ) ;
323 if (mBuffer[i].FADhead == NULL) {
324 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
325 factOut(kError,882, str ) ;
326 return -12;
327 }
328
329 mBuffer[i].fEvent = malloc( needmem ) ;
330 if (mBuffer[i].fEvent == NULL) {
331 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
332 factOut(kError,882, str ) ;
333 free(mBuffer[i].fEvent) ;
334 mBuffer[i].fEvent = NULL ;
335 return -22;
336 }
337
338 //flag all boards as unused
339 mBuffer[i].nBoard = 0 ;
340 for (k=0; k<NBOARDS; k++ ) {
341 mBuffer[i].board[k] = -1;
342 }
343
344 //flag all pixels as unused
345 for (k=0; k<NPIX; k++ ) {
346 mBuffer[i].fEvent->StartPix[k] = -1 ;
347 }
348
349 //flag all TMark as unused
350 for (k=0; k<NTMARK; k++ ) {
351 mBuffer[i].fEvent->StartTM[k] = -1 ;
352 }
353
354 mBuffer[i].pcTime = g_actTime ;
355 mBuffer[i].nRoi = nRoi ;
356 mBuffer[i].evNum = evID ;
357 mBuffer[i].runNum = runID ;
358 mBuffer[i].evtLen = needmem ;
359
360 gj.usdMem += needmem + headmem;
361 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
362
363 gj.rateNew++ ;
364
365 //register event in 'active list (reading)'
366
367 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
368 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
369 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
370 evtIdx[i] = evtCtrl.lastPtr ;
371
372
373snprintf(str,MXSTR,"%5d start new evt %8d %8d %2d",evID,i,evtCtrl.lastPtr,0);
374factOut(kDebug,-11, str ) ;
375 evtCtrl.lastPtr++ ;
376 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
377
378 gi.evtGet++ ;
379
380 //check if runId already registered in runCtrl
381 evFree = -1 ;
382 for (k=0; k<MAX_RUN; k++) {
383 if (runCtrl[k].runId == runID ) return i ;//run exists already
384 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
385 }
386
387 if (evFree <0 ) {
388 snprintf(str,MXSTR,"not able to register the new run %d",runID);
389 factOut(kFatal,883, str ) ;
390 } else {
391 runCtrl[evFree].runId = runID ;
392 runCtrl[evFree].fileId = -2 ;
393 runCtrl[evFree].nextEvt= 0;
394 runCtrl[evFree].actEvt = 0;
395 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
396 runCtrl[evFree].lastTime=g_actTime ;
397 runCtrl[evFree].closeTime=g_actTime + 3600*24 ; //max time allowed
398 runCtrl[evFree].lastTime = 0 ;
399
400 runTail[evFree].nEventsOk =
401 runTail[evFree].nEventsRej =
402 runTail[evFree].nEventsBad =
403 runTail[evFree].PCtime0 =
404 runTail[evFree].PCtimeX = 0 ;
405 }
406
407 return i ;
408
409} /*-----------------------------------------------------------------*/
410
411
412
413
414int mBufFree(int i) {
415//delete entry [i] from mBuffer:
416//(and make sure multiple calls do no harm ....)
417
418 int headmem=0 ;
419 size_t freemem = 0 ;
420
421 if ( mBuffer[i].nRoi > 0) { //have an fEvent structure generated ...
422 freemem = mBuffer[i].evtLen ;
423 free(mBuffer[i].fEvent ) ;
424 mBuffer[i].fEvent = NULL ;
425
426 free(mBuffer[i].FADhead ) ;
427 mBuffer[i].FADhead = NULL ;
428
429 }
430 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
431 mBuffer[i].evNum = mBuffer[i].runNum = mBuffer[i].nRoi= -1;
432
433 gj.usdMem = gj.usdMem - freemem - headmem;
434
435 if (gi_memStat < 0 ) {
436 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ;
437 }
438
439
440 return 0 ;
441
442} /*-----------------------------------------------------------------*/
443
444
445void resetEvtStat() {
446 int i ;
447
448 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
449
450 for (i=0; i<NBOARDS; i++ ) {
451 gi.gotByte[i] = 0 ;
452 gi.gotErr[i] = 0 ;
453 }
454
455 gi.evtGet = 0 ; //#new Start of Events read
456 gi.evtTot = 0 ; //#complete Events read
457 gi.evtErr = 0 ; //#Events with Errors
458 gi.evtSkp = 0 ; //#Events incomplete (timeout)
459
460 gi.procTot = 0 ; //#Events processed
461 gi.procErr = 0 ; //#Events showed problem in processing
462 gi.procTrg = 0 ; //#Events accepted by SW trigger
463 gi.procSkp = 0 ; //#Events rejected by SW trigger
464
465 gi.feedTot = 0 ; //#Events used for feedBack system
466 gi.feedErr = 0 ; //#Events rejected by feedBack
467
468 gi.wrtTot = 0 ; //#Events written to disk
469 gi.wrtErr = 0 ; //#Events with write-error
470
471 gi.runOpen = 0 ; //#Runs opened
472 gi.runClose= 0 ; //#Runs closed
473 gi.runErr = 0 ; //#Runs with open/close errors
474
475return ;
476} /*-----------------------------------------------------------------*/
477
478
479
/* Placeholder: there is currently nothing to initialize before readFAD runs. */
void initReadFAD() {
} /*-----------------------------------------------------------------*/
483
484
485
486void *readFAD( void *ptr ) {
487/* *** main loop reading FAD data and sorting them to complete events */
488 int head_len,frst_len,numok,numok2,dest,evID,i,k ;
489 int actBoards = 0, minLen ;
490 int32_t jrd ;
491 uint gi_SecTime ; //time in seconds
492 int boardId, roi,drs,px,src,pixS,pixH,pixC,pixR,tmS ;
493
494 int goodhed = 0 ;
495 int errcnt0 = 0 ;
496
497 int reset, resetR, resetS, resetW, resetX ;
498 int sockDef[NBOARDS]; //internal state of sockets
499
500
501
502 struct timespec xwait ;
503
504
505 struct timeval *tv, atv;
506 tv=&atv;
507 uint32_t tsec, tusec ;
508
509
510 snprintf(str,MXSTR,"start initializing");
511 factOut(kInfo,-1, str ) ;
512
513 int cpu = 7 ; //read thread
514 cpu_set_t mask;
515
516/* CPU_ZERO initializes all the bits in the mask to zero. */
517 CPU_ZERO( &mask );
518/* CPU_SET sets only the bit corresponding to cpu. */
519 cpu = 7 ;
520 CPU_SET( cpu, &mask );
521
522/* sched_setaffinity returns 0 in success */
523 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
524 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
525 factOut(kWarn,-1, str ) ;
526 }
527
528 head_len = sizeof(PEVNT_HEADER) ;
529 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
530 minLen = head_len ; //min #bytes needed to check header: full header for debug
531
532 start.S=0xFB01;
533 stop.S= 0x04FE;
534
535/* initialize run control logics */
536 for (i=0; i<MAX_RUN; i++) {
537 runCtrl[i].runId = 0 ;
538 runCtrl[i].fileId = -2 ;
539 }
540 resetS = resetR = 9;
541
542
543START:
544 evtCtrl.frstPtr = 0 ;
545 evtCtrl.lastPtr = 0 ;
546
547 gi_myRun = g_actTime ;
548 gi_SecTime = g_actTime ;
549 gi_runStat = g_runStat ;
550 gj.readStat= g_runStat ;
551 numok = numok2 = 0 ;
552
553 if ( resetS > 0) {
554 //make sure all sockets are preallocated as 'not exist'
555 for (i=0; i<MAX_SOCK; i++) {
556 rd[i].socket = -1 ;
557 rd[i].sockStat = 99 ;
558 }
559
560 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
561
562 }
563
564
565 if ( resetR > 0) {
566 resetEvtStat();
567 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
568 gj.totMem = g_maxMem ;
569 gj.bufNew = gj.bufEvt = 0 ;
570 gj.evtSkip= gj.evtWrite = gj.evtErr = 0 ;
571
572 for (k=0; k<NBOARDS; k++) {
573 gi_NumConnect[k]=0;
574 gi.numConn[k] =0;
575 gj.numConn[k] =0;
576 gj.errConn[k] =0;
577 gj.rateBytes[k] =0;
578 gj.totBytes[k] =0;
579 }
580
581 mBufInit() ; //initialize buffers
582
583 snprintf(str,MXSTR,"end initializing");
584 factOut(kInfo,-1, str ) ;
585 }
586
587
588 reset = resetR = resetS = resetW = 0 ;
589
590 while (g_runStat >=0 || g_reset ==0 ) { //loop until global variable g_runStat claims stop
591
592 gi_runStat = g_runStat;
593 gj.readStat= g_runStat;
594
595 int b,p,p0,s0,nch;
596 nch = 0 ;
597 for (b=0; b<NBOARDS; b++ ) {
598 k = b*7 ;
599 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
600 nch++ ;
601 gi_NumConnect[ b ] = 0 ; //must close all connections
602 gi.numConn[ b ] = 0;
603 gj.numConn[ b ] = 0;
604 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
605 else if (g_port[b].sockDef == 0) s0=-1 ; //sockets to be destroyed
606 else s0=+1 ; //sockets to be closed and reopened
607
608 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
609 else p0=0 ;
610
611 for (p=p0+1; p<p0+8; p++) {
612 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
613 k++ ;
614 }
615 sockDef[b] = g_port[b].sockDef ;
616 }
617 }
618
619 if (nch > 0 ) {
620 actBoards = 0 ;
621 for (b=0; b<NBOARDS; b++ ) {
622 if ( sockDef[b] > 0 ) actBoards++ ;
623 }
624 }
625
626
627
628
629 numok = 0 ; //count number of succesfull actions
630
631 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
632 b = i / 7 ;
633 if (sockDef[b] > 0) s0=+1 ;
634 else s0=-1 ;
635
636 gettimeofday( tv, NULL);
637 g_actTime = tsec = atv.tv_sec ;
638 tusec= atv.tv_usec ;
639
640 if (rd[i].sockStat <0 ) { //try to connect if not yet done
641 rd[i].sockStat=connect(rd[i].socket,
642 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
643 if (rd[i].sockStat ==0 ) { //successfull ==>
644 if (sockDef[b] > 0) {
645 rd[i].bufTyp = 0 ; // expect a header
646 rd[i].bufLen = frst_len ; // max size to read at begining
647 } else {
648 rd[i].bufTyp = -1 ; // full data to be skipped
649 rd[i].bufLen = sizeof(CNV_FACT) ; //huge for skipping
650 }
651 rd[i].bufPos = 0 ; // no byte read so far
652 rd[i].skip = 0 ; // start empty
653 gi_NumConnect[ b ]++ ;
654 gi.numConn[ b ]++ ;
655 gj.numConn[ b ]++ ;
656 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
657 factOut(kInfo,-1, str ) ;
658 }
659 }
660
661 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
662 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
663 numok++ ;
664 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
665
666 if (jrd >0 ) {
667 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
668 }
669
670 if (jrd == 0) { //connection has closed ...
671 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
672 factOut(kInfo,441, str ) ;
673 GenSock(s0, i, 0,NULL, &rd[i]) ;
674 gi.gotErr[ b ]++ ;
675 gi_NumConnect[ b ]-- ;
676 gi.numConn[ b ]-- ;
677 gj.numConn[ b ]-- ;
678
679 } else if ( jrd<0 ) { //did not read anything
680 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
681 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
682 factOut(kError,442, str ) ;
683 gi.gotErr[ b ]++ ;
684 } else numok-- ; //else nothing waiting to be read
685 jrd = 0 ;
686 }
687 } else jrd = 0 ; //did read nothing as requested
688
689 gi.gotByte[ b ] += jrd ;
690 gj.rateBytes[b] += jrd ;
691
692 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
693// just do nothing
694
695 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
696 if ( jrd < rd[i].bufLen ) { //not yet all read
697 rd[i].bufPos += jrd ; //==> prepare for continuation
698 rd[i].bufLen -= jrd ;
699 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
700 } else { //full dataset read
701 rd[i].bufLen = 0 ;
702 rd[i].bufPos = rd[i].fadLen ;
703 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
704 && rd[i].rBuf->B[ rd[i].bufPos ] != stop.B[1]) {
705 gi.evtErr++ ;
706 snprintf(str,MXSTR,"wrong end of buffer found %d",rd[i].bufPos);
707 factOut(kError,301, str ) ;
708 goto EndBuf ;
709
710 }
711 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
712
713 //we have a complete buffer, copy to WORK area
714 roi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
715 //get index into mBuffer for this event (create if needed)
716 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi ) ;
717
718 if (evID <-9000) goto EndBuf ; //illegal event, skip it ...
719 if (evID < 0) {
720 xwait.tv_sec = 0;
721 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
722 nanosleep( &xwait , NULL ) ;
723 goto EndBuf1 ; //hope there is free space next round
724 }
725
726
727 //we have a valid entry in mBuffer[]; fill it
728
729 boardId = b ;
730 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
731 int fadCrate = fadBoard/256 ;
732 if (boardId != (fadCrate*10 + fadBoard%256) ) {
733 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
734 if (errcnt0++ < 99 ) factOut(kWarn,301, str ) ; //print only few times
735 }
736 if ( mBuffer[evID].board[ boardId ] != -1) {
737 snprintf(str,MXSTR,"double board %d for event %d",boardId,evID) ;
738 factOut(kWarn,501, str ) ;
739 goto EndBuf ; //--> skip Board
740 }
741
742 int iDx = evtIdx[evID] ; //index into evtCtrl
743
744 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
745 &rd[i].rBuf->S[0], head_len) ;
746 roi = mBuffer[evID].nRoi ;
747
748 pixS = boardId*36 -1 ; //
749 tmS = boardId*4 -1 ; //
750 src = head_len/2 ;
751 for ( drs=0; drs<4; drs++ ) {
752 for ( px=0; px<9; px++ ) {
753 pixH= ntohs(rd[i].rBuf->S[src++]) ;
754 pixC= ntohs(rd[i].rBuf->S[src++]) ;
755 pixR= ntohs(rd[i].rBuf->S[src++]) ;
756
757 src++ ;
758 pixS++ ; //pixS = pixH2S[pixH] ;
759 if ( ( px < PX8 && pixR == roi )
760 || ( px ==PX8 && pixR == 2*roi )
761 || ( px ==PX8 && pixR == roi && roi > 512 ) ) {
762 // correct roi
763 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
764 dest= pixS * roi ;
765 memcpy(
766 &mBuffer[evID].fEvent->Adc_Data[dest],
767 &rd[i].rBuf->S[src], roi * 2) ;
768 src+= roi ;
769 if ( px==PX8 ) {
770 tmS++; // tmS = tmH2S[pixH]
771 dest= tmS * roi + NPIX* roi ;
772 if ( roi <=512 ) {
773 mBuffer[evID].fEvent->StartTM[tmS] =(pixC+roi)%1024 ;
774 memcpy(
775 &mBuffer[evID].fEvent->Adc_Data[dest],
776 &rd[i].rBuf->S[src], roi * 2) ;
777 src+=roi ;
778 } else {
779 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
780 }
781 }
782 } else {
783 snprintf(str,MXSTR,"wrong roi %d %d %d %d",px,pixR,roi,src-2);
784 gi.evtErr++ ;
785 factOut(kError,202, str ) ;
786 goto EndBuf ;
787 }
788 }
789 }// now we have stored a new board contents into Event structure
790
791 mBuffer[evID].board[ boardId ] = boardId ;
792 evtCtrl.evtStat[ iDx ]++ ;
793 evtCtrl.pcTime[ iDx ] = g_actTime ;
794
795 if (++mBuffer[evID].nBoard >= actBoards ) {
796 snprintf(str,MXSTR,"%5d complete event %8d %8d %2d",mBuffer[evID].evNum,evtCtrl.evtBuf[iDx],iDx,evtCtrl.evtStat[ iDx ]);
797 factOut(kDebug,-1, str ) ;
798 //complete event read ---> flag for next processing
799 evtCtrl.evtStat[ iDx ] = 99;
800 gi.evtTot++ ;
801 }
802
803EndBuf:
804 rd[i].bufTyp = 0 ; //ready to read next header
805 rd[i].bufLen = frst_len ;
806 rd[i].bufPos = 0 ;
807EndBuf1:
808 ;
809 }
810
811 } else { //we are reading event header
812 rd[i].bufPos += jrd ;
813 rd[i].bufLen -= jrd ;
814 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
815 //check if startflag correct; else shift block ....
816 for (k=0; k<rd[i].bufPos -1 ; k++) {
817 if (rd[i].rBuf->B[k ] == start.B[1]
818 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
819 }
820 rd[i].skip += k ;
821
822 if (k >= rd[i].bufPos-1 ) { //no start of header found
823// snprintf(str,MXSTR,"no start of header on port%d", i ) ;
824// factOut(kWarn,666, str ) ;
825
826 rd[i].bufPos = 0 ;
827 rd[i].bufLen = head_len ;
828 } else if ( k>0 ) {
829 rd[i].bufPos -= k ;
830 rd[i].bufLen += k ;
831 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
832 }
833 if ( rd[i].bufPos >= minLen ) {
834 if ( rd[i].skip >0 ) {
835 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
836 factOut(kInfo,666, str ) ;
837 rd[i].skip = 0 ;
838 }
839 goodhed++;
840 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
841 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
842 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
843 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
844 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
845 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
846 rd[i].bufTyp = 1 ; //ready to read full record
847 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
848 if (rd[i].bufLen <=0 ) rd[i].bufLen = 100000 ; //?
849 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
850 debugHead(i,fadBoard,rd[i].rBuf);
851 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
852 } else {
853 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
854 }
855 } else {
856 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
857 }
858
859 } //end interpreting last read
860 } //end of successful read anything
861 } //finished trying to read all sockets
862
863 gi.numRead[ numok ] ++ ;
864
865 g_actTime = time(NULL) ;
866 if ( g_actTime > gi_SecTime ) {
867 gi_SecTime = g_actTime ;
868
869
870 //loop over all active events and flag those older than read-timeout
871 //delete those that are written to disk ....
872
873 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
874 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
875
876 gj.bufNew = gj.bufEvt = 0 ;
877 int k1=evtCtrl.frstPtr;
878 for ( k=k1; k<(k1+kd); k++ ) {
879 int k0 = k % (MAX_EVT*MAX_RUN) ;
880//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
881
882 if (evtCtrl.evtStat[k0] > 0
883 && evtCtrl.evtStat[k0] < 92 ) {
884 gj.bufNew++ ; //incomplete event in Buffer
885 if ( evtCtrl.evtStat[k0] < 90
886 && evtCtrl.pcTime[k0] < g_actTime-10 ) {
887 int id =evtCtrl.evtBuf[k0] ;
888 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
889 factOut(kWarn,601, str ) ;
890 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
891 gi.evtSkp++ ;
892 gi.evtTot++ ;
893 gj.evtSkip++;
894 }
895 } else if (evtCtrl.evtStat[k0] >= 900 ) {
896
897 int id =evtCtrl.evtBuf[k0] ;
898 snprintf(str,MXSTR,"%5d free event buffer (written) %3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
899 factOut(kDebug,-1, str ) ;
900 mBufFree(id) ; //event written--> free memory
901 evtCtrl.evtStat[k0] = -1;
902 gj.evtWrite++ ;
903 gj.rateWrite++ ;
904 } else if (evtCtrl.evtStat[k0] >= 95 ) {
905 gj.bufEvt++ ; //complete event in Buffer
906 }
907
908 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
909 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
910 }
911 }
912
913
914 gj.deltaT = 1000 ; //temporary, must be improved
915
916 int b;
917 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ;
918 gj.totMem = g_maxMem ;
919 if (gj.maxMem >= gj.xxxMem) gj.xxxMem = gj.maxMem ;
920
921 factStat(gj);
922 factStatNew(gi) ;
923
924 gj.rateNew = gj.rateWrite = 0 ;
925 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ;
926 }
927
928 if (numok > 0 ) numok2=0;
929 else if (numok2++ > 3) {
930 if (g_runStat == 1) {
931 xwait.tv_sec = 1;
932 xwait.tv_nsec= 0 ; // hibernate for 1 sec
933 } else {
934 xwait.tv_sec = 0;
935 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
936 }
937 nanosleep( &xwait , NULL ) ;
938 }
939
940 } //and do next loop over all sockets ...
941
942
943 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset);
944 factOut(kInfo,-1, str ) ;
945
946 if (g_reset >0 ) {
947 reset = g_reset ;
948 resetR = reset%10 ; //shall we stop reading ?
949 resetS = (reset/10)%10 ; //shall we close sockets ?
950 resetW = (reset/100)%10 ; //shall we close files ?
951 resetX = reset/1000 ; //shall we simply wait resetX seconds ?
952 g_reset= 0;
953 } else {
954 reset = 0;
955 if ( g_runStat==-1 ) resetR = 1 ;
956 else resetR = 7 ;
957 resetS = 7 ; //close all sockets
958 resetW = 7 ; //close all files
959 resetX = 0 ;
960
961 //inform others we have to quit ....
962 gi_runStat = -11 ; //inform all that no update to happen any more
963 gj.readStat= -11 ; //inform all that no update to happen any more
964 }
965
966 if (resetS > 0) {
967 //must close all open sockets ...
968 snprintf(str,MXSTR,"close all sockets ...");
969 factOut(kInfo,-1, str ) ;
970 for (i=0; i<MAX_SOCK; i++) {
971 if (rd[i].sockStat ==0 ) {
972 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy socket
973 gi_NumConnect[ i/7 ]-- ;
974 gi.numConn[ i/7 ]-- ;
975 gj.numConn[ i/7 ]-- ;
976 }
977 }
978 }
979
980
981 if (resetR > 0) {
982 //flag all events as 'read finished'
983 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
984 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
985
986 int k1=evtCtrl.frstPtr;
987 for ( k=k1; k<(k1+kd); k++ ) {
988 int k0 = k % (MAX_EVT*MAX_RUN) ;
989 if (evtCtrl.evtStat[k0] > 0
990 && evtCtrl.evtStat[k0] < 90 ) {
991 evtCtrl.evtStat[k0] = 91 ;
992 gi.evtSkp++ ;
993 gi.evtTot++ ;
994 }
995 }
996
997 xwait.tv_sec = 0;
998 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
999 nanosleep( &xwait , NULL ) ;
1000
1001 //and clear all buffers (might have to wait until all others are done)
1002 int minclear ;
1003 if (resetR == 1) {
1004 minclear = 900 ;
1005 snprintf(str,MXSTR,"drain all buffers ...");
1006 } else {
1007 minclear = 0 ;
1008 snprintf(str,MXSTR,"flush all buffers ...");
1009 }
1010 factOut(kInfo,-1, str ) ;
1011
1012 int numclear=1 ;
1013 while (numclear > 0 ) {
1014 numclear = 0 ;
1015 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1016 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1017
1018 int k1=evtCtrl.frstPtr;
1019 for ( k=k1; k<(k1+kd); k++ ) {
1020 int k0 = k % (MAX_EVT*MAX_RUN) ;
1021 if (evtCtrl.evtStat[k0] > minclear ) {
1022 int id =evtCtrl.evtBuf[k0] ;
1023 mBufFree(id) ; //event written--> free memory
1024 evtCtrl.evtStat[k0] = -1;
1025 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
1026
1027 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
1028 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
1029 }
1030
1031 xwait.tv_sec = 0;
1032 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1033 nanosleep( &xwait , NULL ) ;
1034 }
1035 }
1036
1037 if (reset > 0) {
1038 if (resetW > 0) {
1039 CloseRunFile(0,0,0) ; //ask all Runs to be closed
1040 }
1041 if (resetX > 0) {
1042 xwait.tv_sec = resetX;
1043 xwait.tv_nsec= 0 ;
1044 nanosleep( &xwait , NULL ) ;
1045 }
1046
1047 snprintf(str,MXSTR,"Continue read Process ...");
1048 factOut(kInfo,-1, str ) ;
1049 reset = 0 ;
1050 goto START ;
1051 }
1052
1053
1054
1055 snprintf(str,MXSTR,"Exit read Process ...");
1056 factOut(kInfo,-1, str ) ;
1057 gi_runStat = -99 ;
1058 gj.readStat= -99 ;
1059 factStat(gj);
1060 factStatNew(gi) ;
1061 return 0;
1062
1063} /*-----------------------------------------------------------------*/
1064
1065
1066void *procEvt( void *ptr ) {
1067/* *** main loop processing file, including SW-trigger */
1068 int numProc, numWait ;
1069 int k ;
1070 struct timespec xwait ;
1071 char str[MXSTR] ;
1072
1073 cpu_set_t mask;
1074 int cpu = 5 ; //process thread (will be several in final version)
1075
1076 snprintf(str,MXSTR,"Starting process-thread");
1077 factOut(kInfo,-1, str ) ;
1078
1079/* CPU_ZERO initializes all the bits in the mask to zero. */
1080 CPU_ZERO( &mask );
1081/* CPU_SET sets only the bit corresponding to cpu. */
1082 CPU_SET( cpu, &mask );
1083/* sched_setaffinity returns 0 in success */
1084 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1085 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1086 factOut(kWarn,-1, str ) ;
1087 }
1088
1089
1090 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1091
1092 numWait = numProc = 0 ;
1093 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1094 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1095
1096 int k1=evtCtrl.frstPtr;
1097 for ( k=k1; k<(k1+kd); k++ ) {
1098 int k0 = k % (MAX_EVT*MAX_RUN) ;
1099//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1100 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1101 int id = evtCtrl.evtBuf[k0] ;
1102 int ievt = mBuffer[id].evNum ;
1103 int roi = mBuffer[id].nRoi ;
1104// uint32_t irun = mBuffer[id].runNum ;
1105//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1106//factOut(kDebug,-1, str ) ;
1107
1108//make sure unused pixels/tmarks are cleared to zero
1109 int ip,it,dest,ib;
1110 for (ip=0; ip<NPIX; ip++) {
1111 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1112 dest= ip*roi ;
1113 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1114 }
1115 }
1116 for (it=0; it<NTMARK; it++) {
1117 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1118 dest= it*roi + NPIX*roi ;
1119 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1120 }
1121 }
1122
1123
1124//and set correct event header ; also check for consistency in event (not yet)
1125 mBuffer[id].fEvent->Roi = roi ;
1126 mBuffer[id].fEvent->EventNum = ievt ;
1127 mBuffer[id].fEvent->TriggerType = 0 ; // TBD
1128 mBuffer[id].fEvent->SoftTrig = 0 ;
1129 for (ib=0; ib<NBOARDS; ib++) {
1130 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1131 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1132 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1133 } else {
1134 mBuffer[id].fEvent->BoardTime[ib] =
1135 ntohl(mBuffer[id].FADhead[ib].time) ;
1136 }
1137 }
1138
1139 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1140// gj.procEvt++ ;
1141 gi.procTot++ ;
1142 numProc++ ;
1143 evtCtrl.evtStat[k0] = 520 ;
1144
1145 if (i<0) {
1146 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1147 gi.procErr++ ;
1148 }
1149 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1150 numWait++ ;
1151 }
1152 }
1153
1154 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1155 snprintf(str,MXSTR,"Exit Processing Process ...");
1156 factOut(kInfo,-1, str ) ;
1157 gp_runStat = -22 ; //==> we should exit
1158 gj.procStat= -22 ; //==> we should exit
1159 return 0 ;
1160 }
1161
1162 if (numProc == 0) {
1163 //seems we have nothing to do, so sleep a little
1164 xwait.tv_sec = 0;
1165 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1166 nanosleep( &xwait , NULL ) ;
1167 }
1168 gp_runStat = gi_runStat ;
1169 gj.procStat= gj.readStat ;
1170
1171 }
1172
1173 //we are asked to abort asap ==> must flag all remaining events
1174 // when gi_runStat claims that all events are in the buffer...
1175
1176 snprintf(str,MXSTR,"Abort Processing Process ...");
1177 factOut(kInfo,-1, str ) ;
1178 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1179 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1180
1181 int k1=evtCtrl.frstPtr;
1182 for ( k=k1; k<(k1+kd); k++ ) {
1183 int k0 = k % (MAX_EVT*MAX_RUN) ;
1184 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1185 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1186 }
1187 }
1188
1189 gp_runStat = -99 ;
1190 gj.procStat= -99 ;
1191
1192 return 0;
1193
1194} /*-----------------------------------------------------------------*/
1195
1196int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1197/* close run runId (all all runs if runId=0) */
1198/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1199 int j ;
1200
1201
1202 if ( runId == 0 ) {
1203 for ( j=0; j<MAX_RUN; j++) {
1204 if ( runCtrl[j].fileId == 0 ) { //run is open
1205 runCtrl[j].closeTime = closeTime ;
1206 runCtrl[j].maxEvt = maxEvt ;
1207 }
1208 }
1209 return 0;
1210 }
1211
1212 for ( j=0; j<MAX_RUN; j++) {
1213 if ( runCtrl[j].runId == runId ) {
1214 if ( runCtrl[j].fileId == 0 ) { //run is open
1215 runCtrl[j].closeTime = closeTime ;
1216 runCtrl[j].maxEvt = maxEvt ;
1217 return 0;
1218 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1219 runCtrl[j].closeTime = closeTime ;
1220 runCtrl[j].maxEvt = maxEvt ;
1221 return +1;
1222 } else { // run already closed
1223 return +2;
1224 }
1225 }
1226 } //we only reach here if the run was never created
1227 return -1;
1228
1229} /*-----------------------------------------------------------------*/
1230
1231
1232void *writeEvt( void *ptr ) {
1233/* *** main loop writing event (including opening and closing run-files */
1234
1235 int numWrite, numWait ;
1236 int k,j,i ;
1237 struct timespec xwait ;
1238 char str[MXSTR] ;
1239
1240 cpu_set_t mask;
1241 int cpu = 3 ; //write thread
1242
1243 snprintf(str,MXSTR,"Starting write-thread");
1244 factOut(kInfo,-1, str ) ;
1245
1246/* CPU_ZERO initializes all the bits in the mask to zero. */
1247 CPU_ZERO( &mask );
1248/* CPU_SET sets only the bit corresponding to cpu. */
1249 CPU_SET( cpu, &mask );
1250/* sched_setaffinity returns 0 in success */
1251 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1252 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1253 }
1254
1255 int lastRun = 0 ; //usually run from last event still valid
1256
1257 while (g_runStat >-2) {
1258
1259 numWait = numWrite = 0 ;
1260 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1261 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1262
1263 int k1=evtCtrl.frstPtr;
1264 for ( k=k1; k<(k1+kd); k++ ) {
1265 int k0 = k % (MAX_EVT*MAX_RUN) ;
1266//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1267 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1268 int id = evtCtrl.evtBuf[k0] ;
1269 uint32_t irun = mBuffer[id].runNum ;
1270 int ievt = mBuffer[id].evNum ;
1271
1272 gi.wrtTot++ ;
1273 if (runCtrl[lastRun].runId == irun ) {
1274 j = lastRun ;
1275 } else {
1276 //check which fileID to use (or open if needed)
1277 for ( j=0; j<MAX_RUN; j++) {
1278 if ( runCtrl[j].runId == irun ) break ;
1279 }
1280 if ( j >= MAX_RUN ) {
1281 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1282 factOut(kFatal,901, str ) ;
1283 gi.wrtErr++ ;
1284for ( j=0; j<MAX_RUN; j++) printf("j %d run.j %d run %d\n",j,runCtrl[j].runId,irun );
1285exit(111);
1286 }
1287 lastRun = j ;
1288 }
1289
1290 if (runCtrl[j].fileId < 0 ) {
1291 actRun.Version = 1 ;
1292 actRun.RunType = -1 ;
1293 actRun.NBoard = NBOARDS ;
1294 actRun.NPix = NPIX ;
1295 actRun.NTm = NTMARK ;
1296 actRun.Nroi = mBuffer[id].nRoi ;
1297// actRun.FADhead = mBuffer[id].FADhead ; //to be corrected
1298
1299 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1300 if (runCtrl[j].fileHd == NULL ) {
1301 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1302 factOut(kError,502, str ) ;
1303 runCtrl[j].fileId = 99 ;
1304 } else {
1305 snprintf(str,MXSTR,"W opened new run_file %d",irun) ;
1306 factOut(kInfo,-1, str ) ;
1307 runCtrl[j].fileId = 0 ;
1308 }
1309
1310 }
1311
1312 if (runCtrl[j].fileId != 0 ) {
1313 snprintf(str,MXSTR,"W no open file for this run %d",irun) ;
1314 factOut(kWarn,123,str) ;
1315 evtCtrl.evtStat[k0] = 902 ;
1316 gi.wrtErr++ ;
1317 } else {
1318 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1319 if ( i>=0 ) {
1320 runCtrl[j].lastTime = g_actTime;
1321 runCtrl[j].actEvt++ ;
1322 evtCtrl.evtStat[k0] = 901 ;
1323 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1324 factOut(kDebug,504, str ) ;
1325// gj.writEvt++ ;
1326 } else {
1327 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1328 factOut(kError,503, str ) ;
1329 evtCtrl.evtStat[k0] = 901 ;
1330 gi.wrtErr++ ;
1331 }
1332
1333 if ( i < 0
1334 || runCtrl[j].closeTime < g_actTime
1335 || runCtrl[j].lastTime < g_actTime-300
1336 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1337int ii =0 ;
1338if ( i < 0 ) ii=1 ;
1339else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1340else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1341else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1342
1343
1344
1345 //close run for whatever reason
1346 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1347 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1348 if (i<0) {
1349 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1350 factOut(kError,503, str ) ;
1351 runCtrl[j].fileId = 9001 ;
1352 } else {
1353 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1354 factOut(kInfo,503, str ) ;
1355 runCtrl[j].fileId = 901 ;
1356 }
1357 }
1358 }
1359 } else if (evtCtrl.evtStat[k0] > 0 ) numWait++ ;
1360 }
1361
1362 //check if we should close a run (mainly when no event pending)
1363 for ( j=0; j<MAX_RUN; j++) {
1364 if ( runCtrl[j].fileId==0
1365 && ( runCtrl[j].closeTime < g_actTime
1366 ||runCtrl[j].lastTime < g_actTime-300
1367 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1368 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1369int ii =0 ;
1370 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1371else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1372else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1373
1374
1375 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1376 if (i<0) {
1377 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1378 factOut(kError,506, str ) ;
1379 runCtrl[j].fileId = 9011 ;
1380 } else {
1381 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1382 factOut(kInfo,507, str ) ;
1383 runCtrl[j].fileId = 911 ;
1384 }
1385 }
1386 }
1387
1388 if (numWrite == 0) {
1389 //seems we have nothing to do, so sleep a little
1390 xwait.tv_sec = 0;
1391 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1392 nanosleep( &xwait , NULL ) ;
1393 }
1394
1395 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1396 snprintf(str,MXSTR,"Finish Write Process ...");
1397 factOut(kInfo,-1, str ) ;
1398 gw_runStat = -22 ; //==> we should exit
1399 gj.writStat= -22 ; //==> we should exit
1400 goto closerun ;
1401 }
1402 gw_runStat = gi_runStat ;
1403 gj.writStat= gj.readStat ;
1404
1405 }
1406
1407 //must close all open files ....
1408 snprintf(str,MXSTR,"Abort Writing Process ...");
1409 factOut(kInfo,-1, str ) ;
1410closerun:
1411 snprintf(str,MXSTR,"Close all open files ...");
1412 factOut(kInfo,-1, str ) ;
1413 for ( j=0; j<MAX_RUN; j++)
1414 if ( runCtrl[j].fileId ==0 ) {
1415 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1416 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1417int ii =0 ;
1418 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1419else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1420else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1421 if (i<0) {
1422 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1423 factOut(kError,506, str ) ;
1424 runCtrl[j].fileId = 9021 ;
1425 } else {
1426 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1427 factOut(kInfo,507, str ) ;
1428 runCtrl[j].fileId = 921 ;
1429 }
1430 }
1431
1432 gw_runStat = -99;
1433 gj.writStat= -99;
1434 snprintf(str,MXSTR,"Exit Writing Process ...");
1435 factOut(kInfo,-1, str ) ;
1436 return 0;
1437
1438
1439
1440
1441} /*-----------------------------------------------------------------*/
1442
1443
1444
1445
1446void StartEvtBuild() {
1447
1448 int i,j,imax,status,th_ret[50] ;
1449 pthread_t thread[50] ;
1450 struct timespec xwait ;
1451
1452 gi_runStat = gp_runStat = gw_runStat = 0 ;
1453 gj.readStat= gj.procStat= gj.writStat= 0 ;
1454
1455 snprintf(str,MXSTR,"Starting EventBuilder");
1456 factOut(kInfo,-1, str ) ;
1457
1458//start all threads (more to come) when we are allowed to ....
1459 while (g_runStat == 0 ) {
1460 xwait.tv_sec = 0;
1461 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
1462 nanosleep( &xwait , NULL ) ;
1463 }
1464
1465 i=0 ;
1466 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1467 i++;
1468 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1469 i++;
1470 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1471 i++;
1472 imax=i ;
1473
1474
1475#ifdef BILAND
1476 xwait.tv_sec = 20;;
1477 xwait.tv_nsec= 0 ; // sleep for ~20sec
1478 nanosleep( &xwait , NULL ) ;
1479
1480 printf("close all runs in 2 seconds\n");
1481
1482 CloseRunFile( 0, time(NULL)+2, 0) ;
1483
1484 xwait.tv_sec = 5;;
1485 xwait.tv_nsec= 0 ; // sleep for ~20sec
1486 nanosleep( &xwait , NULL ) ;
1487
1488 printf("setting g_runstat to -1\n");
1489
1490 g_runStat = -1 ;
1491#endif
1492
1493
1494//wait for all threads to finish
1495 for (i=0; i<imax; i++) {
1496 j = pthread_join ( thread[i], (void **)&status) ;
1497 }
1498
1499} /*-----------------------------------------------------------------*/
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515 /*-----------------------------------------------------------------*/
1516 /*-----------------------------------------------------------------*/
1517 /*-----------------------------------------------------------------*/
1518 /*-----------------------------------------------------------------*/
1519 /*-----------------------------------------------------------------*/
1520
1521
1522#ifdef BILAND
1523
1524
1525 /*-----------------------------------------------------------------*/
1526 /*-----------------------------------------------------------------*/
1527 /*-----------------------------------------------------------------*/
1528 /*-----------------------------------------------------------------*/
1529 /*-----------------------------------------------------------------*/
1530
1531
1532
1533
1534FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1535{ return 1; } ;
1536
1537int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1538{ return 1; } ;
1539
1540int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1541{ return 1; } ;
1542
1543
1544
1545int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1546{
1547 int i=0;
1548
1549 printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1550 for (i=0; i<NBOARDS; i++) {
1551 printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1552 }
1553 return 0;
1554}
1555
1556
1557void factStatNew(EVT_STAT gi) {
1558 int i ;
1559
1560 for (i=0;i<MAX_SOCK;i++) {
1561 printf("%4d",gi.numRead[i]);
1562 if (i%20 == 0 ) printf("\n");
1563 }
1564}
1565
1566
1567void factStat(GUI_STAT gj) {
1568// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1569// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1570}
1571
1572
/*
 * debugRead - dummy replacement used in the stand-alone (BILAND) test build.
 *
 * Accepts the per-read debug information and discards it.
 * (A former debug print of isock, ibyte, event, state and tusec is disabled.)
 */
void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt,
               int32_t runnr, int state, uint32_t tsec, uint32_t tusec )
{
   (void)isock ;
   (void)ibyte ;
   (void)event ;
   (void)ftmevt ;
   (void)runnr ;
   (void)state ;
   (void)tsec ;
   (void)tusec ;
}
1576
1577
1578
/*
 * debugStream - dummy replacement used in the stand-alone (BILAND) test build.
 *
 * Accepts a socket index and a data buffer and ignores both; the buffer
 * is neither read nor modified.
 */
void debugStream(int isock, void *buf, int len)
{
   (void)isock ;
   (void)buf ;
   (void)len ;
}
1581
/*
 * debugHead - dummy replacement used in the stand-alone (BILAND) test build.
 *
 * Accepts two indices and a header buffer and ignores all of them; the
 * buffer is neither read nor modified.
 */
void debugHead(int i, int j, void *buf)
{
   (void)i ;
   (void)j ;
   (void)buf ;
}
1584
1585
1586void factOut(int severity, int err, char* message ) {
1587static FILE * fd ;
1588static int file=0 ;
1589
1590 if (file==0) {
1591 printf("open file\n");
1592 fd=fopen("x.out","w+") ;
1593 file=999;
1594 }
1595
1596 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1597
1598 if (severity != kDebug)
1599 printf("%3d %3d | %s\n",severity,err,message) ;
1600}
1601
1602
1603
1604int main() {
1605 int i,b,c,p ;
1606 char ipStr[100] ;
1607 struct in_addr IPaddr ;
1608
1609 g_maxMem = 1024*1024 ; //MBytes
1610 g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1611
1612
1613 g_runStat = 40 ;
1614
1615 i=0 ;
1616
1617// version for standard crates
1618//for (c=0; c<4,c++) {
1619// for (b=0; b<10; b++) {
1620// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1621//
1622// inet_pton(PF_INET, ipStr, &IPaddr) ;
1623//
1624// g_port[i].sockAddr.sin_family = PF_INET;
1625// g_port[i].sockAddr.sin_port = htons(5000) ;
1626// g_port[i].sockAddr.sin_addr = IPaddr ;
1627// g_port[i].sockDef = 1 ;
1628// i++ ;
1629// }
1630//}
1631//
1632//version for PC-test *
1633 for (c=0; c<4; c++) {
1634 for (b=0; b<10; b++) {
1635 sprintf(ipStr,"10.0.%d.11",128+c) ;
1636 if (c==0) sprintf(ipStr,"10.0.100.11") ;
1637
1638 inet_pton(PF_INET, ipStr, &IPaddr) ;
1639 p = 31919+100*c+10*b;
1640
1641
1642 g_port[i].sockAddr.sin_family = PF_INET;
1643 g_port[i].sockAddr.sin_port = htons(p) ;
1644 g_port[i].sockAddr.sin_addr = IPaddr ;
1645 g_port[i].sockDef = 1 ;
1646
1647 i++ ;
1648 }
1649 }
1650
1651
1652//g_port[17].sockDef =-1 ;
1653//g_actBoards-- ;
1654
1655 StartEvtBuild() ;
1656
1657 return 0;
1658
1659}
1660#endif
Note: See TracBrowser for help on using the repository browser.