Changeset 17221
- Timestamp: 10/16/13 16:04:41 (11 years ago)
- Location: trunk/Mars/mcore
- Files: 1 added, 3 edited
trunk/Mars/mcore/ofits.h
Changes from r17216 to r17221:

- The destructor is now virtual, so derived writers are destroyed correctly through a base-class pointer:

      virtual ~ofits() { if (is_open()) close(); }

- Two commented-out switch (typechar) blocks inside AddColumn (one building the "[n-byte TYPE]" comment, one computing the element size) were deleted; the live calls to CommentFromType(typechar) and SizeFromType(typechar) stay in place.

- Two AddColumn overloads were added after the existing one. The first accepts a BlockHeaderWriter and simply ignores it; the second accepts a compression-scheme string and refuses anything but an uncompressed scheme:

      virtual bool AddColumn(uint32_t cnt, char typechar, const string& name,
                             const string& unit, const BlockHeaderWriter& header,
                             const string& comment="", bool addHeaderKeys=true)
      {
          return AddColumn(cnt, typechar, name, unit, comment, addHeaderKeys);
      }

      virtual bool AddColumn(const string& compressionScheme, uint32_t cnt,
                             char typechar, const string& name, const string& unit,
                             const string& comment="", bool addHeaderKeys=true)
      {
          if (compressionScheme != "" &&
              compressionScheme != "RAW")
          {
      #ifdef __EXCEPTIONS
              throw runtime_error("Trying to add a compressed column to an uncompressed file");
      #else
              gLog << ___err___ << "ERROR - Trying to add a compressed column to an uncompressed file" << endl;
              return false;
      #endif
          }
          return AddColumn(cnt, typechar, name, unit, comment, addHeaderKeys);
      }
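With this guard, an uncompressed ofits file rejects compressed-column requests at column-definition time. A minimal sketch of that behaviour, assuming the ofits.h from this changeset is on the include path; the file name, column parameters and the "FACT" scheme string are illustrative only, and the throw only happens when __EXCEPTIONS is defined:

      // sketch.cpp -- illustrative only; exact behaviour depends on the rest
      // of the MARS headers, which are not part of this changeset view.
      #include "ofits.h"
      #include <iostream>
      #include <stdexcept>

      int main()
      {
          ofits f("plain.fits");                  // hypothetical output file

          // An empty scheme or "RAW" is forwarded to the ordinary AddColumn.
          f.AddColumn("RAW", 1, 'D', "Time", "s", "event time");

          try
          {
              // Any other scheme string is rejected by the uncompressed writer.
              f.AddColumn("FACT", 1440, 'I', "Data", "adc counts", "samples");
          }
          catch (const std::runtime_error& e)
          {
              std::cerr << "rejected as expected: " << e.what() << std::endl;
          }

          f.close();
          return 0;
      }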
trunk/Mars/mcore/zfits.h
Changes from r17141 to r17221:

- The reader now pulls the shared FITS definitions from the new header, right after the Huffman include:

      #include "huffman.h"

      #include "FITS.h"

      using namespace FITS;

- The CompressionProcess_t enum (kFactRaw, kFactSmoothing, kFactHuffman16) and the RowOrdering_t enum (kOrderByCol = 'C', kOrderByRow = 'R') were removed from the class, together with the packed TileHeader and BlockHeader helper structs that lived inside an #ifndef __CINT__ block; these definitions are now expected to come from FITS.h.

- Minor whitespace clean-ups around the ReadBinaryRow(row, dest) call and the fChkData.reset() statement.

- When an unknown processing code is encountered while decompressing, the error message now reports the column index, the processing index and the total number of processings of the block header before the exception is thrown (or, without __EXCEPTIONS, before the error is logged and the stream is marked bad).
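The single added file of this changeset is not shown in this view, but it is presumably the FITS.h that both headers now include. A sketch of what it plausibly contains, reconstructed from the enums and packed structs removed from zfits.h above; names and layout are taken from the removed code, and the actual new file may differ:

      // FITS.h (inferred) -- shared compression constants and on-disk headers
      #include <cstdint>

      namespace FITS
      {
          enum CompressionProcess_t
          {
              kFactRaw       = 0x0,
              kFactSmoothing = 0x1,
              kFactHuffman16 = 0x2
          };

          enum RowOrdering_t
          {
              kOrderByCol = 'C',
              kOrderByRow = 'R'
          };

          // Written in front of every compressed tile; never read back, but
          // usable to recover or cross-check data (see zfits::CheckIfFileIsConsistent).
          struct TileHeader
          {
              char     id[4];
              uint32_t numRows;
              uint64_t size;

              TileHeader(uint32_t nRows=0, uint64_t s=0)
                  : id{'T','I','L','E'}, numRows(nRows), size(s) {}
          } __attribute__((__packed__));

          // Written before every compressed block of column data.
          struct BlockHeader
          {
              uint64_t      size;
              char          ordering;
              unsigned char numProcs;

              BlockHeader(uint64_t s=0, char o=kOrderByRow, unsigned char n=1)
                  : size(s), ordering(o), numProcs(n) {}
          } __attribute__((__packed__));
      }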
trunk/Mars/mcore/zofits.h
Changes from r17219 to r17221:

- Shared definitions and optional boost threading support are now pulled in at the top of the header:

      #include "FITS.h"

      #ifdef USE_BOOST_THREADS
      #include <boost/thread.hpp>
      #endif

      using namespace FITS;

- The CompressionProcess_t and RowOrdering_t enums and the packed TileHeader and BlockHeader structs, which had been duplicated from zfits, were removed. Block headers are handled through a BlockHeaderWriter object everywhere: CompressedColumn stores one, and compressBuffer uses head.SizeOnDisk(), head.NumProcs(), head.Proc(j), head.Ordering(), head.SetBlockSize() and head.Write() instead of memcpy'ing a raw BlockHeader plus a vector of processings.

- Bug fix in WriteTarget: the comparison operator was missing its return statement.

      bool operator < (const WriteTarget& other)
      {
          return tile_num < other.tile_num;
      }

- CompressionTarget gains a transposed_src memory chunk. SetNextCompression hands the untransposed row buffer (fSmartBuffer) plus a freshly allocated chunk to the compression job and immediately allocates a new buffer for incoming rows; the transposition itself moved from SetNextCompression into CompressBuffer, i.e. into the worker thread.

- Threading rework. The number of compression threads is now a static member configured with the new SetNumThreads()/GetNumThreads(); the constructors call SetNumWorkingThreads(fNumQueues) instead of hard-coding one thread, and the write-to-disk queue is created suspended (third constructor argument false). SetNumWorkingThreads() became protected, validates the request against thread::hardware_concurrency() (boost::thread when USE_BOOST_THREADS is defined, with a fallback of 5 cores when detection fails), and interprets -1 as "all available cores minus two" (one for writing, one for the main thread). With zero queues everything runs in-line: WriteTableHeader only starts the queues when fNumQueues != 0, WriteRow compresses and writes the tile synchronously, and close() temporarily forces fNumQueues to 0 while compressing the last incomplete tile. When queues are used, WriteRow posts to the least-loaded queue instead of cycling through them with the removed fQueueLooper counter. Exceptions thrown inside a worker are captured into fThreadsException by CompressBuffer and rethrown by close().

- The FACT/DRS offset-calibration code was stripped out of this class: IsOffsetCalibrated(), both SetDrsCalibration() overloads, LoadDrsCalibrationFromFile(), the calibration column lookup in WriteTableHeader(), the in-place calibration loop in WriteRow() and the related members (fOffsetCalibration, fStartCellsOffset, fDataOffset, fNumSlices) are gone. WriteDrsOffsetsTable() is now a virtual stub that returns good(), and WriteRow() calls a new virtual no-op DrsOffsetCalibrate(char*), so a derived class can reimplement the calibration.

- Catalog handling: a new GrowCatalog() doubles the catalog and books the extra rows in fCatalogExtraRows (its call site in WriteRow is still commented out, so exceeding the planned number of rows still throws), and a new ShrinkCatalog() merges catalog rows by the appropriate shrink factor when extra rows were written, updating THEAP, NAXIS2 and ZTILELEN accordingly. InitMemberVariables() now refuses a catalog of size 0.

- Header keywords: the RAWSUM comment typo ("littlen endian") was fixed, a ZRATIO float key ("Compression ratio") was added and is filled in close() with the ratio of raw table size to heap size, and the ZCTYPn comment now reads "Compression type FACT".

- AddColumn() now takes either a BlockHeaderWriter or a compression-scheme string instead of a BlockHeader plus a vector of processings, and convenience wrappers AddColumnShort/Int/Long/Float/Double/Char/Byte/Bool(compressionScheme, cnt, name, unit, comment) were added.

- Compression internals: compressUNCOMPRESSED() now takes a plain byte count; applySMOOTHING() works in place on a flat int16_t array; a new UnApplySMOOTHING() inverts the smoothing, which is needed when a compressed block turns out larger than the raw data and is rewritten uncompressed; unknown processing codes and column orderings now throw (or log an error and bail out) instead of only printing to cout; WriteBufferToDisk() returns the result of writeCompressedDataToDisk(), and compressHUFFMAN() reports its size restriction through an exception as well.

- Assorted clean-ups: the destructor and WriteTableHeader() are virtual, WriteTableHeader() and WriteRow() return good() instead of a hard-coded true, close() waits for the worker queues before validating the stream position and throws under __EXCEPTIONS if writing failed, signed/unsigned comparisons were fixed, fMemPool was moved next to the other compressed-data members, and the static thread counter is defined at file scope:

      int32_t zofits::fNumQueues = 0;

- In the non-MARS helper at the bottom of the file, the compression constants are now used unqualified (kOrderByRow, kFactSmoothing, kFactHuffman16) since they come from the FITS namespace. The listing is truncated at this point in the changeset view.
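The smoothing step and its new inverse are simple enough to check in isolation. Below is a standalone sketch of the round trip, mirroring the in-place applySMOOTHING/UnApplySMOOTHING logic above; the function and variable names are illustrative, not the class methods themselves:

      #include <cstdint>
      #include <iostream>
      #include <vector>

      // Forward transform: working back to front, replace each sample by its
      // difference from the (integer) average of the two preceding samples.
      static void applySmoothing(std::vector<int16_t>& d)
      {
          for (int j = static_cast<int>(d.size()) - 1; j > 1; j--)
              d[j] = static_cast<int16_t>(d[j] - (d[j-1] + d[j-2]) / 2);
      }

      // Inverse transform: working front to back, add the average back. This
      // is what the writer needs when a block compressed larger than the raw
      // data and has to be stored uncompressed after all.
      static void unApplySmoothing(std::vector<int16_t>& d)
      {
          for (std::size_t j = 2; j < d.size(); j++)
              d[j] = static_cast<int16_t>(d[j] + (d[j-1] + d[j-2]) / 2);
      }

      int main()
      {
          std::vector<int16_t> data = {100, 102, 101, 105, 110, 108};
          const std::vector<int16_t> original = data;

          applySmoothing(data);      // what applySMOOTHING does before Huffman coding
          unApplySmoothing(data);    // what UnApplySMOOTHING undoes

          std::cout << (data == original ? "round-trip ok" : "round-trip FAILED")
                    << std::endl;
          return 0;
      }

Because both directions use the same two already-restored (or still-original) neighbours and the same truncating integer division, the round trip is exact.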