source: trunk/FACT++/src/EventBuilder.cc@ 16282

Last change on this file since 16282 was 16108, checked in by tbretz, 12 years ago
Added a new run-close condition for the case an invalid event was received.
File size: 40.5 KB
Line 
1#include <sys/time.h>
2#include <sys/epoll.h>
3#include <netinet/tcp.h>
4
5#include <cstring>
6#include <cstdarg>
7#include <list>
8#include <forward_list>
9
10#include <boost/algorithm/string/join.hpp>
11
12#include "queue.h"
13
14#include "MessageImp.h"
15#include "EventBuilder.h"
16
17using namespace std;
18
// Size limits of the per-board receive buffer (see READ_STRUCT)
#define MIN_LEN 32 // min #bytes needed to interpret FADheader
#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)

// Compile-time switches for alternative read strategies; all are disabled here
// (see the corresponding #ifdef sections in READ_STRUCT and mainloop)
//#define COMPLETE_EVENTS
//#define USE_EPOLL
//#define USE_SELECT
//#define COMPLETE_EPOLL
26
27// ==========================================================================
28
// Functions called by the event builder whose definitions are not part of
// this section (presumably provided by the program embedding the event
// builder - TODO confirm).
bool runOpen(const shared_ptr<EVT_CTRL2> &evt);         // open the file of the event's run (see writeEvt)
bool runWrite(const shared_ptr<EVT_CTRL2> &evt);        // write one event (see writeEvt)
void runClose();                                        // close the currently open file (see writeEvt)
void applyCalib(const shared_ptr<EVT_CTRL2> &evt);      // handler of processingQueue1
void factOut(int severity, const char *message);        // message sink used by factPrintf
void factReportIncomplete (uint64_t rep);
void gotNewRun(RUN_CTRL2 &run);                         // called once when a new run is registered (mBufEvt)
void runFinished();                                     // called when a run reached its event/time limit (mBufEvt)
void factStat(GUI_STAT gj);
bool eventCheck(const shared_ptr<EVT_CTRL2> &evt);      // false closes the run (see procEvt)
void debugHead(void *buf);                              // called with every received board header
40
41// ==========================================================================
42
// The main loop runs as long as this is 0 (mainloop() resets it to 0 on start)
int g_reset;

size_t g_maxMem; //maximum memory allowed for buffer (limit used by Memory::malloc/free)

FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"

// Connection status per board (4 crates * 10 boards), updated in mainloop()
uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards

