source: branches/FACT++_lidctrl_usb/src/EventBuilder.cc@ 18725

Last change on this file since 18725 was 18616, checked in by tbretz, 8 years ago
Issue a message when the thread has been shut down.
File size: 47.3 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10
11#include <boost/algorithm/string/join.hpp>
12
13#include "../externals/Queue.h"
14
15#include "MessageImp.h"
16#include "EventBuilder.h"
17#include "HeadersFAD.h"
18
19using namespace std;
20
// Minimum number of bytes that must be available before a FAD header can be interpreted
#define MIN_LEN 32

// Size of the receive buffer of one board. The largest event of a single board is
//   1024*2*36 (data) + 8*36 (board headers) + 72 (event header) + 4 (end flag) = 74092 bytes,
// so this leaves some head room.
#define MAX_LEN 81920
23
24//#define COMPLETE_EVENTS
25//#define USE_POLL
26//#define USE_EPOLL
27//#define USE_SELECT
28//#define COMPLETE_EPOLL
29//#define PRIORITY_QUEUE
30
31// Reading only 1024: 13: 77Hz, 87%
32// Reading only 1024: 12: 78Hz, 46%
33// Reading only 300: 4: 250Hz, 92%
34// Reading only 300: 3: 258Hz, 40%
35
36// Reading only four threads 1024: 13: 77Hz, 60%
37// Reading only four threads 1024: 12: 78Hz, 46%
38// Reading only four threads 300: 4: 250Hz, 92%
39// Reading only four threads 300: 3: 258Hz, 40%
40
41// Default 300: 4: 249Hz, 92%
42// Default 300: 3: 261Hz, 40%
43// Default 1024: 13: 76Hz, 93%
44// Default 1024: 12: 79Hz, 46%
45
46// Poll [selected] 1024: 13: 63Hz, 45%
47// Poll [selected] 1024: 14: 63Hz, 63%
48// Poll [selected] 1024: 15: 64Hz, 80%
49// Poll [selected] 300: 4: 230Hz, 47%
50// Poll [selected] 300: 3: 200Hz, 94%
51
52// Poll [all] 1024: 13: 65Hz, 47%
53// Poll [all] 1024: 14: 64Hz, 59%
54// Poll [all] 1024: 15: 62Hz, 67%
55// Poll [all] 300: 4: 230Hz, 47%
56// Poll [all] 300: 3: 230Hz, 35%
57
58// ==========================================================================
59
60bool runOpen(const EVT_CTRL2 &evt);
61bool runWrite(const EVT_CTRL2 &evt);
62void runClose(const EVT_CTRL2 &run);
63void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
64void factOut(int severity, const char *message);
65void factReportIncomplete (uint64_t rep);
66void gotNewRun(RUN_CTRL2 &run);
67void runFinished();
68void factStat(const GUI_STAT &gj);
69bool eventCheck(const EVT_CTRL2 &evt);
70void debugHead(void *buf);
71
72// ==========================================================================
73
74int g_reset;
75
76size_t g_maxMem; //maximum memory allowed for buffer
77
78uint16_t g_evtTimeout; // timeout (sec) for one event
79
80FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
81
82uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
83
84GUI_STAT gj;
85
86// ==========================================================================
87
88namespace Memory
89{
90 uint64_t inuse = 0;
91 uint64_t allocated = 0;
92
93 uint64_t max_inuse = 0;
94
95 std::mutex mtx;
96
97 std::forward_list<void*> memory;
98
99 void *malloc()
100 {
101 // No free slot available, next alloc would exceed max memory
102 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
103 return NULL;
104
105 // We will return this amount of memory
106 // This is not 100% thread safe, but it is not a super accurate measure anyway
107 inuse += MAX_TOT_MEM;
108 if (inuse>max_inuse)
109 max_inuse = inuse;
110
111 if (memory.empty())
112 {
113 // No free slot available, allocate a new one
114 allocated += MAX_TOT_MEM;
115 return new char[MAX_TOT_MEM];
116 }
117
118 // Get the next free slot from the stack and return it
119 const std::lock_guard<std::mutex> lock(mtx);
120
121 void *mem = memory.front();
122 memory.pop_front();
123 return mem;
124 };
125
126 void free(void *mem)
127 {
128 if (!mem)
129 return;
130
131 // Decrease the amont of memory in use accordingly
132 inuse -= MAX_TOT_MEM;
133
134 // If the maximum memory has changed, we might be over the limit.
135 // In this case: free a slot
136 if (allocated>g_maxMem)
137 {
138 delete [] (char*)mem;
139 allocated -= MAX_TOT_MEM;
140 return;
141 }
142
143 const std::lock_guard<std::mutex> lock(mtx);
144 memory.push_front(mem);
145 }
146
147};
148
149// ==========================================================================
150
151void factPrintf(int severity, const char *fmt, ...)
152{
153 char str[1000];
154
155 va_list ap;
156 va_start(ap, fmt);
157 vsnprintf(str, 1000, fmt, ap);
158 va_end(ap);
159
160 factOut(severity, str);
161}
162
163// ==========================================================================
164
165struct READ_STRUCT
166{
167 enum buftyp_t
168 {
169 kStream,
170 kHeader,
171 kData,
172#ifdef COMPLETE_EVENTS
173 kWait
174#endif
175 };
176
177 // ---------- connection ----------
178
179 static uint activeSockets;
180
181 int sockId; // socket id (board number)
182 int socket; // socket handle
183 bool connected; // is this socket connected?
184
185 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
186
187 // ------------ epoll -------------
188
189 static int fd_epoll;
190 static epoll_event events[NBOARDS];
191
192 static void init();
193 static void close();
194 static int wait();
195 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
196
197 // ------------ buffer ------------
198
199 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
200
201 uint32_t bufLen; // number of bytes left to read
202 uint8_t *bufPos; // next byte to read to the buffer next
203
204 union
205 {
206 uint8_t B[MAX_LEN];
207 uint16_t S[MAX_LEN / 2];
208 uint32_t I[MAX_LEN / 4];
209 uint64_t L[MAX_LEN / 8];
210 PEVNT_HEADER H;
211 };
212
213 timeval time;
214 uint64_t totBytes; // total received bytes
215 uint64_t relBytes; // total released bytes
216 uint32_t skip; // number of bytes skipped before start of event
217
218 uint32_t len() const { return uint32_t(H.package_length)*2; }
219
220 void swapHeader();
221 void swapData();
222
223 // --------------------------------
224
225 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
226 {
227 if (fd_epoll<0)
228 init();
229 }
230 ~READ_STRUCT()
231 {
232 destroy();
233 }
234
235 void destroy();
236 bool create(sockaddr_in addr);
237 bool check(int, sockaddr_in addr);
238 bool read();
239
240};
241
#ifdef PRIORITY_QUEUE
// Ordering for std::priority_queue<READ_STRUCT*>. priority_queue puts the
// "largest" element on top, and this returns true if r1 has more
// received-but-not-released bytes than r2. The socket with the fewest
// pending bytes therefore comes first.
struct READ_STRUCTcomp
{
    // const: a comparator must be callable on a const object
    bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2) const
    {
        const int64_t rel1 = r1->totBytes - r1->relBytes;
        const int64_t rel2 = r2->totBytes - r2->relBytes;
        return rel1 > rel2;
    }
};
#endif
253
254int READ_STRUCT::wait()
255{
256 // wait for something to do...
257 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
258 if (rc>=0)
259 return rc;
260
261 if (errno==EINTR) // timout or signal interruption
262 return 0;
263
264 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
265 return -1;
266}
267
268uint READ_STRUCT::activeSockets = 0;
269int READ_STRUCT::fd_epoll = -1;
270epoll_event READ_STRUCT::events[NBOARDS];
271
272void READ_STRUCT::init()
273{
274 if (fd_epoll>=0)
275 return;
276
277#ifdef USE_EPOLL
278 fd_epoll = epoll_create(NBOARDS);
279 if (fd_epoll<0)
280 {
281 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
282 return;
283 }
284#endif
285}
286
287void READ_STRUCT::close()
288{
289#ifdef USE_EPOLL
290 if (fd_epoll>=0 && ::close(fd_epoll)>0)
291 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
292#endif
293
294 fd_epoll = -1;
295}
296
297bool READ_STRUCT::create(sockaddr_in sockAddr)
298{
299 if (socket>=0)
300 return false;
301
302 const int port = ntohs(sockAddr.sin_port) + 1;
303
304 SockAddr.sin_family = sockAddr.sin_family;
305 SockAddr.sin_addr = sockAddr.sin_addr;
306 SockAddr.sin_port = htons(port);
307
308 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
309 {
310 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
311 socket = -1;
312 return false;
313 }
314
315 int optval = 1;
316 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
317 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
318
319 optval = 1;
320 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
321 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
322
323 optval = 10; //start after 10 seconds
324 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
325 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
326
327 optval = 10; //do every 10 seconds
328 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
329 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
330
331 optval = 2; //close after 2 unsuccessful tries
332 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
333 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
334
335 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
336
337 //connected = false;
338 activeSockets++;
339
340 return true;
341}
342
343void READ_STRUCT::destroy()
344{
345 if (socket<0)
346 return;
347
348#ifdef USE_EPOLL
349 // strictly speaking this should not be necessary
350 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
351 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
352#endif
353
354 if (::close(socket) > 0)
355 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
356 else
357 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
358
359 // Set the socket to "not connected"
360 socket = -1;
361 connected = false;
362 activeSockets--;
363 bufLen = 0;
364}
365
366bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
367{
368 // Continue in the most most likely case (performance)
369 //if (socket>=0 && sockDef!=0 && connected)
370 // return;
371 const int old = socket;
372
373 // socket open, but should not be open
374 if (socket>=0 && sockDef==0)
375 destroy();
376
377 // Socket closed, but should be open
378 if (socket<0 && sockDef!=0)
379 create(addr); //generate address and socket
380
381 const bool retval = old!=socket;
382
383 // Socket closed
384 if (socket<0)
385 return retval;
386
387 // Socket open and connected: Nothing to do
388 if (connected)
389 return retval;
390
391 //try to connect if not yet done
392 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
393 if (rc == -1)
394 return retval;
395
396 connected = true;
397
398 if (sockDef<0)
399 {
400 bufTyp = READ_STRUCT::kStream; // full data to be skipped
401 bufLen = MAX_LEN; // huge for skipping
402 }
403 else
404 {
405 bufTyp = READ_STRUCT::kHeader; // expect a header
406 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
407 }
408
409 bufPos = B; // no byte read so far
410 skip = 0; // start empty
411 totBytes = 0;
412 relBytes = 0;
413
414 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
415
416#ifdef USE_EPOLL
417 epoll_event ev;
418 ev.events = EPOLLIN;
419 ev.data.ptr = this; // user data (union: ev.ptr)
420 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
421 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
422#endif
423
424 return retval;
425}
426
427bool READ_STRUCT::read()
428{
429 if (!connected)
430 return false;
431
432 if (bufLen==0)
433 return true;
434
435 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
436 // recv failed
437 if (jrd<0)
438 {
439 // There was just nothing waiting
440 if (errno==EWOULDBLOCK || errno==EAGAIN)
441 return false;
442
443 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
444 return false;
445 }
446
447 // connection was closed ...
448 if (jrd==0)
449 {
450 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
451
452 destroy();//DestroySocket(rd[i]); //generate address and socket
453 return false;
454 }
455
456 totBytes += jrd;
457
458 // are we skipping this board ...
459 if (bufTyp==kStream)
460 return false;
461
462 if (bufPos==B)
463 gettimeofday(&time, NULL);
464
465 bufPos += jrd; //==> prepare for continuation
466 bufLen -= jrd;
467
468 // not yet all read
469 return bufLen==0;
470}
471
472void READ_STRUCT::swapHeader()
473{
474 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
475 S[2] = ntohs(S[2]); // version_no
476 S[3] = ntohs(S[3]); // PLLLCK
477 S[4] = ntohs(S[4]); // trigger_crc
478 S[5] = ntohs(S[5]); // trigger_type
479
480 I[3] = ntohl(I[3]); // trigger_id
481 I[4] = ntohl(I[4]); // fad_evt_counter
482 I[5] = ntohl(I[5]); // REFCLK_frequency
483
484 S[12] = ntohs(S[12]); // board id
485 S[13] = ntohs(S[13]); // adc_clock_phase_shift
486 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
487 S[15] = ntohs(S[15]); // trigger_generator_prescaler
488
489 I[10] = ntohl(I[10]); // runnumber;
490 I[11] = ntohl(I[11]); // time;
491
492 // Use back inserter??
493 for (int s=24; s<24+NTemp+NDAC; s++)
494 S[s] = ntohs(S[s]); // drs_temperature / dac
495}
496
497void READ_STRUCT::swapData()
498{
499 // swapEventHeaderBytes: End of the header. to channels now
500
501 int i = 36;
502 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
503 {
504 S[i+0] = ntohs(S[i+0]);//id
505 S[i+1] = ntohs(S[i+1]);//start_cell
506 S[i+2] = ntohs(S[i+2]);//roi
507 S[i+3] = ntohs(S[i+3]);//filling
508
509 i += 4+S[i+2];//skip the pixel data
510 }
511}
512
513// ==========================================================================
514
515bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
516{
517 int xjr = -1;
518 int xkr = -1;
519
520 //points to the very first roi
521 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
522
523 roi[0] = ntohs(rd.S[roiPtr]);
524
525 for (int jr = 0; jr < 9; jr++)
526 {
527 roi[jr] = ntohs(rd.S[roiPtr]);
528
529 if (roi[jr]>1024)
530 {
531 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
532 return false;
533 }
534
535 // Check that the roi of pixels jr are compatible with the one of pixel 0
536 if (jr!=8 && roi[jr]!=roi[0])
537 {
538 xjr = jr;
539 break;
540 }
541
542 // Check that the roi of all other DRS chips on boards are compatible
543 for (int kr = 1; kr < 4; kr++)
544 {
545 const int kroi = ntohs(rd.S[roiPtr]);
546 if (kroi != roi[jr])
547 {
548 xjr = jr;
549 xkr = kr;
550 break;
551 }
552 roiPtr += kroi+4;
553 }
554 }
555
556 if (xjr>=0)
557 {
558 if (xkr<0)
559 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
560 else
561 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
562
563 return false;
564 }
565
566 if (roi[8] < roi[0])
567 {
568 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
569 return false;
570 }
571
572 return true;
573}
574
575list<shared_ptr<EVT_CTRL2>> evtCtrl;
576
577shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
578{
579 /*
580 checkroi consistence
581 find existing entry
582 if no entry, try to allocate memory
583 if entry and memory, init event structure
584 */
585
586 uint16_t nRoi[9];
587 if (!checkRoiConsistency(rd, nRoi))
588 return shared_ptr<EVT_CTRL2>();
589
590 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
591 {
592 // A reference is enough because the evtCtrl holds the shared_ptr anyway
593 const shared_ptr<EVT_CTRL2> &evt = *it;
594
595 // If the run is different, go on searching.
596 // We cannot stop searching if a lower run-id is found as in
597 // the case of the events, because theoretically, there
598 // can be the same run on two different days.
599 if (rd.H.runnumber != evt->runNum)
600 continue;
601
602 // If the ID of the new event if higher than the last one stored
603 // in that run, we have to assign a new slot (leave the loop)
604 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
605 break;
606
607 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
608 continue;
609
610 // We have found an entry with the same runID and evtID
611 // Check if ROI is consistent
612 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
613 {
614 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
615 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
616 return shared_ptr<EVT_CTRL2>();
617 }
618
619 // It is maybe not likely, but the header of this board might have
620 // arrived earlier. (We could also update the run-info, but
621 // this should not make a difference here)
622 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
623 rd.time.tv_sec<evt->time.tv_sec)
624 evt->time = rd.time;
625
626 //everything seems fine so far ==> use this slot ....
627 return evt;
628 }
629
630 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
631 {
632 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
633 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
634 return shared_ptr<EVT_CTRL2>();
635 }
636
637 EVT_CTRL2 *evt = new EVT_CTRL2;
638
639 evt->time = rd.time;
640
641 evt->runNum = rd.H.runnumber;
642 evt->evNum = rd.H.fad_evt_counter;
643
644 evt->trgNum = rd.H.trigger_id;
645 evt->trgTyp = rd.H.trigger_type;
646
647 evt->nRoi = nRoi[0];
648 evt->nRoiTM = nRoi[8];
649
650 //evt->firstBoard = rd.sockId;
651
652 const bool newrun = actrun->runId != rd.H.runnumber;
653 if (newrun)
654 {
655 // Since we have started a new run, we know already when to close the
656 // previous run in terms of number of events
657 actrun->maxEvt = actrun->lastEvt;
658
659 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
660 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
661
662 // The new run is the active run now
663 actrun = make_shared<RUN_CTRL2>();
664
665 const time_t &tsec = evt->time.tv_sec;
666
667 actrun->openTime = tsec;
668 actrun->closeTime = tsec + 3600 * 24; // max time allowed
669 actrun->runId = rd.H.runnumber;
670 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
671 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
672
673 // Signal the fadctrl that a new run has been started
674 // Note this is the only place at which we can ensure that
675 // gotnewRun is called only once
676 gotNewRun(*actrun);
677 }
678
679 // Keep pointer to run of this event
680 evt->runCtrl = actrun;
681
682 // Increase the number of events we have started to receive in this run
683 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
684 actrun->lastEvt++;
685
686 // An event can be the first and the last, but not the last and the first.
687 // Therefore gotNewRun is called before runFinished.
688 // runFinished signals that the last event of a run was just received. Processing
689 // might still be ongoing, but we can start a new run.
690 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
691 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
692 if (!cond1 || !cond2)
693 runFinished();
694
695 // We don't mind here that this is not common to all events,
696 // because every coming event will fullfil the condition as well.
697 if (!cond1)
698 evt->closeRequest |= kRequestMaxEvtsReached;
699 if (!cond2)
700 evt->closeRequest |= kRequestMaxTimeReached;
701
702 // Secure access to evtCtrl against access in CloseRunFile
703 // This should be the last... otherwise we can run into threading issues
704 // if the event is accessed before it is fully initialized.
705 evtCtrl.emplace_back(evt);
706 return evtCtrl.back();
707}
708
709
710void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
711{
712 const int i = rBuf.sockId;
713
714 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
715
716 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
717
718 // consistency of ROIs have been checked already (is it all correct?)
719 const uint16_t &roi = rBuf.S[src+2];
720
721 // different sort in FAD board.....
722 EVENT *event = evt->fEvent;
723 for (int px = 0; px < 9; px++)
724 {
725 for (int drs = 0; drs < 4; drs++)
726 {
727 const int16_t pixC = rBuf.S[src+1]; // start-cell
728 const int16_t pixR = rBuf.S[src+2]; // roi
729 //here we should check if pixH is correct ....
730
731 const int pixS = i*36 + drs*9 + px;
732
733 event->StartPix[pixS] = pixC;
734
735 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
736
737 src += 4+pixR;
738
739 // Treatment for ch 9 (TM channel)
740 if (px != 8)
741 continue;
742
743 const int tmS = i*4 + drs;
744
745 //and we have additional TM info
746 if (pixR > roi)
747 {
748 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
749
750 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
751 }
752 else
753 {
754 event->StartTM[tmS] = -1;
755 }
756 }
757 }
758}
759
760// ==========================================================================
761
762uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
763{
764 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
765 evt->runNum, evt->evNum, evtCtrl.size(), txt);
766
767 uint64_t report = 0;
768
769 char str[1000];
770
771 int ik=0;
772 for (int ib=0; ib<NBOARDS; ib++)
773 {
774 if (ib%10==0)
775 str[ik++] = '|';
776
777 const int jb = evt->board[ib];
778 if (jb>=0) // data received from that board
779 {
780 str[ik++] = '0'+(jb%10);
781 continue;
782 }
783
784 // FIXME: This is not synchronous... it reports
785 // accoridng to the current connection status, not w.r.t. to the
786 // one when the event was taken.
787 if (gi_NumConnect[ib]==0) // board not connected
788 {
789 str[ik++] = 'x';
790 continue;
791 }
792
793 // data from this board lost
794 str[ik++] = '.';
795 report |= ((uint64_t)1)<<ib;
796 }
797
798 str[ik++] = '|';
799 str[ik] = 0;
800
801 factOut(MessageImp::kWarn, str);
802
803 return report;
804}
805
806// ==========================================================================
807// ==========================================================================
808
809bool proc1(const shared_ptr<EVT_CTRL2> &);
810
811Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
812
813bool proc1(const shared_ptr<EVT_CTRL2> &evt)
814{
815 applyCalib(*evt, processingQueue1.size());
816 return true;
817}
818
819// If this is not convenient anymore, it could be replaced by
820// a command queue, to which command+data is posted,
821// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
822bool writeEvt(const shared_ptr<EVT_CTRL2> &evt)
823{
824 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
825 RUN_CTRL2 &run = *evt->runCtrl;
826
827 // Is this a valid event or just an empty event to trigger run close?
828 // If this is not an empty event open the new run-file
829 // Empty events are there to trigger run-closing conditions
830 if (evt->valid())
831 {
832 // File not yet open
833 if (run.fileStat==kFileNotYetOpen)
834 {
835 // runOpen will close a previous run, if still open
836 if (!runOpen(*evt))
837 {
838 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
839 run.fileStat = kFileClosed;
840 return true;
841 }
842
843 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
844 run.fileStat = kFileOpen;
845 }
846
847 // Here we have a valid calibration and can go on with that.
848 // It is important that _all_ events are sent for calibration (except broken ones)
849 processingQueue1.post(evt);
850 }
851
852 // File already closed
853 if (run.fileStat==kFileClosed)
854 return true;
855
856 // If we will have a software trigger which prevents single events from writing,
857 // the logic of writing the stop time and the trigger counters need to be adapted.
858 // Currently it is just the values of the last valid event.
859 bool rc1 = true;
860 if (evt->valid())
861 {
862 rc1 = runWrite(*evt);
863 if (!rc1)
864 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
865 }
866
867 // File not open... no need to close or to check for close
868 // ... this is the case if CloseRunFile was called before any file was opened.
869 if (run.fileStat!=kFileOpen)
870 return true;
871
872 // File is not yet to be closed.
873 if (rc1 && evt->closeRequest==kRequestNone)
874 return true;
875
876 runClose(*evt);
877 run.fileStat = kFileClosed;
878
879 vector<string> reason;
880 if (evt->closeRequest&kRequestManual)
881 reason.emplace_back("close requested");
882 if (evt->closeRequest&kRequestTimeout)
883 reason.emplace_back("receive timeout");
884 if (evt->closeRequest&kRequestConnectionChange)
885 reason.emplace_back("connection changed");
886 if (evt->closeRequest&kRequestEventCheckFailed)
887 reason.emplace_back("event check failed");
888 if (evt->closeRequest&kRequestMaxTimeReached)
889 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
890 if (evt->closeRequest&kRequestMaxEvtsReached)
891 reason.push_back(to_string(run.maxEvt)+" evts reached");
892 if (!rc1)
893 reason.emplace_back("runWrite failed");
894
895 const string str = boost::algorithm::join(reason, ", ");
896 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
897
898 return true;
899}
900
901Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
902
903bool procEvt(const shared_ptr<EVT_CTRL2> &evt)
904{
905 RUN_CTRL2 &run = *evt->runCtrl;
906
907 bool check = true;
908 if (evt->valid())
909 {
910 EVENT *event = evt->fEvent;
911
912 // This is already done in initMemory()
913 //event->Roi = evt->runCtrl->roi0;
914 //event->RoiTM = evt->runCtrl->roi8;
915 //event->EventNum = evt->evNum;
916 //event->TriggerNum = evt->trgNum;
917 //event->TriggerType = evt->trgTyp;
918
919 event->NumBoards = evt->nBoard;
920
921 event->PCTime = evt->time.tv_sec;
922 event->PCUsec = evt->time.tv_usec;
923
924 for (int ib=0; ib<NBOARDS; ib++)
925 event->BoardTime[ib] = evt->FADhead[ib].time;
926
927 check = eventCheck(*evt);
928
929 // If the event is valid, increase the trigger counter accordingly
930 if (check)
931 {
932 // Physics trigger
933 if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
934 run.triggerCounter[0]++;
935 // Pure pedestal trigger
936 else if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM))
937 run.triggerCounter[1]++;
938 // external light pulser trigger
939 else if (evt->trgTyp & FAD::EventHeader::kLPext)
940 run.triggerCounter[2]++;
941 // time calibration triggers
942 else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal))
943 run.triggerCounter[3]++;
944 // internal light pulser trigger
945 else if (evt->trgTyp & FAD::EventHeader::kLPint)
946 run.triggerCounter[4]++;
947 // external trigger input 1
948 else if (evt->trgTyp & FAD::EventHeader::kExt1)
949 run.triggerCounter[5]++;
950 // external trigger input 2
951 else if (evt->trgTyp & FAD::EventHeader::kExt2)
952 run.triggerCounter[6]++;
953 // other triggers
954 else
955 run.triggerCounter[7]++;
956 }
957 }
958
959 // If this is an invalid event, the current triggerCounter needs to be copied
960 // because runClose will use that one to update the TRIGGER_COUNTER.
961 // When closing the file, the trigger counter of the last successfully
962 // written event is used.
963 evt->triggerCounter = run.triggerCounter;
964
965 // If event check has failed, skip the event and post a close request instead.
966 // Otherwise, if file is open post the event for being written
967 if (!check)
968 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
969 else
970 secondaryQueue.post(evt);
971
972 return true;
973}
974
975// ==========================================================================
976// ==========================================================================
977
978/*
979 task 1-4:
980
981 lock1()-lock4();
982 while (1)
983 {
984 wait for signal [lockN]; // unlocked
985
986 while (n!=10)
987 wait sockets;
988 read;
989
990 lockM();
991 finished[n] = true;
992 signal(mainloop);
993 unlockM();
994 }
995
996
997 mainloop:
998
999 while (1)
1000 {
1001 lockM();
1002 while (!finished[0] || !finished[1] ...)
1003 wait for signal [lockM]; // unlocked... signals can be sent
1004 finished[0-1] = false;
1005 unlockM()
1006
1007 copy data to queue // locked
1008
1009 lockN[0-3];
1010 signalN[0-3];
1011 unlockN[0-3];
1012 }
1013
1014
1015 */
1016
1017/*
1018 while (g_reset)
1019 {
1020 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
1021
1022 // Check that all sockets are connected
1023
1024 for (int i=0; i<40; i++)
1025 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
1026 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1027
1028 while (g_reset)
1029 {
1030 if (READ_STRUCT::wait()<0)
1031 break;
1032
1033 if (rc_epoll==0)
1034 break;
1035
1036 for (int jj=0; jj<rc_epoll; jj++)
1037 {
1038 READ_STRUCT *rs = READ_STRUCT::get(jj);
1039 if (!rs->connected)
1040 continue;
1041
1042 const bool rc_read = rs->read();
1043 if (!rc_read)
1044 continue;
1045
1046 if (rs->bufTyp==READ_STRUCT::kHeader)
1047 {
1048 [...]
1049 }
1050
1051 [...]
1052
1053 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1054 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1055 }
1056
1057 if (once_a_second)
1058 {
1059 if (evt==timeout)
1060 break;
1061 }
1062 }
1063
1064 if (evt.nBoards==actBoards)
1065 primaryQueue.post(evt);
1066 }
1067*/
1068
1069Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1070
1071// This corresponds more or less to fFile... should we merge both?
1072shared_ptr<RUN_CTRL2> actrun;
1073
1074void CloseRunFile()
1075{
1076 // Currently we need actrun here, to be able to set kFileClosed.
1077 // Apart from that we have to ensure that there is an open file at all
1078 // which we can close.
1079 // Submission to the primary queue ensures that the event
1080 // is placed at the right place in the processing chain.
1081 // (Corresponds to the correct run)
1082 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1083}
1084
1085bool mainloop(READ_STRUCT *rd)
1086{
1087 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1088
1089 primaryQueue.start();
1090 secondaryQueue.start();
1091 processingQueue1.start();;
1092
1093 actrun = make_shared<RUN_CTRL2>();
1094
1095 //time in seconds
1096 time_t gi_SecTime = time(NULL)-1;
1097
1098 //loop until global variable g_runStat claims stop
1099 g_reset = 0;
1100 while (g_reset == 0)
1101 {
1102#ifdef USE_POLL
1103 int pp[40];
1104 int nn = 0;
1105 pollfd fds[40];
1106 for (int i=0; i<40; i++)
1107 {
1108 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1109 {
1110 fds[nn].fd = rd[i].socket;
1111 fds[nn].events = POLLIN;
1112 pp[nn] = i;
1113 nn++;
1114 }
1115 }
1116
1117 const int rc_epoll = poll(fds, nn, 100);
1118 if (rc_epoll<0)
1119 break;
1120#endif
1121
1122#ifdef USE_SELECT
1123 fd_set readfs;
1124 FD_ZERO(&readfs);
1125 int nfsd = 0;
1126 for (int i=0; i<NBOARDS; i++)
1127 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1128 {
1129 FD_SET(rd[i].socket, &readfs);
1130 if (rd[i].socket>nfsd)
1131 nfsd = rd[i].socket;
1132 }
1133
1134 timeval tv;
1135 tv.tv_sec = 0;
1136 tv.tv_usec = 100000;
1137 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1138 // 0: timeout
1139 // -1: error
1140 if (rc_select<0)
1141 {
1142 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1143 continue;
1144 }
1145#endif
1146
1147#ifdef USE_EPOLL
1148 const int rc_epoll = READ_STRUCT::wait();
1149 if (rc_epoll<0)
1150 break;
1151#endif
1152
1153#ifdef PRIORITY_QUEUE
1154 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1155
1156 for (int i=0; i<NBOARDS; i++)
1157 if (rd[i].connected)
1158 prio.push(rd+i);
1159
1160 if (!prio.empty()) do
1161#endif
1162
1163
1164#ifdef USE_POLL
1165 for (int jj=0; jj<nn; jj++)
1166#endif
1167#ifdef USE_EPOLL
1168 for (int jj=0; jj<rc_epoll; jj++)
1169#endif
1170#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1171 for (int jj=0; jj<NBOARDS; jj++)
1172#endif
1173 {
1174#ifdef PRIORITY_QUEUE
1175 READ_STRUCT *rs = prio.top();
1176#endif
1177#ifdef USE_SELECT
1178 if (!FD_ISSET(rs->socket, &readfs))
1179 continue;
1180#endif
1181
1182#ifdef USE_POLL
1183 if ((fds[jj].revents&POLLIN)==0)
1184 continue;
1185#endif
1186
1187#ifdef USE_EPOLL
1188 // FIXME: How to get i?
1189 READ_STRUCT *rs = READ_STRUCT::get(jj);
1190#endif
1191
1192#ifdef USE_POLL
1193 // FIXME: How to get i?
1194 READ_STRUCT *rs = &rd[pp[jj]];
1195#endif
1196
1197#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1198 const int i = (jj%4)*10 + (jj/4);
1199 READ_STRUCT *rs = &rd[i];
1200#endif
1201
1202#ifdef COMPLETE_EVENTS
1203 if (rs->bufTyp==READ_STRUCT::kWait)
1204 continue;
1205#endif
1206
1207 // ==================================================================
1208
1209 const bool rc_read = rs->read();
1210
1211 // Connect might have gotten closed during read
1212 gi_NumConnect[rs->sockId] = rs->connected;
1213 gj.numConn[rs->sockId] = rs->connected;
1214
1215 // Read either failed or disconnected, or the buffer is not yet full
1216 if (!rc_read)
1217 continue;
1218
1219 // ==================================================================
1220
1221 if (rs->bufTyp==READ_STRUCT::kHeader)
1222 {
1223 //check if startflag correct; else shift block ....
1224 // FIXME: This is not enough... this combination of
1225 // bytes can be anywhere... at least the end bytes
1226 // must be checked somewhere, too.
1227 uint k;
1228 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1229 {
1230 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1231 break;
1232 }
1233 rs->skip += k;
1234
1235 //no start of header found
1236 if (k==sizeof(PEVNT_HEADER)-1)
1237 {
1238 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1239 rs->bufPos = rs->B+1;
1240 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1241 continue;
1242 }
1243
1244 if (k > 0)
1245 {
1246 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1247
1248 rs->bufPos -= k;
1249 rs->bufLen += k;
1250
1251 continue; // We need to read more (bufLen>0)
1252 }
1253
1254 if (rs->skip>0)
1255 {
1256 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1257 rs->skip = 0;
1258 }
1259
1260 // Swap the header entries from network to host order
1261 rs->swapHeader();
1262
1263 rs->bufTyp = READ_STRUCT::kData;
1264 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1265
1266 debugHead(rs->B); // i and fadBoard not used
1267
1268 continue;
1269 }
1270
1271 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1272 if (end != 0xfe04)
1273 {
1274 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1275 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1276
1277 // ready to read next header
1278 rs->bufTyp = READ_STRUCT::kHeader;
1279 rs->bufLen = sizeof(PEVNT_HEADER);
1280 rs->bufPos = rs->B;
1281 // FIXME: What to do with the validity flag?
1282 continue;
1283 }
1284
1285 // get index into mBuffer for this event (create if needed)
1286 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1287
1288 // We have a valid entry, but no memory has yet been allocated
1289 if (evt && !evt->initMemory())
1290 {
1291 const time_t tm = time(NULL);
1292 if (evt->runCtrl->reportMem==tm)
1293 continue;
1294
1295 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1296 evt->runCtrl->reportMem = tm;
1297 continue;
1298 }
1299
1300 // ready to read next header
1301 rs->bufTyp = READ_STRUCT::kHeader;
1302 rs->bufLen = sizeof(PEVNT_HEADER);
1303 rs->bufPos = rs->B;
1304
1305 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1306 if (!evt)
1307 continue;
1308
1309 // This should never happen
1310 if (evt->board[rs->sockId] != -1)
1311 {
1312 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1313 evt->evNum, rs->sockId, jj, rs->len());
1314 // FIXME: What to do with the validity flag?
1315 continue; // Continue reading next header
1316 }
1317
1318 // Swap the data entries (board headers) from network to host order
1319 rs->swapData();
1320
1321 // Copy data from rd[i] to mBuffer[evID]
1322 copyData(*rs, evt.get());
1323
1324#ifdef COMPLETE_EVENTS
1325 // Do not read anmymore from this board until the whole event has been received
1326 rs->bufTyp = READ_STRUCT::kWait;
1327#endif
1328 // now we have stored a new board contents into Event structure
1329 evt->board[rs->sockId] = rs->sockId;
1330 evt->header = evt->FADhead+rs->sockId;
1331 evt->nBoard++;
1332
1333#ifdef COMPLETE_EPOLL
1334 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1335 {
1336 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1337 break;
1338 }
1339#endif
1340 // event not yet complete
1341 if (evt->nBoard < READ_STRUCT::activeSockets)
1342 continue;
1343
1344 // All previous events are now flagged as incomplete ("expired")
1345 // and will be removed. (This is a bit tricky, because pop_front()
1346 // would invalidate the current iterator if not done _after_ the increment)
1347 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1348 {
1349 const bool found = it->get()==evt.get();
1350 if (!found)
1351 reportIncomplete(*it, "expired");
1352 else
1353 primaryQueue.post(evt);
1354
1355 // package_len is 0 if nothing was received.
1356 for (int ib=0; ib<40; ib++)
1357 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1358
1359 // The counter must be increased _before_ the pop_front,
1360 // otherwise the counter is invalidated by the pop_front!
1361 it++;
1362 evtCtrl.pop_front();
1363
1364 // We reached the current event, so we are done
1365 if (found)
1366 break;
1367 }
1368
1369#ifdef COMPLETE_EPOLL
1370 for (int j=0; j<40; j++)
1371 {
1372 epoll_event ev;
1373 ev.events = EPOLLIN;
1374 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1375 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1376 {
1377 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1378 return;
1379 }
1380 }
1381#endif
1382
1383#ifdef COMPLETE_EVENTS
1384 for (int j=0; j<40; j++)
1385 {
1386 //if (rs->bufTyp==READ_STRUCT::kWait)
1387 {
1388 rs->bufTyp = READ_STRUCT::kHeader;
1389 rs->bufLen = sizeof(PEVNT_HEADER);
1390 rs->bufPos = rs->B;
1391 }
1392 }
1393#endif
1394 } // end for loop over all sockets
1395#ifdef PRIORITY_QUEUE
1396 while (0); // convert continue into break ;)
1397#endif
1398
1399 // ==================================================================
1400
1401 const time_t actTime = time(NULL);
1402 if (actTime == gi_SecTime)
1403 {
1404#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1405 if (evtCtrl.empty())
1406 usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1407#endif
1408 continue;
1409 }
1410 gi_SecTime = actTime;
1411
1412 // ==================================================================
1413 //loop over all active events and flag those older than read-timeout
1414 //delete those that are written to disk ....
1415
1416 // This could be improved having the pointer which separates the queue with
1417 // the incomplete events from the queue with the complete events
1418 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1419 {
1420 // A reference is enough because the shared_ptr is hold by the evtCtrl
1421 const shared_ptr<EVT_CTRL2> &evt = *it;
1422
1423 // The first event is the oldest. If the first event within the
1424 // timeout window was received, we can stop searching further.
1425 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1426 break;
1427
1428 // The counter must be increased _before_ the pop_front,
1429 // otherwise the counter is invalidated by the pop_front!
1430 it++;
1431
1432 // This timeout is caused because complete data from one or more
1433 // boards has been received, but the memory could not be allocated.
1434 // There is no reason why we should not go on waiting for
1435 // memory to become free. However, the FADs will disconnect
1436 // after 60s due to their keep-alive timeout, but the event builder
1437 // will still wait for memory to become available.
1438 // Currently, the only possibility to free the memory from the
1439 // evtCtrl to restart the event builder (STOP/START).
1440 if (!evt->valid())
1441 continue;
1442
1443 // This will result in the emission of a dim service.
1444 // It doesn't matter if that takes comparably long,
1445 // because we have to stop the run anyway.
1446 const uint64_t rep = reportIncomplete(evt, "timeout");
1447 factReportIncomplete(rep);
1448
1449 // At least the data from one boards is complete...
1450 // package_len is 0 when nothing was received from this board
1451 for (int ib=0; ib<40; ib++)
1452 rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1453
1454 evtCtrl.pop_front();
1455 }
1456
1457 // =================================================================
1458
1459 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1460 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1461 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1462 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1463 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1464 gj.usdMem = Memory::max_inuse;
1465 gj.totMem = Memory::allocated;
1466 gj.maxMem = g_maxMem;
1467
1468 gj.deltaT = 1000; // temporary, must be improved
1469
1470 bool changed = false;
1471
1472 static vector<uint64_t> store(NBOARDS);
1473
1474 for (int ib=0; ib<NBOARDS; ib++)
1475 {
1476 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1477 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1478
1479 store[ib] = rd[ib].totBytes;
1480
1481 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1482 changed = true;
1483
1484 gi_NumConnect[ib] = rd[ib].connected;
1485 gj.numConn[ib] = rd[ib].connected;
1486 }
1487
1488 factStat(gj);
1489
1490 Memory::max_inuse = 0;
1491
1492 // =================================================================
1493
1494 // This is a fake event to trigger possible run-closing conditions once a second
1495 // FIXME: This is not yet ideal because a file would never be closed
1496 // if a new file has been started and no events of the new file
1497 // have been received yet
1498 int request = kRequestNone;
1499
1500 // If nothing was received for more than 5min, close file
1501 if (actTime-actrun->lastTime>300)
1502 request |= kRequestTimeout;
1503
1504 // If connection status has changed
1505 if (changed)
1506 request |= kRequestConnectionChange;
1507
1508 if (request!=kRequestNone)
1509 runFinished();
1510
1511 if (actrun->fileStat==kFileOpen)
1512 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1513 }
1514
1515 // 1: Stop, wait for event to get processed
1516 // 2: Stop, finish immediately
1517 // 101: Restart, wait for events to get processed
1518 // 101: Restart, finish immediately
1519 //
1520 const int gi_reset = g_reset;
1521
1522 const bool abort = gi_reset%100==2;
1523
1524 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1525
1526 primaryQueue.wait(abort);
1527 secondaryQueue.wait(abort);
1528 processingQueue1.wait(abort);
1529
1530 // Here we also destroy all runCtrl structures and hence close all open files
1531 evtCtrl.clear();
1532 actrun.reset();
1533
1534 factPrintf(MessageImp::kInfo, "Exit read Process...");
1535 factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse);
1536
1537 factStat(gj);
1538
1539 return gi_reset>=100;
1540}
1541
1542// ==========================================================================
1543// ==========================================================================
1544
1545void StartEvtBuild()
1546{
1547 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1548
1549 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1550
1551 memset(&gj, 0, sizeof(GUI_STAT));
1552
1553 gj.usdMem = Memory::inuse;
1554 gj.totMem = Memory::allocated;
1555 gj.maxMem = g_maxMem;
1556
1557
1558 READ_STRUCT rd[NBOARDS];
1559
1560 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1561 for (int i=0; i<NBOARDS; i++)
1562 rd[i].sockId = i;
1563
1564 while (mainloop(rd));
1565
1566 //must close all open sockets ...
1567 factPrintf(MessageImp::kInfo, "Close all sockets...");
1568
1569 READ_STRUCT::close();
1570
1571 // Now all sockets get closed. This is not reflected in gi_NumConnect
1572 // The current workaround is to count all sockets as closed when the thread is not running
1573 factPrintf(MessageImp::kInfo, "EventBuilder++ closed");
1574}
Note: See TracBrowser for help on using the repository browser.