source: trunk/FACT++/src/EventBuilder.c@ 12464

Last change on this file since 12464 was 12464, checked in by lyard, 13 years ago
I am so tired
File size: 87.1 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
// Severity levels; the functions in this file pass one of them as first argument to factOut()
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted behaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happened, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
// NOTE(review): MAX_LEN expands without parentheses; that is harmless in the
// "MAX_LEN / n" array sizes of CNV_FACT but fragile in other expressions
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
// Run-file hooks; declared extern, so presumably implemented outside this file -- TODO confirm
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
// Message sink: the calls in this file pass a Severity value, a numeric code and the global str buffer
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
// Statistics reporting hooks; they take the structures by value (see globals gj and gi)
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
// Debug hooks; readFAD calls debugStream after each successful recv and debugRead while assembling data
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
// Forward declaration; the definition is not visible in this part of the file
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime; //wall-clock seconds, refreshed from gettimeofday() in readFAD
80uint g_actUsec; //microsecond part belonging to g_actTime
81int g_runStat; //readFAD keeps looping while this is >= 0 (and g_reset == 0)
82int g_reset; //non-zero makes readFAD leave its main loop
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat; //copy of g_runStat taken by readFAD
97int gp_runStat;
98int gw_runStat;
99
//set to -99 by mBufEvt when no event memory is left; mBufFree restores +1
//once gj.usdMem has dropped to 75% of gj.maxMem or below
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0; //set to g_actTime at label START of readFAD
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi; //event/run counters, cleared by resetEvtStat
120GUI_STAT gj; //buffer, memory and bad-ROI statistics
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127#define MXSTR 1000
128char str[MXSTR]; //shared scratch buffer for the messages handed to factOut
129
130//ETIENNE
// Event memory is handed out in fixed-size slots that are allocated in chunks
// of up to MAX_SLOTS_PER_CHUNK slots (see ETI_Malloc / ETI_Free).
131#define MAX_SLOTS_PER_CHUNK 100
132
// Remembers for one mBuffer entry which chunk/slot holds its memory;
// mBufInit sets all three fields to -1
133typedef struct {
134 int32_t eventNumber;
135 int32_t chunk;
136 int32_t slot;
137} CHUNK_MAPPING;
138
139CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
// A slot holds the board headers (MAX_HEAD_MEM bytes) followed by the event
// (mBufEvt places fEvent at offset MAX_HEAD_MEM). The event part is sized for
// a ROI of 1024, the largest nRoi[0] accepted by mBufEvt; the factor 2 is
// presumably the bytes per sample -- TODO confirm
141#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
142#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
143#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
144#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
// Bookkeeping of one chunk
146typedef struct {
147 void * pointers[MAX_SLOTS_PER_CHUNK]; //start address of each slot; pointers[0] is the malloc'ed block
148 int32_t events[MAX_SLOTS_PER_CHUNK]; //event id owning each slot, -1 if the slot is free
149 int32_t nFreeSlots; //number of slots currently free
150 int32_t nSlots; //number of slots in this chunk (may be less than MAX_SLOTS_PER_CHUNK)
151} ETI_CHUNK;
152
153int32_t numAllocatedChunks = 0; //chunks 0 .. numAllocatedChunks-1 are allocated
154
155#define MAX_CHUNKS 8096
156ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
235 {
236 snprintf(str, MXSTR, "Chunk %d not allocated as it ought to be. Skipping release", chunkIndex);
237 factOut(kError, 000, str);
238 return;
239 }
240 free(EtiMemoryChunks[chunkIndex].pointers[0]);
241 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
242 numAllocatedChunks--;
243 chunkIndex--;
244 if (numAllocatedChunks == 0)
245 break;
246 }
247}
248//END ETIENNE
249
250
251
252RUN_HEAD actRun; //mBufInit allocates actRun.FADhead (NBOARDS headers)
253
254RUN_CTRL runCtrl[MAX_RUN]; //one entry per registered run, filled in by mBufEvt
255
256RUN_TAIL runTail[MAX_RUN]; //counters per run, zeroed by mBufEvt when a run is registered
257
258
259/*
260*** Definition of rdBuffer to read in IP packets; keep it global !!!!
261 */
262
263
// Read buffer of MAX_LEN bytes that can be addressed as 8, 16, 32 or 64 bit words
264typedef union
265{
266 int8_t B[MAX_LEN];
267 int16_t S[MAX_LEN / 2];
268 int32_t I[MAX_LEN / 4];
269 int64_t L[MAX_LEN / 8];
270} CNV_FACT;
271
// Read state of one socket (one entry of rd[])
272typedef struct
273{
274 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
275 int32_t bufPos; //position in rBuf where the next received byte is stored
276 int32_t bufLen; //number of bytes left to read
277// size_t bufLen; //number of bytes left to read size_t might be better
278 int32_t skip; //number of bytes skipped before start of event
279
280 int errCnt; //how often connect failed since last successful
// sockStat values set in GenSock/readFAD: 0 = connected, -1 = socket created but not yet connected,
// 99 = does not exist, 88 = socket() failed, 77 = unused socket or buffer allocation failed
281 int sockStat; //-1 if socket not yet connected , 99 if not exist
282 int socket; //socket descriptor
283 struct sockaddr_in SockAddr; //IP for each socket
284
285 int evtID; // event ID of event currently read
286 int runID; // run ID of event currently read
287 int ftmID; // event ID from FTM
288 uint fadLen; // FADlength of event currently read
289 int fadVers; // Version of FAD
290 int ftmTyp; // trigger type
291 int board; // boardID (softwareID: 0..40 )
292 int Port; // port number as passed to GenSock
293
294 CNV_FACT *rBuf; // read buffer, allocated in GenSock
295
296#ifdef EVTDEBUG
297 CNV_FACT *xBuf; //a copy of rBuf (temporary for debugging)
298#endif
299
300} READ_STRUCT;
301
302
// A 16 bit word that can also be accessed as its two bytes
303typedef union
304{
305 int8_t B[2];
306 int16_t S;
307} SHORT_BYTE;
308
309
310
311
312
// readFAD sets start.S = 0xFB01 and stop.S = 0x04FE; the stop bytes are
// compared with the last two bytes of a completely read buffer
313SHORT_BYTE start, stop;
314
315READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
316
317
318
319/*-----------------------------------------------------------------*/
320
321
322/*-----------------------------------------------------------------*/
323
324int
325runFinish1 (uint32_t runnr)
326{
327 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
328 runnr);
329 factOut (kInfo, 173, str); //but continue anyhow
330 return 0;
331}
332int
333runFinish (uint32_t runnr)
334{
335 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
336 runnr);
337 factOut (kInfo, 173, str); //but continue anyhow
338 return 0;
339}
340
341int
342GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
343 READ_STRUCT * rd)
344{
345/*
346*** generate Address, create sockets and allocates readbuffer for it
347***
348*** if flag==0 generate socket and buffer
349*** <0 destroy socket and buffer
350*** >0 close and redo socket
351***
352*** sid : board*7 + port id
353 */
354
355 int j;
356 int optval = 1; //activate keepalive
357 socklen_t optlen = sizeof (optval);
358
359
360 if (sid % 7 >= NUMSOCK) {
361 //this is a not used socket, so do nothing ...
362 rd->sockStat = 77;
363 rd->rBuf = NULL ;
364 return 0;
365 }
366
367 if (rd->sockStat == 0) { //close socket if open
368 j = close (rd->socket);
369 if (j > 0) {
370 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
371 factOut (kFatal, 771, str);
372 } else {
373 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
374 factOut (kInfo, 771, str);
375 }
376 }
377
378 rd->sockStat = 99;
379
380 if (flag < 0) {
381 free (rd->rBuf); //and never open again
382#ifdef EVTDEBUG
383 free (rd->xBuf); //and never open again
384#endif
385 rd->rBuf = NULL;
386 return 0;
387 }
388
389
390 if (flag == 0) { //generate address and buffer ...
391 rd->Port = port;
392 rd->SockAddr.sin_family = sockAddr->sin_family;
393 rd->SockAddr.sin_port = htons (port);
394 rd->SockAddr.sin_addr = sockAddr->sin_addr;
395
396#ifdef EVTDEBUG
397 rd->xBuf = malloc (sizeof (CNV_FACT));
398#endif
399 rd->rBuf = malloc (sizeof (CNV_FACT));
400 if (rd->rBuf == NULL) {
401 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
402 factOut (kFatal, 774, str);
403 rd->sockStat = 77;
404 return -3;
405 }
406 }
407
408
409 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
410 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
411 factOut (kFatal, 773, str);
412 rd->sockStat = 88;
413 return -2;
414 }
415 optval = 1;
416 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
417 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
418 factOut (kInfo, 173, str); //but continue anyhow
419 }
420 optval = 10; //start after 10 seconds
421 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
422 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
423 factOut (kInfo, 173, str); //but continue anyhow
424 }
425 optval = 10; //do every 10 seconds
426 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
427 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
428 factOut (kInfo, 173, str); //but continue anyhow
429 }
430 optval = 2; //close after 2 unsuccessful tries
431 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
432 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
433 factOut (kInfo, 173, str); //but continue anyhow
434 }
435
436
437 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
438 factOut (kInfo, 773, str);
439 rd->sockStat = -1; //try to (re)open socket
440 rd->errCnt = 0;
441 return 0;
442
443} /*-----------------------------------------------------------------*/
444
445 /*-----------------------------------------------------------------*/
446
447
448
449
450int
451mBufInit ()
452{
453// initialize mBuffer (mark all entries as unused\empty)
454
455 int i,j;
456 uint32_t actime;
457
458 actime = g_actTime + 50000000;
459
460 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
461 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
462 mBuffer[i].runNum = 0;
463
464 evtCtrl.evtBuf[i] = -1;
465 evtCtrl.evtStat[i] = -1;
466 evtCtrl.pcTime[i] = actime; //initiate to far future
467
468 //ETIENNE
469 mBufferMapping[i].chunk = -1;
470 mBufferMapping[i].eventNumber = -1;
471 mBufferMapping[i].slot = -1;
472 for (j=0;j<MAX_CHUNKS;j++)
473 EtiMemoryChunks[j].pointers[0] = NULL;
474 //END ETIENNE
475 }
476
477 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
478
479 evtCtrl.frstPtr = 0;
480 evtCtrl.lastPtr = 0;
481
482 return 0;
483
484} /*-----------------------------------------------------------------*/
485
486
487
488
489int
490mBufEvt (int evID, uint runID, int nRoi[], int sk,
491 int fadlen, int trgTyp, int trgNum, int fadNum)
492{
493// generate a new Event into mBuffer:
494// make sure only complete Event are possible, so 'free' will always work
495// returns index into mBuffer[], or negative value in case of error
496// error: <-9000 if roi screwed up (not consistent with run)
497// <-8000 (not consistent with event)
498// <-7000 (not consistent with board)
499// < 0 if no space left
500
501 struct timeval *tv, atv;
502 tv = &atv;
503 uint32_t tsec, tusec;
504 uint oldest;
505 int jold;
506
507 int i, k, jr, b, evFree;
508 int headmem = 0;
509 size_t needmem = 0;
510
511
512 b = sk / 7;
513
514 if (nRoi[0] < 0 || nRoi[0] > 1024) {
515 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
516 factOut (kError, 999, str);
517 gj.badRoiR++;
518 gj.badRoi[b]++;
519 return -9999;
520 }
521
522 for (jr = 1; jr < 8; jr++) {
523 if (nRoi[jr] != nRoi[0]) {
524 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
525 nRoi[0]);
526 factOut (kError, 711, str);
527 gj.badRoiB++;
528 gj.badRoi[b]++;
529 return -7101;
530 }
531 }
532 if (nRoi[8] < nRoi[0]) {
533 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
534 factOut (kError, 712, str);
535 gj.badRoiB++;
536 gj.badRoi[b]++;
537 return -7102;
538 }
539
540
541 i = evID % MAX_EVT;
542 evFree = -1;
543
544 for (k = 0; k < MAX_RUN; k++) {
545 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
546 // is it ok ????
547 if (mBuffer[i].nRoi != nRoi[0]
548 || mBuffer[i].nRoiTM != nRoi[8]) {
549 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
550 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
551 factOut (kError, 821, str);
552 gj.badRoiE++;
553 gj.badRoi[b]++;
554 return -8201;
555 }
556// count for inconsistencies
557
558 if (mBuffer[i].trgNum != trgNum)
559 mBuffer[i].Errors[0]++;
560 if (mBuffer[i].fadNum != fadNum)
561 mBuffer[i].Errors[1]++;
562 if (mBuffer[i].trgTyp != trgTyp)
563 mBuffer[i].Errors[2]++;
564
565 //everything seems fine so far ==> use this slot ....
566 return i;
567 }
568 if (evFree < 0 && mBuffer[i].evNum < 0)
569 evFree = i;
570 i += MAX_EVT;
571 }
572
573
574 //event does not yet exist; create it
575 if (evFree < 0) { //no space available in ctrl
576 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
577 factOut (kError, 881, str);
578 return -1;
579 }
580 i = evFree; //found free entry; use it ...
581
582 gettimeofday (tv, NULL);
583 tsec = atv.tv_sec;
584 tusec = atv.tv_usec;
585
586 //check if runId already registered in runCtrl
587 evFree = -1;
588 oldest = g_actTime + 1000;
589 jold = -1;
590 for (k = 0; k < MAX_RUN; k++) {
591 if (runCtrl[k].runId == runID) {
592// if (runCtrl[k].procId > 0) { //run is closed -> reject
593// snprintf (str, MXSTR, "skip event since run %d finished", runID);
594// factOut (kInfo, 931, str);
595// return -21;
596// }
597
598 if (runCtrl[k].roi0 != nRoi[0]
599 || runCtrl[k].roi8 != nRoi[8]) {
600 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
601 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
602 factOut (kError, 931, str);
603 gj.badRoiR++;
604 gj.badRoi[b]++;
605 return -9301;
606 }
607 goto RUNFOUND;
608 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
609 evFree = k;
610 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
611 if (runCtrl[k].closeTime < oldest) {
612 oldest = runCtrl[k].closeTime;
613 jold = k;
614 }
615 }
616 }
617
618 if (evFree < 0 && jold < 0) {
619 snprintf (str, MXSTR, "not able to register the new run %d", runID);
620 factOut (kFatal, 883, str);
621 return -1001;
622 } else {
623 if (evFree < 0)
624 evFree = jold;
625 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
626 evFree, nRoi[0], nRoi[8]);
627 factOut (kInfo, 503, str);
628 runCtrl[evFree].runId = runID;
629 runCtrl[evFree].roi0 = nRoi[0];
630 runCtrl[evFree].roi8 = nRoi[8];
631 runCtrl[evFree].fileId = -2;
632 runCtrl[evFree].procId = -2;
633 runCtrl[evFree].lastEvt = -1;
634 runCtrl[evFree].nextEvt = 0;
635 runCtrl[evFree].actEvt = 0;
636 runCtrl[evFree].procEvt = 0;
637 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
638 runCtrl[evFree].firstUsec = tusec;
639 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
640 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
641// runCtrl[evFree].lastTime = 0;
642
643 runTail[evFree].nEventsOk =
644 runTail[evFree].nEventsRej =
645 runTail[evFree].nEventsBad =
646 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
647 }
648
649 RUNFOUND:
650 //ETIENNE
651/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
652
653 headmem = NBOARDS * sizeof (PEVNT_HEADER);
654
655 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
656 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
657 if (gi_memStat > 0) {
658 gi_memStat = -99;
659 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
660 evID, sk);
661 factOut (kError, 882, str);
662 } else {
663 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
664 evID, sk);
665 factOut (kDebug, 882, str);
666 }
667 return -11;
668 }
669*/
670 mBuffer[i].FADhead = ETI_Malloc(evID, i);
671 mBuffer[i].buffer = NULL;
672 if (mBuffer[i].FADhead != NULL)
673 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
674 else
675 {
676 mBuffer[i].fEvent = NULL;
677 gj.usdMem = 0;
678 for (k=0;k<numAllocatedChunks;k++)
679 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
680 if (gj.usdMem > gj.maxMem)
681 gj.maxMem = gj.usdMem;
682 if (gi_memStat > 0) {
683 gi_memStat = -99;
684 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
685 evID, sk);
686 factOut (kError, 882, str);
687 } else {
688 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
689 evID, sk);
690 factOut (kDebug, 882, str);
691 }
692 return -11;
693 }
694 /*
695 mBuffer[i].FADhead = malloc (headmem);
696 if (mBuffer[i].FADhead == NULL) {
697 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
698 factOut (kError, 882, str);
699 return -12;
700 }
701
702 mBuffer[i].fEvent = malloc (needmem);
703 if (mBuffer[i].fEvent == NULL) {
704 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
705 factOut (kError, 882, str);
706 free (mBuffer[i].FADhead);
707 mBuffer[i].FADhead = NULL;
708 return -22;
709 }
710
711 mBuffer[i].buffer = malloc (gi_maxSize);
712 if (mBuffer[i].buffer == NULL) {
713 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
714 factOut (kError, 882, str);
715 free (mBuffer[i].FADhead);
716 mBuffer[i].FADhead = NULL;
717 free (mBuffer[i].fEvent);
718 mBuffer[i].fEvent = NULL;
719 return -32;
720 }*/
721 //END ETIENNE
722 //flag all boards as unused
723 mBuffer[i].nBoard = 0;
724 for (k = 0; k < NBOARDS; k++) {
725 mBuffer[i].board[k] = -1;
726 }
727 //flag all pixels as unused
728 for (k = 0; k < NPIX; k++) {
729 mBuffer[i].fEvent->StartPix[k] = -1;
730 }
731 //flag all TMark as unused
732 for (k = 0; k < NTMARK; k++) {
733 mBuffer[i].fEvent->StartTM[k] = -1;
734 }
735
736 mBuffer[i].fEvent->NumBoards = 0;
737 mBuffer[i].fEvent->PCUsec = tusec;
738 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
739 mBuffer[i].nRoi = nRoi[0];
740 mBuffer[i].nRoiTM = nRoi[8];
741 mBuffer[i].evNum = evID;
742 mBuffer[i].runNum = runID;
743 mBuffer[i].fadNum = fadNum;
744 mBuffer[i].trgNum = trgNum;
745 mBuffer[i].trgTyp = trgTyp;
746 mBuffer[i].evtLen = needmem;
747 mBuffer[i].Errors[0] =
748 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
749//ETIENNE
750 //gj.usdMem += needmem + headmem + gi_maxSize;
751 gj.usdMem = 0;
752 for (k=0;k<numAllocatedChunks;k++)
753 {
754 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
755 }
756//END ETIENNE
757 if (gj.usdMem > gj.maxMem)
758 gj.maxMem = gj.usdMem;
759
760 gj.bufTot++;
761 if (gj.bufTot > gj.maxEvt)
762 gj.maxEvt = gj.bufTot;
763
764 gj.rateNew++;
765
766 //register event in 'active list (reading)'
767
768 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
769 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
770 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
771 evtIdx[i] = evtCtrl.lastPtr;
772
773
774 snprintf (str, MXSTR,
775 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
776 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
777 factOut (kDebug, -11, str);
778 evtCtrl.lastPtr++;
779 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
780 evtCtrl.lastPtr = 0;
781
782 gi.evtGet++;
783
784 return i;
785
786} /*-----------------------------------------------------------------*/
787
788
789
790
791int
792mBufFree (int i)
793{
794//delete entry [i] from mBuffer:
795//(and make sure multiple calls do no harm ....)
796
797 int headmem = 0;
798 int evid;
799 size_t freemem = 0;
800
801 evid = mBuffer[i].evNum;
802 freemem = mBuffer[i].evtLen;
803 //ETIENNE
804 ETI_Free(evid, i);
805// free (mBuffer[i].fEvent);
806 mBuffer[i].fEvent = NULL;
807
808// free (mBuffer[i].FADhead);
809 mBuffer[i].FADhead = NULL;
810
811// free (mBuffer[i].buffer);
812 mBuffer[i].buffer = NULL;
813//END ETIENNE
814 headmem = NBOARDS * sizeof (PEVNT_HEADER);
815 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
816 mBuffer[i].runNum = 0;
817//ETIENNE
818// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
819 gj.usdMem = 0;
820 int k;
821 for (k=0;k<numAllocatedChunks;k++)
822 {
823 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
824 }
825//END ETIENNE
826 gj.bufTot--;
827
828 if (gi_memStat < 0) {
829 if (gj.usdMem <= 0.75 * gj.maxMem)
830 gi_memStat = +1;
831 }
832
833
834 return 0;
835
836} /*-----------------------------------------------------------------*/
837
838
839void
840resetEvtStat ()
841{
842 int i;
843
844 for (i = 0; i < MAX_SOCK; i++)
845 gi.numRead[i] = 0;
846
847 for (i = 0; i < NBOARDS; i++) {
848 gi.gotByte[i] = 0;
849 gi.gotErr[i] = 0;
850
851 }
852
853 gi.evtGet = 0; //#new Start of Events read
854 gi.evtTot = 0; //#complete Events read
855 gi.evtErr = 0; //#Events with Errors
856 gi.evtSkp = 0; //#Events incomplete (timeout)
857
858 gi.procTot = 0; //#Events processed
859 gi.procErr = 0; //#Events showed problem in processing
860 gi.procTrg = 0; //#Events accepted by SW trigger
861 gi.procSkp = 0; //#Events rejected by SW trigger
862
863 gi.feedTot = 0; //#Events used for feedBack system
864 gi.feedErr = 0; //#Events rejected by feedBack
865
866 gi.wrtTot = 0; //#Events written to disk
867 gi.wrtErr = 0; //#Events with write-error
868
869 gi.runOpen = 0; //#Runs opened
870 gi.runClose = 0; //#Runs closed
871 gi.runErr = 0; //#Runs with open/close errors
872
873 return;
874} /*-----------------------------------------------------------------*/
875
876
877
/*
 * initReadFAD - currently nothing to initialize; kept as an empty hook.
 */
