source: trunk/Mars/mcore/zfits.h@ 20001

Last change on this file since 20001 was 19284, checked in by tbretz, 6 years ago
Removed processings from BlockHeader, as a variable size array, it created problem with compiling the root dictionary with certain compilers. Now, the memory position is calculated directly in the code (fortunately, this 'trick' was only used in one location).
File size: 23.9 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26// InitCompressionReading();
27 }
28
29 // Alternative constructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35// InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 if (!HasKey("RAWSUM"))
51 return fits::IsFileOk();
52
53 const bool rawsum = GetStr("RAWSUM") == std::to_string((long long int)fRawsum.val());
54 return fits::IsFileOk() && rawsum;
55 };
56
57 size_t GetNumRows() const
58 {
59 return fTable.Get<size_t>(fTable.is_compressed ? "ZNAXIS2" : "NAXIS2");
60 }
61
62 size_t GetBytesPerRow() const
63 {
64 return fTable.Get<size_t>(fTable.is_compressed ? "ZNAXIS1" : "NAXIS1");
65 }
66
67protected:
68
69 // Stage the requested row to internal buffer
70 // Does NOT return data to users
71 virtual void StageRow(size_t row, char* dest)
72 {
73 if (!fTable.is_compressed)
74 {
75 fits::StageRow(row, dest);
76 return;
77 }
78 ReadBinaryRow(row, dest);
79 }
80
81private:
82
83 // Do what it takes to initialize the compressed structured
84 void InitCompressionReading()
85 {
86 fCatalogInitialized = true;
87
88 if (!fTable.is_compressed)
89 return;
90
91 //The constructor may have failed
92 if (!good())
93 return;
94
95 if (fTable.is_compressed)
96 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
97 {
98 if (it->comp == kCompFACT)
99 continue;
100
101 clear(rdstate()|std::ios::badbit);
102#ifdef __EXCEPTIONS
103 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
104#else
105 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl;
106 return;
107#endif
108 }
109
110 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
111
112 //Get compressed specific keywords
113 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
114 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
115
116 //read the file's catalog
117 ReadCatalog();
118
119 //give it some space for uncompressing
120 AllocateBuffers();
121 }
122
123 // Copy decompressed data to location requested by user
124 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
125 {
126 if (!fTable.is_compressed)
127 {
128 fits::MoveColumnDataToUserSpace(dest, src, c);
129 return;
130 }
131
132 memcpy(dest, src, c.num*c.size);
133 }
134
135 bool fCatalogInitialized;
136
137 std::vector<char> fBuffer; ///<store the uncompressed rows
138 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
139 std::vector<char> fCompressedBuffer; ///<compressed rows
140 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
141
142 size_t fNumTiles; ///< Total number of tiles
143 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
144 int64_t fCurrentRow; ///< current row in memory signed because we need -1
145 size_t fShrinkFactor; ///< shrink factor
146
147 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
148 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
149
150 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
151 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
152 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
153
154 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
155
156 // Get buffer space
157 void AllocateBuffers()
158 {
159 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
160 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
161 //use a bit more memory for block headers. 256 char coding the compression sequence max.
162 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
163 //a bit more for the tile headers
164 sizeof(FITS::TileHeader) +
165 //and a bit more for checksuming
166 8;
167
168 if (buffer_size % 4 != 0)
169 buffer_size += 4 - (buffer_size%4);
170
171 if (compressed_buffer_size % 4 != 0)
172 compressed_buffer_size += 4 - (compressed_buffer_size%4);
173
174 fBuffer.resize(buffer_size);
175
176 fTransposedBuffer.resize(buffer_size);
177 fCompressedBuffer.resize(compressed_buffer_size);
178 }
179
180 // Read catalog data. I.e. the address of the compressed data inside the heap
181 void ReadCatalog()
182 {
183 std::vector<char> readBuf(16);
184 fCatalog.resize(fNumTiles);
185
186 const streampos catalogStart = tellg();
187
188 fChkData.reset();
189
190 //do the actual reading
191 for (uint32_t i=0;i<fNumTiles;i++)
192 for (uint32_t j=0;j<fTable.num_cols;j++)
193 {
194 read(readBuf.data(), 2*sizeof(int64_t));
195 fChkData.add(readBuf);
196 //swap the bytes
197 int64_t tempValues[2] = {0,0};
198 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
199 if (tempValues[0] < 0 || tempValues[1] < 0)
200 {
201 clear(rdstate()|std::ios::badbit);
202#ifdef __EXCEPTIONS
203 throw std::runtime_error("Negative value in the catalog");
204#else
205 gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl;
206 return;
207#endif
208 }
209 //add catalog entry
210 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
211 }
212
213 //see if there is a gap before heap data
214 fHeapOff = tellg()+fTable.GetHeapShift();
215 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
216
217 //check if the catalog has been shrinked
218 fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
219
220 if (fNumRowsPerTile%fShrinkFactor)
221 {
222 clear(rdstate()|std::ios::badbit);
223#ifdef __EXCEPTIONS
224 throw std::runtime_error("Rows per tile and shrink factor do not match");
225#else
226 gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
227 return;
228#endif
229 }
230
231 if (fShrinkFactor>0)
232 fNumRowsPerTile /= fShrinkFactor;
233
234 //compute the total size of each compressed tile
235 fTileSize.resize(fNumTiles);
236 fTileOffsets.resize(fNumTiles);
237 for (uint32_t i=0;i<fNumTiles;i++)
238 {
239 fTileSize[i] = 0;
240 for (uint32_t j=0;j<fTable.num_cols;j++)
241 {
242 fTileSize[i] += fCatalog[i][j].first;
243 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
244 }
245 }
246
247 if (!fCopy.is_open())
248 return;
249
250 //write catalog and heap gap to target file
251 seekg(catalogStart);
252
253 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
254
255 std::vector<char> buf(catSize);
256 read(buf.data(), catSize);
257
258 fCopy.write(buf.data(), catSize);
259 if (!fCopy)
260 clear(rdstate()|std::ios::badbit);
261 }
262
263 //overrides fits.h method with empty one
264 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
265 virtual void WriteRowToCopyFile(size_t row)
266 {
267 if (row == fRow+1)
268 fRawsum.add(fBufferRow);
269 }
270
271 // Compressed version of the read row, even files with shrunk catalogs
272 // can be read fully sequentially so that streaming, e.g. through
273 // stdout/stdin, is possible.
274 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
275 {
276 if (rowNum >= GetNumRows())
277 return false;
278
279 if (!fCatalogInitialized)
280 InitCompressionReading();
281
282 // Book keeping, where are we?
283 const int64_t requestedTile = rowNum / fNumRowsPerTile;
284 const int64_t currentTile = fCurrentRow / fNumRowsPerTile;
285
286 const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
287 //const int64_t currentSuperTile = currentTile / fShrinkFactor;
288
289 const int64_t requestedSubTile = requestedTile % fShrinkFactor;
290 //const int64_t currentSubTile = currentTile % fShrinkFactor;
291
292 // Is this the first tile we read at all?
293 const bool isFirstTile = fCurrentRow<0;
294
295 // Is this just the next tile in the sequence?
296 const bool isNextTile = requestedTile==currentTile+1 || isFirstTile;
297
298 fCurrentRow = rowNum;
299
300 // Do we have to read a new tile from disk?
301 if (requestedTile!=currentTile || isFirstTile)
302 {
303 //skip to the beginning of the tile
304 const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
305
306 std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
307
308 // If this is a sub tile we might have to step forward a bit and
309 // seek for the sub tile. If we were just reading the previous one
310 // we can skip that.
311 if (!isNextTile || isFirstTile)
312 {
313 // step to the beginnig of the super tile
314 seekg(fHeapOff+superTileStart);
315
316 // If there are sub tiles we might have to seek through the super tile
317 for (uint32_t k=0; k<requestedSubTile; k++)
318 {
319 // Read header
320 FITS::TileHeader header;
321 read((char*)&header, sizeof(FITS::TileHeader));
322
323 // Skip to the next header
324 seekg(header.size-sizeof(FITS::TileHeader), cur);
325 }
326 }
327
328 // this is now the beginning of the sub-tile we want to read
329 const int64_t subTileStart = tellg() - fHeapOff;
330 // calculate the 32 bits offset of the current tile.
331 const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
332
333 // start of destination buffer (padding comes later)
334 char *destBuffer = fCompressedBuffer.data()+offset;
335
336 // Store the current tile size once known
337 size_t currentTileSize = 0;
338
339 // If this is a request for a sub tile which is not cataloged
340 // recalculate the offsets from the buffer, once read
341 if (requestedSubTile>0)
342 {
343 // Read header
344 read(destBuffer, sizeof(FITS::TileHeader));
345
346 // Get size of tile
347 currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
348
349 // now read the remaining bytes of this tile
350 read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
351
352 // Calculate the offsets recursively
353 offsets[0] = 0;
354
355 //skip through the columns
356 for (size_t i=0; i<fTable.num_cols-1; i++)
357 {
358 //zero sized column do not have headers. Skip it
359 if (fTable.sorted_cols[i].num == 0)
360 {
361 offsets[i+1] = offsets[i];
362 continue;
363 }
364
365 const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
366 offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
367 }
368 }
369 else
370 {
371 // If we are reading the first tile of a super tile, all information
372 // is already available.
373 currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
374 read(destBuffer, currentTileSize);
375 }
376
377
378 // If we are reading sequentially, calcualte checksum
379 if (isNextTile)
380 {
381 // Padding for checksum calculation
382 memset(fCompressedBuffer.data(), 0, offset);
383 memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
384 fChkData.add(fCompressedBuffer);
385 }
386
387 // Check if we are writing a copy of the file
388 if (isNextTile && fCopy.is_open() && fCopy.good())
389 {
390 fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
391 if (!fCopy)
392 clear(rdstate()|std::ios::badbit);
393 }
394 else
395 if (fCopy.is_open())
396 clear(rdstate()|std::ios::badbit);
397
398
399 // uncompress the buffer
400 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
401 if (!UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader)))
402 return false;
403
404 // pointer to column (source buffer)
405 const char *src = fTransposedBuffer.data();
406
407 uint32_t i=0;
408 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
409 {
410 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
411
412 switch (fColumnOrdering[i])
413 {
414 case FITS::kOrderByRow:
415 // regular, "semi-transposed" copy
416 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
417 {
418 memcpy(dest, src, it->bytes);
419 src += it->bytes; // next column
420 }
421 break;
422
423 case FITS::kOrderByCol:
424 // transposed copy
425 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
426 {
427 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
428 {
429 memcpy(dest, src, it->size);
430 src += it->size; // next element
431 }
432 }
433 break;
434
435 default:
436 clear(rdstate()|std::ios::badbit);
437
438 std::ostringstream str;
439 str << "Unkown column ordering scheme found (i=" << i << ", " << fColumnOrdering[i] << ")";
440#ifdef __EXCEPTIONS
441 throw std::runtime_error(str.str());
442#else
443 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
444 return false;
445#endif
446 };
447 }
448 }
449
450 //Data loaded and uncompressed. Copy it to destination
451 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
452 return good();
453 }
454
455 // Read a bunch of uncompressed data
456 uint32_t UncompressUNCOMPRESSED(char* dest,
457 const char* src,
458 uint32_t numElems,
459 uint32_t sizeOfElems)
460 {
461 memcpy(dest, src, numElems*sizeOfElems);
462 return numElems*sizeOfElems;
463 }
464
465 // Read a bunch of data compressed with the Huffman algorithm
466 uint32_t UncompressHUFFMAN16(char* dest,
467 const char* src,
468 uint32_t numChunks)
469 {
470 std::vector<uint16_t> uncompressed;
471
472 //read compressed sizes (one per row)
473 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
474 src += sizeof(uint32_t)*numChunks;
475
476 //uncompress the rows, one by one
477 uint32_t sizeWritten = 0;
478 for (uint32_t j=0;j<numChunks;j++)
479 {
480 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
481
482 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
483
484 sizeWritten += uncompressed.size()*sizeof(uint16_t);
485 dest += uncompressed.size()*sizeof(uint16_t);
486 src += compressedSizes[j];
487 }
488 return sizeWritten;
489 }
490
491 // Apply the inverse transform of the integer smoothing
492 uint32_t UnApplySMOOTHING(int16_t* data,
493 uint32_t numElems)
494 {
495 //un-do the integer smoothing
496 for (uint32_t j=2;j<numElems;j++)
497 data[j] = data[j] + (data[j-1]+data[j-2])/2;
498
499 return numElems*sizeof(uint16_t);
500 }
501
502 // Data has been read from disk. Uncompress it !
503 bool UncompressBuffer(const std::vector<size_t> &offsets,
504 const uint32_t &thisRoundNumRows,
505 const uint32_t offset)
506 {
507 char *dest = fTransposedBuffer.data();
508
509 //uncompress column by column
510 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
511 {
512 const fits::Table::Column &col = fTable.sorted_cols[i];
513 if (col.num == 0)
514 continue;
515
516 //get the compression flag
517 const int64_t compressedOffset = offsets[i]+offset;
518
519 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
520 const uint16_t *processings = reinterpret_cast<const uint16_t*>(reinterpret_cast<const char*>(head)+sizeof(FITS::BlockHeader));
521
522 fColumnOrdering[i] = head->ordering;
523
524 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
525 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
526
527 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
528
529 for (int32_t j=head->numProcs-1;j >= 0; j--)
530 {
531 uint32_t sizeWritten=0;
532
533 switch (processings[j])
534 {
535 case FITS::kFactRaw:
536 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
537 break;
538
539 case FITS::kFactSmoothing:
540 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
541 break;
542
543 case FITS::kFactHuffman16:
544 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
545 break;
546
547 default:
548 clear(rdstate()|std::ios::badbit);
549
550 std::ostringstream str;
551 str << "Unknown processing applied to data (col=" << i << ", proc=" << j << "/" << (int)head->numProcs;
552#ifdef __EXCEPTIONS
553 throw std::runtime_error(str.str());
554#else
555 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
556 return false;
557#endif
558 }
559 //increment destination counter only when processing done.
560 if (j==0)
561 dest+= sizeWritten;
562 }
563 }
564
565 return true;
566 }
567
568 void CheckIfFileIsConsistent(bool update_catalog=false)
569 {
570 //goto start of heap
571 const streamoff whereAreWe = tellg();
572 seekg(fHeapOff);
573
574 //init number of rows to zero
575 uint64_t numRows = 0;
576
577 //get number of columns from header
578 const size_t numCols = fTable.num_cols;
579
580 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
581
582 FITS::TileHeader tileHead;
583 FITS::BlockHeader columnHead;
584
585 streamoff offsetInHeap = 0;
586 //skip through the heap
587 while (true)
588 {
589 read((char*)(&tileHead), sizeof(FITS::TileHeader));
590 //end of file
591 if (!good())
592 break;
593
594 //padding or corrupt data
595 if (memcmp(tileHead.id, "TILE", 4))
596 {
597 clear(rdstate()|std::ios::badbit);
598 break;
599 }
600
601 //a new tile begins here
602 catalog.emplace_back();
603 offsetInHeap += sizeof(FITS::TileHeader);
604
605 //skip through the columns
606 for (size_t i=0;i<numCols;i++)
607 {
608 //zero sized column do not have headers. Skip it
609 if (fTable.sorted_cols[i].num == 0)
610 {
611 catalog.back().emplace_back(0,0);
612 continue;
613 }
614
615 //read column header
616 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
617
618 //corrupted tile
619 if (!good())
620 break;
621
622 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
623 offsetInHeap += columnHead.size;
624 seekg(fHeapOff+offsetInHeap);
625 }
626
627 //if we ain't good, this means that something went wrong inside the current tile.
628 if (!good())
629 {
630 catalog.pop_back();
631 break;
632 }
633 //current tile is complete. Add rows
634 numRows += tileHead.numRows;
635 }
636
637 if (numRows != fTable.num_rows)
638 {
639 clear(rdstate()|std::ios::badbit);
640 std::ostringstream str;
641 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
642#ifdef __EXCEPTIONS
643 throw std::runtime_error(str.str());
644#else
645 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
646 return;
647#endif
648 }
649
650 if (update_catalog)
651 {
652 fCatalog = catalog;
653 //clear the bad bit before seeking back (we hit eof)
654 clear();
655 seekg(whereAreWe);
656 return;
657 }
658
659 if (catalog.size() != fCatalog.size())
660 {
661 clear(rdstate()|std::ios::badbit);
662#ifdef __EXCEPTIONS
663 throw std::runtime_error("Heap data does not agree with header.");
664#else
665 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
666 return;
667#endif
668 }
669
670 for (uint32_t i=0;i<catalog.size(); i++)
671 for (uint32_t j=0;j<numCols;j++)
672 {
673 if (catalog[i][j].first != fCatalog[i][j].first ||
674 catalog[i][j].second != fCatalog[i][j].second)
675 {
676 clear(rdstate()|std::ios::badbit);
677#ifdef __EXCEPTIONS
678 throw std::runtime_error("Heap data does not agree with header.");
679#else
680 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
681 return;
682#endif
683 }
684 }
685 //go back to start of heap
686 //clear the bad bit before seeking back (we hit eof)
687 clear();
688 seekg(whereAreWe);
689 }
690
691};//class zfits
692
693#endif
Note: See TracBrowser for help on using the repository browser.