source: trunk/Mars/mcore/zfits.h@ 17437

Last change on this file since 17437 was 17402, checked in by tbretz, 11 years ago
Updated to allow a real streaming, even for shrunk files.
File size: 23.7 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26// InitCompressionReading();
27 }
28
29 // Alternative contstructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-2), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35// InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 bool rawsum = true;
51
52 if (HasKey("RAWSUM"))
53 {
54 std::ostringstream str;
55 str << fRawsum.val();
56 rawsum = (GetStr("RAWSUM") == str.str());
57 }
58
59 return fits::IsFileOk() && rawsum;
60 };
61
62 size_t GetNumRows() const
63 {
64 if (fTable.is_compressed)
65 return fTable.Get<size_t>("ZNAXIS2");
66 else
67 return fTable.Get<size_t>("NAXIS2");
68 }
69 size_t GetBytesPerRow() const
70 {
71 if (fTable.is_compressed)
72 return fTable.Get<size_t>("ZNAXIS1");
73 else
74 return fTable.Get<size_t>("NAXIS1");
75 }
76
77protected:
78
79 // Stage the requested row to internal buffer
80 // Does NOT return data to users
81 virtual void StageRow(size_t row, char* dest)
82 {
83 if (!fTable.is_compressed)
84 {
85 fits::StageRow(row, dest);
86 return;
87 }
88 ReadBinaryRow(row, dest);
89 }
90
91private:
92
93 // Do what it takes to initialize the compressed structured
94 void InitCompressionReading()
95 {
96 fCatalogInitialized = true;
97
98 if (!fTable.is_compressed)
99 return;
100
101 //The constructor may have failed
102 if (!good())
103 return;
104
105 if (fTable.is_compressed)
106 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
107 {
108 if (it->comp == kCompFACT)
109 continue;
110
111 clear(rdstate()|std::ios::badbit);
112#ifdef __EXCEPTIONS
113 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
114#else
115 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl;
116 return;
117#endif
118 }
119
120 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
121
122 //Get compressed specific keywords
123 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
124 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
125
126 //read the file's catalog
127 ReadCatalog();
128
129 //give it some space for uncompressing
130 AllocateBuffers();
131 }
132
133 // Copy decompressed data to location requested by user
134 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
135 {
136 if (!fTable.is_compressed)
137 {
138 fits::MoveColumnDataToUserSpace(dest, src, c);
139 return;
140 }
141
142 memcpy(dest, src, c.num*c.size);
143 }
144
145 bool fCatalogInitialized;
146
147 std::vector<char> fBuffer; ///<store the uncompressed rows
148 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
149 std::vector<char> fCompressedBuffer; ///<compressed rows
150 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
151
152 size_t fNumTiles; ///< Total number of tiles
153 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
154 int64_t fCurrentRow; ///< current row in memory signed because we need -1
155 size_t fShrinkFactor; ///< shrink factor
156
157 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
158 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
159
160 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
161 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
162 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
163
164 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
165
166 // Get buffer space
167 void AllocateBuffers()
168 {
169 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
170 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
171 //use a bit more memory for block headers. 256 char coding the compression sequence max.
172 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
173 //a bit more for the tile headers
174 sizeof(FITS::TileHeader) +
175 //and a bit more for checksuming
176 8;
177
178 if (buffer_size % 4 != 0)
179 buffer_size += 4 - (buffer_size%4);
180
181 if (compressed_buffer_size % 4 != 0)
182 compressed_buffer_size += 4 - (compressed_buffer_size%4);
183
184 fBuffer.resize(buffer_size);
185
186 fTransposedBuffer.resize(buffer_size);
187 fCompressedBuffer.resize(compressed_buffer_size);
188 }
189
190 // Read catalog data. I.e. the address of the compressed data inside the heap
191 void ReadCatalog()
192 {
193 std::vector<char> readBuf(16);
194 fCatalog.resize(fNumTiles);
195
196 const streampos catalogStart = tellg();
197
198 fChkData.reset();
199
200 //do the actual reading
201 for (uint32_t i=0;i<fNumTiles;i++)
202 for (uint32_t j=0;j<fTable.num_cols;j++)
203 {
204 read(readBuf.data(), 2*sizeof(int64_t));
205 fChkData.add(readBuf);
206 //swap the bytes
207 int64_t tempValues[2] = {0,0};
208 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
209 if (tempValues[0] < 0 || tempValues[1] < 0)
210 {
211 clear(rdstate()|std::ios::badbit);
212#ifdef __EXCEPTIONS
213 throw std::runtime_error("Negative value in the catalog");
214#else
215 gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl;
216 return;
217#endif
218 }
219 //add catalog entry
220 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
221 }
222
223 //see if there is a gap before heap data
224 fHeapOff = tellg()+fTable.GetHeapShift();
225 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
226
227 //check if the catalog has been shrinked
228 fShrinkFactor = HasKey("ZSHRINK") ? GetUInt("ZSHRINK") : 1;
229
230 if (fNumRowsPerTile%fShrinkFactor)
231 {
232 clear(rdstate()|std::ios::badbit);
233#ifdef __EXCEPTIONS
234 throw std::runtime_error("Rows per tile and shrink factor do not match");
235#else
236 gLog << ___err___ << "ERROR - Rows per tile and shrink factor do not match" << std::endl;
237 return;
238#endif
239 }
240
241 if (fShrinkFactor>0)
242 fNumRowsPerTile /= fShrinkFactor;
243
244 //compute the total size of each compressed tile
245 fTileSize.resize(fNumTiles);
246 fTileOffsets.resize(fNumTiles);
247 for (uint32_t i=0;i<fNumTiles;i++)
248 {
249 fTileSize[i] = 0;
250 for (uint32_t j=0;j<fTable.num_cols;j++)
251 {
252 fTileSize[i] += fCatalog[i][j].first;
253 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
254 }
255 }
256
257 if (!fCopy.is_open())
258 return;
259
260 //write catalog and heap gap to target file
261 seekg(catalogStart);
262
263 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
264
265 std::vector<char> buf(catSize);
266 read(buf.data(), catSize);
267
268 fCopy.write(buf.data(), catSize);
269 if (!fCopy)
270 clear(rdstate()|std::ios::badbit);
271 }
272
273 //overrides fits.h method with empty one
274 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
275 virtual void WriteRowToCopyFile(size_t row)
276 {
277 if (row == fRow+1)
278 fRawsum.add(fBufferRow, false);
279 }
280
281 // Compressed version of the read row, even files with shrunk catalogs
282 // can be read fully sequentially so that streaming, e.g. through
283 // stdout/stdin, is possible.
284 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
285 {
286 if (rowNum >= GetNumRows())
287 return false;
288
289 if (!fCatalogInitialized)
290 InitCompressionReading();
291
292 // Book keeping, where are we?
293 const int64_t requestedTile = rowNum / fNumRowsPerTile;
294 const int64_t currentTile = fCurrentRow / fNumRowsPerTile;
295
296 const int64_t requestedSuperTile = requestedTile / fShrinkFactor;
297 const int64_t currentSuperTile = currentTile / fShrinkFactor;
298
299 const int64_t requestedSubTile = requestedTile % fShrinkFactor;
300 const int64_t currentSubTile = currentTile % fShrinkFactor;
301
302 fCurrentRow = rowNum;
303
304 // Is this just the next tile in the sequence?
305 const bool isNextTile = requestedTile==currentTile+1;
306
307 // Do we have to read a new tile from disk?
308 if (requestedTile!=currentTile)
309 {
310 //skip to the beginning of the tile
311 const int64_t superTileStart = fCatalog[requestedSuperTile][0].second - sizeof(FITS::TileHeader);
312
313 std::vector<size_t> offsets = fTileOffsets[requestedSuperTile];
314
315 // If this is a sub tile we might have to step forward a bit and
316 // seek for the sub tile. If we were just reading the previous one
317 // we can skip that.
318 if (!isNextTile)
319 {
320 // step to the beginnig of the super tile
321 seekg(fHeapOff+superTileStart);
322
323 // If there are sub tiles we might have to seek through the super tile
324 for (uint32_t k=0; k<requestedSubTile; k++)
325 {
326 // Read header
327 FITS::TileHeader header;
328 read((char*)&header, sizeof(FITS::TileHeader));
329
330 // Skip to the next header
331 seekg(header.size-sizeof(FITS::TileHeader), cur);
332 }
333 }
334
335 // this is now the beginning of the sub-tile we want to read
336 const int64_t subTileStart = tellg() - fHeapOff;
337
338 // calculate the 32 bits offset of the current tile.
339 const uint32_t offset = (subTileStart + fHeapFromDataStart)%4;
340
341 // start of destination buffer (padding comes later)
342 char *destBuffer = fCompressedBuffer.data()+offset;
343
344 // Store the current tile size once known
345 size_t currentTileSize = 0;
346
347 // If this is a request for a sub tile which is not cataloged
348 // recalculate the offsets from the buffer, once read
349 if (requestedSubTile>0)
350 {
351 // Read header
352 read(destBuffer, sizeof(FITS::TileHeader));
353
354 // Get size of tile
355 currentTileSize = reinterpret_cast<FITS::TileHeader*>(destBuffer)->size;
356
357 // now read the remaining bytes of this tile
358 read(destBuffer+sizeof(FITS::TileHeader), currentTileSize-sizeof(FITS::TileHeader));
359
360 // Calculate the offsets recursively
361 offsets[0] = 0;
362
363 //skip through the columns
364 for (size_t i=0; i<fTable.num_cols-1; i++)
365 {
366 //zero sized column do not have headers. Skip it
367 if (fTable.sorted_cols[i].num == 0)
368 {
369 offsets[i+1] = offsets[i];
370 continue;
371 }
372
373 const char *pos = destBuffer + offsets[i] + sizeof(FITS::TileHeader);
374 offsets[i+1] = offsets[i] + reinterpret_cast<const FITS::BlockHeader*>(pos)->size;
375 }
376 }
377 else
378 {
379 // If we are reading the first tile of a super tile, all information
380 // is already available.
381 currentTileSize = fTileSize[requestedSuperTile] + sizeof(FITS::TileHeader);
382 read(destBuffer, currentTileSize);
383 }
384
385
386 // If we are reading sequentially, calcuakte checksum
387 if (isNextTile)
388 {
389 // Padding for checksum calculation
390 memset(fCompressedBuffer.data(), 0, offset);
391 memset(destBuffer+currentTileSize, 0, fCompressedBuffer.size()-currentTileSize-offset);
392 fChkData.add(fCompressedBuffer);
393 }
394
395 // Check if we are writing a copy of the file
396 if (isNextTile && fCopy.is_open() && fCopy.good())
397 {
398 fCopy.write(fCompressedBuffer.data()+offset, currentTileSize);
399 if (!fCopy)
400 clear(rdstate()|std::ios::badbit);
401 }
402 else
403 if (fCopy.is_open())
404 clear(rdstate()|std::ios::badbit);
405
406
407 // uncompress the buffer
408 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
409 UncompressBuffer(offsets, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
410
411 // pointer to column (source buffer)
412 const char *src = fTransposedBuffer.data();
413
414 uint32_t i=0;
415 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
416 {
417 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
418
419 switch (fColumnOrdering[i])
420 {
421 case FITS::kOrderByRow:
422 // regular, "semi-transposed" copy
423 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
424 {
425 memcpy(dest, src, it->bytes);
426 src += it->bytes; // next column
427 }
428 break;
429
430 case FITS::kOrderByCol:
431 // transposed copy
432 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
433 {
434 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
435 {
436 memcpy(dest, src, it->size);
437 src += it->size; // next element
438 }
439 }
440 break;
441
442 default:
443 clear(rdstate()|std::ios::badbit);
444#ifdef __EXCEPTIONS
445 throw std::runtime_error("Unkown column ordering scheme found");
446#else
447 gLog << ___err___ << "ERROR - unkown column ordering scheme" << std::endl;
448 return false;
449#endif
450 break;
451 };
452 }
453 }
454
455 //Data loaded and uncompressed. Copy it to destination
456 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
457 return good();
458 }
459
460 // Read a bunch of uncompressed data
461 uint32_t UncompressUNCOMPRESSED(char* dest,
462 const char* src,
463 uint32_t numElems,
464 uint32_t sizeOfElems)
465 {
466 memcpy(dest, src, numElems*sizeOfElems);
467 return numElems*sizeOfElems;
468 }
469
470 // Read a bunch of data compressed with the Huffman algorithm
471 uint32_t UncompressHUFFMAN16(char* dest,
472 const char* src,
473 uint32_t numChunks)
474 {
475 std::vector<uint16_t> uncompressed;
476
477 //read compressed sizes (one per row)
478 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
479 src += sizeof(uint32_t)*numChunks;
480
481 //uncompress the rows, one by one
482 uint32_t sizeWritten = 0;
483 for (uint32_t j=0;j<numChunks;j++)
484 {
485 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
486
487 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
488
489 sizeWritten += uncompressed.size()*sizeof(uint16_t);
490 dest += uncompressed.size()*sizeof(uint16_t);
491 src += compressedSizes[j];
492 }
493 return sizeWritten;
494 }
495
496 // Apply the inverse transform of the integer smoothing
497 uint32_t UnApplySMOOTHING(int16_t* data,
498 uint32_t numElems)
499 {
500 //un-do the integer smoothing
501 for (uint32_t j=2;j<numElems;j++)
502 data[j] = data[j] + (data[j-1]+data[j-2])/2;
503
504 return numElems*sizeof(uint16_t);
505 }
506
507 // Data has been read from disk. Uncompress it !
508 void UncompressBuffer(const std::vector<size_t> &offsets,
509 const uint32_t &thisRoundNumRows,
510 const uint32_t offset)
511 {
512 char *dest = fTransposedBuffer.data();
513
514 //uncompress column by column
515 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
516 {
517 const fits::Table::Column &col = fTable.sorted_cols[i];
518 if (col.num == 0)
519 continue;
520
521 //get the compression flag
522 const int64_t compressedOffset = offsets[i]+offset;
523
524 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
525
526 fColumnOrdering[i] = head->ordering;
527
528 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
529 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
530
531 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
532
533 for (int32_t j=head->numProcs-1;j >= 0; j--)
534 {
535 uint32_t sizeWritten=0;
536
537 switch (head->processings[j])
538 {
539 case FITS::kFactRaw:
540 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
541 break;
542
543 case FITS::kFactSmoothing:
544 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
545 break;
546
547 case FITS::kFactHuffman16:
548 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
549 break;
550
551 default:
552 clear(rdstate()|std::ios::badbit);
553
554 std::ostringstream str;
555 str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
556#ifdef __EXCEPTIONS
557 throw std::runtime_error(str.str());
558#else
559 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << std::endl;
560 return;
561#endif
562 }
563 //increment destination counter only when processing done.
564 if (j==0)
565 dest+= sizeWritten;
566 }
567 }
568 }
569
570 void CheckIfFileIsConsistent(bool update_catalog=false)
571 {
572 //goto start of heap
573 const streamoff whereAreWe = tellg();
574 seekg(fHeapOff);
575
576 //init number of rows to zero
577 uint64_t numRows = 0;
578
579 //get number of columns from header
580 const size_t numCols = fTable.num_cols;
581
582 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
583
584 FITS::TileHeader tileHead;
585 FITS::BlockHeader columnHead;
586
587 streamoff offsetInHeap = 0;
588 //skip through the heap
589 while (true)
590 {
591 read((char*)(&tileHead), sizeof(FITS::TileHeader));
592 //end of file
593 if (!good())
594 break;
595
596 //padding or corrupt data
597 if (memcmp(tileHead.id, "TILE", 4))
598 {
599 clear(rdstate()|std::ios::badbit);
600 break;
601 }
602
603 //a new tile begins here
604 catalog.emplace_back();
605 offsetInHeap += sizeof(FITS::TileHeader);
606
607 //skip through the columns
608 for (size_t i=0;i<numCols;i++)
609 {
610 //zero sized column do not have headers. Skip it
611 if (fTable.sorted_cols[i].num == 0)
612 {
613 catalog.back().emplace_back(0,0);
614 continue;
615 }
616
617 //read column header
618 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
619
620 //corrupted tile
621 if (!good())
622 break;
623
624 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
625 offsetInHeap += columnHead.size;
626 seekg(fHeapOff+offsetInHeap);
627 }
628
629 //if we ain't good, this means that something went wrong inside the current tile.
630 if (!good())
631 {
632 catalog.pop_back();
633 break;
634 }
635 //current tile is complete. Add rows
636 numRows += tileHead.numRows;
637 }
638
639 if (numRows != fTable.num_rows)
640 {
641 clear(rdstate()|std::ios::badbit);
642 std::ostringstream str;
643 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
644#ifdef __EXCEPTIONS
645 throw std::runtime_error(str.str());
646#else
647 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
648 return;
649#endif
650 }
651
652 if (update_catalog)
653 {
654 fCatalog = catalog;
655 //clear the bad bit before seeking back (we hit eof)
656 clear();
657 seekg(whereAreWe);
658 return;
659 }
660
661 if (catalog.size() != fCatalog.size())
662 {
663 clear(rdstate()|std::ios::badbit);
664#ifdef __EXCEPTIONS
665 throw std::runtime_error("Heap data does not agree with header.");
666#else
667 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
668 return;
669#endif
670 }
671
672 for (uint32_t i=0;i<catalog.size(); i++)
673 for (uint32_t j=0;j<numCols;j++)
674 {
675 if (catalog[i][j].first != fCatalog[i][j].first ||
676 catalog[i][j].second != fCatalog[i][j].second)
677 {
678 clear(rdstate()|std::ios::badbit);
679#ifdef __EXCEPTIONS
680 throw std::runtime_error("Heap data does not agree with header.");
681#else
682 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
683 return;
684#endif
685 }
686 }
687 //go back to start of heap
688 //clear the bad bit before seeking back (we hit eof)
689 clear();
690 seekg(whereAreWe);
691 }
692
693};//class zfits
694
695#endif
Note: See TracBrowser for help on using the repository browser.