Changeset 17402 for trunk/Mars/mcore

Timestamp: 11/29/13 18:12:54
File:      trunk/Mars/mcore/zfits.h (1 edited)
Legend:
- Unmodified lines are prefixed with a space
- Added lines are prefixed with "+"
- Removed lines are prefixed with "-"
trunk/Mars/mcore/zfits.h (r17305 -> r17402)

     // Basic constructor
     zfits(const std::string& fname, const std::string& tableName="", bool force=false)
-        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
+        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
     {
         open(fname.c_str());
…
     // Alternative contstructor
     zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
-        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
+        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
     {
         open(fname.c_str());
…
         //give it some space for uncompressing
         AllocateBuffers();
-
-        //check that heap agrees with head
-        //CheckIfFileIsConsistent();
     }
…
     size_t    fNumRowsPerTile; ///< Number of rows per compressed tile
     int64_t   fCurrentRow;     ///< current row in memory signed because we need -1
+    size_t    fShrinkFactor;   ///< shrink factor
 
     streamoff fHeapOff;        ///< offset from the beginning of the file of the binary data
…
 
         //check if the catalog has been shrinked
-        uint32_t shrink_factor = 1;
-        if (HasKey("ZSHRINK"))
-            shrink_factor = GetInt("ZSHRINK");
-
-        if (shrink_factor != 1)
-        {
-            CheckIfFileIsConsistent(true);
-            fNumTiles = fCatalog.size();
-            fNumRowsPerTile /= shrink_factor;
-        }
+        fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
+
+        if (fNumRowsPerTile%fShrinkFactor)
+        {
+            clear(rdstate()|std::ios::badbit);
+#ifdef __EXCEPTIONS
+            throw std::runtime_error("Rows per tile and shrink factor do not match");
+#else
+            gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
+            return;
+#endif
+        }
+
+        if (fShrinkFactor>0)
+            fNumRowsPerTile /= fShrinkFactor;
 
         //compute the total size of each compressed tile
…
     }
 
-    // Compressed version of the read row
+    // Compressed version of the read row, even files with shrunk catalogs
+    // can be read fully sequentially so that streaming, e.g. through
+    // stdout/stdin, is possible.
     bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
     {
…
         InitCompressionReading();
 
-        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
-        const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
-
-        bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
+        // Book keeping, where are we?
+        const int64_t requestedTile = rowNum / fNumRowsPerTile;
+        const int64_t currentTile   = fCurrentRow / fNumRowsPerTile;
+
+        const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
+        const int64_t currentSuperTile   = currentTile / fShrinkFactor;
+
+        const int64_t requestedSubTile = requestedTile % fShrinkFactor;
+        const int64_t currentSubTile   = currentTile % fShrinkFactor;
 
         fCurrentRow = rowNum;
-        //should we read yet another chunk of data ?
-        if (requestedTile != currentTile)
-        {
-            //read yet another chunk from the file
-            const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader);
 
+        // Is this just the next tile in the sequence?
+        const bool isNextTile = requestedTile==currentTile+1;
+
+        // Do we have to read a new tile from disk?
+        if (requestedTile!=currentTile)
+        {
             //skip to the beginning of the tile
-            const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader);
-
-            seekg(fHeapOff+tileStart);
-
-            //calculate the 32 bits offset of the current tile.
-            const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
-
-            //point the tile header where it should be
-            //we ain't checking the header now
-            // TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
-
-            ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
-
-            //read one tile from disk
-            read(fCompressedBuffer.data()+offset, sizeToRead);
-
-            if (addCheckSum)
+            const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
+
+            std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
+
+            // If this is a sub tile we might have to step forward a bit and
+            // seek for the sub tile. If we were just reading the previous one
+            // we can skip that.
+            if (!isNextTile)
+            {
+                // step to the beginnig of the super tile
+                seekg(fHeapOff+superTileStart);
+
+                // If there are sub tiles we might have to seek through the super tile
+                for (uint32_t k=0; k<requestedSubTile; k++)
+                {
+                    // Read header
+                    FITS::TileHeader header;
+                    read((char*)&header, sizeof(FITS::TileHeader));
+
+                    // Skip to the next header
+                    seekg(header.size-sizeof(FITS::TileHeader), cur);
+                }
+            }
+
+            // this is now the beginning of the sub-tile we want to read
+            const int64_t subTileStart = tellg() - fHeapOff;
+
+            // calculate the 32 bits offset of the current tile.
+            const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
+
+            // start of destination buffer (padding comes later)
+            char *destBuffer = fCompressedBuffer.data()+offset;
+
+            // Store the current tile size once known
+            size_t currentTileSize = 0;
+
+            // If this is a request for a sub tile which is not cataloged
+            // recalculate the offsets from the buffer, once read
+            if (requestedSubTile>0)
+            {
+                // Read header
+                read(destBuffer, sizeof(FITS::TileHeader));
+
+                // Get size of tile
+                currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
+
+                // now read the remaining bytes of this tile
+                read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
+
+                // Calculate the offsets recursively
+                offsets[0] = 0;
+
+                //skip through the columns
+                for (size_t i=0; i<fTable.num_cols-1; i++)
+                {
+                    //zero sized column do not have headers. Skip it
+                    if (fTable.sorted_cols[i].num == 0)
+                    {
+                        offsets[i+1] = offsets[i];
+                        continue;
+                    }
+
+                    const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
+                    offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
+                }
+            }
+            else
+            {
+                // If we are reading the first tile of a super tile, all information
+                // is already available.
+                currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
+                read(destBuffer, currentTileSize);
+            }
+
+
+            // If we are reading sequentially, calcuakte checksum
+            if (isNextTile)
+            {
+                // Padding for checksum calculation
+                memset(fCompressedBuffer.data(), 0, offset);
+                memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
                 fChkData.add(fCompressedBuffer);
-
-            if (requestedTile == currentTile+1 &&
-                fCopy.is_open() &&
-                fCopy.good())
-            {
-                fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
+            }
+
+            // Check if we are writing a copy of the file
+            if (isNextTile && fCopy.is_open() && fCopy.good())
+            {
+                fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
                 if (!fCopy)
                     clear(rdstate()|std::ios::badbit);
…
                 clear(rdstate()|std::ios::badbit);
 
+
+            // uncompress the buffer
             const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
-
-            //uncompress it
-            UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
+            UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
 
             // pointer to column (source buffer)
…
 
     // Data has been read from disk. Uncompress it !
-    void UncompressBuffer(const uint32_t &catalogCurrentRow,
+    void UncompressBuffer(const std::vector<size_t> &offsets,
                           const uint32_t &thisRoundNumRows,
                           const uint32_t offset)
…
 
             //get the compression flag
-            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
+            const int64_t compressedOffset = offsets[i]+offset;
 
             const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
…
     {
         //goto start of heap
-        streamoff whereAreWe = tellg();
+        const streamoff whereAreWe = tellg();
         seekg(fHeapOff);
 
…
 
         //get number of columns from header
-        size_t numCols = fTable.num_cols;
+        const size_t numCols = fTable.num_cols;
 
         std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
…
 
             //a new tile begins here
-            catalog.emplace_back( std::vector<std::pair<int64_t, int64_t> >(0));
+            catalog.emplace_back();
             offsetInHeap += sizeof(FITS::TileHeader);
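In short, when the catalog has been shrunk via ZSHRINK only every fShrinkFactor-th tile (a "super tile") has its own catalog entry, so the new ReadBinaryRow() first locates the cataloged super tile and then the sub-tile inside it. The following stand-alone sketch only illustrates that index arithmetic; it is not part of zfits.h, the names TileIndex and locateRow are made up for the example, and it assumes (as in the changeset) that fNumRowsPerTile has already been divided by the shrink factor.

    #include <cstdint>
    #include <cstdio>

    // Hypothetical helper mirroring the bookkeeping at the top of ReadBinaryRow().
    struct TileIndex
    {
        int64_t tile;       // rowNum / fNumRowsPerTile
        int64_t superTile;  // tile / fShrinkFactor  -> index into fCatalog / fTileOffsets
        int64_t subTile;    // tile % fShrinkFactor  -> position inside the super tile
    };

    static TileIndex locateRow(int64_t rowNum, int64_t rowsPerTile, int64_t shrinkFactor)
    {
        const int64_t tile = rowNum / rowsPerTile;
        return { tile, tile / shrinkFactor, tile % shrinkFactor };
    }

    int main()
    {
        // Example: 100 rows per (sub-)tile and a catalog shrunk by a factor of 4.
        // Row 1534 lies in tile 15, i.e. sub-tile 3 of super tile 3.
        const TileIndex idx = locateRow(1534, 100, 4);
        std::printf("tile=%lld super=%lld sub=%lld\n",
                    (long long)idx.tile, (long long)idx.superTile, (long long)idx.subTile);
        return 0;
    }

Only the subTile > 0 case forces the reader to step through the super tile on disk, skipping ahead by the size stored in each FITS::TileHeader, because those sub-tiles have no catalog entry of their own.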