source: trunk/Mars/mcore/zfits.h@ 17260

Last change on this file since 17260 was 17253, checked in by lyard, 11 years ago
zofits with catalog shrinking and compressed calibration table
File size: 21.2 KB
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16using namespace FITS;
17
18#ifndef __MARS__
19namespace std
20{
21#endif
22
23class zfits : public fits
24{
25public:
26
27 // Basic constructor
28 zfits(const string& fname, const string& tableName="", bool force=false) : fits(),
29 fNumTiles(0),
30 fNumRowsPerTile(0),
31 fCurrentRow(-1),
32 fHeapOff(0),
33 fTileSize(0)
34 {
35 open(fname.c_str());
36 Constructor(fname, "", tableName, force);
37 InitCompressionReading();
38 }
39
40 // Alternative constructor
41 zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(),
42 fNumTiles(0),
43 fNumRowsPerTile(0),
44 fCurrentRow(-1),
45 fHeapOff(0),
46 fTileSize(0)
47 {
48 open(fname.c_str());
49 Constructor(fname, fout, tableName, force);
50 InitCompressionReading();
51 }
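    // Illustrative usage sketch: a zfits table is read entirely through the interface
    // inherited from the fits base class. The file and column names below are made up,
    // and GetN/SetPtrAddress/GetNextRow are assumed to be provided by fits.h:
    //
    //     zfits file("data.fits.fz", "Events");       // open a (possibly compressed) table
    //     vector<int16_t> buf(file.GetN("Data"));     // hypothetical 16-bit column
    //     file.SetPtrAddress("Data", buf.data());     // register the user buffer
    //     while (file.GetNextRow())                   // rows are uncompressed transparently
    //     { /* process buf */ }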
52
53 // Skip the next row
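    // For compressed tables nothing needs to be read here: tiles are only loaded and
    // uncompressed on demand in ReadBinaryRow, so skipping just advances the row counter.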
54 bool SkipNextRow()
55 {
56 if (!fTable.is_compressed)
57 return fits::SkipNextRow();
58
59 fRow++;
60 return true;
61 }
62
63 virtual bool IsFileOk() const
64 {
65 bool rawsum = true;
66
67 if (HasKey("RAWSUM"))
68 {
69 ostringstream str;
70 str << fRawsum.val();
71 rawsum = (GetStr("RAWSUM") == str.str());
72 }
73
74 return fits::IsFileOk() && rawsum;
75 };
76
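    // For compressed tables the original (uncompressed) dimensions are stored in the
    // ZNAXIS1/ZNAXIS2 keywords, while NAXIS1/NAXIS2 describe the compressed table itself
    // (NAXIS2 then counts catalog rows, i.e. tiles; see InitCompressionReading).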
77 size_t GetNumRows() const
78 {
79 if (fTable.is_compressed)
80 return fTable.Get<size_t>("ZNAXIS2");
81 else
82 return fTable.Get<size_t>("NAXIS2");
83 }
84 size_t GetBytesPerRow() const
85 {
86 if (fTable.is_compressed)
87 return fTable.Get<size_t>("ZNAXIS1");
88 else
89 return fTable.Get<size_t>("NAXIS1");
90 }
91
92protected:
93
94 // Stage the requested row to internal buffer
95 // Does NOT return data to users
96 virtual void StageRow(size_t row, char* dest)
97 {
98 if (!fTable.is_compressed)
99 {
100 fits::StageRow(row, dest);
101 return;
102 }
103 ReadBinaryRow(row, dest);
104 }
105
106private:
107
108 // Do what it takes to initialize the compression-related structures
109 void InitCompressionReading()
110 {
111 if (!fTable.is_compressed)
112 return;
113
114 //The constructor may have failed
115 if (!good())
116 return;
117
118 if (fTable.is_compressed)
119 for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++)
120 {
121 if (it->comp == kCompFACT)
122 continue;
123
124 clear(rdstate()|ios::badbit);
125#ifdef __EXCEPTIONS
126 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
127#else
128 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
129 return;
130#endif
131 }
132
133 fColumnOrdering.resize(fTable.sorted_cols.size());
134 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
135 (*it) = kOrderByRow;
136 //Get compressed specific keywords
137 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
138 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
139
140 //read the file's catalog
141 ReadCatalog();
142
143 //give it some space for uncompressing
144 AllocateBuffers();
145
146 //check that the heap agrees with the header
147 //CheckIfFileIsConsistent();
148 }
149
150 // Copy decompressed data to location requested by user
151 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
152 {
153 if (!fTable.is_compressed)
154 {
155 fits::MoveColumnDataToUserSpace(dest, src, c);
156 return;
157 }
158
159 memcpy(dest, src, c.num*c.size);
160 }
161
162 vector<char> fBuffer; ///<store the uncompressed rows
163 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
164 vector<char> fCompressedBuffer; ///<compressed rows
165 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
166
167 size_t fNumTiles; ///< Total number of tiles
168 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
169 int64_t fCurrentRow; ///< current row in memory; signed because -1 marks 'no row loaded yet'
170
171 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
172 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
173
174 vector<vector<pair<int64_t, int64_t>>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
175 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
176 vector<vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
177
178 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
179
180 // Get buffer space
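    // Both sizes computed below are rounded up to a multiple of 4 bytes, presumably so
    // that checksumming can proceed in whole 32-bit words without reading past the end
    // of the buffers (note also the extra 8 bytes reserved for the checksum).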
181 void AllocateBuffers()
182 {
183 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
184 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
185 fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for the block headers; at most 256 chars encode the compression sequence
186 sizeof(TileHeader) + //a bit more for the tile headers
187 8;//and a bit more for checksumming
188 if (buffer_size % 4 != 0) buffer_size += 4 - (buffer_size%4);
189 if (compressed_buffer_size % 4 != 0) compressed_buffer_size += 4 - (compressed_buffer_size%4);
190
191 fBuffer.resize(buffer_size);
192
193 fTransposedBuffer.resize(buffer_size);
194 fCompressedBuffer.resize(compressed_buffer_size);
195 }
196
197 // Read the catalog data, i.e. the addresses of the compressed data inside the heap
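    // Each catalog entry is a pair of big-endian 64-bit integers: the compressed size of
    // one column within one tile and the offset of that column's data inside the heap.
    // Entries are stored tile by tile, one entry per column of each tile.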
198 void ReadCatalog()
199 {
200 vector<char> readBuf(16);
201 fCatalog.resize(fNumTiles);
202
203 const streampos catalogStart = tellg();
204
205 fChkData.reset();
206
207 //do the actual reading
208 for (uint32_t i=0;i<fNumTiles;i++)
209 for (uint32_t j=0;j<fTable.num_cols;j++)
210 {
211 read(readBuf.data(), 2*sizeof(int64_t));
212 fChkData.add(readBuf);
213 //swap the bytes
214 int64_t tempValues[2] = {0,0};
215 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
216 if (tempValues[0] < 0 || tempValues[1] < 0)
217 {
218 clear(rdstate()|ios::badbit);
219#ifdef __EXCEPTIONS
220 throw runtime_error("Negative value in the catalog");
221#else
222 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
223 return;
224#endif
225 }
226 //add catalog entry
227 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
228 }
229
230 //see if there is a gap before heap data
231 fHeapOff = tellg()+fTable.GetHeapShift();
232 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
233
234 //check if the catalog has been shrunk
235 uint32_t shrink_factor = 1;
236 if (HasKey("ZSHRINK"))
237 shrink_factor = GetInt("ZSHRINK");
238
239 if (shrink_factor != 1)
240 {
241 CheckIfFileIsConsistent(true);
242 fNumTiles = fCatalog.size();
243 fNumRowsPerTile /= shrink_factor;
244 }
245
246 //compute the total size of each compressed tile
247 fTileSize.resize(fNumTiles);
248 fTileOffsets.resize(fNumTiles);
249 for (uint32_t i=0;i<fNumTiles;i++)
250 {
251 fTileSize[i] = 0;
252 for (uint32_t j=0;j<fTable.num_cols;j++)
253 {
254 fTileSize[i] += fCatalog[i][j].first;
255 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
256 }
257 }
258
259 if (!fCopy.is_open())
260 return;
261
262 //write catalog and heap gap to target file
263 seekg(catalogStart);
264
265 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
266
267 vector<char> buf(catSize);
268 read(buf.data(), catSize);
269
270 fCopy.write(buf.data(), catSize);
271 if (!fCopy)
272 clear(rdstate()|ios::badbit);
273 }
274
275 //overrides the fits.h method: here only the checksum of the raw data is updated.
276 //The actual writing to the copy file is done in ReadBinaryRow, because it needs data that is only available there.
277 virtual void WriteRowToCopyFile(size_t row)
278 {
279 if (row == fRow+1)
280 fRawsum.add(fBufferRow, false);
281 }
282
283 // Compressed version of the read row
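    // The requested row is first mapped to its tile. If that tile is not the one currently
    // in memory, the whole tile is read from the heap, checksummed and possibly written to
    // the copy file, then uncompressed into fBuffer; finally the requested row is copied
    // from fBuffer into the caller's buffer.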
284 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
285 {
286 if (rowNum >= GetNumRows())
287 return false;
288
289 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
290 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
291
292 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
293
294 fCurrentRow = rowNum;
295 //should we read yet another chunk of data ?
296 if (requestedTile != currentTile)
297 {
298 //read yet another chunk from the file
299 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);
300
301 //skip to the beginning of the tile
302 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);
303
304 seekg(fHeapOff+tileStart);
305
306 //calculate the 32 bits offset of the current tile.
307 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
308
309 //point the tile header where it should be
310 //the tile header is not checked for now
311// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
312
313 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
314
315 //read one tile from disk
316 read(fCompressedBuffer.data()+offset, sizeToRead);
317
318 if (addCheckSum)
319 fChkData.add(fCompressedBuffer);
320
321 if (requestedTile == currentTile+1 &&
322 fCopy.is_open() &&
323 fCopy.good())
324 {
325 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
326 if (!fCopy)
327 clear(rdstate()|ios::badbit);
328 }
329 else
330 if (fCopy.is_open())
331 clear(rdstate()|ios::badbit);
332
333 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
334
335 //uncompress it
336 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));
337
338 // pointer to column (source buffer)
339 const char *src = fTransposedBuffer.data();
340
341 uint32_t i=0;
342 for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
343 {
344 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
345
346 switch (fColumnOrdering[i])
347 {
348 case kOrderByRow:
349 // regular, "semi-transposed" copy
350 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
351 {
352 memcpy(dest, src, it->bytes);
353 src += it->bytes; // next column
354 }
355 break;
356
357 case kOrderByCol:
358 // transposed copy
359 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
360 {
361 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
362 {
363 memcpy(dest, src, it->size);
364 src += it->size; // next element
365 }
366 }
367 break;
368 default:
369 clear(rdstate()|ios::badbit);
370 #ifdef __EXCEPTIONS
371 throw runtime_error("Unknown column ordering scheme found");
372 #else
373 gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
374 return false;
375 #endif
376 break;
377 };
378 }
379 }
380
381 //Data loaded and uncompressed. Copy it to destination
382 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
383 return good();
384 }
385
386 // Read a bunch of uncompressed data
387 uint32_t UncompressUNCOMPRESSED(char* dest,
388 const char* src,
389 uint32_t numElems,
390 uint32_t sizeOfElems)
391 {
392 memcpy(dest, src, numElems*sizeOfElems);
393 return numElems*sizeOfElems;
394 }
395
396 // Read a bunch of data compressed with the Huffman algorithm
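    // Expected input layout, as read below: numChunks 32-bit compressed sizes (one per
    // chunk/row), followed by the Huffman-coded chunks themselves; each chunk decodes
    // into 16-bit values.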
397 uint32_t UncompressHUFFMAN16(char* dest,
398 const char* src,
399 uint32_t numChunks)
400 {
401 vector<uint16_t> uncompressed;
402
403 //read compressed sizes (one per row)
404 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
405 src += sizeof(uint32_t)*numChunks;
406
407 //uncompress the rows, one by one
408 uint32_t sizeWritten = 0;
409 for (uint32_t j=0;j<numChunks;j++)
410 {
411 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
412
413 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
414
415 sizeWritten += uncompressed.size()*sizeof(uint16_t);
416 dest += uncompressed.size()*sizeof(uint16_t);
417 src += compressedSizes[j];
418 }
419 return sizeWritten;
420 }
421
422 // Apply the inverse transform of the integer smoothing
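    // The compression step presumably stored, from the third element on, each value minus
    // the (integer) average of its two predecessors; the loop below adds that average back.
    // Made-up example: stored {10, 12, 0, 1} is restored to {10, 12, 0+(12+10)/2, 1+(11+12)/2}
    // = {10, 12, 11, 12}, using integer division.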
423 uint32_t UnApplySMOOTHING(int16_t* data,
424 uint32_t numElems)
425 {
426 //un-do the integer smoothing
427 for (uint32_t j=2;j<numElems;j++)
428 data[j] = data[j] + (data[j-1]+data[j-2])/2;
429
430 return numElems*sizeof(uint16_t);
431 }
432
433 // Data has been read from disk. Uncompress it !
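    // Each column block lists the processings that were applied during compression; they are
    // undone here in reverse order (j runs from numProcs-1 down to 0), e.g. Huffman decoding
    // first, then the smoothing transform is reverted.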
434 void UncompressBuffer(const uint32_t &catalogCurrentRow,
435 const uint32_t &thisRoundNumRows,
436 const uint32_t offset)
437 {
438 char *dest = fTransposedBuffer.data();
439
440 //uncompress column by column
441 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
442 {
443 const fits::Table::Column &col = fTable.sorted_cols[i];
444 if (col.num == 0)
445 continue;
446
447 //get the compression flag
448 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
449
450 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
451
452 fColumnOrdering[i] = head->ordering;
453
454 const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
455 const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;
456
457 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
458
459 for (int32_t j=head->numProcs-1;j >= 0; j--)
460 {
461 uint32_t sizeWritten=0;
462
463 switch (head->processings[j])
464 {
465 case kFactRaw:
466 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
467 break;
468
469 case kFactSmoothing:
470 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
471 break;
472
473 case kFactHuffman16:
474 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
475 break;
476
477 default:
478 clear(rdstate()|ios::badbit);
479 ostringstream str;
480 str << "Unknown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
481#ifdef __EXCEPTIONS
482 throw runtime_error(str.str());
483#else
484 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
485 return;
486#endif
487 }
488 //increment destination counter only when processing done.
489 if (j==0)
490 dest+= sizeWritten;
491 }
492 }
493 }
494
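    // Walk through the heap tile by tile, rebuilding a catalog from the TILE and block headers
    // found there, and compare the total number of rows (and, unless update_catalog is set, the
    // individual catalog entries) with what the header and the on-disk catalog claim.
    // With update_catalog=true the rebuilt catalog replaces the one read from disk; this is how
    // a shrunk catalog (ZSHRINK) is expanded back to one entry per tile.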
495 void CheckIfFileIsConsistent(bool update_catalog=false)
496 {
497 //remember the current position and go to the start of the heap
498 streamoff whereAreWe = tellg();
499 seekg(fHeapOff);
500
501 //init number of rows to zero
502 uint64_t numRows = 0;
503
504 //get number of columns from header
505 size_t numCols = fTable.num_cols;
506
507 vector<vector<pair<int64_t, int64_t> > > catalog;
508
509 TileHeader tileHead;
510 BlockHeader columnHead;
511 streamoff offsetInHeap = 0;
512 //skip through the heap
513 while (true)
514 {
515 read((char*)(&tileHead), sizeof(TileHeader));
516 //end of file
517 if (!good())
518 break;
519
520 //padding or corrupt data
521 if (memcmp(tileHead.id, "TILE", 4))
522 {
523 clear(rdstate()|ios::badbit);
524 break;
525 }
526
527 //a new tile begins here
528 catalog.push_back(vector<pair<int64_t, int64_t> >(0));
529 offsetInHeap += sizeof(TileHeader);
530
531 //skip through the columns
532 for (size_t i=0;i<numCols;i++)
533 {
534 //zero-sized columns do not have headers. Skip them
535 if (fTable.sorted_cols[i].num == 0)
536 {
537 catalog.back().push_back(make_pair(0,0));
538 continue;
539 }
540 //read column header
541 read((char*)(&columnHead), sizeof(BlockHeader));
542 //corrupted tile
543 if (!good())
544 break;
545 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
546 offsetInHeap += columnHead.size;
547 seekg(fHeapOff+offsetInHeap);
548 }
549
550 //if the stream is not good, something went wrong inside the current tile.
551 if (!good())
552 {
553 catalog.pop_back();
554 break;
555 }
556 //current tile is complete. Add rows
557 numRows += tileHead.numRows;
558 }
559
560 if (numRows != fTable.num_rows)
561 {
562 clear(rdstate()|ios::badbit);
563 ostringstream str;
564 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
565#ifdef __EXCEPTIONS
566 throw runtime_error(str.str());
567#else
568 gLog << ___err___ << "ERROR - " << str.str() << endl;
569 return;
570#endif
571 }
572
573 if (update_catalog)
574 {
575 fCatalog = catalog;
576 //clear the bad bit before seeking back (we hit eof)
577 clear();
578 seekg(whereAreWe);
579 return;
580 }
581
582 if (catalog.size() != fCatalog.size())
583 {
584 clear(rdstate()|ios::badbit);
585#ifdef __EXCEPTIONS
586 throw runtime_error("Heap data does not agree with header.");
587#else
588 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
589 return;
590#endif
591 }
592
593 for (uint32_t i=0;i<catalog.size(); i++)
594 for (uint32_t j=0;j<numCols;j++)
595 {
596 if (catalog[i][j].first != fCatalog[i][j].first ||
597 catalog[i][j].second != fCatalog[i][j].second)
598 {
599 clear(rdstate()|ios::badbit);
600#ifdef __EXCEPTIONS
601 throw runtime_error("Heap data does not agree with header.");
602#else
603 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
604 return;
605#endif
606 }
607 }
608 //seek back to where we were before the check
609 //clear the bad bit before seeking back (we hit eof)
610 clear();
611 seekg(whereAreWe);
612 }
613
614};//class zfits
615
616#ifndef __MARS__
617}; //namespace std
618#endif
619
620#endif