source: trunk/FACT++/src/EventBuilder.c@ 15343

Last change on this file since 15343 was 15343, checked in by tbretz, 11 years ago
Unified two identical pieces of code around runClose
File size: 91.8 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <stdarg.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netinet/in.h>
22#include <netinet/tcp.h>
23#include <pthread.h>
24#include <sched.h>
25
26#include "EventBuilder.h"
27
/*
 * Severity levels handed as first argument to factPrintf()/factOut().
 * The numeric values are part of the interface to the message sink.
 */
enum Severity
{
    kMessage = 10,  ///< Plain message, usually obsolete
    kInfo    = 20,  ///< Information that may be interesting to know
    kWarn    = 30,  ///< Warning: may lead to unexpected or unwanted behaviour
    kError   = 40,  ///< Something unexpected happened, but the program can still handle it
    kFatal   = 50,  ///< Cannot be handled at all; the only solution is program termination
    kDebug   = 99,  ///< Message used for debugging only
};
37
#define MIN_LEN 32            // min #bytes needed to interpret FADheader
// Size of the read-buffer per socket in bytes. The expansion is parenthesized
// so that the macro behaves as a single value in any expression
// (e.g. x / MAX_LEN, x % MAX_LEN).
#define MAX_LEN (256 * 1024)
40
41//#define nanosleep(x,y)
42
// ---------------------------------------------------------------------
// Functions implemented outside this file. Only the prototypes are
// visible here; the notes below that go beyond the prototype are
// assumptions derived from the names -- TODO confirm against the
// implementing module.
// ---------------------------------------------------------------------

// Run file handling (presumably open / write one event / close with run tail).
extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
//extern int runFinish (uint32_t runnr);

// Message sink: factPrintf() formats its text and forwards it here
// together with the severity and the numeric message id.
extern void factOut (int severity, int err, char *message);
extern void factReportIncomplete (uint64_t rep);

extern void gotNewRun (int runnr, PEVNT_HEADER * headers);


// Receives a copy of the GUI statistics structure (see global 'gj').
extern void factStat (GUI_STAT gj);

// Receives a copy of the event statistics structure (see global 'gi').
extern void factStatNew (EVT_STAT gi);

extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);

extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
                       int8_t * buffer);

// Debug hooks (presumably only meaningful in debug builds -- TODO confirm).
extern void debugHead (int i, int j, void *buf);

extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
                       int32_t runnr, int state, uint32_t tsec,
                       uint32_t tusec);
extern void debugStream (int isock, void *buf, int len);

// Forward declaration; the definition is not part of the section above.
int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
71
72
73
74
75
// ---------------------------------------------------------------------
// Global state of the event builder.
// ---------------------------------------------------------------------

// Processing/size limits. NOTE(review): none of these four is read in the
// functions up to readFAD(); presumably configured from outside -- confirm.
int g_maxProc;
int g_maxSize;
int gi_maxSize;
int gi_maxProc;

uint g_actTime;   // current wall-clock seconds, refreshed by readFAD() via gettimeofday()
uint g_actUsec;   // microsecond part belonging to g_actTime
int g_runStat;    // readFAD() keeps looping while this is >= 0
int g_reset;      // readFAD() leaves its main loop when this becomes non-zero
int g_useFTM;

// Internal reset flags; readFAD() acts on gi_resetS / gi_resetR during
// (re)initialization and afterwards clears gi_reset, gi_resetR, gi_resetS
// and gi_resetW.
int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
size_t g_maxMem;                //maximum memory allowed for buffer

//no longer needed ...
int g_maxBoards;                //maximum number of boards to be initialized
int g_actBoards;
//

FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"


// Copies of the run status (gi_runStat mirrors g_runStat inside readFAD()).
int gi_runStat;
int gp_runStat;
int gw_runStat;

//int gi_memStat = +1;

uint32_t gi_myRun = 0;          // set to g_actTime at (re)start of readFAD()
uint32_t actrun = 0;


uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards

//uint gi_EvtStart= 0 ;
//uint gi_EvtRead = 0 ;
//uint gi_EvtBad = 0 ;
//uint gi_EvtTot = 0 ;
//size_t gi_usedMem = 0 ;

//uint gw_EvtTot = 0 ;
//uint gp_EvtTot = 0 ;

PIX_MAP g_pixMap[NPIX];

EVT_STAT gi;                    // event statistics (reset by resetEvtStat())
GUI_STAT gj;                    // statistics for the GUI (memory, buffers, bad-roi counters, ...)

EVT_CTRL evtCtrl;               //control of events during processing
int evtIdx[MAX_EVT * MAX_RUN];  //index from mBuffer to evtCtrl

WRK_DATA mBuffer[MAX_EVT * MAX_RUN];    //local working space
128
129//#define MXSTR 1000
130//char str[MXSTR];
131
/*
 * printf-style convenience wrapper around factOut().
 *
 * severity : one of enum Severity
 * id       : numeric message id, passed through unchanged
 * fmt, ... : printf format and arguments; the formatted text is
 *            truncated to fit the local 1000-byte buffer
 */
void factPrintf(int severity, int id, const char *fmt, ...)
{
    char text[1000];
    va_list args;

    va_start(args, fmt);
    vsnprintf(text, sizeof text, fmt, args);
    va_end(args);

    factOut(severity, id, text);
}
143
144
145#define THOMAS_MALLOC
146
147#ifdef THOMAS_MALLOC
// Bytes reserved at the start of every slot for the NBOARDS FAD headers.
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
// Size of one slot: header area followed by the EVENT structure and its
// sample data (mBufEvt() places fEvent at FADhead + MAX_HEAD_MEM).
// 1024 matches the largest roi accepted by mBufEvt(); the factor 2 is
// presumably the size of one sample in bytes -- TODO confirm.
#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
// Node of the stack of released slots kept for re-use.
typedef struct TGB_struct
{
    struct TGB_struct *prev;   // next older node on the stack (NULL at the bottom)
    void *mem;                 // the released slot itself
} TGB_entry;

TGB_entry *tgb_last = NULL;    // top of the stack of released slots
uint64_t tgb_memory = 0;       // total bytes obtained from malloc() for slots
uint64_t tgb_inuse = 0;        // bytes currently handed out to callers
159
160void *TGB_Malloc()
161{
162 // No free slot available, next alloc would exceed max memory
163 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
164 return NULL;
165
166 // We will return this amount of memory
167 tgb_inuse += MAX_TOT_MEM;
168
169 // No free slot available, allocate a new one
170 if (!tgb_last)
171 {
172 tgb_memory += MAX_TOT_MEM;
173 return malloc(MAX_TOT_MEM);
174 }
175
176 // Get the next free slot from the stack and return it
177 TGB_entry *last = tgb_last;
178
179 TGB_entry *mem = last->mem;
180 tgb_last = last->prev;
181
182 free(last);
183
184 return mem;
185};
186
187void TGB_free(void *mem)
188{
189 // Add the last free slot to the stack
190 TGB_entry *entry = malloc(sizeof(TGB_entry));
191
192 entry->prev = tgb_last;
193 entry->mem = mem;
194
195 tgb_last = entry;
196
197 // Decrease the amont of memory in use accordingly
198 tgb_inuse -= MAX_TOT_MEM;
199}
200#endif
201
202#ifdef ETIENNE_MALLOC
203//ETIENNE
// Number of event slots carved out of one malloc'ed chunk.
#define MAX_SLOTS_PER_CHUNK 100

// For every mBuffer entry: which chunk/slot holds its memory.
// All three members are -1 while the entry owns no slot (see mBufInit()).
typedef struct {
    int32_t eventNumber;
    int32_t chunk;
    int32_t slot;
} CHUNK_MAPPING;

CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];

// Slot layout: header area (MAX_HEAD_MEM) followed by the event (MAX_EVT_MEM).
#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)

// One chunk: a single allocation (pointers[0]) split into nSlots slots.
typedef struct {
    void * pointers[MAX_SLOTS_PER_CHUNK];   // start address of every slot
    int32_t events[MAX_SLOTS_PER_CHUNK];    // event id occupying the slot, -1 if free
    int32_t nFreeSlots;                     // number of slots with events[] == -1
    int32_t nSlots;                         // number of slots in this chunk
} ETI_CHUNK;

int32_t numAllocatedChunks = 0;             // chunks [0, numAllocatedChunks) are allocated

