Changeset 17237 for trunk/Mars


Ignore:
Timestamp:
10/17/13 15:21:09 (11 years ago)
Author:
lyard
Message:
Cleaner version of zofits
Location:
trunk/Mars/mcore
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/factofits.h

    r17226 r17237  
    1111#include "zofits.h"
    1212
     13#include "../src/DataWriteFits2.h"
     14
     15#include "DrsCalib.h"
     16
    1317#ifndef __MARS__
    1418namespace std
     
    2327    public:
    2428
     29        /// constructors
    2530        factofits(uint32_t numTiles=1000,
    2631                  uint32_t rowPerTile=100,
     
    3035            fDataOffset       = -1;
    3136        }
     37
    3238        factofits(const char* fname,
    3339                  uint32_t numTiles=1000,
     
    3844            fDataOffset       = -1;
    3945        }
     46
    4047        virtual ~factofits()
    4148        {
    4249        }
    4350
    44         //whether or not a calibration was given to the file writer
     51        /// whether or not a calibration was given to the file writer
    4552        virtual bool IsOffsetCalibration()
    4653        {
     
    4855        }
    4956
    50         //assign a given drs offset calibration
     57        ///assign a given drs offset calibration
    5158        void SetDrsCalibration(const vector<float> &calib)
    5259        {
     60            VerifyCalibrationSize(calib.size());
     61
    5362            if (!IsOffsetCalibration())
    5463                fOffsetCalibration.resize(1440*1024);
     
    5867        }
    5968
    60         void SetDrsCalibration(const vector<float>& calib)
    61         {
    62             if (calib.size() != 1440*1024)
    63     #ifdef __EXCEPTIONS
    64             throw runtime_error("Cannot load calibration with anything else than 1024 samples per pixel");
    65     #else
    66             gLog << ___err___ << "ERROR - Cannot load calibration with anything else than 1024 samples per pixel");
    67     #endif
    68             SetDrsCalibration(calib.data());
    69         }
    70 
     69        ///assign a given drs offset calibration
    7170        void SetDrsCalibration(const vector<int16_t>& vec)
    7271        {
     72            VerifyCalibrationSize(vec.size());
     73
    7374            if (!IsOffsetCalibration())
    7475                fOffsetCalibration.resize(1440*1024);
     
    7879        }
    7980
     81        ///assign a given drs offset calibration
    8082        void SetDrsCalibration(const DrsCalibration& drs)
    8183        {
     
    8385                return;
    8486
     87            VerifyCalibrationSize(drs.fOffset.size());
     88
    8589            if (!IsOffsetCalibration())
    8690                fOffsetCalibration.resize(1440*1024);
     
    9094        }
    9195
     96        ///Overload of the super function
    9297        bool WriteTableHeader(const char* name="DATA")
    9398        {
     
    102107                    if (it->col.name == "StartCellData")
    103108                        fStartCellsOffset = it->col.offset;
     109
    104110                    if (it->col.name == "Data")
    105111                    {
    106                         fNumSlices = it->col.num;
     112                        fNumSlices  = it->col.num;
    107113                        fDataOffset = it->col.offset;
    108114                        if (fNumSlices % 1440 != 0)
     
    118124                    }
    119125                }
     126
    120127                if (fStartCellsOffset < 0)
    121128                {
     
    128135                    fOffsetCalibration.resize(0);
    129136                }
     137
    130138                if (fDataOffset < 0)
    131139                {
     
    143151        }
    144152
     153        ///Actually write the drs calibration table
    145154        virtual bool WriteDrsOffsetsTable()
    146155        {
     
    191200        }
    192201
     202        ///Apply the drs offset calibration (overload of super-method)
    193203        virtual void DrsOffsetCalibrate(char* target_location)
    194204        {
    195205            if (IsOffsetCalibration())
    196206            {
    197                 int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
    198                 int16_t* data      = reinterpret_cast<int16_t*>(target_location + fDataOffset);
     207                const int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
     208                int16_t*       data      = reinterpret_cast<int16_t*>(target_location + fDataOffset);
    199209
    200210                for (uint32_t ch=0; ch<1440; ch++)
     
    226236
    227237private:
     238        /// Checks if the size of the input calibration is ok
     239        void VerifyCalibrationSize(uint32_t size)
     240        {
     241            if (size != 1440*1024)
     242            {
     243                ostringstream str;
     244                str << "Cannot load calibration with anything else than 1440 pixels and 1024 samples per pixel. Got a total size of " << size;
     245#ifdef __EXCEPTIONS
     246            throw runtime_error(str.str());
     247#else
     248            gLog << ___err___ << "ERROR - " << str.str() << endl;
     249#endif
     250            }
     251         }
     252
    228253        //Offsets calibration stuff.
    229         vector<int16_t> fOffsetCalibration; ///< The calibration itself
    230         int32_t         fStartCellsOffset;  ///< Offset in bytes for the startcell data
    231         int32_t         fDataOffset;        ///< Offset in bytes for the data
    232         int32_t         fNumSlices;         ///< Number of samples per pixel per event
     254        vector<int16_t> fOffsetCalibration;    ///< The calibration itself
     255        int32_t         fStartCellsOffset;    ///< Offset in bytes for the startcell data
     256        int32_t         fDataOffset;         ///<  Offset in bytes for the data
     257        int32_t         fNumSlices;         ///<    Number of samples per pixel per event
    233258
    234259}; //class factofits
  • trunk/Mars/mcore/zofits.h

    r17228 r17237  
    2222#endif
    2323
    24 
    2524class zofits : public ofits
    2625{
     26        /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
    2727        template<class S>
    2828        struct QueueMin : std::list<S>
     
    3434        };
    3535
     36        /// Parameters required to write a tile to disk
    3637        struct WriteTarget
    3738        {
     
    4142            }
    4243
    43             uint32_t tile_num;
    44             uint32_t size;
    45             shared_ptr<MemoryChunk> target;
     44            uint32_t                tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
     45            uint32_t                size;    ///<  Size to write
     46            shared_ptr<MemoryChunk> data;   ///<   Memory block to write
    4647        };
    4748
     49        /// Parameters required to compress a tile of data
    4850        struct CompressionTarget
    4951        {
    50             /*
    51             bool operator < (const CompressionTarget& other)
    52             {
    53                 return target < other.target;
    54             }*/
    55 
    56             shared_ptr<MemoryChunk> src;
    57             shared_ptr<MemoryChunk> transposed_src;
    58             WriteTarget             target;
    59             uint32_t                num_rows;
     52            shared_ptr<MemoryChunk> src;             ///< Original data
     53            shared_ptr<MemoryChunk> transposed_src; ///<  Transposed data
     54            WriteTarget             target;        ///<   Compressed data
     55            uint32_t                num_rows;     ///<    Number of rows to compress
    6056        };
    6157
    6258public:
    63         //constructors
     59        /// constructors
     60        /// @param numTiles how many data groups should be pre-reserved ?
     61        /// @param rowPerTile how many rows will be grouped together in a single tile
     62        /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    6463        zofits(uint32_t numTiles=1000,
    6564               uint32_t rowPerTile=100,
     
    7271        }
    7372
     73        /// @param fname the target filename
     74        /// @param numTiles how many data groups should be pre-reserved ?
     75        /// @param rowPerTile how many rows will be grouped together in a single tile
     76        /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    7477        zofits(const char* fname,
    7578               uint32_t numTiles=1000,
     
    8386        }
    8487
     88        /// destructors
    8589        virtual ~zofits()
    8690        {
     
    8892
    8993        //initialization of member variables
    90         void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
     94        /// @param nt number of tiles
     95        /// @param rpt number of rows per tile
     96        /// @param maxUsableMem max amount of RAM to be used by the compression buffers
     97        void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    9198        {
    9299            if (nt == 0)
    93                 throw runtime_error("Cannot work with a catalog of size 0. sorry.");
    94 
    95             fCheckOffset  = 0;
     100                throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");
     101
     102            fCheckOffset = 0;
     103            fNumQueues   = 0;
    96104
    97105            fNumTiles       = nt;
    98106            fNumRowsPerTile = rpt;
    99107
    100             fBuffer       = NULL;
    101             fRealRowWidth = 0;
     108            fBuffer           = NULL;
     109            fRealRowWidth     = 0;
    102110            fCatalogExtraRows = 0;
    103 
    104             fCatalogOffset    =  0;
     111            fCatalogOffset    = 0;
    105112
    106113            fMaxUsableMem = maxUsableMem;
     
    110117        }
    111118
    112 
    113         //write the header of the binary table
     119        /// write the header of the binary table
     120        /// @param name the name of the table to be created
     121        /// @return the state of the file
    114122        virtual bool WriteTableHeader(const char* name="DATA")
    115123        {
    116             if (!reallocateBuffers())
    117                 throw ("While allocating memory: apparently there not as much free memory as advertized...");
     124            reallocateBuffers();
    118125
    119126            ofits::WriteTableHeader(name);
     
    125132                    it->start();
    126133
     134                //start the disk writer
    127135                fWriteToDiskQueue.start();
    128136            }
     
    134142        }
    135143
     144        /// open a new file.
     145        /// @param filename the name of the file
     146        /// @param Whether or not the name of the extension should be added or not
    136147        void open(const char* filename, bool addEXTNAMEKey=true)
    137148        {
     
    139150
    140151            //add compression-related header entries
    141             SetBool("ZTABLE", true, "Table is compressed");
    142             SetInt("ZNAXIS1", 0, "Width of uncompressed rows");
    143             SetInt("ZNAXIS2", 0, "Number of uncompressed rows");
    144             SetInt("ZPCOUNT", 0, "");
    145             SetInt("ZHEAPPTR", 0, "");
    146             SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
    147             SetInt("THEAP", 0, "");
    148             SetStr("RAWSUM", "         0", "Checksum of raw little endian data");
    149             SetFloat("ZRATIO", 0, "Compression ratio");
     152            SetBool( "ZTABLE",   true,            "Table is compressed");
     153            SetInt(  "ZNAXIS1",  0,              "Width of uncompressed rows");
     154            SetInt(  "ZNAXIS2",  0,              "Number of uncompressed rows");
     155            SetInt(  "ZPCOUNT",  0,              "");
     156            SetInt(  "ZHEAPPTR", 0,              "");
     157            SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
     158            SetInt(  "THEAP",    0,              "");
     159            SetStr(  "RAWSUM",   "         0",    "Checksum of raw little endian data");
     160            SetFloat("ZRATIO",   0,              "Compression ratio");
    150161
    151162            fCatalogExtraRows = 0;
     
    153164        }
    154165
     166        /// Super method. does nothing as zofits does not know about DrsOffsets
     167        /// @return the state of the file
    155168        virtual bool WriteDrsOffsetsTable()
    156169        {
     
    158171        }
    159172
     173        /// Returns the number of bytes per uncompressed row
     174        /// @return number of bytes per uncompressed row
    160175        uint32_t GetBytesPerRow() const
    161176        {
     
    163178        }
    164179
     180        /// Write the data catalog
     181        /// @return the state of the file
    165182        bool WriteCatalog()
    166183        {
    167184            const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
    168             const uint32_t total_catalog_size = fCatalog.size()*one_catalog_row_size;
    169 
     185            const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;
     186
     187            // swap the catalog bytes before writing
    170188            vector<char> swapped_catalog(total_catalog_size);
    171189            uint32_t shift = 0;
     
    176194            }
    177195
     196            // first time writing ? remember where we are
    178197            if (fCatalogOffset == 0)
    179             {
    180198                fCatalogOffset = tellp();
    181             }
    182 
     199
     200            // remember where we came from
    183201            const off_t where_are_we = tellp();
    184202
     203            // write to disk
    185204            seekp(fCatalogOffset);
    186205            write(swapped_catalog.data(), total_catalog_size);
     
    188207                seekp(where_are_we);
    189208
     209            // udpate checksum
    190210            fCatalogSum.reset();
    191211            fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
     
    193213            return good();
    194214        }
     215
     216        /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    195217        virtual void DrsOffsetCalibrate(char* )
    196218        {
     
    198220        }
    199221
     222        /// Grows the catalog in case not enough rows were allocated
    200223        void GrowCatalog()
    201224        {
     
    214237        }
    215238
     239        /// write one row of data
     240        /// @param ptr the source buffer
     241        /// @param the number of bytes to write
     242        /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    216243        bool WriteRow(const void* ptr, size_t cnt, bool = true)
    217244        {
     
    242269
    243270            //for now, make an extra copy of the data, for RAWSUM checksuming.
    244             //Ideally this should be moved to the threads, along with the drs-offset-calibration
     271            //Ideally this should be moved to the threads
    245272            //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
    246273            //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
     
    278305                    WriteTarget write_target;
    279306                    write_target.size     = size_to_write;
    280                     write_target.target   = compress_target.target.target;
     307                    write_target.data   = compress_target.target.data;
    281308                    write_target.tile_num = compress_target.target.tile_num;
    282309
    283                     if (!WriteBufferToDisk(write_target))
    284                         throw runtime_error("Something went wrong while writing to disk");
     310                    return WriteBufferToDisk(write_target);
    285311                }
    286312                else
     
    302328
    303329                    if (!fCompressionQueues[min_index].post(compress_target))
    304                         throw runtime_error("I could not post this buffer. This does not make sense...");
     330                        throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
    305331                }
    306332            }
     
    309335        }
    310336
     337        /// update the real number of rows
    311338        void FlushNumRows()
    312339        {
    313             SetInt("NAXIS2", fTable.num_rows/fNumRowsPerTile);
     340            SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
    314341            SetInt("ZNAXIS2", fTable.num_rows);
    315342            FlushHeader();
    316343        }
    317344
     345        /// Setup the environment to compress yet another tile of data
     346        /// @param target the struct where to host the produced parameters
    318347        void SetNextCompression(CompressionTarget& target)
    319348        {
     
    325354            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
    326355            write_target.size     = 0;
    327             write_target.target   = fMemPool.malloc();
     356            write_target.data     = fMemPool.malloc();
    328357
    329358            //fill up compression target
    330359            target.src            = fSmartBuffer;
    331             target.transposed_src      = transposed_data;
    332             target.target   = write_target;
    333             target.num_rows = fTable.num_rows;
     360            target.transposed_src = transposed_data;
     361            target.target         = write_target;
     362            target.num_rows       = fTable.num_rows;
    334363
    335364            //get a new buffer to host the incoming data
     
    338367        }
    339368
     369        /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    340370        void ShrinkCatalog()
    341371        {
     
    390420        }
    391421
     422        /// close an open file.
     423        /// @return the state of the file
    392424        bool close()
    393425        {
     426            // stop compression and write threads
    394427            for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
    395428                it->wait();
     
    400433            {
    401434#ifdef __EXCEPTIONS
    402                 throw runtime_error("Something went wrong while writing to disk...");
     435                throw runtime_error("Looks like the file has been closed already");
    403436#else
    404437                return false;
     
    407440
    408441#ifdef __EXCEPTIONS
    409             //check if something hapenned to the compression threads
     442            //check if something hapenned while the compression threads were working
    410443            if (fThreadsException != exception_ptr())
    411444            {
     445                //if so, re-throw the exception that was generated
    412446                rethrow_exception(fThreadsException);
    413447            }
    414448#endif
    415449
     450            //write the last tile of data (if any
    416451            if (fTable.num_rows%fNumRowsPerTile != 0)
    417452            {
     
    427462                WriteTarget write_target;
    428463                write_target.size     = size_to_write;
    429                 write_target.target   = compress_target.target.target;
     464                write_target.data   = compress_target.target.data;
    430465                write_target.tile_num = compress_target.target.tile_num;
    431466
     
    440475            SetInt("ZNAXIS2", fTable.num_rows);
    441476
    442             uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
    443             SetInt("ZHEAPPTR", heap_offset);
     477            SetInt("ZHEAPPTR", fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2);
    444478
    445479            const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
    446 
    447             SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
    448 
    449             SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
     480            const uint32_t total_catalog_width     = 2*sizeof(int64_t)*fTable.num_cols;
     481
     482            SetInt("THEAP",  total_num_tiles_written*total_catalog_width);
     483            SetInt("NAXIS1", total_catalog_width);
    450484            SetInt("NAXIS2", total_num_tiles_written);
    451485
     
    471505            }
    472506
    473             float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
     507            const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
    474508            SetFloat("ZRATIO", compression_ratio);
    475509
     
    478512
    479513            SetInt("PCOUNT", heap_size, "size of special data area");
    480 
    481514
    482515            //Just for updating the fCatalogSum value
     
    502535        }
    503536
    504         //Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
     537        /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    505538        bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    506539        {
     
    508541        }
    509542
     543        /// Overload of the simplified compressed version
    510544        bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    511545        {
     
    543577        }
    544578
     579        /// static setter for the default number of threads to use. -1 means all available physical cores
    545580        static void SetDefaultNumThreads(int32_t num) { fgNumQueues = num;}
    546581        static int32_t GetDefaultNumThreads() { return fgNumQueues;}
    547582
    548         int32_t GetNumThreads() { return fNumQueues;}
     583        /// Get and set the actual number of threads for this object
     584        int32_t GetNumThreads() const { return fNumQueues;}
    549585        bool SetNumThreads(int32_t num)
    550586        {
     
    554590                throw runtime_error("File must be closed before changing the number of compression threads");
    555591#else
    556                 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
     592                gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
    557593#endif
    558594                return false;
    559595            }
     596
     597            //get number of physically available threads
    560598#ifdef USE_BOOST_THREADS
    561599            int32_t num_available_cores = boost::thread::hardware_concurrency();
     
    564602#endif
    565603
     604            // could not detect number of available cores from system properties...
     605            // assume that 5 cores are available (4 compression, 1 write)
    566606            if (num_available_cores == 0)
    567             {//could not detect number of available cores from system properties...
    568                 //Assuming that 5 cores are availables (4 compression, 1 write)
    569607                num_available_cores = 5;
    570             }
     608
     609            // Throw an exception if too many cores are requested
    571610            if (num > num_available_cores)
    572611            {
    573612                ostringstream str;
    574                 str << "Number of threads cannot be greater than physically available (" << num_available_cores << ")";
     613                str << "You will be using more threads(" << num << ") than available cores(" << num_available_cores << "). Expect sub-optimal performances";
    575614#ifdef __EXCEPTIONS
    576615                throw runtime_error(str.str());
    577616#else
    578                 gLog << ___err___ << "ERROR - " << str.str();
    579 #endif
    580                 return false;
     617                gLog << ___err___ << "WARNING - " << str.str() << endl;
     618#endif
    581619            }
    582620
    583621            if (num == -1)
    584                 num = num_available_cores-2; // 1 for writing, one for the main thread
     622                num = num_available_cores-2; // 1 for writing, 1 for the main thread
    585623
    586624            if (fCompressionQueues.size() == (uint32_t)num)
     
    590628            Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false);
    591629
    592             //shrink
     630            //shrink if required
    593631            if ((uint32_t)num < fCompressionQueues.size())
    594632            {
     
    597635            }
    598636
    599             //grow
     637            //grow if required
    600638            fCompressionQueues.resize(num, queue);
    601639
     
    607645protected:
    608646
    609         bool reallocateBuffers()
    610         {
    611             size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
     647        /// Allocates the required objects.
     648        void reallocateBuffers()
     649        {
     650            const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
    612651            fMemPool.setChunkSize(chunk_size);
    613652
     
    625664                    *it = CatalogEntry(0,0);
    626665            }
    627             return true;
    628         }
    629 
    630         bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
     666        }
     667
     668        /// Actually does the writing to disk (and checksuming)
     669        /// @param src the buffer to write
     670        /// @param sizeToWrite how many bytes should be written
     671        /// @return the state of the file
     672        bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    631673        {
    632674            char* checkSumPointer = src+4;
     
    656698        }
    657699
     700        /// Compress a given buffer based on the target. This is the method executed by the threads
     701        /// @param target the struct hosting the parameters of the compression
     702        /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    658703        uint32_t CompressBuffer(const CompressionTarget& target)
    659704        {
     
    667712
    668713                //compress the buffer
    669                 compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
     714                compressed_size = compressBuffer(target.target.data.get()->get(), target.transposed_src.get()->get(), target.num_rows);
    670715#ifdef __EXCEPTIONS
    671716            }
     
    686731            wt.tile_num = target.target.tile_num;
    687732            wt.size     = compressed_size;
    688             wt.target   = target.target.target;
     733            wt.data   = target.target.data;
    689734
    690735            fWriteToDiskQueue.post(wt);
    691736
    692             return compressed_size;
    693         }
    694 
     737            // if used by the queue, always return true as the elements are not ordered
     738            return 1;
     739        }
     740
     741        /// Write one compressed tile to disk. This is the method executed by the writing thread
     742        /// @param target the struct hosting the write parameters
    695743        bool WriteBufferToDisk(const WriteTarget& target)
    696744        {
     
    701749            fLatestWrittenTile++;
    702750
    703             //write the buffer to disk.
    704             return writeCompressedDataToDisk(target.target.get()->get(), target.size);
    705         }
    706 
     751#ifdef __EXCEPTIONS
     752            try
     753            {
     754#endif
     755                if (!writeCompressedDataToDisk(target.data.get()->get(), target.size))
     756                {//could not write the data to disk
     757                    ostringstream str;
     758                    str << "An error occured while writing to disk: ";
     759                    if (eof())
     760                        str << "End-Of-File";
     761                    if (failbit)
     762                        str << "Logical error on i/o operation";
     763                    if (badbit)
     764                        str << "Writing error on i/o operation";
     765#ifdef __EXCEPTIONS
     766                    throw runtime_error(str.str());
     767#else
     768                    gLog << ___err___ << "ERROR - " << str.str() << endl;
     769#endif
     770                }
     771#ifdef __EXCEPTIONS
     772            }
     773            catch(...)
     774            {
     775                fThreadsException = current_exception();
     776                if (fNumQueues == 0)
     777                    rethrow_exception(fThreadsException);
     778            }
     779#endif
     780            return true;
     781        }
     782
     783        /// Compress a given buffer based on its source and destination
    707784        //src cannot be const, as applySMOOTHING is done in place
     785        /// @param dest the buffer hosting the compressed data
     786        /// @param src the buffer hosting the transposed data
     787        /// @param num_rows the number of uncompressed rows in the transposed buffer
     788        /// @param the number of bytes of the compressed data
    708789        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
    709790        {
    710             uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
    711             uint32_t offset=0;
    712             uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
     791            const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
     792            const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
     793            uint32_t       offset            = 0;
    713794
    714795            //skip the checksum reserved area
     
    728809
    729810                //set the default byte telling if uncompressed the compressed Flag
    730                 uint64_t previousOffset = compressedOffset;
     811                const uint64_t previousOffset = compressedOffset;
    731812
    732813                //skip header data
     
    755836                if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
    756837                {//if so set flag and redo it uncompressed
    757                     cout << "Redoing uncompressed ! " << endl;
     838                   // cout << "Redoing uncompressed ! " << endl;
    758839                    //de-smooth !
    759840                    if (head.getProc(0) == kFactSmoothing)
     
    787868        }
    788869
     870        /// Transpose a tile to a new buffer
     871        /// @param src buffer hosting the regular, row-ordered data
     872        /// @param dest the target buffer that will receive the transposed data
    789873        void copyTransposeTile(const char* src, char* dest)
    790874        {
    791             uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
     875            const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
    792876
    793877            //copy the tile and transpose it
     
    817901
    818902        /// Specific compression functions
     903        /// @param dest the target buffer
     904        /// @param src the source buffer
     905        /// @param size number of bytes to copy
     906        /// @return number of bytes written
    819907        uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    820908        {
     
    823911        }
    824912
     913        /// Do huffman encoding
     914        /// @param dest the buffer that will receive the compressed data
     915        /// @param src the buffer hosting the transposed data
     916        /// @param numRows number of rows of data in the transposed buffer
     917        /// @param sizeOfElems size in bytes of one data elements
     918        /// @param numRowElems number of elements on each row
     919        /// @return number of bytes written
    825920        uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    826921        {
     
    836931                throw runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
    837932#else
    838                 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types";
     933                gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << endl;
    839934                return 0;
    840935#endif
     
    859954        }
    860955
    861         uint32_t applySMOOTHING(char* data, uint32_t numElems)//uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
     956        /// Applies Thomas' DRS4 smoothing
     957        /// @param data where to apply it
     958        /// @param numElems how many elements of type int16_t are stored in the buffer
     959        /// @return number of bytes modified
     960        uint32_t applySMOOTHING(char* data, uint32_t numElems)
    862961        {
    863962            int16_t* short_data = reinterpret_cast<int16_t*>(data);
     
    867966            return numElems*sizeof(int16_t);
    868967        }
    869         // Apply the inverse transform of the integer smoothing
    870         uint32_t UnApplySMOOTHING(char*   data, uint32_t   numElems)
     968
     969        /// Apply the inverse transform of the integer smoothing
     970        /// @param data where to apply it
     971        /// @param numElems how many elements of type int16_t are stored in the buffer
     972        /// @return number of bytes modified
     973        uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    871974        {
    872975            int16_t* short_data = reinterpret_cast<int16_t*>(data);
     
    877980            return numElems*sizeof(uint16_t);
    878981        }
    879         //Compressed data stuff
    880         int32_t         fCheckOffset;       ///< offset to the data pointer to calculate the checksum
    881         uint32_t        fNumTiles;
    882         uint32_t        fNumRowsPerTile;
    883 
    884         MemoryManager        fMemPool;
     982
     983
    885984
    886985        //thread related stuff
    887         vector<Queue<CompressionTarget>>          fCompressionQueues;
    888         Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;
    889 
    890         //thread related stuff
    891         static int32_t          fgNumQueues;    ///< The number of threads that will be used to compress
    892         int32_t          fNumQueues;    ///< The number of threads that will be used to compress
    893 
    894         int32_t           fLatestWrittenTile;
    895 #ifdef __EXCEPTIONS
    896         exception_ptr     fThreadsException;
    897 #endif
     986        MemoryManager  fMemPool;               ///< Actual memory manager, providing memory for the compression buffers
     987        static int32_t fgNumQueues;           ///<  Default number of threads to be used by the objects
     988        int32_t        fNumQueues;           ///<   Current number of threads that will be used by this object
     989        uint64_t       fMaxUsableMem;       ///<    Maximum number of bytes that can be allocated by the memory manager
     990        int32_t        fLatestWrittenTile; ///<     Index of the last tile written to disk (for correct ordering while using several threads)
     991
     992        vector<Queue<CompressionTarget>>          fCompressionQueues;  ///< Processing queues (=threads)
     993        Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///<  Writing queue (=thread)
     994
     995        // catalog related stuff
    898996        struct CatalogEntry
    899997        {
    900998            CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
    901             int64_t first;
    902             int64_t second;
     999            int64_t first;   ///< Size of this column in the tile
     1000            int64_t second; ///< offset of this column in the tile, from the start of the heap area
    9031001        } __attribute__((__packed__));
    9041002
    905         typedef vector<CatalogEntry>   CatalogRow;
    906         typedef vector<CatalogRow>     CatalogType;
    907         CatalogType          fCatalog;
    908         Checksum             fCatalogSum;
    909         Checksum             fRawSum;
    910         off_t                fCatalogOffset;
    911         uint32_t             fRealRowWidth;
    912         uint32_t             fCatalogExtraRows;
    913         vector<char>         fRawSumBuffer;
    914         uint64_t             fMaxUsableMem;
    915 
    916         shared_ptr<MemoryChunk> fSmartBuffer;
    917         char*                   fBuffer;
    918 
     1003        typedef vector<CatalogEntry> CatalogRow;
     1004        typedef vector<CatalogRow>   CatalogType;
     1005        CatalogType fCatalog;              ///< Catalog for this file
     1006        uint32_t    fNumTiles;            ///<  Number of pre-reserved tiles
     1007        uint32_t    fNumRowsPerTile;     ///<   Number of rows per tile
     1008        off_t       fCatalogOffset;     ///<    Offset of the catalog from the beginning of the file
     1009        uint32_t    fCatalogExtraRows; ///<     Number of extra rows written on top of the initial capacity of the file
     1010
     1011        // checksum related stuff
     1012        Checksum fCatalogSum;    ///< Checksum of the catalog
     1013        Checksum fRawSum;       ///<  Raw sum (specific to FACT)
     1014        int32_t  fCheckOffset; ///<   offset to the data pointer to calculate the checksum
     1015
     1016        // data layout related stuff
     1017        /// Regular columns augmented with compression informations
    9191018        struct CompressedColumn
    9201019        {
     
    9221021                block_head(h)
    9231022            {}
    924             Table::Column     col;
    925             Compression block_head;
     1023            Table::Column     col;         ///< the regular column entry
     1024            Compression       block_head; ///< the compression data associated with that column
    9261025        };
    927         vector<CompressedColumn> fRealColumns;
     1026        vector<CompressedColumn> fRealColumns;     ///< Vector hosting the columns of the file
     1027        uint32_t                 fRealRowWidth;   ///<  Width in bytes of one uncompressed row
     1028        shared_ptr<MemoryChunk>  fSmartBuffer;   ///<   Smart pointer to the buffer where the incoming rows are written
     1029        char*                    fBuffer;       ///<    regular version of fSmartBuffer
     1030        vector<char>             fRawSumBuffer;///<     buffer used for checksuming the incoming data, before compression
     1031
     1032#ifdef __EXCEPTIONS
     1033        exception_ptr     fThreadsException; ///< exception pointer to store exceptions coming from the threads
     1034#endif
    9281035
    9291036};
Note: See TracChangeset for help on using the changeset viewer.