source: trunk/Mars/mcore/zofits.h@ 17254

Last change on this file since 17254 was 17254, checked in by tbretz, 11 years ago
Use a shared_ptr with aliasing of char* instead of a MemoryChunk
File size: 39.9 KB
Line 
1#ifndef FACT_zofits
2#define FACT_zofits
3
4/*
5 * zofits.h
6 *
7 * FACT native compressed FITS writer
8 * Author: lyard
9 */
10
11#include "ofits.h"
12#include "zfits.h"
13#include "Queue.h"
14#include "MemoryManager.h"
15
16#ifdef USE_BOOST_THREADS
17#include <boost/thread.hpp>
18#endif
19
20#ifndef __MARS__
21namespace std
22{
23#else
24using namespace std;
25#endif
26
27class zofits : public ofits
28{
    /// Overriding of the begin() operator to get the smallest item in the list instead of the true begin
    /// Used as the container of the write-to-disk queue so that the writer thread
    /// always picks the pending tile with the smallest tile_num, whatever order
    /// the compression threads delivered them in.
    template<class S>
    struct QueueMin : std::list<S>
    {
        typename std::list<S>::iterator begin()
        {
            // linear scan via operator< of S; the queue is expected to stay short
            return min_element(std::list<S>::begin(), std::list<S>::end());
        }
    };
38
39
    //catalog types
    /// One catalog cell: size and offset (in bytes) of one column's compressed
    /// data inside one tile. Packed because rows of these are byte-swapped and
    /// written verbatim into the FITS catalog table.
    struct CatalogEntry
    {
        CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) {};
        int64_t first;   ///< Size of this column in the tile
        int64_t second;  ///< offset of this column in the tile, from the start of the heap area
    } __attribute__((__packed__));

    typedef vector<CatalogEntry> CatalogRow;   ///< one row per tile: one entry per column
    typedef list<CatalogRow>     CatalogType;  ///< whole catalog: list keeps references to rows stable
50
51
52 /// Parameters required to write a tile to disk
53 struct WriteTarget
54 {
55 bool operator < (const WriteTarget& other)
56 {
57 return tile_num < other.tile_num;
58 }
59
60 uint32_t tile_num; ///< Tile index of the data (to make sure that they are written in the correct order)
61 uint32_t size; ///< Size to write
62 shared_ptr<char> data; ///< Memory block to write
63 };
64
65
    /// Parameters required to compress a tile of data
    struct CompressionTarget
    {
        CompressionTarget(CatalogRow& r) : catalog_entry(r)
        {}

        // NOTE: catalog_entry references a row owned by fCatalog (a std::list,
        // so the reference stays valid); the row must outlive the queued target.
        CatalogRow&      catalog_entry;  ///< Reference to the catalog entry to deal with
        shared_ptr<char> src;            ///< Original data
        shared_ptr<char> transposed_src; ///< Transposed data
        WriteTarget      target;         ///< Compressed data
        uint32_t         num_rows;       ///< Number of rows to compress
    };
78
79public:
80 /// static setter for the default number of threads to use. -1 means all available physical cores
81 static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)<-1) n=_n; return n; }
82 static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
83 static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
84 static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
85
    /// constructors
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    zofits(uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        // NOTE(review): maxUsableMem is multiplied by 1000 here and in the pool
        // above, so the argument is effectively in kB despite the @param wording
        // -- confirm intended unit.
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
99
    /// @param fname the target filename
    /// @param numTiles how many data groups should be pre-reserved ?
    /// @param rowPerTile how many rows will be grouped together in a single tile
    /// @param maxUsableMem how many bytes of memory can be used by the compression buffers
    zofits(const char* fname,
           uint32_t numTiles = DefaultMaxNumTiles(),
           uint32_t rowPerTile = DefaultNumRowsPerTile(),
           uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(fname),
        fMemPool(0, maxUsableMem*1000),
        fWriteToDiskQueue(bind(&zofits::WriteBufferToDisk, this, placeholders::_1), false)
    {
        // same kB scaling as the default constructor (see note there)
        InitMemberVariables(numTiles, rowPerTile, maxUsableMem*1000);
        SetNumThreads(DefaultNumThreads());
    }
114
    //initialization of member variables
    /// @param nt number of tiles
    /// @param rpt number of rows per tile
    /// @param maxUsableMem max amount of RAM to be used by the compression buffers
    /// @throws runtime_error if nt == 0 (the FITS standard requires at least one tile)
    void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
    {
        if (nt == 0)
            throw runtime_error("There must be at least 1 tile of data (0 specified). This is required by the FITS standard. Please try again with num_tile >= 1.");

        fCheckOffset = 0;
        fNumQueues = 0;

        fNumTiles = nt;
        fNumRowsPerTile = rpt;

        fRealRowWidth = 0;
        fCatalogOffset = 0;
        fCatalogSize = 0;

        fMaxUsableMem = maxUsableMem;
#ifdef __EXCEPTIONS
        // no pending exception from worker threads yet
        fThreadsException = exception_ptr();
#endif
    }
139
    /// write the header of the binary table
    /// Also (re)allocates the compression buffers and starts the worker and
    /// writer threads, since the row width is only known at this point.
    /// @param name the name of the table to be created
    /// @return the state of the file
    virtual bool WriteTableHeader(const char* name="DATA")
    {
        // buffers depend on fRealRowWidth, which is final once all columns were added
        reallocateBuffers();

        ofits::WriteTableHeader(name);

        if (fNumQueues != 0)
        {
            //start the compression queues
            for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
                it->start();

            //start the disk writer
            fWriteToDiskQueue.start();
        }

        //mark that no tile has been written so far
        fLatestWrittenTile = -1;

        return good();
    }
164
    /// open a new file.
    /// Writes placeholder values for the compression-specific header keywords;
    /// the real values are filled in by close().
    /// @param filename the name of the file
    /// @param addEXTNAMEKey whether or not the name of the extension should be added
    void open(const char* filename, bool addEXTNAMEKey=true)
    {
        ofits::open(filename, addEXTNAMEKey);

        //add compression-related header entries
        SetBool( "ZTABLE", true, "Table is compressed");
        SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
        SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
        SetInt( "ZPCOUNT", 0, "");
        SetInt( "ZHEAPPTR", 0, "");
        SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
        SetInt( "THEAP", 0, "");
        SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
        SetFloat("ZRATIO", 0, "Compression ratio");
        SetInt( "ZSHRINK", 1, "Catalog shrink factor");

        // reset per-file state
        fCatalogSize = 0;
        fCatalog.clear();
        fRawSum.reset();
    }
188
    /// Super method. does nothing as zofits does not know about DrsOffsets
    /// (overridden by DRS-aware subclasses)
    /// @return the state of the file
    virtual bool WriteDrsOffsetsTable()
    {
        return good();
    }
195
    /// Returns the number of bytes per uncompressed row
    /// @return number of bytes per uncompressed row
    uint32_t GetBytesPerRow() const
    {
        return fRealRowWidth;
    }
202
    /// Write the data catalog
    /// Serializes the catalog to big-endian (FITS byte order), zero-pads it up
    /// to the reserved fNumTiles rows, writes it at its reserved file offset and
    /// refreshes fCatalogSum. The current write position is restored afterwards.
    /// @return the state of the file
    bool WriteCatalog()
    {
        const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
        const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;//fCatalog.size()*one_catalog_row_size;

        // swap the catalog bytes before writing
        vector<char> swapped_catalog(total_catalog_size);
        uint32_t shift = 0;
        for (auto it=fCatalog.begin(); it!=fCatalog.end(); it++)
        {
            revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
            shift += one_catalog_row_size;
        }

        // zero out the not-yet-used part of the reserved catalog space
        if (fCatalogSize < fNumTiles)
            memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);

        // first time writing ? remember where we are
        if (fCatalogOffset == 0)
            fCatalogOffset = tellp();

        // remember where we came from
        const off_t where_are_we = tellp();

        // write to disk
        seekp(fCatalogOffset);
        write(swapped_catalog.data(), total_catalog_size);

        if (where_are_we != fCatalogOffset)
            seekp(where_are_we);

        // update checksum
        fCatalogSum.reset();
        fCatalogSum.add(swapped_catalog.data(), total_catalog_size);

        return good();
    }
242
    /// Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets.
    /// Subclasses override this to calibrate one row in place before compression.
    virtual void DrsOffsetCalibrate(char* )
    {

    }
248
249 CatalogRow& AddOneCatalogRow()
250 {
251 // add one row to the catalog
252 fCatalog.emplace_back(CatalogRow());
253 fCatalog.back().resize(fTable.num_cols);
254 for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
255 *it = CatalogEntry(0,0);
256
257 fCatalogSize++;
258
259 return fCatalog.back();
260 }
    /// write one row of data
    /// Buffers the row; when a full tile (fNumRowsPerTile rows) has accumulated,
    /// it is handed to the least-loaded compression queue (or compressed and
    /// written inline when no worker threads are configured).
    /// @param ptr the source buffer
    /// @param cnt the number of bytes to write (must equal GetBytesPerRow())
    /// @return the state of the file. WARNING: with multithreading, this will most likely be the state of the file before the data is actually written
    bool WriteRow(const void* ptr, size_t cnt, bool = true)
    {
        if (cnt != fRealRowWidth)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Wrong size of row given to WriteRow");
#else
            gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << endl;
            return false;
#endif
        }

        //copy current row to pool or rows waiting for compression
        char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
        memcpy(target_location, ptr, fRealRowWidth);

        //for now, make an extra copy of the data, for RAWSUM checksuming.
        //Ideally this should be moved to the threads
        //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
        //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
        // place the row at the same 4-byte phase it has in the file, and zero the
        // buffer's first and last 4 bytes so stale bytes never enter the checksum
        const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
        char* buffer = fRawSumBuffer.data() + rawOffset;
        auto ib = fRawSumBuffer.begin();
        auto ie = fRawSumBuffer.rbegin();
        *ib++ = 0;
        *ib++ = 0;
        *ib++ = 0;
        *ib = 0;

        *ie++ = 0;
        *ie++ = 0;
        *ie++ = 0;
        *ie = 0;

        memcpy(buffer, ptr, fRealRowWidth);

        fRawSum.add(fRawSumBuffer, false);

        fTable.num_rows++;

        // in-place calibration hook (no-op in this base class)
        DrsOffsetCalibrate(target_location);

        if (fTable.num_rows % fNumRowsPerTile == 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            if (fNumQueues == 0)
            { //no worker threads. do everything in-line
                const uint64_t size_to_write = CompressBuffer(compress_target);

                WriteTarget write_target;
                write_target.size = size_to_write;
                write_target.data = compress_target.target.data;
                write_target.tile_num = compress_target.target.tile_num;

                return WriteBufferToDisk(write_target);
            }
            else
            {
                //if all queues are empty, use queue 0
                uint32_t min_index = 0;
                uint32_t min_size = numeric_limits<uint32_t>::max();
                uint32_t current_index = 0;

                // pick the shortest queue to balance the load
                for (auto it=fCompressionQueues.cbegin(); it!=fCompressionQueues.cend(); it++)
                {
                    if (it->size() < min_size)
                    {
                        min_index = current_index;
                        min_size = it->size();
                    }
                    current_index++;
                }

                if (!fCompressionQueues[min_index].post(compress_target))
                    throw runtime_error("The compression queues are not started. Did you close the file before writing this row ?");
            }
        }

        return good();
    }
