source: trunk/Mars/mcore/zfits.h@ 17338

Last change on this file since 17338 was 17305, checked in by lyard, 11 years ago
Moved catalog reconstruction only when a row is read so that headers can be retrieved quickly in all cases
File size: 20.6 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16class zfits : public fits
17{
18public:
19
20 // Basic constructor
21 zfits(const std::string& fname, const std::string& tableName="", bool force=false)
22 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
23 {
24 open(fname.c_str());
25 Constructor(fname, "", tableName, force);
26// InitCompressionReading();
27 }
28
29 // Alternative contstructor
30 zfits(const std::string& fname, const std::string& fout, const std::string& tableName, bool force=false)
31 : fCatalogInitialized(false), fNumTiles(0), fNumRowsPerTile(0), fCurrentRow(-1), fHeapOff(0), fTileSize(0)
32 {
33 open(fname.c_str());
34 Constructor(fname, fout, tableName, force);
35// InitCompressionReading();
36 }
37
38 // Skip the next row
39 bool SkipNextRow()
40 {
41 if (!fTable.is_compressed)
42 return fits::SkipNextRow();
43
44 fRow++;
45 return true;
46 }
47
48 virtual bool IsFileOk() const
49 {
50 bool rawsum = true;
51
52 if (HasKey("RAWSUM"))
53 {
54 std::ostringstream str;
55 str << fRawsum.val();
56 rawsum = (GetStr("RAWSUM") == str.str());
57 }
58
59 return fits::IsFileOk() && rawsum;
60 };
61
62 size_t GetNumRows() const
63 {
64 if (fTable.is_compressed)
65 return fTable.Get<size_t>("ZNAXIS2");
66 else
67 return fTable.Get<size_t>("NAXIS2");
68 }
69 size_t GetBytesPerRow() const
70 {
71 if (fTable.is_compressed)
72 return fTable.Get<size_t>("ZNAXIS1");
73 else
74 return fTable.Get<size_t>("NAXIS1");
75 }
76
77protected:
78
79 // Stage the requested row to internal buffer
80 // Does NOT return data to users
81 virtual void StageRow(size_t row, char* dest)
82 {
83 if (!fTable.is_compressed)
84 {
85 fits::StageRow(row, dest);
86 return;
87 }
88 ReadBinaryRow(row, dest);
89 }
90
91private:
92
93 // Do what it takes to initialize the compressed structured
94 void InitCompressionReading()
95 {
96 fCatalogInitialized = true;
97
98 if (!fTable.is_compressed)
99 return;
100
101 //The constructor may have failed
102 if (!good())
103 return;
104
105 if (fTable.is_compressed)
106 for (auto it=fTable.sorted_cols.cbegin(); it!= fTable.sorted_cols.cend(); it++)
107 {
108 if (it->comp == kCompFACT)
109 continue;
110
111 clear(rdstate()|std::ios::badbit);
112#ifdef __EXCEPTIONS
113 throw std::runtime_error("Only the FACT compression scheme is handled by this reader.");
114#else
115 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << std::endl;
116 return;
117#endif
118 }
119
120 fColumnOrdering.resize(fTable.sorted_cols.size(), FITS::kOrderByRow);
121
122 //Get compressed specific keywords
123 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
124 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
125
126 //read the file's catalog
127 ReadCatalog();
128
129 //give it some space for uncompressing
130 AllocateBuffers();
131
132 //check that heap agrees with head
133 //CheckIfFileIsConsistent();
134 }
135
136 // Copy decompressed data to location requested by user
137 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
138 {
139 if (!fTable.is_compressed)
140 {
141 fits::MoveColumnDataToUserSpace(dest, src, c);
142 return;
143 }
144
145 memcpy(dest, src, c.num*c.size);
146 }
147
148 bool fCatalogInitialized;
149
150 std::vector<char> fBuffer; ///<store the uncompressed rows
151 std::vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
152 std::vector<char> fCompressedBuffer; ///<compressed rows
153 std::vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
154
155 size_t fNumTiles; ///< Total number of tiles
156 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
157 int64_t fCurrentRow; ///< current row in memory signed because we need -1
158
159 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
160 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
161
162 std::vector<std::vector<std::pair<int64_t, int64_t>>> fCatalog; ///< Catalog, i.e. the main table that points to the compressed data.
163 std::vector<size_t> fTileSize; ///< size in bytes of each compressed tile
164 std::vector<std::vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
165
166 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
167
168 // Get buffer space
169 void AllocateBuffers()
170 {
171 uint32_t buffer_size = fTable.bytes_per_row*fNumRowsPerTile;
172 uint32_t compressed_buffer_size = fTable.bytes_per_row*fNumRowsPerTile +
173 //use a bit more memory for block headers. 256 char coding the compression sequence max.
174 fTable.num_cols*(sizeof(FITS::BlockHeader)+256) +
175 //a bit more for the tile headers
176 sizeof(FITS::TileHeader) +
177 //and a bit more for checksuming
178 8;
179
180 if (buffer_size % 4 != 0)
181 buffer_size += 4 - (buffer_size%4);
182
183 if (compressed_buffer_size % 4 != 0)
184 compressed_buffer_size += 4 - (compressed_buffer_size%4);
185
186 fBuffer.resize(buffer_size);
187
188 fTransposedBuffer.resize(buffer_size);
189 fCompressedBuffer.resize(compressed_buffer_size);
190 }
191
192 // Read catalog data. I.e. the address of the compressed data inside the heap
193 void ReadCatalog()
194 {
195 std::vector<char> readBuf(16);
196 fCatalog.resize(fNumTiles);
197
198 const streampos catalogStart = tellg();
199
200 fChkData.reset();
201
202 //do the actual reading
203 for (uint32_t i=0;i<fNumTiles;i++)
204 for (uint32_t j=0;j<fTable.num_cols;j++)
205 {
206 read(readBuf.data(), 2*sizeof(int64_t));
207 fChkData.add(readBuf);
208 //swap the bytes
209 int64_t tempValues[2] = {0,0};
210 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
211 if (tempValues[0] < 0 || tempValues[1] < 0)
212 {
213 clear(rdstate()|std::ios::badbit);
214#ifdef __EXCEPTIONS
215 throw std::runtime_error("Negative value in the catalog");
216#else
217 gLog << ___err___ << "ERROR - negative value in the catalog" << std::endl;
218 return;
219#endif
220 }
221 //add catalog entry
222 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
223 }
224
225 //see if there is a gap before heap data
226 fHeapOff = tellg()+fTable.GetHeapShift();
227 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
228
229 //check if the catalog has been shrinked
230 uint32_t shrink_factor = 1;
231 if (HasKey("ZSHRINK"))
232 shrink_factor = GetInt("ZSHRINK");
233
234 if (shrink_factor != 1)
235 {
236 CheckIfFileIsConsistent(true);
237 fNumTiles = fCatalog.size();
238 fNumRowsPerTile /= shrink_factor;
239 }
240
241 //compute the total size of each compressed tile
242 fTileSize.resize(fNumTiles);
243 fTileOffsets.resize(fNumTiles);
244 for (uint32_t i=0;i<fNumTiles;i++)
245 {
246 fTileSize[i] = 0;
247 for (uint32_t j=0;j<fTable.num_cols;j++)
248 {
249 fTileSize[i] += fCatalog[i][j].first;
250 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
251 }
252 }
253
254 if (!fCopy.is_open())
255 return;
256
257 //write catalog and heap gap to target file
258 seekg(catalogStart);
259
260 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
261
262 std::vector<char> buf(catSize);
263 read(buf.data(), catSize);
264
265 fCopy.write(buf.data(), catSize);
266 if (!fCopy)
267 clear(rdstate()|std::ios::badbit);
268 }
269
270 //overrides fits.h method with empty one
271 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
272 virtual void WriteRowToCopyFile(size_t row)
273 {
274 if (row == fRow+1)
275 fRawsum.add(fBufferRow, false);
276 }
277
278 // Compressed version of the read row
279 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
280 {
281 if (rowNum >= GetNumRows())
282 return false;
283
284 if (!fCatalogInitialized)
285 InitCompressionReading();
286
287 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
288 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
289
290 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
291
292 fCurrentRow = rowNum;
293 //should we read yet another chunk of data ?
294 if (requestedTile != currentTile)
295 {
296 //read yet another chunk from the file
297 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(FITS::TileHeader);
298
299 //skip to the beginning of the tile
300 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(FITS::TileHeader);
301
302 seekg(fHeapOff+tileStart);
303
304 //calculate the 32 bits offset of the current tile.
305 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
306
307 //point the tile header where it should be
308 //we ain't checking the header now
309// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
310
311 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
312
313 //read one tile from disk
314 read(fCompressedBuffer.data()+offset, sizeToRead);
315
316 if (addCheckSum)
317 fChkData.add(fCompressedBuffer);
318
319 if (requestedTile == currentTile+1 &&
320 fCopy.is_open() &&
321 fCopy.good())
322 {
323 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
324 if (!fCopy)
325 clear(rdstate()|std::ios::badbit);
326 }
327 else
328 if (fCopy.is_open())
329 clear(rdstate()|std::ios::badbit);
330
331 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
332
333 //uncompress it
334 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(FITS::TileHeader));
335
336 // pointer to column (source buffer)
337 const char *src = fTransposedBuffer.data();
338
339 uint32_t i=0;
340 for (auto it=fTable.sorted_cols.cbegin(); it!=fTable.sorted_cols.cend(); it++, i++)
341 {
342 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
343
344 switch (fColumnOrdering[i])
345 {
346 case FITS::kOrderByRow:
347 // regular, "semi-transposed" copy
348 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
349 {
350 memcpy(dest, src, it->bytes);
351 src += it->bytes; // next column
352 }
353 break;
354
355 case FITS::kOrderByCol:
356 // transposed copy
357 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
358 {
359 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
360 {
361 memcpy(dest, src, it->size);
362 src += it->size; // next element
363 }
364 }
365 break;
366
367 default:
368 clear(rdstate()|std::ios::badbit);
369#ifdef __EXCEPTIONS
370 throw std::runtime_error("Unkown column ordering scheme found");
371#else
372 gLog << ___err___ << "ERROR - unkown column ordering scheme" << std::endl;
373 return false;
374#endif
375 break;
376 };
377 }
378 }
379
380 //Data loaded and uncompressed. Copy it to destination
381 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
382 return good();
383 }
384
385 // Read a bunch of uncompressed data
386 uint32_t UncompressUNCOMPRESSED(char* dest,
387 const char* src,
388 uint32_t numElems,
389 uint32_t sizeOfElems)
390 {
391 memcpy(dest, src, numElems*sizeOfElems);
392 return numElems*sizeOfElems;
393 }
394
395 // Read a bunch of data compressed with the Huffman algorithm
396 uint32_t UncompressHUFFMAN16(char* dest,
397 const char* src,
398 uint32_t numChunks)
399 {
400 std::vector<uint16_t> uncompressed;
401
402 //read compressed sizes (one per row)
403 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
404 src += sizeof(uint32_t)*numChunks;
405
406 //uncompress the rows, one by one
407 uint32_t sizeWritten = 0;
408 for (uint32_t j=0;j<numChunks;j++)
409 {
410 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
411
412 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
413
414 sizeWritten += uncompressed.size()*sizeof(uint16_t);
415 dest += uncompressed.size()*sizeof(uint16_t);
416 src += compressedSizes[j];
417 }
418 return sizeWritten;
419 }
420
421 // Apply the inverse transform of the integer smoothing
422 uint32_t UnApplySMOOTHING(int16_t* data,
423 uint32_t numElems)
424 {
425 //un-do the integer smoothing
426 for (uint32_t j=2;j<numElems;j++)
427 data[j] = data[j] + (data[j-1]+data[j-2])/2;
428
429 return numElems*sizeof(uint16_t);
430 }
431
432 // Data has been read from disk. Uncompress it !
433 void UncompressBuffer(const uint32_t &catalogCurrentRow,
434 const uint32_t &thisRoundNumRows,
435 const uint32_t offset)
436 {
437 char *dest = fTransposedBuffer.data();
438
439 //uncompress column by column
440 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
441 {
442 const fits::Table::Column &col = fTable.sorted_cols[i];
443 if (col.num == 0)
444 continue;
445
446 //get the compression flag
447 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
448
449 const FITS::BlockHeader* head = reinterpret_cast<FITS::BlockHeader*>(&fCompressedBuffer[compressedOffset]);
450
451 fColumnOrdering[i] = head->ordering;
452
453 const uint32_t numRows = (head->ordering==FITS::kOrderByRow) ? thisRoundNumRows : col.num;
454 const uint32_t numCols = (head->ordering==FITS::kOrderByCol) ? thisRoundNumRows : col.num;
455
456 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(FITS::BlockHeader)+sizeof(uint16_t)*head->numProcs;
457
458 for (int32_t j=head->numProcs-1;j >= 0; j--)
459 {
460 uint32_t sizeWritten=0;
461
462 switch (head->processings[j])
463 {
464 case FITS::kFactRaw:
465 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
466 break;
467
468 case FITS::kFactSmoothing:
469 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
470 break;
471
472 case FITS::kFactHuffman16:
473 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
474 break;
475
476 default:
477 clear(rdstate()|std::ios::badbit);
478
479 std::ostringstream str;
480 str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
481#ifdef __EXCEPTIONS
482 throw std::runtime_error(str.str());
483#else
484 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << std::endl;
485 return;
486#endif
487 }
488 //increment destination counter only when processing done.
489 if (j==0)
490 dest+= sizeWritten;
491 }
492 }
493 }
494
495 void CheckIfFileIsConsistent(bool update_catalog=false)
496 {
497 //goto start of heap
498 streamoff whereAreWe = tellg();
499 seekg(fHeapOff);
500
501 //init number of rows to zero
502 uint64_t numRows = 0;
503
504 //get number of columns from header
505 size_t numCols = fTable.num_cols;
506
507 std::vector<std::vector<std::pair<int64_t, int64_t> > > catalog;
508
509 FITS::TileHeader tileHead;
510 FITS::BlockHeader columnHead;
511
512 streamoff offsetInHeap = 0;
513 //skip through the heap
514 while (true)
515 {
516 read((char*)(&tileHead), sizeof(FITS::TileHeader));
517 //end of file
518 if (!good())
519 break;
520
521 //padding or corrupt data
522 if (memcmp(tileHead.id, "TILE", 4))
523 {
524 clear(rdstate()|std::ios::badbit);
525 break;
526 }
527
528 //a new tile begins here
529 catalog.emplace_back(std::vector<std::pair<int64_t, int64_t> >(0));
530 offsetInHeap += sizeof(FITS::TileHeader);
531
532 //skip through the columns
533 for (size_t i=0;i<numCols;i++)
534 {
535 //zero sized column do not have headers. Skip it
536 if (fTable.sorted_cols[i].num == 0)
537 {
538 catalog.back().emplace_back(0,0);
539 continue;
540 }
541
542 //read column header
543 read((char*)(&columnHead), sizeof(FITS::BlockHeader));
544
545 //corrupted tile
546 if (!good())
547 break;
548
549 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
550 offsetInHeap += columnHead.size;
551 seekg(fHeapOff+offsetInHeap);
552 }
553
554 //if we ain't good, this means that something went wrong inside the current tile.
555 if (!good())
556 {
557 catalog.pop_back();
558 break;
559 }
560 //current tile is complete. Add rows
561 numRows += tileHead.numRows;
562 }
563
564 if (numRows != fTable.num_rows)
565 {
566 clear(rdstate()|std::ios::badbit);
567 std::ostringstream str;
568 str << "Heap data does not agree with header: " << numRows << " calculated vs " << fTable.num_rows << " from header.";
569#ifdef __EXCEPTIONS
570 throw std::runtime_error(str.str());
571#else
572 gLog << ___err___ << "ERROR - " << str.str() << std::endl;
573 return;
574#endif
575 }
576
577 if (update_catalog)
578 {
579 fCatalog = catalog;
580 //clear the bad bit before seeking back (we hit eof)
581 clear();
582 seekg(whereAreWe);
583 return;
584 }
585
586 if (catalog.size() != fCatalog.size())
587 {
588 clear(rdstate()|std::ios::badbit);
589#ifdef __EXCEPTIONS
590 throw std::runtime_error("Heap data does not agree with header.");
591#else
592 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
593 return;
594#endif
595 }
596
597 for (uint32_t i=0;i<catalog.size(); i++)
598 for (uint32_t j=0;j<numCols;j++)
599 {
600 if (catalog[i][j].first != fCatalog[i][j].first ||
601 catalog[i][j].second != fCatalog[i][j].second)
602 {
603 clear(rdstate()|std::ios::badbit);
604#ifdef __EXCEPTIONS
605 throw std::runtime_error("Heap data does not agree with header.");
606#else
607 gLog << ___err___ << "ERROR - Heap data does not agree with header." << std::endl;
608 return;
609#endif
610 }
611 }
612 //go back to start of heap
613 //clear the bad bit before seeking back (we hit eof)
614 clear();
615 seekg(whereAreWe);
616 }
617
618};//class zfits
619
620#endif
Note: See TracBrowser for help on using the repository browser.