source: trunk/Mars/mcore/zfits.h@ 16896

Last change on this file since 16896 was 16890, checked in by tbretz, 11 years ago
Consistency of member names
File size: 19.2 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16#ifndef __MARS__
17namespace std
18{
19#endif
20
21class zfits : public fits
22{
23public:
24
25 enum CompressionProcess_t
26 {
27 kFactRaw = 0x0,
28 kFactSmoothing = 0x1,
29 kFactHuffman16 = 0x2
30 };
31
32 enum RowOrdering_t
33 {
34 kOrderByCol = 'C',
35 kOrderByRow = 'R'
36 };
37
38 // Basic constructor
39 zfits(const string& fname, const string& tableName="",
40 bool force=false) : fits(fname, tableName, force),
41 fNumTiles(0),
42 fNumRowsPerTile(0),
43 fCurrentRow(-1),
44 fHeapOff(0),
45 fTileSize(0)
46 {
47 InitCompressionReading();
48 }
49
50 // Alternative contstructor
51 zfits(const string& fname, const string& fout, const string& tableName,
52 bool force=false) : fits(fname, fout, tableName, force),
53 fNumTiles(0),
54 fNumRowsPerTile(0),
55 fCurrentRow(-1),
56 fHeapOff(0),
57 fTileSize(0)
58 {
59 InitCompressionReading();
60 }
61
62 // Skip the next row
63 bool SkipNextRow()
64 {
65 if (!fTable.is_compressed)
66 return fits::SkipNextRow();
67
68 fRow++;
69 return true;
70 }
71
72protected:
73
74 // Stage the requested row to internal buffer
75 // Does NOT return data to users
76 virtual void StageRow(size_t row, char* dest)
77 {
78 if (!fTable.is_compressed)
79 {
80 fits::StageRow(row, dest);
81 return;
82 }
83
84 ReadBinaryRow(row, dest);
85 }
86
87private:
88#ifndef __CINT__
89 //Structure helper for reading tiles headers
90 struct TileHeader
91 {
92 char id[4];
93 uint32_t numRows;
94 uint64_t size;
95
96 TileHeader() {}
97
98 TileHeader(uint32_t nRows,
99 uint64_t s) : id({'T', 'I', 'L', 'E'}),
100 numRows(nRows),
101 size(s)
102 { };
103 } __attribute__((__packed__));
104
105 //Structure helper for reading blocks headers and compresion schemes
106 struct BlockHeader
107 {
108 uint64_t size;
109 char ordering;
110 unsigned char numProcs;
111 uint16_t processings[];
112
113 BlockHeader(uint64_t s=0,
114 char o=kOrderByRow,
115 unsigned char n=1) : size(s),
116 ordering(o),
117 numProcs(n)
118 {}
119 } __attribute__((__packed__));
120#endif
121 // Do what it takes to initialize the compressed structured
122 void InitCompressionReading()
123 {
124 if (!fTable.is_compressed)
125 return;
126
127 //The constructor may have failed
128 if (!good())
129 return;
130
131 if (fTable.is_compressed)
132 {
133 for (auto it=fTable.sorted_cols.begin(); it!= fTable.sorted_cols.end(); it++)
134 {
135 if (it->comp == kCompFACT)
136 continue;
137
138 clear(rdstate()|ios::badbit);
139#ifdef __EXCEPTIONS
140 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
141#else
142 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
143 return;
144#endif
145 }
146 }
147
148 fColumnOrdering.resize(fTable.sorted_cols.size());
149 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
150 (*it) = kOrderByRow;
151 //Get compressed specific keywords
152 fNumTiles = fTable.is_compressed ? GetInt("NAXIS2") : 0;
153 fNumRowsPerTile = fTable.is_compressed ? GetInt("ZTILELEN") : 0;
154
155 //give it some space for uncompressing
156 AllocateBuffers();
157
158 //read the file's catalog
159 ReadCatalog();
160
161 //check that heap agrees with head
162 //CheckIfFileIsConsistent();
163 }
164
165 // Copy decompressed data to location requested by user
166 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
167 {
168 if (!fTable.is_compressed)
169 {
170 fits::MoveColumnDataToUserSpace(dest, src, c);
171 return;
172 }
173
174 memcpy(dest, src, c.num*c.size);
175 }
176
177 vector<char> fBuffer; ///<store the uncompressed rows
178 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
179 vector<char> fCompressedBuffer; ///<compressed rows
180 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
181
182 size_t fNumTiles; ///< Total number of tiles
183 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
184 int64_t fCurrentRow; ///< current row in memory signed because we need -1
185
186 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
187 streamoff fHeapFromDataStart; ///< offset from the beginning of the data table
188
189 vector<vector<pair<int64_t, int64_t>>> fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
190 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
191 vector<vector<size_t>> fTileOffsets; ///< offset from start of tile of a given compressed column
192
193 // Get buffer space
194 void AllocateBuffers()
195 {
196 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
197
198 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
199 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile +
200 fTable.num_cols*(sizeof(BlockHeader)+256) + //use a bit more memory for block headers. 256 char coding the compression sequence max.
201 sizeof(TileHeader), //a bit more for the tile headers
202 8); //and a bit more for checksuming
203 }
204
205 // Read catalog data. I.e. the address of the compressed data inside the heap
206 void ReadCatalog()
207 {
208 vector<char> readBuf(16);
209 fCatalog.resize(fNumTiles);
210
211 const streampos catalogStart = tellg();
212
213fChkData.reset();
214
215 //do the actual reading
216 for (uint32_t i=0;i<fNumTiles;i++)
217 for (uint32_t j=0;j<fTable.num_cols;j++)
218 {
219 read(readBuf.data(), 2*sizeof(int64_t));
220 fChkData.add(readBuf);
221 //swap the bytes
222 int64_t tempValues[2] = {0,0};
223 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf.data(), 2);
224 if (tempValues[0] < 0 || tempValues[1] < 0)
225 {
226 clear(rdstate()|ios::badbit);
227#ifdef __EXCEPTIONS
228 throw runtime_error("Negative value in the catalog");
229#else
230 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
231 return;
232#endif
233 }
234 //add catalog entry
235 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
236 }
237
238 //compute the total size of each compressed tile
239 fTileSize.resize(fNumTiles);
240 fTileOffsets.resize(fNumTiles);
241 for (uint32_t i=0;i<fNumTiles;i++)
242 {
243 fTileSize[i] = 0;
244 for (uint32_t j=0;j<fTable.num_cols;j++)
245 {
246 fTileSize[i] += fCatalog[i][j].first;
247 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
248 }
249 }
250 //see if there is a gap before heap data
251 fHeapOff = tellg()+fTable.GetHeapShift();
252 fHeapFromDataStart = fNumTiles*fTable.num_cols*2*sizeof(int64_t) + fTable.GetHeapShift();
253
254 if (!fCopy.is_open())
255 return;
256
257 //write catalog and heap gap to target file
258 seekg(catalogStart);
259
260 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
261
262 vector<char> buf(catSize);
263 read(buf.data(), catSize);
264
265 fCopy.write(buf.data(), catSize);
266 if (!fCopy)
267 clear(rdstate()|ios::badbit);
268 }
269
270 //overrides fits.h method with empty one
271 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
272 virtual void WriteRowToCopyFile(size_t )
273 {
274
275 }
276
277 // Compressed version of the read row
278 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
279 {
280 if (rowNum >= GetNumRows())
281 return false;
282
283 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
284 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
285
286 bool addCheckSum = ((requestedTile == currentTile+1) || (fCurrentRow == -1));
287
288 fCurrentRow = rowNum;
289 //should we read yet another chunk of data ?
290 if (requestedTile != currentTile)
291 {
292 //read yet another chunk from the file
293 const int64_t sizeToRead = fTileSize[requestedTile] + sizeof(TileHeader);
294
295 //skip to the beginning of the tile
296 const int64_t tileStart = fCatalog[requestedTile][0].second - sizeof(TileHeader);
297
298 seekg(fHeapOff+tileStart);
299
300 //calculate the 32 bits offset of the current tile.
301 const uint32_t offset = (tileStart + fHeapFromDataStart)%4;
302
303 //point the tile header where it should be
304 //we ain't checking the header now
305// TileHeader* tHead = reinterpret_cast<TileHeader*>(fCompressedBuffer.data()+offset);
306
307 ZeroBufferForChecksum(fCompressedBuffer, fCompressedBuffer.size()-(sizeToRead+offset+8));
308
309 //read one tile from disk
310 read(fCompressedBuffer.data()+offset, sizeToRead);
311
312 if (addCheckSum)
313 fChkData.add(fCompressedBuffer);
314
315 if (requestedTile == currentTile+1 &&
316 fCopy.is_open() &&
317 fCopy.good())
318 {
319 fCopy.write(fCompressedBuffer.data()+offset, sizeToRead);
320 if (!fCopy)
321 clear(rdstate()|ios::badbit);
322 }
323 else
324 if (fCopy.is_open())
325 clear(rdstate()|ios::badbit);
326
327 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
328
329 //uncompress it
330 UncompressBuffer(requestedTile, thisRoundNumRows, offset+sizeof(TileHeader));
331
332 // pointer to column (source buffer)
333 const char *src = fTransposedBuffer.data();
334
335 uint32_t i=0;
336 for (auto it=fTable.sorted_cols.begin(); it!=fTable.sorted_cols.end(); it++, i++)
337 {
338 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
339
340 switch (fColumnOrdering[i])
341 {
342 case kOrderByRow:
343 // regular, "semi-transposed" copy
344 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
345 {
346 memcpy(dest, src, it->bytes);
347 src += it->bytes; // next column
348 }
349 break;
350
351 case kOrderByCol:
352 // transposed copy
353 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
354 {
355 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
356 {
357 memcpy(dest, src, it->size);
358 src += it->size; // next element
359 }
360 }
361 break;
362 default:
363 clear(rdstate()|ios::badbit);
364 #ifdef __EXCEPTIONS
365 throw runtime_error("Unkown column ordering scheme found");
366 #else
367 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
368 return false;
369 #endif
370 break;
371 };
372 }
373 }
374
375 //Data loaded and uncompressed. Copy it to destination
376 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
377 return good();
378 }
379
380 // Read a bunch of uncompressed data
381 uint32_t UncompressUNCOMPRESSED(char* dest,
382 const char* src,
383 uint32_t numElems,
384 uint32_t sizeOfElems)
385 {
386 memcpy(dest, src, numElems*sizeOfElems);
387 return numElems*sizeOfElems;
388 }
389
390 // Read a bunch of data compressed with the Huffman algorithm
391 uint32_t UncompressHUFFMAN16(char* dest,
392 const char* src,
393 uint32_t numChunks)
394 {
395 vector<uint16_t> uncompressed;
396
397 //read compressed sizes (one per row)
398 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
399 src += sizeof(uint32_t)*numChunks;
400
401 //uncompress the rows, one by one
402 uint32_t sizeWritten = 0;
403 for (uint32_t j=0;j<numChunks;j++)
404 {
405 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
406
407 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
408
409 sizeWritten += uncompressed.size()*sizeof(uint16_t);
410 dest += uncompressed.size()*sizeof(uint16_t);
411 src += compressedSizes[j];
412 }
413 return sizeWritten;
414 }
415
416 // Apply the inverse transform of the integer smoothing
417 uint32_t UnApplySMOOTHING(int16_t* data,
418 uint32_t numElems)
419 {
420 //un-do the integer smoothing
421 for (uint32_t j=2;j<numElems;j++)
422 data[j] = data[j] + (data[j-1]+data[j-2])/2;
423
424 return numElems*sizeof(uint16_t);
425 }
426
427 // Data has been read from disk. Uncompress it !
428 void UncompressBuffer(const uint32_t &catalogCurrentRow,
429 const uint32_t &thisRoundNumRows,
430 const uint32_t offset)
431 {
432 char *dest = fTransposedBuffer.data();
433
434 //uncompress column by column
435 for (uint32_t i=0; i<fTable.sorted_cols.size(); i++)
436 {
437 const fits::Table::Column &col = fTable.sorted_cols[i];
438 if (col.num == 0)
439 continue;
440
441 //get the compression flag
442 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i]+offset;
443
444 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
445
446 fColumnOrdering[i] = head->ordering;
447
448 const uint32_t numRows = (head->ordering==kOrderByRow) ? thisRoundNumRows : col.num;
449 const uint32_t numCols = (head->ordering==kOrderByCol) ? thisRoundNumRows : col.num;
450
451 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
452
453 for (int32_t j=head->numProcs-1;j >= 0; j--)
454 {
455 uint32_t sizeWritten=0;
456
457 switch (head->processings[j])
458 {
459 case kFactRaw:
460 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
461 break;
462
463 case kFactSmoothing:
464 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
465 break;
466
467 case kFactHuffman16:
468 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
469 break;
470
471 default:
472 clear(rdstate()|ios::badbit);
473#ifdef __EXCEPTIONS
474 throw runtime_error("Unknown processing applied to data. Aborting");
475#else
476 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
477 return;
478#endif
479 }
480 //increment destination counter only when processing done.
481 if (j==0)
482 dest+= sizeWritten;
483 }
484 }
485 }
486
487 void CheckIfFileIsConsistent()
488 {
489 //goto start of heap
490 streamoff whereAreWe = tellg();
491 seekg(fHeapOff);
492
493 //init number of rows to zero
494 uint64_t numRows = 0;
495
496 //get number of columns from header
497 size_t numCols = fTable.num_cols;
498
499 vector<vector<pair<int64_t, int64_t> > > catalog;
500
501 TileHeader tileHead;
502 BlockHeader columnHead;
503 streamoff offsetInHeap = 0;
504 //skip through the heap
505 while (true)
506 {
507 read((char*)(&tileHead), sizeof(TileHeader));
508 //end of file
509 if (!good())
510 break;
511 //padding or corrupt data
512 if (memcmp(tileHead.id, "TILE", 4))
513 {
514 clear(rdstate()|ios::badbit);
515 break;
516 }
517
518 //a new tile begins here
519 catalog.push_back(vector<pair<int64_t, int64_t> >(0));
520 offsetInHeap += sizeof(TileHeader);
521
522 //skip through the columns
523 for (size_t i=0;i<numCols;i++)
524 {
525 //zero sized column do not have headers. Skip it
526 if (fTable.sorted_cols[i].num == 0)
527 {
528 catalog.back().push_back(make_pair(0,0));
529 continue;
530 }
531 //read column header
532 read((char*)(&columnHead), sizeof(BlockHeader));
533 //corrupted tile
534 if (!good())
535 break;
536 catalog.back().emplace_back((int64_t)(columnHead.size),offsetInHeap);
537 offsetInHeap += columnHead.size;
538 seekg(fHeapOff+offsetInHeap);
539 }
540
541 //if we ain't good, this means that something went wrong inside the current tile.
542 if (!good())
543 {
544 catalog.pop_back();
545 break;
546 }
547
548 //current tile is complete. Add rows
549 numRows += tileHead.numRows;
550 }
551
552 if (catalog.size() != fCatalog.size() ||
553 numRows != fTable.num_rows)
554 {
555 clear(rdstate()|ios::badbit);
556#ifdef __EXCEPTIONS
557 throw runtime_error("Heap data does not agree with header.");
558#else
559 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
560 return;
561#endif
562 }
563
564 for (uint32_t i=0;i<catalog.size(); i++)
565 for (uint32_t j=0;j<numCols;j++)
566 {
567 if (catalog[i][j].first != fCatalog[i][j].first ||
568 catalog[i][j].second != fCatalog[i][j].second)
569 {
570 clear(rdstate()|ios::badbit);
571#ifdef __EXCEPTIONS
572 throw runtime_error("Heap data does not agree with header.");
573#else
574 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
575 return;
576#endif
577 }
578 }
579 //go back to start of heap
580 seekg(whereAreWe);
581 }
582
583};//class zfits
584
585#ifndef __MARS__
586}; //namespace std
587#endif
588
589#endif
Note: See TracBrowser for help on using the repository browser.