Changeset 17221


Ignore:
Timestamp:
10/16/13 16:04:41 (11 years ago)
Author:
lyard
Message:
better version of zofits. New factofits. Only cleanup, defines, comments and catalog shrinkage are missing
Location:
trunk/Mars/mcore
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/ofits.h

    r17216 r17221  
    364364        this->open(fname);
    365365    }
    366     ~ofits() { if (is_open()) close(); }
     366    virtual ~ofits() { if (is_open()) close(); }
    367367
    368368    virtual void open(const char * filename, bool addEXTNAMEKey=true)
     
    621621
    622622        typecom << CommentFromType(typechar);
    623 /*        switch (typechar)
    624         {
    625         case 'L': typecom << "1-byte BOOL]";  break;
    626         case 'A': typecom << "1-byte CHAR]";  break;
    627         case 'B': typecom << "1-byte BOOL]";  break;
    628         case 'I': typecom << "2-byte INT]";   break;
    629         case 'J': typecom << "4-byte INT]";   break;
    630         case 'K': typecom << "8-byte INT]";   break;
    631         case 'E': typecom << "4-byte FLOAT]"; break;
    632         case 'D': typecom << "8-byte FLOAT]"; break;
    633         case 'Q': typecom << "var. Length]"; break;
    634         }
    635 */
     623
    636624        if (addHeaderKeys)
    637625        {
     
    643631        size_t size = SizeFromType(typechar);
    644632
    645 /*        switch (typechar)
    646         {
    647         case 'L': size = 1; break;
    648         case 'A': size = 1; break;
    649         case 'B': size = 1; break;
    650         case 'I': size = 2; break;
    651         case 'J': size = 4; break;
    652         case 'K': size = 8; break;
    653         case 'E': size = 4; break;
    654         case 'D': size = 8; break;
    655         case 'Q': size = 16; break;
    656         }
    657 */
    658633        Table::Column col;
    659634
     
    672647
    673648        return true;
     649    }
     650
     651    virtual bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const BlockHeaderWriter& header, const string& comment="", bool addHeaderKeys=true)
     652    {
     653        return AddColumn(cnt, typechar, name, unit, comment, addHeaderKeys);
     654    }
     655    virtual bool AddColumn(const string& compressionScheme, uint32_t cnt, char typechar, const string& name, const string& unit,  const string& comment="", bool addHeaderKeys=true)
     656    {
     657        if (compressionScheme != "" &&
     658            compressionScheme != "RAW")
     659        {
     660#ifdef __EXCEPTIONS
     661            throw runtime_error("Trying to add a compressed column to an uncompressed file");
     662#else
     663            gLog << ___err___ << "ERROR - Trying to add a compressed column to an uncompressed file" << endl;
     664            return false;
     665#endif
     666        }
     667        return AddColumn(cnt, typechar, name, unit, comment, addHeaderKeys);
    674668    }
    675669
  • trunk/Mars/mcore/zfits.h

    r17141 r17221  
    1212#include "huffman.h"
    1313
     14#include "FITS.h"
     15
     16using namespace FITS;
     17
    1418#ifndef __MARS__
    1519namespace std
     
    2024{
    2125public:
    22 
    23     enum CompressionProcess_t
    24     {
    25         kFactRaw       = 0x0,
    26         kFactSmoothing = 0x1,
    27         kFactHuffman16 = 0x2
    28     };
    29 
    30     enum RowOrdering_t
    31     {
    32         kOrderByCol = 'C',
    33         kOrderByRow = 'R'
    34     };
    3526
    3627    // Basic constructor
     
    110101            return;
    111102        }
    112 
    113103        ReadBinaryRow(row, dest);
    114104    }
    115105
    116106private:
    117 #ifndef __CINT__
    118     //Structure helper for reading tiles headers
    119     struct TileHeader
    120     {
    121       char     id[4];
    122       uint32_t numRows;
    123       uint64_t size;
    124 
    125       TileHeader() {}
    126 
    127       TileHeader(uint32_t nRows,
    128                  uint64_t s) : id({'T', 'I', 'L', 'E'}),
    129                                  numRows(nRows),
    130                                  size(s)
    131       { };
    132     } __attribute__((__packed__));
    133 
    134     //Structure helper for reading blocks headers and compresion schemes
    135     struct BlockHeader
    136     {
    137         uint64_t      size;
    138         char          ordering;
    139         unsigned char numProcs;
    140         uint16_t      processings[];
    141 
    142         BlockHeader(uint64_t      s=0,
    143                     char          o=kOrderByRow,
    144                     unsigned char n=1) : size(s),
    145                                          ordering(o),
    146                                          numProcs(n)
    147         {}
    148     } __attribute__((__packed__));
    149 #endif
     107
    150108    // Do what it takes to initialize the compressed structured
    151109    void InitCompressionReading()
     
    240198        const streampos catalogStart = tellg();
    241199
    242 fChkData.reset();
     200        fChkData.reset();
    243201
    244202        //do the actual reading
     
    501459                default:
    502460                    clear(rdstate()|ios::badbit);
     461                    ostringstream str;
     462                    str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
    503463#ifdef __EXCEPTIONS
    504                     throw runtime_error("Unknown processing applied to data. Aborting");
     464                    throw runtime_error(str.str());
    505465#else
    506466                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
  • trunk/Mars/mcore/zofits.h

    r17219 r17221  
    1010#include "MemoryManager.h"
    1111
     12#include "FITS.h"
     13
     14#ifdef USE_BOOST_THREADS
     15#include <boost/thread.hpp>
     16#endif
     17
     18using namespace FITS;
     19
    1220#ifndef __MARS__
    1321namespace std
     
    2129    public:
    2230
    23         //This has been duplicated from zfits. Should be be located one level up ?
    24         //If so, where ?
    25         enum CompressionProcess_t
    26         {
    27             kFactRaw       = 0x0,
    28             kFactSmoothing = 0x1,
    29             kFactHuffman16 = 0x2
    30         };
    31 
    32         enum RowOrdering_t
    33         {
    34             kOrderByCol = 'C',
    35             kOrderByRow = 'R'
    36         };
    37 
    38         //TileHeaders are only written, but never read-back
    39         //They are here to be able to recover raw data from binary if the header is corrupted
    40         //Or to cross-check the data, if desired: the zfits method CheckIfFileIsConsistent can do this
    41         struct TileHeader
    42         {
    43           char     id[4];
    44           uint32_t numRows;
    45           uint64_t size;
    46           TileHeader(uint32_t nRows=0,
    47                      uint64_t s=0) : id({'T', 'I', 'L', 'E'}),
    48                                      numRows(nRows),
    49                                      size(s)
    50           { };
    51         } __attribute__((__packed__));
    52 
    53         //BlockHeaders are written before every compressed blob of data
    54         struct BlockHeader
    55         {
    56             uint64_t      size;
    57             char          ordering;
    58             unsigned char numProcs;
    59             BlockHeader(uint64_t      s=0,
    60                         char          o=zfits::kOrderByRow,
    61                         unsigned char n=1) : size(s),
    62                                              ordering(o),
    63                                              numProcs(n)
    64             {}
    65         } __attribute__((__packed__)) ;
    66 
    67 
    6831        struct WriteTarget
    6932        {
    7033            bool operator < (const WriteTarget& other)
    7134            {
    72                 tile_num < other.tile_num;
     35                return tile_num < other.tile_num;
    7336            }
    7437            uint32_t tile_num;
     
    8447            }
    8548            shared_ptr<MemoryChunk> src;
     49            shared_ptr<MemoryChunk> transposed_src;
    8650            WriteTarget             target;
    8751            uint32_t                num_rows;
     
    9458               uint64_t maxUsableMem=0) : ofits(),
    9559                                          fMemPool(0, maxUsableMem),
    96                                           fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     60                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    9761        {
    9862            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    99             SetNumWorkingThreads(1);
     63            SetNumWorkingThreads(fNumQueues);
    10064        }
    10165
     
    10569               uint64_t maxUsableMem=0) : ofits(fname),
    10670                                          fMemPool(0, maxUsableMem),
    107                                           fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     71                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    10872        {
    10973            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    110             SetNumWorkingThreads(1);
    111         }
    112 
    113         ~zofits()
     74            SetNumWorkingThreads(fNumQueues);
     75        }
     76
     77        virtual ~zofits()
    11478        {
    11579        }
     
    11882        void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
    11983        {
     84            if (nt == 0)
     85                throw runtime_error("Cannot work with a catalog of size 0. sorry.");
     86
    12087            fCheckOffset  = 0;
    12188
     
    12390            fNumRowsPerTile = rpt;
    12491
    125             fNumQueues   = 0;
    126             fQueueLooper = 0;
    127 
    12892            fBuffer       = NULL;
    12993            fRealRowWidth = 0;
     94            fCatalogExtraRows = 0;
    13095
    13196            fCatalogOffset    =  0;
    132             fStartCellsOffset = -1;
    133             fDataOffset       = -1;
    13497
    13598            fMaxUsableMem = maxUsableMem;
    136         }
    137 
    138         //whether or not a calibration was given to the file writer
    139         bool IsOffsetCalibrated()
    140         {
    141             return (fOffsetCalibration.size() != 0);
    142         }
    143 
    144         //assign a given drs offset calibration
    145         void SetDrsCalibration(const float* calib)
    146         {
    147             if (!IsOffsetCalibrated())
    148                 fOffsetCalibration.resize(1440*1024);
    149 
    150             for (uint32_t i=0;i<1440*1024;i++)
    151                 fOffsetCalibration[i] = (int16_t)(calib[i]*4096.f/2000.f);
    152         }
    153 
    154         void SetDrsCalibration(const vector<float>& calib)
    155         {
    156             if (calib.size() != 1440*1024)
    157 #ifdef __EXCEPTIONS
    158             throw runtime_error("Cannot load calibration with anything else than 1024 samples per pixel");
    159 #else
    160             gLog << ___err___ << "ERROR - Cannot load calibration with anything else than 1024 samples per pixel");
    161 #endif
    162             SetDrsCalibration(calib.data());
    163         }
    164 
    165         void LoadDrsCalibrationFromFile(const string& fileName)
    166         {
    167             factfits drsFile(fileName);
    168             float* drsCalibFloat  = reinterpret_cast<float*>(drsFile.SetPtrAddress("BaselineMean"));
    169 
    170             drsFile.GetNextRow();
    171 
    172             SetDrsCalibration(drsCalibFloat);
    173         }
     99#ifdef __EXCEPTIONS
     100            fThreadsException = exception_ptr();
     101#endif
     102        }
     103
    174104
    175105        //write the header of the binary table
    176         bool WriteTableHeader(const char* name="DATA")
     106        virtual bool WriteTableHeader(const char* name="DATA")
    177107        {
    178108            if (!reallocateBuffers())
     
    181111            ofits::WriteTableHeader(name);
    182112
    183             //start the compression queues
    184             for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
    185                 it->start();
     113            if (fNumQueues != 0)
     114            {
     115                //start the compression queues
     116                for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
     117                    it->start();
     118
     119                fWriteToDiskQueue.start();
     120            }
    186121
    187122            //mark that no tile has been written so far
    188123            fLatestWrittenTile = -1;
    189124
    190             if (IsOffsetCalibrated())
    191             {//retrieve the column storing the start cell offsets, if required.
    192 
    193                 for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)//Table.cols.begin(); it!= fTable.cols.end(); it++)
    194                 {
    195                     if (it->col.name == "StartCellData")
    196                         fStartCellsOffset = it->col.offset;
    197                     if (it->col.name == "Data")
    198                     {
    199                         fNumSlices = it->col.num;
    200                         fDataOffset = it->col.offset;
    201                         if (fNumSlices % 1440 != 0)
    202                         {
    203 #ifdef __EXCEPTIONS
    204                             throw runtime_error("Number of data samples not a multiple of 1440.");
    205 #else
    206                             gLog << ___err___ << "ERROR - Number of data samples not a multiple of 1440. Doing it uncalibrated." << endl;
    207 #endif
    208                             fOffsetCalibration.resize(0);
    209                         }
    210                         fNumSlices /= 1440;
    211                     }
    212                 }
    213                 if (fStartCellsOffset < 0)
    214                 {
    215 #ifdef __EXCEPTIONS
    216                     throw runtime_error("FACT Calibration requested, but \"StartCellData\" column not found.");
    217 #else
    218                     gLog << ___err___ << "ERROR - FACT Calibration requested, but \"StartCellData\" column not found. Doing it uncalibrated." << endl;
    219 #endif
    220                     //throw away the calibration data
    221                     fOffsetCalibration.resize(0);
    222                 }
    223                 if (fDataOffset < 0)
    224                 {
    225 #ifdef __EXCEPTIONS
    226                     throw runtime_error("FACT Calibration requested, but \"Data\" column not found.");
    227 #else
    228                     gLog << ___err___ << "ERROR - FACT Calibration requested, but \"Data\" column not found. Doing it uncalibrated." << endl;
    229 #endif
    230                     //throw away the calibration data
    231                     fOffsetCalibration.resize(0);
    232                 }
    233             }
     125            return good();
    234126        }
    235127
     
    246138            SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
    247139            SetInt("THEAP", 0, "");
    248             SetStr("RAWSUM", "         0", "Checksum of raw littlen endian data");
    249 
    250 
     140            SetStr("RAWSUM", "         0", "Checksum of raw little endian data");
     141            SetFloat("ZRATIO", 0, "Compression ratio");
     142
     143            fCatalogExtraRows = 0;
    251144            fRawSum.reset();
    252145        }
    253146
    254         bool WriteDrsOffsetsTable()
    255         {
    256             if (!IsOffsetCalibrated())
    257                 return false;
    258 
    259             ofits c;
    260             c.SetStr("XTENSION", "BINTABLE"            , "binary table extension");
    261             c.SetInt("BITPIX"  , 8                     , "8-bit bytes");
    262             c.SetInt("NAXIS"   , 2                     , "2-dimensional binary table");
    263             c.SetInt("NAXIS1"  , 1024*1440*2           , "width of table in bytes");
    264             c.SetInt("NAXIS2"  , 1                     , "number of rows in table");
    265             c.SetInt("PCOUNT"  , 0                     , "size of special data area");
    266             c.SetInt("GCOUNT"  , 1                     , "one data group (required keyword)");
    267             c.SetInt("TFIELDS" , 1                     , "number of fields in each row");
    268             c.SetStr("CHECKSUM", "0000000000000000"    , "Checksum for the whole HDU");
    269             c.SetStr("DATASUM" ,  "         0"         , "Checksum for the data block");
    270             c.SetStr("EXTNAME" , "ZDrsCellOffsets"     , "name of this binary table extension");
    271             c.SetStr("TTYPE1"  , "OffsetCalibration"   , "label for field   1");
    272             c.SetStr("TFORM1"  , "1474560I"            , "data format of field: 2-byte INTEGER");
    273             c.End();
    274 
    275             vector<char> swappedOffsets;
    276             swappedOffsets.resize(1024*1440*sizeof(int16_t));
    277             revcpy<sizeof(int16_t)>(swappedOffsets.data(), (char*)(fOffsetCalibration.data()), 1024*1440);
    278 
    279             Checksum datasum;
    280             datasum.add(swappedOffsets.data(), sizeof(int16_t)*1024*1440);
    281 
    282             ostringstream dataSumStr;
    283             dataSumStr << datasum.val();
    284             c.SetStr("DATASUM", dataSumStr.str());
    285 
    286             datasum += c.WriteHeader(*this);
    287 
    288             const off_t here_I_am = tellp();
    289 
    290             c.SetStr("CHECKSUM", datasum.str());
    291             c.WriteHeader(*this);
    292 
    293             seekp(here_I_am);
    294 
    295             write(swappedOffsets.data(), swappedOffsets.size());
    296 
    297             AlignTo2880Bytes();
    298 
     147        virtual bool WriteDrsOffsetsTable()
     148        {
    299149            return good();
    300150        }
     
    335185            return good();
    336186        }
     187        virtual void DrsOffsetCalibrate(char* )
     188        {
     189
     190        }
     191
     192        void GrowCatalog()
     193        {
     194            uint32_t orig_catalog_size = fCatalog.size();
     195
     196            fCatalog.resize(fCatalog.size()*2);
     197            for (uint32_t i=orig_catalog_size;i<fCatalog.size(); i++)
     198            {
     199                fCatalog[i].resize(fTable.num_cols);
     200                for (auto it=(fCatalog[i].begin()); it!=fCatalog[i].end(); it++)
     201                    *it = CatalogEntry(0,0);
     202            }
     203
     204            fCatalogExtraRows += orig_catalog_size;
     205            fNumTiles         += orig_catalog_size;
     206        }
    337207
    338208        bool WriteRow(const void* ptr, size_t cnt, bool byte_swap=true)
     
    350220            if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
    351221            {
     222//                GrowCatalog();
    352223#ifdef __EXCEPTIONS
    353224                throw runtime_error("Maximum number of rows exceeded for this file");
     
    384255            fRawSum.add(fRawSumBuffer, false);
    385256
    386             if (IsOffsetCalibrated())
    387             {
    388 
    389                 int16_t* startCell = reinterpret_cast<int16_t*>(target_location + fStartCellsOffset);
    390                 int16_t* data      = reinterpret_cast<int16_t*>(target_location + fDataOffset);
    391 
    392                 for (uint32_t ch=0; ch<1440; ch++)
    393                 {
    394                     if (startCell[ch] < 0)
    395                     {
    396                         data += fNumSlices;
    397                         continue;
    398                     }
    399 
    400                     const int16_t modStart = startCell[ch]%1024;
    401                     const int16_t *off     = fOffsetCalibration.data() + ch*1024;
    402 
    403                     const int16_t* cal        = off+modStart;
    404                     const int16_t* end_stride = data+fNumSlices;
    405 
    406                     if (modStart+fNumSlices > 1024)
    407                     {
    408                         while (cal < off+1024)
    409                             *data++ -= *cal++;
    410                         cal = off;
    411                     }
    412 
    413                     while (data<end_stride)
    414                         *data++ -= *cal++;
    415                 }
    416             }
     257            DrsOffsetCalibrate(target_location);
    417258
    418259            fTable.num_rows++;
     
    423264                SetNextCompression(compress_target);
    424265
    425                 if (!fCompressionQueues[fQueueLooper].post(compress_target))
    426                     throw runtime_error("I could not post this buffer. This does not make sense...");
    427 
    428                 fQueueLooper = (fQueueLooper+1)%fNumQueues;
    429             }
    430 
    431             return true;
     266                if (fNumQueues == 0)
     267                { //no worker threads. do everything in-line
     268                    uint64_t size_to_write = CompressBuffer(compress_target);
     269
     270                    WriteTarget write_target;
     271                    write_target.size     = size_to_write;
     272                    write_target.target   = compress_target.target.target;
     273                    write_target.tile_num = compress_target.target.tile_num;
     274
     275                    if (!WriteBufferToDisk(write_target))
     276                        throw runtime_error("Something went wrong while writing to disk");
     277                }
     278                else
     279                {
     280                    //if all queues are empty, use queue 0
     281                     uint32_t min_index     = 0;
     282                     uint32_t min_size      = numeric_limits<uint32_t>::max();
     283                     uint32_t current_index = 0;
     284
     285                     for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
     286                     {
     287                         if (it->size() < min_size)
     288                         {
     289                             min_index = current_index;
     290                             min_size = it->size();
     291                         }
     292                         current_index++;
     293                     }
     294
     295                    if (!fCompressionQueues[min_index].post(compress_target))
     296                        throw runtime_error("I could not post this buffer. This does not make sense...");
     297                }
     298            }
     299
     300            return good();
    432301        }
    433302
     
    441310        void SetNextCompression(CompressionTarget& target)
    442311        {
     312            //get space for transposed data
    443313            shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();
    444314
    445             copyTransposeTile(fBuffer, transposed_data.get()->get());
    446 
     315            //fill up write to disk target
    447316            WriteTarget write_target;
    448317            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
     
    450319            write_target.target   = fMemPool.malloc();
    451320
    452             target.src      = transposed_data;
     321            //fill up compression target
     322            target.src            = fSmartBuffer;
     323            target.transposed_src      = transposed_data;
    453324            target.target   = write_target;
    454325            target.num_rows = fTable.num_rows;
     326
     327            //get a new buffer to host the incoming data
     328            fSmartBuffer = fMemPool.malloc();
     329            fBuffer      = fSmartBuffer.get()->get();
     330        }
     331
     332        void ShrinkCatalog()
     333        {
     334            //did we write more rows than what the catalog could host ?
     335            if (fCatalogExtraRows != 0)
     336            {
     337                //how many rows can the regular catalog host ?
     338                const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
     339                //what's the shrink factor to be applied ?
     340                const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);
     341
     342                //shrink the catalog !
     343                for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+= shrink_factor)
     344                {//add the elements one by one, so that the empty ones at the end (i.e. fTable.num_rows%shrink_factor) do not create havok
     345                    const uint32_t target_catalog_row = i/shrink_factor;
     346                    //move data from current row (i) to target row
     347                    for (uint32_t j=0; j<fTable.num_cols; j++)
     348                    {
     349                        fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
     350                        fCatalog[target_catalog_row][j].first  = 0;
     351                        uint64_t last_size   = fCatalog[i][j].first;
     352                        uint64_t last_offset = fCatalog[i][j].second;
     353
     354                        for (uint32_t k=1; k<shrink_factor; k++)
     355                        {
     356                           if (fCatalog[i+k][j].second != 0)
     357                           {
     358                               fCatalog[target_catalog_row][j].first +=  fCatalog[i+k][j].second - last_offset;
     359                           }
     360                           else
     361                           {
     362                               fCatalog[target_catalog_row][j].first += last_size;
     363                               break;
     364                           }
     365                           last_size   = fCatalog[i+k][j].first;
     366                           last_offset = fCatalog[i+k][j].second;
     367                        }
     368                    }
     369                }
     370
     371                fCatalog.resize(fCatalog.size() - fCatalogExtraRows);
     372
     373                //update header keywords
     374                const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
     375                const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
     376                SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
     377                SetInt("NAXIS2", new_num_tiles_written);
     378                SetInt("ZTILELEN", new_num_rows_per_tiles);
     379                cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
     380                cout << "Num tiles written: " << new_num_tiles_written << endl;
     381            }
    455382        }
    456383
    457384        bool close()
    458385        {
    459             if (tellp() < 0)
    460                 return false;
    461 
    462386            for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
    463387                it->wait();
     
    465389            fWriteToDiskQueue.wait();
    466390
     391            if (tellp() < 0)
     392            {
     393#ifdef __EXCEPTIONS
     394                throw runtime_error("Something went wrong while writing to disk...");
     395#else
     396                return false;
     397#endif
     398            }
     399
     400#ifdef __EXCEPTIONS
     401            //check if something hapenned to the compression threads
     402            if (fThreadsException != exception_ptr())
     403            {
     404                rethrow_exception(fThreadsException);
     405            }
     406#endif
     407
    467408            if (fTable.num_rows%fNumRowsPerTile != 0)
    468409            {
     
    470411                SetNextCompression(compress_target);
    471412
     413                //set number of threads to zero before calling compressBuffer
     414                int32_t backup_num_queues = fNumQueues;
     415                fNumQueues = 0;
    472416                uint64_t size_to_write = CompressBuffer(compress_target);
     417                fNumQueues = backup_num_queues;
    473418
    474419                WriteTarget write_target;
     
    518463            }
    519464
     465            float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
     466            SetFloat("ZRATIO", compression_ratio);
     467
    520468            //add to the heap size the size of the gap between the catalog and the actual heap
    521469            heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
     
    523471            SetInt("PCOUNT", heap_size, "size of special data area");
    524472
     473
    525474            //Just for updating the fCatalogSum value
    526475            WriteCatalog();
     
    548497        bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    549498        {
    550             BlockHeader head;
    551             vector<uint16_t> processing(1);
    552             processing[0] = kFactRaw;
    553             AddColumn(cnt, typechar, name, unit, head, processing, comment, addHeaderKeys);
    554         }
    555 
    556         bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, BlockHeader& header, vector<uint16_t>& comp_sequence, const string& comment="", bool addHeaderKeys=true)
     499            BlockHeaderWriter head;
     500            return AddColumn(cnt, typechar, name, unit, head, comment, addHeaderKeys);
     501        }
     502
     503        bool AddColumn(const string& compressionScheme, uint32_t cnt, char typechar, const string& name, const string& unit,  const string& comment="", bool addHeaderKeys=true)
     504        {
     505            BlockHeaderWriter head(compressionScheme);
     506            return AddColumn(cnt, typechar, name, unit, head, comment, addHeaderKeys);
     507        }
     508        bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const BlockHeaderWriter& header, const string& comment="", bool addHeaderKeys=true)
    557509        {
    558510            if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
     
    570522            fRealRowWidth += size*cnt;
    571523
    572             fRealColumns.emplace_back(CompressedColumn(col, header, comp_sequence));
     524            fRealColumns.emplace_back(CompressedColumn(col, header));
    573525
    574526            ostringstream strKey, strVal, strCom;
     
    583535            strKey << "ZCTYP" << fRealColumns.size();
    584536            strVal << "FACT";
    585             strCom << "Comp. of FACT telescope";
     537            strCom << "Compression type FACT";
    586538            SetStr(strKey.str(), strVal.str(), strCom.str());
    587539
     
    589541        }
    590542
    591         bool SetNumWorkingThreads(uint32_t num)
     543        bool AddColumnShort(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     544        { return AddColumn(compressionScheme, cnt, 'I', name, unit, comment); }
     545        bool AddColumnInt(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     546        { return AddColumn(compressionScheme, cnt, 'J', name, unit, comment); }
     547        bool AddColumnLong(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     548        { return AddColumn(compressionScheme, cnt, 'K', name, unit, comment); }
     549        bool AddColumnFloat(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     550        { return AddColumn(compressionScheme, cnt, 'E', name, unit, comment); }
     551        bool AddColumnDouble(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     552        { return AddColumn(compressionScheme, cnt, 'D', name, unit, comment); }
     553        bool AddColumnChar(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     554        { return AddColumn(compressionScheme, cnt, 'A', name, unit, comment); }
     555        bool AddColumnByte(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     556        { return AddColumn(compressionScheme, cnt, 'B', name, unit, comment); }
     557        bool AddColumnBool(const string& compressionScheme, uint32_t cnt, const string &name, const string &unit="", const string &comment="")
     558        { return AddColumn(compressionScheme, cnt, 'L', name, unit, comment); }
     559
     560        static void SetNumThreads(int32_t num) { fNumQueues = num;}
     561        static int32_t GetNumThreads() { return fNumQueues;}
     562    protected:
     563
     564        bool SetNumWorkingThreads(int32_t num)
    592565        {
    593566            if (is_open())
     
    596569                throw runtime_error("File must be closed before changing the number of compression threads");
    597570#else
    598                 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads");
     571                gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
    599572#endif
    600573                return false;
    601574            }
    602             if (num < 1 || num > 64)
    603             {
    604 #ifdef __EXCEPTIONS
    605                 throw runtime_error("Number of threads must be between 1 and 64");
    606 #else
    607                 gLog << ___err___ << "ERROR - Number of threads must be between 1 and 64");
     575#ifdef USE_BOOST_THREADS
     576            int32_t num_available_cores = boost::thread::hardware_concurrency();
     577#else
     578            int32_t num_available_cores = thread::hardware_concurrency();
     579#endif
     580
     581            if (num_available_cores == 0)
     582            {//could not detect number of available cores from system properties...
     583                //Assuming that 5 cores are availables (4 compression, 1 write)
     584                num_available_cores = 5;
     585            }
     586            if (num > num_available_cores)
     587            {
     588                ostringstream str;
     589                str << "Number of threads cannot be greater than physically available (" << num_available_cores << ")";
     590#ifdef __EXCEPTIONS
     591                throw runtime_error(str.str());
     592#else
     593                gLog << ___err___ << "ERROR - " << str.str();
    608594#endif
    609595                return false;
    610596            }
    611597
    612             if (fCompressionQueues.size() == num)
     598            if (num == -1)
     599                num = num_available_cores-2; // 1 for writing, one for the main thread
     600
     601            if (fCompressionQueues.size() == (uint32_t)num)
    613602                return true;
    614603
     
    617606
    618607            //shrink
    619             if (num < fCompressionQueues.size())
     608            if ((uint32_t)num < fCompressionQueues.size())
    620609            {
    621610                fCompressionQueues.resize(num, queue);
     
    626615            fCompressionQueues.resize(num, queue);
    627616
    628             fNumQueues   = num;
    629             fQueueLooper = 0;
     617            fNumQueues = num;
    630618
    631619            return true;
    632620        }
    633 
    634 
    635     private:
    636621
    637622        bool reallocateBuffers()
     
    641626
    642627            fSmartBuffer = fMemPool.malloc();
    643             fBuffer = fSmartBuffer.get()->get();
    644 //            memset(fBuffer, 0, 4);
    645 //            fBuffer += 4;
     628            fBuffer      = fSmartBuffer.get()->get();
    646629
    647630            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
     
    686669        }
    687670
    688         bool CompressBuffer(const CompressionTarget& target)
    689         {
    690             //compress the buffer
    691             uint64_t compressed_size = compressBuffer(target.target.target.get()->get(), target.src.get()->get(), target.num_rows);
     671        uint32_t CompressBuffer(const CompressionTarget& target)
     672        {
     673            uint64_t compressed_size = 0;
     674#ifdef __EXCEPTIONS
     675            try
     676            {
     677#endif
     678                //transpose the original data
     679                copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());
     680
     681                //compress the buffer
     682                compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
     683#ifdef __EXCEPTIONS
     684            }
     685            catch (...)
     686            {
     687                fThreadsException = current_exception();
     688                if (fNumQueues == 0)
     689                    rethrow_exception(fThreadsException);
     690            }
     691#endif
     692
     693            if (fNumQueues == 0)
     694                return compressed_size;
    692695
    693696            //post the result to the writing queue
     
    699702
    700703            fWriteToDiskQueue.post(wt);
    701             return true;
     704
     705            return compressed_size;
    702706        }
    703707
     
    705709        {
    706710            //is this the tile we're supposed to write ?
    707             if (target.tile_num != fLatestWrittenTile+1)
     711            if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
    708712                return false;
    709713
     
    711715
    712716            //write the buffer to disk.
    713             writeCompressedDataToDisk(target.target.get()->get(), target.size);
    714 
    715             return true;
     717            return writeCompressedDataToDisk(target.target.get()->get(), target.size);
    716718        }
    717719
     
    736738                if (fRealColumns[i].col.num == 0) continue;
    737739
    738                 BlockHeader& head = fRealColumns[i].head;
    739                 const vector<uint16_t>& sequence = fRealColumns[i].comp_sequence;
     740                BlockHeaderWriter& head = fRealColumns[i].block_head;
    740741
    741742                //set the default byte telling if uncompressed the compressed Flag
    742743                uint64_t previousOffset = compressedOffset;
     744
    743745                //skip header data
    744                 compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
    745 
    746                 for (uint32_t j=0;j<sequence.size(); j++)
     746                compressedOffset += head.SizeOnDisk();
     747
     748                for (uint32_t j=0;j<head.NumProcs();j++)//sequence.size(); j++)
    747749                {
    748                     switch (sequence[j])
     750                    switch (head.Proc(j))
    749751                    {
    750                         case zfits::kFactRaw:
    751                                 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset,
    752                                                                          src  + offset,
    753                                                                          thisRoundNumRows,
    754                                                                          fRealColumns[i].col.size,
    755                                                                          fRealColumns[i].col.num);
     752                        case kFactRaw:
     753                                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src  + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
    756754                        break;
    757                         case zfits::kFactSmoothing:
    758                                 applySMOOTHING(dest + compressedOffset,
    759                                                src  + offset,
    760                                                thisRoundNumRows,
    761                                                fRealColumns[i].col.size,
    762                                                fRealColumns[i].col.num);
     755                        case kFactSmoothing:
     756                                applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
    763757                        break;
    764                         case zfits::kFactHuffman16:
    765                             if (head.ordering == zfits::kOrderByCol)
    766                                 compressedOffset += compressHUFFMAN(dest + compressedOffset,
    767                                                                     src  + offset,
    768                                                                     thisRoundNumRows,
    769                                                                     fRealColumns[i].col.size,
    770                                                                     fRealColumns[i].col.num);
     758                        case kFactHuffman16:
     759                            if (head.Ordering() == kOrderByCol)
     760                                compressedOffset += compressHUFFMAN(dest + compressedOffset, src  + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
    771761                            else
    772                                 compressedOffset += compressHUFFMAN(dest + compressedOffset,
    773                                                                     src  + offset,
    774                                                                     fRealColumns[i].col.num,
    775                                                                     fRealColumns[i].col.size,
    776                                                                     thisRoundNumRows);
     762                                compressedOffset += compressHUFFMAN(dest + compressedOffset, src  + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
    777763                        break;
    778764                        default:
    779                             cout << "ERROR: Unkown compression sequence entry: " << sequence[j] << endl;
    780                         break;
     765                        {
     766                            ostringstream str;
     767                            str << "Unkown compression sequence entry: " << head.Proc(j);
     768#ifdef __EXCEPTIONS
     769                            throw runtime_error(str.str());
     770#else
     771                            gLog << ___err___ << "ERROR - " << str.str();
     772                            return 0;
     773#endif
     774                        }
    781775                    }
    782776                }
    783777
    784                 //check if compressed size is larger than uncompressed
    785                 if (sequence[0] != zfits::kFactRaw &&
    786                     compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
     778               //check if compressed size is larger than uncompressed
     779                if ((head.Proc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.SizeOnDisk()))// && two)
    787780                {//if so set flag and redo it uncompressed
    788                     cout << "REDOING UNCOMPRESSED" << endl;
    789                     compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
    790                     compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
    791                     BlockHeader he;
    792                     he.size = compressedOffset - previousOffset;
    793                     he.numProcs = 1;
    794                     he.ordering = zfits::kOrderByRow;
    795                     memcpy(dest + previousOffset, (char*)(&he), sizeof(BlockHeader));
    796                     dest[previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
     781                    cout << "Redoing uncompressed ! " << endl;
     782                    //de-smooth !
     783                    if (head.Proc(0) == kFactSmoothing)
     784                        UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
     785
     786                    BlockHeaderWriter he;
     787                    compressedOffset = previousOffset + he.SizeOnDisk();
     788                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
     789                    he.SetBlockSize(compressedOffset - previousOffset);
     790                    he.Write(dest+previousOffset);
    797791                    offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    798792                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
     
    800794                }
    801795
    802                 head.size = compressedOffset - previousOffset;
    803                 memcpy(dest + previousOffset, (char*)(&head), sizeof(BlockHeader));
    804                 memcpy(dest + previousOffset+sizeof(BlockHeader), sequence.data(), sizeof(uint16_t)*sequence.size());
     796                head.SetBlockSize(compressedOffset - previousOffset);
     797                head.Write(dest + previousOffset);
    805798
    806799                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
     
    814807        }
    815808
    816         void copyTransposeTile(const char* src, char* dest)//uint32_t index)
     809        void copyTransposeTile(const char* src, char* dest)
    817810        {
    818811            uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
     
    821814            for (uint32_t i=0;i<fRealColumns.size();i++)
    822815            {
    823                 switch (fRealColumns[i].head.ordering)
     816                switch (fRealColumns[i].block_head.Ordering())
    824817                {
    825                     case zfits::kOrderByRow:
     818                    case kOrderByRow:
    826819                        for (uint32_t k=0;k<thisRoundNumRows;k++)
    827820                        {//regular, "semi-transposed" copy
     
    831824                    break;
    832825
    833                     case zfits::kOrderByCol :
    834                         for (int j=0;j<fRealColumns[i].col.num;j++)
     826                    case kOrderByCol :
     827                        for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
    835828                            for (uint32_t k=0;k<thisRoundNumRows;k++)
    836829                            {//transposed copy
     
    840833                    break;
    841834                    default:
    842                             cout << "Error: unknown column ordering: " << fRealColumns[i].head.ordering << endl;
     835                    {
     836                            ostringstream str;
     837                            str << "Unkown column ordering: " << fRealColumns[i].block_head.Ordering();
     838#ifdef __EXCEPTIONS
     839                            throw runtime_error(str.str());
     840#else
     841                            gLog << ___err___ << "ERROR - " << str.str();
     842                            return;
     843#endif
     844                    }
    843845                };
    844846            }
     
    846848
    847849        /// Specific compression functions
    848         uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    849         {
    850             memcpy(dest, src, numRows*sizeOfElems*numRowElems);
    851             return numRows*sizeOfElems*numRowElems;
     850        uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
     851        {
     852            memcpy(dest, src, size);
     853            return size;
    852854        }
    853855
     
    862864            if (sizeOfElems < 2 )
    863865            {
    864                 cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
     866#ifdef __EXCEPTIONS
     867                throw runtime_error("Fatal ERROR: HUFMANN can only encode short or longer types");
     868#else
     869                gLog << ___err___ << "ERROR - Fatal ERROR: HUFMANN can only encode short or longer types";
    865870                return 0;
     871#endif
    866872            }
    867873            uint32_t huffmanOffset = 0;
     
    884890        }
    885891
    886         uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    887         {
    888             uint32_t colWidth = numRowElems;
    889             for (int j=colWidth*numRows-1;j>1;j--)
    890                 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
    891 
    892             return numRows*sizeOfElems*numRowElems;
    893         }
    894 
    895         //Offsets calibration stuff.
    896         vector<int16_t> fOffsetCalibration; ///< The calibration itself
    897         int32_t         fStartCellsOffset;  ///< Offset in bytes for the startcell data
    898         int32_t         fDataOffset;        ///< Offset in bytes for the data
    899         int32_t         fNumSlices;         ///< Number of samples per pixel per event
    900 
     892        uint32_t applySMOOTHING(char* data, uint32_t numElems)//uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
     893        {
     894            int16_t* short_data = reinterpret_cast<int16_t*>(data);
     895            for (int j=numElems-1;j>1;j--)
     896                short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
     897
     898            return numElems*sizeof(int16_t);
     899        }
     900        // Apply the inverse transform of the integer smoothing
     901        uint32_t UnApplySMOOTHING(char*   data, uint32_t   numElems)
     902        {
     903            int16_t* short_data = reinterpret_cast<int16_t*>(data);
     904            //un-do the integer smoothing
     905            for (uint32_t j=2;j<numElems;j++)
     906                short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
     907
     908            return numElems*sizeof(uint16_t);
     909        }
    901910        //Compressed data stuff
    902911        int32_t         fCheckOffset;       ///< offset to the data pointer to calculate the checksum
     
    904913        uint32_t        fNumRowsPerTile;
    905914
     915        MemoryManager        fMemPool;
     916
    906917        //thread related stuff
    907918        vector<Queue<CompressionTarget>> fCompressionQueues;
     
    909920
    910921        //thread related stuff
    911         uint32_t          fNumQueues;    ///< The number of threads that will be used to compress
    912         uint32_t          fQueueLooper;
     922        static int32_t          fNumQueues;    ///< The number of threads that will be used to compress
     923
    913924        int32_t           fLatestWrittenTile;
    914 
     925#ifdef __EXCEPTIONS
     926        exception_ptr     fThreadsException;
     927#endif
    915928        struct CatalogEntry
    916929        {
     
    927940        off_t                fCatalogOffset;
    928941        uint32_t             fRealRowWidth;
    929 
     942        uint32_t             fCatalogExtraRows;
    930943        vector<char>         fRawSumBuffer;
    931         MemoryManager        fMemPool;
    932944        uint64_t             fMaxUsableMem;
    933945
     
    935947        char*                   fBuffer;
    936948
    937 
    938949        struct CompressedColumn
    939950        {
    940             CompressedColumn(Table::Column& c, BlockHeader& h, vector<uint16_t>& cs) : col(c),
    941                                                                                        head(h),
    942                                                                                        comp_sequence(cs)
     951            CompressedColumn(const Table::Column& c, const BlockHeaderWriter& h) : col(c),
     952                                                                                   block_head(h)
    943953            {}
    944             Table::Column    col;
    945             BlockHeader      head;
    946             vector<uint16_t> comp_sequence;
     954            Table::Column     col;
     955            BlockHeaderWriter block_head;
    947956        };
    948957        vector<CompressedColumn> fRealColumns;
    949958
    950959};
     960
     961int32_t zofits::fNumQueues = 0;
    951962
    952963#ifndef __MARS__
     
    958969zofitsfile.SetNumWorkingThreads(numThreads);
    959970zofitsfile.open((fileNameOut).c_str());
    960 std::zofits::BlockHeader zoheader(0, zfits::kOrderByRow, 2);
     971std::zofits::BlockHeader zoheader(0, kOrderByRow, 2);
    961972vector<uint16_t> smoothmanProcessings(2);
    962 smoothmanProcessings[0] = zfits::kFactSmoothing;
    963 smoothmanProcessings[1] = zfits::kFactHuffman16;
     973smoothmanProcessings[0] = kFactSmoothing;
     974smoothmanProcessings[1] = kFactHuffman16;
    964975
    965976zofitsfile.AddColumn(sortedColumns[i].num,
Note: See TracChangeset for help on using the changeset viewer.