347
    /// update the real number of rows
    /// NAXIS2 counts catalog/tile rows (rounded up), ZNAXIS2 the uncompressed
    /// data rows, per the tiled-table compression convention.
    void FlushNumRows()
    {
        SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
        SetInt("ZNAXIS2", fTable.num_rows);
        FlushHeader();
    }
355
    /// Setup the environment to compress yet another tile of data
    /// Hands the currently filled buffer over to the target and installs a fresh
    /// buffer for the next incoming rows.
    /// @param target the struct where to host the produced parameters
    void SetNextCompression(CompressionTarget& target)
    {
        //get space for transposed data
        shared_ptr<char> transposed_data = fMemPool.malloc();

        //fill up write to disk target
        WriteTarget write_target;
        // num_rows has already been incremented, hence the -1
        write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
        write_target.size = 0;
        write_target.data = fMemPool.malloc();

        //fill up compression target
        target.src = fSmartBuffer;
        target.transposed_src = transposed_data;
        target.target = write_target;
        target.num_rows = fTable.num_rows;

        //get a new buffer to host the incoming data
        fSmartBuffer = fMemPool.malloc();
    }
378
    /// Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file.
    /// Keeps every shrink_factor-th row and records the factor in ZSHRINK so
    /// readers can reconstruct tile boundaries.
    void ShrinkCatalog()
    {
        //add empty row to get either the target number of rows, or a multiple of the allowed size
        // NOTE(review): this appends (fCatalogSize % fNumTiles) rows, which only
        // reaches a multiple of fNumTiles when the remainder is 0 or fNumTiles/2;
        // the "always exact" claim below may not hold -- verify against upstream.
        for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
            AddOneCatalogRow();

        //did we write more rows than what the catalog could host ?
        if (fCatalogSize > fNumTiles)
        {
            const uint32_t shrink_factor = fCatalogSize / fNumTiles; //always exact as extra rows were added just above

            //shrink the catalog !
            // copy every shrink_factor-th row towards the front of the list
            uint32_t entry_id = 1;
            auto it = fCatalog.begin();
            it++;
            for (; it != fCatalog.end(); it++)
            {
                if (entry_id >= fNumTiles) break;

                uint32_t target_id = entry_id*shrink_factor;

                // advance a second iterator to the row to keep (list: no random access)
                auto jt = it;
                for (uint32_t i=0;i<target_id-entry_id;i++)
                    jt++;

                *it = *jt;

                entry_id++;
            }

            const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
            //remove the too many entries
            for (uint32_t i=0;i<num_tiles_to_remove;i++)
            {
                fCatalog.pop_back();
                fCatalogSize--;
            }
            //update header keywords
            fNumRowsPerTile *= shrink_factor;

            SetInt("ZTILELEN", fNumRowsPerTile);
            SetInt("ZSHRINK", shrink_factor);
        }
    }
