source: trunk/FACT++/src/EventBuilder.c@ 12485

Last change on this file since 12485 was 12485, checked in by lyard, 13 years ago
Extreme memory swapping
File size: 91.9 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/// Severity levels handed to factOut() as its first argument.
enum Severity
{
    kMessage = 10, ///< Just a message, usually obsolete
    kInfo    = 20, ///< An info telling something which can be interesting to know
    kWarn    = 30, ///< A warning, things that somehow might result in unexpected or unwanted behaviour
    kError   = 40, ///< Error, something unexpected happened, but can still be handled by the program
    kFatal   = 50, ///< An error which cannot be handled at all happened, the only solution is program termination
    kDebug   = 99, ///< A message used for debugging only
};
36
#define MIN_LEN  32            // min #bytes needed to interpret FADheader
/* Parenthesized so the macro expands as a single value in any expression
   (e.g. `x % MAX_LEN`, `-MAX_LEN`, `(size_t) MAX_LEN`). */
#define MAX_LEN  (256 * 1024)  // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
/* Run-file hooks: declared here, defined outside this file. */
extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
/* runFinish() used to be declared extern here; it is now defined in this file. */

/* Message sink. In this file it is always called with one of enum Severity,
   a numeric message code and the global scratch buffer `str`. */
extern void factOut (int severity, int err, char *message);

extern void gotNewRun (int runnr, PEVNT_HEADER * headers);


/* Status reporting hooks (take the statistics structs by value). */
extern void factStat (GUI_STAT gj);

extern void factStatNew (EVT_STAT gi);

extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);

extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
                       int8_t * buffer);

/* Debug hooks; debugStream() and debugRead() are called from readFAD()
   for every chunk of bytes received from a socket. */
extern void debugHead (int i, int j, void *buf);

extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
                       int32_t runnr, int state, uint32_t tsec,
                       uint32_t tusec);
extern void debugStream (int isock, void *buf, int len);

/* Forward declaration; the definition is not part of the section shown here. */
int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
/* Global configuration/state. NOTE(review): the g_ names look like settings
   written from outside this file and the gi_/gp_/gw_ names like per-thread
   copies - confirm against the callers. */
int g_maxProc;
int g_maxSize;
int gi_maxSize;
int gi_maxProc;

uint g_actTime;                 // current time [s], refreshed from gettimeofday() in readFAD()
uint g_actUsec;                 // microsecond part of the same timestamp
int g_runStat;                  // readFAD() keeps looping while this is >= 0 ...
int g_reset;                    // ... and this is 0
int g_useFTM;

int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
size_t g_maxMem;                //maximum memory allowed for buffer

//no longer needed ...
int g_maxBoards;                //maximum number of boards to be initialized
int g_actBoards;
//

FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"


int gi_runStat;                 // copy of g_runStat taken by readFAD()
int gp_runStat;
int gw_runStat;

// +1 = ok; set to -99 by mBufEvt() when no event memory is left and reset
// to +1 by mBufFree() once usage drops to 75% of the recorded maximum
int gi_memStat = +1;

uint32_t gi_myRun = 0;          // set to g_actTime when readFAD() (re)starts
uint32_t actrun = 0;


uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards

//uint gi_EvtStart= 0 ;
//uint gi_EvtRead = 0 ;
//uint gi_EvtBad = 0 ;
//uint gi_EvtTot = 0 ;
//size_t gi_usedMem = 0 ;

//uint gw_EvtTot = 0 ;
//uint gp_EvtTot = 0 ;

PIX_MAP g_pixMap[NPIX];

EVT_STAT gi;                    // event statistics, zeroed by resetEvtStat()
GUI_STAT gj;                    // statistics incl. memory/buffer usage (usdMem, maxMem, bufTot, ...)

EVT_CTRL evtCtrl;               //control of events during processing
int evtIdx[MAX_EVT * MAX_RUN];  //index from mBuffer to evtCtrl

WRK_DATA mBuffer[MAX_EVT * MAX_RUN];    //local working space

// Shared scratch buffer: messages are formatted into it with snprintf()
// and then passed to factOut().
#define MXSTR 1000
char str[MXSTR];
129
130//ETIENNE
/* Fixed-size slot allocator for event memory: memory is obtained from
   malloc() in chunks of up to MAX_SLOTS_PER_CHUNK equally sized slots. */
#define MAX_SLOTS_PER_CHUNK 100

/* Where the memory of one mBuffer[] entry lives (same index as mBuffer[]).
   All three fields are -1 after mBufInit(). */
typedef struct {
    int32_t eventNumber;   // event id the slot was handed out for
    int32_t chunk;         // index into EtiMemoryChunks[]
    int32_t slot;          // slot index inside that chunk
} CHUNK_MAPPING;

CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];

/* Slot layout: NBOARDS board headers followed by the EVENT. The event part
   is sized with 1024 samples of 2 bytes per pixel / time marker; 1024 is
   also the largest nRoi[0] accepted by mBufEvt(). */
#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)

/* One malloc()ed chunk. pointers[0] is the address returned by malloc();
   pointers[k] is the start of slot k inside the same allocation. */
typedef struct {
    void * pointers[MAX_SLOTS_PER_CHUNK];
    int32_t events[MAX_SLOTS_PER_CHUNK];   // event id occupying the slot, -1 = free
    int32_t nFreeSlots;                    // number of slots with events[] == -1
    int32_t nSlots;                        // slots in this chunk (<= MAX_SLOTS_PER_CHUNK)
} ETI_CHUNK;

/* Chunks 0 .. numAllocatedChunks-1 of EtiMemoryChunks[] are currently allocated. */
int32_t numAllocatedChunks = 0;

#define MAX_CHUNKS 8096
ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
235 {
236 snprintf(str, MXSTR, "Chunk %d not allocated as it ought to be. Skipping release", chunkIndex);
237 factOut(kError, 000, str);
238 return;
239 }
240 free(EtiMemoryChunks[chunkIndex].pointers[0]);
241 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
242 numAllocatedChunks--;
243 chunkIndex--;
244 if (numAllocatedChunks == 0)
245 break;
246 }
247}
248//END ETIENNE
249
250
251
/* Run header; its FADhead buffer (NBOARDS headers) is allocated in mBufInit(). */
RUN_HEAD actRun;

/* Per-run bookkeeping; a slot is claimed in mBufEvt() when the first event
   of a run id shows up. */
RUN_CTRL runCtrl[MAX_RUN];

/* Per-run tail counters, same index as runCtrl[]; zeroed when the run is registered. */
RUN_TAIL runTail[MAX_RUN];
257
258
/*
 *** Definition of rdBuffer to read in IP packets; keep it global !!!!
 */


/* Raw receive buffer of MAX_LEN bytes, viewable as 8/16/32/64-bit words. */
typedef union
{
   int8_t B[MAX_LEN];
   int16_t S[MAX_LEN / 2];
   int32_t I[MAX_LEN / 4];
   int64_t L[MAX_LEN / 8];
} CNV_FACT;

/* State of one socket: connection status, read position in rBuf and the
   identifiers of the event currently being received. */
typedef struct
{
   int bufTyp;                  //what are we reading at the moment: 0=header 1=data -1=skip ...
   int32_t bufPos;              //next byte to read to the buffer next
   int32_t bufLen;              //number of bytes left to read
// size_t bufLen; //number of bytes left to read size_t might be better
   int32_t skip;                //number of bytes skipped before start of event

   int errCnt;                  //how often connect failed since last successful
   // sockStat values set in this file: 0 = connected, -1 = socket created but
   // not yet connected, 77 = socket not used (or no read buffer),
   // 88 = socket() failed, 99 = does not exist
   int sockStat;                //-1 if socket not yet connected , 99 if not exist
   int socket;                  //contains the sockets
   struct sockaddr_in SockAddr; //IP for each socket

   int evtID;                   // event ID of event currently read
   int runID;                   // run "
   int ftmID;                   // event ID from FTM
   uint fadLen;                 // FADlength of event currently read
   int fadVers;                 // Version of FAD
   int ftmTyp;                  // trigger type
   int board;                   // boardID (softwareID: 0..40 )
   int Port;

   CNV_FACT *rBuf;              // receive buffer, allocated/freed in GenSock()

#ifdef EVTDEBUG
   CNV_FACT *xBuf;              //a copy of rBuf (temporary for debuging)
#endif

} READ_STRUCT;


/* A 16-bit word with access to its two bytes in memory order. */
typedef union
{
   int8_t B[2];
   int16_t S;
} SHORT_BYTE;





/* Start/stop markers; readFAD() sets them to 0xFB01 / 0x04FE and compares
   stop.B[] with the last two bytes of every completely read buffer. */
SHORT_BYTE start, stop;

