source: trunk/FACT++/src/EventBuilder.c@ 15357

Last change on this file since 15357 was 15356, checked in by tbretz, 12 years ago
Send gotNewRun as soon as possible (after the header from the first board has been received)
File size: 90.5 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <stdarg.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netinet/in.h>
22#include <netinet/tcp.h>
23#include <pthread.h>
24#include <sched.h>
25
26#include "EventBuilder.h"
27
/// Severity levels passed as first argument to factOut() / factPrintf().
28enum Severity
29{
30 kMessage = 10, ///< Just a message, usually obsolete
31 kInfo = 20, ///< An info telling something which can be interesting to know
32 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted behaviour
33 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
34 kFatal = 50, ///< An error which cannot be handled at all happened, the only solution is program termination
35 kDebug = 99, ///< A message used for debugging only
36};
37
38#define MIN_LEN 32 // min #bytes needed to interpret FADheader
39#define MAX_LEN 256*1024 // size of read-buffer per socket
40
41//#define nanosleep(x,y)
42
43extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
44extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
45extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
46//extern int runFinish (uint32_t runnr);
47
48extern void factOut (int severity, int err, char *message);
49extern void factReportIncomplete (uint64_t rep);
50
51extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
52
53
54extern void factStat (GUI_STAT gj);
55
56extern void factStatNew (EVT_STAT gi);
57
58extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
59
60extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
61 int8_t * buffer);
62
63extern void debugHead (int i, int j, void *buf);
64
65extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
66 int32_t runnr, int state, uint32_t tsec,
67 uint32_t tusec);
68extern void debugStream (int isock, void *buf, int len);
69
70int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
71
72
73
74
75
76int g_maxProc;
77int g_maxSize;
78int gi_maxSize;
79int gi_maxProc;
80
81uint g_actTime;
82uint g_actUsec;
83int g_runStat;
84int g_reset;
85int g_useFTM;
86
87int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
88size_t g_maxMem; //maximum memory allowed for buffer
89
90//no longer needed ...
91int g_maxBoards; //maximum number of boards to be initialized
92int g_actBoards;
93//
94
95FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
96
97
98int gi_runStat;
99int gp_runStat;
100int gw_runStat;
101
102//int gi_memStat = +1;
103
104uint32_t gi_myRun = 0;
105uint32_t actrun = 0;
106
107
108uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
109
110//uint gi_EvtStart= 0 ;
111//uint gi_EvtRead = 0 ;
112//uint gi_EvtBad = 0 ;
113//uint gi_EvtTot = 0 ;
114//size_t gi_usedMem = 0 ;
115
116//uint gw_EvtTot = 0 ;
117//uint gp_EvtTot = 0 ;
118
119PIX_MAP g_pixMap[NPIX];
120
121EVT_STAT gi;
122GUI_STAT gj;
123
124EVT_CTRL evtCtrl; //control of events during processing
125int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
126
127WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
128
129//#define MXSTR 1000
130//char str[MXSTR];
131
/*
 * printf-style front end for factOut().
 *
 * The message is formatted into a local buffer of 1000 bytes (longer
 * messages are truncated by vsnprintf) and then handed to factOut()
 * together with the unchanged severity and id.
 */
