#ifndef FACT_zofits
#define FACT_zofits

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#include <algorithm>
#include <limits>

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#else
using namespace std;
#endif

class zofits : public ofits
{
        /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
        template <class S>
        struct QueueMin : std::list<S>
        {
            typename std::list<S>::iterator begin()
            {
                return min_element(std::list<S>::begin(), std::list<S>::end());
            }
        };

        /// Parameters required to write a tile to disk
        struct WriteTarget
        {
            bool operator < (const WriteTarget& other) const
            {
                return tile_num < other.tile_num;
            }

            uint32_t                tile_num; ///< Tile index of the data (to make sure that tiles are written in the correct order)
            uint32_t                size;     ///< Size to write
            shared_ptr<MemoryChunk> data;     ///< Memory block to write
        };

        /// Parameters required to compress a tile of data
        struct CompressionTarget
        {
            shared_ptr<MemoryChunk> src;            ///< Original data
            shared_ptr<MemoryChunk> transposed_src; ///< Transposed data
            WriteTarget             target;         ///< Compressed data
            uint32_t                num_rows;       ///< Number of rows to compress
        };

    public:
        /// static setter for the default number of threads to use. -1 means all available physical cores
        static uint32_t DefaultNumThreads(const uint32_t &_n=-2)    { static uint32_t n=0;       if (int32_t(_n)>=-1) n=_n; return n; }
        /// static setter for the default memory budget of the compression buffers, in kilobytes
        static uint32_t DefaultMaxMemory(const uint32_t &_n=0)      { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
        /// static setter for the default number of pre-reserved tiles
        static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0)    { static uint32_t n=1000;    if (_n>0) n=_n; return n; }
        /// static setter for the default number of rows per tile
        static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100;     if (_n>0) n=_n; return n; }

        /// constructors
        /// @param numTiles how many data groups should be pre-reserved ?
        /// @param rowPerTile how many rows will be grouped together in a single tile
        /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
        zofits(uint32_t numTiles    = DefaultMaxNumTiles(),
               uint32_t rowPerTile  = DefaultNumRowsPerTile(),
               uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
            fMemPool(0, maxUsableMem*1000),
            fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
        {
            InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
            SetNumThreads(DefaultNumThreads());
        }

        /// @param fname the target filename
        /// @param numTiles how many data groups should be pre-reserved ?
        /// @param rowPerTile how many rows will be grouped together in a single tile
        /// @param maxUsableMem how many kilobytes of memory can be used by the compression buffers
        zofits(const char* fname,
               uint32_t numTiles    = DefaultMaxNumTiles(),
               uint32_t rowPerTile  = DefaultNumRowsPerTile(),
               uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
            fMemPool(0, maxUsableMem*1000),
            fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
        {
            InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
            SetNumThreads(DefaultNumThreads());
        }
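        /* Construction sketch (illustrative only): the static setters above hold
         * process-wide defaults that the constructors pick up; arguments passed to
         * a constructor take precedence. The column declaration hinted at below is
         * hypothetical, standing in for the AddColumn facilities inherited from ofits.
         *
         *     zofits::DefaultNumThreads(2);        // compress with 2 worker threads
         *     zofits::DefaultNumRowsPerTile(250);  // group 250 rows per tile
         *
         *     zofits file("events.fits.fz");       // constructs and opens the target file
         *     // ...declare the columns (ofits interface), then call WriteTableHeader()
         */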
        //initialization of member variables
        /// @param nt number of tiles
        /// @param rpt number of rows per tile
        /// @param maxUsableMem max amount of RAM to be used by the compression buffers
        void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
        {
            if (nt == 0)
                throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");

            fCheckOffset = 0;
            fNumQueues   = 0;

            fNumTiles       = nt;
            fNumRowsPerTile = rpt;

            fBuffer       = NULL;
            fRealRowWidth = 0;

            fCatalogExtraRows = 0;
            fCatalogOffset    = 0;

            fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
            fThreadsException = exception_ptr();
#endif
        }

        /// write the header of the binary table
        /// @param name the name of the table to be created
        /// @return the state of the file
        virtual bool WriteTableHeader(const char* name="DATA")
        {
            reallocateBuffers();

            ofits::WriteTableHeader(name);

            if (fNumQueues != 0)
            {
                //start the compression queues
                for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                    it->start();

                //start the disk writer
                fWriteToDiskQueue.start();
            }

            //mark that no tile has been written so far
            fLatestWrittenTile = -1;

            return good();
        }

        /// open a new file.
        /// @param filename the name of the file
        /// @param addEXTNAMEKey whether or not the EXTNAME keyword should be added
        void open(const char* filename, bool addEXTNAMEKey=true)
        {
            ofits::open(filename, addEXTNAMEKey);

            //add compression-related header entries
            SetBool( "ZTABLE",   true,            "Table is compressed");
            SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
            SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
            SetInt(  "ZPCOUNT",  0,               "");
            SetInt(  "ZHEAPPTR", 0,               "");
            SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
            SetInt(  "THEAP",    0,               "");
            SetStr(  "RAWSUM",   "         0",    "Checksum of raw little endian data");
            SetFloat("ZRATIO",   0,               "Compression ratio");

            fCatalogExtraRows = 0;
            fRawSum.reset();
        }

        /// Super method. does nothing as zofits does not know about DrsOffsets
        /// @return the state of the file
        virtual bool WriteDrsOffsetsTable()
        {
            return good();
        }

        /// Returns the number of bytes per uncompressed row
        /// @return number of bytes per uncompressed row
        uint32_t GetBytesPerRow() const
        {
            return fRealRowWidth;
        }

        /// Write the data catalog
        /// @return the state of the file
        bool WriteCatalog()
        {
            const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
            const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;

            // swap the catalog bytes before writing
            vector<char> swapped_catalog(total_catalog_size);
            uint32_t shift = 0;
            for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
            {
                revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
                shift += one_catalog_row_size;
            }

            // first time writing ? remember where we are
            if (fCatalogOffset == 0)
                fCatalogOffset = tellp();

            // remember where we came from
            const off_t where_are_we = tellp();

            // write to disk
            seekp(fCatalogOffset);
            write(swapped_catalog.data(), total_catalog_size);
            if (where_are_we != fCatalogOffset)
                seekp(where_are_we);

            // update the checksum
            fCatalogSum.reset();
            fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

            return good();
        }
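        /* Catalog layout sketch (illustrative): WriteCatalog() stores one row per
         * tile, and per column a (size, offset) pair of int64_t, byte-swapped to
         * big endian as FITS requires. With 2 columns and 2 tiles:
         *
         *     catalog row 0 (tile 0): [size c0][offset c0][size c1][offset c1]
         *     catalog row 1 (tile 1): [size c0][offset c0][size c1][offset c1]
         *
         * i.e. one_catalog_row_size = num_cols*2*sizeof(uint64_t) bytes per tile,
         * written at fCatalogOffset, in front of the compressed data (the heap).
         */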
        /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
        virtual void DrsOffsetCalibrate(char*)
        {
        }

        /// Grows the catalog in case not enough rows were allocated
        void GrowCatalog()
        {
            uint32_t orig_catalog_size = fCatalog.size();

            fCatalog.resize(fCatalog.size()*2);
            for (uint32_t i=orig_catalog_size; i<fCatalog.size(); i++)
            {
                fCatalog[i].resize(fTable.num_cols);
                for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                    *it = CatalogEntry(0,0);
            }

            fCatalogExtraRows += orig_catalog_size;
            fNumTiles         += orig_catalog_size;
        }

        /// write one row of data
        /// @param ptr the source buffer
        /// @param cnt the number of bytes to write
        /// @return the state of the file
        bool WriteRow(const void* ptr, size_t cnt, bool = true)
        {
            if (cnt != fRealRowWidth)
            {
#ifdef __EXCEPTIONS
                throw runtime_error("Wrong size of row given to WriteRow");
#else
                gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
                return false;
#endif
            }

            if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
            {
//              GrowCatalog();
#ifdef __EXCEPTIONS
                throw runtime_error("Maximum number of rows exceeded for this file");
#else
                gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
                return false;
#endif
            }

            //copy current row to the pool of rows waiting for compression
            char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
            memcpy(target_location, ptr, fRealRowWidth);

            //for now, make an extra copy of the data, for RAWSUM checksumming.
            //Ideally this should be moved to the threads.
            //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
            //one extra memcpy per row written is worse than 100 rows checksummed when the tile is full....
            const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
            char* buffer = fRawSumBuffer.data() + rawOffset;
            auto ib = fRawSumBuffer.begin();
            auto ie = fRawSumBuffer.rbegin();
            *ib++ = 0;
            *ib++ = 0;
            *ib++ = 0;
            *ib   = 0;

            *ie++ = 0;
            *ie++ = 0;
            *ie++ = 0;
            *ie   = 0;

            memcpy(buffer, ptr, fRealRowWidth);

            fRawSum.add(fRawSumBuffer, false);

            DrsOffsetCalibrate(target_location);

            fTable.num_rows++;

            if (fTable.num_rows % fNumRowsPerTile == 0)
            {
                CompressionTarget compress_target;
                SetNextCompression(compress_target);

                if (fNumQueues == 0)
                {   //no worker threads. do everything in-line
                    const uint64_t size_to_write = CompressBuffer(compress_target);

                    WriteTarget write_target;
                    write_target.size     = size_to_write;
                    write_target.data     = compress_target.target.data;
                    write_target.tile_num = compress_target.target.tile_num;

                    return WriteBufferToDisk(write_target);
                }
                else
                {
                    //post the tile to the least occupied queue. If all queues are empty, use queue 0
                    uint32_t min_index     = 0;
                    uint32_t min_size      = numeric_limits<uint32_t>::max();
                    uint32_t current_index = 0;

                    for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                    {
                        if (it->size() < min_size)
                        {
                            min_index = current_index;
                            min_size  = it->size();
                        }
                        current_index++;
                    }

                    if (!fCompressionQueues[min_index].post(compress_target))
                        throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
                }
            }

            return good();
        }

        /// update the real number of rows
        void FlushNumRows()
        {
            SetInt("NAXIS2",  (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
            SetInt("ZNAXIS2", fTable.num_rows);
            FlushHeader();
        }

        /// Setup the environment to compress yet another tile of data
        /// @param target the struct where to host the produced parameters
        void SetNextCompression(CompressionTarget& target)
        {
            //get space for transposed data
            shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();

            //fill up the write-to-disk target
            WriteTarget write_target;
            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
            write_target.size     = 0;
            write_target.data     = fMemPool.malloc();

            //fill up the compression target
            target.src            = fSmartBuffer;
            target.transposed_src = transposed_data;
            target.target         = write_target;
            target.num_rows       = fTable.num_rows;

            //get a new buffer to host the incoming data
            fSmartBuffer = fMemPool.malloc();
            fBuffer      = fSmartBuffer.get()->get();
        }
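        /* Write-path sketch (illustrative): rows accumulate in fBuffer until a tile
         * is complete; SetNextCompression() then swaps in a fresh buffer and the
         * full tile is posted to the least loaded compression queue (or compressed
         * in-line when fNumQueues is 0). The row-filling code below is hypothetical
         * user code.
         *
         *     vector<char> row(file.GetBytesPerRow());
         *     for (uint32_t i=0; i<num_events; i++)
         *     {
         *         //...fill 'row' with one event...
         *         file.WriteRow(row.data(), row.size());
         *     }
         *     file.close(); // compresses and flushes the last, possibly partial, tile
         */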
        /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
        void ShrinkCatalog()
        {
            //did we write more rows than what the catalog could host ?
            if (fCatalogExtraRows != 0)
            {
                //how many rows can the regular catalog host ?
                const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
                //what's the shrink factor to be applied ?
                const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);

                //shrink the catalog ! merge the tiles in groups of shrink_factor:
                //the first tile of each group keeps its offsets, while the sizes
                //of the whole group are summed up
                const uint32_t num_tiles_used = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
                for (uint32_t i=0; i<num_tiles_used; i+=shrink_factor)
                {
                    const uint32_t target_catalog_row = i/shrink_factor;

                    fCatalog[target_catalog_row] = fCatalog[i];

                    for (uint32_t j=i+1; j<i+shrink_factor && j<num_tiles_used; j++)
                        for (uint32_t k=0; k<fTable.num_cols; k++)
                            fCatalog[target_catalog_row][k].first += fCatalog[j][k].first;
                }

                //drop the extra rows and update the effective tile size
                fCatalog.resize(fCatalog.size() - fCatalogExtraRows);
                fNumRowsPerTile  *= shrink_factor;
                fCatalogExtraRows = 0;

                SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
            }
        }

        /// Close the file and do some housekeeping
        /// @return the state of the file
        virtual bool close()
        {
            // stop compression and write threads
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->wait();

            fWriteToDiskQueue.wait();

            if (tellp() < 0)
            {
#ifdef __EXCEPTIONS
                throw runtime_error("Looks like the file has been closed already");
#else
                return false;
#endif
            }

#ifdef __EXCEPTIONS
            //check if something happened while the compression threads were working
            if (fThreadsException != exception_ptr())
            {
                //if so, re-throw the exception that was generated
                rethrow_exception(fThreadsException);
            }
#endif

            //write the last tile of data (if any)
            if (fTable.num_rows%fNumRowsPerTile != 0)
            {
                CompressionTarget compress_target;
                SetNextCompression(compress_target);

                //set the number of queues to zero before calling CompressBuffer, so that it runs synchronously
                const int32_t backup_num_queues = fNumQueues;
                fNumQueues = 0;
                const uint64_t size_to_write = CompressBuffer(compress_target);
                fNumQueues = backup_num_queues;

                WriteTarget write_target;
                write_target.size     = size_to_write;
                write_target.data     = compress_target.target.data;
                write_target.tile_num = compress_target.target.tile_num;

                if (!WriteBufferToDisk(write_target))
                    throw runtime_error("Something went wrong while writing the last tile...");
            }

            AlignTo2880Bytes();

            //update the header keywords
            SetInt("ZNAXIS1", fRealRowWidth);
            SetInt("ZNAXIS2", fTable.num_rows);

            SetInt("ZHEAPPTR", fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2);

            const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
            const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;

            SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
            SetInt("NAXIS1", total_catalog_width);
            SetInt("NAXIS2", total_num_tiles_written);

            ostringstream str;
            str << fRawSum.val();
            SetStr("RAWSUM", str.str());

            //compute the total size of the heap and update the catalog offsets,
            //which are relative to the start of the heap area
            int64_t heap_size         = 0;
            int64_t compressed_offset = 0;

            for (uint32_t i=0; i<total_num_tiles_written; i++)
            {
                compressed_offset += sizeof(TileHeader);
                heap_size         += sizeof(TileHeader);

                for (uint32_t j=0; j<fCatalog[i].size(); j++)
                {
                    heap_size            += fCatalog[i][j].first;
                    fCatalog[i][j].second = compressed_offset;
                    compressed_offset    += fCatalog[i][j].first;
                }
            }

            SetFloat("ZRATIO", float(fTable.num_rows)*fRealRowWidth/heap_size);
            SetInt("PCOUNT", heap_size, "size of special data area");

            //write the catalog again, now that the sizes and offsets are known
            WriteCatalog();

            return ofits::close();
        }

        /// Set the number of worker threads used for compression
        /// @param num the number of threads. (uint32_t)(-1) means "all available cores minus two"
        /// @return whether the number of threads could be set
        bool SetNumThreads(uint32_t num)
        {
            //how many cores does this machine have ?
#ifdef USE_BOOST_THREADS
            unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
            unsigned int num_available_cores = thread::hardware_concurrency();
#endif
            //hardware_concurrency() returns 0 when it cannot tell
            if (num_available_cores == 0)
                num_available_cores = 1;

            //leave a few cores for the main thread and the disk writer
            if (int32_t(num) == -1 || num > num_available_cores)
                num = num_available_cores>2 ? num_available_cores-2 : 1;

            if (fCompressionQueues.size() != uint32_t(num))
            {
                fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
                fNumQueues = num;
            }

            return true;
        }

    protected:
        /// Allocates the required objects.
        void reallocateBuffers()
        {
            const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksumming
            fMemPool.setChunkSize(chunk_size);

            fSmartBuffer = fMemPool.malloc();
            fBuffer      = fSmartBuffer.get()->get();

            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming

            //give the catalog enough space
            fCatalog.resize(fNumTiles);
            for (uint32_t i=0; i<fNumTiles; i++)
            {
                fCatalog[i].resize(fRealColumns.size());
                for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                    *it = CatalogEntry(0,0);
            }
        }

        /// Compress one tile of data. This is the method executed by the compression threads
        /// @param target the struct hosting the parameters of the compression
        /// @return the compressed size when running synchronously, otherwise always 1 (true)
        uint64_t CompressBuffer(const CompressionTarget& target)
        {
            uint64_t compressed_size = 0;
#ifdef __EXCEPTIONS
            try
            {
#endif
                //transpose the original data
                copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());

                //compress the buffer
                compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows);
#ifdef __EXCEPTIONS
            }
            catch (...)
            {
                fThreadsException = current_exception();
                if (fNumQueues == 0)
                    rethrow_exception(fThreadsException);
            }
#endif
            if (fNumQueues == 0)
                return compressed_size;

            //post the result to the writing queue
            //get a copy so that it becomes non-const
            WriteTarget wt;
            wt.tile_num = target.target.tile_num;
            wt.size     = compressed_size;
            wt.data     = target.target.data;

            fWriteToDiskQueue.post(wt);

            // if used by the queue, always return true as the elements are not ordered
            return 1;
        }
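        /* Ordering sketch (illustrative): compression threads may finish tiles out
         * of order. The writing queue is declared with QueueMin, so the pending
         * WriteTarget with the smallest tile_num is always served first, and
         * WriteBufferToDisk() below additionally rejects, by returning false so
         * that the element is expected to stay pending, any tile that is not
         * exactly fLatestWrittenTile+1. If tiles complete as 1, 2, 0, then 0 is
         * written first and 1 and 2 follow in order.
         */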
        /// Write one compressed tile to disk. This is the method executed by the writing thread
        /// @param target the struct hosting the write parameters
        bool WriteBufferToDisk(const WriteTarget& target)
        {
            //is this the tile we're supposed to write ?
            if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
                return false;

            fLatestWrittenTile++;

#ifdef __EXCEPTIONS
            try
            {
#endif
                if (!writeCompressedDataToDisk(target.data.get()->get(), target.size))
                {   //could not write the data to disk
                    ostringstream str;
                    str << "An error occurred while writing to disk: ";
                    if (eof())
                        str << "End-Of-File";
                    if (fail())
                        str << "Logical error on i/o operation";
                    if (bad())
                        str << "Writing error on i/o operation";
#ifdef __EXCEPTIONS
                    throw runtime_error(str.str());
#else
                    gLog << ___err___ << "ERROR - " << str.str() << endl;
#endif
                }
#ifdef __EXCEPTIONS
            }
            catch (...)
            {
                fThreadsException = current_exception();
                if (fNumQueues == 0)
                    rethrow_exception(fThreadsException);
            }
#endif
            return true;
        }

        /// Compress a given buffer based on its source and destination
        //src cannot be const, as applySMOOTHING is done in place
        /// @param dest the buffer hosting the compressed data
        /// @param src the buffer hosting the transposed data
        /// @param num_rows the number of uncompressed rows in the transposed buffer
        /// @return the number of bytes of the compressed data
        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
        {
            const uint32_t thisRoundNumRows  = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
            const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
            uint32_t offset = 0;

            //skip the checksum reserved area
            dest += 4;

            //skip the 'TILE' marker and tile size entry
            uint64_t compressedOffset = sizeof(TileHeader);

            //now compress each column one by one by calling compression on arrays
            for (uint32_t i=0; i<fRealColumns.size(); i++)
            {
                fCatalog[currentCatalogRow][i].second = compressedOffset;

                if (fRealColumns[i].col.num == 0)
                    continue;

                Compression& head = fRealColumns[i].block_head;

                //remember where the block starts, and reserve space for its header
                const uint64_t previousOffset = compressedOffset;
                compressedOffset += head.getSizeOnDisk();

                //sequentially apply the compression schemes of this column
                for (uint32_t j=0; j<head.getNumProcs(); j++)
                {
                    switch (head.getProc(j))
                    {
                        case kFactRaw:
                            compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                            break;

                        case kFactSmoothing:
                            applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                            break;

                        case kFactHuffman16:
                            compressedOffset += compressHUFFMAN(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                            break;
                    }
                }

                //check if the compressed data ended up larger than the uncompressed one
                if ((head.getProc(0) != kFactRaw) &&
                    (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
                {   //if so set the flag and redo it uncompressed
//                  cout << "Redoing uncompressed ! " << endl;
                    //de-smooth !
                    if (head.getProc(0) == kFactSmoothing)
                        UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                    Compression he;

                    compressedOffset  = previousOffset + he.getSizeOnDisk();
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                    he.SetBlockSize(compressedOffset - previousOffset);
                    he.Memcpy(dest+previousOffset);

                    offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                    continue;
                }

                head.SetBlockSize(compressedOffset - previousOffset);
                head.Memcpy(dest + previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
            }

            TileHeader tile_head(thisRoundNumRows, compressedOffset);
            memcpy(dest, &tile_head, sizeof(TileHeader));

            return compressedOffset;
        }
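        /* Transposition sketch (illustrative): for a column A with num=3 elements
         * of size=2 bytes and a tile of 2 rows (r0, r1), the row-ordered source
         *
         *     r0: A0 A1 A2 | ...other columns...
         *     r1: A0 A1 A2 | ...other columns...
         *
         * becomes, in the transposed buffer,
         *
         *     kOrderByRow: r0.A0 r0.A1 r0.A2 r1.A0 r1.A1 r1.A2
         *     kOrderByCol: r0.A0 r1.A0 r0.A1 r1.A1 r0.A2 r1.A2
         *
         * kOrderByCol groups the same array element across rows, which is what
         * makes the smoothing+Huffman schemes effective on DRS4 waveform data.
         */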
        /// Transpose a tile to a new buffer
        /// @param src buffer hosting the regular, row-ordered data
        /// @param dest the target buffer that will receive the transposed data
        void copyTransposeTile(const char* src, char* dest)
        {
            const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

            //copy the tile and transpose it
            for (uint32_t i=0; i<fRealColumns.size(); i++)
            {
                switch (fRealColumns[i].block_head.getOrdering())
                {
                    case kOrderByRow:
                        //regular, "semi-transposed" copy: the whole array of one column, row after row
                        for (uint32_t k=0; k<thisRoundNumRows; k++)
                        {
                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                            dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                        }
                        break;

                    case kOrderByCol:
                        //fully transposed copy: one array element at a time, across all rows
                        for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                            for (uint32_t k=0; k<thisRoundNumRows; k++)
                            {
                                memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                                dest += fRealColumns[i].col.size;
                            }
                        break;
                }
            }
        }

        /// Specific compression functions
        /// @param dest the target buffer
        /// @param src the source buffer
        /// @param size number of bytes to copy
        /// @return number of bytes written
        uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
        {
            memcpy(dest, src, size);
            return size;
        }

        /// Do huffman encoding
        /// @param dest the buffer that will receive the compressed data
        /// @param src the buffer hosting the transposed data
        /// @param numRows number of rows of data in the transposed buffer
        /// @param sizeOfElems size in bytes of one data element
        /// @param numRowElems number of elements on each row
        /// @return number of bytes written
        uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
        {
            string   huffmanOutput;
            uint32_t previousHuffmanSize = 0;

            //the Huffman encoder works on 16-bit words
            if (sizeOfElems < 2)
            {
#ifdef __EXCEPTIONS
                throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
                gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types" << endl;
                return 0;
#endif
            }

            uint32_t huffmanOffset = 0;
            for (uint32_t j=0; j<numRowElems; j++)
            {
                Huffman::Encode(huffmanOutput,
                                reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                                numRows*(sizeOfElems/2));
                reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
                huffmanOffset       += sizeof(uint32_t);
                previousHuffmanSize  = huffmanOutput.size();
            }

            const size_t totalSize = huffmanOutput.size() + huffmanOffset;

            //only copy if not larger than not-compressed size
            if (totalSize < numRows*sizeOfElems*numRowElems)
                memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

            return totalSize;
        }

        /// Applies Thomas' DRS4 smoothing
        /// @param data where to apply it
        /// @param numElems how many elements of type int16_t are stored in the buffer
        /// @return number of bytes modified
        uint32_t applySMOOTHING(char* data, uint32_t numElems)
        {
            int16_t* short_data = reinterpret_cast<int16_t*>(data);
            //start from the end of the buffer, so that the original values are still available
            for (int j=numElems-1; j>1; j--)
                short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

            return numElems*sizeof(int16_t);
        }

        /// Apply the inverse transform of the integer smoothing
        /// @param data where to apply it
        /// @param numElems how many elements of type int16_t are stored in the buffer
        /// @return number of bytes modified
        uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
        {
            int16_t* short_data = reinterpret_cast<int16_t*>(data);
            //un-do the integer smoothing, from the beginning of the buffer this time
            for (uint32_t j=2; j<numElems; j++)
                short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

            return numElems*sizeof(int16_t);
        }

        //thread related stuff
        MemoryManager fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
        int32_t       fNumQueues;         ///< Current number of threads that will be used by this object
        uint64_t      fMaxUsableMem;      ///< Maximum number of bytes that can be allocated by the memory pool
        int32_t       fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads)

        vector<Queue<CompressionTarget>>          fCompressionQueues; ///< Processing queues (=threads)
        Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< Writing queue (=thread)

        // catalog related stuff
        struct CatalogEntry
        {
            CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}
            int64_t first;  ///< Size of this column in the tile
            int64_t second; ///< offset of this column in the tile, from the start of the heap area
        } __attribute__((__packed__));

        typedef vector<CatalogEntry> CatalogRow;
        typedef vector<CatalogRow>   CatalogType;

        CatalogType fCatalog;          ///< Catalog for this file
//      uint32_t    fCatalogSize;      ///< Actual catalog size (.size() is slow on large lists)
        uint32_t    fNumTiles;         ///< Number of pre-reserved tiles
        uint32_t    fNumRowsPerTile;   ///< Number of rows per tile
        off_t       fCatalogOffset;    ///< Offset of the catalog from the beginning of the file
        uint32_t    fCatalogExtraRows; ///< Number of extra rows written on top of the initial capacity of the file

        // checksum related stuff
        Checksum fCatalogSum;  ///< Checksum of the catalog
        Checksum fRawSum;      ///< Raw sum (specific to FACT)
        int32_t  fCheckOffset; ///< offset to the data pointer to calculate the checksum

        // data layout related stuff
        /// Regular columns augmented with compression information
        struct CompressedColumn
        {
            CompressedColumn(const Table::Column& c, const Compression& h) : col(c), block_head(h)
            {}
            Table::Column col;        ///< the regular column entry
            Compression   block_head; ///< the compression data associated with that column
        };

        vector<CompressedColumn> fRealColumns;  ///< Columns of the file, augmented with their compression parameters
        uint32_t                 fRealRowWidth; ///< Width in bytes of one uncompressed row
        shared_ptr<MemoryChunk>  fSmartBuffer;  ///< Smart pointer to the buffer where the incoming rows are written
        char*                    fBuffer;       ///< Plain pointer to the memory held by fSmartBuffer
        vector<char>             fRawSumBuffer; ///< buffer used for checksumming the incoming data, before compression

#ifdef __EXCEPTIONS
        exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads
#endif
};

#ifndef __MARS__
}; //namespace std
#endif

#endif
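/* On-disk layout sketch (illustrative): after the header, the catalog occupies the
 * regular table rows (NAXIS1 x NAXIS2), and the heap that follows holds one entry
 * per tile:
 *
 *     [TileHeader: 'TILE' marker, number of rows, total size]
 *         [block header col0][compressed block col0]
 *         [block header col1][compressed block col1]
 *         ...
 *
 * The catalog's (size, offset) pairs point into this heap, offsets being relative
 * to its start, which is what the zfits reader uses to access tiles randomly.
 */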