Changeset 17253 for trunk/Mars/mcore/zofits.h
Timestamp: 10/18/13 13:49:17 (11 years ago)
Files: 1 edited
Legend:
- Unmodified
- Added
- Removed
trunk/Mars/mcore/zofits.h
r17243 r17253 37 37 }; 38 38 39 //catalog types 40 struct CatalogEntry 41 { 42 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}; 43 int64_t first; ///< Size of this column in the tile 44 int64_t second; ///< offset of this column in the tile, from the start of the heap area 45 } __attribute__((__packed__)); 46 typedef vector<CatalogEntry> CatalogRow; 47 typedef list<CatalogRow> CatalogType; 48 39 49 /// Parameters required to write a tile to disk 40 50 struct WriteTarget … … 53 63 struct CompressionTarget 54 64 { 55 shared_ptr<MemoryChunk> src; ///< Original data 56 shared_ptr<MemoryChunk> transposed_src; ///< Transposed data 57 WriteTarget target; ///< Compressed data 58 uint32_t num_rows; ///< Number of rows to compress 59 }; 65 CompressionTarget(CatalogRow& r) : catalog_entry(r) 66 {} 67 68 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with 69 shared_ptr<MemoryChunk> src; ///< Original data 70 shared_ptr<MemoryChunk> transposed_src;///< Transposed data 71 WriteTarget target; ///< Compressed data 72 uint32_t num_rows; ///< Number of rows to compress 73 }; 60 74 61 75 public: … … 112 126 fBuffer = NULL; 113 127 fRealRowWidth = 0; 114 fCatalogExtraRows = 0;115 128 fCatalogOffset = 0; 129 fCatalogSize = 0; 116 130 117 131 fMaxUsableMem = maxUsableMem; … … 163 177 SetStr( "RAWSUM", " 0", "Checksum of raw little endian data"); 164 178 SetFloat("ZRATIO", 0, "Compression ratio"); 165 166 fCatalogExtraRows = 0; 179 SetInt( "ZSHRINK", 1, "Catalog shrink factor"); 180 181 fCatalogSize = 0; 182 fCatalog.clear(); 167 183 fRawSum.reset(); 168 184 } … … 187 203 { 188 204 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t); 189 const uint32_t total_catalog_size = f Catalog.size()*one_catalog_row_size;205 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;//fCatalog.size()*one_catalog_row_size; 190 206 191 207 // swap the catalog bytes before writing … … 198 214 } 199 215 216 if (fCatalogSize < 
fNumTiles) 217 memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift); 218 200 219 // first time writing ? remember where we are 201 220 if (fCatalogOffset == 0) … … 208 227 seekp(fCatalogOffset); 209 228 write(swapped_catalog.data(), total_catalog_size); 229 210 230 if (where_are_we != fCatalogOffset) 211 231 seekp(where_are_we); … … 224 244 } 225 245 226 /// Grows the catalog in case not enough rows were allocated 227 void GrowCatalog() 228 { 229 uint32_t orig_catalog_size = fCatalog.size(); 230 231 fCatalog.resize(fCatalog.size()*2); 232 for (uint32_t i=orig_catalog_size;i<fCatalog.size(); i++) 233 { 234 fCatalog[i].resize(fTable.num_cols); 235 for (auto it=(fCatalog[i].begin()); it!=fCatalog[i].end(); it++) 236 *it = CatalogEntry(0,0); 237 } 238 239 fCatalogExtraRows += orig_catalog_size; 240 fNumTiles += orig_catalog_size; 241 } 242 246 CatalogRow& AddOneCatalogRow() 247 { 248 // add one row to the catalog 249 fCatalog.emplace_back(CatalogRow()); 250 fCatalog.back().resize(fTable.num_cols); 251 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++) 252 *it = CatalogEntry(0,0); 253 254 fCatalogSize++; 255 256 return fCatalog.back(); 257 } 243 258 /// write one row of data 244 259 /// @param ptr the source buffer … … 253 268 #else 254 269 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl; 255 return false;256 #endif257 }258 259 if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)260 {261 // GrowCatalog();262 #ifdef __EXCEPTIONS263 throw runtime_error("Maximum number of rows exceeded for this file");264 #else265 gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;266 270 return false; 267 271 #endif … … 294 298 fRawSum.add(fRawSumBuffer, false); 295 299 300 fTable.num_rows++; 301 296 302 DrsOffsetCalibrate(target_location); 297 303 298 fTable.num_rows++;299 300 304 if (fTable.num_rows % fNumRowsPerTile == 0) 301 305 { 302 CompressionTarget compress_target ;306 CompressionTarget 
compress_target(AddOneCatalogRow()); 303 307 SetNextCompression(compress_target); 304 308 … … 374 378 void ShrinkCatalog() 375 379 { 380 //add empty row to get either the target number of rows, or a multiple of the allowed size 381 for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++) 382 AddOneCatalogRow(); 383 376 384 //did we write more rows than what the catalog could host ? 377 if (fCatalogExtraRows != 0) 378 { 379 //how many rows can the regular catalog host ? 380 const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile; 381 //what's the shrink factor to be applied ? 382 const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0); 385 if (fCatalogSize > fNumTiles) 386 { 387 const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above 383 388 384 389 //shrink the catalog ! 385 for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+= shrink_factor) 386 {//add the elements one by one, so that the empty ones at the end (i.e. 
fTable.num_rows%shrink_factor) do not create havok 387 const uint32_t target_catalog_row = i/shrink_factor; 388 //move data from current row (i) to target row 389 for (uint32_t j=0; j<fTable.num_cols; j++) 390 { 391 fCatalog[target_catalog_row][j].second = fCatalog[i][j].second; 392 fCatalog[target_catalog_row][j].first = 0; 393 uint64_t last_size = fCatalog[i][j].first; 394 uint64_t last_offset = fCatalog[i][j].second; 395 396 for (uint32_t k=1; k<shrink_factor; k++) 397 { 398 if (fCatalog[i+k][j].second != 0) 399 { 400 fCatalog[target_catalog_row][j].first += fCatalog[i+k][j].second - last_offset; 401 } 402 else 403 { 404 fCatalog[target_catalog_row][j].first += last_size; 405 break; 406 } 407 last_size = fCatalog[i+k][j].first; 408 last_offset = fCatalog[i+k][j].second; 409 } 410 } 390 uint32_t entry_id = 1; 391 auto it = fCatalog.begin(); 392 it++; 393 for (; it != fCatalog.end(); it++) 394 { 395 if (entry_id >= fNumTiles) break; 396 397 uint32_t target_id = entry_id*shrink_factor; 398 399 auto jt = it; 400 for (uint32_t i=0;i<target_id-entry_id;i++) 401 jt++; 402 403 *it = *jt; 404 405 entry_id++; 411 406 } 412 407 413 fCatalog.resize(fCatalog.size() - fCatalogExtraRows); 414 408 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles; 409 //remove the too many entries 410 for (uint32_t i=0;i<num_tiles_to_remove;i++) 411 { 412 fCatalog.pop_back(); 413 fCatalogSize--; 414 } 415 415 //update header keywords 416 const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor; 417 const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles; 418 SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols); 419 SetInt("NAXIS2", new_num_tiles_written); 420 SetInt("ZTILELEN", new_num_rows_per_tiles); 421 cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl; 422 cout << "Num tiles written: " << new_num_tiles_written << endl; 416 fNumRowsPerTile *= 
shrink_factor; 417 418 SetInt("ZTILELEN", fNumRowsPerTile); 419 SetInt("ZSHRINK", shrink_factor); 423 420 } 424 421 } … … 455 452 if (fTable.num_rows%fNumRowsPerTile != 0) 456 453 { 457 CompressionTarget compress_target ;454 CompressionTarget compress_target(AddOneCatalogRow()); 458 455 SetNextCompression(compress_target); 459 456 … … 475 472 AlignTo2880Bytes(); 476 473 474 int64_t heap_size = 0; 475 int64_t compressed_offset = 0; 476 for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++) 477 { 478 compressed_offset += sizeof(TileHeader); 479 heap_size += sizeof(TileHeader); 480 for (uint32_t j=0; j<it->size(); j++) 481 { 482 heap_size += (*it)[j].first; 483 (*it)[j].second = compressed_offset; 484 compressed_offset += (*it)[j].first; 485 if ((*it)[j].first == 0) 486 (*it)[j].second = 0; 487 } 488 } 489 490 ShrinkCatalog(); 491 477 492 //update header keywords 478 493 SetInt("ZNAXIS1", fRealRowWidth); 479 494 SetInt("ZNAXIS2", fTable.num_rows); 480 495 481 SetInt("ZHEAPPTR", fCatalog .size()*fTable.num_cols*sizeof(uint64_t)*2);496 SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2); 482 497 483 498 const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile; … … 492 507 SetStr("RAWSUM", str.str()); 493 508 494 int64_t heap_size = 0;495 int64_t compressed_offset = 0;496 497 for (uint32_t i=0; i<total_num_tiles_written; i++)498 {499 compressed_offset += sizeof(TileHeader);500 heap_size += sizeof(TileHeader);501 for (uint32_t j=0; j<fCatalog[i].size(); j++)502 {503 heap_size += fCatalog[i][j].first;504 fCatalog[i][j].second = compressed_offset;505 compressed_offset += fCatalog[i][j].first;506 if (fCatalog[i][j].first == 0)507 fCatalog[i][j].second = 0;508 }509 }510 511 509 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size; 512 510 SetFloat("ZRATIO", compression_ratio); 513 511 514 512 //add to the heap size the size of the gap between the catalog and the actual heap 515 
heap_size += (fCatalog .size()- total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;513 heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2; 516 514 517 515 SetInt("PCOUNT", heap_size, "size of special data area"); … … 630 628 631 629 fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming 632 633 //give the catalog enough space634 fCatalog.resize(fNumTiles);635 for (uint32_t i=0;i<fNumTiles;i++)636 {637 fCatalog[i].resize(fRealColumns.size());638 for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)639 *it = CatalogEntry(0,0);640 }641 630 } 642 631 … … 667 656 668 657 fCheckOffset = (4 - extraBytes)%4; 658 669 659 //write data to disk 670 660 write(src+4, sizeToWrite); … … 679 669 { 680 670 uint64_t compressed_size = 0; 671 672 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct. 673 //Really do not understand what's wrong... 674 //calibrate data if required 675 //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile; 676 // for (uint32_t i=0;i<thisRoundNumRows;i++) 677 // { 678 // char* target_location = target.src.get()->get() + fRealRowWidth*i; 679 // cout << "Target Location there...." 
<< hex << static_cast<void*>(target_location) << endl; 680 // DrsOffsetCalibrate(target_location); 681 // } 682 681 683 #ifdef __EXCEPTIONS 682 684 try … … 687 689 688 690 //compress the buffer 689 compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows );691 compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows, target.catalog_entry); 690 692 #ifdef __EXCEPTIONS 691 693 } … … 762 764 /// @param num_rows the number of uncompressed rows in the transposed buffer 763 765 /// @param the number of bytes of the compressed data 764 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows )766 uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row) 765 767 { 766 768 const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile; 767 const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;768 769 uint32_t offset = 0; 769 770 … … 777 778 for (uint32_t i=0;i<fRealColumns.size();i++) 778 779 { 779 fCatalog[currentCatalogRow][i].second = compressedOffset;780 catalog_row[i].second = compressedOffset; 780 781 781 782 if (fRealColumns[i].col.num == 0) continue; … … 811 812 if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two) 812 813 {//if so set flag and redo it uncompressed 813 // cout << "Redoing uncompressed ! " << endl;814 814 //de-smooth ! 
815 815 if (head.getProc(0) == kFactSmoothing) … … 826 826 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num; 827 827 828 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;828 catalog_row[i].first = compressedOffset - catalog_row[i].second; 829 829 continue; 830 830 } … … 834 834 835 835 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num; 836 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;836 catalog_row[i].first = compressedOffset - catalog_row[i].second; 837 837 } 838 838 … … 968 968 969 969 // catalog related stuff 970 struct CatalogEntry971 {972 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};973 int64_t first; ///< Size of this column in the tile974 int64_t second; ///< offset of this column in the tile, from the start of the heap area975 } __attribute__((__packed__));976 977 typedef vector<CatalogEntry> CatalogRow;978 typedef vector<CatalogRow> CatalogType;979 970 CatalogType fCatalog; ///< Catalog for this file 980 //uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists)971 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists) 981 972 uint32_t fNumTiles; ///< Number of pre-reserved tiles 982 973 uint32_t fNumRowsPerTile; ///< Number of rows per tile 983 974 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file 984 uint32_t fCatalogExtraRows; ///< Number of extra rows written on top of the initial capacity of the file985 975 986 976 // checksum related stuff
Note: See TracChangeset for help on using the changeset viewer.