Changeset 17269 for trunk/Mars/mcore
- Timestamp:
- 10/18/13 22:29:17 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Mars/mcore/zofits.h
r17264 r17269 46 46 struct WriteTarget 47 47 { 48 bool operator < (const WriteTarget& other) 48 bool operator < (const WriteTarget& other) const 49 49 { 50 50 return tile_num < other.tile_num; … … 141 141 ofits::WriteTableHeader(name); 142 142 143 fCompressionQueues.front().setPromptExecution(fNumQueues==0); 144 fWriteToDiskQueue.setPromptExecution(fNumQueues==0); 145 143 146 if (fNumQueues != 0) 144 147 { … … 153 156 //mark that no tile has been written so far 154 157 fLatestWrittenTile = -1; 158 159 //no wiring error (in the writing of the data) has occured so far 160 fErrno = 0; 155 161 156 162 return good(); … … 316 322 } 317 323 318 CompressionTarget compress_target(AddOneCatalogRow()); 319 SetNextCompression(compress_target); 320 321 //no worker threads. do everything in-line 322 if (fNumQueues == 0) 323 { 324 const uint64_t size_to_write = CompressBuffer(compress_target); 325 326 const WriteTarget write_target(compress_target.target, size_to_write); 327 if (!WriteBufferToDisk(write_target)) 328 throw std::runtime_error("Unexpected tile number mismatch in WriteBufferToDisk in the main thread."); 329 330 // The correct 'errno' is set, because it is the main thread. 331 return good(); 332 } 333 334 //if all queues are empty, use queue 0 335 uint32_t min_index = 0; 336 uint32_t min_size = std::numeric_limits<uint32_t>::max(); 337 uint32_t current_index = 0; 338 339 for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++) 340 { 341 if (it->size() < min_size) 342 { 343 min_index = current_index; 344 min_size = it->size(); 345 } 346 current_index++; 347 } 348 349 if (!fCompressionQueues[min_index].emplace(compress_target)) 324 // use the least occupied queue 325 const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end()); 326 if (!imin->emplace(InitNextCompression())) 350 327 throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?"); 351 328 … … 364 341 /// Setup the environment to compress yet another tile of data 365 342 /// @param target the struct where to host the produced parameters 366 void SetNextCompression(CompressionTarget& target) 367 { 343 CompressionTarget InitNextCompression() 344 { 345 CompressionTarget target(AddOneCatalogRow()); 346 368 347 //fill up compression target 369 348 target.src = fSmartBuffer; … … 379 358 //get a new buffer to host the incoming data 380 359 fSmartBuffer = fMemPool.malloc(); 360 361 return target; 381 362 } 382 363 … … 389 370 390 371 //did we write more rows than what the catalog could host ? 391 if (fCatalogSize > fNumTiles) 392 { 393 const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above 394 395 //shrink the catalog ! 396 uint32_t entry_id = 1; 397 auto it = fCatalog.begin(); 398 it++; 399 for (; it != fCatalog.end(); it++) 400 { 401 if (entry_id >= fNumTiles) break; 402 403 uint32_t target_id = entry_id*shrink_factor; 404 405 auto jt = it; 406 for (uint32_t i=0;i<target_id-entry_id;i++) 407 jt++; 408 409 *it = *jt; 410 411 entry_id++; 412 } 413 414 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles; 415 //remove the too many entries 416 for (uint32_t i=0;i<num_tiles_to_remove;i++) 417 { 418 fCatalog.pop_back(); 419 fCatalogSize--; 420 } 421 //update header keywords 422 fNumRowsPerTile *= shrink_factor; 423 424 SetInt("ZTILELEN", fNumRowsPerTile); 425 SetInt("ZSHRINK", shrink_factor); 426 } 372 if (fCatalogSize <= fNumTiles) // nothing to do 373 return; 374 375 //always exact as extra rows were added just above 376 const uint32_t shrink_factor = fCatalogSize / fNumTiles; 377 378 //shrink the catalog ! 379 uint32_t entry_id = 1; 380 auto it = fCatalog.begin(); 381 it++; 382 for (; it != fCatalog.end(); it++) 383 { 384 if (entry_id >= fNumTiles) 385 break; 386 387 const uint32_t target_id = entry_id*shrink_factor; 388 389 auto jt = it; 390 for (uint32_t i=0; i<target_id-entry_id; i++) 391 jt++; 392 393 *it = *jt; 394 395 entry_id++; 396 } 397 398 const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles; 399 400 //remove the too many entries 401 for (uint32_t i=0;i<num_tiles_to_remove;i++) 402 { 403 fCatalog.pop_back(); 404 fCatalogSize--; 405 } 406 407 //update header keywords 408 fNumRowsPerTile *= shrink_factor; 409 410 SetInt("ZTILELEN", fNumRowsPerTile); 411 SetInt("ZSHRINK", shrink_factor); 427 412 } 428 413 … … 447 432 #endif 448 433 449 //write the last tile of data (if any 450 if (fTable.num_rows%fNumRowsPerTile != 0) 451 { 452 CompressionTarget compress_target(AddOneCatalogRow()); 453 SetNextCompression(compress_target); 454 455 //set number of threads to zero before calling compressBuffer 456 const int32_t backup_num_queues = fNumQueues; 457 fNumQueues = 0; 458 459 const uint64_t size_to_write = CompressBuffer(compress_target); 460 fNumQueues = backup_num_queues; 461 462 const WriteTarget write_target(compress_target.target, size_to_write); 463 if (!WriteBufferToDisk(write_target)) 464 throw std::runtime_error("Tile number mismatch in WriteBufferToDisk writing the last tile."); 434 //write the last tile of data (if any) 435 if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0) 436 { 437 fWriteToDiskQueue.enablePromptExecution(); 438 fCompressionQueues.front().enablePromptExecution(); 439 fCompressionQueues.front().emplace(InitNextCompression()); 465 440 } 466 441 … … 563 538 564 539 /// Get and set the actual number of threads for this object 565 int32_t GetNumThreads() const { return fNumQueues; }540 int32_t GetNumThreads() const { return fNumQueues; } 566 541 bool SetNumThreads(uint32_t num) 567 542 { … … 590 565 num = num_available_cores>2 ? num_available_cores-2 : 1; 591 566 592 if (fCompressionQueues.size() != uint32_t(num)) 593 { 594 fCompressionQueues.resize(num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false)); 595 fNumQueues = num; 596 } 567 fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false)); 568 fNumQueues = num; 597 569 598 570 return true; … … 651 623 /// @param target the struct hosting the parameters of the compression 652 624 /// @return number of bytes of the compressed data, or always 1 when used by the Queues 653 uint32_t CompressBuffer(const CompressionTarget& target) 654 { 655 uint64_t compressed_size = 0; 656 625 bool CompressBuffer(const CompressionTarget& target) 626 { 657 627 //Can't get this to work in the thread. Printed the adresses, and they seem to be correct. 658 628 //Really do not understand what's wrong... … … 674 644 675 645 //compress the buffer 676 compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry); 646 const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry); 647 648 //post the result to the writing queue 649 //get a copy so that it becomes non-const 650 fWriteToDiskQueue.emplace(target.target, compressed_size); 651 677 652 #ifdef __EXCEPTIONS 678 653 } … … 685 660 #endif 686 661 687 if (fNumQueues == 0) 688 return compressed_size; 689 690 //post the result to the writing queue 691 //get a copy so that it becomes non-const 692 fWriteToDiskQueue.emplace(target.target, compressed_size); 693 694 // if used by the queue, always return true as the elements are not ordered 695 return 1; 662 return true; 696 663 } 697 664
Note:
See TracChangeset
for help on using the changeset viewer.