source: trunk/FACT++/src/EventBuilder.c@ 15476

Last change on this file since 15476 was 15476, checked in by tbretz, 11 years ago
When a new start flag is searched, the one byte was missed -- i am wondering, if not a reconnect would be the better solution
File size: 68.0 KB
Line 
1// #define EVTDEBUG
2
3#define NUMSOCK 1 //set to 7 for old configuration
4#define MAXREAD 65536 //64kB wiznet buffer
5
6#include <stdlib.h>
7#include <stdint.h>
8#include <stdarg.h>
9#include <unistd.h>
10#include <stdio.h>
11#include <sys/time.h>
12#include <arpa/inet.h>
13#include <string.h>
14#include <math.h>
15#include <error.h>
16#include <errno.h>
17#include <unistd.h>
18#include <sys/types.h>
19#include <sys/socket.h>
20#include <netinet/in.h>
21#include <netinet/tcp.h>
22#include <pthread.h>
23#include <sched.h>
24
25#include "EventBuilder.h"
26
27enum Severity
28{
29 kMessage = 10, ///< Just a message, usually obsolete
30 kInfo = 20, ///< An info telling something which can be interesting to know
31 kWarn = 30, ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
32 kError = 40, ///< Error, something unexpected happened, but can still be handled by the program
33 kFatal = 50, ///< An error which cannot be handled at all happend, the only solution is program termination
34 kDebug = 99, ///< A message used for debugging only
35};
36
37#define MIN_LEN 32 // min #bytes needed to interpret FADheader
38#define MAX_LEN 256*1024 // size of read-buffer per socket
39
40//#define nanosleep(x,y)
41
42extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
43extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
44extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
45//extern int runFinish (uint32_t runnr);
46
47extern void factOut (int severity, int err, char *message);
48extern void factReportIncomplete (uint64_t rep);
49
50extern void gotNewRun (int runnr, PEVNT_HEADER * headers);
51
52
53extern void factStat (GUI_STAT gj);
54
55extern void factStatNew (EVT_STAT gi);
56
57extern int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
58
59extern int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
60 int8_t * buffer);
61
62extern void debugHead (int i, int j, void *buf);
63
64extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
65 int32_t runnr, int state, uint32_t tsec,
66 uint32_t tusec);
67extern void debugStream (int isock, void *buf, int len);
68
69int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
70
71int evtCtrl_frstPtr;
72int evtCtrl_lastPtr;
73
74int g_maxProc;
75int g_maxSize;
76int gi_maxSize;
77int gi_maxProc;
78
79uint g_actTime;
80uint g_actUsec;
81int g_runStat;
82int g_reset;
83int g_useFTM;
84
85int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
86size_t g_maxMem; //maximum memory allowed for buffer
87
88//no longer needed ...
89int g_maxBoards; //maximum number of boards to be initialized
90int g_actBoards;
91//
92
93FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
94
95
96int gi_runStat;
97int gp_runStat;
98int gw_runStat;
99
100uint32_t actrun = 0;
101
102uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
103
104EVT_STAT gi;
105GUI_STAT gj;
106
107EVT_CTRL evtCtrl[MAX_EVT * MAX_RUN]; //control of events during processing
108
109void factPrintf(int severity, int id, const char *fmt, ...)
110{
111 char str[1000];
112
113 va_list ap;
114 va_start(ap, fmt);
115 vsnprintf(str, 1000, fmt, ap);
116 va_end(ap);
117
118 factOut(severity, id, str);
119}
120
121
122#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
123#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
124typedef struct TGB_struct
125{
126 struct TGB_struct *prev;
127 void *mem;
128} TGB_entry;
129
130TGB_entry *tgb_last = NULL;
131uint64_t tgb_memory = 0;
132uint64_t tgb_inuse = 0;
133
134void *TGB_Malloc()
135{
136 // No free slot available, next alloc would exceed max memory
137 if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
138 return NULL;
139
140 // We will return this amount of memory
141 tgb_inuse += MAX_TOT_MEM;
142
143 // No free slot available, allocate a new one
144 if (!tgb_last)
145 {
146 tgb_memory += MAX_TOT_MEM;
147 return malloc(MAX_TOT_MEM);
148 }
149
150 // Get the next free slot from the stack and return it
151 TGB_entry *last = tgb_last;
152
153 void *mem = last->mem;
154 tgb_last = last->prev;
155
156 free(last);
157
158 return mem;
159};
160
161void TGB_free(void *mem)
162{
163 // Add the last free slot to the stack
164 TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
165
166 entry->prev = tgb_last;
167 entry->mem = mem;
168
169 tgb_last = entry;
170
171 // Decrease the amont of memory in use accordingly
172 tgb_inuse -= MAX_TOT_MEM;
173}
174
175RUN_CTRL runCtrl[MAX_RUN];
176
177/*
178*** Definition of rdBuffer to read in IP packets; keep it global !!!!
179 */
180
181typedef union
182{
183 uint8_t B[MAX_LEN];
184 uint16_t S[MAX_LEN / 2];
185 uint32_t I[MAX_LEN / 4];
186 uint64_t L[MAX_LEN / 8];
187} CNV_FACT;
188
189typedef struct
190{
191 int bufTyp; //what are we reading at the moment: 0=header 1=data -1=skip ...
192 int32_t bufPos; //next byte to read to the buffer next
193 int32_t bufLen; //number of bytes left to read
194 int32_t skip; //number of bytes skipped before start of event
195
196 int errCnt; //how often connect failed since last successful
197 int sockStat; //-1 if socket not yet connected , 99 if not exist
198 int socket; //contains the sockets
199
200 struct sockaddr_in SockAddr; //IP for each socket
201
202 int evtID; // event ID of event currently read
203 int runID; // run "
204 int ftmID; // event ID from FTM
205 uint fadLen; // FADlength of event currently read
206 int fadVers; // Version of FAD
207 int ftmTyp; // trigger type
208 int Port;
209
210 CNV_FACT *rBuf;
211
212} READ_STRUCT;
213
214/*-----------------------------------------------------------------*/
215
216
217/*-----------------------------------------------------------------*/
218
219
220
221int
222runFinish1 (uint32_t runnr)
223{
224 factPrintf(kInfo, 173, "Should finish(1) run %d (but not yet possible)", runnr);
225 return 0;
226}
227int
228runFinish (uint32_t runnr)
229{
230 factPrintf(kInfo, 173, "Should finish run %d (but not yet possible)", runnr);
231 return 0;
232}
233
234int
235GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
236 READ_STRUCT * rs)
237{
238/*
239*** generate Address, create sockets and allocates readbuffer for it
240***
241*** if flag==0 generate socket and buffer
242*** <0 destroy socket and buffer
243*** >0 close and redo socket
244***
245*** sid : board*7 + port id
246 */
247
248 //close socket if open
249 if (rs->sockStat == 0)
250 {
251 if (close (rs->socket) > 0) {
252 factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
253 } else {
254 factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
255 }
256 }
257
258 rs->sockStat = 99;
259
260 if (flag < 0) {
261 free (rs->rBuf); //and never open again
262 rs->rBuf = NULL;
263 return 0;
264 }
265
266
267 if (flag == 0) { //generate address and buffer ...
268 rs->Port = port;
269 rs->SockAddr.sin_family = sockAddr->sin_family;
270 rs->SockAddr.sin_port = htons (port);
271 rs->SockAddr.sin_addr = sockAddr->sin_addr;
272
273 rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
274 if (rs->rBuf == NULL) {
275 factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
276 rs->sockStat = 77;
277 return -3;
278 }
279 }
280
281
282 if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
283 factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
284 rs->sockStat = 88;
285 return -2;
286 }
287
288 int optval = 1;
289 if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
290 factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
291 }
292 optval = 10; //start after 10 seconds
293 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
294 factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
295 }
296 optval = 10; //do every 10 seconds
297 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
298 factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
299 }
300 optval = 2; //close after 2 unsuccessful tries
301 if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
302 factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
303 }
304
305 factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
306
307 rs->sockStat = -1; //try to (re)open socket
308 rs->errCnt = 0;
309 return 0;
310
311} /*-----------------------------------------------------------------*/
312
313 /*-----------------------------------------------------------------*/
314
315int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
316{
317 int xjr = -1;
318 int xkr = -1;
319
320 //points to the very first roi
321 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
322
323 roi[0] = ntohs(rbuf->S[roiPtr]);
324
325 for (int jr = 0; jr < 9; jr++)
326 {
327 roi[jr] = ntohs(rbuf->S[roiPtr]);
328
329 if (roi[jr]<0 || roi[jr]>1024)
330 {
331 factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
332 return 0;
333 }
334
335 // Check that the roi of pixels jr are compatible with the one of pixel 0
336 if (jr!=8 && roi[jr]!=roi[0])
337 {
338 xjr = jr;
339 break;
340 }
341
342 // Check that the roi of all other DRS chips on boards are compatible
343 for (int kr = 1; kr < 4; kr++)
344 {
345 const int kroi = ntohs(rbuf->S[roiPtr]);
346 if (kroi != roi[jr])
347 {
348 xjr = jr;
349 xkr = kr;
350 break;
351 }
352 roiPtr += kroi+4;
353 }
354 }
355
356 if (xjr>=0)
357 {
358 if (xkr<0)
359 factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
360 else
361 factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
362
363 return 0;
364 }
365
366 if (roi[8] < roi[0])
367 {
368 factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
369 //gj.badRoiB++;
370 //gj.badRoi[b]++;
371 return 0;
372 }
373
374 return 1;
375}
376
377int mBufEvt(const READ_STRUCT *rs)
378{
379// generate a new Event into mBuffer:
380// make sure only complete Event are possible, so 'free' will always work
381// returns index into mBuffer[], or negative value in case of error
382// error: <-9000 if roi screwed up (not consistent with run)
383// <-8000 (not consistent with event)
384// <-7000 (not consistent with board)
385// < 0 if no space left
386
387 int nRoi[9];
388 if (!checkRoiConsistency(rs->rBuf, nRoi))
389 return -9999;
390
391 const int evID = rs->evtID;
392 const uint runID = rs->runID;
393 const int trgTyp = rs->ftmTyp;
394 const int trgNum = rs->ftmID;
395 const int fadNum = rs->evtID;
396
397 const int beg = (evtCtrl_lastPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
398 const int end = (evtCtrl_frstPtr + MAX_EVT*MAX_RUN - 1) % (MAX_EVT*MAX_RUN);
399
400 for (int k=beg; k!=end; k=(k+MAX_EVT*MAX_RUN-1)%(MAX_EVT*MAX_RUN))
401 {
402 // If the run is different, go on searching.
403 // We cannot stop searching if a lower run-id is found as in
404 // the case of the events, because theoretically, there
405 // can be the same run on two different days.
406 if (runID != evtCtrl[k].runNum)
407 continue;
408
409 // If the ID of the new event if higher than the last one stored
410 // in that run, we have to assign a new slot (leave the loop)
411 if (evID > evtCtrl[k].evNum/* && runID == evtCtrl[k].runNum*/)
412 break;
413
414 if (evID != evtCtrl[k].evNum/* || runID != evtCtrl[k].runNum*/)
415 continue;
416
417 // We have found an entry with the same runID and evtID
418 // Check if ROI is consistent
419 if (evtCtrl[k].nRoi != nRoi[0] || evtCtrl[k].nRoiTM != nRoi[8])
420 {
421 factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
422 evtCtrl[k].nRoi, evtCtrl[k].nRoiTM, nRoi[0], nRoi[8]);
423 return -8201;
424 }
425
426 // count for inconsistencies
427 if (evtCtrl[k].trgNum != trgNum)
428 evtCtrl[k].Errors[0]++;
429 if (evtCtrl[k].fadNum != fadNum)
430 evtCtrl[k].Errors[1]++;
431 if (evtCtrl[k].trgTyp != trgTyp)
432 evtCtrl[k].Errors[2]++;
433
434 //everything seems fine so far ==> use this slot ....
435 return k;
436 }
437
438 // Check if the control structure still has space left
439 if (end-beg==1 || (end==0 && beg==MAX_EVT*MAX_RUN-1))
440 {
441 factPrintf(kError, 881, "No control slot to keep event %d (run %d) %d %d", evID, runID, beg, end);
442 return -1;
443 }
444
445 // FIXME: This should be the time of the first receiped board
446 struct timeval tv;
447 gettimeofday (&tv, NULL);
448
449 const uint32_t tsec = tv.tv_sec;
450 const uint32_t tusec = tv.tv_usec;
451
452 //check if runId already registered in runCtrl
453
454 uint oldest = g_actTime + 1000;
455 int jold = -1;
456
457 int found = 0;
458
459 // fileId==-2: not yet used or run assigned but not open
460 // fileId== 0: file open
461 // fileId>0: run closed
462
463 for (int k=0; k<MAX_RUN; k++)
464 {
465 // Check if run already registered (old entries should have runId==-1)
466 if (runCtrl[k].runId == runID)
467 {
468 // FIXME: Compare to previous event
469 if (runCtrl[k].roi0 != nRoi[0] || runCtrl[k].roi8 != nRoi[8])
470 {
471 factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
472 runCtrl[k].roi0, runCtrl[k].roi8, nRoi[0], nRoi[8], runID, evID);
473 return -9301;
474 }
475
476 found = 1;
477 break;
478 }
479
480 // This is just for sanity. We use the oldest free entry (until
481 // we have understood the concept and can use "just" a free entry
482 if (runCtrl[k].runId==0 && runCtrl[k].closeTime < oldest)
483 {
484 oldest = runCtrl[k].closeTime;
485 jold = k;
486 }
487 }
488
489 if (!found) // Run not yet registered, register run
490 {
491 if (jold < 0)
492 {
493 factPrintf(kFatal, 883, "Not able to register the new run %d", runID);
494 return -1001;
495 }
496
497 const int evFree = jold;
498
499 factPrintf(kInfo, 503, "New run %d (evID=%d, evFree=%d) registered with roi=%d and roi_tm=%d",
500 runID, evID, evFree, nRoi[0], nRoi[8]);
501
502 runCtrl[evFree].runId = runID;
503 runCtrl[evFree].roi0 = nRoi[0]; // FIXME: Make obsolete!
504 runCtrl[evFree].roi8 = nRoi[8]; // FIXME: Make obsolete!
505 runCtrl[evFree].fileId = -2;
506 runCtrl[evFree].procId = -2;
507 runCtrl[evFree].lastEvt = 1; // Number of events partially started to read
508 runCtrl[evFree].actEvt = 0; // Number of written events (write)
509 runCtrl[evFree].procEvt = 0; // Number of successfully checked events (checkEvent)
510 runCtrl[evFree].maxEvt = 999999999; // max number events allowed
511 runCtrl[evFree].lastTime = tsec; // Time when the last event was written
512 runCtrl[evFree].closeTime = tsec + 3600 * 24; //max time allowed
513 }
514
515 // The new entry which is populated is the one lastPtr is pointing to
516 const int k = evtCtrl_lastPtr;
517
518 //flag all boards as unused
519 evtCtrl[k].nBoard = 0;
520 for (int b=0; b<NBOARDS; b++)
521 evtCtrl[k].board[b] = -1;
522
523 evtCtrl[k].pcTime[0] = tsec;
524 evtCtrl[k].pcTime[1] = tusec;
525 evtCtrl[k].nRoi = nRoi[0];
526 evtCtrl[k].nRoiTM = nRoi[8];
527 evtCtrl[k].evNum = evID;
528 evtCtrl[k].runNum = runID;
529 evtCtrl[k].fadNum = fadNum;
530 evtCtrl[k].trgNum = trgNum;
531 evtCtrl[k].trgTyp = trgTyp;
532 evtCtrl[k].Errors[0] = 0;
533 evtCtrl[k].Errors[1] = 0;
534 evtCtrl[k].Errors[2] = 0;
535 evtCtrl[k].Errors[3] = 0;
536 evtCtrl[k].fEvent = NULL;
537 evtCtrl[k].FADhead = NULL;
538
539 evtCtrl[k].evtStat = 0;
540
541 // This is dangerous, because theoretically, it can result is
542 // acessing invalid memory in another thread if this is split
543 // in two instructions. Must be done only _after_ the contents
544 // have been initialized
545 evtCtrl_lastPtr = (evtCtrl_lastPtr+1) % (MAX_EVT*MAX_RUN);
546
547 return k;
548
549} /*-----------------------------------------------------------------*/
550
551
552void initEvent(int i)
553{
554 evtCtrl[i].fEvent = (EVENT*)((char*)evtCtrl[i].FADhead+MAX_HEAD_MEM);
555 memset(evtCtrl[i].fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evtCtrl[i].nRoi);
556
557 //flag all pixels as unused
558 for (int k = 0; k < NPIX; k++)
559 evtCtrl[i].fEvent->StartPix[k] = -1;
560
561 //flag all TMark as unused
562 for (int k = 0; k < NTMARK; k++)
563 evtCtrl[i].fEvent->StartTM[k] = -1;
564
565 evtCtrl[i].fEvent->NumBoards = 0;
566 evtCtrl[i].fEvent->SoftTrig = 0;
567 evtCtrl[i].fEvent->PCTime = evtCtrl[i].pcTime[0];
568 evtCtrl[i].fEvent->PCUsec = evtCtrl[i].pcTime[1];
569 evtCtrl[i].fEvent->Roi = evtCtrl[i].nRoi;
570 evtCtrl[i].fEvent->RoiTM = evtCtrl[i].nRoiTM;
571 evtCtrl[i].fEvent->EventNum = evtCtrl[i].evNum;
572 evtCtrl[i].fEvent->TriggerNum = evtCtrl[i].trgNum;
573 evtCtrl[i].fEvent->TriggerType = evtCtrl[i].trgTyp;
574}
575
576
577int
578mBufFree (int i)
579{
580//delete entry [i] from mBuffer:
581//(and make sure multiple calls do no harm ....)
582
583 TGB_free(evtCtrl[i].FADhead);
584
585 evtCtrl[i].fEvent = NULL;
586 evtCtrl[i].FADhead = NULL;
587
588 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
589 evtCtrl[i].runNum = 0;
590
591 gj.usdMem = tgb_inuse;
592
593 gj.bufTot--;
594
595 /*if (gi_memStat < 0) {
596 if (gj.usdMem <= 0.75 * gj.maxMem)
597 gi_memStat = +1;
598 }*/
599
600 return 0;
601
602} /*-----------------------------------------------------------------*/
603
604/*
605void
606resetEvtStat ()
607{
608 for (int i = 0; i < MAX_SOCK; i++)
609 gi.numRead[i] = 0;
610
611 for (int i = 0; i < NBOARDS; i++) {
612 gi.gotByte[i] = 0;
613 gi.gotErr[i] = 0;
614
615 }
616
617 gi.evtGet = 0; //#new Start of Events read
618 gi.evtTot = 0; //#complete Events read
619 gi.evtErr = 0; //#Events with Errors
620 gi.evtSkp = 0; //#Events incomplete (timeout)
621
622 gi.procTot = 0; //#Events processed
623 gi.procErr = 0; //#Events showed problem in processing
624 gi.procTrg = 0; //#Events accepted by SW trigger
625 gi.procSkp = 0; //#Events rejected by SW trigger
626
627 gi.feedTot = 0; //#Events used for feedBack system
628 gi.feedErr = 0; //#Events rejected by feedBack
629
630 gi.wrtTot = 0; //#Events written to disk
631 gi.wrtErr = 0; //#Events with write-error
632
633 gi.runOpen = 0; //#Runs opened
634 gi.runClose = 0; //#Runs closed
635 gi.runErr = 0; //#Runs with open/close errors
636
637 return;
638}*/ /*-----------------------------------------------------------------*/
639
640void reportIncomplete(int id)
641{
642 factPrintf(kWarn, 601, "%5d skip incomplete evt %8d",
643 evtCtrl[id].evNum, id);
644
645 uint64_t report = 0;
646
647 char str[1000];
648
649 int ik=0;
650 for (int ib=0; ib<NBOARDS; ib++)
651 {
652 if (ib%10==0)
653 str[ik++] = '|';
654
655 const int jb = evtCtrl[id].board[ib];
656 if (jb>=0) // data received from that board
657 {
658 str[ik++] = '0'+(jb%10);
659 continue;
660 }
661
662 // FIXME: Is that really 'b' or should that be 'ib' ?
663 if (gi_NumConnect[ib]<=0) // board not connected
664 {
665 str[ik++] = 'x';
666 continue;
667 }
668
669 // data from this board lost
670 str[ik++] = '.';
671 report |= ((uint64_t)1)<<ib;
672 }
673
674 str[ik++] = '|';
675 str[ik] = 0;
676
677 factOut(kWarn, 601, str);
678
679 factReportIncomplete(report);
680}
681
682// i == board
683void copyData(CNV_FACT *rbuf, int i, int evID)
684{
685 // swapEventHeaderBytes: End of the header. to channels now
686 int eStart = 36;
687 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
688 {
689 rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
690 rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
691 rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
692 rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
693
694 eStart += 4+rbuf->S[eStart+2];//skip the pixel data
695 }
696
697 memcpy(&evtCtrl[evID].FADhead[i], rbuf, sizeof(PEVNT_HEADER));
698
699 int src = sizeof(PEVNT_HEADER) / 2;
700
701 // consistency of ROIs have been checked already (is it all correct?)
702 const int roi = rbuf->S[src+2];
703
704 // different sort in FAD board.....
705 for (int px = 0; px < 9; px++)
706 {
707 for (int drs = 0; drs < 4; drs++)
708 {
709 // pixH = rd[i].rBuf->S[src++]; // ID
710 src++;
711
712 const int pixC = rbuf->S[src++]; // start-cell
713 const int pixR = rbuf->S[src++]; // roi
714 //here we should check if pixH is correct ....
715
716 const int pixS = i * 36 + drs * 9 + px;
717 src++;
718
719 evtCtrl[evID].fEvent->StartPix[pixS] = pixC;
720
721 const int dest1 = pixS * roi;
722 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
723
724 src += pixR;
725
726 if (px == 8)
727 {
728 const int tmS = i * 4 + drs;
729
730 //and we have additional TM info
731 if (pixR > roi)
732 {
733 const int dest2 = tmS * roi + NPIX * roi;
734
735 const int srcT = src - roi;
736 evtCtrl[evID].fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
737
738 memcpy(&evtCtrl[evID].fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
739 }
740 else
741 {
742 evtCtrl[evID].fEvent->StartTM[tmS] = -1;
743 }
744 }
745 }
746 }
747}
748
749
750void *readFAD (void *ptr)
751{
752/* *** main loop reading FAD data and sorting them to complete events */
753
754 factPrintf(kInfo, -1, "Start initializing (readFAD)");
755
756 READ_STRUCT rd[NBOARDS]; //buffer to read IP and afterwards store in mBuffer
757
758 const int minLen = sizeof(PEVNT_HEADER); //min #bytes needed to check header: full header for debug
759
760 //start.S = 0xFB01;
761 //stop.S = 0x04FE;
762
763/* initialize run control logics */
764 for (int i = 0; i < MAX_RUN; i++) {
765 runCtrl[i].runId = 0;
766 runCtrl[i].fileId = -2;
767 runCtrl[i].procId = -2;
768 }
769 gi_resetS = gi_resetR = 9;
770
771 int sockDef[NBOARDS]; //internal state of sockets
772 memset(sockDef, 0, NBOARDS*sizeof(int));
773
774 START:
775 evtCtrl_frstPtr = 0;
776 evtCtrl_lastPtr = 0;
777
778 //time in seconds
779 uint gi_SecTime = time(NULL);;
780
781 const int cntsock = 8 - NUMSOCK ;
782
783 if (gi_resetS > 0) {
784 //make sure all sockets are preallocated as 'not exist'
785 for (int i = 0; i < NBOARDS; i++) {
786 rd[i].socket = -1;
787 rd[i].sockStat = 99;
788 }
789
790 for (int k = 0; k < NBOARDS; k++) {
791 gi_NumConnect[k] = 0;
792 //gi.numConn[k] = 0;
793 gj.numConn[k] = 0;
794 //gj.errConn[k] = 0;
795 gj.rateBytes[k] = 0;
796 gj.totBytes[k] = 0;
797 }
798
799 }
800
801 if (gi_resetR > 0)
802 {
803 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
804 gj.usdMem = gj.maxMem = gj.xxxMem = 0;
805 gj.totMem = tgb_memory;
806 gj.bufNew = gj.bufEvt = 0;
807 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
808
809 // initialize mBuffer (mark all entries as unused\empty)
810 for (int i = 0; i < MAX_EVT * MAX_RUN; i++)
811 {
812 evtCtrl[i].evNum = evtCtrl[i].nRoi = -1;
813 evtCtrl[i].runNum = 0;
814 evtCtrl[i].evtStat = -1;
815 }
816
817 evtCtrl_frstPtr = 0;
818 evtCtrl_lastPtr = 0;
819
820 factPrintf(kInfo, -1, "End initializing (readFAD)");
821 }
822
823
824 gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
825
826 //loop until global variable g_runStat claims stop
827 while (g_runStat >= 0 && g_reset == 0)
828 {
829 gi_runStat = g_runStat;
830 gj.readStat = g_runStat;
831
832 struct timeval tv;
833 gettimeofday (&tv, NULL);
834 g_actTime = tv.tv_sec;
835 g_actUsec = tv.tv_usec;
836
837
838 for (int b = 0; b < NBOARDS; b++)
839 {
840 // Nothing has changed
841 if (g_port[b].sockDef == sockDef[b])
842 continue;
843
844 gi_NumConnect[b] = 0; //must close all connections
845 //gi.numConn[b] = 0;
846 gj.numConn[b] = 0;
847
848 // s0 = 0: sockets to be defined and opened
849 // s0 = -1: sockets to be destroyed
850 // s0 = +1: sockets to be closed and reopened
851
852 int s0 = 0;
853 if (sockDef[b] != 0)
854 s0 = g_port[b].sockDef==0 ? -1 : +1;
855
856 const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
857
858 GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
859
860 sockDef[b] = g_port[b].sockDef;
861 }
862
863 // count the number of active boards
864 int actBoards = 0;
865 for (int b = 0; b < NBOARDS; b++)
866 if (sockDef[b] > 0)
867 actBoards++;
868
869 //check all sockets if something to read
870 for (int i = 0; i < NBOARDS; i++)
871 {
872 // Do not try to connect this socket
873 if (rd[i].sockStat > 0)
874 continue;
875
876 if (rd[i].sockStat == -1)
877 {
878 //try to connect if not yet done
879 rd[i].sockStat = connect (rd[i].socket,
880 (struct sockaddr *) &rd[i].SockAddr,
881 sizeof (rd[i].SockAddr));
882 // Failed
883 if (rd[i].sockStat == -1)
884 {
885 rd[i].errCnt++;
886 usleep(25000);
887 continue;
888 }
889
890 // Success (rd[i].sockStat == 0)
891
892 if (sockDef[i] > 0)
893 {
894 rd[i].bufTyp = 0; // expect a header
895 rd[i].bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
896 }
897 else
898 {
899 rd[i].bufTyp = -1; // full data to be skipped
900 rd[i].bufLen = MAX_LEN; // huge for skipping
901 }
902
903 rd[i].bufPos = 0; // no byte read so far
904 rd[i].skip = 0; // start empty
905
906 gi_NumConnect[i] += cntsock;
907 gj.numConn[i]++;
908
909 factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
910 }
911
912 // Do not read from this socket
913 if (rd[i].bufLen<0)
914 continue;
915
916 //numok++;
917
918 if (rd[i].bufLen>0)
919 {
920 const int32_t jrd =
921 recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
922 rd[i].bufLen, MSG_DONTWAIT);
923
924 // recv failed
925 if (jrd<0)
926 {
927 // There was just nothing waiting
928 if (errno==EWOULDBLOCK || errno==EAGAIN)
929 {
930 //numok--;
931 continue;
932 }
933
934 factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
935 //gi.gotErr[b]++;
936 continue;
937 }
938
939 // connection was closed ...
940 if (jrd==0)
941 {
942 factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
943
944 const int s0 = sockDef[i] > 0 ? +1 : -1;
945 GenSock(s0, i, 0, NULL, &rd[i]);
946
947 //gi.gotErr[b]++;
948
949 gi_NumConnect[i]-= cntsock ;
950 //gi.numConn[b]--;
951 gj.numConn[i]--;
952
953 continue;
954 }
955 // Success (jrd > 0)
956
957 gj.rateBytes[i] += jrd;
958
959 // are we skipping this board ...
960 if (rd[i].bufTyp < 0)
961 continue;
962
963 rd[i].bufPos += jrd; //==> prepare for continuation
964 rd[i].bufLen -= jrd;
965
966#ifdef EVTDEBUG
967 debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
968#endif
969 }
970
971 //we are reading event header
972 if (rd[i].bufTyp <= 0)
973 {
974 //not yet sufficient data to take action
975 if (rd[i].bufPos < minLen)
976 continue;
977
978 //check if startflag correct; else shift block ....
979 // FIXME: This is not enough... this combination of
980 // bytes can be anywhere... at least the end bytes
981 // must be checked somewhere, too.
982 int k;
983 for (k = 0; k < rd[i].bufPos - 1; k++)
984 {
985 //start.S = 0xFB01;
986 if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
987 break;
988 }
989 rd[i].skip += k;
990
991 //no start of header found
992 if (k >= rd[i].bufPos - 1)
993 {
994 rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
995 rd[i].bufPos = 1;
996 rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
997 continue;
998 }
999
1000 if (k > 0)
1001 {
1002 rd[i].bufPos -= k;
1003 rd[i].bufLen += k;
1004 memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
1005 rd[i].bufPos);
1006 }
1007
1008 if (rd[i].bufPos < minLen)
1009 continue;
1010
1011 if (rd[i].skip > 0)
1012 {
1013 factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
1014 rd[i].skip = 0;
1015 }
1016
1017 // TGB: This needs much more checks than just the first two bytes!
1018
1019 // Swap everything except start_package_flag.
1020 // It is to difficult to find out where it is used how,
1021 // but it doesn't really matter because it is not really
1022 // used anywehere else
1023 // rd[i].rBuf->S[1] = ntohs(rd[i].rBuf->S[1]); // package_length
1024 rd[i].rBuf->S[2] = ntohs(rd[i].rBuf->S[2]); // version_no
1025 rd[i].rBuf->S[3] = ntohs(rd[i].rBuf->S[3]); // PLLLCK
1026 rd[i].rBuf->S[4] = ntohs(rd[i].rBuf->S[4]); // trigger_crc
1027 rd[i].rBuf->S[5] = ntohs(rd[i].rBuf->S[5]); // trigger_type
1028
1029 rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]); // board id
1030 rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]); // adc_clock_phase_shift
1031 rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]); // number_of_triggers_to_generate
1032 rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]); // trigger_generator_prescaler
1033
1034 rd[i].rBuf->I[3] = ntohl(rd[i].rBuf->I[3]); // trigger_id
1035 rd[i].rBuf->I[4] = ntohl(rd[i].rBuf->I[4]); // fad_evt_counter
1036 rd[i].rBuf->I[5] = ntohl(rd[i].rBuf->I[5]); // REFCLK_frequency
1037
1038 rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]); // runnumber;
1039 rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]); // time;
1040
1041 for (int s=24;s<24+NTemp+NDAC;s++)
1042 rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
1043
1044 rd[i].fadLen = ntohs(rd[i].rBuf->S[1]) * 2;
1045 rd[i].fadVers = rd[i].rBuf->S[2];
1046 rd[i].ftmTyp = rd[i].rBuf->S[5];
1047 rd[i].ftmID = rd[i].rBuf->I[3]; //(FTMevt)
1048 rd[i].evtID = rd[i].rBuf->I[4]; //(FADevt)
1049 rd[i].runID = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
1050 rd[i].bufTyp = 1; //ready to read full record
1051 rd[i].bufLen = rd[i].fadLen - rd[i].bufPos;
1052
1053 const int fadBoard = rd[i].rBuf->S[12];
1054 debugHead(i, fadBoard, rd[i].rBuf);
1055
1056 continue;
1057 }
1058
1059 // are we reading data
1060
1061 // not yet all read
1062 if (rd[i].bufLen > 0)
1063 continue;
1064
1065 // stop.S = 0x04FE;
1066 if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
1067 rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
1068 {
1069 //gi.evtErr++;
1070 factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
1071 i, rd[i].evtID, rd[i].fadLen,
1072 rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
1073
1074 // ready to read next header
1075 rd[i].bufTyp = 0;
1076 rd[i].bufLen = sizeof(PEVNT_HEADER);
1077 rd[i].bufPos = 0;
1078
1079 continue;
1080 }
1081
1082 // int actid;
1083 // if (g_useFTM > 0)
1084 // actid = rd[i].evtID;
1085 // else
1086 // actid = rd[i].ftmID;
1087
1088 //get index into mBuffer for this event (create if needed)
1089 const int idx = mBufEvt(&rd[i]);
1090
1091 // no free entry in mBuffer, retry later
1092 if (idx == -1)
1093 continue;
1094
1095 // We have a valid entry, but no memory has yet been allocated
1096 if (idx >= 0 && evtCtrl[idx].FADhead == NULL)
1097 {
1098 // Try to get memory from the big buffer
1099 evtCtrl[idx].FADhead = (PEVNT_HEADER*)TGB_Malloc();
1100 if (evtCtrl[idx].FADhead == NULL)
1101 {
1102 // If this works properly, this is a hack which can be removed, or
1103 // replaced by a signal or dim message
1104 if (rd[i].bufTyp==2)
1105 factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evtCtrl[idx].evNum, evtCtrl[idx].runNum);
1106 rd[i].bufTyp = 2;
1107 continue;
1108 }
1109
1110 // Initialise contents of mBuffer[evID]->fEvent
1111 initEvent(idx);
1112
1113 // Some statistics
1114 gj.usdMem = tgb_inuse;
1115
1116 if (gj.usdMem > gj.maxMem)
1117 gj.maxMem = gj.usdMem;
1118
1119 gj.rateNew++;
1120 gj.bufTot++;
1121 if (gj.bufTot > gj.maxEvt)
1122 gj.maxEvt = gj.bufTot;
1123 }
1124
1125 // ready to read next header
1126 rd[i].bufTyp = 0;
1127 rd[i].bufLen = sizeof(PEVNT_HEADER);
1128 rd[i].bufPos = 0;
1129
1130 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1131 if (idx < -1000)
1132 continue;
1133
1134 //we have a valid entry in mBuffer[]; fill it
1135 const int fadBoard = rd[i].rBuf->S[12];
1136 const int fadCrate = fadBoard>>8;
1137
1138 if (i != (fadCrate * 10 + (fadBoard&0xff)))
1139 {
1140 factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
1141 i, fadBoard, fadCrate, fadBoard&0xff);
1142 }
1143
1144 if (evtCtrl[idx].board[i] != -1)
1145 {
1146 factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
1147 evtCtrl[idx].evNum, i, i, rd[i].fadLen,
1148 rd[i].rBuf->B[0], rd[i].rBuf->B[1],
1149 rd[i].rBuf->B[rd[i].fadLen - 2],
1150 rd[i].rBuf->B[rd[i].fadLen - 1]);
1151 continue; // Continue reading next header
1152 }
1153
1154 // Copy data from rd[i] to mBuffer[evID]
1155 copyData(rd[i].rBuf, i, idx);
1156
1157 // now we have stored a new board contents into Event structure
1158
1159 evtCtrl[idx].fEvent->NumBoards++;
1160 evtCtrl[idx].board[i] = i;
1161 evtCtrl[idx].nBoard++;
1162 evtCtrl[idx].evtStat = evtCtrl[idx].nBoard;
1163
1164 // have we already reported first (partial) event of this run ???
1165 if (evtCtrl[idx].nBoard==1 && evtCtrl[idx].runNum != actrun)
1166 {
1167 // Signal the fadctrl that a new run has been started
1168 gotNewRun(evtCtrl[idx].runNum, NULL);
1169
1170 factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
1171 actrun, evtCtrl[idx].runNum, evtCtrl[idx].evNum);
1172
1173 for (int j=0; j<MAX_RUN; j++)
1174 {
1175 // Since we have started a new run, we know already when to close the
1176 // previous run in terms of number of events
1177 if (runCtrl[j].runId==actrun)
1178 runCtrl[j].maxEvt = runCtrl[j].lastEvt;
1179
1180 // We got the first part of this event, so this is
1181 // the number of events we expect for this run
1182 if (runCtrl[j].runId==evtCtrl[idx].runNum)
1183 runCtrl[j].lastEvt++;
1184 }
1185
1186 // Change 'actrun' the the new runnumber
1187 actrun = evtCtrl[idx].runNum;
1188 }
1189
1190 // event not yet complete
1191 if (evtCtrl[idx].nBoard < actBoards)
1192 continue;
1193
1194 // This is a non-ideal hack to lower the probability that
1195 // in mBufEvt the search for correct entry in runCtrl
1196 // will not return a super-old entry
1197 for (int ir=0; ir<MAX_RUN; ir++)
1198 {
1199 if (runCtrl[ir].runId != actrun && runCtrl[ir].fileId>0)
1200 runCtrl[ir].runId = 0;
1201 }
1202
1203 // Flag that the event is ready for processing
1204 evtCtrl[idx].evtStat = 99;
1205
1206 } // end for loop over all sockets
1207
1208 g_actTime = time (NULL);
1209 if (g_actTime <= gi_SecTime)
1210 {
1211 usleep(1);
1212 continue;
1213 }
1214 gi_SecTime = g_actTime;
1215
1216 gj.bufNew = 0;
1217
1218 //loop over all active events and flag those older than read-timeout
1219 //delete those that are written to disk ....
1220
1221 const int count = (evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT*MAX_RUN)%(MAX_EVT*MAX_RUN);
1222
1223 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1224 {
1225 // Check the more likely case first: incomplete events
1226 if (evtCtrl[k0].evtStat>=0 && evtCtrl[k0].evtStat<92)
1227 {
1228 gj.bufNew++; //incomplete event in Buffer
1229
1230 // Event has not yet timed out or was reported already
1231 if (evtCtrl[k0].evtStat>=90 || evtCtrl[k0].pcTime[0]/*evtCtrl[k0].lastRecv*/>=g_actTime - 30)
1232 continue;
1233
1234 // This will result in the emission of a dim service.
1235 // It doesn't matter if that takes comparably long,
1236 // because we have to stop the run anyway.
1237 reportIncomplete(k0);
1238
1239 //timeout for incomplete events
1240 evtCtrl[k0].evtStat = 91;
1241 gj.evtSkip++;
1242
1243 continue;
1244 }
1245
1246 // complete event in Buffer
1247 //if (evtCtrl[k0].evtStat >= 95)
1248 // gj.bufEvt++;
1249
1250 // Check the less likely case: 'useless' or 'delete'
1251 // evtState==0 can happen if the event was initialized (some data received)
1252 // but the data did not make sense (e.g. inconsistent rois)
1253 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat >= 9000)
1254 {
1255 mBufFree(k0); //event written--> free memory
1256 evtCtrl[k0].evtStat = -1;
1257
1258 if (k0==evtCtrl_frstPtr)
1259 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
1260 else
1261 factPrintf(kError, -1, "Freed a non-first slot");
1262
1263 gj.evtWrite++;
1264 gj.rateWrite++;
1265
1266 continue;
1267 }
1268
1269 // The first events in the queue are either incomplete or
1270 // can be deleted (processing finished). As soon as we reach the
1271 // first complete events which processing is pending, we can stop.
1272 // All other events (if everything works well) must have the same state.
1273 break;
1274 }
1275
1276 // The number of complete events in the buffer is the total number of
1277 // events in the buffer minus the number of incomplete events.
1278 gj.bufEvt = count - gj.bufNew;
1279
1280 gj.deltaT = 1000; //temporary, must be improved
1281
1282 for (int ib = 0; ib < NBOARDS; ib++)
1283 gj.totBytes[ib] += gj.rateBytes[ib];
1284
1285 gj.totMem = tgb_memory;
1286
1287 if (gj.maxMem > gj.xxxMem)
1288 gj.xxxMem = gj.maxMem;
1289 if (gj.maxEvt > gj.xxxEvt)
1290 gj.xxxEvt = gj.maxEvt;
1291
1292 factStat (gj);
1293 //factStatNew (gi);
1294 gj.rateNew = gj.rateWrite = 0;
1295 gj.maxMem = gj.usdMem;
1296 gj.maxEvt = gj.bufTot;
1297 for (int b = 0; b < NBOARDS; b++)
1298 gj.rateBytes[b] = 0;
1299
1300 } // while (g_runStat >= 0 && g_reset == 0)
1301
1302 factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
1303
1304 if (g_reset > 0)
1305 {
1306 gi_reset = g_reset;
1307 gi_resetR = gi_reset % 10; //shall we stop reading ?
1308 gi_resetS = (gi_reset / 10) % 10; //shall we close sockets ?
1309 gi_resetW = (gi_reset / 100) % 10; //shall we close files ?
1310 gi_resetX = gi_reset / 1000; //shall we simply wait resetX seconds ?
1311 g_reset = 0;
1312 }
1313 else
1314 {
1315 gi_reset = 0;
1316 gi_resetR = g_runStat == -1 ? 1 : 7;
1317
1318 gi_resetS = 7; //close all sockets
1319 gi_resetW = 7; //close all files
1320 gi_resetX = 0;
1321
1322 //inform others we have to quit ....
1323 gi_runStat = -11; //inform all that no update to happen any more
1324 gj.readStat = -11; //inform all that no update to happen any more
1325 }
1326
1327 if (gi_resetS > 0)
1328 {
1329 //must close all open sockets ...
1330 factPrintf(kInfo, -1, "Close all sockets...");
1331
1332 for (int i = 0; i < NBOARDS; i++)
1333 {
1334 if (rd[i].sockStat != 0)
1335 continue;
1336
1337 GenSock(-1, i, 0, NULL, &rd[i]); //close and destroy open socket
1338
1339 gi_NumConnect[i]-= cntsock ;
1340 gj.numConn[i]--;
1341 sockDef[i] = 0; //flag ro recreate the sockets ...
1342 rd[i].sockStat = -1; //and try to open asap
1343 }
1344 }
1345
1346
1347 if (gi_resetR > 0)
1348 {
1349 //and clear all buffers (might have to wait until all others are done)
1350 while (evtCtrl_frstPtr!=evtCtrl_lastPtr)
1351 {
1352 const int k0=evtCtrl_frstPtr;
1353
1354 // flag incomplete events as 'read finished'
1355 if (evtCtrl[k0].evtStat>0 && evtCtrl[k0].evtStat < 90)
1356 evtCtrl[k0].evtStat = 91;
1357
1358 if (evtCtrl[k0].evtStat==0 || evtCtrl[k0].evtStat>900)
1359 {
1360 mBufFree(k0); //event written--> free memory
1361 evtCtrl[k0].evtStat = -1;
1362
1363 evtCtrl_frstPtr = (evtCtrl_frstPtr+1) % (MAX_EVT*MAX_RUN);
1364 }
1365
1366 usleep(1);
1367 }
1368 }
1369
1370 if (gi_reset > 0)
1371 {
1372 if (gi_resetW > 0)
1373 CloseRunFile (0, 0, 0); //ask all Runs to be closed
1374
1375 if (gi_resetX > 0)
1376 {
1377 struct timespec xwait;
1378 xwait.tv_sec = gi_resetX;
1379 xwait.tv_nsec = 0;
1380 nanosleep (&xwait, NULL);
1381 }
1382
1383 factPrintf(kInfo, -1, "Continue read Process ...");
1384 gi_reset = 0;
1385 goto START;
1386 }
1387
1388 factPrintf(kInfo, -1, "Exit read Process...");
1389
1390 factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
1391
1392 gi_runStat = -99;
1393 gj.readStat = -99;
1394
1395 factStat (gj);
1396 factStatNew (gi);
1397
1398 return 0;
1399
1400} /*-----------------------------------------------------------------*/
1401
1402
1403void *subProc(void *thrid)
1404{
1405 const int64_t threadID = (int64_t)thrid;
1406
1407 factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
1408
1409 while (g_runStat > -2) //in case of 'exit' we still must process pending events
1410 {
1411 int numWait = 0;
1412 int numProc = 0;
1413
1414 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1415 {
1416 if (evtCtrl[k0].evtStat != 1000 + threadID)
1417 {
1418 if (evtCtrl[k0].evtStat < 1000 + threadID)
1419 numWait++;
1420
1421 continue;
1422 }
1423
1424 int jret = 9100; // flag to be deleted (gi_resetR>1 : flush buffers asap)
1425
1426 if (gi_resetR<=1)
1427 {
1428 jret = subProcEvt(threadID, evtCtrl[k0].FADhead,
1429 evtCtrl[k0].fEvent, NULL/*mBuffer[id].buffer*/);
1430
1431
1432 if (jret <= threadID) {
1433 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
1434 jret = 5300;
1435 } else if (jret <= 0)
1436 jret = 9200 + threadID; // flag as 'to be deleted'
1437 else if (jret >= gi_maxProc)
1438 jret = 5200 + threadID; // flag as 'to be written'
1439 else
1440 jret = 1000 + jret; // flag for next proces
1441 }
1442
1443 evtCtrl[k0].evtStat = jret;
1444 numProc++;
1445 }
1446
1447 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1448 factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
1449 return 0;
1450 }
1451
1452 //seems we have nothing to do, so sleep a little
1453 if (numProc == 0)
1454 usleep(1);
1455 }
1456
1457 factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
1458
1459 return 0;
1460}
1461
1462/*-----------------------------------------------------------------*/
1463
1464
1465void *
1466procEvt (void *ptr)
1467{
1468/* *** main loop processing file, including SW-trigger */
1469 int status;
1470
1471 int lastRun = 0; //usually run from last event still valid
1472
1473// cpu_set_t mask;
1474// int cpu = 1; //process thread (will be several in final version)
1475
1476 factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
1477
1478/* CPU_ZERO initializes all the bits in the mask to zero. */
1479// CPU_ZERO (&mask);
1480/* CPU_SET sets only the bit corresponding to cpu. */
1481// CPU_SET( 0 , &mask ); leave for system
1482// CPU_SET( 1 , &mask ); used by write process
1483// CPU_SET (2, &mask);
1484// CPU_SET (3, &mask);
1485// CPU_SET (4, &mask);
1486// CPU_SET (5, &mask);
1487// CPU_SET (6, &mask);
1488// CPU_SET( 7 , &mask ); used by read process
1489/* sched_setaffinity returns 0 in success */
1490// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1491// snprintf (str, MXSTR, "P ---> can not create affinity to %d", cpu);
1492// factOut (kWarn, -1, str);
1493// }
1494
1495
1496 pthread_t thread[100];
1497// int th_ret[100];
1498
1499 for (long long k = 0; k < gi_maxProc; k++) {
1500 /*th_ret[k] =*/ pthread_create (&thread[k], NULL, subProc, (void *) k);
1501 }
1502
1503 // in case of 'exit' we still must process pending events
1504 while (g_runStat > -2)
1505 {
1506 int numWait = 0;
1507 int numProc = 0;
1508
1509 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1510 {
1511 if (evtCtrl[k0].evtStat <= 90 || evtCtrl[k0].evtStat >= 1000)
1512 {
1513 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat< 90)
1514 numWait++;
1515
1516 continue;
1517 }
1518
1519 //we are asked to flush buffers asap
1520 if (gi_resetR > 1)
1521 {
1522 evtCtrl[k0].evtStat = 9991;
1523 continue;
1524 }
1525
1526 //-------- it is better to open the run already here, so call can be used to initialize
1527 //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
1528 const uint32_t irun = evtCtrl[k0].runNum;
1529 const int32_t ievt = evtCtrl[k0].evNum;
1530
1531 // Find entry in runCtrl which belongs to the event mBuffer[id]
1532 // (only check if there is a need to check)
1533 if (runCtrl[lastRun].runId != irun)
1534 {
1535 //check which fileID to use (or open if needed)
1536 int j;
1537 for (j=0;j<MAX_RUN; j++)
1538 if (runCtrl[j].runId == irun)
1539 break;
1540
1541 if (j>=MAX_RUN)
1542 {
1543 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
1544 // FIXME: What is the right action? (Flag event for deletion?)
1545 continue;
1546 }
1547
1548 lastRun = j;
1549 }
1550
1551 // File not yet open
1552 if (runCtrl[lastRun].fileId < 0)
1553 {
1554 //---- we need to open a new run ==> make sure all older runs are
1555 //---- finished and marked to be closed ....
1556 // This loop is unique to procEvt
1557 for (int j=0; j<MAX_RUN; j++)
1558 {
1559 if (runCtrl[j].fileId == 0)
1560 {
1561 runCtrl[j].procId = 2; //--> do no longer accept events for processing
1562
1563 //---- problem: processing still going on ==> must wait for closing ....
1564 factPrintf(kInfo, -1, "procEvt: Finished run since new one opened %d", runCtrl[j].runId);
1565 runFinish1(runCtrl[j].runId);
1566 }
1567 }
1568
1569 RUN_HEAD actRun;
1570 actRun.Version = 1;
1571 actRun.RunType = -1; //to be adapted
1572 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0;
1573 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8;
1574 actRun.RunTime = evtCtrl[k0].pcTime[0]; //runCtrl[lastRun].firstTime;
1575 actRun.RunUsec = evtCtrl[k0].pcTime[1]; //runCtrl[lastRun].firstUsec;
1576 actRun.NBoard = NBOARDS;
1577 actRun.NPix = NPIX;
1578 actRun.NTm = NTMARK;
1579
1580 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS*sizeof(PEVNT_HEADER));
1581
1582 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
1583 if (runCtrl[lastRun].fileHd == NULL)
1584 {
1585 factPrintf(kError, 502, "procEvt: Could not open a file for run %d (runOpen failed)", irun);
1586 runCtrl[lastRun].fileId = 91;
1587 runCtrl[lastRun].procId = 91; // Is not set in writeEvt
1588 continue;
1589 }
1590
1591 runCtrl[lastRun].fileId = 0;
1592 runCtrl[lastRun].procId = 0; // Is not set in writeEvt
1593
1594 factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
1595 }
1596
1597 //-------- also check if run shall be closed (==> skip event, but do not close the file !!! )
1598 if (runCtrl[lastRun].procId == 0)
1599 {
1600 if (runCtrl[lastRun].closeTime < g_actTime ||
1601 runCtrl[lastRun].lastTime < g_actTime - 300 ||
1602 runCtrl[lastRun].maxEvt <= runCtrl[lastRun].procEvt)
1603 {
1604 factPrintf(kInfo, 502, "procEvt: Reached end of run condition for run %d", irun);
1605 runFinish1 (runCtrl[lastRun].runId);
1606 runCtrl[lastRun].procId = 1;
1607 }
1608 }
1609
1610 // Skip event because of no active run
1611 if (runCtrl[lastRun].procId != 0)
1612 {
1613 evtCtrl[k0].evtStat = 9091;
1614 continue;
1615 }
1616
1617 //--------
1618 //--------
1619
1620 //and set correct event header ; also check for consistency in event (not yet)
1621 evtCtrl[k0].fEvent->Errors[0] = evtCtrl[k0].Errors[0];
1622 evtCtrl[k0].fEvent->Errors[1] = evtCtrl[k0].Errors[1];
1623 evtCtrl[k0].fEvent->Errors[2] = evtCtrl[k0].Errors[2];
1624 evtCtrl[k0].fEvent->Errors[3] = evtCtrl[k0].Errors[3];
1625
1626 for (int ib=0; ib<NBOARDS; ib++)
1627 {
1628 // board is not read
1629 if (evtCtrl[k0].board[ib] == -1)
1630 {
1631 evtCtrl[k0].FADhead[ib].start_package_flag = 0;
1632 evtCtrl[k0].fEvent->BoardTime[ib] = 0;
1633 }
1634 else
1635 {
1636 evtCtrl[k0].fEvent->BoardTime[ib] = evtCtrl[k0].FADhead[ib].time;
1637 }
1638 }
1639
1640 const int rc = eventCheck(evtCtrl[k0].runNum, evtCtrl[k0].FADhead,
1641 evtCtrl[k0].fEvent);
1642 //gi.procTot++;
1643 numProc++;
1644
1645 if (rc < 0)
1646 {
1647 evtCtrl[k0].evtStat = 9999; //flag event to be skipped
1648 //gi.procErr++;
1649 }
1650 else
1651 {
1652 evtCtrl[k0].evtStat = 1000;
1653 runCtrl[lastRun].procEvt++;
1654 }
1655 }
1656
1657 if (gj.readStat < -10 && numWait == 0) { //nothing left to do
1658 factPrintf(kInfo, -1, "Exit Processing Process ...");
1659 gp_runStat = -22; //==> we should exit
1660 gj.procStat = -22; //==> we should exit
1661 return 0;
1662 }
1663
1664 //seems we have nothing to do, so sleep a little
1665 if (numProc == 0)
1666 usleep(1);
1667
1668 gp_runStat = gi_runStat;
1669 gj.procStat = gj.readStat;
1670
1671 }
1672
1673 //we are asked to abort asap ==> must flag all remaining events
1674 // when gi_runStat claims that all events are in the buffer...
1675
1676 factPrintf(kInfo, -1, "Abort Processing Process ...");
1677
1678 for (int k = 0; k < gi_maxProc; k++) {
1679 pthread_join (thread[k], (void **) &status);
1680 }
1681
1682 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1683 {
1684 if (evtCtrl[k0].evtStat >= 0 && evtCtrl[k0].evtStat < 1000)
1685 evtCtrl[k0].evtStat = 9800; //flag event as 'processed'
1686 }
1687
1688 gp_runStat = -99;
1689 gj.procStat = -99;
1690
1691 return 0;
1692
1693} /*-----------------------------------------------------------------*/
1694
1695int
1696CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
1697{
1698/* close run runId (all all runs if runId=0) */
1699/* return: 0=close scheduled / >0 already closed / <0 does not exist */
1700 int j;
1701
1702
1703 if (runId == 0) {
1704 for (j = 0; j < MAX_RUN; j++) {
1705 if (runCtrl[j].fileId == 0) { //run is open
1706 runCtrl[j].closeTime = closeTime;
1707 runCtrl[j].maxEvt = maxEvt;
1708 }
1709 }
1710 return 0;
1711 }
1712
1713 for (j = 0; j < MAX_RUN; j++) {
1714 if (runCtrl[j].runId == runId) {
1715 if (runCtrl[j].fileId == 0) { //run is open
1716 runCtrl[j].closeTime = closeTime;
1717 runCtrl[j].maxEvt = maxEvt;
1718 return 0;
1719 } else if (runCtrl[j].fileId < 0) { //run not yet opened
1720 runCtrl[j].closeTime = closeTime;
1721 runCtrl[j].maxEvt = maxEvt;
1722 return +1;
1723 } else { // run already closed
1724 return +2;
1725 }
1726 }
1727 } //we only reach here if the run was never created
1728 return -1;
1729
1730}
1731
1732void checkAndCloseRun(int j, int irun, int cond, int where)
1733{
1734 if (!cond &&
1735 runCtrl[j].closeTime >= g_actTime &&
1736 runCtrl[j].lastTime >= g_actTime - 300 &&
1737 runCtrl[j].maxEvt > runCtrl[j].actEvt)
1738 return;
1739
1740 //close run for whatever reason
1741 int ii = 0;
1742 if (cond)
1743 ii = 1;
1744 if (runCtrl[j].closeTime < g_actTime)
1745 ii |= 2; // = 2;
1746 if (runCtrl[j].lastTime < g_actTime - 300)
1747 ii |= 4; // = 3;
1748 if (runCtrl[j].maxEvt <= runCtrl[j].actEvt)
1749 ii |= 8; // = 4;
1750
1751 if (runCtrl[j].procId == 0)
1752 {
1753 runFinish1(runCtrl[j].runId);
1754 runCtrl[j].procId = 92;
1755 }
1756
1757 runCtrl[j].closeTime = g_actTime - 1;
1758
1759 const int rc = runClose(runCtrl[j].fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
1760 if (rc<0)
1761 {
1762 factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
1763 where, runCtrl[j].runId, rc);
1764 runCtrl[j].fileId = 92+where*2;
1765 }
1766 else
1767 {
1768 factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
1769 where, irun, ii);
1770 runCtrl[j].fileId = 93+where*2;
1771 }
1772}
1773
1774/*-----------------------------------------------------------------*/
1775
1776
1777void *writeEvt (void *ptr)
1778{
1779/* *** main loop writing event (including opening and closing run-files */
1780
1781// cpu_set_t mask;
1782// int cpu = 1; //write thread
1783
1784 factPrintf(kInfo, -1, "Starting write-thread");
1785
1786/* CPU_ZERO initializes all the bits in the mask to zero. */
1787// CPU_ZERO (&mask);
1788/* CPU_SET sets only the bit corresponding to cpu. */
1789// CPU_SET (cpu, &mask);
1790/* sched_setaffinity returns 0 in success */
1791// if (sched_setaffinity (0, sizeof (mask), &mask) == -1) {
1792// snprintf (str, MXSTR, "W ---> can not create affinity to %d", cpu);
1793// }
1794
1795 int lastRun = 0; //usually run from last event still valid
1796
1797 while (g_runStat > -2)
1798 {
1799 int numWrite = 0;
1800 int numWait = 0;
1801
1802 for (int k0=evtCtrl_frstPtr; k0!=evtCtrl_lastPtr; k0++, k0 %= MAX_EVT*MAX_RUN)
1803 {
1804 if (evtCtrl[k0].evtStat <= 5000 || evtCtrl[k0].evtStat >= 9000)
1805 {
1806 if (evtCtrl[k0].evtStat > 0 && evtCtrl[k0].evtStat < 9000)
1807 numWait++;
1808
1809 continue;
1810 }
1811
1812 //we must drain the buffer asap
1813 if (gi_resetR > 1)
1814 {
1815 evtCtrl[k0].evtStat = 9904;
1816 continue;
1817 }
1818
1819 const uint32_t irun = evtCtrl[k0].runNum;
1820 const int32_t ievt = evtCtrl[k0].evNum;
1821
1822 // Find entry in runCtrl which belongs to the event mBuffer[id]
1823 // (only check if there is a need to check)
1824 if (runCtrl[lastRun].runId != irun)
1825 {
1826 //check which fileID to use (or open if needed)
1827 int j;
1828 for (j=0;j<MAX_RUN; j++)
1829 if (runCtrl[j].runId == irun)
1830 break;
1831
1832 if (j>=MAX_RUN)
1833 {
1834 factPrintf(kFatal, 901, "writeEvt: Can not find run %d for event %d in %d", irun, ievt, k0);
1835 // FIXME: What is the right action?
1836 continue;
1837 }
1838
1839 lastRun = j;
1840 }
1841
1842 // File not yet open
1843 if (runCtrl[lastRun].fileId < 0)
1844 {
1845 RUN_HEAD actRun;
1846 actRun.Version = 1;
1847 actRun.RunType = -1; //to be adapted
1848 actRun.Nroi = evtCtrl[k0].nRoi; //runCtrl[lastRun].roi0;
1849 actRun.NroiTM = evtCtrl[k0].nRoiTM; //runCtrl[lastRun].roi8;
1850 actRun.RunTime = evtCtrl[k0].pcTime[0];//runCtrl[lastRun].firstTime;
1851 actRun.RunUsec = evtCtrl[k0].pcTime[1];//runCtrl[lastRun].firstUsec;
1852 actRun.NBoard = NBOARDS;
1853 actRun.NPix = NPIX;
1854 actRun.NTm = NTMARK;
1855
1856 memcpy(actRun.FADhead, evtCtrl[k0].FADhead, NBOARDS * sizeof (PEVNT_HEADER));
1857
1858 runCtrl[lastRun].fileHd = runOpen (irun, &actRun, sizeof (actRun));
1859 if (runCtrl[lastRun].fileHd == NULL)
1860 {
1861 factPrintf(kError, 502, "writeEvt: Could not open a file for run %d (runOpen failed)", irun);
1862 runCtrl[lastRun].fileId = 91;
1863 continue;
1864 }
1865
1866 runCtrl[lastRun].fileId = 0;
1867 factPrintf(kInfo, -1, "writeEvt: Opened new file for run %d (evt %d)", irun, ievt);
1868 }
1869
1870 if (runCtrl[lastRun].fileId > 0)
1871 {
1872 // There is an event but file is already closed
1873 /*
1874 if (runCtrl[j].fileId < 100)
1875 {
1876 factPrintf(kWarn, 123, "writeEvt: File for run %d is closed", irun);
1877 runCtrl[j].fileId += 100;
1878 }*/
1879
1880 evtCtrl[k0].evtStat = 9903;
1881 }
1882
1883 // File is open
1884 if (runCtrl[lastRun].fileId==0)
1885 {
1886 const int rc = runWrite(runCtrl[lastRun].fileHd, evtCtrl[k0].fEvent,
1887 0/*sizeof (evtCtrl[k0])*/);
1888 if (rc >= 0)
1889 {
1890 // Sucessfully wrote event
1891 runCtrl[lastRun].lastTime = g_actTime;
1892 runCtrl[lastRun].actEvt++;
1893
1894 evtCtrl[k0].evtStat = 9901;
1895 }
1896 else
1897 {
1898 factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", irun);
1899 evtCtrl[k0].evtStat = 9902;
1900 }
1901
1902 checkAndCloseRun(lastRun, irun, rc<0, 1);
1903 }
1904 }
1905/*
1906 //check if we should close a run (mainly when no event pending)
1907 //ETIENNE but first figure out which one is the latest run with a complete event.
1908 //i.e. max run Id and lastEvt >= 0
1909 //this condition is sufficient because all pending events were written already in the loop just above
1910 //actrun
1911 uint32_t lastStartedTime = 0;
1912 uint32_t runIdFound = 1;
1913
1914 //If we have an active run, look for its start time
1915 if (actrun != 0)
1916 {
1917 runIdfound = 0;
1918 for (int j=0;j<MAX_RUN;j++)
1919 {
1920 if (runCtrl[j].runId == actrun)
1921 {
1922 lastStartedTime = runCtrl[j].lastTime;
1923 runIdFound = 1;
1924 }
1925 }
1926 }
1927
1928 if (runIdFound == 0)
1929 {
1930 factPrintf(kInfo, 0, "An Active run (number %u) has been registered, but it could not be found in the runs list", actrun);
1931 }
1932
1933 //Also check if some files will never be opened
1934 //EDIT: this is completely useless, because as run Numbers are taken from FADs board,
1935 //I will never get run numbers for which no file is to be opened
1936 for (int j=0;j<MAX_RUN;j++)
1937 {
1938 if ((runCtrl[j].fileId < 0) &&
1939 (runCtrl[j].lastTime < lastStartedTime) &&
1940 (runCtrl[j].runId != 0))
1941 {
1942 factPrintf(kInfo, 0, "writeEvt: No file will be opened for run %u. Last run: %u (started)", runCtrl[j].runId, actrun);
1943 ;//TODO notify that this run will never be opened
1944 }
1945 }
1946 */
1947
1948 // Although the are no pending events, we have to check if a run should be closed (timeout)
1949 for (int j=0; j<MAX_RUN; j++)
1950 {
1951 if (runCtrl[j].fileId == 0)
1952 {
1953 //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
1954 const int cond = /*runCtrl[j].lastTime < lastStartedTime &&*/ runCtrl[j].runId == 0;
1955 checkAndCloseRun(j, runCtrl[j].runId, cond, 2);
1956 }
1957 }
1958
1959 //seems we have nothing to do, so sleep a little
1960 if (numWrite == 0)
1961 usleep(1);
1962
1963 //nothing left to do
1964 if (gj.readStat < -10 && numWait == 0)
1965 {
1966 factPrintf(kInfo, -1, "Finish Write Process ...");
1967 gw_runStat = -22; //==> we should exit
1968 gj.writStat = -22; //==> we should exit
1969 break;
1970 }
1971
1972 gw_runStat = gi_runStat;
1973 gj.writStat = gj.readStat;
1974 }
1975
1976 factPrintf(kInfo, -1, "Close all open files ...");
1977 for (int j=0; j<MAX_RUN; j++)
1978 {
1979 if (runCtrl[j].fileId == 0)
1980 checkAndCloseRun(j, runCtrl[j].runId, 1, 3);
1981 }
1982
1983 gw_runStat = -99;
1984 gj.writStat = -99;
1985
1986 factPrintf(kInfo, -1, "Exit Writing Process ...");
1987
1988 return 0;
1989} /*-----------------------------------------------------------------*/
1990
1991
1992
1993
1994void
1995StartEvtBuild ()
1996{
1997
1998 int i, /*j,*/ imax, status/*, th_ret[50]*/;
1999 pthread_t thread[50];
2000 struct timespec xwait;
2001
2002 gi_runStat = gp_runStat = gw_runStat = 0;
2003 gj.readStat = gj.procStat = gj.writStat = 0;
2004
2005 factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
2006
2007//initialize run control logics
2008 for (i = 0; i < MAX_RUN; i++) {
2009 runCtrl[i].runId = 0;
2010 runCtrl[i].fileId = -2;
2011 }
2012
2013//prepare for subProcesses
2014 gi_maxSize = g_maxSize;
2015 if (gi_maxSize <= 0)
2016 gi_maxSize = 1;
2017
2018 gi_maxProc = g_maxProc;
2019 if (gi_maxProc <= 0 || gi_maxProc > 90) {
2020 factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
2021 gi_maxProc = 1;
2022 }
2023//partially initialize event control logics
2024 evtCtrl_frstPtr = 0;
2025 evtCtrl_lastPtr = 0;
2026
2027//start all threads (more to come) when we are allowed to ....
2028 while (g_runStat == 0) {
2029 xwait.tv_sec = 0;
2030 xwait.tv_nsec = 10000000; // sleep for ~10 msec
2031 nanosleep (&xwait, NULL);
2032 }
2033
2034 i = 0;
2035 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
2036 i++;
2037 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
2038 i++;
2039 /*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
2040 i++;
2041 imax = i;
2042
2043
2044#ifdef BILAND
2045 xwait.tv_sec = 30;;
2046 xwait.tv_nsec = 0; // sleep for ~20sec
2047 nanosleep (&xwait, NULL);
2048
2049 printf ("close all runs in 2 seconds\n");
2050
2051 CloseRunFile (0, time (NULL) + 2, 0);
2052
2053 xwait.tv_sec = 1;;
2054 xwait.tv_nsec = 0; // sleep for ~20sec
2055 nanosleep (&xwait, NULL);
2056
2057 printf ("setting g_runstat to -1\n");
2058
2059 g_runStat = -1;
2060#endif
2061
2062
2063//wait for all threads to finish
2064 for (i = 0; i < imax; i++) {
2065 /*j =*/ pthread_join (thread[i], (void **) &status);
2066 }
2067
2068} /*-----------------------------------------------------------------*/
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084 /*-----------------------------------------------------------------*/
2085 /*-----------------------------------------------------------------*/
2086 /*-----------------------------------------------------------------*/
2087 /*-----------------------------------------------------------------*/
2088 /*-----------------------------------------------------------------*/
2089
2090#ifdef BILAND
2091
2092int
2093subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
2094 int8_t * buffer)
2095{
2096 printf ("called subproc %d\n", threadID);
2097 return threadID + 1;
2098}
2099
2100
2101
2102
2103 /*-----------------------------------------------------------------*/
2104 /*-----------------------------------------------------------------*/
2105 /*-----------------------------------------------------------------*/
2106 /*-----------------------------------------------------------------*/
2107 /*-----------------------------------------------------------------*/
2108
2109
2110
2111
2112FileHandle_t
2113runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
2114{
2115 return 1;
2116};
2117
2118int
2119runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
2120{
2121 return 1;
2122 usleep (10000);
2123 return 1;
2124}
2125
2126
2127//{ return 1; } ;
2128
2129int
2130runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
2131{
2132 return 1;
2133};
2134
2135
2136
2137
2138int
2139eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
2140{
2141 int i = 0;
2142
2143// printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
2144// for (i=0; i<NBOARDS; i++) {
2145// printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
2146// }
2147 return 0;
2148}
2149
2150
2151void
2152factStatNew (EVT_STAT gi)
2153{
2154 int i;
2155
2156//for (i=0;i<MAX_SOCK;i++) {
2157// printf("%4d",gi.numRead[i]);
2158// if (i%20 == 0 ) printf("\n");
2159//}
2160}
2161
2162void
2163gotNewRun (int runnr, PEVNT_HEADER * headers)
2164{
2165 printf ("got new run %d\n", runnr);
2166 return;
2167}
2168
2169void
2170factStat (GUI_STAT gj)
2171{
2172// printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
2173// array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
2174}
2175
2176
2177void
2178debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
2179 int state, uint32_t tsec, uint32_t tusec)
2180{
2181// printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
2182}
2183
2184
2185
2186void
2187debugStream (int isock, void *buf, int len)
2188{
2189}
2190
2191void
2192debugHead (int i, int j, void *buf)
2193{
2194}
2195
2196
2197void
2198factOut (int severity, int err, char *message)
2199{
2200 static FILE *fd;
2201 static int file = 0;
2202
2203 if (file == 0) {
2204 printf ("open file\n");
2205 fd = fopen ("x.out", "w+");
2206 file = 999;
2207 }
2208
2209 fprintf (fd, "%3d %3d | %s \n", severity, err, message);
2210
2211 if (severity != kDebug)
2212 printf ("%3d %3d | %s\n", severity, err, message);
2213}
2214
2215
2216
2217int
2218main ()
2219{
2220 int i, b, c, p;
2221 char ipStr[100];
2222 struct in_addr IPaddr;
2223
2224 g_maxMem = 1024 * 1024; //MBytes
2225//g_maxMem = g_maxMem * 1024 *10 ; //10GBytes
2226 g_maxMem = g_maxMem * 200; //100MBytes
2227
2228 g_maxProc = 20;
2229 g_maxSize = 30000;
2230
2231 g_runStat = 40;
2232
2233 i = 0;
2234
2235// version for standard crates
2236//for (c=0; c<4,c++) {
2237// for (b=0; b<10; b++) {
2238// sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
2239//
2240// inet_pton(PF_INET, ipStr, &IPaddr) ;
2241//
2242// g_port[i].sockAddr.sin_family = PF_INET;
2243// g_port[i].sockAddr.sin_port = htons(5000) ;
2244// g_port[i].sockAddr.sin_addr = IPaddr ;
2245// g_port[i].sockDef = 1 ;
2246// i++ ;
2247// }
2248//}
2249//
2250//version for PC-test *
2251 for (c = 0; c < 4; c++) {
2252 for (b = 0; b < 10; b++) {
2253 sprintf (ipStr, "10.0.%d.11", 128 + c);
2254 if (c < 2)
2255 sprintf (ipStr, "10.0.%d.11", 128);
2256 else
2257 sprintf (ipStr, "10.0.%d.11", 131);
2258// if (c==0) sprintf(ipStr,"10.0.100.11") ;
2259
2260 inet_pton (PF_INET, ipStr, &IPaddr);
2261 p = 31919 + 100 * c + 10 * b;
2262
2263
2264 g_port[i].sockAddr.sin_family = PF_INET;
2265 g_port[i].sockAddr.sin_port = htons (p);
2266 g_port[i].sockAddr.sin_addr = IPaddr;
2267 g_port[i].sockDef = 1;
2268
2269 i++;
2270 }
2271 }
2272
2273
2274//g_port[17].sockDef =-1 ;
2275//g_actBoards-- ;
2276
2277 StartEvtBuild ();
2278
2279 return 0;
2280
2281}
2282#endif
Note: See TracBrowser for help on using the repository browser.