source: trunk/FACT++/src/EventBuilder.c@ 15468

Last change on this file since 15468 was 15468, checked in by tbretz, 12 years ago
Cleaned up the file removing some old style memory allocation schemes
File size: 74.4 KB
Line 
1
2// // // #define EVTDEBUG
3
4#define NUMSOCK 1 //set to 7 for old configuration
5#define MAXREAD 65536 //64kB wiznet buffer
6
7#include <stdlib.h>
8#include <stdint.h>
9#include <stdarg.h>
10#include <unistd.h>
11#include <stdio.h>
12#include <sys/time.h>
13#include <arpa/inet.h>
14#include <string.h>
15#include <math.h>
16#include <error.h>
17#include <errno.h>
18#include <unistd.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netinet/in.h>
22#include <netinet/tcp.h>
23#include <pthread.h>
24#include <sched.h>
25
26#include "EventBuilder.h"
27
28enum Severity
29{
30 kMessage = 10, ///< Just a message, usually obsolete
31 kInfo = 20, ///< An info telling something which can be interesting to know
32 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
33 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
34 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
35 kDebug = 99, ///< A message used for debugging only
36};
37
38#define MIN_LEN 32 // min #bytes needed to interpret FADheader
39#define MAX_LEN 256*1024 // size of read-buffer per socket
40
41//#define nanosleep(x,y)
42
43extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
44extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
45extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
46//extern int runFinish (uint32_t runnr);
47
48extern void factOut (int severity, int err, char *message);
49extern void factReportIncomplete (uint64_t rep);
50
51extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
52
53
54extern void factStat (GUI_STAT gj);
55
56extern void factStatNew (EVT_STAT gi);
57
58extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
59
60extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
61 int8_t * buffer);
62
63extern void debugHead (int i, int j, void *buf);
64
65extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
66 int32_t runnr, int state, uint32_t tsec,
67 uint32_t tusec);
68extern void debugStream (int isock, void *buf, int len);
69
70int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
71
72
73int evtCtrl_frstPtr;
74int evtCtrl_lastPtr;
75
76
77
78int g_maxProc;
79int g_maxSize;
80int gi_maxSize;
81int gi_maxProc;
82
83uint g_actTime;
84uint g_actUsec;
85int g_runStat;
86int g_reset;
87int g_useFTM;
88
89int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
90size_t g_maxMem; //maximum memory allowed for buffer
91
92//no longer needed ...
93int g_maxBoards; //maximum number of boards to be initialized
94int g_actBoards;
95//
96
97FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
98
99
100int gi_runStat;
101int gp_runStat;
102int gw_runStat;
103
104//int gi_memStat = +1;
105
106uint32_t actrun = 0;
107
108
109uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
110
111//uint gi_EvtStart= 0 ;
112//uint gi_EvtRead = 0 ;
113//uint gi_EvtBad = 0 ;
114//uint gi_EvtTot = 0 ;
115//size_t gi_usedMem = 0 ;
116
117//uint gw_EvtTot = 0 ;
118//uint gp_EvtTot = 0 ;
119
120//PIX_MAP g_pixMap[NPIX];
121
122EVT_STAT gi;
123GUI_STAT gj;
124
125EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing
126
127//#define MXSTR 1000
128//char str[MXSTR];
129
130void factPrintf(int severity, int id, const char *fmt, ...)
131{
132 char str[1000];
133
134 va_list ap;
135 va_start(ap, fmt);
136 vsnprintf(str, 1000, fmt, ap);
137 va_end(ap);
138
139 factOut(severity, id, str);
140}
141
142
143#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
144#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
145typedef struct TGB_struct
146{
147 struct TGB_struct *prev;
148 void *mem;
149} TGB_entry;
150
151TGB_entry *tgb_last = NULL;
152uint64_t tgb_memory = 0;
153uint64_t tgb_inuse = 0;
154
155void *TGB_Malloc()
156{
157 // No free slot available, next alloc would exceed max memory
158 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
159 return NULL;
160
161 // We will return this amount of memory
162 tgb_inuse += MAX_TOT_MEM;
163
164 // No free slot available, allocate a new one
165 if (!tgb_last)
166 {
167 tgb_memory += MAX_TOT_MEM;
168 return malloc(MAX_TOT_MEM);
169 }
170
171 // Get the next free slot from the stack and return it
172 TGB_entry *last = tgb_last;
173
174 void *mem = last->mem;
175 tgb_last = last->prev;
176
177 free(last);
178
179 return mem;
180};
181
182void TGB_free(void *mem)
183{
184 // Add the last free slot to the stack
185 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
186
187 entry->prev = tgb_last;
188 entry->mem = mem;
189
190 tgb_last = entry;
191
192 // Decrease the amont of memory in use accordingly
193 tgb_inuse -= MAX_TOT_MEM;
194}
195
196RUN_CTRL runCtrl[MAX_RUN];
197//RUN_TAIL runTail[MAX_RUN];
198
199
200/*
201*** Definition of rdBuffer to read in IP packets; keep it global !!!!
202 */
203
204
205typedef union
206{
207 int8_t B[MAX_LEN];
208 int16_t S[MAX_LEN / 2];
209 int32_t I[MAX_LEN / 4];
210 int64_t L[MAX_LEN / 8];
211} CNV_FACT;
212
213typedef struct
214{
215 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
216 int32_t bufPos; //next byte to read to the buffer next
217 int32_t bufLen; //number of bytes left to read
218// size_t bufLen; //number of bytes left to read size_t might be better
219 int32_t skip; //number of bytes skipped before start of event
220
221 int errCnt; //how often connect failed since last successful
222 int sockStat; //-1 if socket not yet connected , 99 if not exist
223 int socket; //contains the sockets
224 struct sockaddr_in SockAddr; //IP for each socket
225
226 int evtID; // event ID of event currently read
227 int runID; // run "
228 int ftmID; // event ID from FTM
229 uint fadLen; // FADlength of event currently read
230 int fadVers; // Version of FAD
231 int ftmTyp; // trigger type
232 int board; // boardID (softwareID: 0..40 )
233 int Port;
234
235 CNV_FACT *rBuf;
236
237} READ_STRUCT;
238
239
240typedef union
241{
242 int8_t B[2];
243 int16_t S;
244} SHORT_BYTE;
245
246
247
248
249
250SHORT_BYTE start, stop;
251
252READ_STRUCT rd[MAX_SOCK]; //buffer to read IP and afterwards store in mBuffer
253
254
255
256/*-----------------------------------------------------------------*/
257
258
259/*-----------------------------------------------------------------*/
260
261
262
263int
264runFinish1 (uint32_t runnr)
265{
266 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
267 return 0;
268}
269int
270runFinish (uint32_t runnr)
271{
272 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
273 return 0;
274}
275
276int
277GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
278 READ_STRUCT * rs)
279{
280/*
281*** generate Address, create sockets and allocates readbuffer for it
282***
283*** if flag==0 generate socket and buffer
284*** <0 destroy socket and buffer
285*** >0 close and redo socket
286***
287*** sid : board*7 + port id
288 */
289
290 int j;
291 int optval = 1; //activate keepalive
292 socklen_t optlen = sizeof (optval);
293
294
295 if (sid % 7 >= NUMSOCK) {
296 //this is a not used socket, so do nothing ...
297 rs->sockStat = 77;
298 rs->rBuf = NULL ;
299 return 0;
300 }
301
302 if (rs->sockStat == 0) { //close socket if open
303 j = close (rs->socket);
304 if (j > 0) {
305 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
306 } else {
307 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
308 }
309 }
310
311 rs->sockStat = 99;
312
313 if (flag < 0) {
314 free (rs->rBuf); //and never open again
315 rs->rBuf = NULL;
316 return 0;
317 }
318
319
320 if (flag == 0) { //generate address and buffer ...
321 rs->Port = port;
322 rs->SockAddr.sin_family = sockAddr->sin_family;
323 rs->SockAddr.sin_port = htons (port);
324 rs->SockAddr.sin_addr = sockAddr->sin_addr;
325
326 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
327 if (rs->rBuf == NULL) {
328 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
329 rs->sockStat = 77;
330 return -3;
331 }
332 }
333
334
335 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
336 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
337 rs->sockStat = 88;
338 return -2;
339 }
340 optval = 1;
341 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
342 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
343 }
344 optval = 10; //start after 10 seconds
345 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
346 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
347 }
348 optval = 10; //do every 10 seconds
349 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
350 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
351 }
352 optval = 2; //close after 2 unsuccessful tries
353 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
354 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
355 }
356
357 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
358
359 rs->sockStat = -1; //try to (re)open socket
360 rs->errCnt = 0;
361 return 0;
362
363} /*-----------------------------------------------------------------*/
364
365 /*-----------------------------------------------------------------*/
366
367
368
369
370int
371mBufInit ()
372{
373// initialize mBuffer (mark all entries as unused\empty)
374
375 //uint32_t actime = g_actTime + 50000000;
376
377 for (int i = 0; i < MAX_EVT * MAX_RUN; i++) {
378 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
379 evtCtrl[i].runNum = 0;
380
381 //evtCtrl[i].mBuffer_idx = -1;
382 evtCtrl[i].evtStat = -1;
383 }
384
385 //actRun.FADhead = malloc (NBOARDS * sizeof (PEVNT_HEADER));
386
387 evtCtrl_frstPtr = 0;
388 evtCtrl_lastPtr = 0;
389
390 return 0;
391
392} /*-----------------------------------------------------------------*/
393
394int checkRoiConsistency(int i, int roi[]);
395
396int mBufEvt(int sk)
397{
398// generate a new Event into mBuffer:
399// make sure only complete Event are possible, so 'free' will always work
400// returns index into mBuffer[], or negative value in case of error
401// error: <-9000 if roi screwed up (not consistent with run)
402// <-8000 (not consistent with event)
403// <-7000 (not consistent with board)
404// < 0 if no space left
405
406// int evFree;
407// int headmem = 0;
408// size_t needmem = 0;
409
410 int nRoi[9];
411 if (!checkRoiConsistency(sk, nRoi))
412 return -9999;
413
414 //const int b = sk / 7;
415
416 const int evID = rd[sk].evtID;
417 const uint runID = rd[sk].runID;
418 const int trgTyp = rd[sk].ftmTyp;
419 const int trgNum = rd[sk].ftmID;
420 const int fadNum = rd[sk].evtID;
421
422 int beg = (evtCtrl_lastPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
423 int end = (evtCtrl_frstPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
424
425 for (int k=beg; k!=end; k--, k += MAX_EVT*MAX_RUN, k %= MAX_EVT*MAX_RUN)
426 {
427 // If the run is different, go on searching.
428 // We cannot stop searching if a lower run-id is found as in
429 // the case of the events, because theoretically, there
430 // can be the same run on two different days.
431 if (runID != evtCtrl[k].runNum)
432 continue;
433
434 // The event in the control structure has an event id with
435 // a lower event id than the current one. All previous events
436 // should have an even lower event id, so there is no way it
437 // can be found in the structure.
438 if (evtCtrl[k].evNum < evID/* && runID == evtCtrl[k].runNum*/)
439 break;
440
441 if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/)
442 continue;
443
444 // is it ok ????
445 if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8])
446 {
447 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
448 evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]);
449 return -8201;
450 }
451
452 // count for inconsistencies
453 if (evtCtrl[k].trgNum != trgNum)
454 evtCtrl[k].Errors[0]++;
455 if (evtCtrl[k].fadNum != fadNum)
456 evtCtrl[k].Errors[1]++;
457 if (evtCtrl[k].trgTyp != trgTyp)
458 evtCtrl[k].Errors[2]++;
459
460 //everything seems fine so far ==> use this slot ....
461 return k;
462 }
463
464 //event does not yet exist; create it
465
466 if (end-beg==1 || (end==0 && beg==MAX_EVT*MAX_RUN-1)) //no space available in ctrl
467 {
468 factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end);
469 return -1;
470 }
471
472 // FIXME: This should be the time of the first receiped board
473 struct timeval tv;
474 gettimeofday (&tv, NULL);
475
476 const uint32_t tsec = tv.tv_sec;
477 const uint32_t tusec = tv.tv_usec;
478
479 //check if runId already registered in runCtrl
480
481 uint oldest = g_actTime + 1000;
482 int jold = -1;
483
484 int found = 0;
485
486 // fileId==-2: not yet used or run assigned but not open
487 // fileId== 0: file open
488 // fileId>0: run closed
489
490 for (int k=0; k<MAX_RUN; k++)
491 {
492 // Check if run already registered (old entries should have runId==-1)
493 if (runCtrl[k].runId == runID)
494 {
495 // FIXME: Compare to previous event
496 if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8])
497 {
498 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
499 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID);
500 return -9301;
501 }
502
503 found = 1;
504 break;
505 }
506
507 // This is just for sanity. We use the oldest free entry (until
508 // we have understood the concept and can use "just" a free entry
509 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
510 {
511 oldest = runCtrl[k].closeTime;
512 jold = k;
513 }
514 }
515
516 if (!found) // Run not yet registered, register run
517 {
518 if (jold < 0)
519 {
520 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
521 return -1001;
522 }
523
524 int evFree = jold;
525
526 factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d",
527 runID, evID, evFree, nRoi[0], nRoi[8]);
528
529 runCtrl[evFree].runId = runID;
530 runCtrl[evFree].roi0 = nRoi[0]; // FIXME: Make obsolete!
531 runCtrl[evFree].roi8 = nRoi[8]; // FIXME: Make obsolete!
532 runCtrl[evFree].fileId = -2;
533 runCtrl[evFree].procId = -2;
534 runCtrl[evFree].lastEvt = 1; // Number of events partially started to read
535 runCtrl[evFree].actEvt = 0; // Number of written events (write)
536 runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent)
537 runCtrl[evFree].maxEvt = 999999999; // max number events allowed
538 runCtrl[evFree].lastTime = tsec; // Time when the last event was written
539 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
540 }
541
542 const int k = evtCtrl_lastPtr;
543
544 //flag all boards as unused
545 evtCtrl[k].nBoard = 0;
546 for (int b=0; b<NBOARDS; b++)
547 evtCtrl[k].board[b] = -1;
548
549 evtCtrl[k].pcTime[0] = tsec;
550 evtCtrl[k].pcTime[1] = tusec;
551 evtCtrl[k].nRoi = nRoi[0];
552 evtCtrl[k].nRoiTM = nRoi[8];
553 evtCtrl[k].evNum = evID;
554 evtCtrl[k].runNum = runID;
555 evtCtrl[k].fadNum = fadNum;
556 evtCtrl[k].trgNum = trgNum;
557 evtCtrl[k].trgTyp = trgTyp;
558 evtCtrl[k].Errors[0] = 0;
559 evtCtrl[k].Errors[1] = 0;
560 evtCtrl[k].Errors[2] = 0;
561 evtCtrl[k].Errors[3] = 0;
562 evtCtrl[k].fEvent = NULL;
563 evtCtrl[k].FADhead = NULL;
564
565 evtCtrl[k].evtStat = 0;
566
567 // This is dangerous, because theoretically, it can result is
568 // acessing invalid memory in another thread if this is split
569 // in two instructions. Must be done only _after_ the contents
570 // have been initialized
571 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % MAX_EVT * MAX_RUN;
572
573 return k;
574
575} /*-----------------------------------------------------------------*/
576
577
578void initEvent(int i)
579{
580 evtCtrl[i].fEvent = (EVENT*)((char*)evtCtrl[i].FADhead+MAX_HEAD_MEM);
581 memset(evtCtrl[i].fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi);
582
583 //flag all pixels as unused
584 for (int k = 0; k < NPIX; k++)
585 evtCtrl[i].fEvent->StartPix[k] = -1;
586
587 //flag all TMark as unused
588 for (int k = 0; k < NTMARK; k++)
589 evtCtrl[i].fEvent->StartTM[k] = -1;
590
591 evtCtrl[i].fEvent->NumBoards = 0;
592 evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0];
593 evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1];
594}
595
596
597int
598mBufFree (int i)
599{
600//delete entry [i] from mBuffer:
601//(and make sure multiple calls do no harm ....)
602
603 TGB_free(evtCtrl[i].FADhead);
604
605 evtCtrl[i].fEvent = NULL;
606 evtCtrl[i].FADhead = NULL;
607
608 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
609 evtCtrl[i].runNum = 0;
610
611 gj.usdMem = tgb_inuse;
612
613 gj.bufTot--;
614
615 /*if (gi_memStat < 0) {
616 if (gj.usdMem <= 0.75 * gj.maxMem)
617 gi_memStat = +1;
618 }*/
619
620 return 0;
621
622} /*-----------------------------------------------------------------*/
623
624/*
625void
626resetEvtStat ()
627{
628 for (int i = 0; i < MAX_SOCK; i++)
629 gi.numRead[i] = 0;
630
631 for (int i = 0; i < NBOARDS; i++) {
632 gi.gotByte[i] = 0;
633 gi.gotErr[i] = 0;
634
635 }
636
637 gi.evtGet = 0; //#new Start of Events read
638 gi.evtTot = 0; //#complete Events read
639 gi.evtErr = 0; //#Events with Errors
640 gi.evtSkp = 0; //#Events incomplete (timeout)
641
642 gi.procTot = 0; //#Events processed
643 gi.procErr = 0; //#Events showed problem in processing
644 gi.procTrg = 0; //#Events accepted by SW trigger
645 gi.procSkp = 0; //#Events rejected by SW trigger
646
647 gi.feedTot = 0; //#Events used for feedBack system
648 gi.feedErr = 0; //#Events rejected by feedBack
649
650 gi.wrtTot = 0; //#Events written to disk
651 gi.wrtErr = 0; //#Events with write-error
652
653 gi.runOpen = 0; //#Runs opened
654 gi.runClose = 0; //#Runs closed
655 gi.runErr = 0; //#Runs with open/close errors
656
657 return;
658}*/ /*-----------------------------------------------------------------*/
659
660void reportIncomplete(int id)
661{
662 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d",
663 evtCtrl[id].evNum, id);
664
665 uint64_t report = 0;
666
667 char str[1000];
668
669 int ik=0;
670 for (int ib=0; ib<NBOARDS; ib++)
671 {
672 if (ib%10==0)
673 str[ik++] = '|';
674
675 const int jb = evtCtrl[id].board[ib];
676 if (jb>=0) // data received from that board
677 {
678 str[ik++] = '0'+(jb%10);
679 continue;
680 }
681
682 // FIXME: Is that really 'b' or should that be 'ib' ?
683 if (gi_NumConnect[ib]<=0) // board not connected
684 {
685 str[ik++] = 'x';
686 continue;
687 }
688
689 // data from this board lost
690 str[ik++] = '.';
691 report |= ((uint64_t)1)<<ib;
692 }
693
694 str[ik++] = '|';
695 str[ik] = 0;
696
697 factOut(kWarn, 601, str);
698
699 factReportIncomplete(report);
700}
701
702int checkRoiConsistency(int i, int roi[])
703{
704 int xjr = -1;
705 int xkr = -1;
706
707 //points to the very first roi
708 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
709
710 roi[0] = ntohs(rd[i].rBuf->S[roiPtr]);
711
712 for (int jr = 0; jr < 9; jr++)
713 {
714 roi[jr] = ntohs(rd[i].rBuf->S[roiPtr]);
715
716 if (roi[jr]<0 || roi[jr]>1024)
717 {
718 factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
719 return 0;
720 }
721
722 // Check that the roi of pixels jr are compatible with the one of pixel 0
723 if (jr!=8 && roi[jr]!=roi[0])
724 {
725 xjr = jr;
726 break;
727 }
728
729 // Check that the roi of all other DRS chips on boards are compatible
730 for (int kr = 1; kr < 4; kr++)
731 {
732 const int kroi = ntohs(rd[i].rBuf->S[roiPtr]);
733 if (kroi != roi[jr])
734 {
735 xjr = jr;
736 xkr = kr;
737 break;
738 }
739 roiPtr += kroi+4;
740 }
741 }
742
743 if (xjr>=0)
744 {
745 if (xkr<0)
746 factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
747 else
748 factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd[i].rBuf->S[roiPtr]));
749
750 return 0;
751 }
752
753 //const int b = i / 7;
754
755/*
756 if (roi[0]<0 || roi[0] > 1024)
757 {
758 factPrintf(kError, 999, "Illegal roi in channel 0: %d (allowed: 0<=roi<=1024)", roi[0]);
759 gj.badRoiR++;
760 gj.badRoi[b]++;
761 return 0;
762 }
763 */
764 /*
765 for (int jr = 1; jr < 8; jr++)
766 {
767 if (roi[jr] != roi[0])
768 {
769 factPrintf(kError, 711, "Mismatch of roi (%d) in channel %d with roi (%d) in channel 0.", roi[jr], jr, roi[0]);
770 gj.badRoiB++;
771 gj.badRoi[b]++;
772 return 0;
773 }
774 }
775*/
776 if (roi[8] < roi[0])
777 {
778 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
779 //gj.badRoiB++;
780 //gj.badRoi[b]++;
781 return 0;
782 }
783
784 return 1;
785}
786
787void swapEventHeaderBytes(int i)
788{
789 //End of the header. to channels now
790 int eStart = 36;
791 for (int ePatchesCount = 0; ePatchesCount<4*9;ePatchesCount++)
792 {
793 rd[i].rBuf->S[eStart+0] = ntohs(rd[i].rBuf->S[eStart+0]);//id
794 rd[i].rBuf->S[eStart+1] = ntohs(rd[i].rBuf->S[eStart+1]);//start_cell
795 rd[i].rBuf->S[eStart+2] = ntohs(rd[i].rBuf->S[eStart+2]);//roi
796 rd[i].rBuf->S[eStart+3] = ntohs(rd[i].rBuf->S[eStart+3]);//filling
797
798 eStart += 4+rd[i].rBuf->S[eStart+2];//skip the pixel data
799 }
800}
801
802void copyData(int i, int evID, /*int roi,*/ int boardId)
803{
804 swapEventHeaderBytes(i);
805
806 memcpy(&evtCtrl[evID].FADhead[boardId].start_package_flag,
807 &rd[i].rBuf->S[0], sizeof(PEVNT_HEADER));
808
809 int src = sizeof(PEVNT_HEADER) / 2;
810
811 // consistency of ROIs have been checked already (is it all correct?)
812 const int roi = rd[i].rBuf->S[src+2];
813
814 // different sort in FAD board.....
815 for (int px = 0; px < 9; px++)
816 {
817 for (int drs = 0; drs < 4; drs++)
818 {
819 // pixH = rd[i].rBuf->S[src++]; // ID
820 src++;
821
822 const int pixC = rd[i].rBuf->S[src++]; // start-cell
823 const int pixR = rd[i].rBuf->S[src++]; // roi
824 //here we should check if pixH is correct ....
825
826 const int pixS = boardId * 36 + drs * 9 + px;
827 src++;
828
829 evtCtrl[evID].fEvent->StartPix[pixS] = pixC;
830
831 const int dest1 = pixS * roi;
832 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1],
833 &rd[i].rBuf->S[src], roi * 2);
834
835 src += pixR;
836
837 if (px == 8)
838 {
839 const int tmS = boardId * 4 + drs;
840
841 //and we have additional TM info
842 if (pixR > roi)
843 {
844 const int dest2 = tmS * roi + NPIX * roi;
845
846 const int srcT = src - roi;
847 evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
848
849 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2],
850 &rd[i].rBuf->S[srcT], roi * 2);
851 }
852 else
853 {
854 evtCtrl[evID].fEvent->StartTM[tmS] = -1;
855
856 //ETIENNE because the TM channels are always processed during drs calib,
857 //set them to zero if they are not present
858 //I suspect that it may be more efficient to set all the allocated mem to
859 //zero when allocating it
860 // dest = tmS*roi[0] + NPIX*roi[0];
861 // bzero(&mBuffer[evID].fEvent->Adc_Data[dest],roi[0]*2);
862 }
863 }
864 }
865 }
866}
867
868
869void
870initReadFAD ()
871{
872 return;
873} /*-----------------------------------------------------------------*/
874
875//struct rnd
876//{
877// int val;
878// int idx;
879//};
880//
881//struct rnd random_arr[MAX_SOCK];
882//
883//int compare(const void *f1, const void *f2)
884//{
885// struct rnd *r1 = (struct rnd*)f1;
886// struct rnd *r2 = (struct rnd*)f2;
887// return r1->val - r2->val;
888//}
889
890
891void *readFAD (void *ptr)
892{
893/* *** main loop reading FAD data and sorting them to complete events */
894
895 factPrintf(kInfo, -1, "Start initializing (readFAD)");
896
897// int cpu = 7; //read thread
898// cpu_set_t mask;
899
900/* CPU_ZERO initializes all the bits in the mask to zero. */
901// CPU_ZERO (&mask);
902/* CPU_SET sets only the bit corresponding to cpu. */
903// cpu = 7;
904// CPU_SET (cpu, &mask);
905
906/* sched_setaffinity returns 0 in success */
907// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
908// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
909// factOut (kWarn, -1, str);
910// }
911
912
913 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
914
915 start.S = 0xFB01;
916 stop.S = 0x04FE;
917
918/* initialize run control logics */
919 for (int i = 0; i < MAX_RUN; i++) {
920 runCtrl[i].runId = 0;
921 runCtrl[i].fileId = -2;
922 runCtrl[i].procId = -2;
923 }
924 gi_resetS = gi_resetR = 9;
925
926 int sockDef[NBOARDS]; //internal state of sockets
927 memset(sockDef, 0, NBOARDS*sizeof(int));
928
929 START:
930 evtCtrl_frstPtr = 0;
931 evtCtrl_lastPtr = 0;
932
933 //time in seconds
934 uint gi_SecTime = time(NULL);;
935
936 const int cntsock = 8 - NUMSOCK ;
937
938 if (gi_resetS > 0) {
939 //make sure all sockets are preallocated as 'not exist'
940 for (int i = 0; i < MAX_SOCK; i++) {
941 rd[i].socket = -1;
942 rd[i].sockStat = 99;
943 }
944
945 for (int k = 0; k < NBOARDS; k++) {
946 gi_NumConnect[k] = 0;
947 //gi.numConn[k] = 0;
948 gj.numConn[k] = 0;
949 //gj.errConn[k] = 0;
950 gj.rateBytes[k] = 0;
951 gj.totBytes[k] = 0;
952 }
953
954 }
955
956
957 if (gi_resetR > 0) {
958 //resetEvtStat ();
959 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
960 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
961 gj.totMem = tgb_memory;
962 gj.bufNew = gj.bufEvt = 0;
963 //gj.badRoiE = gj.badRoiR = gj.badRoiB = 0;
964 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
965
966 //for (int b = 0; b < NBOARDS; b++)
967 // gj.badRoi[b] = 0;
968
969 mBufInit (); //initialize buffers
970
971 factPrintf(kInfo, -1, "End initializing (readFAD)");
972 }
973
974
975 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
976
977 //loop until global variable g_runStat claims stop
978 while (g_runStat >= 0 && g_reset == 0)
979 {
980 gi_runStat = g_runStat;
981 gj.readStat = g_runStat;
982
983 struct timeval tv;
984 gettimeofday (&tv, NULL);
985 g_actTime = tv.tv_sec;
986 g_actUsec = tv.tv_usec;
987
988
989 for (int b = 0; b < NBOARDS; b++)
990 {
991 // Nothing has changed
992 if (g_port[b].sockDef == sockDef[b])
993 continue;
994
995 gi_NumConnect[b] = 0; //must close all connections
996 //gi.numConn[b] = 0;
997 gj.numConn[b] = 0;
998
999 // s0 = 0: sockets to be defined and opened
1000 // s0 = -1: sockets to be destroyed
1001 // s0 = +1: sockets to be closed and reopened
1002
1003 int s0 = 0;
1004 if (sockDef[b] != 0)
1005 s0 = g_port[b].sockDef==0 ? -1 : +1;
1006
1007 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
1008
1009 int k = b * 7;
1010 for (int p = p0 + 1; p < p0 + 8; p++, k++)
1011 GenSock (s0, k, p, &g_port[b].sockAddr, &rd[k]); //generate address and socket
1012
1013 sockDef[b] = g_port[b].sockDef;
1014 }
1015
1016 // count the number of active boards
1017 int actBoards = 0;
1018 for (int b = 0; b < NBOARDS; b++)
1019 if (sockDef[b] > 0)
1020 actBoards++;
1021
1022 //count number of succesfull actions
1023// int numok = 0;
1024
1025/*
1026 for (i=0; i<MAX_SOCK/7; i++)
1027 {
1028 random_arr[i].val = rand();
1029 random_arr[i].idx = i;
1030 }
1031
1032 qsort(random_arr, MAX_SOCK/7, sizeof(struct rnd), compare);
1033
1034 for (int iii = 0; iii < MAX_SOCK/7; iii++) { //check all sockets if something to read
1035
1036 b = random_arr[iii].idx;
1037 i = b*7;
1038 p = 0;
1039 */
1040
1041 //check all sockets if something to read
1042 for (int i = 0; i < MAX_SOCK; i+=7)
1043 {
1044 // Do not try to connect this socket
1045 if (rd[i].sockStat > 0)
1046 continue;
1047
1048 const int board = i / 7 ;
1049 //const int p = i % 7 ;
1050
1051 if (rd[i].sockStat == -1)
1052 {
1053 //try to connect if not yet done
1054 rd[i].sockStat = connect (rd[i].socket,
1055 (struct sockaddr *) &rd[i].SockAddr,
1056 sizeof (rd[i].SockAddr));
1057 // Failed
1058 if (rd[i].sockStat == -1)
1059 {
1060 rd[i].errCnt++;
1061 usleep(25000);
1062 continue;
1063 }
1064
1065 // Success (rd[i].sockStat == 0)
1066
1067 if (sockDef[board] > 0)
1068 {
1069 rd[i].bufTyp = 0; // expect a header
1070 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
1071 }
1072 else
1073 {
1074 rd[i].bufTyp = -1; // full data to be skipped
1075 rd[i].bufLen = MAX_LEN; // huge for skipping
1076 }
1077
1078 rd[i].bufPos = 0; // no byte read so far
1079 rd[i].skip = 0; // start empty
1080
1081 gi_NumConnect[board] += cntsock;
1082
1083 //gi.numConn[b]++;
1084 gj.numConn[board]++;
1085
1086 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", board, gj.numConn[board]);
1087 }
1088
1089 // Do not read from this socket
1090 if (rd[i].bufLen<0)
1091 continue;
1092
1093 //numok++;
1094
1095 if (rd[i].bufLen>0)
1096 {
1097 const int32_t jrd =
1098 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
1099 rd[i].bufLen, MSG_DONTWAIT);
1100
1101 // recv failed
1102 if (jrd<0)
1103 {
1104 // There was just nothing waiting
1105 if (errno==EWOULDBLOCK || errno==EAGAIN)
1106 {
1107 //numok--;
1108 continue;
1109 }
1110
1111 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
1112 //gi.gotErr[b]++;
1113 continue;
1114 }
1115
1116 // connection was closed ...
1117 if (jrd==0)
1118 {
1119 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
1120
1121 const int s0 = sockDef[board] > 0 ? +1 : -1;
1122 GenSock(s0, i, 0, NULL, &rd[i]);
1123
1124 //gi.gotErr[b]++;
1125
1126 gi_NumConnect[board]-= cntsock ;
1127 //gi.numConn[b]--;
1128 gj.numConn[board]--;
1129
1130 continue;
1131 }
1132 // Success (jrd > 0)
1133
1134 gj.rateBytes[board] += jrd;
1135
1136 // are we skipping this board ...
1137 if (rd[i].bufTyp < 0)
1138 continue;
1139
1140 rd[i].bufPos += jrd; //==> prepare for continuation
1141 rd[i].bufLen -= jrd;
1142
1143#ifdef EVTDEBUG
1144 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
1145#endif
1146 }
1147
1148 //we are reading event header
1149 if (rd[i].bufTyp <= 0)
1150 {
1151 //not yet sufficient data to take action
1152 if (rd[i].bufPos < minLen)
1153 continue;
1154
1155 //check if startflag correct; else shift block ....
1156 // FIXME: This is not enough... this combination of
1157 // bytes can be anywhere... at least the end bytes
1158 // must be checked somewhere, too.
1159 int k;
1160 for (k = 0; k < rd[i].bufPos - 1; k++)
1161 {
1162 if (rd[i].rBuf->B[k] == start.B[1] && rd[i].rBuf->B[k+1] == start.B[0])
1163 break;
1164 }
1165 rd[i].skip += k;
1166
1167 //no start of header found
1168 if (k >= rd[i].bufPos - 1)
1169 {
1170 rd[i].bufPos = 0;
1171 rd[i].bufLen = sizeof(PEVNT_HEADER);
1172 continue;
1173 }
1174
1175 if (k > 0)
1176 {
1177 rd[i].bufPos -= k;
1178 rd[i].bufLen += k;
1179 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1180 rd[i].bufPos);
1181 }
1182
1183 if (rd[i].bufPos < minLen)
1184 continue;
1185
1186 if (rd[i].skip > 0)
1187 {
1188 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
1189 rd[i].skip = 0;
1190 }
1191
1192 // TGB: This needs much more checks than just the first two bytes!
1193
1194 // Swap everything except start_package_flag.
1195 // It is to difficult to find out where it is used how,
1196 // but it doesn't really matter because it is not really
1197 // used anywehere else
1198 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1199 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1200 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1201 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1202 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1203
1204 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1205 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1206 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1207 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1208
1209 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1210 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1211 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1212
1213 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1214 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1215
1216 for (int s=24;s<24+NTemp+NDAC;s++)
1217 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1218
1219 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1220 rd[i].fadVers = rd[i].rBuf->S[2];
1221 rd[i].ftmTyp = rd[i].rBuf->S[5];
1222 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1223 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1224 rd[i].runID = rd[i].rBuf->I[11]==0 ? (int)g_actTime : rd[i].rBuf->I[11];
1225 rd[i].bufTyp = 1; //ready to read full record
1226 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1227
1228 const int fadBoard = rd[i].rBuf->S[12];
1229 debugHead(i, fadBoard, rd[i].rBuf);
1230
1231 continue;
1232 }
1233
1234 // are we reading data
1235
1236 // not yet all read
1237 if (rd[i].bufLen > 0)
1238 continue;
1239
1240 if (rd[i].rBuf->B[rd[i].fadLen - 1] != stop.B[0] ||
1241 rd[i].rBuf->B[rd[i].fadLen - 2] != stop.B[1])
1242 {
1243 //gi.evtErr++;
1244 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), expected %3d %3d, got %3d %3d",
1245 i, rd[i].evtID, rd[i].fadLen, stop.B[0], stop.B[1],
1246 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1247
1248 // ready to read next header
1249 rd[i].bufTyp = 0;
1250 rd[i].bufLen = sizeof(PEVNT_HEADER);
1251 rd[i].bufPos = 0;
1252
1253 continue;
1254 }
1255
1256 // int actid;
1257 // if (g_useFTM > 0)
1258 // actid = rd[i].evtID;
1259 // else
1260 // actid = rd[i].ftmID;
1261
1262 //get index into mBuffer for this event (create if needed)
1263 const int evID = mBufEvt(i);
1264
1265 // no free entry in mBuffer, retry later
1266 if (evID == -1)
1267 continue;
1268
1269 // We have a valid entry, but no memory has yet been allocated
1270 if (evID >= 0 && evtCtrl[evID].FADhead == NULL)
1271 {
1272 // Try to get memory from the big buffer
1273 evtCtrl[evID].FADhead = (PEVNT_HEADER*)TGB_Malloc();
1274 if (evtCtrl[evID].FADhead == NULL)
1275 {
1276 // If this works properly, this is a hack which can be removed, or
1277 // replaced by a signal or dim message
1278 if (rd[i].bufTyp==2)
1279 factPrintf(kError, 882, "malloc failed for event %d", evID);
1280 rd[i].bufTyp = 2;
1281 continue;
1282 }
1283
1284 // Initialice mBuffer[evID]->fEvent
1285 initEvent(evID);
1286
1287 // Some statistics
1288 gj.usdMem = tgb_inuse;
1289
1290 if (gj.usdMem > gj.maxMem)
1291 gj.maxMem = gj.usdMem;
1292
1293 gj.rateNew++;
1294 gj.bufTot++;
1295 if (gj.bufTot > gj.maxEvt)
1296 gj.maxEvt = gj.bufTot;
1297
1298 //register event in 'active list (reading)'
1299 //mBuffer[evID].evtCtrl_idx = evtCtrl_lastPtr;
1300
1301 //evtCtrl[evtCtrl_lastPtr].mBuffer_idx = evID;
1302 //evtCtrl[evtCtrl_lastPtr].evtStat = 0;
1303 //evtCtrl[evtCtrl.lastPtr].pcTime = g_actTime;
1304
1305 //evtCtrl_lastPtr++;
1306 //evtCtrl_lastPtr %= MAX_EVT * MAX_RUN;
1307 }
1308
1309 // ready to read next header
1310 rd[i].bufTyp = 0;
1311 rd[i].bufLen = sizeof(PEVNT_HEADER);
1312 rd[i].bufPos = 0;
1313
1314 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1315 if (evID < -1000)
1316 continue;
1317
1318 //we have a valid entry in mBuffer[]; fill it
1319 const int fadBoard = rd[i].rBuf->S[12];
1320 const int fadCrate = fadBoard>>8;
1321
1322 if (board != (fadCrate * 10 + (fadBoard&0xff)))
1323 {
1324 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1325 board, fadBoard, fadCrate, fadBoard&0xff);
1326 }
1327
1328 if (evtCtrl[evID].board[board] != -1)
1329 {
1330 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1331 evID, board, i, rd[i].fadLen,
1332 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1333 rd[i].rBuf->B[rd[i].fadLen - 2],
1334 rd[i].rBuf->B[rd[i].fadLen - 1]);
1335 continue; // Continue reading next header
1336 }
1337
1338 // Copy data from rd[i] to mBuffer[evID]
1339 copyData(i, evID, board);
1340
1341 // now we have stored a new board contents into Event structure
1342
1343 evtCtrl[evID].fEvent->NumBoards++;
1344 evtCtrl[evID].board[board] = board;
1345 evtCtrl[evID].nBoard++;
1346 evtCtrl[evID].evtStat = evtCtrl[evID].nBoard;
1347
1348 // have we already reported first (partial) event of this run ???
1349 if (evtCtrl[evID].nBoard==1 && evtCtrl[evID].runNum != actrun)
1350 {
1351 // Signal the fadctrl that a new run has been started
1352 gotNewRun(evtCtrl[evID].runNum, NULL);
1353
1354 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
1355 actrun, evtCtrl[evID].runNum, evtCtrl[evID].evNum);
1356
1357 for (int j=0; j<MAX_RUN; j++)
1358 {
1359 // Since we have started a new run, we know already when to close the
1360 // previous run in terms of number of events
1361 if (runCtrl[j].runId==actrun)
1362 runCtrl[j].maxEvt = runCtrl[j].lastEvt;
1363
1364 // We got the first part of this event, so this is
1365 // the number of events we expect for this run
1366 if (runCtrl[j].runId==evtCtrl[evID].runNum)
1367 runCtrl[j].lastEvt++;
1368 }
1369
1370 // Change 'actrun' the the new runnumber
1371 actrun = evtCtrl[evID].runNum;
1372 }
1373
1374 // event not yet complete
1375 if (evtCtrl[evID].nBoard < actBoards)
1376 continue;
1377
1378 // This is a non-ideal hack to lower the probability that
1379 // in mBufEvt the search for correct entry in runCtrl
1380 // will not return a super-old entry
1381 for (int ir=0; ir<MAX_RUN; ir++)
1382 {
1383 if (runCtrl[ir].runId != actrun && runCtrl[ir].fileId>0)
1384 runCtrl[ir].runId = 0;
1385 }
1386
1387 // Flag that the event is ready for processing
1388 evtCtrl[evID].evtStat = 99;
1389
1390 } // end for loop over all sockets
1391
1392 g_actTime = time (NULL);
1393 if (g_actTime <= gi_SecTime)
1394 {
1395 usleep(1);
1396 continue;
1397 }
1398
1399 gi_SecTime = g_actTime;
1400
1401 gj.bufNew = gj.bufEvt = 0;
1402
1403 //loop over all active events and flag those older than read-timeout
1404 //delete those that are written to disk ....
1405 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1406 {
1407 /*
1408 if (k0==evtCtrl_frstPtr && evtCtrl[k0].evtStat<0)
1409 {
1410 evtCtrl_frstPtr++;
1411 evtCtrl_frstPtr %= MAX_EVT * MAX_RUN;
1412
1413 // Continue because evtCtrl.evtStat[k0] must be <0
1414 continue;
1415 }*/
1416
1417 // Check the more likely case first: incomplete events
1418 if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<92)
1419 {
1420 gj.bufNew++; //incomplete event in Buffer
1421
1422 // Event has not yet timed out or was reported already
1423 const int id = k0;//vtCtrl[k0].mBuffer_idx;
1424
1425 if (evtCtrl[k0].evtStat>=90 || evtCtrl[id].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
1426 continue;
1427
1428 reportIncomplete(id);
1429
1430 //timeout for incomplete events
1431 evtCtrl[k0].evtStat = 91;
1432 gj.evtSkip++;
1433
1434 continue;
1435 }
1436
1437 // complete event in Buffer
1438 if (evtCtrl[k0].evtStat >= 95)
1439 gj.bufEvt++;
1440
1441 // Check the less likely case: 'useless' or 'delete'
1442 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000)
1443 {
1444 const int id = k0;//evtCtrl[k0].mBuffer_idx;
1445#ifdef EVTDEBUG
1446 factPrintf(kDebug, -1, "%5d free event buffer, nb=%3d", evtCtrl[id].evNum, evtCtrl[id].nBoard);
1447#endif
1448 mBufFree (id); //event written--> free memory
1449 evtCtrl[k0].evtStat = -1;
1450
1451 if (k0==evtCtrl_frstPtr)
1452 {
1453 evtCtrl_frstPtr++;
1454 evtCtrl_frstPtr %= MAX_EVT * MAX_RUN;
1455 }
1456 else
1457 factPrintf(kDebug, -1, "Freed a non-first slot");
1458
1459
1460 gj.evtWrite++;
1461 gj.rateWrite++;
1462 }
1463
1464 }
1465
1466 gj.deltaT = 1000; //temporary, must be improved
1467
1468 for (int ib = 0; ib < NBOARDS; ib++)
1469 gj.totBytes[ib] += gj.rateBytes[ib];
1470
1471 gj.totMem = tgb_memory;
1472
1473 if (gj.maxMem > gj.xxxMem)
1474 gj.xxxMem = gj.maxMem;
1475 if (gj.maxEvt > gj.xxxEvt)
1476 gj.xxxEvt = gj.maxEvt;
1477
1478 factStat (gj);
1479 factStatNew (gi);
1480 gj.rateNew = gj.rateWrite = 0;
1481 gj.maxMem = gj.usdMem;
1482 gj.maxEvt = gj.bufTot;
1483 for (int b = 0; b < NBOARDS; b++)
1484 gj.rateBytes[b] = 0;
1485
1486 } // while (g_runStat >= 0 && g_reset == 0)
1487
1488 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1489
1490 if (g_reset > 0)
1491 {
1492 gi_reset = g_reset;
1493 gi_resetR = gi_reset % 10; //shall we stop reading ?
1494 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1495 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1496 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1497 g_reset = 0;
1498 }
1499 else
1500 {
1501 gi_reset = 0;
1502 gi_resetR = g_runStat == -1 ? 1 : 7;
1503
1504 gi_resetS = 7; //close all sockets
1505 gi_resetW = 7; //close all files
1506 gi_resetX = 0;
1507
1508 //inform others we have to quit ....
1509 gi_runStat = -11; //inform all that no update to happen any more
1510 gj.readStat = -11; //inform all that no update to happen any more
1511 }
1512
1513 if (gi_resetS > 0)
1514 {
1515 //must close all open sockets ...
1516 factPrintf(kInfo, -1, "Close all sockets...");
1517
1518 for (int i = 0; i < MAX_SOCK; i++)
1519 {
1520 if (rd[i].sockStat != 0)
1521 continue;
1522
1523 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1524 if (i%7)
1525 continue;
1526
1527 gi_NumConnect[i / 7]-= cntsock ;
1528 //gi.numConn[i / 7]--;
1529 gj.numConn[i / 7]--;
1530 sockDef[i / 7] = 0; //flag ro recreate the sockets ...
1531 rd[i / 7].sockStat = -1; //and try to open asap
1532 }
1533 }
1534
1535
1536 if (gi_resetR > 0)
1537 {
1538 //flag all events as 'read finished'
1539 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1540 {
1541 if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 90)
1542 {
1543 evtCtrl[k0].evtStat = 91;
1544 //gi.evtSkp++;
1545 //gi.evtTot++;
1546 }
1547 }
1548
1549 //and clear all buffers (might have to wait until all others are done)
1550 // If minclear is 0, an event could be deleted while writing is still ongoing
1551 /*
1552 int minclear;
1553 if (gi_resetR == 1) {
1554 minclear = 900;
1555 factPrintf(kInfo, -1, "Drain all buffers ...");
1556 } else {
1557 minclear = 0;
1558 factPrintf(kInfo, -1, "Flush all buffers ...");
1559 }*/
1560 const int minclear = 900;
1561
1562 int numclear = 1;
1563 while (numclear > 0)
1564 {
1565 numclear = 0;
1566
1567 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1568 {
1569 if (evtCtrl[k0].evtStat > minclear)
1570 {
1571 const int id = k0;//evtCtrl[k0].mBuffer_idx;
1572#ifdef EVTDEBUG
1573 factPrintf(kDebug, -1, "ev %5d free event buffer, nb=%3d", evtCtrl[id].evNum, evtCtrl[id].nBoard);
1574#endif
1575 mBufFree (id); //event written--> free memory
1576 evtCtrl[k0].evtStat = -1;
1577
1578 if (k0==evtCtrl_frstPtr)
1579 {
1580 evtCtrl_frstPtr++;
1581 evtCtrl_frstPtr %= MAX_EVT * MAX_RUN;
1582 }
1583 else
1584 factPrintf(kDebug, -1, "Freed a non-first slot");
1585 }
1586 else
1587 if (evtCtrl[k0].evtStat > 0)
1588 numclear++; //writing is still ongoing...
1589
1590 /*
1591 if (k0 == evtCtrl_frstPtr && evtCtrl[k0].evtStat < 0)
1592 {
1593 evtCtrl_frstPtr++;
1594 evtCtrl_frstPtr %= MAX_EVT * MAX_RUN;
1595 }*/
1596 }
1597
1598 usleep(1);
1599 }
1600 }
1601
1602 if (gi_reset > 0)
1603 {
1604 if (gi_resetW > 0)
1605 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1606
1607 if (gi_resetX > 0)
1608 {
1609 struct timespec xwait;
1610 xwait.tv_sec = gi_resetX;
1611 xwait.tv_nsec = 0;
1612 nanosleep (&xwait, NULL);
1613 }
1614
1615 factPrintf(kInfo, -1, "Continue read Process ...");
1616 gi_reset = 0;
1617 goto START;
1618 }
1619
1620 factPrintf(kInfo, -1, "Exit read Process...");
1621
1622 factPrintf(kInfo, -1, "%ld Bytes flaged as in-use.", tgb_inuse);
1623
1624 gi_runStat = -99;
1625 gj.readStat = -99;
1626
1627 factStat (gj);
1628 factStatNew (gi);
1629
1630 return 0;
1631
1632} /*-----------------------------------------------------------------*/
1633
1634
1635void *subProc(void *thrid)
1636{
1637 const int64_t threadID = (int64_t)thrid;
1638
1639 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1640
1641 while (g_runStat > -2) //in case of 'exit' we still must process pending events
1642 {
1643 int numWait = 0;
1644 int numProc = 0;
1645
1646 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1647 {
1648 if (evtCtrl[k0].evtStat != 1000 + threadID)
1649 {
1650 if (evtCtrl[k0].evtStat < 1000 + threadID)
1651 numWait++;
1652
1653 continue;
1654 }
1655
1656 /*** if (evtCtrl.evtStat[k0] == 1000 + threadID) ****/
1657
1658 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)
1659
1660 if (gi_resetR<=1)
1661 {
1662 const int id = k0;//evtCtrl[k0].mBuffer_idx;
1663
1664 jret = subProcEvt(threadID, evtCtrl[id].FADhead,
1665 evtCtrl[id].fEvent, NULL/*mBuffer[id].buffer*/);
1666
1667
1668 if (jret <= threadID) {
1669 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
1670 jret = 5300;
1671 } else if (jret <= 0)
1672 jret = 9200 + threadID; // flag as 'to be deleted'
1673 else if (jret >= gi_maxProc)
1674 jret = 5200 + threadID; // flag as 'to be written'
1675 else
1676 jret = 1000 + jret; // flag for next proces
1677 }
1678
1679 evtCtrl[k0].evtStat = jret;
1680 numProc++;
1681 }
1682
1683 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1684 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
1685 return 0;
1686 }
1687
1688 //seems we have nothing to do, so sleep a little
1689 if (numProc == 0)
1690 usleep(1);
1691 }
1692
1693 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
1694
1695 return 0;
1696}
1697
1698/*-----------------------------------------------------------------*/
1699
1700
1701void *
1702procEvt (void *ptr)
1703{
1704/* *** main loop processing file, including SW-trigger */
1705 int status;
1706
1707 int lastRun = 0; //usually run from last event still valid
1708
1709// cpu_set_t mask;
1710// int cpu = 1; //process thread (will be several in final version)
1711
1712 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
1713
1714/* CPU_ZERO initializes all the bits in the mask to zero. */
1715// CPU_ZERO (&mask);
1716/* CPU_SET sets only the bit corresponding to cpu. */
1717// CPU_SET( 0 , &mask ); leave for system
1718// CPU_SET( 1 , &mask ); used by write process
1719// CPU_SET (2, &mask);
1720// CPU_SET (3, &mask);
1721// CPU_SET (4, &mask);
1722// CPU_SET (5, &mask);
1723// CPU_SET (6, &mask);
1724// CPU_SET( 7 , &mask ); used by read process
1725/* sched_setaffinity returns 0 in success */
1726// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1727// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1728// factOut (kWarn, -1, str);
1729// }
1730
1731
1732 pthread_t thread[100];
1733// int th_ret[100];
1734
1735 for (long long k = 0; k < gi_maxProc; k++) {
1736 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
1737 }
1738
1739 // in case of 'exit' we still must process pending events
1740 while (g_runStat > -2)
1741 {
1742 int numWait = 0;
1743 int numProc = 0;
1744
1745 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1746 {
1747 if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >= 1000)
1748 {
1749 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat< 90)
1750 numWait++;
1751
1752 continue;
1753 }
1754
1755 //we are asked to flush buffers asap
1756 if (gi_resetR > 1)
1757 {
1758 evtCtrl[k0].evtStat = 9991;
1759 continue;
1760 }
1761
1762 //-------- it is better to open the run already here, so call can be used to initialize
1763 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1764 const int id = k0;//evtCtrl[k0].mBuffer_idx;
1765
1766 const uint32_t irun = evtCtrl[id].runNum;
1767 const int32_t ievt = evtCtrl[id].evNum;
1768
1769 // Find entry in runCtrl which belongs to the event mBuffer[id]
1770 // (only check if there is a need to check)
1771 if (runCtrl[lastRun].runId != irun)
1772 {
1773 //check which fileID to use (or open if needed)
1774 int j;
1775 for (j=0;j<MAX_RUN; j++)
1776 if (runCtrl[j].runId == irun)
1777 break;
1778
1779 if (j>=MAX_RUN)
1780 {
1781 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id);
1782 // FIXME: What is the right action? (Flag event for deletion?)
1783 continue;
1784 }
1785
1786 lastRun = j;
1787 }
1788
1789 // File not yet open
1790 if (runCtrl[lastRun].fileId < 0)
1791 {
1792 //---- we need to open a new run ==> make sure all older runs are
1793 //---- finished and marked to be closed ....
1794 // This loop is unique to procEvt
1795 for (int j=0; j<MAX_RUN; j++)
1796 {
1797 if (runCtrl[j].fileId == 0)
1798 {
1799 runCtrl[j].procId = 2; //--> do no longer accept events for processing
1800
1801 //---- problem: processing still going on ==> must wait for closing ....
1802 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);
1803 runFinish1(runCtrl[j].runId);
1804 }
1805 }
1806
1807 RUN_HEAD actRun;
1808 actRun.Version = 1;
1809 actRun.RunType = -1; //to be adapted
1810 actRun.Nroi = evtCtrl[id].nRoi; //runCtrl[lastRun].roi0;
1811 actRun.NroiTM = evtCtrl[id].nRoiTM; //runCtrl[lastRun].roi8;
1812 actRun.RunTime = evtCtrl[id].pcTime[0]; //runCtrl[lastRun].firstTime;
1813 actRun.RunUsec = evtCtrl[id].pcTime[1]; //runCtrl[lastRun].firstUsec;
1814 actRun.NBoard = NBOARDS;
1815 actRun.NPix = NPIX;
1816 actRun.NTm = NTMARK;
1817
1818 memcpy(actRun.FADhead, evtCtrl[id].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
1819
1820 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
1821 if (runCtrl[lastRun].fileHd == NULL)
1822 {
1823 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
1824 runCtrl[lastRun].fileId = 91;
1825 runCtrl[lastRun].procId = 91; // Is not set in writeEvt
1826 continue;
1827 }
1828
1829 runCtrl[lastRun].fileId = 0;
1830 runCtrl[lastRun].procId = 0; // Is not set in writeEvt
1831
1832 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
1833 }
1834
1835 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1836 if (runCtrl[lastRun].procId == 0)
1837 {
1838 if (runCtrl[lastRun].closeTime < g_actTime ||
1839 runCtrl[lastRun].lastTime < g_actTime - 300 ||
1840 runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt)
1841 {
1842 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
1843 runFinish1 (runCtrl[lastRun].runId);
1844 runCtrl[lastRun].procId = 1;
1845 }
1846 }
1847
1848 // Skip event because of no active run
1849 if (runCtrl[lastRun].procId != 0)
1850 {
1851 evtCtrl[k0].evtStat = 9091;
1852 continue;
1853 }
1854
1855 //--------
1856 //--------
1857
1858 //const int roi = mBuffer[id].nRoi;
1859 //const int roiTM = mBuffer[id].nRoiTM;
1860
1861 //make sure unused pixels/tmarks are cleared to zero
1862 //ETIENNE don't reset it to zero as it is taken care of in DataWriteFits
1863 // if (roiTM == roi)
1864 // roiTM = 0;
1865 /*
1866 for (int ip=0; ip<NPIX; ip++)
1867 {
1868 if (mBuffer[id].fEvent->StartPix[ip] == -1)
1869 {
1870 const int dest = ip * roi;
1871 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
1872 }
1873 }
1874
1875 for (int it=0; it<NTMARK; it++)
1876 {
1877 if (mBuffer[id].fEvent->StartTM[it] == -1)
1878 {
1879 const int dest = it * roi + NPIX * roi;
1880 memset(&mBuffer[id].fEvent->Adc_Data[dest], 0, roi * 2);
1881 }
1882 }*/
1883
1884 //and set correct event header ; also check for consistency in event (not yet)
1885 evtCtrl[id].fEvent->Roi = evtCtrl[id].nRoi;
1886 evtCtrl[id].fEvent->RoiTM = evtCtrl[id].nRoiTM;
1887 evtCtrl[id].fEvent->EventNum = evtCtrl[id].evNum;
1888 evtCtrl[id].fEvent->TriggerNum = evtCtrl[id].trgNum;
1889 evtCtrl[id].fEvent->TriggerType = evtCtrl[id].trgTyp;
1890 evtCtrl[id].fEvent->Errors[0] = evtCtrl[id].Errors[0];
1891 evtCtrl[id].fEvent->Errors[1] = evtCtrl[id].Errors[1];
1892 evtCtrl[id].fEvent->Errors[2] = evtCtrl[id].Errors[2];
1893 evtCtrl[id].fEvent->Errors[3] = evtCtrl[id].Errors[3];
1894 evtCtrl[id].fEvent->SoftTrig = 0;
1895
1896
1897 for (int ib=0; ib<NBOARDS; ib++)
1898 {
1899 // board is not read
1900 if (evtCtrl[id].board[ib] == -1)
1901 {
1902 evtCtrl[id].FADhead[ib].start_package_flag = 0;
1903 evtCtrl[id].fEvent->BoardTime[ib] = 0;
1904 }
1905 else
1906 {
1907 evtCtrl[id].fEvent->BoardTime[ib] = evtCtrl[id].FADhead[ib].time;
1908 }
1909 }
1910
1911 const int rc = eventCheck(evtCtrl[id].runNum, evtCtrl[id].FADhead,
1912 evtCtrl[id].fEvent);
1913 //gi.procTot++;
1914 numProc++;
1915
1916 if (rc < 0)
1917 {
1918 evtCtrl[k0].evtStat = 9999; //flag event to be skipped
1919 //gi.procErr++;
1920 }
1921 else
1922 {
1923 evtCtrl[k0].evtStat = 1000;
1924 runCtrl[lastRun].procEvt++;
1925 }
1926 }
1927
1928 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1929 factPrintf(kInfo, -1, "Exit Processing Process ...");
1930 gp_runStat = -22; //==> we should exit
1931 gj.procStat = -22; //==> we should exit
1932 return 0;
1933 }
1934
1935 //seems we have nothing to do, so sleep a little
1936 if (numProc == 0)
1937 usleep(1);
1938
1939 gp_runStat = gi_runStat;
1940 gj.procStat = gj.readStat;
1941
1942 }
1943
1944 //we are asked to abort asap ==> must flag all remaining events
1945 // when gi_runStat claims that all events are in the buffer...
1946
1947 factPrintf(kInfo, -1, "Abort Processing Process ...");
1948
1949 for (int k = 0; k < gi_maxProc; k++) {
1950 pthread_join (thread[k], (void **) &status);
1951 }
1952
1953 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1954 {
1955 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
1956 evtCtrl[k0].evtStat = 9800; //flag event as 'processed'
1957 }
1958
1959 gp_runStat = -99;
1960 gj.procStat = -99;
1961
1962 return 0;
1963
1964} /*-----------------------------------------------------------------*/
1965
1966int
1967CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1968{
1969/* close run runId (all all runs if runId=0) */
1970/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1971 int j;
1972
1973
1974 if (runId == 0) {
1975 for (j = 0; j < MAX_RUN; j++) {
1976 if (runCtrl[j].fileId == 0) { //run is open
1977 runCtrl[j].closeTime = closeTime;
1978 runCtrl[j].maxEvt = maxEvt;
1979 }
1980 }
1981 return 0;
1982 }
1983
1984 for (j = 0; j < MAX_RUN; j++) {
1985 if (runCtrl[j].runId == runId) {
1986 if (runCtrl[j].fileId == 0) { //run is open
1987 runCtrl[j].closeTime = closeTime;
1988 runCtrl[j].maxEvt = maxEvt;
1989 return 0;
1990 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1991 runCtrl[j].closeTime = closeTime;
1992 runCtrl[j].maxEvt = maxEvt;
1993 return +1;
1994 } else { // run already closed
1995 return +2;
1996 }
1997 }
1998 } //we only reach here if the run was never created
1999 return -1;
2000
2001}
2002
2003void checkAndCloseRun(int j, int irun, int cond, int where)
2004{
2005 if (!cond &&
2006 runCtrl[j].closeTime >= g_actTime &&
2007 runCtrl[j].lastTime >= g_actTime - 300 &&
2008 runCtrl[j].maxEvt > runCtrl[j].actEvt)
2009 return;
2010
2011 //close run for whatever reason
2012 int ii = 0;
2013 if (cond)
2014 ii = 1;
2015 if (runCtrl[j].closeTime < g_actTime)
2016 ii |= 2; // = 2;
2017 if (runCtrl[j].lastTime < g_actTime - 300)
2018 ii |= 4; // = 3;
2019 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
2020 ii |= 8; // = 4;
2021
2022 if (runCtrl[j].procId == 0)
2023 {
2024 runFinish1(runCtrl[j].runId);
2025 runCtrl[j].procId = 92;
2026 }
2027
2028 runCtrl[j].closeTime = g_actTime - 1;
2029
2030 const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
2031 if (rc<0)
2032 {
2033 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
2034 where, runCtrl[j].runId, rc);
2035 runCtrl[j].fileId = 92+where*2;
2036 }
2037 else
2038 {
2039 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
2040 where, irun, ii);
2041 runCtrl[j].fileId = 93+where*2;
2042 }
2043}
2044
2045/*-----------------------------------------------------------------*/
2046
2047
2048void *writeEvt (void *ptr)
2049{
2050/* *** main loop writing event (including opening and closing run-files */
2051
2052// cpu_set_t mask;
2053// int cpu = 1; //write thread
2054
2055 factPrintf(kInfo, -1, "Starting write-thread");
2056
2057/* CPU_ZERO initializes all the bits in the mask to zero. */
2058// CPU_ZERO (&mask);
2059/* CPU_SET sets only the bit corresponding to cpu. */
2060// CPU_SET (cpu, &mask);
2061/* sched_setaffinity returns 0 in success */
2062// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
2063// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
2064// }
2065
2066 int lastRun = 0; //usually run from last event still valid
2067
2068 while (g_runStat > -2)
2069 {
2070 int numWrite = 0;
2071 int numWait = 0;
2072
2073 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
2074 {
2075 if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000)
2076 {
2077 if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000)
2078 numWait++;
2079
2080 continue;
2081 }
2082
2083 //we must drain the buffer asap
2084 if (gi_resetR > 1)
2085 {
2086 evtCtrl[k0].evtStat = 9904;
2087 continue;
2088 }
2089
2090 const int id = k0;//evtCtrl[k0].mBuffer_idx;
2091
2092 const uint32_t irun = evtCtrl[id].runNum;
2093 const int32_t ievt = evtCtrl[id].evNum;
2094
2095 // Find entry in runCtrl which belongs to the event mBuffer[id]
2096 // (only check if there is a need to check)
2097 if (runCtrl[lastRun].runId != irun)
2098 {
2099 //check which fileID to use (or open if needed)
2100 int j;
2101 for (j=0;j<MAX_RUN; j++)
2102 if (runCtrl[j].runId == irun)
2103 break;
2104
2105 if (j>=MAX_RUN)
2106 {
2107 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, id);
2108 // FIXME: What is the right action?
2109 continue;
2110 }
2111
2112 lastRun = j;
2113 }
2114
2115 // File not yet open
2116 if (runCtrl[lastRun].fileId < 0)
2117 {
2118 RUN_HEAD actRun;
2119 actRun.Version = 1;
2120 actRun.RunType = -1; //to be adapted
2121 actRun.Nroi = evtCtrl[id].nRoi; //runCtrl[lastRun].roi0;
2122 actRun.NroiTM = evtCtrl[id].nRoiTM; //runCtrl[lastRun].roi8;
2123 actRun.RunTime = evtCtrl[id].pcTime[0];//runCtrl[lastRun].firstTime;
2124 actRun.RunUsec = evtCtrl[id].pcTime[1];//runCtrl[lastRun].firstUsec;
2125 actRun.NBoard = NBOARDS;
2126 actRun.NPix = NPIX;
2127 actRun.NTm = NTMARK;
2128
2129 memcpy(actRun.FADhead, evtCtrl[id].FADhead, NBOARDS * sizeof (PEVNT_HEADER));
2130
2131 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
2132 if (runCtrl[lastRun].fileHd == NULL)
2133 {
2134 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
2135 runCtrl[lastRun].fileId = 91;
2136 continue;
2137 }
2138
2139 runCtrl[lastRun].fileId = 0;
2140 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
2141 }
2142
2143 if (runCtrl[lastRun].fileId > 0)
2144 {
2145 // There is an event but file is already closed
2146 /*
2147 if (runCtrl[j].fileId < 100)
2148 {
2149 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
2150 runCtrl[j].fileId += 100;
2151 }*/
2152
2153 evtCtrl[k0].evtStat = 9903;
2154 }
2155
2156 // File is open
2157 if (runCtrl[lastRun].fileId==0)
2158 {
2159 const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[id].fEvent,
2160 sizeof (evtCtrl[id]));
2161 if (rc >= 0)
2162 {
2163 // Sucessfully wrote event
2164 runCtrl[lastRun].lastTime = g_actTime;
2165 runCtrl[lastRun].actEvt++;
2166
2167 evtCtrl[k0].evtStat = 9901;
2168 }
2169 else
2170 {
2171 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
2172 evtCtrl[k0].evtStat = 9902;
2173 }
2174
2175 checkAndCloseRun(lastRun, irun, rc<0, 1);
2176 }
2177 }
2178/*
2179 //check if we should close a run (mainly when no event pending)
2180 //ETIENNE but first figure out which one is the latest run with a complete event.
2181 //i.e. max run Id and lastEvt >= 0
2182 //this condition is sufficient because all pending events were written already in the loop just above
2183 //actrun
2184 uint32_t lastStartedTime = 0;
2185 uint32_t runIdFound = 1;
2186
2187 //If we have an active run, look for its start time
2188 if (actrun != 0)
2189 {
2190 runIdfound = 0;
2191 for (int j=0;j<MAX_RUN;j++)
2192 {
2193 if (runCtrl[j].runId == actrun)
2194 {
2195 lastStartedTime = runCtrl[j].lastTime;
2196 runIdFound = 1;
2197 }
2198 }
2199 }
2200
2201 if (runIdFound == 0)
2202 {
2203 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
2204 }
2205
2206 //Also check if some files will never be opened
2207 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
2208 //I will never get run numbers for which no file is to be opened
2209 for (int j=0;j<MAX_RUN;j++)
2210 {
2211 if ((runCtrl[j].fileId < 0) &&
2212 (runCtrl[j].lastTime < lastStartedTime) &&
2213 (runCtrl[j].runId != 0))
2214 {
2215 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
2216 ;//TODO notify that this run will never be opened
2217 }
2218 }
2219 */
2220
2221 // Although the are no pending events, we have to check if a run should be closed (timeout)
2222 for (int j=0; j<MAX_RUN; j++)
2223 {
2224 if (runCtrl[j].fileId == 0)
2225 {
2226 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
2227 const int cond = /*runCtrl[j].lastTime < lastStartedTime &&*/ runCtrl[j].runId == 0;
2228 checkAndCloseRun(j, runCtrl[j].runId, cond, 2);
2229 }
2230 }
2231
2232 //seems we have nothing to do, so sleep a little
2233 if (numWrite == 0)
2234 usleep(1);
2235
2236 //nothing left to do
2237 if (gj.readStat < -10 && numWait == 0)
2238 {
2239 factPrintf(kInfo, -1, "Finish Write Process ...");
2240 gw_runStat = -22; //==> we should exit
2241 gj.writStat = -22; //==> we should exit
2242 break;
2243 }
2244
2245 gw_runStat = gi_runStat;
2246 gj.writStat = gj.readStat;
2247 }
2248
2249 factPrintf(kInfo, -1, "Close all open files ...");
2250 for (int j=0; j<MAX_RUN; j++)
2251 {
2252 if (runCtrl[j].fileId == 0)
2253 checkAndCloseRun(j, runCtrl[j].runId, 1, 3);
2254 }
2255
2256 gw_runStat = -99;
2257 gj.writStat = -99;
2258
2259 factPrintf(kInfo, -1, "Exit Writing Process ...");
2260
2261 return 0;
2262} /*-----------------------------------------------------------------*/
2263
2264
2265
2266
2267void
2268StartEvtBuild ()
2269{
2270
2271 int i, /*j,*/ imax, status/*, th_ret[50]*/;
2272 pthread_t thread[50];
2273 struct timespec xwait;
2274
2275 gi_runStat = gp_runStat = gw_runStat = 0;
2276 gj.readStat = gj.procStat = gj.writStat = 0;
2277
2278 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
2279
2280//initialize run control logics
2281 for (i = 0; i < MAX_RUN; i++) {
2282 runCtrl[i].runId = 0;
2283 runCtrl[i].fileId = -2;
2284 }
2285
2286//prepare for subProcesses
2287 gi_maxSize = g_maxSize;
2288 if (gi_maxSize <= 0)
2289 gi_maxSize = 1;
2290
2291 gi_maxProc = g_maxProc;
2292 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2293 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
2294 gi_maxProc = 1;
2295 }
2296//partially initialize event control logics
2297 evtCtrl_frstPtr = 0;
2298 evtCtrl_lastPtr = 0;
2299
2300//start all threads (more to come) when we are allowed to ....
2301 while (g_runStat == 0) {
2302 xwait.tv_sec = 0;
2303 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2304 nanosleep (&xwait, NULL);
2305 }
2306
2307 i = 0;
2308 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
2309 i++;
2310 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
2311 i++;
2312 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
2313 i++;
2314 imax = i;
2315
2316
2317#ifdef BILAND
2318 xwait.tv_sec = 30;;
2319 xwait.tv_nsec = 0; // sleep for ~20sec
2320 nanosleep (&xwait, NULL);
2321
2322 printf ("close all runs in 2 seconds\n");
2323
2324 CloseRunFile (0, time (NULL) + 2, 0);
2325
2326 xwait.tv_sec = 1;;
2327 xwait.tv_nsec = 0; // sleep for ~20sec
2328 nanosleep (&xwait, NULL);
2329
2330 printf ("setting g_runstat to -1\n");
2331
2332 g_runStat = -1;
2333#endif
2334
2335
2336//wait for all threads to finish
2337 for (i = 0; i < imax; i++) {
2338 /*j =*/ pthread_join (thread[i], (void **) &status);
2339 }
2340
2341} /*-----------------------------------------------------------------*/
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357 /*-----------------------------------------------------------------*/
2358 /*-----------------------------------------------------------------*/
2359 /*-----------------------------------------------------------------*/
2360 /*-----------------------------------------------------------------*/
2361 /*-----------------------------------------------------------------*/
2362
2363#ifdef BILAND
2364
2365int
2366subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2367 int8_t * buffer)
2368{
2369 printf ("called subproc %d\n", threadID);
2370 return threadID + 1;
2371}
2372
2373
2374
2375
2376 /*-----------------------------------------------------------------*/
2377 /*-----------------------------------------------------------------*/
2378 /*-----------------------------------------------------------------*/
2379 /*-----------------------------------------------------------------*/
2380 /*-----------------------------------------------------------------*/
2381
2382
2383
2384
2385FileHandle_t
2386runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2387{
2388 return 1;
2389};
2390
2391int
2392runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2393{
2394 return 1;
2395 usleep (10000);
2396 return 1;
2397}
2398
2399
2400//{ return 1; } ;
2401
2402int
2403runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2404{
2405 return 1;
2406};
2407
2408
2409
2410
2411int
2412eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2413{
2414 int i = 0;
2415
2416// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2417// for (i=0; i<NBOARDS; i++) {
2418// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2419// }
2420 return 0;
2421}
2422
2423
2424void
2425factStatNew (EVT_STAT gi)
2426{
2427 int i;
2428
2429//for (i=0;i<MAX_SOCK;i++) {
2430// printf("%4d",gi.numRead[i]);
2431// if (i%20 == 0 ) printf("\n");
2432//}
2433}
2434
2435void
2436gotNewRun (int runnr, PEVNT_HEADER * headers)
2437{
2438 printf ("got new run %d\n", runnr);
2439 return;
2440}
2441
2442void
2443factStat (GUI_STAT gj)
2444{
2445// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2446// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2447}
2448
2449
2450void
2451debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
2452 int state, uint32_t tsec, uint32_t tusec)
2453{
2454// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
2455}
2456
2457
2458
2459void
2460debugStream (int isock, void *buf, int len)
2461{
2462}
2463
2464void
2465debugHead (int i, int j, void *buf)
2466{
2467}
2468
2469
2470void
2471factOut (int severity, int err, char *message)
2472{
2473 static FILE *fd;
2474 static int file = 0;
2475
2476 if (file == 0) {
2477 printf ("open file\n");
2478 fd = fopen ("x.out", "w+");
2479 file = 999;
2480 }
2481
2482 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2483
2484 if (severity != kDebug)
2485 printf ("%3d %3d | %s\n", severity, err, message);
2486}
2487
2488
2489
2490int
2491main ()
2492{
2493 int i, b, c, p;
2494 char ipStr[100];
2495 struct in_addr IPaddr;
2496
2497 g_maxMem = 1024 * 1024; //MBytes
2498//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2499 g_maxMem = g_maxMem * 200; //100MBytes
2500
2501 g_maxProc = 20;
2502 g_maxSize = 30000;
2503
2504 g_runStat = 40;
2505
2506 i = 0;
2507
2508// version for standard crates
2509//for (c=0; c<4,c++) {
2510// for (b=0; b<10; b++) {
2511// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2512//
2513// inet_pton(PF_INET, ipStr, &IPaddr) ;
2514//
2515// g_port[i].sockAddr.sin_family = PF_INET;
2516// g_port[i].sockAddr.sin_port = htons(5000) ;
2517// g_port[i].sockAddr.sin_addr = IPaddr ;
2518// g_port[i].sockDef = 1 ;
2519// i++ ;
2520// }
2521//}
2522//
2523//version for PC-test *
2524 for (c = 0; c < 4; c++) {
2525 for (b = 0; b < 10; b++) {
2526 sprintf (ipStr, "10.0.%d.11", 128 + c);
2527 if (c < 2)
2528 sprintf (ipStr, "10.0.%d.11", 128);
2529 else
2530 sprintf (ipStr, "10.0.%d.11", 131);
2531// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2532
2533 inet_pton (PF_INET, ipStr, &IPaddr);
2534 p = 31919 + 100 * c + 10 * b;
2535
2536
2537 g_port[i].sockAddr.sin_family = PF_INET;
2538 g_port[i].sockAddr.sin_port = htons (p);
2539 g_port[i].sockAddr.sin_addr = IPaddr;
2540 g_port[i].sockDef = 1;
2541
2542 i++;
2543 }
2544 }
2545
2546
2547//g_port[17].sockDef =-1 ;
2548//g_actBoards-- ;
2549
2550 StartEvtBuild ();
2551
2552 return 0;
2553
2554}
2555#endif
Note: See TracBrowser for help on using the repository browser.