| 1 | #include <poll.h> | 
|---|
| 2 | #include <sys/time.h> | 
|---|
| 3 | #include <sys/epoll.h> | 
|---|
| 4 | #include <netinet/tcp.h> | 
|---|
| 5 |  | 
|---|
| 6 | #include <cstring> | 
|---|
| 7 | #include <cstdarg> | 
|---|
| 8 | #include <list> | 
|---|
| 9 | #include <queue> | 
|---|
| 10 |  | 
|---|
| 11 | #include <boost/algorithm/string/join.hpp> | 
|---|
| 12 |  | 
|---|
| 13 | #include "../externals/Queue.h" | 
|---|
| 14 |  | 
|---|
| 15 | #include "MessageImp.h" | 
|---|
| 16 | #include "EventBuilder.h" | 
|---|
| 17 | #include "HeadersFAD.h" | 
|---|
| 18 |  | 
|---|
| 19 | using namespace std; | 
|---|
| 20 |  | 
|---|
// ---- Receive buffer limits ----
#define MIN_LEN  32    // min #bytes needed to interpret FADheader
#define MAX_LEN  81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092  (data+boardheader+eventheader+endflag)
// MAX_LEN is the size in bytes of the per-board receive buffer of READ_STRUCT
// and the number of bytes requested per recv() while a board is only skipped.

// ---- Optional compile-time switches (all disabled, i.e. commented out) ----
//#define COMPLETE_EVENTS
//#define USE_POLL
//#define USE_EPOLL
//#define USE_SELECT
//#define COMPLETE_EPOLL
//#define PRIORITY_QUEUE
|---|
| 30 |  | 
|---|
| 31 | // Reading only 1024: 13:  77Hz, 87% | 
|---|
| 32 | // Reading only 1024: 12:  78Hz, 46% | 
|---|
| 33 | // Reading only  300:  4: 250Hz, 92% | 
|---|
| 34 | // Reading only  300:  3: 258Hz, 40% | 
|---|
| 35 |  | 
|---|
| 36 | // Reading only four threads 1024: 13:  77Hz, 60% | 
|---|
| 37 | // Reading only four threads 1024: 12:  78Hz, 46% | 
|---|
| 38 | // Reading only four threads  300:  4: 250Hz, 92% | 
|---|
| 39 | // Reading only four threads  300:  3: 258Hz, 40% | 
|---|
| 40 |  | 
|---|
| 41 | // Default  300:  4: 249Hz, 92% | 
|---|
| 42 | // Default  300:  3: 261Hz, 40% | 
|---|
| 43 | // Default 1024: 13:  76Hz, 93% | 
|---|
| 44 | // Default 1024: 12:  79Hz, 46% | 
|---|
| 45 |  | 
|---|
| 46 | // Poll [selected] 1024: 13:  63Hz, 45% | 
|---|
| 47 | // Poll [selected] 1024: 14:  63Hz, 63% | 
|---|
| 48 | // Poll [selected] 1024: 15:  64Hz, 80% | 
|---|
| 49 | // Poll [selected]  300:  4: 230Hz, 47% | 
|---|
| 50 | // Poll [selected]  300:  3: 200Hz, 94% | 
|---|
| 51 |  | 
|---|
| 52 | // Poll [all]      1024: 13:  65Hz, 47% | 
|---|
| 53 | // Poll [all]      1024: 14:  64Hz, 59% | 
|---|
| 54 | // Poll [all]      1024: 15:  62Hz, 67% | 
|---|
| 55 | // Poll [all]       300:  4: 230Hz, 47% | 
|---|
| 56 | // Poll [all]       300:  3: 230Hz, 35% | 
|---|
| 57 |  | 
|---|
| 58 | // ========================================================================== | 
|---|
| 59 |  | 
|---|
// Hooks used by the event builder. They are only declared here; their
// definitions are not part of this code (presumably provided by the
// controlling application -- TODO confirm).
bool runOpen(const EVT_CTRL2 &evt);
bool runWrite(const EVT_CTRL2 &evt);
void runClose(const EVT_CTRL2 &run);
void applyCalib(const EVT_CTRL2 &evt, const size_t &size);
void factOut(int severity, const char *message);   // receives every message formatted by factPrintf()
void factReportIncomplete (uint64_t rep);
void gotNewRun(RUN_CTRL2 &run);                     // called by mBufEvt() once per newly started run
void runFinished();                                 // called by mBufEvt() when a run's event/time limit is reached
void factStat(const GUI_STAT &gj);
bool eventCheck(const EVT_CTRL2 &evt);
void debugHead(void *buf);
|---|
| 71 |  | 
|---|
| 72 | // ========================================================================== | 
|---|
| 73 |  | 
|---|
// Global configuration and state of the event builder.

int g_reset;

size_t g_maxMem;                //maximum memory allowed for buffer (limit checked by Memory::malloc/free)

uint16_t g_evtTimeout;           // timeout (sec) for one event

FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd"

uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards; 0 = board not connected (see reportIncomplete)

