source: trunk/Mars/mcore/zfits.h@ 16814

Last change on this file since 16814 was 16814, checked in by lyard, 11 years ago
added block headers and changed compression mechanism
File size: 14.9 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16
17#define FACT_RAW 0x0
18#define FACT_SMOOTHING 0x1
19#define FACT_HUFFMAN16 0x2
20
21#define FACT_COL_MAJOR 'C'
22#define FACT_ROW_MAJOR 'R'
23
24
25#ifndef __MARS__
26namespace std
27{
28#endif
29
30class zfits : public fits
31{
32public:
33
34 // Basic constructor
35 zfits(const string& fname, const string& tableName="",
36 bool force=false) : fits(fname, tableName, force),
37 fBuffer(0),
38 fTransposedBuffer(0),
39 fCompressedBuffer(0),
40 fNumTiles(0),
41 fNumRowsPerTile(0),
42 fCurrentRow(-1),
43 fHeapOff(0),
44 fTileSize(0)
45 {
46 InitCompressionReading();
47 }
48
49 // Alternative contstructor
50 zfits(const string& fname, const string& fout, const string& tableName,
51 bool force=false) : fits(fname, fout, tableName, force),
52 fBuffer(0),
53 fTransposedBuffer(0),
54 fCompressedBuffer(0),
55 fNumTiles(0),
56 fNumRowsPerTile(0),
57 fCurrentRow(-1),
58 fHeapOff(0),
59 fTileSize(0)
60 {
61 InitCompressionReading();
62 }
63
64 // Skip the next row
65 bool SkipNextRow()
66 {
67 if (!fTable.isCompressed)
68 return fits::SkipNextRow();
69
70 fRow++;
71 return true;
72 }
73protected:
74
75 // Stage the requested row to internal buffer
76 // Does NOT return data to users
77 virtual void StageRow(size_t row, char* dest)
78 {
79 if (!fTable.isCompressed)
80 {
81 fits::StageRow(row, dest);
82 return;
83 }
84
85 ReadBinaryRow(row, dest);
86 }
87
88private:
89
90 //Structure helper for reading tiles headers
91 typedef struct TileHeader
92 {
93 char id[4];
94 uint32_t numRows;
95 uint64_t size;
96
97 TileHeader() {}
98
99 TileHeader(uint32_t nRows,
100 uint64_t s) : id({'T', 'I', 'L', 'E'}),
101 numRows(nRows),
102 size(s)
103 { };
104 } __attribute__((__packed__)) TileHeader;
105
106 //Structure helper for reading blocks headers and compresion schemes
107 typedef struct BlockHeader
108 {
109 uint64_t size;
110 char ordering;
111 unsigned char numProcs;
112 uint16_t processings[];
113
114 BlockHeader(uint64_t s=0,
115 char o=FACT_ROW_MAJOR,
116 unsigned char n=1) : size(s),
117 ordering(o),
118 numProcs(n)
119 {}
120 } __attribute__((__packed__)) BlockHeader;
121
122 // Do what it takes to initialize the compressed structured
123 void InitCompressionReading()
124 {
125 //The constructor may have failed
126 if (!good())
127 return;
128
129 if (fTable.isCompressed)
130 for (auto it=fTable.sortedCols.begin(); it!= fTable.sortedCols.end(); it++)
131 if (it->comp != FACT)
132 {
133#ifdef __EXCEPTIONS
134 throw runtime_error("ERROR: Only the FACT compression scheme is handled by this reader.");
135#else
136 gLog << ___err___ << "ERROR: Only the FACT compression scheme is handled by this reader." << endl;
137 return;
138#endif
139 }
140
141 fColumnOrdering.resize(fTable.sortedCols.size());
142 for (auto it=fColumnOrdering.begin(); it != fColumnOrdering.end(); it++)
143 (*it) = FACT_ROW_MAJOR;
144 //Get compressed specific keywords
145 fNumTiles = fTable.isCompressed ? GetInt("NAXIS2") : 0;
146 fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;
147
148 //give it some space for uncompressing
149 AllocateBuffers();
150
151 //read the file's catalog
152 ReadCatalog();
153 }
154
155 // Copy decompressed data to location requested by user
156 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
157 {
158 if (!fTable.isCompressed)
159 {
160 fits::MoveColumnDataToUserSpace(dest, src, c);
161 return;
162 }
163
164 memcpy(dest, src, c.num*c.size);
165 }
166
167 vector<char> fBuffer; ///<store the uncompressed rows
168 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
169 vector<char> fCompressedBuffer; ///<compressed rows
170 vector<char> fColumnOrdering; ///< ordering of the column's rows
171
172 size_t fNumTiles; ///< Total number of tiles
173 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
174 size_t fCurrentRow; ///< current row in memory.
175
176 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
177
178 vector<vector<pair<int64_t, int64_t> > > fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
179 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
180 vector<vector<size_t> > fTileOffsets; ///< offset from start of tile of a given compressed column
181
182 void AllocateBuffers()
183 {
184 if (!fTable.isCompressed)
185 return;
186
187 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
188
189 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
190 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols*(sizeof(BlockHeader)+256)); //use a bit more memory for block headers
191 }
192
193 // Read catalog data. I.e. the address of the compressed data inside the heap
194 void ReadCatalog()
195 {
196 if (!fTable.isCompressed)
197 return;
198
199 char readBuf[16];
200 fCatalog.resize(fNumTiles);
201
202 const streampos catalogStart = tellg();
203
204 //do the actual reading
205 for (uint32_t i=0;i<fNumTiles;i++)
206 for (uint32_t j=0;j<fTable.num_cols;j++)
207 {
208 read(readBuf, 2*sizeof(int64_t));
209
210 //swap the bytes
211 int64_t tempValues[2] = {0,0};
212 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf, 2);
213 if (tempValues[0] < 0 || tempValues[1] < 0)
214 {
215 clear(rdstate()|ios::badbit);
216#ifdef __EXCEPTIONS
217 throw runtime_error("Negative value in the catalog");
218#else
219 gLog << ___err___ << "ERROR - negative value in the catalog" << endl;
220 return;
221#endif
222 }
223 //add catalog entry
224 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
225 }
226
227 //compute the total size of each compressed tile
228 fTileSize.resize(fNumTiles);
229 fTileOffsets.resize(fNumTiles);
230 for (uint32_t i=0;i<fNumTiles;i++)
231 {
232 fTileSize[i] = 0;
233 for (uint32_t j=0;j<fTable.num_cols;j++)
234 {
235 fTileSize[i] += fCatalog[i][j].first;
236 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
237 }
238 }
239 //see if there is a gap before heap data
240 fHeapOff = tellg()+fTable.GetHeapShift();
241
242 if (!fCopy.is_open())
243 return;
244
245 //write catalog and heap gap to target file
246 seekg(catalogStart);
247
248 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
249
250 vector<char> buf(catSize);
251 read(buf.data(), catSize);
252
253 fCopy.write(buf.data(), catSize);
254 if (!fCopy)
255 clear(rdstate()|ios::badbit);
256 }
257 //overrides fits.h method with empty one
258 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
259 virtual void WriteRowToCopyFile(size_t )
260 {
261
262 }
263 // Compressed versin of the read row
264 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
265 {
266 if (rowNum >= GetNumRows())
267 return false;
268
269 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
270 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
271 const size_t previousRow = fCurrentRow;
272
273 fCurrentRow = rowNum;
274
275 //should we read yet another chunk of data ?
276 if (requestedTile != currentTile)
277 {
278 //read yet another chunk from the file
279 const int64_t sizeToRead = fTileSize[requestedTile];
280
281 //skip to the beginning of the tile
282 seekg(fHeapOff+fCatalog[requestedTile][0].second - sizeof(TileHeader));
283 TileHeader tHead;
284 read((char*)(&tHead), sizeof(TileHeader));
285 read(fCompressedBuffer.data(), sizeToRead);
286
287 if (fCurrentRow == previousRow+1 &&
288 fCopy.is_open() &&
289 fCopy.good())
290 {
291 fCopy.write((char*)(&tHead), sizeof(TileHeader));
292 fCopy.write(fCompressedBuffer.data(), sizeToRead);
293 if (!fCopy)
294 clear(rdstate()|ios::badbit);
295 }
296 else
297 if (fCopy.is_open())
298 clear(rdstate()|ios::badbit);
299
300 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
301
302 //uncompress it
303 UncompressBuffer(requestedTile, thisRoundNumRows);
304
305 // pointer to column (source buffer)
306 const char *src = fTransposedBuffer.data();
307
308 uint32_t i=0;
309 for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++, i++)
310 {
311 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
312
313 switch (fColumnOrdering[i])
314 {
315 case FACT_ROW_MAJOR:
316 // regular, "semi-transposed" copy
317 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
318 {
319 memcpy(dest, src, it->bytes);
320 src += it->bytes; // next column
321 }
322 break;
323
324 case FACT_COL_MAJOR:
325 // transposed copy
326 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
327 {
328 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
329 {
330 memcpy(dest, src, it->size);
331 src += it->size; // next element
332 }
333 }
334 break;
335 default:
336 #ifdef __EXCEPTIONS
337 throw runtime_error("Unkown column ordering scheme");
338 #else
339 gLog << ___err___ << "ERROR - unkown column ordering scheme" << endl;
340 return;
341 #endif
342 break;
343 };
344 }
345 }
346
347 //Data loaded and uncompressed. Copy it to destination
348 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
349 return good();
350 }
351
352 // Read a bunch of uncompressed data
353 uint32_t UncompressUNCOMPRESSED(char* dest,
354 const char* src,
355 uint32_t numElems,
356 uint32_t sizeOfElems)
357 {
358 memcpy(dest, src, numElems*sizeOfElems);
359 return numElems*sizeOfElems;
360 }
361
362 // Read a bunch of data compressed with the Huffman algorithm
363 uint32_t UncompressHUFFMAN16(char* dest,
364 const char* src,
365 uint32_t numChunks)
366 {
367 vector<uint16_t> uncompressed;
368
369 //read compressed sizes (one per row)
370 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
371 src += sizeof(uint32_t)*numChunks;
372
373 //uncompress the rows, one by one
374 uint32_t sizeWritten = 0;
375 for (uint32_t j=0;j<numChunks;j++)
376 {
377 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
378
379 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
380
381 sizeWritten += uncompressed.size()*sizeof(uint16_t);
382 dest += uncompressed.size()*sizeof(uint16_t);
383 src += compressedSizes[j];
384 }
385 return sizeWritten;
386 }
387
388 uint32_t UnApplySMOOTHING(int16_t* data,
389 uint32_t numElems)
390 {
391 //un-do the integer smoothing
392 for (uint32_t j=2;j<numElems;j++)
393 data[j] = data[j] + (data[j-1]+data[j-2])/2;
394
395 return numElems*sizeof(uint16_t);
396 }
397 // Data has been read from disk. Uncompress it !
398 void UncompressBuffer(const uint32_t &catalogCurrentRow, const uint32_t &thisRoundNumRows)
399 {
400 char *dest = fTransposedBuffer.data();
401
402 //uncompress column by column
403 for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
404 {
405 const fits::Table::Column &col = fTable.sortedCols[i];
406 if (col.num == 0)
407 continue;
408
409 //get the compression flag
410 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i];
411
412 BlockHeader* head = reinterpret_cast<BlockHeader*>(&fCompressedBuffer[compressedOffset]);
413
414 fColumnOrdering[i] = head->ordering;
415
416 uint32_t numRows = (head->ordering==FACT_ROW_MAJOR) ? thisRoundNumRows : col.num;
417 uint32_t numCols = (head->ordering==FACT_COL_MAJOR) ? thisRoundNumRows : col.num;
418
419 const char *src = fCompressedBuffer.data()+compressedOffset+sizeof(BlockHeader)+sizeof(uint16_t)*head->numProcs;
420
421 for (uint32_t j=head->numProcs;j != 0; j--)
422 {
423 uint32_t sizeWritten=0;
424
425 switch (head->processings[j-1])
426 {
427 case FACT_RAW:
428 if (head->numProcs == 1)
429 sizeWritten = UncompressUNCOMPRESSED(dest, src, numRows*numCols, col.size);
430 break;
431 case FACT_SMOOTHING:
432 sizeWritten = UnApplySMOOTHING(reinterpret_cast<int16_t*>(dest), numRows*numCols);
433 break;
434 case FACT_HUFFMAN16:
435 sizeWritten = UncompressHUFFMAN16(dest, src, numRows);
436 break;
437 default:
438#ifdef __EXCEPTIONS
439 throw runtime_error("Unknown processing applied to data. Aborting");
440#else
441 gLog << ___err___ << "ERROR - Unknown processing applied to data. Aborting" << endl;
442 return;
443#endif
444 break;
445 }
446 //increment destination counter only when processing done.
447 if (j==1) dest+= sizeWritten;
448 }
449 }
450 }
451
452};//class zfits
453
454#ifndef __MARS__
455}; //namespace std
456#endif
457
458#endif
Note: See TracBrowser for help on using the repository browser.