Changeset 17219

Timestamp: 10/15/13 15:05:54 (11 years ago)
Location:  trunk
Files:     2 added, 2 edited
trunk/FACT++/src/queue.h (r16562 → r17219)

     {
         size_t fSize;                   // Only necessary for before C++11
+        bool fSort;                     // Sort the list before processing
 
         std::list<T> fList;
 
         std::mutex fMutex;              // Mutex needed for the conditional
         std::condition_variable fCond;  // Conditional
 
…
             kStop,
             kAbort,
+            kTrigger
         };
 
         state_t fState;                 // Stop signal for the thread
 
-        typedef std::function<void(const T &)> callback;
+        typedef std::function<bool(const T &)> callback;
         callback fCallback;             // Callback function called by the thread
 
…
         {
             std::unique_lock<std::mutex> lock(fMutex);
+
+            // No filling allowed by default (the queue is
+            // always processed until it is empty)
+            size_t allowed = 0;
 
             while (1)
             {
-                while (fList.empty() && fState==kRun)
+                while (fSize==allowed && fState==kRun)
                     fCond.wait(lock);
 
+                // Check if the State flag has been changed
                 if (fState==kAbort)
                     break;
…
                     break;
 
-                const T &val = fList.front();
+                // If thread got just woken up, move back the state to kRun
+                if (fState == kTrigger)
+                    fState = kRun;
+
+                // Could have been a fState==kTrigger case
+                if (fList.empty())
+                    continue;
+
+                // During the unlocked state, fSize might change.
+                // The current size of the queue needs to be saved.
+                allowed = fSize;
+
+                // get the first entry from the (sorted) list
+                const auto it = fSort ? min_element(fList.begin(), fList.end()) : fList.begin();
 
                 // Theoretically, we can loose a signal here, but this is
…
                 lock.unlock();
 
-                if (fCallback)
-                    fCallback(val);
+                // If the first event in the queue could not be processed,
+                // no further processing makes sense until a new event has
+                // been posted (or the number of events in the queue has
+                // changed) [allowed>0], in the case processing was
+                // successfull [alloed==0], the next event will be processed
+                // immediately.
+                if (!fCallback || !fCallback(*it))
+                    allowed = 0;
 
                 lock.lock();
 
-                fList.pop_front();
+                // Whenever an event was successfully processed, allowed
+                // is larger than zero and thus the event will be popped
+                if (allowed==0)
+                    continue;
+
+                if (fSort)
+                    fList.erase(it);
+                else
+                    fList.pop_front();
+
                 fSize--;
+                allowed--;
             }
 
…
 
     public:
-        Queue(const callback &f) : fSize(0), fState(kIdle), fCallback(f)
-        {
-            start();
-        }
+        Queue(const callback &f, bool sort=false, bool startup=true) : fSize(0), fSort(sort), fState(kIdle), fCallback(f)
+        {
+            if (startup)
+                start();
+        }
+
+        Queue(const Queue<T>& q) : fSize(0), fSort(q.fSort), fState(kIdle), fCallback(q.fCallback)
+        {
+        }
+
+        Queue<T>& operator = (const Queue<T>& q)
+        {
+            fSize     = 0;
+            fSort     = q.fSort;
+            fState    = kIdle;
+            fCallback = q.fCallback;
+            return *this;
+        }
+
         ~Queue()
         {
…
         {
             const std::lock_guard<std::mutex> lock(fMutex);
+
             if (fState==kIdle)
                 return false;
…
             fSize++;
 
+            fCond.notify_one();
+
+            return true;
+        }
+
+        bool notify()
+        {
+            const std::lock_guard<std::mutex> lock(fMutex);
+            if (fState!=kRun)
+                return false;
+
+            fState = kTrigger;
             fCond.notify_one();
…
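For context, a minimal sketch of how the reworked Queue<T> might be used after this revision. The diff above shows the relevant semantics: the callback now returns bool (true consumes the entry, false leaves it queued until post() or notify() wakes the worker again), and with sort=true the worker picks the smallest pending entry via min_element. The Event struct and process() handler below are hypothetical, not part of the changeset, and the comment on wait() assumes the flush behaviour it has when zofits::close() calls it:

    // Hypothetical usage example; assumes queue.h from r17219 is on the
    // include path.
    #include <cstdint>
    #include <iostream>

    #include "queue.h"

    // Hypothetical payload type; operator< is what a sorted Queue uses
    // (via min_element) to pick the next entry.
    struct Event
    {
        uint32_t id;
        bool operator<(const Event &e) const { return id < e.id; }
    };

    // Since r17219 the callback returns bool: true means the entry was
    // consumed; false leaves it in the list for a later retry.
    bool process(const Event &e)
    {
        std::cout << "processing event " << e.id << std::endl;
        return true;
    }

    int main()
    {
        // sort=true: the worker always takes the smallest pending entry.
        Queue<Event> q(process, true);

        q.post(Event{2});
        q.post(Event{1}); // picked before id 2 if both are still pending

        q.wait(); // as used in zofits::close(): block until the queue drains

        return 0;
    }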
trunk/Mars/mcore/zofits.h (r17216 → r17219)

 
 #include "ofits.h"
+#include "Queue.h"
+#include "MemoryManager.h"
 
 #ifndef __MARS__
…
 
 
+        struct WriteTarget
+        {
+            bool operator < (const WriteTarget& other)
+            {
+                tile_num < other.tile_num;
+            }
+            uint32_t tile_num;
+            uint32_t size;
+            shared_ptr<MemoryChunk> target;
+        };
+
+        struct CompressionTarget
+        {
+            bool operator < (const CompressionTarget& other)
+            {
+                return target < other.target;
+            }
+            shared_ptr<MemoryChunk> src;
+            WriteTarget target;
+            uint32_t num_rows;
+        };
+
+
         //constructors
         zofits(uint32_t numTiles=1000,
-               uint32_t rowPerTile=100) : ofits()
-        {
-            InitMemberVariables(numTiles, rowPerTile);
+               uint32_t rowPerTile=100,
+               uint64_t maxUsableMem=0) : ofits(),
+               fMemPool(0, maxUsableMem),
+               fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
+        {
+            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
             SetNumWorkingThreads(1);
         }
…
         zofits(const char* fname,
                uint32_t numTiles=1000,
-               uint32_t rowPerTile=100) : ofits(fname)
-        {
-            InitMemberVariables(numTiles, rowPerTile);
+               uint32_t rowPerTile=100,
+               uint64_t maxUsableMem=0) : ofits(fname),
+               fMemPool(0, maxUsableMem),
+               fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true)
+        {
+            InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
             SetNumWorkingThreads(1);
         }
…
 
         //initialization of member variables
-        void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0)
+        void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
         {
             fCheckOffset = 0;
-            fThreadLooper = 0;
 
             fNumTiles = nt;
             fNumRowsPerTile = rpt;
 
-            fNumThreads = 1;
-            fThreadIndex = 0; ///< A variable to assign threads indices
+            fNumQueues = 0;
+            fQueueLooper = 0;
 
             fBuffer = NULL;
…
             fStartCellsOffset = -1;
             fDataOffset = -1;
+
+            fMaxUsableMem = maxUsableMem;
         }
 
…
         bool WriteTableHeader(const char* name="DATA")
         {
+            if (!reallocateBuffers())
+                throw ("While allocating memory: apparently there not as much free memory as advertized...");
+
             ofits::WriteTableHeader(name);
+
+            //start the compression queues
+            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
+                it->start();
+
+            //mark that no tile has been written so far
+            fLatestWrittenTile = -1;
 
             if (IsOffsetCalibrated())
…
 
             fRawSum.reset();
-
-            //start the compression threads
-            pthread_mutex_init(&fMutex, NULL);
-
-            fThreadIndex = 0;
-            for (uint32_t i=0;i<fNumThreads;i++)
-                pthread_create(&(fThread[i]), NULL, threadFunction, this);
-
-            //wait for all threads to have started
-            while (fNumThreads != fThreadIndex)
-                usleep(1000);
-
-            //set the writing fence to the last thread (so that the first one can start writing right away)
-            fThreadIndex = fNumThreads-1;
         }
…
 
             if (fTable.num_rows % fNumRowsPerTile == 0)
-            {//give a new tile to compress to a thread
-                while (fThreadStatus[fThreadLooper] == _THREAD_COMPRESS_)
-                    usleep(100000);
-
-                copyTransposeTile(fThreadLooper);
-
-                while (fThreadStatus[fThreadLooper] != _THREAD_WAIT_)
-                    usleep(100000);
-
-                fThreadNumRows[fThreadLooper] = fTable.num_rows;
-                fThreadStatus[fThreadLooper] = _THREAD_COMPRESS_;
-                fThreadLooper = (fThreadLooper+1)%fNumThreads;
+            {
+                CompressionTarget compress_target;
+                SetNextCompression(compress_target);
+
+                if (!fCompressionQueues[fQueueLooper].post(compress_target))
+                    throw runtime_error("I could not post this buffer. This does not make sense...");
+
+                fQueueLooper = (fQueueLooper+1)%fNumQueues;
             }
…
         }
 
+        void SetNextCompression(CompressionTarget& target)
+        {
+            shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();
+
+            copyTransposeTile(fBuffer, transposed_data.get()->get());
+
+            WriteTarget write_target;
+            write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
+            write_target.size     = 0;
+            write_target.target   = fMemPool.malloc();
+
+            target.src      = transposed_data;
+            target.target   = write_target;
+            target.num_rows = fTable.num_rows;
+        }
+
         bool close()
         {
…
                 return false;
 
-            //wait for compression threads to finish
-            for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
-                while (*it != _THREAD_WAIT_)
-                    usleep(100000);
-
-            for (auto it=fThreadStatus.begin(); it!= fThreadStatus.end(); it++)
-                *it = _THREAD_EXIT_;
-
-            for (auto it=fThread.begin(); it!= fThread.end(); it++)
-                pthread_join(*it, NULL);
-
-            pthread_mutex_destroy(&fMutex);
+            for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
+                it->wait();
+
+            fWriteToDiskQueue.wait();
 
             if (fTable.num_rows%fNumRowsPerTile != 0)
             {
-                copyTransposeTile(0);
-                fThreadNumRows[0] = fTable.num_rows;
-                uint32_t numBytes = compressBuffer(0);
-                writeCompressedDataToDisk(0, numBytes);
+                CompressionTarget compress_target;
+                SetNextCompression(compress_target);
+
+                uint64_t size_to_write = CompressBuffer(compress_target);
+
+                WriteTarget write_target;
+                write_target.size     = size_to_write;
+                write_target.target   = compress_target.target.target;
+                write_target.tile_num = compress_target.target.tile_num;
+
+                if (!WriteBufferToDisk(write_target))
+                    throw runtime_error("Something went wrong while writing the last tile...");
             }
…
             uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
             SetInt("ZHEAPPTR", heap_offset);
-//          SetInt("THEAP"  , heap_offset);
 
             const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
…
             SetStr(strKey.str(), strVal.str(), strCom.str());
 
-            return reallocateBuffers();
+            return true;
         }
 
…
             }
 
-            fNumThreads = num;
-            fThreadStatus.resize(num);
-            fThread.resize(num);
-            fThreadNumRows.resize(num);
-            for (uint32_t i=0;i<num;i++)
-            {
-                fThreadNumRows[i] = 0;
-                fThreadStatus[i] = _THREAD_WAIT_;
-            }
-
-            return reallocateBuffers();
-        }
+            if (fCompressionQueues.size() == num)
+                return true;
+
+            //cannot be const, as resize does not want it that way
+            Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false, false);
+
+            //shrink
+            if (num < fCompressionQueues.size())
+            {
+                fCompressionQueues.resize(num, queue);
+                return true;
+            }
+
+            //grow
+            fCompressionQueues.resize(num, queue);
+
+            fNumQueues = num;
+            fQueueLooper = 0;
+
+            return true;
+        }
+
 
     private:
 
-
         bool reallocateBuffers()
         {
-            fBufferVector.resize(fRealRowWidth*fNumRowsPerTile + 8); //8 more for checksuming
-            memset(fBufferVector.data(), 0, 4);
-            fBuffer = fBufferVector.data()+4;
+            size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming
+            fMemPool.setChunkSize(chunk_size);
+
+            fSmartBuffer = fMemPool.malloc();
+            fBuffer = fSmartBuffer.get()->get();
+//          memset(fBuffer, 0, 4);
+//          fBuffer += 4;
 
             fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
-
-
-            fTransposedBufferVector.resize(fNumThreads);
-            fCompressedBufferVector.resize(fNumThreads);
-            fTransposedBuffer.resize(fNumThreads);
-            fCompressedBuffer.resize(fNumThreads);
-            for (uint32_t i=0;i<fNumThreads;i++)
-            {
-                fTransposedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile);
-                fCompressedBufferVector[i].resize(fRealRowWidth*fNumRowsPerTile + fRealColumns.size() + sizeof(TileHeader) + 8);
-                memset(fCompressedBufferVector[i].data(), 0, 4);
-                TileHeader tileHeader;
-                memcpy(fCompressedBufferVector[i].data()+4, &tileHeader, sizeof(TileHeader));
-                fTransposedBuffer[i] = fTransposedBufferVector[i].data();
-                fCompressedBuffer[i] = fCompressedBufferVector[i].data()+4;
-            }
 
             //give the catalog enough space
…
         }
 
-        bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
-        {
-            char* checkSumPointer = fCompressedBuffer[threadID];
+        bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
+        {
+            char* checkSumPointer = src+4;
             int32_t extraBytes = 0;
             uint32_t sizeToChecksum = sizeToWrite;
…
                 fCheckOffset = (4 - extraBytes)%4;
             //write data to disk
-            write(fCompressedBuffer[threadID], sizeToWrite);
+            write(src+4, sizeToWrite);
+
             return good();
         }
 
-        static void* threadFunction(void* context)
-        {
-            zofits* myself = static_cast<zofits*>(context);
-
-            uint32_t myID = 0;
-            pthread_mutex_lock(&(myself->fMutex));
-            myID = myself->fThreadIndex++;
-            pthread_mutex_unlock(&(myself->fMutex));
-            uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->fNumThreads-1 : myID-1;
-
-            while (myself->fThreadStatus[myID] != _THREAD_EXIT_)
-            {
-                while (myself->fThreadStatus[myID] == _THREAD_WAIT_)
-                    usleep(100000);
-
-                if (myself->fThreadStatus[myID] != _THREAD_COMPRESS_)
-                    continue;
-                uint32_t numBytes = myself->compressBuffer(myID);
-                myself->fThreadStatus[myID] = _THREAD_WRITE_;
-
-                //wait for the previous data to be written
-                while (myself->fThreadIndex != threadToWaitForBeforeWriting)
-                    usleep(1000);
-                //do the actual writing to disk
-                pthread_mutex_lock(&(myself->fMutex));
-                myself->writeCompressedDataToDisk(myID, numBytes);
-                myself->fThreadIndex = myID;
-                pthread_mutex_unlock(&(myself->fMutex));
-                myself->fThreadStatus[myID] = _THREAD_WAIT_;
-            }
-            return NULL;
-        }
-
-        uint64_t compressBuffer(uint32_t threadIndex)
-        {
-            uint32_t thisRoundNumRows = (fThreadNumRows[threadIndex]%fNumRowsPerTile) ? fThreadNumRows[threadIndex]%fNumRowsPerTile : fNumRowsPerTile;
+        bool CompressBuffer(const CompressionTarget& target)
+        {
+            //compress the buffer
+            uint64_t compressed_size = compressBuffer(target.target.target.get()->get(), target.src.get()->get(), target.num_rows);
+
+            //post the result to the writing queue
+            //get a copy so that it becomes non-const
+            WriteTarget wt;
+            wt.tile_num = target.target.tile_num;
+            wt.size     = compressed_size;
+            wt.target   = target.target.target;
+
+            fWriteToDiskQueue.post(wt);
+            return true;
+        }
+
+        bool WriteBufferToDisk(const WriteTarget& target)
+        {
+            //is this the tile we're supposed to write ?
+            if (target.tile_num != fLatestWrittenTile+1)
+                return false;
+
+            fLatestWrittenTile++;
+
+            //write the buffer to disk.
+            writeCompressedDataToDisk(target.target.get()->get(), target.size);
+
+            return true;
+        }
+
+        //src cannot be const, as applySMOOTHING is done in place
+        uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
+        {
+            uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
             uint32_t offset=0;
-            uint32_t currentCatalogRow = (fThreadNumRows[threadIndex]-1)/fNumRowsPerTile;
-            uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
+            uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
+
+            //skip the checksum reserved area
+            dest += 4;
+
+            //skip the 'TILE' marker and tile size entry
+            uint64_t compressedOffset = sizeof(TileHeader);
 
             //now compress each column one by one by calling compression on arrays
…
                 {
                     case zfits::kFactRaw:
-                        compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]),
-                                                                 &(fTransposedBuffer[threadIndex][offset]),
+                        compressedOffset += compressUNCOMPRESSED(dest + compressedOffset,
+                                                                 src + offset,
                                                                  thisRoundNumRows,
                                                                  fRealColumns[i].col.size,
…
                         break;
                     case zfits::kFactSmoothing:
-                        applySMOOTHING(&(fCompressedBuffer[threadIndex][compressedOffset]),
-                                       &(fTransposedBuffer[threadIndex][offset]),
+                        applySMOOTHING(dest + compressedOffset,
+                                       src + offset,
                                        thisRoundNumRows,
                                        fRealColumns[i].col.size,
…
                     case zfits::kFactHuffman16:
                         if (head.ordering == zfits::kOrderByCol)
-                            compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
-                                                                &(fTransposedBuffer[threadIndex][offset]),
+                            compressedOffset += compressHUFFMAN(dest + compressedOffset,
+                                                                src + offset,
                                                                 thisRoundNumRows,
                                                                 fRealColumns[i].col.size,
                                                                 fRealColumns[i].col.num);
                         else
-                            compressedOffset += compressHUFFMAN(&(fCompressedBuffer[threadIndex][compressedOffset]),
-                                                                &(fTransposedBuffer[threadIndex][offset]),
+                            compressedOffset += compressHUFFMAN(dest + compressedOffset,
+                                                                src + offset,
                                                                 fRealColumns[i].col.num,
                                                                 fRealColumns[i].col.size,
…
                     compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
                 {//if so set flag and redo it uncompressed
-//                  cout << "REDOING UNCOMPRESSED" << endl;
+                    cout << "REDOING UNCOMPRESSED" << endl;
                     compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
-                    compressedOffset += compressUNCOMPRESSED(&(fCompressedBuffer[threadIndex][compressedOffset]), &(fTransposedBuffer[threadIndex][offset]), thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
+                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                     BlockHeader he;
                     he.size = compressedOffset - previousOffset;
                     he.numProcs = 1;
                     he.ordering = zfits::kOrderByRow;
-                    memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
-                    fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
+                    memcpy(dest + previousOffset, (char*)(&he), sizeof(BlockHeader));
+                    dest[previousOffset+sizeof(BlockHeader)] = zfits::kFactRaw;
                     offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
                     fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                     continue;
                 }
+
                 head.size = compressedOffset - previousOffset;
-                memcpy(&(fCompressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
-                memcpy(&(fCompressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
+                memcpy(dest + previousOffset, (char*)(&head), sizeof(BlockHeader));
+                memcpy(dest + previousOffset+sizeof(BlockHeader), sequence.data(), sizeof(uint16_t)*sequence.size());
 
                 offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
                 fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
             }
 
-            TileHeader tHead(thisRoundNumRows, compressedOffset);
-            memcpy(fCompressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
+            TileHeader tile_head(thisRoundNumRows, compressedOffset);
+            memcpy(dest, &tile_head, sizeof(TileHeader));
+
             return compressedOffset;
         }
 
-        void copyTransposeTile(uint32_t index)
+        void copyTransposeTile(const char* src, char* dest)//uint32_t index)
         {
             uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;
 
             //copy the tile and transpose it
-            uint32_t offset = 0;
             for (uint32_t i=0;i<fRealColumns.size();i++)
             {
…
                         for (uint32_t k=0;k<thisRoundNumRows;k++)
                         {//regular, "semi-transposed" copy
-                            memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset], fRealColumns[i].col.size*fRealColumns[i].col.num);
-                            offset += fRealColumns[i].col.size*fRealColumns[i].col.num;
+                            memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
+                            dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                         }
                         break;
…
                             for (uint32_t k=0;k<thisRoundNumRows;k++)
                             {//transposed copy
-                                memcpy(&(fTransposedBuffer[index][offset]), &fBuffer[k*fRealRowWidth + fRealColumns[i].col.offset + fRealColumns[i].col.size*j], fRealColumns[i].col.size);
-                                offset += fRealColumns[i].col.size;
+                                memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
+                                dest += fRealColumns[i].col.size;
                             }
                             break;
                     default:
                         cout << "Error: unknown column ordering: " << fRealColumns[i].head.ordering << endl;
-
                 };
             }
…
         }
 
-        uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
-        {
-            uint32_t colWidth = numRowElems;
-            for (int j=colWidth*numRows-1;j>1;j--)
-                reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
-            //call the huffman transposed
-            return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
-        }
-
         uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
         {
…
         uint32_t fNumRowsPerTile;
 
-
         //thread related stuff
-        uint32_t fNumThreads;            ///< The number of threads that will be used to compress
-        uint32_t fThreadIndex;           ///< A variable to assign threads indices
-        vector<pthread_t> fThread;       ///< The thread handler of the compressor
-        vector<uint32_t> fThreadNumRows; ///< Total number of rows for thread to compress
-        vector<uint32_t> fThreadStatus;  ///< Flag telling whether the buffer to be transposed (and compressed) is full or empty
-        int32_t fThreadLooper;           ///< Which thread will deal with the upcoming bunch of data ?
-        pthread_mutex_t fMutex;          ///< mutex for compressing threads
+        vector<Queue<CompressionTarget>> fCompressionQueues;
+        Queue<WriteTarget> fWriteToDiskQueue;
+
+        //thread related stuff
+        uint32_t fNumQueues;             ///< The number of threads that will be used to compress
+        uint32_t fQueueLooper;
+        int32_t fLatestWrittenTile;
 
         struct CatalogEntry
…
         } __attribute__((__packed__));
 
-//      typedef pair<int64_t, int64_t> CatalogEntry;
         typedef vector<CatalogEntry> CatalogRow;
         typedef vector<CatalogRow> CatalogType;
…
         Checksum fRawSum;
         off_t fCatalogOffset;
-        vector<char> fBufferVector;
+        uint32_t fRealRowWidth;
+
         vector<char> fRawSumBuffer;
-        vector<vector<char>> fTransposedBufferVector;
-        vector<vector<char>> fCompressedBufferVector;
-        char* fBuffer;
-        vector<char*> fTransposedBuffer;
-        vector<char*> fCompressedBuffer;
-        uint32_t fRealRowWidth;
+        MemoryManager fMemPool;
+        uint64_t fMaxUsableMem;
+
+        shared_ptr<MemoryChunk> fSmartBuffer;
+        char* fBuffer;
+
+
         struct CompressedColumn
…
         vector<CompressedColumn> fRealColumns;
 
-        //thread states. Not all used, but they do not hurt
-        static const uint32_t _THREAD_WAIT_       = 0; ///< Thread doing nothing
-        static const uint32_t _THREAD_COMPRESS_   = 1; ///< Thread working, compressing
-        static const uint32_t _THREAD_DECOMPRESS_ = 2; ///< Thread working, decompressing
-        static const uint32_t _THREAD_WRITE_      = 3; ///< Thread writing data to disk
-        static const uint32_t _THREAD_READ_       = 4; ///< Thread reading data from disk
-        static const uint32_t _THREAD_EXIT_       = 5; ///< Thread exiting
-
     };
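The ordering trick that replaces the old writing fence is worth spelling out: fWriteToDiskQueue is constructed as a sorted Queue, so the pending WriteTarget with the smallest tile_num is always tried first, and WriteBufferToDisk() returns false for any tile that is not the direct successor of fLatestWrittenTile, which under the new bool-callback semantics simply parks it in the queue until its predecessor has been written. Note that the committed WriteTarget::operator< is missing its return statement, so its result is unspecified. Below is a sketch of the presumably intended comparison together with a stripped-down, standalone version of the ordering gate; the MemoryChunk payload is omitted and OrderedTileWriter is a hypothetical name, not part of the changeset:

    #include <cstdint>

    // Sketch only: the shared_ptr<MemoryChunk> payload is omitted.
    struct WriteTarget
    {
        uint32_t tile_num; // index of the compressed tile
        uint32_t size;     // number of bytes to write

        // r17219 commits "tile_num < other.tile_num;" without "return";
        // presumably intended as:
        bool operator<(const WriteTarget &other) const
        {
            return tile_num < other.tile_num;
        }
    };

    // Stripped-down ordering gate, mirroring zofits::WriteBufferToDisk():
    // compression workers finish in arbitrary order, but only the tile
    // following the last written one is accepted; any other tile stays in
    // the sorted write queue (the callback returned false) until its turn.
    struct OrderedTileWriter
    {
        int32_t fLatestWrittenTile = -1;

        bool WriteBufferToDisk(const WriteTarget &target)
        {
            if (target.tile_num != static_cast<uint32_t>(fLatestWrittenTile+1))
                return false; // out of order: leave it queued

            fLatestWrittenTile++;
            // ... the real code calls writeCompressedDataToDisk() here ...
            return true;
        }
    };

    int main()
    {
        OrderedTileWriter w;
        const WriteTarget t1{1, 0}, t0{0, 0};
        const bool first  = w.WriteBufferToDisk(t1); // false: tile 0 not written yet
        const bool second = w.WriteBufferToDisk(t0); // true:  tile 0 is next in line
        return (first || !second) ? 1 : 0;
    }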