source: trunk/FACT++/src/EventBuilder.cc@ 17678

Last change on this file since 17678 was 17678, checked in by tbretz, 11 years ago
It can happen that an event has been created but no header is ever received. If such an event timed out, its header was accessed even though it had never been filled, which is invalid. With the recent fix, I assume that whenever something is received from a board, the rest of that board's data is received as well; in other words, partially received data from a single board never times out.
File size: 46.7 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10
11#include <boost/algorithm/string/join.hpp>
12
13#include "../externals/Queue.h"
14
15#include "MessageImp.h"
16#include "EventBuilder.h"
17#include "HeadersFAD.h"
18
19using namespace std;
20
21#define MIN_LEN 32 // min #bytes needed to interpret FADheader
22#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
23
24//#define COMPLETE_EVENTS
25//#define USE_POLL
26//#define USE_EPOLL
27//#define USE_SELECT
28//#define COMPLETE_EPOLL
29//#define PRIORITY_QUEUE
30
31// Reading only 1024: 13: 77Hz, 87%
32// Reading only 1024: 12: 78Hz, 46%
33// Reading only 300: 4: 250Hz, 92%
34// Reading only 300: 3: 258Hz, 40%
35
36// Reading only four threads 1024: 13: 77Hz, 60%
37// Reading only four threads 1024: 12: 78Hz, 46%
38// Reading only four threads 300: 4: 250Hz, 92%
39// Reading only four threads 300: 3: 258Hz, 40%
40
41// Default 300: 4: 249Hz, 92%
42// Default 300: 3: 261Hz, 40%
43// Default 1024: 13: 76Hz, 93%
44// Default 1024: 12: 79Hz, 46%
45
46// Poll [selected] 1024: 13: 63Hz, 45%
47// Poll [selected] 1024: 14: 63Hz, 63%
48// Poll [selected] 1024: 15: 64Hz, 80%
49// Poll [selected] 300: 4: 230Hz, 47%
50// Poll [selected] 300: 3: 200Hz, 94%
51
52// Poll [all] 1024: 13: 65Hz, 47%
53// Poll [all] 1024: 14: 64Hz, 59%
54// Poll [all] 1024: 15: 62Hz, 67%
55// Poll [all] 300: 4: 230Hz, 47%
56// Poll [all] 300: 3: 230Hz, 35%
57
58// ==========================================================================
59
60bool runOpen(const EVT_CTRL2 &evt);
61bool runWrite(const EVT_CTRL2 &evt);
62void runClose(const EVT_CTRL2 &run);
63void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
64void factOut(int severity, const char *message);
65void factReportIncomplete (uint64_t rep);
66void gotNewRun(RUN_CTRL2 &run);
67void runFinished();
68void factStat(const GUI_STAT &gj);
69bool eventCheck(const EVT_CTRL2 &evt);
70void debugHead(void *buf);
71
72// ==========================================================================
73
74int g_reset;
75
76size_t g_maxMem; //maximum memory allowed for buffer
77
78uint16_t g_evtTimeout; // timeout (sec) for one event
79
80FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
81
82uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
83
84GUI_STAT gj;
85
86// ==========================================================================
87
88namespace Memory
89{
90 uint64_t inuse = 0;
91 uint64_t allocated = 0;
92
93 uint64_t max_inuse = 0;
94
95 std::mutex mtx;
96
97 std::forward_list<void*> memory;
98
99 void *malloc()
100 {
101 // No free slot available, next alloc would exceed max memory
102 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
103 return NULL;
104
105 // We will return this amount of memory
106 // This is not 100% thread safe, but it is not a super accurate measure anyway
107 inuse += MAX_TOT_MEM;
108 if (inuse>max_inuse)
109 max_inuse = inuse;
110
111 if (memory.empty())
112 {
113 // No free slot available, allocate a new one
114 allocated += MAX_TOT_MEM;
115 return new char[MAX_TOT_MEM];
116 }
117
118 // Get the next free slot from the stack and return it
119 const std::lock_guard<std::mutex> lock(mtx);
120
121 void *mem = memory.front();
122 memory.pop_front();
123 return mem;
124 };
125
126 void free(void *mem)
127 {
128 if (!mem)
129 return;
130
131 // Decrease the amont of memory in use accordingly
132 inuse -= MAX_TOT_MEM;
133
134 // If the maximum memory has changed, we might be over the limit.
135 // In this case: free a slot
136 if (allocated>g_maxMem)
137 {
138 delete [] (char*)mem;
139 allocated -= MAX_TOT_MEM;
140 return;
141 }
142
143 const std::lock_guard<std::mutex> lock(mtx);
144 memory.push_front(mem);
145 }
146
147};
148
149// ==========================================================================
150
151void factPrintf(int severity, const char *fmt, ...)
152{
153 char str[1000];
154
155 va_list ap;
156 va_start(ap, fmt);
157 vsnprintf(str, 1000, fmt, ap);
158 va_end(ap);
159
160 factOut(severity, str);
161}
162
163// ==========================================================================
164
165struct READ_STRUCT
166{
167 enum buftyp_t
168 {
169 kStream,
170 kHeader,
171 kData,
172#ifdef COMPLETE_EVENTS
173 kWait
174#endif
175 };
176
177 // ---------- connection ----------
178
179 static uint activeSockets;
180
181 int sockId; // socket id (board number)
182 int socket; // socket handle
183 bool connected; // is this socket connected?
184
185 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
186
187 // ------------ epoll -------------
188
189 static int fd_epoll;
190 static epoll_event events[NBOARDS];
191
192 static void init();
193 static void close();
194 static int wait();
195 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
196
197 // ------------ buffer ------------
198
199 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
200
201 uint32_t bufLen; // number of bytes left to read
202 uint8_t *bufPos; // next byte to read to the buffer next
203
204 union
205 {
206 uint8_t B[MAX_LEN];
207 uint16_t S[MAX_LEN / 2];
208 uint32_t I[MAX_LEN / 4];
209 uint64_t L[MAX_LEN / 8];
210 PEVNT_HEADER H;
211 };
212
213 timeval time;
214 uint64_t totBytes; // total received bytes
215 uint64_t relBytes; // total released bytes
216 uint32_t skip; // number of bytes skipped before start of event
217
218 uint32_t len() const { return uint32_t(H.package_length)*2; }
219
220 void swapHeader();
221 void swapData();
222
223 // --------------------------------
224
225 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
226 {
227 if (fd_epoll<0)
228 init();
229 }
230 ~READ_STRUCT()
231 {
232 destroy();
233 }
234
235 void destroy();
236 bool create(sockaddr_in addr);
237 bool check(int, sockaddr_in addr);
238 bool read();
239
240};
241
#ifdef PRIORITY_QUEUE
// Ordering for std::priority_queue<READ_STRUCT*>: r1 ranks below r2 if it
// has more received-but-not-released bytes (totBytes-relBytes). The top of
// the queue is therefore the board with the smallest such backlog.
// The call operator is const, as required of a comparison object.
struct READ_STRUCTcomp
{
    bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2) const
    {
        const int64_t rel1 = r1->totBytes - r1->relBytes;
        const int64_t rel2 = r2->totBytes - r2->relBytes;
        return rel1 > rel2;
    }
};
#endif
253
254int READ_STRUCT::wait()
255{
256 // wait for something to do...
257 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
258 if (rc>=0)
259 return rc;
260
261 if (errno==EINTR) // timout or signal interruption
262 return 0;
263
264 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
265 return -1;
266}
267
268uint READ_STRUCT::activeSockets = 0;
269int READ_STRUCT::fd_epoll = -1;
270epoll_event READ_STRUCT::events[NBOARDS];
271
272void READ_STRUCT::init()
273{
274 if (fd_epoll>=0)
275 return;
276
277#ifdef USE_EPOLL
278 fd_epoll = epoll_create(NBOARDS);
279 if (fd_epoll<0)
280 {
281 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
282 return;
283 }
284#endif
285}
286
287void READ_STRUCT::close()
288{
289#ifdef USE_EPOLL
290 if (fd_epoll>=0 && ::close(fd_epoll)>0)
291 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
292#endif
293
294 fd_epoll = -1;
295}
296
297bool READ_STRUCT::create(sockaddr_in sockAddr)
298{
299 if (socket>=0)
300 return false;
301
302 const int port = ntohs(sockAddr.sin_port) + 1;
303
304 SockAddr.sin_family = sockAddr.sin_family;
305 SockAddr.sin_addr = sockAddr.sin_addr;
306 SockAddr.sin_port = htons(port);
307
308 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
309 {
310 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
311 socket = -1;
312 return false;
313 }
314
315 int optval = 1;
316 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
317 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
318
319 optval = 1;
320 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
321 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
322
323 optval = 10; //start after 10 seconds
324 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
325 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
326
327 optval = 10; //do every 10 seconds
328 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
329 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
330
331 optval = 2; //close after 2 unsuccessful tries
332 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
333 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
334
335 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
336
337 //connected = false;
338 activeSockets++;
339
340 return true;
341}
342
343void READ_STRUCT::destroy()
344{
345 if (socket<0)
346 return;
347
348#ifdef USE_EPOLL
349 // strictly speaking this should not be necessary
350 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
351 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
352#endif
353
354 if (::close(socket) > 0)
355 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
356 else
357 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
358
359 socket = -1;
360 connected = false;
361 activeSockets--;
362}
363
364bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
365{
366 // Continue in the most most likely case (performance)
367 //if (socket>=0 && sockDef!=0 && connected)
368 // return;
369 const int old = socket;
370
371 // socket open, but should not be open
372 if (socket>=0 && sockDef==0)
373 destroy();
374
375 // Socket closed, but should be open
376 if (socket<0 && sockDef!=0)
377 create(addr); //generate address and socket
378
379 const bool retval = old!=socket;
380
381 // Socket closed
382 if (socket<0)
383 return retval;
384
385 // Socket open and connected: Nothing to do
386 if (connected)
387 return retval;
388
389 //try to connect if not yet done
390 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
391 if (rc == -1)
392 return retval;
393
394 connected = true;
395
396 if (sockDef<0)
397 {
398 bufTyp = READ_STRUCT::kStream; // full data to be skipped
399 bufLen = MAX_LEN; // huge for skipping
400 }
401 else
402 {
403 bufTyp = READ_STRUCT::kHeader; // expect a header
404 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
405 }
406
407 bufPos = B; // no byte read so far
408 skip = 0; // start empty
409 totBytes = 0;
410 relBytes = 0;
411
412 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
413
414#ifdef USE_EPOLL
415 epoll_event ev;
416 ev.events = EPOLLIN;
417 ev.data.ptr = this; // user data (union: ev.ptr)
418 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
419 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
420#endif
421
422 return retval;
423}
424
425bool READ_STRUCT::read()
426{
427 if (!connected)
428 return false;
429
430 if (bufLen==0)
431 return true;
432
433 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
434 // recv failed
435 if (jrd<0)
436 {
437 // There was just nothing waiting
438 if (errno==EWOULDBLOCK || errno==EAGAIN)
439 return false;
440
441 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
442 return false;
443 }
444
445 // connection was closed ...
446 if (jrd==0)
447 {
448 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
449
450 destroy();//DestroySocket(rd[i]); //generate address and socket
451 return false;
452 }
453
454 totBytes += jrd;
455
456 // are we skipping this board ...
457 if (bufTyp==kStream)
458 return false;
459
460 if (bufPos==B)
461 gettimeofday(&time, NULL);
462
463 bufPos += jrd; //==> prepare for continuation
464 bufLen -= jrd;
465
466 // not yet all read
467 return bufLen==0;
468}
469
470void READ_STRUCT::swapHeader()
471{
472 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
473 S[2] = ntohs(S[2]); // version_no
474 S[3] = ntohs(S[3]); // PLLLCK
475 S[4] = ntohs(S[4]); // trigger_crc
476 S[5] = ntohs(S[5]); // trigger_type
477
478 I[3] = ntohl(I[3]); // trigger_id
479 I[4] = ntohl(I[4]); // fad_evt_counter
480 I[5] = ntohl(I[5]); // REFCLK_frequency
481
482 S[12] = ntohs(S[12]); // board id
483 S[13] = ntohs(S[13]); // adc_clock_phase_shift
484 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
485 S[15] = ntohs(S[15]); // trigger_generator_prescaler
486
487 I[10] = ntohl(I[10]); // runnumber;
488 I[11] = ntohl(I[11]); // time;
489
490 // Use back inserter??
491 for (int s=24; s<24+NTemp+NDAC; s++)
492 S[s] = ntohs(S[s]); // drs_temperature / dac
493}
494
495void READ_STRUCT::swapData()
496{
497 // swapEventHeaderBytes: End of the header. to channels now
498
499 int i = 36;
500 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
501 {
502 S[i+0] = ntohs(S[i+0]);//id
503 S[i+1] = ntohs(S[i+1]);//start_cell
504 S[i+2] = ntohs(S[i+2]);//roi
505 S[i+3] = ntohs(S[i+3]);//filling
506
507 i += 4+S[i+2];//skip the pixel data
508 }
509}
510
511// ==========================================================================
512
513bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
514{
515 int xjr = -1;
516 int xkr = -1;
517
518 //points to the very first roi
519 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
520
521 roi[0] = ntohs(rd.S[roiPtr]);
522
523 for (int jr = 0; jr < 9; jr++)
524 {
525 roi[jr] = ntohs(rd.S[roiPtr]);
526
527 if (roi[jr]>1024)
528 {
529 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
530 return false;
531 }
532
533 // Check that the roi of pixels jr are compatible with the one of pixel 0
534 if (jr!=8 && roi[jr]!=roi[0])
535 {
536 xjr = jr;
537 break;
538 }
539
540 // Check that the roi of all other DRS chips on boards are compatible
541 for (int kr = 1; kr < 4; kr++)
542 {
543 const int kroi = ntohs(rd.S[roiPtr]);
544 if (kroi != roi[jr])
545 {
546 xjr = jr;
547 xkr = kr;
548 break;
549 }
550 roiPtr += kroi+4;
551 }
552 }
553
554 if (xjr>=0)
555 {
556 if (xkr<0)
557 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
558 else
559 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
560
561 return false;
562 }
563
564 if (roi[8] < roi[0])
565 {
566 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
567 return false;
568 }
569
570 return true;
571}
572
573list<shared_ptr<EVT_CTRL2>> evtCtrl;
574
575shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
576{
577 /*
578 checkroi consistence
579 find existing entry
580 if no entry, try to allocate memory
581 if entry and memory, init event structure
582 */
583
584 uint16_t nRoi[9];
585 if (!checkRoiConsistency(rd, nRoi))
586 return shared_ptr<EVT_CTRL2>();
587
588 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
589 {
590 // A reference is enough because the evtCtrl holds the shared_ptr anyway
591 const shared_ptr<EVT_CTRL2> &evt = *it;
592
593 // If the run is different, go on searching.
594 // We cannot stop searching if a lower run-id is found as in
595 // the case of the events, because theoretically, there
596 // can be the same run on two different days.
597 if (rd.H.runnumber != evt->runNum)
598 continue;
599
600 // If the ID of the new event if higher than the last one stored
601 // in that run, we have to assign a new slot (leave the loop)
602 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
603 break;
604
605 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
606 continue;
607
608 // We have found an entry with the same runID and evtID
609 // Check if ROI is consistent
610 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
611 {
612 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
613 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
614 return shared_ptr<EVT_CTRL2>();
615 }
616
617 // It is maybe not likely, but the header of this board might have
618 // arrived earlier. (We could also update the run-info, but
619 // this should not make a difference here)
620 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
621 rd.time.tv_sec<evt->time.tv_sec)
622 evt->time = rd.time;
623
624 //everything seems fine so far ==> use this slot ....
625 return evt;
626 }
627
628 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
629 {
630 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
631 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
632 return shared_ptr<EVT_CTRL2>();
633 }
634
635 EVT_CTRL2 *evt = new EVT_CTRL2;
636
637 evt->time = rd.time;
638
639 evt->runNum = rd.H.runnumber;
640 evt->evNum = rd.H.fad_evt_counter;
641
642 evt->trgNum = rd.H.trigger_id;
643 evt->trgTyp = rd.H.trigger_type;
644
645 evt->nRoi = nRoi[0];
646 evt->nRoiTM = nRoi[8];
647
648 const bool newrun = actrun->runId != rd.H.runnumber;
649 if (newrun)
650 {
651 // Since we have started a new run, we know already when to close the
652 // previous run in terms of number of events
653 actrun->maxEvt = actrun->lastEvt;
654
655 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
656 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
657
658 // The new run is the active run now
659 actrun = make_shared<RUN_CTRL2>();
660
661 const time_t &tsec = evt->time.tv_sec;
662
663 actrun->openTime = tsec;
664 actrun->closeTime = tsec + 3600 * 24; // max time allowed
665 actrun->runId = rd.H.runnumber;
666 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
667 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
668
669 // Signal the fadctrl that a new run has been started
670 // Note this is the only place at which we can ensure that
671 // gotnewRun is called only once
672 gotNewRun(*actrun);
673 }
674
675 // Keep pointer to run of this event
676 evt->runCtrl = actrun;
677
678 // Increase the number of events we have started to receive in this run
679 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
680 actrun->lastEvt++;
681
682 // An event can be the first and the last, but not the last and the first.
683 // Therefore gotNewRun is called before runFinished.
684 // runFinished signals that the last event of a run was just received. Processing
685 // might still be ongoing, but we can start a new run.
686 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
687 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
688 if (!cond1 || !cond2)
689 runFinished();
690
691 // We don't mind here that this is not common to all events,
692 // because every coming event will fullfil the condition as well.
693 if (!cond1)
694 evt->closeRequest |= kRequestMaxEvtsReached;
695 if (!cond2)
696 evt->closeRequest |= kRequestMaxTimeReached;
697
698 // Secure access to evtCtrl against access in CloseRunFile
699 // This should be the last... otherwise we can run into threading issues
700 // if the event is accessed before it is fully initialized.
701 evtCtrl.emplace_back(evt);
702 return evtCtrl.back();
703}
704
705
706void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
707{
708 const int i = rBuf.sockId;
709
710 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
711
712 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
713
714 // consistency of ROIs have been checked already (is it all correct?)
715 const uint16_t &roi = rBuf.S[src+2];
716
717 // different sort in FAD board.....
718 EVENT *event = evt->fEvent;
719 for (int px = 0; px < 9; px++)
720 {
721 for (int drs = 0; drs < 4; drs++)
722 {
723 const int16_t pixC = rBuf.S[src+1]; // start-cell
724 const int16_t pixR = rBuf.S[src+2]; // roi
725 //here we should check if pixH is correct ....
726
727 const int pixS = i*36 + drs*9 + px;
728
729 event->StartPix[pixS] = pixC;
730
731 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
732
733 src += 4+pixR;
734
735 // Treatment for ch 9 (TM channel)
736 if (px != 8)
737 continue;
738
739 const int tmS = i*4 + drs;
740
741 //and we have additional TM info
742 if (pixR > roi)
743 {
744 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
745
746 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
747 }
748 else
749 {
750 event->StartTM[tmS] = -1;
751 }
752 }
753 }
754}
755
756// ==========================================================================
757
758uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
759{
760 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
761 evt->runNum, evt->evNum, evtCtrl.size(), txt);
762
763 uint64_t report = 0;
764
765 char str[1000];
766
767 int ik=0;
768 for (int ib=0; ib<NBOARDS; ib++)
769 {
770 if (ib%10==0)
771 str[ik++] = '|';
772
773 const int jb = evt->board[ib];
774 if (jb>=0) // data received from that board
775 {
776 str[ik++] = '0'+(jb%10);
777 continue;
778 }
779
780 // FIXME: This is not synchronous... it reports
781 // accoridng to the current connection status, not w.r.t. to the
782 // one when the event was taken.
783 if (gi_NumConnect[ib]==0) // board not connected
784 {
785 str[ik++] = 'x';
786 continue;
787 }
788
789 // data from this board lost
790 str[ik++] = '.';
791 report |= ((uint64_t)1)<<ib;
792 }
793
794 str[ik++] = '|';
795 str[ik] = 0;
796
797 factOut(MessageImp::kWarn, str);
798
799 return report;
800}
801
802// ==========================================================================
803// ==========================================================================
804
805bool proc1(const shared_ptr<EVT_CTRL2> &);
806
807Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
808
809bool proc1(const shared_ptr<EVT_CTRL2> &evt)
810{
811 applyCalib(*evt, processingQueue1.size());
812 return true;
813}
814
815// If this is not convenient anymore, it could be replaced by
816// a command queue, to which command+data is posted,
817// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
818bool writeEvt(const shared_ptr<EVT_CTRL2> &evt)
819{
820 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
821 RUN_CTRL2 &run = *evt->runCtrl;
822
823 // Is this a valid event or just an empty event to trigger run close?
824 // If this is not an empty event open the new run-file
825 // Empty events are there to trigger run-closing conditions
826 if (evt->valid())
827 {
828 // File not yet open
829 if (run.fileStat==kFileNotYetOpen)
830 {
831 // runOpen will close a previous run, if still open
832 if (!runOpen(*evt))
833 {
834 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
835 run.fileStat = kFileClosed;
836 return true;
837 }
838
839 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
840 run.fileStat = kFileOpen;
841 }
842
843 // Here we have a valid calibration and can go on with that.
844 // It is important that _all_ events are sent for calibration (except broken ones)
845 processingQueue1.post(evt);
846 }
847
848 // File already closed
849 if (run.fileStat==kFileClosed)
850 return true;
851
852 // If we will have a software trigger which prevents single events from writing,
853 // the logic of writing the stop time and the trigger counters need to be adapted.
854 // Currently it is just the values of the last valid event.
855 bool rc1 = true;
856 if (evt->valid())
857 {
858 rc1 = runWrite(*evt);
859 if (!rc1)
860 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
861 }
862
863 // File not open... no need to close or to check for close
864 // ... this is the case if CloseRunFile was called before any file was opened.
865 if (run.fileStat!=kFileOpen)
866 return true;
867
868 // File is not yet to be closed.
869 if (rc1 && evt->closeRequest==kRequestNone)
870 return true;
871
872 runClose(*evt);
873 run.fileStat = kFileClosed;
874
875 vector<string> reason;
876 if (evt->closeRequest&kRequestManual)
877 reason.emplace_back("close requested");
878 if (evt->closeRequest&kRequestTimeout)
879 reason.emplace_back("receive timeout");
880 if (evt->closeRequest&kRequestConnectionChange)
881 reason.emplace_back("connection changed");
882 if (evt->closeRequest&kRequestEventCheckFailed)
883 reason.emplace_back("event check failed");
884 if (evt->closeRequest&kRequestMaxTimeReached)
885 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
886 if (evt->closeRequest&kRequestMaxEvtsReached)
887 reason.push_back(to_string(run.maxEvt)+" evts reached");
888 if (!rc1)
889 reason.emplace_back("runWrite failed");
890
891 const string str = boost::algorithm::join(reason, ", ");
892 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
893
894 return true;
895}
896
897Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
898
899bool procEvt(const shared_ptr<EVT_CTRL2> &evt)
900{
901 RUN_CTRL2 &run = *evt->runCtrl;
902
903 bool check = true;
904 if (evt->valid())
905 {
906 EVENT *event = evt->fEvent;
907
908 // This is already done in initMemory()
909 //event->Roi = evt->runCtrl->roi0;
910 //event->RoiTM = evt->runCtrl->roi8;
911 //event->EventNum = evt->evNum;
912 //event->TriggerNum = evt->trgNum;
913 //event->TriggerType = evt->trgTyp;
914
915 event->NumBoards = evt->nBoard;
916
917 event->PCTime = evt->time.tv_sec;
918 event->PCUsec = evt->time.tv_usec;
919
920 for (int ib=0; ib<NBOARDS; ib++)
921 event->BoardTime[ib] = evt->FADhead[ib].time;
922
923 check = eventCheck(*evt);
924
925 // If the event is valid, increase the trigger counter accordingly
926 if (check)
927 {
928 // Physics trigger
929 if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
930 run.triggerCounter[0]++;
931 // Pure pedestal trigger
932 else if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM))
933 run.triggerCounter[1]++;
934 // external light pulser trigger
935 else if (evt->trgTyp & FAD::EventHeader::kLPext)
936 run.triggerCounter[2]++;
937 // time calibration triggers
938 else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal))
939 run.triggerCounter[3]++;
940 // internal light pulser trigger
941 else if (evt->trgTyp & FAD::EventHeader::kLPint)
942 run.triggerCounter[4]++;
943 // external trigger input 1
944 else if (evt->trgTyp & FAD::EventHeader::kExt1)
945 run.triggerCounter[5]++;
946 // external trigger input 2
947 else if (evt->trgTyp & FAD::EventHeader::kExt2)
948 run.triggerCounter[6]++;
949 // other triggers
950 else
951 run.triggerCounter[7]++;
952 }
953 }
954
955 // If this is an invalid event, the current triggerCounter needs to be copied
956 // because runClose will use that one to update the TRIGGER_COUNTER.
957 // When closing the file, the trigger counter of the last successfully
958 // written event is used.
959 evt->triggerCounter = run.triggerCounter;
960
961 // If event check has failed, skip the event and post a close request instead.
962 // Otherwise, if file is open post the event for being written
963 if (!check)
964 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
965 else
966 secondaryQueue.post(evt);
967
968 return true;
969}
970
971// ==========================================================================
972// ==========================================================================
973
974/*
975 task 1-4:
976
977 lock1()-lock4();
978 while (1)
979 {
980 wait for signal [lockN]; // unlocked
981
982 while (n!=10)
983 wait sockets;
984 read;
985
986 lockM();
987 finished[n] = true;
988 signal(mainloop);
989 unlockM();
990 }
991
992
993 mainloop:
994
995 while (1)
996 {
997 lockM();
998 while (!finished[0] || !finished[1] ...)
999 wait for signal [lockM]; // unlocked... signals can be sent
1000 finished[0-1] = false;
1001 unlockM()
1002
1003 copy data to queue // locked
1004
1005 lockN[0-3];
1006 signalN[0-3];
1007 unlockN[0-3];
1008 }
1009
1010
1011 */
1012
1013/*
1014 while (g_reset)
1015 {
1016 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
1017
1018 // Check that all sockets are connected
1019
1020 for (int i=0; i<40; i++)
1021 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
1022 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1023
1024 while (g_reset)
1025 {
1026 if (READ_STRUCT::wait()<0)
1027 break;
1028
1029 if (rc_epoll==0)
1030 break;
1031
1032 for (int jj=0; jj<rc_epoll; jj++)
1033 {
1034 READ_STRUCT *rs = READ_STRUCT::get(jj);
1035 if (!rs->connected)
1036 continue;
1037
1038 const bool rc_read = rs->read();
1039 if (!rc_read)
1040 continue;
1041
1042 if (rs->bufTyp==READ_STRUCT::kHeader)
1043 {
1044 [...]
1045 }
1046
1047 [...]
1048
1049 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1050 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1051 }
1052
1053 if (once_a_second)
1054 {
1055 if (evt==timeout)
1056 break;
1057 }
1058 }
1059
1060 if (evt.nBoards==actBoards)
1061 primaryQueue.post(evt);
1062 }
1063*/
1064
1065Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1066
1067// This corresponds more or less to fFile... should we merge both?
1068shared_ptr<RUN_CTRL2> actrun;
1069
1070void CloseRunFile()
1071{
1072 // Currently we need actrun here, to be able to set kFileClosed.
1073 // Apart from that we have to ensure that there is an open file at all
1074 // which we can close.
1075 // Submission to the primary queue ensures that the event
1076 // is placed at the right place in the processing chain.
1077 // (Corresponds to the correct run)
1078 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1079}
1080
1081bool mainloop(READ_STRUCT *rd)
1082{
1083 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1084
1085 primaryQueue.start();
1086 secondaryQueue.start();
1087 processingQueue1.start();;
1088
1089 actrun = make_shared<RUN_CTRL2>();
1090
1091 //time in seconds
1092 time_t gi_SecTime = time(NULL)-1;
1093
1094 //loop until global variable g_runStat claims stop
1095 g_reset = 0;
1096 while (g_reset == 0)
1097 {
1098#ifdef USE_POLL
1099 int pp[40];
1100 int nn = 0;
1101 pollfd fds[40];
1102 for (int i=0; i<40; i++)
1103 {
1104 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1105 {
1106 fds[nn].fd = rd[i].socket;
1107 fds[nn].events = POLLIN;
1108 pp[nn] = i;
1109 nn++;
1110 }
1111 }
1112
1113 const int rc_epoll = poll(fds, nn, 100);
1114 if (rc_epoll<0)
1115 break;
1116#endif
1117
1118#ifdef USE_SELECT
1119 fd_set readfs;
1120 FD_ZERO(&readfs);
1121 int nfsd = 0;
1122 for (int i=0; i<NBOARDS; i++)
1123 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1124 {
1125 FD_SET(rd[i].socket, &readfs);
1126 if (rd[i].socket>nfsd)
1127 nfsd = rd[i].socket;
1128 }
1129
1130 timeval tv;
1131 tv.tv_sec = 0;
1132 tv.tv_usec = 100000;
1133 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1134 // 0: timeout
1135 // -1: error
1136 if (rc_select<0)
1137 {
1138 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1139 continue;
1140 }
1141#endif
1142
1143#ifdef USE_EPOLL
1144 const int rc_epoll = READ_STRUCT::wait();
1145 if (rc_epoll<0)
1146 break;
1147#endif
1148
1149#ifdef PRIORITY_QUEUE
1150 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1151
1152 for (int i=0; i<NBOARDS; i++)
1153 if (rd[i].connected)
1154 prio.push(rd+i);
1155
1156 if (!prio.empty()) do
1157#endif
1158
1159
1160#ifdef USE_POLL
1161 for (int jj=0; jj<nn; jj++)
1162#endif
1163#ifdef USE_EPOLL
1164 for (int jj=0; jj<rc_epoll; jj++)
1165#endif
1166#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1167 for (int jj=0; jj<NBOARDS; jj++)
1168#endif
1169 {
1170#ifdef PRIORITY_QUEUE
1171 READ_STRUCT *rs = prio.top();
1172#endif
1173#ifdef USE_SELECT
1174 if (!FD_ISSET(rs->socket, &readfs))
1175 continue;
1176#endif
1177
1178#ifdef USE_POLL
1179 if ((fds[jj].revents&POLLIN)==0)
1180 continue;
1181#endif
1182
1183#ifdef USE_EPOLL
1184 // FIXME: How to get i?
1185 READ_STRUCT *rs = READ_STRUCT::get(jj);
1186#endif
1187
1188#ifdef USE_POLL
1189 // FIXME: How to get i?
1190 READ_STRUCT *rs = &rd[pp[jj]];
1191#endif
1192
1193#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1194 const int i = (jj%4)*10 + (jj/4);
1195 READ_STRUCT *rs = &rd[i];
1196#endif
1197
1198#ifdef COMPLETE_EVENTS
1199 if (rs->bufTyp==READ_STRUCT::kWait)
1200 continue;
1201#endif
1202
1203 // ==================================================================
1204
1205 const bool rc_read = rs->read();
1206
1207 // Connect might have gotten closed during read
1208 gi_NumConnect[rs->sockId] = rs->connected;
1209 gj.numConn[rs->sockId] = rs->connected;
1210
1211 // Read either failed or disconnected, or the buffer is not yet full
1212 if (!rc_read)
1213 continue;
1214
1215 // ==================================================================
1216
1217 if (rs->bufTyp==READ_STRUCT::kHeader)
1218 {
1219 //check if startflag correct; else shift block ....
1220 // FIXME: This is not enough... this combination of
1221 // bytes can be anywhere... at least the end bytes
1222 // must be checked somewhere, too.
1223 uint k;
1224 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1225 {
1226 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1227 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1228 break;
1229 }
1230 rs->skip += k;
1231
1232 //no start of header found
1233 if (k==sizeof(PEVNT_HEADER)-1)
1234 {
1235 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1236 rs->bufPos = rs->B+1;
1237 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1238 continue;
1239 }
1240
1241 if (k > 0)
1242 {
1243 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1244
1245 rs->bufPos -= k;
1246 rs->bufLen += k;
1247
1248 continue; // We need to read more (bufLen>0)
1249 }
1250
1251 if (rs->skip>0)
1252 {
1253 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1254 rs->skip = 0;
1255 }
1256
1257 // Swap the header entries from network to host order
1258 rs->swapHeader();
1259
1260 rs->bufTyp = READ_STRUCT::kData;
1261 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1262
1263 debugHead(rs->B); // i and fadBoard not used
1264
1265 continue;
1266 }
1267
1268 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1269 if (end != 0xfe04)
1270 {
1271 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1272 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1273
1274 // ready to read next header
1275 rs->bufTyp = READ_STRUCT::kHeader;
1276 rs->bufLen = sizeof(PEVNT_HEADER);
1277 rs->bufPos = rs->B;
1278 // FIXME: What to do with the validity flag?
1279 continue;
1280 }
1281
1282 // get index into mBuffer for this event (create if needed)
1283 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1284
1285 // We have a valid entry, but no memory has yet been allocated
1286 if (evt && !evt->initMemory())
1287 {
1288 const time_t tm = time(NULL);
1289 if (evt->runCtrl->reportMem==tm)
1290 continue;
1291
1292 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1293 evt->runCtrl->reportMem = tm;
1294 continue;
1295 }
1296
1297 // ready to read next header
1298 rs->bufTyp = READ_STRUCT::kHeader;
1299 rs->bufLen = sizeof(PEVNT_HEADER);
1300 rs->bufPos = rs->B;
1301
1302 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1303 if (!evt)
1304 continue;
1305
1306 // This should never happen
1307 if (evt->board[rs->sockId] != -1)
1308 {
1309 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1310 evt->evNum, rs->sockId, rs->sockId, rs->len());
1311 // FIXME: What to do with the validity flag?
1312 continue; // Continue reading next header
1313 }
1314
1315 // Swap the data entries (board headers) from network to host order
1316 rs->swapData();
1317
1318 // Copy data from rd[i] to mBuffer[evID]
1319 copyData(*rs, evt.get());
1320
1321#ifdef COMPLETE_EVENTS
1322 // Do not read anmymore from this board until the whole event has been received
1323 rs->bufTyp = READ_STRUCT::kWait;
1324#endif
1325 // now we have stored a new board contents into Event structure
1326 evt->board[rs->sockId] = rs->sockId;
1327 evt->header = evt->FADhead+rs->sockId;
1328 evt->nBoard++;
1329
1330#ifdef COMPLETE_EPOLL
1331 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1332 {
1333 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1334 break;
1335 }
1336#endif
1337 // event not yet complete
1338 if (evt->nBoard < READ_STRUCT::activeSockets)
1339 continue;
1340
1341 // All previous events are now flagged as incomplete ("expired")
1342 // and will be removed. (This is a bit tricky, because pop_front()
1343 // would invalidate the current iterator if not done _after_ the increment)
1344 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1345 {
1346 const bool found = it->get()==evt.get();
1347 if (!found)
1348 reportIncomplete(*it, "expired");
1349 else
1350 primaryQueue.post(evt);
1351
1352 // package_len is 0 if nothing was received.
1353 for (int ib=0; ib<40; ib++)
1354 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1355
1356 it++;
1357 evtCtrl.pop_front();
1358
1359 // We reached the current event, so we are done
1360 if (found)
1361 break;
1362 }
1363
1364#ifdef COMPLETE_EPOLL
1365 for (int j=0; j<40; j++)
1366 {
1367 epoll_event ev;
1368 ev.events = EPOLLIN;
1369 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1370 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1371 {
1372 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1373 return;
1374 }
1375 }
1376#endif
1377
1378#ifdef COMPLETE_EVENTS
1379 for (int j=0; j<40; j++)
1380 {
1381 //if (rs->bufTyp==READ_STRUCT::kWait)
1382 {
1383 rs->bufTyp = READ_STRUCT::kHeader;
1384 rs->bufLen = sizeof(PEVNT_HEADER);
1385 rs->bufPos = rs->B;
1386 }
1387 }
1388#endif
1389 } // end for loop over all sockets
1390#ifdef PRIORITY_QUEUE
1391 while (0); // convert continue into break ;)
1392#endif
1393
1394 // ==================================================================
1395
1396 const time_t actTime = time(NULL);
1397 if (actTime == gi_SecTime)
1398 {
1399#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1400 if (evtCtrl.empty())
1401 usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1402#endif
1403 continue;
1404 }
1405 gi_SecTime = actTime;
1406
1407 // ==================================================================
1408 //loop over all active events and flag those older than read-timeout
1409 //delete those that are written to disk ....
1410
1411 // This could be improved having the pointer which separates the queue with
1412 // the incomplete events from the queue with the complete events
1413 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1414 {
1415 // A reference is enough because the shared_ptr is hold by the evtCtrl
1416 const shared_ptr<EVT_CTRL2> &evt = *it;
1417
1418 // If there was no memory, it is possible that no header has been processed yet
1419 // but the event already exists. What we want to check here are only
1420 // if from partially complete events a baord is missing, not
1421 // whether the whole event is missing
1422 if (!evt->valid())
1423 continue;
1424
1425 // The first event is the oldest. If the first event within the
1426 // timeout window was received, we can stop searching further.
1427 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1428 break;
1429
1430 // This will result in the emission of a dim service.
1431 // It doesn't matter if that takes comparably long,
1432 // because we have to stop the run anyway.
1433 const uint64_t rep = reportIncomplete(evt, "timeout");
1434 factReportIncomplete(rep);
1435
1436 // package_len is 0 when nothing was received from this board
1437 for (int ib=0; ib<40; ib++)
1438 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1439
1440 it++;
1441 evtCtrl.pop_front();
1442 }
1443
1444 // =================================================================
1445
1446 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1447 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1448 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1449 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1450 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1451 gj.usdMem = Memory::max_inuse;
1452 gj.totMem = Memory::allocated;
1453 gj.maxMem = g_maxMem;
1454
1455 gj.deltaT = 1000; // temporary, must be improved
1456
1457 bool changed = false;
1458
1459 static vector<uint64_t> store(NBOARDS);
1460
1461 for (int ib=0; ib<NBOARDS; ib++)
1462 {
1463 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1464 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1465
1466 store[ib] = rd[ib].totBytes;
1467
1468 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1469 changed = true;
1470
1471 gi_NumConnect[ib] = rd[ib].connected;
1472 gj.numConn[ib] = rd[ib].connected;
1473 }
1474
1475 factStat(gj);
1476
1477 Memory::max_inuse = 0;
1478
1479 // =================================================================
1480
1481 // This is a fake event to trigger possible run-closing conditions once a second
1482 // FIXME: This is not yet ideal because a file would never be closed
1483 // if a new file has been started and no events of the new file
1484 // have been received yet
1485 int request = kRequestNone;
1486
1487 // If nothing was received for more than 5min, close file
1488 if (actTime-actrun->lastTime>300)
1489 request |= kRequestTimeout;
1490
1491 // If connection status has changed
1492 if (changed)
1493 request |= kRequestConnectionChange;
1494
1495 if (request!=kRequestNone)
1496 runFinished();
1497
1498 if (actrun->fileStat==kFileOpen)
1499 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1500 }
1501
1502 // 1: Stop, wait for event to get processed
1503 // 2: Stop, finish immediately
1504 // 101: Restart, wait for events to get processed
1505 // 101: Restart, finish immediately
1506 //
1507 const int gi_reset = g_reset;
1508
1509 const bool abort = gi_reset%100==2;
1510
1511 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1512
1513 primaryQueue.wait(abort);
1514 secondaryQueue.wait(abort);
1515 processingQueue1.wait(abort);
1516
1517 // Here we also destroy all runCtrl structures and hence close all open files
1518 evtCtrl.clear();
1519 actrun.reset();
1520
1521 factPrintf(MessageImp::kInfo, "Exit read Process...");
1522 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1523
1524 factStat(gj);
1525
1526 return gi_reset>=100;
1527}
1528
1529// ==========================================================================
1530// ==========================================================================
1531
1532void StartEvtBuild()
1533{
1534 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1535
1536 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1537
1538 memset(&gj, 0, sizeof(GUI_STAT));
1539
1540 gj.usdMem = Memory::inuse;
1541 gj.totMem = Memory::allocated;
1542 gj.maxMem = g_maxMem;
1543
1544
1545 READ_STRUCT rd[NBOARDS];
1546
1547 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1548 for (int i=0; i<NBOARDS; i++)
1549 rd[i].sockId = i;
1550
1551 while (mainloop(rd));
1552
1553 //must close all open sockets ...
1554 factPrintf(MessageImp::kInfo, "Close all sockets...");
1555
1556 READ_STRUCT::close();
1557
1558 // Now all sockets get closed. This is not reflected in gi_NumConnect
1559 // The current workaround is to count all sockets as closed when the thread is not running
1560}
Note: See TracBrowser for help on using the repository browser.