source: trunk/Mars/mcore/zfits.h@ 17223

Last change on this file since 17223 was 17221, checked in by lyard, 11 years ago
better version of zofits. New factofits. Only cleanup, defines, comments and catalog shrinkage are missing
File size: 19.8 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#include "FITS.h"
15
16using namespace FITS;
17
18#ifndef __MARS__
19namespace std
20{
21#endif
22
23class zfits : public fits
24{
25public:
26
27 // Basic constructor
28 zfits(const string& fname, const string& tableName="", bool force=false) : fits(),
29 fNumTiles(0),
30 fNumRowsPerTile(0),
31 fCurrentRow(-1),
32 fHeapOff(0),
33 fTileSize(0)
34 {
35 open(fname.c_str());
36 Constructor(fname, "", tableName, force);
37 InitCompressionReading();
38 }
39
40 // Alternative contstructor
41 zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(),
42 fNumTiles(0),
43 fNumRowsPerTile(0),
44 fCurrentRow(-1),
45 fHeapOff(0),
46 fTileSize(0)
47 {
48 open(fname.c_str());
49 Constructor(fname, fout, tableName, force);
50 InitCompressionReading();
51 }
52
53 // Skip the next row
54 bool SkipNextRow()
55 {
56 if (!fTable.is_compressed)
57 return fits::SkipNextRow();
58
59 fRow++;
60 return true;
61 }
62
63 virtual bool IsFileOk() const
64 {
65 bool rawsum = true;
66
67 if (HasKey("RAWSUM"))
68 {
69 ostringstream str;
70 str << fRawsum.val();
71 rawsum = (GetStr("RAWSUM") == str.str());
72 }
73
74 return fits::IsFileOk() && rawsum;
75 };
76
77 size_t GetNumRows() const
78 {
79 if (fTable.is_compressed)
80 return fTable.Get<size_t>("ZNAXIS2");
81 else
82 return fTable.Get<size_t>("NAXIS2");
83 }
84 size_t GetBytesPerRow() const
85 {
86 if (fTable.is_compressed)
87 return fTable.Get<size_t>("ZNAXIS1");
88 else
89 return fTable.Get<size_t>("NAXIS1");
90 }
91
92protected:
93
94 // Stage the requested row to internal buffer
95 // Does NOT return data to users
96 virtual void StageRow(size_t row, char* dest)
97 {
98 if (!fTable.is_compressed)
99 {
100 fits::StageRow(row, dest);
101 return;
102 }
103 ReadBinaryRow(row, dest);
104 }
105
106private:
107
108 // Do what it takes to initialize the compressed structured
109 void InitCompressionReading()
110 {
111 if (!fTable.is_compressed)
112 return;
113
114 //The constructor may have failed
115 if (!good())
116 return;
117
118 if (fTable.is_compressed)
119 for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++)
120 {
121 if (it->comp == kCompFACT)
122 continue;
123
124 clear(rdstate()|ios::badbit);
125#ifdef __EXCEPTIONS
126 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
127#else
128 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
129 return;
130#endif
131 }
132
133 fColumnOrdering.resize(fTable.sorted_cols.size());
134 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
135 (*it) = kOrderByRow;
136 //Get compressed specific keywords
137 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
138 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
139
140 //give it some space for uncompressing
141 AllocateBuffers();
142
143 //read the file's catalog
144 ReadCatalog();
145
146 //check that heap agrees with head
147 //CheckIfFileIsConsistent();
148 }
149
150 // Copy decompressed data to location requested by user
151 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
152 {
153 if (!fTable.is_compressed)
154 {
155 fits::MoveColumnDataToUserSpace(dest, src, c);
156 return;
157 }
158
159 memcpy(dest, src, c.num*c.size);
160 }
161
162 vector<char> fBuffer; ///<store the uncompressed rows
163 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
164 vector<char> fCompressedBuffer; ///<compressed rows
165 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
166
167 size_t fNumTiles; ///< Total number of tiles
168 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
169 int64_t fCurrentRow; ///< current row in memory signed because we need -1
170
171 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
172 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
173
174 vector<vector<pair<int64_t, int64_t>>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
175 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
176 vector<vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
177
178 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
179
180 // Get buffer space
181 void AllocateBuffers()
182 {
183 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
184
185 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
186 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
187 fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for block headers. 256 char coding the compression sequence max.
188 sizeof(TileHeader), //a bit more for the tile headers
189 8); //and a bit more for checksuming
190 }
191
192 // Read catalog data. I.e. the address of the compressed data inside the heap
193 void ReadCatalog()
194 {
195 vector<char> readBuf(16);
196 fCatalog.resize(fNumTiles);
197
198 const streampos catalogStart = tellg();
199
200 fChkData.reset();
201
202 //do the actual reading
203 for (uint32_t i=0;i<fNumTiles;i++)
204 for (uint32_t j=0;j<fTable.num_cols;j++)
205 {
206 read(readBuf.data(), 2*sizeof(int64_t));
207 fChkData.add(readBuf);
208 //swap the bytes
209 int64_t tempValues[2] = {0,0};
210 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
211 if (tempValues[0] < 0 || tempValues[1] < 0)
212 {
213 clear(rdstate()|ios::badbit);
214#ifdef __EXCEPTIONS
215 throw runtime_error("Negative value in the catalog");
216#else
217 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
218 return;
219#endif
220 }
221 //add catalog entry
222 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
223 }
224
225 //compute the total size of each compressed tile
226 fTileSize.resize(fNumTiles);
227 fTileOffsets.resize(fNumTiles);
228 for (uint32_t i=0;i<fNumTiles;i++)
229 {
230 fTileSize[i] = 0;
231 for (uint32_t j=0;j<fTable.num_cols;j++)
232 {
233 fTileSize[i] += fCatalog[i][j].first;
234 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
235 }
236 }
237 //see if there is a gap before heap data
238 fHeapOff = tellg()+fTable.GetHeapShift();
239 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
240
241 if (!fCopy.is_open())
242 return;
243
244 //write catalog and heap gap to target file
245 seekg(catalogStart);
246
247 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
248
249 vector<char> buf(catSize);
250 read(buf.data(), catSize);
251
252 fCopy.write(buf.data(), catSize);
253 if (!fCopy)
254 clear(rdstate()|ios::badbit);
255 }
256
257 //overrides fits.h method with empty one
258 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
259 virtual void WriteRowToCopyFile(size_t row)
260 {
261 if (row == fRow+1)
262 fRawsum.add(fBufferRow, false);
263 }
264
265 // Compressed version of the read row
266 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
267 {
268 if (rowNum >= GetNumRows())
269 return false;
270
271 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
272 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
273
274 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
275
276 fCurrentRow = rowNum;
277 //should we read yet another chunk of data ?
278 if (requestedTile != currentTile)
279 {
280 //read yet another chunk from the file
281 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);
282
283 //skip to the beginning of the tile
284 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);
285
286 seekg(fHeapOff+tileStart);
287
288 //calculate the 32 bits offset of the current tile.
289 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
290
291 //point the tile header where it should be
292 //we ain't checking the header now
293// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
294
295 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
296
297 //read one tile from disk
298 read(fCompressedBuffer.data()+offset, sizeToRead);
299
300 if (addCheckSum)
301 fChkData.add(fCompressedBuffer);
302
303 if (requestedTile == currentTile+1 &&
304 fCopy.is_open() &&
305 fCopy.good())
306 {
307 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
308 if (!fCopy)
309 clear(rdstate()|ios::badbit);
310 }
311 else
312 if (fCopy.is_open())
313 clear(rdstate()|ios::badbit);
314
315 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
316
317 //uncompress it
318 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));
319
320 // pointer to column (source buffer)
321 const char *src = fTransposedBuffer.data();
322
323 uint32_t i=0;
324 for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
325 {
326 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
327
328 switch (fColumnOrdering[i])
329 {
330 case kOrderByRow:
331 // regular, "semi-transposed" copy
332 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
333 {
334 memcpy(dest, src, it->bytes);
335 src += it->bytes; // next column
336 }
337 break;
338
339 case kOrderByCol:
340 // transposed copy
341 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
342 {
343 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
344 {
345 memcpy(dest, src, it->size);
346 src += it->size; // next element
347 }
348 }
349 break;
350 default:
351 clear(rdstate()|ios::badbit);
352 #ifdef __EXCEPTIONS
353 throw runtime_error("Unkown column ordering scheme found");
354 #else
355 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
356 return false;
357 #endif
358 break;
359 };
360 }
361 }
362
363 //Data loaded and uncompressed. Copy it to destination
364 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
365 return good();
366 }
367
368 // Read a bunch of uncompressed data
369 uint32_t UncompressUNCOMPRESSED(char* dest,
370 const char* src,
371 uint32_t numElems,
372 uint32_t sizeOfElems)
373 {
374 memcpy(dest, src, numElems*sizeOfElems);
375 return numElems*sizeOfElems;
376 }
377
378 // Read a bunch of data compressed with the Huffman algorithm
379 uint32_t UncompressHUFFMAN16(char* dest,
380 const char* src,
381 uint32_t numChunks)
382 {
383 vector<uint16_t> uncompressed;
384
385 //read compressed sizes (one per row)
386 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
387 src += sizeof(uint32_t)*numChunks;
388
389 //uncompress the rows, one by one
390 uint32_t sizeWritten = 0;
391 for (uint32_t j=0;j<numChunks;j++)
392 {
393 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
394
395 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
396
397 sizeWritten += uncompressed.size()*sizeof(uint16_t);
398 dest += uncompressed.size()*sizeof(uint16_t);
399 src += compressedSizes[j];
400 }
401 return sizeWritten;
402 }
403
404 // Apply the inverse transform of the integer smoothing
405 uint32_t UnApplySMOOTHING(int16_t* data,
406 uint32_t numElems)
407 {
408 //un-do the integer smoothing
409 for (uint32_t j=2;j<numElems;j++)
410 data[j] = data[j] + (data[j-1]+data[j-2])/2;
411
412 return numElems*sizeof(uint16_t);
413 }
414
415 // Data has been read from disk. Uncompress it !
416 void UncompressBuffer(const uint32_t &catalogCurrentRow,
417 const uint32_t &thisRoundNumRows,
418 const uint32_t offset)
419 {
420 char *dest = fTransposedBuffer.data();
421
422 //uncompress column by column
423 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
424 {
425 const fits::Table::Column &col = fTable.sorted_cols[i];
426 if (col.num == 0)
427 continue;
428
429 //get the compression flag
430 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
431
432 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
433
434 fColumnOrdering[i] = head->ordering;
435
436 const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
437 const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;
438
439 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
440
441 for (int32_t j=head->numProcs-1;j >= 0; j--)
442 {
443 uint32_t sizeWritten=0;
444
445 switch (head->processings[j])
446 {
447 case kFactRaw:
448 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
449 break;
450
451 case kFactSmoothing:
452 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
453 break;
454
455 case kFactHuffman16:
456 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
457 break;
458
459 default:
460 clear(rdstate()|ios::badbit);
461 ostringstream str;
462 str << "Unkown processing applied to data. Col " << i << " proc " << j << " out of " << (int)head->numProcs;
463#ifdef __EXCEPTIONS
464 throw runtime_error(str.str());
465#else
466 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
467 return;
468#endif
469 }
470 //increment destination counter only when processing done.
471 if (j==0)
472 dest+= sizeWritten;
473 }
474 }
475 }
476
477 void CheckIfFileIsConsistent()
478 {
479 //goto start of heap
480 streamoff whereAreWe = tellg();
481 seekg(fHeapOff);
482
483 //init number of rows to zero
484 uint64_t numRows = 0;
485
486 //get number of columns from header
487 size_t numCols = fTable.num_cols;
488
489 vector<vector<pair<int64_t, int64_t> > > catalog;
490
491 TileHeader tileHead;
492 BlockHeader columnHead;
493 streamoff offsetInHeap = 0;
494 //skip through the heap
495 while (true)
496 {
497 read((char*)(&tileHead), sizeof(TileHeader));
498 //end of file
499 if (!good())
500 break;
501 //padding or corrupt data
502 if (memcmp(tileHead.id, "TILE", 4))
503 {
504 clear(rdstate()|ios::badbit);
505 break;
506 }
507
508 //a new tile begins here
509 catalog.push_back(vector<pair<int64_t, int64_t> >(0));
510 offsetInHeap += sizeof(TileHeader);
511
512 //skip through the columns
513 for (size_t i=0;i<numCols;i++)
514 {
515 //zero sized column do not have headers. Skip it
516 if (fTable.sorted_cols[i].num == 0)
517 {
518 catalog.back().push_back(make_pair(0,0));
519 continue;
520 }
521 //read column header
522 read((char*)(&columnHead), sizeof(BlockHeader));
523 //corrupted tile
524 if (!good())
525 break;
526 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
527 offsetInHeap += columnHead.size;
528 seekg(fHeapOff+offsetInHeap);
529 }
530
531 //if we ain't good, this means that something went wrong inside the current tile.
532 if (!good())
533 {
534 catalog.pop_back();
535 break;
536 }
537
538 //current tile is complete. Add rows
539 numRows += tileHead.numRows;
540 }
541
542 if (catalog.size() != fCatalog.size() ||
543 numRows != fTable.num_rows)
544 {
545 clear(rdstate()|ios::badbit);
546#ifdef __EXCEPTIONS
547 throw runtime_error("Heap data does not agree with header.");
548#else
549 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
550 return;
551#endif
552 }
553
554 for (uint32_t i=0;i<catalog.size(); i++)
555 for (uint32_t j=0;j<numCols;j++)
556 {
557 if (catalog[i][j].first != fCatalog[i][j].first ||
558 catalog[i][j].second != fCatalog[i][j].second)
559 {
560 clear(rdstate()|ios::badbit);
561#ifdef __EXCEPTIONS
562 throw runtime_error("Heap data does not agree with header.");
563#else
564 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
565 return;
566#endif
567 }
568 }
569 //go back to start of heap
570 seekg(whereAreWe);
571 }
572
573};//class zfits
574
575#ifndef __MARS__
576}; //namespace std
577#endif
578
579#endif
Note: See TracBrowser for help on using the repository browser.