Changeset 17264 for trunk/Mars/mcore/zofits.h
- Timestamp:
- 10/18/13 17:52:38 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Mars/mcore/zofits.h
r17263 r17264 10 10 11 11 #include "ofits.h" 12 #include " zfits.h"12 #include "huffman.h" 13 13 #include "Queue.h" 14 14 #include "MemoryManager.h" … … 16 16 #ifdef USE_BOOST_THREADS 17 17 #include <boost/thread.hpp> 18 #endif19 20 #ifndef __MARS__21 namespace std22 {23 18 #endif 24 19 … … 39 34 struct CatalogEntry 40 35 { 41 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { };36 CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { } 42 37 int64_t first; ///< Size of this column in the tile 43 38 int64_t second; ///< offset of this column in the tile, from the start of the heap area 44 39 } __attribute__((__packed__)); 45 40 46 typedef vector<CatalogEntry> CatalogRow;47 typedef list<CatalogRow> CatalogType;41 typedef std::vector<CatalogEntry> CatalogRow; 42 typedef std::list<CatalogRow> CatalogType; 48 43 49 44 … … 59 54 WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { } 60 55 61 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)62 uint32_t size; ///< Size to write63 s hared_ptr<char> data; ///< Memory block to write56 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order) 57 uint32_t size; ///< Size to write 58 std::shared_ptr<char> data; ///< Memory block to write 64 59 }; 65 60 … … 71 66 {} 72 67 73 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with74 s hared_ptr<char> src; ///< Original data75 s hared_ptr<char> transposed_src; ///< Transposed data76 WriteTarget target; ///< Compressed data77 uint32_t num_rows; ///< Number of rows to compress68 CatalogRow& catalog_entry; ///< Reference to the catalog entry to deal with 69 std::shared_ptr<char> src; ///< Original data 70 std::shared_ptr<char> transposed_src; ///< Transposed data 71 WriteTarget target; ///< Compressed data 72 uint32_t num_rows; ///< Number of rows to compress 78 73 }; 79 74 … … 92 87 uint32_t rowPerTile = DefaultNumRowsPerTile(), 93 88 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(), 94 95 fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this,placeholders::_1), false)89 fMemPool(0, maxUsableMem*1000), 90 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false) 96 91 { 97 92 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000); … … 107 102 uint32_t rowPerTile = DefaultNumRowsPerTile(), 108 103 uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname), 109 110 fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this,placeholders::_1), false)104 fMemPool(0, maxUsableMem*1000), 105 fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false) 111 106 { 112 107 InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000); … … 132 127 fMaxUsableMem = maxUsableMem; 133 128 #ifdef __EXCEPTIONS 134 fThreadsException = exception_ptr();129 fThreadsException = std::exception_ptr(); 135 130 #endif 136 131 fErrno = 0; … … 208 203 209 204 // swap the catalog bytes before writing 210 vector<char> swapped_catalog(total_catalog_size);205 std::vector<char> swapped_catalog(total_catalog_size); 211 206 212 207 uint32_t shift = 0; … … 272 267 { 273 268 #ifdef __EXCEPTIONS 274 throw runtime_error("Wrong size of row given to WriteRow");269 throw std::runtime_error("Wrong size of row given to WriteRow"); 275 270 #else 276 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;271 gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl; 277 272 return false; 278 273 #endif … … 282 277 //check if something hapenned while the compression threads were working 283 278 //if so, re-throw the exception that was generated 284 if (fThreadsException != exception_ptr())285 rethrow_exception(fThreadsException);279 if (fThreadsException != std::exception_ptr()) 280 std::rethrow_exception(fThreadsException); 286 281 #endif 287 282 //copy current row to pool or rows waiting for compression … … 331 326 const WriteTarget write_target(compress_target.target, size_to_write); 332 327 if (!WriteBufferToDisk(write_target)) 333 throw runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread.");328 throw std::runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread."); 334 329 335 330 // The correct 'errno' is set, because it is the main thread. … … 339 334 //if all queues are empty, use queue 0 340 335 uint32_t min_index = 0; 341 uint32_t min_size = numeric_limits<uint32_t>::max();336 uint32_t min_size = std::numeric_limits<uint32_t>::max(); 342 337 uint32_t current_index = 0; 343 338 … … 353 348 354 349 if (!fCompressionQueues[min_index].emplace(compress_target)) 355 throw runtime_error("The compression queues are not started. Did you close the file before writing this row?");350 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?"); 356 351 357 352 errno = fErrno; … … 448 443 //check if something hapenned while the compression threads were working 449 444 //if so, re-throw the exception that was generated 450 if (fThreadsException != exception_ptr())451 rethrow_exception(fThreadsException);445 if (fThreadsException != std::exception_ptr()) 446 std::rethrow_exception(fThreadsException); 452 447 #endif 453 448 … … 467 462 const WriteTarget write_target(compress_target.target, size_to_write); 468 463 if (!WriteBufferToDisk(write_target)) 469 throw runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile.");464 throw std::runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile."); 470 465 } 471 466 … … 502 497 SetInt("NAXIS1", total_catalog_width); 503 498 SetInt("NAXIS2", total_num_tiles_written); 504 SetStr("RAWSUM", to_string(fRawSum.val()));499 SetStr("RAWSUM", std::to_string(fRawSum.val())); 505 500 506 501 const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size; … … 519 514 const Checksum checksm = UpdateHeaderChecksum(); 520 515 521 ofstream::close();516 std::ofstream::close(); 522 517 523 518 if ((checksm+fDataSum).valid()) 524 519 return true; 525 520 526 ostringstream sout;521 std::ostringstream sout; 527 522 sout << "Checksum (" << std::hex << checksm.val() << ") invalid."; 528 523 #ifdef __EXCEPTIONS 529 throw runtime_error(sout.str());524 throw std::runtime_error(sout.str()); 530 525 #else 531 gLog << ___err___ << "ERROR - " << sout.str() << endl;526 gLog << ___err___ << "ERROR - " << sout.str() << std::endl; 532 527 return false; 533 528 #endif … … 535 530 536 531 /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column 537 bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true) 532 bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit, 533 const std::string& comment="", bool addHeaderKeys=true) 538 534 { 539 535 return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys); … … 541 537 542 538 /// Overload of the simplified compressed version 543 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true) 539 bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name, 540 const std::string& unit, const std::string& comment="", bool addHeaderKeys=true) 544 541 { 545 542 if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys)) … … 559 556 fRealColumns.emplace_back(col, comp); 560 557 561 SetStr("ZFORM"+ to_string(fRealColumns.size()),to_string(cnt)+typechar, "format of "+name+" "+CommentFromType(typechar));562 SetStr("ZCTYP"+ to_string(fRealColumns.size()), "FACT", "Compression type: FACT");558 SetStr("ZFORM"+std::to_string(fRealColumns.size()), std::to_string(cnt)+typechar, "format of "+name+" "+CommentFromType(typechar)); 559 SetStr("ZCTYP"+std::to_string(fRealColumns.size()), "FACT", "Compression type: FACT"); 563 560 564 561 return true; … … 572 569 { 573 570 #ifdef __EXCEPTIONS 574 throw runtime_error("File must be closed before changing the number of compression threads");571 throw std::runtime_error("File must be closed before changing the number of compression threads"); 575 572 #else 576 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;573 gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << std::endl; 577 574 #endif 578 575 return false; … … 583 580 unsigned int num_available_cores = boost::thread::hardware_concurrency(); 584 581 #else 585 unsigned int num_available_cores = thread::hardware_concurrency();582 unsigned int num_available_cores = std::thread::hardware_concurrency(); 586 583 #endif 587 584 // could not detect number of available cores from system properties... … … 595 592 if (fCompressionQueues.size() != uint32_t(num)) 596 593 { 597 fCompressionQueues.resize(num, Queue<CompressionTarget>( bind(&zofits::CompressBuffer, this,placeholders::_1), false));594 fCompressionQueues.resize(num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false)); 598 595 fNumQueues = num; 599 596 } … … 664 661 // for (uint32_t i=0;i<thisRoundNumRows;i++) 665 662 // { 666 // char* target_location = target.src.get() ->get()+ fRealRowWidth*i;663 // char* target_location = target.src.get() + fRealRowWidth*i; 667 664 // cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl; 668 665 // DrsOffsetCalibrate(target_location); … … 682 679 catch (...) 683 680 { 684 fThreadsException = current_exception();681 fThreadsException = std::current_exception(); 685 682 if (fNumQueues == 0) 686 rethrow_exception(fThreadsException);683 std::rethrow_exception(fThreadsException); 687 684 } 688 685 #endif … … 720 717 catch (...) 721 718 { 722 fThreadsException = current_exception();719 fThreadsException = std::current_exception(); 723 720 if (fNumQueues == 0) 724 rethrow_exception(fThreadsException);721 std::rethrow_exception(fThreadsException); 725 722 } 726 723 #endif … … 871 868 uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems) 872 869 { 873 st ring huffmanOutput;870 std::string huffmanOutput; 874 871 uint32_t previousHuffmanSize = 0; 875 872 … … 881 878 { 882 879 #ifdef __EXCEPTIONS 883 throw runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");880 throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types"); 884 881 #else 885 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << endl;882 gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl; 886 883 return 0; 887 884 #endif … … 942 939 int32_t fLatestWrittenTile; ///< Index of the last tile written to disk (for correct ordering while using several threads) 943 940 944 vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads)941 std::vector<Queue<CompressionTarget>> fCompressionQueues; ///< Processing queues (=threads) 945 942 Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread) 946 943 … … 967 964 FITS::Compression block_head; ///< the compression data associated with that column 968 965 }; 969 vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file 970 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row 971 shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written 972 vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression 973 974 #ifdef __EXCEPTIONS 975 exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads 976 #endif 977 int fErrno; ///< propagate errno to main thread 978 979 966 std::vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file 967 uint32_t fRealRowWidth; ///< Width in bytes of one uncompressed row 968 std::shared_ptr<char> fSmartBuffer; ///< Smart pointer to the buffer where the incoming rows are written 969 std::vector<char> fRawSumBuffer; ///< buffer used for checksuming the incoming data, before compression 970 971 #ifdef __EXCEPTIONS 972 std::exception_ptr fThreadsException; ///< exception pointer to store exceptions coming from the threads 973 #endif 974 int fErrno; ///< propagate errno to main thread 980 975 }; 981 976 982 #ifndef __MARS__ 983 }; //namespace std 984 #endif 985 986 #endif 977 #endif
Note:
See TracChangeset
for help on using the changeset viewer.