| 1 | #include <poll.h> | 
|---|
| 2 | #include <sys/time.h> | 
|---|
| 3 | #include <sys/epoll.h> | 
|---|
| 4 | #include <netinet/tcp.h> | 
|---|
| 5 |  | 
|---|
| 6 | #include <cstring> | 
|---|
| 7 | #include <cstdarg> | 
|---|
| 8 | #include <list> | 
|---|
| 9 | #include <queue> | 
|---|
| 10 |  | 
|---|
| 11 | #include <boost/algorithm/string/join.hpp> | 
|---|
| 12 |  | 
|---|
| 13 | #include "queue.h" | 
|---|
| 14 |  | 
|---|
| 15 | #include "MessageImp.h" | 
|---|
| 16 | #include "EventBuilder.h" | 
|---|
| 17 |  | 
|---|
| 18 | using namespace std; | 
|---|
| 19 |  | 
|---|
| 20 | #define MIN_LEN  32    // min #bytes needed to interpret FADheader | 
|---|
| 21 | #define MAX_LEN  81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092  (data+boardheader+eventheader+endflag) | 
|---|
| 22 |  | 
|---|
| 23 | //#define COMPLETE_EVENTS | 
|---|
| 24 | //#define USE_POLL | 
|---|
| 25 | //#define USE_EPOLL | 
|---|
| 26 | //#define USE_SELECT | 
|---|
| 27 | //#define COMPLETE_EPOLL | 
|---|
| 28 | //#define PRIORITY_QUEUE | 
|---|
| 29 |  | 
|---|
| 30 | // Reading only 1024: 13:  77Hz, 87% | 
|---|
| 31 | // Reading only 1024: 12:  78Hz, 46% | 
|---|
| 32 | // Reading only  300:  4: 250Hz, 92% | 
|---|
| 33 | // Reading only  300:  3: 258Hz, 40% | 
|---|
| 34 |  | 
|---|
| 35 | // Reading only four threads 1024: 13:  77Hz, 60% | 
|---|
| 36 | // Reading only four threads 1024: 12:  78Hz, 46% | 
|---|
| 37 | // Reading only four threads  300:  4: 250Hz, 92% | 
|---|
| 38 | // Reading only four threads  300:  3: 258Hz, 40% | 
|---|
| 39 |  | 
|---|
| 40 | // Default  300:  4: 249Hz, 92% | 
|---|
| 41 | // Default  300:  3: 261Hz, 40% | 
|---|
| 42 | // Default 1024: 13:  76Hz, 93% | 
|---|
| 43 | // Default 1024: 12:  79Hz, 46% | 
|---|
| 44 |  | 
|---|
| 45 | // Poll [selected] 1024: 13:  63Hz, 45% | 
|---|
| 46 | // Poll [selected] 1024: 14:  63Hz, 63% | 
|---|
| 47 | // Poll [selected] 1024: 15:  64Hz, 80% | 
|---|
| 48 | // Poll [selected]  300:  4: 230Hz, 47% | 
|---|
| 49 | // Poll [selected]  300:  3: 200Hz, 94% | 
|---|
| 50 |  | 
|---|
| 51 | // Poll [all]      1024: 13:  65Hz, 47% | 
|---|
| 52 | // Poll [all]      1024: 14:  64Hz, 59% | 
|---|
| 53 | // Poll [all]      1024: 15:  62Hz, 67% | 
|---|
| 54 | // Poll [all]       300:  4: 230Hz, 47% | 
|---|
| 55 | // Poll [all]       300:  3: 230Hz, 35% | 
|---|
| 56 |  | 
|---|
| 57 | // ========================================================================== | 
|---|
| 58 |  | 
|---|
| 59 | bool runOpen(const EVT_CTRL2 &evt); | 
|---|
| 60 | bool runWrite(const EVT_CTRL2 &evt); | 
|---|
| 61 | void runClose(RUN_CTRL2 &run); | 
|---|
| 62 | void applyCalib(const EVT_CTRL2 &evt, const size_t &size); | 
|---|
| 63 | void factOut(int severity, const char *message); | 
|---|
| 64 | void factReportIncomplete (uint64_t rep); | 
|---|
| 65 | void gotNewRun(RUN_CTRL2 &run); | 
|---|
| 66 | void runFinished(); | 
|---|
| 67 | void factStat(const GUI_STAT &gj); | 
|---|
| 68 | bool eventCheck(const EVT_CTRL2 &evt); | 
|---|
| 69 | void debugHead(void *buf); | 
|---|
| 70 |  | 
|---|
| 71 | // ========================================================================== | 
|---|
| 72 |  | 
|---|
| 73 | int g_reset; | 
|---|
| 74 |  | 
|---|
| 75 | size_t g_maxMem;                //maximum memory allowed for buffer | 
|---|
| 76 |  | 
|---|
| 77 | uint16_t g_evtTimeout;           // timeout (sec) for one event | 
|---|
| 78 |  | 
|---|
| 79 | FACT_SOCK g_port[NBOARDS];      // .addr=string of IP-addr in dotted-decimal "ddd.ddd.ddd.ddd" | 
|---|
| 80 |  | 
|---|
| 81 | uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards | 
|---|
| 82 |  | 
|---|
| 83 | GUI_STAT gj; | 
|---|
| 84 |  | 
|---|
| 85 | // ========================================================================== | 
|---|
| 86 |  | 
|---|
| 87 | namespace Memory | 
|---|
| 88 | { | 
|---|
| 89 | uint64_t inuse     = 0; | 
|---|
| 90 | uint64_t allocated = 0; | 
|---|
| 91 |  | 
|---|
| 92 | uint64_t max_inuse = 0; | 
|---|
| 93 |  | 
|---|
| 94 | std::mutex mtx; | 
|---|
| 95 |  | 
|---|
| 96 | std::forward_list<void*> memory; | 
|---|
| 97 |  | 
|---|
| 98 | void *malloc() | 
|---|
| 99 | { | 
|---|
| 100 | // No free slot available, next alloc would exceed max memory | 
|---|
| 101 | if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem) | 
|---|
| 102 | return NULL; | 
|---|
| 103 |  | 
|---|
| 104 | // We will return this amount of memory | 
|---|
| 105 | // This is not 100% thread safe, but it is not a super accurate measure anyway | 
|---|
| 106 | inuse += MAX_TOT_MEM; | 
|---|
| 107 | if (inuse>max_inuse) | 
|---|
| 108 | max_inuse = inuse; | 
|---|
| 109 |  | 
|---|
| 110 | if (memory.empty()) | 
|---|
| 111 | { | 
|---|
| 112 | // No free slot available, allocate a new one | 
|---|
| 113 | allocated += MAX_TOT_MEM; | 
|---|
| 114 | return  new char[MAX_TOT_MEM]; | 
|---|
| 115 | } | 
|---|
| 116 |  | 
|---|
| 117 | // Get the next free slot from the stack and return it | 
|---|
| 118 | const std::lock_guard<std::mutex> lock(mtx); | 
|---|
| 119 |  | 
|---|
| 120 | void *mem = memory.front(); | 
|---|
| 121 | memory.pop_front(); | 
|---|
| 122 | return mem; | 
|---|
| 123 | }; | 
|---|
| 124 |  | 
|---|
| 125 | void free(void *mem) | 
|---|
| 126 | { | 
|---|
| 127 | if (!mem) | 
|---|
| 128 | return; | 
|---|
| 129 |  | 
|---|
| 130 | // Decrease the amont of memory in use accordingly | 
|---|
| 131 | inuse -= MAX_TOT_MEM; | 
|---|
| 132 |  | 
|---|
| 133 | // If the maximum memory has changed, we might be over the limit. | 
|---|
| 134 | // In this case: free a slot | 
|---|
| 135 | if (allocated>g_maxMem) | 
|---|
| 136 | { | 
|---|
| 137 | delete [] (char*)mem; | 
|---|
| 138 | allocated -= MAX_TOT_MEM; | 
|---|
| 139 | return; | 
|---|
| 140 | } | 
|---|
| 141 |  | 
|---|
| 142 | const std::lock_guard<std::mutex> lock(mtx); | 
|---|
| 143 | memory.push_front(mem); | 
|---|
| 144 | } | 
|---|
| 145 |  | 
|---|
| 146 | }; | 
|---|
| 147 |  | 
|---|
| 148 | // ========================================================================== | 
|---|
| 149 |  | 
|---|
| 150 | void factPrintf(int severity, const char *fmt, ...) | 
|---|
| 151 | { | 
|---|
| 152 | char str[1000]; | 
|---|
| 153 |  | 
|---|
| 154 | va_list ap; | 
|---|
| 155 | va_start(ap, fmt); | 
|---|
| 156 | vsnprintf(str, 1000, fmt, ap); | 
|---|
| 157 | va_end(ap); | 
|---|
| 158 |  | 
|---|
| 159 | factOut(severity, str); | 
|---|
| 160 | } | 
|---|
| 161 |  | 
|---|
| 162 | // ========================================================================== | 
|---|
| 163 |  | 
|---|
| 164 | struct READ_STRUCT | 
|---|
| 165 | { | 
|---|
| 166 | enum buftyp_t | 
|---|
| 167 | { | 
|---|
| 168 | kStream, | 
|---|
| 169 | kHeader, | 
|---|
| 170 | kData, | 
|---|
| 171 | #ifdef COMPLETE_EVENTS | 
|---|
| 172 | kWait | 
|---|
| 173 | #endif | 
|---|
| 174 | }; | 
|---|
| 175 |  | 
|---|
| 176 | // ---------- connection ---------- | 
|---|
| 177 |  | 
|---|
| 178 | static uint activeSockets; | 
|---|
| 179 |  | 
|---|
| 180 | int  sockId;       // socket id (board number) | 
|---|
| 181 | int  socket;       // socket handle | 
|---|
| 182 | bool connected;    // is this socket connected? | 
|---|
| 183 |  | 
|---|
| 184 | struct sockaddr_in SockAddr;  // Socket address copied from wrapper during socket creation | 
|---|
| 185 |  | 
|---|
| 186 | // ------------ epoll ------------- | 
|---|
| 187 |  | 
|---|
| 188 | static int  fd_epoll; | 
|---|
| 189 | static epoll_event events[NBOARDS]; | 
|---|
| 190 |  | 
|---|
| 191 | static void init(); | 
|---|
| 192 | static void close(); | 
|---|
| 193 | static int  wait(); | 
|---|
| 194 | static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); } | 
|---|
| 195 |  | 
|---|
| 196 | // ------------ buffer ------------ | 
|---|
| 197 |  | 
|---|
| 198 | buftyp_t  bufTyp;  // what are we reading at the moment: 0=header 1=data -1=skip ... | 
|---|
| 199 |  | 
|---|
| 200 | uint32_t  bufLen;  // number of bytes left to read | 
|---|
| 201 | uint8_t  *bufPos;  // next byte to read to the buffer next | 
|---|
| 202 |  | 
|---|
| 203 | union | 
|---|
| 204 | { | 
|---|
| 205 | uint8_t  B[MAX_LEN]; | 
|---|
| 206 | uint16_t S[MAX_LEN / 2]; | 
|---|
| 207 | uint32_t I[MAX_LEN / 4]; | 
|---|
| 208 | uint64_t L[MAX_LEN / 8]; | 
|---|
| 209 | PEVNT_HEADER H; | 
|---|
| 210 | }; | 
|---|
| 211 |  | 
|---|
| 212 | timeval  time; | 
|---|
| 213 | uint64_t totBytes;  // total received bytes | 
|---|
| 214 | uint64_t relBytes;  // total released bytes | 
|---|
| 215 | uint32_t skip;      // number of bytes skipped before start of event | 
|---|
| 216 |  | 
|---|
| 217 | uint32_t len() const { return uint32_t(H.package_length)*2; } | 
|---|
| 218 |  | 
|---|
| 219 | void swapHeader(); | 
|---|
| 220 | void swapData(); | 
|---|
| 221 |  | 
|---|
| 222 | // -------------------------------- | 
|---|
| 223 |  | 
|---|
| 224 | READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0) | 
|---|
| 225 | { | 
|---|
| 226 | if (fd_epoll<0) | 
|---|
| 227 | init(); | 
|---|
| 228 | } | 
|---|
| 229 | ~READ_STRUCT() | 
|---|
| 230 | { | 
|---|
| 231 | destroy(); | 
|---|
| 232 | } | 
|---|
| 233 |  | 
|---|
| 234 | void destroy(); | 
|---|
| 235 | bool create(sockaddr_in addr); | 
|---|
| 236 | bool check(int, sockaddr_in addr); | 
|---|
| 237 | bool read(); | 
|---|
| 238 |  | 
|---|
| 239 | }; | 
|---|
| 240 |  | 
|---|
| 241 | #ifdef PRIORITY_QUEUE | 
|---|
| 242 | struct READ_STRUCTcomp | 
|---|
| 243 | { | 
|---|
| 244 | bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2) | 
|---|
| 245 | { | 
|---|
| 246 | const int64_t rel1 = r1->totBytes - r1->relBytes; | 
|---|
| 247 | const int64_t rel2 = r2->totBytes - r2->relBytes; | 
|---|
| 248 | return rel1 > rel2; | 
|---|
| 249 | } | 
|---|
| 250 | }; | 
|---|
| 251 | #endif | 
|---|
| 252 |  | 
|---|
| 253 | int READ_STRUCT::wait() | 
|---|
| 254 | { | 
|---|
| 255 | // wait for something to do... | 
|---|
| 256 | const int rc = epoll_wait(fd_epoll, events, NBOARDS, 100); // max, timeout[ms] | 
|---|
| 257 | if (rc>=0) | 
|---|
| 258 | return rc; | 
|---|
| 259 |  | 
|---|
| 260 | if (errno==EINTR) // timout or signal interruption | 
|---|
| 261 | return 0; | 
|---|
| 262 |  | 
|---|
| 263 | factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno); | 
|---|
| 264 | return -1; | 
|---|
| 265 | } | 
|---|
| 266 |  | 
|---|
| 267 | uint READ_STRUCT::activeSockets = 0; | 
|---|
| 268 | int READ_STRUCT::fd_epoll = -1; | 
|---|
| 269 | epoll_event READ_STRUCT::events[NBOARDS]; | 
|---|
| 270 |  | 
|---|
| 271 | void READ_STRUCT::init() | 
|---|
| 272 | { | 
|---|
| 273 | if (fd_epoll>=0) | 
|---|
| 274 | return; | 
|---|
| 275 |  | 
|---|
| 276 | #ifdef USE_EPOLL | 
|---|
| 277 | fd_epoll = epoll_create(NBOARDS); | 
|---|
| 278 | if (fd_epoll<0) | 
|---|
| 279 | { | 
|---|
| 280 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno); | 
|---|
| 281 | return; | 
|---|
| 282 | } | 
|---|
| 283 | #endif | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 | void READ_STRUCT::close() | 
|---|
| 287 | { | 
|---|
| 288 | #ifdef USE_EPOLL | 
|---|
| 289 | if (fd_epoll>=0 && ::close(fd_epoll)>0) | 
|---|
| 290 | factPrintf(MessageImp::kFatal, "Closing epoll failed: %m (close,rc=%d)", errno); | 
|---|
| 291 | #endif | 
|---|
| 292 |  | 
|---|
| 293 | fd_epoll = -1; | 
|---|
| 294 | } | 
|---|
| 295 |  | 
|---|
| 296 | bool READ_STRUCT::create(sockaddr_in sockAddr) | 
|---|
| 297 | { | 
|---|
| 298 | if (socket>=0) | 
|---|
| 299 | return false; | 
|---|
| 300 |  | 
|---|
| 301 | const int port = ntohs(sockAddr.sin_port) + 1; | 
|---|
| 302 |  | 
|---|
| 303 | SockAddr.sin_family = sockAddr.sin_family; | 
|---|
| 304 | SockAddr.sin_addr   = sockAddr.sin_addr; | 
|---|
| 305 | SockAddr.sin_port   = htons(port); | 
|---|
| 306 |  | 
|---|
| 307 | if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) | 
|---|
| 308 | { | 
|---|
| 309 | factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno); | 
|---|
| 310 | socket = -1; | 
|---|
| 311 | return false; | 
|---|
| 312 | } | 
|---|
| 313 |  | 
|---|
| 314 | int optval = 1; | 
|---|
| 315 | if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(int)) < 0) | 
|---|
| 316 | factPrintf(MessageImp::kInfo, "Setting TCP_NODELAY for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 317 |  | 
|---|
| 318 | optval = 1; | 
|---|
| 319 | if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) | 
|---|
| 320 | factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 321 |  | 
|---|
| 322 | optval = 10;                 //start after 10 seconds | 
|---|
| 323 | if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) | 
|---|
| 324 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 325 |  | 
|---|
| 326 | optval = 10;                 //do every 10 seconds | 
|---|
| 327 | if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) | 
|---|
| 328 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 329 |  | 
|---|
| 330 | optval = 2;                  //close after 2 unsuccessful tries | 
|---|
| 331 | if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) | 
|---|
| 332 | factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno); | 
|---|
| 333 |  | 
|---|
| 334 | factPrintf(MessageImp::kInfo, "Generated socket %d (%d)", sockId, socket); | 
|---|
| 335 |  | 
|---|
| 336 | //connected = false; | 
|---|
| 337 | activeSockets++; | 
|---|
| 338 |  | 
|---|
| 339 | return true; | 
|---|
| 340 | } | 
|---|
| 341 |  | 
|---|
| 342 | void READ_STRUCT::destroy() | 
|---|
| 343 | { | 
|---|
| 344 | if (socket<0) | 
|---|
| 345 | return; | 
|---|
| 346 |  | 
|---|
| 347 | #ifdef USE_EPOLL | 
|---|
| 348 | // strictly speaking this should not be necessary | 
|---|
| 349 | if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) | 
|---|
| 350 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 351 | #endif | 
|---|
| 352 |  | 
|---|
| 353 | if (::close(socket) > 0) | 
|---|
| 354 | factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno); | 
|---|
| 355 | else | 
|---|
| 356 | factPrintf(MessageImp::kInfo, "Closed socket %d (%d)", sockId, socket); | 
|---|
| 357 |  | 
|---|
| 358 | socket = -1; | 
|---|
| 359 | connected = false; | 
|---|
| 360 | activeSockets--; | 
|---|
| 361 | } | 
|---|
| 362 |  | 
|---|
| 363 | bool READ_STRUCT::check(int sockDef, sockaddr_in addr) | 
|---|
| 364 | { | 
|---|
| 365 | // Continue in the most most likely case (performance) | 
|---|
| 366 | //if (socket>=0 && sockDef!=0 && connected) | 
|---|
| 367 | //    return; | 
|---|
| 368 | const int old = socket; | 
|---|
| 369 |  | 
|---|
| 370 | // socket open, but should not be open | 
|---|
| 371 | if (socket>=0 && sockDef==0) | 
|---|
| 372 | destroy(); | 
|---|
| 373 |  | 
|---|
| 374 | // Socket closed, but should be open | 
|---|
| 375 | if (socket<0 && sockDef!=0) | 
|---|
| 376 | create(addr); //generate address and socket | 
|---|
| 377 |  | 
|---|
| 378 | const bool retval = old!=socket; | 
|---|
| 379 |  | 
|---|
| 380 | // Socket closed | 
|---|
| 381 | if (socket<0) | 
|---|
| 382 | return retval; | 
|---|
| 383 |  | 
|---|
| 384 | // Socket open and connected: Nothing to do | 
|---|
| 385 | if (connected) | 
|---|
| 386 | return retval; | 
|---|
| 387 |  | 
|---|
| 388 | //try to connect if not yet done | 
|---|
| 389 | const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr)); | 
|---|
| 390 | if (rc == -1) | 
|---|
| 391 | return retval; | 
|---|
| 392 |  | 
|---|
| 393 | connected = true; | 
|---|
| 394 |  | 
|---|
| 395 | if (sockDef<0) | 
|---|
| 396 | { | 
|---|
| 397 | bufTyp = READ_STRUCT::kStream; // full data to be skipped | 
|---|
| 398 | bufLen = MAX_LEN;              // huge for skipping | 
|---|
| 399 | } | 
|---|
| 400 | else | 
|---|
| 401 | { | 
|---|
| 402 | bufTyp = READ_STRUCT::kHeader;  // expect a header | 
|---|
| 403 | bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining | 
|---|
| 404 | } | 
|---|
| 405 |  | 
|---|
| 406 | bufPos   = B;  // no byte read so far | 
|---|
| 407 | skip     = 0;  // start empty | 
|---|
| 408 | totBytes = 0; | 
|---|
| 409 | relBytes = 0; | 
|---|
| 410 |  | 
|---|
| 411 | factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket); | 
|---|
| 412 |  | 
|---|
| 413 | #ifdef USE_EPOLL | 
|---|
| 414 | epoll_event ev; | 
|---|
| 415 | ev.events = EPOLLIN; | 
|---|
| 416 | ev.data.ptr = this;  // user data (union: ev.ptr) | 
|---|
| 417 | if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0) | 
|---|
| 418 | factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); | 
|---|
| 419 | #endif | 
|---|
| 420 |  | 
|---|
| 421 | return retval; | 
|---|
| 422 | } | 
|---|
| 423 |  | 
|---|
| 424 | bool READ_STRUCT::read() | 
|---|
| 425 | { | 
|---|
| 426 | if (!connected) | 
|---|
| 427 | return false; | 
|---|
| 428 |  | 
|---|
| 429 | if (bufLen==0) | 
|---|
| 430 | return true; | 
|---|
| 431 |  | 
|---|
| 432 | const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT); | 
|---|
| 433 | // recv failed | 
|---|
| 434 | if (jrd<0) | 
|---|
| 435 | { | 
|---|
| 436 | // There was just nothing waiting | 
|---|
| 437 | if (errno==EWOULDBLOCK || errno==EAGAIN) | 
|---|
| 438 | return false; | 
|---|
| 439 |  | 
|---|
| 440 | factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno); | 
|---|
| 441 | return false; | 
|---|
| 442 | } | 
|---|
| 443 |  | 
|---|
| 444 | // connection was closed ... | 
|---|
| 445 | if (jrd==0) | 
|---|
| 446 | { | 
|---|
| 447 | factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId); | 
|---|
| 448 |  | 
|---|
| 449 | destroy();//DestroySocket(rd[i]); //generate address and socket | 
|---|
| 450 | return false; | 
|---|
| 451 | } | 
|---|
| 452 |  | 
|---|
| 453 | totBytes += jrd; | 
|---|
| 454 |  | 
|---|
| 455 | // are we skipping this board ... | 
|---|
| 456 | if (bufTyp==kStream) | 
|---|
| 457 | return false; | 
|---|
| 458 |  | 
|---|
| 459 | if (bufPos==B) | 
|---|
| 460 | gettimeofday(&time, NULL); | 
|---|
| 461 |  | 
|---|
| 462 | bufPos += jrd;  //==> prepare for continuation | 
|---|
| 463 | bufLen -= jrd; | 
|---|
| 464 |  | 
|---|
| 465 | // not yet all read | 
|---|
| 466 | return bufLen==0; | 
|---|
| 467 | } | 
|---|
| 468 |  | 
|---|
| 469 | void READ_STRUCT::swapHeader() | 
|---|
| 470 | { | 
|---|
| 471 | S[1]  = ntohs(S[1]);    // package_length (bytes not swapped!) | 
|---|
| 472 | S[2]  = ntohs(S[2]);    // version_no | 
|---|
| 473 | S[3]  = ntohs(S[3]);    // PLLLCK | 
|---|
| 474 | S[4]  = ntohs(S[4]);    // trigger_crc | 
|---|
| 475 | S[5]  = ntohs(S[5]);    // trigger_type | 
|---|
| 476 |  | 
|---|
| 477 | I[3]  = ntohl(I[3]);    // trigger_id | 
|---|
| 478 | I[4]  = ntohl(I[4]);    // fad_evt_counter | 
|---|
| 479 | I[5]  = ntohl(I[5]);    // REFCLK_frequency | 
|---|
| 480 |  | 
|---|
| 481 | S[12] = ntohs(S[12]);   // board id | 
|---|
| 482 | S[13] = ntohs(S[13]);   // adc_clock_phase_shift | 
|---|
| 483 | S[14] = ntohs(S[14]);   // number_of_triggers_to_generate | 
|---|
| 484 | S[15] = ntohs(S[15]);   // trigger_generator_prescaler | 
|---|
| 485 |  | 
|---|
| 486 | I[10] = ntohl(I[10]);   // runnumber; | 
|---|
| 487 | I[11] = ntohl(I[11]);   // time; | 
|---|
| 488 |  | 
|---|
| 489 | // Use back inserter?? | 
|---|
| 490 | for (int s=24; s<24+NTemp+NDAC; s++) | 
|---|
| 491 | S[s] = ntohs(S[s]); // drs_temperature / dac | 
|---|
| 492 | } | 
|---|
| 493 |  | 
|---|
| 494 | void READ_STRUCT::swapData() | 
|---|
| 495 | { | 
|---|
| 496 | // swapEventHeaderBytes: End of the header. to channels now | 
|---|
| 497 |  | 
|---|
| 498 | int i = 36; | 
|---|
| 499 | for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++) | 
|---|
| 500 | { | 
|---|
| 501 | S[i+0] = ntohs(S[i+0]);//id | 
|---|
| 502 | S[i+1] = ntohs(S[i+1]);//start_cell | 
|---|
| 503 | S[i+2] = ntohs(S[i+2]);//roi | 
|---|
| 504 | S[i+3] = ntohs(S[i+3]);//filling | 
|---|
| 505 |  | 
|---|
| 506 | i += 4+S[i+2];//skip the pixel data | 
|---|
| 507 | } | 
|---|
| 508 | } | 
|---|
| 509 |  | 
|---|
| 510 | // ========================================================================== | 
|---|
| 511 |  | 
|---|
| 512 | bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[]) | 
|---|
| 513 | { | 
|---|
| 514 | int xjr = -1; | 
|---|
| 515 | int xkr = -1; | 
|---|
| 516 |  | 
|---|
| 517 | //points to the very first roi | 
|---|
| 518 | int roiPtr = sizeof(PEVNT_HEADER)/2 + 2; | 
|---|
| 519 |  | 
|---|
| 520 | roi[0] = ntohs(rd.S[roiPtr]); | 
|---|
| 521 |  | 
|---|
| 522 | for (int jr = 0; jr < 9; jr++) | 
|---|
| 523 | { | 
|---|
| 524 | roi[jr] = ntohs(rd.S[roiPtr]); | 
|---|
| 525 |  | 
|---|
| 526 | if (roi[jr]>1024) | 
|---|
| 527 | { | 
|---|
| 528 | factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]); | 
|---|
| 529 | return false; | 
|---|
| 530 | } | 
|---|
| 531 |  | 
|---|
| 532 | // Check that the roi of pixels jr are compatible with the one of pixel 0 | 
|---|
| 533 | if (jr!=8 && roi[jr]!=roi[0]) | 
|---|
| 534 | { | 
|---|
| 535 | xjr = jr; | 
|---|
| 536 | break; | 
|---|
| 537 | } | 
|---|
| 538 |  | 
|---|
| 539 | // Check that the roi of all other DRS chips on boards are compatible | 
|---|
| 540 | for (int kr = 1; kr < 4; kr++) | 
|---|
| 541 | { | 
|---|
| 542 | const int kroi = ntohs(rd.S[roiPtr]); | 
|---|
| 543 | if (kroi != roi[jr]) | 
|---|
| 544 | { | 
|---|
| 545 | xjr = jr; | 
|---|
| 546 | xkr = kr; | 
|---|
| 547 | break; | 
|---|
| 548 | } | 
|---|
| 549 | roiPtr += kroi+4; | 
|---|
| 550 | } | 
|---|
| 551 | } | 
|---|
| 552 |  | 
|---|
| 553 | if (xjr>=0) | 
|---|
| 554 | { | 
|---|
| 555 | if (xkr<0) | 
|---|
| 556 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]); | 
|---|
| 557 | else | 
|---|
| 558 | factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr])); | 
|---|
| 559 |  | 
|---|
| 560 | return false; | 
|---|
| 561 | } | 
|---|
| 562 |  | 
|---|
| 563 | if (roi[8] < roi[0]) | 
|---|
| 564 | { | 
|---|
| 565 | factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]); | 
|---|
| 566 | return false; | 
|---|
| 567 | } | 
|---|
| 568 |  | 
|---|
| 569 | return true; | 
|---|
| 570 | } | 
|---|
| 571 |  | 
|---|
| 572 | list<shared_ptr<EVT_CTRL2>> evtCtrl; | 
|---|
| 573 |  | 
|---|
| 574 | shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun) | 
|---|
| 575 | { | 
|---|
| 576 | /* | 
|---|
| 577 | checkroi consistence | 
|---|
| 578 | find existing entry | 
|---|
| 579 | if no entry, try to allocate memory | 
|---|
| 580 | if entry and memory, init event structure | 
|---|
| 581 | */ | 
|---|
| 582 |  | 
|---|
| 583 | uint16_t nRoi[9]; | 
|---|
| 584 | if (!checkRoiConsistency(rd, nRoi)) | 
|---|
| 585 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 586 |  | 
|---|
| 587 | for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++) | 
|---|
| 588 | { | 
|---|
| 589 | // A reference is enough because the evtCtrl holds the shared_ptr anyway | 
|---|
| 590 | const shared_ptr<EVT_CTRL2> &evt = *it; | 
|---|
| 591 |  | 
|---|
| 592 | // If the run is different, go on searching. | 
|---|
| 593 | // We cannot stop searching if a lower run-id is found as in | 
|---|
| 594 | // the case of the events, because theoretically, there | 
|---|
| 595 | // can be the same run on two different days. | 
|---|
| 596 | if (rd.H.runnumber != evt->runNum) | 
|---|
| 597 | continue; | 
|---|
| 598 |  | 
|---|
| 599 | // If the ID of the new event if higher than the last one stored | 
|---|
| 600 | // in that run, we have to assign a new slot (leave the loop) | 
|---|
| 601 | if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/) | 
|---|
| 602 | break; | 
|---|
| 603 |  | 
|---|
| 604 | if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/) | 
|---|
| 605 | continue; | 
|---|
| 606 |  | 
|---|
| 607 | // We have found an entry with the same runID and evtID | 
|---|
| 608 | // Check if ROI is consistent | 
|---|
| 609 | if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8]) | 
|---|
| 610 | { | 
|---|
| 611 | factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.", | 
|---|
| 612 | evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]); | 
|---|
| 613 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 614 | } | 
|---|
| 615 |  | 
|---|
| 616 | // It is maybe not likely, but the header of this board might have | 
|---|
| 617 | // arrived earlier. (We could also update the run-info, but | 
|---|
| 618 | // this should not make a difference here) | 
|---|
| 619 | if ((rd.time.tv_sec==evt->time.tv_sec && rd.time.tv_usec<evt->time.tv_usec) || | 
|---|
| 620 | rd.time.tv_sec<evt->time.tv_sec) | 
|---|
| 621 | evt->time = rd.time; | 
|---|
| 622 |  | 
|---|
| 623 | //everything seems fine so far ==> use this slot .... | 
|---|
| 624 | return evt; | 
|---|
| 625 | } | 
|---|
| 626 |  | 
|---|
| 627 | if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8])) | 
|---|
| 628 | { | 
|---|
| 629 | factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)", | 
|---|
| 630 | actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter); | 
|---|
| 631 | return shared_ptr<EVT_CTRL2>(); | 
|---|
| 632 | } | 
|---|
| 633 |  | 
|---|
| 634 | EVT_CTRL2 *evt = new EVT_CTRL2; | 
|---|
| 635 |  | 
|---|
| 636 | evt->time   = rd.time; | 
|---|
| 637 |  | 
|---|
| 638 | evt->runNum = rd.H.runnumber; | 
|---|
| 639 | evt->evNum  = rd.H.fad_evt_counter; | 
|---|
| 640 |  | 
|---|
| 641 | evt->trgNum = rd.H.trigger_id; | 
|---|
| 642 | evt->trgTyp = rd.H.trigger_type; | 
|---|
| 643 |  | 
|---|
| 644 | evt->nRoi   = nRoi[0]; | 
|---|
| 645 | evt->nRoiTM = nRoi[8]; | 
|---|
| 646 |  | 
|---|
| 647 | const bool newrun = actrun->runId != rd.H.runnumber; | 
|---|
| 648 | if (newrun) | 
|---|
| 649 | { | 
|---|
| 650 | // Since we have started a new run, we know already when to close the | 
|---|
| 651 | // previous run in terms of number of events | 
|---|
| 652 | actrun->maxEvt = actrun->lastEvt; | 
|---|
| 653 |  | 
|---|
| 654 | factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d(%d), prev=%d", | 
|---|
| 655 | rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId); | 
|---|
| 656 |  | 
|---|
| 657 | // The new run is the active run now | 
|---|
| 658 | actrun = make_shared<RUN_CTRL2>(); | 
|---|
| 659 |  | 
|---|
| 660 | const time_t &tsec = evt->time.tv_sec; | 
|---|
| 661 |  | 
|---|
| 662 | actrun->openTime  = tsec; | 
|---|
| 663 | actrun->closeTime = tsec + 3600 * 24; // max time allowed | 
|---|
| 664 | actrun->runId     = rd.H.runnumber; | 
|---|
| 665 | actrun->roi0      = nRoi[0];  // FIXME: Make obsolete! | 
|---|
| 666 | actrun->roi8      = nRoi[8];  // FIXME: Make obsolete! | 
|---|
| 667 |  | 
|---|
| 668 | // Signal the fadctrl that a new run has been started | 
|---|
| 669 | // Note this is the only place at which we can ensure that | 
|---|
| 670 | // gotnewRun is called only once | 
|---|
| 671 | gotNewRun(*actrun); | 
|---|
| 672 | } | 
|---|
| 673 |  | 
|---|
| 674 | // Keep pointer to run of this event | 
|---|
| 675 | evt->runCtrl = actrun; | 
|---|
| 676 |  | 
|---|
| 677 | // Increase the number of events we have started to receive in this run | 
|---|
| 678 | actrun->lastTime = evt->time.tv_sec;  // Time when the last event was received | 
|---|
| 679 | actrun->lastEvt++; | 
|---|
| 680 |  | 
|---|
| 681 | // An event can be the first and the last, but not the last and the first. | 
|---|
| 682 | // Therefore gotNewRun is called before runFinished. | 
|---|
| 683 | // runFinished signals that the last event of a run was just received. Processing | 
|---|
| 684 | // might still be ongoing, but we can start a new run. | 
|---|
| 685 | const bool cond1 = actrun->lastEvt  < actrun->maxEvt;     // max number of events not reached | 
|---|
| 686 | const bool cond2 = actrun->lastTime < actrun->closeTime;  // max time not reached | 
|---|
| 687 | if (!cond1 || !cond2) | 
|---|
| 688 | runFinished(); | 
|---|
| 689 |  | 
|---|
| 690 | // We don't mind here that this is not common to all events, | 
|---|
| 691 | // because every coming event will fullfil the condition as well. | 
|---|
| 692 | if (!cond1) | 
|---|
| 693 | evt->closeRequest |= kRequestMaxEvtsReached; | 
|---|
| 694 | if (!cond2) | 
|---|
| 695 | evt->closeRequest |= kRequestMaxTimeReached; | 
|---|
| 696 |  | 
|---|
| 697 | // Secure access to evtCtrl against access in CloseRunFile | 
|---|
| 698 | // This should be the last... otherwise we can run into threading issues | 
|---|
| 699 | // if the event is accessed before it is fully initialized. | 
|---|
| 700 | evtCtrl.emplace_back(evt); | 
|---|
| 701 | return evtCtrl.back(); | 
|---|
| 702 | } | 
|---|
| 703 |  | 
|---|
| 704 |  | 
|---|
| 705 | void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt) | 
|---|
| 706 | { | 
|---|
| 707 | const int i = rBuf.sockId; | 
|---|
| 708 |  | 
|---|
| 709 | memcpy(evt->FADhead+i, &rBuf.H, sizeof(PEVNT_HEADER)); | 
|---|
| 710 |  | 
|---|
| 711 | int src = sizeof(PEVNT_HEADER) / 2;  // Header is 72 byte = 36 shorts | 
|---|
| 712 |  | 
|---|
| 713 | // consistency of ROIs have been checked already (is it all correct?) | 
|---|
| 714 | const uint16_t &roi = rBuf.S[src+2]; | 
|---|
| 715 |  | 
|---|
| 716 | // different sort in FAD board..... | 
|---|
| 717 | EVENT *event = evt->fEvent; | 
|---|
| 718 | for (int px = 0; px < 9; px++) | 
|---|
| 719 | { | 
|---|
| 720 | for (int drs = 0; drs < 4; drs++) | 
|---|
| 721 | { | 
|---|
| 722 | const int16_t pixC = rBuf.S[src+1];    // start-cell | 
|---|
| 723 | const int16_t pixR = rBuf.S[src+2];    // roi | 
|---|
| 724 | //here we should check if pixH is correct .... | 
|---|
| 725 |  | 
|---|
| 726 | const int pixS = i*36 + drs*9 + px; | 
|---|
| 727 |  | 
|---|
| 728 | event->StartPix[pixS] = pixC; | 
|---|
| 729 |  | 
|---|
| 730 | memcpy(event->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2); | 
|---|
| 731 |  | 
|---|
| 732 | src += 4+pixR; | 
|---|
| 733 |  | 
|---|
| 734 | // Treatment for ch 9 (TM channel) | 
|---|
| 735 | if (px != 8) | 
|---|
| 736 | continue; | 
|---|
| 737 |  | 
|---|
| 738 | const int tmS = i*4 + drs; | 
|---|
| 739 |  | 
|---|
| 740 | //and we have additional TM info | 
|---|
| 741 | if (pixR > roi) | 
|---|
| 742 | { | 
|---|
| 743 | event->StartTM[tmS] = (pixC + pixR - roi) % 1024; | 
|---|
| 744 |  | 
|---|
| 745 | memcpy(event->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2); | 
|---|
| 746 | } | 
|---|
| 747 | else | 
|---|
| 748 | { | 
|---|
| 749 | event->StartTM[tmS] = -1; | 
|---|
| 750 | } | 
|---|
| 751 | } | 
|---|
| 752 | } | 
|---|
| 753 | } | 
|---|
| 754 |  | 
|---|
| 755 | // ========================================================================== | 
|---|
| 756 |  | 
|---|
| 757 | uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt) | 
|---|
| 758 | { | 
|---|
| 759 | factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)", | 
|---|
| 760 | evt->runNum, evt->evNum, evtCtrl.size(), txt); | 
|---|
| 761 |  | 
|---|
| 762 | uint64_t report = 0; | 
|---|
| 763 |  | 
|---|
| 764 | char str[1000]; | 
|---|
| 765 |  | 
|---|
| 766 | int ik=0; | 
|---|
| 767 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 768 | { | 
|---|
| 769 | if (ib%10==0) | 
|---|
| 770 | str[ik++] = '|'; | 
|---|
| 771 |  | 
|---|
| 772 | const int jb = evt->board[ib]; | 
|---|
| 773 | if (jb>=0) // data received from that board | 
|---|
| 774 | { | 
|---|
| 775 | str[ik++] = '0'+(jb%10); | 
|---|
| 776 | continue; | 
|---|
| 777 | } | 
|---|
| 778 |  | 
|---|
| 779 | // FIXME: This is not synchronous... it reports | 
|---|
| 780 | // accoridng to the current connection status, not w.r.t. to the | 
|---|
| 781 | // one when the event was taken. | 
|---|
| 782 | if (gi_NumConnect[ib]==0) // board not connected | 
|---|
| 783 | { | 
|---|
| 784 | str[ik++] = 'x'; | 
|---|
| 785 | continue; | 
|---|
| 786 | } | 
|---|
| 787 |  | 
|---|
| 788 | // data from this board lost | 
|---|
| 789 | str[ik++] = '.'; | 
|---|
| 790 | report |= ((uint64_t)1)<<ib; | 
|---|
| 791 | } | 
|---|
| 792 |  | 
|---|
| 793 | str[ik++] = '|'; | 
|---|
| 794 | str[ik]   = 0; | 
|---|
| 795 |  | 
|---|
| 796 | factOut(MessageImp::kWarn, str); | 
|---|
| 797 |  | 
|---|
| 798 | return report; | 
|---|
| 799 | } | 
|---|
| 800 |  | 
|---|
| 801 | // ========================================================================== | 
|---|
| 802 | // ========================================================================== | 
|---|
| 803 |  | 
|---|
| 804 | void proc1(const shared_ptr<EVT_CTRL2> &); | 
|---|
| 805 |  | 
|---|
| 806 | Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&proc1, placeholders::_1)); | 
|---|
| 807 |  | 
|---|
| 808 | void proc1(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 809 | { | 
|---|
| 810 | applyCalib(*evt, processingQueue1.size()); | 
|---|
| 811 | } | 
|---|
| 812 |  | 
|---|
| 813 | // If this is not convenient anymore, it could be replaced by | 
|---|
| 814 | // a command queue, to which command+data is posted, | 
|---|
| 815 | // (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo) | 
|---|
| 816 | void writeEvt(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 817 | { | 
|---|
| 818 | //const shared_ptr<RUN_CTRL2> &run = evt->runCtrl; | 
|---|
| 819 | RUN_CTRL2 &run = *evt->runCtrl; | 
|---|
| 820 |  | 
|---|
| 821 | // Is this a valid event or just an empty event to trigger run close? | 
|---|
| 822 | // If this is not an empty event open the new run-file | 
|---|
| 823 | // Empty events are there to trigger run-closing conditions | 
|---|
| 824 | if (evt->valid()) | 
|---|
| 825 | { | 
|---|
| 826 | // File not yet open | 
|---|
| 827 | if (run.fileStat==kFileNotYetOpen) | 
|---|
| 828 | { | 
|---|
| 829 | // runOpen will close a previous run, if still open | 
|---|
| 830 | if (!runOpen(*evt)) | 
|---|
| 831 | { | 
|---|
| 832 | factPrintf(MessageImp::kError, "Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum); | 
|---|
| 833 | run.fileStat = kFileClosed; | 
|---|
| 834 | return; | 
|---|
| 835 | } | 
|---|
| 836 |  | 
|---|
| 837 | factPrintf(MessageImp::kInfo, "Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum); | 
|---|
| 838 | run.fileStat = kFileOpen; | 
|---|
| 839 | } | 
|---|
| 840 |  | 
|---|
| 841 | // Here we have a valid calibration and can go on with that. | 
|---|
| 842 | // It is important that _all_ events are sent for calibration (except broken ones) | 
|---|
| 843 | processingQueue1.post(evt); | 
|---|
| 844 | } | 
|---|
| 845 |  | 
|---|
| 846 | // File already closed | 
|---|
| 847 | if (run.fileStat==kFileClosed) | 
|---|
| 848 | return; | 
|---|
| 849 |  | 
|---|
| 850 | bool rc1 = true; | 
|---|
| 851 | if (evt->valid()) | 
|---|
| 852 | { | 
|---|
| 853 | rc1 = runWrite(*evt); | 
|---|
| 854 | if (!rc1) | 
|---|
| 855 | factPrintf(MessageImp::kError, "Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum); | 
|---|
| 856 | } | 
|---|
| 857 |  | 
|---|
| 858 | // File not open... no need to close or to check for close | 
|---|
| 859 | // ... this is the case if CloseRunFile was called before any file was opened. | 
|---|
| 860 | if (run.fileStat!=kFileOpen) | 
|---|
| 861 | return; | 
|---|
| 862 |  | 
|---|
| 863 | // File is not yet to be closed. | 
|---|
| 864 | if (rc1 && evt->closeRequest==kRequestNone) | 
|---|
| 865 | return; | 
|---|
| 866 |  | 
|---|
| 867 | runClose(run); | 
|---|
| 868 | run.fileStat = kFileClosed; | 
|---|
| 869 |  | 
|---|
| 870 | vector<string> reason; | 
|---|
| 871 | if (evt->closeRequest&kRequestManual) | 
|---|
| 872 | reason.emplace_back("close requested"); | 
|---|
| 873 | if (evt->closeRequest&kRequestTimeout) | 
|---|
| 874 | reason.emplace_back("receive timeout"); | 
|---|
| 875 | if (evt->closeRequest&kRequestConnectionChange) | 
|---|
| 876 | reason.emplace_back("connection changed"); | 
|---|
| 877 | if (evt->closeRequest&kRequestEventCheckFailed) | 
|---|
| 878 | reason.emplace_back("event check failed"); | 
|---|
| 879 | if (evt->closeRequest&kRequestMaxTimeReached) | 
|---|
| 880 | reason.push_back(to_string(run.closeTime-run.openTime)+"s reached"); | 
|---|
| 881 | if (evt->closeRequest&kRequestMaxEvtsReached) | 
|---|
| 882 | reason.push_back(to_string(run.maxEvt)+" evts reached"); | 
|---|
| 883 | if (!rc1) | 
|---|
| 884 | reason.emplace_back("runWrite failed"); | 
|---|
| 885 |  | 
|---|
| 886 | const string str = boost::algorithm::join(reason, ", "); | 
|---|
| 887 | factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str()); | 
|---|
| 888 | } | 
|---|
| 889 |  | 
|---|
| 890 | Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1)); | 
|---|
| 891 |  | 
|---|
| 892 | void procEvt(const shared_ptr<EVT_CTRL2> &evt) | 
|---|
| 893 | { | 
|---|
| 894 | if (evt->valid()) | 
|---|
| 895 | { | 
|---|
| 896 | EVENT *event = evt->fEvent; | 
|---|
| 897 |  | 
|---|
| 898 | // This is already done in initMemory() | 
|---|
| 899 | //event->Roi         = evt->runCtrl->roi0; | 
|---|
| 900 | //event->RoiTM       = evt->runCtrl->roi8; | 
|---|
| 901 | //event->EventNum    = evt->evNum; | 
|---|
| 902 | //event->TriggerNum  = evt->trgNum; | 
|---|
| 903 | //event->TriggerType = evt->trgTyp; | 
|---|
| 904 |  | 
|---|
| 905 | event->NumBoards = evt->nBoard; | 
|---|
| 906 |  | 
|---|
| 907 | event->PCTime = evt->time.tv_sec; | 
|---|
| 908 | event->PCUsec = evt->time.tv_usec; | 
|---|
| 909 |  | 
|---|
| 910 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 911 | event->BoardTime[ib] = evt->FADhead[ib].time; | 
|---|
| 912 |  | 
|---|
| 913 | if (!eventCheck(*evt)) | 
|---|
| 914 | { | 
|---|
| 915 | secondaryQueue.emplace(new EVT_CTRL2(kRequestEventCheckFailed, evt->runCtrl)); | 
|---|
| 916 | return; | 
|---|
| 917 | } | 
|---|
| 918 | } | 
|---|
| 919 |  | 
|---|
| 920 | // If file is open post the event for being written | 
|---|
| 921 | secondaryQueue.post(evt); | 
|---|
| 922 | } | 
|---|
| 923 |  | 
|---|
| 924 | // ========================================================================== | 
|---|
| 925 | // ========================================================================== | 
|---|
| 926 |  | 
|---|
| 927 | /* | 
|---|
| 928 | task 1-4: | 
|---|
| 929 |  | 
|---|
| 930 | lock1()-lock4(); | 
|---|
| 931 | while (1) | 
|---|
| 932 | { | 
|---|
| 933 | wait for signal [lockN];  // unlocked | 
|---|
| 934 |  | 
|---|
| 935 | while (n!=10) | 
|---|
| 936 | wait sockets; | 
|---|
| 937 | read; | 
|---|
| 938 |  | 
|---|
| 939 | lockM(); | 
|---|
| 940 | finished[n] = true; | 
|---|
| 941 | signal(mainloop); | 
|---|
| 942 | unlockM(); | 
|---|
| 943 | } | 
|---|
| 944 |  | 
|---|
| 945 |  | 
|---|
| 946 | mainloop: | 
|---|
| 947 |  | 
|---|
| 948 | while (1) | 
|---|
| 949 | { | 
|---|
| 950 | lockM(); | 
|---|
| 951 | while (!finished[0] || !finished[1] ...) | 
|---|
| 952 | wait for signal [lockM];  // unlocked... signals can be sent | 
|---|
| 953 | finished[0-1] = false; | 
|---|
| 954 | unlockM() | 
|---|
| 955 |  | 
|---|
| 956 | copy data to queue    // locked | 
|---|
| 957 |  | 
|---|
| 958 | lockN[0-3]; | 
|---|
| 959 | signalN[0-3]; | 
|---|
| 960 | unlockN[0-3]; | 
|---|
| 961 | } | 
|---|
| 962 |  | 
|---|
| 963 |  | 
|---|
| 964 | */ | 
|---|
| 965 |  | 
|---|
| 966 | /* | 
|---|
| 967 | while (g_reset) | 
|---|
| 968 | { | 
|---|
| 969 | shared_ptr<EVT_CTRL2> evt = new shared_ptr<>; | 
|---|
| 970 |  | 
|---|
| 971 | // Check that all sockets are connected | 
|---|
| 972 |  | 
|---|
| 973 | for (int i=0; i<40; i++) | 
|---|
| 974 | if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0) | 
|---|
| 975 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); | 
|---|
| 976 |  | 
|---|
| 977 | while (g_reset) | 
|---|
| 978 | { | 
|---|
| 979 | if (READ_STRUCT::wait()<0) | 
|---|
| 980 | break; | 
|---|
| 981 |  | 
|---|
| 982 | if (rc_epoll==0) | 
|---|
| 983 | break; | 
|---|
| 984 |  | 
|---|
| 985 | for (int jj=0; jj<rc_epoll; jj++) | 
|---|
| 986 | { | 
|---|
| 987 | READ_STRUCT *rs = READ_STRUCT::get(jj); | 
|---|
| 988 | if (!rs->connected) | 
|---|
| 989 | continue; | 
|---|
| 990 |  | 
|---|
| 991 | const bool rc_read = rs->read(); | 
|---|
| 992 | if (!rc_read) | 
|---|
| 993 | continue; | 
|---|
| 994 |  | 
|---|
| 995 | if (rs->bufTyp==READ_STRUCT::kHeader) | 
|---|
| 996 | { | 
|---|
| 997 | [...] | 
|---|
| 998 | } | 
|---|
| 999 |  | 
|---|
| 1000 | [...] | 
|---|
| 1001 |  | 
|---|
| 1002 | if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0) | 
|---|
| 1003 | factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 1004 | } | 
|---|
| 1005 |  | 
|---|
| 1006 | if (once_a_second) | 
|---|
| 1007 | { | 
|---|
| 1008 | if (evt==timeout) | 
|---|
| 1009 | break; | 
|---|
| 1010 | } | 
|---|
| 1011 | } | 
|---|
| 1012 |  | 
|---|
| 1013 | if (evt.nBoards==actBoards) | 
|---|
| 1014 | primaryQueue.post(evt); | 
|---|
| 1015 | } | 
|---|
| 1016 | */ | 
|---|
| 1017 |  | 
|---|
| 1018 | Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1)); | 
|---|
| 1019 |  | 
|---|
| 1020 | // This corresponds more or less to fFile... should we merge both? | 
|---|
| 1021 | shared_ptr<RUN_CTRL2> actrun; | 
|---|
| 1022 |  | 
|---|
| 1023 | void CloseRunFile() | 
|---|
| 1024 | { | 
|---|
| 1025 | // Currently we need actrun here, to be able to set kFileClosed. | 
|---|
| 1026 | // Apart from that we have to ensure that there is an open file at all | 
|---|
| 1027 | // which we can close. | 
|---|
| 1028 | // Submission to the primary queue ensures that the event | 
|---|
| 1029 | // is placed at the right place in the processing chain. | 
|---|
| 1030 | // (Corresponds to the correct run) | 
|---|
| 1031 | primaryQueue.emplace(new EVT_CTRL2(kRequestManual, actrun)); | 
|---|
| 1032 | } | 
|---|
| 1033 |  | 
|---|
| 1034 | bool mainloop(READ_STRUCT *rd) | 
|---|
| 1035 | { | 
|---|
| 1036 | factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop"); | 
|---|
| 1037 |  | 
|---|
| 1038 | primaryQueue.start(); | 
|---|
| 1039 | secondaryQueue.start(); | 
|---|
| 1040 | processingQueue1.start();; | 
|---|
| 1041 |  | 
|---|
| 1042 | actrun = make_shared<RUN_CTRL2>(); | 
|---|
| 1043 |  | 
|---|
| 1044 | //time in seconds | 
|---|
| 1045 | time_t gi_SecTime = time(NULL)-1; | 
|---|
| 1046 |  | 
|---|
| 1047 | //loop until global variable g_runStat claims stop | 
|---|
| 1048 | g_reset = 0; | 
|---|
| 1049 | while (g_reset == 0) | 
|---|
| 1050 | { | 
|---|
| 1051 | #ifdef USE_POLL | 
|---|
| 1052 | int    pp[40]; | 
|---|
| 1053 | int    nn = 0; | 
|---|
| 1054 | pollfd fds[40]; | 
|---|
| 1055 | for (int i=0; i<40; i++) | 
|---|
| 1056 | { | 
|---|
| 1057 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) | 
|---|
| 1058 | { | 
|---|
| 1059 | fds[nn].fd = rd[i].socket; | 
|---|
| 1060 | fds[nn].events = POLLIN; | 
|---|
| 1061 | pp[nn] = i; | 
|---|
| 1062 | nn++; | 
|---|
| 1063 | } | 
|---|
| 1064 | } | 
|---|
| 1065 |  | 
|---|
| 1066 | const int rc_epoll = poll(fds, nn, 100); | 
|---|
| 1067 | if (rc_epoll<0) | 
|---|
| 1068 | break; | 
|---|
| 1069 | #endif | 
|---|
| 1070 |  | 
|---|
| 1071 | #ifdef USE_SELECT | 
|---|
| 1072 | fd_set readfs; | 
|---|
| 1073 | FD_ZERO(&readfs); | 
|---|
| 1074 | int nfsd = 0; | 
|---|
| 1075 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1076 | if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0) | 
|---|
| 1077 | { | 
|---|
| 1078 | FD_SET(rd[i].socket, &readfs); | 
|---|
| 1079 | if (rd[i].socket>nfsd) | 
|---|
| 1080 | nfsd = rd[i].socket; | 
|---|
| 1081 | } | 
|---|
| 1082 |  | 
|---|
| 1083 | timeval tv; | 
|---|
| 1084 | tv.tv_sec = 0; | 
|---|
| 1085 | tv.tv_usec = 100000; | 
|---|
| 1086 | const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv); | 
|---|
| 1087 | // 0: timeout | 
|---|
| 1088 | // -1: error | 
|---|
| 1089 | if (rc_select<0) | 
|---|
| 1090 | { | 
|---|
| 1091 | factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno); | 
|---|
| 1092 | continue; | 
|---|
| 1093 | } | 
|---|
| 1094 | #endif | 
|---|
| 1095 |  | 
|---|
| 1096 | #ifdef USE_EPOLL | 
|---|
| 1097 | const int rc_epoll = READ_STRUCT::wait(); | 
|---|
| 1098 | if (rc_epoll<0) | 
|---|
| 1099 | break; | 
|---|
| 1100 | #endif | 
|---|
| 1101 |  | 
|---|
| 1102 | #ifdef PRIORITY_QUEUE | 
|---|
| 1103 | priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio; | 
|---|
| 1104 |  | 
|---|
| 1105 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1106 | if (rd[i].connected) | 
|---|
| 1107 | prio.push(rd+i); | 
|---|
| 1108 |  | 
|---|
| 1109 | if (!prio.empty()) do | 
|---|
| 1110 | #endif | 
|---|
| 1111 |  | 
|---|
| 1112 |  | 
|---|
| 1113 | #ifdef USE_POLL | 
|---|
| 1114 | for (int jj=0; jj<nn; jj++) | 
|---|
| 1115 | #endif | 
|---|
| 1116 | #ifdef USE_EPOLL | 
|---|
| 1117 | for (int jj=0; jj<rc_epoll; jj++) | 
|---|
| 1118 | #endif | 
|---|
| 1119 | #if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE) | 
|---|
| 1120 | for (int jj=0; jj<NBOARDS; jj++) | 
|---|
| 1121 | #endif | 
|---|
| 1122 | { | 
|---|
| 1123 | #ifdef PRIORITY_QUEUE | 
|---|
| 1124 | READ_STRUCT *rs = prio.top(); | 
|---|
| 1125 | #endif | 
|---|
| 1126 | #ifdef USE_SELECT | 
|---|
| 1127 | if (!FD_ISSET(rs->socket, &readfs)) | 
|---|
| 1128 | continue; | 
|---|
| 1129 | #endif | 
|---|
| 1130 |  | 
|---|
| 1131 | #ifdef USE_POLL | 
|---|
| 1132 | if ((fds[jj].revents&POLLIN)==0) | 
|---|
| 1133 | continue; | 
|---|
| 1134 | #endif | 
|---|
| 1135 |  | 
|---|
| 1136 | #ifdef USE_EPOLL | 
|---|
| 1137 | // FIXME: How to get i? | 
|---|
| 1138 | READ_STRUCT *rs = READ_STRUCT::get(jj); | 
|---|
| 1139 | #endif | 
|---|
| 1140 |  | 
|---|
| 1141 | #ifdef USE_POLL | 
|---|
| 1142 | // FIXME: How to get i? | 
|---|
| 1143 | READ_STRUCT *rs = &rd[pp[jj]]; | 
|---|
| 1144 | #endif | 
|---|
| 1145 |  | 
|---|
| 1146 | #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE) | 
|---|
| 1147 | const int i = (jj%4)*10 + (jj/4); | 
|---|
| 1148 | READ_STRUCT *rs = &rd[i]; | 
|---|
| 1149 | #endif | 
|---|
| 1150 |  | 
|---|
| 1151 | #ifdef COMPLETE_EVENTS | 
|---|
| 1152 | if (rs->bufTyp==READ_STRUCT::kWait) | 
|---|
| 1153 | continue; | 
|---|
| 1154 | #endif | 
|---|
| 1155 |  | 
|---|
| 1156 | // ================================================================== | 
|---|
| 1157 |  | 
|---|
| 1158 | const bool rc_read = rs->read(); | 
|---|
| 1159 |  | 
|---|
| 1160 | // Connect might have gotten closed during read | 
|---|
| 1161 | gi_NumConnect[rs->sockId] = rs->connected; | 
|---|
| 1162 | gj.numConn[rs->sockId]    = rs->connected; | 
|---|
| 1163 |  | 
|---|
| 1164 | // Read either failed or disconnected, or the buffer is not yet full | 
|---|
| 1165 | if (!rc_read) | 
|---|
| 1166 | continue; | 
|---|
| 1167 |  | 
|---|
| 1168 | // ================================================================== | 
|---|
| 1169 |  | 
|---|
| 1170 | if (rs->bufTyp==READ_STRUCT::kHeader) | 
|---|
| 1171 | { | 
|---|
| 1172 | //check if startflag correct; else shift block .... | 
|---|
| 1173 | // FIXME: This is not enough... this combination of | 
|---|
| 1174 | //        bytes can be anywhere... at least the end bytes | 
|---|
| 1175 | //        must be checked somewhere, too. | 
|---|
| 1176 | uint k; | 
|---|
| 1177 | for (k=0; k<sizeof(PEVNT_HEADER)-1; k++) | 
|---|
| 1178 | { | 
|---|
| 1179 | //if (rs->B[k]==0xfb && rs->B[k+1] == 0x01) | 
|---|
| 1180 | if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0x01fb) | 
|---|
| 1181 | break; | 
|---|
| 1182 | } | 
|---|
| 1183 | rs->skip += k; | 
|---|
| 1184 |  | 
|---|
| 1185 | //no start of header found | 
|---|
| 1186 | if (k==sizeof(PEVNT_HEADER)-1) | 
|---|
| 1187 | { | 
|---|
| 1188 | rs->B[0]   = rs->B[sizeof(PEVNT_HEADER)-1]; | 
|---|
| 1189 | rs->bufPos = rs->B+1; | 
|---|
| 1190 | rs->bufLen = sizeof(PEVNT_HEADER)-1; | 
|---|
| 1191 | continue; | 
|---|
| 1192 | } | 
|---|
| 1193 |  | 
|---|
| 1194 | if (k > 0) | 
|---|
| 1195 | { | 
|---|
| 1196 | memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k); | 
|---|
| 1197 |  | 
|---|
| 1198 | rs->bufPos -= k; | 
|---|
| 1199 | rs->bufLen += k; | 
|---|
| 1200 |  | 
|---|
| 1201 | continue; // We need to read more (bufLen>0) | 
|---|
| 1202 | } | 
|---|
| 1203 |  | 
|---|
| 1204 | if (rs->skip>0) | 
|---|
| 1205 | { | 
|---|
| 1206 | factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId); | 
|---|
| 1207 | rs->skip = 0; | 
|---|
| 1208 | } | 
|---|
| 1209 |  | 
|---|
| 1210 | // Swap the header entries from network to host order | 
|---|
| 1211 | rs->swapHeader(); | 
|---|
| 1212 |  | 
|---|
| 1213 | rs->bufTyp = READ_STRUCT::kData; | 
|---|
| 1214 | rs->bufLen = rs->len() - sizeof(PEVNT_HEADER); | 
|---|
| 1215 |  | 
|---|
| 1216 | debugHead(rs->B);  // i and fadBoard not used | 
|---|
| 1217 |  | 
|---|
| 1218 | continue; | 
|---|
| 1219 | } | 
|---|
| 1220 |  | 
|---|
| 1221 | const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2); | 
|---|
| 1222 | if (end != 0xfe04) | 
|---|
| 1223 | { | 
|---|
| 1224 | factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x", | 
|---|
| 1225 | rs->sockId, rs->H.fad_evt_counter, rs->len(), end); | 
|---|
| 1226 |  | 
|---|
| 1227 | // ready to read next header | 
|---|
| 1228 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1229 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1230 | rs->bufPos = rs->B; | 
|---|
| 1231 | // FIXME: What to do with the validity flag? | 
|---|
| 1232 | continue; | 
|---|
| 1233 | } | 
|---|
| 1234 |  | 
|---|
| 1235 | // get index into mBuffer for this event (create if needed) | 
|---|
| 1236 | const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun); | 
|---|
| 1237 |  | 
|---|
| 1238 | // We have a valid entry, but no memory has yet been allocated | 
|---|
| 1239 | if (evt && !evt->initMemory()) | 
|---|
| 1240 | { | 
|---|
| 1241 | const time_t tm = time(NULL); | 
|---|
| 1242 | if (evt->runCtrl->reportMem==tm) | 
|---|
| 1243 | continue; | 
|---|
| 1244 |  | 
|---|
| 1245 | factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum); | 
|---|
| 1246 | evt->runCtrl->reportMem = tm; | 
|---|
| 1247 | continue; | 
|---|
| 1248 | } | 
|---|
| 1249 |  | 
|---|
| 1250 | // ready to read next header | 
|---|
| 1251 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1252 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1253 | rs->bufPos = rs->B; | 
|---|
| 1254 |  | 
|---|
| 1255 | // Fatal error occured. Event cannot be processed. Skip it. Start reading next header. | 
|---|
| 1256 | if (!evt) | 
|---|
| 1257 | continue; | 
|---|
| 1258 |  | 
|---|
| 1259 | // This should never happen | 
|---|
| 1260 | if (evt->board[rs->sockId] != -1) | 
|---|
| 1261 | { | 
|---|
| 1262 | factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.", | 
|---|
| 1263 | evt->evNum, rs->sockId, rs->sockId, rs->len()); | 
|---|
| 1264 | // FIXME: What to do with the validity flag? | 
|---|
| 1265 | continue; // Continue reading next header | 
|---|
| 1266 | } | 
|---|
| 1267 |  | 
|---|
| 1268 | // Swap the data entries (board headers) from network to host order | 
|---|
| 1269 | rs->swapData(); | 
|---|
| 1270 |  | 
|---|
| 1271 | // Copy data from rd[i] to mBuffer[evID] | 
|---|
| 1272 | copyData(*rs, evt.get()); | 
|---|
| 1273 |  | 
|---|
| 1274 | #ifdef COMPLETE_EVENTS | 
|---|
| 1275 | // Do not read anmymore from this board until the whole event has been received | 
|---|
| 1276 | rs->bufTyp = READ_STRUCT::kWait; | 
|---|
| 1277 | #endif | 
|---|
| 1278 | // now we have stored a new board contents into Event structure | 
|---|
| 1279 | evt->board[rs->sockId] = rs->sockId; | 
|---|
| 1280 | evt->header = evt->FADhead+rs->sockId; | 
|---|
| 1281 | evt->nBoard++; | 
|---|
| 1282 |  | 
|---|
| 1283 | #ifdef COMPLETE_EPOLL | 
|---|
| 1284 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0) | 
|---|
| 1285 | { | 
|---|
| 1286 | factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno); | 
|---|
| 1287 | break; | 
|---|
| 1288 | } | 
|---|
| 1289 | #endif | 
|---|
| 1290 | // event not yet complete | 
|---|
| 1291 | if (evt->nBoard < READ_STRUCT::activeSockets) | 
|---|
| 1292 | continue; | 
|---|
| 1293 |  | 
|---|
| 1294 | // All previous events are now flagged as incomplete ("expired") | 
|---|
| 1295 | // and will be removed. (This is a bit tricky, because pop_front() | 
|---|
| 1296 | // would invalidate the current iterator if not done _after_ the increment) | 
|---|
| 1297 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) | 
|---|
| 1298 | { | 
|---|
| 1299 | const bool found = it->get()==evt.get(); | 
|---|
| 1300 | if (!found) | 
|---|
| 1301 | reportIncomplete(*it, "expired"); | 
|---|
| 1302 | else | 
|---|
| 1303 | primaryQueue.post(evt); | 
|---|
| 1304 |  | 
|---|
| 1305 | // package_len is 0 if nothing was received. | 
|---|
| 1306 | for (int ib=0; ib<40; ib++) | 
|---|
| 1307 | rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2; | 
|---|
| 1308 |  | 
|---|
| 1309 | it++; | 
|---|
| 1310 | evtCtrl.pop_front(); | 
|---|
| 1311 |  | 
|---|
| 1312 | // We reached the current event, so we are done | 
|---|
| 1313 | if (found) | 
|---|
| 1314 | break; | 
|---|
| 1315 | } | 
|---|
| 1316 |  | 
|---|
| 1317 | #ifdef COMPLETE_EPOLL | 
|---|
| 1318 | for (int j=0; j<40; j++) | 
|---|
| 1319 | { | 
|---|
| 1320 | epoll_event ev; | 
|---|
| 1321 | ev.events = EPOLLIN; | 
|---|
| 1322 | ev.data.ptr = &rd[j];  // user data (union: ev.ptr) | 
|---|
| 1323 | if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0) | 
|---|
| 1324 | { | 
|---|
| 1325 | factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno); | 
|---|
| 1326 | return; | 
|---|
| 1327 | } | 
|---|
| 1328 | } | 
|---|
| 1329 | #endif | 
|---|
| 1330 |  | 
|---|
| 1331 | #ifdef COMPLETE_EVENTS | 
|---|
| 1332 | for (int j=0; j<40; j++) | 
|---|
| 1333 | { | 
|---|
| 1334 | //if (rs->bufTyp==READ_STRUCT::kWait) | 
|---|
| 1335 | { | 
|---|
| 1336 | rs->bufTyp = READ_STRUCT::kHeader; | 
|---|
| 1337 | rs->bufLen = sizeof(PEVNT_HEADER); | 
|---|
| 1338 | rs->bufPos = rs->B; | 
|---|
| 1339 | } | 
|---|
| 1340 | } | 
|---|
| 1341 | #endif | 
|---|
| 1342 | } // end for loop over all sockets | 
|---|
| 1343 | #ifdef PRIORITY_QUEUE | 
|---|
| 1344 | while (0); // convert continue into break ;) | 
|---|
| 1345 | #endif | 
|---|
| 1346 |  | 
|---|
| 1347 | // ================================================================== | 
|---|
| 1348 |  | 
|---|
| 1349 | const time_t actTime = time(NULL); | 
|---|
| 1350 | if (actTime == gi_SecTime) | 
|---|
| 1351 | { | 
|---|
| 1352 | #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL) | 
|---|
| 1353 | if (evtCtrl.empty()) | 
|---|
| 1354 | usleep(actTime-actrun->lastTime>300 ? 10000 : 1); | 
|---|
| 1355 | #endif | 
|---|
| 1356 | continue; | 
|---|
| 1357 | } | 
|---|
| 1358 | gi_SecTime = actTime; | 
|---|
| 1359 |  | 
|---|
| 1360 | // ================================================================== | 
|---|
| 1361 | //loop over all active events and flag those older than read-timeout | 
|---|
| 1362 | //delete those that are written to disk .... | 
|---|
| 1363 |  | 
|---|
| 1364 | // This could be improved having the pointer which separates the queue with | 
|---|
| 1365 | // the incomplete events from the queue with the complete events | 
|---|
| 1366 | for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); ) | 
|---|
| 1367 | { | 
|---|
| 1368 | // A reference is enough because the shared_ptr is hold by the evtCtrl | 
|---|
| 1369 | const shared_ptr<EVT_CTRL2> &evt = *it; | 
|---|
| 1370 |  | 
|---|
| 1371 | // The first event is the oldest. If the first event within the | 
|---|
| 1372 | // timeout window was received, we can stop searchinf further. | 
|---|
| 1373 | if (evt->time.tv_sec+g_evtTimeout>=actTime) | 
|---|
| 1374 | break; | 
|---|
| 1375 |  | 
|---|
| 1376 | // This will result in the emission of a dim service. | 
|---|
| 1377 | // It doesn't matter if that takes comparably long, | 
|---|
| 1378 | // because we have to stop the run anyway. | 
|---|
| 1379 | const uint64_t rep = reportIncomplete(evt, "timeout"); | 
|---|
| 1380 | factReportIncomplete(rep); | 
|---|
| 1381 |  | 
|---|
| 1382 | // package_len is 0 when nothing was received from this board | 
|---|
| 1383 | for (int ib=0; ib<40; ib++) | 
|---|
| 1384 | rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2; | 
|---|
| 1385 |  | 
|---|
| 1386 | it++; | 
|---|
| 1387 | evtCtrl.pop_front(); | 
|---|
| 1388 | } | 
|---|
| 1389 |  | 
|---|
| 1390 | // ================================================================= | 
|---|
| 1391 |  | 
|---|
| 1392 | gj.bufNew   = evtCtrl.size();            //# incomplete events in buffer | 
|---|
| 1393 | gj.bufEvt   = primaryQueue.size();       //# complete events in buffer | 
|---|
| 1394 | gj.bufWrite = secondaryQueue.size();     //# complete events in buffer | 
|---|
| 1395 | gj.bufProc  = processingQueue1.size();   //# complete events in buffer | 
|---|
| 1396 | gj.bufTot   = Memory::max_inuse/MAX_TOT_MEM; | 
|---|
| 1397 | gj.usdMem   = Memory::max_inuse; | 
|---|
| 1398 | gj.totMem   = Memory::allocated; | 
|---|
| 1399 | gj.maxMem   = g_maxMem; | 
|---|
| 1400 |  | 
|---|
| 1401 | gj.deltaT = 1000; // temporary, must be improved | 
|---|
| 1402 |  | 
|---|
| 1403 | bool changed = false; | 
|---|
| 1404 |  | 
|---|
| 1405 | static vector<uint64_t> store(NBOARDS); | 
|---|
| 1406 |  | 
|---|
| 1407 | for (int ib=0; ib<NBOARDS; ib++) | 
|---|
| 1408 | { | 
|---|
| 1409 | gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib]; | 
|---|
| 1410 | gj.relBytes[ib]  = rd[ib].totBytes-rd[ib].relBytes; | 
|---|
| 1411 |  | 
|---|
| 1412 | store[ib] = rd[ib].totBytes; | 
|---|
| 1413 |  | 
|---|
| 1414 | if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr)) | 
|---|
| 1415 | changed = true; | 
|---|
| 1416 |  | 
|---|
| 1417 | gi_NumConnect[ib] = rd[ib].connected; | 
|---|
| 1418 | gj.numConn[ib]    = rd[ib].connected; | 
|---|
| 1419 | } | 
|---|
| 1420 |  | 
|---|
| 1421 | factStat(gj); | 
|---|
| 1422 |  | 
|---|
| 1423 | Memory::max_inuse = 0; | 
|---|
| 1424 |  | 
|---|
| 1425 | // ================================================================= | 
|---|
| 1426 |  | 
|---|
| 1427 | // This is a fake event to trigger possible run-closing conditions once a second | 
|---|
| 1428 | // FIXME: This is not yet ideal because a file would never be closed | 
|---|
| 1429 | //        if a new file has been started and no events of the new file | 
|---|
| 1430 | //        have been received yet | 
|---|
| 1431 | int request = kRequestNone; | 
|---|
| 1432 |  | 
|---|
| 1433 | // If nothing was received for more than 5min, close file | 
|---|
| 1434 | if (actTime-actrun->lastTime>300) | 
|---|
| 1435 | request |= kRequestTimeout; | 
|---|
| 1436 |  | 
|---|
| 1437 | // If connection status has changed | 
|---|
| 1438 | if (changed) | 
|---|
| 1439 | request |= kRequestConnectionChange; | 
|---|
| 1440 |  | 
|---|
| 1441 | if (request!=kRequestNone) | 
|---|
| 1442 | runFinished(); | 
|---|
| 1443 |  | 
|---|
| 1444 | if (actrun->fileStat==kFileOpen) | 
|---|
| 1445 | primaryQueue.emplace(new EVT_CTRL2(request, actrun)); | 
|---|
| 1446 | } | 
|---|
| 1447 |  | 
|---|
| 1448 | //   1: Stop, wait for event to get processed | 
|---|
| 1449 | //   2: Stop, finish immediately | 
|---|
| 1450 | // 101: Restart, wait for events to get processed | 
|---|
| 1451 | // 101: Restart, finish immediately | 
|---|
| 1452 | // | 
|---|
| 1453 | const int gi_reset = g_reset; | 
|---|
| 1454 |  | 
|---|
| 1455 | const bool abort = gi_reset%100==2; | 
|---|
| 1456 |  | 
|---|
| 1457 | factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join"); | 
|---|
| 1458 |  | 
|---|
| 1459 | primaryQueue.wait(abort); | 
|---|
| 1460 | secondaryQueue.wait(abort); | 
|---|
| 1461 | processingQueue1.wait(abort); | 
|---|
| 1462 |  | 
|---|
| 1463 | // Here we also destroy all runCtrl structures and hence close all open files | 
|---|
| 1464 | evtCtrl.clear(); | 
|---|
| 1465 | actrun.reset(); | 
|---|
| 1466 |  | 
|---|
| 1467 | factPrintf(MessageImp::kInfo, "Exit read Process..."); | 
|---|
| 1468 | factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse); | 
|---|
| 1469 |  | 
|---|
| 1470 | factStat(gj); | 
|---|
| 1471 |  | 
|---|
| 1472 | return gi_reset>=100; | 
|---|
| 1473 | } | 
|---|
| 1474 |  | 
|---|
| 1475 | // ========================================================================== | 
|---|
| 1476 | // ========================================================================== | 
|---|
| 1477 |  | 
|---|
| 1478 | void StartEvtBuild() | 
|---|
| 1479 | { | 
|---|
| 1480 | factPrintf(MessageImp::kInfo, "Starting EventBuilder++"); | 
|---|
| 1481 |  | 
|---|
| 1482 | memset(gi_NumConnect, 0, NBOARDS*sizeof(*gi_NumConnect)); | 
|---|
| 1483 |  | 
|---|
| 1484 | memset(&gj, 0, sizeof(GUI_STAT)); | 
|---|
| 1485 |  | 
|---|
| 1486 | gj.usdMem   = Memory::inuse; | 
|---|
| 1487 | gj.totMem   = Memory::allocated; | 
|---|
| 1488 | gj.maxMem   = g_maxMem; | 
|---|
| 1489 |  | 
|---|
| 1490 |  | 
|---|
| 1491 | READ_STRUCT rd[NBOARDS]; | 
|---|
| 1492 |  | 
|---|
| 1493 | // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets) | 
|---|
| 1494 | for (int i=0; i<NBOARDS; i++) | 
|---|
| 1495 | rd[i].sockId = i; | 
|---|
| 1496 |  | 
|---|
| 1497 | while (mainloop(rd)); | 
|---|
| 1498 |  | 
|---|
| 1499 | //must close all open sockets ... | 
|---|
| 1500 | factPrintf(MessageImp::kInfo, "Close all sockets..."); | 
|---|
| 1501 |  | 
|---|
| 1502 | READ_STRUCT::close(); | 
|---|
| 1503 |  | 
|---|
| 1504 | // Now all sockets get closed. This is not reflected in gi_NumConnect | 
|---|
| 1505 | // The current workaround is to count all sockets as closed when the thread is not running | 
|---|
| 1506 | } | 
|---|