Changeset 17258 for trunk/Mars


Ignore:
Timestamp:
10/18/13 16:10:03 (11 years ago)
Author:
tbretz
Message:
Replaced calls to post by emplace. Therefore implemented new construtor of WriteTarget to simplify call. Removed some arts from the comments. Instead of throwing an exception if writing fails, errno is propagated to the WriteRow. Instead of throwing an exception for 0 tiles, use 1 instead, this is not critical. Check for possible exceptions from the queues in every WriteRow call (we don't want to process a whole file to just realize at the close() call that something is wrong. Added a few more const qualifiers; replaced ostringstream by to_string where appropriate for perfromance reasons.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zofits.h

    r17254 r17258  
    5858            }
    5959
     60            WriteTarget() { }
     61            WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
     62
    6063            uint32_t         tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
    6164            uint32_t         size;     ///< Size to write
    62             shared_ptr<char> data;     ///<  Memory block to write
     65            shared_ptr<char> data;     ///< Memory block to write
    6366        };
    6467
     
    119122        void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    120123        {
    121             if (nt == 0)
    122                 throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");
    123 
    124124            fCheckOffset = 0;
    125125            fNumQueues   = 0;
    126126
    127             fNumTiles       = nt;
     127            fNumTiles       = nt==0 ? 1 : nt;
    128128            fNumRowsPerTile = rpt;
    129129
     
    136136            fThreadsException = exception_ptr();
    137137#endif
     138            fErrno = 0;
    138139        }
    139140
     
    206207        {
    207208            const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
    208             const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;//fCatalog.size()*one_catalog_row_size;
     209            const uint32_t total_catalog_size   = fNumTiles*one_catalog_row_size;
    209210
    210211            // swap the catalog bytes before writing
    211212            vector<char> swapped_catalog(total_catalog_size);
     213
    212214            uint32_t shift = 0;
    213             for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
     215            for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
    214216            {
    215217                revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
     
    259261            return fCatalog.back();
    260262        }
     263
    261264        /// write one row of data
     265        /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather
     266        /// than the badbit() of the stream (it might have been set by a thread before the errno has been set)
     267        /// errno will then contain the correct error number of the last error which happened during writing.
    262268        /// @param ptr the source buffer
    263269        /// @param the number of bytes to write
     
    275281            }
    276282
     283#ifdef __EXCEPTIONS
     284            //check if something hapenned while the compression threads were working
     285            //if so, re-throw the exception that was generated
     286            if (fThreadsException != exception_ptr())
     287                rethrow_exception(fThreadsException);
     288#endif
    277289            //copy current row to pool or rows waiting for compression
    278290            char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
     
    305317            DrsOffsetCalibrate(target_location);
    306318
    307             if (fTable.num_rows % fNumRowsPerTile == 0)
    308             {
    309                 CompressionTarget compress_target(AddOneCatalogRow());
    310                 SetNextCompression(compress_target);
    311 
    312                 if (fNumQueues == 0)
    313                 { //no worker threads. do everything in-line
    314                     const uint64_t size_to_write = CompressBuffer(compress_target);
    315 
    316                     WriteTarget write_target;
    317                     write_target.size     = size_to_write;
    318                     write_target.data   = compress_target.target.data;
    319                     write_target.tile_num = compress_target.target.tile_num;
    320 
    321                     return WriteBufferToDisk(write_target);
     319            if (fTable.num_rows % fNumRowsPerTile != 0)
     320            {
     321                errno = fErrno;
     322                return errno==0;
     323            }
     324
     325            CompressionTarget compress_target(AddOneCatalogRow());
     326            SetNextCompression(compress_target);
     327
     328            //no worker threads. do everything in-line
     329            if (fNumQueues == 0)
     330            {
     331                const uint64_t size_to_write = CompressBuffer(compress_target);
     332
     333                const WriteTarget write_target(compress_target.target, size_to_write);
     334                if (!WriteBufferToDisk(write_target))
     335                    throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");
     336
     337                // The correct 'errno' is set, because it is the main thread.
     338                return good();
     339            }
     340
     341            //if all queues are empty, use queue 0
     342            uint32_t min_index     = 0;
     343            uint32_t min_size      = numeric_limits<uint32_t>::max();
     344            uint32_t current_index = 0;
     345
     346            for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
     347            {
     348                if (it->size() < min_size)
     349                {
     350                    min_index = current_index;
     351                    min_size = it->size();
    322352                }
    323                 else
    324                 {
    325                     //if all queues are empty, use queue 0
    326                      uint32_t min_index     = 0;
    327                      uint32_t min_size      = numeric_limits<uint32_t>::max();
    328                      uint32_t current_index = 0;
    329 
    330                      for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
    331                      {
    332                          if (it->size() < min_size)
    333                          {
    334                              min_index = current_index;
    335                              min_size = it->size();
    336                          }
    337                          current_index++;
    338                      }
    339 
    340                     if (!fCompressionQueues[min_index].post(compress_target))
    341                         throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
    342                 }
    343             }
    344 
    345             return good();
     353                current_index++;
     354            }
     355
     356            if (!fCompressionQueues[min_index].emplace(compress_target))
     357                throw runtime_error("The compression queues are not started. Did you close the file before writing this row?");
     358
     359            errno = fErrno;
     360            return errno==0;
    346361        }
    347362
     
    358373        void SetNextCompression(CompressionTarget& target)
    359374        {
    360             //get space for transposed data
    361             shared_ptr<char> transposed_data = fMemPool.malloc();
     375            //fill up compression target
     376            target.src            = fSmartBuffer;
     377            target.transposed_src = fMemPool.malloc();
     378            target.num_rows       = fTable.num_rows;
    362379
    363380            //fill up write to disk target
    364             WriteTarget write_target;
     381            WriteTarget &write_target = target.target;
    365382            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
    366383            write_target.size     = 0;
    367384            write_target.data     = fMemPool.malloc();
    368 
    369             //fill up compression target
    370             target.src            = fSmartBuffer;
    371             target.transposed_src = transposed_data;
    372             target.target         = write_target;
    373             target.num_rows       = fTable.num_rows;
    374385
    375386            //get a new buffer to host the incoming data
     
    434445
    435446            if (tellp() < 0)
    436             {
    437 #ifdef __EXCEPTIONS
    438                 throw runtime_error("Looks like the file has been closed already");
    439 #else
    440447                return false;
    441 #endif
    442             }
    443448
    444449#ifdef __EXCEPTIONS
    445450            //check if something hapenned while the compression threads were working
     451            //if so, re-throw the exception that was generated
    446452            if (fThreadsException != exception_ptr())
    447             {
    448                 //if so, re-throw the exception that was generated
    449453                rethrow_exception(fThreadsException);
    450             }
    451454#endif
    452455
     
    458461
    459462                //set number of threads to zero before calling compressBuffer
    460                 int32_t backup_num_queues = fNumQueues;
     463                const int32_t backup_num_queues = fNumQueues;
    461464                fNumQueues = 0;
    462                 uint64_t size_to_write = CompressBuffer(compress_target);
     465
     466                const uint64_t size_to_write = CompressBuffer(compress_target);
    463467                fNumQueues = backup_num_queues;
    464468
    465                 WriteTarget write_target;
    466                 write_target.size     = size_to_write;
    467                 write_target.data   = compress_target.target.data;
    468                 write_target.tile_num = compress_target.target.tile_num;
    469 
     469                const WriteTarget write_target(compress_target.target, size_to_write);
    470470                if (!WriteBufferToDisk(write_target))
    471                     throw runtime_error("Something went wrong while writing the last tile...");
     471                    throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
    472472            }
    473473
     
    504504            SetInt("NAXIS1", total_catalog_width);
    505505            SetInt("NAXIS2", total_num_tiles_written);
    506 
    507             ostringstream str;
    508             str << fRawSum.val();
    509             SetStr("RAWSUM", str.str());
     506            SetStr("RAWSUM", to_string(fRawSum.val()));
    510507
    511508            const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
     
    551548                return false;
    552549
     550            const size_t size = SizeFromType(typechar);
     551
    553552            Table::Column col;
    554             size_t size = SizeFromType(typechar);
    555 
    556553            col.name   = name;
    557554            col.type   = typechar;
     
    562559            fRealRowWidth += size*cnt;
    563560
    564             fRealColumns.emplace_back(CompressedColumn(col, comp));
    565 
    566             ostringstream strKey, strVal, strCom;
    567             strKey << "ZFORM" << fRealColumns.size();
    568             strVal << cnt << typechar;
    569             strCom << "format of " << name << " [" << CommentFromType(typechar);
    570             SetStr(strKey.str(), strVal.str(), strCom.str());
    571 
    572             strKey.str("");
    573             strVal.str("");
    574             strCom.str("");
    575             strKey << "ZCTYP" << fRealColumns.size();
    576             strVal << "FACT";
    577             strCom << "Compression type FACT";
    578             SetStr(strKey.str(), strVal.str(), strCom.str());
     561            fRealColumns.emplace_back(col, comp);
     562
     563            ostringstream str;
     564            str << "format of " << name << " [" << CommentFromType(typechar);
     565
     566            SetStr("ZFORM"+to_string(fRealColumns.size()), to_string(cnt)+typechar, str.str());
     567            SetStr("ZCTYP"+to_string(fRealColumns.size()), "FACT", "Compression type: FACT");
    579568
    580569            return true;
     
    639628            int32_t extraBytes = 0;
    640629            uint32_t sizeToChecksum = sizeToWrite;
     630
     631            //should we extend the array to the left ?
    641632            if (fCheckOffset != 0)
    642             {//should we extend the array to the left ?
    643                 sizeToChecksum += fCheckOffset;
     633            {
     634                sizeToChecksum  += fCheckOffset;
    644635                checkSumPointer -= fCheckOffset;
    645636                memset(checkSumPointer, 0, fCheckOffset);
    646637            }
     638
     639            //should we extend the array to the right ?
    647640            if (sizeToChecksum%4 != 0)
    648             {//should we extend the array to the right ?
     641            {
    649642                extraBytes = 4 - (sizeToChecksum%4);
    650                 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
     643                memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
    651644                sizeToChecksum += extraBytes;
    652645            }
     
    705698            //post the result to the writing queue
    706699            //get a copy so that it becomes non-const
    707             WriteTarget wt;
    708             wt.tile_num = target.target.tile_num;
    709             wt.size     = compressed_size;
    710             wt.data   = target.target.data;
    711 
    712             fWriteToDiskQueue.post(wt);
     700            fWriteToDiskQueue.emplace(target.target, compressed_size);
    713701
    714702            // if used by the queue, always return true as the elements are not ordered
     
    730718            {
    731719#endif
     720                //could not write the data to disk
    732721                if (!writeCompressedDataToDisk(target.data.get(), target.size))
    733                 {//could not write the data to disk
    734                     ostringstream str;
    735                     str << "An error occured while writing to disk: ";
    736                     if (eof())
    737                         str << "End-Of-File";
    738                     if (failbit)
    739                         str << "Logical error on i/o operation";
    740                     if (badbit)
    741                         str << "Writing error on i/o operation";
    742 #ifdef __EXCEPTIONS
    743                     throw runtime_error(str.str());
    744 #else
    745                     gLog << ___err___ << "ERROR - " << str.str() << endl;
    746 #endif
    747                 }
    748 #ifdef __EXCEPTIONS
    749             }
    750             catch(...)
     722                    fErrno = errno;
     723#ifdef __EXCEPTIONS
     724            }
     725            catch (...)
    751726            {
    752727                fThreadsException = current_exception();
     
    780755                catalog_row[i].second = compressedOffset;
    781756
    782                 if (fRealColumns[i].col.num == 0) continue;
     757                if (fRealColumns[i].col.num == 0)
     758                    continue;
    783759
    784760                Compression& head = fRealColumns[i].block_head;
     
    794770                    switch (head.getProc(j))
    795771                    {
    796                         case kFactRaw:
    797                                 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src  + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
     772                    case kFactRaw:
     773                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src  + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
    798774                        break;
    799                         case kFactSmoothing:
    800                                 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
     775
     776                    case kFactSmoothing:
     777                        applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
    801778                        break;
    802                         case kFactHuffman16:
    803                             if (head.getOrdering() == kOrderByCol)
    804                                 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
    805                             else
    806                                 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
     779
     780                    case kFactHuffman16:
     781                        if (head.getOrdering() == kOrderByCol)
     782                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
     783                        else
     784                            compressedOffset += compressHUFFMAN16(dest + compressedOffset, src  + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
    807785                        break;
    808786                    }
     
    837815            }
    838816
    839             TileHeader tile_head(thisRoundNumRows, compressedOffset);
     817            const TileHeader tile_head(thisRoundNumRows, compressedOffset);
    840818            memcpy(dest, &tile_head, sizeof(TileHeader));
    841819
     
    855833                switch (fRealColumns[i].block_head.getOrdering())
    856834                {
    857                     case kOrderByRow:
     835                case kOrderByRow:
     836                    //regular, "semi-transposed" copy
     837                    for (uint32_t k=0;k<thisRoundNumRows;k++)
     838                    {
     839                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
     840                        dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
     841                    }
     842                    break;
     843
     844                case kOrderByCol:
     845                    //transposed copy
     846                    for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
    858847                        for (uint32_t k=0;k<thisRoundNumRows;k++)
    859                         {//regular, "semi-transposed" copy
    860                             memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
    861                             dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
     848                        {
     849                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
     850                            dest += fRealColumns[i].col.size;
    862851                        }
    863                     break;
    864 
    865                     case kOrderByCol :
    866                         for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
    867                             for (uint32_t k=0;k<thisRoundNumRows;k++)
    868                             {//transposed copy
    869                                 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
    870                                 dest += fRealColumns[i].col.size;
    871                             }
    872852                    break;
    873853                };
     
    897877            string huffmanOutput;
    898878            uint32_t previousHuffmanSize = 0;
     879
     880            //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
    899881            if (numRows < 2)
    900             {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
    901882                return numRows*sizeOfElems*numRowElems + 1000;
    902             }
     883
    903884            if (sizeOfElems < 2 )
    904885            {
     
    910891#endif
    911892            }
     893
    912894            uint32_t huffmanOffset = 0;
    913895            for (uint32_t j=0;j<numRowElems;j++)
     
    920902                previousHuffmanSize = huffmanOutput.size();
    921903            }
     904
    922905            const size_t totalSize = huffmanOutput.size() + huffmanOffset;
    923906
     
    949932        {
    950933            int16_t* short_data = reinterpret_cast<int16_t*>(data);
    951             //un-do the integer smoothing
    952934            for (uint32_t j=2;j<numElems;j++)
    953935                short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
     
    965947
    966948        vector<Queue<CompressionTarget>>          fCompressionQueues;  ///< Processing queues (=threads)
    967         Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< Writing queue (=thread)
     949        Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;   ///< Writing queue (=thread)
    968950
    969951        // catalog related stuff
    970952        CatalogType fCatalog;               ///< Catalog for this file
    971         uint32_t    fCatalogSize;          ///< Actual catalog size (.size() is slow on large lists)
    972         uint32_t    fNumTiles;            ///<  Number of pre-reserved tiles
    973         uint32_t    fNumRowsPerTile;     ///<    Number of rows per tile
    974         off_t       fCatalogOffset;     ///<    Offset of the catalog from the beginning of the file
     953        uint32_t    fCatalogSize;           ///< Actual catalog size (.size() is slow on large lists)
     954        uint32_t    fNumTiles;              ///< Number of pre-reserved tiles
     955        uint32_t    fNumRowsPerTile;        ///< Number of rows per tile
     956        off_t       fCatalogOffset;         ///< Offset of the catalog from the beginning of the file
    975957
    976958        // checksum related stuff
    977959        Checksum fCatalogSum;    ///< Checksum of the catalog
    978         Checksum fRawSum;       ///< Raw sum (specific to FACT)
    979         int32_t  fCheckOffset; ///<  offset to the data pointer to calculate the checksum
     960        Checksum fRawSum;        ///< Raw sum (specific to FACT)
     961        int32_t  fCheckOffset;   ///< offset to the data pointer to calculate the checksum
    980962
    981963        // data layout related stuff
     
    986968                block_head(h)
    987969            {}
    988             Table::Column     col;         ///< the regular column entry
    989             Compression       block_head; ///< the compression data associated with that column
     970            Table::Column col;         ///< the regular column entry
     971            Compression   block_head; ///< the compression data associated with that column
    990972        };
    991973        vector<CompressedColumn> fRealColumns;     ///< Vector hosting the columns of the file
     
    997979        exception_ptr     fThreadsException; ///< exception pointer to store exceptions coming from the threads
    998980#endif
     981        int               fErrno;            ///< propagate errno to main thread
     982
    999983
    1000984};
Note: See TracChangeset for help on using the changeset viewer.