source: trunk/FACT++/src/EventBuilder.c@ 12458

Last change on this file since 12458 was 12453, checked in by lyard, 13 years ago
set TM channels to zero if not required (for DRS calib consistency)
File size: 86.7 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0;
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi;
120GUI_STAT gj;
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127#define MXSTR 1000
128char str[MXSTR];
129
130//ETIENNE
131#define MAX_SLOTS_PER_CHUNK 100
132
133typedef struct {
134 int32_t eventNumber;
135 int32_t chunk;
136 int32_t slot;
137} CHUNK_MAPPING;
138
139CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
141#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
142#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
143#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
144#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
146typedef struct {
147 void * pointers[MAX_SLOTS_PER_CHUNK];
148 int32_t events[MAX_SLOTS_PER_CHUNK];
149 int32_t nFreeSlots;
150 int32_t nSlots;
151} ETI_CHUNK;
152
153int32_t numAllocatedChunks = 0;
154
155#define MAX_CHUNKS 8096
156ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 free(EtiMemoryChunks[chunkIndex].pointers[0]);
235 numAllocatedChunks--;
236 chunkIndex--;
237 if (numAllocatedChunks == 0)
238 break;
239 }
240}
241//END ETIENNE
242
243
244
245RUN_HEAD actRun;
246
247RUN_CTRL runCtrl[MAX_RUN];
248
249RUN_TAIL runTail[MAX_RUN];
250
251
252/*
253*** Definition of rdBuffer to read in IP packets; keep it global !!!!
254 */
255
256
257typedef union
258{
259 int8_t B[MAX_LEN];
260 int16_t S[MAX_LEN / 2];
261 int32_t I[MAX_LEN / 4];
262 int64_t L[MAX_LEN / 8];
263} CNV_FACT;
264
265typedef struct
266{
267 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
268 int32_t bufPos; //next byte to read to the buffer next
269 int32_t bufLen; //number of bytes left to read
270// size_t bufLen; //number of bytes left to read size_t might be better
271 int32_t skip; //number of bytes skipped before start of event
272
273 int errCnt; //how often connect failed since last successful
274 int sockStat; //-1 if socket not yet connected , 99 if not exist
275 int socket; //contains the sockets
276 struct sockaddr_in SockAddr; //IP for each socket
277
278 int evtID; // event ID of event currently read
279 int runID; // run "
280 int ftmID; // event ID from FTM
281 uint fadLen; // FADlength of event currently read
282 int fadVers; // Version of FAD
283 int ftmTyp; // trigger type
284 int board; // boardID (softwareID: 0..40 )
285 int Port;
286
287 CNV_FACT *rBuf;
288
289#ifdef EVTDEBUG
290 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
291#endif
292
293} READ_STRUCT;
294
295
296typedef union
297{
298 int8_t B[2];
299 int16_t S;
300} SHORT_BYTE;
301
302
303
304
305
306SHORT_BYTE start, stop;
307
308READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
309
310
311
312/*-----------------------------------------------------------------*/
313
314
315/*-----------------------------------------------------------------*/
316
317int
318runFinish1 (uint32_t runnr)
319{
320 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
321 runnr);
322 factOut (kInfo, 173, str); //but continue anyhow
323 return 0;
324}
325int
326runFinish (uint32_t runnr)
327{
328 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
329 runnr);
330 factOut (kInfo, 173, str); //but continue anyhow
331 return 0;
332}
333
334int
335GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
336 READ_STRUCT * rd)
337{
338/*
339*** generate Address, create sockets and allocates readbuffer for it
340***
341*** if flag==0 generate socket and buffer
342*** <0 destroy socket and buffer
343*** >0 close and redo socket
344***
345*** sid : board*7 + port id
346 */
347
348 int j;
349 int optval = 1; //activate keepalive
350 socklen_t optlen = sizeof (optval);
351
352
353 if (sid % 7 >= NUMSOCK) {
354 //this is a not used socket, so do nothing ...
355 rd->sockStat = 77;
356 rd->rBuf = NULL ;
357 return 0;
358 }
359
360 if (rd->sockStat == 0) { //close socket if open
361 j = close (rd->socket);
362 if (j > 0) {
363 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
364 factOut (kFatal, 771, str);
365 } else {
366 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
367 factOut (kInfo, 771, str);
368 }
369 }
370
371 rd->sockStat = 99;
372
373 if (flag < 0) {
374 free (rd->rBuf); //and never open again
375#ifdef EVTDEBUG
376 free (rd->xBuf); //and never open again
377#endif
378 rd->rBuf = NULL;
379 return 0;
380 }
381
382
383 if (flag == 0) { //generate address and buffer ...
384 rd->Port = port;
385 rd->SockAddr.sin_family = sockAddr->sin_family;
386 rd->SockAddr.sin_port = htons (port);
387 rd->SockAddr.sin_addr = sockAddr->sin_addr;
388
389#ifdef EVTDEBUG
390 rd->xBuf = malloc (sizeof (CNV_FACT));
391#endif
392 rd->rBuf = malloc (sizeof (CNV_FACT));
393 if (rd->rBuf == NULL) {
394 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
395 factOut (kFatal, 774, str);
396 rd->sockStat = 77;
397 return -3;
398 }
399 }
400
401
402 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
403 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
404 factOut (kFatal, 773, str);
405 rd->sockStat = 88;
406 return -2;
407 }
408 optval = 1;
409 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
410 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
411 factOut (kInfo, 173, str); //but continue anyhow
412 }
413 optval = 10; //start after 10 seconds
414 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
415 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
416 factOut (kInfo, 173, str); //but continue anyhow
417 }
418 optval = 10; //do every 10 seconds
419 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
420 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
421 factOut (kInfo, 173, str); //but continue anyhow
422 }
423 optval = 2; //close after 2 unsuccessful tries
424 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
425 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
426 factOut (kInfo, 173, str); //but continue anyhow
427 }
428
429
430 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
431 factOut (kInfo, 773, str);
432 rd->sockStat = -1; //try to (re)open socket
433 rd->errCnt = 0;
434 return 0;
435
436} /*-----------------------------------------------------------------*/
437
438 /*-----------------------------------------------------------------*/
439
440
441
442
443int
444mBufInit ()
445{
446// initialize mBuffer (mark all entries as unused\empty)
447
448 int i;
449 uint32_t actime;
450
451 actime = g_actTime + 50000000;
452
453 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
454 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
455 mBuffer[i].runNum = 0;
456
457 evtCtrl.evtBuf[i] = -1;
458 evtCtrl.evtStat[i] = -1;
459 evtCtrl.pcTime[i] = actime; //initiate to far future
460
461 //ETIENNE
462 mBufferMapping[i].chunk = -1;
463 mBufferMapping[i].eventNumber = -1;
464 mBufferMapping[i].slot = -1;
465 //END ETIENNE
466 }
467
468 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
469
470 evtCtrl.frstPtr = 0;
471 evtCtrl.lastPtr = 0;
472
473 return 0;
474
475} /*-----------------------------------------------------------------*/
476
477
478
479
480int
481mBufEvt (int evID, uint runID, int nRoi[], int sk,
482 int fadlen, int trgTyp, int trgNum, int fadNum)
483{
484// generate a new Event into mBuffer:
485// make sure only complete Event are possible, so 'free' will always work
486// returns index into mBuffer[], or negative value in case of error
487// error: <-9000 if roi screwed up (not consistent with run)
488// <-8000 (not consistent with event)
489// <-7000 (not consistent with board)
490// < 0 if no space left
491
492 struct timeval *tv, atv;
493 tv = &atv;
494 uint32_t tsec, tusec;
495 uint oldest;
496 int jold;
497
498 int i, k, jr, b, evFree;
499 int headmem = 0;
500 size_t needmem = 0;
501
502
503 b = sk / 7;
504
505 if (nRoi[0] < 0 || nRoi[0] > 1024) {
506 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
507 factOut (kError, 999, str);
508 gj.badRoiR++;
509 gj.badRoi[b]++;
510 return -9999;
511 }
512
513 for (jr = 1; jr < 8; jr++) {
514 if (nRoi[jr] != nRoi[0]) {
515 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
516 nRoi[0]);
517 factOut (kError, 711, str);
518 gj.badRoiB++;
519 gj.badRoi[b]++;
520 return -7101;
521 }
522 }
523 if (nRoi[8] < nRoi[0]) {
524 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
525 factOut (kError, 712, str);
526 gj.badRoiB++;
527 gj.badRoi[b]++;
528 return -7102;
529 }
530
531
532 i = evID % MAX_EVT;
533 evFree = -1;
534
535 for (k = 0; k < MAX_RUN; k++) {
536 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
537 // is it ok ????
538 if (mBuffer[i].nRoi != nRoi[0]
539 || mBuffer[i].nRoiTM != nRoi[8]) {
540 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
541 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
542 factOut (kError, 821, str);
543 gj.badRoiE++;
544 gj.badRoi[b]++;
545 return -8201;
546 }
547// count for inconsistencies
548
549 if (mBuffer[i].trgNum != trgNum)
550 mBuffer[i].Errors[0]++;
551 if (mBuffer[i].fadNum != fadNum)
552 mBuffer[i].Errors[1]++;
553 if (mBuffer[i].trgTyp != trgTyp)
554 mBuffer[i].Errors[2]++;
555
556 //everything seems fine so far ==> use this slot ....
557 return i;
558 }
559 if (evFree < 0 && mBuffer[i].evNum < 0)
560 evFree = i;
561 i += MAX_EVT;
562 }
563
564
565 //event does not yet exist; create it
566 if (evFree < 0) { //no space available in ctrl
567 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
568 factOut (kError, 881, str);
569 return -1;
570 }
571 i = evFree; //found free entry; use it ...
572
573 gettimeofday (tv, NULL);
574 tsec = atv.tv_sec;
575 tusec = atv.tv_usec;
576
577 //check if runId already registered in runCtrl
578 evFree = -1;
579 oldest = g_actTime + 1000;
580 jold = -1;
581 for (k = 0; k < MAX_RUN; k++) {
582 if (runCtrl[k].runId == runID) {
583// if (runCtrl[k].procId > 0) { //run is closed -> reject
584// snprintf (str, MXSTR, "skip event since run %d finished", runID);
585// factOut (kInfo, 931, str);
586// return -21;
587// }
588
589 if (runCtrl[k].roi0 != nRoi[0]
590 || runCtrl[k].roi8 != nRoi[8]) {
591 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
592 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
593 factOut (kError, 931, str);
594 gj.badRoiR++;
595 gj.badRoi[b]++;
596 return -9301;
597 }
598 goto RUNFOUND;
599 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
600 evFree = k;
601 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
602 if (runCtrl[k].closeTime < oldest) {
603 oldest = runCtrl[k].closeTime;
604 jold = k;
605 }
606 }
607 }
608
609 if (evFree < 0 && jold < 0) {
610 snprintf (str, MXSTR, "not able to register the new run %d", runID);
611 factOut (kFatal, 883, str);
612 return -1001;
613 } else {
614 if (evFree < 0)
615 evFree = jold;
616 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
617 evFree, nRoi[0], nRoi[8]);
618 factOut (kInfo, 503, str);
619 runCtrl[evFree].runId = runID;
620 runCtrl[evFree].roi0 = nRoi[0];
621 runCtrl[evFree].roi8 = nRoi[8];
622 runCtrl[evFree].fileId = -2;
623 runCtrl[evFree].procId = -2;
624 runCtrl[evFree].lastEvt = -1;
625 runCtrl[evFree].nextEvt = 0;
626 runCtrl[evFree].actEvt = 0;
627 runCtrl[evFree].procEvt = 0;
628 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
629 runCtrl[evFree].firstUsec = tusec;
630 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
631 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
632// runCtrl[evFree].lastTime = 0;
633
634 runTail[evFree].nEventsOk =
635 runTail[evFree].nEventsRej =
636 runTail[evFree].nEventsBad =
637 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
638 }
639
640 RUNFOUND:
641 //ETIENNE
642/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
643
644 headmem = NBOARDS * sizeof (PEVNT_HEADER);
645
646 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
647 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
648 if (gi_memStat > 0) {
649 gi_memStat = -99;
650 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
651 evID, sk);
652 factOut (kError, 882, str);
653 } else {
654 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
655 evID, sk);
656 factOut (kDebug, 882, str);
657 }
658 return -11;
659 }
660*/
661 mBuffer[i].FADhead = ETI_Malloc(evID, i);
662 mBuffer[i].buffer = NULL;
663 if (mBuffer[i].FADhead != NULL)
664 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
665 else
666 {
667 mBuffer[i].fEvent = NULL;
668 gj.usdMem = 0;
669 for (k=0;k<numAllocatedChunks;k++)
670 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
671 if (gj.usdMem > gj.maxMem)
672 gj.maxMem = gj.usdMem;
673 if (gi_memStat > 0) {
674 gi_memStat = -99;
675 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
676 evID, sk);
677 factOut (kError, 882, str);
678 } else {
679 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
680 evID, sk);
681 factOut (kDebug, 882, str);
682 }
683 return -11;
684 }
685 /*
686 mBuffer[i].FADhead = malloc (headmem);
687 if (mBuffer[i].FADhead == NULL) {
688 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
689 factOut (kError, 882, str);
690 return -12;
691 }
692
693 mBuffer[i].fEvent = malloc (needmem);
694 if (mBuffer[i].fEvent == NULL) {
695 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
696 factOut (kError, 882, str);
697 free (mBuffer[i].FADhead);
698 mBuffer[i].FADhead = NULL;
699 return -22;
700 }
701
702 mBuffer[i].buffer = malloc (gi_maxSize);
703 if (mBuffer[i].buffer == NULL) {
704 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
705 factOut (kError, 882, str);
706 free (mBuffer[i].FADhead);
707 mBuffer[i].FADhead = NULL;
708 free (mBuffer[i].fEvent);
709 mBuffer[i].fEvent = NULL;
710 return -32;
711 }*/
712 //END ETIENNE
713 //flag all boards as unused
714 mBuffer[i].nBoard = 0;
715 for (k = 0; k < NBOARDS; k++) {
716 mBuffer[i].board[k] = -1;
717 }
718 //flag all pixels as unused
719 for (k = 0; k < NPIX; k++) {
720 mBuffer[i].fEvent->StartPix[k] = -1;
721 }
722 //flag all TMark as unused
723 for (k = 0; k < NTMARK; k++) {
724 mBuffer[i].fEvent->StartTM[k] = -1;
725 }
726
727 mBuffer[i].fEvent->NumBoards = 0;
728 mBuffer[i].fEvent->PCUsec = tusec;
729 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
730 mBuffer[i].nRoi = nRoi[0];
731 mBuffer[i].nRoiTM = nRoi[8];
732 mBuffer[i].evNum = evID;
733 mBuffer[i].runNum = runID;
734 mBuffer[i].fadNum = fadNum;
735 mBuffer[i].trgNum = trgNum;
736 mBuffer[i].trgTyp = trgTyp;
737 mBuffer[i].evtLen = needmem;
738 mBuffer[i].Errors[0] =
739 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
740//ETIENNE
741 //gj.usdMem += needmem + headmem + gi_maxSize;
742 gj.usdMem = 0;
743 for (k=0;k<numAllocatedChunks;k++)
744 {
745 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
746 }
747//END ETIENNE
748 if (gj.usdMem > gj.maxMem)
749 gj.maxMem = gj.usdMem;
750
751 gj.bufTot++;
752 if (gj.bufTot > gj.maxEvt)
753 gj.maxEvt = gj.bufTot;
754
755 gj.rateNew++;
756
757 //register event in 'active list (reading)'
758
759 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
760 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
761 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
762 evtIdx[i] = evtCtrl.lastPtr;
763
764
765 snprintf (str, MXSTR,
766 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
767 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
768 factOut (kDebug, -11, str);
769 evtCtrl.lastPtr++;
770 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
771 evtCtrl.lastPtr = 0;
772
773 gi.evtGet++;
774
775 return i;
776
777} /*-----------------------------------------------------------------*/
778
779
780
781
782int
783mBufFree (int i)
784{
785//delete entry [i] from mBuffer:
786//(and make sure multiple calls do no harm ....)
787
788 int headmem = 0;
789 int evid;
790 size_t freemem = 0;
791
792 evid = mBuffer[i].evNum;
793 freemem = mBuffer[i].evtLen;
794 //ETIENNE
795 ETI_Free(evid, i);
796// free (mBuffer[i].fEvent);
797 mBuffer[i].fEvent = NULL;
798
799// free (mBuffer[i].FADhead);
800 mBuffer[i].FADhead = NULL;
801
802// free (mBuffer[i].buffer);
803 mBuffer[i].buffer = NULL;
804//END ETIENNE
805 headmem = NBOARDS * sizeof (PEVNT_HEADER);
806 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
807 mBuffer[i].runNum = 0;
808//ETIENNE
809// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
810 gj.usdMem = 0;
811 int k;
812 for (k=0;k<numAllocatedChunks;k++)
813 {
814 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
815 }
816//END ETIENNE
817 gj.bufTot--;
818
819 if (gi_memStat < 0) {
820 if (gj.usdMem <= 0.75 * gj.maxMem)
821 gi_memStat = +1;
822 }
823
824
825 return 0;
826
827} /*-----------------------------------------------------------------*/
828
829
830void
831resetEvtStat ()
832{
833 int i;
834
835 for (i = 0; i < MAX_SOCK; i++)
836 gi.numRead[i] = 0;
837
838 for (i = 0; i < NBOARDS; i++) {
839 gi.gotByte[i] = 0;
840 gi.gotErr[i] = 0;
841
842 }
843
844 gi.evtGet = 0; //#new Start of Events read
845 gi.evtTot = 0; //#complete Events read
846 gi.evtErr = 0; //#Events with Errors
847 gi.evtSkp = 0; //#Events incomplete (timeout)
848
849 gi.procTot = 0; //#Events processed
850 gi.procErr = 0; //#Events showed problem in processing
851 gi.procTrg = 0; //#Events accepted by SW trigger
852 gi.procSkp = 0; //#Events rejected by SW trigger
853
854 gi.feedTot = 0; //#Events used for feedBack system
855 gi.feedErr = 0; //#Events rejected by feedBack
856
857 gi.wrtTot = 0; //#Events written to disk
858 gi.wrtErr = 0; //#Events with write-error
859
860 gi.runOpen = 0; //#Runs opened
861 gi.runClose = 0; //#Runs closed
862 gi.runErr = 0; //#Runs with open/close errors
863
864 return;
865} /*-----------------------------------------------------------------*/
866
867
868
869void
870initReadFAD ()
871{
872 return;
873} /*-----------------------------------------------------------------*/
874
875
876
877void *
878readFAD (void *ptr)
879{
880/* *** main loop reading FAD data and sorting them to complete events */
881 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
882 int actBoards = 0, minLen;
883 int32_t jrd;
884 uint gi_SecTime; //time in seconds
885 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
886
887 int goodhed = 0;
888
889 int sockDef[NBOARDS]; //internal state of sockets
890 int jrdx;
891
892
893 struct timespec xwait;
894
895
896 struct timeval *tv, atv;
897 tv = &atv;
898 uint32_t tsec, tusec;
899
900
901 snprintf (str, MXSTR, "start initializing");
902 factOut (kInfo, -1, str);
903
904 int cpu = 7; //read thread
905 cpu_set_t mask;
906
907/* CPU_ZERO initializes all the bits in the mask to zero. */
908// CPU_ZERO (&mask);
909/* CPU_SET sets only the bit corresponding to cpu. */
910 cpu = 7;
911// CPU_SET (cpu, &mask);
912
913/* sched_setaffinity returns 0 in success */
914// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
915// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
916// factOut (kWarn, -1, str);
917// }
918
919
920 head_len = sizeof (PEVNT_HEADER);
921 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
922 minLen = head_len; //min #bytes needed to check header: full header for debug
923
924 start.S = 0xFB01;
925 stop.S = 0x04FE;
926
927/* initialize run control logics */
928 for (i = 0; i < MAX_RUN; i++) {
929 runCtrl[i].runId = 0;
930 runCtrl[i].fileId = -2;
931 runCtrl[i].procId = -2;
932 }
933 gi_resetS = gi_resetR = 9;
934 for (i = 0; i < NBOARDS; i++)
935 sockDef[i] = 0;
936
937 START:
938 gettimeofday (tv, NULL);
939 g_actTime = tsec = atv.tv_sec;
940 g_actUsec = tusec = atv.tv_usec;
941 gi_myRun = g_actTime;
942 evtCtrl.frstPtr = 0;
943 evtCtrl.lastPtr = 0;
944
945 gi_SecTime = g_actTime;
946 gi_runStat = g_runStat;
947 gj.readStat = g_runStat;
948 numok = numok2 = 0;
949
950 int cntsock = 8 - NUMSOCK ;
951
952 if (gi_resetS > 0) {
953 //make sure all sockets are preallocated as 'not exist'
954 for (i = 0; i < MAX_SOCK; i++) {
955 rd[i].socket = -1;
956 rd[i].sockStat = 99;
957 }
958
959 for (k = 0; k < NBOARDS; k++) {
960 gi_NumConnect[k] = 0;
961 gi.numConn[k] = 0;
962 gj.numConn[k] = 0;
963 gj.errConn[k] = 0;
964 gj.rateBytes[k] = 0;
965 gj.totBytes[k] = 0;
966 }
967
968 }
969
970
971 if (gi_resetR > 0) {
972 resetEvtStat ();
973 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
974 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
975 gj.totMem = g_maxMem;
976 gj.bufNew = gj.bufEvt = 0;
977 gj.badRoiE = gj.badRoiR = gj.badRoiB =
978 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
979
980 int b,p;
981 for (b = 0; b < NBOARDS; b++)
982 gj.badRoi[b] = 0;
983
984 mBufInit (); //initialize buffers
985
986 snprintf (str, MXSTR, "end initializing");
987 factOut (kInfo, -1, str);
988 }
989
990
991 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
992
993 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
994
995 gi_runStat = g_runStat;
996 gj.readStat = g_runStat;
997 gettimeofday (tv, NULL);
998 g_actTime = tsec = atv.tv_sec;
999 g_actUsec = tusec = atv.tv_usec;
1000
1001
1002 int b, p, p0, s0, nch;
1003 nch = 0;
1004 for (b = 0; b < NBOARDS; b++) {
1005 k = b * 7;
1006 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1007 nch++;
1008 gi_NumConnect[b] = 0; //must close all connections
1009 gi.numConn[b] = 0;
1010 gj.numConn[b] = 0;
1011 if (sockDef[b] == 0)
1012 s0 = 0; //sockets to be defined and opened
1013 else if (g_port[b].sockDef == 0)
1014 s0 = -1; //sockets to be destroyed
1015 else
1016 s0 = +1; //sockets to be closed and reopened
1017
1018 if (s0 == 0)
1019 p0 = ntohs (g_port[b].sockAddr.sin_port);
1020 else
1021 p0 = 0;
1022
1023 for (p = p0 + 1; p < p0 + 8; p++) {
1024 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1025 k++;
1026 }
1027 sockDef[b] = g_port[b].sockDef;
1028 }
1029 }
1030
1031 if (nch > 0) {
1032 actBoards = 0;
1033 for (b = 0; b < NBOARDS; b++) {
1034 if (sockDef[b] > 0)
1035 actBoards++;
1036 }
1037 }
1038
1039
1040 jrdx = 0;
1041 numokx = 0;
1042 numok = 0; //count number of succesfull actions
1043
1044 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1045 b = i / 7 ;
1046 p = i % 7 ;
1047
1048if ( p >= NUMSOCK) { ; }
1049else {
1050 if (sockDef[b] > 0)
1051 s0 = +1;
1052 else
1053 s0 = -1;
1054
1055 if (rd[i].sockStat < 0) { //try to connect if not yet done
1056 if (rd[i].sockStat == -1) {
1057 rd[i].sockStat = connect (rd[i].socket,
1058 (struct sockaddr *) &rd[i].SockAddr,
1059 sizeof (rd[i].SockAddr));
1060 if (rd[i].sockStat == -1) {
1061 rd[i].errCnt++ ;
1062// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1063// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1064// else rd[i].sockStat = 10000 ;
1065 }
1066//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1067
1068 }
1069 if (rd[i].sockStat < -1 ) {
1070 rd[i].sockStat++ ;
1071 }
1072 if (rd[i].sockStat == 0) { //successfull ==>
1073 if (sockDef[b] > 0) {
1074 rd[i].bufTyp = 0; // expect a header
1075 rd[i].bufLen = frst_len; // max size to read at begining
1076 } else {
1077 rd[i].bufTyp = -1; // full data to be skipped
1078 rd[i].bufLen = MAX_LEN; //huge for skipping
1079 }
1080 rd[i].bufPos = 0; // no byte read so far
1081 rd[i].skip = 0; // start empty
1082// gi_NumConnect[b]++;
1083 gi_NumConnect[b] += cntsock ;
1084
1085 gi.numConn[b]++;
1086 gj.numConn[b]++;
1087 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1088 factOut (kInfo, -1, str);
1089 }
1090 }
1091
1092 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1093 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1094 numok++;
1095 size_t maxread = rd[i].bufLen ;
1096 if (maxread > MAXREAD ) maxread=MAXREAD ;
1097
1098 jrd =
1099 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1100 maxread, MSG_DONTWAIT);
1101// rd[i].bufLen, MSG_DONTWAIT);
1102
1103 if (jrd > 0) {
1104 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1105#ifdef EVTDEBUG
1106 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1107 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1108 snprintf (str, MXSTR,
1109 "read sock %3d bytes %5d len %5d first %d %d", i,
1110 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1111 rd[i].rBuf->B[rd[i].bufPos + 1]);
1112 factOut (kDebug, 301, str);
1113#endif
1114 }
1115
1116 if (jrd == 0) { //connection has closed ...
1117 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1118 factOut (kInfo, 441, str);
1119 GenSock (s0, i, 0, NULL, &rd[i]);
1120 gi.gotErr[b]++;
1121// gi_NumConnect[b]--;
1122 gi_NumConnect[b]-= cntsock ;
1123 gi.numConn[b]--;
1124 gj.numConn[b]--;
1125
1126 } else if (jrd < 0) { //did not read anything
1127 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1128 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1129 factOut (kError, 442, str);
1130 gi.gotErr[b]++;
1131 } else
1132 numok--; //else nothing waiting to be read
1133 jrd = 0;
1134 }
1135 } else {
1136 jrd = 0; //did read nothing as requested
1137 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1138 rd[i].bufLen);
1139 factOut (kDebug, 301, str);
1140 }
1141
1142 gi.gotByte[b] += jrd;
1143 gj.rateBytes[b] += jrd;
1144
1145 if (jrd > 0) {
1146 numokx++;
1147 jrdx += jrd;
1148 }
1149
1150
1151 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1152// just do nothing
1153#ifdef EVTDEBUG
1154 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1155 i);
1156 factOut (kInfo, 301, str);
1157#endif
1158
1159 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1160 if (jrd < rd[i].bufLen) { //not yet all read
1161 rd[i].bufPos += jrd; //==> prepare for continuation
1162 rd[i].bufLen -= jrd;
1163 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1164 } else { //full dataset read
1165 rd[i].bufLen = 0;
1166 rd[i].bufPos = rd[i].fadLen;
1167 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1168 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1169 gi.evtErr++;
1170 snprintf (str, MXSTR,
1171 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1172 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1173 rd[i].rBuf->B[1],
1174 rd[i].rBuf->B[rd[i].bufPos - 2],
1175 rd[i].rBuf->B[rd[i].bufPos - 1]);
1176 factOut (kError, 301, str);
1177 goto EndBuf;
1178
1179#ifdef EVTDEBUG
1180 } else {
1181 snprintf (str, MXSTR,
1182 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1183 i, rd[i].fadLen, rd[i].rBuf->B[0],
1184 rd[i].rBuf->B[1], start.B[1], start.B[0],
1185 rd[i].rBuf->B[rd[i].bufPos - 2],
1186 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1187 stop.B[0]);
1188 factOut (kDebug, 301, str);
1189#endif
1190 }
1191
1192 if (jrd > 0)
1193 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1194
1195/* //we have a complete buffer, copy to WORK area
1196 int jr, kr;
1197 int checkRoi;
1198 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1199 for (kr = 1; kr < 4; kr++)
1200 {
1201 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1202 + kr*(roi[jr-1]+4)]);
1203 if (checkRoi != roi[0])
1204 {
1205 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1206 factOut (kFatal, 1, str);
1207 goto EndBuf;
1208 }
1209
1210 }
1211 for (jr = 1; jr < 9; jr++) {
1212 roi[jr] =
1213 ntohs (rd[i].
1214 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1215
1216 }
1217*/
1218 //we have a complete buffer, copy to WORK area
1219 int jr, kr;
1220 int checkRoi;
1221 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1222 for (kr = 1; kr < 4; kr++)
1223 {
1224 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 + 2
1225 + kr*(roi[0]+4)]);
1226 if (checkRoi != roi[0])
1227 {
1228 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1229 factOut (kError, 1, str);
1230 goto EndBuf;
1231 }
1232
1233 }
1234 for (jr = 1; jr < 9; jr++) {
1235 roi[jr] =
1236 ntohs (rd[i].
1237 rBuf->S[head_len / 2 + 2 + 9 * jr * (roi[jr-1] + 4)]);
1238 for (kr = 1; kr < 4; kr++)
1239 {
1240 checkRoi = ntohs(rd[i].rBuf->S[ head_len/2 + 2 //header up to first roi
1241 + kr*(roi[jr]+4) //shift up to the next board
1242 + 9*jr*(roi[jr-1]+4)]); //shift up to the correct pixel
1243 if (checkRoi != roi[jr])
1244 {
1245 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1246 factOut (kFatal, 1, str);
1247 goto EndBuf;
1248 }
1249 }
1250 }
1251 //get index into mBuffer for this event (create if needed)
1252
1253 int actid;
1254 if (g_useFTM > 0)
1255 actid = rd[i].evtID;
1256 else
1257 actid = rd[i].ftmID;
1258
1259 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1260 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1261 rd[i].evtID);
1262
1263 if (evID < -1000) {
1264 goto EndBuf; //not usable board/event/run --> skip it
1265 }
1266 if (evID < 0) { //no space left, retry later
1267#ifdef EVTDEBUG
1268 if (rd[i].bufLen != 0) {
1269 snprintf (str, MXSTR, "something screwed up");
1270 factOut (kFatal, 1, str);
1271 }
1272#endif
1273 xwait.tv_sec = 0;
1274 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1275 nanosleep (&xwait, NULL);
1276 goto EndBuf1; //hope there is free space next round
1277 }
1278 //we have a valid entry in mBuffer[]; fill it
1279
1280#ifdef EVTDEBUG
1281 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1282 rd[i].fadLen);
1283 if (xchk != 0) {
1284 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1285 xchk, rd[i].fadLen, i);
1286 factOut (kFatal, 1, str);
1287
1288 uint iq;
1289 for (iq = 0; iq < rd[i].fadLen; iq++) {
1290 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1291 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1292 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1293 factOut (kFatal, 1, str);
1294 }
1295 }
1296 }
1297#endif
1298 int qncpy = 0;
1299 boardId = b;
1300 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1301 int fadCrate = fadBoard / 256;
1302 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1303 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1304 fadCrate, fadBoard % 256, boardId);
1305 factOut (kWarn, 301, str);
1306 }
1307 if (mBuffer[evID].board[boardId] != -1) {
1308 snprintf (str, MXSTR,
1309 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1310 evID, boardId, i, rd[i].fadLen,
1311 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1312 rd[i].rBuf->B[rd[i].bufPos - 2],
1313 rd[i].rBuf->B[rd[i].bufPos - 1]);
1314 factOut (kWarn, 501, str);
1315 goto EndBuf; //--> skip Board
1316 }
1317
1318 int iDx = evtIdx[evID]; //index into evtCtrl
1319
1320 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1321 &rd[i].rBuf->S[0], head_len);
1322 qncpy += head_len;
1323
1324 src = head_len / 2;
1325 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1326 for (drs = 0; drs < 4; drs++) {
1327 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1328 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1329 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1330//here we should check if pixH is correct ....
1331
1332 pixS = boardId * 36 + drs * 9 + px;
1333 src++;
1334
1335
1336 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1337 dest = pixS * roi[0];
1338 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1339 &rd[i].rBuf->S[src], roi[0] * 2);
1340 qncpy += roi[0] * 2;
1341 src += pixR;
1342
1343 if (px == 8) {
1344 tmS = boardId * 4 + drs;
1345 if (pixR > roi[0]) { //and we have additional TM info
1346 dest = tmS * roi[0] + NPIX * roi[0];
1347 int srcT = src - roi[0];
1348 mBuffer[evID].fEvent->StartTM[tmS] =
1349 (pixC + pixR - roi[0]) % 1024;
1350 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1351 &rd[i].rBuf->S[srcT], roi[0] * 2);
1352 qncpy += roi[0] * 2;
1353 } else {
1354 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1355 //ETIENNE because the TM channels are always processed during drs calib,
1356 //set them to zero if they are not present
1357 //I suspect that it may be more efficient to set all the allocated mem to
1358 //zero when allocating it
1359 dest = tmS*roi[0] + NPIX*roi[0];
1360 bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1361 }
1362 }
1363 }
1364 } // now we have stored a new board contents into Event structure
1365
1366 mBuffer[evID].fEvent->NumBoards++;
1367 mBuffer[evID].board[boardId] = boardId;
1368 evtCtrl.evtStat[iDx]++;
1369 evtCtrl.pcTime[iDx] = g_actTime;
1370
1371 if (++mBuffer[evID].nBoard >= actBoards) {
1372 int qnrun = 0;
1373 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1374 actrun = mBuffer[evID].runNum;
1375 int ir;
1376 for (ir = 0; ir < MAX_RUN; ir++) {
1377 qnrun++;
1378 if (runCtrl[ir].runId == actrun) {
1379 if (++runCtrl[ir].lastEvt == 0) {
1380 gotNewRun (actrun, mBuffer[evID].FADhead);
1381 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1382 mBuffer[evID].runNum,
1383 mBuffer[evID].evNum);
1384 factOut (kInfo, 1, str);
1385 break;
1386 }
1387 }
1388 }
1389 }
1390 snprintf (str, MXSTR,
1391 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1392 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1393 qncpy, qnrun);
1394 factOut (kDebug, -1, str);
1395
1396 //complete event read ---> flag for next processing
1397 evtCtrl.evtStat[iDx] = 99;
1398 gi.evtTot++;
1399 }
1400
1401 EndBuf:
1402 rd[i].bufTyp = 0; //ready to read next header
1403 rd[i].bufLen = frst_len;
1404 rd[i].bufPos = 0;
1405 EndBuf1:
1406 ;
1407 }
1408
1409 } else { //we are reading event header
1410 rd[i].bufPos += jrd;
1411 rd[i].bufLen -= jrd;
1412 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1413 //check if startflag correct; else shift block ....
1414 for (k = 0; k < rd[i].bufPos - 1; k++) {
1415 if (rd[i].rBuf->B[k] == start.B[1]
1416 && rd[i].rBuf->B[k + 1] == start.B[0])
1417 break;
1418 }
1419 rd[i].skip += k;
1420
1421 if (k >= rd[i].bufPos - 1) { //no start of header found
1422 rd[i].bufPos = 0;
1423 rd[i].bufLen = head_len;
1424 } else if (k > 0) {
1425 rd[i].bufPos -= k;
1426 rd[i].bufLen += k;
1427 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1428 rd[i].bufPos);
1429#ifdef EVTDEBUG
1430 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1431 rd[i].bufPos);
1432#endif
1433 }
1434 if (rd[i].bufPos >= minLen) {
1435 if (rd[i].skip > 0) {
1436 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1437 rd[i].skip, i);
1438 factOut (kInfo, 666, str);
1439 rd[i].skip = 0;
1440 }
1441 goodhed++;
1442 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1443 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1444 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1445 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1446 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1447 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1448 rd[i].bufTyp = 1; //ready to read full record
1449 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1450
1451 int fadboard = ntohs (rd[i].rBuf->S[12]);
1452 int fadcrate = fadboard / 256;
1453 fadboard = (fadcrate * 10 + fadboard % 256);
1454#ifdef EVTDEBUG
1455 snprintf (str, MXSTR,
1456 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1457 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1458 rd[i].runID, fadboard, jrd);
1459 factOut (kDebug, 1, str);
1460#endif
1461
1462 if (rd[i].runID == 0)
1463 rd[i].runID = gi_myRun;
1464
1465 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1466 snprintf (str, MXSTR,
1467 "illegal event-length on port %d", i);
1468 factOut (kFatal, 881, str);
1469 rd[i].bufLen = 100000; //?
1470 }
1471 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1472 debugHead (i, fadBoard, rd[i].rBuf);
1473 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1474 } else {
1475 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1476 }
1477 } else {
1478 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1479 }
1480
1481 } //end interpreting last read
1482}
1483 } //end of successful read anything
1484 } //finished trying to read all sockets
1485
1486#ifdef EVTDEBUG
1487 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1488 factOut (kDebug, -1, str);
1489#endif
1490
1491 gi.numRead[numok]++;
1492
1493 g_actTime = time (NULL);
1494 if (g_actTime > gi_SecTime) {
1495 gi_SecTime = g_actTime;
1496
1497
1498 //loop over all active events and flag those older than read-timeout
1499 //delete those that are written to disk ....
1500
1501 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1502 if (kd < 0)
1503 kd += (MAX_EVT * MAX_RUN);
1504
1505 gj.bufNew = gj.bufEvt = 0;
1506 int k1 = evtCtrl.frstPtr;
1507 for (k = k1; k < (k1 + kd); k++) {
1508 int k0 = k % (MAX_EVT * MAX_RUN);
1509//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1510
1511 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1512 gj.bufNew++; //incomplete event in Buffer
1513 if (evtCtrl.evtStat[k0] < 90
1514 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1515 int id = evtCtrl.evtBuf[k0];
1516 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1517 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1518 evtCtrl.evtStat[k0]);
1519 factOut (kWarn, 601, str);
1520
1521 int ik,ib,jb;
1522 ik=0;
1523 for (ib=0; ib<NBOARDS; ib++) {
1524 if (ib%10==0) {
1525 snprintf (&str[ik], MXSTR, "'");
1526 ik++;
1527 }
1528 jb = mBuffer[id].board[ib];
1529 if ( jb<0 ) {
1530 if (gi_NumConnect[b] >0 ) {
1531 snprintf (&str[ik], MXSTR, ".");
1532 } else {
1533 snprintf (&str[ik], MXSTR, "x");
1534 }
1535 } else {
1536 snprintf (&str[ik], MXSTR, "%d",jb%10);
1537 }
1538 ik++;
1539 }
1540 snprintf (&str[ik], MXSTR, "'");
1541 factOut (kWarn, 601, str);
1542
1543 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1544 gi.evtSkp++;
1545 gi.evtTot++;
1546 gj.evtSkip++;
1547 }
1548 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1549 || evtCtrl.evtStat[k0] == 0) { //'useless'
1550
1551 int id = evtCtrl.evtBuf[k0];
1552 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1553 mBuffer[id].evNum, mBuffer[id].nBoard);
1554 factOut (kDebug, -1, str);
1555 mBufFree (id); //event written--> free memory
1556 evtCtrl.evtStat[k0] = -1;
1557 gj.evtWrite++;
1558 gj.rateWrite++;
1559 } else if (evtCtrl.evtStat[k0] >= 95) {
1560 gj.bufEvt++; //complete event in Buffer
1561 }
1562
1563 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1564 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1565 }
1566 }
1567
1568
1569 gj.deltaT = 1000; //temporary, must be improved
1570
1571 int b;
1572 for (b = 0; b < NBOARDS; b++)
1573 gj.totBytes[b] += gj.rateBytes[b];
1574 gj.totMem = g_maxMem;
1575 if (gj.maxMem > gj.xxxMem)
1576 gj.xxxMem = gj.maxMem;
1577 if (gj.maxEvt > gj.xxxEvt)
1578 gj.xxxEvt = gj.maxEvt;
1579
1580 factStat (gj);
1581 factStatNew (gi);
1582 gj.rateNew = gj.rateWrite = 0;
1583 gj.maxMem = gj.usdMem;
1584 gj.maxEvt = gj.bufTot;
1585 for (b = 0; b < NBOARDS; b++)
1586 gj.rateBytes[b] = 0;
1587 }
1588
1589 if (numok > 0)
1590 numok2 = 0;
1591 else if (numok2++ > 3) {
1592 if (g_runStat == 1) {
1593 xwait.tv_sec = 1;
1594 xwait.tv_nsec = 0; // hibernate for 1 sec
1595 } else {
1596 xwait.tv_sec = 0;
1597 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1598 }
1599 nanosleep (&xwait, NULL);
1600 }
1601
1602 } //and do next loop over all sockets ...
1603
1604
1605 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1606 factOut (kInfo, -1, str);
1607
1608 if (g_reset > 0) {
1609 gi_reset = g_reset;
1610 gi_resetR = gi_reset % 10; //shall we stop reading ?
1611 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1612 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1613 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1614 g_reset = 0;
1615 } else {
1616 gi_reset = 0;
1617 if (g_runStat == -1)
1618 gi_resetR = 1;
1619 else
1620 gi_resetR = 7;
1621 gi_resetS = 7; //close all sockets
1622 gi_resetW = 7; //close all files
1623 gi_resetX = 0;
1624
1625 //inform others we have to quit ....
1626 gi_runStat = -11; //inform all that no update to happen any more
1627 gj.readStat = -11; //inform all that no update to happen any more
1628 }
1629
1630 if (gi_resetS > 0) {
1631 //must close all open sockets ...
1632 snprintf (str, MXSTR, "close all sockets ...");
1633 factOut (kInfo, -1, str);
1634 for (i = 0; i < MAX_SOCK; i++) {
1635 if (rd[i].sockStat == 0) {
1636 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1637 if (i % 7 == 0) {
1638// gi_NumConnect[i / 7]--;
1639 gi_NumConnect[i / 7]-= cntsock ;
1640 gi.numConn[i / 7]--;
1641 gj.numConn[i / 7]--;
1642 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1643 rd[i / 7].sockStat = -1; //and try to open asap
1644 }
1645 }
1646 }
1647 }
1648
1649
1650 if (gi_resetR > 0) {
1651 //flag all events as 'read finished'
1652 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1653 if (kd < 0)
1654 kd += (MAX_EVT * MAX_RUN);
1655
1656 int k1 = evtCtrl.frstPtr;
1657 for (k = k1; k < (k1 + kd); k++) {
1658 int k0 = k % (MAX_EVT * MAX_RUN);
1659 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1660 evtCtrl.evtStat[k0] = 91;
1661 gi.evtSkp++;
1662 gi.evtTot++;
1663 }
1664 }
1665
1666 xwait.tv_sec = 0;
1667 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1668 nanosleep (&xwait, NULL);
1669
1670 //and clear all buffers (might have to wait until all others are done)
1671 int minclear;
1672 if (gi_resetR == 1) {
1673 minclear = 900;
1674 snprintf (str, MXSTR, "drain all buffers ...");
1675 } else {
1676 minclear = 0;
1677 snprintf (str, MXSTR, "flush all buffers ...");
1678 }
1679 factOut (kInfo, -1, str);
1680
1681 int numclear = 1;
1682 while (numclear > 0) {
1683 numclear = 0;
1684 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1685 if (kd < 0)
1686 kd += (MAX_EVT * MAX_RUN);
1687
1688 int k1 = evtCtrl.frstPtr;
1689 for (k = k1; k < (k1 + kd); k++) {
1690 int k0 = k % (MAX_EVT * MAX_RUN);
1691 if (evtCtrl.evtStat[k0] > minclear) {
1692 int id = evtCtrl.evtBuf[k0];
1693 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1694 mBuffer[id].evNum, mBuffer[id].nBoard);
1695 factOut (kDebug, -1, str);
1696 mBufFree (id); //event written--> free memory
1697 evtCtrl.evtStat[k0] = -1;
1698 } else if (evtCtrl.evtStat[k0] > 0)
1699 numclear++; //writing is still ongoing...
1700
1701 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1702 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1703 }
1704
1705 xwait.tv_sec = 0;
1706 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1707 nanosleep (&xwait, NULL);
1708 }
1709 }
1710
1711 if (gi_reset > 0) {
1712 if (gi_resetW > 0) {
1713 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1714 }
1715 if (gi_resetX > 0) {
1716 xwait.tv_sec = gi_resetX;
1717 xwait.tv_nsec = 0;
1718 nanosleep (&xwait, NULL);
1719 }
1720
1721 snprintf (str, MXSTR, "Continue read Process ...");
1722 factOut (kInfo, -1, str);
1723 gi_reset = 0;
1724 goto START;
1725 }
1726
1727
1728
1729 snprintf (str, MXSTR, "Exit read Process ...");
1730 factOut (kInfo, -1, str);
1731 gi_runStat = -99;
1732 gj.readStat = -99;
1733 factStat (gj);
1734 factStatNew (gi);
1735 return 0;
1736
1737} /*-----------------------------------------------------------------*/
1738
1739
1740void *
1741subProc (void *thrid)
1742{
1743 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1744 struct timespec xwait;
1745
1746 int32_t cntr ;
1747
1748 threadID = (int) thrid;
1749
1750 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1751 factOut (kInfo, -1, str);
1752
1753 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1754 numWait = numProc = 0;
1755 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1756 if (kd < 0)
1757 kd += (MAX_EVT * MAX_RUN);
1758
1759 int k1 = evtCtrl.frstPtr;
1760 for (k = k1; k < (k1 + kd); k++) {
1761 int k0 = k % (MAX_EVT * MAX_RUN);
1762
1763 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1764 if (gi_resetR > 1) { //we are asked to flush buffers asap
1765 jret = 9100; //flag to be deleted
1766 } else {
1767 int id = evtCtrl.evtBuf[k0];
1768
1769
1770 jret =
1771 subProcEvt (threadID, mBuffer[id].FADhead,
1772 mBuffer[id].fEvent, mBuffer[id].buffer);
1773
1774
1775 if (jret <= threadID) {
1776 snprintf (str, MXSTR,
1777 "process %d wants to send to process %d",
1778 threadID, jret);
1779 factOut (kError, -1, str);
1780 jret = 5300;
1781 } else if (jret <= 0)
1782 jret = 9200 + threadID; //flag as 'to be deleted'
1783 else if (jret >= gi_maxProc)
1784 jret = 5200 + threadID; //flag as 'to be written'
1785 else
1786 jret = 1000 + jret; //flag for next proces
1787 }
1788 evtCtrl.evtStat[k0] = jret;
1789 numProc++;
1790 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1791 numWait++;
1792 }
1793
1794 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1795 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1796 factOut (kInfo, -1, str);
1797 return 0;
1798 }
1799 if (numProc == 0) {
1800 //seems we have nothing to do, so sleep a little
1801 xwait.tv_sec = 0;
1802 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1803 nanosleep (&xwait, NULL);
1804 }
1805 }
1806
1807 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1808 factOut (kInfo, -1, str);
1809 return;
1810} /*-----------------------------------------------------------------*/
1811
1812
1813void *
1814procEvt (void *ptr)
1815{
1816/* *** main loop processing file, including SW-trigger */
1817 int numProc, numWait;
1818 int k, status, j;
1819 struct timespec xwait;
1820 char str[MXSTR];
1821
1822
1823
1824 int lastRun = 0; //usually run from last event still valid
1825
1826 cpu_set_t mask;
1827 int cpu = 1; //process thread (will be several in final version)
1828
1829 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1830 gi_maxProc);
1831 factOut (kInfo, -1, str);
1832
1833/* CPU_ZERO initializes all the bits in the mask to zero. */
1834// CPU_ZERO (&mask);
1835/* CPU_SET sets only the bit corresponding to cpu. */
1836// CPU_SET( 0 , &mask ); leave for system
1837// CPU_SET( 1 , &mask ); used by write process
1838// CPU_SET (2, &mask);
1839// CPU_SET (3, &mask);
1840// CPU_SET (4, &mask);
1841// CPU_SET (5, &mask);
1842// CPU_SET (6, &mask);
1843// CPU_SET( 7 , &mask ); used by read process
1844/* sched_setaffinity returns 0 in success */
1845// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1846// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1847// factOut (kWarn, -1, str);
1848// }
1849
1850
1851 pthread_t thread[100];
1852 int th_ret[100];
1853
1854 for (k = 0; k < gi_maxProc; k++) {
1855 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1856 }
1857
1858 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1859
1860 numWait = numProc = 0;
1861 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1862 if (kd < 0)
1863 kd += (MAX_EVT * MAX_RUN);
1864
1865 int k1 = evtCtrl.frstPtr;
1866 for (k = k1; k < (k1 + kd); k++) {
1867 int k0 = k % (MAX_EVT * MAX_RUN);
1868//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1869 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1870
1871 if (gi_resetR > 1) { //we are asked to flush buffers asap
1872 evtCtrl.evtStat[k0] = 9991;
1873 } else {
1874
1875//-------- it is better to open the run already here, so call can be used to initialize
1876//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1877 int id = evtCtrl.evtBuf[k0];
1878 uint32_t irun = mBuffer[id].runNum;
1879 int32_t ievt = mBuffer[id].evNum;
1880 if (runCtrl[lastRun].runId == irun) {
1881 j = lastRun;
1882 } else {
1883 //check which fileID to use (or open if needed)
1884 for (j = 0; j < MAX_RUN; j++) {
1885 if (runCtrl[j].runId == irun)
1886 break;
1887 }
1888 if (j >= MAX_RUN) {
1889 snprintf (str, MXSTR,
1890 "P error: can not find run %d for event %d in %d",
1891 irun, ievt, id);
1892 factOut (kFatal, 901, str);
1893 }
1894 lastRun = j;
1895 }
1896
1897 if (runCtrl[j].fileId < 0) {
1898//---- we need to open a new run ==> make sure all older runs are
1899//---- finished and marked to be closed ....
1900 int j1;
1901 for (j1 = 0; j1 < MAX_RUN; j1++) {
1902 if (runCtrl[j1].fileId == 0) {
1903 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1904//---- problem: processing still going on ==> must wait for closing ....
1905 snprintf (str, MXSTR,
1906 "P finish run since new one opened %d",
1907 runCtrl[j1].runId);
1908 runFinish1 (runCtrl[j1].runId);
1909 }
1910
1911 }
1912
1913 actRun.Version = 1;
1914 actRun.RunType = -1; //to be adapted
1915
1916 actRun.Nroi = runCtrl[j].roi0;
1917 actRun.NroiTM = runCtrl[j].roi8;
1918//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1919// if (actRun.Nroi == actRun.NroiTM)
1920// actRun.NroiTM = 0;
1921 actRun.RunTime = runCtrl[j].firstTime;
1922 actRun.RunUsec = runCtrl[j].firstTime;
1923 actRun.NBoard = NBOARDS;
1924 actRun.NPix = NPIX;
1925 actRun.NTm = NTMARK;
1926 actRun.Nroi = mBuffer[id].nRoi;
1927 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1928 NBOARDS * sizeof (PEVNT_HEADER));
1929
1930 runCtrl[j].fileHd =
1931 runOpen (irun, &actRun, sizeof (actRun));
1932 if (runCtrl[j].fileHd == NULL) {
1933 snprintf (str, MXSTR,
1934 "P could not open a file for run %d", irun);
1935 factOut (kError, 502, str);
1936 runCtrl[j].fileId = 91;
1937 runCtrl[j].procId = 91;
1938 } else {
1939 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1940 irun, ievt);
1941 factOut (kInfo, -1, str);
1942 runCtrl[j].fileId = 0;
1943 runCtrl[j].procId = 0;
1944 }
1945
1946 }
1947//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1948 if (runCtrl[j].procId == 0) {
1949 if (runCtrl[j].closeTime < g_actTime
1950 || runCtrl[j].lastTime < g_actTime - 300
1951 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1952 snprintf (str, MXSTR,
1953 "P reached end of run condition for run %d",
1954 irun);
1955 factOut (kInfo, 502, str);
1956 runFinish1 (runCtrl[j].runId);
1957 runCtrl[j].procId = 1;
1958 }
1959 }
1960 if (runCtrl[j].procId != 0) {
1961 snprintf (str, MXSTR,
1962 "P skip event %d because no active run %d", ievt,
1963 irun);
1964 factOut (kDebug, 502, str);
1965 evtCtrl.evtStat[k0] = 9091;
1966 } else {
1967//--------
1968//--------
1969 id = evtCtrl.evtBuf[k0];
1970 int itevt = mBuffer[id].trgNum;
1971 int itrg = mBuffer[id].trgTyp;
1972 int roi = mBuffer[id].nRoi;
1973 int roiTM = mBuffer[id].nRoiTM;
1974
1975//make sure unused pixels/tmarks are cleared to zero
1976//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1977// if (roiTM == roi)
1978// roiTM = 0;
1979 int ip, it, dest, ib;
1980 for (ip = 0; ip < NPIX; ip++) {
1981 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1982 dest = ip * roi;
1983 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1984 }
1985 }
1986
1987 for (it = 0; it < NTMARK; it++) {
1988 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1989 dest = it * roi + NPIX * roi;
1990 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1991 }
1992 }
1993
1994
1995//and set correct event header ; also check for consistency in event (not yet)
1996 mBuffer[id].fEvent->Roi = roi;
1997 mBuffer[id].fEvent->RoiTM = roiTM;
1998 mBuffer[id].fEvent->EventNum = ievt;
1999 mBuffer[id].fEvent->TriggerNum = itevt;
2000 mBuffer[id].fEvent->TriggerType = itrg;
2001 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2002 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2003 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2004 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2005 mBuffer[id].fEvent->SoftTrig = 0;
2006
2007
2008 for (ib = 0; ib < NBOARDS; ib++) {
2009 if (mBuffer[id].board[ib] == -1) { //board is not read
2010 mBuffer[id].FADhead[ib].start_package_flag = 0;
2011 mBuffer[id].fEvent->BoardTime[ib] = 0;
2012 } else {
2013 mBuffer[id].fEvent->BoardTime[ib] =
2014 ntohl (mBuffer[id].FADhead[ib].time);
2015 }
2016 }
2017 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2018 mBuffer[id].fEvent);
2019 gi.procTot++;
2020 numProc++;
2021
2022 if (i < 0) {
2023 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2024 gi.procErr++;
2025 } else {
2026 evtCtrl.evtStat[k0] = 1000;
2027 runCtrl[j].procEvt++;
2028 }
2029 }
2030 }
2031 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2032 numWait++;
2033 }
2034 }
2035
2036 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2037 snprintf (str, MXSTR, "Exit Processing Process ...");
2038 factOut (kInfo, -1, str);
2039 gp_runStat = -22; //==> we should exit
2040 gj.procStat = -22; //==> we should exit
2041 return 0;
2042 }
2043
2044 if (numProc == 0) {
2045 //seems we have nothing to do, so sleep a little
2046 xwait.tv_sec = 0;
2047 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2048 nanosleep (&xwait, NULL);
2049 }
2050 gp_runStat = gi_runStat;
2051 gj.procStat = gj.readStat;
2052
2053 }
2054
2055 //we are asked to abort asap ==> must flag all remaining events
2056 // when gi_runStat claims that all events are in the buffer...
2057
2058 snprintf (str, MXSTR, "Abort Processing Process ...");
2059 factOut (kInfo, -1, str);
2060 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2061 if (kd < 0)
2062 kd += (MAX_EVT * MAX_RUN);
2063
2064 for (k = 0; k < gi_maxProc; k++) {
2065 pthread_join (thread[k], (void **) &status);
2066 }
2067
2068 int k1 = evtCtrl.frstPtr;
2069 for (k = k1; k < (k1 + kd); k++) {
2070 int k0 = k % (MAX_EVT * MAX_RUN);
2071 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2072 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2073 }
2074 }
2075
2076 gp_runStat = -99;
2077 gj.procStat = -99;
2078
2079 return 0;
2080
2081} /*-----------------------------------------------------------------*/
2082
2083int
2084CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2085{
2086/* close run runId (all all runs if runId=0) */
2087/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2088 int j;
2089
2090
2091 if (runId == 0) {
2092 for (j = 0; j < MAX_RUN; j++) {
2093 if (runCtrl[j].fileId == 0) { //run is open
2094 runCtrl[j].closeTime = closeTime;
2095 runCtrl[j].maxEvt = maxEvt;
2096 }
2097 }
2098 return 0;
2099 }
2100
2101 for (j = 0; j < MAX_RUN; j++) {
2102 if (runCtrl[j].runId == runId) {
2103 if (runCtrl[j].fileId == 0) { //run is open
2104 runCtrl[j].closeTime = closeTime;
2105 runCtrl[j].maxEvt = maxEvt;
2106 return 0;
2107 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2108 runCtrl[j].closeTime = closeTime;
2109 runCtrl[j].maxEvt = maxEvt;
2110 return +1;
2111 } else { // run already closed
2112 return +2;
2113 }
2114 }
2115 } //we only reach here if the run was never created
2116 return -1;
2117
2118} /*-----------------------------------------------------------------*/
2119
2120
2121void *
2122writeEvt (void *ptr)
2123{
2124/* *** main loop writing event (including opening and closing run-files */
2125
2126 int numWrite, numWait;
2127 int k, j ;
2128 struct timespec xwait;
2129 char str[MXSTR];
2130
2131 cpu_set_t mask;
2132 int cpu = 1; //write thread
2133
2134 snprintf (str, MXSTR, "Starting write-thread");
2135 factOut (kInfo, -1, str);
2136
2137/* CPU_ZERO initializes all the bits in the mask to zero. */
2138// CPU_ZERO (&mask);
2139/* CPU_SET sets only the bit corresponding to cpu. */
2140// CPU_SET (cpu, &mask);
2141/* sched_setaffinity returns 0 in success */
2142// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2143// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2144// }
2145
2146 int lastRun = 0; //usually run from last event still valid
2147
2148 while (g_runStat > -2) {
2149
2150 numWait = numWrite = 0;
2151 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2152 if (kd < 0)
2153 kd += (MAX_EVT * MAX_RUN);
2154
2155 int k1 = evtCtrl.frstPtr;
2156 for (k = k1; k < (k1 + kd); k++) {
2157 int k0 = k % (MAX_EVT * MAX_RUN);
2158//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2159 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2160
2161 if (gi_resetR > 1) { //we must drain the buffer asap
2162 evtCtrl.evtStat[k0] = 9904;
2163 } else {
2164
2165
2166 int id = evtCtrl.evtBuf[k0];
2167 uint32_t irun = mBuffer[id].runNum;
2168 int32_t ievt = mBuffer[id].evNum;
2169
2170 gi.wrtTot++;
2171 if (runCtrl[lastRun].runId == irun) {
2172 j = lastRun;
2173 } else {
2174 //check which fileID to use (or open if needed)
2175 for (j = 0; j < MAX_RUN; j++) {
2176 if (runCtrl[j].runId == irun)
2177 break;
2178 }
2179 if (j >= MAX_RUN) {
2180 snprintf (str, MXSTR,
2181 "W error: can not find run %d for event %d in %d",
2182 irun, ievt, id);
2183 factOut (kFatal, 901, str);
2184 gi.wrtErr++;
2185 }
2186 lastRun = j;
2187 }
2188
2189 if (runCtrl[j].fileId < 0) {
2190 actRun.Version = 1;
2191 actRun.RunType = -1; //to be adapted
2192
2193 actRun.Nroi = runCtrl[j].roi0;
2194 actRun.NroiTM = runCtrl[j].roi8;
2195//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2196// if (actRun.Nroi == actRun.NroiTM)
2197// actRun.NroiTM = 0;
2198 actRun.RunTime = runCtrl[j].firstTime;
2199 actRun.RunUsec = runCtrl[j].firstTime;
2200 actRun.NBoard = NBOARDS;
2201 actRun.NPix = NPIX;
2202 actRun.NTm = NTMARK;
2203 actRun.Nroi = mBuffer[id].nRoi;
2204 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2205 NBOARDS * sizeof (PEVNT_HEADER));
2206
2207 runCtrl[j].fileHd =
2208 runOpen (irun, &actRun, sizeof (actRun));
2209 if (runCtrl[j].fileHd == NULL) {
2210 snprintf (str, MXSTR,
2211 "W could not open a file for run %d", irun);
2212 factOut (kError, 502, str);
2213 runCtrl[j].fileId = 91;
2214 } else {
2215 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2216 irun, ievt);
2217 factOut (kInfo, -1, str);
2218 runCtrl[j].fileId = 0;
2219 }
2220
2221 }
2222
2223 if (runCtrl[j].fileId != 0) {
2224 if (runCtrl[j].fileId < 0) {
2225 snprintf (str, MXSTR,
2226 "W never opened file for this run %d", irun);
2227 factOut (kError, 123, str);
2228 } else if (runCtrl[j].fileId < 100) {
2229 snprintf (str, MXSTR, "W.file for this run is closed %d",
2230 irun);
2231 factOut (kWarn, 123, str);
2232 runCtrl[j].fileId += 100;
2233 } else {
2234 snprintf (str, MXSTR, "W:file for this run is closed %d",
2235 irun);
2236 factOut (kDebug, 123, str);
2237 }
2238 evtCtrl.evtStat[k0] = 9903;
2239 gi.wrtErr++;
2240 } else {
2241// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2242// factOut (kInfo, 504, str);
2243 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2244 sizeof (mBuffer[id]));
2245 if (i >= 0) {
2246 runCtrl[j].lastTime = g_actTime;
2247 runCtrl[j].actEvt++;
2248 evtCtrl.evtStat[k0] = 9901;
2249 snprintf (str, MXSTR,
2250 "%5d successfully wrote for run %d id %5d",
2251 ievt, irun, k0);
2252 factOut (kDebug, 504, str);
2253// gj.writEvt++ ;
2254 } else {
2255 snprintf (str, MXSTR, "W error writing event for run %d",
2256 irun);
2257 factOut (kError, 503, str);
2258 evtCtrl.evtStat[k0] = 9902;
2259 gi.wrtErr++;
2260 }
2261
2262 if (i < 0
2263 || runCtrl[j].lastTime < g_actTime - 300
2264 || runCtrl[j].closeTime < g_actTime
2265 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2266 int ii = 0;
2267 if (i < 0)
2268 ii = 1;
2269 else if (runCtrl[j].closeTime < g_actTime)
2270 ii = 2;
2271 else if (runCtrl[j].lastTime < g_actTime - 300)
2272 ii = 3;
2273 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2274 ii = 4;
2275
2276
2277
2278 //close run for whatever reason
2279 if (runCtrl[j].runId == gi_myRun)
2280 gi_myRun = g_actTime;
2281
2282 if (runCtrl[j].procId == 0) {
2283 runFinish1 (runCtrl[j].runId);
2284 runCtrl[j].procId = 92;
2285 }
2286
2287 runCtrl[j].closeTime = g_actTime - 1;
2288 i = runClose (runCtrl[j].fileHd, &runTail[j],
2289 sizeof (runTail[j]));
2290 if (i < 0) {
2291 snprintf (str, MXSTR, "error closing run %d %d AAA",
2292 runCtrl[j].runId, i);
2293 factOut (kError, 503, str);
2294 runCtrl[j].fileId = 92;
2295 } else {
2296 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2297 ii);
2298 factOut (kInfo, 503, str);
2299 runCtrl[j].fileId = 93;
2300 }
2301 }
2302 }
2303 }
2304 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2305 numWait++;
2306 }
2307
2308 //check if we should close a run (mainly when no event pending)
2309 for (j = 0; j < MAX_RUN; j++) {
2310 if (runCtrl[j].fileId == 0
2311 && (runCtrl[j].closeTime < g_actTime
2312 || runCtrl[j].lastTime < g_actTime - 300
2313 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2314 if (runCtrl[j].runId == gi_myRun)
2315 gi_myRun = g_actTime;
2316 int ii = 0;
2317 if (runCtrl[j].closeTime < g_actTime)
2318 ii = 2;
2319 else if (runCtrl[j].lastTime < g_actTime - 300)
2320 ii = 3;
2321 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2322 ii = 4;
2323
2324 if (runCtrl[j].procId == 0) {
2325 runFinish1 (runCtrl[j].runId);
2326 runCtrl[j].procId = 92;
2327 }
2328
2329 runCtrl[j].closeTime = g_actTime - 1;
2330 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2331 sizeof (runTail[j]));
2332 if (i < 0) {
2333 snprintf (str, MXSTR, "error closing run %d %d BBB",
2334 runCtrl[j].runId, i);
2335 factOut (kError, 506, str);
2336 runCtrl[j].fileId = 94;
2337 } else {
2338 snprintf (str, MXSTR, "W closed run %d BBB %d",
2339 runCtrl[j].runId, ii);
2340 factOut (kInfo, 507, str);
2341 runCtrl[j].fileId = 95;
2342 }
2343 }
2344 }
2345
2346 if (numWrite == 0) {
2347 //seems we have nothing to do, so sleep a little
2348 xwait.tv_sec = 0;
2349 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2350 nanosleep (&xwait, NULL);
2351 }
2352
2353 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2354 snprintf (str, MXSTR, "Finish Write Process ...");
2355 factOut (kInfo, -1, str);
2356 gw_runStat = -22; //==> we should exit
2357 gj.writStat = -22; //==> we should exit
2358 goto closerun;
2359 }
2360 gw_runStat = gi_runStat;
2361 gj.writStat = gj.readStat;
2362
2363 }
2364
2365 //must close all open files ....
2366 snprintf (str, MXSTR, "Abort Writing Process ...");
2367 factOut (kInfo, -1, str);
2368
2369 closerun:
2370 snprintf (str, MXSTR, "Close all open files ...");
2371 factOut (kInfo, -1, str);
2372 for (j = 0; j < MAX_RUN; j++)
2373 if (runCtrl[j].fileId == 0) {
2374 if (runCtrl[j].runId == gi_myRun)
2375 gi_myRun = g_actTime;
2376
2377 if (runCtrl[j].procId == 0) {
2378 runFinish1 (runCtrl[j].runId);
2379 runCtrl[j].procId = 92;
2380 }
2381
2382 runCtrl[j].closeTime = g_actTime - 1;
2383 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2384 int ii = 0;
2385 if (runCtrl[j].closeTime < g_actTime)
2386 ii = 2;
2387 else if (runCtrl[j].lastTime < g_actTime - 300)
2388 ii = 3;
2389 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2390 ii = 4;
2391 if (i < 0) {
2392 snprintf (str, MXSTR, "error closing run %d %d CCC",
2393 runCtrl[j].runId, i);
2394 factOut (kError, 506, str);
2395 runCtrl[j].fileId = 96;
2396 } else {
2397 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2398 ii);
2399 factOut (kInfo, 507, str);
2400 runCtrl[j].fileId = 97;
2401 }
2402 }
2403
2404 gw_runStat = -99;
2405 gj.writStat = -99;
2406 snprintf (str, MXSTR, "Exit Writing Process ...");
2407 factOut (kInfo, -1, str);
2408 return 0;
2409
2410
2411
2412
2413} /*-----------------------------------------------------------------*/
2414
2415
2416
2417
2418void
2419StartEvtBuild ()
2420{
2421
2422 int i, j, imax, status, th_ret[50];
2423 pthread_t thread[50];
2424 struct timespec xwait;
2425
2426 gi_runStat = gp_runStat = gw_runStat = 0;
2427 gj.readStat = gj.procStat = gj.writStat = 0;
2428
2429 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2430 factOut (kInfo, -1, str);
2431
2432//initialize run control logics
2433 for (i = 0; i < MAX_RUN; i++) {
2434 runCtrl[i].runId = 0;
2435 runCtrl[i].fileId = -2;
2436 }
2437
2438//prepare for subProcesses
2439 gi_maxSize = g_maxSize;
2440 if (gi_maxSize <= 0)
2441 gi_maxSize = 1;
2442
2443 gi_maxProc = g_maxProc;
2444 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2445 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2446 factOut (kFatal, 301, str);
2447 gi_maxProc = 1;
2448 }
2449//partially initialize event control logics
2450 evtCtrl.frstPtr = 0;
2451 evtCtrl.lastPtr = 0;
2452
2453//start all threads (more to come) when we are allowed to ....
2454 while (g_runStat == 0) {
2455 xwait.tv_sec = 0;
2456 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2457 nanosleep (&xwait, NULL);
2458 }
2459
2460 i = 0;
2461 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2462 i++;
2463 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2464 i++;
2465 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2466 i++;
2467 imax = i;
2468
2469
2470#ifdef BILAND
2471 xwait.tv_sec = 30;;
2472 xwait.tv_nsec = 0; // sleep for ~20sec
2473 nanosleep (&xwait, NULL);
2474
2475 printf ("close all runs in 2 seconds\n");
2476
2477 CloseRunFile (0, time (NULL) + 2, 0);
2478
2479 xwait.tv_sec = 1;;
2480 xwait.tv_nsec = 0; // sleep for ~20sec
2481 nanosleep (&xwait, NULL);
2482
2483 printf ("setting g_runstat to -1\n");
2484
2485 g_runStat = -1;
2486#endif
2487
2488
2489//wait for all threads to finish
2490 for (i = 0; i < imax; i++) {
2491 j = pthread_join (thread[i], (void **) &status);
2492 }
2493
2494} /*-----------------------------------------------------------------*/
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510 /*-----------------------------------------------------------------*/
2511 /*-----------------------------------------------------------------*/
2512 /*-----------------------------------------------------------------*/
2513 /*-----------------------------------------------------------------*/
2514 /*-----------------------------------------------------------------*/
2515
2516#ifdef BILAND
2517
2518int
2519subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2520 int8_t * buffer)
2521{
2522 printf ("called subproc %d\n", threadID);
2523 return threadID + 1;
2524}
2525
2526
2527
2528
2529 /*-----------------------------------------------------------------*/
2530 /*-----------------------------------------------------------------*/
2531 /*-----------------------------------------------------------------*/
2532 /*-----------------------------------------------------------------*/
2533 /*-----------------------------------------------------------------*/
2534
2535
2536
2537
2538FileHandle_t
2539runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2540{
2541 return 1;
2542};
2543
2544int
2545runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2546{
2547 return 1;
2548 usleep (10000);
2549 return 1;
2550}
2551
2552
2553//{ return 1; } ;
2554
2555int
2556runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2557{
2558 return 1;
2559};
2560
2561
2562
2563
2564int
2565eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2566{
2567 int i = 0;
2568
2569// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2570// for (i=0; i<NBOARDS; i++) {
2571// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2572// }
2573 return 0;
2574}
2575
2576
2577void
2578factStatNew (EVT_STAT gi)
2579{
2580 int i;
2581
2582//for (i=0;i<MAX_SOCK;i++) {
2583// printf("%4d",gi.numRead[i]);
2584// if (i%20 == 0 ) printf("\n");
2585//}
2586}
2587
2588void
2589gotNewRun (int runnr, PEVNT_HEADER * headers)
2590{
2591 printf ("got new run %d\n", runnr);
2592 return;
2593}
2594
2595void
2596factStat (GUI_STAT gj)
2597{
2598// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2599// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2600}
2601
2602
2603void
2604debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
2605 int state, uint32_t tsec, uint32_t tusec)
2606{
2607// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
2608}
2609
2610
2611
2612void
2613debugStream (int isock, void *buf, int len)
2614{
2615}
2616
2617void
2618debugHead (int i, int j, void *buf)
2619{
2620}
2621
2622
2623void
2624factOut (int severity, int err, char *message)
2625{
2626 static FILE *fd;
2627 static int file = 0;
2628
2629 if (file == 0) {
2630 printf ("open file\n");
2631 fd = fopen ("x.out", "w+");
2632 file = 999;
2633 }
2634
2635 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2636
2637 if (severity != kDebug)
2638 printf ("%3d %3d | %s\n", severity, err, message);
2639}
2640
2641
2642
2643int
2644main ()
2645{
2646 int i, b, c, p;
2647 char ipStr[100];
2648 struct in_addr IPaddr;
2649
2650 g_maxMem = 1024 * 1024; //MBytes
2651//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2652 g_maxMem = g_maxMem * 200; //100MBytes
2653
2654 g_maxProc = 20;
2655 g_maxSize = 30000;
2656
2657 g_runStat = 40;
2658
2659 i = 0;
2660
2661// version for standard crates
2662//for (c=0; c<4,c++) {
2663// for (b=0; b<10; b++) {
2664// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2665//
2666// inet_pton(PF_INET, ipStr, &IPaddr) ;
2667//
2668// g_port[i].sockAddr.sin_family = PF_INET;
2669// g_port[i].sockAddr.sin_port = htons(5000) ;
2670// g_port[i].sockAddr.sin_addr = IPaddr ;
2671// g_port[i].sockDef = 1 ;
2672// i++ ;
2673// }
2674//}
2675//
2676//version for PC-test *
2677 for (c = 0; c < 4; c++) {
2678 for (b = 0; b < 10; b++) {
2679 sprintf (ipStr, "10.0.%d.11", 128 + c);
2680 if (c < 2)
2681 sprintf (ipStr, "10.0.%d.11", 128);
2682 else
2683 sprintf (ipStr, "10.0.%d.11", 131);
2684// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2685
2686 inet_pton (PF_INET, ipStr, &IPaddr);
2687 p = 31919 + 100 * c + 10 * b;
2688
2689
2690 g_port[i].sockAddr.sin_family = PF_INET;
2691 g_port[i].sockAddr.sin_port = htons (p);
2692 g_port[i].sockAddr.sin_addr = IPaddr;
2693 g_port[i].sockDef = 1;
2694
2695 i++;
2696 }
2697 }
2698
2699
2700//g_port[17].sockDef =-1 ;
2701//g_actBoards-- ;
2702
2703 StartEvtBuild ();
2704
2705 return 0;
2706
2707}
2708#endif
Note: See TracBrowser for help on using the repository browser.