source: trunk/FACT++/src/EventBuilder.c@ 12091

Last change on this file since 12091 was 12091, checked in by tbretz, 13 years ago
Updated to the latest event builder version which includes runEnd and runStart to control the memory of the processing threads.
File size: 80.6 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6#define MIN_LEN 32 // min #bytes needed to interpret FADheader
7#define MAX_LEN 256*1024 // size of read-buffer per socket
8
9#include <stdlib.h>
10#include <stdint.h>
11#include <unistd.h>
12#include <stdio.h>
13#include <sys/time.h>
14#include <arpa/inet.h>
15#include <string.h>
16#include <math.h>
17#include <error.h>
18#include <errno.h>
19#include <unistd.h>
20#include <sys/types.h>
21#include <sys/socket.h>
22#include <netinet/in.h>
23#include <netinet/tcp.h>
24#include <pthread.h>
25#include <sched.h>
26
27#include "EventBuilder.h"
28
/* Severity levels passed as the first argument of factOut(). */
enum Severity
{
   /* Just a message, usually obsolete */
   kMessage = 10,
   /* An info telling something which can be interesting to know */
   kInfo = 20,
   /* A warning: something that might result in unexpected or unwanted behaviour */
   kWarn = 30,
   /* An error: something unexpected happened, but the program can still handle it */
   kError = 40,
   /* An error which cannot be handled at all; the only solution is program termination */
   kFatal = 50,
   /* A message used for debugging only */
   kDebug = 99
};
38
39
40void *gi_procPtr;
41
42
43//#define nanosleep(x,y)
44
45extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
46extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
47extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
48//extern int runFinish (uint32_t runnr);
49
50
51
52extern void * runStart (uint32_t irun, RUN_HEAD * runhd, size_t len);
53
54extern int runEnd (uint32_t, void * runPtr );
55
56extern void factOut (int severity, int err, char *message);
57
58extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
59
60
61extern void factStat (GUI_STAT gj);
62
63extern void factStatNew (EVT_STAT gi);
64
65extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event,
66 int mboard);
67
68extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
69 int16_t mboard, void * buffer);
70
71extern void debugHead (int i, int j, void *buf);
72
73extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
74 int32_t runnr, int state, uint32_t tsec,
75 uint32_t tusec);
76extern void debugStream (int isock, void *buf, int len);
77
78int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
79
80
81int g_maxProc;
82int g_maxSize;
83int gi_maxSize =0;
84int gi_maxProc;
85uint32_t gi_actRun;
86void *gi_runPtr;
87
88uint g_actTime;
89uint g_actUsec;
90int g_runStat;
91int g_reset;
92int g_useFTM;
93
94int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
95size_t g_maxMem; //maximum memory allowed for buffer
96
97//no longer needed ...
98int g_maxBoards; //maximum number of boards to be initialized
99int g_actBoards;
100//
101
102FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
103
104
105int gi_runStat;
106int gp_runStat;
107int gw_runStat;
108
109int gi_memStat = +1;
110
111uint32_t gi_myRun = 0;
112uint32_t actrun = 0;
113
114
115uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
116
117//uint gi_EvtStart= 0 ;
118//uint gi_EvtRead = 0 ;
119//uint gi_EvtBad = 0 ;
120//uint gi_EvtTot = 0 ;
121//size_t gi_usedMem = 0 ;
122
123//uint gw_EvtTot = 0 ;
124//uint gp_EvtTot = 0 ;
125
126PIX_MAP g_pixMap[NPIX];
127
128EVT_STAT gi;
129GUI_STAT gj;
130
131EVT_CTRL evtCtrl; //control of events during processing
132int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
133
134WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
135
136
137
138
139RUN_HEAD actRun;
140
141RUN_CTRL runCtrl[MAX_RUN];
142
143RUN_TAIL runTail[MAX_RUN];
144
145
146/*
147*** Definition of rdBuffer to read in IP packets; keep it global !!!!
148 */
149
150
151typedef union
152{
153 int8_t B[MAX_LEN];
154 int16_t S[MAX_LEN / 2];
155 int32_t I[MAX_LEN / 4];
156 int64_t L[MAX_LEN / 8];
157} CNV_FACT;
158
159typedef struct
160{
161 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
162 int32_t bufPos; //next byte to read to the buffer next
163 int32_t bufLen; //number of bytes left to read
164// size_t bufLen; //number of bytes left to read size_t might be better
165 int32_t skip; //number of bytes skipped before start of event
166
167 int sockStat; //-1 if socket not yet connected , 99 if not exist
168 int socket; //contains the sockets
169 struct sockaddr_in SockAddr; //IP for each socket
170
171 int evtID; // event ID of event currently read
172 int runID; // run "
173 int ftmID; // event ID from FTM
174 uint fadLen; // FADlength of event currently read
175 int fadVers; // Version of FAD
176 int ftmTyp; // trigger type
177 int board; // boardID (softwareID: 0..40 )
178 int Port;
179
180 CNV_FACT *rBuf;
181
182#ifdef EVTDEBUG
183 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
184#endif
185
186} READ_STRUCT;
187
188
/* A 16-bit value together with a view of its two individual bytes. */
typedef union
{
   int8_t B[sizeof (int16_t)];  /* the two bytes in memory order */
   int16_t S;                   /* the same storage as one 16-bit word */
} SHORT_BYTE;
194
195
196
197
198
199#define MXSTR 1000
200char str[MXSTR];
201
202SHORT_BYTE start, stop;
203
204READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
205
206
207
208/*-----------------------------------------------------------------*/
209
210
211/*-----------------------------------------------------------------*/
212
213int
214runFinish1 (uint32_t runnr)
215{
216 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
217 runnr);
218 factOut (kInfo, 173, str); //but continue anyhow
219 return 0;
220}
221
222
223int
224GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
225 READ_STRUCT * rd)
226{
227/*
228*** generate Address, create sockets and allocates readbuffer for it
229***
230*** if flag==0 generate socket and buffer
231*** <0 destroy socket and buffer
232*** >0 close and redo socket
233***
234*** sid : board*7 + port id
235 */
236
237 int j;
238 int optval = 1; //activate keepalive
239 socklen_t optlen = sizeof (optval);
240
241 if (rd->sockStat == 0) { //close socket if open
242 j = close (rd->socket);
243 if (j > 0) {
244 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
245 factOut (kFatal, 771, str);
246 } else {
247 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
248 factOut (kInfo, 771, str);
249 }
250 }
251
252
253 if (flag < 0) {
254 free (rd->rBuf); //and never open again
255#ifdef EVTDEBUG
256 free (rd->xBuf); //and never open again
257#endif
258 rd->rBuf = NULL;
259 rd->sockStat = 99;
260 return 0;
261 }
262
263
264 if (flag == 0) { //generate address and buffer ...
265 rd->Port = port;
266 rd->SockAddr.sin_family = sockAddr->sin_family;
267 rd->SockAddr.sin_port = htons (port);
268 rd->SockAddr.sin_addr = sockAddr->sin_addr;
269
270#ifdef EVTDEBUG
271 rd->xBuf = malloc (sizeof (CNV_FACT));
272#endif
273 rd->rBuf = malloc (sizeof (CNV_FACT));
274 if (rd->rBuf == NULL) {
275 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
276 factOut (kFatal, 774, str);
277 rd->sockStat = 77;
278 return -3;
279 }
280 }
281
282
283 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
284 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
285 factOut (kFatal, 773, str);
286 rd->sockStat = 88;
287 return -2;
288 }
289 optval = 1;
290 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
291 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
292 factOut (kInfo, 173, str); //but continue anyhow
293 }
294 optval = 10; //start after 10 seconds
295 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
296 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
297 factOut (kInfo, 173, str); //but continue anyhow
298 }
299 optval = 10; //do every 10 seconds
300 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
301 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
302 factOut (kInfo, 173, str); //but continue anyhow
303 }
304 optval = 2; //close after 2 unsuccessful tries
305 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
306 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
307 factOut (kInfo, 173, str); //but continue anyhow
308 }
309
310
311
312 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
313 factOut (kInfo, 773, str);
314 rd->sockStat = -1; //try to (re)open socket
315 return 0;
316
317} /*-----------------------------------------------------------------*/
318
319 /*-----------------------------------------------------------------*/
320
321
322
323
324int
325mBufInit ()
326{
327// initialize mBuffer (mark all entries as unused\empty)
328
329 int i;
330 uint32_t actime;
331
332 actime = g_actTime + 50000000;
333
334 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
335 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
336 mBuffer[i].runNum = 0;
337
338 evtCtrl.evtBuf[i] = -1;
339 evtCtrl.evtStat[i] = -1;
340 evtCtrl.pcTime[i] = actime; //initiate to far future
341 }
342
343
344 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
345
346 evtCtrl.frstPtr = 0;
347 evtCtrl.lastPtr = 0;
348
349 return 0;
350
351} /*-----------------------------------------------------------------*/
352
353
354
355
356int
357mBufEvt (int evID, uint runID, int nRoi[], int sk,
358 int fadlen, int trgTyp, int trgNum, int fadNum)
359{
360// generate a new Event into mBuffer:
361// make sure only complete Event are possible, so 'free' will always work
362// returns index into mBuffer[], or negative value in case of error
363// error: <-9000 if roi screwed up (not consistent with run)
364// <-8000 (not consistent with event)
365// <-7000 (not consistent with board)
366// < 0 if no space left
367
368 struct timeval *tv, atv;
369 tv = &atv;
370 uint32_t tsec, tusec;
371 uint oldest;
372 int jold;
373
374 int i, k, jr, b, evFree;
375 int headmem = 0;
376 size_t needmem = 0;
377
378
379 b = sk / 7;
380
381 if (nRoi[0] < 0 || nRoi[0] > 1024) {
382 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
383 factOut (kError, 999, str);
384 gj.badRoiR++;
385 gj.badRoi[b]++;
386 return -9999;
387 }
388
389 for (jr = 1; jr < 8; jr++) {
390 if (nRoi[jr] != nRoi[0]) {
391 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
392 nRoi[0]);
393 factOut (kError, 711, str);
394 gj.badRoiB++;
395 gj.badRoi[b]++;
396 return -7101;
397 }
398 }
399 if (nRoi[8] < nRoi[0]) {
400 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
401 factOut (kError, 712, str);
402 gj.badRoiB++;
403 gj.badRoi[b]++;
404 return -7102;
405 }
406
407
408 i = evID % MAX_EVT;
409 evFree = -1;
410
411 for (k = 0; k < MAX_RUN; k++) {
412 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
413 // is it ok ????
414 if (mBuffer[i].nRoi != nRoi[0]
415 || mBuffer[i].nRoiTM != nRoi[8]) {
416 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
417 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
418 factOut (kError, 821, str);
419 gj.badRoiE++;
420 gj.badRoi[b]++;
421 return -8201;
422 }
423// count for inconsistencies
424
425 if (mBuffer[i].trgNum != trgNum)
426 mBuffer[i].Errors[0]++;
427 if (mBuffer[i].fadNum != fadNum)
428 mBuffer[i].Errors[1]++;
429 if (mBuffer[i].trgTyp != trgTyp)
430 mBuffer[i].Errors[2]++;
431
432 //everything seems fine so far ==> use this slot ....
433 return i;
434 }
435 if (evFree < 0 && mBuffer[i].evNum < 0)
436 evFree = i;
437 i += MAX_EVT;
438 }
439
440
441 //event does not yet exist; create it
442 if (evFree < 0) { //no space available in ctrl
443 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
444 factOut (kError, 881, str);
445 return -1;
446 }
447 i = evFree; //found free entry; use it ...
448
449 gettimeofday (tv, NULL);
450 tsec = atv.tv_sec;
451 tusec = atv.tv_usec;
452
453 //check if runId already registered in runCtrl
454 evFree = -1;
455 oldest = g_actTime + 1000;
456 jold = -1;
457 for (k = 0; k < MAX_RUN; k++) {
458 if (runCtrl[k].runId == runID) {
459// if (runCtrl[k].procId > 0) { //run is closed -> reject
460// snprintf (str, MXSTR, "skip event since run %d already closed", runID);
461// factOut (kInfo, 931, str);
462// return -21;
463// }
464
465 if (runCtrl[k].roi0 != nRoi[0]
466 || runCtrl[k].roi8 != nRoi[8]) {
467 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
468 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
469 factOut (kError, 931, str);
470 gj.badRoiR++;
471 gj.badRoi[b]++;
472 return -9301;
473 }
474 goto RUNFOUND;
475 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
476 evFree = k;
477 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
478 if (runCtrl[k].closeTime < oldest) {
479 oldest = runCtrl[k].closeTime;
480 jold = k;
481 }
482 }
483 }
484
485 if (evFree < 0 && jold < 0) {
486 snprintf (str, MXSTR, "not able to register the new run %d", runID);
487 factOut (kFatal, 883, str);
488 return -1001;
489 } else {
490 if (evFree < 0)
491 evFree = jold;
492 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
493 evFree, nRoi[0], nRoi[8]);
494 factOut (kInfo, 503, str);
495 runCtrl[evFree].runId = runID;
496 runCtrl[evFree].roi0 = nRoi[0];
497 runCtrl[evFree].roi8 = nRoi[8];
498 runCtrl[evFree].fileId = -2;
499 runCtrl[evFree].procId = -2;
500 runCtrl[evFree].lastEvt = -1;
501 runCtrl[evFree].nextEvt = 0;
502 runCtrl[evFree].actEvt = 0;
503 runCtrl[evFree].procEvt = 0;
504 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
505 runCtrl[evFree].firstUsec = tusec;
506 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
508
509 runTail[evFree].nEventsOk =
510 runTail[evFree].nEventsRej =
511 runTail[evFree].nEventsBad =
512 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
513 }
514
515 RUNFOUND:
516
517 needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
518
519 headmem = NBOARDS * sizeof (PEVNT_HEADER);
520
521 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
522 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
523 if (gi_memStat > 0) {
524 gi_memStat = -99;
525 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
526 evID, sk);
527 factOut (kError, 882, str);
528 } else {
529 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
530 evID, sk);
531 factOut (kDebug, 882, str);
532 }
533 return -11;
534 }
535
536 mBuffer[i].FADhead = malloc (headmem);
537 if (mBuffer[i].FADhead == NULL) {
538 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
539 factOut (kError, 882, str);
540 return -12;
541 }
542
543 mBuffer[i].fEvent = malloc (needmem);
544 if (mBuffer[i].fEvent == NULL) {
545 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
546 factOut (kError, 882, str);
547 free (mBuffer[i].FADhead);
548 mBuffer[i].FADhead = NULL;
549 return -22;
550 }
551
552// if (gi_maxSize > 0) {
553// mBuffer[i].buffer = malloc (gi_maxSize);
554// if (mBuffer[i].buffer == NULL) {
555// snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
556// factOut (kError, 882, str);
557// free (mBuffer[i].FADhead);
558// mBuffer[i].FADhead = NULL;
559// free (mBuffer[i].fEvent);
560// mBuffer[i].fEvent = NULL;
561// return -32;
562// }
563// } else {
564// mBuffer[i].buffer = NULL;
565// }
566
567 //flag all boards as unused
568 mBuffer[i].nBoard = 0;
569 for (k = 0; k < NBOARDS; k++) {
570 mBuffer[i].board[k] = -1;
571 }
572 //flag all pixels as unused
573 for (k = 0; k < NPIX; k++) {
574 mBuffer[i].fEvent->StartPix[k] = -1;
575 }
576 //flag all TMark as unused
577 for (k = 0; k < NTMARK; k++) {
578 mBuffer[i].fEvent->StartTM[k] = -1;
579 }
580
581 mBuffer[i].fEvent->NumBoards = 0;
582 mBuffer[i].fEvent->PCUsec = tusec;
583 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
584 mBuffer[i].nRoi = nRoi[0];
585 mBuffer[i].nRoiTM = nRoi[8];
586 mBuffer[i].evNum = evID;
587 mBuffer[i].runNum = runID;
588 mBuffer[i].fadNum = fadNum;
589 mBuffer[i].trgNum = trgNum;
590 mBuffer[i].trgTyp = trgTyp;
591 mBuffer[i].evtLen = needmem;
592 mBuffer[i].Errors[0] =
593 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
594
595 gj.usdMem += needmem + headmem + gi_maxSize;
596 if (gj.usdMem > gj.maxMem)
597 gj.maxMem = gj.usdMem;
598
599 gj.bufTot++;
600 if (gj.bufTot > gj.maxEvt)
601 gj.maxEvt = gj.bufTot;
602
603 gj.rateNew++;
604
605 //register event in 'active list (reading)'
606
607 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
608 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
609 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
610 evtIdx[i] = evtCtrl.lastPtr;
611
612
613 snprintf (str, MXSTR,
614 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
615 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
616 factOut (kDebug, -11, str);
617 evtCtrl.lastPtr++;
618 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
619 evtCtrl.lastPtr = 0;
620
621 gi.evtGet++;
622
623 return i;
624
625} /*-----------------------------------------------------------------*/
626
627
628
629
630int
631mBufFree (int i)
632{
633//delete entry [i] from mBuffer:
634//(and make sure multiple calls do no harm ....)
635
636 int headmem = 0;
637 int evid;
638 size_t freemem = 0;
639
640 evid = mBuffer[i].evNum;
641 freemem = mBuffer[i].evtLen;
642
643 free (mBuffer[i].fEvent);
644 mBuffer[i].fEvent = NULL;
645
646 free (mBuffer[i].FADhead);
647 mBuffer[i].FADhead = NULL;
648
649// if (gi_maxSize > 0) {
650// free (mBuffer[i].buffer);
651// mBuffer[i].buffer = NULL;
652// }
653
654 headmem = NBOARDS * sizeof (PEVNT_HEADER);
655 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
656 mBuffer[i].runNum = 0;
657
658 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
659 gj.bufTot--;
660
661 if (gi_memStat < 0) {
662 if (gj.usdMem <= 0.75 * gj.maxMem)
663 gi_memStat = +1;
664 }
665
666
667 return 0;
668
669} /*-----------------------------------------------------------------*/
670
671
672void
673resetEvtStat ()
674{
675 int i;
676
677 for (i = 0; i < MAX_SOCK; i++)
678 gi.numRead[i] = 0;
679
680 for (i = 0; i < NBOARDS; i++) {
681 gi.gotByte[i] = 0;
682 gi.gotErr[i] = 0;
683
684 }
685
686 gi.evtGet = 0; //#new Start of Events read
687 gi.evtTot = 0; //#complete Events read
688 gi.evtErr = 0; //#Events with Errors
689 gi.evtSkp = 0; //#Events incomplete (timeout)
690
691 gi.procTot = 0; //#Events processed
692 gi.procErr = 0; //#Events showed problem in processing
693 gi.procTrg = 0; //#Events accepted by SW trigger
694 gi.procSkp = 0; //#Events rejected by SW trigger
695
696 gi.feedTot = 0; //#Events used for feedBack system
697 gi.feedErr = 0; //#Events rejected by feedBack
698
699 gi.wrtTot = 0; //#Events written to disk
700 gi.wrtErr = 0; //#Events with write-error
701
702 gi.runOpen = 0; //#Runs opened
703 gi.runClose = 0; //#Runs closed
704 gi.runErr = 0; //#Runs with open/close errors
705
706 return;
707} /*-----------------------------------------------------------------*/
708
709
710
/* Initialization hook of the FAD reader; it currently performs no action. */
void
initReadFAD ()
{
}                               /*-----------------------------------------------------------------*/
716
717
718
719void *
720readFAD (void *ptr)
721{
722/* *** main loop reading FAD data and sorting them to complete events */
723 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
724 int actBoards = 0, minLen;
725 int32_t jrd;
726 uint gi_SecTime; //time in seconds
727 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
728
729 int goodhed = 0;
730
731 char str[MXSTR];
732 int sockDef[NBOARDS]; //internal state of sockets
733 int jrdx;
734
735
736 struct timespec xwait;
737
738
739 struct timeval *tv, atv;
740 tv = &atv;
741 uint32_t tsec, tusec;
742
743
744 snprintf (str, MXSTR, "start initializing");
745 factOut (kInfo, -1, str);
746
747 int cpu = 7; //read thread
748 cpu_set_t mask;
749
750/* CPU_ZERO initializes all the bits in the mask to zero. */
751// CPU_ZERO (&mask);
752/* CPU_SET sets only the bit corresponding to cpu. */
753 cpu = 7;
754// CPU_SET (cpu, &mask);
755
756/* sched_setaffinity returns 0 in success */
757// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
758// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
759// factOut (kWarn, -1, str);
760// }
761
762
763 head_len = sizeof (PEVNT_HEADER);
764 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
765 minLen = head_len; //min #bytes needed to check header: full header for debug
766
767 start.S = 0xFB01;
768 stop.S = 0x04FE;
769
770/* initialize run control logics */
771 for (i = 0; i < MAX_RUN; i++) {
772 runCtrl[i].runId = 0;
773 runCtrl[i].fileId = -2;
774 runCtrl[i].procId = -2;
775 }
776 gi_resetS = gi_resetR = 9;
777 for (i = 0; i < NBOARDS; i++)
778 sockDef[i] = 0;
779
780 gi_actRun = 0;
781
782
783 START:
784 gettimeofday (tv, NULL);
785 g_actTime = tsec = atv.tv_sec;
786 g_actUsec = tusec = atv.tv_usec;
787 gi_myRun = g_actTime;
788 evtCtrl.frstPtr = 0;
789 evtCtrl.lastPtr = 0;
790
791 gi_SecTime = g_actTime;
792 gi_runStat = g_runStat;
793 gj.readStat = g_runStat;
794 numok = numok2 = 0;
795
796 int cntsock = 8 - NUMSOCK;
797
798 if (gi_resetS > 0) {
799 //make sure all sockets are preallocated as 'not exist'
800 for (i = 0; i < MAX_SOCK; i++) {
801 rd[i].socket = -1;
802 rd[i].sockStat = 99;
803 }
804
805 for (k = 0; k < NBOARDS; k++) {
806 gi_NumConnect[k] = 0;
807 gi.numConn[k] = 0;
808 gj.numConn[k] = 0;
809 gj.errConn[k] = 0;
810 gj.rateBytes[k] = 0;
811 gj.totBytes[k] = 0;
812 }
813
814 }
815
816
817 if (gi_resetR > 0) {
818 resetEvtStat ();
819 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
820 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
821 gj.totMem = g_maxMem;
822 gj.bufNew = gj.bufEvt = 0;
823 gj.badRoiE = gj.badRoiR = gj.badRoiB =
824 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
825
826 int b, p;
827 for (b = 0; b < NBOARDS; b++)
828 gj.badRoi[b] = 0;
829
830 mBufInit (); //initialize buffers
831
832 snprintf (str, MXSTR, "end initializing");
833 factOut (kInfo, -1, str);
834 }
835
836
837 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
838
839 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
840
841 gi_runStat = g_runStat;
842 gj.readStat = g_runStat;
843 gettimeofday (tv, NULL);
844 g_actTime = tsec = atv.tv_sec;
845 g_actUsec = tusec = atv.tv_usec;
846
847
848 int b, p, p0, s0, nch;
849 nch = 0;
850 for (b = 0; b < NBOARDS; b++) {
851 k = b * 7;
852 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
853 nch++;
854 gi_NumConnect[b] = 0; //must close all connections
855 gi.numConn[b] = 0;
856 gj.numConn[b] = 0;
857 if (sockDef[b] == 0)
858 s0 = 0; //sockets to be defined and opened
859 else if (g_port[b].sockDef == 0)
860 s0 = -1; //sockets to be destroyed
861 else
862 s0 = +1; //sockets to be closed and reopened
863
864 if (s0 == 0)
865 p0 = ntohs (g_port[b].sockAddr.sin_port);
866 else
867 p0 = 0;
868
869 for (p = p0 + 1; p < p0 + 8; p++) {
870 if (p < NUMSOCK)
871 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
872 k++;
873 }
874 sockDef[b] = g_port[b].sockDef;
875 }
876 }
877
878 if (nch > 0) {
879 actBoards = 0;
880 for (b = 0; b < NBOARDS; b++) {
881 if (sockDef[b] > 0)
882 actBoards++;
883 }
884 }
885
886
887 jrdx = 0;
888 numokx = 0;
889 numok = 0; //count number of succesfull actions
890
891 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
892 b = i / 7;
893 p = i % 7;
894
895 if (p >= NUMSOCK) {;
896 } else {
897 if (sockDef[b] > 0)
898 s0 = +1;
899 else
900 s0 = -1;
901
902 if (rd[i].sockStat < 0) { //try to connect if not yet done
903 rd[i].sockStat = connect (rd[i].socket,
904 (struct sockaddr *) &rd[i].SockAddr,
905 sizeof (rd[i].SockAddr));
906 if (rd[i].sockStat == 0) { //successfull ==>
907 if (sockDef[b] > 0) {
908 rd[i].bufTyp = 0; // expect a header
909 rd[i].bufLen = frst_len; // max size to read at begining
910 } else {
911 rd[i].bufTyp = -1; // full data to be skipped
912 rd[i].bufLen = MAX_LEN; //huge for skipping
913 }
914 rd[i].bufPos = 0; // no byte read so far
915 rd[i].skip = 0; // start empty
916 gi_NumConnect[b] += cntsock;
917
918 gi.numConn[b]++;
919 gj.numConn[b]++;
920 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
921 factOut (kInfo, -1, str);
922 }
923 }
924
925 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
926 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
927 numok++;
928 size_t maxread = rd[i].bufLen;
929 if (maxread > MAXREAD)
930 maxread = MAXREAD;
931
932 jrd =
933 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
934 maxread, MSG_DONTWAIT);
935
936 if (jrd > 0) {
937 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
938#ifdef EVTDEBUG
939 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
940 &rd[i].rBuf->B[rd[i].bufPos], jrd);
941 snprintf (str, MXSTR,
942 "read sock %3d bytes %5d len %5d first %d %d",
943 i, jrd, rd[i].bufLen,
944 rd[i].rBuf->B[rd[i].bufPos],
945 rd[i].rBuf->B[rd[i].bufPos + 1]);
946 factOut (kDebug, 301, str);
947#endif
948 }
949
950 if (jrd == 0) { //connection has closed ...
951 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
952 factOut (kInfo, 441, str);
953 GenSock (s0, i, 0, NULL, &rd[i]);
954 gi.gotErr[b]++;
955 gi_NumConnect[b] -= cntsock;
956 gi.numConn[b]--;
957 gj.numConn[b]--;
958
959 } else if (jrd < 0) { //did not read anything
960 if (errno != EAGAIN && errno != EWOULDBLOCK) {
961 snprintf (str, MXSTR, "Error Reading from %d | %m",
962 i);
963 factOut (kError, 442, str);
964 gi.gotErr[b]++;
965 } else
966 numok--; //else nothing waiting to be read
967 jrd = 0;
968 }
969 } else {
970 jrd = 0; //did read nothing as requested
971 snprintf (str, MXSTR, "do not read from socket %d %d", i,
972 rd[i].bufLen);
973 factOut (kDebug, 301, str);
974 }
975
976 gi.gotByte[b] += jrd;
977 gj.rateBytes[b] += jrd;
978
979 if (jrd > 0) {
980 numokx++;
981 jrdx += jrd;
982 }
983
984
985 if (rd[i].bufTyp < 0) { // we are skipping this board ...
986 // just do nothing
987#ifdef EVTDEBUG
988 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
989 i);
990 factOut (kInfo, 301, str);
991#endif
992
993 } else if (rd[i].bufTyp > 0) { // we are reading data ...
994 if (jrd < rd[i].bufLen) { //not yet all read
995 rd[i].bufPos += jrd; //==> prepare for continuation
996 rd[i].bufLen -= jrd;
997 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
998 } else { //full dataset read
999 rd[i].bufLen = 0;
1000 rd[i].bufPos = rd[i].fadLen;
1001 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1002 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1003 gi.evtErr++;
1004 snprintf (str, MXSTR,
1005 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1006 i, rd[i].fadLen, rd[i].evtID,
1007 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1008 rd[i].rBuf->B[rd[i].bufPos - 2],
1009 rd[i].rBuf->B[rd[i].bufPos - 1]);
1010 factOut (kError, 301, str);
1011 goto EndBuf;
1012
1013#ifdef EVTDEBUG
1014 } else {
1015 snprintf (str, MXSTR,
1016 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1017 i, rd[i].fadLen, rd[i].rBuf->B[0],
1018 rd[i].rBuf->B[1], start.B[1], start.B[0],
1019 rd[i].rBuf->B[rd[i].bufPos - 2],
1020 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1021 stop.B[0]);
1022 factOut (kDebug, 301, str);
1023#endif
1024 }
1025
1026 if (jrd > 0)
1027 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1028
1029 //we have a complete buffer, copy to WORK area
1030 int jr;
1031 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1032 for (jr = 0; jr < 9; jr++) {
1033 roi[jr] =
1034 ntohs (rd[i].rBuf->S[head_len / 2 + 2 +
1035 jr * (roi[0] + 4)]);
1036 }
1037 //get index into mBuffer for this event (create if needed)
1038
1039 int actid;
1040 if (g_useFTM > 0)
1041 actid = rd[i].evtID;
1042 else
1043 actid = rd[i].ftmID;
1044
1045 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1046 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1047 rd[i].evtID);
1048
1049 if (evID < -1000) {
1050 goto EndBuf; //not usable board/event/run --> skip it
1051 }
1052 if (evID < 0) { //no space left, retry later
1053#ifdef EVTDEBUG
1054 if (rd[i].bufLen != 0) {
1055 snprintf (str, MXSTR, "something screwed up");
1056 factOut (kFatal, 1, str);
1057 }
1058#endif
1059 xwait.tv_sec = 0;
1060 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1061 nanosleep (&xwait, NULL);
1062 goto EndBuf1; //hope there is free space next round
1063 }
1064 //we have a valid entry in mBuffer[]; fill it
1065
1066#ifdef EVTDEBUG
1067 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1068 rd[i].fadLen);
1069 if (xchk != 0) {
1070 snprintf (str, MXSTR,
1071 "ERROR OVERWRITE %d %d on port %d", xchk,
1072 rd[i].fadLen, i);
1073 factOut (kFatal, 1, str);
1074
1075 uint iq;
1076 for (iq = 0; iq < rd[i].fadLen; iq++) {
1077 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1078 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i,
1079 iq, rd[i].rBuf->B[iq],
1080 rd[i].xBuf->B[iq]);
1081 factOut (kFatal, 1, str);
1082 }
1083 }
1084 }
1085#endif
1086 int qncpy = 0;
1087 boardId = b;
1088 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1089 int fadCrate = fadBoard / 256;
1090 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1091 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1092 fadCrate, fadBoard % 256, boardId);
1093 factOut (kWarn, 301, str);
1094 }
1095 if (mBuffer[evID].board[boardId] != -1) {
1096 snprintf (str, MXSTR,
1097 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1098 evID, boardId, i, rd[i].fadLen,
1099 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1100 rd[i].rBuf->B[rd[i].bufPos - 2],
1101 rd[i].rBuf->B[rd[i].bufPos - 1]);
1102 factOut (kWarn, 501, str);
1103 goto EndBuf; //--> skip Board
1104 }
1105
1106 int iDx = evtIdx[evID]; //index into evtCtrl
1107
1108 memcpy (&mBuffer[evID].FADhead[boardId].
1109 start_package_flag, &rd[i].rBuf->S[0], head_len);
1110 qncpy += head_len;
1111
1112 src = head_len / 2;
1113 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1114 for (drs = 0; drs < 4; drs++) {
1115 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1116 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1117 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1118//here we should check if pixH is correct ....
1119
1120 pixS = boardId * 36 + drs * 9 + px;
1121 src++;
1122
1123
1124 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1125 dest = pixS * roi[0];
1126 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1127 &rd[i].rBuf->S[src], roi[0] * 2);
1128 qncpy += roi[0] * 2;
1129 src += pixR;
1130
1131 if (px == 8) {
1132 tmS = boardId * 4 + drs;
1133 if (pixR > roi[0]) { //and we have additional TM info
1134 dest = tmS * roi[0] + NPIX * roi[0];
1135 int srcT = src - roi[0];
1136 mBuffer[evID].fEvent->StartTM[tmS] =
1137 (pixC + pixR - roi[0]) % 1024;
1138 memcpy (&mBuffer[evID].fEvent->
1139 Adc_Data[dest], &rd[i].rBuf->S[srcT],
1140 roi[0] * 2);
1141 qncpy += roi[0] * 2;
1142 } else {
1143 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1144 }
1145 }
1146 }
1147 } // now we have stored a new board contents into Event structure
1148
1149 mBuffer[evID].fEvent->NumBoards++;
1150 mBuffer[evID].board[boardId] = boardId;
1151 evtCtrl.evtStat[iDx]++;
1152 evtCtrl.pcTime[iDx] = g_actTime;
1153
1154 if (++mBuffer[evID].nBoard >= actBoards) {
1155 int qnrun = 0;
1156 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1157 actrun = mBuffer[evID].runNum;
1158 int ir;
1159 for (ir = 0; ir < MAX_RUN; ir++) {
1160 qnrun++;
1161 if (runCtrl[ir].runId == actrun) {
1162 if (++runCtrl[ir].lastEvt == 0) {
1163 gotNewRun (actrun, mBuffer[evID].FADhead);
1164 snprintf (str, MXSTR,
1165 "gotNewRun %d (ev %d)",
1166 mBuffer[evID].runNum,
1167 mBuffer[evID].evNum);
1168 factOut (kInfo, 1, str);
1169 break;
1170 }
1171 }
1172 }
1173 }
1174 snprintf (str, MXSTR,
1175 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1176 mBuffer[evID].evNum, roi[0],
1177 roi[8] - roi[0], qncpy, qnrun);
1178 factOut (kDebug, -1, str);
1179
1180 //complete event read ---> flag for next processing
1181 evtCtrl.evtStat[iDx] = 99;
1182 gi.evtTot++;
1183 }
1184
1185 EndBuf:
1186 rd[i].bufTyp = 0; //ready to read next header
1187 rd[i].bufLen = frst_len;
1188 rd[i].bufPos = 0;
1189 EndBuf1:
1190 ;
1191 }
1192
1193 } else { //we are reading event header
1194 rd[i].bufPos += jrd;
1195 rd[i].bufLen -= jrd;
1196 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1197 //check if startflag correct; else shift block ....
1198 for (k = 0; k < rd[i].bufPos - 1; k++) {
1199 if (rd[i].rBuf->B[k] == start.B[1]
1200 && rd[i].rBuf->B[k + 1] == start.B[0])
1201 break;
1202 }
1203 rd[i].skip += k;
1204
1205 if (k >= rd[i].bufPos - 1) { //no start of header found
1206 rd[i].bufPos = 0;
1207 rd[i].bufLen = head_len;
1208 } else if (k > 0) {
1209 rd[i].bufPos -= k;
1210 rd[i].bufLen += k;
1211 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1212 rd[i].bufPos);
1213#ifdef EVTDEBUG
1214 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1215 rd[i].bufPos);
1216#endif
1217 }
1218 if (rd[i].bufPos >= minLen) {
1219 if (rd[i].skip > 0) {
1220 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1221 rd[i].skip, i);
1222 factOut (kInfo, 666, str);
1223 rd[i].skip = 0;
1224 }
1225 goodhed++;
1226 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1227 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1228 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]);
1229 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1230 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1231 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1232 rd[i].bufTyp = 1; //ready to read full record
1233 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1234
1235 int fadboard = ntohs (rd[i].rBuf->S[12]);
1236 int fadcrate = fadboard / 256;
1237 fadboard = (fadcrate * 10 + fadboard % 256);
1238#ifdef EVTDEBUG
1239 snprintf (str, MXSTR,
1240 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1241 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1242 rd[i].runID, fadboard, jrd);
1243 factOut (kDebug, 1, str);
1244#endif
1245
1246 if (rd[i].runID == 0)
1247 rd[i].runID = gi_myRun;
1248
1249 if (rd[i].bufLen <= head_len
1250 || rd[i].bufLen > MAX_LEN) {
1251 snprintf (str, MXSTR,
1252 "illegal event-length on port %d", i);
1253 factOut (kFatal, 881, str);
1254 rd[i].bufLen = 100000; //?
1255 }
1256 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1257 debugHead (i, fadBoard, rd[i].rBuf);
1258 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1259 } else {
1260 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1261 }
1262 } else {
1263 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1264 }
1265
1266 } //end interpreting last read
1267 }
1268 } //end of successful read anything
1269 } //finished trying to read all sockets
1270
1271#ifdef EVTDEBUG
1272 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1273 factOut (kDebug, -1, str);
1274#endif
1275
1276 gi.numRead[numok]++;
1277
1278 g_actTime = time (NULL);
1279 if (g_actTime > gi_SecTime) {
1280 gi_SecTime = g_actTime;
1281
1282
1283 //loop over all active events and flag those older than read-timeout
1284 //delete those that are written to disk ....
1285
1286 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1287 if (kd < 0)
1288 kd += (MAX_EVT * MAX_RUN);
1289
1290 gj.bufNew = gj.bufEvt = 0;
1291 int k1 = evtCtrl.frstPtr;
1292 for (k = k1; k < (k1 + kd); k++) {
1293 int k0 = k % (MAX_EVT * MAX_RUN);
1294//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1295
1296 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1297 gj.bufNew++; //incomplete event in Buffer
1298 if (evtCtrl.evtStat[k0] < 90
1299 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1300 int id = evtCtrl.evtBuf[k0];
1301 int ic,ib,ik,jb;
1302 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1303 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1304 evtCtrl.evtStat[k0]);
1305 factOut (kWarn, 601, str);
1306
1307 ik=0;
1308 for (ib=0; ib<NBOARDS; ib++) {
1309 if (ib%10==0) {
1310 snprintf (&str[ik], MXSTR, "'");
1311 ik++;
1312 }
1313 jb = mBuffer[id].board[ib];
1314 if ( jb<0 ) {
1315 snprintf (&str[ik], MXSTR, ".");
1316 ik++;
1317 } else {
1318 snprintf (&str[ik], MXSTR, "%d",jb%10);
1319 ik+=1;
1320 }
1321 }
1322 snprintf (&str[ik], MXSTR, "'");
1323 factOut (kWarn, 601, str);
1324
1325 evtCtrl.evtStat[k0] = 93; //timeout for incomplete events
1326 gi.evtSkp++;
1327 gi.evtTot++;
1328 gj.evtSkip++;
1329 }
1330 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1331 || evtCtrl.evtStat[k0] == 0) { //'useless'
1332
1333 int id = evtCtrl.evtBuf[k0];
1334 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1335 mBuffer[id].evNum, mBuffer[id].nBoard);
1336 factOut (kDebug, -1, str);
1337 mBufFree (id); //event written--> free memory
1338 evtCtrl.evtStat[k0] = -1;
1339 gj.evtWrite++;
1340 gj.rateWrite++;
1341 } else if (evtCtrl.evtStat[k0] >= 95) {
1342 gj.bufEvt++; //complete event in Buffer
1343 }
1344
1345 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1346 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1347 }
1348 }
1349
1350
1351 gj.deltaT = 1000; //temporary, must be improved
1352
1353 int b;
1354 for (b = 0; b < NBOARDS; b++)
1355 gj.totBytes[b] += gj.rateBytes[b];
1356 gj.totMem = g_maxMem;
1357 if (gj.maxMem > gj.xxxMem)
1358 gj.xxxMem = gj.maxMem;
1359 if (gj.maxEvt > gj.xxxEvt)
1360 gj.xxxEvt = gj.maxEvt;
1361
1362 factStat (gj);
1363 factStatNew (gi);
1364 gj.rateNew = gj.rateWrite = 0;
1365 gj.maxMem = gj.usdMem;
1366 gj.maxEvt = gj.bufTot;
1367 for (b = 0; b < NBOARDS; b++)
1368 gj.rateBytes[b] = 0;
1369 }
1370
1371 if (numok > 0)
1372 numok2 = 0;
1373 else if (numok2++ > 3) {
1374 if (g_runStat == 1) {
1375 xwait.tv_sec = 1;
1376 xwait.tv_nsec = 0; // hibernate for 1 sec
1377 } else {
1378 xwait.tv_sec = 0;
1379 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1380 }
1381 nanosleep (&xwait, NULL);
1382 }
1383
1384 } //and do next loop over all sockets ...
1385
1386
1387 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1388 factOut (kInfo, -1, str);
1389
1390 if (g_reset > 0) {
1391 gi_reset = g_reset;
1392 gi_resetR = gi_reset % 10; //shall we stop reading ?
1393 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1394 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1395 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1396 g_reset = 0;
1397 } else {
1398 gi_reset = 0;
1399 if (g_runStat == -1)
1400 gi_resetR = 1;
1401 else
1402 gi_resetR = 7;
1403 gi_resetS = 7; //close all sockets
1404 gi_resetW = 7; //close all files
1405 gi_resetX = 0;
1406
1407 //inform others we have to quit ....
1408 gi_runStat = -11; //inform all that no update to happen any more
1409 gj.readStat = -11; //inform all that no update to happen any more
1410 }
1411
1412 if (gi_resetS > 0) {
1413 //must close all open sockets ...
1414 snprintf (str, MXSTR, "close all sockets ...");
1415 factOut (kInfo, -1, str);
1416 for (i = 0; i < MAX_SOCK; i++) {
1417 if (rd[i].sockStat == 0) {
1418 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1419 if (i % 7 == 0) {
1420 gi_NumConnect[i / 7] -= cntsock;
1421 gi.numConn[i / 7]--;
1422 gj.numConn[i / 7]--;
1423 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1424 rd[i / 7].sockStat = -1; //and try to open asap
1425 }
1426 }
1427 }
1428 }
1429
1430
1431 if (gi_resetR > 0) {
1432 //flag all events as 'read finished'
1433 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1434 if (kd < 0)
1435 kd += (MAX_EVT * MAX_RUN);
1436
1437 int k1 = evtCtrl.frstPtr;
1438 for (k = k1; k < (k1 + kd); k++) {
1439 int k0 = k % (MAX_EVT * MAX_RUN);
1440 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1441 evtCtrl.evtStat[k0] = 91;
1442 gi.evtSkp++;
1443 gi.evtTot++;
1444 }
1445 }
1446
1447 xwait.tv_sec = 0;
1448 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1449 nanosleep (&xwait, NULL);
1450
1451 //and clear all buffers (might have to wait until all others are done)
1452 int minclear;
1453 if (gi_resetR == 1) {
1454 minclear = 900;
1455 snprintf (str, MXSTR, "drain all buffers ...");
1456 } else {
1457 minclear = 0;
1458 snprintf (str, MXSTR, "flush all buffers ...");
1459 }
1460 factOut (kInfo, -1, str);
1461
1462 int numclear = 1;
1463 while (numclear > 0) {
1464 numclear = 0;
1465 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1466 if (kd < 0)
1467 kd += (MAX_EVT * MAX_RUN);
1468
1469 int k1 = evtCtrl.frstPtr;
1470 for (k = k1; k < (k1 + kd); k++) {
1471 int k0 = k % (MAX_EVT * MAX_RUN);
1472 if (evtCtrl.evtStat[k0] > minclear) {
1473 int id = evtCtrl.evtBuf[k0];
1474 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1475 mBuffer[id].evNum, mBuffer[id].nBoard);
1476 factOut (kDebug, -1, str);
1477 mBufFree (id); //event written--> free memory
1478 evtCtrl.evtStat[k0] = -1;
1479 } else if (evtCtrl.evtStat[k0] > 0)
1480 numclear++; //writing is still ongoing...
1481
1482 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1483 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1484 }
1485
1486 xwait.tv_sec = 0;
1487 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1488 nanosleep (&xwait, NULL);
1489 }
1490 }
1491
1492 if (gi_reset > 0) {
1493 if (gi_resetW > 0) {
1494 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1495 }
1496 if (gi_resetX > 0) {
1497 xwait.tv_sec = gi_resetX;
1498 xwait.tv_nsec = 0;
1499 nanosleep (&xwait, NULL);
1500 }
1501
1502 snprintf (str, MXSTR, "Continue read Process ...");
1503 factOut (kInfo, -1, str);
1504 gi_reset = 0;
1505 goto START;
1506 }
1507
1508
1509
1510 snprintf (str, MXSTR, "Exit read Process ...");
1511 factOut (kInfo, -1, str);
1512 gi_runStat = -99;
1513 gj.readStat = -99;
1514 factStat (gj);
1515 factStatNew (gi);
1516 return 0;
1517
1518} /*-----------------------------------------------------------------*/
1519
1520
1521void *
1522subProc (void *thrid)
1523{
1524 char str[MXSTR];
1525 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1526 struct timespec xwait;
1527
1528 threadID = (int) thrid;
1529
1530 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1531 factOut (kInfo, -1, str);
1532
1533 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1534 numWait = numProc = 0;
1535 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1536 if (kd < 0)
1537 kd += (MAX_EVT * MAX_RUN);
1538
1539 int k1 = evtCtrl.frstPtr;
1540 for (k = k1; k < (k1 + kd); k++) {
1541 int k0 = k % (MAX_EVT * MAX_RUN);
1542
1543 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1544 if (gi_resetR > 1) { //we are asked to flush buffers asap
1545 jret = 9100; //flag to be deleted
1546 } else {
1547 int id = evtCtrl.evtBuf[k0];
1548 jret =
1549 subProcEvt (threadID, mBuffer[id].FADhead,
1550 mBuffer[id].fEvent, mBuffer[id].mBoard, gi_runPtr );
1551
1552 if (jret <= threadID) {
1553 snprintf (str, MXSTR,
1554 "process %d wants to send to process %d",
1555 threadID, jret);
1556 factOut (kError, -1, str);
1557 jret = 5300;
1558 } else if (jret <= 0)
1559 jret = 9200 + threadID; //flag as 'to be deleted'
1560 else if (jret >= gi_maxProc)
1561 jret = 5200 + threadID; //flag as 'to be written'
1562 else
1563 jret = 1000 + jret; //flag for next proces
1564 }
1565 evtCtrl.evtStat[k0] = jret;
1566 numProc++;
1567 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1568 numWait++;
1569 }
1570
1571 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1572 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1573 factOut (kInfo, -1, str);
1574 return 0;
1575 }
1576 if (numProc == 0) {
1577 //seems we have nothing to do, so sleep a little
1578 xwait.tv_sec = 0;
1579 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1580 nanosleep (&xwait, NULL);
1581 }
1582 }
1583
1584 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1585 factOut (kInfo, -1, str);
1586 return;
1587} /*-----------------------------------------------------------------*/
1588
1589
1590void *
1591procEvt (void *ptr)
1592{
1593/* *** main loop processing file, including SW-trigger */
1594 int numProc, numWait, iskip;
1595 int k, status, j;
1596 struct timespec xwait;
1597 char str[MXSTR];
1598
1599
1600
1601 int lastRun = 0; //usually run from last event still valid
1602
1603 cpu_set_t mask;
1604 int cpu = 1; //process thread (will be several in final version)
1605
1606 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1607 gi_maxProc);
1608 factOut (kInfo, -1, str);
1609
1610/* CPU_ZERO initializes all the bits in the mask to zero. */
1611// CPU_ZERO (&mask);
1612/* CPU_SET sets only the bit corresponding to cpu. */
1613// CPU_SET( 0 , &mask ); leave for system
1614// CPU_SET( 1 , &mask ); used by write process
1615// CPU_SET (2, &mask);
1616// CPU_SET (3, &mask);
1617// CPU_SET (4, &mask);
1618// CPU_SET (5, &mask);
1619// CPU_SET (6, &mask);
1620// CPU_SET( 7 , &mask ); used by read process
1621/* sched_setaffinity returns 0 in success */
1622// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1623// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1624// factOut (kWarn, -1, str);
1625// }
1626
1627
1628 pthread_t thread[100];
1629 int th_ret[100];
1630
1631 for (k = 0; k < gi_maxProc; k++) {
1632 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1633 }
1634
1635 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1636
1637 numWait = numProc = 0;
1638 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1639 if (kd < 0)
1640 kd += (MAX_EVT * MAX_RUN);
1641
1642 int k1 = evtCtrl.frstPtr;
1643 for (k = k1; k < (k1 + kd); k++) {
1644 int k0 = k % (MAX_EVT * MAX_RUN);
1645//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1646 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1647
1648 if (gi_resetR > 1) { //we are asked to flush buffers asap
1649 evtCtrl.evtStat[k0] = 9991;
1650 } else {
1651
1652 int id = evtCtrl.evtBuf[k0];
1653 uint32_t irun = mBuffer[id].runNum;
1654 int32_t ievt = mBuffer[id].evNum;
1655 iskip = 0 ;
1656 if (irun != gi_actRun) { // we should open a new run ?
1657 if (irun < gi_actRun) { // very old event; skip it
1658 iskip = -1 ; // but anyhow checkEvent() will be done
1659 snprintf (str, MXSTR,
1660 "P event %d from old run???", ievt,irun);
1661 factOut (kWarn, 901, str);
1662 } else if (gi_actRun != 0) { // but there is still old one open ?
1663
1664// // // // // // //MUST WAIT UNTIL PENDING OLD EVENTS PROCESSED ...
1665// loop over all waiting events; and decide what to do ....
1666 int q,q0 ;
1667 int qcnt=0;
1668 for (q=k1; q < (k1 + kd); q++) {
1669 q0 = q % (MAX_EVT * MAX_RUN);
1670 if (evtCtrl.evtStat[q0] > 0
1671 && evtCtrl.evtStat[q0] < 9000) {
1672 //this event is still active in the queue
1673 int qd = evtCtrl.evtBuf[q0];
1674 uint32_t qrun = mBuffer[qd].runNum;
1675 if (qrun < irun ) {
1676 if (evtCtrl.evtStat[q0] < 92) {
1677 //incomplete old event -> reduce timeout
1678 evtCtrl.pcTime[q0] = 0 ;
1679 }
1680 qcnt++ ;
1681 }
1682 }
1683 }
1684 if (qcnt == 0 ) {
1685 runEnd (gi_actRun, gi_runPtr );
1686 snprintf (str, MXSTR,
1687 "P run %d ended", gi_actRun);
1688 factOut (kWarn, 901, str);
1689 gi_actRun = 0; //there are no events from old runs left
1690 }
1691 }
1692
1693
1694 if (gi_actRun == 0) { // now we can try to open new run
1695 for (j = 0; j < MAX_RUN; j++) {
1696 if (runCtrl[j].runId == irun)
1697 break;
1698 }
1699 if (j >= MAX_RUN) {
1700 snprintf (str, MXSTR,
1701 "P error: can not find run %d for event %d in %d",
1702 irun, ievt, id);
1703 factOut (kFatal, 901, str);
1704 j = 0;
1705 }
1706
1707 if (runCtrl[j].fileId == 0) {
1708 snprintf (str, MXSTR,
1709 "P run %d is already open ???", irun);
1710 factOut (kWarn, 901, str);
1711 } else if (runCtrl[j].fileId > 0) {
1712 snprintf (str, MXSTR,
1713 "P run %d is already closed ???", irun);
1714 factOut (kWarn, 901, str);
1715 } else { // now open new run
1716
1717 actRun.Version = 1;
1718 actRun.RunType = -1; //to be adapted
1719
1720 actRun.Nroi = runCtrl[j].roi0;
1721 actRun.NroiTM = runCtrl[j].roi8;
1722 if (actRun.Nroi == actRun.NroiTM)
1723 actRun.NroiTM = 0;
1724 actRun.RunTime = runCtrl[j].firstTime;
1725 actRun.RunUsec = runCtrl[j].firstTime;
1726 actRun.NBoard = NBOARDS;
1727 actRun.NPix = NPIX;
1728 actRun.NTm = NTMARK;
1729 actRun.Nroi = mBuffer[id].nRoi;
1730 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1731 NBOARDS * sizeof (PEVNT_HEADER));
1732
1733 gi_actRun = irun;
1734 gi_runPtr =
1735 runStart (irun, &actRun, sizeof (actRun));
1736 if (gi_runPtr == NULL) {
1737 snprintf (str, MXSTR,
1738 "P could not start run %d", irun);
1739 factOut (kFatal, 502, str);
1740 } else {
1741 snprintf (str, MXSTR, "P started new run %d",
1742 irun);
1743 factOut (kInfo, -1, str);
1744 }
1745
1746 runCtrl[j].fileHd =
1747 runOpen (irun, &actRun, sizeof (actRun));
1748 if (runCtrl[j].fileHd == NULL) {
1749 snprintf (str, MXSTR,
1750 "P could not open a file for run %d",
1751 irun);
1752 factOut (kError, 502, str);
1753 runCtrl[j].fileId = 91;
1754 runCtrl[j].procId = 91;
1755 } else {
1756 snprintf (str, MXSTR,
1757 "P opened new run_file %d evt %d", irun,
1758 ievt);
1759 factOut (kInfo, -1, str);
1760 runCtrl[j].fileId = 0;
1761 runCtrl[j].procId = 0;
1762 }
1763 }
1764 }
1765 }
1766
1767 if (irun == gi_actRun) {
1768// now we are sure the run is ready and we can start processing the event
1769 id = evtCtrl.evtBuf[k0];
1770 int itevt = mBuffer[id].trgNum;
1771 int itrg = mBuffer[id].trgTyp;
1772 int roi = mBuffer[id].nRoi;
1773 int roiTM = mBuffer[id].nRoiTM;
1774
1775 //make sure unused pixels/tmarks are cleared to zero
1776 if (roiTM == roi)
1777 roiTM = 0;
1778 int ip, it, dest, ib;
1779 for (ip = 0; ip < NPIX; ip++) {
1780 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1781 dest = ip * roi;
1782 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1783 }
1784 }
1785 for (it = 0; it < NTMARK; it++) {
1786 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1787 dest = it * roi + NPIX * roi;
1788 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1789 }
1790 }
1791
1792
1793 //and set correct event header
1794 mBuffer[id].fEvent->Roi = roi;
1795 mBuffer[id].fEvent->RoiTM = roiTM;
1796 mBuffer[id].fEvent->EventNum = ievt;
1797 mBuffer[id].fEvent->TriggerNum = itevt;
1798 mBuffer[id].fEvent->TriggerType = itrg;
1799 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1800 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1801 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1802 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1803 mBuffer[id].fEvent->SoftTrig = 0;
1804
1805 int mboard = -1;
1806 for (ib = 0; ib < NBOARDS; ib++) {
1807 if (mBuffer[id].board[ib] == -1) { //board is not read
1808 mBuffer[id].FADhead[ib].start_package_flag = 0;
1809 mBuffer[id].fEvent->BoardTime[ib] = 0;
1810 } else {
1811 mBuffer[id].fEvent->BoardTime[ib] =
1812 ntohl (mBuffer[id].FADhead[ib].time);
1813 if (mboard < 0)
1814 mboard = ib;
1815 }
1816 }
1817 mBuffer[id].mBoard = mboard;
1818 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1819 mBuffer[id].fEvent, mboard);
1820 gi.procTot++;
1821 numProc++;
1822
1823 if (i < 0 || iskip < 0 ) {
1824 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
1825 gi.procErr++;
1826 } else {
1827 evtCtrl.evtStat[k0] = 1000; //flag event to be processed
1828 runCtrl[j].procEvt++;
1829 if (runCtrl[j].procId != 0)
1830 mBuffer[id].toWrite = -1; //flag event to be processed but now written
1831 else
1832 mBuffer[id].toWrite = 0;
1833 }
1834 }
1835 }
1836 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
1837 numWait++;
1838 }
1839 }
1840
1841 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1842 snprintf (str, MXSTR, "Exit Processing Process ...");
1843 factOut (kInfo, -1, str);
1844 gp_runStat = -22; //==> we should exit
1845 gj.procStat = -22; //==> we should exit
1846 return 0;
1847 }
1848
1849 if (numProc == 0) {
1850 //seems we have nothing to do, so sleep a little
1851 xwait.tv_sec = 0;
1852 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1853 nanosleep (&xwait, NULL);
1854 }
1855 gp_runStat = gi_runStat;
1856 gj.procStat = gj.readStat;
1857
1858 }
1859
1860 //we are asked to abort asap ==> must flag all remaining events
1861 // when gi_runStat claims that all events are in the buffer...
1862
1863 snprintf (str, MXSTR, "Abort Processing Process ...");
1864 factOut (kInfo, -1, str);
1865 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1866 if (kd < 0)
1867 kd += (MAX_EVT * MAX_RUN);
1868
1869 for (k = 0; k < gi_maxProc; k++) {
1870 pthread_join (thread[k], (void **) &status);
1871 }
1872
1873 int k1 = evtCtrl.frstPtr;
1874 for (k = k1; k < (k1 + kd); k++) {
1875 int k0 = k % (MAX_EVT * MAX_RUN);
1876 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
1877 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
1878 }
1879 }
1880
1881 gp_runStat = -99;
1882 gj.procStat = -99;
1883
1884 return 0;
1885
1886} /*-----------------------------------------------------------------*/
1887
1888int
1889CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1890{
1891/* close run runId (all all runs if runId=0) */
1892/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1893 int j;
1894
1895
1896 if (runId == 0) {
1897 for (j = 0; j < MAX_RUN; j++) {
1898 if (runCtrl[j].fileId == 0) { //run is open
1899 runCtrl[j].closeTime = closeTime;
1900 runCtrl[j].maxEvt = maxEvt;
1901 }
1902 }
1903 return 0;
1904 }
1905
1906 for (j = 0; j < MAX_RUN; j++) {
1907 if (runCtrl[j].runId == runId) {
1908 if (runCtrl[j].fileId == 0) { //run is open
1909 runCtrl[j].closeTime = closeTime;
1910 runCtrl[j].maxEvt = maxEvt;
1911 return 0;
1912 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1913 runCtrl[j].closeTime = closeTime;
1914 runCtrl[j].maxEvt = maxEvt;
1915 return +1;
1916 } else { // run already closed
1917 return +2;
1918 }
1919 }
1920 } //we only reach here if the run was never created
1921 return -1;
1922
1923} /*-----------------------------------------------------------------*/
1924
1925
1926void *
1927writeEvt (void *ptr)
1928{
1929/* *** main loop writing event (including opening and closing run-files */
1930
1931 int numWrite, numWait;
1932 int k, j, i;
1933 struct timespec xwait;
1934 char str[MXSTR];
1935
1936 cpu_set_t mask;
1937 int cpu = 1; //write thread
1938
1939 snprintf (str, MXSTR, "Starting write-thread");
1940 factOut (kInfo, -1, str);
1941
1942/* CPU_ZERO initializes all the bits in the mask to zero. */
1943// CPU_ZERO (&mask);
1944/* CPU_SET sets only the bit corresponding to cpu. */
1945// CPU_SET (cpu, &mask);
1946/* sched_setaffinity returns 0 in success */
1947// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1948// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1949// }
1950
1951 int lastRun = 0; //usually run from last event still valid
1952
1953 while (g_runStat > -2) {
1954
1955 numWait = numWrite = 0;
1956 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1957 if (kd < 0)
1958 kd += (MAX_EVT * MAX_RUN);
1959
1960 int k1 = evtCtrl.frstPtr;
1961 for (k = k1; k < (k1 + kd); k++) {
1962 int k0 = k % (MAX_EVT * MAX_RUN);
1963//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1964 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) {
1965
1966 if (gi_resetR > 1) { //we must drain the buffer asap
1967 evtCtrl.evtStat[k0] = 9904;
1968 } else {
1969
1970
1971 int id = evtCtrl.evtBuf[k0];
1972 uint32_t irun = mBuffer[id].runNum;
1973 int32_t ievt = mBuffer[id].evNum;
1974
1975 gi.wrtTot++;
1976 if (runCtrl[lastRun].runId == irun) {
1977 j = lastRun;
1978 } else {
1979 //check which fileID to use (or open if needed)
1980 for (j = 0; j < MAX_RUN; j++) {
1981 if (runCtrl[j].runId == irun)
1982 break;
1983 }
1984 if (j >= MAX_RUN) {
1985 snprintf (str, MXSTR,
1986 "W error: can not find run %d for event %d in %d",
1987 irun, ievt, id);
1988 factOut (kFatal, 901, str);
1989 gi.wrtErr++;
1990 }
1991 lastRun = j;
1992 }
1993
1994 if (runCtrl[j].fileId < 0) {
1995 actRun.Version = 1;
1996 actRun.RunType = -1; //to be adapted
1997
1998 actRun.Nroi = runCtrl[j].roi0;
1999 actRun.NroiTM = runCtrl[j].roi8;
2000 if (actRun.Nroi == actRun.NroiTM)
2001 actRun.NroiTM = 0;
2002 actRun.RunTime = runCtrl[j].firstTime;
2003 actRun.RunUsec = runCtrl[j].firstTime;
2004 actRun.NBoard = NBOARDS;
2005 actRun.NPix = NPIX;
2006 actRun.NTm = NTMARK;
2007 actRun.Nroi = mBuffer[id].nRoi;
2008 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2009 NBOARDS * sizeof (PEVNT_HEADER));
2010
2011 runCtrl[j].fileHd =
2012 runOpen (irun, &actRun, sizeof (actRun));
2013 if (runCtrl[j].fileHd == NULL) {
2014 snprintf (str, MXSTR,
2015 "W could not open a file for run %d", irun);
2016 factOut (kError, 502, str);
2017 runCtrl[j].fileId = 91;
2018 } else {
2019 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2020 irun, ievt);
2021 factOut (kWarn, -1, str);
2022 runCtrl[j].fileId = 0;
2023 }
2024
2025 }
2026
2027 if (runCtrl[j].fileId != 0 || mBuffer[id].toWrite < 0) {
2028 if (runCtrl[j].fileId < 0) {
2029 snprintf (str, MXSTR,
2030 "W never opened file for this run %d", irun);
2031 factOut (kError, 123, str);
2032 } else if (runCtrl[j].fileId < 100) {
2033 snprintf (str, MXSTR, "W.file for this run is closed %d",
2034 irun);
2035 factOut (kWarn, 123, str);
2036 runCtrl[j].fileId += 100;
2037 } else {
2038 snprintf (str, MXSTR, "W:file for this run is closed %d",
2039 irun);
2040 factOut (kDebug, 123, str);
2041 }
2042 evtCtrl.evtStat[k0] = 9903;
2043 gi.wrtErr++;
2044 } else {
2045 i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2046 sizeof (mBuffer[id]));
2047 if (i >= 0) {
2048 runCtrl[j].lastTime = g_actTime;
2049 runCtrl[j].actEvt++;
2050 evtCtrl.evtStat[k0] = 9901;
2051 snprintf (str, MXSTR,
2052 "%5d successfully wrote for run %d id %5d",
2053 ievt, irun, k0);
2054 factOut (kDebug, 504, str);
2055 } else {
2056 snprintf (str, MXSTR, "W error writing event for run %d",
2057 irun);
2058 factOut (kError, 503, str);
2059 evtCtrl.evtStat[k0] = 9902;
2060 gi.wrtErr++;
2061 }
2062
2063 if (i < 0
2064 || runCtrl[j].lastTime < g_actTime - 300
2065 || runCtrl[j].closeTime < g_actTime
2066 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2067 int ii = 0;
2068 if (i < 0)
2069 ii = 1;
2070 else if (runCtrl[j].closeTime < g_actTime)
2071 ii = 2;
2072 else if (runCtrl[j].lastTime < g_actTime - 300)
2073 ii = 3;
2074 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2075 ii = 4;
2076
2077
2078
2079 //close run for whatever reason
2080 if (runCtrl[j].runId == gi_myRun)
2081 gi_myRun = g_actTime;
2082
2083 if (runCtrl[j].procId == 0) {
2084 runFinish1 (runCtrl[j].runId);
2085 runCtrl[j].procId = 92;
2086 }
2087
2088 runCtrl[j].closeTime = g_actTime - 1;
2089 i = runClose (runCtrl[j].fileHd, &runTail[j],
2090 sizeof (runTail[j]));
2091 if (i < 0) {
2092 snprintf (str, MXSTR, "error closing run %d %d AAA",
2093 runCtrl[j].runId, i);
2094 factOut (kError, 503, str);
2095 runCtrl[j].fileId = 92;
2096 } else {
2097 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2098 ii);
2099 factOut (kInfo, 503, str);
2100 runCtrl[j].fileId = 93;
2101 }
2102 }
2103 }
2104 }
2105 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2106 numWait++;
2107 }
2108
2109 //check if we should close a run (mainly when no event pending)
2110 for (j = 0; j < MAX_RUN; j++) {
2111 if (runCtrl[j].fileId == 0
2112 && (runCtrl[j].closeTime < g_actTime
2113 || runCtrl[j].lastTime < g_actTime - 300
2114 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2115 if (runCtrl[j].runId == gi_myRun)
2116 gi_myRun = g_actTime;
2117 int ii = 0;
2118 if (runCtrl[j].closeTime < g_actTime)
2119 ii = 2;
2120 else if (runCtrl[j].lastTime < g_actTime - 300)
2121 ii = 3;
2122 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2123 ii = 4;
2124
2125 if (runCtrl[j].procId == 0) {
2126 runFinish1 (runCtrl[j].runId);
2127 runCtrl[j].procId = 92;
2128 }
2129
2130 runCtrl[j].closeTime = g_actTime - 1;
2131 i = runClose (runCtrl[j].fileHd, &runTail[j],
2132 sizeof (runTail[j]));
2133 if (i < 0) {
2134 snprintf (str, MXSTR, "error closing run %d %d BBB",
2135 runCtrl[j].runId, i);
2136 factOut (kError, 506, str);
2137 runCtrl[j].fileId = 94;
2138 } else {
2139 snprintf (str, MXSTR, "W closed run %d BBB %d",
2140 runCtrl[j].runId, ii);
2141 factOut (kInfo, 507, str);
2142 runCtrl[j].fileId = 95;
2143 }
2144 }
2145 }
2146
2147 if (numWrite == 0) {
2148 //seems we have nothing to do, so sleep a little
2149 xwait.tv_sec = 0;
2150 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2151 nanosleep (&xwait, NULL);
2152 }
2153
2154 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2155 snprintf (str, MXSTR, "Finish Write Process ...");
2156 factOut (kInfo, -1, str);
2157 gw_runStat = -22; //==> we should exit
2158 gj.writStat = -22; //==> we should exit
2159 goto closerun;
2160 }
2161 gw_runStat = gi_runStat;
2162 gj.writStat = gj.readStat;
2163
2164 }
2165
2166 //must close all open files ....
2167 snprintf (str, MXSTR, "Abort Writing Process ...");
2168 factOut (kInfo, -1, str);
2169
2170 closerun:
2171 snprintf (str, MXSTR, "Close all open files ...");
2172 factOut (kInfo, -1, str);
2173 for (j = 0; j < MAX_RUN; j++)
2174 if (runCtrl[j].fileId == 0) {
2175 if (runCtrl[j].runId == gi_myRun)
2176 gi_myRun = g_actTime;
2177
2178 if (runCtrl[j].procId == 0) {
2179 runFinish1 (runCtrl[j].runId);
2180 runCtrl[j].procId = 92;
2181 }
2182
2183 runCtrl[j].closeTime = g_actTime - 1;
2184 i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2185 int ii = 0;
2186 if (runCtrl[j].closeTime < g_actTime)
2187 ii = 2;
2188 else if (runCtrl[j].lastTime < g_actTime - 300)
2189 ii = 3;
2190 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2191 ii = 4;
2192 if (i < 0) {
2193 snprintf (str, MXSTR, "error closing run %d %d CCC",
2194 runCtrl[j].runId, i);
2195 factOut (kError, 506, str);
2196 runCtrl[j].fileId = 96;
2197 } else {
2198 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2199 ii);
2200 factOut (kInfo, 507, str);
2201 runCtrl[j].fileId = 97;
2202 }
2203 }
2204
2205 gw_runStat = -99;
2206 gj.writStat = -99;
2207 snprintf (str, MXSTR, "Exit Writing Process ...");
2208 factOut (kInfo, -1, str);
2209 return 0;
2210
2211
2212
2213
2214} /*-----------------------------------------------------------------*/
2215
2216
2217
2218
2219void
2220StartEvtBuild ()
2221{
2222
2223 int i, j, imax, status, th_ret[50];
2224 pthread_t thread[50];
2225 struct timespec xwait;
2226
2227 gi_runStat = gp_runStat = gw_runStat = 0;
2228 gj.readStat = gj.procStat = gj.writStat = 0;
2229
2230 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2231 factOut (kInfo, -1, str);
2232
2233//initialize run control logics
2234 for (i = 0; i < MAX_RUN; i++) {
2235 runCtrl[i].runId = 0;
2236 runCtrl[i].fileId = -2;
2237 }
2238
2239//prepare for subProcesses
2240// gi_maxSize = g_maxSize;
2241// if (gi_maxSize < 0)
2242 gi_maxSize = 0;
2243
2244 gi_maxProc = g_maxProc;
2245 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2246 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2247 factOut (kFatal, 301, str);
2248 gi_maxProc = 1;
2249 }
2250//partially initialize event control logics
2251 evtCtrl.frstPtr = 0;
2252 evtCtrl.lastPtr = 0;
2253
2254//start all threads (more to come) when we are allowed to ....
2255 while (g_runStat == 0) {
2256 xwait.tv_sec = 0;
2257 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2258 nanosleep (&xwait, NULL);
2259 }
2260
2261 i = 0;
2262 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2263 i++;
2264 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2265 i++;
2266 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2267 i++;
2268 imax = i;
2269
2270
2271#ifdef BILAND
2272 xwait.tv_sec = 30;;
2273 xwait.tv_nsec = 0; // sleep for ~20sec
2274 nanosleep (&xwait, NULL);
2275
2276 printf ("close all runs in 2 seconds\n");
2277
2278 CloseRunFile (0, time (NULL) + 2, 0);
2279
2280 xwait.tv_sec = 1;;
2281 xwait.tv_nsec = 0; // sleep for ~20sec
2282 nanosleep (&xwait, NULL);
2283
2284 printf ("setting g_runstat to -1\n");
2285
2286 g_runStat = -1;
2287#endif
2288
2289
2290//wait for all threads to finish
2291 for (i = 0; i < imax; i++) {
2292 j = pthread_join (thread[i], (void **) &status);
2293 }
2294
2295} /*-----------------------------------------------------------------*/
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311 /*-----------------------------------------------------------------*/
2312 /*-----------------------------------------------------------------*/
2313 /*-----------------------------------------------------------------*/
2314 /*-----------------------------------------------------------------*/
2315 /*-----------------------------------------------------------------*/
2316
2317#ifdef BILAND
2318
2319int
2320subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2321 int16_t mboard, void * buffer)
2322{
2323 printf ("called subproc %d\n", threadID);
2324 return threadID + 1;
2325}
2326
2327
2328
2329
2330 /*-----------------------------------------------------------------*/
2331 /*-----------------------------------------------------------------*/
2332 /*-----------------------------------------------------------------*/
2333 /*-----------------------------------------------------------------*/
2334 /*-----------------------------------------------------------------*/
2335
2336
2337
2338
2339FileHandle_t
2340runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2341{
2342 return 1;
2343};
2344
2345int
2346runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2347{
2348 return 1;
2349 usleep (10000);
2350 return 1;
2351}
2352
2353
2354//{ return 1; } ;
2355
2356int
2357runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2358{
2359 return 1;
2360};
2361
2362
2363
2364
2365int
2366eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event, int mboard)
2367{
2368 int i = 0;
2369
2370// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2371// for (i=0; i<NBOARDS; i++) {
2372// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2373// }
2374 return 0;
2375}
2376
2377
2378void
2379factStatNew (EVT_STAT gi)
2380{
2381 int i;
2382
2383//for (i=0;i<MAX_SOCK;i++) {
2384// printf("%4d",gi.numRead[i]);
2385// if (i%20 == 0 ) printf("\n");
2386//}
2387}
2388
2389void
2390gotNewRun (int runnr, PEVNT_HEADER * headers)
2391{
2392 printf ("got new run %d\n", runnr);
2393 return;
2394}
2395
2396void
2397factStat (GUI_STAT gj)
2398{
2399// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2400// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2401}
2402
2403
/*
 * debugRead - read-debugging stub for the stand-alone BILAND build.
 *
 * Does nothing; all arguments are ignored.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

// Disabled debug print:
// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
2410
2411
2412
/*
 * debugStream - stream-debugging stub for the stand-alone BILAND build.
 *
 * Does nothing; the buffer is neither read nor modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2417
/*
 * debugHead - header-debugging stub for the stand-alone BILAND build.
 *
 * Does nothing; the buffer is neither read nor modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2422
2423
2424void
2425factOut (int severity, int err, char *message)
2426{
2427 static FILE *fd;
2428 static int file = 0;
2429
2430 if (file == 0) {
2431 printf ("open file\n");
2432 fd = fopen ("x.out", "w+");
2433 file = 999;
2434 }
2435
2436 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2437
2438 if (severity != kDebug)
2439 printf ("%3d %3d | %s\n", severity, err, message);
2440}
2441
2442
2443
2444int
2445main ()
2446{
2447 int i, b, c, p;
2448 char ipStr[100];
2449 struct in_addr IPaddr;
2450
2451 g_maxMem = 1024 * 1024; //MBytes
2452//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2453 g_maxMem = g_maxMem * 200; //100MBytes
2454
2455 g_maxProc = 20;
2456 g_maxSize = 0;
2457
2458 g_runStat = 40;
2459
2460 i = 0;
2461
2462// version for standard crates
2463//for (c=0; c<4,c++) {
2464// for (b=0; b<10; b++) {
2465// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2466//
2467// inet_pton(PF_INET, ipStr, &IPaddr) ;
2468//
2469// g_port[i].sockAddr.sin_family = PF_INET;
2470// g_port[i].sockAddr.sin_port = htons(5000) ;
2471// g_port[i].sockAddr.sin_addr = IPaddr ;
2472// g_port[i].sockDef = 1 ;
2473// i++ ;
2474// }
2475//}
2476//
2477//version for PC-test *
2478 for (c = 0; c < 4; c++) {
2479 for (b = 0; b < 10; b++) {
2480 sprintf (ipStr, "10.0.%d.11", 128 + c);
2481 if (c < 2)
2482 sprintf (ipStr, "10.0.%d.11", 128);
2483 else
2484 sprintf (ipStr, "10.0.%d.11", 131);
2485// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2486
2487 inet_pton (PF_INET, ipStr, &IPaddr);
2488 p = 31919 + 100 * c + 10 * b;
2489
2490
2491 g_port[i].sockAddr.sin_family = PF_INET;
2492 g_port[i].sockAddr.sin_port = htons (p);
2493 g_port[i].sockAddr.sin_addr = IPaddr;
2494 g_port[i].sockDef = 1;
2495
2496 i++;
2497 }
2498 }
2499
2500
2501//g_port[17].sockDef =-1 ;
2502//g_actBoards-- ;
2503
2504 StartEvtBuild ();
2505
2506 return 0;
2507
2508}
2509#endif
Note: See TracBrowser for help on using the repository browser.