source: trunk/FACT++/src/EventBuilder.cc@ 19054

Last change on this file since 19054 was 19027, checked in by tbretz, 7 years ago
Be a bit more specific when closing a file to avoid confusion. Same for opening a file. In addition, there is no need to overwrite maxEvt when the run number changes. This was an early hack to close the file (which is now done automatically whenever a new file is opened) and produced wrong output ('file closed after 1005 events').
File size: 47.3 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10
11#include <boost/algorithm/string/join.hpp>
12
13#include "../externals/Queue.h"
14
15#include "MessageImp.h"
16#include "EventBuilder.h"
17#include "HeadersFAD.h"
18
19using namespace std;
20
#define MIN_LEN 32 // min #bytes needed to interpret FADheader
#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
// MAX_LEN is the size of the per-board receive buffer (READ_STRUCT::B); it
// leaves some headroom above the largest possible event of a single board.

// Compile-time switches (all disabled by default):
//  COMPLETE_EVENTS - adds the READ_STRUCT::kWait state; such sockets are skipped in mainloop()
//  USE_POLL        - wait for readable sockets with poll()
//  USE_EPOLL       - wait for readable sockets with epoll (see READ_STRUCT::wait)
//  USE_SELECT      - wait for readable sockets with select()
//  COMPLETE_EPOLL  - not referenced in this part of the file
//  PRIORITY_QUEUE  - visit the sockets in the order given by READ_STRUCTcomp
// Without USE_POLL/USE_EPOLL/PRIORITY_QUEUE every main-loop iteration visits all NBOARDS sockets.
//#define COMPLETE_EVENTS
//#define USE_POLL
//#define USE_EPOLL
//#define USE_SELECT
//#define COMPLETE_EPOLL
//#define PRIORITY_QUEUE
30
31// Reading only 1024: 13: 77Hz, 87%
32// Reading only 1024: 12: 78Hz, 46%
33// Reading only 300: 4: 250Hz, 92%
34// Reading only 300: 3: 258Hz, 40%
35
36// Reading only four threads 1024: 13: 77Hz, 60%
37// Reading only four threads 1024: 12: 78Hz, 46%
38// Reading only four threads 300: 4: 250Hz, 92%
39// Reading only four threads 300: 3: 258Hz, 40%
40
41// Default 300: 4: 249Hz, 92%
42// Default 300: 3: 261Hz, 40%
43// Default 1024: 13: 76Hz, 93%
44// Default 1024: 12: 79Hz, 46%
45
46// Poll [selected] 1024: 13: 63Hz, 45%
47// Poll [selected] 1024: 14: 63Hz, 63%
48// Poll [selected] 1024: 15: 64Hz, 80%
49// Poll [selected] 300: 4: 230Hz, 47%
50// Poll [selected] 300: 3: 200Hz, 94%
51
52// Poll [all] 1024: 13: 65Hz, 47%
53// Poll [all] 1024: 14: 64Hz, 59%
54// Poll [all] 1024: 15: 62Hz, 67%
55// Poll [all] 300: 4: 230Hz, 47%
56// Poll [all] 300: 3: 230Hz, 35%
57
58// ==========================================================================
59
// Callbacks into the surrounding application. They are only declared here;
// their definitions are not part of the visible part of this file.
bool runOpen(const EVT_CTRL2 &evt);    // open the output file for the run of evt (false on failure)
bool runWrite(const EVT_CTRL2 &evt);   // write evt into the open file (false on failure)
void runClose(const EVT_CTRL2 &run);   // close the file; called with the event that triggered the close
void applyCalib(const EVT_CTRL2 &evt, const size_t &size); // calibration step; size is processingQueue1.size()
void factOut(int severity, const char *message);           // emit one log message
void factReportIncomplete (uint64_t rep);                   // presumably reports the board mask of reportIncomplete() - not used in this chunk
void gotNewRun(RUN_CTRL2 &run);        // called once when a new run id is seen for the first time
void runFinished();                    // called when the active run reached its max #events or max time
void factStat(const GUI_STAT &gj);     // presumably publishes the GUI statistics - not used in this chunk
bool eventCheck(const EVT_CTRL2 &evt); // false rejects the event; the file is then closed (kRequestEventCheckFailed)
void debugHead(void *buf);             // not used in this chunk
71
72// ==========================================================================
73
// Reset to 0 when mainloop() starts; setting it to a non-zero value ends the loop
int g_reset;

size_t g_maxMem; //maximum memory allowed for buffer (upper limit for Memory::allocated)

uint16_t g_evtTimeout; // timeout (sec) for one event

FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"

// Per-board connection setting; 0 means "board not connected" (see reportIncomplete)
uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards

