Changeset 17402 for trunk/Mars


Ignore:
Timestamp:
11/29/13 18:12:54 (11 years ago)
Author:
tbretz
Message:
Updated to allow a real streaming, even for shrunk files.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zfits.h

    r17305 r17402  
    2020    // Basic constructor
    2121    zfits(const std::string& fname, const std::string& tableName="", bool force=false)
    22         : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
     22        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
    2323    {
    2424        open(fname.c_str());
     
    2929    // Alternative contstructor
    3030    zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
    31         : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
     31        : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
    3232    {
    3333        open(fname.c_str());
     
    129129        //give it some space for uncompressing
    130130        AllocateBuffers();
    131 
    132         //check that heap agrees with head
    133         //CheckIfFileIsConsistent();
    134131    }
    135132
     
    156153    size_t fNumRowsPerTile; ///< Number of rows per compressed tile
    157154    int64_t fCurrentRow;    ///< current row in memory signed because we need -1
     155    size_t fShrinkFactor;   ///< shrink factor
    158156
    159157    streamoff fHeapOff;           ///< offset from the beginning of the file of the binary data
     
    228226
    229227        //check if the catalog has been shrinked
    230         uint32_t shrink_factor = 1;
    231         if (HasKey("ZSHRINK"))
    232                 shrink_factor = GetInt("ZSHRINK");
    233 
    234         if (shrink_factor != 1)
    235         {
    236             CheckIfFileIsConsistent(true);
    237             fNumTiles = fCatalog.size();
    238             fNumRowsPerTile /= shrink_factor;
    239         }
     228        fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
     229
     230        if (fNumRowsPerTile%fShrinkFactor)
     231        {
     232            clear(rdstate()|std::ios::badbit);
     233#ifdef __EXCEPTIONS
     234            throw std::runtime_error("Rows per tile and shrink factor do not match");
     235#else
     236            gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
     237            return;
     238#endif
     239        }
     240
     241        if (fShrinkFactor>0)
     242            fNumRowsPerTile /= fShrinkFactor;
    240243
    241244        //compute the total size of each compressed tile
     
    276279    }
    277280
    278     // Compressed version of the read row
     281    // Compressed version of the read row, even files with shrunk catalogs
     282    // can be read fully sequentially so that streaming, e.g. through
     283    // stdout/stdin, is possible.
    279284    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    280285    {
     
    285290            InitCompressionReading();
    286291
    287         const uint32_t requestedTile = rowNum/fNumRowsPerTile;
    288         const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;
    289 
    290         bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
     292        // Book keeping, where are we?
     293        const int64_t requestedTile      = rowNum        / fNumRowsPerTile;
     294        const int64_t currentTile        = fCurrentRow   / fNumRowsPerTile;
     295
     296        const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
     297        const int64_t currentSuperTile   = currentTile   / fShrinkFactor;
     298
     299        const int64_t requestedSubTile   = requestedTile % fShrinkFactor;
     300        const int64_t currentSubTile     = currentTile   % fShrinkFactor;
    291301
    292302        fCurrentRow = rowNum;
    293         //should we read yet another chunk of data ?
    294         if (requestedTile != currentTile)
    295         {
    296             //read yet another chunk from the file
    297             const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader);
    298 
     303
     304        // Is this just the next tile in the sequence?
     305        const bool isNextTile = requestedTile==currentTile+1;
     306
     307        // Do we have to read a new tile from disk?
     308        if (requestedTile!=currentTile)
     309        {
    299310            //skip to the beginning of the tile
    300             const int64_t tileStart =  fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader);
    301 
    302             seekg(fHeapOff+tileStart);
    303 
    304             //calculate the 32 bits offset of the current tile.
    305             const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
    306 
    307             //point the tile header where it should be
    308             //we ain't checking the header now
    309 //            TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
    310 
    311             ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
    312 
    313             //read one tile from disk
    314             read(fCompressedBuffer.data()+offset, sizeToRead);
    315 
    316             if (addCheckSum)
     311            const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
     312
     313            std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
     314
     315            // If this is a sub tile we might have to step forward a bit and
     316            // seek for the sub tile. If we were just reading the previous one
     317            // we can skip that.
     318            if (!isNextTile)
     319            {
     320                // step to the beginnig of the super tile
     321                seekg(fHeapOff+superTileStart);
     322
     323                // If there are sub tiles we might have to seek through the super tile
     324                for (uint32_t k=0; k<requestedSubTile; k++)
     325                {
     326                    // Read header
     327                    FITS::TileHeader header;
     328                    read((char*)&header, sizeof(FITS::TileHeader));
     329
     330                    // Skip to the next header
     331                    seekg(header.size-sizeof(FITS::TileHeader), cur);
     332                }
     333            }
     334
     335            // this is now the beginning of the sub-tile we want to read
     336            const int64_t subTileStart = tellg() - fHeapOff;
     337
     338            // calculate the 32 bits offset of the current tile.
     339            const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
     340
     341            // start of destination buffer (padding comes later)
     342            char *destBuffer = fCompressedBuffer.data()+offset;
     343
     344            // Store the current tile size once known
     345            size_t currentTileSize = 0;
     346
     347            // If this is a request for a sub tile which is not cataloged
     348            // recalculate the offsets from the buffer, once read
     349            if (requestedSubTile>0)
     350            {
     351                // Read header
     352                read(destBuffer, sizeof(FITS::TileHeader));
     353
     354                // Get size of tile
     355                currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
     356
     357                // now read the remaining bytes of this tile
     358                read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
     359
     360                // Calculate the offsets recursively
     361                offsets[0] = 0;
     362
     363                //skip through the columns
     364                for (size_t i=0; i<fTable.num_cols-1; i++)
     365                {
     366                    //zero sized column do not have headers. Skip it
     367                    if (fTable.sorted_cols[i].num == 0)
     368                    {
     369                        offsets[i+1] = offsets[i];
     370                        continue;
     371                    }
     372
     373                    const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
     374                    offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
     375                }
     376            }
     377            else
     378            {
     379                // If we are reading the first tile of a super tile, all information
     380                // is already available.
     381                currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
     382                read(destBuffer, currentTileSize);
     383            }
     384
     385
     386            // If we are reading sequentially, calcuakte checksum
     387            if (isNextTile)
     388            {
     389                // Padding for checksum calculation
     390                memset(fCompressedBuffer.data(),   0, offset);
     391                memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
    317392                fChkData.add(fCompressedBuffer);
    318 
    319             if (requestedTile == currentTile+1 &&
    320                 fCopy.is_open() &&
    321                 fCopy.good())
    322             {
    323                 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
     393            }
     394
     395            // Check if we are writing a copy of the file
     396            if (isNextTile && fCopy.is_open() && fCopy.good())
     397            {
     398                fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
    324399                if (!fCopy)
    325400                    clear(rdstate()|std::ios::badbit);
     
    329404                    clear(rdstate()|std::ios::badbit);
    330405
     406
     407            // uncompress  the buffer
    331408            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
    332 
    333             //uncompress it
    334             UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
     409            UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
    335410
    336411            // pointer to column (source buffer)
     
    431506
    432507    // Data has been read from disk. Uncompress it !
    433     void UncompressBuffer(const uint32_t &catalogCurrentRow,
     508    void UncompressBuffer(const std::vector<size_t> &offsets,
    434509                          const uint32_t &thisRoundNumRows,
    435510                          const uint32_t offset)
     
    445520
    446521            //get the compression flag
    447             const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
     522            const int64_t compressedOffset = offsets[i]+offset;
    448523
    449524            const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
     
    496571    {
    497572        //goto start of heap
    498         streamoff whereAreWe = tellg();
     573        const streamoff whereAreWe = tellg();
    499574        seekg(fHeapOff);
    500575
     
    503578
    504579        //get number of columns from header
    505         size_t numCols = fTable.num_cols;
     580        const size_t numCols = fTable.num_cols;
    506581
    507582        std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
     
    527602
    528603            //a new tile begins here
    529             catalog.emplace_back(std::vector<std::pair<int64_t, int64_t> >(0));
     604            catalog.emplace_back();
    530605            offsetInHeap += sizeof(FITS::TileHeader);
    531606
Note: See TracChangeset for help on using the changeset viewer.