Changeset 16040


Ignore:
Timestamp:
05/22/13 18:28:01 (11 years ago)
Author:
tbretz
Message:
First working version of the event builder in C++
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.cc

    r15512 r16040  
    1 // #define EVTDEBUG
    2 
    3 #define NUMSOCK   1          //set to 7 for old configuration
    4 #define MAXREAD  65536       //64kB wiznet buffer
    5 
    6 #include <stdlib.h>
    7 #include <stdint.h>
    8 #include <stdarg.h>
    9 #include <unistd.h>
    10 #include <stdio.h>
    111#include <sys/time.h>
    12 #include <arpa/inet.h>
    13 #include <string.h>
    14 #include <math.h>
    15 #include <error.h>
    16 #include <errno.h>
    17 #include <unistd.h>
    18 #include <sys/types.h>
    19 #include <sys/socket.h>
    20 #include <netinet/in.h>
     2#include <sys/epoll.h>
    213#include <netinet/tcp.h>
    22 #include <pthread.h>
    23 #include <sched.h>                             
    24 
    25 #include <memory>
    26 #include <deque>
    27 #include <map>
     4
     5#include <cstring>
     6#include <cstdarg>
     7#include <list>
     8#include <forward_list>
    289
    2910#include "queue.h"
    3011
     12#include "MessageImp.h"
     13
     14using namespace std;
     15
    3116#include "EventBuilder.h"
    3217
    33 enum Severity
    34 {
    35    kMessage = 10,               ///< Just a message, usually obsolete
    36    kInfo = 20,                  ///< An info telling something which can be interesting to know
    37    kWarn = 30,                  ///< A warning, things that somehow might result in unexpected or unwanted bahaviour
    38    kError = 40,                 ///< Error, something unexpected happened, but can still be handled by the program
    39    kFatal = 50,                 ///< An error which cannot be handled at all happend, the only solution is program termination
    40    kDebug = 99,                 ///< A message used for debugging only
    41 };
    42 
    43 using namespace std;
    44 
    45 #define MIN_LEN  32             // min #bytes needed to interpret FADheader
    46 #define MAX_LEN 256*1024        // size of read-buffer per socket
    47 
    48 //#define nanosleep(x,y)
    49 
    50 extern FileHandle_t runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len);
    51 extern int runWrite (FileHandle_t fileHd, EVENT * event, size_t len);
    52 extern int runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len);
    53 //extern int runFinish (uint32_t runnr);
    54 
    55 extern "C" void factOut (int severity, int err, char *message);
    56 extern void factReportIncomplete (uint64_t rep);
    57 
    58 extern "C" void gotNewRun (int runnr, PEVNT_HEADER * headers);
    59 
    60 
    61 extern void factStat (GUI_STAT gj);
    62 
    63 extern void factStatNew (EVT_STAT gi);
    64 
    65 extern "C" int eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event);
    66 
    67 extern "C" int subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
    68                        int8_t * buffer);
    69 
    70 extern void debugHead (int i, int j, void *buf);
    71 
    72 extern void debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt,
    73                        int32_t runnr, int state, uint32_t tsec,
    74                        uint32_t tusec);
    75 extern void debugStream (int isock, void *buf, int len);
    76 
    77 int CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt);
    78 
    79 int g_maxProc;
    80 int gi_maxProc;
    81 
    82 uint g_actTime;
    83 int g_runStat;
     18#define MIN_LEN  32           // min #bytes needed to interpret FADheader
     19#define MAX_LEN (36*3*1024)   // (data+header)*num channels
     20
     21#define COMPLETE_EVENTS
     22//#define USE_EPOLL
     23//#define USE_SELECT
     24
     25// ==========================================================================
     26
     27bool runOpen(const shared_ptr<EVT_CTRL2> &evt);
     28bool runWrite(const shared_ptr<EVT_CTRL2> &evt);
     29void runClose();
     30void applyCalib(const shared_ptr<EVT_CTRL2> &evt);
     31void factOut(int severity, const char *message);
     32void factReportIncomplete (uint64_t rep);
     33void gotNewRun(RUN_CTRL2 &run);
     34void runFinished();
     35void factStat(GUI_STAT gj);
     36int  eventCheck(const shared_ptr<EVT_CTRL2> &evt);
     37void debugHead(void *buf);
     38
     39// ==========================================================================
     40
    8441int g_reset;
    8542
     
    9047uint gi_NumConnect[NBOARDS];    //4 crates * 10 boards
    9148
    92 //EVT_STAT gi;
    9349GUI_STAT gj;
    9450
    95 #define MAX_EVT   65536  // ( 300s @ 220Hz; 16GB = 5000 evt @ roi=1024 (27s) ; 18000 evt @ roi = 300 )
    96 #define MAX_RUN       8  // Number of concurrent runs
    97 
    98 void factPrintf(int severity, int id, const char *fmt, ...)
     51// ==========================================================================
     52
     53void factPrintf(int severity, const char *fmt, ...)
    9954{
    10055    char str[1000];
     
    10560    va_end(ap);
    10661
    107     factOut(severity, id, str);
    108 }
    109 
     62    factOut(severity, str);
     63}
     64
     65// ==========================================================================
    11066
    11167#define MAX_HEAD_MEM (NBOARDS * sizeof(PEVNT_HEADER))
    11268#define MAX_TOT_MEM (sizeof(EVENT) + (NPIX+NTMARK)*1024*2 + MAX_HEAD_MEM)
    113 typedef struct TGB_struct
    114 {
    115     struct TGB_struct *prev;
    116     void *mem;
    117 } TGB_entry;
    118 
    119 TGB_entry  *tgb_last = NULL;
    120 uint64_t    tgb_memory = 0;
    121 uint64_t    tgb_inuse  = 0;
    122 
    123 void *TGB_Malloc()
    124 {
    125     // No free slot available, next alloc would exceed max memory
    126     if (!tgb_last && tgb_memory+MAX_TOT_MEM>g_maxMem)
    127         return NULL;
    128 
    129     // We will return this amount of memory
    130     tgb_inuse += MAX_TOT_MEM;
    131 
    132     // No free slot available, allocate a new one
    133     if (!tgb_last)
    134     {
    135         tgb_memory += MAX_TOT_MEM;
    136         return malloc(MAX_TOT_MEM);
    137     }
    138 
    139     // Get the next free slot from the stack and return it
    140     TGB_entry *last = tgb_last;
    141 
    142     void *mem = last->mem;
    143     tgb_last  = last->prev;
    144 
    145     free(last);
    146 
    147     return mem;
     69
     70namespace Memory
     71{
     72    uint64_t inuse     = 0;
     73    uint64_t allocated = 0;
     74
     75    uint64_t max_inuse = 0;
     76
     77    mutex mtx;
     78
     79    forward_list<void*> memory;
     80
     81    void *malloc()
     82    {
     83        // No free slot available, next alloc would exceed max memory
     84        if (memory.empty() && allocated+MAX_TOT_MEM>g_maxMem)
     85            return NULL;
     86
     87        // We will return this amount of memory
     88        // This is not 100% thread safe, but it is not a super accurate measure anyway
     89        inuse += MAX_TOT_MEM;
     90        if (inuse>max_inuse)
     91            max_inuse = inuse;
     92
     93        void *mem = NULL;
     94
     95        if (memory.empty())
     96        {
     97            // No free slot available, allocate a new one
     98            allocated += MAX_TOT_MEM;
     99            mem = new char[MAX_TOT_MEM];
     100        }
     101        else
     102        {
     103            // Get the next free slot from the stack and return it
     104            const lock_guard<mutex> lock(mtx);
     105            mem = memory.front();
     106            memory.pop_front();
     107        }
     108
     109        memset(mem, 0, MAX_HEAD_MEM);
     110        return mem;
     111    };
     112
     113    void free(void *mem)
     114    {
     115        if (!mem)
     116            return;
     117
     118        // Decrease the amont of memory in use accordingly
     119        inuse -= MAX_TOT_MEM;
     120
     121        // If the maximum memory has changed, we might be over the limit.
     122        // In this case: free a slot
     123        if (allocated>g_maxMem)
     124        {
     125            delete [] (char*)mem;
     126            allocated -= MAX_TOT_MEM;
     127            return;
     128        }
     129
     130        const lock_guard<mutex> lock(mtx);
     131        memory.push_front(mem);
     132    }
    148133};
    149134
    150 void TGB_free(void *mem)
    151 {
    152     if (!mem)
     135// ==========================================================================
     136
     137struct READ_STRUCT
     138{
     139    enum buftyp_t
     140    {
     141        kStream,
     142        kHeader,
     143        kData,
     144#ifdef COMPLETE_EVENTS
     145        kWait
     146#endif
     147    };
     148
     149    // ---------- connection ----------
     150
     151    static uint activeSockets;
     152
     153    int  sockId;       // socket id (board number)
     154    int  socket;       // socket handle
     155    bool connected;    // is this socket connected?
     156
     157    struct sockaddr_in SockAddr;  // Socket address copied from wrapper during socket creation
     158
     159    // ------------ epoll -------------
     160
     161    static int  fd_epoll;
     162    static epoll_event events[NBOARDS];
     163
     164    static void init();
     165    static void close();
     166    static int  wait();
     167    static READ_STRUCT *get(int i) { return reinterpret_cast<READ_STRUCT*>(events[i].data.ptr); }
     168
     169    // ------------ buffer ------------
     170
     171    buftyp_t  bufTyp;  // what are we reading at the moment: 0=header 1=data -1=skip ...
     172
     173    uint32_t  bufLen;  // number of bytes left to read
     174    uint8_t  *bufPos;  // next byte to read to the buffer next
     175
     176    union
     177    {
     178        uint8_t  B[MAX_LEN];
     179        uint16_t S[MAX_LEN / 2];
     180        uint32_t I[MAX_LEN / 4];
     181        uint64_t L[MAX_LEN / 8];
     182        PEVNT_HEADER H;
     183    };
     184
     185    uint64_t rateBytes;
     186    uint32_t skip;     // number of bytes skipped before start of event
     187    bool     repmem;   // reportet no mmemory free
     188
     189    uint32_t len() const { return uint32_t(H.package_length)*2; }
     190
     191    void swapHeader();
     192    void swapData();
     193
     194    // --------------------------------
     195
     196    READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
     197    {
     198        if (fd_epoll<0)
     199            init();
     200    }
     201    ~READ_STRUCT()
     202    {
     203        destroy();
     204    }
     205
     206    void destroy();
     207    bool create(sockaddr_in addr);
     208    void check(int, sockaddr_in addr);
     209    bool read();
     210};
     211
     212int READ_STRUCT::wait()
     213{
     214    // wait for something to do...
     215    const int rc = epoll_wait(fd_epoll, events, NBOARDS, 10); // max, timeout[ms]
     216    if (rc>=0)
     217        return rc;
     218
     219    if (errno==EINTR) // timout or signal interruption
     220        return 0;
     221
     222    factPrintf(MessageImp::kError, "epoll_wait failed: %m (rc=%d)", errno);
     223    return -1;
     224}
     225
     226uint READ_STRUCT::activeSockets = 0;
     227int READ_STRUCT::fd_epoll = -1;
     228epoll_event READ_STRUCT::events[NBOARDS];
     229
     230void READ_STRUCT::init()
     231{
     232    if (fd_epoll>=0)
    153233        return;
    154234
    155     // Add the last free slot to the stack
    156     TGB_entry *entry = (TGB_entry*)malloc(sizeof(TGB_entry));
    157 
    158     // FIXME: Really free memory if memory usuage exceeds g_maxMem
    159 
    160     entry->prev = tgb_last;
    161     entry->mem  = mem;
    162 
    163     tgb_last = entry;
    164 
    165     // Decrease the amont of memory in use accordingly
    166     tgb_inuse -= MAX_TOT_MEM;
    167 
    168     gj.usdMem = tgb_inuse;
    169     gj.bufTot--;
    170 }
    171 
    172 //RUN_CTRL runCtrl[MAX_RUN];
    173 
    174 /*
    175 *** Definition of rdBuffer to read in IP packets; keep it global !!!!
    176  */
    177 
    178 typedef union
    179 {
    180    uint8_t B[MAX_LEN];
    181    uint16_t S[MAX_LEN / 2];
    182    uint32_t I[MAX_LEN / 4];
    183    uint64_t L[MAX_LEN / 8];
    184 } CNV_FACT;
    185 
    186 typedef struct
    187 {
    188    int bufTyp;                  //what are we reading at the moment: 0=header 1=data -1=skip ...
    189    int32_t bufPos;              //next byte to read to the buffer next
    190    int32_t bufLen;              //number of bytes left to read
    191    int32_t skip;                //number of bytes skipped before start of event
    192 
    193    int errCnt;                  //how often connect failed since last successful
    194    int sockStat;                //-1 if socket not yet connected  , 99 if not exist
    195    int socket;                  //contains the sockets
    196 
    197    struct sockaddr_in SockAddr; //IP for each socket
    198 
    199    int evtID;                   // event ID of event currently read
    200    int runID;                   // run       "
    201    int ftmID;                   // event ID from FTM
    202    uint fadLen;                 // FADlength of event currently read
    203    int fadVers;                 // Version of FAD
    204    int ftmTyp;                  // trigger type
    205    int Port;
    206 
    207    CNV_FACT *rBuf;
    208 
    209 } READ_STRUCT;
    210 
    211 /*-----------------------------------------------------------------*/
    212 
    213 
    214 /*-----------------------------------------------------------------*/
    215 
    216 
    217 int
    218 GenSock (int flag, int sid, int port, struct sockaddr_in *sockAddr,
    219          READ_STRUCT * rs)
    220 {
    221 /*
    222 *** generate Address, create sockets and allocates readbuffer for it
    223 ***
    224 *** if flag==0 generate socket and buffer
    225 ***         <0 destroy socket and buffer
    226 ***         >0 close and redo socket
    227 ***
    228 *** sid : board*7 + port id
    229  */
    230 
    231    //close socket if open
    232    if (rs->sockStat == 0)
    233    {
    234       if (close (rs->socket) > 0) {
    235           factPrintf(kFatal, 771, "Closing socket %d failed: %m (close,rc=%d)", sid, errno);
    236       } else {
    237           factPrintf(kInfo, 771, "Succesfully closed socket %d", sid);
    238       }
    239    }
    240 
    241    rs->sockStat = 99;
    242 
    243    if (flag < 0) {
    244       free (rs->rBuf);          //and never open again
    245       rs->rBuf = NULL;
    246       return 0;
    247    }
    248 
    249 
    250    if (flag == 0) {             //generate address and buffer ...
    251       rs->Port = port;
    252       rs->SockAddr.sin_family = sockAddr->sin_family;
    253       rs->SockAddr.sin_port = htons (port);
    254       rs->SockAddr.sin_addr = sockAddr->sin_addr;
    255 
    256       rs->rBuf = (CNV_FACT*)malloc (sizeof (CNV_FACT));
    257       if (rs->rBuf == NULL) {
    258          factPrintf(kFatal, 774, "Could not create local buffer %d (malloc failed)", sid);
    259          rs->sockStat = 77;
    260          return -3;
    261       }
    262    }
    263 
    264 
    265    if ((rs->socket = socket (PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0) {
    266        factPrintf(kFatal, 773, "Generating socket %d failed: %m (socket,rc=%d)", sid, errno);
    267       rs->sockStat = 88;
    268       return -2;
    269    }
    270 
    271    int optval = 1;
    272    if (setsockopt (rs->socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0) {
    273        factPrintf(kInfo, 173, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
    274    }
    275    optval = 10;                 //start after 10 seconds
    276    if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0) {
    277        factPrintf(kInfo, 173, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
    278    }
    279    optval = 10;                 //do every 10 seconds
    280    if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0) {
    281       factPrintf(kInfo, 173, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
    282    }
    283    optval = 2;                  //close after 2 unsuccessful tries
    284    if (setsockopt (rs->socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0) {
    285       factPrintf(kInfo, 173, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sid, errno);
    286    }
    287 
    288    factPrintf(kInfo, 773, "Successfully generated socket %d", sid);
    289 
    290    rs->sockStat = -1;           //try to (re)open socket
    291    rs->errCnt = 0;
    292    return 0;
    293 
    294 } /*-----------------------------------------------------------------*/
    295 
    296   /*-----------------------------------------------------------------*/
    297 
    298 int checkRoiConsistency(const CNV_FACT *rbuf, int roi[])
     235#ifdef USE_EPOLL
     236    fd_epoll = epoll_create(NBOARDS);
     237    if (fd_epoll<0)
     238    {
     239        factPrintf(MessageImp::kError, "Waiting for data failed: %d (epoll_create,rc=%d)", errno);
     240        return;
     241    }
     242#endif
     243}
     244
     245void READ_STRUCT::close()
     246{
     247#ifdef USE_EPOLL
     248    if (::close(fd_epoll) > 0)
     249        factPrintf(MessageImp::kFatal, "Closing epoll: %m (close,rc=%d)", errno);
     250    else
     251        factPrintf(MessageImp::kInfo, "Succesfully closed epoll");
     252#endif
     253
     254    fd_epoll = -1;
     255}
     256
     257bool READ_STRUCT::create(sockaddr_in sockAddr)
     258{
     259    if (socket>=0)
     260        return false;
     261
     262    const int port = ntohs(sockAddr.sin_port) + 1;
     263
     264    SockAddr.sin_family = sockAddr.sin_family;
     265    SockAddr.sin_addr   = sockAddr.sin_addr;
     266    SockAddr.sin_port   = htons(port);
     267
     268    if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
     269    {
     270        factPrintf(MessageImp::kFatal, "Generating socket %d failed: %m (socket,rc=%d)", sockId, errno);
     271        socket = -1;
     272        return false;
     273    }
     274
     275    int optval = 1;
     276    if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int)) < 0)
     277        factPrintf(MessageImp::kInfo, "Setting SO_KEEPALIVE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
     278
     279    optval = 10;                 //start after 10 seconds
     280    if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int)) < 0)
     281        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPIDLE for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
     282
     283    optval = 10;                 //do every 10 seconds
     284    if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int)) < 0)
     285        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPINTVL for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
     286
     287    optval = 2;                  //close after 2 unsuccessful tries
     288    if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int)) < 0)
     289        factPrintf(MessageImp::kInfo, "Setting TCP_KEEPCNT for socket %d failed: %m (setsockopt,rc=%d)", sockId, errno);
     290
     291    factPrintf(MessageImp::kInfo, "Successfully generated socket %d", sockId);
     292
     293    //connected = false;
     294    activeSockets++;
     295
     296    return true;
     297}
     298
     299void READ_STRUCT::destroy()
     300{
     301    if (socket==-1)
     302        return;
     303
     304#ifdef USE_EPOLL
     305    // strictly speaking this should not be necessary
     306    if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
     307        factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
     308#endif
     309
     310    if (::close(socket) > 0)
     311        factPrintf(MessageImp::kFatal, "Closing socket %d failed: %m (close,rc=%d)", sockId, errno);
     312    else
     313        factPrintf(MessageImp::kInfo, "Succesfully closed socket %d", sockId);
     314
     315    socket = -1;
     316    connected = false;
     317    activeSockets--;
     318}
     319
     320void READ_STRUCT::check(int sockDef, sockaddr_in addr)
     321{
     322    // Continue in the most most likely case (performance)
     323    //if (socket>=0 && sockDef!=0 && connected)
     324    //    return;
     325
     326    // socket open, but should not be open
     327    if (socket>=0 && sockDef==0)
     328        destroy();
     329
     330    // Socket closed, but should be open
     331    if (socket<0 && sockDef!=0)
     332        create(addr); //generate address and socket
     333
     334    // Socket closed
     335    if (socket<0)
     336        return;
     337
     338    // Socket open and connected: Nothing to do
     339    if (connected)
     340        return;
     341
     342    //try to connect if not yet done
     343    const int rc = connect(socket, (struct sockaddr *) &SockAddr, sizeof(SockAddr));
     344    if (rc == -1)
     345        return;
     346
     347    connected = true;
     348
     349    if (sockDef<0)
     350    {
     351        bufTyp = READ_STRUCT::kStream; // full data to be skipped
     352        bufLen = MAX_LEN;              // huge for skipping
     353    }
     354    else
     355    {
     356        bufTyp = READ_STRUCT::kHeader;  // expect a header
     357        bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
     358    }
     359
     360    bufPos = B;  // no byte read so far
     361    skip   = 0;       // start empty
     362    repmem = false;
     363
     364    factPrintf(MessageImp::kInfo, "New connection %d (%d)", sockId, socket);
     365
     366#ifdef USE_EPOLL
     367    epoll_event ev;
     368    ev.events = EPOLLIN;
     369    ev.data.ptr = this;  // user data (union: ev.ptr)
     370    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
     371        factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
     372#endif
     373}
     374
     375bool READ_STRUCT::read()
     376{
     377    if (bufLen==0)
     378        return true;
     379
     380    const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
     381    // recv failed
     382    if (jrd<0)
     383    {
     384        // There was just nothing waiting
     385        if (errno==EWOULDBLOCK || errno==EAGAIN)
     386            return false;
     387
     388        factPrintf(MessageImp::kError, "Reading from socket %d failed: %m (recv,rc=%d)", sockId, errno);
     389        return false;
     390    }
     391
     392    // connection was closed ...
     393    if (jrd==0)
     394    {
     395        factPrintf(MessageImp::kInfo, "Socket %d closed by FAD", sockId);
     396
     397        destroy();//DestroySocket(rd[i]); //generate address and socket
     398        return false;
     399    }
     400
     401    rateBytes += jrd;
     402
     403    // are we skipping this board ...
     404    if (bufTyp==kStream)
     405        return false;
     406
     407    bufPos += jrd;  //==> prepare for continuation
     408    bufLen -= jrd;
     409
     410    // not yet all read
     411    return bufLen==0;
     412}
     413
     414void READ_STRUCT::swapHeader()
     415{
     416    S[1]  = ntohs(S[1]);    // package_length (bytes not swapped!)
     417    S[2]  = ntohs(S[2]);    // version_no
     418    S[3]  = ntohs(S[3]);    // PLLLCK
     419    S[4]  = ntohs(S[4]);    // trigger_crc
     420    S[5]  = ntohs(S[5]);    // trigger_type
     421
     422    I[3]  = ntohl(I[3]);    // trigger_id
     423    I[4]  = ntohl(I[4]);    // fad_evt_counter
     424    I[5]  = ntohl(I[5]);    // REFCLK_frequency
     425
     426    S[12] = ntohs(S[12]);   // board id
     427    S[13] = ntohs(S[13]);   // adc_clock_phase_shift
     428    S[14] = ntohs(S[14]);   // number_of_triggers_to_generate
     429    S[15] = ntohs(S[15]);   // trigger_generator_prescaler
     430
     431    I[10] = ntohl(I[10]);   // runnumber;
     432    I[11] = ntohl(I[11]);   // time;
     433
     434    for (int s=24; s<24+NTemp+NDAC; s++)
     435        S[s] = ntohs(S[s]); // drs_temperature / dac
     436}
     437
     438void READ_STRUCT::swapData()
     439{
     440    // swapEventHeaderBytes: End of the header. to channels now
     441
     442    int i = 36;
     443    for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
     444    {
     445        S[i+0] = ntohs(S[i+0]);//id
     446        S[i+1] = ntohs(S[i+1]);//start_cell
     447        S[i+2] = ntohs(S[i+2]);//roi
     448        S[i+3] = ntohs(S[i+3]);//filling
     449
     450        i += 4+S[i+2];//skip the pixel data
     451    }
     452}
     453
     454// ==========================================================================
     455
     456bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
    299457{
    300458    int xjr = -1;
     
    304462    int roiPtr = sizeof(PEVNT_HEADER)/2 + 2;
    305463
    306     roi[0] = ntohs(rbuf->S[roiPtr]);
     464    roi[0] = ntohs(rd.S[roiPtr]);
    307465
    308466    for (int jr = 0; jr < 9; jr++)
    309467    {
    310         roi[jr] = ntohs(rbuf->S[roiPtr]);
    311 
    312         if (roi[jr]<0 || roi[jr]>1024)
    313         {
    314             factPrintf(kError, 999, "Illegal roi in channel %d (allowed: 0<=roi<=1024)", jr, roi[jr]);
    315             return 0;
     468        roi[jr] = ntohs(rd.S[roiPtr]);
     469
     470        if (roi[jr]>1024)
     471        {
     472            factPrintf(MessageImp::kError, "Illegal roi in channel %d (allowed: roi<=1024)", jr, roi[jr]);
     473            return false;
    316474        }
    317475
     
    326484        for (int kr = 1; kr < 4; kr++)
    327485        {
    328             const int kroi = ntohs(rbuf->S[roiPtr]);
     486            const int kroi = ntohs(rd.S[roiPtr]);
    329487            if (kroi != roi[jr])
    330488            {
     
    340498    {
    341499        if (xkr<0)
    342             factPrintf(kFatal, 1, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
     500            factPrintf(MessageImp::kFatal, "Inconsistent Roi accross chips [DRS=%d], expected %d, got %d", xjr, roi[0], roi[xjr]);
    343501        else
    344             factPrintf(kFatal, 1, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rbuf->S[roiPtr]));
    345 
    346         return 0;
     502            factPrintf(MessageImp::kFatal, "Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.S[roiPtr]));
     503
     504        return false;
    347505    }
    348506
    349507    if (roi[8] < roi[0])
    350508    {
    351         factPrintf(kError, 712, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
    352         //gj.badRoiB++;
    353         //gj.badRoi[b]++;
    354         return 0;
    355     }
    356 
    357     return 1;
    358 }
    359 
    360 deque<shared_ptr<EVT_CTRL>> evtCtrl;
    361 map<int, shared_ptr<RUN_CTRL>> runCtrl;
    362 
    363 void mBufFree(EVT_CTRL *evt)
    364 {
    365     TGB_free(evt->FADhead);
    366 }
    367 
    368 shared_ptr<EVT_CTRL> mBufEvt(const READ_STRUCT *rs)
    369 {
    370     int nRoi[9];
    371     if (!checkRoiConsistency(rs->rBuf, nRoi))
    372         return shared_ptr<EVT_CTRL>();
    373 
    374     const int  evID   = rs->evtID;
    375     const uint runID  = rs->runID;
    376     const int  trgTyp = rs->ftmTyp;
    377     const int  trgNum = rs->ftmID;
    378     const int  fadNum = rs->evtID;
     509        factPrintf(MessageImp::kError, "Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
     510        return false;
     511    }
     512
     513    return true;
     514}
     515
     516list<shared_ptr<EVT_CTRL2>> evtCtrl;
     517
     518shared_ptr<EVT_CTRL2> mBufEvt(const READ_STRUCT &rd, shared_ptr<RUN_CTRL2> &actrun)
     519{
     520    uint16_t nRoi[9];
     521    if (!checkRoiConsistency(rd, nRoi))
     522        return shared_ptr<EVT_CTRL2>();
    379523
    380524    for (auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
    381525    {
    382         const shared_ptr<EVT_CTRL> evt = *it;
     526        // A reference is enough because the evtCtrl holds the shared_ptr anyway
     527        const shared_ptr<EVT_CTRL2> &evt = *it;
    383528
    384529        // If the run is different, go on searching.
     
    386531        // the case of the events, because theoretically, there
    387532        // can be the same run on two different days.
    388         if (runID != evt->runNum)
     533        if (rd.H.runnumber != evt->runNum)
    389534            continue;
    390535
    391536        // If the ID of the new event if higher than the last one stored
    392537        // in that run, we have to assign a new slot (leave the loop)
    393         if (evID > evt->evNum)
     538        if (rd.H.fad_evt_counter > evt->evNum/* && runID == evtCtrl[k].runNum*/)
    394539            break;
    395540
    396         if (evID != evt->evNum)
     541        if (rd.H.fad_evt_counter != evt->evNum/* || runID != evtCtrl[k].runNum*/)
    397542            continue;
    398543
     
    401546        if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
    402547        {
    403             factPrintf(kError, 821, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
     548            factPrintf(MessageImp::kError, "Mismatch of roi within event. Expected roi=%d and roi_tm=%d, got %d and %d.",
    404549                       evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
    405             return shared_ptr<EVT_CTRL>();
     550            return shared_ptr<EVT_CTRL2>();
    406551        }
    407552
    408553        // count for inconsistencies
    409         if (evt->trgNum != trgNum)
     554        if (evt->trgNum != rd.H.trigger_id)
    410555            evt->Errors[0]++;
    411         if (evt->fadNum != fadNum)
    412             evt->Errors[1]++;
    413         if (evt->trgTyp != trgTyp)
     556        if (evt->trgTyp != rd.H.trigger_type)
    414557            evt->Errors[2]++;
    415558
     
    418561    }
    419562
    420     struct timeval tv;
    421     gettimeofday(&tv, NULL);
    422 
    423     auto ir = runCtrl.find(runID);
    424     if (ir==runCtrl.end())
    425     {
    426         shared_ptr<RUN_CTRL> run(new RUN_CTRL);
    427 
    428         run->runId      = runID;
    429         run->roi0       = nRoi[0];  // FIXME: Make obsolete!
    430         run->roi8       = nRoi[8];  // FIXME: Make obsolete!
    431         run->fileId     = -2;
    432         run->lastEvt    = 1;         // Number of events partially started to read
    433         run->actEvt     = 0;         // Number of written events (write)
    434         run->procEvt    = 0;         // Number of successfully checked events (checkEvent)
    435         run->maxEvt     = 999999999; // max number events allowed
    436         run->lastTime   = tv.tv_sec;      // Time when the last event was written
    437         run->closeTime  = tv.tv_sec + 3600 * 24;     //max time allowed
    438 
    439         ir = runCtrl.insert(make_pair(runID, run)).first;
    440     }
    441 
    442     const shared_ptr<RUN_CTRL> run = ir->second;
    443 
    444     if (run->roi0 != nRoi[0] || run->roi8 != nRoi[8])
    445     {
    446         factPrintf(kError, 931, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
    447                    run->roi0, run->roi8, nRoi[0], nRoi[8], runID, evID);
    448         return shared_ptr<EVT_CTRL>();
    449     }
    450 
    451     const shared_ptr<EVT_CTRL> evt(new EVT_CTRL, mBufFree);
    452 
    453     //flag all boards as unused
    454     evt->nBoard = 0;
    455     for (int b=0; b<NBOARDS; b++)
    456         evt->board[b] = -1;
    457 
    458     evt->run       = run;
    459     evt->pcTime[0] = tv.tv_sec;
    460     evt->pcTime[1] = tv.tv_usec;
    461     evt->nRoi      = nRoi[0];
    462     evt->nRoiTM    = nRoi[8];
    463     evt->evNum     = evID;
    464     evt->runNum    = runID;
    465     evt->fadNum    = fadNum;
    466     evt->trgNum    = trgNum;
    467     evt->trgTyp    = trgTyp;
    468     evt->Errors[0] = 0;
    469     evt->Errors[1] = 0;
    470     evt->Errors[2] = 0;
    471     evt->Errors[3] = 0;
    472     evt->fEvent    = NULL;
    473     evt->FADhead   = NULL;
    474 
    475     //    -1:   kInValid
    476     //     0:   kValid
    477     //  1-40:   kIncomplete
    478     //    90:   kIncompleteReported
    479     //   100:   kCompleteEventInBuffer
    480     //  1000+x: kToBeProcessedByThreadX
    481     //  5000:   kToBeWritten
    482     // 10000:   kToBeDeleted
    483 
    484     evt->evtStat = 0;
    485 
     563    if (actrun->runId==rd.H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
     564    {
     565        factPrintf(MessageImp::kError, "Mismatch of roi within run. Expected roi=%d and roi_tm=%d, got %d and %d (runID=%d, evID=%d)",
     566                   actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.H.runnumber, rd.H.fad_evt_counter);
     567        return shared_ptr<EVT_CTRL2>();
     568    }
     569
     570    shared_ptr<EVT_CTRL2> evt(new EVT_CTRL2);
     571
     572    gettimeofday(&evt->time, NULL);
     573
     574    evt->runNum = rd.H.runnumber;
     575    evt->evNum  = rd.H.fad_evt_counter;
     576
     577    evt->trgNum = rd.H.trigger_id;
     578    evt->trgTyp = rd.H.trigger_type;
     579
     580    evt->nRoi   = nRoi[0];
     581    evt->nRoiTM = nRoi[8];
     582
     583    const bool newrun = actrun->runId != rd.H.runnumber;
     584    if (newrun)
     585    {
     586        // Since we have started a new run, we know already when to close the
     587        // previous run in terms of number of events
     588        actrun->maxEvt = actrun->lastEvt;
     589
     590        factPrintf(MessageImp::kInfo, "New run %d (evt=%d) registered with roi=%d and roi_tm=%d, prev=%d",
     591                   rd.H.runnumber, rd.H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
     592
     593        // The new run is the active run now
     594        actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
     595
     596        const time_t &tsec = evt->time.tv_sec;
     597
     598        actrun->openTime  = tsec;
     599        actrun->closeTime = tsec + 3600 * 24; // max time allowed
     600        actrun->runId     = rd.H.runnumber;
     601        actrun->roi0      = nRoi[0];  // FIXME: Make obsolete!
     602        actrun->roi8      = nRoi[8];  // FIXME: Make obsolete!
     603    }
     604
     605    // Increase the number of events we have started to receive in this run
     606    actrun->lastTime = evt->time.tv_sec;  // Time when the last event was received
     607    actrun->lastEvt++;
     608
     609    // Keep pointer to run of this event
     610    evt->runCtrl = actrun;
     611
     612    // Secure access to evtCtrl against access in CloseRunFile
     613    // This should be the last... otherwise we can run into threading issues
     614    // if the event is accessed before it is fully initialized.
    486615    evtCtrl.push_back(evt);
    487616
     617    // Signal the fadctrl that a new run has been started
     618    // Note this is the only place at which we can ensure that
     619    // gotnewRun is called only once
     620    // Note that this will callback CloseRunFile, therefor the event
     621    // must already be in the evtCtrl structure
     622    if (newrun)
     623        gotNewRun(*actrun);
     624
     625    // An event can be the first and the last, but not the last and the first.
     626    // Therefore gotNewRun is called before runFinished.
     627    // runFinished signals that the last event of a run was just received. Processing
     628    // might still be ongoing, but we can start a new run.
     629    const bool cond1 = actrun->lastEvt < actrun->maxEvt;      // max number of events not reached
     630    const bool cond2 = actrun->lastTime < actrun->closeTime;  // max time not reached
     631    if (!cond1 || !cond2)
     632        runFinished();
     633
    488634    return evt;
    489 
    490 } /*-----------------------------------------------------------------*/
    491 
    492 
    493 void initEvent(const shared_ptr<EVT_CTRL> &evt)
    494 {
    495     evt->fEvent = (EVENT*)((char*)evt->FADhead+MAX_HEAD_MEM);
    496     memset(evt->fEvent->Adc_Data, 0, (NPIX+NTMARK)*2*evt->nRoi);
    497 
    498     //flag all pixels as unused
    499     for (int k = 0; k < NPIX; k++)
    500         evt->fEvent->StartPix[k] = -1;
    501 
    502     //flag all TMark as unused
    503     for (int k = 0; k < NTMARK; k++)
    504         evt->fEvent->StartTM[k] = -1;
    505 
    506     evt->fEvent->NumBoards   = 0;
    507     evt->fEvent->SoftTrig    = 0;
    508     evt->fEvent->PCTime      = evt->pcTime[0];
    509     evt->fEvent->PCUsec      = evt->pcTime[1];
    510     evt->fEvent->Roi         = evt->nRoi;
    511     evt->fEvent->RoiTM       = evt->nRoiTM;
    512     evt->fEvent->EventNum    = evt->evNum;
    513     evt->fEvent->TriggerNum  = evt->trgNum;
    514     evt->fEvent->TriggerType = evt->trgTyp;
    515 }
    516 
    517 
    518 uint64_t reportIncomplete(const shared_ptr<EVT_CTRL> &evt, const char *txt)
    519 {
    520     factPrintf(kWarn, 601, "skip incomplete evt (run=%d, evt=%d, %s)",
    521                evt->runNum, evt->evNum, txt);
     635}
     636
     637
     638void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
     639{
     640    const int i = rBuf.sockId;
     641
     642    memcpy(evt->FADhead.get()+i, &rBuf.H, sizeof(PEVNT_HEADER));
     643
     644    int src = sizeof(PEVNT_HEADER) / 2;  // Header is 72 byte = 36 shorts
     645
     646    // consistency of ROIs have been checked already (is it all correct?)
     647    const uint16_t &roi = rBuf.S[src+2];
     648
     649    // different sort in FAD board.....
     650    for (int px = 0; px < 9; px++)
     651    {
     652        for (int drs = 0; drs < 4; drs++)
     653        {
     654            const int16_t pixC = rBuf.S[src+1];    // start-cell
     655            const int16_t pixR = rBuf.S[src+2];    // roi
     656            //here we should check if pixH is correct ....
     657
     658            const int pixS = i*36 + drs*9 + px;
     659
     660            evt->fEvent->StartPix[pixS] = pixC;
     661
     662            memcpy(evt->fEvent->Adc_Data + pixS*roi, &rBuf.S[src+4], roi * 2);
     663
     664            src += 4+pixR;
     665
     666            // Treatment for ch 9 (TM channel)
     667            if (px != 8)
     668                continue;
     669
     670            const int tmS = i*4 + drs;
     671
     672            //and we have additional TM info
     673            if (pixR > roi)
     674            {
     675                evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
     676
     677                memcpy(evt->fEvent->Adc_Data + tmS*roi + NPIX*roi, &rBuf.S[src - roi], roi * 2);
     678            }
     679            else
     680            {
     681                evt->fEvent->StartTM[tmS] = -1;
     682            }
     683        }
     684    }
     685}
     686
     687// ==========================================================================
     688
     689uint64_t reportIncomplete(const shared_ptr<EVT_CTRL2> &evt, const char *txt)
     690{
     691    factPrintf(MessageImp::kWarn, "skip incomplete evt (run=%d, evt=%d, n=%d, %s)",
     692               evt->runNum, evt->evNum, evtCtrl.size(), txt);
    522693
    523694    uint64_t report = 0;
     
    541712        // accoridng to the current connection status, not w.r.t. to the
    542713        // one when the event was taken.
    543         if (gi_NumConnect[ib]<=0) // board not connected
     714        if (gi_NumConnect[ib]==0) // board not connected
    544715        {
    545716            str[ik++] = 'x';
     
    555726    str[ik]   = 0;
    556727
    557     factOut(kWarn, 601, str);
     728    factOut(MessageImp::kWarn, str);
    558729
    559730    return report;
    560731}
    561732
    562 // i == board
    563 void copyData(CNV_FACT *rbuf, int i, const shared_ptr<EVT_CTRL> &evt)
    564 {
    565     // swapEventHeaderBytes: End of the header. to channels now
    566     int eStart = 36;
    567     for (int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
    568     {
    569         rbuf->S[eStart+0] = ntohs(rbuf->S[eStart+0]);//id
    570         rbuf->S[eStart+1] = ntohs(rbuf->S[eStart+1]);//start_cell
    571         rbuf->S[eStart+2] = ntohs(rbuf->S[eStart+2]);//roi
    572         rbuf->S[eStart+3] = ntohs(rbuf->S[eStart+3]);//filling
    573 
    574         eStart += 4+rbuf->S[eStart+2];//skip the pixel data
    575     }
    576 
    577     memcpy(&evt->FADhead[i], rbuf, sizeof(PEVNT_HEADER));
    578 
    579     int src = sizeof(PEVNT_HEADER) / 2;
    580 
    581     // consistency of ROIs have been checked already (is it all correct?)
    582     const int roi = rbuf->S[src+2];
    583 
    584     // different sort in FAD board.....
    585     for (int px = 0; px < 9; px++)
    586     {
    587         for (int drs = 0; drs < 4; drs++)
    588         {
    589             // pixH = rd[i].rBuf->S[src++];    // ID
    590             src++;
    591 
    592             const int pixC = rbuf->S[src++];    // start-cell
    593             const int pixR = rbuf->S[src++];    // roi
    594             //here we should check if pixH is correct ....
    595 
    596             const int pixS = i * 36 + drs * 9 + px;
    597             src++;
    598 
    599             evt->fEvent->StartPix[pixS] = pixC;
    600 
    601             const int dest1 = pixS * roi;
    602             memcpy(&evt->fEvent->Adc_Data[dest1], &rbuf->S[src], roi * 2);
    603 
    604             src += pixR;
    605 
    606             if (px == 8)
     733// ==========================================================================
     734// ==========================================================================
     735
     736Queue<shared_ptr<EVT_CTRL2>> processingQueue1(bind(&applyCalib, placeholders::_1));
     737
     738// If this is not convenient anymore, it could be replaced by
     739// a command queue, to which command+data is posted,
     740// (e.g. runOpen+runInfo, runClose+runInfo, evtWrite+evtInfo)
     741void writeEvt(const shared_ptr<EVT_CTRL2> &evt)
     742{
     743    const shared_ptr<RUN_CTRL2> &run = evt->runCtrl;
     744
     745    bool rc1 = true;
     746
     747    // Is this a valid event or just an empty event to trigger run close?
     748    // If this is not an empty event open the new run-file
     749    // Empty events are there to trigger run-closing conditions
     750    if (evt->runNum>=0)
     751    {
     752        // File not yet open
     753        if (run->fileStat==kFileNotYetOpen)
     754        {
     755            // runOpen will close a previous run, if still open
     756            if (!runOpen(evt))
    607757            {
    608                 const int tmS = i * 4 + drs;
    609 
    610                 //and we have additional TM info
    611                 if (pixR > roi)
     758                factPrintf(MessageImp::kError, "writeEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
     759                run->fileStat = kFileClosed;
     760                return;
     761            }
     762
     763            factPrintf(MessageImp::kInfo, "writeEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
     764            run->fileStat = kFileOpen;
     765        }
     766
     767        // Here we have a valid calibration and can go on with that.
     768        processingQueue1.post(evt);
     769
     770        // File already closed
     771        if (run->fileStat==kFileClosed)
     772            return;
     773
     774        rc1 = runWrite(evt);
     775        if (!rc1)
     776            factPrintf(MessageImp::kError, "writeEvt: Writing event %d for run %d failed (runWrite)", evt->evNum, evt->runNum);
     777    }
     778
     779    const bool cond1 = run->lastEvt < run->maxEvt;      // max number of events not reached
     780    const bool cond2 = run->lastTime < run->closeTime;  // max time not reached
     781    const bool cond3 = rc1;                             // Write successfull
     782
     783    // File is not yet to be closed.
     784    if (cond1 && cond2 && cond3)
     785        return;
     786
     787    runClose();
     788    run->fileStat = kFileClosed;
     789
     790    string str;
     791    if (!cond1) str += to_string(run->maxEvt)+" evts reached";
     792    if (!cond1 && (!cond2 || !cond3)) str += ", ";
     793    if (!cond2) str += to_string(run->closeTime-run->openTime)+"s reached";
     794    if ((!cond1 || !cond2) && !cond3) str += ", ";
     795    if (!cond3) str += "runWrite failed";
     796    factPrintf(MessageImp::kInfo, "File closed because %s",  str.c_str());
     797}
     798
     799Queue<shared_ptr<EVT_CTRL2>> secondaryQueue(bind(&writeEvt, placeholders::_1));
     800
     801void procEvt(const shared_ptr<EVT_CTRL2> &evt)
     802{
     803    if (evt->runNum>=0)
     804    {
     805        evt->fEvent->Errors[0] = evt->Errors[0];
     806        evt->fEvent->Errors[1] = evt->Errors[1];
     807        evt->fEvent->Errors[2] = evt->Errors[2];
     808        evt->fEvent->Errors[3] = evt->Errors[3];
     809
     810        for (int ib=0; ib<NBOARDS; ib++)
     811            evt->fEvent->BoardTime[ib] = evt->FADhead.get()[ib].time;
     812
     813        const int rc = eventCheck(evt);
     814        if (rc < 0)
     815            return;
     816    }
     817
     818    // If file is open post the event for being written
     819    secondaryQueue.post(evt);
     820}
     821
     822// ==========================================================================
     823// ==========================================================================
     824
     825shared_ptr<RUN_CTRL2> actrun; // needed in CloseRunFile
     826
     827/*
     828 task 1-4:
     829
     830 lock1()-lock4();
     831 while (1)
     832 {
     833       wait for signal [lockN];  // unlocked
     834
     835       while (n!=10)
     836         wait sockets;
     837         read;
     838
     839       lockM();
     840       finished[n] = true;
     841       signal(mainloop);
     842       unlockM();
     843 }
     844
     845
     846 mainloop:
     847
     848 while (1)
     849 {
     850       lockM();
     851       while (!finished[0] || !finished[1] ...)
     852          wait for signal [lockM];  // unlocked... signals can be sent
     853       finished[0-1] = false;
     854       unlockM()
     855
     856       copy data to queue    // locked
     857
     858       lockN[0-3];
     859       signalN[0-3];
     860       unlockN[0-3];
     861 }
     862
     863
     864 */
     865
     866/*
     867    while (g_reset)
     868    {
     869        shared_ptr<EVT_CTRL2> evt = new shared_ptr<>;
     870
     871        // Check that all sockets are connected
     872
     873        for (int i=0; i<40; i++)
     874            if (rd[i].connected && epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, NULL)<0)
     875               factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
     876
     877        while (g_reset)
     878        {
     879           if (READ_STRUCT::wait()<0)
     880              break;
     881
     882           if (rc_epoll==0)
     883              break;
     884
     885           for (int jj=0; jj<rc_epoll; jj++)
     886           {
     887              READ_STRUCT *rs = READ_STRUCT::get(jj);
     888              if (!rs->connected)
     889                  continue;
     890
     891              const bool rc_read = rs->read();
     892              if (!rc_read)
     893                  continue;
     894
     895              if (rs->bufTyp==READ_STRUCT::kHeader)
     896              {
     897                  [...]
     898              }
     899
     900              [...]
     901
     902              if (epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
     903                 factPrintf(kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
     904           }
     905
     906           if (once_a_second)
     907           {
     908              if (evt==timeout)
     909                  break;
     910           }
     911        }
     912
     913        if (evt.nBoards==actBoards)
     914            primaryQueue.post(evt);
     915    }
     916*/
     917
     918void CloseRunFile()
     919{
     920    // Create a copy of the shared_ptr to ensure
     921    // is not replaced in the middle of the action
     922    const shared_ptr<RUN_CTRL2> run = actrun;
     923    run->maxEvt = run->lastEvt;
     924}
     925
     926bool mainloop(READ_STRUCT *rd)
     927{
     928    factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
     929
     930    Queue<shared_ptr<EVT_CTRL2>> primaryQueue(bind(&procEvt, placeholders::_1));
     931
     932    primaryQueue.start();
     933    secondaryQueue.start();
     934
     935    actrun = shared_ptr<RUN_CTRL2>(new RUN_CTRL2);
     936
     937    //time in seconds
     938    time_t gi_SecTime = time(NULL)-1;
     939
     940    //loop until global variable g_runStat claims stop
     941    g_reset = 0;
     942    while (g_reset == 0)
     943    {
     944#ifdef USE_SELECT
     945        fd_set readfs;
     946        FD_ZERO(&readfs);
     947        int nfsd = 0;
     948        for (int i=0; i<NBOARDS; i++)
     949            if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
     950            {
     951                FD_SET(rd[i].socket, &readfs);
     952                if (rd[i].socket>nfsd)
     953                    nfsd = rd[i].socket;
     954            }
     955
     956        timeval tv;
     957        tv.tv_sec = 0;
     958        tv.tv_usec = 100;
     959        const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
     960        // 0: timeout
     961        // -1: error
     962        if (rc_select<0)
     963        {
     964            factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
     965            continue;
     966        }
     967#endif
     968
     969#ifdef USE_EPOLL
     970        const int rc_epoll = READ_STRUCT::wait();
     971        if (rc_epoll<0)
     972            break;
     973#endif
     974
     975#ifdef USE_EPOLL
     976        for (int jj=0; jj<rc_epoll; jj++)
     977#else
     978        for (int jj=0; jj<NBOARDS; jj++)
     979#endif
     980        {
     981#ifdef USE_EPOLL
     982            // FIXME: How to get i?
     983            READ_STRUCT *rs = READ_STRUCT::get(jj);
     984#else
     985
     986            const int i = (jj%4)*10 + (jj/4);
     987            READ_STRUCT *rs = &rd[i];
     988            if (!rs->connected)
     989                continue;
     990#endif
     991
     992#ifdef USE_SELECT
     993            if (!FD_ISSET(rs->socket, &readfs))
     994                continue;
     995#endif
     996
     997
     998#ifdef COMPLETE_EVENTS
     999            if (rs->bufTyp==READ_STRUCT::kWait)
     1000                continue;
     1001#endif
     1002
     1003            // ==================================================================
     1004
     1005            const bool rc_read = rs->read();
     1006
     1007            // Connect might have gotten closed during read
     1008            gi_NumConnect[rs->sockId] = rs->connected;
     1009            gj.numConn[rs->sockId]    = rs->connected;
     1010
     1011            // Read either failed or disconnected, or the buffer is not yet full
     1012            if (!rc_read)
     1013                continue;
     1014
     1015            // ==================================================================
     1016
     1017            if (rs->bufTyp==READ_STRUCT::kHeader)
     1018            {
     1019                //check if startflag correct; else shift block ....
     1020                // FIXME: This is not enough... this combination of
     1021                //        bytes can be anywhere... at least the end bytes
     1022                //        must be checked somewhere, too.
     1023                uint k;
     1024                for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
    6121025                {
    613                     const int dest2 = tmS * roi + NPIX * roi;
    614 
    615                     const int srcT = src - roi;
    616                     evt->fEvent->StartTM[tmS] = (pixC + pixR - roi) % 1024;
    617 
    618                     memcpy(&evt->fEvent->Adc_Data[dest2], &rbuf->S[srcT], roi * 2);
     1026                    if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
     1027                    //if (*reinterpret_cast<uint16_t*>(rs->B+k) == 0xfb01)
     1028                        break;
    6191029                }
     1030                rs->skip += k;
     1031
     1032                //no start of header found
     1033                if (k==sizeof(PEVNT_HEADER)-1)
     1034                {
     1035                    rs->B[0]   = rs->B[sizeof(PEVNT_HEADER)-1];
     1036                    rs->bufPos = rs->B+1;
     1037                    rs->bufLen = sizeof(PEVNT_HEADER)-1;
     1038                    continue;
     1039                }
     1040
     1041                if (k > 0)
     1042                {
     1043                    memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
     1044
     1045                    rs->bufPos -= k;
     1046                    rs->bufLen += k;
     1047
     1048                    continue; // We need to read more (bufLen>0)
     1049                }
     1050
     1051                if (rs->skip>0)
     1052                {
     1053                    factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
     1054                    rs->skip = 0;
     1055                }
     1056
     1057                // Swap the header entries from network to host order
     1058                rs->swapHeader();
     1059
     1060                rs->bufTyp = READ_STRUCT::kData;
     1061                rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
     1062
     1063                debugHead(rs->B);  // i and fadBoard not used
     1064
     1065                continue;
     1066            }
     1067
     1068            const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
     1069            if (end != 0xfe04)
     1070            {
     1071                factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
     1072                           rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
     1073
     1074                // ready to read next header
     1075                rs->bufTyp = READ_STRUCT::kHeader;
     1076                rs->bufLen = sizeof(PEVNT_HEADER);
     1077                rs->bufPos = rs->B;
     1078                // FIXME: What to do with the validity flag?
     1079                continue;
     1080            }
     1081
     1082            // get index into mBuffer for this event (create if needed)
     1083            const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
     1084
     1085            // We have a valid entry, but no memory has yet been allocated
     1086            if (evt && !evt->FADhead)
     1087            {
     1088                // Try to get memory from the big buffer
     1089                PEVNT_HEADER *mem = (PEVNT_HEADER*)Memory::malloc();
     1090                if (!mem)
     1091                {
     1092                    // If this works properly, this is a hack which can be removed, or
     1093                    // replaced by a signal or dim message
     1094                    if (!rs->repmem)
     1095                    {
     1096                        factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
     1097                        rs->repmem = true;
     1098                    }
     1099                    continue;
     1100                }
     1101
     1102                evt->initEvent(shared_ptr<PEVNT_HEADER>(mem, Memory::free));
     1103            }
     1104
     1105            // ready to read next header
     1106            rs->bufTyp = READ_STRUCT::kHeader;
     1107            rs->bufLen = sizeof(PEVNT_HEADER);
     1108            rs->bufPos = rs->B;
     1109
     1110            // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
     1111            if (!evt)
     1112                continue;
     1113
     1114            /*
     1115             const int fad = (i/10)<<8)|(i%10);
     1116             if (fad != rs->H.board_id)
     1117             {
     1118                 factPrintf(MessageImp::kWarn, "Board ID mismatch. Expected %x, got %x", fad, rs->H.board_id);
     1119             }*/
     1120
     1121            // This should never happen
     1122            if (evt->board[rs->sockId] != -1)
     1123            {
     1124                factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
     1125                           evt->evNum, rs->sockId, rs->sockId, rs->len());
     1126                // FIXME: What to do with the validity flag?
     1127                continue; // Continue reading next header
     1128            }
     1129
     1130            // Swap the data entries (board headers) from network to host order
     1131            rs->swapData();
     1132
     1133            // Copy data from rd[i] to mBuffer[evID]
     1134            copyData(*rs, evt.get());
     1135
     1136#ifdef COMPLETE_EVENTS
     1137            // Do not read anmymore from this board until the whole event has been received
     1138            rs->bufTyp = READ_STRUCT::kWait;
     1139#endif
     1140            // now we have stored a new board contents into Event structure
     1141            evt->fEvent->NumBoards++;
     1142            evt->board[rs->sockId] = rs->sockId;
     1143            evt->nBoard++;
     1144
     1145            // event not yet complete
     1146            if (evt->nBoard < READ_STRUCT::activeSockets)
     1147                continue;
     1148
     1149            // All previous events are now flagged as incomplete ("expired")
     1150            // and will be removed. (This is a bit tricky, because pop_front()
     1151            // would invalidate the current iterator if not done _after_ the increment)
     1152            for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
     1153            {
     1154                const bool found = it->get()==evt.get();
     1155                if (!found)
     1156                    reportIncomplete(*it, "expired");
    6201157                else
     1158                    primaryQueue.post(evt);
     1159
     1160                it++;
     1161                evtCtrl.pop_front();
     1162
     1163                // We reached the current event, so we are done
     1164                if (found)
     1165                    break;
     1166            }
     1167
     1168#ifdef COMPLETE_EVENTS
     1169            for (int j=0; j<40; j++)
     1170            {
     1171                //if (rs->bufTyp==READ_STRUCT::kWait)
    6211172                {
    622                     evt->fEvent->StartTM[tmS] = -1;
     1173                    rs->bufTyp = READ_STRUCT::kHeader;
     1174                    rs->bufLen = sizeof(PEVNT_HEADER);
     1175                    rs->bufPos = rs->B;
    6231176                }
    6241177            }
    625         }
    626     }
    627 }
    628 
    629 void doProcess(const shared_ptr<EVT_CTRL> &evt);
    630 void doWrite(const shared_ptr<EVT_CTRL> &evt);
    631 
    632 void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where);
    633 
    634 Queue<shared_ptr<EVT_CTRL>> process(bind(doProcess, placeholders::_1));
    635 Queue<shared_ptr<EVT_CTRL>> write_queue(bind(doWrite, placeholders::_1));
    636 
    637 void preProcess(const shared_ptr<EVT_CTRL> &evt)
    638 {
    639     //-------- it is better to open the run already here, so call can be used to initialize
    640     //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
    641     const shared_ptr<RUN_CTRL> run = evt->run;
    642     if (run->runId==0)
    643         return;
    644 
    645     // File not yet open
    646     if (run->fileId < 0)
    647     {
    648         RUN_HEAD actRun;
    649         actRun.Version =  1;
    650         actRun.RunType = -1;  //to be adapted
    651         actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
    652         actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
    653         actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
    654         actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
    655         actRun.NBoard  = NBOARDS;
    656         actRun.NPix    = NPIX;
    657         actRun.NTm     = NTMARK;
    658 
    659         memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
    660 
    661         run->fileHd = runOpen(evt->runNum, &actRun, sizeof (actRun));
    662         if (run->fileHd == NULL)
    663         {
    664             factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", evt->runNum, evt->evNum);
    665             run->fileId = 91;
    666 
    667             // No further processing of this event
    668             return;
    669         }
    670 
    671         run->fileId = 0;
    672         factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", evt->runNum, evt->evNum);
    673     }
    674 
    675     //and set correct event header ; also check for consistency in event (not yet)
    676     evt->fEvent->Errors[0] = evt->Errors[0];
    677     evt->fEvent->Errors[1] = evt->Errors[1];
    678     evt->fEvent->Errors[2] = evt->Errors[2];
    679     evt->fEvent->Errors[3] = evt->Errors[3];
    680 
    681     for (int ib=0; ib<NBOARDS; ib++)
    682     {
    683         // board is not read
    684         if (evt->board[ib] == -1)
    685         {
    686             evt->FADhead[ib].start_package_flag = 0;
    687             evt->fEvent->BoardTime[ib] = 0;
    688         }
    689         else
    690         {
    691             evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
    692         }
    693     }
    694 
    695     const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
    696 
    697     // no further processing of event ('delete')
    698     if (rc < 0)
    699         return;
    700 
    701     //evt->evtStat = 1000;       // flag 'start processing'
    702     run->procEvt++;
    703     process.post(evt);
    704 }
    705 
    706 void doProcess(const shared_ptr<EVT_CTRL> &evt)
    707 {
    708     const int jret = subProcEvt(1, evt->FADhead, evt->fEvent, 0);
    709 
    710     if (jret>0 && jret<=1)
    711         factPrintf(kError, -1, "Process wants to send event to process %d... not allowed.", jret);
    712 
    713     // flag as 'to be written'
    714     if (jret<=1)
    715         return;
    716 
    717     //evt->evtStat = 5000;
    718     write_queue.post(evt);
    719 }
    720 
    721 void doWrite(const shared_ptr<EVT_CTRL> &evt)
    722 {
    723     const shared_ptr<RUN_CTRL> run = evt->run;
    724     if (run->runId==0)
    725         return;
    726 
    727     // File is not open
    728     if (run->fileId!=0)
    729         return;
    730 
    731     const int rc = runWrite(run->fileHd, evt->fEvent, 0);
    732     if (rc >= 0)
    733     {
    734         // Sucessfully wrote event
    735         run->lastTime = g_actTime;
    736         run->actEvt++;
    737     }
    738     else
    739         factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
    740 
    741     checkAndCloseRun(run, rc<0, 1);
    742 
    743     /*
    744     // Although the are no pending events, we have to check if a run should be closed (timeout)
    745     for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
    746     {
    747         if (ir->second->fileId == 0)
    748         {
    749             //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
    750             const int cond = ir->second->runId == 0;
    751             checkAndCloseRun(ir->second, cond, 2);
    752         }
    753     }
    754     */
    755 }
    756 
    757 void *readFAD (void *ptr)
    758 {
    759 /* *** main loop reading FAD data and sorting them to complete events */
    760 
    761     Queue<shared_ptr<EVT_CTRL>> queue(bind(preProcess, placeholders::_1));
    762 
    763     factPrintf(kInfo, -1, "Start initializing (readFAD)");
    764 
    765     READ_STRUCT rd[NBOARDS];       //buffer to read IP and afterwards store in mBuffer
    766 
    767     uint32_t actrun = 0;
    768 
    769    const int minLen = sizeof(PEVNT_HEADER);  //min #bytes needed to check header: full header for debug
    770 
    771 
    772    int gi_reset, gi_resetR, gi_resetS, gi_resetW, gi_resetX;
    773    gi_resetS = gi_resetR = 9;
    774 
    775    int sockDef[NBOARDS];        //internal state of sockets
    776    memset(sockDef, 0, NBOARDS*sizeof(int));
    777 
    778  START:
    779    //time in seconds
    780    uint gi_SecTime = time(NULL);;
    781    g_actTime = gi_SecTime;
    782 
    783    const int cntsock = 8 - NUMSOCK ;
    784 
    785    if (gi_resetS > 0) {
    786       //make sure all sockets are preallocated as 'not exist'
    787       for (int i = 0; i < NBOARDS; i++) {
    788          rd[i].socket = -1;
    789          rd[i].sockStat = 99;
    790       }
    791 
    792       for (int k = 0; k < NBOARDS; k++) {
    793          gi_NumConnect[k] = 0;
    794          //gi.numConn[k] = 0;
    795          gj.numConn[k] = 0;
    796          //gj.errConn[k] = 0;
    797          gj.rateBytes[k] = 0;
    798          gj.totBytes[k] = 0;
    799       }
    800 
    801    }
    802 
    803    if (gi_resetR > 0)
    804    {
    805       gj.bufTot  = gj.maxEvt = gj.xxxEvt = 0;
    806       gj.usdMem  = gj.maxMem = gj.xxxMem = 0;
    807       gj.totMem  = tgb_memory;
    808       gj.bufNew  = gj.bufEvt = 0;
    809       gj.evtSkip = gj.evtWrite = gj.evtErr = 0;
    810 
    811       factPrintf(kInfo, -1, "End   initializing (readFAD)");
    812    }
    813 
    814 
    815    gi_reset = gi_resetR = gi_resetS = gi_resetW = 0;
    816 
    817    //loop until global variable g_runStat claims stop
    818    while (g_runStat >= 0 && g_reset == 0)
    819    {
    820       gj.readStat = g_runStat;
    821 
    822       for (int b = 0; b < NBOARDS; b++)
    823       {
    824           // Nothing has changed
    825           if (g_port[b].sockDef == sockDef[b])
    826               continue;
    827 
    828           gi_NumConnect[b] = 0;       //must close all connections
    829           gj.numConn[b] = 0;
    830 
    831           // s0 =  0: sockets to be defined and opened
    832           // s0 = -1: sockets to be destroyed
    833           // s0 = +1: sockets to be closed and reopened
    834 
    835           int s0 = 0;
    836           if (sockDef[b] != 0)
    837               s0 = g_port[b].sockDef==0 ? -1 : +1;
    838 
    839           const int p0 = s0==0 ? ntohs (g_port[b].sockAddr.sin_port) : 0;
    840 
    841           GenSock(s0, b, p0+1, &g_port[b].sockAddr, &rd[b]); //generate address and socket
    842 
    843           sockDef[b] = g_port[b].sockDef;
    844       }
    845 
    846       // count the number of active boards
    847       int actBoards = 0;
    848       for (int b = 0; b < NBOARDS; b++)
    849           if (sockDef[b] > 0)
    850               actBoards++;
    851 
    852       //check all sockets if something to read
    853       for (int i = 0; i < NBOARDS; i++)
    854       {
    855           // Do not try to connect this socket
    856           if (rd[i].sockStat > 0)
    857               continue;
    858 
    859           if (rd[i].sockStat == -1)
    860           {
    861               //try to connect if not yet done
    862               rd[i].sockStat = connect (rd[i].socket,
    863                                         (struct sockaddr *) &rd[i].SockAddr,
    864                                         sizeof (rd[i].SockAddr));
    865               // Failed
    866               if (rd[i].sockStat == -1)
    867               {
    868                   rd[i].errCnt++;
    869                   usleep(25000);
    870                   continue;
    871               }
    872 
    873               // Success (rd[i].sockStat == 0)
    874 
    875               if (sockDef[i] > 0)
    876               {
    877                   rd[i].bufTyp = 0;                     // expect a header
    878                   rd[i].bufLen = sizeof(PEVNT_HEADER);  // max size to read at begining
    879               }
    880               else
    881               {
    882                   rd[i].bufTyp = -1;       // full data to be skipped
    883                   rd[i].bufLen = MAX_LEN;  // huge for skipping
    884               }
    885 
    886               rd[i].bufPos = 0;  //  no byte read so far
    887               rd[i].skip   = 0;  //  start empty
    888 
    889               gi_NumConnect[i] += cntsock;
    890               gj.numConn[i]++;
    891 
    892               factPrintf(kInfo, -1, "New connection %d (number of connections: %d)", i, gj.numConn[i]);
    893           }
    894 
    895           // Do not read from this socket
    896           if (rd[i].bufLen<0)
    897               continue;
    898 
    899           if (rd[i].bufLen>0)
    900           {
    901               const int32_t jrd =
    902                   recv(rd[i].socket, &rd[i].rBuf->B[rd[i].bufPos],
    903                        rd[i].bufLen, MSG_DONTWAIT);
    904 
    905               // recv failed
    906               if (jrd<0)
    907               {
    908                   // There was just nothing waiting
    909                   if (errno==EWOULDBLOCK || errno==EAGAIN)
    910                       continue;
    911 
    912                   factPrintf(kError, 442, "Reading from socket %d failed: %m (recv,rc=%d)", i, errno);
    913                   continue;
    914               }
    915 
    916               // connection was closed ...
    917               if (jrd==0)
    918               {
    919                   factPrintf(kInfo, 441, "Socket %d closed by FAD", i);
    920 
    921                   const int s0 = sockDef[i] > 0 ? +1 : -1;
    922                   GenSock(s0, i, 0, NULL, &rd[i]);
    923 
    924                   gi_NumConnect[i]-= cntsock ;
    925                   gj.numConn[i]--;
    926 
    927                   continue;
    928               }
    929 
    930               gj.rateBytes[i] += jrd;
    931 
    932               // are we skipping this board ...
    933               if (rd[i].bufTyp < 0)
    934                   continue;
    935 
    936               rd[i].bufPos += jrd;  //==> prepare for continuation
    937               rd[i].bufLen -= jrd;
    938 
    939 #ifdef EVTDEBUG
    940               debugRead(i, jrd, rd[i].evtID, rd[i].ftmID, rd[i].runID, rd[i].bufTyp, tv.tv_sec, tv.tv_usec); // i=socket; jrd=#bytes; ievt=eventid; 1=finished event
    9411178#endif
    942           }
    943 
    944           //we are reading event header
    945           if (rd[i].bufTyp <= 0)
    946           {
    947               //not yet sufficient data to take action
    948               if (rd[i].bufPos < minLen)
    949                   continue;
    950 
    951               //check if startflag correct; else shift block ....
    952               // FIXME: This is not enough... this combination of
    953               //        bytes can be anywhere... at least the end bytes
    954               //        must be checked somewhere, too.
    955               int k;
    956               for (k = 0; k < rd[i].bufPos - 1; k++)
    957               {
    958                   //start.S = 0xFB01;
    959                   if (rd[i].rBuf->B[k] == 0xfb && rd[i].rBuf->B[k+1] == 0x01)
    960                       break;
    961               }
    962               rd[i].skip += k;
    963 
    964               //no start of header found
    965               if (k >= rd[i].bufPos - 1)
    966               {
    967                   rd[i].rBuf->B[0] = rd[i].rBuf->B[rd[i].bufPos];
    968                   rd[i].bufPos = 1;
    969                   rd[i].bufLen = sizeof(PEVNT_HEADER)-1;
    970                   continue;
    971               }
    972 
    973               if (k > 0)
    974               {
    975                   rd[i].bufPos -= k;
    976                   rd[i].bufLen += k;
    977                   memmove (&rd[i].rBuf->B[0], &rd[i].rBuf->B[k],
    978                            rd[i].bufPos);
    979               }
    980 
    981               if (rd[i].bufPos < minLen)
    982                   continue;
    983 
    984               if (rd[i].skip > 0)
    985               {
    986                   factPrintf(kInfo, 666, "Skipped %d bytes on port %d", rd[i].skip, i);
    987                   rd[i].skip = 0;
    988               }
    989 
    990               // TGB: This needs much more checks than just the first two bytes!
    991 
    992               // Swap everything except start_package_flag.
    993               // It is to difficult to find out where it is used how,
    994               // but it doesn't really matter because it is not really
    995               // used anywehere else
    996               //   rd[i].rBuf->S[1]  = ntohs(rd[i].rBuf->S[1]);    // package_length
    997               rd[i].rBuf->S[2]  = ntohs(rd[i].rBuf->S[2]);    // version_no
    998               rd[i].rBuf->S[3]  = ntohs(rd[i].rBuf->S[3]);    // PLLLCK
    999               rd[i].rBuf->S[4]  = ntohs(rd[i].rBuf->S[4]);    // trigger_crc
    1000               rd[i].rBuf->S[5]  = ntohs(rd[i].rBuf->S[5]);    // trigger_type
    1001 
    1002               rd[i].rBuf->S[12] = ntohs(rd[i].rBuf->S[12]);   // board id
    1003               rd[i].rBuf->S[13] = ntohs(rd[i].rBuf->S[13]);   // adc_clock_phase_shift
    1004               rd[i].rBuf->S[14] = ntohs(rd[i].rBuf->S[14]);   // number_of_triggers_to_generate
    1005               rd[i].rBuf->S[15] = ntohs(rd[i].rBuf->S[15]);   // trigger_generator_prescaler
    1006 
    1007               rd[i].rBuf->I[3]  = ntohl(rd[i].rBuf->I[3]);    // trigger_id
    1008               rd[i].rBuf->I[4]  = ntohl(rd[i].rBuf->I[4]);    // fad_evt_counter
    1009               rd[i].rBuf->I[5]  = ntohl(rd[i].rBuf->I[5]);    // REFCLK_frequency
    1010 
    1011               rd[i].rBuf->I[10] = ntohl(rd[i].rBuf->I[10]);   // runnumber;
    1012               rd[i].rBuf->I[11] = ntohl(rd[i].rBuf->I[11]);   // time;
    1013 
    1014               for (int s=24;s<24+NTemp+NDAC;s++)
    1015                   rd[i].rBuf->S[s] = ntohs(rd[i].rBuf->S[s]); // drs_temperature / dac
    1016 
    1017               rd[i].fadLen  = ntohs(rd[i].rBuf->S[1]) * 2;
    1018               rd[i].fadVers = rd[i].rBuf->S[2];
    1019               rd[i].ftmTyp  = rd[i].rBuf->S[5];
    1020               rd[i].ftmID   = rd[i].rBuf->I[3];    //(FTMevt)
    1021               rd[i].evtID   = rd[i].rBuf->I[4];    //(FADevt)
    1022               rd[i].runID   = rd[i].rBuf->I[11]==0 ? g_actTime : rd[i].rBuf->I[11];
    1023               rd[i].bufTyp  = 1;  //ready to read full record
    1024               rd[i].bufLen  = rd[i].fadLen - rd[i].bufPos;
    1025 
    1026               const int fadBoard = rd[i].rBuf->S[12];
    1027               debugHead(i, fadBoard, rd[i].rBuf);
    1028 
    1029               continue;
    1030           }
    1031 
    1032           // are we reading data
    1033 
    1034           // not yet all read
    1035           if (rd[i].bufLen > 0)
    1036               continue;
    1037 
    1038           // stop.S = 0x04FE;
    1039           if (rd[i].rBuf->B[rd[i].fadLen - 1] != 0xfe ||
    1040               rd[i].rBuf->B[rd[i].fadLen - 2] != 0x04)
    1041           {
    1042               factPrintf(kError, 301, "End-of-event flag wrong on socket %3d for event %4d (len=%5d), got %3d %3d",
    1043                          i, rd[i].evtID, rd[i].fadLen,
    1044                          rd[i].rBuf->B[rd[i].fadLen - 1], rd[i].rBuf->B[rd[i].fadLen - 2]);
    1045 
    1046               // ready to read next header
    1047               rd[i].bufTyp = 0;
    1048               rd[i].bufLen = sizeof(PEVNT_HEADER);
    1049               rd[i].bufPos = 0;
    1050 
    1051               continue;
    1052           }
    1053 
    1054           //  int actid;
    1055           //  if (g_useFTM > 0)
    1056           //     actid = rd[i].evtID;
    1057           //  else
    1058           //     actid = rd[i].ftmID;
    1059 
    1060           //get index into mBuffer for this event (create if needed)
    1061           const shared_ptr<EVT_CTRL> evt = mBufEvt(&rd[i]);
    1062 
    1063           // We have a valid entry, but no memory has yet been allocated
    1064           if (evt && evt->FADhead == NULL)
    1065           {
    1066               // Try to get memory from the big buffer
    1067               evt->FADhead = (PEVNT_HEADER*)TGB_Malloc();
    1068               if (evt->FADhead == NULL)
    1069               {
    1070                   // If this works properly, this is a hack which can be removed, or
    1071                   // replaced by a signal or dim message
    1072                   if (rd[i].bufTyp==2)
    1073                       factPrintf(kError, 882, "malloc failed for event %d (run=%d)", evt->evNum, evt->runNum);
    1074                   rd[i].bufTyp = 2;
    1075                   continue;
    1076               }
    1077 
    1078               // Initialise contents of mBuffer[evID]->fEvent
    1079               initEvent(evt);
    1080 
    1081               // Some statistics
    1082               gj.usdMem = tgb_inuse;
    1083 
    1084               if (gj.usdMem > gj.maxMem)
    1085                   gj.maxMem = gj.usdMem;
    1086 
    1087               gj.rateNew++;
    1088               gj.bufTot++;
    1089               if (gj.bufTot > gj.maxEvt)
    1090                   gj.maxEvt = gj.bufTot;
    1091           }
    1092 
    1093           rd[i].bufTyp = 0;
    1094           rd[i].bufLen = sizeof(PEVNT_HEADER);
    1095           rd[i].bufPos = 0;
    1096 
    1097           // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
    1098           if (!evt)
    1099               continue;
    1100 
    1101           //we have a valid entry in mBuffer[]; fill it
    1102           const int fadBoard = rd[i].rBuf->S[12];
    1103           const int fadCrate = fadBoard>>8;
    1104 
    1105           if (i != (fadCrate * 10 + (fadBoard&0xff)))
    1106           {
    1107               factPrintf(kWarn, 301, "Board ID mismatch. Expected %d, got %d (C=%d, B=%d)",
    1108                          i, fadBoard, fadCrate, fadBoard&0xff);
    1109           }
    1110 
    1111           if (evt->board[i] != -1)
    1112           {
    1113               factPrintf(kWarn, 501, "Got event %5d from board %3d (i=%3d, len=%5d) twice: Starts with %3d %3d - ends with %3d %3d",
    1114                          evt->evNum, i, i, rd[i].fadLen,
    1115                          rd[i].rBuf->B[0], rd[i].rBuf->B[1],
    1116                          rd[i].rBuf->B[rd[i].fadLen - 2],
    1117                          rd[i].rBuf->B[rd[i].fadLen - 1]);
    1118               continue; // Continue reading next header
    1119           }
    1120 
    1121           // Copy data from rd[i] to mBuffer[evID]
    1122           copyData(rd[i].rBuf, i, evt);
    1123 
    1124           // now we have stored a new board contents into Event structure
    1125 
    1126           evt->fEvent->NumBoards++;
    1127           evt->board[i] = i;
    1128           evt->nBoard++;
    1129           evt->evtStat = evt->nBoard;
    1130 
    1131           // have we already reported first (partial) event of this run ???
    1132           if (evt->nBoard==1 && evt->runNum != actrun)
    1133           {
    1134              // Signal the fadctrl that a new run has been started
    1135               gotNewRun(evt->runNum, NULL);
    1136 
    1137               factPrintf(kInfo, 1, "gotNewRun called, prev run %d, new run %d, event %d",
    1138                          actrun, evt->runNum, evt->evNum);
    1139 
    1140               // We got the first part of this event, so this is
    1141               // the number of events we expect for this run
    1142               evt->run->lastEvt++;
    1143 
    1144               // Since we have started a new run, we know already when to close the
    1145               // previous run in terms of number of events
    1146               const auto ir = runCtrl.find(actrun);
    1147               if (ir!=runCtrl.end())
    1148                   ir->second->maxEvt = ir->second->lastEvt;
    1149 
    1150               // Change 'actrun' the the new runnumber
    1151               actrun = evt->runNum;
    1152           }
    1153 
    1154           // event not yet complete
    1155           if (evt->nBoard < actBoards)
    1156               continue;
    1157 
    1158           // GARBAGE COLLECTION
    1159           // This is a non-ideal hack to lower the probability that
    1160           // in mBufEvt the search for correct entry in runCtrl
    1161           // will not return a super-old entry. I don't want
    1162           // to manipulate that in another thread.
    1163           for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
    1164           {
    1165               if (ir->runId==evt->runNum)
    1166                   break;
    1167 
    1168               if (ir->second->fileId>0)
    1169                   runCtrl.erase(ir);
    1170           }
    1171 
    1172           // we have just completed an event... so all previous events
    1173           // must have been completed already. If they are not, there
    1174           // is no need to wait for the timeout, because they will never
    1175           // get completed. We can just ensure that if we check for the previous
    1176           // event to be complete every time we receive a new complete event.
    1177           // If we find an incomplete one, we remove all consecutive
    1178           // incomplete ones.
    1179           for (auto it=evtCtrl.begin()+1; it!=evtCtrl.end(); it++)
    1180           {
    1181               const shared_ptr<EVT_CTRL> e = *it;
    1182 
    1183               if (e.get()==evt.get())
    1184               {
    1185                   queue.post(e);
    1186                   evtCtrl.erase(it);
    1187                   break;
    1188               }
    1189 
    1190               reportIncomplete(e, "expired");
    1191               evtCtrl.pop_front();
    1192           }
    1193 
    1194       } // end for loop over all sockets
    1195 
    1196       g_actTime = time (NULL);
    1197       if (g_actTime <= gi_SecTime)
    1198       {
    1199           usleep(1);
    1200           continue;
    1201       }
    1202       gi_SecTime = g_actTime;
    1203 
    1204       gj.bufNew = 0;
    1205 
    1206       //loop over all active events and flag those older than read-timeout
    1207       //delete those that are written to disk ....
    1208 
    1209       const int count = evtCtrl.size();//(evtCtrl_lastPtr-evtCtrl_frstPtr+MAX_EVT)%MAX_EVT;
    1210 
    1211       // This could be improved having the pointer which separates the queue with
    1212       // the incomplete events from the queue with the complete events
    1213       for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
    1214       {
    1215           const shared_ptr<EVT_CTRL> evt = *it;
    1216 
    1217           // Check the more likely case first: incomplete events
    1218           if (evt->evtStat>=0 && evt->evtStat<100)
    1219           {
    1220               gj.bufNew++;     //incomplete event in Buffer
    1221 
    1222               // Event has not yet timed out or was reported already
    1223               if (evt->evtStat==90 || evt->pcTime[0]>=g_actTime - 30)
    1224                   continue;
    1225 
    1226               // This will result in the emission of a dim service.
    1227               // It doesn't matter if that takes comparably long,
    1228               // because we have to stop the run anyway.
    1229               const uint64_t rep = reportIncomplete(evt, "timeout");
    1230               factReportIncomplete(rep);
    1231 
    1232               //timeout for incomplete events
    1233               evt->evtStat = 90;
    1234               gj.evtSkip++;
    1235 
    1236               continue;
    1237           }
    1238 
    1239           // Check the less likely case: 'useless' or 'delete'
    1240           // evtState==0 can happen if the event was initialized (some data received)
    1241           // but the data did not make sense (e.g. inconsistent rois)
    1242           if (evt->evtStat==0 || evt->evtStat == 10000)
    1243           {
    1244               evtCtrl.erase(it); //event written--> free memory
    1245 
    1246               gj.evtWrite++;
    1247               gj.rateWrite++;
    1248           }
    1249 
    1250           // Remove leading invalidated slots from queue
    1251           // Do they exist at all?
    1252           if (evt->evtStat==-1)
    1253               evtCtrl.erase(it);
    1254       }
    1255 
    1256 
    1257 
    1258       // The number of complete events in the buffer is the total number of
    1259       // events in the buffer minus the number of incomplete events.
    1260       gj.bufEvt = count - gj.bufNew;
    1261 
    1262       gj.deltaT = 1000;      //temporary, must be improved
    1263 
    1264       for (int ib = 0; ib < NBOARDS; ib++)
    1265           gj.totBytes[ib] += gj.rateBytes[ib];
    1266 
    1267       gj.totMem = tgb_memory;
    1268 
    1269       if (gj.maxMem > gj.xxxMem)
    1270           gj.xxxMem = gj.maxMem;
    1271       if (gj.maxEvt > gj.xxxEvt)
    1272           gj.xxxEvt = gj.maxEvt;
    1273 
    1274       factStat (gj);
    1275       //factStatNew (gi);
    1276       gj.rateNew = gj.rateWrite = 0;
    1277       gj.maxMem = gj.usdMem;
    1278       gj.maxEvt = gj.bufTot;
    1279       for (int b = 0; b < NBOARDS; b++)
    1280           gj.rateBytes[b] = 0;
    1281 
    1282    } // while (g_runStat >= 0 && g_reset == 0)
    1283 
    1284    factPrintf(kInfo, -1, "Stop reading ... RESET=%d", g_reset);
    1285 
    1286    if (g_reset > 0)
    1287    {
    1288        gi_reset  = g_reset;
    1289        gi_resetR = gi_reset % 10;          //shall we stop reading ?
    1290        gi_resetS = (gi_reset / 10) % 10;   //shall we close sockets ?
    1291        gi_resetW = (gi_reset / 100) % 10;  //shall we close files ?
    1292        gi_resetX = gi_reset / 1000;        //shall we simply wait resetX seconds ?
    1293        g_reset   = 0;
    1294    }
    1295    else
    1296    {
    1297        gi_reset  = 0;
    1298        gi_resetR = g_runStat == -1 ? 1 : 7;
    1299 
    1300        gi_resetS = 7;            //close all sockets
    1301        gi_resetW = 7;            //close all files
    1302        gi_resetX = 0;
    1303 
    1304        //inform others we have to quit ....
    1305        gj.readStat = -11;        //inform all that no update to happen any more
    1306    }
    1307 
    1308    if (gi_resetS > 0)
    1309    {
    1310        //must close all open sockets ...
    1311        factPrintf(kInfo, -1, "Close all sockets...");
    1312 
    1313        for (int i = 0; i < NBOARDS; i++)
    1314        {
    1315            if (rd[i].sockStat != 0)
    1316                continue;
    1317 
    1318            GenSock(-1, i, 0, NULL, &rd[i]);   //close and destroy open socket
    1319 
    1320            gi_NumConnect[i]-= cntsock ;
    1321            gj.numConn[i]--;
    1322            sockDef[i] = 0;      //flag ro recreate the sockets ...
    1323            rd[i].sockStat = -1; //and try to open asap
    1324        }
    1325    }
    1326 
    1327 
    1328    if (gi_resetR > 0)
    1329    {
    1330        //and clear all buffers (might have to wait until all others are done)
    1331        while (evtCtrl.size())
    1332        {
    1333            const shared_ptr<EVT_CTRL> evt = evtCtrl.front();
    1334 
    1335            // flag incomplete events as 'read finished'
    1336            // We cannot just detele all events, because some might currently being processed,
    1337            // so we have to wait until the processing thread currently processing the event
    1338            // signals that the event can be deleted. (Note, that there are currently never
    1339            // two threads processing the same event at the same time)
    1340            if ((evt->evtStat>0 && evt->evtStat<90) || evt->evtStat==10000)
    1341                evtCtrl.pop_front();
    1342 
    1343            usleep(1);
    1344        }
    1345    }
    1346 
    1347    //queue.wait();
    1348    //queue.join();
    1349 
    1350    if (gi_reset > 0)
    1351    {
    1352       if (gi_resetW > 0)
    1353          CloseRunFile (0, 0, 0);        //ask all Runs to be closed
    1354 
    1355       if (gi_resetX > 0)
    1356       {
    1357           struct timespec xwait;
    1358           xwait.tv_sec = gi_resetX;
    1359           xwait.tv_nsec = 0;
    1360           nanosleep (&xwait, NULL);
    1361       }
    1362 
    1363       factPrintf(kInfo, -1, "Continue read Process ...");
    1364       gi_reset = 0;
    1365       goto START;
    1366    }
    1367 
    1368    factPrintf(kInfo, -1, "Exit read Process...");
    1369 
    1370    factPrintf(kInfo, -1, "%ld Bytes flagged as in-use.", tgb_inuse);
    1371 
    1372    gj.readStat = -99;
    1373 
    1374    factStat (gj);
    1375    //factStatNew (gi);
    1376 
    1377    return 0;
    1378 
    1379 } /*-----------------------------------------------------------------*/
    1380 /*
    1381 
    1382 void *subProc(void *thrid)
    1383 {
    1384     const int64_t threadID = (int64_t)thrid;
    1385 
    1386     factPrintf(kInfo, -1, "Starting sub-process-thread %ld", threadID);
    1387 
    1388     while (g_runStat > -2) //in case of 'exit' we still must process pending events
    1389     {
    1390         int numWait = 0;
    1391 
    1392         for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
    1393         {
    1394             const shared_ptr<EVT_CTRL> evt = *it;
    1395 
    1396            // This is a threading issue... the evtStat might have been invalid
    1397            // but the frstPtr is not yet updated
    1398            if (evt->evtStat==-1)
    1399                continue;
    1400 
    1401            // If we find the first event still waiting for processing
    1402            // there will be only unprocessed events after this one in the queue
    1403            if (evt->evtStat<1000+threadID)
    1404            {
    1405                numWait = 1;
    1406                break;
    1407            }
    1408 
    1409            // If the event was processed already, skip it
    1410            // We could replace that to a moving pointer pointing to the first
    1411            // non-processed event
    1412            if (evt->evtStat!=1000+threadID)
    1413                continue;
    1414 
    1415             const int jret = subProcEvt(threadID, evt->FADhead, evt->fEvent, 0);
    1416 
    1417             if (jret>0 && jret<=threadID)
    1418                 factPrintf(kError, -1, "Process %ld wants to send event to process %d... not allowed.", threadID, jret);
    1419 
    1420             if (jret<=threadID)
    1421             {
    1422                 evt->evtStat = 10000;            // flag as 'to be deleted'
    1423                 continue;
    1424             }
    1425 
    1426             if (jret>=gi_maxProc)
    1427             {
    1428                 evt->evtStat = 5000;            // flag as 'to be written'
    1429                 continue;
    1430             }
    1431 
    1432             evt->evtStat = 1000 + jret;       // flag for next proces
    1433         }
    1434 
    1435         if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
    1436             factPrintf(kInfo, -1, "Exit subProcessing in process %ld", threadID);
    1437             return 0;
    1438         }
    1439 
    1440         usleep(1);
    1441     }
    1442 
    1443     factPrintf(kInfo, -1, "Ending sub-process-thread %ld", threadID);
    1444 
    1445     return 0;
    1446 }
    1447 
    1448 
    1449 void *
    1450 procEvt (void *ptr)
    1451 {
    1452    int status;
    1453 
    1454    factPrintf(kInfo, -1, "Starting process-thread with %d subprocesses", gi_maxProc);
    1455 
    1456    pthread_t thread[100];
    1457 //   int th_ret[100];
    1458 
    1459    for (long long k = 0; k < gi_maxProc; k++) {
    1460        pthread_create (&thread[k], NULL, subProc, (void *) k);
    1461    }
    1462 
    1463    // in case of 'exit' we still must process pending events
    1464    while (g_runStat > -2)
    1465    {
    1466        int numWait = 0;
    1467 
    1468        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
    1469        {
    1470            const shared_ptr<EVT_CTRL> evt = *it;
    1471 
    1472            // This is a threading issue... the evtStat might have been invalid
    1473            // but the frstPtr is not yet updated
    1474            if (evt->evtStat==-1)
    1475                continue;
    1476 
    1477            // If we find the first incomplete event which is not supposed to
    1478            // be processed, there are only more incomplete events in the queue
    1479            if (evt->evtStat<90)
    1480            {
    1481                numWait = 1;
    1482                break;
    1483            }
    1484 
    1485            // If the event was processed already, skip it.
    1486            // We could replace that to a moving pointer pointing to the first
    1487            // non-processed event
    1488            if (evt->evtStat>=1000)
    1489                continue;
    1490 
    1491            //-------- it is better to open the run already here, so call can be used to initialize
    1492            //-------- buffers etc. needed to interprete run (e.g. DRS calibration)
    1493            const uint32_t irun = evt->runNum;
    1494            const int32_t  ievt = evt->evNum;
    1495 
    1496            const shared_ptr<RUN_CTRL> run = evt->run;
    1497            if (run->runId==0)
    1498                continue;
    1499 
    1500            // File not yet open
    1501            if (run->fileId < 0)
    1502            {
    1503                RUN_HEAD actRun;
    1504                actRun.Version =  1;
    1505                actRun.RunType = -1;  //to be adapted
    1506                actRun.Nroi    = evt->nRoi;      //runCtrl[lastRun].roi0;
    1507                actRun.NroiTM  = evt->nRoiTM;    //runCtrl[lastRun].roi8;
    1508                actRun.RunTime = evt->pcTime[0]; //runCtrl[lastRun].firstTime;
    1509                actRun.RunUsec = evt->pcTime[1]; //runCtrl[lastRun].firstUsec;
    1510                actRun.NBoard  = NBOARDS;
    1511                actRun.NPix    = NPIX;
    1512                actRun.NTm     = NTMARK;
    1513 
    1514                memcpy(actRun.FADhead, evt->FADhead, NBOARDS*sizeof(PEVNT_HEADER));
    1515 
    1516                run->fileHd = runOpen(irun, &actRun, sizeof (actRun));
    1517                if (run->fileHd == NULL)
    1518                {
    1519                    factPrintf(kError, 502, "procEvt: Could not open new file for run %d (evt=%d, runOpen failed)", irun, ievt);
    1520                    run->fileId = 91;
    1521                    continue;
    1522                }
    1523 
    1524                run->fileId = 0;
    1525 
    1526                factPrintf(kInfo, -1, "procEvt: Opened new file for run %d (evt=%d)", irun, ievt);
    1527            }
    1528 
    1529            //--------
    1530            //--------
    1531 
    1532            //and set correct event header ; also check for consistency in event (not yet)
    1533            evt->fEvent->Errors[0] = evt->Errors[0];
    1534            evt->fEvent->Errors[1] = evt->Errors[1];
    1535            evt->fEvent->Errors[2] = evt->Errors[2];
    1536            evt->fEvent->Errors[3] = evt->Errors[3];
    1537 
    1538            for (int ib=0; ib<NBOARDS; ib++)
    1539            {
    1540                // board is not read
    1541                if (evt->board[ib] == -1)
    1542                {
    1543                    evt->FADhead[ib].start_package_flag = 0;
    1544                    evt->fEvent->BoardTime[ib] = 0;
    1545                }
    1546                else
    1547                {
    1548                    evt->fEvent->BoardTime[ib] = evt->FADhead[ib].time;
    1549                }
    1550            }
    1551 
    1552            const int rc = eventCheck(evt->runNum, evt->FADhead, evt->fEvent);
    1553            if (rc < 0)
    1554            {
    1555                evt->evtStat = 10000;        // flag event to be deleted
    1556            }
    1557            else
    1558            {
    1559                evt->evtStat = 1000;       // flag 'start processing'
    1560                run->procEvt++;
    1561            }
    1562        }
    1563 
    1564        if (gj.readStat < -10 && numWait == 0) {  //nothing left to do
    1565            factPrintf(kInfo, -1, "Exit Processing Process ...");
    1566            gj.procStat = -22;     //==> we should exit
    1567            return 0;
    1568        }
    1569 
    1570        usleep(1);
    1571 
    1572        gj.procStat = gj.readStat;
    1573    }
    1574 
    1575    //we are asked to abort asap ==> must flag all remaining events
    1576    //   when gi_runStat claims that all events are in the buffer...
    1577 
    1578    factPrintf(kInfo, -1, "Abort Processing Process ...");
    1579 
    1580    for (int k = 0; k < gi_maxProc; k++) {
    1581       pthread_join (thread[k], (void **) &status);
    1582    }
    1583 
    1584    gj.procStat = -99;
    1585 
    1586    return 0;
    1587 
    1588 } */
    1589 
    1590 int
    1591 CloseRunFile (uint32_t runId, uint32_t closeTime, uint32_t maxEvt)
    1592 {
    1593 /* close run runId (all all runs if runId=0) */
    1594 /* return: 0=close scheduled / >0 already closed / <0 does not exist */
    1595 
    1596    if (runId == 0)
    1597    {
    1598        for (auto it=runCtrl.begin(); it!=runCtrl.end(); it++)
    1599        {
    1600            const shared_ptr<RUN_CTRL> run = it->second;
    1601 
    1602            //run is open
    1603            if (run->fileId == 0)
    1604            {
    1605                run->closeTime = closeTime;
    1606                run->maxEvt = maxEvt;
    1607            }
    1608        }
    1609        return 0;
    1610    }
    1611 
    1612    auto it=runCtrl.find(runId);
    1613    if (it==runCtrl.end())
    1614        return -1;
    1615 
    1616    const shared_ptr<RUN_CTRL> run = it->second;
    1617 
    1618    // run already closed
    1619    if (run->fileId>0)
    1620        return +2;
    1621 
    1622    run->closeTime = closeTime;
    1623    run->maxEvt = maxEvt;
    1624 
    1625    return run->fileId==0 ? 0 : 1;
    1626 
    1627 }
    1628 
    1629 void checkAndCloseRun(const shared_ptr<RUN_CTRL> &run, int cond, int where)
    1630 {
    1631     if (!cond &&
    1632         run->closeTime >= g_actTime &&
    1633         run->lastTime >= g_actTime - 300 &&
    1634         run->maxEvt > run->actEvt)
    1635         return;
    1636 
    1637     //close run for whatever reason
    1638     int ii = 0;
    1639     if (cond)
    1640         ii = 1;
    1641     if (run->closeTime < g_actTime)
    1642         ii |= 2; // = 2;
    1643     if (run->lastTime < g_actTime - 300)
    1644         ii |= 4; // = 3;
    1645     if (run->maxEvt <= run->actEvt)
    1646         ii |= 8; // = 4;
    1647 
    1648     run->closeTime = g_actTime - 1;
    1649 
    1650     const int rc = runClose(run->fileHd, NULL, 0);//&runTail[j], sizeof(runTail[j]));
    1651     if (rc<0)
    1652     {
    1653         factPrintf(kError, 503, "writeEvt-%d: Error closing run %d (runClose,rc=%d)",
    1654                    where, run->runId, rc);
    1655         run->fileId = 92+where*2;
    1656     }
    1657     else
    1658     {
    1659         factPrintf(kInfo, 503, "writeEvt-%d: Closed run %d (reason=%d)",
    1660                    where, run->runId, ii);
    1661         run->fileId = 93+where*2;
    1662     }
    1663 }
    1664 
    1665 /*-----------------------------------------------------------------*/
    1666 
    1667 /*
    1668 void *writeEvt (void *ptr)
    1669 {
    1670    factPrintf(kInfo, -1, "Starting write-thread");
    1671 
    1672    while (g_runStat > -2)
    1673    {
    1674        //int numWrite = 0;
    1675        int numWait  = 0;
    1676 
    1677        // Note that the current loop does not at all gurantee that
    1678        // the events are written in the correct order.
    1679        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); it++)
    1680        {
    1681            const shared_ptr<EVT_CTRL> evt = *it;
    1682 
    1683            // This is a threading issue... the evtStat might have been invalid
    1684            // but the frstPtr is not yet updated
    1685            if (evt->evtStat==-1)
    1686                continue;
    1687 
    1688            // If we find the first non-written event which is not supposed to
    1689            // be written, there are only more incomplete events in the queue
    1690            if (evt->evtStat<5000)
    1691            {
    1692                numWait = 1;
    1693                break;
    1694            }
    1695 
    1696            // If the event was written already already, skip it
    1697            // We could replace that to a moving pointer pointing to the first
    1698            // non-processed event
    1699            if (evt->evtStat!=5000)
    1700                continue;
    1701 
    1702            const shared_ptr<RUN_CTRL> run = evt->run;
    1703            if (run->runId==0)
    1704                continue;
    1705 
    1706            // File is open
    1707            if (run->fileId==0)
    1708            {
    1709                const int rc = runWrite(run->fileHd, evt->fEvent, 0);
    1710                if (rc >= 0)
    1711                {
    1712                    // Sucessfully wrote event
    1713                    run->lastTime = g_actTime;
    1714                    run->actEvt++;
    1715                }
    1716                else
    1717                    factPrintf(kError, 503, "writeEvt: Writing event for run %d failed (runWrite)", evt->runNum);
    1718 
    1719                checkAndCloseRun(run, rc<0, 1);
    1720            }
    1721 
    1722            evt->evtStat = 10000;  // event written (or has to be discarded) -> delete
    1723        }
    1724 
    1725        // Although the are no pending events, we have to check if a run should be closed (timeout)
    1726        for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
    1727        {
    1728            if (ir->second->fileId == 0)
    1729            {
    1730                //ETIENNE added the condition at this line. dunno what to do with run 0: skipping it
    1731                const int cond = ir->second->runId == 0;
    1732                checkAndCloseRun(ir->second, cond, 2);
    1733            }
    1734        }
    1735 
    1736        usleep(1);
    1737 
    1738        //nothing left to do
    1739        if (gj.readStat < -10 && numWait == 0)
    1740        {
    1741            factPrintf(kInfo, -1, "Finish Write Process ...");
    1742            gj.writStat = -22;     //==> we should exit
    1743            break;
    1744        }
    1745 
    1746        gj.writStat = gj.readStat;
    1747    }
    1748 
    1749    factPrintf(kInfo, -1, "Close all open files ...");
    1750    for (auto ir=runCtrl.begin(); ir!=runCtrl.end(); ir++)
    1751    {
    1752        if (ir->second->fileId == 0)
    1753            checkAndCloseRun(ir->second, 1, 3);
    1754    }
    1755 
    1756    gj.writStat = -99;
    1757 
    1758    factPrintf(kInfo, -1, "Exit Writing Process ...");
    1759 
    1760    return 0;
    1761 }
    1762    */
    1763 
    1764 
    1765 
    1766 
    1767 void
    1768 StartEvtBuild ()
    1769 {
    1770 
    1771    int i, /*j,*/ imax, status/*, th_ret[50]*/;
    1772    pthread_t thread[50];
    1773    struct timespec xwait;
    1774 
    1775    gj.readStat = gj.procStat = gj.writStat = 0;
    1776 
    1777    factPrintf(kInfo, -1, "Starting EventBuilder V15.07 A");
    1778 
    1779 
    1780    gi_maxProc = g_maxProc;
    1781    if (gi_maxProc <= 0 || gi_maxProc > 90) {
    1782       factPrintf(kFatal, 301, "Illegal number of processes %d", gi_maxProc);
    1783       gi_maxProc = 1;
    1784    }
    1785 
    1786 //start all threads (more to come) when we are allowed to ....
    1787    while (g_runStat == 0) {
    1788       xwait.tv_sec = 0;
    1789       xwait.tv_nsec = 10000000; // sleep for ~10 msec
    1790       nanosleep (&xwait, NULL);
    1791    }
    1792 
    1793    i = 0;
    1794    /*th_ret[i] =*/ pthread_create (&thread[i], NULL, readFAD, NULL);
    1795    i++;
    1796    ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, procEvt, NULL);
    1797    //i++;
    1798    ///*th_ret[i] =*/ pthread_create (&thread[i], NULL, writeEvt, NULL);
    1799    //i++;
    1800    imax = i;
    1801 
    1802 
    1803 #ifdef BILAND
    1804    xwait.tv_sec = 30;;
    1805    xwait.tv_nsec = 0;           // sleep for ~20sec
    1806    nanosleep (&xwait, NULL);
    1807 
    1808    printf ("close all runs in 2 seconds\n");
    1809 
    1810    CloseRunFile (0, time (NULL) + 2, 0);
    1811 
    1812    xwait.tv_sec = 1;;
    1813    xwait.tv_nsec = 0;           // sleep for ~20sec
    1814    nanosleep (&xwait, NULL);
    1815 
    1816    printf ("setting g_runstat to -1\n");
    1817 
    1818    g_runStat = -1;
     1179        } // end for loop over all sockets
     1180
     1181        // ==================================================================
     1182
     1183        // +1 -> idx=0
     1184        // -1 -> idx=0
     1185        // +2 -> idx=0
     1186        // -2 -> idx=0
     1187        // +3 -> idx=0
     1188        // -3 -> idx=0
     1189        // +4 -> idx=0
     1190        // -4 -> idx=0
     1191        // +5 -> idx=0
     1192        // -5 -> idx=0
     1193        // +6 -> idx=0
     1194        // -6 -> idx=0
     1195        //
     1196
     1197        // ==================================================================
     1198
     1199        const time_t actTime = time(NULL);
     1200        if (actTime == gi_SecTime)
     1201        {
     1202#if !defined(USE_SELECT) && !defined(USE_EPOLL)
     1203            if (evtCtrl.size()==0)
     1204                usleep(1);
    18191205#endif
    1820 
    1821 
    1822 //wait for all threads to finish
    1823    for (i = 0; i < imax; i++) {
    1824       /*j =*/ pthread_join (thread[i], (void **) &status);
    1825    }
    1826 
    1827 } /*-----------------------------------------------------------------*/
    1828 
    1829 
    1830 
    1831 
    1832 
    1833 
    1834 
    1835 
    1836 
    1837 
    1838 
    1839 
    1840 
    1841 
    1842 
    1843   /*-----------------------------------------------------------------*/
    1844   /*-----------------------------------------------------------------*/
    1845   /*-----------------------------------------------------------------*/
    1846   /*-----------------------------------------------------------------*/
    1847   /*-----------------------------------------------------------------*/
    1848 
    1849 #ifdef BILAND
    1850 
    1851 int
    1852 subProcEvt (int threadID, PEVNT_HEADER * fadhd, EVENT * event,
    1853             int8_t * buffer)
    1854 {
    1855    printf ("called subproc %d\n", threadID);
    1856    return threadID + 1;
    1857 }
    1858 
    1859 
    1860 
    1861 
    1862   /*-----------------------------------------------------------------*/
    1863   /*-----------------------------------------------------------------*/
    1864   /*-----------------------------------------------------------------*/
    1865   /*-----------------------------------------------------------------*/
    1866   /*-----------------------------------------------------------------*/
    1867 
    1868 
    1869 
    1870 
    1871 FileHandle_t
    1872 runOpen (uint32_t irun, RUN_HEAD * runhd, size_t len)
    1873 {
    1874    return 1;
    1875 };
    1876 
    1877 int
    1878 runWrite (FileHandle_t fileHd, EVENT * event, size_t len)
    1879 {
    1880    return 1;
    1881    usleep (10000);
    1882    return 1;
    1883 }
    1884 
    1885 
    1886 //{ return 1; } ;
    1887 
    1888 int
    1889 runClose (FileHandle_t fileHd, RUN_TAIL * runth, size_t len)
    1890 {
    1891    return 1;
    1892 };
    1893 
    1894 
    1895 
    1896 
    1897 int
    1898 eventCheck (uint32_t runNr, PEVNT_HEADER * fadhd, EVENT * event)
    1899 {
    1900    int i = 0;
    1901 
    1902 // printf("------------%d\n",ntohl(fadhd[7].fad_evt_counter) );
    1903 // for (i=0; i<NBOARDS; i++) {
    1904 //    printf("b=%2d,=%5d\n",i,fadhd[i].board_id);
    1905 // }
    1906    return 0;
    1907 }
    1908 
    1909 
    1910 void
    1911 factStatNew (EVT_STAT gi)
    1912 {
    1913    int i;
    1914 
    1915 //for (i=0;i<MAX_SOCK;i++) {
    1916 //   printf("%4d",gi.numRead[i]);
    1917 //   if (i%20 == 0 ) printf("\n");
    1918 //}
    1919 }
    1920 
    1921 void
    1922 gotNewRun (int runnr, PEVNT_HEADER * headers)
    1923 {
    1924    printf ("got new run %d\n", runnr);
    1925    return;
    1926 }
    1927 
    1928 void
    1929 factStat (GUI_STAT gj)
    1930 {
    1931 //  printf("stat: bfr%5lu skp%4lu free%4lu (tot%7lu) mem%12lu rd%12lu %3lu\n",
    1932 //    array[0],array[1],array[2],array[3],array[4],array[5],array[6]);
    1933 }
    1934 
    1935 
    1936 void
    1937 debugRead (int isock, int ibyte, int32_t event, int32_t ftmevt, int32_t runnr,
    1938            int state, uint32_t tsec, uint32_t tusec)
    1939 {
    1940 //  printf("%3d %5d %9d %3d %12d\n",isock, ibyte, event, state, tusec) ;
    1941 }
    1942 
    1943 
    1944 
    1945 void
    1946 debugStream (int isock, void *buf, int len)
    1947 {
    1948 }
    1949 
    1950 void
    1951 debugHead (int i, int j, void *buf)
    1952 {
    1953 }
    1954 
    1955 
    1956 void
    1957 factOut (int severity, int err, char *message)
    1958 {
    1959    static FILE *fd;
    1960    static int file = 0;
    1961 
    1962    if (file == 0) {
    1963       printf ("open file\n");
    1964       fd = fopen ("x.out", "w+");
    1965       file = 999;
    1966    }
    1967 
    1968    fprintf (fd, "%3d %3d | %s \n", severity, err, message);
    1969 
    1970    if (severity != kDebug)
    1971       printf ("%3d %3d | %s\n", severity, err, message);
    1972 }
    1973 
    1974 
    1975 
    1976 int
    1977 main ()
    1978 {
    1979    int i, b, c, p;
    1980    char ipStr[100];
    1981    struct in_addr IPaddr;
    1982 
    1983    g_maxMem = 1024 * 1024;      //MBytes
    1984    g_maxMem = g_maxMem * 200;   //100MBytes
    1985 
    1986    g_maxProc = 20;
    1987 
    1988    g_runStat = 40;
    1989 
    1990    i = 0;
    1991 
    1992 // version for standard crates
    1993 //for (c=0; c<4,c++) {
    1994 //   for (b=0; b<10; b++) {
    1995 //      sprintf(ipStr,"10.0.%d.%d",128+c,128+b)
    1996 //
    1997 //      inet_pton(PF_INET, ipStr, &IPaddr) ;
    1998 //
    1999 //      g_port[i].sockAddr.sin_family = PF_INET;
    2000 //      g_port[i].sockAddr.sin_port = htons(5000) ;
    2001 //      g_port[i].sockAddr.sin_addr = IPaddr ;
    2002 //      g_port[i].sockDef = 1 ;
    2003 //      i++ ;
    2004 //   }
    2005 //}
    2006 //
    2007 //version for PC-test *
    2008    for (c = 0; c < 4; c++) {
    2009       for (b = 0; b < 10; b++) {
    2010          sprintf (ipStr, "10.0.%d.11", 128 + c);
    2011          if (c < 2)
    2012             sprintf (ipStr, "10.0.%d.11", 128);
    2013          else
    2014             sprintf (ipStr, "10.0.%d.11", 131);
    2015 //      if (c==0) sprintf(ipStr,"10.0.100.11") ;
    2016 
    2017          inet_pton (PF_INET, ipStr, &IPaddr);
    2018          p = 31919 + 100 * c + 10 * b;
    2019 
    2020 
    2021          g_port[i].sockAddr.sin_family = PF_INET;
    2022          g_port[i].sockAddr.sin_port = htons (p);
    2023          g_port[i].sockAddr.sin_addr = IPaddr;
    2024          g_port[i].sockDef = 1;
    2025 
    2026          i++;
    2027       }
    2028    }
    2029 
    2030 
    2031 //g_port[17].sockDef =-1 ;
    2032 //g_actBoards-- ;
    2033 
    2034    StartEvtBuild ();
    2035 
    2036    return 0;
    2037 
    2038 }
    2039 #endif
     1206            continue;
     1207        }
     1208        gi_SecTime = actTime;
     1209
     1210        // ==================================================================
     1211        //loop over all active events and flag those older than read-timeout
     1212        //delete those that are written to disk ....
     1213        //const int count = evtCtrl.size();
     1214
     1215        // This could be improved having the pointer which separates the queue with
     1216        // the incomplete events from the queue with the complete events
     1217        for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
     1218        {
     1219            // A reference is enough because the shared_ptr is hold by the evtCtrl
     1220            const shared_ptr<EVT_CTRL2> &evt = *it;
     1221
     1222            // The first event is the oldest. If the first event within the
     1223            // timeout window was received, we can stop searchinf further.
     1224            if (evt->time.tv_sec>=actTime - 30)
     1225                break;
     1226
     1227            // This will result in the emission of a dim service.
     1228            // It doesn't matter if that takes comparably long,
     1229            // because we have to stop the run anyway.
     1230            const uint64_t rep = reportIncomplete(evt, "timeout");
     1231            factReportIncomplete(rep);
     1232
     1233            it++;
     1234            evtCtrl.pop_front();
     1235        }
     1236
     1237        // =================================================================
     1238
     1239        // If nothing was received for more than 5min, close file
     1240        if (actTime-actrun->lastTime>300)
     1241            actrun->maxEvt = actrun->lastEvt;
     1242
     1243        // This is a fake event to trigger possible run-closing conditions once a second
     1244        // FIXME: This is not yet ideal because a file would never be closed
     1245        //        if a new file has been started and no events of the new file
     1246        //        have been received yet
     1247        if (actrun->fileStat==kFileOpen)
     1248            primaryQueue.post(shared_ptr<EVT_CTRL2>(new EVT_CTRL2(actrun)));
     1249
     1250        // =================================================================
     1251
     1252        gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
     1253        gj.usdMem = Memory::max_inuse;
     1254        gj.totMem = Memory::allocated;
     1255
     1256        gj.deltaT = 1000; // temporary, must be improved
     1257
     1258        for (int ib=0; ib<NBOARDS; ib++)
     1259        {
     1260            gj.rateBytes[ib]  = rd[ib].rateBytes;
     1261            gj.totBytes[ib]  += rd[ib].rateBytes;
     1262
     1263            rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr);
     1264
     1265            gi_NumConnect[ib] = rd[ib].connected;
     1266            gj.numConn[ib]    = rd[ib].connected;
     1267        }
     1268
     1269        factStat(gj);
     1270
     1271        Memory::max_inuse = 0;
     1272
     1273        for (int ib=0; ib<NBOARDS; ib++)
     1274            rd[ib].rateBytes = 0;
     1275    }
     1276
     1277    //   1: Stop, wait for event to get processed
     1278    //   2: Stop, finish immediately
     1279    // 101: Restart, wait for events to get processed
     1280    // 101: Restart, finish immediately
     1281    //
     1282    const int gi_reset = g_reset;
     1283
     1284    const bool abort = gi_reset%100==2;
     1285
     1286    factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
     1287
     1288    primaryQueue.wait(abort);
     1289    secondaryQueue.wait(abort);
     1290
     1291    // Here we also destroy all runCtrl structures and hence close all open files
     1292    evtCtrl.clear();
     1293
     1294    factPrintf(MessageImp::kInfo, "Exit read Process...");
     1295    factPrintf(MessageImp::kInfo, "%ld Bytes flagged as in-use.", Memory::inuse);
     1296
     1297    factStat(gj);
     1298
     1299    return gi_reset>=100;
     1300}
     1301
     1302// ==========================================================================
     1303// ==========================================================================
     1304
     1305void StartEvtBuild()
     1306{
     1307    factPrintf(MessageImp::kInfo, "Starting EventBuilder++");
     1308
     1309
     1310    for (int k=0; k<NBOARDS; k++)
     1311    {
     1312        gi_NumConnect[k] = 0;
     1313        gj.numConn[k] = 0;
     1314        gj.totBytes[k] = 0;
     1315    }
     1316
     1317    gj.bufTot   = gj.maxEvt = gj.xxxEvt = 0;
     1318    gj.maxMem   = gj.xxxMem = 0;
     1319
     1320    gj.usdMem   = Memory::inuse;
     1321    gj.totMem   = Memory::allocated;
     1322
     1323    gj.bufNew   = gj.bufEvt = 0;
     1324    gj.evtSkip  = gj.evtWrite = gj.evtErr = 0;
     1325    gj.readStat = gj.procStat = gj.writStat = 0;
     1326
     1327
     1328
     1329    READ_STRUCT rd[NBOARDS];
     1330
     1331    // This is only that every socket knows its id (maybe we replace that by arrays instead of an array of sockets)
     1332    for (int i=0; i<NBOARDS; i++)
     1333        rd[i].sockId = i;
     1334
     1335    while (mainloop(rd));
     1336
     1337    //must close all open sockets ...
     1338    factPrintf(MessageImp::kInfo, "Close all sockets...");
     1339
     1340    READ_STRUCT::close();
     1341
     1342    // Now all sockets get closed. This is not reflected in gi_NumConnect
     1343    // The current workaround is to count all sockets as closed when the thread is not running
     1344}
Note: See TracChangeset for help on using the changeset viewer.