source: trunk/Mars/mcore/zfits.h@ 17269

Last change on this file since 17269 was 17261, checked in by tbretz, 11 years ago
Removed 'using namespace' from header; some minor simplifications
File size: 20.3 KB
/*
 * zfits.h
 *
 *  Created on: May 16, 2013
 *      Author: lyard
 */

#ifndef MARS_zfits
#define MARS_zfits

#include "fits.h"
#include "huffman.h"

#include "FITS.h"

#ifndef __MARS__
namespace std
{
#endif

class zfits : public fits
{
public:

    // Basic constructor
    zfits(const string& fname, const string& tableName="", bool force=false)
        : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, "", tableName, force);
        InitCompressionReading();
    }

    // Alternative constructor
    zfits(const string& fname, const string& fout, const string& tableName, bool force=false)
        : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, fout, tableName, force);
        InitCompressionReading();
    }
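
    // Minimal usage sketch (illustration only, not part of this header's API):
    // it assumes the row-access helpers SetPtrAddress() and GetNextRow()
    // inherited from the fits base class; the file name is hypothetical.
    //
    //     zfits file("data.fits.fz");            // open a FACT-compressed table
    //     double time;
    //     file.SetPtrAddress("Time", &time);     // bind a column to a local variable (assumed base-class helper)
    //     while (file.GetNextRow())              // decompresses tiles transparently (assumed base-class helper)
    //         std::cout << time << '\n';
    //
    // GetNumRows(), GetBytesPerRow() and SkipNextRow() below behave the same
    // whether or not the table is compressed.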

    // Skip the next row
    bool SkipNextRow()
    {
        if (!fTable.is_compressed)
            return fits::SkipNextRow();

        fRow++;
        return true;
    }

    virtual bool IsFileOk() const
    {
        bool rawsum = true;

        if (HasKey("RAWSUM"))
        {
            std::ostringstream str;
            str << fRawsum.val();
            rawsum = (GetStr("RAWSUM") == str.str());
        }

        return fits::IsFileOk() && rawsum;
    }

    size_t GetNumRows() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS2");
        else
            return fTable.Get<size_t>("NAXIS2");
    }
    size_t GetBytesPerRow() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS1");
        else
            return fTable.Get<size_t>("NAXIS1");
    }
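    // Note: in a tile-compressed table the regular NAXIS1/NAXIS2 keywords
    // describe the on-disk catalog table, while ZNAXIS1/ZNAXIS2 carry the row
    // width and row count of the original, uncompressed table; hence the
    // switch on fTable.is_compressed in the two getters above.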

protected:

    // Stage the requested row to internal buffer
    // Does NOT return data to users
    virtual void StageRow(size_t row, char* dest)
    {
        if (!fTable.is_compressed)
        {
            fits::StageRow(row, dest);
            return;
        }
        ReadBinaryRow(row, dest);
    }

private:

    // Do what it takes to initialize the compression-related structures
    void InitCompressionReading()
    {
        if (!fTable.is_compressed)
            return;

        //The constructor may have failed
        if (!good())
            return;

        if (fTable.is_compressed)
            for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++)
            {
                if (it->comp == kCompFACT)
                    continue;

                clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                throw runtime_error("Only the FACT compression scheme is handled by this reader.");
#else
                gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
                return;
#endif
            }

        fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);

        //Get the compression-specific keywords
        fNumTiles       = fTable.is_compressed ? GetInt("NAXIS2")   : 0;
        fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;

        //read the file's catalog
        ReadCatalog();

        //give it some space for uncompressing
        AllocateBuffers();

        //check that the heap agrees with the header
        //CheckIfFileIsConsistent();
    }

    // Copy decompressed data to location requested by user
    void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
    {
        if (!fTable.is_compressed)
        {
            fits::MoveColumnDataToUserSpace(dest, src, c);
            return;
        }

        memcpy(dest, src, c.num*c.size);
    }

    std::vector<char> fBuffer;           ///< stores the uncompressed rows
    std::vector<char> fTransposedBuffer; ///< intermediate buffer to transpose the rows
    std::vector<char> fCompressedBuffer; ///< compressed rows
    std::vector<char> fColumnOrdering;   ///< ordering of the columns' rows; can change from tile to tile

    size_t  fNumTiles;       ///< Total number of tiles
    size_t  fNumRowsPerTile; ///< Number of rows per compressed tile
    int64_t fCurrentRow;     ///< Current row in memory; signed because we need -1

    streamoff fHeapOff;           ///< offset of the binary data from the beginning of the file
    streamoff fHeapFromDataStart; ///< offset of the binary data from the beginning of the data table

    std::vector<std::vector<pair<int64_t, int64_t>>> fCatalog;     ///< Catalog, i.e. the main table that points to the compressed data
    std::vector<size_t>                              fTileSize;    ///< size in bytes of each compressed tile
    std::vector<std::vector<size_t>>                 fTileOffsets; ///< offset of each compressed column from the start of its tile

    Checksum fRawsum; ///< Checksum of the uncompressed, raw data

    // Get buffer space
    void AllocateBuffers()
    {
        uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
        uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
            //use a bit more memory for the block headers: at most 256 chars to encode the compression sequence
            fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
            //a bit more for the tile header
            sizeof(FITS::TileHeader) +
            //and a bit more for checksumming
            8;

        if (buffer_size % 4 != 0)
            buffer_size += 4 - (buffer_size%4);

        if (compressed_buffer_size % 4 != 0)
            compressed_buffer_size += 4 - (compressed_buffer_size%4);

        fBuffer.resize(buffer_size);

        fTransposedBuffer.resize(buffer_size);
        fCompressedBuffer.resize(compressed_buffer_size);
    }
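
    // Sizing example (illustrative numbers only): with bytes_per_row = 300 and
    // fNumRowsPerTile = 100, buffer_size starts at 30000 bytes; 30000 % 4 == 0,
    // so no padding is added. If it were 30001, 4 - (30001 % 4) = 3 bytes of
    // padding would round it up to 30004, keeping the buffers 4-byte aligned
    // for the checksum routine.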

    // Read the catalog data, i.e. the addresses of the compressed data inside the heap
    void ReadCatalog()
    {
        std::vector<char> readBuf(16);
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        fChkData.reset();

        //do the actual reading
        for (uint32_t i=0;i<fNumTiles;i++)
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                read(readBuf.data(), 2*sizeof(int64_t));
                fChkData.add(readBuf);
                //swap the bytes
                int64_t tempValues[2] = {0,0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }
                //add the catalog entry
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        //see if there is a gap before the heap data
        fHeapOff           = tellg()+fTable.GetHeapShift();
        fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();

        //check if the catalog has been shrunk
        uint32_t shrink_factor = 1;
        if (HasKey("ZSHRINK"))
            shrink_factor = GetInt("ZSHRINK");

        if (shrink_factor != 1)
        {
            CheckIfFileIsConsistent(true);
            fNumTiles = fCatalog.size();
            fNumRowsPerTile /= shrink_factor;
        }

        //compute the total size of each compressed tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }

        if (!fCopy.is_open())
            return;

        //write the catalog and the heap gap to the target file
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        std::vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }
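
    // Catalog layout (for orientation; the numbers are made up): each catalog
    // entry is a pair (compressed size, offset of the column data within the
    // heap), stored big-endian and byte-swapped by revcpy<8> above.
    // For a file with 2 columns and fCatalog[0] = {(40, 16), (100, 56)}:
    //   fTileSize[0]    = 40 + 100 = 140 bytes of compressed column data
    //   fTileOffsets[0] = {16-16, 56-16} = {0, 40}, i.e. offsets relative to
    //                     the first column of the tile.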

    //overrides the fits.h method: the copy itself is done in ReadBinaryRow,
    //because it requires data that is only available there; here we only
    //update the checksum of the raw data
    virtual void WriteRowToCopyFile(size_t row)
    {
        if (row == fRow+1)
            fRawsum.add(fBufferRow, false);
    }

    // Compressed version of the read row
    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    {
        if (rowNum >= GetNumRows())
            return false;

        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
        const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;

        bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));

        fCurrentRow = rowNum;
        //should we read yet another chunk of data ?
        if (requestedTile != currentTile)
        {
            //read yet another chunk from the file
            const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader);

            //skip to the beginning of the tile
            const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader);

            seekg(fHeapOff+tileStart);

            //calculate the misalignment of the current tile with respect to 32-bit (4-byte) boundaries
            const uint32_t offset = (tileStart + fHeapFromDataStart)%4;

            //point the tile header where it should be
            //the header is not checked for now
//            TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);

            ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));

            //read one tile from disk
            read(fCompressedBuffer.data()+offset, sizeToRead);

            if (addCheckSum)
                fChkData.add(fCompressedBuffer);

            if (requestedTile == currentTile+1 &&
                fCopy.is_open()                &&
                fCopy.good())
            {
                fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
                if (!fCopy)
                    clear(rdstate()|ios::badbit);
            }
            else
                if (fCopy.is_open())
                    clear(rdstate()|ios::badbit);

            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;

            //uncompress it
            UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(FITS::TileHeader));

            // pointer to column (source buffer)
            const char *src = fTransposedBuffer.data();

            uint32_t i=0;
            for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
            {
                char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)

                switch (fColumnOrdering[i])
                {
                case FITS::kOrderByRow:
                    // regular, "semi-transposed" copy
                    for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                    {
                        memcpy(dest, src, it->bytes);
                        src += it->bytes; // next column
                    }
                    break;

                case FITS::kOrderByCol:
                    // transposed copy
                    for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
                    {
                        for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                        {
                            memcpy(dest, src, it->size);
                            src += it->size; // next element
                        }
                    }
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown column ordering scheme found");
#else
                    gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
                    return false;
#endif
                    break;
                };
            }
        }

        //Data loaded and uncompressed. Copy it to the destination
        memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
        return good();
    }
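
    // Tile arithmetic example (hypothetical numbers): with fNumRowsPerTile = 100,
    // a request for row 250 maps to tile 250/100 = 2 and, after decompression,
    // to slot 250%100 = 50 inside fBuffer. thisRoundNumRows is the number of
    // valid rows in the tile being decompressed (smaller than fNumRowsPerTile
    // only for a trailing, partially filled tile).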

    // Read a bunch of uncompressed data
    uint32_t UncompressUNCOMPRESSED(char*       dest,
                                    const char* src,
                                    uint32_t    numElems,
                                    uint32_t    sizeOfElems)
    {
        memcpy(dest, src, numElems*sizeOfElems);
        return numElems*sizeOfElems;
    }

    // Read a bunch of data compressed with the Huffman algorithm
    uint32_t UncompressHUFFMAN16(char*       dest,
                                 const char* src,
                                 uint32_t    numChunks)
    {
        std::vector<uint16_t> uncompressed;

        //read the compressed sizes (one per chunk)
        const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
        src += sizeof(uint32_t)*numChunks;

        //uncompress the chunks, one by one
        uint32_t sizeWritten = 0;
        for (uint32_t j=0;j<numChunks;j++)
        {
            Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);

            memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));

            sizeWritten += uncompressed.size()*sizeof(uint16_t);
            dest        += uncompressed.size()*sizeof(uint16_t);
            src         += compressedSizes[j];
        }
        return sizeWritten;
    }
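
    // On-disk layout of a HUFFMAN16 block, as consumed above (schematic):
    //
    //   [ uint32_t size_0 ] ... [ uint32_t size_{numChunks-1} ]   <- table of compressed sizes
    //   [ size_0 bytes of Huffman data for chunk 0 ]
    //   [ size_1 bytes of Huffman data for chunk 1 ]
    //   ...
    //
    // Each decoded chunk is a sequence of uint16_t values; when called from
    // UncompressBuffer below with row-wise ordering, one chunk corresponds to
    // one table row.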

    // Apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(int16_t* data,
                              uint32_t numElems)
    {
        //undo the integer smoothing
        for (uint32_t j=2;j<numElems;j++)
            data[j] = data[j] + (data[j-1]+data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }
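
    // Worked example (made-up samples), assuming the writer stored
    //   smoothed[j] = raw[j] - (raw[j-1]+raw[j-2])/2   for j >= 2:
    //
    //   raw      = { 10, 12, 11, 13 }
    //   smoothed = { 10, 12,  0,  2 }      // 11-(12+10)/2 = 0,  13-(11+12)/2 = 2
    //
    // Running the loop above restores the original values in place:
    //   data[2] = 0 + (12+10)/2 = 11,  data[3] = 2 + (11+12)/2 = 13.
    // The same truncating integer division is used in both directions, so the
    // transform is exactly reversible.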

    // Data has been read from disk. Uncompress it !
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows,
                          const uint32_t  offset)
    {
        char *dest = fTransposedBuffer.data();

        //uncompress column by column
        for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sorted_cols[i];
            if (col.num == 0)
                continue;

            //get the compression flag
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;

            const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            fColumnOrdering[i] = head->ordering;

            const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
            const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;

            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;

            for (int32_t j=head->numProcs-1;j >= 0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case FITS::kFactRaw:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;

                case FITS::kFactSmoothing:
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;

                case FITS::kFactHuffman16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;

                default:
                    clear(rdstate()|ios::badbit);

                    std::ostringstream str;
                    str << "Unknown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
#ifdef __EXCEPTIONS
                    throw runtime_error(str.str());
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting." << endl;
                    return;
#endif
                }
                //increment the destination pointer only once all processings are done
                if (j==0)
                    dest += sizeWritten;
            }
        }
    }
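
    // Processing order, for illustration: processings[] lists the steps in the
    // order they were applied when writing, so they are undone back to front.
    // A column compressed as { kFactSmoothing, kFactHuffman16 } is therefore
    // restored by first Huffman-decoding (j=1) and then un-applying the
    // smoothing (j=0); dest is advanced only after the last step (j==0).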

    // Verify that the heap contents agree with the catalog read from the header
    void CheckIfFileIsConsistent(bool update_catalog=false)
    {
        //remember where we are and go to the start of the heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        //init the number of rows to zero
        uint64_t numRows = 0;

        //get the number of columns from the header
        size_t numCols = fTable.num_cols;

        std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;

        FITS::TileHeader  tileHead;
        FITS::BlockHeader columnHead;

        streamoff offsetInHeap = 0;
        //skip through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(FITS::TileHeader));
            //end of file
            if (!good())
                break;

            //padding or corrupt data
            if (memcmp(tileHead.id, "TILE", 4))
            {
                clear(rdstate()|ios::badbit);
                break;
            }

            //a new tile begins here
            catalog.emplace_back(std::vector<std::pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(FITS::TileHeader);

            //skip through the columns
            for (size_t i=0;i<numCols;i++)
            {
                //zero-sized columns do not have headers: skip them
                if (fTable.sorted_cols[i].num == 0)
                {
                    catalog.back().emplace_back(0,0);
                    continue;
                }

                //read the column header
                read((char*)(&columnHead), sizeof(FITS::BlockHeader));

                //corrupted tile
                if (!good())
                    break;

                catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            //if the stream is not good, something went wrong inside the current tile
            if (!good())
            {
                catalog.pop_back();
                break;
            }
            //the current tile is complete: add its rows
            numRows += tileHead.numRows;
        }

        if (numRows != fTable.num_rows)
        {
            clear(rdstate()|ios::badbit);
            std::ostringstream str;
            str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
#ifdef __EXCEPTIONS
            throw runtime_error(str.str());
#else
            gLog << ___err___ << "ERROR - " << str.str() << endl;
            return;
#endif
        }

        if (update_catalog)
        {
            fCatalog = catalog;
            //clear the bad bit before seeking back (we hit eof)
            clear();
            seekg(whereAreWe);
            return;
        }

        if (catalog.size() != fCatalog.size())
        {
            clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header.");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        for (uint32_t i=0;i<catalog.size(); i++)
            for (uint32_t j=0;j<numCols;j++)
            {
                if (catalog[i][j].first  != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header.");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }
        //go back to the start of the heap
        //clear the bad bit before seeking back (we hit eof)
        clear();
        seekg(whereAreWe);
    }

}; //class zfits

#ifndef __MARS__
}; //namespace std
#endif

#endif