Changeset 17243 for trunk/Mars/mcore


Ignore:
Timestamp:
10/17/13 16:49:25 (11 years ago)
Author:
tbretz
Message:
Encapsulated the header with a guard (I don't understand why this was not yet done); reworked the static setup functions to avoid the need for a source file if included in multiple targets; simplified SetNumThreads... a good documentation is enough.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zofits.h

    r17239 r17243  
     1#ifndef FACT_zofits
     2#define FACT_zofits
     3
    14/*
    25 * zofits.h
     
    5760
    5861public:
     62        /// static setter for the default number of threads to use. -1 means all available physical cores
     63        static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
     64        static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
     65        static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
     66        static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
     67
    5968        /// constructors
    6069        /// @param numTiles how many data groups should be pre-reserved ?
    6170        /// @param rowPerTile how many rows will be grouped together in a single tile
    6271        /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    63         zofits(uint32_t numTiles    = fgNumTiles,
    64                uint32_t rowPerTile  = fgRowPerTile,
    65                uint64_t maxUsableMem= fgMaxUsableMem) : ofits(),
    66                                                         fMemPool(0, maxUsableMem),
     72        zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
     73               uint32_t rowPerTile  = DefaultNumRowsPerTile(),
     74               uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
     75                                                        fMemPool(0, maxUsableMem*1000),
    6776                                                        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    6877        {
    69             InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    70             SetNumThreads(fgNumQueues);
     78            InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
     79            SetNumThreads(DefaultNumThreads());
    7180        }
    7281
     
    7685        /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    7786        zofits(const char* fname,
    78                uint32_t numTiles    = fgNumTiles,
    79                uint32_t rowPerTile  = fgRowPerTile,
    80                uint64_t maxUsableMem= fgMaxUsableMem) : ofits(fname),
    81                                                         fMemPool(0, maxUsableMem),
     87               uint32_t numTiles    = DefaultMaxNumTiles(),
     88               uint32_t rowPerTile  = DefaultNumRowsPerTile(),
     89               uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
     90                                                        fMemPool(0, maxUsableMem*1000),
    8291                                                        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    8392        {
    84             InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    85             SetNumThreads(fgNumQueues);
    86         }
    87 
    88         /// destructors
    89         virtual ~zofits()
    90         {
     93            InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
     94            SetNumThreads(DefaultNumThreads());
    9195        }
    9296
     
    577581        }
    578582
    579         /// static setter for the default number of threads to use. -1 means all available physical cores
    580         static void SetDefaultNumThreads   (int32_t num)   { fgNumQueues = num;}
    581         static void SetDefaultNumTiles     (uint32_t num)  { fgNumTiles = num;}
    582         static void SetDefaultNumRowPerTile(uint32_t num)  { fgRowPerTile = num;}
    583         static void SetDefaultMaxUsableMem (uint64_t size) { fgMaxUsableMem = size;}
    584 
    585         static int32_t GetDefaultNumThreads()     { return fgNumQueues;}
    586         static uint32_t GetDefaultNumTiles()      { return fgNumTiles;}
    587         static uint32_t GetDefaultNumRowPerTile() { return fgRowPerTile;}
    588         static uint64_t GetDefaulMaxUsableMem()   { return fgMaxUsableMem;}
    589 
    590583        /// Get and set the actual number of threads for this object
    591584        int32_t GetNumThreads() const { return fNumQueues;}
    592         bool SetNumThreads(int32_t num)
     585        bool SetNumThreads(uint32_t num)
    593586        {
    594587            if (is_open())
     
    604597            //get number of physically available threads
    605598#ifdef USE_BOOST_THREADS
    606             int32_t num_available_cores = boost::thread::hardware_concurrency();
     599            unsigned int num_available_cores = boost::thread::hardware_concurrency();
    607600#else
    608             int32_t num_available_cores = thread::hardware_concurrency();
    609 #endif
    610 
     601            unsigned int num_available_cores = thread::hardware_concurrency();
     602#endif
    611603            // could not detect number of available cores from system properties...
    612             // assume that 5 cores are available (4 compression, 1 write)
    613604            if (num_available_cores == 0)
    614                 num_available_cores = 5;
    615 
    616             // Throw an exception if too many cores are requested
     605                num_available_cores = 1;
     606
     607            // leave one core for the main thread and one for the writing
    617608            if (num > num_available_cores)
    618             {
    619                 ostringstream str;
    620                 str << "You will be using more threads(" << num << ") than available cores(" << num_available_cores << "). Expect sub-optimal performances";
    621 #ifdef __EXCEPTIONS
    622                 throw runtime_error(str.str());
    623 #else
    624                 gLog << ___err___ << "WARNING - " << str.str() << endl;
    625 #endif
    626             }
    627 
    628             if (num == -1)
    629                 num = num_available_cores-2; // 1 for writing, 1 for the main thread
    630 
    631             if (fCompressionQueues.size() == (uint32_t)num)
    632                 return true;
    633 
    634             //cannot be const, as resize does not want it that way
    635             Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false);
    636 
    637             //shrink if required
    638             if ((uint32_t)num < fCompressionQueues.size())
    639             {
    640                 fCompressionQueues.resize(num, queue);
    641                 return true;
    642             }
    643 
    644             //grow if required
    645             fCompressionQueues.resize(num, queue);
    646 
    647             fNumQueues = num;
     609                num = num_available_cores>2 ? num_available_cores-2 : 1;
     610
     611            if (fCompressionQueues.size() != uint32_t(num))
     612            {
     613                fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
     614                fNumQueues = num;
     615            }
    648616
    649617            return true;
     
    991959
    992960        //thread related stuff
    993         MemoryManager   fMemPool;                  ///< Actual memory manager, providing memory for the compression buffers
    994         static int32_t  fgNumQueues;              ///<  Default number of threads to be used by the objects
    995         static uint32_t fgNumTiles;              ///< Default number of reserved tiles
    996         static uint32_t fgRowPerTile;           ///< Default number of rows per tile
    997         static uint64_t fgMaxUsableMem;        ///< Default usable memory PER OBJECT
    998         int32_t         fNumQueues;           ///<   Current number of threads that will be used by this object
    999         uint64_t        fMaxUsableMem;       ///<    Maximum number of bytes that can be allocated by the memory manager
    1000         int32_t         fLatestWrittenTile; ///<     Index of the last tile written to disk (for correct ordering while using several threads)
     961        MemoryManager   fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
     962        int32_t         fNumQueues;         ///< Current number of threads that will be used by this object
     963        uint64_t        fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
     964        int32_t         fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
    1001965
    1002966        vector<Queue<CompressionTarget>>          fCompressionQueues;  ///< Processing queues (=threads)
     
    10471011};
    10481012
    1049 int32_t  zofits::fgNumQueues = 0;
    1050 uint32_t zofits::fgNumTiles = 1000;
    1051 uint32_t zofits::fgRowPerTile = 100;
    1052 uint64_t zofits::fgMaxUsableMem = 1073741824; // one gigabyte
    1053 
    10541013#ifndef __MARS__
    10551014}; //namespace std
    10561015#endif
     1016
     1017#endif
Note: See TracChangeset for help on using the changeset viewer.