source: trunk/Mars/mcore/zfits.h@ 16849

Last change on this file since 16849 was 16844, checked in by lyard, 11 years ago
fixed checksum calculation while reading... hopefully
File size: 18.0 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16
17#define FACT_RAW 0x0
18#define FACT_SMOOTHING 0x1
19#define FACT_HUFFMAN16 0x2
20
21#define FACT_COL_MAJOR 'C'
22#define FACT_ROW_MAJOR 'R'
23
24
25#ifndef __MARS__
26namespace std
27{
28#endif
29
30class zfits : public fits
31{
32public:
33
    /// Basic constructor: opens `fname` (optionally a given table extension)
    /// via the fits base class and sets up the decompression machinery.
    ///
    /// @param fname     path of the (possibly FACT-compressed) FITS file
    /// @param tableName name of the table extension to open ("" = default)
    /// @param force     forwarded to the fits base class
    ///
    /// Note: fCurrentRow is a size_t initialized to -1 (i.e. wraps to the
    /// maximum value) so that the first requested tile never matches the
    /// "current" tile and is therefore always loaded (see ReadBinaryRow).
    zfits(const string& fname, const string& tableName="",
          bool force=false) : fits(fname, tableName, force),
        fBuffer(0),
        fTransposedBuffer(0),
        fCompressedBuffer(0),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        InitCompressionReading();
    }
48
    /// Alternative constructor: same as the basic one, but the data being
    /// read is also copied to the output file `fout` (handled through the
    /// fCopy stream of the fits base class — see ReadCatalog and
    /// ReadBinaryRow for the copying logic).
    ///
    /// @param fname     path of the input FITS file
    /// @param fout      path of the copy target file
    /// @param tableName name of the table extension to open
    /// @param force     forwarded to the fits base class
    zfits(const string& fname, const string& fout, const string& tableName,
          bool force=false) : fits(fname, fout, tableName, force),
        fBuffer(0),
        fTransposedBuffer(0),
        fCompressedBuffer(0),
        fNumTiles(0),
        fNumRowsPerTile(0),
        fCurrentRow(-1),
        fHeapOff(0),
        fTileSize(0)
    {
        InitCompressionReading();
    }
63
64 // Skip the next row
65 bool SkipNextRow()
66 {
67 if (!fTable.isCompressed)
68 return fits::SkipNextRow();
69
70 fRow++;
71 return true;
72 }
73protected:
74
75 // Stage the requested row to internal buffer
76 // Does NOT return data to users
77 virtual void StageRow(size_t row, char* dest)
78 {
79 if (!fTable.isCompressed)
80 {
81 fits::StageRow(row, dest);
82 return;
83 }
84
85 ReadBinaryRow(row, dest);
86 }
87
88private:
89
    // Structure helper for reading tile headers.
    // This mirrors the on-disk layout of the header preceding every
    // compressed tile in the heap — hence the packed attribute: no
    // padding may be inserted between the fields.
    typedef struct TileHeader
    {
        char id[4];         // magic marker, expected to be "TILE"
        uint32_t numRows;   // number of table rows stored in this tile
        uint64_t size;      // size of the tile's payload, in bytes

        // Default constructor leaves the fields uninitialized: used when
        // the header is about to be overwritten by a read() from the file.
        TileHeader() {}

        TileHeader(uint32_t nRows,
                   uint64_t s) : id({'T', 'I', 'L', 'E'}),
                                 numRows(nRows),
                                 size(s)
        { };
    } __attribute__((__packed__)) TileHeader;
105
    // Structure helper for reading block headers and compression schemes.
    // This mirrors the on-disk layout of the header preceding each
    // compressed column block inside a tile (packed: no padding allowed).
    typedef struct BlockHeader
    {
        uint64_t size;            // size in bytes of the whole block
        char ordering;            // FACT_ROW_MAJOR or FACT_COL_MAJOR
        unsigned char numProcs;   // number of processing steps applied
        uint16_t processings[];   // flexible array member: the processing
                                  // steps (FACT_RAW / FACT_SMOOTHING /
                                  // FACT_HUFFMAN16), applied in order

        BlockHeader(uint64_t s=0,
                    char o=FACT_ROW_MAJOR,
                    unsigned char n=1) : size(s),
                                         ordering(o),
                                         numProcs(n)
        {}
    } __attribute__((__packed__)) BlockHeader;
121
122 // Do what it takes to initialize the compressed structured
123 void InitCompressionReading()
124 {
125 if (!fTable.isCompressed)
126 return;
127
128 //The constructor may have failed
129 if (!good())
130 return;
131
132 if (fTable.isCompressed)
133 for (auto it=fTable.sortedCols.begin(); it!= fTable.sortedCols.end(); it++)
134 if (it->comp != FACT)
135 {
136#ifdef __EXCEPTIONS
137 throw runtime_error("ERROR: Only the FACT compression scheme is handled by this reader.");
138#else
139 gLog << ___err___ << "ERROR: Only the FACT compression scheme is handled by this reader." << endl;
140 return;
141#endif
142 }
143
144 fColumnOrdering.resize(fTable.sortedCols.size());
145 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
146 (*it) = FACT_ROW_MAJOR;
147 //Get compressed specific keywords
148 fNumTiles = fTable.isCompressed ? GetInt("NAXIS2") : 0;
149 fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;
150
151 //give it some space for uncompressing
152 AllocateBuffers();
153
154 //read the file's catalog
155 ReadCatalog();
156
157 //check that heap agrees with head
158 //CheckIfFileIsConsistent();
159 }
160
161 // Copy decompressed data to location requested by user
162 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
163 {
164 if (!fTable.isCompressed)
165 {
166 fits::MoveColumnDataToUserSpace(dest, src, c);
167 return;
168 }
169
170 memcpy(dest, src, c.num*c.size);
171 }
172
    vector<char> fBuffer;           ///< stores the uncompressed rows of the currently loaded tile
    vector<char> fTransposedBuffer; ///< intermediate buffer holding the decoded (still column-wise) tile data
    vector<char> fCompressedBuffer; ///< raw compressed tile data, as read from the file
    vector<char> fColumnOrdering;   ///< per-column row ordering (FACT_ROW_MAJOR/FACT_COL_MAJOR); can change from tile to tile

    size_t fNumTiles;               ///< Total number of tiles (NAXIS2 keyword)
    size_t fNumRowsPerTile;         ///< Number of rows per compressed tile (ZTILELEN keyword)
    size_t fCurrentRow;             ///< current row in memory; (size_t)-1 until the first read

    streamoff fHeapOff;             ///< offset from the beginning of the file of the binary (heap) data

    vector<vector<pair<int64_t, int64_t> > > fCatalog;///< Catalog, i.e. the main table that points to the compressed data: one (size, heap offset) pair per tile and column
    vector<size_t> fTileSize;       ///< size in bytes of each compressed tile
    vector<vector<size_t> > fTileOffsets; ///< offset from start of tile of a given compressed column
187
188 // Get buffer space
189 void AllocateBuffers()
190 {
191 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
192
193 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
194 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols*(sizeof(BlockHeader)+256)); //use a bit more memory for block headers
195 }
196
    // Read catalog data, i.e. the addresses of the compressed data inside
    // the heap: for every tile and column a (size, offset) pair, stored
    // big-endian on disk (hence the byte reversal via revcpy).
    // Also derives fTileSize / fTileOffsets and, when a copy file is open,
    // forwards the raw catalog bytes (plus the gap before the heap) to it.
    void ReadCatalog()
    {
        char readBuf[16];  // one catalog entry: two 64-bit integers
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        //do the actual reading
        for (uint32_t i=0;i<fNumTiles;i++)
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                read(readBuf, 2*sizeof(int64_t));

                //swap the bytes (file order -> host order)
                int64_t tempValues[2] = {0,0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf, 2);
                // sizes/offsets must be non-negative; anything else means
                // a corrupt catalog, so poison the stream state
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }
                //add catalog entry: (size, offset in heap)
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        //compute the total size of each compressed tile, and the offset
        //of each column's block relative to the start of its tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }
        //see if there is a gap before heap data
        fHeapOff = tellg()+fTable.GetHeapShift();

        // the rest only applies when copying the file being read
        if (!fCopy.is_open())
            return;

        //write catalog and heap gap to target file: rewind to the start of
        //the catalog and copy it verbatim
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }
258
    //overrides fits.h method with empty one
    //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
    //Here we only keep the checksum of the copied data up to date, and only
    //for rows that are read sequentially (row following the previous one).
    virtual void WriteRowToCopyFile(size_t row)
    {
        if (row == fRow+1)
            fChkData.add(fBufferRow);
    }
