source: trunk/Mars/mcore/zfits.h@ 17138

Last change on this file since 17138 was 17137, checked in by tbretz, 11 years ago
Reverting to last revision.
File size: 20.8 KB
/*
 * zfits.h
 *
 *  Created on: May 16, 2013
 *      Author: lyard
 */

#ifndef MARS_ZFITS
#define MARS_ZFITS

#include <stdexcept>

#include "fits.h"
#include "huffman.h"

#ifndef __MARS__
namespace std
{
#endif

class zfits : public fits
{
public:

    enum CompressionProcess_t
    {
        kFactRaw       = 0x0,
        kFactSmoothing = 0x1,
        kFactHuffman16 = 0x2
    };

    enum RowOrdering_t
    {
        kOrderByCol = 'C',
        kOrderByRow = 'R'
    };

    // Basic constructor
    zfits(const string& fname, const string& tableName="", bool force=false) : fits(),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, "", tableName, force);
        InitCompressionReading();
    }

    // Alternative constructor
    zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        open(fname.c_str());
        Constructor(fname, fout, tableName, force);
        InitCompressionReading();
    }

    // Skip the next row
    bool SkipNextRow()
    {
        if (!fTable.is_compressed)
            return fits::SkipNextRow();

        fRow++;
        return true;
    }

    virtual bool IsFileOk() const
    {
        bool rawsum = true;

        if (HasKey("RAWSUM"))
        {
            ostringstream str;
            str << fRawsum.val();
            rawsum = (GetStr("RAWSUM") == str.str());
        }

        return fits::IsFileOk() && rawsum;
    }

    size_t GetNumRows() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS2");
        else
            return fTable.Get<size_t>("NAXIS2");
    }

    size_t GetBytesPerRow() const
    {
        if (fTable.is_compressed)
            return fTable.Get<size_t>("ZNAXIS1");
        else
            return fTable.Get<size_t>("NAXIS1");
    }

protected:

    // Stage the requested row to the internal buffer.
    // Does NOT return data to the user.
    virtual void StageRow(size_t row, char* dest)
    {
        if (!fTable.is_compressed)
        {
            fits::StageRow(row, dest);
            return;
        }

        ReadBinaryRow(row, dest);
    }

private:
#ifndef __CINT__
    // Structure helper for reading tile headers
    struct TileHeader
    {
        char     id[4];
        uint32_t numRows;
        uint64_t size;

        TileHeader() {}

        TileHeader(uint32_t nRows,
                   uint64_t s) : id({'T', 'I', 'L', 'E'}),
                                 numRows(nRows),
                                 size(s)
        { }
    } __attribute__((__packed__));

    // Structure helper for reading block headers and compression schemes
    struct BlockHeader
    {
        uint64_t      size;
        char          ordering;
        unsigned char numProcs;
        uint16_t      processings[];

        BlockHeader(uint64_t s=0,
                    char o=kOrderByRow,
                    unsigned char n=1) : size(s),
                                         ordering(o),
                                         numProcs(n)
        {}
    } __attribute__((__packed__));
#endif
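    // Illustrative sketch of the heap layout these two structures describe, as deduced
    // from ReadCatalog() and CheckIfFileIsConsistent() below (the authoritative format
    // is whatever the FACT compression writer produces):
    //
    //   TileHeader { "TILE", numRows, size }                       <- 16 bytes, precedes each tile
    //     BlockHeader col 0 { size, ordering, numProcs } | processings[] | compressed data of col 0
    //     BlockHeader col 1 { size, ordering, numProcs } | processings[] | compressed data of col 1
    //     ...
    //   TileHeader { "TILE", ... }                                 <- next tile
    //     ...
    //
    // As parsed here, each BlockHeader's size field covers the header itself plus its
    // compressed payload.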
    // Do what it takes to initialize the compression-related structures
    void InitCompressionReading()
    {
        if (!fTable.is_compressed)
            return;

        // The constructor may have failed
        if (!good())
            return;

        if (fTable.is_compressed)
            for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++)
            {
                if (it->comp == kCompFACT)
                    continue;

                clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                throw runtime_error("Only the FACT compression scheme is handled by this reader.");
#else
                gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
                return;
#endif
            }

        fColumnOrdering.resize(fTable.sorted_cols.size());
        for (auto it=fColumnOrdering.begin(); it!=fColumnOrdering.end(); it++)
            (*it) = kOrderByRow;

        // Get the compression-specific keywords
        fNumTiles       = fTable.is_compressed ? GetInt("NAXIS2")   : 0;
        fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;

        // Give it some space for uncompressing
        AllocateBuffers();

        // Read the file's catalog
        ReadCatalog();

        // Check that the heap agrees with the header
        //CheckIfFileIsConsistent();
    }

    // Copy decompressed data to the location requested by the user
    void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
    {
        if (!fTable.is_compressed)
        {
            fits::MoveColumnDataToUserSpace(dest, src, c);
            return;
        }

        memcpy(dest, src, c.num*c.size);
    }

    vector<char> fBuffer;           ///< Stores the uncompressed rows
    vector<char> fTransposedBuffer; ///< Intermediate buffer to transpose the rows
    vector<char> fCompressedBuffer; ///< Compressed rows
    vector<char> fColumnOrdering;   ///< Ordering of each column's rows; can change from tile to tile

    size_t  fNumTiles;       ///< Total number of tiles
    size_t  fNumRowsPerTile; ///< Number of rows per compressed tile
    int64_t fCurrentRow;     ///< Current row in memory; signed because we need -1

    streamoff fHeapOff;           ///< Offset of the binary data from the beginning of the file
    streamoff fHeapFromDataStart; ///< Offset of the binary data from the beginning of the data table

    vector<vector<pair<int64_t, int64_t>>> fCatalog;     ///< Catalog, i.e. the main table that points to the compressed data
    vector<size_t>                         fTileSize;    ///< Size in bytes of each compressed tile
    vector<vector<size_t>>                 fTileOffsets; ///< Offset from the start of the tile of a given compressed column

    Checksum fRawsum; ///< Checksum of the uncompressed, raw data

    // Get buffer space
    void AllocateBuffers()
    {
        fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);

        fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
        fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
                                 fTable.num_cols*(sizeof(BlockHeader)+256) + // use a bit more memory for the block headers (256 chars max to code the compression sequence)
                                 sizeof(TileHeader) +                        // a bit more for the tile header
                                 8);                                         // and a bit more for checksumming
    }

    // Read the catalog data, i.e. the addresses of the compressed data inside the heap
    void ReadCatalog()
    {
        vector<char> readBuf(16);
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        fChkData.reset();

        // Do the actual reading
        for (uint32_t i=0;i<fNumTiles;i++)
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                read(readBuf.data(), 2*sizeof(int64_t));
                fChkData.add(readBuf);

                // Swap the bytes
                int64_t tempValues[2] = {0,0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }

                // Add the catalog entry
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        // Compute the total size of each compressed tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }

        // See if there is a gap before the heap data
        fHeapOff           = tellg()+fTable.GetHeapShift();
        fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();

        if (!fCopy.is_open())
            return;

        // Write the catalog and the heap gap to the target file
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }
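    // Illustrative example with hypothetical numbers: for a file with 2 tiles and 2 columns
    // the catalog could read
    //   fCatalog[0] = { (1200,   16), ( 800, 1216) }   // (compressed size, offset in heap)
    //   fCatalog[1] = { (1180, 2032), ( 790, 3212) }
    // i.e. each tile's first column starts right after its 16-byte TileHeader. This yields
    //   fTileSize    = { 2000, 1970 }
    //   fTileOffsets = { { 0, 1200 }, { 0, 1180 } }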

    // Overrides the fits.h method: the actual copying is done in ReadBinaryRow,
    // because it requires data that is only available there. Here we just update
    // the checksum of the raw data.
    virtual void WriteRowToCopyFile(size_t row)
    {
        if (row == fRow+1)
            fRawsum.add(fBufferRow, false);
    }

    // Compressed version of the row reading
    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    {
        if (rowNum >= GetNumRows())
            return false;

        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
        const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;

        bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));

        fCurrentRow = rowNum;

        // Should we read yet another chunk of data?
        if (requestedTile != currentTile)
        {
            // Read yet another chunk from the file
            const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);

            // Skip to the beginning of the tile
            const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);

            seekg(fHeapOff+tileStart);

            // Calculate the 32 bit offset of the current tile
            const uint32_t offset = (tileStart + fHeapFromDataStart)%4;

            // Point the tile header to where it should be;
            // the header is not checked for now
//            TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);

            ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));

            // Read one tile from disk
            read(fCompressedBuffer.data()+offset, sizeToRead);

            if (addCheckSum)
                fChkData.add(fCompressedBuffer);

            if (requestedTile == currentTile+1 &&
                fCopy.is_open() &&
                fCopy.good())
            {
                fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
                if (!fCopy)
                    clear(rdstate()|ios::badbit);
            }
            else
                if (fCopy.is_open())
                    clear(rdstate()|ios::badbit);

            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;

            // Uncompress it
            UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));

            // Pointer to the column (source buffer)
            const char *src = fTransposedBuffer.data();

            uint32_t i=0;
            for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
            {
                char *buffer = fBuffer.data() + it->offset; // pointer to the column (destination buffer)

                switch (fColumnOrdering[i])
                {
                case kOrderByRow:
                    // Regular, "semi-transposed" copy
                    for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                    {
                        memcpy(dest, src, it->bytes);
                        src += it->bytes; // next column
                    }
                    break;

                case kOrderByCol:
                    // Transposed copy
                    for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
                    {
                        for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                        {
                            memcpy(dest, src, it->size);
                            src += it->size; // next element
                        }
                    }
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown column ordering scheme found");
#else
                    gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
                    return false;
#endif
                    break;
                }
            }
        }

        // Data loaded and uncompressed. Copy it to the destination
        memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
        return good();
    }
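    // Illustrative example of the two copy schemes above, for a column holding 2 elements
    // per row (it->size bytes each) in a tile of 3 rows (hypothetical layout):
    //   kOrderByRow: src holds  r0e0 r0e1 | r1e0 r1e1 | r2e0 r2e1  -> copied row by row
    //   kOrderByCol: src holds  r0e0 r1e0 r2e0 | r0e1 r1e1 r2e1    -> copied element by element (transposed)
    // Either way the data ends up row-ordered in fBuffer.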

    // Read a chunk of uncompressed data
    uint32_t UncompressUNCOMPRESSED(char*       dest,
                                    const char* src,
                                    uint32_t    numElems,
                                    uint32_t    sizeOfElems)
    {
        memcpy(dest, src, numElems*sizeOfElems);
        return numElems*sizeOfElems;
    }

    // Read a chunk of data compressed with the Huffman algorithm
    uint32_t UncompressHUFFMAN16(char*       dest,
                                 const char* src,
                                 uint32_t    numChunks)
    {
        vector<uint16_t> uncompressed;

        // Read the compressed sizes (one per row)
        const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
        src += sizeof(uint32_t)*numChunks;

        // Uncompress the rows, one by one
        uint32_t sizeWritten = 0;
        for (uint32_t j=0;j<numChunks;j++)
        {
            Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);

            memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));

            sizeWritten += uncompressed.size()*sizeof(uint16_t);
            dest        += uncompressed.size()*sizeof(uint16_t);
            src         += compressedSizes[j];
        }
        return sizeWritten;
    }

    // Apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(int16_t* data,
                              uint32_t numElems)
    {
        // Undo the integer smoothing
        for (uint32_t j=2;j<numElems;j++)
            data[j] = data[j] + (data[j-1]+data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }
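    // Worked example (assuming the writer stored data[j] - (data[j-1]+data[j-2])/2 computed
    // from the original samples, i.e. the transform this loop inverts): for original samples
    // {10, 12, 14, 8} the file contains {10, 12, 3, -5}, and the loop restores
    //   data[2] = 3 + (12+10)/2 = 14   and   data[3] = -5 + (14+12)/2 = 8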

    // The data has been read from disk. Uncompress it!
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows,
                          const uint32_t offset)
    {
        char *dest = fTransposedBuffer.data();

        // Uncompress column by column
        for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sorted_cols[i];
            if (col.num == 0)
                continue;

            // Get the compression flag
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;

            const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            fColumnOrdering[i] = head->ordering;

            const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
            const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;

            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;

            for (int32_t j=head->numProcs-1;j >= 0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case kFactRaw:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;

                case kFactSmoothing:
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;

                case kFactHuffman16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown processing applied to data. Aborting");
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
                    return;
#endif
                }

                // Increment the destination pointer only once all processings are done
                if (j==0)
                    dest += sizeWritten;
            }
        }
    }
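    // Example of the processing chain (the exact sequence is whatever the compressor wrote
    // into processings[]): for processings[] = { kFactSmoothing, kFactHuffman16 } the loop
    // above first runs UncompressHUFFMAN16 (j==1) and then UnApplySMOOTHING in place (j==0),
    // i.e. the steps are undone in the reverse order of their application.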

    void CheckIfFileIsConsistent()
    {
        // Remember where we are and go to the start of the heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        // Init the number of rows to zero
        uint64_t numRows = 0;

        // Get the number of columns from the header
        size_t numCols = fTable.num_cols;

        vector<vector<pair<int64_t, int64_t> > > catalog;

        TileHeader  tileHead;
        BlockHeader columnHead;
        streamoff   offsetInHeap = 0;

        // Skip through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(TileHeader));

            // End of file
            if (!good())
                break;

            // Padding or corrupt data
            if (memcmp(tileHead.id, "TILE", 4))
            {
                clear(rdstate()|ios::badbit);
                break;
            }

            // A new tile begins here
            catalog.push_back(vector<pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(TileHeader);

            // Skip through the columns
            for (size_t i=0;i<numCols;i++)
            {
                // Zero-sized columns do not have headers. Skip them
                if (fTable.sorted_cols[i].num == 0)
                {
                    catalog.back().push_back(make_pair(0,0));
                    continue;
                }

                // Read the column header
                read((char*)(&columnHead), sizeof(BlockHeader));

                // Corrupted tile
                if (!good())
                    break;

                catalog.back().emplace_back((int64_t)(columnHead.size), offsetInHeap);
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            // If the stream is not good, something went wrong inside the current tile
            if (!good())
            {
                catalog.pop_back();
                break;
            }

            // The current tile is complete. Add its rows
            numRows += tileHead.numRows;
        }

        if (catalog.size() != fCatalog.size() ||
            numRows != fTable.num_rows)
        {
            clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header.");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        for (uint32_t i=0;i<catalog.size(); i++)
            for (uint32_t j=0;j<numCols;j++)
            {
                if (catalog[i][j].first  != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header.");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }

        // Go back to where we were before the consistency check
        seekg(whereAreWe);
    }

}; // class zfits
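// Minimal usage sketch (illustrative only; it relies on the row-access interface
// inherited from fits.h, e.g. SetPtrAddress()/GetNextRow(), and the file and column
// names are merely assumptions):
//
//     zfits file("20130909_042.fits.fz", "Events");
//     if (!file.good())
//         return;
//
//     double time;
//     file.SetPtrAddress("Time", &time);   // bind a column to a local variable
//
//     while (file.GetNextRow())            // rows are staged and decompressed tile by tile
//         cout << time << endl;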

#ifndef __MARS__
}; // namespace std
#endif

#endif