Changeset 16618 for trunk


Ignore:
Timestamp:
06/03/13 11:10:01 (11 years ago)
Author:
tbretz
Message:
Added a different way to count the byted for the statistics; added the possibility to choose 'read always the board which has transmitted the least number of bytes first'
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilder.cc

    r16584 r16618  
    77#include <cstdarg>
    88#include <list>
     9#include <queue>
    910
    1011#include <boost/algorithm/string/join.hpp>
     
    2526//#define USE_SELECT
    2627//#define COMPLETE_EPOLL
     28//#define PRIORITY_QUEUE
    2729
    2830// Reading only 1024: 13:  77Hz, 87%
     
    207209
    208210    timeval  time;
    209     uint64_t rateBytes;
    210     uint32_t skip;     // number of bytes skipped before start of event
     211    uint64_t totBytes;  // total received bytes
     212    uint64_t relBytes;  // total released bytes
     213    uint32_t skip;      // number of bytes skipped before start of event
    211214
    212215    uint32_t len() const { return uint32_t(H.package_length)*2; }
     
    217220    // --------------------------------
    218221
    219     READ_STRUCT() : socket(-1), connected(false), rateBytes(0)
     222    READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
    220223    {
    221224        if (fd_epoll<0)
     
    231234    bool check(int, sockaddr_in addr);
    232235    bool read();
     236
    233237};
     238
     239#ifdef PRIORITY_QUEUE
     240struct READ_STRUCTcomp
     241{
     242    bool operator()(const READ_STRUCT *r1, const READ_STRUCT *r2)
     243    {
     244        const int64_t rel1 = r1->totBytes - r1->relBytes;
     245        const int64_t rel2 = r2->totBytes - r2->relBytes;
     246        return rel1 > rel2;
     247    }
     248};
     249#endif
    234250
    235251int READ_STRUCT::wait()
     
    382398    }
    383399
    384     bufPos = B;  // no byte read so far
    385     skip   = 0;       // start empty
     400    bufPos   = B;  // no byte read so far
     401    skip     = 0;  // start empty
     402    totBytes = 0;
     403    relBytes = 0;
    386404
    387405    factPrintf(MessageImp::kInfo, "Connected socket %d (%d)", sockId, socket);
     
    427445    }
    428446
    429     rateBytes += jrd;
     447    totBytes += jrd;
    430448
    431449    // are we skipping this board ...
     
    10761094#endif
    10771095
    1078 #if defined(USE_POLL)
     1096#ifdef PRIORITY_QUEUE
     1097        priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
     1098
     1099        for (int i=0; i<NBOARDS; i++)
     1100            if (rd[i].connected)
     1101                prio.push(rd+i);
     1102
     1103        if (!prio.empty()) do
     1104#endif
     1105
     1106
     1107#ifdef USE_POLL
    10791108        for (int jj=0; jj<nn; jj++)
    10801109#endif
    1081 #if defined(USE_EPOLL)
     1110#ifdef USE_EPOLL
    10821111        for (int jj=0; jj<rc_epoll; jj++)
    10831112#endif
    1084 #if !defined(USE_EPOLL) && !defined(USE_POLL)
     1113#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
    10851114        for (int jj=0; jj<NBOARDS; jj++)
    10861115#endif
    10871116        {
     1117#ifdef PRIORITY_QUEUE
     1118            READ_STRUCT *rs = prio.top();
     1119#endif
    10881120#ifdef USE_SELECT
    10891121            if (!FD_ISSET(rs->socket, &readfs))
     
    11061138#endif
    11071139
    1108 #if !defined(USE_POLL) && !defined(USE_EPOLL)
     1140#if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
    11091141            const int i = (jj%4)*10 + (jj/4);
    11101142            READ_STRUCT *rs = &rd[i];
     
    12651297                    primaryQueue.post(evt);
    12661298
     1299                // package_len is 0 if nothing was received.
     1300                for (int ib=0; ib<40; ib++)
     1301                    rd[ib].relBytes += (*it)->FADhead[ib].package_length;
     1302
    12671303                it++;
    12681304                evtCtrl.pop_front();
     
    12991335#endif
    13001336        } // end for loop over all sockets
     1337#ifdef PRIORITY_QUEUE
     1338        while (0); // convert continue into break ;)
     1339#endif
    13011340
    13021341        // ==================================================================
     
    13341373            const uint64_t rep = reportIncomplete(evt, "timeout");
    13351374            factReportIncomplete(rep);
     1375
     1376            // package_len is 0 when nothing was received from this board
     1377            for (int ib=0; ib<40; ib++)
     1378                rd[ib].relBytes += (*it)->FADhead[ib].package_length;
    13361379
    13371380            it++;
     
    13541397        bool changed = false;
    13551398
     1399        static vector<uint64_t> store(NBOARDS);
     1400
    13561401        for (int ib=0; ib<NBOARDS; ib++)
    13571402        {
    1358             gj.rateBytes[ib]  = rd[ib].rateBytes;
    1359             gj.totBytes[ib]  += rd[ib].rateBytes;
     1403            gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
     1404            gj.relBytes[ib]  = rd[ib].totBytes-rd[ib].relBytes;
     1405
     1406            store[ib] = rd[ib].totBytes;
    13601407
    13611408            if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
     
    13691416
    13701417        Memory::max_inuse = 0;
    1371 
    1372         for (int ib=0; ib<NBOARDS; ib++)
    1373             rd[ib].rateBytes = 0;
    13741418
    13751419        // =================================================================
Note: See TracChangeset for help on using the changeset viewer.