source: trunk/FACT++/src/EventBuilder.cc@ 19246

Last change on this file since 19246 was 19064, checked in by tbretz, 6 years ago
Some minor changes to make it compatible with newer sets of headers.
File size: 47.3 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10#include <functional> // std::bind
11
12#include <boost/algorithm/string/join.hpp>
13
14#include "../externals/Queue.h"
15
16#include "MessageImp.h"
17#include "EventBuilder.h"
18#include "HeadersFAD.h"
19
20using namespace std;
21
22#define MIN_LEN 32 // min #bytes needed to interpret FADheader
23#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
24
25//#define COMPLETE_EVENTS
26//#define USE_POLL
27//#define USE_EPOLL
28//#define USE_SELECT
29//#define COMPLETE_EPOLL
30//#define PRIORITY_QUEUE
31
32// Reading only 1024: 13: 77Hz, 87%
33// Reading only 1024: 12: 78Hz, 46%
34// Reading only 300: 4: 250Hz, 92%
35// Reading only 300: 3: 258Hz, 40%
36
37// Reading only four threads 1024: 13: 77Hz, 60%
38// Reading only four threads 1024: 12: 78Hz, 46%
39// Reading only four threads 300: 4: 250Hz, 92%
40// Reading only four threads 300: 3: 258Hz, 40%
41
42// Default 300: 4: 249Hz, 92%
43// Default 300: 3: 261Hz, 40%
44// Default 1024: 13: 76Hz, 93%
45// Default 1024: 12: 79Hz, 46%
46
47// Poll [selected] 1024: 13: 63Hz, 45%
48// Poll [selected] 1024: 14: 63Hz, 63%
49// Poll [selected] 1024: 15: 64Hz, 80%
50// Poll [selected] 300: 4: 230Hz, 47%
51// Poll [selected] 300: 3: 200Hz, 94%
52
53// Poll [all] 1024: 13: 65Hz, 47%
54// Poll [all] 1024: 14: 64Hz, 59%
55// Poll [all] 1024: 15: 62Hz, 67%
56// Poll [all] 300: 4: 230Hz, 47%
57// Poll [all] 300: 3: 230Hz, 35%
58
59// ==========================================================================
60
61bool runOpen(const EVT_CTRL2 &evt);
62bool runWrite(const EVT_CTRL2 &evt);
63void runClose(const EVT_CTRL2 &run);
64void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
65void factOut(int severity, const char *message);
66void factReportIncomplete (uint64_t rep);
67void gotNewRun(RUN_CTRL2 &run);
68void runFinished();
69void factStat(const GUI_STAT &gj);
70bool eventCheck(const EVT_CTRL2 &evt);
71void debugHead(void *buf);
72
73// ==========================================================================
74
75int g_reset;
76
77size_t g_maxMem; //maximum memory allowed for buffer
78
79uint16_t g_evtTimeout; // timeout (sec) for one event
80
81FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
82
83uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
84
85GUI_STAT gj;
86
87// ==========================================================================
88
89namespace Memory
90{
91 uint64_t inuse = 0;
92 uint64_t allocated = 0;
93
94 uint64_t max_inuse = 0;
95
96 std::mutex mtx;
97
98 std::forward_list<void*> memory;
99
100 void *malloc()
101 {
102 // No free slot available, next alloc would exceed max memory
103 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
104 return NULL;
105
106 // We will return this amount of memory
107 // This is not 100% thread safe, but it is not a super accurate measure anyway
108 inuse += MAX_TOT_MEM;
109 if (inuse>max_inuse)
110 max_inuse = inuse;
111
112 if (memory.empty())
113 {
114 // No free slot available, allocate a new one
115 allocated += MAX_TOT_MEM;
116 return new char[MAX_TOT_MEM];
117 }
118
119 // Get the next free slot from the stack and return it
120 const std::lock_guard<std::mutex> lock(mtx);
121
122 void *mem = memory.front();
123 memory.pop_front();
124 return mem;
125 };
126
127 void free(void *mem)
128 {
129 if (!mem)
130 return;
131
132 // Decrease the amont of memory in use accordingly
133 inuse -= MAX_TOT_MEM;
134
135 // If the maximum memory has changed, we might be over the limit.
136 // In this case: free a slot
137 if (allocated>g_maxMem)
138 {
139 delete [] (char*)mem;
140 allocated -= MAX_TOT_MEM;
141 return;
142 }
143
144 const std::lock_guard<std::mutex> lock(mtx);
145 memory.push_front(mem);
146 }
147
148};
149
150// ==========================================================================
151
152__attribute__((__format__ (__printf__, 2, 0)))
153void factPrintf(int severity, const char *fmt, ...)
154{
155 char str[1000];
156
157 va_list ap;
158 va_start(ap, fmt);
159 vsnprintf(str, 1000, fmt, ap);
160 va_end(ap);
161
162 factOut(severity, str);
163}
164
165// ==========================================================================
166
167struct READ_STRUCT
168{
169 enum buftyp_t
170 {
171 kStream,
172 kHeader,
173 kData,
174#ifdef COMPLETE_EVENTS
175 kWait
176#endif
177 };
178
179 // ---------- connection ----------
180
181 static uint activeSockets;
182
183 int sockId; // socket id (board number)
184 int socket; // socket handle
185 bool connected; // is this socket connected?
186
187 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
188
189 // ------------ epoll -------------
190
191 static int fd_epoll;
192 static epoll_event events[NBOARDS];
193
194 static void init();
195 static void close();
196 static int wait();
197 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
198
199 // ------------ buffer ------------
200
201 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
202
203 uint32_t bufLen; // number of bytes left to read
204 uint8_t *bufPos; // next byte to read to the buffer next
205
206 union
207 {
208 uint8_t B[MAX_LEN];
209 uint16_t S[MAX_LEN / 2];
210 uint32_t I[MAX_LEN / 4];
211 uint64_t L[MAX_LEN / 8];
212 PEVNT_HEADER H;
213 };
214
215 timeval time;
216 uint64_t totBytes; // total received bytes
217 uint64_t relBytes; // total released bytes
218 uint32_t skip; // number of bytes skipped before start of event
219
220 uint32_t len() const { return uint32_t(H.package_length)*2; }
221
222 void swapHeader();
223 void swapData();
224
225 // --------------------------------
226
227 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
228 {
229 if (fd_epoll<0)
230 init();
231 }
232 ~READ_STRUCT()
233 {
234 destroy();
235 }
236
237 void destroy();
238 bool create(sockaddr_in addr);
239 bool check(int, sockaddr_in addr);
240 bool read();
241
242};
243
244#ifdef PRIORITY_QUEUE
245struct READ_STRUCTcomp
246{
247 bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
248 {
249 const int64_t rel1 = r1->totBytes - r1->relBytes;
250 const int64_t rel2 = r2->totBytes - r2->relBytes;
251 return rel1 > rel2;
252 }
253};
254#endif
255
256int READ_STRUCT::wait()
257{
258 // wait for something to do...
259 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
260 if (rc>=0)
261 return rc;
262
263 if (errno==EINTR) // timout or signal interruption
264 return 0;
265
266 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
267 return -1;
268}
269
270uint READ_STRUCT::activeSockets = 0;
271int READ_STRUCT::fd_epoll = -1;
272epoll_event READ_STRUCT::events[NBOARDS];
273
274void READ_STRUCT::init()
275{
276 if (fd_epoll>=0)
277 return;
278
279#ifdef USE_EPOLL
280 fd_epoll = epoll_create(NBOARDS);
281 if (fd_epoll<0)
282 {
283 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
284 return;
285 }
286#endif
287}
288
289void READ_STRUCT::close()
290{
291#ifdef USE_EPOLL
292 if (fd_epoll>=0 && ::close(fd_epoll)>0)
293 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
294#endif
295
296 fd_epoll = -1;
297}
298
299bool READ_STRUCT::create(sockaddr_in sockAddr)
300{
301 if (socket>=0)
302 return false;
303
304 const int port = ntohs(sockAddr.sin_port) + 1;
305
306 SockAddr.sin_family = sockAddr.sin_family;
307 SockAddr.sin_addr = sockAddr.sin_addr;
308 SockAddr.sin_port = htons(port);
309
310 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
311 {
312 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
313 socket = -1;
314 return false;
315 }
316
317 int optval = 1;
318 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
319 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
320
321 optval = 1;
322 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
323 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
324
325 optval = 10; //start after 10 seconds
326 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
327 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
328
329 optval = 10; //do every 10 seconds
330 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
331 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
332
333 optval = 2; //close after 2 unsuccessful tries
334 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
335 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
336
337 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
338
339 //connected = false;
340 activeSockets++;
341
342 return true;
343}
344
345void READ_STRUCT::destroy()
346{
347 if (socket<0)
348 return;
349
350#ifdef USE_EPOLL
351 // strictly speaking this should not be necessary
352 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
353 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
354#endif
355
356 if (::close(socket) > 0)
357 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
358 else
359 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
360
361 // Set the socket to "not connected"
362 socket = -1;
363 connected = false;
364 activeSockets--;
365 bufLen = 0;
366}
367
368bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
369{
370 // Continue in the most most likely case (performance)
371 //if (socket>=0 && sockDef!=0 && connected)
372 // return;
373 const int old = socket;
374
375 // socket open, but should not be open
376 if (socket>=0 && sockDef==0)
377 destroy();
378
379 // Socket closed, but should be open
380 if (socket<0 && sockDef!=0)
381 create(addr); //generate address and socket
382
383 const bool retval = old!=socket;
384
385 // Socket closed
386 if (socket<0)
387 return retval;
388
389 // Socket open and connected: Nothing to do
390 if (connected)
391 return retval;
392
393 //try to connect if not yet done
394 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
395 if (rc == -1)
396 return retval;
397
398 connected = true;
399
400 if (sockDef<0)
401 {
402 bufTyp = READ_STRUCT::kStream; // full data to be skipped
403 bufLen = MAX_LEN; // huge for skipping
404 }
405 else
406 {
407 bufTyp = READ_STRUCT::kHeader; // expect a header
408 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
409 }
410
411 bufPos = B; // no byte read so far
412 skip = 0; // start empty
413 totBytes = 0;
414 relBytes = 0;
415
416 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
417
418#ifdef USE_EPOLL
419 epoll_event ev;
420 ev.events = EPOLLIN;
421 ev.data.ptr = this; // user data (union: ev.ptr)
422 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
423 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
424#endif
425
426 return retval;
427}
428
429bool READ_STRUCT::read()
430{
431 if (!connected)
432 return false;
433
434 if (bufLen==0)
435 return true;
436
437 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
438 // recv failed
439 if (jrd<0)
440 {
441 // There was just nothing waiting
442 if (errno==EWOULDBLOCK || errno==EAGAIN)
443 return false;
444
445 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
446 return false;
447 }
448
449 // connection was closed ...
450 if (jrd==0)
451 {
452 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
453
454 destroy();//DestroySocket(rd[i]); //generate address and socket
455 return false;
456 }
457
458 totBytes += jrd;
459
460 // are we skipping this board ...
461 if (bufTyp==kStream)
462 return false;
463
464 if (bufPos==B)
465 gettimeofday(&time, NULL);
466
467 bufPos += jrd; //==> prepare for continuation
468 bufLen -= jrd;
469
470 // not yet all read
471 return bufLen==0;
472}
473
474void READ_STRUCT::swapHeader()
475{
476 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
477 S[2] = ntohs(S[2]); // version_no
478 S[3] = ntohs(S[3]); // PLLLCK
479 S[4] = ntohs(S[4]); // trigger_crc
480 S[5] = ntohs(S[5]); // trigger_type
481
482 I[3] = ntohl(I[3]); // trigger_id
483 I[4] = ntohl(I[4]); // fad_evt_counter
484 I[5] = ntohl(I[5]); // REFCLK_frequency
485
486 S[12] = ntohs(S[12]); // board id
487 S[13] = ntohs(S[13]); // adc_clock_phase_shift
488 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
489 S[15] = ntohs(S[15]); // trigger_generator_prescaler
490
491 I[10] = ntohl(I[10]); // runnumber;
492 I[11] = ntohl(I[11]); // time;
493
494 // Use back inserter??
495 for (int s=24; s<24+NTemp+NDAC; s++)
496 S[s] = ntohs(S[s]); // drs_temperature / dac
497}
498
499void READ_STRUCT::swapData()
500{
501 // swapEventHeaderBytes: End of the header. to channels now
502
503 int i = 36;
504 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
505 {
506 S[i+0] = ntohs(S[i+0]);//id
507 S[i+1] = ntohs(S[i+1]);//start_cell
508 S[i+2] = ntohs(S[i+2]);//roi
509 S[i+3] = ntohs(S[i+3]);//filling
510
511 i += 4+S[i+2];//skip the pixel data
512 }
513}
514
515// ==========================================================================
516
517bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
518{
519 int xjr = -1;
520 int xkr = -1;
521
522 //points to the very first roi
523 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
524
525 roi[0] = ntohs(rd.S[roiPtr]);
526
527 for (int jr = 0; jr < 9; jr++)
528 {
529 roi[jr] = ntohs(rd.S[roiPtr]);
530
531 if (roi[jr]>1024)
532 {
533 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
534 return false;
535 }
536
537 // Check that the roi of pixels jr are compatible with the one of pixel 0
538 if (jr!=8 && roi[jr]!=roi[0])
539 {
540 xjr = jr;
541 break;
542 }
543
544 // Check that the roi of all other DRS chips on boards are compatible
545 for (int kr = 1; kr < 4; kr++)
546 {
547 const int kroi = ntohs(rd.S[roiPtr]);
548 if (kroi != roi[jr])
549 {
550 xjr = jr;
551 xkr = kr;
552 break;
553 }
554 roiPtr += kroi+4;
555 }
556 }
557
558 if (xjr>=0)
559 {
560 if (xkr<0)
561 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
562 else
563 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
564
565 return false;
566 }
567
568 if (roi[8] < roi[0])
569 {
570 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
571 return false;
572 }
573
574 return true;
575}
576
577list<shared_ptr<EVT_CTRL2>> evtCtrl;
578
579shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
580{
581 /*
582 checkroi consistence
583 find existing entry
584 if no entry, try to allocate memory
585 if entry and memory, init event structure
586 */
587
588 uint16_t nRoi[9];
589 if (!checkRoiConsistency(rd, nRoi))
590 return shared_ptr<EVT_CTRL2>();
591
592 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
593 {
594 // A reference is enough because the evtCtrl holds the shared_ptr anyway
595 const shared_ptr<EVT_CTRL2> &evt = *it;
596
597 // If the run is different, go on searching.
598 // We cannot stop searching if a lower run-id is found as in
599 // the case of the events, because theoretically, there
600 // can be the same run on two different days.
601 if (rd.H.runnumber != evt->runNum)
602 continue;
603
604 // If the ID of the new event if higher than the last one stored
605 // in that run, we have to assign a new slot (leave the loop)
606 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
607 break;
608
609 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
610 continue;
611
612 // We have found an entry with the same runID and evtID
613 // Check if ROI is consistent
614 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
615 {
616 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
617 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
618 return shared_ptr<EVT_CTRL2>();
619 }
620
621 // It is maybe not likely, but the header of this board might have
622 // arrived earlier. (We could also update the run-info, but
623 // this should not make a difference here)
624 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
625 rd.time.tv_sec<evt->time.tv_sec)
626 evt->time = rd.time;
627
628 //everything seems fine so far ==> use this slot ....
629 return evt;
630 }
631
632 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
633 {
634 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
635 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
636 return shared_ptr<EVT_CTRL2>();
637 }
638
639 EVT_CTRL2 *evt = new EVT_CTRL2;
640
641 evt->time = rd.time;
642
643 evt->runNum = rd.H.runnumber;
644 evt->evNum = rd.H.fad_evt_counter;
645
646 evt->trgNum = rd.H.trigger_id;
647 evt->trgTyp = rd.H.trigger_type;
648
649 evt->nRoi = nRoi[0];
650 evt->nRoiTM = nRoi[8];
651
652 //evt->firstBoard = rd.sockId;
653
654 const bool newrun = actrun->runId != rd.H.runnumber;
655 if (newrun)
656 {
657 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
658 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
659
660 // The new run is the active run now
661 actrun = make_shared<RUN_CTRL2>();
662
663 const time_t &tsec = evt->time.tv_sec;
664
665 actrun->openTime = tsec;
666 actrun->closeTime = tsec + 3600 * 24; // max time allowed
667 actrun->runId = rd.H.runnumber;
668 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
669 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
670
671 // Signal the fadctrl that a new run has been started
672 // Note this is the only place at which we can ensure that
673 // gotnewRun is called only once
674 gotNewRun(*actrun);
675 }
676
677 // Keep pointer to run of this event
678 evt->runCtrl = actrun;
679
680 // Increase the number of events we have started to receive in this run
681 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
682 actrun->lastEvt++;
683
684 // An event can be the first and the last, but not the last and the first.
685 // Therefore gotNewRun is called before runFinished.
686 // runFinished signals that the last event of a run was just received. Processing
687 // might still be ongoing, but we can start a new run.
688 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
689 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
690 if (!cond1 || !cond2)
691 runFinished();
692
693 // We don't mind here that this is not common to all events,
694 // because every coming event will fullfil the condition as well.
695 if (!cond1)
696 evt->closeRequest |= kRequestMaxEvtsReached;
697 if (!cond2)
698 evt->closeRequest |= kRequestMaxTimeReached;
699
700 // Secure access to evtCtrl against access in CloseRunFile
701 // This should be the last... otherwise we can run into threading issues
702 // if the event is accessed before it is fully initialized.
703 evtCtrl.emplace_back(evt);
704 return evtCtrl.back();
705}
706
707
708void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
709{
710 const int i = rBuf.sockId;
711
712 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
713
714 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
715
716 // consistency of ROIs have been checked already (is it all correct?)
717 const uint16_t &roi = rBuf.S[src+2];
718
719 // different sort in FAD board.....
720 EVENT *event = evt->fEvent;
721 for (int px = 0; px < 9; px++)
722 {
723 for (int drs = 0; drs < 4; drs++)
724 {
725 const int16_t pixC = rBuf.S[src+1]; // start-cell
726 const int16_t pixR = rBuf.S[src+2]; // roi
727 //here we should check if pixH is correct ....
728
729 const int pixS = i*36 + drs*9 + px;
730
731 event->StartPix[pixS] = pixC;
732
733 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
734
735 src += 4+pixR;
736
737 // Treatment for ch 9 (TM channel)
738 if (px != 8)
739 continue;
740
741 const int tmS = i*4 + drs;
742
743 //and we have additional TM info
744 if (pixR > roi)
745 {
746 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
747
748 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
749 }
750 else
751 {
752 event->StartTM[tmS] = -1;
753 }
754 }
755 }
756}
757
758// ==========================================================================
759
760uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
761{
762 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
763 evt->runNum, evt->evNum, evtCtrl.size(), txt);
764
765 uint64_t report = 0;
766
767 char str[1000];
768
769 int ik=0;
770 for (int ib=0; ib<NBOARDS; ib++)
771 {
772 if (ib%10==0)
773 str[ik++] = '|';
774
775 const int jb = evt->board[ib];
776 if (jb>=0) // data received from that board
777 {
778 str[ik++] = '0'+(jb%10);
779 continue;
780 }
781
782 // FIXME: This is not synchronous... it reports
783 // accoridng to the current connection status, not w.r.t. to the
784 // one when the event was taken.
785 if (gi_NumConnect[ib]==0) // board not connected
786 {
787 str[ik++] = 'x';
788 continue;
789 }
790
791 // data from this board lost
792 str[ik++] = '.';
793 report |= ((uint64_t)1)<<ib;
794 }
795
796 str[ik++] = '|';
797 str[ik] = 0;
798
799 factOut(MessageImp::kWarn, str);
800
801 return report;
802}
803
804// ==========================================================================
805// ==========================================================================
806
807bool proc1(const shared_ptr<EVT_CTRL2> &);
808
809Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
810
811bool proc1(const shared_ptr<EVT_CTRL2> &evt)
812{
813 applyCalib(*evt, processingQueue1.size());
814 return true;
815}
816
817// If this is not convenient anymore, it could be replaced by
818// a command queue, to which command+data is posted,
819// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
820bool writeEvt(const shared_ptr<EVT_CTRL2> &evt)
821{
822 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
823 RUN_CTRL2 &run = *evt->runCtrl;
824
825 // Is this a valid event or just an empty event to trigger run close?
826 // If this is not an empty event open the new run-file
827 // Empty events are there to trigger run-closing conditions
828 if (evt->valid())
829 {
830 // File not yet open
831 if (run.fileStat==kFileNotYetOpen)
832 {
833 // runOpen will close a previous run, if still open
834 if (!runOpen(*evt))
835 {
836 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
837 run.fileStat = kFileClosed;
838 return true;
839 }
840
841 factPrintf(MessageImp::kInfo, "Opened new file for data from run %d (evt=%d)", evt->runNum, evt->evNum);
842 run.fileStat = kFileOpen;
843 }
844
845 // Here we have a valid calibration and can go on with that.
846 // It is important that _all_ events are sent for calibration (except broken ones)
847 processingQueue1.post(evt);
848 }
849
850 // File already closed
851 if (run.fileStat==kFileClosed)
852 return true;
853
854 // If we will have a software trigger which prevents single events from writing,
855 // the logic of writing the stop time and the trigger counters need to be adapted.
856 // Currently it is just the values of the last valid event.
857 bool rc1 = true;
858 if (evt->valid())
859 {
860 rc1 = runWrite(*evt);
861 if (!rc1)
862 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
863 }
864
865 // File not open... no need to close or to check for close
866 // ... this is the case if CloseRunFile was called before any file was opened.
867 if (run.fileStat!=kFileOpen)
868 return true;
869
870 // File is not yet to be closed.
871 if (rc1 && evt->closeRequest==kRequestNone)
872 return true;
873
874 runClose(*evt);
875 run.fileStat = kFileClosed;
876
877 vector<string> reason;
878 if (evt->closeRequest&kRequestManual)
879 reason.emplace_back("close was requested");
880 if (evt->closeRequest&kRequestTimeout)
881 reason.emplace_back("receive timed out");
882 if (evt->closeRequest&kRequestConnectionChange)
883 reason.emplace_back("connection changed");
884 if (evt->closeRequest&kRequestEventCheckFailed)
885 reason.emplace_back("event check failed");
886 if (evt->closeRequest&kRequestMaxTimeReached)
887 reason.push_back(to_string(run.closeTime-run.openTime)+"s had been reached");
888 if (evt->closeRequest&kRequestMaxEvtsReached)
889 reason.push_back(to_string(run.maxEvt)+" evts had been reached");
890 if (!rc1)
891 reason.emplace_back("runWrite failed");
892
893 const string str = boost::algorithm::join(reason, ", ");
894 factPrintf(MessageImp::kInfo, "File %d was closed because %s", run.runId, str.c_str());
895
896 return true;
897}
898
899Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
900
901bool procEvt(const shared_ptr<EVT_CTRL2> &evt)
902{
903 RUN_CTRL2 &run = *evt->runCtrl;
904
905 bool check = true;
906 if (evt->valid())
907 {
908 EVENT *event = evt->fEvent;
909
910 // This is already done in initMemory()
911 //event->Roi = evt->runCtrl->roi0;
912 //event->RoiTM = evt->runCtrl->roi8;
913 //event->EventNum = evt->evNum;
914 //event->TriggerNum = evt->trgNum;
915 //event->TriggerType = evt->trgTyp;
916
917 event->NumBoards = evt->nBoard;
918
919 event->PCTime = evt->time.tv_sec;
920 event->PCUsec = evt->time.tv_usec;
921
922 for (int ib=0; ib<NBOARDS; ib++)
923 event->BoardTime[ib] = evt->FADhead[ib].time;
924
925 check = eventCheck(*evt);
926
927 // If the event is valid, increase the trigger counter accordingly
928 if (check)
929 {
930 // Physics trigger
931 if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
932 run.triggerCounter[0]++;
933 // Pure pedestal trigger
934 else if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM))
935 run.triggerCounter[1]++;
936 // external light pulser trigger
937 else if (evt->trgTyp & FAD::EventHeader::kLPext)
938 run.triggerCounter[2]++;
939 // time calibration triggers
940 else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal))
941 run.triggerCounter[3]++;
942 // internal light pulser trigger
943 else if (evt->trgTyp & FAD::EventHeader::kLPint)
944 run.triggerCounter[4]++;
945 // external trigger input 1
946 else if (evt->trgTyp & FAD::EventHeader::kExt1)
947 run.triggerCounter[5]++;
948 // external trigger input 2
949 else if (evt->trgTyp & FAD::EventHeader::kExt2)
950 run.triggerCounter[6]++;
951 // other triggers
952 else
953 run.triggerCounter[7]++;
954 }
955 }
956
957 // If this is an invalid event, the current triggerCounter needs to be copied
958 // because runClose will use that one to update the TRIGGER_COUNTER.
959 // When closing the file, the trigger counter of the last successfully
960 // written event is used.
961 evt->triggerCounter = run.triggerCounter;
962
963 // If event check has failed, skip the event and post a close request instead.
964 // Otherwise, if file is open post the event for being written
965 if (!check)
966 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
967 else
968 secondaryQueue.post(evt);
969
970 return true;
971}
972
973// ==========================================================================
974// ==========================================================================
975
976/*
977 task 1-4:
978
979 lock1()-lock4();
980 while (1)
981 {
982 wait for signal [lockN]; // unlocked
983
984 while (n!=10)
985 wait sockets;
986 read;
987
988 lockM();
989 finished[n] = true;
990 signal(mainloop);
991 unlockM();
992 }
993
994
995 mainloop:
996
997 while (1)
998 {
999 lockM();
1000 while (!finished[0] || !finished[1] ...)
1001 wait for signal [lockM]; // unlocked... signals can be sent
1002 finished[0-1] = false;
1003 unlockM()
1004
1005 copy data to queue // locked
1006
1007 lockN[0-3];
1008 signalN[0-3];
1009 unlockN[0-3];
1010 }
1011
1012
1013 */
1014
1015/*
1016 while (g_reset)
1017 {
1018 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
1019
1020 // Check that all sockets are connected
1021
1022 for (int i=0; i<40; i++)
1023 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
1024 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1025
1026 while (g_reset)
1027 {
1028 if (READ_STRUCT::wait()<0)
1029 break;
1030
1031 if (rc_epoll==0)
1032 break;
1033
1034 for (int jj=0; jj<rc_epoll; jj++)
1035 {
1036 READ_STRUCT *rs = READ_STRUCT::get(jj);
1037 if (!rs->connected)
1038 continue;
1039
1040 const bool rc_read = rs->read();
1041 if (!rc_read)
1042 continue;
1043
1044 if (rs->bufTyp==READ_STRUCT::kHeader)
1045 {
1046 [...]
1047 }
1048
1049 [...]
1050
1051 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1052 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1053 }
1054
1055 if (once_a_second)
1056 {
1057 if (evt==timeout)
1058 break;
1059 }
1060 }
1061
1062 if (evt.nBoards==actBoards)
1063 primaryQueue.post(evt);
1064 }
1065*/
1066
1067Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1068
1069// This corresponds more or less to fFile... should we merge both?
1070shared_ptr<RUN_CTRL2> actrun;
1071
1072void CloseRunFile()
1073{
1074 // Currently we need actrun here, to be able to set kFileClosed.
1075 // Apart from that we have to ensure that there is an open file at all
1076 // which we can close.
1077 // Submission to the primary queue ensures that the event
1078 // is placed at the right place in the processing chain.
1079 // (Corresponds to the correct run)
1080 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1081}
1082
1083bool mainloop(READ_STRUCT *rd)
1084{
1085 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1086
1087 primaryQueue.start();
1088 secondaryQueue.start();
1089 processingQueue1.start();;
1090
1091 actrun = make_shared<RUN_CTRL2>();
1092
1093 //time in seconds
1094 time_t gi_SecTime = time(NULL)-1;
1095
1096 //loop until global variable g_runStat claims stop
1097 g_reset = 0;
1098 while (g_reset == 0)
1099 {
1100#ifdef USE_POLL
1101 int pp[40];
1102 int nn = 0;
1103 pollfd fds[40];
1104 for (int i=0; i<40; i++)
1105 {
1106 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1107 {
1108 fds[nn].fd = rd[i].socket;
1109 fds[nn].events = POLLIN;
1110 pp[nn] = i;
1111 nn++;
1112 }
1113 }
1114
1115 const int rc_epoll = poll(fds, nn, 100);
1116 if (rc_epoll<0)
1117 break;
1118#endif
1119
1120#ifdef USE_SELECT
1121 fd_set readfs;
1122 FD_ZERO(&readfs);
1123 int nfsd = 0;
1124 for (int i=0; i<NBOARDS; i++)
1125 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1126 {
1127 FD_SET(rd[i].socket, &readfs);
1128 if (rd[i].socket>nfsd)
1129 nfsd = rd[i].socket;
1130 }
1131
1132 timeval tv;
1133 tv.tv_sec = 0;
1134 tv.tv_usec = 100000;
1135 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1136 // 0: timeout
1137 // -1: error
1138 if (rc_select<0)
1139 {
1140 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1141 continue;
1142 }
1143#endif
1144
1145#ifdef USE_EPOLL
1146 const int rc_epoll = READ_STRUCT::wait();
1147 if (rc_epoll<0)
1148 break;
1149#endif
1150
1151#ifdef PRIORITY_QUEUE
1152 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1153
1154 for (int i=0; i<NBOARDS; i++)
1155 if (rd[i].connected)
1156 prio.push(rd+i);
1157
1158 if (!prio.empty()) do
1159#endif
1160
1161
1162#ifdef USE_POLL
1163 for (int jj=0; jj<nn; jj++)
1164#endif
1165#ifdef USE_EPOLL
1166 for (int jj=0; jj<rc_epoll; jj++)
1167#endif
1168#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1169 for (int jj=0; jj<NBOARDS; jj++)
1170#endif
1171 {
1172#ifdef PRIORITY_QUEUE
1173 READ_STRUCT *rs = prio.top();
1174#endif
1175#ifdef USE_SELECT
1176 if (!FD_ISSET(rs->socket, &readfs))
1177 continue;
1178#endif
1179
1180#ifdef USE_POLL
1181 if ((fds[jj].revents&POLLIN)==0)
1182 continue;
1183#endif
1184
1185#ifdef USE_EPOLL
1186 // FIXME: How to get i?
1187 READ_STRUCT *rs = READ_STRUCT::get(jj);
1188#endif
1189
1190#ifdef USE_POLL
1191 // FIXME: How to get i?
1192 READ_STRUCT *rs = &rd[pp[jj]];
1193#endif
1194
1195#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1196 const int i = (jj%4)*10 + (jj/4);
1197 READ_STRUCT *rs = &rd[i];
1198#endif
1199
1200#ifdef COMPLETE_EVENTS
1201 if (rs->bufTyp==READ_STRUCT::kWait)
1202 continue;
1203#endif
1204
1205 // ==================================================================
1206
1207 const bool rc_read = rs->read();
1208
1209 // Connect might have gotten closed during read
1210 gi_NumConnect[rs->sockId] = rs->connected;
1211 gj.numConn[rs->sockId] = rs->connected;
1212
1213 // Read either failed or disconnected, or the buffer is not yet full
1214 if (!rc_read)
1215 continue;
1216
1217 // ==================================================================
1218
1219 if (rs->bufTyp==READ_STRUCT::kHeader)
1220 {
1221 //check if startflag correct; else shift block ....
1222 // FIXME: This is not enough... this combination of
1223 // bytes can be anywhere... at least the end bytes
1224 // must be checked somewhere, too.
1225 uint k;
1226 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1227 {
1228 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1229 break;
1230 }
1231 rs->skip += k;
1232
1233 //no start of header found
1234 if (k==sizeof(PEVNT_HEADER)-1)
1235 {
1236 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1237 rs->bufPos = rs->B+1;
1238 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1239 continue;
1240 }
1241
1242 if (k > 0)
1243 {
1244 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1245
1246 rs->bufPos -= k;
1247 rs->bufLen += k;
1248
1249 continue; // We need to read more (bufLen>0)
1250 }
1251
1252 if (rs->skip>0)
1253 {
1254 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1255 rs->skip = 0;
1256 }
1257
1258 // Swap the header entries from network to host order
1259 rs->swapHeader();
1260
1261 rs->bufTyp = READ_STRUCT::kData;
1262 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1263
1264 debugHead(rs->B); // i and fadBoard not used
1265
1266 continue;
1267 }
1268
1269 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1270 if (end != 0xfe04)
1271 {
1272 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1273 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1274
1275 // ready to read next header
1276 rs->bufTyp = READ_STRUCT::kHeader;
1277 rs->bufLen = sizeof(PEVNT_HEADER);
1278 rs->bufPos = rs->B;
1279 // FIXME: What to do with the validity flag?
1280 continue;
1281 }
1282
1283 // get index into mBuffer for this event (create if needed)
1284 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1285
1286 // We have a valid entry, but no memory has yet been allocated
1287 if (evt && !evt->initMemory())
1288 {
1289 const time_t tm = time(NULL);
1290 if (evt->runCtrl->reportMem==tm)
1291 continue;
1292
1293 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1294 evt->runCtrl->reportMem = tm;
1295 continue;
1296 }
1297
1298 // ready to read next header
1299 rs->bufTyp = READ_STRUCT::kHeader;
1300 rs->bufLen = sizeof(PEVNT_HEADER);
1301 rs->bufPos = rs->B;
1302
1303 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1304 if (!evt)
1305 continue;
1306
1307 // This should never happen
1308 if (evt->board[rs->sockId] != -1)
1309 {
1310 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1311 evt->evNum, rs->sockId, jj, rs->len());
1312 // FIXME: What to do with the validity flag?
1313 continue; // Continue reading next header
1314 }
1315
1316 // Swap the data entries (board headers) from network to host order
1317 rs->swapData();
1318
1319 // Copy data from rd[i] to mBuffer[evID]
1320 copyData(*rs, evt.get());
1321
1322#ifdef COMPLETE_EVENTS
1323 // Do not read anmymore from this board until the whole event has been received
1324 rs->bufTyp = READ_STRUCT::kWait;
1325#endif
1326 // now we have stored a new board contents into Event structure
1327 evt->board[rs->sockId] = rs->sockId;
1328 evt->header = evt->FADhead+rs->sockId;
1329 evt->nBoard++;
1330
1331#ifdef COMPLETE_EPOLL
1332 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1333 {
1334 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1335 break;
1336 }
1337#endif
1338 // event not yet complete
1339 if (evt->nBoard < READ_STRUCT::activeSockets)
1340 continue;
1341
1342 // All previous events are now flagged as incomplete ("expired")
1343 // and will be removed. (This is a bit tricky, because pop_front()
1344 // would invalidate the current iterator if not done _after_ the increment)
1345 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1346 {
1347 const bool found = it->get()==evt.get();
1348 if (!found)
1349 reportIncomplete(*it, "expired");
1350 else
1351 primaryQueue.post(evt);
1352
1353 // package_len is 0 if nothing was received.
1354 for (int ib=0; ib<40; ib++)
1355 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1356
1357 // The counter must be increased _before_ the pop_front,
1358 // otherwise the counter is invalidated by the pop_front!
1359 it++;
1360 evtCtrl.pop_front();
1361
1362 // We reached the current event, so we are done
1363 if (found)
1364 break;
1365 }
1366
1367#ifdef COMPLETE_EPOLL
1368 for (int j=0; j<40; j++)
1369 {
1370 epoll_event ev;
1371 ev.events = EPOLLIN;
1372 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1373 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1374 {
1375 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1376 return;
1377 }
1378 }
1379#endif
1380
1381#ifdef COMPLETE_EVENTS
1382 for (int j=0; j<40; j++)
1383 {
1384 //if (rs->bufTyp==READ_STRUCT::kWait)
1385 {
1386 rs->bufTyp = READ_STRUCT::kHeader;
1387 rs->bufLen = sizeof(PEVNT_HEADER);
1388 rs->bufPos = rs->B;
1389 }
1390 }
1391#endif
1392 } // end for loop over all sockets
1393#ifdef PRIORITY_QUEUE
1394 while (0); // convert continue into break ;)
1395#endif
1396
1397 // ==================================================================
1398
1399 const time_t actTime = time(NULL);
1400 if (actTime == gi_SecTime)
1401 {
1402#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1403 if (evtCtrl.empty())
1404 usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1405#endif
1406 continue;
1407 }
1408 gi_SecTime = actTime;
1409
1410 // ==================================================================
1411 //loop over all active events and flag those older than read-timeout
1412 //delete those that are written to disk ....
1413
1414 // This could be improved having the pointer which separates the queue with
1415 // the incomplete events from the queue with the complete events
1416 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1417 {
1418 // A reference is enough because the shared_ptr is hold by the evtCtrl
1419 const shared_ptr<EVT_CTRL2> &evt = *it;
1420
1421 // The first event is the oldest. If the first event within the
1422 // timeout window was received, we can stop searching further.
1423 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1424 break;
1425
1426 // The counter must be increased _before_ the pop_front,
1427 // otherwise the counter is invalidated by the pop_front!
1428 it++;
1429
1430 // This timeout is caused because complete data from one or more
1431 // boards has been received, but the memory could not be allocated.
1432 // There is no reason why we should not go on waiting for
1433 // memory to become free. However, the FADs will disconnect
1434 // after 60s due to their keep-alive timeout, but the event builder
1435 // will still wait for memory to become available.
1436 // Currently, the only possibility to free the memory from the
1437 // evtCtrl to restart the event builder (STOP/START).
1438 if (!evt->valid())
1439 continue;
1440
1441 // This will result in the emission of a dim service.
1442 // It doesn't matter if that takes comparably long,
1443 // because we have to stop the run anyway.
1444 const uint64_t rep = reportIncomplete(evt, "timeout");
1445 factReportIncomplete(rep);
1446
1447 // At least the data from one boards is complete...
1448 // package_len is 0 when nothing was received from this board
1449 for (int ib=0; ib<40; ib++)
1450 rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1451
1452 evtCtrl.pop_front();
1453 }
1454
1455 // =================================================================
1456
1457 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1458 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1459 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1460 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1461 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1462 gj.usdMem = Memory::max_inuse;
1463 gj.totMem = Memory::allocated;
1464 gj.maxMem = g_maxMem;
1465
1466 gj.deltaT = 1000; // temporary, must be improved
1467
1468 bool changed = false;
1469
1470 static vector<uint64_t> store(NBOARDS);
1471
1472 for (int ib=0; ib<NBOARDS; ib++)
1473 {
1474 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1475 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1476
1477 store[ib] = rd[ib].totBytes;
1478
1479 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1480 changed = true;
1481
1482 gi_NumConnect[ib] = rd[ib].connected;
1483 gj.numConn[ib] = rd[ib].connected;
1484 }
1485
1486 factStat(gj);
1487
1488 Memory::max_inuse = 0;
1489
1490 // =================================================================
1491
1492 // This is a fake event to trigger possible run-closing conditions once a second
1493 // FIXME: This is not yet ideal because a file would never be closed
1494 // if a new file has been started and no events of the new file
1495 // have been received yet
1496 int request = kRequestNone;
1497
1498 // If nothing was received for more than 5min, close file
1499 if (actTime-actrun->lastTime>300)
1500 request |= kRequestTimeout;
1501
1502 // If connection status has changed
1503 if (changed)
1504 request |= kRequestConnectionChange;
1505
1506 if (request!=kRequestNone)
1507 runFinished();
1508
1509 if (actrun->fileStat==kFileOpen)
1510 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1511 }
1512
1513 // 1: Stop, wait for event to get processed
1514 // 2: Stop, finish immediately
1515 // 101: Restart, wait for events to get processed
1516 // 101: Restart, finish immediately
1517 //
1518 const int gi_reset = g_reset;
1519
1520 const bool abort = gi_reset%100==2;
1521
1522 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1523
1524 primaryQueue.wait(abort);
1525 secondaryQueue.wait(abort);
1526 processingQueue1.wait(abort);
1527
1528 // Here we also destroy all runCtrl structures and hence close all open files
1529 evtCtrl.clear();
1530 actrun.reset();
1531
1532 factPrintf(MessageImp::kInfo, "Exit read Process...");
1533 factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse);
1534
1535 factStat(gj);
1536
1537 return gi_reset>=100;
1538}
1539
1540// ==========================================================================
1541// ==========================================================================
1542
1543void StartEvtBuild()
1544{
1545 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1546
1547 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1548
1549 memset(&gj, 0, sizeof(GUI_STAT));
1550
1551 gj.usdMem = Memory::inuse;
1552 gj.totMem = Memory::allocated;
1553 gj.maxMem = g_maxMem;
1554
1555
1556 READ_STRUCT rd[NBOARDS];
1557
1558 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1559 for (int i=0; i<NBOARDS; i++)
1560 rd[i].sockId = i;
1561
1562 while (mainloop(rd));
1563
1564 //must close all open sockets ...
1565 factPrintf(MessageImp::kInfo, "Close all sockets...");
1566
1567 READ_STRUCT::close();
1568
1569 // Now all sockets get closed. This is not reflected in gi_NumConnect
1570 // The current workaround is to count all sockets as closed when the thread is not running
1571 factPrintf(MessageImp::kInfo, "EventBuilder++ closed");
1572}
Note: See TracBrowser for help on using the repository browser.