GUI_STAT gj;                    // statistics record (presumably reported via factStat() -- TODO confirm)
|---|
| 85 |  | 
|---|
| 86 | // ========================================================================== | 
|---|
| 87 |  | 
|---|
| 88 | namespace Memory | 
|---|
| 89 | { | 
|---|
| 90 | uint64_t inuse     = 0; | 
|---|
| 91 | uint64_t allocated = 0; | 
|---|
| 92 |  | 
|---|
| 93 | uint64_t max_inuse = 0; | 
|---|
| 94 |  | 
|---|
| 95 | std::mutex mtx; | 
|---|
| 96 |  | 
|---|
| 97 | std::forward_list<void*> memory; | 
|---|
| 98 |  | 
|---|
| 99 | void *malloc() | 
|---|
| 100 | { | 
|---|
| 101 | // No free slot available, next alloc would exceed max memory | 
|---|
| 102 | if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem) | 
|---|
| 103 | return NULL; | 
|---|
| 104 |  | 
|---|
| 105 | // We will return this amount of memory | 
|---|
| 106 | // This is not 100% thread safe, but it is not a super accurate measure anyway | 
|---|
| 107 | inuse += MAX_TOT_MEM; | 
|---|
| 108 | if (inuse>max_inuse) | 
|---|
| 109 | max_inuse = inuse; | 
|---|
| 110 |  | 
|---|
| 111 | if (memory.empty()) | 
|---|
| 112 | { | 
|---|
| 113 | // No free slot available, allocate a new one | 
|---|
| 114 | allocated += MAX_TOT_MEM; | 
|---|
| 115 | return  new char[MAX_TOT_MEM]; | 
|---|
| 116 | } | 
|---|
| 117 |  | 
|---|
| 118 | // Get the next free slot from the stack and return it | 
|---|
| 119 | const std::lock_guard<std::mutex> lock(mtx); | 
|---|
| 120 |  | 
|---|
| 121 | void *mem = memory.front(); | 
|---|
| 122 | memory.pop_front(); | 
|---|
| 123 | return mem; | 
|---|
| 124 | }; | 
|---|
| 125 |  | 
|---|
| 126 | void free(void *mem) | 
|---|
| 127 | { | 
|---|
| 128 | if (!mem) | 
|---|
| 129 | return; | 
|---|
| 130 |  | 
|---|
| 131 | // Decrease the amont of memory in use accordingly | 
|---|
| 132 | inuse -= MAX_TOT_MEM; | 
|---|
| 133 |  | 
|---|
| 134 | // If the maximum memory has changed, we might be over the limit. | 
|---|
| 135 | // In this case: free a slot | 
|---|
| 136 | if (allocated>g_maxMem) | 
|---|
| 137 | { | 
|---|
| 138 | delete [] (char*)mem; | 
|---|
| 139 | allocated -= MAX_TOT_MEM; | 
|---|
| 140 | return; | 
|---|
| 141 | } | 
|---|
| 142 |  | 
|---|
| 143 | const std::lock_guard<std::mutex> lock(mtx); | 
|---|
| 144 | memory.push_front(mem); | 
|---|
| 145 | } | 
|---|
| 146 |  | 
|---|
| 147 | }; | 
|---|
| 148 |  | 
|---|
| 149 | // ========================================================================== | 
|---|
| 150 |  | 
|---|
| 151 | void factPrintf(int severity, const char *fmt, ...) | 
|---|
| 152 | { | 
|---|
| 153 | char str[1000]; | 
|---|
| 154 |  | 
|---|
| 155 | va_list ap; | 
|---|
| 156 | va_start(ap, fmt); | 
|---|
| 157 | vsnprintf(str, 1000, fmt, ap); | 
|---|
| 158 | va_end(ap); | 
|---|
| 159 |  | 
|---|
| 160 | factOut(severity, str); | 
|---|
| 161 | } | 
|---|
| 162 |  | 
|---|
| 163 | // ========================================================================== | 
|---|
| 164 |  | 
|---|
| 165 | struct READ_STRUCT | 
|---|
| 166 | { | 
|---|
| 167 | enum buftyp_t | 
|---|
| 168 | { | 
|---|
| 169 | kStream, | 
|---|
| 170 | kHeader, | 
|---|
| 171 | kData, | 
|---|
| 172 | #ifdef COMPLETE_EVENTS | 
|---|
| 173 | kWait | 
|---|
| 174 | #endif | 
|---|
| 175 | }; | 
|---|
| 176 |  | 
|---|
| 177 | // ---------- connection ---------- | 
|---|
| 178 |  | 
|---|
| 179 | static uint activeSockets; | 
|---|
| 180 |  | 
|---|
| 181 | int  sockId;       // socket id (board number) | 
|---|
| 182 | int  socket;       // socket handle | 
|---|
| 183 | bool connected;    // is this socket connected? | 
|---|
| 184 |  | 
|---|
| 185 | struct sockaddr_in SockAddr;  // Socket address copied from wrapper during socket creation | 
|---|
| 186 |  | 
|---|
| 187 | // ------------ epoll ------------- | 
|---|
| 188 |  | 
|---|
| 189 | static int  fd_epoll; | 
|---|
| 190 | static epoll_event events[NBOARDS]; | 
|---|
| 191 |  | 
|---|
| 192 | static void init(); | 
|---|
| 193 | static void close(); | 
|---|
| 194 | static int  wait(); | 
|---|
| 195 | static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); } | 
|---|
| 196 |  | 
|---|
| 197 | // ------------ buffer ------------ | 
|---|
| 198 |  | 
|---|
| 199 | buftyp_t  bufTyp;  // what are we reading at the moment: 0=header 1=data -1=skip ... | 
|---|
| 200 |  | 
|---|
| 201 | uint32_t  bufLen;  // number of bytes left to read | 
|---|
| 202 | uint8_t  *bufPos;  // next byte to read to the buffer next | 
|---|
| 203 |  | 
|---|
| 204 | union | 
|---|
| 205 | { | 
|---|
| 206 | uint8_t  B[MAX_LEN]; | 
|---|
| 207 | uint16_t S[MAX_LEN / 2]; | 
|---|
| 208 | uint32_t I[MAX_LEN / 4]; | 
|---|
| 209 | uint64_t L[MAX_LEN / 8]; | 
|---|
| 210 | PEVNT_HEADER H; | 
|---|
| 211 | }; | 
|---|
| 212 |  | 
|---|
| 213 | timeval  time; | 
|---|
| 214 | uint64_t totBytes;  // total received bytes | 
|---|
| 215 | uint64_t relBytes;  // total released bytes | 
|---|
| 216 | uint32_t skip;      // number of bytes skipped before start of event | 
|---|
| 217 |  | 
|---|
| 218 | uint32_t len() const { return uint32_t(H.package_length)*2; } | 
|---|
| 219 |  | 
|---|
| 220 | void swapHeader(); | 
|---|
| 221 | void swapData(); | 
|---|
| 222 |  | 
|---|
| 223 | // -------------------------------- | 
|---|
| 224 |  | 
|---|
| 225 | READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0) | 
|---|
| 226 | { | 
|---|
| 227 | if (fd_epoll<0) | 
|---|
| 228 | init(); | 
|---|
| 229 | } | 
|---|
| 230 | ~READ_STRUCT() | 
|---|
| 231 | { | 
|---|
| 232 | destroy(); | 
|---|
| 233 | } | 
|---|
| 234 |  | 
|---|
| 235 | void destroy(); | 
|---|
| 236 | bool create(sockaddr_in addr); | 
|---|
| 237 | bool check(int, sockaddr_in addr); | 
|---|
| 238 | bool read(); | 
|---|
| 239 |  | 
|---|
| 240 | }; | 
|---|
| 241 |  | 
|---|
#ifdef PRIORITY_QUEUE
// Orders READ_STRUCT pointers by the number of received but not yet released
// bytes (totBytes - relBytes). Being a "greater" comparison, a
// std::priority_queue using it would keep the board with the fewest pending
// bytes on top. operator() is const so that it can be invoked on a const
// comparator object, as standard containers/algorithms may do.
struct READ_STRUCTcomp
{
    bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2) const
    {
        const int64_t rel1 = r1->totBytes - r1->relBytes;
        const int64_t rel2 = r2->totBytes - r2->relBytes;
        return rel1 > rel2;
    }
};
#endif
|---|
| 253 |  | 
|---|
| 254 | int READ_STRUCT::wait() | 
|---|
| 255 | { | 
|---|
| 256 | // wait for something to do... | 
|---|
| 257 | const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms] | 
|---|
| 258 | if (rc>=0) | 
|---|
| 259 | return rc; | 
|---|
| 260 |  | 
|---|
| 261 | if (errno==EINTR) // timout or signal interruption | 
|---|
| 262 | return 0; | 
|---|
| 263 |  | 
|---|
| 264 | factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno); | 
|---|
| 265 | return -1; | 
|---|
| 266 | } | 
|---|
| 267 |  | 
|---|
// Definitions of the static members shared by all READ_STRUCT instances.
uint READ_STRUCT::activeSockets = 0;       // incremented by create(), decremented by destroy()
int READ_STRUCT::fd_epoll = -1;            // -1: no epoll instance (set by init(), reset by close())
epoll_event READ_STRUCT::events[NBOARDS];  // filled by epoll_wait() in wait()
|---|
| 271 |  | 
|---|
| 272 | void READ_STRUCT::init() | 
|---|
| 273 | { | 
|---|
| 274 | if (fd_epoll>=0) | 
|---|
| 275 | return; | 
|---|
| 276 |  | 
|---|
| 277 | #ifdef USE_EPOLL | 
|---|
| 278 | fd_epoll = epoll_create(NBOARDS); | 
|---|
| 279 | if (fd_epoll<0) | 
|---|
| 280 | { | 
|---|
| 281 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno); | 
|---|
| 282 | return; | 
|---|
| 283 | } | 
|---|
| 284 | #endif | 
|---|
| 285 | } | 
|---|
| 286 |  | 
|---|
| 287 | void READ_STRUCT::close() | 
|---|
| 288 | { | 
|---|
| 289 | #ifdef USE_EPOLL | 
|---|
| 290 | if (fd_epoll>=0 && ::close(fd_epoll)>0) | 
|---|
| 291 | factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno); | 
|---|
| 292 | #endif | 
|---|
| 293 |  | 
|---|
| 294 | fd_epoll = -1; | 
|---|
| 295 | } | 
|---|
| 296 |  | 
|---|
| 297 | bool READ_STRUCT::create(sockaddr_in sockAddr) | 
|---|
| 298 | { | 
|---|
| 299 | if (socket>=0) | 
|---|
| 300 | return false; | 
|---|
| 301 |  | 
|---|
| 302 | const int port = ntohs(sockAddr.sin_port) + 1; | 
|---|
| 303 |  | 
|---|
| 304 | SockAddr.sin_family = sockAddr.sin_family; | 
|---|
| 305 | SockAddr.sin_addr   = sockAddr.sin_addr; | 
|---|
| 306 | SockAddr.sin_port   = htons(port); | 
|---|
| 307 |  | 
|---|
| 308 | if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) | 
|---|
| 309 | { | 
|---|
| 310 | factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno); | 
|---|
| 311 | socket = -1; | 
|---|
| 312 | return false; | 
|---|
| 313 | } | 
|---|
| 314 |  | 
|---|
| 315 | int optval = 1; | 
|---|
| 316 | if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0) | 
|---|
| 317 | factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 318 |  | 
|---|
| 319 | optval = 1; | 
|---|
| 320 | if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) | 
|---|
| 321 | factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 322 |  | 
|---|
| 323 | optval = 10;                 //start after 10 seconds | 
|---|
| 324 | if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) | 
|---|
| 325 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 326 |  | 
|---|
| 327 | optval = 10;                 //do every 10 seconds | 
|---|
| 328 | if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) | 
|---|
| 329 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 330 |  | 
|---|
| 331 | optval = 2;                  //close after 2 unsuccessful tries | 
|---|
| 332 | if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) | 
|---|
| 333 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 334 |  | 
|---|
| 335 | factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket); | 
|---|
| 336 |  | 
|---|
| 337 | //connected = false; | 
|---|
| 338 | activeSockets++; | 
|---|
| 339 |  | 
|---|
| 340 | return true; | 
|---|
| 341 | } | 
|---|
| 342 |  | 
|---|
| 343 | void READ_STRUCT::destroy() | 
|---|
| 344 | { | 
|---|
| 345 | if (socket<0) | 
|---|
| 346 | return; | 
|---|
| 347 |  | 
|---|
| 348 | #ifdef USE_EPOLL | 
|---|
| 349 | // strictly speaking this should not be necessary | 
|---|
| 350 | if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) | 
|---|
| 351 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 352 | #endif | 
|---|
| 353 |  | 
|---|
| 354 | if (::close(socket) > 0) | 
|---|
| 355 | factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno); | 
|---|
| 356 | else | 
|---|
| 357 | factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket); | 
|---|
| 358 |  | 
|---|
| 359 | // Set the socket to "not connected" | 
|---|
| 360 | socket = -1; | 
|---|
| 361 | connected = false; | 
|---|
| 362 | activeSockets--; | 
|---|
| 363 | bufLen = 0; | 
|---|
| 364 | } | 
|---|
| 365 |  | 
|---|
/// Bring the socket into the state requested by sockDef and, if it is open
/// but not yet connected, try to connect it.
///
/// @param sockDef  0: the socket must be closed; !=0: the socket must be open.
///                 When the connection is established, a negative value
///                 selects stream mode (data is read and skipped), a positive
///                 value normal reading starting with an event header.
/// @param addr     address passed to create() when a socket has to be opened
/// @return true if this call changed the socket handle (created or destroyed it)
///
/// The socket is created non-blocking (SOCK_NONBLOCK in create()), so
/// connect() does not wait: if it returns -1 the function just returns and
/// the connect is attempted again on the next call.
bool READ_STRUCT::check(int sockDef, sockaddr_in addr)
{
    // Continue in the most most likely case (performance)
    //if (socket>=0 && sockDef!=0 && connected)
    //    return;
    const int old = socket;

    // socket open, but should not be open
    if (socket>=0 && sockDef==0)
        destroy();

    // Socket closed, but should be open
    if (socket<0 && sockDef!=0)
        create(addr); //generate address and socket

    const bool retval = old!=socket;

    // Socket closed
    if (socket<0)
        return retval;

    // Socket open and connected: Nothing to do
    if (connected)
        return retval;

    //try to connect if not yet done
    const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
    if (rc == -1)
        return retval;

    connected = true;

    // Freshly connected: initialize the read state
    if (sockDef<0)
    {
        bufTyp = READ_STRUCT::kStream; // full data to be skipped
        bufLen = MAX_LEN;              // huge for skipping
    }
    else
    {
        bufTyp = READ_STRUCT::kHeader;  // expect a header
        bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
    }

    bufPos   = B;  // no byte read so far
    skip     = 0;  // start empty
    totBytes = 0;
    relBytes = 0;

    factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);