424
    /// close an open file.
    /// Drains the worker/writer queues, flushes the last partial tile, finalizes
    /// the catalog offsets, updates all compression header keywords and verifies
    /// the file checksum. The order of operations below matters.
    /// @return the state of the file
    bool close()
    {
        // stop compression and write threads
        for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
            it->wait();

        fWriteToDiskQueue.wait();

        if (tellp() < 0)
        {
#ifdef __EXCEPTIONS
            throw runtime_error("Looks like the file has been closed already");
#else
            return false;
#endif
        }

#ifdef __EXCEPTIONS
        //check if something happened while the compression threads were working
        if (fThreadsException != exception_ptr())
        {
            //if so, re-throw the exception that was generated
            rethrow_exception(fThreadsException);
        }
#endif

        //write the last tile of data (if any)
        if (fTable.num_rows%fNumRowsPerTile != 0)
        {
            CompressionTarget compress_target(AddOneCatalogRow());
            SetNextCompression(compress_target);

            //set number of threads to zero before calling compressBuffer
            // so that CompressBuffer returns the size instead of posting to the queue
            int32_t backup_num_queues = fNumQueues;
            fNumQueues = 0;
            uint64_t size_to_write = CompressBuffer(compress_target);
            fNumQueues = backup_num_queues;

            WriteTarget write_target;
            write_target.size = size_to_write;
            write_target.data = compress_target.target.data;
            write_target.tile_num = compress_target.target.tile_num;

            if (!WriteBufferToDisk(write_target))
                throw runtime_error("Something went wrong while writing the last tile...");
        }

        AlignTo2880Bytes();

        // backfill the per-column heap offsets now that all tile sizes are known
        int64_t heap_size = 0;
        int64_t compressed_offset = 0;
        for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
        {
            compressed_offset += sizeof(TileHeader);
            heap_size += sizeof(TileHeader);
            for (uint32_t j=0; j<it->size(); j++)
            {
                heap_size += (*it)[j].first;
                (*it)[j].second = compressed_offset;
                compressed_offset += (*it)[j].first;
                // empty columns carry a zero offset by convention
                if ((*it)[j].first == 0)
                    (*it)[j].second = 0;
            }
        }

        ShrinkCatalog();

        //update header keywords
        SetInt("ZNAXIS1", fRealRowWidth);
        SetInt("ZNAXIS2", fTable.num_rows);

        SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);

        const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
        const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;

        SetInt("THEAP", total_num_tiles_written*total_catalog_width);
        SetInt("NAXIS1", total_catalog_width);
        SetInt("NAXIS2", total_num_tiles_written);

        ostringstream str;
        str << fRawSum.val();
        SetStr("RAWSUM", str.str());

        const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
        SetFloat("ZRATIO", compression_ratio);

        //add to the heap size the size of the gap between the catalog and the actual heap
        heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;

        SetInt("PCOUNT", heap_size, "size of special data area");

        //Just for updating the fCatalogSum value
        WriteCatalog();

        fDataSum += fCatalogSum;

        const Checksum checksm = UpdateHeaderChecksum();

        ofstream::close();

        if ((checksm+fDataSum).valid())
            return true;

        ostringstream sout;
        sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
