source: trunk/FACT++/src/EventBuilder.c@ 11711

Last change on this file since 11711 was 11689, checked in by tbretz, 14 years ago
Added latest changes in the EventBuilder; consequently added TriggerNum and NumBoards to Event Header in FITS.
File size: 59.6 KB
Line 
1
2// // // #define EVTDEBUG
3
4
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <unistd.h>
9#include <stdio.h>
10#include <sys/time.h>
11#include <arpa/inet.h>
12#include <string.h>
13#include <math.h>
14#include <error.h>
15#include <errno.h>
16#include <unistd.h>
17#include <sys/types.h>
18#include <sys/socket.h>
19#include <pthread.h>
20#include <sched.h>
21
22#include "EventBuilder.h"
23
24enum Severity
25{
26 kMessage = 10, ///< Just a message, usually obsolete
27 kInfo = 20, ///< An info telling something which can be interesting to know
28 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
29 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
30 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
31 kDebug = 99, ///< A message used for debugging only
32};
33
34#define MIN_LEN 32 // min #bytes needed to interpret FADheader
35#define MAX_LEN 256*1024 // size of read-buffer per socket
36
37//#define nanosleep(x,y)
38
39extern FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len ) ;
40extern int runWrite(FileHandle_t fileHd , EVENT *event, size_t len ) ;
41extern int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len ) ;
42extern void factOut(int severity, int err, char* message ) ;
43extern void gotNewRun( int runnr, PEVNT_HEADER *headers );
44
45
46extern void factStat(GUI_STAT gj) ;
47
48extern void factStatNew(EVT_STAT gi) ;
49
50extern int eventCheck( PEVNT_HEADER *fadhd, EVENT *event) ;
51
52extern void debugHead(int i, int j, void *buf);
53
54extern void debugRead(int isock, int ibyte, int32_t event,int32_t ftmevt,
55 int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) ;
56extern void debugStream(int isock, void *buf, int len) ;
57
58int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
59
60
61uint g_actTime ;
62uint g_actUsec ;
63int g_runStat ;
64int g_reset ;
65int g_useFTM ;
66
67int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX ;
68size_t g_maxMem ; //maximum memory allowed for buffer
69
70//no longer needed ...
71 int g_maxBoards ; //maximum number of boards to be initialized
72 int g_actBoards ;
73//
74
75FACT_SOCK g_port[NBOARDS] ; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
76
77
78 int gi_runStat ;
79 int gp_runStat ;
80 int gw_runStat ;
81
82 int gi_memStat = +1 ;
83
84 uint32_t gi_myRun = 0 ;
85 uint32_t actrun = 0 ;
86
87
88uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
89
90//uint gi_EvtStart= 0 ;
91//uint gi_EvtRead = 0 ;
92//uint gi_EvtBad = 0 ;
93//uint gi_EvtTot = 0 ;
94//size_t gi_usedMem = 0 ;
95
96//uint gw_EvtTot = 0 ;
97//uint gp_EvtTot = 0 ;
98
99PIX_MAP g_pixMap[NPIX] ;
100
101EVT_STAT gi ;
102GUI_STAT gj ;
103
104EVT_CTRL evtCtrl ; //control of events during processing
105int evtIdx[MAX_EVT*MAX_RUN] ; //index from mBuffer to evtCtrl
106
107WRK_DATA mBuffer[MAX_EVT*MAX_RUN]; //local working space
108
109
110
111
112RUN_HEAD actRun ;
113
114RUN_CTRL runCtrl[MAX_RUN] ;
115
116RUN_TAIL runTail[MAX_RUN] ;
117
118
119/*
120*** Definition of rdBuffer to read in IP packets; keep it global !!!!
121 */
122
123
124typedef union {
125 int8_t B[MAX_LEN ];
126 int16_t S[MAX_LEN/2];
127 int32_t I[MAX_LEN/4];
128 int64_t L[MAX_LEN/8];
129} CNV_FACT ;
130
131typedef struct {
132 int bufTyp ; //what are we reading at the moment: 0=header 1=data -1=skip ...
133 int32_t bufPos ; //next byte to read to the buffer next
134 int32_t bufLen ; //number of bytes left to read
135 int32_t skip ; //number of bytes skipped before start of event
136
137 int sockStat ; //-1 if socket not yet connected , 99 if not exist
138 int socket ; //contains the sockets
139 struct sockaddr_in SockAddr ; //IP for each socket
140
141 int evtID ; // event ID of event currently read
142 int runID ; // run "
143 int ftmID ; // event ID from FTM
144 uint fadLen ; // FADlength of event currently read
145 int fadVers ; // Version of FAD
146 int ftmTyp ; // trigger type
147 int board ; // boardID (softwareID: 0..40 )
148 int Port ;
149
150 CNV_FACT *rBuf ;
151
152#ifdef EVTDEBUG
153 CNV_FACT *xBuf ; //a copy of rBuf (temporary for debuging)
154#endif
155
156} READ_STRUCT ;
157
158
159typedef union {
160 int8_t B[2];
161 int16_t S ;
162} SHORT_BYTE ;
163
164
165
166
167
168#define MXSTR 1000
169char str[MXSTR] ;
170
171SHORT_BYTE start, stop;
172
173READ_STRUCT rd[MAX_SOCK] ; //buffer to read IP and afterwards store in mBuffer
174
175
176
177/*-----------------------------------------------------------------*/
178
179
180/*-----------------------------------------------------------------*/
181
182
183
184int GenSock(int flag, int sid, int port, struct sockaddr_in *sockAddr, READ_STRUCT *rd) {
185/*
186*** generate Address, create sockets and allocates readbuffer for it
187***
188*** if flag==0 generate socket and buffer
189*** <0 destroy socket and buffer
190*** >0 close and redo socket
191***
192*** sid : board*7 + port id
193 */
194
195 int j ;
196
197 if (rd->sockStat ==0 ) { //close socket if open
198 j=close(rd->socket) ;
199 if (j>0) {
200 snprintf(str,MXSTR,"Error closing socket %d | %m",sid);
201 factOut(kFatal,771, str ) ;
202 } else {
203 snprintf(str,MXSTR,"Succesfully closed socket %d",sid);
204 factOut(kInfo,771, str ) ;
205 }
206 }
207
208
209 if (flag < 0) {
210 free(rd->rBuf) ; //and never open again
211#ifdef EVTDEBUG
212 free(rd->xBuf) ; //and never open again
213#endif
214 rd->rBuf = NULL ;
215 rd->sockStat = 99 ;
216 return 0 ;
217 }
218
219
220 if (flag == 0) { //generate address and buffer ...
221 rd->Port = port ;
222 rd->SockAddr.sin_family = sockAddr->sin_family;
223 rd->SockAddr.sin_port = htons(port) ;
224 rd->SockAddr.sin_addr = sockAddr->sin_addr ;
225
226#ifdef EVTDEBUG
227 rd->xBuf = malloc(sizeof(CNV_FACT) ) ;
228#endif
229 rd->rBuf = malloc(sizeof(CNV_FACT) ) ;
230 if ( rd->rBuf == NULL ) {
231 snprintf(str,MXSTR,"Could not create local buffer %d",sid);
232 factOut(kFatal,774, str ) ;
233 rd->sockStat = 77 ;
234 return -3 ;
235 }
236 }
237
238
239 if ( (rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
240 snprintf(str,MXSTR,"Could not generate socket %d | %m",sid);
241 factOut(kFatal,773, str ) ;
242 rd->sockStat = 88 ;
243 return -2 ;
244 }
245
246 snprintf(str,MXSTR,"Successfully generated socket %d ",sid);
247 factOut(kInfo,773, str ) ;
248 rd->sockStat = -1 ; //try to (re)open socket
249 return 0 ;
250
251} /*-----------------------------------------------------------------*/
252
253 /*-----------------------------------------------------------------*/
254
255
256
257
258int mBufInit() {
259// initialize mBuffer (mark all entries as unused\empty)
260
261 int i ;
262 uint32_t actime ;
263
264 actime = g_actTime + 50000000 ;
265
266 for (i=0; i<MAX_EVT*MAX_RUN; i++) {
267 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
268 mBuffer[i].runNum= 0 ;
269
270 evtCtrl.evtBuf[ i] = -1 ;
271 evtCtrl.evtStat[ i] = -1 ;
272 evtCtrl.pcTime[ i] = actime ; //initiate to far future
273 }
274
275
276 actRun.FADhead = malloc( NBOARDS* sizeof(PEVNT_HEADER) ) ;
277
278 evtCtrl.frstPtr = 0 ;
279 evtCtrl.lastPtr = 0 ;
280
281 return 0 ;
282
283} /*-----------------------------------------------------------------*/
284
285
286
287
288int mBufEvt( int evID, uint runID, int nRoi[], int sk,
289 int fadlen, int trgTyp, int trgNum, int fadNum) {
290// generate a new Event into mBuffer:
291// make sure only complete Event are possible, so 'free' will always work
292// returns index into mBuffer[], or negative value in case of error
293// error: <-9000 if roi screwed up (not consistent with run)
294// <-8000 (not consistent with event)
295// <-7000 (not consistent with board)
296// < 0 if no space left
297
298 struct timeval *tv, atv;
299 tv=&atv;
300 uint32_t tsec, tusec ;
301
302 int i, k, jr, b, evFree ;
303 int headmem=0 ;
304 size_t needmem = 0 ;
305
306
307 b = sk/7 ;
308
309 if (nRoi[0] <0 || nRoi[0] > 1024) {
310 snprintf(str,MXSTR,"illegal nRoi[0]: %d",nRoi[0]) ;
311 factOut(kError, 999, str ) ;
312 gj.badRoiR++ ;
313 gj.badRoi[b]++ ;
314 return -9999 ;
315 }
316
317 for (jr=1; jr<8; jr++) {
318 if ( nRoi[jr] != nRoi[0] ) {
319 snprintf(str,MXSTR,"wrong nRoi[%d]: %d %d",jr,nRoi[jr],nRoi[0]) ;
320 factOut(kError,711, str ) ;
321 gj.badRoiB++ ;
322 gj.badRoi[b]++ ;
323 return -7101 ;
324 }
325 }
326 if ( nRoi[8] < nRoi[0] ) {
327 snprintf(str,MXSTR,"wrong nRoi_TM: %d %d",nRoi[8],nRoi[0]) ;
328 factOut(kError,712, str ) ;
329 gj.badRoiB++ ;
330 gj.badRoi[b]++ ;
331 return -7102 ;
332 }
333
334
335 i = evID % MAX_EVT ;
336 evFree = -1 ;
337
338 for ( k=0; k<MAX_RUN; k++) {
339 if ( mBuffer[i].evNum == evID
340 && mBuffer[i].runNum== runID ) { //event is already registered;
341 // is it ok ????
342 if ( mBuffer[i].nRoi != nRoi[0]
343 || mBuffer[i].nRoiTM != nRoi[8] ) {
344 snprintf(str,MXSTR,"illegal evt_roi %d %d ; %d %d",
345 nRoi[0],nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM );
346 factOut(kError,821, str ) ;
347 gj.badRoiE++ ;
348 gj.badRoi[b]++ ;
349 return -8201 ;
350 }
351
352// count for inconsistencies
353
354 if ( mBuffer[i].trgNum != trgNum ) mBuffer[i].Errors++ ;
355 if ( mBuffer[i].fadNum != fadNum ) mBuffer[i].Errors+=100 ;
356 if ( mBuffer[i].trgTyp != trgTyp ) mBuffer[i].Errors+=10000 ;
357
358 //everything seems fine so far ==> use this slot ....
359 return i ;
360 }
361 if ( evFree < 0 && mBuffer[i].evNum < 0 ) evFree = i ;
362 i += MAX_EVT ;
363 }
364
365
366 //event does not yet exist; create it
367 if (evFree < 0 ) { //no space available in ctrl
368 snprintf(str,MXSTR,"no control slot to keep event %d",evID) ;
369 factOut(kError,881, str ) ;
370 return -1 ;
371 }
372 i = evFree ; //found free entry; use it ...
373
374 gettimeofday( tv, NULL);
375 tsec = atv.tv_sec ;
376 tusec= atv.tv_usec ;
377
378 //check if runId already registered in runCtrl
379 evFree = -1 ;
380 for (k=0; k<MAX_RUN; k++) {
381 if (runCtrl[k].runId == runID ) {
382 if ( runCtrl[k].roi0 != nRoi[0]
383 || runCtrl[k].roi8 != nRoi[8] ) {
384 snprintf(str,MXSTR,"illegal run_roi %d %d ; %d %d",
385 nRoi[0],nRoi[8],runCtrl[k].roi0,runCtrl[k].roi8 );
386 factOut(kError,931, str ) ;
387 gj.badRoiR++ ;
388 gj.badRoi[b]++ ;
389 return -9301 ;
390 }
391 goto RUNFOUND ;
392 }
393 else if (evFree < 0 && runCtrl[k].runId == 0 ) evFree = k ;
394 }
395
396 if (evFree <0 ) {
397 snprintf(str,MXSTR,"not able to register the new run %d",runID);
398 factOut(kFatal,883, str ) ;
399 return -1001 ;
400 } else {
401 snprintf(str,MXSTR,"register new run %d roi: %d %d",runID,nRoi[0],nRoi[8]) ;
402 factOut(kInfo,503, str ) ;
403 runCtrl[evFree].runId = runID ;
404 runCtrl[evFree].roi0 = nRoi[0] ;
405 runCtrl[evFree].roi8 = nRoi[8] ;
406 runCtrl[evFree].fileId = -2 ;
407 runCtrl[evFree].lastEvt= -1 ;
408 runCtrl[evFree].nextEvt= 0;
409 runCtrl[evFree].actEvt = 0;
410 runCtrl[evFree].maxEvt = 999999999 ; //max number events allowed
411 runCtrl[evFree].firstUsec=tusec ;
412 runCtrl[evFree].firstTime=
413 runCtrl[evFree].lastTime=tsec ;
414 runCtrl[evFree].closeTime=tsec + 3600*24 ; //max time allowed
415 runCtrl[evFree].lastTime = 0 ;
416
417 runTail[evFree].nEventsOk =
418 runTail[evFree].nEventsRej =
419 runTail[evFree].nEventsBad =
420 runTail[evFree].PCtime0 =
421 runTail[evFree].PCtimeX = 0 ;
422 }
423
424RUNFOUND:
425
426 needmem = sizeof(EVENT) + NPIX*nRoi[0]*2 + NTMARK*nRoi[0]*2; //
427
428 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
429
430 if ( gj.usdMem + needmem + headmem > g_maxMem) {
431 gj.maxMem = gj.usdMem + needmem + headmem ;
432 if (gi_memStat>0 ) {
433 gi_memStat = -99 ;
434 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ;
435 factOut(kError,882, str ) ;
436 } else {
437 snprintf(str,MXSTR,"no memory left to keep event %6d sock %3d",evID,sk) ;
438 factOut(kDebug,882, str ) ;
439 }
440 return -11 ;
441 }
442
443 mBuffer[i].FADhead = malloc( headmem ) ;
444 if (mBuffer[i].FADhead == NULL) {
445 snprintf(str,MXSTR,"malloc header failed for event %d",evID) ;
446 factOut(kError,882, str ) ;
447 return -12;
448 }
449
450 mBuffer[i].fEvent = malloc( needmem ) ;
451 if (mBuffer[i].fEvent == NULL) {
452 snprintf(str,MXSTR,"malloc data failed for event %d",evID) ;
453 factOut(kError,882, str ) ;
454 free(mBuffer[i].fEvent) ;
455 mBuffer[i].fEvent = NULL ;
456 return -22;
457 }
458
459
460
461 //flag all boards as unused
462 mBuffer[i].nBoard = 0 ;
463 for (k=0; k<NBOARDS; k++ ) {
464 mBuffer[i].board[k] = -1;
465 }
466 //flag all pixels as unused
467 for (k=0; k<NPIX; k++ ) {
468 mBuffer[i].fEvent->StartPix[k] = -1 ;
469 }
470 //flag all TMark as unused
471 for (k=0; k<NTMARK; k++ ) {
472 mBuffer[i].fEvent->StartTM[k] = -1 ;
473 }
474
475 mBuffer[i].fEvent->NumBoards = 0 ;
476 mBuffer[i].fEvent->PCUsec = tusec ;
477 mBuffer[i].fEvent->PCTime =
478 mBuffer[i].pcTime = tsec ;
479 mBuffer[i].nRoi = nRoi[0] ;
480 mBuffer[i].nRoiTM = nRoi[8] ;
481 mBuffer[i].evNum = evID ;
482 mBuffer[i].runNum = runID ;
483 mBuffer[i].fadNum = fadNum;
484 mBuffer[i].trgNum = trgNum;
485 mBuffer[i].trgTyp = trgTyp ;
486 mBuffer[i].evtLen = needmem ;
487 mBuffer[i].Errors = 0 ;
488
489 gj.usdMem += needmem + headmem;
490 if (gj.usdMem > gj.maxMem ) gj.maxMem = gj.usdMem ;
491
492 gj.bufTot++ ;
493 if (gj.bufTot > gj.maxEvt ) gj.maxEvt = gj.bufTot ;
494
495 gj.rateNew++ ;
496
497 //register event in 'active list (reading)'
498
499 evtCtrl.evtBuf[ evtCtrl.lastPtr] = i ;
500 evtCtrl.evtStat[ evtCtrl.lastPtr] = 0 ;
501 evtCtrl.pcTime[ evtCtrl.lastPtr] = g_actTime ;
502 evtIdx[i] = evtCtrl.lastPtr ;
503
504
505 snprintf(str,MXSTR,"%5d %8d start new evt %8d %8d sock %3d len %5d t %10d",
506 evID,runID,i,evtCtrl.lastPtr,sk,fadlen,mBuffer[i].pcTime);
507 factOut(kDebug,-11, str ) ;
508 evtCtrl.lastPtr++ ;
509 if (evtCtrl.lastPtr == MAX_EVT*MAX_RUN ) evtCtrl.lastPtr = 0;
510
511 gi.evtGet++ ;
512
513 return i ;
514
515} /*-----------------------------------------------------------------*/
516
517
518
519
520int mBufFree(int i) {
521//delete entry [i] from mBuffer:
522//(and make sure multiple calls do no harm ....)
523
524 int headmem=0 ;
525 int evid ;
526 size_t freemem = 0 ;
527
528 evid = mBuffer[i].evNum ;
529 freemem = mBuffer[i].evtLen ;
530
531 free(mBuffer[i].fEvent ) ;
532 mBuffer[i].fEvent = NULL ;
533
534 free(mBuffer[i].FADhead ) ;
535 mBuffer[i].FADhead = NULL ;
536
537 headmem = NBOARDS* sizeof(PEVNT_HEADER) ;
538 mBuffer[i].evNum = mBuffer[i].nRoi= -1;
539 mBuffer[i].runNum = 0;
540
541 gj.usdMem = gj.usdMem - freemem - headmem;
542 gj.bufTot-- ;
543
544 if (gi_memStat < 0 ) {
545 if (gj.usdMem <= 0.75 * gj.maxMem ) gi_memStat = +1 ;
546 }
547
548
549 return 0 ;
550
551} /*-----------------------------------------------------------------*/
552
553
554void resetEvtStat() {
555 int i ;
556
557 for (i=0; i<MAX_SOCK; i++) gi.numRead[i] = 0 ;
558
559 for (i=0; i<NBOARDS; i++ ) {
560 gi.gotByte[i] = 0 ;
561 gi.gotErr[i] = 0 ;
562
563 }
564
565 gi.evtGet = 0 ; //#new Start of Events read
566 gi.evtTot = 0 ; //#complete Events read
567 gi.evtErr = 0 ; //#Events with Errors
568 gi.evtSkp = 0 ; //#Events incomplete (timeout)
569
570 gi.procTot = 0 ; //#Events processed
571 gi.procErr = 0 ; //#Events showed problem in processing
572 gi.procTrg = 0 ; //#Events accepted by SW trigger
573 gi.procSkp = 0 ; //#Events rejected by SW trigger
574
575 gi.feedTot = 0 ; //#Events used for feedBack system
576 gi.feedErr = 0 ; //#Events rejected by feedBack
577
578 gi.wrtTot = 0 ; //#Events written to disk
579 gi.wrtErr = 0 ; //#Events with write-error
580
581 gi.runOpen = 0 ; //#Runs opened
582 gi.runClose= 0 ; //#Runs closed
583 gi.runErr = 0 ; //#Runs with open/close errors
584
585return ;
586} /*-----------------------------------------------------------------*/
587
588
589
590void initReadFAD() {
591return ;
592} /*-----------------------------------------------------------------*/
593
594
595
596void *readFAD( void *ptr ) {
597/* *** main loop reading FAD data and sorting them to complete events */
598 int head_len,frst_len,numok,numok2,numokx,dest,evID,i,k ;
599 int actBoards = 0, minLen ;
600 int32_t jrd ;
601 uint gi_SecTime ; //time in seconds
602 int boardId, roi[9],drs,px,src,pixS,pixH,pixC,pixR,tmS ;
603
604 int goodhed = 0 ;
605 int errcnt0 = 0 ;
606
607 int sockDef[NBOARDS]; //internal state of sockets
608 int jrdx ;
609
610
611 struct timespec xwait ;
612
613
614 struct timeval *tv, atv;
615 tv=&atv;
616 uint32_t tsec, tusec ;
617
618
619 snprintf(str,MXSTR,"start initializing");
620 factOut(kInfo,-1, str ) ;
621
622 int cpu = 7 ; //read thread
623 cpu_set_t mask;
624
625/* CPU_ZERO initializes all the bits in the mask to zero. */
626 CPU_ZERO( &mask );
627/* CPU_SET sets only the bit corresponding to cpu. */
628 cpu = 7 ;
629 CPU_SET( cpu, &mask );
630
631/* sched_setaffinity returns 0 in success */
632 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
633 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
634 factOut(kWarn,-1, str ) ;
635 }
636
637 head_len = sizeof(PEVNT_HEADER) ;
638 frst_len = head_len ; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
639 minLen = head_len ; //min #bytes needed to check header: full header for debug
640
641 start.S=0xFB01;
642 stop.S= 0x04FE;
643
644/* initialize run control logics */
645 for (i=0; i<MAX_RUN; i++) {
646 runCtrl[i].runId = 0 ;
647 runCtrl[i].fileId = -2 ;
648 }
649 gi_resetS = gi_resetR = 9;
650 for (i=0; i<NBOARDS; i++) sockDef[i]= 0 ;
651
652START:
653 gettimeofday( tv, NULL);
654 g_actTime = tsec = atv.tv_sec ;
655 g_actUsec = tusec= atv.tv_usec ;
656 gi_myRun = g_actTime ;
657 evtCtrl.frstPtr = 0 ;
658 evtCtrl.lastPtr = 0 ;
659
660 gi_SecTime = g_actTime ;
661 gi_runStat = g_runStat ;
662 gj.readStat= g_runStat ;
663 numok = numok2 = 0 ;
664
665 if ( gi_resetS > 0) {
666 //make sure all sockets are preallocated as 'not exist'
667 for (i=0; i<MAX_SOCK; i++) {
668 rd[i].socket = -1 ;
669 rd[i].sockStat = 99 ;
670 }
671
672 for (k=0; k<NBOARDS; k++) {
673 gi_NumConnect[k]=0;
674 gi.numConn[k] =0;
675 gj.numConn[k] =0;
676 gj.errConn[k] =0;
677 gj.rateBytes[k] =0;
678 gj.totBytes[k] =0;
679 }
680
681 }
682
683
684 if ( gi_resetR > 0) {
685 resetEvtStat();
686 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0 ;
687 gj.usdMem = gj.maxMem = gj.xxxMem = 0 ;
688 gj.totMem = g_maxMem ;
689 gj.bufNew = gj.bufEvt = 0 ;
690 gj.badRoiE = gj.badRoiR = gj.badRoiB =
691 gj.evtSkip = gj.evtWrite = gj.evtErr = 0 ;
692
693 int b ;
694 for (b=0; b<NBOARDS; b++) gj.badRoi[b]=0 ;
695
696 mBufInit() ; //initialize buffers
697
698 snprintf(str,MXSTR,"end initializing");
699 factOut(kInfo,-1, str ) ;
700 }
701
702
703 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0 ;
704
705 while (g_runStat >=0 && g_reset ==0 ) { //loop until global variable g_runStat claims stop
706
707 gi_runStat = g_runStat;
708 gj.readStat= g_runStat;
709 gettimeofday( tv, NULL);
710 g_actTime = tsec = atv.tv_sec ;
711 g_actUsec = tusec= atv.tv_usec ;
712
713
714 int b,p,p0,s0,nch;
715 nch = 0 ;
716 for (b=0; b<NBOARDS; b++ ) {
717 k = b*7 ;
718 if ( g_port[b].sockDef != sockDef[b] ) { //something has changed ...
719 nch++ ;
720 gi_NumConnect[ b ] = 0 ; //must close all connections
721 gi.numConn[ b ] = 0;
722 gj.numConn[ b ] = 0;
723 if ( sockDef[b] == 0) s0= 0 ; //sockets to be defined and opened
724 else if (g_port[b].sockDef == 0) s0= -1 ; //sockets to be destroyed
725 else s0= +1 ; //sockets to be closed and reopened
726
727 if (s0 == 0) p0=ntohs(g_port[b].sockAddr.sin_port);
728 else p0=0 ;
729
730 for (p=p0+1; p<p0+8; p++) {
731 GenSock(s0, k, p, &g_port[b].sockAddr, &rd[k]) ; //generate address and socket
732 k++ ;
733 }
734 sockDef[b] = g_port[b].sockDef ;
735 }
736 }
737
738 if (nch > 0 ) {
739 actBoards = 0 ;
740 for (b=0; b<NBOARDS; b++ ) {
741 if ( sockDef[b] > 0 ) actBoards++ ;
742 }
743 }
744
745
746 jrdx = 0;
747 numokx= 0;
748 numok = 0 ; //count number of succesfull actions
749
750 for (i=0; i<MAX_SOCK; i++) { //check all sockets if something to read
751 b = i / 7 ;
752 if (sockDef[b] > 0) s0= +1 ;
753 else s0= -1 ;
754
755 if (rd[i].sockStat <0 ) { //try to connect if not yet done
756 rd[i].sockStat=connect(rd[i].socket,
757 (struct sockaddr*) &rd[i].SockAddr, sizeof(rd[i].SockAddr)) ;
758 if (rd[i].sockStat ==0 ) { //successfull ==>
759 if (sockDef[b] > 0) {
760 rd[i].bufTyp = 0 ; // expect a header
761 rd[i].bufLen = frst_len ; // max size to read at begining
762 } else {
763 rd[i].bufTyp = -1 ; // full data to be skipped
764 rd[i].bufLen = MAX_LEN ; //huge for skipping
765 }
766 rd[i].bufPos = 0 ; // no byte read so far
767 rd[i].skip = 0 ; // start empty
768 gi_NumConnect[ b ]++ ;
769 gi.numConn[ b ]++ ;
770 gj.numConn[ b ]++ ;
771 snprintf(str,MXSTR,"+++connect %d %d",b,gi.numConn[ b ]);
772 factOut(kInfo,-1, str ) ;
773 }
774 }
775
776 if (rd[i].sockStat ==0 ) { //we have a connection ==> try to read
777 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
778 numok++ ;
779 jrd=recv(rd[i].socket,&rd[i].rBuf->B[ rd[i].bufPos], rd[i].bufLen, MSG_DONTWAIT);
780
781 if (jrd >0 ) {
782 debugStream(i,&rd[i].rBuf->B[ rd[i].bufPos],jrd) ;
783#ifdef EVTDEBUG
784 memcpy(&rd[i].xBuf->B[ rd[i].bufPos], &rd[i].rBuf->B[ rd[i].bufPos], jrd) ;
785 snprintf(str,MXSTR,"read sock %3d bytes %5d len %5d first %d %d",i,jrd,rd[i].bufLen,
786 rd[i].rBuf->B[ rd[i].bufPos ],
787 rd[i].rBuf->B[ rd[i].bufPos +1] );
788 factOut(kDebug,301, str ) ;
789#endif
790 }
791
792 if (jrd == 0) { //connection has closed ...
793 snprintf(str,MXSTR,"Socket %d closed by FAD",i);
794 factOut(kInfo,441, str ) ;
795 GenSock(s0, i, 0,NULL, &rd[i]) ;
796 gi.gotErr[ b ]++ ;
797 gi_NumConnect[ b ]-- ;
798 gi.numConn[ b ]-- ;
799 gj.numConn[ b ]-- ;
800
801 } else if ( jrd<0 ) { //did not read anything
802 if (errno != EAGAIN && errno != EWOULDBLOCK ) {
803 snprintf(str,MXSTR,"Error Reading from %d | %m",i);
804 factOut(kError,442, str ) ;
805 gi.gotErr[ b ]++ ;
806 } else numok-- ; //else nothing waiting to be read
807 jrd = 0 ;
808 }
809 } else {
810 jrd = 0 ; //did read nothing as requested
811 snprintf(str,MXSTR,"do not read from socket %d %d",i,rd[i].bufLen ) ;
812 factOut(kDebug,301, str ) ;
813 }
814
815 gi.gotByte[ b ] += jrd ;
816 gj.rateBytes[b] += jrd ;
817
818 if (jrd>0) {numokx++ ; jrdx+= jrd; }
819
820
821 if ( rd[i].bufTyp <0 ) { // we are skipping this board ...
822// just do nothing
823#ifdef EVTDEBUG
824 snprintf(str,MXSTR,"skipping %d bytes on socket %d",jrd,i) ;
825 factOut(kInfo,301, str ) ;
826#endif
827
828 } else if ( rd[i].bufTyp >0 ) { // we are reading data ...
829 if ( jrd < rd[i].bufLen ) { //not yet all read
830 rd[i].bufPos += jrd ; //==> prepare for continuation
831 rd[i].bufLen -= jrd ;
832 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 0,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
833 } else { //full dataset read
834 rd[i].bufLen = 0 ;
835 rd[i].bufPos = rd[i].fadLen ;
836 if ( rd[i].rBuf->B[ rd[i].bufPos-1] != stop.B[0]
837 || rd[i].rBuf->B[ rd[i].bufPos-2] != stop.B[1]) {
838 gi.evtErr++ ;
839 snprintf(str,MXSTR,"wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
840 i,rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0], rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2],
841 rd[i].rBuf->B[ rd[i].bufPos-1]);
842 factOut(kError,301, str ) ;
843 goto EndBuf ;
844
845#ifdef EVTDEBUG
846 } else {
847 snprintf(str,MXSTR,"good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
848 i,rd[i].fadLen,
849 rd[i].rBuf->B[ 0 ], rd[i].rBuf->B[ 1 ], start.B[1],start.B[0],
850 rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1], stop.B[1], stop.B[0]);
851 factOut(kDebug,301, str ) ;
852#endif
853 }
854
855 if (jrd>0) debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID, 1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
856
857 //we have a complete buffer, copy to WORK area
858 int jr ;
859 roi[0] = ntohs(rd[i].rBuf->S[ head_len/2 + 2 ]) ;
860 for (jr=0; jr<9; jr++) {
861 roi[jr] = ntohs(rd[i].rBuf->S[ head_len/2 + 2 + jr*(roi[0]+4) ]) ;
862 }
863 //get index into mBuffer for this event (create if needed)
864
865 int actid;
866 if (g_useFTM >0) actid = rd[i].evtID ;
867 else actid = rd[i].ftmID ;
868
869 evID = mBufEvt( rd[i].evtID, rd[i].runID, roi, i,
870 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID, rd[i].evtID ) ;
871
872 if (evID <-1000) {
873 goto EndBuf ; //not usable board/event/run --> skip it
874 }
875 if (evID < 0) { //no space left, retry later
876#ifdef EVTDEBUG
877 if ( rd[i].bufLen != 0) {
878 snprintf(str,MXSTR,"something screwed up");
879 factOut(kFatal, 1, str ) ;
880 }
881#endif
882 xwait.tv_sec = 0;
883 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
884 nanosleep( &xwait , NULL ) ;
885 goto EndBuf1 ; //hope there is free space next round
886 }
887
888
889 //we have a valid entry in mBuffer[]; fill it
890
891#ifdef EVTDEBUG
892 int xchk = memcmp(&rd[i].xBuf->B[0], &rd[i].rBuf->B[0], rd[i].fadLen ) ;
893 if (xchk != 0) {
894 snprintf(str,MXSTR,"ERROR OVERWRITE %d %d on port %d",xchk,rd[i].fadLen,i) ;
895 factOut(kFatal, 1, str ) ;
896
897 uint iq;
898 for (iq=0; iq < rd[i].fadLen ; iq++) {
899 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq] ) {
900 snprintf(str,MXSTR,"ERROR %4d %4d %x %x",i,iq,rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
901 factOut(kFatal, 1, str ) ;
902 }
903 }
904 }
905#endif
906 int qncpy = 0 ;
907 boardId = b ;
908 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
909 int fadCrate = fadBoard/256 ;
910 if (boardId != (fadCrate*10 + fadBoard%256) ) {
911 snprintf(str,MXSTR,"wrong Board ID %d %d %d",fadCrate,fadBoard%256,boardId) ;
912 factOut(kWarn,301, str ) ;
913 }
914 if ( mBuffer[evID].board[ boardId ] != -1) {
915 snprintf(str,MXSTR,"double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
916 evID,boardId,i,rd[i].fadLen, rd[i].rBuf->B[0],
917 rd[i].rBuf->B[1], rd[i].rBuf->B[ rd[i].bufPos-2], rd[i].rBuf->B[ rd[i].bufPos-1]);
918 factOut(kWarn,501, str ) ;
919 goto EndBuf ; //--> skip Board
920 }
921
922 int iDx = evtIdx[evID] ; //index into evtCtrl
923
924 memcpy( &mBuffer[evID].FADhead[boardId].start_package_flag,
925 &rd[i].rBuf->S[0], head_len) ;
926 qncpy+=head_len ;
927
928 src = head_len/2 ;
929 for ( px=0; px<9; px++ ) { //different sort in FAD board.....
930 for ( drs=0; drs<4; drs++ ) {
931 pixH= ntohs(rd[i].rBuf->S[src++]) ; // ID
932 pixC= ntohs(rd[i].rBuf->S[src++]) ; // start-cell
933 pixR= ntohs(rd[i].rBuf->S[src++]) ; // roi
934//here we should check if pixH is correct ....
935
936 pixS = boardId*36 + drs*9 + px ;
937 src++ ;
938
939
940 mBuffer[evID].fEvent->StartPix[pixS] =pixC;
941 dest= pixS * roi[0] ;
942 memcpy(
943 &mBuffer[evID].fEvent->Adc_Data[dest],
944 &rd[i].rBuf->S[src], roi[0] * 2) ;
945 qncpy+=roi[0]*2 ;
946 src+= pixR ;
947
948 if ( px==8 ) {
949 tmS =boardId*4 + drs ;
950 if ( pixR > roi[0]) { //and we have additional TM info
951 dest= tmS * roi[0] + NPIX* roi[0] ;
952 int srcT= src - roi[0] ;
953 mBuffer[evID].fEvent->StartTM[tmS] = (pixC+pixR-roi[0])%1024 ;
954 memcpy(
955 &mBuffer[evID].fEvent->Adc_Data[dest],
956 &rd[i].rBuf->S[srcT], roi[0] * 2) ;
957 qncpy+=roi[0]*2 ;
958 } else {
959 mBuffer[evID].fEvent->StartTM[tmS] = -1 ;
960 }
961 }
962 }
963 }// now we have stored a new board contents into Event structure
964
965 mBuffer[evID].fEvent->NumBoards++ ;
966 mBuffer[evID].board[ boardId ] = boardId ;
967 evtCtrl.evtStat[ iDx ]++ ;
968 evtCtrl.pcTime[ iDx ] = g_actTime ;
969
970 if (++mBuffer[evID].nBoard >= actBoards ) {
971 int qnrun =0 ;
972 if (mBuffer[evID].runNum != actrun ) { // have we already reported first event of this run ???
973 actrun = mBuffer[evID].runNum ;
974 int ir ;
975 for ( ir=0; ir<MAX_RUN; ir++) {
976 qnrun++ ;
977 if ( runCtrl[ir].runId == actrun) {
978 if ( ++runCtrl[ir].lastEvt ==0 ) {
979 gotNewRun( actrun, mBuffer[evID].FADhead );
980 snprintf(str,MXSTR,"gotNewRun %d (ev %d)",mBuffer[evID].runNum,mBuffer[evID].evNum);
981 factOut(kInfo,1, str ) ;
982 break ;
983 }
984 }
985 }
986 }
987 snprintf(str,MXSTR,"%5d complete event roi %4d roiTM %d cpy %8d %5d",
988 mBuffer[evID].evNum,roi[0],roi[8]-roi[0],qncpy,qnrun);
989 factOut(kDebug,-1, str ) ;
990factOut(kInfo,-1, str ) ;
991
992 //complete event read ---> flag for next processing
993 evtCtrl.evtStat[ iDx ] = 99;
994 gi.evtTot++ ;
995 }
996
997EndBuf:
998 rd[i].bufTyp = 0 ; //ready to read next header
999 rd[i].bufLen = frst_len ;
1000 rd[i].bufPos = 0 ;
1001EndBuf1:
1002 ;
1003 }
1004
1005 } else { //we are reading event header
1006 rd[i].bufPos += jrd ;
1007 rd[i].bufLen -= jrd ;
1008 if ( rd[i].bufPos >= minLen ){ //sufficient data to take action
1009 //check if startflag correct; else shift block ....
1010 for (k=0; k<rd[i].bufPos -1 ; k++) {
1011 if (rd[i].rBuf->B[k ] == start.B[1]
1012 && rd[i].rBuf->B[k+1] == start.B[0] ) break ;
1013 }
1014 rd[i].skip += k ;
1015
1016 if (k >= rd[i].bufPos-1 ) { //no start of header found
1017 rd[i].bufPos = 0 ;
1018 rd[i].bufLen = head_len ;
1019 } else if ( k>0 ) {
1020 rd[i].bufPos -= k ;
1021 rd[i].bufLen += k ;
1022 memcpy(&rd[i].rBuf->B[0], &rd[i].rBuf->B[k], rd[i].bufPos ) ;
1023#ifdef EVTDEBUG
1024 memcpy(&rd[i].xBuf->B[0], &rd[i].xBuf->B[k], rd[i].bufPos ) ;
1025#endif
1026 }
1027 if ( rd[i].bufPos >= minLen ) {
1028 if ( rd[i].skip >0 ) {
1029 snprintf(str,MXSTR,"skipped %d bytes on port%d", rd[i].skip, i ) ;
1030 factOut(kInfo,666, str ) ;
1031 rd[i].skip = 0 ;
1032 }
1033 goodhed++;
1034 rd[i].fadLen = ntohs(rd[i].rBuf->S[1])*2 ;
1035 rd[i].fadVers= ntohs(rd[i].rBuf->S[2]) ;
1036 rd[i].ftmTyp = ntohl(rd[i].rBuf->S[5]) ;
1037 rd[i].evtID = ntohl(rd[i].rBuf->I[4]) ; //(FADevt)
1038 rd[i].ftmID = ntohl(rd[i].rBuf->I[5]) ; //(FTMevt)
1039 rd[i].runID = ntohl(rd[i].rBuf->I[11]) ;
1040 rd[i].bufTyp = 1 ; //ready to read full record
1041 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos ;
1042
1043 int fadboard = ntohs(rd[i].rBuf->S[12] ) ;
1044 int fadcrate = fadboard/256 ;
1045 fadboard = (fadcrate*10 + fadboard%256) ;
1046#ifdef EVTDEBUG
1047 snprintf(str,MXSTR,"sk %3d head: %5d %5d %5d %10d %4d %6d",i,rd[i].fadLen,rd[i].evtID,rd[i].ftmID,rd[i].runID,fadboard,jrd) ;
1048 factOut(kDebug,1, str ) ;
1049#endif
1050
1051 if (rd[i].runID ==0 ) rd[i].runID = gi_myRun ;
1052
1053 if (rd[i].bufLen <=head_len || rd[i].bufLen > MAX_LEN ) {
1054 snprintf(str,MXSTR,"illegal event-length on port %d",i) ;
1055 factOut(kFatal,881, str ) ;
1056 rd[i].bufLen = 100000 ; //?
1057 }
1058 int fadBoard = ntohs(rd[i].rBuf->S[12] ) ;
1059 debugHead(i,fadBoard,rd[i].rBuf);
1060 debugRead(i,jrd,rd[i].evtID,rd[i].ftmID,rd[i].runID,-1,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1061 } else {
1062 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1063 }
1064 } else {
1065 debugRead(i,jrd,0,0,0,-2,tsec,tusec) ; // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1066 }
1067
1068 } //end interpreting last read
1069 } //end of successful read anything
1070 } //finished trying to read all sockets
1071
1072#ifdef EVTDEBUG
1073 snprintf(str,MXSTR,"Loop ---- %3d --- %8d",numokx,jrdx);
1074 factOut(kDebug,-1, str ) ;
1075#endif
1076
1077 gi.numRead[ numok ] ++ ;
1078
1079 g_actTime = time(NULL) ;
1080 if ( g_actTime > gi_SecTime ) {
1081 gi_SecTime = g_actTime ;
1082
1083
1084 //loop over all active events and flag those older than read-timeout
1085 //delete those that are written to disk ....
1086
1087 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1088 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1089
1090 gj.bufNew = gj.bufEvt = 0 ;
1091 int k1=evtCtrl.frstPtr;
1092 for ( k=k1; k<(k1+kd); k++ ) {
1093 int k0 = k % (MAX_EVT*MAX_RUN) ;
1094//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1095
1096 if (evtCtrl.evtStat[k0] > 0
1097 && evtCtrl.evtStat[k0] < 92 ) {
1098 gj.bufNew++ ; //incomplete event in Buffer
1099 if ( evtCtrl.evtStat[k0] < 90
1100 && evtCtrl.pcTime[k0] < g_actTime-10 ) {
1101 int id =evtCtrl.evtBuf[k0] ;
1102 snprintf(str,MXSTR,"%5d skip short evt %8d %8d %2d",mBuffer[id].evNum,evtCtrl.evtBuf[k0],k0 ,evtCtrl.evtStat[k0]);
1103 factOut(kWarn,601, str ) ;
1104 evtCtrl.evtStat[k0] = 91 ; //timeout for incomplete events
1105 gi.evtSkp++ ;
1106 gi.evtTot++ ;
1107 gj.evtSkip++;
1108 }
1109 } else if (evtCtrl.evtStat[k0] >= 900 //'delete'
1110 || evtCtrl.evtStat[k0] == 0 ) { //'useless'
1111
1112 int id =evtCtrl.evtBuf[k0] ;
1113 snprintf(str,MXSTR,"%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
1114 factOut(kDebug,-1, str ) ;
1115 mBufFree(id) ; //event written--> free memory
1116 evtCtrl.evtStat[k0] = -1;
1117 gj.evtWrite++ ;
1118 gj.rateWrite++ ;
1119 } else if (evtCtrl.evtStat[k0] >= 95 ) {
1120 gj.bufEvt++ ; //complete event in Buffer
1121 }
1122
1123 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 ) {
1124 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
1125 }
1126 }
1127
1128
1129 gj.deltaT = 1000 ; //temporary, must be improved
1130
1131 int b;
1132 for ( b=0; b<NBOARDS; b++) gj.totBytes[b] +=gj.rateBytes[b] ;
1133 gj.totMem = g_maxMem ;
1134 if (gj.maxMem > gj.xxxMem) gj.xxxMem = gj.maxMem ;
1135 if (gj.maxEvt > gj.xxxEvt) gj.xxxEvt = gj.maxEvt ;
1136
1137 factStat(gj);
1138 factStatNew(gi) ;
1139 gj.rateNew = gj.rateWrite = 0 ;
1140 gj.maxMem = gj.usdMem ;
1141 gj.maxEvt = gj.bufTot ;
1142 for ( b=0; b<NBOARDS; b++) gj.rateBytes[b] =0 ;
1143 }
1144
1145 if (numok > 0 ) numok2=0;
1146 else if (numok2++ > 3) {
1147 if (g_runStat == 1) {
1148 xwait.tv_sec = 1;
1149 xwait.tv_nsec= 0 ; // hibernate for 1 sec
1150 } else {
1151 xwait.tv_sec = 0;
1152 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1153 }
1154 nanosleep( &xwait , NULL ) ;
1155 }
1156
1157 } //and do next loop over all sockets ...
1158
1159
1160 snprintf(str,MXSTR,"stop reading ... RESET=%d",g_reset);
1161 factOut(kInfo,-1, str ) ;
1162
1163 if (g_reset >0 ) {
1164 gi_reset = g_reset ;
1165 gi_resetR = gi_reset%10 ; //shall we stop reading ?
1166 gi_resetS = (gi_reset/10)%10 ; //shall we close sockets ?
1167 gi_resetW = (gi_reset/100)%10 ; //shall we close files ?
1168 gi_resetX = gi_reset/1000 ; //shall we simply wait resetX seconds ?
1169 g_reset= 0;
1170 } else {
1171 gi_reset = 0;
1172 if ( g_runStat== -1 ) gi_resetR = 1 ;
1173 else gi_resetR = 7 ;
1174 gi_resetS = 7 ; //close all sockets
1175 gi_resetW = 7 ; //close all files
1176 gi_resetX = 0 ;
1177
1178 //inform others we have to quit ....
1179 gi_runStat = -11 ; //inform all that no update to happen any more
1180 gj.readStat= -11 ; //inform all that no update to happen any more
1181 }
1182
1183 if (gi_resetS > 0) {
1184 //must close all open sockets ...
1185 snprintf(str,MXSTR,"close all sockets ...");
1186 factOut(kInfo,-1, str ) ;
1187 for (i=0; i<MAX_SOCK; i++) {
1188 if (rd[i].sockStat ==0 ) {
1189 GenSock(-1, i, 0, NULL, &rd[i]) ; //close and destroy open socket
1190 if ( i%7 == 0 ) {
1191 gi_NumConnect[ i/7 ]-- ;
1192 gi.numConn[ i/7 ]-- ;
1193 gj.numConn[ i/7 ]-- ;
1194 sockDef[i/7] = 0 ; //flag ro recreate the sockets ...
1195 rd[i/7].sockStat = -1; //and try to open asap
1196 }
1197 }
1198 }
1199 }
1200
1201
1202 if (gi_resetR > 0) {
1203 //flag all events as 'read finished'
1204 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1205 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1206
1207 int k1=evtCtrl.frstPtr;
1208 for ( k=k1; k<(k1+kd); k++ ) {
1209 int k0 = k % (MAX_EVT*MAX_RUN) ;
1210 if (evtCtrl.evtStat[k0] > 0
1211 && evtCtrl.evtStat[k0] < 90 ) {
1212 evtCtrl.evtStat[k0] = 91 ;
1213 gi.evtSkp++ ;
1214 gi.evtTot++ ;
1215 }
1216 }
1217
1218 xwait.tv_sec = 0;
1219 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1220 nanosleep( &xwait , NULL ) ;
1221
1222 //and clear all buffers (might have to wait until all others are done)
1223 int minclear ;
1224 if (gi_resetR == 1) {
1225 minclear = 900 ;
1226 snprintf(str,MXSTR,"drain all buffers ...");
1227 } else {
1228 minclear = 0 ;
1229 snprintf(str,MXSTR,"flush all buffers ...");
1230 }
1231 factOut(kInfo,-1, str ) ;
1232
1233 int numclear=1 ;
1234 while (numclear > 0 ) {
1235 numclear = 0 ;
1236 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1237 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1238
1239 int k1=evtCtrl.frstPtr;
1240 for ( k=k1; k<(k1+kd); k++ ) {
1241 int k0 = k % (MAX_EVT*MAX_RUN) ;
1242 if (evtCtrl.evtStat[k0] > minclear ) {
1243 int id =evtCtrl.evtBuf[k0] ;
1244 snprintf(str,MXSTR,"ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard ) ;
1245 factOut(kDebug,-1, str ) ;
1246 mBufFree(id) ; //event written--> free memory
1247 evtCtrl.evtStat[k0] = -1;
1248 } else if (evtCtrl.evtStat[k0] > 0) numclear++ ; //writing is still ongoing...
1249
1250 if ( k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] <0 )
1251 evtCtrl.frstPtr = (evtCtrl.frstPtr+1) % (MAX_EVT*MAX_RUN) ;
1252 }
1253
1254 xwait.tv_sec = 0;
1255 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1256 nanosleep( &xwait , NULL ) ;
1257 }
1258 }
1259
1260 if (gi_reset > 0) {
1261 if (gi_resetW > 0) {
1262 CloseRunFile(0,0,0) ; //ask all Runs to be closed
1263 }
1264 if (gi_resetX > 0) {
1265 xwait.tv_sec = gi_resetX;
1266 xwait.tv_nsec= 0 ;
1267 nanosleep( &xwait , NULL ) ;
1268 }
1269
1270 snprintf(str,MXSTR,"Continue read Process ...");
1271 factOut(kInfo,-1, str ) ;
1272 gi_reset = 0 ;
1273 goto START ;
1274 }
1275
1276
1277
1278 snprintf(str,MXSTR,"Exit read Process ...");
1279 factOut(kInfo,-1, str ) ;
1280 gi_runStat = -99 ;
1281 gj.readStat= -99 ;
1282 factStat(gj);
1283 factStatNew(gi) ;
1284 return 0;
1285
1286} /*-----------------------------------------------------------------*/
1287
1288
1289void *procEvt( void *ptr ) {
1290/* *** main loop processing file, including SW-trigger */
1291 int numProc, numWait ;
1292 int k ;
1293 struct timespec xwait ;
1294 char str[MXSTR] ;
1295
1296 cpu_set_t mask;
1297 int cpu = 5 ; //process thread (will be several in final version)
1298
1299 snprintf(str,MXSTR,"Starting process-thread");
1300 factOut(kInfo,-1, str ) ;
1301
1302/* CPU_ZERO initializes all the bits in the mask to zero. */
1303 CPU_ZERO( &mask );
1304/* CPU_SET sets only the bit corresponding to cpu. */
1305 CPU_SET( cpu, &mask );
1306/* sched_setaffinity returns 0 in success */
1307 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1308 snprintf(str,MXSTR,"P ---> can not create affinity to %d",cpu);
1309 factOut(kWarn,-1, str ) ;
1310 }
1311
1312
1313 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1314
1315 numWait = numProc = 0 ;
1316 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1317 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1318
1319 int k1=evtCtrl.frstPtr;
1320 for ( k=k1; k<(k1+kd); k++ ) {
1321 int k0 = k % (MAX_EVT*MAX_RUN) ;
1322//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1323 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] <500) {
1324
1325 if ( gi_resetR > 1 ) { //we are asked to flush buffers asap
1326 evtCtrl.evtStat[k0] = 991 ;
1327 } else {
1328
1329 int id = evtCtrl.evtBuf[k0] ;
1330 int ievt = mBuffer[id].evNum ;
1331 int itevt= mBuffer[id].trgNum ;
1332 int itrg = mBuffer[id].trgTyp ;
1333 int roi = mBuffer[id].nRoi ;
1334 int roiTM= mBuffer[id].nRoiTM ;
1335 int Errors=mBuffer[id].Errors ;
1336// uint32_t irun = mBuffer[id].runNum ;
1337//snprintf(str,MXSTR,"P processing %d %d %d %d",ievt,k,id,evtCtrl.evtStat[k0]) ;
1338//factOut(kDebug,-1, str ) ;
1339
1340//make sure unused pixels/tmarks are cleared to zero
1341 if (roiTM == roi) roiTM=0 ;
1342 int ip,it,dest,ib;
1343 for (ip=0; ip<NPIX; ip++) {
1344 if (mBuffer[id].fEvent->StartPix[ip] == -1 ) {
1345 dest= ip*roi ;
1346 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1347 }
1348 }
1349 for (it=0; it<NTMARK; it++) {
1350 if (mBuffer[id].fEvent->StartTM[it] == -1 ) {
1351 dest= it*roi + NPIX*roi ;
1352 bzero( &mBuffer[id].fEvent->Adc_Data[dest], roi*2) ;
1353 }
1354 }
1355
1356
1357//and set correct event header ; also check for consistency in event (not yet)
1358 mBuffer[id].fEvent->Roi = roi ;
1359 mBuffer[id].fEvent->RoiTM = roiTM ;
1360 mBuffer[id].fEvent->EventNum = ievt ;
1361 mBuffer[id].fEvent->TriggerNum = itevt ;
1362 mBuffer[id].fEvent->TriggerType = itrg ;
1363 mBuffer[id].fEvent->Errors = Errors ;
1364 mBuffer[id].fEvent->SoftTrig = 0 ;
1365
1366
1367 for (ib=0; ib<NBOARDS; ib++) {
1368 if (mBuffer[id].board[ib] == -1 ) { //board is not read
1369 mBuffer[id].FADhead[ib].start_package_flag = 0 ;
1370 mBuffer[id].fEvent->BoardTime[ib] = 0 ;
1371 } else {
1372 mBuffer[id].fEvent->BoardTime[ib] =
1373 ntohl(mBuffer[id].FADhead[ib].time) ;
1374 }
1375 }
1376
1377 int i=eventCheck(mBuffer[id].FADhead,mBuffer[id].fEvent) ;
1378// gj.procEvt++ ;
1379 gi.procTot++ ;
1380 numProc++ ;
1381
1382 if (i<0) {
1383 evtCtrl.evtStat[k0] = 999 ; //flag event to be skipped
1384 gi.procErr++ ;
1385 } else {
1386 evtCtrl.evtStat[k0] = 520 ;
1387 }
1388 }
1389 } else if ( evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] < 90 ) {
1390 numWait++ ;
1391 }
1392 }
1393
1394 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1395 snprintf(str,MXSTR,"Exit Processing Process ...");
1396 factOut(kInfo,-1, str ) ;
1397 gp_runStat = -22 ; //==> we should exit
1398 gj.procStat= -22 ; //==> we should exit
1399 return 0 ;
1400 }
1401
1402 if (numProc == 0) {
1403 //seems we have nothing to do, so sleep a little
1404 xwait.tv_sec = 0;
1405 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1406 nanosleep( &xwait , NULL ) ;
1407 }
1408 gp_runStat = gi_runStat ;
1409 gj.procStat= gj.readStat ;
1410
1411 }
1412
1413 //we are asked to abort asap ==> must flag all remaining events
1414 // when gi_runStat claims that all events are in the buffer...
1415
1416 snprintf(str,MXSTR,"Abort Processing Process ...");
1417 factOut(kInfo,-1, str ) ;
1418 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1419 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1420
1421 int k1=evtCtrl.frstPtr;
1422 for ( k=k1; k<(k1+kd); k++ ) {
1423 int k0 = k % (MAX_EVT*MAX_RUN) ;
1424 if (evtCtrl.evtStat[k0] >=0 && evtCtrl.evtStat[k0] <500) {
1425 evtCtrl.evtStat[k0] = 555 ; //flag event as 'processed'
1426 }
1427 }
1428
1429 gp_runStat = -99 ;
1430 gj.procStat= -99 ;
1431
1432 return 0;
1433
1434} /*-----------------------------------------------------------------*/
1435
1436int CloseRunFile(uint32_t runId, uint32_t closeTime, uint32_t maxEvt) {
1437/* close run runId (all all runs if runId=0) */
1438/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1439 int j ;
1440
1441
1442 if ( runId == 0 ) {
1443 for ( j=0; j<MAX_RUN; j++) {
1444 if ( runCtrl[j].fileId == 0 ) { //run is open
1445 runCtrl[j].closeTime = closeTime ;
1446 runCtrl[j].maxEvt = maxEvt ;
1447 }
1448 }
1449 return 0;
1450 }
1451
1452 for ( j=0; j<MAX_RUN; j++) {
1453 if ( runCtrl[j].runId == runId ) {
1454 if ( runCtrl[j].fileId == 0 ) { //run is open
1455 runCtrl[j].closeTime = closeTime ;
1456 runCtrl[j].maxEvt = maxEvt ;
1457 return 0;
1458 } else if ( runCtrl[j].fileId <0 ) { //run not yet opened
1459 runCtrl[j].closeTime = closeTime ;
1460 runCtrl[j].maxEvt = maxEvt ;
1461 return +1;
1462 } else { // run already closed
1463 return +2;
1464 }
1465 }
1466 } //we only reach here if the run was never created
1467 return -1;
1468
1469} /*-----------------------------------------------------------------*/
1470
1471
1472void *writeEvt( void *ptr ) {
1473/* *** main loop writing event (including opening and closing run-files */
1474
1475 int numWrite, numWait ;
1476 int k,j,i ;
1477 struct timespec xwait ;
1478 char str[MXSTR] ;
1479
1480 cpu_set_t mask;
1481 int cpu = 3 ; //write thread
1482
1483 snprintf(str,MXSTR,"Starting write-thread");
1484 factOut(kInfo,-1, str ) ;
1485
1486/* CPU_ZERO initializes all the bits in the mask to zero. */
1487 CPU_ZERO( &mask );
1488/* CPU_SET sets only the bit corresponding to cpu. */
1489 CPU_SET( cpu, &mask );
1490/* sched_setaffinity returns 0 in success */
1491 if ( sched_setaffinity( 0, sizeof(mask), &mask ) == -1 ) {
1492 snprintf(str,MXSTR,"W ---> can not create affinity to %d",cpu);
1493 }
1494
1495 int lastRun = 0 ; //usually run from last event still valid
1496
1497 while (g_runStat >-2) {
1498
1499 numWait = numWrite = 0 ;
1500 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr ;
1501 if ( kd < 0 ) kd+= (MAX_EVT*MAX_RUN) ;
1502
1503 int k1=evtCtrl.frstPtr;
1504 for ( k=k1; k<(k1+kd); k++ ) {
1505 int k0 = k % (MAX_EVT*MAX_RUN) ;
1506//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1507 if (evtCtrl.evtStat[k0] > 500 && evtCtrl.evtStat[k0] < 900) {
1508
1509 if ( gi_resetR > 1 ) { //we must drain the buffer asap
1510 evtCtrl.evtStat[k0] = 904 ;
1511 } else {
1512
1513
1514 int id = evtCtrl.evtBuf[k0] ;
1515 uint32_t irun = mBuffer[id].runNum ;
1516 int ievt = mBuffer[id].evNum ;
1517
1518 gi.wrtTot++ ;
1519 if (runCtrl[lastRun].runId == irun ) {
1520 j = lastRun ;
1521 } else {
1522 //check which fileID to use (or open if needed)
1523 for ( j=0; j<MAX_RUN; j++) {
1524 if ( runCtrl[j].runId == irun ) break ;
1525 }
1526 if ( j >= MAX_RUN ) {
1527 snprintf(str,MXSTR,"W error: can not find run %d for event %d in %d", irun,ievt,id);
1528 factOut(kFatal,901, str ) ;
1529 gi.wrtErr++ ;
1530 }
1531 lastRun = j ;
1532 }
1533
1534 if (runCtrl[j].fileId < 0 ) {
1535 actRun.Version = 1 ;
1536 actRun.RunType = -1 ; //to be adapted
1537
1538 actRun.Nroi = runCtrl[j].roi0 ;
1539 actRun.NroiTM = runCtrl[j].roi8 ;
1540 if ( actRun.Nroi == actRun.NroiTM ) actRun.NroiTM = 0 ;
1541 actRun.RunTime = runCtrl[j].firstTime ;
1542 actRun.RunUsec = runCtrl[j].firstTime ;
1543 actRun.NBoard = NBOARDS ;
1544 actRun.NPix = NPIX ;
1545 actRun.NTm = NTMARK ;
1546 actRun.Nroi = mBuffer[id].nRoi ;
1547 memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS* sizeof(PEVNT_HEADER) ) ;
1548
1549 runCtrl[j].fileHd = runOpen(irun, &actRun, sizeof(actRun) ) ;
1550 if (runCtrl[j].fileHd == NULL ) {
1551 snprintf(str,MXSTR,"W could not open a file for run %d",irun);
1552 factOut(kError,502, str ) ;
1553 runCtrl[j].fileId = 91 ;
1554 } else {
1555 snprintf(str,MXSTR,"W opened new run_file %d evt %d",irun,ievt) ;
1556 factOut(kInfo,-1, str ) ;
1557 runCtrl[j].fileId = 0 ;
1558 }
1559
1560 }
1561
1562 if (runCtrl[j].fileId != 0 ) {
1563 if (runCtrl[j].fileId < 0 ) {
1564 snprintf(str,MXSTR,"W never opened file for this run %d",irun) ;
1565 factOut(kError,123,str) ;
1566 } else if (runCtrl[j].fileId < 100 ) {
1567 snprintf(str,MXSTR,"W.file for this run is closed %d",irun) ;
1568 factOut(kWarn,123,str) ;
1569 runCtrl[j].fileId += 100 ;
1570 } else {
1571 snprintf(str,MXSTR,"W:file for this run is closed %d",irun) ;
1572 factOut(kDebug,123,str) ;
1573 }
1574 evtCtrl.evtStat[k0] = 903 ;
1575 gi.wrtErr++ ;
1576 } else {
1577 i=runWrite(runCtrl[j].fileHd, mBuffer[id].fEvent, sizeof(mBuffer[id]) );
1578 if ( i>=0 ) {
1579 runCtrl[j].lastTime = g_actTime;
1580 runCtrl[j].actEvt++ ;
1581 evtCtrl.evtStat[k0] = 901 ;
1582 snprintf(str,MXSTR,"%5d successfully wrote for run %d id %5d",ievt,irun,k0);
1583 factOut(kDebug,504, str ) ;
1584// gj.writEvt++ ;
1585 } else {
1586 snprintf(str,MXSTR,"W error writing event for run %d",irun) ;
1587 factOut(kError,503, str ) ;
1588 evtCtrl.evtStat[k0] = 902 ;
1589 gi.wrtErr++ ;
1590 }
1591
1592 if ( i < 0
1593 || runCtrl[j].lastTime < g_actTime-300
1594 || runCtrl[j].closeTime < g_actTime
1595 || runCtrl[j].maxEvt < runCtrl[j].actEvt ) {
1596int ii =0 ;
1597if ( i < 0 ) ii=1 ;
1598else if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1599else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1600else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1601
1602
1603
1604 //close run for whatever reason
1605 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1606 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1607 if (i<0) {
1608 snprintf(str,MXSTR,"error closing run %d %d AAA",runCtrl[j].runId,i) ;
1609 factOut(kError,503, str ) ;
1610 runCtrl[j].fileId = 92 ;
1611 } else {
1612 snprintf(str,MXSTR,"W closed run %d AAA %d",irun,ii) ;
1613 factOut(kInfo,503, str ) ;
1614 runCtrl[j].fileId = 93 ;
1615 }
1616 }
1617 }
1618 }
1619 } else if (evtCtrl.evtStat[k0] > 0
1620 && evtCtrl.evtStat[k0] < 900 ) numWait++ ;
1621 }
1622
1623 //check if we should close a run (mainly when no event pending)
1624 for ( j=0; j<MAX_RUN; j++) {
1625 if ( runCtrl[j].fileId==0
1626 && ( runCtrl[j].closeTime < g_actTime
1627 ||runCtrl[j].lastTime < g_actTime-300
1628 ||runCtrl[j].maxEvt < runCtrl[j].actEvt ) ) {
1629 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1630int ii =0 ;
1631 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1632else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1633else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1634
1635
1636 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1637 if (i<0) {
1638 snprintf(str,MXSTR,"error closing run %d %d BBB",runCtrl[j].runId,i) ;
1639 factOut(kError,506, str ) ;
1640 runCtrl[j].fileId = 94 ;
1641 } else {
1642 snprintf(str,MXSTR,"W closed run %d BBB %d",runCtrl[j].runId,ii) ;
1643 factOut(kInfo,507, str ) ;
1644 runCtrl[j].fileId = 95 ;
1645 }
1646 }
1647 }
1648
1649 if (numWrite == 0) {
1650 //seems we have nothing to do, so sleep a little
1651 xwait.tv_sec = 0;
1652 xwait.tv_nsec= 2000000 ; // sleep for ~2 msec
1653 nanosleep( &xwait , NULL ) ;
1654 }
1655
1656 if ( gj.readStat < -10 && numWait == 0) { //nothing left to do
1657 snprintf(str,MXSTR,"Finish Write Process ...");
1658 factOut(kInfo,-1, str ) ;
1659 gw_runStat = -22 ; //==> we should exit
1660 gj.writStat= -22 ; //==> we should exit
1661 goto closerun ;
1662 }
1663 gw_runStat = gi_runStat ;
1664 gj.writStat= gj.readStat ;
1665
1666 }
1667
1668 //must close all open files ....
1669 snprintf(str,MXSTR,"Abort Writing Process ...");
1670 factOut(kInfo,-1, str ) ;
1671
1672closerun:
1673 snprintf(str,MXSTR,"Close all open files ...");
1674 factOut(kInfo,-1, str ) ;
1675 for ( j=0; j<MAX_RUN; j++)
1676 if ( runCtrl[j].fileId ==0 ) {
1677 if (runCtrl[j].runId == gi_myRun) gi_myRun = g_actTime ;
1678 i=runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]) );
1679int ii =0 ;
1680 if (runCtrl[j].closeTime < g_actTime ) ii=2 ;
1681else if (runCtrl[j].lastTime < g_actTime-300 ) ii=3 ;
1682else if (runCtrl[j].maxEvt < runCtrl[j].actEvt ) ii=4 ;
1683 if (i<0) {
1684 snprintf(str,MXSTR,"error closing run %d %d CCC",runCtrl[j].runId,i) ;
1685 factOut(kError,506, str ) ;
1686 runCtrl[j].fileId = 96 ;
1687 } else {
1688 snprintf(str,MXSTR,"W closed run %d CCC %d",runCtrl[j].runId,ii) ;
1689 factOut(kInfo,507, str ) ;
1690 runCtrl[j].fileId = 97 ;
1691 }
1692 }
1693
1694 gw_runStat = -99;
1695 gj.writStat= -99;
1696 snprintf(str,MXSTR,"Exit Writing Process ...");
1697 factOut(kInfo,-1, str ) ;
1698 return 0;
1699
1700
1701
1702
1703} /*-----------------------------------------------------------------*/
1704
1705
1706
1707
1708void StartEvtBuild() {
1709
1710 int i,j,imax,status,th_ret[50] ;
1711 pthread_t thread[50] ;
1712 struct timespec xwait ;
1713
1714 gi_runStat = gp_runStat = gw_runStat = 0 ;
1715 gj.readStat= gj.procStat= gj.writStat= 0 ;
1716
1717 snprintf(str,MXSTR,"Starting EventBuilder V15.07 A");
1718 factOut(kInfo,-1, str ) ;
1719
1720//initialize run control logics
1721 for (i=0; i<MAX_RUN; i++) {
1722 runCtrl[i].runId = 0 ;
1723 runCtrl[i].fileId = -2 ;
1724 }
1725
1726//partially initialize event control logics
1727 evtCtrl.frstPtr = 0 ;
1728 evtCtrl.lastPtr = 0 ;
1729
1730//start all threads (more to come) when we are allowed to ....
1731 while (g_runStat == 0 ) {
1732 xwait.tv_sec = 0;
1733 xwait.tv_nsec= 10000000 ; // sleep for ~10 msec
1734 nanosleep( &xwait , NULL ) ;
1735 }
1736
1737 i=0 ;
1738 th_ret[i] = pthread_create( &thread[i], NULL, readFAD, NULL );
1739 i++;
1740 th_ret[i] = pthread_create( &thread[i], NULL, procEvt, NULL );
1741 i++;
1742 th_ret[i] = pthread_create( &thread[i], NULL, writeEvt, NULL );
1743 i++;
1744 imax=i ;
1745
1746
1747#ifdef BILAND
1748 xwait.tv_sec = 30;;
1749 xwait.tv_nsec= 0 ; // sleep for ~20sec
1750 nanosleep( &xwait , NULL ) ;
1751
1752 printf("close all runs in 2 seconds\n");
1753
1754 CloseRunFile( 0, time(NULL)+2, 0) ;
1755
1756 xwait.tv_sec = 1;;
1757 xwait.tv_nsec= 0 ; // sleep for ~20sec
1758 nanosleep( &xwait , NULL ) ;
1759
1760 printf("setting g_runstat to -1\n");
1761
1762 g_runStat = -1 ;
1763#endif
1764
1765
1766//wait for all threads to finish
1767 for (i=0; i<imax; i++) {
1768 j = pthread_join ( thread[i], (void **)&status) ;
1769 }
1770
1771} /*-----------------------------------------------------------------*/
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787 /*-----------------------------------------------------------------*/
1788 /*-----------------------------------------------------------------*/
1789 /*-----------------------------------------------------------------*/
1790 /*-----------------------------------------------------------------*/
1791 /*-----------------------------------------------------------------*/
1792
1793
1794#ifdef BILAND
1795
1796
1797 /*-----------------------------------------------------------------*/
1798 /*-----------------------------------------------------------------*/
1799 /*-----------------------------------------------------------------*/
1800 /*-----------------------------------------------------------------*/
1801 /*-----------------------------------------------------------------*/
1802
1803
1804
1805
1806FileHandle_t runOpen(uint32_t irun, RUN_HEAD *runhd, size_t len )
1807{ return 1; } ;
1808
1809int runWrite(FileHandle_t fileHd , EVENT *event, size_t len )
1810{ return 1; usleep(10000); return 1; }
1811
1812
1813//{ return 1; } ;
1814
1815int runClose(FileHandle_t fileHd , RUN_TAIL *runth, size_t len )
1816{ return 1; } ;
1817
1818
1819
1820int eventCheck( PEVNT_HEADER *fadhd, EVENT *event)
1821{
1822 int i=0;
1823
1824// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1825// for (i=0; i<NBOARDS; i++) {
1826// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1827// }
1828 return 0;
1829}
1830
1831
1832void factStatNew(EVT_STAT gi) {
1833 int i ;
1834
1835//for (i=0;i<MAX_SOCK;i++) {
1836// printf("%4d",gi.numRead[i]);
1837// if (i%20 == 0 ) printf("\n");
1838//}
1839}
1840
1841void gotNewRun( int runnr, PEVNT_HEADER *headers )
1842{ printf("got new run %d\n",runnr); return; }
1843
1844void factStat(GUI_STAT gj) {
1845// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1846// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1847}
1848
1849
1850void debugRead(int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr, int state, uint32_t tsec, uint32_t tusec ) {
1851// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1852}
1853
1854
1855
1856void debugStream(int isock, void *buf, int len) {
1857}
1858
1859void debugHead(int i, int j, void *buf) {
1860}
1861
1862
1863void factOut(int severity, int err, char* message ) {
1864static FILE * fd ;
1865static int file=0 ;
1866
1867 if (file==0) {
1868 printf("open file\n");
1869 fd=fopen("x.out","w+") ;
1870 file=999;
1871 }
1872
1873 fprintf(fd,"%3d %3d | %s \n",severity,err,message) ;
1874
1875 if (severity != kDebug)
1876 printf("%3d %3d | %s\n",severity,err,message) ;
1877}
1878
1879
1880
1881int main() {
1882 int i,b,c,p ;
1883 char ipStr[100] ;
1884 struct in_addr IPaddr ;
1885
1886 g_maxMem = 1024*1024 ; //MBytes
1887//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
1888 g_maxMem = g_maxMem * 200; //100MBytes
1889
1890
1891 g_runStat = 40 ;
1892
1893 i=0 ;
1894
1895// version for standard crates
1896//for (c=0; c<4,c++) {
1897// for (b=0; b<10; b++) {
1898// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1899//
1900// inet_pton(PF_INET, ipStr, &IPaddr) ;
1901//
1902// g_port[i].sockAddr.sin_family = PF_INET;
1903// g_port[i].sockAddr.sin_port = htons(5000) ;
1904// g_port[i].sockAddr.sin_addr = IPaddr ;
1905// g_port[i].sockDef = 1 ;
1906// i++ ;
1907// }
1908//}
1909//
1910//version for PC-test *
1911 for (c=0; c<4; c++) {
1912 for (b=0; b<10; b++) {
1913 sprintf(ipStr,"10.0.%d.11",128+c) ;
1914 if (c<2) sprintf(ipStr,"10.0.%d.11",128) ;
1915 else sprintf(ipStr,"10.0.%d.11",131) ;
1916// if (c==0) sprintf(ipStr,"10.0.100.11") ;
1917
1918 inet_pton(PF_INET, ipStr, &IPaddr) ;
1919 p = 31919+100*c+10*b;
1920
1921
1922 g_port[i].sockAddr.sin_family = PF_INET;
1923 g_port[i].sockAddr.sin_port = htons(p) ;
1924 g_port[i].sockAddr.sin_addr = IPaddr ;
1925 g_port[i].sockDef = 1 ;
1926
1927 i++ ;
1928 }
1929 }
1930
1931
1932//g_port[17].sockDef =-1 ;
1933//g_actBoards-- ;
1934
1935 StartEvtBuild() ;
1936
1937 return 0;
1938
1939}
1940#endif
Note: See TracBrowser for help on using the repository browser.