source: trunk/Mars/mcore/zfits.h@ 16898

Last change on this file since 16898 was 16898, checked in by lyard, 11 years ago
Propagated the new naming of enums to other files....
File size: 19.6 KB
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16#ifndef __MARS__
17namespace std
18{
19#endif
20
21class zfits : public fits
22{
23public:
24
25 enum CompressionProcess_t
26 {
27 kFactRaw = 0x0,
28 kFactSmoothing = 0x1,
29 kFactHuffman16 = 0x2
30 };
31
32 enum RowOrdering_t
33 {
34 kOrderByCol = 'C',
35 kOrderByRow = 'R'
36 };
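    // A column is usually compressed by a chain of processes from
    // CompressionProcess_t (an assumption based on the decompression code
    // below, since this reader never writes). A typical chain is
    //
    //   processings[] = { kFactSmoothing, kFactHuffman16 }
    //
    // which UncompressBuffer() undoes in reverse order: Huffman decoding
    // first, then the inverse of the integer smoothing.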
37
38 // Basic constructor
39 zfits(const string& fname, const string& tableName="",
40 bool force=false) : fits(fname, tableName, force),
41 fNumTiles(0),
42 fNumRowsPerTile(0),
43 fCurrentRow(-1),
44 fHeapOff(0),
45 fTileSize(0)
46 {
47 InitCompressionReading();
48 }
49
50 // Alternative constructor
51 zfits(const string& fname, const string& fout, const string& tableName,
52 bool force=false) : fits(fname, fout, tableName, force),
53 fNumTiles(0),
54 fNumRowsPerTile(0),
55 fCurrentRow(-1),
56 fHeapOff(0),
57 fTileSize(0)
58 {
59 InitCompressionReading();
60 }
61
62 // Skip the next row
63 bool SkipNextRow()
64 {
65 if (!fTable.is_compressed)
66 return fits::SkipNextRow();
67
68 fRow++;
69 return true;
70 }
71
72 virtual bool IsFileOk() const
73 {
74 bool rawsum = true;
75
76 if (HasKey("RAWSUM"))
77 {
78 ostringstream str;
79 str << fRawsum.val();
80 rawsum = (GetStr("RAWSUM") == str.str());
81 }
82
83 return fits::IsFileOk() && rawsum;
84 };
85
86protected:
87
88 // Stage the requested row to internal buffer
89 // Does NOT return data to users
90 virtual void StageRow(size_t row, char* dest)
91 {
92 if (!fTable.is_compressed)
93 {
94 fits::StageRow(row, dest);
95 return;
96 }
97
98 ReadBinaryRow(row, dest);
99 }
100
101private:
102#ifndef __CINT__
103 //Structure helper for reading tile headers
104 struct TileHeader
105 {
106 char id[4];
107 uint32_t numRows;
108 uint64_t size;
109
110 TileHeader() {}
111
112 TileHeader(uint32_t nRows,
113 uint64_t s) : id({'T', 'I', 'L', 'E'}),
114 numRows(nRows),
115 size(s)
116 { };
117 } __attribute__((__packed__));
118
119 //Structure helper for reading block headers and compression schemes
120 struct BlockHeader
121 {
122 uint64_t size;
123 char ordering;
124 unsigned char numProcs;
125 uint16_t processings[];
126
127 BlockHeader(uint64_t s=0,
128 char o=kOrderByRow,
129 unsigned char n=1) : size(s),
130 ordering(o),
131 numProcs(n)
132 {}
133 } __attribute__((__packed__));
134#endif
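    // Layout of one compressed tile inside the heap, as implied by the two
    // structures above and by ReadBinaryRow()/CheckIfFileIsConsistent()
    // (illustrative sketch, not an authoritative format description):
    //
    //   TileHeader          { id="TILE", numRows, size }
    //   BlockHeader col 0   { size, ordering, numProcs, processings[numProcs] }
    //   compressed data of column 0
    //   BlockHeader col 1   { ... }
    //   compressed data of column 1
    //   ...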
135 // Do what it takes to initialize the compression structures
136 void InitCompressionReading()
137 {
138 if (!fTable.is_compressed)
139 return;
140
141 //The constructor may have failed
142 if (!good())
143 return;
144
145 if (fTable.is_compressed)
146 for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++)
147 {
148 if (it->comp == kCompFACT)
149 continue;
150
151 clear(rdstate()|ios::badbit);
152#ifdef __EXCEPTIONS
153 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
154#else
155 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
156 return;
157#endif
158 }
159
160 fColumnOrdering.resize(fTable.sorted_cols.size());
161 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
162 (*it) = kOrderByRow;
163 //Get compression-specific keywords
164 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
165 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
166
167 //give it some space for uncompressing
168 AllocateBuffers();
169
170 //read the file's catalog
171 ReadCatalog();
172
173 //check that the heap agrees with the header
174 //CheckIfFileIsConsistent();
175 }
176
177 // Copy decompressed data to location requested by user
178 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
179 {
180 if (!fTable.is_compressed)
181 {
182 fits::MoveColumnDataToUserSpace(dest, src, c);
183 return;
184 }
185
186 memcpy(dest, src, c.num*c.size);
187 }
188
189 vector<char> fBuffer; ///<store the uncompressed rows
190 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
191 vector<char> fCompressedBuffer; ///<compressed rows
192 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
193
194 size_t fNumTiles; ///< Total number of tiles
195 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
196 int64_t fCurrentRow; ///< current row in memory signed because we need -1
197
198 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
199 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
200
201 vector<vector<pair<int64_t, int64_t>>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
202 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
203 vector<vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
204
205 Checksum fRawsum; ///< Checksum of the uncompressed, raw data
206
207 // Get buffer space
208 void AllocateBuffers()
209 {
210 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
211
212 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
213 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
214 fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for the block headers. 256 chars at most encode the compression sequence.
215 sizeof(TileHeader), //a bit more for the tile headers
216 8); //and a bit more for checksumming
217 }
218
219 // Read the catalog data, i.e. the addresses of the compressed data inside the heap
220 void ReadCatalog()
221 {
222 vector<char> readBuf(16);
223 fCatalog.resize(fNumTiles);
224
225 const streampos catalogStart = tellg();
226
227 fChkData.reset();
228
229 //do the actual reading
230 for (uint32_t i=0;i<fNumTiles;i++)
231 for (uint32_t j=0;j<fTable.num_cols;j++)
232 {
233 read(readBuf.data(), 2*sizeof(int64_t));
234 fChkData.add(readBuf);
235 //swap the bytes
236 int64_t tempValues[2] = {0,0};
237 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
238 if (tempValues[0] < 0 || tempValues[1] < 0)
239 {
240 clear(rdstate()|ios::badbit);
241#ifdef __EXCEPTIONS
242 throw runtime_error("Negative value in the catalog");
243#else
244 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
245 return;
246#endif
247 }
248 //add catalog entry
249 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
250 }
251
252 //compute the total size of each compressed tile
253 fTileSize.resize(fNumTiles);
254 fTileOffsets.resize(fNumTiles);
255 for (uint32_t i=0;i<fNumTiles;i++)
256 {
257 fTileSize[i] = 0;
258 for (uint32_t j=0;j<fTable.num_cols;j++)
259 {
260 fTileSize[i] += fCatalog[i][j].first;
261 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
262 }
263 }
264 //see if there is a gap before heap data
265 fHeapOff = tellg()+fTable.GetHeapShift();
266 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
267
268 if (!fCopy.is_open())
269 return;
270
271 //write catalog and heap gap to target file
272 seekg(catalogStart);
273
274 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
275
276 vector<char> buf(catSize);
277 read(buf.data(), catSize);
278
279 fCopy.write(buf.data(), catSize);
280 if (!fCopy)
281 clear(rdstate()|ios::badbit);
282 }
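    // Example of the in-memory structures built by ReadCatalog() for a file
    // with two tiles and two columns (all numbers are made up):
    //
    //   fCatalog[0]  = { (1200,   16), (800, 1216) }  // (size, heap offset) per column
    //   fCatalog[1]  = { (1100, 2032), (750, 3132) }
    //   fTileSize    = { 2000, 1850 }                  // sum of the column sizes of a tile
    //   fTileOffsets = { {0, 1200}, {0, 1100} }        // offsets relative to the tile's first column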
283
284 //overrides the fits.h method with an (almost) empty one
285 //the actual copying is done in ReadBinaryRow because it needs transient data only available there
286 virtual void WriteRowToCopyFile(size_t row)
287 {
288 if (row == fRow+1)
289 fRawsum.add(fBufferRow, false);
290 }
291
292 // Compressed version of the read row
293 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
294 {
295 if (rowNum >= GetNumRows())
296 return false;
297
298 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
299 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
300
301 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
302
303 fCurrentRow = rowNum;
304 //should we read yet another chunk of data?
305 if (requestedTile != currentTile)
306 {
307 //read yet another chunk from the file
308 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);
309
310 //skip to the beginning of the tile
311 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);
312
313 seekg(fHeapOff+tileStart);
314
315 //calculate the 32-bit offset of the current tile.
316 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
317
318 //point the tile header to where it should be
319 //the header is not checked for now
320// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
321
322 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
323
324 //read one tile from disk
325 read(fCompressedBuffer.data()+offset, sizeToRead);
326
327 if (addCheckSum)
328 fChkData.add(fCompressedBuffer);
329
330 if (requestedTile == currentTile+1 &&
331 fCopy.is_open() &&
332 fCopy.good())
333 {
334 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
335 if (!fCopy)
336 clear(rdstate()|ios::badbit);
337 }
338 else
339 if (fCopy.is_open())
340 clear(rdstate()|ios::badbit);
341
342 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
343
344 //uncompress it
345 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));
346
347 // pointer to column (source buffer)
348 const char *src = fTransposedBuffer.data();
349
350 uint32_t i=0;
351 for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
352 {
353 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
354
355 switch (fColumnOrdering[i])
356 {
357 case kOrderByRow:
358 // regular, "semi-transposed" copy
359 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
360 {
361 memcpy(dest, src, it->bytes);
362 src += it->bytes; // next column
363 }
364 break;
365
366 case kOrderByCol:
367 // transposed copy
368 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
369 {
370 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
371 {
372 memcpy(dest, src, it->size);
373 src += it->size; // next element
374 }
375 }
376 break;
377 default:
378 clear(rdstate()|ios::badbit);
379 #ifdef __EXCEPTIONS
380 throw runtime_error("Unknown column ordering scheme found");
381 #else
382 gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
383 return false;
384 #endif
385 break;
386 };
387 }
388 }
389
390 //Data loaded and uncompressed. Copy it to destination
391 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
392 return good();
393 }
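    // Small worked example of the two copy schemes used above, for a column
    // with 2 elements per row and a tile of 3 rows (r = row, e = element):
    //
    //   kOrderByRow: the source keeps the elements of one row together
    //     src: r0e0 r0e1 | r1e0 r1e1 | r2e0 r2e1
    //   kOrderByCol: the source is transposed, all rows of one element first
    //     src: r0e0 r1e0 r2e0 | r0e1 r1e1 r2e1
    //
    // Either way the destination fBuffer ends up row-ordered, i.e. laid out
    // like a regular uncompressed FITS binary table, row after row.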
394
395 // Read a bunch of uncompressed data
396 uint32_t UncompressUNCOMPRESSED(char* dest,
397 const char* src,
398 uint32_t numElems,
399 uint32_t sizeOfElems)
400 {
401 memcpy(dest, src, numElems*sizeOfElems);
402 return numElems*sizeOfElems;
403 }
404
405 // Read a bunch of data compressed with the Huffman algorithm
406 uint32_t UncompressHUFFMAN16(char* dest,
407 const char* src,
408 uint32_t numChunks)
409 {
410 vector<uint16_t> uncompressed;
411
412 //read compressed sizes (one per row)
413 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
414 src += sizeof(uint32_t)*numChunks;
415
416 //uncompress the rows, one by one
417 uint32_t sizeWritten = 0;
418 for (uint32_t j=0;j<numChunks;j++)
419 {
420 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
421
422 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
423
424 sizeWritten += uncompressed.size()*sizeof(uint16_t);
425 dest += uncompressed.size()*sizeof(uint16_t);
426 src += compressedSizes[j];
427 }
428 return sizeWritten;
429 }
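    // Expected layout of a kFactHuffman16 block handled above, e.g. for
    // numChunks==3 (one chunk per row):
    //
    //   [size0][size1][size2][chunk0 bytes ...][chunk1 bytes ...][chunk2 bytes ...]
    //   uint32 uint32 uint32
    //
    // Each chunk decodes into a whole row of 16-bit values.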
430
431 // Apply the inverse transform of the integer smoothing
432 uint32_t UnApplySMOOTHING(int16_t* data,
433 uint32_t numElems)
434 {
435 //un-do the integer smoothing
436 for (uint32_t j=2;j<numElems;j++)
437 data[j] = data[j] + (data[j-1]+data[j-2])/2;
438
439 return numElems*sizeof(uint16_t);
440 }
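    // Worked example for the loop above. Assuming the writer stored
    //   data[j] -= (data[j-1] + data[j-2])/2
    // computed from the original samples (an assumption, the writer is not
    // part of this file), the original sequence {10, 12, 14, 5} is stored as
    // {10, 12, 3, -8} and restored front to back as
    //   data[2] =  3 + (12+10)/2 = 14
    //   data[3] = -8 + (14+12)/2 =  5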
441
442 // Data has been read from disk. Uncompress it!
443 void UncompressBuffer(const uint32_t &catalogCurrentRow,
444 const uint32_t &thisRoundNumRows,
445 const uint32_t offset)
446 {
447 char *dest = fTransposedBuffer.data();
448
449 //uncompress column by column
450 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
451 {
452 const fits::Table::Column &col = fTable.sorted_cols[i];
453 if (col.num == 0)
454 continue;
455
456 //get the compression flag
457 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
458
459 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
460
461 fColumnOrdering[i] = head->ordering;
462
463 const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
464 const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;
465
466 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
467
468 for (int32_t j=head->numProcs-1;j >= 0; j--)
469 {
470 uint32_t sizeWritten=0;
471
472 switch (head->processings[j])
473 {
474 case kFactRaw:
475 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
476 break;
477
478 case kFactSmoothing:
479 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
480 break;
481
482 case kFactHuffman16:
483 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
484 break;
485
486 default:
487 clear(rdstate()|ios::badbit);
488#ifdef __EXCEPTIONS
489 throw runtime_error("Unknown processing applied to data. Aborting");
490#else
491 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
492 return;
493#endif
494 }
495 //increment the destination pointer only once all processings are done.
496 if (j==0)
497 dest+= sizeWritten;
498 }
499 }
500 }
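    // Example of how the loop above unwinds a processing chain: for a column
    // with processings[] = { kFactSmoothing, kFactHuffman16 }
    //   j==1 : UncompressHUFFMAN16 decodes src into dest
    //   j==0 : UnApplySMOOTHING undoes the smoothing in place on dest
    // and only after the last step (j==0) is dest advanced by sizeWritten.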
501
502 void CheckIfFileIsConsistent()
503 {
504 //remember where we are and go to the start of the heap
505 streamoff whereAreWe = tellg();
506 seekg(fHeapOff);
507
508 //init number of rows to zero
509 uint64_t numRows = 0;
510
511 //get number of columns from header
512 size_t numCols = fTable.num_cols;
513
514 vector<vector<pair<int64_t, int64_t> > > catalog;
515
516 TileHeader tileHead;
517 BlockHeader columnHead;
518 streamoff offsetInHeap = 0;
519 //skip through the heap
520 while (true)
521 {
522 read((char*)(&tileHead), sizeof(TileHeader));
523 //end of file
524 if (!good())
525 break;
526 //padding or corrupt data
527 if (memcmp(tileHead.id, "TILE", 4))
528 {
529 clear(rdstate()|ios::badbit);
530 break;
531 }
532
533 //a new tile begins here
534 catalog.push_back(vector<pair<int64_t, int64_t> >(0));
535 offsetInHeap += sizeof(TileHeader);
536
537 //skip through the columns
538 for (size_t i=0;i<numCols;i++)
539 {
540 //zero-sized columns do not have headers. Skip them
541 if (fTable.sorted_cols[i].num == 0)
542 {
543 catalog.back().push_back(make_pair(0,0));
544 continue;
545 }
546 //read column header
547 read((char*)(&columnHead), sizeof(BlockHeader));
548 //corrupted tile
549 if (!good())
550 break;
551 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
552 offsetInHeap += columnHead.size;
553 seekg(fHeapOff+offsetInHeap);
554 }
555
556 //if the stream is not good, something went wrong inside the current tile.
557 if (!good())
558 {
559 catalog.pop_back();
560 break;
561 }
562
563 //current tile is complete. Add rows
564 numRows += tileHead.numRows;
565 }
566
567 if (catalog.size() != fCatalog.size() ||
568 numRows != fTable.num_rows)
569 {
570 clear(rdstate()|ios::badbit);
571#ifdef __EXCEPTIONS
572 throw runtime_error("Heap data does not agree with header.");
573#else
574 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
575 return;
576#endif
577 }
578
579 for (uint32_t i=0;i<catalog.size(); i++)
580 for (uint32_t j=0;j<numCols;j++)
581 {
582 if (catalog[i][j].first != fCatalog[i][j].first ||
583 catalog[i][j].second != fCatalog[i][j].second)
584 {
585 clear(rdstate()|ios::badbit);
586#ifdef __EXCEPTIONS
587 throw runtime_error("Heap data does not agree with header.");
588#else
589 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
590 return;
591#endif
592 }
593 }
594 //go back to where we were before the consistency check
595 seekg(whereAreWe);
596 }
597
598};//class zfits
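// Minimal usage sketch. It assumes the GetN()/SetPtrAddress()/GetNextRow()
// interface inherited from fits.h and a column called "Data"; both are
// assumptions made for illustration, not guarantees of this header:
//
//   zfits file("20130516_042.fits.fz", "Events");
//   vector<int16_t> data(file.GetN("Data"));
//   file.SetPtrAddress("Data", data.data());
//   while (file.GetNextRow())
//   {
//       // one decompressed row is now available in 'data'
//   }
//   if (!file.IsFileOk())
//       cerr << "RAWSUM checksum mismatch" << endl;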
599
600#ifndef __MARS__
601}; //namespace std
602#endif
603
604#endif