source: trunk/Mars/mcore/zfits.h@ 16856

Last change on this file since 16856 was 16856, checked in by tbretz, 11 years ago
Forgot this one...
File size: 17.9 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16
17#ifndef __MARS__
18namespace std
19{
20#endif
21
22class zfits : public fits
23{
24public:
25
26 // Basic constructor
27 zfits(const string& fname, const string& tableName="",
28 bool force=false) : fits(fname, tableName, force),
29 fNumTiles(0),
30 fNumRowsPerTile(0),
31 fCurrentRow(-1),
32 fHeapOff(0),
33 fTileSize(0)
34 {
35 InitCompressionReading();
36 }
37
38 // Alternative contstructor
39 zfits(const string& fname, const string& fout, const string& tableName,
40 bool force=false) : fits(fname, fout, tableName, force),
41 fNumTiles(0),
42 fNumRowsPerTile(0),
43 fCurrentRow(-1),
44 fHeapOff(0),
45 fTileSize(0)
46 {
47 InitCompressionReading();
48 }
49
50 // Skip the next row
51 bool SkipNextRow()
52 {
53 if (!fTable.isCompressed)
54 return fits::SkipNextRow();
55
56 fRow++;
57 return true;
58 }
59protected:
60
61 // Stage the requested row to internal buffer
62 // Does NOT return data to users
63 virtual void StageRow(size_t row, char* dest)
64 {
65 if (!fTable.isCompressed)
66 {
67 fits::StageRow(row, dest);
68 return;
69 }
70
71 ReadBinaryRow(row, dest);
72 }
73
74private:
75#ifndef __CINT__
76 //Structure helper for reading tiles headers
77 struct TileHeader
78 {
79 char id[4];
80 uint32_t numRows;
81 uint64_t size;
82
83 TileHeader() {}
84
85 TileHeader(uint32_t nRows,
86 uint64_t s) : id({'T', 'I', 'L', 'E'}),
87 numRows(nRows),
88 size(s)
89 { };
90 } __attribute__((__packed__));
91
92 //Structure helper for reading blocks headers and compresion schemes
93 struct BlockHeader
94 {
95 uint64_t size;
96 char ordering;
97 unsigned char numProcs;
98 uint16_t processings[];
99
100 BlockHeader(uint64_t s=0,
101 char o='R'/*FACT_ROW_MAJOR*/,
102 unsigned char n=1) : size(s),
103 ordering(o),
104 numProcs(n)
105 {}
106 } __attribute__((__packed__));
107#endif
108 // Do what it takes to initialize the compressed structured
109 void InitCompressionReading()
110 {
111 if (!fTable.isCompressed)
112 return;
113
114 //The constructor may have failed
115 if (!good())
116 return;
117
118 if (fTable.isCompressed)
119 for (auto it=fTable.sortedCols.begin(); it!= fTable.sortedCols.end(); it++)
120 if (it->comp != FACT)
121 {
122 clear(rdstate()|ios::badbit);
123#ifdef __EXCEPTIONS
124 throw runtime_error("Only the FACT compression scheme is handled by this reader.");
125#else
126 gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
127 return;
128#endif
129 }
130
131 fColumnOrdering.resize(fTable.sortedCols.size());
132 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
133 (*it) = 'R';//FACT_ROW_MAJOR;
134
135 //Get compressed specific keywords
136 fNumTiles = fTable.isCompressed ? GetInt("NAXIS2") : 0;
137 fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;
138
139 //give it some space for uncompressing
140 AllocateBuffers();
141
142 //read the file's catalog
143 ReadCatalog();
144
145 //check that heap agrees with head
146 //CheckIfFileIsConsistent();
147 }
148
149 // Copy decompressed data to location requested by user
150 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
151 {
152 if (!fTable.isCompressed)
153 {
154 fits::MoveColumnDataToUserSpace(dest, src, c);
155 return;
156 }
157
158 memcpy(dest, src, c.num*c.size);
159 }
160
161 vector<char> fBuffer; ///<store the uncompressed rows
162 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
163 vector<char> fCompressedBuffer; ///<compressed rows
164 vector<char> fColumnOrdering; ///< ordering of the column's rows. Can change from tile to tile.
165
166 size_t fNumTiles; ///< Total number of tiles
167 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
168 size_t fCurrentRow; ///< current row in memory.
169
170 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
171
172 vector<vector<pair<int64_t, int64_t> > > fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
173 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
174 vector<vector<size_t> > fTileOffsets; ///< offset from start of tile of a given compressed column
175
176 // Get buffer space
177 void AllocateBuffers()
178 {
179 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
180
181 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
182 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols*(sizeof(BlockHeader)+256)); //use a bit more memory for block headers
183 }
184
185 // Read catalog data. I.e. the address of the compressed data inside the heap
186 void ReadCatalog()
187 {
188 char readBuf[16];
189 fCatalog.resize(fNumTiles);
190
191 const streampos catalogStart = tellg();
192
193 //do the actual reading
194 for (uint32_t i=0;i<fNumTiles;i++)
195 for (uint32_t j=0;j<fTable.num_cols;j++)
196 {
197 read(readBuf, 2*sizeof(int64_t));
198
199 //swap the bytes
200 int64_t tempValues[2] = {0,0};
201 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf, 2);
202 if (tempValues[0] < 0 || tempValues[1] < 0)
203 {
204 clear(rdstate()|ios::badbit);
205#ifdef __EXCEPTIONS
206 throw runtime_error("Negative value in the catalog");
207#else
208 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
209 return;
210#endif
211 }
212 //add catalog entry
213 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
214 }
215
216 //compute the total size of each compressed tile
217 fTileSize.resize(fNumTiles);
218 fTileOffsets.resize(fNumTiles);
219 for (uint32_t i=0;i<fNumTiles;i++)
220 {
221 fTileSize[i] = 0;
222 for (uint32_t j=0;j<fTable.num_cols;j++)
223 {
224 fTileSize[i] += fCatalog[i][j].first;
225 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
226 }
227 }
228 //see if there is a gap before heap data
229 fHeapOff = tellg()+fTable.GetHeapShift();
230
231 if (!fCopy.is_open())
232 return;
233
234 //write catalog and heap gap to target file
235 seekg(catalogStart);
236
237 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
238
239 vector<char> buf(catSize);
240 read(buf.data(), catSize);
241
242 fCopy.write(buf.data(), catSize);
243 if (!fCopy)
244 clear(rdstate()|ios::badbit);
245 }
246
247 //overrides fits.h method with empty one
248 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
249 virtual void WriteRowToCopyFile(size_t row)
250 {
251 if (row == fRow+1)
252 fChkData.add(fBufferRow);
253 }
254
255 // Compressed version of the read row
256 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
257 {
258 if (rowNum >= GetNumRows())
259 return false;
260
261 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
262 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
263
264 fCurrentRow = rowNum;
265
266 //should we read yet another chunk of data ?
267 if (requestedTile != currentTile)
268 {
269 //read yet another chunk from the file
270 const int64_t sizeToRead = fTileSize[requestedTile];
271
272 //skip to the beginning of the tile
273 seekg(fHeapOff+fCatalog[requestedTile][0].second - sizeof(TileHeader));
274
275 TileHeader tHead;
276 read((char*)(&tHead), sizeof(TileHeader));
277 read(fCompressedBuffer.data(), sizeToRead);
278
279 if (requestedTile == currentTile+1 &&
280 fCopy.is_open() &&
281 fCopy.good())
282 {
283 fCopy.write((char*)(&tHead), sizeof(TileHeader));
284 fCopy.write(fCompressedBuffer.data(), sizeToRead);
285 if (!fCopy)
286 clear(rdstate()|ios::badbit);
287 }
288 else
289 if (fCopy.is_open())
290 clear(rdstate()|ios::badbit);
291
292 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
293
294 //uncompress it
295 UncompressBuffer(requestedTile, thisRoundNumRows);
296
297 // pointer to column (source buffer)
298 const char *src = fTransposedBuffer.data();
299
300 uint32_t i=0;
301 for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++, i++)
302 {
303 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
304
305 switch (fColumnOrdering[i])
306 {
307 case 'R': //FACT_ROW_MAJOR:
308 // regular, "semi-transposed" copy
309 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
310 {
311 memcpy(dest, src, it->bytes);
312 src += it->bytes; // next column
313 }
314 break;
315
316 case 'C'://FACT_COL_MAJOR:
317 // transposed copy
318 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
319 {
320 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
321 {
322 memcpy(dest, src, it->size);
323 src += it->size; // next element
324 }
325 }
326 break;
327 default:
328 clear(rdstate()|ios::badbit);
329 #ifdef __EXCEPTIONS
330 throw runtime_error("Unkown column ordering scheme");
331 #else
332 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
333 return false;
334 #endif
335 break;
336 };
337 }
338 }
339
340 //Data loaded and uncompressed. Copy it to destination
341 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
342 return good();
343 }
344
345 // Read a bunch of uncompressed data
346 uint32_t UncompressUNCOMPRESSED(char* dest,
347 const char* src,
348 uint32_t numElems,
349 uint32_t sizeOfElems)
350 {
351 memcpy(dest, src, numElems*sizeOfElems);
352 return numElems*sizeOfElems;
353 }
354
355 // Read a bunch of data compressed with the Huffman algorithm
356 uint32_t UncompressHUFFMAN16(char* dest,
357 const char* src,
358 uint32_t numChunks)
359 {
360 vector<uint16_t> uncompressed;
361
362 //read compressed sizes (one per row)
363 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
364 src += sizeof(uint32_t)*numChunks;
365
366 //uncompress the rows, one by one
367 uint32_t sizeWritten = 0;
368 for (uint32_t j=0;j<numChunks;j++)
369 {
370 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
371
372 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
373
374 sizeWritten += uncompressed.size()*sizeof(uint16_t);
375 dest += uncompressed.size()*sizeof(uint16_t);
376 src += compressedSizes[j];
377 }
378 return sizeWritten;
379 }
380
381 // Apply the inverse transform of the integer smoothing
382 uint32_t UnApplySMOOTHING(int16_t* data,
383 uint32_t numElems)
384 {
385 //un-do the integer smoothing
386 for (uint32_t j=2;j<numElems;j++)
387 data[j] = data[j] + (data[j-1]+data[j-2])/2;
388
389 return numElems*sizeof(uint16_t);
390 }
391
392 // Data has been read from disk. Uncompress it !
393 void UncompressBuffer(const uint32_t &catalogCurrentRow,
394 const uint32_t &thisRoundNumRows)
395 {
396 char *dest = fTransposedBuffer.data();
397
398 //uncompress column by column
399 for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
400 {
401 const fits::Table::Column &col = fTable.sortedCols[i];
402 if (col.num == 0)
403 continue;
404
405 //get the compression flag
406 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i];
407
408 const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
409
410 fColumnOrdering[i] = head->ordering;
411
412 const uint32_t numRows = (head->ordering=='R'/*FACT_ROW_MAJOR*/) ? thisRoundNumRows : col.num;
413 const uint32_t numCols = (head->ordering=='C'/*FACT_COL_MAJOR*/) ? thisRoundNumRows : col.num;
414
415 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
416
417 for (int32_t j=head->numProcs-1;j >= 0; j--)
418 {
419 uint32_t sizeWritten=0;
420
421 switch (head->processings[j])
422 {
423 case 0x0://FACT_RAW:
424 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
425 break;
426 case 0x1://FACT_SMOOTHING:
427 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
428 break;
429 case 0x2://FACT_HUFFMAN16:
430 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
431 break;
432 default:
433 clear(rdstate()|ios::badbit);
434#ifdef __EXCEPTIONS
435 throw runtime_error("Unknown processing applied to data. Aborting");
436#else
437 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
438 return;
439#endif
440 }
441 //increment destination counter only when processing done.
442 if (j==0) dest+= sizeWritten;
443 }
444 }
445 }
446
447 void CheckIfFileIsConsistent()
448 {
449 //goto start of heap
450 streamoff whereAreWe = tellg();
451 seekg(fHeapOff);
452
453 //init number of rows to zero
454 uint64_t numRows = 0;
455
456 //get number of columns from header
457 size_t numCols = fTable.num_cols;
458
459 vector<vector<pair<int64_t, int64_t> > > catalog;
460
461 TileHeader tileHead;
462 BlockHeader columnHead;
463 streamoff offsetInHeap = 0;
464 //skip through the heap
465 while (true)
466 {
467 read((char*)(&tileHead), sizeof(TileHeader));
468 //end of file
469 if (!good())
470 break;
471 //padding or corrupt data
472 if (memcmp(tileHead.id, "TILE", 4))
473 {
474 clear(rdstate()|ios::badbit);
475 break;
476 }
477
478 //a new tile begins here
479 catalog.emplace_back(0);//push_back(vector<pair<int64_t, int64_t> >(0));
480 offsetInHeap += sizeof(TileHeader);
481
482 //skip through the columns
483 for (size_t i=0;i<numCols;i++)
484 {
485 //zero sized column do not have headers. Skip it
486 if (fTable.sortedCols[i].num == 0)
487 {
488 catalog.back().emplace_back(0,0);
489 continue;
490 }
491 //read column header
492 read((char*)(&columnHead), sizeof(BlockHeader));
493 //corrupted tile
494 if (!good())
495 break;
496 catalog.back().emplace_back(int64_t(columnHead.size),offsetInHeap);
497 offsetInHeap += columnHead.size;
498 seekg(fHeapOff+offsetInHeap);
499 }
500
501 //if we ain't good, this means that something went wrong inside the current tile.
502 if (!good())
503 {
504 catalog.pop_back();
505 break;
506 }
507
508 //current tile is complete. Add rows
509 numRows += tileHead.numRows;
510 }
511
512 if (catalog.size() != fCatalog.size() ||
513 numRows != fTable.num_rows)
514 {
515 clear(rdstate()|ios::badbit);
516#ifdef __EXCEPTIONS
517 throw runtime_error("Heap data does not agree with header.");
518#else
519 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
520 return;
521#endif
522 }
523
524 for (uint32_t i=0;i<catalog.size(); i++)
525 for (uint32_t j=0;j<numCols;j++)
526 {
527 if (catalog[i][j].first != fCatalog[i][j].first ||
528 catalog[i][j].second != fCatalog[i][j].second)
529 {
530 clear(rdstate()|ios::badbit);
531#ifdef __EXCEPTIONS
532 throw runtime_error("Heap data does not agree with header.");
533#else
534 gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
535 return;
536#endif
537 }
538 }
539 //go back to start of heap
540 seekg(whereAreWe);
541 }
542
543};//class zfits
544
545#ifndef __MARS__
546}; //namespace std
547#endif
548
549#endif
Note: See TracBrowser for help on using the repository browser.