source: trunk/FACT++/src/EventBuilder.c@ 15493

Last change on this file since 15493 was 15490, checked in by tbretz, 12 years ago
Let the event buffer store the index of the corresponding runCtrl entry.
File size: 60.8 KB
Line 
1// #define EVTDEBUG
2
3#define NUMSOCK 1 //set to 7 for old configuration
4#define MAXREAD 65536 //64kB wiznet buffer
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdarg.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48extern void factReportIncomplete (uint64_t rep);
49
50extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
51
52
53extern void factStat (GUI_STAT gj);
54
55extern void factStatNew (EVT_STAT gi);
56
57extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
58
59extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
60 int8_t * buffer);
61
62extern void debugHead (int i, int j, void *buf);
63
64extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
65 int32_t runnr, int state, uint32_t tsec,
66 uint32_t tusec);
67extern void debugStream (int isock, void *buf, int len);
68
69int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
70
71int evtCtrl_frstPtr; // First event in queue
72int evtCtrl_lastPtr; // pointer to next free slot
73
74int g_maxProc;
75int gi_maxProc;
76
77uint g_actTime;
78int g_runStat;
79int g_reset;
80
81size_t g_maxMem; //maximum memory allowed for buffer
82
83FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
84
85uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
86
87//EVT_STAT gi;
88GUI_STAT gj;
89
90#define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
91#define MAX_RUN 8 // Number of concurrent runs
92
93EVT_CTRL evtCtrl[MAX_EVT]; //control of events during processing
94
95void factPrintf(int severity, int id, const char *fmt, ...)
96{
97 char str[1000];
98
99 va_list ap;
100 va_start(ap, fmt);
101 vsnprintf(str, 1000, fmt, ap);
102 va_end(ap);
103
104 factOut(severity, id, str);
105}
106
107
108#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
109#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
110typedef struct TGB_struct
111{
112 struct TGB_struct *prev;
113 void *mem;
114} TGB_entry;
115
116TGB_entry *tgb_last = NULL;
117uint64_t tgb_memory = 0;
118uint64_t tgb_inuse = 0;
119
120void *TGB_Malloc()
121{
122 // No free slot available, next alloc would exceed max memory
123 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
124 return NULL;
125
126 // We will return this amount of memory
127 tgb_inuse += MAX_TOT_MEM;
128
129 // No free slot available, allocate a new one
130 if (!tgb_last)
131 {
132 tgb_memory += MAX_TOT_MEM;
133 return malloc(MAX_TOT_MEM);
134 }
135
136 // Get the next free slot from the stack and return it
137 TGB_entry *last = tgb_last;
138
139 void *mem = last->mem;
140 tgb_last = last->prev;
141
142 free(last);
143
144 return mem;
145};
146
147void TGB_free(void *mem)
148{
149 // Add the last free slot to the stack
150 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
151
152 // FIXME: Really free memory if memory usuage exceeds g_maxMem
153
154 entry->prev = tgb_last;
155 entry->mem = mem;
156
157 tgb_last = entry;
158
159 // Decrease the amont of memory in use accordingly
160 tgb_inuse -= MAX_TOT_MEM;
161}
162
163RUN_CTRL runCtrl[MAX_RUN];
164
165/*
166*** Definition of rdBuffer to read in IP packets; keep it global !!!!
167 */
168
169typedef union
170{
171 uint8_t B[MAX_LEN];
172 uint16_t S[MAX_LEN / 2];
173 uint32_t I[MAX_LEN / 4];
174 uint64_t L[MAX_LEN / 8];
175} CNV_FACT;
176
177typedef struct
178{
179 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
180 int32_t bufPos; //next byte to read to the buffer next
181 int32_t bufLen; //number of bytes left to read
182 int32_t skip; //number of bytes skipped before start of event
183
184 int errCnt; //how often connect failed since last successful
185 int sockStat; //-1 if socket not yet connected , 99 if not exist
186 int socket; //contains the sockets
187
188 struct sockaddr_in SockAddr; //IP for each socket
189
190 int evtID; // event ID of event currently read
191 int runID; // run "
192 int ftmID; // event ID from FTM
193 uint fadLen; // FADlength of event currently read
194 int fadVers; // Version of FAD
195 int ftmTyp; // trigger type
196 int Port;
197
198 CNV_FACT *rBuf;
199
200} READ_STRUCT;
201
202/*-----------------------------------------------------------------*/
203
204
205/*-----------------------------------------------------------------*/
206
207
208int
209GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
210 READ_STRUCT * rs)
211{
212/*
213*** generate Address, create sockets and allocates readbuffer for it
214***
215*** if flag==0 generate socket and buffer
216*** <0 destroy socket and buffer
217*** >0 close and redo socket
218***
219*** sid : board*7 + port id
220 */
221
222 //close socket if open
223 if (rs->sockStat == 0)
224 {
225 if (close (rs->socket) > 0) {
226 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
227 } else {
228 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
229 }
230 }
231
232 rs->sockStat = 99;
233
234 if (flag < 0) {
235 free (rs->rBuf); //and never open again
236 rs->rBuf = NULL;
237 return 0;
238 }
239
240
241 if (flag == 0) { //generate address and buffer ...
242 rs->Port = port;
243 rs->SockAddr.sin_family = sockAddr->sin_family;
244 rs->SockAddr.sin_port = htons (port);
245 rs->SockAddr.sin_addr = sockAddr->sin_addr;
246
247 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
248 if (rs->rBuf == NULL) {
249 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
250 rs->sockStat = 77;
251 return -3;
252 }
253 }
254
255
256 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
257 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
258 rs->sockStat = 88;
259 return -2;
260 }
261
262 int optval = 1;
263 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
264 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
265 }
266 optval = 10; //start after 10 seconds
267 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
268 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
269 }
270 optval = 10; //do every 10 seconds
271 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
272 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
273 }
274 optval = 2; //close after 2 unsuccessful tries
275 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
276 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
277 }
278
279 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
280
281 rs->sockStat = -1; //try to (re)open socket
282 rs->errCnt = 0;
283 return 0;
284
285} /*-----------------------------------------------------------------*/
286
287 /*-----------------------------------------------------------------*/
288
289int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
290{
291 int xjr = -1;
292 int xkr = -1;
293
294 //points to the very first roi
295 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
296
297 roi[0] = ntohs(rbuf->S[roiPtr]);
298
299 for (int jr = 0; jr < 9; jr++)
300 {
301 roi[jr] = ntohs(rbuf->S[roiPtr]);
302
303 if (roi[jr]<0 || roi[jr]>1024)
304 {
305 factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
306 return 0;
307 }
308
309 // Check that the roi of pixels jr are compatible with the one of pixel 0
310 if (jr!=8 && roi[jr]!=roi[0])
311 {
312 xjr = jr;
313 break;
314 }
315
316 // Check that the roi of all other DRS chips on boards are compatible
317 for (int kr = 1; kr < 4; kr++)
318 {
319 const int kroi = ntohs(rbuf->S[roiPtr]);
320 if (kroi != roi[jr])
321 {
322 xjr = jr;
323 xkr = kr;
324 break;
325 }
326 roiPtr += kroi+4;
327 }
328 }
329
330 if (xjr>=0)
331 {
332 if (xkr<0)
333 factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
334 else
335 factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
336
337 return 0;
338 }
339
340 if (roi[8] < roi[0])
341 {
342 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
343 //gj.badRoiB++;
344 //gj.badRoi[b]++;
345 return 0;
346 }
347
348 return 1;
349}
350
351int mBufEvt(const READ_STRUCT *rs)
352{
353 int nRoi[9];
354 if (!checkRoiConsistency(rs->rBuf, nRoi))
355 return -9999;
356
357 const int evID = rs->evtID;
358 const uint runID = rs->runID;
359 const int trgTyp = rs->ftmTyp;
360 const int trgNum = rs->ftmID;
361 const int fadNum = rs->evtID;
362
363 const int beg = (evtCtrl_lastPtr + MAX_EVT - 1) % MAX_EVT;
364 const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT;
365
366 for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT)
367 {
368 // If the run is different, go on searching.
369 // We cannot stop searching if a lower run-id is found as in
370 // the case of the events, because theoretically, there
371 // can be the same run on two different days.
372 if (runID != evtCtrl[k].runNum)
373 continue;
374
375 // If the ID of the new event if higher than the last one stored
376 // in that run, we have to assign a new slot (leave the loop)
377 if (evID > evtCtrl[k].evNum/* && runID == evtCtrl[k].runNum*/)
378 break;
379
380 if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/)
381 continue;
382
383 // We have found an entry with the same runID and evtID
384 // Check if ROI is consistent
385 if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8])
386 {
387 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
388 evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]);
389 return -8201;
390 }
391
392 // count for inconsistencies
393 if (evtCtrl[k].trgNum != trgNum)
394 evtCtrl[k].Errors[0]++;
395 if (evtCtrl[k].fadNum != fadNum)
396 evtCtrl[k].Errors[1]++;
397 if (evtCtrl[k].trgTyp != trgTyp)
398 evtCtrl[k].Errors[2]++;
399
400 //everything seems fine so far ==> use this slot ....
401 return k;
402 }
403
404 // Check if the control structure still has space left
405 if (end-beg==1 || (end==0 && beg==MAX_EVT-1))
406 {
407 factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end);
408 return -1;
409 }
410
411 // If we have already queued at least one event,
412 // check the roi if the previous event
413 // already belongs to the same run.
414
415 // Get the runCtrl entry of the previous event.
416 // If none is in the queue (we are anyhow super fast)
417 // just get the correct entry from the runCtrl array.
418 int idx = evtCtrl[beg].runCtrl_idx;
419
420 // If there is an event in the queue and it has the same runID, we can use
421 // that event to check for the roi consistency throughout the run
422 if (evtCtrl_frstPtr!=evtCtrl_lastPtr && runCtrl[idx].runId==runID)
423 {
424 // Check if run already registered (old entries should have runId==-1)
425 if (runCtrl[idx].roi0 != nRoi[0] || runCtrl[idx].roi8 != nRoi[8])
426 {
427 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
428 runCtrl[idx].roi0, runCtrl[idx].roi8, nRoi[0], nRoi[8], runID, evID);
429 return -9301;
430 }
431 }
432
433 // If there is none in the queue, we have to search for the correct entry
434 if (evtCtrl_frstPtr==evtCtrl_lastPtr)
435 {
436 idx = -1;
437
438 for (int k=0; k<MAX_RUN; k++)
439 {
440 if (runCtrl[k].runId==runID)
441 {
442 idx = k;
443 break;
444 }
445 }
446 }
447
448 struct timeval tv;
449 gettimeofday(&tv, NULL);
450
451 const uint32_t tsec = tv.tv_sec;
452 const uint32_t tusec = tv.tv_usec;
453
454 // Run not yet registered, register run
455 // If we haven't found a corresponding entry in the queue, or the runId has changed
456 // we find the oldest empty entry in the runCtrl array and create a new runCtrl entry
457 if (idx<0 || runCtrl[idx].runId!=runID)
458 {
459 // If there is none in the queue or
460 idx = -1;
461
462 uint oldest = g_actTime + 1000;
463 for (int k=0; k<MAX_RUN; k++)
464 {
465 // This is just for sanity. We use the oldest free entry (until
466 // we have understood the concept and can use "just" a free entry
467 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
468 {
469 oldest = runCtrl[k].closeTime;
470 idx = k;
471 }
472 }
473
474 if (idx<0)
475 {
476 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
477 return -1001;
478 }
479
480 factPrintf(kInfo, 503, "New run %d (evt=%d, idx=%d) registered with roi=%d and roi_tm=%d",
481 runID, evID, idx, nRoi[0], nRoi[8]);
482
483 runCtrl[idx].runId = runID;
484 runCtrl[idx].roi0 = nRoi[0]; // FIXME: Make obsolete!
485 runCtrl[idx].roi8 = nRoi[8]; // FIXME: Make obsolete!
486 runCtrl[idx].fileId = -2;
487 runCtrl[idx].lastEvt = 1; // Number of events partially started to read
488 runCtrl[idx].actEvt = 0; // Number of written events (write)
489 runCtrl[idx].procEvt = 0; // Number of successfully checked events (checkEvent)
490 runCtrl[idx].maxEvt = 999999999; // max number events allowed
491 runCtrl[idx].lastTime = tsec; // Time when the last event was written
492 runCtrl[idx].closeTime = tsec + 3600 * 24; //max time allowed
493 }
494
495 // The new entry which is populated is the one lastPtr is pointing to
496 const int k = evtCtrl_lastPtr;
497
498 //flag all boards as unused
499 evtCtrl[k].nBoard = 0;
500 for (int b=0; b<NBOARDS; b++)
501 evtCtrl[k].board[b] = -1;
502
503 evtCtrl[k].runCtrl_idx = idx;
504 evtCtrl[k].pcTime[0] = tsec;
505 evtCtrl[k].pcTime[1] = tusec;
506 evtCtrl[k].nRoi = nRoi[0];
507 evtCtrl[k].nRoiTM = nRoi[8];
508 evtCtrl[k].evNum = evID;
509 evtCtrl[k].runNum = runID;
510 evtCtrl[k].fadNum = fadNum;
511 evtCtrl[k].trgNum = trgNum;
512 evtCtrl[k].trgTyp = trgTyp;
513 evtCtrl[k].Errors[0] = 0;
514 evtCtrl[k].Errors[1] = 0;
515 evtCtrl[k].Errors[2] = 0;
516 evtCtrl[k].Errors[3] = 0;
517 evtCtrl[k].fEvent = NULL;
518 evtCtrl[k].FADhead = NULL;
519
520 // -1: kInValid
521 // 0: kValid
522 // 1-40: kIncomplete
523 // 90: kIncompleteReported
524 // 100: kCompleteEventInBuffer
525 // 1000+x: kToBeProcessedByThreadX
526 // 5000: kToBeWritten
527 // 10000: kToBeDeleted
528
529 evtCtrl[k].evtStat = 0;
530
531 // This is dangerous, because theoretically, it can result is
532 // acessing invalid memory in another thread if this is split
533 // in two instructions. Must be done only _after_ the contents
534 // have been initialized
535 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % MAX_EVT;
536
537 return k;
538
539} /*-----------------------------------------------------------------*/
540
541
542void initEvent(int i)
543{
544 evtCtrl[i].fEvent = (EVENT*)((char*)evtCtrl[i].FADhead+MAX_HEAD_MEM);
545 memset(evtCtrl[i].fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi);
546
547 //flag all pixels as unused
548 for (int k = 0; k < NPIX; k++)
549 evtCtrl[i].fEvent->StartPix[k] = -1;
550
551 //flag all TMark as unused
552 for (int k = 0; k < NTMARK; k++)
553 evtCtrl[i].fEvent->StartTM[k] = -1;
554
555 evtCtrl[i].fEvent->NumBoards = 0;
556 evtCtrl[i].fEvent->SoftTrig = 0;
557 evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0];
558 evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1];
559 evtCtrl[i].fEvent->Roi = evtCtrl[i].nRoi;
560 evtCtrl[i].fEvent->RoiTM = evtCtrl[i].nRoiTM;
561 evtCtrl[i].fEvent->EventNum = evtCtrl[i].evNum;
562 evtCtrl[i].fEvent->TriggerNum = evtCtrl[i].trgNum;
563 evtCtrl[i].fEvent->TriggerType = evtCtrl[i].trgTyp;
564}
565
566
567int
568mBufFree (int i)
569{
570 TGB_free(evtCtrl[i].FADhead);
571
572 evtCtrl[i].fEvent = NULL;
573 evtCtrl[i].FADhead = NULL;
574
575 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
576 evtCtrl[i].runNum = 0;
577
578 gj.usdMem = tgb_inuse;
579
580 gj.bufTot--;
581
582 return 0;
583
584} /*-----------------------------------------------------------------*/
585
586uint64_t reportIncomplete(int id, const char *txt)
587{
588 factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
589 evtCtrl[id].runNum, evtCtrl[id].evNum, txt);
590
591 uint64_t report = 0;
592
593 char str[1000];
594
595 int ik=0;
596 for (int ib=0; ib<NBOARDS; ib++)
597 {
598 if (ib%10==0)
599 str[ik++] = '|';
600
601 const int jb = evtCtrl[id].board[ib];
602 if (jb>=0) // data received from that board
603 {
604 str[ik++] = '0'+(jb%10);
605 continue;
606 }
607
608 // FIXME: This is not synchronous... it reports
609 // accoridng to the current connection status, not w.r.t. to the
610 // one when the event was taken.
611 if (gi_NumConnect[ib]<=0) // board not connected
612 {
613 str[ik++] = 'x';
614 continue;
615 }
616
617 // data from this board lost
618 str[ik++] = '.';
619 report |= ((uint64_t)1)<<ib;
620 }
621
622 str[ik++] = '|';
623 str[ik] = 0;
624
625 factOut(kWarn, 601, str);
626
627 return report;
628}
629
630// i == board
631void copyData(CNV_FACT *rbuf, int i, int evID)
632{
633 // swapEventHeaderBytes: End of the header. to channels now
634 int eStart = 36;
635 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
636 {
637 rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
638 rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
639 rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
640 rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
641
642 eStart += 4+rbuf->S[eStart+2];//skip the pixel data
643 }
644
645 memcpy(&evtCtrl[evID].FADhead[i], rbuf, sizeof(PEVNT_HEADER));
646
647 int src = sizeof(PEVNT_HEADER) / 2;
648
649 // consistency of ROIs have been checked already (is it all correct?)
650 const int roi = rbuf->S[src+2];
651
652 // different sort in FAD board.....
653 for (int px = 0; px < 9; px++)
654 {
655 for (int drs = 0; drs < 4; drs++)
656 {
657 // pixH = rd[i].rBuf->S[src++]; // ID
658 src++;
659
660 const int pixC = rbuf->S[src++]; // start-cell
661 const int pixR = rbuf->S[src++]; // roi
662 //here we should check if pixH is correct ....
663
664 const int pixS = i * 36 + drs * 9 + px;
665 src++;
666
667 evtCtrl[evID].fEvent->StartPix[pixS] = pixC;
668
669 const int dest1 = pixS * roi;
670 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
671
672 src += pixR;
673
674 if (px == 8)
675 {
676 const int tmS = i * 4 + drs;
677
678 //and we have additional TM info
679 if (pixR > roi)
680 {
681 const int dest2 = tmS * roi + NPIX * roi;
682
683 const int srcT = src - roi;
684 evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
685
686 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
687 }
688 else
689 {
690 evtCtrl[evID].fEvent->StartTM[tmS] = -1;
691 }
692 }
693 }
694 }
695}
696
697
698void *readFAD (void *ptr)
699{
700/* *** main loop reading FAD data and sorting them to complete events */
701
702 factPrintf(kInfo, -1, "Start initializing (readFAD)");
703
704 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer
705
706 uint32_t actrun = 0;
707
708 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
709
710
711/* initialize run control logics */
712 for (int i = 0; i < MAX_RUN; i++) {
713 runCtrl[i].runId = 0;
714 runCtrl[i].fileId = -2;
715 }
716
717 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
718 gi_resetS = gi_resetR = 9;
719
720 int sockDef[NBOARDS]; //internal state of sockets
721 memset(sockDef, 0, NBOARDS*sizeof(int));
722
723 START:
724 evtCtrl_frstPtr = 0;
725 evtCtrl_lastPtr = 0;
726
727 //time in seconds
728 uint gi_SecTime = time(NULL);;
729 g_actTime = gi_SecTime;
730
731 const int cntsock = 8 - NUMSOCK ;
732
733 if (gi_resetS > 0) {
734 //make sure all sockets are preallocated as 'not exist'
735 for (int i = 0; i < NBOARDS; i++) {
736 rd[i].socket = -1;
737 rd[i].sockStat = 99;
738 }
739
740 for (int k = 0; k < NBOARDS; k++) {
741 gi_NumConnect[k] = 0;
742 //gi.numConn[k] = 0;
743 gj.numConn[k] = 0;
744 //gj.errConn[k] = 0;
745 gj.rateBytes[k] = 0;
746 gj.totBytes[k] = 0;
747 }
748
749 }
750
751 if (gi_resetR > 0)
752 {
753 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
754 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
755 gj.totMem = tgb_memory;
756 gj.bufNew = gj.bufEvt = 0;
757 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
758
759 // initialize mBuffer (mark all entries as unused\empty)
760 for (int i = 0; i < MAX_EVT; i++)
761 {
762 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
763 evtCtrl[i].runNum = 0;
764 evtCtrl[i].evtStat = -1;
765 }
766
767 evtCtrl_frstPtr = 0;
768 evtCtrl_lastPtr = 0;
769
770 factPrintf(kInfo, -1, "End initializing (readFAD)");
771 }
772
773
774 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
775
776 //loop until global variable g_runStat claims stop
777 while (g_runStat >= 0 && g_reset == 0)
778 {
779 gj.readStat = g_runStat;
780
781 for (int b = 0; b < NBOARDS; b++)
782 {
783 // Nothing has changed
784 if (g_port[b].sockDef == sockDef[b])
785 continue;
786
787 gi_NumConnect[b] = 0; //must close all connections
788 gj.numConn[b] = 0;
789
790 // s0 = 0: sockets to be defined and opened
791 // s0 = -1: sockets to be destroyed
792 // s0 = +1: sockets to be closed and reopened
793
794 int s0 = 0;
795 if (sockDef[b] != 0)
796 s0 = g_port[b].sockDef==0 ? -1 : +1;
797
798 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
799
800 GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
801
802 sockDef[b] = g_port[b].sockDef;
803 }
804
805 // count the number of active boards
806 int actBoards = 0;
807 for (int b = 0; b < NBOARDS; b++)
808 if (sockDef[b] > 0)
809 actBoards++;
810
811 //check all sockets if something to read
812 for (int i = 0; i < NBOARDS; i++)
813 {
814 // Do not try to connect this socket
815 if (rd[i].sockStat > 0)
816 continue;
817
818 if (rd[i].sockStat == -1)
819 {
820 //try to connect if not yet done
821 rd[i].sockStat = connect (rd[i].socket,
822 (struct sockaddr *) &rd[i].SockAddr,
823 sizeof (rd[i].SockAddr));
824 // Failed
825 if (rd[i].sockStat == -1)
826 {
827 rd[i].errCnt++;
828 usleep(25000);
829 continue;
830 }
831
832 // Success (rd[i].sockStat == 0)
833
834 if (sockDef[i] > 0)
835 {
836 rd[i].bufTyp = 0; // expect a header
837 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
838 }
839 else
840 {
841 rd[i].bufTyp = -1; // full data to be skipped
842 rd[i].bufLen = MAX_LEN; // huge for skipping
843 }
844
845 rd[i].bufPos = 0; // no byte read so far
846 rd[i].skip = 0; // start empty
847
848 gi_NumConnect[i] += cntsock;
849 gj.numConn[i]++;
850
851 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
852 }
853
854 // Do not read from this socket
855 if (rd[i].bufLen<0)
856 continue;
857
858 if (rd[i].bufLen>0)
859 {
860 const int32_t jrd =
861 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
862 rd[i].bufLen, MSG_DONTWAIT);
863
864 // recv failed
865 if (jrd<0)
866 {
867 // There was just nothing waiting
868 if (errno==EWOULDBLOCK || errno==EAGAIN)
869 continue;
870
871 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
872 continue;
873 }
874
875 // connection was closed ...
876 if (jrd==0)
877 {
878 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
879
880 const int s0 = sockDef[i] > 0 ? +1 : -1;
881 GenSock(s0, i, 0, NULL, &rd[i]);
882
883 gi_NumConnect[i]-= cntsock ;
884 gj.numConn[i]--;
885
886 continue;
887 }
888
889 gj.rateBytes[i] += jrd;
890
891 // are we skipping this board ...
892 if (rd[i].bufTyp < 0)
893 continue;
894
895 rd[i].bufPos += jrd; //==> prepare for continuation
896 rd[i].bufLen -= jrd;
897
898#ifdef EVTDEBUG
899 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
900#endif
901 }
902
903 //we are reading event header
904 if (rd[i].bufTyp <= 0)
905 {
906 //not yet sufficient data to take action
907 if (rd[i].bufPos < minLen)
908 continue;
909
910 //check if startflag correct; else shift block ....
911 // FIXME: This is not enough... this combination of
912 // bytes can be anywhere... at least the end bytes
913 // must be checked somewhere, too.
914 int k;
915 for (k = 0; k < rd[i].bufPos - 1; k++)
916 {
917 //start.S = 0xFB01;
918 if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
919 break;
920 }
921 rd[i].skip += k;
922
923 //no start of header found
924 if (k >= rd[i].bufPos - 1)
925 {
926 rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
927 rd[i].bufPos = 1;
928 rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
929 continue;
930 }
931
932 if (k > 0)
933 {
934 rd[i].bufPos -= k;
935 rd[i].bufLen += k;
936 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
937 rd[i].bufPos);
938 }
939
940 if (rd[i].bufPos < minLen)
941 continue;
942
943 if (rd[i].skip > 0)
944 {
945 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
946 rd[i].skip = 0;
947 }
948
949 // TGB: This needs much more checks than just the first two bytes!
950
951 // Swap everything except start_package_flag.
952 // It is to difficult to find out where it is used how,
953 // but it doesn't really matter because it is not really
954 // used anywehere else
955 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
956 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
957 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
958 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
959 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
960
961 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
962 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
963 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
964 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
965
966 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
967 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
968 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
969
970 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
971 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
972
973 for (int s=24;s<24+NTemp+NDAC;s++)
974 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
975
976 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
977 rd[i].fadVers = rd[i].rBuf->S[2];
978 rd[i].ftmTyp = rd[i].rBuf->S[5];
979 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
980 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
981 rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
982 rd[i].bufTyp = 1; //ready to read full record
983 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
984
985 const int fadBoard = rd[i].rBuf->S[12];
986 debugHead(i, fadBoard, rd[i].rBuf);
987
988 continue;
989 }
990
991 // are we reading data
992
993 // not yet all read
994 if (rd[i].bufLen > 0)
995 continue;
996
997 // stop.S = 0x04FE;
998 if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
999 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
1000 {
1001 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
1002 i, rd[i].evtID, rd[i].fadLen,
1003 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1004
1005 // ready to read next header
1006 rd[i].bufTyp = 0;
1007 rd[i].bufLen = sizeof(PEVNT_HEADER);
1008 rd[i].bufPos = 0;
1009
1010 continue;
1011 }
1012
1013 // int actid;
1014 // if (g_useFTM > 0)
1015 // actid = rd[i].evtID;
1016 // else
1017 // actid = rd[i].ftmID;
1018
1019 //get index into mBuffer for this event (create if needed)
1020 const int idx = mBufEvt(&rd[i]);
1021
1022 // no free entry in mBuffer, retry later
1023 if (idx == -1)
1024 continue;
1025
1026 // We have a valid entry, but no memory has yet been allocated
1027 if (idx >= 0 && evtCtrl[idx].FADhead == NULL)
1028 {
1029 // Try to get memory from the big buffer
1030 evtCtrl[idx].FADhead = (PEVNT_HEADER*)TGB_Malloc();
1031 if (evtCtrl[idx].FADhead == NULL)
1032 {
1033 // If this works properly, this is a hack which can be removed, or
1034 // replaced by a signal or dim message
1035 if (rd[i].bufTyp==2)
1036 factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evtCtrl[idx].evNum, evtCtrl[idx].runNum);
1037 rd[i].bufTyp = 2;
1038 continue;
1039 }
1040
1041 // Initialise contents of mBuffer[evID]->fEvent
1042 initEvent(idx);
1043
1044 // Some statistics
1045 gj.usdMem = tgb_inuse;
1046
1047 if (gj.usdMem > gj.maxMem)
1048 gj.maxMem = gj.usdMem;
1049
1050 gj.rateNew++;
1051 gj.bufTot++;
1052 if (gj.bufTot > gj.maxEvt)
1053 gj.maxEvt = gj.bufTot;
1054 }
1055
1056 // ready to read next header
1057 rd[i].bufTyp = 0;
1058 rd[i].bufLen = sizeof(PEVNT_HEADER);
1059 rd[i].bufPos = 0;
1060
1061 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1062 if (idx < -1000)
1063 continue;
1064
1065 //we have a valid entry in mBuffer[]; fill it
1066 const int fadBoard = rd[i].rBuf->S[12];
1067 const int fadCrate = fadBoard>>8;
1068
1069 if (i != (fadCrate * 10 + (fadBoard&0xff)))
1070 {
1071 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1072 i, fadBoard, fadCrate, fadBoard&0xff);
1073 }
1074
1075 if (evtCtrl[idx].board[i] != -1)
1076 {
1077 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1078 evtCtrl[idx].evNum, i, i, rd[i].fadLen,
1079 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1080 rd[i].rBuf->B[rd[i].fadLen - 2],
1081 rd[i].rBuf->B[rd[i].fadLen - 1]);
1082 continue; // Continue reading next header
1083 }
1084
1085 // Copy data from rd[i] to mBuffer[evID]
1086 copyData(rd[i].rBuf, i, idx);
1087
1088 // now we have stored a new board contents into Event structure
1089
1090 evtCtrl[idx].fEvent->NumBoards++;
1091 evtCtrl[idx].board[i] = i;
1092 evtCtrl[idx].nBoard++;
1093 evtCtrl[idx].evtStat = evtCtrl[idx].nBoard;
1094
1095 // have we already reported first (partial) event of this run ???
1096 if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun)
1097 {
1098 // Signal the fadctrl that a new run has been started
1099 gotNewRun(evtCtrl[idx].runNum, NULL);
1100
1101 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
1102 actrun, evtCtrl[idx].runNum, evtCtrl[idx].evNum);
1103
1104 for (int j=0; j<MAX_RUN; j++)
1105 {
1106 // Since we have started a new run, we know already when to close the
1107 // previous run in terms of number of events
1108 if (runCtrl[j].runId==actrun)
1109 runCtrl[j].maxEvt = runCtrl[j].lastEvt;
1110
1111 // We got the first part of this event, so this is
1112 // the number of events we expect for this run
1113 if (runCtrl[j].runId==evtCtrl[idx].runNum)
1114 runCtrl[j].lastEvt++;
1115 }
1116
1117 // Change 'actrun' the the new runnumber
1118 actrun = evtCtrl[idx].runNum;
1119 }
1120
1121 // event not yet complete
1122 if (evtCtrl[idx].nBoard < actBoards)
1123 continue;
1124
1125 // This is a non-ideal hack to lower the probability that
1126 // in mBufEvt the search for correct entry in runCtrl
1127 // will not return a super-old entry. I don't want
1128 // to manipulate that in another thread.
1129 for (int ir=0; ir<MAX_RUN; ir++)
1130 {
1131 if (runCtrl[ir].runId != actrun && runCtrl[ir].fileId>0)
1132 runCtrl[ir].runId = 0;
1133 }
1134
1135 const int beg = (idx + MAX_EVT - 1) % MAX_EVT;
1136 const int end = (evtCtrl_frstPtr + MAX_EVT - 1) % MAX_EVT;
1137
1138 // we have just completed an event... so all previous events
1139 // must have been completed already. If they are not, there
1140 // is no need to wait for the timeout, because they will never
1141 // get completed. We can just ensure that if we check for the previous
1142 // event to be complete every time we receive a new complete event.
1143 // If we find an incomplete one, we remove all consecutive
1144 // incomplete ones.
1145 for (int k=beg; k!=end; k=(k+MAX_EVT-1)%MAX_EVT)
1146 {
1147 // We are done if we find a complete or fully processed event
1148 if (evtCtrl[k].evtStat>=100)
1149 break;
1150
1151 // we do not call factReportIncomplete here, because by starting a new
1152 // run and having received the first complete event from that run
1153 // the user has expressed that the old events are obsolste now
1154 // and the run will be closed anyway
1155 if (evtCtrl[k].evtStat>=0 && evtCtrl[k].evtStat<90)
1156 {
1157 reportIncomplete(k, "expired");
1158 evtCtrl[k].evtStat = 90;
1159 }
1160 }
1161
1162 // Flag that the event is ready for processing
1163 evtCtrl[idx].evtStat = 100;
1164
1165 } // end for loop over all sockets
1166
1167 g_actTime = time (NULL);
1168 if (g_actTime <= gi_SecTime)
1169 {
1170 usleep(1);
1171 continue;
1172 }
1173 gi_SecTime = g_actTime;
1174
1175 gj.bufNew = 0;
1176
1177 //loop over all active events and flag those older than read-timeout
1178 //delete those that are written to disk ....
1179
1180 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
1181
1182 // This could be improved having the pointer which separates the queue with
1183 // the incomplete events from the queue with the complete events
1184 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
1185 {
1186 // Check the more likely case first: incomplete events
1187 if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<100)
1188 {
1189 gj.bufNew++; //incomplete event in Buffer
1190
1191 // Event has not yet timed out or was reported already
1192 if (evtCtrl[k0].evtStat==90 || evtCtrl[k0].pcTime[0]>=g_actTime - 30)
1193 continue;
1194
1195 // This will result in the emission of a dim service.
1196 // It doesn't matter if that takes comparably long,
1197 // because we have to stop the run anyway.
1198 const uint64_t rep = reportIncomplete(k0, "timeout");
1199 factReportIncomplete(rep);
1200
1201 //timeout for incomplete events
1202 evtCtrl[k0].evtStat = 90;
1203 gj.evtSkip++;
1204
1205 continue;
1206 }
1207
1208 // Check the less likely case: 'useless' or 'delete'
1209 // evtState==0 can happen if the event was initialized (some data received)
1210 // but the data did not make sense (e.g. inconsistent rois)
1211 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat == 10000)
1212 {
1213 mBufFree(k0); //event written--> free memory
1214 evtCtrl[k0].evtStat = -1;
1215
1216 gj.evtWrite++;
1217 gj.rateWrite++;
1218 }
1219
1220 // Remove leading invalidated slots from queue
1221 if (evtCtrl[evtCtrl_frstPtr].evtStat==-1 && k0==evtCtrl_frstPtr)
1222 {
1223 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
1224 continue;
1225 }
1226 }
1227
1228 // The number of complete events in the buffer is the total number of
1229 // events in the buffer minus the number of incomplete events.
1230 gj.bufEvt = count - gj.bufNew;
1231
1232 gj.deltaT = 1000; //temporary, must be improved
1233
1234 for (int ib = 0; ib < NBOARDS; ib++)
1235 gj.totBytes[ib] += gj.rateBytes[ib];
1236
1237 gj.totMem = tgb_memory;
1238
1239 if (gj.maxMem > gj.xxxMem)
1240 gj.xxxMem = gj.maxMem;
1241 if (gj.maxEvt > gj.xxxEvt)
1242 gj.xxxEvt = gj.maxEvt;
1243
1244 factStat (gj);
1245 //factStatNew (gi);
1246 gj.rateNew = gj.rateWrite = 0;
1247 gj.maxMem = gj.usdMem;
1248 gj.maxEvt = gj.bufTot;
1249 for (int b = 0; b < NBOARDS; b++)
1250 gj.rateBytes[b] = 0;
1251
1252 } // while (g_runStat >= 0 && g_reset == 0)
1253
1254 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1255
1256 if (g_reset > 0)
1257 {
1258 gi_reset = g_reset;
1259 gi_resetR = gi_reset % 10; //shall we stop reading ?
1260 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1261 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1262 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1263 g_reset = 0;
1264 }
1265 else
1266 {
1267 gi_reset = 0;
1268 gi_resetR = g_runStat == -1 ? 1 : 7;
1269
1270 gi_resetS = 7; //close all sockets
1271 gi_resetW = 7; //close all files
1272 gi_resetX = 0;
1273
1274 //inform others we have to quit ....
1275 gj.readStat = -11; //inform all that no update to happen any more
1276 }
1277
1278 if (gi_resetS > 0)
1279 {
1280 //must close all open sockets ...
1281 factPrintf(kInfo, -1, "Close all sockets...");
1282
1283 for (int i = 0; i < NBOARDS; i++)
1284 {
1285 if (rd[i].sockStat != 0)
1286 continue;
1287
1288 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1289
1290 gi_NumConnect[i]-= cntsock ;
1291 gj.numConn[i]--;
1292 sockDef[i] = 0; //flag ro recreate the sockets ...
1293 rd[i].sockStat = -1; //and try to open asap
1294 }
1295 }
1296
1297
1298 if (gi_resetR > 0)
1299 {
1300 //and clear all buffers (might have to wait until all others are done)
1301 while (evtCtrl_frstPtr!=evtCtrl_lastPtr)
1302 {
1303 const int k0=evtCtrl_frstPtr;
1304
1305 // flag incomplete events as 'read finished'
1306 // We cannot just detele all events, because some might currently being processed,
1307 // so we have to wait until the processing thread currently processing the event
1308 // signals that the event can be deleted. (Note, that there are currently never
1309 // two threads processing the same event at the same time)
1310 if ((evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat<90) || evtCtrl[k0].evtStat==10000)
1311 {
1312 mBufFree(k0); //event written--> free memory
1313 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % MAX_EVT;
1314 evtCtrl[k0].evtStat = -1;
1315 }
1316
1317 usleep(1);
1318 }
1319 }
1320
1321 if (gi_reset > 0)
1322 {
1323 if (gi_resetW > 0)
1324 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1325
1326 if (gi_resetX > 0)
1327 {
1328 struct timespec xwait;
1329 xwait.tv_sec = gi_resetX;
1330 xwait.tv_nsec = 0;
1331 nanosleep (&xwait, NULL);
1332 }
1333
1334 factPrintf(kInfo, -1, "Continue read Process ...");
1335 gi_reset = 0;
1336 goto START;
1337 }
1338
1339 factPrintf(kInfo, -1, "Exit read Process...");
1340
1341 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
1342
1343 gj.readStat = -99;
1344
1345 factStat (gj);
1346 //factStatNew (gi);
1347
1348 return 0;
1349
1350} /*-----------------------------------------------------------------*/
1351
1352
1353void *subProc(void *thrid)
1354{
1355 const int64_t threadID = (int64_t)thrid;
1356
1357 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1358
1359 while (g_runStat > -2) //in case of 'exit' we still must process pending events
1360 {
1361 int numWait = 0;
1362
1363 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
1364 {
1365 // This is a threading issue... the evtStat might have been invalid
1366 // but the frstPtr is not yet updated
1367 if (evtCtrl[k0].evtStat==-1)
1368 continue;
1369
1370 // If we find the first event still waiting for processing
1371 // there will be only unprocessed events after this one in the queue
1372 if (evtCtrl[k0].evtStat<1000+threadID)
1373 {
1374 numWait = 1;
1375 break;
1376 }
1377
1378 // If the event was processed already, skip it
1379 // We could replace that to a moving pointer pointing to the first
1380 // non-processed event
1381 if (evtCtrl[k0].evtStat!=1000+threadID)
1382 continue;
1383
1384 const int jret = subProcEvt(threadID, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent, 0);
1385
1386 if (jret>0 && jret<=threadID)
1387 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
1388
1389 if (jret<=threadID)
1390 {
1391 evtCtrl[k0].evtStat = 10000; // flag as 'to be deleted'
1392 continue;
1393 }
1394
1395 if (jret>=gi_maxProc)
1396 {
1397 evtCtrl[k0].evtStat = 5000; // flag as 'to be written'
1398 continue;
1399 }
1400
1401 evtCtrl[k0].evtStat = 1000 + jret; // flag for next proces
1402 }
1403
1404 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1405 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
1406 return 0;
1407 }
1408
1409 usleep(1);
1410 }
1411
1412 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
1413
1414 return 0;
1415}
1416
1417/*-----------------------------------------------------------------*/
1418
1419
1420void *
1421procEvt (void *ptr)
1422{
1423/* *** main loop processing file, including SW-trigger */
1424 int status;
1425
1426 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
1427
1428 pthread_t thread[100];
1429// int th_ret[100];
1430
1431 for (long long k = 0; k < gi_maxProc; k++) {
1432 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
1433 }
1434
1435 // in case of 'exit' we still must process pending events
1436 while (g_runStat > -2)
1437 {
1438 int numWait = 0;
1439
1440 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
1441 {
1442 // This is a threading issue... the evtStat might have been invalid
1443 // but the frstPtr is not yet updated
1444 if (evtCtrl[k0].evtStat==-1)
1445 continue;
1446
1447 // If we find the first incomplete event which is not supposed to
1448 // be processed, there are only more incomplete events in the queue
1449 if (evtCtrl[k0].evtStat<90)
1450 {
1451 numWait = 1;
1452 break;
1453 }
1454
1455 // If the event was processed already, skip it.
1456 // We could replace that to a moving pointer pointing to the first
1457 // non-processed event
1458 if (evtCtrl[k0].evtStat>=1000)
1459 continue;
1460
1461 //-------- it is better to open the run already here, so call can be used to initialize
1462 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1463 const uint32_t irun = evtCtrl[k0].runNum;
1464 const int32_t ievt = evtCtrl[k0].evNum;
1465
1466 const int idx = evtCtrl[k0].runCtrl_idx;
1467 if (runCtrl[idx].runId!=irun)
1468 {
1469 //factPrintf(kFatal, 901, "procEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
1470 // FIXME: What is the right action? (Flag event for deletion?)
1471 continue;
1472 }
1473
1474 // File not yet open
1475 if (runCtrl[idx].fileId < 0)
1476 {
1477 RUN_HEAD actRun;
1478 actRun.Version = 1;
1479 actRun.RunType = -1; //to be adapted
1480 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0;
1481 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8;
1482 actRun.RunTime = evtCtrl[k0].pcTime[0]; //runCtrl[lastRun].firstTime;
1483 actRun.RunUsec = evtCtrl[k0].pcTime[1]; //runCtrl[lastRun].firstUsec;
1484 actRun.NBoard = NBOARDS;
1485 actRun.NPix = NPIX;
1486 actRun.NTm = NTMARK;
1487
1488 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
1489
1490 runCtrl[idx].fileHd = runOpen(irun, &actRun, sizeof (actRun));
1491 if (runCtrl[idx].fileHd == NULL)
1492 {
1493 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (idx=%d, evt=%d, runOpen failed)", irun, idx, ievt);
1494 runCtrl[idx].fileId = 91;
1495 continue;
1496 }
1497
1498 runCtrl[idx].fileId = 0;
1499
1500 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (idx=%d, evt=%d)", irun, idx, ievt);
1501 }
1502
1503 //--------
1504 //--------
1505
1506 //and set correct event header ; also check for consistency in event (not yet)
1507 evtCtrl[k0].fEvent->Errors[0] = evtCtrl[k0].Errors[0];
1508 evtCtrl[k0].fEvent->Errors[1] = evtCtrl[k0].Errors[1];
1509 evtCtrl[k0].fEvent->Errors[2] = evtCtrl[k0].Errors[2];
1510 evtCtrl[k0].fEvent->Errors[3] = evtCtrl[k0].Errors[3];
1511
1512 for (int ib=0; ib<NBOARDS; ib++)
1513 {
1514 // board is not read
1515 if (evtCtrl[k0].board[ib] == -1)
1516 {
1517 evtCtrl[k0].FADhead[ib].start_package_flag = 0;
1518 evtCtrl[k0].fEvent->BoardTime[ib] = 0;
1519 }
1520 else
1521 {
1522 evtCtrl[k0].fEvent->BoardTime[ib] = evtCtrl[k0].FADhead[ib].time;
1523 }
1524 }
1525
1526 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead, evtCtrl[k0].fEvent);
1527 if (rc < 0)
1528 {
1529 evtCtrl[k0].evtStat = 10000; // flag event to be deleted
1530 }
1531 else
1532 {
1533 evtCtrl[k0].evtStat = 1000; // flag 'start processing'
1534 runCtrl[idx].procEvt++;
1535 }
1536 }
1537
1538 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1539 factPrintf(kInfo, -1, "Exit Processing Process ...");
1540 gj.procStat = -22; //==> we should exit
1541 return 0;
1542 }
1543
1544 usleep(1);
1545
1546 gj.procStat = gj.readStat;
1547 }
1548
1549 //we are asked to abort asap ==> must flag all remaining events
1550 // when gi_runStat claims that all events are in the buffer...
1551
1552 factPrintf(kInfo, -1, "Abort Processing Process ...");
1553
1554 for (int k = 0; k < gi_maxProc; k++) {
1555 pthread_join (thread[k], (void **) &status);
1556 }
1557
1558 gj.procStat = -99;
1559
1560 return 0;
1561
1562} /*-----------------------------------------------------------------*/
1563
1564int
1565CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1566{
1567/* close run runId (all all runs if runId=0) */
1568/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1569 int j;
1570
1571
1572 if (runId == 0) {
1573 for (j = 0; j < MAX_RUN; j++) {
1574 if (runCtrl[j].fileId == 0) { //run is open
1575 runCtrl[j].closeTime = closeTime;
1576 runCtrl[j].maxEvt = maxEvt;
1577 }
1578 }
1579 return 0;
1580 }
1581
1582 for (j = 0; j < MAX_RUN; j++) {
1583 if (runCtrl[j].runId == runId) {
1584 if (runCtrl[j].fileId == 0) { //run is open
1585 runCtrl[j].closeTime = closeTime;
1586 runCtrl[j].maxEvt = maxEvt;
1587 return 0;
1588 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1589 runCtrl[j].closeTime = closeTime;
1590 runCtrl[j].maxEvt = maxEvt;
1591 return +1;
1592 } else { // run already closed
1593 return +2;
1594 }
1595 }
1596 } //we only reach here if the run was never created
1597 return -1;
1598
1599}
1600
1601void checkAndCloseRun(int j, int irun, int cond, int where)
1602{
1603 if (!cond &&
1604 runCtrl[j].closeTime >= g_actTime &&
1605 runCtrl[j].lastTime >= g_actTime - 300 &&
1606 runCtrl[j].maxEvt > runCtrl[j].actEvt)
1607 return;
1608
1609 //close run for whatever reason
1610 int ii = 0;
1611 if (cond)
1612 ii = 1;
1613 if (runCtrl[j].closeTime < g_actTime)
1614 ii |= 2; // = 2;
1615 if (runCtrl[j].lastTime < g_actTime - 300)
1616 ii |= 4; // = 3;
1617 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
1618 ii |= 8; // = 4;
1619
1620 runCtrl[j].closeTime = g_actTime - 1;
1621
1622 const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
1623 if (rc<0)
1624 {
1625 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
1626 where, runCtrl[j].runId, rc);
1627 runCtrl[j].fileId = 92+where*2;
1628 }
1629 else
1630 {
1631 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
1632 where, irun, ii);
1633 runCtrl[j].fileId = 93+where*2;
1634 }
1635}
1636
1637/*-----------------------------------------------------------------*/
1638
1639
1640void *writeEvt (void *ptr)
1641{
1642/* *** main loop writing event (including opening and closing run-files */
1643
1644 factPrintf(kInfo, -1, "Starting write-thread");
1645
1646 while (g_runStat > -2)
1647 {
1648 //int numWrite = 0;
1649 int numWait = 0;
1650
1651 // Note that the current loop does not at all gurantee that
1652 // the events are written in the correct order.
1653 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT)
1654 {
1655 // This is a threading issue... the evtStat might have been invalid
1656 // but the frstPtr is not yet updated
1657 if (evtCtrl[k0].evtStat==-1)
1658 continue;
1659
1660 // If we find the first non-written event which is not supposed to
1661 // be written, there are only more incomplete events in the queue
1662 if (evtCtrl[k0].evtStat<5000)
1663 {
1664 numWait = 1;
1665 break;
1666 }
1667
1668 // If the event was written already already, skip it
1669 // We could replace that to a moving pointer pointing to the first
1670 // non-processed event
1671 if (evtCtrl[k0].evtStat!=5000)
1672 continue;
1673
1674 const uint32_t irun = evtCtrl[k0].runNum;
1675 const int32_t ievt = evtCtrl[k0].evNum;
1676
1677 const int idx = evtCtrl[k0].runCtrl_idx;
1678
1679 if (runCtrl[idx].runId!=irun)
1680 {
1681 //factPrintf(kFatal, 901, "writeEvt: runCtrl entry for run %d vanished (evt=%d)", irun, ievt);
1682 // FIXME: What is the right action? (Flag event for deletion?)
1683 continue;
1684 }
1685
1686 // File is open
1687 if (runCtrl[idx].fileId==0)
1688 {
1689 const int rc = runWrite(runCtrl[idx].fileHd, evtCtrl[k0].fEvent, 0);
1690 if (rc >= 0)
1691 {
1692 // Sucessfully wrote event
1693 runCtrl[idx].lastTime = g_actTime;
1694 runCtrl[idx].actEvt++;
1695 }
1696 else
1697 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
1698
1699 checkAndCloseRun(idx, irun, rc<0, 1);
1700 }
1701
1702 evtCtrl[k0].evtStat = 10000; // event written (or has to be discarded) -> delete
1703 }
1704
1705 // Although the are no pending events, we have to check if a run should be closed (timeout)
1706 for (int j=0; j<MAX_RUN; j++)
1707 {
1708 if (runCtrl[j].fileId == 0)
1709 {
1710 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
1711 const int cond = /*runCtrl[j].lastTime < lastStartedTime &&*/ runCtrl[j].runId == 0;
1712 checkAndCloseRun(j, runCtrl[j].runId, cond, 2);
1713 }
1714 }
1715
1716 usleep(1);
1717
1718 //nothing left to do
1719 if (gj.readStat < -10 && numWait == 0)
1720 {
1721 factPrintf(kInfo, -1, "Finish Write Process ...");
1722 gj.writStat = -22; //==> we should exit
1723 break;
1724 }
1725
1726 gj.writStat = gj.readStat;
1727 }
1728
1729 factPrintf(kInfo, -1, "Close all open files ...");
1730 for (int j=0; j<MAX_RUN; j++)
1731 {
1732 if (runCtrl[j].fileId == 0)
1733 checkAndCloseRun(j, runCtrl[j].runId, 1, 3);
1734 }
1735
1736 gj.writStat = -99;
1737
1738 factPrintf(kInfo, -1, "Exit Writing Process ...");
1739
1740 return 0;
1741} /*-----------------------------------------------------------------*/
1742
1743
1744
1745
1746void
1747StartEvtBuild ()
1748{
1749
1750 int i, /*j,*/ imax, status/*, th_ret[50]*/;
1751 pthread_t thread[50];
1752 struct timespec xwait;
1753
1754 gj.readStat = gj.procStat = gj.writStat = 0;
1755
1756 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
1757
1758//initialize run control logics
1759 for (i = 0; i < MAX_RUN; i++) {
1760 runCtrl[i].runId = 0;
1761 runCtrl[i].fileId = -2;
1762 }
1763
1764 gi_maxProc = g_maxProc;
1765 if (gi_maxProc <= 0 || gi_maxProc > 90) {
1766 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
1767 gi_maxProc = 1;
1768 }
1769//partially initialize event control logics
1770 evtCtrl_frstPtr = 0;
1771 evtCtrl_lastPtr = 0;
1772
1773//start all threads (more to come) when we are allowed to ....
1774 while (g_runStat == 0) {
1775 xwait.tv_sec = 0;
1776 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1777 nanosleep (&xwait, NULL);
1778 }
1779
1780 i = 0;
1781 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
1782 i++;
1783 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
1784 i++;
1785 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
1786 i++;
1787 imax = i;
1788
1789
1790#ifdef BILAND
1791 xwait.tv_sec = 30;;
1792 xwait.tv_nsec = 0; // sleep for ~20sec
1793 nanosleep (&xwait, NULL);
1794
1795 printf ("close all runs in 2 seconds\n");
1796
1797 CloseRunFile (0, time (NULL) + 2, 0);
1798
1799 xwait.tv_sec = 1;;
1800 xwait.tv_nsec = 0; // sleep for ~20sec
1801 nanosleep (&xwait, NULL);
1802
1803 printf ("setting g_runstat to -1\n");
1804
1805 g_runStat = -1;
1806#endif
1807
1808
1809//wait for all threads to finish
1810 for (i = 0; i < imax; i++) {
1811 /*j =*/ pthread_join (thread[i], (void **) &status);
1812 }
1813
1814} /*-----------------------------------------------------------------*/
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830 /*-----------------------------------------------------------------*/
1831 /*-----------------------------------------------------------------*/
1832 /*-----------------------------------------------------------------*/
1833 /*-----------------------------------------------------------------*/
1834 /*-----------------------------------------------------------------*/
1835
1836#ifdef BILAND
1837
1838int
1839subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
1840 int8_t * buffer)
1841{
1842 printf ("called subproc %d\n", threadID);
1843 return threadID + 1;
1844}
1845
1846
1847
1848
1849 /*-----------------------------------------------------------------*/
1850 /*-----------------------------------------------------------------*/
1851 /*-----------------------------------------------------------------*/
1852 /*-----------------------------------------------------------------*/
1853 /*-----------------------------------------------------------------*/
1854
1855
1856
1857
1858FileHandle_t
1859runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
1860{
1861 return 1;
1862};
1863
1864int
1865runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
1866{
1867 return 1;
1868 usleep (10000);
1869 return 1;
1870}
1871
1872
1873//{ return 1; } ;
1874
1875int
1876runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
1877{
1878 return 1;
1879};
1880
1881
1882
1883
1884int
1885eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
1886{
1887 int i = 0;
1888
1889// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1890// for (i=0; i<NBOARDS; i++) {
1891// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1892// }
1893 return 0;
1894}
1895
1896
1897void
1898factStatNew (EVT_STAT gi)
1899{
1900 int i;
1901
1902//for (i=0;i<MAX_SOCK;i++) {
1903// printf("%4d",gi.numRead[i]);
1904// if (i%20 == 0 ) printf("\n");
1905//}
1906}
1907
1908void
1909gotNewRun (int runnr, PEVNT_HEADER * headers)
1910{
1911 printf ("got new run %d\n", runnr);
1912 return;
1913}
1914
1915void
1916factStat (GUI_STAT gj)
1917{
1918// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1919// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1920}
1921
1922
1923void
1924debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
1925 int state, uint32_t tsec, uint32_t tusec)
1926{
1927// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1928}
1929
1930
1931
1932void
1933debugStream (int isock, void *buf, int len)
1934{
1935}
1936
1937void
1938debugHead (int i, int j, void *buf)
1939{
1940}
1941
1942
1943void
1944factOut (int severity, int err, char *message)
1945{
1946 static FILE *fd;
1947 static int file = 0;
1948
1949 if (file == 0) {
1950 printf ("open file\n");
1951 fd = fopen ("x.out", "w+");
1952 file = 999;
1953 }
1954
1955 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
1956
1957 if (severity != kDebug)
1958 printf ("%3d %3d | %s\n", severity, err, message);
1959}
1960
1961
1962
1963int
1964main ()
1965{
1966 int i, b, c, p;
1967 char ipStr[100];
1968 struct in_addr IPaddr;
1969
1970 g_maxMem = 1024 * 1024; //MBytes
1971 g_maxMem = g_maxMem * 200; //100MBytes
1972
1973 g_maxProc = 20;
1974
1975 g_runStat = 40;
1976
1977 i = 0;
1978
1979// version for standard crates
1980//for (c=0; c<4,c++) {
1981// for (b=0; b<10; b++) {
1982// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1983//
1984// inet_pton(PF_INET, ipStr, &IPaddr) ;
1985//
1986// g_port[i].sockAddr.sin_family = PF_INET;
1987// g_port[i].sockAddr.sin_port = htons(5000) ;
1988// g_port[i].sockAddr.sin_addr = IPaddr ;
1989// g_port[i].sockDef = 1 ;
1990// i++ ;
1991// }
1992//}
1993//
1994//version for PC-test *
1995 for (c = 0; c < 4; c++) {
1996 for (b = 0; b < 10; b++) {
1997 sprintf (ipStr, "10.0.%d.11", 128 + c);
1998 if (c < 2)
1999 sprintf (ipStr, "10.0.%d.11", 128);
2000 else
2001 sprintf (ipStr, "10.0.%d.11", 131);
2002// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2003
2004 inet_pton (PF_INET, ipStr, &IPaddr);
2005 p = 31919 + 100 * c + 10 * b;
2006
2007
2008 g_port[i].sockAddr.sin_family = PF_INET;
2009 g_port[i].sockAddr.sin_port = htons (p);
2010 g_port[i].sockAddr.sin_addr = IPaddr;
2011 g_port[i].sockDef = 1;
2012
2013 i++;
2014 }
2015 }
2016
2017
2018//g_port[17].sockDef =-1 ;
2019//g_actBoards-- ;
2020
2021 StartEvtBuild ();
2022
2023 return 0;
2024
2025}
2026#endif
Note: See TracBrowser for help on using the repository browser.