Ignore:
Timestamp:
10/18/13 13:49:17 (11 years ago)
Author:
lyard
Message:
zofits with catalog shrinking and compressed calibration table
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zofits.h

    r17243 r17253  
    3737        };
    3838
     39        //catalog types
     40        struct CatalogEntry
     41        {
     42            CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
     43            int64_t first;   ///< Size of this column in the tile
     44            int64_t second; ///< offset of this column in the tile, from the start of the heap area
     45        } __attribute__((__packed__));
     46        typedef vector<CatalogEntry> CatalogRow;
     47        typedef list<CatalogRow>     CatalogType;
     48
    3949        /// Parameters required to write a tile to disk
    4050        struct WriteTarget
     
    5363        struct CompressionTarget
    5464        {
    55             shared_ptr<MemoryChunk> src;             ///< Original data
    56             shared_ptr<MemoryChunk> transposed_src; ///<  Transposed data
    57             WriteTarget             target;        ///<   Compressed data
    58             uint32_t                num_rows;     ///<    Number of rows to compress
    59         };
     65            CompressionTarget(CatalogRow& r) : catalog_entry(r)
     66            {}
     67
     68            CatalogRow&             catalog_entry;   ///< Reference to the catalog entry to deal with
     69            shared_ptr<MemoryChunk> src;            ///<  Original data
     70            shared_ptr<MemoryChunk> transposed_src;///<   Transposed data
     71            WriteTarget             target;       ///<    Compressed data
     72            uint32_t                num_rows;    ///<     Number of rows to compress
     73         };
    6074
    6175public:
     
    112126            fBuffer           = NULL;
    113127            fRealRowWidth     = 0;
    114             fCatalogExtraRows = 0;
    115128            fCatalogOffset    = 0;
     129            fCatalogSize      = 0;
    116130
    117131            fMaxUsableMem = maxUsableMem;
     
    163177            SetStr(  "RAWSUM",   "         0",    "Checksum of raw little endian data");
    164178            SetFloat("ZRATIO",   0,               "Compression ratio");
    165 
    166             fCatalogExtraRows = 0;
     179            SetInt(  "ZSHRINK",  1,               "Catalog shrink factor");
     180
     181            fCatalogSize      = 0;
     182            fCatalog.clear();
    167183            fRawSum.reset();
    168184        }
     
    187203        {
    188204            const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
    189             const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;
     205            const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;//fCatalog.size()*one_catalog_row_size;
    190206
    191207            // swap the catalog bytes before writing
     
    198214            }
    199215
     216            if (fCatalogSize < fNumTiles)
     217                memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
     218
    200219            // first time writing ? remember where we are
    201220            if (fCatalogOffset == 0)
     
    208227            seekp(fCatalogOffset);
    209228            write(swapped_catalog.data(), total_catalog_size);
     229
    210230            if (where_are_we != fCatalogOffset)
    211231                seekp(where_are_we);
     
    224244        }
    225245
    226         /// Grows the catalog in case not enough rows were allocated
    227         void GrowCatalog()
    228         {
    229             uint32_t orig_catalog_size = fCatalog.size();
    230 
    231             fCatalog.resize(fCatalog.size()*2);
    232             for (uint32_t i=orig_catalog_size;i<fCatalog.size(); i++)
    233             {
    234                 fCatalog[i].resize(fTable.num_cols);
    235                 for (auto it=(fCatalog[i].begin()); it!=fCatalog[i].end(); it++)
    236                     *it = CatalogEntry(0,0);
    237             }
    238 
    239             fCatalogExtraRows += orig_catalog_size;
    240             fNumTiles         += orig_catalog_size;
    241         }
    242 
     246        CatalogRow& AddOneCatalogRow()
     247        {
     248            // add one row to the catalog
     249            fCatalog.emplace_back(CatalogRow());
     250            fCatalog.back().resize(fTable.num_cols);
     251            for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
     252                *it = CatalogEntry(0,0);
     253
     254            fCatalogSize++;
     255
     256            return fCatalog.back();
     257        }
    243258        /// write one row of data
    244259        /// @param ptr the source buffer
     
    253268#else
    254269                gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
    255                 return false;
    256 #endif
    257             }
    258 
    259             if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
    260             {
    261 //                GrowCatalog();
    262 #ifdef __EXCEPTIONS
    263                 throw runtime_error("Maximum number of rows exceeded for this file");
    264 #else
    265                 gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
    266270                return false;
    267271#endif
     
    294298            fRawSum.add(fRawSumBuffer, false);
    295299
     300            fTable.num_rows++;
     301
    296302            DrsOffsetCalibrate(target_location);
    297303
    298             fTable.num_rows++;
    299 
    300304            if (fTable.num_rows % fNumRowsPerTile == 0)
    301305            {
    302                 CompressionTarget compress_target;
     306                CompressionTarget compress_target(AddOneCatalogRow());
    303307                SetNextCompression(compress_target);
    304308
     
    374378        void ShrinkCatalog()
    375379        {
     380            //add empty row to get either the target number of rows, or a multiple of the allowed size
     381            for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
     382                AddOneCatalogRow();
     383
    376384            //did we write more rows than what the catalog could host ?
    377             if (fCatalogExtraRows != 0)
    378             {
    379                 //how many rows can the regular catalog host ?
    380                 const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
    381                 //what's the shrink factor to be applied ?
    382                 const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);
     385            if (fCatalogSize > fNumTiles)
     386            {
     387                const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above
    383388
    384389                //shrink the catalog !
    385                 for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+= shrink_factor)
    386                 {//add the elements one by one, so that the empty ones at the end (i.e. fTable.num_rows%shrink_factor) do not create havok
    387                     const uint32_t target_catalog_row = i/shrink_factor;
    388                     //move data from current row (i) to target row
    389                     for (uint32_t j=0; j<fTable.num_cols; j++)
    390                     {
    391                         fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
    392                         fCatalog[target_catalog_row][j].first  = 0;
    393                         uint64_t last_size   = fCatalog[i][j].first;
    394                         uint64_t last_offset = fCatalog[i][j].second;
    395 
    396                         for (uint32_t k=1; k<shrink_factor; k++)
    397                         {
    398                            if (fCatalog[i+k][j].second != 0)
    399                            {
    400                                fCatalog[target_catalog_row][j].first +=  fCatalog[i+k][j].second - last_offset;
    401                            }
    402                            else
    403                            {
    404                                fCatalog[target_catalog_row][j].first += last_size;
    405                                break;
    406                            }
    407                            last_size   = fCatalog[i+k][j].first;
    408                            last_offset = fCatalog[i+k][j].second;
    409                         }
    410                     }
     390                uint32_t entry_id = 1;
     391                auto it = fCatalog.begin();
     392                it++;
     393                for (; it != fCatalog.end(); it++)
     394                {
     395                    if (entry_id >= fNumTiles) break;
     396
     397                    uint32_t target_id = entry_id*shrink_factor;
     398
     399                    auto jt = it;
     400                    for (uint32_t i=0;i<target_id-entry_id;i++)
     401                        jt++;
     402
     403                    *it = *jt;
     404
     405                    entry_id++;
    411406                }
    412407
    413                 fCatalog.resize(fCatalog.size() - fCatalogExtraRows);
    414 
     408                const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
     409                //remove the too many entries
     410                for (uint32_t i=0;i<num_tiles_to_remove;i++)
     411                {
     412                    fCatalog.pop_back();
     413                    fCatalogSize--;
     414                }
    415415                //update header keywords
    416                 const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
    417                 const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
    418                 SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
    419                 SetInt("NAXIS2", new_num_tiles_written);
    420                 SetInt("ZTILELEN", new_num_rows_per_tiles);
    421                 cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
    422                 cout << "Num tiles written: " << new_num_tiles_written << endl;
     416                fNumRowsPerTile *= shrink_factor;
     417
     418                SetInt("ZTILELEN", fNumRowsPerTile);
     419                SetInt("ZSHRINK",  shrink_factor);
    423420            }
    424421        }
     
    455452            if (fTable.num_rows%fNumRowsPerTile != 0)
    456453            {
    457                 CompressionTarget compress_target;
     454                CompressionTarget compress_target(AddOneCatalogRow());
    458455                SetNextCompression(compress_target);
    459456
     
    475472            AlignTo2880Bytes();
    476473
     474            int64_t heap_size = 0;
     475            int64_t compressed_offset = 0;
     476            for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
     477            {
     478                compressed_offset += sizeof(TileHeader);
     479                heap_size         += sizeof(TileHeader);
     480                for (uint32_t j=0; j<it->size(); j++)
     481                {
     482                    heap_size += (*it)[j].first;
     483                    (*it)[j].second = compressed_offset;
     484                    compressed_offset += (*it)[j].first;
     485                    if ((*it)[j].first == 0)
     486                        (*it)[j].second = 0;
     487                }
     488            }
     489
     490            ShrinkCatalog();
     491
    477492            //update header keywords
    478493            SetInt("ZNAXIS1", fRealRowWidth);
    479494            SetInt("ZNAXIS2", fTable.num_rows);
    480495
    481             SetInt("ZHEAPPTR", fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2);
     496            SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
    482497
    483498            const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
     
    492507            SetStr("RAWSUM", str.str());
    493508
    494             int64_t heap_size = 0;
    495             int64_t compressed_offset = 0;
    496 
    497             for (uint32_t i=0; i<total_num_tiles_written; i++)
    498             {
    499                 compressed_offset += sizeof(TileHeader);
    500                 heap_size         += sizeof(TileHeader);
    501                 for (uint32_t j=0; j<fCatalog[i].size(); j++)
    502                 {
    503                     heap_size += fCatalog[i][j].first;
    504                     fCatalog[i][j].second = compressed_offset;
    505                     compressed_offset += fCatalog[i][j].first;
    506                     if (fCatalog[i][j].first == 0)
    507                         fCatalog[i][j].second = 0;
    508                 }
    509             }
    510 
    511509            const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
    512510            SetFloat("ZRATIO", compression_ratio);
    513511
    514512            //add to the heap size the size of the gap between the catalog and the actual heap
    515             heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
     513            heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
    516514
    517515            SetInt("PCOUNT", heap_size, "size of special data area");
     
    630628
    631629            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    632 
    633             //give the catalog enough space
    634             fCatalog.resize(fNumTiles);
    635             for (uint32_t i=0;i<fNumTiles;i++)
    636             {
    637                 fCatalog[i].resize(fRealColumns.size());
    638                 for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
    639                     *it = CatalogEntry(0,0);
    640             }
    641630        }
    642631
     
    667656
    668657            fCheckOffset = (4 - extraBytes)%4;
     658
    669659            //write data to disk
    670660            write(src+4, sizeToWrite);
     
    679669        {
    680670            uint64_t compressed_size = 0;
     671
     672            //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
     673            //Really do not understand what's wrong...
     674            //calibrate data if required
     675            //const uint32_t thisRoundNumRows  = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
     676//            for (uint32_t i=0;i<thisRoundNumRows;i++)
     677//            {
     678//                char* target_location = target.src.get()->get() + fRealRowWidth*i;
     679//                cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
     680//                DrsOffsetCalibrate(target_location);
     681//            }
     682
    681683#ifdef __EXCEPTIONS
    682684            try
     
    687689
    688690                //compress the buffer
    689                 compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows);
     691                compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows, target.catalog_entry);
    690692#ifdef __EXCEPTIONS
    691693            }
     
    762764        /// @param num_rows the number of uncompressed rows in the transposed buffer
    763765        /// @param the number of bytes of the compressed data
    764         uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
     766        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    765767        {
    766768            const uint32_t thisRoundNumRows  = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
    767             const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
    768769            uint32_t       offset            = 0;
    769770
     
    777778            for (uint32_t i=0;i<fRealColumns.size();i++)
    778779            {
    779                 fCatalog[currentCatalogRow][i].second = compressedOffset;
     780                catalog_row[i].second = compressedOffset;
    780781
    781782                if (fRealColumns[i].col.num == 0) continue;
     
    811812                if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
    812813                {//if so set flag and redo it uncompressed
    813                    // cout << "Redoing uncompressed ! " << endl;
    814814                    //de-smooth !
    815815                    if (head.getProc(0) == kFactSmoothing)
     
    826826                    offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    827827
    828                     fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
     828                    catalog_row[i].first = compressedOffset - catalog_row[i].second;
    829829                    continue;
    830830                }
     
    834834
    835835                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    836                 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
     836                catalog_row[i].first = compressedOffset - catalog_row[i].second;
    837837            }
    838838
     
    968968
    969969        // catalog related stuff
    970         struct CatalogEntry
    971         {
    972             CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
    973             int64_t first;   ///< Size of this column in the tile
    974             int64_t second; ///< offset of this column in the tile, from the start of the heap area
    975         } __attribute__((__packed__));
    976 
    977         typedef vector<CatalogEntry> CatalogRow;
    978         typedef vector<CatalogRow>     CatalogType;
    979970        CatalogType fCatalog;               ///< Catalog for this file
    980 //        uint32_t    fCatalogSize;          ///<  Actual catalog size (.size() is slow on large lists)
     971        uint32_t    fCatalogSize;          ///<  Actual catalog size (.size() is slow on large lists)
    981972        uint32_t    fNumTiles;            ///<   Number of pre-reserved tiles
    982973        uint32_t    fNumRowsPerTile;     ///<    Number of rows per tile
    983974        off_t       fCatalogOffset;     ///<     Offset of the catalog from the beginning of the file
    984         uint32_t    fCatalogExtraRows; ///<      Number of extra rows written on top of the initial capacity of the file
    985975
    986976        // checksum related stuff
Note: See TracChangeset for help on using the changeset viewer.