source: trunk/FACT++/src/EventBuilder.cc@ 16443

Last change on this file since 16443 was 16382, checked in by tbretz, 12 years ago
Some more performance improvements (e.g. emplace instead of post, memset instead of for); moved the close request from the run-ctrl (where it would be executed immediately) to the evt-ctrl; removed some shared_ptrs where not necessary because the object cannot go out of scope while the shared_ptr does not go out of scope
File size: 43.0 KB
Line 
1#include <poll.h>
2#include <sys/time.h>
3#include <sys/epoll.h>
4#include <netinet/tcp.h>
5
6#include <cstring>
7#include <cstdarg>
8#include <list>
9
10#include <boost/algorithm/string/join.hpp>
11
12#include "queue.h"
13
14#include "MessageImp.h"
15#include "EventBuilder.h"
16
17using namespace std;
18
19#define MIN_LEN 32 // min #bytes needed to interpret FADheader
20#define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
21
22//#define COMPLETE_EVENTS
23//#define USE_POLL
24//#define USE_EPOLL
25//#define USE_SELECT
26//#define COMPLETE_EPOLL
27
28// Reading only 1024: 13: 77Hz, 87%
29// Reading only 1024: 12: 78Hz, 46%
30// Reading only 300: 4: 250Hz, 92%
31// Reading only 300: 3: 258Hz, 40%
32
33// Reading only four threads 1024: 13: 77Hz, 60%
34// Reading only four threads 1024: 12: 78Hz, 46%
35// Reading only four threads 300: 4: 250Hz, 92%
36// Reading only four threads 300: 3: 258Hz, 40%
37
38// Default 300: 4: 249Hz, 92%
39// Default 300: 3: 261Hz, 40%
40// Default 1024: 13: 76Hz, 93%
41// Default 1024: 12: 79Hz, 46%
42
43// Poll [selected] 1024: 13: 63Hz, 45%
44// Poll [selected] 1024: 14: 63Hz, 63%
45// Poll [selected] 1024: 15: 64Hz, 80%
46// Poll [selected] 300: 4: 230Hz, 47%
47// Poll [selected] 300: 3: 200Hz, 94%
48
49// Poll [all] 1024: 13: 65Hz, 47%
50// Poll [all] 1024: 14: 64Hz, 59%
51// Poll [all] 1024: 15: 62Hz, 67%
52// Poll [all] 300: 4: 230Hz, 47%
53// Poll [all] 300: 3: 230Hz, 35%
54
55// ==========================================================================
56
57bool runOpen(const EVT_CTRL2 &evt);
58bool runWrite(const EVT_CTRL2 &evt);
59void runClose();
60void applyCalib(const shared_ptr<EVT_CTRL2> &evt);
61void factOut(int severity, const char *message);
62void factReportIncomplete (uint64_t rep);
63void gotNewRun(RUN_CTRL2 &run);
64void runFinished();
65void factStat(GUI_STAT gj);
66bool eventCheck(const EVT_CTRL2 &evt);
67void debugHead(void *buf);
68
69// ==========================================================================
70
71int g_reset;
72
73size_t g_maxMem; //maximum memory allowed for buffer
74
75FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
76
77uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
78
79GUI_STAT gj;
80
81// ==========================================================================
82
83namespace Memory
84{
85 uint64_t inuse = 0;
86 uint64_t allocated = 0;
87
88 uint64_t max_inuse = 0;
89
90 std::mutex mtx;
91
92 std::forward_list<void*> memory;
93
94 void *malloc()
95 {
96 // No free slot available, next alloc would exceed max memory
97 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
98 return NULL;
99
100 // We will return this amount of memory
101 // This is not 100% thread safe, but it is not a super accurate measure anyway
102 inuse += MAX_TOT_MEM;
103 if (inuse>max_inuse)
104 max_inuse = inuse;
105
106 void *mem = NULL;
107
108 if (memory.empty())
109 {
110 // No free slot available, allocate a new one
111 allocated += MAX_TOT_MEM;
112 mem = new char[MAX_TOT_MEM];
113 }
114 else
115 {
116 // Get the next free slot from the stack and return it
117 const std::lock_guard<std::mutex> lock(mtx);
118 mem = memory.front();
119 memory.pop_front();
120 }
121
122 memset(mem, 0, MAX_HEAD_MEM);
123 return mem;
124 };
125
126 void free(void *mem)
127 {
128 if (!mem)
129 return;
130
131 // Decrease the amont of memory in use accordingly
132 inuse -= MAX_TOT_MEM;
133
134 // If the maximum memory has changed, we might be over the limit.
135 // In this case: free a slot
136 if (allocated>g_maxMem)
137 {
138 delete [] (char*)mem;
139 allocated -= MAX_TOT_MEM;
140 return;
141 }
142
143 const std::lock_guard<std::mutex> lock(mtx);
144 memory.push_front(mem);
145 }
146
147};
148
149// ==========================================================================
150
151void factPrintf(int severity, const char *fmt, ...)
152{
153 char str[1000];
154
155 va_list ap;
156 va_start(ap, fmt);
157 vsnprintf(str, 1000, fmt, ap);
158 va_end(ap);
159
160 factOut(severity, str);
161}
162
163// ==========================================================================
164
165struct READ_STRUCT
166{
167 enum buftyp_t
168 {
169 kStream,
170 kHeader,
171 kData,
172#ifdef COMPLETE_EVENTS
173 kWait
174#endif
175 };
176
177 // ---------- connection ----------
178
179 static uint activeSockets;
180
181 int sockId; // socket id (board number)
182 int socket; // socket handle
183 bool connected; // is this socket connected?
184
185 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
186
187 // ------------ epoll -------------
188
189 static int fd_epoll;
190 static epoll_event events[NBOARDS];
191
192 static void init();
193 static void close();
194 static int wait();
195 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
196
197 // ------------ buffer ------------
198
199 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
200
201 uint32_t bufLen; // number of bytes left to read
202 uint8_t *bufPos; // next byte to read to the buffer next
203
204 union
205 {
206 uint8_t B[MAX_LEN];
207 uint16_t S[MAX_LEN / 2];
208 uint32_t I[MAX_LEN / 4];
209 uint64_t L[MAX_LEN / 8];
210 PEVNT_HEADER H;
211 };
212
213 timeval time;
214 uint64_t rateBytes;
215 uint32_t skip; // number of bytes skipped before start of event
216
217 uint32_t len() const { return uint32_t(H.package_length)*2; }
218
219 void swapHeader();
220 void swapData();
221
222 // --------------------------------
223
224 READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
225 {
226 if (fd_epoll<0)
227 init();
228 }
229 ~READ_STRUCT()
230 {
231 destroy();
232 }
233
234 void destroy();
235 bool create(sockaddr_in addr);
236 bool check(int, sockaddr_in addr);
237 bool read();
238};
239
240int READ_STRUCT::wait()
241{
242 // wait for something to do...
243 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
244 if (rc>=0)
245 return rc;
246
247 if (errno==EINTR) // timout or signal interruption
248 return 0;
249
250 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
251 return -1;
252}
253
254uint READ_STRUCT::activeSockets = 0;
255int READ_STRUCT::fd_epoll = -1;
256epoll_event READ_STRUCT::events[NBOARDS];
257
258void READ_STRUCT::init()
259{
260 if (fd_epoll>=0)
261 return;
262
263#ifdef USE_EPOLL
264 fd_epoll = epoll_create(NBOARDS);
265 if (fd_epoll<0)
266 {
267 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
268 return;
269 }
270#endif
271}
272
273void READ_STRUCT::close()
274{
275#ifdef USE_EPOLL
276 if (fd_epoll>=0 && ::close(fd_epoll)>0)
277 factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
278#endif
279
280 fd_epoll = -1;
281}
282
283bool READ_STRUCT::create(sockaddr_in sockAddr)
284{
285 if (socket>=0)
286 return false;
287
288 const int port = ntohs(sockAddr.sin_port) + 1;
289
290 SockAddr.sin_family = sockAddr.sin_family;
291 SockAddr.sin_addr = sockAddr.sin_addr;
292 SockAddr.sin_port = htons(port);
293
294 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
295 {
296 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
297 socket = -1;
298 return false;
299 }
300
301 int optval = 1;
302 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
303 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
304
305 optval = 10; //start after 10 seconds
306 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
307 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
308
309 optval = 10; //do every 10 seconds
310 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
311 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
312
313 optval = 2; //close after 2 unsuccessful tries
314 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
315 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
316
317 factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
318
319 //connected = false;
320 activeSockets++;
321
322 return true;
323}
324
325void READ_STRUCT::destroy()
326{
327 if (socket<0)
328 return;
329
330#ifdef USE_EPOLL
331 // strictly speaking this should not be necessary
332 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
333 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
334#endif
335
336 if (::close(socket) > 0)
337 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
338 else
339 factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
340
341 socket = -1;
342 connected = false;
343 activeSockets--;
344}
345
346bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
347{
348 // Continue in the most most likely case (performance)
349 //if (socket>=0 && sockDef!=0 && connected)
350 // return;
351 const int old = socket;
352
353 // socket open, but should not be open
354 if (socket>=0 && sockDef==0)
355 destroy();
356
357 // Socket closed, but should be open
358 if (socket<0 && sockDef!=0)
359 create(addr); //generate address and socket
360
361 const bool retval = old!=socket;
362
363 // Socket closed
364 if (socket<0)
365 return retval;
366
367 // Socket open and connected: Nothing to do
368 if (connected)
369 return retval;
370
371 //try to connect if not yet done
372 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
373 if (rc == -1)
374 return retval;
375
376 connected = true;
377
378 if (sockDef<0)
379 {
380 bufTyp = READ_STRUCT::kStream; // full data to be skipped
381 bufLen = MAX_LEN; // huge for skipping
382 }
383 else
384 {
385 bufTyp = READ_STRUCT::kHeader; // expect a header
386 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
387 }
388
389 bufPos = B; // no byte read so far
390 skip = 0; // start empty
391
392 factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
393
394#ifdef USE_EPOLL
395 epoll_event ev;
396 ev.events = EPOLLIN;
397 ev.data.ptr = this; // user data (union: ev.ptr)
398 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
399 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
400#endif
401
402 return retval;
403}
404
405bool READ_STRUCT::read()
406{
407 if (!connected)
408 return false;
409
410 if (bufLen==0)
411 return true;
412
413 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
414 // recv failed
415 if (jrd<0)
416 {
417 // There was just nothing waiting
418 if (errno==EWOULDBLOCK || errno==EAGAIN)
419 return false;
420
421 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
422 return false;
423 }
424
425 // connection was closed ...
426 if (jrd==0)
427 {
428 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
429
430 destroy();//DestroySocket(rd[i]); //generate address and socket
431 return false;
432 }
433
434 rateBytes += jrd;
435
436 // are we skipping this board ...
437 if (bufTyp==kStream)
438 return false;
439
440 if (bufPos==B)
441 gettimeofday(&time, NULL);
442
443 bufPos += jrd; //==> prepare for continuation
444 bufLen -= jrd;
445
446 // not yet all read
447 return bufLen==0;
448}
449
450void READ_STRUCT::swapHeader()
451{
452 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
453 S[2] = ntohs(S[2]); // version_no
454 S[3] = ntohs(S[3]); // PLLLCK
455 S[4] = ntohs(S[4]); // trigger_crc
456 S[5] = ntohs(S[5]); // trigger_type
457
458 I[3] = ntohl(I[3]); // trigger_id
459 I[4] = ntohl(I[4]); // fad_evt_counter
460 I[5] = ntohl(I[5]); // REFCLK_frequency
461
462 S[12] = ntohs(S[12]); // board id
463 S[13] = ntohs(S[13]); // adc_clock_phase_shift
464 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
465 S[15] = ntohs(S[15]); // trigger_generator_prescaler
466
467 I[10] = ntohl(I[10]); // runnumber;
468 I[11] = ntohl(I[11]); // time;
469
470 // Use back inserter??
471 for (int s=24; s<24+NTemp+NDAC; s++)
472 S[s] = ntohs(S[s]); // drs_temperature / dac
473}
474
475void READ_STRUCT::swapData()
476{
477 // swapEventHeaderBytes: End of the header. to channels now
478
479 int i = 36;
480 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
481 {
482 S[i+0] = ntohs(S[i+0]);//id
483 S[i+1] = ntohs(S[i+1]);//start_cell
484 S[i+2] = ntohs(S[i+2]);//roi
485 S[i+3] = ntohs(S[i+3]);//filling
486
487 i += 4+S[i+2];//skip the pixel data
488 }
489}
490
491// ==========================================================================
492
493bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
494{
495 int xjr = -1;
496 int xkr = -1;
497
498 //points to the very first roi
499 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
500
501 roi[0] = ntohs(rd.S[roiPtr]);
502
503 for (int jr = 0; jr < 9; jr++)
504 {
505 roi[jr] = ntohs(rd.S[roiPtr]);
506
507 if (roi[jr]>1024)
508 {
509 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
510 return false;
511 }
512
513 // Check that the roi of pixels jr are compatible with the one of pixel 0
514 if (jr!=8 && roi[jr]!=roi[0])
515 {
516 xjr = jr;
517 break;
518 }
519
520 // Check that the roi of all other DRS chips on boards are compatible
521 for (int kr = 1; kr < 4; kr++)
522 {
523 const int kroi = ntohs(rd.S[roiPtr]);
524 if (kroi != roi[jr])
525 {
526 xjr = jr;
527 xkr = kr;
528 break;
529 }
530 roiPtr += kroi+4;
531 }
532 }
533
534 if (xjr>=0)
535 {
536 if (xkr<0)
537 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
538 else
539 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
540
541 return false;
542 }
543
544 if (roi[8] < roi[0])
545 {
546 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
547 return false;
548 }
549
550 return true;
551}
552
553list<shared_ptr<EVT_CTRL2>> evtCtrl;
554
555shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
556{
557 /*
558 checkroi consistence
559 find existing entry
560 if no entry, try to allocate memory
561 if entry and memory, init event structure
562 */
563
564 uint16_t nRoi[9];
565 if (!checkRoiConsistency(rd, nRoi))
566 return shared_ptr<EVT_CTRL2>();
567
568 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
569 {
570 // A reference is enough because the evtCtrl holds the shared_ptr anyway
571 const shared_ptr<EVT_CTRL2> &evt = *it;
572
573 // If the run is different, go on searching.
574 // We cannot stop searching if a lower run-id is found as in
575 // the case of the events, because theoretically, there
576 // can be the same run on two different days.
577 if (rd.H.runnumber != evt->runNum)
578 continue;
579
580 // If the ID of the new event if higher than the last one stored
581 // in that run, we have to assign a new slot (leave the loop)
582 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
583 break;
584
585 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
586 continue;
587
588 // We have found an entry with the same runID and evtID
589 // Check if ROI is consistent
590 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
591 {
592 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
593 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
594 return shared_ptr<EVT_CTRL2>();
595 }
596
597 // count for inconsistencies
598 if (evt->trgNum != rd.H.trigger_id)
599 evt->Errors[0]++;
600 if (evt->trgTyp != rd.H.trigger_type)
601 evt->Errors[2]++;
602
603 // It is maybe not likely, but the header of this board might have
604 // arrived earlier. (We could also update the run-info, but
605 // this should not make a difference here)
606 if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
607 rd.time.tv_sec<evt->time.tv_sec)
608 evt->time = rd.time;
609
610 //everything seems fine so far ==> use this slot ....
611 return evt;
612 }
613
614 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
615 {
616 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
617 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
618 return shared_ptr<EVT_CTRL2>();
619 }
620
621 EVT_CTRL2 *evt = new EVT_CTRL2;
622
623 evt->time = rd.time;
624
625 evt->runNum = rd.H.runnumber;
626 evt->evNum = rd.H.fad_evt_counter;
627
628 evt->trgNum = rd.H.trigger_id;
629 evt->trgTyp = rd.H.trigger_type;
630
631 evt->nRoi = nRoi[0];
632 evt->nRoiTM = nRoi[8];
633
634 const bool newrun = actrun->runId != rd.H.runnumber;
635 if (newrun)
636 {
637 // Since we have started a new run, we know already when to close the
638 // previous run in terms of number of events
639 actrun->maxEvt = actrun->lastEvt;
640
641 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
642 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
643
644 // The new run is the active run now
645 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
646
647 const time_t &tsec = evt->time.tv_sec;
648
649 actrun->openTime = tsec;
650 actrun->closeTime = tsec + 3600 * 24; // max time allowed
651 actrun->runId = rd.H.runnumber;
652 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
653 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
654
655 // Signal the fadctrl that a new run has been started
656 // Note this is the only place at which we can ensure that
657 // gotnewRun is called only once
658 gotNewRun(*actrun);
659 }
660
661 // Keep pointer to run of this event
662 evt->runCtrl = actrun;
663
664 // Increase the number of events we have started to receive in this run
665 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
666 actrun->lastEvt++;
667
668 // An event can be the first and the last, but not the last and the first.
669 // Therefore gotNewRun is called before runFinished.
670 // runFinished signals that the last event of a run was just received. Processing
671 // might still be ongoing, but we can start a new run.
672 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
673 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
674 if (!cond1 || !cond2)
675 runFinished();
676
677 // We don't mind here that this is not common to all events,
678 // because every coming event will fullfil the condition as well.
679 if (!cond1)
680 evt->closeRequest |= kRequestMaxEvtsReached;
681 if (!cond2)
682 evt->closeRequest |= kRequestMaxTimeReached;
683
684 // Secure access to evtCtrl against access in CloseRunFile
685 // This should be the last... otherwise we can run into threading issues
686 // if the event is accessed before it is fully initialized.
687 evtCtrl.emplace_back(evt);
688 return evtCtrl.back();
689}
690
691
692void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
693{
694 const int i = rBuf.sockId;
695
696 memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
697
698 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
699
700 // consistency of ROIs have been checked already (is it all correct?)
701 const uint16_t &roi = rBuf.S[src+2];
702
703 // different sort in FAD board.....
704 for (int px = 0; px < 9; px++)
705 {
706 for (int drs = 0; drs < 4; drs++)
707 {
708 const int16_t pixC = rBuf.S[src+1]; // start-cell
709 const int16_t pixR = rBuf.S[src+2]; // roi
710 //here we should check if pixH is correct ....
711
712 const int pixS = i*36 + drs*9 + px;
713
714 evt->fEvent->StartPix[pixS] = pixC;
715
716 memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
717
718 src += 4+pixR;
719
720 // Treatment for ch 9 (TM channel)
721 if (px != 8)
722 continue;
723
724 const int tmS = i*4 + drs;
725
726 //and we have additional TM info
727 if (pixR > roi)
728 {
729 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
730
731 memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
732 }
733 else
734 {
735 evt->fEvent->StartTM[tmS] = -1;
736 }
737 }
738 }
739}
740
741// ==========================================================================
742
743uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
744{
745 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
746 evt->runNum, evt->evNum, evtCtrl.size(), txt);
747
748 uint64_t report = 0;
749
750 char str[1000];
751
752 int ik=0;
753 for (int ib=0; ib<NBOARDS; ib++)
754 {
755 if (ib%10==0)
756 str[ik++] = '|';
757
758 const int jb = evt->board[ib];
759 if (jb>=0) // data received from that board
760 {
761 str[ik++] = '0'+(jb%10);
762 continue;
763 }
764
765 // FIXME: This is not synchronous... it reports
766 // accoridng to the current connection status, not w.r.t. to the
767 // one when the event was taken.
768 if (gi_NumConnect[ib]==0) // board not connected
769 {
770 str[ik++] = 'x';
771 continue;
772 }
773
774 // data from this board lost
775 str[ik++] = '.';
776 report |= ((uint64_t)1)<<ib;
777 }
778
779 str[ik++] = '|';
780 str[ik] = 0;
781
782 factOut(MessageImp::kWarn, str);
783
784 return report;
785}
786
787// ==========================================================================
788// ==========================================================================
789
790Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1));
791
792// If this is not convenient anymore, it could be replaced by
793// a command queue, to which command+data is posted,
794// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
795void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
796{
797 //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
798 RUN_CTRL2 &run = *evt->runCtrl;
799
800 // Is this a valid event or just an empty event to trigger run close?
801 // If this is not an empty event open the new run-file
802 // Empty events are there to trigger run-closing conditions
803 if (evt->valid())
804 {
805 // File not yet open
806 if (run.fileStat==kFileNotYetOpen)
807 {
808 // runOpen will close a previous run, if still open
809 if (!runOpen(*evt))
810 {
811 factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
812 run.fileStat = kFileClosed;
813 return;
814 }
815
816 factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
817 run.fileStat = kFileOpen;
818 }
819
820 // Here we have a valid calibration and can go on with that.
821 // It is important that _all_ events are sent for calibration (except broken ones)
822 processingQueue1.post(evt);
823 }
824
825 // File already closed
826 if (run.fileStat==kFileClosed)
827 return;
828
829 bool rc1 = true;
830 if (evt->valid())
831 {
832 rc1 = runWrite(*evt);
833 if (!rc1)
834 factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
835 }
836
837 // File not open... no need to close or to check for close
838 // ... this is the case if CloseRunFile was called before any file was opened.
839 if (run.fileStat!=kFileOpen)
840 return;
841
842 // File is not yet to be closed.
843 if (rc1 && evt->closeRequest==kRequestNone)
844 return;
845
846 runClose();
847 run.fileStat = kFileClosed;
848
849 vector<string> reason;
850 if (evt->closeRequest&kRequestManual)
851 reason.emplace_back("close requested");
852 if (evt->closeRequest&kRequestTimeout)
853 reason.emplace_back("receive timeout");
854 if (evt->closeRequest&kRequestConnectionChange)
855 reason.emplace_back("connection changed");
856 if (evt->closeRequest&kRequestEventCheckFailed)
857 reason.emplace_back("event check failed");
858 if (evt->closeRequest&kRequestMaxTimeReached)
859 reason.push_back(to_string(run.closeTime-run.openTime)+"s reached");
860 if (evt->closeRequest&kRequestMaxEvtsReached)
861 reason.push_back(to_string(run.maxEvt)+" evts reached");
862 if (!rc1)
863 reason.emplace_back("runWrite failed");
864
865 const string str = boost::algorithm::join(reason, ", ");
866 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
867}
868
869Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
870
871void procEvt(const shared_ptr<EVT_CTRL2> &evt)
872{
873 if (evt->valid())
874 {
875 evt->fEvent->Errors[0] = evt->Errors[0];
876 evt->fEvent->Errors[1] = evt->Errors[1];
877 evt->fEvent->Errors[2] = evt->Errors[2];
878 evt->fEvent->Errors[3] = evt->Errors[3];
879
880 evt->fEvent->PCTime = evt->time.tv_sec;
881 evt->fEvent->PCUsec = evt->time.tv_usec;
882
883 evt->fEvent->NumBoards = evt->nBoard;
884
885 for (int ib=0; ib<NBOARDS; ib++)
886 evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
887
888 if (!eventCheck(*evt))
889 {
890 secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
891 return;
892 }
893 }
894
895 // If file is open post the event for being written
896 secondaryQueue.post(evt);
897}
898
899// ==========================================================================
900// ==========================================================================
901
902/*
903 task 1-4:
904
905 lock1()-lock4();
906 while (1)
907 {
908 wait for signal [lockN]; // unlocked
909
910 while (n!=10)
911 wait sockets;
912 read;
913
914 lockM();
915 finished[n] = true;
916 signal(mainloop);
917 unlockM();
918 }
919
920
921 mainloop:
922
923 while (1)
924 {
925 lockM();
926 while (!finished[0] || !finished[1] ...)
927 wait for signal [lockM]; // unlocked... signals can be sent
928 finished[0-1] = false;
929 unlockM()
930
931 copy data to queue // locked
932
933 lockN[0-3];
934 signalN[0-3];
935 unlockN[0-3];
936 }
937
938
939 */
940
941/*
942 while (g_reset)
943 {
944 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
945
946 // Check that all sockets are connected
947
948 for (int i=0; i<40; i++)
949 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
950 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
951
952 while (g_reset)
953 {
954 if (READ_STRUCT::wait()<0)
955 break;
956
957 if (rc_epoll==0)
958 break;
959
960 for (int jj=0; jj<rc_epoll; jj++)
961 {
962 READ_STRUCT *rs = READ_STRUCT::get(jj);
963 if (!rs->connected)
964 continue;
965
966 const bool rc_read = rs->read();
967 if (!rc_read)
968 continue;
969
970 if (rs->bufTyp==READ_STRUCT::kHeader)
971 {
972 [...]
973 }
974
975 [...]
976
977 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
978 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
979 }
980
981 if (once_a_second)
982 {
983 if (evt==timeout)
984 break;
985 }
986 }
987
988 if (evt.nBoards==actBoards)
989 primaryQueue.post(evt);
990 }
991*/
992
993Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
994
995// This corresponds more or less to fFile... should we merge both?
996shared_ptr<RUN_CTRL2> actrun;
997
998void CloseRunFile()
999{
1000 // Currently we need actrun here, to be able to set kFileClosed.
1001 // Apart from that we have to ensure that there is an open file at all
1002 // which we can close.
1003 // Submission to the primary queue ensures that the event
1004 // is placed at the right place in the processing chain.
1005 // (Corresponds to the correct run)
1006 primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
1007}
1008
1009bool mainloop(READ_STRUCT *rd)
1010{
1011 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1012
1013 primaryQueue.start();
1014 secondaryQueue.start();
1015 processingQueue1.start();;
1016
1017 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
1018
1019 //time in seconds
1020 time_t gi_SecTime = time(NULL)-1;
1021
1022 //loop until global variable g_runStat claims stop
1023 g_reset = 0;
1024 while (g_reset == 0)
1025 {
1026#ifdef USE_POLL
1027 int pp[40];
1028 int nn = 0;
1029 pollfd fds[40];
1030 for (int i=0; i<40; i++)
1031 {
1032 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1033 {
1034 fds[nn].fd = rd[i].socket;
1035 fds[nn].events = POLLIN;
1036 pp[nn] = i;
1037 nn++;
1038 }
1039 }
1040
1041 const int rc_epoll = poll(fds, nn, 100);
1042 if (rc_epoll<0)
1043 break;
1044#endif
1045
1046#ifdef USE_SELECT
1047 fd_set readfs;
1048 FD_ZERO(&readfs);
1049 int nfsd = 0;
1050 for (int i=0; i<NBOARDS; i++)
1051 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1052 {
1053 FD_SET(rd[i].socket, &readfs);
1054 if (rd[i].socket>nfsd)
1055 nfsd = rd[i].socket;
1056 }
1057
1058 timeval tv;
1059 tv.tv_sec = 0;
1060 tv.tv_usec = 100000;
1061 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1062 // 0: timeout
1063 // -1: error
1064 if (rc_select<0)
1065 {
1066 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1067 continue;
1068 }
1069#endif
1070
1071#ifdef USE_EPOLL
1072 const int rc_epoll = READ_STRUCT::wait();
1073 if (rc_epoll<0)
1074 break;
1075#endif
1076
1077#if defined(USE_POLL)
1078 for (int jj=0; jj<nn; jj++)
1079#endif
1080#if defined(USE_EPOLL)
1081 for (int jj=0; jj<rc_epoll; jj++)
1082#endif
1083#if !defined(USE_EPOLL) && !defined(USE_POLL)
1084 for (int jj=0; jj<NBOARDS; jj++)
1085#endif
1086 {
1087#ifdef USE_SELECT
1088 if (!FD_ISSET(rs->socket, &readfs))
1089 continue;
1090#endif
1091
1092#ifdef USE_POLL
1093 if ((fds[jj].revents&POLLIN)==0)
1094 continue;
1095#endif
1096
1097#ifdef USE_EPOLL
1098 // FIXME: How to get i?
1099 READ_STRUCT *rs = READ_STRUCT::get(jj);
1100#endif
1101
1102#ifdef USE_POLL
1103 // FIXME: How to get i?
1104 READ_STRUCT *rs = &rd[pp[jj]];
1105#endif
1106
1107#if !defined(USE_POLL) && !defined(USE_EPOLL)
1108 const int i = (jj%4)*10 + (jj/4);
1109 READ_STRUCT *rs = &rd[i];
1110#endif
1111
1112#ifdef COMPLETE_EVENTS
1113 if (rs->bufTyp==READ_STRUCT::kWait)
1114 continue;
1115#endif
1116
1117 // ==================================================================
1118
1119 const bool rc_read = rs->read();
1120
1121 // Connect might have gotten closed during read
1122 gi_NumConnect[rs->sockId] = rs->connected;
1123 gj.numConn[rs->sockId] = rs->connected;
1124
1125 // Read either failed or disconnected, or the buffer is not yet full
1126 if (!rc_read)
1127 continue;
1128
1129 // ==================================================================
1130
1131 if (rs->bufTyp==READ_STRUCT::kHeader)
1132 {
1133 //check if startflag correct; else shift block ....
1134 // FIXME: This is not enough... this combination of
1135 // bytes can be anywhere... at least the end bytes
1136 // must be checked somewhere, too.
1137 uint k;
1138 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1139 {
1140 //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1141 if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb)
1142 break;
1143 }
1144 rs->skip += k;
1145
1146 //no start of header found
1147 if (k==sizeof(PEVNT_HEADER)-1)
1148 {
1149 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1150 rs->bufPos = rs->B+1;
1151 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1152 continue;
1153 }
1154
1155 if (k > 0)
1156 {
1157 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1158
1159 rs->bufPos -= k;
1160 rs->bufLen += k;
1161
1162 continue; // We need to read more (bufLen>0)
1163 }
1164
1165 if (rs->skip>0)
1166 {
1167 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1168 rs->skip = 0;
1169 }
1170
1171 // Swap the header entries from network to host order
1172 rs->swapHeader();
1173
1174 rs->bufTyp = READ_STRUCT::kData;
1175 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1176
1177 debugHead(rs->B); // i and fadBoard not used
1178
1179 continue;
1180 }
1181
1182 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1183 if (end != 0xfe04)
1184 {
1185 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1186 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1187
1188 // ready to read next header
1189 rs->bufTyp = READ_STRUCT::kHeader;
1190 rs->bufLen = sizeof(PEVNT_HEADER);
1191 rs->bufPos = rs->B;
1192 // FIXME: What to do with the validity flag?
1193 continue;
1194 }
1195
1196 // get index into mBuffer for this event (create if needed)
1197 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1198
1199 // We have a valid entry, but no memory has yet been allocated
1200 if (evt && !evt->initMemory())
1201 {
1202 if (evt->reportMem)
1203 continue;
1204
1205 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1206 evt->reportMem = true;
1207 continue;
1208 }
1209
1210 // ready to read next header
1211 rs->bufTyp = READ_STRUCT::kHeader;
1212 rs->bufLen = sizeof(PEVNT_HEADER);
1213 rs->bufPos = rs->B;
1214
1215 // Fatal error occurred. Event cannot be processed. Skip it. Start reading next header.
1216 if (!evt)
1217 continue;
1218
1219 // This should never happen
1220 if (evt->board[rs->sockId] != -1)
1221 {
1222 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1223 evt->evNum, rs->sockId, rs->sockId, rs->len());
1224 // FIXME: What to do with the validity flag?
1225 continue; // Continue reading next header
1226 }
1227
1228 // Swap the data entries (board headers) from network to host order
1229 rs->swapData();
1230
1231 // Copy data from rd[i] to mBuffer[evID]
1232 copyData(*rs, evt.get());
1233
1234#ifdef COMPLETE_EVENTS
1235 // Do not read anymore from this board until the whole event has been received
1236 rs->bufTyp = READ_STRUCT::kWait;
1237#endif
1238 // now we have stored a new board contents into Event structure
1239 evt->board[rs->sockId] = rs->sockId;
1240 evt->header = evt->FADhead+rs->sockId;
1241 evt->nBoard++;
1242
1243#ifdef COMPLETE_EPOLL
1244 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1245 {
1246 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1247 break;
1248 }
1249#endif
1250 // event not yet complete
1251 if (evt->nBoard < READ_STRUCT::activeSockets)
1252 continue;
1253
1254 // All previous events are now flagged as incomplete ("expired")
1255 // and will be removed. (This is a bit tricky, because pop_front()
1256 // would invalidate the current iterator if not done _after_ the increment)
1257 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1258 {
1259 const bool found = it->get()==evt.get();
1260 if (!found)
1261 reportIncomplete(*it, "expired");
1262 else
1263 primaryQueue.post(evt);
1264
1265 it++;
1266 evtCtrl.pop_front();
1267
1268 // We reached the current event, so we are done
1269 if (found)
1270 break;
1271 }
1272
1273#ifdef COMPLETE_EPOLL
1274 for (int j=0; j<40; j++)
1275 {
1276 epoll_event ev;
1277 ev.events = EPOLLIN;
1278 ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1279 if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1280 {
1281 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1282 return;
1283 }
1284 }
1285#endif
1286
1287#ifdef COMPLETE_EVENTS
1288 for (int j=0; j<40; j++)
1289 {
1290 //if (rs->bufTyp==READ_STRUCT::kWait)
1291 {
1292 rs->bufTyp = READ_STRUCT::kHeader;
1293 rs->bufLen = sizeof(PEVNT_HEADER);
1294 rs->bufPos = rs->B;
1295 }
1296 }
1297#endif
1298 } // end for loop over all sockets
1299
1300 // ==================================================================
1301
1302 gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1303 gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1304 gj.bufTot = gj.bufNew+gj.bufEvt; //# total events currently in buffer
1305 if (gj.bufNew>gj.maxEvt) //# maximum events in buffer past cycle
1306 gj.maxEvt = gj.bufNew;
1307
1308 // ==================================================================
1309
1310 const time_t actTime = time(NULL);
1311 if (actTime == gi_SecTime)
1312 {
1313#if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1314 if (evtCtrl.empty())
1315 usleep(1);
1316#endif
1317 continue;
1318 }
1319 gi_SecTime = actTime;
1320
1321 // ==================================================================
1322 //loop over all active events and flag those older than read-timeout
1323 //delete those that are written to disk ....
1324
1325 // This could be improved having the pointer which separates the queue with
1326 // the incomplete events from the queue with the complete events
1327 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1328 {
1329 // A reference is enough because the shared_ptr is held by the evtCtrl
1330 const shared_ptr<EVT_CTRL2> &evt = *it;
1331
1332 // The first event is the oldest. If the first event within the
1333 // timeout window was received, we can stop searching further.
1334 if (evt->time.tv_sec>=actTime - 30)
1335 break;
1336
1337 // This will result in the emission of a dim service.
1338 // It doesn't matter if that takes comparably long,
1339 // because we have to stop the run anyway.
1340 const uint64_t rep = reportIncomplete(evt, "timeout");
1341 factReportIncomplete(rep);
1342
1343 it++;
1344 evtCtrl.pop_front();
1345 }
1346
1347 // =================================================================
1348
1349 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1350 gj.usdMem = Memory::max_inuse;
1351 gj.totMem = Memory::allocated;
1352
1353 gj.deltaT = 1000; // temporary, must be improved
1354
1355 bool changed = false;
1356
1357 for (int ib=0; ib<NBOARDS; ib++)
1358 {
1359 gj.rateBytes[ib] = rd[ib].rateBytes;
1360 gj.totBytes[ib] += rd[ib].rateBytes;
1361
1362 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1363 changed = true;
1364
1365 gi_NumConnect[ib] = rd[ib].connected;
1366 gj.numConn[ib] = rd[ib].connected;
1367 }
1368
1369 factStat(gj);
1370
1371 Memory::max_inuse = 0;
1372 gj.maxEvt = 0;
1373
1374 for (int ib=0; ib<NBOARDS; ib++)
1375 rd[ib].rateBytes = 0;
1376
1377 // =================================================================
1378
1379 // This is a fake event to trigger possible run-closing conditions once a second
1380 // FIXME: This is not yet ideal because a file would never be closed
1381 // if a new file has been started and no events of the new file
1382 // have been received yet
1383 int request = kRequestNone;
1384
1385 // If nothing was received for more than 5min, close file
1386 if (actTime-actrun->lastTime>300)
1387 request |= kRequestTimeout;
1388
1389 // If connection status has changed
1390 if (changed)
1391 request |= kRequestConnectionChange;
1392
1393 if (request!=kRequestNone)
1394 runFinished();
1395
1396 if (actrun->fileStat==kFileOpen)
1397 primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1398 }
1399
1400 // 1: Stop, wait for event to get processed
1401 // 2: Stop, finish immediately
1402 // 101: Restart, wait for events to get processed
1403 // 102: Restart, finish immediately
1404 //
1405 const int gi_reset = g_reset;
1406
1407 const bool abort = gi_reset%100==2;
1408
1409 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1410
1411 primaryQueue.wait(abort);
1412 secondaryQueue.wait(abort);
1413 processingQueue1.wait(abort);
1414
1415 // Here we also destroy all runCtrl structures and hence close all open files
1416 evtCtrl.clear();
1417 actrun.reset();
1418
1419 factPrintf(MessageImp::kInfo, "Exit read Process...");
1420 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1421
1422 factStat(gj);
1423
1424 return gi_reset>=100;
1425}
1426
1427// ==========================================================================
1428// ==========================================================================
1429
1430void StartEvtBuild()
1431{
1432 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1433
1434 memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
1435 memset(gj.numConn, 0, NBOARDS*sizeof(*gj.numConn));
1436 memset(gj.totBytes, 0, NBOARDS*sizeof(*gj.totBytes));
1437
1438 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
1439 gj.maxMem = gj.xxxMem = 0;
1440
1441 gj.usdMem = Memory::inuse;
1442 gj.totMem = Memory::allocated;
1443
1444 gj.bufNew = gj.bufEvt = 0;
1445 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
1446 gj.readStat = gj.procStat = gj.writStat = 0;
1447
1448
1449
1450 READ_STRUCT rd[NBOARDS];
1451
1452 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1453 for (int i=0; i<NBOARDS; i++)
1454 rd[i].sockId = i;
1455
1456 while (mainloop(rd));
1457
1458 //must close all open sockets ...
1459 factPrintf(MessageImp::kInfo, "Close all sockets...");
1460
1461 READ_STRUCT::close();
1462
1463 // Now all sockets get closed. This is not reflected in gi_NumConnect
1464 // The current workaround is to count all sockets as closed when the thread is not running
1465}
Note: See TracBrowser for help on using the repository browser.