source: trunk/FACT++/src/EventBuilder.c@ 12392

Last change on this file since 12392 was 12372, checked in by lyard, 13 years ago
implemented chunk allocation of memory. Not optimal yet, but at least fadctrl uses one the amount it is supposed to use
File size: 84.2 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48
49extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
50
51
52extern void factStat (GUI_STAT gj);
53
54extern void factStatNew (EVT_STAT gi);
55
56extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
57
58extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
59 int8_t * buffer);
60
61extern void debugHead (int i, int j, void *buf);
62
63extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
64 int32_t runnr, int state, uint32_t tsec,
65 uint32_t tusec);
66extern void debugStream (int isock, void *buf, int len);
67
68int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100int gi_memStat = +1;
101
102uint32_t gi_myRun = 0;
103uint32_t actrun = 0;
104
105
106uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
107
108//uint gi_EvtStart= 0 ;
109//uint gi_EvtRead = 0 ;
110//uint gi_EvtBad = 0 ;
111//uint gi_EvtTot = 0 ;
112//size_t gi_usedMem = 0 ;
113
114//uint gw_EvtTot = 0 ;
115//uint gp_EvtTot = 0 ;
116
117PIX_MAP g_pixMap[NPIX];
118
119EVT_STAT gi;
120GUI_STAT gj;
121
122EVT_CTRL evtCtrl; //control of events during processing
123int evtIdx[MAX_EVT * MAX_RUN]; //index from mBuffer to evtCtrl
124
125WRK_DATA mBuffer[MAX_EVT * MAX_RUN]; //local working space
126
127#define MXSTR 1000
128char str[MXSTR];
129
130//ETIENNE
131#define MAX_SLOTS_PER_CHUNK 100
132
133typedef struct {
134 int32_t eventNumber;
135 int32_t chunk;
136 int32_t slot;
137} CHUNK_MAPPING;
138
139CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];
140
141#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
142#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
143#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
144#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)
145
146typedef struct {
147 void * pointers[MAX_SLOTS_PER_CHUNK];
148 int32_t events[MAX_SLOTS_PER_CHUNK];
149 int32_t nFreeSlots;
150 int32_t nSlots;
151} ETI_CHUNK;
152
153int32_t numAllocatedChunks = 0;
154
155#define MAX_CHUNKS 8096
156ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 int i,j;
161 for (i=0;i<numAllocatedChunks;i++) {
162 if (EtiMemoryChunks[i].nFreeSlots > 0) {
163 for (j=0;j<EtiMemoryChunks[i].nSlots;j++)
164 {
165 if (EtiMemoryChunks[i].events[j] == -1)
166 {
167 EtiMemoryChunks[i].events[j] = evtId;
168 EtiMemoryChunks[i].nFreeSlots--;
169 mBufferMapping[evtIndex].eventNumber = evtId;
170 mBufferMapping[evtIndex].chunk = i;
171 mBufferMapping[evtIndex].slot = j;
172 return EtiMemoryChunks[i].pointers[j];
173 }
174 }
175 //If I reach this point then we have a problem because it should have found
176 //a free spot just above.
177 }
178 }
179 //If we reach this point this means that we should allocate more memory
180 int32_t numNewSlots = 0;
181 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
182 numNewSlots = MAX_SLOTS_PER_CHUNK;
183 else
184 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
185
186 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
187 {
188 return NULL;
189 }
190
191 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
192 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
193 {
194 snprintf (str, MXSTR, "could not allocate %lu bytes. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
195 factOut (kError, 000, str);
196 return NULL;
197 }
198
199 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
200 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
201 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
202 mBufferMapping[evtIndex].eventNumber = evtId;
203 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
204 mBufferMapping[evtIndex].slot = 0;
205
206 for (i=1;i<numNewSlots;i++)
207 {
208 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
209 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
210 }
211 numAllocatedChunks++;
212
213 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
214}
215
216void ETI_Free(int evtId, int evtIndex)
217{
218 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
219 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
220 {
221 snprintf (str, MXSTR, "Mismatch in chunk mapping table. expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
222 factOut (kError, 000, str);
223 return;
224 }
225 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
226 currentChunk->nFreeSlots++;
227 //check if some chunks should be freed
228 int i;
229 for (i=0;i<numAllocatedChunks;i++)
230 {
231 if (EtiMemoryChunks[i].nFreeSlots == EtiMemoryChunks[i].nSlots)
232 {
233 snprintf(str, MXSTR, "Slot %d could be freed", i);
234 factOut(kError, 000, str);
235 }
236 }
237 int chunkIndex = mBufferMapping[evtIndex].chunk;
238 if (chunkIndex != numAllocatedChunks-1)
239 return;
240
241 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
242 {//free this chunk
243 snprintf(str, MXSTR, "Freeing chunk #%d", numAllocatedChunks);
244 factOut(kError, 000, str);
245 free(EtiMemoryChunks[chunkIndex].pointers[0]);
246 numAllocatedChunks--;
247 chunkIndex--;
248 if (numAllocatedChunks == 0)
249 break;
250 }
251}
252//END ETIENNE
253
254
255
256RUN_HEAD actRun;
257
258RUN_CTRL runCtrl[MAX_RUN];
259
260RUN_TAIL runTail[MAX_RUN];
261
262
263/*
264*** Definition of rdBuffer to read in IP packets; keep it global !!!!
265 */
266
267
268typedef union
269{
270 int8_t B[MAX_LEN];
271 int16_t S[MAX_LEN / 2];
272 int32_t I[MAX_LEN / 4];
273 int64_t L[MAX_LEN / 8];
274} CNV_FACT;
275
276typedef struct
277{
278 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
279 int32_t bufPos; //next byte to read to the buffer next
280 int32_t bufLen; //number of bytes left to read
281// size_t bufLen; //number of bytes left to read size_t might be better
282 int32_t skip; //number of bytes skipped before start of event
283
284 int errCnt; //how often connect failed since last successful
285 int sockStat; //-1 if socket not yet connected , 99 if not exist
286 int socket; //contains the sockets
287 struct sockaddr_in SockAddr; //IP for each socket
288
289 int evtID; // event ID of event currently read
290 int runID; // run "
291 int ftmID; // event ID from FTM
292 uint fadLen; // FADlength of event currently read
293 int fadVers; // Version of FAD
294 int ftmTyp; // trigger type
295 int board; // boardID (softwareID: 0..40 )
296 int Port;
297
298 CNV_FACT *rBuf;
299
300#ifdef EVTDEBUG
301 CNV_FACT *xBuf; //a copy of rBuf (temporary for debuging)
302#endif
303
304} READ_STRUCT;
305
306
307typedef union
308{
309 int8_t B[2];
310 int16_t S;
311} SHORT_BYTE;
312
313
314
315
316
317SHORT_BYTE start, stop;
318
319READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
320
321
322
323/*-----------------------------------------------------------------*/
324
325
326/*-----------------------------------------------------------------*/
327
328int
329runFinish1 (uint32_t runnr)
330{
331 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
332 runnr);
333 factOut (kInfo, 173, str); //but continue anyhow
334 return 0;
335}
336int
337runFinish (uint32_t runnr)
338{
339 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
340 runnr);
341 factOut (kInfo, 173, str); //but continue anyhow
342 return 0;
343}
344
345int
346GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
347 READ_STRUCT * rd)
348{
349/*
350*** generate Address, create sockets and allocates readbuffer for it
351***
352*** if flag==0 generate socket and buffer
353*** <0 destroy socket and buffer
354*** >0 close and redo socket
355***
356*** sid : board*7 + port id
357 */
358
359 int j;
360 int optval = 1; //activate keepalive
361 socklen_t optlen = sizeof (optval);
362
363
364 if (sid % 7 >= NUMSOCK) {
365 //this is a not used socket, so do nothing ...
366 rd->sockStat = 77;
367 rd->rBuf = NULL ;
368 return 0;
369 }
370
371 if (rd->sockStat == 0) { //close socket if open
372 j = close (rd->socket);
373 if (j > 0) {
374 snprintf (str, MXSTR, "Error closing socket %d | %m", sid);
375 factOut (kFatal, 771, str);
376 } else {
377 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
378 factOut (kInfo, 771, str);
379 }
380 }
381
382 rd->sockStat = 99;
383
384 if (flag < 0) {
385 free (rd->rBuf); //and never open again
386#ifdef EVTDEBUG
387 free (rd->xBuf); //and never open again
388#endif
389 rd->rBuf = NULL;
390 return 0;
391 }
392
393
394 if (flag == 0) { //generate address and buffer ...
395 rd->Port = port;
396 rd->SockAddr.sin_family = sockAddr->sin_family;
397 rd->SockAddr.sin_port = htons (port);
398 rd->SockAddr.sin_addr = sockAddr->sin_addr;
399
400#ifdef EVTDEBUG
401 rd->xBuf = malloc (sizeof (CNV_FACT));
402#endif
403 rd->rBuf = malloc (sizeof (CNV_FACT));
404 if (rd->rBuf == NULL) {
405 snprintf (str, MXSTR, "Could not create local buffer %d", sid);
406 factOut (kFatal, 774, str);
407 rd->sockStat = 77;
408 return -3;
409 }
410 }
411
412
413 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
414 snprintf (str, MXSTR, "Could not generate socket %d | %m", sid);
415 factOut (kFatal, 773, str);
416 rd->sockStat = 88;
417 return -2;
418 }
419 optval = 1;
420 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
421 snprintf (str, MXSTR, "Could not set keepalive %d | %m", sid);
422 factOut (kInfo, 173, str); //but continue anyhow
423 }
424 optval = 10; //start after 10 seconds
425 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
426 snprintf (str, MXSTR, "Could not set keepidle %d | %m", sid);
427 factOut (kInfo, 173, str); //but continue anyhow
428 }
429 optval = 10; //do every 10 seconds
430 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
431 snprintf (str, MXSTR, "Could not set keepintvl %d | %m", sid);
432 factOut (kInfo, 173, str); //but continue anyhow
433 }
434 optval = 2; //close after 2 unsuccessful tries
435 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
436 snprintf (str, MXSTR, "Could not set keepalive probes %d | %m", sid);
437 factOut (kInfo, 173, str); //but continue anyhow
438 }
439
440
441 snprintf (str, MXSTR, "Successfully generated socket %d ", sid);
442 factOut (kInfo, 773, str);
443 rd->sockStat = -1; //try to (re)open socket
444 rd->errCnt = 0;
445 return 0;
446
447} /*-----------------------------------------------------------------*/
448
449 /*-----------------------------------------------------------------*/
450
451
452
453
454int
455mBufInit ()
456{
457// initialize mBuffer (mark all entries as unused\empty)
458
459 int i;
460 uint32_t actime;
461
462 actime = g_actTime + 50000000;
463
464 for (i = 0; i < MAX_EVT * MAX_RUN; i++) {
465 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
466 mBuffer[i].runNum = 0;
467
468 evtCtrl.evtBuf[i] = -1;
469 evtCtrl.evtStat[i] = -1;
470 evtCtrl.pcTime[i] = actime; //initiate to far future
471
472 //ETIENNE
473 mBufferMapping[i].chunk = -1;
474 mBufferMapping[i].eventNumber = -1;
475 mBufferMapping[i].slot = -1;
476 //END ETIENNE
477 }
478
479 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
480
481 evtCtrl.frstPtr = 0;
482 evtCtrl.lastPtr = 0;
483
484 return 0;
485
486} /*-----------------------------------------------------------------*/
487
488
489
490
491int
492mBufEvt (int evID, uint runID, int nRoi[], int sk,
493 int fadlen, int trgTyp, int trgNum, int fadNum)
494{
495// generate a new Event into mBuffer:
496// make sure only complete Event are possible, so 'free' will always work
497// returns index into mBuffer[], or negative value in case of error
498// error: <-9000 if roi screwed up (not consistent with run)
499// <-8000 (not consistent with event)
500// <-7000 (not consistent with board)
501// < 0 if no space left
502
503 struct timeval *tv, atv;
504 tv = &atv;
505 uint32_t tsec, tusec;
506 uint oldest;
507 int jold;
508
509 int i, k, jr, b, evFree;
510 int headmem = 0;
511 size_t needmem = 0;
512
513
514 b = sk / 7;
515
516 if (nRoi[0] < 0 || nRoi[0] > 1024) {
517 snprintf (str, MXSTR, "illegal nRoi[0]: %d", nRoi[0]);
518 factOut (kError, 999, str);
519 gj.badRoiR++;
520 gj.badRoi[b]++;
521 return -9999;
522 }
523
524 for (jr = 1; jr < 8; jr++) {
525 if (nRoi[jr] != nRoi[0]) {
526 snprintf (str, MXSTR, "wrong nRoi[%d]: %d %d", jr, nRoi[jr],
527 nRoi[0]);
528 factOut (kError, 711, str);
529 gj.badRoiB++;
530 gj.badRoi[b]++;
531 return -7101;
532 }
533 }
534 if (nRoi[8] < nRoi[0]) {
535 snprintf (str, MXSTR, "wrong nRoi_TM: %d %d", nRoi[8], nRoi[0]);
536 factOut (kError, 712, str);
537 gj.badRoiB++;
538 gj.badRoi[b]++;
539 return -7102;
540 }
541
542
543 i = evID % MAX_EVT;
544 evFree = -1;
545
546 for (k = 0; k < MAX_RUN; k++) {
547 if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) { //event is already registered;
548 // is it ok ????
549 if (mBuffer[i].nRoi != nRoi[0]
550 || mBuffer[i].nRoiTM != nRoi[8]) {
551 snprintf (str, MXSTR, "illegal evt_roi %d %d ; %d %d",
552 nRoi[0], nRoi[8], mBuffer[i].nRoi, mBuffer[i].nRoiTM);
553 factOut (kError, 821, str);
554 gj.badRoiE++;
555 gj.badRoi[b]++;
556 return -8201;
557 }
558// count for inconsistencies
559
560 if (mBuffer[i].trgNum != trgNum)
561 mBuffer[i].Errors[0]++;
562 if (mBuffer[i].fadNum != fadNum)
563 mBuffer[i].Errors[1]++;
564 if (mBuffer[i].trgTyp != trgTyp)
565 mBuffer[i].Errors[2]++;
566
567 //everything seems fine so far ==> use this slot ....
568 return i;
569 }
570 if (evFree < 0 && mBuffer[i].evNum < 0)
571 evFree = i;
572 i += MAX_EVT;
573 }
574
575
576 //event does not yet exist; create it
577 if (evFree < 0) { //no space available in ctrl
578 snprintf (str, MXSTR, "no control slot to keep event %d", evID);
579 factOut (kError, 881, str);
580 return -1;
581 }
582 i = evFree; //found free entry; use it ...
583
584 gettimeofday (tv, NULL);
585 tsec = atv.tv_sec;
586 tusec = atv.tv_usec;
587
588 //check if runId already registered in runCtrl
589 evFree = -1;
590 oldest = g_actTime + 1000;
591 jold = -1;
592 for (k = 0; k < MAX_RUN; k++) {
593 if (runCtrl[k].runId == runID) {
594// if (runCtrl[k].procId > 0) { //run is closed -> reject
595// snprintf (str, MXSTR, "skip event since run %d finished", runID);
596// factOut (kInfo, 931, str);
597// return -21;
598// }
599
600 if (runCtrl[k].roi0 != nRoi[0]
601 || runCtrl[k].roi8 != nRoi[8]) {
602 snprintf (str, MXSTR, "illegal run_roi %d %d ; %d %d",
603 nRoi[0], nRoi[8], runCtrl[k].roi0, runCtrl[k].roi8);
604 factOut (kError, 931, str);
605 gj.badRoiR++;
606 gj.badRoi[b]++;
607 return -9301;
608 }
609 goto RUNFOUND;
610 } else if (evFree < 0 && runCtrl[k].fileId < 0) { //not yet used
611 evFree = k;
612 } else if (evFree < 0 && runCtrl[k].fileId > 0) { //already closed
613 if (runCtrl[k].closeTime < oldest) {
614 oldest = runCtrl[k].closeTime;
615 jold = k;
616 }
617 }
618 }
619
620 if (evFree < 0 && jold < 0) {
621 snprintf (str, MXSTR, "not able to register the new run %d", runID);
622 factOut (kFatal, 883, str);
623 return -1001;
624 } else {
625 if (evFree < 0)
626 evFree = jold;
627 snprintf (str, MXSTR, "register new run %d(%d) roi: %d %d", runID,
628 evFree, nRoi[0], nRoi[8]);
629 factOut (kInfo, 503, str);
630 runCtrl[evFree].runId = runID;
631 runCtrl[evFree].roi0 = nRoi[0];
632 runCtrl[evFree].roi8 = nRoi[8];
633 runCtrl[evFree].fileId = -2;
634 runCtrl[evFree].procId = -2;
635 runCtrl[evFree].lastEvt = -1;
636 runCtrl[evFree].nextEvt = 0;
637 runCtrl[evFree].actEvt = 0;
638 runCtrl[evFree].procEvt = 0;
639 runCtrl[evFree].maxEvt = 999999999; //max number events allowed
640 runCtrl[evFree].firstUsec = tusec;
641 runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
642 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
643// runCtrl[evFree].lastTime = 0;
644
645 runTail[evFree].nEventsOk =
646 runTail[evFree].nEventsRej =
647 runTail[evFree].nEventsBad =
648 runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
649 }
650
651 RUNFOUND:
652 //ETIENNE
653/* needmem = sizeof (EVENT) + NPIX * nRoi[0] * 2 + NTMARK * nRoi[0] * 2; //
654
655 headmem = NBOARDS * sizeof (PEVNT_HEADER);
656
657 if (gj.usdMem + needmem + headmem + gi_maxSize > g_maxMem) {
658 gj.maxMem = gj.usdMem + needmem + headmem + gi_maxSize;
659 if (gi_memStat > 0) {
660 gi_memStat = -99;
661 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
662 evID, sk);
663 factOut (kError, 882, str);
664 } else {
665 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
666 evID, sk);
667 factOut (kDebug, 882, str);
668 }
669 return -11;
670 }
671*/
672 mBuffer[i].FADhead = ETI_Malloc(evID, i);
673 mBuffer[i].buffer = NULL;
674 if (mBuffer[i].FADhead != NULL)
675 mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
676 else
677 {
678 mBuffer[i].fEvent = NULL;
679 gj.usdMem = 0;
680 for (k=0;k<numAllocatedChunks;k++)
681 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
682 if (gj.usdMem > gj.maxMem)
683 gj.maxMem = gj.usdMem;
684 if (gi_memStat > 0) {
685 gi_memStat = -99;
686 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
687 evID, sk);
688 factOut (kError, 882, str);
689 } else {
690 snprintf (str, MXSTR, "no memory left to keep event %6d sock %3d",
691 evID, sk);
692 factOut (kDebug, 882, str);
693 }
694 return -11;
695 }
696 /*
697 mBuffer[i].FADhead = malloc (headmem);
698 if (mBuffer[i].FADhead == NULL) {
699 snprintf (str, MXSTR, "malloc header failed for event %d", evID);
700 factOut (kError, 882, str);
701 return -12;
702 }
703
704 mBuffer[i].fEvent = malloc (needmem);
705 if (mBuffer[i].fEvent == NULL) {
706 snprintf (str, MXSTR, "malloc data failed for event %d", evID);
707 factOut (kError, 882, str);
708 free (mBuffer[i].FADhead);
709 mBuffer[i].FADhead = NULL;
710 return -22;
711 }
712
713 mBuffer[i].buffer = malloc (gi_maxSize);
714 if (mBuffer[i].buffer == NULL) {
715 snprintf (str, MXSTR, "malloc buffer failed for event %d", evID);
716 factOut (kError, 882, str);
717 free (mBuffer[i].FADhead);
718 mBuffer[i].FADhead = NULL;
719 free (mBuffer[i].fEvent);
720 mBuffer[i].fEvent = NULL;
721 return -32;
722 }*/
723 //END ETIENNE
724 //flag all boards as unused
725 mBuffer[i].nBoard = 0;
726 for (k = 0; k < NBOARDS; k++) {
727 mBuffer[i].board[k] = -1;
728 }
729 //flag all pixels as unused
730 for (k = 0; k < NPIX; k++) {
731 mBuffer[i].fEvent->StartPix[k] = -1;
732 }
733 //flag all TMark as unused
734 for (k = 0; k < NTMARK; k++) {
735 mBuffer[i].fEvent->StartTM[k] = -1;
736 }
737
738 mBuffer[i].fEvent->NumBoards = 0;
739 mBuffer[i].fEvent->PCUsec = tusec;
740 mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
741 mBuffer[i].nRoi = nRoi[0];
742 mBuffer[i].nRoiTM = nRoi[8];
743 mBuffer[i].evNum = evID;
744 mBuffer[i].runNum = runID;
745 mBuffer[i].fadNum = fadNum;
746 mBuffer[i].trgNum = trgNum;
747 mBuffer[i].trgTyp = trgTyp;
748 mBuffer[i].evtLen = needmem;
749 mBuffer[i].Errors[0] =
750 mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
751//ETIENNE
752 //gj.usdMem += needmem + headmem + gi_maxSize;
753 gj.usdMem = 0;
754 for (k=0;k<numAllocatedChunks;k++)
755 {
756 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
757 }
758//END ETIENNE
759 if (gj.usdMem > gj.maxMem)
760 gj.maxMem = gj.usdMem;
761
762 gj.bufTot++;
763 if (gj.bufTot > gj.maxEvt)
764 gj.maxEvt = gj.bufTot;
765
766 gj.rateNew++;
767
768 //register event in 'active list (reading)'
769
770 evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
771 evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
772 evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
773 evtIdx[i] = evtCtrl.lastPtr;
774
775
776 snprintf (str, MXSTR,
777 "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
778 runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
779 factOut (kDebug, -11, str);
780 evtCtrl.lastPtr++;
781 if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
782 evtCtrl.lastPtr = 0;
783
784 gi.evtGet++;
785
786 return i;
787
788} /*-----------------------------------------------------------------*/
789
790
791
792
793int
794mBufFree (int i)
795{
796//delete entry [i] from mBuffer:
797//(and make sure multiple calls do no harm ....)
798
799 int headmem = 0;
800 int evid;
801 size_t freemem = 0;
802
803 evid = mBuffer[i].evNum;
804 freemem = mBuffer[i].evtLen;
805 //ETIENNE
806 ETI_Free(evid, i);
807// free (mBuffer[i].fEvent);
808 mBuffer[i].fEvent = NULL;
809
810// free (mBuffer[i].FADhead);
811 mBuffer[i].FADhead = NULL;
812
813// free (mBuffer[i].buffer);
814 mBuffer[i].buffer = NULL;
815//END ETIENNE
816 headmem = NBOARDS * sizeof (PEVNT_HEADER);
817 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
818 mBuffer[i].runNum = 0;
819//ETIENNE
820// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
821 gj.usdMem = 0;
822 int k;
823 for (k=0;k<numAllocatedChunks;k++)
824 {
825 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
826 }
827//END ETIENNE
828 gj.bufTot--;
829
830 if (gi_memStat < 0) {
831 if (gj.usdMem <= 0.75 * gj.maxMem)
832 gi_memStat = +1;
833 }
834
835
836 return 0;
837
838} /*-----------------------------------------------------------------*/
839
840
841void
842resetEvtStat ()
843{
844 int i;
845
846 for (i = 0; i < MAX_SOCK; i++)
847 gi.numRead[i] = 0;
848
849 for (i = 0; i < NBOARDS; i++) {
850 gi.gotByte[i] = 0;
851 gi.gotErr[i] = 0;
852
853 }
854
855 gi.evtGet = 0; //#new Start of Events read
856 gi.evtTot = 0; //#complete Events read
857 gi.evtErr = 0; //#Events with Errors
858 gi.evtSkp = 0; //#Events incomplete (timeout)
859
860 gi.procTot = 0; //#Events processed
861 gi.procErr = 0; //#Events showed problem in processing
862 gi.procTrg = 0; //#Events accepted by SW trigger
863 gi.procSkp = 0; //#Events rejected by SW trigger
864
865 gi.feedTot = 0; //#Events used for feedBack system
866 gi.feedErr = 0; //#Events rejected by feedBack
867
868 gi.wrtTot = 0; //#Events written to disk
869 gi.wrtErr = 0; //#Events with write-error
870
871 gi.runOpen = 0; //#Runs opened
872 gi.runClose = 0; //#Runs closed
873 gi.runErr = 0; //#Runs with open/close errors
874
875 return;
876} /*-----------------------------------------------------------------*/
877
878
879
880void
881initReadFAD ()
882{
883 return;
884} /*-----------------------------------------------------------------*/
885
886
887
888void *
889readFAD (void *ptr)
890{
891/* *** main loop reading FAD data and sorting them to complete events */
892 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
893 int actBoards = 0, minLen;
894 int32_t jrd;
895 uint gi_SecTime; //time in seconds
896 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
897
898 int goodhed = 0;
899
900 int sockDef[NBOARDS]; //internal state of sockets
901 int jrdx;
902
903
904 struct timespec xwait;
905
906
907 struct timeval *tv, atv;
908 tv = &atv;
909 uint32_t tsec, tusec;
910
911
912 snprintf (str, MXSTR, "start initializing");
913 factOut (kInfo, -1, str);
914
915 int cpu = 7; //read thread
916 cpu_set_t mask;
917
918/* CPU_ZERO initializes all the bits in the mask to zero. */
919// CPU_ZERO (&mask);
920/* CPU_SET sets only the bit corresponding to cpu. */
921 cpu = 7;
922// CPU_SET (cpu, &mask);
923
924/* sched_setaffinity returns 0 in success */
925// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
926// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
927// factOut (kWarn, -1, str);
928// }
929
930
931 head_len = sizeof (PEVNT_HEADER);
932 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
933 minLen = head_len; //min #bytes needed to check header: full header for debug
934
935 start.S = 0xFB01;
936 stop.S = 0x04FE;
937
938/* initialize run control logics */
939 for (i = 0; i < MAX_RUN; i++) {
940 runCtrl[i].runId = 0;
941 runCtrl[i].fileId = -2;
942 runCtrl[i].procId = -2;
943 }
944 gi_resetS = gi_resetR = 9;
945 for (i = 0; i < NBOARDS; i++)
946 sockDef[i] = 0;
947
948 START:
949 gettimeofday (tv, NULL);
950 g_actTime = tsec = atv.tv_sec;
951 g_actUsec = tusec = atv.tv_usec;
952 gi_myRun = g_actTime;
953 evtCtrl.frstPtr = 0;
954 evtCtrl.lastPtr = 0;
955
956 gi_SecTime = g_actTime;
957 gi_runStat = g_runStat;
958 gj.readStat = g_runStat;
959 numok = numok2 = 0;
960
961 int cntsock = 8 - NUMSOCK ;
962
963 if (gi_resetS > 0) {
964 //make sure all sockets are preallocated as 'not exist'
965 for (i = 0; i < MAX_SOCK; i++) {
966 rd[i].socket = -1;
967 rd[i].sockStat = 99;
968 }
969
970 for (k = 0; k < NBOARDS; k++) {
971 gi_NumConnect[k] = 0;
972 gi.numConn[k] = 0;
973 gj.numConn[k] = 0;
974 gj.errConn[k] = 0;
975 gj.rateBytes[k] = 0;
976 gj.totBytes[k] = 0;
977 }
978
979 }
980
981
982 if (gi_resetR > 0) {
983 resetEvtStat ();
984 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
985 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
986 gj.totMem = g_maxMem;
987 gj.bufNew = gj.bufEvt = 0;
988 gj.badRoiE = gj.badRoiR = gj.badRoiB =
989 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
990
991 int b,p;
992 for (b = 0; b < NBOARDS; b++)
993 gj.badRoi[b] = 0;
994
995 mBufInit (); //initialize buffers
996
997 snprintf (str, MXSTR, "end initializing");
998 factOut (kInfo, -1, str);
999 }
1000
1001
1002 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
1003
1004 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
1005
1006 gi_runStat = g_runStat;
1007 gj.readStat = g_runStat;
1008 gettimeofday (tv, NULL);
1009 g_actTime = tsec = atv.tv_sec;
1010 g_actUsec = tusec = atv.tv_usec;
1011
1012
1013 int b, p, p0, s0, nch;
1014 nch = 0;
1015 for (b = 0; b < NBOARDS; b++) {
1016 k = b * 7;
1017 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1018 nch++;
1019 gi_NumConnect[b] = 0; //must close all connections
1020 gi.numConn[b] = 0;
1021 gj.numConn[b] = 0;
1022 if (sockDef[b] == 0)
1023 s0 = 0; //sockets to be defined and opened
1024 else if (g_port[b].sockDef == 0)
1025 s0 = -1; //sockets to be destroyed
1026 else
1027 s0 = +1; //sockets to be closed and reopened
1028
1029 if (s0 == 0)
1030 p0 = ntohs (g_port[b].sockAddr.sin_port);
1031 else
1032 p0 = 0;
1033
1034 for (p = p0 + 1; p < p0 + 8; p++) {
1035 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1036 k++;
1037 }
1038 sockDef[b] = g_port[b].sockDef;
1039 }
1040 }
1041
1042 if (nch > 0) {
1043 actBoards = 0;
1044 for (b = 0; b < NBOARDS; b++) {
1045 if (sockDef[b] > 0)
1046 actBoards++;
1047 }
1048 }
1049
1050
1051 jrdx = 0;
1052 numokx = 0;
1053 numok = 0; //count number of succesfull actions
1054
1055 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1056 b = i / 7 ;
1057 p = i % 7 ;
1058
1059if ( p >= NUMSOCK) { ; }
1060else {
1061 if (sockDef[b] > 0)
1062 s0 = +1;
1063 else
1064 s0 = -1;
1065
1066 if (rd[i].sockStat < 0) { //try to connect if not yet done
1067 if (rd[i].sockStat == -1) {
1068 rd[i].sockStat = connect (rd[i].socket,
1069 (struct sockaddr *) &rd[i].SockAddr,
1070 sizeof (rd[i].SockAddr));
1071 if (rd[i].sockStat == -1) {
1072 rd[i].errCnt++ ;
1073// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1074// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1075// else rd[i].sockStat = 10000 ;
1076 }
1077//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1078
1079 }
1080 if (rd[i].sockStat < -1 ) {
1081 rd[i].sockStat++ ;
1082 }
1083 if (rd[i].sockStat == 0) { //successfull ==>
1084 if (sockDef[b] > 0) {
1085 rd[i].bufTyp = 0; // expect a header
1086 rd[i].bufLen = frst_len; // max size to read at begining
1087 } else {
1088 rd[i].bufTyp = -1; // full data to be skipped
1089 rd[i].bufLen = MAX_LEN; //huge for skipping
1090 }
1091 rd[i].bufPos = 0; // no byte read so far
1092 rd[i].skip = 0; // start empty
1093// gi_NumConnect[b]++;
1094 gi_NumConnect[b] += cntsock ;
1095
1096 gi.numConn[b]++;
1097 gj.numConn[b]++;
1098 snprintf (str, MXSTR, "+++connect %d %d", b, gi.numConn[b]);
1099 factOut (kInfo, -1, str);
1100 }
1101 }
1102
1103 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1104 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1105 numok++;
1106 size_t maxread = rd[i].bufLen ;
1107 if (maxread > MAXREAD ) maxread=MAXREAD ;
1108
1109 jrd =
1110 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1111 maxread, MSG_DONTWAIT);
1112// rd[i].bufLen, MSG_DONTWAIT);
1113
1114 if (jrd > 0) {
1115 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1116#ifdef EVTDEBUG
1117 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1118 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1119 snprintf (str, MXSTR,
1120 "read sock %3d bytes %5d len %5d first %d %d", i,
1121 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1122 rd[i].rBuf->B[rd[i].bufPos + 1]);
1123 factOut (kDebug, 301, str);
1124#endif
1125 }
1126
1127 if (jrd == 0) { //connection has closed ...
1128 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1129 factOut (kInfo, 441, str);
1130 GenSock (s0, i, 0, NULL, &rd[i]);
1131 gi.gotErr[b]++;
1132// gi_NumConnect[b]--;
1133 gi_NumConnect[b]-= cntsock ;
1134 gi.numConn[b]--;
1135 gj.numConn[b]--;
1136
1137 } else if (jrd < 0) { //did not read anything
1138 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1139 snprintf (str, MXSTR, "Error Reading from %d | %m", i);
1140 factOut (kError, 442, str);
1141 gi.gotErr[b]++;
1142 } else
1143 numok--; //else nothing waiting to be read
1144 jrd = 0;
1145 }
1146 } else {
1147 jrd = 0; //did read nothing as requested
1148 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1149 rd[i].bufLen);
1150 factOut (kDebug, 301, str);
1151 }
1152
1153 gi.gotByte[b] += jrd;
1154 gj.rateBytes[b] += jrd;
1155
1156 if (jrd > 0) {
1157 numokx++;
1158 jrdx += jrd;
1159 }
1160
1161
1162 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1163// just do nothing
1164#ifdef EVTDEBUG
1165 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1166 i);
1167 factOut (kInfo, 301, str);
1168#endif
1169
1170 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1171 if (jrd < rd[i].bufLen) { //not yet all read
1172 rd[i].bufPos += jrd; //==> prepare for continuation
1173 rd[i].bufLen -= jrd;
1174 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1175 } else { //full dataset read
1176 rd[i].bufLen = 0;
1177 rd[i].bufPos = rd[i].fadLen;
1178 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1179 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1180 gi.evtErr++;
1181 snprintf (str, MXSTR,
1182 "wrong end of buffer found sock %3d ev %4d len %5d %3d %3d - %3d %3d ",
1183 i, rd[i].fadLen, rd[i].evtID, rd[i].rBuf->B[0],
1184 rd[i].rBuf->B[1],
1185 rd[i].rBuf->B[rd[i].bufPos - 2],
1186 rd[i].rBuf->B[rd[i].bufPos - 1]);
1187 factOut (kError, 301, str);
1188 goto EndBuf;
1189
1190#ifdef EVTDEBUG
1191 } else {
1192 snprintf (str, MXSTR,
1193 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1194 i, rd[i].fadLen, rd[i].rBuf->B[0],
1195 rd[i].rBuf->B[1], start.B[1], start.B[0],
1196 rd[i].rBuf->B[rd[i].bufPos - 2],
1197 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1198 stop.B[0]);
1199 factOut (kDebug, 301, str);
1200#endif
1201 }
1202
1203 if (jrd > 0)
1204 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1205
1206 //we have a complete buffer, copy to WORK area
1207 int jr;
1208 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1209 for (jr = 0; jr < 9; jr++) {
1210 roi[jr] =
1211 ntohs (rd[i].
1212 rBuf->S[head_len / 2 + 2 + jr * (roi[0] + 4)]);
1213 }
1214 //get index into mBuffer for this event (create if needed)
1215
1216 int actid;
1217 if (g_useFTM > 0)
1218 actid = rd[i].evtID;
1219 else
1220 actid = rd[i].ftmID;
1221
1222 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1223 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1224 rd[i].evtID);
1225
1226 if (evID < -1000) {
1227 goto EndBuf; //not usable board/event/run --> skip it
1228 }
1229 if (evID < 0) { //no space left, retry later
1230#ifdef EVTDEBUG
1231 if (rd[i].bufLen != 0) {
1232 snprintf (str, MXSTR, "something screwed up");
1233 factOut (kFatal, 1, str);
1234 }
1235#endif
1236 xwait.tv_sec = 0;
1237 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1238 nanosleep (&xwait, NULL);
1239 goto EndBuf1; //hope there is free space next round
1240 }
1241 //we have a valid entry in mBuffer[]; fill it
1242
1243#ifdef EVTDEBUG
1244 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1245 rd[i].fadLen);
1246 if (xchk != 0) {
1247 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1248 xchk, rd[i].fadLen, i);
1249 factOut (kFatal, 1, str);
1250
1251 uint iq;
1252 for (iq = 0; iq < rd[i].fadLen; iq++) {
1253 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1254 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1255 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1256 factOut (kFatal, 1, str);
1257 }
1258 }
1259 }
1260#endif
1261 int qncpy = 0;
1262 boardId = b;
1263 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1264 int fadCrate = fadBoard / 256;
1265 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1266 snprintf (str, MXSTR, "wrong Board ID %d %d %d",
1267 fadCrate, fadBoard % 256, boardId);
1268 factOut (kWarn, 301, str);
1269 }
1270 if (mBuffer[evID].board[boardId] != -1) {
1271 snprintf (str, MXSTR,
1272 "double board: ev %5d, b %3d, %3d; len %5d %3d %3d - %3d %3d ",
1273 evID, boardId, i, rd[i].fadLen,
1274 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1275 rd[i].rBuf->B[rd[i].bufPos - 2],
1276 rd[i].rBuf->B[rd[i].bufPos - 1]);
1277 factOut (kWarn, 501, str);
1278 goto EndBuf; //--> skip Board
1279 }
1280
1281 int iDx = evtIdx[evID]; //index into evtCtrl
1282
1283 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1284 &rd[i].rBuf->S[0], head_len);
1285 qncpy += head_len;
1286
1287 src = head_len / 2;
1288 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1289 for (drs = 0; drs < 4; drs++) {
1290 pixH = ntohs (rd[i].rBuf->S[src++]); // ID
1291 pixC = ntohs (rd[i].rBuf->S[src++]); // start-cell
1292 pixR = ntohs (rd[i].rBuf->S[src++]); // roi
1293//here we should check if pixH is correct ....
1294
1295 pixS = boardId * 36 + drs * 9 + px;
1296 src++;
1297
1298
1299 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1300 dest = pixS * roi[0];
1301 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1302 &rd[i].rBuf->S[src], roi[0] * 2);
1303 qncpy += roi[0] * 2;
1304 src += pixR;
1305
1306 if (px == 8) {
1307 tmS = boardId * 4 + drs;
1308 if (pixR > roi[0]) { //and we have additional TM info
1309 dest = tmS * roi[0] + NPIX * roi[0];
1310 int srcT = src - roi[0];
1311 mBuffer[evID].fEvent->StartTM[tmS] =
1312 (pixC + pixR - roi[0]) % 1024;
1313 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1314 &rd[i].rBuf->S[srcT], roi[0] * 2);
1315 qncpy += roi[0] * 2;
1316 } else {
1317 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1318 }
1319 }
1320 }
1321 } // now we have stored a new board contents into Event structure
1322
1323 mBuffer[evID].fEvent->NumBoards++;
1324 mBuffer[evID].board[boardId] = boardId;
1325 evtCtrl.evtStat[iDx]++;
1326 evtCtrl.pcTime[iDx] = g_actTime;
1327
1328 if (++mBuffer[evID].nBoard >= actBoards) {
1329 int qnrun = 0;
1330 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1331 actrun = mBuffer[evID].runNum;
1332 int ir;
1333 for (ir = 0; ir < MAX_RUN; ir++) {
1334 qnrun++;
1335 if (runCtrl[ir].runId == actrun) {
1336 if (++runCtrl[ir].lastEvt == 0) {
1337 gotNewRun (actrun, mBuffer[evID].FADhead);
1338 snprintf (str, MXSTR, "gotNewRun %d (ev %d)",
1339 mBuffer[evID].runNum,
1340 mBuffer[evID].evNum);
1341 factOut (kInfo, 1, str);
1342 break;
1343 }
1344 }
1345 }
1346 }
1347 snprintf (str, MXSTR,
1348 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1349 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1350 qncpy, qnrun);
1351 factOut (kDebug, -1, str);
1352
1353 //complete event read ---> flag for next processing
1354 evtCtrl.evtStat[iDx] = 99;
1355 gi.evtTot++;
1356 }
1357
1358 EndBuf:
1359 rd[i].bufTyp = 0; //ready to read next header
1360 rd[i].bufLen = frst_len;
1361 rd[i].bufPos = 0;
1362 EndBuf1:
1363 ;
1364 }
1365
1366 } else { //we are reading event header
1367 rd[i].bufPos += jrd;
1368 rd[i].bufLen -= jrd;
1369 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1370 //check if startflag correct; else shift block ....
1371 for (k = 0; k < rd[i].bufPos - 1; k++) {
1372 if (rd[i].rBuf->B[k] == start.B[1]
1373 && rd[i].rBuf->B[k + 1] == start.B[0])
1374 break;
1375 }
1376 rd[i].skip += k;
1377
1378 if (k >= rd[i].bufPos - 1) { //no start of header found
1379 rd[i].bufPos = 0;
1380 rd[i].bufLen = head_len;
1381 } else if (k > 0) {
1382 rd[i].bufPos -= k;
1383 rd[i].bufLen += k;
1384 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1385 rd[i].bufPos);
1386#ifdef EVTDEBUG
1387 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1388 rd[i].bufPos);
1389#endif
1390 }
1391 if (rd[i].bufPos >= minLen) {
1392 if (rd[i].skip > 0) {
1393 snprintf (str, MXSTR, "skipped %d bytes on port%d",
1394 rd[i].skip, i);
1395 factOut (kInfo, 666, str);
1396 rd[i].skip = 0;
1397 }
1398 goodhed++;
1399 rd[i].fadLen = ntohs (rd[i].rBuf->S[1]) * 2;
1400 rd[i].fadVers = ntohs (rd[i].rBuf->S[2]);
1401 rd[i].ftmTyp = ntohs (rd[i].rBuf->S[5]);
1402 rd[i].ftmID = ntohl (rd[i].rBuf->I[3]); //(FTMevt)
1403 rd[i].evtID = ntohl (rd[i].rBuf->I[4]); //(FADevt)
1404 rd[i].runID = ntohl (rd[i].rBuf->I[11]);
1405 rd[i].bufTyp = 1; //ready to read full record
1406 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1407
1408 int fadboard = ntohs (rd[i].rBuf->S[12]);
1409 int fadcrate = fadboard / 256;
1410 fadboard = (fadcrate * 10 + fadboard % 256);
1411#ifdef EVTDEBUG
1412 snprintf (str, MXSTR,
1413 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1414 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1415 rd[i].runID, fadboard, jrd);
1416 factOut (kDebug, 1, str);
1417#endif
1418
1419 if (rd[i].runID == 0)
1420 rd[i].runID = gi_myRun;
1421
1422 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1423 snprintf (str, MXSTR,
1424 "illegal event-length on port %d", i);
1425 factOut (kFatal, 881, str);
1426 rd[i].bufLen = 100000; //?
1427 }
1428 int fadBoard = ntohs (rd[i].rBuf->S[12]);
1429 debugHead (i, fadBoard, rd[i].rBuf);
1430 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1431 } else {
1432 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1433 }
1434 } else {
1435 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1436 }
1437
1438 } //end interpreting last read
1439}
1440 } //end of successful read anything
1441 } //finished trying to read all sockets
1442
1443#ifdef EVTDEBUG
1444 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1445 factOut (kDebug, -1, str);
1446#endif
1447
1448 gi.numRead[numok]++;
1449
1450 g_actTime = time (NULL);
1451 if (g_actTime > gi_SecTime) {
1452 gi_SecTime = g_actTime;
1453
1454
1455 //loop over all active events and flag those older than read-timeout
1456 //delete those that are written to disk ....
1457
1458 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1459 if (kd < 0)
1460 kd += (MAX_EVT * MAX_RUN);
1461
1462 gj.bufNew = gj.bufEvt = 0;
1463 int k1 = evtCtrl.frstPtr;
1464 for (k = k1; k < (k1 + kd); k++) {
1465 int k0 = k % (MAX_EVT * MAX_RUN);
1466//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1467
1468 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1469 gj.bufNew++; //incomplete event in Buffer
1470 if (evtCtrl.evtStat[k0] < 90
1471 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1472 int id = evtCtrl.evtBuf[k0];
1473 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1474 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1475 evtCtrl.evtStat[k0]);
1476 factOut (kWarn, 601, str);
1477
1478 int ik,ib,jb;
1479 ik=0;
1480 for (ib=0; ib<NBOARDS; ib++) {
1481 if (ib%10==0) {
1482 snprintf (&str[ik], MXSTR, "'");
1483 ik++;
1484 }
1485 jb = mBuffer[id].board[ib];
1486 if ( jb<0 ) {
1487 if (gi_NumConnect[b] >0 ) {
1488 snprintf (&str[ik], MXSTR, ".");
1489 } else {
1490 snprintf (&str[ik], MXSTR, "x");
1491 }
1492 } else {
1493 snprintf (&str[ik], MXSTR, "%d",jb%10);
1494 }
1495 ik++;
1496 }
1497 snprintf (&str[ik], MXSTR, "'");
1498 factOut (kWarn, 601, str);
1499
1500 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1501 gi.evtSkp++;
1502 gi.evtTot++;
1503 gj.evtSkip++;
1504 }
1505 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1506 || evtCtrl.evtStat[k0] == 0) { //'useless'
1507
1508 int id = evtCtrl.evtBuf[k0];
1509 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1510 mBuffer[id].evNum, mBuffer[id].nBoard);
1511 factOut (kDebug, -1, str);
1512 mBufFree (id); //event written--> free memory
1513 evtCtrl.evtStat[k0] = -1;
1514 gj.evtWrite++;
1515 gj.rateWrite++;
1516 } else if (evtCtrl.evtStat[k0] >= 95) {
1517 gj.bufEvt++; //complete event in Buffer
1518 }
1519
1520 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1521 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1522 }
1523 }
1524
1525
1526 gj.deltaT = 1000; //temporary, must be improved
1527
1528 int b;
1529 for (b = 0; b < NBOARDS; b++)
1530 gj.totBytes[b] += gj.rateBytes[b];
1531 gj.totMem = g_maxMem;
1532 if (gj.maxMem > gj.xxxMem)
1533 gj.xxxMem = gj.maxMem;
1534 if (gj.maxEvt > gj.xxxEvt)
1535 gj.xxxEvt = gj.maxEvt;
1536
1537 factStat (gj);
1538 factStatNew (gi);
1539 gj.rateNew = gj.rateWrite = 0;
1540 gj.maxMem = gj.usdMem;
1541 gj.maxEvt = gj.bufTot;
1542 for (b = 0; b < NBOARDS; b++)
1543 gj.rateBytes[b] = 0;
1544 }
1545
1546 if (numok > 0)
1547 numok2 = 0;
1548 else if (numok2++ > 3) {
1549 if (g_runStat == 1) {
1550 xwait.tv_sec = 1;
1551 xwait.tv_nsec = 0; // hibernate for 1 sec
1552 } else {
1553 xwait.tv_sec = 0;
1554 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1555 }
1556 nanosleep (&xwait, NULL);
1557 }
1558
1559 } //and do next loop over all sockets ...
1560
1561
1562 snprintf (str, MXSTR, "stop reading ... RESET=%d", g_reset);
1563 factOut (kInfo, -1, str);
1564
1565 if (g_reset > 0) {
1566 gi_reset = g_reset;
1567 gi_resetR = gi_reset % 10; //shall we stop reading ?
1568 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1569 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1570 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1571 g_reset = 0;
1572 } else {
1573 gi_reset = 0;
1574 if (g_runStat == -1)
1575 gi_resetR = 1;
1576 else
1577 gi_resetR = 7;
1578 gi_resetS = 7; //close all sockets
1579 gi_resetW = 7; //close all files
1580 gi_resetX = 0;
1581
1582 //inform others we have to quit ....
1583 gi_runStat = -11; //inform all that no update to happen any more
1584 gj.readStat = -11; //inform all that no update to happen any more
1585 }
1586
1587 if (gi_resetS > 0) {
1588 //must close all open sockets ...
1589 snprintf (str, MXSTR, "close all sockets ...");
1590 factOut (kInfo, -1, str);
1591 for (i = 0; i < MAX_SOCK; i++) {
1592 if (rd[i].sockStat == 0) {
1593 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1594 if (i % 7 == 0) {
1595// gi_NumConnect[i / 7]--;
1596 gi_NumConnect[i / 7]-= cntsock ;
1597 gi.numConn[i / 7]--;
1598 gj.numConn[i / 7]--;
1599 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1600 rd[i / 7].sockStat = -1; //and try to open asap
1601 }
1602 }
1603 }
1604 }
1605
1606
1607 if (gi_resetR > 0) {
1608 //flag all events as 'read finished'
1609 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1610 if (kd < 0)
1611 kd += (MAX_EVT * MAX_RUN);
1612
1613 int k1 = evtCtrl.frstPtr;
1614 for (k = k1; k < (k1 + kd); k++) {
1615 int k0 = k % (MAX_EVT * MAX_RUN);
1616 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1617 evtCtrl.evtStat[k0] = 91;
1618 gi.evtSkp++;
1619 gi.evtTot++;
1620 }
1621 }
1622
1623 xwait.tv_sec = 0;
1624 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1625 nanosleep (&xwait, NULL);
1626
1627 //and clear all buffers (might have to wait until all others are done)
1628 int minclear;
1629 if (gi_resetR == 1) {
1630 minclear = 900;
1631 snprintf (str, MXSTR, "drain all buffers ...");
1632 } else {
1633 minclear = 0;
1634 snprintf (str, MXSTR, "flush all buffers ...");
1635 }
1636 factOut (kInfo, -1, str);
1637
1638 int numclear = 1;
1639 while (numclear > 0) {
1640 numclear = 0;
1641 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1642 if (kd < 0)
1643 kd += (MAX_EVT * MAX_RUN);
1644
1645 int k1 = evtCtrl.frstPtr;
1646 for (k = k1; k < (k1 + kd); k++) {
1647 int k0 = k % (MAX_EVT * MAX_RUN);
1648 if (evtCtrl.evtStat[k0] > minclear) {
1649 int id = evtCtrl.evtBuf[k0];
1650 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1651 mBuffer[id].evNum, mBuffer[id].nBoard);
1652 factOut (kDebug, -1, str);
1653 mBufFree (id); //event written--> free memory
1654 evtCtrl.evtStat[k0] = -1;
1655 } else if (evtCtrl.evtStat[k0] > 0)
1656 numclear++; //writing is still ongoing...
1657
1658 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1659 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1660 }
1661
1662 xwait.tv_sec = 0;
1663 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1664 nanosleep (&xwait, NULL);
1665 }
1666 }
1667
1668 if (gi_reset > 0) {
1669 if (gi_resetW > 0) {
1670 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1671 }
1672 if (gi_resetX > 0) {
1673 xwait.tv_sec = gi_resetX;
1674 xwait.tv_nsec = 0;
1675 nanosleep (&xwait, NULL);
1676 }
1677
1678 snprintf (str, MXSTR, "Continue read Process ...");
1679 factOut (kInfo, -1, str);
1680 gi_reset = 0;
1681 goto START;
1682 }
1683
1684
1685
1686 snprintf (str, MXSTR, "Exit read Process ...");
1687 factOut (kInfo, -1, str);
1688 gi_runStat = -99;
1689 gj.readStat = -99;
1690 factStat (gj);
1691 factStatNew (gi);
1692 return 0;
1693
1694} /*-----------------------------------------------------------------*/
1695
1696
1697void *
1698subProc (void *thrid)
1699{
1700 int threadID, status, numWait, numProc, kd, k1, k0, k, jret;
1701 struct timespec xwait;
1702
1703 int32_t cntr ;
1704
1705 threadID = (int) thrid;
1706
1707 snprintf (str, MXSTR, "Starting sub-process-thread %d", threadID);
1708 factOut (kInfo, -1, str);
1709
1710 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1711 numWait = numProc = 0;
1712 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1713 if (kd < 0)
1714 kd += (MAX_EVT * MAX_RUN);
1715
1716 int k1 = evtCtrl.frstPtr;
1717 for (k = k1; k < (k1 + kd); k++) {
1718 int k0 = k % (MAX_EVT * MAX_RUN);
1719
1720 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1721 if (gi_resetR > 1) { //we are asked to flush buffers asap
1722 jret = 9100; //flag to be deleted
1723 } else {
1724 int id = evtCtrl.evtBuf[k0];
1725
1726
1727 jret =
1728 subProcEvt (threadID, mBuffer[id].FADhead,
1729 mBuffer[id].fEvent, mBuffer[id].buffer);
1730
1731
1732 if (jret <= threadID) {
1733 snprintf (str, MXSTR,
1734 "process %d wants to send to process %d",
1735 threadID, jret);
1736 factOut (kError, -1, str);
1737 jret = 5300;
1738 } else if (jret <= 0)
1739 jret = 9200 + threadID; //flag as 'to be deleted'
1740 else if (jret >= gi_maxProc)
1741 jret = 5200 + threadID; //flag as 'to be written'
1742 else
1743 jret = 1000 + jret; //flag for next proces
1744 }
1745 evtCtrl.evtStat[k0] = jret;
1746 numProc++;
1747 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1748 numWait++;
1749 }
1750
1751 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1752 snprintf (str, MXSTR, "Exit subProcessing Process %d", threadID);
1753 factOut (kInfo, -1, str);
1754 return 0;
1755 }
1756 if (numProc == 0) {
1757 //seems we have nothing to do, so sleep a little
1758 xwait.tv_sec = 0;
1759 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1760 nanosleep (&xwait, NULL);
1761 }
1762 }
1763
1764 snprintf (str, MXSTR, "Ending sub-process-thread %d", threadID);
1765 factOut (kInfo, -1, str);
1766 return;
1767} /*-----------------------------------------------------------------*/
1768
1769
1770void *
1771procEvt (void *ptr)
1772{
1773/* *** main loop processing file, including SW-trigger */
1774 int numProc, numWait;
1775 int k, status, j;
1776 struct timespec xwait;
1777 char str[MXSTR];
1778
1779
1780
1781 int lastRun = 0; //usually run from last event still valid
1782
1783 cpu_set_t mask;
1784 int cpu = 1; //process thread (will be several in final version)
1785
1786 snprintf (str, MXSTR, "Starting process-thread with %d subprocess",
1787 gi_maxProc);
1788 factOut (kInfo, -1, str);
1789
1790/* CPU_ZERO initializes all the bits in the mask to zero. */
1791// CPU_ZERO (&mask);
1792/* CPU_SET sets only the bit corresponding to cpu. */
1793// CPU_SET( 0 , &mask ); leave for system
1794// CPU_SET( 1 , &mask ); used by write process
1795// CPU_SET (2, &mask);
1796// CPU_SET (3, &mask);
1797// CPU_SET (4, &mask);
1798// CPU_SET (5, &mask);
1799// CPU_SET (6, &mask);
1800// CPU_SET( 7 , &mask ); used by read process
1801/* sched_setaffinity returns 0 in success */
1802// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1803// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1804// factOut (kWarn, -1, str);
1805// }
1806
1807
1808 pthread_t thread[100];
1809 int th_ret[100];
1810
1811 for (k = 0; k < gi_maxProc; k++) {
1812 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1813 }
1814
1815 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1816
1817 numWait = numProc = 0;
1818 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1819 if (kd < 0)
1820 kd += (MAX_EVT * MAX_RUN);
1821
1822 int k1 = evtCtrl.frstPtr;
1823 for (k = k1; k < (k1 + kd); k++) {
1824 int k0 = k % (MAX_EVT * MAX_RUN);
1825//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1826 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1827
1828 if (gi_resetR > 1) { //we are asked to flush buffers asap
1829 evtCtrl.evtStat[k0] = 9991;
1830 } else {
1831
1832//-------- it is better to open the run already here, so call can be used to initialize
1833//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1834 int id = evtCtrl.evtBuf[k0];
1835 uint32_t irun = mBuffer[id].runNum;
1836 int32_t ievt = mBuffer[id].evNum;
1837 if (runCtrl[lastRun].runId == irun) {
1838 j = lastRun;
1839 } else {
1840 //check which fileID to use (or open if needed)
1841 for (j = 0; j < MAX_RUN; j++) {
1842 if (runCtrl[j].runId == irun)
1843 break;
1844 }
1845 if (j >= MAX_RUN) {
1846 snprintf (str, MXSTR,
1847 "P error: can not find run %d for event %d in %d",
1848 irun, ievt, id);
1849 factOut (kFatal, 901, str);
1850 }
1851 lastRun = j;
1852 }
1853
1854 if (runCtrl[j].fileId < 0) {
1855//---- we need to open a new run ==> make sure all older runs are
1856//---- finished and marked to be closed ....
1857 int j1;
1858 for (j1 = 0; j1 < MAX_RUN; j1++) {
1859 if (runCtrl[j1].fileId == 0) {
1860 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1861//---- problem: processing still going on ==> must wait for closing ....
1862 snprintf (str, MXSTR,
1863 "P finish run since new one opened %d",
1864 runCtrl[j1].runId);
1865 runFinish1 (runCtrl[j1].runId);
1866 }
1867
1868 }
1869
1870 actRun.Version = 1;
1871 actRun.RunType = -1; //to be adapted
1872
1873 actRun.Nroi = runCtrl[j].roi0;
1874 actRun.NroiTM = runCtrl[j].roi8;
1875 if (actRun.Nroi == actRun.NroiTM)
1876 actRun.NroiTM = 0;
1877 actRun.RunTime = runCtrl[j].firstTime;
1878 actRun.RunUsec = runCtrl[j].firstTime;
1879 actRun.NBoard = NBOARDS;
1880 actRun.NPix = NPIX;
1881 actRun.NTm = NTMARK;
1882 actRun.Nroi = mBuffer[id].nRoi;
1883 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1884 NBOARDS * sizeof (PEVNT_HEADER));
1885
1886 runCtrl[j].fileHd =
1887 runOpen (irun, &actRun, sizeof (actRun));
1888 if (runCtrl[j].fileHd == NULL) {
1889 snprintf (str, MXSTR,
1890 "P could not open a file for run %d", irun);
1891 factOut (kError, 502, str);
1892 runCtrl[j].fileId = 91;
1893 runCtrl[j].procId = 91;
1894 } else {
1895 snprintf (str, MXSTR, "P opened new run_file %d evt %d",
1896 irun, ievt);
1897 factOut (kInfo, -1, str);
1898 runCtrl[j].fileId = 0;
1899 runCtrl[j].procId = 0;
1900 }
1901
1902 }
1903//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1904 if (runCtrl[j].procId == 0) {
1905 if (runCtrl[j].closeTime < g_actTime
1906 || runCtrl[j].lastTime < g_actTime - 300
1907 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
1908 snprintf (str, MXSTR,
1909 "P reached end of run condition for run %d",
1910 irun);
1911 factOut (kInfo, 502, str);
1912 runFinish1 (runCtrl[j].runId);
1913 runCtrl[j].procId = 1;
1914 }
1915 }
1916 if (runCtrl[j].procId != 0) {
1917 snprintf (str, MXSTR,
1918 "P skip event %d because no active run %d", ievt,
1919 irun);
1920 factOut (kDebug, 502, str);
1921 evtCtrl.evtStat[k0] = 9091;
1922 } else {
1923//--------
1924//--------
1925 id = evtCtrl.evtBuf[k0];
1926 int itevt = mBuffer[id].trgNum;
1927 int itrg = mBuffer[id].trgTyp;
1928 int roi = mBuffer[id].nRoi;
1929 int roiTM = mBuffer[id].nRoiTM;
1930
1931//make sure unused pixels/tmarks are cleared to zero
1932 if (roiTM == roi)
1933 roiTM = 0;
1934 int ip, it, dest, ib;
1935 for (ip = 0; ip < NPIX; ip++) {
1936 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
1937 dest = ip * roi;
1938 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1939 }
1940 }
1941 for (it = 0; it < NTMARK; it++) {
1942 if (mBuffer[id].fEvent->StartTM[it] == -1) {
1943 dest = it * roi + NPIX * roi;
1944 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
1945 }
1946 }
1947
1948
1949//and set correct event header ; also check for consistency in event (not yet)
1950 mBuffer[id].fEvent->Roi = roi;
1951 mBuffer[id].fEvent->RoiTM = roiTM;
1952 mBuffer[id].fEvent->EventNum = ievt;
1953 mBuffer[id].fEvent->TriggerNum = itevt;
1954 mBuffer[id].fEvent->TriggerType = itrg;
1955 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
1956 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
1957 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
1958 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
1959 mBuffer[id].fEvent->SoftTrig = 0;
1960
1961
1962 for (ib = 0; ib < NBOARDS; ib++) {
1963 if (mBuffer[id].board[ib] == -1) { //board is not read
1964 mBuffer[id].FADhead[ib].start_package_flag = 0;
1965 mBuffer[id].fEvent->BoardTime[ib] = 0;
1966 } else {
1967 mBuffer[id].fEvent->BoardTime[ib] =
1968 ntohl (mBuffer[id].FADhead[ib].time);
1969 }
1970 }
1971
1972 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
1973 mBuffer[id].fEvent);
1974 gi.procTot++;
1975 numProc++;
1976
1977 if (i < 0) {
1978 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
1979 gi.procErr++;
1980 } else {
1981 evtCtrl.evtStat[k0] = 1000;
1982 runCtrl[j].procEvt++;
1983 }
1984 }
1985 }
1986 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
1987 numWait++;
1988 }
1989 }
1990
1991 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1992 snprintf (str, MXSTR, "Exit Processing Process ...");
1993 factOut (kInfo, -1, str);
1994 gp_runStat = -22; //==> we should exit
1995 gj.procStat = -22; //==> we should exit
1996 return 0;
1997 }
1998
1999 if (numProc == 0) {
2000 //seems we have nothing to do, so sleep a little
2001 xwait.tv_sec = 0;
2002 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2003 nanosleep (&xwait, NULL);
2004 }
2005 gp_runStat = gi_runStat;
2006 gj.procStat = gj.readStat;
2007
2008 }
2009
2010 //we are asked to abort asap ==> must flag all remaining events
2011 // when gi_runStat claims that all events are in the buffer...
2012
2013 snprintf (str, MXSTR, "Abort Processing Process ...");
2014 factOut (kInfo, -1, str);
2015 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2016 if (kd < 0)
2017 kd += (MAX_EVT * MAX_RUN);
2018
2019 for (k = 0; k < gi_maxProc; k++) {
2020 pthread_join (thread[k], (void **) &status);
2021 }
2022
2023 int k1 = evtCtrl.frstPtr;
2024 for (k = k1; k < (k1 + kd); k++) {
2025 int k0 = k % (MAX_EVT * MAX_RUN);
2026 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2027 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2028 }
2029 }
2030
2031 gp_runStat = -99;
2032 gj.procStat = -99;
2033
2034 return 0;
2035
2036} /*-----------------------------------------------------------------*/
2037
2038int
2039CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2040{
2041/* close run runId (all all runs if runId=0) */
2042/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2043 int j;
2044
2045
2046 if (runId == 0) {
2047 for (j = 0; j < MAX_RUN; j++) {
2048 if (runCtrl[j].fileId == 0) { //run is open
2049 runCtrl[j].closeTime = closeTime;
2050 runCtrl[j].maxEvt = maxEvt;
2051 }
2052 }
2053 return 0;
2054 }
2055
2056 for (j = 0; j < MAX_RUN; j++) {
2057 if (runCtrl[j].runId == runId) {
2058 if (runCtrl[j].fileId == 0) { //run is open
2059 runCtrl[j].closeTime = closeTime;
2060 runCtrl[j].maxEvt = maxEvt;
2061 return 0;
2062 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2063 runCtrl[j].closeTime = closeTime;
2064 runCtrl[j].maxEvt = maxEvt;
2065 return +1;
2066 } else { // run already closed
2067 return +2;
2068 }
2069 }
2070 } //we only reach here if the run was never created
2071 return -1;
2072
2073} /*-----------------------------------------------------------------*/
2074
2075
2076void *
2077writeEvt (void *ptr)
2078{
2079/* *** main loop writing event (including opening and closing run-files */
2080
2081 int numWrite, numWait;
2082 int k, j ;
2083 struct timespec xwait;
2084 char str[MXSTR];
2085
2086 cpu_set_t mask;
2087 int cpu = 1; //write thread
2088
2089 snprintf (str, MXSTR, "Starting write-thread");
2090 factOut (kInfo, -1, str);
2091
2092/* CPU_ZERO initializes all the bits in the mask to zero. */
2093// CPU_ZERO (&mask);
2094/* CPU_SET sets only the bit corresponding to cpu. */
2095// CPU_SET (cpu, &mask);
2096/* sched_setaffinity returns 0 in success */
2097// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2098// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2099// }
2100
2101 int lastRun = 0; //usually run from last event still valid
2102
2103 while (g_runStat > -2) {
2104
2105 numWait = numWrite = 0;
2106 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2107 if (kd < 0)
2108 kd += (MAX_EVT * MAX_RUN);
2109
2110 int k1 = evtCtrl.frstPtr;
2111 for (k = k1; k < (k1 + kd); k++) {
2112 int k0 = k % (MAX_EVT * MAX_RUN);
2113//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2114 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2115
2116 if (gi_resetR > 1) { //we must drain the buffer asap
2117 evtCtrl.evtStat[k0] = 9904;
2118 } else {
2119
2120
2121 int id = evtCtrl.evtBuf[k0];
2122 uint32_t irun = mBuffer[id].runNum;
2123 int32_t ievt = mBuffer[id].evNum;
2124
2125 gi.wrtTot++;
2126 if (runCtrl[lastRun].runId == irun) {
2127 j = lastRun;
2128 } else {
2129 //check which fileID to use (or open if needed)
2130 for (j = 0; j < MAX_RUN; j++) {
2131 if (runCtrl[j].runId == irun)
2132 break;
2133 }
2134 if (j >= MAX_RUN) {
2135 snprintf (str, MXSTR,
2136 "W error: can not find run %d for event %d in %d",
2137 irun, ievt, id);
2138 factOut (kFatal, 901, str);
2139 gi.wrtErr++;
2140 }
2141 lastRun = j;
2142 }
2143
2144 if (runCtrl[j].fileId < 0) {
2145 actRun.Version = 1;
2146 actRun.RunType = -1; //to be adapted
2147
2148 actRun.Nroi = runCtrl[j].roi0;
2149 actRun.NroiTM = runCtrl[j].roi8;
2150 if (actRun.Nroi == actRun.NroiTM)
2151 actRun.NroiTM = 0;
2152 actRun.RunTime = runCtrl[j].firstTime;
2153 actRun.RunUsec = runCtrl[j].firstTime;
2154 actRun.NBoard = NBOARDS;
2155 actRun.NPix = NPIX;
2156 actRun.NTm = NTMARK;
2157 actRun.Nroi = mBuffer[id].nRoi;
2158 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2159 NBOARDS * sizeof (PEVNT_HEADER));
2160
2161 runCtrl[j].fileHd =
2162 runOpen (irun, &actRun, sizeof (actRun));
2163 if (runCtrl[j].fileHd == NULL) {
2164 snprintf (str, MXSTR,
2165 "W could not open a file for run %d", irun);
2166 factOut (kError, 502, str);
2167 runCtrl[j].fileId = 91;
2168 } else {
2169 snprintf (str, MXSTR, "W opened new run_file %d evt %d",
2170 irun, ievt);
2171 factOut (kInfo, -1, str);
2172 runCtrl[j].fileId = 0;
2173 }
2174
2175 }
2176
2177 if (runCtrl[j].fileId != 0) {
2178 if (runCtrl[j].fileId < 0) {
2179 snprintf (str, MXSTR,
2180 "W never opened file for this run %d", irun);
2181 factOut (kError, 123, str);
2182 } else if (runCtrl[j].fileId < 100) {
2183 snprintf (str, MXSTR, "W.file for this run is closed %d",
2184 irun);
2185 factOut (kWarn, 123, str);
2186 runCtrl[j].fileId += 100;
2187 } else {
2188 snprintf (str, MXSTR, "W:file for this run is closed %d",
2189 irun);
2190 factOut (kDebug, 123, str);
2191 }
2192 evtCtrl.evtStat[k0] = 9903;
2193 gi.wrtErr++;
2194 } else {
2195// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2196// factOut (kInfo, 504, str);
2197 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2198 sizeof (mBuffer[id]));
2199 if (i >= 0) {
2200 runCtrl[j].lastTime = g_actTime;
2201 runCtrl[j].actEvt++;
2202 evtCtrl.evtStat[k0] = 9901;
2203 snprintf (str, MXSTR,
2204 "%5d successfully wrote for run %d id %5d",
2205 ievt, irun, k0);
2206 factOut (kDebug, 504, str);
2207// gj.writEvt++ ;
2208 } else {
2209 snprintf (str, MXSTR, "W error writing event for run %d",
2210 irun);
2211 factOut (kError, 503, str);
2212 evtCtrl.evtStat[k0] = 9902;
2213 gi.wrtErr++;
2214 }
2215
2216 if (i < 0
2217 || runCtrl[j].lastTime < g_actTime - 300
2218 || runCtrl[j].closeTime < g_actTime
2219 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2220 int ii = 0;
2221 if (i < 0)
2222 ii = 1;
2223 else if (runCtrl[j].closeTime < g_actTime)
2224 ii = 2;
2225 else if (runCtrl[j].lastTime < g_actTime - 300)
2226 ii = 3;
2227 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2228 ii = 4;
2229
2230
2231
2232 //close run for whatever reason
2233 if (runCtrl[j].runId == gi_myRun)
2234 gi_myRun = g_actTime;
2235
2236 if (runCtrl[j].procId == 0) {
2237 runFinish1 (runCtrl[j].runId);
2238 runCtrl[j].procId = 92;
2239 }
2240
2241 runCtrl[j].closeTime = g_actTime - 1;
2242 i = runClose (runCtrl[j].fileHd, &runTail[j],
2243 sizeof (runTail[j]));
2244 if (i < 0) {
2245 snprintf (str, MXSTR, "error closing run %d %d AAA",
2246 runCtrl[j].runId, i);
2247 factOut (kError, 503, str);
2248 runCtrl[j].fileId = 92;
2249 } else {
2250 snprintf (str, MXSTR, "W closed run %d AAA %d", irun,
2251 ii);
2252 factOut (kInfo, 503, str);
2253 runCtrl[j].fileId = 93;
2254 }
2255 }
2256 }
2257 }
2258 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2259 numWait++;
2260 }
2261
2262 //check if we should close a run (mainly when no event pending)
2263 for (j = 0; j < MAX_RUN; j++) {
2264 if (runCtrl[j].fileId == 0
2265 && (runCtrl[j].closeTime < g_actTime
2266 || runCtrl[j].lastTime < g_actTime - 300
2267 || runCtrl[j].maxEvt <= runCtrl[j].actEvt)) {
2268 if (runCtrl[j].runId == gi_myRun)
2269 gi_myRun = g_actTime;
2270 int ii = 0;
2271 if (runCtrl[j].closeTime < g_actTime)
2272 ii = 2;
2273 else if (runCtrl[j].lastTime < g_actTime - 300)
2274 ii = 3;
2275 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2276 ii = 4;
2277
2278 if (runCtrl[j].procId == 0) {
2279 runFinish1 (runCtrl[j].runId);
2280 runCtrl[j].procId = 92;
2281 }
2282
2283 runCtrl[j].closeTime = g_actTime - 1;
2284 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2285 sizeof (runTail[j]));
2286 if (i < 0) {
2287 snprintf (str, MXSTR, "error closing run %d %d BBB",
2288 runCtrl[j].runId, i);
2289 factOut (kError, 506, str);
2290 runCtrl[j].fileId = 94;
2291 } else {
2292 snprintf (str, MXSTR, "W closed run %d BBB %d",
2293 runCtrl[j].runId, ii);
2294 factOut (kInfo, 507, str);
2295 runCtrl[j].fileId = 95;
2296 }
2297 }
2298 }
2299
2300 if (numWrite == 0) {
2301 //seems we have nothing to do, so sleep a little
2302 xwait.tv_sec = 0;
2303 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2304 nanosleep (&xwait, NULL);
2305 }
2306
2307 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2308 snprintf (str, MXSTR, "Finish Write Process ...");
2309 factOut (kInfo, -1, str);
2310 gw_runStat = -22; //==> we should exit
2311 gj.writStat = -22; //==> we should exit
2312 goto closerun;
2313 }
2314 gw_runStat = gi_runStat;
2315 gj.writStat = gj.readStat;
2316
2317 }
2318
2319 //must close all open files ....
2320 snprintf (str, MXSTR, "Abort Writing Process ...");
2321 factOut (kInfo, -1, str);
2322
2323 closerun:
2324 snprintf (str, MXSTR, "Close all open files ...");
2325 factOut (kInfo, -1, str);
2326 for (j = 0; j < MAX_RUN; j++)
2327 if (runCtrl[j].fileId == 0) {
2328 if (runCtrl[j].runId == gi_myRun)
2329 gi_myRun = g_actTime;
2330
2331 if (runCtrl[j].procId == 0) {
2332 runFinish1 (runCtrl[j].runId);
2333 runCtrl[j].procId = 92;
2334 }
2335
2336 runCtrl[j].closeTime = g_actTime - 1;
2337 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2338 int ii = 0;
2339 if (runCtrl[j].closeTime < g_actTime)
2340 ii = 2;
2341 else if (runCtrl[j].lastTime < g_actTime - 300)
2342 ii = 3;
2343 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2344 ii = 4;
2345 if (i < 0) {
2346 snprintf (str, MXSTR, "error closing run %d %d CCC",
2347 runCtrl[j].runId, i);
2348 factOut (kError, 506, str);
2349 runCtrl[j].fileId = 96;
2350 } else {
2351 snprintf (str, MXSTR, "W closed run %d CCC %d", runCtrl[j].runId,
2352 ii);
2353 factOut (kInfo, 507, str);
2354 runCtrl[j].fileId = 97;
2355 }
2356 }
2357
2358 gw_runStat = -99;
2359 gj.writStat = -99;
2360 snprintf (str, MXSTR, "Exit Writing Process ...");
2361 factOut (kInfo, -1, str);
2362 return 0;
2363
2364
2365
2366
2367} /*-----------------------------------------------------------------*/
2368
2369
2370
2371
2372void
2373StartEvtBuild ()
2374{
2375
2376 int i, j, imax, status, th_ret[50];
2377 pthread_t thread[50];
2378 struct timespec xwait;
2379
2380 gi_runStat = gp_runStat = gw_runStat = 0;
2381 gj.readStat = gj.procStat = gj.writStat = 0;
2382
2383 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2384 factOut (kInfo, -1, str);
2385
2386//initialize run control logics
2387 for (i = 0; i < MAX_RUN; i++) {
2388 runCtrl[i].runId = 0;
2389 runCtrl[i].fileId = -2;
2390 }
2391
2392//prepare for subProcesses
2393 gi_maxSize = g_maxSize;
2394 if (gi_maxSize <= 0)
2395 gi_maxSize = 1;
2396
2397 gi_maxProc = g_maxProc;
2398 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2399 snprintf (str, MXSTR, "illegal number of processes %d", gi_maxProc);
2400 factOut (kFatal, 301, str);
2401 gi_maxProc = 1;
2402 }
2403//partially initialize event control logics
2404 evtCtrl.frstPtr = 0;
2405 evtCtrl.lastPtr = 0;
2406
2407//start all threads (more to come) when we are allowed to ....
2408 while (g_runStat == 0) {
2409 xwait.tv_sec = 0;
2410 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2411 nanosleep (&xwait, NULL);
2412 }
2413
2414 i = 0;
2415 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2416 i++;
2417 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2418 i++;
2419 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2420 i++;
2421 imax = i;
2422
2423
2424#ifdef BILAND
2425 xwait.tv_sec = 30;;
2426 xwait.tv_nsec = 0; // sleep for ~20sec
2427 nanosleep (&xwait, NULL);
2428
2429 printf ("close all runs in 2 seconds\n");
2430
2431 CloseRunFile (0, time (NULL) + 2, 0);
2432
2433 xwait.tv_sec = 1;;
2434 xwait.tv_nsec = 0; // sleep for ~20sec
2435 nanosleep (&xwait, NULL);
2436
2437 printf ("setting g_runstat to -1\n");
2438
2439 g_runStat = -1;
2440#endif
2441
2442
2443//wait for all threads to finish
2444 for (i = 0; i < imax; i++) {
2445 j = pthread_join (thread[i], (void **) &status);
2446 }
2447
2448} /*-----------------------------------------------------------------*/
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464 /*-----------------------------------------------------------------*/
2465 /*-----------------------------------------------------------------*/
2466 /*-----------------------------------------------------------------*/
2467 /*-----------------------------------------------------------------*/
2468 /*-----------------------------------------------------------------*/
2469
2470#ifdef BILAND
2471
2472int
2473subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2474 int8_t * buffer)
2475{
2476 printf ("called subproc %d\n", threadID);
2477 return threadID + 1;
2478}
2479
2480
2481
2482
2483 /*-----------------------------------------------------------------*/
2484 /*-----------------------------------------------------------------*/
2485 /*-----------------------------------------------------------------*/
2486 /*-----------------------------------------------------------------*/
2487 /*-----------------------------------------------------------------*/
2488
2489
2490
2491
2492FileHandle_t
2493runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2494{
2495 return 1;
2496};
2497
2498int
2499runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2500{
2501 return 1;
2502 usleep (10000);
2503 return 1;
2504}
2505
2506
2507//{ return 1; } ;
2508
2509int
2510runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2511{
2512 return 1;
2513};
2514
2515
2516
2517
2518int
2519eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2520{
2521 int i = 0;
2522
2523// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2524// for (i=0; i<NBOARDS; i++) {
2525// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2526// }
2527 return 0;
2528}
2529
2530
2531void
2532factStatNew (EVT_STAT gi)
2533{
2534 int i;
2535
2536//for (i=0;i<MAX_SOCK;i++) {
2537// printf("%4d",gi.numRead[i]);
2538// if (i%20 == 0 ) printf("\n");
2539//}
2540}
2541
2542void
2543gotNewRun (int runnr, PEVNT_HEADER * headers)
2544{
2545 printf ("got new run %d\n", runnr);
2546 return;
2547}
2548
2549void
2550factStat (GUI_STAT gj)
2551{
2552// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2553// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2554}
2555
2556
2557void
2558debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
2559 int state, uint32_t tsec, uint32_t tusec)
2560{
2561// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
2562}
2563
2564
2565
2566void
2567debugStream (int isock, void *buf, int len)
2568{
2569}
2570
2571void
2572debugHead (int i, int j, void *buf)
2573{
2574}
2575
2576
2577void
2578factOut (int severity, int err, char *message)
2579{
2580 static FILE *fd;
2581 static int file = 0;
2582
2583 if (file == 0) {
2584 printf ("open file\n");
2585 fd = fopen ("x.out", "w+");
2586 file = 999;
2587 }
2588
2589 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2590
2591 if (severity != kDebug)
2592 printf ("%3d %3d | %s\n", severity, err, message);
2593}
2594
2595
2596
2597int
2598main ()
2599{
2600 int i, b, c, p;
2601 char ipStr[100];
2602 struct in_addr IPaddr;
2603
2604 g_maxMem = 1024 * 1024; //MBytes
2605//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2606 g_maxMem = g_maxMem * 200; //100MBytes
2607
2608 g_maxProc = 20;
2609 g_maxSize = 30000;
2610
2611 g_runStat = 40;
2612
2613 i = 0;
2614
2615// version for standard crates
2616//for (c=0; c<4,c++) {
2617// for (b=0; b<10; b++) {
2618// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2619//
2620// inet_pton(PF_INET, ipStr, &IPaddr) ;
2621//
2622// g_port[i].sockAddr.sin_family = PF_INET;
2623// g_port[i].sockAddr.sin_port = htons(5000) ;
2624// g_port[i].sockAddr.sin_addr = IPaddr ;
2625// g_port[i].sockDef = 1 ;
2626// i++ ;
2627// }
2628//}
2629//
2630//version for PC-test *
2631 for (c = 0; c < 4; c++) {
2632 for (b = 0; b < 10; b++) {
2633 sprintf (ipStr, "10.0.%d.11", 128 + c);
2634 if (c < 2)
2635 sprintf (ipStr, "10.0.%d.11", 128);
2636 else
2637 sprintf (ipStr, "10.0.%d.11", 131);
2638// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2639
2640 inet_pton (PF_INET, ipStr, &IPaddr);
2641 p = 31919 + 100 * c + 10 * b;
2642
2643
2644 g_port[i].sockAddr.sin_family = PF_INET;
2645 g_port[i].sockAddr.sin_port = htons (p);
2646 g_port[i].sockAddr.sin_addr = IPaddr;
2647 g_port[i].sockDef = 1;
2648
2649 i++;
2650 }
2651 }
2652
2653
2654//g_port[17].sockDef =-1 ;
2655//g_actBoards-- ;
2656
2657 StartEvtBuild ();
2658
2659 return 0;
2660
2661}
2662#endif
Note: See TracBrowser for help on using the repository browser.