source: trunk/Mars/mcore/zfits.h@ 17050

Last change on this file since 17050 was 17035, checked in by lyard, 11 years ago
Added error when using wrong fits class and adapted MRawFitsRead to deal with compressed .fz fits
File size: 20.8 KB
/*
 * zfits.h
 *
 *  Created on: May 16, 2013
 *      Author: lyard
 */

#ifndef MARS_ZFITS
#define MARS_ZFITS

#include <stdexcept>

#include "fits.h"
#include "huffman.h"

#ifndef __MARS__
namespace std
{
#endif

class zfits : public fits
{
public:

    enum CompressionProcess_t
    {
        kFactRaw       = 0x0,
        kFactSmoothing = 0x1,
        kFactHuffman16 = 0x2
    };

    enum RowOrdering_t
    {
        kOrderByCol = 'C',
        kOrderByRow = 'R'
    };

    // Basic constructor
    zfits(const string& fname, const string& tableName="", bool force=false) : fits(),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, "", tableName, force);
        InitCompressionReading();
    }

    // Alternative constructor
    zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, fout, tableName, force);
        InitCompressionReading();
    }

    // Skip the next row
    bool SkipNextRow()
    {
        if (!fTable.is_compressed)
            return fits::SkipNextRow();

        fRow++;
        return true;
    }

    virtual bool IsFileOk() const
    {
        bool rawsum = true;

        if (HasKey("RAWSUM"))
        {
            ostringstream str;
            str << fRawsum.val();
            rawsum = (GetStr("RAWSUM") == str.str());
        }

        return fits::IsFileOk() && rawsum;
    }

    size_t GetNumRows() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS2");
        else
            return fTable.Get<size_t>("NAXIS2");
    }

    size_t GetBytesPerRow() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS1");
        else
            return fTable.Get<size_t>("NAXIS1");
    }
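
    /* A minimal usage sketch (illustrative only): the file, table and column
     * names below are hypothetical, and GetN(), SetPtrAddress() and
     * GetNextRow() are assumed to be provided by the fits base class.
     *
     *     zfits file("data.fits.fz", "Events");
     *     vector<int16_t> data(file.GetN("Data"));
     *     file.SetPtrAddress("Data", data.data(), data.size());
     *     while (file.GetNextRow())
     *     {
     *         // one decompressed row of "Data" is now available in 'data'
     *     }
     */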

protected:

    // Stage the requested row to internal buffer
    // Does NOT return data to users
    virtual void StageRow(size_t row, char* dest)
    {
        if (!fTable.is_compressed)
        {
            fits::StageRow(row, dest);
            return;
        }

        ReadBinaryRow(row, dest);
    }

private:
#ifndef __CINT__
    // Structure helper for reading tile headers
    struct TileHeader
    {
        char     id[4];
        uint32_t numRows;
        uint64_t size;

        TileHeader() {}

        TileHeader(uint32_t nRows,
                   uint64_t s) : id({'T', 'I', 'L', 'E'}),
                                 numRows(nRows),
                                 size(s)
        { }
    } __attribute__((__packed__));

    // Structure helper for reading block headers and compression schemes
    struct BlockHeader
    {
        uint64_t      size;
        char          ordering;
        unsigned char numProcs;
        uint16_t      processings[];

        BlockHeader(uint64_t s=0,
                    char o=kOrderByRow,
                    unsigned char n=1) : size(s),
                                         ordering(o),
                                         numProcs(n)
        {}
    } __attribute__((__packed__));
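
    // On-disk layout of the heap, as it is read back by ReadCatalog() and
    // CheckIfFileIsConsistent(): each tile starts with a TileHeader ("TILE" id,
    // number of rows, size), followed, for every non-empty column, by a
    // BlockHeader, its numProcs processing codes (uint16_t each) and the
    // compressed data of that column. The catalog stored in the table body
    // holds one (size, offset) pair per tile and column; the size covers the
    // whole block (header, processing list and data).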
#endif

    // Initialize the reading of the compressed data structures
    void InitCompressionReading()
    {
        if (!fTable.is_compressed)
            return;

        // The constructor may have failed
        if (!good())
            return;

        if (fTable.is_compressed)
            for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++)
            {
                if (it->comp == kCompFACT)
                    continue;

                clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                throw runtime_error("Only the FACT compression scheme is handled by this reader.");
#else
                gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
                return;
#endif
            }

        fColumnOrdering.resize(fTable.sorted_cols.size());
        for (auto it=fColumnOrdering.begin(); it!=fColumnOrdering.end(); it++)
            (*it) = kOrderByRow;

        // Get the compression-specific keywords
        fNumTiles       = fTable.is_compressed ? GetInt("NAXIS2")   : 0;
        fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;

        // Give it some space for uncompressing
        AllocateBuffers();

        // Read the file's catalog
        ReadCatalog();

        // Check that the heap agrees with the header
        //CheckIfFileIsConsistent();
    }

    // Copy the decompressed data to the location requested by the user
    void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
    {
        if (!fTable.is_compressed)
        {
            fits::MoveColumnDataToUserSpace(dest, src, c);
            return;
        }

        memcpy(dest, src, c.num*c.size);
    }

    vector<char> fBuffer;           ///< stores the uncompressed rows
    vector<char> fTransposedBuffer; ///< intermediate buffer to transpose the rows
    vector<char> fCompressedBuffer; ///< compressed rows
    vector<char> fColumnOrdering;   ///< ordering of the columns' rows; can change from tile to tile

    size_t  fNumTiles;       ///< Total number of tiles
    size_t  fNumRowsPerTile; ///< Number of rows per compressed tile
    int64_t fCurrentRow;     ///< Current row in memory; signed because we need -1

    streamoff fHeapOff;           ///< offset of the binary data from the beginning of the file
    streamoff fHeapFromDataStart; ///< offset of the binary data from the beginning of the data table

    vector<vector<pair<int64_t, int64_t>>> fCatalog;     ///< Catalog, i.e. the main table that points to the compressed data
    vector<size_t>                         fTileSize;    ///< size in bytes of each compressed tile
    vector<vector<size_t>>                 fTileOffsets; ///< offset from the start of the tile of a given compressed column

    Checksum fRawsum; ///< Checksum of the uncompressed, raw data

    // Get buffer space
    void AllocateBuffers()
    {
        fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);

        fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
        fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
                                 fTable.num_cols*(sizeof(BlockHeader)+256) + // use a bit more memory for block headers; 256 chars max. for coding the compression sequence
                                 sizeof(TileHeader),                         // a bit more for the tile headers
                                 8);                                         // and a bit more for checksumming
    }

    // Read the catalog data, i.e. the addresses of the compressed data inside the heap
    void ReadCatalog()
    {
        vector<char> readBuf(16);
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        fChkData.reset();

        // Do the actual reading
        for (uint32_t i=0; i<fNumTiles; i++)
            for (uint32_t j=0; j<fTable.num_cols; j++)
            {
                read(readBuf.data(), 2*sizeof(int64_t));
                fChkData.add(readBuf);

                // Swap the bytes
                int64_t tempValues[2] = {0, 0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }

                // Add the catalog entry
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        // Compute the total size of each compressed tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0; i<fNumTiles; i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0; j<fTable.num_cols; j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }

        // See if there is a gap before the heap data
        fHeapOff           = tellg()+fTable.GetHeapShift();
        fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();

        if (!fCopy.is_open())
            return;

        // Write the catalog and the heap gap to the target file
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }

    // Overrides the fits.h method; here it only updates the raw checksum.
    // The actual copying is done in ReadBinaryRow, because it requires
    // transient data that is only available there.
    virtual void WriteRowToCopyFile(size_t row)
    {
        if (row == fRow+1)
            fRawsum.add(fBufferRow, false);
    }

    // Compressed version of the read row
    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    {
        if (rowNum >= GetNumRows())
            return false;

        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
        const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;

        bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));

        fCurrentRow = rowNum;

        // Should we read yet another chunk of data?
        if (requestedTile != currentTile)
        {
            // Read yet another chunk from the file
            const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);

            // Skip to the beginning of the tile
            const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);

            seekg(fHeapOff+tileStart);

            // Calculate the 32 bit offset of the current tile
            const uint32_t offset = (tileStart + fHeapFromDataStart)%4;

            // This is where the tile header would be mapped;
            // we are not checking the header for now
//          TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);

            ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));

            // Read one tile from disk
            read(fCompressedBuffer.data()+offset, sizeToRead);

            if (addCheckSum)
                fChkData.add(fCompressedBuffer);

            if (requestedTile == currentTile+1 &&
                fCopy.is_open() &&
                fCopy.good())
            {
                fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
                if (!fCopy)
                    clear(rdstate()|ios::badbit);
            }
            else
                if (fCopy.is_open())
                    clear(rdstate()|ios::badbit);

            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;

            // Uncompress it
            UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));

            // Pointer to the column data in the source buffer
            const char *src = fTransposedBuffer.data();

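            // Within a tile the data is stored column after column. A column tagged
            // kOrderByRow keeps its values row after row, so it is copied back one row
            // at a time; a column tagged kOrderByCol stores each array element
            // contiguously across all rows of the tile, so the copy below transposes
            // it back into the row-wise layout of fBuffer.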
            uint32_t i=0;
            for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
            {
                char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)

                switch (fColumnOrdering[i])
                {
                case kOrderByRow:
                    // regular, "semi-transposed" copy
                    for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                    {
                        memcpy(dest, src, it->bytes);
                        src += it->bytes; // next column
                    }
                    break;

                case kOrderByCol:
                    // transposed copy
                    for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
                    {
                        for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                        {
                            memcpy(dest, src, it->size);
                            src += it->size; // next element
                        }
                    }
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown column ordering scheme found");
#else
                    gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
                    return false;
#endif
                    break;
                }
            }
        }

        // Data loaded and uncompressed. Copy it to the destination
        memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
        return good();
    }

    // Read a bunch of uncompressed data
    uint32_t UncompressUNCOMPRESSED(char*       dest,
                                    const char* src,
                                    uint32_t    numElems,
                                    uint32_t    sizeOfElems)
    {
        memcpy(dest, src, numElems*sizeOfElems);
        return numElems*sizeOfElems;
    }

    // Read a bunch of data compressed with the Huffman algorithm
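    // The compressed block is laid out as numChunks uint32_t values giving the
    // compressed size of each chunk (one chunk per row), followed by the
    // Huffman-coded chunks themselves, back to back.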
    uint32_t UncompressHUFFMAN16(char*       dest,
                                 const char* src,
                                 uint32_t    numChunks)
    {
        vector<uint16_t> uncompressed;

        // Read the compressed sizes (one per row)
        const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
        src += sizeof(uint32_t)*numChunks;

        // Uncompress the rows, one by one
        uint32_t sizeWritten = 0;
        for (uint32_t j=0; j<numChunks; j++)
        {
            Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);

            memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));

            sizeWritten += uncompressed.size()*sizeof(uint16_t);
            dest        += uncompressed.size()*sizeof(uint16_t);
            src         += compressedSizes[j];
        }
        return sizeWritten;
    }

    // Apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(int16_t* data,
                              uint32_t numElems)
    {
        // Undo the integer smoothing
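        // (the writer presumably stored data[j] - (data[j-1]+data[j-2])/2, i.e. the
        // difference to the integer mean of the two preceding samples; adding that
        // mean of the already-restored predecessors back, left to right, recovers
        // the original values: e.g. 10, 12, 14, 16 would have been stored as 10, 12, 3, 3)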
        for (uint32_t j=2; j<numElems; j++)
            data[j] = data[j] + (data[j-1]+data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }

    // The data has been read from disk. Uncompress it!
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows,
                          const uint32_t  offset)
    {
        char *dest = fTransposedBuffer.data();

        // Uncompress column by column
        for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sorted_cols[i];
            if (col.num == 0)
                continue;

            // Get the offset of this column's compressed block
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;

            const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            fColumnOrdering[i] = head->ordering;

            const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
            const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;

            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;

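            // Undo the processing steps back to front; processings[] presumably
            // lists them in the order they were applied during compression.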
            for (int32_t j=head->numProcs-1; j>=0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case kFactRaw:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;

                case kFactSmoothing:
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;

                case kFactHuffman16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown processing applied to data. Aborting");
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
                    return;
#endif
                }

                // Increment the destination pointer only once all processings are done
                if (j==0)
                    dest += sizeWritten;
            }
        }
    }

    void CheckIfFileIsConsistent()
    {
        // Remember where we are and go to the start of the heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        // Init the number of rows to zero
        uint64_t numRows = 0;

        // Get the number of columns from the header
        size_t numCols = fTable.num_cols;

        vector<vector<pair<int64_t, int64_t> > > catalog;

        TileHeader  tileHead;
        BlockHeader columnHead;
        streamoff   offsetInHeap = 0;

        // Walk through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(TileHeader));

            // End of file
            if (!good())
                break;

            // Padding or corrupt data
            if (memcmp(tileHead.id, "TILE", 4))
            {
                clear(rdstate()|ios::badbit);
                break;
            }

            // A new tile begins here
            catalog.push_back(vector<pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(TileHeader);

            // Skip through the columns
            for (size_t i=0; i<numCols; i++)
            {
                // Zero-sized columns do not have headers; skip them
                if (fTable.sorted_cols[i].num == 0)
                {
                    catalog.back().push_back(make_pair(0,0));
                    continue;
                }

                // Read the column header
                read((char*)(&columnHead), sizeof(BlockHeader));

                // Corrupted tile
                if (!good())
                    break;

                catalog.back().emplace_back((int64_t)(columnHead.size), offsetInHeap);
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            // If the stream is not good, something went wrong inside the current tile
            if (!good())
            {
                catalog.pop_back();
                break;
            }

            // The current tile is complete; add its rows
            numRows += tileHead.numRows;
        }

        if (catalog.size() != fCatalog.size() ||
            numRows != fTable.num_rows)
        {
            clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header.");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        for (uint32_t i=0; i<catalog.size(); i++)
            for (uint32_t j=0; j<numCols; j++)
            {
                if (catalog[i][j].first  != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header.");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }

        // Go back to where we were before the check
        seekg(whereAreWe);
    }

}; //class zfits

#ifndef __MARS__
}; //namespace std
#endif

#endif