// Statistics (connections, buffer occupancy) updated in mainloop()
GUI_STAT gj;
52
53// ==========================================================================
54
55void factPrintf(int severity, const char *fmt, ...)
56{
57 char str[1000];
58
59 va_list ap;
60 va_start(ap, fmt);
61 vsnprintf(str, 1000, fmt, ap);
62 va_end(ap);
63
64 factOut(severity, str);
65}
66
67// ==========================================================================
68
// Memory needed for the board headers of all boards of one event
#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
// Size of one memory slot handed out by Memory::malloc(): the EVENT structure,
// the samples of all pixel and time-marker channels (at most 1024 two-byte
// samples each) and the board headers
#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
71
72namespace Memory
73{
74 uint64_t inuse = 0;
75 uint64_t allocated = 0;
76
77 uint64_t max_inuse = 0;
78
79 mutex mtx;
80
81 forward_list<void*> memory;
82
83 void *malloc()
84 {
85 // No free slot available, next alloc would exceed max memory
86 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
87 return NULL;
88
89 // We will return this amount of memory
90 // This is not 100% thread safe, but it is not a super accurate measure anyway
91 inuse += MAX_TOT_MEM;
92 if (inuse>max_inuse)
93 max_inuse = inuse;
94
95 void *mem = NULL;
96
97 if (memory.empty())
98 {
99 // No free slot available, allocate a new one
100 allocated += MAX_TOT_MEM;
101 mem = new char[MAX_TOT_MEM];
102 }
103 else
104 {
105 // Get the next free slot from the stack and return it
106 const lock_guard<mutex> lock(mtx);
107 mem = memory.front();
108 memory.pop_front();
109 }
110
111 memset(mem, 0, MAX_HEAD_MEM);
112 return mem;
113 };
114
115 void free(void *mem)
116 {
117 if (!mem)
118 return;
119
120 // Decrease the amont of memory in use accordingly
121 inuse -= MAX_TOT_MEM;
122
123 // If the maximum memory has changed, we might be over the limit.
124 // In this case: free a slot
125 if (allocated>g_maxMem)
126 {
127 delete [] (char*)mem;
128 allocated -= MAX_TOT_MEM;
129 return;
130 }
131
132 const lock_guard<mutex> lock(mtx);
133 memory.push_front(mem);
134 }
135};
136
137// ==========================================================================
138
139struct READ_STRUCT
140{
141 enum buftyp_t
142 {
143 kStream,
144 kHeader,
145 kData,
146#ifdef COMPLETE_EVENTS
147 kWait
148#endif
149 };
150
151 // ---------- connection ----------
152
153 static uint activeSockets;
154
155 int sockId; // socket id (board number)
156 int socket; // socket handle
157 bool connected; // is this socket connected?
158
159 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
160
161 // ------------ epoll -------------
162
163 static int fd_epoll;
164 static epoll_event events[NBOARDS];
165
166 static void init();
167 static void close();
168 static int wait();
169 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
170
171 // ------------ buffer ------------
172
173 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
174
175 uint32_t bufLen; // number of bytes left to read
176 uint8_t *bufPos; // next byte to read to the buffer next
177
178 union
179 {
180 uint8_t B[MAX_LEN];
181 uint16_t S[MAX_LEN / 2];
182 uint32_t I[MAX_LEN / 4];
183 uint64_t L[MAX_LEN / 8];
184 PEVNT_HEADER H;
185 };
186
187 uint64_t rateBytes;
188 uint32_t skip; // number of bytes skipped before start of event
189 bool repmem; // reportet no mmemory free
190
191 uint32_t len() const { return uint32_t(H.package_length)*2; }
192
193 void swapHeader();
194 void swapData();
195
196 // --------------------------------
197
198 READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
199 {
200 if (fd_epoll<0)
201 init();
202 }
203 ~READ_STRUCT()
204 {
205 destroy();
206 }
207
208 void destroy();
209 bool create(sockaddr_in addr);
210 bool check(int, sockaddr_in addr);
211 bool read();
212};
213
214int READ_STRUCT::wait()
215{
216 // wait for something to do...
217 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms]
218 if (rc>=0)
219 return rc;
220
221 if (errno==EINTR) // timout or signal interruption
222 return 0;
223
224 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
225 return -1;
226}
227
// Number of sockets created by READ_STRUCT::create() and not yet destroyed
uint READ_STRUCT::activeSockets = 0;
// epoll instance shared by all READ_STRUCTs (-1: not created)
int READ_STRUCT::fd_epoll = -1;
// Ready events returned by READ_STRUCT::wait()
epoll_event READ_STRUCT::events[NBOARDS];
231
232void READ_STRUCT::init()
233{
234 if (fd_epoll>=0)
235 return;
236
237#ifdef USE_EPOLL
238 fd_epoll = epoll_create(NBOARDS);
239 if (fd_epoll<0)
240 {
241 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
242 return;
243 }
244#endif
245}
246
247void READ_STRUCT::close()
248{
249#ifdef USE_EPOLL
250 if (fd_epoll>=0 && ::close(fd_epoll)>0)
251 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
252#endif
253
254 fd_epoll = -1;
255}
256
257bool READ_STRUCT::create(sockaddr_in sockAddr)
258{
259 if (socket>=0)
260 return false;
261
262 const int port = ntohs(sockAddr.sin_port) + 1;
263
264 SockAddr.sin_family = sockAddr.sin_family;
265 SockAddr.sin_addr = sockAddr.sin_addr;
266 SockAddr.sin_port = htons(port);
267
268 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
269 {
270 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
271 socket = -1;
272 return false;
273 }
274
275 int optval = 1;
276 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
277 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
278
279 optval = 10; //start after 10 seconds
280 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
281 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
282
283 optval = 10; //do every 10 seconds
284 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
285 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
286
287 optval = 2; //close after 2 unsuccessful tries
288 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
289 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
290
291 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
292
293 //connected = false;
294 activeSockets++;
295
296 return true;
297}
298
299void READ_STRUCT::destroy()
300{
301 if (socket<0)
302 return;
303
304#ifdef USE_EPOLL
305 // strictly speaking this should not be necessary
306 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
307 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
308#endif
309
310 if (::close(socket) > 0)
311 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
312 else
313 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
314
315 socket = -1;
316 connected = false;
317 activeSockets--;
318}
319
320bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
321{
322 // Continue in the most most likely case (performance)
323 //if (socket>=0 && sockDef!=0 && connected)
324 // return;
325 const int old = socket;
326
327 // socket open, but should not be open
328 if (socket>=0 && sockDef==0)
329 destroy();
330
331 // Socket closed, but should be open
332 if (socket<0 && sockDef!=0)
333 create(addr); //generate address and socket
334
335 const bool retval = old!=socket;
336
337 // Socket closed
338 if (socket<0)
339 return retval;
340
341 // Socket open and connected: Nothing to do
342 if (connected)
343 return retval;
344
345 //try to connect if not yet done
346 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
347 if (rc == -1)
348 return retval;
349
350 connected = true;
351
352 if (sockDef<0)
353 {
354 bufTyp = READ_STRUCT::kStream; // full data to be skipped
355 bufLen = MAX_LEN; // huge for skipping
356 }
357 else
358 {
359 bufTyp = READ_STRUCT::kHeader; // expect a header
360 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
361 }
362
363 bufPos = B; // no byte read so far
364 skip = 0; // start empty
365 repmem = false;
366
367 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
368
369#ifdef USE_EPOLL
370 epoll_event ev;
371 ev.events = EPOLLIN;
372 ev.data.ptr = this; // user data (union: ev.ptr)
373 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
374 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
375#endif
376
377 return retval;
378}
379
380bool READ_STRUCT::read()
381{
382 if (bufLen==0)
383 return true;
384
385 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
386 // recv failed
387 if (jrd<0)
388 {
389 // There was just nothing waiting
390 if (errno==EWOULDBLOCK || errno==EAGAIN)
391 return false;
392
393 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
394 return false;
395 }
396
397 // connection was closed ...
398 if (jrd==0)
399 {
400 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
401
402 destroy();//DestroySocket(rd[i]); //generate address and socket
403 return false;
404 }
405
406 rateBytes += jrd;
407
408 // are we skipping this board ...
409 if (bufTyp==kStream)
410 return false;
411
412 bufPos += jrd; //==> prepare for continuation
413 bufLen -= jrd;
414
415 // not yet all read
416 return bufLen==0;
417}
418
419void READ_STRUCT::swapHeader()
420{
421 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
422 S[2] = ntohs(S[2]); // version_no
423 S[3] = ntohs(S[3]); // PLLLCK
424 S[4] = ntohs(S[4]); // trigger_crc
425 S[5] = ntohs(S[5]); // trigger_type
426
427 I[3] = ntohl(I[3]); // trigger_id
428 I[4] = ntohl(I[4]); // fad_evt_counter
429 I[5] = ntohl(I[5]); // REFCLK_frequency
430
431 S[12] = ntohs(S[12]); // board id
432 S[13] = ntohs(S[13]); // adc_clock_phase_shift
433 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
434 S[15] = ntohs(S[15]); // trigger_generator_prescaler
435
436 I[10] = ntohl(I[10]); // runnumber;
437 I[11] = ntohl(I[11]); // time;
438
439 for (int s=24; s<24+NTemp+NDAC; s++)
440 S[s] = ntohs(S[s]); // drs_temperature / dac
441}
442
443void READ_STRUCT::swapData()
444{
445 // swapEventHeaderBytes: End of the header. to channels now
446
447 int i = 36;
448 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
449 {
450 S[i+0] = ntohs(S[i+0]);//id
451 S[i+1] = ntohs(S[i+1]);//start_cell
452 S[i+2] = ntohs(S[i+2]);//roi
453 S[i+3] = ntohs(S[i+3]);//filling
454
455 i += 4+S[i+2];//skip the pixel data
456 }
457}
458
459// ==========================================================================
460
461bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
462{
463 int xjr = -1;
464 int xkr = -1;
465
466 //points to the very first roi
467 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
468
469 roi[0] = ntohs(rd.S[roiPtr]);
470
471 for (int jr = 0; jr < 9; jr++)
472 {
473 roi[jr] = ntohs(rd.S[roiPtr]);
474
475 if (roi[jr]>1024)
476 {
477 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
478 return false;
479 }
480
481 // Check that the roi of pixels jr are compatible with the one of pixel 0
482 if (jr!=8 && roi[jr]!=roi[0])
483 {
484 xjr = jr;
485 break;
486 }
487
488 // Check that the roi of all other DRS chips on boards are compatible
489 for (int kr = 1; kr < 4; kr++)
490 {
491 const int kroi = ntohs(rd.S[roiPtr]);
492 if (kroi != roi[jr])
493 {
494 xjr = jr;
495 xkr = kr;
496 break;
497 }
498 roiPtr += kroi+4;
499 }
500 }
501
502 if (xjr>=0)
503 {
504 if (xkr<0)
505 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
506 else
507 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
508
509 return false;
510 }
511
512 if (roi[8] < roi[0])
513 {
514 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
515 return false;
516 }
517
518 return true;
519}
520
// Events which are currently being assembled, in order of creation
// (appended by mBufEvt(), removed from the front in mainloop())
list<shared_ptr<EVT_CTRL2>> evtCtrl;
522
// Returns the event (from evtCtrl) the board data in rd belongs to, or
// creates and appends a new one. Returns an empty pointer if the rois of rd
// are inconsistent (checkRoiConsistency), do not match those of an existing
// event with the same run/event number, or do not match those of the active
// run. If a new event belongs to a run different from actrun, a new run is
// registered: the previous run's maxEvt is fixed, actrun is replaced and
// gotNewRun() is called. runFinished() is called whenever the active run
// has reached its event or time limit after a new event was created.
shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
{
    uint16_t nRoi[9];
    if (!checkRoiConsistency(rd, nRoi))
        return shared_ptr<EVT_CTRL2>();

    // Search backwards, i.e. starting with the most recently created event
    for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
    {
        // A reference is enough because the evtCtrl holds the shared_ptr anyway
        const shared_ptr<EVT_CTRL2> &evt = *it;

        // If the run is different, go on searching.
        // We cannot stop searching if a lower run-id is found as in
        // the case of the events, because theoretically, there
        // can be the same run on two different days.
        if (rd.H.runnumber != evt->runNum)
            continue;

        // If the ID of the new event is higher than the last one stored
        // in that run, we have to assign a new slot (leave the loop)
        if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
            break;

        if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
            continue;

        // We have found an entry with the same runID and evtID
        // Check if ROI is consistent
        if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
        {
            factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
                       evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
            return shared_ptr<EVT_CTRL2>();
        }

        // count for inconsistencies (trigger id / trigger type differ between boards)
        if (evt->trgNum != rd.H.trigger_id)
            evt->Errors[0]++;
        if (evt->trgTyp != rd.H.trigger_type)
            evt->Errors[2]++;

        //everything seems fine so far ==> use this slot ....
        return evt;
    }

    // A new event is needed. If it belongs to the active run,
    // its rois must agree with the rois of that run.
    if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
    {
        factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
                   actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
        return shared_ptr<EVT_CTRL2>();
    }

    shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);

    // Reception time of the event (used for the run's time limits)
    gettimeofday(&evt->time, NULL);

    evt->runNum = rd.H.runnumber;
    evt->evNum = rd.H.fad_evt_counter;

    evt->trgNum = rd.H.trigger_id;
    evt->trgTyp = rd.H.trigger_type;

    evt->nRoi = nRoi[0];
    evt->nRoiTM = nRoi[8];

    const bool newrun = actrun->runId != rd.H.runnumber;
    if (newrun)
    {
        // Since we have started a new run, we know already when to close the
        // previous run in terms of number of events
        actrun->maxEvt = actrun->lastEvt;

        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
                   rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);

        // The new run is the active run now
        actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);

        const time_t &tsec = evt->time.tv_sec;

        actrun->openTime = tsec;
        actrun->closeTime = tsec + 3600 * 24; // max time allowed
        actrun->runId = rd.H.runnumber;
        actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
        actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!

        // Signal the fadctrl that a new run has been started
        // Note this is the only place at which we can ensure that
        // gotnewRun is called only once
        gotNewRun(*actrun);
    }

    // Increase the number of events we have started to receive in this run
    actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
    actrun->lastEvt++;

    // Keep pointer to run of this event
    evt->runCtrl = actrun;

    // Append the event only after it has been fully initialized,
    // otherwise we can run into threading issues if the event is
    // accessed before it is fully initialized.
    evtCtrl.push_back(evt);

    // An event can be the first and the last, but not the last and the first.
    // Therefore gotNewRun is called before runFinished.
    // runFinished signals that the last event of a run was just received. Processing
    // might still be ongoing, but we can start a new run.
    const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
    const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
    if (!cond1 || !cond2)
        runFinished();

    return evt;
}
638
639
640void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
641{
642 const int i = rBuf.sockId;
643
644 memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER));
645
646 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
647
648 // consistency of ROIs have been checked already (is it all correct?)
649 const uint16_t &roi = rBuf.S[src+2];
650
651 // different sort in FAD board.....
652 for (int px = 0; px < 9; px++)
653 {
654 for (int drs = 0; drs < 4; drs++)
655 {
656 const int16_t pixC = rBuf.S[src+1]; // start-cell
657 const int16_t pixR = rBuf.S[src+2]; // roi
658 //here we should check if pixH is correct ....
659
660 const int pixS = i*36 + drs*9 + px;
661
662 evt->fEvent->StartPix[pixS] = pixC;
663
664 memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
665
666 src += 4+pixR;
667
668 // Treatment for ch 9 (TM channel)
669 if (px != 8)
670 continue;
671
672 const int tmS = i*4 + drs;
673
674 //and we have additional TM info
675 if (pixR > roi)
676 {
677 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
678
679 memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
680 }
681 else
682 {
683 evt->fEvent->StartTM[tmS] = -1;
684 }
685 }
686 }
687}
688
689// ==========================================================================
690
691uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
692{
693 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
694 evt->runNum, evt->evNum, evtCtrl.size(), txt);
695
696 uint64_t report = 0;
697
698 char str[1000];
699
700 int ik=0;
701 for (int ib=0; ib<NBOARDS; ib++)
702 {
703 if (ib%10==0)
704 str[ik++] = '|';
705
706 const int jb = evt->board[ib];
707 if (jb>=0) // data received from that board
708 {
709 str[ik++] = '0'+(jb%10);
710 continue;
711 }
712
713 // FIXME: This is not synchronous... it reports
714 // accoridng to the current connection status, not w.r.t. to the
715 // one when the event was taken.
716 if (gi_NumConnect[ib]==0) // board not connected
717 {
718 str[ik++] = 'x';
719 continue;
720 }
721
722 // data from this board lost
723 str[ik++] = '.';
724 report |= ((uint64_t)1)<<ib;
725 }
726
727 str[ik++] = '|';
728 str[ik] = 0;
729
730 factOut(MessageImp::kWarn, str);
731
732 return report;
733}
734
735// ==========================================================================
736// ==========================================================================
737
// Queue whose handler applyCalib() is invoked for every event posted to it
// (events are posted by writeEvt()). Presumably processed by a worker
// thread owned by Queue (see queue.h) - TODO confirm.
Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1));
739
740// If this is not convenient anymore, it could be replaced by
741// a command queue, to which command+data is posted,
742// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
743void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
744{
745 const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
746
747 // Is this a valid event or just an empty event to trigger run close?
748 // If this is not an empty event open the new run-file
749 // Empty events are there to trigger run-closing conditions
750 if (evt->runNum>=0)
751 {
752 // File not yet open
753 if (run->fileStat==kFileNotYetOpen)
754 {
755 // runOpen will close a previous run, if still open
756 if (!runOpen(evt))
757 {
758 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
759 run->fileStat = kFileClosed;
760 return;
761 }
762
763 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
764 run->fileStat = kFileOpen;
765 }
766
767 // Here we have a valid calibration and can go on with that.
768 processingQueue1.post(evt);
769 }
770
771 // File already closed
772 if (run->fileStat==kFileClosed)
773 return;
774
775 bool rc1 = true;
776 if (evt->runNum>=0)
777 {
778 rc1 = runWrite(evt);
779 if (!rc1)
780 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
781 }
782
783 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached
784 const bool cond2 = run->lastTime < run->closeTime; // max time not reached
785 const bool cond3 = run->closeRequest==kRequestNone; // file signaled to be closed
786 const bool cond4 = rc1; // Write successfull
787
788 // File is not yet to be closed.
789 if (cond1 && cond2 && cond3 && cond4)
790 return;
791
792 runClose();
793 run->fileStat = kFileClosed;
794
795 vector<string> reason;
796 if (!cond1)
797 reason.push_back(to_string(run->maxEvt)+" evts reached");
798 if (!cond2)
799 reason.push_back(to_string(run->closeTime-run->openTime)+"s reached");
800 if (!cond3)
801 {
802 if (run->closeRequest&kRequestManual)
803 reason.push_back("close requested");
804 if (run->closeRequest&kRequestTimeout)
805 reason.push_back("receive timeout");
806 if (run->closeRequest&kRequestConnectionChange)
807 reason.push_back("connection changed");
808 if (run->closeRequest&kRequestEventCheckFailed)
809 reason.push_back("event check failed");
810 }
811 if (!cond4)
812 reason.push_back("runWrite failed");
813
814 const string str = boost::algorithm::join(reason, ", ");
815 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
816}
817
// Queue whose handler writeEvt() is invoked for every event posted by
// procEvt(); started in mainloop()
Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
819
820void procEvt(const shared_ptr<EVT_CTRL2> &evt)
821{
822 if (evt->runNum>=0)
823 {
824 evt->fEvent->Errors[0] = evt->Errors[0];
825 evt->fEvent->Errors[1] = evt->Errors[1];
826 evt->fEvent->Errors[2] = evt->Errors[2];
827 evt->fEvent->Errors[3] = evt->Errors[3];
828
829 for (int ib=0; ib<NBOARDS; ib++)
830 evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time;
831
832 if (!eventCheck(evt))
833 {
834 evt->runCtrl->closeRequest = kRequestEventCheckFailed;
835 return;
836 }
837 }
838
839 // If file is open post the event for being written
840 secondaryQueue.post(evt);
841}
842
843// ==========================================================================
844// ==========================================================================
845
// Run new events are assigned to; created in mainloop() and replaced by
// mBufEvt() when a new run number is received. Also needed in CloseRunFile.
shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile
847
848/*
849 task 1-4:
850
851 lock1()-lock4();
852 while (1)
853 {
854 wait for signal [lockN]; // unlocked
855
856 while (n!=10)
857 wait sockets;
858 read;
859
860 lockM();
861 finished[n] = true;
862 signal(mainloop);
863 unlockM();
864 }
865
866
867 mainloop:
868
869 while (1)
870 {
871 lockM();
872 while (!finished[0] || !finished[1] ...)
873 wait for signal [lockM]; // unlocked... signals can be sent
874 finished[0-1] = false;
875 unlockM()
876
877 copy data to queue // locked
878
879 lockN[0-3];
880 signalN[0-3];
881 unlockN[0-3];
882 }
883
884
885 */
886
887/*
888 while (g_reset)
889 {
890 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
891
892 // Check that all sockets are connected
893
894 for (int i=0; i<40; i++)
895 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
896 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
897
898 while (g_reset)
899 {
900 if (READ_STRUCT::wait()<0)
901 break;
902
903 if (rc_epoll==0)
904 break;
905
906 for (int jj=0; jj<rc_epoll; jj++)
907 {
908 READ_STRUCT *rs = READ_STRUCT::get(jj);
909 if (!rs->connected)
910 continue;
911
912 const bool rc_read = rs->read();
913 if (!rc_read)
914 continue;
915
916 if (rs->bufTyp==READ_STRUCT::kHeader)
917 {
918 [...]
919 }
920
921 [...]
922
923 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
924 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
925 }
926
927 if (once_a_second)
928 {
929 if (evt==timeout)
930 break;
931 }
932 }
933
934 if (evt.nBoards==actBoards)
935 primaryQueue.post(evt);
936 }
937*/
938
939void CloseRunFile()
940{
941 // Create a copy of the shared_ptr to ensure
942 // is not replaced in the middle of the action
943 const shared_ptr<RUN_CTRL2> run = actrun;
944 if (run)
945 run->closeRequest |= kRequestManual;
946}
947
948bool mainloop(READ_STRUCT *rd)
949{
950 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
951
952 Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
953
954 primaryQueue.start();
955 secondaryQueue.start();
956
957 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
958
959 //time in seconds
960 time_t gi_SecTime = time(NULL)-1;
961
962 //loop until global variable g_runStat claims stop
963 g_reset = 0;
964 while (g_reset == 0)
965 {
966#ifdef USE_SELECT
967 fd_set readfs;
968 FD_ZERO(&readfs);
969 int nfsd = 0;
970 for (int i=0; i<NBOARDS; i++)
971 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
972 {
973 FD_SET(rd[i].socket, &readfs);
974 if (rd[i].socket>nfsd)
975 nfsd = rd[i].socket;
976 }
977
978 timeval tv;
979 tv.tv_sec = 0;
980 tv.tv_usec = 100;
981 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
982 // 0: timeout
983 // -1: error
984 if (rc_select<0)
985 {
986 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
987 continue;
988 }
989#endif
990
991#ifdef USE_EPOLL
992 const int rc_epoll = READ_STRUCT::wait();
993 if (rc_epoll<0)
994 break;
995#endif
996
997#ifdef USE_EPOLL
998 for (int jj=0; jj<rc_epoll; jj++)
999#else
1000 for (int jj=0; jj<NBOARDS; jj++)
1001#endif
1002 {
1003#ifdef USE_EPOLL
1004 // FIXME: How to get i?
1005 READ_STRUCT *rs = READ_STRUCT::get(jj);
1006#else
1007
1008 const int i = (jj%4)*10 + (jj/4);
1009 READ_STRUCT *rs = &rd[i];
1010 if (!rs->connected)
1011 continue;
1012#endif
1013
1014#ifdef USE_SELECT
1015 if (!FD_ISSET(rs->socket, &readfs))
1016 continue;
1017#endif
1018
1019
1020#ifdef COMPLETE_EVENTS
1021 if (rs->bufTyp==READ_STRUCT::kWait)
1022 continue;
1023#endif
1024
1025 // ==================================================================
1026
1027 const bool rc_read = rs->read();
1028
1029 // Connect might have gotten closed during read
1030 gi_NumConnect[rs->sockId] = rs->connected;
1031 gj.numConn[rs->sockId] = rs->connected;
1032
1033 // Read either failed or disconnected, or the buffer is not yet full
1034 if (!rc_read)
1035 continue;
1036
1037 // ==================================================================
1038
1039 if (rs->bufTyp==READ_STRUCT::kHeader)
1040 {
1041 //check if startflag correct; else shift block ....
1042 // FIXME: This is not enough... this combination of
1043 // bytes can be anywhere... at least the end bytes
1044 // must be checked somewhere, too.
1045 uint k;
1046 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1047 {
1048 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1049 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1050 break;
1051 }
1052 rs->skip += k;
1053
1054 //no start of header found
1055 if (k==sizeof(PEVNT_HEADER)-1)
1056 {
1057 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1058 rs->bufPos = rs->B+1;
1059 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1060 continue;
1061 }
1062
1063 if (k > 0)
1064 {
1065 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1066
1067 rs->bufPos -= k;
1068 rs->bufLen += k;
1069
1070 continue; // We need to read more (bufLen>0)
1071 }
1072
1073 if (rs->skip>0)
1074 {
1075 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1076 rs->skip = 0;
1077 }
1078
1079 // Swap the header entries from network to host order
1080 rs->swapHeader();
1081
1082 rs->bufTyp = READ_STRUCT::kData;
1083 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1084
1085 debugHead(rs->B); // i and fadBoard not used
1086
1087 continue;
1088 }
1089
1090 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1091 if (end != 0xfe04)
1092 {
1093 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1094 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1095
1096 // ready to read next header
1097 rs->bufTyp = READ_STRUCT::kHeader;
1098 rs->bufLen = sizeof(PEVNT_HEADER);
1099 rs->bufPos = rs->B;
1100 // FIXME: What to do with the validity flag?
1101 continue;
1102 }
1103
1104 // get index into mBuffer for this event (create if needed)
1105 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1106
1107 // We have a valid entry, but no memory has yet been allocated
1108 if (evt && !evt->FADhead)
1109 {
1110 // Try to get memory from the big buffer
1111 PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc();
1112 if (!mem)
1113 {
1114 // If this works properly, this is a hack which can be removed, or
1115 // replaced by a signal or dim message
1116 if (!rs->repmem)
1117 {
1118 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1119 rs->repmem = true;
1120 }
1121 continue;
1122 }
1123
1124 evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free));
1125 }
1126
1127 // ready to read next header
1128 rs->bufTyp = READ_STRUCT::kHeader;
1129 rs->bufLen = sizeof(PEVNT_HEADER);
1130 rs->bufPos = rs->B;
1131
1132 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1133 if (!evt)
1134 continue;
1135
1136 /*
1137 const int fad = (i/10)<<8)|(i%10);
1138 if (fad != rs->H.board_id)
1139 {
1140 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);
1141 }*/
1142
1143 // This should never happen
1144 if (evt->board[rs->sockId] != -1)
1145 {
1146 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1147 evt->evNum, rs->sockId, rs->sockId, rs->len());
1148 // FIXME: What to do with the validity flag?
1149 continue; // Continue reading next header
1150 }
1151
1152 // Swap the data entries (board headers) from network to host order
1153 rs->swapData();
1154
1155 // Copy data from rd[i] to mBuffer[evID]
1156 copyData(*rs, evt.get());
1157
1158#ifdef COMPLETE_EVENTS
1159 // Do not read anmymore from this board until the whole event has been received
1160 rs->bufTyp = READ_STRUCT::kWait;
1161#endif
1162 // now we have stored a new board contents into Event structure
1163 evt->fEvent->NumBoards++;
1164 evt->board[rs->sockId] = rs->sockId;
1165 evt->nBoard++;
1166
1167#ifdef COMPLETE_EPOLL
1168 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1169 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1170#endif
1171 // event not yet complete
1172 if (evt->nBoard < READ_STRUCT::activeSockets)
1173 continue;
1174
1175 // All previous events are now flagged as incomplete ("expired")
1176 // and will be removed. (This is a bit tricky, because pop_front()
1177 // would invalidate the current iterator if not done _after_ the increment)
1178 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1179 {
1180 const bool found = it->get()==evt.get();
1181 if (!found)
1182 reportIncomplete(*it, "expired");
1183 else
1184 primaryQueue.post(evt);
1185
1186 it++;
1187 evtCtrl.pop_front();
1188
1189 // We reached the current event, so we are done
1190 if (found)
1191 break;
1192 }
1193
1194#ifdef COMPLETE_EPOLL
1195 for (int j=0; j<40; j++)
1196 {
1197 epoll_event ev;
1198 ev.events = EPOLLIN;
1199 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1200 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1201 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1202 }
1203#endif
1204
1205#ifdef COMPLETE_EVENTS
1206 for (int j=0; j<40; j++)
1207 {
1208 //if (rs->bufTyp==READ_STRUCT::kWait)
1209 {
1210 rs->bufTyp = READ_STRUCT::kHeader;
1211 rs->bufLen = sizeof(PEVNT_HEADER);
1212 rs->bufPos = rs->B;
1213 }
1214 }
1215#endif
1216 } // end for loop over all sockets
1217
1218 // ==================================================================
1219
1220 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1221 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1222 gj.bufTot = gj.bufNew+gj.bufEvt; //# total events currently in buffer
1223 if (gj.bufNew>gj.maxEvt) //# maximum events in buffer past cycle
1224 gj.maxEvt = gj.bufNew;
1225
1226 // ==================================================================
1227
1228 const time_t actTime = time(NULL);
1229 if (actTime == gi_SecTime)
1230 {
1231#if !defined(USE_SELECT) && !defined(USE_EPOLL)
1232 if (evtCtrl.empty())
1233 usleep(1);
1234#endif
1235 continue;
1236 }
1237 gi_SecTime = actTime;
1238
1239 // ==================================================================
1240 //loop over all active events and flag those older than read-timeout
1241 //delete those that are written to disk ....
1242
1243 // This could be improved having the pointer which separates the queue with
1244 // the incomplete events from the queue with the complete events
1245 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1246 {
1247 // A reference is enough because the shared_ptr is hold by the evtCtrl
1248 const shared_ptr<EVT_CTRL2> &evt = *it;
1249
1250 // The first event is the oldest. If the first event within the
1251 // timeout window was received, we can stop searchinf further.
1252 if (evt->time.tv_sec>=actTime - 30)
1253 break;
1254
1255 // This will result in the emission of a dim service.
1256 // It doesn't matter if that takes comparably long,
1257 // because we have to stop the run anyway.
1258 const uint64_t rep = reportIncomplete(evt, "timeout");
1259 factReportIncomplete(rep);
1260
1261 it++;
1262 evtCtrl.pop_front();
1263 }
1264
1265 // If nothing was received for more than 5min, close file
1266 if (actTime-actrun->lastTime>300)
1267 actrun->closeRequest |= kRequestTimeout;
1268
1269 // =================================================================
1270
1271 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1272 gj.usdMem = Memory::max_inuse;
1273 gj.totMem = Memory::allocated;
1274
1275 gj.deltaT = 1000; // temporary, must be improved
1276
1277 for (int ib=0; ib<NBOARDS; ib++)
1278 {
1279 gj.rateBytes[ib] = rd[ib].rateBytes;
1280 gj.totBytes[ib] += rd[ib].rateBytes;
1281
1282 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1283 actrun->closeRequest |= kRequestConnectionChange;
1284
1285 gi_NumConnect[ib] = rd[ib].connected;
1286 gj.numConn[ib] = rd[ib].connected;
1287 }
1288
1289
1290 factStat(gj);
1291
1292 Memory::max_inuse = 0;
1293 gj.maxEvt = 0;
1294 for (int ib=0; ib<NBOARDS; ib++)
1295 rd[ib].rateBytes = 0;
1296
1297 // =================================================================
1298
1299 // This is a fake event to trigger possible run-closing conditions once a second
1300 // FIXME: This is not yet ideal because a file would never be closed
1301 // if a new file has been started and no events of the new file
1302 // have been received yet
1303 if (actrun->fileStat==kFileOpen)
1304 primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
1305 }
1306
1307 // 1: Stop, wait for event to get processed
1308 // 2: Stop, finish immediately
1309 // 101: Restart, wait for events to get processed
1310 // 101: Restart, finish immediately
1311 //
1312 const int gi_reset = g_reset;
1313
1314 const bool abort = gi_reset%100==2;
1315
1316 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1317
1318 primaryQueue.wait(abort);
1319 secondaryQueue.wait(abort);
1320 processingQueue1.wait(abort);
1321
1322 // Here we also destroy all runCtrl structures and hence close all open files
1323 evtCtrl.clear();
1324
1325 factPrintf(MessageImp::kInfo, "Exit read Process...");
1326 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1327
1328 factStat(gj);
1329
1330 return gi_reset>=100;
1331}
1332
1333// ==========================================================================
1334// ==========================================================================
1335
1336void StartEvtBuild()
1337{
1338 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1339
1340
1341 for (int k=0; k<NBOARDS; k++)
1342 {
1343 gi_NumConnect[k] = 0;
1344 gj.numConn[k] = 0;
1345 gj.totBytes[k] = 0;
1346 }
1347
1348 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
1349 gj.maxMem = gj.xxxMem = 0;
1350
1351 gj.usdMem = Memory::inuse;
1352 gj.totMem = Memory::allocated;
1353
1354 gj.bufNew = gj.bufEvt = 0;
1355 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
1356 gj.readStat = gj.procStat = gj.writStat = 0;
1357
1358
1359
1360 READ_STRUCT rd[NBOARDS];
1361
1362 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1363 for (int i=0; i<NBOARDS; i++)
1364 rd[i].sockId = i;
1365
1366 while (mainloop(rd));
1367
1368 //must close all open sockets ...
1369 factPrintf(MessageImp::kInfo, "Close all sockets...");
1370
1371 READ_STRUCT::close();
1372
1373 // Now all sockets get closed. This is not reflected in gi_NumConnect
1374 // The current workaround is to count all sockets as closed when the thread is not running
1375}
Note: See TracBrowser for help on using the repository browser.