void factPrintf(int severity, int id, const char *fmt, ...)
{
    char message[1000];
    va_list args;

    va_start(args, fmt);
    vsnprintf(message, sizeof message, fmt, args);
    va_end(args);

    factOut(severity, id, message);
}
143
144
145#define THOMAS_MALLOC
146
147#ifdef THOMAS_MALLOC
148#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
149#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
// One node of the stack of released event buffers (head: tgb_last).
// TGB_free() pushes a node, TGB_Malloc() pops one.
150typedef struct TGB_struct
151{
152 struct TGB_struct *prev; // node that was on top before this one was pushed
153 void *mem; // the released buffer kept for reuse
154} TGB_entry;
155
156TGB_entry *tgb_last = NULL;
157uint64_t tgb_memory = 0;
158uint64_t tgb_inuse = 0;
159
160void *TGB_Malloc()
161{
162 // No free slot available, next alloc would exceed max memory
163 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
164 return NULL;
165
166 // We will return this amount of memory
167 tgb_inuse += MAX_TOT_MEM;
168
169 // No free slot available, allocate a new one
170 if (!tgb_last)
171 {
172 tgb_memory += MAX_TOT_MEM;
173 return malloc(MAX_TOT_MEM);
174 }
175
176 // Get the next free slot from the stack and return it
177 TGB_entry *last = tgb_last;
178
179 TGB_entry *mem = last->mem;
180 tgb_last = last->prev;
181
182 free(last);
183
184 return mem;
185};
186
187void TGB_free(void *mem)
188{
189 // Add the last free slot to the stack
190 TGB_entry *entry = malloc(sizeof(TGB_entry));
191
192 entry->prev = tgb_last;
193 entry->mem = mem;
194
195 tgb_last = entry;
196
197 // Decrease the amont of memory in use accordingly
198 tgb_inuse -= MAX_TOT_MEM;
199}
200#endif
201
202#ifdef ETIENNE_MALLOC
203//ETIENNE
204#define MAX_SLOTS_PER_CHUNK 100
205
// Per-mBuffer-index record of where ETI_Malloc() placed the event
// (all three members are set to -1 by mBufInit()).
206typedef struct {
207 int32_t eventNumber; // event id the slot was given to
208 int32_t chunk; // index into EtiMemoryChunks[]
209 int32_t slot; // slot index inside that chunk
210} CHUNK_MAPPING;
211
212CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
213
214#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
215#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
216#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
217#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
218
// Bookkeeping for one chunk of MAX_SLOTS_PER_CHUNK event slots.
219typedef struct {
220 void * pointers[MAX_SLOTS_PER_CHUNK]; // start of each slot; pointers[0] is the malloc'ed chunk itself
221 int32_t events[MAX_SLOTS_PER_CHUNK]; // event id occupying the slot, -1 if the slot is free
222 int32_t nFreeSlots; // number of slots currently free
223 int32_t nSlots; // number of slots in this chunk
224} ETI_CHUNK;
225
226int32_t numAllocatedChunks = 0;
227
228#define MAX_CHUNKS 8096
229ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
230
231void* ETI_Malloc(int evtId, int evtIndex)
232{
233 for (int i=0;i<numAllocatedChunks;i++) {
234 if (EtiMemoryChunks[i].nFreeSlots > 0) {
235 for (int j=0;j<EtiMemoryChunks[i].nSlots;j++)
236 {
237 if (EtiMemoryChunks[i].events[j] == -1)
238 {
239 EtiMemoryChunks[i].events[j] = evtId;
240 EtiMemoryChunks[i].nFreeSlots--;
241 if (EtiMemoryChunks[i].nFreeSlots < 0)
242 {
243 factPrintf(kError, 0, "Number of free slot in chunk %d went below zero (%d) slot: %d", i, EtiMemoryChunks[i].nFreeSlots, j);
244 return NULL;
245 }
246 mBufferMapping[evtIndex].eventNumber = evtId;
247 mBufferMapping[evtIndex].chunk = i;
248 mBufferMapping[evtIndex].slot = j;
249 return EtiMemoryChunks[i].pointers[j];
250 }
251 }
252 //If I reach this point then we have a problem because it should have found
253 //a free spot just above.
254 factPrintf(kError, 0, "Could not find a free slot in a chunk that's supposed to have some. chunk=%d", i);
255 return NULL;
256 }
257 }
258 //If we reach this point this means that we should allocate more memory
259 int32_t numNewSlots = MAX_SLOTS_PER_CHUNK;
260 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE >= g_maxMem)
261 return NULL;
262
263 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK);
264 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
265 {
266 factPrintf(kError, 0, "Allocation of %lu bytes failed. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
267 return NULL;
268 }
269
270 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
271 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
272 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
273 mBufferMapping[evtIndex].eventNumber = evtId;
274 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
275 mBufferMapping[evtIndex].slot = 0;
276
277 for (int i=1;i<numNewSlots;i++)
278 {
279 EtiMemoryChunks[numAllocatedChunks].pointers[i] = (char*)EtiMemoryChunks[numAllocatedChunks].pointers[0] + i*MAX_SLOT_SIZE;// &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
280 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
281 }
282 numAllocatedChunks++;
283
284 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
285}
286
287void ETI_Free(int evtId, int evtIndex)
288{
289 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
290 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
291 {
292 factPrintf(kError, 0, "Mismatch in chunk mapping table. Expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
293 return;
294 }
295 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
296 currentChunk->nFreeSlots++;
297
298 return; /* TEST */
299
300 int chunkIndex = mBufferMapping[evtIndex].chunk;
301 if (chunkIndex != numAllocatedChunks-1)
302 return;
303
304 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
305 {//free this chunk
306 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
307 {
308 factPrintf(kError, 0, "Chunk %d not allocated as it ought to be. Skipping memory release.", chunkIndex);
309 return;
310 }
311 free(EtiMemoryChunks[chunkIndex].pointers[0]);
312 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
313 EtiMemoryChunks[chunkIndex].nSlots = 0;
314 numAllocatedChunks--;
315 chunkIndex--;
316 if (numAllocatedChunks == 0)
317 break;
318 }
319}
320//END ETIENNE
321#endif
322
323
324RUN_HEAD actRun;
325
326RUN_CTRL runCtrl[MAX_RUN];
327
328RUN_TAIL runTail[MAX_RUN];
329
330
331/*
332*** Definition of rdBuffer to read in IP packets; keep it global !!!!
333 */
334
335
// Read buffer of MAX_LEN bytes that can be addressed as 8, 16, 32 or
// 64 bit signed integers (all members share the same memory).
336typedef union
337{
338 int8_t B[MAX_LEN];
339 int16_t S[MAX_LEN / 2];
340 int32_t I[MAX_LEN / 4];
341 int64_t L[MAX_LEN / 8];
342} CNV_FACT;
343
// State of one socket: connection status, read position in its buffer
// and the identification of the event currently being read.
// One entry per socket is kept in rd[MAX_SOCK].
344typedef struct
345{
346 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
347 int32_t bufPos; //position in the buffer where the next byte read is stored
348 int32_t bufLen; //number of bytes left to read
349// size_t bufLen; //number of bytes left to read size_t might be better
350 int32_t skip; //number of bytes skipped before start of event
351
352 int errCnt; //how often connect failed since last successful
353 int sockStat; //-1 if socket not yet connected , 99 if not exist
354 int socket; //contains the sockets
355 struct sockaddr_in SockAddr; //IP for each socket
356
357 int evtID; // event ID of event currently read
358 int runID; // run "
359 int ftmID; // event ID from FTM
360 uint fadLen; // FADlength of event currently read
361 int fadVers; // Version of FAD
362 int ftmTyp; // trigger type
363 int board; // boardID (softwareID: 0..40 )
364 int Port; // port number as passed to GenSock()
365
366 CNV_FACT *rBuf; // read buffer, allocated and freed in GenSock()
367
368#ifdef EVTDEBUG
369 CNV_FACT *xBuf; //a copy of rBuf (temporary for debugging)
370#endif
371
372} READ_STRUCT;
373
374
// A 16 bit value that can also be accessed as its two individual bytes.
375typedef union
376{
377 int8_t B[2];
378 int16_t S;
379} SHORT_BYTE;
380
381
382
383
384
385SHORT_BYTE start, stop;
386
387READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
388
389
390
391/*-----------------------------------------------------------------*/
392
393
394/*-----------------------------------------------------------------*/
395
396
397
398int
399runFinish1 (uint32_t runnr)
400{
401 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
402 return 0;
403}
404int
405runFinish (uint32_t runnr)
406{
407 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
408 return 0;
409}
410
411int
412GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
413 READ_STRUCT * rd)
414{
415/*
416*** generate Address, create sockets and allocates readbuffer for it
417***
418*** if flag==0 generate socket and buffer
419*** <0 destroy socket and buffer
420*** >0 close and redo socket
421***
422*** sid : board*7 + port id
423 */
424
425 int j;
426 int optval = 1; //activate keepalive
427 socklen_t optlen = sizeof (optval);
428
429
430 if (sid % 7 >= NUMSOCK) {
431 //this is a not used socket, so do nothing ...
432 rd->sockStat = 77;
433 rd->rBuf = NULL ;
434 return 0;
435 }
436
437 if (rd->sockStat == 0) { //close socket if open
438 j = close (rd->socket);
439 if (j > 0) {
440 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
441 } else {
442 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
443 }
444 }
445
446 rd->sockStat = 99;
447
448 if (flag < 0) {
449 free (rd->rBuf); //and never open again
450#ifdef EVTDEBUG
451 free (rd->xBuf); //and never open again
452#endif
453 rd->rBuf = NULL;
454 return 0;
455 }
456
457
458 if (flag == 0) { //generate address and buffer ...
459 rd->Port = port;
460 rd->SockAddr.sin_family = sockAddr->sin_family;
461 rd->SockAddr.sin_port = htons (port);
462 rd->SockAddr.sin_addr = sockAddr->sin_addr;
463
464#ifdef EVTDEBUG
465 rd->xBuf = malloc (sizeof (CNV_FACT));
466#endif
467 rd->rBuf = malloc (sizeof (CNV_FACT));
468 if (rd->rBuf == NULL) {
469 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
470 rd->sockStat = 77;
471 return -3;
472 }
473 }
474
475
476 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
477 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
478 rd->sockStat = 88;
479 return -2;
480 }
481 optval = 1;
482 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
483 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
484 }
485 optval = 10; //start after 10 seconds
486 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
487 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
488 }
489 optval = 10; //do every 10 seconds
490 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
491 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
492 }
493 optval = 2; //close after 2 unsuccessful tries
494 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
495 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
496 }
497
498 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
499
500 rd->sockStat = -1; //try to (re)open socket
501 rd->errCnt = 0;
502 return 0;
503
504} /*-----------------------------------------------------------------*/
505
506 /*-----------------------------------------------------------------*/
507
508
509
510
511int
512mBufInit ()
513{
514// initialize mBuffer (mark all entries as unused\empty)
515
516 uint32_t actime = g_actTime + 50000000;
517
518 for (int i = 0; i < MAX_EVT * MAX_RUN; i++) {
519 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
520 mBuffer[i].runNum = 0;
521
522 evtCtrl.evtBuf[i] = -1;
523 evtCtrl.evtStat[i] = -1;
524 evtCtrl.pcTime[i] = actime; //initiate to far future
525
526#ifdef ETIENNE_MALLOC
527 //ETIENNE
528 mBufferMapping[i].chunk = -1;
529 mBufferMapping[i].eventNumber = -1;
530 mBufferMapping[i].slot = -1;
531 //END ETIENNE
532#endif
533 }
534#ifdef ETIENNE_MALLOC
535 for (int j=0;j<MAX_CHUNKS;j++)
536 EtiMemoryChunks[j].pointers[0] = NULL;
537#endif
538
539 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
540
541 evtCtrl.frstPtr = 0;
542 evtCtrl.lastPtr = 0;
543
544 return 0;
545
546} /*-----------------------------------------------------------------*/
547
548
549
550
551int
552mBufEvt (int evID, uint runID, int nRoi[], int sk,
553 int fadlen, int trgTyp, int trgNum, int fadNum)
554{
555// generate a new Event into mBuffer:
556// make sure only complete Event are possible, so 'free' will always work
557// returns index into mBuffer[], or negative value in case of error
558// error: <-9000 if roi screwed up (not consistent with run)
559// <-8000 (not consistent with event)
560// <-7000 (not consistent with board)
561// < 0 if no space left
562
563// int evFree;
564// int headmem = 0;
565// size_t needmem = 0;
566
567
568 const int b = sk / 7;
569
570 if (nRoi[0] < 0 || nRoi[0] > 1024) {
571 factPrintf(kError, 999, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", nRoi[0]);
572 gj.badRoiR++;
573 gj.badRoi[b]++;
574 return -9999;
575 }
576
577 for (int jr = 1; jr < 8; jr++) {
578 if (nRoi[jr] != nRoi[0]) {
579 factPrintf(kError, 711, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", nRoi[jr], jr, nRoi[0]);
580 gj.badRoiB++;
581 gj.badRoi[b]++;
582 return -7101;
583 }
584 }
585 if (nRoi[8] < nRoi[0]) {
586 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", nRoi[8], nRoi[0]);
587 gj.badRoiB++;
588 gj.badRoi[b]++;
589 return -7102;
590 }
591
592
593 int i = evID % MAX_EVT;
594 int evFree = -1;
595
596 for (int k = 0; k < MAX_RUN; k++) {
597 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
598 // is it ok ????
599 if (mBuffer[i].nRoi != nRoi[0]
600 || mBuffer[i].nRoiTM != nRoi[8]) {
601 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
602 mBuffer[i].nRoi, mBuffer[i].nRoiTM, nRoi[0], nRoi[8]);
603 gj.badRoiE++;
604 gj.badRoi[b]++;
605 return -8201;
606 }
607// count for inconsistencies
608
609 if (mBuffer[i].trgNum != trgNum)
610 mBuffer[i].Errors[0]++;
611 if (mBuffer[i].fadNum != fadNum)
612 mBuffer[i].Errors[1]++;
613 if (mBuffer[i].trgTyp != trgTyp)
614 mBuffer[i].Errors[2]++;
615
616 //everything seems fine so far ==> use this slot ....
617 return i;
618 }
619 if (evFree < 0 && mBuffer[i].evNum < 0)
620 evFree = i;
621 i += MAX_EVT;
622 }
623
624
625 //event does not yet exist; create it
626 if (evFree < 0) { //no space available in ctrl
627 factPrintf(kError, 881, "No control slot to keep event %d", evID);
628 return -1;
629 }
630 i = evFree; //found free entry; use it ...
631
632 struct timeval tv;
633 gettimeofday (&tv, NULL);
634
635 const uint32_t tsec = tv.tv_sec;
636 const uint32_t tusec = tv.tv_usec;
637
638 //check if runId already registered in runCtrl
639 evFree = -1;
640
641 uint oldest = g_actTime + 1000;
642 int jold = -1;
643
644 for (int k = 0; k < MAX_RUN; k++) {
645 if (runCtrl[k].runId == runID) {
646// if (runCtrl[k].procId > 0) { //run is closed -> reject
647// snprintf (str, MXSTR, "skip event since run %d finished", runID);
648// factOut (kInfo, 931, str);
649// return -21;
650// }
651
652 if (runCtrl[k].roi0 != nRoi[0]
653 || runCtrl[k].roi8 != nRoi[8]) {
654 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d.",
655 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8]);
656 gj.badRoiR++;
657 gj.badRoi[b]++;
658 return -9301;
659 }
660 goto RUNFOUND;
661 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
662 evFree = k;
663 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
664 if (runCtrl[k].closeTime < oldest) {
665 oldest = runCtrl[k].closeTime;
666 jold = k;
667 }
668 }
669 }
670
671 if (evFree < 0 && jold < 0) {
672 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
673 return -1001;
674 }
675
676 if (evFree < 0)
677 evFree = jold;
678 factPrintf(kInfo, 503, "New run %d (evFree=%d) registered with roi=%d and roi_tm=%d", runID,
679 evFree, nRoi[0], nRoi[8]);
680 runCtrl[evFree].runId = runID;
681 runCtrl[evFree].roi0 = nRoi[0];
682 runCtrl[evFree].roi8 = nRoi[8];
683 runCtrl[evFree].fileId = -2;
684 runCtrl[evFree].procId = -2;
685 runCtrl[evFree].lastEvt = -1;
686 runCtrl[evFree].nextEvt = 0;
687 runCtrl[evFree].actEvt = 0;
688 runCtrl[evFree].procEvt = 0;
689 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
690 runCtrl[evFree].firstUsec = tusec;
691 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
692 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
693// runCtrl[evFree].lastTime = 0;
694
695 runTail[evFree].nEventsOk =
696 runTail[evFree].nEventsRej =
697 runTail[evFree].nEventsBad =
698 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
699
700 RUNFOUND:
701 //ETIENNE
702/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
703
704 headmem = NBOARDS * sizeof (PEVNT_HEADER);
705
706 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
707 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
708 if (gi_memStat > 0) {
709 gi_memStat = -99;
710 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
711 evID, sk);
712 factOut (kError, 882, str);
713 } else {
714 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
715 evID, sk);
716 factOut (kDebug, 882, str);
717 }
718 return -11;
719 }
720 */
721
722#ifdef THOMAS_MALLOC
723 mBuffer[i].FADhead = TGB_Malloc();
724 mBuffer[i].fEvent = NULL;
725 mBuffer[i].buffer = NULL;
726 if (mBuffer[i].FADhead == NULL) {
727 factPrintf(kError, 882, "malloc header failed for event %d", evID);
728 return -12;
729 }
730 mBuffer[i].fEvent = (void*)((char*)mBuffer[i].FADhead+MAX_HEAD_MEM);
731#endif
732
733#ifdef ETIENNE_MALLOC
734 mBuffer[i].FADhead = ETI_Malloc(evID, i);
735 mBuffer[i].buffer = NULL;
736 if (mBuffer[i].FADhead != NULL)
737 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
738 else
739 {
740 mBuffer[i].fEvent = NULL;
741 gj.usdMem = 0;
742 for (int k=0;k<numAllocatedChunks;k++)
743 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
744 if (gj.usdMem > gj.maxMem)
745 gj.maxMem = gj.usdMem;
746 /*if (gi_memStat > 0) {
747 gi_memStat = -99;
748 snprintf (str, MXSTR, "No memory left to keep event %6d sock %3d",
749 evID, sk);
750 factOut (kError, 882, str);
751 }*/
752#ifdef EVTDEBUG
753 else
754 {
755 factPrintf(kDebug, 882, "No memory left to keep event %6d sock %3d", evID, sk);
756 }
757#endif
758 return -11;
759 }
760#endif
761
762#ifdef STANDARD_MALLOC
763
764 mBuffer[i].FADhead = malloc (MAX_HEAD_MEM+MAX_EVT_MEM);
765 mBuffer[i].fEvent = NULL;
766 mBuffer[i].buffer = NULL;
767 if (mBuffer[i].FADhead == NULL) {
768 factPrintf(kError, 882, "malloc header failed for event %d", evID);
769 return -12;
770 }
771 mBuffer[i].fEvent = (void*)((char*)mBuffer[i].FADhead+MAX_HEAD_MEM);
772#endif
773
774 /*
775 mBuffer[i].FADhead = malloc (headmem);
776 if (mBuffer[i].FADhead == NULL) {
777 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
778 factOut (kError, 882, str);
779 return -12;
780 }
781
782 mBuffer[i].fEvent = malloc (needmem);
783 if (mBuffer[i].fEvent == NULL) {
784 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
785 factOut (kError, 882, str);
786 free (mBuffer[i].FADhead);
787 mBuffer[i].FADhead = NULL;
788 return -22;
789 }
790
791 mBuffer[i].buffer = malloc (gi_maxSize);
792 if (mBuffer[i].buffer == NULL) {
793 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
794 factOut (kError, 882, str);
795 free (mBuffer[i].FADhead);
796 mBuffer[i].FADhead = NULL;
797 free (mBuffer[i].fEvent);
798 mBuffer[i].fEvent = NULL;
799 return -32;
800 }*/
801 //END ETIENNE
802 //flag all boards as unused
803 mBuffer[i].nBoard = 0;
804 for (int k = 0; k < NBOARDS; k++) {
805 mBuffer[i].board[k] = -1;
806 }
807 //flag all pixels as unused
808 for (int k = 0; k < NPIX; k++) {
809 mBuffer[i].fEvent->StartPix[k] = -1;
810 }
811 //flag all TMark as unused
812 for (int k = 0; k < NTMARK; k++) {
813 mBuffer[i].fEvent->StartTM[k] = -1;
814 }
815
816 mBuffer[i].fEvent->NumBoards = 0;
817 mBuffer[i].fEvent->PCUsec = tusec;
818 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
819 mBuffer[i].nRoi = nRoi[0];
820 mBuffer[i].nRoiTM = nRoi[8];
821 mBuffer[i].evNum = evID;
822 mBuffer[i].runNum = runID;
823 mBuffer[i].fadNum = fadNum;
824 mBuffer[i].trgNum = trgNum;
825 mBuffer[i].trgTyp = trgTyp;
826// mBuffer[i].evtLen = needmem;
827 mBuffer[i].Errors[0] =
828 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
829
830#ifdef ETIENNE_MALLOC
831//ETIENNE
832 //gj.usdMem += needmem + headmem + gi_maxSize;
833 gj.usdMem = 0;
834 for (int k=0;k<numAllocatedChunks;k++)
835 {
836 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
837 }
838//END ETIENNE
839#endif
840
841#ifdef THOMAS_MALLOC
842 gj.usdMem = tgb_inuse;
843#endif
844
845 if (gj.usdMem > gj.maxMem)
846 gj.maxMem = gj.usdMem;
847
848 gj.bufTot++;
849 if (gj.bufTot > gj.maxEvt)
850 gj.maxEvt = gj.bufTot;
851
852 gj.rateNew++;
853
854 //register event in 'active list (reading)'
855
856 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
857 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
858 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
859 evtIdx[i] = evtCtrl.lastPtr;
860
861
862#ifdef EVTDEBUG
863 factPrintf(kDebug, -11, "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d",
864 evID, runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
865#endif
866 evtCtrl.lastPtr++;
867 evtCtrl.lastPtr %= MAX_EVT * MAX_RUN;
868
869 gi.evtGet++;
870
871 return i;
872
873} /*-----------------------------------------------------------------*/
874
875
876
877
878int
879mBufFree (int i)
880{
881//delete entry [i] from mBuffer:
882//(and make sure multiple calls do no harm ....)
883
884// int headmem = 0;
885// size_t freemem = 0;
886
887#ifdef ETIENNE_MALLOC
888 int evid;
889 evid = mBuffer[i].evNum;
890 ETI_Free(evid, i);
891#endif
892
893// freemem = mBuffer[i].evtLen;
894 //ETIENNE
895#ifdef THOMAS_MALLOC
896 TGB_free(mBuffer[i].FADhead);
897#endif
898
899#ifdef STANDARD_MALLOC
900 free (mBuffer[i].FADhead);
901#endif
902
903// free (mBuffer[i].fEvent);
904 mBuffer[i].fEvent = NULL;
905
906// free (mBuffer[i].FADhead);
907 mBuffer[i].FADhead = NULL;
908
909// free (mBuffer[i].buffer);
910 mBuffer[i].buffer = NULL;
911//END ETIENNE
912// headmem = NBOARDS * sizeof (PEVNT_HEADER);
913 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
914 mBuffer[i].runNum = 0;
915
916#ifdef ETIENNE_MALLOC
917//ETIENNE
918// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
919 gj.usdMem = 0;
920 for (int k=0;k<numAllocatedChunks;k++)
921 {
922 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
923 }
924//END ETIENNE
925#endif
926
927#ifdef THOMAS_MALLOC
928 gj.usdMem = tgb_inuse;
929#endif
930
931 gj.bufTot--;
932
933 /*if (gi_memStat < 0) {
934 if (gj.usdMem <= 0.75 * gj.maxMem)
935 gi_memStat = +1;
936 }*/
937
938 return 0;
939
940} /*-----------------------------------------------------------------*/
941
942
943void
944resetEvtStat ()
945{
946 for (int i = 0; i < MAX_SOCK; i++)
947 gi.numRead[i] = 0;
948
949 for (int i = 0; i < NBOARDS; i++) {
950 gi.gotByte[i] = 0;
951 gi.gotErr[i] = 0;
952
953 }
954
955 gi.evtGet = 0; //#new Start of Events read
956 gi.evtTot = 0; //#complete Events read
957 gi.evtErr = 0; //#Events with Errors
958 gi.evtSkp = 0; //#Events incomplete (timeout)
959
960 gi.procTot = 0; //#Events processed
961 gi.procErr = 0; //#Events showed problem in processing
962 gi.procTrg = 0; //#Events accepted by SW trigger
963 gi.procSkp = 0; //#Events rejected by SW trigger
964
965 gi.feedTot = 0; //#Events used for feedBack system
966 gi.feedErr = 0; //#Events rejected by feedBack
967
968 gi.wrtTot = 0; //#Events written to disk
969 gi.wrtErr = 0; //#Events with write-error
970
971 gi.runOpen = 0; //#Runs opened
972 gi.runClose = 0; //#Runs closed
973 gi.runErr = 0; //#Runs with open/close errors
974
975 return;
976} /*-----------------------------------------------------------------*/
977
978void reportIncomplete(int k0)
979{
980 const int id = evtCtrl.evtBuf[k0];
981
982 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d %8d %2d",
983 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, evtCtrl.evtStat[k0]);
984
985 uint64_t report = 0;
986
987 char str[1000];
988
989 int ik=0;
990 for (int ib=0; ib<NBOARDS; ib++)
991 {
992 if (ib%10==0)
993 str[ik++] = '|';
994
995 const int jb = mBuffer[id].board[ib];
996 if (jb>=0) // data received from that board
997 {
998 str[ik++] = '0'+(jb%10);
999 continue;
1000 }
1001
1002 // FIXME: Is that really 'b' or should that be 'ib' ?
1003 if (gi_NumConnect[ib]<=0) // board not connected
1004 {
1005 str[ik++] = 'x';
1006 continue;
1007 }
1008
1009 // data from this board lost
1010 str[ik++] = '.';
1011 report |= ((uint64_t)1)<<ib;
1012 }
1013
1014 str[ik++] = '|';
1015 str[ik] = 0;
1016
1017 factOut(kWarn, 601, str);
1018
1019 factReportIncomplete(report);
1020}
1021
1022
1023
/*
 * Kept as an entry point only: there is nothing to initialize here.
 */
