source: trunk/FACT++/src/EventBuilder.c@ 12430

Last change on this file since 12430 was 12410, checked in by lyard, 13 years ago
added correct Rois extraction to event builder
File size: 85.5 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/**
 * Severity levels passed as first argument to factOut().
 */
enum Severity
{
   kMessage = 10,   /**< Just a message, usually obsolete */
   kInfo    = 20,   /**< Information which can be interesting to know */
   kWarn    = 30,   /**< Warning: may result in unexpected or unwanted behaviour */
   kError   = 40,   /**< Something unexpected happened, but the program can still handle it */
   kFatal   = 50,   /**< Error which cannot be handled at all; only solution is program termination */
   kDebug   = 99    /**< Message used for debugging only */
};
36
/* Buffer size limits for the per-socket read buffers.
   The values are parenthesized so that the macros behave like a single
   operand in any expression (e.g. "x % MAX_LEN" or "x / MAX_LEN"). */
#define MIN_LEN  (32)           // min #bytes needed to interpret FADheader
#define MAX_LEN  (256*1024)     // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0;
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi;
120GUI_STAT gj;
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127#define MXSTR 1000
128char str[MXSTR];
129
130//ETIENNE
131#define MAX_SLOTS_PER_CHUNK 100
132
133typedef struct {
134 int32_t eventNumber;
135 int32_t chunk;
136 int32_t slot;
137} CHUNK_MAPPING;
138
139CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
141#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
142#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
143#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
144#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
146typedef struct {
147 void * pointers[MAX_SLOTS_PER_CHUNK];
148 int32_t events[MAX_SLOTS_PER_CHUNK];
149 int32_t nFreeSlots;
150 int32_t nSlots;
151} ETI_CHUNK;
152
153int32_t numAllocatedChunks = 0;
154
155#define MAX_CHUNKS 8096
156ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 free(EtiMemoryChunks[chunkIndex].pointers[0]);
235 numAllocatedChunks--;
236 chunkIndex--;
237 if (numAllocatedChunks == 0)
238 break;
239 }
240}
241//END ETIENNE
242
243
244
245RUN_HEAD actRun;
246
247RUN_CTRL runCtrl[MAX_RUN];
248
249RUN_TAIL runTail[MAX_RUN];
250
251
252/*
253*** Definition of rdBuffer to read in IP packets; keep it global !!!!
254 */
255
256
257typedef union
258{
259 int8_t B[MAX_LEN];
260 int16_t S[MAX_LEN / 2];
261 int32_t I[MAX_LEN / 4];
262 int64_t L[MAX_LEN / 8];
263} CNV_FACT;
264
265typedef struct
266{
267 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
268 int32_t bufPos; //next byte to read to the buffer next
269 int32_t bufLen; //number of bytes left to read
270// size_t bufLen; //number of bytes left to read size_t might be better
271 int32_t skip; //number of bytes skipped before start of event
272
273 int errCnt; //how often connect failed since last successful
274 int sockStat; //-1 if socket not yet connected , 99 if not exist
275 int socket; //contains the sockets
276 struct sockaddr_in SockAddr; //IP for each socket
277
278 int evtID; // event ID of event currently read
279 int runID; // run "
280 int ftmID; // event ID from FTM
281 uint fadLen; // FADlength of event currently read
282 int fadVers; // Version of FAD
283 int ftmTyp; // trigger type
284 int board; // boardID (softwareID: 0..40 )
285 int Port;
286
287 CNV_FACT *rBuf;
288
289#ifdef EVTDEBUG
290 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
291#endif
292
293} READ_STRUCT;
294
295
/* A 16 bit value together with a view onto its two individual bytes */
typedef union
{
   int8_t  B[2];                /* the two bytes in memory order */
   int16_t S;                   /* the same storage as one 16 bit integer */
} SHORT_BYTE;
301
302
303
304
305
306SHORT_BYTE start, stop;
307
308READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
309
310
311
312/*-----------------------------------------------------------------*/
313
314
315/*-----------------------------------------------------------------*/
316
317int
318runFinish1 (uint32_t runnr)
319{
320 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
321 runnr);
322 factOut (kInfo, 173, str); //but continue anyhow
323 return 0;
324}
325int
326runFinish (uint32_t runnr)
327{
328 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
329 runnr);
330 factOut (kInfo, 173, str); //but continue anyhow
331 return 0;
332}
333
334int
335GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
336 READ_STRUCT * rd)
337{
338/*
339*** generate Address, create sockets and allocates readbuffer for it
340***
341*** if flag==0 generate socket and buffer
342*** <0 destroy socket and buffer
343*** >0 close and redo socket
344***
345*** sid : board*7 + port id
346 */
347
348 int j;
349 int optval = 1; //activate keepalive
350 socklen_t optlen = sizeof (optval);
351
352
353 if (sid % 7 >= NUMSOCK) {
354 //this is a not used socket, so do nothing ...
355 rd->sockStat = 77;
356 rd->rBuf = NULL ;
357 return 0;
358 }
359
360 if (rd->sockStat == 0) { //close socket if open
361 j = close (rd->socket);
362 if (j > 0) {
363 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
364 factOut (kFatal, 771, str);
365 } else {
366 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
367 factOut (kInfo, 771, str);
368 }
369 }
370
371 rd->sockStat = 99;
372
373 if (flag < 0) {
374 free (rd->rBuf); //and never open again
375#ifdef EVTDEBUG
376 free (rd->xBuf); //and never open again
377#endif
378 rd->rBuf = NULL;
379 return 0;
380 }
381
382
383 if (flag == 0) { //generate address and buffer ...
384 rd->Port = port;
385 rd->SockAddr.sin_family = sockAddr->sin_family;
386 rd->SockAddr.sin_port = htons (port);
387 rd->SockAddr.sin_addr = sockAddr->sin_addr;
388
389#ifdef EVTDEBUG
390 rd->xBuf = malloc (sizeof (CNV_FACT));
391#endif
392 rd->rBuf = malloc (sizeof (CNV_FACT));
393 if (rd->rBuf == NULL) {
394 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
395 factOut (kFatal, 774, str);
396 rd->sockStat = 77;
397 return -3;
398 }
399 }
400
401
402 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
403 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
404 factOut (kFatal, 773, str);
405 rd->sockStat = 88;
406 return -2;
407 }
408 optval = 1;
409 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
410 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
411 factOut (kInfo, 173, str); //but continue anyhow
412 }
413 optval = 10; //start after 10 seconds
414 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
415 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
416 factOut (kInfo, 173, str); //but continue anyhow
417 }
418 optval = 10; //do every 10 seconds
419 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
420 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
421 factOut (kInfo, 173, str); //but continue anyhow
422 }
423 optval = 2; //close after 2 unsuccessful tries
424 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
425 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
426 factOut (kInfo, 173, str); //but continue anyhow
427 }
428
429
430 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
431 factOut (kInfo, 773, str);
432 rd->sockStat = -1; //try to (re)open socket
433 rd->errCnt = 0;
434 return 0;
435
436} /*-----------------------------------------------------------------*/
437
438 /*-----------------------------------------------------------------*/
439
440
441
442
443int
444mBufInit ()
445{
446// initialize mBuffer (mark all entries as unused\empty)
447
448 int i;
449 uint32_t actime;
450
451 actime = g_actTime + 50000000;
452
453 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
454 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
455 mBuffer[i].runNum = 0;
456
457 evtCtrl.evtBuf[i] = -1;
458 evtCtrl.evtStat[i] = -1;
459 evtCtrl.pcTime[i] = actime; //initiate to far future
460
461 //ETIENNE
462 mBufferMapping[i].chunk = -1;
463 mBufferMapping[i].eventNumber = -1;
464 mBufferMapping[i].slot = -1;
465 //END ETIENNE
466 }
467
468 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
469
470 evtCtrl.frstPtr = 0;
471 evtCtrl.lastPtr = 0;
472
473 return 0;
474
475} /*-----------------------------------------------------------------*/
476
477
478
479
480int
481mBufEvt (int evID, uint runID, int nRoi[], int sk,
482 int fadlen, int trgTyp, int trgNum, int fadNum)
483{
484// generate a new Event into mBuffer:
485// make sure only complete Event are possible, so 'free' will always work
486// returns index into mBuffer[], or negative value in case of error
487// error: <-9000 if roi screwed up (not consistent with run)
488// <-8000 (not consistent with event)
489// <-7000 (not consistent with board)
490// < 0 if no space left
491
492 struct timeval *tv, atv;
493 tv = &atv;
494 uint32_t tsec, tusec;
495 uint oldest;
496 int jold;
497
498 int i, k, jr, b, evFree;
499 int headmem = 0;
500 size_t needmem = 0;
501
502
503 b = sk / 7;
504
505 if (nRoi[0] < 0 || nRoi[0] > 1024) {
506 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
507 factOut (kError, 999, str);
508 gj.badRoiR++;
509 gj.badRoi[b]++;
510 return -9999;
511 }
512
513 for (jr = 1; jr < 8; jr++) {
514 if (nRoi[jr] != nRoi[0]) {
515 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
516 nRoi[0]);
517 factOut (kError, 711, str);
518 gj.badRoiB++;
519 gj.badRoi[b]++;
520 return -7101;
521 }
522 }
523 if (nRoi[8] < nRoi[0]) {
524 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
525 factOut (kError, 712, str);
526 gj.badRoiB++;
527 gj.badRoi[b]++;
528 return -7102;
529 }
530
531
532 i = evID % MAX_EVT;
533 evFree = -1;
534
535 for (k = 0; k < MAX_RUN; k++) {
536 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
537 // is it ok ????
538 if (mBuffer[i].nRoi != nRoi[0]
539 || mBuffer[i].nRoiTM != nRoi[8]) {
540 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
541 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
542 factOut (kError, 821, str);
543 gj.badRoiE++;
544 gj.badRoi[b]++;
545 return -8201;
546 }
547// count for inconsistencies
548
549 if (mBuffer[i].trgNum != trgNum)
550 mBuffer[i].Errors[0]++;
551 if (mBuffer[i].fadNum != fadNum)
552 mBuffer[i].Errors[1]++;
553 if (mBuffer[i].trgTyp != trgTyp)
554 mBuffer[i].Errors[2]++;
555
556 //everything seems fine so far ==> use this slot ....
557 return i;
558 }
559 if (evFree < 0 && mBuffer[i].evNum < 0)
560 evFree = i;
561 i += MAX_EVT;
562 }
563
564
565 //event does not yet exist; create it
566 if (evFree < 0) { //no space available in ctrl
567 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
568 factOut (kError, 881, str);
569 return -1;
570 }
571 i = evFree; //found free entry; use it ...
572
573 gettimeofday (tv, NULL);
574 tsec = atv.tv_sec;
575 tusec = atv.tv_usec;
576
577 //check if runId already registered in runCtrl
578 evFree = -1;
579 oldest = g_actTime + 1000;
580 jold = -1;
581 for (k = 0; k < MAX_RUN; k++) {
582 if (runCtrl[k].runId == runID) {
583// if (runCtrl[k].procId > 0) { //run is closed -> reject
584// snprintf (str, MXSTR, "skip event since run %d finished", runID);
585// factOut (kInfo, 931, str);
586// return -21;
587// }
588
589 if (runCtrl[k].roi0 != nRoi[0]
590 || runCtrl[k].roi8 != nRoi[8]) {
591 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
592 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
593 factOut (kError, 931, str);
594 gj.badRoiR++;
595 gj.badRoi[b]++;
596 return -9301;
597 }
598 goto RUNFOUND;
599 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
600 evFree = k;
601 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
602 if (runCtrl[k].closeTime < oldest) {
603 oldest = runCtrl[k].closeTime;
604 jold = k;
605 }
606 }
607 }
608
609 if (evFree < 0 && jold < 0) {
610 snprintf (str, MXSTR, "not able to register the new run %d", runID);
611 factOut (kFatal, 883, str);
612 return -1001;
613 } else {
614 if (evFree < 0)
615 evFree = jold;
616 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
617 evFree, nRoi[0], nRoi[8]);
618 factOut (kInfo, 503, str);
619 runCtrl[evFree].runId = runID;
620 runCtrl[evFree].roi0 = nRoi[0];
621 runCtrl[evFree].roi8 = nRoi[8];
622 runCtrl[evFree].fileId = -2;
623 runCtrl[evFree].procId = -2;
624 runCtrl[evFree].lastEvt = -1;
625 runCtrl[evFree].nextEvt = 0;
626 runCtrl[evFree].actEvt = 0;
627 runCtrl[evFree].procEvt = 0;
628 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
629 runCtrl[evFree].firstUsec = tusec;
630 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
631 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
632// runCtrl[evFree].lastTime = 0;
633
634 runTail[evFree].nEventsOk =
635 runTail[evFree].nEventsRej =
636 runTail[evFree].nEventsBad =
637 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
638 }
639
640 RUNFOUND:
641 //ETIENNE
642/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
643
644 headmem = NBOARDS * sizeof (PEVNT_HEADER);
645
646 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
647 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
648 if (gi_memStat > 0) {
649 gi_memStat = -99;
650 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
651 evID, sk);
652 factOut (kError, 882, str);
653 } else {
654 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
655 evID, sk);
656 factOut (kDebug, 882, str);
657 }
658 return -11;
659 }
660*/
661 mBuffer[i].FADhead = ETI_Malloc(evID, i);
662 mBuffer[i].buffer = NULL;
663 if (mBuffer[i].FADhead != NULL)
664 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
665 else
666 {
667 mBuffer[i].fEvent = NULL;
668 gj.usdMem = 0;
669 for (k=0;k<numAllocatedChunks;k++)
670 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
671 if (gj.usdMem > gj.maxMem)
672 gj.maxMem = gj.usdMem;
673 if (gi_memStat > 0) {
674 gi_memStat = -99;
675 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
676 evID, sk);
677 factOut (kError, 882, str);
678 } else {
679 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
680 evID, sk);
681 factOut (kDebug, 882, str);
682 }
683 return -11;
684 }
685 /*
686 mBuffer[i].FADhead = malloc (headmem);
687 if (mBuffer[i].FADhead == NULL) {
688 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
689 factOut (kError, 882, str);
690 return -12;
691 }
692
693 mBuffer[i].fEvent = malloc (needmem);
694 if (mBuffer[i].fEvent == NULL) {
695 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
696 factOut (kError, 882, str);
697 free (mBuffer[i].FADhead);
698 mBuffer[i].FADhead = NULL;
699 return -22;
700 }
701
702 mBuffer[i].buffer = malloc (gi_maxSize);
703 if (mBuffer[i].buffer == NULL) {
704 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
705 factOut (kError, 882, str);
706 free (mBuffer[i].FADhead);
707 mBuffer[i].FADhead = NULL;
708 free (mBuffer[i].fEvent);
709 mBuffer[i].fEvent = NULL;
710 return -32;
711 }*/
712 //END ETIENNE
713 //flag all boards as unused
714 mBuffer[i].nBoard = 0;
715 for (k = 0; k < NBOARDS; k++) {
716 mBuffer[i].board[k] = -1;
717 }
718 //flag all pixels as unused
719 for (k = 0; k < NPIX; k++) {
720 mBuffer[i].fEvent->StartPix[k] = -1;
721 }
722 //flag all TMark as unused
723 for (k = 0; k < NTMARK; k++) {
724 mBuffer[i].fEvent->StartTM[k] = -1;
725 }
726
727 mBuffer[i].fEvent->NumBoards = 0;
728 mBuffer[i].fEvent->PCUsec = tusec;
729 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
730 mBuffer[i].nRoi = nRoi[0];
731 mBuffer[i].nRoiTM = nRoi[8];
732 mBuffer[i].evNum = evID;
733 mBuffer[i].runNum = runID;
734 mBuffer[i].fadNum = fadNum;
735 mBuffer[i].trgNum = trgNum;
736 mBuffer[i].trgTyp = trgTyp;
737 mBuffer[i].evtLen = needmem;
738 mBuffer[i].Errors[0] =
739 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
740//ETIENNE
741 //gj.usdMem += needmem + headmem + gi_maxSize;
742 gj.usdMem = 0;
743 for (k=0;k<numAllocatedChunks;k++)
744 {
745 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
746 }
747//END ETIENNE
748 if (gj.usdMem > gj.maxMem)
749 gj.maxMem = gj.usdMem;
750
751 gj.bufTot++;
752 if (gj.bufTot > gj.maxEvt)
753 gj.maxEvt = gj.bufTot;
754
755 gj.rateNew++;
756
757 //register event in 'active list (reading)'
758
759 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
760 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
761 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
762 evtIdx[i] = evtCtrl.lastPtr;
763
764
765 snprintf (str, MXSTR,
766 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
767 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
768 factOut (kDebug, -11, str);
769 evtCtrl.lastPtr++;
770 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
771 evtCtrl.lastPtr = 0;
772
773 gi.evtGet++;
774
775 return i;
776
777} /*-----------------------------------------------------------------*/
778
779
780
781
782int
783mBufFree (int i)
784{
785//delete entry [i] from mBuffer:
786//(and make sure multiple calls do no harm ....)
787
788 int headmem = 0;
789 int evid;
790 size_t freemem = 0;
791
792 evid = mBuffer[i].evNum;
793 freemem = mBuffer[i].evtLen;
794 //ETIENNE
795 ETI_Free(evid, i);
796// free (mBuffer[i].fEvent);
797 mBuffer[i].fEvent = NULL;
798
799// free (mBuffer[i].FADhead);
800 mBuffer[i].FADhead = NULL;
801
802// free (mBuffer[i].buffer);
803 mBuffer[i].buffer = NULL;
804//END ETIENNE
805 headmem = NBOARDS * sizeof (PEVNT_HEADER);
806 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
807 mBuffer[i].runNum = 0;
808//ETIENNE
809// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
810 gj.usdMem = 0;
811 int k;
812 for (k=0;k<numAllocatedChunks;k++)
813 {
814 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
815 }
816//END ETIENNE
817 gj.bufTot--;
818
819 if (gi_memStat < 0) {
820 if (gj.usdMem <= 0.75 * gj.maxMem)
821 gi_memStat = +1;
822 }
823
824
825 return 0;
826
827} /*-----------------------------------------------------------------*/
828
829
830void
831resetEvtStat ()
832{
833 int i;
834
835 for (i = 0; i < MAX_SOCK; i++)
836 gi.numRead[i] = 0;
837
838 for (i = 0; i < NBOARDS; i++) {
839 gi.gotByte[i] = 0;
840 gi.gotErr[i] = 0;
841
842 }
843
844 gi.evtGet = 0; //#new Start of Events read
845 gi.evtTot = 0; //#complete Events read
846 gi.evtErr = 0; //#Events with Errors
847 gi.evtSkp = 0; //#Events incomplete (timeout)
848
849 gi.procTot = 0; //#Events processed
850 gi.procErr = 0; //#Events showed problem in processing
851 gi.procTrg = 0; //#Events accepted by SW trigger
852 gi.procSkp = 0; //#Events rejected by SW trigger
853
854 gi.feedTot = 0; //#Events used for feedBack system
855 gi.feedErr = 0; //#Events rejected by feedBack
856
857 gi.wrtTot = 0; //#Events written to disk
858 gi.wrtErr = 0; //#Events with write-error
859
860 gi.runOpen = 0; //#Runs opened
861 gi.runClose = 0; //#Runs closed
862 gi.runErr = 0; //#Runs with open/close errors
863
864 return;
865} /*-----------------------------------------------------------------*/
866
867
868
/*
 * Placeholder: there is nothing to initialize, the function does nothing.
 */
