source: trunk/FACT++/src/EventBuilder.c@ 12619

Last change on this file since 12619 was 12520, checked in by tbretz, 13 years ago
Do not swap the package length... for some unknown reason this also doesn't work and produces crashes.
File size: 92.0 KB
Line 
1
2// // // #define EVTDEBUG
3
// Number of sockets actually used per board. Socket ids are laid out as
// board*7 + port; GenSock() and readFAD() ignore ids with (id % 7) >= NUMSOCK.
#define NUMSOCK 1 //set to 7 for old configuration
// Upper limit for the byte count requested by a single recv() in readFAD().
#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/* Severity levels passed as first argument to factOut(). */
enum Severity {
    kMessage = 10,   ///< Just a message, usually obsolete
    kInfo    = 20,   ///< An info telling something which can be interesting to know
    kWarn    = 30,   ///< A warning, things that somehow might result in unexpected or unwanted behaviour
    kError   = 40,   ///< Error, something unexpected happened, but can still be handled by the program
    kFatal   = 50,   ///< An error which cannot be handled at all happened, the only solution is program termination
    kDebug   = 99,   ///< A message used for debugging only
};
36
#define MIN_LEN 32           // min #bytes needed to interpret FADheader
// Parenthesized so that the macro expands as one value in any expression
// (e.g. x % MAX_LEN or x / MAX_LEN); the value itself is unchanged.
#define MAX_LEN (256*1024)   // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
// Hooks implemented outside of this file (declared extern here; their
// definitions are not part of this translation unit).

// Run file handling: open / write one event / close.
extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
//extern int runFinish (uint32_t runnr);   // runFinish is defined in this file

// Message output; called throughout this file as factOut (severity, code, str).
extern void factOut (int severity, int err, char *message);

extern void gotNewRun (int runnr, PEVNT_HEADER * headers);


// Publish the GUI statistics structure.
extern void factStat (GUI_STAT gj);

// Publish the event statistics structure.
extern void factStatNew (EVT_STAT gi);

extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);

extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
                       int8_t * buffer);

extern void debugHead (int i, int j, void *buf);

// Called from readFAD() while data of an event is being read from a socket.
extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
                       int32_t runnr, int state, uint32_t tsec,
                       uint32_t tusec);
// Called from readFAD() with every chunk of bytes received from a socket.
extern void debugStream (int isock, void *buf, int len);

// Forward declaration; the definition is not within the part of the file shown here.
int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
69
70
71
72
73
// ---- global configuration / state --------------------------------------
int g_maxProc;
int g_maxSize;
int gi_maxSize;
int gi_maxProc;

uint g_actTime;                 // current time [s], refreshed from gettimeofday() in readFAD()
uint g_actUsec;                 // microsecond part belonging to g_actTime
int g_runStat;                  // readFAD() keeps looping while this is >= 0
int g_reset;                    // readFAD() keeps looping while this is 0
int g_useFTM;

// internal reset flags; all cleared by readFAD() once (re)initialization is done
int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
size_t g_maxMem;                //maximum memory allowed for buffer

//no longer needed ...
int g_maxBoards;                //maximum number of boards to be initialized
int g_actBoards;
//

FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"


int gi_runStat;                 // copy of g_runStat taken by readFAD()
int gp_runStat;
int gw_runStat;

// > 0: memory available; set to -99 by mBufEvt() when no event memory is left
// and back to +1 by mBufFree() once usage dropped to <= 75% of gj.maxMem
int gi_memStat = +1;

uint32_t gi_myRun = 0;          // set to g_actTime when readFAD() (re)starts
uint32_t actrun = 0;


uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards

//uint gi_EvtStart= 0 ;
//uint gi_EvtRead = 0 ;
//uint gi_EvtBad = 0 ;
//uint gi_EvtTot = 0 ;
//size_t gi_usedMem = 0 ;

//uint gw_EvtTot = 0 ;
//uint gp_EvtTot = 0 ;

PIX_MAP g_pixMap[NPIX];

EVT_STAT gi;                    // event statistics (reset by resetEvtStat())
GUI_STAT gj;                    // statistics for the GUI (memory usage, bad roi counters, ...)

EVT_CTRL evtCtrl;               //control of events during processing
int evtIdx[MAX_EVT * MAX_RUN];  //index from mBuffer to evtCtrl

WRK_DATA mBuffer[MAX_EVT * MAX_RUN];    //local working space

// Shared scratch buffer used to format every message handed to factOut().
#define MXSTR 1000
char str[MXSTR];
129
//ETIENNE
// Fixed-size slot allocator for event memory: memory is obtained in chunks of
// up to MAX_SLOTS_PER_CHUNK equally sized slots (see ETI_Malloc / ETI_Free).
#define MAX_SLOTS_PER_CHUNK 100

// Remembers, per mBuffer[] index, which chunk/slot holds the event's memory.
// All three members are -1 while unused (see mBufInit).
typedef struct {
    int32_t eventNumber;        // event id the slot was handed out for
    int32_t chunk;              // index into EtiMemoryChunks[]
    int32_t slot;               // slot index within that chunk
} CHUNK_MAPPING;

CHUNK_MAPPING mBufferMapping[MAX_EVT * MAX_RUN];

// Slot layout: NBOARDS headers first (MAX_HEAD_MEM bytes), directly followed by
// the EVENT data (mBufEvt places fEvent at offset MAX_HEAD_MEM).
// NOTE(review): the factor 1024*2 presumably corresponds to the maximum roi of
// 1024 samples at 2 bytes each -- confirm.
#define MAX_EVT_MEM (sizeof(EVENT) + NPIX*1024*2 + NTMARK*1024*2)
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
#define MAX_SLOT_SIZE (MAX_EVT_MEM + MAX_HEAD_MEM)
#define MAX_CHUNK_SIZE (MAX_SLOT_SIZE*MAX_SLOTS_PER_CHUNK)

typedef struct {
    void * pointers[MAX_SLOTS_PER_CHUNK];   // start address of each slot; [0] is the malloc'ed block
    int32_t events[MAX_SLOTS_PER_CHUNK];    // event id stored in each slot, -1 = free
    int32_t nFreeSlots;                     // number of currently free slots
    int32_t nSlots;                         // number of slots this chunk was allocated with
} ETI_CHUNK;

int32_t numAllocatedChunks = 0;             // chunks [0, numAllocatedChunks) are allocated

#define MAX_CHUNKS 8096
ETI_CHUNK EtiMemoryChunks[MAX_CHUNKS];
157
158void* ETI_Malloc(int evtId, int evtIndex)
159{
160 for (int i=0;i<numAllocatedChunks;i++) {
161 if (EtiMemoryChunks[i].nFreeSlots > 0) {
162 for (int j=0;j<EtiMemoryChunks[i].nSlots;j++)
163 {
164 if (EtiMemoryChunks[i].events[j] == -1)
165 {
166 EtiMemoryChunks[i].events[j] = evtId;
167 EtiMemoryChunks[i].nFreeSlots--;
168 mBufferMapping[evtIndex].eventNumber = evtId;
169 mBufferMapping[evtIndex].chunk = i;
170 mBufferMapping[evtIndex].slot = j;
171 return EtiMemoryChunks[i].pointers[j];
172 }
173 }
174 //If I reach this point then we have a problem because it should have found
175 //a free spot just above.
176 }
177 }
178 //If we reach this point this means that we should allocate more memory
179 int32_t numNewSlots = 0;
180 if ((numAllocatedChunks + 1)*MAX_CHUNK_SIZE < g_maxMem)
181 numNewSlots = MAX_SLOTS_PER_CHUNK;
182 else
183 numNewSlots = (g_maxMem - numAllocatedChunks*MAX_CHUNK_SIZE)/MAX_SLOT_SIZE;
184
185 if (numNewSlots == 0)//cannot allocate more without exceeding the max mem limit
186 {
187 return NULL;
188 }
189
190 EtiMemoryChunks[numAllocatedChunks].pointers[0] = malloc(MAX_SLOT_SIZE*numNewSlots);
191 if (EtiMemoryChunks[numAllocatedChunks].pointers[0] == NULL)
192 {
193 snprintf (str, MXSTR, "Allocation of %lu bytes failed. %d chunks are currently allocated (max allowed %lu bytes)", MAX_CHUNK_SIZE, numAllocatedChunks, g_maxMem);
194 factOut (kError, 000, str);
195 return NULL;
196 }
197
198 EtiMemoryChunks[numAllocatedChunks].nSlots = numNewSlots;
199 EtiMemoryChunks[numAllocatedChunks].events[0] = evtId;
200 EtiMemoryChunks[numAllocatedChunks].nFreeSlots = numNewSlots-1;
201 mBufferMapping[evtIndex].eventNumber = evtId;
202 mBufferMapping[evtIndex].chunk = numAllocatedChunks;
203 mBufferMapping[evtIndex].slot = 0;
204
205 for (int i=1;i<numNewSlots;i++)
206 {
207 EtiMemoryChunks[numAllocatedChunks].pointers[i] = &(((char*)(EtiMemoryChunks[numAllocatedChunks].pointers[i-1]))[MAX_SLOT_SIZE]);
208 EtiMemoryChunks[numAllocatedChunks].events[i] = -1;
209 }
210 numAllocatedChunks++;
211
212 return EtiMemoryChunks[numAllocatedChunks-1].pointers[0];
213}
214
215void ETI_Free(int evtId, int evtIndex)
216{
217 ETI_CHUNK* currentChunk = &EtiMemoryChunks[mBufferMapping[evtIndex].chunk];
218 if (currentChunk->events[mBufferMapping[evtIndex].slot] != evtId)
219 {
220 snprintf (str, MXSTR, "Mismatch in chunk mapping table. Expected evtId %d. Got %d. No memory was freed.", evtId, currentChunk->events[mBufferMapping[evtIndex].slot]);
221 factOut (kError, 000, str);
222 return;
223 }
224 currentChunk->events[mBufferMapping[evtIndex].slot] = -1;
225 currentChunk->nFreeSlots++;
226
227 int chunkIndex = mBufferMapping[evtIndex].chunk;
228 if (chunkIndex != numAllocatedChunks-1)
229 return;
230
231 while (EtiMemoryChunks[chunkIndex].nFreeSlots == EtiMemoryChunks[chunkIndex].nSlots)
232 {//free this chunk
233 if (EtiMemoryChunks[chunkIndex].pointers[0] == NULL)
234 {
235 snprintf(str, MXSTR, "Chunk %d not allocated as it ought to be. Skipping memory release.", chunkIndex);
236 factOut(kError, 000, str);
237 return;
238 }
239 free(EtiMemoryChunks[chunkIndex].pointers[0]);
240 EtiMemoryChunks[chunkIndex].pointers[0] = NULL;
241 numAllocatedChunks--;
242 chunkIndex--;
243 if (numAllocatedChunks == 0)
244 break;
245 }
246}
247//END ETIENNE
248
249
250
// Run header; its FADhead member is allocated in mBufInit().
RUN_HEAD actRun;

// Bookkeeping for up to MAX_RUN runs; entries are (re)assigned in mBufEvt().
RUN_CTRL runCtrl[MAX_RUN];

// Per-run tail counters, same index as runCtrl[]; zeroed in mBufEvt() when a run is registered.
RUN_TAIL runTail[MAX_RUN];
256
257
/*
*** Definition of rdBuffer to read in IP packets; keep it global !!!!
***
*** One raw receive buffer of MAX_LEN bytes, accessible as 8, 16, 32 or
*** 64 bit integers.
 */


typedef union
{
    int8_t B[MAX_LEN];
    int16_t S[MAX_LEN / 2];
    int32_t I[MAX_LEN / 4];
    int64_t L[MAX_LEN / 8];
} CNV_FACT;
270
// Per-socket read state: connection status, address, receive buffer and the
// identification of the event currently being received.
typedef struct
{
    int bufTyp;                 //what are we reading at the moment: 0=header 1=data -1=skip ...
    int32_t bufPos;             //next byte to read to the buffer next
    int32_t bufLen;             //number of bytes left to read
// size_t bufLen; //number of bytes left to read size_t might be better
    int32_t skip;               //number of bytes skipped before start of event

    int errCnt;                 //how often connect failed since last successful
    // sockStat values set in this file: 0 = connected, -1 = connect pending,
    // 77 = unused socket or buffer allocation failed, 88 = socket() failed,
    // 99 = does not exist / closed
    int sockStat;               //-1 if socket not yet connected , 99 if not exist
    int socket;                 //contains the sockets
    struct sockaddr_in SockAddr;        //IP for each socket

    int evtID;                  // event ID of event currently read
    int runID;                  // run "
    int ftmID;                  // event ID from FTM
    uint fadLen;                // FADlength of event currently read
    int fadVers;                // Version of FAD
    int ftmTyp;                 // trigger type
    int board;                  // boardID (softwareID: 0..40 )
    int Port;                   // port number as passed to GenSock()

    CNV_FACT *rBuf;             // receive buffer, allocated/freed in GenSock()

#ifdef EVTDEBUG
    CNV_FACT *xBuf;             //a copy of rBuf (temporary for debugging)
#endif

} READ_STRUCT;
300
301
// A 16 bit value that can also be accessed as its two individual bytes
// (used for the start/stop flags, which readFAD() compares byte by byte).
typedef union
{
    int8_t B[2];
    int16_t S;
} SHORT_BYTE;
307
308
309
310
311
// Start/stop flags; readFAD() sets start.S = 0xFB01 and stop.S = 0x04FE.
SHORT_BYTE start, stop;

