Changeset 17258 for trunk/Mars
- Timestamp:
- 10/18/13 16:10:03 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Mars/mcore/zofits.h
r17254 r17258 58 58 } 59 59 60 WriteTarget() { } 61 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { } 62 60 63 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order) 61 64 uint32_t size; ///< Size to write 62 shared_ptr<char> data; ///< 65 shared_ptr<char> data; ///< Memory block to write 63 66 }; 64 67 … … 119 122 void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0) 120 123 { 121 if (nt == 0)122 throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");123 124 124 fCheckOffset = 0; 125 125 fNumQueues = 0; 126 126 127 fNumTiles = nt ;127 fNumTiles = nt==0 ? 1 : nt; 128 128 fNumRowsPerTile = rpt; 129 129 … … 136 136 fThreadsException = exception_ptr(); 137 137 #endif 138 fErrno = 0; 138 139 } 139 140 … … 206 207 { 207 208 const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t); 208 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size; //fCatalog.size()*one_catalog_row_size;209 const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size; 209 210 210 211 // swap the catalog bytes before writing 211 212 vector<char> swapped_catalog(total_catalog_size); 213 212 214 uint32_t shift = 0; 213 for (auto it=fCatalog. begin(); it!=fCatalog.end(); it++)215 for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++) 214 216 { 215 217 revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2); … … 259 261 return fCatalog.back(); 260 262 } 263 261 264 /// write one row of data 265 /// Note, in a multi-threaded environment (NumThreads>0), the return code should be checked rather 266 /// than the badbit() of the stream (it might have been set by a thread before the errno has been set) 267 /// errno will then contain the correct error number of the last error which happened during writing. 262 268 /// @param ptr the source buffer 263 269 /// @param the number of bytes to write … … 275 281 } 276 282 283 #ifdef __EXCEPTIONS 284 //check if something hapenned while the compression threads were working 285 //if so, re-throw the exception that was generated 286 if (fThreadsException != exception_ptr()) 287 rethrow_exception(fThreadsException); 288 #endif 277 289 //copy current row to pool or rows waiting for compression 278 290 char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile); … … 305 317 DrsOffsetCalibrate(target_location); 306 318 307 if (fTable.num_rows % fNumRowsPerTile == 0) 308 { 309 CompressionTarget compress_target(AddOneCatalogRow()); 310 SetNextCompression(compress_target); 311 312 if (fNumQueues == 0) 313 { //no worker threads. do everything in-line 314 const uint64_t size_to_write = CompressBuffer(compress_target); 315 316 WriteTarget write_target; 317 write_target.size = size_to_write; 318 write_target.data = compress_target.target.data; 319 write_target.tile_num = compress_target.target.tile_num; 320 321 return WriteBufferToDisk(write_target); 319 if (fTable.num_rows % fNumRowsPerTile != 0) 320 { 321 errno = fErrno; 322 return errno==0; 323 } 324 325 CompressionTarget compress_target(AddOneCatalogRow()); 326 SetNextCompression(compress_target); 327 328 //no worker threads. do everything in-line 329 if (fNumQueues == 0) 330 { 331 const uint64_t size_to_write = CompressBuffer(compress_target); 332 333 const WriteTarget write_target(compress_target.target, size_to_write); 334 if (!WriteBufferToDisk(write_target)) 335 throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread."); 336 337 // The correct 'errno' is set, because it is the main thread. 338 return good(); 339 } 340 341 //if all queues are empty, use queue 0 342 uint32_t min_index = 0; 343 uint32_t min_size = numeric_limits<uint32_t>::max(); 344 uint32_t current_index = 0; 345 346 for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++) 347 { 348 if (it->size() < min_size) 349 { 350 min_index = current_index; 351 min_size = it->size(); 322 352 } 323 else 324 { 325 //if all queues are empty, use queue 0 326 uint32_t min_index = 0; 327 uint32_t min_size = numeric_limits<uint32_t>::max(); 328 uint32_t current_index = 0; 329 330 for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++) 331 { 332 if (it->size() < min_size) 333 { 334 min_index = current_index; 335 min_size = it->size(); 336 } 337 current_index++; 338 } 339 340 if (!fCompressionQueues[min_index].post(compress_target)) 341 throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?"); 342 } 343 } 344 345 return good(); 353 current_index++; 354 } 355 356 if (!fCompressionQueues[min_index].emplace(compress_target)) 357 throw runtime_error("The compression queues are not started. Did you close the file before writing this row?"); 358 359 errno = fErrno; 360 return errno==0; 346 361 } 347 362 … … 358 373 void SetNextCompression(CompressionTarget& target) 359 374 { 360 //get space for transposed data 361 shared_ptr<char> transposed_data = fMemPool.malloc(); 375 //fill up compression target 376 target.src = fSmartBuffer; 377 target.transposed_src = fMemPool.malloc(); 378 target.num_rows = fTable.num_rows; 362 379 363 380 //fill up write to disk target 364 WriteTarget write_target;381 WriteTarget &write_target = target.target; 365 382 write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile; 366 383 write_target.size = 0; 367 384 write_target.data = fMemPool.malloc(); 368 369 //fill up compression target370 target.src = fSmartBuffer;371 target.transposed_src = transposed_data;372 target.target = write_target;373 target.num_rows = fTable.num_rows;374 385 375 386 //get a new buffer to host the incoming data … … 434 445 435 446 if (tellp() < 0) 436 {437 #ifdef __EXCEPTIONS438 throw runtime_error("Looks like the file has been closed already");439 #else440 447 return false; 441 #endif442 }443 448 444 449 #ifdef __EXCEPTIONS 445 450 //check if something hapenned while the compression threads were working 451 //if so, re-throw the exception that was generated 446 452 if (fThreadsException != exception_ptr()) 447 {448 //if so, re-throw the exception that was generated449 453 rethrow_exception(fThreadsException); 450 }451 454 #endif 452 455 … … 458 461 459 462 //set number of threads to zero before calling compressBuffer 460 int32_t backup_num_queues = fNumQueues;463 const int32_t backup_num_queues = fNumQueues; 461 464 fNumQueues = 0; 462 uint64_t size_to_write = CompressBuffer(compress_target); 465 466 const uint64_t size_to_write = CompressBuffer(compress_target); 463 467 fNumQueues = backup_num_queues; 464 468 465 WriteTarget write_target; 466 write_target.size = size_to_write; 467 write_target.data = compress_target.target.data; 468 write_target.tile_num = compress_target.target.tile_num; 469 469 const WriteTarget write_target(compress_target.target, size_to_write); 470 470 if (!WriteBufferToDisk(write_target)) 471 throw runtime_error(" Something went wrong while writing the last tile...");471 throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile."); 472 472 } 473 473 … … 504 504 SetInt("NAXIS1", total_catalog_width); 505 505 SetInt("NAXIS2", total_num_tiles_written); 506 507 ostringstream str; 508 str << fRawSum.val(); 509 SetStr("RAWSUM", str.str()); 506 SetStr("RAWSUM", to_string(fRawSum.val())); 510 507 511 508 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size; … … 551 548 return false; 552 549 550 const size_t size = SizeFromType(typechar); 551 553 552 Table::Column col; 554 size_t size = SizeFromType(typechar);555 556 553 col.name = name; 557 554 col.type = typechar; … … 562 559 fRealRowWidth += size*cnt; 563 560 564 fRealColumns.emplace_back(CompressedColumn(col, comp)); 565 566 ostringstream strKey, strVal, strCom; 567 strKey << "ZFORM" << fRealColumns.size(); 568 strVal << cnt << typechar; 569 strCom << "format of " << name << " [" << CommentFromType(typechar); 570 SetStr(strKey.str(), strVal.str(), strCom.str()); 571 572 strKey.str(""); 573 strVal.str(""); 574 strCom.str(""); 575 strKey << "ZCTYP" << fRealColumns.size(); 576 strVal << "FACT"; 577 strCom << "Compression type FACT"; 578 SetStr(strKey.str(), strVal.str(), strCom.str()); 561 fRealColumns.emplace_back(col, comp); 562 563 ostringstream str; 564 str << "format of " << name << " [" << CommentFromType(typechar); 565 566 SetStr("ZFORM"+to_string(fRealColumns.size()), to_string(cnt)+typechar, str.str()); 567 SetStr("ZCTYP"+to_string(fRealColumns.size()), "FACT", "Compression type: FACT"); 579 568 580 569 return true; … … 639 628 int32_t extraBytes = 0; 640 629 uint32_t sizeToChecksum = sizeToWrite; 630 631 //should we extend the array to the left ? 641 632 if (fCheckOffset != 0) 642 { //should we extend the array to the left ?643 sizeToChecksum += fCheckOffset;633 { 634 sizeToChecksum += fCheckOffset; 644 635 checkSumPointer -= fCheckOffset; 645 636 memset(checkSumPointer, 0, fCheckOffset); 646 637 } 638 639 //should we extend the array to the right ? 647 640 if (sizeToChecksum%4 != 0) 648 { //should we extend the array to the right ?641 { 649 642 extraBytes = 4 - (sizeToChecksum%4); 650 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);643 memset(checkSumPointer+sizeToChecksum, 0, extraBytes); 651 644 sizeToChecksum += extraBytes; 652 645 } … … 705 698 //post the result to the writing queue 706 699 //get a copy so that it becomes non-const 707 WriteTarget wt; 708 wt.tile_num = target.target.tile_num; 709 wt.size = compressed_size; 710 wt.data = target.target.data; 711 712 fWriteToDiskQueue.post(wt); 700 fWriteToDiskQueue.emplace(target.target, compressed_size); 713 701 714 702 // if used by the queue, always return true as the elements are not ordered … … 730 718 { 731 719 #endif 720 //could not write the data to disk 732 721 if (!writeCompressedDataToDisk(target.data.get(), target.size)) 733 {//could not write the data to disk 734 ostringstream str; 735 str << "An error occured while writing to disk: "; 736 if (eof()) 737 str << "End-Of-File"; 738 if (failbit) 739 str << "Logical error on i/o operation"; 740 if (badbit) 741 str << "Writing error on i/o operation"; 742 #ifdef __EXCEPTIONS 743 throw runtime_error(str.str()); 744 #else 745 gLog << ___err___ << "ERROR - " << str.str() << endl; 746 #endif 747 } 748 #ifdef __EXCEPTIONS 749 } 750 catch(...) 722 fErrno = errno; 723 #ifdef __EXCEPTIONS 724 } 725 catch (...) 751 726 { 752 727 fThreadsException = current_exception(); … … 780 755 catalog_row[i].second = compressedOffset; 781 756 782 if (fRealColumns[i].col.num == 0) continue; 757 if (fRealColumns[i].col.num == 0) 758 continue; 783 759 784 760 Compression& head = fRealColumns[i].block_head; … … 794 770 switch (head.getProc(j)) 795 771 { 796 797 772 case kFactRaw: 773 compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num); 798 774 break; 799 case kFactSmoothing: 800 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num); 775 776 case kFactSmoothing: 777 applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num); 801 778 break; 802 case kFactHuffman16: 803 if (head.getOrdering() == kOrderByCol) 804 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num); 805 else 806 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows); 779 780 case kFactHuffman16: 781 if (head.getOrdering() == kOrderByCol) 782 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num); 783 else 784 compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows); 807 785 break; 808 786 } … … 837 815 } 838 816 839 TileHeader tile_head(thisRoundNumRows, compressedOffset);817 const TileHeader tile_head(thisRoundNumRows, compressedOffset); 840 818 memcpy(dest, &tile_head, sizeof(TileHeader)); 841 819 … … 855 833 switch (fRealColumns[i].block_head.getOrdering()) 856 834 { 857 case kOrderByRow: 835 case kOrderByRow: 836 //regular, "semi-transposed" copy 837 for (uint32_t k=0;k<thisRoundNumRows;k++) 838 { 839 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num); 840 dest += fRealColumns[i].col.size*fRealColumns[i].col.num; 841 } 842 break; 843 844 case kOrderByCol: 845 //transposed copy 846 for (uint32_t j=0;j<fRealColumns[i].col.num;j++) 858 847 for (uint32_t k=0;k<thisRoundNumRows;k++) 859 { //regular, "semi-transposed" copy860 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset , fRealColumns[i].col.size*fRealColumns[i].col.num);861 dest += fRealColumns[i].col.size *fRealColumns[i].col.num;848 { 849 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size); 850 dest += fRealColumns[i].col.size; 862 851 } 863 break;864 865 case kOrderByCol :866 for (uint32_t j=0;j<fRealColumns[i].col.num;j++)867 for (uint32_t k=0;k<thisRoundNumRows;k++)868 {//transposed copy869 memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);870 dest += fRealColumns[i].col.size;871 }872 852 break; 873 853 }; … … 897 877 string huffmanOutput; 898 878 uint32_t previousHuffmanSize = 0; 879 880 //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage. 899 881 if (numRows < 2) 900 {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.901 882 return numRows*sizeOfElems*numRowElems + 1000; 902 } 883 903 884 if (sizeOfElems < 2 ) 904 885 { … … 910 891 #endif 911 892 } 893 912 894 uint32_t huffmanOffset = 0; 913 895 for (uint32_t j=0;j<numRowElems;j++) … … 920 902 previousHuffmanSize = huffmanOutput.size(); 921 903 } 904 922 905 const size_t totalSize = huffmanOutput.size() + huffmanOffset; 923 906 … … 949 932 { 950 933 int16_t* short_data = reinterpret_cast<int16_t*>(data); 951 //un-do the integer smoothing952 934 for (uint32_t j=2;j<numElems;j++) 953 935 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2; … … 965 947 966 948 vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads) 967 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///<Writing queue (=thread)949 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread) 968 950 969 951 // catalog related stuff 970 952 CatalogType fCatalog; ///< Catalog for this file 971 uint32_t fCatalogSize; ///<Actual catalog size (.size() is slow on large lists)972 uint32_t fNumTiles; ///<Number of pre-reserved tiles973 uint32_t fNumRowsPerTile; ///<Number of rows per tile974 off_t fCatalogOffset; ///<Offset of the catalog from the beginning of the file953 uint32_t fCatalogSize; ///< Actual catalog size (.size() is slow on large lists) 954 uint32_t fNumTiles; ///< Number of pre-reserved tiles 955 uint32_t fNumRowsPerTile; ///< Number of rows per tile 956 off_t fCatalogOffset; ///< Offset of the catalog from the beginning of the file 975 957 976 958 // checksum related stuff 977 959 Checksum fCatalogSum; ///< Checksum of the catalog 978 Checksum fRawSum; ///<Raw sum (specific to FACT)979 int32_t fCheckOffset; ///<offset to the data pointer to calculate the checksum960 Checksum fRawSum; ///< Raw sum (specific to FACT) 961 int32_t fCheckOffset; ///< offset to the data pointer to calculate the checksum 980 962 981 963 // data layout related stuff … … 986 968 block_head(h) 987 969 {} 988 Table::Column 989 Compression block_head;///< the compression data associated with that column970 Table::Column col; ///< the regular column entry 971 Compression block_head; ///< the compression data associated with that column 990 972 }; 991 973 vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file … … 997 979 exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads 998 980 #endif 981 int fErrno; ///< propagate errno to main thread 982 999 983 1000 984 };
Note:
See TracChangeset
for help on using the changeset viewer.