Changeset 17237 for trunk/Mars
- Timestamp: 10/17/13 15:21:09
- Location: trunk/Mars/mcore
- Files: 2 edited
trunk/Mars/mcore/factofits.h
--- factofits.h (r17226)
+++ factofits.h (r17237)

 #include "zofits.h"

+#include "../src/DataWriteFits2.h"
+
+#include "DrsCalib.h"
+
 #ifndef __MARS__
 namespace std
…
 public:

+    /// constructors
     factofits(uint32_t numTiles=1000,
               uint32_t rowPerTile=100,
…
         fDataOffset = -1;
     }
+
     factofits(const char* fname,
               uint32_t numTiles=1000,
…
         fDataOffset = -1;
     }
+
     virtual ~factofits()
     {
     }

-    // whether or not a calibration was given to the file writer
+    /// whether or not a calibration was given to the file writer
     virtual bool IsOffsetCalibration()
     {
…
     }

-    // assign a given drs offset calibration
+    ///assign a given drs offset calibration
     void SetDrsCalibration(const vector<float> &calib)
     {
+        VerifyCalibrationSize(calib.size());
+
         if (!IsOffsetCalibration())
             fOffsetCalibration.resize(1440*1024);
…
     }

-    void SetDrsCalibration(const vector<float>& calib)
-    {
-        if (calib.size() != 1440*1024)
-#ifdef __EXCEPTIONS
-            throw runtime_error("Cannot load calibration with anything else than 1024 samples per pixel");
-#else
-            gLog << ___err___ << "ERROR - Cannot load calibration with anything else than 1024 samples per pixel");
-#endif
-        SetDrsCalibration(calib.data());
-    }
-
+    ///assign a given drs offset calibration
     void SetDrsCalibration(const vector<int16_t>& vec)
     {
+        VerifyCalibrationSize(vec.size());
+
         if (!IsOffsetCalibration())
             fOffsetCalibration.resize(1440*1024);
…
     }

+    ///assign a given drs offset calibration
     void SetDrsCalibration(const DrsCalibration& drs)
     {
…
             return;

+        VerifyCalibrationSize(drs.fOffset.size());
+
         if (!IsOffsetCalibration())
             fOffsetCalibration.resize(1440*1024);
…
     }

+    ///Overload of the super function
     bool WriteTableHeader(const char* name="DATA")
     {
…
         if (it->col.name == "StartCellData")
             fStartCellsOffset = it->col.offset;
+
         if (it->col.name == "Data")
         {
             fNumSlices  = it->col.num;
             fDataOffset = it->col.offset;
             if (fNumSlices % 1440 != 0)
…
             }
         }
+
         if (fStartCellsOffset < 0)
         {
…
             fOffsetCalibration.resize(0);
         }
+
         if (fDataOffset < 0)
         {
…
     }

+    ///Actually write the drs calibration table
     virtual bool WriteDrsOffsetsTable()
     {
…
     }

+    ///Apply the drs offset calibration (overload of super-method)
     virtual void DrsOffsetCalibrate(char* target_location)
     {
         if (IsOffsetCalibration())
         {
-            int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
-            int16_t* data = reinterpret_cast<int16_t*>(target_location + fDataOffset);
+            const int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
+            int16_t*       data      = reinterpret_cast<int16_t*>(target_location + fDataOffset);

             for (uint32_t ch=0; ch<1440; ch++)
…

 private:
+    /// Checks if the size of the input calibration is ok
+    void VerifyCalibrationSize(uint32_t size)
+    {
+        if (size != 1440*1024)
+        {
+            ostringstream str;
+            str << "Cannot load calibration with anything else than 1440 pixels and 1024 samples per pixel. Got a total size of " << size;
+#ifdef __EXCEPTIONS
+            throw runtime_error(str.str());
+#else
+            gLog << ___err___ << "ERROR - " << str.str() << endl;
+#endif
+        }
+    }
+
     //Offsets calibration stuff.
-    vector<int16_t> fOffsetCalibration; ///< The calibration itself
-    int32_t fStartCellsOffset; ///<Offset in bytes for the startcell data
-    int32_t fDataOffset; ///<Offset in bytes for the data
-    int32_t fNumSlices; ///< Number of samples per pixel per event
+    vector<int16_t> fOffsetCalibration; ///< The calibration itself
+    int32_t         fStartCellsOffset;  ///< Offset in bytes for the startcell data
+    int32_t         fDataOffset;        ///< Offset in bytes for the data
+    int32_t         fNumSlices;         ///< Number of samples per pixel per event

 }; //class factofits
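A minimal usage sketch (not part of the changeset) of the calibration interface touched above; the output file name is hypothetical, and the assumption that a single std::vector<float> of 1440*1024 offsets is a valid input is based only on the constants and overloads visible in this diff:

    // Hypothetical example: hand a DRS offset calibration to the writer.
    // Since r17237 every SetDrsCalibration() overload runs VerifyCalibrationSize(),
    // so anything that is not exactly 1440 pixels x 1024 samples is rejected.
    #include <vector>
    #include "factofits.h"

    int main()
    {
        std::vector<float> offsets(1440*1024, 0.f); // one offset per pixel and per DRS capacitor

        factofits file("example.fits.fz");          // hypothetical output name
        file.SetDrsCalibration(offsets);            // throws (or logs via gLog) on a wrong size

        // ... AddColumn()/WriteRow() calls would follow before closing the file
        file.close();
        return 0;
    }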
trunk/Mars/mcore/zofits.h
--- zofits.h (r17228)
+++ zofits.h (r17237)

 #endif

-
 class zofits : public ofits
 {
+    /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
     template<class S>
     struct QueueMin : std::list<S>
…
     };

+    /// Parameters required to write a tile to disk
     struct WriteTarget
     {
…
         }

-        uint32_t tile_num;
-        uint32_t size;
-        shared_ptr<MemoryChunk> target;
+        uint32_t                tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
+        uint32_t                size;     ///< Size to write
+        shared_ptr<MemoryChunk> data;     ///< Memory block to write
     };

+    /// Parameters required to compress a tile of data
     struct CompressionTarget
     {
-        /*
-        bool operator < (const CompressionTarget& other)
-        {
-            return target < other.target;
-        }*/
-
-        shared_ptr<MemoryChunk> src;
-        shared_ptr<MemoryChunk> transposed_src;
-        WriteTarget target;
-        uint32_t num_rows;
+        shared_ptr<MemoryChunk> src;            ///< Original data
+        shared_ptr<MemoryChunk> transposed_src; ///< Transposed data
+        WriteTarget             target;         ///< Compressed data
+        uint32_t                num_rows;       ///< Number of rows to compress
     };

 public:
-    //constructors
+    /// constructors
+    /// @param numTiles how many data groups should be pre-reserved ?
+    /// @param rowPerTile how many rows will be grouped together in a single tile
+    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
     zofits(uint32_t numTiles=1000,
            uint32_t rowPerTile=100,
…
     }

+    /// @param fname the target filename
+    /// @param numTiles how many data groups should be pre-reserved ?
+    /// @param rowPerTile how many rows will be grouped together in a single tile
+    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
     zofits(const char* fname,
            uint32_t numTiles=1000,
…
     }

+    /// destructors
     virtual ~zofits()
     {
…

     //initialization of member variables
-    void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
+    /// @param nt number of tiles
+    /// @param rpt number of rows per tile
+    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
+    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
     {
         if (nt == 0)
-            throw runtime_error("Cannot work with a catalog of size 0. sorry.");
-
-        fCheckOffset = 0;
+            throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");
+
+        fCheckOffset = 0;
+        fNumQueues   = 0;

         fNumTiles       = nt;
         fNumRowsPerTile = rpt;

         fBuffer           = NULL;
         fRealRowWidth     = 0;
         fCatalogExtraRows = 0;
-
         fCatalogOffset    = 0;

         fMaxUsableMem = maxUsableMem;
…
     }

-
-    //write the header of the binary table
+    /// write the header of the binary table
+    /// @param name the name of the table to be created
+    /// @return the state of the file
     virtual bool WriteTableHeader(const char* name="DATA")
     {
-        if (!reallocateBuffers())
-            throw ("While allocating memory: apparently there not as much free memory as advertized...");
+        reallocateBuffers();

         ofits::WriteTableHeader(name);
…
             it->start();

+            //start the disk writer
             fWriteToDiskQueue.start();
         }
…
     }

+    /// open a new file.
+    /// @param filename the name of the file
+    /// @param Whether or not the name of the extension should be added or not
     void open(const char* filename, bool addEXTNAMEKey=true)
     {
…

         //add compression-related header entries
-        SetBool( "ZTABLE", true,"Table is compressed");
-        SetInt(  "ZNAXIS1", 0,"Width of uncompressed rows");
-        SetInt(  "ZNAXIS2", 0,"Number of uncompressed rows");
-        SetInt(  "ZPCOUNT", 0,"");
-        SetInt(  "ZHEAPPTR", 0,"");
-        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
-        SetInt(  "THEAP", 0,"");
-        SetStr(  "RAWSUM", " 0","Checksum of raw little endian data");
-        SetFloat("ZRATIO", 0,"Compression ratio");
+        SetBool( "ZTABLE",   true,            "Table is compressed");
+        SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
+        SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
+        SetInt(  "ZPCOUNT",  0,               "");
+        SetInt(  "ZHEAPPTR", 0,               "");
+        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
+        SetInt(  "THEAP",    0,               "");
+        SetStr(  "RAWSUM",   " 0",            "Checksum of raw little endian data");
+        SetFloat("ZRATIO",   0,               "Compression ratio");

         fCatalogExtraRows = 0;
…
     }

+    /// Super method. does nothing as zofits does not know about DrsOffsets
+    /// @return the state of the file
     virtual bool WriteDrsOffsetsTable()
     {
…
     }

+    /// Returns the number of bytes per uncompressed row
+    /// @return number of bytes per uncompressed row
     uint32_t GetBytesPerRow() const
     {
…
     }

+    /// Write the data catalog
+    /// @return the state of the file
     bool WriteCatalog()
     {
         const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
         const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;

+        // swap the catalog bytes before writing
         vector<char> swapped_catalog(total_catalog_size);
         uint32_t shift = 0;
…
         }

+        // first time writing ? remember where we are
         if (fCatalogOffset == 0)
-        {
             fCatalogOffset = tellp();
-        }
-
+
+        // remember where we came from
         const off_t where_are_we = tellp();

+        // write to disk
         seekp(fCatalogOffset);
         write(swapped_catalog.data(), total_catalog_size);
…
         seekp(where_are_we);

+        // udpate checksum
         fCatalogSum.reset();
         fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
…
         return good();
     }
+
+    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
     virtual void DrsOffsetCalibrate(char* )
     {
…
     }

+    /// Grows the catalog in case not enough rows were allocated
     void GrowCatalog()
     {
…
     }

+    /// write one row of data
+    /// @param ptr the source buffer
+    /// @param the number of bytes to write
+    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
     bool WriteRow(const void* ptr, size_t cnt, bool = true)
     {
…

         //for now, make an extra copy of the data, for RAWSUM checksuming.
-        //Ideally this should be moved to the threads, along with the drs-offset-calibration
+        //Ideally this should be moved to the threads
         //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
         //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
…
                 WriteTarget write_target;
                 write_target.size     = size_to_write;
-                write_target.target   = compress_target.target.target;
+                write_target.data     = compress_target.target.data;
                 write_target.tile_num = compress_target.target.tile_num;

-                if (!WriteBufferToDisk(write_target))
-                    throw runtime_error("Something went wrong while writing to disk");
+                return WriteBufferToDisk(write_target);
             }
             else
…

                 if (!fCompressionQueues[min_index].post(compress_target))
-                    throw runtime_error("I could not post this buffer. This does not make sense...");
+                    throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
             }
         }
…
     }

+    /// update the real number of rows
     void FlushNumRows()
     {
-        SetInt("NAXIS2", fTable.num_rows/fNumRowsPerTile);
+        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
         SetInt("ZNAXIS2", fTable.num_rows);
         FlushHeader();
     }

+    /// Setup the environment to compress yet another tile of data
+    /// @param target the struct where to host the produced parameters
     void SetNextCompression(CompressionTarget& target)
     {
…
         write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
         write_target.size     = 0;
-        write_target.target   = fMemPool.malloc();
+        write_target.data     = fMemPool.malloc();

         //fill up compression target
         target.src            = fSmartBuffer;
         target.transposed_src = transposed_data;
         target.target         = write_target;
         target.num_rows       = fTable.num_rows;

         //get a new buffer to host the incoming data
…
     }

+    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
     void ShrinkCatalog()
     {
…
     }

+    /// close an open file.
+    /// @return the state of the file
     bool close()
     {
+        // stop compression and write threads
         for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
             it->wait();
…
         {
 #ifdef __EXCEPTIONS
-            throw runtime_error("Something went wrong while writing to disk...");
+            throw runtime_error("Looks like the file has been closed already");
 #else
             return false;
…

 #ifdef __EXCEPTIONS
-        //check if something hapenned to the compression threads
+        //check if something hapenned while the compression threads were working
         if (fThreadsException != exception_ptr())
         {
+            //if so, re-throw the exception that was generated
             rethrow_exception(fThreadsException);
         }
 #endif

+        //write the last tile of data (if any
         if (fTable.num_rows%fNumRowsPerTile != 0)
         {
…
             WriteTarget write_target;
             write_target.size     = size_to_write;
-            write_target.target   = compress_target.target.target;
+            write_target.data     = compress_target.target.data;
             write_target.tile_num = compress_target.target.tile_num;

…
         SetInt("ZNAXIS2", fTable.num_rows);

-        uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
-        SetInt("ZHEAPPTR", heap_offset);
+        SetInt("ZHEAPPTR", fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2);

         const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
-
-        SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
-
-        SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
+        const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;
+
+        SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
+        SetInt("NAXIS1", total_catalog_width);
         SetInt("NAXIS2", total_num_tiles_written);

…
         }

-        float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
+        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
         SetFloat("ZRATIO", compression_ratio);

…

         SetInt("PCOUNT", heap_size, "size of special data area");
-
-
+
         //Just for updating the fCatalogSum value
…
     }

-    // Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
+    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
     bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
     {
…
     }

+    /// Overload of the simplified compressed version
     bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
     {
…
     }

+    /// static setter for the default number of threads to use. -1 means all available physical cores
     static void SetDefaultNumThreads(int32_t num) { fgNumQueues = num;}
     static int32_t GetDefaultNumThreads() { return fgNumQueues;}

-    int32_t GetNumThreads() { return fNumQueues;}
+    /// Get and set the actual number of threads for this object
+    int32_t GetNumThreads() const { return fNumQueues;}
     bool SetNumThreads(int32_t num)
     {
…
             throw runtime_error("File must be closed before changing the number of compression threads");
 #else
-            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
+            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
 #endif
             return false;
         }
+
+        //get number of physically available threads
 #ifdef USE_BOOST_THREADS
         int32_t num_available_cores = boost::thread::hardware_concurrency();
…
 #endif

+        // could not detect number of available cores from system properties...
+        // assume that 5 cores are available (4 compression, 1 write)
         if (num_available_cores == 0)
-        {//could not detect number of available cores from system properties...
-            //Assuming that 5 cores are availables (4 compression, 1 write)
             num_available_cores = 5;
-        }
+
+        // Throw an exception if too many cores are requested
         if (num > num_available_cores)
         {
             ostringstream str;
-            str << "Number of threads cannot be greater than physically available (" << num_available_cores << ")";
+            str << "You will be using more threads(" << num << ") than available cores(" << num_available_cores << "). Expect sub-optimal performances";
 #ifdef __EXCEPTIONS
             throw runtime_error(str.str());
 #else
-            gLog << ___err___ << "ERROR - " << str.str();
-#endif
-            return false;
+            gLog << ___err___ << "WARNING - " << str.str() << endl;
+#endif
         }

         if (num == -1)
-            num = num_available_cores-2; // 1 for writing, one for the main thread
+            num = num_available_cores-2; // 1 for writing, 1 for the main thread

         if (fCompressionQueues.size() == (uint32_t)num)
…
         Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false);

-        //shrink
+        //shrink if required
         if ((uint32_t)num < fCompressionQueues.size())
         {
…
         }

-        //grow
+        //grow if required
         fCompressionQueues.resize(num, queue);

…

 protected:

-    bool reallocateBuffers()
-    {
-        size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
+    /// Allocates the required objects.
+    void reallocateBuffers()
+    {
+        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
         fMemPool.setChunkSize(chunk_size);
…
                 *it = CatalogEntry(0,0);
         }
-        return true;
-    }
-
-    bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
+    }
+
+    /// Actually does the writing to disk (and checksuming)
+    /// @param src the buffer to write
+    /// @param sizeToWrite how many bytes should be written
+    /// @return the state of the file
+    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
     {
         char* checkSumPointer = src+4;
…
     }

+    /// Compress a given buffer based on the target. This is the method executed by the threads
+    /// @param target the struct hosting the parameters of the compression
+    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
     uint32_t CompressBuffer(const CompressionTarget& target)
     {
…

             //compress the buffer
-            compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
+            compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows);
 #ifdef __EXCEPTIONS
         }
…
         wt.tile_num = target.target.tile_num;
         wt.size     = compressed_size;
-        wt.target   = target.target.target;
+        wt.data     = target.target.data;

         fWriteToDiskQueue.post(wt);

-        return compressed_size;
-    }
-
+        // if used by the queue, always return true as the elements are not ordered
+        return 1;
+    }
+
+    /// Write one compressed tile to disk. This is the method executed by the writing thread
+    /// @param target the struct hosting the write parameters
     bool WriteBufferToDisk(const WriteTarget& target)
     {
…
         fLatestWrittenTile++;

-        //write the buffer to disk.
-        return writeCompressedDataToDisk(target.target.get()->get(), target.size);
-    }
-
+#ifdef __EXCEPTIONS
+        try
+        {
+#endif
+            if (!writeCompressedDataToDisk(target.data.get()->get(), target.size))
+            {//could not write the data to disk
+                ostringstream str;
+                str << "An error occured while writing to disk: ";
+                if (eof())
+                    str << "End-Of-File";
+                if (failbit)
+                    str << "Logical error on i/o operation";
+                if (badbit)
+                    str << "Writing error on i/o operation";
+#ifdef __EXCEPTIONS
+                throw runtime_error(str.str());
+#else
+                gLog << ___err___ << "ERROR - " << str.str() << endl;
+#endif
+            }
+#ifdef __EXCEPTIONS
+        }
+        catch(...)
+        {
+            fThreadsException = current_exception();
+            if (fNumQueues == 0)
+                rethrow_exception(fThreadsException);
+        }
+#endif
+        return true;
+    }
+
+    /// Compress a given buffer based on its source and destination
     //src cannot be const, as applySMOOTHING is done in place
+    /// @param dest the buffer hosting the compressed data
+    /// @param src the buffer hosting the transposed data
+    /// @param num_rows the number of uncompressed rows in the transposed buffer
+    /// @param the number of bytes of the compressed data
     uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
     {
-        uint32_t thisRoundNumRows= (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
-        uint32_t offset=0;
-        uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
+        const uint32_t thisRoundNumRows  = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
+        const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
+        uint32_t       offset            = 0;

         //skip the checksum reserved area
…

             //set the default byte telling if uncompressed the compressed Flag
-            uint64_t previousOffset = compressedOffset;
+            const uint64_t previousOffset = compressedOffset;

             //skip header data
…
             if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
             {//if so set flag and redo it uncompressed
-                cout << "Redoing uncompressed ! " << endl;
+//                cout << "Redoing uncompressed ! " << endl;
                 //de-smooth !
                 if (head.getProc(0) == kFactSmoothing)
…
     }

+    /// Transpose a tile to a new buffer
+    /// @param src buffer hosting the regular, row-ordered data
+    /// @param dest the target buffer that will receive the transposed data
     void copyTransposeTile(const char* src, char* dest)
     {
-        uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
+        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

         //copy the tile and transpose it
…
     }

     /// Specific compression functions
+    /// @param dest the target buffer
+    /// @param src the source buffer
+    /// @param size number of bytes to copy
+    /// @return number of bytes written
     uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
     {
…
     }

+    /// Do huffman encoding
+    /// @param dest the buffer that will receive the compressed data
+    /// @param src the buffer hosting the transposed data
+    /// @param numRows number of rows of data in the transposed buffer
+    /// @param sizeOfElems size in bytes of one data elements
+    /// @param numRowElems number of elements on each row
+    /// @return number of bytes written
     uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
     {
…
             throw runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
 #else
-            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types";
+            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << endl;
             return 0;
 #endif
…
     }

-    uint32_t applySMOOTHING(char* data, uint32_t numElems)//uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
+    /// Applies Thomas' DRS4 smoothing
+    /// @param data where to apply it
+    /// @param numElems how many elements of type int16_t are stored in the buffer
+    /// @return number of bytes modified
+    uint32_t applySMOOTHING(char* data, uint32_t numElems)
     {
         int16_t* short_data = reinterpret_cast<int16_t*>(data);
…
         return numElems*sizeof(int16_t);
     }
-    // Apply the inverse transform of the integer smoothing
-    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
+
+    /// Apply the inverse transform of the integer smoothing
+    /// @param data where to apply it
+    /// @param numElems how many elements of type int16_t are stored in the buffer
+    /// @return number of bytes modified
+    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
     {
         int16_t* short_data = reinterpret_cast<int16_t*>(data);
…
         return numElems*sizeof(uint16_t);
     }
-    //Compressed data stuff
-    int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum
-    uint32_t fNumTiles;
-    uint32_t fNumRowsPerTile;
-
-    MemoryManager fMemPool;
+
+

     //thread related stuff
-    vector<Queue<CompressionTarget>> fCompressionQueues;
-    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;
-
-    //thread related stuff
-    static int32_t fgNumQueues; ///< The number of threads that will be used to compress
-    int32_t fNumQueues; ///< The number of threads that will be used to compress
-
-    int32_t fLatestWrittenTile;
-#ifdef __EXCEPTIONS
-    exception_ptr fThreadsException;
-#endif
+    MemoryManager  fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
+    static int32_t fgNumQueues;        ///< Default number of threads to be used by the objects
+    int32_t        fNumQueues;         ///< Current number of threads that will be used by this object
+    uint64_t       fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory manager
+    int32_t        fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)
+
+    vector<Queue<CompressionTarget>>          fCompressionQueues; ///< Processing queues (=threads)
+    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< Writing queue (=thread)
+
+    // catalog related stuff
     struct CatalogEntry
     {
         CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
-        int64_t first;
-        int64_t second;
+        int64_t first;   ///< Size of this column in the tile
+        int64_t second;  ///< offset of this column in the tile, from the start of the heap area
     } __attribute__((__packed__));

-    typedef vector<CatalogEntry> CatalogRow;
-    typedef vector<CatalogRow> CatalogType;
-    CatalogType fCatalog;
-    Checksum fCatalogSum;
-    Checksum fRawSum;
-    off_t fCatalogOffset;
-    uint32_t fRealRowWidth;
-    uint32_t fCatalogExtraRows;
-    vector<char> fRawSumBuffer;
-    uint64_t fMaxUsableMem;
-
-    shared_ptr<MemoryChunk> fSmartBuffer;
-    char* fBuffer;
-
+    typedef vector<CatalogEntry> CatalogRow;
+    typedef vector<CatalogRow>   CatalogType;
+    CatalogType fCatalog;          ///< Catalog for this file
+    uint32_t fNumTiles;            ///< Number of pre-reserved tiles
+    uint32_t fNumRowsPerTile;      ///< Number of rows per tile
+    off_t fCatalogOffset;          ///< Offset of the catalog from the beginning of the file
+    uint32_t fCatalogExtraRows;    ///< Number of extra rows written on top of the initial capacity of the file
+
+    // checksum related stuff
+    Checksum fCatalogSum;          ///< Checksum of the catalog
+    Checksum fRawSum;              ///< Raw sum (specific to FACT)
+    int32_t fCheckOffset;          ///< offset to the data pointer to calculate the checksum
+
+    // data layout related stuff
+    /// Regular columns augmented with compression informations
     struct CompressedColumn
     {
…
             block_head(h)
         {}
-        Table::Column col;
-        Compression block_head;
+        Table::Column col;         ///< the regular column entry
+        Compression block_head;    ///< the compression data associated with that column
     };
-    vector<CompressedColumn> fRealColumns;
+    vector<CompressedColumn> fRealColumns;  ///< Vector hosting the columns of the file
+    uint32_t fRealRowWidth;                 ///< Width in bytes of one uncompressed row
+    shared_ptr<MemoryChunk> fSmartBuffer;   ///< Smart pointer to the buffer where the incoming rows are written
+    char* fBuffer;                          ///< regular version of fSmartBuffer
+    vector<char> fRawSumBuffer;             ///< buffer used for checksuming the incoming data, before compression
+
+#ifdef __EXCEPTIONS
+    exception_ptr fThreadsException;        ///< exception pointer to store exceptions coming from the threads
+#endif

 };
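A second sketch (again not part of the changeset), illustrating the reworked thread handling in SetNumThreads(): -1 keeps meaning "all physical cores minus one writer and one main thread", and requesting more threads than cores now only produces a warning instead of a hard failure. The surrounding call order is an assumption, not taken from this diff:

    // Hypothetical example: configure the compression threads of a zofits writer.
    #include "zofits.h"

    int main()
    {
        zofits::SetDefaultNumThreads(-1); // default for new objects: num_available_cores - 2

        zofits file;                      // defaults: 1000 tiles, 100 rows per tile
        file.SetNumThreads(4);            // must be called while the file is still closed

        // ... open(), AddColumn(), WriteRow() and close() would follow here
        return 0;
    }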