void
initReadFAD ()
{
} /*-----------------------------------------------------------------*/
874
875
876
877void *
878readFAD (void *ptr)
879{
880/* *** main loop reading FAD data and sorting them to complete events */
881 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
882 int actBoards = 0, minLen;
883 int32_t jrd;
884 uint gi_SecTime; //time in seconds
885 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
886
887 int goodhed = 0;
888
889 int sockDef[NBOARDS]; //internal state of sockets
890 int jrdx;
891
892
893 struct timespec xwait;
894
895
896 struct timeval *tv, atv;
897 tv = &atv;
898 uint32_t tsec, tusec;
899
900
901 snprintf (str, MXSTR, "start initializing");
902 factOut (kInfo, -1, str);
903
904 int cpu = 7; //read thread
905 cpu_set_t mask;
906
907/* CPU_ZERO initializes all the bits in the mask to zero. */
908// CPU_ZERO (&mask);
909/* CPU_SET sets only the bit corresponding to cpu. */
910 cpu = 7;
911// CPU_SET (cpu, &mask);
912
913/* sched_setaffinity returns 0 in success */
914// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
915// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
916// factOut (kWarn, -1, str);
917// }
918
919
920 head_len = sizeof (PEVNT_HEADER);
921 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
922 minLen = head_len; //min #bytes needed to check header: full header for debug
923
924 start.S = 0xFB01;
925 stop.S = 0x04FE;
926
927/* initialize run control logics */
928 for (i = 0; i < MAX_RUN; i++) {
929 runCtrl[i].runId = 0;
930 runCtrl[i].fileId = -2;
931 runCtrl[i].procId = -2;
932 }
933 gi_resetS = gi_resetR = 9;
934 for (i = 0; i < NBOARDS; i++)
935 sockDef[i] = 0;
936
937 START:
938 gettimeofday (tv, NULL);
939 g_actTime = tsec = atv.tv_sec;
940 g_actUsec = tusec = atv.tv_usec;
941 gi_myRun = g_actTime;
942 evtCtrl.frstPtr = 0;
943 evtCtrl.lastPtr = 0;
944
945 gi_SecTime = g_actTime;
946 gi_runStat = g_runStat;
947 gj.readStat = g_runStat;
948 numok = numok2 = 0;
949
950 int cntsock = 8 - NUMSOCK ;
951
952 if (gi_resetS > 0) {
953 //make sure all sockets are preallocated as 'not exist'
954 for (i = 0; i < MAX_SOCK; i++) {
955 rd[i].socket = -1;
956 rd[i].sockStat = 99;
957 }
958
959 for (k = 0; k < NBOARDS; k++) {
960 gi_NumConnect[k] = 0;
961 gi.numConn[k] = 0;
962 gj.numConn[k] = 0;
963 gj.errConn[k] = 0;
964 gj.rateBytes[k] = 0;
965 gj.totBytes[k] = 0;
966 }
967
968 }
969
970
971 if (gi_resetR > 0) {
972 resetEvtStat ();
973 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
974 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
975 gj.totMem = g_maxMem;
976 gj.bufNew = gj.bufEvt = 0;
977 gj.badRoiE = gj.badRoiR = gj.badRoiB =
978 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
979
980 int b,p;
981 for (b = 0; b < NBOARDS; b++)
982 gj.badRoi[b] = 0;
983
984 mBufInit (); //initialize buffers
985
986 snprintf (str, MXSTR, "end initializing");
987 factOut (kInfo, -1, str);
988 }
989
990
991 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
992
993 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
994
995 gi_runStat = g_runStat;
996 gj.readStat = g_runStat;
997 gettimeofday (tv, NULL);
998 g_actTime = tsec = atv.tv_sec;
999 g_actUsec = tusec = atv.tv_usec;
1000
1001
1002 int b, p, p0, s0, nch;
1003 nch = 0;
1004 for (b = 0; b < NBOARDS; b++) {
1005 k = b * 7;
1006 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1007 nch++;
1008 gi_NumConnect[b] = 0; //must close all connections
1009 gi.numConn[b] = 0;
1010 gj.numConn[b] = 0;
1011 if (sockDef[b] == 0)
1012 s0 = 0; //sockets to be defined and opened
1013 else if (g_port[b].sockDef == 0)
1014 s0 = -1; //sockets to be destroyed
1015 else
1016 s0 = +1; //sockets to be closed and reopened
1017
1018 if (s0 == 0)
1019 p0 = ntohs (g_port[b].sockAddr.sin_port);
1020 else
1021 p0 = 0;
1022
1023 for (p = p0 + 1; p < p0 + 8; p++) {
1024 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1025 k++;
1026 }
1027 sockDef[b] = g_port[b].sockDef;
1028 }
1029 }
1030
1031 if (nch > 0) {
1032 actBoards = 0;
1033 for (b = 0; b < NBOARDS; b++) {
1034 if (sockDef[b] > 0)
1035 actBoards++;
1036 }
1037 }
1038
1039
1040 jrdx = 0;
1041 numokx = 0;
1042 numok = 0; //count number of succesfull actions
1043
1044 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1045 b = i / 7 ;
1046 p = i % 7 ;
1047
1048if ( p >= NUMSOCK) { ; }
1049else {
1050 if (sockDef[b] > 0)
1051 s0 = +1;
1052 else
1053 s0 = -1;
1054
1055 if (rd[i].sockStat < 0) { //try to connect if not yet done
1056 if (rd[i].sockStat == -1) {
1057 rd[i].sockStat = connect (rd[i].socket,
1058 (struct sockaddr *) &rd[i].SockAddr,
1059 sizeof (rd[i].SockAddr));
1060 if (rd[i].sockStat == -1) {
1061 rd[i].errCnt++ ;
1062// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1063// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1064// else rd[i].sockStat = 10000 ;
1065 }
1066//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1067
1068 }
1069 if (rd[i].sockStat < -1 ) {
1070 rd[i].sockStat++ ;
1071 }
1072 if (rd[i].sockStat == 0) { //successfull ==>
1073 if (sockDef[b] > 0) {
1074 rd[i].bufTyp = 0; // expect a header
1075 rd[i].bufLen = frst_len; // max size to read at begining
1076 } else {
1077 rd[i].bufTyp = -1; // full data to be skipped
1078 rd[i].bufLen = MAX_LEN; //huge for skipping
1079 }
1080 rd[i].bufPos = 0; // no byte read so far
1081 rd[i].skip = 0; // start empty
1082// gi_NumConnect[b]++;
1083 gi_NumConnect[b] += cntsock ;
1084
1085 gi.numConn[b]++;
1086 gj.numConn[b]++;
1087 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1088 factOut (kInfo, -1, str);
1089 }
1090 }
1091
1092 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1093 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1094 numok++;
1095 size_t maxread = rd[i].bufLen ;
1096 if (maxread > MAXREAD ) maxread=MAXREAD ;
1097
1098 jrd =
1099 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1100 maxread, MSG_DONTWAIT);
1101// rd[i].bufLen, MSG_DONTWAIT);
1102
1103 if (jrd > 0) {
1104 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1105#ifdef EVTDEBUG
1106 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1107 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1108 snprintf (str, MXSTR,
1109 "read sock %3d bytes %5d len %5d first %d %d", i,
1110 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1111 rd[i].rBuf->B[rd[i].bufPos + 1]);
1112 factOut (kDebug, 301, str);
1113#endif
1114 }
1115
1116 if (jrd == 0) { //connection has closed ...
1117 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1118 factOut (kInfo, 441, str);
1119 GenSock (s0, i, 0, NULL, &rd[i]);
1120 gi.gotErr[b]++;
1121// gi_NumConnect[b]--;
1122 gi_NumConnect[b]-= cntsock ;
1123 gi.numConn[b]--;
1124 gj.numConn[b]--;
1125
1126 } else if (jrd < 0) { //did not read anything
1127 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1128 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1129 factOut (kError, 442, str);
1130 gi.gotErr[b]++;
1131 } else
1132 numok--; //else nothing waiting to be read
1133 jrd = 0;
1134 }
1135 } else {
1136 jrd = 0; //did read nothing as requested
1137 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1138 rd[i].bufLen);
1139 factOut (kDebug, 301, str);
1140 }
1141
1142 gi.gotByte[b] += jrd;
1143 gj.rateBytes[b] += jrd;
1144
1145 if (jrd > 0) {
1146 numokx++;
1147 jrdx += jrd;
1148 }
1149
1150
1151 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1152// just do nothing
1153#ifdef EVTDEBUG
1154 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1155 i);
1156 factOut (kInfo, 301, str);
1157#endif
1158
1159 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1160 if (jrd < rd[i].bufLen) { //not yet all read
1161 rd[i].bufPos += jrd; //==> prepare for continuation
1162 rd[i].bufLen -= jrd;
1163 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1164 } else { //full dataset read
1165 rd[i].bufLen = 0;
1166 rd[i].bufPos = rd[i].fadLen;
1167 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1168 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1169 gi.evtErr++;
1170 snprintf (str, MXSTR,
1171 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1172 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1173 rd[i].rBuf->B[1],
1174 rd[i].rBuf->B[rd[i].bufPos - 2],
1175 rd[i].rBuf->B[rd[i].bufPos - 1]);
1176 factOut (kError, 301, str);
1177 goto EndBuf;
1178
1179#ifdef EVTDEBUG
1180 } else {
1181 snprintf (str, MXSTR,
1182 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1183 i, rd[i].fadLen, rd[i].rBuf->B[0],
1184 rd[i].rBuf->B[1], start.B[1], start.B[0],
1185 rd[i].rBuf->B[rd[i].bufPos - 2],
1186 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1187 stop.B[0]);
1188 factOut (kDebug, 301, str);
1189#endif
1190 }
1191
1192 if (jrd > 0)
1193 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1194
1195/* //we have a complete buffer, copy to WORK area
1196 int jr;
1197 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1198 for (jr = 1; jr < 9; jr++) {
1199 roi[jr] =
1200 ntohs (rd[i].
1201 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1202
1203 }
1204*/
1205 //we have a complete buffer, copy to WORK area
1206 int jr, kr;
1207 int checkRoi;
1208 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1209 for (kr = 1; kr < 4; kr++)
1210 {
1211 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 + 2
1212 + kr*(roi[0]+4)]);
1213 if (checkRoi != roi[0])
1214 {
1215 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1216 factOut (kError, 1, str);
1217 goto EndBuf;
1218 }
1219
1220 }
1221 for (jr = 1; jr < 9; jr++) {
1222 roi[jr] =
1223 ntohs (rd[i].
1224 rBuf->S[head_len / 2 + 2 + 4 * jr * (roi[jr-1] + 4)]);
1225 for (kr = 1; kr < 4; kr++)
1226 {
1227 checkRoi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 //header up to first roi
1228 + kr*(roi[jr]+4) //shift up to the next board
1229 + 4*jr*(roi[jr-1]+4)]); //shift up to the correct pixel
1230 if (checkRoi != roi[jr])
1231 {
1232 snprintf (str, MXSTR, "Inconsistent Roi accross board patches b %d %d %d %d", kr, jr, roi[jr], checkRoi);
1233 factOut (kError, 1, str);
1234 goto EndBuf;
1235 }
1236 }
1237 }
1238 //get index into mBuffer for this event (create if needed)
1239
1240 int actid;
1241 if (g_useFTM > 0)
1242 actid = rd[i].evtID;
1243 else
1244 actid = rd[i].ftmID;
1245
1246 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1247 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1248 rd[i].evtID);
1249
1250 if (evID < -1000) {
1251 goto EndBuf; //not usable board/event/run --> skip it
1252 }
1253 if (evID < 0) { //no space left, retry later
1254#ifdef EVTDEBUG
1255 if (rd[i].bufLen != 0) {
1256 snprintf (str, MXSTR, "something screwed up");
1257 factOut (kFatal, 1, str);
1258 }
1259#endif
1260 xwait.tv_sec = 0;
1261 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1262 nanosleep (&xwait, NULL);
1263 goto EndBuf1; //hope there is free space next round
1264 }
1265 //we have a valid entry in mBuffer[]; fill it
1266
1267#ifdef EVTDEBUG
1268 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1269 rd[i].fadLen);
1270 if (xchk != 0) {
1271 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1272 xchk, rd[i].fadLen, i);
1273 factOut (kFatal, 1, str);
1274
1275 uint iq;
1276 for (iq = 0; iq < rd[i].fadLen; iq++) {
1277 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1278 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1279 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1280 factOut (kFatal, 1, str);
1281 }
1282 }
1283 }
1284#endif
1285 int qncpy = 0;
1286 boardId = b;
1287 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1288 int fadCrate = fadBoard / 256;
1289 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1290 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1291 fadCrate, fadBoard % 256, boardId);
1292 factOut (kWarn, 301, str);
1293 }
1294 if (mBuffer[evID].board[boardId] != -1) {
1295 snprintf (str, MXSTR,
1296 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1297 evID, boardId, i, rd[i].fadLen,
1298 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1299 rd[i].rBuf->B[rd[i].bufPos - 2],
1300 rd[i].rBuf->B[rd[i].bufPos - 1]);
1301 factOut (kWarn, 501, str);
1302 goto EndBuf; //--> skip Board
1303 }
1304
1305 int iDx = evtIdx[evID]; //index into evtCtrl
1306
1307 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1308 &rd[i].rBuf->S[0], head_len);
1309 qncpy += head_len;
1310
1311 src = head_len / 2;
1312 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1313 for (drs = 0; drs < 4; drs++) {
1314 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1315 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1316 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1317//here we should check if pixH is correct ....
1318
1319 pixS = boardId * 36 + drs * 9 + px;
1320 src++;
1321
1322
1323 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1324 dest = pixS * roi[0];
1325 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1326 &rd[i].rBuf->S[src], roi[0] * 2);
1327 qncpy += roi[0] * 2;
1328 src += pixR;
1329
1330 if (px == 8) {
1331 tmS = boardId * 4 + drs;
1332 if (pixR > roi[0]) { //and we have additional TM info
1333 dest = tmS * roi[0] + NPIX * roi[0];
1334 int srcT = src - roi[0];
1335 mBuffer[evID].fEvent->StartTM[tmS] =
1336 (pixC + pixR - roi[0]) % 1024;
1337 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1338 &rd[i].rBuf->S[srcT], roi[0] * 2);
1339 qncpy += roi[0] * 2;
1340 } else {
1341 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1342 }
1343 }
1344 }
1345 } // now we have stored a new board contents into Event structure
1346
1347 mBuffer[evID].fEvent->NumBoards++;
1348 mBuffer[evID].board[boardId] = boardId;
1349 evtCtrl.evtStat[iDx]++;
1350 evtCtrl.pcTime[iDx] = g_actTime;
1351
1352 if (++mBuffer[evID].nBoard >= actBoards) {
1353 int qnrun = 0;
1354 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1355 actrun = mBuffer[evID].runNum;
1356 int ir;
1357 for (ir = 0; ir < MAX_RUN; ir++) {
1358 qnrun++;
1359 if (runCtrl[ir].runId == actrun) {
1360 if (++runCtrl[ir].lastEvt == 0) {
1361 gotNewRun (actrun, mBuffer[evID].FADhead);
1362 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1363 mBuffer[evID].runNum,
1364 mBuffer[evID].evNum);
1365 factOut (kInfo, 1, str);
1366 break;
1367 }
1368 }
1369 }
1370 }
1371 snprintf (str, MXSTR,
1372 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1373 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1374 qncpy, qnrun);
1375 factOut (kDebug, -1, str);
1376
1377 //complete event read ---> flag for next processing
1378 evtCtrl.evtStat[iDx] = 99;
1379 gi.evtTot++;
1380 }
1381
1382 EndBuf:
1383 rd[i].bufTyp = 0; //ready to read next header
1384 rd[i].bufLen = frst_len;
1385 rd[i].bufPos = 0;
1386 EndBuf1:
1387 ;
1388 }
1389
1390 } else { //we are reading event header
1391 rd[i].bufPos += jrd;
1392 rd[i].bufLen -= jrd;
1393 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1394 //check if startflag correct; else shift block ....
1395 for (k = 0; k < rd[i].bufPos - 1; k++) {
1396 if (rd[i].rBuf->B[k] == start.B[1]
1397 && rd[i].rBuf->B[k + 1] == start.B[0])
1398 break;
1399 }
1400 rd[i].skip += k;
1401
1402 if (k >= rd[i].bufPos - 1) { //no start of header found
1403 rd[i].bufPos = 0;
1404 rd[i].bufLen = head_len;
1405 } else if (k > 0) {
1406 rd[i].bufPos -= k;
1407 rd[i].bufLen += k;
1408 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1409 rd[i].bufPos);
1410#ifdef EVTDEBUG
1411 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1412 rd[i].bufPos);
1413#endif
1414 }
1415 if (rd[i].bufPos >= minLen) {
1416 if (rd[i].skip > 0) {
1417 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1418 rd[i].skip, i);
1419 factOut (kInfo, 666, str);
1420 rd[i].skip = 0;
1421 }
1422 goodhed++;
1423 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1424 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1425 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1426 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1427 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1428 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1429 rd[i].bufTyp = 1; //ready to read full record
1430 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1431
1432 int fadboard = ntohs (rd[i].rBuf->S[12]);
1433 int fadcrate = fadboard / 256;
1434 fadboard = (fadcrate * 10 + fadboard % 256);
1435#ifdef EVTDEBUG
1436 snprintf (str, MXSTR,
1437 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1438 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1439 rd[i].runID, fadboard, jrd);
1440 factOut (kDebug, 1, str);
1441#endif
1442
1443 if (rd[i].runID == 0)
1444 rd[i].runID = gi_myRun;
1445
1446 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1447 snprintf (str, MXSTR,
1448 "illegal event-length on port %d", i);
1449 factOut (kFatal, 881, str);
1450 rd[i].bufLen = 100000; //?
1451 }
1452 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1453 debugHead (i, fadBoard, rd[i].rBuf);
1454 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1455 } else {
1456 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1457 }
1458 } else {
1459 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1460 }
1461
1462 } //end interpreting last read
1463}
1464 } //end of successful read anything
1465 } //finished trying to read all sockets
1466
1467#ifdef EVTDEBUG
1468 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1469 factOut (kDebug, -1, str);
1470#endif
1471
1472 gi.numRead[numok]++;
1473
1474 g_actTime = time (NULL);
1475 if (g_actTime > gi_SecTime) {
1476 gi_SecTime = g_actTime;
1477
1478
1479 //loop over all active events and flag those older than read-timeout
1480 //delete those that are written to disk ....
1481
1482 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1483 if (kd < 0)
1484 kd += (MAX_EVT * MAX_RUN);
1485
1486 gj.bufNew = gj.bufEvt = 0;
1487 int k1 = evtCtrl.frstPtr;
1488 for (k = k1; k < (k1 + kd); k++) {
1489 int k0 = k % (MAX_EVT * MAX_RUN);
1490//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1491
1492 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1493 gj.bufNew++; //incomplete event in Buffer
1494 if (evtCtrl.evtStat[k0] < 90
1495 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1496 int id = evtCtrl.evtBuf[k0];
1497 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1498 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1499 evtCtrl.evtStat[k0]);
1500 factOut (kWarn, 601, str);
1501
1502 int ik,ib,jb;
1503 ik=0;
1504 for (ib=0; ib<NBOARDS; ib++) {
1505 if (ib%10==0) {
1506 snprintf (&str[ik], MXSTR, "'");
1507 ik++;
1508 }
1509 jb = mBuffer[id].board[ib];
1510 if ( jb<0 ) {
1511 if (gi_NumConnect[b] >0 ) {
1512 snprintf (&str[ik], MXSTR, ".");
1513 } else {
1514 snprintf (&str[ik], MXSTR, "x");
1515 }
1516 } else {
1517 snprintf (&str[ik], MXSTR, "%d",jb%10);
1518 }
1519 ik++;
1520 }
1521 snprintf (&str[ik], MXSTR, "'");
1522 factOut (kWarn, 601, str);
1523
1524 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1525 gi.evtSkp++;
1526 gi.evtTot++;
1527 gj.evtSkip++;
1528 }
1529 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1530 || evtCtrl.evtStat[k0] == 0) { //'useless'
1531
1532 int id = evtCtrl.evtBuf[k0];
1533 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1534 mBuffer[id].evNum, mBuffer[id].nBoard);
1535 factOut (kDebug, -1, str);
1536 mBufFree (id); //event written--> free memory
1537 evtCtrl.evtStat[k0] = -1;
1538 gj.evtWrite++;
1539 gj.rateWrite++;
1540 } else if (evtCtrl.evtStat[k0] >= 95) {
1541 gj.bufEvt++; //complete event in Buffer
1542 }
1543
1544 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1545 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1546 }
1547 }
1548
1549
1550 gj.deltaT = 1000; //temporary, must be improved
1551
1552 int b;
1553 for (b = 0; b < NBOARDS; b++)
1554 gj.totBytes[b] += gj.rateBytes[b];
1555 gj.totMem = g_maxMem;
1556 if (gj.maxMem > gj.xxxMem)
1557 gj.xxxMem = gj.maxMem;
1558 if (gj.maxEvt > gj.xxxEvt)
1559 gj.xxxEvt = gj.maxEvt;
1560
1561 factStat (gj);
1562 factStatNew (gi);
1563 gj.rateNew = gj.rateWrite = 0;
1564 gj.maxMem = gj.usdMem;
1565 gj.maxEvt = gj.bufTot;
1566 for (b = 0; b < NBOARDS; b++)
1567 gj.rateBytes[b] = 0;
1568 }
1569
1570 if (numok > 0)
1571 numok2 = 0;
1572 else if (numok2++ > 3) {
1573 if (g_runStat == 1) {
1574 xwait.tv_sec = 1;
1575 xwait.tv_nsec = 0; // hibernate for 1 sec
1576 } else {
1577 xwait.tv_sec = 0;
1578 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1579 }
1580 nanosleep (&xwait, NULL);
1581 }
1582
1583 } //and do next loop over all sockets ...
1584
1585
1586 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1587 factOut (kInfo, -1, str);
1588
1589 if (g_reset > 0) {
1590 gi_reset = g_reset;
1591 gi_resetR = gi_reset % 10; //shall we stop reading ?
1592 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1593 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1594 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1595 g_reset = 0;
1596 } else {
1597 gi_reset = 0;
1598 if (g_runStat == -1)
1599 gi_resetR = 1;
1600 else
1601 gi_resetR = 7;
1602 gi_resetS = 7; //close all sockets
1603 gi_resetW = 7; //close all files
1604 gi_resetX = 0;
1605
1606 //inform others we have to quit ....
1607 gi_runStat = -11; //inform all that no update to happen any more
1608 gj.readStat = -11; //inform all that no update to happen any more
1609 }
1610
1611 if (gi_resetS > 0) {
1612 //must close all open sockets ...
1613 snprintf (str, MXSTR, "close all sockets ...");
1614 factOut (kInfo, -1, str);
1615 for (i = 0; i < MAX_SOCK; i++) {
1616 if (rd[i].sockStat == 0) {
1617 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1618 if (i % 7 == 0) {
1619// gi_NumConnect[i / 7]--;
1620 gi_NumConnect[i / 7]-= cntsock ;
1621 gi.numConn[i / 7]--;
1622 gj.numConn[i / 7]--;
1623 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1624 rd[i / 7].sockStat = -1; //and try to open asap
1625 }
1626 }
1627 }
1628 }
1629
1630
1631 if (gi_resetR > 0) {
1632 //flag all events as 'read finished'
1633 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1634 if (kd < 0)
1635 kd += (MAX_EVT * MAX_RUN);
1636
1637 int k1 = evtCtrl.frstPtr;
1638 for (k = k1; k < (k1 + kd); k++) {
1639 int k0 = k % (MAX_EVT * MAX_RUN);
1640 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1641 evtCtrl.evtStat[k0] = 91;
1642 gi.evtSkp++;
1643 gi.evtTot++;
1644 }
1645 }
1646
1647 xwait.tv_sec = 0;
1648 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1649 nanosleep (&xwait, NULL);
1650
1651 //and clear all buffers (might have to wait until all others are done)
1652 int minclear;
1653 if (gi_resetR == 1) {
1654 minclear = 900;
1655 snprintf (str, MXSTR, "drain all buffers ...");
1656 } else {
1657 minclear = 0;
1658 snprintf (str, MXSTR, "flush all buffers ...");
1659 }
1660 factOut (kInfo, -1, str);
1661
1662 int numclear = 1;
1663 while (numclear > 0) {
1664 numclear = 0;
1665 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1666 if (kd < 0)
1667 kd += (MAX_EVT * MAX_RUN);
1668
1669 int k1 = evtCtrl.frstPtr;
1670 for (k = k1; k < (k1 + kd); k++) {
1671 int k0 = k % (MAX_EVT * MAX_RUN);
1672 if (evtCtrl.evtStat[k0] > minclear) {
1673 int id = evtCtrl.evtBuf[k0];
1674 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1675 mBuffer[id].evNum, mBuffer[id].nBoard);
1676 factOut (kDebug, -1, str);
1677 mBufFree (id); //event written--> free memory
1678 evtCtrl.evtStat[k0] = -1;
1679 } else if (evtCtrl.evtStat[k0] > 0)
1680 numclear++; //writing is still ongoing...
1681
1682 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1683 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1684 }
1685
1686 xwait.tv_sec = 0;
1687 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1688 nanosleep (&xwait, NULL);
1689 }
1690 }
1691
1692 if (gi_reset > 0) {
1693 if (gi_resetW > 0) {
1694 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1695 }
1696 if (gi_resetX > 0) {
1697 xwait.tv_sec = gi_resetX;
1698 xwait.tv_nsec = 0;
1699 nanosleep (&xwait, NULL);
1700 }
1701
1702 snprintf (str, MXSTR, "Continue read Process ...");
1703 factOut (kInfo, -1, str);
1704 gi_reset = 0;
1705 goto START;
1706 }
1707
1708
1709
1710 snprintf (str, MXSTR, "Exit read Process ...");
1711 factOut (kInfo, -1, str);
1712 gi_runStat = -99;
1713 gj.readStat = -99;
1714 factStat (gj);
1715 factStatNew (gi);
1716 return 0;
1717
1718} /*-----------------------------------------------------------------*/
1719
1720
1721void *
1722subProc (void *thrid)
1723{
1724 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1725 struct timespec xwait;
1726
1727 int32_t cntr ;
1728
1729 threadID = (int) thrid;
1730
1731 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1732 factOut (kInfo, -1, str);
1733
1734 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1735 numWait = numProc = 0;
1736 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1737 if (kd < 0)
1738 kd += (MAX_EVT * MAX_RUN);
1739
1740 int k1 = evtCtrl.frstPtr;
1741 for (k = k1; k < (k1 + kd); k++) {
1742 int k0 = k % (MAX_EVT * MAX_RUN);
1743
1744 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1745 if (gi_resetR > 1) { //we are asked to flush buffers asap
1746 jret = 9100; //flag to be deleted
1747 } else {
1748 int id = evtCtrl.evtBuf[k0];
1749
1750
1751 jret =
1752 subProcEvt (threadID, mBuffer[id].FADhead,
1753 mBuffer[id].fEvent, mBuffer[id].buffer);
1754
1755
1756 if (jret <= threadID) {
1757 snprintf (str, MXSTR,
1758 "process %d wants to send to process %d",
1759 threadID, jret);
1760 factOut (kError, -1, str);
1761 jret = 5300;
1762 } else if (jret <= 0)
1763 jret = 9200 + threadID; //flag as 'to be deleted'
1764 else if (jret >= gi_maxProc)
1765 jret = 5200 + threadID; //flag as 'to be written'
1766 else
1767 jret = 1000 + jret; //flag for next proces
1768 }
1769 evtCtrl.evtStat[k0] = jret;
1770 numProc++;
1771 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1772 numWait++;
1773 }
1774
1775 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1776 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1777 factOut (kInfo, -1, str);
1778 return 0;
1779 }
1780 if (numProc == 0) {
1781 //seems we have nothing to do, so sleep a little
1782 xwait.tv_sec = 0;
1783 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1784 nanosleep (&xwait, NULL);
1785 }
1786 }
1787
1788 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1789 factOut (kInfo, -1, str);
1790 return;
1791} /*-----------------------------------------------------------------*/
1792
1793
1794void *
1795procEvt (void *ptr)
1796{
1797/* *** main loop processing file, including SW-trigger */
1798 int numProc, numWait;
1799 int k, status, j;
1800 struct timespec xwait;
1801 char str[MXSTR];
1802
1803
1804
1805 int lastRun = 0; //usually run from last event still valid
1806
1807 cpu_set_t mask;
1808 int cpu = 1; //process thread (will be several in final version)
1809
1810 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1811 gi_maxProc);
1812 factOut (kInfo, -1, str);
1813
1814/* CPU_ZERO initializes all the bits in the mask to zero. */
1815// CPU_ZERO (&mask);
1816/* CPU_SET sets only the bit corresponding to cpu. */
1817// CPU_SET( 0 , &mask ); leave for system
1818// CPU_SET( 1 , &mask ); used by write process
1819// CPU_SET (2, &mask);
1820// CPU_SET (3, &mask);
1821// CPU_SET (4, &mask);
1822// CPU_SET (5, &mask);
1823// CPU_SET (6, &mask);
1824// CPU_SET( 7 , &mask ); used by read process
1825/* sched_setaffinity returns 0 in success */
1826// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1827// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1828// factOut (kWarn, -1, str);
1829// }
1830
1831
1832 pthread_t thread[100];
1833 int th_ret[100];
1834
1835 for (k = 0; k < gi_maxProc; k++) {
1836 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1837 }
1838
1839 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1840
1841 numWait = numProc = 0;
1842 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1843 if (kd < 0)
1844 kd += (MAX_EVT * MAX_RUN);
1845
1846 int k1 = evtCtrl.frstPtr;
1847 for (k = k1; k < (k1 + kd); k++) {
1848 int k0 = k % (MAX_EVT * MAX_RUN);
1849//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1850 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1851
1852 if (gi_resetR > 1) { //we are asked to flush buffers asap
1853 evtCtrl.evtStat[k0] = 9991;
1854 } else {
1855
1856//-------- it is better to open the run already here, so call can be used to initialize
1857//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1858 int id = evtCtrl.evtBuf[k0];
1859 uint32_t irun = mBuffer[id].runNum;
1860 int32_t ievt = mBuffer[id].evNum;
1861 if (runCtrl[lastRun].runId == irun) {
1862 j = lastRun;
1863 } else {
1864 //check which fileID to use (or open if needed)
1865 for (j = 0; j < MAX_RUN; j++) {
1866 if (runCtrl[j].runId == irun)
1867 break;
1868 }
1869 if (j >= MAX_RUN) {
1870 snprintf (str, MXSTR,
1871 "P error: can not find run %d for event %d in %d",
1872 irun, ievt, id);
1873 factOut (kFatal, 901, str);
1874 }
1875 lastRun = j;
1876 }
1877
1878 if (runCtrl[j].fileId < 0) {
1879//---- we need to open a new run ==> make sure all older runs are
1880//---- finished and marked to be closed ....
1881 int j1;
1882 for (j1 = 0; j1 < MAX_RUN; j1++) {
1883 if (runCtrl[j1].fileId == 0) {
1884 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1885//---- problem: processing still going on ==> must wait for closing ....
1886 snprintf (str, MXSTR,
1887 "P finish run since new one opened %d",
1888 runCtrl[j1].runId);
1889 runFinish1 (runCtrl[j1].runId);
1890 }
1891
1892 }
1893
1894 actRun.Version = 1;
1895 actRun.RunType = -1; //to be adapted
1896
1897 actRun.Nroi = runCtrl[j].roi0;
1898 actRun.NroiTM = runCtrl[j].roi8;
1899 if (actRun.Nroi == actRun.NroiTM)
1900 actRun.NroiTM = 0;
1901 actRun.RunTime = runCtrl[j].firstTime;
1902 actRun.RunUsec = runCtrl[j].firstTime;
1903 actRun.NBoard = NBOARDS;
1904 actRun.NPix = NPIX;
1905 actRun.NTm = NTMARK;
1906 actRun.Nroi = mBuffer[id].nRoi;
1907 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1908 NBOARDS * sizeof (PEVNT_HEADER));
1909
1910 runCtrl[j].fileHd =
1911 runOpen (irun, &actRun, sizeof (actRun));
1912 if (runCtrl[j].fileHd == NULL) {
1913 snprintf (str, MXSTR,
1914 "P could not open a file for run %d", irun);
1915 factOut (kError, 502, str);
1916 runCtrl[j].fileId = 91;
1917 runCtrl[j].procId = 91;
1918 } else {
1919 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1920 irun, ievt);
1921 factOut (kInfo, -1, str);
1922 runCtrl[j].fileId = 0;
1923 runCtrl[j].procId = 0;
1924 }
1925
1926 }
1927//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1928 if (runCtrl[j].procId == 0) {
1929 if (runCtrl[j].closeTime < g_actTime
1930 || runCtrl[j].lastTime < g_actTime - 300
1931 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1932 snprintf (str, MXSTR,
1933 "P reached end of run condition for run %d",
1934 irun);
1935 factOut (kInfo, 502, str);
1936 runFinish1 (runCtrl[j].runId);
1937 runCtrl[j].procId = 1;
1938 }
1939 }
1940 if (runCtrl[j].procId != 0) {
1941 snprintf (str, MXSTR,
1942 "P skip event %d because no active run %d", ievt,
1943 irun);
1944 factOut (kDebug, 502, str);
1945 evtCtrl.evtStat[k0] = 9091;
1946 } else {
1947//--------
1948//--------
1949 id = evtCtrl.evtBuf[k0];
1950 int itevt = mBuffer[id].trgNum;
1951 int itrg = mBuffer[id].trgTyp;
1952 int roi = mBuffer[id].nRoi;
1953 int roiTM = mBuffer[id].nRoiTM;
1954
1955//make sure unused pixels/tmarks are cleared to zero
1956 if (roiTM == roi)
1957 roiTM = 0;
1958 int ip, it, dest, ib;
1959 for (ip = 0; ip < NPIX; ip++) {
1960 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1961 dest = ip * roi;
1962 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1963 }
1964 }
1965 for (it = 0; it < NTMARK; it++) {
1966 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1967 dest = it * roi + NPIX * roi;
1968 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1969 }
1970 }
1971
1972
1973//and set correct event header ; also check for consistency in event (not yet)
1974 mBuffer[id].fEvent->Roi = roi;
1975 mBuffer[id].fEvent->RoiTM = roiTM;
1976 mBuffer[id].fEvent->EventNum = ievt;
1977 mBuffer[id].fEvent->TriggerNum = itevt;
1978 mBuffer[id].fEvent->TriggerType = itrg;
1979 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1980 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1981 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1982 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1983 mBuffer[id].fEvent->SoftTrig = 0;
1984
1985
1986 for (ib = 0; ib < NBOARDS; ib++) {
1987 if (mBuffer[id].board[ib] == -1) { //board is not read
1988 mBuffer[id].FADhead[ib].start_package_flag = 0;
1989 mBuffer[id].fEvent->BoardTime[ib] = 0;
1990 } else {
1991 mBuffer[id].fEvent->BoardTime[ib] =
1992 ntohl (mBuffer[id].FADhead[ib].time);
1993 }
1994 }
1995
1996 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1997 mBuffer[id].fEvent);
1998 gi.procTot++;
1999 numProc++;
2000
2001 if (i < 0) {
2002 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2003 gi.procErr++;
2004 } else {
2005 evtCtrl.evtStat[k0] = 1000;
2006 runCtrl[j].procEvt++;
2007 }
2008 }
2009 }
2010 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2011 numWait++;
2012 }
2013 }
2014
2015 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2016 snprintf (str, MXSTR, "Exit Processing Process ...");
2017 factOut (kInfo, -1, str);
2018 gp_runStat = -22; //==> we should exit
2019 gj.procStat = -22; //==> we should exit
2020 return 0;
2021 }
2022
2023 if (numProc == 0) {
2024 //seems we have nothing to do, so sleep a little
2025 xwait.tv_sec = 0;
2026 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2027 nanosleep (&xwait, NULL);
2028 }
2029 gp_runStat = gi_runStat;
2030 gj.procStat = gj.readStat;
2031
2032 }
2033
2034 //we are asked to abort asap ==> must flag all remaining events
2035 // when gi_runStat claims that all events are in the buffer...
2036
2037 snprintf (str, MXSTR, "Abort Processing Process ...");
2038 factOut (kInfo, -1, str);
2039 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2040 if (kd < 0)
2041 kd += (MAX_EVT * MAX_RUN);
2042
2043 for (k = 0; k < gi_maxProc; k++) {
2044 pthread_join (thread[k], (void **) &status);
2045 }
2046
2047 int k1 = evtCtrl.frstPtr;
2048 for (k = k1; k < (k1 + kd); k++) {
2049 int k0 = k % (MAX_EVT * MAX_RUN);
2050 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2051 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2052 }
2053 }
2054
2055 gp_runStat = -99;
2056 gj.procStat = -99;
2057
2058 return 0;
2059
2060} /*-----------------------------------------------------------------*/
2061
2062int
2063CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2064{
2065/* close run runId (all all runs if runId=0) */
2066/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2067 int j;
2068
2069
2070 if (runId == 0) {
2071 for (j = 0; j < MAX_RUN; j++) {
2072 if (runCtrl[j].fileId == 0) { //run is open
2073 runCtrl[j].closeTime = closeTime;
2074 runCtrl[j].maxEvt = maxEvt;
2075 }
2076 }
2077 return 0;
2078 }
2079
2080 for (j = 0; j < MAX_RUN; j++) {
2081 if (runCtrl[j].runId == runId) {
2082 if (runCtrl[j].fileId == 0) { //run is open
2083 runCtrl[j].closeTime = closeTime;
2084 runCtrl[j].maxEvt = maxEvt;
2085 return 0;
2086 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2087 runCtrl[j].closeTime = closeTime;
2088 runCtrl[j].maxEvt = maxEvt;
2089 return +1;
2090 } else { // run already closed
2091 return +2;
2092 }
2093 }
2094 } //we only reach here if the run was never created
2095 return -1;
2096
2097} /*-----------------------------------------------------------------*/
2098
2099
2100void *
2101writeEvt (void *ptr)
2102{
2103/* *** main loop writing event (including opening and closing run-files */
2104
2105 int numWrite, numWait;
2106 int k, j ;
2107 struct timespec xwait;
2108 char str[MXSTR];
2109
2110 cpu_set_t mask;
2111 int cpu = 1; //write thread
2112
2113 snprintf (str, MXSTR, "Starting write-thread");
2114 factOut (kInfo, -1, str);
2115
2116/* CPU_ZERO initializes all the bits in the mask to zero. */
2117// CPU_ZERO (&mask);
2118/* CPU_SET sets only the bit corresponding to cpu. */
2119// CPU_SET (cpu, &mask);
2120/* sched_setaffinity returns 0 in success */
2121// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2122// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2123// }
2124
2125 int lastRun = 0; //usually run from last event still valid
2126
2127 while (g_runStat > -2) {
2128
2129 numWait = numWrite = 0;
2130 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2131 if (kd < 0)
2132 kd += (MAX_EVT * MAX_RUN);
2133
2134 int k1 = evtCtrl.frstPtr;
2135 for (k = k1; k < (k1 + kd); k++) {
2136 int k0 = k % (MAX_EVT * MAX_RUN);
2137//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2138 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2139
2140 if (gi_resetR > 1) { //we must drain the buffer asap
2141 evtCtrl.evtStat[k0] = 9904;
2142 } else {
2143
2144
2145 int id = evtCtrl.evtBuf[k0];
2146 uint32_t irun = mBuffer[id].runNum;
2147 int32_t ievt = mBuffer[id].evNum;
2148
2149 gi.wrtTot++;
2150 if (runCtrl[lastRun].runId == irun) {
2151 j = lastRun;
2152 } else {
2153 //check which fileID to use (or open if needed)
2154 for (j = 0; j < MAX_RUN; j++) {
2155 if (runCtrl[j].runId == irun)
2156 break;
2157 }
2158 if (j >= MAX_RUN) {
2159 snprintf (str, MXSTR,
2160 "W error: can not find run %d for event %d in %d",
2161 irun, ievt, id);
2162 factOut (kFatal, 901, str);
2163 gi.wrtErr++;
2164 }
2165 lastRun = j;
2166 }
2167
2168 if (runCtrl[j].fileId < 0) {
2169 actRun.Version = 1;
2170 actRun.RunType = -1; //to be adapted
2171
2172 actRun.Nroi = runCtrl[j].roi0;
2173 actRun.NroiTM = runCtrl[j].roi8;
2174 if (actRun.Nroi == actRun.NroiTM)
2175 actRun.NroiTM = 0;
2176 actRun.RunTime = runCtrl[j].firstTime;
2177 actRun.RunUsec = runCtrl[j].firstTime;
2178 actRun.NBoard = NBOARDS;
2179 actRun.NPix = NPIX;
2180 actRun.NTm = NTMARK;
2181 actRun.Nroi = mBuffer[id].nRoi;
2182 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2183 NBOARDS * sizeof (PEVNT_HEADER));
2184
2185 runCtrl[j].fileHd =
2186 runOpen (irun, &actRun, sizeof (actRun));
2187 if (runCtrl[j].fileHd == NULL) {
2188 snprintf (str, MXSTR,
2189 "W could not open a file for run %d", irun);
2190 factOut (kError, 502, str);
2191 runCtrl[j].fileId = 91;
2192 } else {
2193 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2194 irun, ievt);
2195 factOut (kInfo, -1, str);
2196 runCtrl[j].fileId = 0;
2197 }
2198
2199 }
2200
2201 if (runCtrl[j].fileId != 0) {
2202 if (runCtrl[j].fileId < 0) {
2203 snprintf (str, MXSTR,
2204 "W never opened file for this run %d", irun);
2205 factOut (kError, 123, str);
2206 } else if (runCtrl[j].fileId < 100) {
2207 snprintf (str, MXSTR, "W.file for this run is closed %d",
2208 irun);
2209 factOut (kWarn, 123, str);
2210 runCtrl[j].fileId += 100;
2211 } else {
2212 snprintf (str, MXSTR, "W:file for this run is closed %d",
2213 irun);
2214 factOut (kDebug, 123, str);
2215 }
2216 evtCtrl.evtStat[k0] = 9903;
2217 gi.wrtErr++;
2218 } else {
2219// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2220// factOut (kInfo, 504, str);
2221 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2222 sizeof (mBuffer[id]));
2223 if (i >= 0) {
2224 runCtrl[j].lastTime = g_actTime;
2225 runCtrl[j].actEvt++;
2226 evtCtrl.evtStat[k0] = 9901;
2227 snprintf (str, MXSTR,
2228 "%5d successfully wrote for run %d id %5d",
2229 ievt, irun, k0);
2230 factOut (kDebug, 504, str);
2231// gj.writEvt++ ;
2232 } else {
2233 snprintf (str, MXSTR, "W error writing event for run %d",
2234 irun);
2235 factOut (kError, 503, str);
2236 evtCtrl.evtStat[k0] = 9902;
2237 gi.wrtErr++;
2238 }
2239
2240 if (i < 0
2241 || runCtrl[j].lastTime < g_actTime - 300
2242 || runCtrl[j].closeTime < g_actTime
2243 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2244 int ii = 0;
2245 if (i < 0)
2246 ii = 1;
2247 else if (runCtrl[j].closeTime < g_actTime)
2248 ii = 2;
2249 else if (runCtrl[j].lastTime < g_actTime - 300)
2250 ii = 3;
2251 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2252 ii = 4;
2253
2254
2255
2256 //close run for whatever reason
2257 if (runCtrl[j].runId == gi_myRun)
2258 gi_myRun = g_actTime;
2259
2260 if (runCtrl[j].procId == 0) {
2261 runFinish1 (runCtrl[j].runId);
2262 runCtrl[j].procId = 92;
2263 }
2264
2265 runCtrl[j].closeTime = g_actTime - 1;
2266 i = runClose (runCtrl[j].fileHd, &runTail[j],
2267 sizeof (runTail[j]));
2268 if (i < 0) {
2269 snprintf (str, MXSTR, "error closing run %d %d AAA",
2270 runCtrl[j].runId, i);
2271 factOut (kError, 503, str);
2272 runCtrl[j].fileId = 92;
2273 } else {
2274 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2275 ii);
2276 factOut (kInfo, 503, str);
2277 runCtrl[j].fileId = 93;
2278 }
2279 }
2280 }
2281 }
2282 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2283 numWait++;
2284 }
2285
2286 //check if we should close a run (mainly when no event pending)
2287 for (j = 0; j < MAX_RUN; j++) {
2288 if (runCtrl[j].fileId == 0
2289 && (runCtrl[j].closeTime < g_actTime
2290 || runCtrl[j].lastTime < g_actTime - 300
2291 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2292 if (runCtrl[j].runId == gi_myRun)
2293 gi_myRun = g_actTime;
2294 int ii = 0;
2295 if (runCtrl[j].closeTime < g_actTime)
2296 ii = 2;
2297 else if (runCtrl[j].lastTime < g_actTime - 300)
2298 ii = 3;
2299 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2300 ii = 4;
2301
2302 if (runCtrl[j].procId == 0) {
2303 runFinish1 (runCtrl[j].runId);
2304 runCtrl[j].procId = 92;
2305 }
2306
2307 runCtrl[j].closeTime = g_actTime - 1;
2308 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2309 sizeof (runTail[j]));
2310 if (i < 0) {
2311 snprintf (str, MXSTR, "error closing run %d %d BBB",
2312 runCtrl[j].runId, i);
2313 factOut (kError, 506, str);
2314 runCtrl[j].fileId = 94;
2315 } else {
2316 snprintf (str, MXSTR, "W closed run %d BBB %d",
2317 runCtrl[j].runId, ii);
2318 factOut (kInfo, 507, str);
2319 runCtrl[j].fileId = 95;
2320 }
2321 }
2322 }
2323
2324 if (numWrite == 0) {
2325 //seems we have nothing to do, so sleep a little
2326 xwait.tv_sec = 0;
2327 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2328 nanosleep (&xwait, NULL);
2329 }
2330
2331 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2332 snprintf (str, MXSTR, "Finish Write Process ...");
2333 factOut (kInfo, -1, str);
2334 gw_runStat = -22; //==> we should exit
2335 gj.writStat = -22; //==> we should exit
2336 goto closerun;
2337 }
2338 gw_runStat = gi_runStat;
2339 gj.writStat = gj.readStat;
2340
2341 }
2342
2343 //must close all open files ....
2344 snprintf (str, MXSTR, "Abort Writing Process ...");
2345 factOut (kInfo, -1, str);
2346
2347 closerun:
2348 snprintf (str, MXSTR, "Close all open files ...");
2349 factOut (kInfo, -1, str);
2350 for (j = 0; j < MAX_RUN; j++)
2351 if (runCtrl[j].fileId == 0) {
2352 if (runCtrl[j].runId == gi_myRun)
2353 gi_myRun = g_actTime;
2354
2355 if (runCtrl[j].procId == 0) {
2356 runFinish1 (runCtrl[j].runId);
2357 runCtrl[j].procId = 92;
2358 }
2359
2360 runCtrl[j].closeTime = g_actTime - 1;
2361 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2362 int ii = 0;
2363 if (runCtrl[j].closeTime < g_actTime)
2364 ii = 2;
2365 else if (runCtrl[j].lastTime < g_actTime - 300)
2366 ii = 3;
2367 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2368 ii = 4;
2369 if (i < 0) {
2370 snprintf (str, MXSTR, "error closing run %d %d CCC",
2371 runCtrl[j].runId, i);
2372 factOut (kError, 506, str);
2373 runCtrl[j].fileId = 96;
2374 } else {
2375 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2376 ii);
2377 factOut (kInfo, 507, str);
2378 runCtrl[j].fileId = 97;
2379 }
2380 }
2381
2382 gw_runStat = -99;
2383 gj.writStat = -99;
2384 snprintf (str, MXSTR, "Exit Writing Process ...");
2385 factOut (kInfo, -1, str);
2386 return 0;
2387
2388
2389
2390
2391} /*-----------------------------------------------------------------*/
2392
2393
2394
2395
2396void
2397StartEvtBuild ()
2398{
2399
2400 int i, j, imax, status, th_ret[50];
2401 pthread_t thread[50];
2402 struct timespec xwait;
2403
2404 gi_runStat = gp_runStat = gw_runStat = 0;
2405 gj.readStat = gj.procStat = gj.writStat = 0;
2406
2407 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2408 factOut (kInfo, -1, str);
2409
2410//initialize run control logics
2411 for (i = 0; i < MAX_RUN; i++) {
2412 runCtrl[i].runId = 0;
2413 runCtrl[i].fileId = -2;
2414 }
2415
2416//prepare for subProcesses
2417 gi_maxSize = g_maxSize;
2418 if (gi_maxSize <= 0)
2419 gi_maxSize = 1;
2420
2421 gi_maxProc = g_maxProc;
2422 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2423 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2424 factOut (kFatal, 301, str);
2425 gi_maxProc = 1;
2426 }
2427//partially initialize event control logics
2428 evtCtrl.frstPtr = 0;
2429 evtCtrl.lastPtr = 0;
2430
2431//start all threads (more to come) when we are allowed to ....
2432 while (g_runStat == 0) {
2433 xwait.tv_sec = 0;
2434 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2435 nanosleep (&xwait, NULL);
2436 }
2437
2438 i = 0;
2439 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2440 i++;
2441 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2442 i++;
2443 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2444 i++;
2445 imax = i;
2446
2447
2448#ifdef BILAND
2449 xwait.tv_sec = 30;;
2450 xwait.tv_nsec = 0; // sleep for ~20sec
2451 nanosleep (&xwait, NULL);
2452
2453 printf ("close all runs in 2 seconds\n");
2454
2455 CloseRunFile (0, time (NULL) + 2, 0);
2456
2457 xwait.tv_sec = 1;;
2458 xwait.tv_nsec = 0; // sleep for ~20sec
2459 nanosleep (&xwait, NULL);
2460
2461 printf ("setting g_runstat to -1\n");
2462
2463 g_runStat = -1;
2464#endif
2465
2466
2467//wait for all threads to finish
2468 for (i = 0; i < imax; i++) {
2469 j = pthread_join (thread[i], (void **) &status);
2470 }
2471
2472} /*-----------------------------------------------------------------*/
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488 /*-----------------------------------------------------------------*/
2489 /*-----------------------------------------------------------------*/
2490 /*-----------------------------------------------------------------*/
2491 /*-----------------------------------------------------------------*/
2492 /*-----------------------------------------------------------------*/
2493
2494#ifdef BILAND
2495
2496int
2497subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2498 int8_t * buffer)
2499{
2500 printf ("called subproc %d\n", threadID);
2501 return threadID + 1;
2502}
2503
2504
2505
2506
2507 /*-----------------------------------------------------------------*/
2508 /*-----------------------------------------------------------------*/
2509 /*-----------------------------------------------------------------*/
2510 /*-----------------------------------------------------------------*/
2511 /*-----------------------------------------------------------------*/
2512
2513
2514
2515
2516FileHandle_t
2517runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2518{
2519 return 1;
2520};
2521
2522int
2523runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2524{
2525 return 1;
2526 usleep (10000);
2527 return 1;
2528}
2529
2530
2531//{ return 1; } ;
2532
2533int
2534runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2535{
2536 return 1;
2537};
2538
2539
2540
2541
2542int
2543eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2544{
2545 int i = 0;
2546
2547// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2548// for (i=0; i<NBOARDS; i++) {
2549// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2550// }
2551 return 0;
2552}
2553
2554
2555void
2556factStatNew (EVT_STAT gi)
2557{
2558 int i;
2559
2560//for (i=0;i<MAX_SOCK;i++) {
2561// printf("%4d",gi.numRead[i]);
2562// if (i%20 == 0 ) printf("\n");
2563//}
2564}
2565
2566void
2567gotNewRun (int runnr, PEVNT_HEADER * headers)
2568{
2569 printf ("got new run %d\n", runnr);
2570 return;
2571}
2572
2573void
2574factStat (GUI_STAT gj)
2575{
2576// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2577// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2578}
2579
2580
/*
 * Stand-alone test stub (BILAND build only) for the read-debug hook.
 * Accepts the socket index, byte count, event/trigger/run numbers, state
 * and timestamp, and deliberately ignores all of them.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;
}
2587
2588
2589
/*
 * Stand-alone test stub (BILAND build only) for the raw-stream debug hook.
 * The buffer is neither read nor modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2594
/*
 * Stand-alone test stub (BILAND build only) for the header debug hook.
 * The buffer is neither read nor modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2599
2600
2601void
2602factOut (int severity, int err, char *message)
2603{
2604 static FILE *fd;
2605 static int file = 0;
2606
2607 if (file == 0) {
2608 printf ("open file\n");
2609 fd = fopen ("x.out", "w+");
2610 file = 999;
2611 }
2612
2613 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2614
2615 if (severity != kDebug)
2616 printf ("%3d %3d | %s\n", severity, err, message);
2617}
2618
2619
2620
2621int
2622main ()
2623{
2624 int i, b, c, p;
2625 char ipStr[100];
2626 struct in_addr IPaddr;
2627
2628 g_maxMem = 1024 * 1024; //MBytes
2629//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2630 g_maxMem = g_maxMem * 200; //100MBytes
2631
2632 g_maxProc = 20;
2633 g_maxSize = 30000;
2634
2635 g_runStat = 40;
2636
2637 i = 0;
2638
2639// version for standard crates
2640//for (c=0; c<4,c++) {
2641// for (b=0; b<10; b++) {
2642// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2643//
2644// inet_pton(PF_INET, ipStr, &IPaddr) ;
2645//
2646// g_port[i].sockAddr.sin_family = PF_INET;
2647// g_port[i].sockAddr.sin_port = htons(5000) ;
2648// g_port[i].sockAddr.sin_addr = IPaddr ;
2649// g_port[i].sockDef = 1 ;
2650// i++ ;
2651// }
2652//}
2653//
2654//version for PC-test *
2655 for (c = 0; c < 4; c++) {
2656 for (b = 0; b < 10; b++) {
2657 sprintf (ipStr, "10.0.%d.11", 128 + c);
2658 if (c < 2)
2659 sprintf (ipStr, "10.0.%d.11", 128);
2660 else
2661 sprintf (ipStr, "10.0.%d.11", 131);
2662// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2663
2664 inet_pton (PF_INET, ipStr, &IPaddr);
2665 p = 31919 + 100 * c + 10 * b;
2666
2667
2668 g_port[i].sockAddr.sin_family = PF_INET;
2669 g_port[i].sockAddr.sin_port = htons (p);
2670 g_port[i].sockAddr.sin_addr = IPaddr;
2671 g_port[i].sockDef = 1;
2672
2673 i++;
2674 }
2675 }
2676
2677
2678//g_port[17].sockDef =-1 ;
2679//g_actBoards-- ;
2680
2681 StartEvtBuild ();
2682
2683 return 0;
2684
2685}
2686#endif
Note: See TracBrowser for help on using the repository browser.