// Statistics for the GUI (cf. factStat)
GUI_STAT gj;
85
86// ==========================================================================
87
88namespace Memory
89{
90 uint64_t inuse = 0;
91 uint64_t allocated = 0;
92
93 uint64_t max_inuse = 0;
94
95 std::mutex mtx;
96
97 std::forward_list<void*> memory;
98
99 void *malloc()
100 {
101 // No free slot available, next alloc would exceed max memory
102 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
103 return NULL;
104
105 // We will return this amount of memory
106 // This is not 100% thread safe, but it is not a super accurate measure anyway
107 inuse += MAX_TOT_MEM;
108 if (inuse>max_inuse)
109 max_inuse = inuse;
110
111 if (memory.empty())
112 {
113 // No free slot available, allocate a new one
114 allocated += MAX_TOT_MEM;
115 return new char[MAX_TOT_MEM];
116 }
117
118 // Get the next free slot from the stack and return it
119 const std::lock_guard<std::mutex> lock(mtx);
120
121 void *mem = memory.front();
122 memory.pop_front();
123 return mem;
124 };
125
126 void free(void *mem)
127 {
128 if (!mem)
129 return;
130
131 // Decrease the amont of memory in use accordingly
132 inuse -= MAX_TOT_MEM;
133
134 // If the maximum memory has changed, we might be over the limit.
135 // In this case: free a slot
136 if (allocated>g_maxMem)
137 {
138 delete [] (char*)mem;
139 allocated -= MAX_TOT_MEM;
140 return;
141 }
142
143 const std::lock_guard<std::mutex> lock(mtx);
144 memory.push_front(mem);
145 }
146
147};
148
149// ==========================================================================
150
151__attribute__((__format__ (__printf__, 2, 0)))
152void factPrintf(int severity, const char *fmt, ...)
153{
154 char str[1000];
155
156 va_list ap;
157 va_start(ap, fmt);
158 vsnprintf(str, 1000, fmt, ap);
159 va_end(ap);
160
161 factOut(severity, str);
162}
163
164// ==========================================================================
165
166struct READ_STRUCT
167{
168 enum buftyp_t
169 {
170 kStream,
171 kHeader,
172 kData,
173#ifdef COMPLETE_EVENTS
174 kWait
175#endif
176 };
177
178 // ---------- connection ----------
179
180 static uint activeSockets;
181
182 int sockId; // socket id (board number)
183 int socket; // socket handle
184 bool connected; // is this socket connected?
185
186 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
187
188 // ------------ epoll -------------
189
190 static int fd_epoll;
191 static epoll_event events[NBOARDS];
192
193 static void init();
194 static void close();
195 static int wait();
196 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
197
198 // ------------ buffer ------------
199
200 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
201
202 uint32_t bufLen; // number of bytes left to read
203 uint8_t *bufPos; // next byte to read to the buffer next
204
205 union
206 {
207 uint8_t B[MAX_LEN];
208 uint16_t S[MAX_LEN / 2];
209 uint32_t I[MAX_LEN / 4];
210 uint64_t L[MAX_LEN / 8];
211 PEVNT_HEADER H;
212 };
213
214 timeval time;
215 uint64_t totBytes; // total received bytes
216 uint64_t relBytes; // total released bytes
217 uint32_t skip; // number of bytes skipped before start of event
218
219 uint32_t len() const { return uint32_t(H.package_length)*2; }
220
221 void swapHeader();
222 void swapData();
223
224 // --------------------------------
225
226 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
227 {
228 if (fd_epoll<0)
229 init();
230 }
231 ~READ_STRUCT()
232 {
233 destroy();
234 }
235
236 void destroy();
237 bool create(sockaddr_in addr);
238 bool check(int, sockaddr_in addr);
239 bool read();
240
241};
242
243#ifdef PRIORITY_QUEUE
244struct READ_STRUCTcomp
245{
246 bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
247 {
248 const int64_t rel1 = r1->totBytes - r1->relBytes;
249 const int64_t rel2 = r2->totBytes - r2->relBytes;
250 return rel1 > rel2;
251 }
252};
253#endif
254
255int READ_STRUCT::wait()
256{
257 // wait for something to do...
258 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
259 if (rc>=0)
260 return rc;
261
262 if (errno==EINTR) // timout or signal interruption
263 return 0;
264
265 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
266 return -1;
267}
268
269uint READ_STRUCT::activeSockets = 0;
270int READ_STRUCT::fd_epoll = -1;
271epoll_event READ_STRUCT::events[NBOARDS];
272
273void READ_STRUCT::init()
274{
275 if (fd_epoll>=0)
276 return;
277
278#ifdef USE_EPOLL
279 fd_epoll = epoll_create(NBOARDS);
280 if (fd_epoll<0)
281 {
282 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
283 return;
284 }
285#endif
286}
287
288void READ_STRUCT::close()
289{
290#ifdef USE_EPOLL
291 if (fd_epoll>=0 && ::close(fd_epoll)>0)
292 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
293#endif
294
295 fd_epoll = -1;
296}
297
298bool READ_STRUCT::create(sockaddr_in sockAddr)
299{
300 if (socket>=0)
301 return false;
302
303 const int port = ntohs(sockAddr.sin_port) + 1;
304
305 SockAddr.sin_family = sockAddr.sin_family;
306 SockAddr.sin_addr = sockAddr.sin_addr;
307 SockAddr.sin_port = htons(port);
308
309 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
310 {
311 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
312 socket = -1;
313 return false;
314 }
315
316 int optval = 1;
317 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
318 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
319
320 optval = 1;
321 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
322 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
323
324 optval = 10; //start after 10 seconds
325 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
326 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
327
328 optval = 10; //do every 10 seconds
329 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
330 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
331
332 optval = 2; //close after 2 unsuccessful tries
333 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
334 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
335
336 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
337
338 //connected = false;
339 activeSockets++;
340
341 return true;
342}
343
344void READ_STRUCT::destroy()
345{
346 if (socket<0)
347 return;
348
349#ifdef USE_EPOLL
350 // strictly speaking this should not be necessary
351 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
352 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
353#endif
354
355 if (::close(socket) > 0)
356 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
357 else
358 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
359
360 // Set the socket to "not connected"
361 socket = -1;
362 connected = false;
363 activeSockets--;
364 bufLen = 0;
365}
366
367bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
368{
369 // Continue in the most most likely case (performance)
370 //if (socket>=0 && sockDef!=0 && connected)
371 // return;
372 const int old = socket;
373
374 // socket open, but should not be open
375 if (socket>=0 && sockDef==0)
376 destroy();
377
378 // Socket closed, but should be open
379 if (socket<0 && sockDef!=0)
380 create(addr); //generate address and socket
381
382 const bool retval = old!=socket;
383
384 // Socket closed
385 if (socket<0)
386 return retval;
387
388 // Socket open and connected: Nothing to do
389 if (connected)
390 return retval;
391
392 //try to connect if not yet done
393 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
394 if (rc == -1)
395 return retval;
396
397 connected = true;
398
399 if (sockDef<0)
400 {
401 bufTyp = READ_STRUCT::kStream; // full data to be skipped
402 bufLen = MAX_LEN; // huge for skipping
403 }
404 else
405 {
406 bufTyp = READ_STRUCT::kHeader; // expect a header
407 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
408 }
409
410 bufPos = B; // no byte read so far
411 skip = 0; // start empty
412 totBytes = 0;
413 relBytes = 0;
414
415 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
416
417#ifdef USE_EPOLL
418 epoll_event ev;
419 ev.events = EPOLLIN;
420 ev.data.ptr = this; // user data (union: ev.ptr)
421 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
422 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
423#endif
424
425 return retval;
426}
427
428bool READ_STRUCT::read()
429{
430 if (!connected)
431 return false;
432
433 if (bufLen==0)
434 return true;
435
436 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
437 // recv failed
438 if (jrd<0)
439 {
440 // There was just nothing waiting
441 if (errno==EWOULDBLOCK || errno==EAGAIN)
442 return false;
443
444 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
445 return false;
446 }
447
448 // connection was closed ...
449 if (jrd==0)
450 {
451 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
452
453 destroy();//DestroySocket(rd[i]); //generate address and socket
454 return false;
455 }
456
457 totBytes += jrd;
458
459 // are we skipping this board ...
460 if (bufTyp==kStream)
461 return false;
462
463 if (bufPos==B)
464 gettimeofday(&time, NULL);
465
466 bufPos += jrd; //==> prepare for continuation
467 bufLen -= jrd;
468
469 // not yet all read
470 return bufLen==0;
471}
472
473void READ_STRUCT::swapHeader()
474{
475 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
476 S[2] = ntohs(S[2]); // version_no
477 S[3] = ntohs(S[3]); // PLLLCK
478 S[4] = ntohs(S[4]); // trigger_crc
479 S[5] = ntohs(S[5]); // trigger_type
480
481 I[3] = ntohl(I[3]); // trigger_id
482 I[4] = ntohl(I[4]); // fad_evt_counter
483 I[5] = ntohl(I[5]); // REFCLK_frequency
484
485 S[12] = ntohs(S[12]); // board id
486 S[13] = ntohs(S[13]); // adc_clock_phase_shift
487 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
488 S[15] = ntohs(S[15]); // trigger_generator_prescaler
489
490 I[10] = ntohl(I[10]); // runnumber;
491 I[11] = ntohl(I[11]); // time;
492
493 // Use back inserter??
494 for (int s=24; s<24+NTemp+NDAC; s++)
495 S[s] = ntohs(S[s]); // drs_temperature / dac
496}
497
498void READ_STRUCT::swapData()
499{
500 // swapEventHeaderBytes: End of the header. to channels now
501
502 int i = 36;
503 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
504 {
505 S[i+0] = ntohs(S[i+0]);//id
506 S[i+1] = ntohs(S[i+1]);//start_cell
507 S[i+2] = ntohs(S[i+2]);//roi
508 S[i+3] = ntohs(S[i+3]);//filling
509
510 i += 4+S[i+2];//skip the pixel data
511 }
512}
513
514// ==========================================================================
515
516bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
517{
518 int xjr = -1;
519 int xkr = -1;
520
521 //points to the very first roi
522 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
523
524 roi[0] = ntohs(rd.S[roiPtr]);
525
526 for (int jr = 0; jr < 9; jr++)
527 {
528 roi[jr] = ntohs(rd.S[roiPtr]);
529
530 if (roi[jr]>1024)
531 {
532 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
533 return false;
534 }
535
536 // Check that the roi of pixels jr are compatible with the one of pixel 0
537 if (jr!=8 && roi[jr]!=roi[0])
538 {
539 xjr = jr;
540 break;
541 }
542
543 // Check that the roi of all other DRS chips on boards are compatible
544 for (int kr = 1; kr < 4; kr++)
545 {
546 const int kroi = ntohs(rd.S[roiPtr]);
547 if (kroi != roi[jr])
548 {
549 xjr = jr;
550 xkr = kr;
551 break;
552 }
553 roiPtr += kroi+4;
554 }
555 }
556
557 if (xjr>=0)
558 {
559 if (xkr<0)
560 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
561 else
562 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
563
564 return false;
565 }
566
567 if (roi[8] < roi[0])
568 {
569 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
570 return false;
571 }
572
573 return true;
574}
575
576list<shared_ptr<EVT_CTRL2>> evtCtrl;
577
578shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
579{
580 /*
581 checkroi consistence
582 find existing entry
583 if no entry, try to allocate memory
584 if entry and memory, init event structure
585 */
586
587 uint16_t nRoi[9];
588 if (!checkRoiConsistency(rd, nRoi))
589 return shared_ptr<EVT_CTRL2>();
590
591 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
592 {
593 // A reference is enough because the evtCtrl holds the shared_ptr anyway
594 const shared_ptr<EVT_CTRL2> &evt = *it;
595
596 // If the run is different, go on searching.
597 // We cannot stop searching if a lower run-id is found as in
598 // the case of the events, because theoretically, there
599 // can be the same run on two different days.
600 if (rd.H.runnumber != evt->runNum)
601 continue;
602
603 // If the ID of the new event if higher than the last one stored
604 // in that run, we have to assign a new slot (leave the loop)
605 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
606 break;
607
608 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
609 continue;
610
611 // We have found an entry with the same runID and evtID
612 // Check if ROI is consistent
613 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
614 {
615 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
616 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
617 return shared_ptr<EVT_CTRL2>();
618 }
619
620 // It is maybe not likely, but the header of this board might have
621 // arrived earlier. (We could also update the run-info, but
622 // this should not make a difference here)
623 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
624 rd.time.tv_sec<evt->time.tv_sec)
625 evt->time = rd.time;
626
627 //everything seems fine so far ==> use this slot ....
628 return evt;
629 }
630
631 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
632 {
633 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
634 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
635 return shared_ptr<EVT_CTRL2>();
636 }
637
638 EVT_CTRL2 *evt = new EVT_CTRL2;
639
640 evt->time = rd.time;
641
642 evt->runNum = rd.H.runnumber;
643 evt->evNum = rd.H.fad_evt_counter;
644
645 evt->trgNum = rd.H.trigger_id;
646 evt->trgTyp = rd.H.trigger_type;
647
648 evt->nRoi = nRoi[0];
649 evt->nRoiTM = nRoi[8];
650
651 //evt->firstBoard = rd.sockId;
652
653 const bool newrun = actrun->runId != rd.H.runnumber;
654 if (newrun)
655 {
656 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
657 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
658
659 // The new run is the active run now
660 actrun = make_shared<RUN_CTRL2>();
661
662 const time_t &tsec = evt->time.tv_sec;
663
664 actrun->openTime = tsec;
665 actrun->closeTime = tsec + 3600 * 24; // max time allowed
666 actrun->runId = rd.H.runnumber;
667 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
668 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
669
670 // Signal the fadctrl that a new run has been started
671 // Note this is the only place at which we can ensure that
672 // gotnewRun is called only once
673 gotNewRun(*actrun);
674 }
675
676 // Keep pointer to run of this event
677 evt->runCtrl = actrun;
678
679 // Increase the number of events we have started to receive in this run
680 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
681 actrun->lastEvt++;
682
683 // An event can be the first and the last, but not the last and the first.
684 // Therefore gotNewRun is called before runFinished.
685 // runFinished signals that the last event of a run was just received. Processing
686 // might still be ongoing, but we can start a new run.
687 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
688 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
689 if (!cond1 || !cond2)
690 runFinished();
691
692 // We don't mind here that this is not common to all events,
693 // because every coming event will fullfil the condition as well.
694 if (!cond1)
695 evt->closeRequest |= kRequestMaxEvtsReached;
696 if (!cond2)
697 evt->closeRequest |= kRequestMaxTimeReached;
698
699 // Secure access to evtCtrl against access in CloseRunFile
700 // This should be the last... otherwise we can run into threading issues
701 // if the event is accessed before it is fully initialized.
702 evtCtrl.emplace_back(evt);
703 return evtCtrl.back();
704}
705
706
707void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
708{
709 const int i = rBuf.sockId;
710
711 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
712
713 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
714
715 // consistency of ROIs have been checked already (is it all correct?)
716 const uint16_t &roi = rBuf.S[src+2];
717
718 // different sort in FAD board.....
719 EVENT *event = evt->fEvent;
720 for (int px = 0; px < 9; px++)
721 {
722 for (int drs = 0; drs < 4; drs++)
723 {
724 const int16_t pixC = rBuf.S[src+1]; // start-cell
725 const int16_t pixR = rBuf.S[src+2]; // roi
726 //here we should check if pixH is correct ....
727
728 const int pixS = i*36 + drs*9 + px;
729
730 event->StartPix[pixS] = pixC;
731
732 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
733
734 src += 4+pixR;
735
736 // Treatment for ch 9 (TM channel)
737 if (px != 8)
738 continue;
739
740 const int tmS = i*4 + drs;
741
742 //and we have additional TM info
743 if (pixR > roi)
744 {
745 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
746
747 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
748 }
749 else
750 {
751 event->StartTM[tmS] = -1;
752 }
753 }
754 }
755}
756
757// ==========================================================================
758
759uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
760{
761 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
762 evt->runNum, evt->evNum, evtCtrl.size(), txt);
763
764 uint64_t report = 0;
765
766 char str[1000];
767
768 int ik=0;
769 for (int ib=0; ib<NBOARDS; ib++)
770 {
771 if (ib%10==0)
772 str[ik++] = '|';
773
774 const int jb = evt->board[ib];
775 if (jb>=0) // data received from that board
776 {
777 str[ik++] = '0'+(jb%10);
778 continue;
779 }
780
781 // FIXME: This is not synchronous... it reports
782 // accoridng to the current connection status, not w.r.t. to the
783 // one when the event was taken.
784 if (gi_NumConnect[ib]==0) // board not connected
785 {
786 str[ik++] = 'x';
787 continue;
788 }
789
790 // data from this board lost
791 str[ik++] = '.';
792 report |= ((uint64_t)1)<<ib;
793 }
794
795 str[ik++] = '|';
796 str[ik] = 0;
797
798 factOut(MessageImp::kWarn, str);
799
800 return report;
801}
802
803// ==========================================================================
804// ==========================================================================
805
806bool proc1(const shared_ptr<EVT_CTRL2> &);
807
808Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
809
810bool proc1(const shared_ptr<EVT_CTRL2> &evt)
811{
812 applyCalib(*evt, processingQueue1.size());
813 return true;
814}
815
816// If this is not convenient anymore, it could be replaced by
817// a command queue, to which command+data is posted,
818// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
819bool writeEvt(const shared_ptr<EVT_CTRL2> &evt)
820{
821 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
822 RUN_CTRL2 &run = *evt->runCtrl;
823
824 // Is this a valid event or just an empty event to trigger run close?
825 // If this is not an empty event open the new run-file
826 // Empty events are there to trigger run-closing conditions
827 if (evt->valid())
828 {
829 // File not yet open
830 if (run.fileStat==kFileNotYetOpen)
831 {
832 // runOpen will close a previous run, if still open
833 if (!runOpen(*evt))
834 {
835 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
836 run.fileStat = kFileClosed;
837 return true;
838 }
839
840 factPrintf(MessageImp::kInfo, "Opened new file for data from run %d (evt=%d)", evt->runNum, evt->evNum);
841 run.fileStat = kFileOpen;
842 }
843
844 // Here we have a valid calibration and can go on with that.
845 // It is important that _all_ events are sent for calibration (except broken ones)
846 processingQueue1.post(evt);
847 }
848
849 // File already closed
850 if (run.fileStat==kFileClosed)
851 return true;
852
853 // If we will have a software trigger which prevents single events from writing,
854 // the logic of writing the stop time and the trigger counters need to be adapted.
855 // Currently it is just the values of the last valid event.
856 bool rc1 = true;
857 if (evt->valid())
858 {
859 rc1 = runWrite(*evt);
860 if (!rc1)
861 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
862 }
863
864 // File not open... no need to close or to check for close
865 // ... this is the case if CloseRunFile was called before any file was opened.
866 if (run.fileStat!=kFileOpen)
867 return true;
868
869 // File is not yet to be closed.
870 if (rc1 && evt->closeRequest==kRequestNone)
871 return true;
872
873 runClose(*evt);
874 run.fileStat = kFileClosed;
875
876 vector<string> reason;
877 if (evt->closeRequest&kRequestManual)
878 reason.emplace_back("close was requested");
879 if (evt->closeRequest&kRequestTimeout)
880 reason.emplace_back("receive timed out");
881 if (evt->closeRequest&kRequestConnectionChange)
882 reason.emplace_back("connection changed");
883 if (evt->closeRequest&kRequestEventCheckFailed)
884 reason.emplace_back("event check failed");
885 if (evt->closeRequest&kRequestMaxTimeReached)
886 reason.push_back(to_string(run.closeTime-run.openTime)+"s had been reached");
887 if (evt->closeRequest&kRequestMaxEvtsReached)
888 reason.push_back(to_string(run.maxEvt)+" evts had been reached");
889 if (!rc1)
890 reason.emplace_back("runWrite failed");
891
892 const string str = boost::algorithm::join(reason, ", ");
893 factPrintf(MessageImp::kInfo, "File %d was closed because %s", run.runId, str.c_str());
894
895 return true;
896}
897
898Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
899
900bool procEvt(const shared_ptr<EVT_CTRL2> &evt)
901{
902 RUN_CTRL2 &run = *evt->runCtrl;
903
904 bool check = true;
905 if (evt->valid())
906 {
907 EVENT *event = evt->fEvent;
908
909 // This is already done in initMemory()
910 //event->Roi = evt->runCtrl->roi0;
911 //event->RoiTM = evt->runCtrl->roi8;
912 //event->EventNum = evt->evNum;
913 //event->TriggerNum = evt->trgNum;
914 //event->TriggerType = evt->trgTyp;
915
916 event->NumBoards = evt->nBoard;
917
918 event->PCTime = evt->time.tv_sec;
919 event->PCUsec = evt->time.tv_usec;
920
921 for (int ib=0; ib<NBOARDS; ib++)
922 event->BoardTime[ib] = evt->FADhead[ib].time;
923
924 check = eventCheck(*evt);
925
926 // If the event is valid, increase the trigger counter accordingly
927 if (check)
928 {
929 // Physics trigger
930 if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
931 run.triggerCounter[0]++;
932 // Pure pedestal trigger
933 else if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM))
934 run.triggerCounter[1]++;
935 // external light pulser trigger
936 else if (evt->trgTyp & FAD::EventHeader::kLPext)
937 run.triggerCounter[2]++;
938 // time calibration triggers
939 else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal))
940 run.triggerCounter[3]++;
941 // internal light pulser trigger
942 else if (evt->trgTyp & FAD::EventHeader::kLPint)
943 run.triggerCounter[4]++;
944 // external trigger input 1
945 else if (evt->trgTyp & FAD::EventHeader::kExt1)
946 run.triggerCounter[5]++;
947 // external trigger input 2
948 else if (evt->trgTyp & FAD::EventHeader::kExt2)
949 run.triggerCounter[6]++;
950 // other triggers
951 else
952 run.triggerCounter[7]++;
953 }
954 }
955
956 // If this is an invalid event, the current triggerCounter needs to be copied
957 // because runClose will use that one to update the TRIGGER_COUNTER.
958 // When closing the file, the trigger counter of the last successfully
959 // written event is used.
960 evt->triggerCounter = run.triggerCounter;
961
962 // If event check has failed, skip the event and post a close request instead.
963 // Otherwise, if file is open post the event for being written
964 if (!check)
965 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
966 else
967 secondaryQueue.post(evt);
968
969 return true;
970}
971
972// ==========================================================================
973// ==========================================================================
974
975/*
976 task 1-4:
977
978 lock1()-lock4();
979 while (1)
980 {
981 wait for signal [lockN]; // unlocked
982
983 while (n!=10)
984 wait sockets;
985 read;
986
987 lockM();
988 finished[n] = true;
989 signal(mainloop);
990 unlockM();
991 }
992
993
994 mainloop:
995
996 while (1)
997 {
998 lockM();
999 while (!finished[0] || !finished[1] ...)
1000 wait for signal [lockM]; // unlocked... signals can be sent
1001 finished[0-1] = false;
1002 unlockM()
1003
1004 copy data to queue // locked
1005
1006 lockN[0-3];
1007 signalN[0-3];
1008 unlockN[0-3];
1009 }
1010
1011
1012 */
1013
1014/*
1015 while (g_reset)
1016 {
1017 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
1018
1019 // Check that all sockets are connected
1020
1021 for (int i=0; i<40; i++)
1022 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
1023 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1024
1025 while (g_reset)
1026 {
1027 if (READ_STRUCT::wait()<0)
1028 break;
1029
1030 if (rc_epoll==0)
1031 break;
1032
1033 for (int jj=0; jj<rc_epoll; jj++)
1034 {
1035 READ_STRUCT *rs = READ_STRUCT::get(jj);
1036 if (!rs->connected)
1037 continue;
1038
1039 const bool rc_read = rs->read();
1040 if (!rc_read)
1041 continue;
1042
1043 if (rs->bufTyp==READ_STRUCT::kHeader)
1044 {
1045 [...]
1046 }
1047
1048 [...]
1049
1050 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1051 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1052 }
1053
1054 if (once_a_second)
1055 {
1056 if (evt==timeout)
1057 break;
1058 }
1059 }
1060
1061 if (evt.nBoards==actBoards)
1062 primaryQueue.post(evt);
1063 }
1064*/
1065
1066Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1067
1068// This corresponds more or less to fFile... should we merge both?
1069shared_ptr<RUN_CTRL2> actrun;
1070
1071void CloseRunFile()
1072{
1073 // Currently we need actrun here, to be able to set kFileClosed.
1074 // Apart from that we have to ensure that there is an open file at all
1075 // which we can close.
1076 // Submission to the primary queue ensures that the event
1077 // is placed at the right place in the processing chain.
1078 // (Corresponds to the correct run)
1079 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1080}
1081
1082bool mainloop(READ_STRUCT *rd)
1083{
1084 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1085
1086 primaryQueue.start();
1087 secondaryQueue.start();
1088 processingQueue1.start();;
1089
1090 actrun = make_shared<RUN_CTRL2>();
1091
1092 //time in seconds
1093 time_t gi_SecTime = time(NULL)-1;
1094
1095 //loop until global variable g_runStat claims stop
1096 g_reset = 0;
1097 while (g_reset == 0)
1098 {
1099#ifdef USE_POLL
1100 int pp[40];
1101 int nn = 0;
1102 pollfd fds[40];
1103 for (int i=0; i<40; i++)
1104 {
1105 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1106 {
1107 fds[nn].fd = rd[i].socket;
1108 fds[nn].events = POLLIN;
1109 pp[nn] = i;
1110 nn++;
1111 }
1112 }
1113
1114 const int rc_epoll = poll(fds, nn, 100);
1115 if (rc_epoll<0)
1116 break;
1117#endif
1118
1119#ifdef USE_SELECT
1120 fd_set readfs;
1121 FD_ZERO(&readfs);
1122 int nfsd = 0;
1123 for (int i=0; i<NBOARDS; i++)
1124 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1125 {
1126 FD_SET(rd[i].socket, &readfs);
1127 if (rd[i].socket>nfsd)
1128 nfsd = rd[i].socket;
1129 }
1130
1131 timeval tv;
1132 tv.tv_sec = 0;
1133 tv.tv_usec = 100000;
1134 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1135 // 0: timeout
1136 // -1: error
1137 if (rc_select<0)
1138 {
1139 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1140 continue;
1141 }
1142#endif
1143
1144#ifdef USE_EPOLL
1145 const int rc_epoll = READ_STRUCT::wait();
1146 if (rc_epoll<0)
1147 break;
1148#endif
1149
1150#ifdef PRIORITY_QUEUE
1151 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1152
1153 for (int i=0; i<NBOARDS; i++)
1154 if (rd[i].connected)
1155 prio.push(rd+i);
1156
1157 if (!prio.empty()) do
1158#endif
1159
1160
1161#ifdef USE_POLL
1162 for (int jj=0; jj<nn; jj++)
1163#endif
1164#ifdef USE_EPOLL
1165 for (int jj=0; jj<rc_epoll; jj++)
1166#endif
1167#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1168 for (int jj=0; jj<NBOARDS; jj++)
1169#endif
1170 {
1171#ifdef PRIORITY_QUEUE
1172 READ_STRUCT *rs = prio.top();
1173#endif
1174#ifdef USE_SELECT
1175 if (!FD_ISSET(rs->socket, &readfs))
1176 continue;
1177#endif
1178
1179#ifdef USE_POLL
1180 if ((fds[jj].revents&POLLIN)==0)
1181 continue;
1182#endif
1183
1184#ifdef USE_EPOLL
1185 // FIXME: How to get i?
1186 READ_STRUCT *rs = READ_STRUCT::get(jj);
1187#endif
1188
1189#ifdef USE_POLL
1190 // FIXME: How to get i?
1191 READ_STRUCT *rs = &rd[pp[jj]];
1192#endif
1193
1194#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1195 const int i = (jj%4)*10 + (jj/4);
1196 READ_STRUCT *rs = &rd[i];
1197#endif
1198
1199#ifdef COMPLETE_EVENTS
1200 if (rs->bufTyp==READ_STRUCT::kWait)
1201 continue;
1202#endif
1203
1204 // ==================================================================
1205
1206 const bool rc_read = rs->read();
1207
1208 // Connect might have gotten closed during read
1209 gi_NumConnect[rs->sockId] = rs->connected;
1210 gj.numConn[rs->sockId] = rs->connected;
1211
1212 // Read either failed or disconnected, or the buffer is not yet full
1213 if (!rc_read)
1214 continue;
1215
1216 // ==================================================================
1217
1218 if (rs->bufTyp==READ_STRUCT::kHeader)
1219 {
1220 //check if startflag correct; else shift block ....
1221 // FIXME: This is not enough... this combination of
1222 // bytes can be anywhere... at least the end bytes
1223 // must be checked somewhere, too.
1224 uint k;
1225 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1226 {
1227 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1228 break;
1229 }
1230 rs->skip += k;
1231
1232 //no start of header found
1233 if (k==sizeof(PEVNT_HEADER)-1)
1234 {
1235 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1236 rs->bufPos = rs->B+1;
1237 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1238 continue;
1239 }
1240
1241 if (k > 0)
1242 {
1243 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1244
1245 rs->bufPos -= k;
1246 rs->bufLen += k;
1247
1248 continue; // We need to read more (bufLen>0)
1249 }
1250
1251 if (rs->skip>0)
1252 {
1253 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1254 rs->skip = 0;
1255 }
1256
1257 // Swap the header entries from network to host order
1258 rs->swapHeader();
1259
1260 rs->bufTyp = READ_STRUCT::kData;
1261 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1262
1263 debugHead(rs->B); // i and fadBoard not used
1264
1265 continue;
1266 }
1267
1268 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1269 if (end != 0xfe04)
1270 {
1271 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1272 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1273
1274 // ready to read next header
1275 rs->bufTyp = READ_STRUCT::kHeader;
1276 rs->bufLen = sizeof(PEVNT_HEADER);
1277 rs->bufPos = rs->B;
1278 // FIXME: What to do with the validity flag?
1279 continue;
1280 }
1281
1282 // get index into mBuffer for this event (create if needed)
1283 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1284
1285 // We have a valid entry, but no memory has yet been allocated
1286 if (evt && !evt->initMemory())
1287 {
1288 const time_t tm = time(NULL);
1289 if (evt->runCtrl->reportMem==tm)
1290 continue;
1291
1292 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1293 evt->runCtrl->reportMem = tm;
1294 continue;
1295 }
1296
1297 // ready to read next header
1298 rs->bufTyp = READ_STRUCT::kHeader;
1299 rs->bufLen = sizeof(PEVNT_HEADER);
1300 rs->bufPos = rs->B;
1301
1302 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1303 if (!evt)
1304 continue;
1305
1306 // This should never happen
1307 if (evt->board[rs->sockId] != -1)
1308 {
1309 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1310 evt->evNum, rs->sockId, jj, rs->len());
1311 // FIXME: What to do with the validity flag?
1312 continue; // Continue reading next header
1313 }
1314
1315 // Swap the data entries (board headers) from network to host order
1316 rs->swapData();
1317
1318 // Copy data from rd[i] to mBuffer[evID]
1319 copyData(*rs, evt.get());
1320
1321#ifdef COMPLETE_EVENTS
1322 // Do not read anmymore from this board until the whole event has been received
1323 rs->bufTyp = READ_STRUCT::kWait;
1324#endif
1325 // now we have stored a new board contents into Event structure
1326 evt->board[rs->sockId] = rs->sockId;
1327 evt->header = evt->FADhead+rs->sockId;
1328 evt->nBoard++;
1329
1330#ifdef COMPLETE_EPOLL
1331 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1332 {
1333 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1334 break;
1335 }
1336#endif
1337 // event not yet complete
1338 if (evt->nBoard < READ_STRUCT::activeSockets)
1339 continue;
1340
1341 // All previous events are now flagged as incomplete ("expired")
1342 // and will be removed. (This is a bit tricky, because pop_front()
1343 // would invalidate the current iterator if not done _after_ the increment)
1344 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1345 {
1346 const bool found = it->get()==evt.get();
1347 if (!found)
1348 reportIncomplete(*it, "expired");
1349 else
1350 primaryQueue.post(evt);
1351
1352 // package_len is 0 if nothing was received.
1353 for (int ib=0; ib<40; ib++)
1354 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1355
1356 // The counter must be increased _before_ the pop_front,
1357 // otherwise the counter is invalidated by the pop_front!
1358 it++;
1359 evtCtrl.pop_front();
1360
1361 // We reached the current event, so we are done
1362 if (found)
1363 break;
1364 }
1365
1366#ifdef COMPLETE_EPOLL
1367 for (int j=0; j<40; j++)
1368 {
1369 epoll_event ev;
1370 ev.events = EPOLLIN;
1371 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1372 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1373 {
1374 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1375 return;
1376 }
1377 }
1378#endif
1379
1380#ifdef COMPLETE_EVENTS
1381 for (int j=0; j<40; j++)
1382 {
1383 //if (rs->bufTyp==READ_STRUCT::kWait)
1384 {
1385 rs->bufTyp = READ_STRUCT::kHeader;
1386 rs->bufLen = sizeof(PEVNT_HEADER);
1387 rs->bufPos = rs->B;
1388 }
1389 }
1390#endif
1391 } // end for loop over all sockets
1392#ifdef PRIORITY_QUEUE
1393 while (0); // convert continue into break ;)
1394#endif
1395
1396 // ==================================================================
1397
1398 const time_t actTime = time(NULL);
1399 if (actTime == gi_SecTime)
1400 {
1401#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1402 if (evtCtrl.empty())
1403 usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1404#endif
1405 continue;
1406 }
1407 gi_SecTime = actTime;
1408
1409 // ==================================================================
1410 //loop over all active events and flag those older than read-timeout
1411 //delete those that are written to disk ....
1412
1413 // This could be improved having the pointer which separates the queue with
1414 // the incomplete events from the queue with the complete events
1415 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1416 {
1417 // A reference is enough because the shared_ptr is hold by the evtCtrl
1418 const shared_ptr<EVT_CTRL2> &evt = *it;
1419
1420 // The first event is the oldest. If the first event within the
1421 // timeout window was received, we can stop searching further.
1422 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1423 break;
1424
1425 // The counter must be increased _before_ the pop_front,
1426 // otherwise the counter is invalidated by the pop_front!
1427 it++;
1428
1429 // This timeout is caused because complete data from one or more
1430 // boards has been received, but the memory could not be allocated.
1431 // There is no reason why we should not go on waiting for
1432 // memory to become free. However, the FADs will disconnect
1433 // after 60s due to their keep-alive timeout, but the event builder
1434 // will still wait for memory to become available.
1435 // Currently, the only possibility to free the memory from the
1436 // evtCtrl to restart the event builder (STOP/START).
1437 if (!evt->valid())
1438 continue;
1439
1440 // This will result in the emission of a dim service.
1441 // It doesn't matter if that takes comparably long,
1442 // because we have to stop the run anyway.
1443 const uint64_t rep = reportIncomplete(evt, "timeout");
1444 factReportIncomplete(rep);
1445
1446 // At least the data from one boards is complete...
1447 // package_len is 0 when nothing was received from this board
1448 for (int ib=0; ib<40; ib++)
1449 rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1450
1451 evtCtrl.pop_front();
1452 }
1453
1454 // =================================================================
1455
1456 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1457 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1458 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1459 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1460 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1461 gj.usdMem = Memory::max_inuse;
1462 gj.totMem = Memory::allocated;
1463 gj.maxMem = g_maxMem;
1464
1465 gj.deltaT = 1000; // temporary, must be improved
1466
1467 bool changed = false;
1468
1469 static vector<uint64_t> store(NBOARDS);
1470
1471 for (int ib=0; ib<NBOARDS; ib++)
1472 {
1473 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1474 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1475
1476 store[ib] = rd[ib].totBytes;
1477
1478 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1479 changed = true;
1480
1481 gi_NumConnect[ib] = rd[ib].connected;
1482 gj.numConn[ib] = rd[ib].connected;
1483 }
1484
1485 factStat(gj);
1486
1487 Memory::max_inuse = 0;
1488
1489 // =================================================================
1490
1491 // This is a fake event to trigger possible run-closing conditions once a second
1492 // FIXME: This is not yet ideal because a file would never be closed
1493 // if a new file has been started and no events of the new file
1494 // have been received yet
1495 int request = kRequestNone;
1496
1497 // If nothing was received for more than 5min, close file
1498 if (actTime-actrun->lastTime>300)
1499 request |= kRequestTimeout;
1500
1501 // If connection status has changed
1502 if (changed)
1503 request |= kRequestConnectionChange;
1504
1505 if (request!=kRequestNone)
1506 runFinished();
1507
1508 if (actrun->fileStat==kFileOpen)
1509 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1510 }
1511
1512 // 1: Stop, wait for event to get processed
1513 // 2: Stop, finish immediately
1514 // 101: Restart, wait for events to get processed
1515 // 101: Restart, finish immediately
1516 //
1517 const int gi_reset = g_reset;
1518
1519 const bool abort = gi_reset%100==2;
1520
1521 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1522
1523 primaryQueue.wait(abort);
1524 secondaryQueue.wait(abort);
1525 processingQueue1.wait(abort);
1526
1527 // Here we also destroy all runCtrl structures and hence close all open files
1528 evtCtrl.clear();
1529 actrun.reset();
1530
1531 factPrintf(MessageImp::kInfo, "Exit read Process...");
1532 factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse);
1533
1534 factStat(gj);
1535
1536 return gi_reset>=100;
1537}
1538
1539// ==========================================================================
1540// ==========================================================================
1541
1542void StartEvtBuild()
1543{
1544 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1545
1546 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1547
1548 memset(&gj, 0, sizeof(GUI_STAT));
1549
1550 gj.usdMem = Memory::inuse;
1551 gj.totMem = Memory::allocated;
1552 gj.maxMem = g_maxMem;
1553
1554
1555 READ_STRUCT rd[NBOARDS];
1556
1557 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1558 for (int i=0; i<NBOARDS; i++)
1559 rd[i].sockId = i;
1560
1561 while (mainloop(rd));
1562
1563 //must close all open sockets ...
1564 factPrintf(MessageImp::kInfo, "Close all sockets...");
1565
1566 READ_STRUCT::close();
1567
1568 // Now all sockets get closed. This is not reflected in gi_NumConnect
1569 // The current workaround is to count all sockets as closed when the thread is not running
1570 factPrintf(MessageImp::kInfo, "EventBuilder++ closed");
1571}
Note: See TracBrowser for help on using the repository browser.