Ignore:
Timestamp:
10/18/13 22:29:17 (11 years ago)
Author:
tbretz
Message:
Instead of a lot of conditionals in case of no queue, the queues are set to prompt execution so that the calling code is always the same; reset fErrno before a new data block is written, i.e. when a table header is written.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zofits.h

    r17264 r17269  
    4646        struct WriteTarget
    4747        {
    48             bool operator < (const WriteTarget& other)
     48            bool operator < (const WriteTarget& other) const
    4949            {
    5050                return tile_num < other.tile_num;
     
    141141            ofits::WriteTableHeader(name);
    142142
     143            fCompressionQueues.front().setPromptExecution(fNumQueues==0);
     144            fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
     145
    143146            if (fNumQueues != 0)
    144147            {
     
    153156            //mark that no tile has been written so far
    154157            fLatestWrittenTile = -1;
     158
     159            //no wiring error (in the writing of the data) has occured so far
     160            fErrno = 0;
    155161
    156162            return good();
     
    316322            }
    317323
    318             CompressionTarget compress_target(AddOneCatalogRow());
    319             SetNextCompression(compress_target);
    320 
    321             //no worker threads. do everything in-line
    322             if (fNumQueues == 0)
    323             {
    324                 const uint64_t size_to_write = CompressBuffer(compress_target);
    325 
    326                 const WriteTarget write_target(compress_target.target, size_to_write);
    327                 if (!WriteBufferToDisk(write_target))
    328                     throw std::runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");
    329 
    330                 // The correct 'errno' is set, because it is the main thread.
    331                 return good();
    332             }
    333 
    334             //if all queues are empty, use queue 0
    335             uint32_t min_index     = 0;
    336             uint32_t min_size      = std::numeric_limits<uint32_t>::max();
    337             uint32_t current_index = 0;
    338 
    339             for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
    340             {
    341                 if (it->size() < min_size)
    342                 {
    343                     min_index = current_index;
    344                     min_size = it->size();
    345                 }
    346                 current_index++;
    347             }
    348 
    349             if (!fCompressionQueues[min_index].emplace(compress_target))
     324            // use the least occupied queue
     325            const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
     326            if (!imin->emplace(InitNextCompression()))
    350327                throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
    351328
     
    364341        /// Setup the environment to compress yet another tile of data
    365342        /// @param target the struct where to host the produced parameters
    366         void SetNextCompression(CompressionTarget& target)
    367         {
     343        CompressionTarget InitNextCompression()
     344        {
     345            CompressionTarget target(AddOneCatalogRow());
     346
    368347            //fill up compression target
    369348            target.src            = fSmartBuffer;
     
    379358            //get a new buffer to host the incoming data
    380359            fSmartBuffer = fMemPool.malloc();
     360
     361            return target;
    381362        }
    382363
     
    389370
    390371            //did we write more rows than what the catalog could host ?
    391             if (fCatalogSize > fNumTiles)
    392             {
    393                 const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above
    394 
    395                 //shrink the catalog !
    396                 uint32_t entry_id = 1;
    397                 auto it = fCatalog.begin();
    398                 it++;
    399                 for (; it != fCatalog.end(); it++)
    400                 {
    401                     if (entry_id >= fNumTiles) break;
    402 
    403                     uint32_t target_id = entry_id*shrink_factor;
    404 
    405                     auto jt = it;
    406                     for (uint32_t i=0;i<target_id-entry_id;i++)
    407                         jt++;
    408 
    409                     *it = *jt;
    410 
    411                     entry_id++;
    412                 }
    413 
    414                 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
    415                 //remove the too many entries
    416                 for (uint32_t i=0;i<num_tiles_to_remove;i++)
    417                 {
    418                     fCatalog.pop_back();
    419                     fCatalogSize--;
    420                 }
    421                 //update header keywords
    422                 fNumRowsPerTile *= shrink_factor;
    423 
    424                 SetInt("ZTILELEN", fNumRowsPerTile);
    425                 SetInt("ZSHRINK",  shrink_factor);
    426             }
     372            if (fCatalogSize <= fNumTiles) // nothing to do
     373                return;
     374
     375            //always exact as extra rows were added just above
     376            const uint32_t shrink_factor = fCatalogSize / fNumTiles;
     377
     378            //shrink the catalog !
     379            uint32_t entry_id = 1;
     380            auto it = fCatalog.begin();
     381            it++;
     382            for (; it != fCatalog.end(); it++)
     383            {
     384                if (entry_id >= fNumTiles)
     385                    break;
     386
     387                const uint32_t target_id = entry_id*shrink_factor;
     388
     389                auto jt = it;
     390                for (uint32_t i=0; i<target_id-entry_id; i++)
     391                    jt++;
     392
     393                *it = *jt;
     394
     395                entry_id++;
     396            }
     397
     398            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
     399
     400            //remove the too many entries
     401            for (uint32_t i=0;i<num_tiles_to_remove;i++)
     402            {
     403                fCatalog.pop_back();
     404                fCatalogSize--;
     405            }
     406
     407            //update header keywords
     408            fNumRowsPerTile *= shrink_factor;
     409
     410            SetInt("ZTILELEN", fNumRowsPerTile);
     411            SetInt("ZSHRINK",  shrink_factor);
    427412        }
    428413
     
    447432#endif
    448433
    449             //write the last tile of data (if any
    450             if (fTable.num_rows%fNumRowsPerTile != 0)
    451             {
    452                 CompressionTarget compress_target(AddOneCatalogRow());
    453                 SetNextCompression(compress_target);
    454 
    455                 //set number of threads to zero before calling compressBuffer
    456                 const int32_t backup_num_queues = fNumQueues;
    457                 fNumQueues = 0;
    458 
    459                 const uint64_t size_to_write = CompressBuffer(compress_target);
    460                 fNumQueues = backup_num_queues;
    461 
    462                 const WriteTarget write_target(compress_target.target, size_to_write);
    463                 if (!WriteBufferToDisk(write_target))
    464                     throw std::runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");
     434            //write the last tile of data (if any)
     435            if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
     436            {
     437                fWriteToDiskQueue.enablePromptExecution();
     438                fCompressionQueues.front().enablePromptExecution();
     439                fCompressionQueues.front().emplace(InitNextCompression());
    465440            }
    466441
     
    563538
    564539        /// Get and set the actual number of threads for this object
    565         int32_t GetNumThreads() const { return fNumQueues;}
     540        int32_t GetNumThreads() const { return fNumQueues; }
    566541        bool SetNumThreads(uint32_t num)
    567542        {
     
    590565                num = num_available_cores>2 ? num_available_cores-2 : 1;
    591566
    592             if (fCompressionQueues.size() != uint32_t(num))
    593             {
    594                 fCompressionQueues.resize(num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
    595                 fNumQueues = num;
    596             }
     567            fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
     568            fNumQueues = num;
    597569
    598570            return true;
     
    651623        /// @param target the struct hosting the parameters of the compression
    652624        /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    653         uint32_t CompressBuffer(const CompressionTarget& target)
    654         {
    655             uint64_t compressed_size = 0;
    656 
     625        bool CompressBuffer(const CompressionTarget& target)
     626        {
    657627            //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
    658628            //Really do not understand what's wrong...
     
    674644
    675645                //compress the buffer
    676                 compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
     646                const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
     647
     648                //post the result to the writing queue
     649                //get a copy so that it becomes non-const
     650                fWriteToDiskQueue.emplace(target.target, compressed_size);
     651
    677652#ifdef __EXCEPTIONS
    678653            }
     
    685660#endif
    686661
    687             if (fNumQueues == 0)
    688                 return compressed_size;
    689 
    690             //post the result to the writing queue
    691             //get a copy so that it becomes non-const
    692             fWriteToDiskQueue.emplace(target.target, compressed_size);
    693 
    694             // if used by the queue, always return true as the elements are not ordered
    695             return 1;
     662            return true;
    696663        }
    697664
Note: See TracChangeset for help on using the changeset viewer.