source: trunk/Mars/mcore/zfits.h@ 17274

Last change on this file since 17274 was 17274, checked in by tbretz, 11 years ago
Added more missing std::
File size: 20.4 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26 InitCompressionReading();
27 }
28
29 // Alternative contstructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35 InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 bool rawsum = true;
51
52 if (HasKey("RAWSUM"))
53 {
54 std::ostringstream str;
55 str << fRawsum.val();
56 rawsum = (GetStr("RAWSUM") == str.str());
57 }
58
59 return fits::IsFileOk() && rawsum;
60 };
61
62 size_t GetNumRows() const
63 {
64 if (fTable.is_compressed)
65 return fTable.Get<size_t>("ZNAXIS2");
66 else
67 return fTable.Get<size_t>("NAXIS2");
68 }
69 size_t GetBytesPerRow() const
70 {
71 if (fTable.is_compressed)
72 return fTable.Get<size_t>("ZNAXIS1");
73 else
74 return fTable.Get<size_t>("NAXIS1");
75 }
76
77protected:
78
79 // Stage the requested row to internal buffer
80 // Does NOT return data to users
81 virtual void StageRow(size_t row, char* dest)
82 {
83 if (!fTable.is_compressed)
84 {
85 fits::StageRow(row, dest);
86 return;
87 }
88 ReadBinaryRow(row, dest);
89 }
90
91private:
92
93 // Do what it takes to initialize the compressed structured
94 void InitCompressionReading()
95 {
96 if (!fTable.is_compressed)
97 return;
98
99 //The constructor may have failed
100 if (!good())
101 return;
102
103 if (fTable.is_compressed)
104 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
105 {
106 if (it->comp == kCompFACT)
107 continue;
108
109 clear(rdstate()|std::ios::badbit);
110#ifdef __EXCEPTIONS
111 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
112#else
113 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
114 return;
115#endif
116 }
117
118 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
119
120 //Get compressed specific keywords
121 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
122 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
123
124 //read the file's catalog
125 ReadCatalog();
126
127 //give it some space for uncompressing
128 AllocateBuffers();
129
130 //check that heap agrees with head
131 //CheckIfFileIsConsistent();
132 }
133
134 // Copy decompressed data to location requested by user
135 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
136 {
137 if (!fTable.is_compressed)
138 {
139 fits::MoveColumnDataToUserSpace(dest, src, c);
140 return;
141 }
142
143 memcpy(dest, src, c.num*c.size);
144 }
145
146 std::vector<char> fBuffer; ///<store the uncompressed rows
147 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
148 std::vector<char> fCompressedBuffer; ///<compressed rows
149 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
150
151 size_t fNumTiles; ///< Total number of tiles
152 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
153 int64_t fCurrentRow; ///< current row in memory signed because we need -1
154
155 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
156 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
157
158 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
159 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
160 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
161
162 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
163
164 // Get buffer space
165 void AllocateBuffers()
166 {
167 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
168 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
169 //use a bit more memory for block headers. 256 char coding the compression sequence max.
170 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
171 //a bit more for the tile headers
172 sizeof(FITS::TileHeader) +
173 //and a bit more for checksuming
174 8;
175
176 if (buffer_size % 4 != 0)
177 buffer_size += 4 - (buffer_size%4);
178
179 if (compressed_buffer_size % 4 != 0)
180 compressed_buffer_size += 4 - (compressed_buffer_size%4);
181
182 fBuffer.resize(buffer_size);
183
184 fTransposedBuffer.resize(buffer_size);
185 fCompressedBuffer.resize(compressed_buffer_size);
186 }
187
188 // Read catalog data. I.e. the address of the compressed data inside the heap
189 void ReadCatalog()
190 {
191 std::vector<char> readBuf(16);
192 fCatalog.resize(fNumTiles);
193
194 const streampos catalogStart = tellg();
195
196 fChkData.reset();
197
198 //do the actual reading
199 for (uint32_t i=0;i<fNumTiles;i++)
200 for (uint32_t j=0;j<fTable.num_cols;j++)
201 {
202 read(readBuf.data(), 2*sizeof(int64_t));
203 fChkData.add(readBuf);
204 //swap the bytes
205 int64_t tempValues[2] = {0,0};
206 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
207 if (tempValues[0] < 0 || tempValues[1] < 0)
208 {
209 clear(rdstate()|std::ios::badbit);
210#ifdef __EXCEPTIONS
211 throw std::runtime_error("Negative value in the catalog");
212#else
213 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
214 return;
215#endif
216 }
217 //add catalog entry
218 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
219 }
220
221 //see if there is a gap before heap data
222 fHeapOff = tellg()+fTable.GetHeapShift();
223 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
224
225 //check if the catalog has been shrinked
226 uint32_t shrink_factor = 1;
227 if (HasKey("ZSHRINK"))
228 shrink_factor = GetInt("ZSHRINK");
229
230 if (shrink_factor != 1)
231 {
232 CheckIfFileIsConsistent(true);
233 fNumTiles = fCatalog.size();
234 fNumRowsPerTile /= shrink_factor;
235 }
236
237 //compute the total size of each compressed tile
238 fTileSize.resize(fNumTiles);
239 fTileOffsets.resize(fNumTiles);
240 for (uint32_t i=0;i<fNumTiles;i++)
241 {
242 fTileSize[i] = 0;
243 for (uint32_t j=0;j<fTable.num_cols;j++)
244 {
245 fTileSize[i] += fCatalog[i][j].first;
246 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
247 }
248 }
249
250 if (!fCopy.is_open())
251 return;
252
253 //write catalog and heap gap to target file
254 seekg(catalogStart);
255
256 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
257
258 std::vector<char> buf(catSize);
259 read(buf.data(), catSize);
260
261 fCopy.write(buf.data(), catSize);
262 if (!fCopy)
263 clear(rdstate()|std::ios::badbit);
264 }
265
266 //overrides fits.h method with empty one
267 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
268 virtual void WriteRowToCopyFile(size_t row)
269 {
270 if (row == fRow+1)
271 fRawsum.add(fBufferRow, false);
272 }
273
274 // Compressed version of the read row
275 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
276 {
277 if (rowNum >= GetNumRows())
278 return false;
279
280 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
281 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
282
283 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
284
285 fCurrentRow = rowNum;
286 //should we read yet another chunk of data ?
287 if (requestedTile != currentTile)
288 {
289 //read yet another chunk from the file
290 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader);
291
292 //skip to the beginning of the tile
293 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader);
294
295 seekg(fHeapOff+tileStart);
296
297 //calculate the 32 bits offset of the current tile.
298 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
299
300 //point the tile header where it should be
301 //we ain't checking the header now
302// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
303
304 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
305
306 //read one tile from disk
307 read(fCompressedBuffer.data()+offset, sizeToRead);
308
309 if (addCheckSum)
310 fChkData.add(fCompressedBuffer);
311
312 if (requestedTile == currentTile+1 &&
313 fCopy.is_open() &&
314 fCopy.good())
315 {
316 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
317 if (!fCopy)
318 clear(rdstate()|std::ios::badbit);
319 }
320 else
321 if (fCopy.is_open())
322 clear(rdstate()|std::ios::badbit);
323
324 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
325
326 //uncompress it
327 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
328
329 // pointer to column (source buffer)
330 const char *src = fTransposedBuffer.data();
331
332 uint32_t i=0;
333 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
334 {
335 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
336
337 switch (fColumnOrdering[i])
338 {
339 case FITS::kOrderByRow:
340 // regular, "semi-transposed" copy
341 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
342 {
343 memcpy(dest, src, it->bytes);
344 src += it->bytes; // next column
345 }
346 break;
347
348 case FITS::kOrderByCol:
349 // transposed copy
350 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
351 {
352 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
353 {
354 memcpy(dest, src, it->size);
355 src += it->size; // next element
356 }
357 }
358 break;
359
360 default:
361 clear(rdstate()|std::ios::badbit);
362#ifdef __EXCEPTIONS
363 throw std::runtime_error("Unkown column ordering scheme found");
364#else
365 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
366 return false;
367#endif
368 break;
369 };
370 }
371 }
372
373 //Data loaded and uncompressed. Copy it to destination
374 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
375 return good();
376 }
377
378 // Read a bunch of uncompressed data
379 uint32_t UncompressUNCOMPRESSED(char* dest,
380 const char* src,
381 uint32_t numElems,
382 uint32_t sizeOfElems)
383 {
384 memcpy(dest, src, numElems*sizeOfElems);
385 return numElems*sizeOfElems;
386 }
387
388 // Read a bunch of data compressed with the Huffman algorithm
389 uint32_t UncompressHUFFMAN16(char* dest,
390 const char* src,
391 uint32_t numChunks)
392 {
393 std::vector<uint16_t> uncompressed;
394
395 //read compressed sizes (one per row)
396 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
397 src += sizeof(uint32_t)*numChunks;
398
399 //uncompress the rows, one by one
400 uint32_t sizeWritten = 0;
401 for (uint32_t j=0;j<numChunks;j++)
402 {
403 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
404
405 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
406
407 sizeWritten += uncompressed.size()*sizeof(uint16_t);
408 dest += uncompressed.size()*sizeof(uint16_t);
409 src += compressedSizes[j];
410 }
411 return sizeWritten;
412 }
413
414 // Apply the inverse transform of the integer smoothing
415 uint32_t UnApplySMOOTHING(int16_t* data,
416 uint32_t numElems)
417 {
418 //un-do the integer smoothing
419 for (uint32_t j=2;j<numElems;j++)
420 data[j] = data[j] + (data[j-1]+data[j-2])/2;
421
422 return numElems*sizeof(uint16_t);
423 }
424
425 // Data has been read from disk. Uncompress it !
426 void UncompressBuffer(const uint32_t &catalogCurrentRow,
427 const uint32_t &thisRoundNumRows,
428 const uint32_t offset)
429 {
430 char *dest = fTransposedBuffer.data();
431
432 //uncompress column by column
433 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
434 {
435 const fits::Table::Column &col = fTable.sorted_cols[i];
436 if (col.num == 0)
437 continue;
438
439 //get the compression flag
440 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
441
442 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
443
444 fColumnOrdering[i] = head->ordering;
445
446 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
447 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
448
449 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
450
451 for (int32_t j=head->numProcs-1;j >= 0; j--)
452 {
453 uint32_t sizeWritten=0;
454
455 switch (head->processings[j])
456 {
457 case FITS::kFactRaw:
458 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
459 break;
460
461 case FITS::kFactSmoothing:
462 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
463 break;
464
465 case FITS::kFactHuffman16:
466 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
467 break;
468
469 default:
470 clear(rdstate()|std::ios::badbit);
471
472 std::ostringstream str;
473 str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
474#ifdef __EXCEPTIONS
475 throw std::runtime_error(str.str());
476#else
477 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
478 return;
479#endif
480 }
481 //increment destination counter only when processing done.
482 if (j==0)
483 dest+= sizeWritten;
484 }
485 }
486 }
487
488 void CheckIfFileIsConsistent(bool update_catalog=false)
489 {
490 //goto start of heap
491 streamoff whereAreWe = tellg();
492 seekg(fHeapOff);
493
494 //init number of rows to zero
495 uint64_t numRows = 0;
496
497 //get number of columns from header
498 size_t numCols = fTable.num_cols;
499
500 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
501
502 FITS::TileHeader tileHead;
503 FITS::BlockHeader columnHead;
504
505 streamoff offsetInHeap = 0;
506 //skip through the heap
507 while (true)
508 {
509 read((char*)(&tileHead), sizeof(FITS::TileHeader));
510 //end of file
511 if (!good())
512 break;
513
514 //padding or corrupt data
515 if (memcmp(tileHead.id, "TILE", 4))
516 {
517 clear(rdstate()|std::ios::badbit);
518 break;
519 }
520
521 //a new tile begins here
522 catalog.emplace_back(std::vector<std::pair<int64_t, int64_t> >(0));
523 offsetInHeap += sizeof(FITS::TileHeader);
524
525 //skip through the columns
526 for (size_t i=0;i<numCols;i++)
527 {
528 //zero sized column do not have headers. Skip it
529 if (fTable.sorted_cols[i].num == 0)
530 {
531 catalog.back().emplace_back(0,0);
532 continue;
533 }
534
535 //read column header
536 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
537
538 //corrupted tile
539 if (!good())
540 break;
541
542 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
543 offsetInHeap += columnHead.size;
544 seekg(fHeapOff+offsetInHeap);
545 }
546
547 //if we ain't good, this means that something went wrong inside the current tile.
548 if (!good())
549 {
550 catalog.pop_back();
551 break;
552 }
553 //current tile is complete. Add rows
554 numRows += tileHead.numRows;
555 }
556
557 if (numRows != fTable.num_rows)
558 {
559 clear(rdstate()|std::ios::badbit);
560 std::ostringstream str;
561 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
562#ifdef __EXCEPTIONS
563 throw std::runtime_error(str.str());
564#else
565 gLog << ___err___ << "ERROR - " << str.str() << endl;
566 return;
567#endif
568 }
569
570 if (update_catalog)
571 {
572 fCatalog = catalog;
573 //clear the bad bit before seeking back (we hit eof)
574 clear();
575 seekg(whereAreWe);
576 return;
577 }
578
579 if (catalog.size() != fCatalog.size())
580 {
581 clear(rdstate()|std::ios::badbit);
582#ifdef __EXCEPTIONS
583 throw std::runtime_error("Heap data does not agree with header.");
584#else
585 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
586 return;
587#endif
588 }
589
590 for (uint32_t i=0;i<catalog.size(); i++)
591 for (uint32_t j=0;j<numCols;j++)
592 {
593 if (catalog[i][j].first != fCatalog[i][j].first ||
594 catalog[i][j].second != fCatalog[i][j].second)
595 {
596 clear(rdstate()|std::ios::badbit);
597#ifdef __EXCEPTIONS
598 throw std::runtime_error("Heap data does not agree with header.");
599#else
600 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
601 return;
602#endif
603 }
604 }
605 //go back to start of heap
606 //clear the bad bit before seeking back (we hit eof)
607 clear();
608 seekg(whereAreWe);
609 }
610
611};//class zfits
612
613#endif
Note: See TracBrowser for help on using the repository browser.