source: trunk/FACT++/src/EventBuilder.cc@ 16736

Last change on this file since 16736 was 16691, checked in by tbretz, 11 years ago
Nagle switched off... no idea if this makes any sense, but it could theoretically improve the speed with which the fad boards know that we have correctly received the data.
File size: 44.2 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10
11#include <boost/algorithm/string/join.hpp>
12
13#include "queue.h"
14
15#include "MessageImp.h"
16#include "EventBuilder.h"
17
18using namespace std;
19
20#define MIN_LEN 32 // min #bytes needed to interpret FADheader
21#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
22
23//#define COMPLETE_EVENTS
24//#define USE_POLL
25//#define USE_EPOLL
26//#define USE_SELECT
27//#define COMPLETE_EPOLL
28//#define PRIORITY_QUEUE
29
30// Reading only 1024: 13: 77Hz, 87%
31// Reading only 1024: 12: 78Hz, 46%
32// Reading only 300: 4: 250Hz, 92%
33// Reading only 300: 3: 258Hz, 40%
34
35// Reading only four threads 1024: 13: 77Hz, 60%
36// Reading only four threads 1024: 12: 78Hz, 46%
37// Reading only four threads 300: 4: 250Hz, 92%
38// Reading only four threads 300: 3: 258Hz, 40%
39
40// Default 300: 4: 249Hz, 92%
41// Default 300: 3: 261Hz, 40%
42// Default 1024: 13: 76Hz, 93%
43// Default 1024: 12: 79Hz, 46%
44
45// Poll [selected] 1024: 13: 63Hz, 45%
46// Poll [selected] 1024: 14: 63Hz, 63%
47// Poll [selected] 1024: 15: 64Hz, 80%
48// Poll [selected] 300: 4: 230Hz, 47%
49// Poll [selected] 300: 3: 200Hz, 94%
50
51// Poll [all] 1024: 13: 65Hz, 47%
52// Poll [all] 1024: 14: 64Hz, 59%
53// Poll [all] 1024: 15: 62Hz, 67%
54// Poll [all] 300: 4: 230Hz, 47%
55// Poll [all] 300: 3: 230Hz, 35%
56
57// ==========================================================================
58
59bool runOpen(const EVT_CTRL2 &evt);
60bool runWrite(const EVT_CTRL2 &evt);
61void runClose(RUN_CTRL2 &run);
62void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
63void factOut(int severity, const char *message);
64void factReportIncomplete (uint64_t rep);
65void gotNewRun(RUN_CTRL2 &run);
66void runFinished();
67void factStat(const GUI_STAT &gj);
68bool eventCheck(const EVT_CTRL2 &evt);
69void debugHead(void *buf);
70
71// ==========================================================================
72
// Stop flag of the event builder: mainloop() resets it to 0 on entry and
// keeps iterating only for as long as it is still 0.
73int g_reset;
74
// Upper limit for the memory managed by the Memory namespace; compared there
// against the running total of allocated buffer bytes.
75size_t g_maxMem; //maximum memory allowed for buffer
76
77uint16_t g_evtTimeout; // timeout (sec) for one event
78
// One entry per board.
79FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
80
// Per-board connection flag: mainloop() stores READ_STRUCT::connected here,
// reportIncomplete() treats 0 as "board not connected".
81uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
82
// Status structure; mainloop() updates gj.numConn per board.
// NOTE(review): presumably what is handed to factStat() - confirm against the caller.
83GUI_STAT gj;
84
85// ==========================================================================
86
87namespace Memory
88{
89 uint64_t inuse = 0;
90 uint64_t allocated = 0;
91
92 uint64_t max_inuse = 0;
93
94 std::mutex mtx;
95
96 std::forward_list<void*> memory;
97
98 void *malloc()
99 {
100 // No free slot available, next alloc would exceed max memory
101 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
102 return NULL;
103
104 // We will return this amount of memory
105 // This is not 100% thread safe, but it is not a super accurate measure anyway
106 inuse += MAX_TOT_MEM;
107 if (inuse>max_inuse)
108 max_inuse = inuse;
109
110 if (memory.empty())
111 {
112 // No free slot available, allocate a new one
113 allocated += MAX_TOT_MEM;
114 return new char[MAX_TOT_MEM];
115 }
116
117 // Get the next free slot from the stack and return it
118 const std::lock_guard<std::mutex> lock(mtx);
119
120 void *mem = memory.front();
121 memory.pop_front();
122 return mem;
123 };
124
125 void free(void *mem)
126 {
127 if (!mem)
128 return;
129
130 // Decrease the amont of memory in use accordingly
131 inuse -= MAX_TOT_MEM;
132
133 // If the maximum memory has changed, we might be over the limit.
134 // In this case: free a slot
135 if (allocated>g_maxMem)
136 {
137 delete [] (char*)mem;
138 allocated -= MAX_TOT_MEM;
139 return;
140 }
141
142 const std::lock_guard<std::mutex> lock(mtx);
143 memory.push_front(mem);
144 }
145
146};
147
148// ==========================================================================
149
150void factPrintf(int severity, const char *fmt, ...)
151{
152 char str[1000];
153
154 va_list ap;
155 va_start(ap, fmt);
156 vsnprintf(str, 1000, fmt, ap);
157 va_end(ap);
158
159 factOut(severity, str);
160}
161
162// ==========================================================================
163
164struct READ_STRUCT
165{
166 enum buftyp_t
167 {
168 kStream,
169 kHeader,
170 kData,
171#ifdef COMPLETE_EVENTS
172 kWait
173#endif
174 };
175
176 // ---------- connection ----------
177
178 static uint activeSockets;
179
180 int sockId; // socket id (board number)
181 int socket; // socket handle
182 bool connected; // is this socket connected?
183
184 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
185
186 // ------------ epoll -------------
187
188 static int fd_epoll;
189 static epoll_event events[NBOARDS];
190
191 static void init();
192 static void close();
193 static int wait();
194 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
195
196 // ------------ buffer ------------
197
198 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
199
200 uint32_t bufLen; // number of bytes left to read
201 uint8_t *bufPos; // next byte to read to the buffer next
202
203 union
204 {
205 uint8_t B[MAX_LEN];
206 uint16_t S[MAX_LEN / 2];
207 uint32_t I[MAX_LEN / 4];
208 uint64_t L[MAX_LEN / 8];
209 PEVNT_HEADER H;
210 };
211
212 timeval time;
213 uint64_t totBytes; // total received bytes
214 uint64_t relBytes; // total released bytes
215 uint32_t skip; // number of bytes skipped before start of event
216
217 uint32_t len() const { return uint32_t(H.package_length)*2; }
218
219 void swapHeader();
220 void swapData();
221
222 // --------------------------------
223
224 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
225 {
226 if (fd_epoll<0)
227 init();
228 }
229 ~READ_STRUCT()
230 {
231 destroy();
232 }
233
234 void destroy();
235 bool create(sockaddr_in addr);
236 bool check(int, sockaddr_in addr);
237 bool read();
238
239};
240
241#ifdef PRIORITY_QUEUE
242struct READ_STRUCTcomp
243{
244 bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
245 {
246 const int64_t rel1 = r1->totBytes - r1->relBytes;
247 const int64_t rel2 = r2->totBytes - r2->relBytes;
248 return rel1 > rel2;
249 }
250};
251#endif
252
253int READ_STRUCT::wait()
254{
255 // wait for something to do...
256 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
257 if (rc>=0)
258 return rc;
259
260 if (errno==EINTR) // timout or signal interruption
261 return 0;
262
263 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
264 return -1;
265}
266
267uint READ_STRUCT::activeSockets = 0;
268int READ_STRUCT::fd_epoll = -1;
269epoll_event READ_STRUCT::events[NBOARDS];
270
271void READ_STRUCT::init()
272{
273 if (fd_epoll>=0)
274 return;
275
276#ifdef USE_EPOLL
277 fd_epoll = epoll_create(NBOARDS);
278 if (fd_epoll<0)
279 {
280 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
281 return;
282 }
283#endif
284}
285
286void READ_STRUCT::close()
287{
288#ifdef USE_EPOLL
289 if (fd_epoll>=0 && ::close(fd_epoll)>0)
290 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
291#endif
292
293 fd_epoll = -1;
294}
295
296bool READ_STRUCT::create(sockaddr_in sockAddr)
297{
298 if (socket>=0)
299 return false;
300
301 const int port = ntohs(sockAddr.sin_port) + 1;
302
303 SockAddr.sin_family = sockAddr.sin_family;
304 SockAddr.sin_addr = sockAddr.sin_addr;
305 SockAddr.sin_port = htons(port);
306
307 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
308 {
309 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
310 socket = -1;
311 return false;
312 }
313
314 int optval = 1;
315 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
316 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
317
318 optval = 1;
319 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
320 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
321
322 optval = 10; //start after 10 seconds
323 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
324 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
325
326 optval = 10; //do every 10 seconds
327 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
328 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
329
330 optval = 2; //close after 2 unsuccessful tries
331 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
332 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
333
334 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
335
336 //connected = false;
337 activeSockets++;
338
339 return true;
340}
341
342void READ_STRUCT::destroy()
343{
344 if (socket<0)
345 return;
346
347#ifdef USE_EPOLL
348 // strictly speaking this should not be necessary
349 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
350 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
351#endif
352
353 if (::close(socket) > 0)
354 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
355 else
356 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
357
358 socket = -1;
359 connected = false;
360 activeSockets--;
361}
362
363bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
364{
365 // Continue in the most most likely case (performance)
366 //if (socket>=0 && sockDef!=0 && connected)
367 // return;
368 const int old = socket;
369
370 // socket open, but should not be open
371 if (socket>=0 && sockDef==0)
372 destroy();
373
374 // Socket closed, but should be open
375 if (socket<0 && sockDef!=0)
376 create(addr); //generate address and socket
377
378 const bool retval = old!=socket;
379
380 // Socket closed
381 if (socket<0)
382 return retval;
383
384 // Socket open and connected: Nothing to do
385 if (connected)
386 return retval;
387
388 //try to connect if not yet done
389 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
390 if (rc == -1)
391 return retval;
392
393 connected = true;
394
395 if (sockDef<0)
396 {
397 bufTyp = READ_STRUCT::kStream; // full data to be skipped
398 bufLen = MAX_LEN; // huge for skipping
399 }
400 else
401 {
402 bufTyp = READ_STRUCT::kHeader; // expect a header
403 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
404 }
405
406 bufPos = B; // no byte read so far
407 skip = 0; // start empty
408 totBytes = 0;
409 relBytes = 0;
410
411 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
412
413#ifdef USE_EPOLL
414 epoll_event ev;
415 ev.events = EPOLLIN;
416 ev.data.ptr = this; // user data (union: ev.ptr)
417 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
418 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
419#endif
420
421 return retval;
422}
423
424bool READ_STRUCT::read()
425{
426 if (!connected)
427 return false;
428
429 if (bufLen==0)
430 return true;
431
432 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
433 // recv failed
434 if (jrd<0)
435 {
436 // There was just nothing waiting
437 if (errno==EWOULDBLOCK || errno==EAGAIN)
438 return false;
439
440 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
441 return false;
442 }
443
444 // connection was closed ...
445 if (jrd==0)
446 {
447 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
448
449 destroy();//DestroySocket(rd[i]); //generate address and socket
450 return false;
451 }
452
453 totBytes += jrd;
454
455 // are we skipping this board ...
456 if (bufTyp==kStream)
457 return false;
458
459 if (bufPos==B)
460 gettimeofday(&time, NULL);
461
462 bufPos += jrd; //==> prepare for continuation
463 bufLen -= jrd;
464
465 // not yet all read
466 return bufLen==0;
467}
468
469void READ_STRUCT::swapHeader()
470{
471 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
472 S[2] = ntohs(S[2]); // version_no
473 S[3] = ntohs(S[3]); // PLLLCK
474 S[4] = ntohs(S[4]); // trigger_crc
475 S[5] = ntohs(S[5]); // trigger_type
476
477 I[3] = ntohl(I[3]); // trigger_id
478 I[4] = ntohl(I[4]); // fad_evt_counter
479 I[5] = ntohl(I[5]); // REFCLK_frequency
480
481 S[12] = ntohs(S[12]); // board id
482 S[13] = ntohs(S[13]); // adc_clock_phase_shift
483 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
484 S[15] = ntohs(S[15]); // trigger_generator_prescaler
485
486 I[10] = ntohl(I[10]); // runnumber;
487 I[11] = ntohl(I[11]); // time;
488
489 // Use back inserter??
490 for (int s=24; s<24+NTemp+NDAC; s++)
491 S[s] = ntohs(S[s]); // drs_temperature / dac
492}
493
494void READ_STRUCT::swapData()
495{
496 // swapEventHeaderBytes: End of the header. to channels now
497
498 int i = 36;
499 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
500 {
501 S[i+0] = ntohs(S[i+0]);//id
502 S[i+1] = ntohs(S[i+1]);//start_cell
503 S[i+2] = ntohs(S[i+2]);//roi
504 S[i+3] = ntohs(S[i+3]);//filling
505
506 i += 4+S[i+2];//skip the pixel data
507 }
508}
509
510// ==========================================================================
511
512bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
513{
514 int xjr = -1;
515 int xkr = -1;
516
517 //points to the very first roi
518 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
519
520 roi[0] = ntohs(rd.S[roiPtr]);
521
522 for (int jr = 0; jr < 9; jr++)
523 {
524 roi[jr] = ntohs(rd.S[roiPtr]);
525
526 if (roi[jr]>1024)
527 {
528 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
529 return false;
530 }
531
532 // Check that the roi of pixels jr are compatible with the one of pixel 0
533 if (jr!=8 && roi[jr]!=roi[0])
534 {
535 xjr = jr;
536 break;
537 }
538
539 // Check that the roi of all other DRS chips on boards are compatible
540 for (int kr = 1; kr < 4; kr++)
541 {
542 const int kroi = ntohs(rd.S[roiPtr]);
543 if (kroi != roi[jr])
544 {
545 xjr = jr;
546 xkr = kr;
547 break;
548 }
549 roiPtr += kroi+4;
550 }
551 }
552
553 if (xjr>=0)
554 {
555 if (xkr<0)
556 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
557 else
558 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
559
560 return false;
561 }
562
563 if (roi[8] < roi[0])
564 {
565 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
566 return false;
567 }
568
569 return true;
570}
571
572list<shared_ptr<EVT_CTRL2>> evtCtrl;
573
574shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
575{
576 /*
577 checkroi consistence
578 find existing entry
579 if no entry, try to allocate memory
580 if entry and memory, init event structure
581 */
582
583 uint16_t nRoi[9];
584 if (!checkRoiConsistency(rd, nRoi))
585 return shared_ptr<EVT_CTRL2>();
586
587 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
588 {
589 // A reference is enough because the evtCtrl holds the shared_ptr anyway
590 const shared_ptr<EVT_CTRL2> &evt = *it;
591
592 // If the run is different, go on searching.
593 // We cannot stop searching if a lower run-id is found as in
594 // the case of the events, because theoretically, there
595 // can be the same run on two different days.
596 if (rd.H.runnumber != evt->runNum)
597 continue;
598
599 // If the ID of the new event if higher than the last one stored
600 // in that run, we have to assign a new slot (leave the loop)
601 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
602 break;
603
604 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
605 continue;
606
607 // We have found an entry with the same runID and evtID
608 // Check if ROI is consistent
609 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
610 {
611 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
612 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
613 return shared_ptr<EVT_CTRL2>();
614 }
615
616 // It is maybe not likely, but the header of this board might have
617 // arrived earlier. (We could also update the run-info, but
618 // this should not make a difference here)
619 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
620 rd.time.tv_sec<evt->time.tv_sec)
621 evt->time = rd.time;
622
623 //everything seems fine so far ==> use this slot ....
624 return evt;
625 }
626
627 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
628 {
629 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
630 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
631 return shared_ptr<EVT_CTRL2>();
632 }
633
634 EVT_CTRL2 *evt = new EVT_CTRL2;
635
636 evt->time = rd.time;
637
638 evt->runNum = rd.H.runnumber;
639 evt->evNum = rd.H.fad_evt_counter;
640
641 evt->trgNum = rd.H.trigger_id;
642 evt->trgTyp = rd.H.trigger_type;
643
644 evt->nRoi = nRoi[0];
645 evt->nRoiTM = nRoi[8];
646
647 const bool newrun = actrun->runId != rd.H.runnumber;
648 if (newrun)
649 {
650 // Since we have started a new run, we know already when to close the
651 // previous run in terms of number of events
652 actrun->maxEvt = actrun->lastEvt;
653
654 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
655 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
656
657 // The new run is the active run now
658 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
659
660 const time_t &tsec = evt->time.tv_sec;
661
662 actrun->openTime = tsec;
663 actrun->closeTime = tsec + 3600 * 24; // max time allowed
664 actrun->runId = rd.H.runnumber;
665 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
666 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
667
668 // Signal the fadctrl that a new run has been started
669 // Note this is the only place at which we can ensure that
670 // gotnewRun is called only once
671 gotNewRun(*actrun);
672 }
673
674 // Keep pointer to run of this event
675 evt->runCtrl = actrun;
676
677 // Increase the number of events we have started to receive in this run
678 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
679 actrun->lastEvt++;
680
681 // An event can be the first and the last, but not the last and the first.
682 // Therefore gotNewRun is called before runFinished.
683 // runFinished signals that the last event of a run was just received. Processing
684 // might still be ongoing, but we can start a new run.
685 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
686 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
687 if (!cond1 || !cond2)
688 runFinished();
689
690 // We don't mind here that this is not common to all events,
691 // because every coming event will fullfil the condition as well.
692 if (!cond1)
693 evt->closeRequest |= kRequestMaxEvtsReached;
694 if (!cond2)
695 evt->closeRequest |= kRequestMaxTimeReached;
696
697 // Secure access to evtCtrl against access in CloseRunFile
698 // This should be the last... otherwise we can run into threading issues
699 // if the event is accessed before it is fully initialized.
700 evtCtrl.emplace_back(evt);
701 return evtCtrl.back();
702}
703
704
705void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
706{
707 const int i = rBuf.sockId;
708
709 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
710
711 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
712
713 // consistency of ROIs have been checked already (is it all correct?)
714 const uint16_t &roi = rBuf.S[src+2];
715
716 // different sort in FAD board.....
717 EVENT *event = evt->fEvent;
718 for (int px = 0; px < 9; px++)
719 {
720 for (int drs = 0; drs < 4; drs++)
721 {
722 const int16_t pixC = rBuf.S[src+1]; // start-cell
723 const int16_t pixR = rBuf.S[src+2]; // roi
724 //here we should check if pixH is correct ....
725
726 const int pixS = i*36 + drs*9 + px;
727
728 event->StartPix[pixS] = pixC;
729
730 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
731
732 src += 4+pixR;
733
734 // Treatment for ch 9 (TM channel)
735 if (px != 8)
736 continue;
737
738 const int tmS = i*4 + drs;
739
740 //and we have additional TM info
741 if (pixR > roi)
742 {
743 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
744
745 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
746 }
747 else
748 {
749 event->StartTM[tmS] = -1;
750 }
751 }
752 }
753}
754
755// ==========================================================================
756
757uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
758{
759 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
760 evt->runNum, evt->evNum, evtCtrl.size(), txt);
761
762 uint64_t report = 0;
763
764 char str[1000];
765
766 int ik=0;
767 for (int ib=0; ib<NBOARDS; ib++)
768 {
769 if (ib%10==0)
770 str[ik++] = '|';
771
772 const int jb = evt->board[ib];
773 if (jb>=0) // data received from that board
774 {
775 str[ik++] = '0'+(jb%10);
776 continue;
777 }
778
779 // FIXME: This is not synchronous... it reports
780 // accoridng to the current connection status, not w.r.t. to the
781 // one when the event was taken.
782 if (gi_NumConnect[ib]==0) // board not connected
783 {
784 str[ik++] = 'x';
785 continue;
786 }
787
788 // data from this board lost
789 str[ik++] = '.';
790 report |= ((uint64_t)1)<<ib;
791 }
792
793 str[ik++] = '|';
794 str[ik] = 0;
795
796 factOut(MessageImp::kWarn, str);
797
798 return report;
799}
800
801// ==========================================================================
802// ==========================================================================
803
804void proc1(const shared_ptr<EVT_CTRL2> &);
805
806Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
807
808void proc1(const shared_ptr<EVT_CTRL2> &evt)
809{
810 applyCalib(*evt, processingQueue1.size());
811}
812
813// If this is not convenient anymore, it could be replaced by
814// a command queue, to which command+data is posted,
815// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
816void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
817{
818 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
819 RUN_CTRL2 &run = *evt->runCtrl;
820
821 // Is this a valid event or just an empty event to trigger run close?
822 // If this is not an empty event open the new run-file
823 // Empty events are there to trigger run-closing conditions
824 if (evt->valid())
825 {
826 // File not yet open
827 if (run.fileStat==kFileNotYetOpen)
828 {
829 // runOpen will close a previous run, if still open
830 if (!runOpen(*evt))
831 {
832 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
833 run.fileStat = kFileClosed;
834 return;
835 }
836
837 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
838 run.fileStat = kFileOpen;
839 }
840
841 // Here we have a valid calibration and can go on with that.
842 // It is important that _all_ events are sent for calibration (except broken ones)
843 processingQueue1.post(evt);
844 }
845
846 // File already closed
847 if (run.fileStat==kFileClosed)
848 return;
849
850 bool rc1 = true;
851 if (evt->valid())
852 {
853 rc1 = runWrite(*evt);
854 if (!rc1)
855 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
856 }
857
858 // File not open... no need to close or to check for close
859 // ... this is the case if CloseRunFile was called before any file was opened.
860 if (run.fileStat!=kFileOpen)
861 return;
862
863 // File is not yet to be closed.
864 if (rc1 && evt->closeRequest==kRequestNone)
865 return;
866
867 runClose(run);
868 run.fileStat = kFileClosed;
869
870 vector<string> reason;
871 if (evt->closeRequest&kRequestManual)
872 reason.emplace_back("close requested");
873 if (evt->closeRequest&kRequestTimeout)
874 reason.emplace_back("receive timeout");
875 if (evt->closeRequest&kRequestConnectionChange)
876 reason.emplace_back("connection changed");
877 if (evt->closeRequest&kRequestEventCheckFailed)
878 reason.emplace_back("event check failed");
879 if (evt->closeRequest&kRequestMaxTimeReached)
880 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
881 if (evt->closeRequest&kRequestMaxEvtsReached)
882 reason.push_back(to_string(run.maxEvt)+" evts reached");
883 if (!rc1)
884 reason.emplace_back("runWrite failed");
885
886 const string str = boost::algorithm::join(reason, ", ");
887 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
888}
889
890Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
891
892void procEvt(const shared_ptr<EVT_CTRL2> &evt)
893{
894 if (evt->valid())
895 {
896 EVENT *event = evt->fEvent;
897
898 // This is already done in initMemory()
899 //event->Roi = evt->runCtrl->roi0;
900 //event->RoiTM = evt->runCtrl->roi8;
901 //event->EventNum = evt->evNum;
902 //event->TriggerNum = evt->trgNum;
903 //event->TriggerType = evt->trgTyp;
904
905 event->NumBoards = evt->nBoard;
906
907 event->PCTime = evt->time.tv_sec;
908 event->PCUsec = evt->time.tv_usec;
909
910 for (int ib=0; ib<NBOARDS; ib++)
911 event->BoardTime[ib] = evt->FADhead[ib].time;
912
913 if (!eventCheck(*evt))
914 {
915 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
916 return;
917 }
918 }
919
920 // If file is open post the event for being written
921 secondaryQueue.post(evt);
922}
923
924// ==========================================================================
925// ==========================================================================
926
927/*
928 task 1-4:
929
930 lock1()-lock4();
931 while (1)
932 {
933 wait for signal [lockN]; // unlocked
934
935 while (n!=10)
936 wait sockets;
937 read;
938
939 lockM();
940 finished[n] = true;
941 signal(mainloop);
942 unlockM();
943 }
944
945
946 mainloop:
947
948 while (1)
949 {
950 lockM();
951 while (!finished[0] || !finished[1] ...)
952 wait for signal [lockM]; // unlocked... signals can be sent
953 finished[0-1] = false;
954 unlockM()
955
956 copy data to queue // locked
957
958 lockN[0-3];
959 signalN[0-3];
960 unlockN[0-3];
961 }
962
963
964 */
965
966/*
967 while (g_reset)
968 {
969 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
970
971 // Check that all sockets are connected
972
973 for (int i=0; i<40; i++)
974 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
975 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
976
977 while (g_reset)
978 {
979 if (READ_STRUCT::wait()<0)
980 break;
981
982 if (rc_epoll==0)
983 break;
984
985 for (int jj=0; jj<rc_epoll; jj++)
986 {
987 READ_STRUCT *rs = READ_STRUCT::get(jj);
988 if (!rs->connected)
989 continue;
990
991 const bool rc_read = rs->read();
992 if (!rc_read)
993 continue;
994
995 if (rs->bufTyp==READ_STRUCT::kHeader)
996 {
997 [...]
998 }
999
1000 [...]
1001
1002 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1003 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1004 }
1005
1006 if (once_a_second)
1007 {
1008 if (evt==timeout)
1009 break;
1010 }
1011 }
1012
1013 if (evt.nBoards==actBoards)
1014 primaryQueue.post(evt);
1015 }
1016*/
1017
1018Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1019
1020// This corresponds more or less to fFile... should we merge both?
1021shared_ptr<RUN_CTRL2> actrun;
1022
1023void CloseRunFile()
1024{
1025 // Currently we need actrun here, to be able to set kFileClosed.
1026 // Apart from that we have to ensure that there is an open file at all
1027 // which we can close.
1028 // Submission to the primary queue ensures that the event
1029 // is placed at the right place in the processing chain.
1030 // (Corresponds to the correct run)
1031 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1032}
1033
1034bool mainloop(READ_STRUCT *rd)
1035{
1036 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1037
1038 primaryQueue.start();
1039 secondaryQueue.start();
1040 processingQueue1.start();;
1041
1042 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
1043
1044 //time in seconds
1045 time_t gi_SecTime = time(NULL)-1;
1046
1047 //loop until global variable g_runStat claims stop
1048 g_reset = 0;
1049 while (g_reset == 0)
1050 {
1051#ifdef USE_POLL
1052 int pp[40];
1053 int nn = 0;
1054 pollfd fds[40];
1055 for (int i=0; i<40; i++)
1056 {
1057 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1058 {
1059 fds[nn].fd = rd[i].socket;
1060 fds[nn].events = POLLIN;
1061 pp[nn] = i;
1062 nn++;
1063 }
1064 }
1065
1066 const int rc_epoll = poll(fds, nn, 100);
1067 if (rc_epoll<0)
1068 break;
1069#endif
1070
1071#ifdef USE_SELECT
1072 fd_set readfs;
1073 FD_ZERO(&readfs);
1074 int nfsd = 0;
1075 for (int i=0; i<NBOARDS; i++)
1076 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1077 {
1078 FD_SET(rd[i].socket, &readfs);
1079 if (rd[i].socket>nfsd)
1080 nfsd = rd[i].socket;
1081 }
1082
1083 timeval tv;
1084 tv.tv_sec = 0;
1085 tv.tv_usec = 100000;
1086 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1087 // 0: timeout
1088 // -1: error
1089 if (rc_select<0)
1090 {
1091 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1092 continue;
1093 }
1094#endif
1095
1096#ifdef USE_EPOLL
1097 const int rc_epoll = READ_STRUCT::wait();
1098 if (rc_epoll<0)
1099 break;
1100#endif
1101
1102#ifdef PRIORITY_QUEUE
1103 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1104
1105 for (int i=0; i<NBOARDS; i++)
1106 if (rd[i].connected)
1107 prio.push(rd+i);
1108
1109 if (!prio.empty()) do
1110#endif
1111
1112
1113#ifdef USE_POLL
1114 for (int jj=0; jj<nn; jj++)
1115#endif
1116#ifdef USE_EPOLL
1117 for (int jj=0; jj<rc_epoll; jj++)
1118#endif
1119#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1120 for (int jj=0; jj<NBOARDS; jj++)
1121#endif
1122 {
1123#ifdef PRIORITY_QUEUE
1124 READ_STRUCT *rs = prio.top();
1125#endif
1126#ifdef USE_SELECT
1127 if (!FD_ISSET(rs->socket, &readfs))
1128 continue;
1129#endif
1130
1131#ifdef USE_POLL
1132 if ((fds[jj].revents&POLLIN)==0)
1133 continue;
1134#endif
1135
1136#ifdef USE_EPOLL
1137 // FIXME: How to get i?
1138 READ_STRUCT *rs = READ_STRUCT::get(jj);
1139#endif
1140
1141#ifdef USE_POLL
1142 // FIXME: How to get i?
1143 READ_STRUCT *rs = &rd[pp[jj]];
1144#endif
1145
1146#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1147 const int i = (jj%4)*10 + (jj/4);
1148 READ_STRUCT *rs = &rd[i];
1149#endif
1150
1151#ifdef COMPLETE_EVENTS
1152 if (rs->bufTyp==READ_STRUCT::kWait)
1153 continue;
1154#endif
1155
1156 // ==================================================================
1157
1158 const bool rc_read = rs->read();
1159
1160 // Connect might have gotten closed during read
1161 gi_NumConnect[rs->sockId] = rs->connected;
1162 gj.numConn[rs->sockId] = rs->connected;
1163
1164 // Read either failed or disconnected, or the buffer is not yet full
1165 if (!rc_read)
1166 continue;
1167
1168 // ==================================================================
1169
1170 if (rs->bufTyp==READ_STRUCT::kHeader)
1171 {
1172 //check if startflag correct; else shift block ....
1173 // FIXME: This is not enough... this combination of
1174 // bytes can be anywhere... at least the end bytes
1175 // must be checked somewhere, too.
1176 uint k;
1177 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1178 {
1179 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1180 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1181 break;
1182 }
1183 rs->skip += k;
1184
1185 //no start of header found
1186 if (k==sizeof(PEVNT_HEADER)-1)
1187 {
1188 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1189 rs->bufPos = rs->B+1;
1190 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1191 continue;
1192 }
1193
1194 if (k > 0)
1195 {
1196 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1197
1198 rs->bufPos -= k;
1199 rs->bufLen += k;
1200
1201 continue; // We need to read more (bufLen>0)
1202 }
1203
1204 if (rs->skip>0)
1205 {
1206 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1207 rs->skip = 0;
1208 }
1209
1210 // Swap the header entries from network to host order
1211 rs->swapHeader();
1212
1213 rs->bufTyp = READ_STRUCT::kData;
1214 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1215
1216 debugHead(rs->B); // i and fadBoard not used
1217
1218 continue;
1219 }
1220
1221 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1222 if (end != 0xfe04)
1223 {
1224 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1225 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1226
1227 // ready to read next header
1228 rs->bufTyp = READ_STRUCT::kHeader;
1229 rs->bufLen = sizeof(PEVNT_HEADER);
1230 rs->bufPos = rs->B;
1231 // FIXME: What to do with the validity flag?
1232 continue;
1233 }
1234
1235 // get index into mBuffer for this event (create if needed)
1236 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1237
1238 // We have a valid entry, but no memory has yet been allocated
1239 if (evt && !evt->initMemory())
1240 {
1241 const time_t tm = time(NULL);
1242 if (evt->runCtrl->reportMem==tm)
1243 continue;
1244
1245 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1246 evt->runCtrl->reportMem = tm;
1247 continue;
1248 }
1249
1250 // ready to read next header
1251 rs->bufTyp = READ_STRUCT::kHeader;
1252 rs->bufLen = sizeof(PEVNT_HEADER);
1253 rs->bufPos = rs->B;
1254
1255 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1256 if (!evt)
1257 continue;
1258
1259 // This should never happen
1260 if (evt->board[rs->sockId] != -1)
1261 {
1262 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1263 evt->evNum, rs->sockId, rs->sockId, rs->len());
1264 // FIXME: What to do with the validity flag?
1265 continue; // Continue reading next header
1266 }
1267
1268 // Swap the data entries (board headers) from network to host order
1269 rs->swapData();
1270
1271 // Copy data from rd[i] to mBuffer[evID]
1272 copyData(*rs, evt.get());
1273
1274#ifdef COMPLETE_EVENTS
1275 // Do not read anmymore from this board until the whole event has been received
1276 rs->bufTyp = READ_STRUCT::kWait;
1277#endif
1278 // now we have stored a new board contents into Event structure
1279 evt->board[rs->sockId] = rs->sockId;
1280 evt->header = evt->FADhead+rs->sockId;
1281 evt->nBoard++;
1282
1283#ifdef COMPLETE_EPOLL
1284 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1285 {
1286 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1287 break;
1288 }
1289#endif
1290 // event not yet complete
1291 if (evt->nBoard < READ_STRUCT::activeSockets)
1292 continue;
1293
1294 // All previous events are now flagged as incomplete ("expired")
1295 // and will be removed. (This is a bit tricky, because pop_front()
1296 // would invalidate the current iterator if not done _after_ the increment)
1297 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1298 {
1299 const bool found = it->get()==evt.get();
1300 if (!found)
1301 reportIncomplete(*it, "expired");
1302 else
1303 primaryQueue.post(evt);
1304
1305 // package_len is 0 if nothing was received.
1306 for (int ib=0; ib<40; ib++)
1307 rd[ib].relBytes += (*it)->FADhead[ib].package_length;
1308
1309 it++;
1310 evtCtrl.pop_front();
1311
1312 // We reached the current event, so we are done
1313 if (found)
1314 break;
1315 }
1316
1317#ifdef COMPLETE_EPOLL
1318 for (int j=0; j<40; j++)
1319 {
1320 epoll_event ev;
1321 ev.events = EPOLLIN;
1322 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1323 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1324 {
1325 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1326 return;
1327 }
1328 }
1329#endif
1330
1331#ifdef COMPLETE_EVENTS
1332 for (int j=0; j<40; j++)
1333 {
1334 //if (rs->bufTyp==READ_STRUCT::kWait)
1335 {
1336 rs->bufTyp = READ_STRUCT::kHeader;
1337 rs->bufLen = sizeof(PEVNT_HEADER);
1338 rs->bufPos = rs->B;
1339 }
1340 }
1341#endif
1342 } // end for loop over all sockets
1343#ifdef PRIORITY_QUEUE
1344 while (0); // convert continue into break ;)
1345#endif
1346
1347 // ==================================================================
1348
1349 const time_t actTime = time(NULL);
1350 if (actTime == gi_SecTime)
1351 {
1352#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1353 if (evtCtrl.empty())
1354 usleep(1);
1355#endif
1356 continue;
1357 }
1358 gi_SecTime = actTime;
1359
1360 // ==================================================================
1361 //loop over all active events and flag those older than read-timeout
1362 //delete those that are written to disk ....
1363
1364 // This could be improved having the pointer which separates the queue with
1365 // the incomplete events from the queue with the complete events
1366 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1367 {
1368 // A reference is enough because the shared_ptr is hold by the evtCtrl
1369 const shared_ptr<EVT_CTRL2> &evt = *it;
1370
1371 // The first event is the oldest. If the first event within the
1372 // timeout window was received, we can stop searchinf further.
1373 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1374 break;
1375
1376 // This will result in the emission of a dim service.
1377 // It doesn't matter if that takes comparably long,
1378 // because we have to stop the run anyway.
1379 const uint64_t rep = reportIncomplete(evt, "timeout");
1380 factReportIncomplete(rep);
1381
1382 // package_len is 0 when nothing was received from this board
1383 for (int ib=0; ib<40; ib++)
1384 rd[ib].relBytes += (*it)->FADhead[ib].package_length;
1385
1386 it++;
1387 evtCtrl.pop_front();
1388 }
1389
1390 // =================================================================
1391
1392 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1393 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1394 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1395 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1396 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1397 gj.usdMem = Memory::max_inuse;
1398 gj.totMem = Memory::allocated;
1399 gj.maxMem = g_maxMem;
1400
1401 gj.deltaT = 1000; // temporary, must be improved
1402
1403 bool changed = false;
1404
1405 static vector<uint64_t> store(NBOARDS);
1406
1407 for (int ib=0; ib<NBOARDS; ib++)
1408 {
1409 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1410 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1411
1412 store[ib] = rd[ib].totBytes;
1413
1414 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1415 changed = true;
1416
1417 gi_NumConnect[ib] = rd[ib].connected;
1418 gj.numConn[ib] = rd[ib].connected;
1419 }
1420
1421 factStat(gj);
1422
1423 Memory::max_inuse = 0;
1424
1425 // =================================================================
1426
1427 // This is a fake event to trigger possible run-closing conditions once a second
1428 // FIXME: This is not yet ideal because a file would never be closed
1429 // if a new file has been started and no events of the new file
1430 // have been received yet
1431 int request = kRequestNone;
1432
1433 // If nothing was received for more than 5min, close file
1434 if (actTime-actrun->lastTime>300)
1435 request |= kRequestTimeout;
1436
1437 // If connection status has changed
1438 if (changed)
1439 request |= kRequestConnectionChange;
1440
1441 if (request!=kRequestNone)
1442 runFinished();
1443
1444 if (actrun->fileStat==kFileOpen)
1445 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1446 }
1447
1448 // 1: Stop, wait for event to get processed
1449 // 2: Stop, finish immediately
1450 // 101: Restart, wait for events to get processed
1451 // 101: Restart, finish immediately
1452 //
1453 const int gi_reset = g_reset;
1454
1455 const bool abort = gi_reset%100==2;
1456
1457 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1458
1459 primaryQueue.wait(abort);
1460 secondaryQueue.wait(abort);
1461 processingQueue1.wait(abort);
1462
1463 // Here we also destroy all runCtrl structures and hence close all open files
1464 evtCtrl.clear();
1465 actrun.reset();
1466
1467 factPrintf(MessageImp::kInfo, "Exit read Process...");
1468 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1469
1470 factStat(gj);
1471
1472 return gi_reset>=100;
1473}
1474
1475// ==========================================================================
1476// ==========================================================================
1477
1478void StartEvtBuild()
1479{
1480 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1481
1482 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1483
1484 memset(&gj, 0, sizeof(GUI_STAT));
1485
1486 gj.usdMem = Memory::inuse;
1487 gj.totMem = Memory::allocated;
1488 gj.maxMem = g_maxMem;
1489
1490
1491 READ_STRUCT rd[NBOARDS];
1492
1493 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1494 for (int i=0; i<NBOARDS; i++)
1495 rd[i].sockId = i;
1496
1497 while (mainloop(rd));
1498
1499 //must close all open sockets ...
1500 factPrintf(MessageImp::kInfo, "Close all sockets...");
1501
1502 READ_STRUCT::close();
1503
1504 // Now all sockets get closed. This is not reflected in gi_NumConnect
1505 // The current workaround is to count all sockets as closed when the thread is not running
1506}
Note: See TracBrowser for help on using the repository browser.