source: trunk/FACT++/src/EventBuilder.cc@ 15784

Last change on this file since 15784 was 15512, checked in by tbretz, 11 years ago
First draft of a C++ version of the event builder.
File size: 58.4 KB
Line 
1// #define EVTDEBUG
2
3#define NUMSOCK 1 //set to 7 for old configuration
4#define MAXREAD 65536 //64kB wiznet buffer
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdarg.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include <memory>
26#include <deque>
27#include <map>
28
29#include "queue.h"
30
31#include "EventBuilder.h"
32
33enum Severity
34{
35 kMessage = 10, ///< Just a message, usually obsolete
36 kInfo = 20, ///< An info telling something which can be interesting to know
37 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
38 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
39 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
40 kDebug = 99, ///< A message used for debugging only
41};
42
43using namespace std;
44
45#define MIN_LEN 32 // min #bytes needed to interpret FADheader
46#define MAX_LEN 256*1024 // size of read-buffer per socket
47
48//#define nanosleep(x,y)
49
50extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
51extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
52extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
53//extern int runFinish (uint32_t runnr);
54
55extern "C" void factOut (int severity, int err, char *message);
56extern void factReportIncomplete (uint64_t rep);
57
58extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers);
59
60
61extern void factStat (GUI_STAT gj);
62
63extern void factStatNew (EVT_STAT gi);
64
65extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
66
67extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
68 int8_t * buffer);
69
70extern void debugHead (int i, int j, void *buf);
71
72extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
73 int32_t runnr, int state, uint32_t tsec,
74 uint32_t tusec);
75extern void debugStream (int isock, void *buf, int len);
76
77int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
78
79int g_maxProc;
80int gi_maxProc;
81
82uint g_actTime;
83int g_runStat;
84int g_reset;
85
86size_t g_maxMem; //maximum memory allowed for buffer
87
88FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
89
90uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
91
92//EVT_STAT gi;
93GUI_STAT gj;
94
95#define MAX_EVT 65536 // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
96#define MAX_RUN 8 // Number of concurrent runs
97
98void factPrintf(int severity, int id, const char *fmt, ...)
99{
100 char str[1000];
101
102 va_list ap;
103 va_start(ap, fmt);
104 vsnprintf(str, 1000, fmt, ap);
105 va_end(ap);
106
107 factOut(severity, id, str);
108}
109
110
111#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
112#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
113typedef struct TGB_struct
114{
115 struct TGB_struct *prev;
116 void *mem;
117} TGB_entry;
118
119TGB_entry *tgb_last = NULL;
120uint64_t tgb_memory = 0;
121uint64_t tgb_inuse = 0;
122
123void *TGB_Malloc()
124{
125 // No free slot available, next alloc would exceed max memory
126 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
127 return NULL;
128
129 // We will return this amount of memory
130 tgb_inuse += MAX_TOT_MEM;
131
132 // No free slot available, allocate a new one
133 if (!tgb_last)
134 {
135 tgb_memory += MAX_TOT_MEM;
136 return malloc(MAX_TOT_MEM);
137 }
138
139 // Get the next free slot from the stack and return it
140 TGB_entry *last = tgb_last;
141
142 void *mem = last->mem;
143 tgb_last = last->prev;
144
145 free(last);
146
147 return mem;
148};
149
150void TGB_free(void *mem)
151{
152 if (!mem)
153 return;
154
155 // Add the last free slot to the stack
156 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
157
158 // FIXME: Really free memory if memory usuage exceeds g_maxMem
159
160 entry->prev = tgb_last;
161 entry->mem = mem;
162
163 tgb_last = entry;
164
165 // Decrease the amont of memory in use accordingly
166 tgb_inuse -= MAX_TOT_MEM;
167
168 gj.usdMem = tgb_inuse;
169 gj.bufTot--;
170}
171
172//RUN_CTRL runCtrl[MAX_RUN];
173
174/*
175*** Definition of rdBuffer to read in IP packets; keep it global !!!!
176 */
177
178typedef union
179{
180 uint8_t B[MAX_LEN];
181 uint16_t S[MAX_LEN / 2];
182 uint32_t I[MAX_LEN / 4];
183 uint64_t L[MAX_LEN / 8];
184} CNV_FACT;
185
186typedef struct
187{
188 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
189 int32_t bufPos; //next byte to read to the buffer next
190 int32_t bufLen; //number of bytes left to read
191 int32_t skip; //number of bytes skipped before start of event
192
193 int errCnt; //how often connect failed since last successful
194 int sockStat; //-1 if socket not yet connected , 99 if not exist
195 int socket; //contains the sockets
196
197 struct sockaddr_in SockAddr; //IP for each socket
198
199 int evtID; // event ID of event currently read
200 int runID; // run "
201 int ftmID; // event ID from FTM
202 uint fadLen; // FADlength of event currently read
203 int fadVers; // Version of FAD
204 int ftmTyp; // trigger type
205 int Port;
206
207 CNV_FACT *rBuf;
208
209} READ_STRUCT;
210
211/*-----------------------------------------------------------------*/
212
213
214/*-----------------------------------------------------------------*/
215
216
217int
218GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
219 READ_STRUCT * rs)
220{
221/*
222*** generate Address, create sockets and allocates readbuffer for it
223***
224*** if flag==0 generate socket and buffer
225*** <0 destroy socket and buffer
226*** >0 close and redo socket
227***
228*** sid : board*7 + port id
229 */
230
231 //close socket if open
232 if (rs->sockStat == 0)
233 {
234 if (close (rs->socket) > 0) {
235 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
236 } else {
237 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
238 }
239 }
240
241 rs->sockStat = 99;
242
243 if (flag < 0) {
244 free (rs->rBuf); //and never open again
245 rs->rBuf = NULL;
246 return 0;
247 }
248
249
250 if (flag == 0) { //generate address and buffer ...
251 rs->Port = port;
252 rs->SockAddr.sin_family = sockAddr->sin_family;
253 rs->SockAddr.sin_port = htons (port);
254 rs->SockAddr.sin_addr = sockAddr->sin_addr;
255
256 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
257 if (rs->rBuf == NULL) {
258 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
259 rs->sockStat = 77;
260 return -3;
261 }
262 }
263
264
265 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
266 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
267 rs->sockStat = 88;
268 return -2;
269 }
270
271 int optval = 1;
272 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
273 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
274 }
275 optval = 10; //start after 10 seconds
276 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
277 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
278 }
279 optval = 10; //do every 10 seconds
280 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
281 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
282 }
283 optval = 2; //close after 2 unsuccessful tries
284 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
285 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
286 }
287
288 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
289
290 rs->sockStat = -1; //try to (re)open socket
291 rs->errCnt = 0;
292 return 0;
293
294} /*-----------------------------------------------------------------*/
295
296 /*-----------------------------------------------------------------*/
297
298int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
299{
300 int xjr = -1;
301 int xkr = -1;
302
303 //points to the very first roi
304 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
305
306 roi[0] = ntohs(rbuf->S[roiPtr]);
307
308 for (int jr = 0; jr < 9; jr++)
309 {
310 roi[jr] = ntohs(rbuf->S[roiPtr]);
311
312 if (roi[jr]<0 || roi[jr]>1024)
313 {
314 factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
315 return 0;
316 }
317
318 // Check that the roi of pixels jr are compatible with the one of pixel 0
319 if (jr!=8 && roi[jr]!=roi[0])
320 {
321 xjr = jr;
322 break;
323 }
324
325 // Check that the roi of all other DRS chips on boards are compatible
326 for (int kr = 1; kr < 4; kr++)
327 {
328 const int kroi = ntohs(rbuf->S[roiPtr]);
329 if (kroi != roi[jr])
330 {
331 xjr = jr;
332 xkr = kr;
333 break;
334 }
335 roiPtr += kroi+4;
336 }
337 }
338
339 if (xjr>=0)
340 {
341 if (xkr<0)
342 factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
343 else
344 factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
345
346 return 0;
347 }
348
349 if (roi[8] < roi[0])
350 {
351 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
352 //gj.badRoiB++;
353 //gj.badRoi[b]++;
354 return 0;
355 }
356
357 return 1;
358}
359
360deque<shared_ptr<EVT_CTRL>> evtCtrl;
361map<int, shared_ptr<RUN_CTRL>> runCtrl;
362
363void mBufFree(EVT_CTRL *evt)
364{
365 TGB_free(evt->FADhead);
366}
367
368shared_ptr<EVT_CTRL> mBufEvt(const READ_STRUCT *rs)
369{
370 int nRoi[9];
371 if (!checkRoiConsistency(rs->rBuf, nRoi))
372 return shared_ptr<EVT_CTRL>();
373
374 const int evID = rs->evtID;
375 const uint runID = rs->runID;
376 const int trgTyp = rs->ftmTyp;
377 const int trgNum = rs->ftmID;
378 const int fadNum = rs->evtID;
379
380 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
381 {
382 const shared_ptr<EVT_CTRL> evt = *it;
383
384 // If the run is different, go on searching.
385 // We cannot stop searching if a lower run-id is found as in
386 // the case of the events, because theoretically, there
387 // can be the same run on two different days.
388 if (runID != evt->runNum)
389 continue;
390
391 // If the ID of the new event if higher than the last one stored
392 // in that run, we have to assign a new slot (leave the loop)
393 if (evID > evt->evNum)
394 break;
395
396 if (evID != evt->evNum)
397 continue;
398
399 // We have found an entry with the same runID and evtID
400 // Check if ROI is consistent
401 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
402 {
403 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
404 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
405 return shared_ptr<EVT_CTRL>();
406 }
407
408 // count for inconsistencies
409 if (evt->trgNum != trgNum)
410 evt->Errors[0]++;
411 if (evt->fadNum != fadNum)
412 evt->Errors[1]++;
413 if (evt->trgTyp != trgTyp)
414 evt->Errors[2]++;
415
416 //everything seems fine so far ==> use this slot ....
417 return evt;
418 }
419
420 struct timeval tv;
421 gettimeofday(&tv, NULL);
422
423 auto ir = runCtrl.find(runID);
424 if (ir==runCtrl.end())
425 {
426 shared_ptr<RUN_CTRL> run(new RUN_CTRL);
427
428 run->runId = runID;
429 run->roi0 = nRoi[0]; // FIXME: Make obsolete!
430 run->roi8 = nRoi[8]; // FIXME: Make obsolete!
431 run->fileId = -2;
432 run->lastEvt = 1; // Number of events partially started to read
433 run->actEvt = 0; // Number of written events (write)
434 run->procEvt = 0; // Number of successfully checked events (checkEvent)
435 run->maxEvt = 999999999; // max number events allowed
436 run->lastTime = tv.tv_sec; // Time when the last event was written
437 run->closeTime = tv.tv_sec + 3600 * 24; //max time allowed
438
439 ir = runCtrl.insert(make_pair(runID, run)).first;
440 }
441
442 const shared_ptr<RUN_CTRL> run = ir->second;
443
444 if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8])
445 {
446 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
447 run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID);
448 return shared_ptr<EVT_CTRL>();
449 }
450
451 const shared_ptr<EVT_CTRL> evt(new EVT_CTRL, mBufFree);
452
453 //flag all boards as unused
454 evt->nBoard = 0;
455 for (int b=0; b<NBOARDS; b++)
456 evt->board[b] = -1;
457
458 evt->run = run;
459 evt->pcTime[0] = tv.tv_sec;
460 evt->pcTime[1] = tv.tv_usec;
461 evt->nRoi = nRoi[0];
462 evt->nRoiTM = nRoi[8];
463 evt->evNum = evID;
464 evt->runNum = runID;
465 evt->fadNum = fadNum;
466 evt->trgNum = trgNum;
467 evt->trgTyp = trgTyp;
468 evt->Errors[0] = 0;
469 evt->Errors[1] = 0;
470 evt->Errors[2] = 0;
471 evt->Errors[3] = 0;
472 evt->fEvent = NULL;
473 evt->FADhead = NULL;
474
475 // -1: kInValid
476 // 0: kValid
477 // 1-40: kIncomplete
478 // 90: kIncompleteReported
479 // 100: kCompleteEventInBuffer
480 // 1000+x: kToBeProcessedByThreadX
481 // 5000: kToBeWritten
482 // 10000: kToBeDeleted
483
484 evt->evtStat = 0;
485
486 evtCtrl.push_back(evt);
487
488 return evt;
489
490} /*-----------------------------------------------------------------*/
491
492
493void initEvent(const shared_ptr<EVT_CTRL> &evt)
494{
495 evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM);
496 memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi);
497
498 //flag all pixels as unused
499 for (int k = 0; k < NPIX; k++)
500 evt->fEvent->StartPix[k] = -1;
501
502 //flag all TMark as unused
503 for (int k = 0; k < NTMARK; k++)
504 evt->fEvent->StartTM[k] = -1;
505
506 evt->fEvent->NumBoards = 0;
507 evt->fEvent->SoftTrig = 0;
508 evt->fEvent->PCTime = evt->pcTime[0];
509 evt->fEvent->PCUsec = evt->pcTime[1];
510 evt->fEvent->Roi = evt->nRoi;
511 evt->fEvent->RoiTM = evt->nRoiTM;
512 evt->fEvent->EventNum = evt->evNum;
513 evt->fEvent->TriggerNum = evt->trgNum;
514 evt->fEvent->TriggerType = evt->trgTyp;
515}
516
517
518uint64_t reportIncomplete(const shared_ptr<EVT_CTRL> &evt, const char *txt)
519{
520 factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
521 evt->runNum, evt->evNum, txt);
522
523 uint64_t report = 0;
524
525 char str[1000];
526
527 int ik=0;
528 for (int ib=0; ib<NBOARDS; ib++)
529 {
530 if (ib%10==0)
531 str[ik++] = '|';
532
533 const int jb = evt->board[ib];
534 if (jb>=0) // data received from that board
535 {
536 str[ik++] = '0'+(jb%10);
537 continue;
538 }
539
540 // FIXME: This is not synchronous... it reports
541 // accoridng to the current connection status, not w.r.t. to the
542 // one when the event was taken.
543 if (gi_NumConnect[ib]<=0) // board not connected
544 {
545 str[ik++] = 'x';
546 continue;
547 }
548
549 // data from this board lost
550 str[ik++] = '.';
551 report |= ((uint64_t)1)<<ib;
552 }
553
554 str[ik++] = '|';
555 str[ik] = 0;
556
557 factOut(kWarn, 601, str);
558
559 return report;
560}
561
562// i == board
563void copyData(CNV_FACT *rbuf, int i, const shared_ptr<EVT_CTRL> &evt)
564{
565 // swapEventHeaderBytes: End of the header. to channels now
566 int eStart = 36;
567 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
568 {
569 rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
570 rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
571 rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
572 rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
573
574 eStart += 4+rbuf->S[eStart+2];//skip the pixel data
575 }
576
577 memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER));
578
579 int src = sizeof(PEVNT_HEADER) / 2;
580
581 // consistency of ROIs have been checked already (is it all correct?)
582 const int roi = rbuf->S[src+2];
583
584 // different sort in FAD board.....
585 for (int px = 0; px < 9; px++)
586 {
587 for (int drs = 0; drs < 4; drs++)
588 {
589 // pixH = rd[i].rBuf->S[src++]; // ID
590 src++;
591
592 const int pixC = rbuf->S[src++]; // start-cell
593 const int pixR = rbuf->S[src++]; // roi
594 //here we should check if pixH is correct ....
595
596 const int pixS = i * 36 + drs * 9 + px;
597 src++;
598
599 evt->fEvent->StartPix[pixS] = pixC;
600
601 const int dest1 = pixS * roi;
602 memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
603
604 src += pixR;
605
606 if (px == 8)
607 {
608 const int tmS = i * 4 + drs;
609
610 //and we have additional TM info
611 if (pixR > roi)
612 {
613 const int dest2 = tmS * roi + NPIX * roi;
614
615 const int srcT = src - roi;
616 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
617
618 memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
619 }
620 else
621 {
622 evt->fEvent->StartTM[tmS] = -1;
623 }
624 }
625 }
626 }
627}
628
629void doProcess(const shared_ptr<EVT_CTRL> &evt);
630void doWrite(const shared_ptr<EVT_CTRL> &evt);
631
632void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where);
633
634Queue<shared_ptr<EVT_CTRL>> process(bind(doProcess, placeholders::_1));
635Queue<shared_ptr<EVT_CTRL>> write_queue(bind(doWrite, placeholders::_1));
636
637void preProcess(const shared_ptr<EVT_CTRL> &evt)
638{
639 //-------- it is better to open the run already here, so call can be used to initialize
640 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
641 const shared_ptr<RUN_CTRL> run = evt->run;
642 if (run->runId==0)
643 return;
644
645 // File not yet open
646 if (run->fileId < 0)
647 {
648 RUN_HEAD actRun;
649 actRun.Version = 1;
650 actRun.RunType = -1; //to be adapted
651 actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0;
652 actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8;
653 actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
654 actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
655 actRun.NBoard = NBOARDS;
656 actRun.NPix = NPIX;
657 actRun.NTm = NTMARK;
658
659 memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
660
661 run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun));
662 if (run->fileHd == NULL)
663 {
664 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
665 run->fileId = 91;
666
667 // No further processing of this event
668 return;
669 }
670
671 run->fileId = 0;
672 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
673 }
674
675 //and set correct event header ; also check for consistency in event (not yet)
676 evt->fEvent->Errors[0] = evt->Errors[0];
677 evt->fEvent->Errors[1] = evt->Errors[1];
678 evt->fEvent->Errors[2] = evt->Errors[2];
679 evt->fEvent->Errors[3] = evt->Errors[3];
680
681 for (int ib=0; ib<NBOARDS; ib++)
682 {
683 // board is not read
684 if (evt->board[ib] == -1)
685 {
686 evt->FADhead[ib].start_package_flag = 0;
687 evt->fEvent->BoardTime[ib] = 0;
688 }
689 else
690 {
691 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
692 }
693 }
694
695 const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
696
697 // no further processing of event ('delete')
698 if (rc < 0)
699 return;
700
701 //evt->evtStat = 1000; // flag 'start processing'
702 run->procEvt++;
703 process.post(evt);
704}
705
706void doProcess(const shared_ptr<EVT_CTRL> &evt)
707{
708 const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0);
709
710 if (jret>0 && jret<=1)
711 factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret);
712
713 // flag as 'to be written'
714 if (jret<=1)
715 return;
716
717 //evt->evtStat = 5000;
718 write_queue.post(evt);
719}
720
721void doWrite(const shared_ptr<EVT_CTRL> &evt)
722{
723 const shared_ptr<RUN_CTRL> run = evt->run;
724 if (run->runId==0)
725 return;
726
727 // File is not open
728 if (run->fileId!=0)
729 return;
730
731 const int rc = runWrite(run->fileHd, evt->fEvent, 0);
732 if (rc >= 0)
733 {
734 // Sucessfully wrote event
735 run->lastTime = g_actTime;
736 run->actEvt++;
737 }
738 else
739 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
740
741 checkAndCloseRun(run, rc<0, 1);
742
743 /*
744 // Although the are no pending events, we have to check if a run should be closed (timeout)
745 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
746 {
747 if (ir->second->fileId == 0)
748 {
749 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
750 const int cond = ir->second->runId == 0;
751 checkAndCloseRun(ir->second, cond, 2);
752 }
753 }
754 */
755}
756
757void *readFAD (void *ptr)
758{
759/* *** main loop reading FAD data and sorting them to complete events */
760
761 Queue<shared_ptr<EVT_CTRL>> queue(bind(preProcess, placeholders::_1));
762
763 factPrintf(kInfo, -1, "Start initializing (readFAD)");
764
765 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer
766
767 uint32_t actrun = 0;
768
769 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
770
771
772 int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
773 gi_resetS = gi_resetR = 9;
774
775 int sockDef[NBOARDS]; //internal state of sockets
776 memset(sockDef, 0, NBOARDS*sizeof(int));
777
778 START:
779 //time in seconds
780 uint gi_SecTime = time(NULL);;
781 g_actTime = gi_SecTime;
782
783 const int cntsock = 8 - NUMSOCK ;
784
785 if (gi_resetS > 0) {
786 //make sure all sockets are preallocated as 'not exist'
787 for (int i = 0; i < NBOARDS; i++) {
788 rd[i].socket = -1;
789 rd[i].sockStat = 99;
790 }
791
792 for (int k = 0; k < NBOARDS; k++) {
793 gi_NumConnect[k] = 0;
794 //gi.numConn[k] = 0;
795 gj.numConn[k] = 0;
796 //gj.errConn[k] = 0;
797 gj.rateBytes[k] = 0;
798 gj.totBytes[k] = 0;
799 }
800
801 }
802
803 if (gi_resetR > 0)
804 {
805 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
806 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
807 gj.totMem = tgb_memory;
808 gj.bufNew = gj.bufEvt = 0;
809 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
810
811 factPrintf(kInfo, -1, "End initializing (readFAD)");
812 }
813
814
815 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
816
817 //loop until global variable g_runStat claims stop
818 while (g_runStat >= 0 && g_reset == 0)
819 {
820 gj.readStat = g_runStat;
821
822 for (int b = 0; b < NBOARDS; b++)
823 {
824 // Nothing has changed
825 if (g_port[b].sockDef == sockDef[b])
826 continue;
827
828 gi_NumConnect[b] = 0; //must close all connections
829 gj.numConn[b] = 0;
830
831 // s0 = 0: sockets to be defined and opened
832 // s0 = -1: sockets to be destroyed
833 // s0 = +1: sockets to be closed and reopened
834
835 int s0 = 0;
836 if (sockDef[b] != 0)
837 s0 = g_port[b].sockDef==0 ? -1 : +1;
838
839 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
840
841 GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
842
843 sockDef[b] = g_port[b].sockDef;
844 }
845
846 // count the number of active boards
847 int actBoards = 0;
848 for (int b = 0; b < NBOARDS; b++)
849 if (sockDef[b] > 0)
850 actBoards++;
851
852 //check all sockets if something to read
853 for (int i = 0; i < NBOARDS; i++)
854 {
855 // Do not try to connect this socket
856 if (rd[i].sockStat > 0)
857 continue;
858
859 if (rd[i].sockStat == -1)
860 {
861 //try to connect if not yet done
862 rd[i].sockStat = connect (rd[i].socket,
863 (struct sockaddr *) &rd[i].SockAddr,
864 sizeof (rd[i].SockAddr));
865 // Failed
866 if (rd[i].sockStat == -1)
867 {
868 rd[i].errCnt++;
869 usleep(25000);
870 continue;
871 }
872
873 // Success (rd[i].sockStat == 0)
874
875 if (sockDef[i] > 0)
876 {
877 rd[i].bufTyp = 0; // expect a header
878 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
879 }
880 else
881 {
882 rd[i].bufTyp = -1; // full data to be skipped
883 rd[i].bufLen = MAX_LEN; // huge for skipping
884 }
885
886 rd[i].bufPos = 0; // no byte read so far
887 rd[i].skip = 0; // start empty
888
889 gi_NumConnect[i] += cntsock;
890 gj.numConn[i]++;
891
892 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
893 }
894
895 // Do not read from this socket
896 if (rd[i].bufLen<0)
897 continue;
898
899 if (rd[i].bufLen>0)
900 {
901 const int32_t jrd =
902 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
903 rd[i].bufLen, MSG_DONTWAIT);
904
905 // recv failed
906 if (jrd<0)
907 {
908 // There was just nothing waiting
909 if (errno==EWOULDBLOCK || errno==EAGAIN)
910 continue;
911
912 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
913 continue;
914 }
915
916 // connection was closed ...
917 if (jrd==0)
918 {
919 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
920
921 const int s0 = sockDef[i] > 0 ? +1 : -1;
922 GenSock(s0, i, 0, NULL, &rd[i]);
923
924 gi_NumConnect[i]-= cntsock ;
925 gj.numConn[i]--;
926
927 continue;
928 }
929
930 gj.rateBytes[i] += jrd;
931
932 // are we skipping this board ...
933 if (rd[i].bufTyp < 0)
934 continue;
935
936 rd[i].bufPos += jrd; //==> prepare for continuation
937 rd[i].bufLen -= jrd;
938
939#ifdef EVTDEBUG
940 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
941#endif
942 }
943
944 //we are reading event header
945 if (rd[i].bufTyp <= 0)
946 {
947 //not yet sufficient data to take action
948 if (rd[i].bufPos < minLen)
949 continue;
950
951 //check if startflag correct; else shift block ....
952 // FIXME: This is not enough... this combination of
953 // bytes can be anywhere... at least the end bytes
954 // must be checked somewhere, too.
955 int k;
956 for (k = 0; k < rd[i].bufPos - 1; k++)
957 {
958 //start.S = 0xFB01;
959 if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
960 break;
961 }
962 rd[i].skip += k;
963
964 //no start of header found
965 if (k >= rd[i].bufPos - 1)
966 {
967 rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
968 rd[i].bufPos = 1;
969 rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
970 continue;
971 }
972
973 if (k > 0)
974 {
975 rd[i].bufPos -= k;
976 rd[i].bufLen += k;
977 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
978 rd[i].bufPos);
979 }
980
981 if (rd[i].bufPos < minLen)
982 continue;
983
984 if (rd[i].skip > 0)
985 {
986 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
987 rd[i].skip = 0;
988 }
989
990 // TGB: This needs much more checks than just the first two bytes!
991
992 // Swap everything except start_package_flag.
993 // It is to difficult to find out where it is used how,
994 // but it doesn't really matter because it is not really
995 // used anywehere else
996 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
997 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
998 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
999 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1000 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1001
1002 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1003 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1004 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1005 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1006
1007 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1008 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1009 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1010
1011 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1012 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1013
1014 for (int s=24;s<24+NTemp+NDAC;s++)
1015 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1016
1017 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1018 rd[i].fadVers = rd[i].rBuf->S[2];
1019 rd[i].ftmTyp = rd[i].rBuf->S[5];
1020 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1021 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1022 rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
1023 rd[i].bufTyp = 1; //ready to read full record
1024 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1025
1026 const int fadBoard = rd[i].rBuf->S[12];
1027 debugHead(i, fadBoard, rd[i].rBuf);
1028
1029 continue;
1030 }
1031
1032 // are we reading data
1033
1034 // not yet all read
1035 if (rd[i].bufLen > 0)
1036 continue;
1037
1038 // stop.S = 0x04FE;
1039 if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
1040 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
1041 {
1042 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
1043 i, rd[i].evtID, rd[i].fadLen,
1044 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1045
1046 // ready to read next header
1047 rd[i].bufTyp = 0;
1048 rd[i].bufLen = sizeof(PEVNT_HEADER);
1049 rd[i].bufPos = 0;
1050
1051 continue;
1052 }
1053
1054 // int actid;
1055 // if (g_useFTM > 0)
1056 // actid = rd[i].evtID;
1057 // else
1058 // actid = rd[i].ftmID;
1059
1060 //get index into mBuffer for this event (create if needed)
1061 const shared_ptr<EVT_CTRL> evt = mBufEvt(&rd[i]);
1062
1063 // We have a valid entry, but no memory has yet been allocated
1064 if (evt && evt->FADhead == NULL)
1065 {
1066 // Try to get memory from the big buffer
1067 evt->FADhead = (PEVNT_HEADER*)TGB_Malloc();
1068 if (evt->FADhead == NULL)
1069 {
1070 // If this works properly, this is a hack which can be removed, or
1071 // replaced by a signal or dim message
1072 if (rd[i].bufTyp==2)
1073 factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum);
1074 rd[i].bufTyp = 2;
1075 continue;
1076 }
1077
1078 // Initialise contents of mBuffer[evID]->fEvent
1079 initEvent(evt);
1080
1081 // Some statistics
1082 gj.usdMem = tgb_inuse;
1083
1084 if (gj.usdMem > gj.maxMem)
1085 gj.maxMem = gj.usdMem;
1086
1087 gj.rateNew++;
1088 gj.bufTot++;
1089 if (gj.bufTot > gj.maxEvt)
1090 gj.maxEvt = gj.bufTot;
1091 }
1092
1093 rd[i].bufTyp = 0;
1094 rd[i].bufLen = sizeof(PEVNT_HEADER);
1095 rd[i].bufPos = 0;
1096
1097 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1098 if (!evt)
1099 continue;
1100
1101 //we have a valid entry in mBuffer[]; fill it
1102 const int fadBoard = rd[i].rBuf->S[12];
1103 const int fadCrate = fadBoard>>8;
1104
1105 if (i != (fadCrate * 10 + (fadBoard&0xff)))
1106 {
1107 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1108 i, fadBoard, fadCrate, fadBoard&0xff);
1109 }
1110
1111 if (evt->board[i] != -1)
1112 {
1113 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1114 evt->evNum, i, i, rd[i].fadLen,
1115 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1116 rd[i].rBuf->B[rd[i].fadLen - 2],
1117 rd[i].rBuf->B[rd[i].fadLen - 1]);
1118 continue; // Continue reading next header
1119 }
1120
1121 // Copy data from rd[i] to mBuffer[evID]
1122 copyData(rd[i].rBuf, i, evt);
1123
1124 // now we have stored a new board contents into Event structure
1125
1126 evt->fEvent->NumBoards++;
1127 evt->board[i] = i;
1128 evt->nBoard++;
1129 evt->evtStat = evt->nBoard;
1130
1131 // have we already reported first (partial) event of this run ???
1132 if (evt->nBoard==1 && evt->runNum != actrun)
1133 {
1134 // Signal the fadctrl that a new run has been started
1135 gotNewRun(evt->runNum, NULL);
1136
1137 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
1138 actrun, evt->runNum, evt->evNum);
1139
1140 // We got the first part of this event, so this is
1141 // the number of events we expect for this run
1142 evt->run->lastEvt++;
1143
1144 // Since we have started a new run, we know already when to close the
1145 // previous run in terms of number of events
1146 const auto ir = runCtrl.find(actrun);
1147 if (ir!=runCtrl.end())
1148 ir->second->maxEvt = ir->second->lastEvt;
1149
1150 // Change 'actrun' the the new runnumber
1151 actrun = evt->runNum;
1152 }
1153
1154 // event not yet complete
1155 if (evt->nBoard < actBoards)
1156 continue;
1157
1158 // GARBAGE COLLECTION
1159 // This is a non-ideal hack to lower the probability that
1160 // in mBufEvt the search for correct entry in runCtrl
1161 // will not return a super-old entry. I don't want
1162 // to manipulate that in another thread.
1163 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
1164 {
1165 if (ir->runId==evt->runNum)
1166 break;
1167
1168 if (ir->second->fileId>0)
1169 runCtrl.erase(ir);
1170 }
1171
1172 // we have just completed an event... so all previous events
1173 // must have been completed already. If they are not, there
1174 // is no need to wait for the timeout, because they will never
1175 // get completed. We can just ensure that if we check for the previous
1176 // event to be complete every time we receive a new complete event.
1177 // If we find an incomplete one, we remove all consecutive
1178 // incomplete ones.
1179 for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++)
1180 {
1181 const shared_ptr<EVT_CTRL> e = *it;
1182
1183 if (e.get()==evt.get())
1184 {
1185 queue.post(e);
1186 evtCtrl.erase(it);
1187 break;
1188 }
1189
1190 reportIncomplete(e, "expired");
1191 evtCtrl.pop_front();
1192 }
1193
1194 } // end for loop over all sockets
1195
1196 g_actTime = time (NULL);
1197 if (g_actTime <= gi_SecTime)
1198 {
1199 usleep(1);
1200 continue;
1201 }
1202 gi_SecTime = g_actTime;
1203
1204 gj.bufNew = 0;
1205
1206 //loop over all active events and flag those older than read-timeout
1207 //delete those that are written to disk ....
1208
1209 const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
1210
1211 // This could be improved having the pointer which separates the queue with
1212 // the incomplete events from the queue with the complete events
1213 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
1214 {
1215 const shared_ptr<EVT_CTRL> evt = *it;
1216
1217 // Check the more likely case first: incomplete events
1218 if (evt->evtStat>=0 && evt->evtStat<100)
1219 {
1220 gj.bufNew++; //incomplete event in Buffer
1221
1222 // Event has not yet timed out or was reported already
1223 if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30)
1224 continue;
1225
1226 // This will result in the emission of a dim service.
1227 // It doesn't matter if that takes comparably long,
1228 // because we have to stop the run anyway.
1229 const uint64_t rep = reportIncomplete(evt, "timeout");
1230 factReportIncomplete(rep);
1231
1232 //timeout for incomplete events
1233 evt->evtStat = 90;
1234 gj.evtSkip++;
1235
1236 continue;
1237 }
1238
1239 // Check the less likely case: 'useless' or 'delete'
1240 // evtState==0 can happen if the event was initialized (some data received)
1241 // but the data did not make sense (e.g. inconsistent rois)
1242 if (evt->evtStat==0 || evt->evtStat == 10000)
1243 {
1244 evtCtrl.erase(it); //event written--> free memory
1245
1246 gj.evtWrite++;
1247 gj.rateWrite++;
1248 }
1249
1250 // Remove leading invalidated slots from queue
1251 // Do they exist at all?
1252 if (evt->evtStat==-1)
1253 evtCtrl.erase(it);
1254 }
1255
1256
1257
1258 // The number of complete events in the buffer is the total number of
1259 // events in the buffer minus the number of incomplete events.
1260 gj.bufEvt = count - gj.bufNew;
1261
1262 gj.deltaT = 1000; //temporary, must be improved
1263
1264 for (int ib = 0; ib < NBOARDS; ib++)
1265 gj.totBytes[ib] += gj.rateBytes[ib];
1266
1267 gj.totMem = tgb_memory;
1268
1269 if (gj.maxMem > gj.xxxMem)
1270 gj.xxxMem = gj.maxMem;
1271 if (gj.maxEvt > gj.xxxEvt)
1272 gj.xxxEvt = gj.maxEvt;
1273
1274 factStat (gj);
1275 //factStatNew (gi);
1276 gj.rateNew = gj.rateWrite = 0;
1277 gj.maxMem = gj.usdMem;
1278 gj.maxEvt = gj.bufTot;
1279 for (int b = 0; b < NBOARDS; b++)
1280 gj.rateBytes[b] = 0;
1281
1282 } // while (g_runStat >= 0 && g_reset == 0)
1283
1284 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1285
1286 if (g_reset > 0)
1287 {
1288 gi_reset = g_reset;
1289 gi_resetR = gi_reset % 10; //shall we stop reading ?
1290 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1291 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1292 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1293 g_reset = 0;
1294 }
1295 else
1296 {
1297 gi_reset = 0;
1298 gi_resetR = g_runStat == -1 ? 1 : 7;
1299
1300 gi_resetS = 7; //close all sockets
1301 gi_resetW = 7; //close all files
1302 gi_resetX = 0;
1303
1304 //inform others we have to quit ....
1305 gj.readStat = -11; //inform all that no update to happen any more
1306 }
1307
1308 if (gi_resetS > 0)
1309 {
1310 //must close all open sockets ...
1311 factPrintf(kInfo, -1, "Close all sockets...");
1312
1313 for (int i = 0; i < NBOARDS; i++)
1314 {
1315 if (rd[i].sockStat != 0)
1316 continue;
1317
1318 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1319
1320 gi_NumConnect[i]-= cntsock ;
1321 gj.numConn[i]--;
1322 sockDef[i] = 0; //flag ro recreate the sockets ...
1323 rd[i].sockStat = -1; //and try to open asap
1324 }
1325 }
1326
1327
1328 if (gi_resetR > 0)
1329 {
1330 //and clear all buffers (might have to wait until all others are done)
1331 while (evtCtrl.size())
1332 {
1333 const shared_ptr<EVT_CTRL> evt = evtCtrl.front();
1334
1335 // flag incomplete events as 'read finished'
1336 // We cannot just detele all events, because some might currently being processed,
1337 // so we have to wait until the processing thread currently processing the event
1338 // signals that the event can be deleted. (Note, that there are currently never
1339 // two threads processing the same event at the same time)
1340 if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000)
1341 evtCtrl.pop_front();
1342
1343 usleep(1);
1344 }
1345 }
1346
1347 //queue.wait();
1348 //queue.join();
1349
1350 if (gi_reset > 0)
1351 {
1352 if (gi_resetW > 0)
1353 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1354
1355 if (gi_resetX > 0)
1356 {
1357 struct timespec xwait;
1358 xwait.tv_sec = gi_resetX;
1359 xwait.tv_nsec = 0;
1360 nanosleep (&xwait, NULL);
1361 }
1362
1363 factPrintf(kInfo, -1, "Continue read Process ...");
1364 gi_reset = 0;
1365 goto START;
1366 }
1367
1368 factPrintf(kInfo, -1, "Exit read Process...");
1369
1370 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
1371
1372 gj.readStat = -99;
1373
1374 factStat (gj);
1375 //factStatNew (gi);
1376
1377 return 0;
1378
1379} /*-----------------------------------------------------------------*/
1380/*
1381
1382void *subProc(void *thrid)
1383{
1384 const int64_t threadID = (int64_t)thrid;
1385
1386 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1387
1388 while (g_runStat > -2) //in case of 'exit' we still must process pending events
1389 {
1390 int numWait = 0;
1391
1392 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
1393 {
1394 const shared_ptr<EVT_CTRL> evt = *it;
1395
1396 // This is a threading issue... the evtStat might have been invalid
1397 // but the frstPtr is not yet updated
1398 if (evt->evtStat==-1)
1399 continue;
1400
1401 // If we find the first event still waiting for processing
1402 // there will be only unprocessed events after this one in the queue
1403 if (evt->evtStat<1000+threadID)
1404 {
1405 numWait = 1;
1406 break;
1407 }
1408
1409 // If the event was processed already, skip it
1410 // We could replace that to a moving pointer pointing to the first
1411 // non-processed event
1412 if (evt->evtStat!=1000+threadID)
1413 continue;
1414
1415 const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0);
1416
1417 if (jret>0 && jret<=threadID)
1418 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
1419
1420 if (jret<=threadID)
1421 {
1422 evt->evtStat = 10000; // flag as 'to be deleted'
1423 continue;
1424 }
1425
1426 if (jret>=gi_maxProc)
1427 {
1428 evt->evtStat = 5000; // flag as 'to be written'
1429 continue;
1430 }
1431
1432 evt->evtStat = 1000 + jret; // flag for next proces
1433 }
1434
1435 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1436 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
1437 return 0;
1438 }
1439
1440 usleep(1);
1441 }
1442
1443 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
1444
1445 return 0;
1446}
1447
1448
1449void *
1450procEvt (void *ptr)
1451{
1452 int status;
1453
1454 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
1455
1456 pthread_t thread[100];
1457// int th_ret[100];
1458
1459 for (long long k = 0; k < gi_maxProc; k++) {
1460 pthread_create (&thread[k], NULL, subProc, (void *) k);
1461 }
1462
1463 // in case of 'exit' we still must process pending events
1464 while (g_runStat > -2)
1465 {
1466 int numWait = 0;
1467
1468 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
1469 {
1470 const shared_ptr<EVT_CTRL> evt = *it;
1471
1472 // This is a threading issue... the evtStat might have been invalid
1473 // but the frstPtr is not yet updated
1474 if (evt->evtStat==-1)
1475 continue;
1476
1477 // If we find the first incomplete event which is not supposed to
1478 // be processed, there are only more incomplete events in the queue
1479 if (evt->evtStat<90)
1480 {
1481 numWait = 1;
1482 break;
1483 }
1484
1485 // If the event was processed already, skip it.
1486 // We could replace that to a moving pointer pointing to the first
1487 // non-processed event
1488 if (evt->evtStat>=1000)
1489 continue;
1490
1491 //-------- it is better to open the run already here, so call can be used to initialize
1492 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1493 const uint32_t irun = evt->runNum;
1494 const int32_t ievt = evt->evNum;
1495
1496 const shared_ptr<RUN_CTRL> run = evt->run;
1497 if (run->runId==0)
1498 continue;
1499
1500 // File not yet open
1501 if (run->fileId < 0)
1502 {
1503 RUN_HEAD actRun;
1504 actRun.Version = 1;
1505 actRun.RunType = -1; //to be adapted
1506 actRun.Nroi = evt->nRoi; //runCtrl[lastRun].roi0;
1507 actRun.NroiTM = evt->nRoiTM; //runCtrl[lastRun].roi8;
1508 actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
1509 actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
1510 actRun.NBoard = NBOARDS;
1511 actRun.NPix = NPIX;
1512 actRun.NTm = NTMARK;
1513
1514 memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
1515
1516 run->fileHd = runOpen(irun, &actRun, sizeof (actRun));
1517 if (run->fileHd == NULL)
1518 {
1519 factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt);
1520 run->fileId = 91;
1521 continue;
1522 }
1523
1524 run->fileId = 0;
1525
1526 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
1527 }
1528
1529 //--------
1530 //--------
1531
1532 //and set correct event header ; also check for consistency in event (not yet)
1533 evt->fEvent->Errors[0] = evt->Errors[0];
1534 evt->fEvent->Errors[1] = evt->Errors[1];
1535 evt->fEvent->Errors[2] = evt->Errors[2];
1536 evt->fEvent->Errors[3] = evt->Errors[3];
1537
1538 for (int ib=0; ib<NBOARDS; ib++)
1539 {
1540 // board is not read
1541 if (evt->board[ib] == -1)
1542 {
1543 evt->FADhead[ib].start_package_flag = 0;
1544 evt->fEvent->BoardTime[ib] = 0;
1545 }
1546 else
1547 {
1548 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
1549 }
1550 }
1551
1552 const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
1553 if (rc < 0)
1554 {
1555 evt->evtStat = 10000; // flag event to be deleted
1556 }
1557 else
1558 {
1559 evt->evtStat = 1000; // flag 'start processing'
1560 run->procEvt++;
1561 }
1562 }
1563
1564 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1565 factPrintf(kInfo, -1, "Exit Processing Process ...");
1566 gj.procStat = -22; //==> we should exit
1567 return 0;
1568 }
1569
1570 usleep(1);
1571
1572 gj.procStat = gj.readStat;
1573 }
1574
1575 //we are asked to abort asap ==> must flag all remaining events
1576 // when gi_runStat claims that all events are in the buffer...
1577
1578 factPrintf(kInfo, -1, "Abort Processing Process ...");
1579
1580 for (int k = 0; k < gi_maxProc; k++) {
1581 pthread_join (thread[k], (void **) &status);
1582 }
1583
1584 gj.procStat = -99;
1585
1586 return 0;
1587
1588} */
1589
1590int
1591CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1592{
1593/* close run runId (all all runs if runId=0) */
1594/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1595
1596 if (runId == 0)
1597 {
1598 for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++)
1599 {
1600 const shared_ptr<RUN_CTRL> run = it->second;
1601
1602 //run is open
1603 if (run->fileId == 0)
1604 {
1605 run->closeTime = closeTime;
1606 run->maxEvt = maxEvt;
1607 }
1608 }
1609 return 0;
1610 }
1611
1612 auto it=runCtrl.find(runId);
1613 if (it==runCtrl.end())
1614 return -1;
1615
1616 const shared_ptr<RUN_CTRL> run = it->second;
1617
1618 // run already closed
1619 if (run->fileId>0)
1620 return +2;
1621
1622 run->closeTime = closeTime;
1623 run->maxEvt = maxEvt;
1624
1625 return run->fileId==0 ? 0 : 1;
1626
1627}
1628
1629void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where)
1630{
1631 if (!cond &&
1632 run->closeTime >= g_actTime &&
1633 run->lastTime >= g_actTime - 300 &&
1634 run->maxEvt > run->actEvt)
1635 return;
1636
1637 //close run for whatever reason
1638 int ii = 0;
1639 if (cond)
1640 ii = 1;
1641 if (run->closeTime < g_actTime)
1642 ii |= 2; // = 2;
1643 if (run->lastTime < g_actTime - 300)
1644 ii |= 4; // = 3;
1645 if (run->maxEvt <= run->actEvt)
1646 ii |= 8; // = 4;
1647
1648 run->closeTime = g_actTime - 1;
1649
1650 const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
1651 if (rc<0)
1652 {
1653 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
1654 where, run->runId, rc);
1655 run->fileId = 92+where*2;
1656 }
1657 else
1658 {
1659 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
1660 where, run->runId, ii);
1661 run->fileId = 93+where*2;
1662 }
1663}
1664
1665/*-----------------------------------------------------------------*/
1666
1667/*
1668void *writeEvt (void *ptr)
1669{
1670 factPrintf(kInfo, -1, "Starting write-thread");
1671
1672 while (g_runStat > -2)
1673 {
1674 //int numWrite = 0;
1675 int numWait = 0;
1676
1677 // Note that the current loop does not at all gurantee that
1678 // the events are written in the correct order.
1679 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
1680 {
1681 const shared_ptr<EVT_CTRL> evt = *it;
1682
1683 // This is a threading issue... the evtStat might have been invalid
1684 // but the frstPtr is not yet updated
1685 if (evt->evtStat==-1)
1686 continue;
1687
1688 // If we find the first non-written event which is not supposed to
1689 // be written, there are only more incomplete events in the queue
1690 if (evt->evtStat<5000)
1691 {
1692 numWait = 1;
1693 break;
1694 }
1695
1696 // If the event was written already already, skip it
1697 // We could replace that to a moving pointer pointing to the first
1698 // non-processed event
1699 if (evt->evtStat!=5000)
1700 continue;
1701
1702 const shared_ptr<RUN_CTRL> run = evt->run;
1703 if (run->runId==0)
1704 continue;
1705
1706 // File is open
1707 if (run->fileId==0)
1708 {
1709 const int rc = runWrite(run->fileHd, evt->fEvent, 0);
1710 if (rc >= 0)
1711 {
1712 // Sucessfully wrote event
1713 run->lastTime = g_actTime;
1714 run->actEvt++;
1715 }
1716 else
1717 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
1718
1719 checkAndCloseRun(run, rc<0, 1);
1720 }
1721
1722 evt->evtStat = 10000; // event written (or has to be discarded) -> delete
1723 }
1724
1725 // Although the are no pending events, we have to check if a run should be closed (timeout)
1726 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
1727 {
1728 if (ir->second->fileId == 0)
1729 {
1730 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
1731 const int cond = ir->second->runId == 0;
1732 checkAndCloseRun(ir->second, cond, 2);
1733 }
1734 }
1735
1736 usleep(1);
1737
1738 //nothing left to do
1739 if (gj.readStat < -10 && numWait == 0)
1740 {
1741 factPrintf(kInfo, -1, "Finish Write Process ...");
1742 gj.writStat = -22; //==> we should exit
1743 break;
1744 }
1745
1746 gj.writStat = gj.readStat;
1747 }
1748
1749 factPrintf(kInfo, -1, "Close all open files ...");
1750 for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
1751 {
1752 if (ir->second->fileId == 0)
1753 checkAndCloseRun(ir->second, 1, 3);
1754 }
1755
1756 gj.writStat = -99;
1757
1758 factPrintf(kInfo, -1, "Exit Writing Process ...");
1759
1760 return 0;
1761}
1762 */
1763
1764
1765
1766
1767void
1768StartEvtBuild ()
1769{
1770
1771 int i, /*j,*/ imax, status/*, th_ret[50]*/;
1772 pthread_t thread[50];
1773 struct timespec xwait;
1774
1775 gj.readStat = gj.procStat = gj.writStat = 0;
1776
1777 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
1778
1779
1780 gi_maxProc = g_maxProc;
1781 if (gi_maxProc <= 0 || gi_maxProc > 90) {
1782 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
1783 gi_maxProc = 1;
1784 }
1785
1786//start all threads (more to come) when we are allowed to ....
1787 while (g_runStat == 0) {
1788 xwait.tv_sec = 0;
1789 xwait.tv_nsec = 10000000; // sleep for ~10 msec
1790 nanosleep (&xwait, NULL);
1791 }
1792
1793 i = 0;
1794 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
1795 i++;
1796 ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
1797 //i++;
1798 ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
1799 //i++;
1800 imax = i;
1801
1802
1803#ifdef BILAND
1804 xwait.tv_sec = 30;;
1805 xwait.tv_nsec = 0; // sleep for ~20sec
1806 nanosleep (&xwait, NULL);
1807
1808 printf ("close all runs in 2 seconds\n");
1809
1810 CloseRunFile (0, time (NULL) + 2, 0);
1811
1812 xwait.tv_sec = 1;;
1813 xwait.tv_nsec = 0; // sleep for ~20sec
1814 nanosleep (&xwait, NULL);
1815
1816 printf ("setting g_runstat to -1\n");
1817
1818 g_runStat = -1;
1819#endif
1820
1821
1822//wait for all threads to finish
1823 for (i = 0; i < imax; i++) {
1824 /*j =*/ pthread_join (thread[i], (void **) &status);
1825 }
1826
1827} /*-----------------------------------------------------------------*/
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843 /*-----------------------------------------------------------------*/
1844 /*-----------------------------------------------------------------*/
1845 /*-----------------------------------------------------------------*/
1846 /*-----------------------------------------------------------------*/
1847 /*-----------------------------------------------------------------*/
1848
1849#ifdef BILAND
1850
1851int
1852subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
1853 int8_t * buffer)
1854{
1855 printf ("called subproc %d\n", threadID);
1856 return threadID + 1;
1857}
1858
1859
1860
1861
1862 /*-----------------------------------------------------------------*/
1863 /*-----------------------------------------------------------------*/
1864 /*-----------------------------------------------------------------*/
1865 /*-----------------------------------------------------------------*/
1866 /*-----------------------------------------------------------------*/
1867
1868
1869
1870
1871FileHandle_t
1872runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
1873{
1874 return 1;
1875};
1876
1877int
1878runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
1879{
1880 return 1;
1881 usleep (10000);
1882 return 1;
1883}
1884
1885
1886//{ return 1; } ;
1887
1888int
1889runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
1890{
1891 return 1;
1892};
1893
1894
1895
1896
1897int
1898eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
1899{
1900 int i = 0;
1901
1902// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
1903// for (i=0; i<NBOARDS; i++) {
1904// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
1905// }
1906 return 0;
1907}
1908
1909
1910void
1911factStatNew (EVT_STAT gi)
1912{
1913 int i;
1914
1915//for (i=0;i<MAX_SOCK;i++) {
1916// printf("%4d",gi.numRead[i]);
1917// if (i%20 == 0 ) printf("\n");
1918//}
1919}
1920
1921void
1922gotNewRun (int runnr, PEVNT_HEADER * headers)
1923{
1924 printf ("got new run %d\n", runnr);
1925 return;
1926}
1927
1928void
1929factStat (GUI_STAT gj)
1930{
1931// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
1932// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
1933}
1934
1935
1936void
1937debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
1938 int state, uint32_t tsec, uint32_t tusec)
1939{
1940// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
1941}
1942
1943
1944
1945void
1946debugStream (int isock, void *buf, int len)
1947{
1948}
1949
1950void
1951debugHead (int i, int j, void *buf)
1952{
1953}
1954
1955
1956void
1957factOut (int severity, int err, char *message)
1958{
1959 static FILE *fd;
1960 static int file = 0;
1961
1962 if (file == 0) {
1963 printf ("open file\n");
1964 fd = fopen ("x.out", "w+");
1965 file = 999;
1966 }
1967
1968 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
1969
1970 if (severity != kDebug)
1971 printf ("%3d %3d | %s\n", severity, err, message);
1972}
1973
1974
1975
1976int
1977main ()
1978{
1979 int i, b, c, p;
1980 char ipStr[100];
1981 struct in_addr IPaddr;
1982
1983 g_maxMem = 1024 * 1024; //MBytes
1984 g_maxMem = g_maxMem * 200; //100MBytes
1985
1986 g_maxProc = 20;
1987
1988 g_runStat = 40;
1989
1990 i = 0;
1991
1992// version for standard crates
1993//for (c=0; c<4,c++) {
1994// for (b=0; b<10; b++) {
1995// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
1996//
1997// inet_pton(PF_INET, ipStr, &IPaddr) ;
1998//
1999// g_port[i].sockAddr.sin_family = PF_INET;
2000// g_port[i].sockAddr.sin_port = htons(5000) ;
2001// g_port[i].sockAddr.sin_addr = IPaddr ;
2002// g_port[i].sockDef = 1 ;
2003// i++ ;
2004// }
2005//}
2006//
2007//version for PC-test *
2008 for (c = 0; c < 4; c++) {
2009 for (b = 0; b < 10; b++) {
2010 sprintf (ipStr, "10.0.%d.11", 128 + c);
2011 if (c < 2)
2012 sprintf (ipStr, "10.0.%d.11", 128);
2013 else
2014 sprintf (ipStr, "10.0.%d.11", 131);
2015// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2016
2017 inet_pton (PF_INET, ipStr, &IPaddr);
2018 p = 31919 + 100 * c + 10 * b;
2019
2020
2021 g_port[i].sockAddr.sin_family = PF_INET;
2022 g_port[i].sockAddr.sin_port = htons (p);
2023 g_port[i].sockAddr.sin_addr = IPaddr;
2024 g_port[i].sockDef = 1;
2025
2026 i++;
2027 }
2028 }
2029
2030
2031//g_port[17].sockDef =-1 ;
2032//g_actBoards-- ;
2033
2034 StartEvtBuild ();
2035
2036 return 0;
2037
2038}
2039#endif
Note: See TracBrowser for help on using the repository browser.