Ignore:
Timestamp:
10/15/13 15:05:54 (11 years ago)
Author:
lyard
Message:
draft zofits working with Queue and MemoryManager
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/Mars/mcore/zofits.h

    r17216 r17219  
    77
    88#include "ofits.h"
     9#include "Queue.h"
     10#include "MemoryManager.h"
    911
    1012#ifndef __MARS__
     
    6466
    6567
     68        struct WriteTarget
     69        {
     70            bool operator < (const WriteTarget& other)
     71            {
     72                tile_num < other.tile_num;
     73            }
     74            uint32_t tile_num;
     75            uint32_t size;
     76            shared_ptr<MemoryChunk> target;
     77        };
     78
     79        struct CompressionTarget
     80        {
     81            bool operator < (const CompressionTarget& other)
     82            {
     83                return target < other.target;
     84            }
     85            shared_ptr<MemoryChunk> src;
     86            WriteTarget             target;
     87            uint32_t                num_rows;
     88        };
     89
     90
    6691        //constructors
    6792        zofits(uint32_t numTiles=1000,
    68                uint32_t rowPerTile=100) : ofits()
    69         {
    70             InitMemberVariables(numTiles, rowPerTile);
     93               uint32_t rowPerTile=100,
     94               uint64_t maxUsableMem=0) : ofits(),
     95                                          fMemPool(0, maxUsableMem),
     96                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     97        {
     98            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    7199            SetNumWorkingThreads(1);
    72100        }
     
    74102        zofits(const char* fname,
    75103               uint32_t numTiles=1000,
    76                uint32_t rowPerTile=100) : ofits(fname)
    77         {
    78             InitMemberVariables(numTiles, rowPerTile);
     104               uint32_t rowPerTile=100,
     105               uint64_t maxUsableMem=0) : ofits(fname),
     106                                          fMemPool(0, maxUsableMem),
     107                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     108        {
     109            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    79110            SetNumWorkingThreads(1);
    80111        }
     
    85116
    86117        //initialization of member variables
    87         void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0)
     118        void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
    88119        {
    89120            fCheckOffset  = 0;
    90             fThreadLooper = 0;
    91121
    92122            fNumTiles       = nt;
    93123            fNumRowsPerTile = rpt;
    94124
    95             fNumThreads  = 1;
    96             fThreadIndex = 0;   ///< A variable to assign threads indices
     125            fNumQueues   = 0;
     126            fQueueLooper = 0;
    97127
    98128            fBuffer       = NULL;
     
    102132            fStartCellsOffset = -1;
    103133            fDataOffset       = -1;
     134
     135            fMaxUsableMem = maxUsableMem;
    104136        }
    105137
     
    144176        bool WriteTableHeader(const char* name="DATA")
    145177        {
     178            if (!reallocateBuffers())
     179                throw ("While allocating memory: apparently there not as much free memory as advertized...");
     180
    146181            ofits::WriteTableHeader(name);
     182
     183            //start the compression queues
     184            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
     185                it->start();
     186
     187            //mark that no tile has been written so far
     188            fLatestWrittenTile = -1;
    147189
    148190            if (IsOffsetCalibrated())
     
    208250
    209251            fRawSum.reset();
    210 
    211             //start the compression threads
    212             pthread_mutex_init(&fMutex, NULL);
    213 
    214             fThreadIndex = 0;
    215             for (uint32_t i=0;i<fNumThreads;i++)
    216                 pthread_create(&(fThread[i]), NULL, threadFunction, this);
    217 
    218             //wait for all threads to have started
    219             while (fNumThreads != fThreadIndex)
    220                 usleep(1000);
    221 
    222             //set the writing fence to the last thread (so that the first one can start writing right away)
    223             fThreadIndex = fNumThreads-1;
    224252        }
    225253
     
    391419
    392420            if (fTable.num_rows % fNumRowsPerTile == 0)
    393             {//give a new tile to compress to a thread
    394                 while (fThreadStatus[fThreadLooper] == _THREAD_COMPRESS_)
    395                     usleep(100000);
    396 
    397                 copyTransposeTile(fThreadLooper);
    398 
    399                 while (fThreadStatus[fThreadLooper] != _THREAD_WAIT_)
    400                     usleep(100000);
    401 
    402                 fThreadNumRows[fThreadLooper] = fTable.num_rows;
    403                 fThreadStatus[fThreadLooper] = _THREAD_COMPRESS_;
    404                 fThreadLooper = (fThreadLooper+1)%fNumThreads;
     421            {
     422                CompressionTarget compress_target;
     423                SetNextCompression(compress_target);
     424
     425                if (!fCompressionQueues[fQueueLooper].post(compress_target))
     426                    throw runtime_error("I could not post this buffer. This does not make sense...");
     427
     428                fQueueLooper = (fQueueLooper+1)%fNumQueues;
    405429            }
    406430
     
    415439        }
    416440
     441        void SetNextCompression(CompressionTarget& target)
     442        {
     443            shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();
     444
     445            copyTransposeTile(fBuffer, transposed_data.get()->get());
     446
     447            WriteTarget write_target;
     448            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
     449            write_target.size     = 0;
     450            write_target.target   = fMemPool.malloc();
     451
     452            target.src      = transposed_data;
     453            target.target   = write_target;
     454            target.num_rows = fTable.num_rows;
     455        }
     456
    417457        bool close()
    418458        {
     
    420460                return false;
    421461
    422             //wait for compression threads to finish
    423             for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
    424                 while (*it != _THREAD_WAIT_)
    425                     usleep(100000);
    426 
    427             for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
    428                 *it = _THREAD_EXIT_;
    429 
    430             for (auto it=fThread.begin(); it!= fThread.end(); it++)
    431                 pthread_join(*it, NULL);
    432 
    433             pthread_mutex_destroy(&fMutex);
     462            for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
     463                it->wait();
     464
     465            fWriteToDiskQueue.wait();
    434466
    435467            if (fTable.num_rows%fNumRowsPerTile != 0)
    436468            {
    437                 copyTransposeTile(0);
    438                 fThreadNumRows[0] = fTable.num_rows;
    439                 uint32_t numBytes = compressBuffer(0);
    440                 writeCompressedDataToDisk(0, numBytes);
     469                CompressionTarget compress_target;
     470                SetNextCompression(compress_target);
     471
     472                uint64_t size_to_write = CompressBuffer(compress_target);
     473
     474                WriteTarget write_target;
     475                write_target.size     = size_to_write;
     476                write_target.target   = compress_target.target.target;
     477                write_target.tile_num = compress_target.target.tile_num;
     478
     479                if (!WriteBufferToDisk(write_target))
     480                    throw runtime_error("Something went wrong while writing the last tile...");
    441481            }
    442482
     
    449489            uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
    450490            SetInt("ZHEAPPTR", heap_offset);
    451 //            SetInt("THEAP"   , heap_offset);0
    452491
    453492            const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
     
    547586            SetStr(strKey.str(), strVal.str(), strCom.str());
    548587
    549             return reallocateBuffers();
     588            return true;
    550589        }
    551590
     
    571610            }
    572611
    573             fNumThreads = num;
    574             fThreadStatus.resize(num);
    575             fThread.resize(num);
    576             fThreadNumRows.resize(num);
    577             for (uint32_t i=0;i<num;i++)
    578             {
    579                 fThreadNumRows[i] = 0;
    580                 fThreadStatus[i] = _THREAD_WAIT_;
    581             }
    582 
    583             return reallocateBuffers();
    584         }
     612            if (fCompressionQueues.size() == num)
     613                return true;
     614
     615            //cannot be const, as resize does not want it that way
     616            Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false, false);
     617
     618            //shrink
     619            if (num < fCompressionQueues.size())
     620            {
     621                fCompressionQueues.resize(num, queue);
     622                return true;
     623            }
     624
     625            //grow
     626            fCompressionQueues.resize(num, queue);
     627
     628            fNumQueues   = num;
     629            fQueueLooper = 0;
     630
     631            return true;
     632        }
     633
    585634
    586635    private:
    587636
    588 
    589637        bool reallocateBuffers()
    590638        {
    591             fBufferVector.resize(fRealRowWidth*fNumRowsPerTile + 8); //8 more for checksuming
    592             memset(fBufferVector.data(), 0, 4);
    593             fBuffer = fBufferVector.data()+4;
     639            size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
     640            fMemPool.setChunkSize(chunk_size);
     641
     642            fSmartBuffer = fMemPool.malloc();
     643            fBuffer = fSmartBuffer.get()->get();
     644//            memset(fBuffer, 0, 4);
     645//            fBuffer += 4;
    594646
    595647            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    596 
    597 
    598             fTransposedBufferVector.resize(fNumThreads);
    599             fCompressedBufferVector.resize(fNumThreads);
    600             fTransposedBuffer.resize(fNumThreads);
    601             fCompressedBuffer.resize(fNumThreads);
    602             for (uint32_t i=0;i<fNumThreads;i++)
    603             {
    604                 fTransposedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile);
    605                 fCompressedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile + fRealColumns.size() + sizeof(TileHeader) + 8);
    606                 memset(fCompressedBufferVector[i].data(), 0, 4);
    607                 TileHeader tileHeader;
    608                 memcpy(fCompressedBufferVector[i].data()+4, &tileHeader, sizeof(TileHeader));
    609                 fTransposedBuffer[i] = fTransposedBufferVector[i].data();
    610                 fCompressedBuffer[i] = fCompressedBufferVector[i].data()+4;
    611             }
    612648
    613649            //give the catalog enough space
     
    622658        }
    623659
    624         bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
    625         {
    626             char* checkSumPointer = fCompressedBuffer[threadID];
     660        bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
     661        {
     662            char* checkSumPointer = src+4;
    627663            int32_t extraBytes = 0;
    628664            uint32_t sizeToChecksum = sizeToWrite;
     
    645681            fCheckOffset = (4 - extraBytes)%4;
    646682            //write data to disk
    647             write(fCompressedBuffer[threadID], sizeToWrite);
     683            write(src+4, sizeToWrite);
     684
    648685            return good();
    649686        }
    650687
    651         static void* threadFunction(void* context)
    652         {
    653             zofits* myself = static_cast<zofits*>(context);
    654 
    655             uint32_t myID = 0;
    656             pthread_mutex_lock(&(myself->fMutex));
    657             myID = myself->fThreadIndex++;
    658             pthread_mutex_unlock(&(myself->fMutex));
    659             uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->fNumThreads-1 : myID-1;
    660 
    661             while (myself->fThreadStatus[myID] != _THREAD_EXIT_)
    662             {
    663                 while (myself->fThreadStatus[myID] == _THREAD_WAIT_)
    664                     usleep(100000);
    665 
    666                 if (myself->fThreadStatus[myID] != _THREAD_COMPRESS_)
    667                     continue;
    668                 uint32_t numBytes = myself->compressBuffer(myID);
    669                 myself->fThreadStatus[myID] = _THREAD_WRITE_;
    670 
    671                 //wait for the previous data to be written
    672                 while (myself->fThreadIndex != threadToWaitForBeforeWriting)
    673                     usleep(1000);
    674                 //do the actual writing to disk
    675                 pthread_mutex_lock(&(myself->fMutex));
    676                 myself->writeCompressedDataToDisk(myID, numBytes);
    677                 myself->fThreadIndex = myID;
    678                 pthread_mutex_unlock(&(myself->fMutex));
    679                 myself->fThreadStatus[myID] = _THREAD_WAIT_;
    680             }
    681             return NULL;
    682         }
    683 
    684         uint64_t compressBuffer(uint32_t threadIndex)
    685         {
    686             uint32_t thisRoundNumRows = (fThreadNumRows[threadIndex]%fNumRowsPerTile) ? fThreadNumRows[threadIndex]%fNumRowsPerTile : fNumRowsPerTile;
     688        bool CompressBuffer(const CompressionTarget& target)
     689        {
     690            //compress the buffer
     691            uint64_t compressed_size = compressBuffer(target.target.target.get()->get(), target.src.get()->get(), target.num_rows);
     692
     693            //post the result to the writing queue
     694            //get a copy so that it becomes non-const
     695            WriteTarget wt;
     696            wt.tile_num = target.target.tile_num;
     697            wt.size     = compressed_size;
     698            wt.target   = target.target.target;
     699
     700            fWriteToDiskQueue.post(wt);
     701            return true;
     702        }
     703
     704        bool WriteBufferToDisk(const WriteTarget& target)
     705        {
     706            //is this the tile we're supposed to write ?
     707            if (target.tile_num != fLatestWrittenTile+1)
     708                return false;
     709
     710            fLatestWrittenTile++;
     711
     712            //write the buffer to disk.
     713            writeCompressedDataToDisk(target.target.get()->get(), target.size);
     714
     715            return true;
     716        }
     717
     718        //src cannot be const, as applySMOOTHING is done in place
     719        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
     720        {
     721            uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
    687722            uint32_t offset=0;
    688             uint32_t currentCatalogRow = (fThreadNumRows[threadIndex]-1)/fNumRowsPerTile;
    689             uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
     723            uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
     724
     725            //skip the checksum reserved area
     726            dest += 4;
     727
     728            //skip the 'TILE' marker and tile size entry
     729            uint64_t compressedOffset = sizeof(TileHeader);
    690730
    691731            //now compress each column one by one by calling compression on arrays
     
    709749                    {
    710750                        case zfits::kFactRaw:
    711                                 compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
    712                                                                          &(fTransposedBuffer[threadIndex][offset]),
     751                                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset,
     752                                                                         src  + offset,
    713753                                                                         thisRoundNumRows,
    714754                                                                         fRealColumns[i].col.size,
     
    716756                        break;
    717757                        case zfits::kFactSmoothing:
    718                                 applySMOOTHING(&(fCompressedBuffer[threadIndex][compressedOffset]),
    719                                                &(fTransposedBuffer[threadIndex][offset]),
     758                                applySMOOTHING(dest + compressedOffset,
     759                                               src  + offset,
    720760                                               thisRoundNumRows,
    721761                                               fRealColumns[i].col.size,
     
    724764                        case zfits::kFactHuffman16:
    725765                            if (head.ordering == zfits::kOrderByCol)
    726                                 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
    727                                                                     &(fTransposedBuffer[threadIndex][offset]),
     766                                compressedOffset += compressHUFFMAN(dest + compressedOffset,
     767                                                                    src  + offset,
    728768                                                                    thisRoundNumRows,
    729769                                                                    fRealColumns[i].col.size,
    730770                                                                    fRealColumns[i].col.num);
    731771                            else
    732                                 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
    733                                                                     &(fTransposedBuffer[threadIndex][offset]),
     772                                compressedOffset += compressHUFFMAN(dest + compressedOffset,
     773                                                                    src  + offset,
    734774                                                                    fRealColumns[i].col.num,
    735775                                                                    fRealColumns[i].col.size,
     
    746786                    compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
    747787                {//if so set flag and redo it uncompressed
    748                     //cout << "REDOING UNCOMPRESSED" << endl;
     788                    cout << "REDOING UNCOMPRESSED" << endl;
    749789                    compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
    750                     compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]), &(fTransposedBuffer[threadIndex][offset]), thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
     790                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
    751791                    BlockHeader he;
    752792                    he.size = compressedOffset - previousOffset;
    753793                    he.numProcs = 1;
    754794                    he.ordering = zfits::kOrderByRow;
    755                     memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
    756                     fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
     795                    memcpy(dest + previousOffset, (char*)(&he), sizeof(BlockHeader));
     796                    dest[previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
    757797                    offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    758                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
    759                    continue;
     798                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
     799                    continue;
    760800                }
     801
    761802                head.size = compressedOffset - previousOffset;
    762                 memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
    763                 memcpy(&(fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
    764 
    765                  offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
     803                memcpy(dest + previousOffset, (char*)(&head), sizeof(BlockHeader));
     804                memcpy(dest + previousOffset+sizeof(BlockHeader), sequence.data(), sizeof(uint16_t)*sequence.size());
     805
     806                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    766807                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
    767808            }
    768809
    769             TileHeader tHead(thisRoundNumRows, compressedOffset);
    770             memcpy(fCompressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
     810            TileHeader tile_head(thisRoundNumRows, compressedOffset);
     811            memcpy(dest, &tile_head, sizeof(TileHeader));
     812
    771813            return compressedOffset;
    772814        }
    773815
    774         void copyTransposeTile(uint32_t index)
     816        void copyTransposeTile(const char* src, char* dest)//uint32_t index)
    775817        {
    776818            uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
    777819
    778820            //copy the tile and transpose it
    779             uint32_t offset = 0;
    780821            for (uint32_t i=0;i<fRealColumns.size();i++)
    781822            {
     
    785826                        for (uint32_t k=0;k<thisRoundNumRows;k++)
    786827                        {//regular, "semi-transposed" copy
    787                             memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset], fRealColumns[i].col.size*fRealColumns[i].col.num);
    788                             offset += fRealColumns[i].col.size*fRealColumns[i].col.num;
     828                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
     829                            dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
    789830                        }
    790831                    break;
     
    794835                            for (uint32_t k=0;k<thisRoundNumRows;k++)
    795836                            {//transposed copy
    796                                 memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset + fRealColumns[i].col.size*j], fRealColumns[i].col.size);
    797                                 offset += fRealColumns[i].col.size;
     837                                memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
     838                                dest += fRealColumns[i].col.size;
    798839                            }
    799840                    break;
    800841                    default:
    801842                            cout << "Error: unknown column ordering: " << fRealColumns[i].head.ordering << endl;
    802 
    803843                };
    804844            }
     
    844884        }
    845885
    846         uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    847         {
    848             uint32_t colWidth = numRowElems;
    849             for (int j=colWidth*numRows-1;j>1;j--)
    850                 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
    851             //call the huffman transposed
    852             return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
    853         }
    854 
    855886        uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    856887        {
     
    873904        uint32_t        fNumRowsPerTile;
    874905
    875 
    876906        //thread related stuff
    877         uint32_t          fNumThreads;    ///< The number of threads that will be used to compress
    878         uint32_t          fThreadIndex;   ///< A variable to assign threads indices
    879         vector<pthread_t> fThread;        ///< The thread handler of the compressor
    880         vector<uint32_t>  fThreadNumRows; ///< Total number of rows for thread to compresswww.;wwwwww
    881         vector<uint32_t>  fThreadStatus;  ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
    882         int32_t           fThreadLooper;      ///< Which thread will deal with the upcoming bunch of data ?
    883         pthread_mutex_t   fMutex;             ///< mutex for compressing threads
     907        vector<Queue<CompressionTarget>> fCompressionQueues;
     908        Queue<WriteTarget>               fWriteToDiskQueue;
     909
     910        //thread related stuff
     911        uint32_t          fNumQueues;    ///< The number of threads that will be used to compress
     912        uint32_t          fQueueLooper;
     913        int32_t           fLatestWrittenTile;
    884914
    885915        struct CatalogEntry
     
    890920        } __attribute__((__packed__));
    891921
    892        // typedef pair<int64_t, int64_t> CatalogEntry;
    893922        typedef vector<CatalogEntry>   CatalogRow;
    894923        typedef vector<CatalogRow>     CatalogType;
     
    897926        Checksum             fRawSum;
    898927        off_t                fCatalogOffset;
    899         vector<char>         fBufferVector;
     928        uint32_t             fRealRowWidth;
     929
    900930        vector<char>         fRawSumBuffer;
    901         vector<vector<char>> fTransposedBufferVector;
    902         vector<vector<char>> fCompressedBufferVector;
    903         char*                fBuffer;
    904         vector<char*>        fTransposedBuffer;
    905         vector<char*>        fCompressedBuffer;
    906         uint32_t             fRealRowWidth;
     931        MemoryManager        fMemPool;
     932        uint64_t             fMaxUsableMem;
     933
     934        shared_ptr<MemoryChunk> fSmartBuffer;
     935        char*                   fBuffer;
     936
     937
    907938        struct CompressedColumn
    908939        {
     
    917948        vector<CompressedColumn> fRealColumns;
    918949
    919         //thread states. Not all used, but they do not hurt
    920         static const uint32_t       _THREAD_WAIT_ = 0; ///< Thread doing nothing
    921         static const uint32_t   _THREAD_COMPRESS_ = 1; ///< Thread working, compressing
    922         static const uint32_t _THREAD_DECOMPRESS_ = 2; ///< Thread working, decompressing
    923         static const uint32_t      _THREAD_WRITE_ = 3; ///< Thread writing data to disk
    924         static const uint32_t       _THREAD_READ_ = 4; ///< Thread reading data from disk
    925         static const uint32_t       _THREAD_EXIT_ = 5; ///< Thread exiting
    926 
    927950};
    928951
Note: See TracChangeset for help on using the changeset viewer.