source: trunk/FACT++/src/EventBuilder.cc@ 16587

Last change on this file since 16587 was 16584, checked in by tbretz, 12 years ago
Removed Errors and SoftTrig; let the runClose access the RUN_CTRL (to update the calibration constants); the whole memory is now reset in initMemory at once.
File size: 42.7 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9
10#include <boost/algorithm/string/join.hpp>
11
12#include "queue.h"
13
14#include "MessageImp.h"
15#include "EventBuilder.h"
16
17using namespace std;
18
19#define MIN_LEN 32 // min #bytes needed to interpret FADheader
20#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
21
22//#define COMPLETE_EVENTS
23//#define USE_POLL
24//#define USE_EPOLL
25//#define USE_SELECT
26//#define COMPLETE_EPOLL
27
28// Reading only 1024: 13: 77Hz, 87%
29// Reading only 1024: 12: 78Hz, 46%
30// Reading only 300: 4: 250Hz, 92%
31// Reading only 300: 3: 258Hz, 40%
32
33// Reading only four threads 1024: 13: 77Hz, 60%
34// Reading only four threads 1024: 12: 78Hz, 46%
35// Reading only four threads 300: 4: 250Hz, 92%
36// Reading only four threads 300: 3: 258Hz, 40%
37
38// Default 300: 4: 249Hz, 92%
39// Default 300: 3: 261Hz, 40%
40// Default 1024: 13: 76Hz, 93%
41// Default 1024: 12: 79Hz, 46%
42
43// Poll [selected] 1024: 13: 63Hz, 45%
44// Poll [selected] 1024: 14: 63Hz, 63%
45// Poll [selected] 1024: 15: 64Hz, 80%
46// Poll [selected] 300: 4: 230Hz, 47%
47// Poll [selected] 300: 3: 200Hz, 94%
48
49// Poll [all] 1024: 13: 65Hz, 47%
50// Poll [all] 1024: 14: 64Hz, 59%
51// Poll [all] 1024: 15: 62Hz, 67%
52// Poll [all] 300: 4: 230Hz, 47%
53// Poll [all] 300: 3: 230Hz, 35%
54
55// ==========================================================================
56
57bool runOpen(const EVT_CTRL2 &evt);
58bool runWrite(const EVT_CTRL2 &evt);
59void runClose(RUN_CTRL2 &run);
60void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
61void factOut(int severity, const char *message);
62void factReportIncomplete (uint64_t rep);
63void gotNewRun(RUN_CTRL2 &run);
64void runFinished();
65void factStat(const GUI_STAT &gj);
66bool eventCheck(const EVT_CTRL2 &evt);
67void debugHead(void *buf);
68
69// ==========================================================================
70
// Loop control flag: mainloop() sets it to 0 on entry and keeps running while it stays 0.
71int g_reset;
72
// Upper limit for the buffer memory tracked by Memory::allocated (see Memory::malloc/free).
73size_t g_maxMem; //maximum memory allowed for buffer
74
75FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
76
// Per-board connection flag; mainloop() refreshes it from READ_STRUCT::connected after each read.
77uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
78
// mainloop() mirrors the per-board connection state into gj.numConn
// (presumably the record handed to factStat() -- TODO confirm).
79GUI_STAT gj;
80
81// ==========================================================================
82
83namespace Memory
84{
85 uint64_t inuse = 0;
86 uint64_t allocated = 0;
87
88 uint64_t max_inuse = 0;
89
90 std::mutex mtx;
91
92 std::forward_list<void*> memory;
93
94 void *malloc()
95 {
96 // No free slot available, next alloc would exceed max memory
97 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
98 return NULL;
99
100 // We will return this amount of memory
101 // This is not 100% thread safe, but it is not a super accurate measure anyway
102 inuse += MAX_TOT_MEM;
103 if (inuse>max_inuse)
104 max_inuse = inuse;
105
106 if (memory.empty())
107 {
108 // No free slot available, allocate a new one
109 allocated += MAX_TOT_MEM;
110 return new char[MAX_TOT_MEM];
111 }
112
113 // Get the next free slot from the stack and return it
114 const std::lock_guard<std::mutex> lock(mtx);
115
116 void *mem = memory.front();
117 memory.pop_front();
118 return mem;
119 };
120
121 void free(void *mem)
122 {
123 if (!mem)
124 return;
125
126 // Decrease the amont of memory in use accordingly
127 inuse -= MAX_TOT_MEM;
128
129 // If the maximum memory has changed, we might be over the limit.
130 // In this case: free a slot
131 if (allocated>g_maxMem)
132 {
133 delete [] (char*)mem;
134 allocated -= MAX_TOT_MEM;
135 return;
136 }
137
138 const std::lock_guard<std::mutex> lock(mtx);
139 memory.push_front(mem);
140 }
141
142};
143
144// ==========================================================================
145
146void factPrintf(int severity, const char *fmt, ...)
147{
148 char str[1000];
149
150 va_list ap;
151 va_start(ap, fmt);
152 vsnprintf(str, 1000, fmt, ap);
153 va_end(ap);
154
155 factOut(severity, str);
156}
157
158// ==========================================================================
159
160struct READ_STRUCT
161{
162 enum buftyp_t
163 {
164 kStream,
165 kHeader,
166 kData,
167#ifdef COMPLETE_EVENTS
168 kWait
169#endif
170 };
171
172 // ---------- connection ----------
173
174 static uint activeSockets;
175
176 int sockId; // socket id (board number)
177 int socket; // socket handle
178 bool connected; // is this socket connected?
179
180 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
181
182 // ------------ epoll -------------
183
184 static int fd_epoll;
185 static epoll_event events[NBOARDS];
186
187 static void init();
188 static void close();
189 static int wait();
190 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
191
192 // ------------ buffer ------------
193
194 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
195
196 uint32_t bufLen; // number of bytes left to read
197 uint8_t *bufPos; // next byte to read to the buffer next
198
199 union
200 {
201 uint8_t B[MAX_LEN];
202 uint16_t S[MAX_LEN / 2];
203 uint32_t I[MAX_LEN / 4];
204 uint64_t L[MAX_LEN / 8];
205 PEVNT_HEADER H;
206 };
207
208 timeval time;
209 uint64_t rateBytes;
210 uint32_t skip; // number of bytes skipped before start of event
211
212 uint32_t len() const { return uint32_t(H.package_length)*2; }
213
214 void swapHeader();
215 void swapData();
216
217 // --------------------------------
218
219 READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
220 {
221 if (fd_epoll<0)
222 init();
223 }
224 ~READ_STRUCT()
225 {
226 destroy();
227 }
228
229 void destroy();
230 bool create(sockaddr_in addr);
231 bool check(int, sockaddr_in addr);
232 bool read();
233};
234
235int READ_STRUCT::wait()
236{
237 // wait for something to do...
238 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
239 if (rc>=0)
240 return rc;
241
242 if (errno==EINTR) // timout or signal interruption
243 return 0;
244
245 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
246 return -1;
247}
248
// Definitions of the static members of READ_STRUCT.
// Incremented in create() and decremented in destroy().
249uint READ_STRUCT::activeSockets = 0;
// -1 marks the epoll handle as not (yet) created, see init() and close().
250int READ_STRUCT::fd_epoll = -1;
// Filled by wait(), read back with get().
251epoll_event READ_STRUCT::events[NBOARDS];
252
253void READ_STRUCT::init()
254{
255 if (fd_epoll>=0)
256 return;
257
258#ifdef USE_EPOLL
259 fd_epoll = epoll_create(NBOARDS);
260 if (fd_epoll<0)
261 {
262 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
263 return;
264 }
265#endif
266}
267
268void READ_STRUCT::close()
269{
270#ifdef USE_EPOLL
271 if (fd_epoll>=0 && ::close(fd_epoll)>0)
272 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
273#endif
274
275 fd_epoll = -1;
276}
277
278bool READ_STRUCT::create(sockaddr_in sockAddr)
279{
280 if (socket>=0)
281 return false;
282
283 const int port = ntohs(sockAddr.sin_port) + 1;
284
285 SockAddr.sin_family = sockAddr.sin_family;
286 SockAddr.sin_addr = sockAddr.sin_addr;
287 SockAddr.sin_port = htons(port);
288
289 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
290 {
291 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
292 socket = -1;
293 return false;
294 }
295
296 int optval = 1;
297 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
298 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
299
300 optval = 10; //start after 10 seconds
301 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
302 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
303
304 optval = 10; //do every 10 seconds
305 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
306 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
307
308 optval = 2; //close after 2 unsuccessful tries
309 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
310 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
311
312 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
313
314 //connected = false;
315 activeSockets++;
316
317 return true;
318}
319
320void READ_STRUCT::destroy()
321{
322 if (socket<0)
323 return;
324
325#ifdef USE_EPOLL
326 // strictly speaking this should not be necessary
327 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
328 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
329#endif
330
331 if (::close(socket) > 0)
332 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
333 else
334 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
335
336 socket = -1;
337 connected = false;
338 activeSockets--;
339}
340
341bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
342{
343 // Continue in the most most likely case (performance)
344 //if (socket>=0 && sockDef!=0 && connected)
345 // return;
346 const int old = socket;
347
348 // socket open, but should not be open
349 if (socket>=0 && sockDef==0)
350 destroy();
351
352 // Socket closed, but should be open
353 if (socket<0 && sockDef!=0)
354 create(addr); //generate address and socket
355
356 const bool retval = old!=socket;
357
358 // Socket closed
359 if (socket<0)
360 return retval;
361
362 // Socket open and connected: Nothing to do
363 if (connected)
364 return retval;
365
366 //try to connect if not yet done
367 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
368 if (rc == -1)
369 return retval;
370
371 connected = true;
372
373 if (sockDef<0)
374 {
375 bufTyp = READ_STRUCT::kStream; // full data to be skipped
376 bufLen = MAX_LEN; // huge for skipping
377 }
378 else
379 {
380 bufTyp = READ_STRUCT::kHeader; // expect a header
381 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
382 }
383
384 bufPos = B; // no byte read so far
385 skip = 0; // start empty
386
387 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
388
389#ifdef USE_EPOLL
390 epoll_event ev;
391 ev.events = EPOLLIN;
392 ev.data.ptr = this; // user data (union: ev.ptr)
393 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
394 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
395#endif
396
397 return retval;
398}
399
400bool READ_STRUCT::read()
401{
402 if (!connected)
403 return false;
404
405 if (bufLen==0)
406 return true;
407
408 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
409 // recv failed
410 if (jrd<0)
411 {
412 // There was just nothing waiting
413 if (errno==EWOULDBLOCK || errno==EAGAIN)
414 return false;
415
416 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
417 return false;
418 }
419
420 // connection was closed ...
421 if (jrd==0)
422 {
423 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
424
425 destroy();//DestroySocket(rd[i]); //generate address and socket
426 return false;
427 }
428
429 rateBytes += jrd;
430
431 // are we skipping this board ...
432 if (bufTyp==kStream)
433 return false;
434
435 if (bufPos==B)
436 gettimeofday(&time, NULL);
437
438 bufPos += jrd; //==> prepare for continuation
439 bufLen -= jrd;
440
441 // not yet all read
442 return bufLen==0;
443}
444
445void READ_STRUCT::swapHeader()
446{
447 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
448 S[2] = ntohs(S[2]); // version_no
449 S[3] = ntohs(S[3]); // PLLLCK
450 S[4] = ntohs(S[4]); // trigger_crc
451 S[5] = ntohs(S[5]); // trigger_type
452
453 I[3] = ntohl(I[3]); // trigger_id
454 I[4] = ntohl(I[4]); // fad_evt_counter
455 I[5] = ntohl(I[5]); // REFCLK_frequency
456
457 S[12] = ntohs(S[12]); // board id
458 S[13] = ntohs(S[13]); // adc_clock_phase_shift
459 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
460 S[15] = ntohs(S[15]); // trigger_generator_prescaler
461
462 I[10] = ntohl(I[10]); // runnumber;
463 I[11] = ntohl(I[11]); // time;
464
465 // Use back inserter??
466 for (int s=24; s<24+NTemp+NDAC; s++)
467 S[s] = ntohs(S[s]); // drs_temperature / dac
468}
469
470void READ_STRUCT::swapData()
471{
472 // swapEventHeaderBytes: End of the header. to channels now
473
474 int i = 36;
475 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
476 {
477 S[i+0] = ntohs(S[i+0]);//id
478 S[i+1] = ntohs(S[i+1]);//start_cell
479 S[i+2] = ntohs(S[i+2]);//roi
480 S[i+3] = ntohs(S[i+3]);//filling
481
482 i += 4+S[i+2];//skip the pixel data
483 }
484}
485
486// ==========================================================================
487
488bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
489{
490 int xjr = -1;
491 int xkr = -1;
492
493 //points to the very first roi
494 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
495
496 roi[0] = ntohs(rd.S[roiPtr]);
497
498 for (int jr = 0; jr < 9; jr++)
499 {
500 roi[jr] = ntohs(rd.S[roiPtr]);
501
502 if (roi[jr]>1024)
503 {
504 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
505 return false;
506 }
507
508 // Check that the roi of pixels jr are compatible with the one of pixel 0
509 if (jr!=8 && roi[jr]!=roi[0])
510 {
511 xjr = jr;
512 break;
513 }
514
515 // Check that the roi of all other DRS chips on boards are compatible
516 for (int kr = 1; kr < 4; kr++)
517 {
518 const int kroi = ntohs(rd.S[roiPtr]);
519 if (kroi != roi[jr])
520 {
521 xjr = jr;
522 xkr = kr;
523 break;
524 }
525 roiPtr += kroi+4;
526 }
527 }
528
529 if (xjr>=0)
530 {
531 if (xkr<0)
532 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
533 else
534 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
535
536 return false;
537 }
538
539 if (roi[8] < roi[0])
540 {
541 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
542 return false;
543 }
544
545 return true;
546}
547
// Events known to the event builder. mBufEvt() appends new events at the
// back and searches for existing ones starting from the back.
548list<shared_ptr<EVT_CTRL2>> evtCtrl;
549
550shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
551{
552 /*
553 checkroi consistence
554 find existing entry
555 if no entry, try to allocate memory
556 if entry and memory, init event structure
557 */
558
559 uint16_t nRoi[9];
560 if (!checkRoiConsistency(rd, nRoi))
561 return shared_ptr<EVT_CTRL2>();
562
563 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
564 {
565 // A reference is enough because the evtCtrl holds the shared_ptr anyway
566 const shared_ptr<EVT_CTRL2> &evt = *it;
567
568 // If the run is different, go on searching.
569 // We cannot stop searching if a lower run-id is found as in
570 // the case of the events, because theoretically, there
571 // can be the same run on two different days.
572 if (rd.H.runnumber != evt->runNum)
573 continue;
574
575 // If the ID of the new event if higher than the last one stored
576 // in that run, we have to assign a new slot (leave the loop)
577 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
578 break;
579
580 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
581 continue;
582
583 // We have found an entry with the same runID and evtID
584 // Check if ROI is consistent
585 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
586 {
587 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
588 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
589 return shared_ptr<EVT_CTRL2>();
590 }
591
592 // It is maybe not likely, but the header of this board might have
593 // arrived earlier. (We could also update the run-info, but
594 // this should not make a difference here)
595 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
596 rd.time.tv_sec<evt->time.tv_sec)
597 evt->time = rd.time;
598
599 //everything seems fine so far ==> use this slot ....
600 return evt;
601 }
602
603 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
604 {
605 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
606 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
607 return shared_ptr<EVT_CTRL2>();
608 }
609
610 EVT_CTRL2 *evt = new EVT_CTRL2;
611
612 evt->time = rd.time;
613
614 evt->runNum = rd.H.runnumber;
615 evt->evNum = rd.H.fad_evt_counter;
616
617 evt->trgNum = rd.H.trigger_id;
618 evt->trgTyp = rd.H.trigger_type;
619
620 evt->nRoi = nRoi[0];
621 evt->nRoiTM = nRoi[8];
622
623 const bool newrun = actrun->runId != rd.H.runnumber;
624 if (newrun)
625 {
626 // Since we have started a new run, we know already when to close the
627 // previous run in terms of number of events
628 actrun->maxEvt = actrun->lastEvt;
629
630 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
631 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
632
633 // The new run is the active run now
634 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
635
636 const time_t &tsec = evt->time.tv_sec;
637
638 actrun->openTime = tsec;
639 actrun->closeTime = tsec + 3600 * 24; // max time allowed
640 actrun->runId = rd.H.runnumber;
641 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
642 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
643
644 // Signal the fadctrl that a new run has been started
645 // Note this is the only place at which we can ensure that
646 // gotnewRun is called only once
647 gotNewRun(*actrun);
648 }
649
650 // Keep pointer to run of this event
651 evt->runCtrl = actrun;
652
653 // Increase the number of events we have started to receive in this run
654 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
655 actrun->lastEvt++;
656
657 // An event can be the first and the last, but not the last and the first.
658 // Therefore gotNewRun is called before runFinished.
659 // runFinished signals that the last event of a run was just received. Processing
660 // might still be ongoing, but we can start a new run.
661 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
662 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
663 if (!cond1 || !cond2)
664 runFinished();
665
666 // We don't mind here that this is not common to all events,
667 // because every coming event will fullfil the condition as well.
668 if (!cond1)
669 evt->closeRequest |= kRequestMaxEvtsReached;
670 if (!cond2)
671 evt->closeRequest |= kRequestMaxTimeReached;
672
673 // Secure access to evtCtrl against access in CloseRunFile
674 // This should be the last... otherwise we can run into threading issues
675 // if the event is accessed before it is fully initialized.
676 evtCtrl.emplace_back(evt);
677 return evtCtrl.back();
678}
679
680
681void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
682{
683 const int i = rBuf.sockId;
684
685 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
686
687 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
688
689 // consistency of ROIs have been checked already (is it all correct?)
690 const uint16_t &roi = rBuf.S[src+2];
691
692 // different sort in FAD board.....
693 EVENT *event = evt->fEvent;
694 for (int px = 0; px < 9; px++)
695 {
696 for (int drs = 0; drs < 4; drs++)
697 {
698 const int16_t pixC = rBuf.S[src+1]; // start-cell
699 const int16_t pixR = rBuf.S[src+2]; // roi
700 //here we should check if pixH is correct ....
701
702 const int pixS = i*36 + drs*9 + px;
703
704 event->StartPix[pixS] = pixC;
705
706 memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
707
708 src += 4+pixR;
709
710 // Treatment for ch 9 (TM channel)
711 if (px != 8)
712 continue;
713
714 const int tmS = i*4 + drs;
715
716 //and we have additional TM info
717 if (pixR > roi)
718 {
719 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
720
721 memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
722 }
723 else
724 {
725 event->StartTM[tmS] = -1;
726 }
727 }
728 }
729}
730
731// ==========================================================================
732
733uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
734{
735 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
736 evt->runNum, evt->evNum, evtCtrl.size(), txt);
737
738 uint64_t report = 0;
739
740 char str[1000];
741
742 int ik=0;
743 for (int ib=0; ib<NBOARDS; ib++)
744 {
745 if (ib%10==0)
746 str[ik++] = '|';
747
748 const int jb = evt->board[ib];
749 if (jb>=0) // data received from that board
750 {
751 str[ik++] = '0'+(jb%10);
752 continue;
753 }
754
755 // FIXME: This is not synchronous... it reports
756 // accoridng to the current connection status, not w.r.t. to the
757 // one when the event was taken.
758 if (gi_NumConnect[ib]==0) // board not connected
759 {
760 str[ik++] = 'x';
761 continue;
762 }
763
764 // data from this board lost
765 str[ik++] = '.';
766 report |= ((uint64_t)1)<<ib;
767 }
768
769 str[ik++] = '|';
770 str[ik] = 0;
771
772 factOut(MessageImp::kWarn, str);
773
774 return report;
775}
776
777// ==========================================================================
778// ==========================================================================
779
// Worker function of processingQueue1 (declared first, because the queue
// is constructed with it and the function refers to the queue).
780void proc1(const shared_ptr<EVT_CTRL2> &);
781
// Queue of events for proc1; writeEvt() posts the valid events to it.
782Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
783
// Hand the event to applyCalib together with the current size of the queue.
784void proc1(const shared_ptr<EVT_CTRL2> &evt)
785{
786 applyCalib(*evt, processingQueue1.size());
787}
788
789// If this is not convenient anymore, it could be replaced by
790// a command queue, to which command+data is posted,
791// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
792void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
793{
794 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
795 RUN_CTRL2 &run = *evt->runCtrl;
796
797 // Is this a valid event or just an empty event to trigger run close?
798 // If this is not an empty event open the new run-file
799 // Empty events are there to trigger run-closing conditions
800 if (evt->valid())
801 {
802 // File not yet open
803 if (run.fileStat==kFileNotYetOpen)
804 {
805 // runOpen will close a previous run, if still open
806 if (!runOpen(*evt))
807 {
808 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
809 run.fileStat = kFileClosed;
810 return;
811 }
812
813 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
814 run.fileStat = kFileOpen;
815 }
816
817 // Here we have a valid calibration and can go on with that.
818 // It is important that _all_ events are sent for calibration (except broken ones)
819 processingQueue1.post(evt);
820 }
821
822 // File already closed
823 if (run.fileStat==kFileClosed)
824 return;
825
826 bool rc1 = true;
827 if (evt->valid())
828 {
829 rc1 = runWrite(*evt);
830 if (!rc1)
831 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
832 }
833
834 // File not open... no need to close or to check for close
835 // ... this is the case if CloseRunFile was called before any file was opened.
836 if (run.fileStat!=kFileOpen)
837 return;
838
839 // File is not yet to be closed.
840 if (rc1 && evt->closeRequest==kRequestNone)
841 return;
842
843 runClose(run);
844 run.fileStat = kFileClosed;
845
846 vector<string> reason;
847 if (evt->closeRequest&kRequestManual)
848 reason.emplace_back("close requested");
849 if (evt->closeRequest&kRequestTimeout)
850 reason.emplace_back("receive timeout");
851 if (evt->closeRequest&kRequestConnectionChange)
852 reason.emplace_back("connection changed");
853 if (evt->closeRequest&kRequestEventCheckFailed)
854 reason.emplace_back("event check failed");
855 if (evt->closeRequest&kRequestMaxTimeReached)
856 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
857 if (evt->closeRequest&kRequestMaxEvtsReached)
858 reason.push_back(to_string(run.maxEvt)+" evts reached");
859 if (!rc1)
860 reason.emplace_back("runWrite failed");
861
862 const string str = boost::algorithm::join(reason, ", ");
863 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
864}
865
// Queue of events for writeEvt(); filled by procEvt().
866Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
867
868void procEvt(const shared_ptr<EVT_CTRL2> &evt)
869{
870 if (evt->valid())
871 {
872 EVENT *event = evt->fEvent;
873
874 // This is already done in initMemory()
875 //event->Roi = evt->runCtrl->roi0;
876 //event->RoiTM = evt->runCtrl->roi8;
877 //event->EventNum = evt->evNum;
878 //event->TriggerNum = evt->trgNum;
879 //event->TriggerType = evt->trgTyp;
880
881 event->NumBoards = evt->nBoard;
882
883 event->PCTime = evt->time.tv_sec;
884 event->PCUsec = evt->time.tv_usec;
885
886 for (int ib=0; ib<NBOARDS; ib++)
887 event->BoardTime[ib] = evt->FADhead[ib].time;
888
889 if (!eventCheck(*evt))
890 {
891 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
892 return;
893 }
894 }
895
896 // If file is open post the event for being written
897 secondaryQueue.post(evt);
898}
899
900// ==========================================================================
901// ==========================================================================
902
903/*
904 task 1-4:
905
906 lock1()-lock4();
907 while (1)
908 {
909 wait for signal [lockN]; // unlocked
910
911 while (n!=10)
912 wait sockets;
913 read;
914
915 lockM();
916 finished[n] = true;
917 signal(mainloop);
918 unlockM();
919 }
920
921
922 mainloop:
923
924 while (1)
925 {
926 lockM();
927 while (!finished[0] || !finished[1] ...)
928 wait for signal [lockM]; // unlocked... signals can be sent
929 finished[0-1] = false;
930 unlockM()
931
932 copy data to queue // locked
933
934 lockN[0-3];
935 signalN[0-3];
936 unlockN[0-3];
937 }
938
939
940 */
941
942/*
943 while (g_reset)
944 {
945 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
946
947 // Check that all sockets are connected
948
949 for (int i=0; i<40; i++)
950 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
951 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
952
953 while (g_reset)
954 {
955 if (READ_STRUCT::wait()<0)
956 break;
957
958 if (rc_epoll==0)
959 break;
960
961 for (int jj=0; jj<rc_epoll; jj++)
962 {
963 READ_STRUCT *rs = READ_STRUCT::get(jj);
964 if (!rs->connected)
965 continue;
966
967 const bool rc_read = rs->read();
968 if (!rc_read)
969 continue;
970
971 if (rs->bufTyp==READ_STRUCT::kHeader)
972 {
973 [...]
974 }
975
976 [...]
977
978 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
979 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
980 }
981
982 if (once_a_second)
983 {
984 if (evt==timeout)
985 break;
986 }
987 }
988
989 if (evt.nBoards==actBoards)
990 primaryQueue.post(evt);
991 }
992*/
993
// Queue of events for procEvt(); CloseRunFile() submits its close request here.
994Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
995
// The currently active run: created in mainloop() and replaced by mBufEvt()
// (which gets it by reference) when an event of a different run arrives.
996// This corresponds more or less to fFile... should we merge both?
997shared_ptr<RUN_CTRL2> actrun;
998
// Request closing the file of the active run: an event constructed from
// kRequestManual and actrun is submitted to primaryQueue.
999void CloseRunFile()
1000{
1001 // Currently we need actrun here, to be able to set kFileClosed.
1002 // Apart from that we have to ensure that there is an open file at all
1003 // which we can close.
1004 // Submission to the primary queue ensures that the event
1005 // is placed at the right place in the processing chain.
1006 // (Corresponds to the correct run)
1007 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1008}
1009
1010bool mainloop(READ_STRUCT *rd)
1011{
1012 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1013
1014 primaryQueue.start();
1015 secondaryQueue.start();
1016 processingQueue1.start();;
1017
1018 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
1019
1020 //time in seconds
1021 time_t gi_SecTime = time(NULL)-1;
1022
1023 //loop until global variable g_runStat claims stop
1024 g_reset = 0;
1025 while (g_reset == 0)
1026 {
1027#ifdef USE_POLL
1028 int pp[40];
1029 int nn = 0;
1030 pollfd fds[40];
1031 for (int i=0; i<40; i++)
1032 {
1033 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1034 {
1035 fds[nn].fd = rd[i].socket;
1036 fds[nn].events = POLLIN;
1037 pp[nn] = i;
1038 nn++;
1039 }
1040 }
1041
1042 const int rc_epoll = poll(fds, nn, 100);
1043 if (rc_epoll<0)
1044 break;
1045#endif
1046
1047#ifdef USE_SELECT
1048 fd_set readfs;
1049 FD_ZERO(&readfs);
1050 int nfsd = 0;
1051 for (int i=0; i<NBOARDS; i++)
1052 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1053 {
1054 FD_SET(rd[i].socket, &readfs);
1055 if (rd[i].socket>nfsd)
1056 nfsd = rd[i].socket;
1057 }
1058
1059 timeval tv;
1060 tv.tv_sec = 0;
1061 tv.tv_usec = 100000;
1062 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1063 // 0: timeout
1064 // -1: error
1065 if (rc_select<0)
1066 {
1067 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1068 continue;
1069 }
1070#endif
1071
1072#ifdef USE_EPOLL
1073 const int rc_epoll = READ_STRUCT::wait();
1074 if (rc_epoll<0)
1075 break;
1076#endif
1077
1078#if defined(USE_POLL)
1079 for (int jj=0; jj<nn; jj++)
1080#endif
1081#if defined(USE_EPOLL)
1082 for (int jj=0; jj<rc_epoll; jj++)
1083#endif
1084#if !defined(USE_EPOLL) && !defined(USE_POLL)
1085 for (int jj=0; jj<NBOARDS; jj++)
1086#endif
1087 {
1088#ifdef USE_SELECT
1089 if (!FD_ISSET(rs->socket, &readfs))
1090 continue;
1091#endif
1092
1093#ifdef USE_POLL
1094 if ((fds[jj].revents&POLLIN)==0)
1095 continue;
1096#endif
1097
1098#ifdef USE_EPOLL
1099 // FIXME: How to get i?
1100 READ_STRUCT *rs = READ_STRUCT::get(jj);
1101#endif
1102
1103#ifdef USE_POLL
1104 // FIXME: How to get i?
1105 READ_STRUCT *rs = &rd[pp[jj]];
1106#endif
1107
1108#if !defined(USE_POLL) && !defined(USE_EPOLL)
1109 const int i = (jj%4)*10 + (jj/4);
1110 READ_STRUCT *rs = &rd[i];
1111#endif
1112
1113#ifdef COMPLETE_EVENTS
1114 if (rs->bufTyp==READ_STRUCT::kWait)
1115 continue;
1116#endif
1117
1118 // ==================================================================
1119
1120 const bool rc_read = rs->read();
1121
1122 // Connect might have gotten closed during read
1123 gi_NumConnect[rs->sockId] = rs->connected;
1124 gj.numConn[rs->sockId] = rs->connected;
1125
1126 // Read either failed or disconnected, or the buffer is not yet full
1127 if (!rc_read)
1128 continue;
1129
1130 // ==================================================================
1131
1132 if (rs->bufTyp==READ_STRUCT::kHeader)
1133 {
1134 //check if startflag correct; else shift block ....
1135 // FIXME: This is not enough... this combination of
1136 // bytes can be anywhere... at least the end bytes
1137 // must be checked somewhere, too.
1138 uint k;
1139 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1140 {
1141 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1142 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1143 break;
1144 }
1145 rs->skip += k;
1146
1147 //no start of header found
1148 if (k==sizeof(PEVNT_HEADER)-1)
1149 {
1150 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1151 rs->bufPos = rs->B+1;
1152 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1153 continue;
1154 }
1155
1156 if (k > 0)
1157 {
1158 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1159
1160 rs->bufPos -= k;
1161 rs->bufLen += k;
1162
1163 continue; // We need to read more (bufLen>0)
1164 }
1165
1166 if (rs->skip>0)
1167 {
1168 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1169 rs->skip = 0;
1170 }
1171
1172 // Swap the header entries from network to host order
1173 rs->swapHeader();
1174
1175 rs->bufTyp = READ_STRUCT::kData;
1176 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1177
1178 debugHead(rs->B); // i and fadBoard not used
1179
1180 continue;
1181 }
1182
1183 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1184 if (end != 0xfe04)
1185 {
1186 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1187 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1188
1189 // ready to read next header
1190 rs->bufTyp = READ_STRUCT::kHeader;
1191 rs->bufLen = sizeof(PEVNT_HEADER);
1192 rs->bufPos = rs->B;
1193 // FIXME: What to do with the validity flag?
1194 continue;
1195 }
1196
1197 // get index into mBuffer for this event (create if needed)
1198 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1199
1200 // We have a valid entry, but no memory has yet been allocated
1201 if (evt && !evt->initMemory())
1202 {
1203 const time_t tm = time(NULL);
1204 if (evt->runCtrl->reportMem==tm)
1205 continue;
1206
1207 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1208 evt->runCtrl->reportMem = tm;
1209 continue;
1210 }
1211
1212 // ready to read next header
1213 rs->bufTyp = READ_STRUCT::kHeader;
1214 rs->bufLen = sizeof(PEVNT_HEADER);
1215 rs->bufPos = rs->B;
1216
1217 // Fatal error occurred. Event cannot be processed. Skip it. Start reading next header.
1218 if (!evt)
1219 continue;
1220
1221 // This should never happen
1222 if (evt->board[rs->sockId] != -1)
1223 {
1224 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1225 evt->evNum, rs->sockId, rs->sockId, rs->len());
1226 // FIXME: What to do with the validity flag?
1227 continue; // Continue reading next header
1228 }
1229
1230 // Swap the data entries (board headers) from network to host order
1231 rs->swapData();
1232
1233 // Copy data from rd[i] to mBuffer[evID]
1234 copyData(*rs, evt.get());
1235
1236#ifdef COMPLETE_EVENTS
1237 // Do not read anymore from this board until the whole event has been received
1238 rs->bufTyp = READ_STRUCT::kWait;
1239#endif
1240 // now we have stored a new board contents into Event structure
1241 evt->board[rs->sockId] = rs->sockId;
1242 evt->header = evt->FADhead+rs->sockId;
1243 evt->nBoard++;
1244
1245#ifdef COMPLETE_EPOLL
1246 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1247 {
1248 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1249 break;
1250 }
1251#endif
1252 // event not yet complete
1253 if (evt->nBoard < READ_STRUCT::activeSockets)
1254 continue;
1255
1256 // All previous events are now flagged as incomplete ("expired")
1257 // and will be removed. (This is a bit tricky, because pop_front()
1258 // would invalidate the current iterator if not done _after_ the increment)
1259 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1260 {
1261 const bool found = it->get()==evt.get();
1262 if (!found)
1263 reportIncomplete(*it, "expired");
1264 else
1265 primaryQueue.post(evt);
1266
1267 it++;
1268 evtCtrl.pop_front();
1269
1270 // We reached the current event, so we are done
1271 if (found)
1272 break;
1273 }
1274
1275#ifdef COMPLETE_EPOLL
1276 for (int j=0; j<40; j++)
1277 {
1278 epoll_event ev;
1279 ev.events = EPOLLIN;
1280 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1281 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1282 {
1283 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1284 return;
1285 }
1286 }
1287#endif
1288
1289#ifdef COMPLETE_EVENTS
1290 for (int j=0; j<40; j++)
1291 {
1292 //if (rs->bufTyp==READ_STRUCT::kWait)
1293 {
1294 rs->bufTyp = READ_STRUCT::kHeader;
1295 rs->bufLen = sizeof(PEVNT_HEADER);
1296 rs->bufPos = rs->B;
1297 }
1298 }
1299#endif
1300 } // end for loop over all sockets
1301
1302 // ==================================================================
1303
1304 const time_t actTime = time(NULL);
1305 if (actTime == gi_SecTime)
1306 {
1307#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1308 if (evtCtrl.empty())
1309 usleep(1);
1310#endif
1311 continue;
1312 }
1313 gi_SecTime = actTime;
1314
1315 // ==================================================================
1316 //loop over all active events and flag those older than read-timeout
1317 //delete those that are written to disk ....
1318
1319 // This could be improved having the pointer which separates the queue with
1320 // the incomplete events from the queue with the complete events
1321 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1322 {
1323 // A reference is enough because the shared_ptr is held by the evtCtrl
1324 const shared_ptr<EVT_CTRL2> &evt = *it;
1325
1326 // The first event is the oldest. If the first event within the
1327 // timeout window was received, we can stop searching further.
1328 if (evt->time.tv_sec>=actTime - 30)
1329 break;
1330
1331 // This will result in the emission of a dim service.
1332 // It doesn't matter if that takes comparably long,
1333 // because we have to stop the run anyway.
1334 const uint64_t rep = reportIncomplete(evt, "timeout");
1335 factReportIncomplete(rep);
1336
1337 it++;
1338 evtCtrl.pop_front();
1339 }
1340
1341 // =================================================================
1342
1343 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1344 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1345 gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1346 gj.bufProc = processingQueue1.size(); //# complete events in buffer
1347 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1348 gj.usdMem = Memory::max_inuse;
1349 gj.totMem = Memory::allocated;
1350 gj.maxMem = g_maxMem;
1351
1352 gj.deltaT = 1000; // temporary, must be improved
1353
1354 bool changed = false;
1355
1356 for (int ib=0; ib<NBOARDS; ib++)
1357 {
1358 gj.rateBytes[ib] = rd[ib].rateBytes;
1359 gj.totBytes[ib] += rd[ib].rateBytes;
1360
1361 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1362 changed = true;
1363
1364 gi_NumConnect[ib] = rd[ib].connected;
1365 gj.numConn[ib] = rd[ib].connected;
1366 }
1367
1368 factStat(gj);
1369
1370 Memory::max_inuse = 0;
1371
1372 for (int ib=0; ib<NBOARDS; ib++)
1373 rd[ib].rateBytes = 0;
1374
1375 // =================================================================
1376
1377 // This is a fake event to trigger possible run-closing conditions once a second
1378 // FIXME: This is not yet ideal because a file would never be closed
1379 // if a new file has been started and no events of the new file
1380 // have been received yet
1381 int request = kRequestNone;
1382
1383 // If nothing was received for more than 5min, close file
1384 if (actTime-actrun->lastTime>300)
1385 request |= kRequestTimeout;
1386
1387 // If connection status has changed
1388 if (changed)
1389 request |= kRequestConnectionChange;
1390
1391 if (request!=kRequestNone)
1392 runFinished();
1393
1394 if (actrun->fileStat==kFileOpen)
1395 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1396 }
1397
1398 // 1: Stop, wait for event to get processed
1399 // 2: Stop, finish immediately
1400 // 101: Restart, wait for events to get processed
1401 // 102: Restart, finish immediately
1402 //
1403 const int gi_reset = g_reset;
1404
1405 const bool abort = gi_reset%100==2;
1406
1407 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1408
1409 primaryQueue.wait(abort);
1410 secondaryQueue.wait(abort);
1411 processingQueue1.wait(abort);
1412
1413 // Here we also destroy all runCtrl structures and hence close all open files
1414 evtCtrl.clear();
1415 actrun.reset();
1416
1417 factPrintf(MessageImp::kInfo, "Exit read Process...");
1418 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1419
1420 factStat(gj);
1421
1422 return gi_reset>=100;
1423}
1424
1425// ==========================================================================
1426// ==========================================================================
1427
1428void StartEvtBuild()
1429{
1430 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1431
1432 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1433
1434 memset(&gj, 0, sizeof(GUI_STAT));
1435
1436 gj.usdMem = Memory::inuse;
1437 gj.totMem = Memory::allocated;
1438 gj.maxMem = g_maxMem;
1439
1440
1441 READ_STRUCT rd[NBOARDS];
1442
1443 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1444 for (int i=0; i<NBOARDS; i++)
1445 rd[i].sockId = i;
1446
1447 while (mainloop(rd));
1448
1449 //must close all open sockets ...
1450 factPrintf(MessageImp::kInfo, "Close all sockets...");
1451
1452 READ_STRUCT::close();
1453
1454 // Now all sockets get closed. This is not reflected in gi_NumConnect
1455 // The current workaround is to count all sockets as closed when the thread is not running
1456}
Note: See TracBrowser for help on using the repository browser.