source: trunk/Mars/mcore/zofits.h@17225

Last change on this file since 17225 was 17225, checked in by tbretz, 11 years ago
Fixed initialization with static data member; removed buggy example; adapted AddColumn; removed obsolete error messages
File size: 33.9 KB
/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */
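/*
 * Minimal usage sketch (illustrative only; assumes the surrounding
 * Mars/FACT environment, omits error handling; the file name and column
 * are made up). FITS type 'I' denotes a 16-bit signed integer, so each
 * row is 2 bytes wide here:
 *
 *     zofits file(1000, 100);                 // up to 1000 tiles of 100 rows
 *     file.open("events.fits.fz");
 *     file.AddColumn(1, 'I', "eventNum", "uid", "Event identifier");
 *     file.WriteTableHeader("Events");
 *     for (int16_t i=0; i<100; i++)
 *         file.WriteRow(&i, sizeof(i));       // size must equal GetBytesPerRow()
 *     file.close();
 */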

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#else
using namespace std;
#endif


class zofits : public ofits
{
public:

    struct WriteTarget
    {
        bool operator < (const WriteTarget& other) const
        {
            return tile_num < other.tile_num;
        }
        uint32_t tile_num;               ///< index of the tile to be written
        uint32_t size;                   ///< compressed size of the tile
        shared_ptr<MemoryChunk> target;  ///< buffer holding the compressed data
    };

    struct CompressionTarget
    {
        bool operator < (const CompressionTarget& other) const
        {
            return target < other.target;
        }
        shared_ptr<MemoryChunk> src;            ///< raw rows to be compressed
        shared_ptr<MemoryChunk> transposed_src; ///< scratch buffer for the transposed tile
        WriteTarget target;                     ///< pre-filled target for the write queue
        uint32_t num_rows;                      ///< total rows written so far (identifies the tile)
    };
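
    //How the pieces fit together (sketch of the data flow implemented
    //below): WriteRow() accumulates rows in the current tile buffer; once
    //a tile is complete, SetNextCompression() packages it into a
    //CompressionTarget that is posted to the least-loaded compression
    //queue. Each compressed tile is then wrapped in a WriteTarget and
    //posted to the single write-to-disk queue; WriteBufferToDisk() only
    //accepts tile fLatestWrittenTile+1, so tiles reach the file in order.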

    //constructors
    zofits(uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(),
        fMemPool(0, maxUsableMem),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumThreads(fgNumQueues);
    }

    zofits(const char* fname,
           uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(fname),
        fMemPool(0, maxUsableMem),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), true, false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumThreads(fgNumQueues);
    }

    virtual ~zofits()
    {
    }

    //initialization of member variables
    void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
    {
        if (nt == 0)
            throw runtime_error("Cannot work with a catalog of size 0, sorry.");

        fCheckOffset = 0;

        fNumTiles = nt;
        fNumRowsPerTile = rpt;

        fBuffer = NULL;
        fRealRowWidth = 0;
        fCatalogExtraRows = 0;

        fCatalogOffset = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = exception_ptr();
#endif
    }


    //write the header of the binary table
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        if (!reallocateBuffers())
            throw runtime_error("While allocating memory: apparently there is not as much free memory as advertised...");

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }

    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool("ZTABLE", true, "Table is compressed");
        SetInt("ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt("ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt("ZPCOUNT", 0, "");
        SetInt("ZHEAPPTR", 0, "");
        SetInt("ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt("THEAP", 0, "");
        SetStr("RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");

        fCatalogExtraRows = 0;
        fRawSum.reset();
    }

    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fCatalog.size()*one_catalog_row_size;

        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        const off_t where_are_we = tellp();

        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }

    virtual void DrsOffsetCalibrate(char*)
    {
    }

    void GrowCatalog()
    {
        uint32_t orig_catalog_size = fCatalog.size();

        fCatalog.resize(fCatalog.size()*2);
        for (uint32_t i=orig_catalog_size; i<fCatalog.size(); i++)
        {
            fCatalog[i].resize(fTable.num_cols);
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }

        fCatalogExtraRows += orig_catalog_size;
        fNumTiles += orig_catalog_size;
    }

    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
        {
//            GrowCatalog();
#ifdef __EXCEPTIONS
            throw runtime_error("Maximum number of rows exceeded for this file");
#else
            gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
            return false;
#endif
        }

        //copy current row to the pool of rows waiting for compression
        char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //For now, make an extra copy of the data for the RAWSUM checksumming.
        //Ideally this should be moved to the threads, along with the DRS offset calibration.
        //However, because the RAWSUM must be calculated before the tile is transposed, it is
        //not clear whether one extra memcpy per row is worse than checksumming 100 rows
        //when the tile is full...
        //The row is placed at the same offset modulo 4 that it has in the output stream,
        //and the (up to) 4 padding bytes at either end are zeroed so that they do not
        //contribute to the 4-byte-word checksum.
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        DrsOffsetCalibrate(target_location);

        fTable.num_rows++;

        if (fTable.num_rows % fNumRowsPerTile == 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            if (fNumQueues == 0)
            {
                //no worker threads: do everything inline
                uint64_t size_to_write = CompressBuffer(compress_target);

                WriteTarget write_target;
                write_target.size = size_to_write;
                write_target.target = compress_target.target.target;
                write_target.tile_num = compress_target.target.tile_num;

                if (!WriteBufferToDisk(write_target))
                    throw runtime_error("Something went wrong while writing to disk");
            }
            else
            {
                //post the tile to the least-loaded compression queue
                //(queue 0 if they are all empty)
                uint32_t min_index = 0;
                uint32_t min_size = numeric_limits<uint32_t>::max();
                uint32_t current_index = 0;

                for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                {
                    if (it->size() < min_size)
                    {
                        min_index = current_index;
                        min_size = it->size();
                    }
                    current_index++;
                }

                if (!fCompressionQueues[min_index].post(compress_target))
                    throw runtime_error("I could not post this buffer. This does not make sense...");
            }
        }

        return good();
    }

    void FlushNumRows()
    {
        SetInt("NAXIS2", fTable.num_rows/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    void SetNextCompression(CompressionTarget& target)
    {
        //get space for transposed data
        shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();

        //fill up the write-to-disk target
        WriteTarget write_target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.target = fMemPool.malloc();

        //fill up the compression target
        target.src = fSmartBuffer;
        target.transposed_src = transposed_data;
        target.target = write_target;
        target.num_rows = fTable.num_rows;

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();
    }
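
    //Note the double buffering above: the filled buffer travels with the
    //CompressionTarget, while a fresh chunk from fMemPool immediately
    //becomes the active fBuffer, so WriteRow() can keep accepting rows
    //while the worker threads compress the previous tile.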

    void ShrinkCatalog()
    {
        //did we write more rows than the catalog could host?
        if (fCatalogExtraRows != 0)
        {
            //how many rows can the regular catalog host?
            const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
            //what's the shrink factor to be applied?
            const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);
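            //Worked example (hypothetical numbers): a catalog grown once from
            //1000 to 2000 tiles of 100 rows each (fCatalogExtraRows == 1000)
            //can host 100000 regular rows. With fTable.num_rows == 150000,
            //shrink_factor == 2: the loop below merges pairs of catalog rows,
            //yielding 750 tiles of 200 rows each.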

            //shrink the catalog!
            for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i += shrink_factor)
            {
                //add the elements one by one, so that the empty ones at the end
                //(i.e. fTable.num_rows%shrink_factor) do not create havoc
                const uint32_t target_catalog_row = i/shrink_factor;
                //move data from current row (i) to target row
                for (uint32_t j=0; j<fTable.num_cols; j++)
                {
                    fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
                    fCatalog[target_catalog_row][j].first = 0;
                    uint64_t last_size = fCatalog[i][j].first;
                    uint64_t last_offset = fCatalog[i][j].second;

                    for (uint32_t k=1; k<shrink_factor; k++)
                    {
                        if (fCatalog[i+k][j].second != 0)
                        {
                            fCatalog[target_catalog_row][j].first += fCatalog[i+k][j].second - last_offset;
                        }
                        else
                        {
                            fCatalog[target_catalog_row][j].first += last_size;
                            break;
                        }
                        last_size = fCatalog[i+k][j].first;
                        last_offset = fCatalog[i+k][j].second;
                    }
                }
            }

            fCatalog.resize(fCatalog.size() - fCatalogExtraRows);

            //update header keywords
            const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
            const uint32_t new_num_tiles_written = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
            SetInt("THEAP", new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
            SetInt("NAXIS2", new_num_tiles_written);
            SetInt("ZTILELEN", new_num_rows_per_tiles);
            cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
            cout << "Num tiles written: " << new_num_tiles_written << endl;
        }
    }

    bool close()
    {
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Something went wrong while writing to disk...");
#else
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened to the compression threads
        if (fThreadsException != exception_ptr())
        {
            rethrow_exception(fThreadsException);
        }
#endif

        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            //set the number of threads to zero before calling CompressBuffer,
            //so that it returns the compressed size instead of posting the
            //tile to the write queue
            int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;
            uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            WriteTarget write_target;
            write_target.size = size_to_write;
            write_target.target = compress_target.target.target;
            write_target.tile_num = compress_target.target.tile_num;

            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Something went wrong while writing the last tile...");
        }

        AlignTo2880Bytes();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
        SetInt("ZHEAPPTR", heap_offset);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;

        SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);

        SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
        SetInt("NAXIS2", total_num_tiles_written);

        ostringstream str;
        str << fRawSum.val();
        SetStr("RAWSUM", str.str());

        int64_t heap_size = 0;
        int64_t compressed_offset = 0;

        for (uint32_t i=0; i<total_num_tiles_written; i++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size += sizeof(TileHeader);
            for (uint32_t j=0; j<fCatalog[i].size(); j++)
            {
                heap_size += fCatalog[i][j].first;
                fCatalog[i][j].second = compressed_offset;
                compressed_offset += fCatalog[i][j].first;
                if (fCatalog[i][j].first == 0)
                    fCatalog[i][j].second = 0;
            }
        }

        float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }

    //Overload of the ofits method. Just calls the zofits-specific one
    //with the default, uncompressed options for this column.
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    bool AddColumn(const FITS::Compression& comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        Table::Column col;
        size_t size = SizeFromType(typechar);

        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, comp));

        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar) << "]";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Compression type FACT";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return true;
    }

    static void SetDefaultNumThreads(int32_t num) { fgNumQueues = num; }
    static int32_t GetDefaultNumThreads() { return fgNumQueues; }

    int32_t GetNumThreads() { return fNumQueues; }
    bool SetNumThreads(int32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
#endif
            return false;
        }
#ifdef USE_BOOST_THREADS
        int32_t num_available_cores = boost::thread::hardware_concurrency();
#else
        int32_t num_available_cores = thread::hardware_concurrency();
#endif

        if (num_available_cores == 0)
        {
            //could not detect the number of available cores from the system
            //properties: assume 5 cores are available (4 compression, 1 write)
            num_available_cores = 5;
        }
        if (num > num_available_cores)
        {
            ostringstream str;
            str << "Number of threads cannot be greater than physically available (" << num_available_cores << ")";
#ifdef __EXCEPTIONS
            throw runtime_error(str.str());
#else
            gLog << ___err___ << "ERROR - " << str.str();
#endif
            return false;
        }

        if (num == -1)
            num = num_available_cores-2; //one core for writing, one for the main thread

        if (fCompressionQueues.size() == (uint32_t)num)
            return true;

        //cannot be const, as resize does not want it that way
        Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false, false);

        //shrink or grow the list of queues; remember the new thread count either way
        fCompressionQueues.resize(num, queue);
        fNumQueues = num;

        return true;
    }

protected:

    bool reallocateBuffers()
    {
        //+8 bytes for checksumming
        size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fBuffer = fSmartBuffer.get()->get();

        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming

        //give the catalog enough space
        fCatalog.resize(fNumTiles);
        for (uint32_t i=0; i<fNumTiles; i++)
        {
            fCatalog[i].resize(fRealColumns.size());
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }
        return true;
    }

    bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;
        if (fCheckOffset != 0)
        {
            //should we extend the array to the left?
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }
        if (sizeToChecksum%4 != 0)
        {
            //should we extend the array to the right?
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }

    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        WriteTarget wt;
        wt.tile_num = target.target.tile_num;
        wt.size = compressed_size;
        wt.target = target.target.target;

        fWriteToDiskQueue.post(wt);

        return compressed_size;
    }

    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we're supposed to write?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

        //write the buffer to disk
        return writeCompressedDataToDisk(target.target.get()->get(), target.size);
    }

    //src cannot be const, as applySMOOTHING is done in place
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
    {
        uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;
        uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);
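
        //The tile is laid out as follows (offsets relative to the tile
        //start, i.e. before the +4 checksum shift): a TileHeader, then for
        //each column a block header followed by its (possibly compressed)
        //payload. fCatalog records, per column, the pair
        //(compressed block size, offset of the block within the tile).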

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            fCatalog[currentCatalogRow][i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            Compression& head = fRealColumns[i].block_head;

            //remember where this column's block starts: its header is written
            //here once the compressed block size is known
            uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0; j<head.getNumProcs(); j++)
            {
                switch (head.getProc(j))
                {
                case kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;
                case kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;
                case kFactHuffman16:
                    if (head.getOrdering() == kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size is larger than the uncompressed one
            if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {
                //if so, redo the block uncompressed
                cout << "Redoing uncompressed !" << endl;
                //de-smooth !
                if (head.getProc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
        }

        TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }

    void copyTransposeTile(const char* src, char* dest)
    {
        uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case kOrderByRow:
                //regular, "semi-transposed" copy
                for (uint32_t k=0; k<thisRoundNumRows; k++)
                {
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case kOrderByCol:
                //transposed copy
                for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                    for (uint32_t k=0; k<thisRoundNumRows; k++)
                    {
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            };
        }
    }

    /// Specific compression functions
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;
        if (numRows < 2)
        {
            //with fewer than 2 elements to compress, the Huffman encoder does
            //not work (and is pointless anyway): return a size larger than the
            //uncompressed one to trigger raw storage instead
            return numRows*sizeOfElems*numRowElems + 1000;
        }
        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types";
            return 0;
#endif
        }
        uint32_t huffmanOffset = 0;
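        //Resulting layout (a sketch of what the loop below produces):
        //numRowElems uint32 words, each holding the encoded size of one
        //element slice, followed by the concatenated Huffman streams. The
        //source is already transposed, so slice j of every row is
        //contiguous in memory and encoded in a single call.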
        for (uint32_t j=0; j<numRowElems; j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }
        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        //replace each element by its difference to the average of the two
        //preceding ones, in place and in descending order
        for (int j=numElems-1; j>1; j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }
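
    //Worked example (hypothetical values): {10, 12, 14, 16} becomes
    //{10, 12, 14-(12+10)/2, 16-(14+12)/2} = {10, 12, 3, 3}; the inverse
    //below replays the prediction in ascending order to restore
    //{10, 12, 14, 16}.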

    //apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        //un-do the integer smoothing
        for (uint32_t j=2; j<numElems; j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(uint16_t);
    }

    //compressed data stuff
    int32_t fCheckOffset;    ///< offset to the data pointer to calculate the checksum
    uint32_t fNumTiles;
    uint32_t fNumRowsPerTile;

    MemoryManager fMemPool;

    //thread related stuff
    vector<Queue<CompressionTarget>> fCompressionQueues;
    Queue<WriteTarget> fWriteToDiskQueue;

    static int32_t fgNumQueues; ///< The default number of compression threads for new instances
    int32_t fNumQueues;         ///< The number of threads used by this writer to compress

    int32_t fLatestWrittenTile;
#ifdef __EXCEPTIONS
    exception_ptr fThreadsException;
#endif

    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {}
        int64_t first;   ///< size of the compressed block
        int64_t second;  ///< offset of the block within the heap
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;
    typedef vector<CatalogRow> CatalogType;
    CatalogType fCatalog;
    Checksum fCatalogSum;
    Checksum fRawSum;
    off_t fCatalogOffset;
    uint32_t fRealRowWidth;
    uint32_t fCatalogExtraRows;
    vector<char> fRawSumBuffer;
    uint64_t fMaxUsableMem;

    shared_ptr<MemoryChunk> fSmartBuffer;
    char* fBuffer;

    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const Compression& h) : col(c),
                                                                         block_head(h)
        {}
        Table::Column col;
        Compression block_head;
    };
    vector<CompressedColumn> fRealColumns;
};

int32_t zofits::fgNumQueues = 0;

#ifndef __MARS__
} //namespace std
#endif