/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"

#ifndef __MARS__
namespace std
{
#else
using namespace std;
#endif

class zofits : public ofits
{
public:
    //These enums have been duplicated from zfits. Should they be located one level up?
    //If so, where?
    enum CompressionProcess_t
    {
        kFactRaw       = 0x0,
        kFactSmoothing = 0x1,
        kFactHuffman16 = 0x2
    };

    enum RowOrdering_t
    {
        kOrderByCol = 'C',
        kOrderByRow = 'R'
    };

    //TileHeaders are only written, never read back.
    //They are here to make it possible to recover the raw data from the binary if the header
    //gets corrupted, or to cross-check the data if desired: the zfits method
    //CheckIfFileIsConsistent can do this.
    struct TileHeader
    {
        char     id[4];
        uint32_t numRows;
        uint64_t size;

        TileHeader(uint32_t nRows=0, uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
                                                     numRows(nRows),
                                                     size(s)
        { };
    } __attribute__((__packed__));

    //BlockHeaders are written before every compressed blob of data
    struct BlockHeader
    {
        uint64_t      size;
        char          ordering;
        unsigned char numProcs;

        BlockHeader(uint64_t s=0, char o=zfits::kOrderByRow, unsigned char n=1) : size(s),
                                                                                  ordering(o),
                                                                                  numProcs(n)
        {}
    } __attribute__((__packed__));
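    //The two structs above define the on-disk layout of one compressed tile, as assembled
    //later in compressBuffer(): a TileHeader, followed for every column by a BlockHeader,
    //the list of numProcs 16-bit process identifiers, and finally the (compressed) payload.
    //The block below is only an illustration of that layout, not code used by the writer;
    //the byte counts follow from the packed structs above.
#if 0
    // [TileHeader : 16 bytes]  id = "TILE", numRows, size (total size of the tile, header included)
    //   [column 0]  [BlockHeader : 10 bytes]  size of this block, ordering ('R'/'C'), numProcs
    //               [numProcs * 2 bytes]      process ids, e.g. {kFactSmoothing, kFactHuffman16}
    //               [payload]                 compressed data (or raw, if compression did not pay off)
    //   [column 1]  ... one such block per column
    static_assert(sizeof(TileHeader)  == 4 + 4 + 8, "packed tile header is 16 bytes");
    static_assert(sizeof(BlockHeader) == 8 + 1 + 1, "packed block header is 10 bytes");
#endif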
    //constructors
    zofits(uint32_t numTiles=1000, uint32_t rowPerTile=100) : ofits()
    {
        InitMemberVariables(numTiles, rowPerTile);
        SetNumWorkingThreads(1);
    }

    zofits(const char* fname, uint32_t numTiles=1000, uint32_t rowPerTile=100) : ofits(fname)
    {
        InitMemberVariables(numTiles, rowPerTile);
        SetNumWorkingThreads(1);
    }

    ~zofits()
    {
    }

    //initialization of member variables
    void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0)
    {
        fCheckOffset      = 0;
        fThreadLooper     = 0;
        fNumTiles         = nt;
        fNumRowsPerTile   = rpt;
        fNumThreads       = 1;
        fThreadIndex      = 0;   //a variable to assign thread indices
        fBuffer           = NULL;
        fRealRowWidth     = 0;
        fCatalogOffset    = 0;
        fStartCellsOffset = -1;
        fDataOffset       = -1;
    }

    //whether or not a calibration was given to the file writer
    bool IsOffsetCalibrated()
    {
        return (fOffsetCalibration.size() != 0);
    }

    //assign a given drs offset calibration
    void SetDrsCalibration(const float* calib)
    {
        if (!IsOffsetCalibrated())
            fOffsetCalibration.resize(1440*1024);

        for (uint32_t i=0; i<1440*1024; i++)
            fOffsetCalibration[i] = (int16_t)(calib[i]*4096.f/2000.f);
    }

    void SetDrsCalibration(const vector<float>& calib)
    {
        if (calib.size() != 1440*1024)
#ifdef __EXCEPTIONS
            throw runtime_error("Cannot load calibration with anything else than 1024 samples per pixel");
#else
            gLog << ___err___ << "ERROR - Cannot load calibration with anything else than 1024 samples per pixel" << endl;
#endif

        SetDrsCalibration(calib.data());
    }

    void LoadDrsCalibrationFromFile(const string& fileName)
    {
        factfits drsFile(fileName);
        float* drsCalibFloat = reinterpret_cast<float*>(drsFile.SetPtrAddress("BaselineMean"));

        drsFile.GetNextRow();
        SetDrsCalibration(drsCalibFloat);
    }

    //write the header of the binary table
    bool WriteTableHeader(const char* name="DATA")
    {
        ofits::WriteTableHeader(name);

        if (IsOffsetCalibrated())
        {
            //retrieve the columns storing the start cell offsets and the data, if required
            for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)
            {
                if (it->col.name == "StartCellData")
                    fStartCellsOffset = it->col.offset;

                if (it->col.name == "Data")
                {
                    fNumSlices  = it->col.num;
                    fDataOffset = it->col.offset;

                    if (fNumSlices % 1440 != 0)
                    {
#ifdef __EXCEPTIONS
                        throw runtime_error("Number of data samples not a multiple of 1440.");
#else
                        gLog << ___err___ << "ERROR - Number of data samples not a multiple of 1440. Doing it uncalibrated." << endl;
#endif
                        fOffsetCalibration.resize(0);
                    }
                    fNumSlices /= 1440;
                }
            }

            if (fStartCellsOffset < 0)
            {
#ifdef __EXCEPTIONS
                throw runtime_error("FACT Calibration requested, but \"StartCellData\" column not found.");
#else
                gLog << ___err___ << "ERROR - FACT Calibration requested, but \"StartCellData\" column not found. Doing it uncalibrated." << endl;
#endif
                //throw away the calibration data
                fOffsetCalibration.resize(0);
            }

            if (fDataOffset < 0)
            {
#ifdef __EXCEPTIONS
                throw runtime_error("FACT Calibration requested, but \"Data\" column not found.");
#else
                gLog << ___err___ << "ERROR - FACT Calibration requested, but \"Data\" column not found. Doing it uncalibrated." << endl;
#endif
                //throw away the calibration data
                fOffsetCalibration.resize(0);
            }
        }

        return good();
    }

    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool("ZTABLE",   true,            "Table is compressed");
        SetInt ("ZNAXIS1",  0,               "Width of uncompressed rows");
        SetInt ("ZNAXIS2",  0,               "Number of uncompressed rows");
        SetInt ("ZPCOUNT",  0,               "");
        SetInt ("ZHEAPPTR", 0,               "");
        SetInt ("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt ("THEAP",    0,               "");
        SetStr ("RAWSUM",   " 0",            "Checksum of raw little endian data");

        fRawSum.reset();

        //start the compression threads
        pthread_mutex_init(&fMutex, NULL);

        fThreadIndex = 0;
        for (uint32_t i=0; i<fNumThreads; i++)
            pthread_create(&(fThread[i]), NULL, threadFunction, this);
    }

    //write the drs cell offsets as their own table, so that the data can be un-calibrated again
    //... (the beginning of this function is missing from the source; it builds the header object
    //     'c' that describes the offsets table before the payload is written below)
    bool WriteDrsOffsetsTable()
    {
        vector<char> swappedOffsets;
        swappedOffsets.resize(1024*1440*sizeof(int16_t));
        revcpy<sizeof(int16_t)>(swappedOffsets.data(), (char*)(fOffsetCalibration.data()), 1024*1440);

        Checksum datasum;
        datasum.add(swappedOffsets.data(), sizeof(int16_t)*1024*1440);

        ostringstream dataSumStr;
        dataSumStr << datasum.val();
        c.SetStr("DATASUM", dataSumStr.str());

        datasum += c.WriteHeader(*this);

        const off_t here_I_am = tellp();

        c.SetStr("CHECKSUM", datasum.str());
        c.WriteHeader(*this);

        seekp(here_I_am);

        write(swappedOffsets.data(), swappedOffsets.size());

        AlignTo2880Bytes();

        return good();
    }

    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;

        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogOffset == 0)
        {
            fCatalogOffset = tellp();
        }

        const off_t where_are_we = tellp();

        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
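    //The catalog serialized by WriteCatalog() is a fixed-size region at fCatalogOffset: one
    //CatalogRow per tile, and per column a CatalogEntry of two big-endian 64-bit integers
    //(hence the revcpy above). Judging from compressBuffer() further down, 'first' ends up
    //holding the compressed size of the column's block and 'second' its offset inside the
    //tile, but that is an inference, not something this file states explicitly.
    //Illustration only, with made-up numbers; not used by the writer:
#if 0
    const uint32_t numTiles = 1000;                                 //first constructor argument
    const uint32_t numCols  = 10;                                   //columns added via AddColumn()
    const uint32_t catalogBytes = numTiles*numCols*2*sizeof(uint64_t);
    //with 1000 tiles and 10 columns, 160000 bytes are reserved for the catalog between the
    //table header and the compressed data heap
#endif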
    bool WriteRow(const void* ptr, size_t cnt, bool byte_swap=true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Maximum number of rows exceeded for this file");
#else
            gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
            return false;
#endif
        }

        //copy the current row to the pool of rows waiting for compression
        char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksumming.
        //Ideally this should be moved to the threads, along with the drs-offset-calibration.
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not
        //sure whether one extra memcpy per row written is worse than 100 rows checksummed when
        //the tile is full...
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        //apply the drs offset calibration in place, if one was given
        if (IsOffsetCalibrated())
        {
            int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
            int16_t* data      = reinterpret_cast<int16_t*>(target_location + fDataOffset);

            for (uint32_t ch=0; ch<1440; ch++)
            {
                if (startCell[ch] < 0)
                {
                    data += fNumSlices;
                    continue;
                }

                const int16_t  modStart   = startCell[ch]%1024;
                const int16_t* off        = fOffsetCalibration.data() + ch*1024;
                const int16_t* cal        = off+modStart;
                const int16_t* end_stride = data+fNumSlices;

                //the readout ring-buffer has 1024 cells: wrap around when the stride
                //crosses the end of the calibration array
                if (modStart+fNumSlices > 1024)
                {
                    while (cal < off+1024)
                        *data++ -= *cal++;
                    cal = off;
                }
                while (data < end_stride)
                    *data++ -= *cal++;
            }
        }

        //... (the remainder of WriteRow, and any member functions that originally followed it,
        //     are missing from the source)
    }

    //add a new column to the table: this overload stores the column raw (kFactRaw only)
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit,
                   const string& comment="", bool addHeaderKeys=true)
    {
        BlockHeader head;
        vector<uint16_t> processing(1);
        processing[0] = kFactRaw;
        return AddColumn(cnt, typechar, name, unit, head, processing, comment, addHeaderKeys);
    }

    //add a new column along with its block header and compression sequence
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit,
                   BlockHeader& header, vector<uint16_t>& comp_sequence,
                   const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        Table::Column col;
        size_t size = SizeFromType(typechar);

        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, header, comp_sequence));

        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar);
        SetStr(strKey.str(), strVal.str(), strCom.str());

        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Comp. of FACT telescope";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return reallocateBuffers();
    }

    //set the number of threads used for compression. Must be called before the file is opened
    bool SetNumWorkingThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        if (num < 1 || num > 64)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Number of threads must be between 1 and 64");
#else
            gLog << ___err___ << "ERROR - Number of threads must be between 1 and 64" << endl;
#endif
            return false;
        }

        fNumThreads = num;
        fThreadStatus.resize(num);
        fThread.resize(num);
        fThreadNumRows.resize(num);
        for (uint32_t i=0; i<num; i++)
            fThreadStatus[i] = _THREAD_WAIT_;

        return true;
    }

    //the function executed by each compression thread
    static void* threadFunction(void* context)
    {
        zofits* myself = reinterpret_cast<zofits*>(context);

        uint32_t myID = 0;
        pthread_mutex_lock(&(myself->fMutex));
        myID = myself->fThreadIndex++;
        pthread_mutex_unlock(&(myself->fMutex));

        //tiles must hit the disk in order: each thread waits for its predecessor before writing
        uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->fNumThreads-1 : myID-1;

        while (myself->fThreadStatus[myID] != _THREAD_EXIT_)
        {
            while (myself->fThreadStatus[myID] == _THREAD_WAIT_)
                usleep(100000);

            if (myself->fThreadStatus[myID] != _THREAD_COMPRESS_)
                continue;

            uint32_t numBytes = myself->compressBuffer(myID);
            myself->fThreadStatus[myID] = _THREAD_WRITE_;

            //wait for the previous data to be written
            while (myself->fThreadIndex != threadToWaitForBeforeWriting)
                usleep(1000);

            //do the actual writing to disk
            pthread_mutex_lock(&(myself->fMutex));
            myself->writeCompressedDataToDisk(myID, numBytes);
            myself->fThreadIndex = myID;
            pthread_mutex_unlock(&(myself->fMutex));

            myself->fThreadStatus[myID] = _THREAD_WAIT_;
        }

        return NULL;
    }
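    //The hand-over of a finished tile to a worker belongs to the part of WriteRow that is
    //missing above. Based on the members and thread states used by threadFunction and
    //compressBuffer, it presumably looks roughly like the sketch below, with fThreadLooper
    //picking the workers round-robin. This is an assumption for illustration, not original code.
#if 0
    if (fTable.num_rows % fNumRowsPerTile == 0)                 //a tile just became complete
    {
        while (fThreadStatus[fThreadLooper] != _THREAD_WAIT_)   //wait for a free worker
            usleep(1000);

        copyTransposeTile(fThreadLooper);                       //stage the tile for compression
        fThreadNumRows[fThreadLooper] = fTable.num_rows;        //tell it how many rows it owns
        fThreadStatus[fThreadLooper]  = _THREAD_COMPRESS_;      //and wake it up

        fThreadLooper = (fThreadLooper+1) % fNumThreads;
    }
#endif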
    //compress the staged tile of a given thread and return the number of bytes produced
    uint64_t compressBuffer(uint32_t threadIndex)
    {
        uint32_t thisRoundNumRows = (fThreadNumRows[threadIndex]%fNumRowsPerTile) ? fThreadNumRows[threadIndex]%fNumRowsPerTile
                                                                                  : fNumRowsPerTile;
        uint32_t offset            = 0;
        uint32_t currentCatalogRow = (fThreadNumRows[threadIndex]-1)/fNumRowsPerTile;
        uint64_t compressedOffset  = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            BlockHeader& head = fRealColumns[i].head;
            const vector<uint16_t>& sequence = fRealColumns[i].comp_sequence;

            //remember where this column's block starts, so that it can be redone uncompressed
            uint64_t previousOffset = compressedOffset;

            //skip the header data
            compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();

            //apply the requested compression processes one after the other
            for (uint32_t j=0; j<sequence.size(); j++)
            {
                //... (the body of this loop is missing from the source: it dispatches to
                //     applySMOOTHING, compressHUFFMAN or compressUNCOMPRESSED according to
                //     sequence[j] and advances compressedOffset; see the sketch after this
                //     function)
            }

            //check whether the compressed block became larger than the raw data would be
            if (compressedOffset - previousOffset >
                fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
            {
                //if so, set the raw flag and redo it uncompressed
                //cout << "REDOING UNCOMPRESSED" << endl;
                compressedOffset  = previousOffset + sizeof(BlockHeader) + 1;
                compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
                                                         &(fTransposedBuffer[threadIndex][offset]),
                                                         thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);

                BlockHeader he;
                he.size     = compressedOffset - previousOffset;
                he.numProcs = 1;
                he.ordering = zfits::kOrderByRow;
                memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
                fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                continue;
            }

            head.size = compressedOffset - previousOffset;
            memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
            memcpy(&(fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
        }

        TileHeader tHead(thisRoundNumRows, compressedOffset);
        memcpy(fCompressedBuffer[threadIndex], &tHead, sizeof(TileHeader));

        return compressedOffset;
    }
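    //The per-process dispatch that belongs in the loop marked as missing above would, judging
    //from the CompressionProcess_t enum and the compress/apply helpers below, look roughly like
    //this. It is a sketch under that assumption, not the original code.
#if 0
    for (uint32_t j=0; j<sequence.size(); j++)
    {
        switch (sequence[j])
        {
        case kFactRaw:          //plain copy of the transposed column
            compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
                                                     &(fTransposedBuffer[threadIndex][offset]),
                                                     thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
            break;

        case kFactSmoothing:    //in-place predictive smoothing of the 16-bit samples
            applySMOOTHING(&(fCompressedBuffer[threadIndex][compressedOffset]),
                           &(fTransposedBuffer[threadIndex][offset]),
                           thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
            break;

        case kFactHuffman16:    //16-bit huffman coding of the (possibly smoothed) samples
            compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
                                                &(fTransposedBuffer[threadIndex][offset]),
                                                thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
            break;
        }
    }
#endif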
    //transpose the current tile into the worker's buffer, so that the samples of one column
    //end up contiguous in memory
    void copyTransposeTile(uint32_t index)
    {
        uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile
                                                                      : fNumRowsPerTile;

        //copy the tile and transpose it
        uint32_t offset = 0;
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            //... (the body of this loop is missing from the source, along with the
            //     writeCompressedDataToDisk, compressUNCOMPRESSED and reallocateBuffers helpers
            //     that are referenced elsewhere in this file, and the beginning of
            //     compressHUFFMAN below)
        }
    }

    //compresses the data to huffman codes, 16 bits at a time
    uint32_t compressHUFFMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        //... (the first half of this function is missing from the source: it sets up the huffman
        //     encoder and the huffmanOutput / huffmanOffset / previousHuffmanSize bookkeeping,
        //     then loops over the columns)

        //per-column encoding loop (its header and the encoding call itself are missing; the
        //surviving fragment shows the samples of column j, i.e. &src[j*sizeOfElems*numRows],
        //numRows*(sizeOfElems/2) 16-bit words, being appended to huffmanOutput, with the size
        //of each column's code stored in front of the codes)
        {
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset       += sizeof(uint32_t);
            previousHuffmanSize  = huffmanOutput.size();
        }

        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    //applies the smoothing pre-processing and then the huffman compression
    uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        uint32_t colWidth = numRowElems;
        for (int j=colWidth*numRows-1; j>1; j--)
            reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j]
                                               - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;

        //call the huffman compression with rows and row-elements swapped (transposed)
        return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
    }

    //applies the smoothing in place, without compressing
    uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        uint32_t colWidth = numRowElems;
        for (int j=colWidth*numRows-1; j>1; j--)
            reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j]
                                               - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;

        return numRows*sizeOfElems*numRowElems;
    }
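    //The smoothing above turns each 16-bit sample (except the first two) into its difference
    //from the average of the two preceding samples; because it walks the array backwards, a
    //reader can undo it exactly by walking forwards. A tiny self-contained illustration of
    //that round-trip, with made-up sample values (not part of the writer):
#if 0
    int16_t v[5] = {100, 102, 104, 101, 99};

    for (int j=5-1; j>1; j--)                  //forward transform, as in applySMOOTHING
        v[j] = v[j] - (v[j-1]+v[j-2])/2;       //v becomes {100, 102, 3, -2, -3}

    for (int j=2; j<5; j++)                    //inverse transform, as a reader would apply it
        v[j] = v[j] + (v[j-1]+v[j-2])/2;       //v is back to {100, 102, 104, 101, 99}
#endif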
    //Offsets calibration stuff
    vector<int16_t> fOffsetCalibration;     ///< The calibration itself
    int32_t         fStartCellsOffset;      ///< Offset in bytes of the start-cell data within a row
    int32_t         fDataOffset;            ///< Offset in bytes of the data within a row
    int32_t         fNumSlices;             ///< Number of samples per pixel per event

    //Compressed data stuff
    int32_t  fCheckOffset;                  ///< offset to the data pointer to calculate the checksum
    uint32_t fNumTiles;                     ///< Maximum number of tiles in this file
    uint32_t fNumRowsPerTile;               ///< Number of rows per compressed tile

    //thread related stuff
    uint32_t          fNumThreads;          ///< The number of threads that will be used to compress
    uint32_t          fThreadIndex;         ///< A variable to assign thread indices
    vector<pthread_t> fThread;              ///< The thread handles of the compressor
    vector<uint32_t>  fThreadNumRows;       ///< Total number of rows for each thread to compress
    vector<uint32_t>  fThreadStatus;        ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
    int32_t           fThreadLooper;        ///< Which thread will deal with the upcoming bunch of data?
    pthread_mutex_t   fMutex;               ///< mutex for the compressing threads

    //catalog of the compressed blocks, written to the reserved catalog area
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
        int64_t first;
        int64_t second;
    } __attribute__((__packed__));
    // typedef pair<int64_t, int64_t> CatalogEntry;

    typedef vector<CatalogEntry> CatalogRow;
    typedef vector<CatalogRow>   CatalogType;

    CatalogType fCatalog;
    Checksum    fCatalogSum;
    Checksum    fRawSum;
    off_t       fCatalogOffset;

    //input row pool and per-thread working buffers
    vector<char>         fBufferVector;
    vector<char>         fRawSumBuffer;
    vector<vector<char>> fTransposedBufferVector;
    vector<vector<char>> fCompressedBufferVector;
    char*                fBuffer;
    vector<char*>        fTransposedBuffer;
    vector<char*>        fCompressedBuffer;
    uint32_t             fRealRowWidth;

    //a column together with its block header and compression sequence
    struct CompressedColumn
    {
        CompressedColumn(Table::Column& c, BlockHeader& h, vector<uint16_t>& cs) : col(c),
                                                                                   head(h),
                                                                                   comp_sequence(cs)
        {}
        Table::Column    col;
        BlockHeader      head;
        vector<uint16_t> comp_sequence;
    };
    vector<CompressedColumn> fRealColumns;

    //thread states. Not all are used, but they do not hurt
    static const uint32_t _THREAD_WAIT_       = 0; ///< Thread doing nothing
    static const uint32_t _THREAD_COMPRESS_   = 1; ///< Thread working, compressing
    static const uint32_t _THREAD_DECOMPRESS_ = 2; ///< Thread working, decompressing
    static const uint32_t _THREAD_WRITE_      = 3; ///< Thread writing data to disk
    static const uint32_t _THREAD_READ_       = 4; ///< Thread reading data from disk
    static const uint32_t _THREAD_EXIT_       = 5; ///< Thread exiting
};

#ifndef __MARS__
}; //namespace std
#endif

#ifdef crappy_example_usage
    //illustrative only: the identifiers below (numThreads, fileNameOut, sortedColumns, colName,
    //i, tableName, drsCalibFloat, buffer, rowWidth) come from the calling code
    zofits zofitsfile(123456, 100);
    zofitsfile.SetNumWorkingThreads(numThreads);
    zofitsfile.open((fileNameOut).c_str());

    std::zofits::BlockHeader zoheader(0, zfits::kOrderByRow, 2);

    vector<uint16_t> smoothmanProcessings(2);
    smoothmanProcessings[0] = zfits::kFactSmoothing;
    smoothmanProcessings[1] = zfits::kFactHuffman16;

    //an uncompressed column...
    zofitsfile.AddColumn(sortedColumns[i].num, sortedColumns[i].type, colName, "");
    //...and a smoothed + huffman-compressed one
    zofitsfile.AddColumn(sortedColumns[i].num, sortedColumns[i].type, colName, "",
                         zoheader, smoothmanProcessings);

    zofitsfile.SetStr("ZCHKSUM", i->second.value, i->second.comment);
    zofitsfile.SetDrsCalibration(drsCalibFloat);

    zofitsfile.WriteTableHeader(tableName.c_str());

    zofitsfile.WriteRow(buffer, rowWidth);

    zofitsfile.close();
#endif
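//A more self-contained sketch of the same calling sequence, with all inputs spelled out.
//It assumes the standard FITS TFORM convention for the type character ('I' = 16-bit integer),
//a hypothetical event layout of 1440 pixels x 300 samples, and a made-up output file name;
//adjust everything to the real table at hand.
#if 0
#include "zofits.h"

int main()
{
    const uint32_t numPix = 1440, numSamples = 300;

    std::zofits file(1000, 100);                  //room for up to 1000 tiles of 100 rows each
    file.SetNumWorkingThreads(2);                 //must be set before open()
    file.open("events.fits.fz");

    //one column compressed with smoothing followed by 16-bit huffman coding
    std::zofits::BlockHeader header(0, zfits::kOrderByRow, 2);
    std::vector<uint16_t> procs = { zfits::kFactSmoothing, zfits::kFactHuffman16 };
    file.AddColumn(numPix*numSamples, 'I', "Data", "adc counts", header, procs);

    //and the start cells, stored raw
    file.AddColumn(numPix, 'I', "StartCellData", "");

    file.WriteTableHeader("Events");

    std::vector<char> row(file.GetBytesPerRow(), 0);
    for (int i=0; i<1000; i++)
        file.WriteRow(row.data(), row.size());    //fill 'row' with real event data instead

    file.close();

    return 0;
}
#endif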