/* * zfits.h * * Created on: May 16, 2013 * Author: lyard */ #ifndef MARS_zfits #define MARS_zfits #include "fits.h" #include "huffman.h" #include "FITS.h" class zfits : public fits { public: // Basic constructor zfits(const std::string& fname, const std::string& tableName="", bool force=false) : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0) { open(fname.c_str()); Constructor(fname, "", tableName, force); InitCompressionReading(); } // Alternative contstructor zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false) : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0) { open(fname.c_str()); Constructor(fname, fout, tableName, force); InitCompressionReading(); } // Skip the next row bool SkipNextRow() { if (!fTable.is_compressed) return fits::SkipNextRow(); fRow++; return true; } virtual bool IsFileOk() const { bool rawsum = true; if (HasKey("RAWSUM")) { std::ostringstream str; str << fRawsum.val(); rawsum = (GetStr("RAWSUM") == str.str()); } return fits::IsFileOk() && rawsum; }; size_t GetNumRows() const { if (fTable.is_compressed) return fTable.Get("ZNAXIS2"); else return fTable.Get("NAXIS2"); } size_t GetBytesPerRow() const { if (fTable.is_compressed) return fTable.Get("ZNAXIS1"); else return fTable.Get("NAXIS1"); } protected: // Stage the requested row to internal buffer // Does NOT return data to users virtual void StageRow(size_t row, char* dest) { if (!fTable.is_compressed) { fits::StageRow(row, dest); return; } ReadBinaryRow(row, dest); } private: // Do what it takes to initialize the compressed structured void InitCompressionReading() { if (!fTable.is_compressed) return; //The constructor may have failed if (!good()) return; if (fTable.is_compressed) for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++) { if (it->comp == kCompFACT) continue; clear(rdstate()|std::ios::badbit); #ifdef __EXCEPTIONS throw std::runtime_error("Only the FACT compression scheme is handled by this reader."); #else gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl; return; #endif } fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow); //Get compressed specific keywords fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0; fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0; //read the file's catalog ReadCatalog(); //give it some space for uncompressing AllocateBuffers(); //check that heap agrees with head //CheckIfFileIsConsistent(); } // Copy decompressed data to location requested by user void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c) { if (!fTable.is_compressed) { fits::MoveColumnDataToUserSpace(dest, src, c); return; } memcpy(dest, src, c.num*c.size); } std::vector fBuffer; /// fTransposedBuffer; /// fCompressedBuffer; /// fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile. size_t fNumTiles; ///< Total number of tiles size_t fNumRowsPerTile; ///< Number of rows per compressed tile int64_t fCurrentRow; ///< current row in memory signed because we need -1 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data streamoff fHeapFromDataStart; ///< offset from the beginning of the data table std::vector>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data. std::vector fTileSize; ///< size in bytes of each compressed tile std::vector> fTileOffsets; ///< offset from start of tile of a given compressed column Checksum fRawsum; ///< Checksum of the uncompressed, raw data // Get buffer space void AllocateBuffers() { uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile; uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile + //use a bit more memory for block headers. 256 char coding the compression sequence max. fTable.num_cols*(sizeof(FITS::BlockHeader)+256) + //a bit more for the tile headers sizeof(FITS::TileHeader) + //and a bit more for checksuming 8; if (buffer_size % 4 != 0) buffer_size += 4 - (buffer_size%4); if (compressed_buffer_size % 4 != 0) compressed_buffer_size += 4 - (compressed_buffer_size%4); fBuffer.resize(buffer_size); fTransposedBuffer.resize(buffer_size); fCompressedBuffer.resize(compressed_buffer_size); } // Read catalog data. I.e. the address of the compressed data inside the heap void ReadCatalog() { std::vector readBuf(16); fCatalog.resize(fNumTiles); const streampos catalogStart = tellg(); fChkData.reset(); //do the actual reading for (uint32_t i=0;i(reinterpret_cast(tempValues), readBuf.data(), 2); if (tempValues[0] < 0 || tempValues[1] < 0) { clear(rdstate()|std::ios::badbit); #ifdef __EXCEPTIONS throw std::runtime_error("Negative value in the catalog"); #else gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl; return; #endif } //add catalog entry fCatalog[i].emplace_back(tempValues[0], tempValues[1]); } //see if there is a gap before heap data fHeapOff = tellg()+fTable.GetHeapShift(); fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift(); //check if the catalog has been shrinked uint32_t shrink_factor = 1; if (HasKey("ZSHRINK")) shrink_factor = GetInt("ZSHRINK"); if (shrink_factor != 1) { CheckIfFileIsConsistent(true); fNumTiles = fCatalog.size(); fNumRowsPerTile /= shrink_factor; } //compute the total size of each compressed tile fTileSize.resize(fNumTiles); fTileOffsets.resize(fNumTiles); for (uint32_t i=0;i buf(catSize); read(buf.data(), catSize); fCopy.write(buf.data(), catSize); if (!fCopy) clear(rdstate()|std::ios::badbit); } //overrides fits.h method with empty one //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow virtual void WriteRowToCopyFile(size_t row) { if (row == fRow+1) fRawsum.add(fBufferRow, false); } // Compressed version of the read row bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead) { if (rowNum >= GetNumRows()) return false; const uint32_t requestedTile = rowNum/fNumRowsPerTile; const uint32_t currentTile = fCurrentRow/fNumRowsPerTile; bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1)); fCurrentRow = rowNum; //should we read yet another chunk of data ? if (requestedTile != currentTile) { //read yet another chunk from the file const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader); //skip to the beginning of the tile const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader); seekg(fHeapOff+tileStart); //calculate the 32 bits offset of the current tile. const uint32_t offset = (tileStart + fHeapFromDataStart)%4; //point the tile header where it should be //we ain't checking the header now // TileHeader* tHead = reinterpret_cast(fCompressedBuffer.data()+offset); ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8)); //read one tile from disk read(fCompressedBuffer.data()+offset, sizeToRead); if (addCheckSum) fChkData.add(fCompressedBuffer); if (requestedTile == currentTile+1 && fCopy.is_open() && fCopy.good()) { fCopy.write(fCompressedBuffer.data()+offset, sizeToRead); if (!fCopy) clear(rdstate()|std::ios::badbit); } else if (fCopy.is_open()) clear(rdstate()|std::ios::badbit); const uint32_t thisRoundNumRows = (GetNumRows()offset; // pointer to column (destination buffer) switch (fColumnOrdering[i]) { case FITS::kOrderByRow: // regular, "semi-transposed" copy for (char *dest=buffer; destbytes); src += it->bytes; // next column } break; case FITS::kOrderByCol: // transposed copy for (char *elem=buffer; elembytes; elem+=it->size) // element-by-element (arrays) { for (char *dest=elem; destsize); src += it->size; // next element } } break; default: clear(rdstate()|std::ios::badbit); #ifdef __EXCEPTIONS throw std::runtime_error("Unkown column ordering scheme found"); #else gLog << ___err___ << "ERROR - unkown column ordering scheme" << std::endl; return false; #endif break; }; } } //Data loaded and uncompressed. Copy it to destination memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row); return good(); } // Read a bunch of uncompressed data uint32_t UncompressUNCOMPRESSED(char* dest, const char* src, uint32_t numElems, uint32_t sizeOfElems) { memcpy(dest, src, numElems*sizeOfElems); return numElems*sizeOfElems; } // Read a bunch of data compressed with the Huffman algorithm uint32_t UncompressHUFFMAN16(char* dest, const char* src, uint32_t numChunks) { std::vector uncompressed; //read compressed sizes (one per row) const uint32_t* compressedSizes = reinterpret_cast(src); src += sizeof(uint32_t)*numChunks; //uncompress the rows, one by one uint32_t sizeWritten = 0; for (uint32_t j=0;j(src), compressedSizes[j], uncompressed); memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t)); sizeWritten += uncompressed.size()*sizeof(uint16_t); dest += uncompressed.size()*sizeof(uint16_t); src += compressedSizes[j]; } return sizeWritten; } // Apply the inverse transform of the integer smoothing uint32_t UnApplySMOOTHING(int16_t* data, uint32_t numElems) { //un-do the integer smoothing for (uint32_t j=2;j(&fCompressedBuffer[compressedOffset]); fColumnOrdering[i] = head->ordering; const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num; const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num; const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs; for (int32_t j=head->numProcs-1;j >= 0; j--) { uint32_t sizeWritten=0; switch (head->processings[j]) { case FITS::kFactRaw: sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size); break; case FITS::kFactSmoothing: sizeWritten = UnApplySMOOTHING(reinterpret_cast(dest), numRows*numCols); break; case FITS::kFactHuffman16: sizeWritten = UncompressHUFFMAN16(dest, src, numRows); break; default: clear(rdstate()|std::ios::badbit); std::ostringstream str; str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs; #ifdef __EXCEPTIONS throw std::runtime_error(str.str()); #else gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << std::endl; return; #endif } //increment destination counter only when processing done. if (j==0) dest+= sizeWritten; } } } void CheckIfFileIsConsistent(bool update_catalog=false) { //goto start of heap streamoff whereAreWe = tellg(); seekg(fHeapOff); //init number of rows to zero uint64_t numRows = 0; //get number of columns from header size_t numCols = fTable.num_cols; std::vector > > catalog; FITS::TileHeader tileHead; FITS::BlockHeader columnHead; streamoff offsetInHeap = 0; //skip through the heap while (true) { read((char*)(&tileHead), sizeof(FITS::TileHeader)); //end of file if (!good()) break; //padding or corrupt data if (memcmp(tileHead.id, "TILE", 4)) { clear(rdstate()|std::ios::badbit); break; } //a new tile begins here catalog.emplace_back(std::vector >(0)); offsetInHeap += sizeof(FITS::TileHeader); //skip through the columns for (size_t i=0;i