READ_STRUCT rd[MAX_SOCK];       //buffer to read IP and afterwards store in mBuffer
316
317
318
319/*-----------------------------------------------------------------*/
320
321
322/*-----------------------------------------------------------------*/
323
324int
325runFinish1 (uint32_t runnr)
326{
327 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
328 runnr);
329 factOut (kInfo, 173, str); //but continue anyhow
330 return 0;
331}
332int
333runFinish (uint32_t runnr)
334{
335 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
336 runnr);
337 factOut (kInfo, 173, str); //but continue anyhow
338 return 0;
339}
340
341int
342GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
343 READ_STRUCT * rd)
344{
345/*
346*** generate Address, create sockets and allocates readbuffer for it
347***
348*** if flag==0 generate socket and buffer
349*** <0 destroy socket and buffer
350*** >0 close and redo socket
351***
352*** sid : board*7 + port id
353 */
354
355 int j;
356 int optval = 1; //activate keepalive
357 socklen_t optlen = sizeof (optval);
358
359
360 if (sid % 7 >= NUMSOCK) {
361 //this is a not used socket, so do nothing ...
362 rd->sockStat = 77;
363 rd->rBuf = NULL ;
364 return 0;
365 }
366
367 if (rd->sockStat == 0) { //close socket if open
368 j = close (rd->socket);
369 if (j > 0) {
370 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
371 factOut (kFatal, 771, str);
372 } else {
373 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
374 factOut (kInfo, 771, str);
375 }
376 }
377
378 rd->sockStat = 99;
379
380 if (flag < 0) {
381 free (rd->rBuf); //and never open again
382#ifdef EVTDEBUG
383 free (rd->xBuf); //and never open again
384#endif
385 rd->rBuf = NULL;
386 return 0;
387 }
388
389
390 if (flag == 0) { //generate address and buffer ...
391 rd->Port = port;
392 rd->SockAddr.sin_family = sockAddr->sin_family;
393 rd->SockAddr.sin_port = htons (port);
394 rd->SockAddr.sin_addr = sockAddr->sin_addr;
395
396#ifdef EVTDEBUG
397 rd->xBuf = malloc (sizeof (CNV_FACT));
398#endif
399 rd->rBuf = malloc (sizeof (CNV_FACT));
400 if (rd->rBuf == NULL) {
401 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
402 factOut (kFatal, 774, str);
403 rd->sockStat = 77;
404 return -3;
405 }
406 }
407
408
409 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
410 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
411 factOut (kFatal, 773, str);
412 rd->sockStat = 88;
413 return -2;
414 }
415 optval = 1;
416 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
417 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
418 factOut (kInfo, 173, str); //but continue anyhow
419 }
420 optval = 10; //start after 10 seconds
421 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
422 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
423 factOut (kInfo, 173, str); //but continue anyhow
424 }
425 optval = 10; //do every 10 seconds
426 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
427 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
428 factOut (kInfo, 173, str); //but continue anyhow
429 }
430 optval = 2; //close after 2 unsuccessful tries
431 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
432 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
433 factOut (kInfo, 173, str); //but continue anyhow
434 }
435
436
437 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
438 factOut (kInfo, 773, str);
439 rd->sockStat = -1; //try to (re)open socket
440 rd->errCnt = 0;
441 return 0;
442
443} /*-----------------------------------------------------------------*/
444
445 /*-----------------------------------------------------------------*/
446
447
448
449
450int
451mBufInit ()
452{
453// initialize mBuffer (mark all entries as unused\empty)
454
455 int i,j;
456 uint32_t actime;
457
458 actime = g_actTime + 50000000;
459
460 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
461 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
462 mBuffer[i].runNum = 0;
463
464 evtCtrl.evtBuf[i] = -1;
465 evtCtrl.evtStat[i] = -1;
466 evtCtrl.pcTime[i] = actime; //initiate to far future
467
468 //ETIENNE
469 mBufferMapping[i].chunk = -1;
470 mBufferMapping[i].eventNumber = -1;
471 mBufferMapping[i].slot = -1;
472 //END ETIENNE
473 }
474 for (j=0;j<MAX_CHUNKS;j++)
475 EtiMemoryChunks[j].pointers[0] = NULL;
476
477 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
478
479 evtCtrl.frstPtr = 0;
480 evtCtrl.lastPtr = 0;
481
482 return 0;
483
484} /*-----------------------------------------------------------------*/
485
486
487
488
/*
 * mBufEvt - find or create the mBuffer[] entry for event evID of run runID.
 *
 * evID   : event id; also selects the candidate slots (evID % MAX_EVT)
 * runID  : run id
 * nRoi   : 9 region-of-interest values; [0..7] must all be equal and within
 *          0..1024, [8] (stored as nRoiTM) must not be smaller than [0]
 * sk     : socket id; sk/7 is used as board index for the bad-roi counters
 * fadlen : only used in the debug message
 * trgTyp, trgNum, fadNum : stored with a new event; for an already
 *          registered event a difference only increments Errors[2], [0], [1]
 *
 * make sure only complete Event are possible, so 'free' will always work
 *
 * Returns the index into mBuffer[], or a negative value in case of error:
 *   -9999  nRoi[0] out of range
 *   -9301  roi not consistent with the registered run
 *   -8201  roi not consistent with the already registered event
 *   -7101  nRoi[1..7] differ from nRoi[0]
 *   -7102  nRoi[8] smaller than nRoi[0]
 *   -1001  no run slot available
 *   -11    no event memory left
 *   -1     no control slot left
 */
int
mBufEvt (int evID, uint runID, int nRoi[], int sk,
         int fadlen, int trgTyp, int trgNum, int fadNum)
{
   struct timeval *tv, atv;
   tv = &atv;
   uint32_t tsec, tusec;
   uint oldest;
   int jold;

   int i, k, jr, b, evFree;
   int headmem = 0;             // NOTE(review): no longer used
   size_t needmem = 0;          // NOTE(review): stays 0, see evtLen below


   b = sk / 7;

   // --- sanity checks on the region of interest ---
   if (nRoi[0] < 0 || nRoi[0] > 1024) {
      snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
      factOut (kError, 999, str);
      gj.badRoiR++;
      gj.badRoi[b]++;
      return -9999;
   }

   for (jr = 1; jr < 8; jr++) {
      if (nRoi[jr] != nRoi[0]) {
         snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
                   nRoi[0]);
         factOut (kError, 711, str);
         gj.badRoiB++;
         gj.badRoi[b]++;
         return -7101;
      }
   }
   // NOTE(review): nRoi[8] has no upper bound here, while the event slots
   // are sized for 1024 samples - confirm that larger values cannot occur
   if (nRoi[8] < nRoi[0]) {
      snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
      factOut (kError, 712, str);
      gj.badRoiB++;
      gj.badRoi[b]++;
      return -7102;
   }


   // --- look for the event in its candidate slots: i, i+MAX_EVT, ... ---
   // NOTE(review): a negative evID gives a negative index here, and an
   // evNum < 0 is what marks an entry as free - presumably evID >= 0 is
   // guaranteed by the caller; verify
   i = evID % MAX_EVT;
   evFree = -1;

   for (k = 0; k < MAX_RUN; k++) {
      if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) {     //event is already registered;
         // is it ok ????
         if (mBuffer[i].nRoi != nRoi[0]
             || mBuffer[i].nRoiTM != nRoi[8]) {
            snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
                      nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
            factOut (kError, 821, str);
            gj.badRoiE++;
            gj.badRoi[b]++;
            return -8201;
         }
// count for inconsistencies

         if (mBuffer[i].trgNum != trgNum)
            mBuffer[i].Errors[0]++;
         if (mBuffer[i].fadNum != fadNum)
            mBuffer[i].Errors[1]++;
         if (mBuffer[i].trgTyp != trgTyp)
            mBuffer[i].Errors[2]++;

         //everything seems fine so far ==> use this slot ....
         return i;
      }
      if (evFree < 0 && mBuffer[i].evNum < 0)
         evFree = i;            // remember the first free candidate
      i += MAX_EVT;
   }


   //event does not yet exist; create it
   if (evFree < 0) {            //no space available in ctrl
      snprintf (str, MXSTR, "no control slot to keep event %d", evID);
      factOut (kError, 881, str);
      return -1;
   }
   i = evFree;                  //found free entry; use it ...

   gettimeofday (tv, NULL);
   tsec = atv.tv_sec;
   tusec = atv.tv_usec;

   //check if runId already registered in runCtrl
   // (evFree is reused: from here on it is an index into runCtrl[])
   evFree = -1;
   oldest = g_actTime + 1000;
   jold = -1;
   for (k = 0; k < MAX_RUN; k++) {
      if (runCtrl[k].runId == runID) {
         // events of a run that is already known are accepted regardless
         // of runCtrl[k].procId; only the roi must agree with the run
         if (runCtrl[k].roi0 != nRoi[0]
             || runCtrl[k].roi8 != nRoi[8]) {
            snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
                      nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
            factOut (kError, 931, str);
            gj.badRoiR++;
            gj.badRoi[b]++;
            return -9301;
         }
         goto RUNFOUND;
      } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
         evFree = k;
      } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
         // fallback candidate: the closed run with the oldest closeTime
         if (runCtrl[k].closeTime < oldest) {
            oldest = runCtrl[k].closeTime;
            jold = k;
         }
      }
   }

   if (evFree < 0 && jold < 0) {
      snprintf (str, MXSTR, "not able to register the new run %d", runID);
      factOut (kFatal, 883, str);
      return -1001;
   } else {
      if (evFree < 0)
         evFree = jold;
      snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
                evFree, nRoi[0], nRoi[8]);
      factOut (kInfo, 503, str);
      runCtrl[evFree].runId = runID;
      runCtrl[evFree].roi0 = nRoi[0];
      runCtrl[evFree].roi8 = nRoi[8];
      runCtrl[evFree].fileId = -2;
      runCtrl[evFree].procId = -2;
      runCtrl[evFree].lastEvt = -1;
      runCtrl[evFree].nextEvt = 0;
      runCtrl[evFree].actEvt = 0;
      runCtrl[evFree].procEvt = 0;
      runCtrl[evFree].maxEvt = 999999999;       //max number events allowed
      runCtrl[evFree].firstUsec = tusec;
      runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
      runCtrl[evFree].closeTime = tsec + 3600 * 24;     //max time allowed

      runTail[evFree].nEventsOk =
         runTail[evFree].nEventsRej =
         runTail[evFree].nEventsBad =
         runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
   }

 RUNFOUND:
   //ETIENNE
   // Event memory comes from the chunk allocator: one slot holds the board
   // headers followed by the EVENT. (The former per-event malloc of header,
   // data and buffer with its own memory-limit check was removed.)
   mBuffer[i].FADhead = ETI_Malloc(evID, i);
   mBuffer[i].buffer = NULL;
   if (mBuffer[i].FADhead != NULL)
      mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
   else
   {
      // no slot available: refresh the memory statistics and report; the
      // error is raised once (kError), repeats are only logged as kDebug
      // until mBufFree() sets gi_memStat back to +1
      mBuffer[i].fEvent = NULL;
      gj.usdMem = 0;
      for (k=0;k<numAllocatedChunks;k++)
         gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
      if (gj.usdMem > gj.maxMem)
         gj.maxMem = gj.usdMem;
      if (gi_memStat > 0) {
         gi_memStat = -99;
         snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
                   evID, sk);
         factOut (kError, 882, str);
      } else {
         snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
                   evID, sk);
         factOut (kDebug, 882, str);
      }
      return -11;
   }
   //END ETIENNE
   //flag all boards as unused
   mBuffer[i].nBoard = 0;
   for (k = 0; k < NBOARDS; k++) {
      mBuffer[i].board[k] = -1;
   }
   //flag all pixels as unused
   for (k = 0; k < NPIX; k++) {
      mBuffer[i].fEvent->StartPix[k] = -1;
   }
   //flag all TMark as unused
   for (k = 0; k < NTMARK; k++) {
      mBuffer[i].fEvent->StartTM[k] = -1;
   }

   mBuffer[i].fEvent->NumBoards = 0;
   mBuffer[i].fEvent->PCUsec = tusec;
   mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
   mBuffer[i].nRoi = nRoi[0];
   mBuffer[i].nRoiTM = nRoi[8];
   mBuffer[i].evNum = evID;
   mBuffer[i].runNum = runID;
   mBuffer[i].fadNum = fadNum;
   mBuffer[i].trgNum = trgNum;
   mBuffer[i].trgTyp = trgTyp;
   // NOTE(review): needmem is never computed any more, so evtLen is always 0
   mBuffer[i].evtLen = needmem;
   mBuffer[i].Errors[0] =
      mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
//ETIENNE
   // used memory = total size of all allocated chunks (not only used slots)
   gj.usdMem = 0;
   for (k=0;k<numAllocatedChunks;k++)
   {
      gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
   }
