source: trunk/FACT++/src/EventBuilder.c@ 12064

Last change on this file since 12064 was 11895, checked in by tbretz, 13 years ago
Latest one socket version with a fix to stop buffering data after the requested number of events.
File size: 76.8 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/* Severity levels; passed as the first argument of factOut(). */
enum Severity
{
   kMessage = 10,   ///< Plain message, usually obsolete
   kInfo    = 20,   ///< Information that may be interesting to know
   kWarn    = 30,   ///< Warning: may lead to unexpected or unwanted behaviour
   kError   = 40,   ///< Error: something unexpected happened, but the program can still handle it
   kFatal   = 50,   ///< Fatal: cannot be handled at all, the only solution is program termination
   kDebug   = 99,   ///< Message used for debugging only
};
36
#define MIN_LEN  32             // min #bytes needed to interpret FADheader
// size of read-buffer per socket; parenthesized so that the macro keeps its
// value inside larger expressions (e.g. "x % MAX_LEN", "n / MAX_LEN")
#define MAX_LEN (256 * 1024)
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71int g_maxProc;
72int g_maxSize;
73int gi_maxSize;
74int gi_maxProc;
75
76uint g_actTime;
77uint g_actUsec;
78int g_runStat;
79int g_reset;
80int g_useFTM;
81
82int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
83size_t g_maxMem; //maximum memory allowed for buffer
84
85//no longer needed ...
86int g_maxBoards; //maximum number of boards to be initialized
87int g_actBoards;
88//
89
90FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
91
92
93int gi_runStat;
94int gp_runStat;
95int gw_runStat;
96
97int gi_memStat = +1;
98
99uint32_t gi_myRun = 0;
100uint32_t actrun = 0;
101
102
103uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
104
105//uint gi_EvtStart= 0 ;
106//uint gi_EvtRead = 0 ;
107//uint gi_EvtBad = 0 ;
108//uint gi_EvtTot = 0 ;
109//size_t gi_usedMem = 0 ;
110
111//uint gw_EvtTot = 0 ;
112//uint gp_EvtTot = 0 ;
113
114PIX_MAP g_pixMap[NPIX];
115
116EVT_STAT gi;
117GUI_STAT gj;
118
119EVT_CTRL evtCtrl; //control of events during processing
120int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
121
122WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
123
124
125
126
127RUN_HEAD actRun;
128
129RUN_CTRL runCtrl[MAX_RUN];
130
131RUN_TAIL runTail[MAX_RUN];
132
133
134/*
135*** Definition of rdBuffer to read in IP packets; keep it global !!!!
136 */
137
138
139typedef union
140{
141 int8_t B[MAX_LEN];
142 int16_t S[MAX_LEN / 2];
143 int32_t I[MAX_LEN / 4];
144 int64_t L[MAX_LEN / 8];
145} CNV_FACT;
146
147typedef struct
148{
149 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
150 int32_t bufPos; //next byte to read to the buffer next
151 int32_t bufLen; //number of bytes left to read
152// size_t bufLen; //number of bytes left to read size_t might be better
153 int32_t skip; //number of bytes skipped before start of event
154
155 int sockStat; //-1 if socket not yet connected , 99 if not exist
156 int socket; //contains the sockets
157 struct sockaddr_in SockAddr; //IP for each socket
158
159 int evtID; // event ID of event currently read
160 int runID; // run "
161 int ftmID; // event ID from FTM
162 uint fadLen; // FADlength of event currently read
163 int fadVers; // Version of FAD
164 int ftmTyp; // trigger type
165 int board; // boardID (softwareID: 0..40 )
166 int Port;
167
168 CNV_FACT *rBuf;
169
170#ifdef EVTDEBUG
171 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
172#endif
173
174} READ_STRUCT;
175
176
/* A 16-bit value that can also be accessed as its two individual bytes. */
typedef union
{
   int8_t  B[2];                // the two bytes, in memory order
   int16_t S;                   // the same storage as one 16-bit word
} SHORT_BYTE;
182
183
184
185
186
187#define MXSTR 1000
188char str[MXSTR];
189
190SHORT_BYTE start, stop;
191
192READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
193
194
195
196/*-----------------------------------------------------------------*/
197
198
199/*-----------------------------------------------------------------*/
200
201int
202runFinish1 (uint32_t runnr)
203{
204 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
205 runnr);
206 factOut (kInfo, 173, str); //but continue anyhow
207 return 0;
208}
209
210
211int
212GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
213 READ_STRUCT * rd)
214{
215/*
216*** generate Address, create sockets and allocates readbuffer for it
217***
218*** if flag==0 generate socket and buffer
219*** <0 destroy socket and buffer
220*** >0 close and redo socket
221***
222*** sid : board*7 + port id
223 */
224
225 int j;
226 int optval = 1; //activate keepalive
227 socklen_t optlen = sizeof (optval);
228
229 if (rd->sockStat == 0) { //close socket if open
230 j = close (rd->socket);
231 if (j > 0) {
232 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
233 factOut (kFatal, 771, str);
234 } else {
235 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
236 factOut (kInfo, 771, str);
237 }
238 }
239
240
241 if (flag < 0) {
242 free (rd->rBuf); //and never open again
243#ifdef EVTDEBUG
244 free (rd->xBuf); //and never open again
245#endif
246 rd->rBuf = NULL;
247 rd->sockStat = 99;
248 return 0;
249 }
250
251
252 if (flag == 0) { //generate address and buffer ...
253 rd->Port = port;
254 rd->SockAddr.sin_family = sockAddr->sin_family;
255 rd->SockAddr.sin_port = htons (port);
256 rd->SockAddr.sin_addr = sockAddr->sin_addr;
257
258#ifdef EVTDEBUG
259 rd->xBuf = malloc (sizeof (CNV_FACT));
260#endif
261 rd->rBuf = malloc (sizeof (CNV_FACT));
262 if (rd->rBuf == NULL) {
263 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
264 factOut (kFatal, 774, str);
265 rd->sockStat = 77;
266 return -3;
267 }
268 }
269
270
271 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
272 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
273 factOut (kFatal, 773, str);
274 rd->sockStat = 88;
275 return -2;
276 }
277 optval = 1;
278 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
279 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
280 factOut (kInfo, 173, str); //but continue anyhow
281 }
282 optval = 10; //start after 10 seconds
283 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
284 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
285 factOut (kInfo, 173, str); //but continue anyhow
286 }
287 optval = 10; //do every 10 seconds
288 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
289 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
290 factOut (kInfo, 173, str); //but continue anyhow
291 }
292 optval = 2; //close after 2 unsuccessful tries
293 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
294 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
295 factOut (kInfo, 173, str); //but continue anyhow
296 }
297
298
299
300 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
301 factOut (kInfo, 773, str);
302 rd->sockStat = -1; //try to (re)open socket
303 return 0;
304
305} /*-----------------------------------------------------------------*/
306
307 /*-----------------------------------------------------------------*/
308
309
310
311
312int
313mBufInit ()
314{
315// initialize mBuffer (mark all entries as unused\empty)
316
317 int i;
318 uint32_t actime;
319
320 actime = g_actTime + 50000000;
321
322 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
323 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
324 mBuffer[i].runNum = 0;
325
326 evtCtrl.evtBuf[i] = -1;
327 evtCtrl.evtStat[i] = -1;
328 evtCtrl.pcTime[i] = actime; //initiate to far future
329 }
330
331
332 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
333
334 evtCtrl.frstPtr = 0;
335 evtCtrl.lastPtr = 0;
336
337 return 0;
338
339} /*-----------------------------------------------------------------*/
340
341
342
343
344int
345mBufEvt (int evID, uint runID, int nRoi[], int sk,
346 int fadlen, int trgTyp, int trgNum, int fadNum)
347{
348// generate a new Event into mBuffer:
349// make sure only complete Event are possible, so 'free' will always work
350// returns index into mBuffer[], or negative value in case of error
351// error: <-9000 if roi screwed up (not consistent with run)
352// <-8000 (not consistent with event)
353// <-7000 (not consistent with board)
354// < 0 if no space left
355
356 struct timeval *tv, atv;
357 tv = &atv;
358 uint32_t tsec, tusec;
359 uint oldest;
360 int jold;
361
362 int i, k, jr, b, evFree;
363 int headmem = 0;
364 size_t needmem = 0;
365
366
367 b = sk / 7;
368
369 if (nRoi[0] < 0 || nRoi[0] > 1024) {
370 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
371 factOut (kError, 999, str);
372 gj.badRoiR++;
373 gj.badRoi[b]++;
374 return -9999;
375 }
376
377 for (jr = 1; jr < 8; jr++) {
378 if (nRoi[jr] != nRoi[0]) {
379 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
380 nRoi[0]);
381 factOut (kError, 711, str);
382 gj.badRoiB++;
383 gj.badRoi[b]++;
384 return -7101;
385 }
386 }
387 if (nRoi[8] < nRoi[0]) {
388 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
389 factOut (kError, 712, str);
390 gj.badRoiB++;
391 gj.badRoi[b]++;
392 return -7102;
393 }
394
395
396 i = evID % MAX_EVT;
397 evFree = -1;
398
399 for (k = 0; k < MAX_RUN; k++) {
400 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
401 // is it ok ????
402 if (mBuffer[i].nRoi != nRoi[0]
403 || mBuffer[i].nRoiTM != nRoi[8]) {
404 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
405 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
406 factOut (kError, 821, str);
407 gj.badRoiE++;
408 gj.badRoi[b]++;
409 return -8201;
410 }
411// count for inconsistencies
412
413 if (mBuffer[i].trgNum != trgNum)
414 mBuffer[i].Errors[0]++;
415 if (mBuffer[i].fadNum != fadNum)
416 mBuffer[i].Errors[1]++;
417 if (mBuffer[i].trgTyp != trgTyp)
418 mBuffer[i].Errors[2]++;
419
420 //everything seems fine so far ==> use this slot ....
421 return i;
422 }
423 if (evFree < 0 && mBuffer[i].evNum < 0)
424 evFree = i;
425 i += MAX_EVT;
426 }
427
428
429 //event does not yet exist; create it
430 if (evFree < 0) { //no space available in ctrl
431 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
432 factOut (kError, 881, str);
433 return -1;
434 }
435 i = evFree; //found free entry; use it ...
436
437 gettimeofday (tv, NULL);
438 tsec = atv.tv_sec;
439 tusec = atv.tv_usec;
440
441 //check if runId already registered in runCtrl
442 evFree = -1;
443 oldest = g_actTime + 1000;
444 jold = -1;
445 for (k = 0; k < MAX_RUN; k++) {
446 if (runCtrl[k].runId == runID) {
447// if (runCtrl[k].procId > 0) { //run is closed -> reject
448// snprintf (str, MXSTR, "skip event since run %d finished", runID);
449// factOut (kInfo, 931, str);
450// return -21;
451// }
452
453 if (runCtrl[k].roi0 != nRoi[0]
454 || runCtrl[k].roi8 != nRoi[8]) {
455 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
456 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
457 factOut (kError, 931, str);
458 gj.badRoiR++;
459 gj.badRoi[b]++;
460 return -9301;
461 }
462 goto RUNFOUND;
463 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
464 evFree = k;
465 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
466 if (runCtrl[k].closeTime < oldest) {
467 oldest = runCtrl[k].closeTime;
468 jold = k;
469 }
470 }
471 }
472
473 if (evFree < 0 && jold < 0) {
474 snprintf (str, MXSTR, "not able to register the new run %d", runID);
475 factOut (kFatal, 883, str);
476 return -1001;
477 } else {
478 if (evFree < 0)
479 evFree = jold;
480 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
481 evFree, nRoi[0], nRoi[8]);
482 factOut (kInfo, 503, str);
483 runCtrl[evFree].runId = runID;
484 runCtrl[evFree].roi0 = nRoi[0];
485 runCtrl[evFree].roi8 = nRoi[8];
486 runCtrl[evFree].fileId = -2;
487 runCtrl[evFree].procId = -2;
488 runCtrl[evFree].lastEvt = -1;
489 runCtrl[evFree].nextEvt = 0;
490 runCtrl[evFree].actEvt = 0;
491 runCtrl[evFree].procEvt = 0;
492 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
493 runCtrl[evFree].firstUsec = tusec;
494 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
495 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
496// runCtrl[evFree].lastTime = 0;
497
498 runTail[evFree].nEventsOk =
499 runTail[evFree].nEventsRej =
500 runTail[evFree].nEventsBad =
501 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
502 }
503
504 RUNFOUND:
505
506 needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
507
508 headmem = NBOARDS * sizeof (PEVNT_HEADER);
509
510 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
511 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
512 if (gi_memStat > 0) {
513 gi_memStat = -99;
514 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
515 evID, sk);
516 factOut (kError, 882, str);
517 } else {
518 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
519 evID, sk);
520 factOut (kDebug, 882, str);
521 }
522 return -11;
523 }
524
525 mBuffer[i].FADhead = malloc (headmem);
526 if (mBuffer[i].FADhead == NULL) {
527 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
528 factOut (kError, 882, str);
529 return -12;
530 }
531
532 mBuffer[i].fEvent = malloc (needmem);
533 if (mBuffer[i].fEvent == NULL) {
534 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
535 factOut (kError, 882, str);
536 free (mBuffer[i].FADhead);
537 mBuffer[i].FADhead = NULL;
538 return -22;
539 }
540
541 mBuffer[i].buffer = malloc (gi_maxSize);
542 if (mBuffer[i].buffer == NULL) {
543 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
544 factOut (kError, 882, str);
545 free (mBuffer[i].FADhead);
546 mBuffer[i].FADhead = NULL;
547 free (mBuffer[i].fEvent);
548 mBuffer[i].fEvent = NULL;
549 return -32;
550 }
551 //flag all boards as unused
552 mBuffer[i].nBoard = 0;
553 for (k = 0; k < NBOARDS; k++) {
554 mBuffer[i].board[k] = -1;
555 }
556 //flag all pixels as unused
557 for (k = 0; k < NPIX; k++) {
558 mBuffer[i].fEvent->StartPix[k] = -1;
559 }
560 //flag all TMark as unused
561 for (k = 0; k < NTMARK; k++) {
562 mBuffer[i].fEvent->StartTM[k] = -1;
563 }
564
565 mBuffer[i].fEvent->NumBoards = 0;
566 mBuffer[i].fEvent->PCUsec = tusec;
567 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
568 mBuffer[i].nRoi = nRoi[0];
569 mBuffer[i].nRoiTM = nRoi[8];
570 mBuffer[i].evNum = evID;
571 mBuffer[i].runNum = runID;
572 mBuffer[i].fadNum = fadNum;
573 mBuffer[i].trgNum = trgNum;
574 mBuffer[i].trgTyp = trgTyp;
575 mBuffer[i].evtLen = needmem;
576 mBuffer[i].Errors[0] =
577 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
578
579 gj.usdMem += needmem + headmem + gi_maxSize;
580 if (gj.usdMem > gj.maxMem)
581 gj.maxMem = gj.usdMem;
582
583 gj.bufTot++;
584 if (gj.bufTot > gj.maxEvt)
585 gj.maxEvt = gj.bufTot;
586
587 gj.rateNew++;
588
589 //register event in 'active list (reading)'
590
591 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
592 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
593 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
594 evtIdx[i] = evtCtrl.lastPtr;
595
596
597 snprintf (str, MXSTR,
598 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
599 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
600 factOut (kDebug, -11, str);
601 evtCtrl.lastPtr++;
602 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
603 evtCtrl.lastPtr = 0;
604
605 gi.evtGet++;
606
607 return i;
608
609} /*-----------------------------------------------------------------*/
610
611
612
613
614int
615mBufFree (int i)
616{
617//delete entry [i] from mBuffer:
618//(and make sure multiple calls do no harm ....)
619
620 int headmem = 0;
621 int evid;
622 size_t freemem = 0;
623
624 evid = mBuffer[i].evNum;
625 freemem = mBuffer[i].evtLen;
626
627 free (mBuffer[i].fEvent);
628 mBuffer[i].fEvent = NULL;
629
630 free (mBuffer[i].FADhead);
631 mBuffer[i].FADhead = NULL;
632
633 free (mBuffer[i].buffer);
634 mBuffer[i].buffer = NULL;
635
636 headmem = NBOARDS * sizeof (PEVNT_HEADER);
637 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
638 mBuffer[i].runNum = 0;
639
640 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
641 gj.bufTot--;
642
643 if (gi_memStat < 0) {
644 if (gj.usdMem <= 0.75 * gj.maxMem)
645 gi_memStat = +1;
646 }
647
648
649 return 0;
650
651} /*-----------------------------------------------------------------*/
652
653
654void
655resetEvtStat ()
656{
657 int i;
658
659 for (i = 0; i < MAX_SOCK; i++)
660 gi.numRead[i] = 0;
661
662 for (i = 0; i < NBOARDS; i++) {
663 gi.gotByte[i] = 0;
664 gi.gotErr[i] = 0;
665
666 }
667
668 gi.evtGet = 0; //#new Start of Events read
669 gi.evtTot = 0; //#complete Events read
670 gi.evtErr = 0; //#Events with Errors
671 gi.evtSkp = 0; //#Events incomplete (timeout)
672
673 gi.procTot = 0; //#Events processed
674 gi.procErr = 0; //#Events showed problem in processing
675 gi.procTrg = 0; //#Events accepted by SW trigger
676 gi.procSkp = 0; //#Events rejected by SW trigger
677
678 gi.feedTot = 0; //#Events used for feedBack system
679 gi.feedErr = 0; //#Events rejected by feedBack
680
681 gi.wrtTot = 0; //#Events written to disk
682 gi.wrtErr = 0; //#Events with write-error
683
684 gi.runOpen = 0; //#Runs opened
685 gi.runClose = 0; //#Runs closed
686 gi.runErr = 0; //#Runs with open/close errors
687
688 return;
689} /*-----------------------------------------------------------------*/
690
691
692
/* Initialisation hook of the reader; at present there is nothing to do. */
void
initReadFAD ()
{
}                               /*-----------------------------------------------------------------*/
698
699
700
701void *
702readFAD (void *ptr)
703{
704/* *** main loop reading FAD data and sorting them to complete events */
705 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
706 int actBoards = 0, minLen;
707 int32_t jrd;
708 uint gi_SecTime; //time in seconds
709 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
710
711 int goodhed = 0;
712
713 int sockDef[NBOARDS]; //internal state of sockets
714 int jrdx;
715
716
717 struct timespec xwait;
718
719
720 struct timeval *tv, atv;
721 tv = &atv;
722 uint32_t tsec, tusec;
723
724
725 snprintf (str, MXSTR, "start initializing");
726 factOut (kInfo, -1, str);
727
728 int cpu = 7; //read thread
729 cpu_set_t mask;
730
731/* CPU_ZERO initializes all the bits in the mask to zero. */
732 CPU_ZERO (&mask);
733/* CPU_SET sets only the bit corresponding to cpu. */
734 cpu = 7;
735 CPU_SET (cpu, &mask);
736
737/* sched_setaffinity returns 0 in success */
738 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
739 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
740 factOut (kWarn, -1, str);
741 }
742
743
744 head_len = sizeof (PEVNT_HEADER);
745 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
746 minLen = head_len; //min #bytes needed to check header: full header for debug
747
748 start.S = 0xFB01;
749 stop.S = 0x04FE;
750
751/* initialize run control logics */
752 for (i = 0; i < MAX_RUN; i++) {
753 runCtrl[i].runId = 0;
754 runCtrl[i].fileId = -2;
755 runCtrl[i].procId = -2;
756 }
757 gi_resetS = gi_resetR = 9;
758 for (i = 0; i < NBOARDS; i++)
759 sockDef[i] = 0;
760
761 START:
762 gettimeofday (tv, NULL);
763 g_actTime = tsec = atv.tv_sec;
764 g_actUsec = tusec = atv.tv_usec;
765 gi_myRun = g_actTime;
766 evtCtrl.frstPtr = 0;
767 evtCtrl.lastPtr = 0;
768
769 gi_SecTime = g_actTime;
770 gi_runStat = g_runStat;
771 gj.readStat = g_runStat;
772 numok = numok2 = 0;
773
774 int cntsock = 8 - NUMSOCK ;
775
776 if (gi_resetS > 0) {
777 //make sure all sockets are preallocated as 'not exist'
778 for (i = 0; i < MAX_SOCK; i++) {
779 rd[i].socket = -1;
780 rd[i].sockStat = 99;
781 }
782
783 for (k = 0; k < NBOARDS; k++) {
784 gi_NumConnect[k] = 0;
785 gi.numConn[k] = 0;
786 gj.numConn[k] = 0;
787 gj.errConn[k] = 0;
788 gj.rateBytes[k] = 0;
789 gj.totBytes[k] = 0;
790 }
791
792 }
793
794
795 if (gi_resetR > 0) {
796 resetEvtStat ();
797 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
798 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
799 gj.totMem = g_maxMem;
800 gj.bufNew = gj.bufEvt = 0;
801 gj.badRoiE = gj.badRoiR = gj.badRoiB =
802 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
803
804 int b,p;
805 for (b = 0; b < NBOARDS; b++)
806 gj.badRoi[b] = 0;
807
808 mBufInit (); //initialize buffers
809
810 snprintf (str, MXSTR, "end initializing");
811 factOut (kInfo, -1, str);
812 }
813
814
815 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
816
817 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
818
819 gi_runStat = g_runStat;
820 gj.readStat = g_runStat;
821 gettimeofday (tv, NULL);
822 g_actTime = tsec = atv.tv_sec;
823 g_actUsec = tusec = atv.tv_usec;
824
825
826 int b, p, p0, s0, nch;
827 nch = 0;
828 for (b = 0; b < NBOARDS; b++) {
829 k = b * 7;
830 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
831 nch++;
832 gi_NumConnect[b] = 0; //must close all connections
833 gi.numConn[b] = 0;
834 gj.numConn[b] = 0;
835 if (sockDef[b] == 0)
836 s0 = 0; //sockets to be defined and opened
837 else if (g_port[b].sockDef == 0)
838 s0 = -1; //sockets to be destroyed
839 else
840 s0 = +1; //sockets to be closed and reopened
841
842 if (s0 == 0)
843 p0 = ntohs (g_port[b].sockAddr.sin_port);
844 else
845 p0 = 0;
846
847 for (p = p0 + 1; p < p0 + 8; p++) {
848 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
849 k++;
850 }
851 sockDef[b] = g_port[b].sockDef;
852 }
853 }
854
855 if (nch > 0) {
856 actBoards = 0;
857 for (b = 0; b < NBOARDS; b++) {
858 if (sockDef[b] > 0)
859 actBoards++;
860 }
861 }
862
863
864 jrdx = 0;
865 numokx = 0;
866 numok = 0; //count number of succesfull actions
867
868 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
869 b = i / 7 ;
870 p = i % 7 ;
871
872//if ( b==32 && p>0) { ; }
873if ( p >= NUMSOCK) { ; }
874else {
875 if (sockDef[b] > 0)
876 s0 = +1;
877 else
878 s0 = -1;
879
880 if (rd[i].sockStat < 0) { //try to connect if not yet done
881 rd[i].sockStat = connect (rd[i].socket,
882 (struct sockaddr *) &rd[i].SockAddr,
883 sizeof (rd[i].SockAddr));
884 if (rd[i].sockStat == 0) { //successfull ==>
885 if (sockDef[b] > 0) {
886 rd[i].bufTyp = 0; // expect a header
887 rd[i].bufLen = frst_len; // max size to read at begining
888 } else {
889 rd[i].bufTyp = -1; // full data to be skipped
890 rd[i].bufLen = MAX_LEN; //huge for skipping
891 }
892 rd[i].bufPos = 0; // no byte read so far
893 rd[i].skip = 0; // start empty
894// gi_NumConnect[b]++;
895 gi_NumConnect[b] += cntsock ;
896
897 gi.numConn[b]++;
898 gj.numConn[b]++;
899 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
900 factOut (kInfo, -1, str);
901 }
902 }
903
904 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
905 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
906 numok++;
907 size_t maxread = rd[i].bufLen ;
908 if (maxread > MAXREAD ) maxread=MAXREAD ;
909
910 jrd =
911 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
912 maxread, MSG_DONTWAIT);
913// rd[i].bufLen, MSG_DONTWAIT);
914
915 if (jrd > 0) {
916 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
917#ifdef EVTDEBUG
918 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
919 &rd[i].rBuf->B[rd[i].bufPos], jrd);
920 snprintf (str, MXSTR,
921 "read sock %3d bytes %5d len %5d first %d %d", i,
922 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
923 rd[i].rBuf->B[rd[i].bufPos + 1]);
924 factOut (kDebug, 301, str);
925#endif
926 }
927
928 if (jrd == 0) { //connection has closed ...
929 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
930 factOut (kInfo, 441, str);
931 GenSock (s0, i, 0, NULL, &rd[i]);
932 gi.gotErr[b]++;
933// gi_NumConnect[b]--;
934 gi_NumConnect[b]-= cntsock ;
935 gi.numConn[b]--;
936 gj.numConn[b]--;
937
938 } else if (jrd < 0) { //did not read anything
939 if (errno != EAGAIN && errno != EWOULDBLOCK) {
940 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
941 factOut (kError, 442, str);
942 gi.gotErr[b]++;
943 } else
944 numok--; //else nothing waiting to be read
945 jrd = 0;
946 }
947 } else {
948 jrd = 0; //did read nothing as requested
949 snprintf (str, MXSTR, "do not read from socket %d %d", i,
950 rd[i].bufLen);
951 factOut (kDebug, 301, str);
952 }
953
954 gi.gotByte[b] += jrd;
955 gj.rateBytes[b] += jrd;
956
957 if (jrd > 0) {
958 numokx++;
959 jrdx += jrd;
960 }
961
962
963 if (rd[i].bufTyp < 0) { // we are skipping this board ...
964// just do nothing
965#ifdef EVTDEBUG
966 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
967 i);
968 factOut (kInfo, 301, str);
969#endif
970
971 } else if (rd[i].bufTyp > 0) { // we are reading data ...
972 if (jrd < rd[i].bufLen) { //not yet all read
973 rd[i].bufPos += jrd; //==> prepare for continuation
974 rd[i].bufLen -= jrd;
975 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
976 } else { //full dataset read
977 rd[i].bufLen = 0;
978 rd[i].bufPos = rd[i].fadLen;
979 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
980 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
981 gi.evtErr++;
982 snprintf (str, MXSTR,
983 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
984 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
985 rd[i].rBuf->B[1],
986 rd[i].rBuf->B[rd[i].bufPos - 2],
987 rd[i].rBuf->B[rd[i].bufPos - 1]);
988 factOut (kError, 301, str);
989 goto EndBuf;
990
991#ifdef EVTDEBUG
992 } else {
993 snprintf (str, MXSTR,
994 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
995 i, rd[i].fadLen, rd[i].rBuf->B[0],
996 rd[i].rBuf->B[1], start.B[1], start.B[0],
997 rd[i].rBuf->B[rd[i].bufPos - 2],
998 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
999 stop.B[0]);
1000 factOut (kDebug, 301, str);
1001#endif
1002 }
1003
1004 if (jrd > 0)
1005 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1006
1007 //we have a complete buffer, copy to WORK area
1008 int jr;
1009 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1010 for (jr = 0; jr < 9; jr++) {
1011 roi[jr] =
1012 ntohs (rd[i].
1013 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]);
1014 }
1015 //get index into mBuffer for this event (create if needed)
1016
1017 int actid;
1018 if (g_useFTM > 0)
1019 actid = rd[i].evtID;
1020 else
1021 actid = rd[i].ftmID;
1022
1023 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1024 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1025 rd[i].evtID);
1026
1027 if (evID < -1000) {
1028 goto EndBuf; //not usable board/event/run --> skip it
1029 }
1030 if (evID < 0) { //no space left, retry later
1031#ifdef EVTDEBUG
1032 if (rd[i].bufLen != 0) {
1033 snprintf (str, MXSTR, "something screwed up");
1034 factOut (kFatal, 1, str);
1035 }
1036#endif
1037 xwait.tv_sec = 0;
1038 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1039 nanosleep (&xwait, NULL);
1040 goto EndBuf1; //hope there is free space next round
1041 }
1042 //we have a valid entry in mBuffer[]; fill it
1043
1044#ifdef EVTDEBUG
1045 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1046 rd[i].fadLen);
1047 if (xchk != 0) {
1048 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1049 xchk, rd[i].fadLen, i);
1050 factOut (kFatal, 1, str);
1051
1052 uint iq;
1053 for (iq = 0; iq < rd[i].fadLen; iq++) {
1054 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1055 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1056 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1057 factOut (kFatal, 1, str);
1058 }
1059 }
1060 }
1061#endif
1062 int qncpy = 0;
1063 boardId = b;
1064 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1065 int fadCrate = fadBoard / 256;
1066 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1067 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1068 fadCrate, fadBoard % 256, boardId);
1069 factOut (kWarn, 301, str);
1070 }
1071 if (mBuffer[evID].board[boardId] != -1) {
1072 snprintf (str, MXSTR,
1073 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1074 evID, boardId, i, rd[i].fadLen,
1075 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1076 rd[i].rBuf->B[rd[i].bufPos - 2],
1077 rd[i].rBuf->B[rd[i].bufPos - 1]);
1078 factOut (kWarn, 501, str);
1079 goto EndBuf; //--> skip Board
1080 }
1081
1082 int iDx = evtIdx[evID]; //index into evtCtrl
1083
1084 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1085 &rd[i].rBuf->S[0], head_len);
1086 qncpy += head_len;
1087
1088 src = head_len / 2;
1089 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1090 for (drs = 0; drs < 4; drs++) {
1091 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1092 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1093 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1094//here we should check if pixH is correct ....
1095
1096 pixS = boardId * 36 + drs * 9 + px;
1097 src++;
1098
1099
1100 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1101 dest = pixS * roi[0];
1102 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1103 &rd[i].rBuf->S[src], roi[0] * 2);
1104 qncpy += roi[0] * 2;
1105 src += pixR;
1106
1107 if (px == 8) {
1108 tmS = boardId * 4 + drs;
1109 if (pixR > roi[0]) { //and we have additional TM info
1110 dest = tmS * roi[0] + NPIX * roi[0];
1111 int srcT = src - roi[0];
1112 mBuffer[evID].fEvent->StartTM[tmS] =
1113 (pixC + pixR - roi[0]) % 1024;
1114 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1115 &rd[i].rBuf->S[srcT], roi[0] * 2);
1116 qncpy += roi[0] * 2;
1117 } else {
1118 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1119 }
1120 }
1121 }
1122 } // now we have stored a new board contents into Event structure
1123
1124 mBuffer[evID].fEvent->NumBoards++;
1125 mBuffer[evID].board[boardId] = boardId;
1126 evtCtrl.evtStat[iDx]++;
1127 evtCtrl.pcTime[iDx] = g_actTime;
1128
1129 if (++mBuffer[evID].nBoard >= actBoards) {
1130 int qnrun = 0;
1131 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1132 actrun = mBuffer[evID].runNum;
1133 int ir;
1134 for (ir = 0; ir < MAX_RUN; ir++) {
1135 qnrun++;
1136 if (runCtrl[ir].runId == actrun) {
1137 if (++runCtrl[ir].lastEvt == 0) {
1138 gotNewRun (actrun, mBuffer[evID].FADhead);
1139 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1140 mBuffer[evID].runNum,
1141 mBuffer[evID].evNum);
1142 factOut (kInfo, 1, str);
1143 break;
1144 }
1145 }
1146 }
1147 }
1148 snprintf (str, MXSTR,
1149 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1150 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1151 qncpy, qnrun);
1152 factOut (kDebug, -1, str);
1153
1154 //complete event read ---> flag for next processing
1155 evtCtrl.evtStat[iDx] = 99;
1156 gi.evtTot++;
1157 }
1158
1159 EndBuf:
1160 rd[i].bufTyp = 0; //ready to read next header
1161 rd[i].bufLen = frst_len;
1162 rd[i].bufPos = 0;
1163 EndBuf1:
1164 ;
1165 }
1166
1167 } else { //we are reading event header
1168 rd[i].bufPos += jrd;
1169 rd[i].bufLen -= jrd;
1170 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1171 //check if startflag correct; else shift block ....
1172 for (k = 0; k < rd[i].bufPos - 1; k++) {
1173 if (rd[i].rBuf->B[k] == start.B[1]
1174 && rd[i].rBuf->B[k + 1] == start.B[0])
1175 break;
1176 }
1177 rd[i].skip += k;
1178
1179 if (k >= rd[i].bufPos - 1) { //no start of header found
1180 rd[i].bufPos = 0;
1181 rd[i].bufLen = head_len;
1182 } else if (k > 0) {
1183 rd[i].bufPos -= k;
1184 rd[i].bufLen += k;
1185 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1186 rd[i].bufPos);
1187#ifdef EVTDEBUG
1188 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1189 rd[i].bufPos);
1190#endif
1191 }
1192 if (rd[i].bufPos >= minLen) {
1193 if (rd[i].skip > 0) {
1194 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1195 rd[i].skip, i);
1196 factOut (kInfo, 666, str);
1197 rd[i].skip = 0;
1198 }
1199 goodhed++;
1200 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1201 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1202 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]);
1203 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1204 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1205 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1206 rd[i].bufTyp = 1; //ready to read full record
1207 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1208
1209 int fadboard = ntohs (rd[i].rBuf->S[12]);
1210 int fadcrate = fadboard / 256;
1211 fadboard = (fadcrate * 10 + fadboard % 256);
1212#ifdef EVTDEBUG
1213 snprintf (str, MXSTR,
1214 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1215 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1216 rd[i].runID, fadboard, jrd);
1217 factOut (kDebug, 1, str);
1218#endif
1219
1220 if (rd[i].runID == 0)
1221 rd[i].runID = gi_myRun;
1222
1223 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1224 snprintf (str, MXSTR,
1225 "illegal event-length on port %d", i);
1226 factOut (kFatal, 881, str);
1227 rd[i].bufLen = 100000; //?
1228 }
1229 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1230 debugHead (i, fadBoard, rd[i].rBuf);
1231 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1232 } else {
1233 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1234 }
1235 } else {
1236 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1237 }
1238
1239 } //end interpreting last read
1240}
1241 } //end of successful read anything
1242 } //finished trying to read all sockets
1243
1244#ifdef EVTDEBUG
1245 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1246 factOut (kDebug, -1, str);
1247#endif
1248
1249 gi.numRead[numok]++;
1250
1251 g_actTime = time (NULL);
1252 if (g_actTime > gi_SecTime) {
1253 gi_SecTime = g_actTime;
1254
1255
1256 //loop over all active events and flag those older than read-timeout
1257 //delete those that are written to disk ....
1258
1259 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1260 if (kd < 0)
1261 kd += (MAX_EVT * MAX_RUN);
1262
1263 gj.bufNew = gj.bufEvt = 0;
1264 int k1 = evtCtrl.frstPtr;
1265 for (k = k1; k < (k1 + kd); k++) {
1266 int k0 = k % (MAX_EVT * MAX_RUN);
1267//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1268
1269 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1270 gj.bufNew++; //incomplete event in Buffer
1271 if (evtCtrl.evtStat[k0] < 90
1272 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1273 int id = evtCtrl.evtBuf[k0];
1274 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1275 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1276 evtCtrl.evtStat[k0]);
1277 factOut (kWarn, 601, str);
1278 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1279 gi.evtSkp++;
1280 gi.evtTot++;
1281 gj.evtSkip++;
1282 }
1283 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1284 || evtCtrl.evtStat[k0] == 0) { //'useless'
1285
1286 int id = evtCtrl.evtBuf[k0];
1287 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1288 mBuffer[id].evNum, mBuffer[id].nBoard);
1289 factOut (kDebug, -1, str);
1290 mBufFree (id); //event written--> free memory
1291 evtCtrl.evtStat[k0] = -1;
1292 gj.evtWrite++;
1293 gj.rateWrite++;
1294 } else if (evtCtrl.evtStat[k0] >= 95) {
1295 gj.bufEvt++; //complete event in Buffer
1296 }
1297
1298 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1299 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1300 }
1301 }
1302
1303
1304 gj.deltaT = 1000; //temporary, must be improved
1305
1306 int b;
1307 for (b = 0; b < NBOARDS; b++)
1308 gj.totBytes[b] += gj.rateBytes[b];
1309 gj.totMem = g_maxMem;
1310 if (gj.maxMem > gj.xxxMem)
1311 gj.xxxMem = gj.maxMem;
1312 if (gj.maxEvt > gj.xxxEvt)
1313 gj.xxxEvt = gj.maxEvt;
1314
1315 factStat (gj);
1316 factStatNew (gi);
1317 gj.rateNew = gj.rateWrite = 0;
1318 gj.maxMem = gj.usdMem;
1319 gj.maxEvt = gj.bufTot;
1320 for (b = 0; b < NBOARDS; b++)
1321 gj.rateBytes[b] = 0;
1322 }
1323
1324 if (numok > 0)
1325 numok2 = 0;
1326 else if (numok2++ > 3) {
1327 if (g_runStat == 1) {
1328 xwait.tv_sec = 1;
1329 xwait.tv_nsec = 0; // hibernate for 1 sec
1330 } else {
1331 xwait.tv_sec = 0;
1332 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1333 }
1334 nanosleep (&xwait, NULL);
1335 }
1336
1337 } //and do next loop over all sockets ...
1338
1339
1340 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1341 factOut (kInfo, -1, str);
1342
1343 if (g_reset > 0) {
1344 gi_reset = g_reset;
1345 gi_resetR = gi_reset % 10; //shall we stop reading ?
1346 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1347 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1348 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1349 g_reset = 0;
1350 } else {
1351 gi_reset = 0;
1352 if (g_runStat == -1)
1353 gi_resetR = 1;
1354 else
1355 gi_resetR = 7;
1356 gi_resetS = 7; //close all sockets
1357 gi_resetW = 7; //close all files
1358 gi_resetX = 0;
1359
1360 //inform others we have to quit ....
1361 gi_runStat = -11; //inform all that no update to happen any more
1362 gj.readStat = -11; //inform all that no update to happen any more
1363 }
1364
1365 if (gi_resetS > 0) {
1366 //must close all open sockets ...
1367 snprintf (str, MXSTR, "close all sockets ...");
1368 factOut (kInfo, -1, str);
1369 for (i = 0; i < MAX_SOCK; i++) {
1370 if (rd[i].sockStat == 0) {
1371 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1372 if (i % 7 == 0) {
1373// gi_NumConnect[i / 7]--;
1374 gi_NumConnect[i / 7]-= cntsock ;
1375 gi.numConn[i / 7]--;
1376 gj.numConn[i / 7]--;
1377 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1378 rd[i / 7].sockStat = -1; //and try to open asap
1379 }
1380 }
1381 }
1382 }
1383
1384
1385 if (gi_resetR > 0) {
1386 //flag all events as 'read finished'
1387 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1388 if (kd < 0)
1389 kd += (MAX_EVT * MAX_RUN);
1390
1391 int k1 = evtCtrl.frstPtr;
1392 for (k = k1; k < (k1 + kd); k++) {
1393 int k0 = k % (MAX_EVT * MAX_RUN);
1394 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1395 evtCtrl.evtStat[k0] = 91;
1396 gi.evtSkp++;
1397 gi.evtTot++;
1398 }
1399 }
1400
1401 xwait.tv_sec = 0;
1402 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1403 nanosleep (&xwait, NULL);
1404
1405 //and clear all buffers (might have to wait until all others are done)
1406 int minclear;
1407 if (gi_resetR == 1) {
1408 minclear = 900;
1409 snprintf (str, MXSTR, "drain all buffers ...");
1410 } else {
1411 minclear = 0;
1412 snprintf (str, MXSTR, "flush all buffers ...");
1413 }
1414 factOut (kInfo, -1, str);
1415
1416 int numclear = 1;
1417 while (numclear > 0) {
1418 numclear = 0;
1419 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1420 if (kd < 0)
1421 kd += (MAX_EVT * MAX_RUN);
1422
1423 int k1 = evtCtrl.frstPtr;
1424 for (k = k1; k < (k1 + kd); k++) {
1425 int k0 = k % (MAX_EVT * MAX_RUN);
1426 if (evtCtrl.evtStat[k0] > minclear) {
1427 int id = evtCtrl.evtBuf[k0];
1428 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1429 mBuffer[id].evNum, mBuffer[id].nBoard);
1430 factOut (kDebug, -1, str);
1431 mBufFree (id); //event written--> free memory
1432 evtCtrl.evtStat[k0] = -1;
1433 } else if (evtCtrl.evtStat[k0] > 0)
1434 numclear++; //writing is still ongoing...
1435
1436 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1437 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1438 }
1439
1440 xwait.tv_sec = 0;
1441 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1442 nanosleep (&xwait, NULL);
1443 }
1444 }
1445
1446 if (gi_reset > 0) {
1447 if (gi_resetW > 0) {
1448 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1449 }
1450 if (gi_resetX > 0) {
1451 xwait.tv_sec = gi_resetX;
1452 xwait.tv_nsec = 0;
1453 nanosleep (&xwait, NULL);
1454 }
1455
1456 snprintf (str, MXSTR, "Continue read Process ...");
1457 factOut (kInfo, -1, str);
1458 gi_reset = 0;
1459 goto START;
1460 }
1461
1462
1463
1464 snprintf (str, MXSTR, "Exit read Process ...");
1465 factOut (kInfo, -1, str);
1466 gi_runStat = -99;
1467 gj.readStat = -99;
1468 factStat (gj);
1469 factStatNew (gi);
1470 return 0;
1471
1472} /*-----------------------------------------------------------------*/
1473
1474
1475void *
1476subProc (void *thrid)
1477{
1478 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1479 struct timespec xwait;
1480
1481 threadID = (int) thrid;
1482
1483 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1484 factOut (kInfo, -1, str);
1485
1486 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1487 numWait = numProc = 0;
1488 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1489 if (kd < 0)
1490 kd += (MAX_EVT * MAX_RUN);
1491
1492 int k1 = evtCtrl.frstPtr;
1493 for (k = k1; k < (k1 + kd); k++) {
1494 int k0 = k % (MAX_EVT * MAX_RUN);
1495
1496 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1497 if (gi_resetR > 1) { //we are asked to flush buffers asap
1498 jret = 9100; //flag to be deleted
1499 } else {
1500 int id = evtCtrl.evtBuf[k0];
1501 jret =
1502 subProcEvt (threadID, mBuffer[id].FADhead,
1503 mBuffer[id].fEvent, mBuffer[id].buffer);
1504 if (jret <= threadID) {
1505 snprintf (str, MXSTR,
1506 "process %d wants to send to process %d",
1507 threadID, jret);
1508 factOut (kError, -1, str);
1509 jret = 5300;
1510 } else if (jret <= 0)
1511 jret = 9200 + threadID; //flag as 'to be deleted'
1512 else if (jret >= gi_maxProc)
1513 jret = 5200 + threadID; //flag as 'to be written'
1514 else
1515 jret = 1000 + jret; //flag for next proces
1516 }
1517 evtCtrl.evtStat[k0] = jret;
1518 numProc++;
1519 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1520 numWait++;
1521 }
1522
1523 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1524 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1525 factOut (kInfo, -1, str);
1526 return 0;
1527 }
1528 if (numProc == 0) {
1529 //seems we have nothing to do, so sleep a little
1530 xwait.tv_sec = 0;
1531 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1532 nanosleep (&xwait, NULL);
1533 }
1534 }
1535
1536 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1537 factOut (kInfo, -1, str);
1538 return;
1539} /*-----------------------------------------------------------------*/
1540
1541
1542void *
1543procEvt (void *ptr)
1544{
1545/* *** main loop processing file, including SW-trigger */
1546 int numProc, numWait;
1547 int k, status, j;
1548 struct timespec xwait;
1549 char str[MXSTR];
1550
1551
1552
1553 int lastRun = 0; //usually run from last event still valid
1554
1555 cpu_set_t mask;
1556 int cpu = 1; //process thread (will be several in final version)
1557
1558 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1559 gi_maxProc);
1560 factOut (kInfo, -1, str);
1561
1562/* CPU_ZERO initializes all the bits in the mask to zero. */
1563 CPU_ZERO (&mask);
1564/* CPU_SET sets only the bit corresponding to cpu. */
1565// CPU_SET( 0 , &mask ); leave for system
1566// CPU_SET( 1 , &mask ); used by write process
1567 CPU_SET (2, &mask);
1568 CPU_SET (3, &mask);
1569 CPU_SET (4, &mask);
1570 CPU_SET (5, &mask);
1571 CPU_SET (6, &mask);
1572// CPU_SET( 7 , &mask ); used by read process
1573/* sched_setaffinity returns 0 in success */
1574 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1575 snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1576 factOut (kWarn, -1, str);
1577 }
1578
1579
1580 pthread_t thread[100];
1581 int th_ret[100];
1582
1583 for (k = 0; k < gi_maxProc; k++) {
1584 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1585 }
1586
1587 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1588
1589 numWait = numProc = 0;
1590 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1591 if (kd < 0)
1592 kd += (MAX_EVT * MAX_RUN);
1593
1594 int k1 = evtCtrl.frstPtr;
1595 for (k = k1; k < (k1 + kd); k++) {
1596 int k0 = k % (MAX_EVT * MAX_RUN);
1597//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1598 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1599
1600 if (gi_resetR > 1) { //we are asked to flush buffers asap
1601 evtCtrl.evtStat[k0] = 9991;
1602 } else {
1603
1604//-------- it is better to open the run already here, so call can be used to initialize
1605//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1606 int id = evtCtrl.evtBuf[k0];
1607 uint32_t irun = mBuffer[id].runNum;
1608 int32_t ievt = mBuffer[id].evNum;
1609 if (runCtrl[lastRun].runId == irun) {
1610 j = lastRun;
1611 } else {
1612 //check which fileID to use (or open if needed)
1613 for (j = 0; j < MAX_RUN; j++) {
1614 if (runCtrl[j].runId == irun)
1615 break;
1616 }
1617 if (j >= MAX_RUN) {
1618 snprintf (str, MXSTR,
1619 "P error: can not find run %d for event %d in %d",
1620 irun, ievt, id);
1621 factOut (kFatal, 901, str);
1622 }
1623 lastRun = j;
1624 }
1625
1626 if (runCtrl[j].fileId < 0) {
1627//---- we need to open a new run ==> make sure all older runs are
1628//---- finished and marked to be closed ....
1629 int j1;
1630 for (j1 = 0; j1 < MAX_RUN; j1++) {
1631 if (runCtrl[j1].fileId == 0) {
1632 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1633//---- problem: processing still going on ==> must wait for closing ....
1634 snprintf (str, MXSTR,
1635 "P finish run since new one opened %d",
1636 runCtrl[j1].runId);
1637 runFinish1 (runCtrl[j1].runId);
1638 }
1639
1640 }
1641
1642 actRun.Version = 1;
1643 actRun.RunType = -1; //to be adapted
1644
1645 actRun.Nroi = runCtrl[j].roi0;
1646 actRun.NroiTM = runCtrl[j].roi8;
1647 if (actRun.Nroi == actRun.NroiTM)
1648 actRun.NroiTM = 0;
1649 actRun.RunTime = runCtrl[j].firstTime;
1650 actRun.RunUsec = runCtrl[j].firstTime;
1651 actRun.NBoard = NBOARDS;
1652 actRun.NPix = NPIX;
1653 actRun.NTm = NTMARK;
1654 actRun.Nroi = mBuffer[id].nRoi;
1655 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1656 NBOARDS * sizeof (PEVNT_HEADER));
1657
1658 runCtrl[j].fileHd =
1659 runOpen (irun, &actRun, sizeof (actRun));
1660 if (runCtrl[j].fileHd == NULL) {
1661 snprintf (str, MXSTR,
1662 "P could not open a file for run %d", irun);
1663 factOut (kError, 502, str);
1664 runCtrl[j].fileId = 91;
1665 runCtrl[j].procId = 91;
1666 } else {
1667 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1668 irun, ievt);
1669 factOut (kInfo, -1, str);
1670 runCtrl[j].fileId = 0;
1671 runCtrl[j].procId = 0;
1672 }
1673
1674 }
1675//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1676 if (runCtrl[j].procId == 0) {
1677 if (runCtrl[j].closeTime < g_actTime
1678 || runCtrl[j].lastTime < g_actTime - 300
1679 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1680 snprintf (str, MXSTR,
1681 "P reached end of run condition for run %d",
1682 irun);
1683 factOut (kInfo, 502, str);
1684 runFinish1 (runCtrl[j].runId);
1685 runCtrl[j].procId = 1;
1686 }
1687 }
1688 if (runCtrl[j].procId != 0) {
1689 snprintf (str, MXSTR,
1690 "P skip event %d because no active run %d", ievt,
1691 irun);
1692 factOut (kDebug, 502, str);
1693 evtCtrl.evtStat[k0] = 9091;
1694 } else {
1695//--------
1696//--------
1697 id = evtCtrl.evtBuf[k0];
1698 int itevt = mBuffer[id].trgNum;
1699 int itrg = mBuffer[id].trgTyp;
1700 int roi = mBuffer[id].nRoi;
1701 int roiTM = mBuffer[id].nRoiTM;
1702
1703//make sure unused pixels/tmarks are cleared to zero
1704 if (roiTM == roi)
1705 roiTM = 0;
1706 int ip, it, dest, ib;
1707 for (ip = 0; ip < NPIX; ip++) {
1708 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1709 dest = ip * roi;
1710 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1711 }
1712 }
1713 for (it = 0; it < NTMARK; it++) {
1714 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1715 dest = it * roi + NPIX * roi;
1716 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1717 }
1718 }
1719
1720
1721//and set correct event header ; also check for consistency in event (not yet)
1722 mBuffer[id].fEvent->Roi = roi;
1723 mBuffer[id].fEvent->RoiTM = roiTM;
1724 mBuffer[id].fEvent->EventNum = ievt;
1725 mBuffer[id].fEvent->TriggerNum = itevt;
1726 mBuffer[id].fEvent->TriggerType = itrg;
1727 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1728 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1729 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1730 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1731 mBuffer[id].fEvent->SoftTrig = 0;
1732
1733
1734 for (ib = 0; ib < NBOARDS; ib++) {
1735 if (mBuffer[id].board[ib] == -1) { //board is not read
1736 mBuffer[id].FADhead[ib].start_package_flag = 0;
1737 mBuffer[id].fEvent->BoardTime[ib] = 0;
1738 } else {
1739 mBuffer[id].fEvent->BoardTime[ib] =
1740 ntohl (mBuffer[id].FADhead[ib].time);
1741 }
1742 }
1743
1744 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1745 mBuffer[id].fEvent);
1746 gi.procTot++;
1747 numProc++;
1748
1749 if (i < 0) {
1750 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
1751 gi.procErr++;
1752 } else {
1753 evtCtrl.evtStat[k0] = 1000;
1754 runCtrl[j].procEvt++;
1755 }
1756 }
1757 }
1758 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
1759 numWait++;
1760 }
1761 }
1762
1763 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1764 snprintf (str, MXSTR, "Exit Processing Process ...");
1765 factOut (kInfo, -1, str);
1766 gp_runStat = -22; //==> we should exit
1767 gj.procStat = -22; //==> we should exit
1768 return 0;
1769 }
1770
1771 if (numProc == 0) {
1772 //seems we have nothing to do, so sleep a little
1773 xwait.tv_sec = 0;
1774 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1775 nanosleep (&xwait, NULL);
1776 }
1777 gp_runStat = gi_runStat;
1778 gj.procStat = gj.readStat;
1779
1780 }
1781
1782 //we are asked to abort asap ==> must flag all remaining events
1783 // when gi_runStat claims that all events are in the buffer...
1784
1785 snprintf (str, MXSTR, "Abort Processing Process ...");
1786 factOut (kInfo, -1, str);
1787 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1788 if (kd < 0)
1789 kd += (MAX_EVT * MAX_RUN);
1790
1791 for (k = 0; k < gi_maxProc; k++) {
1792 pthread_join (thread[k], (void **) &status);
1793 }
1794
1795 int k1 = evtCtrl.frstPtr;
1796 for (k = k1; k < (k1 + kd); k++) {
1797 int k0 = k % (MAX_EVT * MAX_RUN);
1798 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
1799 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
1800 }
1801 }
1802
1803 gp_runStat = -99;
1804 gj.procStat = -99;
1805
1806 return 0;
1807
1808} /*-----------------------------------------------------------------*/
1809
1810int
1811CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1812{
1813/* close run runId (all all runs if runId=0) */
1814/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1815 int j;
1816
1817
1818 if (runId == 0) {
1819 for (j = 0; j < MAX_RUN; j++) {
1820 if (runCtrl[j].fileId == 0) { //run is open
1821 runCtrl[j].closeTime = closeTime;
1822 runCtrl[j].maxEvt = maxEvt;
1823 }
1824 }
1825 return 0;
1826 }
1827
1828 for (j = 0; j < MAX_RUN; j++) {
1829 if (runCtrl[j].runId == runId) {
1830 if (runCtrl[j].fileId == 0) { //run is open
1831 runCtrl[j].closeTime = closeTime;
1832 runCtrl[j].maxEvt = maxEvt;
1833 return 0;
1834 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1835 runCtrl[j].closeTime = closeTime;
1836 runCtrl[j].maxEvt = maxEvt;
1837 return +1;
1838 } else { // run already closed
1839 return +2;
1840 }
1841 }
1842 } //we only reach here if the run was never created
1843 return -1;
1844
1845} /*-----------------------------------------------------------------*/
1846
1847
1848void *
1849writeEvt (void *ptr)
1850{
1851/* *** main loop writing event (including opening and closing run-files */
1852
1853 int numWrite, numWait;
1854 int k, j, i;
1855 struct timespec xwait;
1856 char str[MXSTR];
1857
1858 cpu_set_t mask;
1859 int cpu = 1; //write thread
1860
1861 snprintf (str, MXSTR, "Starting write-thread");
1862 factOut (kInfo, -1, str);
1863
1864/* CPU_ZERO initializes all the bits in the mask to zero. */
1865 CPU_ZERO (&mask);
1866/* CPU_SET sets only the bit corresponding to cpu. */
1867 CPU_SET (cpu, &mask);
1868/* sched_setaffinity returns 0 in success */
1869 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1870 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1871 }
1872
1873 int lastRun = 0; //usually run from last event still valid
1874
1875 while (g_runStat > -2) {
1876
1877 numWait = numWrite = 0;
1878 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1879 if (kd < 0)
1880 kd += (MAX_EVT * MAX_RUN);
1881
1882 int k1 = evtCtrl.frstPtr;
1883 for (k = k1; k < (k1 + kd); k++) {
1884 int k0 = k % (MAX_EVT * MAX_RUN);
1885//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1886 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) {
1887
1888 if (gi_resetR > 1) { //we must drain the buffer asap
1889 evtCtrl.evtStat[k0] = 9904;
1890 } else {
1891
1892
1893 int id = evtCtrl.evtBuf[k0];
1894 uint32_t irun = mBuffer[id].runNum;
1895 int32_t ievt = mBuffer[id].evNum;
1896
1897 gi.wrtTot++;
1898 if (runCtrl[lastRun].runId == irun) {
1899 j = lastRun;
1900 } else {
1901 //check which fileID to use (or open if needed)
1902 for (j = 0; j < MAX_RUN; j++) {
1903 if (runCtrl[j].runId == irun)
1904 break;
1905 }
1906 if (j >= MAX_RUN) {
1907 snprintf (str, MXSTR,
1908 "W error: can not find run %d for event %d in %d",
1909 irun, ievt, id);
1910 factOut (kFatal, 901, str);
1911 gi.wrtErr++;
1912 }
1913 lastRun = j;
1914 }
1915
1916 if (runCtrl[j].fileId < 0) {
1917 actRun.Version = 1;
1918 actRun.RunType = -1; //to be adapted
1919
1920 actRun.Nroi = runCtrl[j].roi0;
1921 actRun.NroiTM = runCtrl[j].roi8;
1922 if (actRun.Nroi == actRun.NroiTM)
1923 actRun.NroiTM = 0;
1924 actRun.RunTime = runCtrl[j].firstTime;
1925 actRun.RunUsec = runCtrl[j].firstTime;
1926 actRun.NBoard = NBOARDS;
1927 actRun.NPix = NPIX;
1928 actRun.NTm = NTMARK;
1929 actRun.Nroi = mBuffer[id].nRoi;
1930 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1931 NBOARDS * sizeof (PEVNT_HEADER));
1932
1933 runCtrl[j].fileHd =
1934 runOpen (irun, &actRun, sizeof (actRun));
1935 if (runCtrl[j].fileHd == NULL) {
1936 snprintf (str, MXSTR,
1937 "W could not open a file for run %d", irun);
1938 factOut (kError, 502, str);
1939 runCtrl[j].fileId = 91;
1940 } else {
1941 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
1942 irun, ievt);
1943 factOut (kInfo, -1, str);
1944 runCtrl[j].fileId = 0;
1945 }
1946
1947 }
1948
1949 if (runCtrl[j].fileId != 0) {
1950 if (runCtrl[j].fileId < 0) {
1951 snprintf (str, MXSTR,
1952 "W never opened file for this run %d", irun);
1953 factOut (kError, 123, str);
1954 } else if (runCtrl[j].fileId < 100) {
1955 snprintf (str, MXSTR, "W.file for this run is closed %d",
1956 irun);
1957 factOut (kWarn, 123, str);
1958 runCtrl[j].fileId += 100;
1959 } else {
1960 snprintf (str, MXSTR, "W:file for this run is closed %d",
1961 irun);
1962 factOut (kDebug, 123, str);
1963 }
1964 evtCtrl.evtStat[k0] = 9903;
1965 gi.wrtErr++;
1966 } else {
1967 i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
1968 sizeof (mBuffer[id]));
1969 if (i >= 0) {
1970 runCtrl[j].lastTime = g_actTime;
1971 runCtrl[j].actEvt++;
1972 evtCtrl.evtStat[k0] = 9901;
1973 snprintf (str, MXSTR,
1974 "%5d successfully wrote for run %d id %5d",
1975 ievt, irun, k0);
1976 factOut (kDebug, 504, str);
1977// gj.writEvt++ ;
1978 } else {
1979 snprintf (str, MXSTR, "W error writing event for run %d",
1980 irun);
1981 factOut (kError, 503, str);
1982 evtCtrl.evtStat[k0] = 9902;
1983 gi.wrtErr++;
1984 }
1985
1986 if (i < 0
1987 || runCtrl[j].lastTime < g_actTime - 300
1988 || runCtrl[j].closeTime < g_actTime
1989 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
1990 int ii = 0;
1991 if (i < 0)
1992 ii = 1;
1993 else if (runCtrl[j].closeTime < g_actTime)
1994 ii = 2;
1995 else if (runCtrl[j].lastTime < g_actTime - 300)
1996 ii = 3;
1997 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
1998 ii = 4;
1999
2000
2001
2002 //close run for whatever reason
2003 if (runCtrl[j].runId == gi_myRun)
2004 gi_myRun = g_actTime;
2005
2006 if (runCtrl[j].procId == 0) {
2007 runFinish1 (runCtrl[j].runId);
2008 runCtrl[j].procId = 92;
2009 }
2010
2011 runCtrl[j].closeTime = g_actTime - 1;
2012 i = runClose (runCtrl[j].fileHd, &runTail[j],
2013 sizeof (runTail[j]));
2014 if (i < 0) {
2015 snprintf (str, MXSTR, "error closing run %d %d AAA",
2016 runCtrl[j].runId, i);
2017 factOut (kError, 503, str);
2018 runCtrl[j].fileId = 92;
2019 } else {
2020 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2021 ii);
2022 factOut (kInfo, 503, str);
2023 runCtrl[j].fileId = 93;
2024 }
2025 }
2026 }
2027 }
2028 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2029 numWait++;
2030 }
2031
2032 //check if we should close a run (mainly when no event pending)
2033 for (j = 0; j < MAX_RUN; j++) {
2034 if (runCtrl[j].fileId == 0
2035 && (runCtrl[j].closeTime < g_actTime
2036 || runCtrl[j].lastTime < g_actTime - 300
2037 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2038 if (runCtrl[j].runId == gi_myRun)
2039 gi_myRun = g_actTime;
2040 int ii = 0;
2041 if (runCtrl[j].closeTime < g_actTime)
2042 ii = 2;
2043 else if (runCtrl[j].lastTime < g_actTime - 300)
2044 ii = 3;
2045 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2046 ii = 4;
2047
2048 if (runCtrl[j].procId == 0) {
2049 runFinish1 (runCtrl[j].runId);
2050 runCtrl[j].procId = 92;
2051 }
2052
2053 runCtrl[j].closeTime = g_actTime - 1;
2054 i = runClose (runCtrl[j].fileHd, &runTail[j],
2055 sizeof (runTail[j]));
2056 if (i < 0) {
2057 snprintf (str, MXSTR, "error closing run %d %d BBB",
2058 runCtrl[j].runId, i);
2059 factOut (kError, 506, str);
2060 runCtrl[j].fileId = 94;
2061 } else {
2062 snprintf (str, MXSTR, "W closed run %d BBB %d",
2063 runCtrl[j].runId, ii);
2064 factOut (kInfo, 507, str);
2065 runCtrl[j].fileId = 95;
2066 }
2067 }
2068 }
2069
2070 if (numWrite == 0) {
2071 //seems we have nothing to do, so sleep a little
2072 xwait.tv_sec = 0;
2073 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2074 nanosleep (&xwait, NULL);
2075 }
2076
2077 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2078 snprintf (str, MXSTR, "Finish Write Process ...");
2079 factOut (kInfo, -1, str);
2080 gw_runStat = -22; //==> we should exit
2081 gj.writStat = -22; //==> we should exit
2082 goto closerun;
2083 }
2084 gw_runStat = gi_runStat;
2085 gj.writStat = gj.readStat;
2086
2087 }
2088
2089 //must close all open files ....
2090 snprintf (str, MXSTR, "Abort Writing Process ...");
2091 factOut (kInfo, -1, str);
2092
2093 closerun:
2094 snprintf (str, MXSTR, "Close all open files ...");
2095 factOut (kInfo, -1, str);
2096 for (j = 0; j < MAX_RUN; j++)
2097 if (runCtrl[j].fileId == 0) {
2098 if (runCtrl[j].runId == gi_myRun)
2099 gi_myRun = g_actTime;
2100
2101 if (runCtrl[j].procId == 0) {
2102 runFinish1 (runCtrl[j].runId);
2103 runCtrl[j].procId = 92;
2104 }
2105
2106 runCtrl[j].closeTime = g_actTime - 1;
2107 i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2108 int ii = 0;
2109 if (runCtrl[j].closeTime < g_actTime)
2110 ii = 2;
2111 else if (runCtrl[j].lastTime < g_actTime - 300)
2112 ii = 3;
2113 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2114 ii = 4;
2115 if (i < 0) {
2116 snprintf (str, MXSTR, "error closing run %d %d CCC",
2117 runCtrl[j].runId, i);
2118 factOut (kError, 506, str);
2119 runCtrl[j].fileId = 96;
2120 } else {
2121 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2122 ii);
2123 factOut (kInfo, 507, str);
2124 runCtrl[j].fileId = 97;
2125 }
2126 }
2127
2128 gw_runStat = -99;
2129 gj.writStat = -99;
2130 snprintf (str, MXSTR, "Exit Writing Process ...");
2131 factOut (kInfo, -1, str);
2132 return 0;
2133
2134
2135
2136
2137} /*-----------------------------------------------------------------*/
2138
2139
2140
2141
2142void
2143StartEvtBuild ()
2144{
2145
2146 int i, j, imax, status, th_ret[50];
2147 pthread_t thread[50];
2148 struct timespec xwait;
2149
2150 gi_runStat = gp_runStat = gw_runStat = 0;
2151 gj.readStat = gj.procStat = gj.writStat = 0;
2152
2153 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2154 factOut (kInfo, -1, str);
2155
2156//initialize run control logics
2157 for (i = 0; i < MAX_RUN; i++) {
2158 runCtrl[i].runId = 0;
2159 runCtrl[i].fileId = -2;
2160 }
2161
2162//prepare for subProcesses
2163 gi_maxSize = g_maxSize;
2164 if (gi_maxSize <= 0)
2165 gi_maxSize = 1;
2166
2167 gi_maxProc = g_maxProc;
2168 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2169 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2170 factOut (kFatal, 301, str);
2171 gi_maxProc = 1;
2172 }
2173//partially initialize event control logics
2174 evtCtrl.frstPtr = 0;
2175 evtCtrl.lastPtr = 0;
2176
2177//start all threads (more to come) when we are allowed to ....
2178 while (g_runStat == 0) {
2179 xwait.tv_sec = 0;
2180 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2181 nanosleep (&xwait, NULL);
2182 }
2183
2184 i = 0;
2185 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2186 i++;
2187 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2188 i++;
2189 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2190 i++;
2191 imax = i;
2192
2193
2194#ifdef BILAND
2195 xwait.tv_sec = 30;;
2196 xwait.tv_nsec = 0; // sleep for ~20sec
2197 nanosleep (&xwait, NULL);
2198
2199 printf ("close all runs in 2 seconds\n");
2200
2201 CloseRunFile (0, time (NULL) + 2, 0);
2202
2203 xwait.tv_sec = 1;;
2204 xwait.tv_nsec = 0; // sleep for ~20sec
2205 nanosleep (&xwait, NULL);
2206
2207 printf ("setting g_runstat to -1\n");
2208
2209 g_runStat = -1;
2210#endif
2211
2212
2213//wait for all threads to finish
2214 for (i = 0; i < imax; i++) {
2215 j = pthread_join (thread[i], (void **) &status);
2216 }
2217
2218} /*-----------------------------------------------------------------*/
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234 /*-----------------------------------------------------------------*/
2235 /*-----------------------------------------------------------------*/
2236 /*-----------------------------------------------------------------*/
2237 /*-----------------------------------------------------------------*/
2238 /*-----------------------------------------------------------------*/
2239
2240#ifdef BILAND
2241
2242int
2243subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2244 int8_t * buffer)
2245{
2246 printf ("called subproc %d\n", threadID);
2247 return threadID + 1;
2248}
2249
2250
2251
2252
2253 /*-----------------------------------------------------------------*/
2254 /*-----------------------------------------------------------------*/
2255 /*-----------------------------------------------------------------*/
2256 /*-----------------------------------------------------------------*/
2257 /*-----------------------------------------------------------------*/
2258
2259
2260
2261
2262FileHandle_t
2263runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2264{
2265 return 1;
2266};
2267
2268int
2269runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2270{
2271 return 1;
2272 usleep (10000);
2273 return 1;
2274}
2275
2276
2277//{ return 1; } ;
2278
2279int
2280runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2281{
2282 return 1;
2283};
2284
2285
2286
2287
2288int
2289eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2290{
2291 int i = 0;
2292
2293// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2294// for (i=0; i<NBOARDS; i++) {
2295// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2296// }
2297 return 0;
2298}
2299
2300
2301void
2302factStatNew (EVT_STAT gi)
2303{
2304 int i;
2305
2306//for (i=0;i<MAX_SOCK;i++) {
2307// printf("%4d",gi.numRead[i]);
2308// if (i%20 == 0 ) printf("\n");
2309//}
2310}
2311
2312void
2313gotNewRun (int runnr, PEVNT_HEADER * headers)
2314{
2315 printf ("got new run %d\n", runnr);
2316 return;
2317}
2318
2319void
2320factStat (GUI_STAT gj)
2321{
2322// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2323// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2324}
2325
2326
// Stand-in for debugRead, compiled only when BILAND is defined.
// Accepts the read-trace values and discards all of them (no output).
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

// Disabled trace line:
// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
2333
2334
2335
// Stand-in for debugStream, compiled only when BILAND is defined.
// The socket index, buffer and length are ignored; nothing is done.
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2340
// Stand-in for debugHead, compiled only when BILAND is defined.
// Both indices and the buffer are ignored; nothing is done.
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2345
2346
2347void
2348factOut (int severity, int err, char *message)
2349{
2350 static FILE *fd;
2351 static int file = 0;
2352
2353 if (file == 0) {
2354 printf ("open file\n");
2355 fd = fopen ("x.out", "w+");
2356 file = 999;
2357 }
2358
2359 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2360
2361 if (severity != kDebug)
2362 printf ("%3d %3d | %s\n", severity, err, message);
2363}
2364
2365
2366
2367int
2368main ()
2369{
2370 int i, b, c, p;
2371 char ipStr[100];
2372 struct in_addr IPaddr;
2373
2374 g_maxMem = 1024 * 1024; //MBytes
2375//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2376 g_maxMem = g_maxMem * 200; //100MBytes
2377
2378 g_maxProc = 20;
2379 g_maxSize = 30000;
2380
2381 g_runStat = 40;
2382
2383 i = 0;
2384
2385// version for standard crates
2386//for (c=0; c<4,c++) {
2387// for (b=0; b<10; b++) {
2388// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2389//
2390// inet_pton(PF_INET, ipStr, &IPaddr) ;
2391//
2392// g_port[i].sockAddr.sin_family = PF_INET;
2393// g_port[i].sockAddr.sin_port = htons(5000) ;
2394// g_port[i].sockAddr.sin_addr = IPaddr ;
2395// g_port[i].sockDef = 1 ;
2396// i++ ;
2397// }
2398//}
2399//
2400//version for PC-test *
2401 for (c = 0; c < 4; c++) {
2402 for (b = 0; b < 10; b++) {
2403 sprintf (ipStr, "10.0.%d.11", 128 + c);
2404 if (c < 2)
2405 sprintf (ipStr, "10.0.%d.11", 128);
2406 else
2407 sprintf (ipStr, "10.0.%d.11", 131);
2408// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2409
2410 inet_pton (PF_INET, ipStr, &IPaddr);
2411 p = 31919 + 100 * c + 10 * b;
2412
2413
2414 g_port[i].sockAddr.sin_family = PF_INET;
2415 g_port[i].sockAddr.sin_port = htons (p);
2416 g_port[i].sockAddr.sin_addr = IPaddr;
2417 g_port[i].sockDef = 1;
2418
2419 i++;
2420 }
2421 }
2422
2423
2424//g_port[17].sockDef =-1 ;
2425//g_actBoards-- ;
2426
2427 StartEvtBuild ();
2428
2429 return 0;
2430
2431}
2432#endif
Note: See TracBrowser for help on using the repository browser.