void
initReadFAD ()
{
   //intentionally empty
} /*-----------------------------------------------------------------*/
883
884
885
886void *
887readFAD (void *ptr)
888{
889/* *** main loop reading FAD data and sorting them to complete events */
890 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
891 int actBoards = 0, minLen;
892 int32_t jrd;
893 uint gi_SecTime; //time in seconds
894 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
895
896 int goodhed = 0;
897
898 int sockDef[NBOARDS]; //internal state of sockets
899 int jrdx;
900
901
902 struct timespec xwait;
903
904
905 struct timeval *tv, atv;
906 tv = &atv;
907 uint32_t tsec, tusec;
908
909
910 snprintf (str, MXSTR, "start initializing");
911 factOut (kInfo, -1, str);
912
913 int cpu = 7; //read thread
914 cpu_set_t mask;
915
916/* CPU_ZERO initializes all the bits in the mask to zero. */
917// CPU_ZERO (&mask);
918/* CPU_SET sets only the bit corresponding to cpu. */
919 cpu = 7;
920// CPU_SET (cpu, &mask);
921
922/* sched_setaffinity returns 0 in success */
923// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
924// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
925// factOut (kWarn, -1, str);
926// }
927
928
929 head_len = sizeof (PEVNT_HEADER);
930 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
931 minLen = head_len; //min #bytes needed to check header: full header for debug
932
933 start.S = 0xFB01;
934 stop.S = 0x04FE;
935
936/* initialize run control logics */
937 for (i = 0; i < MAX_RUN; i++) {
938 runCtrl[i].runId = 0;
939 runCtrl[i].fileId = -2;
940 runCtrl[i].procId = -2;
941 }
942 gi_resetS = gi_resetR = 9;
943 for (i = 0; i < NBOARDS; i++)
944 sockDef[i] = 0;
945
946 START:
947 gettimeofday (tv, NULL);
948 g_actTime = tsec = atv.tv_sec;
949 g_actUsec = tusec = atv.tv_usec;
950 gi_myRun = g_actTime;
951 evtCtrl.frstPtr = 0;
952 evtCtrl.lastPtr = 0;
953
954 gi_SecTime = g_actTime;
955 gi_runStat = g_runStat;
956 gj.readStat = g_runStat;
957 numok = numok2 = 0;
958
959 int cntsock = 8 - NUMSOCK ;
960
961 if (gi_resetS > 0) {
962 //make sure all sockets are preallocated as 'not exist'
963 for (i = 0; i < MAX_SOCK; i++) {
964 rd[i].socket = -1;
965 rd[i].sockStat = 99;
966 }
967
968 for (k = 0; k < NBOARDS; k++) {
969 gi_NumConnect[k] = 0;
970 gi.numConn[k] = 0;
971 gj.numConn[k] = 0;
972 gj.errConn[k] = 0;
973 gj.rateBytes[k] = 0;
974 gj.totBytes[k] = 0;
975 }
976
977 }
978
979
980 if (gi_resetR > 0) {
981 resetEvtStat ();
982 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
983 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
984 gj.totMem = g_maxMem;
985 gj.bufNew = gj.bufEvt = 0;
986 gj.badRoiE = gj.badRoiR = gj.badRoiB =
987 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
988
989 int b,p;
990 for (b = 0; b < NBOARDS; b++)
991 gj.badRoi[b] = 0;
992
993 mBufInit (); //initialize buffers
994
995 snprintf (str, MXSTR, "end initializing");
996 factOut (kInfo, -1, str);
997 }
998
999
1000 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1001
1002 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1003
1004 gi_runStat = g_runStat;
1005 gj.readStat = g_runStat;
1006 gettimeofday (tv, NULL);
1007 g_actTime = tsec = atv.tv_sec;
1008 g_actUsec = tusec = atv.tv_usec;
1009
1010
1011 int b, p, p0, s0, nch;
1012 nch = 0;
1013 for (b = 0; b < NBOARDS; b++) {
1014 k = b * 7;
1015 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1016 nch++;
1017 gi_NumConnect[b] = 0; //must close all connections
1018 gi.numConn[b] = 0;
1019 gj.numConn[b] = 0;
1020 if (sockDef[b] == 0)
1021 s0 = 0; //sockets to be defined and opened
1022 else if (g_port[b].sockDef == 0)
1023 s0 = -1; //sockets to be destroyed
1024 else
1025 s0 = +1; //sockets to be closed and reopened
1026
1027 if (s0 == 0)
1028 p0 = ntohs (g_port[b].sockAddr.sin_port);
1029 else
1030 p0 = 0;
1031
1032 for (p = p0 + 1; p < p0 + 8; p++) {
1033 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1034 k++;
1035 }
1036 sockDef[b] = g_port[b].sockDef;
1037 }
1038 }
1039
1040 if (nch > 0) {
1041 actBoards = 0;
1042 for (b = 0; b < NBOARDS; b++) {
1043 if (sockDef[b] > 0)
1044 actBoards++;
1045 }
1046 }
1047
1048
1049 jrdx = 0;
1050 numokx = 0;
1051 numok = 0; //count number of succesfull actions
1052
1053 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1054 b = i / 7 ;
1055 p = i % 7 ;
1056
1057if ( p >= NUMSOCK) { ; }
1058else {
1059 if (sockDef[b] > 0)
1060 s0 = +1;
1061 else
1062 s0 = -1;
1063
1064 if (rd[i].sockStat < 0) { //try to connect if not yet done
1065 if (rd[i].sockStat == -1) {
1066 rd[i].sockStat = connect (rd[i].socket,
1067 (struct sockaddr *) &rd[i].SockAddr,
1068 sizeof (rd[i].SockAddr));
1069 if (rd[i].sockStat == -1) {
1070 rd[i].errCnt++ ;
1071// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1072// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1073// else rd[i].sockStat = 10000 ;
1074 }
1075//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1076
1077 }
1078 if (rd[i].sockStat < -1 ) {
1079 rd[i].sockStat++ ;
1080 }
1081 if (rd[i].sockStat == 0) { //successfull ==>
1082 if (sockDef[b] > 0) {
1083 rd[i].bufTyp = 0; // expect a header
1084 rd[i].bufLen = frst_len; // max size to read at begining
1085 } else {
1086 rd[i].bufTyp = -1; // full data to be skipped
1087 rd[i].bufLen = MAX_LEN; //huge for skipping
1088 }
1089 rd[i].bufPos = 0; // no byte read so far
1090 rd[i].skip = 0; // start empty
1091// gi_NumConnect[b]++;
1092 gi_NumConnect[b] += cntsock ;
1093
1094 gi.numConn[b]++;
1095 gj.numConn[b]++;
1096 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1097 factOut (kInfo, -1, str);
1098 }
1099 }
1100
1101 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1102 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1103 numok++;
1104 size_t maxread = rd[i].bufLen ;
1105 if (maxread > MAXREAD ) maxread=MAXREAD ;
1106
1107 jrd =
1108 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1109 maxread, MSG_DONTWAIT);
1110// rd[i].bufLen, MSG_DONTWAIT);
1111
1112 if (jrd > 0) {
1113 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1114#ifdef EVTDEBUG
1115 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1116 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1117 snprintf (str, MXSTR,
1118 "read sock %3d bytes %5d len %5d first %d %d", i,
1119 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1120 rd[i].rBuf->B[rd[i].bufPos + 1]);
1121 factOut (kDebug, 301, str);
1122#endif
1123 }
1124
1125 if (jrd == 0) { //connection has closed ...
1126 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1127 factOut (kInfo, 441, str);
1128 GenSock (s0, i, 0, NULL, &rd[i]);
1129 gi.gotErr[b]++;
1130// gi_NumConnect[b]--;
1131 gi_NumConnect[b]-= cntsock ;
1132 gi.numConn[b]--;
1133 gj.numConn[b]--;
1134
1135 } else if (jrd < 0) { //did not read anything
1136 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1137 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1138 factOut (kError, 442, str);
1139 gi.gotErr[b]++;
1140 } else
1141 numok--; //else nothing waiting to be read
1142 jrd = 0;
1143 }
1144 } else {
1145 jrd = 0; //did read nothing as requested
1146 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1147 rd[i].bufLen);
1148 factOut (kDebug, 301, str);
1149 }
1150
1151 gi.gotByte[b] += jrd;
1152 gj.rateBytes[b] += jrd;
1153
1154 if (jrd > 0) {
1155 numokx++;
1156 jrdx += jrd;
1157 }
1158
1159
1160 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1161// just do nothing
1162#ifdef EVTDEBUG
1163 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1164 i);
1165 factOut (kInfo, 301, str);
1166#endif
1167
1168 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1169 if (jrd < rd[i].bufLen) { //not yet all read
1170 rd[i].bufPos += jrd; //==> prepare for continuation
1171 rd[i].bufLen -= jrd;
1172 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1173 } else { //full dataset read
1174 rd[i].bufLen = 0;
1175 rd[i].bufPos = rd[i].fadLen;
1176 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1177 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1178 gi.evtErr++;
1179 snprintf (str, MXSTR,
1180 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1181 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1182 rd[i].rBuf->B[1],
1183 rd[i].rBuf->B[rd[i].bufPos - 2],
1184 rd[i].rBuf->B[rd[i].bufPos - 1]);
1185 factOut (kError, 301, str);
1186 goto EndBuf;
1187
1188#ifdef EVTDEBUG
1189 } else {
1190 snprintf (str, MXSTR,
1191 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1192 i, rd[i].fadLen, rd[i].rBuf->B[0],
1193 rd[i].rBuf->B[1], start.B[1], start.B[0],
1194 rd[i].rBuf->B[rd[i].bufPos - 2],
1195 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1196 stop.B[0]);
1197 factOut (kDebug, 301, str);
1198#endif
1199 }
1200
1201 if (jrd > 0)
1202 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1203
1204/* //we have a complete buffer, copy to WORK area
1205 int jr, kr;
1206 int checkRoi;
1207 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1208 for (kr = 1; kr < 4; kr++)
1209 {
1210 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1211 + kr*(roi[jr-1]+4)]);
1212 if (checkRoi != roi[0])
1213 {
1214 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1215 factOut (kFatal, 1, str);
1216 goto EndBuf;
1217 }
1218
1219 }
1220 for (jr = 1; jr < 9; jr++) {
1221 roi[jr] =
1222 ntohs (rd[i].
1223 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1224
1225 }
1226*/
1227 //we have a complete buffer, copy to WORK area
1228 int jr, kr;
1229 int checkRoi;
1230 int roiHopper = head_len/2 + 2; //points to the very first roi
1231 roi[0] = ntohs (rd[i].rBuf->S[roiHopper]);
1232 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1233 for (kr = 1; kr < 4; kr++)
1234 {
1235 checkRoi = ntohs(rd[i].rBuf->S[roiHopper]);
1236 if (checkRoi != roi[0])
1237 {
1238 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1239 factOut (kError, 1, str);
1240 goto EndBuf;
1241 }
1242 roiHopper += checkRoi+4;
1243 }
1244 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1245 for (jr = 1; jr < 9; jr++) {
1246 roi[jr] = ntohs(rd[i].rBuf->S[roiHopper]);
1247 checkRoi = roi[jr];
1248 for (kr = 1; kr < 4; kr++)
1249 {
1250 roiHopper += checkRoi+4;
1251 checkRoi = ntohs(rd[i].rBuf->S[roiHopper]);
1252 if (checkRoi != roi[jr])
1253 {
1254 snprintf (str, MXSTR, "Inconsistent Roi accross board patches B=%d P=%d %d %d", kr, jr, roi[jr], checkRoi);
1255 factOut (kFatal, 1, str);
1256 goto EndBuf;
1257 }
1258 }
1259 }
1260 //get index into mBuffer for this event (create if needed)
1261
1262 int actid;
1263 if (g_useFTM > 0)
1264 actid = rd[i].evtID;
1265 else
1266 actid = rd[i].ftmID;
1267
1268 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1269 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1270 rd[i].evtID);
1271
1272 if (evID < -1000) {
1273 goto EndBuf; //not usable board/event/run --> skip it
1274 }
1275 if (evID < 0) { //no space left, retry later
1276#ifdef EVTDEBUG
1277 if (rd[i].bufLen != 0) {
1278 snprintf (str, MXSTR, "something screwed up");
1279 factOut (kFatal, 1, str);
1280 }
1281#endif
1282 xwait.tv_sec = 0;
1283 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1284 nanosleep (&xwait, NULL);
1285 goto EndBuf1; //hope there is free space next round
1286 }
1287 //we have a valid entry in mBuffer[]; fill it
1288
1289#ifdef EVTDEBUG
1290 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1291 rd[i].fadLen);
1292 if (xchk != 0) {
1293 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1294 xchk, rd[i].fadLen, i);
1295 factOut (kFatal, 1, str);
1296
1297 uint iq;
1298 for (iq = 0; iq < rd[i].fadLen; iq++) {
1299 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1300 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1301 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1302 factOut (kFatal, 1, str);
1303 }
1304 }
1305 }
1306#endif
1307 int qncpy = 0;
1308 boardId = b;
1309 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1310 int fadCrate = fadBoard / 256;
1311 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1312 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1313 fadCrate, fadBoard % 256, boardId);
1314 factOut (kWarn, 301, str);
1315 }
1316 if (mBuffer[evID].board[boardId] != -1) {
1317 snprintf (str, MXSTR,
1318 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1319 evID, boardId, i, rd[i].fadLen,
1320 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1321 rd[i].rBuf->B[rd[i].bufPos - 2],
1322 rd[i].rBuf->B[rd[i].bufPos - 1]);
1323 factOut (kWarn, 501, str);
1324 goto EndBuf; //--> skip Board
1325 }
1326
1327 int iDx = evtIdx[evID]; //index into evtCtrl
1328
1329 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1330 &rd[i].rBuf->S[0], head_len);
1331 qncpy += head_len;
1332
1333 src = head_len / 2;
1334 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1335 for (drs = 0; drs < 4; drs++) {
1336 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1337 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1338 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1339//here we should check if pixH is correct ....
1340
1341 pixS = boardId * 36 + drs * 9 + px;
1342 src++;
1343
1344
1345 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1346 dest = pixS * roi[0];
1347 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1348 &rd[i].rBuf->S[src], roi[0] * 2);
1349 qncpy += roi[0] * 2;
1350 src += pixR;
1351
1352 if (px == 8) {
1353 tmS = boardId * 4 + drs;
1354 if (pixR > roi[0]) { //and we have additional TM info
1355 dest = tmS * roi[0] + NPIX * roi[0];
1356 int srcT = src - roi[0];
1357 mBuffer[evID].fEvent->StartTM[tmS] =
1358 (pixC + pixR - roi[0]) % 1024;
1359 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1360 &rd[i].rBuf->S[srcT], roi[0] * 2);
1361 qncpy += roi[0] * 2;
1362 } else {
1363 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1364 //ETIENNE because the TM channels are always processed during drs calib,
1365 //set them to zero if they are not present
1366 //I suspect that it may be more efficient to set all the allocated mem to
1367 //zero when allocating it
1368// dest = tmS*roi[0] + NPIX*roi[0];
1369 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1370 }
1371 }
1372 }
1373 } // now we have stored a new board contents into Event structure
1374
1375 mBuffer[evID].fEvent->NumBoards++;
1376 mBuffer[evID].board[boardId] = boardId;
1377 evtCtrl.evtStat[iDx]++;
1378 evtCtrl.pcTime[iDx] = g_actTime;
1379
1380 if (++mBuffer[evID].nBoard >= actBoards) {
1381 int qnrun = 0;
1382 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1383 actrun = mBuffer[evID].runNum;
1384 int ir;
1385 for (ir = 0; ir < MAX_RUN; ir++) {
1386 qnrun++;
1387 if (runCtrl[ir].runId == actrun) {
1388 if (++runCtrl[ir].lastEvt == 0) {
1389 gotNewRun (actrun, mBuffer[evID].FADhead);
1390 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1391 mBuffer[evID].runNum,
1392 mBuffer[evID].evNum);
1393 factOut (kInfo, 1, str);
1394 break;
1395 }
1396 }
1397 }
1398 }
1399 snprintf (str, MXSTR,
1400 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1401 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1402 qncpy, qnrun);
1403 factOut (kDebug, -1, str);
1404
1405 //complete event read ---> flag for next processing
1406 evtCtrl.evtStat[iDx] = 99;
1407 gi.evtTot++;
1408 }
1409
1410 EndBuf:
1411 rd[i].bufTyp = 0; //ready to read next header
1412 rd[i].bufLen = frst_len;
1413 rd[i].bufPos = 0;
1414 EndBuf1:
1415 ;
1416 }
1417
1418 } else { //we are reading event header
1419 rd[i].bufPos += jrd;
1420 rd[i].bufLen -= jrd;
1421 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1422 //check if startflag correct; else shift block ....
1423 for (k = 0; k < rd[i].bufPos - 1; k++) {
1424 if (rd[i].rBuf->B[k] == start.B[1]
1425 && rd[i].rBuf->B[k + 1] == start.B[0])
1426 break;
1427 }
1428 rd[i].skip += k;
1429
1430 if (k >= rd[i].bufPos - 1) { //no start of header found
1431 rd[i].bufPos = 0;
1432 rd[i].bufLen = head_len;
1433 } else if (k > 0) {
1434 rd[i].bufPos -= k;
1435 rd[i].bufLen += k;
1436 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1437 rd[i].bufPos);
1438#ifdef EVTDEBUG
1439 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1440 rd[i].bufPos);
1441#endif
1442 }
1443 if (rd[i].bufPos >= minLen) {
1444 if (rd[i].skip > 0) {
1445 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1446 rd[i].skip, i);
1447 factOut (kInfo, 666, str);
1448 rd[i].skip = 0;
1449 }
1450 goodhed++;
1451 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1452 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1453 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1454 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1455 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1456 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1457 rd[i].bufTyp = 1; //ready to read full record
1458 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1459
1460 int fadboard = ntohs (rd[i].rBuf->S[12]);
1461 int fadcrate = fadboard / 256;
1462 fadboard = (fadcrate * 10 + fadboard % 256);
1463#ifdef EVTDEBUG
1464 snprintf (str, MXSTR,
1465 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1466 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1467 rd[i].runID, fadboard, jrd);
1468 factOut (kDebug, 1, str);
1469#endif
1470
1471 if (rd[i].runID == 0)
1472 rd[i].runID = gi_myRun;
1473
1474 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1475 snprintf (str, MXSTR,
1476 "illegal event-length on port %d", i);
1477 factOut (kFatal, 881, str);
1478 rd[i].bufLen = 100000; //?
1479 }
1480 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1481 debugHead (i, fadBoard, rd[i].rBuf);
1482 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1483 } else {
1484 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1485 }
1486 } else {
1487 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1488 }
1489
1490 } //end interpreting last read
1491}
1492 } //end of successful read anything
1493 } //finished trying to read all sockets
1494
1495#ifdef EVTDEBUG
1496 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1497 factOut (kDebug, -1, str);
1498#endif
1499
1500 gi.numRead[numok]++;
1501
1502 g_actTime = time (NULL);
1503 if (g_actTime > gi_SecTime) {
1504 gi_SecTime = g_actTime;
1505
1506
1507 //loop over all active events and flag those older than read-timeout
1508 //delete those that are written to disk ....
1509
1510 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1511 if (kd < 0)
1512 kd += (MAX_EVT * MAX_RUN);
1513
1514 gj.bufNew = gj.bufEvt = 0;
1515 int k1 = evtCtrl.frstPtr;
1516 for (k = k1; k < (k1 + kd); k++) {
1517 int k0 = k % (MAX_EVT * MAX_RUN);
1518//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1519
1520 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1521 gj.bufNew++; //incomplete event in Buffer
1522 if (evtCtrl.evtStat[k0] < 90
1523 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1524 int id = evtCtrl.evtBuf[k0];
1525 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1526 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1527 evtCtrl.evtStat[k0]);
1528 factOut (kWarn, 601, str);
1529
1530 int ik,ib,jb;
1531 ik=0;
1532 for (ib=0; ib<NBOARDS; ib++) {
1533 if (ib%10==0) {
1534 snprintf (&str[ik], MXSTR, "'");
1535 ik++;
1536 }
1537 jb = mBuffer[id].board[ib];
1538 if ( jb<0 ) {
1539 if (gi_NumConnect[b] >0 ) {
1540 snprintf (&str[ik], MXSTR, ".");
1541 } else {
1542 snprintf (&str[ik], MXSTR, "x");
1543 }
1544 } else {
1545 snprintf (&str[ik], MXSTR, "%d",jb%10);
1546 }
1547 ik++;
1548 }
1549 snprintf (&str[ik], MXSTR, "'");
1550 factOut (kWarn, 601, str);
1551
1552 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1553 gi.evtSkp++;
1554 gi.evtTot++;
1555 gj.evtSkip++;
1556 }
1557 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1558 || evtCtrl.evtStat[k0] == 0) { //'useless'
1559
1560 int id = evtCtrl.evtBuf[k0];
1561 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1562 mBuffer[id].evNum, mBuffer[id].nBoard);
1563 factOut (kDebug, -1, str);
1564 mBufFree (id); //event written--> free memory
1565 evtCtrl.evtStat[k0] = -1;
1566 gj.evtWrite++;
1567 gj.rateWrite++;
1568 } else if (evtCtrl.evtStat[k0] >= 95) {
1569 gj.bufEvt++; //complete event in Buffer
1570 }
1571
1572 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1573 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1574 }
1575 }
1576
1577
1578 gj.deltaT = 1000; //temporary, must be improved
1579
1580 int b;
1581 for (b = 0; b < NBOARDS; b++)
1582 gj.totBytes[b] += gj.rateBytes[b];
1583 gj.totMem = g_maxMem;
1584 if (gj.maxMem > gj.xxxMem)
1585 gj.xxxMem = gj.maxMem;
1586 if (gj.maxEvt > gj.xxxEvt)
1587 gj.xxxEvt = gj.maxEvt;
1588
1589 factStat (gj);
1590 factStatNew (gi);
1591 gj.rateNew = gj.rateWrite = 0;
1592 gj.maxMem = gj.usdMem;
1593 gj.maxEvt = gj.bufTot;
1594 for (b = 0; b < NBOARDS; b++)
1595 gj.rateBytes[b] = 0;
1596 }
1597
1598 if (numok > 0)
1599 numok2 = 0;
1600 else if (numok2++ > 3) {
1601 if (g_runStat == 1) {
1602 xwait.tv_sec = 1;
1603 xwait.tv_nsec = 0; // hibernate for 1 sec
1604 } else {
1605 xwait.tv_sec = 0;
1606 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1607 }
1608 nanosleep (&xwait, NULL);
1609 }
1610
1611 } //and do next loop over all sockets ...
1612
1613
1614 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1615 factOut (kInfo, -1, str);
1616
1617 if (g_reset > 0) {
1618 gi_reset = g_reset;
1619 gi_resetR = gi_reset % 10; //shall we stop reading ?
1620 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1621 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1622 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1623 g_reset = 0;
1624 } else {
1625 gi_reset = 0;
1626 if (g_runStat == -1)
1627 gi_resetR = 1;
1628 else
1629 gi_resetR = 7;
1630 gi_resetS = 7; //close all sockets
1631 gi_resetW = 7; //close all files
1632 gi_resetX = 0;
1633
1634 //inform others we have to quit ....
1635 gi_runStat = -11; //inform all that no update to happen any more
1636 gj.readStat = -11; //inform all that no update to happen any more
1637 }
1638
1639 if (gi_resetS > 0) {
1640 //must close all open sockets ...
1641 snprintf (str, MXSTR, "close all sockets ...");
1642 factOut (kInfo, -1, str);
1643 for (i = 0; i < MAX_SOCK; i++) {
1644 if (rd[i].sockStat == 0) {
1645 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1646 if (i % 7 == 0) {
1647// gi_NumConnect[i / 7]--;
1648 gi_NumConnect[i / 7]-= cntsock ;
1649 gi.numConn[i / 7]--;
1650 gj.numConn[i / 7]--;
1651 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1652 rd[i / 7].sockStat = -1; //and try to open asap
1653 }
1654 }
1655 }
1656 }
1657
1658
1659 if (gi_resetR > 0) {
1660 //flag all events as 'read finished'
1661 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1662 if (kd < 0)
1663 kd += (MAX_EVT * MAX_RUN);
1664
1665 int k1 = evtCtrl.frstPtr;
1666 for (k = k1; k < (k1 + kd); k++) {
1667 int k0 = k % (MAX_EVT * MAX_RUN);
1668 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1669 evtCtrl.evtStat[k0] = 91;
1670 gi.evtSkp++;
1671 gi.evtTot++;
1672 }
1673 }
1674
1675 xwait.tv_sec = 0;
1676 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1677 nanosleep (&xwait, NULL);
1678
1679 //and clear all buffers (might have to wait until all others are done)
1680 int minclear;
1681 if (gi_resetR == 1) {
1682 minclear = 900;
1683 snprintf (str, MXSTR, "drain all buffers ...");
1684 } else {
1685 minclear = 0;
1686 snprintf (str, MXSTR, "flush all buffers ...");
1687 }
1688 factOut (kInfo, -1, str);
1689
1690 int numclear = 1;
1691 while (numclear > 0) {
1692 numclear = 0;
1693 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1694 if (kd < 0)
1695 kd += (MAX_EVT * MAX_RUN);
1696
1697 int k1 = evtCtrl.frstPtr;
1698 for (k = k1; k < (k1 + kd); k++) {
1699 int k0 = k % (MAX_EVT * MAX_RUN);
1700 if (evtCtrl.evtStat[k0] > minclear) {
1701 int id = evtCtrl.evtBuf[k0];
1702 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1703 mBuffer[id].evNum, mBuffer[id].nBoard);
1704 factOut (kDebug, -1, str);
1705 mBufFree (id); //event written--> free memory
1706 evtCtrl.evtStat[k0] = -1;
1707 } else if (evtCtrl.evtStat[k0] > 0)
1708 numclear++; //writing is still ongoing...
1709
1710 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1711 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1712 }
1713
1714 xwait.tv_sec = 0;
1715 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1716 nanosleep (&xwait, NULL);
1717 }
1718 }
1719
1720 if (gi_reset > 0) {
1721 if (gi_resetW > 0) {
1722 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1723 }
1724 if (gi_resetX > 0) {
1725 xwait.tv_sec = gi_resetX;
1726 xwait.tv_nsec = 0;
1727 nanosleep (&xwait, NULL);
1728 }
1729
1730 snprintf (str, MXSTR, "Continue read Process ...");
1731 factOut (kInfo, -1, str);
1732 gi_reset = 0;
1733 goto START;
1734 }
1735
1736
1737
1738 snprintf (str, MXSTR, "Exit read Process ...");
1739 factOut (kInfo, -1, str);
1740 gi_runStat = -99;
1741 gj.readStat = -99;
1742 factStat (gj);
1743 factStatNew (gi);
1744 return 0;
1745
1746} /*-----------------------------------------------------------------*/
1747
1748
1749void *
1750subProc (void *thrid)
1751{
1752 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1753 struct timespec xwait;
1754
1755 int32_t cntr ;
1756
1757 threadID = (int) thrid;
1758
1759 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1760 factOut (kInfo, -1, str);
1761
1762 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1763 numWait = numProc = 0;
1764 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1765 if (kd < 0)
1766 kd += (MAX_EVT * MAX_RUN);
1767
1768 int k1 = evtCtrl.frstPtr;
1769 for (k = k1; k < (k1 + kd); k++) {
1770 int k0 = k % (MAX_EVT * MAX_RUN);
1771
1772 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1773 if (gi_resetR > 1) { //we are asked to flush buffers asap
1774 jret = 9100; //flag to be deleted
1775 } else {
1776 int id = evtCtrl.evtBuf[k0];
1777
1778
1779 jret =
1780 subProcEvt (threadID, mBuffer[id].FADhead,
1781 mBuffer[id].fEvent, mBuffer[id].buffer);
1782
1783
1784 if (jret <= threadID) {
1785 snprintf (str, MXSTR,
1786 "process %d wants to send to process %d",
1787 threadID, jret);
1788 factOut (kError, -1, str);
1789 jret = 5300;
1790 } else if (jret <= 0)
1791 jret = 9200 + threadID; //flag as 'to be deleted'
1792 else if (jret >= gi_maxProc)
1793 jret = 5200 + threadID; //flag as 'to be written'
1794 else
1795 jret = 1000 + jret; //flag for next proces
1796 }
1797 evtCtrl.evtStat[k0] = jret;
1798 numProc++;
1799 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1800 numWait++;
1801 }
1802
1803 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1804 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1805 factOut (kInfo, -1, str);
1806 return 0;
1807 }
1808 if (numProc == 0) {
1809 //seems we have nothing to do, so sleep a little
1810 xwait.tv_sec = 0;
1811 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1812 nanosleep (&xwait, NULL);
1813 }
1814 }
1815
1816 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1817 factOut (kInfo, -1, str);
1818 return;
1819} /*-----------------------------------------------------------------*/
1820
1821
1822void *
1823procEvt (void *ptr)
1824{
1825/* *** main loop processing file, including SW-trigger */
1826 int numProc, numWait;
1827 int k, status, j;
1828 struct timespec xwait;
1829 char str[MXSTR];
1830
1831
1832
1833 int lastRun = 0; //usually run from last event still valid
1834
1835 cpu_set_t mask;
1836 int cpu = 1; //process thread (will be several in final version)
1837
1838 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1839 gi_maxProc);
1840 factOut (kInfo, -1, str);
1841
1842/* CPU_ZERO initializes all the bits in the mask to zero. */
1843// CPU_ZERO (&mask);
1844/* CPU_SET sets only the bit corresponding to cpu. */
1845// CPU_SET( 0 , &mask ); leave for system
1846// CPU_SET( 1 , &mask ); used by write process
1847// CPU_SET (2, &mask);
1848// CPU_SET (3, &mask);
1849// CPU_SET (4, &mask);
1850// CPU_SET (5, &mask);
1851// CPU_SET (6, &mask);
1852// CPU_SET( 7 , &mask ); used by read process
1853/* sched_setaffinity returns 0 in success */
1854// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1855// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1856// factOut (kWarn, -1, str);
1857// }
1858
1859
1860 pthread_t thread[100];
1861 int th_ret[100];
1862
1863 for (k = 0; k < gi_maxProc; k++) {
1864 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1865 }
1866
1867 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1868
1869 numWait = numProc = 0;
1870 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1871 if (kd < 0)
1872 kd += (MAX_EVT * MAX_RUN);
1873
1874 int k1 = evtCtrl.frstPtr;
1875 for (k = k1; k < (k1 + kd); k++) {
1876 int k0 = k % (MAX_EVT * MAX_RUN);
1877//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1878 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1879
1880 if (gi_resetR > 1) { //we are asked to flush buffers asap
1881 evtCtrl.evtStat[k0] = 9991;
1882 } else {
1883
1884//-------- it is better to open the run already here, so call can be used to initialize
1885//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1886 int id = evtCtrl.evtBuf[k0];
1887 uint32_t irun = mBuffer[id].runNum;
1888 int32_t ievt = mBuffer[id].evNum;
1889 if (runCtrl[lastRun].runId == irun) {
1890 j = lastRun;
1891 } else {
1892 //check which fileID to use (or open if needed)
1893 for (j = 0; j < MAX_RUN; j++) {
1894 if (runCtrl[j].runId == irun)
1895 break;
1896 }
1897 if (j >= MAX_RUN) {
1898 snprintf (str, MXSTR,
1899 "P error: can not find run %d for event %d in %d",
1900 irun, ievt, id);
1901 factOut (kFatal, 901, str);
1902 }
1903 lastRun = j;
1904 }
1905
1906 if (runCtrl[j].fileId < 0) {
1907//---- we need to open a new run ==> make sure all older runs are
1908//---- finished and marked to be closed ....
1909 int j1;
1910 for (j1 = 0; j1 < MAX_RUN; j1++) {
1911 if (runCtrl[j1].fileId == 0) {
1912 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1913//---- problem: processing still going on ==> must wait for closing ....
1914 snprintf (str, MXSTR,
1915 "P finish run since new one opened %d",
1916 runCtrl[j1].runId);
1917 runFinish1 (runCtrl[j1].runId);
1918 }
1919
1920 }
1921
1922 actRun.Version = 1;
1923 actRun.RunType = -1; //to be adapted
1924
1925 actRun.Nroi = runCtrl[j].roi0;
1926 actRun.NroiTM = runCtrl[j].roi8;
1927//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1928// if (actRun.Nroi == actRun.NroiTM)
1929// actRun.NroiTM = 0;
1930 actRun.RunTime = runCtrl[j].firstTime;
1931 actRun.RunUsec = runCtrl[j].firstTime;
1932 actRun.NBoard = NBOARDS;
1933 actRun.NPix = NPIX;
1934 actRun.NTm = NTMARK;
1935 actRun.Nroi = mBuffer[id].nRoi;
1936 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1937 NBOARDS * sizeof (PEVNT_HEADER));
1938
1939 runCtrl[j].fileHd =
1940 runOpen (irun, &actRun, sizeof (actRun));
1941 if (runCtrl[j].fileHd == NULL) {
1942 snprintf (str, MXSTR,
1943 "P could not open a file for run %d", irun);
1944 factOut (kError, 502, str);
1945 runCtrl[j].fileId = 91;
1946 runCtrl[j].procId = 91;
1947 } else {
1948 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1949 irun, ievt);
1950 factOut (kInfo, -1, str);
1951 runCtrl[j].fileId = 0;
1952 runCtrl[j].procId = 0;
1953 }
1954
1955 }
1956//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1957 if (runCtrl[j].procId == 0) {
1958 if (runCtrl[j].closeTime < g_actTime
1959 || runCtrl[j].lastTime < g_actTime - 300
1960 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1961 snprintf (str, MXSTR,
1962 "P reached end of run condition for run %d",
1963 irun);
1964 factOut (kInfo, 502, str);
1965 runFinish1 (runCtrl[j].runId);
1966 runCtrl[j].procId = 1;
1967 }
1968 }
1969 if (runCtrl[j].procId != 0) {
1970 snprintf (str, MXSTR,
1971 "P skip event %d because no active run %d", ievt,
1972 irun);
1973 factOut (kDebug, 502, str);
1974 evtCtrl.evtStat[k0] = 9091;
1975 } else {
1976//--------
1977//--------
1978 id = evtCtrl.evtBuf[k0];
1979 int itevt = mBuffer[id].trgNum;
1980 int itrg = mBuffer[id].trgTyp;
1981 int roi = mBuffer[id].nRoi;
1982 int roiTM = mBuffer[id].nRoiTM;
1983
1984//make sure unused pixels/tmarks are cleared to zero
1985//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1986// if (roiTM == roi)
1987// roiTM = 0;
1988 int ip, it, dest, ib;
1989 for (ip = 0; ip < NPIX; ip++) {
1990 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1991 dest = ip * roi;
1992 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1993 }
1994 }
1995
1996 for (it = 0; it < NTMARK; it++) {
1997 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1998 dest = it * roi + NPIX * roi;
1999 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
2000 }
2001 }
2002
2003
2004//and set correct event header ; also check for consistency in event (not yet)
2005 mBuffer[id].fEvent->Roi = roi;
2006 mBuffer[id].fEvent->RoiTM = roiTM;
2007 mBuffer[id].fEvent->EventNum = ievt;
2008 mBuffer[id].fEvent->TriggerNum = itevt;
2009 mBuffer[id].fEvent->TriggerType = itrg;
2010 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2011 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2012 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2013 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2014 mBuffer[id].fEvent->SoftTrig = 0;
2015
2016
2017 for (ib = 0; ib < NBOARDS; ib++) {
2018 if (mBuffer[id].board[ib] == -1) { //board is not read
2019 mBuffer[id].FADhead[ib].start_package_flag = 0;
2020 mBuffer[id].fEvent->BoardTime[ib] = 0;
2021 } else {
2022 mBuffer[id].fEvent->BoardTime[ib] =
2023 ntohl (mBuffer[id].FADhead[ib].time);
2024 }
2025 }
2026 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2027 mBuffer[id].fEvent);
2028 gi.procTot++;
2029 numProc++;
2030
2031 if (i < 0) {
2032 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2033 gi.procErr++;
2034 } else {
2035 evtCtrl.evtStat[k0] = 1000;
2036 runCtrl[j].procEvt++;
2037 }
2038 }
2039 }
2040 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2041 numWait++;
2042 }
2043 }
2044
2045 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2046 snprintf (str, MXSTR, "Exit Processing Process ...");
2047 factOut (kInfo, -1, str);
2048 gp_runStat = -22; //==> we should exit
2049 gj.procStat = -22; //==> we should exit
2050 return 0;
2051 }
2052
2053 if (numProc == 0) {
2054 //seems we have nothing to do, so sleep a little
2055 xwait.tv_sec = 0;
2056 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2057 nanosleep (&xwait, NULL);
2058 }
2059 gp_runStat = gi_runStat;
2060 gj.procStat = gj.readStat;
2061
2062 }
2063
2064 //we are asked to abort asap ==> must flag all remaining events
2065 // when gi_runStat claims that all events are in the buffer...
2066
2067 snprintf (str, MXSTR, "Abort Processing Process ...");
2068 factOut (kInfo, -1, str);
2069 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2070 if (kd < 0)
2071 kd += (MAX_EVT * MAX_RUN);
2072
2073 for (k = 0; k < gi_maxProc; k++) {
2074 pthread_join (thread[k], (void **) &status);
2075 }
2076
2077 int k1 = evtCtrl.frstPtr;
2078 for (k = k1; k < (k1 + kd); k++) {
2079 int k0 = k % (MAX_EVT * MAX_RUN);
2080 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2081 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2082 }
2083 }
2084
2085 gp_runStat = -99;
2086 gj.procStat = -99;
2087
2088 return 0;
2089
2090} /*-----------------------------------------------------------------*/
2091
2092int
2093CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2094{
2095/* close run runId (all all runs if runId=0) */
2096/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2097 int j;
2098
2099
2100 if (runId == 0) {
2101 for (j = 0; j < MAX_RUN; j++) {
2102 if (runCtrl[j].fileId == 0) { //run is open
2103 runCtrl[j].closeTime = closeTime;
2104 runCtrl[j].maxEvt = maxEvt;
2105 }
2106 }
2107 return 0;
2108 }
2109
2110 for (j = 0; j < MAX_RUN; j++) {
2111 if (runCtrl[j].runId == runId) {
2112 if (runCtrl[j].fileId == 0) { //run is open
2113 runCtrl[j].closeTime = closeTime;
2114 runCtrl[j].maxEvt = maxEvt;
2115 return 0;
2116 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2117 runCtrl[j].closeTime = closeTime;
2118 runCtrl[j].maxEvt = maxEvt;
2119 return +1;
2120 } else { // run already closed
2121 return +2;
2122 }
2123 }
2124 } //we only reach here if the run was never created
2125 return -1;
2126
2127} /*-----------------------------------------------------------------*/
2128
2129
2130void *
2131writeEvt (void *ptr)
2132{
2133/* *** main loop writing event (including opening and closing run-files */
2134
2135 int numWrite, numWait;
2136 int k, j ;
2137 struct timespec xwait;
2138 char str[MXSTR];
2139
2140 cpu_set_t mask;
2141 int cpu = 1; //write thread
2142
2143 snprintf (str, MXSTR, "Starting write-thread");
2144 factOut (kInfo, -1, str);
2145
2146/* CPU_ZERO initializes all the bits in the mask to zero. */
2147// CPU_ZERO (&mask);
2148/* CPU_SET sets only the bit corresponding to cpu. */
2149// CPU_SET (cpu, &mask);
2150/* sched_setaffinity returns 0 in success */
2151// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2152// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2153// }
2154
2155 int lastRun = 0; //usually run from last event still valid
2156
2157 while (g_runStat > -2) {
2158
2159 numWait = numWrite = 0;
2160 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2161 if (kd < 0)
2162 kd += (MAX_EVT * MAX_RUN);
2163
2164 int k1 = evtCtrl.frstPtr;
2165 for (k = k1; k < (k1 + kd); k++) {
2166 int k0 = k % (MAX_EVT * MAX_RUN);
2167//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2168 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2169
2170 if (gi_resetR > 1) { //we must drain the buffer asap
2171 evtCtrl.evtStat[k0] = 9904;
2172 } else {
2173
2174
2175 int id = evtCtrl.evtBuf[k0];
2176 uint32_t irun = mBuffer[id].runNum;
2177 int32_t ievt = mBuffer[id].evNum;
2178
2179 gi.wrtTot++;
2180 if (runCtrl[lastRun].runId == irun) {
2181 j = lastRun;
2182 } else {
2183 //check which fileID to use (or open if needed)
2184 for (j = 0; j < MAX_RUN; j++) {
2185 if (runCtrl[j].runId == irun)
2186 break;
2187 }
2188 if (j >= MAX_RUN) {
2189 snprintf (str, MXSTR,
2190 "W error: can not find run %d for event %d in %d",
2191 irun, ievt, id);
2192 factOut (kFatal, 901, str);
2193 gi.wrtErr++;
2194 }
2195 lastRun = j;
2196 }
2197
2198 if (runCtrl[j].fileId < 0) {
2199 actRun.Version = 1;
2200 actRun.RunType = -1; //to be adapted
2201
2202 actRun.Nroi = runCtrl[j].roi0;
2203 actRun.NroiTM = runCtrl[j].roi8;
2204//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2205// if (actRun.Nroi == actRun.NroiTM)
2206// actRun.NroiTM = 0;
2207 actRun.RunTime = runCtrl[j].firstTime;
2208 actRun.RunUsec = runCtrl[j].firstTime;
2209 actRun.NBoard = NBOARDS;
2210 actRun.NPix = NPIX;
2211 actRun.NTm = NTMARK;
2212 actRun.Nroi = mBuffer[id].nRoi;
2213 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2214 NBOARDS * sizeof (PEVNT_HEADER));
2215
2216 runCtrl[j].fileHd =
2217 runOpen (irun, &actRun, sizeof (actRun));
2218 if (runCtrl[j].fileHd == NULL) {
2219 snprintf (str, MXSTR,
2220 "W could not open a file for run %d", irun);
2221 factOut (kError, 502, str);
2222 runCtrl[j].fileId = 91;
2223 } else {
2224 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2225 irun, ievt);
2226 factOut (kInfo, -1, str);
2227 runCtrl[j].fileId = 0;
2228 }
2229
2230 }
2231
2232 if (runCtrl[j].fileId != 0) {
2233 if (runCtrl[j].fileId < 0) {
2234 snprintf (str, MXSTR,
2235 "W never opened file for this run %d", irun);
2236 factOut (kError, 123, str);
2237 } else if (runCtrl[j].fileId < 100) {
2238 snprintf (str, MXSTR, "W.file for this run is closed %d",
2239 irun);
2240 factOut (kWarn, 123, str);
2241 runCtrl[j].fileId += 100;
2242 } else {
2243 snprintf (str, MXSTR, "W:file for this run is closed %d",
2244 irun);
2245 factOut (kDebug, 123, str);
2246 }
2247 evtCtrl.evtStat[k0] = 9903;
2248 gi.wrtErr++;
2249 } else {
2250// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2251// factOut (kInfo, 504, str);
2252 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2253 sizeof (mBuffer[id]));
2254 if (i >= 0) {
2255 runCtrl[j].lastTime = g_actTime;
2256 runCtrl[j].actEvt++;
2257 evtCtrl.evtStat[k0] = 9901;
2258 snprintf (str, MXSTR,
2259 "%5d successfully wrote for run %d id %5d",
2260 ievt, irun, k0);
2261 factOut (kDebug, 504, str);
2262// gj.writEvt++ ;
2263 } else {
2264 snprintf (str, MXSTR, "W error writing event for run %d",
2265 irun);
2266 factOut (kError, 503, str);
2267 evtCtrl.evtStat[k0] = 9902;
2268 gi.wrtErr++;
2269 }
2270
2271 if (i < 0
2272 || runCtrl[j].lastTime < g_actTime - 300
2273 || runCtrl[j].closeTime < g_actTime
2274 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2275 int ii = 0;
2276 if (i < 0)
2277 ii = 1;
2278 else if (runCtrl[j].closeTime < g_actTime)
2279 ii = 2;
2280 else if (runCtrl[j].lastTime < g_actTime - 300)
2281 ii = 3;
2282 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2283 ii = 4;
2284
2285
2286
2287 //close run for whatever reason
2288 if (runCtrl[j].runId == gi_myRun)
2289 gi_myRun = g_actTime;
2290
2291 if (runCtrl[j].procId == 0) {
2292 runFinish1 (runCtrl[j].runId);
2293 runCtrl[j].procId = 92;
2294 }
2295
2296 runCtrl[j].closeTime = g_actTime - 1;
2297 i = runClose (runCtrl[j].fileHd, &runTail[j],
2298 sizeof (runTail[j]));
2299 if (i < 0) {
2300 snprintf (str, MXSTR, "error closing run %d %d AAA",
2301 runCtrl[j].runId, i);
2302 factOut (kError, 503, str);
2303 runCtrl[j].fileId = 92;
2304 } else {
2305 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2306 ii);
2307 factOut (kInfo, 503, str);
2308 runCtrl[j].fileId = 93;
2309 }
2310 }
2311 }
2312 }
2313 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2314 numWait++;
2315 }
2316
2317 //check if we should close a run (mainly when no event pending)
2318 for (j = 0; j < MAX_RUN; j++) {
2319 if (runCtrl[j].fileId == 0
2320 && (runCtrl[j].closeTime < g_actTime
2321 || runCtrl[j].lastTime < g_actTime - 300
2322 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2323 if (runCtrl[j].runId == gi_myRun)
2324 gi_myRun = g_actTime;
2325 int ii = 0;
2326 if (runCtrl[j].closeTime < g_actTime)
2327 ii = 2;
2328 else if (runCtrl[j].lastTime < g_actTime - 300)
2329 ii = 3;
2330 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2331 ii = 4;
2332
2333 if (runCtrl[j].procId == 0) {
2334 runFinish1 (runCtrl[j].runId);
2335 runCtrl[j].procId = 92;
2336 }
2337
2338 runCtrl[j].closeTime = g_actTime - 1;
2339 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2340 sizeof (runTail[j]));
2341 if (i < 0) {
2342 snprintf (str, MXSTR, "error closing run %d %d BBB",
2343 runCtrl[j].runId, i);
2344 factOut (kError, 506, str);
2345 runCtrl[j].fileId = 94;
2346 } else {
2347 snprintf (str, MXSTR, "W closed run %d BBB %d",
2348 runCtrl[j].runId, ii);
2349 factOut (kInfo, 507, str);
2350 runCtrl[j].fileId = 95;
2351 }
2352 }
2353 }
2354
2355 if (numWrite == 0) {
2356 //seems we have nothing to do, so sleep a little
2357 xwait.tv_sec = 0;
2358 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2359 nanosleep (&xwait, NULL);
2360 }
2361
2362 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2363 snprintf (str, MXSTR, "Finish Write Process ...");
2364 factOut (kInfo, -1, str);
2365 gw_runStat = -22; //==> we should exit
2366 gj.writStat = -22; //==> we should exit
2367 goto closerun;
2368 }
2369 gw_runStat = gi_runStat;
2370 gj.writStat = gj.readStat;
2371
2372 }
2373
2374 //must close all open files ....
2375 snprintf (str, MXSTR, "Abort Writing Process ...");
2376 factOut (kInfo, -1, str);
2377
2378 closerun:
2379 snprintf (str, MXSTR, "Close all open files ...");
2380 factOut (kInfo, -1, str);
2381 for (j = 0; j < MAX_RUN; j++)
2382 if (runCtrl[j].fileId == 0) {
2383 if (runCtrl[j].runId == gi_myRun)
2384 gi_myRun = g_actTime;
2385
2386 if (runCtrl[j].procId == 0) {
2387 runFinish1 (runCtrl[j].runId);
2388 runCtrl[j].procId = 92;
2389 }
2390
2391 runCtrl[j].closeTime = g_actTime - 1;
2392 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2393 int ii = 0;
2394 if (runCtrl[j].closeTime < g_actTime)
2395 ii = 2;
2396 else if (runCtrl[j].lastTime < g_actTime - 300)
2397 ii = 3;
2398 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2399 ii = 4;
2400 if (i < 0) {
2401 snprintf (str, MXSTR, "error closing run %d %d CCC",
2402 runCtrl[j].runId, i);
2403 factOut (kError, 506, str);
2404 runCtrl[j].fileId = 96;
2405 } else {
2406 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2407 ii);
2408 factOut (kInfo, 507, str);
2409 runCtrl[j].fileId = 97;
2410 }
2411 }
2412
2413 gw_runStat = -99;
2414 gj.writStat = -99;
2415 snprintf (str, MXSTR, "Exit Writing Process ...");
2416 factOut (kInfo, -1, str);
2417 return 0;
2418
2419
2420
2421
2422} /*-----------------------------------------------------------------*/
2423
2424
2425
2426
2427void
2428StartEvtBuild ()
2429{
2430
2431 int i, j, imax, status, th_ret[50];
2432 pthread_t thread[50];
2433 struct timespec xwait;
2434
2435 gi_runStat = gp_runStat = gw_runStat = 0;
2436 gj.readStat = gj.procStat = gj.writStat = 0;
2437
2438 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2439 factOut (kInfo, -1, str);
2440
2441//initialize run control logics
2442 for (i = 0; i < MAX_RUN; i++) {
2443 runCtrl[i].runId = 0;
2444 runCtrl[i].fileId = -2;
2445 }
2446
2447//prepare for subProcesses
2448 gi_maxSize = g_maxSize;
2449 if (gi_maxSize <= 0)
2450 gi_maxSize = 1;
2451
2452 gi_maxProc = g_maxProc;
2453 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2454 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2455 factOut (kFatal, 301, str);
2456 gi_maxProc = 1;
2457 }
2458//partially initialize event control logics
2459 evtCtrl.frstPtr = 0;
2460 evtCtrl.lastPtr = 0;
2461
2462//start all threads (more to come) when we are allowed to ....
2463 while (g_runStat == 0) {
2464 xwait.tv_sec = 0;
2465 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2466 nanosleep (&xwait, NULL);
2467 }
2468
2469 i = 0;
2470 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2471 i++;
2472 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2473 i++;
2474 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2475 i++;
2476 imax = i;
2477
2478
2479#ifdef BILAND
2480 xwait.tv_sec = 30;;
2481 xwait.tv_nsec = 0; // sleep for ~20sec
2482 nanosleep (&xwait, NULL);
2483
2484 printf ("close all runs in 2 seconds\n");
2485
2486 CloseRunFile (0, time (NULL) + 2, 0);
2487
2488 xwait.tv_sec = 1;;
2489 xwait.tv_nsec = 0; // sleep for ~20sec
2490 nanosleep (&xwait, NULL);
2491
2492 printf ("setting g_runstat to -1\n");
2493
2494 g_runStat = -1;
2495#endif
2496
2497
2498//wait for all threads to finish
2499 for (i = 0; i < imax; i++) {
2500 j = pthread_join (thread[i], (void **) &status);
2501 }
2502
2503} /*-----------------------------------------------------------------*/
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519 /*-----------------------------------------------------------------*/
2520 /*-----------------------------------------------------------------*/
2521 /*-----------------------------------------------------------------*/
2522 /*-----------------------------------------------------------------*/
2523 /*-----------------------------------------------------------------*/
2524
2525#ifdef BILAND
2526
2527int
2528subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2529 int8_t * buffer)
2530{
2531 printf ("called subproc %d\n", threadID);
2532 return threadID + 1;
2533}
2534
2535
2536
2537
2538 /*-----------------------------------------------------------------*/
2539 /*-----------------------------------------------------------------*/
2540 /*-----------------------------------------------------------------*/
2541 /*-----------------------------------------------------------------*/
2542 /*-----------------------------------------------------------------*/
2543
2544
2545
2546
2547FileHandle_t
2548runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2549{
2550 return 1;
2551};
2552
2553int
2554runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2555{
2556 return 1;
2557 usleep (10000);
2558 return 1;
2559}
2560
2561
2562//{ return 1; } ;
2563
2564int
2565runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2566{
2567 return 1;
2568};
2569
2570
2571
2572
2573int
2574eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2575{
2576 int i = 0;
2577
2578// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2579// for (i=0; i<NBOARDS; i++) {
2580// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2581// }
2582 return 0;
2583}
2584
2585
2586void
2587factStatNew (EVT_STAT gi)
2588{
2589 int i;
2590
2591//for (i=0;i<MAX_SOCK;i++) {
2592// printf("%4d",gi.numRead[i]);
2593// if (i%20 == 0 ) printf("\n");
2594//}
2595}
2596
2597void
2598gotNewRun (int runnr, PEVNT_HEADER * headers)
2599{
2600 printf ("got new run %d\n", runnr);
2601 return;
2602}
2603
2604void
2605factStat (GUI_STAT gj)
2606{
2607// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2608// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2609}
2610
2611
/* Stand-alone test stub (BILAND build only): hook for tracing reads;
 * all arguments are ignored. */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

// debugging aid, disabled:
// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
2618
2619
2620
/* Stand-alone test stub (BILAND build only): hook for dumping the raw
 * stream of a socket; the buffer is neither read nor modified. */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2625
/* Stand-alone test stub (BILAND build only): hook for dumping a header;
 * the buffer is neither read nor modified. */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2630
2631
2632void
2633factOut (int severity, int err, char *message)
2634{
2635 static FILE *fd;
2636 static int file = 0;
2637
2638 if (file == 0) {
2639 printf ("open file\n");
2640 fd = fopen ("x.out", "w+");
2641 file = 999;
2642 }
2643
2644 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2645
2646 if (severity != kDebug)
2647 printf ("%3d %3d | %s\n", severity, err, message);
2648}
2649
2650
2651
2652int
2653main ()
2654{
2655 int i, b, c, p;
2656 char ipStr[100];
2657 struct in_addr IPaddr;
2658
2659 g_maxMem = 1024 * 1024; //MBytes
2660//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2661 g_maxMem = g_maxMem * 200; //100MBytes
2662
2663 g_maxProc = 20;
2664 g_maxSize = 30000;
2665
2666 g_runStat = 40;
2667
2668 i = 0;
2669
2670// version for standard crates
2671//for (c=0; c<4,c++) {
2672// for (b=0; b<10; b++) {
2673// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2674//
2675// inet_pton(PF_INET, ipStr, &IPaddr) ;
2676//
2677// g_port[i].sockAddr.sin_family = PF_INET;
2678// g_port[i].sockAddr.sin_port = htons(5000) ;
2679// g_port[i].sockAddr.sin_addr = IPaddr ;
2680// g_port[i].sockDef = 1 ;
2681// i++ ;
2682// }
2683//}
2684//
2685//version for PC-test *
2686 for (c = 0; c < 4; c++) {
2687 for (b = 0; b < 10; b++) {
2688 sprintf (ipStr, "10.0.%d.11", 128 + c);
2689 if (c < 2)
2690 sprintf (ipStr, "10.0.%d.11", 128);
2691 else
2692 sprintf (ipStr, "10.0.%d.11", 131);
2693// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2694
2695 inet_pton (PF_INET, ipStr, &IPaddr);
2696 p = 31919 + 100 * c + 10 * b;
2697
2698
2699 g_port[i].sockAddr.sin_family = PF_INET;
2700 g_port[i].sockAddr.sin_port = htons (p);
2701 g_port[i].sockAddr.sin_addr = IPaddr;
2702 g_port[i].sockDef = 1;
2703
2704 i++;
2705 }
2706 }
2707
2708
2709//g_port[17].sockDef =-1 ;
2710//g_actBoards-- ;
2711
2712 StartEvtBuild ();
2713
2714 return 0;
2715
2716}
2717#endif
Note: See TracBrowser for help on using the repository browser.