void
initReadFAD ()
{
} /*-----------------------------------------------------------------*/
1029
1030//struct rnd
1031//{
1032// int val;
1033// int idx;
1034//};
1035//
1036//struct rnd random_arr[MAX_SOCK];
1037//
1038//int compare(const void *f1, const void *f2)
1039//{
1040// struct rnd *r1 = (struct rnd*)f1;
1041// struct rnd *r2 = (struct rnd*)f2;
1042// return r1->val - r2->val;
1043//}
1044
1045
1046void *
1047readFAD (void *ptr)
1048{
1049/* *** main loop reading FAD data and sorting them to complete events */
1050 int actBoards = 0;
1051 uint gi_SecTime; //time in seconds
1052
1053 int sockDef[NBOARDS]; //internal state of sockets
1054
1055 struct timeval tv;
1056 uint32_t tsec, tusec;
1057
1058
1059 factPrintf(kInfo, -1, "Start initializing (readFAD)");
1060
1061// int cpu = 7; //read thread
1062// cpu_set_t mask;
1063
1064/* CPU_ZERO initializes all the bits in the mask to zero. */
1065// CPU_ZERO (&mask);
1066/* CPU_SET sets only the bit corresponding to cpu. */
1067// cpu = 7;
1068// CPU_SET (cpu, &mask);
1069
1070/* sched_setaffinity returns 0 in success */
1071// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1072// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1073// factOut (kWarn, -1, str);
1074// }
1075
1076
1077 int frst_len = sizeof(PEVNT_HEADER); //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
1078 int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
1079
1080 start.S = 0xFB01;
1081 stop.S = 0x04FE;
1082
1083/* initialize run control logics */
1084 for (int i = 0; i < MAX_RUN; i++) {
1085 runCtrl[i].runId = 0;
1086 runCtrl[i].fileId = -2;
1087 runCtrl[i].procId = -2;
1088 }
1089 gi_resetS = gi_resetR = 9;
1090 for (int i = 0; i < NBOARDS; i++)
1091 sockDef[i] = 0;
1092
1093 START:
1094 gettimeofday (&tv, NULL);
1095 g_actTime = tsec = tv.tv_sec;
1096 g_actUsec = tusec = tv.tv_usec;
1097 gi_myRun = g_actTime;
1098 evtCtrl.frstPtr = 0;
1099 evtCtrl.lastPtr = 0;
1100
1101 gi_SecTime = g_actTime;
1102 gi_runStat = g_runStat;
1103 gj.readStat = g_runStat;
1104
1105 const int cntsock = 8 - NUMSOCK ;
1106
1107 if (gi_resetS > 0) {
1108 //make sure all sockets are preallocated as 'not exist'
1109 for (int i = 0; i < MAX_SOCK; i++) {
1110 rd[i].socket = -1;
1111 rd[i].sockStat = 99;
1112 }
1113
1114 for (int k = 0; k < NBOARDS; k++) {
1115 gi_NumConnect[k] = 0;
1116 gi.numConn[k] = 0;
1117 gj.numConn[k] = 0;
1118 gj.errConn[k] = 0;
1119 gj.rateBytes[k] = 0;
1120 gj.totBytes[k] = 0;
1121 }
1122
1123 }
1124
1125
1126 if (gi_resetR > 0) {
1127 resetEvtStat ();
1128 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
1129 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
1130#ifdef THOMAS_MALLOC
1131 gj.totMem = tgb_memory;
1132#else
1133 gj.totMem = g_maxMem;
1134#endif
1135 gj.bufNew = gj.bufEvt = 0;
1136 gj.badRoiE = gj.badRoiR = gj.badRoiB =
1137 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
1138
1139 for (int b = 0; b < NBOARDS; b++)
1140 gj.badRoi[b] = 0;
1141
1142 mBufInit (); //initialize buffers
1143
1144 factPrintf(kInfo, -1, "End initializing (readFAD)");
1145 }
1146
1147
1148 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1149
1150 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1151
1152 gi_runStat = g_runStat;
1153 gj.readStat = g_runStat;
1154 gettimeofday (&tv, NULL);
1155 g_actTime = tsec = tv.tv_sec;
1156 g_actUsec = tusec = tv.tv_usec;
1157
1158
1159 int nch = 0;
1160 for (int b = 0; b < NBOARDS; b++) {
1161 int k = b * 7;
1162 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1163 nch++;
1164 gi_NumConnect[b] = 0; //must close all connections
1165 gi.numConn[b] = 0;
1166 gj.numConn[b] = 0;
1167
1168 int s0;
1169 if (sockDef[b] == 0)
1170 s0 = 0; //sockets to be defined and opened
1171 else if (g_port[b].sockDef == 0)
1172 s0 = -1; //sockets to be destroyed
1173 else
1174 s0 = +1; //sockets to be closed and reopened
1175
1176 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
1177
1178 for (int p = p0 + 1; p < p0 + 8; p++) {
1179 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1180 k++;
1181 }
1182 sockDef[b] = g_port[b].sockDef;
1183 }
1184 }
1185
1186 if (nch > 0) {
1187 actBoards = 0;
1188 for (int b = 0; b < NBOARDS; b++) {
1189 if (sockDef[b] > 0)
1190 actBoards++;
1191 }
1192 }
1193
1194
1195#ifdef EVTDEBUG
1196 int jrdx = 0;
1197#endif
1198
1199 int numokx = 0;
1200 int numok = 0; //count number of succesfull actions
1201
1202/*
1203 for (i=0; i<MAX_SOCK/7; i++)
1204 {
1205 random_arr[i].val = rand();
1206 random_arr[i].idx = i;
1207 }
1208
1209 qsort(random_arr, MAX_SOCK/7, sizeof(struct rnd), compare);
1210
1211 for (int iii = 0; iii < MAX_SOCK/7; iii++) { //check all sockets if something to read
1212
1213 b = random_arr[iii].idx;
1214 i = b*7;
1215 p = 0;
1216 */
1217
1218 for (int i = 0; i < MAX_SOCK; i+=7) { //check all sockets if something to read
1219
1220 const int b = i / 7 ;
1221 //const int p = i % 7 ;
1222
1223/*if ( p >= NUMSOCK) { ; }
1224 else*/ {
1225 const int s0 = sockDef[b] > 0 ? +1 : -1;
1226
1227 if (rd[i].sockStat < 0) { //try to connect if not yet done
1228 if (rd[i].sockStat == -1) {
1229 rd[i].sockStat = connect (rd[i].socket,
1230 (struct sockaddr *) &rd[i].SockAddr,
1231 sizeof (rd[i].SockAddr));
1232 if (rd[i].sockStat == -1) {
1233 rd[i].errCnt++ ;
1234 usleep(25000);
1235// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1236// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1237// else rd[i].sockStat = 10000 ;
1238 }
1239//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1240
1241 }
1242 if (rd[i].sockStat < -1 ) {
1243 rd[i].sockStat++ ;
1244 }
1245 if (rd[i].sockStat == 0) { //successfull ==>
1246 if (sockDef[b] > 0) {
1247 rd[i].bufTyp = 0; // expect a header
1248 rd[i].bufLen = frst_len; // max size to read at begining
1249 } else {
1250 rd[i].bufTyp = -1; // full data to be skipped
1251 rd[i].bufLen = MAX_LEN; //huge for skipping
1252 }
1253 rd[i].bufPos = 0; // no byte read so far
1254 rd[i].skip = 0; // start empty
1255// gi_NumConnect[b]++;
1256 gi_NumConnect[b] += cntsock ;
1257
1258 gi.numConn[b]++;
1259 gj.numConn[b]++;
1260 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", b, gi.numConn[b]);
1261 }
1262 }
1263
1264 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1265
1266 if (rd[i].bufLen<=0)
1267 {
1268#ifdef EVTDEBUG
1269 factPrintf(kDebug, 301, "do not read from socket %d %d", i, rd[i].bufLen);
1270#endif
1271 continue;
1272 }
1273
1274// if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1275 numok++;
1276
1277 const int32_t jrd =
1278 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1279 rd[i].bufLen, MSG_DONTWAIT);
1280
1281#ifdef EVTDEBUG
1282 if (jrd > 0) {
1283 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1284 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1285 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1286 factPrintf(kDebug, 301, "read sock %3d bytes %5d len %5d first %d %d",
1287 i, jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos], rd[i].rBuf->B[rd[i].bufPos + 1]);
1288 }
1289#endif
1290
1291 if (jrd == 0) { //connection has closed ...
1292 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
1293 GenSock (s0, i, 0, NULL, &rd[i]);
1294 gi.gotErr[b]++;
1295// gi_NumConnect[b]--;
1296 gi_NumConnect[b]-= cntsock ;
1297 gi.numConn[b]--;
1298 gj.numConn[b]--;
1299
1300 } else if (jrd < 0) { //did not read anything
1301 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1302 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
1303 gi.gotErr[b]++;
1304 } else
1305 numok--; //else nothing waiting to be read
1306 //jrd = 0;
1307 }
1308/* } else {
1309 // jrd = 0; //did read nothing as requested
1310#ifdef EVTDEBUG
1311 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1312 rd[i].bufLen);
1313 factOut (kDebug, 301, str);
1314#endif
1315 }
1316*/
1317 if (jrd<=0)
1318 continue;
1319
1320 gi.gotByte[b] += jrd;
1321 gj.rateBytes[b] += jrd;
1322
1323 //if (jrd > 0) {
1324 numokx++;
1325#ifdef EVTDEBUG
1326 jrdx += jrd;
1327#endif
1328 //}
1329
1330
1331 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1332// just do nothing
1333#ifdef EVTDEBUG
1334 factPrintf(kInfo, 301, "skipping %d bytes on socket %d", jrd, i);
1335#endif
1336 continue;
1337 }
1338
1339 rd[i].bufPos += jrd; //==> prepare for continuation
1340 rd[i].bufLen -= jrd;
1341
1342 if (rd[i].bufTyp > 0) { // we are reading data ...
1343
1344 if (rd[i].bufLen>0/*jrd < rd[i].bufLen*/) { //not yet all read
1345#ifdef EVTDEBUG
1346 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1347#endif
1348 continue;
1349 }
1350
1351 //int pos = rdi_bufPos = rd[i].bufPos;
1352 //int pos = rdi_bufLen = rd[i].bufLen;
1353 //int pos = rdi_bufTyp = rd[i].bufTyp;
1354
1355 rd[i].bufTyp = 0; //ready to read next header
1356 rd[i].bufLen = frst_len;
1357 rd[i].bufPos = 0;
1358
1359 //{
1360 //full dataset read
1361 //rd[i].bufLen = 0;
1362 //rd[i].bufPos = rd[i].fadLen;
1363 if (rd[i].rBuf->B[rd[i].fadLen - 1] != stop.B[0] ||
1364 rd[i].rBuf->B[rd[i].fadLen - 2] != stop.B[1]) {
1365 gi.evtErr++;
1366 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d",
1367 i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1],
1368 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1369 //goto EndBuf;
1370 continue;
1371
1372#ifdef EVTDEBUG
1373 } else {
1374 factPrintf(kDebug, 301, "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1375 i, rd[i].fadLen, rd[i].rBuf->B[0],
1376 rd[i].rBuf->B[1], start.B[1], start.B[0],
1377 rd[i].rBuf->B[rd[i].fadLen - 2],
1378 rd[i].rBuf->B[rd[i].fadLen - 1], stop.B[1],
1379 stop.B[0]);
1380#endif
1381 }
1382
1383#ifdef EVTDEBUG
1384 //if (jrd > 0)
1385 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1386#endif
1387
1388/* //we have a complete buffer, copy to WORK area
1389 int jr, kr;
1390 int checkRoi;
1391 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1392 for (kr = 1; kr < 4; kr++)
1393 {
1394 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1395 + kr*(roi[jr-1]+4)]);
1396 if (checkRoi != roi[0])
1397 {
1398 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1399 factOut (kFatal, 1, str);
1400 goto EndBuf;
1401 }
1402
1403 }
1404 for (jr = 1; jr < 9; jr++) {
1405 roi[jr] =
1406 ntohs (rd[i].
1407 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1408
1409 }
1410*/
1411 //we have a complete buffer, copy to WORK area
1412
1413
1414 //End of the header. to channels now
1415 int eStart = 36;
1416 for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++)
1417 {
1418 rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id
1419 rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell
1420 rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi
1421 rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling
1422
1423 eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data
1424 }
1425
1426 //channels done. footer now
1427 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//package_crc
1428 // eCount++;
1429 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//end_package_flag
1430 // snprintf(str, MXSTR, "Bytes read %d bytes swapped %d", jrd, eCount*2);
1431 // factOut(kInfo, 000, str);
1432 //ETIENNE end of bytes swapping already
1433
1434
1435 int checkRoi;
1436 int roiHopper = sizeof(PEVNT_HEADER)/2 + 2; //points to the very first roi
1437 int roi[9];
1438 roi[0] = rd[i].rBuf->S[roiHopper];
1439 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1440 for (int kr = 1; kr < 4; kr++)
1441 {
1442 checkRoi = rd[i].rBuf->S[roiHopper];
1443 if (checkRoi != roi[0])
1444 {
1445 factPrintf(kError, 1, "Inconsistent Roi accross boards B=%d, expected %d, got %d", kr, checkRoi, roi[0]);
1446// goto EndBuf;
1447 continue;
1448 }
1449 roiHopper += checkRoi+4;
1450 }
1451 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1452 for (int jr = 1; jr < 9; jr++) {
1453 roi[jr] = rd[i].rBuf->S[roiHopper];
1454 checkRoi = roi[jr];
1455 for (int kr = 1; kr < 4; kr++)
1456 {
1457 roiHopper += checkRoi+4;
1458 checkRoi = rd[i].rBuf->S[roiHopper];
1459 if (checkRoi != roi[jr])
1460 {
1461 factPrintf(kFatal, 1, "Inconsistent Roi accross patches B=%d P=%d, expected %d, got %d", kr, jr, roi[jr], checkRoi);
1462// goto EndBuf;
1463 continue;
1464 }
1465 }
1466 }
1467 //get index into mBuffer for this event (create if needed)
1468
1469// int actid;
1470// if (g_useFTM > 0)
1471// actid = rd[i].evtID;
1472// else
1473// actid = rd[i].ftmID;
1474
1475 const int evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1476 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1477 rd[i].evtID);
1478
1479 if (evID < -1000) {
1480// goto EndBuf; //not usable board/event/run --> skip it
1481 //rd[i].bufTyp = 0; //ready to read next header
1482 //rd[i].bufLen = frst_len;
1483 //rd[i].bufPos = 0;
1484 continue;
1485 }
1486 if (evID < 0) { //no space left, retry later
1487#ifdef EVTDEBUG
1488 if (rd[i].bufLen != 0) {
1489 factPrintf(kFatal, 1, "something screwed up");
1490 }
1491#endif
1492 rd[i].bufTyp = -1;
1493 rd[i].bufLen = 0;
1494 rd[i].bufPos = rd[i].fadLen;
1495
1496 //xwait.tv_sec = 0;
1497 //xwait.tv_nsec = 10000000; // sleep for ~10 msec
1498 //nanosleep (&xwait, NULL);
1499 continue;
1500 //goto EndBuf1; //hope there is free space next round
1501 }
1502 //we have a valid entry in mBuffer[]; fill it
1503
1504#ifdef EVTDEBUG
1505 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1506 rd[i].fadLen);
1507 if (xchk != 0) {
1508 factPrintf(kFatal, 1, "ERROR OVERWRITE %d %d on port %d", xchk, rd[i].fadLen, i);
1509
1510 uint iq;
1511 for (iq = 0; iq < rd[i].fadLen; iq++) {
1512 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1513 factPrintf(kFatal, 1, "ERROR %4d %4d %x %x", i, iq, rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1514 }
1515 }
1516 }
1517#endif
1518 const int boardId = b;
1519 const int fadBoard = rd[i].rBuf->S[12];
1520 const int fadCrate = fadBoard / 256;
1521
1522 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1523 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1524 boardId, fadBoard, fadCrate, fadBoard % 256);
1525 }
1526 if (mBuffer[evID].board[boardId] != -1) {
1527 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1528 evID, boardId, i, rd[i].fadLen,
1529 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1530 rd[i].rBuf->B[rd[i].fadLen - 2],
1531 rd[i].rBuf->B[rd[i].fadLen - 1]);
1532// goto EndBuf; //--> skip Board
1533 //rd[i].bufTyp = 0; //ready to read next header
1534 //rd[i].bufLen = frst_len;
1535 //rd[i].bufPos = 0;
1536 continue;
1537 }
1538
1539 const int iDx = evtIdx[evID]; //index into evtCtrl
1540
1541 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1542 &rd[i].rBuf->S[0], sizeof(PEVNT_HEADER));
1543
1544 int src = sizeof(PEVNT_HEADER) / 2;
1545 for (int px = 0; px < 9; px++) { //different sort in FAD board.....
1546 for (int drs = 0; drs < 4; drs++) {
1547 // pixH = rd[i].rBuf->S[src++]; // ID
1548 src++;
1549 const int pixC = rd[i].rBuf->S[src++]; // start-cell
1550 const int pixR = rd[i].rBuf->S[src++]; // roi
1551//here we should check if pixH is correct ....
1552
1553 const int pixS = boardId * 36 + drs * 9 + px;
1554 src++;
1555
1556
1557 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1558
1559 int dest = pixS * roi[0];
1560 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1561 &rd[i].rBuf->S[src], roi[0] * 2);
1562 src += pixR;
1563
1564 if (px == 8) {
1565 const int tmS = boardId * 4 + drs;
1566 if (pixR > roi[0]) { //and we have additional TM info
1567 dest = tmS * roi[0] + NPIX * roi[0];
1568 int srcT = src - roi[0];
1569 mBuffer[evID].fEvent->StartTM[tmS] =
1570 (pixC + pixR - roi[0]) % 1024;
1571 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1572 &rd[i].rBuf->S[srcT], roi[0] * 2);
1573 } else {
1574 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1575 //ETIENNE because the TM channels are always processed during drs calib,
1576 //set them to zero if they are not present
1577 //I suspect that it may be more efficient to set all the allocated mem to
1578 //zero when allocating it
1579// dest = tmS*roi[0] + NPIX*roi[0];
1580 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1581 }
1582 }
1583 }
1584 } // now we have stored a new board contents into Event structure
1585
1586 mBuffer[evID].fEvent->NumBoards++;
1587 mBuffer[evID].board[boardId] = boardId;
1588 evtCtrl.evtStat[iDx]++;
1589 evtCtrl.pcTime[iDx] = g_actTime;
1590
1591 mBuffer[evID].nBoard++;
1592
1593 // have we already reported first (partial) event of this run ???
1594 if (mBuffer[evID].nBoard>0 && mBuffer[evID].runNum != actrun)
1595 {
1596 actrun = mBuffer[evID].runNum;
1597
1598 for (int ir = 0; ir < MAX_RUN; ir++)
1599 {
1600 if (runCtrl[ir].runId == actrun)
1601 {
1602 if (++runCtrl[ir].lastEvt == 0)
1603 {
1604 gotNewRun (actrun, NULL);
1605 factPrintf(kInfo, 1, "gotNewRun called for run %d, event %d",
1606 mBuffer[evID].runNum, mBuffer[evID].evNum);
1607 break;
1608 }
1609 }
1610 }
1611 }
1612
1613 //complete event read ---> flag for next processing
1614 if (mBuffer[evID].nBoard >= actBoards)
1615 {
1616 evtCtrl.evtStat[iDx] = 99;
1617 gi.evtTot++;
1618 }
1619
1620
1621/*
1622 if (mBuffer[evID].nBoard >= actBoards)
1623 {
1624 // have we already reported first event of this run ???
1625 if (mBuffer[evID].runNum != actrun)
1626 {
1627 actrun = mBuffer[evID].runNum;
1628
1629 for (int ir = 0; ir < MAX_RUN; ir++)
1630 {
1631 if (runCtrl[ir].runId == actrun)
1632 {
1633 if (++runCtrl[ir].lastEvt == 0)
1634 {
1635 gotNewRun (actrun, mBuffer[evID].FADhead);
1636 factPrintf(kInfo, 1, "gotNewRun called for run %d, event %d",
1637 mBuffer[evID].runNum, mBuffer[evID].evNum);
1638 break;
1639 }
1640 }
1641 }
1642 }
1643 //complete event read ---> flag for next processing
1644 evtCtrl.evtStat[iDx] = 99;
1645 gi.evtTot++;
1646 }
1647*/
1648// EndBuf:
1649 //rd[i].bufTyp = 0; //ready to read next header
1650 //rd[i].bufLen = frst_len;
1651 //rd[i].bufPos = 0;
1652 //EndBuf1:
1653 ;
1654 //}
1655
1656 } else { //we are reading event header
1657 //rd[i].bufPos += jrd;
1658 //rd[i].bufLen -= jrd;
1659 if (rd[i].bufPos < minLen) //{ //sufficient data to take action
1660 {
1661#ifdef EVTDEBUG
1662 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1663#endif
1664 continue;
1665 }
1666
1667 //check if startflag correct; else shift block ....
1668 int k;
1669 for (k = 0; k < rd[i].bufPos - 1; k++) {
1670 if (rd[i].rBuf->B[k] == start.B[1]
1671 && rd[i].rBuf->B[k + 1] == start.B[0])
1672 break;
1673 }
1674 rd[i].skip += k;
1675
1676 if (k >= rd[i].bufPos - 1) { //no start of header found
1677 rd[i].bufPos = 0;
1678 rd[i].bufLen = sizeof(PEVNT_HEADER);
1679 continue;
1680 }
1681
1682 if (k > 0) {
1683 rd[i].bufPos -= k;
1684 rd[i].bufLen += k;
1685 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1686 rd[i].bufPos);
1687#ifdef EVTDEBUG
1688 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1689 rd[i].bufPos);
1690#endif
1691 }
1692
1693 if (rd[i].bufPos < minLen)
1694 {
1695#ifdef EVTDEBUG
1696 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1697#endif
1698 continue;
1699 }
1700
1701 //{
1702 if (rd[i].skip > 0) {
1703 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
1704 rd[i].skip = 0;
1705 }
1706
1707 // TGB: This needs much more checks than just the first two bytes!
1708
1709 // Swap everything except start_package_flag.
1710 // It is to difficult to find out where it is used how,
1711 // but it doesn't really matter because it is not really
1712 // used anywehere else
1713// rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1714 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1715 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1716 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1717 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1718
1719 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1720 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1721 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1722 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1723
1724 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1725 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1726 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1727
1728 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1729 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1730
1731 for (int s=24;s<24+NTemp+NDAC;s++)
1732 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1733
1734 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1735 rd[i].fadVers = rd[i].rBuf->S[2];
1736 rd[i].ftmTyp = rd[i].rBuf->S[5];
1737 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1738 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1739 rd[i].runID = rd[i].rBuf->I[11];
1740 rd[i].bufTyp = 1; //ready to read full record
1741 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1742
1743#ifdef EVTDEBUG
1744 int fadboard = rd[i].rBuf->S[12];
1745 int fadcrate = fadboard / 256;
1746 fadboard = (fadcrate * 10 + fadboard % 256);
1747 factPrintf(kDebug, 1, "sk %3d head: %5d %5d %5d %10d %4d %6d",
1748 i, rd[i].fadLen, rd[i].evtID, rd[i].ftmID, rd[i].runID, fadboard, jrd);
1749#endif
1750
1751 if (rd[i].runID == 0)
1752 rd[i].runID = gi_myRun;
1753
1754 /*if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1755 snprintf (str, MXSTR,
1756 "Illegal event-length %d on port %d, %d expected.", rd[i].bufLen, i, head_len);
1757 factOut (kFatal, 881, str);
1758 rd[i].bufLen = 100000; //?
1759 }*/
1760
1761 int fadBoard = rd[i].rBuf->S[12];
1762 debugHead (i, fadBoard, rd[i].rBuf);
1763 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1764 } //end interpreting last read
1765}
1766 } //end of successful read anything
1767 } //finished trying to read all sockets
1768
1769#ifdef EVTDEBUG
1770 factDebug(kDebug, -1, "Loop ---- %3d --- %8d", numokx, jrdx);
1771#endif
1772
1773 gi.numRead[numok]++;
1774
1775 g_actTime = time (NULL);
1776 if (g_actTime <= gi_SecTime) {
1777 usleep(1);
1778 continue;
1779 }
1780
1781 gi_SecTime = g_actTime;
1782
1783 gj.bufNew = gj.bufEvt = 0;
1784
1785 //loop over all active events and flag those older than read-timeout
1786 //delete those that are written to disk ....
1787 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1788 {
1789 if (k0==evtCtrl.frstPtr && evtCtrl.evtStat[k0]<0)
1790 {
1791 evtCtrl.frstPtr++;
1792 evtCtrl.frstPtr %= MAX_EVT * MAX_RUN;
1793
1794 // Continue because evtCtrl.evtStat[k0] must be <0
1795 continue;
1796 }
1797
1798 // Check the more likely case first: incomplete events
1799 if (evtCtrl.evtStat[k0]>0 && evtCtrl.evtStat[k0]<92)
1800 {
1801 gj.bufNew++; //incomplete event in Buffer
1802
1803 // Event has not yet timed out or was reported already
1804 if (evtCtrl.evtStat[k0]>=90 || evtCtrl.pcTime[k0]>=g_actTime - 30)
1805 continue;
1806
1807 reportIncomplete(k0);
1808
1809 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1810 gi.evtSkp++;
1811 gi.evtTot++;
1812 gj.evtSkip++;
1813
1814 continue;
1815 }
1816
1817 // complete event in Buffer
1818 if (evtCtrl.evtStat[k0] >= 95)
1819 gj.bufEvt++;
1820
1821 // Check the less likely case: 'useless' or 'delete'
1822 if (evtCtrl.evtStat[k0]==0 || evtCtrl.evtStat[k0] >= 9000)
1823 {
1824 const int id = evtCtrl.evtBuf[k0];
1825#ifdef EVTDEBUG
1826 factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard);
1827#endif
1828 mBufFree (id); //event written--> free memory
1829 evtCtrl.evtStat[k0] = -1;
1830 gj.evtWrite++;
1831 gj.rateWrite++;
1832 }
1833
1834 }
1835
1836
1837 gj.deltaT = 1000; //temporary, must be improved
1838
1839 for (int ib = 0; ib < NBOARDS; ib++)
1840 gj.totBytes[ib] += gj.rateBytes[ib];
1841#ifdef THOMAS_MALLOC
1842 gj.totMem = tgb_memory;
1843#else
1844 gj.totMem = g_maxMem;
1845#endif
1846 if (gj.maxMem > gj.xxxMem)
1847 gj.xxxMem = gj.maxMem;
1848 if (gj.maxEvt > gj.xxxEvt)
1849 gj.xxxEvt = gj.maxEvt;
1850
1851 factStat (gj);
1852 factStatNew (gi);
1853 gj.rateNew = gj.rateWrite = 0;
1854 gj.maxMem = gj.usdMem;
1855 gj.maxEvt = gj.bufTot;
1856 for (int b = 0; b < NBOARDS; b++)
1857 gj.rateBytes[b] = 0;
1858/*
1859 }
1860 if (numok > 0)
1861 numok2 = 0;
1862 else if (numok2++ > 3) {
1863 if (g_runStat == 1) {
1864 xwait.tv_sec = 1;
1865 xwait.tv_nsec = 0; // hibernate for 1 sec
1866 } else {
1867 xwait.tv_sec = 0;
1868 xwait.tv_nsec = 1000; // sleep for ~1 usec
1869 }
1870 nanosleep (&xwait, NULL);
1871 }
1872 */
1873 } //and do next loop over all sockets ...
1874
1875 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1876
1877 if (g_reset > 0) {
1878 gi_reset = g_reset;
1879 gi_resetR = gi_reset % 10; //shall we stop reading ?
1880 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1881 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1882 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1883 g_reset = 0;
1884 } else {
1885 gi_reset = 0;
1886 if (g_runStat == -1)
1887 gi_resetR = 1;
1888 else
1889 gi_resetR = 7;
1890 gi_resetS = 7; //close all sockets
1891 gi_resetW = 7; //close all files
1892 gi_resetX = 0;
1893
1894 //inform others we have to quit ....
1895 gi_runStat = -11; //inform all that no update to happen any more
1896 gj.readStat = -11; //inform all that no update to happen any more
1897 }
1898
1899 if (gi_resetS > 0) {
1900 //must close all open sockets ...
1901 factPrintf(kInfo, -1, "Close all sockets...");
1902 for (int i = 0; i < MAX_SOCK; i++) {
1903 if (rd[i].sockStat == 0) {
1904 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1905 if (i % 7 == 0) {
1906// gi_NumConnect[i / 7]--;
1907 gi_NumConnect[i / 7]-= cntsock ;
1908 gi.numConn[i / 7]--;
1909 gj.numConn[i / 7]--;
1910 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1911 rd[i / 7].sockStat = -1; //and try to open asap
1912 }
1913 }
1914 }
1915 }
1916
1917
1918 if (gi_resetR > 0)
1919 {
1920 //flag all events as 'read finished'
1921 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1922 {
1923 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90)
1924 {
1925 evtCtrl.evtStat[k0] = 91;
1926 gi.evtSkp++;
1927 gi.evtTot++;
1928 }
1929 }
1930
1931 //and clear all buffers (might have to wait until all others are done)
1932 int minclear;
1933 if (gi_resetR == 1) {
1934 minclear = 900;
1935 factPrintf(kInfo, -1, "Drain all buffers ...");
1936 } else {
1937 minclear = 0;
1938 factPrintf(kInfo, -1, "Flush all buffers ...");
1939 }
1940
1941 int numclear = 1;
1942 while (numclear > 0)
1943 {
1944 numclear = 0;
1945
1946 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1947 {
1948 if (evtCtrl.evtStat[k0] > minclear)
1949 {
1950 const int id = evtCtrl.evtBuf[k0];
1951#ifdef EVTDEBUG
1952 factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard);
1953#endif
1954 mBufFree (id); //event written--> free memory
1955 evtCtrl.evtStat[k0] = -1;
1956 }
1957 else
1958 if (evtCtrl.evtStat[k0] > 0)
1959 numclear++; //writing is still ongoing...
1960
1961 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1962 {
1963 evtCtrl.frstPtr++;
1964 evtCtrl.frstPtr %= MAX_EVT * MAX_RUN;
1965 }
1966 }
1967
1968 usleep(1);
1969 }
1970 }
1971
1972 if (gi_reset > 0) {
1973 if (gi_resetW > 0) {
1974 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1975 }
1976 if (gi_resetX > 0)
1977 {
1978 struct timespec xwait;
1979 xwait.tv_sec = gi_resetX;
1980 xwait.tv_nsec = 0;
1981 nanosleep (&xwait, NULL);
1982 }
1983
1984 factPrintf(kInfo, -1, "Continue read Process ...");
1985 gi_reset = 0;
1986 goto START;
1987 }
1988
1989
1990
1991 factPrintf(kInfo, -1, "Exit read Process...");
1992
1993#ifdef THOMAS_MALLOC
1994 factPrintf(kInfo, -1, "%ld Bytes flaged as in-use.", tgb_inuse);
1995#endif
1996
1997 gi_runStat = -99;
1998 gj.readStat = -99;
1999 factStat (gj);
2000 factStatNew (gi);
2001 return 0;
2002
2003} /*-----------------------------------------------------------------*/
2004
2005
2006void *subProc(void *thrid)
2007{
2008 const int64_t threadID = (int64_t)thrid;
2009
2010 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
2011
2012 while (g_runStat > -2) //in case of 'exit' we still must process pending events
2013 {
2014 int numWait = 0;
2015 int numProc = 0;
2016
2017 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
2018 {
2019 if (!(evtCtrl.evtStat[k0] == 1000 + threadID))
2020 {
2021 if (evtCtrl.evtStat[k0] < 1000 + threadID)
2022 numWait++;
2023
2024 continue;
2025 }
2026
2027 /*** if (evtCtrl.evtStat[k0] == 1000 + threadID) ****/
2028
2029 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)
2030
2031 if (gi_resetR<=1)
2032 {
2033 const int id = evtCtrl.evtBuf[k0];
2034
2035 jret = subProcEvt(threadID, mBuffer[id].FADhead,
2036 mBuffer[id].fEvent, mBuffer[id].buffer);
2037
2038
2039 if (jret <= threadID) {
2040 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
2041 jret = 5300;
2042 } else if (jret <= 0)
2043 jret = 9200 + threadID; // flag as 'to be deleted'
2044 else if (jret >= gi_maxProc)
2045 jret = 5200 + threadID; // flag as 'to be written'
2046 else
2047 jret = 1000 + jret; // flag for next proces
2048 }
2049
2050 evtCtrl.evtStat[k0] = jret;
2051 numProc++;
2052 }
2053
2054 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2055 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
2056 return 0;
2057 }
2058
2059 //seems we have nothing to do, so sleep a little
2060 if (numProc == 0)
2061 usleep(1);
2062 }
2063
2064 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
2065
2066 return 0;
2067}
2068
2069/*-----------------------------------------------------------------*/
2070
2071
2072void *
2073procEvt (void *ptr)
2074{
2075/* *** main loop processing file, including SW-trigger */
2076 int status;
2077
2078 int lastRun = 0; //usually run from last event still valid
2079
2080// cpu_set_t mask;
2081// int cpu = 1; //process thread (will be several in final version)
2082
2083 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
2084
2085/* CPU_ZERO initializes all the bits in the mask to zero. */
2086// CPU_ZERO (&mask);
2087/* CPU_SET sets only the bit corresponding to cpu. */
2088// CPU_SET( 0 , &mask ); leave for system
2089// CPU_SET( 1 , &mask ); used by write process
2090// CPU_SET (2, &mask);
2091// CPU_SET (3, &mask);
2092// CPU_SET (4, &mask);
2093// CPU_SET (5, &mask);
2094// CPU_SET (6, &mask);
2095// CPU_SET( 7 , &mask ); used by read process
2096/* sched_setaffinity returns 0 in success */
2097// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2098// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
2099// factOut (kWarn, -1, str);
2100// }
2101
2102
2103 pthread_t thread[100];
2104// int th_ret[100];
2105
2106 for (long long k = 0; k < gi_maxProc; k++) {
2107 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
2108 }
2109
2110 // in case of 'exit' we still must process pending events
2111 while (g_runStat > -2)
2112 {
2113 int numWait = 0;
2114 int numProc = 0;
2115
2116 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
2117 {
2118 if (evtCtrl.evtStat[k0] <= 90 || evtCtrl.evtStat[k0] >= 1000)
2119 {
2120 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90)
2121 numWait++;
2122
2123 continue;
2124 }
2125
2126 /*** if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) ***/
2127
2128 //we are asked to flush buffers asap
2129 if (gi_resetR > 1)
2130 {
2131 evtCtrl.evtStat[k0] = 9991;
2132 continue;
2133 }
2134
2135 //-------- it is better to open the run already here, so call can be used to initialize
2136 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
2137 const int id = evtCtrl.evtBuf[k0];
2138
2139 const uint32_t irun = mBuffer[id].runNum;
2140 const int32_t ievt = mBuffer[id].evNum;
2141
2142 int j = lastRun;
2143 if (runCtrl[lastRun].runId != irun)
2144 {
2145 //check which fileID to use (or open if needed)
2146 for (j = 0; j < MAX_RUN; j++) {
2147 if (runCtrl[j].runId == irun)
2148 break;
2149 }
2150 if (j >= MAX_RUN) {
2151 factPrintf(kFatal, 901, "procEvt: Can not find run %d for event %d in %d", irun, ievt, id);
2152 }
2153 lastRun = j;
2154 }
2155
2156 if (runCtrl[j].fileId < 0)
2157 {
2158 //---- we need to open a new run ==> make sure all older runs are
2159 //---- finished and marked to be closed ....
2160 int j1;
2161 for (j1 = 0; j1 < MAX_RUN; j1++) {
2162 if (runCtrl[j1].fileId == 0) {
2163 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
2164 //---- problem: processing still going on ==> must wait for closing ....
2165 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j1].runId);
2166 runFinish1(runCtrl[j1].runId);
2167 }
2168 }
2169
2170 actRun.Version = 1;
2171 actRun.RunType = -1; //to be adapted
2172
2173 actRun.Nroi = runCtrl[j].roi0;
2174 actRun.NroiTM = runCtrl[j].roi8;
2175 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2176 // if (actRun.Nroi == actRun.NroiTM)
2177 // actRun.NroiTM = 0;
2178 actRun.RunTime = runCtrl[j].firstTime;
2179 actRun.RunUsec = runCtrl[j].firstTime;
2180 actRun.NBoard = NBOARDS;
2181 actRun.NPix = NPIX;
2182 actRun.NTm = NTMARK;
2183 actRun.Nroi = mBuffer[id].nRoi;
2184
2185 memcpy(actRun.FADhead, mBuffer[id].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
2186
2187 runCtrl[j].fileHd = runOpen (irun, &actRun, sizeof (actRun));
2188 if (runCtrl[j].fileHd == NULL)
2189 {
2190 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
2191 runCtrl[j].fileId = 91;
2192 runCtrl[j].procId = 91;
2193 }
2194 else
2195 {
2196 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
2197 runCtrl[j].fileId = 0;
2198 runCtrl[j].procId = 0;
2199 }
2200 }
2201
2202 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
2203 if (runCtrl[j].procId == 0)
2204 {
2205 if (runCtrl[j].closeTime < g_actTime ||
2206 runCtrl[j].lastTime < g_actTime - 300 ||
2207 runCtrl[j].maxEvt <= runCtrl[j].procEvt)
2208 {
2209 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
2210 runFinish1 (runCtrl[j].runId);
2211 runCtrl[j].procId = 1;
2212 }
2213 }
2214 if (runCtrl[j].procId != 0) {
2215#ifdef EVTDEBUG
2216 factPrintf(kDebug, 502, "procEvt: Skip event %d because no active run %d", ievt, irun);
2217#endif
2218 evtCtrl.evtStat[k0] = 9091;
2219 continue;
2220 }
2221
2222 //--------
2223 //--------
2224
2225 const int roi = mBuffer[id].nRoi;
2226 //const int roiTM = mBuffer[id].nRoiTM;
2227
2228 //make sure unused pixels/tmarks are cleared to zero
2229 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2230 // if (roiTM == roi)
2231 // roiTM = 0;
2232 for (int ip=0; ip<NPIX; ip++)
2233 {
2234 if (mBuffer[id].fEvent->StartPix[ip] == -1)
2235 {
2236 const int dest = ip * roi;
2237 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
2238 }
2239 }
2240
2241 for (int it=0; it<NTMARK; it++)
2242 {
2243 if (mBuffer[id].fEvent->StartTM[it] == -1)
2244 {
2245 const int dest = it * roi + NPIX * roi;
2246 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
2247 }
2248 }
2249
2250 //and set correct event header ; also check for consistency in event (not yet)
2251 mBuffer[id].fEvent->Roi = mBuffer[id].nRoi;
2252 mBuffer[id].fEvent->RoiTM = mBuffer[id].nRoiTM;
2253 mBuffer[id].fEvent->EventNum = mBuffer[id].evNum;
2254 mBuffer[id].fEvent->TriggerNum = mBuffer[id].trgNum;
2255 mBuffer[id].fEvent->TriggerType = mBuffer[id].trgTyp;
2256 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2257 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2258 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2259 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2260 mBuffer[id].fEvent->SoftTrig = 0;
2261
2262
2263 for (int ib=0; ib<NBOARDS; ib++)
2264 {
2265 // board is not read
2266 if (mBuffer[id].board[ib] == -1)
2267 {
2268 mBuffer[id].FADhead[ib].start_package_flag = 0;
2269 mBuffer[id].fEvent->BoardTime[ib] = 0;
2270 }
2271 else
2272 {
2273 mBuffer[id].fEvent->BoardTime[ib] = mBuffer[id].FADhead[ib].time;
2274 }
2275 }
2276
2277 const int rc = eventCheck(mBuffer[id].runNum, mBuffer[id].FADhead,
2278 mBuffer[id].fEvent);
2279 gi.procTot++;
2280 numProc++;
2281
2282 if (rc < 0)
2283 {
2284 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2285 gi.procErr++;
2286 }
2287 else
2288 {
2289 evtCtrl.evtStat[k0] = 1000;
2290 runCtrl[j].procEvt++;
2291 }
2292 }
2293
2294 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2295 factPrintf(kInfo, -1, "Exit Processing Process ...");
2296 gp_runStat = -22; //==> we should exit
2297 gj.procStat = -22; //==> we should exit
2298 return 0;
2299 }
2300
2301 //seems we have nothing to do, so sleep a little
2302 if (numProc == 0)
2303 usleep(1);
2304
2305 gp_runStat = gi_runStat;
2306 gj.procStat = gj.readStat;
2307
2308 }
2309
2310 //we are asked to abort asap ==> must flag all remaining events
2311 // when gi_runStat claims that all events are in the buffer...
2312
2313 factPrintf(kInfo, -1, "Abort Processing Process ...");
2314
2315 for (int k = 0; k < gi_maxProc; k++) {
2316 pthread_join (thread[k], (void **) &status);
2317 }
2318
2319 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
2320 {
2321 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000)
2322 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2323 }
2324
2325 gp_runStat = -99;
2326 gj.procStat = -99;
2327
2328 return 0;
2329
2330} /*-----------------------------------------------------------------*/
2331
2332int
2333CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2334{
2335/* close run runId (all all runs if runId=0) */
2336/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2337 int j;
2338
2339
2340 if (runId == 0) {
2341 for (j = 0; j < MAX_RUN; j++) {
2342 if (runCtrl[j].fileId == 0) { //run is open
2343 runCtrl[j].closeTime = closeTime;
2344 runCtrl[j].maxEvt = maxEvt;
2345 }
2346 }
2347 return 0;
2348 }
2349
2350 for (j = 0; j < MAX_RUN; j++) {
2351 if (runCtrl[j].runId == runId) {
2352 if (runCtrl[j].fileId == 0) { //run is open
2353 runCtrl[j].closeTime = closeTime;
2354 runCtrl[j].maxEvt = maxEvt;
2355 return 0;
2356 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2357 runCtrl[j].closeTime = closeTime;
2358 runCtrl[j].maxEvt = maxEvt;
2359 return +1;
2360 } else { // run already closed
2361 return +2;
2362 }
2363 }
2364 } //we only reach here if the run was never created
2365 return -1;
2366
2367}
2368
2369void checkAndCloseRun(int j, int irun, int cond, int where)
2370{
2371 if (!cond &&
2372 runCtrl[j].closeTime >= g_actTime &&
2373 runCtrl[j].lastTime >= g_actTime - 300 &&
2374 runCtrl[j].maxEvt > runCtrl[j].actEvt)
2375 return;
2376
2377 //close run for whatever reason
2378 if (runCtrl[j].runId == gi_myRun)
2379 gi_myRun = g_actTime;
2380
2381 int ii = 0;
2382 if (cond)
2383 ii = 1;
2384 if (runCtrl[j].closeTime < g_actTime)
2385 ii |= 2; // = 2;
2386 if (runCtrl[j].lastTime < g_actTime - 300)
2387 ii |= 4; // = 3;
2388 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2389 ii |= 8; // = 4;
2390
2391 if (runCtrl[j].procId == 0)
2392 {
2393 runFinish1(runCtrl[j].runId);
2394 runCtrl[j].procId = 92;
2395 }
2396
2397 runCtrl[j].closeTime = g_actTime - 1;
2398
2399 const int rc = runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]));
2400 if (rc<0)
2401 {
2402 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
2403 where, runCtrl[j].runId, rc);
2404 runCtrl[j].fileId = 92+where*2;
2405 }
2406 else
2407 {
2408 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
2409 where, irun, ii);
2410 runCtrl[j].fileId = 93+where*2;
2411 }
2412}
2413
2414/*-----------------------------------------------------------------*/
2415
2416
2417void *writeEvt (void *ptr)
2418{
2419/* *** main loop writing event (including opening and closing run-files */
2420
2421// cpu_set_t mask;
2422// int cpu = 1; //write thread
2423
2424 factPrintf(kInfo, -1, "Starting write-thread");
2425
2426/* CPU_ZERO initializes all the bits in the mask to zero. */
2427// CPU_ZERO (&mask);
2428/* CPU_SET sets only the bit corresponding to cpu. */
2429// CPU_SET (cpu, &mask);
2430/* sched_setaffinity returns 0 in success */
2431// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2432// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2433// }
2434
2435 int lastRun = 0; //usually run from last event still valid
2436
2437 while (g_runStat > -2)
2438 {
2439 int numWrite = 0;
2440 int numWait = 0;
2441
2442 for (int k0=evtCtrl.frstPtr; k0!=evtCtrl.lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
2443 {
2444 if (evtCtrl.evtStat[k0] <= 5000 || evtCtrl.evtStat[k0] >= 9000)
2445 {
2446 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2447 numWait++;
2448
2449 continue;
2450 }
2451
2452 /*** if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) ***/
2453
2454 //we must drain the buffer asap
2455 if (gi_resetR > 1)
2456 {
2457 evtCtrl.evtStat[k0] = 9904;
2458 continue;
2459 }
2460
2461 const int id = evtCtrl.evtBuf[k0];
2462
2463 const uint32_t irun = mBuffer[id].runNum;
2464 const int32_t ievt = mBuffer[id].evNum;
2465
2466 gi.wrtTot++;
2467
2468 int j = lastRun;
2469
2470 if (runCtrl[lastRun].runId != irun)
2471 {
2472 //check which fileID to use (or open if needed)
2473 for (j = 0; j < MAX_RUN; j++)
2474 {
2475 if (runCtrl[j].runId == irun)
2476 break;
2477 }
2478
2479 if (j >= MAX_RUN)
2480 {
2481 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id);
2482 gi.wrtErr++;
2483 }
2484
2485 lastRun = j;
2486 }
2487
2488 if (runCtrl[j].fileId < 0) {
2489 actRun.Version = 1;
2490 actRun.RunType = -1; //to be adapted
2491
2492 actRun.Nroi = runCtrl[j].roi0;
2493 actRun.NroiTM = runCtrl[j].roi8;
2494 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2495 // if (actRun.Nroi == actRun.NroiTM)
2496 // actRun.NroiTM = 0;
2497 actRun.RunTime = runCtrl[j].firstTime;
2498 actRun.RunUsec = runCtrl[j].firstTime;
2499 actRun.NBoard = NBOARDS;
2500 actRun.NPix = NPIX;
2501 actRun.NTm = NTMARK;
2502 actRun.Nroi = mBuffer[id].nRoi;
2503 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2504 NBOARDS * sizeof (PEVNT_HEADER));
2505
2506 runCtrl[j].fileHd =
2507 runOpen (irun, &actRun, sizeof (actRun));
2508 if (runCtrl[j].fileHd == NULL) {
2509 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
2510 runCtrl[j].fileId = 91;
2511 } else {
2512 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
2513 runCtrl[j].fileId = 0;
2514 }
2515
2516 }
2517
2518 if (runCtrl[j].fileId != 0) {
2519 if (runCtrl[j].fileId < 0) {
2520 factPrintf(kError, 123, "writeEvt: Never opened file for run %d", irun);
2521 } else if (runCtrl[j].fileId < 100) {
2522 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
2523 runCtrl[j].fileId += 100;
2524 } else {
2525#ifdef EVTDEBUG
2526 factPrintf(kDebug, 123, "writeEvt: File for run %d is closed", irun);
2527#endif
2528 }
2529 evtCtrl.evtStat[k0] = 9903;
2530 gi.wrtErr++;
2531 } else {
2532 // snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2533 // factOut (kInfo, 504, str);
2534 const int rc = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2535 sizeof (mBuffer[id]));
2536 if (rc >= 0) {
2537 runCtrl[j].lastTime = g_actTime;
2538 runCtrl[j].actEvt++;
2539 evtCtrl.evtStat[k0] = 9901;
2540#ifdef EVTDEBUG
2541 factPrintf(kDebug, 504, "%5d successfully wrote for run %d id %5d", ievt, irun, k0);
2542#endif
2543 // gj.writEvt++ ;
2544 } else {
2545 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
2546 evtCtrl.evtStat[k0] = 9902;
2547 gi.wrtErr++;
2548 }
2549
2550 checkAndCloseRun(j, irun, rc<0, 1);
2551 }
2552 }
2553
2554 //check if we should close a run (mainly when no event pending)
2555 //ETIENNE but first figure out which one is the latest run with a complete event.
2556 //i.e. max run Id and lastEvt >= 0
2557 //this condition is sufficient because all pending events were written already in the loop just above
2558 //actrun
2559 uint32_t lastStartedTime = 0;
2560 uint32_t runIdFound = 0;
2561 if (actrun != 0)
2562 {//If we have an active run, look for its start time
2563 for (int j=0;j<MAX_RUN;j++)
2564 {
2565 if (runCtrl[j].runId == actrun)
2566 {
2567 lastStartedTime = runCtrl[j].lastTime;
2568 runIdFound = 1;
2569 }
2570 }
2571 }
2572 else
2573 runIdFound = 1;
2574
2575 if (runIdFound == 0)
2576 {
2577 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
2578 }
2579// snprintf(str, MXSTR, "Maximum runId: %d", maxStartedRun);
2580// factOut(kInfo, 000, str);
2581 //Also check if some files will never be opened
2582 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
2583 //I will never get run numbers for which no file is to be opened
2584 for (int j=0;j<MAX_RUN;j++)
2585 {
2586 if ((runCtrl[j].fileId < 0) &&
2587 (runCtrl[j].lastTime < lastStartedTime) &&
2588 (runCtrl[j].runId != 0))
2589 {
2590 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
2591 ;//TODO notify that this run will never be opened
2592 }
2593 }
2594 for (int j=0; j<MAX_RUN; j++)
2595 {
2596 if (runCtrl[j].fileId == 0)
2597 {
2598 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
2599 const int cond = runCtrl[j].lastTime < lastStartedTime && runCtrl[j].runId != 0;
2600 checkAndCloseRun(j, runCtrl[j].runId, cond, 2);
2601 }
2602 }
2603
2604 //seems we have nothing to do, so sleep a little
2605 if (numWrite == 0)
2606 usleep(1);
2607
2608 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2609 factPrintf(kInfo, -1, "Finish Write Process ...");
2610 gw_runStat = -22; //==> we should exit
2611 gj.writStat = -22; //==> we should exit
2612 goto closerun;
2613 }
2614 gw_runStat = gi_runStat;
2615 gj.writStat = gj.readStat;
2616
2617 }
2618
2619 //must close all open files ....
2620 factPrintf(kInfo, -1, "Abort Writing Process ...");
2621
2622 closerun:
2623 factPrintf(kInfo, -1, "Close all open files ...");
2624 for (int j=0; j<MAX_RUN; j++)
2625 {
2626 if (runCtrl[j].fileId == 0)
2627 checkAndCloseRun(j, runCtrl[j].runId, 1, 3);
2628 }
2629
2630 gw_runStat = -99;
2631 gj.writStat = -99;
2632 factPrintf(kInfo, -1, "Exit Writing Process ...");
2633 return 0;
2634
2635
2636
2637
2638} /*-----------------------------------------------------------------*/
2639
2640
2641
2642
2643void
2644StartEvtBuild ()
2645{
2646
2647 int i, /*j,*/ imax, status/*, th_ret[50]*/;
2648 pthread_t thread[50];
2649 struct timespec xwait;
2650
2651 gi_runStat = gp_runStat = gw_runStat = 0;
2652 gj.readStat = gj.procStat = gj.writStat = 0;
2653
2654 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
2655
2656//initialize run control logics
2657 for (i = 0; i < MAX_RUN; i++) {
2658 runCtrl[i].runId = 0;
2659 runCtrl[i].fileId = -2;
2660 }
2661
2662//prepare for subProcesses
2663 gi_maxSize = g_maxSize;
2664 if (gi_maxSize <= 0)
2665 gi_maxSize = 1;
2666
2667 gi_maxProc = g_maxProc;
2668 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2669 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
2670 gi_maxProc = 1;
2671 }
2672//partially initialize event control logics
2673 evtCtrl.frstPtr = 0;
2674 evtCtrl.lastPtr = 0;
2675
2676//start all threads (more to come) when we are allowed to ....
2677 while (g_runStat == 0) {
2678 xwait.tv_sec = 0;
2679 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2680 nanosleep (&xwait, NULL);
2681 }
2682
2683 i = 0;
2684 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
2685 i++;
2686 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
2687 i++;
2688 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
2689 i++;
2690 imax = i;
2691
2692
2693#ifdef BILAND
2694 xwait.tv_sec = 30;;
2695 xwait.tv_nsec = 0; // sleep for ~20sec
2696 nanosleep (&xwait, NULL);
2697
2698 printf ("close all runs in 2 seconds\n");
2699
2700 CloseRunFile (0, time (NULL) + 2, 0);
2701
2702 xwait.tv_sec = 1;;
2703 xwait.tv_nsec = 0; // sleep for ~20sec
2704 nanosleep (&xwait, NULL);
2705
2706 printf ("setting g_runstat to -1\n");
2707
2708 g_runStat = -1;
2709#endif
2710
2711
2712//wait for all threads to finish
2713 for (i = 0; i < imax; i++) {
2714 /*j =*/ pthread_join (thread[i], (void **) &status);
2715 }
2716
2717} /*-----------------------------------------------------------------*/
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733 /*-----------------------------------------------------------------*/
2734 /*-----------------------------------------------------------------*/
2735 /*-----------------------------------------------------------------*/
2736 /*-----------------------------------------------------------------*/
2737 /*-----------------------------------------------------------------*/
2738
2739#ifdef BILAND
2740
2741int
2742subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2743 int8_t * buffer)
2744{
2745 printf ("called subproc %d\n", threadID);
2746 return threadID + 1;
2747}
2748
2749
2750
2751
2752 /*-----------------------------------------------------------------*/
2753 /*-----------------------------------------------------------------*/
2754 /*-----------------------------------------------------------------*/
2755 /*-----------------------------------------------------------------*/
2756 /*-----------------------------------------------------------------*/
2757
2758
2759
2760
2761FileHandle_t
2762runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2763{
2764 return 1;
2765};
2766
2767int
2768runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2769{
2770 return 1;
2771 usleep (10000);
2772 return 1;
2773}
2774
2775
2776//{ return 1; } ;
2777
2778int
2779runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2780{
2781 return 1;
2782};
2783
2784
2785
2786
2787int
2788eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2789{
2790 int i = 0;
2791
2792// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2793// for (i=0; i<NBOARDS; i++) {
2794// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2795// }
2796 return 0;
2797}
2798
2799
2800void
2801factStatNew (EVT_STAT gi)
2802{
2803 int i;
2804
2805//for (i=0;i<MAX_SOCK;i++) {
2806// printf("%4d",gi.numRead[i]);
2807// if (i%20 == 0 ) printf("\n");
2808//}
2809}
2810
2811void
2812gotNewRun (int runnr, PEVNT_HEADER * headers)
2813{
2814 printf ("got new run %d\n", runnr);
2815 return;
2816}
2817
2818void
2819factStat (GUI_STAT gj)
2820{
2821// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2822// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2823}
2824
2825
/*
 * Test stub (BILAND build only): debug hook of the reader, does nothing.
 * All arguments are ignored.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

   // Disabled debug output:
   //   printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
2832
2833
2834
/*
 * Test stub (BILAND build only): debug hook, does nothing.
 * All arguments are ignored; the buffer is not touched.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2839
/*
 * Test stub (BILAND build only): debug hook, does nothing.
 * All arguments are ignored; the buffer is not touched.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2844
2845
2846void
2847factOut (int severity, int err, char *message)
2848{
2849 static FILE *fd;
2850 static int file = 0;
2851
2852 if (file == 0) {
2853 printf ("open file\n");
2854 fd = fopen ("x.out", "w+");
2855 file = 999;
2856 }
2857
2858 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2859
2860 if (severity != kDebug)
2861 printf ("%3d %3d | %s\n", severity, err, message);
2862}
2863
2864
2865
2866int
2867main ()
2868{
2869 int i, b, c, p;
2870 char ipStr[100];
2871 struct in_addr IPaddr;
2872
2873 g_maxMem = 1024 * 1024; //MBytes
2874//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2875 g_maxMem = g_maxMem * 200; //100MBytes
2876
2877 g_maxProc = 20;
2878 g_maxSize = 30000;
2879
2880 g_runStat = 40;
2881
2882 i = 0;
2883
2884// version for standard crates
2885//for (c=0; c<4,c++) {
2886// for (b=0; b<10; b++) {
2887// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2888//
2889// inet_pton(PF_INET, ipStr, &IPaddr) ;
2890//
2891// g_port[i].sockAddr.sin_family = PF_INET;
2892// g_port[i].sockAddr.sin_port = htons(5000) ;
2893// g_port[i].sockAddr.sin_addr = IPaddr ;
2894// g_port[i].sockDef = 1 ;
2895// i++ ;
2896// }
2897//}
2898//
2899//version for PC-test *
2900 for (c = 0; c < 4; c++) {
2901 for (b = 0; b < 10; b++) {
2902 sprintf (ipStr, "10.0.%d.11", 128 + c);
2903 if (c < 2)
2904 sprintf (ipStr, "10.0.%d.11", 128);
2905 else
2906 sprintf (ipStr, "10.0.%d.11", 131);
2907// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2908
2909 inet_pton (PF_INET, ipStr, &IPaddr);
2910 p = 31919 + 100 * c + 10 * b;
2911
2912
2913 g_port[i].sockAddr.sin_family = PF_INET;
2914 g_port[i].sockAddr.sin_port = htons (p);
2915 g_port[i].sockAddr.sin_addr = IPaddr;
2916 g_port[i].sockDef = 1;
2917
2918 i++;
2919 }
2920 }
2921
2922
2923//g_port[17].sockDef =-1 ;
2924//g_actBoards-- ;
2925
2926 StartEvtBuild ();
2927
2928 return 0;
2929
2930}
2931#endif
Note: See TracBrowser for help on using the repository browser.