#define MAX_CHUNKS 8096
ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
230
231void* ETI_Malloc(int evtId, int evtIndex)
232{
233 for (int i=0;i<numAllocatedChunks;i++) {
234 if (EtiMemoryChunks[i].nFreeSlots > 0) {
235 for (int j=0;j<EtiMemoryChunks[i].nSlots;j++)
236 {
237 if (EtiMemoryChunks[i].events[j] == -1)
238 {
239 EtiMemoryChunks[i].events[j] = evtId;
240 EtiMemoryChunks[i].nFreeSlots--;
241 if (EtiMemoryChunks[i].nFreeSlots < 0)
242 {
243 factPrintf(kError, 0, "Number of free slot in chunk %d went below zero (%d) slot: %d", i, EtiMemoryChunks[i].nFreeSlots, j);
244 return NULL;
245 }
246 mBufferMapping[evtIndex].eventNumber = evtId;
247 mBufferMapping[evtIndex].chunk = i;
248 mBufferMapping[evtIndex].slot = j;
249 return EtiMemoryChunks[i].pointers[j];
250 }
251 }
252 //If I reach this point then we have a problem because it should have found
253 //a free spot just above.
254 factPrintf(kError, 0, "Could not find a free slot in a chunk that's supposed to have some. chunk=%d", i);
255 return NULL;
256 }
257 }
258 //If we reach this point this means that we should allocate more memory
259 int32_t numNewSlots = MAX_SLOTS_PER_CHUNK;
260 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE >= g_maxMem)
261 return NULL;
262
263 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK);
264 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
265 {
266 factPrintf(kError, 0, "Allocation of %lu bytes failed. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
267 return NULL;
268 }
269
270 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
271 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
272 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
273 mBufferMapping[evtIndex].eventNumber = evtId;
274 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
275 mBufferMapping[evtIndex].slot = 0;
276
277 for (int i=1;i<numNewSlots;i++)
278 {
279 EtiMemoryChunks[numAllocatedChunks].pointers[i] = (char*)EtiMemoryChunks[numAllocatedChunks].pointers[0] + i*MAX_SLOT_SIZE;// &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
280 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
281 }
282 numAllocatedChunks++;
283
284 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
285}
286
287void ETI_Free(int evtId, int evtIndex)
288{
289 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
290 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
291 {
292 factPrintf(kError, 0, "Mismatch in chunk mapping table. Expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
293 return;
294 }
295 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
296 currentChunk->nFreeSlots++;
297
298 return; /* TEST */
299
300 int chunkIndex = mBufferMapping[evtIndex].chunk;
301 if (chunkIndex != numAllocatedChunks-1)
302 return;
303
304 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
305 {//free this chunk
306 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
307 {
308 factPrintf(kError, 0, "Chunk %d not allocated as it ought to be. Skipping memory release.", chunkIndex);
309 return;
310 }
311 free(EtiMemoryChunks[chunkIndex].pointers[0]);
312 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
313 EtiMemoryChunks[chunkIndex].nSlots = 0;
314 numAllocatedChunks--;
315 chunkIndex--;
316 if (numAllocatedChunks == 0)
317 break;
318 }
319}
320//END ETIENNE
321#endif
322
323
// Run header of the "active" run; its FADhead buffer is allocated in mBufInit().
RUN_HEAD actRun;

// Book-keeping for up to MAX_RUN runs; a new run is entered by mBufEvt().
RUN_CTRL runCtrl[MAX_RUN];

// Run tail (event counters, times) -- uses the same index as runCtrl[].
RUN_TAIL runTail[MAX_RUN];
329
330
331/*
332*** Definition of rdBuffer to read in IP packets; keep it global !!!!
333 */
334
335
// Raw read buffer of MAX_LEN bytes which can be addressed as
// 8, 16, 32 or 64 bit integers without casting.
typedef union
{
    int8_t B[MAX_LEN];
    int16_t S[MAX_LEN / 2];
    int32_t I[MAX_LEN / 4];
    int64_t L[MAX_LEN / 8];
} CNV_FACT;
343
// State of one socket: connection status, read progress and the header
// information of the event currently being received.
typedef struct
{
    int bufTyp;                   //what are we reading at the moment: 0=header 1=data -1=skip ...
    int32_t bufPos;               //next byte to read to the buffer next
    int32_t bufLen;               //number of bytes left to read
// size_t bufLen; //number of bytes left to read size_t might be better
    int32_t skip;                 //number of bytes skipped before start of event

    int errCnt;                   //how often connect failed since last successful
    // sockStat values set in this file: 0 = connected, -1 = socket exists but
    // is not yet connected, 99 = does not exist, 77 = unused socket or buffer
    // allocation failed, 88 = socket() failed
    int sockStat;                 //-1 if socket not yet connected , 99 if not exist
    int socket;                   //contains the sockets
    struct sockaddr_in SockAddr;  //IP for each socket

    int evtID;                    // event ID of event currently read
    int runID;                    // run "
    int ftmID;                    // event ID from FTM
    uint fadLen;                  // FADlength of event currently read
    int fadVers;                  // Version of FAD
    int ftmTyp;                   // trigger type
    int board;                    // boardID (softwareID: 0..40 )
    int Port;                     // port number as passed to GenSock() (host byte order)

    CNV_FACT *rBuf;               // read buffer, allocated/freed by GenSock()

#ifdef EVTDEBUG
    CNV_FACT *xBuf;               //a copy of rBuf (temporary for debuging)
#endif

} READ_STRUCT;
373
374
// A 16 bit value which can also be addressed byte by byte.
typedef union
{
    int8_t B[2];
    int16_t S;
} SHORT_BYTE;





// Marker patterns; readFAD() initializes start.S = 0xFB01 and stop.S = 0x04FE.
// Presumably the start/end-of-package flags of the FAD data stream -- TODO confirm.
SHORT_BYTE start, stop;

READ_STRUCT rd[MAX_SOCK];         //buffer to read IP and afterwards store in mBuffer
388
389
390
391/*-----------------------------------------------------------------*/
392
393
394/*-----------------------------------------------------------------*/
395
396
397
398int
399runFinish1 (uint32_t runnr)
400{
401 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
402 return 0;
403}
404int
405runFinish (uint32_t runnr)
406{
407 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
408 return 0;
409}
410
411int
412GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
413 READ_STRUCT * rd)
414{
415/*
416*** generate Address, create sockets and allocates readbuffer for it
417***
418*** if flag==0 generate socket and buffer
419*** <0 destroy socket and buffer
420*** >0 close and redo socket
421***
422*** sid : board*7 + port id
423 */
424
425 int j;
426 int optval = 1; //activate keepalive
427 socklen_t optlen = sizeof (optval);
428
429
430 if (sid % 7 >= NUMSOCK) {
431 //this is a not used socket, so do nothing ...
432 rd->sockStat = 77;
433 rd->rBuf = NULL ;
434 return 0;
435 }
436
437 if (rd->sockStat == 0) { //close socket if open
438 j = close (rd->socket);
439 if (j > 0) {
440 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
441 } else {
442 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
443 }
444 }
445
446 rd->sockStat = 99;
447
448 if (flag < 0) {
449 free (rd->rBuf); //and never open again
450#ifdef EVTDEBUG
451 free (rd->xBuf); //and never open again
452#endif
453 rd->rBuf = NULL;
454 return 0;
455 }
456
457
458 if (flag == 0) { //generate address and buffer ...
459 rd->Port = port;
460 rd->SockAddr.sin_family = sockAddr->sin_family;
461 rd->SockAddr.sin_port = htons (port);
462 rd->SockAddr.sin_addr = sockAddr->sin_addr;
463
464#ifdef EVTDEBUG
465 rd->xBuf = malloc (sizeof (CNV_FACT));
466#endif
467 rd->rBuf = malloc (sizeof (CNV_FACT));
468 if (rd->rBuf == NULL) {
469 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
470 rd->sockStat = 77;
471 return -3;
472 }
473 }
474
475
476 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
477 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
478 rd->sockStat = 88;
479 return -2;
480 }
481 optval = 1;
482 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
483 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
484 }
485 optval = 10; //start after 10 seconds
486 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
487 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
488 }
489 optval = 10; //do every 10 seconds
490 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
491 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
492 }
493 optval = 2; //close after 2 unsuccessful tries
494 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
495 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
496 }
497
498 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
499
500 rd->sockStat = -1; //try to (re)open socket
501 rd->errCnt = 0;
502 return 0;
503
504} /*-----------------------------------------------------------------*/
505
506 /*-----------------------------------------------------------------*/
507
508
509
510
511int
512mBufInit ()
513{
514// initialize mBuffer (mark all entries as unused\empty)
515
516 uint32_t actime = g_actTime + 50000000;
517
518 for (int i = 0; i < MAX_EVT * MAX_RUN; i++) {
519 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
520 mBuffer[i].runNum = 0;
521
522 evtCtrl.evtBuf[i] = -1;
523 evtCtrl.evtStat[i] = -1;
524 evtCtrl.pcTime[i] = actime; //initiate to far future
525
526#ifdef ETIENNE_MALLOC
527 //ETIENNE
528 mBufferMapping[i].chunk = -1;
529 mBufferMapping[i].eventNumber = -1;
530 mBufferMapping[i].slot = -1;
531 //END ETIENNE
532#endif
533 }
534#ifdef ETIENNE_MALLOC
535 for (int j=0;j<MAX_CHUNKS;j++)
536 EtiMemoryChunks[j].pointers[0] = NULL;
537#endif
538
539 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
540
541 evtCtrl.frstPtr = 0;
542 evtCtrl.lastPtr = 0;
543
544 return 0;
545
546} /*-----------------------------------------------------------------*/
547
548
549
550
551int
552mBufEvt (int evID, uint runID, int nRoi[], int sk,
553 int fadlen, int trgTyp, int trgNum, int fadNum)
554{
555// generate a new Event into mBuffer:
556// make sure only complete Event are possible, so 'free' will always work
557// returns index into mBuffer[], or negative value in case of error
558// error: <-9000 if roi screwed up (not consistent with run)
559// <-8000 (not consistent with event)
560// <-7000 (not consistent with board)
561// < 0 if no space left
562
563 struct timeval tv;
564 uint32_t tsec, tusec;
565 uint oldest;
566 int jold;
567
568 int i, b, evFree;
569// int headmem = 0;
570// size_t needmem = 0;
571
572
573 b = sk / 7;
574
575 if (nRoi[0] < 0 || nRoi[0] > 1024) {
576 factPrintf(kError, 999, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", nRoi[0]);
577 gj.badRoiR++;
578 gj.badRoi[b]++;
579 return -9999;
580 }
581
582 for (int jr = 1; jr < 8; jr++) {
583 if (nRoi[jr] != nRoi[0]) {
584 factPrintf(kError, 711, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", nRoi[jr], jr, nRoi[0]);
585 gj.badRoiB++;
586 gj.badRoi[b]++;
587 return -7101;
588 }
589 }
590 if (nRoi[8] < nRoi[0]) {
591 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", nRoi[8], nRoi[0]);
592 gj.badRoiB++;
593 gj.badRoi[b]++;
594 return -7102;
595 }
596
597
598 i = evID % MAX_EVT;
599 evFree = -1;
600
601 for (int k = 0; k < MAX_RUN; k++) {
602 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
603 // is it ok ????
604 if (mBuffer[i].nRoi != nRoi[0]
605 || mBuffer[i].nRoiTM != nRoi[8]) {
606 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
607 mBuffer[i].nRoi, mBuffer[i].nRoiTM, nRoi[0], nRoi[8]);
608 gj.badRoiE++;
609 gj.badRoi[b]++;
610 return -8201;
611 }
612// count for inconsistencies
613
614 if (mBuffer[i].trgNum != trgNum)
615 mBuffer[i].Errors[0]++;
616 if (mBuffer[i].fadNum != fadNum)
617 mBuffer[i].Errors[1]++;
618 if (mBuffer[i].trgTyp != trgTyp)
619 mBuffer[i].Errors[2]++;
620
621 //everything seems fine so far ==> use this slot ....
622 return i;
623 }
624 if (evFree < 0 && mBuffer[i].evNum < 0)
625 evFree = i;
626 i += MAX_EVT;
627 }
628
629
630 //event does not yet exist; create it
631 if (evFree < 0) { //no space available in ctrl
632 factPrintf(kError, 881, "No control slot to keep event %d", evID);
633 return -1;
634 }
635 i = evFree; //found free entry; use it ...
636
637 gettimeofday (&tv, NULL);
638 tsec = tv.tv_sec;
639 tusec = tv.tv_usec;
640
641 //check if runId already registered in runCtrl
642 evFree = -1;
643 oldest = g_actTime + 1000;
644 jold = -1;
645 for (int k = 0; k < MAX_RUN; k++) {
646 if (runCtrl[k].runId == runID) {
647// if (runCtrl[k].procId > 0) { //run is closed -> reject
648// snprintf (str, MXSTR, "skip event since run %d finished", runID);
649// factOut (kInfo, 931, str);
650// return -21;
651// }
652
653 if (runCtrl[k].roi0 != nRoi[0]
654 || runCtrl[k].roi8 != nRoi[8]) {
655 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d.",
656 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8]);
657 gj.badRoiR++;
658 gj.badRoi[b]++;
659 return -9301;
660 }
661 goto RUNFOUND;
662 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
663 evFree = k;
664 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
665 if (runCtrl[k].closeTime < oldest) {
666 oldest = runCtrl[k].closeTime;
667 jold = k;
668 }
669 }
670 }
671
672 if (evFree < 0 && jold < 0) {
673 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
674 return -1001;
675 }
676
677 if (evFree < 0)
678 evFree = jold;
679 factPrintf(kInfo, 503, "New run %d (evFree=%d) registered with roi=%d and roi_tm=%d", runID,
680 evFree, nRoi[0], nRoi[8]);
681 runCtrl[evFree].runId = runID;
682 runCtrl[evFree].roi0 = nRoi[0];
683 runCtrl[evFree].roi8 = nRoi[8];
684 runCtrl[evFree].fileId = -2;
685 runCtrl[evFree].procId = -2;
686 runCtrl[evFree].lastEvt = -1;
687 runCtrl[evFree].nextEvt = 0;
688 runCtrl[evFree].actEvt = 0;
689 runCtrl[evFree].procEvt = 0;
690 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
691 runCtrl[evFree].firstUsec = tusec;
692 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
693 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
694// runCtrl[evFree].lastTime = 0;
695
696 runTail[evFree].nEventsOk =
697 runTail[evFree].nEventsRej =
698 runTail[evFree].nEventsBad =
699 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
700
701 RUNFOUND:
702 //ETIENNE
703/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
704
705 headmem = NBOARDS * sizeof (PEVNT_HEADER);
706
707 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
708 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
709 if (gi_memStat > 0) {
710 gi_memStat = -99;
711 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
712 evID, sk);
713 factOut (kError, 882, str);
714 } else {
715 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
716 evID, sk);
717 factOut (kDebug, 882, str);
718 }
719 return -11;
720 }
721 */
722
723#ifdef THOMAS_MALLOC
724 mBuffer[i].FADhead = TGB_Malloc();
725 mBuffer[i].fEvent = NULL;
726 mBuffer[i].buffer = NULL;
727 if (mBuffer[i].FADhead == NULL) {
728 factPrintf(kError, 882, "malloc header failed for event %d", evID);
729 return -12;
730 }
731 mBuffer[i].fEvent = (void*)((char*)mBuffer[i].FADhead+MAX_HEAD_MEM);
732#endif
733
734#ifdef ETIENNE_MALLOC
735 mBuffer[i].FADhead = ETI_Malloc(evID, i);
736 mBuffer[i].buffer = NULL;
737 if (mBuffer[i].FADhead != NULL)
738 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
739 else
740 {
741 mBuffer[i].fEvent = NULL;
742 gj.usdMem = 0;
743 for (int k=0;k<numAllocatedChunks;k++)
744 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
745 if (gj.usdMem > gj.maxMem)
746 gj.maxMem = gj.usdMem;
747 /*if (gi_memStat > 0) {
748 gi_memStat = -99;
749 snprintf (str, MXSTR, "No memory left to keep event %6d sock %3d",
750 evID, sk);
751 factOut (kError, 882, str);
752 }*/
753#ifdef EVTDEBUG
754 else
755 {
756 factPrintf(kDebug, 882, "No memory left to keep event %6d sock %3d", evID, sk);
757 }
758#endif
759 return -11;
760 }
761#endif
762
763#ifdef STANDARD_MALLOC
764
765 mBuffer[i].FADhead = malloc (MAX_HEAD_MEM+MAX_EVT_MEM);
766 mBuffer[i].fEvent = NULL;
767 mBuffer[i].buffer = NULL;
768 if (mBuffer[i].FADhead == NULL) {
769 factPrintf(kError, 882, "malloc header failed for event %d", evID);
770 return -12;
771 }
772 mBuffer[i].fEvent = (void*)((char*)mBuffer[i].FADhead+MAX_HEAD_MEM);
773#endif
774
775 /*
776 mBuffer[i].FADhead = malloc (headmem);
777 if (mBuffer[i].FADhead == NULL) {
778 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
779 factOut (kError, 882, str);
780 return -12;
781 }
782
783 mBuffer[i].fEvent = malloc (needmem);
784 if (mBuffer[i].fEvent == NULL) {
785 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
786 factOut (kError, 882, str);
787 free (mBuffer[i].FADhead);
788 mBuffer[i].FADhead = NULL;
789 return -22;
790 }
791
792 mBuffer[i].buffer = malloc (gi_maxSize);
793 if (mBuffer[i].buffer == NULL) {
794 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
795 factOut (kError, 882, str);
796 free (mBuffer[i].FADhead);
797 mBuffer[i].FADhead = NULL;
798 free (mBuffer[i].fEvent);
799 mBuffer[i].fEvent = NULL;
800 return -32;
801 }*/
802 //END ETIENNE
803 //flag all boards as unused
804 mBuffer[i].nBoard = 0;
805 for (int k = 0; k < NBOARDS; k++) {
806 mBuffer[i].board[k] = -1;
807 }
808 //flag all pixels as unused
809 for (int k = 0; k < NPIX; k++) {
810 mBuffer[i].fEvent->StartPix[k] = -1;
811 }
812 //flag all TMark as unused
813 for (int k = 0; k < NTMARK; k++) {
814 mBuffer[i].fEvent->StartTM[k] = -1;
815 }
816
817 mBuffer[i].fEvent->NumBoards = 0;
818 mBuffer[i].fEvent->PCUsec = tusec;
819 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
820 mBuffer[i].nRoi = nRoi[0];
821 mBuffer[i].nRoiTM = nRoi[8];
822 mBuffer[i].evNum = evID;
823 mBuffer[i].runNum = runID;
824 mBuffer[i].fadNum = fadNum;
825 mBuffer[i].trgNum = trgNum;
826 mBuffer[i].trgTyp = trgTyp;
827// mBuffer[i].evtLen = needmem;
828 mBuffer[i].Errors[0] =
829 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
830
831#ifdef ETIENNE_MALLOC
832//ETIENNE
833 //gj.usdMem += needmem + headmem + gi_maxSize;
834 gj.usdMem = 0;
835 for (int k=0;k<numAllocatedChunks;k++)
836 {
837 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
838 }
839//END ETIENNE
840#endif
841
842#ifdef THOMAS_MALLOC
843 gj.usdMem = tgb_inuse;
844#endif
845
846 if (gj.usdMem > gj.maxMem)
847 gj.maxMem = gj.usdMem;
848
849 gj.bufTot++;
850 if (gj.bufTot > gj.maxEvt)
851 gj.maxEvt = gj.bufTot;
852
853 gj.rateNew++;
854
855 //register event in 'active list (reading)'
856
857 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
858 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
859 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
860 evtIdx[i] = evtCtrl.lastPtr;
861
862
863#ifdef EVTDEBUG
864 factPrintf(kDebug, -11, "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d",
865 evID, runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
866#endif
867 evtCtrl.lastPtr++;
868 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
869 evtCtrl.lastPtr = 0;
870
871 gi.evtGet++;
872
873 return i;
874
875} /*-----------------------------------------------------------------*/
876
877
878
879
880int
881mBufFree (int i)
882{
883//delete entry [i] from mBuffer:
884//(and make sure multiple calls do no harm ....)
885
886// int headmem = 0;
887// size_t freemem = 0;
888
889#ifdef ETIENNE_MALLOC
890 int evid;
891 evid = mBuffer[i].evNum;
892 ETI_Free(evid, i);
893#endif
894
895// freemem = mBuffer[i].evtLen;
896 //ETIENNE
897#ifdef THOMAS_MALLOC
898 TGB_free(mBuffer[i].FADhead);
899#endif
900
901#ifdef STANDARD_MALLOC
902 free (mBuffer[i].FADhead);
903#endif
904
905// free (mBuffer[i].fEvent);
906 mBuffer[i].fEvent = NULL;
907
908// free (mBuffer[i].FADhead);
909 mBuffer[i].FADhead = NULL;
910
911// free (mBuffer[i].buffer);
912 mBuffer[i].buffer = NULL;
913//END ETIENNE
914// headmem = NBOARDS * sizeof (PEVNT_HEADER);
915 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
916 mBuffer[i].runNum = 0;
917
918#ifdef ETIENNE_MALLOC
919//ETIENNE
920// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
921 gj.usdMem = 0;
922 for (int k=0;k<numAllocatedChunks;k++)
923 {
924 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
925 }
926//END ETIENNE
927#endif
928
929#ifdef THOMAS_MALLOC
930 gj.usdMem = tgb_inuse;
931#endif
932
933 gj.bufTot--;
934
935 /*if (gi_memStat < 0) {
936 if (gj.usdMem <= 0.75 * gj.maxMem)
937 gi_memStat = +1;
938 }*/
939
940 return 0;
941
942} /*-----------------------------------------------------------------*/
943
944
945void
946resetEvtStat ()
947{
948 for (int i = 0; i < MAX_SOCK; i++)
949 gi.numRead[i] = 0;
950
951 for (int i = 0; i < NBOARDS; i++) {
952 gi.gotByte[i] = 0;
953 gi.gotErr[i] = 0;
954
955 }
956
957 gi.evtGet = 0; //#new Start of Events read
958 gi.evtTot = 0; //#complete Events read
959 gi.evtErr = 0; //#Events with Errors
960 gi.evtSkp = 0; //#Events incomplete (timeout)
961
962 gi.procTot = 0; //#Events processed
963 gi.procErr = 0; //#Events showed problem in processing
964 gi.procTrg = 0; //#Events accepted by SW trigger
965 gi.procSkp = 0; //#Events rejected by SW trigger
966
967 gi.feedTot = 0; //#Events used for feedBack system
968 gi.feedErr = 0; //#Events rejected by feedBack
969
970 gi.wrtTot = 0; //#Events written to disk
971 gi.wrtErr = 0; //#Events with write-error
972
973 gi.runOpen = 0; //#Runs opened
974 gi.runClose = 0; //#Runs closed
975 gi.runErr = 0; //#Runs with open/close errors
976
977 return;
978} /*-----------------------------------------------------------------*/
979
980
981
/*
 * Hook called before reading starts; currently there is nothing to do.
 */