#ifdef __EXCEPTIONS
        throw runtime_error(sout.str());
#else
        gLog << ___err___ << "ERROR - " << sout.str() << endl;
        return false;
#endif
    }
540
    /// Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options for this column
    /// @return whether the column was added successfully
    bool AddColumn(uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        return AddColumn(kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
    }
546
    /// Overload of the simplified compressed version
    /// Registers the column as a single 'Q' (variable-length descriptor) column
    /// in the base class, tracks the real layout in fRealColumns, and writes the
    /// ZFORMn / ZCTYPn header keys describing the uncompressed format.
    /// @return whether the column was added successfully
    bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const string& name, const string& unit, const string& comment="", bool addHeaderKeys=true)
    {
        if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
            return false;

        Table::Column col;
        size_t size = SizeFromType(typechar);

        col.name = name;
        col.type = typechar;
        col.num = cnt;
        col.size = size;
        col.offset = fRealRowWidth;

        fRealRowWidth += size*cnt;

        fRealColumns.emplace_back(CompressedColumn(col, comp));

        ostringstream strKey, strVal, strCom;
        strKey << "ZFORM" << fRealColumns.size();
        strVal << cnt << typechar;
        strCom << "format of " << name << " [" << CommentFromType(typechar);
        SetStr(strKey.str(), strVal.str(), strCom.str());

        strKey.str("");
        strVal.str("");
        strCom.str("");
        strKey << "ZCTYP" << fRealColumns.size();
        strVal << "FACT";
        strCom << "Compression type FACT";
        SetStr(strKey.str(), strVal.str(), strCom.str());

        return true;
    }
