source: trunk/Mars/mcore/zfits.h@ 16867

Last change on this file since 16867 was 16867, checked in by lyard, 12 years ago
got rid of defines
File size: 18.2 KB
/*
 * zfits.h
 *
 *  Created on: May 16, 2013
 *      Author: lyard
 */

#ifndef MARS_ZFITS
#define MARS_ZFITS

#include <stdexcept>

#include "fits.h"
#include "huffman.h"

/*
#define FACT_RAW       0x0
#define FACT_SMOOTHING 0x1
#define FACT_HUFFMAN16 0x2

#define FACT_COL_MAJOR 'C'
#define FACT_ROW_MAJOR 'R'
*/


#ifndef __MARS__
namespace std
{
#endif

enum FACT_COMPRESSION_PROCESS
{
    FACT_RAW       = 0x0,
    FACT_SMOOTHING = 0x1,
    FACT_HUFFMAN16 = 0x2
};

enum FACT_ROW_ORDERING
{
    FACT_COL_MAJOR = 'C',
    FACT_ROW_MAJOR = 'R'
};

class zfits : public fits
{
public:

    // Basic constructor
    zfits(const string& fname, const string& tableName="",
          bool force=false) : fits(fname, tableName, force),
                              fNumTiles(0),
                              fNumRowsPerTile(0),
                              fCurrentRow(-1),
                              fHeapOff(0),
                              fTileSize(0)
    {
        InitCompressionReading();
    }

    // Alternative constructor
    zfits(const string& fname, const string& fout, const string& tableName,
          bool force=false) : fits(fname, fout, tableName, force),
                              fNumTiles(0),
                              fNumRowsPerTile(0),
                              fCurrentRow(-1),
                              fHeapOff(0),
                              fTileSize(0)
    {
        InitCompressionReading();
    }

    // Skip the next row
    bool SkipNextRow()
    {
        if (!fTable.isCompressed)
            return fits::SkipNextRow();

        fRow++;
        return true;
    }

protected:

    // Stage the requested row to internal buffer
    // Does NOT return data to users
    virtual void StageRow(size_t row, char* dest)
    {
        if (!fTable.isCompressed)
        {
            fits::StageRow(row, dest);
            return;
        }

        ReadBinaryRow(row, dest);
    }

private:
#ifndef __CINT__
    // Structure helper for reading tile headers
    struct TileHeader
    {
        char     id[4];
        uint32_t numRows;
        uint64_t size;

        TileHeader() {}

        TileHeader(uint32_t nRows,
                   uint64_t s) : id({'T', 'I', 'L', 'E'}),
                                 numRows(nRows),
                                 size(s)
        { };
    } __attribute__((__packed__));

    // Structure helper for reading block headers and compression schemes
    struct BlockHeader
    {
        uint64_t      size;
        char          ordering;
        unsigned char numProcs;
        uint16_t      processings[];

        BlockHeader(uint64_t s=0,
                    char o=FACT_ROW_MAJOR,
                    unsigned char n=1) : size(s),
                                         ordering(o),
                                         numProcs(n)
        {}
    } __attribute__((__packed__));
#endif
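
    // The readers below imply the following on-disk layout (a sketch derived
    // from ReadCatalog(), ReadBinaryRow() and CheckIfFileIsConsistent(), not a
    // normative description): the heap is a sequence of tiles, each one being
    // a TileHeader followed by one block per non-empty column; a block is a
    // BlockHeader, then numProcs uint16_t processing codes, then the
    // compressed payload. The catalog stored in the table body holds, for
    // every tile and column, the pair (block size, block offset inside the heap).
    //
    //   tile : | TileHeader | block of col 0 | block of col 1 | ...
    //   block: | BlockHeader | processings[numProcs] | compressed bytes |
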
    // Do what it takes to initialize the compression-related structures
    void InitCompressionReading()
    {
        if (!fTable.isCompressed)
            return;

        //The constructor may have failed
        if (!good())
            return;

        if (fTable.isCompressed)
            for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++)
                if (it->comp != FACT)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Only the FACT compression scheme is handled by this reader.");
#else
                    gLog << ___err___ << "ERROR - Only the FACT compression scheme is handled by this reader." << endl;
                    return;
#endif
                }

        fColumnOrdering.resize(fTable.sortedCols.size());
        for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
            (*it) = FACT_ROW_MAJOR;

        //Get the compression-specific keywords
        fNumTiles       = fTable.isCompressed ? GetInt("NAXIS2")   : 0;
        fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;

        //give it some space for uncompressing
        AllocateBuffers();

        //read the file's catalog
        ReadCatalog();

        //check that the heap agrees with the header
        //CheckIfFileIsConsistent();
    }

    // Copy decompressed data to location requested by user
    void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
    {
        if (!fTable.isCompressed)
        {
            fits::MoveColumnDataToUserSpace(dest, src, c);
            return;
        }

        memcpy(dest, src, c.num*c.size);
    }

    vector<char> fBuffer;           ///< store the uncompressed rows
    vector<char> fTransposedBuffer; ///< intermediate buffer to transpose the rows
    vector<char> fCompressedBuffer; ///< compressed rows
    vector<char> fColumnOrdering;   ///< ordering of the column's rows. Can change from tile to tile.

    size_t fNumTiles;       ///< Total number of tiles
    size_t fNumRowsPerTile; ///< Number of rows per compressed tile
    size_t fCurrentRow;     ///< current row in memory.

    streamoff fHeapOff;     ///< offset from the beginning of the file of the binary data

    vector<vector<pair<int64_t, int64_t> > > fCatalog;     ///< Catalog, i.e. the main table that points to the compressed data.
    vector<size_t>                            fTileSize;   ///< size in bytes of each compressed tile
    vector<vector<size_t> >                   fTileOffsets;///< offset from start of tile of a given compressed column

    // Get buffer space
    void AllocateBuffers()
    {
        fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);

        fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
        fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols*(sizeof(BlockHeader)+256)); //use a bit more memory for block headers
    }

    // Read catalog data. I.e. the address of the compressed data inside the heap
    void ReadCatalog()
    {
        char readBuf[16];
        fCatalog.resize(fNumTiles);

        const streampos catalogStart = tellg();

        //do the actual reading
        for (uint32_t i=0;i<fNumTiles;i++)
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                read(readBuf, 2*sizeof(int64_t));

                //swap the bytes
                int64_t tempValues[2] = {0,0};
                revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf, 2);
                if (tempValues[0] < 0 || tempValues[1] < 0)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Negative value in the catalog");
#else
                    gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
                    return;
#endif
                }

                //add catalog entry
                fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
            }

        //compute the total size of each compressed tile
        fTileSize.resize(fNumTiles);
        fTileOffsets.resize(fNumTiles);
        for (uint32_t i=0;i<fNumTiles;i++)
        {
            fTileSize[i] = 0;
            for (uint32_t j=0;j<fTable.num_cols;j++)
            {
                fTileSize[i] += fCatalog[i][j].first;
                fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
            }
        }

        //see if there is a gap before heap data
        fHeapOff = tellg()+fTable.GetHeapShift();

        if (!fCopy.is_open())
            return;

        //write catalog and heap gap to target file
        seekg(catalogStart);

        const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;

        vector<char> buf(catSize);
        read(buf.data(), catSize);

        fCopy.write(buf.data(), catSize);
        if (!fCopy)
            clear(rdstate()|ios::badbit);
    }
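
    // For illustration only (the numbers are invented): with two columns,
    // fCatalog[i] for the first tile might be {(1200, 16), (400, 1216)},
    // i.e. the block of column 0 occupies 1200 bytes (its BlockHeader included)
    // starting 16 bytes into the heap, right after the packed TileHeader, and
    // the block of column 1 occupies the following 400 bytes. ReadCatalog()
    // then yields fTileSize[i] == 1600 and fTileOffsets[i] == {0, 1200}.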

    // Overrides the fits.h method with an almost empty one:
    // the real work is done in ReadBinaryRow, because it requires data that is only available there
    virtual void WriteRowToCopyFile(size_t row)
    {
        if (row == fRow+1)
            fChkData.add(fBufferRow);
    }

    // Compressed version of the read row
    bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
    {
        if (rowNum >= GetNumRows())
            return false;

        const uint32_t requestedTile = rowNum/fNumRowsPerTile;
        const uint32_t currentTile   = fCurrentRow/fNumRowsPerTile;

        fCurrentRow = rowNum;

        //should we read yet another chunk of data ?
        if (requestedTile != currentTile)
        {
            //read yet another chunk from the file
            const int64_t sizeToRead = fTileSize[requestedTile];

            //skip to the beginning of the tile
            seekg(fHeapOff+fCatalog[requestedTile][0].second - sizeof(TileHeader));
            TileHeader tHead;
            read((char*)(&tHead), sizeof(TileHeader));
            read(fCompressedBuffer.data(), sizeToRead);

            if (requestedTile == currentTile+1 &&
                fCopy.is_open()                &&
                fCopy.good())
            {
                fCopy.write((char*)(&tHead), sizeof(TileHeader));
                fCopy.write(fCompressedBuffer.data(), sizeToRead);
                if (!fCopy)
                    clear(rdstate()|ios::badbit);
            }
            else
                if (fCopy.is_open())
                    clear(rdstate()|ios::badbit);

            const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;

            //uncompress it
            UncompressBuffer(requestedTile, thisRoundNumRows);

            // pointer to column (source buffer)
            const char *src = fTransposedBuffer.data();

            uint32_t i=0;
            for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++, i++)
            {
                char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)

                switch (fColumnOrdering[i])
                {
                case FACT_ROW_MAJOR:
                    // regular, "semi-transposed" copy
                    for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                    {
                        memcpy(dest, src, it->bytes);
                        src += it->bytes; // next column
                    }
                    break;

                case FACT_COL_MAJOR:
                    // transposed copy
                    for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
                    {
                        for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
                        {
                            memcpy(dest, src, it->size);
                            src += it->size; // next element
                        }
                    }
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown column ordering scheme found");
#else
                    gLog << ___err___ << "ERROR - unknown column ordering scheme" << endl;
                    return false;
#endif
                    break;
                };
            }
        }

        //Data loaded and uncompressed. Copy it to destination
        memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
        return good();
    }
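
    // Shape of the two copies above (illustrative, for one column holding a
    // 2-element array over 2 rows): the per-tile fColumnOrdering[] flag says
    // whether the source buffer stores that column row-major,
    //   r0e0 r0e1 | r1e0 r1e1   (copied row by row into fBuffer),
    // or column-major,
    //   r0e0 r1e0 | r0e1 r1e1   (transposed element by element back into row order).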

    // Read a bunch of uncompressed data
    uint32_t UncompressUNCOMPRESSED(char*       dest,
                                    const char* src,
                                    uint32_t    numElems,
                                    uint32_t    sizeOfElems)
    {
        memcpy(dest, src, numElems*sizeOfElems);
        return numElems*sizeOfElems;
    }

    // Read a bunch of data compressed with the Huffman algorithm
    uint32_t UncompressHUFFMAN16(char*       dest,
                                 const char* src,
                                 uint32_t    numChunks)
    {
        vector<uint16_t> uncompressed;

        //read compressed sizes (one per row)
        const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
        src += sizeof(uint32_t)*numChunks;

        //uncompress the rows, one by one
        uint32_t sizeWritten = 0;
        for (uint32_t j=0;j<numChunks;j++)
        {
            Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);

            memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));

            sizeWritten += uncompressed.size()*sizeof(uint16_t);
            dest        += uncompressed.size()*sizeof(uint16_t);
            src         += compressedSizes[j];
        }
        return sizeWritten;
    }
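
    // Layout of the Huffman payload consumed above: numChunks uint32_t
    // compressed sizes, followed by the concatenated Huffman streams, one per
    // chunk (row); every chunk decodes back to 16-bit samples.
    //
    //   | size[0] ... size[numChunks-1] | stream 0 | stream 1 | ...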

    // Apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(int16_t* data,
                              uint32_t numElems)
    {
        //un-do the integer smoothing
        for (uint32_t j=2;j<numElems;j++)
            data[j] = data[j] + (data[j-1]+data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }
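
    // Worked example of the inverse transform above (values invented): the
    // first two samples are stored verbatim, so starting from {10, 12, -11, 3}
    // the loop reconstructs data[2] = -11 + (12+10)/2 = 0 and then
    // data[3] = 3 + (0+12)/2 = 9, giving {10, 12, 0, 9}. The writer is
    // presumed to have stored the difference to this two-sample predictor.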

    // Data has been read from disk. Uncompress it !
    void UncompressBuffer(const uint32_t &catalogCurrentRow,
                          const uint32_t &thisRoundNumRows)
    {
        char *dest = fTransposedBuffer.data();

        //uncompress column by column
        for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
        {
            const fits::Table::Column &col = fTable.sortedCols[i];
            if (col.num == 0)
                continue;

            //get the compression flag
            const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i];

            const BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);

            fColumnOrdering[i] = head->ordering;

            const uint32_t numRows = (head->ordering==FACT_ROW_MAJOR) ? thisRoundNumRows : col.num;
            const uint32_t numCols = (head->ordering==FACT_COL_MAJOR) ? thisRoundNumRows : col.num;

            const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;

            for (int32_t j=head->numProcs-1;j >= 0; j--)
            {
                uint32_t sizeWritten=0;

                switch (head->processings[j])
                {
                case FACT_RAW:
                    sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
                    break;

                case FACT_SMOOTHING:
                    sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
                    break;

                case FACT_HUFFMAN16:
                    sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
                    break;

                default:
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Unknown processing applied to data. Aborting");
#else
                    gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
                    return;
#endif
                }
                //increment destination counter only when processing done.
                if (j==0) dest += sizeWritten;
            }
        }
    }
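
    // The reverse loop above implies that processings[] lists the steps in the
    // order they were applied at write time, and that they are undone here
    // back-to-front: e.g. a column compressed with FACT_SMOOTHING then
    // FACT_HUFFMAN16 is first Huffman-decoded into dest and then un-smoothed
    // in place. dest only advances after the last step (j==0), so every step
    // of one column operates on the same destination slot.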

    void CheckIfFileIsConsistent()
    {
        //go to the start of the heap
        streamoff whereAreWe = tellg();
        seekg(fHeapOff);

        //init the number of rows to zero
        uint64_t numRows = 0;

        //get the number of columns from the header
        size_t numCols = fTable.num_cols;

        vector<vector<pair<int64_t, int64_t> > > catalog;

        TileHeader  tileHead;
        BlockHeader columnHead;
        streamoff   offsetInHeap = 0;

        //skip through the heap
        while (true)
        {
            read((char*)(&tileHead), sizeof(TileHeader));
            //end of file
            if (!good())
                break;

            //padding or corrupt data
            if (memcmp(tileHead.id, "TILE", 4))
            {
                clear(rdstate()|ios::badbit);
                break;
            }

            //a new tile begins here
            catalog.push_back(vector<pair<int64_t, int64_t> >(0));
            offsetInHeap += sizeof(TileHeader);

            //skip through the columns
            for (size_t i=0;i<numCols;i++)
            {
                //zero-sized columns do not have headers. Skip them
                if (fTable.sortedCols[i].num == 0)
                {
                    catalog.back().push_back(make_pair(0,0));
                    continue;
                }

                //read the column header
                read((char*)(&columnHead), sizeof(BlockHeader));
                //corrupted tile
                if (!good())
                    break;

                catalog.back().emplace_back((int64_t)(columnHead.size), offsetInHeap);
                offsetInHeap += columnHead.size;
                seekg(fHeapOff+offsetInHeap);
            }

            //if the stream is not good, something went wrong inside the current tile
            if (!good())
            {
                catalog.pop_back();
                break;
            }

            //current tile is complete. Add its rows
            numRows += tileHead.numRows;
        }

        if (catalog.size() != fCatalog.size() ||
            numRows        != fTable.num_rows)
        {
            clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
            throw runtime_error("Heap data does not agree with header.");
#else
            gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
            return;
#endif
        }

        for (uint32_t i=0;i<catalog.size(); i++)
            for (uint32_t j=0;j<numCols;j++)
            {
                if (catalog[i][j].first  != fCatalog[i][j].first ||
                    catalog[i][j].second != fCatalog[i][j].second)
                {
                    clear(rdstate()|ios::badbit);
#ifdef __EXCEPTIONS
                    throw runtime_error("Heap data does not agree with header.");
#else
                    gLog << ___err___ << "ERROR - Heap data does not agree with header." << endl;
                    return;
#endif
                }
            }

        //go back to the start of the heap
        seekg(whereAreWe);
    }

}; //class zfits
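
// Minimal usage sketch (the file name, table name and column name below are
// invented, and SetPtrAddress()/GetNextRow()/GetN() are assumed to be
// provided by the fits base class declared in fits.h):
//
//     zfits file("20130516_042.fits.fz", "Events");
//     vector<int16_t> data(file.GetN("Data"));
//     file.SetPtrAddress("Data", data.data(), data.size());
//     while (file.GetNextRow())
//     {
//         // one decompressed row of "Data" is now available in 'data'
//     }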

#ifndef __MARS__
}; //namespace std
#endif

#endif