source: trunk/Mars/mcore/zfits.h@ 17685

Last change on this file since 17685 was 17685, checked in by tbretz, 10 years ago
Fixed revision 17607. When starting with tile 0 the currentTile can be either -1 or 0 depending on fNumRowsPerTile; this can lead to wrong results for isNextTile or when checked if a new tikle should be read; therefore implemented a different solution which also ensures that the copy feature and the checksum calculation gets every tile if read sequentially.
File size: 23.8 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26// InitCompressionReading();
27 }
28
29 // Alternative constructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35// InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 bool rawsum = true;
51
52 if (HasKey("RAWSUM"))
53 {
54 std::ostringstream str;
55 str << fRawsum.val();
56 rawsum = (GetStr("RAWSUM") == str.str());
57 }
58
59 return fits::IsFileOk() && rawsum;
60 };
61
62 size_t GetNumRows() const
63 {
64 if (fTable.is_compressed)
65 return fTable.Get<size_t>("ZNAXIS2");
66 else
67 return fTable.Get<size_t>("NAXIS2");
68 }
69 size_t GetBytesPerRow() const
70 {
71 if (fTable.is_compressed)
72 return fTable.Get<size_t>("ZNAXIS1");
73 else
74 return fTable.Get<size_t>("NAXIS1");
75 }
76
77protected:
78
79 // Stage the requested row to internal buffer
80 // Does NOT return data to users
81 virtual void StageRow(size_t row, char* dest)
82 {
83 if (!fTable.is_compressed)
84 {
85 fits::StageRow(row, dest);
86 return;
87 }
88 ReadBinaryRow(row, dest);
89 }
90
91private:
92
93 // Do what it takes to initialize the compressed structured
94 void InitCompressionReading()
95 {
96 fCatalogInitialized = true;
97
98 if (!fTable.is_compressed)
99 return;
100
101 //The constructor may have failed
102 if (!good())
103 return;
104
105 if (fTable.is_compressed)
106 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
107 {
108 if (it->comp == kCompFACT)
109 continue;
110
111 clear(rdstate()|std::ios::badbit);
112#ifdef __EXCEPTIONS
113 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
114#else
115 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl;
116 return;
117#endif
118 }
119
120 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
121
122 //Get compressed specific keywords
123 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
124 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
125
126 //read the file's catalog
127 ReadCatalog();
128
129 //give it some space for uncompressing
130 AllocateBuffers();
131 }
132
133 // Copy decompressed data to location requested by user
134 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
135 {
136 if (!fTable.is_compressed)
137 {
138 fits::MoveColumnDataToUserSpace(dest, src, c);
139 return;
140 }
141
142 memcpy(dest, src, c.num*c.size);
143 }
144
145 bool fCatalogInitialized;
146
147 std::vector<char> fBuffer; ///<store the uncompressed rows
148 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
149 std::vector<char> fCompressedBuffer; ///<compressed rows
150 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
151
152 size_t fNumTiles; ///< Total number of tiles
153 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
154 int64_t fCurrentRow; ///< current row in memory signed because we need -1
155 size_t fShrinkFactor; ///< shrink factor
156
157 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
158 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
159
160 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
161 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
162 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
163
164 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
165
166 // Get buffer space
167 void AllocateBuffers()
168 {
169 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
170 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
171 //use a bit more memory for block headers. 256 char coding the compression sequence max.
172 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
173 //a bit more for the tile headers
174 sizeof(FITS::TileHeader) +
175 //and a bit more for checksuming
176 8;
177
178 if (buffer_size % 4 != 0)
179 buffer_size += 4 - (buffer_size%4);
180
181 if (compressed_buffer_size % 4 != 0)
182 compressed_buffer_size += 4 - (compressed_buffer_size%4);
183
184 fBuffer.resize(buffer_size);
185
186 fTransposedBuffer.resize(buffer_size);
187 fCompressedBuffer.resize(compressed_buffer_size);
188 }
189
190 // Read catalog data. I.e. the address of the compressed data inside the heap
191 void ReadCatalog()
192 {
193 std::vector<char> readBuf(16);
194 fCatalog.resize(fNumTiles);
195
196 const streampos catalogStart = tellg();
197
198 fChkData.reset();
199
200 //do the actual reading
201 for (uint32_t i=0;i<fNumTiles;i++)
202 for (uint32_t j=0;j<fTable.num_cols;j++)
203 {
204 read(readBuf.data(), 2*sizeof(int64_t));
205 fChkData.add(readBuf);
206 //swap the bytes
207 int64_t tempValues[2] = {0,0};
208 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
209 if (tempValues[0] < 0 || tempValues[1] < 0)
210 {
211 clear(rdstate()|std::ios::badbit);
212#ifdef __EXCEPTIONS
213 throw std::runtime_error("Negative value in the catalog");
214#else
215 gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl;
216 return;
217#endif
218 }
219 //add catalog entry
220 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
221 }
222
223 //see if there is a gap before heap data
224 fHeapOff = tellg()+fTable.GetHeapShift();
225 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
226
227 //check if the catalog has been shrinked
228 fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
229
230 if (fNumRowsPerTile%fShrinkFactor)
231 {
232 clear(rdstate()|std::ios::badbit);
233#ifdef __EXCEPTIONS
234 throw std::runtime_error("Rows per tile and shrink factor do not match");
235#else
236 gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
237 return;
238#endif
239 }
240
241 if (fShrinkFactor>0)
242 fNumRowsPerTile /= fShrinkFactor;
243
244 //compute the total size of each compressed tile
245 fTileSize.resize(fNumTiles);
246 fTileOffsets.resize(fNumTiles);
247 for (uint32_t i=0;i<fNumTiles;i++)
248 {
249 fTileSize[i] = 0;
250 for (uint32_t j=0;j<fTable.num_cols;j++)
251 {
252 fTileSize[i] += fCatalog[i][j].first;
253 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
254 }
255 }
256
257 if (!fCopy.is_open())
258 return;
259
260 //write catalog and heap gap to target file
261 seekg(catalogStart);
262
263 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
264
265 std::vector<char> buf(catSize);
266 read(buf.data(), catSize);
267
268 fCopy.write(buf.data(), catSize);
269 if (!fCopy)
270 clear(rdstate()|std::ios::badbit);
271 }
272
273 //overrides fits.h method with empty one
274 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
275 virtual void WriteRowToCopyFile(size_t row)
276 {
277 if (row == fRow+1)
278 fRawsum.add(fBufferRow);
279 }
280
281 // Compressed version of the read row, even files with shrunk catalogs
282 // can be read fully sequentially so that streaming, e.g. through
283 // stdout/stdin, is possible.
284 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
285 {
286 if (rowNum >= GetNumRows())
287 return false;
288
289 if (!fCatalogInitialized)
290 InitCompressionReading();
291
292 // Book keeping, where are we?
293 const int64_t requestedTile = rowNum / fNumRowsPerTile;
294 const int64_t currentTile = fCurrentRow / fNumRowsPerTile;
295
296 const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
297 //const int64_t currentSuperTile = currentTile / fShrinkFactor;
298
299 const int64_t requestedSubTile = requestedTile % fShrinkFactor;
300 //const int64_t currentSubTile = currentTile % fShrinkFactor;
301
302 // Is this the first tile we read at all?
303 const bool isFirstTile = fCurrentRow<0;
304
305 // Is this just the next tile in the sequence?
306 const bool isNextTile = requestedTile==currentTile+1 || isFirstTile;
307
308 fCurrentRow = rowNum;
309
310 // Do we have to read a new tile from disk?
311 if (requestedTile!=currentTile || isFirstTile)
312 {
313 //skip to the beginning of the tile
314 const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
315
316 std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
317
318 // If this is a sub tile we might have to step forward a bit and
319 // seek for the sub tile. If we were just reading the previous one
320 // we can skip that.
321 if (!isNextTile || isFirstTile)
322 {
323 // step to the beginnig of the super tile
324 seekg(fHeapOff+superTileStart);
325
326 // If there are sub tiles we might have to seek through the super tile
327 for (uint32_t k=0; k<requestedSubTile; k++)
328 {
329 // Read header
330 FITS::TileHeader header;
331 read((char*)&header, sizeof(FITS::TileHeader));
332
333 // Skip to the next header
334 seekg(header.size-sizeof(FITS::TileHeader), cur);
335 }
336 }
337
338 // this is now the beginning of the sub-tile we want to read
339 const int64_t subTileStart = tellg() - fHeapOff;
340 // calculate the 32 bits offset of the current tile.
341 const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
342
343 // start of destination buffer (padding comes later)
344 char *destBuffer = fCompressedBuffer.data()+offset;
345
346 // Store the current tile size once known
347 size_t currentTileSize = 0;
348
349 // If this is a request for a sub tile which is not cataloged
350 // recalculate the offsets from the buffer, once read
351 if (requestedSubTile>0)
352 {
353 // Read header
354 read(destBuffer, sizeof(FITS::TileHeader));
355
356 // Get size of tile
357 currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
358
359 // now read the remaining bytes of this tile
360 read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
361
362 // Calculate the offsets recursively
363 offsets[0] = 0;
364
365 //skip through the columns
366 for (size_t i=0; i<fTable.num_cols-1; i++)
367 {
368 //zero sized column do not have headers. Skip it
369 if (fTable.sorted_cols[i].num == 0)
370 {
371 offsets[i+1] = offsets[i];
372 continue;
373 }
374
375 const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
376 offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
377 }
378 }
379 else
380 {
381 // If we are reading the first tile of a super tile, all information
382 // is already available.
383 currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
384 read(destBuffer, currentTileSize);
385 }
386
387
388 // If we are reading sequentially, calcualte checksum
389 if (isNextTile)
390 {
391 // Padding for checksum calculation
392 memset(fCompressedBuffer.data(), 0, offset);
393 memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
394 fChkData.add(fCompressedBuffer);
395 }
396
397 // Check if we are writing a copy of the file
398 if (isNextTile && fCopy.is_open() && fCopy.good())
399 {
400 fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
401 if (!fCopy)
402 clear(rdstate()|std::ios::badbit);
403 }
404 else
405 if (fCopy.is_open())
406 clear(rdstate()|std::ios::badbit);
407
408
409 // uncompress the buffer
410 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
411 UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
412
413 // pointer to column (source buffer)
414 const char *src = fTransposedBuffer.data();
415
416 uint32_t i=0;
417 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
418 {
419 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
420
421 switch (fColumnOrdering[i])
422 {
423 case FITS::kOrderByRow:
424 // regular, "semi-transposed" copy
425 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
426 {
427 memcpy(dest, src, it->bytes);
428 src += it->bytes; // next column
429 }
430 break;
431
432 case FITS::kOrderByCol:
433 // transposed copy
434 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
435 {
436 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
437 {
438 memcpy(dest, src, it->size);
439 src += it->size; // next element
440 }
441 }
442 break;
443
444 default:
445 clear(rdstate()|std::ios::badbit);
446#ifdef __EXCEPTIONS
447 throw std::runtime_error("Unkown column ordering scheme found");
448#else
449 gLog << ___err___ << "ERROR - unkown column ordering scheme" << std::endl;
450 return false;
451#endif
452 break;
453 };
454 }
455 }
456
457 //Data loaded and uncompressed. Copy it to destination
458 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
459 return good();
460 }
461
462 // Read a bunch of uncompressed data
463 uint32_t UncompressUNCOMPRESSED(char* dest,
464 const char* src,
465 uint32_t numElems,
466 uint32_t sizeOfElems)
467 {
468 memcpy(dest, src, numElems*sizeOfElems);
469 return numElems*sizeOfElems;
470 }
471
472 // Read a bunch of data compressed with the Huffman algorithm
473 uint32_t UncompressHUFFMAN16(char* dest,
474 const char* src,
475 uint32_t numChunks)
476 {
477 std::vector<uint16_t> uncompressed;
478
479 //read compressed sizes (one per row)
480 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
481 src += sizeof(uint32_t)*numChunks;
482
483 //uncompress the rows, one by one
484 uint32_t sizeWritten = 0;
485 for (uint32_t j=0;j<numChunks;j++)
486 {
487 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
488
489 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
490
491 sizeWritten += uncompressed.size()*sizeof(uint16_t);
492 dest += uncompressed.size()*sizeof(uint16_t);
493 src += compressedSizes[j];
494 }
495 return sizeWritten;
496 }
497
498 // Apply the inverse transform of the integer smoothing
499 uint32_t UnApplySMOOTHING(int16_t* data,
500 uint32_t numElems)
501 {
502 //un-do the integer smoothing
503 for (uint32_t j=2;j<numElems;j++)
504 data[j] = data[j] + (data[j-1]+data[j-2])/2;
505
506 return numElems*sizeof(uint16_t);
507 }
508
509 // Data has been read from disk. Uncompress it !
510 void UncompressBuffer(const std::vector<size_t> &offsets,
511 const uint32_t &thisRoundNumRows,
512 const uint32_t offset)
513 {
514 char *dest = fTransposedBuffer.data();
515
516 //uncompress column by column
517 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
518 {
519 const fits::Table::Column &col = fTable.sorted_cols[i];
520 if (col.num == 0)
521 continue;
522
523 //get the compression flag
524 const int64_t compressedOffset = offsets[i]+offset;
525
526 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
527
528 fColumnOrdering[i] = head->ordering;
529
530 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
531 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
532
533 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
534
535 for (int32_t j=head->numProcs-1;j >= 0; j--)
536 {
537 uint32_t sizeWritten=0;
538
539 switch (head->processings[j])
540 {
541 case FITS::kFactRaw:
542 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
543 break;
544
545 case FITS::kFactSmoothing:
546 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
547 break;
548
549 case FITS::kFactHuffman16:
550 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
551 break;
552
553 default:
554 clear(rdstate()|std::ios::badbit);
555
556 std::ostringstream str;
557 str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
558#ifdef __EXCEPTIONS
559 throw std::runtime_error(str.str());
560#else
561 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << std::endl;
562 return;
563#endif
564 }
565 //increment destination counter only when processing done.
566 if (j==0)
567 dest+= sizeWritten;
568 }
569 }
570 }
571
572 void CheckIfFileIsConsistent(bool update_catalog=false)
573 {
574 //goto start of heap
575 const streamoff whereAreWe = tellg();
576 seekg(fHeapOff);
577
578 //init number of rows to zero
579 uint64_t numRows = 0;
580
581 //get number of columns from header
582 const size_t numCols = fTable.num_cols;
583
584 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
585
586 FITS::TileHeader tileHead;
587 FITS::BlockHeader columnHead;
588
589 streamoff offsetInHeap = 0;
590 //skip through the heap
591 while (true)
592 {
593 read((char*)(&tileHead), sizeof(FITS::TileHeader));
594 //end of file
595 if (!good())
596 break;
597
598 //padding or corrupt data
599 if (memcmp(tileHead.id, "TILE", 4))
600 {
601 clear(rdstate()|std::ios::badbit);
602 break;
603 }
604
605 //a new tile begins here
606 catalog.emplace_back();
607 offsetInHeap += sizeof(FITS::TileHeader);
608
609 //skip through the columns
610 for (size_t i=0;i<numCols;i++)
611 {
612 //zero sized column do not have headers. Skip it
613 if (fTable.sorted_cols[i].num == 0)
614 {
615 catalog.back().emplace_back(0,0);
616 continue;
617 }
618
619 //read column header
620 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
621
622 //corrupted tile
623 if (!good())
624 break;
625
626 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
627 offsetInHeap += columnHead.size;
628 seekg(fHeapOff+offsetInHeap);
629 }
630
631 //if we ain't good, this means that something went wrong inside the current tile.
632 if (!good())
633 {
634 catalog.pop_back();
635 break;
636 }
637 //current tile is complete. Add rows
638 numRows += tileHead.numRows;
639 }
640
641 if (numRows != fTable.num_rows)
642 {
643 clear(rdstate()|std::ios::badbit);
644 std::ostringstream str;
645 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
646#ifdef __EXCEPTIONS
647 throw std::runtime_error(str.str());
648#else
649 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
650 return;
651#endif
652 }
653
654 if (update_catalog)
655 {
656 fCatalog = catalog;
657 //clear the bad bit before seeking back (we hit eof)
658 clear();
659 seekg(whereAreWe);
660 return;
661 }
662
663 if (catalog.size() != fCatalog.size())
664 {
665 clear(rdstate()|std::ios::badbit);
666#ifdef __EXCEPTIONS
667 throw std::runtime_error("Heap data does not agree with header.");
668#else
669 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
670 return;
671#endif
672 }
673
674 for (uint32_t i=0;i<catalog.size(); i++)
675 for (uint32_t j=0;j<numCols;j++)
676 {
677 if (catalog[i][j].first != fCatalog[i][j].first ||
678 catalog[i][j].second != fCatalog[i][j].second)
679 {
680 clear(rdstate()|std::ios::badbit);
681#ifdef __EXCEPTIONS
682 throw std::runtime_error("Heap data does not agree with header.");
683#else
684 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
685 return;
686#endif
687 }
688 }
689 //go back to start of heap
690 //clear the bad bit before seeking back (we hit eof)
691 clear();
692 seekg(whereAreWe);
693 }
694
695};//class zfits
696
697#endif
Note: See TracBrowser for help on using the repository browser.