source: trunk/FACT++/src/EventBuilder.c@ 12461

Last change on this file since 12461 was 12460, checked in by lyard, 14 years ago
added double check before releasing memory
File size: 87.1 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0;
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi;
120GUI_STAT gj;
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127#define MXSTR 1000
128char str[MXSTR];
129
130//ETIENNE
131#define MAX_SLOTS_PER_CHUNK 100
132
133typedef struct {
134 int32_t eventNumber;
135 int32_t chunk;
136 int32_t slot;
137} CHUNK_MAPPING;
138
139CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
141#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
142#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
143#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
144#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
146typedef struct {
147 void * pointers[MAX_SLOTS_PER_CHUNK];
148 int32_t events[MAX_SLOTS_PER_CHUNK];
149 int32_t nFreeSlots;
150 int32_t nSlots;
151} ETI_CHUNK;
152
153int32_t numAllocatedChunks = 0;
154
155#define MAX_CHUNKS 8096
156ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227
228 int chunkIndex = mBufferMapping[evtIndex].chunk;
229 if (chunkIndex != numAllocatedChunks-1)
230 return;
231
232 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
233 {//free this chunk
234 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
235 {
236 factOut(kError, "Chunk %d not allocated as it ought to be. Skipping release", chunkIndex);
237 return;
238 }
239 free(EtiMemoryChunks[chunkIndex].pointers[0]);
240 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
241 numAllocatedChunks--;
242 chunkIndex--;
243 if (numAllocatedChunks == 0)
244 break;
245 }
246}
247//END ETIENNE
248
249
250
251RUN_HEAD actRun;
252
253RUN_CTRL runCtrl[MAX_RUN];
254
255RUN_TAIL runTail[MAX_RUN];
256
257
258/*
259*** Definition of rdBuffer to read in IP packets; keep it global !!!!
260 */
261
262
263typedef union
264{
265 int8_t B[MAX_LEN];
266 int16_t S[MAX_LEN / 2];
267 int32_t I[MAX_LEN / 4];
268 int64_t L[MAX_LEN / 8];
269} CNV_FACT;
270
271typedef struct
272{
273 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
274 int32_t bufPos; //next byte to read to the buffer next
275 int32_t bufLen; //number of bytes left to read
276// size_t bufLen; //number of bytes left to read size_t might be better
277 int32_t skip; //number of bytes skipped before start of event
278
279 int errCnt; //how often connect failed since last successful
280 int sockStat; //-1 if socket not yet connected , 99 if not exist
281 int socket; //contains the sockets
282 struct sockaddr_in SockAddr; //IP for each socket
283
284 int evtID; // event ID of event currently read
285 int runID; // run "
286 int ftmID; // event ID from FTM
287 uint fadLen; // FADlength of event currently read
288 int fadVers; // Version of FAD
289 int ftmTyp; // trigger type
290 int board; // boardID (softwareID: 0..40 )
291 int Port;
292
293 CNV_FACT *rBuf;
294
295#ifdef EVTDEBUG
296 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
297#endif
298
299} READ_STRUCT;
300
301
302typedef union
303{
304 int8_t B[2];
305 int16_t S;
306} SHORT_BYTE;
307
308
309
310
311
312SHORT_BYTE start, stop;
313
314READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
315
316
317
318/*-----------------------------------------------------------------*/
319
320
321/*-----------------------------------------------------------------*/
322
323int
324runFinish1 (uint32_t runnr)
325{
326 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
327 runnr);
328 factOut (kInfo, 173, str); //but continue anyhow
329 return 0;
330}
331int
332runFinish (uint32_t runnr)
333{
334 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
335 runnr);
336 factOut (kInfo, 173, str); //but continue anyhow
337 return 0;
338}
339
340int
341GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
342 READ_STRUCT * rd)
343{
344/*
345*** generate Address, create sockets and allocates readbuffer for it
346***
347*** if flag==0 generate socket and buffer
348*** <0 destroy socket and buffer
349*** >0 close and redo socket
350***
351*** sid : board*7 + port id
352 */
353
354 int j;
355 int optval = 1; //activate keepalive
356 socklen_t optlen = sizeof (optval);
357
358
359 if (sid % 7 >= NUMSOCK) {
360 //this is a not used socket, so do nothing ...
361 rd->sockStat = 77;
362 rd->rBuf = NULL ;
363 return 0;
364 }
365
366 if (rd->sockStat == 0) { //close socket if open
367 j = close (rd->socket);
368 if (j > 0) {
369 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
370 factOut (kFatal, 771, str);
371 } else {
372 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
373 factOut (kInfo, 771, str);
374 }
375 }
376
377 rd->sockStat = 99;
378
379 if (flag < 0) {
380 free (rd->rBuf); //and never open again
381#ifdef EVTDEBUG
382 free (rd->xBuf); //and never open again
383#endif
384 rd->rBuf = NULL;
385 return 0;
386 }
387
388
389 if (flag == 0) { //generate address and buffer ...
390 rd->Port = port;
391 rd->SockAddr.sin_family = sockAddr->sin_family;
392 rd->SockAddr.sin_port = htons (port);
393 rd->SockAddr.sin_addr = sockAddr->sin_addr;
394
395#ifdef EVTDEBUG
396 rd->xBuf = malloc (sizeof (CNV_FACT));
397#endif
398 rd->rBuf = malloc (sizeof (CNV_FACT));
399 if (rd->rBuf == NULL) {
400 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
401 factOut (kFatal, 774, str);
402 rd->sockStat = 77;
403 return -3;
404 }
405 }
406
407
408 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
409 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
410 factOut (kFatal, 773, str);
411 rd->sockStat = 88;
412 return -2;
413 }
414 optval = 1;
415 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
416 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
417 factOut (kInfo, 173, str); //but continue anyhow
418 }
419 optval = 10; //start after 10 seconds
420 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
421 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
422 factOut (kInfo, 173, str); //but continue anyhow
423 }
424 optval = 10; //do every 10 seconds
425 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
426 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
427 factOut (kInfo, 173, str); //but continue anyhow
428 }
429 optval = 2; //close after 2 unsuccessful tries
430 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
431 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
432 factOut (kInfo, 173, str); //but continue anyhow
433 }
434
435
436 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
437 factOut (kInfo, 773, str);
438 rd->sockStat = -1; //try to (re)open socket
439 rd->errCnt = 0;
440 return 0;
441
442} /*-----------------------------------------------------------------*/
443
444 /*-----------------------------------------------------------------*/
445
446
447
448
449int
450mBufInit ()
451{
452// initialize mBuffer (mark all entries as unused\empty)
453
454 int i;
455 uint32_t actime;
456
457 actime = g_actTime + 50000000;
458
459 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
460 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
461 mBuffer[i].runNum = 0;
462
463 evtCtrl.evtBuf[i] = -1;
464 evtCtrl.evtStat[i] = -1;
465 evtCtrl.pcTime[i] = actime; //initiate to far future
466
467 //ETIENNE
468 mBufferMapping[i].chunk = -1;
469 mBufferMapping[i].eventNumber = -1;
470 mBufferMapping[i].slot = -1;
471 for (i=0;i<MAX_CHUNKS;i++)
472 EtiMemoryChunks[i].pointers[0] = NULL;
473 //END ETIENNE
474 }
475
476 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
477
478 evtCtrl.frstPtr = 0;
479 evtCtrl.lastPtr = 0;
480
481 return 0;
482
483} /*-----------------------------------------------------------------*/
484
485
486
487
488int
489mBufEvt (int evID, uint runID, int nRoi[], int sk,
490 int fadlen, int trgTyp, int trgNum, int fadNum)
491{
492// generate a new Event into mBuffer:
493// make sure only complete Event are possible, so 'free' will always work
494// returns index into mBuffer[], or negative value in case of error
495// error: <-9000 if roi screwed up (not consistent with run)
496// <-8000 (not consistent with event)
497// <-7000 (not consistent with board)
498// < 0 if no space left
499
500 struct timeval *tv, atv;
501 tv = &atv;
502 uint32_t tsec, tusec;
503 uint oldest;
504 int jold;
505
506 int i, k, jr, b, evFree;
507 int headmem = 0;
508 size_t needmem = 0;
509
510
511 b = sk / 7;
512
513 if (nRoi[0] < 0 || nRoi[0] > 1024) {
514 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
515 factOut (kError, 999, str);
516 gj.badRoiR++;
517 gj.badRoi[b]++;
518 return -9999;
519 }
520
521 for (jr = 1; jr < 8; jr++) {
522 if (nRoi[jr] != nRoi[0]) {
523 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
524 nRoi[0]);
525 factOut (kError, 711, str);
526 gj.badRoiB++;
527 gj.badRoi[b]++;
528 return -7101;
529 }
530 }
531 if (nRoi[8] < nRoi[0]) {
532 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
533 factOut (kError, 712, str);
534 gj.badRoiB++;
535 gj.badRoi[b]++;
536 return -7102;
537 }
538
539
540 i = evID % MAX_EVT;
541 evFree = -1;
542
543 for (k = 0; k < MAX_RUN; k++) {
544 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
545 // is it ok ????
546 if (mBuffer[i].nRoi != nRoi[0]
547 || mBuffer[i].nRoiTM != nRoi[8]) {
548 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
549 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
550 factOut (kError, 821, str);
551 gj.badRoiE++;
552 gj.badRoi[b]++;
553 return -8201;
554 }
555// count for inconsistencies
556
557 if (mBuffer[i].trgNum != trgNum)
558 mBuffer[i].Errors[0]++;
559 if (mBuffer[i].fadNum != fadNum)
560 mBuffer[i].Errors[1]++;
561 if (mBuffer[i].trgTyp != trgTyp)
562 mBuffer[i].Errors[2]++;
563
564 //everything seems fine so far ==> use this slot ....
565 return i;
566 }
567 if (evFree < 0 && mBuffer[i].evNum < 0)
568 evFree = i;
569 i += MAX_EVT;
570 }
571
572
573 //event does not yet exist; create it
574 if (evFree < 0) { //no space available in ctrl
575 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
576 factOut (kError, 881, str);
577 return -1;
578 }
579 i = evFree; //found free entry; use it ...
580
581 gettimeofday (tv, NULL);
582 tsec = atv.tv_sec;
583 tusec = atv.tv_usec;
584
585 //check if runId already registered in runCtrl
586 evFree = -1;
587 oldest = g_actTime + 1000;
588 jold = -1;
589 for (k = 0; k < MAX_RUN; k++) {
590 if (runCtrl[k].runId == runID) {
591// if (runCtrl[k].procId > 0) { //run is closed -> reject
592// snprintf (str, MXSTR, "skip event since run %d finished", runID);
593// factOut (kInfo, 931, str);
594// return -21;
595// }
596
597 if (runCtrl[k].roi0 != nRoi[0]
598 || runCtrl[k].roi8 != nRoi[8]) {
599 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
600 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
601 factOut (kError, 931, str);
602 gj.badRoiR++;
603 gj.badRoi[b]++;
604 return -9301;
605 }
606 goto RUNFOUND;
607 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
608 evFree = k;
609 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
610 if (runCtrl[k].closeTime < oldest) {
611 oldest = runCtrl[k].closeTime;
612 jold = k;
613 }
614 }
615 }
616
617 if (evFree < 0 && jold < 0) {
618 snprintf (str, MXSTR, "not able to register the new run %d", runID);
619 factOut (kFatal, 883, str);
620 return -1001;
621 } else {
622 if (evFree < 0)
623 evFree = jold;
624 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
625 evFree, nRoi[0], nRoi[8]);
626 factOut (kInfo, 503, str);
627 runCtrl[evFree].runId = runID;
628 runCtrl[evFree].roi0 = nRoi[0];
629 runCtrl[evFree].roi8 = nRoi[8];
630 runCtrl[evFree].fileId = -2;
631 runCtrl[evFree].procId = -2;
632 runCtrl[evFree].lastEvt = -1;
633 runCtrl[evFree].nextEvt = 0;
634 runCtrl[evFree].actEvt = 0;
635 runCtrl[evFree].procEvt = 0;
636 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
637 runCtrl[evFree].firstUsec = tusec;
638 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
639 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
640// runCtrl[evFree].lastTime = 0;
641
642 runTail[evFree].nEventsOk =
643 runTail[evFree].nEventsRej =
644 runTail[evFree].nEventsBad =
645 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
646 }
647
648 RUNFOUND:
649 //ETIENNE
650/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
651
652 headmem = NBOARDS * sizeof (PEVNT_HEADER);
653
654 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
655 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
656 if (gi_memStat > 0) {
657 gi_memStat = -99;
658 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
659 evID, sk);
660 factOut (kError, 882, str);
661 } else {
662 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
663 evID, sk);
664 factOut (kDebug, 882, str);
665 }
666 return -11;
667 }
668*/
669 mBuffer[i].FADhead = ETI_Malloc(evID, i);
670 mBuffer[i].buffer = NULL;
671 if (mBuffer[i].FADhead != NULL)
672 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
673 else
674 {
675 mBuffer[i].fEvent = NULL;
676 gj.usdMem = 0;
677 for (k=0;k<numAllocatedChunks;k++)
678 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
679 if (gj.usdMem > gj.maxMem)
680 gj.maxMem = gj.usdMem;
681 if (gi_memStat > 0) {
682 gi_memStat = -99;
683 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
684 evID, sk);
685 factOut (kError, 882, str);
686 } else {
687 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
688 evID, sk);
689 factOut (kDebug, 882, str);
690 }
691 return -11;
692 }
693 /*
694 mBuffer[i].FADhead = malloc (headmem);
695 if (mBuffer[i].FADhead == NULL) {
696 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
697 factOut (kError, 882, str);
698 return -12;
699 }
700
701 mBuffer[i].fEvent = malloc (needmem);
702 if (mBuffer[i].fEvent == NULL) {
703 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
704 factOut (kError, 882, str);
705 free (mBuffer[i].FADhead);
706 mBuffer[i].FADhead = NULL;
707 return -22;
708 }
709
710 mBuffer[i].buffer = malloc (gi_maxSize);
711 if (mBuffer[i].buffer == NULL) {
712 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
713 factOut (kError, 882, str);
714 free (mBuffer[i].FADhead);
715 mBuffer[i].FADhead = NULL;
716 free (mBuffer[i].fEvent);
717 mBuffer[i].fEvent = NULL;
718 return -32;
719 }*/
720 //END ETIENNE
721 //flag all boards as unused
722 mBuffer[i].nBoard = 0;
723 for (k = 0; k < NBOARDS; k++) {
724 mBuffer[i].board[k] = -1;
725 }
726 //flag all pixels as unused
727 for (k = 0; k < NPIX; k++) {
728 mBuffer[i].fEvent->StartPix[k] = -1;
729 }
730 //flag all TMark as unused
731 for (k = 0; k < NTMARK; k++) {
732 mBuffer[i].fEvent->StartTM[k] = -1;
733 }
734
735 mBuffer[i].fEvent->NumBoards = 0;
736 mBuffer[i].fEvent->PCUsec = tusec;
737 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
738 mBuffer[i].nRoi = nRoi[0];
739 mBuffer[i].nRoiTM = nRoi[8];
740 mBuffer[i].evNum = evID;
741 mBuffer[i].runNum = runID;
742 mBuffer[i].fadNum = fadNum;
743 mBuffer[i].trgNum = trgNum;
744 mBuffer[i].trgTyp = trgTyp;
745 mBuffer[i].evtLen = needmem;
746 mBuffer[i].Errors[0] =
747 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
748//ETIENNE
749 //gj.usdMem += needmem + headmem + gi_maxSize;
750 gj.usdMem = 0;
751 for (k=0;k<numAllocatedChunks;k++)
752 {
753 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
754 }
755//END ETIENNE
756 if (gj.usdMem > gj.maxMem)
757 gj.maxMem = gj.usdMem;
758
759 gj.bufTot++;
760 if (gj.bufTot > gj.maxEvt)
761 gj.maxEvt = gj.bufTot;
762
763 gj.rateNew++;
764
765 //register event in 'active list (reading)'
766
767 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
768 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
769 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
770 evtIdx[i] = evtCtrl.lastPtr;
771
772
773 snprintf (str, MXSTR,
774 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
775 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
776 factOut (kDebug, -11, str);
777 evtCtrl.lastPtr++;
778 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
779 evtCtrl.lastPtr = 0;
780
781 gi.evtGet++;
782
783 return i;
784
785} /*-----------------------------------------------------------------*/
786
787
788
789
790int
791mBufFree (int i)
792{
793//delete entry [i] from mBuffer:
794//(and make sure multiple calls do no harm ....)
795
796 int headmem = 0;
797 int evid;
798 size_t freemem = 0;
799
800 evid = mBuffer[i].evNum;
801 freemem = mBuffer[i].evtLen;
802 //ETIENNE
803 ETI_Free(evid, i);
804// free (mBuffer[i].fEvent);
805 mBuffer[i].fEvent = NULL;
806
807// free (mBuffer[i].FADhead);
808 mBuffer[i].FADhead = NULL;
809
810// free (mBuffer[i].buffer);
811 mBuffer[i].buffer = NULL;
812//END ETIENNE
813 headmem = NBOARDS * sizeof (PEVNT_HEADER);
814 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
815 mBuffer[i].runNum = 0;
816//ETIENNE
817// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
818 gj.usdMem = 0;
819 int k;
820 for (k=0;k<numAllocatedChunks;k++)
821 {
822 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
823 }
824//END ETIENNE
825 gj.bufTot--;
826
827 if (gi_memStat < 0) {
828 if (gj.usdMem <= 0.75 * gj.maxMem)
829 gi_memStat = +1;
830 }
831
832
833 return 0;
834
835} /*-----------------------------------------------------------------*/
836
837
838void
839resetEvtStat ()
840{
841 int i;
842
843 for (i = 0; i < MAX_SOCK; i++)
844 gi.numRead[i] = 0;
845
846 for (i = 0; i < NBOARDS; i++) {
847 gi.gotByte[i] = 0;
848 gi.gotErr[i] = 0;
849
850 }
851
852 gi.evtGet = 0; //#new Start of Events read
853 gi.evtTot = 0; //#complete Events read
854 gi.evtErr = 0; //#Events with Errors
855 gi.evtSkp = 0; //#Events incomplete (timeout)
856
857 gi.procTot = 0; //#Events processed
858 gi.procErr = 0; //#Events showed problem in processing
859 gi.procTrg = 0; //#Events accepted by SW trigger
860 gi.procSkp = 0; //#Events rejected by SW trigger
861
862 gi.feedTot = 0; //#Events used for feedBack system
863 gi.feedErr = 0; //#Events rejected by feedBack
864
865 gi.wrtTot = 0; //#Events written to disk
866 gi.wrtErr = 0; //#Events with write-error
867
868 gi.runOpen = 0; //#Runs opened
869 gi.runClose = 0; //#Runs closed
870 gi.runErr = 0; //#Runs with open/close errors
871
872 return;
873} /*-----------------------------------------------------------------*/
874
875
876
877void
878initReadFAD ()
879{
880 return;
881} /*-----------------------------------------------------------------*/
882
883
884
885void *
886readFAD (void *ptr)
887{
888/* *** main loop reading FAD data and sorting them to complete events */
889 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
890 int actBoards = 0, minLen;
891 int32_t jrd;
892 uint gi_SecTime; //time in seconds
893 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
894
895 int goodhed = 0;
896
897 int sockDef[NBOARDS]; //internal state of sockets
898 int jrdx;
899
900
901 struct timespec xwait;
902
903
904 struct timeval *tv, atv;
905 tv = &atv;
906 uint32_t tsec, tusec;
907
908
909 snprintf (str, MXSTR, "start initializing");
910 factOut (kInfo, -1, str);
911
912 int cpu = 7; //read thread
913 cpu_set_t mask;
914
915/* CPU_ZERO initializes all the bits in the mask to zero. */
916// CPU_ZERO (&mask);
917/* CPU_SET sets only the bit corresponding to cpu. */
918 cpu = 7;
919// CPU_SET (cpu, &mask);
920
921/* sched_setaffinity returns 0 in success */
922// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
923// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
924// factOut (kWarn, -1, str);
925// }
926
927
928 head_len = sizeof (PEVNT_HEADER);
929 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
930 minLen = head_len; //min #bytes needed to check header: full header for debug
931
932 start.S = 0xFB01;
933 stop.S = 0x04FE;
934
935/* initialize run control logics */
936 for (i = 0; i < MAX_RUN; i++) {
937 runCtrl[i].runId = 0;
938 runCtrl[i].fileId = -2;
939 runCtrl[i].procId = -2;
940 }
941 gi_resetS = gi_resetR = 9;
942 for (i = 0; i < NBOARDS; i++)
943 sockDef[i] = 0;
944
945 START:
946 gettimeofday (tv, NULL);
947 g_actTime = tsec = atv.tv_sec;
948 g_actUsec = tusec = atv.tv_usec;
949 gi_myRun = g_actTime;
950 evtCtrl.frstPtr = 0;
951 evtCtrl.lastPtr = 0;
952
953 gi_SecTime = g_actTime;
954 gi_runStat = g_runStat;
955 gj.readStat = g_runStat;
956 numok = numok2 = 0;
957
958 int cntsock = 8 - NUMSOCK ;
959
960 if (gi_resetS > 0) {
961 //make sure all sockets are preallocated as 'not exist'
962 for (i = 0; i < MAX_SOCK; i++) {
963 rd[i].socket = -1;
964 rd[i].sockStat = 99;
965 }
966
967 for (k = 0; k < NBOARDS; k++) {
968 gi_NumConnect[k] = 0;
969 gi.numConn[k] = 0;
970 gj.numConn[k] = 0;
971 gj.errConn[k] = 0;
972 gj.rateBytes[k] = 0;
973 gj.totBytes[k] = 0;
974 }
975
976 }
977
978
979 if (gi_resetR > 0) {
980 resetEvtStat ();
981 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
982 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
983 gj.totMem = g_maxMem;
984 gj.bufNew = gj.bufEvt = 0;
985 gj.badRoiE = gj.badRoiR = gj.badRoiB =
986 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
987
988 int b,p;
989 for (b = 0; b < NBOARDS; b++)
990 gj.badRoi[b] = 0;
991
992 mBufInit (); //initialize buffers
993
994 snprintf (str, MXSTR, "end initializing");
995 factOut (kInfo, -1, str);
996 }
997
998
999 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1000
1001 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1002
1003 gi_runStat = g_runStat;
1004 gj.readStat = g_runStat;
1005 gettimeofday (tv, NULL);
1006 g_actTime = tsec = atv.tv_sec;
1007 g_actUsec = tusec = atv.tv_usec;
1008
1009
1010 int b, p, p0, s0, nch;
1011 nch = 0;
1012 for (b = 0; b < NBOARDS; b++) {
1013 k = b * 7;
1014 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1015 nch++;
1016 gi_NumConnect[b] = 0; //must close all connections
1017 gi.numConn[b] = 0;
1018 gj.numConn[b] = 0;
1019 if (sockDef[b] == 0)
1020 s0 = 0; //sockets to be defined and opened
1021 else if (g_port[b].sockDef == 0)
1022 s0 = -1; //sockets to be destroyed
1023 else
1024 s0 = +1; //sockets to be closed and reopened
1025
1026 if (s0 == 0)
1027 p0 = ntohs (g_port[b].sockAddr.sin_port);
1028 else
1029 p0 = 0;
1030
1031 for (p = p0 + 1; p < p0 + 8; p++) {
1032 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1033 k++;
1034 }
1035 sockDef[b] = g_port[b].sockDef;
1036 }
1037 }
1038
1039 if (nch > 0) {
1040 actBoards = 0;
1041 for (b = 0; b < NBOARDS; b++) {
1042 if (sockDef[b] > 0)
1043 actBoards++;
1044 }
1045 }
1046
1047
1048 jrdx = 0;
1049 numokx = 0;
1050 numok = 0; //count number of succesfull actions
1051
1052 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1053 b = i / 7 ;
1054 p = i % 7 ;
1055
1056if ( p >= NUMSOCK) { ; }
1057else {
1058 if (sockDef[b] > 0)
1059 s0 = +1;
1060 else
1061 s0 = -1;
1062
1063 if (rd[i].sockStat < 0) { //try to connect if not yet done
1064 if (rd[i].sockStat == -1) {
1065 rd[i].sockStat = connect (rd[i].socket,
1066 (struct sockaddr *) &rd[i].SockAddr,
1067 sizeof (rd[i].SockAddr));
1068 if (rd[i].sockStat == -1) {
1069 rd[i].errCnt++ ;
1070// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1071// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1072// else rd[i].sockStat = 10000 ;
1073 }
1074//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1075
1076 }
1077 if (rd[i].sockStat < -1 ) {
1078 rd[i].sockStat++ ;
1079 }
1080 if (rd[i].sockStat == 0) { //successfull ==>
1081 if (sockDef[b] > 0) {
1082 rd[i].bufTyp = 0; // expect a header
1083 rd[i].bufLen = frst_len; // max size to read at begining
1084 } else {
1085 rd[i].bufTyp = -1; // full data to be skipped
1086 rd[i].bufLen = MAX_LEN; //huge for skipping
1087 }
1088 rd[i].bufPos = 0; // no byte read so far
1089 rd[i].skip = 0; // start empty
1090// gi_NumConnect[b]++;
1091 gi_NumConnect[b] += cntsock ;
1092
1093 gi.numConn[b]++;
1094 gj.numConn[b]++;
1095 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1096 factOut (kInfo, -1, str);
1097 }
1098 }
1099
1100 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1101 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1102 numok++;
1103 size_t maxread = rd[i].bufLen ;
1104 if (maxread > MAXREAD ) maxread=MAXREAD ;
1105
1106 jrd =
1107 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1108 maxread, MSG_DONTWAIT);
1109// rd[i].bufLen, MSG_DONTWAIT);
1110
1111 if (jrd > 0) {
1112 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1113#ifdef EVTDEBUG
1114 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1115 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1116 snprintf (str, MXSTR,
1117 "read sock %3d bytes %5d len %5d first %d %d", i,
1118 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1119 rd[i].rBuf->B[rd[i].bufPos + 1]);
1120 factOut (kDebug, 301, str);
1121#endif
1122 }
1123
1124 if (jrd == 0) { //connection has closed ...
1125 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1126 factOut (kInfo, 441, str);
1127 GenSock (s0, i, 0, NULL, &rd[i]);
1128 gi.gotErr[b]++;
1129// gi_NumConnect[b]--;
1130 gi_NumConnect[b]-= cntsock ;
1131 gi.numConn[b]--;
1132 gj.numConn[b]--;
1133
1134 } else if (jrd < 0) { //did not read anything
1135 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1136 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1137 factOut (kError, 442, str);
1138 gi.gotErr[b]++;
1139 } else
1140 numok--; //else nothing waiting to be read
1141 jrd = 0;
1142 }
1143 } else {
1144 jrd = 0; //did read nothing as requested
1145 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1146 rd[i].bufLen);
1147 factOut (kDebug, 301, str);
1148 }
1149
1150 gi.gotByte[b] += jrd;
1151 gj.rateBytes[b] += jrd;
1152
1153 if (jrd > 0) {
1154 numokx++;
1155 jrdx += jrd;
1156 }
1157
1158
1159 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1160// just do nothing
1161#ifdef EVTDEBUG
1162 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1163 i);
1164 factOut (kInfo, 301, str);
1165#endif
1166
1167 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1168 if (jrd < rd[i].bufLen) { //not yet all read
1169 rd[i].bufPos += jrd; //==> prepare for continuation
1170 rd[i].bufLen -= jrd;
1171 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1172 } else { //full dataset read
1173 rd[i].bufLen = 0;
1174 rd[i].bufPos = rd[i].fadLen;
1175 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1176 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1177 gi.evtErr++;
1178 snprintf (str, MXSTR,
1179 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1180 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1181 rd[i].rBuf->B[1],
1182 rd[i].rBuf->B[rd[i].bufPos - 2],
1183 rd[i].rBuf->B[rd[i].bufPos - 1]);
1184 factOut (kError, 301, str);
1185 goto EndBuf;
1186
1187#ifdef EVTDEBUG
1188 } else {
1189 snprintf (str, MXSTR,
1190 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1191 i, rd[i].fadLen, rd[i].rBuf->B[0],
1192 rd[i].rBuf->B[1], start.B[1], start.B[0],
1193 rd[i].rBuf->B[rd[i].bufPos - 2],
1194 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1195 stop.B[0]);
1196 factOut (kDebug, 301, str);
1197#endif
1198 }
1199
1200 if (jrd > 0)
1201 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1202
1203/* //we have a complete buffer, copy to WORK area
1204 int jr, kr;
1205 int checkRoi;
1206 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1207 for (kr = 1; kr < 4; kr++)
1208 {
1209 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1210 + kr*(roi[jr-1]+4)]);
1211 if (checkRoi != roi[0])
1212 {
1213 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1214 factOut (kFatal, 1, str);
1215 goto EndBuf;
1216 }
1217
1218 }
1219 for (jr = 1; jr < 9; jr++) {
1220 roi[jr] =
1221 ntohs (rd[i].
1222 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1223
1224 }
1225*/
1226 //we have a complete buffer, copy to WORK area
1227 int jr, kr;
1228 int checkRoi;
1229 int roiHopper = head_len/2 + 2; //points to the very first roi
1230 roi[0] = ntohs (rd[i].rBuf->S[roiHopper]);
1231 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1232 for (kr = 1; kr < 4; kr++)
1233 {
1234 checkRoi = ntohs(rd[i].rBuf->S[roiHopper]);
1235 if (checkRoi != roi[0])
1236 {
1237 snprintf (str, MXSTR, "Inconsistent Roi accross board patches a %d %d %d", kr, checkRoi, roi[0]);
1238 factOut (kError, 1, str);
1239 goto EndBuf;
1240 }
1241 roiHopper += checkRoi+4;
1242 }
1243 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1244 for (jr = 1; jr < 9; jr++) {
1245 roi[jr] = ntohs(rd[i].rBuf->S[roiHopper]);
1246 checkRoi = roi[jr];
1247 for (kr = 1; kr < 4; kr++)
1248 {
1249 roiHopper += checkRoi+4;
1250 checkRoi = ntohs(rd[i].rBuf->S[roiHopper]);
1251 if (checkRoi != roi[jr])
1252 {
1253 snprintf (str, MXSTR, "Inconsistent Roi accross board patches B=%d P=%d %d %d", kr, jr, roi[jr], checkRoi);
1254 factOut (kFatal, 1, str);
1255 goto EndBuf;
1256 }
1257 }
1258 }
1259 //get index into mBuffer for this event (create if needed)
1260
1261 int actid;
1262 if (g_useFTM > 0)
1263 actid = rd[i].evtID;
1264 else
1265 actid = rd[i].ftmID;
1266
1267 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1268 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1269 rd[i].evtID);
1270
1271 if (evID < -1000) {
1272 goto EndBuf; //not usable board/event/run --> skip it
1273 }
1274 if (evID < 0) { //no space left, retry later
1275#ifdef EVTDEBUG
1276 if (rd[i].bufLen != 0) {
1277 snprintf (str, MXSTR, "something screwed up");
1278 factOut (kFatal, 1, str);
1279 }
1280#endif
1281 xwait.tv_sec = 0;
1282 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1283 nanosleep (&xwait, NULL);
1284 goto EndBuf1; //hope there is free space next round
1285 }
1286 //we have a valid entry in mBuffer[]; fill it
1287
1288#ifdef EVTDEBUG
1289 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1290 rd[i].fadLen);
1291 if (xchk != 0) {
1292 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1293 xchk, rd[i].fadLen, i);
1294 factOut (kFatal, 1, str);
1295
1296 uint iq;
1297 for (iq = 0; iq < rd[i].fadLen; iq++) {
1298 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1299 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1300 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1301 factOut (kFatal, 1, str);
1302 }
1303 }
1304 }
1305#endif
1306 int qncpy = 0;
1307 boardId = b;
1308 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1309 int fadCrate = fadBoard / 256;
1310 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1311 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1312 fadCrate, fadBoard % 256, boardId);
1313 factOut (kWarn, 301, str);
1314 }
1315 if (mBuffer[evID].board[boardId] != -1) {
1316 snprintf (str, MXSTR,
1317 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1318 evID, boardId, i, rd[i].fadLen,
1319 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1320 rd[i].rBuf->B[rd[i].bufPos - 2],
1321 rd[i].rBuf->B[rd[i].bufPos - 1]);
1322 factOut (kWarn, 501, str);
1323 goto EndBuf; //--> skip Board
1324 }
1325
1326 int iDx = evtIdx[evID]; //index into evtCtrl
1327
1328 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1329 &rd[i].rBuf->S[0], head_len);
1330 qncpy += head_len;
1331
1332 src = head_len / 2;
1333 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1334 for (drs = 0; drs < 4; drs++) {
1335 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1336 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1337 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1338//here we should check if pixH is correct ....
1339
1340 pixS = boardId * 36 + drs * 9 + px;
1341 src++;
1342
1343
1344 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1345 dest = pixS * roi[0];
1346 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1347 &rd[i].rBuf->S[src], roi[0] * 2);
1348 qncpy += roi[0] * 2;
1349 src += pixR;
1350
1351 if (px == 8) {
1352 tmS = boardId * 4 + drs;
1353 if (pixR > roi[0]) { //and we have additional TM info
1354 dest = tmS * roi[0] + NPIX * roi[0];
1355 int srcT = src - roi[0];
1356 mBuffer[evID].fEvent->StartTM[tmS] =
1357 (pixC + pixR - roi[0]) % 1024;
1358 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1359 &rd[i].rBuf->S[srcT], roi[0] * 2);
1360 qncpy += roi[0] * 2;
1361 } else {
1362 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1363 //ETIENNE because the TM channels are always processed during drs calib,
1364 //set them to zero if they are not present
1365 //I suspect that it may be more efficient to set all the allocated mem to
1366 //zero when allocating it
1367 dest = tmS*roi[0] + NPIX*roi[0];
1368 bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1369 }
1370 }
1371 }
1372 } // now we have stored a new board contents into Event structure
1373
1374 mBuffer[evID].fEvent->NumBoards++;
1375 mBuffer[evID].board[boardId] = boardId;
1376 evtCtrl.evtStat[iDx]++;
1377 evtCtrl.pcTime[iDx] = g_actTime;
1378
1379 if (++mBuffer[evID].nBoard >= actBoards) {
1380 int qnrun = 0;
1381 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1382 actrun = mBuffer[evID].runNum;
1383 int ir;
1384 for (ir = 0; ir < MAX_RUN; ir++) {
1385 qnrun++;
1386 if (runCtrl[ir].runId == actrun) {
1387 if (++runCtrl[ir].lastEvt == 0) {
1388 gotNewRun (actrun, mBuffer[evID].FADhead);
1389 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1390 mBuffer[evID].runNum,
1391 mBuffer[evID].evNum);
1392 factOut (kInfo, 1, str);
1393 break;
1394 }
1395 }
1396 }
1397 }
1398 snprintf (str, MXSTR,
1399 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1400 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1401 qncpy, qnrun);
1402 factOut (kDebug, -1, str);
1403
1404 //complete event read ---> flag for next processing
1405 evtCtrl.evtStat[iDx] = 99;
1406 gi.evtTot++;
1407 }
1408
1409 EndBuf:
1410 rd[i].bufTyp = 0; //ready to read next header
1411 rd[i].bufLen = frst_len;
1412 rd[i].bufPos = 0;
1413 EndBuf1:
1414 ;
1415 }
1416
1417 } else { //we are reading event header
1418 rd[i].bufPos += jrd;
1419 rd[i].bufLen -= jrd;
1420 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1421 //check if startflag correct; else shift block ....
1422 for (k = 0; k < rd[i].bufPos - 1; k++) {
1423 if (rd[i].rBuf->B[k] == start.B[1]
1424 && rd[i].rBuf->B[k + 1] == start.B[0])
1425 break;
1426 }
1427 rd[i].skip += k;
1428
1429 if (k >= rd[i].bufPos - 1) { //no start of header found
1430 rd[i].bufPos = 0;
1431 rd[i].bufLen = head_len;
1432 } else if (k > 0) {
1433 rd[i].bufPos -= k;
1434 rd[i].bufLen += k;
1435 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1436 rd[i].bufPos);
1437#ifdef EVTDEBUG
1438 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1439 rd[i].bufPos);
1440#endif
1441 }
1442 if (rd[i].bufPos >= minLen) {
1443 if (rd[i].skip > 0) {
1444 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1445 rd[i].skip, i);
1446 factOut (kInfo, 666, str);
1447 rd[i].skip = 0;
1448 }
1449 goodhed++;
1450 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1451 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1452 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1453 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1454 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1455 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1456 rd[i].bufTyp = 1; //ready to read full record
1457 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1458
1459 int fadboard = ntohs (rd[i].rBuf->S[12]);
1460 int fadcrate = fadboard / 256;
1461 fadboard = (fadcrate * 10 + fadboard % 256);
1462#ifdef EVTDEBUG
1463 snprintf (str, MXSTR,
1464 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1465 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1466 rd[i].runID, fadboard, jrd);
1467 factOut (kDebug, 1, str);
1468#endif
1469
1470 if (rd[i].runID == 0)
1471 rd[i].runID = gi_myRun;
1472
1473 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1474 snprintf (str, MXSTR,
1475 "illegal event-length on port %d", i);
1476 factOut (kFatal, 881, str);
1477 rd[i].bufLen = 100000; //?
1478 }
1479 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1480 debugHead (i, fadBoard, rd[i].rBuf);
1481 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1482 } else {
1483 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1484 }
1485 } else {
1486 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1487 }
1488
1489 } //end interpreting last read
1490}
1491 } //end of successful read anything
1492 } //finished trying to read all sockets
1493
1494#ifdef EVTDEBUG
1495 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1496 factOut (kDebug, -1, str);
1497#endif
1498
1499 gi.numRead[numok]++;
1500
1501 g_actTime = time (NULL);
1502 if (g_actTime > gi_SecTime) {
1503 gi_SecTime = g_actTime;
1504
1505
1506 //loop over all active events and flag those older than read-timeout
1507 //delete those that are written to disk ....
1508
1509 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1510 if (kd < 0)
1511 kd += (MAX_EVT * MAX_RUN);
1512
1513 gj.bufNew = gj.bufEvt = 0;
1514 int k1 = evtCtrl.frstPtr;
1515 for (k = k1; k < (k1 + kd); k++) {
1516 int k0 = k % (MAX_EVT * MAX_RUN);
1517//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1518
1519 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1520 gj.bufNew++; //incomplete event in Buffer
1521 if (evtCtrl.evtStat[k0] < 90
1522 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1523 int id = evtCtrl.evtBuf[k0];
1524 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1525 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1526 evtCtrl.evtStat[k0]);
1527 factOut (kWarn, 601, str);
1528
1529 int ik,ib,jb;
1530 ik=0;
1531 for (ib=0; ib<NBOARDS; ib++) {
1532 if (ib%10==0) {
1533 snprintf (&str[ik], MXSTR, "'");
1534 ik++;
1535 }
1536 jb = mBuffer[id].board[ib];
1537 if ( jb<0 ) {
1538 if (gi_NumConnect[b] >0 ) {
1539 snprintf (&str[ik], MXSTR, ".");
1540 } else {
1541 snprintf (&str[ik], MXSTR, "x");
1542 }
1543 } else {
1544 snprintf (&str[ik], MXSTR, "%d",jb%10);
1545 }
1546 ik++;
1547 }
1548 snprintf (&str[ik], MXSTR, "'");
1549 factOut (kWarn, 601, str);
1550
1551 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1552 gi.evtSkp++;
1553 gi.evtTot++;
1554 gj.evtSkip++;
1555 }
1556 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1557 || evtCtrl.evtStat[k0] == 0) { //'useless'
1558
1559 int id = evtCtrl.evtBuf[k0];
1560 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1561 mBuffer[id].evNum, mBuffer[id].nBoard);
1562 factOut (kDebug, -1, str);
1563 mBufFree (id); //event written--> free memory
1564 evtCtrl.evtStat[k0] = -1;
1565 gj.evtWrite++;
1566 gj.rateWrite++;
1567 } else if (evtCtrl.evtStat[k0] >= 95) {
1568 gj.bufEvt++; //complete event in Buffer
1569 }
1570
1571 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1572 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1573 }
1574 }
1575
1576
1577 gj.deltaT = 1000; //temporary, must be improved
1578
1579 int b;
1580 for (b = 0; b < NBOARDS; b++)
1581 gj.totBytes[b] += gj.rateBytes[b];
1582 gj.totMem = g_maxMem;
1583 if (gj.maxMem > gj.xxxMem)
1584 gj.xxxMem = gj.maxMem;
1585 if (gj.maxEvt > gj.xxxEvt)
1586 gj.xxxEvt = gj.maxEvt;
1587
1588 factStat (gj);
1589 factStatNew (gi);
1590 gj.rateNew = gj.rateWrite = 0;
1591 gj.maxMem = gj.usdMem;
1592 gj.maxEvt = gj.bufTot;
1593 for (b = 0; b < NBOARDS; b++)
1594 gj.rateBytes[b] = 0;
1595 }
1596
1597 if (numok > 0)
1598 numok2 = 0;
1599 else if (numok2++ > 3) {
1600 if (g_runStat == 1) {
1601 xwait.tv_sec = 1;
1602 xwait.tv_nsec = 0; // hibernate for 1 sec
1603 } else {
1604 xwait.tv_sec = 0;
1605 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1606 }
1607 nanosleep (&xwait, NULL);
1608 }
1609
1610 } //and do next loop over all sockets ...
1611
1612
1613 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1614 factOut (kInfo, -1, str);
1615
1616 if (g_reset > 0) {
1617 gi_reset = g_reset;
1618 gi_resetR = gi_reset % 10; //shall we stop reading ?
1619 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1620 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1621 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1622 g_reset = 0;
1623 } else {
1624 gi_reset = 0;
1625 if (g_runStat == -1)
1626 gi_resetR = 1;
1627 else
1628 gi_resetR = 7;
1629 gi_resetS = 7; //close all sockets
1630 gi_resetW = 7; //close all files
1631 gi_resetX = 0;
1632
1633 //inform others we have to quit ....
1634 gi_runStat = -11; //inform all that no update to happen any more
1635 gj.readStat = -11; //inform all that no update to happen any more
1636 }
1637
1638 if (gi_resetS > 0) {
1639 //must close all open sockets ...
1640 snprintf (str, MXSTR, "close all sockets ...");
1641 factOut (kInfo, -1, str);
1642 for (i = 0; i < MAX_SOCK; i++) {
1643 if (rd[i].sockStat == 0) {
1644 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1645 if (i % 7 == 0) {
1646// gi_NumConnect[i / 7]--;
1647 gi_NumConnect[i / 7]-= cntsock ;
1648 gi.numConn[i / 7]--;
1649 gj.numConn[i / 7]--;
1650 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1651 rd[i / 7].sockStat = -1; //and try to open asap
1652 }
1653 }
1654 }
1655 }
1656
1657
1658 if (gi_resetR > 0) {
1659 //flag all events as 'read finished'
1660 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1661 if (kd < 0)
1662 kd += (MAX_EVT * MAX_RUN);
1663
1664 int k1 = evtCtrl.frstPtr;
1665 for (k = k1; k < (k1 + kd); k++) {
1666 int k0 = k % (MAX_EVT * MAX_RUN);
1667 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1668 evtCtrl.evtStat[k0] = 91;
1669 gi.evtSkp++;
1670 gi.evtTot++;
1671 }
1672 }
1673
1674 xwait.tv_sec = 0;
1675 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1676 nanosleep (&xwait, NULL);
1677
1678 //and clear all buffers (might have to wait until all others are done)
1679 int minclear;
1680 if (gi_resetR == 1) {
1681 minclear = 900;
1682 snprintf (str, MXSTR, "drain all buffers ...");
1683 } else {
1684 minclear = 0;
1685 snprintf (str, MXSTR, "flush all buffers ...");
1686 }
1687 factOut (kInfo, -1, str);
1688
1689 int numclear = 1;
1690 while (numclear > 0) {
1691 numclear = 0;
1692 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1693 if (kd < 0)
1694 kd += (MAX_EVT * MAX_RUN);
1695
1696 int k1 = evtCtrl.frstPtr;
1697 for (k = k1; k < (k1 + kd); k++) {
1698 int k0 = k % (MAX_EVT * MAX_RUN);
1699 if (evtCtrl.evtStat[k0] > minclear) {
1700 int id = evtCtrl.evtBuf[k0];
1701 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1702 mBuffer[id].evNum, mBuffer[id].nBoard);
1703 factOut (kDebug, -1, str);
1704 mBufFree (id); //event written--> free memory
1705 evtCtrl.evtStat[k0] = -1;
1706 } else if (evtCtrl.evtStat[k0] > 0)
1707 numclear++; //writing is still ongoing...
1708
1709 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1710 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1711 }
1712
1713 xwait.tv_sec = 0;
1714 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1715 nanosleep (&xwait, NULL);
1716 }
1717 }
1718
1719 if (gi_reset > 0) {
1720 if (gi_resetW > 0) {
1721 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1722 }
1723 if (gi_resetX > 0) {
1724 xwait.tv_sec = gi_resetX;
1725 xwait.tv_nsec = 0;
1726 nanosleep (&xwait, NULL);
1727 }
1728
1729 snprintf (str, MXSTR, "Continue read Process ...");
1730 factOut (kInfo, -1, str);
1731 gi_reset = 0;
1732 goto START;
1733 }
1734
1735
1736
1737 snprintf (str, MXSTR, "Exit read Process ...");
1738 factOut (kInfo, -1, str);
1739 gi_runStat = -99;
1740 gj.readStat = -99;
1741 factStat (gj);
1742 factStatNew (gi);
1743 return 0;
1744
1745} /*-----------------------------------------------------------------*/
1746
1747
1748void *
1749subProc (void *thrid)
1750{
1751 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1752 struct timespec xwait;
1753
1754 int32_t cntr ;
1755
1756 threadID = (int) thrid;
1757
1758 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1759 factOut (kInfo, -1, str);
1760
1761 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1762 numWait = numProc = 0;
1763 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1764 if (kd < 0)
1765 kd += (MAX_EVT * MAX_RUN);
1766
1767 int k1 = evtCtrl.frstPtr;
1768 for (k = k1; k < (k1 + kd); k++) {
1769 int k0 = k % (MAX_EVT * MAX_RUN);
1770
1771 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1772 if (gi_resetR > 1) { //we are asked to flush buffers asap
1773 jret = 9100; //flag to be deleted
1774 } else {
1775 int id = evtCtrl.evtBuf[k0];
1776
1777
1778 jret =
1779 subProcEvt (threadID, mBuffer[id].FADhead,
1780 mBuffer[id].fEvent, mBuffer[id].buffer);
1781
1782
1783 if (jret <= threadID) {
1784 snprintf (str, MXSTR,
1785 "process %d wants to send to process %d",
1786 threadID, jret);
1787 factOut (kError, -1, str);
1788 jret = 5300;
1789 } else if (jret <= 0)
1790 jret = 9200 + threadID; //flag as 'to be deleted'
1791 else if (jret >= gi_maxProc)
1792 jret = 5200 + threadID; //flag as 'to be written'
1793 else
1794 jret = 1000 + jret; //flag for next proces
1795 }
1796 evtCtrl.evtStat[k0] = jret;
1797 numProc++;
1798 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1799 numWait++;
1800 }
1801
1802 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1803 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1804 factOut (kInfo, -1, str);
1805 return 0;
1806 }
1807 if (numProc == 0) {
1808 //seems we have nothing to do, so sleep a little
1809 xwait.tv_sec = 0;
1810 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1811 nanosleep (&xwait, NULL);
1812 }
1813 }
1814
1815 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1816 factOut (kInfo, -1, str);
1817 return;
1818} /*-----------------------------------------------------------------*/
1819
1820
1821void *
1822procEvt (void *ptr)
1823{
1824/* *** main loop processing file, including SW-trigger */
1825 int numProc, numWait;
1826 int k, status, j;
1827 struct timespec xwait;
1828 char str[MXSTR];
1829
1830
1831
1832 int lastRun = 0; //usually run from last event still valid
1833
1834 cpu_set_t mask;
1835 int cpu = 1; //process thread (will be several in final version)
1836
1837 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1838 gi_maxProc);
1839 factOut (kInfo, -1, str);
1840
1841/* CPU_ZERO initializes all the bits in the mask to zero. */
1842// CPU_ZERO (&mask);
1843/* CPU_SET sets only the bit corresponding to cpu. */
1844// CPU_SET( 0 , &mask ); leave for system
1845// CPU_SET( 1 , &mask ); used by write process
1846// CPU_SET (2, &mask);
1847// CPU_SET (3, &mask);
1848// CPU_SET (4, &mask);
1849// CPU_SET (5, &mask);
1850// CPU_SET (6, &mask);
1851// CPU_SET( 7 , &mask ); used by read process
1852/* sched_setaffinity returns 0 in success */
1853// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1854// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1855// factOut (kWarn, -1, str);
1856// }
1857
1858
1859 pthread_t thread[100];
1860 int th_ret[100];
1861
1862 for (k = 0; k < gi_maxProc; k++) {
1863 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1864 }
1865
1866 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1867
1868 numWait = numProc = 0;
1869 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1870 if (kd < 0)
1871 kd += (MAX_EVT * MAX_RUN);
1872
1873 int k1 = evtCtrl.frstPtr;
1874 for (k = k1; k < (k1 + kd); k++) {
1875 int k0 = k % (MAX_EVT * MAX_RUN);
1876//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1877 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1878
1879 if (gi_resetR > 1) { //we are asked to flush buffers asap
1880 evtCtrl.evtStat[k0] = 9991;
1881 } else {
1882
1883//-------- it is better to open the run already here, so call can be used to initialize
1884//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1885 int id = evtCtrl.evtBuf[k0];
1886 uint32_t irun = mBuffer[id].runNum;
1887 int32_t ievt = mBuffer[id].evNum;
1888 if (runCtrl[lastRun].runId == irun) {
1889 j = lastRun;
1890 } else {
1891 //check which fileID to use (or open if needed)
1892 for (j = 0; j < MAX_RUN; j++) {
1893 if (runCtrl[j].runId == irun)
1894 break;
1895 }
1896 if (j >= MAX_RUN) {
1897 snprintf (str, MXSTR,
1898 "P error: can not find run %d for event %d in %d",
1899 irun, ievt, id);
1900 factOut (kFatal, 901, str);
1901 }
1902 lastRun = j;
1903 }
1904
1905 if (runCtrl[j].fileId < 0) {
1906//---- we need to open a new run ==> make sure all older runs are
1907//---- finished and marked to be closed ....
1908 int j1;
1909 for (j1 = 0; j1 < MAX_RUN; j1++) {
1910 if (runCtrl[j1].fileId == 0) {
1911 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1912//---- problem: processing still going on ==> must wait for closing ....
1913 snprintf (str, MXSTR,
1914 "P finish run since new one opened %d",
1915 runCtrl[j1].runId);
1916 runFinish1 (runCtrl[j1].runId);
1917 }
1918
1919 }
1920
1921 actRun.Version = 1;
1922 actRun.RunType = -1; //to be adapted
1923
1924 actRun.Nroi = runCtrl[j].roi0;
1925 actRun.NroiTM = runCtrl[j].roi8;
1926//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1927// if (actRun.Nroi == actRun.NroiTM)
1928// actRun.NroiTM = 0;
1929 actRun.RunTime = runCtrl[j].firstTime;
1930 actRun.RunUsec = runCtrl[j].firstTime;
1931 actRun.NBoard = NBOARDS;
1932 actRun.NPix = NPIX;
1933 actRun.NTm = NTMARK;
1934 actRun.Nroi = mBuffer[id].nRoi;
1935 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1936 NBOARDS * sizeof (PEVNT_HEADER));
1937
1938 runCtrl[j].fileHd =
1939 runOpen (irun, &actRun, sizeof (actRun));
1940 if (runCtrl[j].fileHd == NULL) {
1941 snprintf (str, MXSTR,
1942 "P could not open a file for run %d", irun);
1943 factOut (kError, 502, str);
1944 runCtrl[j].fileId = 91;
1945 runCtrl[j].procId = 91;
1946 } else {
1947 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1948 irun, ievt);
1949 factOut (kInfo, -1, str);
1950 runCtrl[j].fileId = 0;
1951 runCtrl[j].procId = 0;
1952 }
1953
1954 }
1955//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1956 if (runCtrl[j].procId == 0) {
1957 if (runCtrl[j].closeTime < g_actTime
1958 || runCtrl[j].lastTime < g_actTime - 300
1959 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1960 snprintf (str, MXSTR,
1961 "P reached end of run condition for run %d",
1962 irun);
1963 factOut (kInfo, 502, str);
1964 runFinish1 (runCtrl[j].runId);
1965 runCtrl[j].procId = 1;
1966 }
1967 }
1968 if (runCtrl[j].procId != 0) {
1969 snprintf (str, MXSTR,
1970 "P skip event %d because no active run %d", ievt,
1971 irun);
1972 factOut (kDebug, 502, str);
1973 evtCtrl.evtStat[k0] = 9091;
1974 } else {
1975//--------
1976//--------
1977 id = evtCtrl.evtBuf[k0];
1978 int itevt = mBuffer[id].trgNum;
1979 int itrg = mBuffer[id].trgTyp;
1980 int roi = mBuffer[id].nRoi;
1981 int roiTM = mBuffer[id].nRoiTM;
1982
1983//make sure unused pixels/tmarks are cleared to zero
1984//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1985// if (roiTM == roi)
1986// roiTM = 0;
1987 int ip, it, dest, ib;
1988 for (ip = 0; ip < NPIX; ip++) {
1989 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1990 dest = ip * roi;
1991 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1992 }
1993 }
1994
1995 for (it = 0; it < NTMARK; it++) {
1996 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1997 dest = it * roi + NPIX * roi;
1998 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1999 }
2000 }
2001
2002
2003//and set correct event header ; also check for consistency in event (not yet)
2004 mBuffer[id].fEvent->Roi = roi;
2005 mBuffer[id].fEvent->RoiTM = roiTM;
2006 mBuffer[id].fEvent->EventNum = ievt;
2007 mBuffer[id].fEvent->TriggerNum = itevt;
2008 mBuffer[id].fEvent->TriggerType = itrg;
2009 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2010 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2011 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2012 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2013 mBuffer[id].fEvent->SoftTrig = 0;
2014
2015
2016 for (ib = 0; ib < NBOARDS; ib++) {
2017 if (mBuffer[id].board[ib] == -1) { //board is not read
2018 mBuffer[id].FADhead[ib].start_package_flag = 0;
2019 mBuffer[id].fEvent->BoardTime[ib] = 0;
2020 } else {
2021 mBuffer[id].fEvent->BoardTime[ib] =
2022 ntohl (mBuffer[id].FADhead[ib].time);
2023 }
2024 }
2025 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2026 mBuffer[id].fEvent);
2027 gi.procTot++;
2028 numProc++;
2029
2030 if (i < 0) {
2031 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2032 gi.procErr++;
2033 } else {
2034 evtCtrl.evtStat[k0] = 1000;
2035 runCtrl[j].procEvt++;
2036 }
2037 }
2038 }
2039 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2040 numWait++;
2041 }
2042 }
2043
2044 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2045 snprintf (str, MXSTR, "Exit Processing Process ...");
2046 factOut (kInfo, -1, str);
2047 gp_runStat = -22; //==> we should exit
2048 gj.procStat = -22; //==> we should exit
2049 return 0;
2050 }
2051
2052 if (numProc == 0) {
2053 //seems we have nothing to do, so sleep a little
2054 xwait.tv_sec = 0;
2055 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2056 nanosleep (&xwait, NULL);
2057 }
2058 gp_runStat = gi_runStat;
2059 gj.procStat = gj.readStat;
2060
2061 }
2062
2063 //we are asked to abort asap ==> must flag all remaining events
2064 // when gi_runStat claims that all events are in the buffer...
2065
2066 snprintf (str, MXSTR, "Abort Processing Process ...");
2067 factOut (kInfo, -1, str);
2068 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2069 if (kd < 0)
2070 kd += (MAX_EVT * MAX_RUN);
2071
2072 for (k = 0; k < gi_maxProc; k++) {
2073 pthread_join (thread[k], (void **) &status);
2074 }
2075
2076 int k1 = evtCtrl.frstPtr;
2077 for (k = k1; k < (k1 + kd); k++) {
2078 int k0 = k % (MAX_EVT * MAX_RUN);
2079 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2080 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2081 }
2082 }
2083
2084 gp_runStat = -99;
2085 gj.procStat = -99;
2086
2087 return 0;
2088
2089} /*-----------------------------------------------------------------*/
2090
2091int
2092CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2093{
2094/* close run runId (all all runs if runId=0) */
2095/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2096 int j;
2097
2098
2099 if (runId == 0) {
2100 for (j = 0; j < MAX_RUN; j++) {
2101 if (runCtrl[j].fileId == 0) { //run is open
2102 runCtrl[j].closeTime = closeTime;
2103 runCtrl[j].maxEvt = maxEvt;
2104 }
2105 }
2106 return 0;
2107 }
2108
2109 for (j = 0; j < MAX_RUN; j++) {
2110 if (runCtrl[j].runId == runId) {
2111 if (runCtrl[j].fileId == 0) { //run is open
2112 runCtrl[j].closeTime = closeTime;
2113 runCtrl[j].maxEvt = maxEvt;
2114 return 0;
2115 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2116 runCtrl[j].closeTime = closeTime;
2117 runCtrl[j].maxEvt = maxEvt;
2118 return +1;
2119 } else { // run already closed
2120 return +2;
2121 }
2122 }
2123 } //we only reach here if the run was never created
2124 return -1;
2125
2126} /*-----------------------------------------------------------------*/
2127
2128
2129void *
2130writeEvt (void *ptr)
2131{
2132/* *** main loop writing event (including opening and closing run-files */
2133
2134 int numWrite, numWait;
2135 int k, j ;
2136 struct timespec xwait;
2137 char str[MXSTR];
2138
2139 cpu_set_t mask;
2140 int cpu = 1; //write thread
2141
2142 snprintf (str, MXSTR, "Starting write-thread");
2143 factOut (kInfo, -1, str);
2144
2145/* CPU_ZERO initializes all the bits in the mask to zero. */
2146// CPU_ZERO (&mask);
2147/* CPU_SET sets only the bit corresponding to cpu. */
2148// CPU_SET (cpu, &mask);
2149/* sched_setaffinity returns 0 in success */
2150// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2151// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2152// }
2153
2154 int lastRun = 0; //usually run from last event still valid
2155
2156 while (g_runStat > -2) {
2157
2158 numWait = numWrite = 0;
2159 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2160 if (kd < 0)
2161 kd += (MAX_EVT * MAX_RUN);
2162
2163 int k1 = evtCtrl.frstPtr;
2164 for (k = k1; k < (k1 + kd); k++) {
2165 int k0 = k % (MAX_EVT * MAX_RUN);
2166//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2167 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2168
2169 if (gi_resetR > 1) { //we must drain the buffer asap
2170 evtCtrl.evtStat[k0] = 9904;
2171 } else {
2172
2173
2174 int id = evtCtrl.evtBuf[k0];
2175 uint32_t irun = mBuffer[id].runNum;
2176 int32_t ievt = mBuffer[id].evNum;
2177
2178 gi.wrtTot++;
2179 if (runCtrl[lastRun].runId == irun) {
2180 j = lastRun;
2181 } else {
2182 //check which fileID to use (or open if needed)
2183 for (j = 0; j < MAX_RUN; j++) {
2184 if (runCtrl[j].runId == irun)
2185 break;
2186 }
2187 if (j >= MAX_RUN) {
2188 snprintf (str, MXSTR,
2189 "W error: can not find run %d for event %d in %d",
2190 irun, ievt, id);
2191 factOut (kFatal, 901, str);
2192 gi.wrtErr++;
2193 }
2194 lastRun = j;
2195 }
2196
2197 if (runCtrl[j].fileId < 0) {
2198 actRun.Version = 1;
2199 actRun.RunType = -1; //to be adapted
2200
2201 actRun.Nroi = runCtrl[j].roi0;
2202 actRun.NroiTM = runCtrl[j].roi8;
2203//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2204// if (actRun.Nroi == actRun.NroiTM)
2205// actRun.NroiTM = 0;
2206 actRun.RunTime = runCtrl[j].firstTime;
2207 actRun.RunUsec = runCtrl[j].firstTime;
2208 actRun.NBoard = NBOARDS;
2209 actRun.NPix = NPIX;
2210 actRun.NTm = NTMARK;
2211 actRun.Nroi = mBuffer[id].nRoi;
2212 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2213 NBOARDS * sizeof (PEVNT_HEADER));
2214
2215 runCtrl[j].fileHd =
2216 runOpen (irun, &actRun, sizeof (actRun));
2217 if (runCtrl[j].fileHd == NULL) {
2218 snprintf (str, MXSTR,
2219 "W could not open a file for run %d", irun);
2220 factOut (kError, 502, str);
2221 runCtrl[j].fileId = 91;
2222 } else {
2223 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2224 irun, ievt);
2225 factOut (kInfo, -1, str);
2226 runCtrl[j].fileId = 0;
2227 }
2228
2229 }
2230
2231 if (runCtrl[j].fileId != 0) {
2232 if (runCtrl[j].fileId < 0) {
2233 snprintf (str, MXSTR,
2234 "W never opened file for this run %d", irun);
2235 factOut (kError, 123, str);
2236 } else if (runCtrl[j].fileId < 100) {
2237 snprintf (str, MXSTR, "W.file for this run is closed %d",
2238 irun);
2239 factOut (kWarn, 123, str);
2240 runCtrl[j].fileId += 100;
2241 } else {
2242 snprintf (str, MXSTR, "W:file for this run is closed %d",
2243 irun);
2244 factOut (kDebug, 123, str);
2245 }
2246 evtCtrl.evtStat[k0] = 9903;
2247 gi.wrtErr++;
2248 } else {
2249// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2250// factOut (kInfo, 504, str);
2251 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2252 sizeof (mBuffer[id]));
2253 if (i >= 0) {
2254 runCtrl[j].lastTime = g_actTime;
2255 runCtrl[j].actEvt++;
2256 evtCtrl.evtStat[k0] = 9901;
2257 snprintf (str, MXSTR,
2258 "%5d successfully wrote for run %d id %5d",
2259 ievt, irun, k0);
2260 factOut (kDebug, 504, str);
2261// gj.writEvt++ ;
2262 } else {
2263 snprintf (str, MXSTR, "W error writing event for run %d",
2264 irun);
2265 factOut (kError, 503, str);
2266 evtCtrl.evtStat[k0] = 9902;
2267 gi.wrtErr++;
2268 }
2269
2270 if (i < 0
2271 || runCtrl[j].lastTime < g_actTime - 300
2272 || runCtrl[j].closeTime < g_actTime
2273 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2274 int ii = 0;
2275 if (i < 0)
2276 ii = 1;
2277 else if (runCtrl[j].closeTime < g_actTime)
2278 ii = 2;
2279 else if (runCtrl[j].lastTime < g_actTime - 300)
2280 ii = 3;
2281 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2282 ii = 4;
2283
2284
2285
2286 //close run for whatever reason
2287 if (runCtrl[j].runId == gi_myRun)
2288 gi_myRun = g_actTime;
2289
2290 if (runCtrl[j].procId == 0) {
2291 runFinish1 (runCtrl[j].runId);
2292 runCtrl[j].procId = 92;
2293 }
2294
2295 runCtrl[j].closeTime = g_actTime - 1;
2296 i = runClose (runCtrl[j].fileHd, &runTail[j],
2297 sizeof (runTail[j]));
2298 if (i < 0) {
2299 snprintf (str, MXSTR, "error closing run %d %d AAA",
2300 runCtrl[j].runId, i);
2301 factOut (kError, 503, str);
2302 runCtrl[j].fileId = 92;
2303 } else {
2304 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2305 ii);
2306 factOut (kInfo, 503, str);
2307 runCtrl[j].fileId = 93;
2308 }
2309 }
2310 }
2311 }
2312 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2313 numWait++;
2314 }
2315
2316 //check if we should close a run (mainly when no event pending)
2317 for (j = 0; j < MAX_RUN; j++) {
2318 if (runCtrl[j].fileId == 0
2319 && (runCtrl[j].closeTime < g_actTime
2320 || runCtrl[j].lastTime < g_actTime - 300
2321 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2322 if (runCtrl[j].runId == gi_myRun)
2323 gi_myRun = g_actTime;
2324 int ii = 0;
2325 if (runCtrl[j].closeTime < g_actTime)
2326 ii = 2;
2327 else if (runCtrl[j].lastTime < g_actTime - 300)
2328 ii = 3;
2329 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2330 ii = 4;
2331
2332 if (runCtrl[j].procId == 0) {
2333 runFinish1 (runCtrl[j].runId);
2334 runCtrl[j].procId = 92;
2335 }
2336
2337 runCtrl[j].closeTime = g_actTime - 1;
2338 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2339 sizeof (runTail[j]));
2340 if (i < 0) {
2341 snprintf (str, MXSTR, "error closing run %d %d BBB",
2342 runCtrl[j].runId, i);
2343 factOut (kError, 506, str);
2344 runCtrl[j].fileId = 94;
2345 } else {
2346 snprintf (str, MXSTR, "W closed run %d BBB %d",
2347 runCtrl[j].runId, ii);
2348 factOut (kInfo, 507, str);
2349 runCtrl[j].fileId = 95;
2350 }
2351 }
2352 }
2353
2354 if (numWrite == 0) {
2355 //seems we have nothing to do, so sleep a little
2356 xwait.tv_sec = 0;
2357 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2358 nanosleep (&xwait, NULL);
2359 }
2360
2361 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2362 snprintf (str, MXSTR, "Finish Write Process ...");
2363 factOut (kInfo, -1, str);
2364 gw_runStat = -22; //==> we should exit
2365 gj.writStat = -22; //==> we should exit
2366 goto closerun;
2367 }
2368 gw_runStat = gi_runStat;
2369 gj.writStat = gj.readStat;
2370
2371 }
2372
2373 //must close all open files ....
2374 snprintf (str, MXSTR, "Abort Writing Process ...");
2375 factOut (kInfo, -1, str);
2376
2377 closerun:
2378 snprintf (str, MXSTR, "Close all open files ...");
2379 factOut (kInfo, -1, str);
2380 for (j = 0; j < MAX_RUN; j++)
2381 if (runCtrl[j].fileId == 0) {
2382 if (runCtrl[j].runId == gi_myRun)
2383 gi_myRun = g_actTime;
2384
2385 if (runCtrl[j].procId == 0) {
2386 runFinish1 (runCtrl[j].runId);
2387 runCtrl[j].procId = 92;
2388 }
2389
2390 runCtrl[j].closeTime = g_actTime - 1;
2391 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2392 int ii = 0;
2393 if (runCtrl[j].closeTime < g_actTime)
2394 ii = 2;
2395 else if (runCtrl[j].lastTime < g_actTime - 300)
2396 ii = 3;
2397 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2398 ii = 4;
2399 if (i < 0) {
2400 snprintf (str, MXSTR, "error closing run %d %d CCC",
2401 runCtrl[j].runId, i);
2402 factOut (kError, 506, str);
2403 runCtrl[j].fileId = 96;
2404 } else {
2405 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2406 ii);
2407 factOut (kInfo, 507, str);
2408 runCtrl[j].fileId = 97;
2409 }
2410 }
2411
2412 gw_runStat = -99;
2413 gj.writStat = -99;
2414 snprintf (str, MXSTR, "Exit Writing Process ...");
2415 factOut (kInfo, -1, str);
2416 return 0;
2417
2418
2419
2420
2421} /*-----------------------------------------------------------------*/
2422
2423
2424
2425
2426void
2427StartEvtBuild ()
2428{
2429
2430 int i, j, imax, status, th_ret[50];
2431 pthread_t thread[50];
2432 struct timespec xwait;
2433
2434 gi_runStat = gp_runStat = gw_runStat = 0;
2435 gj.readStat = gj.procStat = gj.writStat = 0;
2436
2437 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2438 factOut (kInfo, -1, str);
2439
2440//initialize run control logics
2441 for (i = 0; i < MAX_RUN; i++) {
2442 runCtrl[i].runId = 0;
2443 runCtrl[i].fileId = -2;
2444 }
2445
2446//prepare for subProcesses
2447 gi_maxSize = g_maxSize;
2448 if (gi_maxSize <= 0)
2449 gi_maxSize = 1;
2450
2451 gi_maxProc = g_maxProc;
2452 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2453 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2454 factOut (kFatal, 301, str);
2455 gi_maxProc = 1;
2456 }
2457//partially initialize event control logics
2458 evtCtrl.frstPtr = 0;
2459 evtCtrl.lastPtr = 0;
2460
2461//start all threads (more to come) when we are allowed to ....
2462 while (g_runStat == 0) {
2463 xwait.tv_sec = 0;
2464 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2465 nanosleep (&xwait, NULL);
2466 }
2467
2468 i = 0;
2469 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2470 i++;
2471 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2472 i++;
2473 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2474 i++;
2475 imax = i;
2476
2477
2478#ifdef BILAND
2479 xwait.tv_sec = 30;;
2480 xwait.tv_nsec = 0; // sleep for ~20sec
2481 nanosleep (&xwait, NULL);
2482
2483 printf ("close all runs in 2 seconds\n");
2484
2485 CloseRunFile (0, time (NULL) + 2, 0);
2486
2487 xwait.tv_sec = 1;;
2488 xwait.tv_nsec = 0; // sleep for ~20sec
2489 nanosleep (&xwait, NULL);
2490
2491 printf ("setting g_runstat to -1\n");
2492
2493 g_runStat = -1;
2494#endif
2495
2496
2497//wait for all threads to finish
2498 for (i = 0; i < imax; i++) {
2499 j = pthread_join (thread[i], (void **) &status);
2500 }
2501
2502} /*-----------------------------------------------------------------*/
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518 /*-----------------------------------------------------------------*/
2519 /*-----------------------------------------------------------------*/
2520 /*-----------------------------------------------------------------*/
2521 /*-----------------------------------------------------------------*/
2522 /*-----------------------------------------------------------------*/
2523
2524#ifdef BILAND
2525
2526int
2527subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2528 int8_t * buffer)
2529{
2530 printf ("called subproc %d\n", threadID);
2531 return threadID + 1;
2532}
2533
2534
2535
2536
2537 /*-----------------------------------------------------------------*/
2538 /*-----------------------------------------------------------------*/
2539 /*-----------------------------------------------------------------*/
2540 /*-----------------------------------------------------------------*/
2541 /*-----------------------------------------------------------------*/
2542
2543
2544
2545
2546FileHandle_t
2547runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2548{
2549 return 1;
2550};
2551
2552int
2553runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2554{
2555 return 1;
2556 usleep (10000);
2557 return 1;
2558}
2559
2560
2561//{ return 1; } ;
2562
2563int
2564runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2565{
2566 return 1;
2567};
2568
2569
2570
2571
2572int
2573eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2574{
2575 int i = 0;
2576
2577// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2578// for (i=0; i<NBOARDS; i++) {
2579// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2580// }
2581 return 0;
2582}
2583
2584
2585void
2586factStatNew (EVT_STAT gi)
2587{
2588 int i;
2589
2590//for (i=0;i<MAX_SOCK;i++) {
2591// printf("%4d",gi.numRead[i]);
2592// if (i%20 == 0 ) printf("\n");
2593//}
2594}
2595
2596void
2597gotNewRun (int runnr, PEVNT_HEADER * headers)
2598{
2599 printf ("got new run %d\n", runnr);
2600 return;
2601}
2602
2603void
2604factStat (GUI_STAT gj)
2605{
2606// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2607// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2608}
2609
2610
2611void
2612debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
2613 int state, uint32_t tsec, uint32_t tusec)
2614{
2615// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
2616}
2617
2618
2619
2620void
2621debugStream (int isock, void *buf, int len)
2622{
2623}
2624
2625void
2626debugHead (int i, int j, void *buf)
2627{
2628}
2629
2630
2631void
2632factOut (int severity, int err, char *message)
2633{
2634 static FILE *fd;
2635 static int file = 0;
2636
2637 if (file == 0) {
2638 printf ("open file\n");
2639 fd = fopen ("x.out", "w+");
2640 file = 999;
2641 }
2642
2643 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2644
2645 if (severity != kDebug)
2646 printf ("%3d %3d | %s\n", severity, err, message);
2647}
2648
2649
2650
2651int
2652main ()
2653{
2654 int i, b, c, p;
2655 char ipStr[100];
2656 struct in_addr IPaddr;
2657
2658 g_maxMem = 1024 * 1024; //MBytes
2659//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2660 g_maxMem = g_maxMem * 200; //100MBytes
2661
2662 g_maxProc = 20;
2663 g_maxSize = 30000;
2664
2665 g_runStat = 40;
2666
2667 i = 0;
2668
2669// version for standard crates
2670//for (c=0; c<4,c++) {
2671// for (b=0; b<10; b++) {
2672// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2673//
2674// inet_pton(PF_INET, ipStr, &IPaddr) ;
2675//
2676// g_port[i].sockAddr.sin_family = PF_INET;
2677// g_port[i].sockAddr.sin_port = htons(5000) ;
2678// g_port[i].sockAddr.sin_addr = IPaddr ;
2679// g_port[i].sockDef = 1 ;
2680// i++ ;
2681// }
2682//}
2683//
2684//version for PC-test *
2685 for (c = 0; c < 4; c++) {
2686 for (b = 0; b < 10; b++) {
2687 sprintf (ipStr, "10.0.%d.11", 128 + c);
2688 if (c < 2)
2689 sprintf (ipStr, "10.0.%d.11", 128);
2690 else
2691 sprintf (ipStr, "10.0.%d.11", 131);
2692// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2693
2694 inet_pton (PF_INET, ipStr, &IPaddr);
2695 p = 31919 + 100 * c + 10 * b;
2696
2697
2698 g_port[i].sockAddr.sin_family = PF_INET;
2699 g_port[i].sockAddr.sin_port = htons (p);
2700 g_port[i].sockAddr.sin_addr = IPaddr;
2701 g_port[i].sockDef = 1;
2702
2703 i++;
2704 }
2705 }
2706
2707
2708//g_port[17].sockDef =-1 ;
2709//g_actBoards-- ;
2710
2711 StartEvtBuild ();
2712
2713 return 0;
2714
2715}
2716#endif
Note: See TracBrowser for help on using the repository browser.