READ_STRUCT rd[MAX_SOCK];       //buffer to read IP and afterwards store in mBuffer
315
316
317
318/*-----------------------------------------------------------------*/
319
320
321/*-----------------------------------------------------------------*/
322
323int
324runFinish1 (uint32_t runnr)
325{
326 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
327 runnr);
328 factOut (kInfo, 173, str); //but continue anyhow
329 return 0;
330}
331int
332runFinish (uint32_t runnr)
333{
334 snprintf (str, MXSTR, "Should finish run %d (but not yet possible)",
335 runnr);
336 factOut (kInfo, 173, str); //but continue anyhow
337 return 0;
338}
339
340int
341GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
342 READ_STRUCT * rd)
343{
344/*
345*** generate Address, create sockets and allocates readbuffer for it
346***
347*** if flag==0 generate socket and buffer
348*** <0 destroy socket and buffer
349*** >0 close and redo socket
350***
351*** sid : board*7 + port id
352 */
353
354 int j;
355 int optval = 1; //activate keepalive
356 socklen_t optlen = sizeof (optval);
357
358
359 if (sid % 7 >= NUMSOCK) {
360 //this is a not used socket, so do nothing ...
361 rd->sockStat = 77;
362 rd->rBuf = NULL ;
363 return 0;
364 }
365
366 if (rd->sockStat == 0) { //close socket if open
367 j = close (rd->socket);
368 if (j > 0) {
369 snprintf (str, MXSTR, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
370 factOut (kFatal, 771, str);
371 } else {
372 snprintf (str, MXSTR, "Succesfully closed socket %d", sid);
373 factOut (kInfo, 771, str);
374 }
375 }
376
377 rd->sockStat = 99;
378
379 if (flag < 0) {
380 free (rd->rBuf); //and never open again
381#ifdef EVTDEBUG
382 free (rd->xBuf); //and never open again
383#endif
384 rd->rBuf = NULL;
385 return 0;
386 }
387
388
389 if (flag == 0) { //generate address and buffer ...
390 rd->Port = port;
391 rd->SockAddr.sin_family = sockAddr->sin_family;
392 rd->SockAddr.sin_port = htons (port);
393 rd->SockAddr.sin_addr = sockAddr->sin_addr;
394
395#ifdef EVTDEBUG
396 rd->xBuf = malloc (sizeof (CNV_FACT));
397#endif
398 rd->rBuf = malloc (sizeof (CNV_FACT));
399 if (rd->rBuf == NULL) {
400 snprintf (str, MXSTR, "Could not create local buffer %d (malloc failed)", sid);
401 factOut (kFatal, 774, str);
402 rd->sockStat = 77;
403 return -3;
404 }
405 }
406
407
408 if ((rd->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
409 snprintf (str, MXSTR, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
410 factOut (kFatal, 773, str);
411 rd->sockStat = 88;
412 return -2;
413 }
414 optval = 1;
415 if (setsockopt (rd->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
416 snprintf (str, MXSTR, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
417 factOut (kInfo, 173, str); //but continue anyhow
418 }
419 optval = 10; //start after 10 seconds
420 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
421 snprintf (str, MXSTR, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
422 factOut (kInfo, 173, str); //but continue anyhow
423 }
424 optval = 10; //do every 10 seconds
425 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
426 snprintf (str, MXSTR, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
427 factOut (kInfo, 173, str); //but continue anyhow
428 }
429 optval = 2; //close after 2 unsuccessful tries
430 if (setsockopt (rd->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
431 snprintf (str, MXSTR, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
432 factOut (kInfo, 173, str); //but continue anyhow
433 }
434
435
436 snprintf (str, MXSTR, "Successfully generated socket %d", sid);
437 factOut (kInfo, 773, str);
438 rd->sockStat = -1; //try to (re)open socket
439 rd->errCnt = 0;
440 return 0;
441
442} /*-----------------------------------------------------------------*/
443
444 /*-----------------------------------------------------------------*/
445
446
447
448
449int
450mBufInit ()
451{
452// initialize mBuffer (mark all entries as unused\empty)
453
454 uint32_t actime = g_actTime + 50000000;
455
456 for (int i = 0; i < MAX_EVT * MAX_RUN; i++) {
457 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
458 mBuffer[i].runNum = 0;
459
460 evtCtrl.evtBuf[i] = -1;
461 evtCtrl.evtStat[i] = -1;
462 evtCtrl.pcTime[i] = actime; //initiate to far future
463
464 //ETIENNE
465 mBufferMapping[i].chunk = -1;
466 mBufferMapping[i].eventNumber = -1;
467 mBufferMapping[i].slot = -1;
468 //END ETIENNE
469 }
470 for (int j=0;j<MAX_CHUNKS;j++)
471 EtiMemoryChunks[j].pointers[0] = NULL;
472
473 actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
474
475 evtCtrl.frstPtr = 0;
476 evtCtrl.lastPtr = 0;
477
478 return 0;
479
480} /*-----------------------------------------------------------------*/
481
482
483
484
/*
 * Find or create the mBuffer[] entry for event evID of run runID.
 *
 * evID   : event id; candidate slots are evID%MAX_EVT + k*MAX_EVT, k<MAX_RUN
 * runID  : run id
 * nRoi   : 9 roi values; [0..7] must be identical, [8] must be >= [0]
 * sk     : socket id (board = sk/7), used for the per-board error counters
 * fadlen : only used in the debug message
 * trgTyp, trgNum, fadNum : stored for a new event; for an existing event
 *          every deviation increments the matching mBuffer[].Errors counter
 *
 * Return values as found in the code:
 *    >=0    index into mBuffer[]
 *    -1     no free control slot for the event
 *    -11    no event memory available (ETI_Malloc failed)
 *    -1001  run could not be registered in runCtrl[]
 *    -7101  roi of channel 1..7 differs from channel 0
 *    -7102  roi of channel 8 smaller than roi of channel 0
 *    -8201  roi differs from the roi of the already registered event
 *    -9301  roi differs from the roi registered for the run
 *    -9999  roi of channel 0 outside 0..1024
 *
 * NOTE(review): for a negative evID the start index evID%MAX_EVT is negative
 * and mBuffer[] is read out of bounds in the first loop pass -- confirm that
 * callers never pass negative ids.
 * NOTE(review): there is no upper bound check for nRoi[8], although the slot
 * size (MAX_EVT_MEM) looks dimensioned for 1024 samples -- confirm.
 */
int
mBufEvt (int evID, uint runID, int nRoi[], int sk,
         int fadlen, int trgTyp, int trgNum, int fadNum)
{
// generate a new Event into mBuffer:
// make sure only complete Event are possible, so 'free' will always work
// returns index into mBuffer[], or negative value in case of error
// error: <-9000 if roi screwed up (not consistent with run)
//        <-8000 (not consistent with event)
//        <-7000 (not consistent with board)
//        <    0 if no space left

    struct timeval *tv, atv;
    tv = &atv;
    uint32_t tsec, tusec;
    uint oldest;
    int jold;

    int i, b, evFree;
// int headmem = 0;
    // NOTE(review): needmem is never updated below (the code computing it is
    // gone), so mBuffer[i].evtLen is always stored as 0.
    size_t needmem = 0;


    b = sk / 7;                 // board index belonging to this socket

    if (nRoi[0] < 0 || nRoi[0] > 1024) {
        snprintf (str, MXSTR, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", nRoi[0]);
        factOut (kError, 999, str);
        gj.badRoiR++;
        gj.badRoi[b]++;
        return -9999;
    }

    for (int jr = 1; jr < 8; jr++) {
        if (nRoi[jr] != nRoi[0]) {
            snprintf (str, MXSTR, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", nRoi[jr], jr, nRoi[0]);
            factOut (kError, 711, str);
            gj.badRoiB++;
            gj.badRoi[b]++;
            return -7101;
        }
    }
    if (nRoi[8] < nRoi[0]) {
        snprintf (str, MXSTR, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", nRoi[8], nRoi[0]);
        factOut (kError, 712, str);
        gj.badRoiB++;
        gj.badRoi[b]++;
        return -7102;
    }


    // Look at all MAX_RUN candidate slots of this event id: return the slot
    // if the event is already registered, otherwise remember the first free one.
    i = evID % MAX_EVT;
    evFree = -1;

    for (int k = 0; k < MAX_RUN; k++) {
        if (mBuffer[i].evNum == evID && mBuffer[i].runNum == runID) {   //event is already registered;
            // is it ok ????
            if (mBuffer[i].nRoi != nRoi[0]
                || mBuffer[i].nRoiTM != nRoi[8]) {
                snprintf (str, MXSTR, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
                          mBuffer[i].nRoi, mBuffer[i].nRoiTM, nRoi[0], nRoi[8]);
                factOut (kError, 821, str);
                gj.badRoiE++;
                gj.badRoi[b]++;
                return -8201;
            }
// count for inconsistencies

            if (mBuffer[i].trgNum != trgNum)
                mBuffer[i].Errors[0]++;
            if (mBuffer[i].fadNum != fadNum)
                mBuffer[i].Errors[1]++;
            if (mBuffer[i].trgTyp != trgTyp)
                mBuffer[i].Errors[2]++;

            //everything seems fine so far ==> use this slot ....
            return i;
        }
        if (evFree < 0 && mBuffer[i].evNum < 0)
            evFree = i;
        i += MAX_EVT;
    }


    //event does not yet exist; create it
    if (evFree < 0) {           //no space available in ctrl
        snprintf (str, MXSTR, "No control slot to keep event %d", evID);
        factOut (kError, 881, str);
        return -1;
    }
    i = evFree;                 //found free entry; use it ...

    gettimeofday (tv, NULL);
    tsec = atv.tv_sec;
    tusec = atv.tv_usec;

    //check if runId already registered in runCtrl
    // evFree is reused here as index of a free runCtrl[] entry; jold is the
    // entry with the smallest closeTime among those with fileId > 0
    evFree = -1;
    oldest = g_actTime + 1000;
    jold = -1;
    for (int k = 0; k < MAX_RUN; k++) {
        if (runCtrl[k].runId == runID) {
// if (runCtrl[k].procId > 0) { //run is closed -> reject
//    snprintf (str, MXSTR, "skip event since run %d finished", runID);
//    factOut (kInfo, 931, str);
//    return -21;
// }

            if (runCtrl[k].roi0 != nRoi[0]
                || runCtrl[k].roi8 != nRoi[8]) {
                snprintf (str, MXSTR, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d.",
                          runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8]);
                factOut (kError, 931, str);
                gj.badRoiR++;
                gj.badRoi[b]++;
                return -9301;
            }
            goto RUNFOUND;
        } else if (evFree < 0 && runCtrl[k].fileId < 0) {       //not yet used
            evFree = k;
        } else if (evFree < 0 && runCtrl[k].fileId > 0) {       //already closed
            if (runCtrl[k].closeTime < oldest) {
                oldest = runCtrl[k].closeTime;
                jold = k;
            }
        }
    }

    if (evFree < 0 && jold < 0) {
        snprintf (str, MXSTR, "Not able to register the new run %d", runID);
        factOut (kFatal, 883, str);
        return -1001;
    } else {
        if (evFree < 0)
            evFree = jold;
        snprintf (str, MXSTR, "New run %d (evFree=%d) registered with roi=%d and roi_tm=%d", runID,
                  evFree, nRoi[0], nRoi[8]);
        factOut (kInfo, 503, str);
        runCtrl[evFree].runId = runID;
        runCtrl[evFree].roi0 = nRoi[0];
        runCtrl[evFree].roi8 = nRoi[8];
        runCtrl[evFree].fileId = -2;
        runCtrl[evFree].procId = -2;
        runCtrl[evFree].lastEvt = -1;
        runCtrl[evFree].nextEvt = 0;
        runCtrl[evFree].actEvt = 0;
        runCtrl[evFree].procEvt = 0;
        runCtrl[evFree].maxEvt = 999999999;     //max number events allowed
        runCtrl[evFree].firstUsec = tusec;
        runCtrl[evFree].firstTime = runCtrl[evFree].lastTime = tsec;
        runCtrl[evFree].closeTime = tsec + 3600 * 24;   //max time allowed
// runCtrl[evFree].lastTime = 0;

        runTail[evFree].nEventsOk =
            runTail[evFree].nEventsRej =
            runTail[evFree].nEventsBad =
            runTail[evFree].PCtime0 = runTail[evFree].PCtimeX = 0;
    }

  RUNFOUND:
    //ETIENNE
    // (Legacy code removed from here: it computed needmem/headmem per event
    //  and rejected the event with -11 if gj.usdMem + needmem + headmem +
    //  gi_maxSize exceeded g_maxMem. The limit is now enforced by ETI_Malloc.)

    // One pool slot holds the board headers followed by the event data
    mBuffer[i].FADhead = ETI_Malloc(evID, i);
    mBuffer[i].buffer = NULL;
    if (mBuffer[i].FADhead != NULL)
        mBuffer[i].fEvent = (EVENT*)&(((char*)(mBuffer[i].FADhead))[MAX_HEAD_MEM]);
    else
    {
        mBuffer[i].fEvent = NULL;
        gj.usdMem = 0;
        for (int k=0;k<numAllocatedChunks;k++)
            gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
        if (gj.usdMem > gj.maxMem)
            gj.maxMem = gj.usdMem;
        // report as error only once; repeats are demoted to debug until
        // mBufFree() sets gi_memStat back to a positive value
        if (gi_memStat > 0) {
            gi_memStat = -99;
            snprintf (str, MXSTR, "No memory left to keep event %6d sock %3d",
                      evID, sk);
            factOut (kError, 882, str);
        } else {
            snprintf (str, MXSTR, "No memory left to keep event %6d sock %3d",
                      evID, sk);
            factOut (kDebug, 882, str);
        }
        return -11;
    }
    // (Legacy code removed from here: separate malloc calls for FADhead,
    //  fEvent and buffer with error returns -12, -22 and -32.)
    //END ETIENNE
    //flag all boards as unused
    mBuffer[i].nBoard = 0;
    for (int k = 0; k < NBOARDS; k++) {
        mBuffer[i].board[k] = -1;
    }
    //flag all pixels as unused
    for (int k = 0; k < NPIX; k++) {
        mBuffer[i].fEvent->StartPix[k] = -1;
    }
    //flag all TMark as unused
    for (int k = 0; k < NTMARK; k++) {
        mBuffer[i].fEvent->StartTM[k] = -1;
    }

    mBuffer[i].fEvent->NumBoards = 0;
    mBuffer[i].fEvent->PCUsec = tusec;
    mBuffer[i].fEvent->PCTime = mBuffer[i].pcTime = tsec;
    mBuffer[i].nRoi = nRoi[0];
    mBuffer[i].nRoiTM = nRoi[8];
    mBuffer[i].evNum = evID;
    mBuffer[i].runNum = runID;
    mBuffer[i].fadNum = fadNum;
    mBuffer[i].trgNum = trgNum;
    mBuffer[i].trgTyp = trgTyp;
    mBuffer[i].evtLen = needmem;
    mBuffer[i].Errors[0] =
        mBuffer[i].Errors[1] = mBuffer[i].Errors[2] = mBuffer[i].Errors[3] = 0;
//ETIENNE
    //gj.usdMem += needmem + headmem + gi_maxSize;
    // used memory = total size of all currently allocated chunks
    gj.usdMem = 0;
    for (int k=0;k<numAllocatedChunks;k++)
    {
        gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
    }
//END ETIENNE
    if (gj.usdMem > gj.maxMem)
        gj.maxMem = gj.usdMem;

    gj.bufTot++;
    if (gj.bufTot > gj.maxEvt)
        gj.maxEvt = gj.bufTot;

    gj.rateNew++;

    //register event in 'active list (reading)'

    evtCtrl.evtBuf[evtCtrl.lastPtr] = i;
    evtCtrl.evtStat[evtCtrl.lastPtr] = 0;
    evtCtrl.pcTime[evtCtrl.lastPtr] = g_actTime;
    evtIdx[i] = evtCtrl.lastPtr;


    snprintf (str, MXSTR,
              "%5d %8d start new evt %8d %8d sock %3d len %5d t %10d", evID,
              runID, i, evtCtrl.lastPtr, sk, fadlen, mBuffer[i].pcTime);
    factOut (kDebug, -11, str);
    // advance the write position of the control list, wrapping at its end
    evtCtrl.lastPtr++;
    if (evtCtrl.lastPtr == MAX_EVT * MAX_RUN)
        evtCtrl.lastPtr = 0;

    gi.evtGet++;

    return i;

} /*-----------------------------------------------------------------*/
782
783
784
785
786int
787mBufFree (int i)
788{
789//delete entry [i] from mBuffer:
790//(and make sure multiple calls do no harm ....)
791
792 int headmem = 0;
793 int evid;
794 size_t freemem = 0;
795
796 evid = mBuffer[i].evNum;
797 freemem = mBuffer[i].evtLen;
798 //ETIENNE
799 ETI_Free(evid, i);
800// free (mBuffer[i].fEvent);
801 mBuffer[i].fEvent = NULL;
802
803// free (mBuffer[i].FADhead);
804 mBuffer[i].FADhead = NULL;
805
806// free (mBuffer[i].buffer);
807 mBuffer[i].buffer = NULL;
808//END ETIENNE
809 headmem = NBOARDS * sizeof (PEVNT_HEADER);
810 mBuffer[i].evNum = mBuffer[i].nRoi = -1;
811 mBuffer[i].runNum = 0;
812//ETIENNE
813// gj.usdMem = gj.usdMem - freemem - headmem - gi_maxSize;
814 gj.usdMem = 0;
815 for (int k=0;k<numAllocatedChunks;k++)
816 {
817 gj.usdMem += EtiMemoryChunks[k].nSlots * MAX_SLOT_SIZE;
818 }
819//END ETIENNE
820 gj.bufTot--;
821
822 if (gi_memStat < 0) {
823 if (gj.usdMem <= 0.75 * gj.maxMem)
824 gi_memStat = +1;
825 }
826
827
828 return 0;
829
830} /*-----------------------------------------------------------------*/
831
832
833void
834resetEvtStat ()
835{
836 for (int i = 0; i < MAX_SOCK; i++)
837 gi.numRead[i] = 0;
838
839 for (int i = 0; i < NBOARDS; i++) {
840 gi.gotByte[i] = 0;
841 gi.gotErr[i] = 0;
842
843 }
844
845 gi.evtGet = 0; //#new Start of Events read
846 gi.evtTot = 0; //#complete Events read
847 gi.evtErr = 0; //#Events with Errors
848 gi.evtSkp = 0; //#Events incomplete (timeout)
849
850 gi.procTot = 0; //#Events processed
851 gi.procErr = 0; //#Events showed problem in processing
852 gi.procTrg = 0; //#Events accepted by SW trigger
853 gi.procSkp = 0; //#Events rejected by SW trigger
854
855 gi.feedTot = 0; //#Events used for feedBack system
856 gi.feedErr = 0; //#Events rejected by feedBack
857
858 gi.wrtTot = 0; //#Events written to disk
859 gi.wrtErr = 0; //#Events with write-error
860
861 gi.runOpen = 0; //#Runs opened
862 gi.runClose = 0; //#Runs closed
863 gi.runErr = 0; //#Runs with open/close errors
864
865 return;
866} /*-----------------------------------------------------------------*/
867
868
869
/*
 * Initialization hook of the read thread; intentionally does nothing.
 */
void
initReadFAD ()
{
    /* nothing to initialize */
} /*-----------------------------------------------------------------*/
875
876
877
878void *
879readFAD (void *ptr)
880{
881/* *** main loop reading FAD data and sorting them to complete events */
882 int head_len, frst_len, numok, numok2, numokx, dest, evID, i, k;
883 int actBoards = 0, minLen;
884 int32_t jrd;
885 uint gi_SecTime; //time in seconds
886 int boardId, roi[9], drs, px, src, pixS, pixH, pixC, pixR, tmS;
887
888 int goodhed = 0;
889
890 int sockDef[NBOARDS]; //internal state of sockets
891 int jrdx;
892
893
894 struct timespec xwait;
895
896
897 struct timeval *tv, atv;
898 tv = &atv;
899 uint32_t tsec, tusec;
900
901
902 snprintf (str, MXSTR, "Start initializing (readFAD)");
903 factOut (kInfo, -1, str);
904
905 int cpu = 7; //read thread
906// cpu_set_t mask;
907
908/* CPU_ZERO initializes all the bits in the mask to zero. */
909// CPU_ZERO (&mask);
910/* CPU_SET sets only the bit corresponding to cpu. */
911 cpu = 7;
912// CPU_SET (cpu, &mask);
913
914/* sched_setaffinity returns 0 in success */
915// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
916// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
917// factOut (kWarn, -1, str);
918// }
919
920
921 head_len = sizeof (PEVNT_HEADER);
922 frst_len = head_len; //max #bytes to read first: fad_header only, so each event must be longer, even for roi=0
923 minLen = head_len; //min #bytes needed to check header: full header for debug
924
925 start.S = 0xFB01;
926 stop.S = 0x04FE;
927
928/* initialize run control logics */
929 for (i = 0; i < MAX_RUN; i++) {
930 runCtrl[i].runId = 0;
931 runCtrl[i].fileId = -2;
932 runCtrl[i].procId = -2;
933 }
934 gi_resetS = gi_resetR = 9;
935 for (i = 0; i < NBOARDS; i++)
936 sockDef[i] = 0;
937
938 START:
939 gettimeofday (tv, NULL);
940 g_actTime = tsec = atv.tv_sec;
941 g_actUsec = tusec = atv.tv_usec;
942 gi_myRun = g_actTime;
943 evtCtrl.frstPtr = 0;
944 evtCtrl.lastPtr = 0;
945
946 gi_SecTime = g_actTime;
947 gi_runStat = g_runStat;
948 gj.readStat = g_runStat;
949 numok = numok2 = 0;
950
951 int cntsock = 8 - NUMSOCK ;
952
953 if (gi_resetS > 0) {
954 //make sure all sockets are preallocated as 'not exist'
955 for (i = 0; i < MAX_SOCK; i++) {
956 rd[i].socket = -1;
957 rd[i].sockStat = 99;
958 }
959
960 for (k = 0; k < NBOARDS; k++) {
961 gi_NumConnect[k] = 0;
962 gi.numConn[k] = 0;
963 gj.numConn[k] = 0;
964 gj.errConn[k] = 0;
965 gj.rateBytes[k] = 0;
966 gj.totBytes[k] = 0;
967 }
968
969 }
970
971
972 if (gi_resetR > 0) {
973 resetEvtStat ();
974 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
975 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
976 gj.totMem = g_maxMem;
977 gj.bufNew = gj.bufEvt = 0;
978 gj.badRoiE = gj.badRoiR = gj.badRoiB =
979 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
980
981 int b;
982 for (b = 0; b < NBOARDS; b++)
983 gj.badRoi[b] = 0;
984
985 mBufInit (); //initialize buffers
986
987 snprintf (str, MXSTR, "End initializing (readFAD)");
988 factOut (kInfo, -1, str);
989 }
990
991
992 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
993
994 while (g_runStat >= 0 && g_reset == 0) { //loop until global variable g_runStat claims stop
995
996 gi_runStat = g_runStat;
997 gj.readStat = g_runStat;
998 gettimeofday (tv, NULL);
999 g_actTime = tsec = atv.tv_sec;
1000 g_actUsec = tusec = atv.tv_usec;
1001
1002
1003 int b, p, p0, s0, nch;
1004 nch = 0;
1005 for (b = 0; b < NBOARDS; b++) {
1006 k = b * 7;
1007 if (g_port[b].sockDef != sockDef[b]) { //something has changed ...
1008 nch++;
1009 gi_NumConnect[b] = 0; //must close all connections
1010 gi.numConn[b] = 0;
1011 gj.numConn[b] = 0;
1012 if (sockDef[b] == 0)
1013 s0 = 0; //sockets to be defined and opened
1014 else if (g_port[b].sockDef == 0)
1015 s0 = -1; //sockets to be destroyed
1016 else
1017 s0 = +1; //sockets to be closed and reopened
1018
1019 if (s0 == 0)
1020 p0 = ntohs (g_port[b].sockAddr.sin_port);
1021 else
1022 p0 = 0;
1023
1024 for (p = p0 + 1; p < p0 + 8; p++) {
1025 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1026 k++;
1027 }
1028 sockDef[b] = g_port[b].sockDef;
1029 }
1030 }
1031
1032 if (nch > 0) {
1033 actBoards = 0;
1034 for (b = 0; b < NBOARDS; b++) {
1035 if (sockDef[b] > 0)
1036 actBoards++;
1037 }
1038 }
1039
1040
1041 jrdx = 0;
1042 numokx = 0;
1043 numok = 0; //count number of succesfull actions
1044
1045 for (i = 0; i < MAX_SOCK; i++) { //check all sockets if something to read
1046 b = i / 7 ;
1047 p = i % 7 ;
1048
1049if ( p >= NUMSOCK) { ; }
1050else {
1051 if (sockDef[b] > 0)
1052 s0 = +1;
1053 else
1054 s0 = -1;
1055
1056 if (rd[i].sockStat < 0) { //try to connect if not yet done
1057 if (rd[i].sockStat == -1) {
1058 rd[i].sockStat = connect (rd[i].socket,
1059 (struct sockaddr *) &rd[i].SockAddr,
1060 sizeof (rd[i].SockAddr));
1061 if (rd[i].sockStat == -1) {
1062 rd[i].errCnt++ ;
1063// if (rd[i].errCnt < 100) rd[i].sockStat = rd[i].errCnt ;
1064// else if (rd[i].errCnt < 1000) rd[i].sockStat = 1000 ;
1065// else rd[i].sockStat = 10000 ;
1066 }
1067//printf("try to connect %d -> %d\n",i,rd[i].sockStat);
1068
1069 }
1070 if (rd[i].sockStat < -1 ) {
1071 rd[i].sockStat++ ;
1072 }
1073 if (rd[i].sockStat == 0) { //successfull ==>
1074 if (sockDef[b] > 0) {
1075 rd[i].bufTyp = 0; // expect a header
1076 rd[i].bufLen = frst_len; // max size to read at begining
1077 } else {
1078 rd[i].bufTyp = -1; // full data to be skipped
1079 rd[i].bufLen = MAX_LEN; //huge for skipping
1080 }
1081 rd[i].bufPos = 0; // no byte read so far
1082 rd[i].skip = 0; // start empty
1083// gi_NumConnect[b]++;
1084 gi_NumConnect[b] += cntsock ;
1085
1086 gi.numConn[b]++;
1087 gj.numConn[b]++;
1088 snprintf (str, MXSTR, "New connection %d (number of connections: %d)", b, gi.numConn[b]);
1089 factOut (kInfo, -1, str);
1090 }
1091 }
1092
1093 if (rd[i].sockStat == 0) { //we have a connection ==> try to read
1094 if (rd[i].bufLen > 0) { //might be nothing to read [buffer full]
1095 numok++;
1096 size_t maxread = rd[i].bufLen ;
1097 if (maxread > MAXREAD ) maxread=MAXREAD ;
1098
1099 jrd =
1100 recv (rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1101 maxread, MSG_DONTWAIT);
1102// rd[i].bufLen, MSG_DONTWAIT);
1103 if (jrd > 0) {
1104 debugStream (i, &rd[i].rBuf->B[rd[i].bufPos], jrd);
1105#ifdef EVTDEBUG
1106 memcpy (&rd[i].xBuf->B[rd[i].bufPos],
1107 &rd[i].rBuf->B[rd[i].bufPos], jrd);
1108 snprintf (str, MXSTR,
1109 "read sock %3d bytes %5d len %5d first %d %d", i,
1110 jrd, rd[i].bufLen, rd[i].rBuf->B[rd[i].bufPos],
1111 rd[i].rBuf->B[rd[i].bufPos + 1]);
1112 factOut (kDebug, 301, str);
1113#endif
1114 }
1115
1116 if (jrd == 0) { //connection has closed ...
1117 snprintf (str, MXSTR, "Socket %d closed by FAD", i);
1118 factOut (kInfo, 441, str);
1119 GenSock (s0, i, 0, NULL, &rd[i]);
1120 gi.gotErr[b]++;
1121// gi_NumConnect[b]--;
1122 gi_NumConnect[b]-= cntsock ;
1123 gi.numConn[b]--;
1124 gj.numConn[b]--;
1125
1126 } else if (jrd < 0) { //did not read anything
1127 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1128 snprintf (str, MXSTR, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
1129 factOut (kError, 442, str);
1130 gi.gotErr[b]++;
1131 } else
1132 numok--; //else nothing waiting to be read
1133 jrd = 0;
1134 }
1135 } else {
1136 jrd = 0; //did read nothing as requested
1137 snprintf (str, MXSTR, "do not read from socket %d %d", i,
1138 rd[i].bufLen);
1139 factOut (kDebug, 301, str);
1140 }
1141
1142 gi.gotByte[b] += jrd;
1143 gj.rateBytes[b] += jrd;
1144
1145 if (jrd > 0) {
1146 numokx++;
1147 jrdx += jrd;
1148 }
1149
1150
1151 if (rd[i].bufTyp < 0) { // we are skipping this board ...
1152// just do nothing
1153#ifdef EVTDEBUG
1154 snprintf (str, MXSTR, "skipping %d bytes on socket %d", jrd,
1155 i);
1156 factOut (kInfo, 301, str);
1157#endif
1158
1159 } else if (rd[i].bufTyp > 0) { // we are reading data ...
1160 if (jrd < rd[i].bufLen) { //not yet all read
1161 rd[i].bufPos += jrd; //==> prepare for continuation
1162 rd[i].bufLen -= jrd;
1163 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 0, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 0=reading data
1164 } else { //full dataset read
1165 rd[i].bufLen = 0;
1166 rd[i].bufPos = rd[i].fadLen;
1167 if (rd[i].rBuf->B[rd[i].bufPos - 1] != stop.B[0]
1168 || rd[i].rBuf->B[rd[i].bufPos - 2] != stop.B[1]) {
1169 gi.evtErr++;
1170 snprintf (str, MXSTR,
1171 "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d",
1172 i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1],
1173 rd[i].rBuf->B[rd[i].bufPos - 1], rd[i].rBuf->B[rd[i].bufPos - 2]);
1174 factOut (kError, 301, str);
1175 goto EndBuf;
1176
1177#ifdef EVTDEBUG
1178 } else {
1179 snprintf (str, MXSTR,
1180 "good end of buffer found sock %3d len %5d %d %d : %d %d - %d %d : %d %d",
1181 i, rd[i].fadLen, rd[i].rBuf->B[0],
1182 rd[i].rBuf->B[1], start.B[1], start.B[0],
1183 rd[i].rBuf->B[rd[i].bufPos - 2],
1184 rd[i].rBuf->B[rd[i].bufPos - 1], stop.B[1],
1185 stop.B[0]);
1186 factOut (kDebug, 301, str);
1187#endif
1188 }
1189
1190 if (jrd > 0)
1191 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, 1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1192
1193/* //we have a complete buffer, copy to WORK area
1194 int jr, kr;
1195 int checkRoi;
1196 roi[0] = ntohs (rd[i].rBuf->S[head_len / 2 + 2]);
1197 for (kr = 1; kr < 4; kr++)
1198 {
1199 checkRoi = ntohs(rd[i].rBuf->S[head_len/ 2 +
1200 + kr*(roi[jr-1]+4)]);
1201 if (checkRoi != roi[0])
1202 {
1203 snprintf (str, MXSTR, "Inconsistent Roi accross board patches");
1204 factOut (kFatal, 1, str);
1205 goto EndBuf;
1206 }
1207
1208 }
1209 for (jr = 1; jr < 9; jr++) {
1210 roi[jr] =
1211 ntohs (rd[i].
1212 rBuf->S[head_len / 2 + 2 + jr * (roi[jr-1] + 4)]);
1213
1214 }
1215*/
1216 //we have a complete buffer, copy to WORK area
1217
1218
1219 //End of the header. to channels now
1220 int eStart = 36;
1221 for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++)
1222 {
1223 rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id
1224 rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell
1225 rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi
1226 rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling
1227
1228 eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data
1229 }
1230
1231 //channels done. footer now
1232 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//package_crc
1233 // eCount++;
1234 // rd[i].rBuf->S[eStart+eCount] = ntohs(rd[i].rBuf->S[eStart+eCount]);//end_package_flag
1235 // snprintf(str, MXSTR, "Bytes read %d bytes swapped %d", jrd, eCount*2);
1236 // factOut(kInfo, 000, str);
1237 //ETIENNE end of bytes swapping already
1238
1239
1240 int jr, kr;
1241 int checkRoi;
1242 int roiHopper = head_len/2 + 2; //points to the very first roi
1243 roi[0] = rd[i].rBuf->S[roiHopper];
1244 roiHopper += roi[0]+4;//skip to the second roi (i.e. next board, same patch-pixel)
1245 for (kr = 1; kr < 4; kr++)
1246 {
1247 checkRoi = rd[i].rBuf->S[roiHopper];
1248 if (checkRoi != roi[0])
1249 {
1250 snprintf (str, MXSTR, "Inconsistent Roi accross boards B=%d, expected %d, got %d", kr, checkRoi, roi[0]);
1251 factOut (kError, 1, str);
1252 goto EndBuf;
1253 }
1254 roiHopper += checkRoi+4;
1255 }
1256 //roiHopper now points to the first pixel of board 2. Do the 8 remaining pixels
1257 for (jr = 1; jr < 9; jr++) {
1258 roi[jr] = rd[i].rBuf->S[roiHopper];
1259 checkRoi = roi[jr];
1260 for (kr = 1; kr < 4; kr++)
1261 {
1262 roiHopper += checkRoi+4;
1263 checkRoi = rd[i].rBuf->S[roiHopper];
1264 if (checkRoi != roi[jr])
1265 {
1266 snprintf (str, MXSTR, "Inconsistent Roi accross patches B=%d P=%d, expected %d, got %d", kr, jr, roi[jr], checkRoi);
1267 factOut (kFatal, 1, str);
1268 goto EndBuf;
1269 }
1270 }
1271 }
1272 //get index into mBuffer for this event (create if needed)
1273
1274 int actid;
1275 if (g_useFTM > 0)
1276 actid = rd[i].evtID;
1277 else
1278 actid = rd[i].ftmID;
1279
1280 evID = mBufEvt (rd[i].evtID, rd[i].runID, roi, i,
1281 rd[i].fadLen, rd[i].ftmTyp, rd[i].ftmID,
1282 rd[i].evtID);
1283
1284 if (evID < -1000) {
1285 goto EndBuf; //not usable board/event/run --> skip it
1286 }
1287 if (evID < 0) { //no space left, retry later
1288#ifdef EVTDEBUG
1289 if (rd[i].bufLen != 0) {
1290 snprintf (str, MXSTR, "something screwed up");
1291 factOut (kFatal, 1, str);
1292 }
1293#endif
1294 xwait.tv_sec = 0;
1295 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1296 nanosleep (&xwait, NULL);
1297 goto EndBuf1; //hope there is free space next round
1298 }
1299 //we have a valid entry in mBuffer[]; fill it
1300
1301#ifdef EVTDEBUG
1302 int xchk = memcmp (&rd[i].xBuf->B[0], &rd[i].rBuf->B[0],
1303 rd[i].fadLen);
1304 if (xchk != 0) {
1305 snprintf (str, MXSTR, "ERROR OVERWRITE %d %d on port %d",
1306 xchk, rd[i].fadLen, i);
1307 factOut (kFatal, 1, str);
1308
1309 uint iq;
1310 for (iq = 0; iq < rd[i].fadLen; iq++) {
1311 if (rd[i].rBuf->B[iq] != rd[i].xBuf->B[iq]) {
1312 snprintf (str, MXSTR, "ERROR %4d %4d %x %x", i, iq,
1313 rd[i].rBuf->B[iq], rd[i].xBuf->B[iq]);
1314 factOut (kFatal, 1, str);
1315 }
1316 }
1317 }
1318#endif
1319 int qncpy = 0;
1320 boardId = b;
1321 int fadBoard = rd[i].rBuf->S[12];
1322 int fadCrate = fadBoard / 256;
1323 if (boardId != (fadCrate * 10 + fadBoard % 256)) {
1324 snprintf (str, MXSTR, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1325 boardId, fadBoard, fadCrate, fadBoard % 256);
1326 factOut (kWarn, 301, str);
1327 }
1328 if (mBuffer[evID].board[boardId] != -1) {
1329 snprintf (str, MXSTR,
1330 "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1331 evID, boardId, i, rd[i].fadLen,
1332 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1333 rd[i].rBuf->B[rd[i].bufPos - 2],
1334 rd[i].rBuf->B[rd[i].bufPos - 1]);
1335 factOut (kWarn, 501, str);
1336 goto EndBuf; //--> skip Board
1337 }
1338
1339 int iDx = evtIdx[evID]; //index into evtCtrl
1340
1341 memcpy (&mBuffer[evID].FADhead[boardId].start_package_flag,
1342 &rd[i].rBuf->S[0], head_len);
1343 qncpy += head_len;
1344
1345 src = head_len / 2;
1346 for (px = 0; px < 9; px++) { //different sort in FAD board.....
1347 for (drs = 0; drs < 4; drs++) {
1348 pixH = rd[i].rBuf->S[src++]; // ID
1349 pixC = rd[i].rBuf->S[src++]; // start-cell
1350 pixR = rd[i].rBuf->S[src++]; // roi
1351//here we should check if pixH is correct ....
1352
1353 pixS = boardId * 36 + drs * 9 + px;
1354 src++;
1355
1356
1357 mBuffer[evID].fEvent->StartPix[pixS] = pixC;
1358 dest = pixS * roi[0];
1359 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1360 &rd[i].rBuf->S[src], roi[0] * 2);
1361 qncpy += roi[0] * 2;
1362 src += pixR;
1363
1364 if (px == 8) {
1365 tmS = boardId * 4 + drs;
1366 if (pixR > roi[0]) { //and we have additional TM info
1367 dest = tmS * roi[0] + NPIX * roi[0];
1368 int srcT = src - roi[0];
1369 mBuffer[evID].fEvent->StartTM[tmS] =
1370 (pixC + pixR - roi[0]) % 1024;
1371 memcpy (&mBuffer[evID].fEvent->Adc_Data[dest],
1372 &rd[i].rBuf->S[srcT], roi[0] * 2);
1373 qncpy += roi[0] * 2;
1374 } else {
1375 mBuffer[evID].fEvent->StartTM[tmS] = -1;
1376 //ETIENNE because the TM channels are always processed during drs calib,
1377 //set them to zero if they are not present
1378 //I suspect that it may be more efficient to set all the allocated mem to
1379 //zero when allocating it
1380// dest = tmS*roi[0] + NPIX*roi[0];
1381 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
1382 }
1383 }
1384 }
1385 } // now we have stored a new board contents into Event structure
1386
1387 mBuffer[evID].fEvent->NumBoards++;
1388 mBuffer[evID].board[boardId] = boardId;
1389 evtCtrl.evtStat[iDx]++;
1390 evtCtrl.pcTime[iDx] = g_actTime;
1391
1392 if (++mBuffer[evID].nBoard >= actBoards) {
1393 int qnrun = 0;
1394 if (mBuffer[evID].runNum != actrun) { // have we already reported first event of this run ???
1395 actrun = mBuffer[evID].runNum;
1396 int ir;
1397 for (ir = 0; ir < MAX_RUN; ir++) {
1398 qnrun++;
1399 if (runCtrl[ir].runId == actrun) {
1400 if (++runCtrl[ir].lastEvt == 0) {
1401 gotNewRun (actrun, mBuffer[evID].FADhead);
1402 snprintf (str, MXSTR, "gotNewRun called for run %d, event %d",
1403 mBuffer[evID].runNum,
1404 mBuffer[evID].evNum);
1405 factOut (kInfo, 1, str);
1406 break;
1407 }
1408 }
1409 }
1410 }
1411 snprintf (str, MXSTR,
1412 "%5d complete event roi %4d roiTM %d cpy %8d %5d",
1413 mBuffer[evID].evNum, roi[0], roi[8] - roi[0],
1414 qncpy, qnrun);
1415 factOut (kDebug, -1, str);
1416
1417 //complete event read ---> flag for next processing
1418 evtCtrl.evtStat[iDx] = 99;
1419 gi.evtTot++;
1420 }
1421
1422 EndBuf:
1423 rd[i].bufTyp = 0; //ready to read next header
1424 rd[i].bufLen = frst_len;
1425 rd[i].bufPos = 0;
1426 EndBuf1:
1427 ;
1428 }
1429
1430 } else { //we are reading event header
1431 rd[i].bufPos += jrd;
1432 rd[i].bufLen -= jrd;
1433 if (rd[i].bufPos >= minLen) { //sufficient data to take action
1434 //check if startflag correct; else shift block ....
1435 for (k = 0; k < rd[i].bufPos - 1; k++) {
1436 if (rd[i].rBuf->B[k] == start.B[1]
1437 && rd[i].rBuf->B[k + 1] == start.B[0])
1438 break;
1439 }
1440 rd[i].skip += k;
1441
1442 if (k >= rd[i].bufPos - 1) { //no start of header found
1443 rd[i].bufPos = 0;
1444 rd[i].bufLen = head_len;
1445 } else if (k > 0) {
1446 rd[i].bufPos -= k;
1447 rd[i].bufLen += k;
1448 memcpy (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1449 rd[i].bufPos);
1450#ifdef EVTDEBUG
1451 memcpy (&rd[i].xBuf->B[0], &rd[i].xBuf->B[k],
1452 rd[i].bufPos);
1453#endif
1454 }
1455 if (rd[i].bufPos >= minLen) {
1456 if (rd[i].skip > 0) {
1457 snprintf (str, MXSTR, "Skipped %d bytes on port %d",
1458 rd[i].skip, i);
1459 factOut (kInfo, 666, str);
1460 rd[i].skip = 0;
1461 }
1462 goodhed++;
1463
1464 // Swap everything except start_package_flag.
1465 // It is to difficult to find out where it is used how,
1466 // but it doesn't really matter because it is not really
1467 // used anywehere else
1468// rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1469 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1470 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1471 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1472 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1473
1474 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1475 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1476 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1477 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1478
1479 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1480 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1481 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1482
1483 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1484 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1485
1486 for (int s=24;s<24+NTemp+NDAC;s++)
1487 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1488
1489 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1490 rd[i].fadVers = rd[i].rBuf->S[2];
1491 rd[i].ftmTyp = rd[i].rBuf->S[5];
1492 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1493 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1494 rd[i].runID = rd[i].rBuf->I[11];
1495 rd[i].bufTyp = 1; //ready to read full record
1496 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1497
1498#ifdef EVTDEBUG
1499 int fadboard = rd[i].rBuf->S[12];
1500 int fadcrate = fadboard / 256;
1501 fadboard = (fadcrate * 10 + fadboard % 256);
1502 snprintf (str, MXSTR,
1503 "sk %3d head: %5d %5d %5d %10d %4d %6d", i,
1504 rd[i].fadLen, rd[i].evtID, rd[i].ftmID,
1505 rd[i].runID, fadboard, jrd);
1506 factOut (kDebug, 1, str);
1507#endif
1508
1509 if (rd[i].runID == 0)
1510 rd[i].runID = gi_myRun;
1511
1512 if (rd[i].bufLen <= head_len || rd[i].bufLen > MAX_LEN) {
1513 snprintf (str, MXSTR,
1514 "Illegal event-length %d on port %d, %d expected.", rd[i].bufLen, i, head_len);
1515 factOut (kFatal, 881, str);
1516 rd[i].bufLen = 100000; //?
1517 }
1518
1519 int fadBoard = rd[i].rBuf->S[12];
1520 debugHead (i, fadBoard, rd[i].rBuf);
1521 debugRead (i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, -1, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid;-1=start event
1522 } else {
1523 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1524 }
1525 } else {
1526 debugRead (i, jrd, 0, 0, 0, -2, tsec, tusec); // i=socket; jrd=#bytes; ievt=eventid; -2=start event, unknown id yet
1527 }
1528
1529 } //end interpreting last read
1530}
1531 } //end of successful read anything
1532 } //finished trying to read all sockets
1533
1534#ifdef EVTDEBUG
1535 snprintf (str, MXSTR, "Loop ---- %3d --- %8d", numokx, jrdx);
1536 factOut (kDebug, -1, str);
1537#endif
1538
1539 gi.numRead[numok]++;
1540
1541 g_actTime = time (NULL);
1542 if (g_actTime > gi_SecTime) {
1543 gi_SecTime = g_actTime;
1544
1545
1546 //loop over all active events and flag those older than read-timeout
1547 //delete those that are written to disk ....
1548
1549 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1550 if (kd < 0)
1551 kd += (MAX_EVT * MAX_RUN);
1552
1553 gj.bufNew = gj.bufEvt = 0;
1554 int k1 = evtCtrl.frstPtr;
1555 for (k = k1; k < (k1 + kd); k++) {
1556 int k0 = k % (MAX_EVT * MAX_RUN);
1557//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1558
1559 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 92) {
1560 gj.bufNew++; //incomplete event in Buffer
1561 if (evtCtrl.evtStat[k0] < 90
1562 && evtCtrl.pcTime[k0] < g_actTime - 10) {
1563 int id = evtCtrl.evtBuf[k0];
1564 snprintf (str, MXSTR, "%5d skip short evt %8d %8d %2d",
1565 mBuffer[id].evNum, evtCtrl.evtBuf[k0], k0,
1566 evtCtrl.evtStat[k0]);
1567 factOut (kWarn, 601, str);
1568
1569 int ik,ib,jb;
1570 ik=0;
1571 for (ib=0; ib<NBOARDS; ib++) {
1572 if (ib%10==0) {
1573 snprintf (&str[ik], MXSTR, "|");
1574 ik++;
1575 }
1576 jb = mBuffer[id].board[ib];
1577 if ( jb<0 ) {
1578 if (gi_NumConnect[b] >0 ) {
1579 snprintf (&str[ik], MXSTR, ".");
1580 } else {
1581 snprintf (&str[ik], MXSTR, "x");
1582 }
1583 } else {
1584 snprintf (&str[ik], MXSTR, "%d",jb%10);
1585 }
1586 ik++;
1587 }
1588 snprintf (&str[ik], MXSTR, "|");
1589 factOut (kWarn, 601, str);
1590
1591 evtCtrl.evtStat[k0] = 91; //timeout for incomplete events
1592 gi.evtSkp++;
1593 gi.evtTot++;
1594 gj.evtSkip++;
1595 }
1596 } else if (evtCtrl.evtStat[k0] >= 9000 //'delete'
1597 || evtCtrl.evtStat[k0] == 0) { //'useless'
1598
1599 int id = evtCtrl.evtBuf[k0];
1600 snprintf (str, MXSTR, "%5d free event buffer, nb=%3d",
1601 mBuffer[id].evNum, mBuffer[id].nBoard);
1602 factOut (kDebug, -1, str);
1603 mBufFree (id); //event written--> free memory
1604 evtCtrl.evtStat[k0] = -1;
1605 gj.evtWrite++;
1606 gj.rateWrite++;
1607 } else if (evtCtrl.evtStat[k0] >= 95) {
1608 gj.bufEvt++; //complete event in Buffer
1609 }
1610
1611 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0) {
1612 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1613 }
1614 }
1615
1616
1617 gj.deltaT = 1000; //temporary, must be improved
1618
1619 int b;
1620 for (b = 0; b < NBOARDS; b++)
1621 gj.totBytes[b] += gj.rateBytes[b];
1622 gj.totMem = g_maxMem;
1623 if (gj.maxMem > gj.xxxMem)
1624 gj.xxxMem = gj.maxMem;
1625 if (gj.maxEvt > gj.xxxEvt)
1626 gj.xxxEvt = gj.maxEvt;
1627
1628 factStat (gj);
1629 factStatNew (gi);
1630 gj.rateNew = gj.rateWrite = 0;
1631 gj.maxMem = gj.usdMem;
1632 gj.maxEvt = gj.bufTot;
1633 for (b = 0; b < NBOARDS; b++)
1634 gj.rateBytes[b] = 0;
1635 }
1636
1637 if (numok > 0)
1638 numok2 = 0;
1639 else if (numok2++ > 3) {
1640 if (g_runStat == 1) {
1641 xwait.tv_sec = 1;
1642 xwait.tv_nsec = 0; // hibernate for 1 sec
1643 } else {
1644 xwait.tv_sec = 0;
1645 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1646 }
1647 nanosleep (&xwait, NULL);
1648 }
1649
1650 } //and do next loop over all sockets ...
1651
1652
1653 snprintf (str, MXSTR, "Stop reading ... RESET=%d", g_reset);
1654 factOut (kInfo, -1, str);
1655
1656 if (g_reset > 0) {
1657 gi_reset = g_reset;
1658 gi_resetR = gi_reset % 10; //shall we stop reading ?
1659 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1660 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1661 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1662 g_reset = 0;
1663 } else {
1664 gi_reset = 0;
1665 if (g_runStat == -1)
1666 gi_resetR = 1;
1667 else
1668 gi_resetR = 7;
1669 gi_resetS = 7; //close all sockets
1670 gi_resetW = 7; //close all files
1671 gi_resetX = 0;
1672
1673 //inform others we have to quit ....
1674 gi_runStat = -11; //inform all that no update to happen any more
1675 gj.readStat = -11; //inform all that no update to happen any more
1676 }
1677
1678 if (gi_resetS > 0) {
1679 //must close all open sockets ...
1680 snprintf (str, MXSTR, "Close all sockets...");
1681 factOut (kInfo, -1, str);
1682 for (i = 0; i < MAX_SOCK; i++) {
1683 if (rd[i].sockStat == 0) {
1684 GenSock (-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1685 if (i % 7 == 0) {
1686// gi_NumConnect[i / 7]--;
1687 gi_NumConnect[i / 7]-= cntsock ;
1688 gi.numConn[i / 7]--;
1689 gj.numConn[i / 7]--;
1690 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1691 rd[i / 7].sockStat = -1; //and try to open asap
1692 }
1693 }
1694 }
1695 }
1696
1697
1698 if (gi_resetR > 0) {
1699 //flag all events as 'read finished'
1700 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1701 if (kd < 0)
1702 kd += (MAX_EVT * MAX_RUN);
1703
1704 int k1 = evtCtrl.frstPtr;
1705 for (k = k1; k < (k1 + kd); k++) {
1706 int k0 = k % (MAX_EVT * MAX_RUN);
1707 if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 90) {
1708 evtCtrl.evtStat[k0] = 91;
1709 gi.evtSkp++;
1710 gi.evtTot++;
1711 }
1712 }
1713
1714 xwait.tv_sec = 0;
1715 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1716 nanosleep (&xwait, NULL);
1717
1718 //and clear all buffers (might have to wait until all others are done)
1719 int minclear;
1720 if (gi_resetR == 1) {
1721 minclear = 900;
1722 snprintf (str, MXSTR, "Drain all buffers ...");
1723 } else {
1724 minclear = 0;
1725 snprintf (str, MXSTR, "Flush all buffers ...");
1726 }
1727 factOut (kInfo, -1, str);
1728
1729 int numclear = 1;
1730 while (numclear > 0) {
1731 numclear = 0;
1732 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1733 if (kd < 0)
1734 kd += (MAX_EVT * MAX_RUN);
1735
1736 int k1 = evtCtrl.frstPtr;
1737 for (k = k1; k < (k1 + kd); k++) {
1738 int k0 = k % (MAX_EVT * MAX_RUN);
1739 if (evtCtrl.evtStat[k0] > minclear) {
1740 int id = evtCtrl.evtBuf[k0];
1741 snprintf (str, MXSTR, "ev %5d free event buffer, nb=%3d",
1742 mBuffer[id].evNum, mBuffer[id].nBoard);
1743 factOut (kDebug, -1, str);
1744 mBufFree (id); //event written--> free memory
1745 evtCtrl.evtStat[k0] = -1;
1746 } else if (evtCtrl.evtStat[k0] > 0)
1747 numclear++; //writing is still ongoing...
1748
1749 if (k0 == evtCtrl.frstPtr && evtCtrl.evtStat[k0] < 0)
1750 evtCtrl.frstPtr = (evtCtrl.frstPtr + 1) % (MAX_EVT * MAX_RUN);
1751 }
1752
1753 xwait.tv_sec = 0;
1754 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1755 nanosleep (&xwait, NULL);
1756 }
1757 }
1758
1759 if (gi_reset > 0) {
1760 if (gi_resetW > 0) {
1761 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1762 }
1763 if (gi_resetX > 0) {
1764 xwait.tv_sec = gi_resetX;
1765 xwait.tv_nsec = 0;
1766 nanosleep (&xwait, NULL);
1767 }
1768
1769 snprintf (str, MXSTR, "Continue read Process ...");
1770 factOut (kInfo, -1, str);
1771 gi_reset = 0;
1772 goto START;
1773 }
1774
1775
1776
1777 snprintf (str, MXSTR, "Exit read Process ...");
1778 factOut (kInfo, -1, str);
1779 gi_runStat = -99;
1780 gj.readStat = -99;
1781 factStat (gj);
1782 factStatNew (gi);
1783 return 0;
1784
1785} /*-----------------------------------------------------------------*/
1786
1787
1788void *
1789subProc (void *thrid)
1790{
1791 int64_t threadID;
1792 int numWait, numProc, k, jret;
1793 struct timespec xwait;
1794
1795// int32_t cntr ;
1796
1797 threadID = (int64_t)thrid;
1798
1799 snprintf (str, MXSTR, "Starting sub-process-thread %ld", threadID);
1800 factOut (kInfo, -1, str);
1801
1802 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1803 numWait = numProc = 0;
1804 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1805 if (kd < 0)
1806 kd += (MAX_EVT * MAX_RUN);
1807
1808 int k1 = evtCtrl.frstPtr;
1809 for (k = k1; k < (k1 + kd); k++) {
1810 int k0 = k % (MAX_EVT * MAX_RUN);
1811
1812 if (evtCtrl.evtStat[k0] == 1000 + threadID) {
1813 if (gi_resetR > 1) { //we are asked to flush buffers asap
1814 jret = 9100; //flag to be deleted
1815 } else {
1816 int id = evtCtrl.evtBuf[k0];
1817
1818
1819 jret =
1820 subProcEvt (threadID, mBuffer[id].FADhead,
1821 mBuffer[id].fEvent, mBuffer[id].buffer);
1822
1823
1824 if (jret <= threadID) {
1825 snprintf (str, MXSTR,
1826 "Process %ld wants to send event to process %d... not allowed.",
1827 threadID, jret);
1828 factOut (kError, -1, str);
1829 jret = 5300;
1830 } else if (jret <= 0)
1831 jret = 9200 + threadID; //flag as 'to be deleted'
1832 else if (jret >= gi_maxProc)
1833 jret = 5200 + threadID; //flag as 'to be written'
1834 else
1835 jret = 1000 + jret; //flag for next proces
1836 }
1837 evtCtrl.evtStat[k0] = jret;
1838 numProc++;
1839 } else if (evtCtrl.evtStat[k0] < 1000 + threadID)
1840 numWait++;
1841 }
1842
1843 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1844 snprintf (str, MXSTR, "Exit subProcessing in process %ld", threadID);
1845 factOut (kInfo, -1, str);
1846 return 0;
1847 }
1848 if (numProc == 0) {
1849 //seems we have nothing to do, so sleep a little
1850 xwait.tv_sec = 0;
1851 xwait.tv_nsec = 2000000; // sleep for ~2 msec
1852 nanosleep (&xwait, NULL);
1853 }
1854 }
1855
1856 snprintf (str, MXSTR, "Ending sub-process-thread %ld", threadID);
1857 factOut (kInfo, -1, str);
1858 return 0;
1859} /*-----------------------------------------------------------------*/
1860
1861
1862void *
1863procEvt (void *ptr)
1864{
1865/* *** main loop processing file, including SW-trigger */
1866 int numProc, numWait;
1867 int status, j;
1868 struct timespec xwait;
1869 char str[MXSTR];
1870
1871
1872
1873 int lastRun = 0; //usually run from last event still valid
1874
1875// cpu_set_t mask;
1876// int cpu = 1; //process thread (will be several in final version)
1877
1878 snprintf (str, MXSTR, "Starting process-thread with %d subprocesses",
1879 gi_maxProc);
1880 factOut (kInfo, -1, str);
1881
1882/* CPU_ZERO initializes all the bits in the mask to zero. */
1883// CPU_ZERO (&mask);
1884/* CPU_SET sets only the bit corresponding to cpu. */
1885// CPU_SET( 0 , &mask ); leave for system
1886// CPU_SET( 1 , &mask ); used by write process
1887// CPU_SET (2, &mask);
1888// CPU_SET (3, &mask);
1889// CPU_SET (4, &mask);
1890// CPU_SET (5, &mask);
1891// CPU_SET (6, &mask);
1892// CPU_SET( 7 , &mask ); used by read process
1893/* sched_setaffinity returns 0 in success */
1894// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1895// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1896// factOut (kWarn, -1, str);
1897// }
1898
1899
1900 pthread_t thread[100];
1901 int th_ret[100];
1902
1903 for (long long k = 0; k < gi_maxProc; k++) {
1904 th_ret[k] = pthread_create (&thread[k], NULL, subProc, (void *) k);
1905 }
1906
1907 while (g_runStat > -2) { //in case of 'exit' we still must process pending events
1908
1909 numWait = numProc = 0;
1910 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
1911 if (kd < 0)
1912 kd += (MAX_EVT * MAX_RUN);
1913
1914 int k1 = evtCtrl.frstPtr;
1915 for (int k = k1; k < (k1 + kd); k++) {
1916 int k0 = k % (MAX_EVT * MAX_RUN);
1917//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
1918 if (evtCtrl.evtStat[k0] > 90 && evtCtrl.evtStat[k0] < 1000) {
1919
1920 if (gi_resetR > 1) { //we are asked to flush buffers asap
1921 evtCtrl.evtStat[k0] = 9991;
1922 } else {
1923
1924//-------- it is better to open the run already here, so call can be used to initialize
1925//-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1926 int id = evtCtrl.evtBuf[k0];
1927 uint32_t irun = mBuffer[id].runNum;
1928 int32_t ievt = mBuffer[id].evNum;
1929 if (runCtrl[lastRun].runId == irun) {
1930 j = lastRun;
1931 } else {
1932 //check which fileID to use (or open if needed)
1933 for (j = 0; j < MAX_RUN; j++) {
1934 if (runCtrl[j].runId == irun)
1935 break;
1936 }
1937 if (j >= MAX_RUN) {
1938 snprintf (str, MXSTR,
1939 "procEvt: Can not find run %d for event %d in %d",
1940 irun, ievt, id);
1941 factOut (kFatal, 901, str);
1942 }
1943 lastRun = j;
1944 }
1945
1946 if (runCtrl[j].fileId < 0) {
1947//---- we need to open a new run ==> make sure all older runs are
1948//---- finished and marked to be closed ....
1949 int j1;
1950 for (j1 = 0; j1 < MAX_RUN; j1++) {
1951 if (runCtrl[j1].fileId == 0) {
1952 runCtrl[j1].procId = 2; //--> do no longer accept events for processing
1953//---- problem: processing still going on ==> must wait for closing ....
1954 snprintf (str, MXSTR,
1955 "procEvt: Finished run since new one opened %d",
1956 runCtrl[j1].runId);
1957 runFinish1 (runCtrl[j1].runId);
1958 }
1959
1960 }
1961
1962 actRun.Version = 1;
1963 actRun.RunType = -1; //to be adapted
1964
1965 actRun.Nroi = runCtrl[j].roi0;
1966 actRun.NroiTM = runCtrl[j].roi8;
1967//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1968// if (actRun.Nroi == actRun.NroiTM)
1969// actRun.NroiTM = 0;
1970 actRun.RunTime = runCtrl[j].firstTime;
1971 actRun.RunUsec = runCtrl[j].firstTime;
1972 actRun.NBoard = NBOARDS;
1973 actRun.NPix = NPIX;
1974 actRun.NTm = NTMARK;
1975 actRun.Nroi = mBuffer[id].nRoi;
1976 memcpy (actRun.FADhead, mBuffer[id].FADhead,
1977 NBOARDS * sizeof (PEVNT_HEADER));
1978
1979 runCtrl[j].fileHd =
1980 runOpen (irun, &actRun, sizeof (actRun));
1981 if (runCtrl[j].fileHd == NULL) {
1982 snprintf (str, MXSTR,
1983 "procEvt: Could not open a file for run %d (runOpen failed)", irun);
1984 factOut (kError, 502, str);
1985 runCtrl[j].fileId = 91;
1986 runCtrl[j].procId = 91;
1987 } else {
1988 snprintf (str, MXSTR, "procEvt: Opened new file for run %d (evt=%d)",
1989 irun, ievt);
1990 factOut (kInfo, -1, str);
1991 runCtrl[j].fileId = 0;
1992 runCtrl[j].procId = 0;
1993 }
1994
1995 }
1996//-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1997 if (runCtrl[j].procId == 0) {
1998 if (runCtrl[j].closeTime < g_actTime
1999 || runCtrl[j].lastTime < g_actTime - 300
2000 || runCtrl[j].maxEvt <= runCtrl[j].procEvt) {
2001 snprintf (str, MXSTR,
2002 "procEvt: Reached end of run condition for run %d",
2003 irun);
2004 factOut (kInfo, 502, str);
2005 runFinish1 (runCtrl[j].runId);
2006 runCtrl[j].procId = 1;
2007 }
2008 }
2009 if (runCtrl[j].procId != 0) {
2010 snprintf (str, MXSTR,
2011 "procEvt: Skip event %d because no active run %d", ievt,
2012 irun);
2013 factOut (kDebug, 502, str);
2014 evtCtrl.evtStat[k0] = 9091;
2015 } else {
2016//--------
2017//--------
2018 id = evtCtrl.evtBuf[k0];
2019 int itevt = mBuffer[id].trgNum;
2020 int itrg = mBuffer[id].trgTyp;
2021 int roi = mBuffer[id].nRoi;
2022 int roiTM = mBuffer[id].nRoiTM;
2023
2024//make sure unused pixels/tmarks are cleared to zero
2025//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2026// if (roiTM == roi)
2027// roiTM = 0;
2028 int ip, it, dest, ib;
2029 for (ip = 0; ip < NPIX; ip++) {
2030 if (mBuffer[id].fEvent->StartPix[ip] == -1) {
2031 dest = ip * roi;
2032 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
2033 }
2034 }
2035
2036 for (it = 0; it < NTMARK; it++) {
2037 if (mBuffer[id].fEvent->StartTM[it] == -1) {
2038 dest = it * roi + NPIX * roi;
2039 bzero (&mBuffer[id].fEvent->Adc_Data[dest], roi * 2);
2040 }
2041 }
2042
2043
2044//and set correct event header ; also check for consistency in event (not yet)
2045 mBuffer[id].fEvent->Roi = roi;
2046 mBuffer[id].fEvent->RoiTM = roiTM;
2047 mBuffer[id].fEvent->EventNum = ievt;
2048 mBuffer[id].fEvent->TriggerNum = itevt;
2049 mBuffer[id].fEvent->TriggerType = itrg;
2050 mBuffer[id].fEvent->Errors[0] = mBuffer[id].Errors[0];
2051 mBuffer[id].fEvent->Errors[1] = mBuffer[id].Errors[1];
2052 mBuffer[id].fEvent->Errors[2] = mBuffer[id].Errors[2];
2053 mBuffer[id].fEvent->Errors[3] = mBuffer[id].Errors[3];
2054 mBuffer[id].fEvent->SoftTrig = 0;
2055
2056
2057 for (ib = 0; ib < NBOARDS; ib++) {
2058 if (mBuffer[id].board[ib] == -1) { //board is not read
2059 mBuffer[id].FADhead[ib].start_package_flag = 0;
2060 mBuffer[id].fEvent->BoardTime[ib] = 0;
2061 } else {
2062 mBuffer[id].fEvent->BoardTime[ib] =
2063 mBuffer[id].FADhead[ib].time;
2064 }
2065 }
2066 int i = eventCheck (mBuffer[id].runNum, mBuffer[id].FADhead,
2067 mBuffer[id].fEvent);
2068 gi.procTot++;
2069 numProc++;
2070
2071 if (i < 0) {
2072 evtCtrl.evtStat[k0] = 9999; //flag event to be skipped
2073 gi.procErr++;
2074 } else {
2075 evtCtrl.evtStat[k0] = 1000;
2076 runCtrl[j].procEvt++;
2077 }
2078 }
2079 }
2080 } else if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 90) {
2081 numWait++;
2082 }
2083 }
2084
2085 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2086 snprintf (str, MXSTR, "Exit Processing Process ...");
2087 factOut (kInfo, -1, str);
2088 gp_runStat = -22; //==> we should exit
2089 gj.procStat = -22; //==> we should exit
2090 return 0;
2091 }
2092
2093 if (numProc == 0) {
2094 //seems we have nothing to do, so sleep a little
2095 xwait.tv_sec = 0;
2096 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2097 nanosleep (&xwait, NULL);
2098 }
2099 gp_runStat = gi_runStat;
2100 gj.procStat = gj.readStat;
2101
2102 }
2103
2104 //we are asked to abort asap ==> must flag all remaining events
2105 // when gi_runStat claims that all events are in the buffer...
2106
2107 snprintf (str, MXSTR, "Abort Processing Process ...");
2108 factOut (kInfo, -1, str);
2109 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2110 if (kd < 0)
2111 kd += (MAX_EVT * MAX_RUN);
2112
2113 for (int k = 0; k < gi_maxProc; k++) {
2114 pthread_join (thread[k], (void **) &status);
2115 }
2116
2117 int k1 = evtCtrl.frstPtr;
2118 for (int k = k1; k < (k1 + kd); k++) {
2119 int k0 = k % (MAX_EVT * MAX_RUN);
2120 if (evtCtrl.evtStat[k0] >= 0 && evtCtrl.evtStat[k0] < 1000) {
2121 evtCtrl.evtStat[k0] = 9800; //flag event as 'processed'
2122 }
2123 }
2124
2125 gp_runStat = -99;
2126 gj.procStat = -99;
2127
2128 return 0;
2129
2130} /*-----------------------------------------------------------------*/
2131
2132int
2133CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
2134{
2135/* close run runId (all all runs if runId=0) */
2136/* return: 0=close scheduled / >0 already closed / <0 does not exist */
2137 int j;
2138
2139
2140 if (runId == 0) {
2141 for (j = 0; j < MAX_RUN; j++) {
2142 if (runCtrl[j].fileId == 0) { //run is open
2143 runCtrl[j].closeTime = closeTime;
2144 runCtrl[j].maxEvt = maxEvt;
2145 }
2146 }
2147 return 0;
2148 }
2149
2150 for (j = 0; j < MAX_RUN; j++) {
2151 if (runCtrl[j].runId == runId) {
2152 if (runCtrl[j].fileId == 0) { //run is open
2153 runCtrl[j].closeTime = closeTime;
2154 runCtrl[j].maxEvt = maxEvt;
2155 return 0;
2156 } else if (runCtrl[j].fileId < 0) { //run not yet opened
2157 runCtrl[j].closeTime = closeTime;
2158 runCtrl[j].maxEvt = maxEvt;
2159 return +1;
2160 } else { // run already closed
2161 return +2;
2162 }
2163 }
2164 } //we only reach here if the run was never created
2165 return -1;
2166
2167} /*-----------------------------------------------------------------*/
2168
2169
2170void *
2171writeEvt (void *ptr)
2172{
2173/* *** main loop writing event (including opening and closing run-files */
2174
2175 int numWrite, numWait;
2176 int k, j ;
2177 struct timespec xwait;
2178 char str[MXSTR];
2179
2180// cpu_set_t mask;
2181// int cpu = 1; //write thread
2182
2183 snprintf (str, MXSTR, "Starting write-thread");
2184 factOut (kInfo, -1, str);
2185
2186/* CPU_ZERO initializes all the bits in the mask to zero. */
2187// CPU_ZERO (&mask);
2188/* CPU_SET sets only the bit corresponding to cpu. */
2189// CPU_SET (cpu, &mask);
2190/* sched_setaffinity returns 0 in success */
2191// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2192// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2193// }
2194
2195 int lastRun = 0; //usually run from last event still valid
2196
2197 while (g_runStat > -2) {
2198
2199 numWait = numWrite = 0;
2200 int kd = evtCtrl.lastPtr - evtCtrl.frstPtr;
2201 if (kd < 0)
2202 kd += (MAX_EVT * MAX_RUN);
2203
2204 int k1 = evtCtrl.frstPtr;
2205 for (k = k1; k < (k1 + kd); k++) {
2206 int k0 = k % (MAX_EVT * MAX_RUN);
2207//would be better to use bitmaps for evtStat (allow '&' instead of multi-if)
2208 if (evtCtrl.evtStat[k0] > 5000 && evtCtrl.evtStat[k0] < 9000) {
2209
2210 if (gi_resetR > 1) { //we must drain the buffer asap
2211 evtCtrl.evtStat[k0] = 9904;
2212 } else {
2213
2214
2215 int id = evtCtrl.evtBuf[k0];
2216 uint32_t irun = mBuffer[id].runNum;
2217 int32_t ievt = mBuffer[id].evNum;
2218
2219 gi.wrtTot++;
2220 if (runCtrl[lastRun].runId == irun) {
2221 j = lastRun;
2222 } else {
2223 //check which fileID to use (or open if needed)
2224 for (j = 0; j < MAX_RUN; j++) {
2225 if (runCtrl[j].runId == irun)
2226 break;
2227 }
2228 if (j >= MAX_RUN) {
2229 snprintf (str, MXSTR,
2230 "writeEvt: Can not find run %d for event %d in %d",
2231 irun, ievt, id);
2232 factOut (kFatal, 901, str);
2233 gi.wrtErr++;
2234 }
2235 lastRun = j;
2236 }
2237
2238 if (runCtrl[j].fileId < 0) {
2239 actRun.Version = 1;
2240 actRun.RunType = -1; //to be adapted
2241
2242 actRun.Nroi = runCtrl[j].roi0;
2243 actRun.NroiTM = runCtrl[j].roi8;
2244//ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
2245// if (actRun.Nroi == actRun.NroiTM)
2246// actRun.NroiTM = 0;
2247 actRun.RunTime = runCtrl[j].firstTime;
2248 actRun.RunUsec = runCtrl[j].firstTime;
2249 actRun.NBoard = NBOARDS;
2250 actRun.NPix = NPIX;
2251 actRun.NTm = NTMARK;
2252 actRun.Nroi = mBuffer[id].nRoi;
2253 memcpy (actRun.FADhead, mBuffer[id].FADhead,
2254 NBOARDS * sizeof (PEVNT_HEADER));
2255
2256 runCtrl[j].fileHd =
2257 runOpen (irun, &actRun, sizeof (actRun));
2258 if (runCtrl[j].fileHd == NULL) {
2259 snprintf (str, MXSTR,
2260 "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
2261 factOut (kError, 502, str);
2262 runCtrl[j].fileId = 91;
2263 } else {
2264 snprintf (str, MXSTR, "writeEvt: Opened new file for run %d (evt %d)",
2265 irun, ievt);
2266 factOut (kInfo, -1, str);
2267 runCtrl[j].fileId = 0;
2268 }
2269
2270 }
2271
2272 if (runCtrl[j].fileId != 0) {
2273 if (runCtrl[j].fileId < 0) {
2274 snprintf (str, MXSTR,
2275 "writeEvt: Never opened file for run %d", irun);
2276 factOut (kError, 123, str);
2277 } else if (runCtrl[j].fileId < 100) {
2278 snprintf (str, MXSTR, "writeEvt: File for run %d is closed",
2279 irun);
2280 factOut (kWarn, 123, str);
2281 runCtrl[j].fileId += 100;
2282 } else {
2283 snprintf (str, MXSTR, "writeEvt: File for run %d is closed",
2284 irun);
2285 factOut (kDebug, 123, str);
2286 }
2287 evtCtrl.evtStat[k0] = 9903;
2288 gi.wrtErr++;
2289 } else {
2290// snprintf (str, MXSTR,"write event %d size %d",ievt,sizeof (mBuffer[id]));
2291// factOut (kInfo, 504, str);
2292 int i = runWrite (runCtrl[j].fileHd, mBuffer[id].fEvent,
2293 sizeof (mBuffer[id]));
2294 if (i >= 0) {
2295 runCtrl[j].lastTime = g_actTime;
2296 runCtrl[j].actEvt++;
2297 evtCtrl.evtStat[k0] = 9901;
2298 snprintf (str, MXSTR,
2299 "%5d successfully wrote for run %d id %5d",
2300 ievt, irun, k0);
2301 factOut (kDebug, 504, str);
2302// gj.writEvt++ ;
2303 } else {
2304 snprintf (str, MXSTR, "writeEvt: Writing event for run %d failed (runWrite)",
2305 irun);
2306 factOut (kError, 503, str);
2307 evtCtrl.evtStat[k0] = 9902;
2308 gi.wrtErr++;
2309 }
2310
2311 if (i < 0
2312 || runCtrl[j].lastTime < g_actTime - 300
2313 || runCtrl[j].closeTime < g_actTime
2314 || runCtrl[j].maxEvt < runCtrl[j].actEvt) {
2315 int ii = 0;
2316 if (i < 0)
2317 ii = 1;
2318 else if (runCtrl[j].closeTime < g_actTime)
2319 ii = 2;
2320 else if (runCtrl[j].lastTime < g_actTime - 300)
2321 ii = 3;
2322 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2323 ii = 4;
2324
2325
2326
2327 //close run for whatever reason
2328 if (runCtrl[j].runId == gi_myRun)
2329 gi_myRun = g_actTime;
2330
2331 if (runCtrl[j].procId == 0) {
2332 runFinish1 (runCtrl[j].runId);
2333 runCtrl[j].procId = 92;
2334 }
2335
2336 runCtrl[j].closeTime = g_actTime - 1;
2337 i = runClose (runCtrl[j].fileHd, &runTail[j],
2338 sizeof (runTail[j]));
2339 if (i < 0) {
2340 snprintf (str, MXSTR, "writeEvt: Error closing run %d (runClose[1],i=%d)",
2341 runCtrl[j].runId, i);
2342 factOut (kError, 503, str);
2343 runCtrl[j].fileId = 92;
2344 } else {
2345 snprintf (str, MXSTR, "writeEvt: Closed run %d (ii[1]=%d)", irun,
2346 ii);
2347 factOut (kInfo, 503, str);
2348 runCtrl[j].fileId = 93;
2349 }
2350 }
2351 }
2352 }
2353 } else if (evtCtrl.evtStat[k0] > 0 && evtCtrl.evtStat[k0] < 9000)
2354 numWait++;
2355 }
2356
2357 //check if we should close a run (mainly when no event pending)
2358 //ETIENNE but first figure out which one is the latest run with a complete event.
2359 //i.e. max run Id and lastEvt >= 0
2360 //this condition is sufficient because all pending events were written already in the loop just above
2361 unsigned int maxStartedRun = 0;
2362 for (j=0;j<MAX_RUN;j++)
2363 {
2364 if ((runCtrl[j].runId > maxStartedRun) &&
2365 (runCtrl[j].lastEvt > -1) &&
2366 (runCtrl[j].runId != 0))
2367 maxStartedRun = runCtrl[j].runId;
2368 }
2369// snprintf(str, MXSTR, "Maximum runId: %d", maxStartedRun);
2370// factOut(kInfo, 000, str);
2371 //Also check if some files will never be opened
2372 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
2373 //I will never get run numbers for which no file is to be opened
2374 for (j=0;j<MAX_RUN;j++)
2375 {
2376 if ((runCtrl[j].fileId < 0) &&
2377 (runCtrl[j].runId < maxStartedRun) &&
2378 (runCtrl[j].runId != 0))
2379 {
2380 snprintf (str, MXSTR, "writeEvt: No file will be opened for run %ud. Last run: %ud (started)", runCtrl[j].runId, maxStartedRun);
2381 factOut (kInfo, 000, str);
2382 ;//TODO notify that this run will never be opened
2383 }
2384 }
2385 for (j = 0; j < MAX_RUN; j++) {
2386 if (runCtrl[j].fileId == 0
2387 && (runCtrl[j].closeTime < g_actTime
2388 || runCtrl[j].lastTime < g_actTime - 300
2389 || runCtrl[j].maxEvt <= runCtrl[j].actEvt
2390 || (runCtrl[j].runId < maxStartedRun && runCtrl[j].runId != 0))) //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
2391{
2392 if (runCtrl[j].runId == gi_myRun)
2393 gi_myRun = g_actTime;
2394 int ii = 0;
2395 if (runCtrl[j].closeTime < g_actTime)
2396 ii = 2;
2397 else if (runCtrl[j].lastTime < g_actTime - 300)
2398 ii = 3;
2399 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2400 ii = 4;
2401
2402 if (runCtrl[j].procId == 0) {
2403 runFinish1 (runCtrl[j].runId);
2404 runCtrl[j].procId = 92;
2405 }
2406
2407 runCtrl[j].closeTime = g_actTime - 1;
2408 int i = runClose (runCtrl[j].fileHd, &runTail[j],
2409 sizeof (runTail[j]));
2410 if (i < 0) {
2411 snprintf (str, MXSTR, "writeEvt: Error closing run %d (runClose[2],i=%d)",
2412 runCtrl[j].runId, i);
2413 factOut (kError, 506, str);
2414 runCtrl[j].fileId = 94;
2415 } else {
2416 snprintf (str, MXSTR, "writeEvt: Closed run %d (ii[2]=%d)",
2417 runCtrl[j].runId, ii);
2418 factOut (kInfo, 507, str);
2419 runCtrl[j].fileId = 95;
2420 }
2421 }
2422 }
2423
2424 if (numWrite == 0) {
2425 //seems we have nothing to do, so sleep a little
2426 xwait.tv_sec = 0;
2427 xwait.tv_nsec = 2000000; // sleep for ~2 msec
2428 nanosleep (&xwait, NULL);
2429 }
2430
2431 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
2432 snprintf (str, MXSTR, "Finish Write Process ...");
2433 factOut (kInfo, -1, str);
2434 gw_runStat = -22; //==> we should exit
2435 gj.writStat = -22; //==> we should exit
2436 goto closerun;
2437 }
2438 gw_runStat = gi_runStat;
2439 gj.writStat = gj.readStat;
2440
2441 }
2442
2443 //must close all open files ....
2444 snprintf (str, MXSTR, "Abort Writing Process ...");
2445 factOut (kInfo, -1, str);
2446
2447 closerun:
2448 snprintf (str, MXSTR, "Close all open files ...");
2449 factOut (kInfo, -1, str);
2450 for (j = 0; j < MAX_RUN; j++)
2451 if (runCtrl[j].fileId == 0) {
2452 if (runCtrl[j].runId == gi_myRun)
2453 gi_myRun = g_actTime;
2454
2455 if (runCtrl[j].procId == 0) {
2456 runFinish1 (runCtrl[j].runId);
2457 runCtrl[j].procId = 92;
2458 }
2459
2460 runCtrl[j].closeTime = g_actTime - 1;
2461 int i = runClose (runCtrl[j].fileHd, &runTail[j], sizeof (runTail[j]));
2462 int ii = 0;
2463 if (runCtrl[j].closeTime < g_actTime)
2464 ii = 2;
2465 else if (runCtrl[j].lastTime < g_actTime - 300)
2466 ii = 3;
2467 else if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2468 ii = 4;
2469 if (i < 0) {
2470 snprintf (str, MXSTR, "writeEvt: Error closing run %d (runClose[3],i=%d)",
2471 runCtrl[j].runId, i);
2472 factOut (kError, 506, str);
2473 runCtrl[j].fileId = 96;
2474 } else {
2475 snprintf (str, MXSTR, "writeEvt: Closed run %d (ii[3]=%d)", runCtrl[j].runId,
2476 ii);
2477 factOut (kInfo, 507, str);
2478 runCtrl[j].fileId = 97;
2479 }
2480 }
2481
2482 gw_runStat = -99;
2483 gj.writStat = -99;
2484 snprintf (str, MXSTR, "Exit Writing Process ...");
2485 factOut (kInfo, -1, str);
2486 return 0;
2487
2488
2489
2490
2491} /*-----------------------------------------------------------------*/
2492
2493
2494
2495
2496void
2497StartEvtBuild ()
2498{
2499
2500 int i, j, imax, status, th_ret[50];
2501 pthread_t thread[50];
2502 struct timespec xwait;
2503
2504 gi_runStat = gp_runStat = gw_runStat = 0;
2505 gj.readStat = gj.procStat = gj.writStat = 0;
2506
2507 snprintf (str, MXSTR, "Starting EventBuilder V15.07 A");
2508 factOut (kInfo, -1, str);
2509
2510//initialize run control logics
2511 for (i = 0; i < MAX_RUN; i++) {
2512 runCtrl[i].runId = 0;
2513 runCtrl[i].fileId = -2;
2514 }
2515
2516//prepare for subProcesses
2517 gi_maxSize = g_maxSize;
2518 if (gi_maxSize <= 0)
2519 gi_maxSize = 1;
2520
2521 gi_maxProc = g_maxProc;
2522 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2523 snprintf (str, MXSTR, "Illegal number of processes %d", gi_maxProc);
2524 factOut (kFatal, 301, str);
2525 gi_maxProc = 1;
2526 }
2527//partially initialize event control logics
2528 evtCtrl.frstPtr = 0;
2529 evtCtrl.lastPtr = 0;
2530
2531//start all threads (more to come) when we are allowed to ....
2532 while (g_runStat == 0) {
2533 xwait.tv_sec = 0;
2534 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2535 nanosleep (&xwait, NULL);
2536 }
2537
2538 i = 0;
2539 th_ret[i] = pthread_create (&thread[i], NULL, readFAD, NULL);
2540 i++;
2541 th_ret[i] = pthread_create (&thread[i], NULL, procEvt, NULL);
2542 i++;
2543 th_ret[i] = pthread_create (&thread[i], NULL, writeEvt, NULL);
2544 i++;
2545 imax = i;
2546
2547
2548#ifdef BILAND
2549 xwait.tv_sec = 30;;
2550 xwait.tv_nsec = 0; // sleep for ~20sec
2551 nanosleep (&xwait, NULL);
2552
2553 printf ("close all runs in 2 seconds\n");
2554
2555 CloseRunFile (0, time (NULL) + 2, 0);
2556
2557 xwait.tv_sec = 1;;
2558 xwait.tv_nsec = 0; // sleep for ~20sec
2559 nanosleep (&xwait, NULL);
2560
2561 printf ("setting g_runstat to -1\n");
2562
2563 g_runStat = -1;
2564#endif
2565
2566
2567//wait for all threads to finish
2568 for (i = 0; i < imax; i++) {
2569 j = pthread_join (thread[i], (void **) &status);
2570 }
2571
2572} /*-----------------------------------------------------------------*/
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588 /*-----------------------------------------------------------------*/
2589 /*-----------------------------------------------------------------*/
2590 /*-----------------------------------------------------------------*/
2591 /*-----------------------------------------------------------------*/
2592 /*-----------------------------------------------------------------*/
2593
2594#ifdef BILAND
2595
2596int
2597subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2598 int8_t * buffer)
2599{
2600 printf ("called subproc %d\n", threadID);
2601 return threadID + 1;
2602}
2603
2604
2605
2606
2607 /*-----------------------------------------------------------------*/
2608 /*-----------------------------------------------------------------*/
2609 /*-----------------------------------------------------------------*/
2610 /*-----------------------------------------------------------------*/
2611 /*-----------------------------------------------------------------*/
2612
2613
2614
2615
2616FileHandle_t
2617runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2618{
2619 return 1;
2620};
2621
2622int
2623runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2624{
2625 return 1;
2626 usleep (10000);
2627 return 1;
2628}
2629
2630
2631//{ return 1; } ;
2632
2633int
2634runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2635{
2636 return 1;
2637};
2638
2639
2640
2641
2642int
2643eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2644{
2645 int i = 0;
2646
2647// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2648// for (i=0; i<NBOARDS; i++) {
2649// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2650// }
2651 return 0;
2652}
2653
2654
2655void
2656factStatNew (EVT_STAT gi)
2657{
2658 int i;
2659
2660//for (i=0;i<MAX_SOCK;i++) {
2661// printf("%4d",gi.numRead[i]);
2662// if (i%20 == 0 ) printf("\n");
2663//}
2664}
2665
2666void
2667gotNewRun (int runnr, PEVNT_HEADER * headers)
2668{
2669 printf ("got new run %d\n", runnr);
2670 return;
2671}
2672
2673void
2674factStat (GUI_STAT gj)
2675{
2676// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2677// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2678}
2679
2680
/*
 * debugRead - stand-alone (BILAND) test stub: does nothing, all
 * arguments are ignored.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
   (void) isock;
   (void) ibyte;
   (void) event;
   (void) ftmevt;
   (void) runnr;
   (void) state;
   (void) tsec;
   (void) tusec;

//disabled debug output:
// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
}
2687
2688
2689
/*
 * debugStream - stand-alone (BILAND) test stub: does nothing, all
 * arguments are ignored.
 */
void
debugStream (int isock, void *buf, int len)
{
   (void) isock;
   (void) buf;
   (void) len;
}
2694
/*
 * debugHead - stand-alone (BILAND) test stub: does nothing, all
 * arguments are ignored.
 */
void
debugHead (int i, int j, void *buf)
{
   (void) i;
   (void) j;
   (void) buf;
}
2699
2700
2701void
2702factOut (int severity, int err, char *message)
2703{
2704 static FILE *fd;
2705 static int file = 0;
2706
2707 if (file == 0) {
2708 printf ("open file\n");
2709 fd = fopen ("x.out", "w+");
2710 file = 999;
2711 }
2712
2713 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2714
2715 if (severity != kDebug)
2716 printf ("%3d %3d | %s\n", severity, err, message);
2717}
2718
2719
2720
2721int
2722main ()
2723{
2724 int i, b, c, p;
2725 char ipStr[100];
2726 struct in_addr IPaddr;
2727
2728 g_maxMem = 1024 * 1024; //MBytes
2729//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2730 g_maxMem = g_maxMem * 200; //100MBytes
2731
2732 g_maxProc = 20;
2733 g_maxSize = 30000;
2734
2735 g_runStat = 40;
2736
2737 i = 0;
2738
2739// version for standard crates
2740//for (c=0; c<4,c++) {
2741// for (b=0; b<10; b++) {
2742// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2743//
2744// inet_pton(PF_INET, ipStr, &IPaddr) ;
2745//
2746// g_port[i].sockAddr.sin_family = PF_INET;
2747// g_port[i].sockAddr.sin_port = htons(5000) ;
2748// g_port[i].sockAddr.sin_addr = IPaddr ;
2749// g_port[i].sockDef = 1 ;
2750// i++ ;
2751// }
2752//}
2753//
2754//version for PC-test *
2755 for (c = 0; c < 4; c++) {
2756 for (b = 0; b < 10; b++) {
2757 sprintf (ipStr, "10.0.%d.11", 128 + c);
2758 if (c < 2)
2759 sprintf (ipStr, "10.0.%d.11", 128);
2760 else
2761 sprintf (ipStr, "10.0.%d.11", 131);
2762// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2763
2764 inet_pton (PF_INET, ipStr, &IPaddr);
2765 p = 31919 + 100 * c + 10 * b;
2766
2767
2768 g_port[i].sockAddr.sin_family = PF_INET;
2769 g_port[i].sockAddr.sin_port = htons (p);
2770 g_port[i].sockAddr.sin_addr = IPaddr;
2771 g_port[i].sockDef = 1;
2772
2773 i++;
2774 }
2775 }
2776
2777
2778//g_port[17].sockDef =-1 ;
2779//g_actBoards-- ;
2780
2781 StartEvtBuild ();
2782
2783 return 0;
2784
2785}
2786#endif
Note: See TracBrowser for help on using the repository browser.