void
initReadFAD ()
{
    /* intentionally empty */
} /*-----------------------------------------------------------------*/
987
988//struct rnd
989//{
990// int val;
991// int idx;
992//};
993//
994//struct rnd random_arr[MAX_SOCK];
995//
996//int compare(const void *f1, const void *f2)
997//{
998// struct rnd *r1 = (struct rnd*)f1;
999// struct rnd *r2 = (struct rnd*)f2;
1000// return r1->val - r2->val;
1001//}
1002
1003
1004void *
1005readFAD (void *ptr)
1006{
1007/* *** main loop reading FAD data and sorting them to complete events */
1008 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
1009 int actBoards = 0, minLen;
1010 int32_t jrd;
1011 uint gi_SecTime; //time in seconds
1012 int boardId, roi[9], drs, px, src, pixS, /*pixH,*/ pixC, pixR, tmS;
1013
1014 int goodhed = 0;
1015
1016 int sockDef[NBOARDS]; //internal state of sockets
1017 int jrdx;
1018
1019
1020 struct timespec xwait;
1021
1022
1023 struct timeval tv;
1024 uint32_t tsec, tusec;
1025
1026
1027 factPrintf(kInfo, -1, "Start initializing (readFAD)");
1028
1029// int cpu = 7; //read thread
1030// cpu_set_t mask;
1031
1032/* CPU_ZERO initializes all the bits in the mask to zero. */
1033// CPU_ZERO (&mask);
1034/* CPU_SET sets only the bit corresponding to cpu. */
1035// cpu = 7;
1036// CPU_SET (cpu, &mask);
1037
1038/* sched_setaffinity returns 0 in success */
1039// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1040// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1041// factOut (kWarn, -1, str);
1042// }
1043
1044
1045 head_len = sizeof (PEVNT_HEADER);
1046 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
1047 minLen = head_len; //min #bytes needed to check header: full header for debug
1048
1049 start.S = 0xFB01;
1050 stop.S = 0x04FE;
1051
1052/* initialize run control logics */
1053 for (i = 0; i < MAX_RUN; i++) {
1054 runCtrl[i].runId = 0;
1055 runCtrl[i].fileId = -2;
1056 runCtrl[i].procId = -2;
1057 }
1058 gi_resetS = gi_resetR = 9;
1059 for (i = 0; i < NBOARDS; i++)
1060 sockDef[i] = 0;
1061
1062 START:
1063 gettimeofday (&tv, NULL);
1064 g_actTime = tsec = tv.tv_sec;
1065 g_actUsec = tusec = tv.tv_usec;
1066 gi_myRun = g_actTime;
1067 evtCtrl.frstPtr = 0;
1068 evtCtrl.lastPtr = 0;
1069
1070 gi_SecTime = g_actTime;
1071 gi_runStat = g_runStat;
1072 gj.readStat = g_runStat;
1073 numok = numok2 = 0;
1074
1075 int cntsock = 8 - NUMSOCK ;
1076
1077 if (gi_resetS > 0) {
1078 //make sure all sockets are preallocated as 'not exist'
1079 for (i = 0; i < MAX_SOCK; i++) {
1080 rd[i].socket = -1;
1081 rd[i].sockStat = 99;
1082 }
1083
1084 for (k = 0; k < NBOARDS; k++) {
1085 gi_NumConnect[k] = 0;
1086 gi.numConn[k] = 0;
1087 gj.numConn[k] = 0;
1088 gj.errConn[k] = 0;
1089 gj.rateBytes[k] = 0;
1090 gj.totBytes[k] = 0;
1091 }
1092
1093 }
1094
1095
1096 if (gi_resetR > 0) {
1097 resetEvtStat ();
1098 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
1099 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
1100#ifdef THOMAS_MALLOC
1101 gj.totMem = tgb_memory;
1102#else
1103 gj.totMem = g_maxMem;
1104#endif
1105 gj.bufNew = gj.bufEvt = 0;
1106 gj.badRoiE = gj.badRoiR = gj.badRoiB =
1107 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
1108
1109 int b;
1110 for (b = 0; b < NBOARDS; b++)
1111 gj.badRoi[b] = 0;
1112
1113 mBufInit (); //initialize buffers
1114
1115 factPrintf(kInfo, -1, "End initializing (readFAD)");
1116 }
1117
1118
1119 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1120
1121 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1122
1123 gi_runStat = g_runStat;
1124 gj.readStat = g_runStat;
1125 gettimeofday (&tv, NULL);
1126 g_actTime = tsec = tv.tv_sec;
1127 g_actUsec = tusec = tv.tv_usec;
1128
1129
1130 int b, p, p0, s0, nch;
1131 nch = 0;
1132 for (b = 0; b < NBOARDS; b++) {
1133 k = b * 7;
1134 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1135 nch++;
1136 gi_NumConnect[b] = 0; //must close all connections
1137 gi.numConn[b] = 0;
1138 gj.numConn[b] = 0;
1139 if (sockDef[b] == 0)
1140 s0 = 0; //sockets to be defined and opened
1141 else if (g_port[b].sockDef == 0)
1142 s0 = -1; //sockets to be destroyed
1143 else
1144 s0 = +1; //sockets to be closed and reopened
1145
1146 if (s0 == 0)
1147 p0 = ntohs (g_port[b].sockAddr.sin_port);
1148 else
1149 p0 = 0;
1150
1151 for (p = p0 + 1; p < p0 + 8; p++) {
1152 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1153 k++;
1154 }
1155 sockDef[b] = g_port[b].sockDef;
1156 }
1157 }
1158
1159 if (nch > 0) {
1160 actBoards = 0;
1161 for (b = 0; b < NBOARDS; b++) {
1162 if (sockDef[b] > 0)
1163 actBoards++;
1164 }
1165 }
1166
1167
1168 jrdx = 0;
1169 numokx = 0;
1170 numok = 0; //count number of succesfull actions
1171
1172/*
1173 for (i=0; i<MAX_SOCK/7; i++)
1174 {
1175 random_arr[i].val = rand();
1176 random_arr[i].idx = i;
1177 }
1178
1179 qsort(random_arr, MAX_SOCK/7, sizeof(struct rnd), compare);
1180
1181 for (int iii = 0; iii < MAX_SOCK/7; iii++) { //check all sockets if something to read
1182
1183 b = random_arr[iii].idx;
1184 i = b*7;
1185 p = 0;
1186 */
1187
1188 for (i = 0; i < MAX_SOCK; i+=7) { //check all sockets if something to read
1189
1190 b = i / 7 ;
1191 p = i % 7 ;
1192
1193/*if ( p >= NUMSOCK) { ; }
1194else*/ {
1195 if (sockDef[b] > 0)
1196 s0 = +1;
1197 else
1198 s0 = -1;
1199
1200 if (rd[i].sockStat < 0) { //try to connect if not yet done
1201 if (rd[i].sockStat == -1) {
1202 rd[i].sockStat = connect (rd[i].socket,
1203 (struct sockaddr *) &rd[i].SockAddr,
1204 sizeof (rd[i].SockAddr));
1205 if (rd[i].sockStat == -1) {
1206 rd[i].errCnt++ ;
1207 usleep(25000);
1208// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1209// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1210// else rd[i].sockStat = 10000 ;
1211 }
1212//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1213
1214 }
1215 if (rd[i].sockStat < -1 ) {
1216 rd[i].sockStat++ ;
1217 }
1218 if (rd[i].sockStat == 0) { //successfull ==>
1219 if (sockDef[b] > 0) {
1220 rd[i].bufTyp = 0; // expect a header
1221 rd[i].bufLen = frst_len; // max size to read at begining
1222 } else {
1223 rd[i].bufTyp = -1; // full data to be skipped
1224 rd[i].bufLen = MAX_LEN; //huge for skipping
1225 }
1226 rd[i].bufPos = 0; // no byte read so far
1227 rd[i].skip = 0; // start empty
1228// gi_NumConnect[b]++;
1229 gi_NumConnect[b] += cntsock ;
1230
1231 gi.numConn[b]++;
1232 gj.numConn[b]++;
1233 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", b, gi.numConn[b]);
1234 }
1235 }
1236
1237 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1238
1239 if (rd[i].bufLen<=0)
1240 {
1241#ifdef EVTDEBUG
1242 factPrintf(kDebug, 301, "do not read from socket %d %d", i, rd[i].bufLen);
1243#endif
1244 continue;
1245 }
1246
1247// if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1248 numok++;
1249
1250 jrd =
1251 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1252 rd[i].bufLen, MSG_DONTWAIT);
1253
1254#ifdef EVTDEBUG
1255 if (jrd > 0) {
1256 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1257 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1258 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1259 factPrintf(kDebug, 301, "read sock %3d bytes %5d len %5d first %d %d",
1260 i, jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos], rd[i].rBuf->B[rd[i].bufPos + 1]);
1261 }
1262#endif
1263
1264 if (jrd == 0) { //connection has closed ...
1265 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
1266 GenSock (s0, i, 0, NULL, &rd[i]);
1267 gi.gotErr[b]++;
1268// gi_NumConnect[b]--;
1269 gi_NumConnect[b]-= cntsock ;
1270 gi.numConn[b]--;
1271 gj.numConn[b]--;
1272
1273 } else if (jrd < 0) { //did not read anything
1274 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1275 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
1276 gi.gotErr[b]++;
1277 } else
1278 numok--; //else nothing waiting to be read
1279 //jrd = 0;
1280 }
1281/* } else {
1282 // jrd = 0; //did read nothing as requested
1283#ifdef EVTDEBUG
1284 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1285 rd[i].bufLen);
1286 factOut (kDebug, 301, str);
1287#endif
1288 }
1289*/
1290 if (jrd<=0)
1291 continue;
1292
1293 gi.gotByte[b] += jrd;
1294 gj.rateBytes[b] += jrd;
1295
1296 //if (jrd > 0) {
1297 numokx++;
1298 jrdx += jrd;
1299 //}
1300
1301
1302 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1303// just do nothing
1304#ifdef EVTDEBUG
1305 factPrintf(kInfo, 301, "skipping %d bytes on socket %d", jrd, i);
1306#endif
1307 continue;
1308 }
1309
1310 rd[i].bufPos += jrd; //==> prepare for continuation
1311 rd[i].bufLen -= jrd;
1312
1313 if (rd[i].bufTyp > 0) { // we are reading data ...
1314
1315 if (rd[i].bufLen>0/*jrd < rd[i].bufLen*/) { //not yet all read
1316#ifdef EVTDEBUG
1317 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1318#endif
1319 continue;
1320 }
1321
1322 //int pos = rdi_bufPos = rd[i].bufPos;
1323 //int pos = rdi_bufLen = rd[i].bufLen;
1324 //int pos = rdi_bufTyp = rd[i].bufTyp;
1325
1326 rd[i].bufTyp = 0; //ready to read next header
1327 rd[i].bufLen = frst_len;
1328 rd[i].bufPos = 0;
1329
1330 //{
1331 //full dataset read
1332 //rd[i].bufLen = 0;
1333 //rd[i].bufPos = rd[i].fadLen;
1334 if (rd[i].rBuf->B[rd[i].fadLen - 1] != stop.B[0] ||
1335 rd[i].rBuf->B[rd[i].fadLen - 2] != stop.B[1]) {
1336 gi.evtErr++;
1337 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d",
1338 i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1],
1339 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1340 //goto EndBuf;
1341 continue;
1342
1343#ifdef EVTDEBUG
1344 } else {
1345 factPrintf(kDebug, 301, "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1346 i, rd[i].fadLen, rd[i].rBuf->B[0],
1347 rd[i].rBuf->B[1], start.B[1], start.B[0],
1348 rd[i].rBuf->B[rd[i].fadLen - 2],
1349 rd[i].rBuf->B[rd[i].fadLen - 1], stop.B[1],
1350 stop.B[0]);
1351#endif
1352 }
1353
1354#ifdef EVTDEBUG
1355 //if (jrd > 0)
1356 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1357#endif
1358
1359/* //we have a complete buffer, copy to WORK area
1360 int jr, kr;
1361 int checkRoi;
1362 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1363 for (kr = 1; kr < 4; kr++)
1364 {
1365 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1366 + kr*(roi[jr-1]+4)]);
1367 if (checkRoi != roi[0])
1368 {
1369 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1370 factOut (kFatal, 1, str);
1371 goto EndBuf;
1372 }
1373
1374 }
1375 for (jr = 1; jr < 9; jr++) {
1376 roi[jr] =
1377 ntohs (rd[i].
1378 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1379
1380 }
1381*/
1382 //we have a complete buffer, copy to WORK area
1383
1384
1385 //End of the header. to channels now
1386 int eStart = 36;
1387 for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++)
1388 {
1389 rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id
1390 rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell
1391 rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi
1392 rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling
1393
1394 eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data
1395 }
1396
1397 //channels done. footer now
1398 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//package_crc
1399 // eCount++;
1400 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//end_package_flag
1401 // snprintf(str, MXSTR, "Bytes read %d bytes swapped %d", jrd, eCount*2);
1402 // factOut(kInfo, 000, str);
1403 //ETIENNE end of bytes swapping already
1404
1405
1406 int jr, kr;
1407 int checkRoi;
1408 int roiHopper = head_len/2 + 2; //points to the very first roi
1409 roi[0] = rd[i].rBuf->S[roiHopper];
1410 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1411 for (kr = 1; kr < 4; kr++)
1412 {
1413 checkRoi = rd[i].rBuf->S[roiHopper];
1414 if (checkRoi != roi[0])
1415 {
1416 factPrintf(kError, 1, "Inconsistent Roi accross boards B=%d, expected %d, got %d", kr, checkRoi, roi[0]);
1417// goto EndBuf;
1418 continue;
1419 }
1420 roiHopper += checkRoi+4;
1421 }
1422 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1423 for (jr = 1; jr < 9; jr++) {
1424 roi[jr] = rd[i].rBuf->S[roiHopper];
1425 checkRoi = roi[jr];
1426 for (kr = 1; kr < 4; kr++)
1427 {
1428 roiHopper += checkRoi+4;
1429 checkRoi = rd[i].rBuf->S[roiHopper];
1430 if (checkRoi != roi[jr])
1431 {
1432 factPrintf(kFatal, 1, "Inconsistent Roi accross patches B=%d P=%d, expected %d, got %d", kr, jr, roi[jr], checkRoi);
1433// goto EndBuf;
1434 continue;
1435 }
1436 }
1437 }
1438 //get index into mBuffer for this event (create if needed)
1439
1440// int actid;
1441// if (g_useFTM > 0)
1442// actid = rd[i].evtID;
1443// else
1444// actid = rd[i].ftmID;
1445
1446 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1447 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1448 rd[i].evtID);
1449
1450 if (evID < -1000) {
1451// goto EndBuf; //not usable board/event/run --> skip it
1452 //rd[i].bufTyp = 0; //ready to read next header
1453 //rd[i].bufLen = frst_len;
1454 //rd[i].bufPos = 0;
1455 continue;
1456 }
1457 if (evID < 0) { //no space left, retry later
1458#ifdef EVTDEBUG
1459 if (rd[i].bufLen != 0) {
1460 factPrintf(kFatal, 1, "something screwed up");
1461 }
1462#endif
1463 rd[i].bufTyp = -1;
1464 rd[i].bufLen = 0;
1465 rd[i].bufPos = rd[i].fadLen;
1466
1467 //xwait.tv_sec = 0;
1468 //xwait.tv_nsec = 10000000; // sleep for ~10 msec
1469 //nanosleep (&xwait, NULL);
1470 continue;
1471 //goto EndBuf1; //hope there is free space next round
1472 }
1473 //we have a valid entry in mBuffer[]; fill it
1474
1475#ifdef EVTDEBUG
1476 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1477 rd[i].fadLen);
1478 if (xchk != 0) {
1479 factPrintf(kFatal, 1, "ERROR OVERWRITE %d %d on port %d", xchk, rd[i].fadLen, i);
1480
1481 uint iq;
1482 for (iq = 0; iq < rd[i].fadLen; iq++) {
1483 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1484 factPrintf(kFatal, 1, "ERROR %4d %4d %x %x", i, iq, rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1485 }
1486 }
1487 }
1488#endif
1489 int qncpy = 0;
1490 boardId = b;
1491 int fadBoard = rd[i].rBuf->S[12];
1492 int fadCrate = fadBoard / 256;
1493 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1494 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1495 boardId, fadBoard, fadCrate, fadBoard % 256);
1496 }
1497 if (mBuffer[evID].board[boardId] != -1) {
1498 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1499 evID, boardId, i, rd[i].fadLen,
1500 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1501 rd[i].rBuf->B[rd[i].fadLen - 2],
1502 rd[i].rBuf->B[rd[i].fadLen - 1]);
1503// goto EndBuf; //--> skip Board
1504 //rd[i].bufTyp = 0; //ready to read next header
1505 //rd[i].bufLen = frst_len;
1506 //rd[i].bufPos = 0;
1507 continue;
1508 }
1509
1510 int iDx = evtIdx[evID]; //index into evtCtrl
1511
1512 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1513 &rd[i].rBuf->S[0], head_len);
1514 qncpy += head_len;
1515
1516 src = head_len / 2;
1517 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1518 for (drs = 0; drs < 4; drs++) {
1519 // pixH = rd[i].rBuf->S[src++]; // ID
1520 src++;
1521 pixC = rd[i].rBuf->S[src++]; // start-cell
1522 pixR = rd[i].rBuf->S[src++]; // roi
1523//here we should check if pixH is correct ....
1524
1525 pixS = boardId * 36 + drs * 9 + px;
1526 src++;
1527
1528
1529 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1530 dest = pixS * roi[0];
1531 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1532 &rd[i].rBuf->S[src], roi[0] * 2);
1533 qncpy += roi[0] * 2;
1534 src += pixR;
1535
1536 if (px == 8) {
1537 tmS = boardId * 4 + drs;
1538 if (pixR > roi[0]) { //and we have additional TM info
1539 dest = tmS * roi[0] + NPIX * roi[0];
1540 int srcT = src - roi[0];
1541 mBuffer[evID].fEvent->StartTM[tmS] =
1542 (pixC + pixR - roi[0]) % 1024;
1543 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1544 &rd[i].rBuf->S[srcT], roi[0] * 2);
1545 qncpy += roi[0] * 2;
1546 } else {
1547 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1548 //ETIENNE because the TM channels are always processed during drs calib,
1549 //set them to zero if they are not present
1550 //I suspect that it may be more efficient to set all the allocated mem to
1551 //zero when allocating it
1552// dest = tmS*roi[0] + NPIX*roi[0];
1553 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1554 }
1555 }
1556 }
1557 } // now we have stored a new board contents into Event structure
1558
1559 mBuffer[evID].fEvent->NumBoards++;
1560 mBuffer[evID].board[boardId] = boardId;
1561 evtCtrl.evtStat[iDx]++;
1562 evtCtrl.pcTime[iDx] = g_actTime;
1563
1564 if (++mBuffer[evID].nBoard >= actBoards) {
1565 int qnrun = 0;
1566 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1567 actrun = mBuffer[evID].runNum;
1568 int ir;
1569 for (ir = 0; ir < MAX_RUN; ir++) {
1570 qnrun++;
1571 if (runCtrl[ir].runId == actrun) {
1572 if (++runCtrl[ir].lastEvt == 0) {
1573 gotNewRun (actrun, mBuffer[evID].FADhead);
1574 factPrintf(kInfo, 1, "gotNewRun called for run %d, event %d",
1575 mBuffer[evID].runNum, mBuffer[evID].evNum);
1576 break;
1577 }
1578 }
1579 }
1580 }
1581#ifdef EVTDEBUG
1582 factPrintf(kDebug, -1, "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1583 mBuffer[evID].evNum, roi[0], roi[8] - roi[0], qncpy, qnrun);
1584#endif
1585 //complete event read ---> flag for next processing
1586 evtCtrl.evtStat[iDx] = 99;
1587 gi.evtTot++;
1588 }
1589
1590// EndBuf:
1591 //rd[i].bufTyp = 0; //ready to read next header
1592 //rd[i].bufLen = frst_len;
1593 //rd[i].bufPos = 0;
1594 //EndBuf1:
1595 ;
1596 //}
1597
1598 } else { //we are reading event header
1599 //rd[i].bufPos += jrd;
1600 //rd[i].bufLen -= jrd;
1601 if (rd[i].bufPos < minLen) //{ //sufficient data to take action
1602 {
1603#ifdef EVTDEBUG
1604 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1605#endif
1606 continue;
1607 }
1608
1609 //check if startflag correct; else shift block ....
1610 for (k = 0; k < rd[i].bufPos - 1; k++) {
1611 if (rd[i].rBuf->B[k] == start.B[1]
1612 && rd[i].rBuf->B[k + 1] == start.B[0])
1613 break;
1614 }
1615 rd[i].skip += k;
1616
1617 if (k >= rd[i].bufPos - 1) { //no start of header found
1618 rd[i].bufPos = 0;
1619 rd[i].bufLen = head_len;
1620 continue;
1621 }
1622
1623 if (k > 0) {
1624 rd[i].bufPos -= k;
1625 rd[i].bufLen += k;
1626 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1627 rd[i].bufPos);
1628#ifdef EVTDEBUG
1629 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1630 rd[i].bufPos);
1631#endif
1632 }
1633
1634 if (rd[i].bufPos < minLen)
1635 {
1636#ifdef EVTDEBUG
1637 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1638#endif
1639 continue;
1640 }
1641
1642 //{
1643 if (rd[i].skip > 0) {
1644 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
1645 rd[i].skip = 0;
1646 }
1647
1648 // TGB: This needs much more checks than just the first two bytes!
1649 goodhed++;
1650
1651 // Swap everything except start_package_flag.
1652 // It is to difficult to find out where it is used how,
1653 // but it doesn't really matter because it is not really
1654 // used anywehere else
1655// rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1656 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1657 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1658 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1659 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1660
1661 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1662 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1663 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1664 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1665
1666 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1667 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1668 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1669
1670 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1671 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1672
1673 for (int s=24;s<24+NTemp+NDAC;s++)
1674 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1675
1676 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1677 rd[i].fadVers = rd[i].rBuf->S[2];
1678 rd[i].ftmTyp = rd[i].rBuf->S[5];
1679 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1680 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1681 rd[i].runID = rd[i].rBuf->I[11];
1682 rd[i].bufTyp = 1; //ready to read full record
1683 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1684
1685#ifdef EVTDEBUG
1686 int fadboard = rd[i].rBuf->S[12];
1687 int fadcrate = fadboard / 256;
1688 fadboard = (fadcrate * 10 + fadboard % 256);
1689 factPrintf(kDebug, 1, "sk %3d head: %5d %5d %5d %10d %4d %6d",
1690 i, rd[i].fadLen, rd[i].evtID, rd[i].ftmID, rd[i].runID, fadboard, jrd);
1691#endif
1692
1693 if (rd[i].runID == 0)
1694 rd[i].runID = gi_myRun;
1695
1696 /*if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1697 snprintf (str, MXSTR,
1698 "Illegal event-length %d on port %d, %d expected.", rd[i].bufLen, i, head_len);
1699 factOut (kFatal, 881, str);
1700 rd[i].bufLen = 100000; //?
1701 }*/
1702
1703 int fadBoard = rd[i].rBuf->S[12];
1704 debugHead (i, fadBoard, rd[i].rBuf);
1705 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1706 } //end interpreting last read
1707}
1708 } //end of successful read anything
1709 } //finished trying to read all sockets
1710
1711#ifdef EVTDEBUG
1712 factDebug(kDebug, -1, "Loop ---- %3d --- %8d", numokx, jrdx);
1713#endif
1714
1715 gi.numRead[numok]++;
1716
1717 g_actTime = time (NULL);
1718 if (g_actTime <= gi_SecTime) {
1719 usleep(1);
1720 continue;
1721 }
1722
1723 gi_SecTime = g_actTime;
1724
1725
1726 //loop over all active events and flag those older than read-timeout
1727 //delete those that are written to disk ....
1728
1729 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1730 if (kd < 0)
1731 kd += (MAX_EVT * MAX_RUN);
1732
1733 gj.bufNew = gj.bufEvt = 0;
1734 int k1 = evtCtrl.frstPtr;
1735 for (k = k1; k < (k1 + kd); k++) {
1736 int k0 = k % (MAX_EVT * MAX_RUN);
1737//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1738
1739 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1740 gj.bufNew++; //incomplete event in Buffer
1741 if (evtCtrl.evtStat[k0] < 90
1742 && evtCtrl.pcTime[k0] < g_actTime - 30) {
1743 int id = evtCtrl.evtBuf[k0];
1744 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d %8d %2d",
1745 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0, evtCtrl.evtStat[k0]);
1746
1747 uint64_t report = 0;
1748
1749 char str[1000];
1750
1751 int ik,ib,jb;
1752 ik=0;
1753 for (ib=0; ib<NBOARDS; ib++) {
1754 if (ib%10==0) {
1755 str[ik++] = '|';
1756 }
1757 jb = mBuffer[id].board[ib];
1758 if ( jb<0 ) {
1759 if (gi_NumConnect[b] >0 ) {
1760 str[ik++] = '.';
1761 report |= ((uint64_t)1)<<ib;
1762 } else {
1763 str[ik++] = 'x';
1764 }
1765 } else {
1766 str[ik++] = '0'+(jb%10);
1767 }
1768 }
1769 str[ik++] = '|';
1770 str[ik] = 0;
1771 factOut(kWarn, 601, str);
1772
1773 factReportIncomplete(report);
1774
1775 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1776 gi.evtSkp++;
1777 gi.evtTot++;
1778 gj.evtSkip++;
1779 }
1780 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1781 || evtCtrl.evtStat[k0] == 0) { //'useless'
1782
1783 int id = evtCtrl.evtBuf[k0];
1784#ifdef EVTDEBUG
1785 factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard);
1786#endif
1787 mBufFree (id); //event written--> free memory
1788 evtCtrl.evtStat[k0] = -1;
1789 gj.evtWrite++;
1790 gj.rateWrite++;
1791 } else if (evtCtrl.evtStat[k0] >= 95) {
1792 gj.bufEvt++; //complete event in Buffer
1793 }
1794
1795 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1796 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1797 }
1798 }
1799
1800
1801 gj.deltaT = 1000; //temporary, must be improved
1802
1803 for (int ib = 0; ib < NBOARDS; ib++)
1804 gj.totBytes[ib] += gj.rateBytes[ib];
1805#ifdef THOMAS_MALLOC
1806 gj.totMem = tgb_memory;
1807#else
1808 gj.totMem = g_maxMem;
1809#endif
1810 if (gj.maxMem > gj.xxxMem)
1811 gj.xxxMem = gj.maxMem;
1812 if (gj.maxEvt > gj.xxxEvt)
1813 gj.xxxEvt = gj.maxEvt;
1814
1815 factStat (gj);
1816 factStatNew (gi);
1817 gj.rateNew = gj.rateWrite = 0;
1818 gj.maxMem = gj.usdMem;
1819 gj.maxEvt = gj.bufTot;
1820 for (b = 0; b < NBOARDS; b++)
1821 gj.rateBytes[b] = 0;
1822/*
1823 }
1824 if (numok > 0)
1825 numok2 = 0;
1826 else if (numok2++ > 3) {
1827 if (g_runStat == 1) {
1828 xwait.tv_sec = 1;
1829 xwait.tv_nsec = 0; // hibernate for 1 sec
1830 } else {
1831 xwait.tv_sec = 0;
1832 xwait.tv_nsec = 1000; // sleep for ~1 usec
1833 }
1834 nanosleep (&xwait, NULL);
1835 }
1836 */
1837 } //and do next loop over all sockets ...
1838
1839 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1840
1841 if (g_reset > 0) {
1842 gi_reset = g_reset;
1843 gi_resetR = gi_reset % 10; //shall we stop reading ?
1844 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1845 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1846 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1847 g_reset = 0;
1848 } else {
1849 gi_reset = 0;
1850 if (g_runStat == -1)
1851 gi_resetR = 1;
1852 else
1853 gi_resetR = 7;
1854 gi_resetS = 7; //close all sockets
1855 gi_resetW = 7; //close all files
1856 gi_resetX = 0;
1857
1858 //inform others we have to quit ....
1859 gi_runStat = -11; //inform all that no update to happen any more
1860 gj.readStat = -11; //inform all that no update to happen any more
1861 }
1862
1863 if (gi_resetS > 0) {
1864 //must close all open sockets ...
1865 factPrintf(kInfo, -1, "Close all sockets...");
1866 for (i = 0; i < MAX_SOCK; i++) {
1867 if (rd[i].sockStat == 0) {
1868 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1869 if (i % 7 == 0) {
1870// gi_NumConnect[i / 7]--;
1871 gi_NumConnect[i / 7]-= cntsock ;
1872 gi.numConn[i / 7]--;
1873 gj.numConn[i / 7]--;
1874 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1875 rd[i / 7].sockStat = -1; //and try to open asap
1876 }
1877 }
1878 }
1879 }
1880
1881
1882 if (gi_resetR > 0) {
1883 //flag all events as 'read finished'
1884 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1885 if (kd < 0)
1886 kd += (MAX_EVT * MAX_RUN);
1887
1888 int k1 = evtCtrl.frstPtr;
1889 for (k = k1; k < (k1 + kd); k++) {
1890 int k0 = k % (MAX_EVT * MAX_RUN);
1891 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1892 evtCtrl.evtStat[k0] = 91;
1893 gi.evtSkp++;
1894 gi.evtTot++;
1895 }
1896 }
1897
1898 xwait.tv_sec = 0;
1899 xwait.tv_nsec = 1000; // sleep for ~1 usec
1900 nanosleep (&xwait, NULL);
1901
1902 //and clear all buffers (might have to wait until all others are done)
1903 int minclear;
1904 if (gi_resetR == 1) {
1905 minclear = 900;
1906 factPrintf(kInfo, -1, "Drain all buffers ...");
1907 } else {
1908 minclear = 0;
1909 factPrintf(kInfo, -1, "Flush all buffers ...");
1910 }
1911
1912 int numclear = 1;
1913 while (numclear > 0) {
1914 numclear = 0;
1915 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1916 if (kd < 0)
1917 kd += (MAX_EVT * MAX_RUN);
1918
1919 int k1 = evtCtrl.frstPtr;
1920 for (k = k1; k < (k1 + kd); k++) {
1921 int k0 = k % (MAX_EVT * MAX_RUN);
1922 if (evtCtrl.evtStat[k0] > minclear) {
1923 int id = evtCtrl.evtBuf[k0];
1924#ifdef EVTDEBUG
1925 factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", mBuffer[id].evNum, mBuffer[id].nBoard);
1926#endif
1927 mBufFree (id); //event written--> free memory
1928 evtCtrl.evtStat[k0] = -1;
1929 } else if (evtCtrl.evtStat[k0] > 0)
1930 numclear++; //writing is still ongoing...
1931
1932 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1933 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1934 }
1935
1936 xwait.tv_sec = 0;
1937 xwait.tv_nsec = 1000; // sleep for ~1 umsec
1938 nanosleep (&xwait, NULL);
1939 }
1940 }
1941
1942 if (gi_reset > 0) {
1943 if (gi_resetW > 0) {
1944 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1945 }
1946 if (gi_resetX > 0) {
1947 xwait.tv_sec = gi_resetX;
1948 xwait.tv_nsec = 0;
1949 nanosleep (&xwait, NULL);
1950 }
1951
1952 factPrintf(kInfo, -1, "Continue read Process ...");
1953 gi_reset = 0;
1954 goto START;
1955 }
1956
1957
1958
1959 factPrintf(kInfo, -1, "Exit read Process...");
1960
1961#ifdef THOMAS_MALLOC
1962 factPrintf(kInfo, -1, "%ld Bytes flaged as in-use.", tgb_inuse);
1963#endif
1964
1965 gi_runStat = -99;
1966 gj.readStat = -99;
1967 factStat (gj);
1968 factStatNew (gi);
1969 return 0;
1970
1971} /*-----------------------------------------------------------------*/
1972
1973
1974void *
1975subProc (void *thrid)
1976{
1977 int64_t threadID;
1978 int numWait, numProc, k, jret;
1979 struct timespec xwait;
1980
1981// int32_t cntr ;
1982
1983 threadID = (int64_t)thrid;
1984
1985 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1986
1987 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1988 numWait = numProc = 0;
1989 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1990 if (kd < 0)
1991 kd += (MAX_EVT * MAX_RUN);
1992
1993 int k1 = evtCtrl.frstPtr;
1994 for (k = k1; k < (k1 + kd); k++) {
1995 int k0 = k % (MAX_EVT * MAX_RUN);
1996
1997 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1998 if (gi_resetR > 1) { //we are asked to flush buffers asap
1999 jret = 9100; //flag to be deleted
2000 } else {
2001 int id = evtCtrl.evtBuf[k0];
2002
2003
2004 jret =
2005 subProcEvt (threadID, mBuffer[id].FADhead,
2006 mBuffer[id].fEvent, mBuffer[id].buffer);
2007
2008
2009 if (jret <= threadID) {
2010 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
2011 jret = 5300;
2012 } else if (jret <= 0)
2013 jret = 9200 + threadID; //flag as 'to be deleted'
2014 else if (jret >= gi_maxProc)
2015 jret = 5200 + threadID; //flag as 'to be written'
2016 else
2017 jret = 1000 + jret; //flag for next proces
2018 }
2019 evtCtrl.evtStat[k0] = jret;
2020 numProc++;
2021 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
2022 numWait++;
2023 }
2024
2025 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2026 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
2027 return 0;
2028 }
2029 if (numProc == 0) {
2030 //seems we have nothing to do, so sleep a little
2031 xwait.tv_sec = 0;
2032 xwait.tv_nsec = 1000; // sleep for ~1 usec
2033 nanosleep (&xwait, NULL);
2034 }
2035 }
2036
2037 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
2038 return 0;
2039} /*-----------------------------------------------------------------*/
2040
2041
2042void *
2043procEvt (void *ptr)
2044{
2045/* *** main loop processing file, including SW-trigger */
2046 int numProc, numWait;
2047 int status, j;
2048 struct timespec xwait;
2049
2050 int lastRun = 0; //usually run from last event still valid
2051
2052// cpu_set_t mask;
2053// int cpu = 1; //process thread (will be several in final version)
2054
2055 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
2056
2057/* CPU_ZERO initializes all the bits in the mask to zero. */
2058// CPU_ZERO (&mask);
2059/* CPU_SET sets only the bit corresponding to cpu. */
2060// CPU_SET( 0 , &mask ); leave for system
2061// CPU_SET( 1 , &mask ); used by write process
2062// CPU_SET (2, &mask);
2063// CPU_SET (3, &mask);
2064// CPU_SET (4, &mask);
2065// CPU_SET (5, &mask);
2066// CPU_SET (6, &mask);
2067// CPU_SET( 7 , &mask ); used by read process
2068/* sched_setaffinity returns 0 in success */
2069// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2070// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
2071// factOut (kWarn, -1, str);
2072// }
2073
2074
2075 pthread_t thread[100];
2076// int th_ret[100];
2077
2078 for (long long k = 0; k < gi_maxProc; k++) {
2079 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
2080 }
2081
2082 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
2083
2084 numWait = numProc = 0;
2085 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2086 if (kd < 0)
2087 kd += (MAX_EVT * MAX_RUN);
2088
2089 int k1 = evtCtrl.frstPtr;
2090 for (int k = k1; k < (k1 + kd); k++) {
2091 int k0 = k % (MAX_EVT * MAX_RUN);
2092//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2093 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
2094
2095 if (gi_resetR > 1) { //we are asked to flush buffers asap
2096 evtCtrl.evtStat[k0] = 9991;
2097 } else {
2098
2099//-------- it is better to open the run already here, so call can be used to initialize
2100//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
2101 int id = evtCtrl.evtBuf[k0];
2102 uint32_t irun = mBuffer[id].runNum;
2103 int32_t ievt = mBuffer[id].evNum;
2104 if (runCtrl[lastRun].runId == irun) {
2105 j = lastRun;
2106 } else {
2107 //check which fileID to use (or open if needed)
2108 for (j = 0; j < MAX_RUN; j++) {
2109 if (runCtrl[j].runId == irun)
2110 break;
2111 }
2112 if (j >= MAX_RUN) {
2113 factPrintf(kFatal, 901, "procEvt: Can not find run %d for event %d in %d", irun, ievt, id);
2114 }
2115 lastRun = j;
2116 }
2117
2118 if (runCtrl[j].fileId < 0) {
2119//---- we need to open a new run ==> make sure all older runs are
2120//---- finished and marked to be closed ....
2121 int j1;
2122 for (j1 = 0; j1 < MAX_RUN; j1++) {
2123 if (runCtrl[j1].fileId == 0) {
2124 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
2125//---- problem: processing still going on ==> must wait for closing ....
2126 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j1].runId);
2127 runFinish1 (runCtrl[j1].runId);
2128 }
2129
2130 }
2131
2132 actRun.Version = 1;
2133 actRun.RunType = -1; //to be adapted
2134
2135 actRun.Nroi = runCtrl[j].roi0;
2136 actRun.NroiTM = runCtrl[j].roi8;
2137//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2138// if (actRun.Nroi == actRun.NroiTM)
2139// actRun.NroiTM = 0;
2140 actRun.RunTime = runCtrl[j].firstTime;
2141 actRun.RunUsec = runCtrl[j].firstTime;
2142 actRun.NBoard = NBOARDS;
2143 actRun.NPix = NPIX;
2144 actRun.NTm = NTMARK;
2145 actRun.Nroi = mBuffer[id].nRoi;
2146 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2147 NBOARDS * sizeof (PEVNT_HEADER));
2148
2149 runCtrl[j].fileHd =
2150 runOpen (irun, &actRun, sizeof (actRun));
2151 if (runCtrl[j].fileHd == NULL) {
2152 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
2153 runCtrl[j].fileId = 91;
2154 runCtrl[j].procId = 91;
2155 } else {
2156 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
2157 runCtrl[j].fileId = 0;
2158 runCtrl[j].procId = 0;
2159 }
2160
2161 }
2162//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
2163 if (runCtrl[j].procId == 0) {
2164 if (runCtrl[j].closeTime < g_actTime
2165 || runCtrl[j].lastTime < g_actTime - 300
2166 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
2167 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
2168 runFinish1 (runCtrl[j].runId);
2169 runCtrl[j].procId = 1;
2170 }
2171 }
2172 if (runCtrl[j].procId != 0) {
2173#ifdef EVTDEBUG
2174 fcatPrintf(kDebug, 502, "procEvt: Skip event %d because no active run %d", ievt, irun);
2175#endif
2176 evtCtrl.evtStat[k0] = 9091;
2177 } else {
2178//--------
2179//--------
2180 id = evtCtrl.evtBuf[k0];
2181 int itevt = mBuffer[id].trgNum;
2182 int itrg = mBuffer[id].trgTyp;
2183 int roi = mBuffer[id].nRoi;
2184 int roiTM = mBuffer[id].nRoiTM;
2185
2186//make sure unused pixels/tmarks are cleared to zero
2187//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2188// if (roiTM == roi)
2189// roiTM = 0;
2190 int ip, it, dest, ib;
2191 for (ip = 0; ip < NPIX; ip++) {
2192 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
2193 dest = ip * roi;
2194 memset (&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
2195 }
2196 }
2197
2198 for (it = 0; it < NTMARK; it++) {
2199 if (mBuffer[id].fEvent->StartTM[it] == -1) {
2200 dest = it * roi + NPIX * roi;
2201 memset (&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
2202 }
2203 }
2204
2205
2206//and set correct event header ; also check for consistency in event (not yet)
2207 mBuffer[id].fEvent->Roi = roi;
2208 mBuffer[id].fEvent->RoiTM = roiTM;
2209 mBuffer[id].fEvent->EventNum = ievt;
2210 mBuffer[id].fEvent->TriggerNum = itevt;
2211 mBuffer[id].fEvent->TriggerType = itrg;
2212 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2213 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2214 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2215 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2216 mBuffer[id].fEvent->SoftTrig = 0;
2217
2218
2219 for (ib = 0; ib < NBOARDS; ib++) {
2220 if (mBuffer[id].board[ib] == -1) { //board is not read
2221 mBuffer[id].FADhead[ib].start_package_flag = 0;
2222 mBuffer[id].fEvent->BoardTime[ib] = 0;
2223 } else {
2224 mBuffer[id].fEvent->BoardTime[ib] =
2225 mBuffer[id].FADhead[ib].time;
2226 }
2227 }
2228 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2229 mBuffer[id].fEvent);
2230 gi.procTot++;
2231 numProc++;
2232
2233 if (i < 0) {
2234 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2235 gi.procErr++;
2236 } else {
2237 evtCtrl.evtStat[k0] = 1000;
2238 runCtrl[j].procEvt++;
2239 }
2240 }
2241 }
2242 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2243 numWait++;
2244 }
2245 }
2246
2247 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2248 factPrintf(kInfo, -1, "Exit Processing Process ...");
2249 gp_runStat = -22; //==> we should exit
2250 gj.procStat = -22; //==> we should exit
2251 return 0;
2252 }
2253
2254 if (numProc == 0) {
2255 //seems we have nothing to do, so sleep a little
2256 xwait.tv_sec = 0;
2257 xwait.tv_nsec = 1000; // sleep for ~1 usec
2258 nanosleep (&xwait, NULL);
2259 }
2260 gp_runStat = gi_runStat;
2261 gj.procStat = gj.readStat;
2262
2263 }
2264
2265 //we are asked to abort asap ==> must flag all remaining events
2266 // when gi_runStat claims that all events are in the buffer...
2267
2268 factPrintf(kInfo, -1, "Abort Processing Process ...");
2269 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2270 if (kd < 0)
2271 kd += (MAX_EVT * MAX_RUN);
2272
2273 for (int k = 0; k < gi_maxProc; k++) {
2274 pthread_join (thread[k], (void **) &status);
2275 }
2276
2277 int k1 = evtCtrl.frstPtr;
2278 for (int k = k1; k < (k1 + kd); k++) {
2279 int k0 = k % (MAX_EVT * MAX_RUN);
2280 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2281 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2282 }
2283 }
2284
2285 gp_runStat = -99;
2286 gj.procStat = -99;
2287
2288 return 0;
2289
2290} /*-----------------------------------------------------------------*/
2291
2292int
2293CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2294{
2295/* close run runId (all all runs if runId=0) */
2296/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2297 int j;
2298
2299
2300 if (runId == 0) {
2301 for (j = 0; j < MAX_RUN; j++) {
2302 if (runCtrl[j].fileId == 0) { //run is open
2303 runCtrl[j].closeTime = closeTime;
2304 runCtrl[j].maxEvt = maxEvt;
2305 }
2306 }
2307 return 0;
2308 }
2309
2310 for (j = 0; j < MAX_RUN; j++) {
2311 if (runCtrl[j].runId == runId) {
2312 if (runCtrl[j].fileId == 0) { //run is open
2313 runCtrl[j].closeTime = closeTime;
2314 runCtrl[j].maxEvt = maxEvt;
2315 return 0;
2316 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2317 runCtrl[j].closeTime = closeTime;
2318 runCtrl[j].maxEvt = maxEvt;
2319 return +1;
2320 } else { // run already closed
2321 return +2;
2322 }
2323 }
2324 } //we only reach here if the run was never created
2325 return -1;
2326
2327}
2328
2329void checkAndCloseRun(int j, int irun, int cond, char *where)
2330{
2331 if (!cond &&
2332 runCtrl[j].closeTime >= g_actTime &&
2333 runCtrl[j].lastTime >= g_actTime - 300 &&
2334 runCtrl[j].maxEvt >= runCtrl[j].actEvt)
2335 return;
2336
2337 //close run for whatever reason
2338 if (runCtrl[j].runId == gi_myRun)
2339 gi_myRun = g_actTime;
2340
2341 int ii = 0;
2342 if (cond)
2343 ii = 1;
2344 if (runCtrl[j].closeTime < g_actTime)
2345 ii |= 2; // = 2;
2346 if (runCtrl[j].lastTime < g_actTime - 300)
2347 ii |= 4; // = 3;
2348 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2349 ii |= 8; // = 4;
2350
2351 if (runCtrl[j].procId == 0)
2352 {
2353 runFinish1(runCtrl[j].runId);
2354 runCtrl[j].procId = 92;
2355 }
2356
2357 runCtrl[j].closeTime = g_actTime - 1;
2358
2359 const int rc = runClose(runCtrl[j].fileHd, &runTail[j], sizeof(runTail[j]));
2360 if (rc<0)
2361 {
2362 factPrintf(kError, 503, "writeEvt-%s: Error closing run %d (runClose[1],rc=%d)",
2363 where, runCtrl[j].runId, rc);
2364 runCtrl[j].fileId = 92;
2365 }
2366 else
2367 {
2368 factPrintf(kInfo, 503, "writeEvt-%s: Closed run %d (ii=%d)",
2369 where, irun, ii);
2370 runCtrl[j].fileId = 93;
2371 }
2372}
2373
2374/*-----------------------------------------------------------------*/
2375
2376
2377void *
2378writeEvt (void *ptr)
2379{
2380/* *** main loop writing event (including opening and closing run-files */
2381
2382 int numWrite, numWait;
2383 int k, j ;
2384 struct timespec xwait;
2385
2386// cpu_set_t mask;
2387// int cpu = 1; //write thread
2388
2389 factPrintf(kInfo, -1, "Starting write-thread");
2390
2391/* CPU_ZERO initializes all the bits in the mask to zero. */
2392// CPU_ZERO (&mask);
2393/* CPU_SET sets only the bit corresponding to cpu. */
2394// CPU_SET (cpu, &mask);
2395/* sched_setaffinity returns 0 in success */
2396// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2397// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2398// }
2399
2400 int lastRun = 0; //usually run from last event still valid
2401
2402 while (g_runStat > -2) {
2403
2404 numWait = numWrite = 0;
2405 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2406 if (kd < 0)
2407 kd += (MAX_EVT * MAX_RUN);
2408
2409 int k1 = evtCtrl.frstPtr;
2410 for (k = k1; k < (k1 + kd); k++) {
2411 int k0 = k % (MAX_EVT * MAX_RUN);
2412//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2413 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2414
2415 if (gi_resetR > 1) { //we must drain the buffer asap
2416 evtCtrl.evtStat[k0] = 9904;
2417 } else {
2418
2419
2420 int id = evtCtrl.evtBuf[k0];
2421 uint32_t irun = mBuffer[id].runNum;
2422 int32_t ievt = mBuffer[id].evNum;
2423
2424 gi.wrtTot++;
2425 if (runCtrl[lastRun].runId == irun) {
2426 j = lastRun;
2427 } else {
2428 //check which fileID to use (or open if needed)
2429 for (j = 0; j < MAX_RUN; j++) {
2430 if (runCtrl[j].runId == irun)
2431 break;
2432 }
2433 if (j >= MAX_RUN) {
2434 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id);
2435 gi.wrtErr++;
2436 }
2437 lastRun = j;
2438 }
2439
2440 if (runCtrl[j].fileId < 0) {
2441 actRun.Version = 1;
2442 actRun.RunType = -1; //to be adapted
2443
2444 actRun.Nroi = runCtrl[j].roi0;
2445 actRun.NroiTM = runCtrl[j].roi8;
2446//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2447// if (actRun.Nroi == actRun.NroiTM)
2448// actRun.NroiTM = 0;
2449 actRun.RunTime = runCtrl[j].firstTime;
2450 actRun.RunUsec = runCtrl[j].firstTime;
2451 actRun.NBoard = NBOARDS;
2452 actRun.NPix = NPIX;
2453 actRun.NTm = NTMARK;
2454 actRun.Nroi = mBuffer[id].nRoi;
2455 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2456 NBOARDS * sizeof (PEVNT_HEADER));
2457
2458 runCtrl[j].fileHd =
2459 runOpen (irun, &actRun, sizeof (actRun));
2460 if (runCtrl[j].fileHd == NULL) {
2461 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
2462 runCtrl[j].fileId = 91;
2463 } else {
2464 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
2465 runCtrl[j].fileId = 0;
2466 }
2467
2468 }
2469
2470 if (runCtrl[j].fileId != 0) {
2471 if (runCtrl[j].fileId < 0) {
2472 factPrintf(kError, 123, "writeEvt: Never opened file for run %d", irun);
2473 } else if (runCtrl[j].fileId < 100) {
2474 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
2475 runCtrl[j].fileId += 100;
2476 } else {
2477#ifdef EVTDEBUG
2478 factPrintf(kDebug, 123, "writeEvt: File for run %d is closed", irun);
2479#endif
2480 }
2481 evtCtrl.evtStat[k0] = 9903;
2482 gi.wrtErr++;
2483 } else {
2484// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2485// factOut (kInfo, 504, str);
2486 const int rc = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2487 sizeof (mBuffer[id]));
2488 if (rc >= 0) {
2489 runCtrl[j].lastTime = g_actTime;
2490 runCtrl[j].actEvt++;
2491 evtCtrl.evtStat[k0] = 9901;
2492#ifdef EVTDEBUG
2493 factPrintf(kDebug, 504, "%5d successfully wrote for run %d id %5d", ievt, irun, k0);
2494#endif
2495// gj.writEvt++ ;
2496 } else {
2497 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
2498 evtCtrl.evtStat[k0] = 9902;
2499 gi.wrtErr++;
2500 }
2501
2502 checkAndCloseRun(j, irun, rc<0, "1");
2503 }
2504 }
2505 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2506 numWait++;
2507 }
2508
2509 //check if we should close a run (mainly when no event pending)
2510 //ETIENNE but first figure out which one is the latest run with a complete event.
2511 //i.e. max run Id and lastEvt >= 0
2512 //this condition is sufficient because all pending events were written already in the loop just above
2513 //actrun
2514 uint32_t lastStartedTime = 0;
2515 uint32_t runIdFound = 0;
2516 if (actrun != 0)
2517 {//If we have an active run, look for its start time
2518 for (j=0;j<MAX_RUN;j++)
2519 {
2520 if (runCtrl[j].runId == actrun)
2521 {
2522 lastStartedTime = runCtrl[j].lastTime;
2523 runIdFound = 1;
2524 }
2525 }
2526 }
2527 else
2528 runIdFound = 1;
2529
2530 if (runIdFound == 0)
2531 {
2532 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
2533 }
2534// snprintf(str, MXSTR, "Maximum runId: %d", maxStartedRun);
2535// factOut(kInfo, 000, str);
2536 //Also check if some files will never be opened
2537 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
2538 //I will never get run numbers for which no file is to be opened
2539 for (j=0;j<MAX_RUN;j++)
2540 {
2541 if ((runCtrl[j].fileId < 0) &&
2542 (runCtrl[j].lastTime < lastStartedTime) &&
2543 (runCtrl[j].runId != 0))
2544 {
2545 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %ud. Last run: %ud (started)", runCtrl[j].runId, actrun);
2546 ;//TODO notify that this run will never be opened
2547 }
2548 }
2549 for (j = 0; j < MAX_RUN; j++)
2550 {
2551 if (runCtrl[j].fileId == 0)
2552 {
2553 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
2554 const int cond = runCtrl[j].lastTime < lastStartedTime && runCtrl[j].runId != 0;
2555 checkAndCloseRun(j, runCtrl[j].runId, cond, "2");
2556 }
2557 }
2558
2559 if (numWrite == 0) {
2560 //seems we have nothing to do, so sleep a little
2561 xwait.tv_sec = 0;
2562 xwait.tv_nsec = 1000; // sleep for ~1 usec
2563 nanosleep (&xwait, NULL);
2564 }
2565
2566 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2567 factPrintf(kInfo, -1, "Finish Write Process ...");
2568 gw_runStat = -22; //==> we should exit
2569 gj.writStat = -22; //==> we should exit
2570 goto closerun;
2571 }
2572 gw_runStat = gi_runStat;
2573 gj.writStat = gj.readStat;
2574
2575 }
2576
2577 //must close all open files ....
2578 factPrintf(kInfo, -1, "Abort Writing Process ...");
2579
2580 closerun:
2581 factPrintf(kInfo, -1, "Close all open files ...");
2582 for (j = 0; j < MAX_RUN; j++)
2583 if (runCtrl[j].fileId == 0) {
2584 if (runCtrl[j].runId == gi_myRun)
2585 gi_myRun = g_actTime;
2586
2587 if (runCtrl[j].procId == 0) {
2588 runFinish1 (runCtrl[j].runId);
2589 runCtrl[j].procId = 92;
2590 }
2591
2592 runCtrl[j].closeTime = g_actTime - 1;
2593 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2594 int ii = 0;
2595 if (runCtrl[j].closeTime < g_actTime)
2596 ii |= 2; // = 2
2597 /*else*/ if (runCtrl[j].lastTime < g_actTime - 300)
2598 ii |= 4; // = 3
2599 /*else*/ if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2600 ii |= 8; // = 4
2601 if (i < 0) {
2602 factPrintf(kError, 506, "writeEvt: Error closing run %d (runClose[3],i=%d)", runCtrl[j].runId, i);
2603 runCtrl[j].fileId = 96;
2604 } else {
2605 factPrintf(kInfo, 507, "writeEvt: Closed run %d (ii[3]=%d)", runCtrl[j].runId, ii);
2606 runCtrl[j].fileId = 97;
2607 }
2608 }
2609
2610 gw_runStat = -99;
2611 gj.writStat = -99;
2612 factPrintf(kInfo, -1, "Exit Writing Process ...");
2613 return 0;
2614
2615
2616
2617
2618} /*-----------------------------------------------------------------*/
2619
2620
2621
2622
2623void
2624StartEvtBuild ()
2625{
2626
2627 int i, /*j,*/ imax, status/*, th_ret[50]*/;
2628 pthread_t thread[50];
2629 struct timespec xwait;
2630
2631 gi_runStat = gp_runStat = gw_runStat = 0;
2632 gj.readStat = gj.procStat = gj.writStat = 0;
2633
2634 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
2635
2636//initialize run control logics
2637 for (i = 0; i < MAX_RUN; i++) {
2638 runCtrl[i].runId = 0;
2639 runCtrl[i].fileId = -2;
2640 }
2641
2642//prepare for subProcesses
2643 gi_maxSize = g_maxSize;
2644 if (gi_maxSize <= 0)
2645 gi_maxSize = 1;
2646
2647 gi_maxProc = g_maxProc;
2648 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2649 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
2650 gi_maxProc = 1;
2651 }
2652//partially initialize event control logics
2653 evtCtrl.frstPtr = 0;
2654 evtCtrl.lastPtr = 0;
2655
2656//start all threads (more to come) when we are allowed to ....
2657 while (g_runStat == 0) {
2658 xwait.tv_sec = 0;
2659 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2660 nanosleep (&xwait, NULL);
2661 }
2662
2663 i = 0;
2664 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
2665 i++;
2666 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
2667 i++;
2668 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
2669 i++;
2670 imax = i;
2671
2672
2673#ifdef BILAND
2674 xwait.tv_sec = 30;;
2675 xwait.tv_nsec = 0; // sleep for ~20sec
2676 nanosleep (&xwait, NULL);
2677
2678 printf ("close all runs in 2 seconds\n");
2679
2680 CloseRunFile (0, time (NULL) + 2, 0);
2681
2682 xwait.tv_sec = 1;;
2683 xwait.tv_nsec = 0; // sleep for ~20sec
2684 nanosleep (&xwait, NULL);
2685
2686 printf ("setting g_runstat to -1\n");
2687
2688 g_runStat = -1;
2689#endif
2690
2691
2692//wait for all threads to finish
2693 for (i = 0; i < imax; i++) {
2694 /*j =*/ pthread_join (thread[i], (void **) &status);
2695 }
2696
2697} /*-----------------------------------------------------------------*/
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713 /*-----------------------------------------------------------------*/
2714 /*-----------------------------------------------------------------*/
2715 /*-----------------------------------------------------------------*/
2716 /*-----------------------------------------------------------------*/
2717 /*-----------------------------------------------------------------*/
2718
2719#ifdef BILAND
2720
2721int
2722subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2723 int8_t * buffer)
2724{
2725 printf ("called subproc %d\n", threadID);
2726 return threadID + 1;
2727}
2728
2729
2730
2731
2732 /*-----------------------------------------------------------------*/
2733 /*-----------------------------------------------------------------*/
2734 /*-----------------------------------------------------------------*/
2735 /*-----------------------------------------------------------------*/
2736 /*-----------------------------------------------------------------*/
2737
2738
2739
2740
2741FileHandle_t
2742runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2743{
2744 return 1;
2745};
2746
2747int
2748runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2749{
2750 return 1;
2751 usleep (10000);
2752 return 1;
2753}
2754
2755
2756//{ return 1; } ;
2757
2758int
2759runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2760{
2761 return 1;
2762};
2763
2764
2765
2766
2767int
2768eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2769{
2770 int i = 0;
2771
2772// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2773// for (i=0; i<NBOARDS; i++) {
2774// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2775// }
2776 return 0;
2777}
2778
2779
2780void
2781factStatNew (EVT_STAT gi)
2782{
2783 int i;
2784
2785//for (i=0;i<MAX_SOCK;i++) {
2786// printf("%4d",gi.numRead[i]);
2787// if (i%20 == 0 ) printf("\n");
2788//}
2789}
2790
2791void
2792gotNewRun (int runnr, PEVNT_HEADER * headers)
2793{
2794 printf ("got new run %d\n", runnr);
2795 return;
2796}
2797
2798void
2799factStat (GUI_STAT gj)
2800{
2801// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2802// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2803}
2804
2805
/*
 * debugRead - stand-alone test stub (BILAND build only).
 *
 * Intentionally empty: none of the arguments is used.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;
}
2812
2813
2814
/*
 * debugStream - stand-alone test stub (BILAND build only).
 *
 * Intentionally empty: the buffer is neither read nor modified.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2819
/*
 * debugHead - stand-alone test stub (BILAND build only).
 *
 * Intentionally empty: the buffer is neither read nor modified.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2824
2825
2826void
2827factOut (int severity, int err, char *message)
2828{
2829 static FILE *fd;
2830 static int file = 0;
2831
2832 if (file == 0) {
2833 printf ("open file\n");
2834 fd = fopen ("x.out", "w+");
2835 file = 999;
2836 }
2837
2838 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2839
2840 if (severity != kDebug)
2841 printf ("%3d %3d | %s\n", severity, err, message);
2842}
2843
2844
2845
2846int
2847main ()
2848{
2849 int i, b, c, p;
2850 char ipStr[100];
2851 struct in_addr IPaddr;
2852
2853 g_maxMem = 1024 * 1024; //MBytes
2854//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2855 g_maxMem = g_maxMem * 200; //100MBytes
2856
2857 g_maxProc = 20;
2858 g_maxSize = 30000;
2859
2860 g_runStat = 40;
2861
2862 i = 0;
2863
2864// version for standard crates
2865//for (c=0; c<4,c++) {
2866// for (b=0; b<10; b++) {
2867// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2868//
2869// inet_pton(PF_INET, ipStr, &IPaddr) ;
2870//
2871// g_port[i].sockAddr.sin_family = PF_INET;
2872// g_port[i].sockAddr.sin_port = htons(5000) ;
2873// g_port[i].sockAddr.sin_addr = IPaddr ;
2874// g_port[i].sockDef = 1 ;
2875// i++ ;
2876// }
2877//}
2878//
2879//version for PC-test *
2880 for (c = 0; c < 4; c++) {
2881 for (b = 0; b < 10; b++) {
2882 sprintf (ipStr, "10.0.%d.11", 128 + c);
2883 if (c < 2)
2884 sprintf (ipStr, "10.0.%d.11", 128);
2885 else
2886 sprintf (ipStr, "10.0.%d.11", 131);
2887// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2888
2889 inet_pton (PF_INET, ipStr, &IPaddr);
2890 p = 31919 + 100 * c + 10 * b;
2891
2892
2893 g_port[i].sockAddr.sin_family = PF_INET;
2894 g_port[i].sockAddr.sin_port = htons (p);
2895 g_port[i].sockAddr.sin_addr = IPaddr;
2896 g_port[i].sockDef = 1;
2897
2898 i++;
2899 }
2900 }
2901
2902
2903//g_port[17].sockDef =-1 ;
2904//g_actBoards-- ;
2905
2906 StartEvtBuild ();
2907
2908 return 0;
2909
2910}
2911#endif
Note: See TracBrowser for help on using the repository browser.