582
    /// Get and set the actual number of threads for this object
    int32_t GetNumThreads() const { return fNumQueues;}
    /// Set the number of compression queues (= worker threads).
    /// Must be called while the file is closed. Values larger than the number of
    /// available cores are clamped to cores-2 (one core for the main thread, one
    /// for the writer); note that num=-1 wraps to a huge uint32 and is therefore
    /// always clamped, which implements the "-1 means all cores" convention.
    /// @return false if the file is open (throws instead when __EXCEPTIONS is set)
    bool SetNumThreads(uint32_t num)
    {
        if (is_open())
        {
#ifdef __EXCEPTIONS
            throw runtime_error("File must be closed before changing the number of compression threads");
#else
            gLog << ___err___ << "ERROR - File must be closed before changing the number of compression threads" << endl;
#endif
            return false;
        }

        //get number of physically available threads
#ifdef USE_BOOST_THREADS
        unsigned int num_available_cores = boost::thread::hardware_concurrency();
#else
        unsigned int num_available_cores = thread::hardware_concurrency();
#endif
        // could not detect number of available cores from system properties...
        if (num_available_cores == 0)
            num_available_cores = 1;

        // leave one core for the main thread and one for the writing
        if (num > num_available_cores)
            num = num_available_cores>2 ? num_available_cores-2 : 1;

        if (fCompressionQueues.size() != uint32_t(num))
        {
            fCompressionQueues.resize(num, Queue<CompressionTarget>(bind(&zofits::CompressBuffer, this, placeholders::_1), false));
            fNumQueues = num;
        }

        return true;
    }
619
620protected:
621
    /// Allocates the required objects.
    /// Sizes one memory-pool chunk to hold a full tile plus per-column block
    /// headers, the tile header and 4+4 bytes of checksum padding.
    void reallocateBuffers()
    {
        const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + fRealColumns.size()*sizeof(BlockHeader) + sizeof(TileHeader) + 8; //+8 for checksuming;
        fMemPool.setChunkSize(chunk_size);

        fSmartBuffer = fMemPool.malloc();
        fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
    }
