Changeset 17254 for trunk/Mars/mcore


Ignore:
Timestamp:
10/18/13 14:10:24 (11 years ago)
Author:
tbretz
Message:
Use a shared_ptr with aliasing of char* instead of a MemoryChunk
Location:
trunk/Mars/mcore
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/MemoryManager.h

    r17251 r17254  
    9999
    100100    std::shared_ptr<MemoryStock> fMemoryStock;
    101     std::shared_ptr<char>   fPointer;
    102     char*                   fRealPointer;
     101    std::shared_ptr<char>        fPointer;
    103102
    104     MemoryChunk(const std::shared_ptr<MemoryStock> &mem, bool block) : fMemoryStock(mem)
     103    MemoryChunk(const std::shared_ptr<MemoryStock> &mem, bool block)
     104        : fMemoryStock(mem)
    105105    {
    106106        fPointer = fMemoryStock->pop(block);
    107         fRealPointer = fPointer.get();
    108107    }
    109108
     
    113112        fMemoryStock->push(fPointer);
    114113    }
    115 
    116 public:
    117     char* get() { return fPointer.get();}
    118114};
    119115
     
    123119
    124120public:
    125 
    126121    MemoryManager(size_t chunk, size_t max) : fMemoryStock(std::make_shared<MemoryStock>(chunk, max))
    127122    {
    128123    }
    129124
    130     std::shared_ptr<MemoryChunk> malloc(bool block=true)
     125    std::shared_ptr<char> malloc(bool block=true)
    131126    {
    132         MemoryChunk *chunk = new MemoryChunk(fMemoryStock, block);
    133         //Etienne cannot get the aliasing to work (at least with g++ 4.4.6
    134         return std::shared_ptr<MemoryChunk>(std::shared_ptr<MemoryChunk>(chunk));//, chunk->fRealPointer);
     127        const std::shared_ptr<MemoryChunk> chunk(new MemoryChunk(fMemoryStock, block));
     128        return std::shared_ptr<char>(chunk, chunk->fPointer.get());
    135129    }
    136130
     
    142136            throw std::runtime_error("Cannot change the chunk size while there is memory in use");
    143137        if (getMaxMemory()<size)
    144             throw std::runtime_error("Chunk size("+to_string(size)+") larger thann allowed memory ("+to_string(getMaxMemory())+")");
     138            throw std::runtime_error("Chunk size ("+std::to_string(size)+") larger than allowed memory ("+std::to_string(getMaxMemory())+")");
    145139#else
    146140        if (getInUse() || getMaxMemory()<size)
  • trunk/Mars/mcore/zofits.h

    r17253 r17254  
    3737        };
    3838
     39
    3940        //catalog types
    4041        struct CatalogEntry
     
    4243            CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
    4344            int64_t first;   ///< Size of this column in the tile
    44             int64_t second; ///< offset of this column in the tile, from the start of the heap area
     45            int64_t second;  ///< offset of this column in the tile, from the start of the heap area
    4546        } __attribute__((__packed__));
     47
    4648        typedef vector<CatalogEntry> CatalogRow;
    4749        typedef list<CatalogRow>     CatalogType;
    4850
     51
    4952        /// Parameters required to write a tile to disk
    5053        struct WriteTarget
     
    5558            }
    5659
    57             uint32_t                tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
    58             uint32_t                size;    ///< Size to write
    59             shared_ptr<MemoryChunk> data;   ///<   Memory block to write
     60            uint32_t         tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
     61            uint32_t         size;     ///< Size to write
     62            shared_ptr<char> data;     ///<  Memory block to write
    6063        };
     64
    6165
    6266        /// Parameters required to compress a tile of data
     
    6670            {}
    6771
    68             CatalogRow&             catalog_entry;   ///< Reference to the catalog entry to deal with
    69             shared_ptr<MemoryChunk> src;            ///< Original data
    70             shared_ptr<MemoryChunk> transposed_src;///<  Transposed data
    71             WriteTarget             target;       ///<    Compressed data
    72             uint32_t                num_rows;    ///<    Number of rows to compress
     72            CatalogRow&      catalog_entry;   ///< Reference to the catalog entry to deal with
     73            shared_ptr<char> src;             ///< Original data
     74            shared_ptr<char> transposed_src;  ///< Transposed data
     75            WriteTarget      target;          ///< Compressed data
     76            uint32_t         num_rows;        ///< Number of rows to compress
    7377         };
    7478
     
    124128            fNumRowsPerTile = rpt;
    125129
    126             fBuffer           = NULL;
    127130            fRealRowWidth     = 0;
    128131            fCatalogOffset    = 0;
     
    273276
    274277            //copy current row to pool or rows waiting for compression
    275             char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
     278            char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
    276279            memcpy(target_location, ptr, fRealRowWidth);
    277280
     
    309312                if (fNumQueues == 0)
    310313                { //no worker threads. do everything in-line
    311                     uint64_t size_to_write = CompressBuffer(compress_target);
     314                    const uint64_t size_to_write = CompressBuffer(compress_target);
    312315
    313316                    WriteTarget write_target;
     
    325328                     uint32_t current_index = 0;
    326329
    327                      for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
     330                     for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
    328331                     {
    329332                         if (it->size() < min_size)
     
    356359        {
    357360            //get space for transposed data
    358             shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();
     361            shared_ptr<char> transposed_data = fMemPool.malloc();
    359362
    360363            //fill up write to disk target
     
    372375            //get a new buffer to host the incoming data
    373376            fSmartBuffer = fMemPool.malloc();
    374             fBuffer      = fSmartBuffer.get()->get();
    375377        }
    376378
     
    625627
    626628            fSmartBuffer = fMemPool.malloc();
    627             fBuffer      = fSmartBuffer.get()->get();
    628 
    629629            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    630630        }
     
    686686#endif
    687687                //transpose the original data
    688                 copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());
     688                copyTransposeTile(target.src.get(), target.transposed_src.get());
    689689
    690690                //compress the buffer
    691                 compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows, target.catalog_entry);
     691                compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
    692692#ifdef __EXCEPTIONS
    693693            }
     
    730730            {
    731731#endif
    732                 if (!writeCompressedDataToDisk(target.data.get()->get(), target.size))
     732                if (!writeCompressedDataToDisk(target.data.get(), target.size))
    733733                {//could not write the data to disk
    734734                    ostringstream str;
     
    990990        };
    991991        vector<CompressedColumn> fRealColumns;     ///< Vector hosting the columns of the file
    992         uint32_t                 fRealRowWidth;   ///<  Width in bytes of one uncompressed row
    993         shared_ptr<MemoryChunk>  fSmartBuffer;   ///<   Smart pointer to the buffer where the incoming rows are written
    994         char*                    fBuffer;       ///<    regular version of fSmartBuffer
    995         vector<char>             fRawSumBuffer;///<     buffer used for checksuming the incoming data, before compression
     992        uint32_t                 fRealRowWidth;    ///< Width in bytes of one uncompressed row
     993        shared_ptr<char>         fSmartBuffer;     ///< Smart pointer to the buffer where the incoming rows are written
     994        vector<char>             fRawSumBuffer;    ///< buffer used for checksuming the incoming data, before compression
    996995
    997996#ifdef __EXCEPTIONS
Note: See TracChangeset for help on using the changeset viewer.