source: trunk/FACT++/src/EventBuilder.cc@ 16841

Last change on this file since 16841 was 16836, checked in by tbretz, 11 years ago
Use make_shared where possible.
File size: 44.3 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9#include <queue>
10
11#include <boost/algorithm/string/join.hpp>
12
13#include "queue.h"
14
15#include "MessageImp.h"
16#include "EventBuilder.h"
17
18using namespace std;
19
20#define MIN_LEN 32 // min #bytes needed to interpret FADheader
21#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
22
23//#define COMPLETE_EVENTS
24//#define USE_POLL
25//#define USE_EPOLL
26//#define USE_SELECT
27//#define COMPLETE_EPOLL
28//#define PRIORITY_QUEUE
29
30// Reading only 1024: 13: 77Hz, 87%
31// Reading only 1024: 12: 78Hz, 46%
32// Reading only 300: 4: 250Hz, 92%
33// Reading only 300: 3: 258Hz, 40%
34
35// Reading only four threads 1024: 13: 77Hz, 60%
36// Reading only four threads 1024: 12: 78Hz, 46%
37// Reading only four threads 300: 4: 250Hz, 92%
38// Reading only four threads 300: 3: 258Hz, 40%
39
40// Default 300: 4: 249Hz, 92%
41// Default 300: 3: 261Hz, 40%
42// Default 1024: 13: 76Hz, 93%
43// Default 1024: 12: 79Hz, 46%
44
45// Poll [selected] 1024: 13: 63Hz, 45%
46// Poll [selected] 1024: 14: 63Hz, 63%
47// Poll [selected] 1024: 15: 64Hz, 80%
48// Poll [selected] 300: 4: 230Hz, 47%
49// Poll [selected] 300: 3: 200Hz, 94%
50
51// Poll [all] 1024: 13: 65Hz, 47%
52// Poll [all] 1024: 14: 64Hz, 59%
53// Poll [all] 1024: 15: 62Hz, 67%
54// Poll [all] 300: 4: 230Hz, 47%
55// Poll [all] 300: 3: 230Hz, 35%
56
57// ==========================================================================
58
59bool runOpen(const EVT_CTRL2 &evt);
60bool runWrite(const EVT_CTRL2 &evt);
61void runClose(RUN_CTRL2 &run);
62void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
63void factOut(int severity, const char *message);
64void factReportIncomplete (uint64_t rep);
65void gotNewRun(RUN_CTRL2 &run);
66void runFinished();
67void factStat(const GUI_STAT &gj);
68bool eventCheck(const EVT_CTRL2 &evt);
69void debugHead(void *buf);
70
71// ==========================================================================
72
73int g_reset;
74
75size_t g_maxMem; //maximum memory allowed for buffer
76
77uint16_t g_evtTimeout; // timeout (sec) for one event
78
79FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
80
81uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
82
83GUI_STAT gj;
84
85// ==========================================================================
86
87namespace Memory
88{
89 uint64_t inuse = 0;
90 uint64_t allocated = 0;
91
92 uint64_t max_inuse = 0;
93
94 std::mutex mtx;
95
96 std::forward_list<void*> memory;
97
98 void *malloc()
99 {
100 // No free slot available, next alloc would exceed max memory
101 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
102 return NULL;
103
104 // We will return this amount of memory
105 // This is not 100% thread safe, but it is not a super accurate measure anyway
106 inuse += MAX_TOT_MEM;
107 if (inuse>max_inuse)
108 max_inuse = inuse;
109
110 if (memory.empty())
111 {
112 // No free slot available, allocate a new one
113 allocated += MAX_TOT_MEM;
114 return new char[MAX_TOT_MEM];
115 }
116
117 // Get the next free slot from the stack and return it
118 const std::lock_guard<std::mutex> lock(mtx);
119
120 void *mem = memory.front();
121 memory.pop_front();
122 return mem;
123 };
124
125 void free(void *mem)
126 {
127 if (!mem)
128 return;
129
130 // Decrease the amont of memory in use accordingly
131 inuse -= MAX_TOT_MEM;
132
133 // If the maximum memory has changed, we might be over the limit.
134 // In this case: free a slot
135 if (allocated>g_maxMem)
136 {
137 delete [] (char*)mem;
138 allocated -= MAX_TOT_MEM;
139 return;
140 }
141
142 const std::lock_guard<std::mutex> lock(mtx);
143 memory.push_front(mem);
144 }
145
146};
147
148// ==========================================================================
149
150void factPrintf(int severity, const char *fmt, ...)
151{
152 char str[1000];
153
154 va_list ap;
155 va_start(ap, fmt);
156 vsnprintf(str, 1000, fmt, ap);
157 va_end(ap);
158
159 factOut(severity, str);
160}
161
162// ==========================================================================
163
164struct READ_STRUCT
165{
166 enum buftyp_t
167 {
168 kStream,
169 kHeader,
170 kData,
171#ifdef COMPLETE_EVENTS
172 kWait
173#endif
174 };
175
176 // ---------- connection ----------
177
178 static uint activeSockets;
179
180 int sockId; // socket id (board number)
181 int socket; // socket handle
182 bool connected; // is this socket connected?
183
184 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
185
186 // ------------ epoll -------------
187
188 static int fd_epoll;
189 static epoll_event events[NBOARDS];
190
191 static void init();
192 static void close();
193 static int wait();
194 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
195
196 // ------------ buffer ------------
197
198 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
199
200 uint32_t bufLen; // number of bytes left to read
201 uint8_t *bufPos; // next byte to read to the buffer next
202
203 union
204 {
205 uint8_t B[MAX_LEN];
206 uint16_t S[MAX_LEN / 2];
207 uint32_t I[MAX_LEN / 4];
208 uint64_t L[MAX_LEN / 8];
209 PEVNT_HEADER H;
210 };
211
212 timeval time;
213 uint64_t totBytes; // total received bytes
214 uint64_t relBytes; // total released bytes
215 uint32_t skip; // number of bytes skipped before start of event
216
217 uint32_t len() const { return uint32_t(H.package_length)*2; }
218
219 void swapHeader();
220 void swapData();
221
222 // --------------------------------
223
224 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
225 {
226 if (fd_epoll<0)
227 init();
228 }
229 ~READ_STRUCT()
230 {
231 destroy();
232 }
233
234 void destroy();
235 bool create(sockaddr_in addr);
236 bool check(int, sockaddr_in addr);
237 bool read();
238
239};
240
241#ifdef PRIORITY_QUEUE
242struct READ_STRUCTcomp
243{
244 bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
245 {
246 const int64_t rel1 = r1->totBytes - r1->relBytes;
247 const int64_t rel2 = r2->totBytes - r2->relBytes;
248 return rel1 > rel2;
249 }
250};
251#endif
252
253int READ_STRUCT::wait()
254{
255 // wait for something to do...
256 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
257 if (rc>=0)
258 return rc;
259
260 if (errno==EINTR) // timout or signal interruption
261 return 0;
262
263 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
264 return -1;
265}
266
267uint READ_STRUCT::activeSockets = 0;
268int READ_STRUCT::fd_epoll = -1;
269epoll_event READ_STRUCT::events[NBOARDS];
270
271void READ_STRUCT::init()
272{
273 if (fd_epoll>=0)
274 return;
275
276#ifdef USE_EPOLL
277 fd_epoll = epoll_create(NBOARDS);
278 if (fd_epoll<0)
279 {
280 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
281 return;
282 }
283#endif
284}
285
286void READ_STRUCT::close()
287{
288#ifdef USE_EPOLL
289 if (fd_epoll>=0 && ::close(fd_epoll)>0)
290 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
291#endif
292
293 fd_epoll = -1;
294}
295
296bool READ_STRUCT::create(sockaddr_in sockAddr)
297{
298 if (socket>=0)
299 return false;
300
301 const int port = ntohs(sockAddr.sin_port) + 1;
302
303 SockAddr.sin_family = sockAddr.sin_family;
304 SockAddr.sin_addr = sockAddr.sin_addr;
305 SockAddr.sin_port = htons(port);
306
307 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
308 {
309 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
310 socket = -1;
311 return false;
312 }
313
314 int optval = 1;
315 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
316 factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
317
318 optval = 1;
319 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
320 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
321
322 optval = 10; //start after 10 seconds
323 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
324 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
325
326 optval = 10; //do every 10 seconds
327 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
328 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
329
330 optval = 2; //close after 2 unsuccessful tries
331 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
332 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
333
334 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
335
336 //connected = false;
337 activeSockets++;
338
339 return true;
340}
341
342void READ_STRUCT::destroy()
343{
344 if (socket<0)
345 return;
346
347#ifdef USE_EPOLL
348 // strictly speaking this should not be necessary
349 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
350 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
351#endif
352
353 if (::close(socket) > 0)
354 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
355 else
356 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
357
358 socket = -1;
359 connected = false;
360 activeSockets--;
361}
362
363bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
364{
365 // Continue in the most most likely case (performance)
366 //if (socket>=0 && sockDef!=0 && connected)
367 // return;
368 const int old = socket;
369
370 // socket open, but should not be open
371 if (socket>=0 && sockDef==0)
372 destroy();
373
374 // Socket closed, but should be open
375 if (socket<0 && sockDef!=0)
376 create(addr); //generate address and socket
377
378 const bool retval = old!=socket;
379
380 // Socket closed
381 if (socket<0)
382 return retval;
383
384 // Socket open and connected: Nothing to do
385 if (connected)
386 return retval;
387
388 //try to connect if not yet done
389 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
390 if (rc == -1)
391 return retval;
392
393 connected = true;
394
395 if (sockDef<0)
396 {
397 bufTyp = READ_STRUCT::kStream; // full data to be skipped
398 bufLen = MAX_LEN; // huge for skipping
399 }
400 else
401 {
402 bufTyp = READ_STRUCT::kHeader; // expect a header
403 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
404 }
405
406 bufPos = B; // no byte read so far
407 skip = 0; // start empty
408 totBytes = 0;
409 relBytes = 0;
410
411 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
412
413#ifdef USE_EPOLL
414 epoll_event ev;
415 ev.events = EPOLLIN;
416 ev.data.ptr = this; // user data (union: ev.ptr)
417 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
418 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
419#endif
420
421 return retval;
422}
423
424bool READ_STRUCT::read()
425{
426 if (!connected)
427 return false;
428
429 if (bufLen==0)
430 return true;
431
432 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
433 // recv failed
434 if (jrd<0)
435 {
436 // There was just nothing waiting
437 if (errno==EWOULDBLOCK || errno==EAGAIN)
438 return false;
439
440 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
441 return false;
442 }
443
444 // connection was closed ...
445 if (jrd==0)
446 {
447 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
448
449 destroy();//DestroySocket(rd[i]); //generate address and socket
450 return false;
451 }
452
453 totBytes += jrd;
454
455 // are we skipping this board ...
456 if (bufTyp==kStream)
457 return false;
458
459 if (bufPos==B)
460 gettimeofday(&time, NULL);
461
462 bufPos += jrd; //==> prepare for continuation
463 bufLen -= jrd;
464
465 // not yet all read
466 return bufLen==0;
467}
468
469void READ_STRUCT::swapHeader()
470{
471 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
472 S[2] = ntohs(S[2]); // version_no
473 S[3] = ntohs(S[3]); // PLLLCK
474 S[4] = ntohs(S[4]); // trigger_crc
475 S[5] = ntohs(S[5]); // trigger_type
476
477 I[3] = ntohl(I[3]); // trigger_id
478 I[4] = ntohl(I[4]); // fad_evt_counter
479 I[5] = ntohl(I[5]); // REFCLK_frequency
480
481 S[12] = ntohs(S[12]); // board id
482 S[13] = ntohs(S[13]); // adc_clock_phase_shift
483 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
484 S[15] = ntohs(S[15]); // trigger_generator_prescaler
485
486 I[10] = ntohl(I[10]); // runnumber;
487 I[11] = ntohl(I[11]); // time;
488
489 // Use back inserter??
490 for (int s=24; s<24+NTemp+NDAC; s++)
491 S[s] = ntohs(S[s]); // drs_temperature / dac
492}
493
494void READ_STRUCT::swapData()
495{
496 // swapEventHeaderBytes: End of the header. to channels now
497
498 int i = 36;
499 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
500 {
501 S[i+0] = ntohs(S[i+0]);//id
502 S[i+1] = ntohs(S[i+1]);//start_cell
503 S[i+2] = ntohs(S[i+2]);//roi
504 S[i+3] = ntohs(S[i+3]);//filling
505
506 i += 4+S[i+2];//skip the pixel data
507 }
508}
509
510// ==========================================================================
511
512bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
513{
514 int xjr = -1;
515 int xkr = -1;
516
517 //points to the very first roi
518 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
519
520 roi[0] = ntohs(rd.S[roiPtr]);
521
522 for (int jr = 0; jr < 9; jr++)
523 {
524 roi[jr] = ntohs(rd.S[roiPtr]);
525
526 if (roi[jr]>1024)
527 {
528 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
529 return false;
530 }
531
532 // Check that the roi of pixels jr are compatible with the one of pixel 0
533 if (jr!=8 && roi[jr]!=roi[0])
534 {
535 xjr = jr;
536 break;
537 }
538
539 // Check that the roi of all other DRS chips on boards are compatible
540 for (int kr = 1; kr < 4; kr++)
541 {
542 const int kroi = ntohs(rd.S[roiPtr]);
543 if (kroi != roi[jr])
544 {
545 xjr = jr;
546 xkr = kr;
547 break;
548 }
549 roiPtr += kroi+4;
550 }
551 }
552
553 if (xjr>=0)
554 {
555 if (xkr<0)
556 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
557 else
558 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
559
560 return false;
561 }
562
563 if (roi[8] < roi[0])
564 {
565 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
566 return false;
567 }
568
569 return true;
570}
571
572list<shared_ptr<EVT_CTRL2>> evtCtrl;
573
574shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
575{
576 /*
577 checkroi consistence
578 find existing entry
579 if no entry, try to allocate memory
580 if entry and memory, init event structure
581 */
582
583 uint16_t nRoi[9];
584 if (!checkRoiConsistency(rd, nRoi))
585 return shared_ptr<EVT_CTRL2>();
586
587 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
588 {
589 // A reference is enough because the evtCtrl holds the shared_ptr anyway
590 const shared_ptr<EVT_CTRL2> &evt = *it;
591
592 // If the run is different, go on searching.
593 // We cannot stop searching if a lower run-id is found as in
594 // the case of the events, because theoretically, there
595 // can be the same run on two different days.
596 if (rd.H.runnumber != evt->runNum)
597 continue;
598
599 // If the ID of the new event if higher than the last one stored
600 // in that run, we have to assign a new slot (leave the loop)
601 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
602 break;
603
604 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
605 continue;
606
607 // We have found an entry with the same runID and evtID
608 // Check if ROI is consistent
609 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
610 {
611 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
612 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
613 return shared_ptr<EVT_CTRL2>();
614 }
615
616 // It is maybe not likely, but the header of this board might have
617 // arrived earlier. (We could also update the run-info, but
618 // this should not make a difference here)
619 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
620 rd.time.tv_sec<evt->time.tv_sec)
621 evt->time = rd.time;
622
623 //everything seems fine so far ==> use this slot ....
624 return evt;
625 }
626
627 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
628 {
629 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
630 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
631 return shared_ptr<EVT_CTRL2>();
632 }
633
634 EVT_CTRL2 *evt = new EVT_CTRL2;
635
636 evt->time = rd.time;
637
638 evt->runNum = rd.H.runnumber;
639 evt->evNum = rd.H.fad_evt_counter;
640
641 evt->trgNum = rd.H.trigger_id;
642 evt->trgTyp = rd.H.trigger_type;
643
644 evt->nRoi = nRoi[0];
645 evt->nRoiTM = nRoi[8];
646
647 const bool newrun = actrun->runId != rd.H.runnumber;
648 if (newrun)
649 {
650 // Since we have started a new run, we know already when to close the
651 // previous run in terms of number of events
652 actrun->maxEvt = actrun->lastEvt;
653
654 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
655 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
656
657 // The new run is the active run now
658 actrun = make_shared<RUN_CTRL2>();
659
660 const time_t &tsec = evt->time.tv_sec;
661
662 actrun->openTime = tsec;
663 actrun->closeTime = tsec + 3600 * 24; // max time allowed
664 actrun->runId = rd.H.runnumber;
665 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
666 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
667
668 // Signal the fadctrl that a new run has been started
669 // Note this is the only place at which we can ensure that
670 // gotnewRun is called only once
671 gotNewRun(*actrun);
672 }
673
674 // Keep pointer to run of this event
675 evt->runCtrl = actrun;
676
677 // Increase the number of events we have started to receive in this run
678 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
679 actrun->lastEvt++;
680
681 // An event can be the first and the last, but not the last and the first.
682 // Therefore gotNewRun is called before runFinished.
683 // runFinished signals that the last event of a run was just received. Processing
684 // might still be ongoing, but we can start a new run.
685 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
686 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
687 if (!cond1 || !cond2)
688 runFinished();
689
690 // We don't mind here that this is not common to all events,
691 // because every coming event will fullfil the condition as well.
692 if (!cond1)
693 evt->closeRequest |= kRequestMaxEvtsReached;
694 if (!cond2)
695 evt->closeRequest |= kRequestMaxTimeReached;
696
697 // Secure access to evtCtrl against access in CloseRunFile
698 // This should be the last... otherwise we can run into threading issues
699 // if the event is accessed before it is fully initialized.
700 evtCtrl.emplace_back(evt);
701 return evtCtrl.back();
702}
703
704
705void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
706{
707 const int i = rBuf.sockId;
708
709 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
710
711 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
712
713 // consistency of ROIs have been checked already (is it all correct?)
714 const uint16_t &roi = rBuf.S[src+2];
715
716 // different sort in FAD board.....
717 EVENT *event = evt->fEvent;
718 for (int px = 0; px < 9; px++)
719 {
720 for (int drs = 0; drs < 4; drs++)
721 {
722 const int16_t pixC = rBuf.S[src+1]; // start-cell
723 const int16_t pixR = rBuf.S[src+2]; // roi
724 //here we should check if pixH is correct ....
725
726 const int pixS = i*36 + drs*9 + px;
727
728 event->StartPix[pixS] = pixC;
729
730 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
731
732 src += 4+pixR;
733
734 // Treatment for ch 9 (TM channel)
735 if (px != 8)
736 continue;
737
738 const int tmS = i*4 + drs;
739
740 //and we have additional TM info
741 if (pixR > roi)
742 {
743 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
744
745 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
746 }
747 else
748 {
749 event->StartTM[tmS] = -1;
750 }
751 }
752 }
753}
754
755// ==========================================================================
756
757uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
758{
759 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
760 evt->runNum, evt->evNum, evtCtrl.size(), txt);
761
762 uint64_t report = 0;
763
764 char str[1000];
765
766 int ik=0;
767 for (int ib=0; ib<NBOARDS; ib++)
768 {
769 if (ib%10==0)
770 str[ik++] = '|';
771
772 const int jb = evt->board[ib];
773 if (jb>=0) // data received from that board
774 {
775 str[ik++] = '0'+(jb%10);
776 continue;
777 }
778
779 // FIXME: This is not synchronous... it reports
780 // accoridng to the current connection status, not w.r.t. to the
781 // one when the event was taken.
782 if (gi_NumConnect[ib]==0) // board not connected
783 {
784 str[ik++] = 'x';
785 continue;
786 }
787
788 // data from this board lost
789 str[ik++] = '.';
790 report |= ((uint64_t)1)<<ib;
791 }
792
793 str[ik++] = '|';
794 str[ik] = 0;
795
796 factOut(MessageImp::kWarn, str);
797
798 return report;
799}
800
801// ==========================================================================
802// ==========================================================================
803
804void proc1(const shared_ptr<EVT_CTRL2> &);
805
806Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
807
808void proc1(const shared_ptr<EVT_CTRL2> &evt)
809{
810 applyCalib(*evt, processingQueue1.size());
811}
812
813// If this is not convenient anymore, it could be replaced by
814// a command queue, to which command+data is posted,
815// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
816void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
817{
818 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
819 RUN_CTRL2 &run = *evt->runCtrl;
820
821 // Is this a valid event or just an empty event to trigger run close?
822 // If this is not an empty event open the new run-file
823 // Empty events are there to trigger run-closing conditions
824 if (evt->valid())
825 {
826 // File not yet open
827 if (run.fileStat==kFileNotYetOpen)
828 {
829 // runOpen will close a previous run, if still open
830 if (!runOpen(*evt))
831 {
832 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
833 run.fileStat = kFileClosed;
834 return;
835 }
836
837 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
838 run.fileStat = kFileOpen;
839 }
840
841 // Here we have a valid calibration and can go on with that.
842 // It is important that _all_ events are sent for calibration (except broken ones)
843 processingQueue1.post(evt);
844 }
845
846 // File already closed
847 if (run.fileStat==kFileClosed)
848 return;
849
850 bool rc1 = true;
851 if (evt->valid())
852 {
853 rc1 = runWrite(*evt);
854 if (!rc1)
855 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
856 }
857
858 // File not open... no need to close or to check for close
859 // ... this is the case if CloseRunFile was called before any file was opened.
860 if (run.fileStat!=kFileOpen)
861 return;
862
863 // File is not yet to be closed.
864 if (rc1 && evt->closeRequest==kRequestNone)
865 return;
866
867 runClose(run);
868 run.fileStat = kFileClosed;
869
870 vector<string> reason;
871 if (evt->closeRequest&kRequestManual)
872 reason.emplace_back("close requested");
873 if (evt->closeRequest&kRequestTimeout)
874 reason.emplace_back("receive timeout");
875 if (evt->closeRequest&kRequestConnectionChange)
876 reason.emplace_back("connection changed");
877 if (evt->closeRequest&kRequestEventCheckFailed)
878 reason.emplace_back("event check failed");
879 if (evt->closeRequest&kRequestMaxTimeReached)
880 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
881 if (evt->closeRequest&kRequestMaxEvtsReached)
882 reason.push_back(to_string(run.maxEvt)+" evts reached");
883 if (!rc1)
884 reason.emplace_back("runWrite failed");
885
886 const string str = boost::algorithm::join(reason, ", ");
887 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
888}
889
890Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
891
892void procEvt(const shared_ptr<EVT_CTRL2> &evt)
893{
894 if (evt->valid())
895 {
896 EVENT *event = evt->fEvent;
897
898 // This is already done in initMemory()
899 //event->Roi = evt->runCtrl->roi0;
900 //event->RoiTM = evt->runCtrl->roi8;
901 //event->EventNum = evt->evNum;
902 //event->TriggerNum = evt->trgNum;
903 //event->TriggerType = evt->trgTyp;
904
905 event->NumBoards = evt->nBoard;
906
907 event->PCTime = evt->time.tv_sec;
908 event->PCUsec = evt->time.tv_usec;
909
910 for (int ib=0; ib<NBOARDS; ib++)
911 event->BoardTime[ib] = evt->FADhead[ib].time;
912
913 if (!eventCheck(*evt))
914 {
915 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
916 return;
917 }
918 }
919
920 // If file is open post the event for being written
921 secondaryQueue.post(evt);
922}
923
924// ==========================================================================
925// ==========================================================================
926
927/*
928 task 1-4:
929
930 lock1()-lock4();
931 while (1)
932 {
933 wait for signal [lockN]; // unlocked
934
935 while (n!=10)
936 wait sockets;
937 read;
938
939 lockM();
940 finished[n] = true;
941 signal(mainloop);
942 unlockM();
943 }
944
945
946 mainloop:
947
948 while (1)
949 {
950 lockM();
951 while (!finished[0] || !finished[1] ...)
952 wait for signal [lockM]; // unlocked... signals can be sent
953 finished[0-1] = false;
954 unlockM()
955
956 copy data to queue // locked
957
958 lockN[0-3];
959 signalN[0-3];
960 unlockN[0-3];
961 }
962
963
964 */
965
966/*
967 while (g_reset)
968 {
969 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
970
971 // Check that all sockets are connected
972
973 for (int i=0; i<40; i++)
974 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
975 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
976
977 while (g_reset)
978 {
979 if (READ_STRUCT::wait()<0)
980 break;
981
982 if (rc_epoll==0)
983 break;
984
985 for (int jj=0; jj<rc_epoll; jj++)
986 {
987 READ_STRUCT *rs = READ_STRUCT::get(jj);
988 if (!rs->connected)
989 continue;
990
991 const bool rc_read = rs->read();
992 if (!rc_read)
993 continue;
994
995 if (rs->bufTyp==READ_STRUCT::kHeader)
996 {
997 [...]
998 }
999
1000 [...]
1001
1002 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
1003 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1004 }
1005
1006 if (once_a_second)
1007 {
1008 if (evt==timeout)
1009 break;
1010 }
1011 }
1012
1013 if (evt.nBoards==actBoards)
1014 primaryQueue.post(evt);
1015 }
1016*/
1017
1018Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
1019
1020// This corresponds more or less to fFile... should we merge both?
1021shared_ptr<RUN_CTRL2> actrun;
1022
1023void CloseRunFile()
1024{
1025 // Currently we need actrun here, to be able to set kFileClosed.
1026 // Apart from that we have to ensure that there is an open file at all
1027 // which we can close.
1028 // Submission to the primary queue ensures that the event
1029 // is placed at the right place in the processing chain.
1030 // (Corresponds to the correct run)
1031 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1032}
1033
1034bool mainloop(READ_STRUCT *rd)
1035{
1036 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1037
1038 primaryQueue.start();
1039 secondaryQueue.start();
1040 processingQueue1.start();;
1041
1042 actrun = make_shared<RUN_CTRL2>();
1043
1044 //time in seconds
1045 time_t gi_SecTime = time(NULL)-1;
1046
1047 //loop until global variable g_runStat claims stop
1048 g_reset = 0;
1049 while (g_reset == 0)
1050 {
1051#ifdef USE_POLL
1052 int pp[40];
1053 int nn = 0;
1054 pollfd fds[40];
1055 for (int i=0; i<40; i++)
1056 {
1057 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1058 {
1059 fds[nn].fd = rd[i].socket;
1060 fds[nn].events = POLLIN;
1061 pp[nn] = i;
1062 nn++;
1063 }
1064 }
1065
1066 const int rc_epoll = poll(fds, nn, 100);
1067 if (rc_epoll<0)
1068 break;
1069#endif
1070
1071#ifdef USE_SELECT
1072 fd_set readfs;
1073 FD_ZERO(&readfs);
1074 int nfsd = 0;
1075 for (int i=0; i<NBOARDS; i++)
1076 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1077 {
1078 FD_SET(rd[i].socket, &readfs);
1079 if (rd[i].socket>nfsd)
1080 nfsd = rd[i].socket;
1081 }
1082
1083 timeval tv;
1084 tv.tv_sec = 0;
1085 tv.tv_usec = 100000;
1086 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1087 // 0: timeout
1088 // -1: error
1089 if (rc_select<0)
1090 {
1091 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1092 continue;
1093 }
1094#endif
1095
1096#ifdef USE_EPOLL
1097 const int rc_epoll = READ_STRUCT::wait();
1098 if (rc_epoll<0)
1099 break;
1100#endif
1101
1102#ifdef PRIORITY_QUEUE
1103 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1104
1105 for (int i=0; i<NBOARDS; i++)
1106 if (rd[i].connected)
1107 prio.push(rd+i);
1108
1109 if (!prio.empty()) do
1110#endif
1111
1112
1113#ifdef USE_POLL
1114 for (int jj=0; jj<nn; jj++)
1115#endif
1116#ifdef USE_EPOLL
1117 for (int jj=0; jj<rc_epoll; jj++)
1118#endif
1119#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1120 for (int jj=0; jj<NBOARDS; jj++)
1121#endif
1122 {
1123#ifdef PRIORITY_QUEUE
1124 READ_STRUCT *rs = prio.top();
1125#endif
1126#ifdef USE_SELECT
1127 if (!FD_ISSET(rs->socket, &readfs))
1128 continue;
1129#endif
1130
1131#ifdef USE_POLL
1132 if ((fds[jj].revents&POLLIN)==0)
1133 continue;
1134#endif
1135
1136#ifdef USE_EPOLL
1137 // FIXME: How to get i?
1138 READ_STRUCT *rs = READ_STRUCT::get(jj);
1139#endif
1140
1141#ifdef USE_POLL
1142 // FIXME: How to get i?
1143 READ_STRUCT *rs = &rd[pp[jj]];
1144#endif
1145
1146#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1147 const int i = (jj%4)*10 + (jj/4);
1148 READ_STRUCT *rs = &rd[i];
1149#endif
1150
1151#ifdef COMPLETE_EVENTS
1152 if (rs->bufTyp==READ_STRUCT::kWait)
1153 continue;
1154#endif
1155
1156 // ==================================================================
1157
1158 const bool rc_read = rs->read();
1159
1160 // Connect might have gotten closed during read
1161 gi_NumConnect[rs->sockId] = rs->connected;
1162 gj.numConn[rs->sockId] = rs->connected;
1163
1164 // Read either failed or disconnected, or the buffer is not yet full
1165 if (!rc_read)
1166 continue;
1167
1168 // ==================================================================
1169
1170 if (rs->bufTyp==READ_STRUCT::kHeader)
1171 {
1172 //check if startflag correct; else shift block ....
1173 // FIXME: This is not enough... this combination of
1174 // bytes can be anywhere... at least the end bytes
1175 // must be checked somewhere, too.
1176 uint k;
1177 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1178 {
1179 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1180 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1181 break;
1182 }
1183 rs->skip += k;
1184
1185 //no start of header found
1186 if (k==sizeof(PEVNT_HEADER)-1)
1187 {
1188 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1189 rs->bufPos = rs->B+1;
1190 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1191 continue;
1192 }
1193
1194 if (k > 0)
1195 {
1196 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1197
1198 rs->bufPos -= k;
1199 rs->bufLen += k;
1200
1201 continue; // We need to read more (bufLen>0)
1202 }
1203
1204 if (rs->skip>0)
1205 {
1206 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1207 rs->skip = 0;
1208 }
1209
1210 // Swap the header entries from network to host order
1211 rs->swapHeader();
1212
1213 rs->bufTyp = READ_STRUCT::kData;
1214 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1215
1216 debugHead(rs->B); // i and fadBoard not used
1217
1218 continue;
1219 }
1220
1221 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1222 if (end != 0xfe04)
1223 {
1224 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1225 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1226
1227 // ready to read next header
1228 rs->bufTyp = READ_STRUCT::kHeader;
1229 rs->bufLen = sizeof(PEVNT_HEADER);
1230 rs->bufPos = rs->B;
1231 // FIXME: What to do with the validity flag?
1232 continue;
1233 }
1234
1235 // get index into mBuffer for this event (create if needed)
1236 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1237
1238 // We have a valid entry, but no memory has yet been allocated
1239 if (evt && !evt->initMemory())
1240 {
1241 const time_t tm = time(NULL);
1242 if (evt->runCtrl->reportMem==tm)
1243 continue;
1244
1245 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1246 evt->runCtrl->reportMem = tm;
1247 continue;
1248 }
1249
1250 // ready to read next header
1251 rs->bufTyp = READ_STRUCT::kHeader;
1252 rs->bufLen = sizeof(PEVNT_HEADER);
1253 rs->bufPos = rs->B;
1254
1255 // Fatal error occurred. Event cannot be processed. Skip it. Start reading next header.
1256 if (!evt)
1257 continue;
1258
1259 // This should never happen
1260 if (evt->board[rs->sockId] != -1)
1261 {
1262 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1263 evt->evNum, rs->sockId, rs->sockId, rs->len());
1264 // FIXME: What to do with the validity flag?
1265 continue; // Continue reading next header
1266 }
1267
1268 // Swap the data entries (board headers) from network to host order
1269 rs->swapData();
1270
1271 // Copy data from rd[i] to mBuffer[evID]
1272 copyData(*rs, evt.get());
1273
1274#ifdef COMPLETE_EVENTS
1275 // Do not read anymore from this board until the whole event has been received
1276 rs->bufTyp = READ_STRUCT::kWait;
1277#endif
1278 // now we have stored a new board contents into Event structure
1279 evt->board[rs->sockId] = rs->sockId;
1280 evt->header = evt->FADhead+rs->sockId;
1281 evt->nBoard++;
1282
1283#ifdef COMPLETE_EPOLL
1284 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1285 {
1286 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1287 break;
1288 }
1289#endif
1290 // event not yet complete
1291 if (evt->nBoard < READ_STRUCT::activeSockets)
1292 continue;
1293
1294 // All previous events are now flagged as incomplete ("expired")
1295 // and will be removed. (This is a bit tricky, because pop_front()
1296 // would invalidate the current iterator if not done _after_ the increment)
1297 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1298 {
1299 const bool found = it->get()==evt.get();
1300 if (!found)
1301 reportIncomplete(*it, "expired");
1302 else
1303 primaryQueue.post(evt);
1304
1305 // package_len is 0 if nothing was received.
1306 for (int ib=0; ib<40; ib++)
1307 rd[ib].relBytes += (*it)->FADhead[ib].package_length;
1308
1309 it++;
1310 evtCtrl.pop_front();
1311
1312 // We reached the current event, so we are done
1313 if (found)
1314 break;
1315 }
1316
1317#ifdef COMPLETE_EPOLL
1318 for (int j=0; j<40; j++)
1319 {
1320 epoll_event ev;
1321 ev.events = EPOLLIN;
1322 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1323 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1324 {
1325 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1326 return;
1327 }
1328 }
1329#endif
1330
1331#ifdef COMPLETE_EVENTS
1332 for (int j=0; j<40; j++)
1333 {
1334 //if (rs->bufTyp==READ_STRUCT::kWait)
1335 {
1336 rs->bufTyp = READ_STRUCT::kHeader;
1337 rs->bufLen = sizeof(PEVNT_HEADER);
1338 rs->bufPos = rs->B;
1339 }
1340 }
1341#endif
1342 } // end for loop over all sockets
1343#ifdef PRIORITY_QUEUE
1344 while (0); // convert continue into break ;)
1345#endif
1346
1347 // ==================================================================
1348
1349 const time_t actTime = time(NULL);
1350 if (actTime == gi_SecTime)
1351 {
1352#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1353 if (evtCtrl.empty())
1354 usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1355#endif
1356 continue;
1357 }
1358 gi_SecTime = actTime;
1359
1360 // ==================================================================
1361 //loop over all active events and flag those older than read-timeout
1362 //delete those that are written to disk ....
1363
1364 // This could be improved having the pointer which separates the queue with
1365 // the incomplete events from the queue with the complete events
1366 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1367 {
1368 // A reference is enough because the shared_ptr is held by the evtCtrl
1369 const shared_ptr<EVT_CTRL2> &evt = *it;
1370
1371 // The first event is the oldest. If the first event within the
1372 // timeout window was received, we can stop searching further.
1373 if (evt->time.tv_sec+g_evtTimeout>=actTime)
1374 break;
1375
1376 // This will result in the emission of a dim service.
1377 // It doesn't matter if that takes comparably long,
1378 // because we have to stop the run anyway.
1379 const uint64_t rep = reportIncomplete(evt, "timeout");
1380 factReportIncomplete(rep);
1381
1382 // package_len is 0 when nothing was received from this board
1383 for (int ib=0; ib<40; ib++)
1384 rd[ib].relBytes += (*it)->FADhead[ib].package_length;
1385
1386 it++;
1387 evtCtrl.pop_front();
1388 }
1389
1390 // =================================================================
1391
1392 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1393 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1394 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1395 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1396 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1397 gj.usdMem = Memory::max_inuse;
1398 gj.totMem = Memory::allocated;
1399 gj.maxMem = g_maxMem;
1400
1401 gj.deltaT = 1000; // temporary, must be improved
1402
1403 bool changed = false;
1404
1405 static vector<uint64_t> store(NBOARDS);
1406
1407 for (int ib=0; ib<NBOARDS; ib++)
1408 {
1409 gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1410 gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1411
1412 store[ib] = rd[ib].totBytes;
1413
1414 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1415 changed = true;
1416
1417 gi_NumConnect[ib] = rd[ib].connected;
1418 gj.numConn[ib] = rd[ib].connected;
1419 }
1420
1421 factStat(gj);
1422
1423 Memory::max_inuse = 0;
1424
1425 // =================================================================
1426
1427 // This is a fake event to trigger possible run-closing conditions once a second
1428 // FIXME: This is not yet ideal because a file would never be closed
1429 // if a new file has been started and no events of the new file
1430 // have been received yet
1431 int request = kRequestNone;
1432
1433 // If nothing was received for more than 5min, close file
1434 if (actTime-actrun->lastTime>300)
1435 request |= kRequestTimeout;
1436
1437 // If connection status has changed
1438 if (changed)
1439 request |= kRequestConnectionChange;
1440
1441 if (request!=kRequestNone)
1442 runFinished();
1443
1444 if (actrun->fileStat==kFileOpen)
1445 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1446 }
1447
1448 // 1: Stop, wait for event to get processed
1449 // 2: Stop, finish immediately
1450 // 101: Restart, wait for events to get processed
1451 // 102: Restart, finish immediately
1452 //
1453 const int gi_reset = g_reset;
1454
1455 const bool abort = gi_reset%100==2;
1456
1457 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1458
1459 primaryQueue.wait(abort);
1460 secondaryQueue.wait(abort);
1461 processingQueue1.wait(abort);
1462
1463 // Here we also destroy all runCtrl structures and hence close all open files
1464 evtCtrl.clear();
1465 actrun.reset();
1466
1467 factPrintf(MessageImp::kInfo, "Exit read Process...");
1468 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1469
1470 factStat(gj);
1471
1472 return gi_reset>=100;
1473}
1474
1475// ==========================================================================
1476// ==========================================================================
1477
1478void StartEvtBuild()
1479{
1480 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1481
1482 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1483
1484 memset(&gj, 0, sizeof(GUI_STAT));
1485
1486 gj.usdMem = Memory::inuse;
1487 gj.totMem = Memory::allocated;
1488 gj.maxMem = g_maxMem;
1489
1490
1491 READ_STRUCT rd[NBOARDS];
1492
1493 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1494 for (int i=0; i<NBOARDS; i++)
1495 rd[i].sockId = i;
1496
1497 while (mainloop(rd));
1498
1499 //must close all open sockets ...
1500 factPrintf(MessageImp::kInfo, "Close all sockets...");
1501
1502 READ_STRUCT::close();
1503
1504 // Now all sockets get closed. This is not reflected in gi_NumConnect
1505 // The current workaround is to count all sockets as closed when the thread is not running
1506}
Note: See TracBrowser for help on using the repository browser.