266
267 // Compressed version of the read row
268 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
269 {
270 if (rowNum >= GetNumRows())
271 return false;
272
273 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
274 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
275 const size_t previousRow = fCurrentRow;
276
277 fCurrentRow = rowNum;
278
279 //should we read yet another chunk of data ?
280 if (requestedTile != currentTile)
281 {
282 //read yet another chunk from the file
283 const int64_t sizeToRead = fTileSize[requestedTile];
284
285 //skip to the beginning of the tile
286 seekg(fHeapOff+fCatalog[requestedTile][0].second - sizeof(TileHeader));
287 TileHeader tHead;
288 read((char*)(&tHead), sizeof(TileHeader));
289 read(fCompressedBuffer.data(), sizeToRead);
290
291 if (requestedTile == currentTile+1 &&
292 fCopy.is_open() &&
293 fCopy.good())
294 {
295 fCopy.write((char*)(&tHead), sizeof(TileHeader));
296 fCopy.write(fCompressedBuffer.data(), sizeToRead);
297 if (!fCopy)
298 clear(rdstate()|ios::badbit);
299 }
300 else
301 if (fCopy.is_open())
302 clear(rdstate()|ios::badbit);
303
304 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
305
306 //uncompress it
307 UncompressBuffer(requestedTile, thisRoundNumRows);
308
309 // pointer to column (source buffer)
310 const char *src = fTransposedBuffer.data();
311
312 uint32_t i=0;
313 for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++, i++)
314 {
315 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
316
317 switch (fColumnOrdering[i])
318 {
319 case FACT_ROW_MAJOR:
320 // regular, "semi-transposed" copy
321 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
322 {
323 memcpy(dest, src, it->bytes);
324 src += it->bytes; // next column
325 }
326 break;
327
328 case FACT_COL_MAJOR:
329 // transposed copy
330 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
331 {
332 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
333 {
334 memcpy(dest, src, it->size);
335 src += it->size; // next element
336 }
337 }
338 break;
339 default:
340 #ifdef __EXCEPTIONS
341 throw runtime_error("Unkown column ordering scheme");
342 #else
343 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
344 return;
345 #endif
346 break;
347 };
348 }
349 }
350
351 //Data loaded and uncompressed. Copy it to destination
352 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
353 return good();
354 }
355
356 // Read a bunch of uncompressed data
357 uint32_t UncompressUNCOMPRESSED(char* dest,
358 const char* src,
359 uint32_t numElems,
360 uint32_t sizeOfElems)
361 {
362 memcpy(dest, src, numElems*sizeOfElems);
363 return numElems*sizeOfElems;
364 }
365
366 // Read a bunch of data compressed with the Huffman algorithm
367 uint32_t UncompressHUFFMAN16(char* dest,
368 const char* src,
369 uint32_t numChunks)
370 {
371 vector<uint16_t> uncompressed;
372
373 //read compressed sizes (one per row)
374 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
375 src += sizeof(uint32_t)*numChunks;
376
377 //uncompress the rows, one by one
378 uint32_t sizeWritten = 0;
379 for (uint32_t j=0;j<numChunks;j++)
380 {
381 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
382
383 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
384
385 sizeWritten += uncompressed.size()*sizeof(uint16_t);
386 dest += uncompressed.size()*sizeof(uint16_t);
387 src += compressedSizes[j];
388 }
389 return sizeWritten;
390 }
391
392 // Apply the inverse transform of the integer smoothing
393 uint32_t UnApplySMOOTHING(int16_t* data,
394 uint32_t numElems)
395 {
396 //un-do the integer smoothing
397 for (uint32_t j=2;j<numElems;j++)
398 data[j] = data[j] + (data[j-1]+data[j-2])/2;
399
400 return numElems*sizeof(uint16_t);
401 }
402
    // Data has been read from disk. Uncompress it !
    // Walks the columns of tile `catalogCurrentRow` inside
    // fCompressedBuffer and undoes each column's processing steps in
    // reverse order of application, writing the decoded values into
    // fTransposedBuffer (un-transposition happens later in ReadBinaryRow).
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows)
    {
        char *dest = fTransposedBuffer.data();

        //uncompress column by column
        for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sortedCols[i];
            // zero-sized columns carry no data (and no block header)
            if (col.num == 0)
                continue;

            //get the compression flag
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i];

            BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            // remember this column's ordering for the un-transposition
            fColumnOrdering[i] = head->ordering;

            // block dimensions depend on the ordering
            uint32_t numRows = (head->ordering==FACT_ROW_MAJOR) ? thisRoundNumRows : col.num;
            uint32_t numCols = (head->ordering==FACT_COL_MAJOR) ? thisRoundNumRows : col.num;

            // payload starts after the block header and its processing list
            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;

            // undo the processing steps, last-applied first
            for (int32_t j=head->numProcs-1;j >= 0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case FACT_RAW:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;
                case FACT_SMOOTHING:
                    // operates in place on dest — presumably a previous
                    // step already wrote the values there (TODO confirm)
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;
                case FACT_HUFFMAN16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;
                default:
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown processing applied to data. Aborting");
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
                    return;
#endif
                    break;
                }
                //increment destination counter only when processing done.
                if (j==0) dest+= sizeWritten;
            }
        }
    }