631
    /// Actually does the writing to disk (and checksuming)
    /// The FITS checksum works on 4-byte words: the data is virtually extended
    /// left by fCheckOffset (to stay word-aligned with the previous write) and
    /// right to the next word boundary before checksuming. Only the payload
    /// (skipping the 4 reserved leading bytes) is written to disk.
    /// @param src the buffer to write
    /// @param sizeToWrite how many bytes should be written
    /// @return the state of the file
    bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
    {
        char* checkSumPointer = src+4;
        int32_t extraBytes = 0;
        uint32_t sizeToChecksum = sizeToWrite;
        if (fCheckOffset != 0)
        {//should we extend the array to the left ?
            sizeToChecksum += fCheckOffset;
            checkSumPointer -= fCheckOffset;
            memset(checkSumPointer, 0, fCheckOffset);
        }
        if (sizeToChecksum%4 != 0)
        {//should we extend the array to the right ?
            extraBytes = 4 - (sizeToChecksum%4);
            memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
            sizeToChecksum += extraBytes;
        }

        //do the checksum
        fDataSum.add(checkSumPointer, sizeToChecksum);

        // remember the alignment phase for the next call
        fCheckOffset = (4 - extraBytes)%4;

        //write data to disk
        write(src+4, sizeToWrite);

        return good();
    }
664
    /// Compress a given buffer based on the target. This is the method executed by the threads
    /// Transposes and compresses the tile, then either returns the compressed
    /// size (inline mode, fNumQueues==0) or posts the result to the writer queue.
    /// Exceptions from worker threads are captured into fThreadsException and
    /// re-thrown later by close().
    /// @param target the struct hosting the parameters of the compression
    /// @return number of bytes of the compressed data, or always 1 when used by the Queues
    uint32_t CompressBuffer(const CompressionTarget& target)
    {
        uint64_t compressed_size = 0;

        //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
        //Really do not understand what's wrong...
        //calibrate data if required
        //const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
//        for (uint32_t i=0;i<thisRoundNumRows;i++)
//        {
//            char* target_location = target.src.get()->get() + fRealRowWidth*i;
//            cout << "Target Location there...." << hex << static_cast<void*>(target_location) << endl;
//            DrsOffsetCalibrate(target_location);
//        }

#ifdef __EXCEPTIONS
        try
        {
#endif
            //transpose the original data
            copyTransposeTile(target.src.get(), target.transposed_src.get());

            //compress the buffer
            compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
#ifdef __EXCEPTIONS
        }
        catch (...)
        {
            fThreadsException = current_exception();
            // inline mode has no close() to re-throw for us: propagate immediately
            if (fNumQueues == 0)
                rethrow_exception(fThreadsException);
        }
#endif

        if (fNumQueues == 0)
            return compressed_size;

        //post the result to the writing queue
        //get a copy so that it becomes non-const
        WriteTarget wt;
        wt.tile_num = target.target.tile_num;
        wt.size = compressed_size;
        wt.data = target.target.data;

        fWriteToDiskQueue.post(wt);

        // if used by the queue, always return true as the elements are not ordered
        return 1;
    }
