source: trunk/FACT++/src/EventBuilder.c@ 12328

Last change on this file since 12328 was 12227, checked in by tbretz, 13 years ago
Latest updates and bug fixes.
File size: 78.5 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/** Severity levels handed to factOut() as its first argument. */
enum Severity
{
   /// Just a message, usually obsolete
   kMessage = 10,

   /// An info telling something which can be interesting to know
   kInfo = 20,

   /// A warning, things that somehow might result in unexpected or unwanted behaviour
   kWarn = 30,

   /// Error, something unexpected happened, but can still be handled by the program
   kError = 40,

   /// An error which cannot be handled at all happened, the only solution is program termination
   kFatal = 50,

   /// A message used for debugging only
   kDebug = 99,
};
36
#define MIN_LEN 32              // min #bytes needed to interpret FADheader
// size of read-buffer per socket; parenthesized so that the macro expands
// correctly inside any expression (e.g. x / MAX_LEN or x % MAX_LEN)
#define MAX_LEN (256 * 1024)
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0;
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi;
120GUI_STAT gj;
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127
128
129
130RUN_HEAD actRun;
131
132RUN_CTRL runCtrl[MAX_RUN];
133
134RUN_TAIL runTail[MAX_RUN];
135
136
137/*
138*** Definition of rdBuffer to read in IP packets; keep it global !!!!
139 */
140
141
142typedef union
143{
144 int8_t B[MAX_LEN];
145 int16_t S[MAX_LEN / 2];
146 int32_t I[MAX_LEN / 4];
147 int64_t L[MAX_LEN / 8];
148} CNV_FACT;
149
150typedef struct
151{
152 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
153 int32_t bufPos; //next byte to read to the buffer next
154 int32_t bufLen; //number of bytes left to read
155// size_t bufLen; //number of bytes left to read size_t might be better
156 int32_t skip; //number of bytes skipped before start of event
157
158 int errCnt; //how often connect failed since last successful
159 int sockStat; //-1 if socket not yet connected , 99 if not exist
160 int socket; //contains the sockets
161 struct sockaddr_in SockAddr; //IP for each socket
162
163 int evtID; // event ID of event currently read
164 int runID; // run "
165 int ftmID; // event ID from FTM
166 uint fadLen; // FADlength of event currently read
167 int fadVers; // Version of FAD
168 int ftmTyp; // trigger type
169 int board; // boardID (softwareID: 0..40 )
170 int Port;
171
172 CNV_FACT *rBuf;
173
174#ifdef EVTDEBUG
175 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
176#endif
177
178} READ_STRUCT;
179
180
/* A 16-bit value that can also be addressed as its two individual bytes. */
typedef union
{
   int8_t  B[2];                // the two bytes in memory order
   int16_t S;                   // the same storage as one 16-bit integer
} SHORT_BYTE;
186
187
188
189
190
191#define MXSTR 1000
192char str[MXSTR];
193
194SHORT_BYTE start, stop;
195
196READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
197
198
199
200/*-----------------------------------------------------------------*/
201
202
203/*-----------------------------------------------------------------*/
204
205int
206runFinish1 (uint32_t runnr)
207{
208 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
209 runnr);
210 factOut (kInfo, 173, str); //but continue anyhow
211 return 0;
212}
213
214
215int
216GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
217 READ_STRUCT * rd)
218{
219/*
220*** generate Address, create sockets and allocates readbuffer for it
221***
222*** if flag==0 generate socket and buffer
223*** <0 destroy socket and buffer
224*** >0 close and redo socket
225***
226*** sid : board*7 + port id
227 */
228
229 int j;
230 int optval = 1; //activate keepalive
231 socklen_t optlen = sizeof (optval);
232
233
234 if (sid % 7 >= NUMSOCK) {
235 //this is a not used socket, so do nothing ...
236 rd->sockStat = 77;
237 rd->rBuf == NULL ;
238 return 0;
239 }
240
241 if (rd->sockStat == 0) { //close socket if open
242 j = close (rd->socket);
243 if (j > 0) {
244 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
245 factOut (kFatal, 771, str);
246 } else {
247 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
248 factOut (kInfo, 771, str);
249 }
250 }
251
252 rd->sockStat = 99;
253
254 if (flag < 0) {
255 free (rd->rBuf); //and never open again
256#ifdef EVTDEBUG
257 free (rd->xBuf); //and never open again
258#endif
259 rd->rBuf = NULL;
260 return 0;
261 }
262
263
264 if (flag == 0) { //generate address and buffer ...
265 rd->Port = port;
266 rd->SockAddr.sin_family = sockAddr->sin_family;
267 rd->SockAddr.sin_port = htons (port);
268 rd->SockAddr.sin_addr = sockAddr->sin_addr;
269
270#ifdef EVTDEBUG
271 rd->xBuf = malloc (sizeof (CNV_FACT));
272#endif
273 rd->rBuf = malloc (sizeof (CNV_FACT));
274 if (rd->rBuf == NULL) {
275 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
276 factOut (kFatal, 774, str);
277 rd->sockStat = 77;
278 return -3;
279 }
280 }
281
282
283 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
284 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
285 factOut (kFatal, 773, str);
286 rd->sockStat = 88;
287 return -2;
288 }
289 optval = 1;
290 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
291 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
292 factOut (kInfo, 173, str); //but continue anyhow
293 }
294 optval = 10; //start after 10 seconds
295 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
296 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
297 factOut (kInfo, 173, str); //but continue anyhow
298 }
299 optval = 10; //do every 10 seconds
300 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
301 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
302 factOut (kInfo, 173, str); //but continue anyhow
303 }
304 optval = 2; //close after 2 unsuccessful tries
305 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
306 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
307 factOut (kInfo, 173, str); //but continue anyhow
308 }
309
310
311 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
312 factOut (kInfo, 773, str);
313 rd->sockStat = -1; //try to (re)open socket
314 rd->errCnt = 0;
315 return 0;
316
317} /*-----------------------------------------------------------------*/
318
319 /*-----------------------------------------------------------------*/
320
321
322
323
324int
325mBufInit ()
326{
327// initialize mBuffer (mark all entries as unused\empty)
328
329 int i;
330 uint32_t actime;
331
332 actime = g_actTime + 50000000;
333
334 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
335 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
336 mBuffer[i].runNum = 0;
337
338 evtCtrl.evtBuf[i] = -1;
339 evtCtrl.evtStat[i] = -1;
340 evtCtrl.pcTime[i] = actime; //initiate to far future
341 }
342
343
344 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
345
346 evtCtrl.frstPtr = 0;
347 evtCtrl.lastPtr = 0;
348
349 return 0;
350
351} /*-----------------------------------------------------------------*/
352
353
354
355
356int
357mBufEvt (int evID, uint runID, int nRoi[], int sk,
358 int fadlen, int trgTyp, int trgNum, int fadNum)
359{
360// generate a new Event into mBuffer:
361// make sure only complete Event are possible, so 'free' will always work
362// returns index into mBuffer[], or negative value in case of error
363// error: <-9000 if roi screwed up (not consistent with run)
364// <-8000 (not consistent with event)
365// <-7000 (not consistent with board)
366// < 0 if no space left
367
368 struct timeval *tv, atv;
369 tv = &atv;
370 uint32_t tsec, tusec;
371 uint oldest;
372 int jold;
373
374 int i, k, jr, b, evFree;
375 int headmem = 0;
376 size_t needmem = 0;
377
378
379 b = sk / 7;
380
381 if (nRoi[0] < 0 || nRoi[0] > 1024) {
382 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
383 factOut (kError, 999, str);
384 gj.badRoiR++;
385 gj.badRoi[b]++;
386 return -9999;
387 }
388
389 for (jr = 1; jr < 8; jr++) {
390 if (nRoi[jr] != nRoi[0]) {
391 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
392 nRoi[0]);
393 factOut (kError, 711, str);
394 gj.badRoiB++;
395 gj.badRoi[b]++;
396 return -7101;
397 }
398 }
399 if (nRoi[8] < nRoi[0]) {
400 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
401 factOut (kError, 712, str);
402 gj.badRoiB++;
403 gj.badRoi[b]++;
404 return -7102;
405 }
406
407
408 i = evID % MAX_EVT;
409 evFree = -1;
410
411 for (k = 0; k < MAX_RUN; k++) {
412 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
413 // is it ok ????
414 if (mBuffer[i].nRoi != nRoi[0]
415 || mBuffer[i].nRoiTM != nRoi[8]) {
416 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
417 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
418 factOut (kError, 821, str);
419 gj.badRoiE++;
420 gj.badRoi[b]++;
421 return -8201;
422 }
423// count for inconsistencies
424
425 if (mBuffer[i].trgNum != trgNum)
426 mBuffer[i].Errors[0]++;
427 if (mBuffer[i].fadNum != fadNum)
428 mBuffer[i].Errors[1]++;
429 if (mBuffer[i].trgTyp != trgTyp)
430 mBuffer[i].Errors[2]++;
431
432 //everything seems fine so far ==> use this slot ....
433 return i;
434 }
435 if (evFree < 0 && mBuffer[i].evNum < 0)
436 evFree = i;
437 i += MAX_EVT;
438 }
439
440
441 //event does not yet exist; create it
442 if (evFree < 0) { //no space available in ctrl
443 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
444 factOut (kError, 881, str);
445 return -1;
446 }
447 i = evFree; //found free entry; use it ...
448
449 gettimeofday (tv, NULL);
450 tsec = atv.tv_sec;
451 tusec = atv.tv_usec;
452
453 //check if runId already registered in runCtrl
454 evFree = -1;
455 oldest = g_actTime + 1000;
456 jold = -1;
457 for (k = 0; k < MAX_RUN; k++) {
458 if (runCtrl[k].runId == runID) {
459// if (runCtrl[k].procId > 0) { //run is closed -> reject
460// snprintf (str, MXSTR, "skip event since run %d finished", runID);
461// factOut (kInfo, 931, str);
462// return -21;
463// }
464
465 if (runCtrl[k].roi0 != nRoi[0]
466 || runCtrl[k].roi8 != nRoi[8]) {
467 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
468 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
469 factOut (kError, 931, str);
470 gj.badRoiR++;
471 gj.badRoi[b]++;
472 return -9301;
473 }
474 goto RUNFOUND;
475 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
476 evFree = k;
477 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
478 if (runCtrl[k].closeTime < oldest) {
479 oldest = runCtrl[k].closeTime;
480 jold = k;
481 }
482 }
483 }
484
485 if (evFree < 0 && jold < 0) {
486 snprintf (str, MXSTR, "not able to register the new run %d", runID);
487 factOut (kFatal, 883, str);
488 return -1001;
489 } else {
490 if (evFree < 0)
491 evFree = jold;
492 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
493 evFree, nRoi[0], nRoi[8]);
494 factOut (kInfo, 503, str);
495 runCtrl[evFree].runId = runID;
496 runCtrl[evFree].roi0 = nRoi[0];
497 runCtrl[evFree].roi8 = nRoi[8];
498 runCtrl[evFree].fileId = -2;
499 runCtrl[evFree].procId = -2;
500 runCtrl[evFree].lastEvt = -1;
501 runCtrl[evFree].nextEvt = 0;
502 runCtrl[evFree].actEvt = 0;
503 runCtrl[evFree].procEvt = 0;
504 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
505 runCtrl[evFree].firstUsec = tusec;
506 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
507 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
508// runCtrl[evFree].lastTime = 0;
509
510 runTail[evFree].nEventsOk =
511 runTail[evFree].nEventsRej =
512 runTail[evFree].nEventsBad =
513 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
514 }
515
516 RUNFOUND:
517
518 needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
519
520 headmem = NBOARDS * sizeof (PEVNT_HEADER);
521
522 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
523 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
524 if (gi_memStat > 0) {
525 gi_memStat = -99;
526 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
527 evID, sk);
528 factOut (kError, 882, str);
529 } else {
530 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
531 evID, sk);
532 factOut (kDebug, 882, str);
533 }
534 return -11;
535 }
536
537 mBuffer[i].FADhead = malloc (headmem);
538 if (mBuffer[i].FADhead == NULL) {
539 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
540 factOut (kError, 882, str);
541 return -12;
542 }
543
544 mBuffer[i].fEvent = malloc (needmem);
545 if (mBuffer[i].fEvent == NULL) {
546 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
547 factOut (kError, 882, str);
548 free (mBuffer[i].FADhead);
549 mBuffer[i].FADhead = NULL;
550 return -22;
551 }
552
553 mBuffer[i].buffer = malloc (gi_maxSize);
554 if (mBuffer[i].buffer == NULL) {
555 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
556 factOut (kError, 882, str);
557 free (mBuffer[i].FADhead);
558 mBuffer[i].FADhead = NULL;
559 free (mBuffer[i].fEvent);
560 mBuffer[i].fEvent = NULL;
561 return -32;
562 }
563 //flag all boards as unused
564 mBuffer[i].nBoard = 0;
565 for (k = 0; k < NBOARDS; k++) {
566 mBuffer[i].board[k] = -1;
567 }
568 //flag all pixels as unused
569 for (k = 0; k < NPIX; k++) {
570 mBuffer[i].fEvent->StartPix[k] = -1;
571 }
572 //flag all TMark as unused
573 for (k = 0; k < NTMARK; k++) {
574 mBuffer[i].fEvent->StartTM[k] = -1;
575 }
576
577 mBuffer[i].fEvent->NumBoards = 0;
578 mBuffer[i].fEvent->PCUsec = tusec;
579 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
580 mBuffer[i].nRoi = nRoi[0];
581 mBuffer[i].nRoiTM = nRoi[8];
582 mBuffer[i].evNum = evID;
583 mBuffer[i].runNum = runID;
584 mBuffer[i].fadNum = fadNum;
585 mBuffer[i].trgNum = trgNum;
586 mBuffer[i].trgTyp = trgTyp;
587 mBuffer[i].evtLen = needmem;
588 mBuffer[i].Errors[0] =
589 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
590
591 gj.usdMem += needmem + headmem + gi_maxSize;
592 if (gj.usdMem > gj.maxMem)
593 gj.maxMem = gj.usdMem;
594
595 gj.bufTot++;
596 if (gj.bufTot > gj.maxEvt)
597 gj.maxEvt = gj.bufTot;
598
599 gj.rateNew++;
600
601 //register event in 'active list (reading)'
602
603 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
604 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
605 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
606 evtIdx[i] = evtCtrl.lastPtr;
607
608
609 snprintf (str, MXSTR,
610 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
611 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
612 factOut (kDebug, -11, str);
613 evtCtrl.lastPtr++;
614 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
615 evtCtrl.lastPtr = 0;
616
617 gi.evtGet++;
618
619 return i;
620
621} /*-----------------------------------------------------------------*/
622
623
624
625
626int
627mBufFree (int i)
628{
629//delete entry [i] from mBuffer:
630//(and make sure multiple calls do no harm ....)
631
632 int headmem = 0;
633 int evid;
634 size_t freemem = 0;
635
636 evid = mBuffer[i].evNum;
637 freemem = mBuffer[i].evtLen;
638
639 free (mBuffer[i].fEvent);
640 mBuffer[i].fEvent = NULL;
641
642 free (mBuffer[i].FADhead);
643 mBuffer[i].FADhead = NULL;
644
645 free (mBuffer[i].buffer);
646 mBuffer[i].buffer = NULL;
647
648 headmem = NBOARDS * sizeof (PEVNT_HEADER);
649 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
650 mBuffer[i].runNum = 0;
651
652 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
653 gj.bufTot--;
654
655 if (gi_memStat < 0) {
656 if (gj.usdMem <= 0.75 * gj.maxMem)
657 gi_memStat = +1;
658 }
659
660
661 return 0;
662
663} /*-----------------------------------------------------------------*/
664
665
666void
667resetEvtStat ()
668{
669 int i;
670
671 for (i = 0; i < MAX_SOCK; i++)
672 gi.numRead[i] = 0;
673
674 for (i = 0; i < NBOARDS; i++) {
675 gi.gotByte[i] = 0;
676 gi.gotErr[i] = 0;
677
678 }
679
680 gi.evtGet = 0; //#new Start of Events read
681 gi.evtTot = 0; //#complete Events read
682 gi.evtErr = 0; //#Events with Errors
683 gi.evtSkp = 0; //#Events incomplete (timeout)
684
685 gi.procTot = 0; //#Events processed
686 gi.procErr = 0; //#Events showed problem in processing
687 gi.procTrg = 0; //#Events accepted by SW trigger
688 gi.procSkp = 0; //#Events rejected by SW trigger
689
690 gi.feedTot = 0; //#Events used for feedBack system
691 gi.feedErr = 0; //#Events rejected by feedBack
692
693 gi.wrtTot = 0; //#Events written to disk
694 gi.wrtErr = 0; //#Events with write-error
695
696 gi.runOpen = 0; //#Runs opened
697 gi.runClose = 0; //#Runs closed
698 gi.runErr = 0; //#Runs with open/close errors
699
700 return;
701} /*-----------------------------------------------------------------*/
702
703
704
/* Currently a no-op: there is nothing to initialize here. */
void
initReadFAD ()
{
} /*-----------------------------------------------------------------*/
710
711
712
713void *
714readFAD (void *ptr)
715{
716/* *** main loop reading FAD data and sorting them to complete events */
717 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
718 int actBoards = 0, minLen;
719 int32_t jrd;
720 uint gi_SecTime; //time in seconds
721 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
722
723 int goodhed = 0;
724
725 int sockDef[NBOARDS]; //internal state of sockets
726 int jrdx;
727
728
729 struct timespec xwait;
730
731
732 struct timeval *tv, atv;
733 tv = &atv;
734 uint32_t tsec, tusec;
735
736
737 snprintf (str, MXSTR, "start initializing");
738 factOut (kInfo, -1, str);
739
740 int cpu = 7; //read thread
741 cpu_set_t mask;
742
743/* CPU_ZERO initializes all the bits in the mask to zero. */
744// CPU_ZERO (&mask);
745/* CPU_SET sets only the bit corresponding to cpu. */
746 cpu = 7;
747// CPU_SET (cpu, &mask);
748
749/* sched_setaffinity returns 0 in success */
750// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
751// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
752// factOut (kWarn, -1, str);
753// }
754
755
756 head_len = sizeof (PEVNT_HEADER);
757 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
758 minLen = head_len; //min #bytes needed to check header: full header for debug
759
760 start.S = 0xFB01;
761 stop.S = 0x04FE;
762
763/* initialize run control logics */
764 for (i = 0; i < MAX_RUN; i++) {
765 runCtrl[i].runId = 0;
766 runCtrl[i].fileId = -2;
767 runCtrl[i].procId = -2;
768 }
769 gi_resetS = gi_resetR = 9;
770 for (i = 0; i < NBOARDS; i++)
771 sockDef[i] = 0;
772
773 START:
774 gettimeofday (tv, NULL);
775 g_actTime = tsec = atv.tv_sec;
776 g_actUsec = tusec = atv.tv_usec;
777 gi_myRun = g_actTime;
778 evtCtrl.frstPtr = 0;
779 evtCtrl.lastPtr = 0;
780
781 gi_SecTime = g_actTime;
782 gi_runStat = g_runStat;
783 gj.readStat = g_runStat;
784 numok = numok2 = 0;
785
786 int cntsock = 8 - NUMSOCK ;
787
788 if (gi_resetS > 0) {
789 //make sure all sockets are preallocated as 'not exist'
790 for (i = 0; i < MAX_SOCK; i++) {
791 rd[i].socket = -1;
792 rd[i].sockStat = 99;
793 }
794
795 for (k = 0; k < NBOARDS; k++) {
796 gi_NumConnect[k] = 0;
797 gi.numConn[k] = 0;
798 gj.numConn[k] = 0;
799 gj.errConn[k] = 0;
800 gj.rateBytes[k] = 0;
801 gj.totBytes[k] = 0;
802 }
803
804 }
805
806
807 if (gi_resetR > 0) {
808 resetEvtStat ();
809 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
810 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
811 gj.totMem = g_maxMem;
812 gj.bufNew = gj.bufEvt = 0;
813 gj.badRoiE = gj.badRoiR = gj.badRoiB =
814 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
815
816 int b,p;
817 for (b = 0; b < NBOARDS; b++)
818 gj.badRoi[b] = 0;
819
820 mBufInit (); //initialize buffers
821
822 snprintf (str, MXSTR, "end initializing");
823 factOut (kInfo, -1, str);
824 }
825
826
827 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
828
829 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
830
831 gi_runStat = g_runStat;
832 gj.readStat = g_runStat;
833 gettimeofday (tv, NULL);
834 g_actTime = tsec = atv.tv_sec;
835 g_actUsec = tusec = atv.tv_usec;
836
837
838 int b, p, p0, s0, nch;
839 nch = 0;
840 for (b = 0; b < NBOARDS; b++) {
841 k = b * 7;
842 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
843 nch++;
844 gi_NumConnect[b] = 0; //must close all connections
845 gi.numConn[b] = 0;
846 gj.numConn[b] = 0;
847 if (sockDef[b] == 0)
848 s0 = 0; //sockets to be defined and opened
849 else if (g_port[b].sockDef == 0)
850 s0 = -1; //sockets to be destroyed
851 else
852 s0 = +1; //sockets to be closed and reopened
853
854 if (s0 == 0)
855 p0 = ntohs (g_port[b].sockAddr.sin_port);
856 else
857 p0 = 0;
858
859 for (p = p0 + 1; p < p0 + 8; p++) {
860 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
861 k++;
862 }
863 sockDef[b] = g_port[b].sockDef;
864 }
865 }
866
867 if (nch > 0) {
868 actBoards = 0;
869 for (b = 0; b < NBOARDS; b++) {
870 if (sockDef[b] > 0)
871 actBoards++;
872 }
873 }
874
875
876 jrdx = 0;
877 numokx = 0;
878 numok = 0; //count number of succesfull actions
879
880 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
881 b = i / 7 ;
882 p = i % 7 ;
883
884if ( p >= NUMSOCK) { ; }
885else {
886 if (sockDef[b] > 0)
887 s0 = +1;
888 else
889 s0 = -1;
890
891 if (rd[i].sockStat < 0) { //try to connect if not yet done
892 if (rd[i].sockStat == -1) {
893 rd[i].sockStat = connect (rd[i].socket,
894 (struct sockaddr *) &rd[i].SockAddr,
895 sizeof (rd[i].SockAddr));
896 if (rd[i].sockStat == -1) {
897 rd[i].errCnt++ ;
898// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
899// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
900// else rd[i].sockStat = 10000 ;
901 }
902//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
903
904 }
905 if (rd[i].sockStat < -1 ) {
906 rd[i].sockStat++ ;
907 }
908 if (rd[i].sockStat == 0) { //successfull ==>
909 if (sockDef[b] > 0) {
910 rd[i].bufTyp = 0; // expect a header
911 rd[i].bufLen = frst_len; // max size to read at begining
912 } else {
913 rd[i].bufTyp = -1; // full data to be skipped
914 rd[i].bufLen = MAX_LEN; //huge for skipping
915 }
916 rd[i].bufPos = 0; // no byte read so far
917 rd[i].skip = 0; // start empty
918// gi_NumConnect[b]++;
919 gi_NumConnect[b] += cntsock ;
920
921 gi.numConn[b]++;
922 gj.numConn[b]++;
923 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
924 factOut (kInfo, -1, str);
925 }
926 }
927
928 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
929 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
930 numok++;
931 size_t maxread = rd[i].bufLen ;
932 if (maxread > MAXREAD ) maxread=MAXREAD ;
933
934 jrd =
935 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
936 maxread, MSG_DONTWAIT);
937// rd[i].bufLen, MSG_DONTWAIT);
938
939 if (jrd > 0) {
940 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
941#ifdef EVTDEBUG
942 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
943 &rd[i].rBuf->B[rd[i].bufPos], jrd);
944 snprintf (str, MXSTR,
945 "read sock %3d bytes %5d len %5d first %d %d", i,
946 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
947 rd[i].rBuf->B[rd[i].bufPos + 1]);
948 factOut (kDebug, 301, str);
949#endif
950 }
951
952 if (jrd == 0) { //connection has closed ...
953 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
954 factOut (kInfo, 441, str);
955 GenSock (s0, i, 0, NULL, &rd[i]);
956 gi.gotErr[b]++;
957// gi_NumConnect[b]--;
958 gi_NumConnect[b]-= cntsock ;
959 gi.numConn[b]--;
960 gj.numConn[b]--;
961
962 } else if (jrd < 0) { //did not read anything
963 if (errno != EAGAIN && errno != EWOULDBLOCK) {
964 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
965 factOut (kError, 442, str);
966 gi.gotErr[b]++;
967 } else
968 numok--; //else nothing waiting to be read
969 jrd = 0;
970 }
971 } else {
972 jrd = 0; //did read nothing as requested
973 snprintf (str, MXSTR, "do not read from socket %d %d", i,
974 rd[i].bufLen);
975 factOut (kDebug, 301, str);
976 }
977
978 gi.gotByte[b] += jrd;
979 gj.rateBytes[b] += jrd;
980
981 if (jrd > 0) {
982 numokx++;
983 jrdx += jrd;
984 }
985
986
987 if (rd[i].bufTyp < 0) { // we are skipping this board ...
988// just do nothing
989#ifdef EVTDEBUG
990 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
991 i);
992 factOut (kInfo, 301, str);
993#endif
994
995 } else if (rd[i].bufTyp > 0) { // we are reading data ...
996 if (jrd < rd[i].bufLen) { //not yet all read
997 rd[i].bufPos += jrd; //==> prepare for continuation
998 rd[i].bufLen -= jrd;
999 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1000 } else { //full dataset read
1001 rd[i].bufLen = 0;
1002 rd[i].bufPos = rd[i].fadLen;
1003 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1004 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1005 gi.evtErr++;
1006 snprintf (str, MXSTR,
1007 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1008 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1009 rd[i].rBuf->B[1],
1010 rd[i].rBuf->B[rd[i].bufPos - 2],
1011 rd[i].rBuf->B[rd[i].bufPos - 1]);
1012 factOut (kError, 301, str);
1013 goto EndBuf;
1014
1015#ifdef EVTDEBUG
1016 } else {
1017 snprintf (str, MXSTR,
1018 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1019 i, rd[i].fadLen, rd[i].rBuf->B[0],
1020 rd[i].rBuf->B[1], start.B[1], start.B[0],
1021 rd[i].rBuf->B[rd[i].bufPos - 2],
1022 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1023 stop.B[0]);
1024 factOut (kDebug, 301, str);
1025#endif
1026 }
1027
1028 if (jrd > 0)
1029 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1030
1031 //we have a complete buffer, copy to WORK area
1032 int jr;
1033 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1034 for (jr = 0; jr < 9; jr++) {
1035 roi[jr] =
1036 ntohs (rd[i].
1037 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]);
1038 }
1039 //get index into mBuffer for this event (create if needed)
1040
1041 int actid;
1042 if (g_useFTM > 0)
1043 actid = rd[i].evtID;
1044 else
1045 actid = rd[i].ftmID;
1046
1047 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1048 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1049 rd[i].evtID);
1050
1051 if (evID < -1000) {
1052 goto EndBuf; //not usable board/event/run --> skip it
1053 }
1054 if (evID < 0) { //no space left, retry later
1055#ifdef EVTDEBUG
1056 if (rd[i].bufLen != 0) {
1057 snprintf (str, MXSTR, "something screwed up");
1058 factOut (kFatal, 1, str);
1059 }
1060#endif
1061 xwait.tv_sec = 0;
1062 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1063 nanosleep (&xwait, NULL);
1064 goto EndBuf1; //hope there is free space next round
1065 }
1066 //we have a valid entry in mBuffer[]; fill it
1067
1068#ifdef EVTDEBUG
1069 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1070 rd[i].fadLen);
1071 if (xchk != 0) {
1072 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1073 xchk, rd[i].fadLen, i);
1074 factOut (kFatal, 1, str);
1075
1076 uint iq;
1077 for (iq = 0; iq < rd[i].fadLen; iq++) {
1078 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1079 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1080 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1081 factOut (kFatal, 1, str);
1082 }
1083 }
1084 }
1085#endif
1086 int qncpy = 0;
1087 boardId = b;
1088 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1089 int fadCrate = fadBoard / 256;
1090 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1091 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1092 fadCrate, fadBoard % 256, boardId);
1093 factOut (kWarn, 301, str);
1094 }
1095 if (mBuffer[evID].board[boardId] != -1) {
1096 snprintf (str, MXSTR,
1097 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1098 evID, boardId, i, rd[i].fadLen,
1099 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1100 rd[i].rBuf->B[rd[i].bufPos - 2],
1101 rd[i].rBuf->B[rd[i].bufPos - 1]);
1102 factOut (kWarn, 501, str);
1103 goto EndBuf; //--> skip Board
1104 }
1105
1106 int iDx = evtIdx[evID]; //index into evtCtrl
1107
1108 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1109 &rd[i].rBuf->S[0], head_len);
1110 qncpy += head_len;
1111
1112 src = head_len / 2;
1113 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1114 for (drs = 0; drs < 4; drs++) {
1115 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1116 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1117 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1118//here we should check if pixH is correct ....
1119
1120 pixS = boardId * 36 + drs * 9 + px;
1121 src++;
1122
1123
1124 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1125 dest = pixS * roi[0];
1126 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1127 &rd[i].rBuf->S[src], roi[0] * 2);
1128 qncpy += roi[0] * 2;
1129 src += pixR;
1130
1131 if (px == 8) {
1132 tmS = boardId * 4 + drs;
1133 if (pixR > roi[0]) { //and we have additional TM info
1134 dest = tmS * roi[0] + NPIX * roi[0];
1135 int srcT = src - roi[0];
1136 mBuffer[evID].fEvent->StartTM[tmS] =
1137 (pixC + pixR - roi[0]) % 1024;
1138 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1139 &rd[i].rBuf->S[srcT], roi[0] * 2);
1140 qncpy += roi[0] * 2;
1141 } else {
1142 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1143 }
1144 }
1145 }
1146 } // now we have stored a new board contents into Event structure
1147
1148 mBuffer[evID].fEvent->NumBoards++;
1149 mBuffer[evID].board[boardId] = boardId;
1150 evtCtrl.evtStat[iDx]++;
1151 evtCtrl.pcTime[iDx] = g_actTime;
1152
1153 if (++mBuffer[evID].nBoard >= actBoards) {
1154 int qnrun = 0;
1155 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1156 actrun = mBuffer[evID].runNum;
1157 int ir;
1158 for (ir = 0; ir < MAX_RUN; ir++) {
1159 qnrun++;
1160 if (runCtrl[ir].runId == actrun) {
1161 if (++runCtrl[ir].lastEvt == 0) {
1162 gotNewRun (actrun, mBuffer[evID].FADhead);
1163 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1164 mBuffer[evID].runNum,
1165 mBuffer[evID].evNum);
1166 factOut (kInfo, 1, str);
1167 break;
1168 }
1169 }
1170 }
1171 }
1172 snprintf (str, MXSTR,
1173 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1174 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1175 qncpy, qnrun);
1176 factOut (kDebug, -1, str);
1177
1178 //complete event read ---> flag for next processing
1179 evtCtrl.evtStat[iDx] = 99;
1180 gi.evtTot++;
1181 }
1182
1183 EndBuf:
1184 rd[i].bufTyp = 0; //ready to read next header
1185 rd[i].bufLen = frst_len;
1186 rd[i].bufPos = 0;
1187 EndBuf1:
1188 ;
1189 }
1190
1191 } else { //we are reading event header
1192 rd[i].bufPos += jrd;
1193 rd[i].bufLen -= jrd;
1194 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1195 //check if startflag correct; else shift block ....
1196 for (k = 0; k < rd[i].bufPos - 1; k++) {
1197 if (rd[i].rBuf->B[k] == start.B[1]
1198 && rd[i].rBuf->B[k + 1] == start.B[0])
1199 break;
1200 }
1201 rd[i].skip += k;
1202
1203 if (k >= rd[i].bufPos - 1) { //no start of header found
1204 rd[i].bufPos = 0;
1205 rd[i].bufLen = head_len;
1206 } else if (k > 0) {
1207 rd[i].bufPos -= k;
1208 rd[i].bufLen += k;
1209 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1210 rd[i].bufPos);
1211#ifdef EVTDEBUG
1212 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1213 rd[i].bufPos);
1214#endif
1215 }
1216 if (rd[i].bufPos >= minLen) {
1217 if (rd[i].skip > 0) {
1218 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1219 rd[i].skip, i);
1220 factOut (kInfo, 666, str);
1221 rd[i].skip = 0;
1222 }
1223 goodhed++;
1224 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1225 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1226 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1227 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1228 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1229 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1230 rd[i].bufTyp = 1; //ready to read full record
1231 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1232
1233 int fadboard = ntohs (rd[i].rBuf->S[12]);
1234 int fadcrate = fadboard / 256;
1235 fadboard = (fadcrate * 10 + fadboard % 256);
1236#ifdef EVTDEBUG
1237 snprintf (str, MXSTR,
1238 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1239 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1240 rd[i].runID, fadboard, jrd);
1241 factOut (kDebug, 1, str);
1242#endif
1243
1244 if (rd[i].runID == 0)
1245 rd[i].runID = gi_myRun;
1246
1247 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1248 snprintf (str, MXSTR,
1249 "illegal event-length on port %d", i);
1250 factOut (kFatal, 881, str);
1251 rd[i].bufLen = 100000; //?
1252 }
1253 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1254 debugHead (i, fadBoard, rd[i].rBuf);
1255 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1256 } else {
1257 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1258 }
1259 } else {
1260 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1261 }
1262
1263 } //end interpreting last read
1264}
1265 } //end of successful read anything
1266 } //finished trying to read all sockets
1267
1268#ifdef EVTDEBUG
1269 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1270 factOut (kDebug, -1, str);
1271#endif
1272
1273 gi.numRead[numok]++;
1274
1275 g_actTime = time (NULL);
1276 if (g_actTime > gi_SecTime) {
1277 gi_SecTime = g_actTime;
1278
1279
1280 //loop over all active events and flag those older than read-timeout
1281 //delete those that are written to disk ....
1282
1283 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1284 if (kd < 0)
1285 kd += (MAX_EVT * MAX_RUN);
1286
1287 gj.bufNew = gj.bufEvt = 0;
1288 int k1 = evtCtrl.frstPtr;
1289 for (k = k1; k < (k1 + kd); k++) {
1290 int k0 = k % (MAX_EVT * MAX_RUN);
1291//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1292
1293 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1294 gj.bufNew++; //incomplete event in Buffer
1295 if (evtCtrl.evtStat[k0] < 90
1296 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1297 int id = evtCtrl.evtBuf[k0];
1298 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1299 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1300 evtCtrl.evtStat[k0]);
1301 factOut (kWarn, 601, str);
1302
1303 int ik,ib,jb;
1304 ik=0;
1305 for (ib=0; ib<NBOARDS; ib++) {
1306 if (ib%10==0) {
1307 snprintf (&str[ik], MXSTR, "'");
1308 ik++;
1309 }
1310 jb = mBuffer[id].board[ib];
1311 if ( jb<0 ) {
1312 if (gi_NumConnect[b] >0 ) {
1313 snprintf (&str[ik], MXSTR, ".");
1314 } else {
1315 snprintf (&str[ik], MXSTR, "x");
1316 }
1317 } else {
1318 snprintf (&str[ik], MXSTR, "%d",jb%10);
1319 }
1320 ik++;
1321 }
1322 snprintf (&str[ik], MXSTR, "'");
1323 factOut (kWarn, 601, str);
1324
1325 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1326 gi.evtSkp++;
1327 gi.evtTot++;
1328 gj.evtSkip++;
1329 }
1330 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1331 || evtCtrl.evtStat[k0] == 0) { //'useless'
1332
1333 int id = evtCtrl.evtBuf[k0];
1334 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1335 mBuffer[id].evNum, mBuffer[id].nBoard);
1336 factOut (kDebug, -1, str);
1337 mBufFree (id); //event written--> free memory
1338 evtCtrl.evtStat[k0] = -1;
1339 gj.evtWrite++;
1340 gj.rateWrite++;
1341 } else if (evtCtrl.evtStat[k0] >= 95) {
1342 gj.bufEvt++; //complete event in Buffer
1343 }
1344
1345 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1346 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1347 }
1348 }
1349
1350
1351 gj.deltaT = 1000; //temporary, must be improved
1352
1353 int b;
1354 for (b = 0; b < NBOARDS; b++)
1355 gj.totBytes[b] += gj.rateBytes[b];
1356 gj.totMem = g_maxMem;
1357 if (gj.maxMem > gj.xxxMem)
1358 gj.xxxMem = gj.maxMem;
1359 if (gj.maxEvt > gj.xxxEvt)
1360 gj.xxxEvt = gj.maxEvt;
1361
1362 factStat (gj);
1363 factStatNew (gi);
1364 gj.rateNew = gj.rateWrite = 0;
1365 gj.maxMem = gj.usdMem;
1366 gj.maxEvt = gj.bufTot;
1367 for (b = 0; b < NBOARDS; b++)
1368 gj.rateBytes[b] = 0;
1369 }
1370
1371 if (numok > 0)
1372 numok2 = 0;
1373 else if (numok2++ > 3) {
1374 if (g_runStat == 1) {
1375 xwait.tv_sec = 1;
1376 xwait.tv_nsec = 0; // hibernate for 1 sec
1377 } else {
1378 xwait.tv_sec = 0;
1379 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1380 }
1381 nanosleep (&xwait, NULL);
1382 }
1383
1384 } //and do next loop over all sockets ...
1385
1386
1387 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1388 factOut (kInfo, -1, str);
1389
1390 if (g_reset > 0) {
1391 gi_reset = g_reset;
1392 gi_resetR = gi_reset % 10; //shall we stop reading ?
1393 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1394 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1395 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1396 g_reset = 0;
1397 } else {
1398 gi_reset = 0;
1399 if (g_runStat == -1)
1400 gi_resetR = 1;
1401 else
1402 gi_resetR = 7;
1403 gi_resetS = 7; //close all sockets
1404 gi_resetW = 7; //close all files
1405 gi_resetX = 0;
1406
1407 //inform others we have to quit ....
1408 gi_runStat = -11; //inform all that no update to happen any more
1409 gj.readStat = -11; //inform all that no update to happen any more
1410 }
1411
1412 if (gi_resetS > 0) {
1413 //must close all open sockets ...
1414 snprintf (str, MXSTR, "close all sockets ...");
1415 factOut (kInfo, -1, str);
1416 for (i = 0; i < MAX_SOCK; i++) {
1417 if (rd[i].sockStat == 0) {
1418 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1419 if (i % 7 == 0) {
1420// gi_NumConnect[i / 7]--;
1421 gi_NumConnect[i / 7]-= cntsock ;
1422 gi.numConn[i / 7]--;
1423 gj.numConn[i / 7]--;
1424 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1425 rd[i / 7].sockStat = -1; //and try to open asap
1426 }
1427 }
1428 }
1429 }
1430
1431
1432 if (gi_resetR > 0) {
1433 //flag all events as 'read finished'
1434 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1435 if (kd < 0)
1436 kd += (MAX_EVT * MAX_RUN);
1437
1438 int k1 = evtCtrl.frstPtr;
1439 for (k = k1; k < (k1 + kd); k++) {
1440 int k0 = k % (MAX_EVT * MAX_RUN);
1441 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1442 evtCtrl.evtStat[k0] = 91;
1443 gi.evtSkp++;
1444 gi.evtTot++;
1445 }
1446 }
1447
1448 xwait.tv_sec = 0;
1449 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1450 nanosleep (&xwait, NULL);
1451
1452 //and clear all buffers (might have to wait until all others are done)
1453 int minclear;
1454 if (gi_resetR == 1) {
1455 minclear = 900;
1456 snprintf (str, MXSTR, "drain all buffers ...");
1457 } else {
1458 minclear = 0;
1459 snprintf (str, MXSTR, "flush all buffers ...");
1460 }
1461 factOut (kInfo, -1, str);
1462
1463 int numclear = 1;
1464 while (numclear > 0) {
1465 numclear = 0;
1466 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1467 if (kd < 0)
1468 kd += (MAX_EVT * MAX_RUN);
1469
1470 int k1 = evtCtrl.frstPtr;
1471 for (k = k1; k < (k1 + kd); k++) {
1472 int k0 = k % (MAX_EVT * MAX_RUN);
1473 if (evtCtrl.evtStat[k0] > minclear) {
1474 int id = evtCtrl.evtBuf[k0];
1475 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1476 mBuffer[id].evNum, mBuffer[id].nBoard);
1477 factOut (kDebug, -1, str);
1478 mBufFree (id); //event written--> free memory
1479 evtCtrl.evtStat[k0] = -1;
1480 } else if (evtCtrl.evtStat[k0] > 0)
1481 numclear++; //writing is still ongoing...
1482
1483 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1484 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1485 }
1486
1487 xwait.tv_sec = 0;
1488 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1489 nanosleep (&xwait, NULL);
1490 }
1491 }
1492
1493 if (gi_reset > 0) {
1494 if (gi_resetW > 0) {
1495 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1496 }
1497 if (gi_resetX > 0) {
1498 xwait.tv_sec = gi_resetX;
1499 xwait.tv_nsec = 0;
1500 nanosleep (&xwait, NULL);
1501 }
1502
1503 snprintf (str, MXSTR, "Continue read Process ...");
1504 factOut (kInfo, -1, str);
1505 gi_reset = 0;
1506 goto START;
1507 }
1508
1509
1510
1511 snprintf (str, MXSTR, "Exit read Process ...");
1512 factOut (kInfo, -1, str);
1513 gi_runStat = -99;
1514 gj.readStat = -99;
1515 factStat (gj);
1516 factStatNew (gi);
1517 return 0;
1518
1519} /*-----------------------------------------------------------------*/
1520
1521
1522void *
1523subProc (void *thrid)
1524{
1525 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1526 struct timespec xwait;
1527
1528 int32_t cntr ;
1529
1530 threadID = (int) thrid;
1531
1532 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1533 factOut (kInfo, -1, str);
1534
1535 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1536 numWait = numProc = 0;
1537 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1538 if (kd < 0)
1539 kd += (MAX_EVT * MAX_RUN);
1540
1541 int k1 = evtCtrl.frstPtr;
1542 for (k = k1; k < (k1 + kd); k++) {
1543 int k0 = k % (MAX_EVT * MAX_RUN);
1544
1545 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1546 if (gi_resetR > 1) { //we are asked to flush buffers asap
1547 jret = 9100; //flag to be deleted
1548 } else {
1549 int id = evtCtrl.evtBuf[k0];
1550
1551
1552 jret =
1553 subProcEvt (threadID, mBuffer[id].FADhead,
1554 mBuffer[id].fEvent, mBuffer[id].buffer);
1555
1556
1557 if (jret <= threadID) {
1558 snprintf (str, MXSTR,
1559 "process %d wants to send to process %d",
1560 threadID, jret);
1561 factOut (kError, -1, str);
1562 jret = 5300;
1563 } else if (jret <= 0)
1564 jret = 9200 + threadID; //flag as 'to be deleted'
1565 else if (jret >= gi_maxProc)
1566 jret = 5200 + threadID; //flag as 'to be written'
1567 else
1568 jret = 1000 + jret; //flag for next proces
1569 }
1570 evtCtrl.evtStat[k0] = jret;
1571 numProc++;
1572 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1573 numWait++;
1574 }
1575
1576 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1577 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1578 factOut (kInfo, -1, str);
1579 return 0;
1580 }
1581 if (numProc == 0) {
1582 //seems we have nothing to do, so sleep a little
1583 xwait.tv_sec = 0;
1584 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1585 nanosleep (&xwait, NULL);
1586 }
1587 }
1588
1589 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1590 factOut (kInfo, -1, str);
1591 return;
1592} /*-----------------------------------------------------------------*/
1593
1594
1595void *
1596procEvt (void *ptr)
1597{
1598/* *** main loop processing file, including SW-trigger */
1599 int numProc, numWait;
1600 int k, status, j;
1601 struct timespec xwait;
1602 char str[MXSTR];
1603
1604
1605
1606 int lastRun = 0; //usually run from last event still valid
1607
1608 cpu_set_t mask;
1609 int cpu = 1; //process thread (will be several in final version)
1610
1611 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1612 gi_maxProc);
1613 factOut (kInfo, -1, str);
1614
1615/* CPU_ZERO initializes all the bits in the mask to zero. */
1616// CPU_ZERO (&mask);
1617/* CPU_SET sets only the bit corresponding to cpu. */
1618// CPU_SET( 0 , &mask ); leave for system
1619// CPU_SET( 1 , &mask ); used by write process
1620// CPU_SET (2, &mask);
1621// CPU_SET (3, &mask);
1622// CPU_SET (4, &mask);
1623// CPU_SET (5, &mask);
1624// CPU_SET (6, &mask);
1625// CPU_SET( 7 , &mask ); used by read process
1626/* sched_setaffinity returns 0 in success */
1627// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1628// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1629// factOut (kWarn, -1, str);
1630// }
1631
1632
1633 pthread_t thread[100];
1634 int th_ret[100];
1635
1636 for (k = 0; k < gi_maxProc; k++) {
1637 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1638 }
1639
1640 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1641
1642 numWait = numProc = 0;
1643 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1644 if (kd < 0)
1645 kd += (MAX_EVT * MAX_RUN);
1646
1647 int k1 = evtCtrl.frstPtr;
1648 for (k = k1; k < (k1 + kd); k++) {
1649 int k0 = k % (MAX_EVT * MAX_RUN);
1650//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1651 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1652
1653 if (gi_resetR > 1) { //we are asked to flush buffers asap
1654 evtCtrl.evtStat[k0] = 9991;
1655 } else {
1656
1657//-------- it is better to open the run already here, so call can be used to initialize
1658//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1659 int id = evtCtrl.evtBuf[k0];
1660 uint32_t irun = mBuffer[id].runNum;
1661 int32_t ievt = mBuffer[id].evNum;
1662 if (runCtrl[lastRun].runId == irun) {
1663 j = lastRun;
1664 } else {
1665 //check which fileID to use (or open if needed)
1666 for (j = 0; j < MAX_RUN; j++) {
1667 if (runCtrl[j].runId == irun)
1668 break;
1669 }
1670 if (j >= MAX_RUN) {
1671 snprintf (str, MXSTR,
1672 "P error: can not find run %d for event %d in %d",
1673 irun, ievt, id);
1674 factOut (kFatal, 901, str);
1675 }
1676 lastRun = j;
1677 }
1678
1679 if (runCtrl[j].fileId < 0) {
1680//---- we need to open a new run ==> make sure all older runs are
1681//---- finished and marked to be closed ....
1682 int j1;
1683 for (j1 = 0; j1 < MAX_RUN; j1++) {
1684 if (runCtrl[j1].fileId == 0) {
1685 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1686//---- problem: processing still going on ==> must wait for closing ....
1687 snprintf (str, MXSTR,
1688 "P finish run since new one opened %d",
1689 runCtrl[j1].runId);
1690 runFinish1 (runCtrl[j1].runId);
1691 }
1692
1693 }
1694
1695 actRun.Version = 1;
1696 actRun.RunType = -1; //to be adapted
1697
1698 actRun.Nroi = runCtrl[j].roi0;
1699 actRun.NroiTM = runCtrl[j].roi8;
1700 if (actRun.Nroi == actRun.NroiTM)
1701 actRun.NroiTM = 0;
1702 actRun.RunTime = runCtrl[j].firstTime;
1703 actRun.RunUsec = runCtrl[j].firstTime;
1704 actRun.NBoard = NBOARDS;
1705 actRun.NPix = NPIX;
1706 actRun.NTm = NTMARK;
1707 actRun.Nroi = mBuffer[id].nRoi;
1708 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1709 NBOARDS * sizeof (PEVNT_HEADER));
1710
1711 runCtrl[j].fileHd =
1712 runOpen (irun, &actRun, sizeof (actRun));
1713 if (runCtrl[j].fileHd == NULL) {
1714 snprintf (str, MXSTR,
1715 "P could not open a file for run %d", irun);
1716 factOut (kError, 502, str);
1717 runCtrl[j].fileId = 91;
1718 runCtrl[j].procId = 91;
1719 } else {
1720 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1721 irun, ievt);
1722 factOut (kInfo, -1, str);
1723 runCtrl[j].fileId = 0;
1724 runCtrl[j].procId = 0;
1725 }
1726
1727 }
1728//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1729 if (runCtrl[j].procId == 0) {
1730 if (runCtrl[j].closeTime < g_actTime
1731 || runCtrl[j].lastTime < g_actTime - 300
1732 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1733 snprintf (str, MXSTR,
1734 "P reached end of run condition for run %d",
1735 irun);
1736 factOut (kInfo, 502, str);
1737 runFinish1 (runCtrl[j].runId);
1738 runCtrl[j].procId = 1;
1739 }
1740 }
1741 if (runCtrl[j].procId != 0) {
1742 snprintf (str, MXSTR,
1743 "P skip event %d because no active run %d", ievt,
1744 irun);
1745 factOut (kDebug, 502, str);
1746 evtCtrl.evtStat[k0] = 9091;
1747 } else {
1748//--------
1749//--------
1750 id = evtCtrl.evtBuf[k0];
1751 int itevt = mBuffer[id].trgNum;
1752 int itrg = mBuffer[id].trgTyp;
1753 int roi = mBuffer[id].nRoi;
1754 int roiTM = mBuffer[id].nRoiTM;
1755
1756//make sure unused pixels/tmarks are cleared to zero
1757 if (roiTM == roi)
1758 roiTM = 0;
1759 int ip, it, dest, ib;
1760 for (ip = 0; ip < NPIX; ip++) {
1761 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1762 dest = ip * roi;
1763 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1764 }
1765 }
1766 for (it = 0; it < NTMARK; it++) {
1767 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1768 dest = it * roi + NPIX * roi;
1769 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1770 }
1771 }
1772
1773
1774//and set correct event header ; also check for consistency in event (not yet)
1775 mBuffer[id].fEvent->Roi = roi;
1776 mBuffer[id].fEvent->RoiTM = roiTM;
1777 mBuffer[id].fEvent->EventNum = ievt;
1778 mBuffer[id].fEvent->TriggerNum = itevt;
1779 mBuffer[id].fEvent->TriggerType = itrg;
1780 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1781 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1782 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1783 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1784 mBuffer[id].fEvent->SoftTrig = 0;
1785
1786
1787 for (ib = 0; ib < NBOARDS; ib++) {
1788 if (mBuffer[id].board[ib] == -1) { //board is not read
1789 mBuffer[id].FADhead[ib].start_package_flag = 0;
1790 mBuffer[id].fEvent->BoardTime[ib] = 0;
1791 } else {
1792 mBuffer[id].fEvent->BoardTime[ib] =
1793 ntohl (mBuffer[id].FADhead[ib].time);
1794 }
1795 }
1796
1797 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1798 mBuffer[id].fEvent);
1799 gi.procTot++;
1800 numProc++;
1801
1802 if (i < 0) {
1803 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
1804 gi.procErr++;
1805 } else {
1806 evtCtrl.evtStat[k0] = 1000;
1807 runCtrl[j].procEvt++;
1808 }
1809 }
1810 }
1811 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
1812 numWait++;
1813 }
1814 }
1815
1816 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1817 snprintf (str, MXSTR, "Exit Processing Process ...");
1818 factOut (kInfo, -1, str);
1819 gp_runStat = -22; //==> we should exit
1820 gj.procStat = -22; //==> we should exit
1821 return 0;
1822 }
1823
1824 if (numProc == 0) {
1825 //seems we have nothing to do, so sleep a little
1826 xwait.tv_sec = 0;
1827 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1828 nanosleep (&xwait, NULL);
1829 }
1830 gp_runStat = gi_runStat;
1831 gj.procStat = gj.readStat;
1832
1833 }
1834
1835 //we are asked to abort asap ==> must flag all remaining events
1836 // when gi_runStat claims that all events are in the buffer...
1837
1838 snprintf (str, MXSTR, "Abort Processing Process ...");
1839 factOut (kInfo, -1, str);
1840 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1841 if (kd < 0)
1842 kd += (MAX_EVT * MAX_RUN);
1843
1844 for (k = 0; k < gi_maxProc; k++) {
1845 pthread_join (thread[k], (void **) &status);
1846 }
1847
1848 int k1 = evtCtrl.frstPtr;
1849 for (k = k1; k < (k1 + kd); k++) {
1850 int k0 = k % (MAX_EVT * MAX_RUN);
1851 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
1852 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
1853 }
1854 }
1855
1856 gp_runStat = -99;
1857 gj.procStat = -99;
1858
1859 return 0;
1860
1861} /*-----------------------------------------------------------------*/
1862
1863int
1864CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1865{
1866/* close run runId (all all runs if runId=0) */
1867/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1868 int j;
1869
1870
1871 if (runId == 0) {
1872 for (j = 0; j < MAX_RUN; j++) {
1873 if (runCtrl[j].fileId == 0) { //run is open
1874 runCtrl[j].closeTime = closeTime;
1875 runCtrl[j].maxEvt = maxEvt;
1876 }
1877 }
1878 return 0;
1879 }
1880
1881 for (j = 0; j < MAX_RUN; j++) {
1882 if (runCtrl[j].runId == runId) {
1883 if (runCtrl[j].fileId == 0) { //run is open
1884 runCtrl[j].closeTime = closeTime;
1885 runCtrl[j].maxEvt = maxEvt;
1886 return 0;
1887 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1888 runCtrl[j].closeTime = closeTime;
1889 runCtrl[j].maxEvt = maxEvt;
1890 return +1;
1891 } else { // run already closed
1892 return +2;
1893 }
1894 }
1895 } //we only reach here if the run was never created
1896 return -1;
1897
1898} /*-----------------------------------------------------------------*/
1899
1900
1901void *
1902writeEvt (void *ptr)
1903{
1904/* *** main loop writing event (including opening and closing run-files */
1905
1906 int numWrite, numWait;
1907 int k, j ;
1908 struct timespec xwait;
1909 char str[MXSTR];
1910
1911 cpu_set_t mask;
1912 int cpu = 1; //write thread
1913
1914 snprintf (str, MXSTR, "Starting write-thread");
1915 factOut (kInfo, -1, str);
1916
1917/* CPU_ZERO initializes all the bits in the mask to zero. */
1918// CPU_ZERO (&mask);
1919/* CPU_SET sets only the bit corresponding to cpu. */
1920// CPU_SET (cpu, &mask);
1921/* sched_setaffinity returns 0 in success */
1922// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1923// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1924// }
1925
1926 int lastRun = 0; //usually run from last event still valid
1927
1928 while (g_runStat > -2) {
1929
1930 numWait = numWrite = 0;
1931 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1932 if (kd < 0)
1933 kd += (MAX_EVT * MAX_RUN);
1934
1935 int k1 = evtCtrl.frstPtr;
1936 for (k = k1; k < (k1 + kd); k++) {
1937 int k0 = k % (MAX_EVT * MAX_RUN);
1938//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1939 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
1940
1941 if (gi_resetR > 1) { //we must drain the buffer asap
1942 evtCtrl.evtStat[k0] = 9904;
1943 } else {
1944
1945
1946 int id = evtCtrl.evtBuf[k0];
1947 uint32_t irun = mBuffer[id].runNum;
1948 int32_t ievt = mBuffer[id].evNum;
1949
1950 gi.wrtTot++;
1951 if (runCtrl[lastRun].runId == irun) {
1952 j = lastRun;
1953 } else {
1954 //check which fileID to use (or open if needed)
1955 for (j = 0; j < MAX_RUN; j++) {
1956 if (runCtrl[j].runId == irun)
1957 break;
1958 }
1959 if (j >= MAX_RUN) {
1960 snprintf (str, MXSTR,
1961 "W error: can not find run %d for event %d in %d",
1962 irun, ievt, id);
1963 factOut (kFatal, 901, str);
1964 gi.wrtErr++;
1965 }
1966 lastRun = j;
1967 }
1968
1969 if (runCtrl[j].fileId < 0) {
1970 actRun.Version = 1;
1971 actRun.RunType = -1; //to be adapted
1972
1973 actRun.Nroi = runCtrl[j].roi0;
1974 actRun.NroiTM = runCtrl[j].roi8;
1975 if (actRun.Nroi == actRun.NroiTM)
1976 actRun.NroiTM = 0;
1977 actRun.RunTime = runCtrl[j].firstTime;
1978 actRun.RunUsec = runCtrl[j].firstTime;
1979 actRun.NBoard = NBOARDS;
1980 actRun.NPix = NPIX;
1981 actRun.NTm = NTMARK;
1982 actRun.Nroi = mBuffer[id].nRoi;
1983 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1984 NBOARDS * sizeof (PEVNT_HEADER));
1985
1986 runCtrl[j].fileHd =
1987 runOpen (irun, &actRun, sizeof (actRun));
1988 if (runCtrl[j].fileHd == NULL) {
1989 snprintf (str, MXSTR,
1990 "W could not open a file for run %d", irun);
1991 factOut (kError, 502, str);
1992 runCtrl[j].fileId = 91;
1993 } else {
1994 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
1995 irun, ievt);
1996 factOut (kInfo, -1, str);
1997 runCtrl[j].fileId = 0;
1998 }
1999
2000 }
2001
2002 if (runCtrl[j].fileId != 0) {
2003 if (runCtrl[j].fileId < 0) {
2004 snprintf (str, MXSTR,
2005 "W never opened file for this run %d", irun);
2006 factOut (kError, 123, str);
2007 } else if (runCtrl[j].fileId < 100) {
2008 snprintf (str, MXSTR, "W.file for this run is closed %d",
2009 irun);
2010 factOut (kWarn, 123, str);
2011 runCtrl[j].fileId += 100;
2012 } else {
2013 snprintf (str, MXSTR, "W:file for this run is closed %d",
2014 irun);
2015 factOut (kDebug, 123, str);
2016 }
2017 evtCtrl.evtStat[k0] = 9903;
2018 gi.wrtErr++;
2019 } else {
2020// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2021// factOut (kInfo, 504, str);
2022 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2023 sizeof (mBuffer[id]));
2024 if (i >= 0) {
2025 runCtrl[j].lastTime = g_actTime;
2026 runCtrl[j].actEvt++;
2027 evtCtrl.evtStat[k0] = 9901;
2028 snprintf (str, MXSTR,
2029 "%5d successfully wrote for run %d id %5d",
2030 ievt, irun, k0);
2031 factOut (kDebug, 504, str);
2032// gj.writEvt++ ;
2033 } else {
2034 snprintf (str, MXSTR, "W error writing event for run %d",
2035 irun);
2036 factOut (kError, 503, str);
2037 evtCtrl.evtStat[k0] = 9902;
2038 gi.wrtErr++;
2039 }
2040
2041 if (i < 0
2042 || runCtrl[j].lastTime < g_actTime - 300
2043 || runCtrl[j].closeTime < g_actTime
2044 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2045 int ii = 0;
2046 if (i < 0)
2047 ii = 1;
2048 else if (runCtrl[j].closeTime < g_actTime)
2049 ii = 2;
2050 else if (runCtrl[j].lastTime < g_actTime - 300)
2051 ii = 3;
2052 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2053 ii = 4;
2054
2055
2056
2057 //close run for whatever reason
2058 if (runCtrl[j].runId == gi_myRun)
2059 gi_myRun = g_actTime;
2060
2061 if (runCtrl[j].procId == 0) {
2062 runFinish1 (runCtrl[j].runId);
2063 runCtrl[j].procId = 92;
2064 }
2065
2066 runCtrl[j].closeTime = g_actTime - 1;
2067 i = runClose (runCtrl[j].fileHd, &runTail[j],
2068 sizeof (runTail[j]));
2069 if (i < 0) {
2070 snprintf (str, MXSTR, "error closing run %d %d AAA",
2071 runCtrl[j].runId, i);
2072 factOut (kError, 503, str);
2073 runCtrl[j].fileId = 92;
2074 } else {
2075 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2076 ii);
2077 factOut (kInfo, 503, str);
2078 runCtrl[j].fileId = 93;
2079 }
2080 }
2081 }
2082 }
2083 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2084 numWait++;
2085 }
2086
2087 //check if we should close a run (mainly when no event pending)
2088 for (j = 0; j < MAX_RUN; j++) {
2089 if (runCtrl[j].fileId == 0
2090 && (runCtrl[j].closeTime < g_actTime
2091 || runCtrl[j].lastTime < g_actTime - 300
2092 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2093 if (runCtrl[j].runId == gi_myRun)
2094 gi_myRun = g_actTime;
2095 int ii = 0;
2096 if (runCtrl[j].closeTime < g_actTime)
2097 ii = 2;
2098 else if (runCtrl[j].lastTime < g_actTime - 300)
2099 ii = 3;
2100 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2101 ii = 4;
2102
2103 if (runCtrl[j].procId == 0) {
2104 runFinish1 (runCtrl[j].runId);
2105 runCtrl[j].procId = 92;
2106 }
2107
2108 runCtrl[j].closeTime = g_actTime - 1;
2109 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2110 sizeof (runTail[j]));
2111 if (i < 0) {
2112 snprintf (str, MXSTR, "error closing run %d %d BBB",
2113 runCtrl[j].runId, i);
2114 factOut (kError, 506, str);
2115 runCtrl[j].fileId = 94;
2116 } else {
2117 snprintf (str, MXSTR, "W closed run %d BBB %d",
2118 runCtrl[j].runId, ii);
2119 factOut (kInfo, 507, str);
2120 runCtrl[j].fileId = 95;
2121 }
2122 }
2123 }
2124
2125 if (numWrite == 0) {
2126 //seems we have nothing to do, so sleep a little
2127 xwait.tv_sec = 0;
2128 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2129 nanosleep (&xwait, NULL);
2130 }
2131
2132 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2133 snprintf (str, MXSTR, "Finish Write Process ...");
2134 factOut (kInfo, -1, str);
2135 gw_runStat = -22; //==> we should exit
2136 gj.writStat = -22; //==> we should exit
2137 goto closerun;
2138 }
2139 gw_runStat = gi_runStat;
2140 gj.writStat = gj.readStat;
2141
2142 }
2143
2144 //must close all open files ....
2145 snprintf (str, MXSTR, "Abort Writing Process ...");
2146 factOut (kInfo, -1, str);
2147
2148 closerun:
2149 snprintf (str, MXSTR, "Close all open files ...");
2150 factOut (kInfo, -1, str);
2151 for (j = 0; j < MAX_RUN; j++)
2152 if (runCtrl[j].fileId == 0) {
2153 if (runCtrl[j].runId == gi_myRun)
2154 gi_myRun = g_actTime;
2155
2156 if (runCtrl[j].procId == 0) {
2157 runFinish1 (runCtrl[j].runId);
2158 runCtrl[j].procId = 92;
2159 }
2160
2161 runCtrl[j].closeTime = g_actTime - 1;
2162 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2163 int ii = 0;
2164 if (runCtrl[j].closeTime < g_actTime)
2165 ii = 2;
2166 else if (runCtrl[j].lastTime < g_actTime - 300)
2167 ii = 3;
2168 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2169 ii = 4;
2170 if (i < 0) {
2171 snprintf (str, MXSTR, "error closing run %d %d CCC",
2172 runCtrl[j].runId, i);
2173 factOut (kError, 506, str);
2174 runCtrl[j].fileId = 96;
2175 } else {
2176 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2177 ii);
2178 factOut (kInfo, 507, str);
2179 runCtrl[j].fileId = 97;
2180 }
2181 }
2182
2183 gw_runStat = -99;
2184 gj.writStat = -99;
2185 snprintf (str, MXSTR, "Exit Writing Process ...");
2186 factOut (kInfo, -1, str);
2187 return 0;
2188
2189
2190
2191
2192} /*-----------------------------------------------------------------*/
2193
2194
2195
2196
2197void
2198StartEvtBuild ()
2199{
2200
2201 int i, j, imax, status, th_ret[50];
2202 pthread_t thread[50];
2203 struct timespec xwait;
2204
2205 gi_runStat = gp_runStat = gw_runStat = 0;
2206 gj.readStat = gj.procStat = gj.writStat = 0;
2207
2208 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2209 factOut (kInfo, -1, str);
2210
2211//initialize run control logics
2212 for (i = 0; i < MAX_RUN; i++) {
2213 runCtrl[i].runId = 0;
2214 runCtrl[i].fileId = -2;
2215 }
2216
2217//prepare for subProcesses
2218 gi_maxSize = g_maxSize;
2219 if (gi_maxSize <= 0)
2220 gi_maxSize = 1;
2221
2222 gi_maxProc = g_maxProc;
2223 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2224 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2225 factOut (kFatal, 301, str);
2226 gi_maxProc = 1;
2227 }
2228//partially initialize event control logics
2229 evtCtrl.frstPtr = 0;
2230 evtCtrl.lastPtr = 0;
2231
2232//start all threads (more to come) when we are allowed to ....
2233 while (g_runStat == 0) {
2234 xwait.tv_sec = 0;
2235 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2236 nanosleep (&xwait, NULL);
2237 }
2238
2239 i = 0;
2240 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2241 i++;
2242 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2243 i++;
2244 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2245 i++;
2246 imax = i;
2247
2248
2249#ifdef BILAND
2250 xwait.tv_sec = 30;;
2251 xwait.tv_nsec = 0; // sleep for ~20sec
2252 nanosleep (&xwait, NULL);
2253
2254 printf ("close all runs in 2 seconds\n");
2255
2256 CloseRunFile (0, time (NULL) + 2, 0);
2257
2258 xwait.tv_sec = 1;;
2259 xwait.tv_nsec = 0; // sleep for ~20sec
2260 nanosleep (&xwait, NULL);
2261
2262 printf ("setting g_runstat to -1\n");
2263
2264 g_runStat = -1;
2265#endif
2266
2267
2268//wait for all threads to finish
2269 for (i = 0; i < imax; i++) {
2270 j = pthread_join (thread[i], (void **) &status);
2271 }
2272
2273} /*-----------------------------------------------------------------*/
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289 /*-----------------------------------------------------------------*/
2290 /*-----------------------------------------------------------------*/
2291 /*-----------------------------------------------------------------*/
2292 /*-----------------------------------------------------------------*/
2293 /*-----------------------------------------------------------------*/
2294
2295#ifdef BILAND
2296
2297int
2298subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2299 int8_t * buffer)
2300{
2301 printf ("called subproc %d\n", threadID);
2302 return threadID + 1;
2303}
2304
2305
2306
2307
2308 /*-----------------------------------------------------------------*/
2309 /*-----------------------------------------------------------------*/
2310 /*-----------------------------------------------------------------*/
2311 /*-----------------------------------------------------------------*/
2312 /*-----------------------------------------------------------------*/
2313
2314
2315
2316
2317FileHandle_t
2318runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2319{
2320 return 1;
2321};
2322
2323int
2324runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2325{
2326 return 1;
2327 usleep (10000);
2328 return 1;
2329}
2330
2331
2332//{ return 1; } ;
2333
2334int
2335runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2336{
2337 return 1;
2338};
2339
2340
2341
2342
2343int
2344eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2345{
2346 int i = 0;
2347
2348// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2349// for (i=0; i<NBOARDS; i++) {
2350// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2351// }
2352 return 0;
2353}
2354
2355
2356void
2357factStatNew (EVT_STAT gi)
2358{
2359 int i;
2360
2361//for (i=0;i<MAX_SOCK;i++) {
2362// printf("%4d",gi.numRead[i]);
2363// if (i%20 == 0 ) printf("\n");
2364//}
2365}
2366
2367void
2368gotNewRun (int runnr, PEVNT_HEADER * headers)
2369{
2370 printf ("got new run %d\n", runnr);
2371 return;
2372}
2373
2374void
2375factStat (GUI_STAT gj)
2376{
2377// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2378// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2379}
2380
2381
/*
 * Stand-alone stub of debugRead(), compiled only when BILAND is defined.
 * Produces no output; every argument is ignored.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

   /* Disabled trace:
    *   printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
    */
}
2388
2389
2390
/*
 * Stand-alone stub of debugStream(), compiled only when BILAND is
 * defined. Does nothing; the buffer is neither read nor modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2395
/*
 * Stand-alone stub of debugHead(), compiled only when BILAND is defined.
 * Does nothing; the buffer is neither read nor modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2400
2401
2402void
2403factOut (int severity, int err, char *message)
2404{
2405 static FILE *fd;
2406 static int file = 0;
2407
2408 if (file == 0) {
2409 printf ("open file\n");
2410 fd = fopen ("x.out", "w+");
2411 file = 999;
2412 }
2413
2414 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2415
2416 if (severity != kDebug)
2417 printf ("%3d %3d | %s\n", severity, err, message);
2418}
2419
2420
2421
2422int
2423main ()
2424{
2425 int i, b, c, p;
2426 char ipStr[100];
2427 struct in_addr IPaddr;
2428
2429 g_maxMem = 1024 * 1024; //MBytes
2430//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2431 g_maxMem = g_maxMem * 200; //100MBytes
2432
2433 g_maxProc = 20;
2434 g_maxSize = 30000;
2435
2436 g_runStat = 40;
2437
2438 i = 0;
2439
2440// version for standard crates
2441//for (c=0; c<4,c++) {
2442// for (b=0; b<10; b++) {
2443// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2444//
2445// inet_pton(PF_INET, ipStr, &IPaddr) ;
2446//
2447// g_port[i].sockAddr.sin_family = PF_INET;
2448// g_port[i].sockAddr.sin_port = htons(5000) ;
2449// g_port[i].sockAddr.sin_addr = IPaddr ;
2450// g_port[i].sockDef = 1 ;
2451// i++ ;
2452// }
2453//}
2454//
2455//version for PC-test *
2456 for (c = 0; c < 4; c++) {
2457 for (b = 0; b < 10; b++) {
2458 sprintf (ipStr, "10.0.%d.11", 128 + c);
2459 if (c < 2)
2460 sprintf (ipStr, "10.0.%d.11", 128);
2461 else
2462 sprintf (ipStr, "10.0.%d.11", 131);
2463// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2464
2465 inet_pton (PF_INET, ipStr, &IPaddr);
2466 p = 31919 + 100 * c + 10 * b;
2467
2468
2469 g_port[i].sockAddr.sin_family = PF_INET;
2470 g_port[i].sockAddr.sin_port = htons (p);
2471 g_port[i].sockAddr.sin_addr = IPaddr;
2472 g_port[i].sockDef = 1;
2473
2474 i++;
2475 }
2476 }
2477
2478
2479//g_port[17].sockDef =-1 ;
2480//g_actBoards-- ;
2481
2482 StartEvtBuild ();
2483
2484 return 0;
2485
2486}
2487#endif
Note: See TracBrowser for help on using the repository browser.