source: trunk/Mars/mcore/zfits.h@ 17142

Last change on this file since 17142 was 17141, checked in by tbretz, 11 years ago
More inclused to make any include order possible, removed some obsolete cint hacks
File size: 20.8 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_zfits
9#define MARS_zfits
10
11#include "fits.h"
12#include "huffman.h"
13
14#ifndef __MARS__
15namespace std
16{
17#endif
18
19class zfits : public fits
20{
21public:
22
23 enum CompressionProcess_t
24 {
25 kFactRaw = 0x0,
26 kFactSmoothing = 0x1,
27 kFactHuffman16 = 0x2
28 };
29
30 enum RowOrdering_t
31 {
32 kOrderByCol = 'C',
33 kOrderByRow = 'R'
34 };
35
36 // Basic constructor
37 zfits(const string& fname, const string& tableName="", bool force=false) : fits(),
38 fNumTiles(0),
39 fNumRowsPerTile(0),
40 fCurrentRow(-1),
41 fHeapOff(0),
42 fTileSize(0)
43 {
44 open(fname.c_str());
45 Constructor(fname, "", tableName, force);
46 InitCompressionReading();
47 }
48
49 // Alternative contstructor
50 zfits(const string& fname, const string& fout, const string& tableName, bool force=false) : fits(),
51 fNumTiles(0),
52 fNumRowsPerTile(0),
53 fCurrentRow(-1),
54 fHeapOff(0),
55 fTileSize(0)
56 {
57 open(fname.c_str());
58 Constructor(fname, fout, tableName, force);
59 InitCompressionReading();
60 }
61
62 // Skip the next row
63 bool SkipNextRow()
64 {
65 if (!fTable.is_compressed)
66 return fits::SkipNextRow();
67
68 fRow++;
69 return true;
70 }
71
72 virtual bool IsFileOk() const
73 {
74 bool rawsum = true;
75
76 if (HasKey("RAWSUM"))
77 {
78 ostringstream str;
79 str << fRawsum.val();
80 rawsum = (GetStr("RAWSUM") == str.str());
81 }
82
83 return fits::IsFileOk() && rawsum;
84 };
85
86 size_t GetNumRows() const
87 {
88 if (fTable.is_compressed)
89 return fTable.Get<size_t>("ZNAXIS2");
90 else
91 return fTable.Get<size_t>("NAXIS2");
92 }
93 size_t GetBytesPerRow() const
94 {
95 if (fTable.is_compressed)
96 return fTable.Get<size_t>("ZNAXIS1");
97 else
98 return fTable.Get<size_t>("NAXIS1");
99 }
100
101protected:
102
103 // Stage the requested row to internal buffer
104 // Does NOT return data to users
105 virtual void StageRow(size_t row, char* dest)
106 {
107 if (!fTable.is_compressed)
108 {
109 fits::StageRow(row, dest);
110 return;
111 }
112
113 ReadBinaryRow(row, dest);
114 }
115
116private:
117#ifndef __CINT__
118 //Structure helper for reading tiles headers
119 struct TileHeader
120 {
121 char id[4];
122 uint32_t numRows;
123 uint64_t size;
124
125 TileHeader() {}
126
127 TileHeader(uint32_t nRows,
128 uint64_t s) : id({'T', 'I', 'L', 'E'}),
129 numRows(nRows),
130 size(s)
131 { };
132 } __attribute__((__packed__));
133
134 //Structure helper for reading blocks headers and compresion schemes
135 struct BlockHeader
136 {
137 uint64_t size;
138 char ordering;
139 unsigned char numProcs;
140 uint16_t processings[];
141
142 BlockHeader(uint64_t s=0,
143 char o=kOrderByRow,
144 unsigned char n=1) : size(s),
145 ordering(o),
146 numProcs(n)
147 {}
148 } __attribute__((__packed__));
149#endif
150 // Do what it takes to initialize the compressed structured
151 void InitCompressionReading()
152 {
153 if (!fTable.is_compressed)
154 return;
155
156 //The constructor may have failed
157 if (!good())
158 return;
159
160 if (fTable.is_compressed)
161 for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++)
162 {
163 if (it->comp == kCompFACT)
164 continue;
165
166 clear(rdstate()|ios::badbit);
167#ifdef __EXCEPTIONS
168 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
169#else
170 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
171 return;
172#endif
173 }
174
175 fColumnOrdering.resize(fTable.sorted_cols.size());
176 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
177 (*it) = kOrderByRow;
178 //Get compressed specific keywords
179 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
180 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
181
182 //give it some space for uncompressing
183 AllocateBuffers();
184
185 //read the file's catalog
186 ReadCatalog();
187
188 //check that heap agrees with head
189 //CheckIfFileIsConsistent();
190 }
191
192 // Copy decompressed data to location requested by user
193 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
194 {
195 if (!fTable.is_compressed)
196 {
197 fits::MoveColumnDataToUserSpace(dest, src, c);
198 return;
199 }
200
201 memcpy(dest, src, c.num*c.size);
202 }
203
204 vector<char> fBuffer; ///<store the uncompressed rows
205 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
206 vector<char> fCompressedBuffer; ///<compressed rows
207 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
208
209 size_t fNumTiles; ///< Total number of tiles
210 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
211 int64_t fCurrentRow; ///< current row in memory signed because we need -1
212
213 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
214 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
215
216 vector<vector<pair<int64_t, int64_t>>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
217 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
218 vector<vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
219
220 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
221
222 // Get buffer space
223 void AllocateBuffers()
224 {
225 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
226
227 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
228 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
229 fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for block headers. 256 char coding the compression sequence max.
230 sizeof(TileHeader), //a bit more for the tile headers
231 8); //and a bit more for checksuming
232 }
233
234 // Read catalog data. I.e. the address of the compressed data inside the heap
235 void ReadCatalog()
236 {
237 vector<char> readBuf(16);
238 fCatalog.resize(fNumTiles);
239
240 const streampos catalogStart = tellg();
241
242fChkData.reset();
243
244 //do the actual reading
245 for (uint32_t i=0;i<fNumTiles;i++)
246 for (uint32_t j=0;j<fTable.num_cols;j++)
247 {
248 read(readBuf.data(), 2*sizeof(int64_t));
249 fChkData.add(readBuf);
250 //swap the bytes
251 int64_t tempValues[2] = {0,0};
252 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
253 if (tempValues[0] < 0 || tempValues[1] < 0)
254 {
255 clear(rdstate()|ios::badbit);
256#ifdef __EXCEPTIONS
257 throw runtime_error("Negative value in the catalog");
258#else
259 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
260 return;
261#endif
262 }
263 //add catalog entry
264 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
265 }
266
267 //compute the total size of each compressed tile
268 fTileSize.resize(fNumTiles);
269 fTileOffsets.resize(fNumTiles);
270 for (uint32_t i=0;i<fNumTiles;i++)
271 {
272 fTileSize[i] = 0;
273 for (uint32_t j=0;j<fTable.num_cols;j++)
274 {
275 fTileSize[i] += fCatalog[i][j].first;
276 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
277 }
278 }
279 //see if there is a gap before heap data
280 fHeapOff = tellg()+fTable.GetHeapShift();
281 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
282
283 if (!fCopy.is_open())
284 return;
285
286 //write catalog and heap gap to target file
287 seekg(catalogStart);
288
289 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
290
291 vector<char> buf(catSize);
292 read(buf.data(), catSize);
293
294 fCopy.write(buf.data(), catSize);
295 if (!fCopy)
296 clear(rdstate()|ios::badbit);
297 }
298
299 //overrides fits.h method with empty one
300 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
301 virtual void WriteRowToCopyFile(size_t row)
302 {
303 if (row == fRow+1)
304 fRawsum.add(fBufferRow, false);
305 }
306
307 // Compressed version of the read row
308 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
309 {
310 if (rowNum >= GetNumRows())
311 return false;
312
313 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
314 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
315
316 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
317
318 fCurrentRow = rowNum;
319 //should we read yet another chunk of data ?
320 if (requestedTile != currentTile)
321 {
322 //read yet another chunk from the file
323 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);
324
325 //skip to the beginning of the tile
326 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);
327
328 seekg(fHeapOff+tileStart);
329
330 //calculate the 32 bits offset of the current tile.
331 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
332
333 //point the tile header where it should be
334 //we ain't checking the header now
335// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
336
337 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
338
339 //read one tile from disk
340 read(fCompressedBuffer.data()+offset, sizeToRead);
341
342 if (addCheckSum)
343 fChkData.add(fCompressedBuffer);
344
345 if (requestedTile == currentTile+1 &&
346 fCopy.is_open() &&
347 fCopy.good())
348 {
349 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
350 if (!fCopy)
351 clear(rdstate()|ios::badbit);
352 }
353 else
354 if (fCopy.is_open())
355 clear(rdstate()|ios::badbit);
356
357 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
358
359 //uncompress it
360 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));
361
362 // pointer to column (source buffer)
363 const char *src = fTransposedBuffer.data();
364
365 uint32_t i=0;
366 for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
367 {
368 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
369
370 switch (fColumnOrdering[i])
371 {
372 case kOrderByRow:
373 // regular, "semi-transposed" copy
374 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
375 {
376 memcpy(dest, src, it->bytes);
377 src += it->bytes; // next column
378 }
379 break;
380
381 case kOrderByCol:
382 // transposed copy
383 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
384 {
385 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
386 {
387 memcpy(dest, src, it->size);
388 src += it->size; // next element
389 }
390 }
391 break;
392 default:
393 clear(rdstate()|ios::badbit);
394 #ifdef __EXCEPTIONS
395 throw runtime_error("Unkown column ordering scheme found");
396 #else
397 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
398 return false;
399 #endif
400 break;
401 };
402 }
403 }
404
405 //Data loaded and uncompressed. Copy it to destination
406 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
407 return good();
408 }
409
410 // Read a bunch of uncompressed data
411 uint32_t UncompressUNCOMPRESSED(char* dest,
412 const char* src,
413 uint32_t numElems,
414 uint32_t sizeOfElems)
415 {
416 memcpy(dest, src, numElems*sizeOfElems);
417 return numElems*sizeOfElems;
418 }
419
420 // Read a bunch of data compressed with the Huffman algorithm
421 uint32_t UncompressHUFFMAN16(char* dest,
422 const char* src,
423 uint32_t numChunks)
424 {
425 vector<uint16_t> uncompressed;
426
427 //read compressed sizes (one per row)
428 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
429 src += sizeof(uint32_t)*numChunks;
430
431 //uncompress the rows, one by one
432 uint32_t sizeWritten = 0;
433 for (uint32_t j=0;j<numChunks;j++)
434 {
435 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
436
437 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
438
439 sizeWritten += uncompressed.size()*sizeof(uint16_t);
440 dest += uncompressed.size()*sizeof(uint16_t);
441 src += compressedSizes[j];
442 }
443 return sizeWritten;
444 }
445
446 // Apply the inverse transform of the integer smoothing
447 uint32_t UnApplySMOOTHING(int16_t* data,
448 uint32_t numElems)
449 {
450 //un-do the integer smoothing
451 for (uint32_t j=2;j<numElems;j++)
452 data[j] = data[j] + (data[j-1]+data[j-2])/2;
453
454 return numElems*sizeof(uint16_t);
455 }
456
457 // Data has been read from disk. Uncompress it !
458 void UncompressBuffer(const uint32_t &catalogCurrentRow,
459 const uint32_t &thisRoundNumRows,
460 const uint32_t offset)
461 {
462 char *dest = fTransposedBuffer.data();
463
464 //uncompress column by column
465 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
466 {
467 const fits::Table::Column &col = fTable.sorted_cols[i];
468 if (col.num == 0)
469 continue;
470
471 //get the compression flag
472 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
473
474 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
475
476 fColumnOrdering[i] = head->ordering;
477
478 const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
479 const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;
480
481 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
482
483 for (int32_t j=head->numProcs-1;j >= 0; j--)
484 {
485 uint32_t sizeWritten=0;
486
487 switch (head->processings[j])
488 {
489 case kFactRaw:
490 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
491 break;
492
493 case kFactSmoothing:
494 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
495 break;
496
497 case kFactHuffman16:
498 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
499 break;
500
501 default:
502 clear(rdstate()|ios::badbit);
503#ifdef __EXCEPTIONS
504 throw runtime_error("Unknown processing applied to data. Aborting");
505#else
506 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
507 return;
508#endif
509 }
510 //increment destination counter only when processing done.
511 if (j==0)
512 dest+= sizeWritten;
513 }
514 }
515 }
516
517 void CheckIfFileIsConsistent()
518 {
519 //goto start of heap
520 streamoff whereAreWe = tellg();
521 seekg(fHeapOff);
522
523 //init number of rows to zero
524 uint64_t numRows = 0;
525
526 //get number of columns from header
527 size_t numCols = fTable.num_cols;
528
529 vector<vector<pair<int64_t, int64_t> > > catalog;
530
531 TileHeader tileHead;
532 BlockHeader columnHead;
533 streamoff offsetInHeap = 0;
534 //skip through the heap
535 while (true)
536 {
537 read((char*)(&tileHead), sizeof(TileHeader));
538 //end of file
539 if (!good())
540 break;
541 //padding or corrupt data
542 if (memcmp(tileHead.id, "TILE", 4))
543 {
544 clear(rdstate()|ios::badbit);
545 break;
546 }
547
548 //a new tile begins here
549 catalog.push_back(vector<pair<int64_t, int64_t> >(0));
550 offsetInHeap += sizeof(TileHeader);
551
552 //skip through the columns
553 for (size_t i=0;i<numCols;i++)
554 {
555 //zero sized column do not have headers. Skip it
556 if (fTable.sorted_cols[i].num == 0)
557 {
558 catalog.back().push_back(make_pair(0,0));
559 continue;
560 }
561 //read column header
562 read((char*)(&columnHead), sizeof(BlockHeader));
563 //corrupted tile
564 if (!good())
565 break;
566 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
567 offsetInHeap += columnHead.size;
568 seekg(fHeapOff+offsetInHeap);
569 }
570
571 //if we ain't good, this means that something went wrong inside the current tile.
572 if (!good())
573 {
574 catalog.pop_back();
575 break;
576 }
577
578 //current tile is complete. Add rows
579 numRows += tileHead.numRows;
580 }
581
582 if (catalog.size() != fCatalog.size() ||
583 numRows != fTable.num_rows)
584 {
585 clear(rdstate()|ios::badbit);
586#ifdef __EXCEPTIONS
587 throw runtime_error("Heap data does not agree with header.");
588#else
589 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
590 return;
591#endif
592 }
593
594 for (uint32_t i=0;i<catalog.size(); i++)
595 for (uint32_t j=0;j<numCols;j++)
596 {
597 if (catalog[i][j].first != fCatalog[i][j].first ||
598 catalog[i][j].second != fCatalog[i][j].second)
599 {
600 clear(rdstate()|ios::badbit);
601#ifdef __EXCEPTIONS
602 throw runtime_error("Heap data does not agree with header.");
603#else
604 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
605 return;
606#endif
607 }
608 }
609 //go back to start of heap
610 seekg(whereAreWe);
611 }
612
613};//class zfits
614
615#ifndef __MARS__
616}; //namespace std
617#endif
618
619#endif
Note: See TracBrowser for help on using the repository browser.