source: trunk/Mars/mcore/zofits.h@ 17228

Last change on this file since 17228 was 17228, checked in by tbretz, 11 years ago
Adapted to changes in Queue; Compression queues do not need sorting... first-in-first-out is always correct
File size: 34.1 KB

/*
 * zofits.h
 *
 * FACT native compressed FITS writer
 * Author: lyard
 */

#include "ofits.h"
#include "zfits.h"
#include "Queue.h"
#include "MemoryManager.h"

#ifdef USE_BOOST_THREADS
#include <boost/thread.hpp>
#endif

#ifndef __MARS__
namespace std
{
#else
using namespace std;
#endif


class zofits : public ofits
{
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };
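
    // The write-to-disk queue uses this min-ordered list as its container so
    // that the pending WriteTarget with the smallest tile number is always
    // processed first: tiles have to reach the disk strictly in order (see
    // WriteBufferToDisk below).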

    struct WriteTarget
    {
        bool operator < (const WriteTarget& other) const
        {
            return tile_num < other.tile_num;
        }

        uint32_t tile_num;
        uint32_t size;
        shared_ptr<MemoryChunk> target;
    };

    struct CompressionTarget
    {
        /*
        bool operator < (const CompressionTarget& other)
        {
            return target < other.target;
        }*/

        shared_ptr<MemoryChunk> src;
        shared_ptr<MemoryChunk> transposed_src;
        WriteTarget target;
        uint32_t num_rows;
    };

public:
    //constructors
    zofits(uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(),
        fMemPool(0, maxUsableMem),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumThreads(fgNumQueues);
    }

    zofits(const char* fname,
           uint32_t numTiles=1000,
           uint32_t rowPerTile=100,
           uint64_t maxUsableMem=0) : ofits(fname),
        fMemPool(0, maxUsableMem),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem);
        SetNumThreads(fgNumQueues);
    }

    virtual ~zofits()
    {
    }

    //initialization of member variables
    void InitMemberVariables(uint32_t nt=0, uint32_t rpt=0, uint64_t maxUsableMem=0)
    {
        if (nt == 0)
            throw runtime_error("Cannot work with a catalog of size 0, sorry.");

        fCheckOffset = 0;
        fNumQueues   = 0;

        fNumTiles       = nt;
        fNumRowsPerTile = rpt;

        fBuffer           = NULL;
        fRealRowWidth     = 0;
        fCatalogExtraRows = 0;

        fCatalogOffset = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        fThreadsException = exception_ptr();
#endif
    }


    //write the header of the binary table
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        if (!reallocateBuffers())
            throw runtime_error("While allocating memory: apparently there is not as much free memory as advertised...");

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                it->start();

            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }

    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE",   true,            "Table is compressed");
        SetInt(  "ZNAXIS1",  0,               "Width of uncompressed rows");
        SetInt(  "ZNAXIS2",  0,               "Number of uncompressed rows");
        SetInt(  "ZPCOUNT",  0,               "");
        SetInt(  "ZHEAPPTR", 0,               "");
        SetInt(  "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt(  "THEAP",    0,               "");
        SetStr(  "RAWSUM",   " 0",            "Checksum of raw little endian data");
        SetFloat("ZRATIO",   0,               "Compression ratio");

        fCatalogExtraRows = 0;
        fRawSum.reset();
    }

    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }

    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }

    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size   = fCatalog.size()*one_catalog_row_size;

        //byte-swap the catalog to big endian, as required by FITS
        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        //remember where the catalog starts, then (re)write it there
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        const off_t where_are_we = tellp();

        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);
        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        //update the catalog checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
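
    // On disk the catalog is the data part of the binary table itself: one row
    // per tile and, for each column, a pair of 64-bit integers (compressed size
    // of the block, offset of the block within the heap), stored big endian as
    // written by WriteCatalog() above.
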
    virtual void DrsOffsetCalibrate(char*)
    {
    }

    void GrowCatalog()
    {
        const uint32_t orig_catalog_size = fCatalog.size();

        //double the catalog size and initialize the new rows
        fCatalog.resize(fCatalog.size()*2);
        for (uint32_t i=orig_catalog_size; i<fCatalog.size(); i++)
        {
            fCatalog[i].resize(fTable.num_cols);
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }

        fCatalogExtraRows += orig_catalog_size;
        fNumTiles         += orig_catalog_size;
    }

    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        if (fTable.num_rows >= fNumRowsPerTile*fNumTiles)
        {
//            GrowCatalog();
#ifdef __EXCEPTIONS
            throw runtime_error("Maximum number of rows exceeded for this file");
#else
            gLog << ___err___ << "ERROR - Maximum number of rows exceeded for this file" << endl;
            return false;
#endif
        }

        //copy the current row to the pool of rows waiting for compression
        char* target_location = fBuffer + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data for the RAWSUM checksumming.
        //Ideally this should be moved to the threads, along with the DRS offset
        //calibration. However, because the RAWSUM must be calculated before the
        //tile is transposed, it is unclear whether one extra memcpy per row is
        //worse than checksumming 100 rows at once when the tile is full...
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        //zero the first and last four bytes so that the padding around the
        //copied row is well defined for the checksum
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib   = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie   = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        DrsOffsetCalibrate(target_location);

        fTable.num_rows++;

        //the tile is full: hand it over for compression
        if (fTable.num_rows % fNumRowsPerTile == 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            if (fNumQueues == 0)
            {   //no worker threads: do everything inline
                const uint64_t size_to_write = CompressBuffer(compress_target);

                WriteTarget write_target;
                write_target.size     = size_to_write;
                write_target.target   = compress_target.target.target;
                write_target.tile_num = compress_target.target.tile_num;

                if (!WriteBufferToDisk(write_target))
                    throw runtime_error("Something went wrong while writing to disk");
            }
            else
            {
                //post the tile to the least-loaded queue (queue 0 if all are empty)
                uint32_t min_index     = 0;
                uint32_t min_size      = numeric_limits<uint32_t>::max();
                uint32_t current_index = 0;

                for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
                {
                    if (it->size() < min_size)
                    {
                        min_index = current_index;
                        min_size  = it->size();
                    }
                    current_index++;
                }

                if (!fCompressionQueues[min_index].post(compress_target))
                    throw runtime_error("Could not post this buffer. This does not make sense...");
            }
        }

        return good();
    }

    void FlushNumRows()
    {
        SetInt("NAXIS2",  fTable.num_rows/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }

    void SetNextCompression(CompressionTarget& target)
    {
        //get space for the transposed data
        shared_ptr<MemoryChunk> transposed_data = fMemPool.malloc();

        //fill up the write-to-disk target
        WriteTarget write_target;
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size     = 0;
        write_target.target   = fMemPool.malloc();

        //fill up the compression target
        target.src            = fSmartBuffer;
        target.transposed_src = transposed_data;
        target.target         = write_target;
        target.num_rows       = fTable.num_rows;

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
        fBuffer      = fSmartBuffer.get()->get();
    }
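
    // SetNextCompression hands the buffer that was being filled over to the
    // compression target and immediately replaces it with a fresh chunk from
    // the memory pool, so that new rows can be accepted while earlier tiles
    // are still being compressed and written.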

    void ShrinkCatalog()
    {
        //did we write more rows than the catalog could host?
        if (fCatalogExtraRows != 0)
        {
            //how many rows can the regular catalog host?
            const uint32_t max_regular_rows = (fCatalog.size() - fCatalogExtraRows)*fNumRowsPerTile;
            //what's the shrink factor to be applied?
            const uint32_t shrink_factor = fTable.num_rows/max_regular_rows + ((fTable.num_rows%max_regular_rows) ? 1 : 0);

            //shrink the catalog!
            for (uint32_t i=0; i<fTable.num_rows/fNumRowsPerTile; i+=shrink_factor)
            {   //add the elements one by one, so that the empty ones at the end
                //(i.e. fTable.num_rows%shrink_factor) do not create havoc
                const uint32_t target_catalog_row = i/shrink_factor;
                //move data from the current row (i) to the target row
                for (uint32_t j=0; j<fTable.num_cols; j++)
                {
                    fCatalog[target_catalog_row][j].second = fCatalog[i][j].second;
                    fCatalog[target_catalog_row][j].first  = 0;
                    uint64_t last_size   = fCatalog[i][j].first;
                    uint64_t last_offset = fCatalog[i][j].second;

                    for (uint32_t k=1; k<shrink_factor; k++)
                    {
                        if (fCatalog[i+k][j].second != 0)
                        {
                            fCatalog[target_catalog_row][j].first += fCatalog[i+k][j].second - last_offset;
                        }
                        else
                        {
                            fCatalog[target_catalog_row][j].first += last_size;
                            break;
                        }
                        last_size   = fCatalog[i+k][j].first;
                        last_offset = fCatalog[i+k][j].second;
                    }
                }
            }

            fCatalog.resize(fCatalog.size() - fCatalogExtraRows);

            //update the header keywords
            const uint32_t new_num_rows_per_tiles = fNumRowsPerTile*shrink_factor;
            const uint32_t new_num_tiles_written  = (fTable.num_rows + new_num_rows_per_tiles-1)/new_num_rows_per_tiles;
            SetInt("THEAP",    new_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);
            SetInt("NAXIS2",   new_num_tiles_written);
            SetInt("ZTILELEN", new_num_rows_per_tiles);
            cout << "New num rows per tiles: " << new_num_rows_per_tiles << " shrink factor: " << shrink_factor << endl;
            cout << "Num tiles written: " << new_num_tiles_written << endl;
        }
    }
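
    // Illustration: with 100 rows per tile and a catalog grown once from 1000
    // to 2000 rows, max_regular_rows = 100000. If e.g. 150000 rows were
    // written, shrink_factor = 2: pairs of consecutive catalog rows are merged
    // into one and ZTILELEN becomes 200.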

    bool close()
    {
        //wait for all pending compression and write jobs to finish
        for (auto it=fCompressionQueues.begin(); it!=fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Something went wrong while writing to disk...");
#else
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened in the compression threads
        if (fThreadsException != exception_ptr())
            rethrow_exception(fThreadsException);
#endif

        //compress and write the last, incomplete tile, if any
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target;
            SetNextCompression(compress_target);

            //set the number of threads to zero before calling CompressBuffer
            const int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;
            const uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            WriteTarget write_target;
            write_target.size     = size_to_write;
            write_target.target   = compress_target.target.target;
            write_target.tile_num = compress_target.target.tile_num;

            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Something went wrong while writing the last tile...");
        }

        AlignTo2880Bytes();

        //update the header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        const uint64_t heap_offset = fCatalog.size()*fTable.num_cols*sizeof(uint64_t)*2;
        SetInt("ZHEAPPTR", heap_offset);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;

        SetInt("THEAP", total_num_tiles_written*2*sizeof(int64_t)*fTable.num_cols);

        SetInt("NAXIS1", 2*sizeof(int64_t)*fTable.num_cols);
        SetInt("NAXIS2", total_num_tiles_written);

        ostringstream str;
        str << fRawSum.val();
        SetStr("RAWSUM", str.str());

        //assign the heap offsets to the catalog entries and compute the heap size
        int64_t heap_size         = 0;
        int64_t compressed_offset = 0;

        for (uint32_t i=0; i<total_num_tiles_written; i++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size         += sizeof(TileHeader);
            for (uint32_t j=0; j<fCatalog[i].size(); j++)
            {
                heap_size             += fCatalog[i][j].first;
                fCatalog[i][j].second  = compressed_offset;
                compressed_offset     += fCatalog[i][j].first;
                if (fCatalog[i][j].first == 0)
                    fCatalog[i][j].second = 0;
            }
        }

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalog.size() - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //rewrite the catalog, mainly to update the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }

    //overload of the ofits method: just calls the zofits-specific one with
    //default, uncompressed options for this column
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }

    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        //declare the column as a single variable-length ('Q') entry of the compressed table
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        //remember the real, uncompressed layout of the column
        Table::Column col;
        const size_t size = SizeFromType(typechar);

        col.name   = name;
        col.type   = typechar;
        col.num    = cnt;
        col.size   = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, comp));

        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar);
        SetStr(strKey.str(), strVal.str(), strCom.str());

        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Compression type FACT";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return true;
    }

    static void SetDefaultNumThreads(int32_t num) { fgNumQueues = num; }
    static int32_t GetDefaultNumThreads() { return fgNumQueues; }

    int32_t GetNumThreads() const { return fNumQueues; }
    bool SetNumThreads(int32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads";
#endif
            return false;
        }
#ifdef USE_BOOST_THREADS
        int32_t num_available_cores = boost::thread::hardware_concurrency();
#else
        int32_t num_available_cores = thread::hardware_concurrency();
#endif

        if (num_available_cores == 0)
        {   //could not detect the number of available cores from the system properties:
            //assume that 5 cores are available (4 compression, 1 write)
            num_available_cores = 5;
        }
        if (num > num_available_cores)
        {
            ostringstream str;
            str << "Number of threads cannot be greater than the number of physically available cores (" << num_available_cores << ")";
#ifdef __EXCEPTIONS
            throw runtime_error(str.str());
#else
            gLog << ___err___ << "ERROR - " << str.str();
#endif
            return false;
        }

        if (num == -1)
            num = num_available_cores-2; //one core for writing, one for the main thread

        if (fCompressionQueues.size() == (uint32_t)num)
            return true;

        //cannot be const, as resize does not want it that way
        Queue<CompressionTarget> queue(bind(&zofits::CompressBuffer, this, placeholders::_1), false);

        //shrink or grow the list of queues as requested
        fCompressionQueues.resize(num, queue);

        fNumQueues = num;

        return true;
    }

protected:

    bool reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksumming
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fBuffer      = fSmartBuffer.get()->get();

        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksumming

        //give the catalog enough space
        fCatalog.resize(fNumTiles);
        for (uint32_t i=0; i<fNumTiles; i++)
        {
            fCatalog[i].resize(fRealColumns.size());
            for (auto it=fCatalog[i].begin(); it!=fCatalog[i].end(); it++)
                *it = CatalogEntry(0,0);
        }
        return true;
    }

    bool writeCompressedDataToDisk(char* src, uint32_t sizeToWrite)
    {
        char*    checkSumPointer = src+4;
        int32_t  extraBytes      = 0;
        uint32_t sizeToChecksum  = sizeToWrite;
        if (fCheckOffset != 0)
        {   //should we extend the array to the left?
            sizeToChecksum  += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }
        if (sizeToChecksum%4 != 0)
        {   //should we extend the array to the right?
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        fCheckOffset = (4 - extraBytes)%4;

        //write the data to disk
        write(src+4, sizeToWrite);

        return good();
    }
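
    // The FITS checksum is computed on 4-byte words, but the compressed blocks
    // written here are not necessarily multiples of 4 bytes long. fCheckOffset
    // carries the misalignment of the running byte stream over to the next
    // call, where the checksummed area is zero-padded accordingly.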

    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;
#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get()->get(), target.transposed_src.get()->get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.target.get()->get(), target.transposed_src.get()->get(), target.num_rows);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        WriteTarget wt;
        wt.tile_num = target.target.tile_num;
        wt.size     = compressed_size;
        wt.target   = target.target.target;

        fWriteToDiskQueue.post(wt);

        return compressed_size;
    }

    bool WriteBufferToDisk(const WriteTarget& target)
    {
        //is this the tile we are supposed to write?
        if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
            return false;

        fLatestWrittenTile++;

        //write the buffer to disk
        return writeCompressedDataToDisk(target.target.get()->get(), target.size);
    }
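
    // Tiles must reach the disk strictly in order: a tile whose predecessor has
    // not been written yet is refused here, and the QueueMin ordering of
    // fWriteToDiskQueue ensures that the lowest-numbered pending tile is always
    // the one offered first.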

    //src cannot be const, as applySMOOTHING is done in place
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows)
    {
        const uint32_t thisRoundNumRows  = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        const uint32_t currentCatalogRow = (num_rows-1)/fNumRowsPerTile;
        uint32_t       offset            = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            fCatalog[currentCatalogRow][i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0)
                continue;

            Compression& head = fRealColumns[i].block_head;

            //remember where this column's block header will go
            const uint64_t previousOffset = compressedOffset;

            //skip the header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0; j<head.getNumProcs(); j++)
            {
                switch (head.getProc(j))
                {
                case kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;
                case kFactSmoothing:
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;
                case kFactHuffman16:
                    if (head.getOrdering() == kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if the compressed size turned out larger than the uncompressed one
            if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))
            {   //if so, redo the block uncompressed
                cout << "Redoing uncompressed!" << endl;
                //de-smooth!
                if (head.getProc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                Compression he;

                compressedOffset  = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            fCatalog[currentCatalogRow][i].first = compressedOffset - fCatalog[currentCatalogRow][i].second;
        }

        //write the tile header in front of the compressed data
        TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }
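
    // Resulting on-disk layout of one tile: a TileHeader, followed for each
    // column by a block header (Compression) and the column's compressed
    // payload. The catalog row stores, per column, the block's size and its
    // offset relative to the start of the tile; close() later rewrites the
    // offsets relative to the start of the heap.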

    void copyTransposeTile(const char* src, char* dest)
    {
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0; i<fRealColumns.size(); i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case kOrderByRow:
                for (uint32_t k=0; k<thisRoundNumRows; k++)
                {   //regular, "semi-transposed" copy
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case kOrderByCol:
                for (uint32_t j=0; j<fRealColumns[i].col.num; j++)
                    for (uint32_t k=0; k<thisRoundNumRows; k++)
                    {   //fully transposed copy
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            };
        }
    }

    /// Specific compression functions
    uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
    {
        memcpy(dest, src, size);
        return size;
    }

    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string   huffmanOutput;
        uint32_t previousHuffmanSize = 0;
        if (numRows < 2)
        {   //if we have fewer than 2 elements to compress, the Huffman encoder does not work
            //(and has no point). Return a size larger than the uncompressed one to trigger
            //the fallback to raw storage.
            return numRows*sizeOfElems*numRowElems + 1000;
        }
        if (sizeOfElems < 2)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFFMAN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFFMAN16 can only encode columns with 16-bit or longer types";
            return 0;
#endif
        }
        //first write a table with the encoded size of each element's block;
        //the Huffman-encoded data itself is appended after the loop
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0; j<numRowElems; j++)
        {
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset       += sizeof(uint32_t);
            previousHuffmanSize  = huffmanOutput.size();
        }
        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than the uncompressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }

    uint32_t applySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        //replace each element by its difference to the average of the two preceding ones
        for (int j=numElems-1; j>1; j--)
            short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }

    //apply the inverse transform of the integer smoothing
    uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
    {
        int16_t* short_data = reinterpret_cast<int16_t*>(data);
        //un-do the integer smoothing
        for (uint32_t j=2; j<numElems; j++)
            short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;

        return numElems*sizeof(int16_t);
    }
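
    // Note that the smoothing transform is exactly invertible: applySMOOTHING
    // runs from the last element down using the original predecessors, while
    // UnApplySMOOTHING runs from the first element up using the reconstructed
    // ones, so the same truncated integer average is subtracted and added back.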

    //compressed data stuff
    int32_t  fCheckOffset;    ///< offset to the data pointer to calculate the checksum
    uint32_t fNumTiles;       ///< number of tiles, i.e. of catalog rows
    uint32_t fNumRowsPerTile; ///< number of rows compressed together into one tile

    MemoryManager fMemPool;   ///< pool of reusable memory chunks

    //thread related stuff
    vector<Queue<CompressionTarget>>          fCompressionQueues; ///< one queue per compression thread
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue;  ///< queue feeding the writer thread

    static int32_t fgNumQueues; ///< the default number of threads that will be used to compress
    int32_t        fNumQueues;  ///< the number of threads that will be used to compress

    int32_t fLatestWrittenTile; ///< index of the latest tile written to disk
#ifdef __EXCEPTIONS
    exception_ptr fThreadsException; ///< exception raised in one of the compression threads, if any
#endif

    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
        int64_t first;  ///< size of the compressed block in bytes
        int64_t second; ///< offset of the block within the heap
    } __attribute__((__packed__));
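
    // The packed attribute matters: WriteCatalog() byte-swaps and writes these
    // entries directly from memory, so a catalog row must be exactly
    // fTable.num_cols*2*sizeof(int64_t) bytes, without padding.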

    typedef vector<CatalogEntry> CatalogRow;
    typedef vector<CatalogRow>   CatalogType;
    CatalogType  fCatalog;           ///< the catalog of all compressed blocks
    Checksum     fCatalogSum;        ///< checksum of the catalog
    Checksum     fRawSum;            ///< checksum of the raw, little-endian data (RAWSUM)
    off_t        fCatalogOffset;     ///< offset of the catalog within the file
    uint32_t     fRealRowWidth;      ///< width in bytes of one uncompressed row
    uint32_t     fCatalogExtraRows;  ///< number of rows the catalog was grown by (see GrowCatalog)
    vector<char> fRawSumBuffer;      ///< buffer used for checksumming the incoming data
    uint64_t     fMaxUsableMem;      ///< maximum amount of memory usable by the memory pool

    shared_ptr<MemoryChunk> fSmartBuffer; ///< memory chunk currently being filled with rows
    char*                   fBuffer;      ///< raw pointer to the same buffer

    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const Compression& h) : col(c),
                                                                         block_head(h)
        {}
        Table::Column col;        ///< the regular column entry
        Compression   block_head; ///< the compression setup associated with this column
    };
    vector<CompressedColumn> fRealColumns; ///< the real, uncompressed layout of the columns

};

int32_t zofits::fgNumQueues = 0;
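
/* Usage sketch (illustrative only, not part of the original file; it assumes
   the surrounding Mars/FACT classes behave as they are used above):

     zofits file;                       // or zofits(numTiles, rowsPerTile, maxMem)
     file.SetNumThreads(2);             // optional: compress with 2 worker threads
     file.open("events.fits.fz");
     file.AddColumn(kFactRaw, 1, 'I', "EventNum", "", "event number");
     file.WriteTableHeader("Events");   // allocates the buffers, starts the queues

     const uint16_t row = 42;           // one row = one 16-bit value here
     file.WriteRow(&row, sizeof(row));  // cnt must equal GetBytesPerRow()
     file.close();                      // flushes the last tile and the catalog
*/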

#ifndef __MARS__
}; //namespace std
#endif