717
718 /// Write one compressed tile to disk. This is the method executed by the writing thread
719 /// @param target the struct hosting the write parameters
720 bool WriteBufferToDisk(const WriteTarget& target)
721 {
722 //is this the tile we're supposed to write ?
723 if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
724 return false;
725
726 fLatestWrittenTile++;
727
728#ifdef __EXCEPTIONS
729 try
730 {
731#endif
732 if (!writeCompressedDataToDisk(target.data.get(), target.size))
733 {//could not write the data to disk
734 ostringstream str;
735 str << "An error occured while writing to disk: ";
736 if (eof())
737 str << "End-Of-File";
738 if (failbit)
739 str << "Logical error on i/o operation";
740 if (badbit)
741 str << "Writing error on i/o operation";
742#ifdef __EXCEPTIONS
743 throw runtime_error(str.str());
744#else
745 gLog << ___err___ << "ERROR - " << str.str() << endl;
746#endif
747 }
748#ifdef __EXCEPTIONS
749 }
750 catch(...)
751 {
752 fThreadsException = current_exception();
753 if (fNumQueues == 0)
754 rethrow_exception(fThreadsException);
755 }
756#endif
757 return true;
758 }
759
    /// Compress a given buffer based on its source and destination
    //src cannot be const, as applySMOOTHING is done in place
    /// Runs every column's processing chain; if a chain inflates the data, the
    /// column is redone uncompressed (kFactRaw). Fills catalog_row with each
    /// column's compressed size (first) and tile-relative offset (second).
    /// @param dest the buffer hosting the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param num_rows the number of uncompressed rows in the transposed buffer
    /// @param catalog_row the catalog row to fill in
    /// @return the number of bytes of the compressed data
    uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
    {
        // the last tile of the file may be only partially filled
        const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
        uint32_t offset = 0;

        //skip the checksum reserved area
        dest += 4;

        //skip the 'TILE' marker and tile size entry
        uint64_t compressedOffset = sizeof(TileHeader);

        //now compress each column one by one by calling compression on arrays
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            catalog_row[i].second = compressedOffset;

            if (fRealColumns[i].col.num == 0) continue;

            Compression& head = fRealColumns[i].block_head;

            //set the default byte telling if uncompressed the compressed Flag
            const uint64_t previousOffset = compressedOffset;

            //skip header data
            compressedOffset += head.getSizeOnDisk();

            for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
            {
                switch (head.getProc(j))
                {
                case kFactRaw:
                    compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
                    break;
                case kFactSmoothing:
                    // in-place preprocessing step: produces no output bytes itself
                    applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
                    break;
                case kFactHuffman16:
                    if (head.getOrdering() == kOrderByCol)
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
                    else
                        compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
                    break;
                }
            }

            //check if compressed size is larger than uncompressed
            if ((head.getProc(0) != kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
            {//if so set flag and redo it uncompressed
                //de-smooth !
                if (head.getProc(0) == kFactSmoothing)
                    UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);

                Compression he;

                compressedOffset = previousOffset + he.getSizeOnDisk();
                compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);

                he.SetBlockSize(compressedOffset - previousOffset);
                he.Memcpy(dest+previousOffset);

                offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;

                catalog_row[i].first = compressedOffset - catalog_row[i].second;
                continue;
            }

            head.SetBlockSize(compressedOffset - previousOffset);
            head.Memcpy(dest + previousOffset);

            offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
            catalog_row[i].first = compressedOffset - catalog_row[i].second;
        }

        // finally write the tile header at the front of the block
        TileHeader tile_head(thisRoundNumRows, compressedOffset);
        memcpy(dest, &tile_head, sizeof(TileHeader));

        return compressedOffset;
    }
844
    /// Transpose a tile to a new buffer
    /// Rows are re-ordered per column according to each column's requested
    /// ordering, so that similar values end up adjacent for better compression.
    /// @param src buffer hosting the regular, row-ordered data
    /// @param dest the target buffer that will receive the transposed data
    void copyTransposeTile(const char* src, char* dest)
    {
        // the last tile of the file may be only partially filled
        const uint32_t thisRoundNumRows = (fTable.num_rows%fNumRowsPerTile) ? fTable.num_rows%fNumRowsPerTile : fNumRowsPerTile;

        //copy the tile and transpose it
        for (uint32_t i=0;i<fRealColumns.size();i++)
        {
            switch (fRealColumns[i].block_head.getOrdering())
            {
            case kOrderByRow:
                for (uint32_t k=0;k<thisRoundNumRows;k++)
                {//regular, "semi-transposed" copy
                    memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
                    dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
                }
                break;

            case kOrderByCol :
                for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
                    for (uint32_t k=0;k<thisRoundNumRows;k++)
                    {//transposed copy
                        memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
                        dest += fRealColumns[i].col.size;
                    }
                break;
            };
        }
    }
876
877 /// Specific compression functions
878 /// @param dest the target buffer
879 /// @param src the source buffer
880 /// @param size number of bytes to copy
881 /// @return number of bytes written
882 uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
883 {
884 memcpy(dest, src, size);
885 return size;
886 }
887
    /// Do huffman encoding
    /// Writes, per element-column, a uint32 with the size of that column's
    /// encoded chunk, followed by the concatenated Huffman output. If the total
    /// would exceed the uncompressed size, the output is not copied and the
    /// (too large) size is returned so the caller falls back to raw storage.
    /// @param dest the buffer that will receive the compressed data
    /// @param src the buffer hosting the transposed data
    /// @param numRows number of rows of data in the transposed buffer
    /// @param sizeOfElems size in bytes of one data elements
    /// @param numRowElems number of elements on each row
    /// @return number of bytes written
    uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
    {
        string huffmanOutput;
        uint32_t previousHuffmanSize = 0;
        if (numRows < 2)
        {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
            return numRows*sizeOfElems*numRowElems + 1000;
        }
        if (sizeOfElems < 2 )
        {
#ifdef __EXCEPTIONS
            throw runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
#else
            gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << endl;
            return 0;
#endif
        }
        uint32_t huffmanOffset = 0;
        for (uint32_t j=0;j<numRowElems;j++)
        {
            // encode one element-column (numRows values of 16-bit words)
            Huffman::Encode(huffmanOutput,
                            reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
                            numRows*(sizeOfElems/2));
            // store the size of this chunk only (output string accumulates)
            reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
            huffmanOffset += sizeof(uint32_t);
            previousHuffmanSize = huffmanOutput.size();
        }
        const size_t totalSize = huffmanOutput.size() + huffmanOffset;

        //only copy if not larger than not-compressed size
        if (totalSize < numRows*sizeOfElems*numRowElems)
            memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());

        return totalSize;
    }
