Changeset 15477


Ignore:
Timestamp:
05/01/13 18:03:04 (12 years ago)
Author:
tbretz
Message:
Moved all Dim emitting services called from the event builder#s main loop to their own threads.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/EventBuilderWrapper.h

    r15414 r15477  
    5454
    5555private:
    56     boost::thread fThread;
     56    boost::thread fThreadMain;
     57    boost::thread fThreadDimQueue1;
     58    boost::thread fThreadDimQueue2;
     59
     60    bool fThreadDimQueueStop;
     61
     62    deque<pair<Time,GUI_STAT>> fDimQueue1;
     63    deque<tuple<Time,bool,FAD::EventHeader>> fDimQueue2;
     64
     65    mutex fMutexDimQueue1;
     66    mutex fMutexDimQueue2;
     67
    5768
    5869    enum CommandStates_t // g_runStat
     
    177188public:
    178189    EventBuilderWrapper(MessageImp &imp) : fMsg(imp),
     190        fThreadDimQueueStop(false),
    179191        fFileFormat(FAD::kNone), fMaxRun(0), fLastOpened(0), fLastClosed(0),
    180192        fDimWriteStats  ("FAD_CONTROL", imp),
     
    272284        for (size_t i=0; i<40; i++)
    273285            ConnectSlot(i, tcp::endpoint());
    274     }
     286
     287        fThreadDimQueue1 = boost::thread(boost::bind(&EventBuilderWrapper::SendStat, this));
     288        fThreadDimQueue2 = boost::thread(boost::bind(&EventBuilderWrapper::ProcHeader, this));
     289    }
     290
    275291    virtual ~EventBuilderWrapper()
    276292    {
    277293        Abort();
     294        fThreadDimQueueStop = true;
    278295
    279296        // FIXME: Used timed_join and abort afterwards
    280297        //        What's the maximum time the eb need to abort?
    281         fThread.join();
     298        fThreadMain.join();
     299        fThreadDimQueue1.join();
     300        fThreadDimQueue2.join();
     301
    282302        //ffMsg.Info("EventBuilder stopped.");
    283303
     
    318338    bool IsThreadRunning()
    319339    {
    320         return !fThread.timed_join(boost::posix_time::microseconds(0));
     340        return !fThreadMain.timed_join(boost::posix_time::microseconds(0));
    321341    }
    322342
     
    351371        fMsg.Message("Starting EventBuilder thread");
    352372
    353         fThread = boost::thread(StartEvtBuild);
     373        fThreadMain = boost::thread(StartEvtBuild);
    354374    }
    355375    void ConnectSlot(unsigned int i, const tcp::endpoint &addr)
     
    12411261    }
    12421262
     1263    void SendStat()
     1264    {
     1265        // Performance could be increased signaling the thread to wake up
     1266        while (!fThreadDimQueueStop)
     1267        {
     1268            if (fDimQueue1.size()==0)
     1269            {
     1270                usleep(1000);
     1271                continue;
     1272            }
     1273
     1274            fMutexDimQueue1.lock();
     1275            const pair<Time,GUI_STAT> stat = fDimQueue1.front();
     1276            fDimQueue1.pop_front();
     1277            fMutexDimQueue1.unlock();
     1278
     1279            fDimStatistics1.setData(&stat.second, sizeof(GUI_STAT));
     1280            fDimStatistics1.Update(stat.first);
     1281        }
     1282    }
     1283
    12431284    void factStat(const GUI_STAT &stat)
    12441285    {
    1245         fDimStatistics1.Update(stat);
     1286        const lock_guard<mutex> guard(fMutexDimQueue1);
     1287        fDimQueue1.push_back(make_pair(Time(), stat));
    12461288    }
    12471289
     
    13441386
    13451387    template<typename T, size_t N>
    1346     void Update(DimDescribedService &svc, const array<T, N> &data, int n=N)
     1388    void Update(DimDescribedService &svc, const array<T, N> &data, const Time &t=Time(), int n=N)
    13471389    {
    13481390//        svc.setQuality(vec[40]<=vec[41]);
    13491391        svc.setData(const_cast<T*>(data.data()), sizeof(T)*n);
    1350         svc.Update();
     1392        svc.Update(t);
    13511393    }
    13521394
     
    13621404    vector<uint> fNumConnected;
    13631405
     1406    void ProcHeader()
     1407    {
     1408        // Performance could be increased signaling the thread to wake up
     1409        while (!fThreadDimQueueStop)
     1410        {
     1411            if (fDimQueue2.size()==0)
     1412            {
     1413                usleep(100);
     1414                continue;
     1415            }
     1416
     1417            fMutexDimQueue2.lock();
     1418            const auto dat = fDimQueue2.front();
     1419            fDimQueue2.pop_front();
     1420            fMutexDimQueue2.unlock();
     1421
     1422            const Time             &t = get<0>(dat);
     1423            const bool        changed = get<1>(dat);
     1424            const FAD::EventHeader &h = get<2>(dat);
     1425
     1426            const FAD::EventHeader old = fVecHeader[h.Id()];
     1427            fVecHeader[h.Id()] = h;
     1428
     1429            if (old.fVersion != h.fVersion || changed)
     1430            {
     1431                const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
     1432
     1433                array<float,42> data;
     1434                for (int i=0; i<42; i++)
     1435                {
     1436                    ostringstream str;
     1437                    str << (ver[i]>>8) << '.' << (ver[i]&0xff);
     1438                    data[i] = stof(str.str());
     1439                }
     1440                Update(fDimFwVersion, data, t);
     1441            }
     1442
     1443            if (old.fRunNumber != h.fRunNumber || changed)
     1444            {
     1445                const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
     1446                fDimRunNumber.setData(&run[0], 42*sizeof(uint32_t));
     1447                fDimRunNumber.Update(t);
     1448            }
     1449
     1450            if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
     1451            {
     1452                const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
     1453                fDimPrescaler.setData(&pre[0], 42*sizeof(uint16_t));
     1454                fDimPrescaler.Update(t);
     1455            }
     1456
     1457            if (old.fDNA != h.fDNA || changed)
     1458            {
     1459                const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
     1460                Update(fDimDNA, dna, t, 40);
     1461            }
     1462
     1463            if (old.fStatus != h.fStatus || changed)
     1464            {
     1465                const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
     1466                Update(fDimStatus, sts, t);
     1467            }
     1468
     1469            if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
     1470            {
     1471                array<uint16_t, FAD::kNumDac*42> dacs;
     1472
     1473                for (int i=0; i<FAD::kNumDac; i++)
     1474                {
     1475                    const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
     1476                    memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
     1477                }
     1478
     1479                Update(fDimDac, dacs, t);
     1480            }
     1481
     1482            // -----------
     1483
     1484            static Time oldt(boost::date_time::neg_infin);
     1485            Time newt;
     1486
     1487            if (newt>oldt+boost::posix_time::seconds(1))
     1488            {
     1489                oldt = newt;
     1490
     1491                // --- RefClock
     1492
     1493                const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
     1494                Update(fDimRefClock, clk, t);
     1495
     1496                // --- Temperatures
     1497
     1498                const array<int16_t,42> tmp[4] =
     1499                {
     1500                    Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]),    // 0-39:val, 40:min, 41:max
     1501                    Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]),    // 0-39:val, 40:min, 41:max
     1502                    Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]),    // 0-39:val, 40:min, 41:max
     1503                    Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3])     // 0-39:val, 40:min, 41:max
     1504                };
     1505
     1506                vector<int16_t> data;
     1507                data.reserve(82);
     1508                data.push_back(tmp[0][40]);                                 // min: 0
     1509                data.insert(data.end(), tmp[0].data(), tmp[0].data()+40);   // val: 1-40
     1510                data.push_back(tmp[0][41]);                                 // max: 41
     1511                data.insert(data.end(), tmp[0].data(), tmp[0].data()+40);   // val: 42-81
     1512
     1513                for (int j=1; j<=3; j++)
     1514                {
     1515                    const array<int16_t,42> &ref = tmp[j];
     1516
     1517                    // Gloabl min
     1518                    if (ref[40]<data[0])           // 40=min
     1519                        data[0] = ref[40];
     1520
     1521                    // Global max
     1522                    if (ref[41]>data[41])          // 41=max
     1523                        data[41] = ref[41];
     1524
     1525                    for (int i=0; i<40; i++)
     1526                    {
     1527                        // min per board
     1528                        if (ref[i]<data[i+1])      // data:  1-40
     1529                            data[i+1] = ref[i];    // ref:   0-39
     1530
     1531                        // max per board
     1532                        if (ref[i]>data[i+42])     // data: 42-81
     1533                            data[i+42] = ref[i];   // ref:   0-39
     1534                    }
     1535                }
     1536
     1537                vector<float> deg(82);              //  0: global min,  1-40: min
     1538                for (int i=0; i<82; i++)            // 41: global max, 42-81: max
     1539                    deg[i] = data[i]/16.;
     1540
     1541                fDimTemperature.setData(deg.data(), 82*sizeof(float));
     1542                fDimTemperature.Update(t);
     1543            }
     1544
     1545        }
     1546    }
     1547
    13641548    void debugHead(int /*socket*/, const FAD::EventHeader &h)
    13651549    {
     
    13771561        fNumConnected = con;
    13781562
    1379         const FAD::EventHeader old = fVecHeader[id];
    1380         fVecHeader[id] = h;
    1381 
    1382         if (old.fVersion != h.fVersion || changed)
    1383         {
    1384             const array<uint16_t,42> ver = Compare(&fVecHeader[0], &fVecHeader[0].fVersion);
    1385 
    1386             array<float,42> data;
    1387             for (int i=0; i<42; i++)
    1388             {
    1389                 ostringstream str;
    1390                 str << (ver[i]>>8) << '.' << (ver[i]&0xff);
    1391                 data[i] = stof(str.str());
    1392             }
    1393             Update(fDimFwVersion, data);
    1394         }
    1395 
    1396         if (old.fRunNumber != h.fRunNumber || changed)
    1397         {
    1398             const array<uint32_t,42> run = Compare(&fVecHeader[0], &fVecHeader[0].fRunNumber);
    1399             fDimRunNumber.Update(run);
    1400         }
    1401 
    1402         if (old.fTriggerGeneratorPrescaler != h.fTriggerGeneratorPrescaler || changed)
    1403         {
    1404             const array<uint16_t,42> pre = Compare(&fVecHeader[0], &fVecHeader[0].fTriggerGeneratorPrescaler);
    1405             fDimPrescaler.Update(pre);
    1406         }
    1407 
    1408         if (old.fDNA != h.fDNA || changed)
    1409         {
    1410             const array<uint64_t,42> dna = Compare(&fVecHeader[0], &fVecHeader[0].fDNA);
    1411             Update(fDimDNA, dna, 40);
    1412         }
    1413 
    1414         if (old.fStatus != h.fStatus || changed)
    1415         {
    1416             const array<uint16_t,42> sts = CompareBits(&fVecHeader[0], &fVecHeader[0].fStatus);
    1417             Update(fDimStatus, sts);
    1418         }
    1419 
    1420         if (memcmp(old.fDac, h.fDac, sizeof(h.fDac)) || changed)
    1421         {
    1422             array<uint16_t, FAD::kNumDac*42> dacs;
    1423 
    1424             for (int i=0; i<FAD::kNumDac; i++)
    1425             {
    1426                 const array<uint16_t, 42> dac = Compare(&fVecHeader[0], &fVecHeader[0].fDac[i]);
    1427                 memcpy(&dacs[i*42], &dac[0], sizeof(uint16_t)*42);
    1428             }
    1429 
    1430             Update(fDimDac, dacs);
    1431         }
    1432 
    1433         // -----------
    1434 
    1435         static Time oldt(boost::date_time::neg_infin);
    1436         Time newt;
    1437 
    1438         if (newt>oldt+boost::posix_time::seconds(1))
    1439         {
    1440             oldt = newt;
    1441 
    1442             // --- RefClock
    1443 
    1444             const array<uint32_t,42> clk = Compare(&fVecHeader[0], &fVecHeader[0].fFreqRefClock);
    1445             Update(fDimRefClock, clk);
    1446 
    1447             // --- Temperatures
    1448 
    1449             const array<int16_t,42> tmp[4] =
    1450             {
    1451                 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[0]),    // 0-39:val, 40:min, 41:max
    1452                 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[1]),    // 0-39:val, 40:min, 41:max
    1453                 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[2]),    // 0-39:val, 40:min, 41:max
    1454                 Compare(&fVecHeader[0], &fVecHeader[0].fTempDrs[3])     // 0-39:val, 40:min, 41:max
    1455             };
    1456 
    1457             vector<int16_t> data;
    1458             data.reserve(82);
    1459             data.push_back(tmp[0][40]);                                 // min: 0
    1460             data.insert(data.end(), tmp[0].data(), tmp[0].data()+40);   // val: 1-40
    1461             data.push_back(tmp[0][41]);                                 // max: 41
    1462             data.insert(data.end(), tmp[0].data(), tmp[0].data()+40);   // val: 42-81
    1463 
    1464             for (int j=1; j<=3; j++)
    1465             {
    1466                 const array<int16_t,42> &ref = tmp[j];
    1467 
    1468                 // Gloabl min
    1469                 if (ref[40]<data[0])           // 40=min
    1470                     data[0] = ref[40];
    1471 
    1472                 // Global max
    1473                 if (ref[41]>data[41])          // 41=max
    1474                     data[41] = ref[41];
    1475 
    1476                 for (int i=0; i<40; i++)
    1477                 {
    1478                     // min per board
    1479                     if (ref[i]<data[i+1])      // data:  1-40
    1480                         data[i+1] = ref[i];    // ref:   0-39
    1481 
    1482                     // max per board
    1483                     if (ref[i]>data[i+42])     // data: 42-81
    1484                         data[i+42] = ref[i];   // ref:   0-39
    1485                 }
    1486 
    1487 
    1488            }
    1489 
    1490             vector<float> deg(82);              //  0: global min,  1-40: min
    1491             for (int i=0; i<82; i++)            // 41: global max, 42-81: max
    1492                 deg[i] = data[i]/16.;
    1493             fDimTemperature.Update(deg);
    1494         }
     1563        const lock_guard<mutex> guard(fMutexDimQueue2);
     1564        fDimQueue2.push_back(make_tuple(Time(), changed, h));
    14951565    }
    14961566};
Note: See TracChangeset for help on using the changeset viewer.