source: trunk/FACT++/src/EventBuilder.c@ 15472

Last change on this file since 15472 was 15472, checked in by tbretz, 13 years ago
Made rd (the read buffer) a local variable of readFAD; replaced some more assignments of id to k0 by using k0 directly; fixed a bug in a newly introduced modulo calculation; removed the loop over 280 sockets and now loop over only the existing 40; improved the speed of the loop that checks for incomplete events and events which can be deleted; some more minor structural improvements
File size: 68.0 KB
Line 
// Define EVTDEBUG to compile in the debugRead() call inside readFAD
// #define EVTDEBUG

// readFAD adds (8 - NUMSOCK) to gi_NumConnect[] for every connected board
#define NUMSOCK 1 //set to 7 for old configuration
#define MAXREAD 65536 //64kB wiznet buffer
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdarg.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
/// Severity levels handed as first argument to factOut() / factPrintf()
enum Severity
{
    kMessage = 10, ///< Just a message, usually obsolete
    kInfo = 20, ///< An info telling something which can be interesting to know
    kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted behaviour
    kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
    kFatal = 50, ///< An error which cannot be handled at all happened, the only solution is program termination
    kDebug = 99, ///< A message used for debugging only
};
36
#define MIN_LEN 32 // min #bytes needed to interpret FADheader
// size of read-buffer per socket; parenthesized so that the macro keeps its
// value inside larger expressions such as x / MAX_LEN or x % MAX_LEN
#define MAX_LEN (256*1024)
39
40//#define nanosleep(x,y)
41
// ---- Functions declared here but not defined in the visible part of this file ----

// Run file handling (open / write one event / close)
extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
//extern int runFinish (uint32_t runnr);

// Message sink: factPrintf() formats its arguments and forwards the text here
extern void factOut (int severity, int err, char *message);
// Receives the bit mask built by reportIncomplete() (bit ib set = data of board ib lost)
extern void factReportIncomplete (uint64_t rep);

// Called by readFAD when the first board of an event carries a run number different from 'actrun'
extern void gotNewRun (int runnr, PEVNT_HEADER * headers);


extern void factStat (GUI_STAT gj);

extern void factStatNew (EVT_STAT gi);

extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);

extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
                       int8_t * buffer);

// Called by readFAD with socket index, board id from the header and the read buffer
extern void debugHead (int i, int j, void *buf);

// Only called by readFAD when EVTDEBUG is defined
extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
                       int32_t runnr, int state, uint32_t tsec,
                       uint32_t tusec);
extern void debugStream (int isock, void *buf, int len);

// Forward declaration; the definition is not in the visible part of this file
int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
70
// Indices into evtCtrl[], used as a ring: mBufEvt() fills the slot at
// evtCtrl_lastPtr and then advances it; readFAD resets both to 0 on (re)start
int evtCtrl_frstPtr;
int evtCtrl_lastPtr;

int g_maxProc;
int g_maxSize;
int gi_maxSize;
int gi_maxProc;

// Wall-clock time (seconds / microseconds), refreshed by readFAD on every pass of its main loop
uint g_actTime;
uint g_actUsec;
// readFAD keeps looping while g_runStat >= 0 and g_reset == 0
int g_runStat;
int g_reset;
int g_useFTM;

// Reset flags; readFAD clears gi_reset, gi_resetR, gi_resetS and gi_resetW after its initialisation
int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
size_t g_maxMem; //maximum memory allowed for buffer

//no longer needed ...
int g_maxBoards; //maximum number of boards to be initialized
int g_actBoards;
//

FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"


int gi_runStat;
int gp_runStat;
int gw_runStat;

// Run number most recently announced through gotNewRun()
uint32_t actrun = 0;

// Per board connection counter: readFAD adds (8 - NUMSOCK) on connect and subtracts it when the FAD closes the socket
uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards

EVT_STAT gi;
GUI_STAT gj;

EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing
108
/*
 * printf-style front end for factOut().
 *
 * The arguments are formatted into a fixed-size local buffer (vsnprintf
 * truncates text that does not fit) and the resulting string is forwarded
 * together with 'severity' and 'id'.
 */
void factPrintf(int severity, int id, const char *fmt, ...)
{
    char text[1000];
    va_list args;

    va_start(args, fmt);
    vsnprintf(text, sizeof text, fmt, args);
    va_end(args);

    factOut(severity, id, text);
}
120
121
// Bytes at the start of every event buffer that hold one PEVNT_HEADER per board
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
// Size of one event buffer: the board headers, the EVENT structure and room
// for 1024 two-byte samples for each of the NPIX+NTMARK channels
#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
// One node of the stack of released event buffers (pushed by TGB_free, popped by TGB_Malloc)
typedef struct TGB_struct
{
    struct TGB_struct *prev; // entry that was on top of the stack before this one
    void *mem;               // the released event buffer
} TGB_entry;

TGB_entry *tgb_last = NULL; // top of the stack of released buffers, NULL if empty
uint64_t tgb_memory = 0;    // bytes obtained from malloc by TGB_Malloc so far
uint64_t tgb_inuse = 0;     // bytes currently handed out (TGB_Malloc adds, TGB_free subtracts)
133
134void *TGB_Malloc()
135{
136 // No free slot available, next alloc would exceed max memory
137 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
138 return NULL;
139
140 // We will return this amount of memory
141 tgb_inuse += MAX_TOT_MEM;
142
143 // No free slot available, allocate a new one
144 if (!tgb_last)
145 {
146 tgb_memory += MAX_TOT_MEM;
147 return malloc(MAX_TOT_MEM);
148 }
149
150 // Get the next free slot from the stack and return it
151 TGB_entry *last = tgb_last;
152
153 void *mem = last->mem;
154 tgb_last = last->prev;
155
156 free(last);
157
158 return mem;
159};
160
161void TGB_free(void *mem)
162{
163 // Add the last free slot to the stack
164 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
165
166 entry->prev = tgb_last;
167 entry->mem = mem;
168
169 tgb_last = entry;
170
171 // Decrease the amont of memory in use accordingly
172 tgb_inuse -= MAX_TOT_MEM;
173}
174
// Bookkeeping per run; mBufEvt() treats entries with runId==0 as free
RUN_CTRL runCtrl[MAX_RUN];
176
/*
*** Read buffer for the data of one socket: MAX_LEN bytes which can be
*** addressed as bytes, 16 bit, 32 bit or 64 bit words.
*** One buffer per socket is allocated in GenSock().
 */