#ifdef USE_EPOLL
    // Register the connected socket; get() maps the epoll result back to this object
    epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.ptr = this;  // user data (union: ev.ptr)
    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
        factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
#endif

    return retval;
}
|---|
| 426 |  | 
|---|
| 427 | bool READ_STRUCT::read() | 
|---|
| 428 | { | 
|---|
| 429 | if (!connected) | 
|---|
| 430 | return false; | 
|---|
| 431 |  | 
|---|
| 432 | if (bufLen==0) | 
|---|
| 433 | return true; | 
|---|
| 434 |  | 
|---|
| 435 | const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT); | 
|---|
| 436 | // recv failed | 
|---|
| 437 | if (jrd<0) | 
|---|
| 438 | { | 
|---|
| 439 | // There was just nothing waiting | 
|---|
| 440 | if (errno==EWOULDBLOCK || errno==EAGAIN) | 
|---|
| 441 | return false; | 
|---|
| 442 |  | 
|---|
| 443 | factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno); | 
|---|
| 444 | return false; | 
|---|
| 445 | } | 
|---|
| 446 |  | 
|---|
| 447 | // connection was closed ... | 
|---|
| 448 | if (jrd==0) | 
|---|
| 449 | { | 
|---|
| 450 | factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId); | 
|---|
| 451 |  | 
|---|
| 452 | destroy();//DestroySocket(rd[i]); //generate address and socket | 
|---|
| 453 | return false; | 
|---|
| 454 | } | 
|---|
| 455 |  | 
|---|
| 456 | totBytes += jrd; | 
|---|
| 457 |  | 
|---|
| 458 | // are we skipping this board ... | 
|---|
| 459 | if (bufTyp==kStream) | 
|---|
| 460 | return false; | 
|---|
| 461 |  | 
|---|
| 462 | if (bufPos==B) | 
|---|
| 463 | gettimeofday(&time, NULL); | 
|---|
| 464 |  | 
|---|
| 465 | bufPos += jrd;  //==> prepare for continuation | 
|---|
| 466 | bufLen -= jrd; | 
|---|
| 467 |  | 
|---|
| 468 | // not yet all read | 
|---|
| 469 | return bufLen==0; | 
|---|
| 470 | } | 
|---|
| 471 |  | 
|---|
/// Convert the event header at the start of the buffer from network to host
/// byte order, in place. S[] (16-bit) and I[] (32-bit) alias the same bytes
/// (I[n] covers S[2n] and S[2n+1]); S[0] is left untouched.
void READ_STRUCT::swapHeader()
{
    S[1]  = ntohs(S[1]);    // package_length (bytes not swapped!)
    S[2]  = ntohs(S[2]);    // version_no
    S[3]  = ntohs(S[3]);    // PLLLCK
    S[4]  = ntohs(S[4]);    // trigger_crc
    S[5]  = ntohs(S[5]);    // trigger_type

    I[3]  = ntohl(I[3]);    // trigger_id        (S[6..7])
    I[4]  = ntohl(I[4]);    // fad_evt_counter   (S[8..9])
    I[5]  = ntohl(I[5]);    // REFCLK_frequency  (S[10..11])

    S[12] = ntohs(S[12]);   // board id
    S[13] = ntohs(S[13]);   // adc_clock_phase_shift
    S[14] = ntohs(S[14]);   // number_of_triggers_to_generate
    S[15] = ntohs(S[15]);   // trigger_generator_prescaler

    I[10] = ntohl(I[10]);   // runnumber;        (S[20..21])
    I[11] = ntohl(I[11]);   // time;             (S[22..23])

    // Use back inserter??
    // The remaining NTemp+NDAC 16-bit words starting at S[24]
    for (int s=24; s<24+NTemp+NDAC; s++)
        S[s] = ntohs(S[s]); // drs_temperature / dac
}
|---|
| 496 |  | 
|---|
| 497 | void READ_STRUCT::swapData() | 
|---|
| 498 | { | 
|---|
| 499 | // swapEventHeaderBytes: End of the header. to channels now | 
|---|
| 500 |  | 
|---|
| 501 | int i = 36; | 
|---|
| 502 | for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) | 
|---|
| 503 | { | 
|---|
| 504 | S[i+0] = ntohs(S[i+0]);//id | 
|---|
| 505 | S[i+1] = ntohs(S[i+1]);//start_cell | 
|---|
| 506 | S[i+2] = ntohs(S[i+2]);//roi | 
|---|
| 507 | S[i+3] = ntohs(S[i+3]);//filling | 
|---|
| 508 |  | 
|---|
| 509 | i += 4+S[i+2];//skip the pixel data | 
|---|
| 510 | } | 
|---|
| 511 | } | 
|---|
| 512 |  | 
|---|
| 513 | // ========================================================================== | 
|---|
| 514 |  | 
|---|
| 515 | bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[]) | 
|---|
| 516 | { | 
|---|
| 517 | int xjr = -1; | 
|---|
| 518 | int xkr = -1; | 
|---|
| 519 |  | 
|---|
| 520 | //points to the very first roi | 
|---|
| 521 | int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; | 
|---|
| 522 |  | 
|---|
| 523 | roi[0] = ntohs(rd.S[roiPtr]); | 
|---|
| 524 |  | 
|---|
| 525 | for (int jr = 0; jr < 9; jr++) | 
|---|
| 526 | { | 
|---|
| 527 | roi[jr] = ntohs(rd.S[roiPtr]); | 
|---|
| 528 |  | 
|---|
| 529 | if (roi[jr]>1024) | 
|---|
| 530 | { | 
|---|
| 531 | factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]); | 
|---|
| 532 | return false; | 
|---|
| 533 | } | 
|---|
| 534 |  | 
|---|
| 535 | // Check that the roi of pixels jr are compatible with the one of pixel 0 | 
|---|
| 536 | if (jr!=8 && roi[jr]!=roi[0]) | 
|---|
| 537 | { | 
|---|
| 538 | xjr = jr; | 
|---|
| 539 | break; | 
|---|
| 540 | } | 
|---|
| 541 |  | 
|---|
| 542 | // Check that the roi of all other DRS chips on boards are compatible | 
|---|
| 543 | for (int kr = 1; kr < 4; kr++) | 
|---|
| 544 | { | 
|---|
| 545 | const int kroi = ntohs(rd.S[roiPtr]); | 
|---|
| 546 | if (kroi != roi[jr]) | 
|---|
| 547 | { | 
|---|
| 548 | xjr = jr; | 
|---|
| 549 | xkr = kr; | 
|---|
| 550 | break; | 
|---|
| 551 | } | 
|---|
| 552 | roiPtr += kroi+4; | 
|---|
| 553 | } | 
|---|
| 554 | } | 
|---|
| 555 |  | 
|---|
| 556 | if (xjr>=0) | 
|---|
| 557 | { | 
|---|
| 558 | if (xkr<0) | 
|---|
| 559 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); | 
|---|
| 560 | else | 
|---|
| 561 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr])); | 
|---|
| 562 |  | 
|---|
| 563 | return false; | 
|---|
| 564 | } | 
|---|
| 565 |  | 
|---|
| 566 | if (roi[8] < roi[0]) | 
|---|
| 567 | { | 
|---|
| 568 | factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); | 
|---|
| 569 | return false; | 
|---|
| 570 | } | 
|---|
| 571 |  | 
|---|
| 572 | return true; | 
|---|
| 573 | } | 
|---|
| 574 |  | 
|---|
// Events currently being assembled. mBufEvt() searches it from the newest
// entry backwards and appends newly started events at the end.
list<shared_ptr<EVT_CTRL2>> evtCtrl;
|---|
| 576 |  | 
|---|
| 577 | shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun) | 
|---|
| 578 | { | 
|---|
| 579 | /* | 
|---|
| 580 | checkroi consistence | 
|---|
| 581 | find existing entry | 
|---|
| 582 | if no entry, try to allocate memory | 
|---|
| 583 | if entry and memory, init event structure | 
|---|
| 584 | */ | 
|---|
| 585 |  | 
|---|
| 586 | uint16_t nRoi[9]; | 
|---|
| 587 | if (!checkRoiConsistency(rd, nRoi)) | 
|---|
| 588 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 589 |  | 
|---|
| 590 | for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++) | 
|---|
| 591 | { | 
|---|
| 592 | // A reference is enough because the evtCtrl holds the shared_ptr anyway | 
|---|
| 593 | const shared_ptr<EVT_CTRL2> &evt = *it; | 
|---|
| 594 |  | 
|---|
| 595 | // If the run is different, go on searching. | 
|---|
| 596 | // We cannot stop searching if a lower run-id is found as in | 
|---|
| 597 | // the case of the events, because theoretically, there | 
|---|
| 598 | // can be the same run on two different days. | 
|---|
| 599 | if (rd.H.runnumber != evt->runNum) | 
|---|
| 600 | continue; | 
|---|
| 601 |  | 
|---|
| 602 | // If the ID of the new event if higher than the last one stored | 
|---|
| 603 | // in that run, we have to assign a new slot (leave the loop) | 
|---|
| 604 | if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/) | 
|---|
| 605 | break; | 
|---|
| 606 |  | 
|---|
| 607 | if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/) | 
|---|
| 608 | continue; | 
|---|
| 609 |  | 
|---|
| 610 | // We have found an entry with the same runID and evtID | 
|---|
| 611 | // Check if ROI is consistent | 
|---|
| 612 | if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8]) | 
|---|
| 613 | { | 
|---|
| 614 | factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", | 
|---|
| 615 | evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]); | 
|---|
| 616 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 617 | } | 
|---|
| 618 |  | 
|---|
| 619 | // It is maybe not likely, but the header of this board might have | 
|---|
| 620 | // arrived earlier. (We could also update the run-info, but | 
|---|
| 621 | // this should not make a difference here) | 
|---|
| 622 | if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) || | 
|---|
| 623 | rd.time.tv_sec<evt->time.tv_sec) | 
|---|
| 624 | evt->time = rd.time; | 
|---|
| 625 |  | 
|---|
| 626 | //everything seems fine so far ==> use this slot .... | 
|---|
| 627 | return evt; | 
|---|
| 628 | } | 
|---|
| 629 |  | 
|---|
| 630 | if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8])) | 
|---|
| 631 | { | 
|---|
| 632 | factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", | 
|---|
| 633 | actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter); | 
|---|
| 634 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 635 | } | 
|---|
| 636 |  | 
|---|
| 637 | EVT_CTRL2 *evt = new EVT_CTRL2; | 
|---|
| 638 |  | 
|---|
| 639 | evt->time   = rd.time; | 
|---|
| 640 |  | 
|---|
| 641 | evt->runNum = rd.H.runnumber; | 
|---|
| 642 | evt->evNum  = rd.H.fad_evt_counter; | 
|---|
| 643 |  | 
|---|
| 644 | evt->trgNum = rd.H.trigger_id; | 
|---|
| 645 | evt->trgTyp = rd.H.trigger_type; | 
|---|
| 646 |  | 
|---|
| 647 | evt->nRoi   = nRoi[0]; | 
|---|
| 648 | evt->nRoiTM = nRoi[8]; | 
|---|
| 649 |  | 
|---|
| 650 | //evt->firstBoard = rd.sockId; | 
|---|
| 651 |  | 
|---|
| 652 | const bool newrun = actrun->runId != rd.H.runnumber; | 
|---|
| 653 | if (newrun) | 
|---|
| 654 | { | 
|---|
| 655 | // Since we have started a new run, we know already when to close the | 
|---|
| 656 | // previous run in terms of number of events | 
|---|
| 657 | actrun->maxEvt = actrun->lastEvt; | 
|---|
| 658 |  | 
|---|
| 659 | factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d", | 
|---|
| 660 | rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId); | 
|---|
| 661 |  | 
|---|
| 662 | // The new run is the active run now | 
|---|
| 663 | actrun = make_shared<RUN_CTRL2>(); | 
|---|
| 664 |  | 
|---|
| 665 | const time_t &tsec = evt->time.tv_sec; | 
|---|
| 666 |  | 
|---|
| 667 | actrun->openTime  = tsec; | 
|---|
| 668 | actrun->closeTime = tsec + 3600 * 24; // max time allowed | 
|---|
| 669 | actrun->runId     = rd.H.runnumber; | 
|---|
| 670 | actrun->roi0      = nRoi[0];  // FIXME: Make obsolete! | 
|---|
| 671 | actrun->roi8      = nRoi[8];  // FIXME: Make obsolete! | 
|---|
| 672 |  | 
|---|
| 673 | // Signal the fadctrl that a new run has been started | 
|---|
| 674 | // Note this is the only place at which we can ensure that | 
|---|
| 675 | // gotnewRun is called only once | 
|---|
| 676 | gotNewRun(*actrun); | 
|---|
| 677 | } | 
|---|
| 678 |  | 
|---|
| 679 | // Keep pointer to run of this event | 
|---|
| 680 | evt->runCtrl = actrun; | 
|---|
| 681 |  | 
|---|
| 682 | // Increase the number of events we have started to receive in this run | 
|---|
| 683 | actrun->lastTime = evt->time.tv_sec;  // Time when the last event was received | 
|---|
| 684 | actrun->lastEvt++; | 
|---|
| 685 |  | 
|---|
| 686 | // An event can be the first and the last, but not the last and the first. | 
|---|
| 687 | // Therefore gotNewRun is called before runFinished. | 
|---|
| 688 | // runFinished signals that the last event of a run was just received. Processing | 
|---|
| 689 | // might still be ongoing, but we can start a new run. | 
|---|
| 690 | const bool cond1 = actrun->lastEvt  < actrun->maxEvt;     // max number of events not reached | 
|---|
| 691 | const bool cond2 = actrun->lastTime < actrun->closeTime;  // max time not reached | 
|---|
| 692 | if (!cond1 || !cond2) | 
|---|
| 693 | runFinished(); | 
|---|
| 694 |  | 
|---|
| 695 | // We don't mind here that this is not common to all events, | 
|---|
| 696 | // because every coming event will fullfil the condition as well. | 
|---|
| 697 | if (!cond1) | 
|---|
| 698 | evt->closeRequest |= kRequestMaxEvtsReached; | 
|---|
| 699 | if (!cond2) | 
|---|
| 700 | evt->closeRequest |= kRequestMaxTimeReached; | 
|---|
| 701 |  | 
|---|
| 702 | // Secure access to evtCtrl against access in CloseRunFile | 
|---|
| 703 | // This should be the last... otherwise we can run into threading issues | 
|---|
| 704 | // if the event is accessed before it is fully initialized. | 
|---|
| 705 | evtCtrl.emplace_back(evt); | 
|---|
| 706 | return evtCtrl.back(); | 
|---|
| 707 | } | 
|---|
| 708 |  | 
|---|
| 709 |  | 
|---|
| 710 | void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt) | 
|---|
| 711 | { | 
|---|
| 712 | const int i = rBuf.sockId; | 
|---|
| 713 |  | 
|---|
| 714 | memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER)); | 
|---|
| 715 |  | 
|---|
| 716 | int src = sizeof(PEVNT_HEADER) / 2;  // Header is 72 byte = 36 shorts | 
|---|
| 717 |  | 
|---|
| 718 | // consistency of ROIs have been checked already (is it all correct?) | 
|---|
| 719 | const uint16_t &roi = rBuf.S[src+2]; | 
|---|
| 720 |  | 
|---|
| 721 | // different sort in FAD board..... | 
|---|
| 722 | EVENT *event = evt->fEvent; | 
|---|
| 723 | for (int px = 0; px < 9; px++) | 
|---|
| 724 | { | 
|---|
| 725 | for (int drs = 0; drs < 4; drs++) | 
|---|
| 726 | { | 
|---|
| 727 | const int16_t pixC = rBuf.S[src+1];    // start-cell | 
|---|
| 728 | const int16_t pixR = rBuf.S[src+2];    // roi | 
|---|
| 729 | //here we should check if pixH is correct .... | 
|---|
| 730 |  | 
|---|
| 731 | const int pixS = i*36 + drs*9 + px; | 
|---|
| 732 |  | 
|---|
| 733 | event->StartPix[pixS] = pixC; | 
|---|
| 734 |  | 
|---|
| 735 | memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2); | 
|---|
| 736 |  | 
|---|
| 737 | src += 4+pixR; | 
|---|
| 738 |  | 
|---|
| 739 | // Treatment for ch 9 (TM channel) | 
|---|
| 740 | if (px != 8) | 
|---|
| 741 | continue; | 
|---|
| 742 |  | 
|---|
| 743 | const int tmS = i*4 + drs; | 
|---|
| 744 |  | 
|---|
| 745 | //and we have additional TM info | 
|---|
| 746 | if (pixR > roi) | 
|---|
| 747 | { | 
|---|
| 748 | event->StartTM[tmS] = (pixC + pixR - roi) % 1024; | 
|---|
| 749 |  | 
|---|
| 750 | memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2); | 
|---|
| 751 | } | 
|---|
| 752 | else | 
|---|
| 753 | { | 
|---|
| 754 | event->StartTM[tmS] = -1; | 
|---|
| 755 | } | 
|---|
| 756 | } | 
|---|
| 757 | } | 
|---|
| 758 | } | 
|---|
| 759 |  | 
|---|
| 760 | // ========================================================================== | 
|---|
| 761 |  | 
|---|
| 762 | uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt) | 
|---|
| 763 | { | 
|---|
| 764 | factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)", | 
|---|
| 765 | evt->runNum, evt->evNum, evtCtrl.size(), txt); | 
|---|
| 766 |  | 
|---|
| 767 | uint64_t report = 0; | 
|---|
| 768 |  | 
|---|
| 769 | char str[1000]; | 
|---|
| 770 |  | 
|---|
| 771 | int ik=0; | 
|---|
| 772 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 773 | { | 
|---|
| 774 | if (ib%10==0) | 
|---|
| 775 | str[ik++] = '|'; | 
|---|
| 776 |  | 
|---|
| 777 | const int jb = evt->board[ib]; | 
|---|
| 778 | if (jb>=0) // data received from that board | 
|---|
| 779 | { | 
|---|
| 780 | str[ik++] = '0'+(jb%10); | 
|---|
| 781 | continue; | 
|---|
| 782 | } | 
|---|
| 783 |  | 
|---|
| 784 | // FIXME: This is not synchronous... it reports | 
|---|
| 785 | // accoridng to the current connection status, not w.r.t. to the | 
|---|
| 786 | // one when the event was taken. | 
|---|
| 787 | if (gi_NumConnect[ib]==0) // board not connected | 
|---|
| 788 | { | 
|---|
| 789 | str[ik++] = 'x'; | 
|---|
| 790 | continue; | 
|---|
| 791 | } | 
|---|
| 792 |  | 
|---|
| 793 | // data from this board lost | 
|---|
| 794 | str[ik++] = '.'; | 
|---|
| 795 | report |= ((uint64_t)1)<<ib; | 
|---|
| 796 | } | 
|---|
| 797 |  | 
|---|
| 798 | str[ik++] = '|'; | 
|---|
| 799 | str[ik]   = 0; | 
|---|
| 800 |  | 
|---|
| 801 | factOut(MessageImp::kWarn, str); | 
|---|
| 802 |  | 
|---|
| 803 | return report; | 
|---|
| 804 | } | 
|---|
| 805 |  | 
|---|
| 806 | // ========================================================================== | 
|---|
| 807 | // ========================================================================== | 
|---|
| 808 |  | 
|---|
| 809 | bool proc1(const shared_ptr<EVT_CTRL2> &); | 
|---|
| 810 |  | 
|---|
| 811 | Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1)); | 
|---|
| 812 |  | 
|---|
| 813 | bool proc1(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 814 | { | 
|---|
| 815 | applyCalib(*evt, processingQueue1.size()); | 
|---|
| 816 | return true; | 
|---|
| 817 | } | 
|---|
| 818 |  | 
|---|
| 819 | // If this is not convenient anymore, it could be replaced by | 
|---|
| 820 | // a command queue, to which command+data is posted, | 
|---|
| 821 | // (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo) | 
|---|
| 822 | bool writeEvt(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 823 | { | 
|---|
| 824 | //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; | 
|---|
| 825 | RUN_CTRL2 &run = *evt->runCtrl; | 
|---|
| 826 |  | 
|---|
| 827 | // Is this a valid event or just an empty event to trigger run close? | 
|---|
| 828 | // If this is not an empty event open the new run-file | 
|---|
| 829 | // Empty events are there to trigger run-closing conditions | 
|---|
| 830 | if (evt->valid()) | 
|---|
| 831 | { | 
|---|
| 832 | // File not yet open | 
|---|
| 833 | if (run.fileStat==kFileNotYetOpen) | 
|---|
| 834 | { | 
|---|
| 835 | // runOpen will close a previous run, if still open | 
|---|
| 836 | if (!runOpen(*evt)) | 
|---|
| 837 | { | 
|---|
| 838 | factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); | 
|---|
| 839 | run.fileStat = kFileClosed; | 
|---|
| 840 | return true; | 
|---|
| 841 | } | 
|---|
| 842 |  | 
|---|
| 843 | factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); | 
|---|
| 844 | run.fileStat = kFileOpen; | 
|---|
| 845 | } | 
|---|
| 846 |  | 
|---|
| 847 | // Here we have a valid calibration and can go on with that. | 
|---|
| 848 | // It is important that _all_ events are sent for calibration (except broken ones) | 
|---|
| 849 | processingQueue1.post(evt); | 
|---|
| 850 | } | 
|---|
| 851 |  | 
|---|
| 852 | // File already closed | 
|---|
| 853 | if (run.fileStat==kFileClosed) | 
|---|
| 854 | return true; | 
|---|
| 855 |  | 
|---|
| 856 | // If we will have a software trigger which prevents single events from writing, | 
|---|
| 857 | // the logic of writing the stop time and the trigger counters need to be adapted. | 
|---|
| 858 | // Currently it is just the values of the last valid event. | 
|---|
| 859 | bool rc1 = true; | 
|---|
| 860 | if (evt->valid()) | 
|---|
| 861 | { | 
|---|
| 862 | rc1 = runWrite(*evt); | 
|---|
| 863 | if (!rc1) | 
|---|
| 864 | factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); | 
|---|
| 865 | } | 
|---|
| 866 |  | 
|---|
| 867 | // File not open... no need to close or to check for close | 
|---|
| 868 | // ... this is the case if CloseRunFile was called before any file was opened. | 
|---|
| 869 | if (run.fileStat!=kFileOpen) | 
|---|
| 870 | return true; | 
|---|
| 871 |  | 
|---|
| 872 | // File is not yet to be closed. | 
|---|
| 873 | if (rc1 && evt->closeRequest==kRequestNone) | 
|---|
| 874 | return true; | 
|---|
| 875 |  | 
|---|
| 876 | runClose(*evt); | 
|---|
| 877 | run.fileStat = kFileClosed; | 
|---|
| 878 |  | 
|---|
| 879 | vector<string> reason; | 
|---|
| 880 | if (evt->closeRequest&kRequestManual) | 
|---|
| 881 | reason.emplace_back("close requested"); | 
|---|
| 882 | if (evt->closeRequest&kRequestTimeout) | 
|---|
| 883 | reason.emplace_back("receive timeout"); | 
|---|
| 884 | if (evt->closeRequest&kRequestConnectionChange) | 
|---|
| 885 | reason.emplace_back("connection changed"); | 
|---|
| 886 | if (evt->closeRequest&kRequestEventCheckFailed) | 
|---|
| 887 | reason.emplace_back("event check failed"); | 
|---|
| 888 | if (evt->closeRequest&kRequestMaxTimeReached) | 
|---|
| 889 | reason.push_back(to_string(run.closeTime-run.openTime)+"s reached"); | 
|---|
| 890 | if (evt->closeRequest&kRequestMaxEvtsReached) | 
|---|
| 891 | reason.push_back(to_string(run.maxEvt)+" evts reached"); | 
|---|
| 892 | if (!rc1) | 
|---|
| 893 | reason.emplace_back("runWrite failed"); | 
|---|
| 894 |  | 
|---|
| 895 | const string str = boost::algorithm::join(reason, ", "); | 
|---|
| 896 | factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str()); | 
|---|
| 897 |  | 
|---|
| 898 | return true; | 
|---|
| 899 | } | 
|---|
| 900 |  | 
|---|
| 901 | Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1)); | 
|---|
| 902 |  | 
|---|
| 903 | bool procEvt(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 904 | { | 
|---|
| 905 | RUN_CTRL2 &run = *evt->runCtrl; | 
|---|
| 906 |  | 
|---|
| 907 | bool check = true; | 
|---|
| 908 | if (evt->valid()) | 
|---|
| 909 | { | 
|---|
| 910 | EVENT *event = evt->fEvent; | 
|---|
| 911 |  | 
|---|
| 912 | // This is already done in initMemory() | 
|---|
| 913 | //event->Roi         = evt->runCtrl->roi0; | 
|---|
| 914 | //event->RoiTM       = evt->runCtrl->roi8; | 
|---|
| 915 | //event->EventNum    = evt->evNum; | 
|---|
| 916 | //event->TriggerNum  = evt->trgNum; | 
|---|
| 917 | //event->TriggerType = evt->trgTyp; | 
|---|
| 918 |  | 
|---|
| 919 | event->NumBoards = evt->nBoard; | 
|---|
| 920 |  | 
|---|
| 921 | event->PCTime = evt->time.tv_sec; | 
|---|
| 922 | event->PCUsec = evt->time.tv_usec; | 
|---|
| 923 |  | 
|---|
| 924 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 925 | event->BoardTime[ib] = evt->FADhead[ib].time; | 
|---|
| 926 |  | 
|---|
| 927 | check = eventCheck(*evt); | 
|---|
| 928 |  | 
|---|
| 929 | // If the event is valid, increase the trigger counter accordingly | 
|---|
| 930 | if (check) | 
|---|
| 931 | { | 
|---|
| 932 | // Physics trigger | 
|---|
| 933 | if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll)) | 
|---|
| 934 | run.triggerCounter[0]++; | 
|---|
| 935 | // Pure pedestal trigger | 
|---|
| 936 | else  if ((evt->trgTyp&FAD::EventHeader::kPedestal) && !(evt->trgTyp&FAD::EventHeader::kTIM)) | 
|---|
| 937 | run.triggerCounter[1]++; | 
|---|
| 938 | // external light pulser trigger | 
|---|
| 939 | else if (evt->trgTyp & FAD::EventHeader::kLPext) | 
|---|
| 940 | run.triggerCounter[2]++; | 
|---|
| 941 | // time calibration triggers | 
|---|
| 942 | else if (evt->trgTyp & (FAD::EventHeader::kTIM|FAD::EventHeader::kPedestal)) | 
|---|
| 943 | run.triggerCounter[3]++; | 
|---|
| 944 | // internal light pulser trigger | 
|---|
| 945 | else if (evt->trgTyp & FAD::EventHeader::kLPint) | 
|---|
| 946 | run.triggerCounter[4]++; | 
|---|
| 947 | // external trigger input 1 | 
|---|
| 948 | else if (evt->trgTyp & FAD::EventHeader::kExt1) | 
|---|
| 949 | run.triggerCounter[5]++; | 
|---|
| 950 | // external trigger input 2 | 
|---|
| 951 | else if (evt->trgTyp & FAD::EventHeader::kExt2) | 
|---|
| 952 | run.triggerCounter[6]++; | 
|---|
| 953 | // other triggers | 
|---|
| 954 | else | 
|---|
| 955 | run.triggerCounter[7]++; | 
|---|
| 956 | } | 
|---|
| 957 | } | 
|---|
| 958 |  | 
|---|
| 959 | // If this is an invalid event, the current triggerCounter needs to be copied | 
|---|
| 960 | // because runClose will use that one to update the TRIGGER_COUNTER. | 
|---|
| 961 | // When closing the file, the trigger counter of the last successfully | 
|---|
| 962 | // written event is used. | 
|---|
| 963 | evt->triggerCounter = run.triggerCounter; | 
|---|
| 964 |  | 
|---|
| 965 | // If event check has failed, skip the event and post a close request instead. | 
|---|
| 966 | // Otherwise, if file is open post the event for being written | 
|---|
| 967 | if (!check) | 
|---|
| 968 | secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl)); | 
|---|
| 969 | else | 
|---|
| 970 | secondaryQueue.post(evt); | 
|---|
| 971 |  | 
|---|
| 972 | return true; | 
|---|
| 973 | } | 
|---|
| 974 |  | 
|---|
| 975 | // ========================================================================== | 
|---|
| 976 | // ========================================================================== | 
|---|
| 977 |  | 
|---|
| 978 | /* | 
|---|
| 979 | task 1-4: | 
|---|
| 980 |  | 
|---|
| 981 | lock1()-lock4(); | 
|---|
| 982 | while (1) | 
|---|
| 983 | { | 
|---|
| 984 | wait for signal [lockN];  // unlocked | 
|---|
| 985 |  | 
|---|
| 986 | while (n!=10) | 
|---|
| 987 | wait sockets; | 
|---|
| 988 | read; | 
|---|
| 989 |  | 
|---|
| 990 | lockM(); | 
|---|
| 991 | finished[n] = true; | 
|---|
| 992 | signal(mainloop); | 
|---|
| 993 | unlockM(); | 
|---|
| 994 | } | 
|---|
| 995 |  | 
|---|
| 996 |  | 
|---|
| 997 | mainloop: | 
|---|
| 998 |  | 
|---|
| 999 | while (1) | 
|---|
| 1000 | { | 
|---|
| 1001 | lockM(); | 
|---|
| 1002 | while (!finished[0] || !finished[1] ...) | 
|---|
| 1003 | wait for signal [lockM];  // unlocked... signals can be sent | 
|---|
| 1004 | finished[0-1] = false; | 
|---|
| 1005 | unlockM() | 
|---|
| 1006 |  | 
|---|
| 1007 | copy data to queue    // locked | 
|---|
| 1008 |  | 
|---|
| 1009 | lockN[0-3]; | 
|---|
| 1010 | signalN[0-3]; | 
|---|
| 1011 | unlockN[0-3]; | 
|---|
| 1012 | } | 
|---|
| 1013 |  | 
|---|
| 1014 |  | 
|---|
| 1015 | */ | 
|---|
| 1016 |  | 
|---|
| 1017 | /* | 
|---|
| 1018 | while (g_reset) | 
|---|
| 1019 | { | 
|---|
| 1020 | shared_ptr<EVT_CTRL2> evt = new shared_ptr<>; | 
|---|
| 1021 |  | 
|---|
| 1022 | // Check that all sockets are connected | 
|---|
| 1023 |  | 
|---|
| 1024 | for (int i=0; i<40; i++) | 
|---|
| 1025 | if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0) | 
|---|
| 1026 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); | 
|---|
| 1027 |  | 
|---|
| 1028 | while (g_reset) | 
|---|
| 1029 | { | 
|---|
| 1030 | if (READ_STRUCT::wait()<0) | 
|---|
| 1031 | break; | 
|---|
| 1032 |  | 
|---|
| 1033 | if (rc_epoll==0) | 
|---|
| 1034 | break; | 
|---|
| 1035 |  | 
|---|
| 1036 | for (int jj=0; jj<rc_epoll; jj++) | 
|---|
| 1037 | { | 
|---|
| 1038 | READ_STRUCT *rs = READ_STRUCT::get(jj); | 
|---|
| 1039 | if (!rs->connected) | 
|---|
| 1040 | continue; | 
|---|
| 1041 |  | 
|---|
| 1042 | const bool rc_read = rs->read(); | 
|---|
| 1043 | if (!rc_read) | 
|---|
| 1044 | continue; | 
|---|
| 1045 |  | 
|---|
| 1046 | if (rs->bufTyp==READ_STRUCT::kHeader) | 
|---|
| 1047 | { | 
|---|
| 1048 | [...] | 
|---|
| 1049 | } | 
|---|
| 1050 |  | 
|---|
| 1051 | [...] | 
|---|
| 1052 |  | 
|---|
| 1053 | if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) | 
|---|
| 1054 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 1055 | } | 
|---|
| 1056 |  | 
|---|
| 1057 | if (once_a_second) | 
|---|
| 1058 | { | 
|---|
| 1059 | if (evt==timeout) | 
|---|
| 1060 | break; | 
|---|
| 1061 | } | 
|---|
| 1062 | } | 
|---|
| 1063 |  | 
|---|
| 1064 | if (evt.nBoards==actBoards) | 
|---|
| 1065 | primaryQueue.post(evt); | 
|---|
| 1066 | } | 
|---|
| 1067 | */ | 
|---|
| 1068 |  | 
|---|
| 1069 | Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1)); | 
|---|
| 1070 |  | 
|---|
| 1071 | // This corresponds more or less to fFile... should we merge both? | 
|---|
| 1072 | shared_ptr<RUN_CTRL2> actrun; | 
|---|
| 1073 |  | 
|---|
| 1074 | void CloseRunFile() | 
|---|
| 1075 | { | 
|---|
| 1076 | // Currently we need actrun here, to be able to set kFileClosed. | 
|---|
| 1077 | // Apart from that we have to ensure that there is an open file at all | 
|---|
| 1078 | // which we can close. | 
|---|
| 1079 | // Submission to the primary queue ensures that the event | 
|---|
| 1080 | // is placed at the right place in the processing chain. | 
|---|
| 1081 | // (Corresponds to the correct run) | 
|---|
| 1082 | primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun)); | 
|---|
| 1083 | } | 
|---|
| 1084 |  | 
|---|
| 1085 | bool mainloop(READ_STRUCT *rd) | 
|---|
| 1086 | { | 
|---|
| 1087 | factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop"); | 
|---|
| 1088 |  | 
|---|
| 1089 | primaryQueue.start(); | 
|---|
| 1090 | secondaryQueue.start(); | 
|---|
| 1091 | processingQueue1.start();; | 
|---|
| 1092 |  | 
|---|
| 1093 | actrun = make_shared<RUN_CTRL2>(); | 
|---|
| 1094 |  | 
|---|
| 1095 | //time in seconds | 
|---|
| 1096 | time_t gi_SecTime = time(NULL)-1; | 
|---|
| 1097 |  | 
|---|
| 1098 | //loop until global variable g_runStat claims stop | 
|---|
| 1099 | g_reset = 0; | 
|---|
| 1100 | while (g_reset == 0) | 
|---|
| 1101 | { | 
|---|
| 1102 | #ifdef USE_POLL | 
|---|
| 1103 | int    pp[40]; | 
|---|
| 1104 | int    nn = 0; | 
|---|
| 1105 | pollfd fds[40]; | 
|---|
| 1106 | for (int i=0; i<40; i++) | 
|---|
| 1107 | { | 
|---|
| 1108 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) | 
|---|
| 1109 | { | 
|---|
| 1110 | fds[nn].fd = rd[i].socket; | 
|---|
| 1111 | fds[nn].events = POLLIN; | 
|---|
| 1112 | pp[nn] = i; | 
|---|
| 1113 | nn++; | 
|---|
| 1114 | } | 
|---|
| 1115 | } | 
|---|
| 1116 |  | 
|---|
| 1117 | const int rc_epoll = poll(fds, nn, 100); | 
|---|
| 1118 | if (rc_epoll<0) | 
|---|
| 1119 | break; | 
|---|
| 1120 | #endif | 
|---|
| 1121 |  | 
|---|
| 1122 | #ifdef USE_SELECT | 
|---|
| 1123 | fd_set readfs; | 
|---|
| 1124 | FD_ZERO(&readfs); | 
|---|
| 1125 | int nfsd = 0; | 
|---|
| 1126 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1127 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) | 
|---|
| 1128 | { | 
|---|
| 1129 | FD_SET(rd[i].socket, &readfs); | 
|---|
| 1130 | if (rd[i].socket>nfsd) | 
|---|
| 1131 | nfsd = rd[i].socket; | 
|---|
| 1132 | } | 
|---|
| 1133 |  | 
|---|
| 1134 | timeval tv; | 
|---|
| 1135 | tv.tv_sec = 0; | 
|---|
| 1136 | tv.tv_usec = 100000; | 
|---|
| 1137 | const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv); | 
|---|
| 1138 | // 0: timeout | 
|---|
| 1139 | // -1: error | 
|---|
| 1140 | if (rc_select<0) | 
|---|
| 1141 | { | 
|---|
| 1142 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno); | 
|---|
| 1143 | continue; | 
|---|
| 1144 | } | 
|---|
| 1145 | #endif | 
|---|
| 1146 |  | 
|---|
| 1147 | #ifdef USE_EPOLL | 
|---|
| 1148 | const int rc_epoll = READ_STRUCT::wait(); | 
|---|
| 1149 | if (rc_epoll<0) | 
|---|
| 1150 | break; | 
|---|
| 1151 | #endif | 
|---|
| 1152 |  | 
|---|
| 1153 | #ifdef PRIORITY_QUEUE | 
|---|
| 1154 | priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio; | 
|---|
| 1155 |  | 
|---|
| 1156 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1157 | if (rd[i].connected) | 
|---|
| 1158 | prio.push(rd+i); | 
|---|
| 1159 |  | 
|---|
| 1160 | if (!prio.empty()) do | 
|---|
| 1161 | #endif | 
|---|
| 1162 |  | 
|---|
| 1163 |  | 
|---|
| 1164 | #ifdef USE_POLL | 
|---|
| 1165 | for (int jj=0; jj<nn; jj++) | 
|---|
| 1166 | #endif | 
|---|
| 1167 | #ifdef USE_EPOLL | 
|---|
| 1168 | for (int jj=0; jj<rc_epoll; jj++) | 
|---|
| 1169 | #endif | 
|---|
| 1170 | #if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE) | 
|---|
| 1171 | for (int jj=0; jj<NBOARDS; jj++) | 
|---|
| 1172 | #endif | 
|---|
| 1173 | { | 
|---|
| 1174 | #ifdef PRIORITY_QUEUE | 
|---|
| 1175 | READ_STRUCT *rs = prio.top(); | 
|---|
| 1176 | #endif | 
|---|
| 1177 | #ifdef USE_SELECT | 
|---|
| 1178 | if (!FD_ISSET(rs->socket, &readfs)) | 
|---|
| 1179 | continue; | 
|---|
| 1180 | #endif | 
|---|
| 1181 |  | 
|---|
| 1182 | #ifdef USE_POLL | 
|---|
| 1183 | if ((fds[jj].revents&POLLIN)==0) | 
|---|
| 1184 | continue; | 
|---|
| 1185 | #endif | 
|---|
| 1186 |  | 
|---|
| 1187 | #ifdef USE_EPOLL | 
|---|
| 1188 | // FIXME: How to get i? | 
|---|
| 1189 | READ_STRUCT *rs = READ_STRUCT::get(jj); | 
|---|
| 1190 | #endif | 
|---|
| 1191 |  | 
|---|
| 1192 | #ifdef USE_POLL | 
|---|
| 1193 | // FIXME: How to get i? | 
|---|
| 1194 | READ_STRUCT *rs = &rd[pp[jj]]; | 
|---|
| 1195 | #endif | 
|---|
| 1196 |  | 
|---|
| 1197 | #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE) | 
|---|
| 1198 | const int i = (jj%4)*10 + (jj/4); | 
|---|
| 1199 | READ_STRUCT *rs = &rd[i]; | 
|---|
| 1200 | #endif | 
|---|
| 1201 |  | 
|---|
| 1202 | #ifdef COMPLETE_EVENTS | 
|---|
| 1203 | if (rs->bufTyp==READ_STRUCT::kWait) | 
|---|
| 1204 | continue; | 
|---|
| 1205 | #endif | 
|---|
| 1206 |  | 
|---|
| 1207 | // ================================================================== | 
|---|
| 1208 |  | 
|---|
| 1209 | const bool rc_read = rs->read(); | 
|---|
| 1210 |  | 
|---|
| 1211 | // Connect might have gotten closed during read | 
|---|
| 1212 | gi_NumConnect[rs->sockId] = rs->connected; | 
|---|
| 1213 | gj.numConn[rs->sockId]    = rs->connected; | 
|---|
| 1214 |  | 
|---|
| 1215 | // Read either failed or disconnected, or the buffer is not yet full | 
|---|
| 1216 | if (!rc_read) | 
|---|
| 1217 | continue; | 
|---|
| 1218 |  | 
|---|
| 1219 | // ================================================================== | 
|---|
| 1220 |  | 
|---|
| 1221 | if (rs->bufTyp==READ_STRUCT::kHeader) | 
|---|
| 1222 | { | 
|---|
| 1223 | //check if startflag correct; else shift block .... | 
|---|
| 1224 | // FIXME: This is not enough... this combination of | 
|---|
| 1225 | //        bytes can be anywhere... at least the end bytes | 
|---|
| 1226 | //        must be checked somewhere, too. | 
|---|
| 1227 | uint k; | 
|---|
| 1228 | for (k=0; k<sizeof(PEVNT_HEADER)-1; k++) | 
|---|
| 1229 | { | 
|---|
| 1230 | if (rs->B[k]==0xfb && rs->B[k+1] == 0x01) | 
|---|
| 1231 | break; | 
|---|
| 1232 | } | 
|---|
| 1233 | rs->skip += k; | 
|---|
| 1234 |  | 
|---|
| 1235 | //no start of header found | 
|---|
| 1236 | if (k==sizeof(PEVNT_HEADER)-1) | 
|---|
| 1237 | { | 
|---|
| 1238 | rs->B[0]   = rs->B[sizeof(PEVNT_HEADER)-1]; | 
|---|
| 1239 | rs->bufPos = rs->B+1; | 
|---|
| 1240 | rs->bufLen = sizeof(PEVNT_HEADER)-1; | 
|---|
| 1241 | continue; | 
|---|
| 1242 | } | 
|---|
| 1243 |  | 
|---|
| 1244 | if (k > 0) | 
|---|
| 1245 | { | 
|---|
| 1246 | memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k); | 
|---|
| 1247 |  | 
|---|
| 1248 | rs->bufPos -= k; | 
|---|
| 1249 | rs->bufLen += k; | 
|---|
| 1250 |  | 
|---|
| 1251 | continue; // We need to read more (bufLen>0) | 
|---|
| 1252 | } | 
|---|
| 1253 |  | 
|---|
| 1254 | if (rs->skip>0) | 
|---|
| 1255 | { | 
|---|
| 1256 | factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId); | 
|---|
| 1257 | rs->skip = 0; | 
|---|
| 1258 | } | 
|---|
| 1259 |  | 
|---|
| 1260 | // Swap the header entries from network to host order | 
|---|
| 1261 | rs->swapHeader(); | 
|---|
| 1262 |  | 
|---|
| 1263 | rs->bufTyp = READ_STRUCT::kData; | 
|---|
| 1264 | rs->bufLen = rs->len() - sizeof(PEVNT_HEADER); | 
|---|
| 1265 |  | 
|---|
| 1266 | debugHead(rs->B);  // i and fadBoard not used | 
|---|
| 1267 |  | 
|---|
| 1268 | continue; | 
|---|
| 1269 | } | 
|---|
| 1270 |  | 
|---|
| 1271 | const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2); | 
|---|
| 1272 | if (end != 0xfe04) | 
|---|
| 1273 | { | 
|---|
| 1274 | factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x", | 
|---|
| 1275 | rs->sockId, rs->H.fad_evt_counter, rs->len(), end); | 
|---|
| 1276 |  | 
|---|
| 1277 | // ready to read next header | 
|---|
| 1278 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1279 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1280 | rs->bufPos = rs->B; | 
|---|
| 1281 | // FIXME: What to do with the validity flag? | 
|---|
| 1282 | continue; | 
|---|
| 1283 | } | 
|---|
| 1284 |  | 
|---|
| 1285 | // get index into mBuffer for this event (create if needed) | 
|---|
| 1286 | const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun); | 
|---|
| 1287 |  | 
|---|
| 1288 | // We have a valid entry, but no memory has yet been allocated | 
|---|
| 1289 | if (evt && !evt->initMemory()) | 
|---|
| 1290 | { | 
|---|
| 1291 | const time_t tm = time(NULL); | 
|---|
| 1292 | if (evt->runCtrl->reportMem==tm) | 
|---|
| 1293 | continue; | 
|---|
| 1294 |  | 
|---|
| 1295 | factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); | 
|---|
| 1296 | evt->runCtrl->reportMem = tm; | 
|---|
| 1297 | continue; | 
|---|
| 1298 | } | 
|---|
| 1299 |  | 
|---|
| 1300 | // ready to read next header | 
|---|
| 1301 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1302 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1303 | rs->bufPos = rs->B; | 
|---|
| 1304 |  | 
|---|
| 1305 | // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. | 
|---|
| 1306 | if (!evt) | 
|---|
| 1307 | continue; | 
|---|
| 1308 |  | 
|---|
| 1309 | // This should never happen | 
|---|
| 1310 | if (evt->board[rs->sockId] != -1) | 
|---|
| 1311 | { | 
|---|
| 1312 | factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.", | 
|---|
| 1313 | evt->evNum, rs->sockId, jj, rs->len()); | 
|---|
| 1314 | // FIXME: What to do with the validity flag? | 
|---|
| 1315 | continue; // Continue reading next header | 
|---|
| 1316 | } | 
|---|
| 1317 |  | 
|---|
| 1318 | // Swap the data entries (board headers) from network to host order | 
|---|
| 1319 | rs->swapData(); | 
|---|
| 1320 |  | 
|---|
| 1321 | // Copy data from rd[i] to mBuffer[evID] | 
|---|
| 1322 | copyData(*rs, evt.get()); | 
|---|
| 1323 |  | 
|---|
| 1324 | #ifdef COMPLETE_EVENTS | 
|---|
| 1325 | // Do not read anmymore from this board until the whole event has been received | 
|---|
| 1326 | rs->bufTyp = READ_STRUCT::kWait; | 
|---|
| 1327 | #endif | 
|---|
| 1328 | // now we have stored a new board contents into Event structure | 
|---|
| 1329 | evt->board[rs->sockId] = rs->sockId; | 
|---|
| 1330 | evt->header = evt->FADhead+rs->sockId; | 
|---|
| 1331 | evt->nBoard++; | 
|---|
| 1332 |  | 
|---|
| 1333 | #ifdef COMPLETE_EPOLL | 
|---|
| 1334 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0) | 
|---|
| 1335 | { | 
|---|
| 1336 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 1337 | break; | 
|---|
| 1338 | } | 
|---|
| 1339 | #endif | 
|---|
| 1340 | // event not yet complete | 
|---|
| 1341 | if (evt->nBoard < READ_STRUCT::activeSockets) | 
|---|
| 1342 | continue; | 
|---|
| 1343 |  | 
|---|
| 1344 | // All previous events are now flagged as incomplete ("expired") | 
|---|
| 1345 | // and will be removed. (This is a bit tricky, because pop_front() | 
|---|
| 1346 | // would invalidate the current iterator if not done _after_ the increment) | 
|---|
| 1347 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) | 
|---|
| 1348 | { | 
|---|
| 1349 | const bool found = it->get()==evt.get(); | 
|---|
| 1350 | if (!found) | 
|---|
| 1351 | reportIncomplete(*it, "expired"); | 
|---|
| 1352 | else | 
|---|
| 1353 | primaryQueue.post(evt); | 
|---|
| 1354 |  | 
|---|
| 1355 | // package_len is 0 if nothing was received. | 
|---|
| 1356 | for (int ib=0; ib<40; ib++) | 
|---|
| 1357 | rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2; | 
|---|
| 1358 |  | 
|---|
| 1359 | // The counter must be increased _before_ the pop_front, | 
|---|
| 1360 | // otherwise the counter is invalidated by the pop_front! | 
|---|
| 1361 | it++; | 
|---|
| 1362 | evtCtrl.pop_front(); | 
|---|
| 1363 |  | 
|---|
| 1364 | // We reached the current event, so we are done | 
|---|
| 1365 | if (found) | 
|---|
| 1366 | break; | 
|---|
| 1367 | } | 
|---|
| 1368 |  | 
|---|
| 1369 | #ifdef COMPLETE_EPOLL | 
|---|
| 1370 | for (int j=0; j<40; j++) | 
|---|
| 1371 | { | 
|---|
| 1372 | epoll_event ev; | 
|---|
| 1373 | ev.events = EPOLLIN; | 
|---|
| 1374 | ev.data.ptr = &rd[j];  // user data (union: ev.ptr) | 
|---|
| 1375 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0) | 
|---|
| 1376 | { | 
|---|
| 1377 | factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); | 
|---|
| 1378 | return; | 
|---|
| 1379 | } | 
|---|
| 1380 | } | 
|---|
| 1381 | #endif | 
|---|
| 1382 |  | 
|---|
| 1383 | #ifdef COMPLETE_EVENTS | 
|---|
| 1384 | for (int j=0; j<40; j++) | 
|---|
| 1385 | { | 
|---|
| 1386 | //if (rs->bufTyp==READ_STRUCT::kWait) | 
|---|
| 1387 | { | 
|---|
| 1388 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1389 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1390 | rs->bufPos = rs->B; | 
|---|
| 1391 | } | 
|---|
| 1392 | } | 
|---|
| 1393 | #endif | 
|---|
| 1394 | } // end for loop over all sockets | 
|---|
| 1395 | #ifdef PRIORITY_QUEUE | 
|---|
| 1396 | while (0); // convert continue into break ;) | 
|---|
| 1397 | #endif | 
|---|
| 1398 |  | 
|---|
| 1399 | // ================================================================== | 
|---|
| 1400 |  | 
|---|
| 1401 | const time_t actTime = time(NULL); | 
|---|
| 1402 | if (actTime == gi_SecTime) | 
|---|
| 1403 | { | 
|---|
| 1404 | #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL) | 
|---|
| 1405 | if (evtCtrl.empty()) | 
|---|
| 1406 | usleep(actTime-actrun->lastTime>300 ? 10000 : 1); | 
|---|
| 1407 | #endif | 
|---|
| 1408 | continue; | 
|---|
| 1409 | } | 
|---|
| 1410 | gi_SecTime = actTime; | 
|---|
| 1411 |  | 
|---|
| 1412 | // ================================================================== | 
|---|
| 1413 | //loop over all active events and flag those older than read-timeout | 
|---|
| 1414 | //delete those that are written to disk .... | 
|---|
| 1415 |  | 
|---|
| 1416 | // This could be improved having the pointer which separates the queue with | 
|---|
| 1417 | // the incomplete events from the queue with the complete events | 
|---|
| 1418 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) | 
|---|
| 1419 | { | 
|---|
| 1420 | // A reference is enough because the shared_ptr is hold by the evtCtrl | 
|---|
| 1421 | const shared_ptr<EVT_CTRL2> &evt = *it; | 
|---|
| 1422 |  | 
|---|
| 1423 | // The first event is the oldest. If the first event within the | 
|---|
| 1424 | // timeout window was received, we can stop searching further. | 
|---|
| 1425 | if (evt->time.tv_sec+g_evtTimeout>=actTime) | 
|---|
| 1426 | break; | 
|---|
| 1427 |  | 
|---|
| 1428 | // The counter must be increased _before_ the pop_front, | 
|---|
| 1429 | // otherwise the counter is invalidated by the pop_front! | 
|---|
| 1430 | it++; | 
|---|
| 1431 |  | 
|---|
| 1432 | // This timeout is caused because complete data from one or more | 
|---|
| 1433 | // boards has been received, but the memory could not be allocated. | 
|---|
| 1434 | // There is no reason why we should not go on waiting for | 
|---|
| 1435 | // memory to become free. However, the FADs will disconnect | 
|---|
| 1436 | // after 60s due to their keep-alive timeout, but the event builder | 
|---|
| 1437 | // will still wait for memory to become available. | 
|---|
| 1438 | // Currently, the only possibility to free the memory from the | 
|---|
| 1439 | // evtCtrl to restart the event builder (STOP/START). | 
|---|
| 1440 | if (!evt->valid()) | 
|---|
| 1441 | continue; | 
|---|
| 1442 |  | 
|---|
| 1443 | // This will result in the emission of a dim service. | 
|---|
| 1444 | // It doesn't matter if that takes comparably long, | 
|---|
| 1445 | // because we have to stop the run anyway. | 
|---|
| 1446 | const uint64_t rep = reportIncomplete(evt, "timeout"); | 
|---|
| 1447 | factReportIncomplete(rep); | 
|---|
| 1448 |  | 
|---|
| 1449 | // At least the data from one boards is complete... | 
|---|
| 1450 | // package_len is 0 when nothing was received from this board | 
|---|
| 1451 | for (int ib=0; ib<40; ib++) | 
|---|
| 1452 | rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2; | 
|---|
| 1453 |  | 
|---|
| 1454 | evtCtrl.pop_front(); | 
|---|
| 1455 | } | 
|---|
| 1456 |  | 
|---|
| 1457 | // ================================================================= | 
|---|
| 1458 |  | 
|---|
| 1459 | gj.bufNew   = evtCtrl.size();            //# incomplete events in buffer | 
|---|
| 1460 | gj.bufEvt   = primaryQueue.size();       //# complete events in buffer | 
|---|
| 1461 | gj.bufWrite = secondaryQueue.size();     //# complete events in buffer | 
|---|
| 1462 | gj.bufProc  = processingQueue1.size();   //# complete events in buffer | 
|---|
| 1463 | gj.bufTot   = Memory::max_inuse/MAX_TOT_MEM; | 
|---|
| 1464 | gj.usdMem   = Memory::max_inuse; | 
|---|
| 1465 | gj.totMem   = Memory::allocated; | 
|---|
| 1466 | gj.maxMem   = g_maxMem; | 
|---|
| 1467 |  | 
|---|
| 1468 | gj.deltaT = 1000; // temporary, must be improved | 
|---|
| 1469 |  | 
|---|
| 1470 | bool changed = false; | 
|---|
| 1471 |  | 
|---|
| 1472 | static vector<uint64_t> store(NBOARDS); | 
|---|
| 1473 |  | 
|---|
| 1474 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 1475 | { | 
|---|
| 1476 | gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib]; | 
|---|
| 1477 | gj.relBytes[ib]  = rd[ib].totBytes-rd[ib].relBytes; | 
|---|
| 1478 |  | 
|---|
| 1479 | store[ib] = rd[ib].totBytes; | 
|---|
| 1480 |  | 
|---|
| 1481 | if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr)) | 
|---|
| 1482 | changed = true; | 
|---|
| 1483 |  | 
|---|
| 1484 | gi_NumConnect[ib] = rd[ib].connected; | 
|---|
| 1485 | gj.numConn[ib]    = rd[ib].connected; | 
|---|
| 1486 | } | 
|---|
| 1487 |  | 
|---|
| 1488 | factStat(gj); | 
|---|
| 1489 |  | 
|---|
| 1490 | Memory::max_inuse = 0; | 
|---|
| 1491 |  | 
|---|
| 1492 | // ================================================================= | 
|---|
| 1493 |  | 
|---|
| 1494 | // This is a fake event to trigger possible run-closing conditions once a second | 
|---|
| 1495 | // FIXME: This is not yet ideal because a file would never be closed | 
|---|
| 1496 | //        if a new file has been started and no events of the new file | 
|---|
| 1497 | //        have been received yet | 
|---|
| 1498 | int request = kRequestNone; | 
|---|
| 1499 |  | 
|---|
| 1500 | // If nothing was received for more than 5min, close file | 
|---|
| 1501 | if (actTime-actrun->lastTime>300) | 
|---|
| 1502 | request |= kRequestTimeout; | 
|---|
| 1503 |  | 
|---|
| 1504 | // If connection status has changed | 
|---|
| 1505 | if (changed) | 
|---|
| 1506 | request |= kRequestConnectionChange; | 
|---|
| 1507 |  | 
|---|
| 1508 | if (request!=kRequestNone) | 
|---|
| 1509 | runFinished(); | 
|---|
| 1510 |  | 
|---|
| 1511 | if (actrun->fileStat==kFileOpen) | 
|---|
| 1512 | primaryQueue.emplace(new EVT_CTRL2(request, actrun)); | 
|---|
| 1513 | } | 
|---|
| 1514 |  | 
|---|
| 1515 | //   1: Stop, wait for event to get processed | 
|---|
| 1516 | //   2: Stop, finish immediately | 
|---|
| 1517 | // 101: Restart, wait for events to get processed | 
|---|
| 1518 | // 102: Restart, finish immediately | 
|---|
| 1519 | // | 
|---|
| 1520 | const int gi_reset = g_reset; | 
|---|
| 1521 |  | 
|---|
| 1522 | const bool abort = gi_reset%100==2; | 
|---|
| 1523 |  | 
|---|
| 1524 | factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join"); | 
|---|
| 1525 |  | 
|---|
| 1526 | primaryQueue.wait(abort); | 
|---|
| 1527 | secondaryQueue.wait(abort); | 
|---|
| 1528 | processingQueue1.wait(abort); | 
|---|
| 1529 |  | 
|---|
| 1530 | // Here we also destroy all runCtrl structures and hence close all open files | 
|---|
| 1531 | evtCtrl.clear(); | 
|---|
| 1532 | actrun.reset(); | 
|---|
| 1533 |  | 
|---|
| 1534 | factPrintf(MessageImp::kInfo, "Exit read Process..."); | 
|---|
| 1535 | factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse); | 
|---|
| 1536 |  | 
|---|
| 1537 | factStat(gj); | 
|---|
| 1538 |  | 
|---|
| 1539 | return gi_reset>=100; | 
|---|
| 1540 | } | 
|---|
| 1541 |  | 
|---|
| 1542 | // ========================================================================== | 
|---|
| 1543 | // ========================================================================== | 
|---|
| 1544 |  | 
|---|
| 1545 | void StartEvtBuild() | 
|---|
| 1546 | { | 
|---|
| 1547 | factPrintf(MessageImp::kInfo, "Starting EventBuilder++"); | 
|---|
| 1548 |  | 
|---|
| 1549 | memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect)); | 
|---|
| 1550 |  | 
|---|
| 1551 | memset(&gj, 0, sizeof(GUI_STAT)); | 
|---|
| 1552 |  | 
|---|
| 1553 | gj.usdMem   = Memory::inuse; | 
|---|
| 1554 | gj.totMem   = Memory::allocated; | 
|---|
| 1555 | gj.maxMem   = g_maxMem; | 
|---|
| 1556 |  | 
|---|
| 1557 |  | 
|---|
| 1558 | READ_STRUCT rd[NBOARDS]; | 
|---|
| 1559 |  | 
|---|
| 1560 | // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets) | 
|---|
| 1561 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1562 | rd[i].sockId = i; | 
|---|
| 1563 |  | 
|---|
| 1564 | while (mainloop(rd)); | 
|---|
| 1565 |  | 
|---|
| 1566 | //must close all open sockets ... | 
|---|
| 1567 | factPrintf(MessageImp::kInfo, "Close all sockets..."); | 
|---|
| 1568 |  | 
|---|
| 1569 | READ_STRUCT::close(); | 
|---|
| 1570 |  | 
|---|
| 1571 | // Now all sockets get closed. This is not reflected in gi_NumConnect | 
|---|
| 1572 | // The current workaround is to count all sockets as closed when the thread is not running | 
|---|
| 1573 | factPrintf(MessageImp::kInfo, "EventBuilder++ closed"); | 
|---|
| 1574 | } | 
|---|