| 1 | #include <poll.h>
|
|---|
| 2 | #include <sys/time.h>
|
|---|
| 3 | #include <sys/epoll.h>
|
|---|
| 4 | #include <netinet/tcp.h>
|
|---|
| 5 |
|
|---|
| 6 | #include <cstring>
|
|---|
| 7 | #include <cstdarg>
|
|---|
| 8 | #include <list>
|
|---|
| 9 | #include <queue>
|
|---|
| 10 | #include <functional> // std::bind
|
|---|
| 11 |
|
|---|
| 12 | #include <boost/algorithm/string/join.hpp>
|
|---|
| 13 |
|
|---|
| 14 | #include "../externals/Queue.h"
|
|---|
| 15 |
|
|---|
| 16 | #include "MessageImp.h"
|
|---|
| 17 | #include "EventBuilder.h"
|
|---|
| 18 | #include "HeadersFAD.h"
|
|---|
| 19 |
|
|---|
| 20 | using namespace std;
|
|---|
| 21 |
|
|---|
| 22 | #define MIN_LEN 32 // min #bytes needed to interpret FADheader
|
|---|
| 23 | #define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag)
|
|---|
| 24 |
|
|---|
| 25 | //#define COMPLETE_EVENTS
|
|---|
| 26 | //#define USE_POLL
|
|---|
| 27 | //#define USE_EPOLL
|
|---|
| 28 | //#define USE_SELECT
|
|---|
| 29 | //#define COMPLETE_EPOLL
|
|---|
| 30 | //#define PRIORITY_QUEUE
|
|---|
| 31 |
|
|---|
| 32 | // Reading only 1024: 13: 77Hz, 87%
|
|---|
| 33 | // Reading only 1024: 12: 78Hz, 46%
|
|---|
| 34 | // Reading only 300: 4: 250Hz, 92%
|
|---|
| 35 | // Reading only 300: 3: 258Hz, 40%
|
|---|
| 36 |
|
|---|
| 37 | // Reading only four threads 1024: 13: 77Hz, 60%
|
|---|
| 38 | // Reading only four threads 1024: 12: 78Hz, 46%
|
|---|
| 39 | // Reading only four threads 300: 4: 250Hz, 92%
|
|---|
| 40 | // Reading only four threads 300: 3: 258Hz, 40%
|
|---|
| 41 |
|
|---|
| 42 | // Default 300: 4: 249Hz, 92%
|
|---|
| 43 | // Default 300: 3: 261Hz, 40%
|
|---|
| 44 | // Default 1024: 13: 76Hz, 93%
|
|---|
| 45 | // Default 1024: 12: 79Hz, 46%
|
|---|
| 46 |
|
|---|
| 47 | // Poll [selected] 1024: 13: 63Hz, 45%
|
|---|
| 48 | // Poll [selected] 1024: 14: 63Hz, 63%
|
|---|
| 49 | // Poll [selected] 1024: 15: 64Hz, 80%
|
|---|
| 50 | // Poll [selected] 300: 4: 230Hz, 47%
|
|---|
| 51 | // Poll [selected] 300: 3: 200Hz, 94%
|
|---|
| 52 |
|
|---|
| 53 | // Poll [all] 1024: 13: 65Hz, 47%
|
|---|
| 54 | // Poll [all] 1024: 14: 64Hz, 59%
|
|---|
| 55 | // Poll [all] 1024: 15: 62Hz, 67%
|
|---|
| 56 | // Poll [all] 300: 4: 230Hz, 47%
|
|---|
| 57 | // Poll [all] 300: 3: 230Hz, 35%
|
|---|
| 58 |
|
|---|
| 59 | // ==========================================================================
|
|---|
| 60 |
|
|---|
| 61 | bool runOpen(const EVT_CTRL2 &evt);
|
|---|
| 62 | bool runWrite(const EVT_CTRL2 &evt);
|
|---|
| 63 | void runClose(const EVT_CTRL2 &run);
|
|---|
| 64 | void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
|
|---|
| 65 | void factOut(int severity, const char *message);
|
|---|
| 66 | void factReportIncomplete (uint64_t rep);
|
|---|
| 67 | void gotNewRun(RUN_CTRL2 &run);
|
|---|
| 68 | void runFinished();
|
|---|
| 69 | void factStat(const GUI_STAT &gj);
|
|---|
| 70 | bool eventCheck(const EVT_CTRL2 &evt);
|
|---|
| 71 | void debugHead(void *buf);
|
|---|
| 72 |
|
|---|
| 73 | // ==========================================================================
|
|---|
| 74 |
|
|---|
| 75 | int g_reset;
|
|---|
| 76 |
|
|---|
| 77 | size_t g_maxMem; //maximum memory allowed for buffer
|
|---|
| 78 |
|
|---|
| 79 | uint16_t g_evtTimeout; // timeout (sec) for one event
|
|---|
| 80 |
|
|---|
| 81 | FACT_SOCK g_port[NBOARDS]; // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"
|
|---|
| 82 |
|
|---|
| 83 | uint gi_NumConnect[NBOARDS]; //4 crates * 10 boards
|
|---|
| 84 |
|
|---|
| 85 | GUI_STAT gj;
|
|---|
| 86 |
|
|---|
| 87 | // ==========================================================================
|
|---|
| 88 |
|
|---|
| 89 | namespace Memory
|
|---|
| 90 | {
|
|---|
| 91 | uint64_t inuse = 0;
|
|---|
| 92 | uint64_t allocated = 0;
|
|---|
| 93 |
|
|---|
| 94 | uint64_t max_inuse = 0;
|
|---|
| 95 |
|
|---|
| 96 | std::mutex mtx;
|
|---|
| 97 |
|
|---|
| 98 | std::forward_list<void*> memory;
|
|---|
| 99 |
|
|---|
| 100 | void *malloc()
|
|---|
| 101 | {
|
|---|
| 102 | // No free slot available, next alloc would exceed max memory
|
|---|
| 103 | if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
|
|---|
| 104 | return NULL;
|
|---|
| 105 |
|
|---|
| 106 | // We will return this amount of memory
|
|---|
| 107 | // This is not 100% thread safe, but it is not a super accurate measure anyway
|
|---|
| 108 | inuse += MAX_TOT_MEM;
|
|---|
| 109 | if (inuse>max_inuse)
|
|---|
| 110 | max_inuse = inuse;
|
|---|
| 111 |
|
|---|
| 112 | if (memory.empty())
|
|---|
| 113 | {
|
|---|
| 114 | // No free slot available, allocate a new one
|
|---|
| 115 | allocated += MAX_TOT_MEM;
|
|---|
| 116 | return new char[MAX_TOT_MEM];
|
|---|
| 117 | }
|
|---|
| 118 |
|
|---|
| 119 | // Get the next free slot from the stack and return it
|
|---|
| 120 | const std::lock_guard<std::mutex> lock(mtx);
|
|---|
| 121 |
|
|---|
| 122 | void *mem = memory.front();
|
|---|
| 123 | memory.pop_front();
|
|---|
| 124 | return mem;
|
|---|
| 125 | };
|
|---|
| 126 |
|
|---|
| 127 | void free(void *mem)
|
|---|
| 128 | {
|
|---|
| 129 | if (!mem)
|
|---|
| 130 | return;
|
|---|
| 131 |
|
|---|
| 132 | // Decrease the amont of memory in use accordingly
|
|---|
| 133 | inuse -= MAX_TOT_MEM;
|
|---|
| 134 |
|
|---|
| 135 | // If the maximum memory has changed, we might be over the limit.
|
|---|
| 136 | // In this case: free a slot
|
|---|
| 137 | if (allocated>g_maxMem)
|
|---|
| 138 | {
|
|---|
| 139 | delete [] (char*)mem;
|
|---|
| 140 | allocated -= MAX_TOT_MEM;
|
|---|
| 141 | return;
|
|---|
| 142 | }
|
|---|
| 143 |
|
|---|
| 144 | const std::lock_guard<std::mutex> lock(mtx);
|
|---|
| 145 | memory.push_front(mem);
|
|---|
| 146 | }
|
|---|
| 147 |
|
|---|
| 148 | };
|
|---|
| 149 |
|
|---|
| 150 | // ==========================================================================
|
|---|
| 151 |
|
|---|
| 152 | __attribute__((__format__ (__printf__, 2, 0)))
|
|---|
| 153 | void factPrintf(int severity, const char *fmt, ...)
|
|---|
| 154 | {
|
|---|
| 155 | char str[1000];
|
|---|
| 156 |
|
|---|
| 157 | va_list ap;
|
|---|
| 158 | va_start(ap, fmt);
|
|---|
| 159 | vsnprintf(str, 1000, fmt, ap);
|
|---|
| 160 | va_end(ap);
|
|---|
| 161 |
|
|---|
| 162 | factOut(severity, str);
|
|---|
| 163 | }
|
|---|
| 164 |
|
|---|
| 165 | // ==========================================================================
|
|---|
| 166 |
|
|---|
| 167 | struct READ_STRUCT
|
|---|
| 168 | {
|
|---|
| 169 | enum buftyp_t
|
|---|
| 170 | {
|
|---|
| 171 | kStream,
|
|---|
| 172 | kHeader,
|
|---|
| 173 | kData,
|
|---|
| 174 | #ifdef COMPLETE_EVENTS
|
|---|
| 175 | kWait
|
|---|
| 176 | #endif
|
|---|
| 177 | };
|
|---|
| 178 |
|
|---|
| 179 | // ---------- connection ----------
|
|---|
| 180 |
|
|---|
| 181 | static uint activeSockets;
|
|---|
| 182 |
|
|---|
| 183 | int sockId; // socket id (board number)
|
|---|
| 184 | int socket; // socket handle
|
|---|
| 185 | bool connected; // is this socket connected?
|
|---|
| 186 |
|
|---|
| 187 | struct sockaddr_in SockAddr; // Socket address copied from wrapper during socket creation
|
|---|
| 188 |
|
|---|
| 189 | // ------------ epoll -------------
|
|---|
| 190 |
|
|---|
| 191 | static int fd_epoll;
|
|---|
| 192 | static epoll_event events[NBOARDS];
|
|---|
| 193 |
|
|---|
| 194 | static void init();
|
|---|
| 195 | static void close();
|
|---|
| 196 | static int wait();
|
|---|
| 197 | static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
|
|---|
| 198 |
|
|---|
| 199 | // ------------ buffer ------------
|
|---|
| 200 |
|
|---|
| 201 | buftyp_t bufTyp; // what are we reading at the moment: 0=header 1=data -1=skip ...
|
|---|
| 202 |
|
|---|
| 203 | uint32_t bufLen; // number of bytes left to read
|
|---|
| 204 | uint8_t *bufPos; // next byte to read to the buffer next
|
|---|
| 205 |
|
|---|
| 206 | union
|
|---|
| 207 | {
|
|---|
| 208 | uint8_t B[MAX_LEN];
|
|---|
| 209 | uint16_t S[MAX_LEN / 2];
|
|---|
| 210 | uint32_t I[MAX_LEN / 4];
|
|---|
| 211 | uint64_t L[MAX_LEN / 8];
|
|---|
| 212 | PEVNT_HEADER H;
|
|---|
| 213 | };
|
|---|
| 214 |
|
|---|
| 215 | timeval time;
|
|---|
| 216 | uint64_t totBytes; // total received bytes
|
|---|
| 217 | uint64_t relBytes; // total released bytes
|
|---|
| 218 | uint32_t skip; // number of bytes skipped before start of event
|
|---|
| 219 |
|
|---|
| 220 | uint32_t len() const { return uint32_t(H.package_length)*2; }
|
|---|
| 221 |
|
|---|
| 222 | void swapHeader();
|
|---|
| 223 | void swapData();
|
|---|
| 224 |
|
|---|
| 225 | // --------------------------------
|
|---|
| 226 |
|
|---|
| 227 | READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
|
|---|
| 228 | {
|
|---|
| 229 | if (fd_epoll<0)
|
|---|
| 230 | init();
|
|---|
| 231 | }
|
|---|
| 232 | ~READ_STRUCT()
|
|---|
| 233 | {
|
|---|
| 234 | destroy();
|
|---|
| 235 | }
|
|---|
| 236 |
|
|---|
| 237 | void destroy();
|
|---|
| 238 | bool create(sockaddr_in addr);
|
|---|
| 239 | bool check(int, sockaddr_in addr);
|
|---|
| 240 | bool read();
|
|---|
| 241 |
|
|---|
| 242 | };
|
|---|
| 243 |
|
|---|
| 244 | #ifdef PRIORITY_QUEUE
|
|---|
| 245 | struct READ_STRUCTcomp
|
|---|
| 246 | {
|
|---|
| 247 | bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
|
|---|
| 248 | {
|
|---|
| 249 | const int64_t rel1 = r1->totBytes - r1->relBytes;
|
|---|
| 250 | const int64_t rel2 = r2->totBytes - r2->relBytes;
|
|---|
| 251 | return rel1 > rel2;
|
|---|
| 252 | }
|
|---|
| 253 | };
|
|---|
| 254 | #endif
|
|---|
| 255 |
|
|---|
| 256 | int READ_STRUCT::wait()
|
|---|
| 257 | {
|
|---|
| 258 | // wait for something to do...
|
|---|
| 259 | const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms]
|
|---|
| 260 | if (rc>=0)
|
|---|
| 261 | return rc;
|
|---|
| 262 |
|
|---|
| 263 | if (errno==EINTR) // timout or signal interruption
|
|---|
| 264 | return 0;
|
|---|
| 265 |
|
|---|
| 266 | factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
|
|---|
| 267 | return -1;
|
|---|
| 268 | }
|
|---|
| 269 |
|
|---|
| 270 | uint READ_STRUCT::activeSockets = 0;
|
|---|
| 271 | int READ_STRUCT::fd_epoll = -1;
|
|---|
| 272 | epoll_event READ_STRUCT::events[NBOARDS];
|
|---|
| 273 |
|
|---|
| 274 | void READ_STRUCT::init()
|
|---|
| 275 | {
|
|---|
| 276 | if (fd_epoll>=0)
|
|---|
| 277 | return;
|
|---|
| 278 |
|
|---|
| 279 | #ifdef USE_EPOLL
|
|---|
| 280 | fd_epoll = epoll_create(NBOARDS);
|
|---|
| 281 | if (fd_epoll<0)
|
|---|
| 282 | {
|
|---|
| 283 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
|
|---|
| 284 | return;
|
|---|
| 285 | }
|
|---|
| 286 | #endif
|
|---|
| 287 | }
|
|---|
| 288 |
|
|---|
| 289 | void READ_STRUCT::close()
|
|---|
| 290 | {
|
|---|
| 291 | #ifdef USE_EPOLL
|
|---|
| 292 | if (fd_epoll>=0 && ::close(fd_epoll)>0)
|
|---|
| 293 | factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno);
|
|---|
| 294 | #endif
|
|---|
| 295 |
|
|---|
| 296 | fd_epoll = -1;
|
|---|
| 297 | }
|
|---|
| 298 |
|
|---|
| 299 | bool READ_STRUCT::create(sockaddr_in sockAddr)
|
|---|
| 300 | {
|
|---|
| 301 | if (socket>=0)
|
|---|
| 302 | return false;
|
|---|
| 303 |
|
|---|
| 304 | const int port = ntohs(sockAddr.sin_port) + 1;
|
|---|
| 305 |
|
|---|
| 306 | SockAddr.sin_family = sockAddr.sin_family;
|
|---|
| 307 | SockAddr.sin_addr = sockAddr.sin_addr;
|
|---|
| 308 | SockAddr.sin_port = htons(port);
|
|---|
| 309 |
|
|---|
| 310 | if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
|
|---|
| 311 | {
|
|---|
| 312 | factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
|
|---|
| 313 | socket = -1;
|
|---|
| 314 | return false;
|
|---|
| 315 | }
|
|---|
| 316 |
|
|---|
| 317 | int optval = 1;
|
|---|
| 318 | if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0)
|
|---|
| 319 | factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
|
|---|
| 320 |
|
|---|
| 321 | optval = 1;
|
|---|
| 322 | if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
|
|---|
| 323 | factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
|
|---|
| 324 |
|
|---|
| 325 | optval = 10; //start after 10 seconds
|
|---|
| 326 | if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
|
|---|
| 327 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
|
|---|
| 328 |
|
|---|
| 329 | optval = 10; //do every 10 seconds
|
|---|
| 330 | if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
|
|---|
| 331 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
|
|---|
| 332 |
|
|---|
| 333 | optval = 2; //close after 2 unsuccessful tries
|
|---|
| 334 | if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
|
|---|
| 335 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
|
|---|
| 336 |
|
|---|
| 337 | factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket);
|
|---|
| 338 |
|
|---|
| 339 | //connected = false;
|
|---|
| 340 | activeSockets++;
|
|---|
| 341 |
|
|---|
| 342 | return true;
|
|---|
| 343 | }
|
|---|
| 344 |
|
|---|
| 345 | void READ_STRUCT::destroy()
|
|---|
| 346 | {
|
|---|
| 347 | if (socket<0)
|
|---|
| 348 | return;
|
|---|
| 349 |
|
|---|
| 350 | #ifdef USE_EPOLL
|
|---|
| 351 | // strictly speaking this should not be necessary
|
|---|
| 352 | if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
|
|---|
| 353 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
|
|---|
| 354 | #endif
|
|---|
| 355 |
|
|---|
| 356 | if (::close(socket) > 0)
|
|---|
| 357 | factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
|
|---|
| 358 | else
|
|---|
| 359 | factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket);
|
|---|
| 360 |
|
|---|
| 361 | // Set the socket to "not connected"
|
|---|
| 362 | socket = -1;
|
|---|
| 363 | connected = false;
|
|---|
| 364 | activeSockets--;
|
|---|
| 365 | bufLen = 0;
|
|---|
| 366 | }
|
|---|
| 367 |
|
|---|
| 368 | bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
|
|---|
| 369 | {
|
|---|
| 370 | // Continue in the most most likely case (performance)
|
|---|
| 371 | //if (socket>=0 && sockDef!=0 && connected)
|
|---|
| 372 | // return;
|
|---|
| 373 | const int old = socket;
|
|---|
| 374 |
|
|---|
| 375 | // socket open, but should not be open
|
|---|
| 376 | if (socket>=0 && sockDef==0)
|
|---|
| 377 | destroy();
|
|---|
| 378 |
|
|---|
| 379 | // Socket closed, but should be open
|
|---|
| 380 | if (socket<0 && sockDef!=0)
|
|---|
| 381 | create(addr); //generate address and socket
|
|---|
| 382 |
|
|---|
| 383 | const bool retval = old!=socket;
|
|---|
| 384 |
|
|---|
| 385 | // Socket closed
|
|---|
| 386 | if (socket<0)
|
|---|
| 387 | return retval;
|
|---|
| 388 |
|
|---|
| 389 | // Socket open and connected: Nothing to do
|
|---|
| 390 | if (connected)
|
|---|
| 391 | return retval;
|
|---|
| 392 |
|
|---|
| 393 | //try to connect if not yet done
|
|---|
| 394 | const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
|
|---|
| 395 | if (rc == -1)
|
|---|
| 396 | return retval;
|
|---|
| 397 |
|
|---|
| 398 | connected = true;
|
|---|
| 399 |
|
|---|
| 400 | if (sockDef<0)
|
|---|
| 401 | {
|
|---|
| 402 | bufTyp = READ_STRUCT::kStream; // full data to be skipped
|
|---|
| 403 | bufLen = MAX_LEN; // huge for skipping
|
|---|
| 404 | }
|
|---|
| 405 | else
|
|---|
| 406 | {
|
|---|
| 407 | bufTyp = READ_STRUCT::kHeader; // expect a header
|
|---|
| 408 | bufLen = sizeof(PEVNT_HEADER); // max size to read at begining
|
|---|
| 409 | }
|
|---|
| 410 |
|
|---|
| 411 | bufPos = B; // no byte read so far
|
|---|
| 412 | skip = 0; // start empty
|
|---|
| 413 | totBytes = 0;
|
|---|
| 414 | relBytes = 0;
|
|---|
| 415 |
|
|---|
| 416 | factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
|
|---|
| 417 |
|
|---|
| 418 | #ifdef USE_EPOLL
|
|---|
| 419 | epoll_event ev;
|
|---|
| 420 | ev.events = EPOLLIN;
|
|---|
| 421 | ev.data.ptr = this; // user data (union: ev.ptr)
|
|---|
| 422 | if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
|
|---|
| 423 | factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
|
|---|
| 424 | #endif
|
|---|
| 425 |
|
|---|
| 426 | return retval;
|
|---|
| 427 | }
|
|---|
| 428 |
|
|---|
| 429 | bool READ_STRUCT::read()
|
|---|
| 430 | {
|
|---|
| 431 | if (!connected)
|
|---|
| 432 | return false;
|
|---|
| 433 |
|
|---|
| 434 | if (bufLen==0)
|
|---|
| 435 | return true;
|
|---|
| 436 |
|
|---|
| 437 | const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
|
|---|
| 438 | // recv failed
|
|---|
| 439 | if (jrd<0)
|
|---|
| 440 | {
|
|---|
| 441 | // There was just nothing waiting
|
|---|
| 442 | if (errno==EWOULDBLOCK || errno==EAGAIN)
|
|---|
| 443 | return false;
|
|---|
| 444 |
|
|---|
| 445 | factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
|
|---|
| 446 | return false;
|
|---|
| 447 | }
|
|---|
| 448 |
|
|---|
| 449 | // connection was closed ...
|
|---|
| 450 | if (jrd==0)
|
|---|
| 451 | {
|
|---|
| 452 | factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
|
|---|
| 453 |
|
|---|
| 454 | destroy();//DestroySocket(rd[i]); //generate address and socket
|
|---|
| 455 | return false;
|
|---|
| 456 | }
|
|---|
| 457 |
|
|---|
| 458 | totBytes += jrd;
|
|---|
| 459 |
|
|---|
| 460 | // are we skipping this board ...
|
|---|
| 461 | if (bufTyp==kStream)
|
|---|
| 462 | return false;
|
|---|
| 463 |
|
|---|
| 464 | if (bufPos==B)
|
|---|
| 465 | gettimeofday(&time, NULL);
|
|---|
| 466 |
|
|---|
| 467 | bufPos += jrd; //==> prepare for continuation
|
|---|
| 468 | bufLen -= jrd;
|
|---|
| 469 |
|
|---|
| 470 | // not yet all read
|
|---|
| 471 | return bufLen==0;
|
|---|
| 472 | }
|
|---|
| 473 |
|
|---|
| 474 | void READ_STRUCT::swapHeader()
|
|---|
| 475 | {
|
|---|
| 476 | S[1] = ntohs(S[1]); // package_length (bytes not swapped!)
|
|---|
| 477 | S[2] = ntohs(S[2]); // version_no
|
|---|
| 478 | S[3] = ntohs(S[3]); // PLLLCK
|
|---|
| 479 | S[4] = ntohs(S[4]); // trigger_crc
|
|---|
| 480 | S[5] = ntohs(S[5]); // trigger_type
|
|---|
| 481 |
|
|---|
| 482 | I[3] = ntohl(I[3]); // trigger_id
|
|---|
| 483 | I[4] = ntohl(I[4]); // fad_evt_counter
|
|---|
| 484 | I[5] = ntohl(I[5]); // REFCLK_frequency
|
|---|
| 485 |
|
|---|
| 486 | S[12] = ntohs(S[12]); // board id
|
|---|
| 487 | S[13] = ntohs(S[13]); // adc_clock_phase_shift
|
|---|
| 488 | S[14] = ntohs(S[14]); // number_of_triggers_to_generate
|
|---|
| 489 | S[15] = ntohs(S[15]); // trigger_generator_prescaler
|
|---|
| 490 |
|
|---|
| 491 | I[10] = ntohl(I[10]); // runnumber;
|
|---|
| 492 | I[11] = ntohl(I[11]); // time;
|
|---|
| 493 |
|
|---|
| 494 | // Use back inserter??
|
|---|
| 495 | for (int s=24; s<24+NTemp+NDAC; s++)
|
|---|
| 496 | S[s] = ntohs(S[s]); // drs_temperature / dac
|
|---|
| 497 | }
|
|---|
| 498 |
|
|---|
| 499 | void READ_STRUCT::swapData()
|
|---|
| 500 | {
|
|---|
| 501 | // swapEventHeaderBytes: End of the header. to channels now
|
|---|
| 502 |
|
|---|
| 503 | int i = 36;
|
|---|
| 504 | for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
|
|---|
| 505 | {
|
|---|
| 506 | S[i+0] = ntohs(S[i+0]);//id
|
|---|
| 507 | S[i+1] = ntohs(S[i+1]);//start_cell
|
|---|
| 508 | S[i+2] = ntohs(S[i+2]);//roi
|
|---|
| 509 | S[i+3] = ntohs(S[i+3]);//filling
|
|---|
| 510 |
|
|---|
| 511 | i += 4+S[i+2];//skip the pixel data
|
|---|
| 512 | }
|
|---|
| 513 | }
|
|---|
| 514 |
|
|---|
| 515 | // ==========================================================================
|
|---|
| 516 |
|
|---|
| 517 | bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
|
|---|
| 518 | {
|
|---|
| 519 | int xjr = -1;
|
|---|
| 520 | int xkr = -1;
|
|---|
| 521 |
|
|---|
| 522 | //points to the very first roi
|
|---|
| 523 | int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
|
|---|
| 524 |
|
|---|
| 525 | roi[0] = ntohs(rd.S[roiPtr]);
|
|---|
| 526 |
|
|---|
| 527 | for (int jr = 0; jr < 9; jr++)
|
|---|
| 528 | {
|
|---|
| 529 | roi[jr] = ntohs(rd.S[roiPtr]);
|
|---|
| 530 |
|
|---|
| 531 | if (roi[jr]>1024)
|
|---|
| 532 | {
|
|---|
| 533 | factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
|
|---|
| 534 | return false;
|
|---|
| 535 | }
|
|---|
| 536 |
|
|---|
| 537 | // Check that the roi of pixels jr are compatible with the one of pixel 0
|
|---|
| 538 | if (jr!=8 && roi[jr]!=roi[0])
|
|---|
| 539 | {
|
|---|
| 540 | xjr = jr;
|
|---|
| 541 | break;
|
|---|
| 542 | }
|
|---|
| 543 |
|
|---|
| 544 | // Check that the roi of all other DRS chips on boards are compatible
|
|---|
| 545 | for (int kr = 1; kr < 4; kr++)
|
|---|
| 546 | {
|
|---|
| 547 | const int kroi = ntohs(rd.S[roiPtr]);
|
|---|
| 548 | if (kroi != roi[jr])
|
|---|
| 549 | {
|
|---|
| 550 | xjr = jr;
|
|---|
| 551 | xkr = kr;
|
|---|
| 552 | break;
|
|---|
| 553 | }
|
|---|
| 554 | roiPtr += kroi+4;
|
|---|
| 555 | }
|
|---|
| 556 | }
|
|---|
| 557 |
|
|---|
| 558 | if (xjr>=0)
|
|---|
| 559 | {
|
|---|
| 560 | if (xkr<0)
|
|---|
| 561 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
|
|---|
| 562 | else
|
|---|
| 563 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
|
|---|
| 564 |
|
|---|
| 565 | return false;
|
|---|
| 566 | }
|
|---|
| 567 |
|
|---|
| 568 | if (roi[8] < roi[0])
|
|---|
| 569 | {
|
|---|
| 570 | factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
|
|---|
| 571 | return false;
|
|---|
| 572 | }
|
|---|
| 573 |
|
|---|
| 574 | return true;
|
|---|
| 575 | }
|
|---|
| 576 |
|
|---|
| 577 | list<shared_ptr<EVT_CTRL2>> evtCtrl;
|
|---|
| 578 |
|
|---|
| 579 | shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
|
|---|
| 580 | {
|
|---|
| 581 | /*
|
|---|
| 582 | checkroi consistence
|
|---|
| 583 | find existing entry
|
|---|
| 584 | if no entry, try to allocate memory
|
|---|
| 585 | if entry and memory, init event structure
|
|---|
| 586 | */
|
|---|
| 587 |
|
|---|
| 588 | uint16_t nRoi[9];
|
|---|
| 589 | if (!checkRoiConsistency(rd, nRoi))
|
|---|
| 590 | return shared_ptr<EVT_CTRL2>();
|
|---|
| 591 |
|
|---|
| 592 | for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
|
|---|
| 593 | {
|
|---|
| 594 | // A reference is enough because the evtCtrl holds the shared_ptr anyway
|
|---|
| 595 | const shared_ptr<EVT_CTRL2> &evt = *it;
|
|---|
| 596 |
|
|---|
| 597 | // If the run is different, go on searching.
|
|---|
| 598 | // We cannot stop searching if a lower run-id is found as in
|
|---|
| 599 | // the case of the events, because theoretically, there
|
|---|
| 600 | // can be the same run on two different days.
|
|---|
| 601 | if (rd.H.runnumber != evt->runNum)
|
|---|
| 602 | continue;
|
|---|
| 603 |
|
|---|
| 604 | // If the ID of the new event if higher than the last one stored
|
|---|
| 605 | // in that run, we have to assign a new slot (leave the loop)
|
|---|
| 606 | if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
|
|---|
| 607 | break;
|
|---|
| 608 |
|
|---|
| 609 | if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
|
|---|
| 610 | continue;
|
|---|
| 611 |
|
|---|
| 612 | // We have found an entry with the same runID and evtID
|
|---|
| 613 | // Check if ROI is consistent
|
|---|
| 614 | if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
|
|---|
| 615 | {
|
|---|
| 616 | factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
|
|---|
| 617 | evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
|
|---|
| 618 | return shared_ptr<EVT_CTRL2>();
|
|---|
| 619 | }
|
|---|
| 620 |
|
|---|
| 621 | // It is maybe not likely, but the header of this board might have
|
|---|
| 622 | // arrived earlier. (We could also update the run-info, but
|
|---|
| 623 | // this should not make a difference here)
|
|---|
| 624 | if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) ||
|
|---|
| 625 | rd.time.tv_sec<evt->time.tv_sec)
|
|---|
| 626 | evt->time = rd.time;
|
|---|
| 627 |
|
|---|
| 628 | //everything seems fine so far ==> use this slot ....
|
|---|
| 629 | return evt;
|
|---|
| 630 | }
|
|---|
| 631 |
|
|---|
| 632 | if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
|
|---|
| 633 | {
|
|---|
| 634 | factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
|
|---|
| 635 | actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
|
|---|
| 636 | return shared_ptr<EVT_CTRL2>();
|
|---|
| 637 | }
|
|---|
| 638 |
|
|---|
| 639 | EVT_CTRL2 *evt = new EVT_CTRL2;
|
|---|
| 640 |
|
|---|
| 641 | evt->time = rd.time;
|
|---|
| 642 |
|
|---|
| 643 | evt->runNum = rd.H.runnumber;
|
|---|
| 644 | evt->evNum = rd.H.fad_evt_counter;
|
|---|
| 645 |
|
|---|
| 646 | evt->trgNum = rd.H.trigger_id;
|
|---|
| 647 | evt->trgTyp = rd.H.trigger_type;
|
|---|
| 648 |
|
|---|
| 649 | evt->nRoi = nRoi[0];
|
|---|
| 650 | evt->nRoiTM = nRoi[8];
|
|---|
| 651 |
|
|---|
| 652 | //evt->firstBoard = rd.sockId;
|
|---|
| 653 |
|
|---|
| 654 | const bool newrun = actrun->runId != rd.H.runnumber;
|
|---|
| 655 | if (newrun)
|
|---|
| 656 | {
|
|---|
| 657 | factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d",
|
|---|
| 658 | rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
|
|---|
| 659 |
|
|---|
| 660 | // The new run is the active run now
|
|---|
| 661 | actrun = make_shared<RUN_CTRL2>();
|
|---|
| 662 |
|
|---|
| 663 | const time_t &tsec = evt->time.tv_sec;
|
|---|
| 664 |
|
|---|
| 665 | actrun->openTime = tsec;
|
|---|
| 666 | actrun->closeTime = tsec + 3600 * 24; // max time allowed
|
|---|
| 667 | actrun->runId = rd.H.runnumber;
|
|---|
| 668 | actrun->roi0 = nRoi[0]; // FIXME: Make obsolete!
|
|---|
| 669 | actrun->roi8 = nRoi[8]; // FIXME: Make obsolete!
|
|---|
| 670 |
|
|---|
| 671 | // Signal the fadctrl that a new run has been started
|
|---|
| 672 | // Note this is the only place at which we can ensure that
|
|---|
| 673 | // gotnewRun is called only once
|
|---|
| 674 | gotNewRun(*actrun);
|
|---|
| 675 | }
|
|---|
| 676 |
|
|---|
| 677 | // Keep pointer to run of this event
|
|---|
| 678 | evt->runCtrl = actrun;
|
|---|
| 679 |
|
|---|
| 680 | // Increase the number of events we have started to receive in this run
|
|---|
| 681 | actrun->lastTime = evt->time.tv_sec; // Time when the last event was received
|
|---|
| 682 | actrun->lastEvt++;
|
|---|
| 683 |
|
|---|
| 684 | // An event can be the first and the last, but not the last and the first.
|
|---|
| 685 | // Therefore gotNewRun is called before runFinished.
|
|---|
| 686 | // runFinished signals that the last event of a run was just received. Processing
|
|---|
| 687 | // might still be ongoing, but we can start a new run.
|
|---|
| 688 | const bool cond1 = actrun->lastEvt < actrun->maxEvt; // max number of events not reached
|
|---|
| 689 | const bool cond2 = actrun->lastTime < actrun->closeTime; // max time not reached
|
|---|
| 690 | if (!cond1 || !cond2)
|
|---|
| 691 | runFinished();
|
|---|
| 692 |
|
|---|
| 693 | // We don't mind here that this is not common to all events,
|
|---|
| 694 | // because every coming event will fullfil the condition as well.
|
|---|
| 695 | if (!cond1)
|
|---|
| 696 | evt->closeRequest |= kRequestMaxEvtsReached;
|
|---|
| 697 | if (!cond2)
|
|---|
| 698 | evt->closeRequest |= kRequestMaxTimeReached;
|
|---|
| 699 |
|
|---|
| 700 | // Secure access to evtCtrl against access in CloseRunFile
|
|---|
| 701 | // This should be the last... otherwise we can run into threading issues
|
|---|
| 702 | // if the event is accessed before it is fully initialized.
|
|---|
| 703 | evtCtrl.emplace_back(evt);
|
|---|
| 704 | return evtCtrl.back();
|
|---|
| 705 | }
|
|---|
| 706 |
|
|---|
| 707 |
|
|---|
| 708 | void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
|
|---|
| 709 | {
|
|---|
| 710 | const int i = rBuf.sockId;
|
|---|
| 711 |
|
|---|
| 712 | memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER));
|
|---|
| 713 |
|
|---|
| 714 | int src = sizeof(PEVNT_HEADER) / 2; // Header is 72 byte = 36 shorts
|
|---|
| 715 |
|
|---|
| 716 | // consistency of ROIs have been checked already (is it all correct?)
|
|---|
| 717 | const uint16_t &roi = rBuf.S[src+2];
|
|---|
| 718 |
|
|---|
| 719 | // different sort in FAD board.....
|
|---|
| 720 | EVENT *event = evt->fEvent;
|
|---|
| 721 | for (int px = 0; px < 9; px++)
|
|---|
| 722 | {
|
|---|
| 723 | for (int drs = 0; drs < 4; drs++)
|
|---|
| 724 | {
|
|---|
| 725 | const int16_t pixC = rBuf.S[src+1]; // start-cell
|
|---|
| 726 | const int16_t pixR = rBuf.S[src+2]; // roi
|
|---|
| 727 | //here we should check if pixH is correct ....
|
|---|
| 728 |
|
|---|
| 729 | const int pixS = i*36 + drs*9 + px;
|
|---|
| 730 |
|
|---|
| 731 | event->StartPix[pixS] = pixC;
|
|---|
| 732 |
|
|---|
| 733 | memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
|
|---|
| 734 |
|
|---|
| 735 | src += 4+pixR;
|
|---|
| 736 |
|
|---|
| 737 | // Treatment for ch 9 (TM channel)
|
|---|
| 738 | if (px != 8)
|
|---|
| 739 | continue;
|
|---|
| 740 |
|
|---|
| 741 | const int tmS = i*4 + drs;
|
|---|
| 742 |
|
|---|
| 743 | //and we have additional TM info
|
|---|
| 744 | if (pixR > roi)
|
|---|
| 745 | {
|
|---|
| 746 | event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
|
|---|
| 747 |
|
|---|
| 748 | memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
|
|---|
| 749 | }
|
|---|
| 750 | else
|
|---|
| 751 | {
|
|---|
| 752 | event->StartTM[tmS] = -1;
|
|---|
| 753 | }
|
|---|
| 754 | }
|
|---|
| 755 | }
|
|---|
| 756 | }
|
|---|
| 757 |
|
|---|
| 758 | // ==========================================================================
|
|---|
| 759 |
|
|---|
| 760 | uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
|
|---|
| 761 | {
|
|---|
| 762 | factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
|
|---|
| 763 | evt->runNum, evt->evNum, evtCtrl.size(), txt);
|
|---|
| 764 |
|
|---|
| 765 | uint64_t report = 0;
|
|---|
| 766 |
|
|---|
| 767 | char str[1000];
|
|---|
| 768 |
|
|---|
| 769 | int ik=0;
|
|---|
| 770 | for (int ib=0; ib<NBOARDS; ib++)
|
|---|
| 771 | {
|
|---|
| 772 | if (ib%10==0)
|
|---|
| 773 | str[ik++] = '|';
|
|---|
| 774 |
|
|---|
| 775 | const int jb = evt->board[ib];
|
|---|
| 776 | if (jb>=0) // data received from that board
|
|---|
| 777 | {
|
|---|
| 778 | str[ik++] = '0'+(jb%10);
|
|---|
| 779 | continue;
|
|---|
| 780 | }
|
|---|
| 781 |
|
|---|
| 782 | // FIXME: This is not synchronous... it reports
|
|---|
| 783 | // accoridng to the current connection status, not w.r.t. to the
|
|---|
| 784 | // one when the event was taken.
|
|---|
| 785 | if (gi_NumConnect[ib]==0) // board not connected
|
|---|
| 786 | {
|
|---|
| 787 | str[ik++] = 'x';
|
|---|
| 788 | continue;
|
|---|
| 789 | }
|
|---|
| 790 |
|
|---|
| 791 | // data from this board lost
|
|---|
| 792 | str[ik++] = '.';
|
|---|
| 793 | report |= ((uint64_t)1)<<ib;
|
|---|
| 794 | }
|
|---|
| 795 |
|
|---|
| 796 | str[ik++] = '|';
|
|---|
| 797 | str[ik] = 0;
|
|---|
| 798 |
|
|---|
| 799 | factOut(MessageImp::kWarn, str);
|
|---|
| 800 |
|
|---|
| 801 | return report;
|
|---|
| 802 | }
|
|---|
| 803 |
|
|---|
| 804 | // ==========================================================================
|
|---|
| 805 | // ==========================================================================
|
|---|
| 806 |
|
|---|
| 807 | bool proc1(const shared_ptr<EVT_CTRL2> &);
|
|---|
| 808 |
|
|---|
| 809 | Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1));
|
|---|
| 810 |
|
|---|
| 811 | bool proc1(const shared_ptr<EVT_CTRL2> &evt)
|
|---|
| 812 | {
|
|---|
| 813 | applyCalib(*evt, processingQueue1.size());
|
|---|
| 814 | return true;
|
|---|
| 815 | }
|
|---|
| 816 |
|
|---|
| 817 | // If this is not convenient anymore, it could be replaced by
|
|---|
| 818 | // a command queue, to which command+data is posted,
|
|---|
| 819 | // (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
|
|---|
| 820 | bool writeEvt(const shared_ptr<EVT_CTRL2> &evt)
|
|---|
| 821 | {
|
|---|
| 822 | //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
|
|---|
| 823 | RUN_CTRL2 &run = *evt->runCtrl;
|
|---|
| 824 |
|
|---|
| 825 | // Is this a valid event or just an empty event to trigger run close?
|
|---|
| 826 | // If this is not an empty event open the new run-file
|
|---|
| 827 | // Empty events are there to trigger run-closing conditions
|
|---|
| 828 | if (evt->valid())
|
|---|
| 829 | {
|
|---|
| 830 | // File not yet open
|
|---|
| 831 | if (run.fileStat==kFileNotYetOpen)
|
|---|
| 832 | {
|
|---|
| 833 | // runOpen will close a previous run, if still open
|
|---|
| 834 | if (!runOpen(*evt))
|
|---|
| 835 | {
|
|---|
| 836 | factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
|
|---|
| 837 | run.fileStat = kFileClosed;
|
|---|
| 838 | return true;
|
|---|
| 839 | }
|
|---|
| 840 |
|
|---|
| 841 | factPrintf(MessageImp::kInfo, "Opened new file for data from run %d (evt=%d)", evt->runNum, evt->evNum);
|
|---|
| 842 | run.fileStat = kFileOpen;
|
|---|
| 843 | }
|
|---|
| 844 |
|
|---|
| 845 | // Here we have a valid calibration and can go on with that.
|
|---|
| 846 | // It is important that _all_ events are sent for calibration (except broken ones)
|
|---|
| 847 | processingQueue1.post(evt);
|
|---|
| 848 | }
|
|---|
| 849 |
|
|---|
| 850 | // File already closed
|
|---|
| 851 | if (run.fileStat==kFileClosed)
|
|---|
| 852 | return true;
|
|---|
| 853 |
|
|---|
| 854 | // If we will have a software trigger which prevents single events from writing,
|
|---|
| 855 | // the logic of writing the stop time and the trigger counters need to be adapted.
|
|---|
| 856 | // Currently it is just the values of the last valid event.
|
|---|
| 857 | bool rc1 = true;
|
|---|
| 858 | if (evt->valid())
|
|---|
| 859 | {
|
|---|
| 860 | rc1 = runWrite(*evt);
|
|---|
| 861 | if (!rc1)
|
|---|
| 862 | factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
|
|---|
| 863 | }
|
|---|
| 864 |
|
|---|
| 865 | // File not open... no need to close or to check for close
|
|---|
| 866 | // ... this is the case if CloseRunFile was called before any file was opened.
|
|---|
| 867 | if (run.fileStat!=kFileOpen)
|
|---|
| 868 | return true;
|
|---|
| 869 |
|
|---|
| 870 | // File is not yet to be closed.
|
|---|
| 871 | if (rc1 && evt->closeRequest==kRequestNone)
|
|---|
| 872 | return true;
|
|---|
| 873 |
|
|---|
| 874 | runClose(*evt);
|
|---|
| 875 | run.fileStat = kFileClosed;
|
|---|
| 876 |
|
|---|
| 877 | vector<string> reason;
|
|---|
| 878 | if (evt->closeRequest&kRequestManual)
|
|---|
| 879 | reason.emplace_back("close was requested");
|
|---|
| 880 | if (evt->closeRequest&kRequestTimeout)
|
|---|
| 881 | reason.emplace_back("receive timed out");
|
|---|
| 882 | if (evt->closeRequest&kRequestConnectionChange)
|
|---|
| 883 | reason.emplace_back("connection changed");
|
|---|
| 884 | if (evt->closeRequest&kRequestEventCheckFailed)
|
|---|
| 885 | reason.emplace_back("event check failed");
|
|---|
| 886 | if (evt->closeRequest&kRequestMaxTimeReached)
|
|---|
| 887 | reason.push_back(to_string(run.closeTime-run.openTime)+"s had been reached");
|
|---|
| 888 | if (evt->closeRequest&kRequestMaxEvtsReached)
|
|---|
| 889 | reason.push_back(to_string(run.maxEvt)+" evts had been reached");
|
|---|
| 890 | if (!rc1)
|
|---|
| 891 | reason.emplace_back("runWrite failed");
|
|---|
| 892 |
|
|---|
| 893 | const string str = boost::algorithm::join(reason, ", ");
|
|---|
| 894 | factPrintf(MessageImp::kInfo, "File %d was closed because %s", run.runId, str.c_str());
|
|---|
| 895 |
|
|---|
| 896 | return true;
|
|---|
| 897 | }
|
|---|
| 898 |
|
|---|
| 899 | Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
|
|---|
| 900 |
|
|---|
| 901 | bool procEvt(const shared_ptr<EVT_CTRL2> &evt)
|
|---|
| 902 | {
|
|---|
| 903 | RUN_CTRL2 &run = *evt->runCtrl;
|
|---|
| 904 |
|
|---|
| 905 | bool check = true;
|
|---|
| 906 | if (evt->valid())
|
|---|
| 907 | {
|
|---|
| 908 | EVENT *event = evt->fEvent;
|
|---|
| 909 |
|
|---|
| 910 | // This is already done in initMemory()
|
|---|
| 911 | //event->Roi = evt->runCtrl->roi0;
|
|---|
| 912 | //event->RoiTM = evt->runCtrl->roi8;
|
|---|
| 913 | //event->EventNum = evt->evNum;
|
|---|
| 914 | //event->TriggerNum = evt->trgNum;
|
|---|
| 915 | //event->TriggerType = evt->trgTyp;
|
|---|
| 916 |
|
|---|
| 917 | event->NumBoards = evt->nBoard;
|
|---|
| 918 |
|
|---|
| 919 | event->PCTime = evt->time.tv_sec;
|
|---|
| 920 | event->PCUsec = evt->time.tv_usec;
|
|---|
| 921 |
|
|---|
| 922 | for (int ib=0; ib<NBOARDS; ib++)
|
|---|
| 923 | event->BoardTime[ib] = evt->FADhead[ib].time;
|
|---|
| 924 |
|
|---|
| 925 | check = eventCheck(*evt);
|
|---|
| 926 |
|
|---|
| 927 | // If the event is valid, increase the trigger counter accordingly
|
|---|
| 928 | if (check)
|
|---|
| 929 | {
|
|---|
| 930 | // Physics trigger
|
|---|
| 931 | if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
|
|---|
| 932 | run.triggerCounter[0]++;
|
|---|
| 933 | // Pure pedestal trigger
|
|---|
| 934 | else if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM))
|
|---|
| 935 | run.triggerCounter[1]++;
|
|---|
| 936 | // external light pulser trigger
|
|---|
| 937 | else if (evt->trgTyp & FAD::EventHeader::kLPext)
|
|---|
| 938 | run.triggerCounter[2]++;
|
|---|
| 939 | // time calibration triggers
|
|---|
| 940 | else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal))
|
|---|
| 941 | run.triggerCounter[3]++;
|
|---|
| 942 | // internal light pulser trigger
|
|---|
| 943 | else if (evt->trgTyp & FAD::EventHeader::kLPint)
|
|---|
| 944 | run.triggerCounter[4]++;
|
|---|
| 945 | // external trigger input 1
|
|---|
| 946 | else if (evt->trgTyp & FAD::EventHeader::kExt1)
|
|---|
| 947 | run.triggerCounter[5]++;
|
|---|
| 948 | // external trigger input 2
|
|---|
| 949 | else if (evt->trgTyp & FAD::EventHeader::kExt2)
|
|---|
| 950 | run.triggerCounter[6]++;
|
|---|
| 951 | // other triggers
|
|---|
| 952 | else
|
|---|
| 953 | run.triggerCounter[7]++;
|
|---|
| 954 | }
|
|---|
| 955 | }
|
|---|
| 956 |
|
|---|
| 957 | // If this is an invalid event, the current triggerCounter needs to be copied
|
|---|
| 958 | // because runClose will use that one to update the TRIGGER_COUNTER.
|
|---|
| 959 | // When closing the file, the trigger counter of the last successfully
|
|---|
| 960 | // written event is used.
|
|---|
| 961 | evt->triggerCounter = run.triggerCounter;
|
|---|
| 962 |
|
|---|
| 963 | // If event check has failed, skip the event and post a close request instead.
|
|---|
| 964 | // Otherwise, if file is open post the event for being written
|
|---|
| 965 | if (!check)
|
|---|
| 966 | secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl));
|
|---|
| 967 | else
|
|---|
| 968 | secondaryQueue.post(evt);
|
|---|
| 969 |
|
|---|
| 970 | return true;
|
|---|
| 971 | }
|
|---|
| 972 |
|
|---|
| 973 | // ==========================================================================
|
|---|
| 974 | // ==========================================================================
|
|---|
| 975 |
|
|---|
| 976 | /*
|
|---|
| 977 | task 1-4:
|
|---|
| 978 |
|
|---|
| 979 | lock1()-lock4();
|
|---|
| 980 | while (1)
|
|---|
| 981 | {
|
|---|
| 982 | wait for signal [lockN]; // unlocked
|
|---|
| 983 |
|
|---|
| 984 | while (n!=10)
|
|---|
| 985 | wait sockets;
|
|---|
| 986 | read;
|
|---|
| 987 |
|
|---|
| 988 | lockM();
|
|---|
| 989 | finished[n] = true;
|
|---|
| 990 | signal(mainloop);
|
|---|
| 991 | unlockM();
|
|---|
| 992 | }
|
|---|
| 993 |
|
|---|
| 994 |
|
|---|
| 995 | mainloop:
|
|---|
| 996 |
|
|---|
| 997 | while (1)
|
|---|
| 998 | {
|
|---|
| 999 | lockM();
|
|---|
| 1000 | while (!finished[0] || !finished[1] ...)
|
|---|
| 1001 | wait for signal [lockM]; // unlocked... signals can be sent
|
|---|
| 1002 | finished[0-1] = false;
|
|---|
| 1003 | unlockM()
|
|---|
| 1004 |
|
|---|
| 1005 | copy data to queue // locked
|
|---|
| 1006 |
|
|---|
| 1007 | lockN[0-3];
|
|---|
| 1008 | signalN[0-3];
|
|---|
| 1009 | unlockN[0-3];
|
|---|
| 1010 | }
|
|---|
| 1011 |
|
|---|
| 1012 |
|
|---|
| 1013 | */
|
|---|
| 1014 |
|
|---|
| 1015 | /*
|
|---|
| 1016 | while (g_reset)
|
|---|
| 1017 | {
|
|---|
| 1018 | shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
|
|---|
| 1019 |
|
|---|
| 1020 | // Check that all sockets are connected
|
|---|
| 1021 |
|
|---|
| 1022 | for (int i=0; i<40; i++)
|
|---|
| 1023 | if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
|
|---|
| 1024 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
|
|---|
| 1025 |
|
|---|
| 1026 | while (g_reset)
|
|---|
| 1027 | {
|
|---|
| 1028 | if (READ_STRUCT::wait()<0)
|
|---|
| 1029 | break;
|
|---|
| 1030 |
|
|---|
| 1031 | if (rc_epoll==0)
|
|---|
| 1032 | break;
|
|---|
| 1033 |
|
|---|
| 1034 | for (int jj=0; jj<rc_epoll; jj++)
|
|---|
| 1035 | {
|
|---|
| 1036 | READ_STRUCT *rs = READ_STRUCT::get(jj);
|
|---|
| 1037 | if (!rs->connected)
|
|---|
| 1038 | continue;
|
|---|
| 1039 |
|
|---|
| 1040 | const bool rc_read = rs->read();
|
|---|
| 1041 | if (!rc_read)
|
|---|
| 1042 | continue;
|
|---|
| 1043 |
|
|---|
| 1044 | if (rs->bufTyp==READ_STRUCT::kHeader)
|
|---|
| 1045 | {
|
|---|
| 1046 | [...]
|
|---|
| 1047 | }
|
|---|
| 1048 |
|
|---|
| 1049 | [...]
|
|---|
| 1050 |
|
|---|
| 1051 | if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
|
|---|
| 1052 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
|
|---|
| 1053 | }
|
|---|
| 1054 |
|
|---|
| 1055 | if (once_a_second)
|
|---|
| 1056 | {
|
|---|
| 1057 | if (evt==timeout)
|
|---|
| 1058 | break;
|
|---|
| 1059 | }
|
|---|
| 1060 | }
|
|---|
| 1061 |
|
|---|
| 1062 | if (evt.nBoards==actBoards)
|
|---|
| 1063 | primaryQueue.post(evt);
|
|---|
| 1064 | }
|
|---|
| 1065 | */
|
|---|
| 1066 |
|
|---|
| 1067 | Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
|
|---|
| 1068 |
|
|---|
| 1069 | // This corresponds more or less to fFile... should we merge both?
|
|---|
| 1070 | shared_ptr<RUN_CTRL2> actrun;
|
|---|
| 1071 |
|
|---|
| 1072 | void CloseRunFile()
|
|---|
| 1073 | {
|
|---|
| 1074 | // Currently we need actrun here, to be able to set kFileClosed.
|
|---|
| 1075 | // Apart from that we have to ensure that there is an open file at all
|
|---|
| 1076 | // which we can close.
|
|---|
| 1077 | // Submission to the primary queue ensures that the event
|
|---|
| 1078 | // is placed at the right place in the processing chain.
|
|---|
| 1079 | // (Corresponds to the correct run)
|
|---|
| 1080 | primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun));
|
|---|
| 1081 | }
|
|---|
| 1082 |
|
|---|
| 1083 | bool mainloop(READ_STRUCT *rd)
|
|---|
| 1084 | {
|
|---|
| 1085 | factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
|
|---|
| 1086 |
|
|---|
| 1087 | primaryQueue.start();
|
|---|
| 1088 | secondaryQueue.start();
|
|---|
| 1089 | processingQueue1.start();;
|
|---|
| 1090 |
|
|---|
| 1091 | actrun = make_shared<RUN_CTRL2>();
|
|---|
| 1092 |
|
|---|
| 1093 | //time in seconds
|
|---|
| 1094 | time_t gi_SecTime = time(NULL)-1;
|
|---|
| 1095 |
|
|---|
| 1096 | //loop until global variable g_runStat claims stop
|
|---|
| 1097 | g_reset = 0;
|
|---|
| 1098 | while (g_reset == 0)
|
|---|
| 1099 | {
|
|---|
| 1100 | #ifdef USE_POLL
|
|---|
| 1101 | int pp[40];
|
|---|
| 1102 | int nn = 0;
|
|---|
| 1103 | pollfd fds[40];
|
|---|
| 1104 | for (int i=0; i<40; i++)
|
|---|
| 1105 | {
|
|---|
| 1106 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
|
|---|
| 1107 | {
|
|---|
| 1108 | fds[nn].fd = rd[i].socket;
|
|---|
| 1109 | fds[nn].events = POLLIN;
|
|---|
| 1110 | pp[nn] = i;
|
|---|
| 1111 | nn++;
|
|---|
| 1112 | }
|
|---|
| 1113 | }
|
|---|
| 1114 |
|
|---|
| 1115 | const int rc_epoll = poll(fds, nn, 100);
|
|---|
| 1116 | if (rc_epoll<0)
|
|---|
| 1117 | break;
|
|---|
| 1118 | #endif
|
|---|
| 1119 |
|
|---|
| 1120 | #ifdef USE_SELECT
|
|---|
| 1121 | fd_set readfs;
|
|---|
| 1122 | FD_ZERO(&readfs);
|
|---|
| 1123 | int nfsd = 0;
|
|---|
| 1124 | for (int i=0; i<NBOARDS; i++)
|
|---|
| 1125 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
|
|---|
| 1126 | {
|
|---|
| 1127 | FD_SET(rd[i].socket, &readfs);
|
|---|
| 1128 | if (rd[i].socket>nfsd)
|
|---|
| 1129 | nfsd = rd[i].socket;
|
|---|
| 1130 | }
|
|---|
| 1131 |
|
|---|
| 1132 | timeval tv;
|
|---|
| 1133 | tv.tv_sec = 0;
|
|---|
| 1134 | tv.tv_usec = 100000;
|
|---|
| 1135 | const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
|
|---|
| 1136 | // 0: timeout
|
|---|
| 1137 | // -1: error
|
|---|
| 1138 | if (rc_select<0)
|
|---|
| 1139 | {
|
|---|
| 1140 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
|
|---|
| 1141 | continue;
|
|---|
| 1142 | }
|
|---|
| 1143 | #endif
|
|---|
| 1144 |
|
|---|
| 1145 | #ifdef USE_EPOLL
|
|---|
| 1146 | const int rc_epoll = READ_STRUCT::wait();
|
|---|
| 1147 | if (rc_epoll<0)
|
|---|
| 1148 | break;
|
|---|
| 1149 | #endif
|
|---|
| 1150 |
|
|---|
| 1151 | #ifdef PRIORITY_QUEUE
|
|---|
| 1152 | priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
|
|---|
| 1153 |
|
|---|
| 1154 | for (int i=0; i<NBOARDS; i++)
|
|---|
| 1155 | if (rd[i].connected)
|
|---|
| 1156 | prio.push(rd+i);
|
|---|
| 1157 |
|
|---|
| 1158 | if (!prio.empty()) do
|
|---|
| 1159 | #endif
|
|---|
| 1160 |
|
|---|
| 1161 |
|
|---|
| 1162 | #ifdef USE_POLL
|
|---|
| 1163 | for (int jj=0; jj<nn; jj++)
|
|---|
| 1164 | #endif
|
|---|
| 1165 | #ifdef USE_EPOLL
|
|---|
| 1166 | for (int jj=0; jj<rc_epoll; jj++)
|
|---|
| 1167 | #endif
|
|---|
| 1168 | #if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
|
|---|
| 1169 | for (int jj=0; jj<NBOARDS; jj++)
|
|---|
| 1170 | #endif
|
|---|
| 1171 | {
|
|---|
| 1172 | #ifdef PRIORITY_QUEUE
|
|---|
| 1173 | READ_STRUCT *rs = prio.top();
|
|---|
| 1174 | #endif
|
|---|
| 1175 | #ifdef USE_SELECT
|
|---|
| 1176 | if (!FD_ISSET(rs->socket, &readfs))
|
|---|
| 1177 | continue;
|
|---|
| 1178 | #endif
|
|---|
| 1179 |
|
|---|
| 1180 | #ifdef USE_POLL
|
|---|
| 1181 | if ((fds[jj].revents&POLLIN)==0)
|
|---|
| 1182 | continue;
|
|---|
| 1183 | #endif
|
|---|
| 1184 |
|
|---|
| 1185 | #ifdef USE_EPOLL
|
|---|
| 1186 | // FIXME: How to get i?
|
|---|
| 1187 | READ_STRUCT *rs = READ_STRUCT::get(jj);
|
|---|
| 1188 | #endif
|
|---|
| 1189 |
|
|---|
| 1190 | #ifdef USE_POLL
|
|---|
| 1191 | // FIXME: How to get i?
|
|---|
| 1192 | READ_STRUCT *rs = &rd[pp[jj]];
|
|---|
| 1193 | #endif
|
|---|
| 1194 |
|
|---|
| 1195 | #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
|
|---|
| 1196 | const int i = (jj%4)*10 + (jj/4);
|
|---|
| 1197 | READ_STRUCT *rs = &rd[i];
|
|---|
| 1198 | #endif
|
|---|
| 1199 |
|
|---|
| 1200 | #ifdef COMPLETE_EVENTS
|
|---|
| 1201 | if (rs->bufTyp==READ_STRUCT::kWait)
|
|---|
| 1202 | continue;
|
|---|
| 1203 | #endif
|
|---|
| 1204 |
|
|---|
| 1205 | // ==================================================================
|
|---|
| 1206 |
|
|---|
| 1207 | const bool rc_read = rs->read();
|
|---|
| 1208 |
|
|---|
| 1209 | // Connect might have gotten closed during read
|
|---|
| 1210 | gi_NumConnect[rs->sockId] = rs->connected;
|
|---|
| 1211 | gj.numConn[rs->sockId] = rs->connected;
|
|---|
| 1212 |
|
|---|
| 1213 | // Read either failed or disconnected, or the buffer is not yet full
|
|---|
| 1214 | if (!rc_read)
|
|---|
| 1215 | continue;
|
|---|
| 1216 |
|
|---|
| 1217 | // ==================================================================
|
|---|
| 1218 |
|
|---|
| 1219 | if (rs->bufTyp==READ_STRUCT::kHeader)
|
|---|
| 1220 | {
|
|---|
| 1221 | //check if startflag correct; else shift block ....
|
|---|
| 1222 | // FIXME: This is not enough... this combination of
|
|---|
| 1223 | // bytes can be anywhere... at least the end bytes
|
|---|
| 1224 | // must be checked somewhere, too.
|
|---|
| 1225 | uint k;
|
|---|
| 1226 | for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
|
|---|
| 1227 | {
|
|---|
| 1228 | if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
|
|---|
| 1229 | break;
|
|---|
| 1230 | }
|
|---|
| 1231 | rs->skip += k;
|
|---|
| 1232 |
|
|---|
| 1233 | //no start of header found
|
|---|
| 1234 | if (k==sizeof(PEVNT_HEADER)-1)
|
|---|
| 1235 | {
|
|---|
| 1236 | rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
|
|---|
| 1237 | rs->bufPos = rs->B+1;
|
|---|
| 1238 | rs->bufLen = sizeof(PEVNT_HEADER)-1;
|
|---|
| 1239 | continue;
|
|---|
| 1240 | }
|
|---|
| 1241 |
|
|---|
| 1242 | if (k > 0)
|
|---|
| 1243 | {
|
|---|
| 1244 | memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
|
|---|
| 1245 |
|
|---|
| 1246 | rs->bufPos -= k;
|
|---|
| 1247 | rs->bufLen += k;
|
|---|
| 1248 |
|
|---|
| 1249 | continue; // We need to read more (bufLen>0)
|
|---|
| 1250 | }
|
|---|
| 1251 |
|
|---|
| 1252 | if (rs->skip>0)
|
|---|
| 1253 | {
|
|---|
| 1254 | factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
|
|---|
| 1255 | rs->skip = 0;
|
|---|
| 1256 | }
|
|---|
| 1257 |
|
|---|
| 1258 | // Swap the header entries from network to host order
|
|---|
| 1259 | rs->swapHeader();
|
|---|
| 1260 |
|
|---|
| 1261 | rs->bufTyp = READ_STRUCT::kData;
|
|---|
| 1262 | rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
|
|---|
| 1263 |
|
|---|
| 1264 | debugHead(rs->B); // i and fadBoard not used
|
|---|
| 1265 |
|
|---|
| 1266 | continue;
|
|---|
| 1267 | }
|
|---|
| 1268 |
|
|---|
| 1269 | const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
|
|---|
| 1270 | if (end != 0xfe04)
|
|---|
| 1271 | {
|
|---|
| 1272 | factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
|
|---|
| 1273 | rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
|
|---|
| 1274 |
|
|---|
| 1275 | // ready to read next header
|
|---|
| 1276 | rs->bufTyp = READ_STRUCT::kHeader;
|
|---|
| 1277 | rs->bufLen = sizeof(PEVNT_HEADER);
|
|---|
| 1278 | rs->bufPos = rs->B;
|
|---|
| 1279 | // FIXME: What to do with the validity flag?
|
|---|
| 1280 | continue;
|
|---|
| 1281 | }
|
|---|
| 1282 |
|
|---|
| 1283 | // get index into mBuffer for this event (create if needed)
|
|---|
| 1284 | const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
|
|---|
| 1285 |
|
|---|
| 1286 | // We have a valid entry, but no memory has yet been allocated
|
|---|
| 1287 | if (evt && !evt->initMemory())
|
|---|
| 1288 | {
|
|---|
| 1289 | const time_t tm = time(NULL);
|
|---|
| 1290 | if (evt->runCtrl->reportMem==tm)
|
|---|
| 1291 | continue;
|
|---|
| 1292 |
|
|---|
| 1293 | factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
|
|---|
| 1294 | evt->runCtrl->reportMem = tm;
|
|---|
| 1295 | continue;
|
|---|
| 1296 | }
|
|---|
| 1297 |
|
|---|
| 1298 | // ready to read next header
|
|---|
| 1299 | rs->bufTyp = READ_STRUCT::kHeader;
|
|---|
| 1300 | rs->bufLen = sizeof(PEVNT_HEADER);
|
|---|
| 1301 | rs->bufPos = rs->B;
|
|---|
| 1302 |
|
|---|
| 1303 | // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
|
|---|
| 1304 | if (!evt)
|
|---|
| 1305 | continue;
|
|---|
| 1306 |
|
|---|
| 1307 | // This should never happen
|
|---|
| 1308 | if (evt->board[rs->sockId] != -1)
|
|---|
| 1309 | {
|
|---|
| 1310 | factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
|
|---|
| 1311 | evt->evNum, rs->sockId, jj, rs->len());
|
|---|
| 1312 | // FIXME: What to do with the validity flag?
|
|---|
| 1313 | continue; // Continue reading next header
|
|---|
| 1314 | }
|
|---|
| 1315 |
|
|---|
| 1316 | // Swap the data entries (board headers) from network to host order
|
|---|
| 1317 | rs->swapData();
|
|---|
| 1318 |
|
|---|
| 1319 | // Copy data from rd[i] to mBuffer[evID]
|
|---|
| 1320 | copyData(*rs, evt.get());
|
|---|
| 1321 |
|
|---|
| 1322 | #ifdef COMPLETE_EVENTS
|
|---|
| 1323 | // Do not read anmymore from this board until the whole event has been received
|
|---|
| 1324 | rs->bufTyp = READ_STRUCT::kWait;
|
|---|
| 1325 | #endif
|
|---|
| 1326 | // now we have stored a new board contents into Event structure
|
|---|
| 1327 | evt->board[rs->sockId] = rs->sockId;
|
|---|
| 1328 | evt->header = evt->FADhead+rs->sockId;
|
|---|
| 1329 | evt->nBoard++;
|
|---|
| 1330 |
|
|---|
| 1331 | #ifdef COMPLETE_EPOLL
|
|---|
| 1332 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
|
|---|
| 1333 | {
|
|---|
| 1334 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
|
|---|
| 1335 | break;
|
|---|
| 1336 | }
|
|---|
| 1337 | #endif
|
|---|
| 1338 | // event not yet complete
|
|---|
| 1339 | if (evt->nBoard < READ_STRUCT::activeSockets)
|
|---|
| 1340 | continue;
|
|---|
| 1341 |
|
|---|
| 1342 | // All previous events are now flagged as incomplete ("expired")
|
|---|
| 1343 | // and will be removed. (This is a bit tricky, because pop_front()
|
|---|
| 1344 | // would invalidate the current iterator if not done _after_ the increment)
|
|---|
| 1345 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
|
|---|
| 1346 | {
|
|---|
| 1347 | const bool found = it->get()==evt.get();
|
|---|
| 1348 | if (!found)
|
|---|
| 1349 | reportIncomplete(*it, "expired");
|
|---|
| 1350 | else
|
|---|
| 1351 | primaryQueue.post(evt);
|
|---|
| 1352 |
|
|---|
| 1353 | // package_len is 0 if nothing was received.
|
|---|
| 1354 | for (int ib=0; ib<40; ib++)
|
|---|
| 1355 | rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
|
|---|
| 1356 |
|
|---|
| 1357 | // The counter must be increased _before_ the pop_front,
|
|---|
| 1358 | // otherwise the counter is invalidated by the pop_front!
|
|---|
| 1359 | it++;
|
|---|
| 1360 | evtCtrl.pop_front();
|
|---|
| 1361 |
|
|---|
| 1362 | // We reached the current event, so we are done
|
|---|
| 1363 | if (found)
|
|---|
| 1364 | break;
|
|---|
| 1365 | }
|
|---|
| 1366 |
|
|---|
| 1367 | #ifdef COMPLETE_EPOLL
|
|---|
| 1368 | for (int j=0; j<40; j++)
|
|---|
| 1369 | {
|
|---|
| 1370 | epoll_event ev;
|
|---|
| 1371 | ev.events = EPOLLIN;
|
|---|
| 1372 | ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
|
|---|
| 1373 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
|
|---|
| 1374 | {
|
|---|
| 1375 | factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
|
|---|
| 1376 | return;
|
|---|
| 1377 | }
|
|---|
| 1378 | }
|
|---|
| 1379 | #endif
|
|---|
| 1380 |
|
|---|
| 1381 | #ifdef COMPLETE_EVENTS
|
|---|
| 1382 | for (int j=0; j<40; j++)
|
|---|
| 1383 | {
|
|---|
| 1384 | //if (rs->bufTyp==READ_STRUCT::kWait)
|
|---|
| 1385 | {
|
|---|
| 1386 | rs->bufTyp = READ_STRUCT::kHeader;
|
|---|
| 1387 | rs->bufLen = sizeof(PEVNT_HEADER);
|
|---|
| 1388 | rs->bufPos = rs->B;
|
|---|
| 1389 | }
|
|---|
| 1390 | }
|
|---|
| 1391 | #endif
|
|---|
| 1392 | } // end for loop over all sockets
|
|---|
| 1393 | #ifdef PRIORITY_QUEUE
|
|---|
| 1394 | while (0); // convert continue into break ;)
|
|---|
| 1395 | #endif
|
|---|
| 1396 |
|
|---|
| 1397 | // ==================================================================
|
|---|
| 1398 |
|
|---|
| 1399 | const time_t actTime = time(NULL);
|
|---|
| 1400 | if (actTime == gi_SecTime)
|
|---|
| 1401 | {
|
|---|
| 1402 | #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
|
|---|
| 1403 | if (evtCtrl.empty())
|
|---|
| 1404 | usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
|
|---|
| 1405 | #endif
|
|---|
| 1406 | continue;
|
|---|
| 1407 | }
|
|---|
| 1408 | gi_SecTime = actTime;
|
|---|
| 1409 |
|
|---|
| 1410 | // ==================================================================
|
|---|
| 1411 | //loop over all active events and flag those older than read-timeout
|
|---|
| 1412 | //delete those that are written to disk ....
|
|---|
| 1413 |
|
|---|
| 1414 | // This could be improved having the pointer which separates the queue with
|
|---|
| 1415 | // the incomplete events from the queue with the complete events
|
|---|
| 1416 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
|
|---|
| 1417 | {
|
|---|
| 1418 | // A reference is enough because the shared_ptr is hold by the evtCtrl
|
|---|
| 1419 | const shared_ptr<EVT_CTRL2> &evt = *it;
|
|---|
| 1420 |
|
|---|
| 1421 | // The first event is the oldest. If the first event within the
|
|---|
| 1422 | // timeout window was received, we can stop searching further.
|
|---|
| 1423 | if (evt->time.tv_sec+g_evtTimeout>=actTime)
|
|---|
| 1424 | break;
|
|---|
| 1425 |
|
|---|
| 1426 | // The counter must be increased _before_ the pop_front,
|
|---|
| 1427 | // otherwise the counter is invalidated by the pop_front!
|
|---|
| 1428 | it++;
|
|---|
| 1429 |
|
|---|
| 1430 | // This timeout is caused because complete data from one or more
|
|---|
| 1431 | // boards has been received, but the memory could not be allocated.
|
|---|
| 1432 | // There is no reason why we should not go on waiting for
|
|---|
| 1433 | // memory to become free. However, the FADs will disconnect
|
|---|
| 1434 | // after 60s due to their keep-alive timeout, but the event builder
|
|---|
| 1435 | // will still wait for memory to become available.
|
|---|
| 1436 | // Currently, the only possibility to free the memory from the
|
|---|
| 1437 | // evtCtrl to restart the event builder (STOP/START).
|
|---|
| 1438 | if (!evt->valid())
|
|---|
| 1439 | continue;
|
|---|
| 1440 |
|
|---|
| 1441 | // This will result in the emission of a dim service.
|
|---|
| 1442 | // It doesn't matter if that takes comparably long,
|
|---|
| 1443 | // because we have to stop the run anyway.
|
|---|
| 1444 | const uint64_t rep = reportIncomplete(evt, "timeout");
|
|---|
| 1445 | factReportIncomplete(rep);
|
|---|
| 1446 |
|
|---|
| 1447 | // At least the data from one boards is complete...
|
|---|
| 1448 | // package_len is 0 when nothing was received from this board
|
|---|
| 1449 | for (int ib=0; ib<40; ib++)
|
|---|
| 1450 | rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
|
|---|
| 1451 |
|
|---|
| 1452 | evtCtrl.pop_front();
|
|---|
| 1453 | }
|
|---|
| 1454 |
|
|---|
| 1455 | // =================================================================
|
|---|
| 1456 |
|
|---|
| 1457 | gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
|
|---|
| 1458 | gj.bufEvt = primaryQueue.size(); //# complete events in buffer
|
|---|
| 1459 | gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
|
|---|
| 1460 | gj.bufProc = processingQueue1.size(); //# complete events in buffer
|
|---|
| 1461 | gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
|
|---|
| 1462 | gj.usdMem = Memory::max_inuse;
|
|---|
| 1463 | gj.totMem = Memory::allocated;
|
|---|
| 1464 | gj.maxMem = g_maxMem;
|
|---|
| 1465 |
|
|---|
| 1466 | gj.deltaT = 1000; // temporary, must be improved
|
|---|
| 1467 |
|
|---|
| 1468 | bool changed = false;
|
|---|
| 1469 |
|
|---|
| 1470 | static vector<uint64_t> store(NBOARDS);
|
|---|
| 1471 |
|
|---|
| 1472 | for (int ib=0; ib<NBOARDS; ib++)
|
|---|
| 1473 | {
|
|---|
| 1474 | gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
|
|---|
| 1475 | gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
|
|---|
| 1476 |
|
|---|
| 1477 | store[ib] = rd[ib].totBytes;
|
|---|
| 1478 |
|
|---|
| 1479 | if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
|
|---|
| 1480 | changed = true;
|
|---|
| 1481 |
|
|---|
| 1482 | gi_NumConnect[ib] = rd[ib].connected;
|
|---|
| 1483 | gj.numConn[ib] = rd[ib].connected;
|
|---|
| 1484 | }
|
|---|
| 1485 |
|
|---|
| 1486 | factStat(gj);
|
|---|
| 1487 |
|
|---|
| 1488 | Memory::max_inuse = 0;
|
|---|
| 1489 |
|
|---|
| 1490 | // =================================================================
|
|---|
| 1491 |
|
|---|
| 1492 | // This is a fake event to trigger possible run-closing conditions once a second
|
|---|
| 1493 | // FIXME: This is not yet ideal because a file would never be closed
|
|---|
| 1494 | // if a new file has been started and no events of the new file
|
|---|
| 1495 | // have been received yet
|
|---|
| 1496 | int request = kRequestNone;
|
|---|
| 1497 |
|
|---|
| 1498 | // If nothing was received for more than 5min, close file
|
|---|
| 1499 | if (actTime-actrun->lastTime>300)
|
|---|
| 1500 | request |= kRequestTimeout;
|
|---|
| 1501 |
|
|---|
| 1502 | // If connection status has changed
|
|---|
| 1503 | if (changed)
|
|---|
| 1504 | request |= kRequestConnectionChange;
|
|---|
| 1505 |
|
|---|
| 1506 | if (request!=kRequestNone)
|
|---|
| 1507 | runFinished();
|
|---|
| 1508 |
|
|---|
| 1509 | if (actrun->fileStat==kFileOpen)
|
|---|
| 1510 | primaryQueue.emplace(new EVT_CTRL2(request, actrun));
|
|---|
| 1511 | }
|
|---|
| 1512 |
|
|---|
| 1513 | // 1: Stop, wait for event to get processed
|
|---|
| 1514 | // 2: Stop, finish immediately
|
|---|
| 1515 | // 101: Restart, wait for events to get processed
|
|---|
| 1516 | // 101: Restart, finish immediately
|
|---|
| 1517 | //
|
|---|
| 1518 | const int gi_reset = g_reset;
|
|---|
| 1519 |
|
|---|
| 1520 | const bool abort = gi_reset%100==2;
|
|---|
| 1521 |
|
|---|
| 1522 | factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
|
|---|
| 1523 |
|
|---|
| 1524 | primaryQueue.wait(abort);
|
|---|
| 1525 | secondaryQueue.wait(abort);
|
|---|
| 1526 | processingQueue1.wait(abort);
|
|---|
| 1527 |
|
|---|
| 1528 | // Here we also destroy all runCtrl structures and hence close all open files
|
|---|
| 1529 | evtCtrl.clear();
|
|---|
| 1530 | actrun.reset();
|
|---|
| 1531 |
|
|---|
| 1532 | factPrintf(MessageImp::kInfo, "Exit read Process...");
|
|---|
| 1533 | factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse);
|
|---|
| 1534 |
|
|---|
| 1535 | factStat(gj);
|
|---|
| 1536 |
|
|---|
| 1537 | return gi_reset>=100;
|
|---|
| 1538 | }
|
|---|
| 1539 |
|
|---|
| 1540 | // ==========================================================================
|
|---|
| 1541 | // ==========================================================================
|
|---|
| 1542 |
|
|---|
| 1543 | void StartEvtBuild()
|
|---|
| 1544 | {
|
|---|
| 1545 | factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
|
|---|
| 1546 |
|
|---|
| 1547 | memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect));
|
|---|
| 1548 |
|
|---|
| 1549 | memset(&gj, 0, sizeof(GUI_STAT));
|
|---|
| 1550 |
|
|---|
| 1551 | gj.usdMem = Memory::inuse;
|
|---|
| 1552 | gj.totMem = Memory::allocated;
|
|---|
| 1553 | gj.maxMem = g_maxMem;
|
|---|
| 1554 |
|
|---|
| 1555 |
|
|---|
| 1556 | READ_STRUCT rd[NBOARDS];
|
|---|
| 1557 |
|
|---|
| 1558 | // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
|
|---|
| 1559 | for (int i=0; i<NBOARDS; i++)
|
|---|
| 1560 | rd[i].sockId = i;
|
|---|
| 1561 |
|
|---|
| 1562 | while (mainloop(rd));
|
|---|
| 1563 |
|
|---|
| 1564 | //must close all open sockets ...
|
|---|
| 1565 | factPrintf(MessageImp::kInfo, "Close all sockets...");
|
|---|
| 1566 |
|
|---|
| 1567 | READ_STRUCT::close();
|
|---|
| 1568 |
|
|---|
| 1569 | // Now all sockets get closed. This is not reflected in gi_NumConnect
|
|---|
| 1570 | // The current workaround is to count all sockets as closed when the thread is not running
|
|---|
| 1571 | factPrintf(MessageImp::kInfo, "EventBuilder++ closed");
|
|---|
| 1572 | }
|
|---|