source: trunk/Mars/mcore/zfits.h@ 16621

Last change on this file since 16621 was 16443, checked in by lyard, 11 years ago
more tweaks to factfits
File size: 12.5 KB
Line 
1/*
2 * zfits.h
3 *
4 * Created on: May 16, 2013
5 * Author: lyard
6 */
7
8#ifndef MARS_ZFITS
9#define MARS_ZFITS
10
11#include <stdexcept>
12
13#include "fits.h"
14#include "huffman.h"
15
16
17#ifndef __MARS__
18namespace std
19{
20#endif
21
22class zfits : public fits
23{
24public:
25
26 // Basic constructor
27 zfits(const string& fname, const string& tableName="",
28 bool force=false) : fits(fname, tableName, force),
29 fBuffer(0),
30 fTransposedBuffer(0),
31 fCompressedBuffer(0),
32 fNumTiles(0),
33 fNumRowsPerTile(0),
34 fCurrentRow(-1),
35 fHeapOff(0),
36 fTileSize(0)
37 {
38 InitCompressionReading();
39 }
40
41 // Alternative contstructor
42 zfits(const string& fname, const string& fout, const string& tableName,
43 bool force=false) : fits(fname, fout, tableName, force),
44 fBuffer(0),
45 fTransposedBuffer(0),
46 fCompressedBuffer(0),
47 fNumTiles(0),
48 fNumRowsPerTile(0),
49 fCurrentRow(-1),
50 fHeapOff(0),
51 fTileSize(0)
52 {
53 InitCompressionReading();
54 }
55
56 // Skip the next row
57 bool SkipNextRow()
58 {
59 if (!fTable.isCompressed)
60 return fits::SkipNextRow();
61
62 fRow++;
63 return true;
64 }
65protected:
66
67 // Stage the requested row to internal buffer
68 // Does NOT return data to users
69 virtual void StageRow(size_t row, char* dest)
70 {
71 if (!fTable.isCompressed)
72 {
73 fits::StageRow(row, dest);
74 return;
75 }
76
77 ReadBinaryRow(row, dest);
78 }
79
80private:
81
82 // Do what it takes to initialize the compressed structured
83 void InitCompressionReading()
84 {
85 //The constructor may have failed
86 if (!good())
87 return;
88
89 //Get compressed specific keywords
90 fNumTiles = fTable.isCompressed ? GetInt("NAXIS2") : 0;
91 fNumRowsPerTile = fTable.isCompressed ? GetInt("ZTILELEN") : 0;
92
93 //give it some space for uncompressing
94 AllocateBuffers();
95
96 //read the file's catalog
97 ReadCatalog();
98 }
99
100 // Copy decompressed data to location requested by user
101 void MoveColumnDataToUserSpace(char* dest, const char* src, const Table::Column& c)
102 {
103 if (!fTable.isCompressed)
104 {
105 fits::MoveColumnDataToUserSpace(dest, src, c);
106 return;
107 }
108
109 memcpy(dest, src, c.num*c.size);
110 }
111
112 vector<char> fBuffer; ///<store the uncompressed rows
113 vector<char> fTransposedBuffer; ///<intermediate buffer to transpose the rows
114 vector<char> fCompressedBuffer; ///<compressed rows
115
116 size_t fNumTiles; ///< Total number of tiles
117 size_t fNumRowsPerTile; ///< Number of rows per compressed tile
118 size_t fCurrentRow; ///< current row in memory.
119
120 streamoff fHeapOff; ///< offset from the beginning of the file of the binary data
121
122 vector<vector<pair<int64_t, int64_t> > > fCatalog;///< Catalog, i.e. the main table that points to the compressed data.
123 vector<size_t> fTileSize; ///< size in bytes of each compressed tile
124 vector<vector<size_t> > fTileOffsets; ///< offset from start of tile of a given compressed column
125
126 void AllocateBuffers()
127 {
128 if (!fTable.isCompressed)
129 return;
130
131 fBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
132
133 fTransposedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile);
134 fCompressedBuffer.resize(fTable.bytes_per_row*fNumRowsPerTile + fTable.num_cols); //use a bit more memory for compression flags
135 }
136
137 // Read catalog data. I.e. the address of the compressed data inside the heap
138 void ReadCatalog()
139 {
140 if (!fTable.isCompressed)
141 return;
142
143 char readBuf[16];
144 fCatalog.resize(fNumTiles);
145
146 const streampos catalogStart = tellg();
147
148 //do the actual reading
149 for (uint32_t i=0;i<fNumTiles;i++)
150 for (uint32_t j=0;j<fTable.num_cols;j++)
151 {
152 read(readBuf, 2*sizeof(int64_t));
153
154 //swap the bytes
155 int64_t tempValues[2] = {0,0};
156 revcpy<8>(reinterpret_cast<char*>(tempValues), readBuf, 2);
157 if (tempValues[0] < 0 || tempValues[1] < 0)
158 {
159#ifdef __EXCEPTIONS
160 throw runtime_error("ERROR: negative value in the catalog");
161#else
162 gLog << ___err ___ << "ERROR: negative value in the catalog" << endl;
163 return;
164#endif
165 }
166 //add catalog entry
167 fCatalog[i].emplace_back(tempValues[0], tempValues[1]);
168 }
169
170 //compute the total size of each compressed tile
171 fTileSize.resize(fNumTiles);
172 fTileOffsets.resize(fNumTiles);
173 for (uint32_t i=0;i<fNumTiles;i++)
174 {
175 fTileSize[i] = 0;
176 for (uint32_t j=0;j<fTable.num_cols;j++)
177 {
178 fTileSize[i] += fCatalog[i][j].first;
179 fTileOffsets[i].emplace_back(fCatalog[i][j].second - fCatalog[i][0].second);
180 }
181 }
182 //see if there is a gap before heap data
183 fHeapOff = tellg()+fTable.GetHeapShift();
184
185 if (!fCopy.is_open())
186 return;
187
188 //write catalog and heap gap to target file
189 seekg(catalogStart);
190
191 const size_t catSize = fTable.GetHeapShift() + fTable.total_bytes;
192
193 vector<char> buf(catSize);
194 read(buf.data(), catSize);
195
196 fCopy.write(buf.data(), catSize);
197 if (!fCopy)
198 clear(rdstate()|ios::badbit);
199 }
200 //overrides fits.h method with empty one
201 //work is done in ReadBinaryRow because it requires volatile data from ReadBinaryRow
202 virtual void WriteRowToCopyFile(size_t )
203 {
204
205 }
206 // Compressed versin of the read row
207 bool ReadBinaryRow(const size_t &rowNum, char *bufferToRead)
208 {
209 if (rowNum >= GetNumRows())
210 return false;
211
212 const uint32_t requestedTile = rowNum/fNumRowsPerTile;
213 const uint32_t currentTile = fCurrentRow/fNumRowsPerTile;
214 const size_t previousRow = fCurrentRow;
215
216 fCurrentRow = rowNum;
217
218 //should we read yet another chunk of data ?
219 if (requestedTile != currentTile)
220 {
221 //read yet another chunk from the file
222 const int64_t sizeToRead = fTileSize[requestedTile];
223
224 //skip to the beginning of the tile
225 seekg(fHeapOff+fCatalog[requestedTile][0].second);
226 read(fCompressedBuffer.data(), sizeToRead);
227
228 if (fCurrentRow == previousRow+1 &&
229 fCopy.is_open() &&
230 fCopy.good())
231 {
232 fCopy.write(fCompressedBuffer.data(), sizeToRead);
233 if (!fCopy)
234 clear(rdstate()|ios::badbit);
235 }
236 else
237 if (fCopy.is_open())
238 clear(rdstate()|ios::badbit);
239
240 const uint32_t thisRoundNumRows = (GetNumRows()<fCurrentRow + fNumRowsPerTile) ? GetNumRows()%fNumRowsPerTile : fNumRowsPerTile;
241
242 //uncompress it
243 UncompressBuffer(requestedTile, thisRoundNumRows);
244
245 // pointer to column (source buffer)
246 const char *src = fTransposedBuffer.data();
247
248 for (auto it=fTable.sortedCols.begin(); it!=fTable.sortedCols.end(); it++)
249 {
250 char *buffer = fBuffer.data() + it->offset; // pointer to column (destination buffer)
251
252 switch (it->comp)
253 {
254 case UNCOMPRESSED:
255 case SMOOTHMAN:
256 // regular, "semi-transposed" copy
257 for (char *dest=buffer; dest<buffer+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
258 {
259 memcpy(dest, src, it->bytes);
260 src += it->bytes; // next column
261 }
262 break;
263
264 default:
265 // transposed copy
266 for (char *elem=buffer; elem<buffer+it->bytes; elem+=it->size) // element-by-element (arrays)
267 {
268 for (char *dest=elem; dest<elem+thisRoundNumRows*fTable.bytes_per_row; dest+=fTable.bytes_per_row) // row-by-row
269 {
270 memcpy(dest, src, it->size);
271 src += it->size; // next element
272 }
273 }
274 break;
275 };
276 }
277 }
278
279 //Data loaded and uncompressed. Copy it to destination
280 memcpy(bufferToRead, fBuffer.data()+fTable.bytes_per_row*(fCurrentRow%fNumRowsPerTile), fTable.bytes_per_row);
281 return good();
282 }
283
284 // Read a bunch of uncompressed data
285 uint32_t UncompressUNCOMPRESSED(char* dest,
286 const char* src,
287 uint32_t numRows,
288 uint32_t sizeOfElems,
289 uint32_t numRowElems)
290 {
291 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
292 return numRows*sizeOfElems*numRowElems;
293 }
294
295 // Read a bunch of data compressed with the Huffman algorithm
296 uint32_t UncompressHUFFMAN(char* dest,
297 const char* src,
298 uint32_t ,
299 uint32_t sizeOfElems,
300 uint32_t numRowElems)
301 {
302 if (sizeOfElems < 2)
303 {
304 cout << "Error, Huffman only works on shorts or longer types. (here: " << sizeOfElems << "). Aborting." << endl;
305 return -1;
306 }
307
308 vector<uint16_t> uncompressed;
309
310 //read compressed sizes (one per row)
311 const uint32_t* compressedSizes = reinterpret_cast<const uint32_t*>(src);
312 src += sizeof(uint32_t)*numRowElems;
313
314 //uncompress the rows, one by one
315 uint32_t sizeWritten = 0;
316 for (uint32_t j=0;j<numRowElems;j++)
317 {
318 Huffman::Decode(reinterpret_cast<const unsigned char*>(src), compressedSizes[j], uncompressed);
319
320 memcpy(dest, uncompressed.data(), uncompressed.size()*sizeof(uint16_t));
321
322 sizeWritten += uncompressed.size()*sizeof(uint16_t);
323 dest += uncompressed.size()*sizeof(uint16_t);
324 src += compressedSizes[j];
325 }
326 return sizeWritten;
327 }
328
329 //Read a bunch of data compressed with the smoothman algorithm
330 uint32_t UncompressSMOOTHMAN(int16_t* dest,
331 const char* src,
332 uint32_t numRows,
333 uint32_t sizeOfElems,
334 uint32_t numRowElems)
335 {
336 //call huffman transposed
337 const uint32_t sizeWritten = UncompressHUFFMAN(reinterpret_cast<char*>(dest), src, numRowElems, sizeOfElems, numRows);
338
339 //un-do the integer smoothing
340 for (uint32_t j=2;j<numRowElems*numRows;j++)
341 dest[j] = dest[j] + (dest[j-1]+dest[j-2])/2;
342
343 return sizeWritten;
344 }
345
346 // Data has been read from disk. Uncompress it !
347 void UncompressBuffer(const uint32_t &catalogCurrentRow, const uint32_t &thisRoundNumRows)
348 {
349 char *dest = fTransposedBuffer.data();
350
351 //uncompress column by column
352 for (uint32_t i=0; i<fTable.sortedCols.size(); i++)
353 {
354 const fits::Table::Column &col = fTable.sortedCols[i];
355 if (col.num == 0)
356 continue;
357
358 //get the compression flag
359 const int64_t compressedOffset = fTileOffsets[catalogCurrentRow][i];//fCatalog[catalogCurrentRow][i].second - fCatalog[catalogCurrentRow][0].second;
360 const char compressedFlag = fCompressedBuffer[compressedOffset];
361
362 //#define COMPRESSED_FLAG 0x1
363 //#define UNCOMPRESSED_FLAG 0x0
364
365 const char *src = fCompressedBuffer.data()+compressedOffset+1;
366
367 //if this bunch of data is not compressed, modify the compression flag
368 const uint32_t compression = compressedFlag==0 ? UNCOMPRESSED : col.comp;
369 switch (compression)
370 {
371 case UNCOMPRESSED:
372 dest += UncompressUNCOMPRESSED(dest, src, thisRoundNumRows, col.size, col.num);
373 break;
374
375 case SMOOTHMAN:
376 dest += UncompressSMOOTHMAN(reinterpret_cast<int16_t*>(dest), src, thisRoundNumRows, col.size, col.num);
377 break;
378
379 default:
380 ;
381 }
382 }
383 }
384
385};//class zfits
386
387#ifndef __MARS__
388}; //namespace std
389#endif
390
391#endif
Note: See TracBrowser for help on using the repository browser.