930
931 /// Applies Thomas' DRS4 smoothing
932 /// @param data where to apply it
933 /// @param numElems how many elements of type int16_t are stored in the buffer
934 /// @return number of bytes modified
935 uint32_t applySMOOTHING(char* data, uint32_t numElems)
936 {
937 int16_t* short_data = reinterpret_cast<int16_t*>(data);
938 for (int j=numElems-1;j>1;j--)
939 short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
940
941 return numElems*sizeof(int16_t);
942 }
943
944 /// Apply the inverse transform of the integer smoothing
945 /// @param data where to apply it
946 /// @param numElems how many elements of type int16_t are stored in the buffer
947 /// @return number of bytes modified
948 uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
949 {
950 int16_t* short_data = reinterpret_cast<int16_t*>(data);
951 //un-do the integer smoothing
952 for (uint32_t j=2;j<numElems;j++)
953 short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
954
955 return numElems*sizeof(uint16_t);
956 }
957
958
959
    //thread related stuff
    MemoryManager fMemPool;           ///< Actual memory manager, providing memory for the compression buffers
    int32_t fNumQueues;               ///< Current number of threads that will be used by this object
    uint64_t fMaxUsableMem;           ///< Maximum number of bytes that can be allocated by the memory manager
    int32_t fLatestWrittenTile;       ///< Index of the last tile written to disk (for correct ordering while using several threads); -1 before the first write

    vector<Queue<CompressionTarget>> fCompressionQueues;  ///< Processing queues (=threads)
    Queue<WriteTarget, QueueMin<WriteTarget>> fWriteToDiskQueue; ///< Writing queue (=thread); QueueMin keeps tiles ordered by tile_num

    // catalog related stuff
    CatalogType fCatalog;             ///< Catalog for this file (list: row references stay valid)
    uint32_t fCatalogSize;            ///< Actual catalog size (.size() is slow on large lists)
    uint32_t fNumTiles;               ///< Number of pre-reserved tiles
    uint32_t fNumRowsPerTile;         ///< Number of rows per tile
    off_t fCatalogOffset;             ///< Offset of the catalog from the beginning of the file

    // checksum related stuff
    Checksum fCatalogSum;             ///< Checksum of the catalog
    Checksum fRawSum;                 ///< Raw sum (specific to FACT)
    int32_t fCheckOffset;             ///< offset to the data pointer to calculate the checksum

    // data layout related stuff
    /// Regular columns augmented with compression informations
    struct CompressedColumn
    {
        CompressedColumn(const Table::Column& c, const Compression& h) : col(c),
            block_head(h)
        {}
        Table::Column col;            ///< the regular column entry
        Compression block_head;       ///< the compression data associated with that column
    };
    vector<CompressedColumn> fRealColumns; ///< Vector hosting the columns of the file
    uint32_t fRealRowWidth;           ///< Width in bytes of one uncompressed row
    shared_ptr<char> fSmartBuffer;    ///< Smart pointer to the buffer where the incoming rows are written
    vector<char> fRawSumBuffer;       ///< buffer used for checksuming the incoming data, before compression

#ifdef __EXCEPTIONS
    exception_ptr fThreadsException;  ///< exception pointer to store exceptions coming from the threads
#endif
999
1000};
1001
1002#ifndef __MARS__
1003}; //namespace std
1004#endif
1005
1006#endif
Note: See TracBrowser for help on using the repository browser.