source: trunk/Mars/mcore/zfits.h@ 16882

Last change on this file since 16882 was 16882, checked in by lyard, 11 years ago
added correct checksum calculation for compressed fits reader (zfits)
/*
 * zfits.h
 *
 *  Created on: May 16, 2013
 *      Author: lyard
 */

#ifndef MARS_ZFITS
#define MARS_ZFITS

#include <stdexcept>

#include "fits.h"
#include "huffman.h"

/*
#define FACT_RAW       0x0
#define FACT_SMOOTHING 0x1
#define FACT_HUFFMAN16 0x2

#define FACT_COL_MAJOR 'C'
#define FACT_ROW_MAJOR 'R'
*/


#ifndef __MARS__
namespace std
{
#endif

enum FACT_COMPRESSION_PROCESS
{
    FACT_RAW       = 0x0,
    FACT_SMOOTHING = 0x1,
    FACT_HUFFMAN16 = 0x2
};

enum FACT_ROW_ORDERING
{
    FACT_COL_MAJOR = 'C',
    FACT_ROW_MAJOR = 'R'
};

class zfits : public fits
{
public:

    // Basic constructor
    zfits(const string& fname, const string& tableName="",
          bool force=false) : fits(fname, tableName, force),
                              fNumTiles(0),
                              fNumRowsPerTile(0),
                              fCurrentRow(-1),
                              fHeapOff(0),
                              fTileSize(0)
    {
        InitCompressionReading();
    }

    // Alternative constructor
    zfits(const string& fname, const string& fout, const string& tableName,
          bool force=false) : fits(fname, fout, tableName, force),
                              fNumTiles(0),
                              fNumRowsPerTile(0),
                              fCurrentRow(-1),
                              fHeapOff(0),
                              fTileSize(0)
    {
        InitCompressionReading();
    }

    // Skip the next row
    bool SkipNextRow()
    {
        if (!fTable.isCompressed)
            return fits::SkipNextRow();

        fRow++;
        return true;
    }

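    // Usage sketch (illustration only, not part of the original file): reading a
    // compressed table goes through the interface inherited from fits; the file
    // and table names below are hypothetical.
    //
    //     zfits file("20130516_042.fits.fz", "Events");
    //     if (file.good())
    //         for (size_t i=0; i<file.GetNumRows(); i++)
    //             file.SkipNextRow();   // or stage the rows through the fits interface
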
protected:

    // Stage the requested row to internal buffer
    // Does NOT return data to users
    virtual void StageRow(size_t row, char* dest)
    {
        if (!fTable.isCompressed)
        {
            fits::StageRow(row, dest);
            return;
        }

        ReadBinaryRow(row, dest);
    }

private:
#ifndef __CINT__
    // Structure helper for reading tile headers
    struct TileHeader
    {
        char     id[4];
        uint32_t numRows;
        uint64_t size;

        TileHeader() {}

        TileHeader(uint32_t nRows,
                   uint64_t s) : id({'T', 'I', 'L', 'E'}),
                                 numRows(nRows),
                                 size(s)
        {}
    } __attribute__((__packed__));

    // Structure helper for reading block headers and compression schemes
    struct BlockHeader
    {
        uint64_t      size;
        char          ordering;
        unsigned char numProcs;
        uint16_t      processings[];

        BlockHeader(uint64_t s=0,
                    char o=FACT_ROW_MAJOR,
                    unsigned char n=1) : size(s),
                                         ordering(o),
                                         numProcs(n)
        {}
    } __attribute__((__packed__));
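
    // Illustrative sketch only (not part of the original reader): in the heap, each
    // non-empty column of a tile starts with a packed BlockHeader, immediately
    // followed by its numProcs 16-bit processing codes and then the compressed
    // payload. The helper name and its arguments are hypothetical.
    static const char* ExamplePayloadStart(const vector<char>& heap, size_t blockOffset)
    {
        // the header is packed, so it can be mapped directly onto the raw bytes
        const BlockHeader* head = reinterpret_cast<const BlockHeader*>(heap.data() + blockOffset);

        // skip the header itself and its list of processing codes
        return heap.data() + blockOffset + sizeof(BlockHeader) + head->numProcs*sizeof(uint16_t);
    }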
#endif

    // Do what it takes to initialize the compressed structures
    void InitCompressionReading()
    {
        if (!fTable.isCompressed)
            return;

        // The constructor may have failed
        if (!good())
            return;

        if (fTable.isCompressed)
            for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++)
                if (it->comp != FACT)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Only the FACT compression scheme is handled by this reader.");
#else
                    gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
                    return;
#endif
                }

        fColumnOrdering.resize(fTable.sortedCols.size());
        for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
            (*it) = FACT_ROW_MAJOR;

        // Get the compression-specific keywords
        fNumTiles       = fTable.isCompressed ? GetInt("NAXIS2")   : 0;
        fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;

        // give it some space for uncompressing
        AllocateBuffers();

        // read the file's catalog
        ReadCatalog();

        // check that the heap agrees with the header
        //CheckIfFileIsConsistent();
    }

    // Copy decompressed data to the location requested by the user
    void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
    {
        if (!fTable.isCompressed)
        {
            fits::MoveColumnDataToUserSpace(dest, src, c);
            return;
        }

        memcpy(dest, src, c.num*c.size);
    }

    vector<char> fBuffer;           ///< stores the uncompressed rows
    vector<char> fTransposedBuffer; ///< intermediate buffer to transpose the rows
    vector<char> fCompressedBuffer; ///< compressed rows
    vector<char> fColumnOrdering;   ///< ordering of each column's rows; can change from tile to tile

    size_t  fNumTiles;       ///< Total number of tiles
    size_t  fNumRowsPerTile; ///< Number of rows per compressed tile
    int64_t fCurrentRow;     ///< current row in memory; signed because we need -1

    streamoff fHeapOff;           ///< offset of the binary data from the beginning of the file
    streamoff fHeapFromDataStart; ///< offset of the binary data from the beginning of the data table

    vector<vector<pair<int64_t, int64_t> > > fCatalog; ///< Catalog, i.e. the main table that points to the compressed data
    vector<size_t>          fTileSize;                 ///< size in bytes of each compressed tile
    vector<vector<size_t> > fTileOffsets;              ///< offset of a given compressed column from the start of its tile

    // Get buffer space
    void AllocateBuffers()
    {
        fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);

        fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
        fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
                                 fTable.num_cols*(sizeof(BlockHeader)+256) + // use a bit more memory for the block headers; 256 bytes at most for the coded compression sequence
                                 sizeof(TileHeader),                         // a bit more for the tile headers
                                 8);                                         // and a bit more for checksumming
    }

    // Read catalog data, i.e. the addresses of the compressed data inside the heap
    void ReadCatalog()
    {
        vector<char> readBuf(16);
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        fChkData.reset();

        // do the actual reading
        for (uint32_t i=0;i<fNumTiles;i++)
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                read(readBuf.data(), 2*sizeof(int64_t));
                fChkData.add(readBuf);

                // swap the bytes
                int64_t tempValues[2] = {0,0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }

                // add the catalog entry
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        // compute the total size of each compressed tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }

        // see if there is a gap before the heap data
        fHeapOff           = tellg()+fTable.GetHeapShift();
        fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();

        if (!fCopy.is_open())
            return;

        // write the catalog and the heap gap to the target file
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }
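
    // Sketch (illustration only, not part of the original reader): after ReadCatalog
    // has run, each catalog cell holds the pair (compressed size, offset from the
    // start of the heap) of one column in one tile, byte-swapped from the big-endian
    // values stored in the file. The helper name is hypothetical.
    streamoff ExampleColumnPosition(uint32_t tile, uint32_t column) const
    {
        // absolute file position of this column's compressed block
        return fHeapOff + static_cast<streamoff>(fCatalog[tile][column].second);
    }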

    // Overrides the fits.h method with an empty one:
    // the actual work is done in ReadBinaryRow, because it requires
    // transient data that is only available there
    virtual void WriteRowToCopyFile(size_t)
    {
    }

    // Compressed version of the read row
    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    {
        if (rowNum >= GetNumRows())
            return false;

        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
        const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;

        bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));

        fCurrentRow = rowNum;

        // should we read yet another chunk of data?
        if (requestedTile != currentTile)
        {
            // read yet another chunk from the file
            const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);

            // skip to the beginning of the tile
            const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);

            seekg(fHeapOff+tileStart);

            // calculate the 32-bit offset of the current tile
            const uint32_t offset = (tileStart + fHeapFromDataStart)%4;

            // point the tile header where it should be
            // (we are not checking the header for now)
//            TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);

            ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));

            // read one tile from disk
            read(fCompressedBuffer.data()+offset, sizeToRead);

            if (addCheckSum)
                fChkData.add(fCompressedBuffer);

            if (requestedTile == currentTile+1 &&
                fCopy.is_open()                &&
                fCopy.good())
            {
                fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
                if (!fCopy)
                    clear(rdstate()|ios::badbit);
            }
            else
                if (fCopy.is_open())
                    clear(rdstate()|ios::badbit);

            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;

            // uncompress it
            UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));

            // pointer to column (source buffer)
            const char *src = fTransposedBuffer.data();

            uint32_t i=0;
            for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++, i++)
            {
                char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)

                switch (fColumnOrdering[i])
                {
                case FACT_ROW_MAJOR:
                    // regular, "semi-transposed" copy
                    for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                    {
                        memcpy(dest, src, it->bytes);
                        src += it->bytes;  // next column
                    }
                    break;

                case FACT_COL_MAJOR:
                    // transposed copy
                    for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
                    {
                        for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                        {
                            memcpy(dest, src, it->size);
                            src += it->size;  // next element
                        }
                    }
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown column ordering scheme found");
#else
                    gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
                    return false;
#endif
                    break;
                }
            }
        }

        // Data loaded and uncompressed. Copy it to the destination
        memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
        return good();
    }
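
    // Sketch (illustration only): the data layouts undone by the two copy loops
    // above. For a column holding arrays of 3 elements over 2 rows, FACT_ROW_MAJOR
    // stores the values grouped by row,
    //     r0[0] r0[1] r0[2]   r1[0] r1[1] r1[2]
    // while FACT_COL_MAJOR stores them grouped by element index,
    //     r0[0] r1[0]   r0[1] r1[1]   r0[2] r1[2]
    // which is why the second loop scatters each element back row by row.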

    // Read a bunch of uncompressed data
    uint32_t UncompressUNCOMPRESSED(char*       dest,
                                    const char* src,
                                    uint32_t    numElems,
                                    uint32_t    sizeOfElems)
    {
        memcpy(dest, src, numElems*sizeOfElems);
        return numElems*sizeOfElems;
    }

    // Read a bunch of data compressed with the Huffman algorithm
    uint32_t UncompressHUFFMAN16(char*       dest,
                                 const char* src,
                                 uint32_t    numChunks)
    {
        vector<uint16_t> uncompressed;

        // read compressed sizes (one per row)
        const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
        src += sizeof(uint32_t)*numChunks;

        // uncompress the rows, one by one
        uint32_t sizeWritten = 0;
        for (uint32_t j=0;j<numChunks;j++)
        {
            Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);

            memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));

            sizeWritten += uncompressed.size()*sizeof(uint16_t);
            dest        += uncompressed.size()*sizeof(uint16_t);
            src         += compressedSizes[j];
        }

        return sizeWritten;
    }
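
    // Sketch (illustration only): the block layout assumed by UncompressHUFFMAN16
    // for numChunks compressed rows; the sizes and streams are schematic.
    //
    //     [uint32_t size_0][uint32_t size_1]...[uint32_t size_n-1]
    //     [Huffman stream 0: size_0 bytes][Huffman stream 1: size_1 bytes]...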

    // Apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(int16_t* data,
                              uint32_t numElems)
    {
        // undo the integer smoothing
        for (uint32_t j=2;j<numElems;j++)
            data[j] = data[j] + (data[j-1]+data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }
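
    // Worked example (illustration only): if the file stores the already-smoothed
    // sequence {10, 12, 0, 2}, the loop above rebuilds the original samples as
    //     data[2] = 0 + (12+10)/2 = 11
    //     data[3] = 2 + (11+12)/2 = 13
    // i.e. {10, 12, 11, 13}; the writer presumably subtracted the same means.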

    // Data has been read from disk. Uncompress it!
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows,
                          const uint32_t offset)
    {
        char *dest = fTransposedBuffer.data();

        // uncompress column by column
        for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sortedCols[i];
            if (col.num == 0)
                continue;

            // get the block header and its compression flags
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;

            const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            fColumnOrdering[i] = head->ordering;

            const uint32_t numRows = (head->ordering==FACT_ROW_MAJOR) ? thisRoundNumRows : col.num;
            const uint32_t numCols = (head->ordering==FACT_COL_MAJOR) ? thisRoundNumRows : col.num;

            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;

            // undo the processings in reverse order (the last one applied is the first one undone)
            for (int32_t j=head->numProcs-1;j >= 0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case FACT_RAW:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;

                case FACT_SMOOTHING:
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;

                case FACT_HUFFMAN16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown processing applied to data. Aborting");
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
                    return;
#endif
                }

                // advance the destination pointer only once all processings of this column are done
                if (j==0) dest += sizeWritten;
            }
        }
    }
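
    // Sketch (illustration only): for a column whose block header lists the
    // processings {FACT_SMOOTHING, FACT_HUFFMAN16}, the loop above first calls
    // UncompressHUFFMAN16 (j==1), writing the 16-bit samples to dest, then
    // UnApplySMOOTHING (j==0) in place on those samples, and only then advances
    // dest to the next column.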

    void CheckIfFileIsConsistent()
    {
        // remember where we are and go to the start of the heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        // init the number of rows to zero
        uint64_t numRows = 0;

        // get the number of columns from the header
        size_t numCols = fTable.num_cols;

        vector<vector<pair<int64_t, int64_t> > > catalog;

        TileHeader  tileHead;
        BlockHeader columnHead;
        streamoff offsetInHeap = 0;

        // skip through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(TileHeader));

            // end of file
            if (!good())
                break;

            // padding or corrupt data
            if (memcmp(tileHead.id, "TILE", 4))
            {
                clear(rdstate()|ios::badbit);
                break;
            }

            // a new tile begins here
            catalog.push_back(vector<pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(TileHeader);

            // skip through the columns
            for (size_t i=0;i<numCols;i++)
            {
                // zero-sized columns do not have headers: skip them
                if (fTable.sortedCols[i].num == 0)
                {
                    catalog.back().push_back(make_pair(0,0));
                    continue;
                }

                // read the column header
                read((char*)(&columnHead), sizeof(BlockHeader));

                // corrupted tile
                if (!good())
                    break;

                catalog.back().emplace_back((int64_t)(columnHead.size), offsetInHeap);
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            // if the stream is not good, something went wrong inside the current tile
            if (!good())
            {
                catalog.pop_back();
                break;
            }

            // current tile is complete: add its rows
            numRows += tileHead.numRows;
        }

        if (catalog.size() != fCatalog.size() ||
            numRows        != fTable.num_rows)
        {
            clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header.");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        for (uint32_t i=0;i<catalog.size(); i++)
            for (uint32_t j=0;j<numCols;j++)
            {
                if (catalog[i][j].first  != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header.");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }

        // go back to where we were before the check
        seekg(whereAreWe);
    }

}; //class zfits

#ifndef __MARS__
}; //namespace std
#endif

#endif