typedef union
{
    uint8_t B[MAX_LEN];
    uint16_t S[MAX_LEN / 2];
    uint32_t I[MAX_LEN / 4];
    uint64_t L[MAX_LEN / 8];
} CNV_FACT;
188
// State of one FAD connection as used by GenSock() and readFAD()
typedef struct
{
    int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip (readFAD also sets 2 after a failed TGB_Malloc)
    int32_t bufPos; //next byte to read to the buffer next
    int32_t bufLen; //number of bytes left to read
    int32_t skip; //number of bytes skipped before start of event

    int errCnt; //how often connect failed since last successful
    int sockStat; //-1 if socket not yet connected , 0 if connected, 99 if not exist; GenSock sets 88 if socket() and 77 if malloc failed
    int socket; //contains the sockets

    struct sockaddr_in SockAddr; //IP for each socket

    int evtID; // event ID of event currently read
    int runID; // run " (readFAD substitutes g_actTime if the header contains 0)
    int ftmID; // event ID from FTM
    uint fadLen; // FADlength of event currently read (in bytes: twice the package length of the header)
    int fadVers; // Version of FAD
    int ftmTyp; // trigger type
    int Port;

    CNV_FACT *rBuf; // read buffer, allocated and freed by GenSock

} READ_STRUCT;
213
214/*-----------------------------------------------------------------*/
215
216
217/*-----------------------------------------------------------------*/
218
219
220
221int
222runFinish1 (uint32_t runnr)
223{
224 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
225 return 0;
226}
227int
228runFinish (uint32_t runnr)
229{
230 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
231 return 0;
232}
233
234int
235GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
236 READ_STRUCT * rs)
237{
238/*
239*** generate Address, create sockets and allocates readbuffer for it
240***
241*** if flag==0 generate socket and buffer
242*** <0 destroy socket and buffer
243*** >0 close and redo socket
244***
245*** sid : board*7 + port id
246 */
247
248 //close socket if open
249 if (rs->sockStat == 0)
250 {
251 if (close (rs->socket) > 0) {
252 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
253 } else {
254 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
255 }
256 }
257
258 rs->sockStat = 99;
259
260 if (flag < 0) {
261 free (rs->rBuf); //and never open again
262 rs->rBuf = NULL;
263 return 0;
264 }
265
266
267 if (flag == 0) { //generate address and buffer ...
268 rs->Port = port;
269 rs->SockAddr.sin_family = sockAddr->sin_family;
270 rs->SockAddr.sin_port = htons (port);
271 rs->SockAddr.sin_addr = sockAddr->sin_addr;
272
273 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
274 if (rs->rBuf == NULL) {
275 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
276 rs->sockStat = 77;
277 return -3;
278 }
279 }
280
281
282 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
283 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
284 rs->sockStat = 88;
285 return -2;
286 }
287
288 int optval = 1;
289 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
290 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
291 }
292 optval = 10; //start after 10 seconds
293 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
294 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
295 }
296 optval = 10; //do every 10 seconds
297 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
298 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
299 }
300 optval = 2; //close after 2 unsuccessful tries
301 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
302 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
303 }
304
305 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
306
307 rs->sockStat = -1; //try to (re)open socket
308 rs->errCnt = 0;
309 return 0;
310
311} /*-----------------------------------------------------------------*/
312
313 /*-----------------------------------------------------------------*/
314
315int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
316{
317 int xjr = -1;
318 int xkr = -1;
319
320 //points to the very first roi
321 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
322
323 roi[0] = ntohs(rbuf->S[roiPtr]);
324
325 for (int jr = 0; jr < 9; jr++)
326 {
327 roi[jr] = ntohs(rbuf->S[roiPtr]);
328
329 if (roi[jr]<0 || roi[jr]>1024)
330 {
331 factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
332 return 0;
333 }
334
335 // Check that the roi of pixels jr are compatible with the one of pixel 0
336 if (jr!=8 && roi[jr]!=roi[0])
337 {
338 xjr = jr;
339 break;
340 }
341
342 // Check that the roi of all other DRS chips on boards are compatible
343 for (int kr = 1; kr < 4; kr++)
344 {
345 const int kroi = ntohs(rbuf->S[roiPtr]);
346 if (kroi != roi[jr])
347 {
348 xjr = jr;
349 xkr = kr;
350 break;
351 }
352 roiPtr += kroi+4;
353 }
354 }
355
356 if (xjr>=0)
357 {
358 if (xkr<0)
359 factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
360 else
361 factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
362
363 return 0;
364 }
365
366 if (roi[8] < roi[0])
367 {
368 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
369 //gj.badRoiB++;
370 //gj.badRoi[b]++;
371 return 0;
372 }
373
374 return 1;
375}
376
377int mBufEvt(const READ_STRUCT *rs)
378{
379// generate a new Event into mBuffer:
380// make sure only complete Event are possible, so 'free' will always work
381// returns index into mBuffer[], or negative value in case of error
382// error: <-9000 if roi screwed up (not consistent with run)
383// <-8000 (not consistent with event)
384// <-7000 (not consistent with board)
385// < 0 if no space left
386
387 int nRoi[9];
388 if (!checkRoiConsistency(rs->rBuf, nRoi))
389 return -9999;
390
391 const int evID = rs->evtID;
392 const uint runID = rs->runID;
393 const int trgTyp = rs->ftmTyp;
394 const int trgNum = rs->ftmID;
395 const int fadNum = rs->evtID;
396
397 const int beg = (evtCtrl_lastPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
398 const int end = (evtCtrl_frstPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
399
400 for (int k=beg; k!=end; k=(k+MAX_EVT*MAX_RUN-1)%(MAX_EVT*MAX_RUN))
401 {
402 // If the run is different, go on searching.
403 // We cannot stop searching if a lower run-id is found as in
404 // the case of the events, because theoretically, there
405 // can be the same run on two different days.
406 if (runID != evtCtrl[k].runNum)
407 continue;
408
409 // If the ID of the new event if higher than the last one stored
410 // in that run, we have to assign a new slot (leave the loop)
411 if (evID > evtCtrl[k].evNum/* && runID == evtCtrl[k].runNum*/)
412 break;
413
414 if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/)
415 continue;
416
417 // We have found an entry with the same runID and evtID
418 // Check if ROI is consistent
419 if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8])
420 {
421 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
422 evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]);
423 return -8201;
424 }
425
426 // count for inconsistencies
427 if (evtCtrl[k].trgNum != trgNum)
428 evtCtrl[k].Errors[0]++;
429 if (evtCtrl[k].fadNum != fadNum)
430 evtCtrl[k].Errors[1]++;
431 if (evtCtrl[k].trgTyp != trgTyp)
432 evtCtrl[k].Errors[2]++;
433
434 //everything seems fine so far ==> use this slot ....
435 return k;
436 }
437
438 // Check if the control structure still has space left
439 if (end-beg==1 || (end==0 && beg==MAX_EVT*MAX_RUN-1))
440 {
441 factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end);
442 return -1;
443 }
444
445 // FIXME: This should be the time of the first receiped board
446 struct timeval tv;
447 gettimeofday (&tv, NULL);
448
449 const uint32_t tsec = tv.tv_sec;
450 const uint32_t tusec = tv.tv_usec;
451
452 //check if runId already registered in runCtrl
453
454 uint oldest = g_actTime + 1000;
455 int jold = -1;
456
457 int found = 0;
458
459 // fileId==-2: not yet used or run assigned but not open
460 // fileId== 0: file open
461 // fileId>0: run closed
462
463 for (int k=0; k<MAX_RUN; k++)
464 {
465 // Check if run already registered (old entries should have runId==-1)
466 if (runCtrl[k].runId == runID)
467 {
468 // FIXME: Compare to previous event
469 if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8])
470 {
471 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
472 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID);
473 return -9301;
474 }
475
476 found = 1;
477 break;
478 }
479
480 // This is just for sanity. We use the oldest free entry (until
481 // we have understood the concept and can use "just" a free entry
482 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
483 {
484 oldest = runCtrl[k].closeTime;
485 jold = k;
486 }
487 }
488
489 if (!found) // Run not yet registered, register run
490 {
491 if (jold < 0)
492 {
493 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
494 return -1001;
495 }
496
497 const int evFree = jold;
498
499 factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d",
500 runID, evID, evFree, nRoi[0], nRoi[8]);
501
502 runCtrl[evFree].runId = runID;
503 runCtrl[evFree].roi0 = nRoi[0]; // FIXME: Make obsolete!
504 runCtrl[evFree].roi8 = nRoi[8]; // FIXME: Make obsolete!
505 runCtrl[evFree].fileId = -2;
506 runCtrl[evFree].procId = -2;
507 runCtrl[evFree].lastEvt = 1; // Number of events partially started to read
508 runCtrl[evFree].actEvt = 0; // Number of written events (write)
509 runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent)
510 runCtrl[evFree].maxEvt = 999999999; // max number events allowed
511 runCtrl[evFree].lastTime = tsec; // Time when the last event was written
512 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
513 }
514
515 // The new entry which is populated is the one lastPtr is pointing to
516 const int k = evtCtrl_lastPtr;
517
518 //flag all boards as unused
519 evtCtrl[k].nBoard = 0;
520 for (int b=0; b<NBOARDS; b++)
521 evtCtrl[k].board[b] = -1;
522
523 evtCtrl[k].pcTime[0] = tsec;
524 evtCtrl[k].pcTime[1] = tusec;
525 evtCtrl[k].nRoi = nRoi[0];
526 evtCtrl[k].nRoiTM = nRoi[8];
527 evtCtrl[k].evNum = evID;
528 evtCtrl[k].runNum = runID;
529 evtCtrl[k].fadNum = fadNum;
530 evtCtrl[k].trgNum = trgNum;
531 evtCtrl[k].trgTyp = trgTyp;
532 evtCtrl[k].Errors[0] = 0;
533 evtCtrl[k].Errors[1] = 0;
534 evtCtrl[k].Errors[2] = 0;
535 evtCtrl[k].Errors[3] = 0;
536 evtCtrl[k].fEvent = NULL;
537 evtCtrl[k].FADhead = NULL;
538
539 evtCtrl[k].evtStat = 0;
540
541 // This is dangerous, because theoretically, it can result is
542 // acessing invalid memory in another thread if this is split
543 // in two instructions. Must be done only _after_ the contents
544 // have been initialized
545 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % (MAX_EVT*MAX_RUN);
546
547 return k;
548
549} /*-----------------------------------------------------------------*/
550
551
552void initEvent(int i)
553{
554 evtCtrl[i].fEvent = (EVENT*)((char*)evtCtrl[i].FADhead+MAX_HEAD_MEM);
555 memset(evtCtrl[i].fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi);
556
557 //flag all pixels as unused
558 for (int k = 0; k < NPIX; k++)
559 evtCtrl[i].fEvent->StartPix[k] = -1;
560
561 //flag all TMark as unused
562 for (int k = 0; k < NTMARK; k++)
563 evtCtrl[i].fEvent->StartTM[k] = -1;
564
565 evtCtrl[i].fEvent->NumBoards = 0;
566 evtCtrl[i].fEvent->SoftTrig = 0;
567 evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0];
568 evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1];
569 evtCtrl[i].fEvent->Roi = evtCtrl[i].nRoi;
570 evtCtrl[i].fEvent->RoiTM = evtCtrl[i].nRoiTM;
571 evtCtrl[i].fEvent->EventNum = evtCtrl[i].evNum;
572 evtCtrl[i].fEvent->TriggerNum = evtCtrl[i].trgNum;
573 evtCtrl[i].fEvent->TriggerType = evtCtrl[i].trgTyp;
574}
575
576
577int
578mBufFree (int i)
579{
580//delete entry [i] from mBuffer:
581//(and make sure multiple calls do no harm ....)
582
583 TGB_free(evtCtrl[i].FADhead);
584
585 evtCtrl[i].fEvent = NULL;
586 evtCtrl[i].FADhead = NULL;
587
588 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
589 evtCtrl[i].runNum = 0;
590
591 gj.usdMem = tgb_inuse;
592
593 gj.bufTot--;
594
595 /*if (gi_memStat < 0) {
596 if (gj.usdMem <= 0.75 * gj.maxMem)
597 gi_memStat = +1;
598 }*/
599
600 return 0;
601
602} /*-----------------------------------------------------------------*/
603
604/*
605void
606resetEvtStat ()
607{
608 for (int i = 0; i < MAX_SOCK; i++)
609 gi.numRead[i] = 0;
610
611 for (int i = 0; i < NBOARDS; i++) {
612 gi.gotByte[i] = 0;
613 gi.gotErr[i] = 0;
614
615 }
616
617 gi.evtGet = 0; //#new Start of Events read
618 gi.evtTot = 0; //#complete Events read
619 gi.evtErr = 0; //#Events with Errors
620 gi.evtSkp = 0; //#Events incomplete (timeout)
621
622 gi.procTot = 0; //#Events processed
623 gi.procErr = 0; //#Events showed problem in processing
624 gi.procTrg = 0; //#Events accepted by SW trigger
625 gi.procSkp = 0; //#Events rejected by SW trigger
626
627 gi.feedTot = 0; //#Events used for feedBack system
628 gi.feedErr = 0; //#Events rejected by feedBack
629
630 gi.wrtTot = 0; //#Events written to disk
631 gi.wrtErr = 0; //#Events with write-error
632
633 gi.runOpen = 0; //#Runs opened
634 gi.runClose = 0; //#Runs closed
635 gi.runErr = 0; //#Runs with open/close errors
636
637 return;
638}*/ /*-----------------------------------------------------------------*/
639
640void reportIncomplete(int id)
641{
642 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d",
643 evtCtrl[id].evNum, id);
644
645 uint64_t report = 0;
646
647 char str[1000];
648
649 int ik=0;
650 for (int ib=0; ib<NBOARDS; ib++)
651 {
652 if (ib%10==0)
653 str[ik++] = '|';
654
655 const int jb = evtCtrl[id].board[ib];
656 if (jb>=0) // data received from that board
657 {
658 str[ik++] = '0'+(jb%10);
659 continue;
660 }
661
662 // FIXME: Is that really 'b' or should that be 'ib' ?
663 if (gi_NumConnect[ib]<=0) // board not connected
664 {
665 str[ik++] = 'x';
666 continue;
667 }
668
669 // data from this board lost
670 str[ik++] = '.';
671 report |= ((uint64_t)1)<<ib;
672 }
673
674 str[ik++] = '|';
675 str[ik] = 0;
676
677 factOut(kWarn, 601, str);
678
679 factReportIncomplete(report);
680}
681
682// i == board
683void copyData(CNV_FACT *rbuf, int i, int evID)
684{
685 // swapEventHeaderBytes: End of the header. to channels now
686 int eStart = 36;
687 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
688 {
689 rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
690 rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
691 rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
692 rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
693
694 eStart += 4+rbuf->S[eStart+2];//skip the pixel data
695 }
696
697 memcpy(&evtCtrl[evID].FADhead[i], rbuf, sizeof(PEVNT_HEADER));
698
699 int src = sizeof(PEVNT_HEADER) / 2;
700
701 // consistency of ROIs have been checked already (is it all correct?)
702 const int roi = rbuf->S[src+2];
703
704 // different sort in FAD board.....
705 for (int px = 0; px < 9; px++)
706 {
707 for (int drs = 0; drs < 4; drs++)
708 {
709 // pixH = rd[i].rBuf->S[src++]; // ID
710 src++;
711
712 const int pixC = rbuf->S[src++]; // start-cell
713 const int pixR = rbuf->S[src++]; // roi
714 //here we should check if pixH is correct ....
715
716 const int pixS = i * 36 + drs * 9 + px;
717 src++;
718
719 evtCtrl[evID].fEvent->StartPix[pixS] = pixC;
720
721 const int dest1 = pixS * roi;
722 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
723
724 src += pixR;
725
726 if (px == 8)
727 {
728 const int tmS = i * 4 + drs;
729
730 //and we have additional TM info
731 if (pixR > roi)
732 {
733 const int dest2 = tmS * roi + NPIX * roi;
734
735 const int srcT = src - roi;
736 evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
737
738 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
739 }
740 else
741 {
742 evtCtrl[evID].fEvent->StartTM[tmS] = -1;
743 }
744 }
745 }
746 }
747}
748
749
750void *readFAD (void *ptr)
751{
752/* *** main loop reading FAD data and sorting them to complete events */
753
754 factPrintf(kInfo, -1, "Start initializing (readFAD)");
755
756 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer
757
758 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
759
760 //start.S = 0xFB01;
761 //stop.S = 0x04FE;
762
763/* initialize run control logics */
764 for (int i = 0; i < MAX_RUN; i++) {
765 runCtrl[i].runId = 0;
766 runCtrl[i].fileId = -2;
767 runCtrl[i].procId = -2;
768 }
769 gi_resetS = gi_resetR = 9;
770
771 int sockDef[NBOARDS]; //internal state of sockets
772 memset(sockDef, 0, NBOARDS*sizeof(int));
773
774 START:
775 evtCtrl_frstPtr = 0;
776 evtCtrl_lastPtr = 0;
777
778 //time in seconds
779 uint gi_SecTime = time(NULL);;
780
781 const int cntsock = 8 - NUMSOCK ;
782
783 if (gi_resetS > 0) {
784 //make sure all sockets are preallocated as 'not exist'
785 for (int i = 0; i < NBOARDS; i++) {
786 rd[i].socket = -1;
787 rd[i].sockStat = 99;
788 }
789
790 for (int k = 0; k < NBOARDS; k++) {
791 gi_NumConnect[k] = 0;
792 //gi.numConn[k] = 0;
793 gj.numConn[k] = 0;
794 //gj.errConn[k] = 0;
795 gj.rateBytes[k] = 0;
796 gj.totBytes[k] = 0;
797 }
798
799 }
800
801 if (gi_resetR > 0)
802 {
803 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
804 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
805 gj.totMem = tgb_memory;
806 gj.bufNew = gj.bufEvt = 0;
807 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
808
809 // initialize mBuffer (mark all entries as unused\empty)
810 for (int i = 0; i < MAX_EVT * MAX_RUN; i++)
811 {
812 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
813 evtCtrl[i].runNum = 0;
814 evtCtrl[i].evtStat = -1;
815 }
816
817 evtCtrl_frstPtr = 0;
818 evtCtrl_lastPtr = 0;
819
820 factPrintf(kInfo, -1, "End initializing (readFAD)");
821 }
822
823
824 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
825
826 //loop until global variable g_runStat claims stop
827 while (g_runStat >= 0 && g_reset == 0)
828 {
829 gi_runStat = g_runStat;
830 gj.readStat = g_runStat;
831
832 struct timeval tv;
833 gettimeofday (&tv, NULL);
834 g_actTime = tv.tv_sec;
835 g_actUsec = tv.tv_usec;
836
837
838 for (int b = 0; b < NBOARDS; b++)
839 {
840 // Nothing has changed
841 if (g_port[b].sockDef == sockDef[b])
842 continue;
843
844 gi_NumConnect[b] = 0; //must close all connections
845 //gi.numConn[b] = 0;
846 gj.numConn[b] = 0;
847
848 // s0 = 0: sockets to be defined and opened
849 // s0 = -1: sockets to be destroyed
850 // s0 = +1: sockets to be closed and reopened
851
852 int s0 = 0;
853 if (sockDef[b] != 0)
854 s0 = g_port[b].sockDef==0 ? -1 : +1;
855
856 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
857
858 GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
859
860 sockDef[b] = g_port[b].sockDef;
861 }
862
863 // count the number of active boards
864 int actBoards = 0;
865 for (int b = 0; b < NBOARDS; b++)
866 if (sockDef[b] > 0)
867 actBoards++;
868
869 //check all sockets if something to read
870 for (int i = 0; i < NBOARDS; i++)
871 {
872 // Do not try to connect this socket
873 if (rd[i].sockStat > 0)
874 continue;
875
876 if (rd[i].sockStat == -1)
877 {
878 //try to connect if not yet done
879 rd[i].sockStat = connect (rd[i].socket,
880 (struct sockaddr *) &rd[i].SockAddr,
881 sizeof (rd[i].SockAddr));
882 // Failed
883 if (rd[i].sockStat == -1)
884 {
885 rd[i].errCnt++;
886 usleep(25000);
887 continue;
888 }
889
890 // Success (rd[i].sockStat == 0)
891
892 if (sockDef[i] > 0)
893 {
894 rd[i].bufTyp = 0; // expect a header
895 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
896 }
897 else
898 {
899 rd[i].bufTyp = -1; // full data to be skipped
900 rd[i].bufLen = MAX_LEN; // huge for skipping
901 }
902
903 rd[i].bufPos = 0; // no byte read so far
904 rd[i].skip = 0; // start empty
905
906 gi_NumConnect[i] += cntsock;
907 gj.numConn[i]++;
908
909 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
910 }
911
912 // Do not read from this socket
913 if (rd[i].bufLen<0)
914 continue;
915
916 //numok++;
917
918 if (rd[i].bufLen>0)
919 {
920 const int32_t jrd =
921 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
922 rd[i].bufLen, MSG_DONTWAIT);
923
924 // recv failed
925 if (jrd<0)
926 {
927 // There was just nothing waiting
928 if (errno==EWOULDBLOCK || errno==EAGAIN)
929 {
930 //numok--;
931 continue;
932 }
933
934 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
935 //gi.gotErr[b]++;
936 continue;
937 }
938
939 // connection was closed ...
940 if (jrd==0)
941 {
942 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
943
944 const int s0 = sockDef[i] > 0 ? +1 : -1;
945 GenSock(s0, i, 0, NULL, &rd[i]);
946
947 //gi.gotErr[b]++;
948
949 gi_NumConnect[i]-= cntsock ;
950 //gi.numConn[b]--;
951 gj.numConn[i]--;
952
953 continue;
954 }
955 // Success (jrd > 0)
956
957 gj.rateBytes[i] += jrd;
958
959 // are we skipping this board ...
960 if (rd[i].bufTyp < 0)
961 continue;
962
963 rd[i].bufPos += jrd; //==> prepare for continuation
964 rd[i].bufLen -= jrd;
965
966#ifdef EVTDEBUG
967 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
968#endif
969 }
970
971 //we are reading event header
972 if (rd[i].bufTyp <= 0)
973 {
974 //not yet sufficient data to take action
975 if (rd[i].bufPos < minLen)
976 continue;
977
978 //check if startflag correct; else shift block ....
979 // FIXME: This is not enough... this combination of
980 // bytes can be anywhere... at least the end bytes
981 // must be checked somewhere, too.
982 int k;
983 for (k = 0; k < rd[i].bufPos - 1; k++)
984 {
985 //start.S = 0xFB01;
986 if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
987 break;
988 }
989 rd[i].skip += k;
990
991 //no start of header found
992 if (k >= rd[i].bufPos - 1)
993 {
994 rd[i].bufPos = 0;
995 rd[i].bufLen = sizeof(PEVNT_HEADER);
996 continue;
997 }
998
999 if (k > 0)
1000 {
1001 rd[i].bufPos -= k;
1002 rd[i].bufLen += k;
1003 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1004 rd[i].bufPos);
1005 }
1006
1007 if (rd[i].bufPos < minLen)
1008 continue;
1009
1010 if (rd[i].skip > 0)
1011 {
1012 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
1013 rd[i].skip = 0;
1014 }
1015
1016 // TGB: This needs much more checks than just the first two bytes!
1017
1018 // Swap everything except start_package_flag.
1019 // It is to difficult to find out where it is used how,
1020 // but it doesn't really matter because it is not really
1021 // used anywehere else
1022 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1023 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1024 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1025 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1026 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1027
1028 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1029 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1030 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1031 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1032
1033 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1034 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1035 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1036
1037 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1038 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1039
1040 for (int s=24;s<24+NTemp+NDAC;s++)
1041 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1042
1043 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1044 rd[i].fadVers = rd[i].rBuf->S[2];
1045 rd[i].ftmTyp = rd[i].rBuf->S[5];
1046 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1047 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1048 rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
1049 rd[i].bufTyp = 1; //ready to read full record
1050 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1051
1052 const int fadBoard = rd[i].rBuf->S[12];
1053 debugHead(i, fadBoard, rd[i].rBuf);
1054
1055 continue;
1056 }
1057
1058 // are we reading data
1059
1060 // not yet all read
1061 if (rd[i].bufLen > 0)
1062 continue;
1063
1064 // stop.S = 0x04FE;
1065 if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
1066 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
1067 {
1068 //gi.evtErr++;
1069 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
1070 i, rd[i].evtID, rd[i].fadLen,
1071 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1072
1073 // ready to read next header
1074 rd[i].bufTyp = 0;
1075 rd[i].bufLen = sizeof(PEVNT_HEADER);
1076 rd[i].bufPos = 0;
1077
1078 continue;
1079 }
1080
1081 // int actid;
1082 // if (g_useFTM > 0)
1083 // actid = rd[i].evtID;
1084 // else
1085 // actid = rd[i].ftmID;
1086
1087 //get index into mBuffer for this event (create if needed)
1088 const int idx = mBufEvt(&rd[i]);
1089
1090 // no free entry in mBuffer, retry later
1091 if (idx == -1)
1092 continue;
1093
1094 // We have a valid entry, but no memory has yet been allocated
1095 if (idx >= 0 && evtCtrl[idx].FADhead == NULL)
1096 {
1097 // Try to get memory from the big buffer
1098 evtCtrl[idx].FADhead = (PEVNT_HEADER*)TGB_Malloc();
1099 if (evtCtrl[idx].FADhead == NULL)
1100 {
1101 // If this works properly, this is a hack which can be removed, or
1102 // replaced by a signal or dim message
1103 if (rd[i].bufTyp==2)
1104 factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evtCtrl[idx].evNum, evtCtrl[idx].runNum);
1105 rd[i].bufTyp = 2;
1106 continue;
1107 }
1108
1109 // Initialise contents of mBuffer[evID]->fEvent
1110 initEvent(idx);
1111
1112 // Some statistics
1113 gj.usdMem = tgb_inuse;
1114
1115 if (gj.usdMem > gj.maxMem)
1116 gj.maxMem = gj.usdMem;
1117
1118 gj.rateNew++;
1119 gj.bufTot++;
1120 if (gj.bufTot > gj.maxEvt)
1121 gj.maxEvt = gj.bufTot;
1122 }
1123
1124 // ready to read next header
1125 rd[i].bufTyp = 0;
1126 rd[i].bufLen = sizeof(PEVNT_HEADER);
1127 rd[i].bufPos = 0;
1128
1129 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1130 if (idx < -1000)
1131 continue;
1132
1133 //we have a valid entry in mBuffer[]; fill it
1134 const int fadBoard = rd[i].rBuf->S[12];
1135 const int fadCrate = fadBoard>>8;
1136
1137 if (i != (fadCrate * 10 + (fadBoard&0xff)))
1138 {
1139 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1140 i, fadBoard, fadCrate, fadBoard&0xff);
1141 }
1142
1143 if (evtCtrl[idx].board[i] != -1)
1144 {
1145 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1146 evtCtrl[idx].evNum, i, i, rd[i].fadLen,
1147 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1148 rd[i].rBuf->B[rd[i].fadLen - 2],
1149 rd[i].rBuf->B[rd[i].fadLen - 1]);
1150 continue; // Continue reading next header
1151 }
1152
1153 // Copy data from rd[i] to mBuffer[evID]
1154 copyData(rd[i].rBuf, i, idx);
1155
1156 // now we have stored a new board contents into Event structure
1157
1158 evtCtrl[idx].fEvent->NumBoards++;
1159 evtCtrl[idx].board[i] = i;
1160 evtCtrl[idx].nBoard++;
1161 evtCtrl[idx].evtStat = evtCtrl[idx].nBoard;
1162
1163 // have we already reported first (partial) event of this run ???
1164 if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun)
1165 {
1166 // Signal the fadctrl that a new run has been started
1167 gotNewRun(evtCtrl[idx].runNum, NULL);
1168
1169 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
1170 actrun, evtCtrl[idx].runNum, evtCtrl[idx].evNum);
1171
1172 for (int j=0; j<MAX_RUN; j++)
1173 {
1174 // Since we have started a new run, we know already when to close the
1175 // previous run in terms of number of events
1176 if (runCtrl[j].runId==actrun)
1177 runCtrl[j].maxEvt = runCtrl[j].lastEvt;
1178
1179 // We got the first part of this event, so this is
1180 // the number of events we expect for this run
1181 if (runCtrl[j].runId==evtCtrl[idx].runNum)
1182 runCtrl[j].lastEvt++;
1183 }
1184
1185 // Change 'actrun' the the new runnumber
1186 actrun = evtCtrl[idx].runNum;
1187 }
1188
1189 // event not yet complete
1190 if (evtCtrl[idx].nBoard < actBoards)
1191 continue;
1192
1193 // This is a non-ideal hack to lower the probability that
1194 // in mBufEvt the search for correct entry in runCtrl
1195 // will not return a super-old entry
1196 for (int ir=0; ir<MAX_RUN; ir++)
1197 {
1198 if (runCtrl[ir].runId != actrun && runCtrl[ir].fileId>0)
1199 runCtrl[ir].runId = 0;
1200 }
1201
1202 // Flag that the event is ready for processing
1203 evtCtrl[idx].evtStat = 99;
1204
1205 } // end for loop over all sockets
1206
1207 g_actTime = time (NULL);
1208 if (g_actTime <= gi_SecTime)
1209 {
1210 usleep(1);
1211 continue;
1212 }
1213 gi_SecTime = g_actTime;
1214
1215 gj.bufNew = 0;
1216
1217 //loop over all active events and flag those older than read-timeout
1218 //delete those that are written to disk ....
1219
1220 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT*MAX_RUN)%(MAX_EVT*MAX_RUN);
1221
1222 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1223 {
1224 // Check the more likely case first: incomplete events
1225 if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<92)
1226 {
1227 gj.bufNew++; //incomplete event in Buffer
1228
1229 // Event has not yet timed out or was reported already
1230 if (evtCtrl[k0].evtStat>=90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
1231 continue;
1232
1233 // This will result in the emission of a dim service.
1234 // It doesn't matter if that takes comparably long,
1235 // because we have to stop the run anyway.
1236 reportIncomplete(k0);
1237
1238 //timeout for incomplete events
1239 evtCtrl[k0].evtStat = 91;
1240 gj.evtSkip++;
1241
1242 continue;
1243 }
1244
1245 // complete event in Buffer
1246 //if (evtCtrl[k0].evtStat >= 95)
1247 // gj.bufEvt++;
1248
1249 // Check the less likely case: 'useless' or 'delete'
1250 // evtState==0 can happen if the event was initialized (some data received)
1251 // but the data did not make sense (e.g. inconsistent rois)
1252 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000)
1253 {
1254 mBufFree(k0); //event written--> free memory
1255 evtCtrl[k0].evtStat = -1;
1256
1257 if (k0==evtCtrl_frstPtr)
1258 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
1259 else
1260 factPrintf(kError, -1, "Freed a non-first slot");
1261
1262 gj.evtWrite++;
1263 gj.rateWrite++;
1264
1265 continue;
1266 }
1267
1268 // The first events in the queue are either incomplete or
1269 // can be deleted (processing finished). As soon as we reach the
1270 // first complete events which processing is pending, we can stop.
1271 // All other events (if everything works well) must have the same state.
1272 break;
1273 }
1274
1275 // The number of complete events in the buffer is the total number of
1276 // events in the buffer minus the number of incomplete events.
1277 gj.bufEvt = count - gj.bufNew;
1278
1279 gj.deltaT = 1000; //temporary, must be improved
1280
1281 for (int ib = 0; ib < NBOARDS; ib++)
1282 gj.totBytes[ib] += gj.rateBytes[ib];
1283
1284 gj.totMem = tgb_memory;
1285
1286 if (gj.maxMem > gj.xxxMem)
1287 gj.xxxMem = gj.maxMem;
1288 if (gj.maxEvt > gj.xxxEvt)
1289 gj.xxxEvt = gj.maxEvt;
1290
1291 factStat (gj);
1292 //factStatNew (gi);
1293 gj.rateNew = gj.rateWrite = 0;
1294 gj.maxMem = gj.usdMem;
1295 gj.maxEvt = gj.bufTot;
1296 for (int b = 0; b < NBOARDS; b++)
1297 gj.rateBytes[b] = 0;
1298
1299 } // while (g_runStat >= 0 && g_reset == 0)
1300
1301 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1302
1303 if (g_reset > 0)
1304 {
1305 gi_reset = g_reset;
1306 gi_resetR = gi_reset % 10; //shall we stop reading ?
1307 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1308 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1309 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1310 g_reset = 0;
1311 }
1312 else
1313 {
1314 gi_reset = 0;
1315 gi_resetR = g_runStat == -1 ? 1 : 7;
1316
1317 gi_resetS = 7; //close all sockets
1318 gi_resetW = 7; //close all files
1319 gi_resetX = 0;
1320
1321 //inform others we have to quit ....
1322 gi_runStat = -11; //inform all that no update to happen any more
1323 gj.readStat = -11; //inform all that no update to happen any more
1324 }
1325
1326 if (gi_resetS > 0)
1327 {
1328 //must close all open sockets ...
1329 factPrintf(kInfo, -1, "Close all sockets...");
1330
1331 for (int i = 0; i < NBOARDS; i++)
1332 {
1333 if (rd[i].sockStat != 0)
1334 continue;
1335
1336 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1337
1338 gi_NumConnect[i]-= cntsock ;
1339 gj.numConn[i]--;
1340 sockDef[i] = 0; //flag ro recreate the sockets ...
1341 rd[i].sockStat = -1; //and try to open asap
1342 }
1343 }
1344
1345
1346 if (gi_resetR > 0)
1347 {
1348 //and clear all buffers (might have to wait until all others are done)
1349 while (evtCtrl_frstPtr!=evtCtrl_lastPtr)
1350 {
1351 const int k0=evtCtrl_frstPtr;
1352
1353 // flag incomplete events as 'read finished'
1354 if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat < 90)
1355 evtCtrl[k0].evtStat = 91;
1356
1357 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat>900)
1358 {
1359 mBufFree(k0); //event written--> free memory
1360 evtCtrl[k0].evtStat = -1;
1361
1362 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
1363 }
1364
1365 usleep(1);
1366 }
1367 }
1368
1369 if (gi_reset > 0)
1370 {
1371 if (gi_resetW > 0)
1372 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1373
1374 if (gi_resetX > 0)
1375 {
1376 struct timespec xwait;
1377 xwait.tv_sec = gi_resetX;
1378 xwait.tv_nsec = 0;
1379 nanosleep (&xwait, NULL);
1380 }
1381
1382 factPrintf(kInfo, -1, "Continue read Process ...");
1383 gi_reset = 0;
1384 goto START;
1385 }
1386
1387 factPrintf(kInfo, -1, "Exit read Process...");
1388
1389 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
1390
1391 gi_runStat = -99;
1392 gj.readStat = -99;
1393
1394 factStat (gj);
1395 factStatNew (gi);
1396
1397 return 0;
1398
1399} /*-----------------------------------------------------------------*/
1400
1401
1402void *subProc(void *thrid)
1403{
1404 const int64_t threadID = (int64_t)thrid;
1405
1406 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1407
1408 while (g_runStat > -2) //in case of 'exit' we still must process pending events
1409 {
1410 int numWait = 0;
1411 int numProc = 0;
1412
1413 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1414 {
1415 if (evtCtrl[k0].evtStat != 1000 + threadID)
1416 {
1417 if (evtCtrl[k0].evtStat < 1000 + threadID)
1418 numWait++;
1419
1420 continue;
1421 }
1422
1423 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)
1424
1425 if (gi_resetR<=1)
1426 {
1427 jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
1428 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
1429
1430
1431 if (jret <= threadID) {
1432 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
1433 jret = 5300;
1434 } else if (jret <= 0)
1435 jret = 9200 + threadID; // flag as 'to be deleted'
1436 else if (jret >= gi_maxProc)
1437 jret = 5200 + threadID; // flag as 'to be written'
1438 else
1439 jret = 1000 + jret; // flag for next proces
1440 }
1441
1442 evtCtrl[k0].evtStat = jret;
1443 numProc++;
1444 }
1445
1446 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1447 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
1448 return 0;
1449 }
1450
1451 //seems we have nothing to do, so sleep a little
1452 if (numProc == 0)
1453 usleep(1);
1454 }
1455
1456 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
1457
1458 return 0;
1459}
1460
1461/*-----------------------------------------------------------------*/
1462
1463
1464void *
1465procEvt (void *ptr)
1466{
1467/* *** main loop processing file, including SW-trigger */
1468 int status;
1469
1470 int lastRun = 0; //usually run from last event still valid
1471
1472// cpu_set_t mask;
1473// int cpu = 1; //process thread (will be several in final version)
1474
1475 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
1476
1477/* CPU_ZERO initializes all the bits in the mask to zero. */
1478// CPU_ZERO (&mask);
1479/* CPU_SET sets only the bit corresponding to cpu. */
1480// CPU_SET( 0 , &mask ); leave for system
1481// CPU_SET( 1 , &mask ); used by write process
1482// CPU_SET (2, &mask);
1483// CPU_SET (3, &mask);
1484// CPU_SET (4, &mask);
1485// CPU_SET (5, &mask);
1486// CPU_SET (6, &mask);
1487// CPU_SET( 7 , &mask ); used by read process
1488/* sched_setaffinity returns 0 in success */
1489// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1490// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1491// factOut (kWarn, -1, str);
1492// }
1493
1494
1495 pthread_t thread[100];
1496// int th_ret[100];
1497
1498 for (long long k = 0; k < gi_maxProc; k++) {
1499 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
1500 }
1501
1502 // in case of 'exit' we still must process pending events
1503 while (g_runStat > -2)
1504 {
1505 int numWait = 0;
1506 int numProc = 0;
1507
1508 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1509 {
1510 if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >= 1000)
1511 {
1512 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat< 90)
1513 numWait++;
1514
1515 continue;
1516 }
1517
1518 //we are asked to flush buffers asap
1519 if (gi_resetR > 1)
1520 {
1521 evtCtrl[k0].evtStat = 9991;
1522 continue;
1523 }
1524
1525 //-------- it is better to open the run already here, so call can be used to initialize
1526 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1527 const uint32_t irun = evtCtrl[k0].runNum;
1528 const int32_t ievt = evtCtrl[k0].evNum;
1529
1530 // Find entry in runCtrl which belongs to the event mBuffer[id]
1531 // (only check if there is a need to check)
1532 if (runCtrl[lastRun].runId != irun)
1533 {
1534 //check which fileID to use (or open if needed)
1535 int j;
1536 for (j=0;j<MAX_RUN; j++)
1537 if (runCtrl[j].runId == irun)
1538 break;
1539
1540 if (j>=MAX_RUN)
1541 {
1542 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
1543 // FIXME: What is the right action? (Flag event for deletion?)
1544 continue;
1545 }
1546
1547 lastRun = j;
1548 }
1549
1550 // File not yet open
1551 if (runCtrl[lastRun].fileId < 0)
1552 {
1553 //---- we need to open a new run ==> make sure all older runs are
1554 //---- finished and marked to be closed ....
1555 // This loop is unique to procEvt
1556 for (int j=0; j<MAX_RUN; j++)
1557 {
1558 if (runCtrl[j].fileId == 0)
1559 {
1560 runCtrl[j].procId = 2; //--> do no longer accept events for processing
1561
1562 //---- problem: processing still going on ==> must wait for closing ....
1563 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);
1564 runFinish1(runCtrl[j].runId);
1565 }
1566 }
1567
1568 RUN_HEAD actRun;
1569 actRun.Version = 1;
1570 actRun.RunType = -1; //to be adapted
1571 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0;
1572 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8;
1573 actRun.RunTime = evtCtrl[k0].pcTime[0]; //runCtrl[lastRun].firstTime;
1574 actRun.RunUsec = evtCtrl[k0].pcTime[1]; //runCtrl[lastRun].firstUsec;
1575 actRun.NBoard = NBOARDS;
1576 actRun.NPix = NPIX;
1577 actRun.NTm = NTMARK;
1578
1579 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
1580
1581 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
1582 if (runCtrl[lastRun].fileHd == NULL)
1583 {
1584 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
1585 runCtrl[lastRun].fileId = 91;
1586 runCtrl[lastRun].procId = 91; // Is not set in writeEvt
1587 continue;
1588 }
1589
1590 runCtrl[lastRun].fileId = 0;
1591 runCtrl[lastRun].procId = 0; // Is not set in writeEvt
1592
1593 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
1594 }
1595
1596 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1597 if (runCtrl[lastRun].procId == 0)
1598 {
1599 if (runCtrl[lastRun].closeTime < g_actTime ||
1600 runCtrl[lastRun].lastTime < g_actTime - 300 ||
1601 runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt)
1602 {
1603 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
1604 runFinish1 (runCtrl[lastRun].runId);
1605 runCtrl[lastRun].procId = 1;
1606 }
1607 }
1608
1609 // Skip event because of no active run
1610 if (runCtrl[lastRun].procId != 0)
1611 {
1612 evtCtrl[k0].evtStat = 9091;
1613 continue;
1614 }
1615
1616 //--------
1617 //--------
1618
1619 //and set correct event header ; also check for consistency in event (not yet)
1620 evtCtrl[k0].fEvent->Errors[0] = evtCtrl[k0].Errors[0];
1621 evtCtrl[k0].fEvent->Errors[1] = evtCtrl[k0].Errors[1];
1622 evtCtrl[k0].fEvent->Errors[2] = evtCtrl[k0].Errors[2];
1623 evtCtrl[k0].fEvent->Errors[3] = evtCtrl[k0].Errors[3];
1624
1625 for (int ib=0; ib<NBOARDS; ib++)
1626 {
1627 // board is not read
1628 if (evtCtrl[k0].board[ib] == -1)
1629 {
1630 evtCtrl[k0].FADhead[ib].start_package_flag = 0;
1631 evtCtrl[k0].fEvent->BoardTime[ib] = 0;
1632 }
1633 else
1634 {
1635 evtCtrl[k0].fEvent->BoardTime[ib] = evtCtrl[k0].FADhead[ib].time;
1636 }
1637 }
1638
1639 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead,
1640 evtCtrl[k0].fEvent);
1641 //gi.procTot++;
1642 numProc++;
1643
1644 if (rc < 0)
1645 {
1646 evtCtrl[k0].evtStat = 9999; //flag event to be skipped
1647 //gi.procErr++;
1648 }
1649 else
1650 {
1651 evtCtrl[k0].evtStat = 1000;
1652 runCtrl[lastRun].procEvt++;
1653 }
1654 }
1655
1656 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1657 factPrintf(kInfo, -1, "Exit Processing Process ...");
1658 gp_runStat = -22; //==> we should exit
1659 gj.procStat = -22; //==> we should exit
1660 return 0;
1661 }
1662
1663 //seems we have nothing to do, so sleep a little
1664 if (numProc == 0)
1665 usleep(1);
1666
1667 gp_runStat = gi_runStat;
1668 gj.procStat = gj.readStat;
1669
1670 }
1671
1672 //we are asked to abort asap ==> must flag all remaining events
1673 // when gi_runStat claims that all events are in the buffer...
1674
1675 factPrintf(kInfo, -1, "Abort Processing Process ...");
1676
1677 for (int k = 0; k < gi_maxProc; k++) {
1678 pthread_join (thread[k], (void **) &status);
1679 }
1680
1681 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1682 {
1683 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
1684 evtCtrl[k0].evtStat = 9800; //flag event as 'processed'
1685 }
1686
1687 gp_runStat = -99;
1688 gj.procStat = -99;
1689
1690 return 0;
1691
1692} /*-----------------------------------------------------------------*/
1693
1694int
1695CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1696{
1697/* close run runId (all all runs if runId=0) */
1698/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1699 int j;
1700
1701
1702 if (runId == 0) {
1703 for (j = 0; j < MAX_RUN; j++) {
1704 if (runCtrl[j].fileId == 0) { //run is open
1705 runCtrl[j].closeTime = closeTime;
1706 runCtrl[j].maxEvt = maxEvt;
1707 }
1708 }
1709 return 0;
1710 }
1711
1712 for (j = 0; j < MAX_RUN; j++) {
1713 if (runCtrl[j].runId == runId) {
1714 if (runCtrl[j].fileId == 0) { //run is open
1715 runCtrl[j].closeTime = closeTime;
1716 runCtrl[j].maxEvt = maxEvt;
1717 return 0;
1718 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1719 runCtrl[j].closeTime = closeTime;
1720 runCtrl[j].maxEvt = maxEvt;
1721 return +1;
1722 } else { // run already closed
1723 return +2;
1724 }
1725 }
1726 } //we only reach here if the run was never created
1727 return -1;
1728
1729}
1730
1731void checkAndCloseRun(int j, int irun, int cond, int where)
1732{
1733 if (!cond &&
1734 runCtrl[j].closeTime >= g_actTime &&
1735 runCtrl[j].lastTime >= g_actTime - 300 &&
1736 runCtrl[j].maxEvt > runCtrl[j].actEvt)
1737 return;
1738
1739 //close run for whatever reason
1740 int ii = 0;
1741 if (cond)
1742 ii = 1;
1743 if (runCtrl[j].closeTime < g_actTime)
1744 ii |= 2; // = 2;
1745 if (runCtrl[j].lastTime < g_actTime - 300)
1746 ii |= 4; // = 3;
1747 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
1748 ii |= 8; // = 4;
1749
1750 if (runCtrl[j].procId == 0)
1751 {
1752 runFinish1(runCtrl[j].runId);
1753 runCtrl[j].procId = 92;
1754 }
1755
1756 runCtrl[j].closeTime = g_actTime - 1;
1757
1758 const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
1759 if (rc<0)
1760 {
1761 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
1762 where, runCtrl[j].runId, rc);
1763 runCtrl[j].fileId = 92+where*2;
1764 }
1765 else
1766 {
1767 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
1768 where, irun, ii);
1769 runCtrl[j].fileId = 93+where*2;
1770 }
1771}
1772
1773/*-----------------------------------------------------------------*/
1774
1775
1776void *writeEvt (void *ptr)
1777{
1778/* *** main loop writing event (including opening and closing run-files */
1779
1780// cpu_set_t mask;
1781// int cpu = 1; //write thread
1782
1783 factPrintf(kInfo, -1, "Starting write-thread");
1784
1785/* CPU_ZERO initializes all the bits in the mask to zero. */
1786// CPU_ZERO (&mask);
1787/* CPU_SET sets only the bit corresponding to cpu. */
1788// CPU_SET (cpu, &mask);
1789/* sched_setaffinity returns 0 in success */
1790// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1791// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1792// }
1793
1794 int lastRun = 0; //usually run from last event still valid
1795
1796 while (g_runStat > -2)
1797 {
1798 int numWrite = 0;
1799 int numWait = 0;
1800
1801 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1802 {
1803 if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000)
1804 {
1805 if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000)
1806 numWait++;
1807
1808 continue;
1809 }
1810
1811 //we must drain the buffer asap
1812 if (gi_resetR > 1)
1813 {
1814 evtCtrl[k0].evtStat = 9904;
1815 continue;
1816 }
1817
1818 const uint32_t irun = evtCtrl[k0].runNum;
1819 const int32_t ievt = evtCtrl[k0].evNum;
1820
1821 // Find entry in runCtrl which belongs to the event mBuffer[id]
1822 // (only check if there is a need to check)
1823 if (runCtrl[lastRun].runId != irun)
1824 {
1825 //check which fileID to use (or open if needed)
1826 int j;
1827 for (j=0;j<MAX_RUN; j++)
1828 if (runCtrl[j].runId == irun)
1829 break;
1830
1831 if (j>=MAX_RUN)
1832 {
1833 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
1834 // FIXME: What is the right action?
1835 continue;
1836 }
1837
1838 lastRun = j;
1839 }
1840
1841 // File not yet open
1842 if (runCtrl[lastRun].fileId < 0)
1843 {
1844 RUN_HEAD actRun;
1845 actRun.Version = 1;
1846 actRun.RunType = -1; //to be adapted
1847 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0;
1848 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8;
1849 actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime;
1850 actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec;
1851 actRun.NBoard = NBOARDS;
1852 actRun.NPix = NPIX;
1853 actRun.NTm = NTMARK;
1854
1855 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER));
1856
1857 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
1858 if (runCtrl[lastRun].fileHd == NULL)
1859 {
1860 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
1861 runCtrl[lastRun].fileId = 91;
1862 continue;
1863 }
1864
1865 runCtrl[lastRun].fileId = 0;
1866 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
1867 }
1868
1869 if (runCtrl[lastRun].fileId > 0)
1870 {
1871 // There is an event but file is already closed
1872 /*
1873 if (runCtrl[j].fileId < 100)
1874 {
1875 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
1876 runCtrl[j].fileId += 100;
1877 }*/
1878
1879 evtCtrl[k0].evtStat = 9903;
1880 }
1881
1882 // File is open
1883 if (runCtrl[lastRun].fileId==0)
1884 {
1885 const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent,
1886 0/*sizeof (evtCtrl[k0])*/);
1887 if (rc >= 0)
1888 {
1889 // Sucessfully wrote event
1890 runCtrl[lastRun].lastTime = g_actTime;
1891 runCtrl[lastRun].actEvt++;
1892
1893 evtCtrl[k0].evtStat = 9901;
1894 }
1895 else
1896 {
1897 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
1898 evtCtrl[k0].evtStat = 9902;
1899 }
1900
1901 checkAndCloseRun(lastRun, irun, rc<0, 1);
1902 }
1903 }
1904/*
1905 //check if we should close a run (mainly when no event pending)
1906 //ETIENNE but first figure out which one is the latest run with a complete event.
1907 //i.e. max run Id and lastEvt >= 0
1908 //this condition is sufficient because all pending events were written already in the loop just above
1909 //actrun
1910 uint32_t lastStartedTime = 0;
1911 uint32_t runIdFound = 1;
1912
1913 //If we have an active run, look for its start time
1914 if (actrun != 0)
1915 {
1916 runIdfound = 0;
1917 for (int j=0;j<MAX_RUN;j++)
1918 {
1919 if (runCtrl[j].runId == actrun)
1920 {
1921 lastStartedTime = runCtrl[j].lastTime;
1922 runIdFound = 1;
1923 }
1924 }
1925 }
1926
1927 if (runIdFound == 0)
1928 {
1929 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
1930 }
1931
1932 //Also check if some files will never be opened
1933 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
1934 //I will never get run numbers for which no file is to be opened
1935 for (int j=0;j<MAX_RUN;j++)
1936 {
1937 if ((runCtrl[j].fileId < 0) &&
1938 (runCtrl[j].lastTime < lastStartedTime) &&
1939 (runCtrl[j].runId != 0))
1940 {
1941 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
1942 ;//TODO notify that this run will never be opened
1943 }
1944 }
1945 */
1946
1947 // Although the are no pending events, we have to check if a run should be closed (timeout)
1948 for (int j=0; j<MAX_RUN; j++)
1949 {
1950 if (runCtrl[j].fileId == 0)
1951 {
1952 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
1953 const int cond = /*runCtrl[j].lastTime < lastStartedTime &&*/ runCtrl[j].runId == 0;
1954 checkAndCloseRun(j, runCtrl[j].runId, cond, 2);
1955 }
1956 }
1957
1958 //seems we have nothing to do, so sleep a little
1959 if (numWrite == 0)
1960 usleep(1);
1961
1962 //nothing left to do
1963 if (gj.readStat < -10 && numWait == 0)
1964 {
1965 factPrintf(kInfo, -1, "Finish Write Process ...");
1966 gw_runStat = -22; //==> we should exit
1967 gj.writStat = -22; //==> we should exit
1968 break;
1969 }
1970
1971 gw_runStat = gi_runStat;
1972 gj.writStat = gj.readStat;
1973 }
1974
1975 factPrintf(kInfo, -1, "Close all open files ...");
1976 for (int j=0; j<MAX_RUN; j++)
1977 {
1978 if (runCtrl[j].fileId == 0)
1979 checkAndCloseRun(j, runCtrl[j].runId, 1, 3);
1980 }
1981
1982 gw_runStat = -99;
1983 gj.writStat = -99;
1984
1985 factPrintf(kInfo, -1, "Exit Writing Process ...");
1986
1987 return 0;
1988} /*-----------------------------------------------------------------*/
1989
1990
1991
1992
1993void
1994StartEvtBuild ()
1995{
1996
1997 int i, /*j,*/ imax, status/*, th_ret[50]*/;
1998 pthread_t thread[50];
1999 struct timespec xwait;
2000
2001 gi_runStat = gp_runStat = gw_runStat = 0;
2002 gj.readStat = gj.procStat = gj.writStat = 0;
2003
2004 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
2005
2006//initialize run control logics
2007 for (i = 0; i < MAX_RUN; i++) {
2008 runCtrl[i].runId = 0;
2009 runCtrl[i].fileId = -2;
2010 }
2011
2012//prepare for subProcesses
2013 gi_maxSize = g_maxSize;
2014 if (gi_maxSize <= 0)
2015 gi_maxSize = 1;
2016
2017 gi_maxProc = g_maxProc;
2018 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2019 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
2020 gi_maxProc = 1;
2021 }
2022//partially initialize event control logics
2023 evtCtrl_frstPtr = 0;
2024 evtCtrl_lastPtr = 0;
2025
2026//start all threads (more to come) when we are allowed to ....
2027 while (g_runStat == 0) {
2028 xwait.tv_sec = 0;
2029 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2030 nanosleep (&xwait, NULL);
2031 }
2032
2033 i = 0;
2034 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
2035 i++;
2036 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
2037 i++;
2038 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
2039 i++;
2040 imax = i;
2041
2042
2043#ifdef BILAND
2044 xwait.tv_sec = 30;;
2045 xwait.tv_nsec = 0; // sleep for ~20sec
2046 nanosleep (&xwait, NULL);
2047
2048 printf ("close all runs in 2 seconds\n");
2049
2050 CloseRunFile (0, time (NULL) + 2, 0);
2051
2052 xwait.tv_sec = 1;;
2053 xwait.tv_nsec = 0; // sleep for ~20sec
2054 nanosleep (&xwait, NULL);
2055
2056 printf ("setting g_runstat to -1\n");
2057
2058 g_runStat = -1;
2059#endif
2060
2061
2062//wait for all threads to finish
2063 for (i = 0; i < imax; i++) {
2064 /*j =*/ pthread_join (thread[i], (void **) &status);
2065 }
2066
2067} /*-----------------------------------------------------------------*/
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083 /*-----------------------------------------------------------------*/
2084 /*-----------------------------------------------------------------*/
2085 /*-----------------------------------------------------------------*/
2086 /*-----------------------------------------------------------------*/
2087 /*-----------------------------------------------------------------*/
2088
2089#ifdef BILAND
2090
2091int
2092subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2093 int8_t * buffer)
2094{
2095 printf ("called subproc %d\n", threadID);
2096 return threadID + 1;
2097}
2098
2099
2100
2101
2102 /*-----------------------------------------------------------------*/
2103 /*-----------------------------------------------------------------*/
2104 /*-----------------------------------------------------------------*/
2105 /*-----------------------------------------------------------------*/
2106 /*-----------------------------------------------------------------*/
2107
2108
2109
2110
2111FileHandle_t
2112runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2113{
2114 return 1;
2115};
2116
2117int
2118runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2119{
2120 return 1;
2121 usleep (10000);
2122 return 1;
2123}
2124
2125
2126//{ return 1; } ;
2127
2128int
2129runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2130{
2131 return 1;
2132};
2133
2134
2135
2136
2137int
2138eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2139{
2140 int i = 0;
2141
2142// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2143// for (i=0; i<NBOARDS; i++) {
2144// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2145// }
2146 return 0;
2147}
2148
2149
2150void
2151factStatNew (EVT_STAT gi)
2152{
2153 int i;
2154
2155//for (i=0;i<MAX_SOCK;i++) {
2156// printf("%4d",gi.numRead[i]);
2157// if (i%20 == 0 ) printf("\n");
2158//}
2159}
2160
2161void
2162gotNewRun (int runnr, PEVNT_HEADER * headers)
2163{
2164 printf ("got new run %d\n", runnr);
2165 return;
2166}
2167
2168void
2169factStat (GUI_STAT gj)
2170{
2171// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2172// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2173}
2174
2175
/*
 * Test stub (BILAND build only): debugging hook which ignores all of its
 * arguments and does nothing.
 */
void
debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
           int state, uint32_t tsec, uint32_t tusec)
{
    (void) isock;
    (void) ibyte;
    (void) event;
    (void) ftmevt;
    (void) runnr;
    (void) state;
    (void) tsec;
    (void) tusec;
}
2182
2183
2184
/*
 * Test stub (BILAND build only): debugging hook which ignores all of its
 * arguments and does nothing.
 */
