/* * zfits.h * * Created on: May 16, 2013 * Author: lyard */ #ifndef MARS_ZFITS #define MARS_ZFITS #include #include "fits.h" #include "huffman.h" #ifndef __MARS__ namespace std { #endif class zfits : public fits { public: enum CompressionProcess_t { kFactRaw = 0x0, kFactSmoothing = 0x1, kFactHuffman16 = 0x2 }; enum RowOrdering_t { kOrderByCol = 'C', kOrderByRow = 'R' }; // Basic constructor zfits(const string& fname, const string& tableName="", bool force=false) : fits(), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0) { open(fname.c_str()); Constructor(fname, "", tableName, force); InitCompressionReading(); } // Alternative contstructor zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0) { open(fname.c_str()); Constructor(fname, fout, tableName, force); InitCompressionReading(); } // Skip the next row bool SkipNextRow() { if (!fTable.is_compressed) return fits::SkipNextRow(); fRow++; return true; } virtual bool IsFileOk() const { bool rawsum = true; if (HasKey("RAWSUM")) { ostringstream str; str << fRawsum.val(); rawsum = (GetStr("RAWSUM") == str.str()); } return fits::IsFileOk() && rawsum; }; size_t GetNumRows() const { if (fTable.is_compressed) return fTable.Get("ZNAXIS2"); else return fTable.Get("NAXIS2"); } size_t GetBytesPerRow() const { if (fTable.is_compressed) return fTable.Get("ZNAXIS1"); else return fTable.Get("NAXIS1"); } protected: // Stage the requested row to internal buffer // Does NOT return data to users virtual void StageRow(size_t row, char* dest) { if (!fTable.is_compressed) { fits::StageRow(row, dest); return; } ReadBinaryRow(row, dest); } private: #ifndef __CINT__ //Structure helper for reading tiles headers struct TileHeader { char id[4]; uint32_t numRows; uint64_t size; TileHeader() {} TileHeader(uint32_t nRows, uint64_t s) : id({'T', 'I', 'L', 'E'}), numRows(nRows), size(s) { }; } __attribute__((__packed__)); //Structure helper for reading blocks headers and compresion schemes struct BlockHeader { uint64_t size; char ordering; unsigned char numProcs; uint16_t processings[]; BlockHeader(uint64_t s=0, char o=kOrderByRow, unsigned char n=1) : size(s), ordering(o), numProcs(n) {} } __attribute__((__packed__)); #endif // Do what it takes to initialize the compressed structured void InitCompressionReading() { if (!fTable.is_compressed) return; //The constructor may have failed if (!good()) return; if (fTable.is_compressed) for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++) { if (it->comp == kCompFACT) continue; clear(rdstate()|ios::badbit); #ifdef __EXCEPTIONS throw runtime_error("Only the FACT compression scheme is handled by this reader."); #else gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl; return; #endif } fColumnOrdering.resize(fTable.sorted_cols.size()); for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++) (*it) = kOrderByRow; //Get compressed specific keywords fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0; fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0; //give it some space for uncompressing AllocateBuffers(); //read the file's catalog ReadCatalog(); //check that heap agrees with head //CheckIfFileIsConsistent(); } // Copy decompressed data to location requested by user void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c) { if (!fTable.is_compressed) { fits::MoveColumnDataToUserSpace(dest, src, c); return; } memcpy(dest, src, c.num*c.size); } vector fBuffer; /// fTransposedBuffer; /// fCompressedBuffer; /// fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile. size_t fNumTiles; ///< Total number of tiles size_t fNumRowsPerTile; ///< Number of rows per compressed tile int64_t fCurrentRow; ///< current row in memory signed because we need -1 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data streamoff fHeapFromDataStart; ///< offset from the beginning of the data table vector>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data. vector fTileSize; ///< size in bytes of each compressed tile vector> fTileOffsets; ///< offset from start of tile of a given compressed column Checksum fRawsum; ///< Checksum of the uncompressed, raw data // Get buffer space void AllocateBuffers() { fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile); fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile); fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for block headers. 256 char coding the compression sequence max. sizeof(TileHeader), //a bit more for the tile headers 8); //and a bit more for checksuming } // Read catalog data. I.e. the address of the compressed data inside the heap void ReadCatalog() { vector readBuf(16); fCatalog.resize(fNumTiles); const streampos catalogStart = tellg(); fChkData.reset(); //do the actual reading for (uint32_t i=0;i(reinterpret_cast(tempValues), readBuf.data(), 2); if (tempValues[0] < 0 || tempValues[1] < 0) { clear(rdstate()|ios::badbit); #ifdef __EXCEPTIONS throw runtime_error("Negative value in the catalog"); #else gLog << ___err___ << "ERROR - negative value in the catalog" << endl; return; #endif } //add catalog entry fCatalog[i].emplace_back(tempValues[0], tempValues[1]); } //compute the total size of each compressed tile fTileSize.resize(fNumTiles); fTileOffsets.resize(fNumTiles); for (uint32_t i=0;i buf(catSize); read(buf.data(), catSize); fCopy.write(buf.data(), catSize); if (!fCopy) clear(rdstate()|ios::badbit); } //overrides fits.h method with empty one //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow virtual void WriteRowToCopyFile(size_t row) { if (row == fRow+1) fRawsum.add(fBufferRow, false); } // Compressed version of the read row bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead) { if (rowNum >= GetNumRows()) return false; const uint32_t requestedTile = rowNum/fNumRowsPerTile; const uint32_t currentTile = fCurrentRow/fNumRowsPerTile; bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1)); fCurrentRow = rowNum; //should we read yet another chunk of data ? if (requestedTile != currentTile) { //read yet another chunk from the file const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader); //skip to the beginning of the tile const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader); seekg(fHeapOff+tileStart); //calculate the 32 bits offset of the current tile. const uint32_t offset = (tileStart + fHeapFromDataStart)%4; //point the tile header where it should be //we ain't checking the header now // TileHeader* tHead = reinterpret_cast(fCompressedBuffer.data()+offset); ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8)); //read one tile from disk read(fCompressedBuffer.data()+offset, sizeToRead); if (addCheckSum) fChkData.add(fCompressedBuffer); if (requestedTile == currentTile+1 && fCopy.is_open() && fCopy.good()) { fCopy.write(fCompressedBuffer.data()+offset, sizeToRead); if (!fCopy) clear(rdstate()|ios::badbit); } else if (fCopy.is_open()) clear(rdstate()|ios::badbit); const uint32_t thisRoundNumRows = (GetNumRows()offset; // pointer to column (destination buffer) switch (fColumnOrdering[i]) { case kOrderByRow: // regular, "semi-transposed" copy for (char *dest=buffer; destbytes); src += it->bytes; // next column } break; case kOrderByCol: // transposed copy for (char *elem=buffer; elembytes; elem+=it->size) // element-by-element (arrays) { for (char *dest=elem; destsize); src += it->size; // next element } } break; default: clear(rdstate()|ios::badbit); #ifdef __EXCEPTIONS throw runtime_error("Unkown column ordering scheme found"); #else gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl; return false; #endif break; }; } } //Data loaded and uncompressed. Copy it to destination memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row); return good(); } // Read a bunch of uncompressed data uint32_t UncompressUNCOMPRESSED(char* dest, const char* src, uint32_t numElems, uint32_t sizeOfElems) { memcpy(dest, src, numElems*sizeOfElems); return numElems*sizeOfElems; } // Read a bunch of data compressed with the Huffman algorithm uint32_t UncompressHUFFMAN16(char* dest, const char* src, uint32_t numChunks) { vector uncompressed; //read compressed sizes (one per row) const uint32_t* compressedSizes = reinterpret_cast(src); src += sizeof(uint32_t)*numChunks; //uncompress the rows, one by one uint32_t sizeWritten = 0; for (uint32_t j=0;j(src), compressedSizes[j], uncompressed); memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t)); sizeWritten += uncompressed.size()*sizeof(uint16_t); dest += uncompressed.size()*sizeof(uint16_t); src += compressedSizes[j]; } return sizeWritten; } // Apply the inverse transform of the integer smoothing uint32_t UnApplySMOOTHING(int16_t* data, uint32_t numElems) { //un-do the integer smoothing for (uint32_t j=2;j(&fCompressedBuffer[compressedOffset]); fColumnOrdering[i] = head->ordering; const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num; const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num; const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs; for (int32_t j=head->numProcs-1;j >= 0; j--) { uint32_t sizeWritten=0; switch (head->processings[j]) { case kFactRaw: sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size); break; case kFactSmoothing: sizeWritten = UnApplySMOOTHING(reinterpret_cast(dest), numRows*numCols); break; case kFactHuffman16: sizeWritten = UncompressHUFFMAN16(dest, src, numRows); break; default: clear(rdstate()|ios::badbit); #ifdef __EXCEPTIONS throw runtime_error("Unknown processing applied to data. Aborting"); #else gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl; return; #endif } //increment destination counter only when processing done. if (j==0) dest+= sizeWritten; } } } void CheckIfFileIsConsistent() { //goto start of heap streamoff whereAreWe = tellg(); seekg(fHeapOff); //init number of rows to zero uint64_t numRows = 0; //get number of columns from header size_t numCols = fTable.num_cols; vector > > catalog; TileHeader tileHead; BlockHeader columnHead; streamoff offsetInHeap = 0; //skip through the heap while (true) { read((char*)(&tileHead), sizeof(TileHeader)); //end of file if (!good()) break; //padding or corrupt data if (memcmp(tileHead.id, "TILE", 4)) { clear(rdstate()|ios::badbit); break; } //a new tile begins here catalog.push_back(vector >(0)); offsetInHeap += sizeof(TileHeader); //skip through the columns for (size_t i=0;i