//END ETIENNE
   if (gj.usdMem > gj.maxMem)
      gj.maxMem = gj.usdMem;

   gj.bufTot++;
   if (gj.bufTot > gj.maxEvt)
      gj.maxEvt = gj.bufTot;

   gj.rateNew++;

   //register event in 'active list (reading)'

   evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
   evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
   evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
   evtIdx[i] = evtCtrl.lastPtr;


   snprintf (str, MXSTR,
             "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
             runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
   factOut (kDebug, -11, str);
   // advance the write pointer of the evtCtrl ring, wrapping at its end
   evtCtrl.lastPtr++;
   if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
      evtCtrl.lastPtr = 0;

   gi.evtGet++;

   return i;

} /*-----------------------------------------------------------------*/
787
788
789
790
791int
792mBufFree (int i)
793{
794//delete entry [i] from mBuffer:
795//(and make sure multiple calls do no harm ....)
796
797 int headmem = 0;
798 int evid;
799 size_t freemem = 0;
800
801 evid = mBuffer[i].evNum;
802 freemem = mBuffer[i].evtLen;
803 //ETIENNE
804 ETI_Free(evid, i);
805// free (mBuffer[i].fEvent);
806 mBuffer[i].fEvent = NULL;
807
808// free (mBuffer[i].FADhead);
809 mBuffer[i].FADhead = NULL;
810
811// free (mBuffer[i].buffer);
812 mBuffer[i].buffer = NULL;
813//END ETIENNE
814 headmem = NBOARDS * sizeof (PEVNT_HEADER);
815 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
816 mBuffer[i].runNum = 0;
817//ETIENNE
818// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
819 gj.usdMem = 0;
820 int k;
821 for (k=0;k<numAllocatedChunks;k++)
822 {
823 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
824 }
825//END ETIENNE
826 gj.bufTot--;
827
828 if (gi_memStat < 0) {
829 if (gj.usdMem <= 0.75 * gj.maxMem)
830 gi_memStat = +1;
831 }
832
833
834 return 0;
835
836} /*-----------------------------------------------------------------*/
837
838
839void
840resetEvtStat ()
841{
842 int i;
843
844 for (i = 0; i < MAX_SOCK; i++)
845 gi.numRead[i] = 0;
846
847 for (i = 0; i < NBOARDS; i++) {
848 gi.gotByte[i] = 0;
849 gi.gotErr[i] = 0;
850
851 }
852
853 gi.evtGet = 0; //#new Start of Events read
854 gi.evtTot = 0; //#complete Events read
855 gi.evtErr = 0; //#Events with Errors
856 gi.evtSkp = 0; //#Events incomplete (timeout)
857
858 gi.procTot = 0; //#Events processed
859 gi.procErr = 0; //#Events showed problem in processing
860 gi.procTrg = 0; //#Events accepted by SW trigger
861 gi.procSkp = 0; //#Events rejected by SW trigger
862
863 gi.feedTot = 0; //#Events used for feedBack system
864 gi.feedErr = 0; //#Events rejected by feedBack
865
866 gi.wrtTot = 0; //#Events written to disk
867 gi.wrtErr = 0; //#Events with write-error
868
869 gi.runOpen = 0; //#Runs opened
870 gi.runClose = 0; //#Runs closed
871 gi.runErr = 0; //#Runs with open/close errors
872
873 return;
874} /*-----------------------------------------------------------------*/
875
876
877
/* initReadFAD - placeholder; there is nothing to initialize here. */
void
initReadFAD ()
{
} /*-----------------------------------------------------------------*/
883
884
885
886void *
887readFAD (void *ptr)
888{
889/* *** main loop reading FAD data and sorting them to complete events */
890 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
891 int actBoards = 0, minLen;
892 int32_t jrd;
893 uint gi_SecTime; //time in seconds
894 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
895
896 int goodhed = 0;
897
898 int sockDef[NBOARDS]; //internal state of sockets
899 int jrdx;
900
901
902 struct timespec xwait;
903
904
905 struct timeval *tv, atv;
906 tv = &atv;
907 uint32_t tsec, tusec;
908
909
910 snprintf (str, MXSTR, "start initializing");
911 factOut (kInfo, -1, str);
912
913 int cpu = 7; //read thread
914 cpu_set_t mask;
915
916/* CPU_ZERO initializes all the bits in the mask to zero. */
917// CPU_ZERO (&mask);
918/* CPU_SET sets only the bit corresponding to cpu. */
919 cpu = 7;
920// CPU_SET (cpu, &mask);
921
922/* sched_setaffinity returns 0 in success */
923// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
924// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
925// factOut (kWarn, -1, str);
926// }
927
928
929 head_len = sizeof (PEVNT_HEADER);
930 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
931 minLen = head_len; //min #bytes needed to check header: full header for debug
932
933 start.S = 0xFB01;
934 stop.S = 0x04FE;
935
936/* initialize run control logics */
937 for (i = 0; i < MAX_RUN; i++) {
938 runCtrl[i].runId = 0;
939 runCtrl[i].fileId = -2;
940 runCtrl[i].procId = -2;
941 }
942 gi_resetS = gi_resetR = 9;
943 for (i = 0; i < NBOARDS; i++)
944 sockDef[i] = 0;
945
946 START:
947 gettimeofday (tv, NULL);
948 g_actTime = tsec = atv.tv_sec;
949 g_actUsec = tusec = atv.tv_usec;
950 gi_myRun = g_actTime;
951 evtCtrl.frstPtr = 0;
952 evtCtrl.lastPtr = 0;
953
954 gi_SecTime = g_actTime;
955 gi_runStat = g_runStat;
956 gj.readStat = g_runStat;
957 numok = numok2 = 0;
958
959 int cntsock = 8 - NUMSOCK ;
960
961 if (gi_resetS > 0) {
962 //make sure all sockets are preallocated as 'not exist'
963 for (i = 0; i < MAX_SOCK; i++) {
964 rd[i].socket = -1;
965 rd[i].sockStat = 99;
966 }
967
968 for (k = 0; k < NBOARDS; k++) {
969 gi_NumConnect[k] = 0;
970 gi.numConn[k] = 0;
971 gj.numConn[k] = 0;
972 gj.errConn[k] = 0;
973 gj.rateBytes[k] = 0;
974 gj.totBytes[k] = 0;
975 }
976
977 }
978
979
980 if (gi_resetR > 0) {
981 resetEvtStat ();
982 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
983 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
984 gj.totMem = g_maxMem;
985 gj.bufNew = gj.bufEvt = 0;
986 gj.badRoiE = gj.badRoiR = gj.badRoiB =
987 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
988
989 int b,p;
990 for (b = 0; b < NBOARDS; b++)
991 gj.badRoi[b] = 0;
992
993 mBufInit (); //initialize buffers
994
995 snprintf (str, MXSTR, "end initializing");
996 factOut (kInfo, -1, str);
997 }
998
999
1000 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1001
1002 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1003
1004 gi_runStat = g_runStat;
1005 gj.readStat = g_runStat;
1006 gettimeofday (tv, NULL);
1007 g_actTime = tsec = atv.tv_sec;
1008 g_actUsec = tusec = atv.tv_usec;
1009
1010
1011 int b, p, p0, s0, nch;
1012 nch = 0;
1013 for (b = 0; b < NBOARDS; b++) {
1014 k = b * 7;
1015 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1016 nch++;
1017 gi_NumConnect[b] = 0; //must close all connections
1018 gi.numConn[b] = 0;
1019 gj.numConn[b] = 0;
1020 if (sockDef[b] == 0)
1021 s0 = 0; //sockets to be defined and opened
1022 else if (g_port[b].sockDef == 0)
1023 s0 = -1; //sockets to be destroyed
1024 else
1025 s0 = +1; //sockets to be closed and reopened
1026
1027 if (s0 == 0)
1028 p0 = ntohs (g_port[b].sockAddr.sin_port);
1029 else
1030 p0 = 0;
1031
1032 for (p = p0 + 1; p < p0 + 8; p++) {
1033 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1034 k++;
1035 }
1036 sockDef[b] = g_port[b].sockDef;
1037 }
1038 }
1039
1040 if (nch > 0) {
1041 actBoards = 0;
1042 for (b = 0; b < NBOARDS; b++) {
1043 if (sockDef[b] > 0)
1044 actBoards++;
1045 }
1046 }
1047
1048
1049 jrdx = 0;
1050 numokx = 0;
1051 numok = 0; //count number of succesfull actions
1052
1053 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1054 b = i / 7 ;
1055 p = i % 7 ;
1056
1057if ( p >= NUMSOCK) { ; }
1058else {
1059 if (sockDef[b] > 0)
1060 s0 = +1;
1061 else
1062 s0 = -1;
1063
1064 if (rd[i].sockStat < 0) { //try to connect if not yet done
1065 if (rd[i].sockStat == -1) {
1066 rd[i].sockStat = connect (rd[i].socket,
1067 (struct sockaddr *) &rd[i].SockAddr,
1068 sizeof (rd[i].SockAddr));
1069 if (rd[i].sockStat == -1) {
1070 rd[i].errCnt++ ;
1071// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1072// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1073// else rd[i].sockStat = 10000 ;
1074 }
1075//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1076
1077 }
1078 if (rd[i].sockStat < -1 ) {
1079 rd[i].sockStat++ ;
1080 }
1081 if (rd[i].sockStat == 0) { //successfull ==>
1082 if (sockDef[b] > 0) {
1083 rd[i].bufTyp = 0; // expect a header
1084 rd[i].bufLen = frst_len; // max size to read at begining
1085 } else {
1086 rd[i].bufTyp = -1; // full data to be skipped
1087 rd[i].bufLen = MAX_LEN; //huge for skipping
1088 }
1089 rd[i].bufPos = 0; // no byte read so far
1090 rd[i].skip = 0; // start empty
1091// gi_NumConnect[b]++;
1092 gi_NumConnect[b] += cntsock ;
1093
1094 gi.numConn[b]++;
1095 gj.numConn[b]++;
1096 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1097 factOut (kInfo, -1, str);
1098 }
1099 }
1100
1101 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1102 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1103 numok++;
1104 size_t maxread = rd[i].bufLen ;
1105 if (maxread > MAXREAD ) maxread=MAXREAD ;
1106
1107 jrd =
1108 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1109 maxread, MSG_DONTWAIT);
1110// rd[i].bufLen, MSG_DONTWAIT);
1111 if (jrd > 0) {
1112 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1113#ifdef EVTDEBUG
1114 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1115 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1116 snprintf (str, MXSTR,
1117 "read sock %3d bytes %5d len %5d first %d %d", i,
1118 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1119 rd[i].rBuf->B[rd[i].bufPos + 1]);
1120 factOut (kDebug, 301, str);
1121#endif
1122 }
1123
1124 if (jrd == 0) { //connection has closed ...
1125 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1126 factOut (kInfo, 441, str);
1127 GenSock (s0, i, 0, NULL, &rd[i]);
1128 gi.gotErr[b]++;
1129// gi_NumConnect[b]--;
1130 gi_NumConnect[b]-= cntsock ;
1131 gi.numConn[b]--;
1132 gj.numConn[b]--;
1133
1134 } else if (jrd < 0) { //did not read anything
1135 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1136 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1137 factOut (kError, 442, str);
1138 gi.gotErr[b]++;
1139 } else
1140 numok--; //else nothing waiting to be read
1141 jrd = 0;
1142 }
1143 } else {
1144 jrd = 0; //did read nothing as requested
1145 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1146 rd[i].bufLen);
1147 factOut (kDebug, 301, str);
1148 }
1149
1150 gi.gotByte[b] += jrd;
1151 gj.rateBytes[b] += jrd;
1152
1153 if (jrd > 0) {
1154 numokx++;
1155 jrdx += jrd;
1156 }
1157
1158
1159 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1160// just do nothing
1161#ifdef EVTDEBUG
1162 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1163 i);
1164 factOut (kInfo, 301, str);
1165#endif
1166
1167 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1168 if (jrd < rd[i].bufLen) { //not yet all read
1169 rd[i].bufPos += jrd; //==> prepare for continuation
1170 rd[i].bufLen -= jrd;
1171 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1172 } else { //full dataset read
1173 rd[i].bufLen = 0;
1174 rd[i].bufPos = rd[i].fadLen;
1175 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1176 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1177 gi.evtErr++;
1178 snprintf (str, MXSTR,
1179 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1180 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1181 rd[i].rBuf->B[1],
1182 rd[i].rBuf->B[rd[i].bufPos - 2],
1183 rd[i].rBuf->B[rd[i].bufPos - 1]);
1184 factOut (kError, 301, str);
1185 goto EndBuf;
1186
1187#ifdef EVTDEBUG
1188 } else {
1189 snprintf (str, MXSTR,
1190 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1191 i, rd[i].fadLen, rd[i].rBuf->B[0],
1192 rd[i].rBuf->B[1], start.B[1], start.B[0],
1193 rd[i].rBuf->B[rd[i].bufPos - 2],
1194 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1195 stop.B[0]);
1196 factOut (kDebug, 301, str);
1197#endif
1198 }
1199
1200 if (jrd > 0)
1201 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1202
1203/* //we have a complete buffer, copy to WORK area
1204 int jr, kr;
1205 int checkRoi;
1206 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1207 for (kr = 1; kr < 4; kr++)
1208 {
1209 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1210 + kr*(roi[jr-1]+4)]);
1211 if (checkRoi != roi[0])
1212 {
1213 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1214 factOut (kFatal, 1, str);
1215 goto EndBuf;
1216 }
1217
1218 }
1219 for (jr = 1; jr < 9; jr++) {
1220 roi[jr] =
1221 ntohs (rd[i].
1222 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1223
1224 }
1225*/
1226 //we have a complete buffer, copy to WORK area
1227
1228
1229 //End of the header. to channels now
1230 int eCount = 0;
1231 int eStart = 36;
1232 int ePatchesCount = 0;
1233 int currentRoi;
1234 for (;ePatchesCount<4*9;ePatchesCount++)
1235 {
1236 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//id
1237 eCount++;
1238 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//start_cell
1239 eCount++;
1240 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//roi
1241 currentRoi = rd[i].rBuf->S[eStart+eCount];
1242 eCount++;
1243 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//filling
1244 eCount++;
1245 eCount+=currentRoi;//skip the pixel data
1246 }
1247 // snprintf(str, MXSTR, "Swapped %d bytes", eCount*2);
1248 // factOut(kError, 000, str);
1249
1250 //channels done. footer now
1251 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//package_crc
1252 // eCount++;
1253 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//end_package_flag
1254 // snprintf(str, MXSTR, "Bytes read %d bytes swapped %d", jrd, eCount*2);
1255 // factOut(kInfo, 000, str);
1256 //ETIENNE end of bytes swapping already
1257
1258
1259 int jr, kr;
1260 int checkRoi;
1261 int roiHopper = head_len/2 + 2; //points to the very first roi
1262 roi[0] = rd[i].rBuf->S[roiHopper];
1263 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1264 for (kr = 1; kr < 4; kr++)
1265 {
1266 checkRoi = rd[i].rBuf->S[roiHopper];
1267 if (checkRoi != roi[0])
1268 {
1269 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1270 factOut (kError, 1, str);
1271 goto EndBuf;
1272 }
1273 roiHopper += checkRoi+4;
1274 }
1275 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1276 for (jr = 1; jr < 9; jr++) {
1277 roi[jr] = rd[i].rBuf->S[roiHopper];
1278 checkRoi = roi[jr];
1279 for (kr = 1; kr < 4; kr++)
1280 {
1281 roiHopper += checkRoi+4;
1282 checkRoi = rd[i].rBuf->S[roiHopper];
1283 if (checkRoi != roi[jr])
1284 {
1285 snprintf (str, MXSTR, "Inconsistent Roi accross board patches B=%d P=%d %d %d", kr, jr, roi[jr], checkRoi);
1286 factOut (kFatal, 1, str);
1287 goto EndBuf;
1288 }
1289 }
1290 }
1291 //get index into mBuffer for this event (create if needed)
1292
1293 int actid;
1294 if (g_useFTM > 0)
1295 actid = rd[i].evtID;
1296 else
1297 actid = rd[i].ftmID;
1298
1299 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1300 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1301 rd[i].evtID);
1302
1303 if (evID < -1000) {
1304 goto EndBuf; //not usable board/event/run --> skip it
1305 }
1306 if (evID < 0) { //no space left, retry later
1307#ifdef EVTDEBUG
1308 if (rd[i].bufLen != 0) {
1309 snprintf (str, MXSTR, "something screwed up");
1310 factOut (kFatal, 1, str);
1311 }
1312#endif
1313 xwait.tv_sec = 0;
1314 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1315 nanosleep (&xwait, NULL);
1316 goto EndBuf1; //hope there is free space next round
1317 }
1318 //we have a valid entry in mBuffer[]; fill it
1319
1320#ifdef EVTDEBUG
1321 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1322 rd[i].fadLen);
1323 if (xchk != 0) {
1324 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1325 xchk, rd[i].fadLen, i);
1326 factOut (kFatal, 1, str);
1327
1328 uint iq;
1329 for (iq = 0; iq < rd[i].fadLen; iq++) {
1330 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1331 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1332 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1333 factOut (kFatal, 1, str);
1334 }
1335 }
1336 }
1337#endif
1338 int qncpy = 0;
1339 boardId = b;
1340 int fadBoard = rd[i].rBuf->S[12];
1341 int fadCrate = fadBoard / 256;
1342 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1343 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1344 fadCrate, fadBoard % 256, boardId);
1345 factOut (kWarn, 301, str);
1346 }
1347 if (mBuffer[evID].board[boardId] != -1) {
1348 snprintf (str, MXSTR,
1349 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1350 evID, boardId, i, rd[i].fadLen,
1351 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1352 rd[i].rBuf->B[rd[i].bufPos - 2],
1353 rd[i].rBuf->B[rd[i].bufPos - 1]);
1354 factOut (kWarn, 501, str);
1355 goto EndBuf; //--> skip Board
1356 }
1357
1358 int iDx = evtIdx[evID]; //index into evtCtrl
1359
1360 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1361 &rd[i].rBuf->S[0], head_len);
1362 qncpy += head_len;
1363
1364 src = head_len / 2;
1365 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1366 for (drs = 0; drs < 4; drs++) {
1367 pixH = rd[i].rBuf->S[src++]; // ID
1368 pixC = rd[i].rBuf->S[src++]; // start-cell
1369 pixR = rd[i].rBuf->S[src++]; // roi
1370//here we should check if pixH is correct ....
1371
1372 pixS = boardId * 36 + drs * 9 + px;
1373 src++;
1374
1375
1376 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1377 dest = pixS * roi[0];
1378 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1379 &rd[i].rBuf->S[src], roi[0] * 2);
1380 qncpy += roi[0] * 2;
1381 src += pixR;
1382
1383 if (px == 8) {
1384 tmS = boardId * 4 + drs;
1385 if (pixR > roi[0]) { //and we have additional TM info
1386 dest = tmS * roi[0] + NPIX * roi[0];
1387 int srcT = src - roi[0];
1388 mBuffer[evID].fEvent->StartTM[tmS] =
1389 (pixC + pixR - roi[0]) % 1024;
1390 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1391 &rd[i].rBuf->S[srcT], roi[0] * 2);
1392 qncpy += roi[0] * 2;
1393 } else {
1394 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1395 //ETIENNE because the TM channels are always processed during drs calib,
1396 //set them to zero if they are not present
1397 //I suspect that it may be more efficient to set all the allocated mem to
1398 //zero when allocating it
1399// dest = tmS*roi[0] + NPIX*roi[0];
1400 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1401 }
1402 }
1403 }
1404 } // now we have stored a new board contents into Event structure
1405
1406 mBuffer[evID].fEvent->NumBoards++;
1407 mBuffer[evID].board[boardId] = boardId;
1408 evtCtrl.evtStat[iDx]++;
1409 evtCtrl.pcTime[iDx] = g_actTime;
1410
1411 if (++mBuffer[evID].nBoard >= actBoards) {
1412 int qnrun = 0;
1413 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1414 actrun = mBuffer[evID].runNum;
1415 int ir;
1416 for (ir = 0; ir < MAX_RUN; ir++) {
1417 qnrun++;
1418 if (runCtrl[ir].runId == actrun) {
1419 if (++runCtrl[ir].lastEvt == 0) {
1420 gotNewRun (actrun, mBuffer[evID].FADhead);
1421 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1422 mBuffer[evID].runNum,
1423 mBuffer[evID].evNum);
1424 factOut (kInfo, 1, str);
1425 break;
1426 }
1427 }
1428 }
1429 }
1430 snprintf (str, MXSTR,
1431 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1432 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1433 qncpy, qnrun);
1434 factOut (kDebug, -1, str);
1435
1436 //complete event read ---> flag for next processing
1437 evtCtrl.evtStat[iDx] = 99;
1438 gi.evtTot++;
1439 }
1440
1441 EndBuf:
1442 rd[i].bufTyp = 0; //ready to read next header
1443 rd[i].bufLen = frst_len;
1444 rd[i].bufPos = 0;
1445 EndBuf1:
1446 ;
1447 }
1448
1449 } else { //we are reading event header
1450 rd[i].bufPos += jrd;
1451 rd[i].bufLen -= jrd;
1452 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1453 //check if startflag correct; else shift block ....
1454 for (k = 0; k < rd[i].bufPos - 1; k++) {
1455 if (rd[i].rBuf->B[k] == start.B[1]
1456 && rd[i].rBuf->B[k + 1] == start.B[0])
1457 break;
1458 }
1459 rd[i].skip += k;
1460
1461 if (k >= rd[i].bufPos - 1) { //no start of header found
1462 rd[i].bufPos = 0;
1463 rd[i].bufLen = head_len;
1464 } else if (k > 0) {
1465 rd[i].bufPos -= k;
1466 rd[i].bufLen += k;
1467 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1468 rd[i].bufPos);
1469#ifdef EVTDEBUG
1470 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1471 rd[i].bufPos);
1472#endif
1473 }
1474 if (rd[i].bufPos >= minLen) {
1475 if (rd[i].skip > 0) {
1476 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1477 rd[i].skip, i);
1478 factOut (kInfo, 666, str);
1479 rd[i].skip = 0;
1480 }
1481 goodhed++;
1482 //ETIENNE swap the header bytes already
1483 int eStart = 0;
1484 rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//start_package_flag
1485 rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//package_length
1486 rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//version_no
1487 rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//PLLLCK
1488 rd[i].rBuf->S[eStart+4] = ntohs(rd[i].rBuf->S[eStart+4]);//trigger_crc
1489 rd[i].rBuf->S[eStart+5] = ntohs(rd[i].rBuf->S[eStart+5]);//trigger_type
1490 rd[i].rBuf->I[eStart+3] = ntohl(rd[i].rBuf->I[eStart+3]);//trigger_id
1491 rd[i].rBuf->I[eStart+4] = ntohl(rd[i].rBuf->I[eStart+4]);//fad_evt_counter
1492 rd[i].rBuf->I[eStart+5] = ntohl(rd[i].rBuf->I[eStart+5]);//REFCLK_frequency
1493 rd[i].rBuf->S[eStart+12] = ntohs(rd[i].rBuf->S[eStart+12]);//board_id
1494 rd[i].rBuf->S[eStart+14] = ntohs(rd[i].rBuf->S[eStart+14]);//number_of_triggers_to_generate
1495 rd[i].rBuf->S[eStart+15] = ntohs(rd[i].rBuf->S[eStart+15]);//trigger_generator_prescaler
1496 // int eInter = ntohl(rd[i].rBuf->I[eStart+8]);//DNA
1497 // rd[i].rBuf->I[eStart+8] = ntohl(rd[i].rBuf->I[eStart+9]);
1498 //rd[i].rBuf->I[eStart+9] = eInter;
1499 rd[i].rBuf->I[eStart+10] = ntohl(rd[i].rBuf->I[eStart+10]);//time
1500 rd[i].rBuf->I[eStart+11] = ntohl(rd[i].rBuf->I[eStart+11]);//runnumber;
1501 int eCount = 24;
1502 for (;eCount<24+NTemp;eCount++)
1503 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//drs_temperature
1504 for (; eCount < 24+NTemp+NDAC; eCount++)
1505 rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//dac
1506
1507
1508 // rd[i].fadLen = rd[i].rBuf->S[1] * 2;
1509// snprintf(str, MXSTR, "Size of the fad buffer: %d", rd[i].fadLen);
1510// factOut(kError, 000, str);
1511
1512 rd[i].fadVers = rd[i].rBuf->S[2];
1513 rd[i].ftmTyp = rd[i].rBuf->S[5];
1514 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1515 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1516 rd[i].runID = rd[i].rBuf->I[11];
1517 rd[i].bufTyp = 1; //ready to read full record
1518 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1519
1520 int fadboard = rd[i].rBuf->S[12];
1521 int fadcrate = fadboard / 256;
1522 fadboard = (fadcrate * 10 + fadboard % 256);
1523#ifdef EVTDEBUG
1524 snprintf (str, MXSTR,
1525 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1526 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1527 rd[i].runID, fadboard, jrd);
1528 factOut (kDebug, 1, str);
1529#endif
1530
1531 if (rd[i].runID == 0)
1532 rd[i].runID = gi_myRun;
1533
1534 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1535 snprintf (str, MXSTR,
1536 "illegal event-length on port %d", i);
1537 factOut (kFatal, 881, str);
1538 rd[i].bufLen = 100000; //?
1539 }
1540 int fadBoard = rd[i].rBuf->S[12];
1541 debugHead (i, fadBoard, rd[i].rBuf);
1542 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1543 } else {
1544 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1545 }
1546 } else {
1547 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1548 }
1549
1550 } //end interpreting last read
1551}
1552 } //end of successful read anything
1553 } //finished trying to read all sockets
1554
1555#ifdef EVTDEBUG
1556 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1557 factOut (kDebug, -1, str);
1558#endif
1559
1560 gi.numRead[numok]++;
1561
1562 g_actTime = time (NULL);
1563 if (g_actTime > gi_SecTime) {
1564 gi_SecTime = g_actTime;
1565
1566
1567 //loop over all active events and flag those older than read-timeout
1568 //delete those that are written to disk ....
1569
1570 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1571 if (kd < 0)
1572 kd += (MAX_EVT * MAX_RUN);
1573
1574 gj.bufNew = gj.bufEvt = 0;
1575 int k1 = evtCtrl.frstPtr;
1576 for (k = k1; k < (k1 + kd); k++) {
1577 int k0 = k % (MAX_EVT * MAX_RUN);
1578//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1579
1580 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1581 gj.bufNew++; //incomplete event in Buffer
1582 if (evtCtrl.evtStat[k0] < 90
1583 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1584 int id = evtCtrl.evtBuf[k0];
1585 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1586 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1587 evtCtrl.evtStat[k0]);
1588 factOut (kWarn, 601, str);
1589
1590 int ik,ib,jb;
1591 ik=0;
1592 for (ib=0; ib<NBOARDS; ib++) {
1593 if (ib%10==0) {
1594 snprintf (&str[ik], MXSTR, "'");
1595 ik++;
1596 }
1597 jb = mBuffer[id].board[ib];
1598 if ( jb<0 ) {
1599 if (gi_NumConnect[b] >0 ) {
1600 snprintf (&str[ik], MXSTR, ".");
1601 } else {
1602 snprintf (&str[ik], MXSTR, "x");
1603 }
1604 } else {
1605 snprintf (&str[ik], MXSTR, "%d",jb%10);
1606 }
1607 ik++;
1608 }
1609 snprintf (&str[ik], MXSTR, "'");
1610 factOut (kWarn, 601, str);
1611
1612 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1613 gi.evtSkp++;
1614 gi.evtTot++;
1615 gj.evtSkip++;
1616 }
1617 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1618 || evtCtrl.evtStat[k0] == 0) { //'useless'
1619
1620 int id = evtCtrl.evtBuf[k0];
1621 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1622 mBuffer[id].evNum, mBuffer[id].nBoard);
1623 factOut (kDebug, -1, str);
1624 mBufFree (id); //event written--> free memory
1625 evtCtrl.evtStat[k0] = -1;
1626 gj.evtWrite++;
1627 gj.rateWrite++;
1628 } else if (evtCtrl.evtStat[k0] >= 95) {
1629 gj.bufEvt++; //complete event in Buffer
1630 }
1631
1632 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1633 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1634 }
1635 }
1636
1637
1638 gj.deltaT = 1000; //temporary, must be improved
1639
1640 int b;
1641 for (b = 0; b < NBOARDS; b++)
1642 gj.totBytes[b] += gj.rateBytes[b];
1643 gj.totMem = g_maxMem;
1644 if (gj.maxMem > gj.xxxMem)
1645 gj.xxxMem = gj.maxMem;
1646 if (gj.maxEvt > gj.xxxEvt)
1647 gj.xxxEvt = gj.maxEvt;
1648
1649 factStat (gj);
1650 factStatNew (gi);
1651 gj.rateNew = gj.rateWrite = 0;
1652 gj.maxMem = gj.usdMem;
1653 gj.maxEvt = gj.bufTot;
1654 for (b = 0; b < NBOARDS; b++)
1655 gj.rateBytes[b] = 0;
1656 }
1657
1658 if (numok > 0)
1659 numok2 = 0;
1660 else if (numok2++ > 3) {
1661 if (g_runStat == 1) {
1662 xwait.tv_sec = 1;
1663 xwait.tv_nsec = 0; // hibernate for 1 sec
1664 } else {
1665 xwait.tv_sec = 0;
1666 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1667 }
1668 nanosleep (&xwait, NULL);
1669 }
1670
1671 } //and do next loop over all sockets ...
1672
1673
1674 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1675 factOut (kInfo, -1, str);
1676
1677 if (g_reset > 0) {
1678 gi_reset = g_reset;
1679 gi_resetR = gi_reset % 10; //shall we stop reading ?
1680 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1681 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1682 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1683 g_reset = 0;
1684 } else {
1685 gi_reset = 0;
1686 if (g_runStat == -1)
1687 gi_resetR = 1;
1688 else
1689 gi_resetR = 7;
1690 gi_resetS = 7; //close all sockets
1691 gi_resetW = 7; //close all files
1692 gi_resetX = 0;
1693
1694 //inform others we have to quit ....
1695 gi_runStat = -11; //inform all that no update to happen any more
1696 gj.readStat = -11; //inform all that no update to happen any more
1697 }
1698
1699 if (gi_resetS > 0) {
1700 //must close all open sockets ...
1701 snprintf (str, MXSTR, "close all sockets ...");
1702 factOut (kInfo, -1, str);
1703 for (i = 0; i < MAX_SOCK; i++) {
1704 if (rd[i].sockStat == 0) {
1705 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1706 if (i % 7 == 0) {
1707// gi_NumConnect[i / 7]--;
1708 gi_NumConnect[i / 7]-= cntsock ;
1709 gi.numConn[i / 7]--;
1710 gj.numConn[i / 7]--;
1711 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1712 rd[i / 7].sockStat = -1; //and try to open asap
1713 }
1714 }
1715 }
1716 }
1717
1718
1719 if (gi_resetR > 0) {
1720 //flag all events as 'read finished'
1721 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1722 if (kd < 0)
1723 kd += (MAX_EVT * MAX_RUN);
1724
1725 int k1 = evtCtrl.frstPtr;
1726 for (k = k1; k < (k1 + kd); k++) {
1727 int k0 = k % (MAX_EVT * MAX_RUN);
1728 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1729 evtCtrl.evtStat[k0] = 91;
1730 gi.evtSkp++;
1731 gi.evtTot++;
1732 }
1733 }
1734
1735 xwait.tv_sec = 0;
1736 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1737 nanosleep (&xwait, NULL);
1738
1739 //and clear all buffers (might have to wait until all others are done)
1740 int minclear;
1741 if (gi_resetR == 1) {
1742 minclear = 900;
1743 snprintf (str, MXSTR, "drain all buffers ...");
1744 } else {
1745 minclear = 0;
1746 snprintf (str, MXSTR, "flush all buffers ...");
1747 }
1748 factOut (kInfo, -1, str);
1749
1750 int numclear = 1;
1751 while (numclear > 0) {
1752 numclear = 0;
1753 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1754 if (kd < 0)
1755 kd += (MAX_EVT * MAX_RUN);
1756
1757 int k1 = evtCtrl.frstPtr;
1758 for (k = k1; k < (k1 + kd); k++) {
1759 int k0 = k % (MAX_EVT * MAX_RUN);
1760 if (evtCtrl.evtStat[k0] > minclear) {
1761 int id = evtCtrl.evtBuf[k0];
1762 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1763 mBuffer[id].evNum, mBuffer[id].nBoard);
1764 factOut (kDebug, -1, str);
1765 mBufFree (id); //event written--> free memory
1766 evtCtrl.evtStat[k0] = -1;
1767 } else if (evtCtrl.evtStat[k0] > 0)
1768 numclear++; //writing is still ongoing...
1769
1770 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1771 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1772 }
1773
1774 xwait.tv_sec = 0;
1775 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1776 nanosleep (&xwait, NULL);
1777 }
1778 }
1779
1780 if (gi_reset > 0) {
1781 if (gi_resetW > 0) {
1782 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1783 }
1784 if (gi_resetX > 0) {
1785 xwait.tv_sec = gi_resetX;
1786 xwait.tv_nsec = 0;
1787 nanosleep (&xwait, NULL);
1788 }
1789
1790 snprintf (str, MXSTR, "Continue read Process ...");
1791 factOut (kInfo, -1, str);
1792 gi_reset = 0;
1793 goto START;
1794 }
1795
1796
1797
1798 snprintf (str, MXSTR, "Exit read Process ...");
1799 factOut (kInfo, -1, str);
1800 gi_runStat = -99;
1801 gj.readStat = -99;
1802 factStat (gj);
1803 factStatNew (gi);
1804 return 0;
1805
1806} /*-----------------------------------------------------------------*/
1807
1808
1809void *
1810subProc (void *thrid)
1811{
1812 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1813 struct timespec xwait;
1814
1815 int32_t cntr ;
1816
1817 threadID = (int) thrid;
1818
1819 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1820 factOut (kInfo, -1, str);
1821
1822 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1823 numWait = numProc = 0;
1824 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1825 if (kd < 0)
1826 kd += (MAX_EVT * MAX_RUN);
1827
1828 int k1 = evtCtrl.frstPtr;
1829 for (k = k1; k < (k1 + kd); k++) {
1830 int k0 = k % (MAX_EVT * MAX_RUN);
1831
1832 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1833 if (gi_resetR > 1) { //we are asked to flush buffers asap
1834 jret = 9100; //flag to be deleted
1835 } else {
1836 int id = evtCtrl.evtBuf[k0];
1837
1838
1839 jret =
1840 subProcEvt (threadID, mBuffer[id].FADhead,
1841 mBuffer[id].fEvent, mBuffer[id].buffer);
1842
1843
1844 if (jret <= threadID) {
1845 snprintf (str, MXSTR,
1846 "process %d wants to send to process %d",
1847 threadID, jret);
1848 factOut (kError, -1, str);
1849 jret = 5300;
1850 } else if (jret <= 0)
1851 jret = 9200 + threadID; //flag as 'to be deleted'
1852 else if (jret >= gi_maxProc)
1853 jret = 5200 + threadID; //flag as 'to be written'
1854 else
1855 jret = 1000 + jret; //flag for next proces
1856 }
1857 evtCtrl.evtStat[k0] = jret;
1858 numProc++;
1859 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1860 numWait++;
1861 }
1862
1863 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1864 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1865 factOut (kInfo, -1, str);
1866 return 0;
1867 }
1868 if (numProc == 0) {
1869 //seems we have nothing to do, so sleep a little
1870 xwait.tv_sec = 0;
1871 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1872 nanosleep (&xwait, NULL);
1873 }
1874 }
1875
1876 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1877 factOut (kInfo, -1, str);
1878 return;
1879} /*-----------------------------------------------------------------*/
1880
1881
1882void *
1883procEvt (void *ptr)
1884{
1885/* *** main loop processing file, including SW-trigger */
1886 int numProc, numWait;
1887 int k, status, j;
1888 struct timespec xwait;
1889 char str[MXSTR];
1890
1891
1892
1893 int lastRun = 0; //usually run from last event still valid
1894
1895 cpu_set_t mask;
1896 int cpu = 1; //process thread (will be several in final version)
1897
1898 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1899 gi_maxProc);
1900 factOut (kInfo, -1, str);
1901
1902/* CPU_ZERO initializes all the bits in the mask to zero. */
1903// CPU_ZERO (&mask);
1904/* CPU_SET sets only the bit corresponding to cpu. */
1905// CPU_SET( 0 , &mask ); leave for system
1906// CPU_SET( 1 , &mask ); used by write process
1907// CPU_SET (2, &mask);
1908// CPU_SET (3, &mask);
1909// CPU_SET (4, &mask);
1910// CPU_SET (5, &mask);
1911// CPU_SET (6, &mask);
1912// CPU_SET( 7 , &mask ); used by read process
1913/* sched_setaffinity returns 0 in success */
1914// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1915// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1916// factOut (kWarn, -1, str);
1917// }
1918
1919
1920 pthread_t thread[100];
1921 int th_ret[100];
1922
1923 for (k = 0; k < gi_maxProc; k++) {
1924 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1925 }
1926
1927 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1928
1929 numWait = numProc = 0;
1930 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1931 if (kd < 0)
1932 kd += (MAX_EVT * MAX_RUN);
1933
1934 int k1 = evtCtrl.frstPtr;
1935 for (k = k1; k < (k1 + kd); k++) {
1936 int k0 = k % (MAX_EVT * MAX_RUN);
1937//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1938 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1939
1940 if (gi_resetR > 1) { //we are asked to flush buffers asap
1941 evtCtrl.evtStat[k0] = 9991;
1942 } else {
1943
1944//-------- it is better to open the run already here, so call can be used to initialize
1945//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1946 int id = evtCtrl.evtBuf[k0];
1947 uint32_t irun = mBuffer[id].runNum;
1948 int32_t ievt = mBuffer[id].evNum;
1949 if (runCtrl[lastRun].runId == irun) {
1950 j = lastRun;
1951 } else {
1952 //check which fileID to use (or open if needed)
1953 for (j = 0; j < MAX_RUN; j++) {
1954 if (runCtrl[j].runId == irun)
1955 break;
1956 }
1957 if (j >= MAX_RUN) {
1958 snprintf (str, MXSTR,
1959 "P error: can not find run %d for event %d in %d",
1960 irun, ievt, id);
1961 factOut (kFatal, 901, str);
1962 }
1963 lastRun = j;
1964 }
1965
1966 if (runCtrl[j].fileId < 0) {
1967//---- we need to open a new run ==> make sure all older runs are
1968//---- finished and marked to be closed ....
1969 int j1;
1970 for (j1 = 0; j1 < MAX_RUN; j1++) {
1971 if (runCtrl[j1].fileId == 0) {
1972 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1973//---- problem: processing still going on ==> must wait for closing ....
1974 snprintf (str, MXSTR,
1975 "P finish run since new one opened %d",
1976 runCtrl[j1].runId);
1977 runFinish1 (runCtrl[j1].runId);
1978 }
1979
1980 }
1981
1982 actRun.Version = 1;
1983 actRun.RunType = -1; //to be adapted
1984
1985 actRun.Nroi = runCtrl[j].roi0;
1986 actRun.NroiTM = runCtrl[j].roi8;
1987//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1988// if (actRun.Nroi == actRun.NroiTM)
1989// actRun.NroiTM = 0;
1990 actRun.RunTime = runCtrl[j].firstTime;
1991 actRun.RunUsec = runCtrl[j].firstTime;
1992 actRun.NBoard = NBOARDS;
1993 actRun.NPix = NPIX;
1994 actRun.NTm = NTMARK;
1995 actRun.Nroi = mBuffer[id].nRoi;
1996 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1997 NBOARDS * sizeof (PEVNT_HEADER));
1998
1999 runCtrl[j].fileHd =
2000 runOpen (irun, &actRun, sizeof (actRun));
2001 if (runCtrl[j].fileHd == NULL) {
2002 snprintf (str, MXSTR,
2003 "P could not open a file for run %d", irun);
2004 factOut (kError, 502, str);
2005 runCtrl[j].fileId = 91;
2006 runCtrl[j].procId = 91;
2007 } else {
2008 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
2009 irun, ievt);
2010 factOut (kInfo, -1, str);
2011 runCtrl[j].fileId = 0;
2012 runCtrl[j].procId = 0;
2013 }
2014
2015 }
2016//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
2017 if (runCtrl[j].procId == 0) {
2018 if (runCtrl[j].closeTime < g_actTime
2019 || runCtrl[j].lastTime < g_actTime - 300
2020 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
2021 snprintf (str, MXSTR,
2022 "P reached end of run condition for run %d",
2023 irun);
2024 factOut (kInfo, 502, str);
2025 runFinish1 (runCtrl[j].runId);
2026 runCtrl[j].procId = 1;
2027 }
2028 }
2029 if (runCtrl[j].procId != 0) {
2030 snprintf (str, MXSTR,
2031 "P skip event %d because no active run %d", ievt,
2032 irun);
2033 factOut (kDebug, 502, str);
2034 evtCtrl.evtStat[k0] = 9091;
2035 } else {
2036//--------
2037//--------
2038 id = evtCtrl.evtBuf[k0];
2039 int itevt = mBuffer[id].trgNum;
2040 int itrg = mBuffer[id].trgTyp;
2041 int roi = mBuffer[id].nRoi;
2042 int roiTM = mBuffer[id].nRoiTM;
2043
2044//make sure unused pixels/tmarks are cleared to zero
2045//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2046// if (roiTM == roi)
2047// roiTM = 0;
2048 int ip, it, dest, ib;
2049 for (ip = 0; ip < NPIX; ip++) {
2050 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
2051 dest = ip * roi;
2052 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
2053 }
2054 }
2055
2056 for (it = 0; it < NTMARK; it++) {
2057 if (mBuffer[id].fEvent->StartTM[it] == -1) {
2058 dest = it * roi + NPIX * roi;
2059 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
2060 }
2061 }
2062
2063
2064//and set correct event header ; also check for consistency in event (not yet)
2065 mBuffer[id].fEvent->Roi = roi;
2066 mBuffer[id].fEvent->RoiTM = roiTM;
2067 mBuffer[id].fEvent->EventNum = ievt;
2068 mBuffer[id].fEvent->TriggerNum = itevt;
2069 mBuffer[id].fEvent->TriggerType = itrg;
2070 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2071 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2072 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2073 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2074 mBuffer[id].fEvent->SoftTrig = 0;
2075
2076
2077 for (ib = 0; ib < NBOARDS; ib++) {
2078 if (mBuffer[id].board[ib] == -1) { //board is not read
2079 mBuffer[id].FADhead[ib].start_package_flag = 0;
2080 mBuffer[id].fEvent->BoardTime[ib] = 0;
2081 } else {
2082 mBuffer[id].fEvent->BoardTime[ib] =
2083 mBuffer[id].FADhead[ib].time;
2084 }
2085 }
2086 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2087 mBuffer[id].fEvent);
2088 gi.procTot++;
2089 numProc++;
2090
2091 if (i < 0) {
2092 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2093 gi.procErr++;
2094 } else {
2095 evtCtrl.evtStat[k0] = 1000;
2096 runCtrl[j].procEvt++;
2097 }
2098 }
2099 }
2100 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2101 numWait++;
2102 }
2103 }
2104
2105 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2106 snprintf (str, MXSTR, "Exit Processing Process ...");
2107 factOut (kInfo, -1, str);
2108 gp_runStat = -22; //==> we should exit
2109 gj.procStat = -22; //==> we should exit
2110 return 0;
2111 }
2112
2113 if (numProc == 0) {
2114 //seems we have nothing to do, so sleep a little
2115 xwait.tv_sec = 0;
2116 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2117 nanosleep (&xwait, NULL);
2118 }
2119 gp_runStat = gi_runStat;
2120 gj.procStat = gj.readStat;
2121
2122 }
2123
2124 //we are asked to abort asap ==> must flag all remaining events
2125 // when gi_runStat claims that all events are in the buffer...
2126
2127 snprintf (str, MXSTR, "Abort Processing Process ...");
2128 factOut (kInfo, -1, str);
2129 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2130 if (kd < 0)
2131 kd += (MAX_EVT * MAX_RUN);
2132
2133 for (k = 0; k < gi_maxProc; k++) {
2134 pthread_join (thread[k], (void **) &status);
2135 }
2136
2137 int k1 = evtCtrl.frstPtr;
2138 for (k = k1; k < (k1 + kd); k++) {
2139 int k0 = k % (MAX_EVT * MAX_RUN);
2140 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2141 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2142 }
2143 }
2144
2145 gp_runStat = -99;
2146 gj.procStat = -99;
2147
2148 return 0;
2149
2150} /*-----------------------------------------------------------------*/
2151
2152int
2153CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2154{
2155/* close run runId (all all runs if runId=0) */
2156/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2157 int j;
2158
2159
2160 if (runId == 0) {
2161 for (j = 0; j < MAX_RUN; j++) {
2162 if (runCtrl[j].fileId == 0) { //run is open
2163 runCtrl[j].closeTime = closeTime;
2164 runCtrl[j].maxEvt = maxEvt;
2165 }
2166 }
2167 return 0;
2168 }
2169
2170 for (j = 0; j < MAX_RUN; j++) {
2171 if (runCtrl[j].runId == runId) {
2172 if (runCtrl[j].fileId == 0) { //run is open
2173 runCtrl[j].closeTime = closeTime;
2174 runCtrl[j].maxEvt = maxEvt;
2175 return 0;
2176 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2177 runCtrl[j].closeTime = closeTime;
2178 runCtrl[j].maxEvt = maxEvt;
2179 return +1;
2180 } else { // run already closed
2181 return +2;
2182 }
2183 }
2184 } //we only reach here if the run was never created
2185 return -1;
2186
2187} /*-----------------------------------------------------------------*/
2188
2189
2190void *
2191writeEvt (void *ptr)
2192{
2193/* *** main loop writing event (including opening and closing run-files */
2194
2195 int numWrite, numWait;
2196 int k, j ;
2197 struct timespec xwait;
2198 char str[MXSTR];
2199
2200 cpu_set_t mask;
2201 int cpu = 1; //write thread
2202
2203 snprintf (str, MXSTR, "Starting write-thread");
2204 factOut (kInfo, -1, str);
2205
2206/* CPU_ZERO initializes all the bits in the mask to zero. */
2207// CPU_ZERO (&mask);
2208/* CPU_SET sets only the bit corresponding to cpu. */
2209// CPU_SET (cpu, &mask);
2210/* sched_setaffinity returns 0 in success */
2211// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2212// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2213// }
2214
2215 int lastRun = 0; //usually run from last event still valid
2216
2217 while (g_runStat > -2) {
2218
2219 numWait = numWrite = 0;
2220 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2221 if (kd < 0)
2222 kd += (MAX_EVT * MAX_RUN);
2223
2224 int k1 = evtCtrl.frstPtr;
2225 for (k = k1; k < (k1 + kd); k++) {
2226 int k0 = k % (MAX_EVT * MAX_RUN);
2227//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2228 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2229
2230 if (gi_resetR > 1) { //we must drain the buffer asap
2231 evtCtrl.evtStat[k0] = 9904;
2232 } else {
2233
2234
2235 int id = evtCtrl.evtBuf[k0];
2236 uint32_t irun = mBuffer[id].runNum;
2237 int32_t ievt = mBuffer[id].evNum;
2238
2239 gi.wrtTot++;
2240 if (runCtrl[lastRun].runId == irun) {
2241 j = lastRun;
2242 } else {
2243 //check which fileID to use (or open if needed)
2244 for (j = 0; j < MAX_RUN; j++) {
2245 if (runCtrl[j].runId == irun)
2246 break;
2247 }
2248 if (j >= MAX_RUN) {
2249 snprintf (str, MXSTR,
2250 "W error: can not find run %d for event %d in %d",
2251 irun, ievt, id);
2252 factOut (kFatal, 901, str);
2253 gi.wrtErr++;
2254 }
2255 lastRun = j;
2256 }
2257
2258 if (runCtrl[j].fileId < 0) {
2259 actRun.Version = 1;
2260 actRun.RunType = -1; //to be adapted
2261
2262 actRun.Nroi = runCtrl[j].roi0;
2263 actRun.NroiTM = runCtrl[j].roi8;
2264//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2265// if (actRun.Nroi == actRun.NroiTM)
2266// actRun.NroiTM = 0;
2267 actRun.RunTime = runCtrl[j].firstTime;
2268 actRun.RunUsec = runCtrl[j].firstTime;
2269 actRun.NBoard = NBOARDS;
2270 actRun.NPix = NPIX;
2271 actRun.NTm = NTMARK;
2272 actRun.Nroi = mBuffer[id].nRoi;
2273 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2274 NBOARDS * sizeof (PEVNT_HEADER));
2275
2276 runCtrl[j].fileHd =
2277 runOpen (irun, &actRun, sizeof (actRun));
2278 if (runCtrl[j].fileHd == NULL) {
2279 snprintf (str, MXSTR,
2280 "W could not open a file for run %d", irun);
2281 factOut (kError, 502, str);
2282 runCtrl[j].fileId = 91;
2283 } else {
2284 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2285 irun, ievt);
2286 factOut (kInfo, -1, str);
2287 runCtrl[j].fileId = 0;
2288 }
2289
2290 }
2291
2292 if (runCtrl[j].fileId != 0) {
2293 if (runCtrl[j].fileId < 0) {
2294 snprintf (str, MXSTR,
2295 "W never opened file for this run %d", irun);
2296 factOut (kError, 123, str);
2297 } else if (runCtrl[j].fileId < 100) {
2298 snprintf (str, MXSTR, "W.file for this run is closed %d",
2299 irun);
2300 factOut (kWarn, 123, str);
2301 runCtrl[j].fileId += 100;
2302 } else {
2303 snprintf (str, MXSTR, "W:file for this run is closed %d",
2304 irun);
2305 factOut (kDebug, 123, str);
2306 }
2307 evtCtrl.evtStat[k0] = 9903;
2308 gi.wrtErr++;
2309 } else {
2310// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2311// factOut (kInfo, 504, str);
2312 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2313 sizeof (mBuffer[id]));
2314 if (i >= 0) {
2315 runCtrl[j].lastTime = g_actTime;
2316 runCtrl[j].actEvt++;
2317 evtCtrl.evtStat[k0] = 9901;
2318 snprintf (str, MXSTR,
2319 "%5d successfully wrote for run %d id %5d",
2320 ievt, irun, k0);
2321 factOut (kDebug, 504, str);
2322// gj.writEvt++ ;
2323 } else {
2324 snprintf (str, MXSTR, "W error writing event for run %d",
2325 irun);
2326 factOut (kError, 503, str);
2327 evtCtrl.evtStat[k0] = 9902;
2328 gi.wrtErr++;
2329 }
2330
2331 if (i < 0
2332 || runCtrl[j].lastTime < g_actTime - 300
2333 || runCtrl[j].closeTime < g_actTime
2334 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2335 int ii = 0;
2336 if (i < 0)
2337 ii = 1;
2338 else if (runCtrl[j].closeTime < g_actTime)
2339 ii = 2;
2340 else if (runCtrl[j].lastTime < g_actTime - 300)
2341 ii = 3;
2342 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2343 ii = 4;
2344
2345
2346
2347 //close run for whatever reason
2348 if (runCtrl[j].runId == gi_myRun)
2349 gi_myRun = g_actTime;
2350
2351 if (runCtrl[j].procId == 0) {
2352 runFinish1 (runCtrl[j].runId);
2353 runCtrl[j].procId = 92;
2354 }
2355
2356 runCtrl[j].closeTime = g_actTime - 1;
2357 i = runClose (runCtrl[j].fileHd, &runTail[j],
2358 sizeof (runTail[j]));
2359 if (i < 0) {
2360 snprintf (str, MXSTR, "error closing run %d %d AAA",
2361 runCtrl[j].runId, i);
2362 factOut (kError, 503, str);
2363 runCtrl[j].fileId = 92;
2364 } else {
2365 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2366 ii);
2367 factOut (kInfo, 503, str);
2368 runCtrl[j].fileId = 93;
2369 }
2370 }
2371 }
2372 }
2373 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2374 numWait++;
2375 }
2376
2377 //check if we should close a run (mainly when no event pending)
2378 //ETIENNE but first figure out which one is the latest run with a complete event.
2379 //i.e. max run Id and lastEvt >= 0
2380 //this condition is sufficient because all pending events were written already in the loop just above
2381 unsigned int maxStartedRun = 0;
2382 for (j=0;j<MAX_RUN;j++)
2383 {
2384 if ((runCtrl[j].runId > maxStartedRun) &&
2385 (runCtrl[j].lastEvt > -1) &&
2386 (runCtrl[j].runId != 0))
2387 maxStartedRun = runCtrl[j].runId;
2388 }
2389// snprintf(str, MXSTR, "Maximum runId: %d", maxStartedRun);
2390// factOut(kInfo, 000, str);
2391 //Also check if some files will never be opened
2392 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
2393 //I will never get run numbers for which no file is to be opened
2394 for (j=0;j<MAX_RUN;j++)
2395 {
2396 if ((runCtrl[j].fileId < 0) &&
2397 (runCtrl[j].runId < maxStartedRun) &&
2398 (runCtrl[j].runId != 0))
2399 {
2400 snprintf (str, MXSTR, "No file will be opened for run #%ud. latest run: %ud (started)", runCtrl[j].runId, maxStartedRun);
2401 factOut (kInfo, 000, str);
2402 ;//TODO notify that this run will never be opened
2403 }
2404 }
2405 for (j = 0; j < MAX_RUN; j++) {
2406 if (runCtrl[j].fileId == 0
2407 && (runCtrl[j].closeTime < g_actTime
2408 || runCtrl[j].lastTime < g_actTime - 300
2409 || runCtrl[j].maxEvt <= runCtrl[j].actEvt
2410 || (runCtrl[j].runId < maxStartedRun && runCtrl[j].runId != 0))) //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
2411{
2412 if (runCtrl[j].runId == gi_myRun)
2413 gi_myRun = g_actTime;
2414 int ii = 0;
2415 if (runCtrl[j].closeTime < g_actTime)
2416 ii = 2;
2417 else if (runCtrl[j].lastTime < g_actTime - 300)
2418 ii = 3;
2419 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2420 ii = 4;
2421
2422 if (runCtrl[j].procId == 0) {
2423 runFinish1 (runCtrl[j].runId);
2424 runCtrl[j].procId = 92;
2425 }
2426
2427 runCtrl[j].closeTime = g_actTime - 1;
2428 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2429 sizeof (runTail[j]));
2430 if (i < 0) {
2431 snprintf (str, MXSTR, "error closing run %d %d BBB",
2432 runCtrl[j].runId, i);
2433 factOut (kError, 506, str);
2434 runCtrl[j].fileId = 94;
2435 } else {
2436 snprintf (str, MXSTR, "W closed run %d BBB %d",
2437 runCtrl[j].runId, ii);
2438 factOut (kInfo, 507, str);
2439 runCtrl[j].fileId = 95;
2440 }
2441 }
2442 }
2443
2444 if (numWrite == 0) {
2445 //seems we have nothing to do, so sleep a little
2446 xwait.tv_sec = 0;
2447 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2448 nanosleep (&xwait, NULL);
2449 }
2450
2451 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2452 snprintf (str, MXSTR, "Finish Write Process ...");
2453 factOut (kInfo, -1, str);
2454 gw_runStat = -22; //==> we should exit
2455 gj.writStat = -22; //==> we should exit
2456 goto closerun;
2457 }
2458 gw_runStat = gi_runStat;
2459 gj.writStat = gj.readStat;
2460
2461 }
2462
2463 //must close all open files ....
2464 snprintf (str, MXSTR, "Abort Writing Process ...");
2465 factOut (kInfo, -1, str);
2466
2467 closerun:
2468 snprintf (str, MXSTR, "Close all open files ...");
2469 factOut (kInfo, -1, str);
2470 for (j = 0; j < MAX_RUN; j++)
2471 if (runCtrl[j].fileId == 0) {
2472 if (runCtrl[j].runId == gi_myRun)
2473 gi_myRun = g_actTime;
2474
2475 if (runCtrl[j].procId == 0) {
2476 runFinish1 (runCtrl[j].runId);
2477 runCtrl[j].procId = 92;
2478 }
2479
2480 runCtrl[j].closeTime = g_actTime - 1;
2481 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2482 int ii = 0;
2483 if (runCtrl[j].closeTime < g_actTime)
2484 ii = 2;
2485 else if (runCtrl[j].lastTime < g_actTime - 300)
2486 ii = 3;
2487 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2488 ii = 4;
2489 if (i < 0) {
2490 snprintf (str, MXSTR, "error closing run %d %d CCC",
2491 runCtrl[j].runId, i);
2492 factOut (kError, 506, str);
2493 runCtrl[j].fileId = 96;
2494 } else {
2495 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2496 ii);
2497 factOut (kInfo, 507, str);
2498 runCtrl[j].fileId = 97;
2499 }
2500 }
2501
2502 gw_runStat = -99;
2503 gj.writStat = -99;
2504 snprintf (str, MXSTR, "Exit Writing Process ...");
2505 factOut (kInfo, -1, str);
2506 return 0;
2507
2508
2509
2510
2511} /*-----------------------------------------------------------------*/
2512
2513
2514
2515
2516void
2517StartEvtBuild ()
2518{
2519
2520 int i, j, imax, status, th_ret[50];
2521 pthread_t thread[50];
2522 struct timespec xwait;
2523
2524 gi_runStat = gp_runStat = gw_runStat = 0;
2525 gj.readStat = gj.procStat = gj.writStat = 0;
2526
2527 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2528 factOut (kInfo, -1, str);
2529
2530//initialize run control logics
2531 for (i = 0; i < MAX_RUN; i++) {
2532 runCtrl[i].runId = 0;
2533 runCtrl[i].fileId = -2;
2534 }
2535
2536//prepare for subProcesses
2537 gi_maxSize = g_maxSize;
2538 if (gi_maxSize <= 0)
2539 gi_maxSize = 1;
2540
2541 gi_maxProc = g_maxProc;
2542 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2543 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2544 factOut (kFatal, 301, str);
2545 gi_maxProc = 1;
2546 }
2547//partially initialize event control logics
2548 evtCtrl.frstPtr = 0;
2549 evtCtrl.lastPtr = 0;
2550
2551//start all threads (more to come) when we are allowed to ....
2552 while (g_runStat == 0) {
2553 xwait.tv_sec = 0;
2554 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2555 nanosleep (&xwait, NULL);
2556 }
2557
2558 i = 0;
2559 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2560 i++;
2561 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2562 i++;
2563 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2564 i++;
2565 imax = i;
2566
2567
2568#ifdef BILAND
2569 xwait.tv_sec = 30;;
2570 xwait.tv_nsec = 0; // sleep for ~20sec
2571 nanosleep (&xwait, NULL);
2572
2573 printf ("close all runs in 2 seconds\n");
2574
2575 CloseRunFile (0, time (NULL) + 2, 0);
2576
2577 xwait.tv_sec = 1;;
2578 xwait.tv_nsec = 0; // sleep for ~20sec
2579 nanosleep (&xwait, NULL);
2580
2581 printf ("setting g_runstat to -1\n");
2582
2583 g_runStat = -1;
2584#endif
2585
2586
2587//wait for all threads to finish
2588 for (i = 0; i < imax; i++) {
2589 j = pthread_join (thread[i], (void **) &status);
2590 }
2591
2592} /*-----------------------------------------------------------------*/
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608 /*-----------------------------------------------------------------*/
2609 /*-----------------------------------------------------------------*/
2610 /*-----------------------------------------------------------------*/
2611 /*-----------------------------------------------------------------*/
2612 /*-----------------------------------------------------------------*/
2613
2614#ifdef BILAND
2615
2616int
2617subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2618 int8_t * buffer)
2619{
2620 printf ("called subproc %d\n", threadID);
2621 return threadID + 1;
2622}
2623
2624
2625
2626
2627 /*-----------------------------------------------------------------*/
2628 /*-----------------------------------------------------------------*/
2629 /*-----------------------------------------------------------------*/
2630 /*-----------------------------------------------------------------*/
2631 /*-----------------------------------------------------------------*/
2632
2633
2634
2635
2636FileHandle_t
2637runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2638{
2639 return 1;
2640};
2641
2642int
2643runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2644{
2645 return 1;
2646 usleep (10000);
2647 return 1;
2648}
2649
2650
2651//{ return 1; } ;
2652
2653int
2654runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2655{
2656 return 1;
2657};
2658
2659
2660
2661
2662int
2663eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2664{
2665 int i = 0;
2666
2667// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2668// for (i=0; i<NBOARDS; i++) {
2669// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2670// }
2671 return 0;
2672}
2673
2674
2675void
2676factStatNew (EVT_STAT gi)
2677{
2678 int i;
2679
2680//for (i=0;i<MAX_SOCK;i++) {
2681// printf("%4d",gi.numRead[i]);
2682// if (i%20 == 0 ) printf("\n");
2683//}
2684}
2685
2686void
2687gotNewRun (int runnr, PEVNT_HEADER * headers)
2688{
2689 printf ("got new run %d\n", runnr);
2690 return;
2691}
2692
2693void
2694factStat (GUI_STAT gj)
2695{
2696// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2697// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2698}
2699
2700
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
/* Test stub: all arguments are ignored, nothing is printed. */
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;
}
2707
2708
2709
void
debugStream (int isock, void *buf, int len)
{
/* Test stub: all arguments are ignored. */
   (void) isock;
   (void) buf;
   (void) len;
}
2714
void
debugHead (int i, int j, void *buf)
{
/* Test stub: all arguments are ignored. */
   (void) i;
   (void) j;
   (void) buf;
}
2719
2720
2721void
2722factOut (int severity, int err, char *message)
2723{
2724 static FILE *fd;
2725 static int file = 0;
2726
2727 if (file == 0) {
2728 printf ("open file\n");
2729 fd = fopen ("x.out", "w+");
2730 file = 999;
2731 }
2732
2733 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2734
2735 if (severity != kDebug)
2736 printf ("%3d %3d | %s\n", severity, err, message);
2737}
2738
2739
2740
2741int
2742main ()
2743{
2744 int i, b, c, p;
2745 char ipStr[100];
2746 struct in_addr IPaddr;
2747
2748 g_maxMem = 1024 * 1024; //MBytes
2749//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2750 g_maxMem = g_maxMem * 200; //100MBytes
2751
2752 g_maxProc = 20;
2753 g_maxSize = 30000;
2754
2755 g_runStat = 40;
2756
2757 i = 0;
2758
2759// version for standard crates
2760//for (c=0; c<4,c++) {
2761// for (b=0; b<10; b++) {
2762// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2763//
2764// inet_pton(PF_INET, ipStr, &IPaddr) ;
2765//
2766// g_port[i].sockAddr.sin_family = PF_INET;
2767// g_port[i].sockAddr.sin_port = htons(5000) ;
2768// g_port[i].sockAddr.sin_addr = IPaddr ;
2769// g_port[i].sockDef = 1 ;
2770// i++ ;
2771// }
2772//}
2773//
2774//version for PC-test *
2775 for (c = 0; c < 4; c++) {
2776 for (b = 0; b < 10; b++) {
2777 sprintf (ipStr, "10.0.%d.11", 128 + c);
2778 if (c < 2)
2779 sprintf (ipStr, "10.0.%d.11", 128);
2780 else
2781 sprintf (ipStr, "10.0.%d.11", 131);
2782// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2783
2784 inet_pton (PF_INET, ipStr, &IPaddr);
2785 p = 31919 + 100 * c + 10 * b;
2786
2787
2788 g_port[i].sockAddr.sin_family = PF_INET;
2789 g_port[i].sockAddr.sin_port = htons (p);
2790 g_port[i].sockAddr.sin_addr = IPaddr;
2791 g_port[i].sockDef = 1;
2792
2793 i++;
2794 }
2795 }
2796
2797
2798//g_port[17].sockDef =-1 ;
2799//g_actBoards-- ;
2800
2801 StartEvtBuild ();
2802
2803 return 0;
2804
2805}
2806#endif
Note: See TracBrowser for help on using the repository browser.