void
debugStream (int isock, void *buf, int len)
{
    (void) isock;
    (void) buf;
    (void) len;
}
2189
/*
 * Test stub (BILAND build only): debugging hook which ignores all of its
 * arguments and does nothing.
 */
void
debugHead (int i, int j, void *buf)
{
    (void) i;
    (void) j;
    (void) buf;
}
2194
2195
2196void
2197factOut (int severity, int err, char *message)
2198{
2199 static FILE *fd;
2200 static int file = 0;
2201
2202 if (file == 0) {
2203 printf ("open file\n");
2204 fd = fopen ("x.out", "w+");
2205 file = 999;
2206 }
2207
2208 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2209
2210 if (severity != kDebug)
2211 printf ("%3d %3d | %s\n", severity, err, message);
2212}
2213
2214
2215
2216int
2217main ()
2218{
2219 int i, b, c, p;
2220 char ipStr[100];
2221 struct in_addr IPaddr;
2222
2223 g_maxMem = 1024 * 1024; //MBytes
2224//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2225 g_maxMem = g_maxMem * 200; //100MBytes
2226
2227 g_maxProc = 20;
2228 g_maxSize = 30000;
2229
2230 g_runStat = 40;
2231
2232 i = 0;
2233
2234// version for standard crates
2235//for (c=0; c<4,c++) {
2236// for (b=0; b<10; b++) {
2237// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2238//
2239// inet_pton(PF_INET, ipStr, &IPaddr) ;
2240//
2241// g_port[i].sockAddr.sin_family = PF_INET;
2242// g_port[i].sockAddr.sin_port = htons(5000) ;
2243// g_port[i].sockAddr.sin_addr = IPaddr ;
2244// g_port[i].sockDef = 1 ;
2245// i++ ;
2246// }
2247//}
2248//
2249//version for PC-test *
2250 for (c = 0; c < 4; c++) {
2251 for (b = 0; b < 10; b++) {
2252 sprintf (ipStr, "10.0.%d.11", 128 + c);
2253 if (c < 2)
2254 sprintf (ipStr, "10.0.%d.11", 128);
2255 else
2256 sprintf (ipStr, "10.0.%d.11", 131);
2257// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2258
2259 inet_pton (PF_INET, ipStr, &IPaddr);
2260 p = 31919 + 100 * c + 10 * b;
2261
2262
2263 g_port[i].sockAddr.sin_family = PF_INET;
2264 g_port[i].sockAddr.sin_port = htons (p);
2265 g_port[i].sockAddr.sin_addr = IPaddr;
2266 g_port[i].sockDef = 1;
2267
2268 i++;
2269 }
2270 }
2271
2272
2273//g_port[17].sockDef =-1 ;
2274//g_actBoards-- ;
2275
2276 StartEvtBuild ();
2277
2278 return 0;
2279
2280}
2281#endif
Note: See TracBrowser for help on using the repository browser.