Changeset 17219


Ignore:
Timestamp:
10/15/13 15:05:54 (11 years ago)
Author:
lyard
Message:
draft zofits working with Queue and MemoryManager
Location:
trunk
Files:
2 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/FACT++/src/queue.h

    r16562 r17219  
    1010{
    1111    size_t fSize;                 // Only necessary for before C++11
     12    bool   fSort;                 // Sort the list before processing
    1213
    1314    std::list<T> fList;
    1415
    15     std::mutex fMutex;        // Mutex needed for the conditional
     16    std::mutex fMutex;             // Mutex needed for the conditional
    1617    std::condition_variable fCond; // Conditional
    1718
     
    2223        kStop,
    2324        kAbort,
     25        kTrigger
    2426    };
    2527
    2628    state_t fState;               // Stop signal for the thread
    2729
    28     typedef std::function<void(const T &)> callback;
     30    typedef std::function<bool(const T &)> callback;
    2931    callback fCallback;       // Callback function called by the thread
    3032
     
    3436    {
    3537        std::unique_lock<std::mutex> lock(fMutex);
     38
     39        // No filling allowed by default (the queue is
     40        // always processed until it is empty)
     41        size_t allowed = 0;
    3642
    3743        while (1)
    3844        {
    39             while (fList.empty() && fState==kRun)
     45            while (fSize==allowed && fState==kRun)
    4046                fCond.wait(lock);
    4147
     48            // Check if the State flag has been changed
    4249            if (fState==kAbort)
    4350                break;
     
    4653                break;
    4754
    48             const T &val = fList.front();
     55            // If thread got just woken up, move back the state to kRun
     56            if (fState == kTrigger)
     57                fState = kRun;
     58
     59            // Could have been a fState==kTrigger case
     60            if (fList.empty())
     61                continue;
     62
     63            // During the unlocked state, fSize might change.
     64            // The current size of the queue needs to be saved.
     65            allowed = fSize;
     66
     67            // get the first entry from the (sorted) list
     68            const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin();
    4969
    5070            // Theoretically, we can loose a signal here, but this is
     
    5272            lock.unlock();
    5373
    54             if (fCallback)
    55                 fCallback(val);
     74            // If the first event in the queue could not be processed,
     75            // no further processing makes sense until a new event has
     76            // been posted (or the number of events in the queue has
     77            // changed)  [allowed>0], in the case processing was
     78            // successfull [alloed==0], the next event will be processed
     79            // immediately.
     80            if (!fCallback || !fCallback(*it))
     81                allowed = 0;
    5682
    5783            lock.lock();
    5884
    59             fList.pop_front();
     85            // Whenever an event was successfully processed, allowed
     86            // is larger than zero and thus the event will be popped
     87            if (allowed==0)
     88                continue;
     89
     90            if (fSort)
     91                fList.erase(it);
     92            else
     93                fList.pop_front() ;
     94
    6095            fSize--;
     96            allowed--;
     97
    6198        }
    6299
     
    68105
    69106public:
    70     Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f)
    71     {
    72         start();
    73     }
     107    Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)
     108    {
     109        if (startup)
     110            start();
     111    }
     112
     113    Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)
     114    {
     115    }
     116
     117    Queue<T>& operator = (const Queue<T>& q)
     118    {
     119        fSize     = 0;
     120        fSort     = q.fSort;
     121        fState    = kIdle;
     122        fCallback = q.fCallback;
     123        return *this;
     124    }
     125
    74126    ~Queue()
    75127    {
     
    133185    {
    134186        const std::lock_guard<std::mutex> lock(fMutex);
     187
    135188        if (fState==kIdle)
    136189            return false;
     
    139192        fSize++;
    140193
     194        fCond.notify_one();
     195
     196        return true;
     197    }
     198
     199    bool notify()
     200    {
     201        const std::lock_guard<std::mutex> lock(fMutex);
     202        if (fState!=kRun)
     203            return false;
     204
     205        fState = kTrigger;
    141206        fCond.notify_one();
    142207
  • trunk/Mars/mcore/zofits.h

    r17216 r17219  
    77
    88#include "ofits.h"
     9#include "Queue.h"
     10#include "MemoryManager.h"
    911
    1012#ifndef __MARS__
     
    6466
    6567
     68        struct WriteTarget
     69        {
     70            bool operator < (const WriteTarget& other)
     71            {
     72                tile_num < other.tile_num;
     73            }
     74            uint32_t tile_num;
     75            uint32_t size;
     76            shared_ptr<MemoryChunk> target;
     77        };
     78
     79        struct CompressionTarget
     80        {
     81            bool operator < (const CompressionTarget& other)
     82            {
     83                return target < other.target;
     84            }
     85            shared_ptr<MemoryChunk> src;
     86            WriteTarget             target;
     87            uint32_t                num_rows;
     88        };
     89
     90
    6691        //constructors
    6792        zofits(uint32_t numTiles=1000,
    68                uint32_t rowPerTile=100) : ofits()
    69         {
    70             InitMemberVariables(numTiles, rowPerTile);
     93               uint32_t rowPerTile=100,
     94               uint64_t maxUsableMem=0) : ofits(),
     95                                          fMemPool(0, maxUsableMem),
     96                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     97        {
     98            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    7199            SetNumWorkingThreads(1);
    72100        }
     
    74102        zofits(const char* fname,
    75103               uint32_t numTiles=1000,
    76                uint32_t rowPerTile=100) : ofits(fname)
    77         {
    78             InitMemberVariables(numTiles, rowPerTile);
     104               uint32_t rowPerTile=100,
     105               uint64_t maxUsableMem=0) : ofits(fname),
     106                                          fMemPool(0, maxUsableMem),
     107                                          fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
     108        {
     109            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
    79110            SetNumWorkingThreads(1);
    80111        }
     
    85116
    86117        //initialization of member variables
    87         void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0)
     118        void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
    88119        {
    89120            fCheckOffset  = 0;
    90             fThreadLooper = 0;
    91121
    92122            fNumTiles       = nt;
    93123            fNumRowsPerTile = rpt;
    94124
    95             fNumThreads  = 1;
    96             fThreadIndex = 0;   ///< A variable to assign threads indices
     125            fNumQueues   = 0;
     126            fQueueLooper = 0;
    97127
    98128            fBuffer       = NULL;
     
    102132            fStartCellsOffset = -1;
    103133            fDataOffset       = -1;
     134
     135            fMaxUsableMem = maxUsableMem;
    104136        }
    105137
     
    144176        bool WriteTableHeader(const char* name="DATA")
    145177        {
     178            if (!reallocateBuffers())
     179                throw ("While allocating memory: apparently there not as much free memory as advertized...");
     180
    146181            ofits::WriteTableHeader(name);
     182
     183            //start the compression queues
     184            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
     185                it->start();
     186
     187            //mark that no tile has been written so far
     188            fLatestWrittenTile = -1;
    147189
    148190            if (IsOffsetCalibrated())
     
    208250
    209251            fRawSum.reset();
    210 
    211             //start the compression threads
    212             pthread_mutex_init(&fMutex, NULL);
    213 
    214             fThreadIndex = 0;
    215             for (uint32_t i=0;i<fNumThreads;i++)
    216                 pthread_create(&(fThread[i]), NULL, threadFunction, this);
    217 
    218             //wait for all threads to have started
    219             while (fNumThreads != fThreadIndex)
    220                 usleep(1000);
    221 
    222             //set the writing fence to the last thread (so that the first one can start writing right away)
    223             fThreadIndex = fNumThreads-1;
    224252        }
    225253
     
    391419
    392420            if (fTable.num_rows % fNumRowsPerTile == 0)
    393             {//give a new tile to compress to a thread
    394                 while (fThreadStatus[fThreadLooper] == _THREAD_COMPRESS_)
    395                     usleep(100000);
    396 
    397                 copyTransposeTile(fThreadLooper);
    398 
    399                 while (fThreadStatus[fThreadLooper] != _THREAD_WAIT_)
    400                     usleep(100000);
    401 
    402                 fThreadNumRows[fThreadLooper] = fTable.num_rows;
    403                 fThreadStatus[fThreadLooper] = _THREAD_COMPRESS_;
    404                 fThreadLooper = (fThreadLooper+1)%fNumThreads;
     421            {
     422                CompressionTarget compress_target;
     423                SetNextCompression(compress_target);
     424
     425                if (!fCompressionQueues[fQueueLooper].post(compress_target))
     426                    throw runtime_error("I could not post this buffer. This does not make sense...");
     427
     428                fQueueLooper = (fQueueLooper+1)%fNumQueues;
    405429            }
    406430
     
    415439        }
    416440
     441        void SetNextCompression(CompressionTarget& target)
     442        {
     443            shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();
     444
     445            copyTransposeTile(fBuffer, transposed_data.get()->get());
     446
     447            WriteTarget write_target;
     448            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
     449            write_target.size     = 0;
     450            write_target.target   = fMemPool.malloc();
     451
     452            target.src      = transposed_data;
     453            target.target   = write_target;
     454            target.num_rows = fTable.num_rows;
     455        }
     456
    417457        bool close()
    418458        {
     
    420460                return false;
    421461
    422             //wait for compression threads to finish
    423             for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
    424                 while (*it != _THREAD_WAIT_)
    425                     usleep(100000);
    426 
    427             for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
    428                 *it = _THREAD_EXIT_;
    429 
    430             for (auto it=fThread.begin(); it!= fThread.end(); it++)
    431                 pthread_join(*it, NULL);
    432 
    433             pthread_mutex_destroy(&fMutex);
     462            for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
     463                it->wait();
     464
     465            fWriteToDiskQueue.wait();
    434466
    435467            if (fTable.num_rows%fNumRowsPerTile != 0)
    436468            {
    437                 copyTransposeTile(0);
    438                 fThreadNumRows[0] = fTable.num_rows;
    439                 uint32_t numBytes = compressBuffer(0);
    440                 writeCompressedDataToDisk(0, numBytes);
     469                CompressionTarget compress_target;
     470                SetNextCompression(compress_target);
     471
     472                uint64_t size_to_write = CompressBuffer(compress_target);
     473
     474                WriteTarget write_target;
     475                write_target.size     = size_to_write;
     476                write_target.target   = compress_target.target.target;
     477                write_target.tile_num = compress_target.target.tile_num;
     478
     479                if (!WriteBufferToDisk(write_target))
     480                    throw runtime_error("Something went wrong while writing the last tile...");
    441481            }
    442482
     
    449489            uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
    450490            SetInt("ZHEAPPTR", heap_offset);
    451 //            SetInt("THEAP"   , heap_offset);0
    452491
    453492            const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
     
    547586            SetStr(strKey.str(), strVal.str(), strCom.str());
    548587
    549             return reallocateBuffers();
     588            return true;
    550589        }
    551590
     
    571610            }
    572611
    573             fNumThreads = num;
    574             fThreadStatus.resize(num);
    575             fThread.resize(num);
    576             fThreadNumRows.resize(num);
    577             for (uint32_t i=0;i<num;i++)
    578             {
    579                 fThreadNumRows[i] = 0;
    580                 fThreadStatus[i] = _THREAD_WAIT_;
    581             }
    582 
    583             return reallocateBuffers();
    584         }
     612            if (fCompressionQueues.size() == num)
     613                return true;
     614
     615            //cannot be const, as resize does not want it that way
     616            Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false, false);
     617
     618            //shrink
     619            if (num < fCompressionQueues.size())
     620            {
     621                fCompressionQueues.resize(num, queue);
     622                return true;
     623            }
     624
     625            //grow
     626            fCompressionQueues.resize(num, queue);
     627
     628            fNumQueues   = num;
     629            fQueueLooper = 0;
     630
     631            return true;
     632        }
     633
    585634
    586635    private:
    587636
    588 
    589637        bool reallocateBuffers()
    590638        {
    591             fBufferVector.resize(fRealRowWidth*fNumRowsPerTile + 8); //8 more for checksuming
    592             memset(fBufferVector.data(), 0, 4);
    593             fBuffer = fBufferVector.data()+4;
     639            size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
     640            fMemPool.setChunkSize(chunk_size);
     641
     642            fSmartBuffer = fMemPool.malloc();
     643            fBuffer = fSmartBuffer.get()->get();
     644//            memset(fBuffer, 0, 4);
     645//            fBuffer += 4;
    594646
    595647            fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    596 
    597 
    598             fTransposedBufferVector.resize(fNumThreads);
    599             fCompressedBufferVector.resize(fNumThreads);
    600             fTransposedBuffer.resize(fNumThreads);
    601             fCompressedBuffer.resize(fNumThreads);
    602             for (uint32_t i=0;i<fNumThreads;i++)
    603             {
    604                 fTransposedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile);
    605                 fCompressedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile + fRealColumns.size() + sizeof(TileHeader) + 8);
    606                 memset(fCompressedBufferVector[i].data(), 0, 4);
    607                 TileHeader tileHeader;
    608                 memcpy(fCompressedBufferVector[i].data()+4, &tileHeader, sizeof(TileHeader));
    609                 fTransposedBuffer[i] = fTransposedBufferVector[i].data();
    610                 fCompressedBuffer[i] = fCompressedBufferVector[i].data()+4;
    611             }
    612648
    613649            //give the catalog enough space
     
    622658        }
    623659
    624         bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
    625         {
    626             char* checkSumPointer = fCompressedBuffer[threadID];
     660        bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
     661        {
     662            char* checkSumPointer = src+4;
    627663            int32_t extraBytes = 0;
    628664            uint32_t sizeToChecksum = sizeToWrite;
     
    645681            fCheckOffset = (4 - extraBytes)%4;
    646682            //write data to disk
    647             write(fCompressedBuffer[threadID], sizeToWrite);
     683            write(src+4, sizeToWrite);
     684
    648685            return good();
    649686        }
    650687
    651         static void* threadFunction(void* context)
    652         {
    653             zofits* myself = static_cast<zofits*>(context);
    654 
    655             uint32_t myID = 0;
    656             pthread_mutex_lock(&(myself->fMutex));
    657             myID = myself->fThreadIndex++;
    658             pthread_mutex_unlock(&(myself->fMutex));
    659             uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->fNumThreads-1 : myID-1;
    660 
    661             while (myself->fThreadStatus[myID] != _THREAD_EXIT_)
    662             {
    663                 while (myself->fThreadStatus[myID] == _THREAD_WAIT_)
    664                     usleep(100000);
    665 
    666                 if (myself->fThreadStatus[myID] != _THREAD_COMPRESS_)
    667                     continue;
    668                 uint32_t numBytes = myself->compressBuffer(myID);
    669                 myself->fThreadStatus[myID] = _THREAD_WRITE_;
    670 
    671                 //wait for the previous data to be written
    672                 while (myself->fThreadIndex != threadToWaitForBeforeWriting)
    673                     usleep(1000);
    674                 //do the actual writing to disk
    675                 pthread_mutex_lock(&(myself->fMutex));
    676                 myself->writeCompressedDataToDisk(myID, numBytes);
    677                 myself->fThreadIndex = myID;
    678                 pthread_mutex_unlock(&(myself->fMutex));
    679                 myself->fThreadStatus[myID] = _THREAD_WAIT_;
    680             }
    681             return NULL;
    682         }
    683 
    684         uint64_t compressBuffer(uint32_t threadIndex)
    685         {
    686             uint32_t thisRoundNumRows = (fThreadNumRows[threadIndex]%fNumRowsPerTile) ? fThreadNumRows[threadIndex]%fNumRowsPerTile : fNumRowsPerTile;
     688        bool CompressBuffer(const CompressionTarget& target)
     689        {
     690            //compress the buffer
     691            uint64_t compressed_size = compressBuffer(target.target.target.get()->get(), target.src.get()->get(), target.num_rows);
     692
     693            //post the result to the writing queue
     694            //get a copy so that it becomes non-const
     695            WriteTarget wt;
     696            wt.tile_num = target.target.tile_num;
     697            wt.size     = compressed_size;
     698            wt.target   = target.target.target;
     699
     700            fWriteToDiskQueue.post(wt);
     701            return true;
     702        }
     703
     704        bool WriteBufferToDisk(const WriteTarget& target)
     705        {
     706            //is this the tile we're supposed to write ?
     707            if (target.tile_num != fLatestWrittenTile+1)
     708                return false;
     709
     710            fLatestWrittenTile++;
     711
     712            //write the buffer to disk.
     713            writeCompressedDataToDisk(target.target.get()->get(), target.size);
     714
     715            return true;
     716        }
     717
     718        //src cannot be const, as applySMOOTHING is done in place
     719        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
     720        {
     721            uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
    687722            uint32_t offset=0;
    688             uint32_t currentCatalogRow = (fThreadNumRows[threadIndex]-1)/fNumRowsPerTile;
    689             uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
     723            uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
     724
     725            //skip the checksum reserved area
     726            dest += 4;
     727
     728            //skip the 'TILE' marker and tile size entry
     729            uint64_t compressedOffset = sizeof(TileHeader);
    690730
    691731            //now compress each column one by one by calling compression on arrays
     
    709749                    {
    710750                        case zfits::kFactRaw:
    711                                 compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
    712                                                                          &(fTransposedBuffer[threadIndex][offset]),
     751                                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset,
     752                                                                         src  + offset,
    713753                                                                         thisRoundNumRows,
    714754                                                                         fRealColumns[i].col.size,
     
    716756                        break;
    717757                        case zfits::kFactSmoothing:
    718                                 applySMOOTHING(&(fCompressedBuffer[threadIndex][compressedOffset]),
    719                                                &(fTransposedBuffer[threadIndex][offset]),
     758                                applySMOOTHING(dest + compressedOffset,
     759                                               src  + offset,
    720760                                               thisRoundNumRows,
    721761                                               fRealColumns[i].col.size,
     
    724764                        case zfits::kFactHuffman16:
    725765                            if (head.ordering == zfits::kOrderByCol)
    726                                 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
    727                                                                     &(fTransposedBuffer[threadIndex][offset]),
     766                                compressedOffset += compressHUFFMAN(dest + compressedOffset,
     767                                                                    src  + offset,
    728768                                                                    thisRoundNumRows,
    729769                                                                    fRealColumns[i].col.size,
    730770                                                                    fRealColumns[i].col.num);
    731771                            else
    732                                 compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
    733                                                                     &(fTransposedBuffer[threadIndex][offset]),
     772                                compressedOffset += compressHUFFMAN(dest + compressedOffset,
     773                                                                    src  + offset,
    734774                                                                    fRealColumns[i].col.num,
    735775                                                                    fRealColumns[i].col.size,
     
    746786                    compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
    747787                {//if so set flag and redo it uncompressed
    748                     //cout << "REDOING UNCOMPRESSED" << endl;
     788                    cout << "REDOING UNCOMPRESSED" << endl;
    749789                    compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
    750                     compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]), &(fTransposedBuffer[threadIndex][offset]), thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
     790                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
    751791                    BlockHeader he;
    752792                    he.size = compressedOffset - previousOffset;
    753793                    he.numProcs = 1;
    754794                    he.ordering = zfits::kOrderByRow;
    755                     memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
    756                     fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
     795                    memcpy(dest + previousOffset, (char*)(&he), sizeof(BlockHeader));
     796                    dest[previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
    757797                    offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    758                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
    759                    continue;
     798                    fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
     799                    continue;
    760800                }
     801
    761802                head.size = compressedOffset - previousOffset;
    762                 memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
    763                 memcpy(&(fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
    764 
    765                  offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
     803                memcpy(dest + previousOffset, (char*)(&head), sizeof(BlockHeader));
     804                memcpy(dest + previousOffset+sizeof(BlockHeader), sequence.data(), sizeof(uint16_t)*sequence.size());
     805
     806                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
    766807                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
    767808            }
    768809
    769             TileHeader tHead(thisRoundNumRows, compressedOffset);
    770             memcpy(fCompressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
     810            TileHeader tile_head(thisRoundNumRows, compressedOffset);
     811            memcpy(dest, &tile_head, sizeof(TileHeader));
     812
    771813            return compressedOffset;
    772814        }
    773815
    774         void copyTransposeTile(uint32_t index)
     816        void copyTransposeTile(const char* src, char* dest)//uint32_t index)
    775817        {
    776818            uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
    777819
    778820            //copy the tile and transpose it
    779             uint32_t offset = 0;
    780821            for (uint32_t i=0;i<fRealColumns.size();i++)
    781822            {
     
    785826                        for (uint32_t k=0;k<thisRoundNumRows;k++)
    786827                        {//regular, "semi-transposed" copy
    787                             memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset], fRealColumns[i].col.size*fRealColumns[i].col.num);
    788                             offset += fRealColumns[i].col.size*fRealColumns[i].col.num;
     828                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
     829                            dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
    789830                        }
    790831                    break;
     
    794835                            for (uint32_t k=0;k<thisRoundNumRows;k++)
    795836                            {//transposed copy
    796                                 memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset + fRealColumns[i].col.size*j], fRealColumns[i].col.size);
    797                                 offset += fRealColumns[i].col.size;
     837                                memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
     838                                dest += fRealColumns[i].col.size;
    798839                            }
    799840                    break;
    800841                    default:
    801842                            cout << "Error: unknown column ordering: " << fRealColumns[i].head.ordering << endl;
    802 
    803843                };
    804844            }
     
    844884        }
    845885
    846         uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    847         {
    848             uint32_t colWidth = numRowElems;
    849             for (int j=colWidth*numRows-1;j>1;j--)
    850                 reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
    851             //call the huffman transposed
    852             return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
    853         }
    854 
    855886        uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    856887        {
     
    873904        uint32_t        fNumRowsPerTile;
    874905
    875 
    876906        //thread related stuff
    877         uint32_t          fNumThreads;    ///< The number of threads that will be used to compress
    878         uint32_t          fThreadIndex;   ///< A variable to assign threads indices
    879         vector<pthread_t> fThread;        ///< The thread handler of the compressor
    880         vector<uint32_t>  fThreadNumRows; ///< Total number of rows for thread to compresswww.;wwwwww
    881         vector<uint32_t>  fThreadStatus;  ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
    882         int32_t           fThreadLooper;      ///< Which thread will deal with the upcoming bunch of data ?
    883         pthread_mutex_t   fMutex;             ///< mutex for compressing threads
     907        vector<Queue<CompressionTarget>> fCompressionQueues;
     908        Queue<WriteTarget>               fWriteToDiskQueue;
     909
     910        //thread related stuff
     911        uint32_t          fNumQueues;    ///< The number of threads that will be used to compress
     912        uint32_t          fQueueLooper;
     913        int32_t           fLatestWrittenTile;
    884914
    885915        struct CatalogEntry
     
    890920        } __attribute__((__packed__));
    891921
    892        // typedef pair<int64_t, int64_t> CatalogEntry;
    893922        typedef vector<CatalogEntry>   CatalogRow;
    894923        typedef vector<CatalogRow>     CatalogType;
     
    897926        Checksum             fRawSum;
    898927        off_t                fCatalogOffset;
    899         vector<char>         fBufferVector;
     928        uint32_t             fRealRowWidth;
     929
    900930        vector<char>         fRawSumBuffer;
    901         vector<vector<char>> fTransposedBufferVector;
    902         vector<vector<char>> fCompressedBufferVector;
    903         char*                fBuffer;
    904         vector<char*>        fTransposedBuffer;
    905         vector<char*>        fCompressedBuffer;
    906         uint32_t             fRealRowWidth;
     931        MemoryManager        fMemPool;
     932        uint64_t             fMaxUsableMem;
     933
     934        shared_ptr<MemoryChunk> fSmartBuffer;
     935        char*                   fBuffer;
     936
     937
    907938        struct CompressedColumn
    908939        {
     
    917948        vector<CompressedColumn> fRealColumns;
    918949
    919         //thread states. Not all used, but they do not hurt
    920         static const uint32_t       _THREAD_WAIT_ = 0; ///< Thread doing nothing
    921         static const uint32_t   _THREAD_COMPRESS_ = 1; ///< Thread working, compressing
    922         static const uint32_t _THREAD_DECOMPRESS_ = 2; ///< Thread working, decompressing
    923         static const uint32_t      _THREAD_WRITE_ = 3; ///< Thread writing data to disk
    924         static const uint32_t       _THREAD_READ_ = 4; ///< Thread reading data from disk
    925         static const uint32_t       _THREAD_EXIT_ = 5; ///< Thread exiting
    926 
    927950};
    928951
Note: See TracChangeset for help on using the changeset viewer.