source: trunk/FACT++/src/EventBuilder.c@ 12452

Last change on this file since 12452 was 12452, checked in by lyard, 13 years ago
set unused TM channels mem to zero
File size: 86.6 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/* Severity levels passed as first argument to factOut(). */
enum Severity
{
 kMessage = 10, ///< Just a message, usually obsolete
 kInfo = 20, ///< An info telling something which can be interesting to know
 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted behaviour
 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
 kFatal = 50, ///< An error which cannot be handled at all happened, the only solution is program termination
 kDebug = 99, ///< A message used for debugging only
};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
// size of read-buffer per socket, in bytes.
// Parenthesised so that the macro is a single operand in every expression
// (e.g. x / MAX_LEN or x % MAX_LEN would otherwise bind to the 256 only).
#define MAX_LEN (256*1024)
39
40//#define nanosleep(x,y)
41
/*
 * Functions declared extern here are not defined in this part of the file.
 * NOTE(review): the one-line descriptions below are derived from names and
 * from the calls visible in this file only -- confirm against the
 * implementing module.
 */

// run file handling (presumably open / append one event / close)
extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
//extern int runFinish (uint32_t runnr);

// message output: severity is one of enum Severity, err a numeric message
// code chosen by the caller, message the already formatted text
extern void factOut (int severity, int err, char *message);

extern void gotNewRun (int runnr, PEVNT_HEADER * headers);


// takes the GUI statistics structure by value (see global 'gj')
extern void factStat (GUI_STAT gj);

// takes the event statistics structure by value (see global 'gi')
extern void factStatNew (EVT_STAT gi);

extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);

extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
 int8_t * buffer);

extern void debugHead (int i, int j, void *buf);

// called by readFAD with: socket index, number of bytes just read, event id,
// FTM event id, run id, state (0 = still reading data, 1 = event finished)
// and the current time (seconds, microseconds)
extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
 int32_t runnr, int state, uint32_t tsec,
 uint32_t tusec);
// called by readFAD with the socket index and the bytes just received
extern void debugStream (int isock, void *buf, int len);

// forward declaration; the definition is not in this part of the file
int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
int g_maxProc;
int g_maxSize;
int gi_maxSize;
int gi_maxProc;

uint g_actTime; //seconds part of the time last sampled with gettimeofday() in readFAD
uint g_actUsec; //microseconds part of that time
int g_runStat; //readFAD keeps looping while this is >= 0 (and g_reset == 0)
int g_reset; //readFAD leaves its main loop when this becomes non-zero
int g_useFTM;

//reset flags: readFAD sets gi_resetS and gi_resetR to 9 at start-up,
//re-initializes the socket table if gi_resetS > 0 and the statistics and
//buffers if gi_resetR > 0, then clears gi_reset, gi_resetR, gi_resetS, gi_resetW
int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
size_t g_maxMem; //maximum memory allowed for buffer

//no longer needed ...
int g_maxBoards; //maximum number of boards to be initialized
int g_actBoards;
//

FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"


int gi_runStat; //copy of g_runStat taken by readFAD on every loop iteration
int gp_runStat;
int gw_runStat;

//memory state: +1 = ok; set to -99 by mBufEvt when no event memory is left,
//set back to +1 by mBufFree once usage is at most 75% of gj.maxMem
int gi_memStat = +1;

uint32_t gi_myRun = 0; //set to g_actTime by readFAD at (re)start
uint32_t actrun = 0;


uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards

//uint gi_EvtStart= 0 ;
//uint gi_EvtRead = 0 ;
//uint gi_EvtBad = 0 ;
//uint gi_EvtTot = 0 ;
//size_t gi_usedMem = 0 ;

//uint gw_EvtTot = 0 ;
//uint gp_EvtTot = 0 ;

PIX_MAP g_pixMap[NPIX];

EVT_STAT gi; //event statistics, zeroed by resetEvtStat()
GUI_STAT gj; //statistics incl. buffer/memory usage (usdMem, maxMem, bufTot, ...)

EVT_CTRL evtCtrl; //control of events during processing
int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl

WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space

//shared scratch buffer used to format messages before handing them to factOut()
#define MXSTR 1000
char str[MXSTR];
129
//ETIENNE
// number of event slots carved out of one malloc'ed chunk
#define MAX_SLOTS_PER_CHUNK 100

// Location of the pool slot that backs one mBuffer[] entry.
// All three members are -1 while the entry has no slot (see mBufInit).
typedef struct {
 int32_t eventNumber; // event id the slot was handed out for
 int32_t chunk; // index into EtiMemoryChunks[]
 int32_t slot; // slot index inside that chunk
} CHUNK_MAPPING;

// one mapping per mBuffer[] entry, indexed like mBuffer[]
CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
// event part of a slot: EVENT structure plus 1024 two-byte values for each of NPIX pixels and NTMARK time markers
#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
// header part of a slot: one PEVNT_HEADER per board
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
// one slot = headers followed by the event (mBufEvt places fEvent at offset MAX_HEAD_MEM)
#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
// size of a completely filled chunk
#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
// Bookkeeping for one malloc'ed chunk of event slots.
typedef struct {
 void * pointers[MAX_SLOTS_PER_CHUNK]; // start address of each slot; pointers[0] is the malloc'ed block
 int32_t events[MAX_SLOTS_PER_CHUNK]; // event id occupying each slot, -1 = slot is free
 int32_t nFreeSlots; // number of slots currently free
 int32_t nSlots; // number of slots in this chunk (<= MAX_SLOTS_PER_CHUNK)
} ETI_CHUNK;

// number of entries of EtiMemoryChunks[] currently backed by memory;
// chunks are added and released at the end of the table only
int32_t numAllocatedChunks = 0;

// capacity of the chunk table
#define MAX_CHUNKS 8096
ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 free(EtiMemoryChunks[chunkIndex].pointers[0]);
235 numAllocatedChunks--;
236 chunkIndex--;
237 if (numAllocatedChunks == 0)
238 break;
239 }
240}
241//END ETIENNE
242
243
244
245RUN_HEAD actRun;
246
247RUN_CTRL runCtrl[MAX_RUN];
248
249RUN_TAIL runTail[MAX_RUN];
250
251
252/*
253*** Definition of rdBuffer to read in IP packets; keep it global !!!!
254 */
255
256
// Raw receive buffer of MAX_LEN bytes, overlaid with 1-, 2-, 4- and 8-byte
// integer views of the same memory.
typedef union
{
 int8_t B[MAX_LEN];
 int16_t S[MAX_LEN / 2];
 int32_t I[MAX_LEN / 4];
 int64_t L[MAX_LEN / 8];
} CNV_FACT;
264
// Per-socket read state (one entry of rd[] per socket).
typedef struct
{
 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
 int32_t bufPos; //position in rBuf where the next received byte is stored
 int32_t bufLen; //number of bytes left to read
// size_t bufLen; //number of bytes left to read size_t might be better
 int32_t skip; //number of bytes skipped before start of event

 int errCnt; //how often connect failed since last successful
 //socket state: 0 = connected, -1 = socket exists but is not yet connected,
 //77 = unused socket or buffer allocation failed, 88 = socket() failed,
 //99 = socket does not exist
 int sockStat;
 int socket; //socket descriptor
 struct sockaddr_in SockAddr; //address (IP and port) this socket connects to

 int evtID; // event ID of event currently read
 int runID; // run ID of event currently read
 int ftmID; // event ID from FTM
 uint fadLen; // FADlength of event currently read
 int fadVers; // Version of FAD
 int ftmTyp; // trigger type
 int board; // boardID (softwareID: 0..40 )
 int Port; // port number as passed to GenSock (host byte order)

 CNV_FACT *rBuf; // receive buffer, allocated/freed by GenSock; NULL if none

#ifdef EVTDEBUG
 CNV_FACT *xBuf; //a copy of rBuf (temporary for debugging)
#endif

} READ_STRUCT;
294
295
// A 16-bit value that can also be accessed as its two individual bytes.
typedef union
{
 int8_t B[2];
 int16_t S;
} SHORT_BYTE;





// start/stop markers of a FAD data block; readFAD sets start.S = 0xFB01 and
// stop.S = 0x04FE and compares stop.B[] with the last two bytes of a block
SHORT_BYTE start, stop;

READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
309
310
311
312/*-----------------------------------------------------------------*/
313
314
315/*-----------------------------------------------------------------*/
316
317int
318runFinish1 (uint32_t runnr)
319{
320 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
321 runnr);
322 factOut (kInfo, 173, str); //but continue anyhow
323 return 0;
324}
325int
326runFinish (uint32_t runnr)
327{
328 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
329 runnr);
330 factOut (kInfo, 173, str); //but continue anyhow
331 return 0;
332}
333
334int
335GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
336 READ_STRUCT * rd)
337{
338/*
339*** generate Address, create sockets and allocates readbuffer for it
340***
341*** if flag==0 generate socket and buffer
342*** <0 destroy socket and buffer
343*** >0 close and redo socket
344***
345*** sid : board*7 + port id
346 */
347
348 int j;
349 int optval = 1; //activate keepalive
350 socklen_t optlen = sizeof (optval);
351
352
353 if (sid % 7 >= NUMSOCK) {
354 //this is a not used socket, so do nothing ...
355 rd->sockStat = 77;
356 rd->rBuf = NULL ;
357 return 0;
358 }
359
360 if (rd->sockStat == 0) { //close socket if open
361 j = close (rd->socket);
362 if (j > 0) {
363 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
364 factOut (kFatal, 771, str);
365 } else {
366 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
367 factOut (kInfo, 771, str);
368 }
369 }
370
371 rd->sockStat = 99;
372
373 if (flag < 0) {
374 free (rd->rBuf); //and never open again
375#ifdef EVTDEBUG
376 free (rd->xBuf); //and never open again
377#endif
378 rd->rBuf = NULL;
379 return 0;
380 }
381
382
383 if (flag == 0) { //generate address and buffer ...
384 rd->Port = port;
385 rd->SockAddr.sin_family = sockAddr->sin_family;
386 rd->SockAddr.sin_port = htons (port);
387 rd->SockAddr.sin_addr = sockAddr->sin_addr;
388
389#ifdef EVTDEBUG
390 rd->xBuf = malloc (sizeof (CNV_FACT));
391#endif
392 rd->rBuf = malloc (sizeof (CNV_FACT));
393 if (rd->rBuf == NULL) {
394 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
395 factOut (kFatal, 774, str);
396 rd->sockStat = 77;
397 return -3;
398 }
399 }
400
401
402 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
403 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
404 factOut (kFatal, 773, str);
405 rd->sockStat = 88;
406 return -2;
407 }
408 optval = 1;
409 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
410 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
411 factOut (kInfo, 173, str); //but continue anyhow
412 }
413 optval = 10; //start after 10 seconds
414 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
415 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
416 factOut (kInfo, 173, str); //but continue anyhow
417 }
418 optval = 10; //do every 10 seconds
419 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
420 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
421 factOut (kInfo, 173, str); //but continue anyhow
422 }
423 optval = 2; //close after 2 unsuccessful tries
424 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
425 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
426 factOut (kInfo, 173, str); //but continue anyhow
427 }
428
429
430 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
431 factOut (kInfo, 773, str);
432 rd->sockStat = -1; //try to (re)open socket
433 rd->errCnt = 0;
434 return 0;
435
436} /*-----------------------------------------------------------------*/
437
438 /*-----------------------------------------------------------------*/
439
440
441
442
443int
444mBufInit ()
445{
446// initialize mBuffer (mark all entries as unused\empty)
447
448 int i;
449 uint32_t actime;
450
451 actime = g_actTime + 50000000;
452
453 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
454 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
455 mBuffer[i].runNum = 0;
456
457 evtCtrl.evtBuf[i] = -1;
458 evtCtrl.evtStat[i] = -1;
459 evtCtrl.pcTime[i] = actime; //initiate to far future
460
461 //ETIENNE
462 mBufferMapping[i].chunk = -1;
463 mBufferMapping[i].eventNumber = -1;
464 mBufferMapping[i].slot = -1;
465 //END ETIENNE
466 }
467
468 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
469
470 evtCtrl.frstPtr = 0;
471 evtCtrl.lastPtr = 0;
472
473 return 0;
474
475} /*-----------------------------------------------------------------*/
476
477
478
479
480int
481mBufEvt (int evID, uint runID, int nRoi[], int sk,
482 int fadlen, int trgTyp, int trgNum, int fadNum)
483{
484// generate a new Event into mBuffer:
485// make sure only complete Event are possible, so 'free' will always work
486// returns index into mBuffer[], or negative value in case of error
487// error: <-9000 if roi screwed up (not consistent with run)
488// <-8000 (not consistent with event)
489// <-7000 (not consistent with board)
490// < 0 if no space left
491
492 struct timeval *tv, atv;
493 tv = &atv;
494 uint32_t tsec, tusec;
495 uint oldest;
496 int jold;
497
498 int i, k, jr, b, evFree;
499 int headmem = 0;
500 size_t needmem = 0;
501
502
503 b = sk / 7;
504
505 if (nRoi[0] < 0 || nRoi[0] > 1024) {
506 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
507 factOut (kError, 999, str);
508 gj.badRoiR++;
509 gj.badRoi[b]++;
510 return -9999;
511 }
512
513 for (jr = 1; jr < 8; jr++) {
514 if (nRoi[jr] != nRoi[0]) {
515 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
516 nRoi[0]);
517 factOut (kError, 711, str);
518 gj.badRoiB++;
519 gj.badRoi[b]++;
520 return -7101;
521 }
522 }
523 if (nRoi[8] < nRoi[0]) {
524 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
525 factOut (kError, 712, str);
526 gj.badRoiB++;
527 gj.badRoi[b]++;
528 return -7102;
529 }
530
531
532 i = evID % MAX_EVT;
533 evFree = -1;
534
535 for (k = 0; k < MAX_RUN; k++) {
536 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
537 // is it ok ????
538 if (mBuffer[i].nRoi != nRoi[0]
539 || mBuffer[i].nRoiTM != nRoi[8]) {
540 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
541 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
542 factOut (kError, 821, str);
543 gj.badRoiE++;
544 gj.badRoi[b]++;
545 return -8201;
546 }
547// count for inconsistencies
548
549 if (mBuffer[i].trgNum != trgNum)
550 mBuffer[i].Errors[0]++;
551 if (mBuffer[i].fadNum != fadNum)
552 mBuffer[i].Errors[1]++;
553 if (mBuffer[i].trgTyp != trgTyp)
554 mBuffer[i].Errors[2]++;
555
556 //everything seems fine so far ==> use this slot ....
557 return i;
558 }
559 if (evFree < 0 && mBuffer[i].evNum < 0)
560 evFree = i;
561 i += MAX_EVT;
562 }
563
564
565 //event does not yet exist; create it
566 if (evFree < 0) { //no space available in ctrl
567 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
568 factOut (kError, 881, str);
569 return -1;
570 }
571 i = evFree; //found free entry; use it ...
572
573 gettimeofday (tv, NULL);
574 tsec = atv.tv_sec;
575 tusec = atv.tv_usec;
576
577 //check if runId already registered in runCtrl
578 evFree = -1;
579 oldest = g_actTime + 1000;
580 jold = -1;
581 for (k = 0; k < MAX_RUN; k++) {
582 if (runCtrl[k].runId == runID) {
583// if (runCtrl[k].procId > 0) { //run is closed -> reject
584// snprintf (str, MXSTR, "skip event since run %d finished", runID);
585// factOut (kInfo, 931, str);
586// return -21;
587// }
588
589 if (runCtrl[k].roi0 != nRoi[0]
590 || runCtrl[k].roi8 != nRoi[8]) {
591 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
592 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
593 factOut (kError, 931, str);
594 gj.badRoiR++;
595 gj.badRoi[b]++;
596 return -9301;
597 }
598 goto RUNFOUND;
599 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
600 evFree = k;
601 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
602 if (runCtrl[k].closeTime < oldest) {
603 oldest = runCtrl[k].closeTime;
604 jold = k;
605 }
606 }
607 }
608
609 if (evFree < 0 && jold < 0) {
610 snprintf (str, MXSTR, "not able to register the new run %d", runID);
611 factOut (kFatal, 883, str);
612 return -1001;
613 } else {
614 if (evFree < 0)
615 evFree = jold;
616 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
617 evFree, nRoi[0], nRoi[8]);
618 factOut (kInfo, 503, str);
619 runCtrl[evFree].runId = runID;
620 runCtrl[evFree].roi0 = nRoi[0];
621 runCtrl[evFree].roi8 = nRoi[8];
622 runCtrl[evFree].fileId = -2;
623 runCtrl[evFree].procId = -2;
624 runCtrl[evFree].lastEvt = -1;
625 runCtrl[evFree].nextEvt = 0;
626 runCtrl[evFree].actEvt = 0;
627 runCtrl[evFree].procEvt = 0;
628 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
629 runCtrl[evFree].firstUsec = tusec;
630 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
631 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
632// runCtrl[evFree].lastTime = 0;
633
634 runTail[evFree].nEventsOk =
635 runTail[evFree].nEventsRej =
636 runTail[evFree].nEventsBad =
637 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
638 }
639
640 RUNFOUND:
641 //ETIENNE
642/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
643
644 headmem = NBOARDS * sizeof (PEVNT_HEADER);
645
646 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
647 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
648 if (gi_memStat > 0) {
649 gi_memStat = -99;
650 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
651 evID, sk);
652 factOut (kError, 882, str);
653 } else {
654 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
655 evID, sk);
656 factOut (kDebug, 882, str);
657 }
658 return -11;
659 }
660*/
661 mBuffer[i].FADhead = ETI_Malloc(evID, i);
662 mBuffer[i].buffer = NULL;
663 if (mBuffer[i].FADhead != NULL)
664 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
665 else
666 {
667 mBuffer[i].fEvent = NULL;
668 gj.usdMem = 0;
669 for (k=0;k<numAllocatedChunks;k++)
670 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
671 if (gj.usdMem > gj.maxMem)
672 gj.maxMem = gj.usdMem;
673 if (gi_memStat > 0) {
674 gi_memStat = -99;
675 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
676 evID, sk);
677 factOut (kError, 882, str);
678 } else {
679 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
680 evID, sk);
681 factOut (kDebug, 882, str);
682 }
683 return -11;
684 }
685 /*
686 mBuffer[i].FADhead = malloc (headmem);
687 if (mBuffer[i].FADhead == NULL) {
688 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
689 factOut (kError, 882, str);
690 return -12;
691 }
692
693 mBuffer[i].fEvent = malloc (needmem);
694 if (mBuffer[i].fEvent == NULL) {
695 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
696 factOut (kError, 882, str);
697 free (mBuffer[i].FADhead);
698 mBuffer[i].FADhead = NULL;
699 return -22;
700 }
701
702 mBuffer[i].buffer = malloc (gi_maxSize);
703 if (mBuffer[i].buffer == NULL) {
704 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
705 factOut (kError, 882, str);
706 free (mBuffer[i].FADhead);
707 mBuffer[i].FADhead = NULL;
708 free (mBuffer[i].fEvent);
709 mBuffer[i].fEvent = NULL;
710 return -32;
711 }*/
712 //END ETIENNE
713 //flag all boards as unused
714 mBuffer[i].nBoard = 0;
715 for (k = 0; k < NBOARDS; k++) {
716 mBuffer[i].board[k] = -1;
717 }
718 //flag all pixels as unused
719 for (k = 0; k < NPIX; k++) {
720 mBuffer[i].fEvent->StartPix[k] = -1;
721 }
722 //flag all TMark as unused
723 for (k = 0; k < NTMARK; k++) {
724 mBuffer[i].fEvent->StartTM[k] = -1;
725 }
726
727 mBuffer[i].fEvent->NumBoards = 0;
728 mBuffer[i].fEvent->PCUsec = tusec;
729 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
730 mBuffer[i].nRoi = nRoi[0];
731 mBuffer[i].nRoiTM = nRoi[8];
732 mBuffer[i].evNum = evID;
733 mBuffer[i].runNum = runID;
734 mBuffer[i].fadNum = fadNum;
735 mBuffer[i].trgNum = trgNum;
736 mBuffer[i].trgTyp = trgTyp;
737 mBuffer[i].evtLen = needmem;
738 mBuffer[i].Errors[0] =
739 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
740//ETIENNE
741 //gj.usdMem += needmem + headmem + gi_maxSize;
742 gj.usdMem = 0;
743 for (k=0;k<numAllocatedChunks;k++)
744 {
745 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
746 }
747//END ETIENNE
748 if (gj.usdMem > gj.maxMem)
749 gj.maxMem = gj.usdMem;
750
751 gj.bufTot++;
752 if (gj.bufTot > gj.maxEvt)
753 gj.maxEvt = gj.bufTot;
754
755 gj.rateNew++;
756
757 //register event in 'active list (reading)'
758
759 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
760 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
761 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
762 evtIdx[i] = evtCtrl.lastPtr;
763
764
765 snprintf (str, MXSTR,
766 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
767 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
768 factOut (kDebug, -11, str);
769 evtCtrl.lastPtr++;
770 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
771 evtCtrl.lastPtr = 0;
772
773 gi.evtGet++;
774
775 return i;
776
777} /*-----------------------------------------------------------------*/
778
779
780
781
782int
783mBufFree (int i)
784{
785//delete entry [i] from mBuffer:
786//(and make sure multiple calls do no harm ....)
787
788 int headmem = 0;
789 int evid;
790 size_t freemem = 0;
791
792 evid = mBuffer[i].evNum;
793 freemem = mBuffer[i].evtLen;
794 //ETIENNE
795 ETI_Free(evid, i);
796// free (mBuffer[i].fEvent);
797 mBuffer[i].fEvent = NULL;
798
799// free (mBuffer[i].FADhead);
800 mBuffer[i].FADhead = NULL;
801
802// free (mBuffer[i].buffer);
803 mBuffer[i].buffer = NULL;
804//END ETIENNE
805 headmem = NBOARDS * sizeof (PEVNT_HEADER);
806 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
807 mBuffer[i].runNum = 0;
808//ETIENNE
809// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
810 gj.usdMem = 0;
811 int k;
812 for (k=0;k<numAllocatedChunks;k++)
813 {
814 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
815 }
816//END ETIENNE
817 gj.bufTot--;
818
819 if (gi_memStat < 0) {
820 if (gj.usdMem <= 0.75 * gj.maxMem)
821 gi_memStat = +1;
822 }
823
824
825 return 0;
826
827} /*-----------------------------------------------------------------*/
828
829
830void
831resetEvtStat ()
832{
833 int i;
834
835 for (i = 0; i < MAX_SOCK; i++)
836 gi.numRead[i] = 0;
837
838 for (i = 0; i < NBOARDS; i++) {
839 gi.gotByte[i] = 0;
840 gi.gotErr[i] = 0;
841
842 }
843
844 gi.evtGet = 0; //#new Start of Events read
845 gi.evtTot = 0; //#complete Events read
846 gi.evtErr = 0; //#Events with Errors
847 gi.evtSkp = 0; //#Events incomplete (timeout)
848
849 gi.procTot = 0; //#Events processed
850 gi.procErr = 0; //#Events showed problem in processing
851 gi.procTrg = 0; //#Events accepted by SW trigger
852 gi.procSkp = 0; //#Events rejected by SW trigger
853
854 gi.feedTot = 0; //#Events used for feedBack system
855 gi.feedErr = 0; //#Events rejected by feedBack
856
857 gi.wrtTot = 0; //#Events written to disk
858 gi.wrtErr = 0; //#Events with write-error
859
860 gi.runOpen = 0; //#Runs opened
861 gi.runClose = 0; //#Runs closed
862 gi.runErr = 0; //#Runs with open/close errors
863
864 return;
865} /*-----------------------------------------------------------------*/
866
867
868
/* initReadFAD - currently an empty hook; nothing is initialized here. */
void
initReadFAD ()
{
   /* intentionally empty */
} /*-----------------------------------------------------------------*/
874
875
876
877void *
878readFAD (void *ptr)
879{
880/* *** main loop reading FAD data and sorting them to complete events */
881 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
882 int actBoards = 0, minLen;
883 int32_t jrd;
884 uint gi_SecTime; //time in seconds
885 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
886
887 int goodhed = 0;
888
889 int sockDef[NBOARDS]; //internal state of sockets
890 int jrdx;
891
892
893 struct timespec xwait;
894
895
896 struct timeval *tv, atv;
897 tv = &atv;
898 uint32_t tsec, tusec;
899
900
901 snprintf (str, MXSTR, "start initializing");
902 factOut (kInfo, -1, str);
903
904 int cpu = 7; //read thread
905 cpu_set_t mask;
906
907/* CPU_ZERO initializes all the bits in the mask to zero. */
908// CPU_ZERO (&mask);
909/* CPU_SET sets only the bit corresponding to cpu. */
910 cpu = 7;
911// CPU_SET (cpu, &mask);
912
913/* sched_setaffinity returns 0 in success */
914// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
915// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
916// factOut (kWarn, -1, str);
917// }
918
919
920 head_len = sizeof (PEVNT_HEADER);
921 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
922 minLen = head_len; //min #bytes needed to check header: full header for debug
923
924 start.S = 0xFB01;
925 stop.S = 0x04FE;
926
927/* initialize run control logics */
928 for (i = 0; i < MAX_RUN; i++) {
929 runCtrl[i].runId = 0;
930 runCtrl[i].fileId = -2;
931 runCtrl[i].procId = -2;
932 }
933 gi_resetS = gi_resetR = 9;
934 for (i = 0; i < NBOARDS; i++)
935 sockDef[i] = 0;
936
937 START:
938 gettimeofday (tv, NULL);
939 g_actTime = tsec = atv.tv_sec;
940 g_actUsec = tusec = atv.tv_usec;
941 gi_myRun = g_actTime;
942 evtCtrl.frstPtr = 0;
943 evtCtrl.lastPtr = 0;
944
945 gi_SecTime = g_actTime;
946 gi_runStat = g_runStat;
947 gj.readStat = g_runStat;
948 numok = numok2 = 0;
949
950 int cntsock = 8 - NUMSOCK ;
951
952 if (gi_resetS > 0) {
953 //make sure all sockets are preallocated as 'not exist'
954 for (i = 0; i < MAX_SOCK; i++) {
955 rd[i].socket = -1;
956 rd[i].sockStat = 99;
957 }
958
959 for (k = 0; k < NBOARDS; k++) {
960 gi_NumConnect[k] = 0;
961 gi.numConn[k] = 0;
962 gj.numConn[k] = 0;
963 gj.errConn[k] = 0;
964 gj.rateBytes[k] = 0;
965 gj.totBytes[k] = 0;
966 }
967
968 }
969
970
971 if (gi_resetR > 0) {
972 resetEvtStat ();
973 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
974 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
975 gj.totMem = g_maxMem;
976 gj.bufNew = gj.bufEvt = 0;
977 gj.badRoiE = gj.badRoiR = gj.badRoiB =
978 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
979
980 int b,p;
981 for (b = 0; b < NBOARDS; b++)
982 gj.badRoi[b] = 0;
983
984 mBufInit (); //initialize buffers
985
986 snprintf (str, MXSTR, "end initializing");
987 factOut (kInfo, -1, str);
988 }
989
990
991 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
992
993 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
994
995 gi_runStat = g_runStat;
996 gj.readStat = g_runStat;
997 gettimeofday (tv, NULL);
998 g_actTime = tsec = atv.tv_sec;
999 g_actUsec = tusec = atv.tv_usec;
1000
1001
1002 int b, p, p0, s0, nch;
1003 nch = 0;
1004 for (b = 0; b < NBOARDS; b++) {
1005 k = b * 7;
1006 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1007 nch++;
1008 gi_NumConnect[b] = 0; //must close all connections
1009 gi.numConn[b] = 0;
1010 gj.numConn[b] = 0;
1011 if (sockDef[b] == 0)
1012 s0 = 0; //sockets to be defined and opened
1013 else if (g_port[b].sockDef == 0)
1014 s0 = -1; //sockets to be destroyed
1015 else
1016 s0 = +1; //sockets to be closed and reopened
1017
1018 if (s0 == 0)
1019 p0 = ntohs (g_port[b].sockAddr.sin_port);
1020 else
1021 p0 = 0;
1022
1023 for (p = p0 + 1; p < p0 + 8; p++) {
1024 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1025 k++;
1026 }
1027 sockDef[b] = g_port[b].sockDef;
1028 }
1029 }
1030
1031 if (nch > 0) {
1032 actBoards = 0;
1033 for (b = 0; b < NBOARDS; b++) {
1034 if (sockDef[b] > 0)
1035 actBoards++;
1036 }
1037 }
1038
1039
1040 jrdx = 0;
1041 numokx = 0;
1042 numok = 0; //count number of succesfull actions
1043
1044 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1045 b = i / 7 ;
1046 p = i % 7 ;
1047
1048if ( p >= NUMSOCK) { ; }
1049else {
1050 if (sockDef[b] > 0)
1051 s0 = +1;
1052 else
1053 s0 = -1;
1054
1055 if (rd[i].sockStat < 0) { //try to connect if not yet done
1056 if (rd[i].sockStat == -1) {
1057 rd[i].sockStat = connect (rd[i].socket,
1058 (struct sockaddr *) &rd[i].SockAddr,
1059 sizeof (rd[i].SockAddr));
1060 if (rd[i].sockStat == -1) {
1061 rd[i].errCnt++ ;
1062// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1063// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1064// else rd[i].sockStat = 10000 ;
1065 }
1066//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1067
1068 }
1069 if (rd[i].sockStat < -1 ) {
1070 rd[i].sockStat++ ;
1071 }
1072 if (rd[i].sockStat == 0) { //successfull ==>
1073 if (sockDef[b] > 0) {
1074 rd[i].bufTyp = 0; // expect a header
1075 rd[i].bufLen = frst_len; // max size to read at begining
1076 } else {
1077 rd[i].bufTyp = -1; // full data to be skipped
1078 rd[i].bufLen = MAX_LEN; //huge for skipping
1079 }
1080 rd[i].bufPos = 0; // no byte read so far
1081 rd[i].skip = 0; // start empty
1082// gi_NumConnect[b]++;
1083 gi_NumConnect[b] += cntsock ;
1084
1085 gi.numConn[b]++;
1086 gj.numConn[b]++;
1087 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1088 factOut (kInfo, -1, str);
1089 }
1090 }
1091
1092 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1093 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1094 numok++;
1095 size_t maxread = rd[i].bufLen ;
1096 if (maxread > MAXREAD ) maxread=MAXREAD ;
1097
1098 jrd =
1099 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1100 maxread, MSG_DONTWAIT);
1101// rd[i].bufLen, MSG_DONTWAIT);
1102
1103 if (jrd > 0) {
1104 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1105#ifdef EVTDEBUG
1106 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1107 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1108 snprintf (str, MXSTR,
1109 "read sock %3d bytes %5d len %5d first %d %d", i,
1110 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1111 rd[i].rBuf->B[rd[i].bufPos + 1]);
1112 factOut (kDebug, 301, str);
1113#endif
1114 }
1115
1116 if (jrd == 0) { //connection has closed ...
1117 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1118 factOut (kInfo, 441, str);
1119 GenSock (s0, i, 0, NULL, &rd[i]);
1120 gi.gotErr[b]++;
1121// gi_NumConnect[b]--;
1122 gi_NumConnect[b]-= cntsock ;
1123 gi.numConn[b]--;
1124 gj.numConn[b]--;
1125
1126 } else if (jrd < 0) { //did not read anything
1127 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1128 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1129 factOut (kError, 442, str);
1130 gi.gotErr[b]++;
1131 } else
1132 numok--; //else nothing waiting to be read
1133 jrd = 0;
1134 }
1135 } else {
1136 jrd = 0; //did read nothing as requested
1137 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1138 rd[i].bufLen);
1139 factOut (kDebug, 301, str);
1140 }
1141
1142 gi.gotByte[b] += jrd;
1143 gj.rateBytes[b] += jrd;
1144
1145 if (jrd > 0) {
1146 numokx++;
1147 jrdx += jrd;
1148 }
1149
1150
1151 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1152// just do nothing
1153#ifdef EVTDEBUG
1154 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1155 i);
1156 factOut (kInfo, 301, str);
1157#endif
1158
1159 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1160 if (jrd < rd[i].bufLen) { //not yet all read
1161 rd[i].bufPos += jrd; //==> prepare for continuation
1162 rd[i].bufLen -= jrd;
1163 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1164 } else { //full dataset read
1165 rd[i].bufLen = 0;
1166 rd[i].bufPos = rd[i].fadLen;
1167 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1168 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1169 gi.evtErr++;
1170 snprintf (str, MXSTR,
1171 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1172 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1173 rd[i].rBuf->B[1],
1174 rd[i].rBuf->B[rd[i].bufPos - 2],
1175 rd[i].rBuf->B[rd[i].bufPos - 1]);
1176 factOut (kError, 301, str);
1177 goto EndBuf;
1178
1179#ifdef EVTDEBUG
1180 } else {
1181 snprintf (str, MXSTR,
1182 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1183 i, rd[i].fadLen, rd[i].rBuf->B[0],
1184 rd[i].rBuf->B[1], start.B[1], start.B[0],
1185 rd[i].rBuf->B[rd[i].bufPos - 2],
1186 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1187 stop.B[0]);
1188 factOut (kDebug, 301, str);
1189#endif
1190 }
1191
1192 if (jrd > 0)
1193 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1194
1195/* //we have a complete buffer, copy to WORK area
1196 int jr, kr;
1197 int checkRoi;
1198 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1199 for (kr = 1; kr < 4; kr++)
1200 {
1201 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1202 + kr*(roi[jr-1]+4)]);
1203 if (checkRoi != roi[0])
1204 {
1205 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1206 factOut (kFatal, 1, str);
1207 goto EndBuf;
1208 }
1209
1210 }
1211 for (jr = 1; jr < 9; jr++) {
1212 roi[jr] =
1213 ntohs (rd[i].
1214 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1215
1216 }
1217*/
1218 //we have a complete buffer, copy to WORK area
1219 int jr, kr;
1220 int checkRoi;
1221 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1222 for (kr = 1; kr < 4; kr++)
1223 {
1224 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 + 2
1225 + kr*(roi[0]+4)]);
1226 if (checkRoi != roi[0])
1227 {
1228 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1229 factOut (kError, 1, str);
1230 goto EndBuf;
1231 }
1232
1233 }
1234 for (jr = 1; jr < 9; jr++) {
1235 roi[jr] =
1236 ntohs (rd[i].
1237 rBuf->S[head_len / 2 + 2 + 9 * jr * (roi[jr-1] + 4)]);
1238 for (kr = 1; kr < 4; kr++)
1239 {
1240 checkRoi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 //header up to first roi
1241 + kr*(roi[jr-1]+4) //shift up to the next board
1242 + 9*jr*(roi[jr-1]+4)]); //shift up to the correct pixel
1243 if (checkRoi != roi[jr])
1244 {
1245 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1246 factOut (kFatal, 1, str);
1247 goto EndBuf;
1248 }
1249 }
1250 }
1251 //get index into mBuffer for this event (create if needed)
1252
1253 int actid;
1254 if (g_useFTM > 0)
1255 actid = rd[i].evtID;
1256 else
1257 actid = rd[i].ftmID;
1258
1259 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1260 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1261 rd[i].evtID);
1262
1263 if (evID < -1000) {
1264 goto EndBuf; //not usable board/event/run --> skip it
1265 }
1266 if (evID < 0) { //no space left, retry later
1267#ifdef EVTDEBUG
1268 if (rd[i].bufLen != 0) {
1269 snprintf (str, MXSTR, "something screwed up");
1270 factOut (kFatal, 1, str);
1271 }
1272#endif
1273 xwait.tv_sec = 0;
1274 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1275 nanosleep (&xwait, NULL);
1276 goto EndBuf1; //hope there is free space next round
1277 }
1278 //we have a valid entry in mBuffer[]; fill it
1279
1280#ifdef EVTDEBUG
1281 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1282 rd[i].fadLen);
1283 if (xchk != 0) {
1284 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1285 xchk, rd[i].fadLen, i);
1286 factOut (kFatal, 1, str);
1287
1288 uint iq;
1289 for (iq = 0; iq < rd[i].fadLen; iq++) {
1290 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1291 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1292 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1293 factOut (kFatal, 1, str);
1294 }
1295 }
1296 }
1297#endif
1298 int qncpy = 0;
1299 boardId = b;
1300 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1301 int fadCrate = fadBoard / 256;
1302 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1303 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1304 fadCrate, fadBoard % 256, boardId);
1305 factOut (kWarn, 301, str);
1306 }
1307 if (mBuffer[evID].board[boardId] != -1) {
1308 snprintf (str, MXSTR,
1309 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1310 evID, boardId, i, rd[i].fadLen,
1311 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1312 rd[i].rBuf->B[rd[i].bufPos - 2],
1313 rd[i].rBuf->B[rd[i].bufPos - 1]);
1314 factOut (kWarn, 501, str);
1315 goto EndBuf; //--> skip Board
1316 }
1317
1318 int iDx = evtIdx[evID]; //index into evtCtrl
1319
1320 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1321 &rd[i].rBuf->S[0], head_len);
1322 qncpy += head_len;
1323
1324 src = head_len / 2;
1325 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1326 for (drs = 0; drs < 4; drs++) {
1327 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1328 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1329 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1330//here we should check if pixH is correct ....
1331
1332 pixS = boardId * 36 + drs * 9 + px;
1333 src++;
1334
1335
1336 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1337 dest = pixS * roi[0];
1338 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1339 &rd[i].rBuf->S[src], roi[0] * 2);
1340 qncpy += roi[0] * 2;
1341 src += pixR;
1342
1343 if (px == 8) {
1344 tmS = boardId * 4 + drs;
1345 if (pixR > roi[0]) { //and we have additional TM info
1346 dest = tmS * roi[0] + NPIX * roi[0];
1347 int srcT = src - roi[0];
1348 mBuffer[evID].fEvent->StartTM[tmS] =
1349 (pixC + pixR - roi[0]) % 1024;
1350 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1351 &rd[i].rBuf->S[srcT], roi[0] * 2);
1352 qncpy += roi[0] * 2;
1353 } else {
1354 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1355 //ETIENNE because the TM channels are always processed during drs calib,
1356 //set them to zero if they are not present
1357 //I suspect that it may be more efficient to set all the allocated mem to
1358 //zero when allocating it
1359 dest = tmS*roi[0] + NPIX*roi[0];
1360 memset(&mBuffer[evID].fEvent->Adc_Data[dest],0,roi[0]*2);
1361 }
1362 }
1363 }
1364 } // now we have stored a new board contents into Event structure
1365
1366 mBuffer[evID].fEvent->NumBoards++;
1367 mBuffer[evID].board[boardId] = boardId;
1368 evtCtrl.evtStat[iDx]++;
1369 evtCtrl.pcTime[iDx] = g_actTime;
1370
1371 if (++mBuffer[evID].nBoard >= actBoards) {
1372 int qnrun = 0;
1373 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1374 actrun = mBuffer[evID].runNum;
1375 int ir;
1376 for (ir = 0; ir < MAX_RUN; ir++) {
1377 qnrun++;
1378 if (runCtrl[ir].runId == actrun) {
1379 if (++runCtrl[ir].lastEvt == 0) {
1380 gotNewRun (actrun, mBuffer[evID].FADhead);
1381 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1382 mBuffer[evID].runNum,
1383 mBuffer[evID].evNum);
1384 factOut (kInfo, 1, str);
1385 break;
1386 }
1387 }
1388 }
1389 }
1390 snprintf (str, MXSTR,
1391 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1392 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1393 qncpy, qnrun);
1394 factOut (kDebug, -1, str);
1395
1396 //complete event read ---> flag for next processing
1397 evtCtrl.evtStat[iDx] = 99;
1398 gi.evtTot++;
1399 }
1400
1401 EndBuf:
1402 rd[i].bufTyp = 0; //ready to read next header
1403 rd[i].bufLen = frst_len;
1404 rd[i].bufPos = 0;
1405 EndBuf1:
1406 ;
1407 }
1408
1409 } else { //we are reading event header
1410 rd[i].bufPos += jrd;
1411 rd[i].bufLen -= jrd;
1412 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1413 //check if startflag correct; else shift block ....
1414 for (k = 0; k < rd[i].bufPos - 1; k++) {
1415 if (rd[i].rBuf->B[k] == start.B[1]
1416 && rd[i].rBuf->B[k + 1] == start.B[0])
1417 break;
1418 }
1419 rd[i].skip += k;
1420
1421 if (k >= rd[i].bufPos - 1) { //no start of header found
1422 rd[i].bufPos = 0;
1423 rd[i].bufLen = head_len;
1424 } else if (k > 0) {
1425 rd[i].bufPos -= k;
1426 rd[i].bufLen += k;
1427 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1428 rd[i].bufPos);
1429#ifdef EVTDEBUG
1430 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1431 rd[i].bufPos);
1432#endif
1433 }
1434 if (rd[i].bufPos >= minLen) {
1435 if (rd[i].skip > 0) {
1436 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1437 rd[i].skip, i);
1438 factOut (kInfo, 666, str);
1439 rd[i].skip = 0;
1440 }
1441 goodhed++;
1442 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1443 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1444 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1445 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1446 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1447 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1448 rd[i].bufTyp = 1; //ready to read full record
1449 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1450
1451 int fadboard = ntohs (rd[i].rBuf->S[12]);
1452 int fadcrate = fadboard / 256;
1453 fadboard = (fadcrate * 10 + fadboard % 256);
1454#ifdef EVTDEBUG
1455 snprintf (str, MXSTR,
1456 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1457 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1458 rd[i].runID, fadboard, jrd);
1459 factOut (kDebug, 1, str);
1460#endif
1461
1462 if (rd[i].runID == 0)
1463 rd[i].runID = gi_myRun;
1464
1465 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1466 snprintf (str, MXSTR,
1467 "illegal event-length on port %d", i);
1468 factOut (kFatal, 881, str);
1469 rd[i].bufLen = 100000; //?
1470 }
1471 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1472 debugHead (i, fadBoard, rd[i].rBuf);
1473 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1474 } else {
1475 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1476 }
1477 } else {
1478 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1479 }
1480
1481 } //end interpreting last read
1482}
1483 } //end of successful read anything
1484 } //finished trying to read all sockets
1485
1486#ifdef EVTDEBUG
1487 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1488 factOut (kDebug, -1, str);
1489#endif
1490
1491 gi.numRead[numok]++;
1492
1493 g_actTime = time (NULL);
1494 if (g_actTime > gi_SecTime) {
1495 gi_SecTime = g_actTime;
1496
1497
1498 //loop over all active events and flag those older than read-timeout
1499 //delete those that are written to disk ....
1500
1501 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1502 if (kd < 0)
1503 kd += (MAX_EVT * MAX_RUN);
1504
1505 gj.bufNew = gj.bufEvt = 0;
1506 int k1 = evtCtrl.frstPtr;
1507 for (k = k1; k < (k1 + kd); k++) {
1508 int k0 = k % (MAX_EVT * MAX_RUN);
1509//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1510
1511 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1512 gj.bufNew++; //incomplete event in Buffer
1513 if (evtCtrl.evtStat[k0] < 90
1514 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1515 int id = evtCtrl.evtBuf[k0];
1516 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1517 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1518 evtCtrl.evtStat[k0]);
1519 factOut (kWarn, 601, str);
1520
1521 int ik,ib,jb;
1522 ik=0;
1523 for (ib=0; ib<NBOARDS; ib++) {
1524 if (ib%10==0) {
1525 snprintf (&str[ik], MXSTR, "'");
1526 ik++;
1527 }
1528 jb = mBuffer[id].board[ib];
1529 if ( jb<0 ) {
1530 if (gi_NumConnect[b] >0 ) {
1531 snprintf (&str[ik], MXSTR, ".");
1532 } else {
1533 snprintf (&str[ik], MXSTR, "x");
1534 }
1535 } else {
1536 snprintf (&str[ik], MXSTR, "%d",jb%10);
1537 }
1538 ik++;
1539 }
1540 snprintf (&str[ik], MXSTR, "'");
1541 factOut (kWarn, 601, str);
1542
1543 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1544 gi.evtSkp++;
1545 gi.evtTot++;
1546 gj.evtSkip++;
1547 }
1548 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1549 || evtCtrl.evtStat[k0] == 0) { //'useless'
1550
1551 int id = evtCtrl.evtBuf[k0];
1552 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1553 mBuffer[id].evNum, mBuffer[id].nBoard);
1554 factOut (kDebug, -1, str);
1555 mBufFree (id); //event written--> free memory
1556 evtCtrl.evtStat[k0] = -1;
1557 gj.evtWrite++;
1558 gj.rateWrite++;
1559 } else if (evtCtrl.evtStat[k0] >= 95) {
1560 gj.bufEvt++; //complete event in Buffer
1561 }
1562
1563 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1564 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1565 }
1566 }
1567
1568
1569 gj.deltaT = 1000; //temporary, must be improved
1570
1571 int b;
1572 for (b = 0; b < NBOARDS; b++)
1573 gj.totBytes[b] += gj.rateBytes[b];
1574 gj.totMem = g_maxMem;
1575 if (gj.maxMem > gj.xxxMem)
1576 gj.xxxMem = gj.maxMem;
1577 if (gj.maxEvt > gj.xxxEvt)
1578 gj.xxxEvt = gj.maxEvt;
1579
1580 factStat (gj);
1581 factStatNew (gi);
1582 gj.rateNew = gj.rateWrite = 0;
1583 gj.maxMem = gj.usdMem;
1584 gj.maxEvt = gj.bufTot;
1585 for (b = 0; b < NBOARDS; b++)
1586 gj.rateBytes[b] = 0;
1587 }
1588
1589 if (numok > 0)
1590 numok2 = 0;
1591 else if (numok2++ > 3) {
1592 if (g_runStat == 1) {
1593 xwait.tv_sec = 1;
1594 xwait.tv_nsec = 0; // hibernate for 1 sec
1595 } else {
1596 xwait.tv_sec = 0;
1597 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1598 }
1599 nanosleep (&xwait, NULL);
1600 }
1601
1602 } //and do next loop over all sockets ...
1603
1604
1605 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1606 factOut (kInfo, -1, str);
1607
1608 if (g_reset > 0) {
1609 gi_reset = g_reset;
1610 gi_resetR = gi_reset % 10; //shall we stop reading ?
1611 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1612 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1613 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1614 g_reset = 0;
1615 } else {
1616 gi_reset = 0;
1617 if (g_runStat == -1)
1618 gi_resetR = 1;
1619 else
1620 gi_resetR = 7;
1621 gi_resetS = 7; //close all sockets
1622 gi_resetW = 7; //close all files
1623 gi_resetX = 0;
1624
1625 //inform others we have to quit ....
1626 gi_runStat = -11; //inform all that no update to happen any more
1627 gj.readStat = -11; //inform all that no update to happen any more
1628 }
1629
1630 if (gi_resetS > 0) {
1631 //must close all open sockets ...
1632 snprintf (str, MXSTR, "close all sockets ...");
1633 factOut (kInfo, -1, str);
1634 for (i = 0; i < MAX_SOCK; i++) {
1635 if (rd[i].sockStat == 0) {
1636 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1637 if (i % 7 == 0) {
1638// gi_NumConnect[i / 7]--;
1639 gi_NumConnect[i / 7]-= cntsock ;
1640 gi.numConn[i / 7]--;
1641 gj.numConn[i / 7]--;
1642 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1643 rd[i / 7].sockStat = -1; //and try to open asap
1644 }
1645 }
1646 }
1647 }
1648
1649
1650 if (gi_resetR > 0) {
1651 //flag all events as 'read finished'
1652 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1653 if (kd < 0)
1654 kd += (MAX_EVT * MAX_RUN);
1655
1656 int k1 = evtCtrl.frstPtr;
1657 for (k = k1; k < (k1 + kd); k++) {
1658 int k0 = k % (MAX_EVT * MAX_RUN);
1659 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1660 evtCtrl.evtStat[k0] = 91;
1661 gi.evtSkp++;
1662 gi.evtTot++;
1663 }
1664 }
1665
1666 xwait.tv_sec = 0;
1667 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1668 nanosleep (&xwait, NULL);
1669
1670 //and clear all buffers (might have to wait until all others are done)
1671 int minclear;
1672 if (gi_resetR == 1) {
1673 minclear = 900;
1674 snprintf (str, MXSTR, "drain all buffers ...");
1675 } else {
1676 minclear = 0;
1677 snprintf (str, MXSTR, "flush all buffers ...");
1678 }
1679 factOut (kInfo, -1, str);
1680
1681 int numclear = 1;
1682 while (numclear > 0) {
1683 numclear = 0;
1684 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1685 if (kd < 0)
1686 kd += (MAX_EVT * MAX_RUN);
1687
1688 int k1 = evtCtrl.frstPtr;
1689 for (k = k1; k < (k1 + kd); k++) {
1690 int k0 = k % (MAX_EVT * MAX_RUN);
1691 if (evtCtrl.evtStat[k0] > minclear) {
1692 int id = evtCtrl.evtBuf[k0];
1693 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1694 mBuffer[id].evNum, mBuffer[id].nBoard);
1695 factOut (kDebug, -1, str);
1696 mBufFree (id); //event written--> free memory
1697 evtCtrl.evtStat[k0] = -1;
1698 } else if (evtCtrl.evtStat[k0] > 0)
1699 numclear++; //writing is still ongoing...
1700
1701 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1702 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1703 }
1704
1705 xwait.tv_sec = 0;
1706 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1707 nanosleep (&xwait, NULL);
1708 }
1709 }
1710
1711 if (gi_reset > 0) {
1712 if (gi_resetW > 0) {
1713 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1714 }
1715 if (gi_resetX > 0) {
1716 xwait.tv_sec = gi_resetX;
1717 xwait.tv_nsec = 0;
1718 nanosleep (&xwait, NULL);
1719 }
1720
1721 snprintf (str, MXSTR, "Continue read Process ...");
1722 factOut (kInfo, -1, str);
1723 gi_reset = 0;
1724 goto START;
1725 }
1726
1727
1728
1729 snprintf (str, MXSTR, "Exit read Process ...");
1730 factOut (kInfo, -1, str);
1731 gi_runStat = -99;
1732 gj.readStat = -99;
1733 factStat (gj);
1734 factStatNew (gi);
1735 return 0;
1736
1737} /*-----------------------------------------------------------------*/
1738
1739
1740void *
1741subProc (void *thrid)
1742{
1743 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1744 struct timespec xwait;
1745
1746 int32_t cntr ;
1747
1748 threadID = (int) thrid;
1749
1750 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1751 factOut (kInfo, -1, str);
1752
1753 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1754 numWait = numProc = 0;
1755 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1756 if (kd < 0)
1757 kd += (MAX_EVT * MAX_RUN);
1758
1759 int k1 = evtCtrl.frstPtr;
1760 for (k = k1; k < (k1 + kd); k++) {
1761 int k0 = k % (MAX_EVT * MAX_RUN);
1762
1763 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1764 if (gi_resetR > 1) { //we are asked to flush buffers asap
1765 jret = 9100; //flag to be deleted
1766 } else {
1767 int id = evtCtrl.evtBuf[k0];
1768
1769
1770 jret =
1771 subProcEvt (threadID, mBuffer[id].FADhead,
1772 mBuffer[id].fEvent, mBuffer[id].buffer);
1773
1774
1775 if (jret <= threadID) {
1776 snprintf (str, MXSTR,
1777 "process %d wants to send to process %d",
1778 threadID, jret);
1779 factOut (kError, -1, str);
1780 jret = 5300;
1781 } else if (jret <= 0)
1782 jret = 9200 + threadID; //flag as 'to be deleted'
1783 else if (jret >= gi_maxProc)
1784 jret = 5200 + threadID; //flag as 'to be written'
1785 else
1786 jret = 1000 + jret; //flag for next proces
1787 }
1788 evtCtrl.evtStat[k0] = jret;
1789 numProc++;
1790 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1791 numWait++;
1792 }
1793
1794 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1795 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1796 factOut (kInfo, -1, str);
1797 return 0;
1798 }
1799 if (numProc == 0) {
1800 //seems we have nothing to do, so sleep a little
1801 xwait.tv_sec = 0;
1802 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1803 nanosleep (&xwait, NULL);
1804 }
1805 }
1806
1807 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1808 factOut (kInfo, -1, str);
1809 return;
1810} /*-----------------------------------------------------------------*/
1811
1812
1813void *
1814procEvt (void *ptr)
1815{
1816/* *** main loop processing file, including SW-trigger */
1817 int numProc, numWait;
1818 int k, status, j;
1819 struct timespec xwait;
1820 char str[MXSTR];
1821
1822
1823
1824 int lastRun = 0; //usually run from last event still valid
1825
1826 cpu_set_t mask;
1827 int cpu = 1; //process thread (will be several in final version)
1828
1829 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1830 gi_maxProc);
1831 factOut (kInfo, -1, str);
1832
1833/* CPU_ZERO initializes all the bits in the mask to zero. */
1834// CPU_ZERO (&mask);
1835/* CPU_SET sets only the bit corresponding to cpu. */
1836// CPU_SET( 0 , &mask ); leave for system
1837// CPU_SET( 1 , &mask ); used by write process
1838// CPU_SET (2, &mask);
1839// CPU_SET (3, &mask);
1840// CPU_SET (4, &mask);
1841// CPU_SET (5, &mask);
1842// CPU_SET (6, &mask);
1843// CPU_SET( 7 , &mask ); used by read process
1844/* sched_setaffinity returns 0 in success */
1845// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1846// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1847// factOut (kWarn, -1, str);
1848// }
1849
1850
1851 pthread_t thread[100];
1852 int th_ret[100];
1853
1854 for (k = 0; k < gi_maxProc; k++) {
1855 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1856 }
1857
1858 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1859
1860 numWait = numProc = 0;
1861 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1862 if (kd < 0)
1863 kd += (MAX_EVT * MAX_RUN);
1864
1865 int k1 = evtCtrl.frstPtr;
1866 for (k = k1; k < (k1 + kd); k++) {
1867 int k0 = k % (MAX_EVT * MAX_RUN);
1868//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1869 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1870
1871 if (gi_resetR > 1) { //we are asked to flush buffers asap
1872 evtCtrl.evtStat[k0] = 9991;
1873 } else {
1874
1875//-------- it is better to open the run already here, so call can be used to initialize
1876//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1877 int id = evtCtrl.evtBuf[k0];
1878 uint32_t irun = mBuffer[id].runNum;
1879 int32_t ievt = mBuffer[id].evNum;
1880 if (runCtrl[lastRun].runId == irun) {
1881 j = lastRun;
1882 } else {
1883 //check which fileID to use (or open if needed)
1884 for (j = 0; j < MAX_RUN; j++) {
1885 if (runCtrl[j].runId == irun)
1886 break;
1887 }
1888 if (j >= MAX_RUN) {
1889 snprintf (str, MXSTR,
1890 "P error: can not find run %d for event %d in %d",
1891 irun, ievt, id);
1892 factOut (kFatal, 901, str);
1893 }
1894 lastRun = j;
1895 }
1896
1897 if (runCtrl[j].fileId < 0) {
1898//---- we need to open a new run ==> make sure all older runs are
1899//---- finished and marked to be closed ....
1900 int j1;
1901 for (j1 = 0; j1 < MAX_RUN; j1++) {
1902 if (runCtrl[j1].fileId == 0) {
1903 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1904//---- problem: processing still going on ==> must wait for closing ....
1905 snprintf (str, MXSTR,
1906 "P finish run since new one opened %d",
1907 runCtrl[j1].runId);
1908 runFinish1 (runCtrl[j1].runId);
1909 }
1910
1911 }
1912
1913 actRun.Version = 1;
1914 actRun.RunType = -1; //to be adapted
1915
1916 actRun.Nroi = runCtrl[j].roi0;
1917 actRun.NroiTM = runCtrl[j].roi8;
1918//ETIENNE don't reset it to zero
1919// if (actRun.Nroi == actRun.NroiTM)
1920// actRun.NroiTM = 0;
1921 actRun.RunTime = runCtrl[j].firstTime;
1922 actRun.RunUsec = runCtrl[j].firstTime;
1923 actRun.NBoard = NBOARDS;
1924 actRun.NPix = NPIX;
1925 actRun.NTm = NTMARK;
1926 actRun.Nroi = mBuffer[id].nRoi;
1927 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1928 NBOARDS * sizeof (PEVNT_HEADER));
1929
1930 runCtrl[j].fileHd =
1931 runOpen (irun, &actRun, sizeof (actRun));
1932 if (runCtrl[j].fileHd == NULL) {
1933 snprintf (str, MXSTR,
1934 "P could not open a file for run %d", irun);
1935 factOut (kError, 502, str);
1936 runCtrl[j].fileId = 91;
1937 runCtrl[j].procId = 91;
1938 } else {
1939 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1940 irun, ievt);
1941 factOut (kInfo, -1, str);
1942 runCtrl[j].fileId = 0;
1943 runCtrl[j].procId = 0;
1944 }
1945
1946 }
1947//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1948 if (runCtrl[j].procId == 0) {
1949 if (runCtrl[j].closeTime < g_actTime
1950 || runCtrl[j].lastTime < g_actTime - 300
1951 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1952 snprintf (str, MXSTR,
1953 "P reached end of run condition for run %d",
1954 irun);
1955 factOut (kInfo, 502, str);
1956 runFinish1 (runCtrl[j].runId);
1957 runCtrl[j].procId = 1;
1958 }
1959 }
1960 if (runCtrl[j].procId != 0) {
1961 snprintf (str, MXSTR,
1962 "P skip event %d because no active run %d", ievt,
1963 irun);
1964 factOut (kDebug, 502, str);
1965 evtCtrl.evtStat[k0] = 9091;
1966 } else {
1967//--------
1968//--------
1969 id = evtCtrl.evtBuf[k0];
1970 int itevt = mBuffer[id].trgNum;
1971 int itrg = mBuffer[id].trgTyp;
1972 int roi = mBuffer[id].nRoi;
1973 int roiTM = mBuffer[id].nRoiTM;
1974
1975//make sure unused pixels/tmarks are cleared to zero
1976//ETIENNE don't reset it to zero
1977// if (roiTM == roi)
1978// roiTM = 0;
1979 int ip, it, dest, ib;
1980 for (ip = 0; ip < NPIX; ip++) {
1981 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1982 dest = ip * roi;
1983 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1984 }
1985 }
1986
1987 for (it = 0; it < NTMARK; it++) {
1988 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1989 dest = it * roi + NPIX * roi;
1990 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1991 }
1992 }
1993
1994
1995//and set correct event header ; also check for consistency in event (not yet)
1996 mBuffer[id].fEvent->Roi = roi;
1997 mBuffer[id].fEvent->RoiTM = roiTM;
1998 mBuffer[id].fEvent->EventNum = ievt;
1999 mBuffer[id].fEvent->TriggerNum = itevt;
2000 mBuffer[id].fEvent->TriggerType = itrg;
2001 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2002 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2003 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2004 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2005 mBuffer[id].fEvent->SoftTrig = 0;
2006
2007
2008 for (ib = 0; ib < NBOARDS; ib++) {
2009 if (mBuffer[id].board[ib] == -1) { //board is not read
2010 mBuffer[id].FADhead[ib].start_package_flag = 0;
2011 mBuffer[id].fEvent->BoardTime[ib] = 0;
2012 } else {
2013 mBuffer[id].fEvent->BoardTime[ib] =
2014 ntohl (mBuffer[id].FADhead[ib].time);
2015 }
2016 }
2017 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2018 mBuffer[id].fEvent);
2019 gi.procTot++;
2020 numProc++;
2021
2022 if (i < 0) {
2023 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2024 gi.procErr++;
2025 } else {
2026 evtCtrl.evtStat[k0] = 1000;
2027 runCtrl[j].procEvt++;
2028 }
2029 }
2030 }
2031 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2032 numWait++;
2033 }
2034 }
2035
2036 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2037 snprintf (str, MXSTR, "Exit Processing Process ...");
2038 factOut (kInfo, -1, str);
2039 gp_runStat = -22; //==> we should exit
2040 gj.procStat = -22; //==> we should exit
2041 return 0;
2042 }
2043
2044 if (numProc == 0) {
2045 //seems we have nothing to do, so sleep a little
2046 xwait.tv_sec = 0;
2047 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2048 nanosleep (&xwait, NULL);
2049 }
2050 gp_runStat = gi_runStat;
2051 gj.procStat = gj.readStat;
2052
2053 }
2054
2055 //we are asked to abort asap ==> must flag all remaining events
2056 // when gi_runStat claims that all events are in the buffer...
2057
2058 snprintf (str, MXSTR, "Abort Processing Process ...");
2059 factOut (kInfo, -1, str);
2060 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2061 if (kd < 0)
2062 kd += (MAX_EVT * MAX_RUN);
2063
2064 for (k = 0; k < gi_maxProc; k++) {
2065 pthread_join (thread[k], (void **) &status);
2066 }
2067
2068 int k1 = evtCtrl.frstPtr;
2069 for (k = k1; k < (k1 + kd); k++) {
2070 int k0 = k % (MAX_EVT * MAX_RUN);
2071 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2072 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2073 }
2074 }
2075
2076 gp_runStat = -99;
2077 gj.procStat = -99;
2078
2079 return 0;
2080
2081} /*-----------------------------------------------------------------*/
2082
2083int
2084CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2085{
2086/* close run runId (all all runs if runId=0) */
2087/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2088 int j;
2089
2090
2091 if (runId == 0) {
2092 for (j = 0; j < MAX_RUN; j++) {
2093 if (runCtrl[j].fileId == 0) { //run is open
2094 runCtrl[j].closeTime = closeTime;
2095 runCtrl[j].maxEvt = maxEvt;
2096 }
2097 }
2098 return 0;
2099 }
2100
2101 for (j = 0; j < MAX_RUN; j++) {
2102 if (runCtrl[j].runId == runId) {
2103 if (runCtrl[j].fileId == 0) { //run is open
2104 runCtrl[j].closeTime = closeTime;
2105 runCtrl[j].maxEvt = maxEvt;
2106 return 0;
2107 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2108 runCtrl[j].closeTime = closeTime;
2109 runCtrl[j].maxEvt = maxEvt;
2110 return +1;
2111 } else { // run already closed
2112 return +2;
2113 }
2114 }
2115 } //we only reach here if the run was never created
2116 return -1;
2117
2118} /*-----------------------------------------------------------------*/
2119
2120
2121void *
2122writeEvt (void *ptr)
2123{
2124/* *** main loop writing event (including opening and closing run-files */
2125
2126 int numWrite, numWait;
2127 int k, j ;
2128 struct timespec xwait;
2129 char str[MXSTR];
2130
2131 cpu_set_t mask;
2132 int cpu = 1; //write thread
2133
2134 snprintf (str, MXSTR, "Starting write-thread");
2135 factOut (kInfo, -1, str);
2136
2137/* CPU_ZERO initializes all the bits in the mask to zero. */
2138// CPU_ZERO (&mask);
2139/* CPU_SET sets only the bit corresponding to cpu. */
2140// CPU_SET (cpu, &mask);
2141/* sched_setaffinity returns 0 in success */
2142// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2143// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2144// }
2145
2146 int lastRun = 0; //usually run from last event still valid
2147
2148 while (g_runStat > -2) {
2149
2150 numWait = numWrite = 0;
2151 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2152 if (kd < 0)
2153 kd += (MAX_EVT * MAX_RUN);
2154
2155 int k1 = evtCtrl.frstPtr;
2156 for (k = k1; k < (k1 + kd); k++) {
2157 int k0 = k % (MAX_EVT * MAX_RUN);
2158//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2159 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2160
2161 if (gi_resetR > 1) { //we must drain the buffer asap
2162 evtCtrl.evtStat[k0] = 9904;
2163 } else {
2164
2165
2166 int id = evtCtrl.evtBuf[k0];
2167 uint32_t irun = mBuffer[id].runNum;
2168 int32_t ievt = mBuffer[id].evNum;
2169
2170 gi.wrtTot++;
2171 if (runCtrl[lastRun].runId == irun) {
2172 j = lastRun;
2173 } else {
2174 //check which fileID to use (or open if needed)
2175 for (j = 0; j < MAX_RUN; j++) {
2176 if (runCtrl[j].runId == irun)
2177 break;
2178 }
2179 if (j >= MAX_RUN) {
2180 snprintf (str, MXSTR,
2181 "W error: can not find run %d for event %d in %d",
2182 irun, ievt, id);
2183 factOut (kFatal, 901, str);
2184 gi.wrtErr++;
2185 }
2186 lastRun = j;
2187 }
2188
2189 if (runCtrl[j].fileId < 0) {
2190 actRun.Version = 1;
2191 actRun.RunType = -1; //to be adapted
2192
2193 actRun.Nroi = runCtrl[j].roi0;
2194 actRun.NroiTM = runCtrl[j].roi8;
2195//ETIENNE don't reset it to zero
2196// if (actRun.Nroi == actRun.NroiTM)
2197// actRun.NroiTM = 0;
2198 actRun.RunTime = runCtrl[j].firstTime;
2199 actRun.RunUsec = runCtrl[j].firstTime;
2200 actRun.NBoard = NBOARDS;
2201 actRun.NPix = NPIX;
2202 actRun.NTm = NTMARK;
2203 actRun.Nroi = mBuffer[id].nRoi;
2204 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2205 NBOARDS * sizeof (PEVNT_HEADER));
2206
2207 runCtrl[j].fileHd =
2208 runOpen (irun, &actRun, sizeof (actRun));
2209 if (runCtrl[j].fileHd == NULL) {
2210 snprintf (str, MXSTR,
2211 "W could not open a file for run %d", irun);
2212 factOut (kError, 502, str);
2213 runCtrl[j].fileId = 91;
2214 } else {
2215 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2216 irun, ievt);
2217 factOut (kInfo, -1, str);
2218 runCtrl[j].fileId = 0;
2219 }
2220
2221 }
2222
2223 if (runCtrl[j].fileId != 0) {
2224 if (runCtrl[j].fileId < 0) {
2225 snprintf (str, MXSTR,
2226 "W never opened file for this run %d", irun);
2227 factOut (kError, 123, str);
2228 } else if (runCtrl[j].fileId < 100) {
2229 snprintf (str, MXSTR, "W.file for this run is closed %d",
2230 irun);
2231 factOut (kWarn, 123, str);
2232 runCtrl[j].fileId += 100;
2233 } else {
2234 snprintf (str, MXSTR, "W:file for this run is closed %d",
2235 irun);
2236 factOut (kDebug, 123, str);
2237 }
2238 evtCtrl.evtStat[k0] = 9903;
2239 gi.wrtErr++;
2240 } else {
2241// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2242// factOut (kInfo, 504, str);
2243 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2244 sizeof (mBuffer[id]));
2245 if (i >= 0) {
2246 runCtrl[j].lastTime = g_actTime;
2247 runCtrl[j].actEvt++;
2248 evtCtrl.evtStat[k0] = 9901;
2249 snprintf (str, MXSTR,
2250 "%5d successfully wrote for run %d id %5d",
2251 ievt, irun, k0);
2252 factOut (kDebug, 504, str);
2253// gj.writEvt++ ;
2254 } else {
2255 snprintf (str, MXSTR, "W error writing event for run %d",
2256 irun);
2257 factOut (kError, 503, str);
2258 evtCtrl.evtStat[k0] = 9902;
2259 gi.wrtErr++;
2260 }
2261
2262 if (i < 0
2263 || runCtrl[j].lastTime < g_actTime - 300
2264 || runCtrl[j].closeTime < g_actTime
2265 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2266 int ii = 0;
2267 if (i < 0)
2268 ii = 1;
2269 else if (runCtrl[j].closeTime < g_actTime)
2270 ii = 2;
2271 else if (runCtrl[j].lastTime < g_actTime - 300)
2272 ii = 3;
2273 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2274 ii = 4;
2275
2276
2277
2278 //close run for whatever reason
2279 if (runCtrl[j].runId == gi_myRun)
2280 gi_myRun = g_actTime;
2281
2282 if (runCtrl[j].procId == 0) {
2283 runFinish1 (runCtrl[j].runId);
2284 runCtrl[j].procId = 92;
2285 }
2286
2287 runCtrl[j].closeTime = g_actTime - 1;
2288 i = runClose (runCtrl[j].fileHd, &runTail[j],
2289 sizeof (runTail[j]));
2290 if (i < 0) {
2291 snprintf (str, MXSTR, "error closing run %d %d AAA",
2292 runCtrl[j].runId, i);
2293 factOut (kError, 503, str);
2294 runCtrl[j].fileId = 92;
2295 } else {
2296 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2297 ii);
2298 factOut (kInfo, 503, str);
2299 runCtrl[j].fileId = 93;
2300 }
2301 }
2302 }
2303 }
2304 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2305 numWait++;
2306 }
2307
2308 //check if we should close a run (mainly when no event pending)
2309 for (j = 0; j < MAX_RUN; j++) {
2310 if (runCtrl[j].fileId == 0
2311 && (runCtrl[j].closeTime < g_actTime
2312 || runCtrl[j].lastTime < g_actTime - 300
2313 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2314 if (runCtrl[j].runId == gi_myRun)
2315 gi_myRun = g_actTime;
2316 int ii = 0;
2317 if (runCtrl[j].closeTime < g_actTime)
2318 ii = 2;
2319 else if (runCtrl[j].lastTime < g_actTime - 300)
2320 ii = 3;
2321 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2322 ii = 4;
2323
2324 if (runCtrl[j].procId == 0) {
2325 runFinish1 (runCtrl[j].runId);
2326 runCtrl[j].procId = 92;
2327 }
2328
2329 runCtrl[j].closeTime = g_actTime - 1;
2330 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2331 sizeof (runTail[j]));
2332 if (i < 0) {
2333 snprintf (str, MXSTR, "error closing run %d %d BBB",
2334 runCtrl[j].runId, i);
2335 factOut (kError, 506, str);
2336 runCtrl[j].fileId = 94;
2337 } else {
2338 snprintf (str, MXSTR, "W closed run %d BBB %d",
2339 runCtrl[j].runId, ii);
2340 factOut (kInfo, 507, str);
2341 runCtrl[j].fileId = 95;
2342 }
2343 }
2344 }
2345
2346 if (numWrite == 0) {
2347 //seems we have nothing to do, so sleep a little
2348 xwait.tv_sec = 0;
2349 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2350 nanosleep (&xwait, NULL);
2351 }
2352
2353 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2354 snprintf (str, MXSTR, "Finish Write Process ...");
2355 factOut (kInfo, -1, str);
2356 gw_runStat = -22; //==> we should exit
2357 gj.writStat = -22; //==> we should exit
2358 goto closerun;
2359 }
2360 gw_runStat = gi_runStat;
2361 gj.writStat = gj.readStat;
2362
2363 }
2364
2365 //must close all open files ....
2366 snprintf (str, MXSTR, "Abort Writing Process ...");
2367 factOut (kInfo, -1, str);
2368
2369 closerun:
2370 snprintf (str, MXSTR, "Close all open files ...");
2371 factOut (kInfo, -1, str);
2372 for (j = 0; j < MAX_RUN; j++)
2373 if (runCtrl[j].fileId == 0) {
2374 if (runCtrl[j].runId == gi_myRun)
2375 gi_myRun = g_actTime;
2376
2377 if (runCtrl[j].procId == 0) {
2378 runFinish1 (runCtrl[j].runId);
2379 runCtrl[j].procId = 92;
2380 }
2381
2382 runCtrl[j].closeTime = g_actTime - 1;
2383 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2384 int ii = 0;
2385 if (runCtrl[j].closeTime < g_actTime)
2386 ii = 2;
2387 else if (runCtrl[j].lastTime < g_actTime - 300)
2388 ii = 3;
2389 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2390 ii = 4;
2391 if (i < 0) {
2392 snprintf (str, MXSTR, "error closing run %d %d CCC",
2393 runCtrl[j].runId, i);
2394 factOut (kError, 506, str);
2395 runCtrl[j].fileId = 96;
2396 } else {
2397 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2398 ii);
2399 factOut (kInfo, 507, str);
2400 runCtrl[j].fileId = 97;
2401 }
2402 }
2403
2404 gw_runStat = -99;
2405 gj.writStat = -99;
2406 snprintf (str, MXSTR, "Exit Writing Process ...");
2407 factOut (kInfo, -1, str);
2408 return 0;
2409
2410
2411
2412
2413} /*-----------------------------------------------------------------*/
2414
2415
2416
2417
2418void
2419StartEvtBuild ()
2420{
2421
2422 int i, j, imax, status, th_ret[50];
2423 pthread_t thread[50];
2424 struct timespec xwait;
2425
2426 gi_runStat = gp_runStat = gw_runStat = 0;
2427 gj.readStat = gj.procStat = gj.writStat = 0;
2428
2429 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2430 factOut (kInfo, -1, str);
2431
2432//initialize run control logics
2433 for (i = 0; i < MAX_RUN; i++) {
2434 runCtrl[i].runId = 0;
2435 runCtrl[i].fileId = -2;
2436 }
2437
2438//prepare for subProcesses
2439 gi_maxSize = g_maxSize;
2440 if (gi_maxSize <= 0)
2441 gi_maxSize = 1;
2442
2443 gi_maxProc = g_maxProc;
2444 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2445 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2446 factOut (kFatal, 301, str);
2447 gi_maxProc = 1;
2448 }
2449//partially initialize event control logics
2450 evtCtrl.frstPtr = 0;
2451 evtCtrl.lastPtr = 0;
2452
2453//start all threads (more to come) when we are allowed to ....
2454 while (g_runStat == 0) {
2455 xwait.tv_sec = 0;
2456 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2457 nanosleep (&xwait, NULL);
2458 }
2459
2460 i = 0;
2461 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2462 i++;
2463 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2464 i++;
2465 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2466 i++;
2467 imax = i;
2468
2469
2470#ifdef BILAND
2471 xwait.tv_sec = 30;;
2472 xwait.tv_nsec = 0; // sleep for ~20sec
2473 nanosleep (&xwait, NULL);
2474
2475 printf ("close all runs in 2 seconds\n");
2476
2477 CloseRunFile (0, time (NULL) + 2, 0);
2478
2479 xwait.tv_sec = 1;;
2480 xwait.tv_nsec = 0; // sleep for ~20sec
2481 nanosleep (&xwait, NULL);
2482
2483 printf ("setting g_runstat to -1\n");
2484
2485 g_runStat = -1;
2486#endif
2487
2488
2489//wait for all threads to finish
2490 for (i = 0; i < imax; i++) {
2491 j = pthread_join (thread[i], (void **) &status);
2492 }
2493
2494} /*-----------------------------------------------------------------*/
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510 /*-----------------------------------------------------------------*/
2511 /*-----------------------------------------------------------------*/
2512 /*-----------------------------------------------------------------*/
2513 /*-----------------------------------------------------------------*/
2514 /*-----------------------------------------------------------------*/
2515
2516#ifdef BILAND
2517
2518int
2519subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2520 int8_t * buffer)
2521{
2522 printf ("called subproc %d\n", threadID);
2523 return threadID + 1;
2524}
2525
2526
2527
2528
2529 /*-----------------------------------------------------------------*/
2530 /*-----------------------------------------------------------------*/
2531 /*-----------------------------------------------------------------*/
2532 /*-----------------------------------------------------------------*/
2533 /*-----------------------------------------------------------------*/
2534
2535
2536
2537
2538FileHandle_t
2539runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2540{
2541 return 1;
2542};
2543
2544int
2545runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2546{
2547 return 1;
2548 usleep (10000);
2549 return 1;
2550}
2551
2552
2553//{ return 1; } ;
2554
2555int
2556runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2557{
2558 return 1;
2559};
2560
2561
2562
2563
2564int
2565eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2566{
2567 int i = 0;
2568
2569// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2570// for (i=0; i<NBOARDS; i++) {
2571// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2572// }
2573 return 0;
2574}
2575
2576
2577void
2578factStatNew (EVT_STAT gi)
2579{
2580 int i;
2581
2582//for (i=0;i<MAX_SOCK;i++) {
2583// printf("%4d",gi.numRead[i]);
2584// if (i%20 == 0 ) printf("\n");
2585//}
2586}
2587
2588void
2589gotNewRun (int runnr, PEVNT_HEADER * headers)
2590{
2591 printf ("got new run %d\n", runnr);
2592 return;
2593}
2594
2595void
2596factStat (GUI_STAT gj)
2597{
2598// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2599// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2600}
2601
2602
/*
 * Test stub for the per-read debug hook: accepts the read bookkeeping values
 * and deliberately does nothing with them.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;
}
2609
2610
2611
/*
 * Test stub for the raw-stream debug hook: the buffer is neither read nor
 * modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2616
/*
 * Test stub for the header debug hook: the buffer is neither read nor
 * modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2621
2622
2623void
2624factOut (int severity, int err, char *message)
2625{
2626 static FILE *fd;
2627 static int file = 0;
2628
2629 if (file == 0) {
2630 printf ("open file\n");
2631 fd = fopen ("x.out", "w+");
2632 file = 999;
2633 }
2634
2635 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2636
2637 if (severity != kDebug)
2638 printf ("%3d %3d | %s\n", severity, err, message);
2639}
2640
2641
2642
2643int
2644main ()
2645{
2646 int i, b, c, p;
2647 char ipStr[100];
2648 struct in_addr IPaddr;
2649
2650 g_maxMem = 1024 * 1024; //MBytes
2651//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2652 g_maxMem = g_maxMem * 200; //100MBytes
2653
2654 g_maxProc = 20;
2655 g_maxSize = 30000;
2656
2657 g_runStat = 40;
2658
2659 i = 0;
2660
2661// version for standard crates
2662//for (c=0; c<4,c++) {
2663// for (b=0; b<10; b++) {
2664// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2665//
2666// inet_pton(PF_INET, ipStr, &IPaddr) ;
2667//
2668// g_port[i].sockAddr.sin_family = PF_INET;
2669// g_port[i].sockAddr.sin_port = htons(5000) ;
2670// g_port[i].sockAddr.sin_addr = IPaddr ;
2671// g_port[i].sockDef = 1 ;
2672// i++ ;
2673// }
2674//}
2675//
2676//version for PC-test *
2677 for (c = 0; c < 4; c++) {
2678 for (b = 0; b < 10; b++) {
2679 sprintf (ipStr, "10.0.%d.11", 128 + c);
2680 if (c < 2)
2681 sprintf (ipStr, "10.0.%d.11", 128);
2682 else
2683 sprintf (ipStr, "10.0.%d.11", 131);
2684// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2685
2686 inet_pton (PF_INET, ipStr, &IPaddr);
2687 p = 31919 + 100 * c + 10 * b;
2688
2689
2690 g_port[i].sockAddr.sin_family = PF_INET;
2691 g_port[i].sockAddr.sin_port = htons (p);
2692 g_port[i].sockAddr.sin_addr = IPaddr;
2693 g_port[i].sockDef = 1;
2694
2695 i++;
2696 }
2697 }
2698
2699
2700//g_port[17].sockDef =-1 ;
2701//g_actBoards-- ;
2702
2703 StartEvtBuild ();
2704
2705 return 0;
2706
2707}
2708#endif
Note: See TracBrowser for help on using the repository browser.