source: trunk/FACT++/src/EventBuilder.cc@ 16040

Last change on this file since 16040 was 16040, checked in by tbretz, 11 years ago
First working version of the event builder in C++
File size: 39.0 KB
Line 
1#include <sys/time.h>
2#include <sys/epoll.h>
3#include <netinet/tcp.h>
4
5#include <cstring>
6#include <cstdarg>
7#include <list>
8#include <forward_list>
9
10#include "queue.h"
11
12#include "MessageImp.h"
13
14using namespace std;
15
16#include "EventBuilder.h"
17
// Minimum number of bytes which must be available before a FAD header can
// be interpreted.
#define MIN_LEN 32

// Size of the receive buffer of one board: 36 channels times 3*1024 bytes
// (described by the original author as (data+header)*num channels).
#define MAX_LEN (36*3*1024)

// COMPLETE_EVENTS: once a board has delivered its part of an event, nothing
// more is read from it until the whole event has been received.
#define COMPLETE_EVENTS

// Enable one of these to wait for data with epoll or select instead of
// polling all sockets in a loop.
//#define USE_EPOLL
//#define USE_SELECT
24
25// ==========================================================================
26
27bool runOpen(const shared_ptr<EVT_CTRL2> &evt);
28bool runWrite(const shared_ptr<EVT_CTRL2> &evt);
29void runClose();
30void applyCalib(const shared_ptr<EVT_CTRL2> &evt);
31void factOut(int severity, const char *message);
32void factReportIncomplete (uint64_t rep);
33void gotNewRun(RUN_CTRL2 &run);
34void runFinished();
35void factStat(GUI_STAT gj);
36int eventCheck(const shared_ptr<EVT_CTRL2> &evt);
37void debugHead(void *buf);
38
39// ==========================================================================
40
41int g_reset;
42
43size_t g_maxMem; //maximum memory allowed for buffer
44
45FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
46
47uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
48
49GUI_STAT gj;
50
51// ==========================================================================
52
53void factPrintf(int severity, const char *fmt, ...)
54{
55 char str[1000];
56
57 va_list ap;
58 va_start(ap, fmt);
59 vsnprintf(str, 1000, fmt, ap);
60 va_end(ap);
61
62 factOut(severity, str);
63}
64
65// ==========================================================================
66
67#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
68#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
69
70namespace Memory
71{
72 uint64_t inuse = 0;
73 uint64_t allocated = 0;
74
75 uint64_t max_inuse = 0;
76
77 mutex mtx;
78
79 forward_list<void*> memory;
80
81 void *malloc()
82 {
83 // No free slot available, next alloc would exceed max memory
84 if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
85 return NULL;
86
87 // We will return this amount of memory
88 // This is not 100% thread safe, but it is not a super accurate measure anyway
89 inuse += MAX_TOT_MEM;
90 if (inuse>max_inuse)
91 max_inuse = inuse;
92
93 void *mem = NULL;
94
95 if (memory.empty())
96 {
97 // No free slot available, allocate a new one
98 allocated += MAX_TOT_MEM;
99 mem = new char[MAX_TOT_MEM];
100 }
101 else
102 {
103 // Get the next free slot from the stack and return it
104 const lock_guard<mutex> lock(mtx);
105 mem = memory.front();
106 memory.pop_front();
107 }
108
109 memset(mem, 0, MAX_HEAD_MEM);
110 return mem;
111 };
112
113 void free(void *mem)
114 {
115 if (!mem)
116 return;
117
118 // Decrease the amont of memory in use accordingly
119 inuse -= MAX_TOT_MEM;
120
121 // If the maximum memory has changed, we might be over the limit.
122 // In this case: free a slot
123 if (allocated>g_maxMem)
124 {
125 delete [] (char*)mem;
126 allocated -= MAX_TOT_MEM;
127 return;
128 }
129
130 const lock_guard<mutex> lock(mtx);
131 memory.push_front(mem);
132 }
133};
134
135// ==========================================================================
136
137struct READ_STRUCT
138{
139 enum buftyp_t
140 {
141 kStream,
142 kHeader,
143 kData,
144#ifdef COMPLETE_EVENTS
145 kWait
146#endif
147 };
148
149 // ---------- connection ----------
150
151 static uint activeSockets;
152
153 int sockId; // socket id (board number)
154 int socket; // socket handle
155 bool connected; // is this socket connected?
156
157 struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
158
159 // ------------ epoll -------------
160
161 static int fd_epoll;
162 static epoll_event events[NBOARDS];
163
164 static void init();
165 static void close();
166 static int wait();
167 static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
168
169 // ------------ buffer ------------
170
171 buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
172
173 uint32_t bufLen; // number of bytes left to read
174 uint8_t *bufPos; // next byte to read to the buffer next
175
176 union
177 {
178 uint8_t B[MAX_LEN];
179 uint16_t S[MAX_LEN / 2];
180 uint32_t I[MAX_LEN / 4];
181 uint64_t L[MAX_LEN / 8];
182 PEVNT_HEADER H;
183 };
184
185 uint64_t rateBytes;
186 uint32_t skip; // number of bytes skipped before start of event
187 bool repmem; // reportet no mmemory free
188
189 uint32_t len() const { return uint32_t(H.package_length)*2; }
190
191 void swapHeader();
192 void swapData();
193
194 // --------------------------------
195
196 READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
197 {
198 if (fd_epoll<0)
199 init();
200 }
201 ~READ_STRUCT()
202 {
203 destroy();
204 }
205
206 void destroy();
207 bool create(sockaddr_in addr);
208 void check(int, sockaddr_in addr);
209 bool read();
210};
211
212int READ_STRUCT::wait()
213{
214 // wait for something to do...
215 const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms]
216 if (rc>=0)
217 return rc;
218
219 if (errno==EINTR) // timout or signal interruption
220 return 0;
221
222 factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
223 return -1;
224}
225
226uint READ_STRUCT::activeSockets = 0;
227int READ_STRUCT::fd_epoll = -1;
228epoll_event READ_STRUCT::events[NBOARDS];
229
230void READ_STRUCT::init()
231{
232 if (fd_epoll>=0)
233 return;
234
235#ifdef USE_EPOLL
236 fd_epoll = epoll_create(NBOARDS);
237 if (fd_epoll<0)
238 {
239 factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
240 return;
241 }
242#endif
243}
244
245void READ_STRUCT::close()
246{
247#ifdef USE_EPOLL
248 if (::close(fd_epoll) > 0)
249 factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno);
250 else
251 factPrintf(MessageImp::kInfo, "Succesfully closed epoll");
252#endif
253
254 fd_epoll = -1;
255}
256
257bool READ_STRUCT::create(sockaddr_in sockAddr)
258{
259 if (socket>=0)
260 return false;
261
262 const int port = ntohs(sockAddr.sin_port) + 1;
263
264 SockAddr.sin_family = sockAddr.sin_family;
265 SockAddr.sin_addr = sockAddr.sin_addr;
266 SockAddr.sin_port = htons(port);
267
268 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
269 {
270 factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
271 socket = -1;
272 return false;
273 }
274
275 int optval = 1;
276 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
277 factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
278
279 optval = 10; //start after 10 seconds
280 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
281 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
282
283 optval = 10; //do every 10 seconds
284 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
285 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
286
287 optval = 2; //close after 2 unsuccessful tries
288 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
289 factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
290
291 factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId);
292
293 //connected = false;
294 activeSockets++;
295
296 return true;
297}
298
299void READ_STRUCT::destroy()
300{
301 if (socket==-1)
302 return;
303
304#ifdef USE_EPOLL
305 // strictly speaking this should not be necessary
306 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
307 factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
308#endif
309
310 if (::close(socket) > 0)
311 factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
312 else
313 factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId);
314
315 socket = -1;
316 connected = false;
317 activeSockets--;
318}
319
320void READ_STRUCT::check(int sockDef, sockaddr_in addr)
321{
322 // Continue in the most most likely case (performance)
323 //if (socket>=0 && sockDef!=0 && connected)
324 // return;
325
326 // socket open, but should not be open
327 if (socket>=0 && sockDef==0)
328 destroy();
329
330 // Socket closed, but should be open
331 if (socket<0 && sockDef!=0)
332 create(addr); //generate address and socket
333
334 // Socket closed
335 if (socket<0)
336 return;
337
338 // Socket open and connected: Nothing to do
339 if (connected)
340 return;
341
342 //try to connect if not yet done
343 const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
344 if (rc == -1)
345 return;
346
347 connected = true;
348
349 if (sockDef<0)
350 {
351 bufTyp = READ_STRUCT::kStream; // full data to be skipped
352 bufLen = MAX_LEN; // huge for skipping
353 }
354 else
355 {
356 bufTyp = READ_STRUCT::kHeader; // expect a header
357 bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
358 }
359
360 bufPos = B; // no byte read so far
361 skip = 0; // start empty
362 repmem = false;
363
364 factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket);
365
366#ifdef USE_EPOLL
367 epoll_event ev;
368 ev.events = EPOLLIN;
369 ev.data.ptr = this; // user data (union: ev.ptr)
370 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
371 factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
372#endif
373}
374
375bool READ_STRUCT::read()
376{
377 if (bufLen==0)
378 return true;
379
380 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
381 // recv failed
382 if (jrd<0)
383 {
384 // There was just nothing waiting
385 if (errno==EWOULDBLOCK || errno==EAGAIN)
386 return false;
387
388 factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
389 return false;
390 }
391
392 // connection was closed ...
393 if (jrd==0)
394 {
395 factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
396
397 destroy();//DestroySocket(rd[i]); //generate address and socket
398 return false;
399 }
400
401 rateBytes += jrd;
402
403 // are we skipping this board ...
404 if (bufTyp==kStream)
405 return false;
406
407 bufPos += jrd; //==> prepare for continuation
408 bufLen -= jrd;
409
410 // not yet all read
411 return bufLen==0;
412}
413
414void READ_STRUCT::swapHeader()
415{
416 S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
417 S[2] = ntohs(S[2]); // version_no
418 S[3] = ntohs(S[3]); // PLLLCK
419 S[4] = ntohs(S[4]); // trigger_crc
420 S[5] = ntohs(S[5]); // trigger_type
421
422 I[3] = ntohl(I[3]); // trigger_id
423 I[4] = ntohl(I[4]); // fad_evt_counter
424 I[5] = ntohl(I[5]); // REFCLK_frequency
425
426 S[12] = ntohs(S[12]); // board id
427 S[13] = ntohs(S[13]); // adc_clock_phase_shift
428 S[14] = ntohs(S[14]); // number_of_triggers_to_generate
429 S[15] = ntohs(S[15]); // trigger_generator_prescaler
430
431 I[10] = ntohl(I[10]); // runnumber;
432 I[11] = ntohl(I[11]); // time;
433
434 for (int s=24; s<24+NTemp+NDAC; s++)
435 S[s] = ntohs(S[s]); // drs_temperature / dac
436}
437
438void READ_STRUCT::swapData()
439{
440 // swapEventHeaderBytes: End of the header. to channels now
441
442 int i = 36;
443 for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
444 {
445 S[i+0] = ntohs(S[i+0]);//id
446 S[i+1] = ntohs(S[i+1]);//start_cell
447 S[i+2] = ntohs(S[i+2]);//roi
448 S[i+3] = ntohs(S[i+3]);//filling
449
450 i += 4+S[i+2];//skip the pixel data
451 }
452}
453
454// ==========================================================================
455
456bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
457{
458 int xjr = -1;
459 int xkr = -1;
460
461 //points to the very first roi
462 int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
463
464 roi[0] = ntohs(rd.S[roiPtr]);
465
466 for (int jr = 0; jr < 9; jr++)
467 {
468 roi[jr] = ntohs(rd.S[roiPtr]);
469
470 if (roi[jr]>1024)
471 {
472 factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
473 return false;
474 }
475
476 // Check that the roi of pixels jr are compatible with the one of pixel 0
477 if (jr!=8 && roi[jr]!=roi[0])
478 {
479 xjr = jr;
480 break;
481 }
482
483 // Check that the roi of all other DRS chips on boards are compatible
484 for (int kr = 1; kr < 4; kr++)
485 {
486 const int kroi = ntohs(rd.S[roiPtr]);
487 if (kroi != roi[jr])
488 {
489 xjr = jr;
490 xkr = kr;
491 break;
492 }
493 roiPtr += kroi+4;
494 }
495 }
496
497 if (xjr>=0)
498 {
499 if (xkr<0)
500 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
501 else
502 factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
503
504 return false;
505 }
506
507 if (roi[8] < roi[0])
508 {
509 factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
510 return false;
511 }
512
513 return true;
514}
515
516list<shared_ptr<EVT_CTRL2>> evtCtrl;
517
518shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
519{
520 uint16_t nRoi[9];
521 if (!checkRoiConsistency(rd, nRoi))
522 return shared_ptr<EVT_CTRL2>();
523
524 for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
525 {
526 // A reference is enough because the evtCtrl holds the shared_ptr anyway
527 const shared_ptr<EVT_CTRL2> &evt = *it;
528
529 // If the run is different, go on searching.
530 // We cannot stop searching if a lower run-id is found as in
531 // the case of the events, because theoretically, there
532 // can be the same run on two different days.
533 if (rd.H.runnumber != evt->runNum)
534 continue;
535
536 // If the ID of the new event if higher than the last one stored
537 // in that run, we have to assign a new slot (leave the loop)
538 if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
539 break;
540
541 if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
542 continue;
543
544 // We have found an entry with the same runID and evtID
545 // Check if ROI is consistent
546 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
547 {
548 factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
549 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
550 return shared_ptr<EVT_CTRL2>();
551 }
552
553 // count for inconsistencies
554 if (evt->trgNum != rd.H.trigger_id)
555 evt->Errors[0]++;
556 if (evt->trgTyp != rd.H.trigger_type)
557 evt->Errors[2]++;
558
559 //everything seems fine so far ==> use this slot ....
560 return evt;
561 }
562
563 if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
564 {
565 factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
566 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
567 return shared_ptr<EVT_CTRL2>();
568 }
569
570 shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);
571
572 gettimeofday(&evt->time, NULL);
573
574 evt->runNum = rd.H.runnumber;
575 evt->evNum = rd.H.fad_evt_counter;
576
577 evt->trgNum = rd.H.trigger_id;
578 evt->trgTyp = rd.H.trigger_type;
579
580 evt->nRoi = nRoi[0];
581 evt->nRoiTM = nRoi[8];
582
583 const bool newrun = actrun->runId != rd.H.runnumber;
584 if (newrun)
585 {
586 // Since we have started a new run, we know already when to close the
587 // previous run in terms of number of events
588 actrun->maxEvt = actrun->lastEvt;
589
590 factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",
591 rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
592
593 // The new run is the active run now
594 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
595
596 const time_t &tsec = evt->time.tv_sec;
597
598 actrun->openTime = tsec;
599 actrun->closeTime = tsec + 3600 * 24; // max time allowed
600 actrun->runId = rd.H.runnumber;
601 actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
602 actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
603 }
604
605 // Increase the number of events we have started to receive in this run
606 actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
607 actrun->lastEvt++;
608
609 // Keep pointer to run of this event
610 evt->runCtrl = actrun;
611
612 // Secure access to evtCtrl against access in CloseRunFile
613 // This should be the last... otherwise we can run into threading issues
614 // if the event is accessed before it is fully initialized.
615 evtCtrl.push_back(evt);
616
617 // Signal the fadctrl that a new run has been started
618 // Note this is the only place at which we can ensure that
619 // gotnewRun is called only once
620 // Note that this will callback CloseRunFile, therefor the event
621 // must already be in the evtCtrl structure
622 if (newrun)
623 gotNewRun(*actrun);
624
625 // An event can be the first and the last, but not the last and the first.
626 // Therefore gotNewRun is called before runFinished.
627 // runFinished signals that the last event of a run was just received. Processing
628 // might still be ongoing, but we can start a new run.
629 const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
630 const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
631 if (!cond1 || !cond2)
632 runFinished();
633
634 return evt;
635}
636
637
638void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
639{
640 const int i = rBuf.sockId;
641
642 memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER));
643
644 int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
645
646 // consistency of ROIs have been checked already (is it all correct?)
647 const uint16_t &roi = rBuf.S[src+2];
648
649 // different sort in FAD board.....
650 for (int px = 0; px < 9; px++)
651 {
652 for (int drs = 0; drs < 4; drs++)
653 {
654 const int16_t pixC = rBuf.S[src+1]; // start-cell
655 const int16_t pixR = rBuf.S[src+2]; // roi
656 //here we should check if pixH is correct ....
657
658 const int pixS = i*36 + drs*9 + px;
659
660 evt->fEvent->StartPix[pixS] = pixC;
661
662 memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
663
664 src += 4+pixR;
665
666 // Treatment for ch 9 (TM channel)
667 if (px != 8)
668 continue;
669
670 const int tmS = i*4 + drs;
671
672 //and we have additional TM info
673 if (pixR > roi)
674 {
675 evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
676
677 memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
678 }
679 else
680 {
681 evt->fEvent->StartTM[tmS] = -1;
682 }
683 }
684 }
685}
686
687// ==========================================================================
688
689uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
690{
691 factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
692 evt->runNum, evt->evNum, evtCtrl.size(), txt);
693
694 uint64_t report = 0;
695
696 char str[1000];
697
698 int ik=0;
699 for (int ib=0; ib<NBOARDS; ib++)
700 {
701 if (ib%10==0)
702 str[ik++] = '|';
703
704 const int jb = evt->board[ib];
705 if (jb>=0) // data received from that board
706 {
707 str[ik++] = '0'+(jb%10);
708 continue;
709 }
710
711 // FIXME: This is not synchronous... it reports
712 // accoridng to the current connection status, not w.r.t. to the
713 // one when the event was taken.
714 if (gi_NumConnect[ib]==0) // board not connected
715 {
716 str[ik++] = 'x';
717 continue;
718 }
719
720 // data from this board lost
721 str[ik++] = '.';
722 report |= ((uint64_t)1)<<ib;
723 }
724
725 str[ik++] = '|';
726 str[ik] = 0;
727
728 factOut(MessageImp::kWarn, str);
729
730 return report;
731}
732
733// ==========================================================================
734// ==========================================================================
735
736Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1));
737
738// If this is not convenient anymore, it could be replaced by
739// a command queue, to which command+data is posted,
740// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
741void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
742{
743 const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
744
745 bool rc1 = true;
746
747 // Is this a valid event or just an empty event to trigger run close?
748 // If this is not an empty event open the new run-file
749 // Empty events are there to trigger run-closing conditions
750 if (evt->runNum>=0)
751 {
752 // File not yet open
753 if (run->fileStat==kFileNotYetOpen)
754 {
755 // runOpen will close a previous run, if still open
756 if (!runOpen(evt))
757 {
758 factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
759 run->fileStat = kFileClosed;
760 return;
761 }
762
763 factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
764 run->fileStat = kFileOpen;
765 }
766
767 // Here we have a valid calibration and can go on with that.
768 processingQueue1.post(evt);
769
770 // File already closed
771 if (run->fileStat==kFileClosed)
772 return;
773
774 rc1 = runWrite(evt);
775 if (!rc1)
776 factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
777 }
778
779 const bool cond1 = run->lastEvt < run->maxEvt; // max number of events not reached
780 const bool cond2 = run->lastTime < run->closeTime; // max time not reached
781 const bool cond3 = rc1; // Write successfull
782
783 // File is not yet to be closed.
784 if (cond1 && cond2 && cond3)
785 return;
786
787 runClose();
788 run->fileStat = kFileClosed;
789
790 string str;
791 if (!cond1) str += to_string(run->maxEvt)+" evts reached";
792 if (!cond1 && (!cond2 || !cond3)) str += ", ";
793 if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached";
794 if ((!cond1 || !cond2) && !cond3) str += ", ";
795 if (!cond3) str += "runWrite failed";
796 factPrintf(MessageImp::kInfo, "File closed because %s", str.c_str());
797}
798
799Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
800
801void procEvt(const shared_ptr<EVT_CTRL2> &evt)
802{
803 if (evt->runNum>=0)
804 {
805 evt->fEvent->Errors[0] = evt->Errors[0];
806 evt->fEvent->Errors[1] = evt->Errors[1];
807 evt->fEvent->Errors[2] = evt->Errors[2];
808 evt->fEvent->Errors[3] = evt->Errors[3];
809
810 for (int ib=0; ib<NBOARDS; ib++)
811 evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time;
812
813 const int rc = eventCheck(evt);
814 if (rc < 0)
815 return;
816 }
817
818 // If file is open post the event for being written
819 secondaryQueue.post(evt);
820}
821
822// ==========================================================================
823// ==========================================================================
824
825shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile
826
827/*
828 task 1-4:
829
830 lock1()-lock4();
831 while (1)
832 {
833 wait for signal [lockN]; // unlocked
834
835 while (n!=10)
836 wait sockets;
837 read;
838
839 lockM();
840 finished[n] = true;
841 signal(mainloop);
842 unlockM();
843 }
844
845
846 mainloop:
847
848 while (1)
849 {
850 lockM();
851 while (!finished[0] || !finished[1] ...)
852 wait for signal [lockM]; // unlocked... signals can be sent
853 finished[0-1] = false;
854 unlockM()
855
856 copy data to queue // locked
857
858 lockN[0-3];
859 signalN[0-3];
860 unlockN[0-3];
861 }
862
863
864 */
865
866/*
867 while (g_reset)
868 {
869 shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
870
871 // Check that all sockets are connected
872
873 for (int i=0; i<40; i++)
874 if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
875 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
876
877 while (g_reset)
878 {
879 if (READ_STRUCT::wait()<0)
880 break;
881
882 if (rc_epoll==0)
883 break;
884
885 for (int jj=0; jj<rc_epoll; jj++)
886 {
887 READ_STRUCT *rs = READ_STRUCT::get(jj);
888 if (!rs->connected)
889 continue;
890
891 const bool rc_read = rs->read();
892 if (!rc_read)
893 continue;
894
895 if (rs->bufTyp==READ_STRUCT::kHeader)
896 {
897 [...]
898 }
899
900 [...]
901
902 if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
903 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
904 }
905
906 if (once_a_second)
907 {
908 if (evt==timeout)
909 break;
910 }
911 }
912
913 if (evt.nBoards==actBoards)
914 primaryQueue.post(evt);
915 }
916*/
917
918void CloseRunFile()
919{
920 // Create a copy of the shared_ptr to ensure
921 // is not replaced in the middle of the action
922 const shared_ptr<RUN_CTRL2> run = actrun;
923 run->maxEvt = run->lastEvt;
924}
925
926bool mainloop(READ_STRUCT *rd)
927{
928 factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
929
930 Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
931
932 primaryQueue.start();
933 secondaryQueue.start();
934
935 actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
936
937 //time in seconds
938 time_t gi_SecTime = time(NULL)-1;
939
940 //loop until global variable g_runStat claims stop
941 g_reset = 0;
942 while (g_reset == 0)
943 {
944#ifdef USE_SELECT
945 fd_set readfs;
946 FD_ZERO(&readfs);
947 int nfsd = 0;
948 for (int i=0; i<NBOARDS; i++)
949 if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
950 {
951 FD_SET(rd[i].socket, &readfs);
952 if (rd[i].socket>nfsd)
953 nfsd = rd[i].socket;
954 }
955
956 timeval tv;
957 tv.tv_sec = 0;
958 tv.tv_usec = 100;
959 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
960 // 0: timeout
961 // -1: error
962 if (rc_select<0)
963 {
964 factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
965 continue;
966 }
967#endif
968
969#ifdef USE_EPOLL
970 const int rc_epoll = READ_STRUCT::wait();
971 if (rc_epoll<0)
972 break;
973#endif
974
975#ifdef USE_EPOLL
976 for (int jj=0; jj<rc_epoll; jj++)
977#else
978 for (int jj=0; jj<NBOARDS; jj++)
979#endif
980 {
981#ifdef USE_EPOLL
982 // FIXME: How to get i?
983 READ_STRUCT *rs = READ_STRUCT::get(jj);
984#else
985
986 const int i = (jj%4)*10 + (jj/4);
987 READ_STRUCT *rs = &rd[i];
988 if (!rs->connected)
989 continue;
990#endif
991
992#ifdef USE_SELECT
993 if (!FD_ISSET(rs->socket, &readfs))
994 continue;
995#endif
996
997
998#ifdef COMPLETE_EVENTS
999 if (rs->bufTyp==READ_STRUCT::kWait)
1000 continue;
1001#endif
1002
1003 // ==================================================================
1004
1005 const bool rc_read = rs->read();
1006
1007 // Connect might have gotten closed during read
1008 gi_NumConnect[rs->sockId] = rs->connected;
1009 gj.numConn[rs->sockId] = rs->connected;
1010
1011 // Read either failed or disconnected, or the buffer is not yet full
1012 if (!rc_read)
1013 continue;
1014
1015 // ==================================================================
1016
1017 if (rs->bufTyp==READ_STRUCT::kHeader)
1018 {
1019 //check if startflag correct; else shift block ....
1020 // FIXME: This is not enough... this combination of
1021 // bytes can be anywhere... at least the end bytes
1022 // must be checked somewhere, too.
1023 uint k;
1024 for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1025 {
1026 if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1027 //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)
1028 break;
1029 }
1030 rs->skip += k;
1031
1032 //no start of header found
1033 if (k==sizeof(PEVNT_HEADER)-1)
1034 {
1035 rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1036 rs->bufPos = rs->B+1;
1037 rs->bufLen = sizeof(PEVNT_HEADER)-1;
1038 continue;
1039 }
1040
1041 if (k > 0)
1042 {
1043 memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1044
1045 rs->bufPos -= k;
1046 rs->bufLen += k;
1047
1048 continue; // We need to read more (bufLen>0)
1049 }
1050
1051 if (rs->skip>0)
1052 {
1053 factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1054 rs->skip = 0;
1055 }
1056
1057 // Swap the header entries from network to host order
1058 rs->swapHeader();
1059
1060 rs->bufTyp = READ_STRUCT::kData;
1061 rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1062
1063 debugHead(rs->B); // i and fadBoard not used
1064
1065 continue;
1066 }
1067
1068 const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1069 if (end != 0xfe04)
1070 {
1071 factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1072 rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1073
1074 // ready to read next header
1075 rs->bufTyp = READ_STRUCT::kHeader;
1076 rs->bufLen = sizeof(PEVNT_HEADER);
1077 rs->bufPos = rs->B;
1078 // FIXME: What to do with the validity flag?
1079 continue;
1080 }
1081
1082 // get index into mBuffer for this event (create if needed)
1083 const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1084
1085 // We have a valid entry, but no memory has yet been allocated
1086 if (evt && !evt->FADhead)
1087 {
1088 // Try to get memory from the big buffer
1089 PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc();
1090 if (!mem)
1091 {
1092 // If this works properly, this is a hack which can be removed, or
1093 // replaced by a signal or dim message
1094 if (!rs->repmem)
1095 {
1096 factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1097 rs->repmem = true;
1098 }
1099 continue;
1100 }
1101
1102 evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free));
1103 }
1104
1105 // ready to read next header
1106 rs->bufTyp = READ_STRUCT::kHeader;
1107 rs->bufLen = sizeof(PEVNT_HEADER);
1108 rs->bufPos = rs->B;
1109
1110 // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1111 if (!evt)
1112 continue;
1113
1114 /*
1115 const int fad = (i/10)<<8)|(i%10);
1116 if (fad != rs->H.board_id)
1117 {
1118 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);
1119 }*/
1120
1121 // This should never happen
1122 if (evt->board[rs->sockId] != -1)
1123 {
1124 factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1125 evt->evNum, rs->sockId, rs->sockId, rs->len());
1126 // FIXME: What to do with the validity flag?
1127 continue; // Continue reading next header
1128 }
1129
1130 // Swap the data entries (board headers) from network to host order
1131 rs->swapData();
1132
1133 // Copy data from rd[i] to mBuffer[evID]
1134 copyData(*rs, evt.get());
1135
1136#ifdef COMPLETE_EVENTS
1137 // Do not read anmymore from this board until the whole event has been received
1138 rs->bufTyp = READ_STRUCT::kWait;
1139#endif
1140 // now we have stored a new board contents into Event structure
1141 evt->fEvent->NumBoards++;
1142 evt->board[rs->sockId] = rs->sockId;
1143 evt->nBoard++;
1144
1145 // event not yet complete
1146 if (evt->nBoard < READ_STRUCT::activeSockets)
1147 continue;
1148
1149 // All previous events are now flagged as incomplete ("expired")
1150 // and will be removed. (This is a bit tricky, because pop_front()
1151 // would invalidate the current iterator if not done _after_ the increment)
1152 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1153 {
1154 const bool found = it->get()==evt.get();
1155 if (!found)
1156 reportIncomplete(*it, "expired");
1157 else
1158 primaryQueue.post(evt);
1159
1160 it++;
1161 evtCtrl.pop_front();
1162
1163 // We reached the current event, so we are done
1164 if (found)
1165 break;
1166 }
1167
1168#ifdef COMPLETE_EVENTS
1169 for (int j=0; j<40; j++)
1170 {
1171 //if (rs->bufTyp==READ_STRUCT::kWait)
1172 {
1173 rs->bufTyp = READ_STRUCT::kHeader;
1174 rs->bufLen = sizeof(PEVNT_HEADER);
1175 rs->bufPos = rs->B;
1176 }
1177 }
1178#endif
1179 } // end for loop over all sockets
1180
1181 // ==================================================================
1182
1183 // +1 -> idx=0
1184 // -1 -> idx=0
1185 // +2 -> idx=0
1186 // -2 -> idx=0
1187 // +3 -> idx=0
1188 // -3 -> idx=0
1189 // +4 -> idx=0
1190 // -4 -> idx=0
1191 // +5 -> idx=0
1192 // -5 -> idx=0
1193 // +6 -> idx=0
1194 // -6 -> idx=0
1195 //
1196
1197 // ==================================================================
1198
1199 const time_t actTime = time(NULL);
1200 if (actTime == gi_SecTime)
1201 {
1202#if !defined(USE_SELECT) && !defined(USE_EPOLL)
1203 if (evtCtrl.size()==0)
1204 usleep(1);
1205#endif
1206 continue;
1207 }
1208 gi_SecTime = actTime;
1209
1210 // ==================================================================
1211 //loop over all active events and flag those older than read-timeout
1212 //delete those that are written to disk ....
1213 //const int count = evtCtrl.size();
1214
1215 // This could be improved having the pointer which separates the queue with
1216 // the incomplete events from the queue with the complete events
1217 for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1218 {
1219 // A reference is enough because the shared_ptr is hold by the evtCtrl
1220 const shared_ptr<EVT_CTRL2> &evt = *it;
1221
1222 // The first event is the oldest. If the first event within the
1223 // timeout window was received, we can stop searching further.
1224 if (evt->time.tv_sec>=actTime - 30)
1225 break;
1226
1227 // This will result in the emission of a dim service.
1228 // It doesn't matter if that takes comparably long,
1229 // because we have to stop the run anyway.
1230 const uint64_t rep = reportIncomplete(evt, "timeout");
1231 factReportIncomplete(rep);
1232
1233 it++;
1234 evtCtrl.pop_front();
1235 }
1236
1237 // =================================================================
1238
1239 // If nothing was received for more than 5min, close file
1240 if (actTime-actrun->lastTime>300)
1241 actrun->maxEvt = actrun->lastEvt;
1242
1243 // This is a fake event to trigger possible run-closing conditions once a second
1244 // FIXME: This is not yet ideal because a file would never be closed
1245 // if a new file has been started and no events of the new file
1246 // have been received yet
1247 if (actrun->fileStat==kFileOpen)
1248 primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
1249
1250 // =================================================================
1251
1252 gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1253 gj.usdMem = Memory::max_inuse;
1254 gj.totMem = Memory::allocated;
1255
1256 gj.deltaT = 1000; // temporary, must be improved
1257
1258 for (int ib=0; ib<NBOARDS; ib++)
1259 {
1260 gj.rateBytes[ib] = rd[ib].rateBytes;
1261 gj.totBytes[ib] += rd[ib].rateBytes;
1262
1263 rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);
1264
1265 gi_NumConnect[ib] = rd[ib].connected;
1266 gj.numConn[ib] = rd[ib].connected;
1267 }
1268
1269 factStat(gj);
1270
1271 Memory::max_inuse = 0;
1272
1273 for (int ib=0; ib<NBOARDS; ib++)
1274 rd[ib].rateBytes = 0;
1275 }
1276
1277 // 1: Stop, wait for event to get processed
1278 // 2: Stop, finish immediately
1279 // 101: Restart, wait for events to get processed
1280 // 102: Restart, finish immediately
1281 //
1282 const int gi_reset = g_reset;
1283
1284 const bool abort = gi_reset%100==2;
1285
1286 factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1287
1288 primaryQueue.wait(abort);
1289 secondaryQueue.wait(abort);
1290
1291 // Here we also destroy all runCtrl structures and hence close all open files
1292 evtCtrl.clear();
1293
1294 factPrintf(MessageImp::kInfo, "Exit read Process...");
1295 factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
1296
1297 factStat(gj);
1298
1299 return gi_reset>=100;
1300}
1301
1302// ==========================================================================
1303// ==========================================================================
1304
1305void StartEvtBuild()
1306{
1307 factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
1308
1309
1310 for (int k=0; k<NBOARDS; k++)
1311 {
1312 gi_NumConnect[k] = 0;
1313 gj.numConn[k] = 0;
1314 gj.totBytes[k] = 0;
1315 }
1316
1317 gj.bufTot = gj.maxEvt = gj.xxxEvt = 0;
1318 gj.maxMem = gj.xxxMem = 0;
1319
1320 gj.usdMem = Memory::inuse;
1321 gj.totMem = Memory::allocated;
1322
1323 gj.bufNew = gj.bufEvt = 0;
1324 gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
1325 gj.readStat = gj.procStat = gj.writStat = 0;
1326
1327
1328
1329 READ_STRUCT rd[NBOARDS];
1330
1331 // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
1332 for (int i=0; i<NBOARDS; i++)
1333 rd[i].sockId = i;
1334
1335 while (mainloop(rd));
1336
1337 //must close all open sockets ...
1338 factPrintf(MessageImp::kInfo, "Close all sockets...");
1339
1340 READ_STRUCT::close();
1341
1342 // Now all sockets get closed. This is not reflected in gi_NumConnect
1343 // The current workaround is to count all sockets as closed when the thread is not running
1344}
Note: See TracBrowser for help on using the repository browser.