457
    // Walk the heap, rebuilding a catalog from the tile and block headers
    // found there, and verify that it agrees with the catalog and row
    // count read from the FITS header. Raises/logs on any mismatch.
    // NOTE: currently not called (commented out in InitCompressionReading).
    void CheckIfFileIsConsistent()
    {
        //remember where we are, then goto start of heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        //init number of rows to zero
        uint64_t numRows = 0;

        //get number of columns from header
        size_t numCols = fTable.num_cols;

        // catalog rebuilt from the heap itself, for comparison
        vector<vector<pair<int64_t, int64_t> > > catalog;

        TileHeader tileHead;
        BlockHeader columnHead;
        streamoff offsetInHeap = 0;
        //skip through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(TileHeader));
            //end of file
            if (!good())
                break;
            //padding or corrupt data
            if (tileHead.id[0] != 'T' || tileHead.id[1] != 'I' ||
                tileHead.id[2] != 'L' || tileHead.id[3] != 'E')
                break;

            //a new tile begins here
            catalog.push_back(vector<pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(TileHeader);

            //skip through the columns
            for (size_t i=0;i<numCols;i++)
            {
                //zero sized columns do not have headers. Skip them
                if (fTable.sortedCols[i].num == 0)
                {
                    catalog.back().push_back(make_pair(0,0));
                    continue;
                }
                //read column header
                read((char*)(&columnHead), sizeof(BlockHeader));
                //corrupted tile
                if (!good())
                    break;
                // record (size, offset) and jump over the block's payload
                catalog.back().emplace_back(make_pair((int64_t)(columnHead.size),offsetInHeap));
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            //if we ain't good, this means that something went wrong inside the current tile.
            if (!good())
            {
                catalog.pop_back();
                break;
            }

            //current tile is complete. Add rows
            numRows += tileHead.numRows;
        }

        // first check: number of tiles and total number of rows
        if (catalog.size() != fCatalog.size() ||
            numRows != fTable.num_rows)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header. Aborting");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        // second check: every (size, offset) entry must match the catalog
        for (uint32_t i=0;i<catalog.size(); i++)
            for (uint32_t j=0;j<numCols;j++)
            {
                if (catalog[i][j].first != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header. Aborting");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }
        //go back to where we were before the check
        seekg(whereAreWe);
    }
549
550};//class zfits
551
552#ifndef __MARS__
553}; //namespace std
554#endif
555
556#endif
Note: See TracBrowser for help on using the repository browser.