source: trunk/FACT++/src/EventBuilder.c@ 11834

Last change on this file since 11834 was 11766, checked in by tbretz, 13 years ago
Latest changes.
File size: 76.1 KB
Line 
1
2// // // #define EVTDEBUG
3
4#include <stdlib.h>
5#include <stdint.h>
6#include <unistd.h>
7#include <stdio.h>
8#include <sys/time.h>
9#include <arpa/inet.h>
10#include <string.h>
11#include <math.h>
12#include <error.h>
13#include <errno.h>
14#include <unistd.h>
15#include <sys/types.h>
16#include <sys/socket.h>
17#include <netinet/in.h>
18#include <netinet/tcp.h>
19#include <pthread.h>
20#include <sched.h>
21
22#include "EventBuilder.h"
23
/* Severity levels passed as the first argument of factOut(). */
enum Severity
{
   /// Just a message, usually obsolete
   kMessage = 10,
   /// An info telling something which can be interesting to know
   kInfo = 20,
   /// A warning, things that somehow might result in unexpected or unwanted behaviour
   kWarn = 30,
   /// Error, something unexpected happened, but can still be handled by the program
   kError = 40,
   /// An error which cannot be handled at all happened, the only solution is program termination
   kFatal = 50,
   /// A message used for debugging only
   kDebug = 99
};
33
/* Minimum number of bytes needed to interpret a FAD header. */
#define MIN_LEN 32
/*
 * Size in bytes of the read buffer per socket.
 * The expansion is parenthesized so that the macro keeps its value inside
 * any surrounding expression (e.g. `x % MAX_LEN` or `n / MAX_LEN`).
 */
