source: trunk/Mars/mcore/zfits.h@ 20066

Last change on this file since 20066 was 20066, checked in by tbretz, 4 months ago
Minor fix on excpetion formatting issue.
File size: 23.9 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26// InitCompressionReading();
27 }
28
29 // Alternative constructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35// InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 if (!HasKey("RAWSUM"))
51 return fits::IsFileOk();
52
53 const bool rawsum = GetStr("RAWSUM") == std::to_string((long long int)fRawsum.val());
54 return fits::IsFileOk() && rawsum;
55 };
56
57 size_t GetNumRows() const
58 {
59 return fTable.Get<size_t>(fTable.is_compressed ? "ZNAXIS2" : "NAXIS2");
60 }
61
62 size_t GetBytesPerRow() const
63 {
64 return fTable.Get<size_t>(fTable.is_compressed ? "ZNAXIS1" : "NAXIS1");
65 }
66
67protected:
68
69 // Stage the requested row to internal buffer
70 // Does NOT return data to users
71 virtual void StageRow(size_t row, char* dest)
72 {
73 if (!fTable.is_compressed)
74 {
75 fits::StageRow(row, dest);
76 return;
77 }
78 ReadBinaryRow(row, dest);
79 }
80
81private:
82
83 // Do what it takes to initialize the compressed structured
84 void InitCompressionReading()
85 {
86 fCatalogInitialized = true;
87
88 if (!fTable.is_compressed)
89 return;
90
91 //The constructor may have failed
92 if (!good())
93 return;
94
95 if (fTable.is_compressed)
96 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
97 {
98 if (it->comp == kCompFACT)
99 continue;
100
101 clear(rdstate()|std::ios::badbit);
102#ifdef __EXCEPTIONS
103 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
104#else
105 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl;
106 return;
107#endif
108 }
109
110 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
111
112 //Get compressed specific keywords
113 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
114 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
115
116 //read the file's catalog
117 ReadCatalog();
118
119 //give it some space for uncompressing
120 AllocateBuffers();
121 }
122
123 // Copy decompressed data to location requested by user
124 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
125 {
126 if (!fTable.is_compressed)
127 {
128 fits::MoveColumnDataToUserSpace(dest, src, c);
129 return;
130 }
131
132 memcpy(dest, src, c.num*c.size);
133 }
134
135 bool fCatalogInitialized;
136
137 std::vector<char> fBuffer; ///<store the uncompressed rows
138 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
139 std::vector<char> fCompressedBuffer; ///<compressed rows
140 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
141
142 size_t fNumTiles; ///< Total number of tiles
143 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
144 int64_t fCurrentRow; ///< current row in memory signed because we need -1
145 size_t fShrinkFactor; ///< shrink factor
146
147 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
148 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
149
150 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
151 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
152 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
153
154 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
155
156 // Get buffer space
157 void AllocateBuffers()
158 {
159 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
160 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
161 //use a bit more memory for block headers. 256 char coding the compression sequence max.
162 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
163 //a bit more for the tile headers
164 sizeof(FITS::TileHeader) +
165 //and a bit more for checksuming
166 8;
167
168 if (buffer_size % 4 != 0)
169 buffer_size += 4 - (buffer_size%4);
170
171 if (compressed_buffer_size % 4 != 0)
172 compressed_buffer_size += 4 - (compressed_buffer_size%4);
173
174 fBuffer.resize(buffer_size);
175
176 fTransposedBuffer.resize(buffer_size);
177 fCompressedBuffer.resize(compressed_buffer_size);
178 }
179
180 // Read catalog data. I.e. the address of the compressed data inside the heap
181 void ReadCatalog()
182 {
183 std::vector<char> readBuf(16);
184 fCatalog.resize(fNumTiles);
185
186 const streampos catalogStart = tellg();
187
188 fChkData.reset();
189
190 //do the actual reading
191 for (uint32_t i=0;i<fNumTiles;i++)
192 for (uint32_t j=0;j<fTable.num_cols;j++)
193 {
194 read(readBuf.data(), 2*sizeof(int64_t));
195 fChkData.add(readBuf);
196 //swap the bytes
197 int64_t tempValues[2] = {0,0};
198 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
199 if (tempValues[0] < 0 || tempValues[1] < 0)
200 {
201 clear(rdstate()|std::ios::badbit);
202#ifdef __EXCEPTIONS
203 throw std::runtime_error("Negative value in the catalog");
204#else
205 gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl;
206 return;
207#endif
208 }
209 //add catalog entry
210 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
211 }
212
213 //see if there is a gap before heap data
214 fHeapOff = tellg()+fTable.GetHeapShift();
215 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
216
217 //check if the catalog has been shrinked
218 fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
219
220 if (fNumRowsPerTile%fShrinkFactor)
221 {
222 clear(rdstate()|std::ios::badbit);
223#ifdef __EXCEPTIONS
224 throw std::runtime_error("Rows per tile and shrink factor do not match");
225#else
226 gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
227 return;
228#endif
229 }
230
231 if (fShrinkFactor>0)
232 fNumRowsPerTile /= fShrinkFactor;
233
234 //compute the total size of each compressed tile
235 fTileSize.resize(fNumTiles);
236 fTileOffsets.resize(fNumTiles);
237 for (uint32_t i=0;i<fNumTiles;i++)
238 {
239 fTileSize[i] = 0;
240 for (uint32_t j=0;j<fTable.num_cols;j++)
241 {
242 fTileSize[i] += fCatalog[i][j].first;
243 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
244 }
245 }
246
247 if (!fCopy.is_open())
248 return;
249
250 //write catalog and heap gap to target file
251 seekg(catalogStart);
252
253 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
254
255 std::vector<char> buf(catSize);
256 read(buf.data(), catSize);
257
258 fCopy.write(buf.data(), catSize);
259 if (!fCopy)
260 clear(rdstate()|std::ios::badbit);
261 }
262
263 //overrides fits.h method with empty one
264 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
265 virtual void WriteRowToCopyFile(size_t row)
266 {
267 if (row == fRow+1)
268 fRawsum.add(fBufferRow);
269 }
270
271 // Compressed version of the read row, even files with shrunk catalogs
272 // can be read fully sequentially so that streaming, e.g. through
273 // stdout/stdin, is possible.
274 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
275 {
276 if (rowNum >= GetNumRows())
277 return false;
278
279 if (!fCatalogInitialized)
280 InitCompressionReading();
281
282 // Book keeping, where are we?
283 const int64_t requestedTile = rowNum / fNumRowsPerTile;
284 const int64_t currentTile = fCurrentRow / fNumRowsPerTile;
285
286 const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
287 //const int64_t currentSuperTile = currentTile / fShrinkFactor;
288
289 const int64_t requestedSubTile = requestedTile % fShrinkFactor;
290 //const int64_t currentSubTile = currentTile % fShrinkFactor;
291
292 // Is this the first tile we read at all?
293 const bool isFirstTile = fCurrentRow<0;
294
295 // Is this just the next tile in the sequence?
296 const bool isNextTile = requestedTile==currentTile+1 || isFirstTile;
297
298 fCurrentRow = rowNum;
299
300 // Do we have to read a new tile from disk?
301 if (requestedTile!=currentTile || isFirstTile)
302 {
303 //skip to the beginning of the tile
304 const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
305
306 std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
307
308 // If this is a sub tile we might have to step forward a bit and
309 // seek for the sub tile. If we were just reading the previous one
310 // we can skip that.
311 if (!isNextTile || isFirstTile)
312 {
313 // step to the beginnig of the super tile
314 seekg(fHeapOff+superTileStart);
315
316 // If there are sub tiles we might have to seek through the super tile
317 for (uint32_t k=0; k<requestedSubTile; k++)
318 {
319 // Read header
320 FITS::TileHeader header;
321 read((char*)&header, sizeof(FITS::TileHeader));
322
323 // Skip to the next header
324 seekg(header.size-sizeof(FITS::TileHeader), cur);
325 }
326 }
327
328 // this is now the beginning of the sub-tile we want to read
329 const int64_t subTileStart = tellg() - fHeapOff;
330 // calculate the 32 bits offset of the current tile.
331 const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
332
333 // start of destination buffer (padding comes later)
334 char *destBuffer = fCompressedBuffer.data()+offset;
335
336 // Store the current tile size once known
337 size_t currentTileSize = 0;
338
339 // If this is a request for a sub tile which is not cataloged
340 // recalculate the offsets from the buffer, once read
341 if (requestedSubTile>0)
342 {
343 // Read header
344 read(destBuffer, sizeof(FITS::TileHeader));
345
346 // Get size of tile
347 currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
348
349 // now read the remaining bytes of this tile
350 read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
351
352 // Calculate the offsets recursively
353 offsets[0] = 0;
354
355 //skip through the columns
356 for (size_t i=0; i<fTable.num_cols-1; i++)
357 {
358 //zero sized column do not have headers. Skip it
359 if (fTable.sorted_cols[i].num == 0)
360 {
361 offsets[i+1] = offsets[i];
362 continue;
363 }
364
365 const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
366 offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
367 }
368 }
369 else
370 {
371 // If we are reading the first tile of a super tile, all information
372 // is already available.
373 currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
374 read(destBuffer, currentTileSize);
375 }
376
377
378 // If we are reading sequentially, calcualte checksum
379 if (isNextTile)
380 {
381 // Padding for checksum calculation
382 memset(fCompressedBuffer.data(), 0, offset);
383 memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
384 fChkData.add(fCompressedBuffer);
385 }
386
387 // Check if we are writing a copy of the file
388 if (isNextTile && fCopy.is_open() && fCopy.good())
389 {
390 fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
391 if (!fCopy)
392 clear(rdstate()|std::ios::badbit);
393 }
394 else
395 if (fCopy.is_open())
396 clear(rdstate()|std::ios::badbit);
397
398
399 // uncompress the buffer
400 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
401 if (!UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader)))
402 return false;
403
404 // pointer to column (source buffer)
405 const char *src = fTransposedBuffer.data();
406
407 uint32_t i=0;
408 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
409 {
410 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
411
412 switch (fColumnOrdering[i])
413 {
414 case FITS::kOrderByRow:
415 // regular, "semi-transposed" copy
416 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
417 {
418 memcpy(dest, src, it->bytes);
419 src += it->bytes; // next column
420 }
421 break;
422
423 case FITS::kOrderByCol:
424 // transposed copy
425 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
426 {
427 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
428 {
429 memcpy(dest, src, it->size);
430 src += it->size; // next element
431 }
432 }
433 break;
434
435 default:
436 clear(rdstate()|std::ios::badbit);
437
438 std::ostringstream str;
439 str << "Unkown column ordering scheme found (i=" << i << ", " << int(fColumnOrdering[i]) << ")";
440#ifdef __EXCEPTIONS
441 throw std::runtime_error(str.str());
442#else
443 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
444 return false;
445#endif
446 };
447 }
448 }
449
450 //Data loaded and uncompressed. Copy it to destination
451 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
452 return good();
453 }
454
455 // Read a bunch of uncompressed data
456 uint32_t UncompressUNCOMPRESSED(char* dest,
457 const char* src,
458 uint32_t numElems,
459 uint32_t sizeOfElems)
460 {
461 memcpy(dest, src, numElems*sizeOfElems);
462 return numElems*sizeOfElems;
463 }
464
465 // Read a bunch of data compressed with the Huffman algorithm
466 uint32_t UncompressHUFFMAN16(char* dest,
467 const char* src,
468 uint32_t numChunks)
469 {
470 std::vector<uint16_t> uncompressed;
471
472 //read compressed sizes (one per row)
473 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
474 src += sizeof(uint32_t)*numChunks;
475
476 //uncompress the rows, one by one
477 uint32_t sizeWritten = 0;
478 for (uint32_t j=0;j<numChunks;j++)
479 {
480 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
481
482 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
483
484 sizeWritten += uncompressed.size()*sizeof(uint16_t);
485 dest += uncompressed.size()*sizeof(uint16_t);
486 src += compressedSizes[j];
487 }
488 return sizeWritten;
489 }
490
491 // Apply the inverse transform of the integer smoothing
492 uint32_t UnApplySMOOTHING(int16_t* data,
493 uint32_t numElems)
494 {
495 //un-do the integer smoothing
496 for (uint32_t j=2;j<numElems;j++)
497 data[j] = data[j] + (data[j-1]+data[j-2])/2;
498
499 return numElems*sizeof(uint16_t);
500 }
501
502 // Data has been read from disk. Uncompress it !
503 bool UncompressBuffer(const std::vector<size_t> &offsets,
504 const uint32_t &thisRoundNumRows,
505 const uint32_t offset)
506 {
507 char *dest = fTransposedBuffer.data();
508
509 //uncompress column by column
510 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
511 {
512 const fits::Table::Column &col = fTable.sorted_cols[i];
513 if (col.num == 0)
514 continue;
515
516 //get the compression flag
517 const int64_t compressedOffset = offsets[i]+offset;
518
519 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
520 const uint16_t *processings = reinterpret_cast<const uint16_t*>(reinterpret_cast<const char*>(head)+sizeof(FITS::BlockHeader));
521
522 fColumnOrdering[i] = head->ordering;
523
524 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
525 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
526
527 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
528
529 for (int32_t j=head->numProcs-1;j >= 0; j--)
530 {
531 uint32_t sizeWritten=0;
532
533 switch (processings[j])
534 {
535 case FITS::kFactRaw:
536 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
537 break;
538
539 case FITS::kFactSmoothing:
540 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
541 break;
542
543 case FITS::kFactHuffman16:
544 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
545 break;
546
547 default:
548 clear(rdstate()|std::ios::badbit);
549
550 std::ostringstream str;
551 str << "Unknown processing applied to data (col=" << i << ", proc=" << j << "/" << (int)head->numProcs;
552#ifdef __EXCEPTIONS
553 throw std::runtime_error(str.str());
554#else
555 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
556 return false;
557#endif
558 }
559 //increment destination counter only when processing done.
560 if (j==0)
561 dest+= sizeWritten;
562 }
563 }
564
565 return true;
566 }
567
568 void CheckIfFileIsConsistent(bool update_catalog=false)
569 {
570 //goto start of heap
571 const streamoff whereAreWe = tellg();
572 seekg(fHeapOff);
573
574 //init number of rows to zero
575 uint64_t numRows = 0;
576
577 //get number of columns from header
578 const size_t numCols = fTable.num_cols;
579
580 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
581
582 FITS::TileHeader tileHead;
583 FITS::BlockHeader columnHead;
584
585 streamoff offsetInHeap = 0;
586 //skip through the heap
587 while (true)
588 {
589 read((char*)(&tileHead), sizeof(FITS::TileHeader));
590 //end of file
591 if (!good())
592 break;
593
594 //padding or corrupt data
595 if (memcmp(tileHead.id, "TILE", 4))
596 {
597 clear(rdstate()|std::ios::badbit);
598 break;
599 }
600
601 //a new tile begins here
602 catalog.emplace_back();
603 offsetInHeap += sizeof(FITS::TileHeader);
604
605 //skip through the columns
606 for (size_t i=0;i<numCols;i++)
607 {
608 //zero sized column do not have headers. Skip it
609 if (fTable.sorted_cols[i].num == 0)
610 {
611 catalog.back().emplace_back(0,0);
612 continue;
613 }
614
615 //read column header
616 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
617
618 //corrupted tile
619 if (!good())
620 break;
621
622 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
623 offsetInHeap += columnHead.size;
624 seekg(fHeapOff+offsetInHeap);
625 }
626
627 //if we ain't good, this means that something went wrong inside the current tile.
628 if (!good())
629 {
630 catalog.pop_back();
631 break;
632 }
633 //current tile is complete. Add rows
634 numRows += tileHead.numRows;
635 }
636
637 if (numRows != fTable.num_rows)
638 {
639 clear(rdstate()|std::ios::badbit);
640 std::ostringstream str;
641 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
642#ifdef __EXCEPTIONS
643 throw std::runtime_error(str.str());
644#else
645 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
646 return;
647#endif
648 }
649
650 if (update_catalog)
651 {
652 fCatalog = catalog;
653 //clear the bad bit before seeking back (we hit eof)
654 clear();
655 seekg(whereAreWe);
656 return;
657 }
658
659 if (catalog.size() != fCatalog.size())
660 {
661 clear(rdstate()|std::ios::badbit);
662#ifdef __EXCEPTIONS
663 throw std::runtime_error("Heap data does not agree with header.");
664#else
665 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
666 return;
667#endif
668 }
669
670 for (uint32_t i=0;i<catalog.size(); i++)
671 for (uint32_t j=0;j<numCols;j++)
672 {
673 if (catalog[i][j].first != fCatalog[i][j].first ||
674 catalog[i][j].second != fCatalog[i][j].second)
675 {
676 clear(rdstate()|std::ios::badbit);
677#ifdef __EXCEPTIONS
678 throw std::runtime_error("Heap data does not agree with header.");
679#else
680 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
681 return;
682#endif
683 }
684 }
685 //go back to start of heap
686 //clear the bad bit before seeking back (we hit eof)
687 clear();
688 seekg(whereAreWe);
689 }
690
691};//class zfits
692
693#endif
Note: See TracBrowser for help on using the repository browser.