Changeset 17254 for trunk/Mars/mcore
- Timestamp:
- 10/18/13 14:10:24 (11 years ago)
- Location:
- trunk/Mars/mcore
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Mars/mcore/MemoryManager.h
r17251 r17254 99 99 100 100 std::shared_ptr<MemoryStock> fMemoryStock; 101 std::shared_ptr<char> fPointer; 102 char* fRealPointer; 101 std::shared_ptr<char> fPointer; 103 102 104 MemoryChunk(const std::shared_ptr<MemoryStock> &mem, bool block) : fMemoryStock(mem) 103 MemoryChunk(const std::shared_ptr<MemoryStock> &mem, bool block) 104 : fMemoryStock(mem) 105 105 { 106 106 fPointer = fMemoryStock->pop(block); 107 fRealPointer = fPointer.get();108 107 } 109 108 … … 113 112 fMemoryStock->push(fPointer); 114 113 } 115 116 public:117 char* get() { return fPointer.get();}118 114 }; 119 115 … … 123 119 124 120 public: 125 126 121 MemoryManager(size_t chunk, size_t max) : fMemoryStock(std::make_shared<MemoryStock>(chunk, max)) 127 122 { 128 123 } 129 124 130 std::shared_ptr< MemoryChunk> malloc(bool block=true)125 std::shared_ptr<char> malloc(bool block=true) 131 126 { 132 MemoryChunk *chunk = new MemoryChunk(fMemoryStock, block); 133 //Etienne cannot get the aliasing to work (at least with g++ 4.4.6 134 return std::shared_ptr<MemoryChunk>(std::shared_ptr<MemoryChunk>(chunk));//, chunk->fRealPointer); 127 const std::shared_ptr<MemoryChunk> chunk(new MemoryChunk(fMemoryStock, block)); 128 return std::shared_ptr<char>(chunk, chunk->fPointer.get()); 135 129 } 136 130 … … 142 136 throw std::runtime_error("Cannot change the chunk size while there is memory in use"); 143 137 if (getMaxMemory()<size) 144 throw std::runtime_error("Chunk size ("+to_string(size)+") larger thann allowed memory ("+to_string(getMaxMemory())+")");138 throw std::runtime_error("Chunk size ("+std::to_string(size)+") larger than allowed memory ("+std::to_string(getMaxMemory())+")"); 145 139 #else 146 140 if (getInUse() || getMaxMemory()<size) -
trunk/Mars/mcore/zofits.h
r17253 r17254 37 37 }; 38 38 39 39 40 //catalog types 40 41 struct CatalogEntry … … 42 43 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}; 43 44 int64_t first; ///< Size of this column in the tile 44 int64_t second; ///< offset of this column in the tile, from the start of the heap area45 int64_t second; ///< offset of this column in the tile, from the start of the heap area 45 46 } __attribute__((__packed__)); 47 46 48 typedef vector<CatalogEntry> CatalogRow; 47 49 typedef list<CatalogRow> CatalogType; 48 50 51 49 52 /// Parameters required to write a tile to disk 50 53 struct WriteTarget … … 55 58 } 56 59 57 uint32_t 58 uint32_t size; ///<Size to write59 shared_ptr< MemoryChunk> data; ///<Memory block to write60 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order) 61 uint32_t size; ///< Size to write 62 shared_ptr<char> data; ///< Memory block to write 60 63 }; 64 61 65 62 66 /// Parameters required to compress a tile of data … … 66 70 {} 67 71 68 CatalogRow& 69 shared_ptr< MemoryChunk> src; ///<Original data70 shared_ptr< MemoryChunk> transposed_src;///<Transposed data71 WriteTarget target; ///<Compressed data72 uint32_t num_rows; ///<Number of rows to compress72 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with 73 shared_ptr<char> src; ///< Original data 74 shared_ptr<char> transposed_src; ///< Transposed data 75 WriteTarget target; ///< Compressed data 76 uint32_t num_rows; ///< Number of rows to compress 73 77 }; 74 78 … … 124 128 fNumRowsPerTile = rpt; 125 129 126 fBuffer = NULL;127 130 fRealRowWidth = 0; 128 131 fCatalogOffset = 0; … … 273 276 274 277 //copy current row to pool or rows waiting for compression 275 char* target_location = f Buffer+ fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);278 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile); 276 279 memcpy(target_location, ptr, fRealRowWidth); 277 280 … … 309 312 if (fNumQueues == 0) 310 313 { //no worker threads. do everything in-line 311 uint64_t size_to_write = CompressBuffer(compress_target);314 const uint64_t size_to_write = CompressBuffer(compress_target); 312 315 313 316 WriteTarget write_target; … … 325 328 uint32_t current_index = 0; 326 329 327 for (auto it=fCompressionQueues. begin(); it!=fCompressionQueues.end(); it++)330 for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++) 328 331 { 329 332 if (it->size() < min_size) … … 356 359 { 357 360 //get space for transposed data 358 shared_ptr< MemoryChunk> transposed_data = fMemPool.malloc();361 shared_ptr<char> transposed_data = fMemPool.malloc(); 359 362 360 363 //fill up write to disk target … … 372 375 //get a new buffer to host the incoming data 373 376 fSmartBuffer = fMemPool.malloc(); 374 fBuffer = fSmartBuffer.get()->get();375 377 } 376 378 … … 625 627 626 628 fSmartBuffer = fMemPool.malloc(); 627 fBuffer = fSmartBuffer.get()->get();628 629 629 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming 630 630 } … … 686 686 #endif 687 687 //transpose the original data 688 copyTransposeTile(target.src.get() ->get(), target.transposed_src.get()->get());688 copyTransposeTile(target.src.get(), target.transposed_src.get()); 689 689 690 690 //compress the buffer 691 compressed_size = compressBuffer(target.target.data.get() ->get(), target.transposed_src.get()->get(), target.num_rows, target.catalog_entry);691 compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry); 692 692 #ifdef __EXCEPTIONS 693 693 } … … 730 730 { 731 731 #endif 732 if (!writeCompressedDataToDisk(target.data.get() ->get(), target.size))732 if (!writeCompressedDataToDisk(target.data.get(), target.size)) 733 733 {//could not write the data to disk 734 734 ostringstream str; … … 990 990 }; 991 991 vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file 992 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row 993 shared_ptr<MemoryChunk> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written 994 char* fBuffer; ///< regular version of fSmartBuffer 995 vector<char> fRawSumBuffer;///< buffer used for checksuming the incoming data, before compression 992 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row 993 shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written 994 vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression 996 995 997 996 #ifdef __EXCEPTIONS
Note:
See TracChangeset
for help on using the changeset viewer.