#define MAX_LEN (256 * 1024)
36
37//#define nanosleep(x,y)
38
39extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
40extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
41extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
42//extern int runFinish (uint32_t runnr);
43
44extern void factOut (int severity, int err, char *message);
45
46extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
47
48
49extern void factStat (GUI_STAT gj);
50
51extern void factStatNew (EVT_STAT gi);
52
53extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
54
55extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
56 int8_t * buffer);
57
58extern void debugHead (int i, int j, void *buf);
59
60extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
61 int32_t runnr, int state, uint32_t tsec,
62 uint32_t tusec);
63extern void debugStream (int isock, void *buf, int len);
64
65int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
66
67
68int g_maxProc;
69int g_maxSize;
70int gi_maxSize;
71int gi_maxProc;
72
73uint g_actTime;
74uint g_actUsec;
75int g_runStat;
76int g_reset;
77int g_useFTM;
78
79int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
80size_t g_maxMem; //maximum memory allowed for buffer
81
82//no longer needed ...
83int g_maxBoards; //maximum number of boards to be initialized
84int g_actBoards;
85//
86
87FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
88
89
90int gi_runStat;
91int gp_runStat;
92int gw_runStat;
93
94int gi_memStat = +1;
95
96uint32_t gi_myRun = 0;
97uint32_t actrun = 0;
98
99
100uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
101
102//uint gi_EvtStart= 0 ;
103//uint gi_EvtRead = 0 ;
104//uint gi_EvtBad = 0 ;
105//uint gi_EvtTot = 0 ;
106//size_t gi_usedMem = 0 ;
107
108//uint gw_EvtTot = 0 ;
109//uint gp_EvtTot = 0 ;
110
111PIX_MAP g_pixMap[NPIX];
112
113EVT_STAT gi;
114GUI_STAT gj;
115
116EVT_CTRL evtCtrl; //control of events during processing
117int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
118
119WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
120
121
122
123
124RUN_HEAD actRun;
125
126RUN_CTRL runCtrl[MAX_RUN];
127
128RUN_TAIL runTail[MAX_RUN];
129
130
131/*
132*** Definition of rdBuffer to read in IP packets; keep it global !!!!
133 */
134
135
136typedef union
137{
138 int8_t B[MAX_LEN];
139 int16_t S[MAX_LEN / 2];
140 int32_t I[MAX_LEN / 4];
141 int64_t L[MAX_LEN / 8];
142} CNV_FACT;
143
144typedef struct
145{
146 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
147 int32_t bufPos; //next byte to read to the buffer next
148 int32_t bufLen; //number of bytes left to read
149 int32_t skip; //number of bytes skipped before start of event
150
151 int sockStat; //-1 if socket not yet connected , 99 if not exist
152 int socket; //contains the sockets
153 struct sockaddr_in SockAddr; //IP for each socket
154
155 int evtID; // event ID of event currently read
156 int runID; // run "
157 int ftmID; // event ID from FTM
158 uint fadLen; // FADlength of event currently read
159 int fadVers; // Version of FAD
160 int ftmTyp; // trigger type
161 int board; // boardID (softwareID: 0..40 )
162 int Port;
163
164 CNV_FACT *rBuf;
165
166#ifdef EVTDEBUG
167 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
168#endif
169
170} READ_STRUCT;
171
172
/* A 16-bit value that can also be addressed as its two individual bytes. */
typedef union
{
   int8_t  B[2];                /* the two bytes, in memory order */
   int16_t S;                   /* the same storage as one 16-bit integer */
} SHORT_BYTE;
178
179
180
181
182
183#define MXSTR 1000
184char str[MXSTR];
185
186SHORT_BYTE start, stop;
187
188READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
189
190
191
192/*-----------------------------------------------------------------*/
193
194
195/*-----------------------------------------------------------------*/
196
197int
198runFinish1 (uint32_t runnr)
199{
200 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
201 runnr);
202 factOut (kInfo, 173, str); //but continue anyhow
203 return 0;
204}
205
206
207int
208GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
209 READ_STRUCT * rd)
210{
211/*
212*** generate Address, create sockets and allocates readbuffer for it
213***
214*** if flag==0 generate socket and buffer
215*** <0 destroy socket and buffer
216*** >0 close and redo socket
217***
218*** sid : board*7 + port id
219 */
220
221 int j;
222 int optval = 1; //activate keepalive
223 socklen_t optlen = sizeof (optval);
224
225 if (rd->sockStat == 0) { //close socket if open
226 j = close (rd->socket);
227 if (j > 0) {
228 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
229 factOut (kFatal, 771, str);
230 } else {
231 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
232 factOut (kInfo, 771, str);
233 }
234 }
235
236
237 if (flag < 0) {
238 free (rd->rBuf); //and never open again
239#ifdef EVTDEBUG
240 free (rd->xBuf); //and never open again
241#endif
242 rd->rBuf = NULL;
243 rd->sockStat = 99;
244 return 0;
245 }
246
247
248 if (flag == 0) { //generate address and buffer ...
249 rd->Port = port;
250 rd->SockAddr.sin_family = sockAddr->sin_family;
251 rd->SockAddr.sin_port = htons (port);
252 rd->SockAddr.sin_addr = sockAddr->sin_addr;
253
254#ifdef EVTDEBUG
255 rd->xBuf = malloc (sizeof (CNV_FACT));
256#endif
257 rd->rBuf = malloc (sizeof (CNV_FACT));
258 if (rd->rBuf == NULL) {
259 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
260 factOut (kFatal, 774, str);
261 rd->sockStat = 77;
262 return -3;
263 }
264 }
265
266
267 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
268 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
269 factOut (kFatal, 773, str);
270 rd->sockStat = 88;
271 return -2;
272 }
273 optval = 1;
274 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
275 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
276 factOut (kInfo, 173, str); //but continue anyhow
277 }
278 optval = 10; //start after 10 seconds
279 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
280 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
281 factOut (kInfo, 173, str); //but continue anyhow
282 }
283 optval = 10; //do every 10 seconds
284 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
285 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
286 factOut (kInfo, 173, str); //but continue anyhow
287 }
288 optval = 2; //close after 2 unsuccessful tries
289 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
290 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
291 factOut (kInfo, 173, str); //but continue anyhow
292 }
293
294
295
296 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
297 factOut (kInfo, 773, str);
298 rd->sockStat = -1; //try to (re)open socket
299 return 0;
300
301} /*-----------------------------------------------------------------*/
302
303 /*-----------------------------------------------------------------*/
304
305
306
307
308int
309mBufInit ()
310{
311// initialize mBuffer (mark all entries as unused\empty)
312
313 int i;
314 uint32_t actime;
315
316 actime = g_actTime + 50000000;
317
318 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
319 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
320 mBuffer[i].runNum = 0;
321
322 evtCtrl.evtBuf[i] = -1;
323 evtCtrl.evtStat[i] = -1;
324 evtCtrl.pcTime[i] = actime; //initiate to far future
325 }
326
327
328 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
329
330 evtCtrl.frstPtr = 0;
331 evtCtrl.lastPtr = 0;
332
333 return 0;
334
335} /*-----------------------------------------------------------------*/
336
337
338
339
340int
341mBufEvt (int evID, uint runID, int nRoi[], int sk,
342 int fadlen, int trgTyp, int trgNum, int fadNum)
343{
344// generate a new Event into mBuffer:
345// make sure only complete Event are possible, so 'free' will always work
346// returns index into mBuffer[], or negative value in case of error
347// error: <-9000 if roi screwed up (not consistent with run)
348// <-8000 (not consistent with event)
349// <-7000 (not consistent with board)
350// < 0 if no space left
351
352 struct timeval *tv, atv;
353 tv = &atv;
354 uint32_t tsec, tusec;
355 uint oldest;
356 int jold;
357
358 int i, k, jr, b, evFree;
359 int headmem = 0;
360 size_t needmem = 0;
361
362
363 b = sk / 7;
364
365 if (nRoi[0] < 0 || nRoi[0] > 1024) {
366 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
367 factOut (kError, 999, str);
368 gj.badRoiR++;
369 gj.badRoi[b]++;
370 return -9999;
371 }
372
373 for (jr = 1; jr < 8; jr++) {
374 if (nRoi[jr] != nRoi[0]) {
375 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
376 nRoi[0]);
377 factOut (kError, 711, str);
378 gj.badRoiB++;
379 gj.badRoi[b]++;
380 return -7101;
381 }
382 }
383 if (nRoi[8] < nRoi[0]) {
384 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
385 factOut (kError, 712, str);
386 gj.badRoiB++;
387 gj.badRoi[b]++;
388 return -7102;
389 }
390
391
392 i = evID % MAX_EVT;
393 evFree = -1;
394
395 for (k = 0; k < MAX_RUN; k++) {
396 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
397 // is it ok ????
398 if (mBuffer[i].nRoi != nRoi[0]
399 || mBuffer[i].nRoiTM != nRoi[8]) {
400 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
401 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
402 factOut (kError, 821, str);
403 gj.badRoiE++;
404 gj.badRoi[b]++;
405 return -8201;
406 }
407// count for inconsistencies
408
409 if (mBuffer[i].trgNum != trgNum)
410 mBuffer[i].Errors[0]++;
411 if (mBuffer[i].fadNum != fadNum)
412 mBuffer[i].Errors[1]++;
413 if (mBuffer[i].trgTyp != trgTyp)
414 mBuffer[i].Errors[2]++;
415
416 //everything seems fine so far ==> use this slot ....
417 return i;
418 }
419 if (evFree < 0 && mBuffer[i].evNum < 0)
420 evFree = i;
421 i += MAX_EVT;
422 }
423
424
425 //event does not yet exist; create it
426 if (evFree < 0) { //no space available in ctrl
427 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
428 factOut (kError, 881, str);
429 return -1;
430 }
431 i = evFree; //found free entry; use it ...
432
433 gettimeofday (tv, NULL);
434 tsec = atv.tv_sec;
435 tusec = atv.tv_usec;
436
437 //check if runId already registered in runCtrl
438 evFree = -1;
439 oldest = g_actTime + 1000;
440 jold = -1;
441 for (k = 0; k < MAX_RUN; k++) {
442 if (runCtrl[k].runId == runID) {
443 if (runCtrl[k].procId > 0) { //run is closed -> reject
444 snprintf (str, MXSTR, "skip event since run %d finished", runID);
445 factOut (kInfo, 931, str);
446 return -21;
447 }
448
449 if (runCtrl[k].roi0 != nRoi[0]
450 || runCtrl[k].roi8 != nRoi[8]) {
451 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
452 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
453 factOut (kError, 931, str);
454 gj.badRoiR++;
455 gj.badRoi[b]++;
456 return -9301;
457 }
458 goto RUNFOUND;
459 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
460 evFree = k;
461 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
462 if (runCtrl[k].closeTime < oldest) {
463 oldest = runCtrl[k].closeTime;
464 jold = k;
465 }
466 }
467 }
468
469 if (evFree < 0 && jold < 0) {
470 snprintf (str, MXSTR, "not able to register the new run %d", runID);
471 factOut (kFatal, 883, str);
472 return -1001;
473 } else {
474 if (evFree < 0)
475 evFree = jold;
476 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
477 evFree, nRoi[0], nRoi[8]);
478 factOut (kInfo, 503, str);
479 runCtrl[evFree].runId = runID;
480 runCtrl[evFree].roi0 = nRoi[0];
481 runCtrl[evFree].roi8 = nRoi[8];
482 runCtrl[evFree].fileId = -2;
483 runCtrl[evFree].procId = -2;
484 runCtrl[evFree].lastEvt = -1;
485 runCtrl[evFree].nextEvt = 0;
486 runCtrl[evFree].actEvt = 0;
487 runCtrl[evFree].procEvt = 0;
488 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
489 runCtrl[evFree].firstUsec = tusec;
490 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
491 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
492 runCtrl[evFree].lastTime = 0;
493
494 runTail[evFree].nEventsOk =
495 runTail[evFree].nEventsRej =
496 runTail[evFree].nEventsBad =
497 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
498 }
499
500 RUNFOUND:
501
502 needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
503
504 headmem = NBOARDS * sizeof (PEVNT_HEADER);
505
506 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
507 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
508 if (gi_memStat > 0) {
509 gi_memStat = -99;
510 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
511 evID, sk);
512 factOut (kError, 882, str);
513 } else {
514 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
515 evID, sk);
516 factOut (kDebug, 882, str);
517 }
518 return -11;
519 }
520
521 mBuffer[i].FADhead = malloc (headmem);
522 if (mBuffer[i].FADhead == NULL) {
523 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
524 factOut (kError, 882, str);
525 return -12;
526 }
527
528 mBuffer[i].fEvent = malloc (needmem);
529 if (mBuffer[i].fEvent == NULL) {
530 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
531 factOut (kError, 882, str);
532 free (mBuffer[i].FADhead);
533 mBuffer[i].FADhead = NULL;
534 return -22;
535 }
536
537 mBuffer[i].buffer = malloc (gi_maxSize);
538 if (mBuffer[i].buffer == NULL) {
539 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
540 factOut (kError, 882, str);
541 free (mBuffer[i].FADhead);
542 mBuffer[i].FADhead = NULL;
543 free (mBuffer[i].fEvent);
544 mBuffer[i].fEvent = NULL;
545 return -32;
546 }
547 //flag all boards as unused
548 mBuffer[i].nBoard = 0;
549 for (k = 0; k < NBOARDS; k++) {
550 mBuffer[i].board[k] = -1;
551 }
552 //flag all pixels as unused
553 for (k = 0; k < NPIX; k++) {
554 mBuffer[i].fEvent->StartPix[k] = -1;
555 }
556 //flag all TMark as unused
557 for (k = 0; k < NTMARK; k++) {
558 mBuffer[i].fEvent->StartTM[k] = -1;
559 }
560
561 mBuffer[i].fEvent->NumBoards = 0;
562 mBuffer[i].fEvent->PCUsec = tusec;
563 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
564 mBuffer[i].nRoi = nRoi[0];
565 mBuffer[i].nRoiTM = nRoi[8];
566 mBuffer[i].evNum = evID;
567 mBuffer[i].runNum = runID;
568 mBuffer[i].fadNum = fadNum;
569 mBuffer[i].trgNum = trgNum;
570 mBuffer[i].trgTyp = trgTyp;
571 mBuffer[i].evtLen = needmem;
572 mBuffer[i].Errors[0] =
573 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
574
575 gj.usdMem += needmem + headmem + gi_maxSize;
576 if (gj.usdMem > gj.maxMem)
577 gj.maxMem = gj.usdMem;
578
579 gj.bufTot++;
580 if (gj.bufTot > gj.maxEvt)
581 gj.maxEvt = gj.bufTot;
582
583 gj.rateNew++;
584
585 //register event in 'active list (reading)'
586
587 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
588 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
589 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
590 evtIdx[i] = evtCtrl.lastPtr;
591
592
593 snprintf (str, MXSTR,
594 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
595 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
596 factOut (kDebug, -11, str);
597 evtCtrl.lastPtr++;
598 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
599 evtCtrl.lastPtr = 0;
600
601 gi.evtGet++;
602
603 return i;
604
605} /*-----------------------------------------------------------------*/
606
607
608
609
610int
611mBufFree (int i)
612{
613//delete entry [i] from mBuffer:
614//(and make sure multiple calls do no harm ....)
615
616 int headmem = 0;
617 int evid;
618 size_t freemem = 0;
619
620 evid = mBuffer[i].evNum;
621 freemem = mBuffer[i].evtLen;
622
623 free (mBuffer[i].fEvent);
624 mBuffer[i].fEvent = NULL;
625
626 free (mBuffer[i].FADhead);
627 mBuffer[i].FADhead = NULL;
628
629 free (mBuffer[i].buffer);
630 mBuffer[i].buffer = NULL;
631
632 headmem = NBOARDS * sizeof (PEVNT_HEADER);
633 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
634 mBuffer[i].runNum = 0;
635
636 gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
637 gj.bufTot--;
638
639 if (gi_memStat < 0) {
640 if (gj.usdMem <= 0.75 * gj.maxMem)
641 gi_memStat = +1;
642 }
643
644
645 return 0;
646
647} /*-----------------------------------------------------------------*/
648
649
650void
651resetEvtStat ()
652{
653 int i;
654
655 for (i = 0; i < MAX_SOCK; i++)
656 gi.numRead[i] = 0;
657
658 for (i = 0; i < NBOARDS; i++) {
659 gi.gotByte[i] = 0;
660 gi.gotErr[i] = 0;
661
662 }
663
664 gi.evtGet = 0; //#new Start of Events read
665 gi.evtTot = 0; //#complete Events read
666 gi.evtErr = 0; //#Events with Errors
667 gi.evtSkp = 0; //#Events incomplete (timeout)
668
669 gi.procTot = 0; //#Events processed
670 gi.procErr = 0; //#Events showed problem in processing
671 gi.procTrg = 0; //#Events accepted by SW trigger
672 gi.procSkp = 0; //#Events rejected by SW trigger
673
674 gi.feedTot = 0; //#Events used for feedBack system
675 gi.feedErr = 0; //#Events rejected by feedBack
676
677 gi.wrtTot = 0; //#Events written to disk
678 gi.wrtErr = 0; //#Events with write-error
679
680 gi.runOpen = 0; //#Runs opened
681 gi.runClose = 0; //#Runs closed
682 gi.runErr = 0; //#Runs with open/close errors
683
684 return;
685} /*-----------------------------------------------------------------*/
686
687
688
/* initReadFAD - currently nothing to initialize; returns immediately. */
void
initReadFAD ()
{
} /*-----------------------------------------------------------------*/
694
695
696
697void *
698readFAD (void *ptr)
699{
700/* *** main loop reading FAD data and sorting them to complete events */
701 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
702 int actBoards = 0, minLen;
703 int32_t jrd;
704 uint gi_SecTime; //time in seconds
705 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
706
707 int goodhed = 0;
708
709 int sockDef[NBOARDS]; //internal state of sockets
710 int jrdx;
711
712
713 struct timespec xwait;
714
715
716 struct timeval *tv, atv;
717 tv = &atv;
718 uint32_t tsec, tusec;
719
720
721 snprintf (str, MXSTR, "start initializing");
722 factOut (kInfo, -1, str);
723
724 int cpu = 7; //read thread
725 cpu_set_t mask;
726
727/* CPU_ZERO initializes all the bits in the mask to zero. */
728 CPU_ZERO (&mask);
729/* CPU_SET sets only the bit corresponding to cpu. */
730 cpu = 7;
731 CPU_SET (cpu, &mask);
732
733/* sched_setaffinity returns 0 in success */
734 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
735 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
736 factOut (kWarn, -1, str);
737 }
738
739
740 head_len = sizeof (PEVNT_HEADER);
741 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
742 minLen = head_len; //min #bytes needed to check header: full header for debug
743
744 start.S = 0xFB01;
745 stop.S = 0x04FE;
746
747/* initialize run control logics */
748 for (i = 0; i < MAX_RUN; i++) {
749 runCtrl[i].runId = 0;
750 runCtrl[i].fileId = -2;
751 runCtrl[i].procId = -2;
752 }
753 gi_resetS = gi_resetR = 9;
754 for (i = 0; i < NBOARDS; i++)
755 sockDef[i] = 0;
756
757 START:
758 gettimeofday (tv, NULL);
759 g_actTime = tsec = atv.tv_sec;
760 g_actUsec = tusec = atv.tv_usec;
761 gi_myRun = g_actTime;
762 evtCtrl.frstPtr = 0;
763 evtCtrl.lastPtr = 0;
764
765 gi_SecTime = g_actTime;
766 gi_runStat = g_runStat;
767 gj.readStat = g_runStat;
768 numok = numok2 = 0;
769
770 if (gi_resetS > 0) {
771 //make sure all sockets are preallocated as 'not exist'
772 for (i = 0; i < MAX_SOCK; i++) {
773 rd[i].socket = -1;
774 rd[i].sockStat = 99;
775 }
776
777 for (k = 0; k < NBOARDS; k++) {
778 gi_NumConnect[k] = 0;
779 gi.numConn[k] = 0;
780 gj.numConn[k] = 0;
781 gj.errConn[k] = 0;
782 gj.rateBytes[k] = 0;
783 gj.totBytes[k] = 0;
784 }
785
786 }
787
788
789 if (gi_resetR > 0) {
790 resetEvtStat ();
791 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
792 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
793 gj.totMem = g_maxMem;
794 gj.bufNew = gj.bufEvt = 0;
795 gj.badRoiE = gj.badRoiR = gj.badRoiB =
796 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
797
798 int b;
799 for (b = 0; b < NBOARDS; b++)
800 gj.badRoi[b] = 0;
801
802 mBufInit (); //initialize buffers
803
804 snprintf (str, MXSTR, "end initializing");
805 factOut (kInfo, -1, str);
806 }
807
808
809 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
810
811 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
812
813 gi_runStat = g_runStat;
814 gj.readStat = g_runStat;
815 gettimeofday (tv, NULL);
816 g_actTime = tsec = atv.tv_sec;
817 g_actUsec = tusec = atv.tv_usec;
818
819
820 int b, p, p0, s0, nch;
821 nch = 0;
822 for (b = 0; b < NBOARDS; b++) {
823 k = b * 7;
824 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
825 nch++;
826 gi_NumConnect[b] = 0; //must close all connections
827 gi.numConn[b] = 0;
828 gj.numConn[b] = 0;
829 if (sockDef[b] == 0)
830 s0 = 0; //sockets to be defined and opened
831 else if (g_port[b].sockDef == 0)
832 s0 = -1; //sockets to be destroyed
833 else
834 s0 = +1; //sockets to be closed and reopened
835
836 if (s0 == 0)
837 p0 = ntohs (g_port[b].sockAddr.sin_port);
838 else
839 p0 = 0;
840
841 for (p = p0 + 1; p < p0 + 8; p++) {
842 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
843 k++;
844 }
845 sockDef[b] = g_port[b].sockDef;
846 }
847 }
848
849 if (nch > 0) {
850 actBoards = 0;
851 for (b = 0; b < NBOARDS; b++) {
852 if (sockDef[b] > 0)
853 actBoards++;
854 }
855 }
856
857
858 jrdx = 0;
859 numokx = 0;
860 numok = 0; //count number of succesfull actions
861
862 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
863 b = i / 7;
864 if (sockDef[b] > 0)
865 s0 = +1;
866 else
867 s0 = -1;
868
869 if (rd[i].sockStat < 0) { //try to connect if not yet done
870 rd[i].sockStat = connect (rd[i].socket,
871 (struct sockaddr *) &rd[i].SockAddr,
872 sizeof (rd[i].SockAddr));
873 if (rd[i].sockStat == 0) { //successfull ==>
874 if (sockDef[b] > 0) {
875 rd[i].bufTyp = 0; // expect a header
876 rd[i].bufLen = frst_len; // max size to read at begining
877 } else {
878 rd[i].bufTyp = -1; // full data to be skipped
879 rd[i].bufLen = MAX_LEN; //huge for skipping
880 }
881 rd[i].bufPos = 0; // no byte read so far
882 rd[i].skip = 0; // start empty
883 gi_NumConnect[b]++;
884 gi.numConn[b]++;
885 gj.numConn[b]++;
886 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
887 factOut (kInfo, -1, str);
888 }
889 }
890
891 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
892 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
893 numok++;
894 jrd =
895 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
896 rd[i].bufLen, MSG_DONTWAIT);
897
898 if (jrd > 0) {
899 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
900#ifdef EVTDEBUG
901 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
902 &rd[i].rBuf->B[rd[i].bufPos], jrd);
903 snprintf (str, MXSTR,
904 "read sock %3d bytes %5d len %5d first %d %d", i,
905 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
906 rd[i].rBuf->B[rd[i].bufPos + 1]);
907 factOut (kDebug, 301, str);
908#endif
909 }
910
911 if (jrd == 0) { //connection has closed ...
912 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
913 factOut (kInfo, 441, str);
914 GenSock (s0, i, 0, NULL, &rd[i]);
915 gi.gotErr[b]++;
916 gi_NumConnect[b]--;
917 gi.numConn[b]--;
918 gj.numConn[b]--;
919
920 } else if (jrd < 0) { //did not read anything
921 if (errno != EAGAIN && errno != EWOULDBLOCK) {
922 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
923 factOut (kError, 442, str);
924 gi.gotErr[b]++;
925 } else
926 numok--; //else nothing waiting to be read
927 jrd = 0;
928 }
929 } else {
930 jrd = 0; //did read nothing as requested
931 snprintf (str, MXSTR, "do not read from socket %d %d", i,
932 rd[i].bufLen);
933 factOut (kDebug, 301, str);
934 }
935
936 gi.gotByte[b] += jrd;
937 gj.rateBytes[b] += jrd;
938
939 if (jrd > 0) {
940 numokx++;
941 jrdx += jrd;
942 }
943
944
945 if (rd[i].bufTyp < 0) { // we are skipping this board ...
946// just do nothing
947#ifdef EVTDEBUG
948 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
949 i);
950 factOut (kInfo, 301, str);
951#endif
952
953 } else if (rd[i].bufTyp > 0) { // we are reading data ...
954 if (jrd < rd[i].bufLen) { //not yet all read
955 rd[i].bufPos += jrd; //==> prepare for continuation
956 rd[i].bufLen -= jrd;
957 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
958 } else { //full dataset read
959 rd[i].bufLen = 0;
960 rd[i].bufPos = rd[i].fadLen;
961 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
962 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
963 gi.evtErr++;
964 snprintf (str, MXSTR,
965 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
966 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
967 rd[i].rBuf->B[1],
968 rd[i].rBuf->B[rd[i].bufPos - 2],
969 rd[i].rBuf->B[rd[i].bufPos - 1]);
970 factOut (kError, 301, str);
971 goto EndBuf;
972
973#ifdef EVTDEBUG
974 } else {
975 snprintf (str, MXSTR,
976 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
977 i, rd[i].fadLen, rd[i].rBuf->B[0],
978 rd[i].rBuf->B[1], start.B[1], start.B[0],
979 rd[i].rBuf->B[rd[i].bufPos - 2],
980 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
981 stop.B[0]);
982 factOut (kDebug, 301, str);
983#endif
984 }
985
986 if (jrd > 0)
987 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
988
989 //we have a complete buffer, copy to WORK area
990 int jr;
991 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
992 for (jr = 0; jr < 9; jr++) {
993 roi[jr] =
994 ntohs (rd[i].
995 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]);
996 }
997 //get index into mBuffer for this event (create if needed)
998
999 int actid;
1000 if (g_useFTM > 0)
1001 actid = rd[i].evtID;
1002 else
1003 actid = rd[i].ftmID;
1004
1005 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1006 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1007 rd[i].evtID);
1008
1009 if (evID < -1000) {
1010 goto EndBuf; //not usable board/event/run --> skip it
1011 }
1012 if (evID < 0) { //no space left, retry later
1013#ifdef EVTDEBUG
1014 if (rd[i].bufLen != 0) {
1015 snprintf (str, MXSTR, "something screwed up");
1016 factOut (kFatal, 1, str);
1017 }
1018#endif
1019 xwait.tv_sec = 0;
1020 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1021 nanosleep (&xwait, NULL);
1022 goto EndBuf1; //hope there is free space next round
1023 }
1024 //we have a valid entry in mBuffer[]; fill it
1025
1026#ifdef EVTDEBUG
1027 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1028 rd[i].fadLen);
1029 if (xchk != 0) {
1030 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1031 xchk, rd[i].fadLen, i);
1032 factOut (kFatal, 1, str);
1033
1034 uint iq;
1035 for (iq = 0; iq < rd[i].fadLen; iq++) {
1036 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1037 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1038 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1039 factOut (kFatal, 1, str);
1040 }
1041 }
1042 }
1043#endif
1044 int qncpy = 0;
1045 boardId = b;
1046 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1047 int fadCrate = fadBoard / 256;
1048 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1049 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1050 fadCrate, fadBoard % 256, boardId);
1051 factOut (kWarn, 301, str);
1052 }
1053 if (mBuffer[evID].board[boardId] != -1) {
1054 snprintf (str, MXSTR,
1055 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1056 evID, boardId, i, rd[i].fadLen,
1057 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1058 rd[i].rBuf->B[rd[i].bufPos - 2],
1059 rd[i].rBuf->B[rd[i].bufPos - 1]);
1060 factOut (kWarn, 501, str);
1061 goto EndBuf; //--> skip Board
1062 }
1063
1064 int iDx = evtIdx[evID]; //index into evtCtrl
1065
1066 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1067 &rd[i].rBuf->S[0], head_len);
1068 qncpy += head_len;
1069
1070 src = head_len / 2;
1071 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1072 for (drs = 0; drs < 4; drs++) {
1073 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1074 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1075 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1076//here we should check if pixH is correct ....
1077
1078 pixS = boardId * 36 + drs * 9 + px;
1079 src++;
1080
1081
1082 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1083 dest = pixS * roi[0];
1084 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1085 &rd[i].rBuf->S[src], roi[0] * 2);
1086 qncpy += roi[0] * 2;
1087 src += pixR;
1088
1089 if (px == 8) {
1090 tmS = boardId * 4 + drs;
1091 if (pixR > roi[0]) { //and we have additional TM info
1092 dest = tmS * roi[0] + NPIX * roi[0];
1093 int srcT = src - roi[0];
1094 mBuffer[evID].fEvent->StartTM[tmS] =
1095 (pixC + pixR - roi[0]) % 1024;
1096 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1097 &rd[i].rBuf->S[srcT], roi[0] * 2);
1098 qncpy += roi[0] * 2;
1099 } else {
1100 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1101 }
1102 }
1103 }
1104 } // now we have stored a new board contents into Event structure
1105
1106 mBuffer[evID].fEvent->NumBoards++;
1107 mBuffer[evID].board[boardId] = boardId;
1108 evtCtrl.evtStat[iDx]++;
1109 evtCtrl.pcTime[iDx] = g_actTime;
1110
1111 if (++mBuffer[evID].nBoard >= actBoards) {
1112 int qnrun = 0;
1113 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1114 actrun = mBuffer[evID].runNum;
1115 int ir;
1116 for (ir = 0; ir < MAX_RUN; ir++) {
1117 qnrun++;
1118 if (runCtrl[ir].runId == actrun) {
1119 if (++runCtrl[ir].lastEvt == 0) {
1120 gotNewRun (actrun, mBuffer[evID].FADhead);
1121 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1122 mBuffer[evID].runNum,
1123 mBuffer[evID].evNum);
1124 factOut (kInfo, 1, str);
1125 break;
1126 }
1127 }
1128 }
1129 }
1130 snprintf (str, MXSTR,
1131 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1132 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1133 qncpy, qnrun);
1134 factOut (kDebug, -1, str);
1135
1136 //complete event read ---> flag for next processing
1137 evtCtrl.evtStat[iDx] = 99;
1138 gi.evtTot++;
1139 }
1140
1141 EndBuf:
1142 rd[i].bufTyp = 0; //ready to read next header
1143 rd[i].bufLen = frst_len;
1144 rd[i].bufPos = 0;
1145 EndBuf1:
1146 ;
1147 }
1148
1149 } else { //we are reading event header
1150 rd[i].bufPos += jrd;
1151 rd[i].bufLen -= jrd;
1152 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1153 //check if startflag correct; else shift block ....
1154 for (k = 0; k < rd[i].bufPos - 1; k++) {
1155 if (rd[i].rBuf->B[k] == start.B[1]
1156 && rd[i].rBuf->B[k + 1] == start.B[0])
1157 break;
1158 }
1159 rd[i].skip += k;
1160
1161 if (k >= rd[i].bufPos - 1) { //no start of header found
1162 rd[i].bufPos = 0;
1163 rd[i].bufLen = head_len;
1164 } else if (k > 0) {
1165 rd[i].bufPos -= k;
1166 rd[i].bufLen += k;
1167 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1168 rd[i].bufPos);
1169#ifdef EVTDEBUG
1170 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1171 rd[i].bufPos);
1172#endif
1173 }
1174 if (rd[i].bufPos >= minLen) {
1175 if (rd[i].skip > 0) {
1176 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1177 rd[i].skip, i);
1178 factOut (kInfo, 666, str);
1179 rd[i].skip = 0;
1180 }
1181 goodhed++;
1182 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1183 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1184 rd[i].ftmTyp = ntohl (rd[i].rBuf->S[5]);
1185 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1186 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1187 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1188 rd[i].bufTyp = 1; //ready to read full record
1189 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1190
1191 int fadboard = ntohs (rd[i].rBuf->S[12]);
1192 int fadcrate = fadboard / 256;
1193 fadboard = (fadcrate * 10 + fadboard % 256);
1194#ifdef EVTDEBUG
1195 snprintf (str, MXSTR,
1196 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1197 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1198 rd[i].runID, fadboard, jrd);
1199 factOut (kDebug, 1, str);
1200#endif
1201
1202 if (rd[i].runID == 0)
1203 rd[i].runID = gi_myRun;
1204
1205 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1206 snprintf (str, MXSTR,
1207 "illegal event-length on port %d", i);
1208 factOut (kFatal, 881, str);
1209 rd[i].bufLen = 100000; //?
1210 }
1211 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1212 debugHead (i, fadBoard, rd[i].rBuf);
1213 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1214 } else {
1215 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1216 }
1217 } else {
1218 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1219 }
1220
1221 } //end interpreting last read
1222 } //end of successful read anything
1223 } //finished trying to read all sockets
1224
1225#ifdef EVTDEBUG
1226 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1227 factOut (kDebug, -1, str);
1228#endif
1229
1230 gi.numRead[numok]++;
1231
1232 g_actTime = time (NULL);
1233 if (g_actTime > gi_SecTime) {
1234 gi_SecTime = g_actTime;
1235
1236
1237 //loop over all active events and flag those older than read-timeout
1238 //delete those that are written to disk ....
1239
1240 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1241 if (kd < 0)
1242 kd += (MAX_EVT * MAX_RUN);
1243
1244 gj.bufNew = gj.bufEvt = 0;
1245 int k1 = evtCtrl.frstPtr;
1246 for (k = k1; k < (k1 + kd); k++) {
1247 int k0 = k % (MAX_EVT * MAX_RUN);
1248//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1249
1250 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1251 gj.bufNew++; //incomplete event in Buffer
1252 if (evtCtrl.evtStat[k0] < 90
1253 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1254 int id = evtCtrl.evtBuf[k0];
1255 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1256 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1257 evtCtrl.evtStat[k0]);
1258 factOut (kWarn, 601, str);
1259 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1260 gi.evtSkp++;
1261 gi.evtTot++;
1262 gj.evtSkip++;
1263 }
1264 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1265 || evtCtrl.evtStat[k0] == 0) { //'useless'
1266
1267 int id = evtCtrl.evtBuf[k0];
1268 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1269 mBuffer[id].evNum, mBuffer[id].nBoard);
1270 factOut (kDebug, -1, str);
1271 mBufFree (id); //event written--> free memory
1272 evtCtrl.evtStat[k0] = -1;
1273 gj.evtWrite++;
1274 gj.rateWrite++;
1275 } else if (evtCtrl.evtStat[k0] >= 95) {
1276 gj.bufEvt++; //complete event in Buffer
1277 }
1278
1279 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1280 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1281 }
1282 }
1283
1284
1285 gj.deltaT = 1000; //temporary, must be improved
1286
1287 int b;
1288 for (b = 0; b < NBOARDS; b++)
1289 gj.totBytes[b] += gj.rateBytes[b];
1290 gj.totMem = g_maxMem;
1291 if (gj.maxMem > gj.xxxMem)
1292 gj.xxxMem = gj.maxMem;
1293 if (gj.maxEvt > gj.xxxEvt)
1294 gj.xxxEvt = gj.maxEvt;
1295
1296 factStat (gj);
1297 factStatNew (gi);
1298 gj.rateNew = gj.rateWrite = 0;
1299 gj.maxMem = gj.usdMem;
1300 gj.maxEvt = gj.bufTot;
1301 for (b = 0; b < NBOARDS; b++)
1302 gj.rateBytes[b] = 0;
1303 }
1304
1305 if (numok > 0)
1306 numok2 = 0;
1307 else if (numok2++ > 3) {
1308 if (g_runStat == 1) {
1309 xwait.tv_sec = 1;
1310 xwait.tv_nsec = 0; // hibernate for 1 sec
1311 } else {
1312 xwait.tv_sec = 0;
1313 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1314 }
1315 nanosleep (&xwait, NULL);
1316 }
1317
1318 } //and do next loop over all sockets ...
1319
1320
1321 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1322 factOut (kInfo, -1, str);
1323
1324 if (g_reset > 0) {
1325 gi_reset = g_reset;
1326 gi_resetR = gi_reset % 10; //shall we stop reading ?
1327 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1328 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1329 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1330 g_reset = 0;
1331 } else {
1332 gi_reset = 0;
1333 if (g_runStat == -1)
1334 gi_resetR = 1;
1335 else
1336 gi_resetR = 7;
1337 gi_resetS = 7; //close all sockets
1338 gi_resetW = 7; //close all files
1339 gi_resetX = 0;
1340
1341 //inform others we have to quit ....
1342 gi_runStat = -11; //inform all that no update to happen any more
1343 gj.readStat = -11; //inform all that no update to happen any more
1344 }
1345
1346 if (gi_resetS > 0) {
1347 //must close all open sockets ...
1348 snprintf (str, MXSTR, "close all sockets ...");
1349 factOut (kInfo, -1, str);
1350 for (i = 0; i < MAX_SOCK; i++) {
1351 if (rd[i].sockStat == 0) {
1352 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1353 if (i % 7 == 0) {
1354 gi_NumConnect[i / 7]--;
1355 gi.numConn[i / 7]--;
1356 gj.numConn[i / 7]--;
1357 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1358 rd[i / 7].sockStat = -1; //and try to open asap
1359 }
1360 }
1361 }
1362 }
1363
1364
1365 if (gi_resetR > 0) {
1366 //flag all events as 'read finished'
1367 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1368 if (kd < 0)
1369 kd += (MAX_EVT * MAX_RUN);
1370
1371 int k1 = evtCtrl.frstPtr;
1372 for (k = k1; k < (k1 + kd); k++) {
1373 int k0 = k % (MAX_EVT * MAX_RUN);
1374 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1375 evtCtrl.evtStat[k0] = 91;
1376 gi.evtSkp++;
1377 gi.evtTot++;
1378 }
1379 }
1380
1381 xwait.tv_sec = 0;
1382 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1383 nanosleep (&xwait, NULL);
1384
1385 //and clear all buffers (might have to wait until all others are done)
1386 int minclear;
1387 if (gi_resetR == 1) {
1388 minclear = 900;
1389 snprintf (str, MXSTR, "drain all buffers ...");
1390 } else {
1391 minclear = 0;
1392 snprintf (str, MXSTR, "flush all buffers ...");
1393 }
1394 factOut (kInfo, -1, str);
1395
1396 int numclear = 1;
1397 while (numclear > 0) {
1398 numclear = 0;
1399 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1400 if (kd < 0)
1401 kd += (MAX_EVT * MAX_RUN);
1402
1403 int k1 = evtCtrl.frstPtr;
1404 for (k = k1; k < (k1 + kd); k++) {
1405 int k0 = k % (MAX_EVT * MAX_RUN);
1406 if (evtCtrl.evtStat[k0] > minclear) {
1407 int id = evtCtrl.evtBuf[k0];
1408 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1409 mBuffer[id].evNum, mBuffer[id].nBoard);
1410 factOut (kDebug, -1, str);
1411 mBufFree (id); //event written--> free memory
1412 evtCtrl.evtStat[k0] = -1;
1413 } else if (evtCtrl.evtStat[k0] > 0)
1414 numclear++; //writing is still ongoing...
1415
1416 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1417 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1418 }
1419
1420 xwait.tv_sec = 0;
1421 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1422 nanosleep (&xwait, NULL);
1423 }
1424 }
1425
1426 if (gi_reset > 0) {
1427 if (gi_resetW > 0) {
1428 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1429 }
1430 if (gi_resetX > 0) {
1431 xwait.tv_sec = gi_resetX;
1432 xwait.tv_nsec = 0;
1433 nanosleep (&xwait, NULL);
1434 }
1435
1436 snprintf (str, MXSTR, "Continue read Process ...");
1437 factOut (kInfo, -1, str);
1438 gi_reset = 0;
1439 goto START;
1440 }
1441
1442
1443
1444 snprintf (str, MXSTR, "Exit read Process ...");
1445 factOut (kInfo, -1, str);
1446 gi_runStat = -99;
1447 gj.readStat = -99;
1448 factStat (gj);
1449 factStatNew (gi);
1450 return 0;
1451
1452} /*-----------------------------------------------------------------*/
1453
1454
1455void *
1456subProc (void *thrid)
1457{
1458 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1459 struct timespec xwait;
1460
1461 threadID = (int) thrid;
1462
1463 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1464 factOut (kInfo, -1, str);
1465
1466 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1467 numWait = numProc = 0;
1468 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1469 if (kd < 0)
1470 kd += (MAX_EVT * MAX_RUN);
1471
1472 int k1 = evtCtrl.frstPtr;
1473 for (k = k1; k < (k1 + kd); k++) {
1474 int k0 = k % (MAX_EVT * MAX_RUN);
1475
1476 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1477 if (gi_resetR > 1) { //we are asked to flush buffers asap
1478 jret = 9100; //flag to be deleted
1479 } else {
1480 int id = evtCtrl.evtBuf[k0];
1481 jret =
1482 subProcEvt (threadID, mBuffer[id].FADhead,
1483 mBuffer[id].fEvent, mBuffer[id].buffer);
1484 if (jret <= threadID) {
1485 snprintf (str, MXSTR,
1486 "process %d wants to send to process %d",
1487 threadID, jret);
1488 factOut (kError, -1, str);
1489 jret = 5300;
1490 } else if (jret <= 0)
1491 jret = 9200 + threadID; //flag as 'to be deleted'
1492 else if (jret >= gi_maxProc)
1493 jret = 5200 + threadID; //flag as 'to be written'
1494 else
1495 jret = 1000 + jret; //flag for next proces
1496 }
1497 evtCtrl.evtStat[k0] = jret;
1498 numProc++;
1499 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1500 numWait++;
1501 }
1502
1503 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1504 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1505 factOut (kInfo, -1, str);
1506 return 0;
1507 }
1508 if (numProc == 0) {
1509 //seems we have nothing to do, so sleep a little
1510 xwait.tv_sec = 0;
1511 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1512 nanosleep (&xwait, NULL);
1513 }
1514 }
1515
1516 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1517 factOut (kInfo, -1, str);
1518 return;
1519} /*-----------------------------------------------------------------*/
1520
1521
1522void *
1523procEvt (void *ptr)
1524{
1525/* *** main loop processing file, including SW-trigger */
1526 int numProc, numWait;
1527 int k, status, j;
1528 struct timespec xwait;
1529 char str[MXSTR];
1530
1531
1532
1533 int lastRun = 0; //usually run from last event still valid
1534
1535 cpu_set_t mask;
1536 int cpu = 1; //process thread (will be several in final version)
1537
1538 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1539 gi_maxProc);
1540 factOut (kInfo, -1, str);
1541
1542/* CPU_ZERO initializes all the bits in the mask to zero. */
1543 CPU_ZERO (&mask);
1544/* CPU_SET sets only the bit corresponding to cpu. */
1545// CPU_SET( 0 , &mask ); leave for system
1546// CPU_SET( 1 , &mask ); used by write process
1547 CPU_SET (2, &mask);
1548 CPU_SET (3, &mask);
1549 CPU_SET (4, &mask);
1550 CPU_SET (5, &mask);
1551 CPU_SET (6, &mask);
1552// CPU_SET( 7 , &mask ); used by read process
1553/* sched_setaffinity returns 0 in success */
1554 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1555 snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1556 factOut (kWarn, -1, str);
1557 }
1558
1559
1560 pthread_t thread[100];
1561 int th_ret[100];
1562
1563 for (k = 0; k < gi_maxProc; k++) {
1564 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1565 }
1566
1567 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1568
1569 numWait = numProc = 0;
1570 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1571 if (kd < 0)
1572 kd += (MAX_EVT * MAX_RUN);
1573
1574 int k1 = evtCtrl.frstPtr;
1575 for (k = k1; k < (k1 + kd); k++) {
1576 int k0 = k % (MAX_EVT * MAX_RUN);
1577//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1578 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1579
1580 if (gi_resetR > 1) { //we are asked to flush buffers asap
1581 evtCtrl.evtStat[k0] = 9991;
1582 } else {
1583
1584//-------- it is better to open the run already here, so call can be used to initialize
1585//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1586 int id = evtCtrl.evtBuf[k0];
1587 uint32_t irun = mBuffer[id].runNum;
1588 int32_t ievt = mBuffer[id].evNum;
1589 if (runCtrl[lastRun].runId == irun) {
1590 j = lastRun;
1591 } else {
1592 //check which fileID to use (or open if needed)
1593 for (j = 0; j < MAX_RUN; j++) {
1594 if (runCtrl[j].runId == irun)
1595 break;
1596 }
1597 if (j >= MAX_RUN) {
1598 snprintf (str, MXSTR,
1599 "P error: can not find run %d for event %d in %d",
1600 irun, ievt, id);
1601 factOut (kFatal, 901, str);
1602 }
1603 lastRun = j;
1604 }
1605
1606 if (runCtrl[j].fileId < 0) {
1607//---- we need to open a new run ==> make sure all older runs are
1608//---- finished and marked to be closed ....
1609 int j1;
1610 for (j1 = 0; j1 < MAX_RUN; j1++) {
1611 if (runCtrl[j1].fileId == 0) {
1612 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1613//---- problem: processing still going on ==> must wait for closing ....
1614 snprintf (str, MXSTR,
1615 "P finish run since new one opened %d",
1616 runCtrl[j1].runId);
1617 runFinish1 (runCtrl[j1].runId);
1618 }
1619
1620 }
1621
1622 actRun.Version = 1;
1623 actRun.RunType = -1; //to be adapted
1624
1625 actRun.Nroi = runCtrl[j].roi0;
1626 actRun.NroiTM = runCtrl[j].roi8;
1627 if (actRun.Nroi == actRun.NroiTM)
1628 actRun.NroiTM = 0;
1629 actRun.RunTime = runCtrl[j].firstTime;
1630 actRun.RunUsec = runCtrl[j].firstTime;
1631 actRun.NBoard = NBOARDS;
1632 actRun.NPix = NPIX;
1633 actRun.NTm = NTMARK;
1634 actRun.Nroi = mBuffer[id].nRoi;
1635 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1636 NBOARDS * sizeof (PEVNT_HEADER));
1637
1638 runCtrl[j].fileHd =
1639 runOpen (irun, &actRun, sizeof (actRun));
1640 if (runCtrl[j].fileHd == NULL) {
1641 snprintf (str, MXSTR,
1642 "P could not open a file for run %d", irun);
1643 factOut (kError, 502, str);
1644 runCtrl[j].fileId = 91;
1645 runCtrl[j].procId = 91;
1646 } else {
1647 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1648 irun, ievt);
1649 factOut (kInfo, -1, str);
1650 runCtrl[j].fileId = 0;
1651 runCtrl[j].procId = 0;
1652 }
1653
1654 }
1655//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1656 if (runCtrl[j].procId == 0) {
1657 if (runCtrl[j].closeTime < g_actTime
1658 || runCtrl[j].lastTime < g_actTime - 300
1659 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1660 snprintf (str, MXSTR,
1661 "P reached end of run condition for run %d",
1662 irun);
1663 factOut (kInfo, 502, str);
1664 runFinish1 (runCtrl[j].runId);
1665 runCtrl[j].procId = 1;
1666 }
1667 }
1668 if (runCtrl[j].procId != 0) {
1669 snprintf (str, MXSTR,
1670 "P skip event %d because no active run %d", ievt,
1671 irun);
1672 factOut (kInfo, 502, str);
1673 evtCtrl.evtStat[k0] = 9091;
1674 } else {
1675//--------
1676//--------
1677 int id = evtCtrl.evtBuf[k0];
1678 int itevt = mBuffer[id].trgNum;
1679 int itrg = mBuffer[id].trgTyp;
1680 int roi = mBuffer[id].nRoi;
1681 int roiTM = mBuffer[id].nRoiTM;
1682
1683//make sure unused pixels/tmarks are cleared to zero
1684 if (roiTM == roi)
1685 roiTM = 0;
1686 int ip, it, dest, ib;
1687 for (ip = 0; ip < NPIX; ip++) {
1688 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1689 dest = ip * roi;
1690 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1691 }
1692 }
1693 for (it = 0; it < NTMARK; it++) {
1694 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1695 dest = it * roi + NPIX * roi;
1696 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1697 }
1698 }
1699
1700
1701//and set correct event header ; also check for consistency in event (not yet)
1702 mBuffer[id].fEvent->Roi = roi;
1703 mBuffer[id].fEvent->RoiTM = roiTM;
1704 mBuffer[id].fEvent->EventNum = ievt;
1705 mBuffer[id].fEvent->TriggerNum = itevt;
1706 mBuffer[id].fEvent->TriggerType = itrg;
1707 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1708 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1709 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1710 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1711 mBuffer[id].fEvent->SoftTrig = 0;
1712
1713
1714 for (ib = 0; ib < NBOARDS; ib++) {
1715 if (mBuffer[id].board[ib] == -1) { //board is not read
1716 mBuffer[id].FADhead[ib].start_package_flag = 0;
1717 mBuffer[id].fEvent->BoardTime[ib] = 0;
1718 } else {
1719 mBuffer[id].fEvent->BoardTime[ib] =
1720 ntohl (mBuffer[id].FADhead[ib].time);
1721 }
1722 }
1723
1724 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1725 mBuffer[id].fEvent);
1726 gi.procTot++;
1727 numProc++;
1728
1729 if (i < 0) {
1730 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
1731 gi.procErr++;
1732 } else {
1733 evtCtrl.evtStat[k0] = 1000;
1734 runCtrl[j].procEvt++;
1735 }
1736 }
1737 }
1738 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
1739 numWait++;
1740 }
1741 }
1742
1743 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1744 snprintf (str, MXSTR, "Exit Processing Process ...");
1745 factOut (kInfo, -1, str);
1746 gp_runStat = -22; //==> we should exit
1747 gj.procStat = -22; //==> we should exit
1748 return 0;
1749 }
1750
1751 if (numProc == 0) {
1752 //seems we have nothing to do, so sleep a little
1753 xwait.tv_sec = 0;
1754 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1755 nanosleep (&xwait, NULL);
1756 }
1757 gp_runStat = gi_runStat;
1758 gj.procStat = gj.readStat;
1759
1760 }
1761
1762 //we are asked to abort asap ==> must flag all remaining events
1763 // when gi_runStat claims that all events are in the buffer...
1764
1765 snprintf (str, MXSTR, "Abort Processing Process ...");
1766 factOut (kInfo, -1, str);
1767 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1768 if (kd < 0)
1769 kd += (MAX_EVT * MAX_RUN);
1770
1771 for (k = 0; k < gi_maxProc; k++) {
1772 pthread_join (thread[k], (void **) &status);
1773 }
1774
1775 int k1 = evtCtrl.frstPtr;
1776 for (k = k1; k < (k1 + kd); k++) {
1777 int k0 = k % (MAX_EVT * MAX_RUN);
1778 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
1779 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
1780 }
1781 }
1782
1783 gp_runStat = -99;
1784 gj.procStat = -99;
1785
1786 return 0;
1787
1788} /*-----------------------------------------------------------------*/
1789
1790int
1791CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1792{
1793/* close run runId (all all runs if runId=0) */
1794/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1795 int j;
1796
1797
1798 if (runId == 0) {
1799 for (j = 0; j < MAX_RUN; j++) {
1800 if (runCtrl[j].fileId == 0) { //run is open
1801 runCtrl[j].closeTime = closeTime;
1802 runCtrl[j].maxEvt = maxEvt;
1803 }
1804 }
1805 return 0;
1806 }
1807
1808 for (j = 0; j < MAX_RUN; j++) {
1809 if (runCtrl[j].runId == runId) {
1810 if (runCtrl[j].fileId == 0) { //run is open
1811 runCtrl[j].closeTime = closeTime;
1812 runCtrl[j].maxEvt = maxEvt;
1813 return 0;
1814 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1815 runCtrl[j].closeTime = closeTime;
1816 runCtrl[j].maxEvt = maxEvt;
1817 return +1;
1818 } else { // run already closed
1819 return +2;
1820 }
1821 }
1822 } //we only reach here if the run was never created
1823 return -1;
1824
1825} /*-----------------------------------------------------------------*/
1826
1827
1828void *
1829writeEvt (void *ptr)
1830{
1831/* *** main loop writing event (including opening and closing run-files */
1832
1833 int numWrite, numWait;
1834 int k, j, i;
1835 struct timespec xwait;
1836 char str[MXSTR];
1837
1838 cpu_set_t mask;
1839 int cpu = 1; //write thread
1840
1841 snprintf (str, MXSTR, "Starting write-thread");
1842 factOut (kInfo, -1, str);
1843
1844/* CPU_ZERO initializes all the bits in the mask to zero. */
1845 CPU_ZERO (&mask);
1846/* CPU_SET sets only the bit corresponding to cpu. */
1847 CPU_SET (cpu, &mask);
1848/* sched_setaffinity returns 0 in success */
1849 if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1850 snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1851 }
1852
1853 int lastRun = 0; //usually run from last event still valid
1854
1855 while (g_runStat > -2) {
1856
1857 numWait = numWrite = 0;
1858 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1859 if (kd < 0)
1860 kd += (MAX_EVT * MAX_RUN);
1861
1862 int k1 = evtCtrl.frstPtr;
1863 for (k = k1; k < (k1 + kd); k++) {
1864 int k0 = k % (MAX_EVT * MAX_RUN);
1865//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1866 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9900) {
1867
1868 if (gi_resetR > 1) { //we must drain the buffer asap
1869 evtCtrl.evtStat[k0] = 9904;
1870 } else {
1871
1872
1873 int id = evtCtrl.evtBuf[k0];
1874 uint32_t irun = mBuffer[id].runNum;
1875 int32_t ievt = mBuffer[id].evNum;
1876
1877 gi.wrtTot++;
1878 if (runCtrl[lastRun].runId == irun) {
1879 j = lastRun;
1880 } else {
1881 //check which fileID to use (or open if needed)
1882 for (j = 0; j < MAX_RUN; j++) {
1883 if (runCtrl[j].runId == irun)
1884 break;
1885 }
1886 if (j >= MAX_RUN) {
1887 snprintf (str, MXSTR,
1888 "W error: can not find run %d for event %d in %d",
1889 irun, ievt, id);
1890 factOut (kFatal, 901, str);
1891 gi.wrtErr++;
1892 }
1893 lastRun = j;
1894 }
1895
1896 if (runCtrl[j].fileId < 0) {
1897 actRun.Version = 1;
1898 actRun.RunType = -1; //to be adapted
1899
1900 actRun.Nroi = runCtrl[j].roi0;
1901 actRun.NroiTM = runCtrl[j].roi8;
1902 if (actRun.Nroi == actRun.NroiTM)
1903 actRun.NroiTM = 0;
1904 actRun.RunTime = runCtrl[j].firstTime;
1905 actRun.RunUsec = runCtrl[j].firstTime;
1906 actRun.NBoard = NBOARDS;
1907 actRun.NPix = NPIX;
1908 actRun.NTm = NTMARK;
1909 actRun.Nroi = mBuffer[id].nRoi;
1910 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1911 NBOARDS * sizeof (PEVNT_HEADER));
1912
1913 runCtrl[j].fileHd =
1914 runOpen (irun, &actRun, sizeof (actRun));
1915 if (runCtrl[j].fileHd == NULL) {
1916 snprintf (str, MXSTR,
1917 "W could not open a file for run %d", irun);
1918 factOut (kError, 502, str);
1919 runCtrl[j].fileId = 91;
1920 } else {
1921 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
1922 irun, ievt);
1923 factOut (kInfo, -1, str);
1924 runCtrl[j].fileId = 0;
1925 }
1926
1927 }
1928
1929 if (runCtrl[j].fileId != 0) {
1930 if (runCtrl[j].fileId < 0) {
1931 snprintf (str, MXSTR,
1932 "W never opened file for this run %d", irun);
1933 factOut (kError, 123, str);
1934 } else if (runCtrl[j].fileId < 100) {
1935 snprintf (str, MXSTR, "W.file for this run is closed %d",
1936 irun);
1937 factOut (kWarn, 123, str);
1938 runCtrl[j].fileId += 100;
1939 } else {
1940 snprintf (str, MXSTR, "W:file for this run is closed %d",
1941 irun);
1942 factOut (kDebug, 123, str);
1943 }
1944 evtCtrl.evtStat[k0] = 9903;
1945 gi.wrtErr++;
1946 } else {
1947 i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
1948 sizeof (mBuffer[id]));
1949 if (i >= 0) {
1950 runCtrl[j].lastTime = g_actTime;
1951 runCtrl[j].actEvt++;
1952 evtCtrl.evtStat[k0] = 9901;
1953 snprintf (str, MXSTR,
1954 "%5d successfully wrote for run %d id %5d",
1955 ievt, irun, k0);
1956 factOut (kDebug, 504, str);
1957// gj.writEvt++ ;
1958 } else {
1959 snprintf (str, MXSTR, "W error writing event for run %d",
1960 irun);
1961 factOut (kError, 503, str);
1962 evtCtrl.evtStat[k0] = 9902;
1963 gi.wrtErr++;
1964 }
1965
1966 if (i < 0
1967 || runCtrl[j].lastTime < g_actTime - 300
1968 || runCtrl[j].closeTime < g_actTime
1969 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
1970 int ii = 0;
1971 if (i < 0)
1972 ii = 1;
1973 else if (runCtrl[j].closeTime < g_actTime)
1974 ii = 2;
1975 else if (runCtrl[j].lastTime < g_actTime - 300)
1976 ii = 3;
1977 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
1978 ii = 4;
1979
1980
1981
1982 //close run for whatever reason
1983 if (runCtrl[j].runId == gi_myRun)
1984 gi_myRun = g_actTime;
1985
1986 if (runCtrl[j].procId == 0) {
1987 runFinish1 (runCtrl[j].runId);
1988 runCtrl[j].procId = 92;
1989 }
1990
1991 runCtrl[j].closeTime = g_actTime - 1;
1992 i = runClose (runCtrl[j].fileHd, &runTail[j],
1993 sizeof (runTail[j]));
1994 if (i < 0) {
1995 snprintf (str, MXSTR, "error closing run %d %d AAA",
1996 runCtrl[j].runId, i);
1997 factOut (kError, 503, str);
1998 runCtrl[j].fileId = 92;
1999 } else {
2000 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2001 ii);
2002 factOut (kInfo, 503, str);
2003 runCtrl[j].fileId = 93;
2004 }
2005 }
2006 }
2007 }
2008 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2009 numWait++;
2010 }
2011
2012 //check if we should close a run (mainly when no event pending)
2013 for (j = 0; j < MAX_RUN; j++) {
2014 if (runCtrl[j].fileId == 0
2015 && (runCtrl[j].closeTime < g_actTime
2016 || runCtrl[j].lastTime < g_actTime - 300
2017 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2018 if (runCtrl[j].runId == gi_myRun)
2019 gi_myRun = g_actTime;
2020 int ii = 0;
2021 if (runCtrl[j].closeTime < g_actTime)
2022 ii = 2;
2023 else if (runCtrl[j].lastTime < g_actTime - 300)
2024 ii = 3;
2025 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2026 ii = 4;
2027
2028 if (runCtrl[j].procId == 0) {
2029 runFinish1 (runCtrl[j].runId);
2030 runCtrl[j].procId = 92;
2031 }
2032
2033 runCtrl[j].closeTime = g_actTime - 1;
2034 i = runClose (runCtrl[j].fileHd, &runTail[j],
2035 sizeof (runTail[j]));
2036 if (i < 0) {
2037 snprintf (str, MXSTR, "error closing run %d %d BBB",
2038 runCtrl[j].runId, i);
2039 factOut (kError, 506, str);
2040 runCtrl[j].fileId = 94;
2041 } else {
2042 snprintf (str, MXSTR, "W closed run %d BBB %d",
2043 runCtrl[j].runId, ii);
2044 factOut (kInfo, 507, str);
2045 runCtrl[j].fileId = 95;
2046 }
2047 }
2048 }
2049
2050 if (numWrite == 0) {
2051 //seems we have nothing to do, so sleep a little
2052 xwait.tv_sec = 0;
2053 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2054 nanosleep (&xwait, NULL);
2055 }
2056
2057 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2058 snprintf (str, MXSTR, "Finish Write Process ...");
2059 factOut (kInfo, -1, str);
2060 gw_runStat = -22; //==> we should exit
2061 gj.writStat = -22; //==> we should exit
2062 goto closerun;
2063 }
2064 gw_runStat = gi_runStat;
2065 gj.writStat = gj.readStat;
2066
2067 }
2068
2069 //must close all open files ....
2070 snprintf (str, MXSTR, "Abort Writing Process ...");
2071 factOut (kInfo, -1, str);
2072
2073 closerun:
2074 snprintf (str, MXSTR, "Close all open files ...");
2075 factOut (kInfo, -1, str);
2076 for (j = 0; j < MAX_RUN; j++)
2077 if (runCtrl[j].fileId == 0) {
2078 if (runCtrl[j].runId == gi_myRun)
2079 gi_myRun = g_actTime;
2080
2081 if (runCtrl[j].procId == 0) {
2082 runFinish (runCtrl[j].runId);
2083 runCtrl[j].procId = 92;
2084 }
2085
2086 runCtrl[j].closeTime = g_actTime - 1;
2087 i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2088 int ii = 0;
2089 if (runCtrl[j].closeTime < g_actTime)
2090 ii = 2;
2091 else if (runCtrl[j].lastTime < g_actTime - 300)
2092 ii = 3;
2093 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2094 ii = 4;
2095 if (i < 0) {
2096 snprintf (str, MXSTR, "error closing run %d %d CCC",
2097 runCtrl[j].runId, i);
2098 factOut (kError, 506, str);
2099 runCtrl[j].fileId = 96;
2100 } else {
2101 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2102 ii);
2103 factOut (kInfo, 507, str);
2104 runCtrl[j].fileId = 97;
2105 }
2106 }
2107
2108 gw_runStat = -99;
2109 gj.writStat = -99;
2110 snprintf (str, MXSTR, "Exit Writing Process ...");
2111 factOut (kInfo, -1, str);
2112 return 0;
2113
2114
2115
2116
2117} /*-----------------------------------------------------------------*/
2118
2119
2120
2121
2122void
2123StartEvtBuild ()
2124{
2125
2126 int i, j, imax, status, th_ret[50];
2127 pthread_t thread[50];
2128 struct timespec xwait;
2129
2130 gi_runStat = gp_runStat = gw_runStat = 0;
2131 gj.readStat = gj.procStat = gj.writStat = 0;
2132
2133 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2134 factOut (kInfo, -1, str);
2135
2136//initialize run control logics
2137 for (i = 0; i < MAX_RUN; i++) {
2138 runCtrl[i].runId = 0;
2139 runCtrl[i].fileId = -2;
2140 }
2141
2142//prepare for subProcesses
2143 gi_maxSize = g_maxSize;
2144 if (gi_maxSize <= 0)
2145 gi_maxSize = 1;
2146
2147 gi_maxProc = g_maxProc;
2148 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2149 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2150 factOut (kFatal, 301, str);
2151 gi_maxProc = 1;
2152 }
2153//partially initialize event control logics
2154 evtCtrl.frstPtr = 0;
2155 evtCtrl.lastPtr = 0;
2156
2157//start all threads (more to come) when we are allowed to ....
2158 while (g_runStat == 0) {
2159 xwait.tv_sec = 0;
2160 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2161 nanosleep (&xwait, NULL);
2162 }
2163
2164 i = 0;
2165 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2166 i++;
2167 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2168 i++;
2169 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2170 i++;
2171 imax = i;
2172
2173
2174#ifdef BILAND
2175 xwait.tv_sec = 30;;
2176 xwait.tv_nsec = 0; // sleep for ~20sec
2177 nanosleep (&xwait, NULL);
2178
2179 printf ("close all runs in 2 seconds\n");
2180
2181 CloseRunFile (0, time (NULL) + 2, 0);
2182
2183 xwait.tv_sec = 1;;
2184 xwait.tv_nsec = 0; // sleep for ~20sec
2185 nanosleep (&xwait, NULL);
2186
2187 printf ("setting g_runstat to -1\n");
2188
2189 g_runStat = -1;
2190#endif
2191
2192
2193//wait for all threads to finish
2194 for (i = 0; i < imax; i++) {
2195 j = pthread_join (thread[i], (void **) &status);
2196 }
2197
2198} /*-----------------------------------------------------------------*/
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214 /*-----------------------------------------------------------------*/
2215 /*-----------------------------------------------------------------*/
2216 /*-----------------------------------------------------------------*/
2217 /*-----------------------------------------------------------------*/
2218 /*-----------------------------------------------------------------*/
2219
2220#ifdef BILAND
2221
2222int
2223subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2224 int8_t * buffer)
2225{
2226 printf ("called subproc %d\n", threadID);
2227 return threadID + 1;
2228}
2229
2230
2231
2232
2233 /*-----------------------------------------------------------------*/
2234 /*-----------------------------------------------------------------*/
2235 /*-----------------------------------------------------------------*/
2236 /*-----------------------------------------------------------------*/
2237 /*-----------------------------------------------------------------*/
2238
2239
2240
2241
2242FileHandle_t
2243runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2244{
2245 return 1;
2246};
2247
2248int
2249runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2250{
2251 return 1;
2252 usleep (10000);
2253 return 1;
2254}
2255
2256
2257//{ return 1; } ;
2258
2259int
2260runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2261{
2262 return 1;
2263};
2264
2265
2266
2267
2268int
2269eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2270{
2271 int i = 0;
2272
2273// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2274// for (i=0; i<NBOARDS; i++) {
2275// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2276// }
2277 return 0;
2278}
2279
2280
2281void
2282factStatNew (EVT_STAT gi)
2283{
2284 int i;
2285
2286//for (i=0;i<MAX_SOCK;i++) {
2287// printf("%4d",gi.numRead[i]);
2288// if (i%20 == 0 ) printf("\n");
2289//}
2290}
2291
2292void
2293gotNewRun (int runnr, PEVNT_HEADER * headers)
2294{
2295 printf ("got new run %d\n", runnr);
2296 return;
2297}
2298
2299void
2300factStat (GUI_STAT gj)
2301{
2302// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2303// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2304}
2305
2306
/*
 * Stub implementation of debugRead, compiled only when BILAND is defined.
 *
 * All arguments are ignored; the function does nothing.
 *
 * Disabled printout kept for reference:
 *
 *    printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;
}
2313
2314
2315
/*
 * Stub implementation of debugStream, compiled only when BILAND is defined.
 *
 * All arguments are ignored; in particular the buffer is neither read
 * nor modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2320
/*
 * Stub implementation of debugHead, compiled only when BILAND is defined.
 *
 * All arguments are ignored; in particular the buffer is neither read
 * nor modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2325
2326
2327void
2328factOut (int severity, int err, char *message)
2329{
2330 static FILE *fd;
2331 static int file = 0;
2332
2333 if (file == 0) {
2334 printf ("open file\n");
2335 fd = fopen ("x.out", "w+");
2336 file = 999;
2337 }
2338
2339 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2340
2341 if (severity != kDebug)
2342 printf ("%3d %3d | %s\n", severity, err, message);
2343}
2344
2345
2346
2347int
2348main ()
2349{
2350 int i, b, c, p;
2351 char ipStr[100];
2352 struct in_addr IPaddr;
2353
2354 g_maxMem = 1024 * 1024; //MBytes
2355//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2356 g_maxMem = g_maxMem * 200; //100MBytes
2357
2358 g_maxProc = 20;
2359 g_maxSize = 30000;
2360
2361 g_runStat = 40;
2362
2363 i = 0;
2364
2365// version for standard crates
2366//for (c=0; c<4,c++) {
2367// for (b=0; b<10; b++) {
2368// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2369//
2370// inet_pton(PF_INET, ipStr, &IPaddr) ;
2371//
2372// g_port[i].sockAddr.sin_family = PF_INET;
2373// g_port[i].sockAddr.sin_port = htons(5000) ;
2374// g_port[i].sockAddr.sin_addr = IPaddr ;
2375// g_port[i].sockDef = 1 ;
2376// i++ ;
2377// }
2378//}
2379//
2380//version for PC-test *
2381 for (c = 0; c < 4; c++) {
2382 for (b = 0; b < 10; b++) {
2383 sprintf (ipStr, "10.0.%d.11", 128 + c);
2384 if (c < 2)
2385 sprintf (ipStr, "10.0.%d.11", 128);
2386 else
2387 sprintf (ipStr, "10.0.%d.11", 131);
2388// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2389
2390 inet_pton (PF_INET, ipStr, &IPaddr);
2391 p = 31919 + 100 * c + 10 * b;
2392
2393
2394 g_port[i].sockAddr.sin_family = PF_INET;
2395 g_port[i].sockAddr.sin_port = htons (p);
2396 g_port[i].sockAddr.sin_addr = IPaddr;
2397 g_port[i].sockDef = 1;
2398
2399 i++;
2400 }
2401 }
2402
2403
2404//g_port[17].sockDef =-1 ;
2405//g_actBoards-- ;
2406
2407 StartEvtBuild ();
2408
2409 return 0;
2410
2411}
2412#endif
Note: See TracBrowser for help on using the repository browser.